From aeaa36d9242e72609c56bfd3207502641e24b4fb Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Tue, 9 Aug 2022 18:16:09 -0500
Subject: [PATCH 01/13] Instrument the federation/backfill part of /messages

Split out from https://github.com/matrix-org/synapse/pull/13440
---
 synapse/federation/federation_client.py       | 19 +++-
 synapse/handlers/federation.py                |  7 +-
 synapse/handlers/federation_event.py          | 87 ++++++++++++++++---
 synapse/logging/opentracing.py                | 13 +++
 synapse/storage/controllers/persist_events.py | 26 ++++--
 synapse/storage/controllers/state.py          |  5 +-
 .../databases/main/event_federation.py        |  7 ++
 synapse/storage/databases/main/events.py      |  2 +
 .../storage/databases/main/events_worker.py   | 18 ++++
 .../util/partial_state_events_tracker.py      |  3 +
 10 files changed, 164 insertions(+), 23 deletions(-)

diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 54ffbd817095..19f744647ead 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -61,7 +61,7 @@
 )
 from synapse.federation.transport.client import SendJoinResponse
 from synapse.http.types import QueryParams
-from synapse.logging.opentracing import trace
+from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
 from synapse.types import JsonDict, UserID, get_domain_from_id
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.expiringcache import ExpiringCache
@@ -235,6 +235,7 @@ async def claim_client_keys(
         )
 
     @trace
+    @tag_args
     async def backfill(
         self, dest: str, room_id: str, limit: int, extremities: Collection[str]
     ) -> Optional[List[EventBase]]:
@@ -337,6 +338,8 @@ async def get_pdu_from_destination_raw(
 
         return None
 
+    @trace
+    @tag_args
     async def get_pdu(
         self,
         destinations: Iterable[str],
@@ -448,6 +451,8 @@ async def get_pdu(
 
         return event_copy
 
+    @trace
+    @tag_args
     async def get_room_state_ids(
         self, destination: str, room_id: str, event_id: str
     ) -> Tuple[List[str], List[str]]:
@@ -467,6 +472,15 @@ async def get_room_state_ids(
         state_event_ids = result["pdu_ids"]
         auth_event_ids = result.get("auth_chain_ids", [])
 
+        set_tag(
+            SynapseTags.RESULT_PREFIX + f"state_event_ids ({len(state_event_ids)})",
+            str(state_event_ids),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + f"auth_event_ids ({len(auth_event_ids)})",
+            str(auth_event_ids),
+        )
+
         if not isinstance(state_event_ids, list) or not isinstance(
             auth_event_ids, list
         ):
@@ -474,6 +488,8 @@ async def get_room_state_ids(
 
         return state_event_ids, auth_event_ids
 
+    @trace
+    @tag_args
     async def get_room_state(
         self,
         destination: str,
@@ -533,6 +549,7 @@ async def get_room_state(
 
         return valid_state_events, valid_auth_events
 
+    @trace
    async def _check_sigs_and_hash_and_fetch(
         self,
         origin: str,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 5042236742da..dd9f37256353 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -59,7 +59,7 @@
 from synapse.federation.federation_client import InvalidResponseError
 from synapse.http.servlet import assert_params_in_dict
 from synapse.logging.context import nested_logging_context
-from synapse.logging.opentracing import trace
+from synapse.logging.opentracing import SynapseTags, set_tag, trace
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.module_api import NOT_SPAM
 from synapse.replication.http.federation import (
@@ -370,6 +370,11 @@ async def _maybe_backfill_inner(
         logger.debug(
             "_maybe_backfill_inner: extremities_to_request %s", extremities_to_request
         )
+        set_tag(
+            SynapseTags.RESULT_PREFIX
+            + f"extremities_to_request {len(extremities_to_request)}",
+            str(extremities_to_request),
+        )
 
         # Now we need to decide which hosts to hit first.
 
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 8968b705d43a..1a444a67481a 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -59,7 +59,13 @@
 from synapse.events.snapshot import EventContext
 from synapse.federation.federation_client import InvalidResponseError
 from synapse.logging.context import nested_logging_context
-from synapse.logging.opentracing import trace
+from synapse.logging.opentracing import (
+    SynapseTags,
+    set_tag,
+    start_active_span,
+    tag_args,
+    trace,
+)
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
 from synapse.replication.http.federation import (
@@ -410,6 +416,7 @@ async def check_join_restrictions(
                 prev_member_event,
             )
 
+    @trace
     async def process_remote_join(
         self,
         origin: str,
@@ -715,7 +722,7 @@ async def _get_missing_events_for_pdu(
 
     @trace
     async def _process_pulled_events(
-        self, origin: str, events: Iterable[EventBase], backfilled: bool
+        self, origin: str, events: List[EventBase], backfilled: bool
     ) -> None:
         """Process a batch of events we have pulled from a remote server
 
@@ -730,6 +737,11 @@ async def _process_pulled_events(
             backfilled: True if this is part of a historical batch of events (inhibits
                 notification to clients, and validation of device keys.)
         """
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(events)})",
+            str([event.event_id for event in events]),
+        )
+        set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
         logger.debug(
             "processing pulled backfilled=%s events=%s",
             backfilled,
@@ -753,6 +765,7 @@ async def _process_pulled_events(
             await self._process_pulled_event(origin, ev, backfilled=backfilled)
 
     @trace
+    @tag_args
     async def _process_pulled_event(
         self, origin: str, event: EventBase, backfilled: bool
     ) -> None:
@@ -854,6 +867,7 @@ async def _process_pulled_event(
         else:
             raise
 
+    @trace
     async def _compute_event_context_with_maybe_missing_prevs(
         self, dest: str, event: EventBase
     ) -> EventContext:
@@ -970,6 +984,8 @@ async def _compute_event_context_with_maybe_missing_prevs(
             event, state_ids_before_event=state_map, partial_state=partial_state
         )
 
+    @trace
+    @tag_args
     async def _get_state_ids_after_missing_prev_event(
         self,
         destination: str,
@@ -1009,10 +1025,10 @@ async def _get_state_ids_after_missing_prev_event(
         logger.debug("Fetching %i events from cache/store", len(desired_events))
         have_events = await self._store.have_seen_events(room_id, desired_events)
 
-        missing_desired_events = desired_events - have_events
+        missing_desired_event_ids = desired_events - have_events
         logger.debug(
             "We are missing %i events (got %i)",
-            len(missing_desired_events),
+            len(missing_desired_event_ids),
             len(have_events),
         )
 
@@ -1024,13 +1040,24 @@ async def _get_state_ids_after_missing_prev_event(
         #   already have a bunch of the state events. It would be nice if the
         #   federation api gave us a way of finding out which we actually need.
 
-        missing_auth_events = set(auth_event_ids) - have_events
-        missing_auth_events.difference_update(
-            await self._store.have_seen_events(room_id, missing_auth_events)
+        missing_auth_event_ids = set(auth_event_ids) - have_events
+        missing_auth_event_ids.difference_update(
+            await self._store.have_seen_events(room_id, missing_auth_event_ids)
         )
-        logger.debug("We are also missing %i auth events", len(missing_auth_events))
+        logger.debug("We are also missing %i auth events", len(missing_auth_event_ids))
+
+        missing_event_ids = missing_desired_event_ids | missing_auth_event_ids
 
-        missing_events = missing_desired_events | missing_auth_events
+        set_tag(
+            SynapseTags.RESULT_PREFIX
+            + f"missing_auth_event_ids ({len(missing_auth_event_ids)})",
+            str(missing_auth_event_ids),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX
+            + f"missing_desired_event_ids ({len(missing_desired_event_ids)})",
+            str(missing_desired_event_ids),
+        )
 
         # Making an individual request for each of 1000s of events has a lot of
         # overhead. On the other hand, we don't really want to fetch all of the events
@@ -1041,13 +1068,13 @@ async def _get_state_ids_after_missing_prev_event(
         #
         # TODO: might it be better to have an API which lets us do an aggregate event
         # request
-        if (len(missing_events) * 10) >= len(auth_event_ids) + len(state_event_ids):
+        if (len(missing_event_ids) * 10) >= len(auth_event_ids) + len(state_event_ids):
             logger.debug("Requesting complete state from remote")
             await self._get_state_and_persist(destination, room_id, event_id)
         else:
-            logger.debug("Fetching %i events from remote", len(missing_events))
+            logger.debug("Fetching %i events from remote", len(missing_event_ids))
             await self._get_events_and_persist(
-                destination=destination, room_id=room_id, event_ids=missing_events
+                destination=destination, room_id=room_id, event_ids=missing_event_ids
             )
 
         # We now need to fill out the state map, which involves fetching the
@@ -1104,6 +1131,10 @@ async def _get_state_ids_after_missing_prev_event(
                 event_id,
                 failed_to_fetch,
             )
+            set_tag(
+                SynapseTags.RESULT_PREFIX + f"failed_to_fetch ({len(failed_to_fetch)})",
+                str(failed_to_fetch),
+            )
 
         if remote_event.is_state() and remote_event.rejected_reason is None:
             state_map[
@@ -1112,6 +1143,8 @@ async def _get_state_ids_after_missing_prev_event(
 
         return state_map
 
+    @trace
+    @tag_args
     async def _get_state_and_persist(
         self, destination: str, room_id: str, event_id: str
     ) -> None:
@@ -1133,6 +1166,7 @@ async def _get_state_and_persist(
             destination=destination, room_id=room_id, event_ids=(event_id,)
         )
 
+    @trace
     async def _process_received_pdu(
         self,
         origin: str,
@@ -1283,6 +1317,7 @@ async def _resync_device(self, sender: str) -> None:
         except Exception:
             logger.exception("Failed to resync device for %s", sender)
 
+    @trace
     async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
         """Handles backfilling the insertion event when we receive a marker
         event that points to one.
@@ -1414,6 +1449,8 @@ async def backfill_event_id(
 
         return event_from_response
 
+    @trace
+    @tag_args
     async def _get_events_and_persist(
         self, destination: str, room_id: str, event_ids: Collection[str]
     ) -> None:
@@ -1459,6 +1496,7 @@ async def get_event(event_id: str) -> None:
         logger.info("Fetched %i events of %i requested", len(events), len(event_ids))
         await self._auth_and_persist_outliers(room_id, events)
 
+    @trace
     async def _auth_and_persist_outliers(
         self, room_id: str, events: Iterable[EventBase]
     ) -> None:
@@ -1477,6 +1515,12 @@ async def _auth_and_persist_outliers(
         """
         event_map = {event.event_id: event for event in events}
 
+        event_ids = event_map.keys()
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(event_ids)})",
+            str(event_ids),
+        )
+
         # filter out any events we have already seen. This might happen because
         # the events were eagerly pushed to us (eg, during a room join), or because
         # another thread has raced against us since we decided to request the event.
@@ -1593,6 +1637,7 @@ async def prep(event: EventBase) -> None:
             backfilled=True,
         )
 
+    @trace
     async def _check_event_auth(
         self, origin: Optional[str], event: EventBase, context: EventContext
     ) -> None:
@@ -1631,6 +1676,11 @@ async def _check_event_auth(
         claimed_auth_events = await self._load_or_fetch_auth_events_for_event(
             origin, event
         )
+        set_tag(
+            SynapseTags.RESULT_PREFIX
+            + f"claimed_auth_events ({len(claimed_auth_events)})",
+            str([ev.event_id for ev in claimed_auth_events]),
+        )
 
         # ... and check that the event passes auth at those auth events.
         # https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
@@ -1728,6 +1778,7 @@ async def _check_event_auth(
             )
             context.rejected = RejectedReason.AUTH_ERROR
 
+    @trace
     async def _maybe_kick_guest_users(self, event: EventBase) -> None:
         if event.type != EventTypes.GuestAccess:
             return
@@ -1935,6 +1986,8 @@ async def _load_or_fetch_auth_events_for_event(
         # instead we raise an AuthError, which will make the caller ignore it.
         raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found")
 
+    @trace
+    @tag_args
     async def _get_remote_auth_chain_for_event(
         self, destination: str, room_id: str, event_id: str
     ) -> None:
@@ -1963,6 +2016,7 @@ async def _get_remote_auth_chain_for_event(
 
         await self._auth_and_persist_outliers(room_id, remote_auth_events)
 
+    @trace
     async def _run_push_actions_and_persist_event(
         self, event: EventBase, context: EventContext, backfilled: bool = False
     ) -> None:
@@ -2071,8 +2125,13 @@ async def persist_events_and_notify(
                 self._message_handler.maybe_schedule_expiry(event)
 
         if not backfilled:  # Never notify for backfilled events
-            for event in events:
-                await self._notify_persisted_event(event, max_stream_token)
+            with start_active_span("notify_persisted_events"):
+                set_tag(
+                    SynapseTags.RESULT_PREFIX + f"event_ids ({len(events)})",
+                    str([ev.event_id for ev in events]),
+                )
+                for event in events:
+                    await self._notify_persisted_event(event, max_stream_token)
 
         return max_stream_token.stream
 
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index d1fa2cf8ae67..9ddcb35bf10c 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -310,6 +310,19 @@ class SynapseTags:
     # The name of the external cache
     CACHE_NAME = "cache.name"
 
+    # Used to tag function arguments
+    #
+    # Tag a named arg. The name of the argument should be appended to this
+    # prefix
+    FUNC_ARG_PREFIX = "ARG."
+    # Tag extra variadic number of positional arguments (`def foo(first, second, *extras)`)
+    FUNC_ARGS = "args"
+    # Tag keyword args
+    FUNC_KWARGS = "kwargs"
+
+    # Some intermediate result that's interesting to the function
+    RESULT_PREFIX = "RESULT."
+
 
 class SynapseBaggage:
     FORCE_TRACING = "synapse-force-tracing"
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index cf98b0ab48f8..854acec31ada 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -45,8 +45,14 @@
 from synapse.api.constants import EventTypes, Membership
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
-from synapse.logging import opentracing
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
+from synapse.logging.opentracing import (
+    SynapseTags,
+    active_span,
+    set_tag,
+    start_active_span_follows_from,
+    trace,
+)
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.controllers.state import StateStorageController
 from synapse.storage.databases import Databases
@@ -223,7 +229,7 @@ async def add_to_queue(
             queue.append(end_item)
 
         # also add our active opentracing span to the item so that we get a link back
-        span = opentracing.active_span()
+        span = active_span()
         if span:
             end_item.parent_opentracing_span_contexts.append(span.context)
 
@@ -234,7 +240,7 @@ async def add_to_queue(
         res = await make_deferred_yieldable(end_item.deferred.observe())
 
         # add another opentracing span which links to the persist trace.
-        with opentracing.start_active_span_follows_from(
+        with start_active_span_follows_from(
             f"{task.name}_complete", (end_item.opentracing_span_context,)
         ):
             pass
@@ -266,7 +272,7 @@ async def handle_queue_loop() -> None:
                 queue = self._get_drainining_queue(room_id)
                 for item in queue:
                     try:
-                        with opentracing.start_active_span_follows_from(
+                        with start_active_span_follows_from(
                             item.task.name,
                             item.parent_opentracing_span_contexts,
                             inherit_force_tracing=True,
@@ -355,7 +361,7 @@ async def _process_event_persist_queue_task(
                 f"Found an unexpected task type in event persistence queue: {task}"
             )
 
-    @opentracing.trace
+    @trace
     async def persist_events(
         self,
         events_and_contexts: Iterable[Tuple[EventBase, EventContext]],
@@ -380,9 +386,17 @@ async def persist_events(
             PartialStateConflictError: if attempting to persist a partial state event in
                 a room that has been un-partial stated.
         """
+        event_ids: List[str] = []
         partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
         for event, ctx in events_and_contexts:
             partitioned.setdefault(event.room_id, []).append((event, ctx))
+            event_ids.append(event.event_id)
+
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(event_ids)})",
+            str(event_ids),
+        )
+        set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
 
         async def enqueue(
             item: Tuple[str, List[Tuple[EventBase, EventContext]]]
@@ -418,7 +432,7 @@ async def enqueue(
             self.main_store.get_room_max_token(),
         )
 
-    @opentracing.trace
+    @trace
     async def persist_event(
         self, event: EventBase, context: EventContext, backfilled: bool = False
     ) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]:
diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index 0d480f101432..d622505741b7 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -29,7 +29,7 @@
 
 from synapse.api.constants import EventTypes
 from synapse.events import EventBase
-from synapse.logging.opentracing import trace
+from synapse.logging.opentracing import tag_args, trace
 from synapse.storage.state import StateFilter
 from synapse.storage.util.partial_state_events_tracker import (
     PartialCurrentStateTracker,
@@ -228,6 +228,7 @@ async def get_state_for_events(
         return {event: event_to_state[event] for event in event_ids}
 
     @trace
+    @tag_args
     async def get_state_ids_for_events(
         self,
         event_ids: Collection[str],
@@ -332,6 +333,7 @@ def get_state_for_groups(
         )
 
     @trace
+    @tag_args
     async def get_state_group_for_events(
         self,
         event_ids: Collection[str],
@@ -473,6 +475,7 @@ async def get_current_state_deltas(
             prev_stream_id, max_stream_id
         )
 
+    @trace
     async def get_current_state(
         self, room_id: str, state_filter: Optional[StateFilter] = None
     ) -> StateMap[EventBase]:
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index eec55b647857..1ca7c762fd91 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -33,6 +33,7 @@
 from synapse.api.errors import StoreError
 from synapse.api.room_versions import EventFormatVersions, RoomVersion
 from synapse.events import EventBase, make_event_from_dict
+from synapse.logging.opentracing import tag_args, trace
 from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import (
@@ -709,6 +710,8 @@ def _get_auth_chain_difference_txn(
         # Return all events where not all sets can reach them.
         return {eid for eid, n in event_to_missing_sets.items() if n}
 
+    @trace
+    @tag_args
     async def get_oldest_event_ids_with_depth_in_room(
         self, room_id: str
     ) -> List[Tuple[str, int]]:
@@ -767,6 +770,7 @@ def get_oldest_event_ids_with_depth_in_room_txn(
             room_id,
         )
 
+    @trace
     async def get_insertion_event_backward_extremities_in_room(
         self, room_id: str
     ) -> List[Tuple[str, int]]:
@@ -1339,6 +1343,8 @@ def _get_missing_events(
         event_results.reverse()
         return event_results
 
+    @trace
+    @tag_args
     async def get_successor_events(self, event_id: str) -> List[str]:
         """Fetch all events that have the given event as a prev event
 
@@ -1375,6 +1381,7 @@ def _delete_old_forward_extrem_cache_txn(txn: LoggingTransaction) -> None:
             _delete_old_forward_extrem_cache_txn,
         )
 
+    @trace
     async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
         await self.db_pool.simple_upsert(
             table="insertion_event_extremities",
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 5560b38a4832..a4010ee28dca 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -40,6 +40,7 @@
 from synapse.api.room_versions import RoomVersions
 from synapse.events import EventBase, relation_from_event
 from synapse.events.snapshot import EventContext
+from synapse.logging.opentracing import trace
 from synapse.storage._base import db_to_json, make_in_list_sql_clause
 from synapse.storage.database import (
     DatabasePool,
@@ -145,6 +146,7 @@ def __init__(
         self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen
         self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen
 
+    @trace
     async def _persist_events_and_state_updates(
         self,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index e9ff6cfb3455..03de3d89f083 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -54,6 +54,7 @@
     current_context,
     make_deferred_yieldable,
 )
+from synapse.logging.opentracing import start_active_span, tag_args, trace
 from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
@@ -430,6 +431,8 @@ async def get_events(
 
         return {e.event_id: e for e in events}
 
+    @trace
+    @tag_args
     async def get_events_as_list(
         self,
         event_ids: Collection[str],
@@ -1092,6 +1095,11 @@ async def _get_events_from_db(
         fetched_events: Dict[str, _EventRow] = {}
         events_to_fetch = event_ids
 
+        is_recording_redaction_trace = False
+        fetching_redactions_tracing_span_cm = start_active_span(
+            "recursively fetching redactions"
+        )
+
         while events_to_fetch:
             row_map = await self._enqueue_events(events_to_fetch)
 
@@ -1107,6 +1115,14 @@ async def _get_events_from_db(
             events_to_fetch = redaction_ids.difference(fetched_event_ids)
             if events_to_fetch:
                 logger.debug("Also fetching redaction events %s", events_to_fetch)
+                # Start tracing how long it takes for us to get all of the redactions
+                if not is_recording_redaction_trace:
+                    fetching_redactions_tracing_span_cm.__enter__()
+                    is_recording_redaction_trace = True
+
+        # Only stop recording if we were recording in the first place
+        if is_recording_redaction_trace:
+            fetching_redactions_tracing_span_cm.__exit__(None, None, None)
 
         # build a map from event_id to EventBase
         event_map: Dict[str, EventBase] = {}
@@ -1424,6 +1440,8 @@ async def have_events_in_timeline(self, event_ids: Iterable[str]) -> Set[str]:
 
         return {r["event_id"] for r in rows}
 
+    @trace
+    @tag_args
     async def have_seen_events(
         self, room_id: str, event_ids: Iterable[str]
     ) -> Set[str]:
diff --git a/synapse/storage/util/partial_state_events_tracker.py b/synapse/storage/util/partial_state_events_tracker.py
index 466e5137f2d3..b4bf49dace21 100644
--- a/synapse/storage/util/partial_state_events_tracker.py
+++ b/synapse/storage/util/partial_state_events_tracker.py
@@ -20,6 +20,7 @@
 from twisted.internet.defer import Deferred
 
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
+from synapse.logging.opentracing import trace_with_opname
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.room import RoomWorkerStore
 from synapse.util import unwrapFirstError
@@ -58,6 +59,7 @@ def notify_un_partial_stated(self, event_id: str) -> None:
         for o in observers:
             o.callback(None)
 
+    @trace_with_opname("PartialStateEventsTracker.await_full_state")
     async def await_full_state(self, event_ids: Collection[str]) -> None:
         """Wait for all the given events to have full state.
 
@@ -151,6 +153,7 @@ def notify_un_partial_stated(self, room_id: str) -> None:
         for o in observers:
             o.callback(None)
 
+    @trace_with_opname("PartialCurrentStateTracker.await_full_state")
     async def await_full_state(self, room_id: str) -> None:
         # We add the deferred immediately so that the DB call to check for
         # partial state doesn't race when we unpartial the room.

From 6a389cd3e4d68a40207879d22424bc1d99c51f16 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 10 Aug 2022 00:26:37 -0500
Subject: [PATCH 02/13] Add changelog

---
 changelog.d/13489.misc | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 changelog.d/13489.misc

diff --git a/changelog.d/13489.misc b/changelog.d/13489.misc
new file mode 100644
index 000000000000..4b433a510781
--- /dev/null
+++ b/changelog.d/13489.misc
@@ -0,0 +1 @@
+Instrument `/messages` for understandable traces in Jaeger.

From eb20203f120f52c17c754bcb5704c7d52e7d9d24 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 10 Aug 2022 00:30:25 -0500
Subject: [PATCH 03/13] Also explain to append

---
 synapse/logging/opentracing.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 9ddcb35bf10c..3d96c9894031 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -312,7 +312,7 @@ class SynapseTags:
 
     # Used to tag function arguments
     #
-    # Tag a named arg. The name of the argument should be appended to this
+    # Tag a named arg. The name of the argument should be appended to this.
     # prefix
     FUNC_ARG_PREFIX = "ARG."
     # Tag extra variadic number of positional arguments (`def foo(first, second, *extras)`)
@@ -320,7 +320,8 @@ class SynapseTags:
     # Tag keyword args
     FUNC_KWARGS = "kwargs"
 
-    # Some intermediate result that's interesting to the function
+    # Some intermediate result that's interesting to the function. The label for
+    # the result should be appended to this.
     RESULT_PREFIX = "RESULT."
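The tagging convention the patches above establish — append a label to `ARG.` for function arguments and to `RESULT.` for interesting intermediate results — looks roughly like this in use. This is a minimal, hypothetical sketch: `hypothetical_backfill_step` and its variables are invented for illustration, while `trace`, `tag_args`, `set_tag`, and `SynapseTags` are the real helpers from `synapse.logging.opentracing` exercised by the patches above.

    from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace

    @trace      # opens a span named after the function
    @tag_args   # tags the call's arguments onto that span
    async def hypothetical_backfill_step(room_id: str) -> None:
        # ... do some work and collect an interesting intermediate result ...
        missing_event_ids = ["$abc", "$def"]  # placeholder data
        # Appending a label to RESULT_PREFIX makes the tag show up in
        # Jaeger as "RESULT.missing_event_ids".
        set_tag(SynapseTags.RESULT_PREFIX + "missing_event_ids", str(missing_event_ids))

Note that values are stringified before tagging, matching how every `set_tag` call in the patches above wraps its value in `str(...)`.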
From e3c2e117dcbe32df050b513452b8b19cd4ef1902 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Wed, 10 Aug 2022 00:37:00 -0500
Subject: [PATCH 04/13] Make sure to re-use the tags for @tag_args

---
 synapse/logging/opentracing.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 3d96c9894031..fbfa6709bd98 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -981,9 +981,9 @@ def _wrapping_logic(
         # first argument only if it's named `self` or `cls`. This isn't fool-proof
         # but handles the idiomatic cases.
         for i, arg in enumerate(args[1:], start=1):  # type: ignore[index]
-            set_tag("ARG_" + argspec.args[i], str(arg))
-        set_tag("args", str(args[len(argspec.args) :]))  # type: ignore[index]
-        set_tag("kwargs", str(kwargs))
+            set_tag(SynapseTags.FUNC_ARG_PREFIX + argspec.args[i], str(arg))
+        set_tag(SynapseTags.FUNC_ARGS, str(args[len(argspec.args) :]))  # type: ignore[index]
+        set_tag(SynapseTags.FUNC_KWARGS, str(kwargs))
         yield
 
     return _custom_sync_async_decorator(func, _wrapping_logic)

From caa5ee94276e5fa41442b5fc7045cc5be3f263d8 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Thu, 11 Aug 2022 11:37:50 -0500
Subject: [PATCH 05/13] `Collection` for multiple iterable readable list

Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>
---
 synapse/handlers/federation_event.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 1a444a67481a..aed7ed43acac 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -722,7 +722,7 @@ async def _get_missing_events_for_pdu(
 
     @trace
     async def _process_pulled_events(
-        self, origin: str, events: List[EventBase], backfilled: bool
+        self, origin: str, events: Collection[EventBase], backfilled: bool
     ) -> None:
         """Process a batch of events we have pulled from a remote server

From 1f4911bf9e4528f6eded68cae4c736289bae695e Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Thu, 11 Aug 2022 11:50:19 -0500
Subject: [PATCH 06/13] Record exception in span

See https://github.com/matrix-org/synapse/pull/13489#discussion_r943444260
---
 .../storage/databases/main/events_worker.py   | 47 +++++++++++--------
 1 file changed, 28 insertions(+), 19 deletions(-)

diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 03de3d89f083..0d7ed757eb32 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1100,25 +1100,34 @@ async def _get_events_from_db(
             "recursively fetching redactions"
         )
 
-        while events_to_fetch:
-            row_map = await self._enqueue_events(events_to_fetch)
-
-            # we need to recursively fetch any redactions of those events
-            redaction_ids: Set[str] = set()
-            for event_id in events_to_fetch:
-                row = row_map.get(event_id)
-                fetched_event_ids.add(event_id)
-                if row:
-                    fetched_events[event_id] = row
-                    redaction_ids.update(row.redactions)
-
-            events_to_fetch = redaction_ids.difference(fetched_event_ids)
-            if events_to_fetch:
-                logger.debug("Also fetching redaction events %s", events_to_fetch)
-                # Start tracing how long it takes for us to get all of the redactions
-                if not is_recording_redaction_trace:
-                    fetching_redactions_tracing_span_cm.__enter__()
-                    is_recording_redaction_trace = True
+        try:
+            while events_to_fetch:
+                row_map = await self._enqueue_events(events_to_fetch)
+
+                # we need to recursively fetch any redactions of those events
+                redaction_ids: Set[str] = set()
+                for event_id in events_to_fetch:
+                    row = row_map.get(event_id)
+                    fetched_event_ids.add(event_id)
+                    if row:
+                        fetched_events[event_id] = row
+                        redaction_ids.update(row.redactions)
+
+                events_to_fetch = redaction_ids.difference(fetched_event_ids)
+                if events_to_fetch:
+                    logger.debug("Also fetching redaction events %s", events_to_fetch)
+                    # Start tracing how long it takes for us to get all of the redactions.
+                    # Only start the span once while we recurse over and over.
+                    if not is_recording_redaction_trace:
+                        fetching_redactions_tracing_span_cm.__enter__()
+                        is_recording_redaction_trace = True
+        except Exception as e:
+            # Only record the exception if we were recording in the first place
+            if is_recording_redaction_trace:
+                fetching_redactions_tracing_span_cm.__exit__(
+                    type(e), None, e.__traceback__
+                )
+            raise
 
         # Only stop recording if we were recording in the first place
         if is_recording_redaction_trace:

From e564f7a0ec02548c56d12b886f53f5c0b02e3a59 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Thu, 11 Aug 2022 15:02:28 -0500
Subject: [PATCH 07/13] Refactor recursive code so we can wrap just the
 redaction part

See https://github.com/matrix-org/synapse/pull/13489#discussion_r943444260
---
 .../storage/databases/main/events_worker.py   | 51 ++++++-------------
 1 file changed, 16 insertions(+), 35 deletions(-)

diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 0d7ed757eb32..768912816c06 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1093,45 +1093,26 @@ async def _get_events_from_db(
         """
         fetched_event_ids: Set[str] = set()
         fetched_events: Dict[str, _EventRow] = {}
-        events_to_fetch = event_ids
-
-        is_recording_redaction_trace = False
-        fetching_redactions_tracing_span_cm = start_active_span(
-            "recursively fetching redactions"
-        )
 
-        try:
-            while events_to_fetch:
+        async def _recursively_fetch_redactions(row_map: Dict[str, _EventRow]) -> None:
+            # we need to recursively fetch any redactions of those events
+            redaction_ids: Set[str] = set()
+            for event_id, row in row_map.items():
+                fetched_event_ids.add(event_id)
+                if row:
+                    fetched_events[event_id] = row
+                    redaction_ids.update(row.redactions)
+
+            events_to_fetch = redaction_ids.difference(fetched_event_ids)
+            if events_to_fetch:
+                logger.debug("Also fetching redaction events %s", events_to_fetch)
                 row_map = await self._enqueue_events(events_to_fetch)
+                await _recursively_fetch_redactions(row_map)
 
-                # we need to recursively fetch any redactions of those events
-                redaction_ids: Set[str] = set()
-                for event_id in events_to_fetch:
-                    row = row_map.get(event_id)
-                    fetched_event_ids.add(event_id)
-                    if row:
-                        fetched_events[event_id] = row
-                        redaction_ids.update(row.redactions)
-
-                events_to_fetch = redaction_ids.difference(fetched_event_ids)
-                if events_to_fetch:
-                    logger.debug("Also fetching redaction events %s", events_to_fetch)
-                    # Start tracing how long it takes for us to get all of the redactions.
-                    # Only start the span once while we recurse over and over.
-                    if not is_recording_redaction_trace:
-                        fetching_redactions_tracing_span_cm.__enter__()
-                        is_recording_redaction_trace = True
-        except Exception as e:
-            # Only record the exception if we were recording in the first place
-            if is_recording_redaction_trace:
-                fetching_redactions_tracing_span_cm.__exit__(
-                    type(e), None, e.__traceback__
-                )
-            raise
+        events_to_fetch = event_ids
+        row_map = await self._enqueue_events(events_to_fetch)
 
-        # Only stop recording if we were recording in the first place
-        if is_recording_redaction_trace:
-            fetching_redactions_tracing_span_cm.__exit__(None, None, None)
+        with start_active_span("recursively fetching redactions"):
+            await _recursively_fetch_redactions(row_map)
 
         # build a map from event_id to EventBase
         event_map: Dict[str, EventBase] = {}

From 878d9ce0be80b0ca36fd3d742e8aabf5c43c5c98 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Thu, 11 Aug 2022 15:13:48 -0500
Subject: [PATCH 08/13] Separate tag for event length

See https://github.com/matrix-org/synapse/pull/13489#discussion_r943430782
---
 synapse/federation/federation_client.py       | 12 ++++-
 synapse/handlers/federation.py                |  7 ++-
 synapse/handlers/federation_event.py          | 45 ++++++++++++++-----
 synapse/storage/controllers/persist_events.py |  6 ++-
 4 files changed, 55 insertions(+), 15 deletions(-)

diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 19f744647ead..987f6dad460c 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -473,13 +473,21 @@ async def get_room_state_ids(
         auth_event_ids = result.get("auth_chain_ids", [])
 
         set_tag(
-            SynapseTags.RESULT_PREFIX + f"state_event_ids ({len(state_event_ids)})",
+            SynapseTags.RESULT_PREFIX + "state_event_ids",
             str(state_event_ids),
         )
         set_tag(
-            SynapseTags.RESULT_PREFIX + f"auth_event_ids ({len(auth_event_ids)})",
+            SynapseTags.RESULT_PREFIX + "state_event_ids.length",
+            str(len(state_event_ids)),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "auth_event_ids",
             str(auth_event_ids),
         )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "auth_event_ids.length",
+            str(len(auth_event_ids)),
+        )
 
         if not isinstance(state_event_ids, list) or not isinstance(
             auth_event_ids, list
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index dd9f37256353..51f6ff4b5fe3 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -371,10 +371,13 @@ async def _maybe_backfill_inner(
             "_maybe_backfill_inner: extremities_to_request %s", extremities_to_request
         )
         set_tag(
-            SynapseTags.RESULT_PREFIX
-            + f"extremities_to_request {len(extremities_to_request)}",
+            SynapseTags.RESULT_PREFIX + "extremities_to_request",
             str(extremities_to_request),
         )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "extremities_to_request.length",
+            str(len(extremities_to_request)),
+        )
 
         # Now we need to decide which hosts to hit first.
 
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index aed7ed43acac..75f37b7060cf 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -738,9 +738,13 @@ async def _process_pulled_events(
                 notification to clients, and validation of device keys.)
         """
         set_tag(
-            SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(events)})",
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids",
             str([event.event_id for event in events]),
         )
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+            str(len(events)),
+        )
         set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
         logger.debug(
             "processing pulled backfilled=%s events=%s",
@@ -1053,15 +1057,21 @@ async def _get_state_ids_after_missing_prev_event(
         missing_event_ids = missing_desired_event_ids | missing_auth_event_ids
 
         set_tag(
-            SynapseTags.RESULT_PREFIX
-            + f"missing_auth_event_ids ({len(missing_auth_event_ids)})",
+            SynapseTags.RESULT_PREFIX + "missing_auth_event_ids",
             str(missing_auth_event_ids),
         )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "missing_auth_event_ids.length",
+            str(len(missing_auth_event_ids)),
+        )
         set_tag(
-            SynapseTags.RESULT_PREFIX
-            + f"missing_desired_event_ids ({len(missing_desired_event_ids)})",
+            SynapseTags.RESULT_PREFIX + "missing_desired_event_ids",
             str(missing_desired_event_ids),
         )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "missing_desired_event_ids.length",
+            str(len(missing_desired_event_ids)),
+        )
 
         # Making an individual request for each of 1000s of events has a lot of
         # overhead. On the other hand, we don't really want to fetch all of the events
@@ -1132,9 +1142,13 @@ async def _get_state_ids_after_missing_prev_event(
                 failed_to_fetch,
             )
             set_tag(
-                SynapseTags.RESULT_PREFIX + f"failed_to_fetch ({len(failed_to_fetch)})",
+                SynapseTags.RESULT_PREFIX + "failed_to_fetch",
                 str(failed_to_fetch),
             )
+            set_tag(
+                SynapseTags.RESULT_PREFIX + "failed_to_fetch.length",
+                str(len(failed_to_fetch)),
+            )
 
         if remote_event.is_state() and remote_event.rejected_reason is None:
             state_map[
@@ -1517,9 +1531,13 @@ async def _auth_and_persist_outliers(
         event_ids = event_map.keys()
         set_tag(
-            SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(event_ids)})",
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids",
             str(event_ids),
         )
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+            str(len(event_ids)),
+        )
 
         # filter out any events we have already seen. This might happen because
         # the events were eagerly pushed to us (eg, during a room join), or because
@@ -1676,11 +1690,14 @@ async def _check_event_auth(
         claimed_auth_events = await self._load_or_fetch_auth_events_for_event(
             origin, event
         )
         set_tag(
-            SynapseTags.RESULT_PREFIX
-            + f"claimed_auth_events ({len(claimed_auth_events)})",
+            SynapseTags.RESULT_PREFIX + "claimed_auth_events",
             str([ev.event_id for ev in claimed_auth_events]),
         )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "claimed_auth_events.length",
+            str(len(claimed_auth_events)),
+        )
 
         # ... and check that the event passes auth at those auth events.
         # https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
@@ -2127,8 +2148,13 @@ async def persist_events_and_notify(
         if not backfilled:  # Never notify for backfilled events
             with start_active_span("notify_persisted_events"):
                 set_tag(
-                    SynapseTags.RESULT_PREFIX + f"event_ids ({len(events)})",
+                    SynapseTags.RESULT_PREFIX + "event_ids",
                     str([ev.event_id for ev in events]),
                 )
+                set_tag(
+                    SynapseTags.RESULT_PREFIX + "event_ids.length",
+                    str(len(events)),
+                )
                 for event in events:
                     await self._notify_persisted_event(event, max_stream_token)
 
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index 854acec31ada..0969dc7dfb58 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -393,9 +393,13 @@ async def persist_events(
             event_ids.append(event.event_id)
 
         set_tag(
-            SynapseTags.FUNC_ARG_PREFIX + f"event_ids ({len(event_ids)})",
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids",
             str(event_ids),
         )
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+            str(len(event_ids)),
+        )
         set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
 
         async def enqueue(

From 1f4911bf9e4528f6eded68cae4c736289bae695e Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Thu, 11 Aug 2022 15:16:37 -0500
Subject: [PATCH 09/13] Fix missing .length

---
 synapse/handlers/federation_event.py          | 2 +-
 synapse/storage/controllers/persist_events.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 75f37b7060cf..dd0d610fe917 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -1535,7 +1535,7 @@ async def _auth_and_persist_outliers(
             str(event_ids),
         )
         set_tag(
-            SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
             str(len(event_ids)),
         )
 
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index 0969dc7dfb58..dad3731b9b50 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -397,7 +397,7 @@ async def persist_events(
             str(event_ids),
         )
         set_tag(
-            SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
             str(len(event_ids)),
         )
 
From af7ec779aa6e0bae3f6ad04f5bb7c35d935a1250 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Mon, 15 Aug 2022 14:29:28 -0500
Subject: [PATCH 10/13] Fix typo

---
 synapse/logging/opentracing.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index fbfa6709bd98..482316a1ff90 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -312,8 +312,7 @@ class SynapseTags:
 
     # Used to tag function arguments
     #
-    # Tag a named arg. The name of the argument should be appended to this.
-    # prefix
+    # Tag a named arg. The name of the argument should be appended to this prefix.
     FUNC_ARG_PREFIX = "ARG."
     # Tag extra variadic number of positional arguments (`def foo(first, second, *extras)`)
     FUNC_ARGS = "args"
     # Tag keyword args
     FUNC_KWARGS = "kwargs"
 
     # Some intermediate result that's interesting to the function. The label for
-    # the result should be appended to this.
+    # the result should be appended to this prefix.
     RESULT_PREFIX = "RESULT."
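By this point the series has settled on its final tag shape: every list-valued argument or result gets two tags — the stringified list, and a companion `.length` tag. A hedged sketch of that pattern (the helper `tag_event_ids` is hypothetical and only illustrates the convention; `set_tag` and `SynapseTags.FUNC_ARG_PREFIX` are the real APIs modified above):

    from typing import Collection

    from synapse.logging.opentracing import SynapseTags, set_tag

    def tag_event_ids(event_ids: Collection[str]) -> None:
        # Produces "ARG.event_ids" and "ARG.event_ids.length" tags in Jaeger;
        # keeping the count in its own tag stays useful even when the list
        # itself is very long.
        set_tag(SynapseTags.FUNC_ARG_PREFIX + "event_ids", str(event_ids))
        set_tag(SynapseTags.FUNC_ARG_PREFIX + "event_ids.length", str(len(event_ids)))

Moving the count out of the tag *name* (the earlier `f"event_ids ({len(...)})"` style) into a separate tag means the tag names stay stable and searchable across traces.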
From 92e3e6a3022e17a688fcc55b217335c6f4f67816 Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Mon, 15 Aug 2022 14:39:58 -0500
Subject: [PATCH 11/13] Use while recursion

See https://github.com/matrix-org/synapse/pull/13489#discussion_r944328189
---
 .../storage/databases/main/events_worker.py   | 28 +++++++++++--------
 1 file changed, 16 insertions(+), 12 deletions(-)

diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 768912816c06..f28c19eb8bfa 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1095,18 +1095,22 @@ async def _get_events_from_db(
         fetched_events: Dict[str, _EventRow] = {}
 
         async def _recursively_fetch_redactions(row_map: Dict[str, _EventRow]) -> None:
-            # we need to recursively fetch any redactions of those events
-            redaction_ids: Set[str] = set()
-            for event_id, row in row_map.items():
-                fetched_event_ids.add(event_id)
-                if row:
-                    fetched_events[event_id] = row
-                    redaction_ids.update(row.redactions)
-            events_to_fetch = redaction_ids.difference(fetched_event_ids)
-            if events_to_fetch:
-                logger.debug("Also fetching redaction events %s", events_to_fetch)
-                row_map = await self._enqueue_events(events_to_fetch)
-                await _recursively_fetch_redactions(row_map)
+            # We use a `while` here instead of recursively calling the function
+            # to avoid stack overflows because Python doesn't reliably catch
+            # them and can crash (eg.
+            # https://nvd.nist.gov/vuln/detail/CVE-2022-31052).
+            while row_map:
+                # we need to recursively fetch any redactions of those events
+                redaction_ids: Set[str] = set()
+                for event_id, row in row_map.items():
+                    fetched_event_ids.add(event_id)
+                    if row:
+                        fetched_events[event_id] = row
+                        redaction_ids.update(row.redactions)
+                events_to_fetch = redaction_ids.difference(fetched_event_ids)
+                if events_to_fetch:
+                    logger.debug("Also fetching redaction events %s", events_to_fetch)
+                    row_map = await self._enqueue_events(events_to_fetch)
 
         events_to_fetch = event_ids
         row_map = await self._enqueue_events(events_to_fetch)

From 799c3d583a8d0236c83f7b208c0fc277730f0b3e Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Mon, 15 Aug 2022 14:55:51 -0500
Subject: [PATCH 12/13] Refactor back to iterative function

See https://github.com/matrix-org/synapse/pull/13489#discussion_r944328189
---
 .../storage/databases/main/events_worker.py   | 56 +++++++++++--------
 1 file changed, 34 insertions(+), 22 deletions(-)

diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index f28c19eb8bfa..721bac47c886 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1094,29 +1094,41 @@ async def _get_events_from_db(
         fetched_event_ids: Set[str] = set()
         fetched_events: Dict[str, _EventRow] = {}
 
-        async def _recursively_fetch_redactions(row_map: Dict[str, _EventRow]) -> None:
-            # We use a `while` here instead of recursively calling the function
-            # to avoid stack overflows because Python doesn't reliably catch
-            # them and can crash (eg.
-            # https://nvd.nist.gov/vuln/detail/CVE-2022-31052).
-            while row_map:
-                # we need to recursively fetch any redactions of those events
-                redaction_ids: Set[str] = set()
-                for event_id, row in row_map.items():
-                    fetched_event_ids.add(event_id)
-                    if row:
-                        fetched_events[event_id] = row
-                        redaction_ids.update(row.redactions)
-                events_to_fetch = redaction_ids.difference(fetched_event_ids)
-                if events_to_fetch:
-                    logger.debug("Also fetching redaction events %s", events_to_fetch)
-                    row_map = await self._enqueue_events(events_to_fetch)
-
-        events_to_fetch = event_ids
-        row_map = await self._enqueue_events(events_to_fetch)
+        async def _fetch_event_ids_and_get_outstanding_redactions(
+            event_ids_to_fetch: Collection[str],
+        ) -> Collection[str]:
+            """
+            Fetch all of the given event_ids and return any associated redaction event_ids
+            that we still need to fetch in the next iteration.
+            """
+            row_map = await self._enqueue_events(event_ids_to_fetch)
+
+            # we need to recursively fetch any redactions of those events
+            redaction_ids: Set[str] = set()
+            for event_id in event_ids_to_fetch:
+                row = row_map.get(event_id)
+                fetched_event_ids.add(event_id)
+                if row:
+                    fetched_events[event_id] = row
+                    redaction_ids.update(row.redactions)
+
+            event_ids_to_fetch = redaction_ids.difference(fetched_event_ids)
+            return event_ids_to_fetch
 
+        # Grab the initial list of events requested
+        event_ids_to_fetch = await _fetch_event_ids_and_get_outstanding_redactions(
+            event_ids
+        )
+        # Then go and recursively find all of the associated redactions
         with start_active_span("recursively fetching redactions"):
-            await _recursively_fetch_redactions(row_map)
+            while event_ids_to_fetch:
+                logger.debug("Also fetching redaction events %s", event_ids_to_fetch)
+
+                event_ids_to_fetch = (
+                    await _fetch_event_ids_and_get_outstanding_redactions(
+                        event_ids_to_fetch
+                    )
+                )
 
         # build a map from event_id to EventBase
         event_map: Dict[str, EventBase] = {}

From 43eab68898913f68b36d11e0b5d73cb614ba1f1f Mon Sep 17 00:00:00 2001
From: Eric Eastwood
Date: Tue, 16 Aug 2022 11:39:44 -0500
Subject: [PATCH 13/13] Update changelog to stand on its own since it can't
 merge anymore

---
 changelog.d/13489.misc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/changelog.d/13489.misc b/changelog.d/13489.misc
index 4b433a510781..5e4853860e81 100644
--- a/changelog.d/13489.misc
+++ b/changelog.d/13489.misc
@@ -1 +1 @@
-Instrument `/messages` for understandable traces in Jaeger.
+Instrument the federation/backfill part of `/messages` for understandable traces in Jaeger.
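The redaction-fetching refactor that patches 06 through 12 iterate on ends up as a simple fixed-point loop wrapped in a single tracing span. The control flow can be reproduced standalone as below; this is a sketch under stated assumptions, not Synapse code — `db` and the inline dictionary lookup stand in for `self._enqueue_events`, and `Row.redactions` mirrors the `_EventRow.redactions` field.

    import asyncio
    from dataclasses import dataclass, field
    from typing import Dict, List, Set


    @dataclass
    class Row:
        redactions: List[str] = field(default_factory=list)


    async def fetch_with_redactions(db: Dict[str, Row], event_ids: Set[str]) -> Dict[str, Row]:
        fetched: Dict[str, Row] = {}
        seen: Set[str] = set()
        to_fetch = set(event_ids)
        while to_fetch:
            seen |= to_fetch  # remember every id we asked for, found or not
            row_map = {eid: db[eid] for eid in to_fetch if eid in db}  # stands in for _enqueue_events
            fetched.update(row_map)
            # Only chase redactions we have not asked for yet, so the loop
            # terminates even if redactions form a cycle.
            to_fetch = {r for row in row_map.values() for r in row.redactions} - seen
        return fetched


    db = {"$a": Row(redactions=["$b"]), "$b": Row(redactions=["$a"])}
    print(sorted(asyncio.run(fetch_with_redactions(db, {"$a"})).keys()))  # ['$a', '$b']

The `seen` set plays the role of `fetched_event_ids` in the patch: it records ids that were requested even when no row came back, which is what guarantees termination and avoids re-requesting missing events.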