Commit: Address code review feedback

codemonkey800 committed Mar 29, 2023
1 parent 4c163f7 commit 542c6fd
Showing 8 changed files with 180 additions and 96 deletions.
13 changes: 13 additions & 0 deletions data-workflows/activity/update_activity.py
@@ -0,0 +1,13 @@
import activity.processor

from utils.utils import ParameterStoreAdapter, get_current_timestamp


def update_activity() -> None:
parameter_store_adapter = ParameterStoreAdapter()
last_updated_timestamp = parameter_store_adapter.get_last_updated_timestamp()
current_timestamp = get_current_timestamp()
activity.processor.update_install_activity(
last_updated_timestamp, current_timestamp
)
parameter_store_adapter.set_last_updated_timestamp(current_timestamp)
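
For illustration only (not part of the commit), a minimal test-style sketch of the checkpointing contract this helper implements, assuming the project's dependencies are installed; all mocked names come from the diff above, and the timestamps are placeholders.

# Hypothetical sketch: the processor receives the stored checkpoint and the
# current timestamp, and the checkpoint is advanced after processing.
from unittest import mock

import activity.update_activity as ua

with mock.patch.object(ua, "ParameterStoreAdapter") as adapter_cls, \
        mock.patch.object(ua, "get_current_timestamp", return_value=200), \
        mock.patch("activity.processor.update_install_activity") as update_install:
    adapter_cls.return_value.get_last_updated_timestamp.return_value = 100

    ua.update_activity()

    update_install.assert_called_once_with(100, 200)
    adapter_cls.return_value.set_last_updated_timestamp.assert_called_once_with(200)
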
91 changes: 56 additions & 35 deletions data-workflows/categories.py
@@ -1,55 +1,76 @@
import os
import time
import hashlib
import logging

from pynamodb.attributes import ListAttribute, NumberAttribute, UnicodeAttribute
from pynamodb.models import Model
from slugify import slugify
from typing import Dict
from utils.utils import S3Client, get_current_timestamp
from utils.env import get_required_env
from utils.s3 import S3Client
from utils.utils import get_current_timestamp

STACK_NAME = get_required_env("STACK_NAME")

def get_category_model(table: str):
class CategoryModel(Model):
class Meta:
region = os.environ.get('AWS_REGION', 'us-west-2')
table_name = table
LOGGER = logging.getLogger()

name = UnicodeAttribute(hash_key=True)
version_hash = UnicodeAttribute(range_key=True)
version = UnicodeAttribute()
formatted_name = UnicodeAttribute()
dimension = UnicodeAttribute()
hierarchy = ListAttribute() # List[str]
label = UnicodeAttribute()
last_updated_timestamp = NumberAttribute(default_for_new=get_current_timestamp)

return CategoryModel
class CategoryModel(Model):
class Meta:
region = os.environ.get("AWS_REGION", "us-west-2")
table_name = f"{STACK_NAME}-category"

name = UnicodeAttribute(hash_key=True)
version_hash = UnicodeAttribute(range_key=True)
version = UnicodeAttribute()
formatted_name = UnicodeAttribute()
dimension = UnicodeAttribute()
hierarchy = ListAttribute() # List[str]
label = UnicodeAttribute()
last_updated_timestamp = NumberAttribute(default_for_new=get_current_timestamp)

def hash_category(category: Dict[str, str]) -> str:
label = category.get('label', '')
dimension = category.get('dimension')
hierarchy = category.get('hierarchy', [])

h = hashlib.new('md5')
h.update(label.encode('utf-8'))
h.update(dimension.encode('utf-8'))
def _hash_category(category: Dict[str, str]) -> str:
"""
Hashes a category object using the MD5 hash algorithm. This works by
creating a hash from the string and string array fields in the category
object.
"""

label = category.get("label", "")
dimension = category.get("dimension")
hierarchy = category.get("hierarchy", [])

category_hash = hashlib.new("md5")
category_hash.update(label.encode("utf-8"))
category_hash.update(dimension.encode("utf-8"))

for value in hierarchy:
h.update(value.encode('utf-8'))
category_hash.update(value.encode("utf-8"))

return category_hash.hexdigest()


def run_seed_s3_categories_workflow(edam_version: str, s3_path: str):
"""
Runs the data workflow for populating the category dynamo table from an S3
source on the same deployment stack.
"""

return h.hexdigest()
bucket = get_required_env("BUCKET")

def run_seed_s3_categories_workflow(table: str, bucket: str, edam_version: str):
print(f'Seeding category data from S3 version={edam_version} bucket={bucket} table={table}')
edam_name, version = edam_version.split(':')
s3_path = f'category/{edam_name}/{version}.json'
LOGGER.info(
f"Seeding {edam_version} category data from S3 "
f"s3_path={s3_path} bucket={bucket} table={CategoryModel.Meta.table_name}"
)

client = S3Client(bucket)
client = S3Client(
bucket=bucket,
prefix="" if STACK_NAME in ("prod", "staging") else STACK_NAME,
)
data = client.load_json_from_s3(s3_path)

CategoryModel = get_category_model(table)
batch = CategoryModel.batch_write()
start = time.perf_counter()
count = 0
@@ -58,17 +79,17 @@ def run_seed_s3_categories_workflow(table: str, bucket: str, edam_version: str):
for category in categories:
item = CategoryModel(
name=slugify(name),
version_hash=f'{edam_version}:{hash_category(category)}',
version_hash=f"{edam_version}:{_hash_category(category)}",
version=edam_version,
formatted_name=name,
dimension=category.get('dimension', ''),
hierarchy=category.get('hierarchy', []),
label=category.get('label', ''),
dimension=category.get("dimension", ""),
hierarchy=category.get("hierarchy", []),
label=category.get("label", ""),
)
batch.save(item)
count += 1

batch.commit()
duration = (time.perf_counter() - start) * 1000

print(f'Finished seeding category data count={count} duration={duration}ms')
LOGGER.info(f"Finished seeding category data count={count} duration={duration}ms")
21 changes: 11 additions & 10 deletions data-workflows/handler.py
@@ -1,9 +1,9 @@
import json
import logging

import activity.processor
from utils.utils import ParameterStoreAdapter
import utils.utils
from run_workflow import run_workflow

from utils.utils import ParameterStoreAdapter, get_current_timestamp


def _setup_logging():
@@ -14,18 +14,19 @@ def _setup_logging():
def _update_activity() -> None:
parameter_store_adapter = ParameterStoreAdapter()
last_updated_timestamp = parameter_store_adapter.get_last_updated_timestamp()
current_timestamp = utils.utils.get_current_timestamp()
activity.processor.update_install_activity(last_updated_timestamp, current_timestamp)
current_timestamp = get_current_timestamp()
activity.processor.update_install_activity(
last_updated_timestamp, current_timestamp
)
parameter_store_adapter.set_last_updated_timestamp(current_timestamp)


def handle(event, context):
_setup_logging()

for record in event.get('Records', []):
if 'body' not in record:
for record in event.get("Records", []):
if "body" not in record:
continue
event_type = json.loads(record.get('body')).get('type')

if event_type == 'activity':
_update_activity()
payload = json.loads(record.get("body"))
run_workflow(payload)
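
For reference, an event shape the rewritten handler accepts, inferred from the code: each record body is a JSON string whose type field selects the workflow. The seed-s3-categories values are placeholders.

# Hypothetical Lambda/SQS-style event; handle() parses each body and passes
# the resulting payload dict to run_workflow().
example_event = {
    "Records": [
        {"body": '{"type": "activity"}'},
        {
            "body": '{"type": "seed-s3-categories",'
            ' "edam_version": "EDAM-BIOIMAGING:alpha06",'
            ' "s3_path": "category/EDAM-BIOIMAGING/alpha06.json"}'
        },
    ]
}
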
1 change: 1 addition & 0 deletions data-workflows/requirements.txt
@@ -1,3 +1,4 @@
snowflake-connector-python==3.0.0
boto3==1.26.77
pynamodb==5.4.1
python-slugify==8.0.1
58 changes: 39 additions & 19 deletions data-workflows/run_workflow.py
@@ -1,39 +1,59 @@
"""
Module containing functionality for running workflows. This can be run either as
a standalone CLI script or by importing the function `run_workflow()` and
passing the correct event payload information to the function. This allows us to
run the workflow consistently in different environments like a local CLI,
GitHub Actions, or an AWS Lambda.
"""

import argparse
import os

from activity.update_activity import update_activity
from categories import run_seed_s3_categories_workflow
from typing import Dict


def run_workflow(event: Dict):
type = event['type']
"""
Function for running a particular data workflow based on the event type. The
event dictionary should contain all of the necessary data to run the
specified data workflow.
"""

type = event["type"]

if type == 'seed-s3-categories':
if type == "activity":
update_activity()

if type == "seed-s3-categories":
run_seed_s3_categories_workflow(
event['table'],
event['bucket'],
event['edam_version'],
event["edam_version"],
event["s3_path"],
)


def get_arg_parser():
def _get_arg_parser():
parser = argparse.ArgumentParser(
prog='run-workflow',
description='CLI for running hub data workflows',
prog="run-workflow",
description="CLI for running hub data workflows",
)
subparsers = parser.add_subparsers(required=True, dest='type')
subparsers = parser.add_subparsers(required=True, dest="type")

seed_s3_categories_parser = subparsers.add_parser('seed-s3-categories', help='categories help')
seed_s3_categories_parser.add_argument('--table', required=True)
seed_s3_categories_parser.add_argument('--bucket', required=True)
seed_s3_categories_parser.add_argument('--edam-version', required=True)
seed_s3_categories_parser = subparsers.add_parser(
"seed-s3-categories", help="categories help"
)
seed_s3_categories_parser.add_argument("--edam-version", required=True)
seed_s3_categories_parser.add_argument("--s3-path", required=True)

subparsers.add_parser("activity", help="activity help")

return parser

def main():
parser = get_arg_parser()

def _main():
parser = _get_arg_parser()
run_workflow(vars(parser.parse_args()))

if __name__ == '__main__':
main()

if __name__ == "__main__":
_main()
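
As a usage note (not part of the diff), the CLI and the Lambda path converge on the same event dict: the subcommand becomes type and each flag becomes a key, so running "python run_workflow.py seed-s3-categories --edam-version EDAM-BIOIMAGING:alpha06 --s3-path category/EDAM-BIOIMAGING/alpha06.json" (placeholder values) is equivalent to the call sketched below.

# vars(parser.parse_args()) yields this dict; argparse maps --edam-version
# and --s3-path to the edam_version and s3_path keys.
from run_workflow import run_workflow

run_workflow({
    "type": "seed-s3-categories",
    "edam_version": "EDAM-BIOIMAGING:alpha06",           # placeholder
    "s3_path": "category/EDAM-BIOIMAGING/alpha06.json",  # placeholder
})
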
16 changes: 16 additions & 0 deletions data-workflows/utils/env.py
@@ -0,0 +1,16 @@
import os


def get_required_env(key: str) -> str:
"""
Utility for getting the value of an environment variable with the additional
constraint that it must be defined; otherwise a ValueError is raised, since
the application cannot run without it.
"""

value = os.environ.get(key)

if not value:
raise ValueError(f"Required environment variable '{key}' is not defined")

return value
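
A minimal usage sketch of this helper:

# get_required_env fails fast with a ValueError when the variable is
# missing or empty, so misconfiguration is caught at startup.
from utils.env import get_required_env

stack_name = get_required_env("STACK_NAME")  # raises ValueError if unset
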
42 changes: 42 additions & 0 deletions data-workflows/utils/s3.py
@@ -0,0 +1,42 @@
import boto3
import json
import logging

from os import path
from typing import Any, Dict


class S3Client:
"""
Client for accessing S3 resources.
"""

_bucket: str
_prefix: str
_client: Any

def __init__(self, bucket: str, prefix=""):
self._bucket = bucket
self._prefix = prefix
self._client = boto3.client("s3")

def _get_complete_path(self, s3_path: str):
return path.join(self._prefix, s3_path)

def _get_from_s3(self, s3_path):
obj = self._client.get_object(
Bucket=self._bucket, Key=self._get_complete_path(s3_path)
)

return obj["Body"].read().decode("utf-8")

def load_json_from_s3(self, s3_path: str) -> Dict:
"""
Load JSON file from S3 path and convert to a Python dictionary.
"""

try:
return json.loads(self._get_from_s3(s3_path))
except Exception as e:
logging.error(e)
return {}
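
A hypothetical usage sketch of the new client; the bucket, prefix, and key below are placeholders, and real AWS credentials are required. The prefix mirrors how categories.py constructs the client (empty for prod/staging, otherwise the stack name).

# load_json_from_s3 returns {} and logs the error if the object is missing
# or is not valid JSON.
from utils.s3 import S3Client

client = S3Client(bucket="my-napari-hub-bucket", prefix="my-dev-stack")
data = client.load_json_from_s3("category/EDAM-BIOIMAGING/alpha06.json")
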
34 changes: 2 additions & 32 deletions data-workflows/utils/utils.py
@@ -1,10 +1,8 @@
import boto3
import json
import logging
import time

from os import path
from typing import Any, Dict
from .env import get_required_env


def get_current_timestamp() -> int:
@@ -17,7 +15,7 @@ def get_current_timestamp() -> int:
class ParameterStoreAdapter:
def __init__(self):
self._parameter_name: str = (
f'/{os.getenv("STACK_NAME")}/napari-hub/data-workflows/config'
f'/{get_required_env("STACK_NAME")}/napari-hub/data-workflows/config'
)
self._ssm_client = boto3.client("ssm")

@@ -34,31 +32,3 @@ def set_last_updated_timestamp(self, timestamp) -> None:
self._ssm_client.put_parameter(
Name=self._parameter_name, Value=value, Overwrite=True, Type="SecureString"
)


class S3Client:
bucket: str
prefix: str
client: Any

def __init__(self, bucket: str, prefix=""):
self.bucket = bucket
self.prefix = prefix
self.client = boto3.client("s3")

def _get_complete_path(self, s3_path: str):
return path.join(self.prefix, s3_path)

def _get_from_s3(self, s3_path):
obj = self.client.get_object(
Bucket=self.bucket, Key=self._get_complete_path(s3_path)
)

return obj["Body"].read().decode("utf-8")

def load_json_from_s3(self, s3_path: str) -> Dict:
try:
return json.loads(self._get_from_s3(s3_path))
except Exception as e:
logging.error(e)
return {}
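
A hypothetical usage sketch of the adapter that remains in this module; the stack name is a placeholder and the calls require AWS credentials with SSM access.

# The SSM parameter name embeds STACK_NAME, so it must be set before the
# adapter is constructed.
import os

os.environ.setdefault("STACK_NAME", "my-dev-stack")  # placeholder

from utils.utils import ParameterStoreAdapter, get_current_timestamp

adapter = ParameterStoreAdapter()
last_run = adapter.get_last_updated_timestamp()              # stored checkpoint
adapter.set_last_updated_timestamp(get_current_timestamp())  # advance checkpoint
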
