diff --git a/tensorflow_io/core/kernels/kafka_kernels.cc b/tensorflow_io/core/kernels/kafka_kernels.cc
index 74a4a3037..2d03a0883 100644
--- a/tensorflow_io/core/kernels/kafka_kernels.cc
+++ b/tensorflow_io/core/kernels/kafka_kernels.cc
@@ -783,17 +783,22 @@ class KafkaRebalanceCb : public RdKafka::RebalanceCb {
       // RD_KAFKA_OFFSET_END -1
       // RD_KAFKA_OFFSET_STORED -1000
       // RD_KAFKA_OFFSET_INVALID -1001
-      if (partitions[partition]->offset() == -1001) {
-        LOG(INFO)
-            << "The consumer group was newly created, reading from beginning";
-        partitions[partition]->set_offset(RdKafka::Topic::OFFSET_BEGINNING);
-      }
+      LOG(INFO) << "REBALANCE: " << partitions[partition]->topic() << "[" << partitions[partition]->partition() << "], " << partitions[partition]->offset() << " " << partitions[partition]->err();
     }

     if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
+      // librdkafka does not look up the stored offsets before invoking this
+      // rebalance callback; the partition offsets are set to
+      // RD_KAFKA_OFFSET_INVALID at this point so that an external offset
+      // store could be plugged in instead. However, calling assign() with
+      // offset RD_KAFKA_OFFSET_INVALID makes librdkafka look up the stored
+      // offset on the broker.
+      // If there is no stored offset, it falls back to the
+      // `auto.offset.reset` configuration parameter.
+      LOG(INFO) << "REBALANCE: Assigning partitions";
       consumer->assign(partitions);
       partition_count = (int)partitions.size();
@@ -832,6 +837,9 @@ class KafkaGroupReadableResource : public ResourceBase {
     string errstr;
     RdKafka::Conf::ConfResult result = RdKafka::Conf::CONF_UNKNOWN;
+
+    // The default Kafka topic configurations are set first, before
+    // the global configurations.
     for (size_t i = 0; i < metadata.size(); i++) {
       if (metadata[i].find("conf.topic.") == 0) {
         std::vector<string> parts = str_util::Split(metadata[i], "=");
@@ -844,8 +852,20 @@ class KafkaGroupReadableResource : public ResourceBase {
           return errors::Internal("failed to do topic configuration:",
                                   metadata[i], "error:", errstr);
         }
-      } else if (metadata[i] != "" &&
-                 metadata[i].find("conf.") == string::npos) {
+        LOG(INFO) << "Kafka configuration: " << metadata[i];
+      }
+    }
+    if ((result = conf->set("default_topic_conf", conf_topic.get(), errstr)) !=
+        RdKafka::Conf::CONF_OK) {
+      return errors::Internal("failed to set default_topic_conf:", errstr);
+    }
+
+    // Once `default_topic_conf` is set, the global configurations can be
+    // applied without any risk of being overwritten.
+    // Setting the global configurations before `default_topic_conf`
+    // results in erratic behaviour.
+    for (size_t i = 0; i < metadata.size(); i++) {
+      if (metadata[i] != "" && metadata[i].find("conf.") == string::npos) {
         std::vector<string> parts = str_util::Split(metadata[i], "=");
         if (parts.size() != 2) {
           return errors::InvalidArgument("invalid topic configuration: ",
@@ -856,12 +876,8 @@ class KafkaGroupReadableResource : public ResourceBase {
           return errors::Internal("failed to do global configuration: ",
                                   metadata[i], "error:", errstr);
         }
+        LOG(INFO) << "Kafka configuration: " << metadata[i];
       }
-      LOG(INFO) << "Kafka configuration: " << metadata[i];
-    }
-    if ((result = conf->set("default_topic_conf", conf_topic.get(), errstr)) !=
-        RdKafka::Conf::CONF_OK) {
-      return errors::Internal("failed to set default_topic_conf:", errstr);
-    }
     }

     // default consumer.properties:
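To make the new ordering concrete, here is a minimal Python sketch (illustrative only, not tensorflow-io code) of how the kernel now partitions the `configuration` entries into two passes: topic-level entries first, global entries only after `default_topic_conf` has been set. The entry values are taken from the docstrings below; the helper dicts are hypothetical.

```python
# Sketch of the kernel's two-pass configuration split.
metadata = [
    "conf.topic.auto.offset.reset=earliest",  # topic configuration
    "session.timeout.ms=7000",  # global configuration
    "max.poll.interval.ms=8000",  # global configuration
]

# Pass 1: topic-level entries, applied to the topic conf object first.
topic_conf = dict(
    entry[len("conf.topic."):].split("=", 1)
    for entry in metadata
    if entry.startswith("conf.topic.")
)

# (The kernel sets "default_topic_conf" here, between the two passes.)

# Pass 2: global entries, mirroring the kernel's check that "conf."
# appears nowhere in the entry; applied only after default_topic_conf.
global_conf = dict(
    entry.split("=", 1)
    for entry in metadata
    if entry and "conf." not in entry
)

print(topic_conf)  # {'auto.offset.reset': 'earliest'}
print(global_conf)  # {'session.timeout.ms': '7000', 'max.poll.interval.ms': '8000'}
```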
diff --git a/tensorflow_io/core/python/experimental/kafka_batch_io_dataset_ops.py b/tensorflow_io/core/python/experimental/kafka_batch_io_dataset_ops.py
index a6d08d6d5..01bf3b31c 100644
--- a/tensorflow_io/core/python/experimental/kafka_batch_io_dataset_ops.py
+++ b/tensorflow_io/core/python/experimental/kafka_batch_io_dataset_ops.py
@@ -38,7 +38,12 @@ class KafkaBatchIODataset(tf.data.Dataset):
     >>> dataset = tfio.experimental.streaming.KafkaBatchIODataset(
                       topics=["topic1"],
                       group_id="cg",
-                      servers="localhost:9092"
+                      servers="localhost:9092",
+                      configuration=[
+                          "session.timeout.ms=7000",
+                          "max.poll.interval.ms=8000",
+                          "auto.offset.reset=earliest",
+                      ],
                   )

     >>> for mini_batch in dataset:
@@ -46,7 +51,11 @@
     ...     lambda m, k: (tf.cast(m, tf.float32), tf.cast(k, tf.float32)))

     Since `mini_batch` is of type `tf.data.Dataset` we can perform all the operations that it
-    inherits from `tf.data.Dataset`.
+    inherits from `tf.data.Dataset`. Also, the `auto.offset.reset` configuration is set to
+    `earliest` so that if the consumer group is newly created, it starts reading
+    the messages from the beginning. If it is not set, it defaults to `latest`.
+    For additional configurations, please refer to librdkafka's configuration docs:
+    https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

     To train a keras model on this stream of incoming data:
@@ -75,13 +84,13 @@ def __init__(
             group_id: The id of the consumer group. For example: cgstream
             servers: An optional list of bootstrap servers.
                 For example: `localhost:9092`.
-            stream_timeout: An optional timeout value (in milliseconds) to wait for 
+            stream_timeout: An optional timeout value (in milliseconds) to wait for
                 the new messages from kafka to be retrieved by the consumers.
                 By default it is set to -1 to block indefinitely.
             message_poll_timeout: An optional timeout duration (in milliseconds)
                 after which the kafka consumer throws a timeout error while fetching
                 a single message. This value also represents the intervals at which
-                the kafka topic(s) are polled for new messages while using the `stream_timeout`. 
+                the kafka topic(s) are polled for new messages while using the `stream_timeout`.
             configuration: An optional `tf.string` tensor containing
                 configurations in [Key=Value] format.
                 Global configuration: please refer to 'Global configuration properties'
@@ -89,8 +98,9 @@ def __init__(
                     ["enable.auto.commit=false", "heartbeat.interval.ms=2000"]
                 Topic configuration: please refer to 'Topic configuration properties'
                     in librdkafka doc. Note all topic configurations should be
-                    prefixed with `configuration.topic.`. Examples include
+                    prefixed with `conf.topic.`. Examples include
                     ["conf.topic.auto.offset.reset=earliest"]
+                Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
             internal: Whether the dataset is being created from within the named scope.
                 Default: True
         """
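As a usage note for the docstring change above, here is a hedged sketch of the keras training loop the docstring alludes to. The broker address, topic name, and float-castable payloads are assumptions; the map call mirrors the one in the class docstring.

```python
import tensorflow as tf
import tensorflow_io as tfio

# Assumed setup: broker on localhost:9092 and a "topic1" whose messages
# and keys can be cast to float32.
dataset = tfio.experimental.streaming.KafkaBatchIODataset(
    topics=["topic1"],
    group_id="cg",
    servers="localhost:9092",
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest",
    ],
)

model = tf.keras.Sequential(
    [tf.keras.layers.Input(shape=(1,)), tf.keras.layers.Dense(1)]
)
model.compile(optimizer="adam", loss="mse")

# Each `mini_batch` is itself a tf.data.Dataset, so it can be mapped and
# batched before being fed to fit() for incremental (online) training.
for mini_batch in dataset:
    mini_batch = mini_batch.map(
        lambda m, k: (tf.cast(m, tf.float32), tf.cast(k, tf.float32))
    ).batch(32)
    model.fit(mini_batch, epochs=1)
```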
diff --git a/tensorflow_io/core/python/experimental/kafka_group_io_dataset_ops.py b/tensorflow_io/core/python/experimental/kafka_group_io_dataset_ops.py
index 47a628b14..3ced1b3f9 100644
--- a/tensorflow_io/core/python/experimental/kafka_group_io_dataset_ops.py
+++ b/tensorflow_io/core/python/experimental/kafka_group_io_dataset_ops.py
@@ -63,9 +63,16 @@ class KafkaGroupIODataset(tf.data.Dataset):
             configuration=[
                 "session.timeout.ms=7000",
                 "max.poll.interval.ms=8000",
+                "auto.offset.reset=earliest",
             ],
         )

+    In the above example, the `auto.offset.reset` configuration is set to `earliest` so that
+    if the consumer group is newly created, it starts reading the messages from the
+    beginning. If it is not set, it defaults to `latest`. For additional configurations,
+    please refer to librdkafka's configuration docs:
+    https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
+
     In addition to the standard streaming functionality, there is added support for a timeout
     based stream. Once the existing data has been fetched, this dataset will block for an
     additional `stream_timeout` milliseconds, for the new messages to be captured.
@@ -78,6 +85,7 @@ class KafkaGroupIODataset(tf.data.Dataset):
             configuration=[
                 "session.timeout.ms=7000",
                 "max.poll.interval.ms=8000",
+                "auto.offset.reset=earliest",
             ],
         )
     >>> for (message, key) in dataset:
@@ -90,7 +98,7 @@ class KafkaGroupIODataset(tf.data.Dataset):
     As the kafka deployments vary in configuration as per various use-cases, the time
     required for the consumers to fetch a single message might also vary. This timeout
     value can be adjusted using the `message_poll_timeout` parameter.
-
+
     The `message_poll_timeout` value represents the duration which the consumers
     have to wait while fetching a new message. However, even if we receive a new
     message before the `message_poll_timeout` interval finishes, the consumer doesn't resume the
@@ -115,11 +123,11 @@ def __init__(
         """
         Args:
             topics: A `tf.string` tensor containing topic names in [topic] format.
-                For example: ["topic1"]
+                For example: ["topic1", "topic2"]
             group_id: The id of the consumer group. For example: cgstream
             servers: An optional list of bootstrap servers.
                 For example: `localhost:9092`.
-            stream_timeout: An optional timeout duration (in milliseconds) to block until 
+            stream_timeout: An optional timeout duration (in milliseconds) to block until
                 the new messages from kafka are fetched. By default it is set to 0 milliseconds
                 and doesn't block for new messages. To block indefinitely, set it to -1.
@@ -134,8 +142,9 @@ def __init__(
                     ["enable.auto.commit=false", "heartbeat.interval.ms=2000"]
                 Topic configuration: please refer to 'Topic configuration properties'
                     in librdkafka doc. Note all topic configurations should be
-                    prefixed with `configuration.topic.`. Examples include
+                    prefixed with `conf.topic.`. Examples include
                     ["conf.topic.auto.offset.reset=earliest"]
+                Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
             internal: Whether the dataset is being created from within the named scope.
                 Default: True
         """
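The relationship between `stream_timeout` and `message_poll_timeout` described in the docstring above can be summarized in a short sketch. The broker, topic, and consumer group are assumptions.

```python
import tensorflow_io as tfio

# With stream_timeout=30000 and message_poll_timeout=2000, once the
# existing messages are exhausted the consumer keeps polling the topic
# roughly every 2 seconds, for up to 30 seconds, before iteration ends.
dataset = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["topic1"],  # hypothetical topic
    group_id="cgpolldemo",  # hypothetical consumer group
    servers="localhost:9092",
    stream_timeout=30000,
    message_poll_timeout=2000,
    configuration=["auto.offset.reset=earliest"],
)

for message, key in dataset:
    print(message, key)
```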
diff --git a/tensorflow_io/core/python/ops/io_dataset.py b/tensorflow_io/core/python/ops/io_dataset.py
index e241436ea..84720994d 100644
--- a/tensorflow_io/core/python/ops/io_dataset.py
+++ b/tensorflow_io/core/python/ops/io_dataset.py
@@ -144,8 +144,9 @@ def from_kafka(
                 ["enable.auto.commit=false", "heartbeat.interval.ms=2000"]
             Topic configuration: please refer to 'Topic configuration properties'
                 in librdkafka doc. Note all topic configurations should be
-                prefixed with `configuration.topic.`. Examples include
+                prefixed with `conf.topic.`. Examples include
                 ["conf.topic.auto.offset.reset=earliest"]
+            Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
         name: A name prefix for the IODataset (optional).

     Returns:
@@ -361,8 +362,9 @@ def from_kafka(
                 ["enable.auto.commit=false", "heartbeat.interval.ms=2000"]
             Topic configuration: please refer to 'Topic configuration properties'
                 in librdkafka doc. Note all topic configurations should be
-                prefixed with `configuration.topic.`. Examples include
+                prefixed with `conf.topic.`. Examples include
                 ["conf.topic.auto.offset.reset=earliest"]
+            Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
         name: A name prefix for the IODataset (optional).

     Returns:
diff --git a/tensorflow_io/core/python/ops/kafka_dataset_ops.py b/tensorflow_io/core/python/ops/kafka_dataset_ops.py
index f66dfd589..e41e52c24 100644
--- a/tensorflow_io/core/python/ops/kafka_dataset_ops.py
+++ b/tensorflow_io/core/python/ops/kafka_dataset_ops.py
@@ -40,8 +40,9 @@ def __init__(
                 ["enable.auto.commit=false", "heartbeat.interval.ms=2000"]
             Topic configuration: please refer to 'Topic configuration properties'
                 in librdkafka doc. Note all topic configurations should be
-                prefixed with `configuration.topic.`. Examples include
+                prefixed with `conf.topic.`. Examples include
                 ["conf.topic.auto.offset.reset=earliest"]
+            Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
         internal: Whether the dataset is being created from within the named scope.
             Default: True
     """
@@ -106,8 +107,9 @@ def __init__(self, topic, partition, offset, servers, configuration, internal=Tr
                 ["enable.auto.commit=false", "heartbeat.interval.ms=2000"]
             Topic configuration: please refer to 'Topic configuration properties'
                 in librdkafka doc. Note all topic configurations should be
-                prefixed with `configuration.topic.`. Examples include
+                prefixed with `conf.topic.`. Examples include
                 ["conf.topic.auto.offset.reset=earliest"]
+            Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
         internal: Whether the dataset is being created from within the named scope.
             Default: True
     """
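Since all four docstrings above fix the same prefix, one hedged example of the corrected form with `tfio.IODataset.from_kafka` may help. The topic name and broker are assumptions, and the call signature is inferred from the docstrings, not verified against the implementation.

```python
import tensorflow_io as tfio

# A "test" topic on a local broker is assumed. The `conf.topic.` prefix
# (not `configuration.topic.`) routes an entry to librdkafka's
# topic-level configuration; everything else is treated as global.
dataset = tfio.IODataset.from_kafka(
    "test",
    configuration=[
        "enable.auto.commit=false",
        "conf.topic.auto.offset.reset=earliest",
    ],
)
for item in dataset:
    print(item)
```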
diff --git a/tests/test_kafka_eager.py b/tests/test_kafka_eager.py
index 2406e37ff..06e82b12a 100644
--- a/tests/test_kafka_eager.py
+++ b/tests/test_kafka_eager.py
@@ -209,7 +209,11 @@ def test_kafka_group_io_dataset_primary_cg():
         topics=["key-partition-test"],
         group_id="cgtestprimary",
         servers="localhost:9092",
-        configuration=["session.timeout.ms=7000", "max.poll.interval.ms=8000"],
+        configuration=[
+            "session.timeout.ms=7000",
+            "max.poll.interval.ms=8000",
+            "auto.offset.reset=earliest",
+        ],
     )
     assert np.all(
         sorted([k.numpy() for (k, _) in dataset])
@@ -238,7 +242,11 @@ def test_kafka_group_io_dataset_primary_cg_new_topic():
         topics=["key-test"],
         group_id="cgtestprimary",
         servers="localhost:9092",
-        configuration=["session.timeout.ms=7000", "max.poll.interval.ms=8000"],
+        configuration=[
+            "session.timeout.ms=7000",
+            "max.poll.interval.ms=8000",
+            "auto.offset.reset=earliest",
+        ],
     )
     assert np.all(
         sorted([k.numpy() for (k, _) in dataset])
@@ -302,7 +310,11 @@ def test_kafka_group_io_dataset_secondary_cg():
         topics=["key-partition-test"],
         group_id="cgtestsecondary",
         servers="localhost:9092",
-        configuration=["session.timeout.ms=7000", "max.poll.interval.ms=8000"],
+        configuration=[
+            "session.timeout.ms=7000",
+            "max.poll.interval.ms=8000",
+            "auto.offset.reset=earliest",
+        ],
     )
     assert np.all(
         sorted([k.numpy() for (k, _) in dataset])
@@ -319,7 +331,11 @@ def test_kafka_group_io_dataset_tertiary_cg_multiple_topics():
         topics=["key-partition-test", "key-test"],
         group_id="cgtesttertiary",
         servers="localhost:9092",
-        configuration=["session.timeout.ms=7000", "max.poll.interval.ms=8000"],
+        configuration=[
+            "session.timeout.ms=7000",
+            "max.poll.interval.ms=8000",
+            "auto.offset.reset=earliest",
+        ],
     )
     assert np.all(
         sorted([k.numpy() for (k, _) in dataset])
@@ -327,6 +343,65 @@ def test_kafka_group_io_dataset_tertiary_cg_multiple_topics():
     )


+def test_kafka_group_io_dataset_auto_offset_reset():
+    """Test the functionality of the `auto.offset.reset` configuration
+    at global and topic level"""
+
+    dataset = tfio.experimental.streaming.KafkaGroupIODataset(
+        topics=["key-partition-test"],
+        group_id="cgglobaloffsetearliest",
+        servers="localhost:9092",
+        configuration=[
+            "session.timeout.ms=7000",
+            "max.poll.interval.ms=8000",
+            "auto.offset.reset=earliest",
+        ],
+    )
+    assert np.all(
+        sorted([k.numpy() for (k, _) in dataset])
+        == sorted([("D" + str(i)).encode() for i in range(100)])
+    )
+
+    dataset = tfio.experimental.streaming.KafkaGroupIODataset(
+        topics=["key-partition-test"],
+        group_id="cgglobaloffsetlatest",
+        servers="localhost:9092",
+        configuration=[
+            "session.timeout.ms=7000",
+            "max.poll.interval.ms=8000",
+            "auto.offset.reset=latest",
+        ],
+    )
+    assert np.all(sorted([k.numpy() for (k, _) in dataset]) == [])
+
+    dataset = tfio.experimental.streaming.KafkaGroupIODataset(
+        topics=["key-partition-test"],
+        group_id="cgtopicoffsetearliest",
+        servers="localhost:9092",
+        configuration=[
+            "session.timeout.ms=7000",
+            "max.poll.interval.ms=8000",
+            "conf.topic.auto.offset.reset=earliest",
+        ],
+    )
+    assert np.all(
+        sorted([k.numpy() for (k, _) in dataset])
+        == sorted([("D" + str(i)).encode() for i in range(100)])
+    )
+
+    dataset = tfio.experimental.streaming.KafkaGroupIODataset(
+        topics=["key-partition-test"],
+        group_id="cgtopicoffsetlatest",
+        servers="localhost:9092",
+        configuration=[
+            "session.timeout.ms=7000",
+            "max.poll.interval.ms=8000",
+            "conf.topic.auto.offset.reset=latest",
+        ],
+    )
+    assert np.all(sorted([k.numpy() for (k, _) in dataset]) == [])
+
+
 def test_kafka_group_io_dataset_invalid_stream_timeout():
     """Test the functionality of the KafkaGroupIODataset when the consumer is
     configured to have an invalid stream_timeout value which is
@@ -370,7 +445,11 @@ def write_messages_background():
         group_id="cgteststreamvalid",
         servers="localhost:9092",
         stream_timeout=20000,
-        configuration=["session.timeout.ms=7000", "max.poll.interval.ms=8000"],
+        configuration=[
+            "session.timeout.ms=7000",
+            "max.poll.interval.ms=8000",
+            "auto.offset.reset=earliest",
+        ],
     )

     # start writing the new messages to kafka using the background job.
@@ -394,7 +473,7 @@ def test_kafka_batch_io_dataset():
     online-training fashion.

     NOTE: This kind of dataset is suitable in scenarios where the 'keys' of 'messages'
-    act as labels.
+    act as labels. If not, additional transformations are required.
     """

     dataset = tfio.experimental.streaming.KafkaBatchIODataset(
@@ -402,7 +481,11 @@ def test_kafka_batch_io_dataset():
         group_id="cgminibatch",
         servers=None,
         stream_timeout=5000,
-        configuration=["session.timeout.ms=7000", "max.poll.interval.ms=8000"],
+        configuration=[
+            "session.timeout.ms=7000",
+            "max.poll.interval.ms=8000",
+            "auto.offset.reset=earliest",
+        ],
     )

     NUM_COLUMNS = 1
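Finally, a standalone sketch of what the new test asserts, runnable outside pytest. The group ids are hypothetical, and it assumes the test fixtures' "key-partition-test" topic already holds messages on a local broker.

```python
import tensorflow_io as tfio

# A fresh consumer group with auto.offset.reset=earliest reads every
# existing message, because there is no committed offset yet ...
earliest = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["key-partition-test"],
    group_id="cg-demo-earliest",  # hypothetical group id
    servers="localhost:9092",
    configuration=["auto.offset.reset=earliest"],
)
print(sum(1 for _ in earliest))  # number of existing messages

# ... while an equally fresh group with auto.offset.reset=latest reads
# nothing: it only sees messages produced after it subscribes.
latest = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["key-partition-test"],
    group_id="cg-demo-latest",  # hypothetical group id
    servers="localhost:9092",
    configuration=["auto.offset.reset=latest"],
)
print(sum(1 for _ in latest))  # 0
```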