diff --git a/renku/command/checks/__init__.py b/renku/command/checks/__init__.py index b75735fca6..39f03154d4 100644 --- a/renku/command/checks/__init__.py +++ b/renku/command/checks/__init__.py @@ -22,7 +22,6 @@ from .external import check_missing_external_files from .githooks import check_git_hooks_installed from .migration import check_migration -from .plan import check_deleted_plan_chain from .storage import check_lfs_info from .validate_shacl import check_datasets_structure, check_project_structure @@ -31,7 +30,6 @@ __all__ = ( "check_dataset_old_metadata_location", "check_datasets_structure", - "check_deleted_plan_chain", "check_git_hooks_installed", "check_invalid_datasets_derivation", "check_lfs_info", diff --git a/renku/command/checks/plan.py b/renku/command/checks/plan.py deleted file mode 100644 index 804cf55179..0000000000 --- a/renku/command/checks/plan.py +++ /dev/null @@ -1,55 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright 2020 - Swiss Data Science Center (SDSC) -# A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and -# Eidgenössische Technische Hochschule Zürich (ETHZ). -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Checks needed to determine integrity of plans.""" - -import click - -from renku.command.command_builder import inject -from renku.command.echo import WARNING -from renku.core.interface.plan_gateway import IPlanGateway -from renku.core.util import communication -from renku.core.workflow.plan import get_derivative_chain - - -@inject.autoparams("plan_gateway") -def check_deleted_plan_chain(client, fix, plan_gateway: IPlanGateway): - """Check that all plans in a derivation chain are deleted if one of them is deleted.""" - wrong_plans = set() - - plans = plan_gateway.get_all_plans() - for plan in plans: - if plan.deleted: - for derivative_plan in get_derivative_chain(plan): - if not derivative_plan.deleted: - if fix: - communication.info(f"Fixing plan '{plan.name}'") - derivative_plan.delete(when=plan.invalidated_at) - else: - wrong_plans.add(plan.name) - - if not wrong_plans: - return True, None - - problems = ( - WARNING - + "There are plans that are not deleted correctly (use 'renku doctor --fix' to fix them):\n\n\t" - + "\n\t".join(click.style(p, fg="yellow") for p in wrong_plans) - + "\n" - ) - - return False, problems diff --git a/renku/command/update.py b/renku/command/update.py index 606825e881..1549572a8a 100644 --- a/renku/command/update.py +++ b/renku/command/update.py @@ -30,6 +30,7 @@ from renku.core.workflow.activity import ( get_all_modified_and_deleted_activities_and_entities, get_downstream_generating_activities, + is_activity_valid, sort_activities, ) from renku.core.workflow.concrete_execution_graph import ExecutionGraph @@ -61,7 +62,7 @@ def _update( paths = get_relative_paths(base=client.path, paths=[Path.cwd() / p for p in paths]) modified, _ = get_all_modified_and_deleted_activities_and_entities(client.repository) - modified_activities = {a for a, _ in modified if a.is_activity_valid and not a.deleted} + modified_activities = {a for a, _ in modified if not a.deleted and is_activity_valid(a)} modified_paths = {e.path for _, e in modified} activities = get_downstream_generating_activities( diff --git a/renku/core/workflow/activity.py b/renku/core/workflow/activity.py index 8d02e0d80a..b5f35464df 100644 --- a/renku/core/workflow/activity.py +++ b/renku/core/workflow/activity.py @@ -31,7 +31,7 @@ from renku.core.interface.client_dispatcher import IClientDispatcher from renku.core.util import communication from renku.core.util.datetime8601 import local_now -from renku.core.workflow.plan import get_activities, remove_plan +from renku.core.workflow.plan import get_activities, is_plan_removed, remove_plan from renku.domain_model.entity import Entity from renku.domain_model.provenance.activity import Activity @@ -315,7 +315,7 @@ def has_an_existing_generation(activity) -> bool: for chain in downstream_chains: for activity in chain: - if not activity.is_activity_valid: + if not is_activity_valid(activity): # don't process further downstream activities as the plan in question was deleted break include_newest_activity(activity) @@ -467,9 +467,9 @@ def revert_generations(activity) -> Tuple[Set[str], Set[str]]: path = generation.entity.path generator_activities = activity_gateway.get_activities_by_generation(path=path) - generator_activities = [a for a in generator_activities if a.is_activity_valid and not a.deleted] + generator_activities = [a for a in generator_activities if is_activity_valid(a) and not a.deleted] latest_generator = get_latest_activity(generator_activities) - if latest_generator != activity: # NOTE: Another activity already generated the same path + if latest_generator != activity: # NOTE: A newer activity already generated the same path continue previous_generator = get_latest_activity_before(generator_activities, activity) @@ -514,3 +514,17 @@ def revert_generations(activity) -> Tuple[Set[str], Set[str]]: activity.delete(when=delete_time) return activity + + +@inject.autoparams() +def is_activity_valid(activity: Activity) -> bool: + """Return whether this plan has not been deleted. + + Args: + activity(Activity): The Activity whose Plan should be checked. + + Returns: + bool: True if the activities' Plan is still valid, False otherwise. + + """ + return not is_plan_removed(plan=activity.association.plan) diff --git a/renku/core/workflow/plan.py b/renku/core/workflow/plan.py index b80783be4b..edf3aa664f 100644 --- a/renku/core/workflow/plan.py +++ b/renku/core/workflow/plan.py @@ -31,12 +31,10 @@ @inject.autoparams() -def get_derivative_chain( - plan: Optional[AbstractPlan], plan_gateway: IPlanGateway -) -> Generator[AbstractPlan, None, None]: - """Return all plans in the derivative chain of a given plan including its parents/children and the plan itself.""" +def get_latest_plan(plan: Optional[AbstractPlan], plan_gateway: IPlanGateway) -> Optional[AbstractPlan]: + """Return the latest version of a given plan in its derivative chain.""" if plan is None: - return + return None all_plans = plan_gateway.get_all_plans() @@ -45,6 +43,19 @@ def get_derivative_chain( plan = child_plan child_plan = next((p for p in all_plans if p.derived_from is not None and p.derived_from == plan.id), None) + return plan + + +@inject.autoparams() +def get_derivative_chain( + plan: Optional[AbstractPlan], plan_gateway: IPlanGateway +) -> Generator[AbstractPlan, None, None]: + """Return all plans in the derivative chain of a given plan including its parents/children and the plan itself.""" + if plan is None: + return + + plan = get_latest_plan(plan) + while plan is not None: yield plan plan = plan_gateway.get_by_id(plan.derived_from) @@ -58,6 +69,7 @@ def remove_plan(name_or_id: str, force: bool, plan_gateway: IPlanGateway, when: name_or_id (str): The name of the Plan to remove. force (bool): Whether to force removal or not. plan_gateway(IPlanGateway): The injected Plan gateway. + when(datetime): Time of deletion (Default value = current local date/time). Raises: errors.ParameterError: If the Plan doesn't exist or was already deleted. """ @@ -65,15 +77,20 @@ def remove_plan(name_or_id: str, force: bool, plan_gateway: IPlanGateway, when: if not plan: raise errors.ParameterError(f"The specified workflow '{name_or_id}' cannot be found.") - elif plan.deleted: + + latest_version = get_latest_plan(plan) + + if latest_version.deleted: raise errors.ParameterError(f"The specified workflow '{name_or_id}' is already deleted.") if not force: prompt_text = f"You are about to remove the following workflow '{name_or_id}'.\n\nDo you wish to continue?" communication.confirm(prompt_text, abort=True, warning=True) - for derivative_plan in get_derivative_chain(plan): - derivative_plan.delete(when=when) + derived_plan = plan.derive() + derived_plan.delete(when=when) + + plan_gateway.add(derived_plan) @inject.autoparams() @@ -98,12 +115,20 @@ def get_activities(plan: Optional[AbstractPlan], activity_gateway: IActivityGate if plan is None: return - initial_id = get_initial_id(plan) + derivative_ids = [p.id for p in get_derivative_chain(plan=plan)] for activity in activity_gateway.get_all_activities(): - if not activity.is_activity_valid or activity.deleted: + if activity.deleted: continue - activity_plan = activity.association.plan - if activity.association.plan is plan or (initial_id == get_initial_id(activity_plan)): + if activity.association.plan.id in derivative_ids: yield activity + + +def is_plan_removed(plan: AbstractPlan) -> bool: + """Return true if the plan or any plan in its derivative chain is deleted.""" + for derived_plan in get_derivative_chain(plan): + if derived_plan.deleted: + return True + + return False diff --git a/renku/core/workflow/run.py b/renku/core/workflow/run.py index 961377e2fc..3837b97b0e 100644 --- a/renku/core/workflow/run.py +++ b/renku/core/workflow/run.py @@ -27,6 +27,7 @@ from renku.core.workflow.activity import ( get_all_modified_and_deleted_activities_and_entities, get_downstream_generating_activities, + is_activity_valid, ) @@ -70,8 +71,8 @@ def mark_generations_as_stale(activity): modified, deleted = get_all_modified_and_deleted_activities_and_entities(client.repository) - modified = {(a, e) for a, e in modified if a.is_activity_valid} - deleted = {(a, e) for a, e in deleted if a.is_activity_valid} + modified = {(a, e) for a, e in modified if is_activity_valid(a)} + deleted = {(a, e) for a, e in deleted if is_activity_valid(a)} if not modified and not deleted: return StatusResult({}, {}, set(), set()) diff --git a/renku/domain_model/provenance/activity.py b/renku/domain_model/provenance/activity.py index 9f461d075f..55c174f111 100644 --- a/renku/domain_model/provenance/activity.py +++ b/renku/domain_model/provenance/activity.py @@ -236,11 +236,6 @@ def deleted(self) -> bool: """Return if the activity was deleted.""" return self.invalidated_at is not None - @property - def is_activity_valid(self) -> bool: - """Return if the activity or its plan is not deleted.""" - return not self.association.plan.deleted - @staticmethod def generate_id(uuid: Optional[str] = None) -> str: """Generate an identifier for an activity.""" diff --git a/renku/ui/cli/workflow.py b/renku/ui/cli/workflow.py index 08711108b4..65ee029210 100644 --- a/renku/ui/cli/workflow.py +++ b/renku/ui/cli/workflow.py @@ -626,8 +626,8 @@ Renku allows you to undo a Run in a project by using ``renku workflow revert ``. You can obtain from the ``renku log`` command. -If the deleted run generated some files, Renku either deleted these files (in -case there are no earlier version of them and they are not used in other +If the deleted run generated some files, Renku either deletes these files (in +case there are no earlier versions of them and they are not used in other activities) or revert them to their earlier versions. You can ask Renku to keep the generated files and only delete the metadata by passing the ``--metadata-only`` option. @@ -637,7 +637,7 @@ dataset for example. Make sure that the project doesn't use such files in other places or always use ``--metadata-only`` option when reverting a run. -If you want to delete run along with its plan use the ``--plan`` option. +If you want to delete a run along with its plan use the ``--plan`` option. This only deletes the plan if it's not used by any other activity. Renku won't remove a run if there are downstream runs that depend on it. The diff --git a/tests/cli/test_workflow.py b/tests/cli/test_workflow.py index 9ccfe1f654..ffeb5977db 100644 --- a/tests/cli/test_workflow.py +++ b/tests/cli/test_workflow.py @@ -35,6 +35,7 @@ from renku.core.plugin.provider import available_workflow_providers from renku.core.util.yaml import write_yaml from renku.infrastructure.database import Database +from renku.infrastructure.gateway.activity_gateway import ActivityGateway from renku.ui.cli import cli from tests.utils import format_result_exception, write_and_commit_file @@ -1208,3 +1209,35 @@ def test_workflow_templated_params(runner, run_shell, client, capsys, workflow, for o in outputs: assert Path(o).resolve().exists() + + +def test_reverted_activity_status(client, runner, client_database_injection_manager): + """Test that reverted activity doesn't affect status/update/log/etc.""" + input = client.path / "input" + write_and_commit_file(client.repository, input, "content") + output = client.path / "output" + + assert 0 == runner.invoke(cli, ["run", "cat", input], stdout=output).exit_code + write_and_commit_file(client.repository, input, "changes") + + with client_database_injection_manager(client): + activity_gateway = ActivityGateway() + activity_id = activity_gateway.get_all_activities()[0].id + + assert 1 == runner.invoke(cli, ["status"]).exit_code + assert "output" in runner.invoke(cli, ["update", "--all", "--dry-run"]).output + assert "output" in runner.invoke(cli, ["workflow", "visualize", "output"]).output + assert activity_id in runner.invoke(cli, ["log"]).output + assert "input" in runner.invoke(cli, ["workflow", "inputs"]).output + assert "output" in runner.invoke(cli, ["workflow", "outputs"]).output + + result = runner.invoke(cli, ["workflow", "revert", activity_id]) + + assert 0 == result.exit_code, format_result_exception(result) + + assert 0 == runner.invoke(cli, ["status"]).exit_code + assert "output" not in runner.invoke(cli, ["update", "--all", "--dry-run"]).output + assert "output" not in runner.invoke(cli, ["workflow", "visualize", "output"]).output + assert activity_id not in runner.invoke(cli, ["log"]).output + assert "input" not in runner.invoke(cli, ["workflow", "inputs"]).output + assert "output" not in runner.invoke(cli, ["workflow", "outputs"]).output diff --git a/tests/core/commands/test_doctor.py b/tests/core/commands/test_doctor.py index 0a8a6812b4..3773d265fc 100644 --- a/tests/core/commands/test_doctor.py +++ b/tests/core/commands/test_doctor.py @@ -19,9 +19,8 @@ from pathlib import Path from renku.domain_model.dataset import Url -from renku.infrastructure.gateway.plan_gateway import PlanGateway from renku.ui.cli import cli -from tests.utils import create_dummy_plan, format_result_exception, with_dataset +from tests.utils import format_result_exception, with_dataset def test_new_project_is_ok(runner, project): @@ -143,38 +142,3 @@ def test_fix_invalid_imported_dataset(runner, client_with_datasets, client_datab # NOTE: Set both same_as and derived_from for a dataset assert dataset.same_as.value == "http://example.com" assert dataset.derived_from is None - - -def test_partially_deleted_plan_chain(runner, client, client_database_injection_manager): - """Test fixing plans that are not entirely deleted.""" - with client_database_injection_manager(client): - parent = create_dummy_plan("invalid-deleted-plan") - plan = parent.derive() - child = plan.derive() - - plan.delete() - - plan_gateway = PlanGateway() - - plan_gateway.add(parent) - plan_gateway.add(plan) - plan_gateway.add(child) - - assert not parent.deleted - assert plan.deleted - assert not child.deleted - - client.repository.add(all=True) - client.repository.commit("Added plans") - - result = runner.invoke(cli, ["doctor"]) - - assert 1 == result.exit_code, format_result_exception(result) - assert "There are plans that are not deleted correctly" in result.output - assert "invalid-deleted-plan" in result.output - - result = runner.invoke(cli, ["doctor", "--fix"]) - - assert 0 == result.exit_code, format_result_exception(result) - assert "There are plans that are not deleted correctly" not in result.output - assert "Fixing plan 'invalid-deleted-plan'" in result.output diff --git a/tests/core/test_plan.py b/tests/core/test_plan.py index 903a42dba1..102a71389d 100644 --- a/tests/core/test_plan.py +++ b/tests/core/test_plan.py @@ -20,9 +20,17 @@ import pytest from renku.core import errors -from renku.core.workflow.plan import get_derivative_chain, get_initial_id, remove_plan +from renku.core.workflow.plan import ( + get_activities, + get_derivative_chain, + get_initial_id, + get_latest_plan, + is_plan_removed, + remove_plan, +) +from renku.infrastructure.gateway.activity_gateway import ActivityGateway from renku.infrastructure.gateway.plan_gateway import PlanGateway -from tests.utils import create_dummy_plan +from tests.utils import create_dummy_activity, create_dummy_plan def create_dummy_plans(): @@ -45,6 +53,21 @@ def create_dummy_plans(): return grand_parent, parent, plan, child, grand_child, unrelated +def test_get_latest_plan(injected_client): + """Test getting latest plan in a derivative chain.""" + grand_parent, parent, plan, child, grand_child, unrelated = create_dummy_plans() + + latest_plan = get_latest_plan(plan) + + assert latest_plan is grand_child + assert get_latest_plan(grand_parent) is grand_child + assert get_latest_plan(parent) is grand_child + assert get_latest_plan(child) is grand_child + assert get_latest_plan(grand_child) is grand_child + + assert get_latest_plan(unrelated) is unrelated + + def test_plan_get_derivatives_chain(injected_client): """Test getting plans that have parent/child relation.""" grand_parent, parent, plan, child, grand_child, unrelated = create_dummy_plans() @@ -62,50 +85,48 @@ def test_plan_get_derivatives_chain(injected_client): assert [] == list(get_derivative_chain(None)) -def test_plan_delete(injected_client): +def test_plan_remove(injected_client): """Test deleting a plan.""" - _, _, plan, _, _, _ = create_dummy_plans() - - assert not plan.deleted + grand_parent, parent, plan, child, grand_child, unrelated = create_dummy_plans() - # NOTE: Remove by name + # Remove by name remove_plan(name_or_id=plan.name, force=True) - assert plan.deleted + # NOTE: We don't delete the plan itself + assert not grand_parent.deleted + assert not parent.deleted + assert not plan.deleted + assert not child.deleted + assert not grand_child.deleted + assert not unrelated.deleted - # NOTE: Remove a deleted plan by id - with pytest.raises(errors.ParameterError, match="is already deleted"): - remove_plan(name_or_id=plan.id, force=True) + last_derivative = get_latest_plan(plan) - # NOTE: Remove a non-existing plan - with pytest.raises(errors.ParameterError, match="cannot be found"): - remove_plan(name_or_id="non-existing", force=True) + assert grand_child.id == last_derivative.derived_from + assert last_derivative.deleted + assert is_plan_removed(grand_parent) + assert is_plan_removed(parent) + assert is_plan_removed(plan) + assert is_plan_removed(child) + assert is_plan_removed(grand_child) + assert not is_plan_removed(unrelated) -def test_plan_delete_derivatives_chain(injected_client): - """Test deleting a plan will delete all its parent/child chain.""" - grand_parent, parent, plan, child, grand_child, unrelated = create_dummy_plans() - assert not plan.deleted +def test_plan_delete_errors(injected_client): + """Test deleting a deleted plan or a non-existing plan.""" + _, _, plan, _, _, _ = create_dummy_plans() - # NOTE: Remove by id + # Remove by id remove_plan(name_or_id=plan.id, force=True) - assert plan.deleted - assert grand_parent.deleted - assert parent.deleted - assert child.deleted - assert grand_child.deleted - assert not unrelated.deleted - - # NOTE: Remove by name - remove_plan(name_or_id=unrelated.name, force=True) - - assert unrelated.deleted - - # NOTE: Remove a deleted plan + # Remove a deleted plan with pytest.raises(errors.ParameterError, match="is already deleted"): - remove_plan(name_or_id=unrelated.name, force=True) + remove_plan(name_or_id=plan.id, force=True) + + # Remove a non-existing plan + with pytest.raises(errors.ParameterError, match="cannot be found"): + remove_plan(name_or_id="non-existing", force=True) def test_plan_get_initial_id(injected_client): @@ -118,3 +139,24 @@ def test_plan_get_initial_id(injected_client): assert initial_id == get_initial_id(parent) assert initial_id == get_initial_id(child) assert initial_id == get_initial_id(grand_child) + + +def test_get_activities(injected_client): + """Test getting activities of a plan.""" + grand_parent, parent, plan, child, grand_child, unrelated = create_dummy_plans() + activities = [ + create_dummy_activity(plan), + create_dummy_activity(grand_parent), + create_dummy_activity(grand_child), + create_dummy_activity(child), + create_dummy_activity(plan), + create_dummy_activity(unrelated), + create_dummy_activity("other-plan"), + ] + activity_gateway = ActivityGateway() + for activity in activities: + activity_gateway.add(activity) + + plan_activities = set(get_activities(plan)) + + assert set(activities[0:5]) == plan_activities