
Fix cleanup logic in graphql_ws protocol handler #3778

Merged

Conversation

@jakub-bacic (Contributor) commented Feb 11, 2025

Description

Ensure that all tasks are canceled during cleanup in the subscription handlers when the graphql_ws protocol is used.
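
In short, cleanup now iterates over all tracked tasks rather than only the registered subscriptions. A minimal sketch of the idea, using the handler attributes shown in the review diff further down (the real change lives in strawberry/subscriptions/protocols/graphql_ws/handlers.py):

async def cleanup(self) -> None:
    # Iterate over tasks, not subscriptions: a task is registered as soon as
    # the operation starts, while a subscription entry only appears after the
    # async generator yields its first value.
    for operation_id in list(self.tasks.keys()):
        await self.cleanup_operation(operation_id)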

Types of Changes

  • Core
  • Bugfix
  • New feature
  • Enhancement/optimization
  • Documentation

Issues Fixed or Closed by This PR

Checklist

  • My code follows the code style of this project.
  • My change requires a change to the documentation.
  • I have updated the documentation accordingly.
  • I have read the CONTRIBUTING document.
  • I have added tests to cover my changes.
  • I have tested the changes and verified that they work and don't break anything (as well as I can manage).

Summary by Sourcery

Fix cleanup logic for GraphQL subscriptions using the graphql_ws protocol to ensure all tasks are cancelled during cleanup.

Bug Fixes:

  • Fixed a bug where subscription tasks were not cancelled when cleaning up in the graphql_ws protocol handler.

Tests:

  • Added tests to verify that all active subscriptions are cancelled when a client disconnects unexpectedly.

sourcery-ai bot (Contributor) commented Feb 11, 2025

Reviewer's Guide by Sourcery

This pull request enhances the cleanup logic in the graphql_ws protocol handler by ensuring that all subscription tasks are fully cancelled during cleanup. It also refactors the echo subscription to accurately track active subscriptions using a counter, and updates tests to verify that no residual subscription tasks remain after a client disconnects.

Sequence diagram for cleanup logic in graphql_ws handler

sequenceDiagram
    participant H as GraphQLWSHandler
    %% When handling disconnect, the handler calls cleanup()
    H->>H: handle()
    H->>H: cleanup()
    loop For each operation_id in tasks
        H->>H: cleanup_operation(operation_id)
    end
    Note right of H: Ensures cancellation of all active tasks

GraphQLWSHandler class diagram with updated cleanup logic

classDiagram
    class GraphQLWSHandler {
        - subscriptions: dict
        - tasks: dict
        + handle(): None
        + handle_message(...): None
        + cleanup(): None
        + cleanup_operation(operation_id: str): None
        + send_data_message(execution_result: ExecutionResult, operation_id: str): None
    }

    note for GraphQLWSHandler "Enhanced cleanup logic: cleanup() iterates over tasks 
    and calls cleanup_operation() for proper cancellation of subscriptions."

File-Level Changes

  • Enhanced the echo subscription to track active subscriptions (tests/views/schema.py; sketched below):
      • Introduced an active_echo_subscriptions counter in the Subscription type.
      • Wrapped the echo subscription logic in a try-finally block to increment and then decrement the counter.
  • Refactored the cleanup logic in the graphql_ws protocol handler (strawberry/subscriptions/protocols/graphql_ws/handlers.py):
      • Removed the per-operation cleanup loop from the handle method.
      • Introduced a new cleanup method that iterates over all active tasks and cleans them up by invoking cleanup_operation for each.
  • Updated tests to ensure proper subscription cleanup (tests/websockets/test_graphql_transport_ws.py, tests/websockets/test_graphql_ws.py):
      • Modified tests to import the updated Subscription type.
      • Added assertions verifying that the active_echo_subscriptions counter is zero after a client disconnects, for both the websocket and graphql transport tests.
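
For illustration, the counter tracking described above looks roughly like this (a simplified sketch of the test schema; the real tests/views/schema.py contains more):

import asyncio
from typing import AsyncGenerator, ClassVar

import strawberry


@strawberry.type
class Subscription:
    # Class-level counter the tests inspect after a disconnect; ClassVar keeps
    # it out of the GraphQL schema.
    active_echo_subscriptions: ClassVar[int] = 0

    @strawberry.subscription
    async def echo(self, message: str, delay: float = 0) -> AsyncGenerator[str, None]:
        try:
            Subscription.active_echo_subscriptions += 1
            await asyncio.sleep(delay)
            yield message
        finally:
            # Runs on normal completion and on task cancellation alike, so the
            # counter only returns to zero if cleanup actually cancels the task.
            Subscription.active_echo_subscriptions -= 1

The merged version applies the same pattern to an infinity subscription (see the discussion below).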


@botberry (Member) commented Feb 11, 2025

Thanks for adding the RELEASE.md file!

Here's a preview of the changelog:


This release fixes an issue where some subscription resolvers were not canceled if a client disconnected unexpectedly.

Here's the tweet text:

🆕 Release (next) is out! Thanks to Jakub Bacic for the PR 👏

Get it here 👉 https://strawberry.rocks/release/(next)

@sourcery-ai sourcery-ai bot (Contributor) left a comment

Hey @jakub-bacic - I've reviewed your changes - here's some feedback:

Overall Comments:

  • Consider adding a comment explaining why cleanup_operation is called for each operation_id in cleanup.
Here's what I looked at during the review
  • 🟢 General issues: all looks good
  • 🟢 Security: all looks good
  • 🟡 Testing: 1 issue found
  • 🟢 Complexity: all looks good
  • 🟢 Documentation: all looks good


@@ -1113,3 +1113,4 @@ async def test_unexpected_client_disconnects_are_gracefully_handled(
         await ws.close()
         await asyncio.sleep(1)
         assert not process_errors.called
+        assert Subscription.active_echo_subscriptions == 0
sourcery-ai bot (Contributor) commented:

suggestion (testing): Add a test case for multiple subscriptions.

It would be beneficial to add a test case where multiple subscriptions are active and the client disconnects unexpectedly. This would ensure that the cleanup logic correctly handles multiple active subscriptions.

Suggested implementation:

from tests.views.schema import MyExtension, Schema, Subscription
import asyncio
import pytest

@pytest.mark.asyncio
async def test_multiple_subscriptions_cleanup(ws_client: DebuggableGraphQLTransportWSHandler):
    # Submit two subscriptions to simulate multiple active subscriptions.
    subscription_query = "subscription { echo }"
    await ws_client.send_subscription(subscription_query)
    await ws_client.send_subscription(subscription_query)

    # Verify that there are two active subscriptions.
    assert Subscription.active_echo_subscriptions == 2

    # Simulate an unexpected disconnect.
    await ws_client.close()
    await asyncio.sleep(1)

    # Verify that the cleanup logic has removed all active subscriptions.
    assert Subscription.active_echo_subscriptions == 0

Ensure that the ws_client used in the test has a method "send_subscription" or update the subscription sending mechanism accordingly to align with your test infrastructure.

@nrbnlulu nrbnlulu (Member) left a comment

Thanks for the fix.
I don't think the tests really prove anything, though - you never assert:

assert Subscription.active_echo_subscriptions > 0

I think the way to go is to add another query for the subscription count, then query for that before and after the subscription.

@jakub-bacic (Contributor, Author) commented

The test fails on that assertion if you revert the fix in the cleanup logic, so I think it does prove the changes are needed 😅.

Adding an additional GQL query just for checking that sounds a bit extreme to me, but it's definitely doable if we want to go that way. Alternatively, we can include two assertions (before and after ws.close) like this:

--- a/tests/websockets/test_graphql_ws.py
+++ b/tests/websockets/test_graphql_ws.py
@@ -806,12 +806,16 @@ async def test_unexpected_client_disconnects_are_gracefully_handled(
                 "type": "start",
                 "id": "sub1",
                 "payload": {
-                    "query": 'subscription { echo(message: "Hi", delay: 0.5) }',
+                    "query": 'subscription { echo(message: "Hi", delay: 2) }',
                 },
             }
         )
 
+        await asyncio.sleep(0.5)
+        assert Subscription.active_echo_subscriptions == 1
+
         await ws.close()
-        await asyncio.sleep(1)
+        await asyncio.sleep(0.5)
+
         assert not process_errors.called
         assert Subscription.active_echo_subscriptions == 0

Any thoughts?

@nrbnlulu (Member) commented

> The test fails on that assertion if you revert the fix in the cleanup logic, so I think it does prove the changes are needed 😅. [...] Any thoughts?

Seems flaky IMO 🙈

@jakub-bacic (Contributor, Author) commented

The original test also relies on the sleep function, but I see your point - that change won't make it better.

If I understood you correctly, your point is that if we assert active_echo_subscriptions == 0 at the end of the test without checking that it's > 0 in the middle, we risk a false positive (because the initial/default value is also 0)? If so, the simplest solution to that problem is to replace the "active subscription counter" with a "full log of lifecycle events":

class Subscription:
    echo_subscription_calls = []

    @strawberry.subscription
    async def echo(self, message: str, delay: float = 0) -> AsyncGenerator[str, None]:
        try:
            Subscription.echo_subscription_calls.append(("before", message, delay))
            await asyncio.sleep(delay)
            yield message
        finally:
            Subscription.echo_subscription_calls.append(("after", message, delay))

...
# in tests

assert Subscription.echo_subscription_calls == [
    ("before", "Hi", 0.5),
    ("after", "Hi", 0.5),
]

Other attempts I tried:

  • Patch the echo subscription resolver so we don't have to modify the Subscription class:
    • mock.patch.object("Subscription", "echo", func) didn't have any effect when applied in the test.
  • Use the on_operation hook in an extension to track the subscription lifecycle:
    • when I added that hook to MyExtension, it seemed to be executed only for ChannelsHttpClient (is that a known issue?)

@@ -202,6 +201,10 @@ async def cleanup_operation(self, operation_id: str) -> None:
         await self.tasks[operation_id]
         del self.tasks[operation_id]

+    async def cleanup(self) -> None:
+        for operation_id in list(self.tasks.keys()):
A Member left a comment:

question: is it ok to just do this for self.tasks and not for self.subscriptions anymore? Wondering if the correct thing would be to do:

Suggested change:

-        for operation_id in list(self.tasks.keys()):
+        for operation_id in set(self.tasks) | set(self.subscriptions):

@jakub-bacic (Contributor, Author) commented Feb 12, 2025

As far as I can see, each subscription has a matching task with the same operation_id. However, the opposite isn't true - not every task has a matching subscription entry (that was the root cause of this issue, and why a simple switch from self.subscriptions to self.tasks fixed it).

Task is added in handle_start:

        result_handler = self.handle_async_results(
            operation_id, query, operation_name, variables
        )
        self.tasks[operation_id] = asyncio.create_task(result_handler)

Meanwhile, the subscription entry is added later in handle_async_results, only after receiving the first value from the asynchronous generator:

            agen_or_err = await self.schema.subscribe(
                query=query,
                variable_values=variables,
                operation_name=operation_name,
                context_value=self.context,
                root_value=self.root_value,
            )
            if isinstance(agen_or_err, PreExecutionError):
                assert agen_or_err.errors
                await self.send_message(
                    {
                        "type": "error",
                        "id": operation_id,
                        "payload": agen_or_err.errors[0].formatted,
                    }
                )
            else:
                self.subscriptions[operation_id] = agen_or_err

That's why if cleanup is executed before the subscription resolver yields any value, the task will have no matching entry in self.subscriptions yet.

Additionally, it's safe to call cleanup_operation for each task because it checks internally if the given operation exists in self.subscriptions before attempting to delete it.
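
For reference, the guard being described has roughly this shape (a paraphrase for illustration, not the verbatim handler code; the cancellation details are assumed):

import asyncio
from contextlib import suppress

# Sketch of a method on the graphql_ws handler:
async def cleanup_operation(self, operation_id: str) -> None:
    # A task can exist without a matching subscription entry (the async
    # generator never yielded), so check membership before deleting.
    if operation_id in self.subscriptions:
        with suppress(RuntimeError):
            await self.subscriptions[operation_id].aclose()
        del self.subscriptions[operation_id]

    # Every operation has a task, so this part is unconditional.
    self.tasks[operation_id].cancel()
    with suppress(asyncio.CancelledError):
        await self.tasks[operation_id]
    del self.tasks[operation_id]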

codecov bot commented Feb 12, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 97.23%. Comparing base (951e56c) to head (645fe89).
Report is 1 commit behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #3778   +/-   ##
=======================================
  Coverage   97.23%   97.23%           
=======================================
  Files         503      503           
  Lines       33529    33542   +13     
  Branches     1716     1716           
=======================================
+ Hits        32603    32616   +13     
  Misses        707      707           
  Partials      219      219           

codspeed-hq bot commented Feb 12, 2025

CodSpeed Performance Report

Merging #3778 will not alter performance

Comparing jakub-bacic:3777-subscription-mem-leak (645fe89) with main (951e56c)

Summary

✅ 21 untouched benchmarks

@bellini666 (Member) commented

Other than #3778 (comment), which has already been answered, I think in general this LGTM

@nrbnlulu any other considerations?

@patrick91 @DoctorJohn, do you want to take a look as well?

@nrbnlulu (Member) commented Feb 13, 2025

The test case still doesn't prove anything...
I gave my 2 cents on how the test should be implemented,
but I guess the way @jakub-bacic suggested is also possible, though a bit hacky IMO.

@jakub-bacic (Contributor, Author) commented Feb 14, 2025

> The test case still doesn't prove anything... I gave my 2 cents on how the test should be implemented, but I guess the way @jakub-bacic suggested is also possible, though a bit hacky IMO.

I wouldn’t say it proves nothing - the test fails on the original code but passes now, meaning it at least reproduces the issue.

Adding a GQL query just to fetch the number of active subscriptions feels like overkill, and I’m not sure it will resolve the issue 🤔. Won’t it still be a race condition? Do we have a guarantee that the echo subscription increments the counter before the query fetches its value?

Is there anything other than the testing part you're concerned about?

I can revisit this next Wednesday. I’ll explore a few alternatives and keep you posted :)

@nrbnlulu (Member) commented Feb 15, 2025

> I wouldn’t say it proves nothing - the test fails on the original code but passes now, meaning it at least reproduces the issue.

I don't disagree... I'm just saying that it would be better for both readability and robustness.

> Adding a GQL query just to fetch the number of active subscriptions feels like overkill, and I’m not sure it will resolve the issue 🤔. Won’t it still be a race condition? Do we have a guarantee that the echo subscription increments the counter before the query fetches its value?

That's just my personal opinion... I prefer a query since the schema could (theoretically) be served in another process/thread... but as I said, that's just my non-binding opinion...

BTW if you don't have time I can add that test. LMK if you need help.

@jakub-bacic force-pushed the 3777-subscription-mem-leak branch from 9d6f9d8 to 0f8868f on February 20, 2025 at 12:16
@jakub-bacic (Contributor, Author) commented Feb 20, 2025

I pushed the improved version. I used the infinity subscription instead of echo so we can actually wait for the first value returned from it (see the extra await ws.receive_json() line) and deterministically assert the value of active_infinity_subscriptions (before and after, as suggested by @nrbnlulu).
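
Roughly, the improved test has this shape (a sketch; the exact arguments of the infinity subscription and the surrounding fixtures are assumptions):

await ws.send_json(
    {
        "type": "start",
        "id": "sub1",
        # "infinity" keeps yielding values, so the subscription stays active
        # until the client disconnects.
        "payload": {"query": 'subscription { infinity(message: "Hi") }'},
    }
)

# Waiting for the first value guarantees the resolver has started and the
# counter has been incremented - no sleep-based race for the "before" assert.
await ws.receive_json()
assert Subscription.active_infinity_subscriptions == 1

await ws.close()
await asyncio.sleep(1)

assert not process_errors.called
assert Subscription.active_infinity_subscriptions == 0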

LMK if that looks good enough now :)

@patrick91 patrick91 (Member) left a comment

Nice work, thank you!

@patrick91 patrick91 merged commit 33a5fbd into strawberry-graphql:main Feb 27, 2025
94 of 95 checks passed
@botberry (Member) commented

Thanks for contributing to Strawberry! 🎉 You've been invited to join
the Strawberry GraphQL organisation 😊

You can also request a free sticker by filling this form: https://forms.gle/dmnfQUPoY5gZbVT67

And don't forget to join our discord server: https://strawberry.rocks/discord 🔥

Development

Successfully merging this pull request may close these issues:

  • Memory leak in subscriptions