Script for seeding category data from S3 #959

Merged
merged 29 commits into main from jeremy/seed-categories-script on May 10, 2023

Commits (29)
214703b
Cleaning data-workflow handler
manasaV3 Apr 5, 2023
53263e4
Script for seeding category data from S3
codemonkey800 Mar 22, 2023
8acc33f
Address code review feedback
codemonkey800 Mar 29, 2023
9a7569d
Address feedback
codemonkey800 Apr 3, 2023
9ff2799
Move workflows back to handler
codemonkey800 Apr 6, 2023
56e863b
Address feedback
codemonkey800 Apr 8, 2023
ad192f7
Address feedback
codemonkey800 Apr 13, 2023
192ea9a
fix test error
codemonkey800 Apr 13, 2023
146013f
Merge branch 'main' into jeremy/seed-categories-script
codemonkey800 May 2, 2023
2d2954d
Add missing line
codemonkey800 May 2, 2023
c12da4f
fix merge
codemonkey800 May 2, 2023
dc636e2
Merge branch 'main' into jeremy/seed-categories-script
codemonkey800 May 5, 2023
376bb57
Rename s3_path -> categories_path
codemonkey800 May 5, 2023
8f1f688
Update docstring
codemonkey800 May 5, 2023
d19c291
Call module functions directly
codemonkey800 May 5, 2023
2977d71
Rename s3_path -> categories_path params
codemonkey800 May 5, 2023
ae2bce7
Add timing to load_json_from_s3
codemonkey800 May 5, 2023
8dacebb
Fix merge issue
codemonkey800 May 5, 2023
a1c4e03
Update logger usage
codemonkey800 May 5, 2023
6e6b31f
Add default value "local"
codemonkey800 May 5, 2023
011285d
Add types to params
codemonkey800 May 5, 2023
fbdb3e9
Add event data structure info
codemonkey800 May 5, 2023
736632c
newline
codemonkey800 May 5, 2023
ba18797
Fix test error
codemonkey800 May 5, 2023
46f3a16
remove extra space
codemonkey800 May 8, 2023
8b4472a
fix all requiring iterable
codemonkey800 May 8, 2023
36f71ff
Merge branch 'main' into jeremy/seed-categories-script
codemonkey800 May 9, 2023
1163a6a
Use processor update_activity
codemonkey800 May 9, 2023
c469c1c
Move to categories/ dir
codemonkey800 May 9, 2023
26 changes: 26 additions & 0 deletions data-workflows/categories/category_model.py
@@ -0,0 +1,26 @@
import os
import logging

from pynamodb.attributes import ListAttribute, NumberAttribute, UnicodeAttribute
from pynamodb.models import Model
from utils.utils import get_current_timestamp

STACK_NAME = os.getenv("STACK_NAME", "local")

LOGGER = logging.getLogger()


class CategoryModel(Model):
    class Meta:
        host = os.getenv("LOCAL_DYNAMO_HOST")
        region = os.getenv("AWS_REGION", "us-west-2")
        table_name = f"{STACK_NAME}-category"

    name = UnicodeAttribute(hash_key=True)
    version_hash = UnicodeAttribute(range_key=True)
    version = UnicodeAttribute()
    formatted_name = UnicodeAttribute()
    dimension = UnicodeAttribute()
    hierarchy = ListAttribute()  # List[str]
    label = UnicodeAttribute()
    last_updated_timestamp = NumberAttribute(default_for_new=get_current_timestamp)
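
A minimal usage sketch for this model, assuming a reachable DynamoDB table (for example a local DynamoDB via LOCAL_DYNAMO_HOST); all attribute values below are hypothetical, not taken from the PR:

# Hypothetical example; none of these values come from the PR itself.
from categories.category_model import CategoryModel

item = CategoryModel(
    name="cell-segmentation",  # slugified name (hash key)
    version_hash="EDAM-BIOIMAGE:alpha06:d41d8cd9",  # "<version>:<md5 hex>" (range key)
    version="EDAM-BIOIMAGE:alpha06",
    formatted_name="Cell segmentation",
    dimension="Workflow step",
    hierarchy=["Image segmentation", "Cell segmentation"],
    label="Cell segmentation",
)
item.save()

# All versions stored under a category name (hash key) can be queried back:
for row in CategoryModel.query("cell-segmentation"):
    print(row.version, row.label)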
91 changes: 91 additions & 0 deletions data-workflows/categories/processor.py
@@ -0,0 +1,91 @@
import os
import time
import hashlib
import logging

from categories.category_model import CategoryModel
from slugify import slugify
from typing import Dict
from utils.env import get_required_env
from utils.s3 import S3Client

STACK_NAME = os.getenv("STACK_NAME", "local")

LOGGER = logging.getLogger()


def _hash_category(category: Dict[str, str]) -> str:
    """
    Hashes a category object using the MD5 hash algorithm. This works by
    creating a hash from the string and string array fields in the category
    object.
    """

    label = category.get("label", "")
    dimension = category.get("dimension", "")
    hierarchy = category.get("hierarchy", [])

    category_hash = hashlib.new("md5")
    category_hash.update(label.encode("utf-8"))
    category_hash.update(dimension.encode("utf-8"))

    for value in hierarchy:
        category_hash.update(value.encode("utf-8"))

    return category_hash.hexdigest()


def seed_s3_categories_workflow(version: str, categories_path: str):
    """
    Runs the data workflow for populating the category dynamo table from an S3
    source on the same deployment stack.

    :param version: The categories version
    :type version: str
    :param categories_path: The categories path in S3
    :type categories_path: str
    :raises ValueError: If params are not defined
    """

    if not all([version, categories_path]):
        LOGGER.error(
            f"Missing required values version={version} categories_path={categories_path}"
        )
        raise ValueError("version and categories_path are required")

    bucket = get_required_env("BUCKET")
    s3_prefix = os.getenv("BUCKET_PATH", "")

    LOGGER.info(
        f"Seeding {version} category data from S3 "
        f"prefix={s3_prefix} categories_path={categories_path} bucket={bucket} table={CategoryModel.Meta.table_name}"
    )

    client = S3Client(
        bucket=bucket,
        prefix=s3_prefix,
    )
    data = client.load_json_from_s3(categories_path)

    batch = CategoryModel.batch_write()
    start = time.perf_counter()
    count = 0

    for name, categories in data.items():
        for category in categories:
            item = CategoryModel(
                name=slugify(name),
                version_hash=f"{version}:{_hash_category(category)}",
                version=version,
                formatted_name=name,
                dimension=category.get("dimension", ""),
                hierarchy=category.get("hierarchy", []),
                label=category.get("label", ""),
            )
            batch.save(item)
            count += 1

    batch.commit()
    duration = (time.perf_counter() - start) * 1000

    LOGGER.info(f"Finished seeding category data count={count} duration={duration}ms")
23 changes: 15 additions & 8 deletions data-workflows/handler.py
@@ -2,23 +2,30 @@
 import logging

 import activity.processor
+import categories.processor


 LOGGER = logging.getLogger()
 LOGGER.setLevel(logging.INFO)


 def handle(event, context) -> None:
-
-    for record in event.get('Records', []):
-        if 'body' not in record:
+    for record in event.get("Records", []):
+        if "body" not in record:
             continue

-        body = record.get('body')
-        LOGGER.info(f'Received message with body: {body}')
-        event_type = json.loads(body).get('type', '').lower()
+        body = record.get("body")
+        LOGGER.info(f"Received message with body: {body}")
+        event = json.loads(body)
+        event_type = event.get("type", "").lower()

         # TODO: Create a dict for event_type by method to be called
-        if event_type == 'activity':
+        if event_type == "activity":
             activity.processor.update_activity()
-            LOGGER.info(f'Update successful for type={event_type}')
+            LOGGER.info(f"Update successful for type={event_type}")
+        elif event_type == "seed-s3-categories":
+            version = event.get("version")
+            categories_path = event.get("categories_path")
+
+            categories.processor.seed_s3_categories_workflow(version, categories_path)
+            LOGGER.info(f"Update successful for type={event_type}")
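
For reference, a sketch of an SQS-style event that would reach the new seed-s3-categories branch; the version and path values are illustrative, not from the PR:

import json

# Each record body is itself a JSON string, matching json.loads(body) above.
event = {
    "Records": [
        {
            "body": json.dumps({
                "type": "seed-s3-categories",
                "version": "EDAM-BIOIMAGE:alpha06",       # hypothetical
                "categories_path": "category/edam.json",  # hypothetical
            })
        }
    ]
}

# handle(event, None) would then call seed_s3_categories_workflow(...).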
1 change: 1 addition & 0 deletions data-workflows/requirements.txt
@@ -1,3 +1,4 @@
 snowflake-connector-python==3.0.0
 boto3==1.26.77
 pynamodb==5.4.1
+python-slugify==8.0.1
63 changes: 63 additions & 0 deletions data-workflows/run_workflow.py
@@ -0,0 +1,63 @@
"""
Module containing functionality for running data workflows as a standalone script.
"""

import argparse
import categories.processor
import logging
import activity.processor

from typing import Dict

LOGGER = logging.getLogger()


def run_workflow(event: Dict):
[Contributor review comment on run_workflow: "Nicely done! Having this workflow is super helpful for the future!"]

"""
Function for running a particular data workflow based on the event type. The
event dictionary should contain all of the necessary data to run the
specified data workflow.

Events:
activity: {}
seed-s3-categories: { version: string, categories_path: string }
"""

event_type = event.get("type", "").lower()

if event_type == "activity":
activity.processor.update_activity()
LOGGER.info(f"Update successful for type={event_type}")
elif event_type == "seed-s3-categories":
version = event.get("version")
categories_path = event.get("categories_path")

categories.processor.seed_s3_categories_workflow(version, categories_path)
LOGGER.info(f"Update successful for type={event_type}")


def _get_arg_parser():
parser = argparse.ArgumentParser(
prog="run-workflow",
description="CLI for running hub data workflows",
)
subparsers = parser.add_subparsers(required=True, dest="type")

seed_s3_categories_parser = subparsers.add_parser(
"seed-s3-categories", help="categories help"
)
seed_s3_categories_parser.add_argument("--version", required=True)
seed_s3_categories_parser.add_argument("--categories-path", required=True)

subparsers.add_parser("activity", help="activity help")

return parser


def _main():
parser = _get_arg_parser()
run_workflow(vars(parser.parse_args()))


if __name__ == "__main__":
_main()
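
A quick sketch of both entry points, assuming BUCKET and STACK_NAME are set; the version and path values are made up. Note that argparse maps --categories-path to the categories_path key, so the CLI and the programmatic call produce the same event dict:

from run_workflow import run_workflow

# Equivalent to:
#   python run_workflow.py seed-s3-categories \
#       --version EDAM-BIOIMAGE:alpha06 --categories-path category/edam.json
run_workflow(
    {
        "type": "seed-s3-categories",
        "version": "EDAM-BIOIMAGE:alpha06",       # hypothetical version
        "categories_path": "category/edam.json",  # hypothetical S3 key
    }
)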
2 changes: 1 addition & 1 deletion data-workflows/tests/test_handler.py
@@ -48,4 +48,4 @@ def test_handle_invalid_json(self):
     def test_handle_invalid_event(self, event: Dict):
         from handler import handle
         handle(event, None)
-        self._verify()
\ No newline at end of file
+        self._verify()
22 changes: 22 additions & 0 deletions data-workflows/utils/env.py
@@ -0,0 +1,22 @@
import os


def get_required_env(key: str) -> str:
[Collaborator review comment on get_required_env: "Really cool idea. 😀"]

"""
Utility for getting the value of an environment variable with the additional
constraint that it must be defined, otherwise the application will be unable
to run without it.

:param key: The key for the environment variable.
:type key: str
:raises ValueError: If environment variable is not defined.
:return The environment variable value.
:rtype str
"""

value = os.getenv(key)

if not value:
raise ValueError(f"Required environment variable '{key}' is not defined")

return value
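
Usage is a one-liner; a small sketch:

from utils.env import get_required_env

# Fails fast with a clear ValueError if BUCKET is unset or empty, instead of
# surfacing a confusing boto3 error later.
bucket = get_required_env("BUCKET")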
51 changes: 51 additions & 0 deletions data-workflows/utils/s3.py
@@ -0,0 +1,51 @@
import boto3
import json
import logging
import time

from os import path
from typing import Any, Dict
from utils.time import print_perf_duration


class S3Client:
    """
    Client for accessing S3 resources.
    """

    _bucket: str
    _prefix: str
    _client: Any

    def __init__(self, bucket: str, prefix: str = ""):
        self._bucket = bucket
        self._prefix = prefix
        self._client = boto3.client("s3")

    def _get_complete_path(self, s3_path: str):
        return path.join(self._prefix, s3_path)

    def _get_from_s3(self, s3_path: str):
        start = time.perf_counter()
        obj = self._client.get_object(
            Bucket=self._bucket, Key=self._get_complete_path(s3_path)
        )
        print_perf_duration(start, f"_get_from_s3('{s3_path}')")

        return obj["Body"].read().decode("utf-8")

    def load_json_from_s3(self, s3_path: str) -> Dict:
        """
        Load a JSON file from an S3 path and convert it to a Python dictionary.
        """
        start = time.perf_counter()
        result = {}

        try:
            result = json.loads(self._get_from_s3(s3_path))
        except Exception as e:
            logging.error(e)

        print_perf_duration(start, f"load_json_from_s3('{s3_path}')")

        return result
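
A minimal usage sketch; the bucket name, prefix, and key below are hypothetical and the call requires AWS credentials:

from utils.s3 import S3Client

client = S3Client(bucket="my-hub-bucket", prefix="some/prefix")  # made-up names
data = client.load_json_from_s3("category/edam.json")  # made-up key
print(f"Loaded {len(data)} category names")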
13 changes: 13 additions & 0 deletions data-workflows/utils/time.py
@@ -0,0 +1,13 @@
import time
import logging

LOGGER = logging.getLogger()


def get_perf_duration(start: float) -> float:
    return (time.perf_counter() - start) * 1000


def print_perf_duration(start: float, message: str):
    duration = get_perf_duration(start)
    LOGGER.info(f"{message} duration={duration}ms")
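
The intended pattern, sketched with a placeholder workload:

import time
from utils.time import print_perf_duration

start = time.perf_counter()
result = sum(range(1_000_000))  # placeholder for the operation being timed
print_perf_duration(start, "sum(range(1_000_000))")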
29 changes: 18 additions & 11 deletions data-workflows/utils/utils.py
@@ -1,15 +1,18 @@
-import boto3
 import json
-import os
 import time
 from datetime import date, datetime, timezone

+import boto3
+from .env import get_required_env

 def get_current_timestamp() -> int:
     return round(time.time() * 1000)


+LAST_UPDATED_TIMESTAMP_KEY = "last_activity_fetched_timestamp"
+
+
 def date_to_utc_timestamp_in_millis(timestamp: date) -> int:
     timestamp_datetime = datetime(timestamp.year, timestamp.month, timestamp.day)
     return datetime_to_utc_timestamp_in_millis(timestamp_datetime)
@@ -19,19 +22,23 @@ def datetime_to_utc_timestamp_in_millis(timestamp: datetime) -> int:
     return int(timestamp.replace(tzinfo=timezone.utc).timestamp() * 1000)


-LAST_UPDATED_TIMESTAMP_KEY = 'last_activity_fetched_timestamp'
-
-
 class ParameterStoreAdapter:
-
     def __init__(self):
-        self._parameter_name: str = f'/{os.getenv("STACK_NAME")}/napari-hub/data-workflows/config'
-        self._ssm_client = boto3.client('ssm')
+        self._parameter_name: str = (
+            f'/{get_required_env("STACK_NAME")}/napari-hub/data-workflows/config'
+        )
+        self._ssm_client = boto3.client("ssm")

     def get_last_updated_timestamp(self) -> int:
-        response = self._ssm_client.get_parameter(Name=self._parameter_name, WithDecryption=True)
-        return json.loads(response['Parameter']['Value']).get(LAST_UPDATED_TIMESTAMP_KEY)
+        response = self._ssm_client.get_parameter(
+            Name=self._parameter_name, WithDecryption=True
+        )
+        return json.loads(response["Parameter"]["Value"]).get(
+            LAST_UPDATED_TIMESTAMP_KEY
+        )

     def set_last_updated_timestamp(self, timestamp) -> None:
         value = json.dumps({LAST_UPDATED_TIMESTAMP_KEY: timestamp})
-        self._ssm_client.put_parameter(Name=self._parameter_name, Value=value, Overwrite=True, Type='SecureString')
+        self._ssm_client.put_parameter(
+            Name=self._parameter_name, Value=value, Overwrite=True, Type="SecureString"
+        )
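
A short sketch of the adapter's round-trip, assuming STACK_NAME is set and the SSM parameter already exists:

from utils.utils import ParameterStoreAdapter, get_current_timestamp

adapter = ParameterStoreAdapter()
previous = adapter.get_last_updated_timestamp()  # millisecond epoch, or None
adapter.set_last_updated_timestamp(get_current_timestamp())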