diff --git a/.circleci/main.yml b/.circleci/main.yml
index a13300a78d..f936f9230d 100644
--- a/.circleci/main.yml
+++ b/.circleci/main.yml
@@ -48,7 +48,7 @@ commands:
sudo apt-key adv --recv-keys --keyserver keyserver.ubuntu.com 78BD65473CB3BD13
curl -L https://packagecloud.io/circleci/trusty/gpgkey | sudo apt-key add -
sudo apt-get update
- sudo apt-get install git -y
+ sudo apt-get install git openssh-client -y
git config --global user.email "CMI_CPAC_Support@childmind.org"
git config --global user.name "Theodore (machine user) @ CircleCI"
create-docker-test-container:
@@ -64,11 +64,6 @@ commands:
mkdir -p ~/project/test-results
docker pull ${DOCKER_TAG}
docker run -v /etc/passwd:/etc/passwd --user=$(id -u):c-pac -dit -P -e COVERAGE_FILE=<< parameters.coverage-file >> -v /home/circleci/project/test-results:/code/test-results -v /home/circleci:/home/circleci -v /home/circleci/project/CPAC/resources/configs/test_configs:/test_configs -v $PWD:/code -v $PWD/dev/circleci_data:$PWD/dev/circleci_data --workdir=/home/circleci/project --entrypoint=/bin/bash --name docker_test ${DOCKER_TAG}
- get-sample-bids-data:
- steps:
- - run:
- name: Getting Sample BIDS Data
- command: git clone https://github.com/bids-standard/bids-examples.git
get-singularity:
parameters:
version:
@@ -231,7 +226,6 @@ jobs:
- set-up-variant:
variant: "<< parameters.variant >>"
- set-python-version
- - get-sample-bids-data
- run-pytest-docker
- store_test_results:
path: test-results
diff --git a/CHANGELOG.md b/CHANGELOG.md
index be5ec4a432..b67477ffde 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -24,12 +24,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Required positional parameter "wf" in input and output of `ingress_pipeconfig_paths` function, where a node to reorient templates is added to the `wf`.
- Required positional parameter "orientation" to `resolve_resolution`.
- Optional positional argument "cfg" to `create_lesion_preproc`.
+- `resource_inventory` utility to inventory NodeBlock function inputs and outputs.
### Changed
- Moved `pygraphviz` from requirements to `graphviz` optional dependencies group.
- Automatically tag untagged `subject_id` and `unique_id` as `!!str` when loading data config files.
- Made orientation configurable (was hard-coded as "RPI").
+- Resource-not-found errors now include information about where to source those resources.
### Fixed
diff --git a/CPAC/_entrypoints/run.py b/CPAC/_entrypoints/run.py
index 98a30ba094..f84b6cf799 100755
--- a/CPAC/_entrypoints/run.py
+++ b/CPAC/_entrypoints/run.py
@@ -795,7 +795,7 @@ def run_main():
args.data_config_file, args.participant_label, args.aws_input_creds
)
sub_list = sub_list_filter_by_labels(
- sub_list, {"T1w": args.T1w_label, "bold": args.bold_label}
+ list(sub_list), {"T1w": args.T1w_label, "bold": args.bold_label}
)
# C-PAC only handles single anatomical images (for now)
diff --git a/CPAC/anat_preproc/anat_preproc.py b/CPAC/anat_preproc/anat_preproc.py
index a561f8e077..f4bd6f7049 100644
--- a/CPAC/anat_preproc/anat_preproc.py
+++ b/CPAC/anat_preproc/anat_preproc.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright (C) 2012-2023 C-PAC Developers
+# Copyright (C) 2012-2025 C-PAC Developers
# This file is part of C-PAC.
@@ -2572,7 +2572,7 @@ def brain_mask_acpc_niworkflows_ants_T2(wf, cfg, strat_pool, pipe_num, opt=None)
config=["anatomical_preproc", "brain_extraction"],
option_key="using",
option_val="UNet",
- inputs=["desc-preproc_T2w", "T1w-brain-template", "T1w-template", "unet_model"],
+ inputs=["desc-preproc_T2w", "T1w-brain-template", "T1w-template", "unet-model"],
outputs=["space-T2w_desc-brain_mask"],
)
def brain_mask_unet_T2(wf, cfg, strat_pool, pipe_num, opt=None):
@@ -2586,7 +2586,7 @@ def brain_mask_unet_T2(wf, cfg, strat_pool, pipe_num, opt=None):
config=["anatomical_preproc", "brain_extraction"],
option_key="using",
option_val="UNet",
- inputs=["desc-preproc_T2w", "T1w-brain-template", "T1w-template", "unet_model"],
+ inputs=["desc-preproc_T2w", "T1w-brain-template", "T1w-template", "unet-model"],
outputs=["space-T2w_desc-acpcbrain_mask"],
)
def brain_mask_acpc_unet_T2(wf, cfg, strat_pool, pipe_num, opt=None):
diff --git a/CPAC/conftest.py b/CPAC/conftest.py
new file mode 100644
index 0000000000..7b765736ee
--- /dev/null
+++ b/CPAC/conftest.py
@@ -0,0 +1,34 @@
+# Copyright (C) 2025 C-PAC Developers
+
+# This file is part of C-PAC.
+
+# C-PAC is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the
+# Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+
+# C-PAC is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+# License for more details.
+
+# You should have received a copy of the GNU Lesser General Public
+# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
+"""Global fixtures for C-PAC tests."""
+
+from pathlib import Path
+
+from _pytest.tmpdir import TempPathFactory
+from git import Repo
+import pytest
+
+
+@pytest.fixture(scope="session")
+def bids_examples(tmp_path_factory: TempPathFactory) -> Path:
+ """Get the BIDS examples dataset."""
+ example_dir = tmp_path_factory.mktemp("bids-examples")
+ if not example_dir.exists() or not any(example_dir.iterdir()):
+ Repo.clone_from(
+ "https://github.com/bids-standard/bids-examples.git", str(example_dir)
+ )
+ return example_dir
diff --git a/CPAC/pipeline/engine.py b/CPAC/pipeline/engine.py
index be1d0c0c17..878b743bfe 100644
--- a/CPAC/pipeline/engine.py
+++ b/CPAC/pipeline/engine.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2021-2024 C-PAC Developers
+# Copyright (C) 2021-2025 C-PAC Developers
# This file is part of C-PAC.
@@ -17,6 +17,7 @@
import ast
import copy
import hashlib
+from importlib.resources import files
from itertools import chain
import json
import os
@@ -24,6 +25,7 @@
from typing import Optional
import warnings
+import pandas as pd
from nipype import config, logging
from nipype.interfaces import afni
from nipype.interfaces.utility import Rename
@@ -418,10 +420,12 @@ def get(
if report_fetched:
return (None, None)
return None
+ from CPAC.pipeline.resource_inventory import where_to_find
+
msg = (
"\n\n[!] C-PAC says: None of the listed resources are in "
- f"the resource pool:\n\n {resource}\n\nOptions:\n- You "
- "can enable a node block earlier in the pipeline which "
+ f"the resource pool:\n\n {where_to_find(resource)}\n\nOptions:\n"
+ "- You can enable a node block earlier in the pipeline which "
"produces these resources. Check the 'outputs:' field in "
"a node block's documentation.\n- You can directly "
"provide this required data by pulling it from another "
@@ -456,7 +460,9 @@ def copy_resource(self, resource, new_name):
try:
self.rpool[new_name] = self.rpool[resource]
except KeyError:
- msg = f"[!] {resource} not in the resource pool."
+ from CPAC.pipeline.resource_inventory import where_to_find
+
+ msg = f"[!] Not in the resource pool:\n{where_to_find(resource)}"
raise Exception(msg)
def update_resource(self, resource, new_name):
@@ -628,11 +634,13 @@ def get_strats(self, resources, debug=False):
total_pool.append(sub_pool)
if not total_pool:
+ from CPAC.pipeline.resource_inventory import where_to_find
+
raise LookupError(
"\n\n[!] C-PAC says: None of the listed "
"resources in the node block being connected "
"exist in the resource pool.\n\nResources:\n"
- "%s\n\n" % resource_list
+ "%s\n\n" % where_to_find(resource_list)
)
# TODO: right now total_pool is:
@@ -1007,6 +1015,19 @@ def post_process(self, wf, label, connection, json_info, pipe_idx, pipe_x, outs)
for label_con_tpl in post_labels:
label = label_con_tpl[0]
connection = (label_con_tpl[1], label_con_tpl[2])
+ if "desc-" not in label:
+ if "space-template" in label:
+ new_label = label.replace(
+ "space-template", "space-template_desc-zstd"
+ )
+ else:
+ new_label = f"desc-zstd_{label}"
+ else:
+ for tag in label.split("_"):
+ if "desc-" in tag:
+ newtag = f"{tag}-zstd"
+ new_label = label.replace(tag, newtag)
+ break
if label in Outputs.to_zstd:
zstd = z_score_standardize(f"{label}_zstd_{pipe_x}", input_type)
@@ -1015,20 +1036,6 @@ def post_process(self, wf, label, connection, json_info, pipe_idx, pipe_x, outs)
node, out = self.get_data(mask, pipe_idx=mask_idx)
wf.connect(node, out, zstd, "inputspec.mask")
- if "desc-" not in label:
- if "space-template" in label:
- new_label = label.replace(
- "space-template", "space-template_desc-zstd"
- )
- else:
- new_label = f"desc-zstd_{label}"
- else:
- for tag in label.split("_"):
- if "desc-" in tag:
- newtag = f"{tag}-zstd"
- new_label = label.replace(tag, newtag)
- break
-
post_labels.append((new_label, zstd, "outputspec.out_file"))
self.set_data(
@@ -2408,15 +2415,17 @@ def strip_template(data_label, dir_path, filename):
return data_label, json
+def template_dataframe() -> pd.DataFrame:
+ """Return the template dataframe."""
+ template_csv = files("CPAC").joinpath("resources/cpac_templates.csv")
+ return pd.read_csv(str(template_csv), keep_default_na=False)
+
+
def ingress_pipeconfig_paths(wf, cfg, rpool, unique_id, creds_path=None):
# ingress config file paths
# TODO: may want to change the resource keys for each to include one level up in the YAML as well
- import pandas as pd
- import pkg_resources as p
-
- template_csv = p.resource_filename("CPAC", "resources/cpac_templates.csv")
- template_df = pd.read_csv(template_csv, keep_default_na=False)
+ template_df = template_dataframe()
desired_orientation = cfg.pipeline_setup["desired_orientation"]
for row in template_df.itertuples():
diff --git a/CPAC/pipeline/resource_inventory.py b/CPAC/pipeline/resource_inventory.py
new file mode 100755
index 0000000000..a181ea6567
--- /dev/null
+++ b/CPAC/pipeline/resource_inventory.py
@@ -0,0 +1,670 @@
+#!/usr/bin/env python
+# Copyright (C) 2025 C-PAC Developers
+
+# This file is part of C-PAC.
+
+# C-PAC is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the
+# Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+
+# C-PAC is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+# License for more details.
+
+# You should have received a copy of the GNU Lesser General Public
+# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
+"""Inspect inputs and outputs for NodeBlockFunctions."""
+
+from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
+import ast
+from collections.abc import Hashable
+from dataclasses import dataclass, field
+import importlib
+from importlib.resources import files
+import inspect
+from itertools import chain, product
+import os
+from pathlib import Path
+import re
+from typing import Any, cast, Iterable, Optional
+from unittest.mock import patch
+
+from traits.trait_errors import TraitError
+import yaml
+
+from CPAC.pipeline.engine import template_dataframe
+from CPAC.pipeline.nodeblock import NodeBlockFunction
+from CPAC.pipeline.schema import latest_schema
+from CPAC.utils.monitoring import UTLOGGER
+from CPAC.utils.outputs import Outputs
+
+ONE_OFFS: dict[str, list[str]] = {
+ r".*desc-preproc_bold": ["func_ingress"],
+ r".*-sm.*": [
+ f"spatial_smoothing_{smooth_opt}"
+ for smooth_opt in latest_schema.schema["post_processing"]["spatial_smoothing"][
+ "smoothing_method"
+ ][0].container
+ ],
+ r".*-zstd.*": [f"{fisher}zscore_standardize" for fisher in ["", "fisher_"]],
+}
+"""A few out-of-nodeblock generated resources.
+
+Easier to note these manually than to code up the AST rules."""
+
+SKIPS: list[str] = [
+ "CPAC.unet.__init__",
+ "CPAC.unet._torch",
+]
+"""No nodeblock functions in these modules that dynamically install `torch`."""
+
+
+def import_nodeblock_functions(
+ package_name: str, exclude: Optional[list[str]] = None
+) -> list[NodeBlockFunction]:
+ """
+ Import all functions with the @nodeblock decorator from all modules and submodules in a package.
+
+ Parameters
+ ----------
+ package_name
+ The name of the package to import from.
+
+ exclude
+ A list of module names to exclude from the import.
+ """
+ if exclude is None:
+ exclude = []
+ functions: list[NodeBlockFunction] = []
+ package = importlib.import_module(package_name)
+ package_path = package.__path__[0] # Path to the package directory
+
+ for root, _, package_files in os.walk(package_path):
+ for file in package_files:
+ if file.endswith(".py") and file != "__init__.py":
+ # Get the module path
+ rel_path = os.path.relpath(os.path.join(root, file), package_path)
+ module_name = f"{package_name}.{rel_path[:-3].replace(os.sep, '.')}"
+ if module_name in exclude:
+ continue
+
+ # Import the module
+ try:
+ with patch.dict(
+ "sys.modules", {exclusion: None for exclusion in exclude}
+ ):
+ module = importlib.import_module(module_name)
+ except (ImportError, TraitError, ValueError) as e:
+ UTLOGGER.debug(f"Failed to import {module_name}: {e}")
+ continue
+ # Extract nodeblock-decorated functions from the module
+ for _name, obj in inspect.getmembers(
+ module, predicate=lambda obj: isinstance(obj, NodeBlockFunction)
+ ):
+ functions.append(obj)
+
+ return functions
+
+
+@dataclass
+class ResourceSourceList:
+ """A list of resource sources without duplicates."""
+
+ sources: list[str] = field(default_factory=list)
+
+ def __add__(self, other: "str | list[str] | ResourceSourceList") -> list[str]:
+ """Add a list of sources to the list."""
+ if isinstance(other, str):
+ if not other or other == "created_before_this_test":
+ # dummy node in a testing function, no need to include in inventory
+ return list(self)
+ other = [other]
+ new_set = {*self.sources, *other}
+ return sorted(new_set, key=str.casefold)
+
+ def __contains__(self, item: str) -> bool:
+ """Check if a source is in the list."""
+ return item in self.sources
+
+ def __delitem__(self, key: int) -> None:
+ """Delete a source by index."""
+ del self.sources[key]
+
+ def __eq__(self, value: Any) -> bool:
+ """Check if the lists of sources are the same."""
+ return set(self) == set(value)
+
+ def __getitem__(self, item: int) -> str:
+ """Get a source by index."""
+ return self.sources[item]
+
+ def __hash__(self) -> int:
+ """Get the hash of the list of sources."""
+ return hash(self.sources)
+
+ def __iadd__(
+ self, other: "str | list[str] | ResourceSourceList"
+ ) -> "ResourceSourceList":
+ """Add a list of sources to the list."""
+ self.sources = self + other
+ return self
+
+ def __iter__(self):
+ """Iterate over the sources."""
+ return iter(self.sources)
+
+ def __len__(self) -> int:
+ """Get the number of sources."""
+ return len(self.sources)
+
+ def __repr__(self) -> str:
+        """Get the reproducible string representation of the sources."""
+ return f"ResourceSourceList({(self.sources)})"
+
+ def __reversed__(self) -> list[str]:
+ """Get the sources reversed."""
+ return list(reversed(self.sources))
+
+ def __setitem__(self, key: int, value: str) -> None:
+ """Set a source by index."""
+ self.sources[key] = value
+
+ def __sorted__(self) -> list[str]:
+ """Get the sources sorted."""
+ return sorted(self.sources, key=str.casefold)
+
+ def __str__(self) -> str:
+ """Get the string representation of the sources."""
+ return str(self.sources)
+
+
+@dataclass
+class ResourceIO:
+ """NodeBlockFunctions that use a resource for IO."""
+
+ name: str
+ """The name of the resource."""
+ output_from: ResourceSourceList | list[str] = field(
+ default_factory=ResourceSourceList
+ )
+ """The functions that output the resource."""
+ output_to: ResourceSourceList | list[str] = field(
+ default_factory=ResourceSourceList
+ )
+ """The subdirectory the resource is output to."""
+ input_for: ResourceSourceList | list[str] = field(
+ default_factory=ResourceSourceList
+ )
+ """The functions that use the resource as input."""
+
+ def __post_init__(self) -> None:
+ """Handle optionals."""
+ if isinstance(self.output_from, list):
+ self.output_from = ResourceSourceList(self.output_from)
+ if isinstance(self.output_to, list):
+ self.output_to = ResourceSourceList(self.output_to)
+ if isinstance(self.input_for, list):
+ self.input_for = ResourceSourceList(self.input_for)
+
+ def __str__(self) -> str:
+ """Return string representation for ResourceIO instance."""
+ return f"{{{self.name}: {{'input_for': {self.input_for!s}, 'output_from': {self.output_from!s}}}}})"
+
+ def as_dict(self) -> dict[str, list[str]]:
+ """Return the ResourceIO as a built-in dictionary type."""
+ return {
+ k: v
+ for k, v in {
+ "input_for": [str(source) for source in self.input_for],
+ "output_from": [str(source) for source in self.output_from],
+ "output_to": [str(source) for source in self.output_to],
+ }.items()
+ if v
+ }
+
+
+def cli_parser() -> Namespace:
+ """Parse command line argument."""
+ parser = ArgumentParser(
+ description="Inventory resources for C-PAC NodeBlockFunctions.",
+ formatter_class=ArgumentDefaultsHelpFormatter,
+ )
+ parser.add_argument(
+ "-o",
+ "--output",
+ nargs="?",
+ help="The output file to write the inventory to.",
+ type=Path,
+ default=Path("resource_inventory.yaml"),
+ )
+ return parser.parse_args()
+
+
+def _flatten_io(io: Iterable[Iterable]) -> list[str]:
+ """Given a list of strings or iterables thereof, flatten the list to all strings."""
+ if all(isinstance(resource, str) for resource in io):
+ return cast(list[str], io)
+ while not all(isinstance(resource, str) for resource in io):
+ io = list(
+ chain.from_iterable(
+ [
+ resource if not isinstance(resource, str) else [resource]
+ for resource in io
+ ]
+ )
+ )
+ return cast(list[str], io)
+
+
+class MultipleContext(list):
+    """Subclass of list to store multiple contexts."""
+
+ def __init__(self, /, *args, **kwargs) -> None:
+ """Initialize MultipleContext."""
+ super().__init__(*args, **kwargs)
+ data = self._unique(self)
+ self.clear()
+ self.extend(data)
+
+ def __hash__(self) -> int:
+ """Hash a MultipleContext instance."""
+ return hash(str(self))
+
+ def __str__(self) -> str:
+ """Return a stringified MultipleContext instance."""
+ if len(self) == 1:
+ return str(self[0])
+ return super().__str__()
+
+ def append(self, item: Any) -> None:
+ """Append if not already included."""
+ if item not in self:
+ super().append(item)
+
+ def extend(self, iterable: Iterable) -> None:
+ """Extend MultipleContext."""
+ for item in iterable:
+ self.append(item)
+
+ @staticmethod
+ def _unique(iterable: Iterable) -> list:
+ """Dedupe."""
+ try:
+ seen = set()
+ return [x for x in iterable if not (x in seen or seen.add(x))]
+ except TypeError:
+ seen = set()
+ return [
+ x
+ for x in (MultipleContext(item) for item in iterable)
+ if not (x in seen or seen.add(x))
+ ]
+
+
+class DirectlySetResources(ast.NodeVisitor):
+ """Class to track resources set directly, rather than through NodeBlocks."""
+
+ def __init__(self) -> None:
+ """Initialize the visitor."""
+ super().__init__()
+ self._context: dict[str, Any] = {}
+ self.dynamic_resources: dict[str, ResourceSourceList] = {
+ resource: ResourceSourceList(sources)
+ for resource, sources in ONE_OFFS.items()
+ }
+ self._history: dict[str, list[Any]] = {}
+ self.resources: dict[str, ResourceSourceList] = {}
+
+ def assign_resource(self, resource: str, value: str | MultipleContext) -> None:
+ """Assign a value to a resource."""
+ if isinstance(resource, ast.AST):
+ resource = self.parse_ast(resource)
+ resource = str(resource)
+ if isinstance(value, MultipleContext):
+ for subvalue in value:
+ self.assign_resource(resource, subvalue)
+ return
+ target = (
+ self.dynamic_resources
+ if r".*" in value or r".*" in resource
+ else self.resources
+ )
+ if resource not in target:
+ target[resource] = ResourceSourceList()
+ target[resource] += value
+
+ @property
+ def context(self) -> dict[str, Any]:
+ """Return the context."""
+ return self._context
+
+ @context.setter
+ def context(self, value: tuple[Iterable, Any]) -> None:
+ """Set the context."""
+ key, _value = value
+ if not isinstance(key, str):
+ for subkey in key:
+ self.context = subkey, _value
+ else:
+ self._context[key] = _value
+ if key not in self._history:
+ self._history[key] = [".*"]
+ self._history[key].append(_value)
+
+ def lookup_context(
+ self, variable: str, return_type: Optional[type] = None
+ ) -> str | MultipleContext:
+ """Plug in variable."""
+ if variable in self.context:
+ if self.context[variable] == variable or (
+ return_type and not isinstance(self.context[variable], return_type)
+ ):
+ history = list(self._history[variable])
+ while history and history[-1] == variable:
+ history.pop()
+ if history:
+ context = history[-1]
+ while (
+ return_type
+ and len(history)
+ and not isinstance(context, return_type)
+ ):
+ context = history.pop()
+ if return_type and not isinstance(context, return_type):
+ return ".*"
+ return context
+ return self.context[variable]
+ return ".*"
+
+ @staticmethod
+ def handle_multiple_contexts(contexts: list[str | list[str]]) -> list[str]:
+ """Parse multiple contexts."""
+ if isinstance(contexts, list):
+ return MultipleContext(
+ [
+ "".join(list(ctx))
+ for ctx in product(
+ *[
+ context if isinstance(context, list) else [context]
+ for context in contexts
+ ]
+ )
+ ]
+ )
+ return contexts
+
+ def parse_ast(self, node: Any) -> Any:
+ """Parse AST."""
+ if not isinstance(node, ast.AST):
+ if isinstance(node, str) or not isinstance(node, Iterable):
+ return str(node)
+ if isinstance(node, ast.Dict):
+ return {
+ self.parse_ast(key): self.parse_ast(value)
+ for key, value in dict(zip(node.keys, node.values)).items()
+ }
+ if isinstance(node, (MultipleContext, list, set, tuple)):
+ return type(node)(self.parse_ast(subnode) for subnode in node)
+ if isinstance(node, ast.FormattedValue):
+ if hasattr(node, "value") and hasattr(node.value, "id"):
+ return self.lookup_context(getattr(node.value, "id"))
+ if isinstance(node, ast.JoinedStr):
+ node_values = [self.parse_ast(value) for value in node.values]
+ if any(isinstance(value, MultipleContext) for value in node_values):
+ return self.handle_multiple_contexts(node_values)
+ return "".join(str(item) for item in node_values)
+ if isinstance(node, ast.Dict):
+ return {
+ self.parse_ast(key)
+ if isinstance(self.parse_ast(key), Hashable)
+ else ".*": self.parse_ast(value)
+ for key, value in dict(zip(node.keys, node.values)).items()
+ }
+ if not isinstance(node, ast.Call):
+ for attr in ["values", "elts", "args"]:
+ if hasattr(node, attr):
+ iterable = getattr(node, attr)
+ if isinstance(iterable, Iterable):
+ return [
+ self.parse_ast(subnode) for subnode in getattr(node, attr)
+ ]
+ return self.parse_ast(iterable)
+ for attr in ["value", "id", "arg"]:
+ if hasattr(node, attr):
+ return self.parse_ast(getattr(node, attr))
+ elif (
+ hasattr(node, "func")
+ and getattr(node.func, "attr", None) in ["items", "keys", "values"]
+ and getattr(getattr(node.func, "value", None), "id", None) in self.context
+ ):
+ context = self.lookup_context(node.func.value.id, return_type=dict)
+ if isinstance(context, dict):
+ return MultipleContext(getattr(context, node.func.attr)())
+ return r".*" # wildcard for regex matching
+
+ def visit_Assign(self, node: ast.Assign) -> None:
+ """Visit an assignment."""
+ value = self.parse_ast(node.value)
+ if value == "row" and getattr(node.value, "attr", None):
+ # hack for template dataframe
+ value = MultipleContext(getattr(template_dataframe(), node.value.attr))
+ for target in node.targets:
+ resource = self.parse_ast(target)
+ self.context = resource, value
+ self.generic_visit(node)
+
+ def visit_Call(self, node: ast.Call) -> None:
+ """Visit a function call."""
+ if isinstance(node.func, ast.Attribute) and node.func.attr == "set_data":
+ value = self.parse_ast(node.args[5])
+ if isinstance(node.args[5], ast.Name):
+ if isinstance(value, str):
+ value = self.lookup_context(value)
+ if hasattr(node.args[0], "value"):
+ resource: str = getattr(node.args[0], "value")
+ elif hasattr(node.args[0], "id"):
+ resource = self.lookup_context(getattr(node.args[0], "id"))
+ if isinstance(resource, MultipleContext):
+ if len(resource) == len(value):
+ for k, v in zip(resource, value):
+ self.assign_resource(k, v)
+ else:
+ for resource_context in resource:
+ self.assign_resource(resource_context, value)
+ self.generic_visit(node)
+ return
+ elif isinstance(node.args[0], ast.JoinedStr):
+ resource = self.parse_ast(node.args[0])
+ else:
+ self.generic_visit(node)
+ return
+ self.assign_resource(resource, value)
+ self.generic_visit(node)
+
+ def visit_For(self, node: ast.For) -> None:
+        """Visit for loop."""
+ target = self.parse_ast(node.target)
+ if (
+ hasattr(node.iter, "func")
+ and hasattr(node.iter.func, "value")
+ and hasattr(node.iter.func.value, "id")
+ ):
+ context = self.parse_ast(node.iter)
+ if not context:
+ context = r".*"
+ if isinstance(target, list):
+ target_len = len(target)
+ if isinstance(context, dict):
+ self.context = target[0], MultipleContext(context.keys())
+ if isinstance(context, list) and all(
+ (isinstance(item, tuple) and len(item) == target_len)
+ for item in context
+ ):
+ for index, item in enumerate(target):
+ self.context = (
+ item,
+ MultipleContext(
+ subcontext[index] for subcontext in context
+ ),
+ )
+ elif hasattr(node.iter, "value") and (
+ getattr(node.iter.value, "id", None) == "self"
+ or getattr(node.iter, "attr", False)
+ ):
+ self.context = target, ".*"
+ else:
+ self.context = target, self.parse_ast(node.iter)
+ self.generic_visit(node)
+
+ def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
+ """Visit a function definition."""
+ if node.name == "set_data":
+ # skip the method definition
+ return
+ for arg in self.parse_ast(node):
+ self.context = arg, ".*"
+ self.generic_visit(node)
+
+
+def find_directly_set_resources(
+ package_name: str,
+) -> tuple[dict[str, ResourceSourceList], dict[str, ResourceSourceList]]:
+ """Find all resources set explicitly via :pyy:method:`~CPAC.pipeline.engine.ResourcePool.set_data`.
+
+ Parameters
+ ----------
+ package_name
+ The name of the package to search for resources.
+
+ Returns
+ -------
+ dict
+ A dictionary containing the name of the resource and the name of the functions that set it.
+
+ dict
+ A dictionary containing regex strings for special cases
+ """
+ resources: dict[str, ResourceSourceList] = {}
+ dynamic_resources: dict[str, ResourceSourceList] = {}
+ for dirpath, _, filenames in os.walk(str(files(package_name))):
+ for filename in filenames:
+ if filename.endswith(".py"):
+ filepath = os.path.join(dirpath, filename)
+ with open(filepath, "r", encoding="utf-8") as file:
+ tree = ast.parse(file.read(), filename=filepath)
+ directly_set = DirectlySetResources()
+ directly_set.visit(tree)
+ for resource in directly_set.resources:
+ if resource not in resources:
+ resources[resource] = ResourceSourceList()
+ resources[resource] += directly_set.resources[resource]
+ for resource in directly_set.dynamic_resources:
+ if resource not in dynamic_resources:
+ dynamic_resources[resource] = ResourceSourceList()
+ dynamic_resources[resource] += directly_set.dynamic_resources[
+ resource
+ ]
+ return resources, dynamic_resources
+
+
+def resource_inventory(package: str = "CPAC") -> dict[str, ResourceIO]:
+ """Gather all inputs and outputs for a list of NodeBlockFunctions."""
+ resources: dict[str, ResourceIO] = {}
+ # Node block function inputs and outputs
+ for nbf in import_nodeblock_functions(
+ package,
+ exclude=SKIPS,
+ ):
+ nbf_name = f"{nbf.name} ({nbf.__module__}.{nbf.__qualname__})"
+ if hasattr(nbf, "inputs"):
+ for nbf_input in _flatten_io(cast(list[Iterable], nbf.inputs)):
+ if nbf_input:
+ if nbf_input not in resources:
+ resources[nbf_input] = ResourceIO(
+ nbf_input, input_for=[nbf_name]
+ )
+ else:
+ resources[nbf_input].input_for += nbf_name
+ if hasattr(nbf, "outputs"):
+ for nbf_output in _flatten_io(cast(list[Iterable], nbf.outputs)):
+ if nbf_output:
+ if nbf_output not in resources:
+ resources[nbf_output] = ResourceIO(
+ nbf_output, output_from=[nbf_name]
+ )
+ else:
+ resources[nbf_output].output_from += nbf_name
+ # Template resources set from pipeline config
+ templates_from_config_df = template_dataframe()
+ for _, row in templates_from_config_df.iterrows():
+ output_from = f"pipeline configuration: {row.Pipeline_Config_Entry}"
+ if row.Key not in resources:
+ resources[row.Key] = ResourceIO(row.Key, output_from=[output_from])
+ else:
+ resources[row.Key].output_from += output_from
+ # Hard-coded resources
+ direct, dynamic = find_directly_set_resources(package)
+ for resource, functions in direct.items():
+ if resource not in resources:
+ resources[resource] = ResourceIO(resource, output_from=functions)
+ else:
+ resources[resource].output_from += functions
+ # Outputs
+ for _, row in Outputs.reference.iterrows():
+ if row.Resource not in resources:
+ resources[row.Resource] = ResourceIO(
+ row.Resource, output_to=[row["Sub-Directory"]]
+ )
+ else:
+ resources[row.Resource].output_to += row["Sub-Directory"]
+ # Special cases
+ for dynamic_key, dynamic_value in dynamic.items():
+ if dynamic_key != r".*":
+ dynamic_resource = re.compile(dynamic_key)
+ for resource in resources.keys():
+ if dynamic_resource.search(resource):
+ resources[resource].output_from += dynamic_value
+ if "interface" in resources:
+ # this is a loop in setting up nodeblocks
+ # https://github.com/FCP-INDI/C-PAC/blob/61ad414447023daf0e401a81c92267b09c64ed94/CPAC/pipeline/engine.py#L1453-L1464
+ # it's already handled in the NodeBlock resources
+ del resources["interface"]
+ return dict(sorted(resources.items(), key=lambda item: item[0].casefold()))
+
+
+def dump_inventory_to_yaml(inventory: dict[str, ResourceIO]) -> str:
+ """Dump NodeBlock Interfaces to a YAML string."""
+ return yaml.dump(
+ {key: value.as_dict() for key, value in inventory.items()}, sort_keys=False
+ )
+
+
+def where_to_find(resources: list[str] | str) -> str:
+ """Return a multiline string describing where each listed resource is output from."""
+ if isinstance(resources, str):
+ resources = [resources]
+ resources = _flatten_io(resources)
+ inventory = resource_inventory("CPAC")
+ output = ""
+ for resource in resources:
+ output += f"'{resource}' can be output from:\n"
+ if resource in inventory:
+ for source in inventory[resource].output_from:
+ output += f" {source}\n"
+ else:
+ output += " !! Nowhere !!\n"
+ output += "\n"
+ return output.rstrip()
+
+
+def main() -> None:
+ """Save the NodeBlock inventory to a file."""
+ args = cli_parser()
+ with args.output.open("w") as file:
+ file.write(dump_inventory_to_yaml(resource_inventory("CPAC")))
+
+
+if __name__ == "__main__":
+ main()
diff --git a/CPAC/pipeline/test/test_engine.py b/CPAC/pipeline/test/test_engine.py
index cf85f50dbe..25b16d9e44 100644
--- a/CPAC/pipeline/test/test_engine.py
+++ b/CPAC/pipeline/test/test_engine.py
@@ -1,5 +1,27 @@
+# Copyright (C) 2021-2025 C-PAC Developers
+
+# This file is part of C-PAC.
+
+# C-PAC is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the
+# Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+
+# C-PAC is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+# License for more details.
+
+# You should have received a copy of the GNU Lesser General Public
+# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
+"""Unit tests for the C-PAC pipeline engine."""
+
+from argparse import Namespace
import os
+from pathlib import Path
+from typing import cast
+from _pytest.logging import LogCaptureFixture
import pytest
from CPAC.pipeline.cpac_pipeline import (
@@ -138,17 +160,129 @@ def test_build_workflow(pipe_config, bids_dir, test_dir):
wf.run()
+def test_missing_resource(
+ bids_examples: Path, caplog: LogCaptureFixture, tmp_path: Path
+) -> None:
+ """Test the error message thrown when a resource is missing."""
+ from datetime import datetime
+
+ import yaml
+
+ from CPAC.pipeline.cpac_runner import run
+ from CPAC.utils.bids_utils import sub_list_filter_by_labels
+ from CPAC.utils.configuration import Preconfiguration, set_subject
+ from CPAC.utils.configuration.yaml_template import create_yaml_from_template
+
+ st = datetime.now().strftime("%Y-%m-%dT%H-%M-%SZ")
+ namespace = Namespace(
+ bids_dir=str(bids_examples / "ds113b"),
+ output_dir=str(tmp_path / "output"),
+ analysis_level="test_config",
+ participant_label="sub-01",
+ )
+ c = Preconfiguration("anat-only")
+ c["pipeline_setup", "output_directory", "path"] = namespace.output_dir
+ c["pipeline_setup", "log_directory", "path"] = str(tmp_path / "logs")
+ c["pipeline_setup", "working_directory", "path"] = str(tmp_path / "work")
+ c["pipeline_setup", "system_config", "maximum_memory_per_participant"] = 1.0
+ c["pipeline_setup", "system_config", "max_cores_per_participant"] = 1
+ c["pipeline_setup", "system_config", "num_participants_at_once"] = 1
+ c["pipeline_setup", "system_config", "num_ants_threads"] = 1
+ c["pipeline_setup", "working_directory", "remove_working_dir"] = True
+ sub_list = create_cpac_data_config(
+ namespace.bids_dir,
+ namespace.participant_label,
+ None,
+ True,
+ only_one_anat=False,
+ )
+ sub_list = sub_list_filter_by_labels(list(sub_list), {"T1w": None, "bold": None})
+ for i, sub in enumerate(sub_list):
+ if isinstance(sub.get("anat"), dict):
+ for anat_key in sub["anat"]:
+ if isinstance(sub["anat"][anat_key], list) and len(
+ sub["anat"][anat_key]
+ ):
+ sub_list[i]["anat"][anat_key] = sub["anat"][anat_key][0]
+ if isinstance(sub.get("anat"), list) and len(sub["anat"]):
+ sub_list[i]["anat"] = sub["anat"][0]
+ data_config_file = f"cpac_data_config_{st}.yml"
+ sublogdirs = [set_subject(sub, c)[2] for sub in sub_list]
+ # write out the data configuration file
+ data_config_file = os.path.join(sublogdirs[0], data_config_file)
+ with open(data_config_file, "w", encoding="utf-8") as _f:
+ noalias_dumper = yaml.dumper.SafeDumper
+ noalias_dumper.ignore_aliases = lambda self, data: True
+ yaml.dump(sub_list, _f, default_flow_style=False, Dumper=noalias_dumper)
+
+ # update and write out pipeline config file
+ pipeline_config_file = os.path.join(sublogdirs[0], f"cpac_pipeline_config_{st}.yml")
+ with open(pipeline_config_file, "w", encoding="utf-8") as _f:
+ _f.write(create_yaml_from_template(c))
+ minimized_config = f"{pipeline_config_file[:-4]}_min.yml"
+ with open(minimized_config, "w", encoding="utf-8") as _f:
+ _f.write(create_yaml_from_template(c, import_from="blank"))
+ for config_file in (data_config_file, pipeline_config_file, minimized_config):
+ os.chmod(config_file, 0o444) # Make config files readonly
+
+ if len(sublogdirs) > 1:
+ # If more than one run is included in the given data config
+ # file, an identical copy of the data and pipeline config
+ # will be included in the log directory for each run
+ for sublogdir in sublogdirs[1:]:
+ for config_file in (
+ data_config_file,
+ pipeline_config_file,
+ minimized_config,
+ ):
+ try:
+ os.link(config_file, config_file.replace(sublogdirs[0], sublogdir))
+ except FileExistsError:
+ pass
+
+ run(
+ data_config_file,
+ pipeline_config_file,
+ plugin="Linear",
+ plugin_args={
+ "n_procs": int(
+ cast(
+ int | str,
+ c["pipeline_setup", "system_config", "max_cores_per_participant"],
+ )
+ ),
+ "memory_gb": int(
+ cast(
+ int | str,
+ c[
+ "pipeline_setup",
+ "system_config",
+ "maximum_memory_per_participant",
+ ],
+ )
+ ),
+ "raise_insufficient": c[
+ "pipeline_setup", "system_config", "raise_insufficient"
+ ],
+ },
+ tracking=False,
+ test_config=namespace.analysis_level == "test_config",
+ )
+
+ assert "can be output from" in caplog.text
+
+
# bids_dir = "/Users/steven.giavasis/data/HBN-SI_dataset/rawdata"
# test_dir = "/test_dir"
# cfg = "/Users/hecheng.jin/GitHub/DevBranch/CPAC/resources/configs/pipeline_config_monkey-ABCD.yml"
-cfg = "/Users/hecheng.jin/GitHub/pipeline_config_monkey-ABCDlocal.yml"
-bids_dir = "/Users/hecheng.jin/Monkey/monkey_data_oxford/site-ucdavis"
-test_dir = "/Users/hecheng.jin/GitHub/Test/T2preproc"
# test_ingress_func_raw_data(cfg, bids_dir, test_dir)
# test_ingress_anat_raw_data(cfg, bids_dir, test_dir)
# test_ingress_pipeconfig_data(cfg, bids_dir, test_dir)
# test_build_anat_preproc_stack(cfg, bids_dir, test_dir)
if __name__ == "__main__":
+ cfg = "/Users/hecheng.jin/GitHub/pipeline_config_monkey-ABCDlocal.yml"
+ bids_dir = "/Users/hecheng.jin/Monkey/monkey_data_oxford/site-ucdavis"
+ test_dir = "/Users/hecheng.jin/GitHub/Test/T2preproc"
test_build_workflow(cfg, bids_dir, test_dir)
diff --git a/CPAC/utils/outputs.py b/CPAC/utils/outputs.py
index 11b81eb60f..451d893987 100644
--- a/CPAC/utils/outputs.py
+++ b/CPAC/utils/outputs.py
@@ -1,10 +1,30 @@
+# Copyright (C) 2018-2025 C-PAC Developers
+
+# This file is part of C-PAC.
+
+# C-PAC is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the
+# Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+
+# C-PAC is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+# License for more details.
+
+# You should have received a copy of the GNU Lesser General Public
+# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
+"""Specify the resources that C-PAC writes to the output directory."""
+
+from importlib.resources import files
+
import pandas as pd
-import pkg_resources as p
class Outputs:
- # Settle some things about the resource pool reference and the output directory
- reference_csv = p.resource_filename("CPAC", "resources/cpac_outputs.tsv")
+ """Settle some things about the resource pool reference and the output directory."""
+
+ reference_csv = str(files("CPAC").joinpath("resources/cpac_outputs.tsv"))
try:
reference = pd.read_csv(reference_csv, delimiter="\t", keep_default_na=False)
diff --git a/dev/circleci_data/conftest.py b/dev/circleci_data/conftest.py
new file mode 100644
index 0000000000..ba239b2b4f
--- /dev/null
+++ b/dev/circleci_data/conftest.py
@@ -0,0 +1,19 @@
+# Copyright (C) 2025 C-PAC Developers
+
+# This file is part of C-PAC.
+
+# C-PAC is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the
+# Free Software Foundation, either version 3 of the License, or (at your
+# option) any later version.
+
+# C-PAC is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+# License for more details.
+
+# You should have received a copy of the GNU Lesser General Public
+# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
+"""Global fixtures for C-PAC tests."""
+
+from CPAC.conftest import * # noqa: F403
diff --git a/dev/circleci_data/test_external_utils.py b/dev/circleci_data/test_external_utils.py
index f516b0c903..31f6b243da 100644
--- a/dev/circleci_data/test_external_utils.py
+++ b/dev/circleci_data/test_external_utils.py
@@ -31,8 +31,6 @@
from CPAC.__main__ import utils as CPAC_main_utils # noqa: E402
-# pylint: disable=wrong-import-position
-
def _click_backport(command, key):
"""Switch back to underscores for older versions of click."""
@@ -93,18 +91,11 @@ def test_build_data_config(caplog, cli_runner, multiword_connector):
_delete_test_yaml(test_yaml)
-def test_new_settings_template(caplog, cli_runner):
+def test_new_settings_template(bids_examples: Path, caplog, cli_runner):
"""Test CLI ``utils new-settings-template``."""
caplog.set_level(INFO)
os.chdir(CPAC_DIR)
-
- example_dir = os.path.join(CPAC_DIR, "bids-examples")
- if not os.path.exists(example_dir):
- from git import Repo
-
- Repo.clone_from(
- "https://github.com/bids-standard/bids-examples.git", example_dir
- )
+ assert bids_examples.exists()
result = cli_runner.invoke(
CPAC_main_utils.commands[
diff --git a/setup.py b/setup.py
index 17919395d2..f22a744e2d 100755
--- a/setup.py
+++ b/setup.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2022-2024 C-PAC Developers
+# Copyright (C) 2022-2025 C-PAC Developers
# This file is part of C-PAC.
@@ -84,7 +84,12 @@ def main(**extra_args):
extras_require={"graphviz": ["pygraphviz"]},
configuration=configuration,
scripts=glob("scripts/*"),
- entry_points={"console_scripts": ["cpac = CPAC.__main__:main"]},
+ entry_points={
+ "console_scripts": [
+ "cpac = CPAC.__main__:main",
+ "resource_inventory = CPAC.pipeline.resource_inventory:main",
+ ]
+ },
package_data={
"CPAC": [
"test_data/*",