# Copyright (C) 2016-2023 C-PAC Developers
# This file is part of C-PAC.
# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
import json
import os
import re
import sys
from warnings import warn
import yaml
def bids_decode_fname(file_path, dbg=False, raise_error=True):
    """Decode a BIDS file path into a dictionary of its components.

    Parameters
    ----------
    file_path : str
        path to a NIfTI or JSON file; directories are separated by '/'
    dbg : bool
        print debugging statements if True
    raise_error : bool
        if True, raise a ValueError when the filename has no scan type
        or is a BOLD file without a task; if False, print the message
        and return what could be decoded

    Returns
    -------
    dict
        one item per ``key-value`` chunk of the filename, plus
        ``'scantype'`` (the last chunk without a '-') and ``'site'``
        (the directory above the first ``sub-`` directory, the last
        directory if there is no ``sub-`` directory, or ``'none'``),
        with whitespace, '-' and '_' removed from the site name

    Raises
    ------
    IOError
        if the filename contains neither 'nii' nor 'json'
    ValueError
        if ``raise_error`` and the filename lacks a scan type or is a
        BOLD file without a task
    """
    f_dict = {}

    fname = os.path.basename(file_path)

    # first lets make sure that we know how to handle the file
    if 'nii' not in fname.lower() and 'json' not in fname.lower():
        raise IOError("File (%s) does not appear to be " % fname +
                      "a nifti or json file")

    if dbg:
        print("parsing %s" % file_path)

    # first figure out if there is a site directory level, this isn't
    # specified in BIDS currently, but hopefully will be in the future
    file_path_vals = os.path.dirname(file_path).split('/')
    sub = [s for s in file_path_vals if 'sub-' in s]
    if dbg:
        print("found subject %s in %s" % (sub, str(file_path_vals)))

    if len(sub) > 1:
        print("Odd that there is more than one subject directory " +
              "in (%s), does the filename conform to" % file_path +
              " BIDS format?")
    if sub:
        sub_ndx = file_path_vals.index(sub[0])
        if sub_ndx > 0 and file_path_vals[sub_ndx - 1]:
            if dbg:
                print("setting site to %s" % (file_path_vals[sub_ndx - 1]))
            f_dict["site"] = file_path_vals[sub_ndx - 1]
        else:
            f_dict["site"] = "none"
    elif file_path_vals[-1]:
        if dbg:
            print("looking for subject id didn't pan out settling for last " +
                  "subdir %s" % (str(file_path_vals[-1])))
        f_dict["site"] = file_path_vals[-1]
    else:
        f_dict["site"] = "none"

    f_dict["site"] = re.sub(r'[\s\-\_]+', '', f_dict["site"])

    fname = fname.split(".")[0]
    # convert the filename string into a dictionary to pull out the other
    # key value pairs
    for key_val_pair in fname.split("_"):
        # if the chunk has the shape key-val store key: val in f_dict
        if "-" in key_val_pair:
            chunks = key_val_pair.split("-")
            f_dict[chunks[0]] = "-".join(chunks[1:])
        else:
            f_dict["scantype"] = key_val_pair.split(".")[0]

    # validate what was decoded; a missing and an empty scan type are
    # reported identically
    msg = None
    if not f_dict.get("scantype"):
        msg = "Filename ({0}) does not appear to contain" \
              " scan type, does it conform to the BIDS format?".format(fname)
    elif 'bold' in f_dict["scantype"] and not f_dict.get("task"):
        # .get() so that a missing task is reported through ``msg``
        # instead of escaping as a KeyError
        msg = "Filename ({0}) is a BOLD file, but " \
              "doesn't contain a task, does it conform to the" \
              " BIDS format?".format(fname)

    if msg is not None:
        if raise_error:
            raise ValueError(msg)
        print(msg)

    return f_dict
def bids_entities_from_filename(filename):
    """Split a filename into its underscore-separated BIDS entities.

    Everything up to the last '/' and everything from the first '.' of
    the remaining name onward is discarded before splitting.

    Parameters
    ----------
    filename : str

    Returns
    -------
    entities : list

    Examples
    --------
    >>> bids_entities_from_filename(
    ...     's3://fake/data/sub-0001/ses-NFB3/func/'
    ...     'sub-0001_ses-NFB3_task-MSIT_bold.nii.gz')
    ['sub-0001', 'ses-NFB3', 'task-MSIT', 'bold']
    """
    basename = filename.rsplit('/', 1)[-1]
    stem = basename.split('.', 1)[0]
    return stem.split('_')
def bids_match_entities(file_list, entities, suffix):
    """Function to subset a list of filepaths by a passed BIDS entity.

    Files whose suffix differs from ``suffix`` are passed through
    unfiltered; files with the given suffix are kept only if
    ``_{entities}_`` occurs in their underscore-joined entities.

    Parameters
    ----------
    file_list : list of str
    entities : str
        BIDS entities joined by underscores (e.g., 'ses-001_task-PEER1')
    suffix : str
        BIDS suffix (e.g., 'bold', 'T1w')

    Returns
    -------
    list of str

    Raises
    ------
    LookupError
        if ``file_list`` is non-empty and nothing is kept; the message
        lists any entities that partially match ``entities``

    Examples
    --------
    >>> bids_match_entities([
    ...     's3://fake/data/sub-001_ses-001_task-MSIT_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER2_bold.nii.gz'
    ... ], 'task-PEER1', 'bold')
    ['s3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz']
    >>> bids_match_entities([
    ...     's3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER2_bold.nii.gz'
    ... ], 'PEER', 'bold')
    Traceback (most recent call last):
    LookupError: No match found for provided entity "PEER" in
    - s3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz
    - s3://fake/data/sub-001_ses-001_task-PEER2_bold.nii.gz
    Perhaps you meant one of these?
    - task-PEER1
    - task-PEER2
    """
    matches = []
    for file in file_list:
        # parse each filename once instead of once per condition
        file_entities = bids_entities_from_filename(file)
        if (file_entities[-1] != suffix or
                f'_{entities}_' in '_'.join(file_entities)):
            matches.append(file)
    if file_list and not matches:
        pp_file_list = '\n'.join([f'- {file}' for file in file_list])
        error_message = ' '.join([
            'No match found for provided',
            'entity' if len(entities.split('_')) == 1 else 'entities',
            f'"{entities}" in\n{pp_file_list}'
        ])
        # escape the label so regex metacharacters in it are matched
        # literally instead of raising re.error or mis-matching
        partial_pattern = re.compile(f'[^_]*{re.escape(entities)}[^_]*')
        partial_matches = [
            match.group() for match in (
                partial_pattern.search(file) for file in file_list
            ) if match is not None
        ]
        if partial_matches:
            if len(partial_matches) == 1:
                error_message += f'\nPerhaps you meant "{partial_matches[0]}"?'
            else:
                error_message = '\n'.join([
                    error_message,
                    'Perhaps you meant one of these?',
                    *[f'- {match}' for match in partial_matches]
                ])
        raise LookupError(error_message)
    return matches
def bids_remove_entity(name, key):
    """Drop every entity with the given key from a BIDS string.

    Parameters
    ----------
    name : str
        BIDS string to remove entity from
    key : str
        BIDS key of entity to remove (a trailing '-' is tolerated)

    Returns
    -------
    str
        BIDS name with entity removed

    Examples
    --------
    >>> bids_remove_entity('atlas-Yeo_space-MNI152NLin6_res-2x2x2', 'space')
    'atlas-Yeo_res-2x2x2'
    >>> bids_remove_entity('atlas-Yeo_space-MNI152NLin6_res-2x2x2', 'res')
    'atlas-Yeo_space-MNI152NLin6'
    """
    unwanted_prefix = f'{key.rstrip("-")}-'
    kept = [
        chunk for chunk in bids_entities_from_filename(name)
        if not chunk.startswith(unwanted_prefix)
    ]
    return '_'.join(kept)
def bids_retrieve_params(bids_config_dict, f_dict, dbg=False):
    """
    Retrieve the BIDS parameters from bids_config_dict for BIDS file
    corresponding to f_dict. If an exact match for f_dict is not found
    the nearest match is returned, corresponding to the BIDS inheritance
    principle.

    :param bids_config_dict: BIDS configuration dictionary, this is a
      multi-level dictionary that maps the components of a bids filename
      (i.e. sub, ses, acq, run) to a dictionary that contains the BIDS
      parameters (RepetitionTime, EchoTime, etc). This information is
      extracted from sidecar json files using the principle of inheritance
      using the bids_parse_configs function. It is not modified.
    :param f_dict: Dictionary built from the name of a file in the BIDS
      format. This is built using the bids_decode_fname by splitting on
      "-" and "_" delimiters
    :param dbg: boolean flag that indicates whether or not debug statements
      should be printed, defaults to "False"
    :return: returns a new dictionary that contains the BIDS parameters
      of the deepest matching level (empty if no level has a
      "RepetitionTime" key). ``str`` values are returned ASCII-encoded
      as ``bytes``, with non-ASCII characters dropped.
    """
    matched = None
    t_dict = bids_config_dict  # pointer to current dictionary

    # descend one level at a time, preferring the file's own value for
    # the level and falling back to the generic "<level>-none" entry
    for level in ['scantype', 'site', 'sub', 'ses', 'task', 'acq',
                  'rec', 'dir', 'run']:
        generic_key = "-".join([level, "none"])
        if level in f_dict:
            key = "-".join([level, f_dict[level]])
        else:
            key = generic_key

        if dbg:
            print(key)

        if key in t_dict:
            t_dict = t_dict[key]
        else:
            if dbg:
                print("Couldn't find %s, so going with %s" % (key,
                                                              generic_key))
            # if neither key exists we stay at the current level
            if generic_key in t_dict:
                t_dict = t_dict[generic_key]

        # if we have an image parameter dictionary at this level, use it to
        # initialize our configuration we look for "RepetitionTime", because
        # according to the spec it is a mandatory parameter for JSON
        # sidecar files
        if dbg:
            print(t_dict)

        if any('RepetitionTime' in param for param in t_dict):
            matched = t_dict

    if matched is None:
        return {}

    # Build a new dict rather than encoding in place: ``matched`` is a
    # node of the caller's bids_config_dict and must not be altered.
    # NOTE(review): encoding str to bytes looks like a Python 2
    # leftover; kept because callers receive these values as-is.
    return {
        k: v.encode('ascii', errors='ignore') if isinstance(v, str) else v
        for k, v in matched.items()
    }
def bids_parse_sidecar(config_dict, dbg=False, raise_error=True):
    """
    Uses the BIDS principle of inheritance to build a data structure that
    maps parameters in side car .json files to components in the names of
    corresponding nifti files.

    :param config_dict: dictionary that maps paths of sidecar json files
       (the key) to a dictionary containing the contents of the files (the
       values); a value that is a list contributes its first element
    :param dbg: boolean flag that indicates whether or not debug statements
       should be printed
    :param raise_error: passed to bids_decode_fname; if False, filenames
       that do not conform to BIDS are reported instead of raising
    :return: a dictionary that maps parameters to components from BIDS
       filenames such as sub, sess, run, acq, and scan type
    :raises ValueError: if the contents for a path cannot be merged into
       a dictionary
    """
    levels = ['scantype', 'site', 'sub', 'ses', 'task',
              'acq', 'rec', 'dir', 'run']

    # we are going to build a large-scale data structure, consisting of many
    # levels of dictionaries to hold the data.
    bids_config_dict = {}

    # initialize 'default' entries, this essentially is a pointer traversal
    # of the dictionary
    t_dict = bids_config_dict
    for level in levels:
        key = '-'.join([level, 'none'])
        t_dict[key] = {}
        t_dict = t_dict[key]

    if dbg:
        print(bids_config_dict)

    # get the paths to the json yaml files in config_dict, the paths contain
    # the information needed to map the parameters from the jsons (the vals
    # of the config_dict) to corresponding nifti files. We sort the list
    # by the number of path components, so that we can iterate from the outer
    # most path to inner-most, which will help us address the BIDS inheritance
    # principle
    config_paths = sorted(config_dict, key=lambda p: len(p.split('/')))

    if dbg:
        print(config_paths)

    for cp in config_paths:

        if dbg:
            print("processing %s" % (cp))

        # decode the filepath into its various components as defined by BIDS
        f_dict = bids_decode_fname(cp, raise_error=raise_error)

        # handling inheritance is a complete pain, we will try to handle it by
        # build the key from the bottom up, starting with the most
        # parsimonious possible, incorporating configuration information that
        # exists at each level

        # first lets try to find any parameters that already apply at this
        # level using the information in the json's file path
        t_params = bids_retrieve_params(bids_config_dict, f_dict)

        # now populate the parameters
        bids_config = {}
        if t_params:
            bids_config.update(t_params)

        # add in the information from this config file
        t_config = config_dict[cp]
        # ``t_config is list`` compared against the type object itself
        # and was never true; isinstance is what was meant
        if isinstance(t_config, list):
            t_config = t_config[0] if t_config else {}

        try:
            bids_config.update(t_config)
        except (ValueError, TypeError) as update_error:
            err = "\n[!] Could not properly parse the AWS S3 path provided " \
                  "- please double-check the bucket and the path.\n\nNote: " \
                  "This could either be an issue with the path or the way " \
                  "the data is organized in the directory. You can also " \
                  "try providing a specific site sub-directory.\n\n"
            raise ValueError(err) from update_error

        # now put the configuration in the data structure, by first iterating
        # to the location of the key, and then inserting it. When a key isn't
        # defined we use the "none" value. A "none" indicates that the
        # corresponding parameters apply to all possible settings of that key
        # e.g. run-1, run-2, ... will all map to run-none if no jsons
        # explicitly define values for those runs
        t_dict = bids_config_dict  # pointer to current dictionary
        for level in levels:
            if level in f_dict:
                key = "-".join([level, f_dict[level]])
            else:
                key = "-".join([level, "none"])

            if key not in t_dict:
                t_dict[key] = {}

            t_dict = t_dict[key]

        t_dict.update(bids_config)

    return bids_config_dict
def bids_shortest_entity(file_list):
    """Function to return the single file with the shortest chain of
    BIDS entities from a given list, returning the first if more than
    one have the same minimum length.

    Parameters
    ----------
    file_list : list of strings

    Returns
    -------
    str or None
        None if ``file_list`` is empty

    Examples
    --------
    >>> bids_shortest_entity([
    ...     's3://fake/data/sub-001_ses-001_task-MSIT_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER2_bold.nii.gz'
    ... ])
    's3://fake/data/sub-001_ses-001_bold.nii.gz'
    >>> bids_shortest_entity([
    ...     's3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER2_bold.nii.gz'
    ... ])
    's3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz'
    """
    if not file_list:
        return None
    # min() returns the first of several equally short candidates, so
    # ties resolve to a single str as documented (never a list)
    return min(
        file_list,
        key=lambda filename: len(bids_entities_from_filename(filename))
    )
def gen_bids_outputs_sublist(base_path, paths_list, key_list, creds_path):
    """Build one dictionary per functional run from C-PAC output paths.

    Each path is expected to look like
    ``<base_path>/<subject info>/<resource>/...``, with functional
    resources adding a run level:
    ``<base_path>/<subject info>/<resource>/<run info>/...``.

    Parameters
    ----------
    base_path : str
        prefix removed from every path before it is split on '/'
    paths_list : list of str
    key_list : list of str
        resources to collect; paths for other resources are ignored
    creds_path : str or None
        stored under "creds_path" for each subject when truthy

    Returns
    -------
    list of dict
        one entry per (subject, run) that has every requested resource,
        holding the subject-level resources merged with that run's
        resources. Subjects without functional resources, and runs
        missing a requested functional resource, are skipped.
    """
    import copy

    func_keys = ["functional_to_anat_linear_xfm", "motion_params",
                 "movement_parameters", "motion_correct"]
    top_keys = list(set(key_list) - set(func_keys))
    bot_keys = list(set(key_list).intersection(func_keys))

    subjdict = {}

    if not base_path.endswith('/'):
        base_path = base_path + '/'

    # output directories are a bit different than standard BIDS, so
    # we handle things differently
    for p in paths_list:
        p = p.rstrip()

        # find the participant and session info which should be at
        # some level in the path
        parts = p.replace(base_path, '').split('/')
        if len(parts) < 2:
            # too shallow to name both a subject and a resource
            continue
        subj_info, resource = parts[0], parts[1]

        if resource not in key_list:
            continue

        subj_entry = subjdict.setdefault(subj_info, {"subj_info": subj_info})

        if creds_path:
            subj_entry["creds_path"] = creds_path

        if resource in func_keys:
            if len(parts) < 3:
                # a functional resource without a run level cannot be
                # assigned to a run
                continue
            run_info = parts[2]
            run_entry = subj_entry.setdefault("funcs", {}).setdefault(
                run_info, {'run_info': run_info})
            if resource in run_entry:
                print("warning resource %s already exists in subjdict ??" %
                      (resource))
            run_entry[resource] = p
        else:
            subj_entry[resource] = p

    sublist = []
    for subj_info, subj_res in subjdict.items():
        missing_top = [tkey for tkey in top_keys if tkey not in subj_res]
        if missing_top:
            print("%s not found for %s" % (missing_top[0], subj_info))
            continue

        # .get(): a subject with no functional resources has no "funcs"
        for func_key, func_res in subj_res.get("funcs", {}).items():
            # completeness is judged per run, so one incomplete run
            # does not hide the complete runs that follow it
            missing_bot = [bkey for bkey in bot_keys if bkey not in func_res]
            if missing_bot:
                print("%s not found for %s" % (missing_bot[0], func_key))
                continue

            print("adding: %s, %s, %d" % (subj_info,
                                          func_key,
                                          len(sublist)))
            tdict = copy.deepcopy(subj_res)
            del tdict["funcs"]
            tdict.update(func_res)
            sublist.append(tdict)
    return sublist
def bids_gen_cpac_sublist(bids_dir, paths_list, config_dict, creds_path,
                          dbg=False, raise_error=True, only_one_anat=True):
    """
    Generates a CPAC formatted subject list from information contained in a
    BIDS formatted set of data.

    Parameters
    ----------
    bids_dir : str
        base directory that contains all of the data, this could be a
        directory that contains data for a multiple BIDS datasets, in
        which case the intervening directories will be interpreted as
        site names

    paths_list : list of str
        lists of all nifti files found in bids_dir, these paths are
        relative to bids_dir

    config_dict : dict
        dictionary that contains information from the JSON sidecars
        found in bids_dir, keys are relative paths and values are
        dictionaries containing all of the parameter information. if
        config_dict is None, the subject list will be built without the
        parameters

    creds_path : str
        if using S3 bucket, this path credentials needed to access the
        bucket, if accessing anonymous bucket, this can be set to None

    dbg : bool
        indicating whether or not the debug statements should be
        printed

    raise_error : bool
        passed to the filename decoder; if False, files that do not
        conform to BIDS are reported and skipped where possible

    only_one_anat : bool
        The "anat" key for a subject expects a string value, but we can
        temporarily store a list instead by passing False here if we
        will be filtering that list down to a single string later

    Returns
    -------
    list
        a list of dictionaries suitable for use by CPAC to specify data
        to be processed

    Raises
    ------
    IOError
        if a NIfTI filename has no ``sub`` entity
    """
    if dbg:
        print("gen_bids_sublist called with:")
        print(" bids_dir: {0}".format(bids_dir))
        print(" # paths: {0}".format(str(len(paths_list))))
        print(" config_dict: {0}".format(
            "missing" if not config_dict else "found")
        )
        print(" creds_path: {0}".format(creds_path))

    # if configuration information is not desired, config_dict will be empty,
    # otherwise parse the information in the sidecar json files into a dict
    # we can use to extract data for our nifti files
    if config_dict:
        bids_config_dict = bids_parse_sidecar(config_dict,
                                              raise_error=raise_error)
    subdict = {}
    for p in paths_list:
        # only paths whose leading component names a subject are used
        if bids_dir in p:
            val = p.split(bids_dir)[0].rsplit('/')[0]
        else:
            val = p.split('/')[0]

        if 'sub-' not in val:
            continue

        p = p.rstrip()
        f = os.path.basename(p)

        if not f.endswith((".nii", ".nii.gz")):
            continue

        f_dict = bids_decode_fname(p, raise_error=raise_error)
        scan_path = os.path.join(bids_dir, p)

        if config_dict:
            t_params = bids_retrieve_params(bids_config_dict,
                                            f_dict)
            if not t_params:
                print("Did not receive any parameters for %s," % (p) +
                      " is this a problem?")

            task_info = {"scan": scan_path,
                         "scan_parameters": t_params.copy()}
        else:
            task_info = scan_path

        if "ses" not in f_dict:
            f_dict["ses"] = "1"

        if "sub" not in f_dict:
            raise IOError("sub not found in %s," % (p) +
                          " perhaps it isn't in BIDS format?")

        subjid = "-".join(["sub", f_dict["sub"]])
        ses_entry = subdict.setdefault(f_dict["sub"], {}).setdefault(
            f_dict["ses"],
            {"creds_path": creds_path,
             "site_id": "-".join(["site", f_dict["site"]]),
             "subject_id": subjid,
             "unique_id": "-".join(["ses", f_dict["ses"]])})

        # with raise_error=False the decoder may return no scan type;
        # an empty string matches none of the branches below
        scantype = f_dict.get("scantype", "")

        if "T1w" in scantype or "T2w" in scantype:
            if "lesion" in f_dict and "mask" in f_dict['lesion']:
                if "lesion_mask" not in ses_entry:
                    # scan_path rather than task_info["scan"]: without
                    # sidecars task_info is a plain str
                    ses_entry["lesion_mask"] = scan_path
                else:
                    print("Lesion mask file (%s) already found" %
                          (ses_entry["lesion_mask"]) +
                          " for (%s:%s) discarding %s" %
                          (f_dict["sub"], f_dict["ses"], p))

            # TODO deal with scan parameters anatomical
            anat = ses_entry.setdefault("anat", {})

            if scantype not in anat:
                anat[scantype] = scan_path if only_one_anat else []

            if not only_one_anat:
                anat[scantype].append(scan_path)

        if "bold" in scantype:
            if "task" not in f_dict:
                # only reachable with raise_error=False; the decoder
                # has already reported the problem
                print("Func file (%s) has no task entity, discarding" % p)
            else:
                task_key = f_dict["task"]
                if "run" in f_dict:
                    task_key = "_".join([task_key,
                                         "-".join(["run", f_dict["run"]])])
                if "acq" in f_dict:
                    task_key = "_".join([task_key,
                                         "-".join(["acq", f_dict["acq"]])])
                func = ses_entry.setdefault("func", {})

                if task_key not in func:
                    if not isinstance(task_info, dict):
                        task_info = {"scan": task_info}
                    func[task_key] = task_info
                else:
                    print("Func file (%s)" %
                          func[task_key] +
                          " already found for ( % s: %s: % s) discarding % s" % (
                              f_dict["sub"],
                              f_dict["ses"],
                              task_key,
                              p))

        if "phase" in scantype:
            fmap = ses_entry.setdefault("fmap", {})
            if scantype not in fmap:
                fmap[scantype] = task_info

        if "magnitude" in scantype:
            fmap = ses_entry.setdefault("fmap", {})
            if scantype not in fmap:
                fmap[scantype] = task_info

        # only EPI field maps acquired for fMRI are used; the phase
        # encoding direction is read only for those
        if "epi" in scantype and "fMRI" in f_dict.get("acq", ""):
            if "dir" not in f_dict:
                print("EPI field map (%s) has no dir entity, discarding"
                      % p)
            else:
                fmap = ses_entry.setdefault("fmap", {})
                epi_key = "epi_{0}".format(f_dict["dir"])
                if epi_key not in fmap:
                    fmap[epi_key] = task_info

    sublist = []
    for ksub, sub in subdict.items():
        for kses, ses in sub.items():
            if "anat" in ses or "func" in ses:
                sublist.append(ses)
            else:
                if "anat" not in ses:
                    print("%s %s %s is missing an anat" % (
                        ses["site_id"] if 'none' not in ses["site_id"] else '',
                        ses["subject_id"],
                        ses["unique_id"]
                    ))
                if "func" not in ses:
                    print("%s %s %s is missing an func" % (
                        ses["site_id"] if 'none' not in ses["site_id"] else '',
                        ses["subject_id"],
                        ses["unique_id"]
                    ))

    return sublist
def collect_bids_files_configs(bids_dir, aws_input_creds=''):
    """Collect NIfTI paths and JSON sidecar contents from a BIDS dataset.

    Only files whose name contains one of the supported suffixes are
    collected; 'epi' files count only when they also contain
    'acq-fMRI'. Each file is collected once even if its name contains
    several suffixes (e.g. 'magnitude' and 'magnitude1').

    :param bids_dir: local directory or ``s3://bucket/prefix`` URI
    :param aws_input_creds: path to AWS credentials, used for S3 only
    :return: tuple of (list of NIfTI paths relative to ``bids_dir``,
        dict mapping relative JSON paths to their parsed contents)
    :raises IOError: if the credentials file is given but missing, or
        if no matching files were found
    """
    file_paths = []
    config_dict = {}

    suffixes = ['T1w', 'T2w', 'bold', 'epi', 'phasediff', 'phase1',
                'phase2', 'magnitude', 'magnitude1', 'magnitude2']

    def _has_wanted_suffix(name):
        """True if ``name`` contains a supported suffix."""
        return any(
            suf in name and not (suf == 'epi' and 'acq-fMRI' not in name)
            for suf in suffixes
        )

    if bids_dir.lower().startswith("s3://"):
        # s3 paths begin with s3://bucket/
        bucket_name = bids_dir.split('/')[2]
        s3_prefix = '/'.join(bids_dir.split('/')[:3])
        prefix = bids_dir.replace(s3_prefix, '').lstrip('/')

        if aws_input_creds:
            if not os.path.isfile(aws_input_creds):
                raise IOError("Could not find aws_input_creds (%s)" %
                              (aws_input_creds))

        from indi_aws import fetch_creds
        bucket = fetch_creds.return_bucket(aws_input_creds, bucket_name)

        print(f"gathering files from S3 bucket ({bucket}) for {prefix}")

        for s3_obj in bucket.objects.filter(Prefix=prefix):
            key = str(s3_obj.key)
            if not _has_wanted_suffix(key):
                continue
            if key.endswith("json"):
                try:
                    config_dict[key.replace(prefix, "").lstrip('/')] = \
                        json.loads(s3_obj.get()["Body"].read())
                except Exception as e:
                    # exceptions have no ``.message`` in Python 3;
                    # format the exception itself so the real error
                    # is reported and re-raised
                    print("Error retrieving %s (%s)" %
                          (key.replace(prefix, ""), e))
                    raise
            elif 'nii' in key:
                file_paths.append(key.replace(prefix, '').lstrip('/'))

    else:
        for root, dirs, files in os.walk(bids_dir, topdown=False,
                                         followlinks=True):
            for f in files:
                if not _has_wanted_suffix(f):
                    continue
                full_path = os.path.join(root, f)
                if 'nii' in f:
                    file_paths.append(
                        full_path.replace(bids_dir, '').lstrip('/'))
                if f.endswith('json'):
                    rel_json = os.path.join(
                        root.replace(bids_dir, '').lstrip('/'), f)
                    try:
                        # context manager so the handle is always closed
                        with open(full_path, 'r') as json_file:
                            config_dict[rel_json] = json.load(json_file)
                    except UnicodeDecodeError as decode_error:
                        raise Exception("Could not decode {0}".format(
                            full_path)) from decode_error

    if not file_paths and not config_dict:
        raise IOError("Didn't find any files in {0}. Please verify that the "
                      "path is typed correctly, that you have read access to "
                      "the directory, and that it is not "
                      "empty.".format(bids_dir))

    return file_paths, config_dict
def camelCase(string: str) -> str:  # pylint: disable=invalid-name
    """Join the hyphen-separated pieces of a string in camelCase.

    The first piece is left untouched; every later non-empty piece has
    its first character upper-cased.

    Parameters
    ----------
    string : str
        string to convert to camelCase

    Returns
    -------
    str

    Examples
    --------
    >>> camelCase('PearsonNilearn-aCompCor')
    'PearsonNilearnACompCor'
    >>> camelCase('mean-Pearson-Nilearn-aCompCor')
    'meanPearsonNilearnACompCor'
    """
    head, *tail = string.split('-')
    # slicing keeps empty pieces (from doubled hyphens) empty
    return head + ''.join(
        f'{piece[:1].upper()}{piece[1:]}' for piece in tail
    )
def combine_multiple_entity_instances(bids_str: str) -> str:
    """Merge repeated keys in a BIDS string into one entity per key.

    Values that share a key are joined with '-' and camelCased. The
    final underscore-separated chunk is treated as the suffix and is
    camelCased as well. A 'desc' entity, if present, is moved to sit
    directly before the suffix. Non-final chunks without a '-' are
    not carried over.

    Parameters
    ----------
    bids_str : str

    Returns
    -------
    str

    Examples
    --------
    >>> combine_multiple_entity_instances(
    ...     'sub-1_ses-HBN_site-RU_task-rest_atlas-AAL_'
    ...     'desc-Nilearn_desc-36-param_suffix.ext')
    'sub-1_ses-HBN_site-RU_task-rest_atlas-AAL_desc-Nilearn36Param_suffix.ext'
    >>> combine_multiple_entity_instances(
    ...     'sub-1_ses-HBN_site-RU_task-rest_'
    ...     'run-1_framewise-displacement-power.1D')
    'sub-1_ses-HBN_site-RU_task-rest_run-1_framewiseDisplacementPower.1D'
    """
    *leading, last = bids_str.split('_')

    # collect every value seen for each key, in order of appearance
    grouped = {}
    for chunk in leading:
        if '-' not in chunk:
            continue
        key, value = chunk.split('-', maxsplit=1)
        grouped.setdefault(key, []).append(value)

    combined = {
        key: camelCase('-'.join(values)) for key, values in grouped.items()
    }

    tail = [camelCase(last)]
    if 'desc' in combined:  # make 'desc' final entity
        tail.insert(0, f'desc-{combined.pop("desc")}')

    return '_'.join(
        [f'{key}-{value}' for key, value in combined.items()] + tail
    )
def insert_entity(resource, key, value):
    """Add a ``key-value`` entity to a BIDS string.

    The new entity is placed after all existing non-``desc-`` entities
    and before any ``desc-`` entity and the suffix.

    Parameters
    ----------
    resource, key, value : str

    Returns
    -------
    str

    Examples
    --------
    >>> insert_entity('run-1_desc-preproc_bold', 'reg', 'default')
    'run-1_reg-default_desc-preproc_bold'
    >>> insert_entity('run-1_bold', 'reg', 'default')
    'run-1_reg-default_bold'
    >>> insert_entity('run-1_desc-preproc_bold', 'filt', 'notch4c0p31bw0p12')
    'run-1_filt-notch4c0p31bw0p12_desc-preproc_bold'
    >>> insert_entity('run-1_reg-default_bold', 'filt', 'notch4c0p31bw0p12')
    'run-1_reg-default_filt-notch4c0p31bw0p12_bold'
    """
    *existing, suffix = resource.split('_')
    before = [chunk for chunk in existing if not chunk.startswith('desc-')]
    descriptions = [chunk for chunk in existing if chunk.startswith('desc-')]
    return '_'.join(before + [f'{key}-{value}'] + descriptions + [suffix])
def load_yaml_config(config_filename, aws_input_creds):
    """Load a YAML config from a data URI, an S3 URI or a local path.

    Parameters
    ----------
    config_filename : str
        one of: a ``data:`` URI whose payload after the first comma is
        base64-encoded YAML; an ``s3://bucket/key`` URI, which is
        downloaded to ``/tmp/<basename>`` first; or a local file path
    aws_input_creds : str or None
        path to AWS credentials, used for S3 only

    Returns
    -------
    object
        whatever ``yaml.safe_load`` produces for the content

    Raises
    ------
    IOError
        if the credentials file is given but missing, or the config
        file cannot be opened
    """
    if config_filename.lower().startswith('data:'):
        # imported here because the module does not import base64
        from base64 import b64decode
        try:
            _header, encoded = config_filename.split(",", 1)
            config_content = b64decode(encoded)
            return yaml.safe_load(config_content)
        except Exception:
            print("Error! Could not find load config from data URI")
            raise

    if config_filename.lower().startswith("s3://"):
        # s3 paths begin with s3://bucket/
        bucket_name = config_filename.split('/')[2]
        s3_prefix = '/'.join(config_filename.split('/')[:3])
        prefix = config_filename.replace(s3_prefix, '').lstrip('/')

        if aws_input_creds:
            if not os.path.isfile(aws_input_creds):
                raise IOError("Could not find aws_input_creds (%s)" %
                              (aws_input_creds))

        from indi_aws import fetch_creds
        bucket = fetch_creds.return_bucket(aws_input_creds, bucket_name)
        downloaded_config = '/tmp/' + os.path.basename(config_filename)
        bucket.download_file(prefix, downloaded_config)
        config_filename = downloaded_config

    config_filename = os.path.realpath(config_filename)

    try:
        # context manager so the handle is closed even if parsing fails
        with open(config_filename, 'r') as config_file:
            return yaml.safe_load(config_file)
    except IOError:
        print("Error! Could not find config file {0}".format(config_filename))
        raise
def cl_strip_brackets(arg_list):
    """Removes '[' from before first and ']' from after final
    arguments in a list of commandline arguments

    The input list is left unmodified. A single string is treated as a
    one-item list, and an empty list yields an empty list.

    Parameters
    ----------
    arg_list : list or str

    Returns
    -------
    list
        a new list without empty arguments

    Examples
    --------
    >>> cl_strip_brackets('[a b c]'.split(' '))
    ['a', 'b', 'c']
    >>> cl_strip_brackets('a b c'.split(' '))
    ['a', 'b', 'c']
    >>> cl_strip_brackets('[ a b c ]'.split(' '))
    ['a', 'b', 'c']
    >>> cl_strip_brackets('[task-rest]')
    ['task-rest']
    >>> cl_strip_brackets([])
    []
    """
    if isinstance(arg_list, str):
        arg_list = [arg_list]
    # work on a copy so the caller's list is not altered
    args = list(arg_list)
    if not args:
        return []
    args[0] = args[0].lstrip('[')
    args[-1] = args[-1].rstrip(']')
    return [arg for arg in args if arg]
def create_cpac_data_config(bids_dir, participant_labels=None,
                            aws_input_creds=None, skip_bids_validator=False,
                            only_one_anat=True):
    """
    Create a C-PAC data config YAML file from a BIDS directory.

    Exits the process with status 1 if no data is found.

    Parameters
    ----------
    bids_dir : str

    participant_labels : list or None
        subject labels to keep, with or without the ``sub-`` prefix;
        a label selects only the subject with exactly that label

    aws_input_creds

    skip_bids_validator : bool
        if True, filenames that do not conform to BIDS are reported
        instead of raising

    only_one_anat : bool
        The "anat" key for a subject expects a string value, but we
        can temporarily store a list instead by passing False here if
        we will be filtering that list down to a single string later

    Returns
    -------
    list
    """
    print("Parsing {0}..".format(bids_dir))
    (file_paths, config) = collect_bids_files_configs(bids_dir,
                                                      aws_input_creds)

    if participant_labels and file_paths:
        # Match the whole ``sub-<label>`` entity: a plain substring test
        # would let 'sub-1' also select 'sub-10', 'sub-11', ...
        bare_labels = [
            label[len('sub-'):] if label.startswith('sub-') else label
            for label in map(str, participant_labels)
        ]
        subject_pattern = re.compile(
            r'(?:^|[/\\_])sub-(?:{0})(?=[/\\_.]|$)'.format(
                '|'.join(re.escape(label) for label in bare_labels)))
        file_paths = [
            file_path for file_path in file_paths
            if subject_pattern.search(file_path)
        ]

    if not file_paths:
        print("Did not find data for {0}".format(
            ", ".join(map(str, participant_labels or []))
        ))
        sys.exit(1)

    raise_error = not skip_bids_validator

    sub_list = bids_gen_cpac_sublist(
        bids_dir,
        file_paths,
        config,
        aws_input_creds,
        raise_error=raise_error,
        only_one_anat=only_one_anat
    )

    if not sub_list:
        print("Did not find data in {0}".format(bids_dir))
        sys.exit(1)

    return sub_list
def load_cpac_data_config(data_config_file, participant_labels,
                          aws_input_creds):
    """
    Loads the file as a check to make sure it is available and readable

    Exits the process with status 1 if no participants remain.

    Parameters
    ----------
    data_config_file : str
        path to data config

    participant_labels : list or None
        subject labels to keep, with or without the ``sub-`` prefix

    aws_input_creds

    Returns
    -------
    list
    """
    sub_list = load_yaml_config(data_config_file, aws_input_creds)

    if participant_labels:
        # Compare in 'sub-<label>' form on both sides. with_key() also
        # stringifies, so numeric subject IDs from YAML are handled.
        wanted = {with_key(label, 'sub') for label in participant_labels}
        sub_list = [
            d
            for d in sub_list
            if with_key(d["subject_id"], 'sub') in wanted
        ]

    if not sub_list:
        print("Did not find data for {0} in {1}".format(
            ", ".join(map(str, participant_labels or [])),
            (
                data_config_file
                if not data_config_file.startswith("data:")
                else "data URI"
            )
        ))
        sys.exit(1)

    return sub_list
def res_in_filename(cfg, label):
    """Replace a symbolic ``res-`` value in a label with the configured
    resolution.

    The value of the first ``_res-`` entity is looked up among 'anat',
    'bold' and 'derivative'; any other value is kept as it is.

    Parameters
    ----------
    cfg : CPAC.utils.configuration.Configuration

    label : str

    Returns
    -------
    label : str

    Examples
    --------
    >>> from CPAC.utils.configuration import Configuration
    >>> res_in_filename(Configuration({
    ...     'registration_workflows': {
    ...         'anatomical_registration': {'resolution_for_anat': '2x2x2'}}}),
    ...     'sub-1_res-anat_bold')
    'sub-1_res-2x2x2_bold'
    >>> res_in_filename(Configuration({
    ...     'registration_workflows': {
    ...         'anatomical_registration': {'resolution_for_anat': '2x2x2'}}}),
    ...     'sub-1_res-3mm_bold')
    'sub-1_res-3mm_bold'
    """
    if '_res-' not in label:
        return label

    # text between the first '_res-' and the following underscore
    token = label.split('_res-', 1)[1].split('_', 1)[0]

    registration = 'registration_workflows'
    func_output = (registration, 'functional_registration',
                   'func_registration_to_template', 'output_resolution')
    configured = {
        'anat': cfg[registration, 'anatomical_registration',
                    'resolution_for_anat'],
        'bold': cfg[(*func_output, 'func_preproc_outputs')],
        'derivative': cfg[(*func_output, 'func_derivative_outputs')],
    }
    resolution = configured.get(token, token)
    return re.sub('_res-[A-Za-z0-9]*_', f'_res-{resolution}_', label)
def sub_list_filter_by_labels(sub_list, labels):
    """Function to filter a sub_list by provided BIDS labels for
    specified suffixes

    Parameters
    ----------
    sub_list : list

    labels : dict
        not modified by this function

    labels['T1w'] : str or None
        C-PAC currently only uses a single T1w image

    labels['bold'] : str, list, or None
        surrounding '[' / ']' from the command line are stripped

    Returns
    -------
    list
    """
    if labels.get('T1w'):
        sub_list = _sub_list_filter_by_label(sub_list, 'T1w', labels['T1w'])
    if labels.get('bold'):
        bold = labels['bold']
        # Wrap a single string and copy a list: cl_strip_brackets
        # assigns into its argument, which fails on a str and would
        # otherwise alter the caller's list. The result stays local so
        # the caller's ``labels`` dict is untouched.
        bold_labels = cl_strip_brackets(
            [bold] if isinstance(bold, str) else list(bold))
        sub_list = _sub_list_filter_by_label(sub_list, 'bold', bold_labels)
    return sub_list
def with_key(entity: str, key: str) -> str:
    """Ensure a BIDS entity carries its ``key-`` prefix.

    Non-string entities are converted with ``str`` first.

    Parameters
    ----------
    entity, key : str

    Returns
    -------
    str

    Examples
    --------
    >>> with_key('sub-1', 'sub')
    'sub-1'
    >>> with_key('1', 'sub')
    'sub-1'
    """
    text = entity if isinstance(entity, str) else str(entity)
    if text.startswith(f'{key}-'):
        return text
    return '-'.join((key, text))
def without_key(entity: str, key: str) -> str:
    """Return a BIDS entity value

    Only a leading ``key-`` is removed; later occurrences of the same
    text inside the value are kept. Non-string entities are converted
    with ``str`` first.

    Parameters
    ----------
    entity, key : str

    Returns
    -------
    str

    Examples
    --------
    >>> without_key('sub-1', 'sub')
    '1'
    >>> without_key('1', 'sub')
    '1'
    >>> without_key('ses-pre-ses-1', 'ses')
    'pre-ses-1'
    """
    if not isinstance(entity, str):
        entity = str(entity)
    prefix = f'{key}-'
    if entity.startswith(prefix):
        # slice instead of str.replace, which would also delete any
        # later occurrence of the prefix inside the value
        entity = entity[len(prefix):]
    return entity
def _t1w_filter(anat, shortest_entity, label):
    """Helper function to filter T1w paths

    Parameters
    ----------
    anat: list or str
        a single path is treated as a one-item list

    shortest_entity: bool
        if True, defer to ``bids_shortest_entity`` and ignore ``label``

    label: str

    Returns
    -------
    anat:
        the result of ``bids_shortest_entity`` if ``shortest_entity``;
        otherwise the paths matching ``label`` for suffix 'T1w',
        narrowed further for suffix 'T2w' unless that second match
        raises a LookupError
    """
    candidates = anat if isinstance(anat, list) else [anat]
    if shortest_entity:
        return bids_shortest_entity(candidates)

    t1w_matches = bids_match_entities(candidates, label, 'T1w')
    try:
        return bids_match_entities(t1w_matches, label, 'T2w')
    except LookupError:
        # nothing survived the T2w pass; keep the T1w result
        return t1w_matches
def _sub_anat_filter(anat, shortest_entity, label):
    """Helper function to filter anat paths in sub_list

    A dict is filtered in place on its 'T1w' item only (and returned
    unchanged if it has none); anything else is filtered directly.

    Parameters
    ----------
    anat : list or dict

    shortest_entity : bool

    label : str

    Returns
    -------
    list or dict
        same type as 'anat' parameter
    """
    if not isinstance(anat, dict):
        return _t1w_filter(anat, shortest_entity, label)
    if 'T1w' in anat:
        anat['T1w'] = _t1w_filter(anat['T1w'], shortest_entity, label)
    return anat
def _sub_list_filter_by_label(sub_list, label_type, label):
    """Function to filter a sub_list by a CLI-provided label.

    If ``label_type`` itself is among the labels, the scan with the
    shortest chain of entities is selected for that type. Subjects
    for which a lookup fails are dropped with a warning. The kept
    subject dictionaries are updated in place.

    Parameters
    ----------
    sub_list : list

    label_type : str
        'T1w' or 'bold'

    label : str or list

    Returns
    -------
    list

    Examples
    --------
    >>> from CPAC.pipeline.test.sample_data import sub_list
    >>> _sub_list_filter_by_label(sub_list, 'bold', 'task-PEER1')[
    ...     0]['func'].keys()
    dict_keys(['PEER1'])
    """
    wanted = [label] if isinstance(label, str) else list(label)
    shortest_entity = label_type in wanted
    if shortest_entity:
        wanted.remove(label_type)

    filtered = []

    if label_type == 'T1w':
        for sub in sub_list:
            if 'anat' not in sub:
                continue
            try:
                # wanted[0] is read inside the try so that an empty
                # label list is reported as a warning as well
                sub['anat'] = _sub_anat_filter(
                    sub['anat'],
                    shortest_entity,
                    None if shortest_entity else wanted[0])
                if sub['anat']:
                    filtered.append(sub)
            except LookupError as lookup_error:
                warn(str(lookup_error))

    elif label_type == 'bold':
        for sub in sub_list:
            if 'func' not in sub:
                continue
            try:
                all_scans = [
                    scan_info.get('scan')
                    for scan_info in sub['func'].values()
                ]
                selected = {}
                for entities in wanted:
                    for scan in bids_match_entities(all_scans, entities,
                                                    label_type):
                        selected.update(
                            _match_functional_scan(sub['func'], scan))
                if shortest_entity:
                    selected.update(_match_functional_scan(
                        sub['func'], bids_shortest_entity(all_scans)))
                sub['func'] = selected
                filtered.append(sub)
            except LookupError as lookup_error:
                warn(str(lookup_error))

    return filtered
def _match_functional_scan(sub_list_func_dict, scan_file_to_match):
"""Function to subset a scan from a sub_list_func_dict by a scan filename
Parameters
---------
sub_list_func_dict : dict
sub_list[sub]['func']
scan_file_to_match : str
Returns
-------
dict
Examples
--------
>>> from CPAC.pipeline.test.sample_data import sub_list
>>> matched = _match_functional_scan(
... sub_list[0]['func'],
... '/fake/data/sub-0001/ses-NFB3/func/'
... 'sub-0001_ses-NFB3_task-PEER1_bold.nii.gz')
>>> matched.keys()
dict_keys(['PEER1'])
>>> all([key in matched['PEER1'] for key in [
... 'fmap_mag', 'fmap_phase', 'scan', 'scan_parameters'
... ]])
True
"""
return {
entity: sub_list_func_dict[entity] for entity in
sub_list_func_dict if
sub_list_func_dict[entity].get('scan') == scan_file_to_match
}