# Copyright (C) 2016-2024 C-PAC Developers
# This file is part of C-PAC.
# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
from base64 import b64decode
from collections.abc import Iterable
import json
import os
import re
import sys
from typing import Any, Callable, Optional
from warnings import warn
from botocore.exceptions import BotoCoreError
import yaml
from CPAC.utils.monitoring import UTLOGGER
[docs]
class SpecifiedBotoCoreError(BotoCoreError):
    """:py:class:`~botocore.exceptions.BotoCoreError` with a caller-specified message."""

    def __init__(self, msg: str, *args, **kwargs) -> None:
        """Format ``msg`` with ``kwargs`` and initialize the exception.

        Deliberately calls ``Exception.__init__`` directly, bypassing
        ``BotoCoreError.__init__`` and its fixed ``fmt`` template.
        """
        formatted_msg = msg.format(**kwargs)
        Exception.__init__(self, formatted_msg)
        self.kwargs = kwargs
[docs]
def bids_decode_fname(file_path, dbg=False, raise_error=True):
    """Decode a BIDS-style filename into a dictionary of its components.

    Parameters
    ----------
    file_path : str
        path to a nifti or json file whose basename is formatted
        ``key1-value1_key2-value2_..._scantype.ext``
    dbg : bool
        whether to log debug statements
    raise_error : bool
        whether to raise (True) or merely log (False) on malformed names

    Returns
    -------
    dict
        BIDS keys (sub, ses, task, ...) mapped to their values, plus
        ``"site"`` (parsed from the directory path, ``"none"`` when
        undetermined) and ``"scantype"`` (the filename suffix)

    Raises
    ------
    IOError
        if the file does not appear to be a nifti or json file
    ValueError
        if `raise_error` and the filename lacks a scan type, or is a
        BOLD file lacking a task entity
    """
    f_dict = {}

    fname = os.path.basename(file_path)

    # first lets make sure that we know how to handle the file
    if "nii" not in fname.lower() and "json" not in fname.lower():
        msg = f"File ({fname}) does not appear to be a nifti or json file"
        raise IOError(msg)

    if dbg:
        UTLOGGER.debug("parsing %s", file_path)

    # first figure out if there is a site directory level, this isn't
    # specified in BIDS currently, but hopefully will be in the future
    file_path_vals = os.path.dirname(file_path).split("/")
    sub = [s for s in file_path_vals if "sub-" in s]
    if dbg:
        UTLOGGER.debug("found subject %s in %s", sub, file_path_vals)

    if len(sub) > 1:
        UTLOGGER.debug(
            "Odd that there is more than one subject directory in (%s), does the"
            " filename conform to BIDS format?",
            file_path,
        )
    if sub:
        sub_ndx = file_path_vals.index(sub[0])
        if sub_ndx > 0 and file_path_vals[sub_ndx - 1]:
            if dbg:
                UTLOGGER.debug("setting site to %s", file_path_vals[sub_ndx - 1])
            f_dict["site"] = file_path_vals[sub_ndx - 1]
        else:
            f_dict["site"] = "none"
    elif file_path_vals[-1]:
        if dbg:
            UTLOGGER.debug(
                "looking for subject id didn't pan out settling for last subdir %s",
                file_path_vals[-1],
            )
        f_dict["site"] = file_path_vals[-1]
    else:
        f_dict["site"] = "none"

    # normalize the site name: drop whitespace, hyphens, and underscores
    f_dict["site"] = re.sub(r"[\s\-\_]+", "", f_dict["site"])

    fname = fname.split(".")[0]
    # convert the filename string into a dictionary to pull out the other
    # key value pairs
    for key_val_pair in fname.split("_"):
        # if the chunk has the shape key-val store key: val in f_dict
        if "-" in key_val_pair:
            chunks = key_val_pair.split("-")
            f_dict[chunks[0]] = "-".join(chunks[1:])
        else:
            f_dict["scantype"] = key_val_pair.split(".")[0]

    # merged two previously-duplicated branches: a missing key and an empty
    # value get the same diagnostic
    if not f_dict.get("scantype"):
        msg = (
            f"Filename ({fname}) does not appear to contain"
            " scan type, does it conform to the BIDS format?"
        )
        if raise_error:
            raise ValueError(msg)
        UTLOGGER.error(msg)
    elif "bold" in f_dict["scantype"] and not f_dict.get("task"):
        # bugfix: f_dict["task"] raised KeyError when the task entity was
        # absent, pre-empting the intended ValueError below
        msg = (
            f"Filename ({fname}) is a BOLD file, but doesn't contain a task, does"
            " it conform to the BIDS format?"
        )
        if raise_error:
            raise ValueError(msg)
        UTLOGGER.error(msg)

    return f_dict
[docs]
def bids_entities_from_filename(filename):
    """Collect the list of BIDS entities from a given filename.

    Parameters
    ----------
    filename : str

    Returns
    -------
    entities : list

    Examples
    --------
    >>> bids_entities_from_filename(
    ...     's3://fake/data/sub-0001/ses-NFB3/func/'
    ...     'sub-0001_ses-NFB3_task-MSIT_bold.nii.gz')
    ['sub-0001', 'ses-NFB3', 'task-MSIT', 'bold']
    """
    basename = filename.rsplit("/", 1)[-1]
    stem = basename.split(".", 1)[0]
    return stem.split("_")
[docs]
def bids_match_entities(file_list, entities, suffix):
    """Subset a list of filepaths by a passed BIDS entity.

    Files whose final entity differs from ``suffix`` pass through
    unfiltered; files with the matching suffix are kept only when they
    contain ``entities``.

    Parameters
    ----------
    file_list : list of str

    entities : str
        BIDS entities joined by underscores (e.g., 'ses-001_task-PEER1')

    suffix : str
        BIDS suffix (e.g., 'bold', 'T1w')

    Returns
    -------
    list of str

    Raises
    ------
    LookupError
        if no file in ``file_list`` matches ``entities``

    Examples
    --------
    >>> bids_match_entities([
    ...     's3://fake/data/sub-001_ses-001_task-MSIT_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER2_bold.nii.gz'
    ... ], 'task-PEER1', 'bold')
    ['s3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz']
    >>> bids_match_entities([
    ...     's3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER2_bold.nii.gz'
    ... ], 'PEER', 'bold')
    Traceback (most recent call last):
    LookupError: No match found for provided entity "PEER" in
    - s3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz
    - s3://fake/data/sub-001_ses-001_task-PEER2_bold.nii.gz
    Perhaps you meant one of these?
    - task-PEER1
    - task-PEER2
    """
    needle = f"_{entities}_"
    matches = []
    for filepath in file_list:
        file_entities = bids_entities_from_filename(filepath)
        # non-matching suffixes pass through; matching suffixes must
        # contain the requested entities
        if file_entities[-1] != suffix or needle in "_".join(file_entities):
            matches.append(filepath)

    if file_list and not matches:
        listing = "\n".join([f"- {filepath}" for filepath in file_list])
        noun = "entity" if len(entities.split("_")) == 1 else "entities"
        error_message = " ".join(
            ["No match found for provided", noun, f'"{entities}" in\n{listing}']
        )
        # offer near-misses: any underscore-delimited chunk containing the
        # requested entities
        pattern = re.compile(f"[^_]*{entities}[^_]*")
        partial_matches = []
        for filepath in file_list:
            found = pattern.search(filepath)
            if found is not None:
                partial_matches.append(found.group())
        if partial_matches:
            if len(partial_matches) == 1:
                error_message += f'\nPerhaps you meant "{partial_matches[0]}"?'
            else:
                error_message = "\n".join(
                    [
                        error_message,
                        "Perhaps you meant one of these?",
                        *[f"- {partial}" for partial in partial_matches],
                    ]
                )
        raise LookupError(error_message)
    return matches
[docs]
def bids_remove_entity(name, key):
    """Remove an entity from a BIDS string by key.

    Parameters
    ----------
    name : str
        BIDS string to remove entity from

    key : str
        BIDS key of entity to remove

    Returns
    -------
    str
        BIDS name with entity removed

    Examples
    --------
    >>> bids_remove_entity('atlas-Yeo_space-MNI152NLin6_res-2x2x2', 'space')
    'atlas-Yeo_res-2x2x2'
    >>> bids_remove_entity('atlas-Yeo_space-MNI152NLin6_res-2x2x2', 'res')
    'atlas-Yeo_space-MNI152NLin6'
    """
    prefix = f'{key.rstrip("-")}-'
    kept = [
        entity
        for entity in bids_entities_from_filename(name)
        if not entity.startswith(prefix)
    ]
    return "_".join(kept)
[docs]
def bids_retrieve_params(bids_config_dict, f_dict, dbg=False):
    """Retrieve the BIDS parameters from bids_config_dict for the BIDS file
    corresponding to f_dict.

    If an exact match for f_dict is not found, the nearest match is
    returned, corresponding to the BIDS inheritance principle.

    :param bids_config_dict: BIDS configuration dictionary, a multi-level
        dictionary mapping the components of a bids filename (i.e. sub,
        ses, acq, run) to a dictionary of BIDS parameters
        (RepetitionTime, EchoTime, etc.), as built by bids_parse_sidecar
    :param f_dict: dictionary built from the name of a BIDS file by
        bids_decode_fname
    :param dbg: whether debug statements should be logged, default False
    :return: dictionary of BIDS parameters; string values are encoded to
        ASCII bytes in place (mutating the shared configuration dict)
    """
    params = {}

    current = bids_config_dict  # pointer into the nested configuration
    for level in ["scantype", "site", "sub", "ses", "task", "acq", "rec", "dir", "run"]:
        key = "-".join([level, f_dict.get(level, "none")])
        if dbg:
            UTLOGGER.debug(key)
        if key in current:
            current = current[key]
        else:
            # fall back to the generic "none" entry at this level
            if dbg:
                UTLOGGER.debug(
                    "Couldn't find %s, so going with %s", key, "-".join([level, "none"])
                )
            key = "-".join([level, "none"])
            if key in current:
                current = current[key]

        # a dictionary carrying "RepetitionTime" (mandatory in JSON
        # sidecars per the BIDS spec) is an image-parameter dict; adopt it
        # as the current best match
        if dbg:
            UTLOGGER.debug(current)
        if any("RepetitionTime" in entry for entry in current):
            params = current

    # encode string parameter values to ASCII bytes (in place)
    for name, value in params.items():
        if isinstance(value, str):
            params[name] = value.encode("ascii", errors="ignore")

    return params
[docs]
def bids_parse_sidecar(config_dict, dbg=False, raise_error=True):
    # type: (dict, bool, bool) -> dict
    """
    Uses the BIDS principle of inheritance to build a data structure that
    maps parameters in side car .json files to components in the names of
    corresponding nifti files.

    :param config_dict: dictionary that maps paths of sidecar json files
        (the key) to a dictionary containing the contents of the files (the values)
    :param dbg: boolean flag that indicates whether or not debug statements
        should be printed
    :param raise_error: whether to raise (True) or just log (False) when a
        sidecar filename cannot be decoded
    :return: a dictionary that maps parameters to components from BIDS filenames
        such as sub, sess, run, acq, and scan type
    """
    # we are going to build a large-scale data structure, consisting of many
    # levels of dictionaries to hold the data.
    bids_config_dict = {}

    # initialize 'default' entries, this essentially is a pointer traversal
    # of the dictionary
    t_dict = bids_config_dict
    for level in ["scantype", "site", "sub", "ses", "task", "acq", "rec", "dir", "run"]:
        key = "-".join([level, "none"])
        t_dict[key] = {}
        t_dict = t_dict[key]

    if dbg:
        UTLOGGER.debug(bids_config_dict)

    # get the paths to the json yaml files in config_dict, the paths contain
    # the information needed to map the parameters from the jsons (the vals
    # of the config_dict) to corresponding nifti files. We sort the list
    # by the number of path components, so that we can iterate from the outer
    # most path to inner-most, which will help us address the BIDS inheritance
    # principle
    config_paths = sorted(config_dict.keys(), key=lambda p: len(p.split("/")))

    if dbg:
        UTLOGGER.debug(config_paths)

    for cp in config_paths:
        if dbg:
            UTLOGGER.debug("processing %s", cp)

        # decode the filepath into its various components as defined by BIDS
        f_dict = bids_decode_fname(cp, raise_error=raise_error)

        # handling inheritance is a complete pain, we will try to handle it by
        # build the key from the bottom up, starting with the most
        # parsimonious possible, incorporating configuration information that
        # exists at each level

        # first lets try to find any parameters that already apply at this
        # level using the information in the json's file path
        t_params = bids_retrieve_params(bids_config_dict, f_dict)

        # now populate the parameters
        bids_config = {}
        if t_params:
            bids_config.update(t_params)

        # add in the information from this config file
        t_config = config_dict[cp]
        # bugfix: `t_config is list` compared the value to the `list` type by
        # identity and was always False; isinstance is what was intended
        if isinstance(t_config, list):
            t_config = t_config[0]

        try:
            bids_config.update(t_config)
        except ValueError as value_error:
            err = (
                "\n[!] Could not properly parse the AWS S3 path provided "
                "- please double-check the bucket and the path.\n\nNote: "
                "This could either be an issue with the path or the way "
                "the data is organized in the directory. You can also "
                "try providing a specific site sub-directory.\n\n"
            )
            raise ValueError(err) from value_error

        # now put the configuration in the data structure, by first iterating
        # to the location of the key, and then inserting it. When a key isn't
        # defined we use the "none" value. A "none" indicates that the
        # corresponding parameters apply to all possible settings of that key
        # e.g. run-1, run-2, ... will all map to run-none if no jsons
        # explicitly define values for those runs
        t_dict = bids_config_dict  # pointer to current dictionary
        for level in [
            "scantype",
            "site",
            "sub",
            "ses",
            "task",
            "acq",
            "rec",
            "dir",
            "run",
        ]:
            if level in f_dict:
                key = "-".join([level, f_dict[level]])
            else:
                key = "-".join([level, "none"])

            if key not in t_dict:
                t_dict[key] = {}

            t_dict = t_dict[key]

        t_dict.update(bids_config)

    return bids_config_dict
[docs]
def bids_shortest_entity(file_list):
    """Return the file with the shortest chain of BIDS entities in a list.

    Parameters
    ----------
    file_list : list of strings

    Returns
    -------
    str, list, or None
        the unique shortest-chain file; the list of tied files when more
        than one share the minimum length; None when ``file_list`` is empty

    Examples
    --------
    >>> bids_shortest_entity([
    ...     's3://fake/data/sub-001_ses-001_task-MSIT_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER2_bold.nii.gz'
    ... ])
    's3://fake/data/sub-001_ses-001_bold.nii.gz'
    """
    entity_counts = [
        len(bids_entities_from_filename(filename)) for filename in file_list
    ]
    if not entity_counts:
        return None
    fewest = min(entity_counts)
    shortest = [
        filename
        for filename, count in zip(file_list, entity_counts)
        if count == fewest
    ]
    return shortest[0] if len(shortest) == 1 else shortest
[docs]
def gen_bids_outputs_sublist(base_path, paths_list, key_list, creds_path):
    """Build a C-PAC subject list from BIDS-derivative output paths.

    Parameters
    ----------
    base_path : str
        prefix shared by every path in ``paths_list``
    paths_list : list of str
        output file paths, organized ``<subject>/<resource>[/<run>]/...``
        under ``base_path``
    key_list : list of str
        resource names to include in the subject list
    creds_path : str or None
        path to AWS credentials, recorded per subject when provided

    Returns
    -------
    list of dict
        one entry per (subject, functional run) that has every requested
        resource
    """
    import copy

    func_keys = [
        "functional_to_anat_linear_xfm",
        "motion_params",
        "movement_parameters",
        "motion_correct",
    ]
    # subject-level vs run-level resources
    top_keys = list(set(key_list) - set(func_keys))
    bot_keys = list(set(key_list).intersection(func_keys))

    subjdict = {}

    if not base_path.endswith("/"):
        base_path = base_path + "/"

    # output directories are a bit different than standard BIDS, so
    # we handle things differently
    for p in paths_list:
        p = p.rstrip()

        # find the participant and session info which should be at
        # some level in the path
        path_base = p.replace(base_path, "")

        subj_info = path_base.split("/")[0]
        resource = path_base.split("/")[1]

        if resource not in key_list:
            continue

        if subj_info not in subjdict:
            subjdict[subj_info] = {"subj_info": subj_info}

        if creds_path:
            subjdict[subj_info]["creds_path"] = creds_path

        if resource in func_keys:
            # run-level resource: file paths are keyed by run directory
            run_info = path_base.split("/")[2]
            if "funcs" not in subjdict[subj_info]:
                subjdict[subj_info]["funcs"] = {}
            if run_info not in subjdict[subj_info]["funcs"]:
                subjdict[subj_info]["funcs"][run_info] = {"run_info": run_info}
            if resource in subjdict[subj_info]["funcs"][run_info]:
                UTLOGGER.warning("resource %s already exists in subjdict ??", resource)
            subjdict[subj_info]["funcs"][run_info][resource] = p
        else:
            subjdict[subj_info][resource] = p

    sublist = []
    for subj_info, subj_res in subjdict.items():
        missing = 0
        for tkey in top_keys:
            if tkey not in subj_res:
                UTLOGGER.warning("%s not found for %s", tkey, subj_info)
                missing += 1
                break

        if missing == 0:
            # bugfix: subj_res["funcs"] raised KeyError for a subject with
            # no functional resources; such a subject now just contributes
            # no entries
            for func_key, func_res in subj_res.get("funcs", {}).items():
                for bkey in bot_keys:
                    if bkey not in func_res:
                        UTLOGGER.warning("%s not found for %s", bkey, func_key)
                        missing += 1
                        break
                if missing == 0:
                    UTLOGGER.info(
                        "adding: %s, %s, %d", subj_info, func_key, len(sublist)
                    )
                    tdict = copy.deepcopy(subj_res)
                    del tdict["funcs"]
                    tdict.update(func_res)
                    sublist.append(tdict)
    return sublist
[docs]
def bids_gen_cpac_sublist(
    bids_dir,
    paths_list,
    config_dict,
    creds_path,
    dbg=False,
    raise_error=True,
    only_one_anat=True,
):
    """
    Generates a CPAC formatted subject list from information contained in a
    BIDS formatted set of data.

    Parameters
    ----------
    bids_dir : str
        base directory that contains all of the data, this could be a
        directory that contains data for a multiple BIDS datasets, in
        which case the intervening directories will be interpreted as
        site names

    paths_list : str
        lists of all nifti files found in bids_dir, these paths are
        relative to bids_dir

    config_dict : dict
        dictionary that contains information from the JSON sidecars
        found in bids_dir, keys are relative paths and values are
        dictionaries containing all of the parameter information. if
        config_dict is None, the subject list will be built without the
        parameters

    creds_path : str
        if using S3 bucket, this path credentials needed to access the
        bucket, if accessing anonymous bucket, this can be set to None

    dbg : bool
        indicating whether or not the debug statements should be
        printed

    raise_error : bool
        whether to raise (True) or just log (False) when a filename
        cannot be decoded as BIDS

    only_one_anat : bool
        The "anat" key for a subject expects a string value, but we can
        temporarily store a list instead by passing True here if we
        will be filtering that list down to a single string later

    Returns
    -------
    list
        a list of dictionaries suitable for use by CPAC to specify data
        to be processed
    """
    if dbg:
        UTLOGGER.debug(
            "gen_bids_sublist called with:\n bids_dir: %s\n # paths: %s"
            "\n config_dict: %s\n creds_path: %s",
            bids_dir,
            len(paths_list),
            "missing" if not config_dict else "found",
            creds_path,
        )

    # if configuration information is not desired, config_dict will be empty,
    # otherwise parse the information in the sidecar json files into a dict
    # we can use to extract data for our nifti files
    if config_dict:
        bids_config_dict = bids_parse_sidecar(config_dict, raise_error=raise_error)

    subdict = {}
    for p in paths_list:
        # NOTE(review): when bids_dir is a substring of p, str_list[0] is the
        # text BEFORE bids_dir (typically empty), so val will usually fail the
        # "sub-" check below and the path is skipped — confirm this is the
        # intended filter for absolute vs relative paths
        if bids_dir in p:
            str_list = p.split(bids_dir)
            val = str_list[0]
            val = val.rsplit("/")
            val = val[0]
        else:
            str_list = p.split("/")
            val = str_list[0]

        # skip paths whose leading component is not a subject directory
        if "sub-" not in val:
            continue

        p = p.rstrip()
        f = os.path.basename(p)

        if f.endswith(".nii") or f.endswith(".nii.gz"):
            f_dict = bids_decode_fname(p, raise_error=raise_error)

            if config_dict:
                # attach sidecar parameters (nearest match per BIDS
                # inheritance) to the scan
                t_params = bids_retrieve_params(bids_config_dict, f_dict)
                if not t_params:
                    UTLOGGER.warning(
                        "Did not receive any parameters for %s, is this a problem?", p
                    )

                task_info = {
                    "scan": os.path.join(bids_dir, p),
                    "scan_parameters": t_params.copy(),
                }
            else:
                # without sidecars, task_info is just the absolute scan path
                task_info = os.path.join(bids_dir, p)

            # default the session to "1" when no ses- entity is present
            if "ses" not in f_dict:
                f_dict["ses"] = "1"

            if "sub" not in f_dict:
                raise IOError(
                    "sub not found in %s," % (p) + " perhaps it isn't in BIDS format?"
                )

            if f_dict["sub"] not in subdict:
                subdict[f_dict["sub"]] = {}

            subjid = "-".join(["sub", f_dict["sub"]])

            # initialize the per-session record on first sight
            if f_dict["ses"] not in subdict[f_dict["sub"]]:
                subdict[f_dict["sub"]][f_dict["ses"]] = {
                    "creds_path": creds_path,
                    "site_id": "-".join(["site", f_dict["site"]]),
                    "subject_id": subjid,
                    "unique_id": "-".join(["ses", f_dict["ses"]]),
                }

            # anatomical scans (T1w / T2w)
            if "T1w" in f_dict["scantype"] or "T2w" in f_dict["scantype"]:
                # NOTE(review): task_info["scan"] assumes task_info is a dict,
                # which only holds when config_dict was provided — the lesion
                # path likely breaks without sidecars; confirm
                if "lesion" in f_dict.keys() and "mask" in f_dict["lesion"]:
                    if "lesion_mask" not in subdict[f_dict["sub"]][f_dict["ses"]]:
                        subdict[f_dict["sub"]][f_dict["ses"]]["lesion_mask"] = (
                            task_info["scan"]
                        )
                    else:
                        UTLOGGER.warning(
                            "Lesion mask file (%s) already found for (%s:%s)"
                            " discarding %s",
                            subdict[f_dict["sub"]][f_dict["ses"]]["lesion_mask"],
                            f_dict["sub"],
                            f_dict["ses"],
                            p,
                        )
                # TODO deal with scan parameters anatomical
                if "anat" not in subdict[f_dict["sub"]][f_dict["ses"]]:
                    subdict[f_dict["sub"]][f_dict["ses"]]["anat"] = {}
                if (
                    f_dict["scantype"]
                    not in subdict[f_dict["sub"]][f_dict["ses"]]["anat"]
                ):
                    if only_one_anat:
                        # store a single path string per scantype
                        subdict[f_dict["sub"]][f_dict["ses"]]["anat"][
                            f_dict["scantype"]
                        ] = task_info["scan"] if config_dict else task_info
                    else:
                        # store a list per scantype, appended to below
                        subdict[f_dict["sub"]][f_dict["ses"]]["anat"][
                            f_dict["scantype"]
                        ] = []
                if not only_one_anat:
                    subdict[f_dict["sub"]][f_dict["ses"]]["anat"][
                        f_dict["scantype"]
                    ].append(task_info["scan"] if config_dict else task_info)

            # functional scans, keyed by task (plus run/acq when present)
            if "bold" in f_dict["scantype"]:
                task_key = f_dict["task"]
                if "run" in f_dict:
                    task_key = "_".join([task_key, "-".join(["run", f_dict["run"]])])
                if "acq" in f_dict:
                    task_key = "_".join([task_key, "-".join(["acq", f_dict["acq"]])])
                if "func" not in subdict[f_dict["sub"]][f_dict["ses"]]:
                    subdict[f_dict["sub"]][f_dict["ses"]]["func"] = {}

                if task_key not in subdict[f_dict["sub"]][f_dict["ses"]]["func"]:
                    if not isinstance(task_info, dict):
                        task_info = {"scan": task_info}
                    subdict[f_dict["sub"]][f_dict["ses"]]["func"][task_key] = task_info
                else:
                    UTLOGGER.warning(
                        "Func file (%s) already found for (%s: %s: %s) discarding %s",
                        subdict[f_dict["sub"]][f_dict["ses"]]["func"][task_key],
                        f_dict["sub"],
                        f_dict["ses"],
                        task_key,
                        p,
                    )

            # field maps: phase-difference / phase images
            if "phase" in f_dict["scantype"]:
                if "fmap" not in subdict[f_dict["sub"]][f_dict["ses"]]:
                    subdict[f_dict["sub"]][f_dict["ses"]]["fmap"] = {}
                if (
                    f_dict["scantype"]
                    not in subdict[f_dict["sub"]][f_dict["ses"]]["fmap"]
                ):
                    subdict[f_dict["sub"]][f_dict["ses"]]["fmap"][
                        f_dict["scantype"]
                    ] = task_info

            # field maps: magnitude images
            if "magnitude" in f_dict["scantype"]:
                if "fmap" not in subdict[f_dict["sub"]][f_dict["ses"]]:
                    subdict[f_dict["sub"]][f_dict["ses"]]["fmap"] = {}
                if (
                    f_dict["scantype"]
                    not in subdict[f_dict["sub"]][f_dict["ses"]]["fmap"]
                ):
                    subdict[f_dict["sub"]][f_dict["ses"]]["fmap"][
                        f_dict["scantype"]
                    ] = task_info

            # field maps: EPI with opposite phase-encoding directions,
            # keyed by the dir- entity; only acq-fMRI EPIs are collected
            if "epi" in f_dict["scantype"]:
                pe_dir = f_dict["dir"]
                if "acq" in f_dict:
                    if "fMRI" in f_dict["acq"]:
                        if "fmap" not in subdict[f_dict["sub"]][f_dict["ses"]]:
                            subdict[f_dict["sub"]][f_dict["ses"]]["fmap"] = {}
                        if (
                            f"epi_{pe_dir}"
                            not in subdict[f_dict["sub"]][f_dict["ses"]]["fmap"]
                        ):
                            subdict[f_dict["sub"]][f_dict["ses"]]["fmap"][
                                f"epi_{pe_dir}"
                            ] = task_info

    sublist = []
    # keep sessions that have at least an anat or a func; warn about the rest
    for ksub, sub in subdict.items():
        for kses, ses in sub.items():
            if "anat" in ses or "func" in ses:
                sublist.append(ses)
            else:
                if "anat" not in ses:
                    UTLOGGER.warning(
                        "%s %s %s is missing an anat",
                        ses["site_id"] if "none" not in ses["site_id"] else "",
                        ses["subject_id"],
                        ses["unique_id"],
                    )
                if "func" not in ses:
                    UTLOGGER.warning(
                        "%s %s %s is missing a func",
                        ses["site_id"] if "none" not in ses["site_id"] else "",
                        ses["subject_id"],
                        ses["unique_id"],
                    )

    return sublist
[docs]
def collect_bids_files_configs(bids_dir, aws_input_creds=""):
"""
:param bids_dir:
:param aws_input_creds:
:return:
"""
file_paths = []
config_dict = {}
suffixes = [
"T1w",
"T2w",
"bold",
"epi",
"phasediff",
"phase1",
"phase2",
"magnitude",
"magnitude1",
"magnitude2",
]
if bids_dir.lower().startswith("s3://"):
# s3 paths begin with s3://bucket/
bucket_name = bids_dir.split("/")[2]
s3_prefix = "/".join(bids_dir.split("/")[:3])
prefix = bids_dir.replace(s3_prefix, "").lstrip("/")
if aws_input_creds:
if not os.path.isfile(aws_input_creds):
raise IOError("Could not find aws_input_creds (%s)" % (aws_input_creds))
from indi_aws import fetch_creds
bucket = fetch_creds.return_bucket(aws_input_creds, bucket_name)
UTLOGGER.info("gathering files from S3 bucket (%s) for %s", bucket, prefix)
for s3_obj in bucket.objects.filter(Prefix=prefix):
for suf in suffixes:
if suf in str(s3_obj.key):
if suf == "epi" and "acq-fMRI" not in s3_obj.key:
continue
if str(s3_obj.key).endswith("json"):
try:
config_dict[s3_obj.key.replace(prefix, "").lstrip("/")] = (
json.loads(s3_obj.get()["Body"].read())
)
except Exception as e:
msg = (
f"Error retrieving {s3_obj.key.replace(prefix, '')}"
f" ({e.message})"
)
raise SpecifiedBotoCoreError(msg) from e
elif "nii" in str(s3_obj.key):
file_paths.append(
str(s3_obj.key).replace(prefix, "").lstrip("/")
)
else:
for root, dirs, files in os.walk(bids_dir, topdown=False, followlinks=True):
if files:
for f in files:
for suf in suffixes:
if suf == "epi" and "acq-fMRI" not in f:
continue
if "nii" in f and suf in f:
file_paths += [
os.path.join(root, f).replace(bids_dir, "").lstrip("/")
]
if f.endswith("json") and suf in f:
try:
config_dict.update(
{
os.path.join(
root.replace(bids_dir, "").lstrip("/"), f
): json.load(open(os.path.join(root, f), "r"))
}
)
except UnicodeDecodeError as unicode_decode_error:
msg = f"Could not decode {os.path.join(root, f)}"
raise UnicodeDecodeError(
unicode_decode_error.encoding,
unicode_decode_error.object,
unicode_decode_error.start,
unicode_decode_error.end,
msg,
)
if not file_paths and not config_dict:
msg = (
f"Didn't find any files in {bids_dir}. Please verify that the path is"
" typed correctly, that you have read access to the directory, and that it"
" is not empty."
)
raise IOError(msg)
return file_paths, config_dict
[docs]
def camelCase(string: str) -> str:  # pylint: disable=invalid-name
    """Convert a hyphenated string to camelCase.

    Parameters
    ----------
    string : str
        string to convert to camelCase

    Returns
    -------
    str

    Examples
    --------
    >>> camelCase('PearsonNilearn-aCompCor')
    'PearsonNilearnACompCor'
    >>> camelCase('mean-Pearson-Nilearn-aCompCor')
    'meanPearsonNilearnACompCor'
    """
    head, *tail = string.split("-")
    # leave the first piece untouched; capitalize the first letter of the rest
    return head + "".join(piece[:1].upper() + piece[1:] for piece in tail)
[docs]
def combine_multiple_entity_instances(bids_str: str) -> str:
    """Combine multiple instances of a key in a BIDS string into a single
    instance by camelCasing and concatenating the values.

    Parameters
    ----------
    bids_str : str

    Returns
    -------
    str

    Examples
    --------
    >>> combine_multiple_entity_instances(
    ...     'sub-1_ses-HBN_site-RU_task-rest_atlas-AAL_'
    ...     'desc-Nilearn_desc-36-param_suffix.ext')
    'sub-1_ses-HBN_site-RU_task-rest_atlas-AAL_desc-Nilearn36Param_suffix.ext'
    >>> combine_multiple_entity_instances(
    ...     'sub-1_ses-HBN_site-RU_task-rest_'
    ...     'run-1_framewise-displacement-power.1D')
    'sub-1_ses-HBN_site-RU_task-rest_run-1_framewiseDisplacementPower.1D'
    """
    *entity_parts, final_part = bids_str.split("_")
    suffixes = [camelCase(final_part)]
    collected: dict = {}
    for part in entity_parts:
        if "-" in part:
            key, value = part.split("-", maxsplit=1)
            collected.setdefault(key, []).append(value)
    combined = {
        key: camelCase("-".join(values)) for key, values in collected.items()
    }
    if "desc" in combined:  # make 'desc' final entity
        suffixes.insert(0, f'desc-{combined.pop("desc")}')
    return "_".join([f"{key}-{value}" for key, value in combined.items()] + suffixes)
[docs]
def insert_entity(resource, key, value):
    """Insert a `f'{key}-{value}'` BIDS entity before `desc-` if
    present or before the suffix otherwise.

    Parameters
    ----------
    resource, key, value : str

    Returns
    -------
    str

    Examples
    --------
    >>> insert_entity('run-1_desc-preproc_bold', 'reg', 'default')
    'run-1_reg-default_desc-preproc_bold'
    >>> insert_entity('run-1_bold', 'reg', 'default')
    'run-1_reg-default_bold'
    >>> insert_entity('run-1_desc-preproc_bold', 'filt', 'notch4c0p31bw0p12')
    'run-1_filt-notch4c0p31bw0p12_desc-preproc_bold'
    >>> insert_entity('run-1_reg-default_bold', 'filt', 'notch4c0p31bw0p12')
    'run-1_reg-default_filt-notch4c0p31bw0p12_bold'
    """
    *entities, suffix = resource.split("_")
    # entities before the insertion point vs desc- entities after it
    before, after = [], []
    for entity in entities:
        (after if entity.startswith("desc-") else before).append(entity)
    return "_".join(before + [f"{key}-{value}"] + after + [suffix])
[docs]
def apply_modifications(
    yaml_contents: str, modifications: Optional[list[Callable[[str], str]]]
) -> str:
    """Apply each modification function to the YAML contents, in order."""
    for modification in modifications or []:
        yaml_contents = modification(yaml_contents)
    return yaml_contents
[docs]
def load_yaml_config(
    config_filename: str,
    aws_input_creds,
    modifications: Optional[list[Callable[[str], str]]] = None,
) -> dict | list | str:
    """Load a YAML config file, possibly from AWS, with modifications applied.

    `modifications` should be a list of functions that take a single string
    argument (the loaded YAML contents) and return a single string argument
    (the modified YAML contents).

    :param config_filename: a local path, an ``s3://`` path, or a
        ``data:`` URI containing base64-encoded YAML
    :param aws_input_creds: path to AWS credentials file, or falsy for
        anonymous access
    :raises SpecifiedBotoCoreError: if a data URI cannot be decoded
    :raises FileNotFoundError: if the config file cannot be opened
    """
    if config_filename.lower().startswith("data:"):
        try:
            _header, encoded = config_filename.split(",", 1)
            config_content = apply_modifications(
                b64decode(encoded).decode("utf-8"), modifications
            )
            return yaml.safe_load(config_content)
        except Exception as error:
            # bugfix: message previously read "Could not find load config";
            # also chain the original exception for debuggability
            msg = f"Error! Could not load config from data URI {config_filename}"
            raise SpecifiedBotoCoreError(msg=msg) from error

    if config_filename.lower().startswith("s3://"):
        # s3 paths begin with s3://bucket/
        bucket_name = config_filename.split("/")[2]
        s3_prefix = "/".join(config_filename.split("/")[:3])
        prefix = config_filename.replace(s3_prefix, "").lstrip("/")

        if aws_input_creds:
            if not os.path.isfile(aws_input_creds):
                raise IOError(f"Could not find aws_input_creds ({aws_input_creds})")

        from indi_aws import fetch_creds

        bucket = fetch_creds.return_bucket(aws_input_creds, bucket_name)
        downloaded_config = "/tmp/" + os.path.basename(config_filename)
        bucket.download_file(prefix, downloaded_config)
        config_filename = downloaded_config

    config_filename = os.path.realpath(config_filename)

    try:
        with open(config_filename, "r") as _f:
            return yaml.safe_load(apply_modifications(_f.read(), modifications))
    except IOError as error:
        msg = f"Error! Could not find config file {config_filename}"
        raise FileNotFoundError(msg) from error
[docs]
def cl_strip_brackets(arg_list):
    """Remove '[' from before the first and ']' from after the final
    argument in a list of commandline arguments.

    Parameters
    ----------
    arg_list : list

    Returns
    -------
    list

    Examples
    --------
    >>> cl_strip_brackets('[a b c]'.split(' '))
    ['a', 'b', 'c']
    >>> cl_strip_brackets('a b c'.split(' '))
    ['a', 'b', 'c']
    >>> cl_strip_brackets('[ a b c ]'.split(' '))
    ['a', 'b', 'c']
    """
    # strip the brackets in place, then drop any emptied-out tokens
    arg_list[0] = arg_list[0].lstrip("[")
    arg_list[-1] = arg_list[-1].rstrip("]")
    return list(filter(None, arg_list))
[docs]
def create_cpac_data_config(
    bids_dir,
    participant_labels=None,
    aws_input_creds=None,
    skip_bids_validator=False,
    only_one_anat=True,
):
    """
    Create a C-PAC data config YAML file from a BIDS directory.

    Parameters
    ----------
    bids_dir : str

    participant_labels : list or None

    aws_input_creds

    skip_bids_validator : bool

    only_one_anat : bool
        The "anat" key for a subject expects a string value, but we
        can temporarily store a list instead by passing True here if
        we will be filtering that list down to a single string later

    Returns
    -------
    list
    """
    UTLOGGER.info("Parsing %s..", bids_dir)
    file_paths, config = collect_bids_files_configs(bids_dir, aws_input_creds)

    if participant_labels and file_paths:
        # keep only paths that mention at least one requested participant
        def _mentions_participant(path):
            return any(label in path for label in participant_labels)

        file_paths = [path for path in file_paths if _mentions_participant(path)]

        if not file_paths:
            UTLOGGER.error("Did not find data for %s", ", ".join(participant_labels))
            sys.exit(1)

    sub_list = bids_gen_cpac_sublist(
        bids_dir,
        file_paths,
        config,
        aws_input_creds,
        raise_error=not skip_bids_validator,
        only_one_anat=only_one_anat,
    )

    if not sub_list:
        UTLOGGER.error("Did not find data in %s", bids_dir)
        sys.exit(1)

    return sub_list
def _check_value_type(
sub_list: list[dict[str, Any]],
keys: list[str] = ["subject_id", "unique_id"],
value_type: type = int,
any_or_all: Callable[[Iterable], bool] = any,
) -> bool:
"""Check if any or all of a key in a sub_list is of a given type."""
return any_or_all(
isinstance(sub.get(key), value_type) for key in keys for sub in sub_list
)
[docs]
def coerce_data_config_strings(contents: str) -> str:
    """Coerce `subject_id` and `unique_id` values to be YAML strings."""
    # tag values with !!str unless they already carry a !! tag
    contents = re.sub(r"(subject_id: |unique_id: )(?!!!)", r"\g<1>!!str ", contents)
    return contents.replace(": !!str !!", ": !!")
[docs]
def load_cpac_data_config(data_config_file, participant_labels, aws_input_creds):
    """
    Load a data config file as a check to make sure it is available and
    readable, optionally filtering to the given participants.

    Parameters
    ----------
    data_config_file : str
        path to data config

    participant_labels : list or None

    aws_input_creds

    Returns
    -------
    list
    """
    sub_list: list[dict[str, str]] = load_yaml_config(
        data_config_file, aws_input_creds, modifications=[coerce_data_config_strings]
    )

    if participant_labels:
        def _prefixed(subject_id):
            # normalize to a "sub-" prefixed label for comparison
            return (
                subject_id if subject_id.startswith("sub-") else "sub-" + subject_id
            )

        sub_list = [
            entry
            for entry in sub_list
            if _prefixed(entry["subject_id"]) in participant_labels
        ]

        if not sub_list:
            UTLOGGER.error(
                "Did not find data for %s in %s",
                ", ".join(participant_labels),
                "data URI"
                if data_config_file.startswith("data:")
                else data_config_file,
            )
            sys.exit(1)

    return sub_list
[docs]
def res_in_filename(cfg, label):
    """Specify resolution in filename.

    Replaces a symbolic ``_res-`` entity value (``anat``, ``bold`` or
    ``derivative``) with the resolution configured in ``cfg``; any other
    value is left as-is.

    Parameters
    ----------
    cfg : CPAC.utils.configuration.Configuration

    label : str

    Returns
    -------
    label : str

    Examples
    --------
    >>> from CPAC.utils.configuration import Configuration
    >>> res_in_filename(Configuration({
    ...     'registration_workflows': {
    ...         'anatomical_registration': {'resolution_for_anat': '2x2x2'}}}),
    ...     'sub-1_res-anat_bold')
    'sub-1_res-2x2x2_bold'
    >>> res_in_filename(Configuration({
    ...     'registration_workflows': {
    ...         'anatomical_registration': {'resolution_for_anat': '2x2x2'}}}),
    ...     'sub-1_res-3mm_bold')
    'sub-1_res-3mm_bold'
    """
    if "_res-" not in label:
        return label
    # the symbolic (or literal) resolution sits between "_res-" and the
    # next underscore
    res_key = label.split("_res-", 1)[1].split("_", 1)[0]
    to_template = (
        "registration_workflows",
        "functional_registration",
        "func_registration_to_template",
        "output_resolution",
    )
    resolved = {
        "anat": cfg[
            "registration_workflows",
            "anatomical_registration",
            "resolution_for_anat",
        ],
        "bold": cfg[(*to_template, "func_preproc_outputs")],
        "derivative": cfg[(*to_template, "func_derivative_outputs")],
    }.get(res_key, res_key)
    return re.sub("_res-[A-Za-z0-9]*_", f"_res-{resolved}_", label)
[docs]
def sub_list_filter_by_labels(sub_list, labels):
    """Filter a sub_list by provided BIDS labels for specified suffixes.

    Parameters
    ----------
    sub_list : list

    labels : dict
        labels['T1w'] : str or None
            C-PAC currently only uses a single T1w image
        labels['bold'] : str, list, or None

    Returns
    -------
    list
    """
    t1w_label = labels.get("T1w")
    if t1w_label:
        sub_list = _sub_list_filter_by_label(sub_list, "T1w", t1w_label)
    if labels.get("bold"):
        # strip CLI-style brackets before matching functional labels;
        # the stripped value is written back into ``labels``
        labels["bold"] = cl_strip_brackets(labels["bold"])
        sub_list = _sub_list_filter_by_label(sub_list, "bold", labels["bold"])
    return sub_list
[docs]
def with_key(entity: str, key: str) -> str:
    """Return a keyed BIDS entity.

    Parameters
    ----------
    entity, key : str

    Returns
    -------
    str

    Examples
    --------
    >>> with_key('sub-1', 'sub')
    'sub-1'
    >>> with_key('1', 'sub')
    'sub-1'
    """
    # coerce non-string values (e.g. numeric IDs) to str before prefixing
    text = entity if isinstance(entity, str) else str(entity)
    return text if text.startswith(f"{key}-") else f"{key}-{text}"
[docs]
def without_key(entity: str, key: str) -> str:
    """Return a BIDS entity value.

    Parameters
    ----------
    entity, key : str

    Returns
    -------
    str

    Examples
    --------
    >>> without_key('sub-1', 'sub')
    '1'
    >>> without_key('1', 'sub')
    '1'
    >>> without_key('sub-sub-1', 'sub')
    'sub-1'
    """
    if not isinstance(entity, str):
        entity = str(entity)
    # str.removeprefix strips only a leading "key-"; the previous
    # str.replace also deleted any later occurrences of the prefix
    # (e.g. 'sub-sub-1' wrongly became '1')
    return entity.removeprefix(f"{key}-")
def _t1w_filter(anat, shortest_entity, label):
    """Filter T1w paths, preferring a T2w entity match when one exists.

    Parameters
    ----------
    anat : list or str

    shortest_entity : bool

    label : str

    Returns
    -------
    anat : list
    """
    paths = anat if isinstance(anat, list) else [anat]
    if shortest_entity:
        paths = bids_shortest_entity(paths)
    else:
        paths = bids_match_entities(paths, label, "T1w")
    try:
        t2_matches = bids_match_entities(paths, label, "T2w")
    except LookupError:
        # no T2w match available; keep the T1w selection
        t2_matches = None
    return paths if t2_matches is None else t2_matches
def _sub_anat_filter(anat, shortest_entity, label):
    """Filter anat paths in a sub_list entry.

    Parameters
    ----------
    anat : list or dict

    shortest_entity : bool

    label : str

    Returns
    -------
    list or dict
        same type as 'anat' parameter
    """
    if not isinstance(anat, dict):
        return _t1w_filter(anat, shortest_entity, label)
    # dict form: filter only the T1w entry in place, leave other keys alone
    if "T1w" in anat:
        anat["T1w"] = _t1w_filter(anat["T1w"], shortest_entity, label)
    return anat
def _sub_list_filter_by_label(sub_list, label_type, label):
    """Function to filter a sub_list by a CLI-provided label.

    Parameters
    ----------
    sub_list : list

    label_type : str
        'T1w' or 'bold'

    label : str or list

    Returns
    -------
    list

    Examples
    --------
    >>> from CPAC.pipeline.test.sample_data import sub_list
    >>> _sub_list_filter_by_label(sub_list, 'bold', 'task-PEER1')[
    ...     0]['func'].keys()
    dict_keys(['PEER1'])
    """
    # normalize to a list of labels
    label_list = [label] if isinstance(label, str) else list(label)
    new_sub_list = []
    # a bare label equal to the suffix itself (e.g. just "bold") requests
    # the scan with the fewest BIDS entities rather than an entity match
    if label_type in label_list:
        shortest_entity = True
        label_list.remove(label_type)
    else:
        shortest_entity = False
    if label_type == "T1w":
        # only entries that actually have anatomical data are considered
        for sub in [sub for sub in sub_list if "anat" in sub]:
            try:
                sub["anat"] = _sub_anat_filter(
                    sub["anat"],
                    shortest_entity,
                    # in shortest-entity mode there is no label to match
                    label_list[0] if not shortest_entity else None,
                )
                # drop entries whose anat was filtered down to nothing
                if sub["anat"]:
                    new_sub_list.append(sub)
            except LookupError as lookup_error:
                # no match for this participant: warn and drop the entry
                warn(str(lookup_error))
    elif label_type == "bold":
        # only entries that actually have functional data are considered
        for sub in [sub for sub in sub_list if "func" in sub]:
            try:
                # scan file paths for every functional entry of this participant
                all_scans = [sub["func"][scan].get("scan") for scan in sub["func"]]
                new_func = {}
                # keep every scan matching any of the requested labels
                for entities in label_list:
                    matched_scans = bids_match_entities(all_scans, entities, label_type)
                    for scan in matched_scans:
                        new_func = {
                            **new_func,
                            **_match_functional_scan(sub["func"], scan),
                        }
                if shortest_entity:
                    # additionally keep the scan with the fewest BIDS entities
                    new_func = {
                        **new_func,
                        **_match_functional_scan(
                            sub["func"], bids_shortest_entity(all_scans)
                        ),
                    }
                # replace the participant's functional dict with the matches
                sub["func"] = new_func
                new_sub_list.append(sub)
            except LookupError as lookup_error:
                # no match for this participant: warn and drop the entry
                warn(str(lookup_error))
    return new_sub_list
def _match_functional_scan(sub_list_func_dict, scan_file_to_match):
"""Function to subset a scan from a sub_list_func_dict by a scan filename.
Parameters
----------
sub_list_func_dict : dict
sub_list[sub]['func']
scan_file_to_match : str
Returns
-------
dict
Examples
--------
>>> from CPAC.pipeline.test.sample_data import sub_list
>>> matched = _match_functional_scan(
... sub_list[0]['func'],
... '/fake/data/sub-0001/ses-NFB3/func/'
... 'sub-0001_ses-NFB3_task-PEER1_bold.nii.gz')
>>> matched.keys()
dict_keys(['PEER1'])
>>> all([key in matched['PEER1'] for key in [
... 'fmap_mag', 'fmap_phase', 'scan', 'scan_parameters'
... ]])
True
"""
return {
entity: sub_list_func_dict[entity]
for entity in sub_list_func_dict
if sub_list_func_dict[entity].get("scan") == scan_file_to_match
}