fix auto.offset.reset bug while consuming messages using KafkaGroupIODataset (#1134)

* fix auto.offset.reset bug while consuming messages

* update docstrings
kvignesh1420 authored Sep 26, 2020
1 parent 8791d8a commit 9e98604
Showing 6 changed files with 154 additions and 32 deletions.
40 changes: 28 additions & 12 deletions tensorflow_io/core/kernels/kafka_kernels.cc
@@ -783,17 +783,22 @@ class KafkaRebalanceCb : public RdKafka::RebalanceCb {
// RD_KAFKA_OFFSET_END -1
// RD_KAFKA_OFFSET_STORED -1000
// RD_KAFKA_OFFSET_INVALID -1001
if (partitions[partition]->offset() == -1001) {
LOG(INFO)
<< "The consumer group was newly created, reading from beginning";
partitions[partition]->set_offset(RdKafka::Topic::OFFSET_BEGINNING);
}

LOG(INFO) << "REBALANCE: " << partitions[partition]->topic() << "["
<< partitions[partition]->partition() << "], "
<< partitions[partition]->offset() << " "
<< partitions[partition]->err();
}
if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
// librdkafka does not look up the stored offsets before invoking the
// rebalance callback; at this point the partition offsets are set to
// RD_KAFKA_OFFSET_INVALID, which leaves room to substitute an external
// offset store. However, calling assign() with offset
// RD_KAFKA_OFFSET_INVALID makes librdkafka look up the committed offset
// on the broker. If no offset has been stored, it falls back to the
// `auto.offset.reset` configuration parameter.

LOG(INFO) << "REBALANCE: Assigning partitions";
consumer->assign(partitions);
partition_count = (int)partitions.size();
@@ -832,6 +837,9 @@ class KafkaGroupReadableResource : public ResourceBase {

string errstr;
RdKafka::Conf::ConfResult result = RdKafka::Conf::CONF_UNKNOWN;

// The kafka topic configurations (entries prefixed with `conf.topic.`)
// are applied first, before any of the global configurations are set.
for (size_t i = 0; i < metadata.size(); i++) {
if (metadata[i].find("conf.topic.") == 0) {
std::vector<string> parts = str_util::Split(metadata[i], "=");
@@ -844,8 +852,20 @@
return errors::Internal("failed to do topic configuration:",
metadata[i], "error:", errstr);
}
} else if (metadata[i] != "" &&
metadata[i].find("conf.") == string::npos) {
LOG(INFO) << "Kafka configuration: " << metadata[i];
}
}
if ((result = conf->set("default_topic_conf", conf_topic.get(), errstr)) !=
RdKafka::Conf::CONF_OK) {
return errors::Internal("failed to set default_topic_conf:", errstr);
}

// Once `default_topic_conf` is set, the global configurations can be
// applied without any risk of being overwritten. Setting the global
// configurations first results in erratic behaviour, because attaching
// the topic conf afterwards clobbers any overlapping values.
for (size_t i = 0; i < metadata.size(); i++) {
if (metadata[i] != "" && metadata[i].find("conf.") == string::npos) {
std::vector<string> parts = str_util::Split(metadata[i], "=");
if (parts.size() != 2) {
return errors::InvalidArgument("invalid topic configuration: ",
@@ -856,12 +876,8 @@
return errors::Internal("failed to do global configuration: ",
metadata[i], "error:", errstr);
}
LOG(INFO) << "Kafka configuration: " << metadata[i];
}
LOG(INFO) << "Kafka configuration: " << metadata[i];
}
if ((result = conf->set("default_topic_conf", conf_topic.get(), errstr)) !=
RdKafka::Conf::CONF_OK) {
return errors::Internal("failed to set default_topic_conf:", errstr);
}

// default consumer.properties:
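The net effect of these kernel changes: for a brand-new consumer group, the starting offset is no longer forced to the beginning of the topic. Instead, librdkafka looks up the committed offset on the broker and, finding none, falls back to `auto.offset.reset`. A minimal sketch of the resulting user-visible behaviour (the broker address, topic, and group ids below are hypothetical):

import tensorflow_io as tfio

# With "auto.offset.reset=earliest", a consumer group that has no
# committed offsets starts reading from the beginning of the topic.
dataset = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["topic1"],
    group_id="brand-new-cg",
    servers="localhost:9092",
    configuration=["auto.offset.reset=earliest"],
)

# Without the setting, librdkafka's default ("latest") applies, so a
# brand-new group only sees messages produced after it subscribes.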
@@ -38,15 +38,24 @@ class KafkaBatchIODataset(tf.data.Dataset):
>>> dataset = tfio.experimental.streaming.KafkaBatchIODataset(
topics=["topic1"],
group_id="cg",
servers="localhost:9092"
servers="localhost:9092",
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"auto.offset.reset=earliest",
],
)
>>> for mini_batch in dataset:
...     mini_batch = mini_batch.map(
...         lambda m, k: (tf.strings.to_number(m, tf.float32),
...                       tf.strings.to_number(k, tf.float32)))
Since `mini_batch` is of type `tf.data.Dataset` we can perform all the operations that it
inherits from `tf.data.Dataset`.
inherits from `tf.data.Dataset`. Also, the `auto.offset.reset` configuration is set to
`earliest` so that a newly created consumer group starts reading messages from the
beginning. If it is not set, it defaults to `latest`.
For additional configurations, please refer to librdkafka's configuration options:
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
To train a keras model on this stream of incoming data:
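A hedged sketch of such a training loop (assuming numeric text payloads and an already-compiled `tf.keras` model named `model`, neither of which is defined here):

for mini_batch in dataset:
    # messages and keys arrive as tf.string; parse the numeric text and
    # give each scalar message a feature dimension before batching.
    mini_batch = mini_batch.map(
        lambda m, k: (
            tf.expand_dims(tf.strings.to_number(m, tf.float32), -1),
            tf.strings.to_number(k, tf.float32),
        )
    ).batch(32)
    # each mini_batch is itself a tf.data.Dataset, so it can be passed
    # directly to Keras for one incremental round of training.
    model.fit(mini_batch, epochs=1)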
@@ -75,22 +84,23 @@ def __init__(
group_id: The id of the consumer group. For example: cgstream
servers: An optional list of bootstrap servers.
For example: `localhost:9092`.
stream_timeout: An optional timeout value (in milliseconds) to wait for
the new messages from kafka to be retrieved by the consumers.
By default it is set to -1 to block indefinitely.
message_poll_timeout: An optional timeout duration (in milliseconds)
after which the kafka consumer throws a timeout error while fetching
a single message. This value also represents the intervals at which
the kafka topic(s) are polled for new messages while using the `stream_timeout`.
configuration: An optional `tf.string` tensor containing
configurations in [Key=Value] format.
Global configuration: please refer to 'Global configuration properties'
in librdkafka doc. Examples include
["enable.auto.commit=false", "heartbeat.interval.ms=2000"]
Topic configuration: please refer to 'Topic configuration properties'
in librdkafka doc. Note all topic configurations should be
prefixed with `configuration.topic.`. Examples include
prefixed with `conf.topic.`. Examples include
["conf.topic.auto.offset.reset=earliest"]
Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
internal: Whether the dataset is being created from within the named scope.
Default: True
"""
@@ -63,9 +63,16 @@ class KafkaGroupIODataset(tf.data.Dataset):
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"auto.offset.reset=earliest",
],
)
In the above example, the `auto.offset.reset` configuration is set to `earliest` so that
a newly created consumer group starts reading messages from the beginning. If it is not
set, it defaults to `latest`. For additional configurations, please refer to librdkafka's
configuration options:
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
In addition to the standard streaming functionality, there is added support for a timeout
based stream. Once the existing data has been fetched, this dataset will block for
an additional `stream_timeout` milliseconds, for the new messages to be captured.
@@ -78,6 +85,7 @@ class KafkaGroupIODataset(tf.data.Dataset):
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"auto.offset.reset=earliest",
],
)
>>> for (message, key) in dataset:
@@ -90,7 +98,7 @@
As kafka deployments vary in configuration across use-cases, the time required for
the consumers to fetch a single message might also vary. This timeout value can be
adjusted using the `message_poll_timeout` parameter.
The `message_poll_timeout` value represents the duration for which the consumers
have to wait while fetching a new message. However, even if we receive a new message
before the `message_poll_timeout` interval finishes, the consumer doesn't resume the
@@ -115,11 +123,11 @@ def __init__(
"""
Args:
topics: A `tf.string` tensor containing topic names in [topic] format.
For example: ["topic1"]
For example: ["topic1", "topic2"]
group_id: The id of the consumer group. For example: cgstream
servers: An optional list of bootstrap servers.
For example: `localhost:9092`.
stream_timeout: An optional timeout duration (in milliseconds) to block until
the new messages from kafka are fetched.
By default it is set to 0 milliseconds and doesn't block for new messages.
To block indefinitely, set it to -1.
@@ -134,8 +142,9 @@ def __init__(
["enable.auto.commit=false", "heartbeat.interval.ms=2000"]
Topic configuration: please refer to 'Topic configuration properties'
in librdkafka doc. Note all topic configurations should be
prefixed with `configuration.topic.`. Examples include
prefixed with `conf.topic.`. Examples include
["conf.topic.auto.offset.reset=earliest"]
Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
internal: Whether the dataset is being created from within the named scope.
Default: True
"""
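A short sketch of how the two timeouts interact (illustrative values; the topic and group ids are hypothetical). The consumer polls the topic every `message_poll_timeout` milliseconds, and the dataset stays open for up to an additional `stream_timeout` milliseconds to capture newly arriving messages:

import tensorflow_io as tfio

dataset = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["topic1"],
    group_id="cgstream",
    servers="localhost:9092",
    stream_timeout=30000,       # wait up to 30s for new messages
    message_poll_timeout=2000,  # poll the topic every 2s while waiting
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest",
    ],
)
for message, key in dataset:
    pass  # the stream ends once the stream_timeout window is exhausted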
6 changes: 4 additions & 2 deletions tensorflow_io/core/python/ops/io_dataset.py
@@ -144,8 +144,9 @@ def from_kafka(
["enable.auto.commit=false", "heartbeat.interval.ms=2000"]
Topic configuration: please refer to 'Topic configuration properties'
in librdkafka doc. Note all topic configurations should be
prefixed with `configuration.topic.`. Examples include
prefixed with `conf.topic.`. Examples include
["conf.topic.auto.offset.reset=earliest"]
Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
name: A name prefix for the IODataset (optional).
Returns:
@@ -361,8 +362,9 @@ def from_kafka(
["enable.auto.commit=false", "heartbeat.interval.ms=2000"]
Topic configuration: please refer to 'Topic configuration properties'
in librdkafka doc. Note all topic configurations should be
prefixed with `configuration.topic.`. Examples include
prefixed with `conf.topic.`. Examples include
["conf.topic.auto.offset.reset=earliest"]
Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
name: A name prefix for the IODataset (optional).
Returns:
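In this format, the `conf.topic.` prefix is what routes an entry into the topic configuration, while global settings are passed bare. A hedged usage sketch (the topic name is hypothetical):

import tensorflow_io as tfio

dataset = tfio.IODataset.from_kafka(
    "topic1",
    configuration=[
        "enable.auto.commit=false",               # global configuration
        "conf.topic.auto.offset.reset=earliest",  # topic configuration
    ],
)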
6 changes: 4 additions & 2 deletions tensorflow_io/core/python/ops/kafka_dataset_ops.py
@@ -40,8 +40,9 @@ def __init__(
["enable.auto.commit=false", "heartbeat.interval.ms=2000"]
Topic configuration: please refer to 'Topic configuration properties'
in librdkafka doc. Note all topic configurations should be
prefixed with `configuration.topic.`. Examples include
prefixed with `conf.topic.`. Examples include
["conf.topic.auto.offset.reset=earliest"]
Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
internal: Whether the dataset is being created from within the named scope.
Default: True
"""
@@ -106,8 +107,9 @@ def __init__(self, topic, partition, offset, servers, configuration, internal=True):
["enable.auto.commit=false", "heartbeat.interval.ms=2000"]
Topic configuration: please refer to 'Topic configuration properties'
in librdkafka doc. Note all topic configurations should be
prefixed with `configuration.topic.`. Examples include
prefixed with `conf.topic.`. Examples include
["conf.topic.auto.offset.reset=earliest"]
Reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
internal: Whether the dataset is being created from within the named scope.
Default: True
"""
97 changes: 90 additions & 7 deletions tests/test_kafka_eager.py
@@ -209,7 +209,11 @@ def test_kafka_group_io_dataset_primary_cg():
topics=["key-partition-test"],
group_id="cgtestprimary",
servers="localhost:9092",
configuration=["session.timeout.ms=7000", "max.poll.interval.ms=8000"],
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"auto.offset.reset=earliest",
],
)
assert np.all(
sorted([k.numpy() for (k, _) in dataset])
@@ -238,7 +242,11 @@ def test_kafka_group_io_dataset_primary_cg_new_topic():
topics=["key-test"],
group_id="cgtestprimary",
servers="localhost:9092",
configuration=["session.timeout.ms=7000", "max.poll.interval.ms=8000"],
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"auto.offset.reset=earliest",
],
)
assert np.all(
sorted([k.numpy() for (k, _) in dataset])
@@ -302,7 +310,11 @@ def test_kafka_group_io_dataset_secondary_cg():
topics=["key-partition-test"],
group_id="cgtestsecondary",
servers="localhost:9092",
configuration=["session.timeout.ms=7000", "max.poll.interval.ms=8000"],
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"auto.offset.reset=earliest",
],
)
assert np.all(
sorted([k.numpy() for (k, _) in dataset])
@@ -319,14 +331,77 @@ def test_kafka_group_io_dataset_tertiary_cg_multiple_topics():
topics=["key-partition-test", "key-test"],
group_id="cgtesttertiary",
servers="localhost:9092",
configuration=["session.timeout.ms=7000", "max.poll.interval.ms=8000"],
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"auto.offset.reset=earliest",
],
)
assert np.all(
sorted([k.numpy() for (k, _) in dataset])
== sorted([("D" + str(i)).encode() for i in range(100)] * 2)
)


def test_kafka_group_io_dataset_auto_offset_reset():
"""Test the functionality of the `auto.offset.reset` configuration
at global and topic level"""

dataset = tfio.experimental.streaming.KafkaGroupIODataset(
topics=["key-partition-test"],
group_id="cgglobaloffsetearliest",
servers="localhost:9092",
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"auto.offset.reset=earliest",
],
)
assert np.all(
sorted([k.numpy() for (k, _) in dataset])
== sorted([("D" + str(i)).encode() for i in range(100)])
)

dataset = tfio.experimental.streaming.KafkaGroupIODataset(
topics=["key-partition-test"],
group_id="cgglobaloffsetlatest",
servers="localhost:9092",
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"auto.offset.reset=latest",
],
)
assert np.all(sorted([k.numpy() for (k, _) in dataset]) == [])

dataset = tfio.experimental.streaming.KafkaGroupIODataset(
topics=["key-partition-test"],
group_id="cgtopicoffsetearliest",
servers="localhost:9092",
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"conf.topic.auto.offset.reset=earliest",
],
)
assert np.all(
sorted([k.numpy() for (k, _) in dataset])
== sorted([("D" + str(i)).encode() for i in range(100)])
)

dataset = tfio.experimental.streaming.KafkaGroupIODataset(
topics=["key-partition-test"],
group_id="cgtopicoffsetlatest",
servers="localhost:9092",
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"conf.topic.auto.offset.reset=latest",
],
)
assert np.all(sorted([k.numpy() for (k, _) in dataset]) == [])


def test_kafka_group_io_dataset_invalid_stream_timeout():
"""Test the functionality of the KafkaGroupIODataset when the
consumer is configured to have an invalid stream_timeout value which is
@@ -370,7 +445,11 @@ def write_messages_background():
group_id="cgteststreamvalid",
servers="localhost:9092",
stream_timeout=20000,
configuration=["session.timeout.ms=7000", "max.poll.interval.ms=8000"],
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"auto.offset.reset=earliest",
],
)

# start writing the new messages to kafka using the background job.
@@ -394,15 +473,19 @@ def test_kafka_batch_io_dataset():
online-training fashion.
NOTE: This kind of dataset is suitable in scenarios where the 'keys' of 'messages'
act as labels.
act as labels. If not, additional transformations are required.
"""

dataset = tfio.experimental.streaming.KafkaBatchIODataset(
topics=["mini-batch-test"],
group_id="cgminibatch",
servers=None,
stream_timeout=5000,
configuration=["session.timeout.ms=7000", "max.poll.interval.ms=8000"],
configuration=[
"session.timeout.ms=7000",
"max.poll.interval.ms=8000",
"auto.offset.reset=earliest",
],
)

NUM_COLUMNS = 1
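When the keys cannot serve as labels, a preprocessing `map` can derive the features and labels from the message payload instead. A hedged sketch, assuming CSV-encoded messages whose last column is the label (the four-column layout and the compiled `model` are hypothetical):

import tensorflow as tf

def decode_csv(message, key):
    # assumed payload layout: "f1,f2,f3,label" as numeric text
    fields = tf.io.decode_csv(message, record_defaults=[[0.0]] * 4)
    return tf.stack(fields[:-1]), fields[-1]

for mini_batch in dataset:
    mini_batch = mini_batch.map(decode_csv).batch(32)
    model.fit(mini_batch, epochs=1)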
