Request errors on overloaded systems may poison the connection pool #550
Comments
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. |
Not stale. |
Facing this or very similar behaviour under some load in a production microservice after upgrading httpx. Could reproduce the issue with pure httpcore (adapted @vlaci's script, `httpcore==0.16.3`):

import asyncio
import traceback
from collections import Counter
from socket import socket
import httpcore
extensions = {
    "timeout": {
        "connect": .2,
        "read": .2,
        "pool": .2,
        "write": .2,
    },
    # "trace": log,
}
def report(pool):
    print("-- Report:", f"--- {pool.connections=}", f"--- {pool._requests=}", sep="\n")
    if pool.connections:
        conn = pool.connections[0]
        print(f"--- {conn=}")
        print(f"--- Connection has expired? - {conn.has_expired()}")
        print(f"--- Inner connection is {conn._connection}")
async def main():
    _socket = socket()
    _socket.bind(("localhost", 0))
    _socket.listen()
    async with httpcore.AsyncConnectionPool(
        max_connections=1,
        max_keepalive_connections=0,
    ) as pool:
        # Control
        await pool.request("GET", "https://en.wikipedia.org/wiki/2023")
        print("- Control is OK")
        report(pool)
        # Saturate pool
        num_tasks = 600
        url = "http://{}:{}".format(*_socket.getsockname())
        results = await asyncio.gather(
            *[
                asyncio.create_task(pool.request(
                    "GET",
                    url,
                    extensions=extensions,
                ))
                for _ in range(num_tasks)
            ],
            return_exceptions=True,
        )
        print(f"- Finished saturation\n-- ({num_tasks=}): {dict(Counter(type(res) for res in results))}")
        report(pool)
        # Control
        try:
            await pool.request("GET", "https://en.wikipedia.org/wiki/2023", extensions=extensions)  # PoolTimeout
            # await pool.request("GET", url, extensions=extensions)  # Expected ReadTimeout, got PoolTimeout
        except httpcore.PoolTimeout:
            print("- ERROR: Got pool timeout")
            traceback.print_exc(chain=True)
        else:
print("- No pool timeout!") Some observationsEvery task is trying to acquire connection (`AsyncConnectionPool._attempt_to_acquire_connection` in `AsyncConnectionPool.handle_async_request`).First one (lets name it t1) succeeded and passes through Then t1 can create a real connection in This leads to call Now two things can happen. First one is pool receives new request to some different url. It tries to handle it but hangs until PoolTimeout on Second is pool receives new request to same url as it is in connection wrapper inside pool ("CONNECTING" one). In this case request can't be processed because connection wrapper can handle request but wrapper is not available at the same time (failed is False and connection is None and httpx2 property is False either) I see two approaches:
I tried the second approach and it worked well - no pool timeouts, and the "control" requests are successful:

async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
    # ...
    # Attempt to close CONNECTING connections that no one needs
    if self.is_full_pool:
        for idx, connection in enumerate(self._pool):  # Try to check old connections first
            if not connection.is_connecting():
                continue
            for req_status in self._requests:
                if req_status is status:  # skip current request
                    continue
                cur_origin = req_status.request.url.origin
                if connection.can_handle_request(cur_origin):
                    break
            else:  # There are no requests that can be handled by this connection
                await connection.aclose()
                self._pool.pop(idx)
                # print(f"XXX {id(status)} Zombie killed!")
                break
|
I have recently stumbled on this problem and I think it is a race condition around the cleanup logic during exceptions. The situation happens when a pending request times out (PoolTimeout) immediately after an active request times out (ReadTimeout, or any other exception from connection.handle_async_request).

Explanation

Imagine an active connection A timed out waiting for the server response (ReadTimeout), while at the same time (immediately after), the next pending request B timed out waiting for the pool (PoolTimeout). A walk-through of the events: A fails with ReadTimeout; while cleaning up after that failure, the pool hands the next queued request B a freshly created, not-yet-connected connection; at almost the same moment B's wait for the pool expires, so B raises PoolTimeout, and its exception handling only removes B from the request queue.
Now, since B raised an exception and no longer proceeds, the connection in the pool is never initialized or deleted.

Potential Solution

The root cause of the issue is that the connection handling logic of B is skipped due to PoolTimeout. But in this case the PoolTimeout is almost meaningless: request B was a few milliseconds away, if not less, from making it in time. With that said, I suggest that B continues to execute if it realizes that a connection has been assigned to it during its PoolTimeout exception handling. That is, for lines 232-234 in async/connection_pool.py, replace with:

async with self._pool_lock:
    if status.connection is not None:
        connection = status.connection
        # continue handling the connection
    else:
        self._requests.remove(status)
        raise exc

I have personally tested this in my local setup, and it gave the expected improvement. However, I am not totally familiar with the library and whether this will affect anything unrelated - hopefully not. Anyway, I hope this can be considered, or at least helps to clear up the mist. Thank you for creating and maintaining this library! |
@tomchristie @florimondmanca apologies for the ping, but can you take a look at the comments here? It seems like an important issue. |
A few thoughts. I think it is safer to handle this in a separate "except" branch for PoolTimeout only; reusing a connection after a more general exception may be unsafe and unexpected. Tested your approach locally and it worked - I like it, it's quite simple. |
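For illustration, here is a rough sketch of what such a PoolTimeout-only branch could look like, combining the two suggestions above. It is only a sketch: the surrounding names (status, timeout, self._requests, self._pool_lock) follow the httpcore 0.16.x internals discussed in this thread, and the branch itself is an assumption rather than a tested patch.

try:
    connection = await status.wait_for_connection(timeout=timeout)
except PoolTimeout as exc:
    async with self._pool_lock:
        if status.connection is not None:
            # A connection was assigned just as the pool timeout fired;
            # keep it instead of abandoning a half-initialised connection.
            connection = status.connection
        else:
            self._requests.remove(status)
            raise exc
except BaseException as exc:
    # Any other failure (including cancellation) still removes the request
    # from the queue before bubbling up, as the current code does.
    async with self._pool_lock:
        self._requests.remove(status)
        raise exc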
Let me check it. I'll try to reproduce it locally first |
Can confirm that I've also reproduced the issue. I adapted @nihilSup's example slightly, using `trio`...

import trio
import traceback
from collections import Counter
from socket import socket
import httpcore
extensions = {
    "timeout": {
        "connect": .2,
        "read": .2,
        "pool": .2,
        "write": .2,
    },
    # "trace": log,
}
def report(pool):
    print("-- Report:", f"--- {pool.connections=}", f"--- {pool._requests=}", sep="\n")
    if pool.connections:
        conn = pool.connections[0]
        print(f"--- {conn=}")
        print(f"--- Connection has expired? - {conn.has_expired()}")
        print(f"--- Inner connection is {conn._connection}")
async def make_request(pool, url, extensions):
    try:
        r = await pool.request(
            "GET",
            url,
            extensions=extensions,
        )
    except httpcore.TimeoutException as exc:
        print(type(exc))
    else:
        print(r)
async def main():
    _socket = socket()
    _socket.bind(("localhost", 0))
    _socket.listen()
    async with httpcore.AsyncConnectionPool(
        max_connections=1,
        max_keepalive_connections=0,
    ) as pool:
        # Control
        await pool.request("GET", "https://en.wikipedia.org/wiki/2023")
        print("- Control is OK")
        report(pool)
        # Saturate pool
        num_tasks = 300
        url = "http://{}:{}".format(*_socket.getsockname())
        async with trio.open_nursery() as nursery:
            for _ in range(num_tasks):
                nursery.start_soon(make_request, pool, url, extensions)
        print(f"- Finished saturation\n")
        report(pool)
        # Control
        try:
            await pool.request("GET", "https://en.wikipedia.org/wiki/2023", extensions=extensions)  # PoolTimeout
        except httpcore.PoolTimeout:
            print("- ERROR: Got pool timeout")
            traceback.print_exc(chain=True)
        else:
            print("- No pool timeout!")
trio.run(main)

I was able to confirm the behaviour described above. I was also able to confirm the suggested fix, although I'm not yet entirely comfortable with it. It seems a little brittle, and I'd prefer to understand first if we're able to avoid this state. I wonder if we're able to reliably reproduce this state, using a mocked-out network backend similar to some of the existing test cases. |
@tomchristie Over the weekend I prepared a draft PR with tests for both the sync and async implementations.

httpcore/backends/mock.py

class HangingStream(MockStream):
    def read(self, max_bytes: int, timeout: Optional[float] = None) -> bytes:
        if self._closed:
            raise ReadError("Connection closed")
        time.sleep(timeout or 0.1)
        raise ReadTimeout
class MockBackend(NetworkBackend):
    def __init__(
        self,
        buffer: typing.List[bytes],
        http2: bool = False,
        resp_stream_cls: Optional[Type[NetworkStream]] = None,
    ) -> None:
        self._buffer = buffer
        self._http2 = http2
        self._resp_stream_cls: Type[MockStream] = resp_stream_cls or MockStream

    def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: Optional[float] = None,
        local_address: Optional[str] = None,
    ) -> NetworkStream:
        return self._resp_stream_cls(list(self._buffer), http2=self._http2)

    def connect_unix_socket(
        self, path: str, timeout: Optional[float] = None
    ) -> NetworkStream:
        return self._resp_stream_cls(list(self._buffer), http2=self._http2)

    def sleep(self, seconds: float) -> None:
        pass
class AsyncHangingStream(AsyncMockStream):
    async def read(self, max_bytes: int, timeout: Optional[float] = None) -> bytes:
        if self._closed:
            raise ReadError("Connection closed")
        await anyio.sleep(timeout or 0.1)
        raise ReadTimeout

class AsyncMockBackend(AsyncNetworkBackend):
    def __init__(
        self,
        buffer: typing.List[bytes],
        http2: bool = False,
        resp_stream_cls: Optional[Type[AsyncNetworkStream]] = None,
    ) -> None:
        self._buffer = buffer
        self._http2 = http2
        self._resp_stream_cls: Type[AsyncMockStream] = resp_stream_cls or AsyncMockStream

    async def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: Optional[float] = None,
        local_address: Optional[str] = None,
    ) -> AsyncNetworkStream:
        return self._resp_stream_cls(list(self._buffer), http2=self._http2)

    async def connect_unix_socket(
        self, path: str, timeout: Optional[float] = None
    ) -> AsyncNetworkStream:
        return self._resp_stream_cls(list(self._buffer), http2=self._http2)

    async def sleep(self, seconds: float) -> None:
        pass

tests/_async/test_connection_pool.py

@pytest.mark.trio
async def test_pool_under_load():
    """
    Pool must remain operational after some peak load.
    """
    network_backend = AsyncMockBackend([], resp_stream_cls=AsyncHangingStream)

    async def fetch(_pool: AsyncConnectionPool, *exceptions: Type[BaseException]):
        with contextlib.suppress(*exceptions):
            async with pool.stream(
                "GET",
                "http://a.com/",
                extensions={
                    "timeout": {
                        "connect": 0.1,
                        "read": 0.1,
                        "pool": 0.1,
                        "write": 0.1,
                    },
                },
            ) as response:
                await response.aread()

    async with AsyncConnectionPool(
        max_connections=1, network_backend=network_backend
    ) as pool:
        async with concurrency.open_nursery() as nursery:
            for _ in range(300):
                # Sending many requests to the same url. All of them but one will have PoolTimeout.
                # One will be finished with ReadTimeout
                nursery.start_soon(fetch, pool, PoolTimeout, ReadTimeout)
        if pool.connections:  # There is one connection in pool in "CONNECTING" state
            assert pool.connections[0].is_connecting()
        with pytest.raises(ReadTimeout):  # ReadTimeout indicates that connection could be retrieved
            await fetch(pool)

tests/_sync/test_connection_pool.py

def test_pool_under_load():
"""
Pool must remain operational after some peak load.
"""
network_backend = MockBackend([], resp_stream_cls=HangingStream)
def fetch(_pool: ConnectionPool, *exceptions: Type[BaseException]):
with contextlib.suppress(*exceptions):
with pool.stream(
"GET",
"http://a.com/",
extensions={
"timeout": {
"connect": 0.1,
"read": 0.1,
"pool": 0.1,
"write": 0.1,
},
},
) as response:
response.read()
with ConnectionPool(
max_connections=1, network_backend=network_backend
) as pool:
with concurrency.open_nursery() as nursery:
for _ in range(300):
# Sending many requests to the same url. All of them but one will have PoolTimeout. One will
# be finished with ReadTimeout
nursery.start_soon(fetch, pool, PoolTimeout, ReadTimeout)
if pool.connections: # There is one connection in pool in "CONNECTING" state
assert pool.connections[0].is_connecting()
with pytest.raises(ReadTimeout): # ReadTimeout indicates that connection could be retrieved
fetch(pool)
|
An observation after I played with your test code locally, @nihilSup: we can reproduce this zombie situation even without putting the pool under load.

@pytest.mark.trio
async def test_pool_timeout_connection_cleanup():
    network_backend = AsyncMockBackend(
        [
            b"HTTP/1.1 200 OK\r\n",
            b"Content-Type: plain/text\r\n",
            b"Content-Length: 13\r\n",
            b"\r\n",
            b"Hello, world!",
        ]
    )
    async with AsyncConnectionPool(
        network_backend=network_backend, max_connections=2
    ) as pool:
        with pytest.raises(PoolTimeout):
            extensions = {"timeout": {"pool": 0}}
            await pool.request("GET", "https://example.com/", extensions=extensions)
        # wait for a considerable amount of time to make sure all requests time out
        await concurrency.sleep(1)
        print(pool.connections)  # [<AsyncHTTPConnection [CONNECTING]>]
        print(pool._requests)  # [] bad! both should be empty

I think the reason why this achieves the same outcome is that, with the pool timeout set to 0, the wait at httpcore/_async/connection_pool.py line 221 (at 7eb2022) times out immediately, even though a connection has already been assigned to the request.
This could be a more minimal reproduction of the scenario. The difference is that in the real situation the connection assignment happened in another request, triggered by the ReadTimeout. |
Good catch - I think this fact clearly shows a "missed state" in the connection's "state machine". As for unit tests, I think we want to simulate a realistic situation, e.g. peak load with errors and pool saturation, and test that the pool remains operational afterwards, so maybe my test is a little bit more "unbiased". |
Played with the more minimal reproduction by @c25vdw. I think we also need it as a separate unit test to check the corner case of a 0 pool timeout (see the sketch after the snippet below). Also, another possible zombie cleanup for httpcore/_async/connection_pool.py, lines 228 to 234 (at 7eb2022):
Replace with this:

except BaseException as exc:
    # If we timeout here, or if the task is cancelled, then make
    # sure to remove the request from the queue before bubbling
    # up the exception.
    async with self._pool_lock:
        self._requests.remove(status)  # Remove as usual
        if (
            status.connection and status.connection.is_connecting()
            and not any(req.connection is status.connection for req in self._requests)
        ):
            # Zombie cleanup
            for idx, conn in enumerate(self._pool):
                if conn is status.connection:
                    self._pool.pop(idx)
                    break
        raise exc

This cleanup is based on the same assumption: if we have a CONNECTING connection and there are no requests which need it, we can safely remove it from the pool. |
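For the 0-pool-timeout corner case mentioned above, a dedicated unit test could look roughly like this. It is a sketch built directly on @c25vdw's reproduction; AsyncMockBackend, AsyncConnectionPool, concurrency and PoolTimeout are the same names used in the tests quoted earlier, and the final assertions describe the intended behaviour once a cleanup like the one above is in place.

@pytest.mark.trio
async def test_zero_pool_timeout_leaves_no_zombie_connection():
    network_backend = AsyncMockBackend(
        [
            b"HTTP/1.1 200 OK\r\n",
            b"Content-Type: plain/text\r\n",
            b"Content-Length: 13\r\n",
            b"\r\n",
            b"Hello, world!",
        ]
    )
    async with AsyncConnectionPool(
        network_backend=network_backend, max_connections=2
    ) as pool:
        with pytest.raises(PoolTimeout):
            extensions = {"timeout": {"pool": 0}}
            await pool.request("GET", "https://example.com/", extensions=extensions)
        # Give any in-flight bookkeeping time to settle.
        await concurrency.sleep(1)
        # With the zombie cleanup in place, neither a CONNECTING connection
        # nor a queued request should be left behind.
        assert pool.connections == []
        assert pool._requests == []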
Sorry for this mess - I didn't expect it. Passing all the linters in CI took a very long time, so I had to convert the PR to a draft beforehand. |
I think that I'm running into this issue as well. @nihilSup : it looks like you've got all the linting issues sorted out now, right? What's left to move this fix forward and is there anything someone else can do to help? I'm looking at the code but am a little out of my depth here. |
@camlee yes |
Indeed - I've run into this issue in two of my prod codebases and would like to fix ASAP - please let me know what I can do to help. |
I'm going to take a punt and work on the assumption that #688 has closed this for us. |
I experimented with recent versions, having some long-responding server. I expect that my code should not raise PoolTimeout, especially since I cancelled all the tasks and then waited an additional 5 seconds for the cancellation to complete.
My case is probably related to the issue discussed here. |
I'm still seeing this on httpcore 1.0.5. I'm using the repro from the very first post on this issue.
|
Okay, looks like you've got a nice simple reproducible case there - shall we work through that and then consider re-opening? Next step... reduce the case above. |
This issue appears to have been solved for AsyncConnectionPool since 1.0.3, thanks to #880, which removed the chance of a context switch during exception handling. It's still reproducible with the sync connection pool in a multi-threaded scenario. |
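For reference, here is a rough sketch of how the sync pool might be exercised from multiple threads, along the lines of the earlier async scripts. The never-answering local socket, the timeouts, the thread count and the request count are assumptions, not a confirmed reproduction.

import socket
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

import httpcore

# Short timeouts so that saturated requests fail quickly, as in the async repros.
extensions = {"timeout": {"connect": 0.2, "read": 0.2, "pool": 0.2, "write": 0.2}}

def fetch(pool, url):
    try:
        return type(pool.request("GET", url, extensions=extensions))
    except Exception as exc:
        return type(exc)

def main():
    # A listening socket that never accepts or answers, so reads hang until timeout.
    listener = socket.socket()
    listener.bind(("localhost", 0))
    listener.listen()
    url = "http://{}:{}".format(*listener.getsockname())
    with httpcore.ConnectionPool(max_connections=1, max_keepalive_connections=0) as pool:
        with ThreadPoolExecutor(max_workers=50) as executor:
            results = list(executor.map(lambda _: fetch(pool, url), range(300)))
        print(dict(Counter(results)))
        # A poisoned pool would show a connection stuck in the CONNECTING state here.
        print(pool.connections)

if __name__ == "__main__":
    main()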
It is somewhat related to #502, as we first started observing the issue below in httpcore 0.14.5. Upgrading to 0.14.7 seemingly fixed it, but we found out that there is still some underlying race condition that can poison the pool, making it unable to serve requests ever again. The issue came up using httpx==0.22.0 / httpcore==0.14.7; it is not present using httpx==0.16.1 / httpcore==0.12.3.

Here is a script brute forcing the client and reliably triggering the issue on my machine. (The script is written for the sync version, but we are having this issue in production with the async version as well.)

Repro script

PARALLELISM and REQUESTS may need adjustments to reproduce the issue. After running the above code, the pool is no longer operational, containing a stuck connection.
My hunch is that RequestStatus.set_connection() will succeed from _attempt_to_acquire_connection, but status.wait_for_connection() can still time out on an overloaded system, leaving the half-created connection there. Subsequent request attempts won't be able to use this connection instance as it isn't available, and it won't be cleaned up as it is not closed nor failed. The interesting thing about this situation is that a just-crafted connection can be put in the pool without any guarantee that it will be used by the current requests.
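As a footnote, a small heuristic for spotting this poisoned state in a running pool. This is only a sketch: it peeks at the private _requests list and at the connection repr (which includes the "CONNECTING" state, as seen in the outputs quoted earlier), so treat it as a debugging aid rather than public API.

import httpcore

def looks_poisoned(pool: httpcore.ConnectionPool) -> bool:
    # A poisoned pool holds a connection whose repr still reports the
    # "CONNECTING" state while no queued request is left to drive it forward.
    stuck = any("CONNECTING" in repr(conn) for conn in pool.connections)
    return stuck and not pool._requests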