Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flush method to AIOKafkaProducer #209

Merged
merged 4 commits into from
Aug 28, 2017

Conversation

vineetgoel
Copy link
Contributor

This separates out the logic to await flushing the message accumulator into a flush() method that can be used to flush remaining buffered messages without having to close() the producer.

@codecov
Copy link

codecov bot commented Aug 22, 2017

Codecov Report

Merging #209 into master will decrease coverage by 0.04%.
The diff coverage is 80%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #209      +/-   ##
==========================================
- Coverage   97.22%   97.17%   -0.05%     
==========================================
  Files          12       12              
  Lines        2088     2092       +4     
==========================================
+ Hits         2030     2033       +3     
- Misses         58       59       +1
Impacted Files Coverage Δ
aiokafka/message_accumulator.py 100% <100%> (ø) ⬆️
aiokafka/producer.py 97.23% <50%> (-0.53%) ⬇️
aiokafka/fetcher.py 96.98% <0%> (ø) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 8ab8d45...485351d. Read the comment docs.

# NOTE: we copy to avoid mutation during `yield from` below
self._closed = True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you set self._closed flag here? It's not needed I think.

Copy link
Contributor Author

@vineetgoel vineetgoel Aug 28, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea was to avoid buffering more messages while flushing but you are right, that might not be needed. Will remove.

@@ -201,12 +201,16 @@ def start(self):
log.debug("Kafka producer started")

@asyncio.coroutine
def flush(self):
"""Wait untill all batches are Delivered and futures resolved"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*untill -> until please.

@tvoinarovskyi
Copy link
Member

tvoinarovskyi commented Aug 27, 2017

Hey there, I would love to merge it, but the build it breaks the behavior of ProducerClosed error if send called after close.

@tvoinarovskyi tvoinarovskyi merged commit 915f5e9 into aio-libs:master Aug 28, 2017
@tvoinarovskyi
Copy link
Member

Thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants