Skip to content

Commit

Permalink
Make inflight background metrics more efficient. (matrix-org#7597)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrew Morgan <[email protected]>
  • Loading branch information
2 people authored and phil-flex committed Jun 16, 2020
1 parent ceeebb2 commit a8f2ba3
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 34 deletions.
1 change: 1 addition & 0 deletions changelog.d/7597.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix metrics failing when there is a large number of active background processes.
104 changes: 70 additions & 34 deletions synapse/metrics/background_process_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@
import threading
from asyncio import iscoroutine
from functools import wraps
from typing import Dict, Set
from typing import TYPE_CHECKING, Dict, Optional, Set

import six

from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
from prometheus_client.core import REGISTRY, Counter, Gauge

from twisted.internet import defer

from synapse.logging.context import LoggingContext, PreserveLoggingContext

if TYPE_CHECKING:
import resource


logger = logging.getLogger(__name__)


Expand All @@ -36,6 +38,12 @@
["name"],
)

_background_process_in_flight_count = Gauge(
"synapse_background_process_in_flight_count",
"Number of background processes in flight",
labelnames=["name"],
)

# we set registry=None in all of these to stop them getting registered with
# the default registry. Instead we collect them all via the CustomCollector,
# which ensures that we can update them before they are collected.
Expand Down Expand Up @@ -83,13 +91,17 @@
# it's much simpler to do so than to try to combine them.)
_background_process_counts = {} # type: Dict[str, int]

# map from description to the currently running background processes.
# Set of all running background processes that became active active since the
# last time metrics were scraped (i.e. background processes that performed some
# work since the last scrape.)
#
# it's kept as a dict of sets rather than a big set so that we can keep track
# of process descriptions that no longer have any active processes.
_background_processes = {} # type: Dict[str, Set[_BackgroundProcess]]
# We do it like this to handle the case where we have a large number of
# background processes stacking up behind a lock or linearizer, where we then
# only need to iterate over and update metrics for the process that have
# actually been active and can ignore the idle ones.
_background_processes_active_since_last_scrape = set() # type: Set[_BackgroundProcess]

# A lock that covers the above dicts
# A lock that covers the above set and dict
_bg_metrics_lock = threading.Lock()


Expand All @@ -101,25 +113,16 @@ class _Collector(object):
"""

def collect(self):
background_process_in_flight_count = GaugeMetricFamily(
"synapse_background_process_in_flight_count",
"Number of background processes in flight",
labels=["name"],
)
global _background_processes_active_since_last_scrape

# We copy the dict so that it doesn't change from underneath us.
# We also copy the process lists as that can also change
# We swap out the _background_processes set with an empty one so that
# we can safely iterate over the set without holding the lock.
with _bg_metrics_lock:
_background_processes_copy = {
k: list(v) for k, v in six.iteritems(_background_processes)
}
_background_processes_copy = _background_processes_active_since_last_scrape
_background_processes_active_since_last_scrape = set()

for desc, processes in six.iteritems(_background_processes_copy):
background_process_in_flight_count.add_metric((desc,), len(processes))
for process in processes:
process.update_metrics()

yield background_process_in_flight_count
for process in _background_processes_copy:
process.update_metrics()

# now we need to run collect() over each of the static Counters, and
# yield each metric they return.
Expand Down Expand Up @@ -191,13 +194,10 @@ def run():
_background_process_counts[desc] = count + 1

_background_process_start_count.labels(desc).inc()
_background_process_in_flight_count.labels(desc).inc()

with LoggingContext(desc) as context:
with BackgroundProcessLoggingContext(desc) as context:
context.request = "%s-%i" % (desc, count)
proc = _BackgroundProcess(desc, context)

with _bg_metrics_lock:
_background_processes.setdefault(desc, set()).add(proc)

try:
result = func(*args, **kwargs)
Expand All @@ -214,10 +214,7 @@ def run():
except Exception:
logger.exception("Background process '%s' threw an exception", desc)
finally:
proc.update_metrics()

with _bg_metrics_lock:
_background_processes[desc].remove(proc)
_background_process_in_flight_count.labels(desc).dec()

with PreserveLoggingContext():
return run()
Expand All @@ -238,3 +235,42 @@ def wrap_as_background_process_inner_2(*args, **kwargs):
return wrap_as_background_process_inner_2

return wrap_as_background_process_inner


class BackgroundProcessLoggingContext(LoggingContext):
"""A logging context that tracks in flight metrics for background
processes.
"""

__slots__ = ["_proc"]

def __init__(self, name: str):
super().__init__(name)

self._proc = _BackgroundProcess(name, self)

def start(self, rusage: "Optional[resource._RUsage]"):
"""Log context has started running (again).
"""

super().start(rusage)

# We've become active again so we make sure we're in the list of active
# procs. (Note that "start" here means we've become active, as opposed
# to starting for the first time.)
with _bg_metrics_lock:
_background_processes_active_since_last_scrape.add(self._proc)

def __exit__(self, type, value, traceback) -> None:
"""Log context has finished.
"""

super().__exit__(type, value, traceback)

# The background process has finished. We explictly remove and manually
# update the metrics here so that if nothing is scraping metrics the set
# doesn't infinitely grow.
with _bg_metrics_lock:
_background_processes_active_since_last_scrape.discard(self._proc)

self._proc.update_metrics()

0 comments on commit a8f2ba3

Please sign in to comment.