Topics do not become available after autocreation #486
Comments
Hmm, seems OK from the code. Maybe it's an actual bug; I will need to try the scenario. As an option, could you try passing the topics to the constructor as arguments? It will actually block and wait for them on start() if you pass them that way. |
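For reference, the constructor-based variant suggested above could look roughly like the following. This is a minimal sketch, not a confirmed fix: the broker address `localhost:9092` is an assumption, the topic names and group id are the ones used elsewhere in this thread, and the `loop` argument is left out because newer aiokafka releases no longer need it.

```python
import asyncio

# Topic names and group id are the ones quoted in this thread;
# the broker address is an assumption for a local test setup.
TOPICS = ("admin", "ns", "vim_account", "wim_account", "sdn", "nsi")


async def consume():
    # Imported here so the module can be loaded even where aiokafka is absent.
    from aiokafka import AIOKafkaConsumer

    # Topics are passed positionally to the constructor instead of
    # calling consumer.subscribe() after start().
    consumer = AIOKafkaConsumer(
        *TOPICS,
        bootstrap_servers="localhost:9092",
        group_id="test_group",
        metadata_max_age_ms=10000,
    )
    await consumer.start()
    try:
        async for msg in consumer:
            print(msg.topic, msg.value)
    finally:
        # Always leave the group cleanly, even on errors or cancellation.
        await consumer.stop()
```

With a broker running, this would be started with `asyncio.run(consume())`.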
Tried to reproduce the example, but failed: on both master and 0.5.0 with Broker 2.1.0 the script worked as expected and created all topics. Used the script:

import asyncio
from aiokafka import AIOKafkaConsumer
import logging

log_level = logging.DEBUG
log_format = '[%(asctime)s] %(levelname)s [%(name)s]: %(message)s'
logging.basicConfig(level=logging.INFO, format=log_format)
log = logging.getLogger('kafka')
log.setLevel(log_level)

loop = asyncio.get_event_loop()

async def consume():
    topic_list = ("admin", "ns", "vim_account", "wim_account", "sdn", "nsi")
    consumer = AIOKafkaConsumer(
        loop=loop, bootstrap_servers="localhost:9092",
        group_id="test_group",
        metadata_max_age_ms=10000)
    await consumer.start()
    consumer.subscribe(topic_list)
    try:
        async for msg in consumer:
            print(msg.value)
    finally:
        await consumer.stop()

loop.run_until_complete(consume())

Can you provide more input on which versions of aiokafka and the broker you are using? |
I tested with I will try the workaround of passing the values in the constructor, and your script, as soon as I get the time! Thanks for the hint already. |
I just did a quick test with setting the value in the constructor but without success... Unfortunately, I currently have no time for further analysis. |
Hello, I think I hit the same problem. Kafka and aiokafka are the latest releases. The code is the Transactional Consume-Process-Produce example and the producer is the send_one function, both examples from the docs. Kafka is configured to autocreate topics (single Kafka instance, single partition, single replica). When I run the producer, it outputs: "Topic c2 is not available during auto-create initialization" Running: bin/kafka-consumer-groups.sh --describe --all-groups Thanks, |
This is the log when this happens:
|
Should be fixed with release 0.6.0. |
@tvoinarovskyi Hi, any news here? I didn't find any mention of this issue in the release notes of version 0.6.0. I have the same issue when using a pattern consumer and waiting in a poll loop. I am using the test flow is:
from aiokafka.cluster I get I've added this test because we want to initialize a consumer which would consume messages by pattern in an environment where topics are dynamically created. Some more logs:
|
Hi,
I got a server which is configured to auto-create topics.
The client code looks as follows:
The first launch of the client now never actually receives any messages if the Kafka instance is pristine. It looks like the newly generated groups are never picked up. Either restarting the client or configuring Kafka to automatically create the topic on start resolves the issue.
An attempt was made to decrease metadata_max_age_ms; however, the logs do not indicate that an update of the metadata actually happens after the maximum age has passed. Log of the client:
The client was left running for ~10 minutes.
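One way to narrow down whether the topics ever become visible to the client is to poll for them explicitly before consuming. The helper below is a hypothetical sketch, not part of aiokafka: `wait_for_topics`, its parameters, and the `fetch_topics` callable are names introduced here for illustration. With aiokafka one might pass `consumer.topics` as `fetch_topics`, assuming that coroutine returns the set of topic names currently known to the cluster.

```python
import asyncio
import time


async def wait_for_topics(fetch_topics, wanted, timeout=30.0, interval=1.0):
    """Poll until every topic in `wanted` is reported by `fetch_topics`.

    `fetch_topics` is any zero-argument coroutine function returning an
    iterable of topic names (hypothetical hook; supply your own).
    Raises TimeoutError if topics are still missing after `timeout` seconds.
    """
    wanted = set(wanted)
    deadline = time.monotonic() + timeout
    while True:
        available = set(await fetch_topics())
        missing = wanted - available
        if not missing:
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"topics still missing: {sorted(missing)}")
        # Back off before asking for the topic list again.
        await asyncio.sleep(interval)
```

If the helper times out even though the broker lists the topic, that would point at the client's metadata refresh rather than at topic creation itself.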