# Source code for CPAC.utils.datasource

# Copyright (C) 2012-2024  C-PAC Developers

# This file is part of C-PAC.

# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.

# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.

# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
"""Utilities for sourcing data."""

import csv
import json
from pathlib import Path
import re

from voluptuous import RequiredFieldInvalid
from nipype.interfaces import utility as util

from CPAC.pipeline import nipype_pipeline_engine as pe
from CPAC.resources.templates.lookup_table import format_identifier, lookup_identifier
from CPAC.utils import function
from CPAC.utils.bids_utils import bids_remove_entity
from CPAC.utils.interfaces.function import Function
from CPAC.utils.monitoring import FMLOGGER
from CPAC.utils.utils import get_scan_params


def bidsier_prefix(unique_id):
    """Return a BIDSier prefix for a given unique_id.

    Parameters
    ----------
    unique_id : str

    Returns
    -------
    prefix : str

    Examples
    --------
    >>> bidsier_prefix('01_1')
    'sub-01_ses-1'
    >>> bidsier_prefix('sub-01_ses-1')
    'sub-01_ses-1'
    >>> bidsier_prefix('sub-01_1')
    'sub-01_ses-1'
    >>> bidsier_prefix('01_ses-1')
    'sub-01_ses-1'
    """
    parts = unique_id.split("_")
    # Only the first two underscore-separated components map to BIDS
    # entities ("sub", "ses"); later components pass through untouched.
    for index, entity in enumerate(("sub", "ses")):
        if index < len(parts) and not parts[index].startswith(entity):
            parts[index] = "-".join([entity, parts[index]])
    return "_".join(parts)
def get_rest(scan, rest_dict, resource="scan"):
    """Return the path of the chosen resource in the functional file dictionary.

    scan: the scan/series name or label
    rest_dict: the dictionary read in from the data configuration YAML file
        (sublist) nested under 'func:'
    resource: the dictionary key
        scan - the functional timeseries
        scan_parameters - path to the scan parameters JSON file, or a
            dictionary containing scan parameters information (to be phased
            out in the future)
    """
    # Missing scan labels or missing resources both yield None rather
    # than raising, so downstream nodes can treat "absent" uniformly.
    try:
        return rest_dict[scan][resource]
    except KeyError:
        return None
def extract_scan_params_dct(scan_params_dct):
    """Extract the scan parameters dictionary from the data configuration file.

    Returns its argument unchanged (identity passthrough).
    """
    return scan_params_dct
def select_model_files(model, ftest, model_name):
    """Select the FSL group-analysis model files from a directory.

    Parameters
    ----------
    model : str
        Path to the directory containing the model files.
    ftest : bool
        Whether an f-test (.fts) file is expected.
    model_name : str
        Basename shared by the model files.

    Returns
    -------
    tuple of str
        ``(fts_file, con_file, grp_file, mat_file)``; any file that was
        not found is returned as an empty string.

    Raises
    ------
    FileNotFoundError
        If the directory is empty, or ``ftest`` is True but no .fts file
        was found.
    """
    import glob
    import os

    files = glob.glob(os.path.join(model, "*"))

    if len(files) == 0:
        msg = f"No files found inside directory {model}"
        raise FileNotFoundError(msg)

    # BUG FIX: previously only fts_file was initialized, so a directory
    # with no matching .mat/.grp/.con file raised UnboundLocalError at
    # the return statement instead of returning empty strings.
    fts_file = ""
    con_file = ""
    grp_file = ""
    mat_file = ""

    for filename in files:
        if (model_name + ".mat") in filename:
            mat_file = filename
        elif (model_name + ".grp") in filename:
            grp_file = filename
        elif ((model_name + ".fts") in filename) and ftest:
            fts_file = filename
        elif (model_name + ".con") in filename:
            con_file = filename

    if ftest and fts_file == "":
        errmsg = (
            "\n[!] CPAC says: You have f-tests included in your group "
            f"analysis model '{model_name}', but no .fts files were found in the "
            f"output folder specified for group analysis: {model}.\n\nThe "
            ".fts file is automatically generated by CPAC, and if you "
            "are seeing this error, it is because something went wrong "
            "with the generation of this file, or it has been moved."
        )
        raise FileNotFoundError(errmsg)

    return fts_file, con_file, grp_file, mat_file
def check_func_scan(func_scan_dct, scan):
    """Run some checks on the functional timeseries-related files.

    For a given series/scan name or label.
    """
    scan_resources = func_scan_dct[scan]

    # C-PAC 1.2+ data configs nest a resource dict under each scan label;
    # anything without a .keys() attribute is the old flat layout.
    if not hasattr(scan_resources, "keys"):
        err = (
            "\n[!] The data configuration file you provided is "
            "missing a level under the 'func:' key. CPAC versions "
            "1.2 and later use data configurations with an "
            "additional level of nesting.\n\nExample\nfunc:\n "
            "rest01:\n scan: /path/to/rest01_func.nii.gz\n"
            " scan parameters: /path/to/scan_params.json\n\n"
            "See the User Guide for more information.\n\n"
        )
        raise ValueError(err)

    # actual 4D time series file
    if "scan" not in scan_resources.keys():
        err = (
            f"\n\n[!] The {scan} scan is missing its actual time-series "
            "scan file, which should be a filepath labeled with the "
            "'scan' key.\n\n"
        )
        raise FileNotFoundError(err)

    # Nipype restriction (may have changed)
    if any(char in scan for char in (".", "+", "*")):
        msg = (
            "\n\n[!] Scan names cannot contain any special "
            "characters (., +, *, etc.). Please update this "
            f"and try again.\n\nScan: {scan}"
            "\n\n"
        )
        raise ValueError(msg)
def create_func_datasource(rest_dict, rpool, wf_name="func_datasource"):
    """Return the functional timeseries-related file paths for each series/scan...

    ...from the dictionary of functional files described in the data
    configuration (sublist) YAML file.

    Scan input (from inputnode) is an iterable.

    Parameters
    ----------
    rest_dict : dict
        The 'func:' section of the data configuration (scan label ->
        resource dict).
    rpool : resource pool
        Queried only via ``check_rpool("derivatives-dir")`` here.
    wf_name : str

    Returns
    -------
    wf : nipype Workflow
        Outputs: subject, rest (local scan path), scan, scan_params.
    """
    import nipype.interfaces.utility as util

    from CPAC.pipeline import nipype_pipeline_engine as pe

    wf = pe.Workflow(name=wf_name)

    inputnode = pe.Node(
        util.IdentityInterface(
            fields=["subject", "scan", "creds_path", "dl_dir"], mandatory_inputs=True
        ),
        name="inputnode",
    )

    outputnode = pe.Node(
        util.IdentityInterface(
            fields=["subject", "rest", "scan", "scan_params", "phase_diff", "magnitude"]
        ),
        name="outputspec",
    )

    # have this here for now because of the big change in the data
    # configuration format
    # (Not necessary with ingress - format does not comply)
    if not rpool.check_rpool("derivatives-dir"):
        # validates the nesting and the presence of the 'scan' key
        check_scan = pe.Node(
            function.Function(
                input_names=["func_scan_dct", "scan"],
                output_names=[],
                function=check_func_scan,
                as_module=True,
            ),
            name="check_func_scan",
        )

        check_scan.inputs.func_scan_dct = rest_dict
        wf.connect(inputnode, "scan", check_scan, "scan")

    # get the functional scan itself
    selectrest = pe.Node(
        function.Function(
            input_names=["scan", "rest_dict", "resource"],
            output_names=["file_path"],
            function=get_rest,
            as_module=True,
        ),
        name="selectrest",
    )
    selectrest.inputs.rest_dict = rest_dict
    selectrest.inputs.resource = "scan"
    wf.connect(inputnode, "scan", selectrest, "scan")

    # check to see if it's on an Amazon AWS S3 bucket, and download it, if it
    # is - otherwise, just return the local file path
    check_s3_node = pe.Node(
        function.Function(
            input_names=["file_path", "creds_path", "dl_dir", "img_type"],
            output_names=["local_path"],
            function=check_for_s3,
            as_module=True,
        ),
        name="check_for_s3",
    )

    wf.connect(selectrest, "file_path", check_s3_node, "file_path")
    wf.connect(inputnode, "creds_path", check_s3_node, "creds_path")
    wf.connect(inputnode, "dl_dir", check_s3_node, "dl_dir")
    # 'func' enables 3-D/4-D dimensionality validation in check_for_s3
    check_s3_node.inputs.img_type = "func"

    wf.connect(inputnode, "subject", outputnode, "subject")
    wf.connect(check_s3_node, "local_path", outputnode, "rest")
    wf.connect(inputnode, "scan", outputnode, "scan")

    # scan parameters CSV
    select_scan_params = pe.Node(
        function.Function(
            input_names=["scan", "rest_dict", "resource"],
            output_names=["file_path"],
            function=get_rest,
            as_module=True,
        ),
        name="select_scan_params",
    )
    select_scan_params.inputs.rest_dict = rest_dict
    select_scan_params.inputs.resource = "scan_parameters"
    wf.connect(inputnode, "scan", select_scan_params, "scan")

    # if the scan parameters file is on AWS S3, download it
    s3_scan_params = pe.Node(
        function.Function(
            input_names=["file_path", "creds_path", "dl_dir", "img_type"],
            output_names=["local_path"],
            function=check_for_s3,
            as_module=True,
        ),
        name="s3_scan_params",
    )

    wf.connect(select_scan_params, "file_path", s3_scan_params, "file_path")
    wf.connect(inputnode, "creds_path", s3_scan_params, "creds_path")
    wf.connect(inputnode, "dl_dir", s3_scan_params, "dl_dir")
    wf.connect(s3_scan_params, "local_path", outputnode, "scan_params")

    return wf
def create_fmap_datasource(fmap_dct, wf_name="fmap_datasource"):
    """Return the field map files...

    ...from the dictionary of functional files described in the data
    configuration (sublist) YAML file.

    Parameters
    ----------
    fmap_dct : dict
        The 'fmap:' section of the data configuration (scan label ->
        resource dict).
    wf_name : str

    Returns
    -------
    wf : nipype Workflow
        Outputs: subject, rest (local field-map path), scan, scan_params.
    """
    import nipype.interfaces.utility as util

    from CPAC.pipeline import nipype_pipeline_engine as pe

    wf = pe.Workflow(name=wf_name)

    inputnode = pe.Node(
        util.IdentityInterface(
            fields=["subject", "scan", "creds_path", "dl_dir"], mandatory_inputs=True
        ),
        name="inputnode",
    )

    outputnode = pe.Node(
        util.IdentityInterface(
            fields=["subject", "rest", "scan", "scan_params", "phase_diff", "magnitude"]
        ),
        name="outputspec",
    )

    # select the field map file for the requested scan label
    selectrest = pe.Node(
        function.Function(
            input_names=["scan", "rest_dict", "resource"],
            output_names=["file_path"],
            function=get_rest,
            as_module=True,
        ),
        name="selectrest",
    )
    selectrest.inputs.rest_dict = fmap_dct
    selectrest.inputs.resource = "scan"
    wf.connect(inputnode, "scan", selectrest, "scan")

    # check to see if it's on an Amazon AWS S3 bucket, and download it, if it
    # is - otherwise, just return the local file path
    check_s3_node = pe.Node(
        function.Function(
            input_names=["file_path", "creds_path", "dl_dir", "img_type"],
            output_names=["local_path"],
            function=check_for_s3,
            as_module=True,
        ),
        name="check_for_s3",
    )

    wf.connect(selectrest, "file_path", check_s3_node, "file_path")
    wf.connect(inputnode, "creds_path", check_s3_node, "creds_path")
    wf.connect(inputnode, "dl_dir", check_s3_node, "dl_dir")
    # 'other' skips the anat/func dimensionality validation in check_for_s3
    check_s3_node.inputs.img_type = "other"

    wf.connect(inputnode, "subject", outputnode, "subject")
    wf.connect(check_s3_node, "local_path", outputnode, "rest")
    wf.connect(inputnode, "scan", outputnode, "scan")

    # scan parameters CSV
    select_scan_params = pe.Node(
        function.Function(
            input_names=["scan", "rest_dict", "resource"],
            output_names=["file_path"],
            function=get_rest,
            as_module=True,
        ),
        name="select_scan_params",
    )
    select_scan_params.inputs.rest_dict = fmap_dct
    select_scan_params.inputs.resource = "scan_parameters"
    wf.connect(inputnode, "scan", select_scan_params, "scan")

    # if the scan parameters file is on AWS S3, download it
    s3_scan_params = pe.Node(
        function.Function(
            input_names=["file_path", "creds_path", "dl_dir", "img_type"],
            output_names=["local_path"],
            function=check_for_s3,
            as_module=True,
        ),
        name="s3_scan_params",
    )

    wf.connect(select_scan_params, "file_path", s3_scan_params, "file_path")
    wf.connect(inputnode, "creds_path", s3_scan_params, "creds_path")
    wf.connect(inputnode, "dl_dir", s3_scan_params, "dl_dir")
    wf.connect(s3_scan_params, "local_path", outputnode, "scan_params")

    return wf
def get_fmap_phasediff_metadata(data_config_scan_params):
    """Return the scan parameters for a field map phasediff scan."""
    params = data_config_scan_params
    # A non-dict value is expected to be a path to a sidecar JSON file.
    if not isinstance(params, dict) and ".json" in params:
        with open(params, "r", encoding="utf-8") as sidecar:
            params = json.load(sidecar)

    # A phasediff may carry a single EchoTime, or an EchoTime1/EchoTime2
    # pair; whichever is absent stays None.
    echo_time = echo_time_one = echo_time_two = None
    if "EchoTime" in params:
        echo_time = params.get("EchoTime")
    elif "EchoTime1" in params and "EchoTime2" in params:
        echo_time_one = params.get("EchoTime1")
        echo_time_two = params.get("EchoTime2")

    return (
        params.get("DwellTime"),
        params.get("PhaseEncodingDirection"),
        params.get("TotalReadoutTime"),
        echo_time,
        echo_time_one,
        echo_time_two,
    )
def calc_delta_te_and_asym_ratio(
    effective_echo_spacing: float, echo_times: list
) -> tuple[float, float]:
    """Calculate ``deltaTE`` and ``ees_asym_ratio`` from given metadata.

    Parameters
    ----------
    effective_echo_spacing : float
        EffectiveEchoSpacing from sidecar JSON

    echo_times : list

    Returns
    -------
    deltaTE : float

    ees_asym_ratio : float
    """
    if not isinstance(effective_echo_spacing, float):
        msg = (
            "C-PAC could not find `EffectiveEchoSpacing` in "
            "either fmap or func sidecar JSON, but that field "
            "is required for PhaseDiff distortion correction."
        )
        raise LookupError(msg)

    # convert into milliseconds if necessary
    # these values will/should never be more than 10ms
    te_one, te_two = echo_times[0], echo_times[1]
    if (te_one * 1000) < 10 and (te_two * 1000) < 10:  # noqa: PLR2004
        te_one *= 1000
        te_two *= 1000
        # keep the caller-visible list in sync, as before
        echo_times[0] = te_one
        echo_times[1] = te_two

    deltaTE = abs(te_one - te_two)
    return deltaTE, effective_echo_spacing / deltaTE
def gather_echo_times(echotime_1, echotime_2, echotime_3=None, echotime_4=None):
    """Gather the echo times from the field map data."""
    candidates = (echotime_1, echotime_2, echotime_3, echotime_4)
    # drop Nones, then deduplicate; exactly two distinct values expected
    echotime_list = list({time for time in candidates if time is not None})
    if len(echotime_list) != 2:  # noqa: PLR2004
        msg = (
            "\n[!] Something went wrong with the field map echo times - there should"
            f" be two distinct values.\n\nEcho Times:\n{echotime_list}\n"
        )
        raise ValueError(msg)
    return echotime_list
def match_epi_fmaps(
    bold_pedir,
    epi_fmap_one,
    epi_fmap_params_one,
    epi_fmap_two=None,
    epi_fmap_params_two=None,
):
    """Match EPI field maps to the BOLD scan.

    Parse the field map files in the data configuration and determine which
    ones have the same and opposite phase-encoding directions as the BOLD
    scan in the current pipeline.

    Example - parse the files under the 'fmap' level, i.e. 'epi_AP':
        anat: /path/to/T1w.nii.gz
        fmap:
          epi_AP:
            scan: /path/to/field-map.nii.gz
            scan_parameters: <config dictionary containing phase-encoding
                              direction>
        func:
          rest_1:
            scan: /path/to/bold.nii.gz
            scan_parameters: <config dictionary of BOLD scan parameters>

    1. Check PhaseEncodingDirection field in the metadata for the BOLD.
    2. Check whether there are one or two EPI's in the field map data.
    3. Grab the one or two EPI field maps.
    """
    fmap_dct = {epi_fmap_one: epi_fmap_params_one}
    if epi_fmap_two and epi_fmap_params_two:
        fmap_dct[epi_fmap_two] = epi_fmap_params_two

    opposite_pe_epi = None
    same_pe_epi = None

    for epi_scan, scan_params in fmap_dct.items():
        # parameters may arrive as a sidecar JSON path instead of a dict
        if not isinstance(scan_params, dict) and ".json" in scan_params:
            with open(scan_params, "r") as f:
                scan_params = json.load(f)

        if "PhaseEncodingDirection" not in scan_params:
            continue
        epi_pedir = scan_params["PhaseEncodingDirection"]
        if epi_pedir == bold_pedir:
            same_pe_epi = epi_scan
        elif epi_pedir[0] == bold_pedir[0]:
            # same axis, different polarity -> opposite phase encoding
            opposite_pe_epi = epi_scan

    return (opposite_pe_epi, same_pe_epi)
def ingress_func_metadata(
    wf,
    cfg,
    rpool,
    sub_dict,
    subject_id,
    input_creds_path,
    unique_id=None,
    num_strat=None,
):
    """Ingress metadata for functional scans.

    Gathers field-map files and their sidecar metadata into the resource
    pool, and wires up a ``get_scan_params`` node for the BOLD scan
    parameters (TR, slice-timing pattern, truncation indices, phase-encoding
    direction, effective echo spacing).

    Returns
    -------
    tuple
        ``(wf, rpool, diff, blip, fmap_rp_list)`` where ``diff`` is True if
        phasediff/magnitude field maps were found and ``blip`` is True if
        opposing-polarity EPI field maps (``epi_AP``/``epi_PA``) were found.
    """
    name_suffix = ""
    for suffix_part in (unique_id, num_strat):
        if suffix_part is not None:
            name_suffix += f"_{suffix_part}"

    # Grab field maps
    diff = False
    blip = False
    fmap_rp_list = []
    fmap_TE_list = []
    if "fmap" in sub_dict:
        second = False
        for orig_key in sub_dict["fmap"]:
            gather_fmap = create_fmap_datasource(
                sub_dict["fmap"], f"fmap_gather_{orig_key}_{subject_id}"
            )
            gather_fmap.inputs.inputnode.set(
                subject=subject_id,
                creds_path=input_creds_path,
                dl_dir=cfg.pipeline_setup["working_directory"]["path"],
            )
            gather_fmap.inputs.inputnode.scan = orig_key

            # normalize EPI field-map keys to epi-1 / epi-2
            key = orig_key
            if "epi" in key and not second:
                key = "epi-1"
                second = True
            elif "epi" in key and second:
                key = "epi-2"

            rpool.set_data(key, gather_fmap, "outputspec.rest", {}, "", "fmap_ingress")
            rpool.set_data(
                f"{key}-scan-params",
                gather_fmap,
                "outputspec.scan_params",
                {},
                "",
                "fmap_params_ingress",
            )

            fmap_rp_list.append(key)

            get_fmap_metadata_imports = ["import json"]
            get_fmap_metadata = pe.Node(
                Function(
                    input_names=["data_config_scan_params"],
                    output_names=[
                        "dwell_time",
                        "pe_direction",
                        "total_readout",
                        "echo_time",
                        "echo_time_one",
                        "echo_time_two",
                    ],
                    function=get_fmap_phasediff_metadata,
                    imports=get_fmap_metadata_imports,
                ),
                name=f"{key}_get_metadata{name_suffix}",
            )

            wf.connect(
                gather_fmap,
                "outputspec.scan_params",
                get_fmap_metadata,
                "data_config_scan_params",
            )

            if "phase" in key:
                # leave it open to all three options, in case there is a
                # phasediff image with either a single EchoTime field (which
                # usually matches one of the magnitude EchoTimes), OR
                # a phasediff with an EchoTime1 and EchoTime2

                # at least one of these rpool keys will have a None value,
                # which will be sorted out in gather_echo_times below
                rpool.set_data(
                    f"{key}-TE",
                    get_fmap_metadata,
                    "echo_time",
                    {},
                    "",
                    "fmap_TE_ingress",
                )
                fmap_TE_list.append(f"{key}-TE")

                rpool.set_data(
                    f"{key}-TE1",
                    get_fmap_metadata,
                    "echo_time_one",
                    {},
                    "",
                    "fmap_TE1_ingress",
                )
                fmap_TE_list.append(f"{key}-TE1")

                rpool.set_data(
                    f"{key}-TE2",
                    get_fmap_metadata,
                    "echo_time_two",
                    {},
                    "",
                    "fmap_TE2_ingress",
                )
                fmap_TE_list.append(f"{key}-TE2")

            elif "magnitude" in key:
                rpool.set_data(
                    f"{key}-TE",
                    get_fmap_metadata,
                    "echo_time",
                    {},
                    "",
                    "fmap_TE_ingress",
                )
                fmap_TE_list.append(f"{key}-TE")

            rpool.set_data(
                f"{key}-dwell",
                get_fmap_metadata,
                "dwell_time",
                {},
                "",
                "fmap_dwell_ingress",
            )
            rpool.set_data(
                f"{key}-pedir",
                get_fmap_metadata,
                "pe_direction",
                {},
                "",
                "fmap_pedir_ingress",
            )
            rpool.set_data(
                f"{key}-total-readout",
                get_fmap_metadata,
                "total_readout",
                {},
                "",
                "fmap_readout_ingress",
            )

            # phasediff/magnitude maps drive PhaseDiff distortion correction
            if "phase" in key or "mag" in key:
                diff = True

            # opposing-polarity EPI maps (e.g. epi_AP/epi_PA) drive blip-up/
            # blip-down correction
            if re.match("epi_[AP]{2}", orig_key):
                blip = True

        if diff:
            calc_delta_ratio = pe.Node(
                Function(
                    input_names=["effective_echo_spacing", "echo_times"],
                    output_names=["deltaTE", "ees_asym_ratio"],
                    function=calc_delta_te_and_asym_ratio,
                    imports=["from typing import Optional"],
                ),
                name=f"diff_distcor_calc_delta{name_suffix}",
            )

            gather_echoes = pe.Node(
                Function(
                    input_names=[
                        "echotime_1",
                        "echotime_2",
                        "echotime_3",
                        "echotime_4",
                    ],
                    output_names=["echotime_list"],
                    function=gather_echo_times,
                ),
                name="fugue_gather_echo_times",
            )

            # only the TE resources actually present in the rpool get wired;
            # the rest are silently skipped (KeyError)
            for idx, fmap_file in enumerate(fmap_TE_list, start=1):
                try:
                    node, out_file = rpool.get(fmap_file)[
                        f"['{fmap_file}:fmap_TE_ingress']"
                    ]["data"]
                    wf.connect(node, out_file, gather_echoes, f"echotime_{idx}")
                except KeyError:
                    pass

            wf.connect(gather_echoes, "echotime_list", calc_delta_ratio, "echo_times")

    # Add in nodes to get parameters from configuration file
    # a node which checks if scan_parameters are present for each scan
    scan_params = pe.Node(
        Function(
            input_names=[
                "data_config_scan_params",
                "subject_id",
                "scan",
                "pipeconfig_tr",
                "pipeconfig_tpattern",
                "pipeconfig_start_indx",
                "pipeconfig_stop_indx",
            ],
            output_names=[
                "tr",
                "tpattern",
                "template",
                "ref_slice",
                "start_indx",
                "stop_indx",
                "pe_direction",
                "effective_echo_spacing",
            ],
            function=get_scan_params,
            imports=["from CPAC.utils.utils import check, try_fetch_parameter"],
        ),
        name=f"bold_scan_params_{subject_id}{name_suffix}",
    )
    scan_params.inputs.subject_id = subject_id
    scan_params.inputs.set(
        pipeconfig_start_indx=cfg.functional_preproc["truncation"]["start_tr"],
        pipeconfig_stop_indx=cfg.functional_preproc["truncation"]["stop_tr"],
    )

    node, out = rpool.get("scan")["['scan:func_ingress']"]["data"]
    wf.connect(node, out, scan_params, "scan")

    # Workaround for extracting metadata with ingress
    if rpool.check_rpool("derivatives-dir"):
        selectrest_json = pe.Node(
            function.Function(
                input_names=["scan", "rest_dict", "resource"],
                output_names=["file_path"],
                function=get_rest,
                as_module=True,
            ),
            name="selectrest_json",
        )
        selectrest_json.inputs.rest_dict = sub_dict
        selectrest_json.inputs.resource = "scan_parameters"
        wf.connect(node, out, selectrest_json, "scan")
        wf.connect(selectrest_json, "file_path", scan_params, "data_config_scan_params")
    else:
        # wire in the scan parameter workflow
        node, out = rpool.get("scan-params")["['scan-params:scan_params_ingress']"][
            "data"
        ]
        wf.connect(node, out, scan_params, "data_config_scan_params")

    rpool.set_data("TR", scan_params, "tr", {}, "", "func_metadata_ingress")
    rpool.set_data("tpattern", scan_params, "tpattern", {}, "", "func_metadata_ingress")
    rpool.set_data("template", scan_params, "template", {}, "", "func_metadata_ingress")
    rpool.set_data(
        "start-tr", scan_params, "start_indx", {}, "", "func_metadata_ingress"
    )
    rpool.set_data("stop-tr", scan_params, "stop_indx", {}, "", "func_metadata_ingress")
    rpool.set_data(
        "pe-direction", scan_params, "pe_direction", {}, "", "func_metadata_ingress"
    )

    if diff:
        # Connect EffectiveEchoSpacing from functional metadata
        rpool.set_data(
            "effectiveEchoSpacing",
            scan_params,
            "effective_echo_spacing",
            {},
            "",
            "func_metadata_ingress",
        )
        node, out_file = rpool.get("effectiveEchoSpacing")[
            "['effectiveEchoSpacing:func_metadata_ingress']"
        ]["data"]
        wf.connect(node, out_file, calc_delta_ratio, "effective_echo_spacing")

        rpool.set_data(
            "deltaTE", calc_delta_ratio, "deltaTE", {}, "", "deltaTE_ingress"
        )
        rpool.set_data(
            "ees-asym-ratio",
            calc_delta_ratio,
            "ees_asym_ratio",
            {},
            "",
            "ees_asym_ratio_ingress",
        )

    return wf, rpool, diff, blip, fmap_rp_list
def create_general_datasource(wf_name):
    """Create a general-purpose datasource node."""
    import nipype.interfaces.utility as util

    from CPAC.pipeline import nipype_pipeline_engine as pe

    wf = pe.Workflow(name=wf_name)

    inputnode = pe.Node(
        util.IdentityInterface(
            fields=["unique_id", "data", "scan", "creds_path", "dl_dir"],
            mandatory_inputs=True,
        ),
        name="inputnode",
    )
    outputnode = pe.Node(
        util.IdentityInterface(fields=["unique_id", "data", "scan"]), name="outputspec"
    )

    # resolve the data path, downloading from S3 first when necessary
    check_s3_node = pe.Node(
        function.Function(
            input_names=["file_path", "creds_path", "dl_dir", "img_type"],
            output_names=["local_path"],
            function=check_for_s3,
            as_module=True,
        ),
        name="check_for_s3",
    )
    check_s3_node.inputs.img_type = "other"

    wf.connect(
        [
            (
                inputnode,
                check_s3_node,
                [
                    ("data", "file_path"),
                    ("creds_path", "creds_path"),
                    ("dl_dir", "dl_dir"),
                ],
            ),
            (inputnode, outputnode, [("unique_id", "unique_id"), ("scan", "scan")]),
            (check_s3_node, outputnode, [("local_path", "data")]),
        ]
    )

    return wf
def create_check_for_s3_node(
    name, file_path, img_type="other", creds_path=None, dl_dir=None, map_node=False
):
    """Create a node to check if a file is on S3."""
    # Both variants wrap the same interface; a MapNode iterates over a
    # list of file paths instead of a single one.
    s3_interface = function.Function(
        input_names=["file_path", "creds_path", "dl_dir", "img_type"],
        output_names=["local_path"],
        function=check_for_s3,
        as_module=True,
    )
    if map_node:
        check_s3_node = pe.MapNode(
            s3_interface, iterfield=["file_path"], name=f"check_for_s3_{name}"
        )
    else:
        check_s3_node = pe.Node(s3_interface, name=f"check_for_s3_{name}")

    check_s3_node.inputs.set(
        file_path=file_path, creds_path=creds_path, dl_dir=dl_dir, img_type=img_type
    )
    return check_s3_node
def check_for_s3(
    file_path, creds_path=None, dl_dir=None, img_type="other", verbose=False
):
    """Check if passed-in file is on S3.

    S3 paths (``s3://...``) are downloaded into ``dl_dir`` (skipping the
    download if the file already exists locally); other paths are resolved
    via ``os.path.realpath``. NIfTI files are additionally validated for
    the dimensionality expected by ``img_type``.

    Parameters
    ----------
    file_path : str, dict, or None
        Path to check; a dict (scan parameters) or None is returned as-is.
    creds_path : str, optional
        Path to AWS credentials; "None"/"none"/"null" strings are treated
        as no credentials.
    dl_dir : str, optional
        Download directory; defaults to the current working directory.
    img_type : str
        'anat' (must be 3-D), 'func' (must be 3-D or 4-D), or 'other'
        (no dimensionality check).
    verbose : bool
        If True, log the local path after a successful resolution.

    Returns
    -------
    local_path : str, dict, or None

    Raises
    ------
    PermissionError, FileNotFoundError, ConnectionError
        On S3 access failures (403, 404, other errors respectively).
    FileNotFoundError
        If the resolved local path does not exist.
    IOError
        If a NIfTI file has the wrong number of dimensions for img_type.
    """
    # Import packages
    import os

    import botocore.exceptions
    import nibabel as nib
    from indi_aws import fetch_creds

    # Init variables
    s3_str = "s3://"
    if creds_path:
        if "None" in creds_path or "none" in creds_path or "null" in creds_path:
            creds_path = None

    if dl_dir is None:
        dl_dir = os.getcwd()

    if file_path is None:
        # in case it's something like scan parameters or field map files, but
        # we don't have any
        return None

    # TODO: remove this once scan parameter input as dictionary is phased out
    if isinstance(file_path, dict):
        # if this is a dictionary, just skip altogether
        return file_path

    if file_path.lower().startswith(s3_str):
        # normalize the scheme's casing before parsing bucket/key
        file_path = s3_str + file_path[len(s3_str) :]

        # Get bucket name and bucket object
        bucket_name = file_path[len(s3_str) :].split("/")[0]

        # Extract relative key path from bucket and local path
        s3_prefix = s3_str + bucket_name
        s3_key = file_path[len(s3_prefix) + 1 :]
        local_path = os.path.join(dl_dir, bucket_name, s3_key)

        # Get local directory and create folders if they dont exist
        local_dir = os.path.dirname(local_path)
        if not os.path.exists(local_dir):
            os.makedirs(local_dir, exist_ok=True)

        if os.path.exists(local_path):
            FMLOGGER.info("%s already exists- skipping download.", local_path)
        else:
            # Download file
            try:
                bucket = fetch_creds.return_bucket(creds_path, bucket_name)
                FMLOGGER.info("Attempting to download from AWS S3: %s", file_path)
                bucket.download_file(Key=s3_key, Filename=local_path)
            except botocore.exceptions.ClientError as exc:
                error_code = int(exc.response["Error"]["Code"])

                err_msg = str(exc)
                if error_code == 403:  # noqa: PLR2004
                    err_msg = (
                        f'Access to bucket: "{bucket_name}" is denied; using'
                        f' credentials in subject list: "{creds_path}"; cannot access'
                        f' the file "{file_path}"'
                    )
                    error_type = PermissionError
                elif error_code == 404:  # noqa: PLR2004
                    err_msg = (
                        f"File: {os.path.join(bucket_name, s3_key)} does not exist;"
                        " check spelling and try again"
                    )
                    error_type = FileNotFoundError
                else:
                    err_msg = (
                        f'Unable to connect to bucket: "{bucket_name}". Error message:'
                        f"\n{exc}"
                    )
                    error_type = ConnectionError
                raise error_type(err_msg)
            except Exception as exc:
                err_msg = (
                    f'Unable to connect to bucket: "{bucket_name}". Error message:'
                    f"\n{exc}"
                )
                raise ConnectionError(err_msg)

    # Otherwise just return what was passed in, resolving if a link
    else:
        local_path = os.path.realpath(file_path)

    # Check if it exists or it is successfully downloaded
    if not os.path.exists(local_path):
        # alert users to 2020-07-20 Neuroparc atlas update (v0 to v1)
        ndmg_atlases = {}
        with open(
            os.path.join(
                os.path.dirname(os.path.dirname(__file__)),
                "resources/templates/ndmg_atlases.csv",
            )
        ) as ndmg_atlases_file:
            ndmg_atlases["v0"], ndmg_atlases["v1"] = zip(
                *[
                    (
                        f"/ndmg_atlases/label/Human/{atlas[0]}",
                        f"/ndmg_atlases/label/Human/{atlas[1]}",
                    )
                    for atlas in csv.reader(ndmg_atlases_file)
                ]
            )
        if local_path in ndmg_atlases["v0"]:
            from CPAC.utils.docs import DOCS_URL_PREFIX

            msg = (
                "Neuroparc atlas paths were updated on July 20, 2020. C-PAC"
                " configuration files using Neuroparc v0 atlas paths (including C-PAC"
                " default and preconfigured pipeline configurations from v1.6.2a and"
                " earlier) need to be updated to use Neuroparc atlases. Your current"
                f" configuration includes the Neuroparc v0 path {local_path} which"
                " needs to be updated to"
                f" {ndmg_atlases['v1'][ndmg_atlases['v0'].index(local_path)]}. For a"
                f" full list such paths, see {DOCS_URL_PREFIX}/user/ndmg_atlases"
            )
        else:
            msg = f"File {local_path} does not exist!"
        raise FileNotFoundError(msg)

    if verbose:
        FMLOGGER.info("Downloaded file:\n%s\n", local_path)

    # Check image dimensionality
    if local_path.endswith(".nii") or local_path.endswith(".nii.gz"):
        img_nii = nib.load(local_path)

        if img_type == "anat":
            if len(img_nii.shape) != 3:  # noqa: PLR2004
                msg = (
                    f"File: {local_path} must be an anatomical image with 3 "
                    f"dimensions but {len(img_nii.shape)} dimensions found!"
                )
                # BUG FIX: this branch previously assigned ``msg`` without
                # raising, so invalid anatomical images slipped through.
                raise IOError(msg)
        elif img_type == "func":
            if len(img_nii.shape) not in [3, 4]:
                msg = (
                    f"File: {local_path} must be a functional image with 3 or "
                    f"4 dimensions but {len(img_nii.shape)} dimensions found!"
                )
                raise IOError(msg)

    return local_path
def gather_extraction_maps(c):
    """Gather the timeseries and SCA analysis configurations."""
    ts_analysis_dict = {}
    sca_analysis_dict = {}

    if hasattr(c, "timeseries_extraction"):
        tsa_roi_dict = c.timeseries_extraction["tse_roi_paths"]

        # Timeseries and SCA config selections processing
        # invert the mapping: roi_path -> analyses becomes
        # analysis -> [roi_paths]
        for roi_path, analyses in tsa_roi_dict.items():
            for analysis_type in (entry.strip() for entry in analyses.split(",")):
                ts_analysis_dict.setdefault(analysis_type, []).append(roi_path)

        if c.timeseries_extraction["run"]:
            if not tsa_roi_dict:
                err = (
                    "\n\n[!] CPAC says: Time Series Extraction is "
                    "set to run, but no ROI NIFTI file paths were "
                    "provided!\n\n"
                )
                raise RequiredFieldInvalid(err)

    if c.seed_based_correlation_analysis["run"]:
        try:
            sca_roi_dict = c.seed_based_correlation_analysis["sca_roi_paths"]
        except KeyError:
            err = (
                "\n\n[!] CPAC says: Seed-based Correlation Analysis "
                "is set to run, but no ROI NIFTI file paths were "
                "provided!\n\n"
            )
            raise RequiredFieldInvalid(err)

        # invert the mapping for SCA as well (spaces stripped, not just
        # trimmed, to match the original behavior)
        for roi_path, analyses in sca_roi_dict.items():
            for _analysis_type in analyses.split(","):
                analysis_type = _analysis_type.replace(" ", "")
                sca_analysis_dict.setdefault(analysis_type, []).append(roi_path)

    return (ts_analysis_dict, sca_analysis_dict)
def get_highest_local_res(template: Path | str, tagname: str) -> Path:
    """Return the highest resolution of a template in the same local path.

    Given a reference template path and a resolution string, get all
    resolutions of that template in the same local path and return the
    highest resolution.

    Parameters
    ----------
    template : Path or str

    tagname : str

    Returns
    -------
    Path

    Raises
    ------
    LookupError
        If no matching local template is found.

    Examples
    --------
    >>> get_highest_local_res(
    ...     '/cpac_templates/MacaqueYerkes19_T1w_2mm_brain.nii.gz', '2mm')
    PosixPath('/cpac_templates/MacaqueYerkes19_T1w_0.5mm_brain.nii.gz')
    >>> get_highest_local_res(
    ...     '/cpac_templates/dne_T1w_2mm.nii.gz', '2mm')
    Traceback (most recent call last):
       ...
    LookupError: Could not find template /cpac_templates/dne_T1w_2mm.nii.gz
    """
    from CPAC.pipeline.schema import RESOLUTION_REGEX

    if isinstance(template, str):
        template = Path(template)
    # Build a filename pattern that matches the template with any
    # resolution substituted where ``tagname`` appeared; the anchors are
    # stripped from RESOLUTION_REGEX so it can sit mid-filename.
    template_pattern = (
        RESOLUTION_REGEX.replace("^", "")
        .replace("$", "")
        .join([re.escape(_part) for _part in template.name.split(tagname, 1)])
    )
    matching_templates = [
        file
        for file in template.parent.iterdir()
        if re.match(template_pattern, file.name)
    ]
    # Lexicographic sort puts the smallest voxel size (e.g. "0.5mm")
    # first, i.e. the highest resolution.
    matching_templates.sort()
    try:
        return matching_templates[0]
    except (FileNotFoundError, IndexError):
        msg = f"Could not find template {template}"
        raise LookupError(msg)
def res_string_to_tuple(resolution):
    """Convert a resolution string to a tuple of floats.

    Parameters
    ----------
    resolution : str
        Resolution string, e.g. "3.438mmx3.438mmx3.4mm"

    Returns
    -------
    resolution : tuple
        Tuple of floats, e.g. (3.438, 3.438, 3.4)
    """
    if "x" not in str(resolution):
        # isotropic shorthand such as "2mm" applies to all three axes
        return (float(resolution.replace("mm", "")),) * 3
    return tuple(float(axis.replace("mm", "")) for axis in resolution.split("x"))
def resolve_resolution(resolution, template, template_name, tag=None):
    """Resample a template to a given resolution.

    First tries to find a file at the exact requested resolution (locally
    or on S3); failing that, falls back to a reference-resolution template
    and resamples it with AFNI 3dresample (cubic interpolation).
    """
    from nipype.interfaces import afni

    from CPAC.pipeline import nipype_pipeline_engine as pe
    from CPAC.utils.datasource import check_for_s3

    tagname = None
    local_path = None

    # ``template`` may embed a substitution tag such as "${resolution}"
    if "{" in template and tag is not None:
        tagname = "${" + tag + "}"
    try:
        # look for a ready-made file at the exact requested resolution
        if tagname is not None:
            local_path = check_for_s3(template.replace(tagname, str(resolution)))
    except (IOError, OSError):
        local_path = None

    ## TODO debug - it works in ipython but doesn't work in nipype wf
    # try:
    #     local_path = check_for_s3('/usr/local/fsl/data/standard/MNI152_T1_3.438mmx3.438mmx3.4mm_brain_mask_dil.nii.gz')
    # except (IOError, OSError):
    #     local_path = None

    if local_path is None:
        # no exact-resolution file found: pick a reference template and
        # resample it to the requested voxel size
        if tagname is not None:
            if template.startswith("s3:"):
                ref_template = template.replace(tagname, "1mm")
                local_path = check_for_s3(ref_template)
            else:
                local_path = get_highest_local_res(template, tagname)
        elif tagname is None and template.startswith("s3:"):
            local_path = check_for_s3(template)
        else:
            local_path = template

        resample = pe.Node(
            interface=afni.Resample(),
            name=template_name,
            mem_gb=0,
            mem_x=(0.0115, "in_file", "t"),
        )
        resample.inputs.voxel_size = res_string_to_tuple(resolution)
        resample.inputs.outputtype = "NIFTI_GZ"
        # "Cu" = cubic interpolation
        resample.inputs.resample_mode = "Cu"
        resample.inputs.in_file = local_path
        resample.base_dir = "."

        resampled_template = resample.run()
        local_path = resampled_template.outputs.out_file

    return local_path
def create_anat_datasource(wf_name="anat_datasource"):
    """Create a dataflow for anatomical images."""
    import nipype.interfaces.utility as util

    from CPAC.pipeline import nipype_pipeline_engine as pe

    wf = pe.Workflow(name=wf_name)

    inputnode = pe.Node(
        util.IdentityInterface(
            fields=["subject", "anat", "creds_path", "dl_dir", "img_type"],
            mandatory_inputs=True,
        ),
        name="inputnode",
    )
    outputnode = pe.Node(
        util.IdentityInterface(fields=["subject", "anat"]), name="outputspec"
    )

    # resolve the anatomical path, downloading from S3 when necessary
    check_s3_node = pe.Node(
        function.Function(
            input_names=["file_path", "creds_path", "dl_dir", "img_type"],
            output_names=["local_path"],
            function=check_for_s3,
            as_module=True,
        ),
        name="check_for_s3",
    )

    wf.connect(
        [
            (
                inputnode,
                check_s3_node,
                [
                    ("anat", "file_path"),
                    ("creds_path", "creds_path"),
                    ("dl_dir", "dl_dir"),
                    ("img_type", "img_type"),
                ],
            ),
            (inputnode, outputnode, [("subject", "subject")]),
            (check_s3_node, outputnode, [("local_path", "anat")]),
        ]
    )

    # Return the workflow
    return wf
def create_roi_mask_dataflow(masks, wf_name="datasource_roi_mask"):
    """Create a dataflow for ROI masks.

    Parameters
    ----------
    masks : iterable of str
        Paths to mask/atlas files. Blank lines and lines starting with
        ``#`` are skipped.
    wf_name : str
        Name for the workflow.

    Returns
    -------
    wf : nipype workflow
        Iterates over each mask; ``outputspec`` provides ``out_file``
        (localized path) and ``out_name`` (identifier).

    Raises
    ------
    ValueError
        If a template file has an unrecognized extension, or if two
        entries resolve to the same identifier.
    """
    import os

    mask_dict = {}

    for _mask_file in masks:
        mask_file = _mask_file.rstrip("\r\n")

        if mask_file.strip() == "" or mask_file.startswith("#"):
            continue

        name, desc = lookup_identifier(mask_file)

        if name == "template":
            base_file = os.path.basename(mask_file)
            valid_extensions = [".nii", ".nii.gz"]
            try:
                base_name = next(
                    base_file[: -len(ext)]
                    for ext in valid_extensions
                    if base_file.endswith(ext)
                )
            # BUGFIX: ``next()`` on an exhausted generator raises
            # StopIteration, not IndexError, so the previous
            # ``except IndexError`` handler was dead code and bad
            # extensions escaped as StopIteration.
            except StopIteration:
                msg = (
                    "Error in create_roi_mask_dataflow: File "
                    f'extension of {base_file} not ".nii" or '
                    ".nii.gz"
                )
                raise ValueError(msg) from None
            # strip resolution/space entities so variants collapse to one key
            for key in ["res", "space"]:
                base_name = bids_remove_entity(base_name, key)
        else:
            base_name = format_identifier(name, desc)

        if base_name in mask_dict:
            msg = (
                "Duplicate templates/atlases not allowed: "
                f"{mask_file} {mask_dict[base_name]}"
            )
            raise ValueError(msg)

        mask_dict[base_name] = mask_file

    wf = pe.Workflow(name=wf_name)

    inputnode = pe.Node(
        util.IdentityInterface(
            fields=["mask", "mask_file", "creds_path", "dl_dir"],
            mandatory_inputs=True,
        ),
        name="inputspec",
    )

    # iterate over (identifier, path) pairs in lockstep
    mask_keys, mask_values = zip(*mask_dict.items())
    inputnode.synchronize = True
    inputnode.iterables = [
        ("mask", mask_keys),
        ("mask_file", mask_values),
    ]

    check_s3_node = pe.Node(
        function.Function(
            input_names=["file_path", "creds_path", "dl_dir", "img_type"],
            output_names=["local_path"],
            function=check_for_s3,
            as_module=True,
        ),
        name="check_for_s3",
    )

    wf.connect(inputnode, "mask_file", check_s3_node, "file_path")
    wf.connect(inputnode, "creds_path", check_s3_node, "creds_path")
    wf.connect(inputnode, "dl_dir", check_s3_node, "dl_dir")
    check_s3_node.inputs.img_type = "mask"

    outputnode = pe.Node(
        util.IdentityInterface(fields=["out_file", "out_name"]), name="outputspec"
    )

    wf.connect(check_s3_node, "local_path", outputnode, "out_file")
    wf.connect(inputnode, "mask", outputnode, "out_name")

    return wf
def create_spatial_map_dataflow(spatial_maps, wf_name="datasource_maps"):
    """Create a dataflow for spatial maps.

    Parameters
    ----------
    spatial_maps : iterable of str
        Paths to spatial-map NIfTI files.
    wf_name : str
        Name for the workflow.

    Returns
    -------
    wf : nipype workflow
        Iterates over each map; ``select_spatial_map`` provides
        ``out_file`` (localized path) and ``out_name`` (base name).

    Raises
    ------
    ValueError
        If a file has an unrecognized extension, or if two files share
        the same base name.
    """
    import os

    wf = pe.Workflow(name=wf_name)

    spatial_map_dict = {}

    for _spatial_map_file in spatial_maps:
        spatial_map_file = _spatial_map_file.rstrip("\r\n")
        base_file = os.path.basename(spatial_map_file)

        valid_extensions = [".nii", ".nii.gz"]
        try:
            base_name = next(
                base_file[: -len(ext)]
                for ext in valid_extensions
                if base_file.endswith(ext)
            )
        # BUGFIX: ``next()`` on an exhausted generator raises
        # StopIteration, not IndexError, so the previous
        # ``except IndexError`` handler was dead code and bad
        # extensions escaped as StopIteration.
        except StopIteration:
            msg = (
                "Error in spatial_map_dataflow: File extension not in .nii and .nii.gz"
            )
            raise ValueError(msg) from None

        # duplicate check kept outside the try so its ValueError is
        # raised directly, not routed through the extension handler
        if base_name in spatial_map_dict:
            msg = (
                f"Files with same name not allowed: {spatial_map_file}"
                f" {spatial_map_dict[base_name]}"
            )
            raise ValueError(msg)

        spatial_map_dict[base_name] = spatial_map_file

    inputnode = pe.Node(
        util.IdentityInterface(
            fields=["spatial_map", "spatial_map_file", "creds_path", "dl_dir"],
            mandatory_inputs=True,
        ),
        name="inputspec",
    )

    # iterate over (base name, path) pairs in lockstep
    spatial_map_keys, spatial_map_values = zip(*spatial_map_dict.items())
    inputnode.synchronize = True
    inputnode.iterables = [
        ("spatial_map", spatial_map_keys),
        ("spatial_map_file", spatial_map_values),
    ]

    check_s3_node = pe.Node(
        function.Function(
            input_names=["file_path", "creds_path", "dl_dir", "img_type"],
            output_names=["local_path"],
            function=check_for_s3,
            as_module=True,
        ),
        name="check_for_s3",
    )

    wf.connect(inputnode, "spatial_map_file", check_s3_node, "file_path")
    wf.connect(inputnode, "creds_path", check_s3_node, "creds_path")
    wf.connect(inputnode, "dl_dir", check_s3_node, "dl_dir")
    check_s3_node.inputs.img_type = "mask"

    select_spatial_map = pe.Node(
        util.IdentityInterface(
            fields=["out_file", "out_name"], mandatory_inputs=True
        ),
        name="select_spatial_map",
    )

    wf.connect(check_s3_node, "local_path", select_spatial_map, "out_file")
    wf.connect(inputnode, "spatial_map", select_spatial_map, "out_name")

    return wf
def create_grp_analysis_dataflow(wf_name="gp_dataflow"):
    """Create a dataflow for group analysis.

    Selects the FSL model files (.mat/.grp/.fts/.con) for a named group
    model via ``select_model_files`` and exposes them on ``outputspec``.
    """
    import nipype.interfaces.utility as util

    from CPAC.pipeline import nipype_pipeline_engine as pe
    from CPAC.utils.datasource import select_model_files

    wf = pe.Workflow(name=wf_name)

    inputnode = pe.Node(
        util.IdentityInterface(
            fields=["ftest", "grp_model", "model_name"], mandatory_inputs=True
        ),
        name="inputspec",
    )

    selectmodel = pe.Node(
        function.Function(
            input_names=["model", "ftest", "model_name"],
            output_names=["fts_file", "con_file", "grp_file", "mat_file"],
            function=select_model_files,
            as_module=True,
        ),
        name="selectnode",
    )

    # feed the model-selection node
    for src_field, dst_field in (
        ("ftest", "ftest"),
        ("grp_model", "model"),
        ("model_name", "model_name"),
    ):
        wf.connect(inputnode, src_field, selectmodel, dst_field)

    outputnode = pe.Node(
        util.IdentityInterface(
            fields=["fts", "grp", "mat", "con"], mandatory_inputs=True
        ),
        name="outputspec",
    )

    # expose each selected file under its short name
    for src_field, dst_field in (
        ("mat_file", "mat"),
        ("grp_file", "grp"),
        ("fts_file", "fts"),
        ("con_file", "con"),
    ):
        wf.connect(selectmodel, src_field, outputnode, dst_field)

    return wf
def resample_func_roi(in_func, in_roi, realignment, identity_matrix):
    """Resample functional image to ROI or ROI to functional image using flirt.

    Parameters
    ----------
    in_func : str
        Path to the functional timeseries image.
    in_roi : str
        Path to the ROI image.
    realignment : str
        Direction of resampling; must contain ``'func_to_ROI'`` or
        ``'ROI_to_func'``.
    identity_matrix : str
        Path to the identity matrix passed to ``flirt -init``.

    Returns
    -------
    out_func, out_roi : str
        Paths to the (possibly resampled) functional and ROI images.

    Raises
    ------
    ValueError
        If the shapes differ and ``realignment`` names neither direction.
    """
    import os

    import nibabel as nib

    from CPAC.utils.monitoring.custom_logging import log_subprocess

    # compare func and ROI dimensions
    func_shape = nib.load(in_func).shape
    roi_shape = nib.load(in_roi).shape

    # already matched: nothing to resample
    if roi_shape == func_shape:
        return in_func, in_roi

    def _resampled_path(path):
        """Build a cwd path for the resampled copy of `path`."""
        # basename handles paths with no directory component, which the
        # previous ``rindex('/')`` slicing did not
        base = os.path.basename(path)
        return os.path.join(
            os.getcwd(), base[: base.rindex(".nii")] + "_resampled.nii.gz"
        )

    # resample func to ROI: in_file = func, reference = ROI
    if "func_to_ROI" in realignment:
        in_file, reference = in_func, in_roi
        out_file = _resampled_path(in_file)
        out_func, out_roi = out_file, in_roi
        interp = "trilinear"
    # resample ROI to func: in_file = ROI, reference = func
    elif "ROI_to_func" in realignment:
        in_file, reference = in_roi, in_func
        out_file = _resampled_path(in_file)
        out_func, out_roi = in_func, out_file
        # nearest-neighbour preserves integer ROI labels
        interp = "nearestneighbour"
    else:
        # BUGFIX: previously fell through with in_file/out_func/interp
        # unbound, crashing below with UnboundLocalError; fail explicitly
        msg = (
            f"Invalid realignment option: {realignment!r}; "
            "expected 'func_to_ROI' or 'ROI_to_func'"
        )
        raise ValueError(msg)

    cmd = [
        "flirt",
        "-in",
        in_file,
        "-ref",
        reference,
        "-out",
        out_file,
        "-interp",
        interp,
        "-applyxfm",
        "-init",
        identity_matrix,
    ]
    log_subprocess(cmd)

    return out_func, out_roi