Skip to content

Commit

Permalink
Fix issue #536 (#539)
Browse files Browse the repository at this point in the history
* Fix issue #536 for race condition between metadata update and subscription change

* Fix test

* Add some debug prints

* Rework the patch to properly reassign partitions

* Fix lint
  • Loading branch information
tvoinarovskyi authored Apr 23, 2020
1 parent 34b7ada commit 8d6ed13
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 3 deletions.
9 changes: 9 additions & 0 deletions aiokafka/consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,15 @@ def subscribe(self, topics=(), pattern=None, listener=None):
self._subscription.subscribe(
topics=topics, listener=listener)
self._client.set_topics(self._subscription.subscription.topics)
if self._group_id is None:
# We have reset the assignment, but client.set_topics will
# not always do a metadata update. We force it to do it even
# if metadata did not change. This will trigger a reassignment
# on NoGroupCoordinator, but only if snapshot did not change,
# thus we reset it too.
self._client.force_metadata_update()
if self._coordinator is not None:
self._coordinator._metadata_snapshot = {}
log.info("Subscribed to topic(s): %s", topics)

def subscription(self):
Expand Down
15 changes: 13 additions & 2 deletions aiokafka/consumer/group_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,14 @@ def assign_all_partitions(self, check_unknown=False):
partitions = []
for topic in self._subscription.subscription.topics:
p_ids = self._cluster.partitions_for_topic(topic)
if not p_ids and check_unknown:
raise Errors.UnknownTopicOrPartitionError()
if not p_ids:
if check_unknown:
raise Errors.UnknownTopicOrPartitionError()
else:
# We probably just changed subscription during metadata
# update. No problem, lets wait for the next metadata
# update
continue
for p_id in p_ids:
partitions.append(TopicPartition(topic, p_id))

Expand All @@ -123,6 +129,7 @@ async def _reset_committed_routine(self):

assignment = self._subscription.subscription.assignment
if assignment is None:

await self._subscription.wait_for_assignment()
continue

Expand All @@ -142,6 +149,10 @@ async def _reset_committed_routine(self):
return_when=asyncio.FIRST_COMPLETED,
loop=self._loop)

if not event_waiter.done():
event_waiter.cancel()
event_waiter = None

except asyncio.CancelledError:
pass

Expand Down
37 changes: 36 additions & 1 deletion tests/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from aiokafka.client import AIOKafkaClient
from aiokafka.structs import OffsetAndMetadata, TopicPartition
from aiokafka.consumer.group_coordinator import (
GroupCoordinator, CoordinatorGroupRebalance)
GroupCoordinator, CoordinatorGroupRebalance, NoGroupCoordinator)
from aiokafka.consumer.subscription_state import SubscriptionState
from aiokafka.util import create_future, ensure_future

Expand Down Expand Up @@ -1343,3 +1343,38 @@ async def do_rejoin(subsc):
self.assertEqual(autocommit_mock.call_count, 4)
self.assertEqual(metadata_mock.call_count, 1)
self.assertEqual(last_commit_mock.call_count, 1)

@run_until_complete
async def test_no_group_subscribe_during_metadata_update(self):
# Issue #536. During metadata update we can't assume the subscription
# did not change. We should handle the case by refreshing meta again.
client = AIOKafkaClient(
loop=self.loop, bootstrap_servers=self.hosts)
await client.bootstrap()
await self.wait_topic(client, 'topic1')
await self.wait_topic(client, 'topic2')
await client.set_topics(('other_topic', ))

subscription = SubscriptionState(loop=self.loop)
coordinator = NoGroupCoordinator(
client, subscription, loop=self.loop)
subscription.subscribe(topics=set(['topic1']))
client.set_topics(('topic1', ))
await asyncio.sleep(0.0001, loop=self.loop)

# Change subscription before metadata update is received
subscription.subscribe(topics=set(['topic2']))
metadata_fut = client.set_topics(('topic2', ))

try:
await asyncio.wait_for(
metadata_fut,
timeout=0.2
)
except asyncio.TimeoutError:
pass

self.assertFalse(client._sync_task.done())

await coordinator.close()
await client.close()

0 comments on commit 8d6ed13

Please sign in to comment.