Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring to allow fetcher to always reset offsets properly. #286

Merged
merged 24 commits into from
Jan 25, 2018

Conversation

tvoinarovskyi
Copy link
Member

@tvoinarovskyi tvoinarovskyi commented Jan 17, 2018

This is a major work, that I was postponing for a long time. The issue was that SubscriptionState used in kafka-python is not suited for asyncio, as it is not aware of Futures and operates purely on flags. For example, lets look at this pseudo-code:

async def update_fetch_position(self, partitions):
    pending_resets = []
    for tp in partitions:
        if not self._subscriptions.is_assigned(tp):
            continue
        if self._subscriptions.assignment[tp].awaiting_reset():
            pending_resets.append(ensure_future(self._do_partition_reset(tp)))

    if pending_resets:
        await asyncio.gather(pending_resets)

async def _do_partition_reset(self, tp):
    assert self._subscriptions.is_assigned(tp)

    tp_state = self._subscriptions.assignment[tp]
    reset_strategy = tp_state.reset_strategy

    # ...

The code could actually hit the assert in _do_partition_reset because created tasks are scheduled and can take some time to start executing. Ie the assignment can change before _do_partition_reset is executed.

In kafka-python this is never a problem because a thread can never be interrupted. We could emulate the same behaviour by adding locks at different places in the code, leading to all public API to become coroutines. That would break a lot, so instead this PR tries to solve those problems by refactoring SubscriptionState to contain some futures, etc.

some points need addressing:

  • Refactor unit tests, that relied on mocking SubscriptionState
  • Add more tests for SubscriptionState
  • Fix coverage drop, it should be present.
  • Document API changes. (commit can raise CommitFailedError now)

@codecov
Copy link

codecov bot commented Jan 18, 2018

Codecov Report

Merging #286 into master will increase coverage by 0.93%.
The diff coverage is 99.89%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #286      +/-   ##
==========================================
+ Coverage   97.59%   98.53%   +0.93%     
==========================================
  Files          20       21       +1     
  Lines        2535     2995     +460     
==========================================
+ Hits         2474     2951     +477     
+ Misses         61       44      -17
Impacted Files Coverage Δ
aiokafka/client.py 98.13% <ø> (ø) ⬆️
aiokafka/consumer/subscription_state.py 100% <100%> (ø)
aiokafka/consumer/consumer.py 99.28% <100%> (+1.47%) ⬆️
aiokafka/consumer/fetcher.py 96.71% <100%> (-1.56%) ⬇️
aiokafka/consumer/group_coordinator.py 98.87% <99.75%> (+4.68%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 93f3128...d3b407b. Read the comment docs.

@tvoinarovskyi tvoinarovskyi changed the title [WIP] Refactoring to allow fetcher to always reset offsets properly. Refactoring to allow fetcher to always reset offsets properly. Jan 22, 2018
@vineetgoel
Copy link
Contributor

@tvoinarovskyi do we have an ETA on when this PR should be merged and released?

@tvoinarovskyi
Copy link
Member Author

tvoinarovskyi commented Jan 24, 2018

Doing final adjustments and ready to merge. Wanted some feedback on it before releasing, mainly confirmation that it fixes #261 and #264. Also, I would like to fix #267 before release, started working on it today. I would say that release is possible to the end of this month, maybe this weekend.

@tvoinarovskyi tvoinarovskyi force-pushed the fix_position_reset_after_assign branch from 424ab6f to d8941be Compare January 25, 2018 14:59
@tvoinarovskyi tvoinarovskyi merged commit 6eaf210 into master Jan 25, 2018
@tvoinarovskyi tvoinarovskyi deleted the fix_position_reset_after_assign branch January 25, 2018 16:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants