# Copyright (C) 2016-2024 C-PAC Developers
# This file is part of C-PAC.
# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
from base64 import b64decode
import json
import os
import re
import sys
from warnings import warn

from botocore.exceptions import BotoCoreError
import yaml

from CPAC.utils.monitoring import UTLOGGER
def bids_decode_fname(file_path, dbg=False, raise_error=True):
    """Decode a BIDS-style filepath into a dictionary of entity key/value pairs.

    Parameters
    ----------
    file_path : str
        path to a NIfTI or JSON file whose basename follows the BIDS
        ``key-value_key-value_..._suffix`` convention
    dbg : bool
        whether to print debug statements
    raise_error : bool
        whether to raise (True) or just log (False) when the filename does
        not appear to conform to the BIDS format

    Returns
    -------
    f_dict : dict
        maps each BIDS entity key (e.g. 'sub', 'ses', 'task') to its value,
        plus 'scantype' (the BIDS suffix) and 'site' (inferred from the
        directory level above the subject directory, or 'none')

    Raises
    ------
    IOError
        if the file does not appear to be a nifti or json file
    ValueError
        if `raise_error` is True and the filename lacks a scan type, or is
        a BOLD file lacking a task entity
    """
    f_dict = {}
    fname = os.path.basename(file_path)

    # first lets make sure that we know how to handle the file
    if "nii" not in fname.lower() and "json" not in fname.lower():
        msg = f"File ({fname}) does not appear to be a nifti or json file"
        raise IOError(msg)
    if dbg:
        UTLOGGER.debug("parsing %s", file_path)
    # first figure out if there is a site directory level, this isn't
    # specified in BIDS currently, but hopefully will be in the future
    file_path_vals = os.path.dirname(file_path).split("/")
    sub = [s for s in file_path_vals if "sub-" in s]
    if dbg:
        UTLOGGER.debug("found subject %s in %s", sub, file_path_vals)
    if len(sub) > 1:
        UTLOGGER.debug(
            "Odd that there is more than one subject directory in (%s), does the"
            " filename conform to BIDS format?",
            file_path,
        )
    if sub:
        sub_ndx = file_path_vals.index(sub[0])
        if sub_ndx > 0 and file_path_vals[sub_ndx - 1]:
            if dbg:
                UTLOGGER.debug("setting site to %s", file_path_vals[sub_ndx - 1])
            f_dict["site"] = file_path_vals[sub_ndx - 1]
        else:
            f_dict["site"] = "none"
    elif file_path_vals[-1]:
        if dbg:
            UTLOGGER.debug(
                "looking for subject id didn't pan out settling for last subdir %s",
                file_path_vals[-1],
            )
        f_dict["site"] = file_path_vals[-1]
    else:
        f_dict["site"] = "none"
    # normalize the site name: drop whitespace, hyphens, and underscores
    f_dict["site"] = re.sub(r"[\s\-\_]+", "", f_dict["site"])

    fname = fname.split(".")[0]
    # convert the filename string into a dictionary to pull out the other
    # key value pairs
    for key_val_pair in fname.split("_"):
        # if the chunk has the shape key-val store key: val in f_dict
        if "-" in key_val_pair:
            chunks = key_val_pair.split("-")
            # re-join on "-" so values that themselves contain hyphens survive
            f_dict[chunks[0]] = "-".join(chunks[1:])
        else:
            # a chunk without "-" is the BIDS suffix (scan type)
            f_dict["scantype"] = key_val_pair.split(".")[0]

    # validate what was extracted; use .get() so a missing key produces the
    # intended message instead of surfacing an unrelated KeyError
    if not f_dict.get("scantype"):
        msg = (
            f"Filename ({fname}) does not appear to contain"
            " scan type, does it conform to the BIDS format?"
        )
        if raise_error:
            raise ValueError(msg)
        else:
            UTLOGGER.error(msg)
    elif "bold" in f_dict["scantype"] and not f_dict.get("task"):
        # was `f_dict["task"]`, which raised KeyError (not the intended
        # ValueError) for a BOLD file with no task entity at all
        msg = (
            f"Filename ({fname}) is a BOLD file, but doesn't contain a task, does"
            " it conform to the BIDS format?"
        )
        if raise_error:
            raise ValueError(msg)
        else:
            UTLOGGER.error(msg)
    return f_dict
def bids_entities_from_filename(filename):
    """Collect the list of BIDS entities from a given filename.

    Parameters
    ----------
    filename : str
        path or basename of a BIDS-style file

    Returns
    -------
    entities : list

    Examples
    --------
    >>> bids_entities_from_filename(
    ...     's3://fake/data/sub-0001/ses-NFB3/func/'
    ...     'sub-0001_ses-NFB3_task-MSIT_bold.nii.gz')
    ['sub-0001', 'ses-NFB3', 'task-MSIT', 'bold']
    """
    # take the basename, drop the extension(s), then split on underscores
    basename = filename.split("/")[-1] if "/" in filename else filename
    stem = basename.split(".")[0]
    return stem.split("_")
def bids_match_entities(file_list, entities, suffix):
    """Subset a list of filepaths by a passed BIDS entity.

    Files whose suffix differs from `suffix` are always kept; files with the
    matching suffix are kept only when they contain `entities`.

    Parameters
    ----------
    file_list : list of str
    entities : str
        BIDS entities joined by underscores (e.g., 'ses-001_task-PEER1')
    suffix : str
        BIDS suffix (e.g., 'bold', 'T1w')

    Returns
    -------
    list of str

    Raises
    ------
    LookupError
        if `file_list` is non-empty but nothing matched

    Examples
    --------
    >>> bids_match_entities([
    ...     's3://fake/data/sub-001_ses-001_task-MSIT_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz'
    ... ], 'task-PEER1', 'bold')
    ['s3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz']
    """
    matches = []
    for file in file_list:
        file_entities = bids_entities_from_filename(file)
        if file_entities[-1] != suffix:
            # a different suffix is never filtered out
            matches.append(file)
        elif f"_{entities}_" in "_".join(file_entities):
            matches.append(file)
    if file_list and not matches:
        # build a helpful error, suggesting near-misses when possible
        pp_file_list = "\n".join([f"- {file}" for file in file_list])
        error_message = " ".join(
            [
                "No match found for provided",
                "entity" if len(entities.split("_")) == 1 else "entities",
                f'"{entities}" in\n{pp_file_list}',
            ]
        )
        pattern = re.compile(f"[^_]*{entities}[^_]*")
        partial_matches = []
        for file in file_list:
            partial = pattern.search(file)
            if partial is not None:
                partial_matches.append(partial.group())
        if len(partial_matches) == 1:
            error_message += f'\nPerhaps you meant "{partial_matches[0]}"?'
        elif partial_matches:
            error_message = "\n".join(
                [
                    error_message,
                    "Perhaps you meant one of these?",
                    *[f"- {match}" for match in partial_matches],
                ]
            )
        raise LookupError(error_message)
    return matches
def bids_remove_entity(name, key):
    """Remove an entity from a BIDS string by key.

    Parameters
    ----------
    name : str
        BIDS string to remove entity from
    key : str
        BIDS key of entity to remove

    Returns
    -------
    str
        BIDS name with entity removed

    Examples
    --------
    >>> bids_remove_entity('atlas-Yeo_space-MNI152NLin6_res-2x2x2', 'space')
    'atlas-Yeo_res-2x2x2'
    >>> bids_remove_entity('atlas-Yeo_space-MNI152NLin6_res-2x2x2', 'res')
    'atlas-Yeo_space-MNI152NLin6'
    """
    # normalize the key (tolerate a trailing hyphen) and drop matching entities
    prefix = f'{key.rstrip("-")}-'
    kept = [
        entity
        for entity in bids_entities_from_filename(name)
        if not entity.startswith(prefix)
    ]
    return "_".join(kept)
[docs]
def bids_retrieve_params(bids_config_dict, f_dict, dbg=False):
    """
    Retrieve the BIDS parameters from bids_config_dict for BIDS file
    corresponding to f_dict. If an exact match for f_dict is not found
    the nearest match is returned, corresponding to the BIDS inheritance
    principle.

    :param bids_config_dict: BIDS configuration dictionary, this is a
      multi-level dictionary that maps the components of a bids filename
      (i.e. sub, ses, acq, run) to a dictionary that contains the BIDS
      parameters (RepetitionTime, EchoTime, etc). This information is
      extracted from sidecar json files using the principle of inheritance
      using the bids_parse_configs function

    :param f_dict: Dictionary built from the name of a file in the BIDS
      format. This is built using the bids_decode_fname by splitting on
      "-" and "_" delimeters

    :param dbg: boolean flag that indicates whether or not debug statements
      should be printed, defaults to "False"

    :return: returns a dictionary that contains the BIDS parameters
    """
    params = {}

    t_dict = bids_config_dict  # pointer to current dictionary
    # try to populate the configuration using information
    # already in the list
    # walk one level of the nested dict per BIDS component, preferring an
    # exact key (e.g. "run-1") and falling back to the generic "<level>-none"
    for level in ["scantype", "site", "sub", "ses", "task", "acq", "rec", "dir", "run"]:
        if level in f_dict:
            key = "-".join([level, f_dict[level]])
        else:
            key = "-".join([level, "none"])
        if dbg:
            UTLOGGER.debug(key)
        # if the key doesn't exist in the config dictionary, check to see if
        # the generic key exists and return that
        if key in t_dict:
            t_dict = t_dict[key]
        else:
            if dbg:
                UTLOGGER.debug(
                    "Couldn't find %s, so going with %s", key, "-".join([level, "none"])
                )
            key = "-".join([level, "none"])
            if key in t_dict:
                t_dict = t_dict[key]
    # if we have an image parameter dictionary at this level, use it to
    # initialize our configuration we look for "RepetitionTime", because
    # according to the spec it is a mandatory parameter for JSON
    # sidecar files
    if dbg:
        UTLOGGER.debug(t_dict)
    for key in t_dict.keys():
        if "RepetitionTime" in key:
            params = t_dict
            break
    # NOTE(review): `params` is a reference to a node *inside*
    # bids_config_dict, so this loop ASCII-encodes string values in place,
    # mutating the shared config tree — presumably intentional caching of
    # the encoded form, but verify callers don't rely on str values
    for k, v in params.items():
        if isinstance(v, str):
            params[k] = v.encode("ascii", errors="ignore")
    return params
def bids_parse_sidecar(config_dict, dbg=False, raise_error=True):
    # type: (dict, bool, bool) -> dict
    """
    Uses the BIDS principle of inheritance to build a data structure that
    maps parameters in side car .json files to components in the names of
    corresponding nifti files.

    :param config_dict: dictionary that maps paths of sidecar json files
      (the key) to a dictionary containing the contents of the files (the values)
    :param dbg: boolean flag that indicates whether or not debug statements
      should be printed
    :param raise_error: whether to raise (True) or just log (False) when a
      sidecar filename does not conform to BIDS
    :return: a dictionary that maps parameters to components from BIDS filenames
      such as sub, sess, run, acq, and scan type
    """
    # we are going to build a large-scale data structure, consisting of many
    # levels of dictionaries to hold the data.
    bids_config_dict = {}

    # initialize 'default' entries, this essentially is a pointer traversal
    # of the dictionary
    t_dict = bids_config_dict
    for level in ["scantype", "site", "sub", "ses", "task", "acq", "rec", "dir", "run"]:
        key = "-".join([level, "none"])
        t_dict[key] = {}
        t_dict = t_dict[key]

    if dbg:
        UTLOGGER.debug(bids_config_dict)

    # get the paths to the json yaml files in config_dict, the paths contain
    # the information needed to map the parameters from the jsons (the vals
    # of the config_dict) to corresponding nifti files. We sort the list
    # by the number of path components, so that we can iterate from the outer
    # most path to inner-most, which will help us address the BIDS inheritance
    # principle
    config_paths = sorted(config_dict.keys(), key=lambda p: len(p.split("/")))

    if dbg:
        UTLOGGER.debug(config_paths)

    for cp in config_paths:
        if dbg:
            UTLOGGER.debug("processing %s", cp)

        # decode the filepath into its various components as defined by BIDS
        f_dict = bids_decode_fname(cp, raise_error=raise_error)

        # handling inheritance is a complete pain, we will try to handle it by
        # build the key from the bottom up, starting with the most
        # parsimonious possible, incorporating configuration information that
        # exists at each level

        # first lets try to find any parameters that already apply at this
        # level using the information in the json's file path
        t_params = bids_retrieve_params(bids_config_dict, f_dict)

        # now populate the parameters
        bids_config = {}
        if t_params:
            bids_config.update(t_params)

        # add in the information from this config file
        t_config = config_dict[cp]
        # sidecar contents may arrive wrapped in a single-element list;
        # unwrap it. (was `if t_config is list:`, an identity comparison
        # against the `list` type that could never be True)
        if isinstance(t_config, list):
            t_config = t_config[0]

        try:
            bids_config.update(t_config)
        except ValueError:
            err = (
                "\n[!] Could not properly parse the AWS S3 path provided "
                "- please double-check the bucket and the path.\n\nNote: "
                "This could either be an issue with the path or the way "
                "the data is organized in the directory. You can also "
                "try providing a specific site sub-directory.\n\n"
            )
            raise ValueError(err)

        # now put the configuration in the data structure, by first iterating
        # to the location of the key, and then inserting it. When a key isn't
        # defined we use the "none" value. A "none" indicates that the
        # corresponding parameters apply to all possible settings of that key
        # e.g. run-1, run-2, ... will all map to run-none if no jsons
        # explicitly define values for those runs
        t_dict = bids_config_dict  # pointer to current dictionary
        for level in [
            "scantype",
            "site",
            "sub",
            "ses",
            "task",
            "acq",
            "rec",
            "dir",
            "run",
        ]:
            if level in f_dict:
                key = "-".join([level, f_dict[level]])
            else:
                key = "-".join([level, "none"])

            if key not in t_dict:
                t_dict[key] = {}

            t_dict = t_dict[key]

        t_dict.update(bids_config)

    return bids_config_dict
def bids_shortest_entity(file_list):
    """Return the single file with the shortest chain of BIDS entities.

    If more than one file ties for the minimum length, a list of the tied
    files is returned; if `file_list` is empty, None is returned.

    Parameters
    ----------
    file_list : list of strings

    Returns
    -------
    str, list, or None

    Examples
    --------
    >>> bids_shortest_entity([
    ...     's3://fake/data/sub-001_ses-001_task-MSIT_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER2_bold.nii.gz'
    ... ])
    's3://fake/data/sub-001_ses-001_bold.nii.gz'
    """
    entity_counts = [
        len(bids_entities_from_filename(filename)) for filename in file_list
    ]
    if not entity_counts:
        return None
    fewest = min(entity_counts)
    candidates = [
        filename
        for filename, count in zip(file_list, entity_counts)
        if count == fewest
    ]
    # unique winner -> the string itself; ties -> the list of winners
    return candidates[0] if len(candidates) == 1 else candidates
def gen_bids_outputs_sublist(base_path, paths_list, key_list, creds_path):
    """Build a CPAC subject list from BIDS-derivative output paths.

    Parameters
    ----------
    base_path : str
        common prefix stripped from every path in `paths_list`
    paths_list : list of str
        output file paths; the first component after `base_path` is the
        participant/session info, the second is the resource name
    key_list : list of str
        resource names to include in the subject list
    creds_path : str or None
        credentials path attached to each subject entry when truthy

    Returns
    -------
    sublist : list of dict
        one entry per (subject, functional run) combining the subject-level
        and run-level resources
    """
    import copy

    # resources that live at the per-functional-run level rather than the
    # per-subject level
    func_keys = [
        "functional_to_anat_linear_xfm",
        "motion_params",
        "movement_parameters",
        "motion_correct",
    ]
    top_keys = list(set(key_list) - set(func_keys))
    bot_keys = list(set(key_list).intersection(func_keys))

    subjdict = {}

    if not base_path.endswith("/"):
        base_path = base_path + "/"

    # output directories are a bit different than standard BIDS, so
    # we handle things differently
    for p in paths_list:
        p = p.rstrip()

        # find the participant and session info which should be at
        # some level in the path
        path_base = p.replace(base_path, "")

        subj_info = path_base.split("/")[0]
        resource = path_base.split("/")[1]

        if resource not in key_list:
            continue

        if subj_info not in subjdict:
            subjdict[subj_info] = {"subj_info": subj_info}

        if creds_path:
            subjdict[subj_info]["creds_path"] = creds_path

        if resource in func_keys:
            run_info = path_base.split("/")[2]
            if "funcs" not in subjdict[subj_info]:
                subjdict[subj_info]["funcs"] = {}
            if run_info not in subjdict[subj_info]["funcs"]:
                subjdict[subj_info]["funcs"][run_info] = {"run_info": run_info}
            if resource in subjdict[subj_info]["funcs"][run_info]:
                UTLOGGER.warning("resource %s already exists in subjdict ??", resource)
            subjdict[subj_info]["funcs"][run_info][resource] = p
        else:
            subjdict[subj_info][resource] = p

    sublist = []
    for subj_info, subj_res in subjdict.items():
        missing = 0
        for tkey in top_keys:
            if tkey not in subj_res:
                UTLOGGER.warning("%s not found for %s", tkey, subj_info)
                missing += 1
                break

        if missing == 0:
            # .get() so a subject with no functional resources contributes
            # no entries instead of raising KeyError on "funcs"
            for func_key, func_res in subj_res.get("funcs", {}).items():
                for bkey in bot_keys:
                    if bkey not in func_res:
                        UTLOGGER.warning("%s not found for %s", bkey, func_key)
                        missing += 1
                        break
                if missing == 0:
                    UTLOGGER.info(
                        "adding: %s, %s, %d", subj_info, func_key, len(sublist)
                    )
                    tdict = copy.deepcopy(subj_res)
                    del tdict["funcs"]
                    tdict.update(func_res)
                    sublist.append(tdict)
    return sublist
[docs]
def bids_gen_cpac_sublist(
    bids_dir,
    paths_list,
    config_dict,
    creds_path,
    dbg=False,
    raise_error=True,
    only_one_anat=True,
):
    """
    Generates a CPAC formatted subject list from information contained in a
    BIDS formatted set of data.

    Parameters
    ----------
    bids_dir : str
        base directory that contains all of the data, this could be a
        directory that contains data for a multiple BIDS datasets, in
        which case the intervening directories will be interpreted as
        site names

    paths_list : str
        lists of all nifti files found in bids_dir, these paths are
        relative to bids_dir

    config_dict : dict
        dictionary that contains information from the JSON sidecars
        found in bids_dir, keys are relative paths and values are
        dictionaries containing all of the parameter information. if
        config_dict is None, the subject list will be built without the
        parameters

    creds_path : str
        if using S3 bucket, this path credentials needed to access the
        bucket, if accessing anonymous bucket, this can be set to None

    dbg : bool
        indicating whether or not the debug statements should be
        printed

    raise_error : bool
        whether to raise (True) or just log (False) BIDS-conformance errors
        while decoding filenames

    only_one_anat : bool
        The "anat" key for a subject expects a string value, but we can
        temporarily store a list instead by passing True here if we
        will be filtering that list down to a single string later

    Returns
    -------
    list
        a list of dictionaries suitable for use by CPAC to specify data
        to be processed
    """
    if dbg:
        UTLOGGER.debug(
            "gen_bids_sublist called with:\n bids_dir: %s\n # paths: %s"
            "\n config_dict: %s\n creds_path: %s",
            bids_dir,
            len(paths_list),
            "missing" if not config_dict else "found",
            creds_path,
        )

    # if configuration information is not desired, config_dict will be empty,
    # otherwise parse the information in the sidecar json files into a dict
    # we can use to extract data for our nifti files
    if config_dict:
        bids_config_dict = bids_parse_sidecar(config_dict, raise_error=raise_error)

    subdict = {}
    for p in paths_list:
        # NOTE(review): when bids_dir is a substring of p this takes the
        # component *before* bids_dir, which for typical relative paths is
        # empty — verify the absolute-path case is exercised as intended
        if bids_dir in p:
            str_list = p.split(bids_dir)
            val = str_list[0]
            val = val.rsplit("/")
            val = val[0]
        else:
            str_list = p.split("/")
            val = str_list[0]

        # skip any path whose leading component is not a subject directory
        if "sub-" not in val:
            continue

        p = p.rstrip()
        f = os.path.basename(p)

        if f.endswith(".nii") or f.endswith(".nii.gz"):
            f_dict = bids_decode_fname(p, raise_error=raise_error)

            if config_dict:
                # attach sidecar parameters resolved via BIDS inheritance
                t_params = bids_retrieve_params(bids_config_dict, f_dict)
                if not t_params:
                    UTLOGGER.warning(
                        "Did not receive any parameters for %s, is this a problem?", p
                    )

                task_info = {
                    "scan": os.path.join(bids_dir, p),
                    "scan_parameters": t_params.copy(),
                }
            else:
                task_info = os.path.join(bids_dir, p)

            # default the session to "1" when no ses- entity is present
            if "ses" not in f_dict:
                f_dict["ses"] = "1"

            if "sub" not in f_dict:
                raise IOError(
                    "sub not found in %s," % (p) + " perhaps it isn't in BIDS format?"
                )

            if f_dict["sub"] not in subdict:
                subdict[f_dict["sub"]] = {}

            subjid = "-".join(["sub", f_dict["sub"]])

            if f_dict["ses"] not in subdict[f_dict["sub"]]:
                subdict[f_dict["sub"]][f_dict["ses"]] = {
                    "creds_path": creds_path,
                    "site_id": "-".join(["site", f_dict["site"]]),
                    "subject_id": subjid,
                    "unique_id": "-".join(["ses", f_dict["ses"]]),
                }

            if "T1w" in f_dict["scantype"] or "T2w" in f_dict["scantype"]:
                # lesion masks are stored under their own key, first one wins
                if "lesion" in f_dict.keys() and "mask" in f_dict["lesion"]:
                    if "lesion_mask" not in subdict[f_dict["sub"]][f_dict["ses"]]:
                        subdict[f_dict["sub"]][f_dict["ses"]]["lesion_mask"] = (
                            task_info["scan"]
                        )
                    else:
                        UTLOGGER.warning(
                            "Lesion mask file (%s) already found for (%s:%s)"
                            " discarding %s",
                            subdict[f_dict["sub"]][f_dict["ses"]]["lesion_mask"],
                            f_dict["sub"],
                            f_dict["ses"],
                            p,
                        )
                # TODO deal with scan parameters anatomical
                if "anat" not in subdict[f_dict["sub"]][f_dict["ses"]]:
                    subdict[f_dict["sub"]][f_dict["ses"]]["anat"] = {}

                if (
                    f_dict["scantype"]
                    not in subdict[f_dict["sub"]][f_dict["ses"]]["anat"]
                ):
                    # only_one_anat=True stores a single path; False stores a
                    # list that is appended to below
                    if only_one_anat:
                        subdict[f_dict["sub"]][f_dict["ses"]]["anat"][
                            f_dict["scantype"]
                        ] = task_info["scan"] if config_dict else task_info
                    else:
                        subdict[f_dict["sub"]][f_dict["ses"]]["anat"][
                            f_dict["scantype"]
                        ] = []
                if not only_one_anat:
                    subdict[f_dict["sub"]][f_dict["ses"]]["anat"][
                        f_dict["scantype"]
                    ].append(task_info["scan"] if config_dict else task_info)

            if "bold" in f_dict["scantype"]:
                # NOTE(review): f_dict["task"] may be absent here if
                # raise_error=False let a task-less BOLD filename through
                # bids_decode_fname — would raise KeyError; confirm
                task_key = f_dict["task"]
                # if the task a tag in its name, pull out the tag
                if "run" in f_dict:
                    task_key = "_".join([task_key, "-".join(["run", f_dict["run"]])])
                if "acq" in f_dict:
                    task_key = "_".join([task_key, "-".join(["acq", f_dict["acq"]])])
                if "func" not in subdict[f_dict["sub"]][f_dict["ses"]]:
                    subdict[f_dict["sub"]][f_dict["ses"]]["func"] = {}

                if task_key not in subdict[f_dict["sub"]][f_dict["ses"]]["func"]:
                    if not isinstance(task_info, dict):
                        task_info = {"scan": task_info}
                    subdict[f_dict["sub"]][f_dict["ses"]]["func"][task_key] = task_info

                else:
                    UTLOGGER.warning(
                        "Func file (%s) already found for (%s: %s: %s) discarding %s",
                        subdict[f_dict["sub"]][f_dict["ses"]]["func"][task_key],
                        f_dict["sub"],
                        f_dict["ses"],
                        task_key,
                        p,
                    )

            if "phase" in f_dict["scantype"]:
                if "fmap" not in subdict[f_dict["sub"]][f_dict["ses"]]:
                    subdict[f_dict["sub"]][f_dict["ses"]]["fmap"] = {}
                if (
                    f_dict["scantype"]
                    not in subdict[f_dict["sub"]][f_dict["ses"]]["fmap"]
                ):
                    subdict[f_dict["sub"]][f_dict["ses"]]["fmap"][
                        f_dict["scantype"]
                    ] = task_info

            if "magnitude" in f_dict["scantype"]:
                if "fmap" not in subdict[f_dict["sub"]][f_dict["ses"]]:
                    subdict[f_dict["sub"]][f_dict["ses"]]["fmap"] = {}
                if (
                    f_dict["scantype"]
                    not in subdict[f_dict["sub"]][f_dict["ses"]]["fmap"]
                ):
                    subdict[f_dict["sub"]][f_dict["ses"]]["fmap"][
                        f_dict["scantype"]
                    ] = task_info

            if "epi" in f_dict["scantype"]:
                # EPI field maps are keyed by phase-encoding direction and
                # only collected when acquired for fMRI
                pe_dir = f_dict["dir"]
                if "acq" in f_dict:
                    if "fMRI" in f_dict["acq"]:
                        if "fmap" not in subdict[f_dict["sub"]][f_dict["ses"]]:
                            subdict[f_dict["sub"]][f_dict["ses"]]["fmap"] = {}
                        if (
                            f"epi_{pe_dir}"
                            not in subdict[f_dict["sub"]][f_dict["ses"]]["fmap"]
                        ):
                            subdict[f_dict["sub"]][f_dict["ses"]]["fmap"][
                                f"epi_{pe_dir}"
                            ] = task_info

    sublist = []
    for ksub, sub in subdict.items():
        for kses, ses in sub.items():
            # a session is usable if it has anat or func data; otherwise
            # warn about whichever is missing
            if "anat" in ses or "func" in ses:
                sublist.append(ses)
            else:
                if "anat" not in ses:
                    UTLOGGER.warning(
                        "%s %s %s is missing an anat",
                        ses["site_id"] if "none" not in ses["site_id"] else "",
                        ses["subject_id"],
                        ses["unique_id"],
                    )
                if "func" not in ses:
                    UTLOGGER.warning(
                        "%s %s %s is missing a func",
                        ses["site_id"] if "none" not in ses["site_id"] else "",
                        ses["subject_id"],
                        ses["unique_id"],
                    )

    return sublist
def collect_bids_files_configs(bids_dir, aws_input_creds=""):
    """Gather BIDS image files and sidecar JSON configs from a dataset.

    Parameters
    ----------
    bids_dir : str
        local path or ``s3://`` URI of the BIDS dataset root
    aws_input_creds : str
        path to AWS credentials file when `bids_dir` is an S3 URI that
        requires authentication; empty for anonymous access or local paths

    Returns
    -------
    file_paths : list of str
        NIfTI file paths, relative to `bids_dir`
    config_dict : dict
        maps relative sidecar JSON paths to their parsed contents

    Raises
    ------
    IOError
        if the credentials file cannot be found, or no files are found
    BotoCoreError
        if an S3 sidecar cannot be retrieved or parsed
    UnicodeDecodeError
        if a local JSON sidecar cannot be decoded
    """
    file_paths = []
    config_dict = {}

    # the BIDS suffixes (scan types) we collect
    suffixes = [
        "T1w",
        "T2w",
        "bold",
        "epi",
        "phasediff",
        "phase1",
        "phase2",
        "magnitude",
        "magnitude1",
        "magnitude2",
    ]

    if bids_dir.lower().startswith("s3://"):
        # s3 paths begin with s3://bucket/
        bucket_name = bids_dir.split("/")[2]
        s3_prefix = "/".join(bids_dir.split("/")[:3])
        prefix = bids_dir.replace(s3_prefix, "").lstrip("/")

        if aws_input_creds:
            if not os.path.isfile(aws_input_creds):
                raise IOError("Could not find aws_input_creds (%s)" % (aws_input_creds))

        from indi_aws import fetch_creds

        bucket = fetch_creds.return_bucket(aws_input_creds, bucket_name)

        UTLOGGER.info("gathering files from S3 bucket (%s) for %s", bucket, prefix)

        for s3_obj in bucket.objects.filter(Prefix=prefix):
            for suf in suffixes:
                if suf in str(s3_obj.key):
                    # 'epi' files only count as field maps when acquired
                    # for fMRI
                    if suf == "epi" and "acq-fMRI" not in s3_obj.key:
                        continue
                    if str(s3_obj.key).endswith("json"):
                        try:
                            config_dict[s3_obj.key.replace(prefix, "").lstrip("/")] = (
                                json.loads(s3_obj.get()["Body"].read())
                            )
                        except Exception as e:
                            # interpolate the exception itself; exceptions
                            # have no `.message` attribute in Python 3
                            msg = (
                                f"Error retrieving {s3_obj.key.replace(prefix, '')}"
                                f" ({e})"
                            )
                            raise BotoCoreError(msg) from e
                    elif "nii" in str(s3_obj.key):
                        file_paths.append(
                            str(s3_obj.key).replace(prefix, "").lstrip("/")
                        )
    else:
        for root, dirs, files in os.walk(bids_dir, topdown=False, followlinks=True):
            if files:
                for f in files:
                    for suf in suffixes:
                        if suf == "epi" and "acq-fMRI" not in f:
                            continue
                        if "nii" in f and suf in f:
                            file_paths += [
                                os.path.join(root, f).replace(bids_dir, "").lstrip("/")
                            ]
                        if f.endswith("json") and suf in f:
                            rel_path = os.path.join(
                                root.replace(bids_dir, "").lstrip("/"), f
                            )
                            try:
                                # context manager so the handle is closed
                                # even when parsing fails
                                with open(os.path.join(root, f), "r") as j_file:
                                    config_dict[rel_path] = json.load(j_file)
                            except UnicodeDecodeError as unicode_err:
                                # re-raise with the offending path included;
                                # UnicodeDecodeError requires all five
                                # constructor arguments
                                msg = f"Could not decode {os.path.join(root, f)}"
                                raise UnicodeDecodeError(
                                    unicode_err.encoding,
                                    unicode_err.object,
                                    unicode_err.start,
                                    unicode_err.end,
                                    msg,
                                ) from unicode_err

    if not file_paths and not config_dict:
        msg = (
            f"Didn't find any files in {bids_dir}. Please verify that the path is"
            " typed correctly, that you have read access to the directory, and that it"
            " is not empty."
        )
        raise IOError(msg)

    return file_paths, config_dict
def camelCase(string: str) -> str:  # pylint: disable=invalid-name
    """Convert a hyphenated string to camelCase.

    Parameters
    ----------
    string : str
        string to convert to camelCase

    Returns
    -------
    str

    Examples
    --------
    >>> camelCase('PearsonNilearn-aCompCor')
    'PearsonNilearnACompCor'
    >>> camelCase('mean-Pearson-Nilearn-aCompCor')
    'meanPearsonNilearnACompCor'
    """
    first, *rest = string.split("-")
    # capitalize the first character of every piece after the first,
    # leaving empty pieces (from consecutive hyphens) untouched
    capitalized = [
        f"{piece[0].upper()}{piece[1:]}" if piece else piece for piece in rest
    ]
    return first + "".join(capitalized)
def combine_multiple_entity_instances(bids_str: str) -> str:
    """Combine multiple instances of a key in a BIDS string into one.

    Repeated values for the same key are camelCased and concatenated; the
    'desc' entity, if present, is moved to just before the suffix.

    Parameters
    ----------
    bids_str : str

    Returns
    -------
    str

    Examples
    --------
    >>> combine_multiple_entity_instances(
    ...     'sub-1_ses-HBN_site-RU_task-rest_atlas-AAL_'
    ...     'desc-Nilearn_desc-36-param_suffix.ext')
    'sub-1_ses-HBN_site-RU_task-rest_atlas-AAL_desc-Nilearn36Param_suffix.ext'
    >>> combine_multiple_entity_instances(
    ...     'sub-1_ses-HBN_site-RU_task-rest_'
    ...     'run-1_framewise-displacement-power.1D')
    'sub-1_ses-HBN_site-RU_task-rest_run-1_framewiseDisplacementPower.1D'
    """
    *entity_chunks, last_chunk = bids_str.split("_")
    suffixes = [camelCase(last_chunk)]
    # gather all values per key, preserving first-seen key order
    grouped = {}
    for chunk in entity_chunks:
        if "-" in chunk:
            key, value = chunk.split("-", maxsplit=1)
            grouped.setdefault(key, []).append(value)
    combined = {key: camelCase("-".join(values)) for key, values in grouped.items()}
    if "desc" in combined:  # make 'desc' final entity
        suffixes.insert(0, f'desc-{combined.pop("desc")}')
    return "_".join([f"{key}-{value}" for key, value in combined.items()] + suffixes)
def insert_entity(resource, key, value):
    """Insert a ``f'{key}-{value}'`` BIDS entity before ``desc-`` if
    present or before the suffix otherwise.

    Parameters
    ----------
    resource, key, value : str

    Returns
    -------
    str

    Examples
    --------
    >>> insert_entity('run-1_desc-preproc_bold', 'reg', 'default')
    'run-1_reg-default_desc-preproc_bold'
    >>> insert_entity('run-1_bold', 'reg', 'default')
    'run-1_reg-default_bold'
    >>> insert_entity('run-1_desc-preproc_bold', 'filt', 'notch4c0p31bw0p12')
    'run-1_filt-notch4c0p31bw0p12_desc-preproc_bold'
    >>> insert_entity('run-1_reg-default_bold', 'filt', 'notch4c0p31bw0p12')
    'run-1_reg-default_filt-notch4c0p31bw0p12_bold'
    """
    *entities, suffix = resource.split("_")
    # partition existing entities: the new one goes after everything except
    # desc- entities, which stay last
    before = [entity for entity in entities if not entity.startswith("desc-")]
    desc = [entity for entity in entities if entity.startswith("desc-")]
    return "_".join([*before, f"{key}-{value}", *desc, suffix])
def load_yaml_config(config_filename, aws_input_creds):
    """Load a YAML config from a local path, ``s3://`` URI, or data URI.

    Parameters
    ----------
    config_filename : str
        path, ``s3://`` URI, or base64 ``data:`` URI of the YAML file
    aws_input_creds : str or None
        path to AWS credentials file for S3 access; falsy for anonymous

    Returns
    -------
    dict or list
        the parsed YAML content

    Raises
    ------
    BotoCoreError
        if a ``data:`` URI cannot be decoded and parsed
    IOError
        if `aws_input_creds` cannot be found
    FileNotFoundError
        if the config file cannot be read
    """
    if config_filename.lower().startswith("data:"):
        try:
            header, encoded = config_filename.split(",", 1)
            config_content = b64decode(encoded)
            return yaml.safe_load(config_content)
        except Exception as e:
            # narrowed from a bare `except:` (which also swallowed
            # KeyboardInterrupt/SystemExit) and chained for debuggability
            msg = f"Error! Could not find load config from data URI {config_filename}"
            raise BotoCoreError(msg) from e

    if config_filename.lower().startswith("s3://"):
        # s3 paths begin with s3://bucket/
        bucket_name = config_filename.split("/")[2]
        s3_prefix = "/".join(config_filename.split("/")[:3])
        prefix = config_filename.replace(s3_prefix, "").lstrip("/")

        if aws_input_creds:
            if not os.path.isfile(aws_input_creds):
                raise IOError("Could not find aws_input_creds (%s)" % (aws_input_creds))

        from indi_aws import fetch_creds

        bucket = fetch_creds.return_bucket(aws_input_creds, bucket_name)
        downloaded_config = "/tmp/" + os.path.basename(config_filename)
        bucket.download_file(prefix, downloaded_config)
        config_filename = downloaded_config

    config_filename = os.path.realpath(config_filename)

    try:
        # context manager so the file handle is closed promptly
        with open(config_filename, "r") as config_file:
            return yaml.safe_load(config_file)
    except IOError as e:
        msg = f"Error! Could not find config file {config_filename}"
        raise FileNotFoundError(msg) from e
def cl_strip_brackets(arg_list):
    """Remove '[' from before the first and ']' from after the final
    argument in a list of commandline arguments.

    Note: modifies `arg_list` in place before returning the filtered copy.

    Parameters
    ----------
    arg_list : list

    Returns
    -------
    list

    Examples
    --------
    >>> cl_strip_brackets('[a b c]'.split(' '))
    ['a', 'b', 'c']
    >>> cl_strip_brackets('a b c'.split(' '))
    ['a', 'b', 'c']
    >>> cl_strip_brackets('[ a b c ]'.split(' '))
    ['a', 'b', 'c']
    """
    arg_list[0] = arg_list[0].lstrip("[")
    arg_list[-1] = arg_list[-1].rstrip("]")
    # drop any arguments left empty by the stripping
    return list(filter(None, arg_list))
def create_cpac_data_config(
    bids_dir,
    participant_labels=None,
    aws_input_creds=None,
    skip_bids_validator=False,
    only_one_anat=True,
):
    """
    Create a C-PAC data config YAML file from a BIDS directory.

    Parameters
    ----------
    bids_dir : str
    participant_labels : list or None
    aws_input_creds
    skip_bids_validator : bool
    only_one_anat : bool
        The "anat" key for a subject expects a string value, but we
        can temporarily store a list instead by passing True here if
        we will be filtering that list down to a single string later

    Returns
    -------
    list
    """
    UTLOGGER.info("Parsing %s..", bids_dir)
    file_paths, config = collect_bids_files_configs(bids_dir, aws_input_creds)

    if participant_labels and file_paths:
        # keep only the paths that mention any requested participant
        file_paths = [
            path
            for path in file_paths
            if any(label in path for label in participant_labels)
        ]
        if not file_paths:
            UTLOGGER.error("Did not find data for %s", ", ".join(participant_labels))
            sys.exit(1)

    sub_list = bids_gen_cpac_sublist(
        bids_dir,
        file_paths,
        config,
        aws_input_creds,
        raise_error=not skip_bids_validator,
        only_one_anat=only_one_anat,
    )

    if not sub_list:
        UTLOGGER.error("Did not find data in %s", bids_dir)
        sys.exit(1)

    return sub_list
def load_cpac_data_config(data_config_file, participant_labels, aws_input_creds):
    """
    Loads the file as a check to make sure it is available and readable.

    Parameters
    ----------
    data_config_file : str
        path to data config
    participant_labels : list or None
    aws_input_creds

    Returns
    -------
    list
    """
    sub_list = load_yaml_config(data_config_file, aws_input_creds)

    if participant_labels:
        # keep entries whose (sub- prefixed) subject_id was requested
        filtered = []
        for entry in sub_list:
            subject_id = entry["subject_id"]
            if not subject_id.startswith("sub-"):
                subject_id = "sub-" + subject_id
            if subject_id in participant_labels:
                filtered.append(entry)
        sub_list = filtered

        if not sub_list:
            UTLOGGER.error(
                "Did not find data for %s in %s",
                ", ".join(participant_labels),
                "data URI"
                if data_config_file.startswith("data:")
                else data_config_file,
            )
            sys.exit(1)

    return sub_list
def res_in_filename(cfg, label):
    """Specify resolution in filename.

    Replaces a symbolic ``_res-`` value ('anat', 'bold', 'derivative') with
    the resolution configured for that output type; other values pass
    through unchanged.

    Parameters
    ----------
    cfg : CPAC.utils.configuration.Configuration

    label : str

    Returns
    -------
    label : str

    Examples
    --------
    >>> from CPAC.utils.configuration import Configuration
    >>> res_in_filename(Configuration({
    ...     'registration_workflows': {
    ...         'anatomical_registration': {'resolution_for_anat': '2x2x2'}}}),
    ...     'sub-1_res-anat_bold')
    'sub-1_res-2x2x2_bold'
    >>> res_in_filename(Configuration({
    ...     'registration_workflows': {
    ...         'anatomical_registration': {'resolution_for_anat': '2x2x2'}}}),
    ...     'sub-1_res-3mm_bold')
    'sub-1_res-3mm_bold'
    """
    if "_res-" not in label:
        return label
    # the text between '_res-' and the following '_'
    res_key = label.split("_res-", 1)[1].split("_", 1)[0]
    symbolic = {
        "anat": cfg[
            "registration_workflows",
            "anatomical_registration",
            "resolution_for_anat",
        ],
        "bold": cfg[
            "registration_workflows",
            "functional_registration",
            "func_registration_to_template",
            "output_resolution",
            "func_preproc_outputs",
        ],
        "derivative": cfg[
            "registration_workflows",
            "functional_registration",
            "func_registration_to_template",
            "output_resolution",
            "func_derivative_outputs",
        ],
    }
    # non-symbolic resolutions map to themselves
    resolution = symbolic.get(res_key, res_key)
    return re.sub("_res-[A-Za-z0-9]*_", f"_res-{resolution}_", label)
def sub_list_filter_by_labels(sub_list, labels):
    """Filter a sub_list by provided BIDS labels for specified suffixes.

    Parameters
    ----------
    sub_list : list

    labels : dict

    labels['T1w'] : str or None
        C-PAC currently only uses a single T1w image

    labels['bold'] : str, list, or None

    Returns
    -------
    list
    """
    t1w_label = labels.get("T1w")
    if t1w_label:
        sub_list = _sub_list_filter_by_label(sub_list, "T1w", t1w_label)
    if labels.get("bold"):
        # strip CLI brackets first (mutates labels['bold'] like the CLI expects)
        labels["bold"] = cl_strip_brackets(labels["bold"])
        sub_list = _sub_list_filter_by_label(sub_list, "bold", labels["bold"])
    return sub_list
def with_key(entity: str, key: str) -> str:
    """Return a keyed BIDS entity.

    Parameters
    ----------
    entity, key : str

    Returns
    -------
    str

    Examples
    --------
    >>> with_key('sub-1', 'sub')
    'sub-1'
    >>> with_key('1', 'sub')
    'sub-1'
    """
    entity_str = entity if isinstance(entity, str) else str(entity)
    # prepend the key only when it is not already there
    if entity_str.startswith(f"{key}-"):
        return entity_str
    return f"{key}-{entity_str}"
def without_key(entity: str, key: str) -> str:
    """Return a BIDS entity value.

    Parameters
    ----------
    entity, key : str

    Returns
    -------
    str

    Examples
    --------
    >>> without_key('sub-1', 'sub')
    '1'
    >>> without_key('1', 'sub')
    '1'
    """
    entity_str = entity if isinstance(entity, str) else str(entity)
    prefix = f"{key}-"
    if entity_str.startswith(prefix):
        # .replace (not removeprefix) to mirror the historical behavior of
        # stripping every occurrence of the keyed prefix
        return entity_str.replace(prefix, "")
    return entity_str
def _t1w_filter(anat, shortest_entity, label):
    """Helper function to filter T1w paths.

    Parameters
    ----------
    anat : list or str
        anatomical file path(s)

    shortest_entity : bool
        if True, select the path with the fewest BIDS entities instead of
        matching against `label`

    label : str or None
        BIDS entity string to match (None when `shortest_entity` is True)

    Returns
    -------
    anat : list
        NOTE(review): may actually be a single str when `shortest_entity`
        selects a unique file — confirm against callers
    """
    if not isinstance(anat, list):
        anat = [anat]
    if shortest_entity:
        anat = bids_shortest_entity(anat)
    else:
        anat = bids_match_entities(anat, label, "T1w")
    # pylint: disable=invalid-name
    # prefer a T2w match when one exists, otherwise keep the T1w result
    # NOTE(review): in the shortest_entity case `anat` may be a single str
    # and `label` None here; bids_match_entities would then iterate the
    # string's characters, none of which end in 'T2w', producing a list of
    # characters that clobbers `anat` — looks like a latent bug; confirm
    try:
        anat_T2 = bids_match_entities(anat, label, "T2w")
    except LookupError:
        anat_T2 = None
    if anat_T2 is not None:
        anat = anat_T2
    return anat
def _sub_anat_filter(anat, shortest_entity, label):
    """Helper function to filter anat paths in sub_list.

    Parameters
    ----------
    anat : list or dict

    shortest_entity : bool

    label : str

    Returns
    -------
    list or dict
        same type as 'anat' parameter
    """
    if not isinstance(anat, dict):
        return _t1w_filter(anat, shortest_entity, label)
    # dict form: filter only the T1w entry in place
    if "T1w" in anat:
        anat["T1w"] = _t1w_filter(anat["T1w"], shortest_entity, label)
    return anat
def _sub_list_filter_by_label(sub_list, label_type, label):
    """Function to filter a sub_list by a CLI-provided label.

    Parameters
    ----------
    sub_list : list

    label_type : str
        'T1w' or 'bold'

    label : str or list
        BIDS entity string(s); including the bare `label_type` itself
        requests the scan with the shortest entity chain as well

    Returns
    -------
    list

    Examples
    --------
    >>> from CPAC.pipeline.test.sample_data import sub_list
    >>> _sub_list_filter_by_label(sub_list, 'bold', 'task-PEER1')[
    ...     0]['func'].keys()
    dict_keys(['PEER1'])
    """
    label_list = [label] if isinstance(label, str) else list(label)
    new_sub_list = []
    # a bare `label_type` entry (e.g. 'bold') means "also keep the scan
    # with the fewest BIDS entities"
    if label_type in label_list:
        shortest_entity = True
        label_list.remove(label_type)
    else:
        shortest_entity = False
    if label_type == "T1w":
        for sub in [sub for sub in sub_list if "anat" in sub]:
            try:
                # NOTE(review): only label_list[0] is consulted for T1w;
                # any additional labels are ignored — confirm intended
                sub["anat"] = _sub_anat_filter(
                    sub["anat"],
                    shortest_entity,
                    label_list[0] if not shortest_entity else None,
                )
                if sub["anat"]:
                    new_sub_list.append(sub)
            except LookupError as lookup_error:
                # no match for this subject: warn and drop it
                warn(str(lookup_error))
    elif label_type == "bold":
        for sub in [sub for sub in sub_list if "func" in sub]:
            try:
                all_scans = [sub["func"][scan].get("scan") for scan in sub["func"]]
                new_func = {}
                # accumulate every scan matching any requested entity string
                for entities in label_list:
                    matched_scans = bids_match_entities(all_scans, entities, label_type)
                    for scan in matched_scans:
                        new_func = {
                            **new_func,
                            **_match_functional_scan(sub["func"], scan),
                        }
                if shortest_entity:
                    # also keep the scan with the fewest BIDS entities
                    new_func = {
                        **new_func,
                        **_match_functional_scan(
                            sub["func"], bids_shortest_entity(all_scans)
                        ),
                    }
                sub["func"] = new_func
                new_sub_list.append(sub)
            except LookupError as lookup_error:
                warn(str(lookup_error))
    return new_sub_list
def _match_functional_scan(sub_list_func_dict, scan_file_to_match):
"""Function to subset a scan from a sub_list_func_dict by a scan filename.
Parameters
----------
sub_list_func_dict : dict
sub_list[sub]['func']
scan_file_to_match : str
Returns
-------
dict
Examples
--------
>>> from CPAC.pipeline.test.sample_data import sub_list
>>> matched = _match_functional_scan(
... sub_list[0]['func'],
... '/fake/data/sub-0001/ses-NFB3/func/'
... 'sub-0001_ses-NFB3_task-PEER1_bold.nii.gz')
>>> matched.keys()
dict_keys(['PEER1'])
>>> all([key in matched['PEER1'] for key in [
... 'fmap_mag', 'fmap_phase', 'scan', 'scan_parameters'
... ]])
True
"""
return {
entity: sub_list_func_dict[entity]
for entity in sub_list_func_dict
if sub_list_func_dict[entity].get("scan") == scan_file_to_match
}