Source code for CPAC.utils.extract_data_multiscan

import os
import glob
import string
import yaml

def extract_data(c, param_map):
    """
    Generate the CPAC input subject list file ``CPAC_subject_list.yml``.

    The method extracts anatomical data, functional data and scan
    parameters for each site (if multiple sites) and for each scan, and
    writes them to ``c.outputSubjectListLocation`` as a YAML list.

    Note:
    -----
    Use this tool only if the scan parameters are different
    for each scan as shown in the example below.

    Example:
    --------
    subjects_list = [
        {
            'subject_id': '0021001',
            'unique_id': 'session2',
            'anat': '/home/data/multiband_data/NKITRT/0021001/anat/mprage.nii.gz',
            'rest': {
                'RfMRI_mx_1400_rest': '/home/data/multiband_data/NKITRT/0021001/session2/RfMRI_mx_1400/rest.nii.gz',
                'RfMRI_mx_645_rest': '/home/data/multiband_data/NKITRT/0021001/session2/RfMRI_mx_645/rest.nii.gz',
                'RfMRI_std_2500_rest': '/home/data/multiband_data/NKITRT/0021001/session2/RfMRI_std_2500/rest.nii.gz',
            },
            'scan_parameters': {
                'TR': {
                    'RfMRI_mx_1400_rest': '1.4',
                    'RfMRI_mx_645_rest': '1.4',
                    'RfMRI_std_2500_rest': '2.5',
                },
                'Acquisition': {
                    'RfMRI_mx_1400_rest': '/home/data/1400.txt',
                    'RfMRI_mx_645_rest': '/home/data/645.txt',
                    'RfMRI_std_2500_rest': '/home/data/2500.txt',
                },
                'Reference': {
                    'RfMRI_mx_1400_rest': '32',
                    'RfMRI_mx_645_rest': '20',
                    'RfMRI_std_2500_rest': '19',
                },
                'FirstTR': {
                    'RfMRI_mx_1400_rest': '7',
                    'RfMRI_mx_645_rest': '15',
                    'RfMRI_std_2500_rest': '4',
                },
                'LastTR': {
                    'RfMRI_mx_1400_rest': '440',
                    'RfMRI_mx_645_rest': '898',
                    'RfMRI_std_2500_rest': 'None',
                },
            }
        },
    ]

    Parameters
    ----------
    c : object
        Configuration exposing the attributes ``exclusionSubjectList``,
        ``subjectList``, ``siteList``, ``anatomicalTemplate``,
        ``functionalTemplate`` and ``outputSubjectListLocation``.
        The list attributes may be ``None``, a list, or the path of a
        text file holding one entry per line. Each template must contain
        exactly two ``%s`` placeholders (site, then subject) and end in
        ``.nii`` or ``.nii.gz``.
    param_map : dict or None
        Maps ``(site, scan)`` to a sequence of string values indexed as
        0 acquisition, 1 first_tr, 2 last_tr, 3 reference, 4 tr.
        When ``None`` no ``scan_parameters`` section is written.

    Raises
    ------
    Exception
        If a template is malformed, no site directory matches, the
        anatomical and functional templates match a different number of
        sites, the directory layout is deeper than supported, a file or
        scan name is too long, or ``param_map`` lacks an entry for a
        (site, scan) pair that was found.
    """

    def get_list(arg):
        """Return `arg` itself if it is a list, else the lines of the file it names."""
        if isinstance(arg, list):
            return arg
        # context manager so the handle is not leaked
        with open(arg, 'r') as list_file:
            return [fline.rstrip('\r\n') for fline in list_file]

    exclusion_list = []
    if c.exclusionSubjectList is not None:
        exclusion_list = get_list(c.exclusionSubjectList)

    subject_list = []
    if c.subjectList is not None:
        subject_list = get_list(c.subjectList)

    def is_selected(sub):
        """Tell whether directory entry `sub` should be treated as a subject."""
        if subject_list:
            return sub in subject_list and sub not in exclusion_list
        # exact comparison: a substring test against '.DS_Store' would
        # also discard subjects named e.g. 'S' or 'Store'
        return sub not in exclusion_list and sub != '.DS_Store'

    def check_template(template):
        """Validate the placeholder count and the file extension of a template."""
        if template.count('%s') != 2:
            raise Exception("Please provide '%s' in the template "
                            "where your site and subjects are present. "
                            "Please see examples")

        filename, ext = os.path.splitext(os.path.basename(template))
        # second splitext picks up the '.nii' of a '.nii.gz' double extension
        ext = os.path.splitext(filename)[1] + ext

        if ext not in [".nii", ".nii.gz"]:
            raise Exception("Invalid file name",
                            os.path.basename(template))

    def get_site_list(path):
        """List the entries of the directory preceding the first '%s'."""
        base = path.split('%s')[0]
        return os.listdir(base)

    def check_length(scan_name, file_name):
        """Reject file names over 30 characters and over-long scan names."""
        if len(file_name) > 30:
            msg = "filename- %s is too long. " \
                  "It should not be more than 30 characters." % (file_name)
            raise Exception(msg)

        stem = os.path.splitext(os.path.splitext(file_name)[0])[0]
        if len(scan_name) - len(stem) >= 20:
            msg = "scan name %s is too long. " \
                  "It should not be more than 20 characters" \
                  % (scan_name.replace("_" + stem, ''))
            raise Exception(msg)

    def create_site_subject_mapping(base):
        """Return (matching site directories, {subject: site}) for `base`."""
        site_subject_map = {}
        base_path_list = []

        if c.siteList is not None:
            site_list = get_list(c.siteList)
        else:
            site_list = get_site_list(base)

        for site in site_list:
            # str.replace: the string-module function is gone in Python 3
            paths = glob.glob(base.replace('%s', site))
            base_path_list.extend(paths)
            for path in paths:
                for sub in os.listdir(path):
                    if is_selected(sub):
                        site_subject_map[sub] = site

        return base_path_list, site_subject_map

    def get_path(template):
        """
        Split a template into base (site directories found before the
        subject directory), relative (path after the subject directory)
        and the subject -> site map.
        """
        check_template(template)
        base, relative = template.rsplit("%s", 1)
        base, subject_map = create_site_subject_mapping(base)
        base.sort()
        relative = relative.lstrip("/")
        return base, relative, subject_map

    # anatomical base paths and anatomical relative path
    anat_base, anat_relative = get_path(c.anatomicalTemplate)[:2]
    # functional base paths, functional relative path and site-subject map
    func_base, func_relative, subject_map = get_path(c.functionalTemplate)

    if not anat_base:
        print("No such file or directory ", anat_base)
        raise Exception("Anatomical Data template incorrect")

    if not func_base:
        print("No such file or directory", func_base)
        raise Exception("Functional Data template incorrect")

    if len(anat_base) != len(func_base):
        print("Some sites are missing, Please check your "
              "template", anat_base, "!=", func_base)
        raise Exception(" Base length Unequal. Some sites are missing. "
                        "extract_data doesn't script support this. Please "
                        "Provide your own subjects_list file")

    # number of components in the path after the subject directory
    func_relative_len = len(func_relative.split('/'))
    anat_relative_len = len(anat_relative.split('/'))

    def check_for_sessions(relative_path, path_length):
        """
        Detect a session directory at the head of `relative_path`.

        Returns (session_present, session_path, relative_path) where the
        returned relative path has the session component removed.
        """
        session_present = False
        session_path = 'session_1'

        # three components means <session>/<scan dir>/<file>
        if path_length == 3:
            relative_path_list = relative_path.split('/')
            session_path = relative_path_list[0]
            relative_path = "/".join(relative_path_list[1:])
            session_present = True
        elif path_length > 3:
            raise Exception("extract_data script currently doesn't support "
                            "this directory structure. Please provide the "
                            "subjects_list file to run CPAC. "
                            "For more information refer to manual")

        return session_present, session_path, relative_path

    func_session_present, func_session_path, func_relative = \
        check_for_sessions(func_relative, func_relative_len)

    anat_session_present, anat_session_path, anat_relative = \
        check_for_sessions(anat_relative, anat_relative_len)

    output_file = os.path.join(c.outputSubjectListLocation,
                               "CPAC_subject_list.yml")

    # The helpers below write to `f`, which is bound by the `with`
    # statement at the end of this function before any of them is called.
    # The leading spaces in the emitted lines give the YAML its nesting;
    # 'rest' and 'scan_parameters' must parse as mappings.

    def print_begin_of_file(sub, session_id):
        """Start a new subject entry in the output file."""
        print("-", file=f)
        print("    subject_id: '" + sub + "'", file=f)
        print("    unique_id: '" + session_id + "'", file=f)

    def print_end_of_file(sub, scan_list):
        """Write the scan_parameters section for `sub`, if a map was given."""
        if param_map is None:
            return

        site = subject_map.get(sub)

        def print_scan_param(index):
            for scan in scan_list:
                try:
                    value = param_map.get((site, scan[0]))[index]
                    line = "            " + scan[1] + ": '" + value + "'"
                except Exception as err:
                    raise Exception(" No Parameter values for the %s site "
                                    "and %s scan is defined in the scan"
                                    " parameters csv file"
                                    % (site, scan[0])) from err
                print(line, file=f)

        print("site for sub", sub, "->", site)
        print("    scan_parameters: ", file=f)
        print("        tr:", file=f)
        print_scan_param(4)
        print("        acquisition:", file=f)
        print_scan_param(0)
        print("        reference:", file=f)
        print_scan_param(3)
        print("        first_tr:", file=f)
        print_scan_param(1)
        print("        last_tr:", file=f)
        print_scan_param(2)

    def fetch_path(i, anat_sub, func_sub, session_id):
        """
        Method to extract anatomical and functional
        path for a session and print to file

        Parameters
        ----------
        i : int
            index of site
        anat_sub : string
            string containing subject/ concatenated
            subject-session path for anatomical file
        func_sub : string
            string containing subject/ concatenated
            subject-session path for functional file
        session_id : string
            session

        Raises
        ------
        Exception
        """
        anat_base_path = os.path.join(anat_base[i], anat_sub)
        func_base_path = os.path.join(func_base[i], func_sub)

        anat = glob.glob(os.path.join(anat_base_path, anat_relative))
        func = glob.glob(os.path.join(func_base_path, func_relative))

        # a subject is written only when both modalities are present
        if not (anat and func):
            return

        subject_id = anat_sub.split("/")[0]
        scan_list = []

        print_begin_of_file(subject_id, session_id)
        print("    anat: '" + anat[0] + "'", file=f)
        print("    rest: ", file=f)

        for func_path in func:
            # scan id: path below the subject/session dir, extensions removed
            scan_path = os.path.splitext(os.path.splitext(
                func_path.replace(func_base_path, '').lstrip("/"))[0])[0]
            scan_name = scan_path.replace("/", "_")
            scan_list.append((os.path.dirname(scan_path), scan_name))
            check_length(scan_name, os.path.basename(func_path))
            print("      " + scan_name + ": '" + func_path + "'", file=f)

        print_end_of_file(subject_id, scan_list)

    def walk(index, sub):
        """
        Method which walks across each subject
        path in the data site path

        Parameters
        ----------
        index : int
            index of site
        sub : string
            subject_id

        Raises
        ------
        Exception
        """
        if not func_session_present:
            print("No sessions")
            fetch_path(index, sub, sub, '')
            return

        if "*" in func_session_path:
            session_list = glob.glob(os.path.join(
                func_base[index], os.path.join(sub, func_session_path)))
        else:
            session_list = [func_session_path]

        for session in session_list:
            session_id = os.path.basename(session)
            func_sub = os.path.join(sub, session_id)
            if not anat_session_present:
                fetch_path(index, sub, func_sub, session_id)
            elif func_session_path == anat_session_path:
                fetch_path(index, func_sub, func_sub, session_id)
            else:
                fetch_path(index, os.path.join(sub, anat_session_path),
                           func_sub, session_id)

    # text mode: the entries are written as str through print()
    with open(output_file, 'w') as f:
        for i, site_path in enumerate(anat_base):
            for sub in os.listdir(site_path):
                if is_selected(sub):
                    print("extracting data for subject: ", sub)
                    walk(i, sub)

    print("Extraction Complete...Input Subjects_list for CPAC - %s"
          % output_file)
def generate_suplimentary_files(output_path):
    """
    Generate the phenotypic template file and the subject list
    for group analysis.

    Reads ``CPAC_subject_list.yml`` from `output_path` and writes two
    files into the same directory:

    * ``phenotypic_template.csv`` - with more than one distinct scan, a
      0/1 indicator table with one row per (subject, scan) pair and one
      column per subject and per scan; otherwise a single ``subject_id``
      column.
    * ``subject_list_group_analysis.txt`` - one subject id per line.

    A subject id is ``<subject_id>_<unique_id>`` when ``unique_id`` is
    non-empty, else just ``subject_id``. Subjects without any entry under
    ``rest`` are not listed.

    Parameters
    ----------
    output_path : string
        directory holding ``CPAC_subject_list.yml``; also receives the
        generated files
    """
    import csv

    subject_list_file = os.path.join(output_path, 'CPAC_subject_list.yml')
    with open(subject_list_file, 'r') as yaml_file:
        # an empty YAML file loads as None; treat it as "no subjects"
        subjects_list = yaml.safe_load(yaml_file) or []

    # builtin set: the Python 2 `sets` module no longer exists
    subject_scan_set = set()
    subject_set = set()
    scan_set = set()

    for sub in subjects_list:
        if sub['unique_id']:
            subject_id = sub['subject_id'] + "_" + sub['unique_id']
        else:
            subject_id = sub['subject_id']

        for scan in list(sub['rest']):
            subject_scan_set.add((subject_id, scan))
            subject_set.add(subject_id)
            scan_set.add(scan)

    # sorted so that header and rows share one reproducible column order
    subjects = sorted(subject_set)
    scans = sorted(scan_set)

    data_list = []
    for subject_id, scan in sorted(subject_scan_set):
        row = [subject_id + "/" + scan]
        # compare each column against its own field only, so a subject id
        # that happens to equal a scan name is not flagged twice
        row.extend(1 if val == subject_id else 0 for val in subjects)
        row.extend(1 if val == scan else 0 for val in scans)
        data_list.append(row)

    file_name = os.path.join(output_path, 'phenotypic_template.csv')
    # text mode with newline='' is what csv.writer expects on Python 3
    with open(file_name, 'w', newline='') as f:
        writer = csv.writer(f)
        if len(scan_set) > 1:
            writer.writerow(['subject_id/Scan'] + subjects + scans)
            writer.writerows(data_list)
        else:
            writer.writerow(['subject_id'])
            for sub in subjects:
                writer.writerow([sub])

    print("Template Phenotypic file for group analysis - %s" % file_name)

    file_name = os.path.join(output_path, "subject_list_group_analysis.txt")
    with open(file_name, 'w') as f:
        for sub in subjects:
            print(sub, file=f)

    print("Subject list required later for group analysis - %s" % file_name)
def read_csv(csv_input):
    """
    Read the scan parameters csv file.

    The header is matched case-insensitively. The ``site`` and ``scan``
    columns form the key of each entry; the values of all remaining
    columns are stored as a list ordered by lower-cased column name.

    Parameters
    ----------
    csv_input : string
        path of the csv file

    Returns
    -------
    dict_labels : defaultdict(list)
        maps ``(site, scan)`` to the list of remaining column values;
        a later row with the same key replaces an earlier one

    Raises
    ------
    Exception
        if the file yields no data rows (empty or header only); errors
        from opening or parsing the file are re-raised after a message
        is printed
    """
    import csv
    from collections import defaultdict

    dict_labels = defaultdict(list)
    try:
        # newline='' lets the csv module handle line endings itself;
        # the old "U" mode is rejected by Python 3.11+
        with open(csv_input, 'r', newline='') as csv_file:
            for line in csv.DictReader(csv_file):
                csv_dict = dict((k.lower(), v) for k, v in line.items())
                dict_labels[csv_dict.get('site'), csv_dict.get('scan')] = \
                    [csv_dict[key] for key in sorted(csv_dict)
                     if key not in ('site', 'scan')]

        if len(dict_labels) < 1:
            raise Exception("Scan Parameters File is either empty "
                            "or missing header")
    except Exception:
        print("Error reading scan parameters csv")
        raise

    return dict_labels
class Configuration(object):
    """
    Class to set dictionary keys as map attributes.

    Every key of `config_map` becomes an attribute of the instance.
    A value equal to the string ``'None'`` is stored as ``None``.
    """

    def __init__(self, config_map):
        """
        Parameters
        ----------
        config_map : dict
            configuration keys and values; it is read but not modified
        """
        for key in config_map:
            value = config_map[key]
            # normalise on the attribute only, leaving the caller's
            # mapping untouched
            setattr(self, key, None if value == 'None' else value)
def run(data_config):
    """
    Run method takes data_config file as the input argument.

    Loads the YAML data configuration, reads the scan parameters csv
    when one is configured, writes the subject list with `extract_data`
    and then the group analysis files with
    `generate_suplimentary_files`.

    Parameters
    ----------
    data_config : string
        path of the YAML data configuration file

    Raises
    ------
    Exception
        if the configuration file is empty; errors from the called
        functions propagate unchanged
    """
    config_path = os.path.realpath(data_config)
    with open(config_path, 'r') as config_file:
        config_map = yaml.safe_load(config_file)

    if config_map is None:
        # safe_load gives None for an empty document
        raise Exception("Data configuration file %s is empty" % config_path)

    c = Configuration(config_map)

    # a configuration without the key is treated like an explicit None
    if getattr(c, 'scanParametersCSV', None) is not None:
        s_param_map = read_csv(c.scanParametersCSV)
    else:
        print("no scan parameters csv included, "
              "make sure you turn off slice timing correction option "
              "in CPAC configuration")
        s_param_map = None

    extract_data(c, s_param_map)
    generate_suplimentary_files(c.outputSubjectListLocation)