Skip to content

Commit

Permalink
Merge pull request #190 from ask/patch1
Browse files Browse the repository at this point in the history
Allow on_partition_assigned and on_partition_revoked to be async functions
  • Loading branch information
tvoinarovskyi authored Jul 24, 2017
2 parents 426b15c + b4d31fd commit 9f37893
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions aiokafka/group_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,10 @@ def _on_join_prepare(self, generation, member_id):
if self._subscription.listener:
try:
revoked = set(self._subscription.assigned_partitions())
self._subscription.listener.on_partitions_revoked(revoked)
res = self._subscription.listener.on_partitions_revoked(
revoked)
if asyncio.iscoroutine(res):
yield from res
except Exception:
log.exception("User provided subscription listener %s"
" for group %s failed on_partitions_revoked",
Expand Down Expand Up @@ -350,6 +353,7 @@ def _perform_assignment(self, leader_id, assignment_strategy, members):
group_assignment[member_id] = assignment
return group_assignment

@asyncio.coroutine
def _on_join_complete(self, generation, member_id, protocol,
member_assignment_bytes):
assignor = self._lookup_assignor(protocol)
Expand All @@ -375,7 +379,10 @@ def _on_join_complete(self, generation, member_id, protocol,
# execute the user's callback after rebalance
if self._subscription.listener:
try:
self._subscription.listener.on_partitions_assigned(assigned)
res = self._subscription.listener.on_partitions_assigned(
assigned)
if asyncio.iscoroutine(res):
yield from res
except Exception:
log.exception("User provided listener %s for group %s"
" failed on partition assignment: %s",
Expand Down Expand Up @@ -713,7 +720,7 @@ def ensure_active_group(self):
continue
if assignment is not None:
protocol, member_assignment_bytes = assignment
self._on_join_complete(
yield from self._on_join_complete(
self.generation, self.member_id,
protocol, member_assignment_bytes)
self.needs_join_prepare = True
Expand Down

0 comments on commit 9f37893

Please sign in to comment.