This repository has been archived by the owner on Apr 26, 2024. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Add background update for add chain cover index #9029
Merged
Merged
Changes from 9 commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
30c6411
Split out chain calculation
erikjohnston 7b7b325
Add background update to add chain cover info
erikjohnston 70571ae
Add test
erikjohnston e94b894
Newsfile
erikjohnston dfea16c
Fix port script for auth chain index
erikjohnston be79c61
Update synapse/storage/databases/main/schema/delta/59/06chain_cover_i…
erikjohnston 3d069a5
Fixup changelogs
erikjohnston 56f4c47
We still want to do set(..)
erikjohnston 4c4541e
Revert change to simple_delete_txn
erikjohnston e8e7787
Fix tests
erikjohnston File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
Improve efficiency of large state resolutions for new rooms. | ||
Improve efficiency of large state resolutions. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Improve efficiency of large state resolutions. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,13 +14,13 @@ | |
# limitations under the License. | ||
|
||
import logging | ||
from typing import List, Tuple | ||
from typing import Dict, List, Optional, Tuple | ||
|
||
from synapse.api.constants import EventContentFields | ||
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS | ||
from synapse.events import make_event_from_dict | ||
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause | ||
from synapse.storage.database import DatabasePool | ||
from synapse.storage.database import DatabasePool, make_tuple_comparison_clause | ||
from synapse.storage.types import Cursor | ||
from synapse.types import JsonDict | ||
|
||
|
@@ -108,6 +108,10 @@ def __init__(self, database: DatabasePool, db_conn, hs): | |
"rejected_events_metadata", self._rejected_events_metadata, | ||
) | ||
|
||
self.db_pool.updates.register_background_update_handler( | ||
"chain_cover", self._chain_cover_index, | ||
) | ||
|
||
async def _background_reindex_fields_sender(self, progress, batch_size): | ||
target_min_stream_id = progress["target_min_stream_id_inclusive"] | ||
max_stream_id = progress["max_stream_id_exclusive"] | ||
|
@@ -706,3 +710,187 @@ def get_rejected_events( | |
) | ||
|
||
return len(results) | ||
|
||
async def _chain_cover_index(self, progress: dict, batch_size: int) -> int:
    """A background update that iterates over all rooms and generates the
    chain cover index for them.

    Args:
        progress: The persisted progress of this background update. The keys
            read are `current_room_id`, `finished`, `last_depth`,
            `last_stream` and `has_set_room_has_chain_index`; all are
            optional and default to "start from the first room".
        batch_size: The maximum number of events fetched per query.

    Returns:
        The number of events processed by this invocation, or 0 once there
        are no rooms left (in which case the background update is ended).
    """

    current_room_id = progress.get("current_room_id", "")

    # Have we finished processing the current room.
    finished = progress.get("finished", True)

    # Where we've processed up to in the room, defaults to the start of the
    # room.
    last_depth = progress.get("last_depth", -1)
    last_stream = progress.get("last_stream", -1)

    # Have we set the `has_auth_chain_index` for the room yet.
    #
    # Progress used to be persisted under the key `has_auth_chain_index`
    # while being read back under `has_set_room_has_chain_index`, so the flag
    # was always lost on resume. We now persist it under the key we read, and
    # fall back to the legacy key for progress rows written before the fix.
    has_set_room_has_chain_index = progress.get(
        "has_set_room_has_chain_index",
        progress.get("has_auth_chain_index", False),
    )

    if finished:
        # If we've finished with the previous room (or it's our first
        # iteration) we move on to the next room.

        def _get_next_room(txn: Cursor) -> Optional[str]:
            sql = """
                SELECT room_id FROM rooms
                WHERE room_id > ?
                    AND (
                        NOT has_auth_chain_index
                        OR has_auth_chain_index IS NULL
                    )
                ORDER BY room_id
                LIMIT 1
            """
            txn.execute(sql, (current_room_id,))
            row = txn.fetchone()
            if row:
                return row[0]

            return None

        current_room_id = await self.db_pool.runInteraction(
            "_chain_cover_index", _get_next_room
        )
        if not current_room_id:
            # No rooms left without an index: the background update is done.
            await self.db_pool.updates._end_background_update("chain_cover")
            return 0

        logger.debug("Adding chain cover to %s", current_room_id)

    def _calculate_auth_chain(
        txn: Cursor, last_depth: int, last_stream: int
    ) -> Tuple[int, int, int]:
        # Get the next set of events in the room (that we haven't already
        # computed chain cover for). We do this in topological order.

        # We want to do a `(topological_ordering, stream_ordering) > (?,?)`
        # comparison, but that is not supported on older SQLite versions
        tuple_clause, tuple_args = make_tuple_comparison_clause(
            self.database_engine,
            [
                ("topological_ordering", last_depth),
                ("stream_ordering", last_stream),
            ],
        )

        sql = """
            SELECT
                event_id, state_events.type, state_events.state_key,
                topological_ordering, stream_ordering
            FROM events
            INNER JOIN state_events USING (event_id)
            LEFT JOIN event_auth_chains USING (event_id)
            LEFT JOIN event_auth_chain_to_calculate USING (event_id)
            WHERE events.room_id = ?
                AND event_auth_chains.event_id IS NULL
                AND event_auth_chain_to_calculate.event_id IS NULL
                AND %(tuple_cmp)s
            ORDER BY topological_ordering, stream_ordering
            LIMIT ?
        """ % {
            "tuple_cmp": tuple_clause,
        }

        args = [current_room_id]
        args.extend(tuple_args)
        args.append(batch_size)

        txn.execute(sql, args)
        rows = txn.fetchall()

        # Put the results in the necessary format for
        # `_add_chain_cover_index`
        event_to_room_id = {row[0]: current_room_id for row in rows}
        event_to_types = {row[0]: (row[1], row[2]) for row in rows}

        # If nothing was returned we keep the previous position unchanged.
        new_last_depth = rows[-1][3] if rows else last_depth  # type: int
        new_last_stream = rows[-1][4] if rows else last_stream  # type: int

        count = len(rows)

        # We also need to fetch the auth events for them.
        auth_events = self.db_pool.simple_select_many_txn(
            txn,
            table="event_auth",
            column="event_id",
            iterable=event_to_room_id,
            keyvalues={},
            retcols=("event_id", "auth_id"),
        )

        event_to_auth_chain = {}  # type: Dict[str, List[str]]
        for row in auth_events:
            event_to_auth_chain.setdefault(row["event_id"], []).append(
                row["auth_id"]
            )

        # Calculate and persist the chain cover index for this set of events.
        #
        # Annoyingly we need to gut wrench into the persist event store so
        # that we can reuse the function to calculate the chain cover for
        # rooms.
        self.hs.get_datastores().persist_events._add_chain_cover_index(
            txn, event_to_room_id, event_to_types, event_to_auth_chain,
        )

        return new_last_depth, new_last_stream, count

    last_depth, last_stream, count = await self.db_pool.runInteraction(
        "_chain_cover_index", _calculate_auth_chain, last_depth, last_stream
    )

    total_rows_processed = count

    if count < batch_size and not has_set_room_has_chain_index:
        # If we've done all the events in the room we flip the
        # `has_auth_chain_index` in the DB. Note that it's possible for
        # further events to be persisted between the above and setting the
        # flag without having the chain cover calculated for them. This is
        # fine as a) the code gracefully handles these cases and b) we'll
        # calculate them below.

        await self.db_pool.simple_update(
            table="rooms",
            keyvalues={"room_id": current_room_id},
            updatevalues={"has_auth_chain_index": True},
            desc="_chain_cover_index",
        )
        has_set_room_has_chain_index = True

        # Handle any events that might have raced with us flipping the
        # bit above.
        last_depth, last_stream, count = await self.db_pool.runInteraction(
            "_chain_cover_index", _calculate_auth_chain, last_depth, last_stream
        )

        total_rows_processed += count

        # Note that at this point it's technically possible that more events
        # than our `batch_size` have been persisted without their chain
        # cover, so we need to continue processing this room if the last
        # count returned was equal to the `batch_size`.

    # Deliberately compare `count` (the size of the *last* batch fetched) and
    # not `total_rows_processed`: only the last batch tells us whether there
    # may still be outstanding events in the room.
    if count < batch_size:
        # We've finished calculating the index for this room, move on to the
        # next room.
        await self.db_pool.updates._background_update_progress(
            "chain_cover", {"current_room_id": current_room_id, "finished": True},
        )
    else:
        # We still have outstanding events to calculate the index for.
        await self.db_pool.updates._background_update_progress(
            "chain_cover",
            {
                "current_room_id": current_room_id,
                "last_depth": last_depth,
                "last_stream": last_stream,
                "has_set_room_has_chain_index": has_set_room_has_chain_index,
                "finished": False,
            },
        )

    return total_rows_processed
17 changes: 17 additions & 0 deletions
17
synapse/storage/databases/main/schema/delta/59/06chain_cover_index.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
/* Copyright 2020 The Matrix.org Foundation C.I.C | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES | ||
(5906, 'chain_cover', '{}', 'rejected_events_metadata'); |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a bugfix or is the code used differently due to the background update? (Same for the changes below in this file.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The latter: the BG update will no longer add empty entries for create events to the `event_to_auth_chain` map.