Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mohammad-alisafaee committed Jun 27, 2022
1 parent c34546f commit a1fa5a0
Show file tree
Hide file tree
Showing 11 changed files with 172 additions and 154 deletions.
2 changes: 0 additions & 2 deletions renku/command/checks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from .external import check_missing_external_files
from .githooks import check_git_hooks_installed
from .migration import check_migration
from .plan import check_deleted_plan_chain
from .storage import check_lfs_info
from .validate_shacl import check_datasets_structure, check_project_structure

Expand All @@ -31,7 +30,6 @@
__all__ = (
"check_dataset_old_metadata_location",
"check_datasets_structure",
"check_deleted_plan_chain",
"check_git_hooks_installed",
"check_invalid_datasets_derivation",
"check_lfs_info",
Expand Down
55 changes: 0 additions & 55 deletions renku/command/checks/plan.py

This file was deleted.

3 changes: 2 additions & 1 deletion renku/command/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from renku.core.workflow.activity import (
get_all_modified_and_deleted_activities_and_entities,
get_downstream_generating_activities,
is_activity_valid,
sort_activities,
)
from renku.core.workflow.concrete_execution_graph import ExecutionGraph
Expand Down Expand Up @@ -61,7 +62,7 @@ def _update(
paths = get_relative_paths(base=client.path, paths=[Path.cwd() / p for p in paths])

modified, _ = get_all_modified_and_deleted_activities_and_entities(client.repository)
modified_activities = {a for a, _ in modified if a.is_activity_valid and not a.deleted}
modified_activities = {a for a, _ in modified if not a.deleted and is_activity_valid(a)}
modified_paths = {e.path for _, e in modified}

activities = get_downstream_generating_activities(
Expand Down
22 changes: 18 additions & 4 deletions renku/core/workflow/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from renku.core.interface.client_dispatcher import IClientDispatcher
from renku.core.util import communication
from renku.core.util.datetime8601 import local_now
from renku.core.workflow.plan import get_activities, remove_plan
from renku.core.workflow.plan import get_activities, is_plan_removed, remove_plan
from renku.domain_model.entity import Entity
from renku.domain_model.provenance.activity import Activity

Expand Down Expand Up @@ -315,7 +315,7 @@ def has_an_existing_generation(activity) -> bool:

for chain in downstream_chains:
for activity in chain:
if not activity.is_activity_valid:
if not is_activity_valid(activity):
# don't process further downstream activities as the plan in question was deleted
break
include_newest_activity(activity)
Expand Down Expand Up @@ -467,9 +467,9 @@ def revert_generations(activity) -> Tuple[Set[str], Set[str]]:
path = generation.entity.path

generator_activities = activity_gateway.get_activities_by_generation(path=path)
generator_activities = [a for a in generator_activities if a.is_activity_valid and not a.deleted]
generator_activities = [a for a in generator_activities if is_activity_valid(a) and not a.deleted]
latest_generator = get_latest_activity(generator_activities)
if latest_generator != activity: # NOTE: Another activity already generated the same path
if latest_generator != activity: # NOTE: A newer activity already generated the same path
continue

previous_generator = get_latest_activity_before(generator_activities, activity)
Expand Down Expand Up @@ -514,3 +514,17 @@ def revert_generations(activity) -> Tuple[Set[str], Set[str]]:
activity.delete(when=delete_time)

return activity


@inject.autoparams()
def is_activity_valid(activity: Activity) -> bool:
    """Return whether the plan associated with an activity has not been removed.

    Args:
        activity(Activity): The Activity whose Plan should be checked.

    Returns:
        bool: True if ``is_plan_removed`` reports the activity's Plan as not removed, False otherwise.
    """
    associated_plan = activity.association.plan
    return not is_plan_removed(plan=associated_plan)
49 changes: 37 additions & 12 deletions renku/core/workflow/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,10 @@


@inject.autoparams()
def get_derivative_chain(
plan: Optional[AbstractPlan], plan_gateway: IPlanGateway
) -> Generator[AbstractPlan, None, None]:
"""Return all plans in the derivative chain of a given plan including its parents/children and the plan itself."""
def get_latest_plan(plan: Optional[AbstractPlan], plan_gateway: IPlanGateway) -> Optional[AbstractPlan]:
"""Return the latest version of a given plan in its derivative chain."""
if plan is None:
return
return None

all_plans = plan_gateway.get_all_plans()

Expand All @@ -45,6 +43,19 @@ def get_derivative_chain(
plan = child_plan
child_plan = next((p for p in all_plans if p.derived_from is not None and p.derived_from == plan.id), None)

return plan


@inject.autoparams()
def get_derivative_chain(
plan: Optional[AbstractPlan], plan_gateway: IPlanGateway
) -> Generator[AbstractPlan, None, None]:
"""Return all plans in the derivative chain of a given plan including its parents/children and the plan itself."""
if plan is None:
return

plan = get_latest_plan(plan)

while plan is not None:
yield plan
plan = plan_gateway.get_by_id(plan.derived_from)
Expand All @@ -58,22 +69,28 @@ def remove_plan(name_or_id: str, force: bool, plan_gateway: IPlanGateway, when:
name_or_id (str): The name or id of the Plan to remove.
force (bool): Whether to force removal or not.
plan_gateway(IPlanGateway): The injected Plan gateway.
when(datetime): Time of deletion (Default value = current local date/time).
Raises:
errors.ParameterError: If the Plan doesn't exist or was already deleted.
"""
plan: Optional[AbstractPlan] = plan_gateway.get_by_name(name_or_id) or plan_gateway.get_by_id(name_or_id)

if not plan:
raise errors.ParameterError(f"The specified workflow '{name_or_id}' cannot be found.")
elif plan.deleted:

latest_version = get_latest_plan(plan)

if latest_version.deleted:
raise errors.ParameterError(f"The specified workflow '{name_or_id}' is already deleted.")

if not force:
prompt_text = f"You are about to remove the following workflow '{name_or_id}'.\n\nDo you wish to continue?"
communication.confirm(prompt_text, abort=True, warning=True)

for derivative_plan in get_derivative_chain(plan):
derivative_plan.delete(when=when)
derived_plan = plan.derive()
derived_plan.delete(when=when)

plan_gateway.add(derived_plan)


@inject.autoparams()
Expand All @@ -98,12 +115,20 @@ def get_activities(plan: Optional[AbstractPlan], activity_gateway: IActivityGate
if plan is None:
return

initial_id = get_initial_id(plan)
derivative_ids = [p.id for p in get_derivative_chain(plan=plan)]

for activity in activity_gateway.get_all_activities():
if not activity.is_activity_valid or activity.deleted:
if activity.deleted:
continue

activity_plan = activity.association.plan
if activity.association.plan is plan or (initial_id == get_initial_id(activity_plan)):
if activity.association.plan.id in derivative_ids:
yield activity


def is_plan_removed(plan: AbstractPlan) -> bool:
    """Return whether any plan yielded by the derivative chain of ``plan`` is marked as deleted.

    Args:
        plan(AbstractPlan): The Plan whose derivative chain should be checked.

    Returns:
        bool: True if at least one plan in the chain is deleted, False otherwise.
    """
    return any(chain_member.deleted for chain_member in get_derivative_chain(plan))
5 changes: 3 additions & 2 deletions renku/core/workflow/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from renku.core.workflow.activity import (
get_all_modified_and_deleted_activities_and_entities,
get_downstream_generating_activities,
is_activity_valid,
)


Expand Down Expand Up @@ -70,8 +71,8 @@ def mark_generations_as_stale(activity):

modified, deleted = get_all_modified_and_deleted_activities_and_entities(client.repository)

modified = {(a, e) for a, e in modified if a.is_activity_valid}
deleted = {(a, e) for a, e in deleted if a.is_activity_valid}
modified = {(a, e) for a, e in modified if is_activity_valid(a)}
deleted = {(a, e) for a, e in deleted if is_activity_valid(a)}

if not modified and not deleted:
return StatusResult({}, {}, set(), set())
Expand Down
5 changes: 0 additions & 5 deletions renku/domain_model/provenance/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,6 @@ def deleted(self) -> bool:
"""Return if the activity was deleted."""
return self.invalidated_at is not None

@property
def is_activity_valid(self) -> bool:
"""Return if the activity or its plan is not deleted."""
return not self.association.plan.deleted

@staticmethod
def generate_id(uuid: Optional[str] = None) -> str:
"""Generate an identifier for an activity."""
Expand Down
6 changes: 3 additions & 3 deletions renku/ui/cli/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,8 +626,8 @@
Renku allows you to undo a Run in a project by using ``renku workflow revert
<activity ID>``. You can obtain <activity ID> from the ``renku log`` command.
If the deleted run generated some files, Renku either deleted these files (in
case there are no earlier version of them and they are not used in other
If the deleted run generated some files, Renku either deletes these files (in
case there are no earlier versions of them and they are not used in other
activities) or reverts them to their earlier versions. You can ask Renku to keep the
generated files and only delete the metadata by passing the ``--metadata-only``
option.
Expand All @@ -637,7 +637,7 @@
dataset for example. Make sure that the project doesn't use such files in
other places or always use the ``--metadata-only`` option when reverting a run.
If you want to delete run along with its plan use the ``--plan`` option.
If you want to delete a run along with its plan use the ``--plan`` option.
This only deletes the plan if it's not used by any other activity.
Renku won't remove a run if there are downstream runs that depend on it. The
Expand Down
33 changes: 33 additions & 0 deletions tests/cli/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from renku.core.plugin.provider import available_workflow_providers
from renku.core.util.yaml import write_yaml
from renku.infrastructure.database import Database
from renku.infrastructure.gateway.activity_gateway import ActivityGateway
from renku.ui.cli import cli
from tests.utils import format_result_exception, write_and_commit_file

Expand Down Expand Up @@ -1208,3 +1209,35 @@ def test_workflow_templated_params(runner, run_shell, client, capsys, workflow,

for o in outputs:
assert Path(o).resolve().exists()


def test_reverted_activity_status(client, runner, client_database_injection_manager):
    """Test that reverted activity doesn't affect status/update/log/etc."""

    def invoke(*args):
        """Invoke the renku CLI with the given arguments and return the result."""
        return runner.invoke(cli, list(args))

    source_file = client.path / "input"
    write_and_commit_file(client.repository, source_file, "content")
    generated_file = client.path / "output"

    run_result = runner.invoke(cli, ["run", "cat", source_file], stdout=generated_file)
    assert 0 == run_result.exit_code
    # NOTE: Change the input after the run has been recorded
    write_and_commit_file(client.repository, source_file, "changes")

    with client_database_injection_manager(client):
        activity_id = ActivityGateway().get_all_activities()[0].id

    # NOTE: Before the revert, all commands report the activity and its files
    assert 1 == invoke("status").exit_code
    assert "output" in invoke("update", "--all", "--dry-run").output
    assert "output" in invoke("workflow", "visualize", "output").output
    assert activity_id in invoke("log").output
    assert "input" in invoke("workflow", "inputs").output
    assert "output" in invoke("workflow", "outputs").output

    revert_result = invoke("workflow", "revert", activity_id)

    assert 0 == revert_result.exit_code, format_result_exception(revert_result)

    # NOTE: After the revert, none of the commands should mention the activity or its files
    assert 0 == invoke("status").exit_code
    assert "output" not in invoke("update", "--all", "--dry-run").output
    assert "output" not in invoke("workflow", "visualize", "output").output
    assert activity_id not in invoke("log").output
    assert "input" not in invoke("workflow", "inputs").output
    assert "output" not in invoke("workflow", "outputs").output
38 changes: 1 addition & 37 deletions tests/core/commands/test_doctor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@
from pathlib import Path

from renku.domain_model.dataset import Url
from renku.infrastructure.gateway.plan_gateway import PlanGateway
from renku.ui.cli import cli
from tests.utils import create_dummy_plan, format_result_exception, with_dataset
from tests.utils import format_result_exception, with_dataset


def test_new_project_is_ok(runner, project):
Expand Down Expand Up @@ -143,38 +142,3 @@ def test_fix_invalid_imported_dataset(runner, client_with_datasets, client_datab
# NOTE: Set both same_as and derived_from for a dataset
assert dataset.same_as.value == "http://example.com"
assert dataset.derived_from is None


def test_partially_deleted_plan_chain(runner, client, client_database_injection_manager):
"""Test fixing plans that are not entirely deleted."""
with client_database_injection_manager(client):
parent = create_dummy_plan("invalid-deleted-plan")
plan = parent.derive()
child = plan.derive()

plan.delete()

plan_gateway = PlanGateway()

plan_gateway.add(parent)
plan_gateway.add(plan)
plan_gateway.add(child)

assert not parent.deleted
assert plan.deleted
assert not child.deleted

client.repository.add(all=True)
client.repository.commit("Added plans")

result = runner.invoke(cli, ["doctor"])

assert 1 == result.exit_code, format_result_exception(result)
assert "There are plans that are not deleted correctly" in result.output
assert "invalid-deleted-plan" in result.output

result = runner.invoke(cli, ["doctor", "--fix"])

assert 0 == result.exit_code, format_result_exception(result)
assert "There are plans that are not deleted correctly" not in result.output
assert "Fixing plan 'invalid-deleted-plan'" in result.output
Loading

0 comments on commit a1fa5a0

Please sign in to comment.