-
Notifications
You must be signed in to change notification settings - Fork 237
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add flush method to AIOKafkaProducer #209
Conversation
Codecov Report
@@ Coverage Diff @@
## master #209 +/- ##
==========================================
- Coverage 97.22% 97.17% -0.05%
==========================================
Files 12 12
Lines 2088 2092 +4
==========================================
+ Hits 2030 2033 +3
- Misses 58 59 +1
Continue to review full report at Codecov.
|
aiokafka/message_accumulator.py
Outdated
# NOTE: we copy to avoid mutation during `yield from` below | ||
self._closed = True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you set self._closed
flag here? It's not needed I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea was to avoid buffering more messages while flushing but you are right, that might not be needed. Will remove.
@@ -201,12 +201,16 @@ def start(self): | |||
log.debug("Kafka producer started") | |||
|
|||
@asyncio.coroutine | |||
def flush(self): | |||
"""Wait untill all batches are Delivered and futures resolved""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
*untill -> until please.
Hey there, I would love to merge it, but the build it breaks the behavior of ProducerClosed error if |
Thanks |
This separates out the logic to await flushing the message accumulator into a
flush()
method that can be used to flush remaining buffered messages without having toclose()
the producer.