
Best approach to read messages from multiple topics or partitions #322

Closed
rathdeep opened this issue Jul 2, 2015 · 7 comments

Comments

rathdeep commented Jul 2, 2015

Hi,

I have a few questions for this community.

A>>>>>>>>>>>
I have gone through some threads on reading from different partitions of a topic.
Which approach is better:
1> Redirecting messages from different partitions to a shared queue (rd_kafka_queue_new).
Or
2> Skipping the shared queue and calling rd_kafka_consume_start on each partition directly, then registering the same callback via rd_kafka_consume_callback for each partition.

B>>>>>>>>>>>
After running the above scenario, I saw one set of TCP connections between the consumer and each of the topic's brokers. Basically, I created one Kafka handle via the following API call:
rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr))
Using this Kafka handle, I consumed messages from all partitions of a topic.

If I want to read messages from another topic, is it possible to reuse the same Kafka handle created above (so that the number of TCP connections stays limited)?
Creating a new Kafka handle per topic works.

C>>>>>>>>>>>
A generic question regarding Kafka: do you know when an offset for a partition gets reset to 0?

Thanks in advance for your responses.

@rathdeep rathdeep changed the title Better approach to read from multiple topics or partitions Best approach to read messages from multiple topics or partitions Jul 2, 2015
DEvil0000 (Contributor) commented:

I am not sure I understand you correctly.
A: I use the C++ API, but 2 sounds good to me.
B: I use the C++ API, so I let librdkafka handle this; take a look at the code.
I think it depends on your setup: the number of partitions per topic and the replication factor. Or let's say it depends on which broker is the leader for which partitions. AFAIK you have to be connected to the current leader of a partition to consume from or produce to it.
C: You can reset it manually when you start consuming with a consumer. Depending on your code/config, new consumers may start at 0 or at the current end of the topic. I am pretty sure that at some point it has to reset, because the bits used to store the offset are not unlimited.
I hope this was still helpful ;)


rathdeep commented Jul 2, 2015

Thanks DEvil0000.

Regarding B, I see TCP connections established even between the consumer and brokers that are not leaders for the topic.

Also, all the C++ APIs are wrappers over the C API, so the behavior should be the same whether we use C or C++.

My question in B was whether, using the same Kafka handle (i.e. the "RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);" call in C++), we can consume messages from different topics. I am unable to figure out how to achieve this, if it is possible at all. If yes, it would save on the number of TCP connections the consumer has to maintain, and would be useful if we want to read all data in one single thread.

Regarding question C, the offset might reset if the max value of int64_t is exceeded, but is there any other logic? I could not find any write-up on this, hence my question. Surely the consumer has to manage the offset on its side.


edenhill commented Jul 3, 2015

@rathdeep

A
If you don't use the queues you will need to call rd_kafka_consume*() for each topic+partition combination, which isn't very convenient. That's why the queue interface was added: it provides polling of all started topic+partitions with one single ..consume_*_queue() call.
In other words: use the queue interface.
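A minimal sketch of the shared-queue pattern described above, using the legacy consumer API. The broker address, topic names, and partition numbers are placeholders, and error handling is trimmed; a real program would check every return value:

```c
/* Sketch: route two started topic+partitions to one shared queue and
 * poll them with a single rd_kafka_consume_queue() call.
 * Assumes librdkafka is installed and a broker is reachable. */
#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                                      errstr, sizeof(errstr));
        rd_kafka_brokers_add(rk, "localhost:9092");  /* placeholder */

        /* One shared queue for everything started below. */
        rd_kafka_queue_t *rkqu = rd_kafka_queue_new(rk);

        rd_kafka_topic_t *rkt_a = rd_kafka_topic_new(rk, "topicA", NULL);
        rd_kafka_topic_t *rkt_b = rd_kafka_topic_new(rk, "topicB", NULL);

        /* ..._start_queue() sends each partition's messages to rkqu
         * instead of a per-partition queue. */
        rd_kafka_consume_start_queue(rkt_a, 0, RD_KAFKA_OFFSET_BEGINNING, rkqu);
        rd_kafka_consume_start_queue(rkt_b, 0, RD_KAFKA_OFFSET_BEGINNING, rkqu);

        for (int i = 0; i < 100; i++) {
                /* One poll serves all started topic+partitions. */
                rd_kafka_message_t *rkmessage = rd_kafka_consume_queue(rkqu, 1000);
                if (!rkmessage)
                        continue;  /* timeout */
                printf("%.*s\n", (int)rkmessage->len,
                       (const char *)rkmessage->payload);
                rd_kafka_message_destroy(rkmessage);
        }

        rd_kafka_consume_stop(rkt_a, 0);
        rd_kafka_consume_stop(rkt_b, 0);
        rd_kafka_queue_destroy(rkqu);
        rd_kafka_topic_destroy(rkt_a);
        rd_kafka_topic_destroy(rkt_b);
        rd_kafka_destroy(rk);
        return 0;
}
```

Note the single consume loop: with the per-partition rd_kafka_consume() approach, each topic+partition needs its own poll call (or callback registration) instead.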

B
Yes, you can consume any number of topics and partitions from the same rd_kafka_t handle.
The same is true for produce.

C
A partition's offset is never reset to 0 (a 64-bit integer will not wrap).
The only occasion this occurs is if the broker is stopped, all its state files are removed, and it is then restarted, but that is not recommended since existing consumers will lose their offset tracking.


edenhill commented Jul 3, 2015

The C++ interface is currently lagging behind somewhat and does not feature the queue interface.
It will be added in the future though.


rathdeep commented Jul 3, 2015

Thanks a lot, edenhill. I tried to consume multiple topics using the same handle.

I modified rdkafka_example.c as follows, but I am getting a core dump on the second rd_kafka_topic_new. Will try to debug more.
(gdb) where
#0 0x000000343b078d80 in strlen () from /lib64/libc.so.6
#1 0x000000000041697d in rd_kafka_topic_new (rk=0x5c4a1c0, topic=, conf=0x5c4a140) at rdkafka_proto.h:152
#2 0x00000000004033c6 in main (argc=13, argv=) at rdkafka_example.c:598

While debugging with gdb, I see the following code being run from multiple threads (2):

    char *topic1 = NULL;
    ..................
    case 'r':
            topic1 = optarg;
            break;
    .....................
    /* Create topic */
    rkt  = rd_kafka_topic_new(rk, topic, topic_conf);
    rkt1 = rd_kafka_topic_new(rk, topic1, topic_conf);  /* core dumps here, line number 598 */

    /* Start consuming */
    if (rd_kafka_consume_start(rkt, partition, start_offset) == -1) {
            fprintf(stderr, "%% Failed to start consuming: %s\n",
                    rd_kafka_err2str(rd_kafka_errno2err(errno)));
            exit(1);
    }

    if (rd_kafka_consume_start(rkt1, partition, start_offset) == -1) {
            fprintf(stderr, "%% Failed to start consuming: %s\n",
                    rd_kafka_err2str(rd_kafka_errno2err(errno)));
            exit(1);
    }

    while (run) {
            rd_kafka_message_t *rkmessage;

            /* Consume single message.
             * See rdkafka_performance.c for high speed
             * consuming of messages. */
            rkmessage = rd_kafka_consume(rkt, partition, 1000);
            if (!rkmessage) /* timeout */
                    continue;

            msg_consume(rkmessage, NULL);

            rd_kafka_message_t *rkmessage1 = rd_kafka_consume(rkt1, partition, 1000);
            if (!rkmessage1) /* timeout */
                    continue;

            msg_consume(rkmessage1, NULL);

            /* Return message to rdkafka */
            rd_kafka_message_destroy(rkmessage);
            rd_kafka_message_destroy(rkmessage1);
    }

yungchin commented Jul 4, 2015

On 3 July 2015 at 18:05, rathdeep [email protected] wrote:

/* Create topic */
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
rkt1 = rd_kafka_topic_new(rk, topic1, topic_conf);  /* core dumps here, line number 598 */

I think this happens because rd_kafka_topic_new frees topic_conf, so you
have to duplicate it if you want to use it twice.
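Applied to the rdkafka_example.c snippet above, the fix might look like the following sketch (variable names taken from that snippet; rd_kafka_topic_new() takes ownership of, and eventually frees, the conf it is given):

```c
/* Each rd_kafka_topic_new() call consumes its conf argument, so the
 * second topic needs its own copy of the topic configuration. */
rd_kafka_topic_conf_t *topic_conf1 = rd_kafka_topic_conf_dup(topic_conf);

rkt  = rd_kafka_topic_new(rk, topic,  topic_conf);   /* owns topic_conf  */
rkt1 = rd_kafka_topic_new(rk, topic1, topic_conf1);  /* owns topic_conf1 */
```

After these calls, neither topic_conf nor topic_conf1 should be used or freed by the application.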

@rathdeep
Copy link
Author

rathdeep commented Jul 6, 2015

Thanks yungchin.
