Source code for CPAC.pipeline.utils

# Copyright (C) 2021-2024  C-PAC Developers

# This file is part of C-PAC.

# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.

# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.

# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
"""C-PAC pipeline engine utilities."""

from itertools import chain
import os
import subprocess
from typing import Optional

from CPAC.func_preproc.func_motion import motion_estimate_filter
from CPAC.utils.bids_utils import insert_entity

# The ``outputs`` attribute of ``motion_estimate_filter``; ``name_fork`` looks
# these keys up in a resource's "CpacVariant" metadata when labelling forks.
MOVEMENT_FILTER_KEYS = motion_estimate_filter.outputs


def get_shell() -> str:
    """Return the path to default shell.

    The command name of the parent process is looked up with ``ps`` and
    resolved to a path with ``which``. If that lookup fails or prints
    nothing, the ``_SHELL`` environment variable is used instead.

    Returns
    -------
    str
        path to the shell

    Raises
    ------
    EnvironmentError
        if the parent process cannot be resolved and ``_SHELL`` is not set
    """
    # Capture stdout only and honour the exit status:
    # ``subprocess.getoutput`` folds stderr into its return value, so a
    # failing ``ps`` / ``which`` would hand back an error message that is
    # truthy and would be returned as if it were the shell path.
    lookup = subprocess.run(
        f"which $(ps -p {os.getppid()} -o comm=)",
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        text=True,
        check=False,
    )
    shell: Optional[str] = lookup.stdout.strip() if lookup.returncode == 0 else None
    if not shell:
        try:
            shell = os.environ["_SHELL"]
        except KeyError as err:
            msg = "Shell command not found."
            raise EnvironmentError(msg) from err
    return shell
def name_fork(resource_idx, cfg, json_info, out_dct):
    """Create and insert entities for forkpoints.

    When the motion estimate filter is switched on, a ``filt`` entity is
    inserted; when nuisance regression is switched on, a ``reg`` entity is
    inserted (only if a regressor variant, or ``"Off"``, is found).

    Parameters
    ----------
    resource_idx : str

    cfg : CPAC.utils.configuration.Configuration

    json_info : dict
        metadata for the resource; only its optional ``"CpacVariant"``
        entry is read here

    out_dct : dict
        its ``"filename"`` entry is updated in place alongside
        ``resource_idx``

    Returns
    -------
    resource_idx : str

    out_dct : dict
    """
    if cfg.switch_is_on(
        [
            "functional_preproc",
            "motion_estimates_and_correction",
            "motion_estimate_filter",
            "run",
        ]
    ):
        # Motion-filter related variants recorded for this resource, if any.
        _motion_variant = {
            _key: json_info["CpacVariant"][_key]
            for _key in MOVEMENT_FILTER_KEYS
            if _key in json_info.get("CpacVariant", {})
        }
        if "unfiltered-" in resource_idx:
            resource_idx = resource_idx.replace("unfiltered-", "")
            filt_value = "none"
        else:
            try:
                # The explicit default matters: without it, ``next`` raises
                # ``StopIteration`` when there is no non-empty motion-filter
                # variant, and the ``except`` below does not catch that.
                filt_value = next(
                    (
                        _v[0].replace("motion_estimate_filter_", "")
                        for _v in _motion_variant.values()
                        if _v
                    ),
                    "none",
                )
            except (IndexError, KeyError):
                filt_value = "none"
        resource_idx, out_dct = _update_resource_idx(
            resource_idx, out_dct, "filt", filt_value
        )
    if cfg.switch_is_on(["nuisance_corrections", "2-nuisance_regression", "run"]):
        # The last "_"-separated token of each nuisance-regressor variant
        # is used as the regressor label.
        variants = [
            variant.split("_")[-1]
            for variant in chain.from_iterable(
                json_info.get("CpacVariant", {}).values()
            )
            if variant.startswith("nuisance_regressors_generation")
        ]
        # The same switch is tested for "off" as well; when both tests pass,
        # "Off" is appended as a fallback label.
        if cfg.switch_is_off(["nuisance_corrections", "2-nuisance_regression", "run"]):
            variants.append("Off")
        # ``None`` makes ``_update_resource_idx`` leave everything untouched.
        reg_value = variants[0] if variants else None
        resource_idx, out_dct = _update_resource_idx(
            resource_idx, out_dct, "reg", reg_value
        )
    return resource_idx, out_dct
def present_outputs(outputs: dict, keys: list) -> dict:
    """
    Return the items of ``outputs`` whose keys are listed in ``keys``.

    Keys listed in ``keys`` but missing from ``outputs`` are skipped.

    For example,
    :py:func:`~CPAC.func_preproc.func_motion.motion_correct_connections`
    will have different items in its ``outputs`` dictionary at different
    times depending on the ``motion_correction`` configuration;
    :py:func:`~CPAC.func_preproc.func_motion.func_motion_estimates`
    can then wrap that ``outputs`` in this function and provide a list of
    keys of the desired outputs to include, if they are present in the
    provided ``outputs`` dictionary, eliminating the need for multiple
    NodeBlocks that differ only by configuration options and relevant
    output keys.

    Parameters
    ----------
    outputs : dict

    keys : list of str

    Returns
    -------
    dict
        outputs filtered down to keys

    Examples
    --------
    >>> present_outputs({'a': 1, 'b': 2, 'c': 3}, ['b'])
    {'b': 2}
    >>> present_outputs({'a': 1, 'b': 2, 'c': 3}, ['d'])
    {}
    >>> present_outputs({'a': 1, 'b': 2, 'c': 3}, ['a', 'c'])
    {'a': 1, 'c': 3}
    """
    selected = {}
    for wanted in keys:
        if wanted in outputs:
            selected[wanted] = outputs[wanted]
    return selected
[docs] def source_set(sources: str | list | set) -> set: """Given a CpacProvenance, return a set of {resource}:{source} strings. Parameters ---------- sources: str, list, or set Returns ------- set Examples -------- >>> source_set([[[['bold:func_ingress', ... 'desc-preproc_bold:func_reorient', ... 'desc-preproc_bold:func_truncate'], ... ['TR:func_metadata_ingress'], ... ['tpattern:func_metadata_ingress'], ... 'desc-preproc_bold:func_slice_time'], ... [['bold:func_ingress', ... 'desc-preproc_bold:func_reorient', ... 'desc-preproc_bold:func_truncate'], ... ['bold:func_ingress', 'desc-reorient_bold:func_reorient'], ... 'motion-basefile:get_motion_ref_fmriprep_reference'], ... 'desc-preproc_bold:motion_correction_only_mcflirt'], ... [[['bold:func_ingress', ... 'desc-preproc_bold:func_reorient', ... 'desc-preproc_bold:func_truncate'], ... ['bold:func_ingress', 'desc-reorient_bold:func_reorient'], ... 'motion-basefile:get_motion_ref_fmriprep_reference'], ... [[['bold:func_ingress', ... 'desc-preproc_bold:func_reorient', ... 'desc-preproc_bold:func_truncate'], ... ['TR:func_metadata_ingress'], ... ['tpattern:func_metadata_ingress'], ... 'desc-preproc_bold:func_slice_time'], ... [['bold:func_ingress', ... 'desc-preproc_bold:func_reorient', ... 'desc-preproc_bold:func_truncate'], ... ['bold:func_ingress', 'desc-reorient_bold:func_reorient'], ... 'motion-basefile:get_motion_ref_fmriprep_reference'], ... 'desc-preproc_bold:motion_correction_only_mcflirt'], ... ['FSL-AFNI-bold-ref:template_resample'], ... ['FSL-AFNI-brain-mask:template_resample'], ... ['FSL-AFNI-brain-probseg:template_resample'], ... 'space-bold_desc-brain_mask:bold_mask_fsl_afni'], ... 'desc-preproc_bold:bold_masking']) == set({ ... 'FSL-AFNI-bold-ref:template_resample', ... 'FSL-AFNI-brain-mask:template_resample', ... 'FSL-AFNI-brain-probseg:template_resample', ... 'TR:func_metadata_ingress', ... 'bold:func_ingress', ... 'desc-preproc_bold:bold_masking', ... 'desc-preproc_bold:func_reorient', ... 
'desc-preproc_bold:func_slice_time', ... 'desc-preproc_bold:func_truncate', ... 'desc-preproc_bold:motion_correction_only_mcflirt', ... 'desc-reorient_bold:func_reorient', ... 'motion-basefile:get_motion_ref_fmriprep_reference', ... 'space-bold_desc-brain_mask:bold_mask_fsl_afni', ... 'tpattern:func_metadata_ingress'}) True """ _set = set() if isinstance(sources, str): _set.add(sources) if isinstance(sources, (set, list)): for item in sources: _set.update(source_set(item)) return _set
def _update_resource_idx(resource_idx, out_dct, key, value):
    """
    Insert a fork-based entity into ``resource_idx`` and the output filename.

    If ``value`` is ``None``, both arguments are returned untouched.
    Otherwise the ``key`` / ``value`` entity is inserted into
    ``resource_idx`` and into ``out_dct["filename"]`` (updated in place).

    Parameters
    ----------
    resource_idx : str

    out_dct : dict

    key : str

    value : str or None

    Returns
    -------
    resource_idx : str

    out_dct : dict
    """
    if value is None:
        return resource_idx, out_dct
    updated_idx = insert_entity(resource_idx, key, value)
    out_dct["filename"] = insert_entity(out_dct["filename"], key, value)
    return updated_idx, out_dct