diff --git a/.circleci/main.yml b/.circleci/main.yml index a13300a78d..f936f9230d 100644 --- a/.circleci/main.yml +++ b/.circleci/main.yml @@ -48,7 +48,7 @@ commands: sudo apt-key adv --recv-keys --keyserver keyserver.ubuntu.com 78BD65473CB3BD13 curl -L https://packagecloud.io/circleci/trusty/gpgkey | sudo apt-key add - sudo apt-get update - sudo apt-get install git -y + sudo apt-get install git openssh-client -y git config --global user.email "CMI_CPAC_Support@childmind.org" git config --global user.name "Theodore (machine user) @ CircleCI" create-docker-test-container: @@ -64,11 +64,6 @@ commands: mkdir -p ~/project/test-results docker pull ${DOCKER_TAG} docker run -v /etc/passwd:/etc/passwd --user=$(id -u):c-pac -dit -P -e COVERAGE_FILE=<< parameters.coverage-file >> -v /home/circleci/project/test-results:/code/test-results -v /home/circleci:/home/circleci -v /home/circleci/project/CPAC/resources/configs/test_configs:/test_configs -v $PWD:/code -v $PWD/dev/circleci_data:$PWD/dev/circleci_data --workdir=/home/circleci/project --entrypoint=/bin/bash --name docker_test ${DOCKER_TAG} - get-sample-bids-data: - steps: - - run: - name: Getting Sample BIDS Data - command: git clone https://github.com/bids-standard/bids-examples.git get-singularity: parameters: version: @@ -231,7 +226,6 @@ jobs: - set-up-variant: variant: "<< parameters.variant >>" - set-python-version - - get-sample-bids-data - run-pytest-docker - store_test_results: path: test-results diff --git a/CHANGELOG.md b/CHANGELOG.md index be5ec4a432..b67477ffde 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,12 +24,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Required positional parameter "wf" in input and output of `ingress_pipeconfig_paths` function, where a node to reorient templates is added to the `wf`. - Required positional parameter "orientation" to `resolve_resolution`. - Optional positional argument "cfg" to `create_lesion_preproc`. 
+- `resource_inventory` utility to inventory NodeBlock function inputs and outputs. ### Changed - Moved `pygraphviz` from requirements to `graphviz` optional dependencies group. - Automatically tag untagged `subject_id` and `unique_id` as `!!str` when loading data config files. - Made orientation configurable (was hard-coded as "RPI"). +- Resource-not-found errors now include information about where to source those resources. ### Fixed diff --git a/CPAC/_entrypoints/run.py b/CPAC/_entrypoints/run.py index 98a30ba094..f84b6cf799 100755 --- a/CPAC/_entrypoints/run.py +++ b/CPAC/_entrypoints/run.py @@ -795,7 +795,7 @@ def run_main(): args.data_config_file, args.participant_label, args.aws_input_creds ) sub_list = sub_list_filter_by_labels( - sub_list, {"T1w": args.T1w_label, "bold": args.bold_label} + list(sub_list), {"T1w": args.T1w_label, "bold": args.bold_label} ) # C-PAC only handles single anatomical images (for now) diff --git a/CPAC/anat_preproc/anat_preproc.py b/CPAC/anat_preproc/anat_preproc.py index a561f8e077..f4bd6f7049 100644 --- a/CPAC/anat_preproc/anat_preproc.py +++ b/CPAC/anat_preproc/anat_preproc.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright (C) 2012-2023 C-PAC Developers +# Copyright (C) 2012-2025 C-PAC Developers # This file is part of C-PAC. 
@@ -2572,7 +2572,7 @@ def brain_mask_acpc_niworkflows_ants_T2(wf, cfg, strat_pool, pipe_num, opt=None) config=["anatomical_preproc", "brain_extraction"], option_key="using", option_val="UNet", - inputs=["desc-preproc_T2w", "T1w-brain-template", "T1w-template", "unet_model"], + inputs=["desc-preproc_T2w", "T1w-brain-template", "T1w-template", "unet-model"], outputs=["space-T2w_desc-brain_mask"], ) def brain_mask_unet_T2(wf, cfg, strat_pool, pipe_num, opt=None): @@ -2586,7 +2586,7 @@ def brain_mask_unet_T2(wf, cfg, strat_pool, pipe_num, opt=None): config=["anatomical_preproc", "brain_extraction"], option_key="using", option_val="UNet", - inputs=["desc-preproc_T2w", "T1w-brain-template", "T1w-template", "unet_model"], + inputs=["desc-preproc_T2w", "T1w-brain-template", "T1w-template", "unet-model"], outputs=["space-T2w_desc-acpcbrain_mask"], ) def brain_mask_acpc_unet_T2(wf, cfg, strat_pool, pipe_num, opt=None): diff --git a/CPAC/conftest.py b/CPAC/conftest.py new file mode 100644 index 0000000000..7b765736ee --- /dev/null +++ b/CPAC/conftest.py @@ -0,0 +1,34 @@ +# Copyright (C) 2025 C-PAC Developers + +# This file is part of C-PAC. + +# C-PAC is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the +# Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. + +# C-PAC is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +# License for more details. + +# You should have received a copy of the GNU Lesser General Public +# License along with C-PAC. If not, see . 
+"""Global fixtures for C-PAC tests.""" + +from pathlib import Path + +from _pytest.tmpdir import TempPathFactory +from git import Repo +import pytest + + +@pytest.fixture(scope="session") +def bids_examples(tmp_path_factory: TempPathFactory) -> Path: + """Get the BIDS examples dataset.""" + example_dir = tmp_path_factory.mktemp("bids-examples") + if not example_dir.exists() or not any(example_dir.iterdir()): + Repo.clone_from( + "https://github.com/bids-standard/bids-examples.git", str(example_dir) + ) + return example_dir diff --git a/CPAC/pipeline/engine.py b/CPAC/pipeline/engine.py index be1d0c0c17..878b743bfe 100644 --- a/CPAC/pipeline/engine.py +++ b/CPAC/pipeline/engine.py @@ -1,4 +1,4 @@ -# Copyright (C) 2021-2024 C-PAC Developers +# Copyright (C) 2021-2025 C-PAC Developers # This file is part of C-PAC. @@ -17,6 +17,7 @@ import ast import copy import hashlib +from importlib.resources import files from itertools import chain import json import os @@ -24,6 +25,7 @@ from typing import Optional import warnings +import pandas as pd from nipype import config, logging from nipype.interfaces import afni from nipype.interfaces.utility import Rename @@ -418,10 +420,12 @@ def get( if report_fetched: return (None, None) return None + from CPAC.pipeline.resource_inventory import where_to_find + msg = ( "\n\n[!] C-PAC says: None of the listed resources are in " - f"the resource pool:\n\n {resource}\n\nOptions:\n- You " - "can enable a node block earlier in the pipeline which " + f"the resource pool:\n\n {where_to_find(resource)}\n\nOptions:\n" + "- You can enable a node block earlier in the pipeline which " "produces these resources. Check the 'outputs:' field in " "a node block's documentation.\n- You can directly " "provide this required data by pulling it from another " @@ -456,7 +460,9 @@ def copy_resource(self, resource, new_name): try: self.rpool[new_name] = self.rpool[resource] except KeyError: - msg = f"[!] {resource} not in the resource pool." 
+ from CPAC.pipeline.resource_inventory import where_to_find + + msg = f"[!] Not in the resource pool:\n{where_to_find(resource)}" raise Exception(msg) def update_resource(self, resource, new_name): @@ -628,11 +634,13 @@ def get_strats(self, resources, debug=False): total_pool.append(sub_pool) if not total_pool: + from CPAC.pipeline.resource_inventory import where_to_find + raise LookupError( "\n\n[!] C-PAC says: None of the listed " "resources in the node block being connected " "exist in the resource pool.\n\nResources:\n" - "%s\n\n" % resource_list + "%s\n\n" % where_to_find(resource_list) ) # TODO: right now total_pool is: @@ -1007,6 +1015,19 @@ def post_process(self, wf, label, connection, json_info, pipe_idx, pipe_x, outs) for label_con_tpl in post_labels: label = label_con_tpl[0] connection = (label_con_tpl[1], label_con_tpl[2]) + if "desc-" not in label: + if "space-template" in label: + new_label = label.replace( + "space-template", "space-template_desc-zstd" + ) + else: + new_label = f"desc-zstd_{label}" + else: + for tag in label.split("_"): + if "desc-" in tag: + newtag = f"{tag}-zstd" + new_label = label.replace(tag, newtag) + break if label in Outputs.to_zstd: zstd = z_score_standardize(f"{label}_zstd_{pipe_x}", input_type) @@ -1015,20 +1036,6 @@ def post_process(self, wf, label, connection, json_info, pipe_idx, pipe_x, outs) node, out = self.get_data(mask, pipe_idx=mask_idx) wf.connect(node, out, zstd, "inputspec.mask") - if "desc-" not in label: - if "space-template" in label: - new_label = label.replace( - "space-template", "space-template_desc-zstd" - ) - else: - new_label = f"desc-zstd_{label}" - else: - for tag in label.split("_"): - if "desc-" in tag: - newtag = f"{tag}-zstd" - new_label = label.replace(tag, newtag) - break - post_labels.append((new_label, zstd, "outputspec.out_file")) self.set_data( @@ -2408,15 +2415,17 @@ def strip_template(data_label, dir_path, filename): return data_label, json +def template_dataframe() -> pd.DataFrame: + 
"""Return the template dataframe.""" + template_csv = files("CPAC").joinpath("resources/cpac_templates.csv") + return pd.read_csv(str(template_csv), keep_default_na=False) + + def ingress_pipeconfig_paths(wf, cfg, rpool, unique_id, creds_path=None): # ingress config file paths # TODO: may want to change the resource keys for each to include one level up in the YAML as well - import pandas as pd - import pkg_resources as p - - template_csv = p.resource_filename("CPAC", "resources/cpac_templates.csv") - template_df = pd.read_csv(template_csv, keep_default_na=False) + template_df = template_dataframe() desired_orientation = cfg.pipeline_setup["desired_orientation"] for row in template_df.itertuples(): diff --git a/CPAC/pipeline/resource_inventory.py b/CPAC/pipeline/resource_inventory.py new file mode 100755 index 0000000000..a181ea6567 --- /dev/null +++ b/CPAC/pipeline/resource_inventory.py @@ -0,0 +1,670 @@ +#!/usr/bin/env python +# Copyright (C) 2025 C-PAC Developers + +# This file is part of C-PAC. + +# C-PAC is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the +# Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. + +# C-PAC is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +# License for more details. + +# You should have received a copy of the GNU Lesser General Public +# License along with C-PAC. If not, see . 
+"""Inspect inputs and outputs for NodeBlockFunctions.""" + +from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace +import ast +from collections.abc import Hashable +from dataclasses import dataclass, field +import importlib +from importlib.resources import files +import inspect +from itertools import chain, product +import os +from pathlib import Path +import re +from typing import Any, cast, Iterable, Optional +from unittest.mock import patch + +from traits.trait_errors import TraitError +import yaml + +from CPAC.pipeline.engine import template_dataframe +from CPAC.pipeline.nodeblock import NodeBlockFunction +from CPAC.pipeline.schema import latest_schema +from CPAC.utils.monitoring import UTLOGGER +from CPAC.utils.outputs import Outputs + +ONE_OFFS: dict[str, list[str]] = { + r".*desc-preproc_bold": ["func_ingress"], + r".*-sm.*": [ + f"spatial_smoothing_{smooth_opt}" + for smooth_opt in latest_schema.schema["post_processing"]["spatial_smoothing"][ + "smoothing_method" + ][0].container + ], + r".*-zstd.*": [f"{fisher}zscore_standardize" for fisher in ["", "fisher_"]], +} +"""A few out-of-nodeblock generated resources. + +Easier to note these manually than to code up the AST rules.""" + +SKIPS: list[str] = [ + "CPAC.unet.__init__", + "CPAC.unet._torch", +] +"""No nodeblock functions in these modules that dynamically install `torch`.""" + + +def import_nodeblock_functions( + package_name: str, exclude: Optional[list[str]] = None +) -> list[NodeBlockFunction]: + """ + Import all functions with the @nodeblock decorator from all modules and submodules in a package. + + Parameters + ---------- + package_name + The name of the package to import from. + + exclude + A list of module names to exclude from the import. 
+ """ + if exclude is None: + exclude = [] + functions: list[NodeBlockFunction] = [] + package = importlib.import_module(package_name) + package_path = package.__path__[0] # Path to the package directory + + for root, _, package_files in os.walk(package_path): + for file in package_files: + if file.endswith(".py") and file != "__init__.py": + # Get the module path + rel_path = os.path.relpath(os.path.join(root, file), package_path) + module_name = f"{package_name}.{rel_path[:-3].replace(os.sep, '.')}" + if module_name in exclude: + continue + + # Import the module + try: + with patch.dict( + "sys.modules", {exclusion: None for exclusion in exclude} + ): + module = importlib.import_module(module_name) + except (ImportError, TraitError, ValueError) as e: + UTLOGGER.debug(f"Failed to import {module_name}: {e}") + continue + # Extract nodeblock-decorated functions from the module + for _name, obj in inspect.getmembers( + module, predicate=lambda obj: isinstance(obj, NodeBlockFunction) + ): + functions.append(obj) + + return functions + + +@dataclass +class ResourceSourceList: + """A list of resource sources without duplicates.""" + + sources: list[str] = field(default_factory=list) + + def __add__(self, other: "str | list[str] | ResourceSourceList") -> list[str]: + """Add a list of sources to the list.""" + if isinstance(other, str): + if not other or other == "created_before_this_test": + # dummy node in a testing function, no need to include in inventory + return list(self) + other = [other] + new_set = {*self.sources, *other} + return sorted(new_set, key=str.casefold) + + def __contains__(self, item: str) -> bool: + """Check if a source is in the list.""" + return item in self.sources + + def __delitem__(self, key: int) -> None: + """Delete a source by index.""" + del self.sources[key] + + def __eq__(self, value: Any) -> bool: + """Check if the lists of sources are the same.""" + return set(self) == set(value) + + def __getitem__(self, item: int) -> str: + """Get a 
source by index.""" + return self.sources[item] + + def __hash__(self) -> int: + """Get the hash of the list of sources.""" + return hash(self.sources) + + def __iadd__( + self, other: "str | list[str] | ResourceSourceList" + ) -> "ResourceSourceList": + """Add a list of sources to the list.""" + self.sources = self + other + return self + + def __iter__(self): + """Iterate over the sources.""" + return iter(self.sources) + + def __len__(self) -> int: + """Get the number of sources.""" + return len(self.sources) + + def __repr__(self) -> str: + """Get the reproducable string representation of the sources.""" + return f"ResourceSourceList({(self.sources)})" + + def __reversed__(self) -> list[str]: + """Get the sources reversed.""" + return list(reversed(self.sources)) + + def __setitem__(self, key: int, value: str) -> None: + """Set a source by index.""" + self.sources[key] = value + + def __sorted__(self) -> list[str]: + """Get the sources sorted.""" + return sorted(self.sources, key=str.casefold) + + def __str__(self) -> str: + """Get the string representation of the sources.""" + return str(self.sources) + + +@dataclass +class ResourceIO: + """NodeBlockFunctions that use a resource for IO.""" + + name: str + """The name of the resource.""" + output_from: ResourceSourceList | list[str] = field( + default_factory=ResourceSourceList + ) + """The functions that output the resource.""" + output_to: ResourceSourceList | list[str] = field( + default_factory=ResourceSourceList + ) + """The subdirectory the resource is output to.""" + input_for: ResourceSourceList | list[str] = field( + default_factory=ResourceSourceList + ) + """The functions that use the resource as input.""" + + def __post_init__(self) -> None: + """Handle optionals.""" + if isinstance(self.output_from, list): + self.output_from = ResourceSourceList(self.output_from) + if isinstance(self.output_to, list): + self.output_to = ResourceSourceList(self.output_to) + if isinstance(self.input_for, list): + 
self.input_for = ResourceSourceList(self.input_for) + + def __str__(self) -> str: + """Return string representation for ResourceIO instance.""" + return f"{{{self.name}: {{'input_for': {self.input_for!s}, 'output_from': {self.output_from!s}}}}})" + + def as_dict(self) -> dict[str, list[str]]: + """Return the ResourceIO as a built-in dictionary type.""" + return { + k: v + for k, v in { + "input_for": [str(source) for source in self.input_for], + "output_from": [str(source) for source in self.output_from], + "output_to": [str(source) for source in self.output_to], + }.items() + if v + } + + +def cli_parser() -> Namespace: + """Parse command line argument.""" + parser = ArgumentParser( + description="Inventory resources for C-PAC NodeBlockFunctions.", + formatter_class=ArgumentDefaultsHelpFormatter, + ) + parser.add_argument( + "-o", + "--output", + nargs="?", + help="The output file to write the inventory to.", + type=Path, + default=Path("resource_inventory.yaml"), + ) + return parser.parse_args() + + +def _flatten_io(io: Iterable[Iterable]) -> list[str]: + """Given a list of strings or iterables thereof, flatten the list to all strings.""" + if all(isinstance(resource, str) for resource in io): + return cast(list[str], io) + while not all(isinstance(resource, str) for resource in io): + io = list( + chain.from_iterable( + [ + resource if not isinstance(resource, str) else [resource] + for resource in io + ] + ) + ) + return cast(list[str], io) + + +class MultipleContext(list): + """Subclass of list to store multilpe contexts.""" + + def __init__(self, /, *args, **kwargs) -> None: + """Initialize MultipleContext.""" + super().__init__(*args, **kwargs) + data = self._unique(self) + self.clear() + self.extend(data) + + def __hash__(self) -> int: + """Hash a MultipleContext instance.""" + return hash(str(self)) + + def __str__(self) -> str: + """Return a stringified MultipleContext instance.""" + if len(self) == 1: + return str(self[0]) + return super().__str__() + + 
def append(self, item: Any) -> None: + """Append if not already included.""" + if item not in self: + super().append(item) + + def extend(self, iterable: Iterable) -> None: + """Extend MultipleContext.""" + for item in iterable: + self.append(item) + + @staticmethod + def _unique(iterable: Iterable) -> list: + """Dedupe.""" + try: + seen = set() + return [x for x in iterable if not (x in seen or seen.add(x))] + except TypeError: + seen = set() + return [ + x + for x in (MultipleContext(item) for item in iterable) + if not (x in seen or seen.add(x)) + ] + + +class DirectlySetResources(ast.NodeVisitor): + """Class to track resources set directly, rather than through NodeBlocks.""" + + def __init__(self) -> None: + """Initialize the visitor.""" + super().__init__() + self._context: dict[str, Any] = {} + self.dynamic_resources: dict[str, ResourceSourceList] = { + resource: ResourceSourceList(sources) + for resource, sources in ONE_OFFS.items() + } + self._history: dict[str, list[Any]] = {} + self.resources: dict[str, ResourceSourceList] = {} + + def assign_resource(self, resource: str, value: str | MultipleContext) -> None: + """Assign a value to a resource.""" + if isinstance(resource, ast.AST): + resource = self.parse_ast(resource) + resource = str(resource) + if isinstance(value, MultipleContext): + for subvalue in value: + self.assign_resource(resource, subvalue) + return + target = ( + self.dynamic_resources + if r".*" in value or r".*" in resource + else self.resources + ) + if resource not in target: + target[resource] = ResourceSourceList() + target[resource] += value + + @property + def context(self) -> dict[str, Any]: + """Return the context.""" + return self._context + + @context.setter + def context(self, value: tuple[Iterable, Any]) -> None: + """Set the context.""" + key, _value = value + if not isinstance(key, str): + for subkey in key: + self.context = subkey, _value + else: + self._context[key] = _value + if key not in self._history: + 
self._history[key] = [".*"] + self._history[key].append(_value) + + def lookup_context( + self, variable: str, return_type: Optional[type] = None + ) -> str | MultipleContext: + """Plug in variable.""" + if variable in self.context: + if self.context[variable] == variable or ( + return_type and not isinstance(self.context[variable], return_type) + ): + history = list(self._history[variable]) + while history and history[-1] == variable: + history.pop() + if history: + context = history[-1] + while ( + return_type + and len(history) + and not isinstance(context, return_type) + ): + context = history.pop() + if return_type and not isinstance(context, return_type): + return ".*" + return context + return self.context[variable] + return ".*" + + @staticmethod + def handle_multiple_contexts(contexts: list[str | list[str]]) -> list[str]: + """Parse multiple contexts.""" + if isinstance(contexts, list): + return MultipleContext( + [ + "".join(list(ctx)) + for ctx in product( + *[ + context if isinstance(context, list) else [context] + for context in contexts + ] + ) + ] + ) + return contexts + + def parse_ast(self, node: Any) -> Any: + """Parse AST.""" + if not isinstance(node, ast.AST): + if isinstance(node, str) or not isinstance(node, Iterable): + return str(node) + if isinstance(node, ast.Dict): + return { + self.parse_ast(key): self.parse_ast(value) + for key, value in dict(zip(node.keys, node.values)).items() + } + if isinstance(node, (MultipleContext, list, set, tuple)): + return type(node)(self.parse_ast(subnode) for subnode in node) + if isinstance(node, ast.FormattedValue): + if hasattr(node, "value") and hasattr(node.value, "id"): + return self.lookup_context(getattr(node.value, "id")) + if isinstance(node, ast.JoinedStr): + node_values = [self.parse_ast(value) for value in node.values] + if any(isinstance(value, MultipleContext) for value in node_values): + return self.handle_multiple_contexts(node_values) + return "".join(str(item) for item in node_values) + 
if isinstance(node, ast.Dict): + return { + self.parse_ast(key) + if isinstance(self.parse_ast(key), Hashable) + else ".*": self.parse_ast(value) + for key, value in dict(zip(node.keys, node.values)).items() + } + if not isinstance(node, ast.Call): + for attr in ["values", "elts", "args"]: + if hasattr(node, attr): + iterable = getattr(node, attr) + if isinstance(iterable, Iterable): + return [ + self.parse_ast(subnode) for subnode in getattr(node, attr) + ] + return self.parse_ast(iterable) + for attr in ["value", "id", "arg"]: + if hasattr(node, attr): + return self.parse_ast(getattr(node, attr)) + elif ( + hasattr(node, "func") + and getattr(node.func, "attr", None) in ["items", "keys", "values"] + and getattr(getattr(node.func, "value", None), "id", None) in self.context + ): + context = self.lookup_context(node.func.value.id, return_type=dict) + if isinstance(context, dict): + return MultipleContext(getattr(context, node.func.attr)()) + return r".*" # wildcard for regex matching + + def visit_Assign(self, node: ast.Assign) -> None: + """Visit an assignment.""" + value = self.parse_ast(node.value) + if value == "row" and getattr(node.value, "attr", None): + # hack for template dataframe + value = MultipleContext(getattr(template_dataframe(), node.value.attr)) + for target in node.targets: + resource = self.parse_ast(target) + self.context = resource, value + self.generic_visit(node) + + def visit_Call(self, node: ast.Call) -> None: + """Visit a function call.""" + if isinstance(node.func, ast.Attribute) and node.func.attr == "set_data": + value = self.parse_ast(node.args[5]) + if isinstance(node.args[5], ast.Name): + if isinstance(value, str): + value = self.lookup_context(value) + if hasattr(node.args[0], "value"): + resource: str = getattr(node.args[0], "value") + elif hasattr(node.args[0], "id"): + resource = self.lookup_context(getattr(node.args[0], "id")) + if isinstance(resource, MultipleContext): + if len(resource) == len(value): + for k, v in 
zip(resource, value): + self.assign_resource(k, v) + else: + for resource_context in resource: + self.assign_resource(resource_context, value) + self.generic_visit(node) + return + elif isinstance(node.args[0], ast.JoinedStr): + resource = self.parse_ast(node.args[0]) + else: + self.generic_visit(node) + return + self.assign_resource(resource, value) + self.generic_visit(node) + + def visit_For(self, node: ast.For) -> None: + """Vist for loop.""" + target = self.parse_ast(node.target) + if ( + hasattr(node.iter, "func") + and hasattr(node.iter.func, "value") + and hasattr(node.iter.func.value, "id") + ): + context = self.parse_ast(node.iter) + if not context: + context = r".*" + if isinstance(target, list): + target_len = len(target) + if isinstance(context, dict): + self.context = target[0], MultipleContext(context.keys()) + if isinstance(context, list) and all( + (isinstance(item, tuple) and len(item) == target_len) + for item in context + ): + for index, item in enumerate(target): + self.context = ( + item, + MultipleContext( + subcontext[index] for subcontext in context + ), + ) + elif hasattr(node.iter, "value") and ( + getattr(node.iter.value, "id", None) == "self" + or getattr(node.iter, "attr", False) + ): + self.context = target, ".*" + else: + self.context = target, self.parse_ast(node.iter) + self.generic_visit(node) + + def visit_FunctionDef(self, node: ast.FunctionDef) -> None: + """Visit a function definition.""" + if node.name == "set_data": + # skip the method definition + return + for arg in self.parse_ast(node): + self.context = arg, ".*" + self.generic_visit(node) + + +def find_directly_set_resources( + package_name: str, +) -> tuple[dict[str, ResourceSourceList], dict[str, ResourceSourceList]]: + """Find all resources set explicitly via :pyy:method:`~CPAC.pipeline.engine.ResourcePool.set_data`. + + Parameters + ---------- + package_name + The name of the package to search for resources. 
+ + Returns + ------- + dict + A dictionary containing the name of the resource and the name of the functions that set it. + + dict + A dictionary containing regex strings for special cases + """ + resources: dict[str, ResourceSourceList] = {} + dynamic_resources: dict[str, ResourceSourceList] = {} + for dirpath, _, filenames in os.walk(str(files(package_name))): + for filename in filenames: + if filename.endswith(".py"): + filepath = os.path.join(dirpath, filename) + with open(filepath, "r", encoding="utf-8") as file: + tree = ast.parse(file.read(), filename=filepath) + directly_set = DirectlySetResources() + directly_set.visit(tree) + for resource in directly_set.resources: + if resource not in resources: + resources[resource] = ResourceSourceList() + resources[resource] += directly_set.resources[resource] + for resource in directly_set.dynamic_resources: + if resource not in dynamic_resources: + dynamic_resources[resource] = ResourceSourceList() + dynamic_resources[resource] += directly_set.dynamic_resources[ + resource + ] + return resources, dynamic_resources + + +def resource_inventory(package: str = "CPAC") -> dict[str, ResourceIO]: + """Gather all inputs and outputs for a list of NodeBlockFunctions.""" + resources: dict[str, ResourceIO] = {} + # Node block function inputs and outputs + for nbf in import_nodeblock_functions( + package, + exclude=SKIPS, + ): + nbf_name = f"{nbf.name} ({nbf.__module__}.{nbf.__qualname__})" + if hasattr(nbf, "inputs"): + for nbf_input in _flatten_io(cast(list[Iterable], nbf.inputs)): + if nbf_input: + if nbf_input not in resources: + resources[nbf_input] = ResourceIO( + nbf_input, input_for=[nbf_name] + ) + else: + resources[nbf_input].input_for += nbf_name + if hasattr(nbf, "outputs"): + for nbf_output in _flatten_io(cast(list[Iterable], nbf.outputs)): + if nbf_output: + if nbf_output not in resources: + resources[nbf_output] = ResourceIO( + nbf_output, output_from=[nbf_name] + ) + else: + resources[nbf_output].output_from += 
nbf_name + # Template resources set from pipeline config + templates_from_config_df = template_dataframe() + for _, row in templates_from_config_df.iterrows(): + output_from = f"pipeline configuration: {row.Pipeline_Config_Entry}" + if row.Key not in resources: + resources[row.Key] = ResourceIO(row.Key, output_from=[output_from]) + else: + resources[row.Key].output_from += output_from + # Hard-coded resources + direct, dynamic = find_directly_set_resources(package) + for resource, functions in direct.items(): + if resource not in resources: + resources[resource] = ResourceIO(resource, output_from=functions) + else: + resources[resource].output_from += functions + # Outputs + for _, row in Outputs.reference.iterrows(): + if row.Resource not in resources: + resources[row.Resource] = ResourceIO( + row.Resource, output_to=[row["Sub-Directory"]] + ) + else: + resources[row.Resource].output_to += row["Sub-Directory"] + # Special cases + for dynamic_key, dynamic_value in dynamic.items(): + if dynamic_key != r".*": + dynamic_resource = re.compile(dynamic_key) + for resource in resources.keys(): + if dynamic_resource.search(resource): + resources[resource].output_from += dynamic_value + if "interface" in resources: + # this is a loop in setting up nodeblocks + # https://github.com/FCP-INDI/C-PAC/blob/61ad414447023daf0e401a81c92267b09c64ed94/CPAC/pipeline/engine.py#L1453-L1464 + # it's already handled in the NodeBlock resources + del resources["interface"] + return dict(sorted(resources.items(), key=lambda item: item[0].casefold())) + + +def dump_inventory_to_yaml(inventory: dict[str, ResourceIO]) -> str: + """Dump NodeBlock Interfaces to a YAML string.""" + return yaml.dump( + {key: value.as_dict() for key, value in inventory.items()}, sort_keys=False + ) + + +def where_to_find(resources: list[str] | str) -> str: + """Return a multiline string describing where each listed resource is output from.""" + if isinstance(resources, str): + resources = [resources] + resources = 
_flatten_io(resources) + inventory = resource_inventory("CPAC") + output = "" + for resource in resources: + output += f"'{resource}' can be output from:\n" + if resource in inventory: + for source in inventory[resource].output_from: + output += f" {source}\n" + else: + output += " !! Nowhere !!\n" + output += "\n" + return output.rstrip() + + +def main() -> None: + """Save the NodeBlock inventory to a file.""" + args = cli_parser() + with args.output.open("w") as file: + file.write(dump_inventory_to_yaml(resource_inventory("CPAC"))) + + +if __name__ == "__main__": + main() diff --git a/CPAC/pipeline/test/test_engine.py b/CPAC/pipeline/test/test_engine.py index cf85f50dbe..25b16d9e44 100644 --- a/CPAC/pipeline/test/test_engine.py +++ b/CPAC/pipeline/test/test_engine.py @@ -1,5 +1,27 @@ +# Copyright (C) 2021-2025 C-PAC Developers + +# This file is part of C-PAC. + +# C-PAC is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the +# Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. + +# C-PAC is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +# License for more details. + +# You should have received a copy of the GNU Lesser General Public +# License along with C-PAC. If not, see . 
+"""Unit tests for the C-PAC pipeline engine.""" + +from argparse import Namespace import os +from pathlib import Path +from typing import cast +from _pytest.logging import LogCaptureFixture import pytest from CPAC.pipeline.cpac_pipeline import ( @@ -138,17 +160,129 @@ def test_build_workflow(pipe_config, bids_dir, test_dir): wf.run() +def test_missing_resource( + bids_examples: Path, caplog: LogCaptureFixture, tmp_path: Path +) -> None: + """Test the error message thrown when a resource is missing.""" + from datetime import datetime + + import yaml + + from CPAC.pipeline.cpac_runner import run + from CPAC.utils.bids_utils import sub_list_filter_by_labels + from CPAC.utils.configuration import Preconfiguration, set_subject + from CPAC.utils.configuration.yaml_template import create_yaml_from_template + + st = datetime.now().strftime("%Y-%m-%dT%H-%M-%SZ") + namespace = Namespace( + bids_dir=str(bids_examples / "ds113b"), + output_dir=str(tmp_path / "output"), + analysis_level="test_config", + participant_label="sub-01", + ) + c = Preconfiguration("anat-only") + c["pipeline_setup", "output_directory", "path"] = namespace.output_dir + c["pipeline_setup", "log_directory", "path"] = str(tmp_path / "logs") + c["pipeline_setup", "working_directory", "path"] = str(tmp_path / "work") + c["pipeline_setup", "system_config", "maximum_memory_per_participant"] = 1.0 + c["pipeline_setup", "system_config", "max_cores_per_participant"] = 1 + c["pipeline_setup", "system_config", "num_participants_at_once"] = 1 + c["pipeline_setup", "system_config", "num_ants_threads"] = 1 + c["pipeline_setup", "working_directory", "remove_working_dir"] = True + sub_list = create_cpac_data_config( + namespace.bids_dir, + namespace.participant_label, + None, + True, + only_one_anat=False, + ) + sub_list = sub_list_filter_by_labels(list(sub_list), {"T1w": None, "bold": None}) + for i, sub in enumerate(sub_list): + if isinstance(sub.get("anat"), dict): + for anat_key in sub["anat"]: + if 
isinstance(sub["anat"][anat_key], list) and len( + sub["anat"][anat_key] + ): + sub_list[i]["anat"][anat_key] = sub["anat"][anat_key][0] + if isinstance(sub.get("anat"), list) and len(sub["anat"]): + sub_list[i]["anat"] = sub["anat"][0] + data_config_file = f"cpac_data_config_{st}.yml" + sublogdirs = [set_subject(sub, c)[2] for sub in sub_list] + # write out the data configuration file + data_config_file = os.path.join(sublogdirs[0], data_config_file) + with open(data_config_file, "w", encoding="utf-8") as _f: + noalias_dumper = yaml.dumper.SafeDumper + noalias_dumper.ignore_aliases = lambda self, data: True + yaml.dump(sub_list, _f, default_flow_style=False, Dumper=noalias_dumper) + + # update and write out pipeline config file + pipeline_config_file = os.path.join(sublogdirs[0], f"cpac_pipeline_config_{st}.yml") + with open(pipeline_config_file, "w", encoding="utf-8") as _f: + _f.write(create_yaml_from_template(c)) + minimized_config = f"{pipeline_config_file[:-4]}_min.yml" + with open(minimized_config, "w", encoding="utf-8") as _f: + _f.write(create_yaml_from_template(c, import_from="blank")) + for config_file in (data_config_file, pipeline_config_file, minimized_config): + os.chmod(config_file, 0o444) # Make config files readonly + + if len(sublogdirs) > 1: + # If more than one run is included in the given data config + # file, an identical copy of the data and pipeline config + # will be included in the log directory for each run + for sublogdir in sublogdirs[1:]: + for config_file in ( + data_config_file, + pipeline_config_file, + minimized_config, + ): + try: + os.link(config_file, config_file.replace(sublogdirs[0], sublogdir)) + except FileExistsError: + pass + + run( + data_config_file, + pipeline_config_file, + plugin="Linear", + plugin_args={ + "n_procs": int( + cast( + int | str, + c["pipeline_setup", "system_config", "max_cores_per_participant"], + ) + ), + "memory_gb": int( + cast( + int | str, + c[ + "pipeline_setup", + "system_config", + 
"maximum_memory_per_participant", + ], + ) + ), + "raise_insufficient": c[ + "pipeline_setup", "system_config", "raise_insufficient" + ], + }, + tracking=False, + test_config=namespace.analysis_level == "test_config", + ) + + assert "can be output from" in caplog.text + + # bids_dir = "/Users/steven.giavasis/data/HBN-SI_dataset/rawdata" # test_dir = "/test_dir" # cfg = "/Users/hecheng.jin/GitHub/DevBranch/CPAC/resources/configs/pipeline_config_monkey-ABCD.yml" -cfg = "/Users/hecheng.jin/GitHub/pipeline_config_monkey-ABCDlocal.yml" -bids_dir = "/Users/hecheng.jin/Monkey/monkey_data_oxford/site-ucdavis" -test_dir = "/Users/hecheng.jin/GitHub/Test/T2preproc" # test_ingress_func_raw_data(cfg, bids_dir, test_dir) # test_ingress_anat_raw_data(cfg, bids_dir, test_dir) # test_ingress_pipeconfig_data(cfg, bids_dir, test_dir) # test_build_anat_preproc_stack(cfg, bids_dir, test_dir) if __name__ == "__main__": + cfg = "/Users/hecheng.jin/GitHub/pipeline_config_monkey-ABCDlocal.yml" + bids_dir = "/Users/hecheng.jin/Monkey/monkey_data_oxford/site-ucdavis" + test_dir = "/Users/hecheng.jin/GitHub/Test/T2preproc" test_build_workflow(cfg, bids_dir, test_dir) diff --git a/CPAC/utils/outputs.py b/CPAC/utils/outputs.py index 11b81eb60f..451d893987 100644 --- a/CPAC/utils/outputs.py +++ b/CPAC/utils/outputs.py @@ -1,10 +1,30 @@ +# Copyright (C) 2018-2025 C-PAC Developers + +# This file is part of C-PAC. + +# C-PAC is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the +# Free Software Foundation, either version 3 of the License, or (at your +# option) any later version. + +# C-PAC is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +# License for more details. 
+
+# You should have received a copy of the GNU Lesser General Public
+# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
+"""Specify the resources that C-PAC writes to the output directory."""
+
+from importlib.resources import files
+
 import pandas as pd
-import pkg_resources as p


 class Outputs:
-    # Settle some things about the resource pool reference and the output directory
-    reference_csv = p.resource_filename("CPAC", "resources/cpac_outputs.tsv")
+    """Settle some things about the resource pool reference and the output directory."""
+
+    reference_csv = str(files("CPAC").joinpath("resources/cpac_outputs.tsv"))

     try:
         reference = pd.read_csv(reference_csv, delimiter="\t", keep_default_na=False)
diff --git a/dev/circleci_data/conftest.py b/dev/circleci_data/conftest.py
new file mode 100644
index 0000000000..ba239b2b4f
--- /dev/null
+++ b/dev/circleci_data/conftest.py
@@ -0,0 +1,19 @@
+# Copyright (C) 2025 C-PAC Developers
+
+# This file is part of C-PAC.
+
+# C-PAC is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the
+# Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+
+# C-PAC is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+# License for more details.
+
+# You should have received a copy of the GNU Lesser General Public
+# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
+"""Global fixtures for C-PAC tests.""" + +from CPAC.conftest import * # noqa: F403 diff --git a/dev/circleci_data/test_external_utils.py b/dev/circleci_data/test_external_utils.py index f516b0c903..31f6b243da 100644 --- a/dev/circleci_data/test_external_utils.py +++ b/dev/circleci_data/test_external_utils.py @@ -31,8 +31,6 @@ from CPAC.__main__ import utils as CPAC_main_utils # noqa: E402 -# pylint: disable=wrong-import-position - def _click_backport(command, key): """Switch back to underscores for older versions of click.""" @@ -93,18 +91,11 @@ def test_build_data_config(caplog, cli_runner, multiword_connector): _delete_test_yaml(test_yaml) -def test_new_settings_template(caplog, cli_runner): +def test_new_settings_template(bids_examples: Path, caplog, cli_runner): """Test CLI ``utils new-settings-template``.""" caplog.set_level(INFO) os.chdir(CPAC_DIR) - - example_dir = os.path.join(CPAC_DIR, "bids-examples") - if not os.path.exists(example_dir): - from git import Repo - - Repo.clone_from( - "https://github.com/bids-standard/bids-examples.git", example_dir - ) + assert bids_examples.exists() result = cli_runner.invoke( CPAC_main_utils.commands[ diff --git a/setup.py b/setup.py index 17919395d2..f22a744e2d 100755 --- a/setup.py +++ b/setup.py @@ -1,4 +1,4 @@ -# Copyright (C) 2022-2024 C-PAC Developers +# Copyright (C) 2022-2025 C-PAC Developers # This file is part of C-PAC. @@ -84,7 +84,12 @@ def main(**extra_args): extras_require={"graphviz": ["pygraphviz"]}, configuration=configuration, scripts=glob("scripts/*"), - entry_points={"console_scripts": ["cpac = CPAC.__main__:main"]}, + entry_points={ + "console_scripts": [ + "cpac = CPAC.__main__:main", + "resource_inventory = CPAC.pipeline.resource_inventory:main", + ] + }, package_data={ "CPAC": [ "test_data/*",