Skip to content

Commit

Permalink
Add Prometheus instrumentation for ingest-file workers
Browse files Browse the repository at this point in the history
  • Loading branch information
tillprochaska committed Nov 13, 2023
1 parent 14118ec commit ef0ee23
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 4 deletions.
49 changes: 46 additions & 3 deletions ingestors/manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import magic
import logging
from timeit import default_timer
from tempfile import mkdtemp
from datetime import datetime
from pkg_resources import get_distribution
Expand All @@ -15,6 +16,7 @@
from sentry_sdk import capture_exception
from followthemoney.helpers import entity_filename
from followthemoney.namespace import Namespace
from prometheus_client import Counter, Histogram

from ingestors.directory import DirectoryIngestor
from ingestors.exc import ProcessingException
Expand All @@ -23,6 +25,27 @@

log = logging.getLogger(__name__)

# Prometheus metrics for the ingest lifecycle. Every metric carries a single
# "ingestor" label, which the manager fills with the ingestor class name.

# Incremented once per ingestion that completes without raising.
INGEST_SUCCEEDED = Counter(
    "ingest_succeeded_total",
    "Successful ingestions",
    labelnames=["ingestor"],
)

# Incremented once per ingestion that fails with a processing error.
INGEST_FAILED = Counter(
    "ingest_failed_total",
    "Failed ingestions",
    labelnames=["ingestor"],
)

# Observed with the wall-clock time (in seconds) spent inside the ingestor.
INGEST_DURATION = Histogram(
    "ingest_duration_seconds",
    "Ingest duration by ingestor",
    labelnames=["ingestor"],
)

# Incremented by the size (in bytes) of each successfully ingested file.
INGEST_INGESTED_BYTES = Counter(
    "ingest_ingested_bytes_total",
    "Total number of bytes ingested",
    labelnames=["ingestor"],
)


class Manager(object):
"""Handles the lifecycle of an ingestor. This can be subclassed to embed it
Expand Down Expand Up @@ -138,8 +161,10 @@ def ingest_entity(self, entity):
def ingest(self, file_path, entity, **kwargs):
"""Main execution step of an ingestor."""
file_path = ensure_path(file_path)
file_size = None
if file_path.is_file() and not entity.has("fileSize"):
entity.add("fileSize", file_path.stat().st_size)
file_size = file_path.stat().st_size # size in bytes
entity.add("fileSize", file_size)

now = datetime.now()
now_string = now.strftime("%Y-%m-%dT%H:%M:%S.%f")
Expand All @@ -148,14 +173,32 @@ def ingest(self, file_path, entity, **kwargs):
entity.set("processingAgent", get_distribution("ingest").version)
entity.set("processedAt", now_string)

ingestor_class = None
ingestor_name = None

# Pick an ingestor for the file, run it, and record Prometheus metrics for
# the outcome. Whatever happens, the entity is finalized at the end.
try:
    ingestor_class = self.auction(file_path, entity)
    ingestor_name = ingestor_class.__name__
    log.info("Ingestor [%r]: %s", entity, ingestor_name)

    # Time only the delegated ingestor run, not the auction.
    start_time = default_timer()
    self.delegate(ingestor_class, file_path, entity)
    duration = max(0, default_timer() - start_time)

    INGEST_SUCCEEDED.labels(ingestor_name).inc()
    INGEST_DURATION.labels(ingestor_name).observe(duration)
    # file_size is only known when the path is a regular file and the
    # entity did not already carry a "fileSize"; otherwise it is None.
    # Passing None to Counter.inc() raises a TypeError, which would escape
    # this handler and leave a successful ingest without its success
    # status, so skip the byte counter in that case.
    if file_size is not None:
        INGEST_INGESTED_BYTES.labels(ingestor_name).inc(file_size)

    entity.set("processingStatus", self.STATUS_SUCCESS)
except ProcessingException as pexc:
    log.exception("[%r] Failed to process: %s", entity, pexc)

    # ingestor_name is still None when the auction itself failed, i.e.
    # before any ingestor class had been selected.
    if ingestor_name:
        INGEST_FAILED.labels(ingestor_name).inc()
    else:
        INGEST_FAILED.labels(None).inc()

    entity.set("processingError", stringify(pexc))
    capture_exception(pexc)
finally:
    self.finalize(entity)
Expand Down
9 changes: 9 additions & 0 deletions ingestors/support/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import subprocess

from followthemoney.helpers import entity_filename
from prometheus_client import Counter

from ingestors.support.cache import CacheSupport
from ingestors.support.temp import TempFileSupport
Expand All @@ -14,6 +15,12 @@
TIMEOUT = 3600 # seconds
CONVERT_RETRIES = 5

# Counts PDF cache lookups made during document conversion. The "status"
# label is "hit" when a cached PDF is reused and "miss" when the document
# has to be converted.
INGEST_PDF_CACHE_ACCESSED = Counter(
    "ingest_pdf_cache_accessed",
    "Number of times the PDF cache has been accessed, by cache status",
    labelnames=["status"],
)


class DocumentConvertSupport(CacheSupport, TempFileSupport):
"""Provides helpers for UNO document conversion."""
Expand All @@ -25,10 +32,12 @@ def document_to_pdf(self, unique_tmpdir, file_path, entity):
file_name = entity_filename(entity, extension="pdf")
path = self.manager.load(pdf_hash, file_name=file_name)
if path is not None:
INGEST_PDF_CACHE_ACCESSED.labels("hit").inc()
log.info("Using PDF cache: %s", file_name)
entity.set("pdfHash", pdf_hash)
return path

INGEST_PDF_CACHE_ACCESSED.labels("miss").inc()
pdf_file = self._document_to_pdf(unique_tmpdir, file_path, entity)
if pdf_file is not None:
content_hash = self.manager.store(pdf_file)
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ normality==2.4.0
pantomime==0.6.1
followthemoney==3.5.2
followthemoney-store[postgresql]==3.0.6
servicelayer[google,amazon]==1.21.2
servicelayer[google,amazon]==1.22.0
languagecodes==1.1.1
countrytagger==0.1.2
pyicu==2.11
Expand Down

0 comments on commit ef0ee23

Please sign in to comment.