Source code for CPAC.utils.extract_data_multiscan

# Copyright (C) 2012-2024  C-PAC Developers

# This file is part of C-PAC.

# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.

# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.

# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
import glob
import os

import yaml

from CPAC.utils.monitoring import UTLOGGER


def extract_data(c, param_map):
    """
    Generate a C-PAC input subject list YAML file.

    The method extracts anatomical and functional data and scan parameters
    for each site (if there are multiple sites) and for each scan, and puts
    them into a data structure read by python.

    Note
    ----
    Use this tool only if the scan parameters are different for each scan,
    as shown in the example below.

    Example
    -------
    subjects_list = [
        {
            'subject_id': '0021001',
            'unique_id': 'session2',
            'anat': '/home/data/multiband_data/NKITRT/0021001/anat/mprage.nii.gz',
            'rest':{
                'RfMRI_mx_1400_rest': '/home/data/multiband_data/NKITRT/0021001/session2/RfMRI_mx_1400/rest.nii.gz',
                'RfMRI_mx_645_rest': '/home/data/multiband_data/NKITRT/0021001/session2/RfMRI_mx_645/rest.nii.gz',
                'RfMRI_std_2500_rest': '/home/data/multiband_data/NKITRT/0021001/session2/RfMRI_std_2500/rest.nii.gz',
                },
            'scan_parameters':{
                'TR':{
                    'RfMRI_mx_1400_rest': '1.4',
                    'RfMRI_mx_645_rest': '1.4',
                    'RfMRI_std_2500_rest': '2.5',
                    },
                'Acquisition':{
                    'RfMRI_mx_1400_rest': '/home/data/1400.txt',
                    'RfMRI_mx_645_rest': '/home/data/645.txt',
                    'RfMRI_std_2500_rest': '/home/data/2500.txt',
                    },
                'Reference':{
                    'RfMRI_mx_1400_rest': '32',
                    'RfMRI_mx_645_rest': '20',
                    'RfMRI_std_2500_rest': '19',
                    },
                'FirstTR':{
                    'RfMRI_mx_1400_rest': '7',
                    'RfMRI_mx_645_rest': '15',
                    'RfMRI_std_2500_rest': '4',
                    },
                'LastTR':{
                    'RfMRI_mx_1400_rest': '440',
                    'RfMRI_mx_645_rest': '898',
                    'RfMRI_std_2500_rest': 'None',
                    },
                }
        },
    ]
    """

    # read each line of the file into a list (or pass a list through unchanged)
    def get_list(arg):
        if isinstance(arg, list):
            ret_list = arg
        else:
            ret_list = [fline.rstrip("\r\n") for fline in open(arg, "r").readlines()]
        return ret_list

    exclusion_list = []
    if c.exclusionSubjectList is not None:
        exclusion_list = get_list(c.exclusionSubjectList)

    subject_list = []
    if c.subjectList is not None:
        subject_list = get_list(c.subjectList)

    # check if the template is correct
    def checkTemplate(template):
        if template.count("%s") != 2:
            msg = (
                "Please provide '%s' in the template where your site and"
                " subjects are present. Please see examples."
            )
            raise Exception(msg)

        filename, ext = os.path.splitext(os.path.basename(template))
        ext = os.path.splitext(filename)[1] + ext

        if ext not in [".nii", ".nii.gz"]:
            msg = "Invalid file name"
            raise Exception(msg, os.path.basename(template))

    def get_site_list(path):
        base = path.split("%s")[0]
        return os.listdir(base)

    def check_length(scan_name, file_name):
        if len(file_name) > 30:
            msg = (
                "filename %s is too long."
                " It should not be more than 30 characters." % file_name
            )
            raise Exception(msg)

        if (
            len(scan_name)
            - len(os.path.splitext(os.path.splitext(file_name)[0])[0])
            >= 20
        ):
            msg = (
                "scan name %s is too long."
                " It should not be more than 20 characters."
                % (
                    scan_name.replace(
                        "_" + os.path.splitext(os.path.splitext(file_name)[0])[0], ""
                    )
                )
            )
            raise Exception(msg)

    def create_site_subject_mapping(base, relative):
        # mapping between site and subject
        site_subject_map = {}
        base_path_list = []

        if c.siteList is not None:
            site_list = get_list(c.siteList)
        else:
            site_list = get_site_list(base)

        for site in site_list:
            paths = glob.glob(base.replace("%s", site))
            base_path_list.extend(paths)
            for path in paths:
                for sub in os.listdir(path):
                    # check if subject is present in subject_list
                    if subject_list:
                        if sub in subject_list and sub not in exclusion_list:
                            site_subject_map[sub] = site
                    elif sub not in exclusion_list:
                        if sub != ".DS_Store":
                            site_subject_map[sub] = site

        return base_path_list, site_subject_map

    # split the input template path into base (path before the subject
    # directory) and relative (path after the subject directory)
    def getPath(template):
        checkTemplate(template)
        base, relative = template.rsplit("%s", 1)
        base, subject_map = create_site_subject_mapping(base, relative)
        base.sort()
        relative = relative.lstrip("/")
        return base, relative, subject_map

    # get anatomical base path and anatomical relative path
    anat_base, anat_relative = getPath(c.anatomicalTemplate)[:2]

    # get functional base path, functional relative path and site-subject map
    func_base, func_relative, subject_map = getPath(c.functionalTemplate)

    if not anat_base:
        msg = (
            f"No such file or directory {anat_base}"
            "\nAnatomical data template incorrect"
        )
        raise FileNotFoundError(msg)

    if not func_base:
        msg = (
            f"No such file or directory {func_base}"
            "\nFunctional data template incorrect"
        )
        raise FileNotFoundError(msg)

    if len(anat_base) != len(func_base):
        msg = (
            f"Some sites are missing. Please check your template: {anat_base} !="
            f" {func_base}\nBase lengths are unequal, so some sites are missing."
            " The extract_data script doesn't support this."
            " Please provide your own subjects_list file."
        )
        raise FileNotFoundError(msg)

    # calculate the length of the relative paths (path after subject directory)
    func_relative_len = len(func_relative.split("/"))
    anat_relative_len = len(anat_relative.split("/"))

    def check_for_sessions(relative_path, path_length):
        """Check if there are sessions present."""
        # default
        session_present = False
        session_path = "session_1"

        # session present if path_length is equal to 3
        if path_length == 3:
            relative_path_list = relative_path.split("/")
            session_path = relative_path_list[0]
            relative_path = "/".join(relative_path_list[1:])
            session_present = True
        elif path_length > 3:
            msg = (
                "extract_data script currently doesn't support this directory"
                " structure. Please provide the subjects_list file to run CPAC."
                " For more information refer to the manual."
            )
            raise Exception(msg)
        return session_present, session_path, relative_path

    # if func_relative_len != anat_relative_len:
    #     raise Exception("extract_data script currently doesn't"
    #                     " support different relative paths for"
    #                     " anatomical and functional files")

    func_session_present, func_session_path, func_relative = check_for_sessions(
        func_relative, func_relative_len
    )

    anat_session_present, anat_session_path, anat_relative = check_for_sessions(
        anat_relative, anat_relative_len
    )

    f = open(os.path.join(c.outputSubjectListLocation, "CPAC_subject_list.yml"), "w")

    def fetch_path(i, anat_sub, func_sub, session_id):
        """
        Extract the anatomical and functional paths for a session
        and print them to file.

        Parameters
        ----------
        i : int
            index of site
        anat_sub : string
            string containing subject/ concatenated
            subject-session path for anatomical file
        func_sub : string
            string containing subject/ concatenated
            subject-session path for functional file
        session_id : string
            session

        Raises
        ------
        Exception
        """
        try:

            def print_begin_of_file(sub, session_id):
                print("-", file=f)
                print("    subject_id: '" + sub + "'", file=f)
                print("    unique_id: '" + session_id + "'", file=f)

            def print_end_of_file(sub, scan_list):
                if param_map is not None:

                    def print_scan_param(index):
                        try:
                            for scan in scan_list:
                                print(
                                    "            "
                                    + scan[1]
                                    + ": '"
                                    + param_map.get((subject_map.get(sub), scan[0]))[
                                        index
                                    ]
                                    + "'",
                                    file=f,
                                )
                        except:
                            raise Exception(
                                "No parameter values for the %s site and %s scan are"
                                " defined in the scan parameters csv file"
                                % (subject_map.get(sub), scan[0])
                            )

                    UTLOGGER.info("site for sub %s -> %s", sub, subject_map.get(sub))
                    print("    scan_parameters: ", file=f)
                    print("        tr:", file=f)
                    print_scan_param(4)
                    print("        acquisition:", file=f)
                    print_scan_param(0)
                    print("        reference:", file=f)
                    print_scan_param(3)
                    print("        first_tr:", file=f)
                    print_scan_param(1)
                    print("        last_tr:", file=f)
                    print_scan_param(2)

            # get anatomical file
            anat_base_path = os.path.join(anat_base[i], anat_sub)
            func_base_path = os.path.join(func_base[i], func_sub)

            anat = None
            func = None

            anat = glob.glob(os.path.join(anat_base_path, anat_relative))
            func = glob.glob(os.path.join(func_base_path, func_relative))
            scan_list = []

            if anat and func:
                print_begin_of_file(anat_sub.split("/")[0], session_id)
                print("    anat: '" + anat[0] + "'", file=f)
                print("    rest: ", file=f)

                # iterate over each rest scan
                for iter in func:
                    # get scan_id
                    iterable = os.path.splitext(
                        os.path.splitext(iter.replace(func_base_path, "").lstrip("/"))[
                            0
                        ]
                    )[0]
                    scan_name = iterable.replace("/", "_")
                    scan_list.append((os.path.dirname(iterable), scan_name))
                    check_length(scan_name, os.path.basename(iter))
                    print("        " + scan_name + ": '" + iter + "'", file=f)
                print_end_of_file(anat_sub.split("/")[0], scan_list)

        except Exception:
            raise

    def walk(index, sub):
        """
        Walk across each subject path in the data site path.

        Parameters
        ----------
        index : int
            index of site
        sub : string
            subject_id

        Raises
        ------
        Exception
        """
        try:
            if func_session_present:
                # if there are sessions
                if "*" in func_session_path:
                    session_list = glob.glob(
                        os.path.join(
                            func_base[index], os.path.join(sub, func_session_path)
                        )
                    )
                else:
                    session_list = [func_session_path]

                for session in session_list:
                    session_id = os.path.basename(session)
                    if anat_session_present:
                        if func_session_path == anat_session_path:
                            fetch_path(
                                index,
                                os.path.join(sub, session_id),
                                os.path.join(sub, session_id),
                                session_id,
                            )
                        else:
                            fetch_path(
                                index,
                                os.path.join(sub, anat_session_path),
                                os.path.join(sub, session_id),
                                session_id,
                            )
                    else:
                        fetch_path(
                            index, sub, os.path.join(sub, session_id), session_id
                        )
            else:
                UTLOGGER.info("No sessions")
                session_id = ""
                fetch_path(index, sub, sub, session_id)

        except Exception as e:
            msg = "Please make sure sessions are consistent across all subjects"
            raise ValueError(msg) from e

    try:
        for i in range(len(anat_base)):
            for sub in os.listdir(anat_base[i]):
                # check if subject is present in subject_list
                if subject_list:
                    if sub in subject_list and sub not in exclusion_list:
                        UTLOGGER.info("extracting data for subject: %s", sub)
                        walk(i, sub)
                # check that subject is not in exclusion list
                elif sub not in exclusion_list and sub != ".DS_Store":
                    UTLOGGER.info("extracting data for subject: %s", sub)
                    walk(i, sub)

        name = os.path.join(c.outputSubjectListLocation, "CPAC_subject_list.yml")
        UTLOGGER.info("Extraction complete...Input subjects_list for CPAC - %s", name)
    except Exception:
        raise
    finally:
        f.close()
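
# Illustrative sketch only (not part of C-PAC): extract_data() expects the
# anatomical and functional templates to contain exactly two '%s'
# placeholders -- one for the site directory and one for the subject
# directory -- and to point at a .nii/.nii.gz file, as enforced by
# checkTemplate() above. The paths and the helper name below are
# hypothetical examples, not real C-PAC defaults.
def _example_templates():
    example_anat_template = "/data/%s/%s/anat/mprage.nii.gz"
    example_func_template = "/data/%s/%s/session_1/rest_*/rest.nii.gz"
    for template in (example_anat_template, example_func_template):
        # one placeholder for the site, one for the subject
        assert template.count("%s") == 2
    return example_anat_template, example_func_template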

def generate_suplimentary_files(output_path):
    """
    Generate the phenotypic template file and the subject list for group analysis.
    """
    import csv

    subjects_list = yaml.safe_load(
        open(os.path.join(output_path, "CPAC_subject_list.yml"), "r")
    )

    subject_scan_set = set()
    subject_set = set()
    scan_set = set()
    data_list = []

    for sub in subjects_list:
        if sub["unique_id"]:
            subject_id = sub["subject_id"] + "_" + sub["unique_id"]
        else:
            subject_id = sub["subject_id"]

        for scan in list(sub["rest"]):
            subject_scan_set.add((subject_id, scan))
            subject_set.add(subject_id)
            scan_set.add(scan)

    for item in subject_scan_set:
        list1 = []
        list1.append(item[0] + "/" + item[1])
        for val in subject_set:
            if val in item:
                list1.append(1)
            else:
                list1.append(0)

        for val in scan_set:
            if val in item:
                list1.append(1)
            else:
                list1.append(0)

        data_list.append(list1)

    # prepare data for the phenotypic file
    if len(scan_set) > 1:
        list1 = ["subject_id/Scan"]
        list1.extend(list(subject_set))
        list1.extend(list(scan_set))

    file_name = os.path.join(output_path, "phenotypic_template.csv")
    f = open(file_name, "w", newline="")
    writer = csv.writer(f)

    if len(scan_set) > 1:
        writer.writerow(list1)
        writer.writerows(data_list)
    else:
        writer.writerow(["subject_id"])
        for sub in subject_set:
            writer.writerow([sub])

    f.close()

    UTLOGGER.info("Template phenotypic file for group analysis - %s", file_name)

    file_name = os.path.join(output_path, "subject_list_group_analysis.txt")
    f = open(file_name, "w")

    for sub in subject_set:
        print(sub, file=f)

    UTLOGGER.info("Subject list required later for group analysis - %s", file_name)
    f.close()
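
# Hedged usage sketch (illustrative only): generate_suplimentary_files() is
# normally called by run() below, after extract_data() has written
# CPAC_subject_list.yml into the output directory. The directory path and
# the helper name here are hypothetical.
def _example_generate_group_files():
    output_dir = "/home/data/cpac_subject_lists"  # hypothetical path
    # writes phenotypic_template.csv and subject_list_group_analysis.txt
    # next to the existing CPAC_subject_list.yml
    generate_suplimentary_files(output_dir)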

def read_csv(csv_input):
    """
    Read a csv file with 'Acquisition', 'Reference', 'Site' and 'TR (seconds)' columns.
    """
    from collections import defaultdict
    import csv

    try:
        reader = csv.DictReader(open(csv_input, "r"))

        dict_labels = defaultdict(list)
        for line in reader:
            csv_dict = {k.lower(): v for k, v in line.items()}
            dict_labels[csv_dict.get("site"), csv_dict.get("scan")] = [
                csv_dict[key]
                for key in sorted(csv_dict.keys())
                if key not in ("site", "scan")
            ]

        if len(dict_labels) < 1:
            msg = "Scan parameters file is either empty or missing a header"
            raise ValueError(msg)
    except Exception as e:
        msg = "Error reading scan parameters csv"
        raise ValueError(msg) from e

    return dict_labels
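
# Hedged sketch (illustrative only): a scan parameters csv that would satisfy
# read_csv() above. Besides the 'Site' and 'Scan' columns used as the key,
# the remaining columns are returned sorted alphabetically by lowercased
# header, which matches the indices extract_data() reads (acquisition,
# first tr, last tr, reference, tr). The exact header names beyond those in
# the docstring, the values, and the helper name are assumptions for
# illustration only.
def _example_scan_parameters_csv():
    import tempfile

    rows = (
        "Site,Scan,TR (seconds),Acquisition,Reference,First TR,Last TR\n"
        "SiteA,rest_1,2.0,/path/to/acq_order.txt,32,0,None\n"
    )
    with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as tmp:
        tmp.write(rows)
    # keys are (site, scan) tuples; values are the remaining columns, sorted
    return read_csv(tmp.name)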
""" Class to set dictionary keys as map attributes """
[docs] class Configuration(object): def __init__(self, config_map): for key in config_map: if config_map[key] == "None": config_map[key] = None setattr(self, key, config_map[key])
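
# Minimal sketch (illustrative only): Configuration turns the keys of a
# data_config mapping into attributes and converts the literal string "None"
# into Python None. The values and the helper name are hypothetical.
def _example_configuration():
    cfg = Configuration({"subjectList": "None", "siteList": ["SiteA", "SiteB"]})
    assert cfg.subjectList is None
    assert cfg.siteList == ["SiteA", "SiteB"]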

def run(data_config):
    """Run method takes a data_config file as the input argument."""
    c = Configuration(yaml.safe_load(open(os.path.realpath(data_config), "r")))

    if c.scanParametersCSV is not None:
        s_param_map = read_csv(c.scanParametersCSV)
    else:
        UTLOGGER.warning(
            "no scan parameters csv included. make sure you turn off slice timing"
            " correction option in CPAC configuration"
        )
        s_param_map = None

    extract_data(c, s_param_map)
    generate_suplimentary_files(c.outputSubjectListLocation)
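
# Hedged usage sketch (illustrative only): run() expects a YAML data_config
# whose keys match the attributes accessed above (anatomicalTemplate,
# functionalTemplate, subjectList, exclusionSubjectList, siteList,
# scanParametersCSV, outputSubjectListLocation). The file contents, paths,
# and helper name below are hypothetical; a key set to the string "None" is
# treated as absent by Configuration.
def _example_run():
    import textwrap

    example_config = textwrap.dedent(
        """
        anatomicalTemplate: /data/%s/%s/anat/mprage.nii.gz
        functionalTemplate: /data/%s/%s/session_1/rest_*/rest.nii.gz
        subjectList: None
        exclusionSubjectList: None
        siteList: None
        scanParametersCSV: /home/data/scan_parameters.csv
        outputSubjectListLocation: /home/data/cpac_subject_lists
        """
    )
    with open("example_data_config.yml", "w") as cfg_file:
        cfg_file.write(example_config)
    run("example_data_config.yml")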