Skip to content

Commit

Permalink
Merge pull request #550 from alephdata/feature/metrics
Browse files Browse the repository at this point in the history
Add Prometheus instrumentation for ingest-file workers
  • Loading branch information
tillprochaska authored Nov 22, 2023
2 parents 90629ea + 2b9a024 commit 768978c
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 5 deletions.
68 changes: 64 additions & 4 deletions ingestors/manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import magic
import logging
from timeit import default_timer
from tempfile import mkdtemp
from datetime import datetime
from pkg_resources import get_distribution
Expand All @@ -15,6 +16,7 @@
from sentry_sdk import capture_exception
from followthemoney.helpers import entity_filename
from followthemoney.namespace import Namespace
from prometheus_client import Counter, Histogram

from ingestors.directory import DirectoryIngestor
from ingestors.exc import ProcessingException, ENCRYPTED_MSG
Expand All @@ -23,6 +25,44 @@

log = logging.getLogger(__name__)

INGESTIONS_SUCCEEDED = Counter(
"ingestfile_ingestions_succeeded_total",
"Successful ingestions",
["ingestor"],
)
INGESTIONS_FAILED = Counter(
"ingestfile_ingestions_failed_total",
"Failed ingestions",
["ingestor"],
)
INGESTION_DURATION = Histogram(
"ingestfile_ingestion_duration_seconds",
"Ingest duration by ingestor",
["ingestor"],
# The bucket sizes are a rough guess right now, we might want to adjust
# them later based on observed durations
buckets=[
0.005,
0.01,
0.025,
0.05,
0.1,
0.25,
0.5,
1,
5,
15,
60,
5 * 60,
15 * 60,
],
)
INGESTED_BYTES = Counter(
"ingestfile_ingested_bytes_total",
"Total number of bytes ingested",
["ingestor"],
)


class Manager(object):
"""Handles the lifecycle of an ingestor. This can be subclassed to embed it
Expand Down Expand Up @@ -141,8 +181,13 @@ def ingest_entity(self, entity):
def ingest(self, file_path, entity, **kwargs):
"""Main execution step of an ingestor."""
file_path = ensure_path(file_path)
if file_path.is_file() and not entity.has("fileSize"):
entity.add("fileSize", file_path.stat().st_size)
file_size = None

if file_path.is_file():
file_size = file_path.stat().st_size # size in bytes

if file_size is not None and not entity.has("fileSize"):
entity.add("fileSize", file_size)

now = datetime.now()
now_string = now.strftime("%Y-%m-%dT%H:%M:%S.%f")
Expand All @@ -151,14 +196,29 @@ def ingest(self, file_path, entity, **kwargs):
entity.set("processingAgent", get_distribution("ingest").version)
entity.set("processedAt", now_string)

ingestor_class = None
ingestor_name = None

try:
ingestor_class = self.auction(file_path, entity)
log.info("Ingestor [%r]: %s", entity, ingestor_class.__name__)
ingestor_name = ingestor_class.__name__
log.info(f"Ingestor [{repr(entity)}]: {ingestor_name}")

start_time = default_timer()
self.delegate(ingestor_class, file_path, entity)
duration = max(0, default_timer() - start_time)

INGESTIONS_SUCCEEDED.labels(ingestor=ingestor_name).inc()
INGESTION_DURATION.labels(ingestor=ingestor_name).observe(duration)

if file_size is not None:
INGESTED_BYTES.labels(ingestor=ingestor_name).inc(file_size)

entity.set("processingStatus", self.STATUS_SUCCESS)
except ProcessingException as pexc:
log.exception(f"[{repr(entity)}] Failed to process: {pexc}")
INGESTIONS_FAILED.labels(ingestor=ingestor_name).inc()
entity.set("processingError", stringify(pexc))
log.exception("[%r] Failed to process: %s", entity, pexc)
capture_exception(pexc)
finally:
self.finalize(entity)
Expand Down
9 changes: 9 additions & 0 deletions ingestors/support/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import subprocess

from followthemoney.helpers import entity_filename
from prometheus_client import Counter

from ingestors.support.cache import CacheSupport
from ingestors.support.temp import TempFileSupport
Expand All @@ -14,6 +15,12 @@
TIMEOUT = 3600 # seconds
CONVERT_RETRIES = 5

PDF_CACHE_ACCESSED = Counter(
"ingestfile_pdf_cache_accessed",
"Number of times the PDF cache has been accessed, by cache status",
["status"],
)


class DocumentConvertSupport(CacheSupport, TempFileSupport):
"""Provides helpers for UNO document conversion."""
Expand All @@ -25,10 +32,12 @@ def document_to_pdf(self, unique_tmpdir, file_path, entity):
file_name = entity_filename(entity, extension="pdf")
path = self.manager.load(pdf_hash, file_name=file_name)
if path is not None:
PDF_CACHE_ACCESSED.labels(status="hit").inc()
log.info("Using PDF cache: %s", file_name)
entity.set("pdfHash", pdf_hash)
return path

PDF_CACHE_ACCESSED.labels(status="miss").inc()
pdf_file = self._document_to_pdf(unique_tmpdir, file_path, entity)
if pdf_file is not None:
content_hash = self.manager.store(pdf_file)
Expand Down
4 changes: 4 additions & 0 deletions ingestors/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from ftmstore import get_dataset
from servicelayer.worker import Worker
from servicelayer.logs import apply_task_context
from prometheus_client import Info

from ingestors import __version__
from ingestors.manager import Manager
Expand All @@ -12,6 +13,9 @@
OP_INGEST = "ingest"
OP_ANALYZE = "analyze"

SYSTEM = Info("ingestfile_system", "ingest-file system information")
SYSTEM.info({"ingestfile_version": __version__})


class IngestWorker(Worker):
"""A long running task runner that uses Redis as a task queue"""
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ normality==2.4.0
pantomime==0.6.1
followthemoney==3.5.2
followthemoney-store[postgresql]==3.0.6
servicelayer[google,amazon]==1.21.2
servicelayer[google,amazon]==1.22.1
languagecodes==1.1.1
countrytagger==0.1.2
pyicu==2.11
Expand Down

0 comments on commit 768978c

Please sign in to comment.