Source code for CPAC.utils.datasource

# Copyright (C) 2012-2023  C-PAC Developers

# This file is part of C-PAC.

# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.

# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.

# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
import csv
import json
import re
from pathlib import Path
from typing import Union
from nipype import logging
from nipype.interfaces import utility as util
from CPAC.pipeline import nipype_pipeline_engine as pe
from CPAC.resources.templates.lookup_table import format_identifier, \
                                                  lookup_identifier
from CPAC.utils import function
from CPAC.utils.bids_utils import bids_remove_entity
from CPAC.utils.interfaces.function import Function
from CPAC.utils.typing import TUPLE
from CPAC.utils.utils import get_scan_params

logger = logging.getLogger('nipype.workflow')


[docs]def bidsier_prefix(unique_id): """ Function to return a BIDSier prefix for a given unique_id Parameters ---------- unique_id : str Returns ------- prefix : str Examples -------- >>> bidsier_prefix('01_1') 'sub-01_ses-1' >>> bidsier_prefix('sub-01_ses-1') 'sub-01_ses-1' >>> bidsier_prefix('sub-01_1') 'sub-01_ses-1' >>> bidsier_prefix('01_ses-1') 'sub-01_ses-1' """ keys = ['sub', 'ses'] components = unique_id.split('_') for i, component in enumerate(components): if i < len(keys): if not component.startswith(keys[i]): components[i] = '-'.join([keys[i], component]) return '_'.join(components)
[docs]def get_rest(scan, rest_dict, resource="scan"): """Return the file path of the chosen resource stored in the functional file dictionary, if it exists. scan: the scan/series name or label rest_dict: the dictionary read in from the data configuration YAML file (sublist) nested under 'func:' resource: the dictionary key scan - the functional timeseries scan_parameters - path to the scan parameters JSON file, or a dictionary containing scan parameters information (to be phased out in the future) """ try: file_path = rest_dict[scan][resource] except KeyError: file_path = None return file_path
[docs]def extract_scan_params_dct(scan_params_dct): return scan_params_dct
[docs]def get_map(map, map_dct): # return the spatial map required return map_dct[map]
[docs]def select_model_files(model, ftest, model_name): """ Method to select model files """ import os import glob files = glob.glob(os.path.join(model, '*')) if len(files) == 0: raise Exception("No files found inside directory %s" % model) fts_file = '' for filename in files: if (model_name + '.mat') in filename: mat_file = filename elif (model_name + '.grp') in filename: grp_file = filename elif ((model_name + '.fts') in filename) and ftest: fts_file = filename elif (model_name + '.con') in filename: con_file = filename if ftest == True and fts_file == '': errmsg = "\n[!] CPAC says: You have f-tests included in your group " \ "analysis model '%s', but no .fts files were found in the " \ "output folder specified for group analysis: %s.\n\nThe " \ ".fts file is automatically generated by CPAC, and if you " \ "are seeing this error, it is because something went wrong " \ "with the generation of this file, or it has been moved." \ "\n\n" % (model_name, model) raise Exception(errmsg) return fts_file, con_file, grp_file, mat_file
[docs]def check_func_scan(func_scan_dct, scan): """Run some checks on the functional timeseries-related files for a given series/scan name or label.""" scan_resources = func_scan_dct[scan] try: scan_resources.keys() except AttributeError: err = "\n[!] The data configuration file you provided is " \ "missing a level under the 'func:' key. CPAC versions " \ "1.2 and later use data configurations with an " \ "additional level of nesting.\n\nExample\nfunc:\n " \ "rest01:\n scan: /path/to/rest01_func.nii.gz\n" \ " scan parameters: /path/to/scan_params.json\n\n" \ "See the User Guide for more information.\n\n" raise Exception(err) # actual 4D time series file if "scan" not in scan_resources.keys(): err = "\n\n[!] The {0} scan is missing its actual time-series " \ "scan file, which should be a filepath labeled with the " \ "'scan' key.\n\n".format(scan) raise Exception(err) # Nipype restriction (may have changed) if '.' in scan or '+' in scan or '*' in scan: raise Exception('\n\n[!] Scan names cannot contain any special ' 'characters (., +, *, etc.). Please update this ' 'and try again.\n\nScan: {0}' '\n\n'.format(scan))
[docs]def create_func_datasource(rest_dict, rpool, wf_name='func_datasource'): """Return the functional timeseries-related file paths for each series/scan, from the dictionary of functional files described in the data configuration (sublist) YAML file. Scan input (from inputnode) is an iterable. """ from CPAC.pipeline import nipype_pipeline_engine as pe import nipype.interfaces.utility as util wf = pe.Workflow(name=wf_name) inputnode = pe.Node(util.IdentityInterface( fields=['subject', 'scan', 'creds_path', 'dl_dir'], mandatory_inputs=True), name='inputnode') outputnode = pe.Node(util.IdentityInterface(fields=['subject', 'rest', 'scan', 'scan_params', 'phase_diff', 'magnitude']), name='outputspec') # have this here for now because of the big change in the data # configuration format # (Not necessary with ingress - format does not comply) if not rpool.check_rpool('derivatives-dir'): check_scan = pe.Node(function.Function(input_names=['func_scan_dct', 'scan'], output_names=[], function=check_func_scan, as_module=True), name='check_func_scan') check_scan.inputs.func_scan_dct = rest_dict wf.connect(inputnode, 'scan', check_scan, 'scan') # get the functional scan itself selectrest = pe.Node(function.Function(input_names=['scan', 'rest_dict', 'resource'], output_names=['file_path'], function=get_rest, as_module=True), name='selectrest') selectrest.inputs.rest_dict = rest_dict selectrest.inputs.resource = "scan" wf.connect(inputnode, 'scan', selectrest, 'scan') # check to see if it's on an Amazon AWS S3 bucket, and download it, if it # is - otherwise, just return the local file path check_s3_node = pe.Node(function.Function(input_names=['file_path', 'creds_path', 'dl_dir', 'img_type'], output_names=['local_path'], function=check_for_s3, as_module=True), name='check_for_s3') wf.connect(selectrest, 'file_path', check_s3_node, 'file_path') wf.connect(inputnode, 'creds_path', check_s3_node, 'creds_path') wf.connect(inputnode, 'dl_dir', check_s3_node, 'dl_dir') check_s3_node.inputs.img_type = 'func' wf.connect(inputnode, 'subject', outputnode, 'subject') wf.connect(check_s3_node, 'local_path', outputnode, 'rest') wf.connect(inputnode, 'scan', outputnode, 'scan') # scan parameters CSV select_scan_params = pe.Node(function.Function(input_names=['scan', 'rest_dict', 'resource'], output_names=['file_path'], function=get_rest, as_module=True), name='select_scan_params') select_scan_params.inputs.rest_dict = rest_dict select_scan_params.inputs.resource = "scan_parameters" wf.connect(inputnode, 'scan', select_scan_params, 'scan') # if the scan parameters file is on AWS S3, download it s3_scan_params = pe.Node(function.Function(input_names=['file_path', 'creds_path', 'dl_dir', 'img_type'], output_names=['local_path'], function=check_for_s3, as_module=True), name='s3_scan_params') wf.connect(select_scan_params, 'file_path', s3_scan_params, 'file_path') wf.connect(inputnode, 'creds_path', s3_scan_params, 'creds_path') wf.connect(inputnode, 'dl_dir', s3_scan_params, 'dl_dir') wf.connect(s3_scan_params, 'local_path', outputnode, 'scan_params') return wf
[docs]def create_fmap_datasource(fmap_dct, wf_name='fmap_datasource'): """Return the field map files, from the dictionary of functional files described in the data configuration (sublist) YAML file. """ from CPAC.pipeline import nipype_pipeline_engine as pe import nipype.interfaces.utility as util wf = pe.Workflow(name=wf_name) inputnode = pe.Node(util.IdentityInterface( fields=['subject', 'scan', 'creds_path', 'dl_dir'], mandatory_inputs=True), name='inputnode') outputnode = pe.Node(util.IdentityInterface(fields=['subject', 'rest', 'scan', 'scan_params', 'phase_diff', 'magnitude']), name='outputspec') selectrest = pe.Node(function.Function(input_names=['scan', 'rest_dict', 'resource'], output_names=['file_path'], function=get_rest, as_module=True), name='selectrest') selectrest.inputs.rest_dict = fmap_dct selectrest.inputs.resource = "scan" wf.connect(inputnode, 'scan', selectrest, 'scan') # check to see if it's on an Amazon AWS S3 bucket, and download it, if it # is - otherwise, just return the local file path check_s3_node = pe.Node(function.Function(input_names=['file_path', 'creds_path', 'dl_dir', 'img_type'], output_names=['local_path'], function=check_for_s3, as_module=True), name='check_for_s3') wf.connect(selectrest, 'file_path', check_s3_node, 'file_path') wf.connect(inputnode, 'creds_path', check_s3_node, 'creds_path') wf.connect(inputnode, 'dl_dir', check_s3_node, 'dl_dir') check_s3_node.inputs.img_type = 'other' wf.connect(inputnode, 'subject', outputnode, 'subject') wf.connect(check_s3_node, 'local_path', outputnode, 'rest') wf.connect(inputnode, 'scan', outputnode, 'scan') # scan parameters CSV select_scan_params = pe.Node(function.Function(input_names=['scan', 'rest_dict', 'resource'], output_names=['file_path'], function=get_rest, as_module=True), name='select_scan_params') select_scan_params.inputs.rest_dict = fmap_dct select_scan_params.inputs.resource = "scan_parameters" wf.connect(inputnode, 'scan', select_scan_params, 'scan') # if the scan parameters file is on AWS S3, download it s3_scan_params = pe.Node(function.Function(input_names=['file_path', 'creds_path', 'dl_dir', 'img_type'], output_names=['local_path'], function=check_for_s3, as_module=True), name='s3_scan_params') wf.connect(select_scan_params, 'file_path', s3_scan_params, 'file_path') wf.connect(inputnode, 'creds_path', s3_scan_params, 'creds_path') wf.connect(inputnode, 'dl_dir', s3_scan_params, 'dl_dir') wf.connect(s3_scan_params, 'local_path', outputnode, 'scan_params') return wf
[docs]def get_fmap_phasediff_metadata(data_config_scan_params): if (not isinstance(data_config_scan_params, dict) and ".json" in data_config_scan_params): with open(data_config_scan_params, 'r', encoding='utf-8') as _f: data_config_scan_params = json.load(_f) echo_time = None echo_time_one = None echo_time_two = None if "EchoTime" in data_config_scan_params: echo_time = data_config_scan_params.get("EchoTime") elif "EchoTime1" in data_config_scan_params and "EchoTime2" \ in data_config_scan_params: echo_time_one = data_config_scan_params.get("EchoTime1") echo_time_two = data_config_scan_params.get("EchoTime2") dwell_time = data_config_scan_params.get("DwellTime") pe_direction = data_config_scan_params.get("PhaseEncodingDirection") total_readout = data_config_scan_params.get("TotalReadoutTime") return (dwell_time, pe_direction, total_readout, echo_time, echo_time_one, echo_time_two)
[docs]@Function.sig_imports(['from CPAC.utils.typing import TUPLE']) def calc_delta_te_and_asym_ratio(effective_echo_spacing: float, echo_times: list) -> TUPLE[float, float]: """Calcluate ``deltaTE`` and ``ees_asym_ratio`` from given metadata Parameters ---------- effective_echo_spacing : float EffectiveEchoSpacing from sidecar JSON echo_times : list Returns ------- deltaTE : float ees_asym_ratio : float """ if not isinstance(effective_echo_spacing, float): raise LookupError('C-PAC could not find `EffectiveEchoSpacing` in ' 'either fmap or func sidecar JSON, but that field ' 'is required for PhaseDiff distortion correction.') # convert into milliseconds if necessary # these values will/should never be more than 10ms if ((echo_times[0] * 1000) < 10) and ((echo_times[1] * 1000) < 10): echo_times[0] = echo_times[0] * 1000 echo_times[1] = echo_times[1] * 1000 deltaTE = abs(echo_times[0] - echo_times[1]) ees_asym_ratio = (effective_echo_spacing / deltaTE) return deltaTE, ees_asym_ratio
[docs]def gather_echo_times(echotime_1, echotime_2, echotime_3=None, echotime_4=None): echotime_list = [echotime_1, echotime_2, echotime_3, echotime_4] echotime_list = list(filter(lambda item: item is not None, echotime_list)) echotime_list = list(set(echotime_list)) if len(echotime_list) != 2: raise Exception("\n[!] Something went wrong with the field map echo " "times - there should be two distinct values.\n\n" f"Echo Times:\n{echotime_list}\n") return echotime_list
[docs]def match_epi_fmaps(bold_pedir, epi_fmap_one, epi_fmap_params_one, epi_fmap_two=None, epi_fmap_params_two=None): """Parse the field map files in the data configuration and determine which ones have the same and opposite phase-encoding directions as the BOLD scan in the current pipeline. Example - parse the files under the 'fmap' level, i.e. 'epi_AP': anat: /path/to/T1w.nii.gz fmap: epi_AP: scan: /path/to/field-map.nii.gz scan_parameters: <config dictionary containing phase-encoding direction> func: rest_1: scan: /path/to/bold.nii.gz scan_parameters: <config dictionary of BOLD scan parameters> 1. Check PhaseEncodingDirection field in the metadata for the BOLD. 2. Check whether there are one or two EPI's in the field map data. 3. Grab the one or two EPI field maps. """ fmap_dct = {epi_fmap_one: epi_fmap_params_one} if epi_fmap_two and epi_fmap_params_two: fmap_dct[epi_fmap_two] = epi_fmap_params_two opposite_pe_epi = None same_pe_epi = None for epi_scan in fmap_dct.keys(): scan_params = fmap_dct[epi_scan] if not isinstance(scan_params, dict) and ".json" in scan_params: with open(scan_params, 'r') as f: scan_params = json.load(f) if "PhaseEncodingDirection" in scan_params: epi_pedir = scan_params["PhaseEncodingDirection"] if epi_pedir == bold_pedir: same_pe_epi = epi_scan elif epi_pedir[0] == bold_pedir[0]: opposite_pe_epi = epi_scan return (opposite_pe_epi, same_pe_epi)
[docs]def ingress_func_metadata(wf, cfg, rpool, sub_dict, subject_id, input_creds_path, unique_id=None, num_strat=None): name_suffix = '' for suffix_part in (unique_id, num_strat): if suffix_part is not None: name_suffix += f'_{suffix_part}' # Grab field maps diff = False blip = False fmap_rp_list = [] fmap_TE_list = [] if "fmap" in sub_dict: second = False for key in sub_dict["fmap"]: gather_fmap = create_fmap_datasource( sub_dict["fmap"], f"fmap_gather_{key}_{subject_id}") gather_fmap.inputs.inputnode.set( subject=subject_id, creds_path=input_creds_path, dl_dir=cfg.pipeline_setup['working_directory']['path']) gather_fmap.inputs.inputnode.scan = key orig_key = key if 'epi' in key and not second: key = 'epi-1' second = True elif 'epi' in key and second: key = 'epi-2' rpool.set_data(key, gather_fmap, 'outputspec.rest', {}, "", "fmap_ingress") rpool.set_data(f'{key}-scan-params', gather_fmap, 'outputspec.scan_params', {}, "", "fmap_params_ingress") fmap_rp_list.append(key) get_fmap_metadata_imports = ['import json'] get_fmap_metadata = pe.Node(Function( input_names=['data_config_scan_params'], output_names=['dwell_time', 'pe_direction', 'total_readout', 'echo_time', 'echo_time_one', 'echo_time_two'], function=get_fmap_phasediff_metadata, imports=get_fmap_metadata_imports), name=f'{key}_get_metadata{name_suffix}') wf.connect(gather_fmap, 'outputspec.scan_params', get_fmap_metadata, 'data_config_scan_params') if "phase" in key: # leave it open to all three options, in case there is a # phasediff image with either a single EchoTime field (which # usually matches one of the magnitude EchoTimes), OR # a phasediff with an EchoTime1 and EchoTime2 # at least one of these rpool keys will have a None value, # which will be sorted out in gather_echo_times below rpool.set_data(f'{key}-TE', get_fmap_metadata, 'echo_time', {}, "", "fmap_TE_ingress") fmap_TE_list.append(f"{key}-TE") rpool.set_data(f'{key}-TE1', get_fmap_metadata, 'echo_time_one', {}, "", "fmap_TE1_ingress") fmap_TE_list.append(f"{key}-TE1") rpool.set_data(f'{key}-TE2', get_fmap_metadata, 'echo_time_two', {}, "", "fmap_TE2_ingress") fmap_TE_list.append(f"{key}-TE2") elif "magnitude" in key: rpool.set_data(f'{key}-TE', get_fmap_metadata, 'echo_time', {}, "", "fmap_TE_ingress") fmap_TE_list.append(f"{key}-TE") rpool.set_data(f'{key}-dwell', get_fmap_metadata, 'dwell_time', {}, "", "fmap_dwell_ingress") rpool.set_data(f'{key}-pedir', get_fmap_metadata, 'pe_direction', {}, "", "fmap_pedir_ingress") rpool.set_data(f'{key}-total-readout', get_fmap_metadata, 'total_readout', {}, "", "fmap_readout_ingress") if 'phase' in key or 'mag' in key: diff = True if re.match('epi_[AP]{2}', orig_key): blip = True if diff: calc_delta_ratio = pe.Node(Function( input_names=['effective_echo_spacing', 'echo_times'], output_names=['deltaTE', 'ees_asym_ratio'], function=calc_delta_te_and_asym_ratio, imports=['from typing import Optional, Tuple']), name=f'diff_distcor_calc_delta{name_suffix}') gather_echoes = pe.Node(Function( input_names=['echotime_1', 'echotime_2', 'echotime_3', 'echotime_4'], output_names=['echotime_list'], function=gather_echo_times), name='fugue_gather_echo_times') for idx, fmap_file in enumerate(fmap_TE_list, start=1): try: node, out_file = rpool.get(fmap_file)[ f"['{fmap_file}:fmap_TE_ingress']"]['data'] wf.connect(node, out_file, gather_echoes, f'echotime_{idx}') except KeyError: pass wf.connect(gather_echoes, 'echotime_list', calc_delta_ratio, 'echo_times') # Add in nodes to get parameters from configuration file # a node which checks if scan_parameters are present for each scan scan_params = pe.Node(Function( input_names=['data_config_scan_params', 'subject_id', 'scan', 'pipeconfig_tr', 'pipeconfig_tpattern', 'pipeconfig_start_indx', 'pipeconfig_stop_indx'], output_names=['tr', 'tpattern', 'template', 'ref_slice', 'start_indx', 'stop_indx', 'pe_direction', 'effective_echo_spacing'], function=get_scan_params, imports=['from CPAC.utils.utils import check, try_fetch_parameter'] ), name=f"bold_scan_params_{subject_id}{name_suffix}") scan_params.inputs.subject_id = subject_id scan_params.inputs.set( pipeconfig_start_indx=cfg.functional_preproc['truncation'][ 'start_tr'], pipeconfig_stop_indx=cfg.functional_preproc['truncation']['stop_tr']) node, out = rpool.get('scan')["['scan:func_ingress']"]['data'] wf.connect(node, out, scan_params, 'scan') # Workaround for extracting metadata with ingress if rpool.check_rpool('derivatives-dir'): selectrest_json = pe.Node(function.Function(input_names=['scan', 'rest_dict', 'resource'], output_names=['file_path'], function=get_rest, as_module=True), name='selectrest_json') selectrest_json.inputs.rest_dict = sub_dict selectrest_json.inputs.resource = "scan_parameters" wf.connect(node, out, selectrest_json, 'scan') wf.connect(selectrest_json, 'file_path', scan_params, 'data_config_scan_params') else: # wire in the scan parameter workflow node, out = rpool.get('scan-params')[ "['scan-params:scan_params_ingress']"]['data'] wf.connect(node, out, scan_params, 'data_config_scan_params') rpool.set_data('TR', scan_params, 'tr', {}, "", "func_metadata_ingress") rpool.set_data('tpattern', scan_params, 'tpattern', {}, "", "func_metadata_ingress") rpool.set_data('template', scan_params, 'template', {}, "", "func_metadata_ingress") rpool.set_data('start-tr', scan_params, 'start_indx', {}, "", "func_metadata_ingress") rpool.set_data('stop-tr', scan_params, 'stop_indx', {}, "", "func_metadata_ingress") rpool.set_data('pe-direction', scan_params, 'pe_direction', {}, "", "func_metadata_ingress") if diff: # Connect EffectiveEchoSpacing from functional metadata rpool.set_data('effectiveEchoSpacing', scan_params, 'effective_echo_spacing', {}, '', 'func_metadata_ingress') node, out_file = rpool.get('effectiveEchoSpacing')[ "['effectiveEchoSpacing:func_metadata_ingress']"]['data'] wf.connect(node, out_file, calc_delta_ratio, 'effective_echo_spacing') rpool.set_data('deltaTE', calc_delta_ratio, 'deltaTE', {}, '', 'deltaTE_ingress') rpool.set_data('ees-asym-ratio', calc_delta_ratio, 'ees_asym_ratio', {}, '', 'ees_asym_ratio_ingress') return wf, rpool, diff, blip, fmap_rp_list
[docs]def create_general_datasource(wf_name): from CPAC.pipeline import nipype_pipeline_engine as pe import nipype.interfaces.utility as util wf = pe.Workflow(name=wf_name) inputnode = pe.Node(util.IdentityInterface( fields=['unique_id', 'data', 'scan', 'creds_path', 'dl_dir'], mandatory_inputs=True), name='inputnode') check_s3_node = pe.Node(function.Function(input_names=['file_path', 'creds_path', 'dl_dir', 'img_type'], output_names=['local_path'], function=check_for_s3, as_module=True), name='check_for_s3') check_s3_node.inputs.img_type = "other" wf.connect(inputnode, 'data', check_s3_node, 'file_path') wf.connect(inputnode, 'creds_path', check_s3_node, 'creds_path') wf.connect(inputnode, 'dl_dir', check_s3_node, 'dl_dir') outputnode = pe.Node(util.IdentityInterface(fields=['unique_id', 'data', 'scan']), name='outputspec') wf.connect(inputnode, 'unique_id', outputnode, 'unique_id') wf.connect(inputnode, 'scan', outputnode, 'scan') wf.connect(check_s3_node, 'local_path', outputnode, 'data') return wf
[docs]def create_check_for_s3_node(name, file_path, img_type='other', creds_path=None, dl_dir=None, map_node=False): if map_node: check_s3_node = pe.MapNode(function.Function(input_names=['file_path', 'creds_path', 'dl_dir', 'img_type'], output_names=[ 'local_path'], function=check_for_s3, as_module=True), iterfield=['file_path'], name='check_for_s3_%s' % name) else: check_s3_node = pe.Node(function.Function(input_names=['file_path', 'creds_path', 'dl_dir', 'img_type'], output_names=['local_path'], function=check_for_s3, as_module=True), name='check_for_s3_%s' % name) check_s3_node.inputs.set( file_path=file_path, creds_path=creds_path, dl_dir=dl_dir, img_type=img_type ) return check_s3_node
# Check if passed-in file is on S3
[docs]def check_for_s3(file_path, creds_path=None, dl_dir=None, img_type='other', verbose=False): # Import packages import os import nibabel as nib import botocore.exceptions from indi_aws import fetch_creds # Init variables s3_str = 's3://' if creds_path: if "None" in creds_path or "none" in creds_path or \ "null" in creds_path: creds_path = None if dl_dir is None: dl_dir = os.getcwd() if file_path is None: # in case it's something like scan parameters or field map files, but # we don't have any return None # TODO: remove this once scan parameter input as dictionary is phased out if isinstance(file_path, dict): # if this is a dictionary, just skip altogether local_path = file_path return local_path if file_path.lower().startswith(s3_str): file_path = s3_str + file_path[len(s3_str):] # Get bucket name and bucket object bucket_name = file_path[len(s3_str):].split('/')[0] # Extract relative key path from bucket and local path s3_prefix = s3_str + bucket_name s3_key = file_path[len(s3_prefix) + 1:] local_path = os.path.join(dl_dir, bucket_name, s3_key) # Get local directory and create folders if they dont exist local_dir = os.path.dirname(local_path) if not os.path.exists(local_dir): os.makedirs(local_dir, exist_ok=True) if os.path.exists(local_path): print("{0} already exists- skipping download.".format(local_path)) else: # Download file try: bucket = fetch_creds.return_bucket(creds_path, bucket_name) print("Attempting to download from AWS S3: {0}".format( file_path)) bucket.download_file(Key=s3_key, Filename=local_path) except botocore.exceptions.ClientError as exc: error_code = int(exc.response['Error']['Code']) err_msg = str(exc) if error_code == 403: err_msg = 'Access to bucket: "%s" is denied; using credentials ' \ 'in subject list: "%s"; cannot access the file "%s"' \ % (bucket_name, creds_path, file_path) elif error_code == 404: err_msg = 'File: {0} does not exist; check spelling and try ' \ 'again'.format( os.path.join(bucket_name, s3_key)) else: err_msg = 'Unable to connect to bucket: "%s". Error message:\n%s' \ % (bucket_name, exc) raise Exception(err_msg) except Exception as exc: err_msg = 'Unable to connect to bucket: "%s". Error message:\n%s' \ % (bucket_name, exc) raise Exception(err_msg) # Otherwise just return what was passed in, resolving if a link else: local_path = os.path.realpath(file_path) # Check if it exists or it is successfully downloaded if not os.path.exists(local_path): # alert users to 2020-07-20 Neuroparc atlas update (v0 to v1) ndmg_atlases = {} with open( os.path.join( os.path.dirname(os.path.dirname(__file__)), 'resources/templates/ndmg_atlases.csv' ) ) as ndmg_atlases_file: ndmg_atlases['v0'], ndmg_atlases['v1'] = zip(*[( f'/ndmg_atlases/label/Human/{atlas[0]}', f'/ndmg_atlases/label/Human/{atlas[1]}' ) for atlas in csv.reader( ndmg_atlases_file)]) if local_path in ndmg_atlases['v0']: raise FileNotFoundError( ''.join([ 'Neuroparc atlas paths were updated on July 20, 2020. ' 'C-PAC configuration files using Neuroparc v0 atlas paths ' '(including C-PAC default and preconfigured pipeline ' 'configurations from v1.6.2a and earlier) need to be ' 'updated to use Neuroparc atlases. Your current ' 'configuration includes the Neuroparc v0 path ' f'{local_path} which needs to be updated to ', ndmg_atlases['v1'][ndmg_atlases['v0'].index(local_path)], '. For a full list such paths, see https://fcp-indi.' 'github.io/docs/nightly/user/ndmg_atlases' ]) ) else: raise FileNotFoundError(f'File {local_path} does not exist!') if verbose: print("Downloaded file:\n{0}\n".format(local_path)) # Check image dimensionality if local_path.endswith('.nii') or local_path.endswith('.nii.gz'): img_nii = nib.load(local_path) if img_type == 'anat': if len(img_nii.shape) != 3: raise IOError('File: %s must be an anatomical image with 3 ' 'dimensions but %d dimensions found!' % (local_path, len(img_nii.shape))) elif img_type == 'func': if len(img_nii.shape) not in [3, 4]: raise IOError('File: %s must be a functional image with 3 or ' '4 dimensions but %d dimensions found!' % (local_path, len(img_nii.shape))) return local_path
[docs]def gather_extraction_maps(c): ts_analysis_dict = {} sca_analysis_dict = {} if hasattr(c, 'timeseries_extraction'): tsa_roi_dict = c.timeseries_extraction['tse_roi_paths'] # Timeseries and SCA config selections processing # flip the dictionary for roi_path in tsa_roi_dict.keys(): ts_analysis_to_run = [ x.strip() for x in tsa_roi_dict[roi_path].split(",") ] for analysis_type in ts_analysis_to_run: if analysis_type not in ts_analysis_dict.keys(): ts_analysis_dict[analysis_type] = [] ts_analysis_dict[analysis_type].append(roi_path) if c.timeseries_extraction['run']: if not tsa_roi_dict: err = "\n\n[!] CPAC says: Time Series Extraction is " \ "set to run, but no ROI NIFTI file paths were " \ "provided!\n\n" raise Exception(err) if c.seed_based_correlation_analysis['run']: try: sca_roi_dict = c.seed_based_correlation_analysis[ 'sca_roi_paths' ] except KeyError: err = "\n\n[!] CPAC says: Seed-based Correlation Analysis " \ "is set to run, but no ROI NIFTI file paths were " \ "provided!\n\n" raise Exception(err) # flip the dictionary for roi_path in sca_roi_dict.keys(): # update analysis dict for analysis_type in sca_roi_dict[roi_path].split(","): analysis_type = analysis_type.replace(" ", "") if analysis_type not in sca_analysis_dict.keys(): sca_analysis_dict[analysis_type] = [] sca_analysis_dict[analysis_type].append(roi_path) return (ts_analysis_dict, sca_analysis_dict)
[docs]def get_highest_local_res(template: Union[Path, str], tagname: str) -> Path: """Given a reference template path and a resolution string, get all resolutions of that template in the same local path and return the highest resolution. Parameters ---------- template : Path or str tagname : str Returns ------- str Raises ------ LookupError If no matching local template is found. Examples -------- >>> get_highest_local_res( ... '/cpac_templates/MacaqueYerkes19_T1w_2mm_brain.nii.gz', '2mm') PosixPath('/cpac_templates/MacaqueYerkes19_T1w_0.5mm_brain.nii.gz') >>> get_highest_local_res( ... '/cpac_templates/dne_T1w_2mm.nii.gz', '2mm') Traceback (most recent call last): ... LookupError: Could not find template /cpac_templates/dne_T1w_2mm.nii.gz """ from CPAC.pipeline.schema import RESOLUTION_REGEX if isinstance(template, str): template = Path(template) template_pattern = ( RESOLUTION_REGEX.replace('^', '').replace('$', '').join([ re.escape(_part) for _part in template.name.split(tagname, 1)])) matching_templates = [file for file in template.parent.iterdir() if re.match(template_pattern, file.name)] matching_templates.sort() try: return matching_templates[0] except (FileNotFoundError, IndexError): raise LookupError(f"Could not find template {template}")
[docs]def res_string_to_tuple(resolution): """ Converts a resolution string to a tuple of floats. Parameters ---------- resolution : str Resolution string, e.g. "3.438mmx3.438mmx3.4mm" Returns ------- resolution :tuple Tuple of floats, e.g. (3.438, 3.438, 3.4) """ if "x" in str(resolution): return tuple( float(i.replace('mm', '')) for i in resolution.split("x")) return (float(resolution.replace('mm', '')),) * 3
[docs]def resolve_resolution(resolution, template, template_name, tag=None): from nipype.interfaces import afni from CPAC.pipeline import nipype_pipeline_engine as pe from CPAC.utils.datasource import check_for_s3 tagname = None local_path = None if "{" in template and tag is not None: tagname = "${" + tag + "}" try: if tagname is not None: local_path = check_for_s3( template.replace(tagname, str(resolution))) except (IOError, OSError): local_path = None ## TODO debug - it works in ipython but doesn't work in nipype wf # try: # local_path = check_for_s3('/usr/local/fsl/data/standard/MNI152_T1_3.438mmx3.438mmx3.4mm_brain_mask_dil.nii.gz') # except (IOError, OSError): # local_path = None if local_path is None: if tagname is not None: if template.startswith('s3:'): ref_template = template.replace(tagname, '1mm') local_path = check_for_s3(ref_template) else: local_path = get_highest_local_res(template, tagname) elif tagname is None and template.startswith('s3:'): local_path = check_for_s3(template) else: local_path = template resample = pe.Node(interface=afni.Resample(), name=template_name, mem_gb=0, mem_x=(0.0115, 'in_file', 't')) resample.inputs.voxel_size = res_string_to_tuple(resolution) resample.inputs.outputtype = 'NIFTI_GZ' resample.inputs.resample_mode = 'Cu' resample.inputs.in_file = local_path resample.base_dir = '.' resampled_template = resample.run() local_path = resampled_template.outputs.out_file return local_path
[docs]def create_anat_datasource(wf_name='anat_datasource'): from CPAC.pipeline import nipype_pipeline_engine as pe import nipype.interfaces.utility as util wf = pe.Workflow(name=wf_name) inputnode = pe.Node(util.IdentityInterface( fields=['subject', 'anat', 'creds_path', 'dl_dir', 'img_type'], mandatory_inputs=True), name='inputnode') check_s3_node = pe.Node(function.Function(input_names=['file_path', 'creds_path', 'dl_dir', 'img_type'], output_names=['local_path'], function=check_for_s3, as_module=True), name='check_for_s3') wf.connect(inputnode, 'anat', check_s3_node, 'file_path') wf.connect(inputnode, 'creds_path', check_s3_node, 'creds_path') wf.connect(inputnode, 'dl_dir', check_s3_node, 'dl_dir') wf.connect(inputnode, 'img_type', check_s3_node, 'img_type') outputnode = pe.Node(util.IdentityInterface(fields=['subject', 'anat']), name='outputspec') wf.connect(inputnode, 'subject', outputnode, 'subject') wf.connect(check_s3_node, 'local_path', outputnode, 'anat') # Return the workflow return wf
[docs]def create_roi_mask_dataflow(masks, wf_name='datasource_roi_mask'): import os mask_dict = {} for mask_file in masks: mask_file = mask_file.rstrip('\r\n') if mask_file.strip() == '' or mask_file.startswith('#'): continue name, desc = lookup_identifier(mask_file) if name == 'template': base_file = os.path.basename(mask_file) try: valid_extensions = ['.nii', '.nii.gz'] base_name = [ base_file[:-len(ext)] for ext in valid_extensions if base_file.endswith(ext) ][0] for key in ['res', 'space']: base_name = bids_remove_entity(base_name, key) except IndexError: # pylint: disable=raise-missing-from raise ValueError('Error in spatial_map_dataflow: File ' f'extension of {base_file} not ".nii" or ' '.nii.gz') except Exception as e: raise e else: base_name = format_identifier(name, desc) if base_name in mask_dict: raise ValueError('Duplicate templates/atlases not allowed: ' f'{mask_file} {mask_dict[base_name]}') mask_dict[base_name] = mask_file wf = pe.Workflow(name=wf_name) inputnode = pe.Node(util.IdentityInterface(fields=['mask', 'mask_file', 'creds_path', 'dl_dir'], mandatory_inputs=True), name='inputspec') mask_keys, mask_values = \ zip(*mask_dict.items()) inputnode.synchronize = True inputnode.iterables = [ ('mask', mask_keys), ('mask_file', mask_values), ] check_s3_node = pe.Node(function.Function(input_names=['file_path', 'creds_path', 'dl_dir', 'img_type'], output_names=['local_path'], function=check_for_s3, as_module=True), name='check_for_s3') wf.connect(inputnode, 'mask_file', check_s3_node, 'file_path') wf.connect(inputnode, 'creds_path', check_s3_node, 'creds_path') wf.connect(inputnode, 'dl_dir', check_s3_node, 'dl_dir') check_s3_node.inputs.img_type = 'mask' outputnode = pe.Node(util.IdentityInterface(fields=['out_file', 'out_name']), name='outputspec') wf.connect(check_s3_node, 'local_path', outputnode, 'out_file') wf.connect(inputnode, 'mask', outputnode, 'out_name') return wf
[docs]def create_spatial_map_dataflow(spatial_maps, wf_name='datasource_maps'): import os wf = pe.Workflow(name=wf_name) spatial_map_dict = {} for spatial_map_file in spatial_maps: spatial_map_file = spatial_map_file.rstrip('\r\n') base_file = os.path.basename(spatial_map_file) try: valid_extensions = ['.nii', '.nii.gz'] base_name = [ base_file[:-len(ext)] for ext in valid_extensions if base_file.endswith(ext) ][0] if base_name in spatial_map_dict: raise ValueError( 'Files with same name not allowed: %s %s' % ( spatial_map_file, spatial_map_dict[base_name] ) ) spatial_map_dict[base_name] = spatial_map_file except IndexError as e: raise Exception('Error in spatial_map_dataflow: ' 'File extension not in .nii and .nii.gz') inputnode = pe.Node(util.IdentityInterface(fields=['spatial_map', 'spatial_map_file', 'creds_path', 'dl_dir'], mandatory_inputs=True), name='inputspec') spatial_map_keys, spatial_map_values = \ zip(*spatial_map_dict.items()) inputnode.synchronize = True inputnode.iterables = [ ('spatial_map', spatial_map_keys), ('spatial_map_file', spatial_map_values), ] check_s3_node = pe.Node(function.Function(input_names=['file_path', 'creds_path', 'dl_dir', 'img_type'], output_names=['local_path'], function=check_for_s3, as_module=True), name='check_for_s3') wf.connect(inputnode, 'spatial_map_file', check_s3_node, 'file_path') wf.connect(inputnode, 'creds_path', check_s3_node, 'creds_path') wf.connect(inputnode, 'dl_dir', check_s3_node, 'dl_dir') check_s3_node.inputs.img_type = 'mask' select_spatial_map = pe.Node(util.IdentityInterface(fields=['out_file', 'out_name'], mandatory_inputs=True), name='select_spatial_map') wf.connect(check_s3_node, 'local_path', select_spatial_map, 'out_file') wf.connect(inputnode, 'spatial_map', select_spatial_map, 'out_name') return wf
[docs]def create_grp_analysis_dataflow(wf_name='gp_dataflow'): from CPAC.pipeline import nipype_pipeline_engine as pe import nipype.interfaces.utility as util from CPAC.utils.datasource import select_model_files wf = pe.Workflow(name=wf_name) inputnode = pe.Node(util.IdentityInterface(fields=['ftest', 'grp_model', 'model_name'], mandatory_inputs=True), name='inputspec') selectmodel = pe.Node(function.Function(input_names=['model', 'ftest', 'model_name'], output_names=['fts_file', 'con_file', 'grp_file', 'mat_file'], function=select_model_files, as_module=True), name='selectnode') wf.connect(inputnode, 'ftest', selectmodel, 'ftest') wf.connect(inputnode, 'grp_model', selectmodel, 'model') wf.connect(inputnode, 'model_name', selectmodel, 'model_name') outputnode = pe.Node(util.IdentityInterface(fields=['fts', 'grp', 'mat', 'con'], mandatory_inputs=True), name='outputspec') wf.connect(selectmodel, 'mat_file', outputnode, 'mat') wf.connect(selectmodel, 'grp_file', outputnode, 'grp') wf.connect(selectmodel, 'fts_file', outputnode, 'fts') wf.connect(selectmodel, 'con_file', outputnode, 'con') return wf
[docs]def resample_func_roi(in_func, in_roi, realignment, identity_matrix): import os import nibabel as nb from CPAC.utils.monitoring.custom_logging import log_subprocess # load func and ROI dimension func_img = nb.load(in_func) func_shape = func_img.shape roi_img = nb.load(in_roi) roi_shape = roi_img.shape # check if func size = ROI size, return func and ROI; else resample using flirt if roi_shape != func_shape: # resample func to ROI: in_file = func, reference = ROI if 'func_to_ROI' in realignment: in_file = in_func reference = in_roi out_file = os.path.join(os.getcwd(), in_file[in_file.rindex( '/') + 1:in_file.rindex('.nii')] + '_resampled.nii.gz') out_func = out_file out_roi = in_roi interp = 'trilinear' # resample ROI to func: in_file = ROI, reference = func elif 'ROI_to_func' in realignment: in_file = in_roi reference = in_func out_file = os.path.join(os.getcwd(), in_file[in_file.rindex( '/') + 1:in_file.rindex('.nii')] + '_resampled.nii.gz') out_func = in_func out_roi = out_file interp = 'nearestneighbour' cmd = ['flirt', '-in', in_file, '-ref', reference, '-out', out_file, '-interp', interp, '-applyxfm', '-init', identity_matrix] log_subprocess(cmd) else: out_func = in_func out_roi = in_roi return out_func, out_roi