Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Prometheus instrumentation for ingest-file workers #550

Merged
merged 9 commits into from
Nov 22, 2023
56 changes: 52 additions & 4 deletions ingestors/manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import magic
import logging
from timeit import default_timer
from tempfile import mkdtemp
from datetime import datetime
from pkg_resources import get_distribution
Expand All @@ -15,6 +16,7 @@
from sentry_sdk import capture_exception
from followthemoney.helpers import entity_filename
from followthemoney.namespace import Namespace
from prometheus_client import Counter, Histogram

from ingestors.directory import DirectoryIngestor
from ingestors.exc import ProcessingException
Expand All @@ -23,6 +25,27 @@

log = logging.getLogger(__name__)

INGEST_SUCCEEDED = Counter(
"ingest_succeeded_total",
"Successful ingestions",
["ingestor"],
)
INGEST_FAILED = Counter(
"ingest_failed_total",
"Failed ingestions",
["ingestor"],
)
INGEST_DURATION = Histogram(
"ingest_duration_seconds",
"Ingest duration by ingestor",
["ingestor"],
)
INGEST_INGESTED_BYTES = Counter(
"ingest_ingested_bytes_total",
"Total number of bytes ingested",
["ingestor"],
)


class Manager(object):
"""Handles the lifecycle of an ingestor. This can be subclassed to embed it
Expand Down Expand Up @@ -138,8 +161,13 @@ def ingest_entity(self, entity):
def ingest(self, file_path, entity, **kwargs):
"""Main execution step of an ingestor."""
file_path = ensure_path(file_path)
if file_path.is_file() and not entity.has("fileSize"):
entity.add("fileSize", file_path.stat().st_size)
file_size = None

if file_path.is_file():
file_size = file_path.stat().st_size # size in bytes

if file_size is not None and not entity.has("fileSize"):
entity.add("fileSize", file_size)

now = datetime.now()
now_string = now.strftime("%Y-%m-%dT%H:%M:%S.%f")
Expand All @@ -148,14 +176,34 @@ def ingest(self, file_path, entity, **kwargs):
entity.set("processingAgent", get_distribution("ingest").version)
entity.set("processedAt", now_string)

ingestor_class = None
ingestor_name = None

try:
ingestor_class = self.auction(file_path, entity)
log.info("Ingestor [%r]: %s", entity, ingestor_class.__name__)
ingestor_name = ingestor_class.__name__
log.info("Ingestor [%r]: %s", entity, ingestor_name)

start_time = default_timer()
self.delegate(ingestor_class, file_path, entity)
duration = max(0, default_timer() - start_time)

INGEST_SUCCEEDED.labels(ingestor_name).inc()
INGEST_DURATION.labels(ingestor_name).observe(duration)

if file_size is not None:
INGEST_INGESTED_BYTES.labels(ingestor_name).inc(file_size)

entity.set("processingStatus", self.STATUS_SUCCESS)
except ProcessingException as pexc:
entity.set("processingError", stringify(pexc))
log.exception("[%r] Failed to process: %s", entity, pexc)

if ingestor_name:
INGEST_FAILED.labels(ingestor_name).inc()
else:
INGEST_FAILED.labels(None).inc()

entity.set("processingError", stringify(pexc))
capture_exception(pexc)
finally:
self.finalize(entity)
Expand Down
9 changes: 9 additions & 0 deletions ingestors/support/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import subprocess

from followthemoney.helpers import entity_filename
from prometheus_client import Counter

from ingestors.support.cache import CacheSupport
from ingestors.support.temp import TempFileSupport
Expand All @@ -14,6 +15,12 @@
TIMEOUT = 3600 # seconds
CONVERT_RETRIES = 5

INGEST_PDF_CACHE_ACCESSED = Counter(
"ingest_pdf_cache_accessed",
"Number of times the PDF cache has been accessed, by cache status",
["status"],
)


class DocumentConvertSupport(CacheSupport, TempFileSupport):
"""Provides helpers for UNO document conversion."""
Expand All @@ -25,10 +32,12 @@ def document_to_pdf(self, unique_tmpdir, file_path, entity):
file_name = entity_filename(entity, extension="pdf")
path = self.manager.load(pdf_hash, file_name=file_name)
if path is not None:
INGEST_PDF_CACHE_ACCESSED.labels("hit").inc()
log.info("Using PDF cache: %s", file_name)
entity.set("pdfHash", pdf_hash)
return path

INGEST_PDF_CACHE_ACCESSED.labels("miss").inc()
pdf_file = self._document_to_pdf(unique_tmpdir, file_path, entity)
if pdf_file is not None:
content_hash = self.manager.store(pdf_file)
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ normality==2.4.0
pantomime==0.6.1
followthemoney==3.5.2
followthemoney-store[postgresql]==3.0.6
servicelayer[google,amazon]==1.21.2
servicelayer[google,amazon]==1.22.0
languagecodes==1.1.1
countrytagger==0.1.2
pyicu==2.11
Expand Down