From 24dfe29e0214061306fafdd27bee4a751af2ae35 Mon Sep 17 00:00:00 2001 From: fabregas Date: Tue, 13 Dec 2016 15:15:44 +0200 Subject: [PATCH] fixed heartbeat fails if there are no consumed topics --- aiokafka/group_coordinator.py | 3 +++ tests/test_consumer.py | 13 +++++++++++++ 2 files changed, 16 insertions(+) diff --git a/aiokafka/group_coordinator.py b/aiokafka/group_coordinator.py index e6167307..df7b86bf 100644 --- a/aiokafka/group_coordinator.py +++ b/aiokafka/group_coordinator.py @@ -787,6 +787,9 @@ def _heartbeat_task_routine(self): while True: yield from asyncio.sleep(sleep_time, loop=self.loop) + if not self._subscription.partitions_auto_assigned(): + # no partitions assigned yet, just wait + continue try: yield from self.ensure_active_group() diff --git a/tests/test_consumer.py b/tests/test_consumer.py index cc05a5d9..988da599 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -529,3 +529,16 @@ def test_consumer_arguments(self): self.topic, loop=self.loop, bootstrap_servers=self.hosts, security_protocol="SSL", ssl_context=None) + + @run_until_complete + def test_consumer_group_without_subscription(self): + consumer = AIOKafkaConsumer( + loop=self.loop, + group_id='group-{}'.format(self.id()), + bootstrap_servers=self.hosts, + enable_auto_commit=False, + auto_offset_reset='earliest', + heartbeat_interval_ms=100) + yield from consumer.start() + yield from asyncio.sleep(0.2, loop=self.loop) + yield from consumer.stop()