# Copyright (C) 2012-2023 C-PAC Developers
# This file is part of C-PAC.
# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
import csv
import json
import re
from pathlib import Path
from typing import Union
from nipype import logging
from nipype.interfaces import utility as util
from CPAC.pipeline import nipype_pipeline_engine as pe
from CPAC.resources.templates.lookup_table import format_identifier, \
lookup_identifier
from CPAC.utils import function
from CPAC.utils.bids_utils import bids_remove_entity
from CPAC.utils.interfaces.function import Function
from CPAC.utils.typing import TUPLE
from CPAC.utils.utils import get_scan_params
logger = logging.getLogger('nipype.workflow')
def bidsier_prefix(unique_id):
    """
    Build a BIDS-style ``sub-<label>_ses-<label>`` prefix from an ID.

    The ID is split on underscores. The first piece is treated as the
    subject and the second as the session; a piece that does not already
    start with its entity key (``sub`` / ``ses``) gets ``<key>-`` prepended.
    Any further pieces are passed through untouched.

    Parameters
    ----------
    unique_id : str

    Returns
    -------
    prefix : str

    Examples
    --------
    >>> bidsier_prefix('01_1')
    'sub-01_ses-1'
    >>> bidsier_prefix('sub-01_ses-1')
    'sub-01_ses-1'
    >>> bidsier_prefix('sub-01_1')
    'sub-01_ses-1'
    >>> bidsier_prefix('01_ses-1')
    'sub-01_ses-1'
    """
    entity_keys = ('sub', 'ses')
    parts = unique_id.split('_')
    # zip() stops at the shorter sequence, so only the first two pieces
    # (at most) are ever rewritten
    for position, (key, part) in enumerate(zip(entity_keys, parts)):
        if not part.startswith(key):
            parts[position] = f'{key}-{part}'
    return '_'.join(parts)
def get_rest(scan, rest_dict, resource="scan"):
    """Look up one resource of one scan in the functional file dictionary.

    Parameters
    ----------
    scan : str
        the scan/series name or label
    rest_dict : dict
        the dictionary read in from the data configuration YAML file
        (sublist) nested under 'func:'
    resource : str
        the dictionary key to fetch for that scan, e.g.

        - ``scan``: the functional timeseries
        - ``scan_parameters``: path to the scan parameters JSON file, or
          a dictionary containing scan parameters information (to be
          phased out in the future)

    Returns
    -------
    The stored value, or ``None`` when either the scan or the resource
    key is absent.
    """
    try:
        scan_entry = rest_dict[scan]
        return scan_entry[resource]
    except KeyError:
        # a missing scan or a missing resource is reported the same way
        return None
def get_map(map, map_dct):
    """Return the spatial map stored under key ``map`` in ``map_dct``.

    No default is applied: a missing key propagates whatever the lookup
    raises (``KeyError`` for a plain ``dict``).
    """
    selected_map = map_dct[map]
    return selected_map
def select_model_files(model, ftest, model_name):
    """Select the FSL model files for a group-analysis model.

    Every entry directly inside the ``model`` directory is examined and
    matched by substring against ``<model_name>.mat``, ``.grp``, ``.fts``
    and ``.con``.

    Parameters
    ----------
    model : str
        directory containing the model files
    ftest : bool
        whether the model includes f-tests; the ``.fts`` file is only
        picked up (and required) when this is truthy
    model_name : str
        basename shared by the model files

    Returns
    -------
    fts_file, con_file, grp_file, mat_file : str
        ``fts_file`` is ``''`` when ``ftest`` is falsy.

    Raises
    ------
    Exception
        If the directory is empty, if f-tests were requested but no
        ``.fts`` file exists, or if the ``.con``, ``.grp`` or ``.mat``
        file is missing.
    """
    import os
    import glob

    files = glob.glob(os.path.join(model, '*'))
    if not files:
        raise Exception("No files found inside directory %s" % model)

    fts_file = ''
    # start every required file as "not found" so that a missing one is
    # reported explicitly instead of surfacing as an UnboundLocalError
    mat_file = grp_file = con_file = None
    for filename in files:
        if (model_name + '.mat') in filename:
            mat_file = filename
        elif (model_name + '.grp') in filename:
            grp_file = filename
        elif ((model_name + '.fts') in filename) and ftest:
            fts_file = filename
        elif (model_name + '.con') in filename:
            con_file = filename

    if ftest and fts_file == '':
        errmsg = "\n[!] CPAC says: You have f-tests included in your group " \
                 "analysis model '%s', but no .fts files were found in the " \
                 "output folder specified for group analysis: %s.\n\nThe " \
                 ".fts file is automatically generated by CPAC, and if you " \
                 "are seeing this error, it is because something went wrong " \
                 "with the generation of this file, or it has been moved." \
                 "\n\n" % (model_name, model)
        raise Exception(errmsg)

    missing = [extension for extension, found in (('.con', con_file),
                                                  ('.grp', grp_file),
                                                  ('.mat', mat_file))
               if found is None]
    if missing:
        raise Exception("\n[!] CPAC says: The group analysis model '%s' is "
                        "missing its %s file(s) in the folder: %s\n\n"
                        % (model_name, ', '.join(missing), model))

    return fts_file, con_file, grp_file, mat_file
def check_func_scan(func_scan_dct, scan):
    """Validate the data-configuration entry of one functional scan.

    Three things are checked, in order: the entry for ``scan`` is a
    mapping (i.e. the configuration has the extra nesting level), it has
    a ``'scan'`` key, and the scan label contains none of ``.``, ``+``
    or ``*``.

    Parameters
    ----------
    func_scan_dct : dict
        mapping of scan label to that scan's resources
    scan : str
        the series/scan name or label to check

    Raises
    ------
    Exception
        If any of the checks fails.
    """
    scan_resources = func_scan_dct[scan]

    try:
        resource_names = scan_resources.keys()
    except AttributeError:
        raise Exception(
            "\n[!] The data configuration file you provided is "
            "missing a level under the 'func:' key. CPAC versions "
            "1.2 and later use data configurations with an "
            "additional level of nesting.\n\nExample\nfunc:\n "
            "rest01:\n scan: /path/to/rest01_func.nii.gz\n"
            " scan parameters: /path/to/scan_params.json\n\n"
            "See the User Guide for more information.\n\n")

    # actual 4D time series file
    if "scan" not in resource_names:
        raise Exception(
            "\n\n[!] The {0} scan is missing its actual time-series "
            "scan file, which should be a filepath labeled with the "
            "'scan' key.\n\n".format(scan))

    # Nipype restriction (may have changed)
    if any(character in scan for character in ('.', '+', '*')):
        raise Exception('\n\n[!] Scan names cannot contain any special '
                        'characters (., +, *, etc.). Please update this '
                        'and try again.\n\nScan: {0}'
                        '\n\n'.format(scan))
def create_func_datasource(rest_dict, rpool, wf_name='func_datasource'):
    """Build a workflow that resolves the functional files of a scan.

    The workflow looks up the timeseries file and the scan parameters of
    the scan given on ``inputnode.scan`` in ``rest_dict`` and passes each
    through ``check_for_s3`` so that S3 locations end up as local paths.

    Scan input (from inputnode) is an iterable.

    Parameters
    ----------
    rest_dict : dict
        dictionary of functional files described in the data
        configuration (sublist) YAML file
    rpool
        resource pool; only ``rpool.check_rpool('derivatives-dir')`` is
        consulted, to decide whether the configuration check runs
    wf_name : str

    Returns
    -------
    wf : the assembled workflow

    Workflow inputs (``inputnode``): subject, scan, creds_path, dl_dir
    Workflow outputs (``outputspec``): subject, rest, scan, scan_params
    (``phase_diff`` and ``magnitude`` are declared but not connected here)
    """
    from CPAC.pipeline import nipype_pipeline_engine as pe
    import nipype.interfaces.utility as util

    wf = pe.Workflow(name=wf_name)

    inputnode = pe.Node(
        util.IdentityInterface(
            fields=['subject', 'scan', 'creds_path', 'dl_dir'],
            mandatory_inputs=True),
        name='inputnode')

    outputnode = pe.Node(
        util.IdentityInterface(
            fields=['subject', 'rest', 'scan', 'scan_params',
                    'phase_diff', 'magnitude']),
        name='outputspec')

    # have this here for now because of the big change in the data
    # configuration format
    # (Not necessary with ingress - format does not comply)
    if not rpool.check_rpool('derivatives-dir'):
        check_scan = pe.Node(
            function.Function(input_names=['func_scan_dct', 'scan'],
                              output_names=[],
                              function=check_func_scan,
                              as_module=True),
            name='check_func_scan')
        check_scan.inputs.func_scan_dct = rest_dict
        wf.connect(inputnode, 'scan', check_scan, 'scan')

    def _lookup_node(node_name, resource):
        """Node that fetches ``resource`` of the current scan."""
        node = pe.Node(
            function.Function(input_names=['scan', 'rest_dict', 'resource'],
                              output_names=['file_path'],
                              function=get_rest,
                              as_module=True),
            name=node_name)
        node.inputs.rest_dict = rest_dict
        node.inputs.resource = resource
        wf.connect(inputnode, 'scan', node, 'scan')
        return node

    def _s3_node(node_name, source):
        """Node that localises the ``file_path`` produced by ``source``."""
        node = pe.Node(
            function.Function(input_names=['file_path', 'creds_path',
                                           'dl_dir', 'img_type'],
                              output_names=['local_path'],
                              function=check_for_s3,
                              as_module=True),
            name=node_name)
        wf.connect(source, 'file_path', node, 'file_path')
        wf.connect(inputnode, 'creds_path', node, 'creds_path')
        wf.connect(inputnode, 'dl_dir', node, 'dl_dir')
        return node

    # the functional scan itself: if it is on an Amazon AWS S3 bucket it
    # gets downloaded, otherwise the local file path is returned
    selectrest = _lookup_node('selectrest', "scan")
    check_s3_node = _s3_node('check_for_s3', selectrest)
    check_s3_node.inputs.img_type = 'func'

    wf.connect(inputnode, 'subject', outputnode, 'subject')
    wf.connect(check_s3_node, 'local_path', outputnode, 'rest')
    wf.connect(inputnode, 'scan', outputnode, 'scan')

    # scan parameters, likewise downloaded if they live on AWS S3
    select_scan_params = _lookup_node('select_scan_params',
                                      "scan_parameters")
    s3_scan_params = _s3_node('s3_scan_params', select_scan_params)
    wf.connect(s3_scan_params, 'local_path', outputnode, 'scan_params')

    return wf
def create_fmap_datasource(fmap_dct, wf_name='fmap_datasource'):
    """Build a workflow that resolves the files of one field map.

    The field map named on ``inputnode.scan`` is looked up in
    ``fmap_dct``; both its image and its scan parameters are passed
    through ``check_for_s3`` so that S3 locations become local paths.

    Parameters
    ----------
    fmap_dct : dict
        field map entries from the data configuration (sublist) YAML file
    wf_name : str

    Returns
    -------
    wf : the assembled workflow

    Workflow inputs (``inputnode``): subject, scan, creds_path, dl_dir
    Workflow outputs (``outputspec``): subject, rest, scan, scan_params
    (``phase_diff`` and ``magnitude`` are declared but not connected here)
    """
    from CPAC.pipeline import nipype_pipeline_engine as pe
    import nipype.interfaces.utility as util

    lookup_inputs = ('scan', 'rest_dict', 'resource')
    s3_inputs = ('file_path', 'creds_path', 'dl_dir', 'img_type')

    wf = pe.Workflow(name=wf_name)

    inputnode = pe.Node(
        util.IdentityInterface(
            fields=['subject', 'scan', 'creds_path', 'dl_dir'],
            mandatory_inputs=True),
        name='inputnode')

    outputnode = pe.Node(
        util.IdentityInterface(
            fields=['subject', 'rest', 'scan', 'scan_params',
                    'phase_diff', 'magnitude']),
        name='outputspec')

    # the field map image
    selectrest = pe.Node(
        function.Function(input_names=list(lookup_inputs),
                          output_names=['file_path'],
                          function=get_rest,
                          as_module=True),
        name='selectrest')
    selectrest.inputs.rest_dict = fmap_dct
    selectrest.inputs.resource = "scan"

    # check to see if it's on an Amazon AWS S3 bucket, and download it, if it
    # is - otherwise, just return the local file path
    check_s3_node = pe.Node(
        function.Function(input_names=list(s3_inputs),
                          output_names=['local_path'],
                          function=check_for_s3,
                          as_module=True),
        name='check_for_s3')
    check_s3_node.inputs.img_type = 'other'

    # scan parameters
    select_scan_params = pe.Node(
        function.Function(input_names=list(lookup_inputs),
                          output_names=['file_path'],
                          function=get_rest,
                          as_module=True),
        name='select_scan_params')
    select_scan_params.inputs.rest_dict = fmap_dct
    select_scan_params.inputs.resource = "scan_parameters"

    # if the scan parameters file is on AWS S3, download it
    s3_scan_params = pe.Node(
        function.Function(input_names=list(s3_inputs),
                          output_names=['local_path'],
                          function=check_for_s3,
                          as_module=True),
        name='s3_scan_params')

    wf.connect([
        (inputnode, selectrest, [('scan', 'scan')]),
        (selectrest, check_s3_node, [('file_path', 'file_path')]),
        (inputnode, check_s3_node, [('creds_path', 'creds_path'),
                                    ('dl_dir', 'dl_dir')]),
        (inputnode, outputnode, [('subject', 'subject'),
                                 ('scan', 'scan')]),
        (check_s3_node, outputnode, [('local_path', 'rest')]),
        (inputnode, select_scan_params, [('scan', 'scan')]),
        (select_scan_params, s3_scan_params, [('file_path', 'file_path')]),
        (inputnode, s3_scan_params, [('creds_path', 'creds_path'),
                                     ('dl_dir', 'dl_dir')]),
        (s3_scan_params, outputnode, [('local_path', 'scan_params')]),
    ])

    return wf
@Function.sig_imports(['from CPAC.utils.typing import TUPLE'])
def calc_delta_te_and_asym_ratio(effective_echo_spacing: float,
                                 echo_times: list) -> TUPLE[float, float]:
    """Calculate ``deltaTE`` and ``ees_asym_ratio`` from given metadata

    Parameters
    ----------
    effective_echo_spacing : float
        EffectiveEchoSpacing from sidecar JSON

    echo_times : list
        the two echo times; only the first two items are read, and the
        list itself is left unmodified

    Returns
    -------
    deltaTE : float
        absolute difference between the two echo times, after both were
        scaled by 1000 when both are below 10 once scaled

    ees_asym_ratio : float
        ``effective_echo_spacing / deltaTE``

    Raises
    ------
    LookupError
        If ``effective_echo_spacing`` is not a ``float`` (e.g. it was not
        found in the metadata).
    """
    if not isinstance(effective_echo_spacing, float):
        raise LookupError('C-PAC could not find `EffectiveEchoSpacing` in '
                          'either fmap or func sidecar JSON, but that field '
                          'is required for PhaseDiff distortion correction.')

    # work on local copies so the caller's list keeps its original units
    first_te, second_te = echo_times[0], echo_times[1]

    # convert into milliseconds if necessary
    # these values will/should never be more than 10ms
    if ((first_te * 1000) < 10) and ((second_te * 1000) < 10):
        first_te = first_te * 1000
        second_te = second_te * 1000

    deltaTE = abs(first_te - second_te)
    ees_asym_ratio = (effective_echo_spacing / deltaTE)
    return deltaTE, ees_asym_ratio
def gather_echo_times(echotime_1, echotime_2=None, echotime_3=None,
                      echotime_4=None):
    """Collect the distinct echo times among up to four candidates.

    ``None`` entries are ignored and duplicates are collapsed; exactly
    two distinct values must remain.

    Returns
    -------
    list
        the two distinct echo times (order not guaranteed, since they
        pass through a ``set``)

    Raises
    ------
    Exception
        If the number of distinct echo times is not two.
    """
    provided = [echotime for echotime
                in [echotime_1, echotime_2, echotime_3, echotime_4]
                if echotime is not None]
    echotime_list = list(set(provided))
    if len(echotime_list) == 2:
        return echotime_list
    raise Exception("\n[!] Something went wrong with the field map echo "
                    "times - there should be two distinct values.\n\n"
                    f"Echo Times:\n{echotime_list}\n")
def match_epi_fmaps(bold_pedir, epi_fmap_one, epi_fmap_params_one,
                    epi_fmap_two=None, epi_fmap_params_two=None):
    """Sort EPI field maps by phase-encoding direction relative to the BOLD.

    Parse the field map files in the data configuration and determine which
    ones have the same and opposite phase-encoding directions as the BOLD scan
    in the current pipeline.

    Example - parse the files under the 'fmap' level, i.e. 'epi_AP':
        anat: /path/to/T1w.nii.gz
        fmap:
          epi_AP:
            scan: /path/to/field-map.nii.gz
            scan_parameters: <config dictionary containing phase-encoding
                              direction>
        func:
          rest_1:
            scan: /path/to/bold.nii.gz
            scan_parameters: <config dictionary of BOLD scan parameters>

    An EPI whose ``PhaseEncodingDirection`` equals ``bold_pedir`` is the
    "same" one; an EPI that differs but shares the first character (the
    axis letter) is the "opposite" one. Scan parameters that are not a
    dictionary and contain ``.json`` are read as a JSON file path.

    Returns
    -------
    (opposite_pe_epi, same_pe_epi) : tuple
        each item is the matching EPI or ``None``
    """
    candidates = {epi_fmap_one: epi_fmap_params_one}
    # the second EPI is only considered when both it and its parameters
    # were supplied
    if epi_fmap_two and epi_fmap_params_two:
        candidates[epi_fmap_two] = epi_fmap_params_two

    opposite_pe_epi = same_pe_epi = None

    for epi_scan, scan_params in candidates.items():
        if not isinstance(scan_params, dict) and ".json" in scan_params:
            with open(scan_params, 'r') as sidecar:
                scan_params = json.load(sidecar)
        if "PhaseEncodingDirection" not in scan_params:
            continue
        epi_pedir = scan_params["PhaseEncodingDirection"]
        if epi_pedir == bold_pedir:
            same_pe_epi = epi_scan
        elif epi_pedir[0] == bold_pedir[0]:
            opposite_pe_epi = epi_scan

    return (opposite_pe_epi, same_pe_epi)
def create_general_datasource(wf_name):
    """Build a workflow that localises one arbitrary data file.

    ``inputnode.data`` is passed through ``check_for_s3`` (with
    ``img_type='other'``); ``unique_id`` and ``scan`` are forwarded
    unchanged.

    Workflow inputs (``inputnode``): unique_id, data, scan, creds_path,
    dl_dir
    Workflow outputs (``outputspec``): unique_id, data, scan

    Returns
    -------
    wf : the assembled workflow
    """
    from CPAC.pipeline import nipype_pipeline_engine as pe
    import nipype.interfaces.utility as util

    wf = pe.Workflow(name=wf_name)

    inputnode = pe.Node(
        util.IdentityInterface(
            fields=['unique_id', 'data', 'scan', 'creds_path', 'dl_dir'],
            mandatory_inputs=True),
        name='inputnode')

    s3_interface = function.Function(
        input_names=['file_path', 'creds_path', 'dl_dir', 'img_type'],
        output_names=['local_path'],
        function=check_for_s3,
        as_module=True)
    check_s3_node = pe.Node(s3_interface, name='check_for_s3')
    check_s3_node.inputs.img_type = "other"

    outputnode = pe.Node(
        util.IdentityInterface(fields=['unique_id', 'data', 'scan']),
        name='outputspec')

    wf.connect([
        (inputnode, check_s3_node, [('data', 'file_path'),
                                    ('creds_path', 'creds_path'),
                                    ('dl_dir', 'dl_dir')]),
        (inputnode, outputnode, [('unique_id', 'unique_id'),
                                 ('scan', 'scan')]),
        (check_s3_node, outputnode, [('local_path', 'data')]),
    ])

    return wf
def create_check_for_s3_node(name, file_path, img_type='other',
                             creds_path=None, dl_dir=None, map_node=False):
    """Create a stand-alone node wrapping ``check_for_s3``.

    Parameters
    ----------
    name : str
        suffix of the node name (``check_for_s3_<name>``)
    file_path, img_type, creds_path, dl_dir
        set directly as the node's inputs
    map_node : bool
        when true, a ``MapNode`` iterating over ``file_path`` is created
        instead of a plain ``Node``

    Returns
    -------
    check_s3_node : the configured node
    """
    s3_interface = function.Function(
        input_names=['file_path', 'creds_path', 'dl_dir', 'img_type'],
        output_names=['local_path'],
        function=check_for_s3,
        as_module=True)
    node_name = 'check_for_s3_%s' % name

    if map_node:
        check_s3_node = pe.MapNode(s3_interface,
                                   iterfield=['file_path'],
                                   name=node_name)
    else:
        check_s3_node = pe.Node(s3_interface, name=node_name)

    check_s3_node.inputs.set(file_path=file_path,
                             creds_path=creds_path,
                             dl_dir=dl_dir,
                             img_type=img_type)

    return check_s3_node
# Check if passed-in file is on S3
def check_for_s3(file_path, creds_path=None, dl_dir=None, img_type='other',
                 verbose=False):
    """Return a local path for ``file_path``, downloading from S3 if needed.

    Parameters
    ----------
    file_path : str, dict or None
        local path or ``s3://bucket/key`` location. ``None`` is returned
        as ``None``; a dictionary (scan parameters given inline) is
        returned unchanged.
    creds_path : str, optional
        path to the AWS credentials; values containing ``None``, ``none``
        or ``null`` are treated as "no credentials"
    dl_dir : str, optional
        directory to download into (defaults to the current working
        directory); files are stored under ``<dl_dir>/<bucket>/<key>``
    img_type : str
        ``'anat'`` requires a 3-dimensional NIfTI, ``'func'`` a 3- or
        4-dimensional one; other values skip the dimensionality check
    verbose : bool
        print the resolved local path

    Returns
    -------
    local_path : str, dict or None

    Raises
    ------
    Exception
        If the S3 download fails.
    FileNotFoundError
        If the resolved local path does not exist.
    IOError
        If a NIfTI image has the wrong number of dimensions for
        ``img_type``.
    """
    # Import packages
    import os
    import nibabel as nib
    import botocore.exceptions
    from indi_aws import fetch_creds

    # Init variables
    s3_str = 's3://'
    if creds_path:
        if "None" in creds_path or "none" in creds_path or \
                "null" in creds_path:
            creds_path = None

    if dl_dir is None:
        dl_dir = os.getcwd()

    if file_path is None:
        # in case it's something like scan parameters or field map files, but
        # we don't have any
        return None

    # TODO: remove this once scan parameter input as dictionary is phased out
    if isinstance(file_path, dict):
        # if this is a dictionary, just skip altogether
        return file_path

    if file_path.lower().startswith(s3_str):
        # normalise the scheme to lowercase, keep the rest untouched
        file_path = s3_str + file_path[len(s3_str):]

        # Get bucket name and bucket object
        bucket_name = file_path[len(s3_str):].split('/')[0]
        # Extract relative key path from bucket and local path
        s3_prefix = s3_str + bucket_name
        s3_key = file_path[len(s3_prefix) + 1:]
        local_path = os.path.join(dl_dir, bucket_name, s3_key)

        # Get local directory and create folders if they dont exist
        local_dir = os.path.dirname(local_path)
        if not os.path.exists(local_dir):
            os.makedirs(local_dir, exist_ok=True)

        if os.path.exists(local_path):
            print("{0} already exists- skipping download.".format(local_path))
        else:
            # Download file
            try:
                bucket = fetch_creds.return_bucket(creds_path, bucket_name)
                print("Attempting to download from AWS S3: {0}".format(
                    file_path))
                bucket.download_file(Key=s3_key, Filename=local_path)
            except botocore.exceptions.ClientError as exc:
                # The error code is a string that may be an HTTP status
                # ('403') or a symbolic name ('AccessDenied'); compare as
                # text so a symbolic code cannot crash the handler.
                error_code = str(
                    exc.response.get('Error', {}).get('Code', ''))
                if error_code in ('403', 'AccessDenied'):
                    err_msg = 'Access to bucket: "%s" is denied; using credentials ' \
                              'in subject list: "%s"; cannot access the file "%s"' \
                              % (bucket_name, creds_path, file_path)
                elif error_code in ('404', 'NoSuchKey'):
                    err_msg = 'File: {0} does not exist; check spelling and try ' \
                              'again'.format(
                                  os.path.join(bucket_name, s3_key))
                else:
                    err_msg = 'Unable to connect to bucket: "%s". Error message:\n%s' \
                              % (bucket_name, exc)
                raise Exception(err_msg) from exc
            except Exception as exc:
                err_msg = 'Unable to connect to bucket: "%s". Error message:\n%s' \
                          % (bucket_name, exc)
                raise Exception(err_msg) from exc

    # Otherwise just return what was passed in, resolving if a link
    else:
        local_path = os.path.realpath(file_path)

    # Check if it exists or it is successfully downloaded
    if not os.path.exists(local_path):
        # alert users to 2020-07-20 Neuroparc atlas update (v0 to v1)
        ndmg_atlases = {}
        with open(
            os.path.join(
                os.path.dirname(os.path.dirname(__file__)),
                'resources/templates/ndmg_atlases.csv'
            )
        ) as ndmg_atlases_file:
            # two parallel tuples: old (v0) paths and their v1 replacements
            ndmg_atlases['v0'], ndmg_atlases['v1'] = zip(*[(
                f'/ndmg_atlases/label/Human/{atlas[0]}',
                f'/ndmg_atlases/label/Human/{atlas[1]}'
            ) for atlas in csv.reader(ndmg_atlases_file)])
        if local_path in ndmg_atlases['v0']:
            raise FileNotFoundError(
                ''.join([
                    'Neuroparc atlas paths were updated on July 20, 2020. '
                    'C-PAC configuration files using Neuroparc v0 atlas paths '
                    '(including C-PAC default and preconfigured pipeline '
                    'configurations from v1.6.2a and earlier) need to be '
                    'updated to use Neuroparc atlases. Your current '
                    'configuration includes the Neuroparc v0 path '
                    f'{local_path} which needs to be updated to ',
                    ndmg_atlases['v1'][ndmg_atlases['v0'].index(local_path)],
                    '. For a full list such paths, see https://fcp-indi.'
                    'github.io/docs/nightly/user/ndmg_atlases'
                ])
            )
        else:
            raise FileNotFoundError(f'File {local_path} does not exist!')

    if verbose:
        print("Downloaded file:\n{0}\n".format(local_path))

    # Check image dimensionality
    if local_path.endswith('.nii') or local_path.endswith('.nii.gz'):
        img_nii = nib.load(local_path)

        if img_type == 'anat':
            if len(img_nii.shape) != 3:
                raise IOError('File: %s must be an anatomical image with 3 '
                              'dimensions but %d dimensions found!'
                              % (local_path, len(img_nii.shape)))
        elif img_type == 'func':
            if len(img_nii.shape) not in [3, 4]:
                raise IOError('File: %s must be a functional image with 3 or '
                              '4 dimensions but %d dimensions found!'
                              % (local_path, len(img_nii.shape)))

    return local_path
def get_highest_local_res(template: Union[Path, str], tagname: str) -> Path:
    """Given a reference template path and a resolution string, get all
    resolutions of that template in the same local path and return the
    highest resolution.

    The candidates are the files next to ``template`` whose names match
    the template name with ``tagname`` replaced by the resolution
    pattern; the first one in sorted order is returned.

    Parameters
    ----------
    template : Path or str

    tagname : str
        the resolution text inside the template's file name to generalise

    Returns
    -------
    Path

    Raises
    ------
    LookupError
        If no matching local template is found, including when the
        template's directory does not exist.

    Examples
    --------
    >>> get_highest_local_res(
    ...     '/cpac_templates/MacaqueYerkes19_T1w_2mm_brain.nii.gz', '2mm')
    PosixPath('/cpac_templates/MacaqueYerkes19_T1w_0.5mm_brain.nii.gz')
    >>> get_highest_local_res(
    ...     '/cpac_templates/dne_T1w_2mm.nii.gz', '2mm')
    Traceback (most recent call last):
    ...
    LookupError: Could not find template /cpac_templates/dne_T1w_2mm.nii.gz
    """
    from CPAC.pipeline.schema import RESOLUTION_REGEX
    if isinstance(template, str):
        template = Path(template)
    # the resolution regex (without its anchors) is spliced in where the
    # tag sits; everything else in the file name is matched literally
    template_pattern = (
        RESOLUTION_REGEX.replace('^', '').replace('$', '').join([
            re.escape(_part) for _part in template.name.split(tagname, 1)]))
    try:
        # listing the directory must happen inside the ``try`` so that a
        # missing directory is reported as a LookupError as well
        matching_templates = sorted(
            file for file in template.parent.iterdir()
            if re.match(template_pattern, file.name))
        return matching_templates[0]
    except (FileNotFoundError, IndexError) as error:
        raise LookupError(f"Could not find template {template}") from error
def res_string_to_tuple(resolution):
    """
    Converts a resolution string to a tuple of floats.

    Parameters
    ----------
    resolution : str or number
        Resolution string, e.g. "3.438mmx3.438mmx3.4mm". A value without
        an ``x`` separator (e.g. "2mm", or a bare number such as ``2``)
        is used for all three dimensions.

    Returns
    -------
    resolution : tuple
        Tuple of floats, e.g. (3.438, 3.438, 3.4)
    """
    # normalise once so that numeric input works in both branches
    resolution = str(resolution)
    if "x" in resolution:
        return tuple(
            float(i.replace('mm', '')) for i in resolution.split("x"))
    return (float(resolution.replace('mm', '')),) * 3
def resolve_resolution(resolution, template, template_name, tag=None):
    """Return a template file at the requested resolution.

    When ``template`` contains a ``${<tag>}`` placeholder, the template
    at exactly ``resolution`` is tried first and returned if it can be
    resolved. Otherwise a reference template is chosen (the ``1mm``
    variant on S3, the best local variant, or ``template`` itself when
    there is no placeholder) and resampled with AFNI to ``resolution``.

    Parameters
    ----------
    resolution : str
        e.g. "3.438mmx3.438mmx3.4mm"
    template : str
        template path, optionally containing ``${<tag>}``
    template_name : str
        name given to the resampling node
    tag : str, optional
        name of the placeholder inside ``template``

    Returns
    -------
    local_path : path of the resolved or resampled template
    """
    from nipype.interfaces import afni
    from CPAC.pipeline import nipype_pipeline_engine as pe
    from CPAC.utils.datasource import check_for_s3

    has_placeholder = "{" in template and tag is not None
    tagname = "${" + tag + "}" if has_placeholder else None

    # first choice: the template already exists at the exact resolution
    # TODO debug - it works in ipython but doesn't work in nipype wf
    local_path = None
    if tagname is not None:
        try:
            local_path = check_for_s3(
                template.replace(tagname, str(resolution)))
        except (IOError, OSError):
            local_path = None
    if local_path is not None:
        return local_path

    # fallback: pick a reference image to resample from
    on_s3 = template.startswith('s3:')
    if tagname is not None and on_s3:
        reference = check_for_s3(template.replace(tagname, '1mm'))
    elif tagname is not None:
        reference = get_highest_local_res(template, tagname)
    elif on_s3:
        reference = check_for_s3(template)
    else:
        reference = template

    resample = pe.Node(interface=afni.Resample(),
                       name=template_name,
                       mem_gb=0,
                       mem_x=(0.0115, 'in_file', 't'))
    resample.inputs.voxel_size = res_string_to_tuple(resolution)
    resample.inputs.outputtype = 'NIFTI_GZ'
    resample.inputs.resample_mode = 'Cu'
    resample.inputs.in_file = reference
    resample.base_dir = '.'

    # the node is executed immediately rather than added to a workflow
    resampled_template = resample.run()
    return resampled_template.outputs.out_file
def create_anat_datasource(wf_name='anat_datasource'):
    """Build a workflow that localises the anatomical image.

    ``inputnode.anat`` is passed through ``check_for_s3`` together with
    the credentials, download directory and image type given on the
    input node.

    Workflow inputs (``inputnode``): subject, anat, creds_path, dl_dir,
    img_type
    Workflow outputs (``outputspec``): subject, anat

    Returns
    -------
    wf : the assembled workflow
    """
    from CPAC.pipeline import nipype_pipeline_engine as pe
    import nipype.interfaces.utility as util

    wf = pe.Workflow(name=wf_name)

    inputnode = pe.Node(
        util.IdentityInterface(
            fields=['subject', 'anat', 'creds_path', 'dl_dir', 'img_type'],
            mandatory_inputs=True),
        name='inputnode')

    s3_interface = function.Function(
        input_names=['file_path', 'creds_path', 'dl_dir', 'img_type'],
        output_names=['local_path'],
        function=check_for_s3,
        as_module=True)
    check_s3_node = pe.Node(s3_interface, name='check_for_s3')

    outputnode = pe.Node(
        util.IdentityInterface(fields=['subject', 'anat']),
        name='outputspec')

    wf.connect([
        (inputnode, check_s3_node, [('anat', 'file_path'),
                                    ('creds_path', 'creds_path'),
                                    ('dl_dir', 'dl_dir'),
                                    ('img_type', 'img_type')]),
        (inputnode, outputnode, [('subject', 'subject')]),
        (check_s3_node, outputnode, [('local_path', 'anat')]),
    ])

    return wf
def create_roi_mask_dataflow(masks, wf_name='datasource_roi_mask'):
    """Build a workflow that iterates over ROI masks / atlases.

    Each entry of ``masks`` gets a unique name: entries that
    ``lookup_identifier`` reports as ``'template'`` are named after their
    file name (NIfTI extension and the ``res`` / ``space`` entities
    removed), all others via ``format_identifier``. Blank entries and
    entries starting with ``#`` are skipped.

    Parameters
    ----------
    masks : iterable of str
        mask file locations
    wf_name : str

    Returns
    -------
    wf : the assembled workflow

    Raises
    ------
    ValueError
        If a file-named mask is not ``.nii`` / ``.nii.gz``, or if two
        masks resolve to the same name.

    Workflow inputs (``inputspec``): mask, mask_file (both iterables, kept
    in step), creds_path, dl_dir
    Workflow outputs (``outputspec``): out_file, out_name
    """
    import os

    mask_dict = {}

    for raw_entry in masks:
        mask_file = raw_entry.rstrip('\r\n')

        if mask_file.strip() == '' or mask_file.startswith('#'):
            continue

        name, desc = lookup_identifier(mask_file)

        if name != 'template':
            base_name = format_identifier(name, desc)
        else:
            base_file = os.path.basename(mask_file)
            try:
                # empty when the extension is unsupported, which makes
                # the [0] below raise IndexError
                stems = [base_file[:-len(ext)]
                         for ext in ['.nii', '.nii.gz']
                         if base_file.endswith(ext)]
                base_name = stems[0]
                for key in ['res', 'space']:
                    base_name = bids_remove_entity(base_name, key)
            except IndexError:
                # pylint: disable=raise-missing-from
                raise ValueError('Error in spatial_map_dataflow: File '
                                 f'extension of {base_file} not ".nii" or '
                                 '.nii.gz')

        if base_name in mask_dict:
            raise ValueError('Duplicate templates/atlases not allowed: '
                             f'{mask_file} {mask_dict[base_name]}')

        mask_dict[base_name] = mask_file

    wf = pe.Workflow(name=wf_name)

    inputnode = pe.Node(
        util.IdentityInterface(
            fields=['mask', 'mask_file', 'creds_path', 'dl_dir'],
            mandatory_inputs=True),
        name='inputspec')

    # names and files are iterated together, one pair per iteration
    mask_keys, mask_values = zip(*mask_dict.items())
    inputnode.synchronize = True
    inputnode.iterables = [('mask', mask_keys),
                           ('mask_file', mask_values)]

    s3_interface = function.Function(
        input_names=['file_path', 'creds_path', 'dl_dir', 'img_type'],
        output_names=['local_path'],
        function=check_for_s3,
        as_module=True)
    check_s3_node = pe.Node(s3_interface, name='check_for_s3')
    check_s3_node.inputs.img_type = 'mask'

    outputnode = pe.Node(
        util.IdentityInterface(fields=['out_file', 'out_name']),
        name='outputspec')

    wf.connect([
        (inputnode, check_s3_node, [('mask_file', 'file_path'),
                                    ('creds_path', 'creds_path'),
                                    ('dl_dir', 'dl_dir')]),
        (check_s3_node, outputnode, [('local_path', 'out_file')]),
        (inputnode, outputnode, [('mask', 'out_name')]),
    ])

    return wf
def create_spatial_map_dataflow(spatial_maps, wf_name='datasource_maps'):
    """Build a workflow that iterates over spatial map files.

    Each map is named after its file name without the ``.nii`` /
    ``.nii.gz`` extension and passed through ``check_for_s3``.

    Parameters
    ----------
    spatial_maps : iterable of str
        spatial map file locations
    wf_name : str

    Returns
    -------
    wf : the assembled workflow

    Raises
    ------
    ValueError
        If two files share the same name.
    Exception
        If a file is neither ``.nii`` nor ``.nii.gz``.

    Workflow inputs (``inputspec``): spatial_map, spatial_map_file (both
    iterables, kept in step), creds_path, dl_dir
    Results are exposed on the ``select_spatial_map`` node: out_file,
    out_name
    """
    import os

    wf = pe.Workflow(name=wf_name)

    spatial_map_dict = {}

    for raw_entry in spatial_maps:
        spatial_map_file = raw_entry.rstrip('\r\n')
        base_file = os.path.basename(spatial_map_file)

        try:
            # empty when the extension is unsupported, which makes the
            # [0] raise IndexError
            base_name = [base_file[:-len(ext)]
                         for ext in ['.nii', '.nii.gz']
                         if base_file.endswith(ext)][0]
        except IndexError:
            raise Exception('Error in spatial_map_dataflow: '
                            'File extension not in .nii and .nii.gz')

        if base_name in spatial_map_dict:
            raise ValueError(
                'Files with same name not allowed: %s %s' % (
                    spatial_map_file,
                    spatial_map_dict[base_name]
                )
            )

        spatial_map_dict[base_name] = spatial_map_file

    inputnode = pe.Node(
        util.IdentityInterface(
            fields=['spatial_map', 'spatial_map_file', 'creds_path',
                    'dl_dir'],
            mandatory_inputs=True),
        name='inputspec')

    # names and files are iterated together, one pair per iteration
    spatial_map_keys, spatial_map_values = zip(*spatial_map_dict.items())
    inputnode.synchronize = True
    inputnode.iterables = [('spatial_map', spatial_map_keys),
                           ('spatial_map_file', spatial_map_values)]

    s3_interface = function.Function(
        input_names=['file_path', 'creds_path', 'dl_dir', 'img_type'],
        output_names=['local_path'],
        function=check_for_s3,
        as_module=True)
    check_s3_node = pe.Node(s3_interface, name='check_for_s3')
    check_s3_node.inputs.img_type = 'mask'

    select_spatial_map = pe.Node(
        util.IdentityInterface(fields=['out_file', 'out_name'],
                               mandatory_inputs=True),
        name='select_spatial_map')

    wf.connect([
        (inputnode, check_s3_node, [('spatial_map_file', 'file_path'),
                                    ('creds_path', 'creds_path'),
                                    ('dl_dir', 'dl_dir')]),
        (check_s3_node, select_spatial_map, [('local_path', 'out_file')]),
        (inputnode, select_spatial_map, [('spatial_map', 'out_name')]),
    ])

    return wf
def create_grp_analysis_dataflow(wf_name='gp_dataflow'):
    """Build a workflow that picks the model files of a group analysis.

    A single node runs ``select_model_files`` on the model directory and
    its four results are exposed on the output node.

    Workflow inputs (``inputspec``): ftest, grp_model, model_name
    Workflow outputs (``outputspec``): fts, grp, mat, con

    Returns
    -------
    wf : the assembled workflow
    """
    from CPAC.pipeline import nipype_pipeline_engine as pe
    import nipype.interfaces.utility as util
    from CPAC.utils.datasource import select_model_files

    wf = pe.Workflow(name=wf_name)

    inputnode = pe.Node(
        util.IdentityInterface(fields=['ftest', 'grp_model', 'model_name'],
                               mandatory_inputs=True),
        name='inputspec')

    select_interface = function.Function(
        input_names=['model', 'ftest', 'model_name'],
        output_names=['fts_file', 'con_file', 'grp_file', 'mat_file'],
        function=select_model_files,
        as_module=True)
    selectmodel = pe.Node(select_interface, name='selectnode')

    outputnode = pe.Node(
        util.IdentityInterface(fields=['fts', 'grp', 'mat', 'con'],
                               mandatory_inputs=True),
        name='outputspec')

    wf.connect([
        (inputnode, selectmodel, [('ftest', 'ftest'),
                                  ('grp_model', 'model'),
                                  ('model_name', 'model_name')]),
        (selectmodel, outputnode, [('mat_file', 'mat'),
                                   ('grp_file', 'grp'),
                                   ('fts_file', 'fts'),
                                   ('con_file', 'con')]),
    ])

    return wf
def resample_func_roi(in_func, in_roi, realignment, identity_matrix):
    """Bring a functional image and an ROI image onto the same grid.

    When the two images already have the same shape, both paths are
    returned unchanged. Otherwise one of them is resampled to the other
    with ``flirt -applyxfm`` using ``identity_matrix``; the result is
    written to ``<stem>_resampled.nii.gz`` in the current working
    directory.

    Parameters
    ----------
    in_func : str
        path of the functional image
    in_roi : str
        path of the ROI image
    realignment : str
        must contain ``'func_to_ROI'`` (resample the functional image to
        the ROI, trilinear interpolation) or ``'ROI_to_func'`` (resample
        the ROI to the functional image, nearest-neighbour interpolation)
    identity_matrix : str
        path of the matrix passed to ``flirt -init``

    Returns
    -------
    out_func, out_roi : str
        paths of the (possibly resampled) functional and ROI images

    Raises
    ------
    ValueError
        If resampling is needed but ``realignment`` names neither
        direction.
    """
    import os
    import nibabel as nb
    from CPAC.utils.monitoring.custom_logging import log_subprocess

    # load func and ROI dimension
    func_shape = nb.load(in_func).shape
    roi_shape = nb.load(in_roi).shape

    # NOTE(review): the full shapes are compared, so an image with an
    # extra (e.g. time) dimension never equals a 3D ROI - confirm that
    # this is intended.
    if roi_shape == func_shape:
        return in_func, in_roi

    if 'func_to_ROI' in realignment:
        # resample func to ROI: in_file = func, reference = ROI
        in_file, reference, interp = in_func, in_roi, 'trilinear'
    elif 'ROI_to_func' in realignment:
        # resample ROI to func: in_file = ROI, reference = func
        in_file, reference, interp = in_roi, in_func, 'nearestneighbour'
    else:
        raise ValueError(
            "Unsupported realignment %r: expected a value containing "
            "'func_to_ROI' or 'ROI_to_func'" % (realignment,))

    # file name up to the NIfTI extension; basename() also copes with
    # paths that have no directory part
    base_file = os.path.basename(in_file)
    out_file = os.path.join(
        os.getcwd(),
        base_file[:base_file.rindex('.nii')] + '_resampled.nii.gz')

    cmd = ['flirt', '-in', in_file,
           '-ref', reference,
           '-out', out_file,
           '-interp', interp,
           '-applyxfm', '-init', identity_matrix]
    log_subprocess(cmd)

    if in_file is in_func:
        return out_file, in_roi
    return in_func, out_file