RecursionError in MessageAccumulator.add_message method #409

Closed
vmagamedov opened this issue Aug 10, 2018 · 4 comments

@vmagamedov

Something went wrong with Kafka:

aiokafka.producer.producer: Got error produce response on topic-partition TopicPartition(topic='taskqueue.default', partition=1), retrying. Error: <class 'kafka.errors.NotLeaderForPartitionError'>

Then one of the producer.send_and_wait() calls was cancelled due to a timeout:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/grpclib/server.py", line 149, in request_handler
    await method.func(stream)
  File "/usr/local/lib/python3.6/dist-packages/taskqueue/server/service.py", line 49, in Add
    await self._producer.send_and_wait(topic, task.SerializeToString())
  File "/usr/local/lib/python3.6/dist-packages/aiokafka/producer/producer.py", line 318, in send_and_wait
    return (yield from future)
concurrent.futures._base.CancelledError

After that, every subsequent attempt to send a message fails immediately with this error:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/grpclib/server.py", line 149, in request_handler
    await method.func(stream)
  File "/usr/local/lib/python3.6/dist-packages/taskqueue/server/service.py", line 49, in Add
    await self._producer.send_and_wait(topic, task.SerializeToString())
  File "/usr/local/lib/python3.6/dist-packages/aiokafka/producer/producer.py", line 317, in send_and_wait
    topic, value, key, partition, timestamp_ms)
  File "/usr/local/lib/python3.6/dist-packages/aiokafka/producer/producer.py", line 309, in send
    timestamp_ms=timestamp_ms)
  File "/usr/local/lib/python3.6/dist-packages/aiokafka/producer/message_accumulator.py", line 257, in add_message
    tp, key, value, timeout, timestamp_ms))
  File "/usr/local/lib/python3.6/dist-packages/aiokafka/producer/message_accumulator.py", line 257, in add_message
    tp, key, value, timeout, timestamp_ms))
  File "/usr/local/lib/python3.6/dist-packages/aiokafka/producer/message_accumulator.py", line 257, in add_message
    tp, key, value, timeout, timestamp_ms))
  [Previous line repeated 960 more times]
  File "/usr/local/lib/python3.6/dist-packages/aiokafka/producer/message_accumulator.py", line 252, in add_message
    yield from batch.wait_drain(timeout)
  File "/usr/lib/python3.6/asyncio/tasks.py", line 301, in wait
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
  File "/usr/lib/python3.6/asyncio/coroutines.py", line 272, in iscoroutine
    return isinstance(obj, _COROUTINE_TYPES)
  File "/usr/lib/python3.6/abc.py", line 190, in __instancecheck__
    subclass in cls._abc_negative_cache):
  File "/usr/lib/python3.6/_weakrefset.py", line 75, in __contains__
    return wr in self.data
RecursionError: maximum recursion depth exceeded in comparison

Kafka returned to normal, but only two of the three service instances recovered. One instance keeps failing with RecursionError; restarting that instance helps.

I think the RecursionError happens because yield from batch.wait_drain(timeout) returns immediately.

This is probably possible in only one case: a batch is popped from the accumulator and then added back with a MessageAccumulator.reenqueue call. By that time, the batch returned to the MessageAccumulator._batches dict already has its _drain_waiter future in the done state, so wait_drain() never blocks and add_message keeps calling itself until it raises RecursionError.
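The suspected failure mode can be sketched with plain asyncio. The `Batch` and `Accumulator` classes below are hypothetical, heavily simplified stand-ins, not aiokafka's real `MessageAccumulator`; they only mimic the pattern visible in the traceback, where `add_message` waits for the batch to drain and then calls itself again. Awaiting a future that is already done returns immediately without yielding to the event loop, so a re-enqueued batch whose drain waiter has already fired turns every retry into one more level of recursion. The `reset_waiter` option and `reset_drain_waiter()` method are an assumed mitigation for illustration only, not necessarily what the actual fix does.

```python
import asyncio


class Batch:
    """Hypothetical stand-in for a producer batch; not aiokafka's real class."""

    def __init__(self):
        self._drain_waiter = asyncio.get_running_loop().create_future()

    def append(self, value):
        # Assumption: the batch is full, so every append is rejected.
        return None

    def drain(self):
        # Called when the sender pops the batch; wakes up waiting producers.
        if not self._drain_waiter.done():
            self._drain_waiter.set_result(None)

    def reset_drain_waiter(self):
        # Hypothetical mitigation: a re-enqueued batch gets a fresh, pending waiter.
        self._drain_waiter = asyncio.get_running_loop().create_future()

    async def wait_drain(self):
        # Awaiting an already-done future returns immediately and never
        # hands control back to the event loop.
        await self._drain_waiter


class Accumulator:
    """Hypothetical accumulator using the recursive retry pattern."""

    def __init__(self):
        self._batches = {}

    def reenqueue(self, tp, batch, reset_waiter=False):
        # Put a previously drained batch back, e.g. after a retriable error.
        if reset_waiter:
            batch.reset_drain_waiter()
        self._batches[tp] = batch

    async def add_message(self, tp, value):
        batch = self._batches.get(tp)
        if batch is not None and batch.append(value) is None:
            await batch.wait_drain()
            # Recursive retry: only safe if wait_drain() really suspended.
            return await self.add_message(tp, value)
        return "appended"  # placeholder for creating a new batch


async def demo(reset_waiter):
    acc = Accumulator()
    batch = Batch()
    batch.drain()  # the sender already popped the batch, so its waiter is done
    acc.reenqueue("topic-0", batch, reset_waiter=reset_waiter)
    try:
        await asyncio.wait_for(acc.add_message("topic-0", b"x"), timeout=0.1)
    except RecursionError:
        return "RecursionError"
    except asyncio.TimeoutError:
        return "blocked until drained"
```

With `reset_waiter=False`, `asyncio.run(demo(False))` ends in `RecursionError`, matching the traceback above. With `reset_waiter=True`, `add_message` suspends on the pending waiter until the timeout, as it would until a real sender drained the batch.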

@tvoinarovskyi
Member

I will have to look at it tomorrow. Thanks for the issue, this looks like a bug!

@tvoinarovskyi tvoinarovskyi self-assigned this Sep 2, 2018
@tvoinarovskyi
Member

Hey, do you happen to use the linger_ms option?

@tvoinarovskyi
Member

@vmagamedov Again thanks for the issue. I was able to reproduce it only in combination with linger_ms, but got the same exception. I hope this will fix it for you!

@tvoinarovskyi tvoinarovskyi modified the milestones: 0.4.1, 0.4.2 Sep 2, 2018
@vmagamedov
Author

Thanks for the fix! I'm using AIOKafkaProducer with default values, but I think that this fix should do the trick.

tvoinarovskyi added a commit that referenced this issue Sep 2, 2018
Added fix for issue #409, RecursionError in producer.