Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

data-workflows maintenance data fetch cleanup #1148

Merged
Merged 9 commits on Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .happy/terraform/modules/ecs-stack/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -540,10 +540,6 @@ data aws_iam_policy_document backend_policy {
}

data aws_iam_policy_document data_workflows_policy {
statement {
actions = ["s3:GetObject",]
resources = ["${local.data_bucket_arn}/*"]
}
statement {
actions = [
"dynamodb:BatchWriteItem",
Expand Down
102 changes: 65 additions & 37 deletions data-workflows/activity/github_activity_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,64 @@
import time
from datetime import datetime
from enum import Enum, auto
from typing import List, Union
from typing import Union, Optional
import os

from pynamodb.models import Model
from pynamodb.attributes import UnicodeAttribute, NumberAttribute
from nhcommons.utils.time import get_current_timestamp
from utils.utils import date_to_utc_timestamp_in_millis, datetime_to_utc_timestamp_in_millis
from plugin.helpers import _get_repo_to_plugin_dict
from utils.utils import (
date_to_utc_timestamp_in_millis, datetime_to_utc_timestamp_in_millis
)


LOGGER = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
TIMESTAMP_FORMAT = "TO_TIMESTAMP('{0:%Y-%m-%d %H:%M:%S}')"


class GitHubActivityType(Enum):
def __new__(cls, timestamp_formatter, type_identifier_formatter, query_projection, query_sorting):
def __new__(cls, timestamp_formatter, type_id_formatter, projection, sort):
github_activity_type = object.__new__(cls)
github_activity_type._value = auto()
github_activity_type.timestamp_formatter = timestamp_formatter
github_activity_type.type_identifier_formatter = type_identifier_formatter
github_activity_type.query_projection = query_projection
github_activity_type.query_sorting = query_sorting
github_activity_type.type_identifier_formatter = type_id_formatter
github_activity_type.query_projection = projection
github_activity_type.query_sorting = sort
return github_activity_type

LATEST = (datetime_to_utc_timestamp_in_millis, 'LATEST:{0}',
'repo AS name, TO_TIMESTAMP(MAX(commit_author_date)) AS latest_commit', 'name')
MONTH = (date_to_utc_timestamp_in_millis, 'MONTH:{1:%Y%m}:{0}',
'repo AS name, DATE_TRUNC("month", TO_DATE(commit_author_date)) AS month, COUNT(*) AS commit_count',
'name, month')
TOTAL = (lambda timestamp: None, 'TOTAL:{0}', 'repo AS name, COUNT(*) AS commit_count', 'name')
LATEST = (
datetime_to_utc_timestamp_in_millis,
"LATEST:{repo}",
"TO_TIMESTAMP(MAX(commit_author_date)) AS latest_commit",
"name"
)
MONTH = (
date_to_utc_timestamp_in_millis,
"MONTH:{timestamp:%Y%m}:{repo}",
"DATE_TRUNC('month', TO_DATE(commit_author_date)) AS month, "
"COUNT(*) AS commit_count",
"name, month"
)
TOTAL = (
lambda timestamp: None,
"TOTAL:{repo}",
"COUNT(*) AS commit_count",
"name"
)

def format_to_timestamp(self, timestamp: datetime) -> Union[int, None]:
return self.timestamp_formatter(timestamp)

def format_to_type_identifier(self, repo_name: str, identifier_timestamp: str) -> str:
return self.type_identifier_formatter.format(repo_name, identifier_timestamp)
def format_to_type_identifier(self,
repo_name: str,
timestamp: Optional[datetime]) -> str:
return self.type_identifier_formatter.format(
repo=repo_name, timestamp=timestamp
)

def _create_subquery(self, plugins_by_earliest_ts: dict[str, datetime]) -> str:
def _create_subquery(
self, plugins_by_earliest_ts: dict[str, datetime]
) -> str:
if self is GitHubActivityType.MONTH:
return " OR ".join(
[
Expand All @@ -48,17 +68,19 @@ def _create_subquery(self, plugins_by_earliest_ts: dict[str, datetime]) -> str:
for name, ts in plugins_by_earliest_ts.items()
]
)
return f"""repo IN ({','.join([f"'{plugin}'" for plugin in plugins_by_earliest_ts.keys()])})"""
plugins = [f"'{plugin}'" for plugin in plugins_by_earliest_ts.keys()]
return f"repo IN ({','.join(plugins)})"

def get_query(self, plugins_by_earliest_ts: dict[str, datetime]) -> str:
return f"""
SELECT
SELECT
repo AS name,
{self.query_projection}
FROM
imaging.github.commits
WHERE
repo_type = 'plugin'
AND {self._create_subquery(plugins_by_earliest_ts)}
AND ({self._create_subquery(plugins_by_earliest_ts)})
GROUP BY {self.query_sorting}
ORDER BY {self.query_sorting}
"""
Expand Down Expand Up @@ -91,40 +113,46 @@ def __eq__(self, other):
return False


def transform_and_write_to_dynamo(data: dict[str, List], activity_type: GitHubActivityType) -> None:
"""Transforms plugin commit data generated by get_plugins_commit_count_since_timestamp to the expected format
and then writes the formatted data to the corresponding github-activity dynamo table in each environment
:param dict[str, list] data: plugin commit data of type dictionary in which the key is plugin name
of type str and the value is Github activities of type list
def transform_and_write_to_dynamo(data: dict[str, list],
activity_type: GitHubActivityType,
plugin_name_by_repo: dict[str, str]) -> None:
"""Transforms data generated by get_plugins_commit_count_since_timestamp to
the expected format and then writes the formatted data to the corresponding
github-activity dynamo table in each environment
:param dict[str, list] data: plugin commit data in which the key is plugin
name and the value is GitHub activities
:param GitHubActivityType activity_type:
:param dict[str, str] plugin_name_by_repo: dict mapping repo to plugin name
"""
LOGGER.info(f'Starting item creation for github-activity type={activity_type.name}')
granularity = activity_type.name
logger.info(f"Starting for github-activity type={granularity}")

batch = GitHubActivity.batch_write()

start = time.perf_counter()
count = 0
repo_to_plugin_dict = _get_repo_to_plugin_dict()

for repo, github_activities in data.items():
plugin_name = repo_to_plugin_dict.get(repo)
plugin_name = plugin_name_by_repo.get(repo)
if plugin_name is None:
logger.warning(f"Unable to find plugin name for repo={repo}")
continue
for activity in github_activities:
identifier_timestamp = activity.get('timestamp', '')
timestamp = activity.get('timestamp')
commit_count = activity.get('count')
timestamp = activity.get("timestamp")
commit_count = activity.get("count")
item = GitHubActivity(
plugin_name,
activity_type.format_to_type_identifier(repo, identifier_timestamp),
granularity=activity_type.name,
plugin_name.lower(),
activity_type.format_to_type_identifier(repo, timestamp),
granularity=granularity,
timestamp=activity_type.format_to_timestamp(timestamp),
commit_count=commit_count,
repo=repo)
repo=repo
)
batch.save(item)
count += 1

batch.commit()
duration = (time.perf_counter() - start) * 1000

LOGGER.info(f'Items github-activity type={activity_type.name} count={count}')
LOGGER.info(f'Transform and write to github-activity type={activity_type.name} timeTaken={duration}ms')
logger.info(f"Completed processing for github-activity type={granularity} "
f"count={count} timeTaken={duration}ms")
33 changes: 18 additions & 15 deletions data-workflows/activity/install_activity_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,32 @@
from nhcommons.utils.time import get_current_timestamp
from utils.utils import datetime_to_utc_timestamp_in_millis

LOGGER = logging.getLogger(__name__)
logger = logging.getLogger(__name__)


class InstallActivityType(Enum):

def __new__(cls, timestamp_formatter, type_timestamp_formatter):
def __new__(cls, timestamp_formatter, type_timestamp_format):
install_activity_type = object.__new__(cls)
install_activity_type._value_ = auto()
install_activity_type.timestamp_formatter = timestamp_formatter
install_activity_type.type_timestamp_formatter = type_timestamp_formatter
install_activity_type.type_timestamp_format = type_timestamp_format
return install_activity_type

DAY = (datetime_to_utc_timestamp_in_millis, 'DAY:{0:%Y%m%d}')
MONTH = (datetime_to_utc_timestamp_in_millis, 'MONTH:{0:%Y%m}')
TOTAL = (lambda timestamp: None, 'TOTAL:')
DAY = (datetime_to_utc_timestamp_in_millis, "DAY:{0:%Y%m%d}")
MONTH = (datetime_to_utc_timestamp_in_millis, "MONTH:{0:%Y%m}")
TOTAL = (lambda timestamp: None, "TOTAL:")

def format_to_timestamp(self, timestamp: datetime) -> Union[int, None]:
return self.timestamp_formatter(timestamp)

def format_to_type_timestamp(self, timestamp: datetime) -> str:
return self.type_timestamp_formatter.format(timestamp)
return self.type_timestamp_format.format(timestamp)

def get_query_timestamp_projection(self) -> str:
return '1' if self is InstallActivityType.TOTAL else f"DATE_TRUNC('{self.name}', timestamp)"
if self is InstallActivityType.TOTAL:
return "1"
return f"DATE_TRUNC('{self.name}', timestamp)"


class InstallActivity(Model):
Expand Down Expand Up @@ -61,21 +63,22 @@ def __eq__(self, other):

def transform_and_write_to_dynamo(data: dict[str, List],
activity_type: InstallActivityType) -> None:
LOGGER.info(f'Starting item creation for install-activity type={activity_type.name}')
granularity = activity_type.name
logger.info(f"Starting for install-activity type={granularity}")
batch = InstallActivity.batch_write()
count = 0
is_total = 'true' if activity_type is InstallActivityType.TOTAL else None
is_total = "true" if activity_type is InstallActivityType.TOTAL else None
start = time.perf_counter()
for plugin_name, install_activities in data.items():
for activity in install_activities:
timestamp = activity['timestamp']
timestamp = activity["timestamp"]

item = InstallActivity(
plugin_name=plugin_name.lower(),
type_timestamp=activity_type.format_to_type_timestamp(timestamp),
granularity=activity_type.name,
granularity=granularity,
timestamp=activity_type.format_to_timestamp(timestamp),
install_count=activity['count'],
install_count=activity["count"],
is_total=is_total,
)
batch.save(item)
Expand All @@ -84,5 +87,5 @@ def transform_and_write_to_dynamo(data: dict[str, List],
batch.commit()
duration = (time.perf_counter() - start) * 1000

LOGGER.info(f'Items install-activity type={activity_type.name} count={count}')
LOGGER.info(f'Transform and write to install-activity type={activity_type.name} timeTaken={duration}ms')
logger.info(f"Completed processing for install-activity type={granularity} "
f"count={count} timeTaken={duration}ms")
25 changes: 16 additions & 9 deletions data-workflows/activity/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
import activity.snowflake_adapter as snowflake
from utils.utils import ParameterStoreAdapter
import nhcommons
from nhcommons.models.plugin import get_plugin_name_by_repo

LOGGER = logging.getLogger(__name__)


def _fetch_install_data_and_write_to_dynamo(
data: dict[str, datetime], install_activity_type: InstallActivityType
data: dict[str, datetime], install_activity_type: InstallActivityType
) -> None:
plugin_install_data = snowflake.get_plugins_install_count_since_timestamp(
data, install_activity_type
Expand All @@ -24,13 +25,15 @@ def _fetch_install_data_and_write_to_dynamo(


def _fetch_github_data_and_write_to_dynamo(
data: dict[str, datetime], github_activity_type: GitHubActivityType
data: dict[str, datetime],
github_activity_type: GitHubActivityType,
plugin_name_by_repo: dict[str, str]
) -> None:
plugin_commit_data = snowflake.get_plugins_commit_count_since_timestamp(
data, github_activity_type
)
github_model.transform_and_write_to_dynamo(
plugin_commit_data, github_activity_type
plugin_commit_data, github_activity_type, plugin_name_by_repo
)


Expand All @@ -43,9 +46,10 @@ def _update_install_activity(start_time: int, end_time: int) -> None:
if count == 0:
return

_fetch_install_data_and_write_to_dynamo(updated_plugins, InstallActivityType.DAY)
_fetch_install_data_and_write_to_dynamo(updated_plugins, InstallActivityType.MONTH)
_fetch_install_data_and_write_to_dynamo(updated_plugins, InstallActivityType.TOTAL)
for install_activity_type in InstallActivityType:
_fetch_install_data_and_write_to_dynamo(
updated_plugins, install_activity_type
)


def _update_github_activity(start_time: int, end_time: int) -> None:
Expand All @@ -56,9 +60,12 @@ def _update_github_activity(start_time: int, end_time: int) -> None:
LOGGER.info(f"Plugins with new github activity count={count}")
if count == 0:
return
_fetch_github_data_and_write_to_dynamo(updated_plugins, GitHubActivityType.LATEST)
_fetch_github_data_and_write_to_dynamo(updated_plugins, GitHubActivityType.MONTH)
_fetch_github_data_and_write_to_dynamo(updated_plugins, GitHubActivityType.TOTAL)

plugin_name_by_repo = get_plugin_name_by_repo()
for github_activity_type in GitHubActivityType:
_fetch_github_data_and_write_to_dynamo(
updated_plugins, github_activity_type, plugin_name_by_repo
)


def update_activity() -> None:
Expand Down
Loading