From cbe685d06917be548b9143384894f86505345cd7 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Fri, 17 May 2024 13:26:45 +0200 Subject: [PATCH 1/9] added check to fail on start --- .../projects/_nodes_handlers.py | 7 +++++++ .../simcore_service_webserver/projects/db.py | 20 +++++++++++++++++++ .../projects/exceptions.py | 8 ++++++++ .../projects/projects_api.py | 2 ++ 4 files changed, 37 insertions(+) diff --git a/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py b/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py index e3e22c01064..255b797de1b 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py +++ b/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py @@ -74,6 +74,7 @@ DefaultPricingUnitNotFoundError, NodeNotFoundError, ProjectInvalidRightsError, + ProjectNodeRequiredInputsNotSetError, ProjectNodeResourcesInsufficientRightsError, ProjectNodeResourcesInvalidError, ProjectNotFoundError, @@ -318,6 +319,12 @@ async def start_node(request: web.Request) -> web.Response: raise web.HTTPConflict(reason=f"{exc}") from exc except ClustersKeeperNotAvailableError as exc: raise web.HTTPServiceUnavailable(reason=f"{exc}") from exc + except ProjectNodeRequiredInputsNotSetError as exc: + raise web.HTTPConflict( + reason=f"{exc}", + text=f"{exc}", + content_type=MIMETYPE_APPLICATION_JSON, + ) from exc async def _stop_dynamic_service_task( diff --git a/services/web/server/src/simcore_service_webserver/projects/db.py b/services/web/server/src/simcore_service_webserver/projects/db.py index e551efc3428..ea5b0febd12 100644 --- a/services/web/server/src/simcore_service_webserver/projects/db.py +++ b/services/web/server/src/simcore_service_webserver/projects/db.py @@ -78,6 +78,7 @@ from .exceptions import ( ProjectDeleteError, ProjectInvalidRightsError, + ProjectNodeRequiredInputsNotSetError, ProjectNodeResourcesInsufficientRightsError, ProjectNotFoundError, ) @@ -444,6 +445,25 @@ async def get_project( project_type, ) + async def check_project_node_has_all_required_inputs( + self, user_id: UserID, project_uuid: NodeID + ) -> None: + project_dict, _ = await self.get_project(user_id, f"{project_uuid}") + workbench = project_dict["workbench"] + required_inputs = workbench["required_inputs"] + inputs = workbench["inputs"] + inputs_set = set(inputs.keys()) + + missing_required_inputs = [ + required_input + for required_input in required_inputs + if required_input not in inputs_set + ] + if missing_required_inputs: + raise ProjectNodeRequiredInputsNotSetError( + missing_required_inputs=missing_required_inputs + ) + # NOTE: MD: I intentionally didn't include the workbench. There is a special interface # for the workbench, and at some point, this column should be removed from the table. # The same holds true for access_rights/ui/classifiers/quality, but we have decided to proceed step by step. diff --git a/services/web/server/src/simcore_service_webserver/projects/exceptions.py b/services/web/server/src/simcore_service_webserver/projects/exceptions.py index 27db44fda07..7728d9f4af5 100644 --- a/services/web/server/src/simcore_service_webserver/projects/exceptions.py +++ b/services/web/server/src/simcore_service_webserver/projects/exceptions.py @@ -1,4 +1,5 @@ """Defines the different exceptions that may arise in the projects subpackage""" + from typing import Any import redis.exceptions @@ -118,6 +119,13 @@ class ProjectNodeResourcesInsufficientRightsError(BaseProjectError): ... +class ProjectNodeRequiredInputsNotSetError(BaseProjectError): + msg_template = ( + "The following 'missing_required_inputs={missing_required_inputs}' must be set. " + "Please set them and try to start the service again." + ) + + class DefaultPricingUnitNotFoundError(BaseProjectError): msg_template = "Default pricing unit not found for node '{node_uuid}' in project '{project_uuid}'" diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_api.py b/services/web/server/src/simcore_service_webserver/projects/projects_api.py index e6c5587a7cc..656e1250599 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_api.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_api.py @@ -463,6 +463,8 @@ async def _start_dynamic_service( db: ProjectDBAPI = ProjectDBAPI.get_from_app_context(request.app) + await db.check_project_node_has_all_required_inputs(user_id, project_uuid) + save_state = False user_role: UserRole = await get_user_role(request.app, user_id) if user_role > UserRole.GUEST: From 44c12154face7ab99cb0dbaae7251c64b7a3f85c Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Tue, 21 May 2024 14:05:43 +0200 Subject: [PATCH 2/9] WIP first base version --- .../simcore_service_webserver/projects/db.py | 62 ++++++++++++++----- .../projects/exceptions.py | 5 +- .../projects/projects_api.py | 17 ++++- 3 files changed, 64 insertions(+), 20 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/projects/db.py b/services/web/server/src/simcore_service_webserver/projects/db.py index ea5b0febd12..e9a5b39ca0c 100644 --- a/services/web/server/src/simcore_service_webserver/projects/db.py +++ b/services/web/server/src/simcore_service_webserver/projects/db.py @@ -17,7 +17,7 @@ from aiopg.sa.result import ResultProxy, RowProxy from models_library.projects import ProjectID, ProjectIDStr from models_library.projects_comments import CommentID, ProjectsCommentsDB -from models_library.projects_nodes import Node +from models_library.projects_nodes import Node, OutputsDict from models_library.projects_nodes_io import NodeID, NodeIDStr from models_library.resource_tracker import ( PricingPlanAndUnitIdsTuple, @@ -45,6 +45,7 @@ ProjectNodesRepo, ) from simcore_postgres_database.webserver_models import ProjectType, projects, users +from simcore_sdk.node_ports_v2.links import PortLink from sqlalchemy import func, literal_column from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.sql import and_ @@ -446,23 +447,54 @@ async def get_project( ) async def check_project_node_has_all_required_inputs( - self, user_id: UserID, project_uuid: NodeID + self, user_id: UserID, project_uuid: ProjectID, node_id: NodeID ) -> None: project_dict, _ = await self.get_project(user_id, f"{project_uuid}") workbench = project_dict["workbench"] - required_inputs = workbench["required_inputs"] - inputs = workbench["inputs"] - inputs_set = set(inputs.keys()) - - missing_required_inputs = [ - required_input - for required_input in required_inputs - if required_input not in inputs_set - ] - if missing_required_inputs: - raise ProjectNodeRequiredInputsNotSetError( - missing_required_inputs=missing_required_inputs - ) + + nodes_map: dict[str, Node] = {i: Node(**workbench[i]) for i in workbench} + node = nodes_map[f"{node_id}"] + + unset_required_inputs: list[str] = [] + unset_outputs_in_upstream: list[tuple[str, str]] = [] + + def _check_required_input(required_input_key: str) -> None: + input_entry: PortLink | None = None + if node.inputs: + input_entry = node.inputs.get(required_input_key, None) + if input_entry is None: + # NOT linked to any node connect service or set value manually(whichever applies) + unset_required_inputs.append(required_input_key) + return + + source_node_id: str = f"{input_entry.node_uuid}" + source_output_key = input_entry.output + + source_node = nodes_map[source_node_id] + + output_entry: OutputsDict | None = None + if source_node.outputs: + output_entry = source_node.outputs.get(source_output_key, None) + if output_entry is None: + unset_outputs_in_upstream.append((source_output_key, source_node.label)) + + for required_input in node.inputs_required: + _check_required_input(required_input) + + node_with_required_inputs = node.label + if unset_required_inputs: + msg = f"Missing '{', '.join(unset_required_inputs)}' connection(s) to '{node_with_required_inputs}'" + raise ProjectNodeRequiredInputsNotSetError(msg) + + if unset_outputs_in_upstream: + start_messages = [ + f"'{input_key}' of '{service_name}'" + for input_key, service_name in unset_outputs_in_upstream + ] + msg = f"Missing: {', '.join(start_messages)}" + raise ProjectNodeRequiredInputsNotSetError(msg) + + # TODO: evaluate about sending thsi via socketio so that the frontedn can bette react to it # NOTE: MD: I intentionally didn't include the workbench. There is a special interface # for the workbench, and at some point, this column should be removed from the table. diff --git a/services/web/server/src/simcore_service_webserver/projects/exceptions.py b/services/web/server/src/simcore_service_webserver/projects/exceptions.py index 7728d9f4af5..453da229925 100644 --- a/services/web/server/src/simcore_service_webserver/projects/exceptions.py +++ b/services/web/server/src/simcore_service_webserver/projects/exceptions.py @@ -120,10 +120,7 @@ class ProjectNodeResourcesInsufficientRightsError(BaseProjectError): class ProjectNodeRequiredInputsNotSetError(BaseProjectError): - msg_template = ( - "The following 'missing_required_inputs={missing_required_inputs}' must be set. " - "Please set them and try to start the service again." - ) + ... class DefaultPricingUnitNotFoundError(BaseProjectError): diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_api.py b/services/web/server/src/simcore_service_webserver/projects/projects_api.py index 656e1250599..9c7a2878527 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_api.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_api.py @@ -123,6 +123,7 @@ NodeNotFoundError, ProjectInvalidRightsError, ProjectLockError, + ProjectNodeRequiredInputsNotSetError, ProjectNodeResourcesInvalidError, ProjectOwnerNotFoundInTheProjectAccessRightsError, ProjectStartsTooManyDynamicNodesError, @@ -455,6 +456,7 @@ async def _start_dynamic_service( user_id: UserID, project_uuid: ProjectID, node_uuid: NodeID, + graceful_start: bool = False, ) -> None: if not _is_node_dynamic(service_key): return @@ -463,7 +465,19 @@ async def _start_dynamic_service( db: ProjectDBAPI = ProjectDBAPI.get_from_app_context(request.app) - await db.check_project_node_has_all_required_inputs(user_id, project_uuid) + try: + await db.check_project_node_has_all_required_inputs( + user_id, project_uuid, node_uuid + ) + except ProjectNodeRequiredInputsNotSetError as e: + if graceful_start: + log.info( + "Did not start '%s' because of missing required inputs: %s", + node_uuid, + e, + ) + return + raise save_state = False user_role: UserRole = await get_user_role(request.app, user_id) @@ -1457,6 +1471,7 @@ async def run_project_dynamic_services( user_id=user_id, project_uuid=project["uuid"], node_uuid=NodeID(service_uuid), + graceful_start=True, ) for service_uuid, is_deprecated in zip( services_to_start_uuids, deprecated_services, strict=True From 879333e6dc006c8b6810533de1bfd15882006326 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Wed, 22 May 2024 16:05:46 +0200 Subject: [PATCH 3/9] added tests --- .../simcore_service_webserver/projects/db.py | 3 +- .../tests/unit/with_dbs/03/test_project_db.py | 112 ++++++++++++++++++ 2 files changed, 114 insertions(+), 1 deletion(-) diff --git a/services/web/server/src/simcore_service_webserver/projects/db.py b/services/web/server/src/simcore_service_webserver/projects/db.py index e9a5b39ca0c..769274fe706 100644 --- a/services/web/server/src/simcore_service_webserver/projects/db.py +++ b/services/web/server/src/simcore_service_webserver/projects/db.py @@ -494,7 +494,8 @@ def _check_required_input(required_input_key: str) -> None: msg = f"Missing: {', '.join(start_messages)}" raise ProjectNodeRequiredInputsNotSetError(msg) - # TODO: evaluate about sending thsi via socketio so that the frontedn can bette react to it + # TODO: figure out if we also need to propagate this via socketio to the FE for better formatting + # especially when opening the project, since the node cannot start if the required inputs are missing # NOTE: MD: I intentionally didn't include the workbench. There is a special interface # for the workbench, and at some point, this column should be removed from the table. diff --git a/services/web/server/tests/unit/with_dbs/03/test_project_db.py b/services/web/server/tests/unit/with_dbs/03/test_project_db.py index 67787b779cd..69894d58f92 100644 --- a/services/web/server/tests/unit/with_dbs/03/test_project_db.py +++ b/services/web/server/tests/unit/with_dbs/03/test_project_db.py @@ -33,6 +33,7 @@ from simcore_service_webserver.projects.db import ProjectAccessRights, ProjectDBAPI from simcore_service_webserver.projects.exceptions import ( NodeNotFoundError, + ProjectNodeRequiredInputsNotSetError, ProjectNotFoundError, ) from simcore_service_webserver.projects.models import ProjectDict @@ -834,3 +835,114 @@ async def test_has_permission( await db_api.has_permission(second_user["id"], project_id, permission) is access_rights[permission] ), f"Found unexpected {permission=} for {access_rights=} of {user_role=} and {project_id=}" + + +def _fake_output_data() -> dict: + return { + "store": 0, + "path": "9f8207e6-144a-11ef-831f-0242ac140027/98b68cbe-9e22-4eb5-a91b-2708ad5317b7/outputs/output_2/output_2.zip", + "eTag": "ec3bc734d85359b660aab400147cd1ea", + } + + +def _fake_connect_to(output_number: int) -> dict: + return { + "nodeUuid": "98b68cbe-9e22-4eb5-a91b-2708ad5317b7", + "output": f"output_{output_number}", + } + + +@pytest.fixture +async def inserted_project( + logged_user: dict[str, Any], + insert_project_in_db: Callable[..., Awaitable[dict[str, Any]]], + fake_project: dict[str, Any], + downstream_inputs: dict, + downstream_required_inputs: list[str], + upstream_outputs: dict, +) -> dict: + fake_project["workbench"] = { + "98b68cbe-9e22-4eb5-a91b-2708ad5317b7": { + "key": "simcore/services/dynamic/jupyter-math", + "version": "2.0.10", + "label": "upstream", + "inputs": {}, + "inputsUnits": {}, + "inputNodes": [], + "thumbnail": "", + "outputs": upstream_outputs, + "runHash": "c6ae58f36a2e0f65f443441ecda023a451cb1b8051d01412d79aa03653e1a6b3", + }, + "324d6ef2-a82c-414d-9001-dc84da1cbea3": { + "key": "simcore/services/dynamic/jupyter-math", + "version": "2.0.10", + "label": "downstream", + "inputs": downstream_inputs, + "inputsUnits": {}, + "inputNodes": ["98b68cbe-9e22-4eb5-a91b-2708ad5317b7"], + "thumbnail": "", + "inputsRequired": downstream_required_inputs, + }, + } + + return await insert_project_in_db(fake_project, user_id=logged_user["id"]) + + +@pytest.mark.parametrize( + "downstream_inputs,downstream_required_inputs,upstream_outputs,expected_error", + [ + pytest.param( + {"input_1": _fake_connect_to(1)}, + ["input_1", "input_2"], + {}, + "Missing 'input_2' connection(s) to 'downstream'", + id="missing_connection_on_input_2", + ), + pytest.param( + {"input_1": _fake_connect_to(1), "input_2": _fake_connect_to(2)}, + ["input_1", "input_2"], + {"output_2": _fake_output_data()}, + "Missing: 'output_1' of 'upstream'", + id="output_1_has_not_file", + ), + ], +) +@pytest.mark.parametrize("user_role", [(UserRole.USER)]) +async def test_check_project_node_has_all_required_inputs_raises( + logged_user: dict[str, Any], + db_api: ProjectDBAPI, + inserted_project: dict, + expected_error: str, +): + + with pytest.raises(ProjectNodeRequiredInputsNotSetError) as exc: + await db_api.check_project_node_has_all_required_inputs( + user_id=logged_user["id"], + project_uuid=UUID(inserted_project["uuid"]), + node_id=UUID("324d6ef2-a82c-414d-9001-dc84da1cbea3"), + ) + assert f"{exc.value}" == expected_error + + +@pytest.mark.parametrize( + "downstream_inputs,downstream_required_inputs,upstream_outputs", + [ + pytest.param( + {"input_1": _fake_connect_to(1), "input_2": _fake_connect_to(2)}, + ["input_1", "input_2"], + {"output_1": _fake_output_data(), "output_2": _fake_output_data()}, + id="with_required_inputs_present", + ), + ], +) +@pytest.mark.parametrize("user_role", [(UserRole.USER)]) +async def test_check_project_node_has_all_required_inputs_ok( + logged_user: dict[str, Any], + db_api: ProjectDBAPI, + inserted_project: dict, +): + await db_api.check_project_node_has_all_required_inputs( + user_id=logged_user["id"], + project_uuid=UUID(inserted_project["uuid"]), + node_id=UUID("324d6ef2-a82c-414d-9001-dc84da1cbea3"), + ) From b5a18a5bf67518e2977c8bebf5dadca37fd47d4f Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Wed, 29 May 2024 14:34:50 +0200 Subject: [PATCH 4/9] remove note, not required --- .../web/server/src/simcore_service_webserver/projects/db.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/projects/db.py b/services/web/server/src/simcore_service_webserver/projects/db.py index 769274fe706..f5b314fe85f 100644 --- a/services/web/server/src/simcore_service_webserver/projects/db.py +++ b/services/web/server/src/simcore_service_webserver/projects/db.py @@ -494,9 +494,6 @@ def _check_required_input(required_input_key: str) -> None: msg = f"Missing: {', '.join(start_messages)}" raise ProjectNodeRequiredInputsNotSetError(msg) - # TODO: figure out if we also need to propagate this via socketio to the FE for better formatting - # especially when opening the project, since the node cannot start if the required inputs are missing - # NOTE: MD: I intentionally didn't include the workbench. There is a special interface # for the workbench, and at some point, this column should be removed from the table. # The same holds true for access_rights/ui/classifiers/quality, but we have decided to proceed step by step. From edc6bfeccb0403914c0d87e8dbf3252859b398ce Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Mon, 3 Jun 2024 11:57:42 +0200 Subject: [PATCH 5/9] refactor --- .../projects/_nodes_handlers.py | 6 +- .../simcore_service_webserver/projects/db.py | 52 +--------------- .../projects/projects_api.py | 59 +++++++++++++++++-- .../tests/unit/with_dbs/03/test_project_db.py | 9 ++- 4 files changed, 64 insertions(+), 62 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py b/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py index 18c39e0122f..cccabfa3775 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py +++ b/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py @@ -107,11 +107,7 @@ async def wrapper(request: web.Request) -> web.StreamResponse: except ClustersKeeperNotAvailableError as exc: raise web.HTTPServiceUnavailable(reason=f"{exc}") from exc except ProjectNodeRequiredInputsNotSetError as exc: - raise web.HTTPConflict( - reason=f"{exc}", - text=f"{exc}", - content_type=MIMETYPE_APPLICATION_JSON, - ) from exc + raise web.HTTPConflict(reason=f"{exc}", text=f"{exc}") from exc return wrapper diff --git a/services/web/server/src/simcore_service_webserver/projects/db.py b/services/web/server/src/simcore_service_webserver/projects/db.py index f5b314fe85f..e551efc3428 100644 --- a/services/web/server/src/simcore_service_webserver/projects/db.py +++ b/services/web/server/src/simcore_service_webserver/projects/db.py @@ -17,7 +17,7 @@ from aiopg.sa.result import ResultProxy, RowProxy from models_library.projects import ProjectID, ProjectIDStr from models_library.projects_comments import CommentID, ProjectsCommentsDB -from models_library.projects_nodes import Node, OutputsDict +from models_library.projects_nodes import Node from models_library.projects_nodes_io import NodeID, NodeIDStr from models_library.resource_tracker import ( PricingPlanAndUnitIdsTuple, @@ -45,7 +45,6 @@ ProjectNodesRepo, ) from simcore_postgres_database.webserver_models import ProjectType, projects, users -from simcore_sdk.node_ports_v2.links import PortLink from sqlalchemy import func, literal_column from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.sql import and_ @@ -79,7 +78,6 @@ from .exceptions import ( ProjectDeleteError, ProjectInvalidRightsError, - ProjectNodeRequiredInputsNotSetError, ProjectNodeResourcesInsufficientRightsError, ProjectNotFoundError, ) @@ -446,54 +444,6 @@ async def get_project( project_type, ) - async def check_project_node_has_all_required_inputs( - self, user_id: UserID, project_uuid: ProjectID, node_id: NodeID - ) -> None: - project_dict, _ = await self.get_project(user_id, f"{project_uuid}") - workbench = project_dict["workbench"] - - nodes_map: dict[str, Node] = {i: Node(**workbench[i]) for i in workbench} - node = nodes_map[f"{node_id}"] - - unset_required_inputs: list[str] = [] - unset_outputs_in_upstream: list[tuple[str, str]] = [] - - def _check_required_input(required_input_key: str) -> None: - input_entry: PortLink | None = None - if node.inputs: - input_entry = node.inputs.get(required_input_key, None) - if input_entry is None: - # NOT linked to any node connect service or set value manually(whichever applies) - unset_required_inputs.append(required_input_key) - return - - source_node_id: str = f"{input_entry.node_uuid}" - source_output_key = input_entry.output - - source_node = nodes_map[source_node_id] - - output_entry: OutputsDict | None = None - if source_node.outputs: - output_entry = source_node.outputs.get(source_output_key, None) - if output_entry is None: - unset_outputs_in_upstream.append((source_output_key, source_node.label)) - - for required_input in node.inputs_required: - _check_required_input(required_input) - - node_with_required_inputs = node.label - if unset_required_inputs: - msg = f"Missing '{', '.join(unset_required_inputs)}' connection(s) to '{node_with_required_inputs}'" - raise ProjectNodeRequiredInputsNotSetError(msg) - - if unset_outputs_in_upstream: - start_messages = [ - f"'{input_key}' of '{service_name}'" - for input_key, service_name in unset_outputs_in_upstream - ] - msg = f"Missing: {', '.join(start_messages)}" - raise ProjectNodeRequiredInputsNotSetError(msg) - # NOTE: MD: I intentionally didn't include the workbench. There is a special interface # for the workbench, and at some point, this column should be removed from the table. # The same holds true for access_rights/ui/classifiers/quality, but we have decided to proceed step by step. diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_api.py b/services/web/server/src/simcore_service_webserver/projects/projects_api.py index c9bc000e1f5..4a51284d190 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_api.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_api.py @@ -34,8 +34,8 @@ from models_library.errors import ErrorDict from models_library.products import ProductName from models_library.projects import Project, ProjectID, ProjectIDStr -from models_library.projects_nodes import Node -from models_library.projects_nodes_io import NodeID, NodeIDStr +from models_library.projects_nodes import Node, OutputsDict +from models_library.projects_nodes_io import NodeID, NodeIDStr, PortLink from models_library.projects_state import ( Owner, ProjectLocked, @@ -448,6 +448,57 @@ def _by_type_name(ec2: EC2InstanceTypeGet) -> bool: raise ClustersKeeperNotAvailableError from exc +async def check_project_node_has_all_required_inputs( + db: ProjectDBAPI, user_id: UserID, project_uuid: ProjectID, node_id: NodeID +) -> None: + + project_dict, _ = await db.get_project(user_id, f"{project_uuid}") + + nodes_map: dict[NodeID, Node] = { + NodeID(k): Node(**v) for k, v in project_dict["workbench"].items() + } + node = nodes_map[node_id] + + unset_required_inputs: list[str] = [] + unset_outputs_in_upstream: list[tuple[str, str]] = [] + + def _check_required_input(required_input_key: str) -> None: + input_entry: PortLink | None = None + if node.inputs: + input_entry = node.inputs.get(required_input_key, None) + if input_entry is None: + # NOT linked to any node connect service or set value manually(whichever applies) + unset_required_inputs.append(required_input_key) + return + + source_node_id: NodeID = input_entry.node_uuid + source_output_key = input_entry.output + + source_node = nodes_map[source_node_id] + + output_entry: OutputsDict | None = None + if source_node.outputs: + output_entry = source_node.outputs.get(source_output_key, None) + if output_entry is None: + unset_outputs_in_upstream.append((source_output_key, source_node.label)) + + for required_input in node.inputs_required: + _check_required_input(required_input) + + node_with_required_inputs = node.label + if unset_required_inputs: + msg = f"Missing '{', '.join(unset_required_inputs)}' connection(s) to '{node_with_required_inputs}'" + raise ProjectNodeRequiredInputsNotSetError(msg) + + if unset_outputs_in_upstream: + start_messages = [ + f"'{input_key}' of '{service_name}'" + for input_key, service_name in unset_outputs_in_upstream + ] + msg = f"Missing: {', '.join(start_messages)}" + raise ProjectNodeRequiredInputsNotSetError(msg) + + async def _start_dynamic_service( request: web.Request, *, @@ -467,8 +518,8 @@ async def _start_dynamic_service( db: ProjectDBAPI = ProjectDBAPI.get_from_app_context(request.app) try: - await db.check_project_node_has_all_required_inputs( - user_id, project_uuid, node_uuid + await check_project_node_has_all_required_inputs( + db, user_id, project_uuid, node_uuid ) except ProjectNodeRequiredInputsNotSetError as e: if graceful_start: diff --git a/services/web/server/tests/unit/with_dbs/03/test_project_db.py b/services/web/server/tests/unit/with_dbs/03/test_project_db.py index c0baf18306d..e86a8e13180 100644 --- a/services/web/server/tests/unit/with_dbs/03/test_project_db.py +++ b/services/web/server/tests/unit/with_dbs/03/test_project_db.py @@ -37,6 +37,9 @@ ProjectNotFoundError, ) from simcore_service_webserver.projects.models import ProjectDict +from simcore_service_webserver.projects.projects_api import ( + check_project_node_has_all_required_inputs, +) from simcore_service_webserver.users.exceptions import UserNotFoundError from simcore_service_webserver.utils import to_datetime from sqlalchemy.engine.result import Row @@ -911,7 +914,8 @@ async def test_check_project_node_has_all_required_inputs_raises( ): with pytest.raises(ProjectNodeRequiredInputsNotSetError) as exc: - await db_api.check_project_node_has_all_required_inputs( + await check_project_node_has_all_required_inputs( + db_api, user_id=logged_user["id"], project_uuid=UUID(inserted_project["uuid"]), node_id=UUID("324d6ef2-a82c-414d-9001-dc84da1cbea3"), @@ -936,7 +940,8 @@ async def test_check_project_node_has_all_required_inputs_ok( db_api: ProjectDBAPI, inserted_project: dict, ): - await db_api.check_project_node_has_all_required_inputs( + await check_project_node_has_all_required_inputs( + db_api, user_id=logged_user["id"], project_uuid=UUID(inserted_project["uuid"]), node_id=UUID("324d6ef2-a82c-414d-9001-dc84da1cbea3"), From d8afcefbab336ac6c30df34aad9bce73b7655c12 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Mon, 3 Jun 2024 12:46:47 +0200 Subject: [PATCH 6/9] fix serialization --- .../src/simcore_service_webserver/projects/_nodes_handlers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py b/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py index cccabfa3775..6a7109799e0 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py +++ b/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py @@ -107,7 +107,7 @@ async def wrapper(request: web.Request) -> web.StreamResponse: except ClustersKeeperNotAvailableError as exc: raise web.HTTPServiceUnavailable(reason=f"{exc}") from exc except ProjectNodeRequiredInputsNotSetError as exc: - raise web.HTTPConflict(reason=f"{exc}", text=f"{exc}") from exc + raise web.HTTPConflict(reason=f"{exc}") from exc return wrapper From 5e33864409f1aabf4c83dcdc22893f022e1c4d45 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Mon, 3 Jun 2024 12:48:57 +0200 Subject: [PATCH 7/9] making method private --- .../src/simcore_service_webserver/projects/projects_api.py | 4 ++-- .../web/server/tests/unit/with_dbs/03/test_project_db.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_api.py b/services/web/server/src/simcore_service_webserver/projects/projects_api.py index 4a51284d190..ee9255890d7 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_api.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_api.py @@ -448,7 +448,7 @@ def _by_type_name(ec2: EC2InstanceTypeGet) -> bool: raise ClustersKeeperNotAvailableError from exc -async def check_project_node_has_all_required_inputs( +async def _check_project_node_has_all_required_inputs( db: ProjectDBAPI, user_id: UserID, project_uuid: ProjectID, node_id: NodeID ) -> None: @@ -518,7 +518,7 @@ async def _start_dynamic_service( db: ProjectDBAPI = ProjectDBAPI.get_from_app_context(request.app) try: - await check_project_node_has_all_required_inputs( + await _check_project_node_has_all_required_inputs( db, user_id, project_uuid, node_uuid ) except ProjectNodeRequiredInputsNotSetError as e: diff --git a/services/web/server/tests/unit/with_dbs/03/test_project_db.py b/services/web/server/tests/unit/with_dbs/03/test_project_db.py index e86a8e13180..ebf46bee580 100644 --- a/services/web/server/tests/unit/with_dbs/03/test_project_db.py +++ b/services/web/server/tests/unit/with_dbs/03/test_project_db.py @@ -38,7 +38,7 @@ ) from simcore_service_webserver.projects.models import ProjectDict from simcore_service_webserver.projects.projects_api import ( - check_project_node_has_all_required_inputs, + _check_project_node_has_all_required_inputs, ) from simcore_service_webserver.users.exceptions import UserNotFoundError from simcore_service_webserver.utils import to_datetime @@ -914,7 +914,7 @@ async def test_check_project_node_has_all_required_inputs_raises( ): with pytest.raises(ProjectNodeRequiredInputsNotSetError) as exc: - await check_project_node_has_all_required_inputs( + await _check_project_node_has_all_required_inputs( db_api, user_id=logged_user["id"], project_uuid=UUID(inserted_project["uuid"]), @@ -940,7 +940,7 @@ async def test_check_project_node_has_all_required_inputs_ok( db_api: ProjectDBAPI, inserted_project: dict, ): - await check_project_node_has_all_required_inputs( + await _check_project_node_has_all_required_inputs( db_api, user_id=logged_user["id"], project_uuid=UUID(inserted_project["uuid"]), From 58312ab0b4956124615af582e7a3a6583aa46932 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Mon, 3 Jun 2024 14:18:17 +0200 Subject: [PATCH 8/9] expanded exception handling --- .../projects/exceptions.py | 38 +++++++++++++++++++ .../projects/projects_api.py | 17 +++++---- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/projects/exceptions.py b/services/web/server/src/simcore_service_webserver/projects/exceptions.py index f6f277c91ee..1caf9f2770e 100644 --- a/services/web/server/src/simcore_service_webserver/projects/exceptions.py +++ b/services/web/server/src/simcore_service_webserver/projects/exceptions.py @@ -4,6 +4,7 @@ import redis.exceptions from models_library.projects import ProjectID +from models_library.projects_nodes_io import NodeID from models_library.users import UserID from ..errors import WebServerBaseError @@ -140,6 +141,43 @@ class ProjectNodeRequiredInputsNotSetError(BaseProjectError): ... +class ProjectNodeConnectionsMissingError(ProjectNodeRequiredInputsNotSetError): + msg_template = "Missing '{joined_unset_required_inputs}' connection(s) to '{node_with_required_inputs}'" + + def __init__( + self, + *, + unset_required_inputs: list[str], + node_with_required_inputs: NodeID, + **ctx, + ): + + joined_unset_required_inputs = ", ".join(unset_required_inputs) + ctx["joined_unset_required_inputs"] = joined_unset_required_inputs + super().__init__(**ctx) + self.unset_required_inputs = unset_required_inputs + self.node_with_required_inputs = node_with_required_inputs + + +class ProjectNodeOutputPortMissingValueError(ProjectNodeRequiredInputsNotSetError): + msg_template = "Missing: {joined_start_message}" + + def __init__( + self, + *, + unset_outputs_in_upstream: list[tuple[str, str]], + **ctx, + ): + start_messages = [ + f"'{input_key}' of '{service_name}'" + for input_key, service_name in unset_outputs_in_upstream + ] + joined_start_message = ", ".join(start_messages) + ctx["joined_start_message"] = joined_start_message + super().__init__(**ctx) + self.unset_outputs_in_upstream = unset_outputs_in_upstream + + class DefaultPricingUnitNotFoundError(BaseProjectError): msg_template = "Default pricing unit not found for node '{node_uuid}' in project '{project_uuid}'" diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_api.py b/services/web/server/src/simcore_service_webserver/projects/projects_api.py index ee9255890d7..28655495bfb 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_api.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_api.py @@ -124,6 +124,8 @@ NodeNotFoundError, ProjectInvalidRightsError, ProjectLockError, + ProjectNodeConnectionsMissingError, + ProjectNodeOutputPortMissingValueError, ProjectNodeRequiredInputsNotSetError, ProjectNodeResourcesInvalidError, ProjectOwnerNotFoundInTheProjectAccessRightsError, @@ -487,16 +489,15 @@ def _check_required_input(required_input_key: str) -> None: node_with_required_inputs = node.label if unset_required_inputs: - msg = f"Missing '{', '.join(unset_required_inputs)}' connection(s) to '{node_with_required_inputs}'" - raise ProjectNodeRequiredInputsNotSetError(msg) + raise ProjectNodeConnectionsMissingError( + unset_required_inputs=unset_required_inputs, + node_with_required_inputs=node_with_required_inputs, + ) if unset_outputs_in_upstream: - start_messages = [ - f"'{input_key}' of '{service_name}'" - for input_key, service_name in unset_outputs_in_upstream - ] - msg = f"Missing: {', '.join(start_messages)}" - raise ProjectNodeRequiredInputsNotSetError(msg) + raise ProjectNodeOutputPortMissingValueError( + unset_outputs_in_upstream=unset_outputs_in_upstream + ) async def _start_dynamic_service( From 1713f33ce821d68646f0904b3a61c80c93f04c26 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Mon, 3 Jun 2024 15:26:18 +0200 Subject: [PATCH 9/9] refactor --- .../projects/exceptions.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/services/web/server/src/simcore_service_webserver/projects/exceptions.py b/services/web/server/src/simcore_service_webserver/projects/exceptions.py index 1caf9f2770e..ecd60a58c39 100644 --- a/services/web/server/src/simcore_service_webserver/projects/exceptions.py +++ b/services/web/server/src/simcore_service_webserver/projects/exceptions.py @@ -151,10 +151,12 @@ def __init__( node_with_required_inputs: NodeID, **ctx, ): - - joined_unset_required_inputs = ", ".join(unset_required_inputs) - ctx["joined_unset_required_inputs"] = joined_unset_required_inputs - super().__init__(**ctx) + super().__init__( + joined_unset_required_inputs=", ".join(unset_required_inputs), + unset_required_inputs=unset_required_inputs, + node_with_required_inputs=node_with_required_inputs, + **ctx, + ) self.unset_required_inputs = unset_required_inputs self.node_with_required_inputs = node_with_required_inputs @@ -172,9 +174,11 @@ def __init__( f"'{input_key}' of '{service_name}'" for input_key, service_name in unset_outputs_in_upstream ] - joined_start_message = ", ".join(start_messages) - ctx["joined_start_message"] = joined_start_message - super().__init__(**ctx) + super().__init__( + joined_start_message=", ".join(start_messages), + unset_outputs_in_upstream=unset_outputs_in_upstream, + **ctx, + ) self.unset_outputs_in_upstream = unset_outputs_in_upstream