From 99c08a1db501b7dafbb8d08b09eca6bb4ecdcbfb Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 8 Oct 2020 10:30:58 +0100
Subject: [PATCH 1/3] Only send RDATA for instance-local events.

When pulling events out of the DB to send over replication, we were not
filtering by instance name, and so we were sending events for other
instances.
---
 synapse/replication/tcp/streams/events.py     |  6 ++---
 synapse/storage/databases/main/events.py      | 12 +++++----
 .../storage/databases/main/events_worker.py   | 25 +++++++++++--------
 .../delta/58/20instance_name_event_tables.sql | 17 +++++++++++++
 4 files changed, 42 insertions(+), 18 deletions(-)
 create mode 100644 synapse/storage/databases/main/schema/delta/58/20instance_name_event_tables.sql

diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index ccc7ca30d8a9..82e9e0d64ece 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -155,7 +155,7 @@ async def _update_function(
         # now we fetch up to that many rows from the events table

         event_rows = await self._store.get_all_new_forward_event_rows(
-            from_token, current_token, target_row_count
+            instance_name, from_token, current_token, target_row_count
         )  # type: List[Tuple]

         # we rely on get_all_new_forward_event_rows strictly honouring the limit, so
@@ -180,7 +180,7 @@ async def _update_function(
             upper_limit,
             state_rows_limited,
         ) = await self._store.get_all_updated_current_state_deltas(
-            from_token, upper_limit, target_row_count
+            instance_name, from_token, upper_limit, target_row_count
         )

         limited = limited or state_rows_limited
@@ -189,7 +189,7 @@ async def _update_function(
         # not to bother with the limit.

         ex_outliers_rows = await self._store.get_ex_outlier_stream_rows(
-            from_token, upper_limit
+            instance_name, from_token, upper_limit
         )  # type: List[Tuple]

         # we now need to turn the raw database rows returned into tuples suitable
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index b4abd961b97f..b19c424ba952 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -426,12 +426,12 @@ def _update_current_state_txn(
                 # so that async background tasks get told what happened.
                 sql = """
                     INSERT INTO current_state_delta_stream
-                        (stream_id, room_id, type, state_key, event_id, prev_event_id)
-                    SELECT ?, room_id, type, state_key, null, event_id
+                        (stream_id, instance_name, room_id, type, state_key, event_id, prev_event_id)
+                    SELECT ?, ?, room_id, type, state_key, null, event_id
                         FROM current_state_events
                         WHERE room_id = ?
                 """
-                txn.execute(sql, (stream_id, room_id))
+                txn.execute(sql, (stream_id, self._instance_name, room_id))

                 self.db_pool.simple_delete_txn(
                     txn, table="current_state_events", keyvalues={"room_id": room_id},
@@ -452,8 +452,8 @@
             #
             sql = """
                 INSERT INTO current_state_delta_stream
-                    (stream_id, room_id, type, state_key, event_id, prev_event_id)
-                SELECT ?, ?, ?, ?, ?, (
+                    (stream_id, instance_name, room_id, type, state_key, event_id, prev_event_id)
+                SELECT ?, ?, ?, ?, ?, ?, (
                     SELECT event_id FROM current_state_events
                     WHERE room_id = ? AND type = ? AND state_key = ?
                 )
@@ -463,6 +463,7 @@
                 (
                     (
                         stream_id,
+                        self._instance_name,
                         room_id,
                         etype,
                         state_key,
@@ -755,6 +756,7 @@ def _update_outliers_txn(self, txn, events_and_contexts):
                            "event_stream_ordering": stream_order,
                            "event_id": event.event_id,
                            "state_group": state_group_id,
+                           "instance_name": self._instance_name,
                        },
                    )

diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index b7ed8ca6ab06..b4be6d9d17fc 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1043,7 +1043,7 @@ def get_current_events_token(self):
         return self._stream_id_gen.get_current_token()

     async def get_all_new_forward_event_rows(
-        self, last_id: int, current_id: int, limit: int
+        self, instance_name: str, last_id: int, current_id: int, limit: int
     ) -> List[Tuple]:
         """Returns new events, for the Events replication stream

@@ -1067,10 +1067,11 @@ def get_all_new_forward_event_rows(txn):
                 " LEFT JOIN state_events USING (event_id)"
                 " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? < stream_ordering AND stream_ordering <= ?"
+                " AND instance_name = ?"
                 " ORDER BY stream_ordering ASC"
                 " LIMIT ?"
             )
-            txn.execute(sql, (last_id, current_id, limit))
+            txn.execute(sql, (last_id, current_id, instance_name, limit))
             return txn.fetchall()

         return await self.db_pool.runInteraction(
@@ -1078,7 +1079,7 @@
         )

     async def get_ex_outlier_stream_rows(
-        self, last_id: int, current_id: int
+        self, instance_name: str, last_id: int, current_id: int
     ) -> List[Tuple]:
         """Returns de-outliered events, for the Events replication stream

@@ -1097,16 +1098,17 @@ def get_ex_outlier_stream_rows_txn(txn):
                 "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
                 " state_key, redacts, relates_to_id"
                 " FROM events AS e"
-                " INNER JOIN ex_outlier_stream USING (event_id)"
+                " INNER JOIN ex_outlier_stream AS out USING (event_id)"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events USING (event_id)"
                 " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? < event_stream_ordering"
                 " AND event_stream_ordering <= ?"
+                " AND out.instance_name = ?"
                 " ORDER BY event_stream_ordering ASC"
             )

-            txn.execute(sql, (last_id, current_id))
+            txn.execute(sql, (last_id, current_id, instance_name))
             return txn.fetchall()

         return await self.db_pool.runInteraction(
@@ -1149,10 +1151,11 @@ def get_all_new_backfill_event_rows(txn):
                 " LEFT JOIN state_events USING (event_id)"
                 " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? > stream_ordering AND stream_ordering >= ?"
+                " AND instance_name = ?"
                 " ORDER BY stream_ordering ASC"
                 " LIMIT ?"
             )
-            txn.execute(sql, (-last_id, -current_id, limit))
+            txn.execute(sql, (-last_id, -current_id, instance_name, limit))
             new_event_updates = [(row[0], row[1:]) for row in txn]

             limited = False
@@ -1166,15 +1169,16 @@
                 "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
                 " state_key, redacts, relates_to_id"
                 " FROM events AS e"
-                " INNER JOIN ex_outlier_stream USING (event_id)"
+                " INNER JOIN ex_outlier_stream AS out USING (event_id)"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events USING (event_id)"
                 " LEFT JOIN event_relations USING (event_id)"
                 " WHERE ? > event_stream_ordering"
                 " AND event_stream_ordering >= ?"
+                " AND out.instance_name = ?"
" ORDER BY event_stream_ordering DESC" ) - txn.execute(sql, (-last_id, -upper_bound)) + txn.execute(sql, (-last_id, -upper_bound, instance_name)) new_event_updates.extend((row[0], row[1:]) for row in txn) if len(new_event_updates) >= limit: @@ -1188,7 +1192,7 @@ def get_all_new_backfill_event_rows(txn): ) async def get_all_updated_current_state_deltas( - self, from_token: int, to_token: int, target_row_count: int + self, instance_name: str, from_token: int, to_token: int, target_row_count: int ) -> Tuple[List[Tuple], int, bool]: """Fetch updates from current_state_delta_stream @@ -1214,9 +1218,10 @@ def get_all_updated_current_state_deltas_txn(txn): SELECT stream_id, room_id, type, state_key, event_id FROM current_state_delta_stream WHERE ? < stream_id AND stream_id <= ? + AND instance_name = ? ORDER BY stream_id ASC LIMIT ? """ - txn.execute(sql, (from_token, to_token, target_row_count)) + txn.execute(sql, (from_token, to_token, instance_name, target_row_count)) return txn.fetchall() def get_deltas_for_stream_id_txn(txn, stream_id): diff --git a/synapse/storage/databases/main/schema/delta/58/20instance_name_event_tables.sql b/synapse/storage/databases/main/schema/delta/58/20instance_name_event_tables.sql new file mode 100644 index 000000000000..ad1f48142880 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/58/20instance_name_event_tables.sql @@ -0,0 +1,17 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE current_state_delta_stream ADD COLUMN instance_name TEXT; +ALTER TABLE ex_outlier_stream ADD COLUMN instance_name TEXT; From 1b3698464b019e0533db1acbaf05efc5a8a25e46 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Oct 2020 12:08:46 +0100 Subject: [PATCH 2/3] Fix backfill stream to use instance names. --- synapse/replication/tcp/streams/_base.py | 11 ++++++++--- synapse/storage/databases/main/events_worker.py | 7 +++---- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 54dccd15a627..61b282ab2dab 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -240,13 +240,18 @@ class BackfillStream(Stream): ROW_TYPE = BackfillStreamRow def __init__(self, hs): - store = hs.get_datastore() + self.store = hs.get_datastore() super().__init__( hs.get_instance_name(), - current_token_without_instance(store.get_current_backfill_token), - store.get_all_new_backfill_event_rows, + self._current_token, + self.store.get_all_new_backfill_event_rows, ) + def _current_token(self, instance_name: str) -> int: + # The backfill stream over replication operates on *positive* numbers, + # which means we need to negate it. 
+        return -self.store._backfill_id_gen.get_current_token_for_writer(instance_name)
+

 class PresenceStream(Stream):
     PresenceStreamRow = namedtuple(
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index b4be6d9d17fc..4e74fafe43d1 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1034,10 +1034,6 @@ async def get_room_complexity(self, room_id):

         return {"v1": complexity_v1}

-    def get_current_backfill_token(self):
-        """The current minimum token that backfilled events have reached"""
-        return -self._backfill_id_gen.get_current_token()
-
     def get_current_events_token(self):
         """The current maximum token that events have reached"""
         return self._stream_id_gen.get_current_token()
@@ -1121,6 +1117,9 @@ async def get_all_new_backfill_event_rows(
         """Get updates for backfill replication stream, including all new
         backfilled events and events that have gone from being outliers to not.

+        NOTE: The IDs given here are from replication, and so should be
+        *positive*.
+
         Args:
             instance_name: The writer we want to fetch updates from. Unused
                 here since there is only ever one writer.

From 9282eb3b3f776e34ebaecbbf0c1871a5a91b9c78 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 8 Oct 2020 10:33:17 +0100
Subject: [PATCH 3/3] Newsfile

---
 changelog.d/8496.misc | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 changelog.d/8496.misc

diff --git a/changelog.d/8496.misc b/changelog.d/8496.misc
new file mode 100644
index 000000000000..237cb3b31135
--- /dev/null
+++ b/changelog.d/8496.misc
@@ -0,0 +1 @@
+Allow events to be sent to clients sooner when using sharded event persisters.
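
For readers following the series, a standalone sketch of the two ideas above may help. It is illustrative only and not part of the patches: the in-memory SQLite table, its rows, and the helper name get_backfill_rows are invented for the example; only the shape of the WHERE clause and the token negation mirror the queries in the patches.

# Standalone sketch -- not part of the patches above. The table and its rows
# are invented for illustration.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events (stream_ordering INTEGER, event_id TEXT, instance_name TEXT)"
)
# Backfilled events are stored with *negative* stream orderings, tagged with
# the instance that persisted them.
conn.executemany(
    "INSERT INTO events VALUES (?, ?, ?)",
    [
        (-1, "$event_a", "master"),
        (-2, "$event_b", "event_persister1"),
        (-3, "$event_c", "master"),
    ],
)

def get_backfill_rows(instance_name, last_id, current_id):
    # Tokens arrive over replication as *positive* numbers, so they are
    # negated before being compared against the negative stream orderings
    # (cf. txn.execute(sql, (-last_id, -current_id, ...)) in patch 1), and
    # rows are filtered by instance_name so that each writer only sends
    # RDATA for the events it persisted itself.
    return conn.execute(
        "SELECT stream_ordering, event_id FROM events"
        " WHERE ? > stream_ordering AND stream_ordering >= ?"
        " AND instance_name = ?"
        " ORDER BY stream_ordering ASC",
        (-last_id, -current_id, instance_name),
    ).fetchall()

# "master" sees only its own two rows; the row written by event_persister1
# is left for that worker to send.
assert get_backfill_rows("master", 0, 3) == [(-3, "$event_c"), (-1, "$event_a")]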