Skip to content

Commit

Permalink
Add wait tests for other sections
Browse files Browse the repository at this point in the history
  • Loading branch information
MadLittleMods committed Jul 22, 2024
1 parent 9043180 commit d48fab2
Showing 1 changed file with 313 additions and 0 deletions.
313 changes: 313 additions & 0 deletions tests/rest/client/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
)
from tests.server import FakeChannel, TimedOutException
from tests.test_utils.event_injection import mark_event_as_partial_state
from tests.unittest import skip_unless

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -1251,6 +1252,8 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.event_sources = hs.get_event_sources()
self.storage_controllers = hs.get_storage_controllers()
self.account_data_handler = hs.get_account_data_handler()
self.notifier = hs.get_notifier()

def _assertRequiredStateIncludes(
self,
Expand Down Expand Up @@ -1376,6 +1379,52 @@ def _create_dm_room(

return room_id

def _bump_notifier_wait_for_events(self, user_id: str) -> None:
"""
Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
Sync results.
"""
# We're expecting some new activity from this point onwards
from_token = self.event_sources.get_current_token()

triggered_notifier_wait_for_events = False

async def _on_new_acivity(
before_token: StreamToken, after_token: StreamToken
) -> bool:
nonlocal triggered_notifier_wait_for_events
triggered_notifier_wait_for_events = True
return True

# Listen for some new activity for the user. We're just trying to confirm that
# our bump below actually does what we think it does (triggers new activity for
# the user).
result_awaitable = self.notifier.wait_for_events(
user_id,
1000,
_on_new_acivity,
from_token=from_token,
)

# Update the account data so that `notifier.wait_for_events(...)` wakes up.
# We're bumping account data because it won't show up in the Sliding Sync
# response so it won't affect whether we have results.
self.get_success(
self.account_data_handler.add_account_data_for_user(
user_id,
"org.matrix.foobarbaz",
{"foo": "bar"},
)
)

# Wait for our notifier result
self.get_success(result_awaitable)

if not triggered_notifier_wait_for_events:
raise AssertionError(
"Expected `notifier.wait_for_events(...)` to be triggered"
)

def test_sync_list(self) -> None:
"""
Test that room IDs show up in the Sliding Sync `lists`
Expand Down Expand Up @@ -1481,6 +1530,124 @@ def test_wait_for_sync_token(self) -> None:
# with because we weren't able to find anything new yet.
self.assertEqual(channel.json_body["pos"], future_position_token_serialized)

def test_wait_for_new_data(self) -> None:
"""
Test to make sure that the Sliding Sync request waits for new data to arrive.
(Only applies to incremental syncs with a `timeout` specified)
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")

room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id, user1_id, tok=user1_tok)

from_token = self.event_sources.get_current_token()

# Make the Sliding Sync request
channel = self.make_request(
"POST",
self.sync_endpoint
+ "?timeout=10000"
+ f"&pos={self.get_success(from_token.to_string(self.store))}",
{
"lists": {
"foo-list": {
"ranges": [[0, 0]],
"required_state": [],
"timeline_limit": 1,
}
}
},
access_token=user1_tok,
await_result=False,
)
# Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
with self.assertRaises(TimedOutException):
channel.await_result(timeout_ms=5000)
# Bump the room with new events to trigger new results
event_response1 = self.helper.send(
room_id, "new activity in room", tok=user1_tok
)
# Should respond before the 10 second timeout
channel.await_result(timeout_ms=3000)
self.assertEqual(channel.code, 200, channel.json_body)

# Check to make sure the new event is returned
self.assertEqual(
[
event["event_id"]
for event in channel.json_body["rooms"][room_id]["timeline"]
],
[
event_response1["event_id"],
],
channel.json_body["rooms"][room_id]["timeline"],
)

# TODO: Once we remove `ops`, we should be able to add a `RoomResult.__bool__` to
# check if there are any updates since the `from_token`.
@skip_unless(
False,
"Once we remove ops from the Sliding Sync response, this test should pass",
)
def test_wait_for_new_data_timeout(self) -> None:
"""
Test to make sure that the Sliding Sync request waits for new data to arrive but
no data ever arrives so we timeout. We're also making sure that the default data
doesn't trigger a false-positive for new data.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")

room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
self.helper.join(room_id, user1_id, tok=user1_tok)

from_token = self.event_sources.get_current_token()

# Make the Sliding Sync request
channel = self.make_request(
"POST",
self.sync_endpoint
+ "?timeout=10000"
+ f"&pos={self.get_success(from_token.to_string(self.store))}",
{
"lists": {
"foo-list": {
"ranges": [[0, 0]],
"required_state": [],
"timeline_limit": 1,
}
}
},
access_token=user1_tok,
await_result=False,
)
# Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
with self.assertRaises(TimedOutException):
channel.await_result(timeout_ms=5000)
# Wake-up `notifier.wait_for_events(...)` that will cause us test
# `SlidingSyncResult.__bool__` for new results.
self._bump_notifier_wait_for_events(user1_id)
# Block for a little bit more to ensure we don't see any new results.
with self.assertRaises(TimedOutException):
channel.await_result(timeout_ms=4000)
# Wait for the sync to complete (wait for the rest of the 10 second timeout,
# 5000 + 4000 + 1200 > 10000)
channel.await_result(timeout_ms=1200)
self.assertEqual(channel.code, 200, channel.json_body)

# We still see rooms because that's how Sliding Sync lists work but we reached
# the timeout before seeing them
self.assertEqual(
[event["event_id"] for event in channel.json_body["rooms"].keys()],
[room_id],
)

def test_filter_list(self) -> None:
"""
Test that filters apply to `lists`
Expand Down Expand Up @@ -4245,10 +4412,59 @@ def default_config(self) -> JsonDict:

def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.event_sources = hs.get_event_sources()
self.account_data_handler = hs.get_account_data_handler()
self.notifier = hs.get_notifier()
self.sync_endpoint = (
"/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
)

def _bump_notifier_wait_for_events(self, user_id: str) -> None:
"""
Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
Sync results.
"""
# We're expecting some new activity from this point onwards
from_token = self.event_sources.get_current_token()

triggered_notifier_wait_for_events = False

async def _on_new_acivity(
before_token: StreamToken, after_token: StreamToken
) -> bool:
nonlocal triggered_notifier_wait_for_events
triggered_notifier_wait_for_events = True
return True

# Listen for some new activity for the user. We're just trying to confirm that
# our bump below actually does what we think it does (triggers new activity for
# the user).
result_awaitable = self.notifier.wait_for_events(
user_id,
1000,
_on_new_acivity,
from_token=from_token,
)

# Update the account data so that `notifier.wait_for_events(...)` wakes up.
# We're bumping account data because it won't show up in the Sliding Sync
# response so it won't affect whether we have results.
self.get_success(
self.account_data_handler.add_account_data_for_user(
user_id,
"org.matrix.foobarbaz",
{"foo": "bar"},
)
)

# Wait for our notifier result
self.get_success(result_awaitable)

if not triggered_notifier_wait_for_events:
raise AssertionError(
"Expected `notifier.wait_for_events(...)` to be triggered"
)

def _assert_to_device_response(
self, channel: FakeChannel, expected_messages: List[JsonDict]
) -> str:
Expand Down Expand Up @@ -4413,6 +4629,103 @@ def test_data_incremental_sync(self) -> None:
)
self._assert_to_device_response(channel, [])

def test_wait_for_new_data(self) -> None:
"""
Test to make sure that the Sliding Sync request waits for new data to arrive.
(Only applies to incremental syncs with a `timeout` specified)
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass", "d1")
user2_id = self.register_user("u2", "pass")
user2_tok = self.login(user2_id, "pass", "d2")

from_token = self.event_sources.get_current_token()

# Make the Sliding Sync request
channel = self.make_request(
"POST",
self.sync_endpoint
+ "?timeout=10000"
+ f"&pos={self.get_success(from_token.to_string(self.store))}",
{
"lists": {},
"extensions": {
"to_device": {
"enabled": True,
}
},
},
access_token=user1_tok,
await_result=False,
)
# Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
with self.assertRaises(TimedOutException):
channel.await_result(timeout_ms=5000)
# Bump the to-device messages to trigger new results
test_msg = {"foo": "bar"}
send_to_device_channel = self.make_request(
"PUT",
"/_matrix/client/r0/sendToDevice/m.test/1234",
content={"messages": {user1_id: {"d1": test_msg}}},
access_token=user2_tok,
)
self.assertEqual(
send_to_device_channel.code, 200, send_to_device_channel.result
)
# Should respond before the 10 second timeout
channel.await_result(timeout_ms=3000)
self.assertEqual(channel.code, 200, channel.json_body)

self._assert_to_device_response(
channel,
[{"content": test_msg, "sender": user2_id, "type": "m.test"}],
)

def test_wait_for_new_data_timeout(self) -> None:
"""
Test to make sure that the Sliding Sync request waits for new data to arrive but
no data ever arrives so we timeout. We're also making sure that the default data
from the To-Device extension doesn't trigger a false-positive for new data.
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")

from_token = self.event_sources.get_current_token()

# Make the Sliding Sync request
channel = self.make_request(
"POST",
self.sync_endpoint
+ "?timeout=10000"
+ f"&pos={self.get_success(from_token.to_string(self.store))}",
{
"lists": {},
"extensions": {
"to_device": {
"enabled": True,
}
},
},
access_token=user1_tok,
await_result=False,
)
# Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
with self.assertRaises(TimedOutException):
channel.await_result(timeout_ms=5000)
# Wake-up `notifier.wait_for_events(...)` that will cause us test
# `SlidingSyncResult.__bool__` for new results.
self._bump_notifier_wait_for_events(user1_id)
# Block for a little bit more to ensure we don't see any new results.
with self.assertRaises(TimedOutException):
channel.await_result(timeout_ms=4000)
# Wait for the sync to complete (wait for the rest of the 10 second timeout,
# 5000 + 4000 + 1200 > 10000)
channel.await_result(timeout_ms=1200)
self.assertEqual(channel.code, 200, channel.json_body)

self._assert_to_device_response(channel, [])


class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase):
"""Tests for the e2ee sliding sync extension"""
Expand Down

0 comments on commit d48fab2

Please sign in to comment.