Fix cleanup logic in graphql_ws protocol handler #3778
Conversation
Reviewer's Guide by Sourcery

This pull request enhances the cleanup logic in the graphql_ws protocol handler by ensuring that all task subscriptions are fully cancelled during cleanup. It also refactors the echo subscription to accurately track active subscriptions using a counter, and updates the tests to verify that no residual subscription tasks remain after a client disconnects.

Sequence diagram for the cleanup logic in the graphql_ws handler:

```mermaid
sequenceDiagram
    participant H as GraphQLWSHandler
    %% When handling a disconnect, the handler calls cleanup()
    H->>H: handle()
    H->>H: cleanup()
    loop For each operation_id in tasks
        H->>H: cleanup_operation(operation_id)
    end
    Note right of H: Ensures cancellation of all active tasks
```
GraphQLWSHandler class diagram with the updated cleanup logic:

```mermaid
classDiagram
    class GraphQLWSHandler {
        - subscriptions: dict
        - tasks: dict
        + handle(): None
        + handle_message(...): None
        + cleanup(): None
        + cleanup_operation(operation_id: str): None
        + send_data_message(execution_result: ExecutionResult, operation_id: str): None
    }
    note for GraphQLWSHandler "Enhanced cleanup logic: cleanup() iterates over tasks and calls cleanup_operation() for proper cancellation of subscriptions."
```
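As a rough sketch (assuming handler internals for illustration; this is not the verbatim Strawberry source), the enhanced cleanup boils down to iterating over the task map and delegating to `cleanup_operation`:

```python
class GraphQLWSHandler:
    def __init__(self) -> None:
        self.subscriptions: dict = {}  # operation_id -> async generator
        self.tasks: dict = {}  # operation_id -> asyncio.Task

    async def cleanup(self) -> None:
        # Iterate over tasks rather than subscriptions: every started
        # operation has a task, but its subscription entry may not be
        # registered yet. Copy the keys because cleanup_operation()
        # mutates self.tasks while we loop.
        for operation_id in list(self.tasks.keys()):
            await self.cleanup_operation(operation_id)

    async def cleanup_operation(self, operation_id: str) -> None:
        # Simplified: cancel the task and drop the bookkeeping entries.
        task = self.tasks.pop(operation_id)
        task.cancel()
        self.subscriptions.pop(operation_id, None)
```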
Thanks for adding the `RELEASE.md` file! Here's a preview of the changelog:

This release fixes the issue that some subscription resolvers were not canceled if a client unexpectedly disconnected.
Hey @jakub-bacic - I've reviewed your changes - here's some feedback:

Overall Comments:
- Consider adding a comment explaining why `cleanup_operation` is called for each `operation_id` in `cleanup`.
Here's what I looked at during the review
- 🟢 General issues: all looks good
- 🟢 Security: all looks good
- 🟡 Testing: 1 issue found
- 🟢 Complexity: all looks good
- 🟢 Documentation: all looks good
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
```diff
@@ -1113,3 +1113,4 @@ async def test_unexpected_client_disconnects_are_gracefully_handled(
     await ws.close()
     await asyncio.sleep(1)
     assert not process_errors.called
+    assert Subscription.active_echo_subscriptions == 0
```
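The `active_echo_subscriptions` counter asserted above lives on the test schema's `Subscription` type. A minimal sketch of such a counter-tracking resolver (the name comes from the test; the body here is an assumption, not the actual test schema):

```python
import asyncio
from typing import AsyncGenerator

import strawberry


@strawberry.type
class Subscription:
    # Class-level counter inspected directly by the tests.
    active_echo_subscriptions = 0

    @strawberry.subscription
    async def echo(
        self, message: str, delay: float = 0
    ) -> AsyncGenerator[str, None]:
        Subscription.active_echo_subscriptions += 1
        try:
            await asyncio.sleep(delay)
            yield message
        finally:
            # Runs on normal completion *and* on cancellation, so the
            # counter only returns to zero if cleanup really cancels
            # the resolver.
            Subscription.active_echo_subscriptions -= 1
```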
suggestion (testing): Add a test case for multiple subscriptions.
It would be beneficial to add a test case where multiple subscriptions are active and the client disconnects unexpectedly. This would ensure that the cleanup logic correctly handles multiple active subscriptions.
Suggested implementation:

```python
import asyncio

import pytest

from tests.views.schema import MyExtension, Schema, Subscription


@pytest.mark.asyncio
async def test_multiple_subscriptions_cleanup(
    ws_client: DebuggableGraphQLTransportWSHandler,
):
    # Submit two subscriptions to simulate multiple active subscriptions.
    subscription_query = "subscription { echo }"
    await ws_client.send_subscription(subscription_query)
    await ws_client.send_subscription(subscription_query)
    # Verify that there are two active subscriptions.
    assert Subscription.active_echo_subscriptions == 2
    # Simulate an unexpected disconnect.
    await ws_client.close()
    await asyncio.sleep(1)
    # Verify that the cleanup logic has removed all active subscriptions.
    assert Subscription.active_echo_subscriptions == 0
```
Ensure that the `ws_client` used in the test has a `send_subscription` method, or update the subscription-sending mechanism to align with your test infrastructure.
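If the client doesn't have such a helper, one could be written along these lines (hypothetical; the "start" message shape matches the legacy graphql-ws frames used elsewhere in this thread, and a `send_json` method is assumed on the client):

```python
async def send_subscription(ws, query: str, operation_id: str = "sub1") -> None:
    # Legacy graphql-ws "start" frame, as used by the tests above.
    await ws.send_json(
        {
            "type": "start",
            "id": operation_id,
            "payload": {"query": query},
        }
    )
```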
Thanks for the fix.

I don't think the test really proves anything though: you never assert `Subscription.active_echo_subscriptions > 0`. I think the way to go is to add another query for the subscription count, then query for it before and after the subscription.
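For reference, the suggested query could look roughly like this (hypothetical field name; it simply exposes the test counter through the schema):

```python
import strawberry


class Subscription:
    # Stand-in for the test schema's Subscription type (see above).
    active_echo_subscriptions = 0


@strawberry.type
class Query:
    @strawberry.field
    def active_echo_subscriptions(self) -> int:
        # Expose the counter so the test can read it over the wire
        # instead of reaching into Python state directly.
        return Subscription.active_echo_subscriptions
```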
The test fails on that assertion if you revert the fix in the cleanup logic, so I think it does prove the changes are needed 😅. Adding an additional GQL query just for checking that sounds a little bit extreme to me, but it's definitely doable if we want to go that way. Alternatively, we can include two assertions (before and after the disconnect):

```diff
--- a/tests/websockets/test_graphql_ws.py
+++ b/tests/websockets/test_graphql_ws.py
@@ -806,12 +806,16 @@ async def test_unexpected_client_disconnects_are_gracefully_handled(
             "type": "start",
             "id": "sub1",
             "payload": {
-                "query": 'subscription { echo(message: "Hi", delay: 0.5) }',
+                "query": 'subscription { echo(message: "Hi", delay: 2) }',
             },
         }
     )
+    await asyncio.sleep(0.5)
+    assert Subscription.active_echo_subscriptions == 1
+
     await ws.close()
-    await asyncio.sleep(1)
+    await asyncio.sleep(0.5)
+
     assert not process_errors.called
     assert Subscription.active_echo_subscriptions == 0
```

Any thoughts?
Seems flaky IMO 🙈
The original test also relies on `asyncio.sleep`, so the timing dependency isn't new. If I understood you correctly, your point was that asserting only the final counter value never shows a subscription was actually active. One option would be to record the subscription lifecycle explicitly:

```python
class Subscription:
    echo_subscription_calls = []

    @strawberry.subscription
    async def echo(
        self, message: str, delay: float = 0
    ) -> AsyncGenerator[str, None]:
        try:
            Subscription.echo_subscription_calls.append(("before", message, delay))
            await asyncio.sleep(delay)
            yield message
        finally:
            Subscription.echo_subscription_calls.append(("after", message, delay))

...

# in tests
assert Subscription.echo_subscription_calls == [
    ("before", "Hi", 0.5),
    ("after", "Hi", 0.5),
]
```

Other attempts I tried:
```diff
@@ -202,6 +201,10 @@ async def cleanup_operation(self, operation_id: str) -> None:
         await self.tasks[operation_id]
         del self.tasks[operation_id]

    async def cleanup(self) -> None:
        for operation_id in list(self.tasks.keys()):
```
question: is it ok to just do this for `self.tasks` and not for `self.subscriptions` anymore? Wondering if the correct thing would be to do:

```diff
-        for operation_id in list(self.tasks.keys()):
+        for operation_id in set(self.tasks) | set(self.subscriptions):
```
As far as I can see, each subscription has a matching task with the same `operation_id`. However, the opposite isn't true: not every task has a matching subscription entry (that was the root cause of this issue, and why a simple switch from `self.subscriptions` to `self.tasks` fixed it).

The task is added in `handle_start`:

```python
result_handler = self.handle_async_results(
    operation_id, query, operation_name, variables
)
self.tasks[operation_id] = asyncio.create_task(result_handler)
```

Meanwhile, the subscription entry is added later, in `handle_async_results`, only after receiving the first value from the asynchronous generator:

```python
agen_or_err = await self.schema.subscribe(
    query=query,
    variable_values=variables,
    operation_name=operation_name,
    context_value=self.context,
    root_value=self.root_value,
)
if isinstance(agen_or_err, PreExecutionError):
    assert agen_or_err.errors
    await self.send_message(
        {
            "type": "error",
            "id": operation_id,
            "payload": agen_or_err.errors[0].formatted,
        }
    )
else:
    self.subscriptions[operation_id] = agen_or_err
```

That's why, if cleanup is executed before the subscription resolver yields any value, the task will have no matching entry in `self.subscriptions` yet.

Additionally, it's safe to call `cleanup_operation` for each task because it checks internally whether the given operation exists in `self.subscriptions` before attempting to delete it.
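Put differently, a guarded `cleanup_operation` consistent with the diff above might be sketched as a method like this (an illustration, not the verbatim source):

```python
import asyncio
import contextlib


async def cleanup_operation(self, operation_id: str) -> None:
    # Not every task has a subscription entry yet, so guard the lookup.
    if operation_id in self.subscriptions:
        await self.subscriptions[operation_id].aclose()
        del self.subscriptions[operation_id]
    # Every started operation does have a task: cancel it and await the
    # cancellation so no resolver keeps running after a disconnect.
    self.tasks[operation_id].cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await self.tasks[operation_id]
    del self.tasks[operation_id]
```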
Codecov Report

All modified and coverable lines are covered by tests ✅

Additional details and impacted files:

```
@@           Coverage Diff           @@
##             main    #3778   +/-   ##
=======================================
  Coverage   97.23%   97.23%
=======================================
  Files         503      503
  Lines       33529    33542    +13
  Branches     1716     1716
=======================================
+ Hits        32603    32616    +13
  Misses        707      707
  Partials      219      219
```
CodSpeed Performance Report

Merging #3778 will not alter performance.
Other than #3778 (comment), which has already been answered, I think in general this LGTM. @nrbnlulu, any other considerations? @patrick91 @DoctorJohn, do you want to take a look as well?
The test case still doesn't prove anything...
I wouldn't say it proves nothing: the test fails on the original code but passes now, meaning it at least reproduces the issue. Adding a GQL query just to fetch the number of active subscriptions feels like overkill, and I'm not sure it would resolve the concern 🤔. Won't it still be a race condition? Do we have a guarantee that the echo subscription increments the counter before the query fetches its value? Is there anything other than the testing part you're concerned about? I can revisit this next Wednesday. I'll explore a few alternatives and keep you posted :)
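For what it's worth, the timing dependency could be removed by having the resolver signal once it has registered itself, instead of the test sleeping for a fixed interval. A sketch of that idea (not necessarily what the final version uses):

```python
import asyncio

# Set by the echo resolver right after it increments the counter.
subscription_started = asyncio.Event()


async def wait_for_subscription(timeout: float = 5.0) -> None:
    # Used by the test in place of a fixed asyncio.sleep(): returns as
    # soon as the resolver has started, or fails loudly on timeout.
    await asyncio.wait_for(subscription_started.wait(), timeout)
```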
I don't disagree... I'm just saying it would be better for both readability and robustness.

That's just my personal opinion: I prefer a query, since the schema could (theoretically) be served in another process/thread, but as I said, that's just my non-binding opinion. BTW, if you don't have time I can add that test. LMK if you need help.
Force-pushed from 9d6f9d8 to 0f8868f.
for more information, see https://pre-commit.ci
I pushed the improved version. LMK if that looks good enough now :)
Nice work, thank you!
Thanks for contributing to Strawberry! 🎉 You've been invited to join the Strawberry GraphQL organization.

You can also request a free sticker by filling this form: https://forms.gle/dmnfQUPoY5gZbVT67

And don't forget to join our discord server: https://strawberry.rocks/discord 🔥
Description

Ensure all tasks are canceled when performing cleanup in subscription handlers when the `graphql_ws` protocol is used.

Types of Changes
Issues Fixed or Closed by This PR
Checklist

Summary by Sourcery

Fix cleanup logic for GraphQL subscriptions using the `graphql_ws` protocol to ensure all tasks are cancelled during cleanup.

Bug Fixes:
- Fix the cleanup logic in the `graphql_ws` protocol handler.

Tests: