From 0a7091ed2104f5fc861c812a5b0bedfd3abaa1e4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 May 2020 11:32:35 +0100 Subject: [PATCH 1/6] Make inflight background metrics more efficient. --- synapse/metrics/background_process_metrics.py | 98 +++++++++++++------ 1 file changed, 67 insertions(+), 31 deletions(-) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 8449ef82f7bb..66a6fb02358d 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -17,16 +17,18 @@ import threading from asyncio import iscoroutine from functools import wraps -from typing import Dict, Set +from typing import TYPE_CHECKING, Dict, Optional, Set -import six - -from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily +from prometheus_client.core import REGISTRY, Counter, Gauge from twisted.internet import defer from synapse.logging.context import LoggingContext, PreserveLoggingContext +if TYPE_CHECKING: + import resource + + logger = logging.getLogger(__name__) @@ -36,6 +38,12 @@ ["name"], ) +_background_process_in_flight_count = Gauge( + "synapse_background_process_in_flight_count", + "Number of background processes in flight", + labelnames=["name"], +) + # we set registry=None in all of these to stop them getting registered with # the default registry. Instead we collect them all via the CustomCollector, # which ensures that we can update them before they are collected. @@ -83,11 +91,14 @@ # it's much simpler to do so than to try to combine them.) _background_process_counts = {} # type: Dict[str, int] -# map from description to the currently running background processes. +# Set of all running background processes that have been active since the last +# time metrics were scraped. # -# it's kept as a dict of sets rather than a big set so that we can keep track -# of process descriptions that no longer have any active processes. 
-_background_processes = {} # type: Dict[str, Set[_BackgroundProcess]] +# We do it like this to handle the case where we have a large number of +# background processes stacking up behind a lock or linearizer, where we then +# only need to iterate over and update metrics for the processes that have +# actually been active and can ignore the idle ones. +_background_processes_active_since_last_scrape = set() # type: Set[_BackgroundProcess] # A lock that covers the above dicts _bg_metrics_lock = threading.Lock() @@ -101,26 +112,18 @@ class _Collector(object): """ def collect(self): - background_process_in_flight_count = GaugeMetricFamily( - "synapse_background_process_in_flight_count", - "Number of background processes in flight", - labels=["name"], - ) + global _background_processes_active_since_last_scrape - # We copy the dict so that it doesn't change from underneath us. - # We also copy the process lists as that can also change with _bg_metrics_lock: - _background_processes_copy = { - k: list(v) for k, v in six.iteritems(_background_processes) - } + _background_processes_copy = _background_processes_active_since_last_scrape + _background_processes_active_since_last_scrape = set() - for desc, processes in six.iteritems(_background_processes_copy): - background_process_in_flight_count.add_metric((desc,), len(processes)) + for processes in _background_processes_copy: for process in processes: process.update_metrics() - yield background_process_in_flight_count - # now we need to run collect() over each of the static Counters, and # yield each metric they return. 
for m in ( @@ -191,13 +194,10 @@ def run(): _background_process_counts[desc] = count + 1 _background_process_start_count.labels(desc).inc() + _background_process_in_flight_count.labels(desc).inc() - with LoggingContext(desc) as context: + with BackgroundProcessLoggingContext(desc) as context: context.request = "%s-%i" % (desc, count) - proc = _BackgroundProcess(desc, context) - - with _bg_metrics_lock: - _background_processes.setdefault(desc, set()).add(proc) try: result = func(*args, **kwargs) @@ -214,10 +214,7 @@ def run(): except Exception: logger.exception("Background process '%s' threw an exception", desc) finally: - proc.update_metrics() - - with _bg_metrics_lock: - _background_processes[desc].remove(proc) + _background_process_in_flight_count.labels(desc).dec() with PreserveLoggingContext(): return run() @@ -238,3 +235,42 @@ def wrap_as_background_process_inner_2(*args, **kwargs): return wrap_as_background_process_inner_2 return wrap_as_background_process_inner + + +class BackgroundProcessLoggingContext(LoggingContext): + """A logging context that tracks in flight metrics for background + processes. + """ + + __slots__ = ["_proc"] + + def __init__(self, name: str): + super().__init__(name) + + self._proc = _BackgroundProcess(name, self) + + def start(self, rusage: "Optional[resource._RUsage]"): + """Log context has started running (again). + """ + + super().start(rusage) + + # We've become active again so we make sure we're in the list of active + # procs. (Note that "start" here means we've become active, as opposed + # to starting for the first time.) + with _bg_metrics_lock: + _background_processes_active_since_last_scrape.add(self._proc) + + def __exit__(self, type, value, traceback) -> None: + """Log context has finished. + """ + + super().__exit__(type, value, traceback) + + # The background process has finished. We explicitly remove and manually + # update the metrics here so that if nothing is scraping metrics the set + # doesn't infinitely grow. 
+ with _bg_metrics_lock: + _background_processes_active_since_last_scrape.discard(self._proc) + + self._proc.update_metrics() From 5b5a6a56c342559574e17c53e39ae45eec016950 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 May 2020 12:02:21 +0100 Subject: [PATCH 2/6] Newsfile --- changelog.d/7597.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/7597.bugfix diff --git a/changelog.d/7597.bugfix b/changelog.d/7597.bugfix new file mode 100644 index 000000000000..e2ff951915f2 --- /dev/null +++ b/changelog.d/7597.bugfix @@ -0,0 +1 @@ +Fix metrics failing when there is a large number of active background processes. From 60783b31a1483a5b6c4de443053adad4101b8442 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 May 2020 12:10:54 +0100 Subject: [PATCH 3/6] Fix incorrect iteration --- synapse/metrics/background_process_metrics.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 66a6fb02358d..1d812ff27a9c 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -120,9 +120,8 @@ def collect(self): _background_processes_copy = _background_processes_active_since_last_scrape _background_processes_active_since_last_scrape = set() - for processes in _background_processes_copy: - for process in processes: - process.update_metrics() + for process in _background_processes_copy: + process.update_metrics() # now we need to run collect() over each of the static Counters, and # yield each metric they return. 
From ae630f32a2c3b892d3ca74c1a6d89e44f33abcd9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 May 2020 12:44:06 +0100 Subject: [PATCH 4/6] Fixup comment Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> --- synapse/metrics/background_process_metrics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 1d812ff27a9c..339a0dfbcd27 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -100,7 +100,7 @@ # actually been active and can ignore the idle ones. _background_processes_active_since_last_scrape = set() # type: Set[_BackgroundProcess] -# A lock that covers the above dicts +# A lock that covers the above set and dict _bg_metrics_lock = threading.Lock() From fa5d4bff4dd00ede5fc640eb4dc61e20efba5d68 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 May 2020 12:56:16 +0100 Subject: [PATCH 5/6] Fixup style --- synapse/metrics/background_process_metrics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 339a0dfbcd27..75cf97e54ec0 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -100,7 +100,7 @@ # actually been active and can ignore the idle ones. 
_background_processes_active_since_last_scrape = set() # type: Set[_BackgroundProcess] -# A lock that covers the above set and dict +# A lock that covers the above set and dict _bg_metrics_lock = threading.Lock() From faee73ce340b9586e9142eebfcf1c261671afb33 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 May 2020 13:23:37 +0100 Subject: [PATCH 6/6] Update comment --- synapse/metrics/background_process_metrics.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 75cf97e54ec0..13785038ad96 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -91,8 +91,9 @@ # it's much simpler to do so than to try to combine them.) _background_process_counts = {} # type: Dict[str, int] -# Set of all running background processes that have been active since the -# last time metrics were scraped. +# Set of all running background processes that became active since the +# last time metrics were scraped (i.e. background processes that performed some +# work since the last scrape.) # # We do it like this to handle the case where we have a large number of # background processes stacking up behind a lock or linearizer, where we then