Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Messages are no longer read after subscriptions change #536

Closed
rjemanuele opened this issue Jul 29, 2019 · 9 comments
Closed

Messages are no longer read after subscriptions change #536

rjemanuele opened this issue Jul 29, 2019 · 9 comments

Comments

@rjemanuele
Copy link

rjemanuele commented Jul 29, 2019

Greeting!

We just started using aiokakfa (0.5.2) for a new asyncio application after using pykafka previously. In my application I am changing topics frequently based on external inputs.

Pseudo code:

import os
import asyncio
from aiokafka import AIOKafkaConsumer

class MyChangingConsumer:
    def __init__(self):
        self.kafka_hosts = os.environ['KAFKA_HOSTS']
        self.kafka = None

    async def topic_updater(self):
        while True:
            # await get new topics from somewhere else
            self.update_topics_callback(topics)

    def update_topics_callback(self, topics):
        self.kafka.subscribe(topics=topics)
        
    def handle_message(self, msg):
        # i do something useful with the msg
        pass

    async def consume_from_kafka(self):
        await self.kafka.start()
        while True:
            msg = await self.kafka.getone()
            self.handle_message(msg)
 
    async def run(self):
        self.kafka = AIOKafkaConsumer(loop=asyncio.get_running_loop(), bootstrap_servers=self.kafka_hosts,
                                      enable_auto_commit=False, fetch_max_wait_ms=50)

        await asyncio.gather(self.topic_updater(), self.consume_from_kafka())

In some cases this can cause new topics to be created (possibly related to #486 ). After the second topic update which calls subscribe I stop receiving messages from all the topics, yet I am positive new messages are going into Kafka on those topics.

Is there something else I should be doing?

Thank you,

Rob

@rjemanuele
Copy link
Author

@tvoinarovskyi would you or anyone else have a chance to look at this? I'm happy to walk anyone through and spend the time to get it debugged.

@tvoinarovskyi
Copy link
Member

tvoinarovskyi commented Aug 2, 2019

Hi there, sorry, did not have much time to dig into this. Simple tests with a consumer that switches off between topics worked for me. My setup was:

from aiokafka import AIOKafkaConsumer
import asyncio
import random

loop = asyncio.get_event_loop()

consumer = None


async def consume():
    global consumer

    consumer = AIOKafkaConsumer(
        loop=loop, bootstrap_servers='localhost:9092',
        group_id="my_group"
    )
    # Get cluster layout and topic/partition allocation
    await consumer.start()
    try:
        async for msg in consumer:
            print(msg.value)
    finally:
        await consumer.stop()


async def topic_updater():
    while True:
        i = random.randint(1, 3)
        topic = "my_topic{}".format(i)
        if consumer is not None:
            print('subscribe', topic)
            consumer.subscribe([topic])
        await asyncio.sleep(10)

loop.run_until_complete(asyncio.gather(consume(), topic_updater()))
from aiokafka import AIOKafkaProducer
import asyncio

loop = asyncio.get_event_loop()

async def send_one():
    producer = AIOKafkaProducer(
        loop=loop, bootstrap_servers='localhost:9092',
        retry_backoff_ms=2000)
    # Get cluster layout and topic/partition allocation
    await producer.start()
    while True:
        try:
            # Produce messages
            res = await producer.send_and_wait("my_topic1", b"Super 1")
            print(res)
            res = await producer.send_and_wait("my_topic2", b"Super 2")
            print(res)
            res = await producer.send_and_wait("my_topic3", b"Super 3")
            print(res)
            await asyncio.sleep(1)
        except:
            await producer.stop()
            raise

loop.run_until_complete(send_one())

Could you check if this one works for you?

If you could provide a minimal example that does not work for you or some debug logs to get an idea of what does not work in your case. The difference between aiokafka and pykafka should be minimal in how protocol stuff works, both are based on the Java approach.

@tvoinarovskyi
Copy link
Member

tvoinarovskyi commented Aug 2, 2019

Just to confirm that the problem is not here. From your example

    async def topic_updater(self):
        while True:
            # await get new topics from somewhere else
            self.update_topics_callback(topics)

    def update_topics_callback(self, topics):
        self.kafka.subscribe(topics=topics)

You can't use blocking functions in async code, it will block other libraries that are executing, like aiokafka. In the example that would be a busy CPU loop, there must be an await in that function. I presume that was taken out just to demonstrate an example.

@rjemanuele
Copy link
Author

@tvoinarovskyi I'll try out your sample code; thank you for exploring this.

Yes, in my example there would be some code where the # await get new topics from somewhere else that does await on something else to receive those new topics.

@rjemanuele
Copy link
Author

rjemanuele commented Aug 6, 2019

@tvoinarovskyi So using your (slightly modified) code, I seem to have reproduced my issue or possibly #486 . No messages are received by the consumer.

So I started a new kafka service in Kubernetes using the Bitnami chart, https://bitnami.com/stack/kafka/helm . This basic chart starts a single kafka broker by default. I ran the consumer first, that way the topics are auto-created by the consumer. The consumer raun for about 10 seconds before I started the producer. I stopped them at about the same time. The exact code I used and output is below. Again, this is on aiokafka 0.5.2 and Kafka 2.2.0.

Thank you,

Rob

My consumer:

from aiokafka import AIOKafkaConsumer
import asyncio
import random

loop = asyncio.get_event_loop()

consumer = None


async def consume():
    global consumer

    consumer = AIOKafkaConsumer(
        loop=loop, bootstrap_servers='kafka.bus:9092',
    )
    # Get cluster layout and topic/partition allocation
    await consumer.start()
    try:
        async for msg in consumer:
            print(msg.value)
    finally:
        await consumer.stop()


async def topic_updater():
    while True:
        i = random.randint(1, 3)
        topic = "my_topic{}".format(i)
        if consumer is not None:
            print('subscribe', topic)
            consumer.subscribe([topic])
        await asyncio.sleep(10)

loop.run_until_complete(asyncio.gather(consume(), topic_updater()))

Consumer output:

root@aiokafka-test-local-mount:/python-mount# python consumer.py
subscribe my_topic1
Topic my_topic1 is not available during auto-create initialization
Topic my_topic1 is not available during auto-create initialization
Topic my_topic1 is not available during auto-create initialization
subscribe my_topic2
Topic my_topic2 is not available during auto-create initialization
subscribe my_topic2
subscribe my_topic3
subscribe my_topic2
subscribe my_topic2
subscribe my_topic1
subscribe my_topic2
subscribe my_topic1
subscribe my_topic1
subscribe my_topic3
subscribe my_topic1
subscribe my_topic1
subscribe my_topic2
subscribe my_topic1
subscribe my_topic1
subscribe my_topic3
subscribe my_topic2
subscribe my_topic3
subscribe my_topic1
subscribe my_topic1
subscribe my_topic1
subscribe my_topic3
subscribe my_topic1
^CTraceback (most recent call last):
  File "consumer.py", line 34, in <module>
    loop.run_until_complete(asyncio.gather(consume(), topic_updater()))
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 571, in run_until_complete
    self.run_forever()
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 539, in run_forever
    self._run_once()
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 1739, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/local/lib/python3.7/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt

My producer:

from aiokafka import AIOKafkaProducer
import asyncio

loop = asyncio.get_event_loop()

async def send_one():
    producer = AIOKafkaProducer(
        loop=loop, bootstrap_servers='kafka.bus:9092',
        retry_backoff_ms=2000)
    # Get cluster layout and topic/partition allocation
    await producer.start()
    while True:
        try:
            # Produce messages
            res = await producer.send_and_wait("my_topic1", b"Super 1")
            print(res)
            res = await producer.send_and_wait("my_topic2", b"Super 2")
            print(res)
            res = await producer.send_and_wait("my_topic3", b"Super 3")
            print(res)
            await asyncio.sleep(1)
        except:
            await producer.stop()
            raise

loop.run_until_complete(send_one())

My producer output:

root@aiokafka-test-local-mount:/python-mount# python producer.py
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=0, timestamp=1565052987758, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=0, timestamp=1565052987819, timestamp_type=0)
Topic my_topic3 is not available during auto-create initialization
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=0, timestamp=1565052989847, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=1, timestamp=1565052990859, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=1, timestamp=1565052990867, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=1, timestamp=1565052990872, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=2, timestamp=1565052991877, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=2, timestamp=1565052991881, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=2, timestamp=1565052991885, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=3, timestamp=1565052992890, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=3, timestamp=1565052992893, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=3, timestamp=1565052992898, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=4, timestamp=1565052993903, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=4, timestamp=1565052993907, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=4, timestamp=1565052993909, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=5, timestamp=1565052994913, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=5, timestamp=1565052994920, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=5, timestamp=1565052994927, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=6, timestamp=1565052995931, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=6, timestamp=1565052995935, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=6, timestamp=1565052995939, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=7, timestamp=1565052996942, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=7, timestamp=1565052996948, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=7, timestamp=1565052996951, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=8, timestamp=1565052997955, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=8, timestamp=1565052997960, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=8, timestamp=1565052997966, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=9, timestamp=1565052998971, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=9, timestamp=1565052998974, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=9, timestamp=1565052998977, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=10, timestamp=1565052999983, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=10, timestamp=1565052999988, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=10, timestamp=1565052999991, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=11, timestamp=1565053000997, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=11, timestamp=1565053001001, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=11, timestamp=1565053001004, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=12, timestamp=1565053002011, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=12, timestamp=1565053002015, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=12, timestamp=1565053002019, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=13, timestamp=1565053003024, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=13, timestamp=1565053003027, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=13, timestamp=1565053003034, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=14, timestamp=1565053004038, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=14, timestamp=1565053004041, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=14, timestamp=1565053004044, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=15, timestamp=1565053005052, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=15, timestamp=1565053005057, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=15, timestamp=1565053005062, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=16, timestamp=1565053006070, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=16, timestamp=1565053006072, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=16, timestamp=1565053006074, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=17, timestamp=1565053007080, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=17, timestamp=1565053007085, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=17, timestamp=1565053007088, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=18, timestamp=1565053008094, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=18, timestamp=1565053008099, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=18, timestamp=1565053008103, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=19, timestamp=1565053009109, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=19, timestamp=1565053009115, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=19, timestamp=1565053009118, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=20, timestamp=1565053010125, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=20, timestamp=1565053010130, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=20, timestamp=1565053010133, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=21, timestamp=1565053011137, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=21, timestamp=1565053011144, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=21, timestamp=1565053011148, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=22, timestamp=1565053012152, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=22, timestamp=1565053012156, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=22, timestamp=1565053012160, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=23, timestamp=1565053013166, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=23, timestamp=1565053013170, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=23, timestamp=1565053013177, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=24, timestamp=1565053014181, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=24, timestamp=1565053014184, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=24, timestamp=1565053014186, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=25, timestamp=1565053015191, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=25, timestamp=1565053015194, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=25, timestamp=1565053015198, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=26, timestamp=1565053016203, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=26, timestamp=1565053016210, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=26, timestamp=1565053016219, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=27, timestamp=1565053017224, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=27, timestamp=1565053017226, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=27, timestamp=1565053017230, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=28, timestamp=1565053018233, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=28, timestamp=1565053018237, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=28, timestamp=1565053018243, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=29, timestamp=1565053019247, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=29, timestamp=1565053019252, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=29, timestamp=1565053019256, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=30, timestamp=1565053020261, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=30, timestamp=1565053020264, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=30, timestamp=1565053020267, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=31, timestamp=1565053021272, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=31, timestamp=1565053021276, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=31, timestamp=1565053021279, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=32, timestamp=1565053022284, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=32, timestamp=1565053022288, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=32, timestamp=1565053022292, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=33, timestamp=1565053023300, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=33, timestamp=1565053023304, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=33, timestamp=1565053023307, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=34, timestamp=1565053024311, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=34, timestamp=1565053024316, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=34, timestamp=1565053024323, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=35, timestamp=1565053025332, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=35, timestamp=1565053025338, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=35, timestamp=1565053025344, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=36, timestamp=1565053026349, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=36, timestamp=1565053026354, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=36, timestamp=1565053026359, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=37, timestamp=1565053027365, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=37, timestamp=1565053027372, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=37, timestamp=1565053027375, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=38, timestamp=1565053028380, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=38, timestamp=1565053028382, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=38, timestamp=1565053028385, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=39, timestamp=1565053029388, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=39, timestamp=1565053029393, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=39, timestamp=1565053029402, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=40, timestamp=1565053030406, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=40, timestamp=1565053030409, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=40, timestamp=1565053030413, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=41, timestamp=1565053031417, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=41, timestamp=1565053031422, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=41, timestamp=1565053031429, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=42, timestamp=1565053032437, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=42, timestamp=1565053032440, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=42, timestamp=1565053032444, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=43, timestamp=1565053033451, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=43, timestamp=1565053033455, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=43, timestamp=1565053033459, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=44, timestamp=1565053034468, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=44, timestamp=1565053034473, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=44, timestamp=1565053034476, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=45, timestamp=1565053035480, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=45, timestamp=1565053035483, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=45, timestamp=1565053035489, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=46, timestamp=1565053036495, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=46, timestamp=1565053036500, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=46, timestamp=1565053036505, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=47, timestamp=1565053037510, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=47, timestamp=1565053037512, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=47, timestamp=1565053037514, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=48, timestamp=1565053038517, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=48, timestamp=1565053038520, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=48, timestamp=1565053038527, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=49, timestamp=1565053039533, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=49, timestamp=1565053039536, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=49, timestamp=1565053039542, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=50, timestamp=1565053040545, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=50, timestamp=1565053040549, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=50, timestamp=1565053040553, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=51, timestamp=1565053041557, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=51, timestamp=1565053041564, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=51, timestamp=1565053041569, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=52, timestamp=1565053042573, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=52, timestamp=1565053042579, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=52, timestamp=1565053042581, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=53, timestamp=1565053043587, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=53, timestamp=1565053043591, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=53, timestamp=1565053043595, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=54, timestamp=1565053044600, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=54, timestamp=1565053044603, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=54, timestamp=1565053044609, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=55, timestamp=1565053045616, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=55, timestamp=1565053045629, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=55, timestamp=1565053045639, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=56, timestamp=1565053046643, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=56, timestamp=1565053046646, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=56, timestamp=1565053046649, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=57, timestamp=1565053047654, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=57, timestamp=1565053047658, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=57, timestamp=1565053047663, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=58, timestamp=1565053048669, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=58, timestamp=1565053048671, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=58, timestamp=1565053048674, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=59, timestamp=1565053049678, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=59, timestamp=1565053049683, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=59, timestamp=1565053049692, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=60, timestamp=1565053050698, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=60, timestamp=1565053050701, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=60, timestamp=1565053050703, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=61, timestamp=1565053051709, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=61, timestamp=1565053051712, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=61, timestamp=1565053051715, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=62, timestamp=1565053052720, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=62, timestamp=1565053052724, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=62, timestamp=1565053052727, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=63, timestamp=1565053053733, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=63, timestamp=1565053053744, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=63, timestamp=1565053053751, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=64, timestamp=1565053054756, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=64, timestamp=1565053054759, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=64, timestamp=1565053054765, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=65, timestamp=1565053055768, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=65, timestamp=1565053055771, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=65, timestamp=1565053055775, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=66, timestamp=1565053056781, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=66, timestamp=1565053056786, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=66, timestamp=1565053056793, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=67, timestamp=1565053057797, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=67, timestamp=1565053057799, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=67, timestamp=1565053057805, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=68, timestamp=1565053058809, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=68, timestamp=1565053058812, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=68, timestamp=1565053058818, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=69, timestamp=1565053059825, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=69, timestamp=1565053059830, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=69, timestamp=1565053059833, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=70, timestamp=1565053060838, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=70, timestamp=1565053060843, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=70, timestamp=1565053060848, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=71, timestamp=1565053061853, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=71, timestamp=1565053061855, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=71, timestamp=1565053061862, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=72, timestamp=1565053062868, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=72, timestamp=1565053062872, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=72, timestamp=1565053062874, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=73, timestamp=1565053063878, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=73, timestamp=1565053063883, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=73, timestamp=1565053063887, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=74, timestamp=1565053064894, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=74, timestamp=1565053064898, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=74, timestamp=1565053064902, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=75, timestamp=1565053065905, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=75, timestamp=1565053065908, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=75, timestamp=1565053065913, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=76, timestamp=1565053066919, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=76, timestamp=1565053066921, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=76, timestamp=1565053066928, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=77, timestamp=1565053067937, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=77, timestamp=1565053067941, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=77, timestamp=1565053067944, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=78, timestamp=1565053068947, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=78, timestamp=1565053068951, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=78, timestamp=1565053068955, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=79, timestamp=1565053069960, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=79, timestamp=1565053069964, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=79, timestamp=1565053069967, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=80, timestamp=1565053070973, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=80, timestamp=1565053070980, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=80, timestamp=1565053070983, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=81, timestamp=1565053071988, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=81, timestamp=1565053071991, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=81, timestamp=1565053071994, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=82, timestamp=1565053072999, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=82, timestamp=1565053073008, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=82, timestamp=1565053073012, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=83, timestamp=1565053074021, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=83, timestamp=1565053074028, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=83, timestamp=1565053074033, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=84, timestamp=1565053075037, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=84, timestamp=1565053075042, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=84, timestamp=1565053075047, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=85, timestamp=1565053076058, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=85, timestamp=1565053076061, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=85, timestamp=1565053076063, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=86, timestamp=1565053077067, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=86, timestamp=1565053077070, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=86, timestamp=1565053077074, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=87, timestamp=1565053078078, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=87, timestamp=1565053078082, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=87, timestamp=1565053078089, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=88, timestamp=1565053079094, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=88, timestamp=1565053079099, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=88, timestamp=1565053079103, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=89, timestamp=1565053080107, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=89, timestamp=1565053080112, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=89, timestamp=1565053080114, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=90, timestamp=1565053081119, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=90, timestamp=1565053081127, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=90, timestamp=1565053081132, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=91, timestamp=1565053082138, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=91, timestamp=1565053082144, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=91, timestamp=1565053082146, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=92, timestamp=1565053083152, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=92, timestamp=1565053083157, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=92, timestamp=1565053083161, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=93, timestamp=1565053084167, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=93, timestamp=1565053084171, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=93, timestamp=1565053084173, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=94, timestamp=1565053085176, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=94, timestamp=1565053085180, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=94, timestamp=1565053085186, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=95, timestamp=1565053086191, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=95, timestamp=1565053086198, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=95, timestamp=1565053086201, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=96, timestamp=1565053087206, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=96, timestamp=1565053087209, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=96, timestamp=1565053087214, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=97, timestamp=1565053088217, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=97, timestamp=1565053088219, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=97, timestamp=1565053088228, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=98, timestamp=1565053089234, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=98, timestamp=1565053089240, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=98, timestamp=1565053089244, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=99, timestamp=1565053090249, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=99, timestamp=1565053090252, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=99, timestamp=1565053090255, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=100, timestamp=1565053091259, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=100, timestamp=1565053091262, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=100, timestamp=1565053091266, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=101, timestamp=1565053092271, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=101, timestamp=1565053092275, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=101, timestamp=1565053092278, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=102, timestamp=1565053093285, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=102, timestamp=1565053093291, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=102, timestamp=1565053093294, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=103, timestamp=1565053094299, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=103, timestamp=1565053094305, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=103, timestamp=1565053094310, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=104, timestamp=1565053095317, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=104, timestamp=1565053095323, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=104, timestamp=1565053095327, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=105, timestamp=1565053096332, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=105, timestamp=1565053096335, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=105, timestamp=1565053096338, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=106, timestamp=1565053097345, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=106, timestamp=1565053097350, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=106, timestamp=1565053097355, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=107, timestamp=1565053098362, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=107, timestamp=1565053098368, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=107, timestamp=1565053098373, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=108, timestamp=1565053099377, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=108, timestamp=1565053099384, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=108, timestamp=1565053099387, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=109, timestamp=1565053100392, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=109, timestamp=1565053100396, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=109, timestamp=1565053100400, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=110, timestamp=1565053101405, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=110, timestamp=1565053101411, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=110, timestamp=1565053101414, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=111, timestamp=1565053102417, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=111, timestamp=1565053102421, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=111, timestamp=1565053102427, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=112, timestamp=1565053103432, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=112, timestamp=1565053103436, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=112, timestamp=1565053103438, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=113, timestamp=1565053104443, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=113, timestamp=1565053104449, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=113, timestamp=1565053104452, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=114, timestamp=1565053105456, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=114, timestamp=1565053105459, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=114, timestamp=1565053105464, timestamp_type=0)
RecordMetadata(topic='my_topic1', partition=0, topic_partition=TopicPartition(topic='my_topic1', partition=0), offset=115, timestamp=1565053106468, timestamp_type=0)
RecordMetadata(topic='my_topic2', partition=0, topic_partition=TopicPartition(topic='my_topic2', partition=0), offset=115, timestamp=1565053106472, timestamp_type=0)
RecordMetadata(topic='my_topic3', partition=0, topic_partition=TopicPartition(topic='my_topic3', partition=0), offset=115, timestamp=1565053106475, timestamp_type=0)
^CTraceback (most recent call last):
  File "producer.py", line 26, in <module>
    loop.run_until_complete(send_one())
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 571, in run_until_complete
    self.run_forever()
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 539, in run_forever
    self._run_once()
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 1739, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/local/lib/python3.7/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt

@rjemanuele
Copy link
Author

@tvoinarovskyi So here's another datapoint.... I just ran the consumer this time. I modified it to use the logging module so I could enable debug logging. Notice how after the subscription changes, no fetches are made. This is after the first subscription caused a topic to be created.

root@aiokafka-test-local-mount:/python-mount# python consumer.py
2019-08-07 21:17:16,128 - DEBUG : aiokafka : Attempting to bootstrap via node at kafka.bus:9092
2019-08-07 21:17:16,128 - INFO : root : subscribed: my_topic2
2019-08-07 21:17:16,128 - INFO : aiokafka.consumer.subscription_state : Updating subscribed topics to: frozenset({'my_topic2'})
2019-08-07 21:17:16,128 - INFO : aiokafka.consumer.consumer : Subscribed to topic(s): {'my_topic2'}
2019-08-07 21:17:16,136 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka.bus port=9092> Request 1: MetadataRequest_v0(topics=[])
2019-08-07 21:17:16,148 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka.bus port=9092> Response 1: MetadataResponse_v0(brokers=[(node_id=1001, host='kafka-0.kafka-headless.bus.svc.cluster.local', port=9092)], topics=[])
2019-08-07 21:17:16,148 - DEBUG : aiokafka.cluster : Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 0, groups: 0)
2019-08-07 21:17:16,149 - DEBUG : aiokafka.conn : Closing connection at kafka.bus:9092
2019-08-07 21:17:16,149 - DEBUG : aiokafka : Received cluster metadata: ClusterMetadata(brokers: 1, topics: 0, groups: 0)
2019-08-07 21:17:16,149 - DEBUG : aiokafka : Initiating connection to node 1001 at kafka-0.kafka-headless.bus.svc.cluster.local:9092
2019-08-07 21:17:16,153 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 1: ApiVersionRequest_v0()
2019-08-07 21:17:16,158 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=7), (api_key=1, min_version=0, max_version=10), (api_key=2, min_version=0, max_version=5), (api_key=3, min_version=0, max_version=7), (api_key=4, min_version=0, max_version=2), (api_key=5, min_version=0, max_version=1), (api_key=6, min_version=0, max_version=5), (api_key=7, min_version=0, max_version=2), (api_key=8, min_version=0, max_version=6), (api_key=9, min_version=0, max_version=5), (api_key=10, min_version=0, max_version=2), (api_key=11, min_version=0, max_version=4), (api_key=12, min_version=0, max_version=2), (api_key=13, min_version=0, max_version=2), (api_key=14, min_version=0, max_version=2), (api_key=15, min_version=0, max_version=2), (api_key=16, min_version=0, max_version=2), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=2), (api_key=19, min_version=0, max_version=3), (api_key=20, min_version=0, max_version=3), (api_key=21, min_version=0, max_version=1), (api_key=22, min_version=0, max_version=1), (api_key=23, min_version=0, max_version=2), (api_key=24, min_version=0, max_version=1), (api_key=25, min_version=0, max_version=1), (api_key=26, min_version=0, max_version=1), (api_key=27, min_version=0, max_version=0), (api_key=28, min_version=0, max_version=2), (api_key=29, min_version=0, max_version=1), (api_key=30, min_version=0, max_version=1), (api_key=31, min_version=0, max_version=1), (api_key=32, min_version=0, max_version=2), (api_key=33, min_version=0, max_version=1), (api_key=34, min_version=0, max_version=1), (api_key=35, min_version=0, max_version=1), (api_key=36, min_version=0, max_version=1), (api_key=37, min_version=0, max_version=1), (api_key=38, min_version=0, max_version=1), (api_key=39, min_version=0, max_version=1), (api_key=40, min_version=0, max_version=1), (api_key=41, min_version=0, max_version=1), (api_key=42, min_version=0, max_version=1), (api_key=43, min_version=0, max_version=0)])
2019-08-07 21:17:16,159 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 2: MetadataRequest_v0(topics=[])
2019-08-07 21:17:16,161 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 2: MetadataResponse_v0(brokers=[(node_id=1001, host='kafka-0.kafka-headless.bus.svc.cluster.local', port=9092)], topics=[])
2019-08-07 21:17:16,161 - DEBUG : aiokafka.conn : Closing connection at kafka-0.kafka-headless.bus.svc.cluster.local:9092
2019-08-07 21:17:16,162 - DEBUG : aiokafka : Initiating connection to node 1001 at kafka-0.kafka-headless.bus.svc.cluster.local:9092
2019-08-07 21:17:16,164 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 1: ApiVersionRequest_v0()
2019-08-07 21:17:16,168 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=7), (api_key=1, min_version=0, max_version=10), (api_key=2, min_version=0, max_version=5), (api_key=3, min_version=0, max_version=7), (api_key=4, min_version=0, max_version=2), (api_key=5, min_version=0, max_version=1), (api_key=6, min_version=0, max_version=5), (api_key=7, min_version=0, max_version=2), (api_key=8, min_version=0, max_version=6), (api_key=9, min_version=0, max_version=5), (api_key=10, min_version=0, max_version=2), (api_key=11, min_version=0, max_version=4), (api_key=12, min_version=0, max_version=2), (api_key=13, min_version=0, max_version=2), (api_key=14, min_version=0, max_version=2), (api_key=15, min_version=0, max_version=2), (api_key=16, min_version=0, max_version=2), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=2), (api_key=19, min_version=0, max_version=3), (api_key=20, min_version=0, max_version=3), (api_key=21, min_version=0, max_version=1), (api_key=22, min_version=0, max_version=1), (api_key=23, min_version=0, max_version=2), (api_key=24, min_version=0, max_version=1), (api_key=25, min_version=0, max_version=1), (api_key=26, min_version=0, max_version=1), (api_key=27, min_version=0, max_version=0), (api_key=28, min_version=0, max_version=2), (api_key=29, min_version=0, max_version=1), (api_key=30, min_version=0, max_version=1), (api_key=31, min_version=0, max_version=1), (api_key=32, min_version=0, max_version=2), (api_key=33, min_version=0, max_version=1), (api_key=34, min_version=0, max_version=1), (api_key=35, min_version=0, max_version=1), (api_key=36, min_version=0, max_version=1), (api_key=37, min_version=0, max_version=1), (api_key=38, min_version=0, max_version=1), (api_key=39, min_version=0, max_version=1), (api_key=40, min_version=0, max_version=1), (api_key=41, min_version=0, max_version=1), (api_key=42, min_version=0, max_version=1), (api_key=43, min_version=0, max_version=0)])
2019-08-07 21:17:16,169 - DEBUG : aiokafka : Sending metadata request MetadataRequest_v1(topics=['my_topic2']) to node 1001
2019-08-07 21:17:16,169 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 2: MetadataRequest_v1(topics=['my_topic2'])
2019-08-07 21:17:16,222 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 2: MetadataResponse_v1(brokers=[(node_id=1001, host='kafka-0.kafka-headless.bus.svc.cluster.local', port=9092, rack=None)], controller_id=1001, topics=[(error_code=5, topic='my_topic2', is_internal=False, partitions=[])])
2019-08-07 21:17:16,223 - WARNING : aiokafka.cluster : Topic my_topic2 is not available during auto-create initialization
2019-08-07 21:17:16,223 - DEBUG : aiokafka.cluster : Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 0, groups: 0)
2019-08-07 21:17:16,324 - DEBUG : aiokafka : Sending metadata request MetadataRequest_v1(topics=['my_topic2']) to node 1001
2019-08-07 21:17:16,324 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 3: MetadataRequest_v1(topics=['my_topic2'])
2019-08-07 21:17:16,333 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 3: MetadataResponse_v1(brokers=[(node_id=1001, host='kafka-0.kafka-headless.bus.svc.cluster.local', port=9092, rack=None)], controller_id=1001, topics=[(error_code=5, topic='my_topic2', is_internal=False, partitions=[])])
2019-08-07 21:17:16,334 - WARNING : aiokafka.cluster : Topic my_topic2 is not available during auto-create initialization
2019-08-07 21:17:16,334 - DEBUG : aiokafka.cluster : Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 0, groups: 0)
2019-08-07 21:17:16,435 - DEBUG : aiokafka : Sending metadata request MetadataRequest_v1(topics=['my_topic2']) to node 1001
2019-08-07 21:17:16,436 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 4: MetadataRequest_v1(topics=['my_topic2'])
2019-08-07 21:17:16,446 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 4: MetadataResponse_v1(brokers=[(node_id=1001, host='kafka-0.kafka-headless.bus.svc.cluster.local', port=9092, rack=None)], controller_id=1001, topics=[(error_code=0, topic='my_topic2', is_internal=False, partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001])])])
2019-08-07 21:17:16,447 - DEBUG : aiokafka.cluster : Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
2019-08-07 21:17:16,448 - DEBUG : aiokafka.consumer.group_coordinator : Metadata for topic has changed from {} to {'my_topic2': 1}.
2019-08-07 21:17:16,449 - DEBUG : aiokafka.consumer.fetcher : Updating fetch positions for partitions [TopicPartition(topic='my_topic2', partition=0)]
2019-08-07 21:17:16,449 - DEBUG : aiokafka : Sending metadata request MetadataRequest_v1(topics=['my_topic2']) to node 1001
2019-08-07 21:17:16,450 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 5: MetadataRequest_v1(topics=['my_topic2'])
2019-08-07 21:17:16,451 - DEBUG : aiokafka.consumer.fetcher : No committed offset found for TopicPartition(topic='my_topic2', partition=0)
2019-08-07 21:17:16,451 - DEBUG : aiokafka.consumer.fetcher : Resetting offset for partition TopicPartition(topic='my_topic2', partition=0) using latest strategy.
2019-08-07 21:17:16,452 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 6: OffsetRequest_v1(replica_id=-1, topics=[(topic='my_topic2', partitions=[(partition=0, timestamp=-1)])])
2019-08-07 21:17:16,453 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 5: MetadataResponse_v1(brokers=[(node_id=1001, host='kafka-0.kafka-headless.bus.svc.cluster.local', port=9092, rack=None)], controller_id=1001, topics=[(error_code=0, topic='my_topic2', is_internal=False, partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001])])])
2019-08-07 21:17:16,454 - DEBUG : aiokafka.cluster : Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
2019-08-07 21:17:16,467 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 6: OffsetResponse_v1(topics=[(topic='my_topic2', partitions=[(partition=0, error_code=0, timestamp=-1, offset=0)])])
2019-08-07 21:17:16,468 - DEBUG : aiokafka.consumer.fetcher : Handling ListOffsetResponse response for TopicPartition(topic='my_topic2', partition=0). Fetched offset 0, timestamp -1
2019-08-07 21:17:16,468 - DEBUG : aiokafka.consumer.fetcher : Adding fetch request for partition TopicPartition(topic='my_topic2', partition=0) at offset 0
2019-08-07 21:17:16,469 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 7: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='my_topic2', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
2019-08-07 21:17:17,024 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 7: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='my_topic2', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
2019-08-07 21:17:17,025 - DEBUG : aiokafka.consumer.fetcher : Adding fetch request for partition TopicPartition(topic='my_topic2', partition=0) at offset 0
2019-08-07 21:17:17,026 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 8: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='my_topic2', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
2019-08-07 21:17:17,531 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 8: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='my_topic2', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
2019-08-07 21:17:17,532 - DEBUG : aiokafka.consumer.fetcher : Adding fetch request for partition TopicPartition(topic='my_topic2', partition=0) at offset 0
2019-08-07 21:17:17,532 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 9: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='my_topic2', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
2019-08-07 21:17:18,037 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 9: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='my_topic2', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
2019-08-07 21:17:18,037 - DEBUG : aiokafka.consumer.fetcher : Adding fetch request for partition TopicPartition(topic='my_topic2', partition=0) at offset 0
2019-08-07 21:17:18,038 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 10: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='my_topic2', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
2019-08-07 21:17:18,543 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 10: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='my_topic2', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
2019-08-07 21:17:18,544 - DEBUG : aiokafka.consumer.fetcher : Adding fetch request for partition TopicPartition(topic='my_topic2', partition=0) at offset 0
2019-08-07 21:17:18,545 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 11: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='my_topic2', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
2019-08-07 21:17:19,050 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 11: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='my_topic2', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
2019-08-07 21:17:19,050 - DEBUG : aiokafka.consumer.fetcher : Adding fetch request for partition TopicPartition(topic='my_topic2', partition=0) at offset 0
2019-08-07 21:17:19,051 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 12: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='my_topic2', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
2019-08-07 21:17:19,555 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 12: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='my_topic2', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
2019-08-07 21:17:19,556 - DEBUG : aiokafka.consumer.fetcher : Adding fetch request for partition TopicPartition(topic='my_topic2', partition=0) at offset 0
2019-08-07 21:17:19,557 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 13: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='my_topic2', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
2019-08-07 21:17:20,061 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 13: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='my_topic2', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
2019-08-07 21:17:20,062 - DEBUG : aiokafka.consumer.fetcher : Adding fetch request for partition TopicPartition(topic='my_topic2', partition=0) at offset 0
2019-08-07 21:17:20,063 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 14: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='my_topic2', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
2019-08-07 21:17:20,567 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 14: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='my_topic2', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
2019-08-07 21:17:20,568 - DEBUG : aiokafka.consumer.fetcher : Adding fetch request for partition TopicPartition(topic='my_topic2', partition=0) at offset 0
2019-08-07 21:17:20,569 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 15: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='my_topic2', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
2019-08-07 21:17:21,072 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 15: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='my_topic2', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
2019-08-07 21:17:21,073 - DEBUG : aiokafka.consumer.fetcher : Adding fetch request for partition TopicPartition(topic='my_topic2', partition=0) at offset 0
2019-08-07 21:17:21,073 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 16: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='my_topic2', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
2019-08-07 21:17:21,576 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 16: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='my_topic2', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
2019-08-07 21:17:21,577 - DEBUG : aiokafka.consumer.fetcher : Adding fetch request for partition TopicPartition(topic='my_topic2', partition=0) at offset 0
2019-08-07 21:17:21,578 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 17: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='my_topic2', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
2019-08-07 21:17:22,081 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 17: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='my_topic2', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
2019-08-07 21:17:22,082 - DEBUG : aiokafka.consumer.fetcher : Adding fetch request for partition TopicPartition(topic='my_topic2', partition=0) at offset 0
2019-08-07 21:17:22,083 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 18: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='my_topic2', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
2019-08-07 21:17:22,586 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 18: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='my_topic2', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
2019-08-07 21:17:22,587 - DEBUG : aiokafka.consumer.fetcher : Adding fetch request for partition TopicPartition(topic='my_topic2', partition=0) at offset 0
2019-08-07 21:17:22,588 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 19: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='my_topic2', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
2019-08-07 21:17:23,093 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 19: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='my_topic2', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
2019-08-07 21:17:23,093 - DEBUG : aiokafka.consumer.fetcher : Adding fetch request for partition TopicPartition(topic='my_topic2', partition=0) at offset 0
2019-08-07 21:17:23,094 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 20: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='my_topic2', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
2019-08-07 21:17:23,599 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 20: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='my_topic2', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
2019-08-07 21:17:23,600 - DEBUG : aiokafka.consumer.fetcher : Adding fetch request for partition TopicPartition(topic='my_topic2', partition=0) at offset 0
2019-08-07 21:17:23,601 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 21: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='my_topic2', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
2019-08-07 21:17:24,104 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 21: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='my_topic2', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
2019-08-07 21:17:24,105 - DEBUG : aiokafka.consumer.fetcher : Adding fetch request for partition TopicPartition(topic='my_topic2', partition=0) at offset 0
2019-08-07 21:17:24,106 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 22: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='my_topic2', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
2019-08-07 21:17:24,610 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 22: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='my_topic2', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
2019-08-07 21:17:24,611 - DEBUG : aiokafka.consumer.fetcher : Adding fetch request for partition TopicPartition(topic='my_topic2', partition=0) at offset 0
2019-08-07 21:17:24,612 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 23: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='my_topic2', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
2019-08-07 21:17:25,117 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 23: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='my_topic2', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
2019-08-07 21:17:25,118 - DEBUG : aiokafka.consumer.fetcher : Adding fetch request for partition TopicPartition(topic='my_topic2', partition=0) at offset 0
2019-08-07 21:17:25,118 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 24: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='my_topic2', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
2019-08-07 21:17:25,624 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 24: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='my_topic2', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')])])
2019-08-07 21:17:25,626 - DEBUG : aiokafka.consumer.fetcher : Adding fetch request for partition TopicPartition(topic='my_topic2', partition=0) at offset 0
2019-08-07 21:17:25,627 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 25: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='my_topic2', partitions=[(partition=0, offset=0, max_bytes=1048576)])])
2019-08-07 21:17:26,129 - INFO : root : subscribed: my_topic1
2019-08-07 21:17:26,129 - INFO : aiokafka.consumer.subscription_state : Updating subscribed topics to: frozenset({'my_topic1'})
2019-08-07 21:17:26,130 - INFO : aiokafka.consumer.consumer : Subscribed to topic(s): {'my_topic1'}
2019-08-07 21:17:26,131 - DEBUG : aiokafka : Sending metadata request MetadataRequest_v1(topics=['my_topic1']) to node 1001
2019-08-07 21:17:26,131 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Request 26: MetadataRequest_v1(topics=['my_topic1'])
2019-08-07 21:17:26,166 - DEBUG : aiokafka.conn : <AIOKafkaConnection host=kafka-0.kafka-headless.bus.svc.cluster.local port=9092> Response 26: MetadataResponse_v1(brokers=[(node_id=1001, host='kafka-0.kafka-headless.bus.svc.cluster.local', port=9092, rack=None)], controller_id=1001, topics=[(error_code=5, topic='my_topic1', is_internal=False, partitions=[])])
2019-08-07 21:17:26,167 - WARNING : aiokafka.cluster : Topic my_topic1 is not available during auto-create initialization
2019-08-07 21:17:26,167 - DEBUG : aiokafka.cluster : Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 0, groups: 0)
2019-08-07 21:17:26,168 - DEBUG : aiokafka.consumer.group_coordinator : Metadata for topic has changed from {'my_topic2': 1} to {'my_topic1': 0}.
2019-08-07 21:17:36,142 - INFO : root : subscribed: my_topic2
2019-08-07 21:17:36,142 - INFO : aiokafka.consumer.subscription_state : Updating subscribed topics to: frozenset({'my_topic2'})
2019-08-07 21:17:36,143 - INFO : aiokafka.consumer.consumer : Subscribed to topic(s): {'my_topic2'}
2019-08-07 21:17:46,145 - INFO : root : subscribed: my_topic2
2019-08-07 21:17:46,145 - INFO : aiokafka.consumer.subscription_state : Updating subscribed topics to: frozenset({'my_topic2'})
2019-08-07 21:17:46,145 - INFO : aiokafka.consumer.consumer : Subscribed to topic(s): {'my_topic2'}
^CTraceback (most recent call last):
  File "consumer.py", line 37, in <module>
    loop.run_until_complete(asyncio.gather(consume(), topic_updater()))
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 571, in run_until_complete
    self.run_forever()
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 539, in run_forever
    self._run_once()
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 1739, in _run_once
    event_list = self._selector.select(timeout)
  File "/usr/local/lib/python3.7/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt

@rjemanuele
Copy link
Author

I can also reproduce this with master. I created a branch with a lot of logging added along the failure path.

It comes down to the group coordinator raising an exception here: https://github.com/rjemanuele/aiokafka/blob/debugging_metadata_not_updating/aiokafka/consumer/group_coordinator.py#L114

p_ids is None on that line of code after the second call to the consumer's subscribe. That causes the unhandled exception which stops the task AIOKafkaClient._md_synchronizer.

The first call to subscribe succeeds which I think is due to the freshly initiated group_coordinator. After the second call to subscribe there are stale partitions in the coordinator.

I'm looking at options for correcting it.

@tvoinarovskyi
Copy link
Member

@rjemanuele Could you try the changeset in #539. Should fix the problem. Thanks for debugging this!

rjemanuele pushed a commit to rjemanuele/aiokafka that referenced this issue Aug 11, 2019
tvoinarovskyi added a commit that referenced this issue Apr 23, 2020
* Fix issue #536 for race condition between metadata update and subscription change

* Fix test

* Add some debug prints

* Rework the patch to properly reassign partitions

* Fix lint
@tvoinarovskyi
Copy link
Member

Should be fixed with release 0.6.0. Feel free to ask if you still have issues.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants