Messages are no longer read after subscriptions change #536
@tvoinarovskyi would you or anyone else have a chance to look at this? I'm happy to walk anyone through it and spend the time to get it debugged.
Hi there, sorry, I did not have much time to dig into this. Simple tests with a consumer that switches between topics worked for me. My setup was:

    from aiokafka import AIOKafkaConsumer
    import asyncio
    import random

    loop = asyncio.get_event_loop()
    consumer = None

    async def consume():
        global consumer
        consumer = AIOKafkaConsumer(
            loop=loop, bootstrap_servers='localhost:9092',
            group_id="my_group"
        )
        # Get cluster layout and topic/partition allocation
        await consumer.start()
        try:
            async for msg in consumer:
                print(msg.value)
        finally:
            await consumer.stop()

    async def topic_updater():
        while True:
            i = random.randint(1, 3)
            topic = "my_topic{}".format(i)
            if consumer is not None:
                print('subscribe', topic)
                consumer.subscribe([topic])
            await asyncio.sleep(10)

    loop.run_until_complete(asyncio.gather(consume(), topic_updater()))

    from aiokafka import AIOKafkaProducer
    import asyncio

    loop = asyncio.get_event_loop()

    async def send_one():
        producer = AIOKafkaProducer(
            loop=loop, bootstrap_servers='localhost:9092',
            retry_backoff_ms=2000)
        # Get cluster layout and topic/partition allocation
        await producer.start()
        while True:
            try:
                # Produce messages
                res = await producer.send_and_wait("my_topic1", b"Super 1")
                print(res)
                res = await producer.send_and_wait("my_topic2", b"Super 2")
                print(res)
                res = await producer.send_and_wait("my_topic3", b"Super 3")
                print(res)
                await asyncio.sleep(1)
            except:
                await producer.stop()
                raise

    loop.run_until_complete(send_one())

Could you check if this one works for you? If it does, could you provide a minimal example that does not work for you, or some debug logs, to give an idea of what goes wrong in your case? The difference between aiokafka and pykafka should be minimal in how the protocol works; both are based on the Java approach.
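For the debug logs requested above, a minimal sketch using only the standard-library `logging` module might look like the following. It assumes aiokafka's loggers live under the `aiokafka` name (the usual `logging.getLogger(__name__)` convention); the helper name `enable_kafka_debug_logging` is made up for illustration and is not part of any library.

```python
import logging


def enable_kafka_debug_logging(logger_name: str = "aiokafka") -> logging.Logger:
    """Turn on DEBUG output for one logger hierarchy.

    The default name "aiokafka" is an assumption about how the
    library names its loggers; adjust it if your output stays quiet.
    """
    # Root handler that prints timestamps and logger names to stderr.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )
    logger = logging.getLogger(logger_name)
    # Child loggers (e.g. "aiokafka.consumer") inherit this level.
    logger.setLevel(logging.DEBUG)
    return logger


if __name__ == "__main__":
    enable_kafka_debug_logging()
    # ... then start the consumer as usual and capture stderr.
```

Calling the helper once at the top of the consumer script, before the consumer is started, should be enough to capture the rebalance and metadata messages around each subscription change.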
Just to confirm that the problem is not here. From your example
You can't use blocking functions in async code; they block everything else running on the event loop, including aiokafka. In the example that would be a busy CPU loop, so there must be an await in that function. I presume it was taken out just to keep the example short.
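To illustrate the point about blocking calls, here is a small self-contained sketch that needs no Kafka at all. The `heartbeat` coroutine is a hypothetical stand-in for a library's background work; all function names are invented for this example.

```python
import asyncio
import time


async def heartbeat(ticks: list) -> None:
    """Stand-in for a library's background task (e.g. a client's network I/O)."""
    for _ in range(5):
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def blocking_work() -> None:
    # time.sleep() never yields to the event loop, so no other task can run.
    time.sleep(0.1)


async def cooperative_work() -> None:
    # asyncio.sleep() suspends this coroutine, so other tasks keep running.
    await asyncio.sleep(0.2)


async def count_ticks_during(work) -> int:
    """Run `work` next to the heartbeat; return how many ticks happened meanwhile."""
    ticks: list = []
    task = asyncio.ensure_future(heartbeat(ticks))
    await work()
    seen = len(ticks)
    await task  # let the heartbeat finish before the loop closes
    return seen


if __name__ == "__main__":
    print("ticks during blocking work:   ", asyncio.run(count_ticks_during(blocking_work)))
    print("ticks during cooperative work:", asyncio.run(count_ticks_during(cooperative_work)))
```

The heartbeat makes no progress while the blocking variant runs, which is the same starvation a Kafka client's background tasks would suffer. If blocking work is unavoidable, handing it to an executor with `loop.run_in_executor` is one common way to keep the loop responsive.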
@tvoinarovskyi I'll try out your sample code; thank you for exploring this. Yes, in my example there would be some code where the |
@tvoinarovskyi So using your (slightly modified) code, I seem to have reproduced my issue, or possibly #486. No messages are received by the consumer.

I started a new Kafka service in Kubernetes using the Bitnami chart, https://bitnami.com/stack/kafka/helm . This basic chart starts a single Kafka broker by default. I ran the consumer first, so that the topics are auto-created by the consumer. The consumer ran for about 10 seconds before I started the producer. I stopped them at about the same time.

The exact code I used and its output are below. Again, this is on aiokafka 0.5.2 and Kafka 2.2.0.

Thank you,
Rob

My consumer:
Consumer output:
My producer:
My producer output:
@tvoinarovskyi So here's another datapoint.... I just ran the consumer this time. I modified it to use the
I can also reproduce this with master. I created a branch with a lot of logging added along the failure path. It comes down to the group coordinator raising an exception here: https://github.com/rjemanuele/aiokafka/blob/debugging_metadata_not_updating/aiokafka/consumer/group_coordinator.py#L114
The first call to I'm looking at options for correcting it. |
@rjemanuele Could you try the changeset in #539? It should fix the problem. Thanks for debugging this!
… subscription change
This should be fixed in release 0.6.0. Feel free to ask if you still have issues.
Greetings!
We just started using aiokafka (0.5.2) for a new asyncio application after previously using pykafka. In my application I change topics frequently based on external inputs.
Pseudo code:
In some cases this can cause new topics to be created (possibly related to #486). After the second topic update, which calls subscribe, I stop receiving messages from all the topics, yet I am positive new messages are going into Kafka on those topics. Is there something else I should be doing?
Thank you,
Rob