From 4fdd4845d0000ccfc9f9be20c121fd38645a9267 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 Jun 2020 17:36:40 +0100 Subject: [PATCH 1/4] Ensure account data stream IDs are unique. The account data stream is shared between three tables, and the maximum allocated ID was tracked in a dedicated table. Updating the max ID happened outside the transaction that allocated the ID, leading to a race where if the server was eg restarted then the same ID could be allocated but the max ID failed to be updated, leading it to be reused. The ID generators have support for tracking across multiple tables, so we may as well use that instead of a dedicated table. --- .../replication/slave/storage/account_data.py | 8 +++- .../storage/data_stores/main/account_data.py | 41 ++++--------------- synapse/storage/data_stores/main/tags.py | 7 ---- 3 files changed, 14 insertions(+), 42 deletions(-) diff --git a/synapse/replication/slave/storage/account_data.py b/synapse/replication/slave/storage/account_data.py index 2a4f5c7cfd65..9db6c62bc74f 100644 --- a/synapse/replication/slave/storage/account_data.py +++ b/synapse/replication/slave/storage/account_data.py @@ -24,7 +24,13 @@ class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore): def __init__(self, database: Database, db_conn, hs): self._account_data_id_gen = SlavedIdTracker( - db_conn, "account_data_max_stream_id", "stream_id" + db_conn, + "account_data", + "stream_id", + extra_tables=[ + ("room_account_data", "stream_id"), + ("room_tags_revisions", "stream_id"), + ], ) super(SlavedAccountDataStore, self).__init__(database, db_conn, hs) diff --git a/synapse/storage/data_stores/main/account_data.py b/synapse/storage/data_stores/main/account_data.py index f9eef1b78ecc..f1b9c3dd42fa 100644 --- a/synapse/storage/data_stores/main/account_data.py +++ b/synapse/storage/data_stores/main/account_data.py @@ -297,7 +297,13 @@ def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context): class AccountDataStore(AccountDataWorkerStore): def __init__(self, database: Database, db_conn, hs): self._account_data_id_gen = StreamIdGenerator( - db_conn, "account_data_max_stream_id", "stream_id" + db_conn, + "account_data_max_stream_id", + "stream_id", + extra_tables=[ + ("room_account_data", "stream_id"), + ("room_tags_revisions", "stream_id"), + ], ) super(AccountDataStore, self).__init__(database, db_conn, hs) @@ -339,14 +345,6 @@ def add_account_data_to_room(self, user_id, room_id, account_data_type, content) lock=False, ) - # it's theoretically possible for the above to succeed and the - # below to fail - in which case we might reuse a stream id on - # restart, and the above update might not get propagated. That - # doesn't sound any worse than the whole update getting lost, - # which is what would happen if we combined the two into one - # transaction. - yield self._update_max_stream_id(next_id) - self._account_data_stream_cache.entity_has_changed(user_id, next_id) self.get_account_data_for_user.invalidate((user_id,)) self.get_account_data_for_room.invalidate((user_id, room_id)) @@ -381,14 +379,6 @@ def add_account_data_for_user(self, user_id, account_data_type, content): lock=False, ) - # it's theoretically possible for the above to succeed and the - # below to fail - in which case we might reuse a stream id on - # restart, and the above update might not get propagated. That - # doesn't sound any worse than the whole update getting lost, - # which is what would happen if we combined the two into one - # transaction. - yield self._update_max_stream_id(next_id) - self._account_data_stream_cache.entity_has_changed(user_id, next_id) self.get_account_data_for_user.invalidate((user_id,)) self.get_global_account_data_by_type_for_user.invalidate( @@ -397,20 +387,3 @@ def add_account_data_for_user(self, user_id, account_data_type, content): result = self._account_data_id_gen.get_current_token() return result - - def _update_max_stream_id(self, next_id): - """Update the max stream_id - - Args: - next_id(int): The the revision to advance to. - """ - - def _update(txn): - update_max_id_sql = ( - "UPDATE account_data_max_stream_id" - " SET stream_id = ?" - " WHERE stream_id < ?" - ) - txn.execute(update_max_id_sql, (next_id, next_id)) - - return self.db.runInteraction("update_account_data_max_stream_id", _update) diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py index 2aa1bafd48d3..4099a959fd23 100644 --- a/synapse/storage/data_stores/main/tags.py +++ b/synapse/storage/data_stores/main/tags.py @@ -233,13 +233,6 @@ def _update_revision_txn(self, txn, user_id, room_id, next_id): self._account_data_stream_cache.entity_has_changed, user_id, next_id ) - update_max_id_sql = ( - "UPDATE account_data_max_stream_id" - " SET stream_id = ?" - " WHERE stream_id < ?" - ) - txn.execute(update_max_id_sql, (next_id, next_id)) - update_sql = ( "UPDATE room_tags_revisions" " SET stream_id = ?" From d69c879e9a3083cdca439ef9ddc7e834668ab368 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 Jun 2020 17:39:19 +0100 Subject: [PATCH 2/4] Fix bug in account data replciation stream. If the same stream ID was used in both global and room account data then the getting udpates for the replication stream would fail due to `heapq.merge(..)` trying to compare a `str` with a `None`. (This is because you'd have two rows like `(534, '!room')` and `(534, None)` from the room and global account data tables). Fix is just to order by stream ID, since we don't rely on the ordering beyond that. The bug where stream IDs can be reused should be fixed now, so this case shouldn't happen going forward. Fixes #7617 --- synapse/replication/tcp/streams/_base.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index d42aaff05582..4acefc8a9686 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -600,8 +600,14 @@ async def _update_function( for stream_id, user_id, room_id, account_data_type in room_results ) - # we need to return a sorted list, so merge them together. - updates = list(heapq.merge(room_rows, global_rows)) + # We need to return a sorted list, so merge them together. + # + # Note: We order only by the stream ID to work around a bug where the + # same stream ID could appear in both `global_rows` and `room_rows`, + # leading to a comparison between the data tuples. The comparison could + # fail due to attempting to compare the `room_id` which results in a + # `TypeError` from comparing a `str` vs `None`. + updates = list(heapq.merge(room_rows, global_rows, key=lambda row: row[0])) return updates, to_token, limited From 6afdbd5019398b5721ef223e55f4b90dbb623eb8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 Jun 2020 17:49:18 +0100 Subject: [PATCH 3/4] Newsfile --- changelog.d/7656.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/7656.bugfix diff --git a/changelog.d/7656.bugfix b/changelog.d/7656.bugfix new file mode 100644 index 000000000000..1aeddb5fb99d --- /dev/null +++ b/changelog.d/7656.bugfix @@ -0,0 +1 @@ +Fix bug in account data replication stream. From 9d3a00eed786b3482851812524775ae928ba9a68 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 9 Jun 2020 15:39:03 +0100 Subject: [PATCH 4/4] Add back table updates for backwards compat --- .../storage/data_stores/main/account_data.py | 41 +++++++++++++++++++ synapse/storage/data_stores/main/tags.py | 10 +++++ synapse/storage/prepare_database.py | 1 + 3 files changed, 52 insertions(+) diff --git a/synapse/storage/data_stores/main/account_data.py b/synapse/storage/data_stores/main/account_data.py index f1b9c3dd42fa..b58f04d00dff 100644 --- a/synapse/storage/data_stores/main/account_data.py +++ b/synapse/storage/data_stores/main/account_data.py @@ -345,6 +345,14 @@ def add_account_data_to_room(self, user_id, room_id, account_data_type, content) lock=False, ) + # it's theoretically possible for the above to succeed and the + # below to fail - in which case we might reuse a stream id on + # restart, and the above update might not get propagated. That + # doesn't sound any worse than the whole update getting lost, + # which is what would happen if we combined the two into one + # transaction. + yield self._update_max_stream_id(next_id) + self._account_data_stream_cache.entity_has_changed(user_id, next_id) self.get_account_data_for_user.invalidate((user_id,)) self.get_account_data_for_room.invalidate((user_id, room_id)) @@ -379,6 +387,18 @@ def add_account_data_for_user(self, user_id, account_data_type, content): lock=False, ) + # it's theoretically possible for the above to succeed and the + # below to fail - in which case we might reuse a stream id on + # restart, and the above update might not get propagated. That + # doesn't sound any worse than the whole update getting lost, + # which is what would happen if we combined the two into one + # transaction. + # + # Note: This is only here for backwards compat to allow admins to + # roll back to a previous Synapse version. Next time we update the + # database version we can remove this table. + yield self._update_max_stream_id(next_id) + self._account_data_stream_cache.entity_has_changed(user_id, next_id) self.get_account_data_for_user.invalidate((user_id,)) self.get_global_account_data_by_type_for_user.invalidate( @@ -387,3 +407,24 @@ def add_account_data_for_user(self, user_id, account_data_type, content): result = self._account_data_id_gen.get_current_token() return result + + def _update_max_stream_id(self, next_id): + """Update the max stream_id + + Args: + next_id(int): The the revision to advance to. + """ + + # Note: This is only here for backwards compat to allow admins to + # roll back to a previous Synapse version. Next time we update the + # database version we can remove this table. + + def _update(txn): + update_max_id_sql = ( + "UPDATE account_data_max_stream_id" + " SET stream_id = ?" + " WHERE stream_id < ?" + ) + txn.execute(update_max_id_sql, (next_id, next_id)) + + return self.db.runInteraction("update_account_data_max_stream_id", _update) diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py index 4099a959fd23..421901830249 100644 --- a/synapse/storage/data_stores/main/tags.py +++ b/synapse/storage/data_stores/main/tags.py @@ -233,6 +233,16 @@ def _update_revision_txn(self, txn, user_id, room_id, next_id): self._account_data_stream_cache.entity_has_changed, user_id, next_id ) + # Note: This is only here for backwards compat to allow admins to + # roll back to a previous Synapse version. Next time we update the + # database version we can remove this table. + update_max_id_sql = ( + "UPDATE account_data_max_stream_id" + " SET stream_id = ?" + " WHERE stream_id < ?" + ) + txn.execute(update_max_id_sql, (next_id, next_id)) + update_sql = ( "UPDATE room_tags_revisions" " SET stream_id = ?" diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index b95434f031fc..9cc3b51fe6a1 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -33,6 +33,7 @@ # schema files, so the users will be informed on server restarts. # XXX: If you're about to bump this to 59 (or higher) please create an update # that drops the unused `cache_invalidation_stream` table, as per #7436! +# XXX: Also add an update to drop `account_data_max_stream_id` as per #7656! SCHEMA_VERSION = 58 dir_path = os.path.abspath(os.path.dirname(__file__))