# Source code for CPAC.utils.extract_data
import sys
import os
import glob
import string
import logging
import yaml
def extract_data(c, param_map):
    """
    Generate a CPAC input subject list (YAML) from path templates.

    The anatomical and functional templates each contain two ``%s``
    placeholders: the first is replaced by the site, the second by the
    subject directory. Every selected subject that has both an anatomical
    and at least one functional file is written to
    ``<c.outputSubjectListLocation>/CPAC_subject_list_<c.subjectListName>.yml``.

    Example of one emitted entry::

        -
            subject_id: '0050386'
            unique_id: 'session_1'
            anat: '/Users/home/data/NYU/0050386/session_1/anat_1/anat.nii.gz'
            rest:
              rest_1_rest: '/Users/home/data/NYU/0050386/session_1/rest_1/rest.nii.gz'
              rest_2_rest: '/Users/home/data/NYU/0050386/session_1/rest_2/rest.nii.gz'
            scan_parameters:
                tr: '2'
                acquisition: 'alt+z2'
                reference: '17'
                first_tr: ''
                last_tr: ''

    The ``scan_parameters`` section is only written when ``param_map`` is
    not None.

    Parameters
    ----------
    c : object
        Configuration exposing the attributes ``exclusionSubjectList``,
        ``subjectList``, ``siteList`` (each None, a list, or the path of a
        text file with one entry per line), ``anatomicalTemplate``,
        ``functionalTemplate``, ``outputSubjectListLocation`` and
        ``subjectListName``.
    param_map : mapping or None
        Maps a site name to a sequence of scan parameters indexed as
        ``[acquisition, first_tr, last_tr, reference, tr]``.

    Raises
    ------
    ValueError
        If ``param_map`` is given but holds no usable parameters for the
        site of a subject being written.
    Exception
        If a template is malformed, matches no site directory, the two
        templates match a different number of sites, the directory layout
        is deeper than supported, or a file/scan name is too long.
    """

    def get_list(arg):
        """Return ``arg`` if it is a list, else the lines of the file it names."""
        if isinstance(arg, list):
            return arg
        # Context manager so the list file is closed deterministically
        with open(arg, 'r') as list_file:
            return [fline.rstrip('\r\n') for fline in list_file]

    exclusion_list = []
    if c.exclusionSubjectList is not None:
        exclusion_list = get_list(c.exclusionSubjectList)

    subject_list = []
    if c.subjectList is not None:
        subject_list = get_list(c.subjectList)

    def is_selected(sub):
        """Tell whether a subject directory entry should be processed."""
        if subject_list:
            return sub in subject_list and sub not in exclusion_list
        # Exact comparison: a substring test against '.DS_Store' would also
        # drop subjects named e.g. 'S' or 'Store'
        return sub not in exclusion_list and sub != '.DS_Store'

    def checkTemplate(template):
        """Validate the placeholder count and the file extension of a template."""
        if template.count('%s') != 2:
            msg = "Please provide '%s' in the template " \
                  "where your site and subjects are present. " \
                  "Please see examples"
            logging.error(msg)
            raise Exception(msg)

        file_name = os.path.basename(template)
        # endswith() also accepts names with extra dots such as 'a.b.nii'
        if not file_name.endswith(('.nii', '.nii.gz')):
            msg = "Invalid file name %s" % file_name
            logging.error(msg)
            raise Exception(msg)

    def get_site_list(path):
        """List the entries of the directory that precedes the site placeholder."""
        return os.listdir(path.split('%s')[0])

    def check_length(scan_name, file_name):
        """Reject file names over 30 characters and scan prefixes of 40 or more."""
        if len(file_name) > 30:
            msg = "filename- %s is too long. " \
                  "It should not be more than 30 characters." % file_name
            logging.error(msg)
            raise Exception(msg)

        stem = os.path.splitext(os.path.splitext(file_name)[0])[0]
        if len(scan_name) - len(stem) >= 40:
            # The message reports the limit that is actually enforced here
            msg = "scan name %s is too long. " \
                  "It should be fewer than 40 characters" \
                  % scan_name.replace("_" + stem, '')
            logging.error(msg)
            raise Exception(msg)

    def create_site_subject_mapping(base, relative):
        """Return the matched site directories and a subject -> site map."""
        site_subject_map = {}
        base_path_list = []

        if c.siteList is not None:
            site_list = get_list(c.siteList)
        else:
            site_list = get_site_list(base)

        for site in site_list:
            paths = glob.glob(base.replace('%s', site))
            base_path_list.extend(paths)
            for path in paths:
                for sub in os.listdir(path):
                    if is_selected(sub):
                        site_subject_map[sub] = site

        return base_path_list, site_subject_map

    def getPath(template):
        """
        Split a template into the site directories it matches (sorted), the
        path after the subject directory, and the subject -> site map.
        """
        checkTemplate(template)
        base, relative = template.rsplit("%s", 1)
        base, subject_map = create_site_subject_mapping(base, relative)
        base.sort()
        relative = relative.lstrip("/")
        return base, relative, subject_map

    # anatomical base paths and relative path
    anat_base, anat_relative = getPath(c.anatomicalTemplate)[:2]
    # functional base paths, relative path and site-subject map
    func_base, func_relative, subject_map = getPath(c.functionalTemplate)

    if not anat_base:
        msg = "Anatomical Data template incorrect. " \
              "No such file or directory %s" % c.anatomicalTemplate
        logging.error(msg)
        raise Exception(msg)

    if not func_base:
        msg = "Functional Data template incorrect. " \
              "No such file or directory %s" % c.functionalTemplate
        logging.error(msg)
        raise Exception(msg)

    if len(anat_base) != len(func_base):
        msg1 = "Some sites are missing, Please check your template " \
               "%s != %s" % (anat_base, func_base)
        logging.error(msg1)
        msg2 = "Base length Unequal. Some sites are missing. " \
               "extract_data script doesn't support this. Please " \
               "provide your own subjects_list file"
        logging.error(msg2)
        raise Exception(msg2)

    # number of path components after the subject directory
    func_relative_len = len(func_relative.split('/'))
    anat_relative_len = len(anat_relative.split('/'))

    def check_for_sessions(relative_path, path_length):
        """
        Detect a session directory in the path after the subject directory.

        Returns ``(session_present, session_path, relative_path)`` where
        ``relative_path`` has the session component removed when present.
        """
        session_present = False
        session_path = 'session_1'

        # a session is present when the relative path has three components
        if path_length == 3:
            relative_path_list = relative_path.split('/')
            session_path = relative_path_list[0]
            relative_path = "/".join(relative_path_list[1:])
            session_present = True
        elif path_length > 3:
            msg = "extract_data script currently doesn't support this " \
                  "directory structure. Please provide the subjects_list " \
                  "file to run CPAC. For more information refer to manual"
            logging.error(msg)
            raise Exception(msg)

        return session_present, session_path, relative_path

    func_session_present, func_session_path, func_relative = \
        check_for_sessions(func_relative, func_relative_len)

    anat_session_present, anat_session_path, anat_relative = \
        check_for_sessions(anat_relative, anat_relative_len)

    subject_list_path = os.path.join(
        c.outputSubjectListLocation,
        "CPAC_subject_list_%s.yml" % c.subjectListName)

    def quote(value):
        """Single-quote a YAML scalar, doubling any embedded single quote."""
        return "'" + value.replace("'", "''") + "'"

    def emit(text):
        """Write one line to the subject list file opened further below."""
        print(text, file=out_file)

    def write_scan_parameters(sub):
        """Write the scan_parameters section for the site ``sub`` belongs to."""
        if param_map is None:
            return

        site = subject_map.get(sub)
        logging.debug("site for sub %s -> %s" % (sub, site))
        try:
            params = param_map.get(site)
            logging.debug("scan parameters for the above site %s" % params)
            lines = [
                "    scan_parameters:",
                "        tr: " + quote(params[4]),
                "        acquisition: " + quote(params[0]),
                "        reference: " + quote(params[3]),
                "        first_tr: " + quote(params[1]),
                "        last_tr: " + quote(params[2]),
            ]
        except (AttributeError, IndexError, KeyError, TypeError):
            msg = " No Parameter values for the %s site is defined in the scan" \
                  " parameters csv file" % site
            raise ValueError(msg)

        for line in lines:
            emit(line)

    def fetch_path(i, anat_sub, func_sub, session_id):
        """
        Method to extract anatomical and functional
        path for a session and print to file

        Parameters
        ----------
        i : int
            index of site
        anat_sub : string
            string containing subject/ concatenated
            subject-session path for anatomical file
        func_sub : string
            string containing subject/ concatenated
            subject-session path for functional file
        session_id : string
            session

        Raises
        ------
        ValueError
            scan parameters are requested but missing for the site
        Exception
            any other failure while resolving or writing the paths
        """
        subject_id = anat_sub.split("/")[0]
        try:
            anat_base_path = os.path.join(anat_base[i], anat_sub)
            func_base_path = os.path.join(func_base[i], func_sub)

            # sorted so the chosen anatomical file and scan order are stable
            anat = sorted(glob.glob(os.path.join(anat_base_path, anat_relative)))
            func = sorted(glob.glob(os.path.join(func_base_path, func_relative)))

            if not (anat and func):
                logging.debug("skipping subject %s" % subject_id)
                return

            emit("-")
            emit("    subject_id: " + quote(subject_id))
            emit("    unique_id: " + quote(session_id))
            emit("    anat: " + quote(os.path.realpath(anat[0])))
            emit("    rest:")

            for func_file in func:
                # scan id: path below the subject/session directory, without
                # the .nii/.nii.gz extension, with '/' turned into '_'
                scan_id = func_file.replace(func_base_path, '').lstrip("/")
                scan_id = os.path.splitext(os.path.splitext(scan_id)[0])[0]
                scan_id = scan_id.replace("/", "_")
                real_file = os.path.realpath(func_file)
                check_length(scan_id, os.path.basename(real_file))
                # deeper indent than 'rest:' so the scans nest under it
                emit("      " + scan_id + ": " + quote(real_file))

            write_scan_parameters(subject_id)

        except ValueError:
            logging.exception("Missing scan parameters for subject %s",
                              subject_id)
            raise
        except Exception as e:
            err_msg = 'Exception while fetching anatomical and functional ' \
                      'paths: \n' + str(e)
            logging.exception(err_msg)
            raise Exception(err_msg)

    def walk(index, sub):
        """
        Method which walks across each subject
        path in the data site path

        Parameters
        ----------
        index : int
            index of site
        sub : string
            subject_id

        Raises
        ------
        Exception
        """
        try:
            if not func_session_present:
                logging.debug("No sessions")
                fetch_path(index, sub, sub, '')
                return

            if "*" in func_session_path:
                session_list = sorted(glob.glob(
                    os.path.join(func_base[index], sub, func_session_path)))
            else:
                session_list = [func_session_path]

            if not session_list:
                logging.debug("Skipping subject %s", sub)
                return

            for session in session_list:
                session_id = os.path.basename(session)
                if not anat_session_present:
                    anat_sub = sub
                elif func_session_path == anat_session_path:
                    anat_sub = os.path.join(sub, session_id)
                else:
                    anat_sub = os.path.join(sub, anat_session_path)
                fetch_path(index, anat_sub, os.path.join(sub, session_id),
                           session_id)
        except Exception:
            logging.exception("Error while extracting data for subject %s",
                              sub)
            raise

    # Text mode: the entries are written with print(); the context manager
    # closes the file on success and on error.
    with open(subject_list_path, 'w') as out_file:
        try:
            for i, site_path in enumerate(anat_base):
                for sub in sorted(os.listdir(site_path)):
                    if is_selected(sub):
                        logging.debug("extracting data for subject: %s", sub)
                        walk(i, sub)
        except Exception:
            logging.exception("Subject list extraction failed")
            raise

    print("Extraction Successfully Completed...Input Subjects_list for CPAC - %s"
          % subject_list_path)
def generate_supplementary_files(data_config_outdir, data_config_name):
    """
    Generate the phenotypic template file and the participant list
    used for group analysis.

    The data configuration is read from
    ``<data_config_outdir>/<data_config_name>`` and two files are written
    into ``data_config_outdir``:

    * ``phenotypic_template_<data_config_name>.csv`` - a header row
      followed by one row per participant ID
    * ``participant_list_group_analysis_<data_config_name>.txt`` - one
      participant ID per line

    Parameters
    ----------
    data_config_outdir : string
        directory holding the data configuration and receiving the output
    data_config_name : string
        file name of the data configuration; also embedded in the names of
        the generated files

    Raises
    ------
    Exception
        the data configuration file could not be read or parsed
    TypeError
        the loaded subject list does not have the expected structure
    IOError
        one of the output files could not be written
    """
    import os
    import csv

    data_config_path = os.path.join(data_config_outdir, data_config_name)

    try:
        with open(data_config_path, 'r') as config_file:
            subjects_list = yaml.safe_load(config_file)
    except Exception as exc:
        err = "\n\n[!] Data configuration file couldn't be read!\nFile " \
              "path: {0}\n".format(data_config_path)
        # Fail here with the real cause; continuing would only surface as a
        # NameError on 'subjects_list' further down.
        raise Exception(err) from exc

    subID_set = set()
    # NOTE(review): session_set and scan_set are collected but not consumed
    # by the two files written below - confirm before removing.
    session_set = set()
    scan_set = set()

    try:
        for sub in subjects_list:
            try:
                for scan in sub['func']:
                    subID_set.add(sub['subject_id'])
                    session_set.add(sub['unique_id'])
                    scan_set.add(scan)
            except KeyError:
                try:
                    for scan in sub['rest']:
                        subID_set.add(sub['subject_id'])
                        session_set.add(sub['unique_id'])
                        scan_set.add(scan)
                except KeyError:
                    # one of the participants in the subject list has no
                    # functional scans
                    subID_set.add(sub['subject_id'])
                    session_set.add(sub['unique_id'])
    except TypeError as e:
        print('Subject list could not be populated!')
        print('This is most likely due to a mis-formatting in your '
              'inclusion and/or exclusion subjects txt file or your '
              'anatomical and/or functional path templates.')
        print('Error: %s' % e)
        err_str = 'Check formatting of your anatomical/functional path ' \
                  'templates and inclusion/exclusion subjects text files'
        raise TypeError(err_str)

    participants = sorted(subID_set)

    # generate the phenotypic file template for group analysis
    file_name = os.path.join(data_config_outdir,
                             'phenotypic_template_%s.csv' % data_config_name)

    try:
        # csv.writer needs a text-mode file opened with newline=''
        f = open(file_name, 'w', newline='')
    except (IOError, OSError):
        print('\n\nCPAC says: I couldn\'t save this file to your drive:\n')
        print(file_name, '\n\n')
        print('Make sure you have write access? Then come back. Don\'t '
              'worry.. I\'ll wait.\n\n')
        raise

    with f:
        writer = csv.writer(f)
        writer.writerow(['participant', 'EV1', '..'])
        for sub in participants:
            writer.writerow([sub, ''])

    print("Template Phenotypic file for group analysis - %s" % file_name)

    # generate the group analysis participant list
    file_name = os.path.join(data_config_outdir,
                             'participant_list_group_analysis_%s.txt'
                             % data_config_name)

    try:
        with open(file_name, 'w') as f:
            for sub in participants:
                print(sub, file=f)
    except (IOError, OSError):
        print('\n\nCPAC says: I couldn\'t save this file to your drive:\n')
        print(file_name, '\n\n')
        print('Make sure you have write access? Then come back. Don\'t '
              'worry.. I\'ll wait.\n\n')
        raise

    print("Participant list required later for group analysis - %s\n\n"
          % file_name)
def read_csv(csv_input):
    """
    Read the scan parameters csv file into a site -> parameters mapping.

    Column names are matched case-insensitively. For every row, the values
    of all columns other than ``site`` and ``scan`` are stored in a list
    ordered by lower-cased column name, keyed by the row's ``site`` value.
    A later row for the same site replaces the earlier one.

    Parameters
    ----------
    csv_input : string
        path of the csv file; it must start with a header row

    Returns
    -------
    dict_labels : collections.defaultdict(list)
        site name -> list of parameter values

    Raises
    ------
    Exception
        the file cannot be opened or parsed, or it yields no rows
    """
    import csv
    from collections import defaultdict

    dict_labels = defaultdict(list)

    try:
        # newline='' is what the csv module expects; the legacy "U" mode is
        # rejected by open() on Python 3.11+. The context manager closes
        # the file.
        with open(csv_input, 'r', newline='') as csv_file:
            for line in csv.DictReader(csv_file):
                csv_dict = dict((k.lower(), v) for k, v in line.items())
                dict_labels[csv_dict.get('site')] = [
                    csv_dict[key] for key in sorted(csv_dict)
                    if key != 'site' and key != 'scan'
                ]
    except IOError:
        msg = "Error reading the csv file %s" % csv_input
        logging.exception(msg)
        raise Exception(msg)
    except Exception:
        msg = "Error reading scan parameters csv. Make sure you are using the correct template"
        logging.exception(msg)
        raise Exception(msg)

    # Checked outside the try block so this specific message is not
    # replaced by the generic parsing error above.
    if not dict_labels:
        msg = "Scan Parameters File is either empty " \
              "or missing header"
        logging.error(msg)
        raise Exception(msg)

    return dict_labels
"""
Class to set dictionary keys as map attributes
"""
class Configuration(object):
    """
    Expose the keys of a configuration mapping as instance attributes.

    Values equal to the string ``'None'`` are stored as ``None``.
    """

    def __init__(self, config_map):
        """
        Parameters
        ----------
        config_map : dict
            configuration keys and values; it is read but not modified
        """
        for key in config_map:
            value = config_map[key]
            # Convert on the attribute only, leaving the caller's mapping
            # untouched
            setattr(self, key, None if value == 'None' else value)
def run(data_config):
    """
    Run method takes data_config
    file as the input argument

    Logging is redirected to ``extract_data_logs.log`` in the current
    working directory, the data configuration is loaded, the scan
    parameters csv is read when one is configured, and the supplementary
    group-analysis files are generated.

    Parameters
    ----------
    data_config : string
        path of the data configuration YAML file
    """
    root = logging.getLogger()
    # Iterate over a copy: removing handlers from the live list while
    # looping over it skips every other handler.
    for handler in list(root.handlers):
        root.removeHandler(handler)

    log_path = os.path.join(os.getcwd(), 'extract_data_logs.log')
    logging.basicConfig(filename=log_path, filemode='w', level=logging.DEBUG,
                        format="%(levelname)s %(asctime)s %(lineno)d %(message)s")

    print("For any errors or messages check the log file - %s" % log_path)

    with open(os.path.realpath(data_config), 'r') as config_file:
        c = Configuration(yaml.safe_load(config_file))

    if c.scanParametersCSV is not None:
        s_param_map = read_csv(c.scanParametersCSV)
    else:
        logging.debug("no scan parameters csv included\n"
                      "make sure you turn off slice timing correction option\n"
                      "in CPAC configuration\n")
        s_param_map = None

    # NOTE(review): s_param_map is computed but never passed on, and
    # extract_data() is not called from here - confirm whether a call such
    # as extract_data(c, s_param_map) is missing before this step.
    generate_supplementary_files(c.outputSubjectListLocation, c.subjectListName)
# Command-line entry point: expects exactly one argument, the path of the
# data configuration YAML file.
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: python extract_data.py data_config.yml")
        # Non-zero status so shells and calling scripts can detect the
        # usage error
        sys.exit(1)
    run(sys.argv[1])