
Reconnect on every exception from conn #173

Closed
Artimi opened this issue May 25, 2017 · 9 comments

@Artimi

Artimi commented May 25, 2017

Hi,
we've encountered the stale-connection problem again, where the connection does not reconnect. First I get an error in AIOKafkaClient.send(), raised from the AIOKafkaConnection.send() method.

Got error produce response: ConnectionError: Connection at kafka.example.com closed

This can happen and we should be able to recover from it. However, we do not reconnect, and some time later we start to get exceptions from the MessageAccumulator, which is full and never drained:

Traceback (most recent call last):
  File "/home/artimi/env/lib/python3.5/site-packages/messaging_client/mqclient/kafka_producer.py", line 72, in _run
    await self._producer.send(topic, payload)
  File "/home/artimi/env/lib/python3.5/site-packages/aiokafka/producer.py", line 279, in send
    tp, key_bytes, value_bytes, self._request_timeout_ms / 1000)
  File "/home/artimi/env/lib/python3.5/site-packages/aiokafka/message_accumulator.py", line 208, in add_message
    raise KafkaTimeoutError()
kafka.errors.KafkaTimeoutError: KafkaTimeoutError

I was thinking we could do the same thing that was done in #149: when there is any error from AIOKafkaConnection.send(), close the connection, which is then reconnected. This would wrap the whole AIOKafkaConnection.send() method, and if any exception were raised from it we would close the connection. That approach would cover every usage of AIOKafkaConnection.send() in the code (I found two in Fetcher, one in GroupCoordinator and one in AIOKafkaClient). However, I'm not sure whether some recurrent error could cause an infinite loop of closing and reopening the connection.
What do you think? I'm willing to implement this myself if you think it is a good idea.
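
For illustration only, a minimal sketch of such a wrapper might look like the following. The helper name is hypothetical; it only assumes a connection object exposing send() and close(), as AIOKafkaConnection does.

async def send_closing_on_error(conn, request):
    # Hypothetical helper, not aiokafka code: send a request and, on any
    # failure, close the connection so the next caller triggers a fresh
    # reconnect instead of reusing a stale socket.
    try:
        return await conn.send(request)
    except Exception:
        # After a failure the connection state is unknown: drop it so that
        # subsequent sends establish a new connection.
        conn.close()
        raise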

aiokafka version: 0.2.2
kafka version: 0.9.0

@tvoinarovskyi
Member

Are you by any chance using acks=0?

@Artimi
Author

Artimi commented May 26, 2017

@Drizzt1991 No, we are using the default acks=1.

Artimi changed the title from "Reconnect on every exception from" to "Reconnect on every exception from conn" on Jun 5, 2017
@Artimi
Author

Artimi commented Jun 5, 2017

@Drizzt1991 What do you think about that problem? I could use retry_backoff_ms to limit reconnecting so it is not instant. Should I go for it?
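
For reference, a hypothetical sketch of backoff-limited reconnection; open_connection is a placeholder coroutine, not an aiokafka API, and the delay is the only point of the example.

import asyncio

async def reconnect_with_backoff(open_connection, retry_backoff_ms=100):
    # Sleep for the configured backoff before reopening, so a recurring
    # error cannot cause a tight close/reopen loop.
    await asyncio.sleep(retry_backoff_ms / 1000)
    return await open_connection()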

@tvoinarovskyi
Member

@Artimi Hi there, sorry for the late response.
It seems to me that this is not a bug in closing the connection, but rather a bug somewhere in the application logic.
Have you already tried a fix, and did it help?

@tvoinarovskyi
Member

@Artimi The deal here is that you got Connection at ... closed. This is not an error you get on a connection break (that would say Connection at ... broken); it is raised when the connection is closed explicitly via conn.close().
The only places connections are closed are your fix for KafkaTimeout, producer.stop(), and consumer.stop(). And I'm still not sure how you even got into that situation.

@tvoinarovskyi
Member

And could you confirm that there are no "Unexpected error in sender routine" messages in the logs?

@Artimi
Author

Artimi commented Jun 12, 2017

Hey @Drizzt1991, thanks for the response. I walked through the logs again and found that there was a CorrelationIdError:

May 23rd 2017, 08:25:57.908 Unable to request metadata from node with id 0: CorrelationIdError: Correlation ids do not match: sent 18, recv 19
May 23rd 2017, 08:25:57.909 Unable to update metadata from [0]
May 23rd 2017, 08:25:57.912 Got error produce response: ConnectionError: Connection at kafka...:1111 closed
May 23rd 2017, 08:36:50.484 Error during sending message
Traceback (most recent call last):
  File "/home/artimi/env/lib/python3.5/site-packages/messaging_client/mqclient/kafka_producer.py", line 72, in _run
    await self._producer.send(topic, payload)
  File "/home/artimi/env/lib/python3.5/site-packages/aiokafka/producer.py", line 279, in send
    tp, key_bytes, value_bytes, self._request_timeout_ms / 1000)
  File "/home/artimi/env/lib/python3.5/site-packages/aiokafka/message_accumulator.py", line 208, in add_message
    raise KafkaTimeoutError()
kafka.errors.KafkaTimeoutError: KafkaTimeoutError

Second occurrence was similar:

May 17th 2017, 09:40:54.684 Unable to request metadata from node with id 0: ConnectionError: Connection at kafka...:1111 closed
May 17th 2017, 09:40:54.684 Got error produce response: CorrelationIdError: Correlation ids do not match: sent 50801, recv 50802
May 17th 2017, 09:40:54.685 Unable to update metadata from [0]
May 17th 2017, 09:41:32.498 Error during sending message
Traceback (most recent call last):
  File "/home/artimi/env/lib/python3.5/site-packages/messaging_client/mqclient/kafka_producer.py", line 72, in _run
    await self._producer.send(topic, payload)
  File "/home/artimi/env/lib/python3.5/site-packages/aiokafka/producer.py", line 279, in send
    tp, key_bytes, value_bytes, self._request_timeout_ms / 1000)
  File "/home/artimi/env/lib/python3.5/site-packages/aiokafka/message_accumulator.py", line 208, in add_message
    raise KafkaTimeoutError()
kafka.errors.KafkaTimeoutError: KafkaTimeoutError

Looking at it now, I think part of the problem may be on our side. We call producer.client._metadata_update directly at the start of our producer to have current metadata (we want to force Kafka to create the topic for producing if it is missing). However, the result of _metadata_update was not checked or retried, so it can happen that we fail to get new metadata and never retry. In the producer we have the default metadata_max_age_ms = 300000, so it can wait another 5 minutes before refreshing metadata, and this might somehow affect the message accumulator. Maybe it will be enough to keep retrying the metadata fetch at startup until it succeeds. But I still think the connection should be reset in AIOKafkaProducer._send_produce_req, because this is a retriable error.
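
For illustration, a hypothetical retry loop for the startup metadata fetch could look like this; update_metadata stands in for whatever coroutine the application calls (such as the internal producer.client metadata update mentioned above), and the names and defaults are assumptions, not aiokafka API.

import asyncio

async def ensure_metadata_at_start(update_metadata, attempts=10, backoff_s=1.0):
    # Keep retrying the metadata refresh until it reports success, instead of
    # calling it once and ignoring the result.
    for _ in range(attempts):
        try:
            if await update_metadata():
                return True
        except Exception:
            pass  # treat any failure as "not updated yet" and retry below
        await asyncio.sleep(backoff_s)
    return False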

tvoinarovskyi added this to the 0.2.3 milestone on Jul 4, 2017
@Artimi
Author

Artimi commented Jul 7, 2017

@Drizzt1991 Since we fixed our error with _metadata_update we no longer see this problem. So it seems the cause was indeed a failed _metadata_update that was not retried; before the metadata could be updated again, the producer raised KafkaTimeoutError and ended up in that weird state. I will keep watching it, but I want to believe it is solved. Should I close the issue?

@tvoinarovskyi
Member

@Artimi Hey, I still could not figure out how you managed to get a CorrelationIdError, but we made a lot of fixes to the producer in v0.2.3. Please try it, and re-open this issue or open a new ticket if you have any other problems.

tvoinarovskyi added a commit that referenced this issue Jul 24, 2017
* Fixed a retry problem in the Producer, when the buffer is not reset to 0 offset.
  Thanks to @ngavrysh for the fix in the Tubular/aiokafka fork. (issue #184)
* Fixed how the Producer handles retries on Leader node failure. It just did not
  work before... Thanks to @blugowski for the help in locating the problem.
  (issue #176, issue #173)
* Fixed a degradation introduced in v0.2.2 on Consumer with no group_id. (issue #166)