Skip to content

Commit

Permalink
Fix issue #536 for race condition between metadata update and subscri…
Browse files Browse the repository at this point in the history
…ption change
  • Loading branch information
tvoinarovskyi committed Aug 10, 2019
1 parent ef883b1 commit a711d40
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 3 deletions.
11 changes: 9 additions & 2 deletions aiokafka/consumer/group_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,15 @@ def assign_all_partitions(self, check_unknown=False):
partitions = []
for topic in self._subscription.subscription.topics:
p_ids = self._cluster.partitions_for_topic(topic)
if not p_ids and check_unknown:
raise Errors.UnknownTopicOrPartitionError()
if not p_ids:
if check_unknown:
raise Errors.UnknownTopicOrPartitionError()
else:
# We probably just changed subscription during metadata
# update. No problem, lets wait for the next metadata
# update and make sure it's triggered just in case
self._client.force_metadata_update()
continue
for p_id in p_ids:
partitions.append(TopicPartition(topic, p_id))

Expand Down
37 changes: 36 additions & 1 deletion tests/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from aiokafka.client import AIOKafkaClient
from aiokafka.structs import OffsetAndMetadata, TopicPartition
from aiokafka.consumer.group_coordinator import (
GroupCoordinator, CoordinatorGroupRebalance)
GroupCoordinator, CoordinatorGroupRebalance, NoGroupCoordinator)
from aiokafka.consumer.subscription_state import SubscriptionState
from aiokafka.util import create_future, ensure_future

Expand Down Expand Up @@ -1329,3 +1329,38 @@ async def do_rejoin(subsc):
self.assertEqual(autocommit_mock.call_count, 4)
self.assertEqual(metadata_mock.call_count, 1)
self.assertEqual(last_commit_mock.call_count, 1)

@run_until_complete
async def test_no_group_subscribe_during_metadata_update(self):
# Issue #536. During metadata update we can't assume the subscription
# did not change. We should handle the case by refreshing meta again.
client = AIOKafkaClient(
loop=self.loop, bootstrap_servers=self.hosts)
await client.bootstrap()
await self.wait_topic(client, 'topic1')
await self.wait_topic(client, 'topic2')
await client.set_topics(('other_topic', ))

subscription = SubscriptionState(loop=self.loop)
coordinator = NoGroupCoordinator(
client, subscription, loop=self.loop)
subscription.subscribe(topics=set(['topic1']))
client.set_topics(('topic1', ))
await asyncio.sleep(0.0001, loop=self.loop)

# Change subscription before metadata update is received
subscription.subscribe(topics=set(['topic2']))
metadata_fut = client.set_topics(('topic2', ))

try:
await asyncio.wait_for(
metadata_fut,
timeout=0.2
)
except asyncio.TimeoutError:
pass

self.assertFalse(client._sync_task.done())

await coordinator.close()
await client.close()

0 comments on commit a711d40

Please sign in to comment.