Skip to content

Commit

Permalink
Cleaning data-workflow handler (#986)
Browse files Browse the repository at this point in the history
* Cleaning data-workflow handler

* Updating for rebase
  • Loading branch information
manasaV3 authored May 8, 2023
1 parent aa955a2 commit c841ab5
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 84 deletions.
60 changes: 45 additions & 15 deletions data-workflows/activity/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,36 +5,66 @@
import activity.install_activity_model as install_model
from activity.github_activity_model import GitHubActivityType
import activity.github_activity_model as github_model
import activity.snowflake_adapter as snowflake_adapter
import activity.snowflake_adapter as snowflake
from utils.utils import ParameterStoreAdapter
import utils.utils

LOGGER = logging.getLogger()


def _fetch_install_data_and_write_to_dynamo(data: dict[str, datetime], install_activity_type: InstallActivityType):
plugin_install_data = snowflake_adapter.get_plugins_install_count_since_timestamp(data, install_activity_type)
install_model.transform_and_write_to_dynamo(plugin_install_data, install_activity_type)
def _fetch_install_data_and_write_to_dynamo(
    data: dict[str, datetime], install_activity_type: InstallActivityType
) -> None:
    """Fetch install counts from Snowflake and pass them to the install model.

    :param data: mapping of str keys to datetimes, forwarded unchanged to the
        Snowflake query (presumably plugin name -> earliest timestamp; confirm
        against the adapter).
    :param install_activity_type: granularity forwarded to both the query and
        the model writer.
    """
    # The query result is passed straight through; no transformation here.
    install_model.transform_and_write_to_dynamo(
        snowflake.get_plugins_install_count_since_timestamp(
            data, install_activity_type
        ),
        install_activity_type,
    )


def _fetch_github_data_and_write_to_dynamo(data: dict[str, datetime], github_activity_type: GitHubActivityType):
plugin_commit_data = snowflake_adapter.get_plugins_commit_count_since_timestamp(data, github_activity_type)
github_model.transform_and_write_to_dynamo(plugin_commit_data, github_activity_type)
def _fetch_github_data_and_write_to_dynamo(
    data: dict[str, datetime], github_activity_type: GitHubActivityType
) -> None:
    """Fetch commit counts from Snowflake and pass them to the GitHub model.

    :param data: mapping of str keys to datetimes, forwarded unchanged to the
        Snowflake query (presumably plugin name -> earliest timestamp; confirm
        against the adapter).
    :param github_activity_type: granularity forwarded to both the query and
        the model writer.
    """
    # The query result is passed straight through; no transformation here.
    github_model.transform_and_write_to_dynamo(
        snowflake.get_plugins_commit_count_since_timestamp(
            data, github_activity_type
        ),
        github_activity_type,
    )


def update_install_activity(start_time: int, end_time: int):
updated_plugins = snowflake_adapter.get_plugins_with_installs_in_window(start_time, end_time)
LOGGER.info(f'Plugins with new install activity count={len(updated_plugins)}')
if len(updated_plugins) == 0:
def _update_install_activity(start_time: int, end_time: int) -> None:
    """Refresh install activity for plugins with installs in the given window.

    Logs how many plugins were found and returns early when there are none;
    otherwise writes the DAY, MONTH and TOTAL install data, in that order.

    :param start_time: lower bound passed to the Snowflake window query.
    :param end_time: upper bound passed to the Snowflake window query.
    """
    plugins_in_window = snowflake.get_plugins_with_installs_in_window(
        start_time, end_time
    )
    count = len(plugins_in_window)
    LOGGER.info(f"Plugins with new install activity count={count}")
    if count == 0:
        return

    for activity_type in (
        InstallActivityType.DAY,
        InstallActivityType.MONTH,
        InstallActivityType.TOTAL,
    ):
        _fetch_install_data_and_write_to_dynamo(plugins_in_window, activity_type)


def update_github_activity(start_time: int, end_time: int):
updated_plugins = snowflake_adapter.get_plugins_with_commits_in_window(start_time, end_time)
LOGGER.info(f'Plugins with new github activity count={len(updated_plugins)}')
if len(updated_plugins) == 0:
def _update_github_activity(start_time: int, end_time: int) -> None:
    """Refresh GitHub activity for plugins with commits in the given window.

    Logs how many plugins were found and returns early when there are none;
    otherwise writes the LATEST, MONTH and TOTAL commit data, in that order.

    :param start_time: lower bound passed to the Snowflake window query.
    :param end_time: upper bound passed to the Snowflake window query.
    """
    plugins_in_window = snowflake.get_plugins_with_commits_in_window(
        start_time, end_time
    )
    count = len(plugins_in_window)
    LOGGER.info(f"Plugins with new github activity count={count}")
    if count == 0:
        return

    for activity_type in (
        GitHubActivityType.LATEST,
        GitHubActivityType.MONTH,
        GitHubActivityType.TOTAL,
    ):
        _fetch_github_data_and_write_to_dynamo(plugins_in_window, activity_type)


def update_activity() -> None:
    """Run the install and GitHub activity updates for the pending window.

    The window starts at the timestamp read from the parameter store and ends
    at the current timestamp. The stored timestamp is only advanced after both
    updates have returned, so an exception in either leaves it untouched.
    """
    store = ParameterStoreAdapter()
    window_start = store.get_last_updated_timestamp()
    window_end = utils.utils.get_current_timestamp()

    # Install activity first, then GitHub activity, over the same window.
    for refresh in (_update_install_activity, _update_github_activity):
        refresh(window_start, window_end)

    store.set_last_updated_timestamp(window_end)
106 changes: 85 additions & 21 deletions data-workflows/activity/tests/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,110 @@

import pytest

import activity.install_activity_model
import activity.snowflake_adapter
import activity.install_activity_model as activity_iam
import activity.github_activity_model as activity_gam
import activity.snowflake_adapter as snowflake
import activity.processor as processor
from activity.install_activity_model import InstallActivityType
from activity.github_activity_model import GitHubActivityType
import utils.utils as util
from utils.utils import ParameterStoreAdapter

START_TIME = 1234567
END_TIME = 1239876

MOCK_DATA = {'foo': datetime.now()}
plugins_with_installs_in_window = {
InstallActivityType.DAY: {'bar': ["data1", "data2"]},
InstallActivityType.MONTH: {'baz': ["data3", "data4"]},
InstallActivityType.TOTAL: {'hap': ["data5", "data6"]},
MOCK_DATA = {"foo": datetime.now()}
PLUGINS_WITH_INSTALLS_IN_WINDOW = {
InstallActivityType.DAY: {"bari": ["data1i", "data2i"]},
InstallActivityType.MONTH: {"bazi": ["data3i", "data4i"]},
InstallActivityType.TOTAL: {"hapi": ["data5i"]},
}
PLUGINS_WITH_COMMITS_IN_WINDOW = {
GitHubActivityType.LATEST: {"barg": ["data1g"]},
GitHubActivityType.MONTH: {"bazg": ["data3g", "data4g"]},
GitHubActivityType.TOTAL: {"hapg": ["data5g"]},
}


class TestActivityProcessor:
def _verify_default(self):
    """Assert the stored timestamp was set exactly once, to END_TIME."""
    setter = self._parameter_store.set_last_updated_timestamp
    setter.assert_called_once_with(END_TIME)

@staticmethod
def _setup_snowflake_response(monkeypatch, data):
    """Stub the four Snowflake adapter queries used by the processor.

    Both "in window" lookups return ``data``; the two "since timestamp"
    lookups return the canned entry for the requested activity type (or
    ``None`` when the type has no entry).
    """
    stubs = {
        "get_plugins_with_installs_in_window": lambda _, __: data,
        "get_plugins_install_count_since_timestamp": (
            lambda _, iat: PLUGINS_WITH_INSTALLS_IN_WINDOW.get(iat)
        ),
        "get_plugins_with_commits_in_window": lambda _, __: data,
        "get_plugins_commit_count_since_timestamp": (
            lambda _, iat: PLUGINS_WITH_COMMITS_IN_WINDOW.get(iat)
        ),
    }
    # dicts preserve insertion order, so patches are applied as listed.
    for attribute, stub in stubs.items():
        monkeypatch.setattr(snowflake, attribute, stub)

@pytest.fixture(autouse=True)
def _setup_parameter_store_adapter(self, monkeypatch):
    """Swap the processor's ParameterStoreAdapter for a mock around each test.

    The mock reports START_TIME as the last updated timestamp. After the test
    body runs, the default verification on the mock is performed.
    """
    store_mock = Mock(
        spec=ParameterStoreAdapter,
        get_last_updated_timestamp=lambda: START_TIME,
    )
    self._parameter_store = store_mock
    # The processor calls ParameterStoreAdapter(); hand back the mock instead.
    monkeypatch.setattr(
        processor, "ParameterStoreAdapter", lambda: self._parameter_store
    )

    yield

    self._verify_default()

@pytest.fixture(autouse=True)
def setup_method(self, monkeypatch):
monkeypatch.setattr(
activity.snowflake_adapter, 'get_plugins_install_count_since_timestamp',
lambda _, iat: plugins_with_installs_in_window.get(iat)
monkeypatch.setattr(util, "get_current_timestamp", lambda: END_TIME)
self._install_transform_and_write_mock = Mock(
spec=activity_iam.transform_and_write_to_dynamo
)
self._commits_transform_and_write_mock = Mock(
spec=activity_gam.transform_and_write_to_dynamo
)
self._mock = Mock()
monkeypatch.setattr(activity.install_activity_model, 'transform_and_write_to_dynamo', self._mock)

def test_update_install_activity_with_new_updates(self, monkeypatch):
monkeypatch.setattr(activity.snowflake_adapter, 'get_plugins_with_installs_in_window', lambda _, __: MOCK_DATA)
self._setup_snowflake_response(monkeypatch, MOCK_DATA)

from activity.processor import update_install_activity
update_install_activity(START_TIME, END_TIME)
monkeypatch.setattr(
activity_iam, "transform_and_write_to_dynamo",
self._install_transform_and_write_mock,
)
monkeypatch.setattr(
activity_gam, "transform_and_write_to_dynamo",
self._commits_transform_and_write_mock,
)

assert self._mock.call_count == 3
from activity.processor import update_activity
update_activity()

assert self._install_transform_and_write_mock.call_count == 3
for iat in InstallActivityType:
self._mock.assert_any_call(plugins_with_installs_in_window[iat], iat)
self._install_transform_and_write_mock.assert_any_call(
PLUGINS_WITH_INSTALLS_IN_WINDOW[iat], iat
)
assert self._commits_transform_and_write_mock.call_count == 3
for gat in GitHubActivityType:
self._commits_transform_and_write_mock.assert_any_call(
PLUGINS_WITH_COMMITS_IN_WINDOW[gat], gat
)

def test_update_install_activity_with_no_new_updates(self, monkeypatch):
monkeypatch.setattr(activity.snowflake_adapter, 'get_plugins_with_installs_in_window', lambda _, __: [])
self._setup_snowflake_response(monkeypatch, [])

from activity.processor import update_activity

from activity.processor import update_install_activity
update_install_activity(START_TIME, END_TIME)
update_activity()

assert self._mock.call_count == 0
assert self._install_transform_and_write_mock.call_count == 0
assert self._commits_transform_and_write_mock.call_count == 0
28 changes: 10 additions & 18 deletions data-workflows/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,23 @@
import logging

import activity.processor
from utils.utils import ParameterStoreAdapter
import utils.utils


def _setup_logging():
logger = logging.getLogger()
logger.setLevel(logging.INFO)
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)


def _update_activity() -> None:
parameter_store_adapter = ParameterStoreAdapter()
last_updated_timestamp = parameter_store_adapter.get_last_updated_timestamp()
current_timestamp = utils.utils.get_current_timestamp()
activity.processor.update_install_activity(last_updated_timestamp, current_timestamp)
activity.processor.update_github_activity(last_updated_timestamp, current_timestamp)
parameter_store_adapter.set_last_updated_timestamp(current_timestamp)


def handle(event, context) -> None:
    """Process each record in ``event`` and run the workflow matching its type.

    Records without a ``body`` key are skipped. Each body is parsed as JSON
    and its ``type`` field is compared case-insensitively; ``activity``
    triggers ``activity.processor.update_activity``. Any other type is ignored.

    :param event: mapping that may contain a ``Records`` list of dict records.
    :param context: not used by this function.
    :raises json.JSONDecodeError: when a record body is not valid JSON.
    """
    for record in event.get('Records', []):
        if 'body' not in record:
            continue

        body = record.get('body')
        LOGGER.info(f'Received message with body: {body}')
        payload = json.loads(body)
        # A body that is not a JSON object, or whose "type" is null or not a
        # string, has no usable type; treat it as unknown instead of failing
        # on .get()/.lower().
        raw_type = payload.get('type') if isinstance(payload, dict) else None
        event_type = raw_type.lower() if isinstance(raw_type, str) else ''

        # TODO: Create a dict for event_type by method to be called
        if event_type == 'activity':
            activity.processor.update_activity()
            LOGGER.info(f'Update successful for type={event_type}')
52 changes: 22 additions & 30 deletions data-workflows/tests/test_handler.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,42 @@
from json import JSONDecodeError
from typing import Dict
from unittest.mock import Mock

import pytest

import activity.processor
import handler
import utils.utils

CURRENT_TIMESTAMP = 987654321
LAST_UPDATED_TIMESTAMP = 1234565789


class TestHandle:

@pytest.fixture(autouse=True)
def _setup(self, monkeypatch):
self._parameter_store_adapter_call = Mock()
monkeypatch.setattr(handler, 'ParameterStoreAdapter', self._parameter_store_adapter_call)
self._update_install_activity_call = Mock()
monkeypatch.setattr(activity.processor, 'update_install_activity', self._update_install_activity_call)
self._update_github_activity_call = Mock()
monkeypatch.setattr(activity.processor, 'update_github_activity', self._update_github_activity_call)

def _verify(self, call_count):
assert self._parameter_store_adapter_call.call_count == call_count
assert self._update_install_activity_call.call_count == call_count
assert self._update_github_activity_call.call_count == call_count

def test_handle_valid_activity_event(self, monkeypatch):
parameter_store_adapter = Mock(get_last_updated_timestamp=lambda: LAST_UPDATED_TIMESTAMP)
self._parameter_store_adapter_call.return_value = parameter_store_adapter
monkeypatch.setattr(utils.utils, 'get_current_timestamp', lambda: CURRENT_TIMESTAMP)
def setup(self, monkeypatch):
    """Replace ``activity.processor.update_activity`` with a spec'd mock."""
    update_activity_mock = Mock(spec=activity.processor.update_activity)
    monkeypatch.setattr(
        activity.processor, 'update_activity', update_activity_mock
    )
    # Kept on the instance so the tests can inspect the call count.
    self._update_activity = update_activity_mock

def _verify(self, activity_call_count: int = 0):
assert self._update_activity.call_count == activity_call_count

@pytest.mark.parametrize('event_type', ["Activity", "AcTiviTy", "ACTIVITY"])
def test_handle_event_type_in_different_case(self, event_type: str):
    """The activity workflow runs regardless of the casing of ``type``."""
    import json

    from handler import handle

    # Build the body from the parametrized value so each casing is actually
    # sent to the handler instead of a fixed lowercase "activity".
    body = json.dumps({'type': event_type})
    handle({'Records': [{'body': body}]}, None)

    self._verify(activity_call_count=1)

def test_handle_activity_event_type(self):
    """Only the ``activity`` record in a mixed batch triggers an update."""
    from handler import handle

    records = [
        {'body': '{"type":"activity"}'},
        {'body': '{"type":"bar"}'},
    ]
    handle({'Records': records}, None)

    self._verify(activity_call_count=1)

def test_handle_invalid_json(self):
    """A malformed JSON body raises JSONDecodeError and triggers no update."""
    from handler import handle

    malformed_event = {'Records': [{'body': '{"type:"activity"}'}]}
    with pytest.raises(JSONDecodeError):
        handle(malformed_event, None)

    self._verify()

@pytest.mark.parametrize('event', [
({'Records': [{'body': '{"type":"foo"}'}, {'body': '{"type":"bar"}'}]}),
Expand All @@ -53,7 +45,7 @@ def test_handle_invalid_json(self):
({'Records': []}),
({}),
])
def test_handle_invalid_event(self, event):
def test_handle_invalid_event(self, event: Dict):
from handler import handle
handle(event, None)
self._verify(0)
self._verify()

0 comments on commit c841ab5

Please sign in to comment.