Topics do not become available after autocreation #486

Closed
Ablu opened this issue Mar 7, 2019 · 8 comments

Comments

@Ablu

Ablu commented Mar 7, 2019

Hi,

I have a server which is configured to auto-create topics.

The client code looks as follows:

            topic_list = ("admin", "ns", "vim_account", "wim_account", "sdn", "nsi")
            self.consumer = AIOKafkaConsumer(loop=loop, bootstrap_servers=self.broker, group_id=group_id,
                                             metadata_max_age_ms=10_000)
            await self.consumer.start()
            self.consumer.subscribe(topic_list)
            async for message in self.consumer:
                # handle message

On a pristine Kafka instance, the first launch of the client never actually receives any messages. It looks like the newly auto-created topics are never picked up. Either restarting the client or making sure the topics already exist when the client starts resolves the issue.

I tried decreasing metadata_max_age_ms, but the logs do not indicate that a metadata update actually happens once the maximum age has passed.

Log of the client:

DEBUG:aiokafka:Attempting to bootstrap via node at 172.17.0.4:9092
DEBUG:aiokafka.conn:<AIOKafkaConnection host=172.17.0.4 port=9092> Response 1: MetadataResponse_v0(brokers=[(node_id=1001, host='172.17.0.4', port=9092)], topics=[(error_code=0, topic='users', partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic='ns', partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic='__consumer_offsets', partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=10, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=20, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=40, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=30, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=9, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=11, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=31, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=39, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=13, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=18, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=22, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=8, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=32, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=43, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=29, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=34, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=1, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=6, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=41, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=27, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=48, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=5, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=15, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=35, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=25, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=46, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=26, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=36, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=44, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=16, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=37, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=17, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=45, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=3, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=24, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=38, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=33, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=23, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=28, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=2, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=12, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=19, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, 
partition=14, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=4, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=47, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=49, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=42, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=7, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=21, leader=1001, replicas=[1001], isr=[1001])])])
DEBUG:kafka.cluster:Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 3, groups: 0)
DEBUG:aiokafka.conn:Closing connection at 172.17.0.4:9092
DEBUG:aiokafka:Received cluster metadata: ClusterMetadata(brokers: 1, topics: 3, groups: 0)
DEBUG:aiokafka:Initiating connection to node 1001 at 172.17.0.4:9092
DEBUG:aiokafka.conn:<AIOKafkaConnection host=172.17.0.4 port=9092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=7), (api_key=1, min_version=0, max_version=10), (api_key=2, min_version=0, max_version=4), (api_key=3, min_version=0, max_version=7), (api_key=4, min_version=0, max_version=1), (api_key=5, min_version=0, max_version=0), (api_key=6, min_version=0, max_version=4), (api_key=7, min_version=0, max_version=1), (api_key=8, min_version=0, max_version=6), (api_key=9, min_version=0, max_version=5), (api_key=10, min_version=0, max_version=2), (api_key=11, min_version=0, max_version=3), (api_key=12, min_version=0, max_version=2), (api_key=13, min_version=0, max_version=2), (api_key=14, min_version=0, max_version=2), (api_key=15, min_version=0, max_version=2), (api_key=16, min_version=0, max_version=2), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=2), (api_key=19, min_version=0, max_version=3), (api_key=20, min_version=0, max_version=3), (api_key=21, min_version=0, max_version=1), (api_key=22, min_version=0, max_version=1), (api_key=23, min_version=0, max_version=2), (api_key=24, min_version=0, max_version=1), (api_key=25, min_version=0, max_version=1), (api_key=26, min_version=0, max_version=1), (api_key=27, min_version=0, max_version=0), (api_key=28, min_version=0, max_version=2), (api_key=29, min_version=0, max_version=1), (api_key=30, min_version=0, max_version=1), (api_key=31, min_version=0, max_version=1), (api_key=32, min_version=0, max_version=2), (api_key=33, min_version=0, max_version=1), (api_key=34, min_version=0, max_version=1), (api_key=35, min_version=0, max_version=1), (api_key=36, min_version=0, max_version=0), (api_key=37, min_version=0, max_version=1), (api_key=38, min_version=0, max_version=1), (api_key=39, min_version=0, max_version=1), (api_key=40, min_version=0, max_version=1), (api_key=41, min_version=0, max_version=1), (api_key=42, min_version=0, max_version=1)])
DEBUG:aiokafka.conn:<AIOKafkaConnection host=172.17.0.4 port=9092> Response 2: MetadataResponse_v0(brokers=[(node_id=1001, host='172.17.0.4', port=9092)], topics=[(error_code=0, topic='users', partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic='ns', partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001])]), (error_code=0, topic='__consumer_offsets', partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=10, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=20, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=40, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=30, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=9, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=11, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=31, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=39, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=13, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=18, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=22, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=8, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=32, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=43, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=29, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=34, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=1, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=6, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=41, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=27, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=48, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=5, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=15, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=35, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=25, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=46, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=26, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=36, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=44, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=16, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=37, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=17, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=45, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=3, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=24, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=38, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=33, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=23, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=28, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=2, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=12, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=19, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, 
partition=14, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=4, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=47, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=49, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=42, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=7, leader=1001, replicas=[1001], isr=[1001]), (error_code=0, partition=21, leader=1001, replicas=[1001], isr=[1001])])])
DEBUG:aiokafka.conn:Closing connection at 172.17.0.4:9092
INFO:aiokafka.consumer.subscription_state:Updating subscribed topics to: frozenset({'nsi', 'wim_account', 'sdn', 'vim_account', 'admin', 'ns'})
INFO:aiokafka.consumer.consumer:Subscribed to topic(s): {'sdn', 'vim_account', 'admin', 'nsi', 'wim_account', 'ns'}
DEBUG:aiokafka:Initiating connection to node 1001 at 172.17.0.4:9092
DEBUG:aiokafka:Sending metadata request MetadataRequest_v1(topics=['nsi', 'wim_account', 'sdn', 'vim_account', 'admin', 'ns']) to node 1001
DEBUG:aiokafka.conn:<AIOKafkaConnection host=172.17.0.4 port=9092> Response 1: MetadataResponse_v1(brokers=[(node_id=1001, host='172.17.0.4', port=9092, rack=None)], controller_id=1001, topics=[(error_code=0, topic='ns', is_internal=False, partitions=[(error_code=0, partition=0, leader=1001, replicas=[1001], isr=[1001])]), (error_code=5, topic='sdn', is_internal=False, partitions=[]), (error_code=5, topic='vim_account', is_internal=False, partitions=[]), (error_code=5, topic='admin', is_internal=False, partitions=[]), (error_code=5, topic='nsi', is_internal=False, partitions=[]), (error_code=5, topic='wim_account', is_internal=False, partitions=[])])
WARNING:kafka.cluster:Topic sdn is not available during auto-create initialization
WARNING:kafka.cluster:Topic vim_account is not available during auto-create initialization
WARNING:kafka.cluster:Topic admin is not available during auto-create initialization
WARNING:kafka.cluster:Topic nsi is not available during auto-create initialization
WARNING:kafka.cluster:Topic wim_account is not available during auto-create initialization
DEBUG:kafka.cluster:Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
DEBUG:aiokafka.consumer.group_coordinator:Metadata for topic has changed from {} to {'nsi': 0, 'wim_account': 0, 'sdn': 0, 'vim_account': 0, 'admin': 0, 'ns': 1}. 

The client was left running for ~10 minutes.

@tvoinarovskyi
Member

tvoinarovskyi commented Mar 9, 2019

Hmm, seems ok from the code. Maybe it's an actual bug; I will need to try the scenario. As an option, could you try passing the topics to the constructor as arguments? If you pass them that way, start() will actually block and wait for them.
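
A minimal sketch of that workaround, assuming a local broker at localhost:9092 and reusing the topic list from the snippet above (broker address and group id are illustrative, not from the original report):

import asyncio

from aiokafka import AIOKafkaConsumer


async def consume():
    # Passing the topics directly to the constructor makes start() wait
    # until their metadata is available (auto-creating them if the broker
    # is configured to do so), per the suggestion above.
    consumer = AIOKafkaConsumer(
        "admin", "ns", "vim_account", "wim_account", "sdn", "nsi",
        bootstrap_servers="localhost:9092",
        group_id="test_group",
        metadata_max_age_ms=10000)
    await consumer.start()
    try:
        async for message in consumer:
            print(message.topic, message.value)
    finally:
        await consumer.stop()


loop = asyncio.get_event_loop()
loop.run_until_complete(consume())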

@tvoinarovskyi
Member

I tried to reproduce the example, but failed. On both master and 0.5.0 with broker 2.1.0 the script worked as expected and created all topics. I used this script:

import asyncio

from aiokafka import AIOKafkaConsumer

import logging

log_level = logging.DEBUG
log_format = '[%(asctime)s] %(levelname)s [%(name)s]: %(message)s'
logging.basicConfig(level=logging.INFO, format=log_format)
log = logging.getLogger('kafka')
log.setLevel(log_level)

loop = asyncio.get_event_loop()


async def consume():
    topic_list = ("admin", "ns", "vim_account", "wim_account", "sdn", "nsi")
    consumer = AIOKafkaConsumer(
        loop=loop, bootstrap_servers="localhost:9092",
        group_id="test_group",
        metadata_max_age_ms=10000)
    await consumer.start()
    consumer.subscribe(topic_list)
    try:
        async for msg in consumer:
            print(msg.value)
    finally:
        await consumer.stop()

loop.run_until_complete(consume())

Can you provide more input on which versions of aiokafka and the broker you are using?

@Ablu
Author

Ablu commented Mar 10, 2019

Can you provide more input on which versions of aiokafka and the broker you are using?

I tested with 0.4.3 and 0.5.0 of aiokafka. Kafka itself is Docker wurstmeister/kafka:latest together with wurstmeister/zookeeper:latest.

I will try the workaround of passing the topics to the constructor, as well as your script, as soon as I get the time! Thanks for the hint.

@Ablu
Author

Ablu commented Mar 22, 2019

I just did a quick test passing the topics in the constructor, but without success...

Unfortunately, I currently have no time for further analysis.

@vedranf

vedranf commented Sep 16, 2019

Hello,

I think I hit the same problem. Kafka and aiokafka are the latest releases; the code is the Transactional Consume-Process-Produce example and the send_one producer function, both from the docs, and Kafka is configured to auto-create topics (single Kafka instance, single partition, single replica). When I run the producer, it outputs:

"Topic c2 is not available during auto-create initialization"

Running bin/kafka-consumer-groups.sh --describe --all-groups shows nothing. However, on the second run there is no warning, the topics are there, and messages are passed to consumers. But the initial messages never reach the consumers: CURRENT-OFFSET is empty (a dash), while LOG-END-OFFSET shows some number, so in the end I always see some lag on out_topic (per the Transactional Consume-Process-Produce example). Pre-creating topics resolves the problem; however, given that the topics are dynamic, checking that they exist before pushing every message isn't very efficient.
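
For reference, the send_one producer pattern from the docs looks roughly like this; the topic name c2 matches the warning above, and the broker address is an assumption:

import asyncio

from aiokafka import AIOKafkaProducer


async def send_one():
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await producer.start()
    try:
        # On a pristine broker this send triggers auto-creation of "c2";
        # the "not available during auto-create initialization" warning is
        # logged while the topic's metadata is still missing.
        await producer.send_and_wait("c2", b"some message")
    finally:
        await producer.stop()


loop = asyncio.get_event_loop()
loop.run_until_complete(send_one())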

Thanks,
Vedran

@vedranf

vedranf commented Sep 17, 2019

This is the log when this happens:

[2019-09-17 19:43:29,207] INFO Creating topic c7 with configuration {} and initial partition assignment Map(0 -> ArrayBuffer(0)) (kafka.zk.AdminZkClient)
[2019-09-17 19:43:29,208] INFO Got user-level KeeperException when processing sessionid:0x10053d2a1aa0006 type:setData cxid:0x338 zxid:0x559 txntype:-1 reqpath:n/a Error Path:/config/topics/c7 Error:KeeperErrorCode = NoNode for /config/topics/c7 (org.apache.zookeeper.server.PrepRequestProcessor)
[2019-09-17 19:43:29,224] INFO [KafkaApi-0] Auto creation of topic c7 with 1 partitions and replication factor 1 is successful (kafka.server.KafkaApis)
[2019-09-17 19:43:29,236] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(c7-0) (kafka.server.ReplicaFetcherManager)
[2019-09-17 19:43:29,243] INFO [Log partition=c7-0, dir=/tmp/kafka-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2019-09-17 19:43:29,243] INFO [Log partition=c7-0, dir=/tmp/kafka-logs] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 2 ms (kafka.log.Log)
[2019-09-17 19:43:29,244] INFO Created log for partition c7-0 in /tmp/kafka-logs with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 1073741824, flush.messages -> 9223372036854775807, message.format.version -> 2.3-IV1, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2019-09-17 19:43:29,244] INFO [Partition c7-0 broker=0] No checkpointed highwatermark is found for partition c7-0 (kafka.cluster.Partition)
[2019-09-17 19:43:29,244] INFO Replica loaded for partition c7-0 with initial high watermark 0 (kafka.cluster.Replica)
[2019-09-17 19:43:29,245] INFO [Partition c7-0 broker=0] c7-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)

@tvoinarovskyi
Member

Should be fixed with release 0.6.0.

@francoroy

@tvoinarovskyi Hi, any news here? I didn't find any mention of this issue in the release notes of version 0.6.0.

I have the same issue when using a pattern consumer and waiting in a poll loop. I am using aiokafka 0.7.0 and Kafka 2.2.0.

The test flow is:

  1. produce messages to test_1_topic
  2. create a consumer for pattern test_.*_topic
  3. assert on the consumption of all messages from test_1_topic - SUCCESS
  4. produce messages to test_2_topic
  5. assert on the consumption of all messages from test_2_topic (in a poll loop. poll timeout=1s. sleep 3s. poll again. attempts = 10) - FAILED.

From aiokafka.cluster I get:
WARNING aiokafka.cluster:cluster.py:88 Topic test_2_topic is not available during auto-create initialization

I've added this test because we want to initialize a consumer that consumes messages by pattern in an environment where topics are dynamically created.
From dpkp/kafka-python#1088 I also understand that this can occur if a topic is deleted.
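
For context, a rough sketch of the pattern-subscription poll loop used in the test; the broker address and timings are illustrative assumptions (the group id matches the logs below):

import asyncio

from aiokafka import AIOKafkaConsumer


async def consume_by_pattern():
    consumer = AIOKafkaConsumer(
        bootstrap_servers="localhost:9092",
        group_id="test1",
        auto_offset_reset="earliest",
        metadata_max_age_ms=10000)
    await consumer.start()
    # Subscribe by regex, expecting topics created later to be picked up
    # once a metadata refresh sees them.
    consumer.subscribe(pattern="test_.*_topic")
    try:
        for _ in range(10):  # 10 poll attempts
            batch = await consumer.getmany(timeout_ms=1000)
            for tp, messages in batch.items():
                for msg in messages:
                    print(tp.topic, msg.value)
            await asyncio.sleep(3)
    finally:
        await consumer.stop()


loop = asyncio.get_event_loop()
loop.run_until_complete(consume_by_pattern())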

Some more logs:

DEBUG    aiokafka.consumer.group_coordinator:group_coordinator.py:1120 Fetching committed offsets for partitions: [TopicPartition(topic='test_1_topic', partition=0)]
DEBUG    aiokafka.consumer.group_coordinator:group_coordinator.py:1159 No committed offset for partition TopicPartition(topic='test_1_topic', partition=0)
DEBUG    aiokafka.consumer.group_coordinator:group_coordinator.py:994 Sending offset-commit request with {TopicPartition(topic='test_1_topic', partition=0): OffsetAndMetadata(offset=20, metadata='')} for group test1 to 0
DEBUG    aiokafka.consumer.group_coordinator:group_coordinator.py:1007 Committed offset OffsetAndMetadata(offset=20, metadata='') for partition TopicPartition(topic='test_1_topic', partition=0)
WARNING  aiokafka.cluster:cluster.py:88 Topic test_2_topic is not available during auto-create initialization
DEBUG    aiokafka.cluster:cluster.py:112 Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
DEBUG    aiokafka.cluster:cluster.py:112 Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 2, groups: 0)
DEBUG    aiokafka.producer.producer:producer.py:495 Sending batch to TopicPartition(topic='test_2_topic', partition=0)
