# Copyright (C) 2012-2024 C-PAC Developers
# This file is part of C-PAC.
# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
"""Utilities for sourcing data."""
import csv
import json
from pathlib import Path
import re
from voluptuous import RequiredFieldInvalid
from nipype.interfaces import utility as util
from CPAC.pipeline import nipype_pipeline_engine as pe
from CPAC.resources.templates.lookup_table import format_identifier, lookup_identifier
from CPAC.utils import function
from CPAC.utils.bids_utils import bids_remove_entity
from CPAC.utils.interfaces.function import Function
from CPAC.utils.monitoring import FMLOGGER
from CPAC.utils.utils import get_scan_params
def bidsier_prefix(unique_id):
"""Return a BIDSier prefix for a given unique_id.
Parameters
----------
unique_id : str
Returns
-------
prefix : str
Examples
--------
>>> bidsier_prefix('01_1')
'sub-01_ses-1'
>>> bidsier_prefix('sub-01_ses-1')
'sub-01_ses-1'
>>> bidsier_prefix('sub-01_1')
'sub-01_ses-1'
>>> bidsier_prefix('01_ses-1')
'sub-01_ses-1'
"""
keys = ["sub", "ses"]
components = unique_id.split("_")
for i, component in enumerate(components):
if i < len(keys):
if not component.startswith(keys[i]):
components[i] = "-".join([keys[i], component])
return "_".join(components)
def get_rest(scan, rest_dict, resource="scan"):
"""Return the path of the chosen resource in the functional file dictionary.
scan: the scan/series name or label
rest_dict: the dictionary read in from the data configuration YAML file
(sublist) nested under 'func:'
resource: the dictionary key
scan - the functional timeseries
scan_parameters - path to the scan parameters JSON file, or
a dictionary containing scan parameters
information (to be phased out in the
future)
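Examples
--------
A minimal illustration with a hypothetical ``rest_dict`` (not a real data
configuration):
>>> get_rest("rest_1", {"rest_1": {"scan": "/path/to/rest_1_bold.nii.gz"}})
'/path/to/rest_1_bold.nii.gz'
>>> get_rest("rest_1", {"rest_1": {"scan": "/path/to/rest_1_bold.nii.gz"}},
...          resource="scan_parameters") is None
True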
"""
try:
file_path = rest_dict[scan][resource]
except KeyError:
file_path = None
return file_path
def select_model_files(model, ftest, model_name):
"""Select model files."""
import glob
import os
files = glob.glob(os.path.join(model, "*"))
if len(files) == 0:
msg = f"No files found inside directory {model}"
raise FileNotFoundError(msg)
fts_file = ""
for filename in files:
if (model_name + ".mat") in filename:
mat_file = filename
elif (model_name + ".grp") in filename:
grp_file = filename
elif ((model_name + ".fts") in filename) and ftest:
fts_file = filename
elif (model_name + ".con") in filename:
con_file = filename
if ftest and fts_file == "":
errmsg = (
"\n[!] CPAC says: You have f-tests included in your group "
f"analysis model '{model_name}', but no .fts files were found in the "
f"output folder specified for group analysis: {model}.\n\nThe "
".fts file is automatically generated by CPAC, and if you "
"are seeing this error, it is because something went wrong "
"with the generation of this file, or it has been moved."
)
raise FileNotFoundError(errmsg)
return fts_file, con_file, grp_file, mat_file
def check_func_scan(func_scan_dct, scan):
"""Run some checks on the functional timeseries-related files.
For a given series/scan name or label.
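Examples
--------
A sketch with a hypothetical data-configuration entry (placeholder path); a
flat entry such as ``{"rest_1": "/path/to/bold.nii.gz"}`` raises instead of
returning:
>>> check_func_scan({"rest_1": {"scan": "/path/to/rest_1_bold.nii.gz"}}, "rest_1")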
"""
scan_resources = func_scan_dct[scan]
try:
scan_resources.keys()
except AttributeError:
err = (
"\n[!] The data configuration file you provided is "
"missing a level under the 'func:' key. CPAC versions "
"1.2 and later use data configurations with an "
"additional level of nesting.\n\nExample\nfunc:\n "
"rest01:\n scan: /path/to/rest01_func.nii.gz\n"
" scan parameters: /path/to/scan_params.json\n\n"
"See the User Guide for more information.\n\n"
)
raise ValueError(err)
# actual 4D time series file
if "scan" not in scan_resources.keys():
err = (
f"\n\n[!] The {scan} scan is missing its actual time-series "
"scan file, which should be a filepath labeled with the "
"'scan' key.\n\n"
)
raise FileNotFoundError(err)
# Nipype restriction (may have changed)
if "." in scan or "+" in scan or "*" in scan:
msg = (
"\n\n[!] Scan names cannot contain any special "
"characters (., +, *, etc.). Please update this "
f"and try again.\n\nScan: {scan}"
"\n\n"
)
raise ValueError(msg)
def create_func_datasource(rest_dict, rpool, wf_name="func_datasource"):
"""Return the functional timeseries-related file paths for each series/scan...
...from the dictionary of functional files described in the data
configuration (sublist) YAML file.
Scan input (from inputnode) is an iterable.
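Examples
--------
A hedged usage sketch, assuming ``rest_dict`` (the nested ``func`` dictionary
from the data configuration) and ``rpool`` (the pipeline's resource pool)
already exist; not run as a doctest::

    func_wf = create_func_datasource(rest_dict, rpool, "func_gather_sub-01_ses-1")
    func_wf.inputs.inputnode.set(
        subject="sub-01",
        creds_path=None,
        dl_dir="/tmp",
    )
    func_wf.get_node("inputnode").iterables = ("scan", list(rest_dict.keys()))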
"""
import nipype.interfaces.utility as util
from CPAC.pipeline import nipype_pipeline_engine as pe
wf = pe.Workflow(name=wf_name)
inputnode = pe.Node(
util.IdentityInterface(
fields=["subject", "scan", "creds_path", "dl_dir"], mandatory_inputs=True
),
name="inputnode",
)
outputnode = pe.Node(
util.IdentityInterface(
fields=["subject", "rest", "scan", "scan_params", "phase_diff", "magnitude"]
),
name="outputspec",
)
# have this here for now because of the big change in the data
# configuration format
# (Not necessary with ingress - format does not comply)
if not rpool.check_rpool("derivatives-dir"):
check_scan = pe.Node(
function.Function(
input_names=["func_scan_dct", "scan"],
output_names=[],
function=check_func_scan,
as_module=True,
),
name="check_func_scan",
)
check_scan.inputs.func_scan_dct = rest_dict
wf.connect(inputnode, "scan", check_scan, "scan")
# get the functional scan itself
selectrest = pe.Node(
function.Function(
input_names=["scan", "rest_dict", "resource"],
output_names=["file_path"],
function=get_rest,
as_module=True,
),
name="selectrest",
)
selectrest.inputs.rest_dict = rest_dict
selectrest.inputs.resource = "scan"
wf.connect(inputnode, "scan", selectrest, "scan")
# check to see if it's on an Amazon AWS S3 bucket, and download it, if it
# is - otherwise, just return the local file path
check_s3_node = pe.Node(
function.Function(
input_names=["file_path", "creds_path", "dl_dir", "img_type"],
output_names=["local_path"],
function=check_for_s3,
as_module=True,
),
name="check_for_s3",
)
wf.connect(selectrest, "file_path", check_s3_node, "file_path")
wf.connect(inputnode, "creds_path", check_s3_node, "creds_path")
wf.connect(inputnode, "dl_dir", check_s3_node, "dl_dir")
check_s3_node.inputs.img_type = "func"
wf.connect(inputnode, "subject", outputnode, "subject")
wf.connect(check_s3_node, "local_path", outputnode, "rest")
wf.connect(inputnode, "scan", outputnode, "scan")
# scan parameters CSV
select_scan_params = pe.Node(
function.Function(
input_names=["scan", "rest_dict", "resource"],
output_names=["file_path"],
function=get_rest,
as_module=True,
),
name="select_scan_params",
)
select_scan_params.inputs.rest_dict = rest_dict
select_scan_params.inputs.resource = "scan_parameters"
wf.connect(inputnode, "scan", select_scan_params, "scan")
# if the scan parameters file is on AWS S3, download it
s3_scan_params = pe.Node(
function.Function(
input_names=["file_path", "creds_path", "dl_dir", "img_type"],
output_names=["local_path"],
function=check_for_s3,
as_module=True,
),
name="s3_scan_params",
)
wf.connect(select_scan_params, "file_path", s3_scan_params, "file_path")
wf.connect(inputnode, "creds_path", s3_scan_params, "creds_path")
wf.connect(inputnode, "dl_dir", s3_scan_params, "dl_dir")
wf.connect(s3_scan_params, "local_path", outputnode, "scan_params")
return wf
def create_fmap_datasource(fmap_dct, wf_name="fmap_datasource"):
"""Return the field map files...
...from the dictionary of functional files described in the data configuration
(sublist) YAML file.
"""
import nipype.interfaces.utility as util
from CPAC.pipeline import nipype_pipeline_engine as pe
wf = pe.Workflow(name=wf_name)
inputnode = pe.Node(
util.IdentityInterface(
fields=["subject", "scan", "creds_path", "dl_dir"], mandatory_inputs=True
),
name="inputnode",
)
outputnode = pe.Node(
util.IdentityInterface(
fields=["subject", "rest", "scan", "scan_params", "phase_diff", "magnitude"]
),
name="outputspec",
)
selectrest = pe.Node(
function.Function(
input_names=["scan", "rest_dict", "resource"],
output_names=["file_path"],
function=get_rest,
as_module=True,
),
name="selectrest",
)
selectrest.inputs.rest_dict = fmap_dct
selectrest.inputs.resource = "scan"
wf.connect(inputnode, "scan", selectrest, "scan")
# check to see if it's on an Amazon AWS S3 bucket, and download it, if it
# is - otherwise, just return the local file path
check_s3_node = pe.Node(
function.Function(
input_names=["file_path", "creds_path", "dl_dir", "img_type"],
output_names=["local_path"],
function=check_for_s3,
as_module=True,
),
name="check_for_s3",
)
wf.connect(selectrest, "file_path", check_s3_node, "file_path")
wf.connect(inputnode, "creds_path", check_s3_node, "creds_path")
wf.connect(inputnode, "dl_dir", check_s3_node, "dl_dir")
check_s3_node.inputs.img_type = "other"
wf.connect(inputnode, "subject", outputnode, "subject")
wf.connect(check_s3_node, "local_path", outputnode, "rest")
wf.connect(inputnode, "scan", outputnode, "scan")
# scan parameters CSV
select_scan_params = pe.Node(
function.Function(
input_names=["scan", "rest_dict", "resource"],
output_names=["file_path"],
function=get_rest,
as_module=True,
),
name="select_scan_params",
)
select_scan_params.inputs.rest_dict = fmap_dct
select_scan_params.inputs.resource = "scan_parameters"
wf.connect(inputnode, "scan", select_scan_params, "scan")
# if the scan parameters file is on AWS S3, download it
s3_scan_params = pe.Node(
function.Function(
input_names=["file_path", "creds_path", "dl_dir", "img_type"],
output_names=["local_path"],
function=check_for_s3,
as_module=True,
),
name="s3_scan_params",
)
wf.connect(select_scan_params, "file_path", s3_scan_params, "file_path")
wf.connect(inputnode, "creds_path", s3_scan_params, "creds_path")
wf.connect(inputnode, "dl_dir", s3_scan_params, "dl_dir")
wf.connect(s3_scan_params, "local_path", outputnode, "scan_params")
return wf
def calc_delta_te_and_asym_ratio(
effective_echo_spacing: float, echo_times: list
) -> tuple[float, float]:
"""Calcluate ``deltaTE`` and ``ees_asym_ratio`` from given metadata.
Parameters
----------
effective_echo_spacing : float
EffectiveEchoSpacing from sidecar JSON
echo_times : list
Returns
-------
deltaTE : float
ees_asym_ratio : float
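Examples
--------
An illustrative calculation with made-up metadata (echo times given in seconds
are converted to milliseconds before differencing):
>>> delta_te, ratio = calc_delta_te_and_asym_ratio(0.0006, [0.002, 0.005])
>>> delta_te
3.0
>>> round(ratio, 4)
0.0002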
"""
if not isinstance(effective_echo_spacing, float):
msg = (
"C-PAC could not find `EffectiveEchoSpacing` in "
"either fmap or func sidecar JSON, but that field "
"is required for PhaseDiff distortion correction."
)
raise LookupError(msg)
# convert into milliseconds if necessary
# these values will/should never be more than 10ms
if (
((echo_times[0] * 1000) < 10) # noqa: PLR2004
and ((echo_times[1] * 1000) < 10) # noqa: PLR2004
):
echo_times[0] = echo_times[0] * 1000
echo_times[1] = echo_times[1] * 1000
deltaTE = abs(echo_times[0] - echo_times[1])
ees_asym_ratio = effective_echo_spacing / deltaTE
return deltaTE, ees_asym_ratio
def gather_echo_times(echotime_1, echotime_2, echotime_3=None, echotime_4=None):
"""Gather the echo times from the field map data."""
echotime_list = [echotime_1, echotime_2, echotime_3, echotime_4]
echotime_list = list(filter(lambda item: item is not None, echotime_list))
echotime_list = list(set(echotime_list))
if len(echotime_list) != 2: # noqa: PLR2004
msg = (
"\n[!] Something went wrong with the field map echo times - there should"
f" be two distinct values.\n\nEcho Times:\n{echotime_list}\n"
)
raise ValueError(msg)
return echotime_list
def match_epi_fmaps(
bold_pedir,
epi_fmap_one,
epi_fmap_params_one,
epi_fmap_two=None,
epi_fmap_params_two=None,
):
"""Match EPI field maps to the BOLD scan.
Parse the field map files in the data configuration and determine which
ones have the same and opposite phase-encoding directions as the BOLD scan
in the current pipeline.
Example - parse the files under the 'fmap' level, i.e. 'epi_AP':
anat: /path/to/T1w.nii.gz
fmap:
epi_AP:
scan: /path/to/field-map.nii.gz
scan_parameters: <config dictionary containing phase-encoding
direction>
func:
rest_1:
scan: /path/to/bold.nii.gz
scan_parameters: <config dictionary of BOLD scan parameters>
1. Check PhaseEncodingDirection field in the metadata for the BOLD.
2. Check whether there are one or two EPI's in the field map data.
3. Grab the one or two EPI field maps.
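Examples
--------
A minimal sketch with hypothetical paths and in-memory parameter dictionaries
(paths to JSON sidecars are also accepted):
>>> match_epi_fmaps("j-", "/path/epi_run-1.nii.gz",
...                 {"PhaseEncodingDirection": "j"},
...                 "/path/epi_run-2.nii.gz",
...                 {"PhaseEncodingDirection": "j-"})
('/path/epi_run-1.nii.gz', '/path/epi_run-2.nii.gz')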
"""
fmap_dct = {epi_fmap_one: epi_fmap_params_one}
if epi_fmap_two and epi_fmap_params_two:
fmap_dct[epi_fmap_two] = epi_fmap_params_two
opposite_pe_epi = None
same_pe_epi = None
for epi_scan in fmap_dct.keys():
scan_params = fmap_dct[epi_scan]
if not isinstance(scan_params, dict) and ".json" in scan_params:
with open(scan_params, "r") as f:
scan_params = json.load(f)
if "PhaseEncodingDirection" in scan_params:
epi_pedir = scan_params["PhaseEncodingDirection"]
if epi_pedir == bold_pedir:
same_pe_epi = epi_scan
elif epi_pedir[0] == bold_pedir[0]:
opposite_pe_epi = epi_scan
return (opposite_pe_epi, same_pe_epi)
def create_general_datasource(wf_name):
"""Create a general-purpose datasource node."""
import nipype.interfaces.utility as util
from CPAC.pipeline import nipype_pipeline_engine as pe
wf = pe.Workflow(name=wf_name)
inputnode = pe.Node(
util.IdentityInterface(
fields=["unique_id", "data", "scan", "creds_path", "dl_dir"],
mandatory_inputs=True,
),
name="inputnode",
)
check_s3_node = pe.Node(
function.Function(
input_names=["file_path", "creds_path", "dl_dir", "img_type"],
output_names=["local_path"],
function=check_for_s3,
as_module=True,
),
name="check_for_s3",
)
check_s3_node.inputs.img_type = "other"
wf.connect(inputnode, "data", check_s3_node, "file_path")
wf.connect(inputnode, "creds_path", check_s3_node, "creds_path")
wf.connect(inputnode, "dl_dir", check_s3_node, "dl_dir")
outputnode = pe.Node(
util.IdentityInterface(fields=["unique_id", "data", "scan"]), name="outputspec"
)
wf.connect(inputnode, "unique_id", outputnode, "unique_id")
wf.connect(inputnode, "scan", outputnode, "scan")
wf.connect(check_s3_node, "local_path", outputnode, "data")
return wf
def create_check_for_s3_node(
name, file_path, img_type="other", creds_path=None, dl_dir=None, map_node=False
):
"""Create a node to check if a file is on S3."""
if map_node:
check_s3_node = pe.MapNode(
function.Function(
input_names=["file_path", "creds_path", "dl_dir", "img_type"],
output_names=["local_path"],
function=check_for_s3,
as_module=True,
),
iterfield=["file_path"],
name=f"check_for_s3_{name}",
)
else:
check_s3_node = pe.Node(
function.Function(
input_names=["file_path", "creds_path", "dl_dir", "img_type"],
output_names=["local_path"],
function=check_for_s3,
as_module=True,
),
name=f"check_for_s3_{name}",
)
check_s3_node.inputs.set(
file_path=file_path, creds_path=creds_path, dl_dir=dl_dir, img_type=img_type
)
return check_s3_node
def check_for_s3(
file_path, creds_path=None, dl_dir=None, img_type="other", verbose=False
):
"""Check if passed-in file is on S3."""
# Import packages
import os
import botocore.exceptions
import nibabel as nib
from indi_aws import fetch_creds
# Init variables
s3_str = "s3://"
if creds_path:
if "None" in creds_path or "none" in creds_path or "null" in creds_path:
creds_path = None
if dl_dir is None:
dl_dir = os.getcwd()
if file_path is None:
# in case it's something like scan parameters or field map files, but
# we don't have any
return None
# TODO: remove this once scan parameter input as dictionary is phased out
if isinstance(file_path, dict):
# if this is a dictionary, just skip altogether
return file_path
if file_path.lower().startswith(s3_str):
file_path = s3_str + file_path[len(s3_str) :]
# Get bucket name and bucket object
bucket_name = file_path[len(s3_str) :].split("/")[0]
# Extract relative key path from bucket and local path
s3_prefix = s3_str + bucket_name
s3_key = file_path[len(s3_prefix) + 1 :]
local_path = os.path.join(dl_dir, bucket_name, s3_key)
# Get local directory and create folders if they don't exist
local_dir = os.path.dirname(local_path)
if not os.path.exists(local_dir):
os.makedirs(local_dir, exist_ok=True)
if os.path.exists(local_path):
FMLOGGER.info("%s already exists- skipping download.", local_path)
else:
# Download file
try:
bucket = fetch_creds.return_bucket(creds_path, bucket_name)
FMLOGGER.info("Attempting to download from AWS S3: %s", file_path)
bucket.download_file(Key=s3_key, Filename=local_path)
except botocore.exceptions.ClientError as exc:
error_code = int(exc.response["Error"]["Code"])
err_msg = str(exc)
if error_code == 403: # noqa: PLR2004
err_msg = (
f'Access to bucket: "{bucket_name}" is denied; using'
f' credentials in subject list: "{creds_path}"; cannot access'
f' the file "{file_path}"'
)
error_type = PermissionError
elif error_code == 404: # noqa: PLR2004
err_msg = (
f"File: {os.path.join(bucket_name, s3_key)} does not exist;"
" check spelling and try again"
)
error_type = FileNotFoundError
else:
err_msg = (
f'Unable to connect to bucket: "{bucket_name}". Error message:'
f"\n{exc}"
)
error_type = ConnectionError
raise error_type(err_msg)
except Exception as exc:
err_msg = (
f'Unable to connect to bucket: "{bucket_name}". Error message:'
f"\n{exc}"
)
raise ConnectionError(err_msg)
# Otherwise just return what was passed in, resolving if a link
else:
local_path = os.path.realpath(file_path)
# Check if it exists or it is successfully downloaded
if not os.path.exists(local_path):
# alert users to 2020-07-20 Neuroparc atlas update (v0 to v1)
ndmg_atlases = {}
with open(
os.path.join(
os.path.dirname(os.path.dirname(__file__)),
"resources/templates/ndmg_atlases.csv",
)
) as ndmg_atlases_file:
ndmg_atlases["v0"], ndmg_atlases["v1"] = zip(
*[
(
f"/ndmg_atlases/label/Human/{atlas[0]}",
f"/ndmg_atlases/label/Human/{atlas[1]}",
)
for atlas in csv.reader(ndmg_atlases_file)
]
)
if local_path in ndmg_atlases["v0"]:
from CPAC.utils.docs import DOCS_URL_PREFIX
msg = (
"Neuroparc atlas paths were updated on July 20, 2020. C-PAC"
" configuration files using Neuroparc v0 atlas paths (including C-PAC"
" default and preconfigured pipeline configurations from v1.6.2a and"
" earlier) need to be updated to use Neuroparc atlases. Your current"
f" configuration includes the Neuroparc v0 path {local_path} which"
" needs to be updated to"
f" {ndmg_atlases['v1'][ndmg_atlases['v0'].index(local_path)]}. For a"
f" full list such paths, see {DOCS_URL_PREFIX}/user/ndmg_atlases"
)
else:
msg = f"File {local_path} does not exist!"
raise FileNotFoundError(msg)
if verbose:
FMLOGGER.info("Downloaded file:\n%s\n", local_path)
# Check image dimensionality
if local_path.endswith(".nii") or local_path.endswith(".nii.gz"):
img_nii = nib.load(local_path)
if img_type == "anat":
if len(img_nii.shape) != 3: # noqa: PLR2004
msg = (
f"File: {local_path} must be an anatomical image with 3 "
f"dimensions but {len(img_nii.shape)} dimensions found!"
)
elif img_type == "func":
if len(img_nii.shape) not in [3, 4]:
msg = (
f"File: {local_path} must be a functional image with 3 or "
f"4 dimensions but {len(img_nii.shape)} dimensions found!"
)
raise IOError(msg)
return local_path
def get_highest_local_res(template: Path | str, tagname: str) -> Path:
"""Return the highest resolution of a template in the same local path.
Given a reference template path and a resolution string, get all
resolutions of that template in the same local path and return the
highest resolution.
Parameters
----------
template : Path or str
tagname : str
Returns
-------
Path
Raises
------
LookupError
If no matching local template is found.
Examples
--------
>>> get_highest_local_res(
... '/cpac_templates/MacaqueYerkes19_T1w_2mm_brain.nii.gz', '2mm')
PosixPath('/cpac_templates/MacaqueYerkes19_T1w_0.5mm_brain.nii.gz')
>>> get_highest_local_res(
... '/cpac_templates/dne_T1w_2mm.nii.gz', '2mm')
Traceback (most recent call last):
...
LookupError: Could not find template /cpac_templates/dne_T1w_2mm.nii.gz
"""
from CPAC.pipeline.schema import RESOLUTION_REGEX
if isinstance(template, str):
template = Path(template)
template_pattern = (
RESOLUTION_REGEX.replace("^", "")
.replace("$", "")
.join([re.escape(_part) for _part in template.name.split(tagname, 1)])
)
matching_templates = [
file
for file in template.parent.iterdir()
if re.match(template_pattern, file.name)
]
matching_templates.sort()
try:
return matching_templates[0]
except (FileNotFoundError, IndexError):
msg = f"Could not find template {template}"
raise LookupError(msg)
def res_string_to_tuple(resolution):
"""Convert a resolution string to a tuple of floats.
Parameters
----------
resolution : str
Resolution string, e.g. "3.438mmx3.438mmx3.4mm"
Returns
-------
resolution : tuple
Tuple of floats, e.g. (3.438, 3.438, 3.4)
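Examples
--------
Using the resolution strings described above:
>>> res_string_to_tuple("3.438mmx3.438mmx3.4mm")
(3.438, 3.438, 3.4)
>>> res_string_to_tuple("2mm")
(2.0, 2.0, 2.0)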
"""
if "x" in str(resolution):
return tuple(float(i.replace("mm", "")) for i in resolution.split("x"))
return (float(resolution.replace("mm", "")),) * 3
def resolve_resolution(resolution, template, template_name, tag=None):
"""Resample a template to a given resolution."""
from nipype.interfaces import afni
from CPAC.pipeline import nipype_pipeline_engine as pe
from CPAC.utils.datasource import check_for_s3
tagname = None
local_path = None
if "{" in template and tag is not None:
tagname = "${" + tag + "}"
try:
if tagname is not None:
local_path = check_for_s3(template.replace(tagname, str(resolution)))
except (IOError, OSError):
local_path = None
## TODO debug - it works in ipython but doesn't work in nipype wf
# try:
# local_path = check_for_s3('/usr/local/fsl/data/standard/MNI152_T1_3.438mmx3.438mmx3.4mm_brain_mask_dil.nii.gz')
# except (IOError, OSError):
# local_path = None
if local_path is None:
if tagname is not None:
if template.startswith("s3:"):
ref_template = template.replace(tagname, "1mm")
local_path = check_for_s3(ref_template)
else:
local_path = get_highest_local_res(template, tagname)
elif tagname is None and template.startswith("s3:"):
local_path = check_for_s3(template)
else:
local_path = template
resample = pe.Node(
interface=afni.Resample(),
name=template_name,
mem_gb=0,
mem_x=(0.0115, "in_file", "t"),
)
resample.inputs.voxel_size = res_string_to_tuple(resolution)
resample.inputs.outputtype = "NIFTI_GZ"
resample.inputs.resample_mode = "Cu"
resample.inputs.in_file = local_path
resample.base_dir = "."
resampled_template = resample.run()
local_path = resampled_template.outputs.out_file
return local_path
def create_anat_datasource(wf_name="anat_datasource"):
"""Create a dataflow for anatomical images."""
import nipype.interfaces.utility as util
from CPAC.pipeline import nipype_pipeline_engine as pe
wf = pe.Workflow(name=wf_name)
inputnode = pe.Node(
util.IdentityInterface(
fields=["subject", "anat", "creds_path", "dl_dir", "img_type"],
mandatory_inputs=True,
),
name="inputnode",
)
check_s3_node = pe.Node(
function.Function(
input_names=["file_path", "creds_path", "dl_dir", "img_type"],
output_names=["local_path"],
function=check_for_s3,
as_module=True,
),
name="check_for_s3",
)
wf.connect(inputnode, "anat", check_s3_node, "file_path")
wf.connect(inputnode, "creds_path", check_s3_node, "creds_path")
wf.connect(inputnode, "dl_dir", check_s3_node, "dl_dir")
wf.connect(inputnode, "img_type", check_s3_node, "img_type")
outputnode = pe.Node(
util.IdentityInterface(fields=["subject", "anat"]), name="outputspec"
)
wf.connect(inputnode, "subject", outputnode, "subject")
wf.connect(check_s3_node, "local_path", outputnode, "anat")
# Return the workflow
return wf
def create_roi_mask_dataflow(masks, wf_name="datasource_roi_mask"):
"""Create a dataflow for ROI masks."""
import os
mask_dict = {}
for _mask_file in masks:
mask_file = _mask_file.rstrip("\r\n")
if mask_file.strip() == "" or mask_file.startswith("#"):
continue
name, desc = lookup_identifier(mask_file)
if name == "template":
base_file = os.path.basename(mask_file)
try:
valid_extensions = [".nii", ".nii.gz"]
base_name = next(
base_file[: -len(ext)]
for ext in valid_extensions
if base_file.endswith(ext)
)
for key in ["res", "space"]:
base_name = bids_remove_entity(base_name, key)
except IndexError:
# pylint: disable=raise-missing-from
msg = (
"Error in spatial_map_dataflow: File "
f'extension of {base_file} not ".nii" or '
".nii.gz"
)
raise ValueError(msg)
except Exception as e:
raise e
else:
base_name = format_identifier(name, desc)
if base_name in mask_dict:
msg = (
"Duplicate templates/atlases not allowed: "
f"{mask_file} {mask_dict[base_name]}"
)
raise ValueError(msg)
mask_dict[base_name] = mask_file
wf = pe.Workflow(name=wf_name)
inputnode = pe.Node(
util.IdentityInterface(
fields=["mask", "mask_file", "creds_path", "dl_dir"], mandatory_inputs=True
),
name="inputspec",
)
mask_keys, mask_values = zip(*mask_dict.items())
inputnode.synchronize = True
inputnode.iterables = [
("mask", mask_keys),
("mask_file", mask_values),
]
check_s3_node = pe.Node(
function.Function(
input_names=["file_path", "creds_path", "dl_dir", "img_type"],
output_names=["local_path"],
function=check_for_s3,
as_module=True,
),
name="check_for_s3",
)
wf.connect(inputnode, "mask_file", check_s3_node, "file_path")
wf.connect(inputnode, "creds_path", check_s3_node, "creds_path")
wf.connect(inputnode, "dl_dir", check_s3_node, "dl_dir")
check_s3_node.inputs.img_type = "mask"
outputnode = pe.Node(
util.IdentityInterface(fields=["out_file", "out_name"]), name="outputspec"
)
wf.connect(check_s3_node, "local_path", outputnode, "out_file")
wf.connect(inputnode, "mask", outputnode, "out_name")
return wf
def create_spatial_map_dataflow(spatial_maps, wf_name="datasource_maps"):
"""Create a dataflow for spatial maps."""
import os
wf = pe.Workflow(name=wf_name)
spatial_map_dict = {}
for _spatial_map_file in spatial_maps:
spatial_map_file = _spatial_map_file.rstrip("\r\n")
base_file = os.path.basename(spatial_map_file)
try:
valid_extensions = [".nii", ".nii.gz"]
base_name = next(
base_file[: -len(ext)]
for ext in valid_extensions
if base_file.endswith(ext)
)
if base_name in spatial_map_dict:
msg = (
f"Files with same name not allowed: {spatial_map_file}"
f" {spatial_map_dict[base_name]}"
)
raise ValueError(msg)
spatial_map_dict[base_name] = spatial_map_file
except IndexError:
msg = (
"Error in spatial_map_dataflow: File extension not in .nii and .nii.gz"
)
raise ValueError(msg)
inputnode = pe.Node(
util.IdentityInterface(
fields=["spatial_map", "spatial_map_file", "creds_path", "dl_dir"],
mandatory_inputs=True,
),
name="inputspec",
)
spatial_map_keys, spatial_map_values = zip(*spatial_map_dict.items())
inputnode.synchronize = True
inputnode.iterables = [
("spatial_map", spatial_map_keys),
("spatial_map_file", spatial_map_values),
]
check_s3_node = pe.Node(
function.Function(
input_names=["file_path", "creds_path", "dl_dir", "img_type"],
output_names=["local_path"],
function=check_for_s3,
as_module=True,
),
name="check_for_s3",
)
wf.connect(inputnode, "spatial_map_file", check_s3_node, "file_path")
wf.connect(inputnode, "creds_path", check_s3_node, "creds_path")
wf.connect(inputnode, "dl_dir", check_s3_node, "dl_dir")
check_s3_node.inputs.img_type = "mask"
select_spatial_map = pe.Node(
util.IdentityInterface(fields=["out_file", "out_name"], mandatory_inputs=True),
name="select_spatial_map",
)
wf.connect(check_s3_node, "local_path", select_spatial_map, "out_file")
wf.connect(inputnode, "spatial_map", select_spatial_map, "out_name")
return wf
def create_grp_analysis_dataflow(wf_name="gp_dataflow"):
"""Create a dataflow for group analysis."""
import nipype.interfaces.utility as util
from CPAC.pipeline import nipype_pipeline_engine as pe
from CPAC.utils.datasource import select_model_files
wf = pe.Workflow(name=wf_name)
inputnode = pe.Node(
util.IdentityInterface(
fields=["ftest", "grp_model", "model_name"], mandatory_inputs=True
),
name="inputspec",
)
selectmodel = pe.Node(
function.Function(
input_names=["model", "ftest", "model_name"],
output_names=["fts_file", "con_file", "grp_file", "mat_file"],
function=select_model_files,
as_module=True,
),
name="selectnode",
)
wf.connect(inputnode, "ftest", selectmodel, "ftest")
wf.connect(inputnode, "grp_model", selectmodel, "model")
wf.connect(inputnode, "model_name", selectmodel, "model_name")
outputnode = pe.Node(
util.IdentityInterface(
fields=["fts", "grp", "mat", "con"], mandatory_inputs=True
),
name="outputspec",
)
wf.connect(selectmodel, "mat_file", outputnode, "mat")
wf.connect(selectmodel, "grp_file", outputnode, "grp")
wf.connect(selectmodel, "fts_file", outputnode, "fts")
wf.connect(selectmodel, "con_file", outputnode, "con")
return wf
def resample_func_roi(in_func, in_roi, realignment, identity_matrix):
"""Resample functional image to ROI or ROI to functional image using flirt."""
import os
import nibabel as nib
from CPAC.utils.monitoring.custom_logging import log_subprocess
# load func and ROI dimension
func_img = nib.load(in_func)
func_shape = func_img.shape
roi_img = nib.load(in_roi)
roi_shape = roi_img.shape
# check if func size = ROI size, return func and ROI; else resample using flirt
if roi_shape != func_shape:
# resample func to ROI: in_file = func, reference = ROI
if "func_to_ROI" in realignment:
in_file = in_func
reference = in_roi
out_file = os.path.join(
os.getcwd(),
in_file[in_file.rindex("/") + 1 : in_file.rindex(".nii")]
+ "_resampled.nii.gz",
)
out_func = out_file
out_roi = in_roi
interp = "trilinear"
# resample ROI to func: in_file = ROI, reference = func
elif "ROI_to_func" in realignment:
in_file = in_roi
reference = in_func
out_file = os.path.join(
os.getcwd(),
in_file[in_file.rindex("/") + 1 : in_file.rindex(".nii")]
+ "_resampled.nii.gz",
)
out_func = in_func
out_roi = out_file
interp = "nearestneighbour"
cmd = [
"flirt",
"-in",
in_file,
"-ref",
reference,
"-out",
out_file,
"-interp",
interp,
"-applyxfm",
"-init",
identity_matrix,
]
log_subprocess(cmd)
else:
out_func = in_func
out_roi = in_roi
return out_func, out_roi