# Source code for CPAC.utils.bids_utils

# Copyright (C) 2016-2024  C-PAC Developers

# This file is part of C-PAC.

# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.

# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.

# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
import json
import os
import re
import sys
from base64 import b64decode
from warnings import warn

from botocore.exceptions import BotoCoreError
import yaml

from CPAC.utils.monitoring import UTLOGGER


def bids_decode_fname(file_path, dbg=False, raise_error=True):
    """Decode a BIDS-style filename into a dict of its entities.

    Parameters
    ----------
    file_path : str
        path to a nifti or json file with a BIDS-conformant name

    dbg : bool
        print debug statements

    raise_error : bool
        raise ``ValueError`` on a malformed name (True) or just log the
        problem and keep going (False)

    Returns
    -------
    dict
        maps BIDS entity keys (plus "site" and "scantype") to the values
        parsed from the filename and its directory path
    """
    f_dict = {}

    fname = os.path.basename(file_path)

    # first lets make sure that we know how to handle the file
    if "nii" not in fname.lower() and "json" not in fname.lower():
        msg = f"File ({fname}) does not appear to be a nifti or json file"
        raise IOError(msg)

    if dbg:
        UTLOGGER.debug("parsing %s", file_path)

    # first figure out if there is a site directory level, this isn't
    # specified in BIDS currently, but hopefully will be in the future
    file_path_vals = os.path.dirname(file_path).split("/")
    sub = [s for s in file_path_vals if "sub-" in s]
    if dbg:
        UTLOGGER.debug("found subject %s in %s", sub, file_path_vals)

    if len(sub) > 1:
        UTLOGGER.debug(
            "Odd that there is more than one subject directory in (%s), does the"
            " filename conform to BIDS format?",
            file_path,
        )
    if sub:
        sub_ndx = file_path_vals.index(sub[0])
        if sub_ndx > 0 and file_path_vals[sub_ndx - 1]:
            # the directory just above the sub- directory is taken as the site
            if dbg:
                UTLOGGER.debug("setting site to %s", file_path_vals[sub_ndx - 1])
            f_dict["site"] = file_path_vals[sub_ndx - 1]
        else:
            f_dict["site"] = "none"
    elif file_path_vals[-1]:
        if dbg:
            UTLOGGER.debug(
                "looking for subject id didn't pan out settling for last subdir %s",
                file_path_vals[-1],
            )
        f_dict["site"] = file_path_vals[-1]
    else:
        f_dict["site"] = "none"
    # site ids must not contain whitespace, hyphens, or underscores
    f_dict["site"] = re.sub(r"[\s\-\_]+", "", f_dict["site"])

    fname = fname.split(".")[0]
    # convert the filename string into a dictionary to pull out the other
    # key value pairs
    for key_val_pair in fname.split("_"):
        # if the chunk has the shape key-val store key: val in f_dict
        if "-" in key_val_pair:
            chunks = key_val_pair.split("-")
            f_dict[chunks[0]] = "-".join(chunks[1:])
        else:
            f_dict["scantype"] = key_val_pair.split(".")[0]

    def _report(msg):
        # raise or log depending on `raise_error`; replaces three
        # copies of the same raise-or-log snippet
        if raise_error:
            raise ValueError(msg)
        UTLOGGER.error(msg)

    if not f_dict.get("scantype"):
        # covers both a missing "scantype" key and an empty value
        _report(
            f"Filename ({fname}) does not appear to contain"
            " scan type, does it conform to the BIDS format?"
        )
    elif "bold" in f_dict["scantype"] and not f_dict.get("task"):
        # previously indexed f_dict["task"] directly, which raised an
        # unhelpful KeyError (instead of the intended ValueError/log)
        # whenever the task entity was absent entirely
        _report(
            f"Filename ({fname}) is a BOLD file, but doesn't contain a task, does"
            " it conform to the BIDS format?"
        )

    return f_dict
def bids_entities_from_filename(filename):
    """Split a BIDS filename into its list of entities.

    Parameters
    ----------
    filename : str

    Returns
    -------
    entities : list

    Examples
    --------
    >>> bids_entities_from_filename(
    ...     's3://fake/data/sub-0001/ses-NFB3/func/'
    ...     'sub-0001_ses-NFB3_task-MSIT_bold.nii.gz')
    ['sub-0001', 'ses-NFB3', 'task-MSIT', 'bold']
    """
    # drop any directory components, then any extensions, then split on "_"
    basename = filename.split("/")[-1] if "/" in filename else filename
    stem = basename.split(".")[0]
    return stem.split("_")
def bids_match_entities(file_list, entities, suffix):
    """Function to subset a list of filepaths by a passed BIDS entity.

    Parameters
    ----------
    file_list : list of str

    entities : str
        BIDS entities joined by underscores (e.g., 'ses-001_task-PEER1')

    suffix : str
        BIDS suffix (e.g., 'bold', 'T1w')

    Returns
    -------
    list of str

    Examples
    --------
    >>> bids_match_entities([
    ...     's3://fake/data/sub-001_ses-001_task-MSIT_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER2_bold.nii.gz'
    ... ], 'task-PEER1', 'bold')
    ['s3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz']
    >>> bids_match_entities([
    ...     's3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER2_bold.nii.gz'
    ... ], 'PEER', 'bold')
    Traceback (most recent call last):
    LookupError: No match found for provided entity "PEER" in
    - s3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz
    - s3://fake/data/sub-001_ses-001_task-PEER2_bold.nii.gz
    Perhaps you meant one of these?
    - task-PEER1
    - task-PEER2
    """
    # keep a file when it carries the requested entity string AND the
    # requested suffix; files whose suffix differs from `suffix` pass
    # through unfiltered (only files of the given suffix are subset)
    matches = [
        file
        for file in file_list
        if (
            f"_{entities}_" in "_".join(bids_entities_from_filename(file))
            and bids_entities_from_filename(file)[-1] == suffix
        )
        or bids_entities_from_filename(file)[-1] != suffix
    ]
    if file_list and not matches:
        # nothing matched: build a helpful LookupError, suggesting
        # near-misses that contain `entities` as a substring
        pp_file_list = "\n".join([f"- {file}" for file in file_list])
        error_message = " ".join(
            [
                "No match found for provided",
                "entity" if len(entities.split("_")) == 1 else "entities",
                f'"{entities}" in\n{pp_file_list}',
            ]
        )
        partial_matches = [
            match.group()
            for match in [
                re.search(re.compile(f"[^_]*{entities}[^_]*"), file)
                for file in file_list
            ]
            if match is not None
        ]
        if partial_matches:
            if len(partial_matches) == 1:
                error_message += f'\nPerhaps you meant "{partial_matches[0]}"?'
            else:
                error_message = "\n".join(
                    [
                        error_message,
                        "Perhaps you meant one of these?",
                        *[f"- {match}" for match in partial_matches],
                    ]
                )
        raise LookupError(error_message)
    return matches
def bids_remove_entity(name, key):
    """Drop a keyed entity from a BIDS string.

    Parameters
    ----------
    name : str
        BIDS string to remove entity from

    key : str
        BIDS key of entity to remove

    Returns
    -------
    str
        BIDS name with entity removed

    Examples
    --------
    >>> bids_remove_entity('atlas-Yeo_space-MNI152NLin6_res-2x2x2', 'space')
    'atlas-Yeo_res-2x2x2'
    >>> bids_remove_entity('atlas-Yeo_space-MNI152NLin6_res-2x2x2', 'res')
    'atlas-Yeo_space-MNI152NLin6'
    """
    # tolerate a trailing "-" on the key, then filter out matching entities
    prefix = f'{key.rstrip("-")}-'
    kept = [
        entity
        for entity in bids_entities_from_filename(name)
        if not entity.startswith(prefix)
    ]
    return "_".join(kept)
def bids_retrieve_params(bids_config_dict, f_dict, dbg=False):
    """
    Retrieve the BIDS parameters from bids_config_dict for BIDS file
    corresponding to f_dict. If an exact match for f_dict is not found
    the nearest match is returned, corresponding to the BIDS inheritance
    principle.

    :param bids_config_dict: BIDS configuration dictionary, this is a
      multi-level dictionary that maps the components of a bids filename
      (i.e. sub, ses, acq, run) to a dictionary that contains the BIDS
      parameters (RepetitionTime, EchoTime, etc). This information is
      extracted from sidecar json files using the principle of inheritance
      using the bids_parse_configs function

    :param f_dict: Dictionary built from the name of a file in the BIDS
      format. This is built using the bids_decode_fname by splitting on
      "-" and "_" delimeters

    :param dbg: boolean flag that indicates whether or not debug statements
      should be printed, defaults to "False"

    :return: returns a dictionary that contains the BIDS parameters
    """
    params = {}

    t_dict = bids_config_dict  # pointer to current dictionary

    # try to populate the configuration using information
    # already in the list
    for level in ["scantype", "site", "sub", "ses", "task", "acq", "rec", "dir", "run"]:
        if level in f_dict:
            key = "-".join([level, f_dict[level]])
        else:
            key = "-".join([level, "none"])

        if dbg:
            UTLOGGER.debug(key)

        # if the key doesn't exist in the config dictionary, check to see if
        # the generic key exists and return that
        if key in t_dict:
            t_dict = t_dict[key]
        else:
            if dbg:
                UTLOGGER.debug(
                    "Couldn't find %s, so going with %s", key, "-".join([level, "none"])
                )
            key = "-".join([level, "none"])
            if key in t_dict:
                t_dict = t_dict[key]
            # if neither key exists, stay at the current level and keep
            # descending with the remaining levels

    # if we have an image parameter dictionary at this level, use it to
    # initialize our configuration we look for "RepetitionTime", because
    # according to the spec it is a mandatory parameter for JSON
    # sidecar files
    if dbg:
        UTLOGGER.debug(t_dict)

    for key in t_dict.keys():
        if "RepetitionTime" in key:
            # NOTE(review): `params` becomes an alias of the node inside
            # bids_config_dict (not a copy), so the ascii-encoding below
            # mutates the shared config tree in place — confirm intended
            params = t_dict
            break

    for k, v in params.items():
        # legacy Python-2-era normalization: str values become ascii bytes
        if isinstance(v, str):
            params[k] = v.encode("ascii", errors="ignore")
    return params
def bids_parse_sidecar(config_dict, dbg=False, raise_error=True):
    """
    Uses the BIDS principle of inheritance to build a data structure that
    maps parameters in side car .json files to components in the names of
    corresponding nifti files.

    :param config_dict: dictionary that maps paths of sidecar json files
       (the key) to a dictionary containing the contents of the files
       (the values)

    :param dbg: boolean flag that indicates whether or not debug statements
       should be printed

    :param raise_error: boolean flag forwarded to ``bids_decode_fname``

    :return: a dictionary that maps parameters to components from BIDS
       filenames such as sub, sess, run, acq, and scan type
    """
    # we are going to build a large-scale data structure, consisting of many
    # levels of dictionaries to hold the data.
    bids_config_dict = {}

    # initialize 'default' entries, this essentially is a pointer traversal
    # of the dictionary
    t_dict = bids_config_dict
    for level in ["scantype", "site", "sub", "ses", "task", "acq", "rec", "dir", "run"]:
        key = "-".join([level, "none"])
        t_dict[key] = {}
        t_dict = t_dict[key]

    if dbg:
        UTLOGGER.debug(bids_config_dict)

    # get the paths to the json yaml files in config_dict, the paths contain
    # the information needed to map the parameters from the jsons (the vals
    # of the config_dict) to corresponding nifti files. We sort the list
    # by the number of path components, so that we can iterate from the outer
    # most path to inner-most, which will help us address the BIDS inheritance
    # principle
    config_paths = sorted(config_dict.keys(), key=lambda p: len(p.split("/")))

    if dbg:
        UTLOGGER.debug(config_paths)

    for cp in config_paths:
        if dbg:
            UTLOGGER.debug("processing %s", cp)

        # decode the filepath into its various components as defined by BIDS
        f_dict = bids_decode_fname(cp, raise_error=raise_error)

        # handling inheritance is a complete pain, we will try to handle it by
        # build the key from the bottom up, starting with the most
        # parsimonious possible, incorporating configuration information that
        # exists at each level

        # first lets try to find any parameters that already apply at this
        # level using the information in the json's file path
        t_params = bids_retrieve_params(bids_config_dict, f_dict)

        # now populate the parameters
        bids_config = {}
        if t_params:
            bids_config.update(t_params)

        # add in the information from this config file
        t_config = config_dict[cp]
        if isinstance(t_config, list):
            # BUG FIX: the previous check was `t_config is list`, which
            # compares against the `list` type object itself and is always
            # False, so list-valued sidecars were never unwrapped
            t_config = t_config[0]
        try:
            bids_config.update(t_config)
        except ValueError as value_error:
            err = (
                "\n[!] Could not properly parse the AWS S3 path provided "
                "- please double-check the bucket and the path.\n\nNote: "
                "This could either be an issue with the path or the way "
                "the data is organized in the directory. You can also "
                "try providing a specific site sub-directory.\n\n"
            )
            raise ValueError(err) from value_error

        # now put the configuration in the data structure, by first iterating
        # to the location of the key, and then inserting it. When a key isn't
        # defined we use the "none" value. A "none" indicates that the
        # corresponding parameters apply to all possible settings of that key
        # e.g. run-1, run-2, ... will all map to run-none if no jsons
        # explicitly define values for those runs
        t_dict = bids_config_dict  # pointer to current dictionary
        for level in [
            "scantype",
            "site",
            "sub",
            "ses",
            "task",
            "acq",
            "rec",
            "dir",
            "run",
        ]:
            if level in f_dict:
                key = "-".join([level, f_dict[level]])
            else:
                key = "-".join([level, "none"])

            if key not in t_dict:
                t_dict[key] = {}

            t_dict = t_dict[key]

        t_dict.update(bids_config)

    return bids_config_dict
def bids_shortest_entity(file_list):
    """Pick the file with the fewest BIDS entities from a list.

    If exactly one file has the minimum number of entities, that file is
    returned; when several files tie for the minimum, the list of tied
    files is returned instead. An empty input yields ``None``.

    Parameters
    ----------
    file_list : list of strings

    Returns
    -------
    str or None

    Examples
    --------
    >>> bids_shortest_entity([
    ...     's3://fake/data/sub-001_ses-001_task-MSIT_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER1_bold.nii.gz',
    ...     's3://fake/data/sub-001_ses-001_task-PEER2_bold.nii.gz'
    ... ])
    's3://fake/data/sub-001_ses-001_bold.nii.gz'
    """
    if not file_list:
        return None
    # count entities per file, then keep only the files at the minimum
    lengths = [len(bids_entities_from_filename(path)) for path in file_list]
    fewest = min(lengths)
    tied = [path for path, count in zip(file_list, lengths) if count == fewest]
    return tied[0] if len(tied) == 1 else tied
def gen_bids_outputs_sublist(base_path, paths_list, key_list, creds_path):
    """Build a C-PAC subject list from BIDS-derivative output paths.

    Parameters
    ----------
    base_path : str
        prefix shared by every path in ``paths_list``

    paths_list : list of str
        output file paths

    key_list : list of str
        resource keys to include in the subject list

    creds_path : str or None
        S3 credentials path propagated into each subject dictionary

    Returns
    -------
    list of dict
        one dictionary per (subject/session, functional run) pair that has
        every requested resource
    """
    import copy

    # resources that live one directory deeper, under a per-run directory
    func_keys = [
        "functional_to_anat_linear_xfm",
        "motion_params",
        "movement_parameters",
        "motion_correct",
    ]
    top_keys = list(set(key_list) - set(func_keys))
    bot_keys = list(set(key_list).intersection(func_keys))

    subjdict = {}

    if not base_path.endswith("/"):
        base_path = base_path + "/"

    # output directories are a bit different than standard BIDS, so
    # we handle things differently
    for p in paths_list:
        p = p.rstrip()

        # find the participant and session info which should be at
        # some level in the path
        path_base = p.replace(base_path, "")

        subj_info = path_base.split("/")[0]
        resource = path_base.split("/")[1]

        if resource not in key_list:
            continue

        if subj_info not in subjdict:
            subjdict[subj_info] = {"subj_info": subj_info}

        if creds_path:
            subjdict[subj_info]["creds_path"] = creds_path

        if resource in func_keys:
            # per-run resource: group under subjdict[...]["funcs"][run]
            run_info = path_base.split("/")[2]
            if "funcs" not in subjdict[subj_info]:
                subjdict[subj_info]["funcs"] = {}
            if run_info not in subjdict[subj_info]["funcs"]:
                subjdict[subj_info]["funcs"][run_info] = {"run_info": run_info}
            if resource in subjdict[subj_info]["funcs"][run_info]:
                UTLOGGER.warning("resource %s already exists in subjdict ??", resource)
            subjdict[subj_info]["funcs"][run_info][resource] = p
        else:
            subjdict[subj_info][resource] = p

    sublist = []
    for subj_info, subj_res in subjdict.items():
        missing = 0
        for tkey in top_keys:
            if tkey not in subj_res:
                UTLOGGER.warning("%s not found for %s", tkey, subj_info)
                missing += 1
                break

        if missing == 0:
            # NOTE(review): assumes every surviving subject has a "funcs"
            # entry; a subject with only top-level resources would raise
            # KeyError here — confirm upstream guarantees
            for func_key, func_res in subj_res["funcs"].items():
                for bkey in bot_keys:
                    if bkey not in func_res:
                        UTLOGGER.warning("%s not found for %s", bkey, func_key)
                        missing += 1
                        break
                if missing == 0:
                    UTLOGGER.info(
                        "adding: %s, %s, %d", subj_info, func_key, len(sublist)
                    )
                    # flatten the per-run resources into a copy of the
                    # subject-level dict, one sublist entry per run
                    tdict = copy.deepcopy(subj_res)
                    del tdict["funcs"]
                    tdict.update(func_res)
                    sublist.append(tdict)
    return sublist
def bids_gen_cpac_sublist(
    bids_dir,
    paths_list,
    config_dict,
    creds_path,
    dbg=False,
    raise_error=True,
    only_one_anat=True,
):
    """
    Generates a CPAC formatted subject list from information contained in a
    BIDS formatted set of data.

    Parameters
    ----------
    bids_dir : str
        base directory that contains all of the data, this could be a
        directory that contains data for a multiple BIDS datasets, in
        which case the intervening directories will be interpreted as
        site names

    paths_list : str
        lists of all nifti files found in bids_dir, these paths are
        relative to bids_dir

    config_dict : dict
        dictionary that contains information from the JSON sidecars
        found in bids_dir, keys are relative paths and values are
        dictionaries containing all of the parameter information. if
        config_dict is None, the subject list will be built without the
        parameters

    creds_path : str
        if using S3 bucket, this path credentials needed to access the
        bucket, if accessing anonymous bucket, this can be set to None

    dbg : bool
        indicating whether or not the debug statements should be printed

    raise_error : bool

    only_one_anat : bool
        The "anat" key for a subject expects a string value, but we can
        temporarily store a list instead by passing True here if we will
        be filtering that list down to a single string later

    Returns
    -------
    list
        a list of dictionaries suitable for use by CPAC to specify data
        to be processed
    """
    if dbg:
        UTLOGGER.debug(
            "gen_bids_sublist called with:\n bids_dir: %s\n # paths: %s"
            "\n config_dict: %s\n creds_path: %s",
            bids_dir,
            len(paths_list),
            "missing" if not config_dict else "found",
            creds_path,
        )

    # if configuration information is not desired, config_dict will be empty,
    # otherwise parse the information in the sidecar json files into a dict
    # we can use to extract data for our nifti files
    if config_dict:
        bids_config_dict = bids_parse_sidecar(config_dict, raise_error=raise_error)

    subdict = {}
    for p in paths_list:
        if bids_dir in p:
            str_list = p.split(bids_dir)
            val = str_list[0]
            val = val.rsplit("/")
            val = val[0]
        else:
            str_list = p.split("/")
            val = str_list[0]
        # NOTE(review): when `bids_dir` is a prefix of `p`, `val` is
        # typically "" here and the path is skipped — presumably
        # paths_list is always relative to bids_dir; confirm
        if "sub-" not in val:
            continue
        p = p.rstrip()
        f = os.path.basename(p)

        if f.endswith(".nii") or f.endswith(".nii.gz"):
            f_dict = bids_decode_fname(p, raise_error=raise_error)

            if config_dict:
                # attach the nearest-match sidecar parameters to the scan
                t_params = bids_retrieve_params(bids_config_dict, f_dict)
                if not t_params:
                    UTLOGGER.warning(
                        "Did not receive any parameters for %s, is this a problem?", p
                    )

                task_info = {
                    "scan": os.path.join(bids_dir, p),
                    "scan_parameters": t_params.copy(),
                }
            else:
                task_info = os.path.join(bids_dir, p)

            # session defaults to "1" when no ses- entity is present
            if "ses" not in f_dict:
                f_dict["ses"] = "1"

            if "sub" not in f_dict:
                raise IOError(
                    "sub not found in %s," % (p) + " perhaps it isn't in BIDS format?"
                )

            if f_dict["sub"] not in subdict:
                subdict[f_dict["sub"]] = {}

            subjid = "-".join(["sub", f_dict["sub"]])

            if f_dict["ses"] not in subdict[f_dict["sub"]]:
                subdict[f_dict["sub"]][f_dict["ses"]] = {
                    "creds_path": creds_path,
                    "site_id": "-".join(["site", f_dict["site"]]),
                    "subject_id": subjid,
                    "unique_id": "-".join(["ses", f_dict["ses"]]),
                }

            if "T1w" in f_dict["scantype"] or "T2w" in f_dict["scantype"]:
                # anatomical lesion masks are stored separately
                if "lesion" in f_dict.keys() and "mask" in f_dict["lesion"]:
                    if "lesion_mask" not in subdict[f_dict["sub"]][f_dict["ses"]]:
                        subdict[f_dict["sub"]][f_dict["ses"]]["lesion_mask"] = (
                            task_info["scan"]
                        )
                    else:
                        UTLOGGER.warning(
                            "Lesion mask file (%s) already found for (%s:%s)"
                            " discarding %s",
                            subdict[f_dict["sub"]][f_dict["ses"]]["lesion_mask"],
                            f_dict["sub"],
                            f_dict["ses"],
                            p,
                        )
                # TODO deal with scan parameters anatomical
                if "anat" not in subdict[f_dict["sub"]][f_dict["ses"]]:
                    subdict[f_dict["sub"]][f_dict["ses"]]["anat"] = {}
                if (
                    f_dict["scantype"]
                    not in subdict[f_dict["sub"]][f_dict["ses"]]["anat"]
                ):
                    if only_one_anat:
                        # store the first anat of each scantype as a string
                        subdict[f_dict["sub"]][f_dict["ses"]]["anat"][
                            f_dict["scantype"]
                        ] = task_info["scan"] if config_dict else task_info
                    else:
                        subdict[f_dict["sub"]][f_dict["ses"]]["anat"][
                            f_dict["scantype"]
                        ] = []
                if not only_one_anat:
                    # collect every anat of this scantype into a list
                    subdict[f_dict["sub"]][f_dict["ses"]]["anat"][
                        f_dict["scantype"]
                    ].append(task_info["scan"] if config_dict else task_info)

            if "bold" in f_dict["scantype"]:
                # functional runs are keyed by task (plus run/acq if present)
                task_key = f_dict["task"]
                if "run" in f_dict:
                    task_key = "_".join([task_key, "-".join(["run", f_dict["run"]])])
                if "acq" in f_dict:
                    task_key = "_".join([task_key, "-".join(["acq", f_dict["acq"]])])
                if "func" not in subdict[f_dict["sub"]][f_dict["ses"]]:
                    subdict[f_dict["sub"]][f_dict["ses"]]["func"] = {}

                if task_key not in subdict[f_dict["sub"]][f_dict["ses"]]["func"]:
                    if not isinstance(task_info, dict):
                        task_info = {"scan": task_info}
                    subdict[f_dict["sub"]][f_dict["ses"]]["func"][task_key] = task_info
                else:
                    UTLOGGER.warning(
                        "Func file (%s) already found for (%s: %s: %s) discarding %s",
                        subdict[f_dict["sub"]][f_dict["ses"]]["func"][task_key],
                        f_dict["sub"],
                        f_dict["ses"],
                        task_key,
                        p,
                    )

            if "phase" in f_dict["scantype"]:
                if "fmap" not in subdict[f_dict["sub"]][f_dict["ses"]]:
                    subdict[f_dict["sub"]][f_dict["ses"]]["fmap"] = {}
                if (
                    f_dict["scantype"]
                    not in subdict[f_dict["sub"]][f_dict["ses"]]["fmap"]
                ):
                    subdict[f_dict["sub"]][f_dict["ses"]]["fmap"][
                        f_dict["scantype"]
                    ] = task_info

            if "magnitude" in f_dict["scantype"]:
                if "fmap" not in subdict[f_dict["sub"]][f_dict["ses"]]:
                    subdict[f_dict["sub"]][f_dict["ses"]]["fmap"] = {}
                if (
                    f_dict["scantype"]
                    not in subdict[f_dict["sub"]][f_dict["ses"]]["fmap"]
                ):
                    subdict[f_dict["sub"]][f_dict["ses"]]["fmap"][
                        f_dict["scantype"]
                    ] = task_info

            if "epi" in f_dict["scantype"]:
                # EPI fieldmaps are keyed by phase-encoding direction and
                # only collected when acquired for fMRI (acq-fMRI)
                pe_dir = f_dict["dir"]
                if "acq" in f_dict:
                    if "fMRI" in f_dict["acq"]:
                        if "fmap" not in subdict[f_dict["sub"]][f_dict["ses"]]:
                            subdict[f_dict["sub"]][f_dict["ses"]]["fmap"] = {}
                        if (
                            f"epi_{pe_dir}"
                            not in subdict[f_dict["sub"]][f_dict["ses"]]["fmap"]
                        ):
                            subdict[f_dict["sub"]][f_dict["ses"]]["fmap"][
                                f"epi_{pe_dir}"
                            ] = task_info

    sublist = []
    for ksub, sub in subdict.items():
        for kses, ses in sub.items():
            # only keep sessions that have at least an anat or a func
            if "anat" in ses or "func" in ses:
                sublist.append(ses)
            else:
                if "anat" not in ses:
                    UTLOGGER.warning(
                        "%s %s %s is missing an anat",
                        ses["site_id"] if "none" not in ses["site_id"] else "",
                        ses["subject_id"],
                        ses["unique_id"],
                    )
                if "func" not in ses:
                    UTLOGGER.warning(
                        "%s %s %s is missing a func",
                        ses["site_id"] if "none" not in ses["site_id"] else "",
                        ses["subject_id"],
                        ses["unique_id"],
                    )
    return sublist
def collect_bids_files_configs(bids_dir, aws_input_creds=""):
    """Collect nifti paths and sidecar-JSON contents from a BIDS directory.

    Parameters
    ----------
    bids_dir : str
        local path or ``s3://`` URI of the BIDS dataset

    aws_input_creds : str
        path to AWS credentials for a private bucket (empty/None for an
        anonymous bucket)

    Returns
    -------
    file_paths : list of str
        nifti paths relative to ``bids_dir``

    config_dict : dict
        maps sidecar-JSON paths (relative to ``bids_dir``) to their
        parsed contents

    Raises
    ------
    IOError
        if the credentials file is missing or no files are found
    """
    file_paths = []
    config_dict = {}

    suffixes = [
        "T1w",
        "T2w",
        "bold",
        "epi",
        "phasediff",
        "phase1",
        "phase2",
        "magnitude",
        "magnitude1",
        "magnitude2",
    ]

    if bids_dir.lower().startswith("s3://"):
        # s3 paths begin with s3://bucket/
        bucket_name = bids_dir.split("/")[2]
        s3_prefix = "/".join(bids_dir.split("/")[:3])
        prefix = bids_dir.replace(s3_prefix, "").lstrip("/")

        if aws_input_creds:
            if not os.path.isfile(aws_input_creds):
                raise IOError("Could not find aws_input_creds (%s)" % (aws_input_creds))

        from indi_aws import fetch_creds

        bucket = fetch_creds.return_bucket(aws_input_creds, bucket_name)

        UTLOGGER.info("gathering files from S3 bucket (%s) for %s", bucket, prefix)

        for s3_obj in bucket.objects.filter(Prefix=prefix):
            for suf in suffixes:
                if suf in str(s3_obj.key):
                    # EPI fieldmaps are only kept when acquired for fMRI
                    if suf == "epi" and "acq-fMRI" not in s3_obj.key:
                        continue
                    if str(s3_obj.key).endswith("json"):
                        try:
                            config_dict[s3_obj.key.replace(prefix, "").lstrip("/")] = (
                                json.loads(s3_obj.get()["Body"].read())
                            )
                        except Exception as e:
                            # BUG FIX: `e.message` does not exist in
                            # Python 3 and raised AttributeError while
                            # formatting; use the exception itself
                            msg = (
                                f"Error retrieving {s3_obj.key.replace(prefix, '')}"
                                f" ({e})"
                            )
                            raise BotoCoreError(msg) from e
                    elif "nii" in str(s3_obj.key):
                        file_paths.append(
                            str(s3_obj.key).replace(prefix, "").lstrip("/")
                        )
    else:
        for root, dirs, files in os.walk(bids_dir, topdown=False, followlinks=True):
            if files:
                for f in files:
                    for suf in suffixes:
                        if suf == "epi" and "acq-fMRI" not in f:
                            continue
                        if "nii" in f and suf in f:
                            file_paths += [
                                os.path.join(root, f).replace(bids_dir, "").lstrip("/")
                            ]
                        if f.endswith("json") and suf in f:
                            json_path = os.path.join(root, f)
                            try:
                                # context manager closes the handle;
                                # previously the open file object leaked
                                with open(json_path, "r") as json_file:
                                    config_dict.update(
                                        {
                                            os.path.join(
                                                root.replace(bids_dir, "").lstrip("/"),
                                                f,
                                            ): json.load(json_file)
                                        }
                                    )
                            except UnicodeDecodeError as decode_error:
                                # BUG FIX: UnicodeDecodeError requires five
                                # constructor arguments; re-raising with only
                                # a message was itself a TypeError
                                raise UnicodeDecodeError(
                                    decode_error.encoding,
                                    decode_error.object,
                                    decode_error.start,
                                    decode_error.end,
                                    f"Could not decode {json_path}",
                                ) from decode_error

    if not file_paths and not config_dict:
        msg = (
            f"Didn't find any files in {bids_dir}. Please verify that the path is"
            " typed correctly, that you have read access to the directory, and that it"
            " is not empty."
        )
        raise IOError(msg)

    return file_paths, config_dict
def camelCase(string: str) -> str:  # pylint: disable=invalid-name
    """Turn a hyphenated string into camelCase.

    Parameters
    ----------
    string : str
        string to convert to camelCase

    Returns
    -------
    str

    Examples
    --------
    >>> camelCase('PearsonNilearn-aCompCor')
    'PearsonNilearnACompCor'
    >>> camelCase('mean-Pearson-Nilearn-aCompCor')
    'meanPearsonNilearnACompCor'
    """
    first, *rest = string.split("-")
    # keep the first piece untouched; capitalize the head of each later
    # piece, passing empty pieces through unchanged
    camel = [first]
    for piece in rest:
        camel.append(f"{piece[0].upper()}{piece[1:]}" if piece else piece)
    return "".join(camel)
def combine_multiple_entity_instances(bids_str: str) -> str:
    """Merge repeated instances of a BIDS key into one camelCased value.

    Parameters
    ----------
    bids_str : str

    Returns
    -------
    str

    Examples
    --------
    >>> combine_multiple_entity_instances(
    ...     'sub-1_ses-HBN_site-RU_task-rest_atlas-AAL_'
    ...     'desc-Nilearn_desc-36-param_suffix.ext')
    'sub-1_ses-HBN_site-RU_task-rest_atlas-AAL_desc-Nilearn36Param_suffix.ext'
    >>> combine_multiple_entity_instances(
    ...     'sub-1_ses-HBN_site-RU_task-rest_'
    ...     'run-1_framewise-displacement-power.1D')
    'sub-1_ses-HBN_site-RU_task-rest_run-1_framewiseDisplacementPower.1D'
    """
    parts = bids_str.split("_")
    # the final chunk is the suffix; camelCase it directly
    tail = [camelCase(parts[-1])]
    # group every key-value chunk by key, collecting repeated values
    grouped = {}
    for part in parts[:-1]:
        if "-" in part:
            key, value = part.split("-", maxsplit=1)
            grouped.setdefault(key, []).append(value)
    # join repeated values with "-" so camelCase fuses them into one token
    combined = {key: camelCase("-".join(values)) for key, values in grouped.items()}
    if "desc" in combined:
        # 'desc' must remain the final entity before the suffix
        tail.insert(0, f'desc-{combined.pop("desc")}')
    return "_".join([f"{key}-{value}" for key, value in combined.items()] + tail)
def insert_entity(resource, key, value):
    """Insert a `f'{key}-{value}'` BIDS entity before `desc-` if present or
    before the suffix otherwise.

    Parameters
    ----------
    resource, key, value : str

    Returns
    -------
    str

    Examples
    --------
    >>> insert_entity('run-1_desc-preproc_bold', 'reg', 'default')
    'run-1_reg-default_desc-preproc_bold'
    >>> insert_entity('run-1_bold', 'reg', 'default')
    'run-1_reg-default_bold'
    >>> insert_entity('run-1_desc-preproc_bold', 'filt', 'notch4c0p31bw0p12')
    'run-1_filt-notch4c0p31bw0p12_desc-preproc_bold'
    >>> insert_entity('run-1_reg-default_bold', 'filt', 'notch4c0p31bw0p12')
    'run-1_reg-default_filt-notch4c0p31bw0p12_bold'
    """
    *entity_parts, suffix = resource.split("_")
    # partition existing entities: everything else goes before the new
    # entity, any desc- entities stay after it
    before = [part for part in entity_parts if not part.startswith("desc-")]
    desc_parts = [part for part in entity_parts if part.startswith("desc-")]
    return "_".join([*before, f"{key}-{value}", *desc_parts, suffix])
def load_yaml_config(config_filename, aws_input_creds):
    """Load a YAML config from a local path, an S3 URI, or a base64 data URI.

    Parameters
    ----------
    config_filename : str
        local path, ``s3://`` URI, or ``data:`` URI of the YAML file

    aws_input_creds : str or None
        path to AWS credentials for a private S3 bucket

    Returns
    -------
    the parsed YAML content

    Raises
    ------
    BotoCoreError
        if a ``data:`` URI cannot be decoded or parsed
    IOError
        if ``aws_input_creds`` is provided but does not exist
    FileNotFoundError
        if the config file cannot be read
    """
    if config_filename.lower().startswith("data:"):
        try:
            _header, encoded = config_filename.split(",", 1)
            config_content = b64decode(encoded)
            return yaml.safe_load(config_content)
        # BUG FIX: a bare `except:` swallowed everything (including
        # KeyboardInterrupt) and hid the underlying failure; catch only
        # real errors and chain the cause
        except Exception as parse_error:
            msg = f"Error! Could not find load config from data URI {config_filename}"
            raise BotoCoreError(msg) from parse_error

    if config_filename.lower().startswith("s3://"):
        # s3 paths begin with s3://bucket/
        bucket_name = config_filename.split("/")[2]
        s3_prefix = "/".join(config_filename.split("/")[:3])
        prefix = config_filename.replace(s3_prefix, "").lstrip("/")

        if aws_input_creds:
            if not os.path.isfile(aws_input_creds):
                raise IOError("Could not find aws_input_creds (%s)" % (aws_input_creds))

        from indi_aws import fetch_creds

        bucket = fetch_creds.return_bucket(aws_input_creds, bucket_name)
        # download to a local temp location, then load from there
        downloaded_config = "/tmp/" + os.path.basename(config_filename)
        bucket.download_file(prefix, downloaded_config)
        config_filename = downloaded_config

    config_filename = os.path.realpath(config_filename)

    try:
        # context manager closes the handle; the previous
        # `yaml.safe_load(open(...))` leaked the file object
        with open(config_filename, "r") as config_file:
            return yaml.safe_load(config_file)
    except IOError as io_error:
        msg = f"Error! Could not find config file {config_filename}"
        raise FileNotFoundError(msg) from io_error
def cl_strip_brackets(arg_list):
    """Strip a leading '[' from the first and a trailing ']' from the last
    argument in a list of commandline arguments.

    Parameters
    ----------
    arg_list : list

    Returns
    -------
    list

    Examples
    --------
    >>> cl_strip_brackets('[a b c]'.split(' '))
    ['a', 'b', 'c']
    >>> cl_strip_brackets('a b c'.split(' '))
    ['a', 'b', 'c']
    >>> cl_strip_brackets('[ a b c ]'.split(' '))
    ['a', 'b', 'c']
    """
    # strip the brackets in place (callers may rely on the mutation),
    # then drop any arguments left empty by the stripping
    arg_list[0] = arg_list[0].lstrip("[")
    arg_list[-1] = arg_list[-1].rstrip("]")
    return [token for token in arg_list if token]
def create_cpac_data_config(
    bids_dir,
    participant_labels=None,
    aws_input_creds=None,
    skip_bids_validator=False,
    only_one_anat=True,
):
    """
    Create a C-PAC data config YAML file from a BIDS directory.

    Parameters
    ----------
    bids_dir : str

    participant_labels : list or None

    aws_input_creds

    skip_bids_validator : bool

    only_one_anat : bool
        The "anat" key for a subject expects a string value, but we can
        temporarily store a list instead by passing True here if we will
        be filtering that list down to a single string later

    Returns
    -------
    list
    """
    UTLOGGER.info("Parsing %s..", bids_dir)
    (file_paths, config) = collect_bids_files_configs(bids_dir, aws_input_creds)

    if participant_labels and file_paths:
        # keep only files whose path mentions a requested participant
        file_paths = [
            file_path
            for file_path in file_paths
            if any(
                participant_label in file_path
                for participant_label in participant_labels
            )
        ]

        if not file_paths:
            # exits the process rather than raising — this is a CLI
            # entry-point helper
            UTLOGGER.error("Did not find data for %s", ", ".join(participant_labels))
            sys.exit(1)

    raise_error = not skip_bids_validator

    sub_list = bids_gen_cpac_sublist(
        bids_dir,
        file_paths,
        config,
        aws_input_creds,
        raise_error=raise_error,
        only_one_anat=only_one_anat,
    )

    if not sub_list:
        UTLOGGER.error("Did not find data in %s", bids_dir)
        sys.exit(1)

    return sub_list
def load_cpac_data_config(data_config_file, participant_labels, aws_input_creds):
    """
    Loads the file as a check to make sure it is available and readable.

    Parameters
    ----------
    data_config_file : str
        path to data config

    participant_labels : list or None

    aws_input_creds

    Returns
    -------
    list
    """
    sub_list = load_yaml_config(data_config_file, aws_input_creds)

    if participant_labels:
        # normalize subject ids to the "sub-" prefixed form before
        # comparing against the requested participant labels
        sub_list = [
            d
            for d in sub_list
            if (
                d["subject_id"]
                if d["subject_id"].startswith("sub-")
                else "sub-" + d["subject_id"]
            )
            in participant_labels
        ]

        if not sub_list:
            # exits the process rather than raising — this is a CLI
            # entry-point helper
            UTLOGGER.error(
                "Did not find data for %s in %s",
                ", ".join(participant_labels),
                data_config_file
                if not data_config_file.startswith("data:")
                else "data URI",
            )
            sys.exit(1)

    return sub_list
def res_in_filename(cfg, label):
    """Replace a symbolic resolution tag in a filename with its value.

    Parameters
    ----------
    cfg : CPAC.utils.configuration.Configuration

    label : str

    Returns
    -------
    label : str

    Examples
    --------
    >>> from CPAC.utils.configuration import Configuration
    >>> res_in_filename(Configuration({
    ...     'registration_workflows': {
    ...         'anatomical_registration': {'resolution_for_anat': '2x2x2'}}}),
    ...     'sub-1_res-anat_bold')
    'sub-1_res-2x2x2_bold'
    >>> res_in_filename(Configuration({
    ...     'registration_workflows': {
    ...         'anatomical_registration': {'resolution_for_anat': '2x2x2'}}}),
    ...     'sub-1_res-3mm_bold')
    'sub-1_res-3mm_bold'
    """
    if "_res-" not in label:
        return label
    # the token between '_res-' and the following '_'
    tag = label.split("_res-", 1)[1].split("_", 1)[0]
    # symbolic tags map to configured resolutions; anything else (e.g. an
    # already-literal '3mm') falls through unchanged
    lookup = {
        "anat": cfg[
            "registration_workflows",
            "anatomical_registration",
            "resolution_for_anat",
        ],
        "bold": cfg[
            "registration_workflows",
            "functional_registration",
            "func_registration_to_template",
            "output_resolution",
            "func_preproc_outputs",
        ],
        "derivative": cfg[
            "registration_workflows",
            "functional_registration",
            "func_registration_to_template",
            "output_resolution",
            "func_derivative_outputs",
        ],
    }
    resolution = lookup.get(tag, tag)
    return re.sub("_res-[A-Za-z0-9]*_", f"_res-{resolution}_", label)
def sub_list_filter_by_labels(sub_list, labels):
    """Subset a sub_list by the BIDS labels given for each suffix.

    Parameters
    ----------
    sub_list : list

    labels : dict
        labels['T1w'] : str or None
            C-PAC currently only uses a single T1w image
        labels['bold'] : str, list, or None

    Returns
    -------
    list
    """
    t1w_label = labels.get("T1w")
    if t1w_label:
        sub_list = _sub_list_filter_by_label(sub_list, "T1w", t1w_label)
    if labels.get("bold"):
        # strips CLI brackets in place, mutating the caller's dict
        labels["bold"] = cl_strip_brackets(labels["bold"])
        sub_list = _sub_list_filter_by_label(sub_list, "bold", labels["bold"])
    return sub_list
def with_key(entity: str, key: str) -> str:
    """Return a BIDS entity guaranteed to carry its key prefix.

    Parameters
    ----------
    entity, key : str

    Returns
    -------
    str

    Examples
    --------
    >>> with_key('sub-1', 'sub')
    'sub-1'
    >>> with_key('1', 'sub')
    'sub-1'
    """
    # coerce non-strings, then prepend the key only when it is absent
    text = entity if isinstance(entity, str) else str(entity)
    return text if text.startswith(f"{key}-") else f"{key}-{text}"
def without_key(entity: str, key: str) -> str:
    """Return a BIDS entity value with its key prefix removed.

    Parameters
    ----------
    entity, key : str

    Returns
    -------
    str

    Examples
    --------
    >>> without_key('sub-1', 'sub')
    '1'
    >>> without_key('1', 'sub')
    '1'
    """
    if not isinstance(entity, str):
        entity = str(entity)
    prefix = f"{key}-"
    if entity.startswith(prefix):
        # BUG FIX: slice off the leading prefix only; the previous
        # `str.replace` removed EVERY occurrence of f"{key}-", corrupting
        # values that happen to contain the key elsewhere in the string
        entity = entity[len(prefix):]
    return entity
def _t1w_filter(anat, shortest_entity, label):
    """Helper function to filter T1w paths.

    Parameters
    ----------
    anat: list or str

    shortest_entity: bool

    label: str

    Returns
    -------
    anat: list
    """
    if not isinstance(anat, list):
        anat = [anat]
    if shortest_entity:
        # no label given: keep the path with the fewest BIDS entities
        anat = bids_shortest_entity(anat)
    else:
        anat = bids_match_entities(anat, label, "T1w")
        # pylint: disable=invalid-name
        # NOTE(review): prefers a T2w match from the already-T1w-filtered
        # list when one exists — confirm this precedence is intended
        try:
            anat_T2 = bids_match_entities(anat, label, "T2w")
        except LookupError:
            anat_T2 = None
        if anat_T2 is not None:
            anat = anat_T2
    return anat


def _sub_anat_filter(anat, shortest_entity, label):
    """Helper function to filter anat paths in sub_list.

    Parameters
    ----------
    anat : list or dict

    shortest_entity : bool

    label : str

    Returns
    -------
    list or dict
        same type as 'anat' parameter
    """
    if isinstance(anat, dict):
        # dict form keys paths by scantype; only the T1w entry is filtered
        if "T1w" in anat:
            anat["T1w"] = _t1w_filter(anat["T1w"], shortest_entity, label)
        return anat
    return _t1w_filter(anat, shortest_entity, label)


def _sub_list_filter_by_label(sub_list, label_type, label):
    """Function to filter a sub_list by a CLI-provided label.

    Parameters
    ----------
    sub_list : list

    label_type : str
        'T1w' or 'bold'

    label : str or list

    Returns
    -------
    list

    Examples
    --------
    >>> from CPAC.pipeline.test.sample_data import sub_list
    >>> _sub_list_filter_by_label(sub_list, 'bold', 'task-PEER1')[
    ...     0]['func'].keys()
    dict_keys(['PEER1'])
    """
    label_list = [label] if isinstance(label, str) else list(label)
    new_sub_list = []
    # a bare suffix (e.g. 'bold') in the labels means "take the file with
    # the fewest entities" in addition to any explicit entity labels
    if label_type in label_list:
        shortest_entity = True
        label_list.remove(label_type)
    else:
        shortest_entity = False
    if label_type == "T1w":
        for sub in [sub for sub in sub_list if "anat" in sub]:
            try:
                sub["anat"] = _sub_anat_filter(
                    sub["anat"],
                    shortest_entity,
                    label_list[0] if not shortest_entity else None,
                )
                if sub["anat"]:
                    new_sub_list.append(sub)
            except LookupError as lookup_error:
                # a subject with no matching anat is dropped with a warning
                warn(str(lookup_error))
    elif label_type == "bold":
        for sub in [sub for sub in sub_list if "func" in sub]:
            try:
                all_scans = [sub["func"][scan].get("scan") for scan in sub["func"]]
                new_func = {}
                # collect the func entries whose scan file matches each of
                # the requested entity labels
                for entities in label_list:
                    matched_scans = bids_match_entities(all_scans, entities, label_type)
                    for scan in matched_scans:
                        new_func = {
                            **new_func,
                            **_match_functional_scan(sub["func"], scan),
                        }
                if shortest_entity:
                    new_func = {
                        **new_func,
                        **_match_functional_scan(
                            sub["func"], bids_shortest_entity(all_scans)
                        ),
                    }
                sub["func"] = new_func
                new_sub_list.append(sub)
            except LookupError as lookup_error:
                warn(str(lookup_error))
    return new_sub_list


def _match_functional_scan(sub_list_func_dict, scan_file_to_match):
    """Function to subset a scan from a sub_list_func_dict by a scan filename.

    Parameters
    ----------
    sub_list_func_dict : dict
        sub_list[sub]['func']

    scan_file_to_match : str

    Returns
    -------
    dict

    Examples
    --------
    >>> from CPAC.pipeline.test.sample_data import sub_list
    >>> matched = _match_functional_scan(
    ...     sub_list[0]['func'],
    ...     '/fake/data/sub-0001/ses-NFB3/func/'
    ...     'sub-0001_ses-NFB3_task-PEER1_bold.nii.gz')
    >>> matched.keys()
    dict_keys(['PEER1'])
    >>> all([key in matched['PEER1'] for key in [
    ...     'fmap_mag', 'fmap_phase', 'scan', 'scan_parameters'
    ... ]])
    True
    """
    # keep every func entry whose "scan" path equals the requested file
    return {
        entity: sub_list_func_dict[entity]
        for entity in sub_list_func_dict
        if sub_list_func_dict[entity].get("scan") == scan_file_to_match
    }