diff --git a/data-workflows/categories/__init__.py b/data-workflows/categories/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/data-workflows/categories/category_model.py b/data-workflows/categories/category_model.py
new file mode 100644
index 000000000..60d7d26a5
--- /dev/null
+++ b/data-workflows/categories/category_model.py
@@ -0,0 +1,26 @@
+import os
+import logging
+
+from pynamodb.attributes import ListAttribute, NumberAttribute, UnicodeAttribute
+from pynamodb.models import Model
+from utils.utils import get_current_timestamp
+
+STACK_NAME = os.getenv("STACK_NAME", "local")
+
+LOGGER = logging.getLogger()
+
+
+class CategoryModel(Model):
+    class Meta:
+        host = os.getenv("LOCAL_DYNAMO_HOST")
+        region = os.getenv("AWS_REGION", "us-west-2")
+        table_name = f"{STACK_NAME}-category"
+
+    name = UnicodeAttribute(hash_key=True)
+    version_hash = UnicodeAttribute(range_key=True)
+    version = UnicodeAttribute()
+    formatted_name = UnicodeAttribute()
+    dimension = UnicodeAttribute()
+    hierarchy = ListAttribute()  # List[str]
+    label = UnicodeAttribute()
+    last_updated_timestamp = NumberAttribute(default_for_new=get_current_timestamp)
diff --git a/data-workflows/categories/processor.py b/data-workflows/categories/processor.py
new file mode 100644
index 000000000..addc865ff
--- /dev/null
+++ b/data-workflows/categories/processor.py
@@ -0,0 +1,91 @@
+import os
+import time
+import hashlib
+import logging
+
+from categories.category_model import CategoryModel
+from slugify import slugify
+from typing import Dict
+from utils.env import get_required_env
+from utils.s3 import S3Client
+
+STACK_NAME = os.getenv("STACK_NAME", "local")
+
+LOGGER = logging.getLogger()
+
+
+def _hash_category(category: Dict[str, str]) -> str:
+    """
+    Hashes a category object using the MD5 hash algorithm. This works by
+    creating a hash from the string and string array fields in the category
+    object.
+    """
+
+    label = category.get("label", "")
+    dimension = category.get("dimension", "")
+    hierarchy = category.get("hierarchy", [])
+
+    category_hash = hashlib.new("md5")
+    category_hash.update(label.encode("utf-8"))
+    category_hash.update(dimension.encode("utf-8"))
+
+    for value in hierarchy:
+        category_hash.update(value.encode("utf-8"))
+
+    return category_hash.hexdigest()
+
+
+def seed_s3_categories_workflow(version: str, categories_path: str):
+    """
+    Runs the data workflow for populating the category dynamo table from an S3
+    source on the same deployment stack.
+
+    :param version: The categories version
+    :type version: str
+    :param categories_path: The categories path in S3
+    :type categories_path: str
+    :raises ValueError: If params are not defined
+    """
+
+    if not all([version, categories_path]):
+        LOGGER.error(
+            f"Missing required values version={version} s3_path={categories_path}"
+        )
+        raise ValueError()
+
+    bucket = get_required_env("BUCKET")
+    s3_prefix = os.getenv("BUCKET_PATH", "")
+
+    LOGGER.info(
+        f"Seeding {version} category data from S3 "
+        f"prefix={STACK_NAME} s3_path={categories_path} bucket={bucket} table={CategoryModel.Meta.table_name}"
+    )
+
+    client = S3Client(
+        bucket=bucket,
+        prefix=s3_prefix,
+    )
+    data = client.load_json_from_s3(categories_path)
+
+    batch = CategoryModel.batch_write()
+    start = time.perf_counter()
+    count = 0
+
+    for name, categories in data.items():
+        for category in categories:
+            item = CategoryModel(
+                name=slugify(name),
+                version_hash=f"{version}:{_hash_category(category)}",
+                version=version,
+                formatted_name=name,
+                dimension=category.get("dimension", ""),
+                hierarchy=category.get("hierarchy", []),
+                label=category.get("label", ""),
+            )
+            batch.save(item)
+            count += 1
+
+    batch.commit()
+    duration = (time.perf_counter() - start) * 1000
+
+    LOGGER.info(f"Finished seeding category data count={count} duration={duration}ms")
diff --git a/data-workflows/handler.py b/data-workflows/handler.py
index 9daed82b8..52461c765 100644
--- a/data-workflows/handler.py
+++ b/data-workflows/handler.py
@@ -2,6 +2,7 @@
 import logging
 
 import activity.processor
+import categories.processor
 
 LOGGER = logging.getLogger()
 
@@ -9,16 +10,22 @@ def handle(event, context) -> None:
-
-    for record in event.get('Records', []):
-        if 'body' not in record:
+    for record in event.get("Records", []):
+        if "body" not in record:
             continue
 
-        body = record.get('body')
-        LOGGER.info(f'Received message with body: {body}')
-        event_type = json.loads(body).get('type', '').lower()
+        body = record.get("body")
+        LOGGER.info(f"Received message with body: {body}")
+        event = json.loads(body)
+        event_type = event.get("type", "").lower()
 
         # TODO: Create a dict for event_type by method to be called
-        if event_type == 'activity':
+        if event_type == "activity":
             activity.processor.update_activity()
-            LOGGER.info(f'Update successful for type={event_type}')
+            LOGGER.info(f"Update successful for type={event_type}")
+        elif event_type == "seed-s3-categories":
+            version = event.get("version")
+            categories_path = event.get("categories_path")
+
+            categories.processor.seed_s3_categories_workflow(version, categories_path)
+            LOGGER.info(f"Update successful for type={event_type}")
diff --git a/data-workflows/requirements.txt b/data-workflows/requirements.txt
index 0766eccd0..cddf79145 100644
--- a/data-workflows/requirements.txt
+++ b/data-workflows/requirements.txt
@@ -1,3 +1,4 @@
 snowflake-connector-python==3.0.0
 boto3==1.26.77
 pynamodb==5.4.1
+python-slugify==8.0.1
diff --git a/data-workflows/run_workflow.py b/data-workflows/run_workflow.py
new file mode 100644
index 000000000..49fcdb8e3
--- /dev/null
+++ b/data-workflows/run_workflow.py
@@ -0,0 +1,63 @@
+"""
+Module containing functionality for running data workflows as a standalone script.
+"""
+
+import argparse
+import categories.processor
+import logging
+import activity.processor
+
+from typing import Dict
+
+LOGGER = logging.getLogger()
+
+
+def run_workflow(event: Dict):
+    """
+    Function for running a particular data workflow based on the event type. The
+    event dictionary should contain all of the necessary data to run the
+    specified data workflow.
+
+    Events:
+        activity: {}
+        seed-s3-categories: { version: string, categories_path: string }
+    """
+
+    event_type = event.get("type", "").lower()
+
+    if event_type == "activity":
+        activity.processor.update_activity()
+        LOGGER.info(f"Update successful for type={event_type}")
+    elif event_type == "seed-s3-categories":
+        version = event.get("version")
+        categories_path = event.get("categories_path")
+
+        categories.processor.seed_s3_categories_workflow(version, categories_path)
+        LOGGER.info(f"Update successful for type={event_type}")
+
+
+def _get_arg_parser():
+    parser = argparse.ArgumentParser(
+        prog="run-workflow",
+        description="CLI for running hub data workflows",
+    )
+    subparsers = parser.add_subparsers(required=True, dest="type")
+
+    seed_s3_categories_parser = subparsers.add_parser(
+        "seed-s3-categories", help="categories help"
+    )
+    seed_s3_categories_parser.add_argument("--version", required=True)
+    seed_s3_categories_parser.add_argument("--categories-path", required=True)
+
+    subparsers.add_parser("activity", help="activity help")
+
+    return parser
+
+
+def _main():
+    parser = _get_arg_parser()
+    run_workflow(vars(parser.parse_args()))
+
+
+if __name__ == "__main__":
+    _main()
diff --git a/data-workflows/tests/test_handler.py b/data-workflows/tests/test_handler.py
index 2623ce9c0..c4ba60e99 100644
--- a/data-workflows/tests/test_handler.py
+++ b/data-workflows/tests/test_handler.py
@@ -48,4 +48,4 @@ def test_handle_invalid_json(self):
     def test_handle_invalid_event(self, event: Dict):
         from handler import handle
         handle(event, None)
-        self._verify()
+        self._verify()
\ No newline at end of file
diff --git a/data-workflows/utils/env.py b/data-workflows/utils/env.py
new file mode 100644
index 000000000..c67a228ce
--- /dev/null
+++ b/data-workflows/utils/env.py
@@ -0,0 +1,22 @@
+import os
+
+
+def get_required_env(key: str) -> str:
+    """
+    Utility for getting the value of an environment variable with the additional
+    constraint that it must be defined; otherwise the application will be unable
+    to run.
+
+    :param key: The key for the environment variable.
+    :type key: str
+    :raises ValueError: If the environment variable is not defined.
+    :return: The environment variable value.
+    :rtype: str
+    """
+
+    value = os.getenv(key)
+
+    if not value:
+        raise ValueError(f"Required environment variable '{key}' is not defined")
+
+    return value
diff --git a/data-workflows/utils/s3.py b/data-workflows/utils/s3.py
new file mode 100644
index 000000000..a898cf0e4
--- /dev/null
+++ b/data-workflows/utils/s3.py
@@ -0,0 +1,51 @@
+import boto3
+import json
+import logging
+import time
+
+from os import path
+from typing import Any, Dict
+from utils.time import print_perf_duration
+
+
+class S3Client:
+    """
+    Client for accessing S3 resources.
+ """ + + _bucket: str + _prefix: str + _client: Any + + def __init__(self, bucket: str, prefix: str = ""): + self._bucket = bucket + self._prefix = prefix + self._client = boto3.client("s3") + + def _get_complete_path(self, s3_path: str): + return path.join(self._prefix, s3_path) + + def _get_from_s3(self, s3_path: str): + start = time.perf_counter() + obj = self._client.get_object( + Bucket=self._bucket, Key=self._get_complete_path(s3_path) + ) + print_perf_duration(start, f"_get_from_s3('{s3_path}')") + + return obj["Body"].read().decode("utf-8") + + def load_json_from_s3(self, s3_path: str) -> Dict: + """ + Load JSON file from S3 path and convert to a Python dictionary. + """ + start = time.perf_counter() + result = {} + + try: + result = json.loads(self._get_from_s3(s3_path)) + except Exception as e: + logging.error(e) + + print_perf_duration(start, f"load_json_from_s3('{s3_path}')") + + return result diff --git a/data-workflows/utils/time.py b/data-workflows/utils/time.py new file mode 100644 index 000000000..20ef7b7cf --- /dev/null +++ b/data-workflows/utils/time.py @@ -0,0 +1,13 @@ +import time +import logging + +LOGGER = logging.getLogger() + + +def get_perf_duration(start: float) -> float: + return (time.perf_counter() - start) * 1000 + + +def print_perf_duration(start: float, message: str): + duration = get_perf_duration(start) + logging.info(f"{message} duration={duration}ms") diff --git a/data-workflows/utils/utils.py b/data-workflows/utils/utils.py index ca94c6d95..492672e54 100644 --- a/data-workflows/utils/utils.py +++ b/data-workflows/utils/utils.py @@ -1,15 +1,18 @@ +import boto3 import json -import os import time from datetime import date, datetime, timezone -import boto3 +from .env import get_required_env def get_current_timestamp() -> int: return round(time.time() * 1000) +LAST_UPDATED_TIMESTAMP_KEY = "last_activity_fetched_timestamp" + + def date_to_utc_timestamp_in_millis(timestamp: date) -> int: timestamp_datetime = datetime(timestamp.year, timestamp.month, timestamp.day) return datetime_to_utc_timestamp_in_millis(timestamp_datetime) @@ -19,19 +22,23 @@ def datetime_to_utc_timestamp_in_millis(timestamp: datetime) -> int: return int(timestamp.replace(tzinfo=timezone.utc).timestamp() * 1000) -LAST_UPDATED_TIMESTAMP_KEY = 'last_activity_fetched_timestamp' - - class ParameterStoreAdapter: - def __init__(self): - self._parameter_name: str = f'/{os.getenv("STACK_NAME")}/napari-hub/data-workflows/config' - self._ssm_client = boto3.client('ssm') + self._parameter_name: str = ( + f'/{get_required_env("STACK_NAME")}/napari-hub/data-workflows/config' + ) + self._ssm_client = boto3.client("ssm") def get_last_updated_timestamp(self) -> int: - response = self._ssm_client.get_parameter(Name=self._parameter_name, WithDecryption=True) - return json.loads(response['Parameter']['Value']).get(LAST_UPDATED_TIMESTAMP_KEY) + response = self._ssm_client.get_parameter( + Name=self._parameter_name, WithDecryption=True + ) + return json.loads(response["Parameter"]["Value"]).get( + LAST_UPDATED_TIMESTAMP_KEY + ) def set_last_updated_timestamp(self, timestamp) -> None: value = json.dumps({LAST_UPDATED_TIMESTAMP_KEY: timestamp}) - self._ssm_client.put_parameter(Name=self._parameter_name, Value=value, Overwrite=True, Type='SecureString') + self._ssm_client.put_parameter( + Name=self._parameter_name, Value=value, Overwrite=True, Type="SecureString" + )