def load_pheno_file(pheno_file):
    """Read a group-level analysis phenotype CSV into a pandas DataFrame.

    Parameters
    ----------
    pheno_file : str
        Full path to the phenotype CSV file.

    Returns
    -------
    pandas.DataFrame
        The parsed phenotype table.

    Raises
    ------
    Exception
        If the given path does not point to an existing file.
    """
    import os
    import pandas as pd

    if not os.path.isfile(pheno_file):
        raise Exception(
            "\n\n[!] CPAC says: The group-level analysis phenotype file "
            "provided does not exist!\nPath provided: %s\n\n" % pheno_file)

    with open(os.path.abspath(pheno_file), "r") as pheno_fp:
        return pd.read_csv(pheno_fp)
def load_group_participant_list(group_participant_list_file):
    """Read the group-level analysis participant list CSV.

    Parameters
    ----------
    group_participant_list_file : str
        Full path to the participant list CSV; must contain a
        'participant' column (and may contain 'session'/'series').

    Returns
    -------
    pandas.DataFrame
        The parsed participant list.

    Raises
    ------
    Exception
        If the file does not exist or lacks a 'participant' column.
    """
    import os
    import pandas as pd

    if not os.path.isfile(group_participant_list_file):
        # BUG FIX: the original formatted this message with an undefined
        # name ("group_subject_list"), raising NameError instead of the
        # intended error
        err = "\n\n[!] CPAC says: The group-level analysis subject list "\
              "provided does not exist!\nPath provided: %s\n\n" \
              % group_participant_list_file
        raise Exception(err)

    with open(group_participant_list_file, "r") as f:
        group_subs_dataframe = pd.read_csv(f)

    # BUG FIX: the original checked the columns of an undefined name
    # ("ga_sublist") instead of the dataframe just loaded
    if "participant" not in group_subs_dataframe.columns:
        err = "\n\n[!] CPAC says: Your group-level analysis subject "\
              "list CSV is missing a 'participant' column.\n\n"
        raise Exception(err)

    return group_subs_dataframe
def process_pheno_file(pheno_file_dataframe, group_subs_dataframe,
                       participant_id_label):
    """Drop phenotype rows for participants not in the group list.

    Also narrows matching by 'session' and/or 'series' columns when the
    group participant list carries them (repeated-measures designs).

    Parameters
    ----------
    pheno_file_dataframe : pandas.DataFrame
        The phenotype file contents.
    group_subs_dataframe : pandas.DataFrame
        The group analysis participant list (must have 'participant';
        optionally 'session' and/or 'series').
    participant_id_label : str
        Name of the participant ID column in the phenotype file.

    Returns
    -------
    list of dict
        One dict per kept phenotype row, {header: value, ...}.

    Raises
    ------
    Exception
        On bad input types, or if multiple phenotype rows match one
        participant (+session/series) combination.
    """
    import pandas as pd

    if not isinstance(pheno_file_dataframe, pd.DataFrame):
        err = "\n\n[!] CPAC says: The phenotype information input should " \
              "be a Python Pandas dataframe object.\n\n"
        raise Exception(err)

    if not isinstance(group_subs_dataframe, pd.DataFrame):
        err = "\n\n[!] CPAC says: The group analysis participant list input "\
              "should be a Python Pandas dataframe object.\n\n"
        raise Exception(err)

    if not isinstance(participant_id_label, str):
        err = "\n\n[!] CPAC says: The participant ID label input should be " \
              "a string.\n\n"
        # BUG FIX: the original built this message but never raised it,
        # silently letting a non-string label through
        raise Exception(err)

    pheno = pheno_file_dataframe

    # convert from dataframe to lists of strings, since the group list may
    # have been read with mixed types
    subjects = [str(s) for s in group_subs_dataframe.participant]
    sessions = None
    series = None
    if "session" in group_subs_dataframe.columns:
        sessions = [str(s) for s in group_subs_dataframe.session]
    if "series" in group_subs_dataframe.columns:
        series = [str(s) for s in group_subs_dataframe.series]

    def _matching_rows(sub_id, session, scan):
        # one-participant slice of the phenotype, optionally narrowed by
        # session and/or series for repeated measures
        mask = pheno[participant_id_label] == sub_id
        if session is not None:
            mask = mask & (pheno.session == session)
        if scan is not None:
            mask = mask & (pheno.series == scan)
        return pheno[mask]

    df_rows = []
    for i, subject in enumerate(subjects):
        session = sessions[i] if sessions else None
        scan = series[i] if series else None

        # full_id is only used for the "multiple matches" error message
        full_id = [subject]
        if session is not None:
            full_id.append(session)
        if scan is not None:
            full_id.append(scan)

        try:
            row = _matching_rows(subject, session, scan)
        except Exception:
            # the phenotype file may store participant IDs as integers;
            # retry the lookup with an integer ID
            row = _matching_rows(int(subject), session, scan)

        if len(row) > 1:
            err = "\n\n[!] CPAC says: Multiple phenotype entries were " \
                  "found for these criteria:\n\n%s\n\nPlease ensure " \
                  "your group analysis participant list and phenotype " \
                  "file are configured correctly.\n\n" % str(full_id)
            raise Exception(err)
        elif len(row) == 1:
            df_rows.append(row)

    # NOTE: like the original, this raises if NO participants matched
    # (pd.concat of an empty list)
    new_pheno_df = pd.concat(df_rows)
    return new_pheno_df.to_dict("records")
def create_pheno_dict(pheno_file_rows, ev_selections, participant_id_label):
    """Build the Patsy-format phenotype data dictionary.

    Categorical EVs are re-labeled as '<EV><value>' (e.g. 1 -> 'adhd1'),
    EVs marked for demeaning are demeaned, and all remaining numeric EVs
    are converted to NumPy arrays so Patsy can consume them.

    Parameters
    ----------
    pheno_file_rows : list of dict
        Phenotype rows, each {header: value, ...}.
    ev_selections : dict
        May contain 'categorical' and/or 'demean' keys, each a list of
        phenotype EV names.
    participant_id_label : str
        Name of the participant ID column in the phenotype file.

    Returns
    -------
    dict
        {EV name: list or np.ndarray of values, in row order}.
    """
    import numpy as np

    pheno_data_dict = {}
    categorical_evs = ev_selections.get('categorical', [])

    for line in pheno_file_rows:
        for val in line.values():
            # skip any row that contains a blank value; the for-else only
            # runs when no blank was found
            if val == "":
                break
        else:
            for key in line.keys():
                if key not in pheno_data_dict.keys():
                    pheno_data_dict[key] = []

                if key in categorical_evs:
                    # Patsy-friendly categorical labels, e.g.
                    # {ADHD: ['adhd1', 'adhd1', 'adhd0']} instead of [1, 1, 0]
                    pheno_data_dict[key].append(key + str(line[key]))
                elif key in (participant_id_label, "session", "series"):
                    # BUG FIX: the original compared against an undefined
                    # name ("subject_id_label"); the parameter is
                    # participant_id_label
                    pheno_data_dict[key].append(line[key])
                else:
                    pheno_data_dict[key].append(float(line[key]))

    # this runs only after every list above is fully populated
    for key in pheno_data_dict.keys():
        # demean the EVs marked for demeaning
        if key in ev_selections.get('demean', []):
            values = [float(v) for v in pheno_data_dict[key]]
            ev_mean = sum(values) / len(values)
            pheno_data_dict[key] = [v - ev_mean for v in values]

        # convert non-categorical EV lists into NumPy arrays so that
        # Patsy may read them in properly
        if 'categorical' in ev_selections.keys():
            if key not in ev_selections['categorical']:
                pheno_data_dict[key] = np.array(pheno_data_dict[key])

    return pheno_data_dict
def get_measure_dict(param_file):
    """Parse a CPAC-generated power parameters CSV into measure maps.

    Parameters
    ----------
    param_file : str
        Full path to the power parameters CSV (columns include 'Subject',
        'Scan', and motion measures).

    Returns
    -------
    dict
        {measure name: {(participant ID, scan ID): value, ...}, ...}
        for each of MeanFD_Power / MeanFD_Jenkinson / MeanDVARS that is
        present in the file.

    Raises
    ------
    Exception
        If the parameters file does not exist.
    """
    import os
    import pandas as pd

    if not os.path.isfile(param_file):
        err = "\n\n[!] CPAC says: You've included a motion parameter in " \
              "your group-level analysis model design formula, but " \
              "there is no motion parameters file available.\n\n"
        raise Exception(err)

    with open(param_file, "r") as f:
        motion_params = pd.read_csv(f, index_col=False)

    measure_dict = {}
    for measure_name in ('MeanFD_Power', 'MeanFD_Jenkinson', 'MeanDVARS'):
        if measure_name not in motion_params.columns:
            continue
        # stringify IDs so lookups are type-stable regardless of how
        # pandas parsed the columns
        participant_ids = [str(x) for x in motion_params["Subject"]]
        scan_ids = [str(x) for x in motion_params["Scan"]]
        values = [float(x) for x in motion_params[measure_name]]
        measure_dict[measure_name] = {
            (pid, sid): val
            for pid, sid, val in zip(participant_ids, scan_ids, values)
        }

    return measure_dict
def get_custom_roi_info(roi_means_dict):
    """Split per-participant ROI mean lists into one dict per ROI.

    Parameters
    ----------
    roi_means_dict : dict
        {participant ID: [mean1, mean2, ...], ...} with one mean per ROI.

    Returns
    -------
    dict
        {"Custom_ROI_Mean_N": {participant ID: mean of ROI N, ...}, ...}

    Raises
    ------
    Exception
        If roi_means_dict is None.
    """
    if roi_means_dict == None:
        err_string = "\n\n[!] CPAC says: The custom ROI means were not " \
                     "calculated properly during the group analysis " \
                     "model generation.\n\n"
        raise Exception(err_string)

    # number of ROIs = length of any participant's list of means
    # BUG FIX: dict.values() is a view in Python 3 and cannot be indexed;
    # materialize it first
    roi_num = len(list(roi_means_dict.values())[0])

    # one dictionary per ROI regressor label
    roi_dict_dict = {}
    for num in range(0, roi_num):
        label = "Custom_ROI_Mean_%d" % int(num + 1)
        # BUG FIX: label N must map to index num (the N-th mean); the
        # original used [num - 1], pairing label 1 with the LAST ROI
        roi_dict_dict[label] = {key: means[num]
                                for key, means in roi_means_dict.items()}

    return roi_dict_dict
def model_group_var_separately(grouping_var, formula, pheno_data_dict,
                               ev_selections, coding_scheme):
    """Split EVs along a grouping variable to model group variances separately.

    Every EV in the formula (other than the grouping variable itself) is
    split into one EV per grouping-variable level (e.g. 'age' becomes
    'age__sex0' and 'age__sex1'), the formula is rewritten accordingly,
    categorical EVs get Patsy C(...) wrapping, and the intercept is removed.

    Parameters
    ----------
    grouping_var : str
        Name of the (categorical) grouping EV; must appear in the formula.
    formula : str
        Patsy design formula.
    pheno_data_dict : dict
        Patsy-format phenotype data (as built by create_pheno_dict);
        modified in place.
    ev_selections : dict
        'categorical' / 'demean' EV name lists; modified in place.
    coding_scheme : str
        'Treatment' or 'Sum'.

    Returns
    -------
    tuple
        (pheno_data_dict, rewritten formula, grouping_var_id_dict) where
        grouping_var_id_dict maps each level to the row indices belonging
        to it (used later for the .grp file).
    """
    if grouping_var == None or grouping_var not in formula:
        print('\n\n[!] CPAC says: Model group variances separately is ' \
              'enabled, but the grouping variable set is either set to ' \
              'None, or was not included in the model as one of the ' \
              'EVs.\n')
        print('Design formula: ', formula)
        print('Grouping variable: ', grouping_var, '\n\n')
        raise Exception

    # wrap the grouping variable in C(...) early so that it doesn't get in
    # the way once the other EVs carry the grouping variable in their names
    if 'categorical' in ev_selections.keys():
        for EV_name in ev_selections['categorical']:
            if EV_name == grouping_var:
                if coding_scheme == 'Treatment':
                    formula = formula.replace(EV_name, 'C(' + EV_name + ')')
                elif coding_scheme == 'Sum':
                    formula = formula.replace(EV_name,
                                              'C(' + EV_name + ', Sum)')

    # collect the grouping variable's levels and which rows carry each level
    groupvar_levels = []
    grouping_var_id_dict = {}
    for idx, cat_ev_value in enumerate(pheno_data_dict[grouping_var]):
        # each value is Patsy-format, e.g. "sex1"; strip the EV name to get
        # the bare level, e.g. "1"
        cat_ev_level = str(cat_ev_value).replace(str(grouping_var), "")
        if cat_ev_level not in groupvar_levels:
            groupvar_levels.append(cat_ev_level)
        grouping_var_id_dict.setdefault(cat_ev_level, []).append(idx)

    split_EVs = {}
    # BUG FIX: iterate over a snapshot of the keys - entries are deleted
    # below, and mutating a dict while iterating it raises RuntimeError on
    # Python 3
    for key in list(pheno_data_dict.keys()):
        if (key in formula) and (key != grouping_var):
            new_key_string = ""
            for level in groupvar_levels:
                groupvar_with_level = str(grouping_var) + str(level)
                new_key = key + "__" + groupvar_with_level

                # accumulate "key__lvl0 + key__lvl1 + ..." for the formula
                if new_key_string == "":
                    new_key_string = new_key
                else:
                    new_key_string = new_key_string + " + " + new_key

                split_EVs[new_key] = []
                if key in ev_selections.get("categorical", []):
                    ev_selections["categorical"].append(new_key)

                # the split EV keeps the value where the row belongs to this
                # level, and zero elsewhere
                for val, groupvar_val in zip(pheno_data_dict[key],
                                             pheno_data_dict[grouping_var]):
                    if groupvar_with_level == groupvar_val:
                        split_EVs[new_key].append(val)
                    else:
                        split_EVs[new_key].append(0)

            del pheno_data_dict[key]
            if key in ev_selections.get("categorical", []):
                ev_selections["categorical"].remove(key)

            formula = formula.replace(key, new_key_string)

    # put split EVs into the pheno data dict
    pheno_data_dict.update(split_EVs)

    # wrap the remaining categorical EVs in C(...) so Patsy processes them
    # properly when generating the design matrix (.mat file)
    if 'categorical' in ev_selections.keys():
        for EV_name in ev_selections['categorical']:
            if EV_name != grouping_var:
                if coding_scheme == 'Treatment':
                    formula = formula.replace(EV_name, 'C(' + EV_name + ')')
                elif coding_scheme == 'Sum':
                    formula = formula.replace(EV_name,
                                              'C(' + EV_name + ', Sum)')

    # remove intercept when modeling group variances separately
    formula = formula + " - 1"

    return pheno_data_dict, formula, grouping_var_id_dict
def check_multicollinearity(matrix):
    """Print a warning (to stdout) if the design matrix looks multicollinear.

    Computes the singular values of the matrix; a zero minimum singular
    value or a condition number above 30 triggers the warning. Returns
    nothing - this is a diagnostic side effect only.
    """
    import numpy as np

    print("\nChecking for multicollinearity in the model..")

    singular_values = np.linalg.svd(matrix)[1]
    s_max = np.max(singular_values)
    s_min = np.min(singular_values)

    print("Max singular: ", s_max)
    print("Min singular: ", s_min)
    print("Rank: ", np.linalg.matrix_rank(matrix), "\n")

    warning = ('[!] CPAC warns: Detected multicollinearity in the '
               'computed group-level analysis model. Please double-'
               'check your model design.\n\n')

    if s_min == 0:
        # singular matrix: perfectly collinear columns
        print(warning)
    else:
        condition_number = float(s_max) / float(s_min)
        print("Condition number: %f\n\n" % condition_number)
        if condition_number > 30:
            print(warning)
def write_mat_file(design_matrix, output_dir, model_name,
                   depatsified_EV_names, current_output=None):
    """Write the design matrix as a FLAMEO-format .mat file.

    Parameters
    ----------
    design_matrix : numpy.ndarray
        1-D (single column) or 2-D design matrix.
    output_dir : str
        Directory for the output file (created if missing).
    model_name : str
        Basename for the <model_name>.mat file.
    depatsified_EV_names : list of str
        Column labels, written as a comment row for manual checking.
    current_output : str, optional
        Unused here; kept for call-site compatibility.
    """
    import os
    import numpy as np

    # a 1-D design counts as a single wave (column)
    if len(design_matrix.shape) == 1:
        n_points, n_waves = design_matrix.shape[0], 1
    else:
        n_points, n_waves = design_matrix.shape

    # placeholder peak-to-peak heights, one 1.0 per column
    pp_line = '/PPheights' + ('\t' + '%1.5e' % 1.0) * n_waves + '\n'

    out_file = os.path.join(output_dir, model_name + ".mat")
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    with open(out_file, 'wt') as f:
        f.write('/NumWaves\t%d\n' % n_waves)
        f.write('/NumPoints\t%d\n' % n_points)
        f.write(pp_line + '\n')

        # column labels - mainly for double-checking your model
        label_row = '\n' + ''.join(col + '\t' for col in depatsified_EV_names)
        f.write(label_row + ' \n\n')

        f.write('/Matrix\n')
        np.savetxt(f, design_matrix, fmt='%1.5e', delimiter='\t')
def create_grp_file(design_matrix, grouping_var_id_dict, output_dir,
                    model_name, current_output=None):
    """Write the FLAMEO-format .grp (variance groups) file.

    With no grouping dictionary every row belongs to group 1; otherwise
    rows are numbered by grouping-variable level (levels sorted, numbered
    from 1).

    Parameters
    ----------
    design_matrix : numpy.ndarray
        Used only for its number of rows.
    grouping_var_id_dict : dict or None
        {level: [row indices], ...} from model_group_var_separately.
    output_dir : str
        Directory for the output file (created if missing).
    model_name : str
        Basename for the <model_name>.grp file.
    current_output : str, optional
        Unused here; kept for call-site compatibility.
    """
    import os
    import numpy as np

    # number of rows is shape[0] whether the matrix is 1-D or 2-D
    n_points = design_matrix.shape[0]

    grp_vector = np.ones(n_points)
    if grouping_var_id_dict is not None:
        for group_num, level in enumerate(sorted(grouping_var_id_dict.keys()),
                                          start=1):
            for row_idx in grouping_var_id_dict[level]:
                grp_vector[row_idx] = group_num

    out_file = os.path.join(output_dir, model_name + ".grp")
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    with open(out_file, "wt") as f:
        f.write('/NumWaves\t1\n')
        f.write('/NumPoints\t%d\n\n' % n_points)
        f.write('/Matrix\n')
        np.savetxt(f, grp_vector, fmt='%d', delimiter='\t')
def create_design_matrix(pheno_file, ev_selections, formula, \
                         subject_id_label, sub_list=None, \
                         coding_scheme="Treatment", grouping_var=None, \
                         new_regressor_dict=None, roi_means_dict=None, \
                         output_dir=None, model_name="design", \
                         current_output=None):
    """Create FLAMEO-formatted .mat and .grp files for a group-level model.

    This is the top-level driver: it loads the phenotype and participant
    list, filters/augments the phenotype rows, builds a Patsy design
    matrix from the formula, and writes the model files to disk.

    Parameters
    ----------
    pheno_file : str
        Full path to a CSV file with the phenotypic data.
    ev_selections : dict
        Which EVs are categorical and which should be demeaned, e.g.
        {"categorical": ["dx_group", "sex"], "demean": ["age"]}.
        Modified in place (session/series/new regressors appended).
    formula : str
        Patsy-format design matrix formula
        (http://patsy.readthedocs.org/en/latest/formulas.html).
    subject_id_label : str
        Header label of the subject ID column in the phenotype CSV.
    sub_list : str, optional
        Full path to a CSV of participant IDs (optionally with session
        and/or series columns) to include from the phenotype file.
        NOTE(review): the docstringed "optional; defaults to all rows"
        behavior is not implemented - None is passed straight to
        load_group_participant_list(), which requires an existing file.
    coding_scheme : str, optional
        Patsy encoding scheme - "Treatment" (default) or "Sum".
    grouping_var : str, optional
        Grouping variable when modeling group variances separately.
    new_regressor_dict : dict, optional
        {regressor name: {participant key: value, ...}, ...} for extra
        regressors such as MeanFD (see get_measure_dict()).
    roi_means_dict : dict, optional
        {participant ID: [ROI mean, ...], ...} for 'Custom_ROI_Mean'.
    output_dir : str, optional
        Where to write the .mat/.grp files (defaults to CWD).
    model_name : str, optional
        Basename for the model files.
    current_output : str, optional
        Name of the derivative in the analysis (used in messages).

    Returns
    -------
    tuple
        (dmatrix, depatsified_EV_names): the Patsy design-matrix object
        and the cleaned design-matrix column names.
    """
    import os
    import patsy
    import numpy as np

    # if running this script alone outside of CPAC
    if output_dir == None:
        output_dir = os.getcwd()

    # process the phenotype file data and drop rows (participants) if
    # they are not listed in the participant list
    pheno_file_df = load_pheno_file(pheno_file)
    participant_list_df = load_group_participant_list(sub_list)
    pheno_file_rows = process_pheno_file(pheno_file_df, participant_list_df, \
                                         subject_id_label)

    # number of subjects in the model (i.e. time points, which must be
    # greater than the number of EVs)
    num_subjects = len(participant_list_df)

    # for repeated measures: treat session/series IDs as categorical EVs
    if "session" in participant_list_df.columns:
        ev_selections["categorical"].append("session")
    if "series" in participant_list_df.columns:
        ev_selections["categorical"].append("series")

    # start adding additionally created EVs (e.g. motion measures)
    if new_regressor_dict:
        for measure in new_regressor_dict.keys():
            if (measure in formula):
                measure_dict = new_regressor_dict[measure]
                for pheno_row_dict in pheno_file_rows:
                    participant_id = pheno_row_dict[subject_id_label]
                    # key shape must match how the measure dict was built
                    if ("session" in pheno_row_dict.keys()) and \
                            ("series" in pheno_row_dict.keys()):
                        session_id = pheno_row_dict["session"]
                        series_id = pheno_row_dict["series"]
                        participant_tuple = \
                            (participant_id, session_id, series_id)
                    elif "session" in pheno_row_dict.keys():
                        session_id = pheno_row_dict["session"]
                        participant_tuple = (participant_id, session_id)
                    elif "series" in pheno_row_dict.keys():
                        series_id = pheno_row_dict["series"]
                        participant_tuple = (participant_id, series_id)
                    else:
                        # NOTE(review): "(participant_id)" is NOT a 1-tuple,
                        # it is just participant_id - this branch only works
                        # if measure_dict is keyed by bare IDs; confirm
                        # against how the measure dict was built.
                        participant_tuple = (participant_id)
                    pheno_row_dict[measure] = measure_dict[participant_tuple]
                ev_selections["demean"].append(measure)

    if "Custom_ROI_Mean" in formula:
        # include the means of the specified ROIs as regressors
        if roi_means_dict == None:
            err = "\n\n[!] You included 'Custom_ROI_Mean' in your model " \
                  "design, but there are no mean of ROI values provided." \
                  "\n\n"
            raise Exception(err)

        # roi_dict_dict: one dictionary per specified ROI, each mapping
        # participant ID to that participant's mean for the ROI,
        # e.g. {participant_01: 35.15, participant_02: 50.00}
        roi_dict_dict = get_custom_roi_info(roi_means_dict)

        add_formula_string = ""
        for roi_column in roi_dict_dict.keys():
            roi_dict = roi_dict_dict[roi_column]
            for pheno_row_dict in pheno_file_rows:
                participant_id = pheno_row_dict[subject_id_label]
                if ("session" in pheno_row_dict.keys()) and \
                        ("series" in pheno_row_dict.keys()):
                    session_id = pheno_row_dict["session"]
                    series_id = pheno_row_dict["series"]
                    participant_tuple = \
                        (participant_id, session_id, series_id)
                elif "session" in pheno_row_dict.keys():
                    session_id = pheno_row_dict["session"]
                    participant_tuple = (participant_id, session_id)
                elif "series" in pheno_row_dict.keys():
                    series_id = pheno_row_dict["series"]
                    participant_tuple = (participant_id, series_id)
                else:
                    # NOTE(review): same non-tuple caveat as above; the ROI
                    # dicts from get_custom_roi_info() are keyed by bare
                    # participant IDs, so tuple keys here need session/series
                    # aware ROI dicts - confirm.
                    participant_tuple = (participant_id)
                pheno_row_dict[roi_column] = roi_dict[participant_tuple]

            ev_selections["demean"].append(roi_column)

            # build "roi1 + roi2 + ..." of the new custom ROI regressor
            # column names to substitute into the design formula, so that
            # Patsy will accept the phenotypic data dictionary that now has
            # these columns
            if add_formula_string == "":
                add_formula_string = add_formula_string + roi_column
            else:
                add_formula_string = add_formula_string + " + " + roi_column

        # a regressor column of ROI means for each custom-specified ROI has
        # now been added to the model with appropriate column labels
        formula = formula.replace("Custom_ROI_Mean",add_formula_string)

    # convert the phenotype rows into Patsy-acceptable format:
    #   - categorical EVs renamed from '0,1,..' to 'EV0,EV1,..'
    #   - EVs marked for demeaning are demeaned
    #   - numerical (non-categorical) EVs converted to NumPy arrays
    pheno_data_dict = create_pheno_dict(pheno_file_rows, ev_selections, \
                                        subject_id_label)

    # handle modeling group variances separately (if enabled), then edit
    # the formula to be in Patsy language
    if grouping_var != None:
        pheno_data_dict, formula, grouping_var_id_dict = \
            model_group_var_separately(grouping_var, \
                                       formula, pheno_data_dict, \
                                       ev_selections, coding_scheme)
    else:
        grouping_var_id_dict = None

        # wrap categorical EV names in C(...) so Patsy encodes them
        if 'categorical' in ev_selections.keys():
            for EV_name in ev_selections['categorical']:
                if coding_scheme == 'Treatment':
                    formula = formula.replace(EV_name, 'C(' + EV_name + ')')
                elif coding_scheme == 'Sum':
                    formula = formula.replace(EV_name, 'C(' + EV_name + \
                                              ', Sum)')

    # create the Patsy design matrix!
    try:
        dmatrix = patsy.dmatrix(formula, pheno_data_dict, NA_action='raise')
    except:
        print('\n\n[!] CPAC says: Design matrix creation wasn\'t ' \
              'successful - do the terms in your formula correctly ' \
              'correspond to the EVs listed in your phenotype file?\n')
        print('Phenotype file provided: ')
        print(pheno_file, '\n')
        print("Phenotypic data columns (regressors): ", list(pheno_data_dict.keys()))
        print("Formula: %s\n\n" % formula)
        raise Exception

    # check the model for multicollinearity - Patsy takes care of this,
    # but just in case
    check_multicollinearity(np.array(dmatrix))

    # prepare for final stages
    design_matrix = np.array(dmatrix, dtype=np.float16)
    column_names = dmatrix.design_info.column_names

    # check to make sure there are more time points than EVs!
    if len(column_names) >= num_subjects:
        err = "\n\n[!] CPAC says: There are more EVs than there are " \
              "subjects currently included in the model for %s. There must " \
              "be more subjects than EVs in the design.\n\nNumber of " \
              "subjects: %d\nNumber of EVs: %d\n\nNote: An 'Intercept' " \
              "column gets added to the design as an EV, so there will be " \
              "one more EV than you may have specified in your design. In " \
              "addition, if you specified to model group variances " \
              "separately, an Intercept column will not be included, but " \
              "the amount of EVs can nearly double once they are split " \
              "along the grouping variable.\n\n" \
              "If the number of subjects is lower than the number of " \
              "subjects in your group analysis subject list, this may be " \
              "because not every subject in the subject list has an output " \
              "for %s in the individual-level analysis output directory.\n\n"\
              % (current_output, num_subjects, len(column_names), \
                 current_output)
        raise Exception(err)

    # remove the header formatting Patsy creates for categorical variables
    # (e.g. "C(adhd, Sum)[S.adhd0]" -> "adhd0") because
    # depatsified_EV_names feeds the "Available EVs for Contrasts" list on
    # the next page, and also tests user-made custom contrast files
    depatsified_EV_names = []
    for column in column_names:
        column_string = column
        string_for_removal = ''
        # drop everything up to and including each '.' in the name
        for char in column_string:
            string_for_removal = string_for_removal + char
            if char == '.':
                column_string = column_string.replace(string_for_removal, '')
                string_for_removal = ''
        column_string = column_string.replace(']', '')
        depatsified_EV_names.append(column_string)

    # write the .mat file finally
    write_mat_file(design_matrix, output_dir, model_name, \
                   depatsified_EV_names, current_output)

    # write the .grp file also
    create_grp_file(design_matrix, grouping_var_id_dict, output_dir, \
                    model_name, current_output)

    # return the PATSY OBJECT of dmatrix, not the NumPy "design_matrix"
    return dmatrix, depatsified_EV_names
def positive(dmat, a, coding, group_sep, grouping_var):
    """Build a contrast vector giving weight +1 to design-matrix term `a`.

    dmat : Patsy design matrix (design_info maps column names -> indexes)
    a : str
        Name of the EV the contrast targets (depatsified or Patsy-format).
    coding : str
        "Treatment" or "Sum" - controls the intercept entry at the end.
    group_sep : bool
        True when group variances are modeled separately (split EV names
        of the form "<EV>__<groupvar><level>").
    grouping_var : str or None
        Name of the grouping variable; only consulted when group_sep.

    Returns a NumPy vector with one weight per design-matrix column. If
    `a` names a level Patsy dropped (reference level), all other levels
    of that category get -1 instead.
    """
    import numpy as np
    # this is also where the "Intercept" column gets introduced into
    # the contrasts columns, for when the user uses the model builder's
    # contrast builder
    evs = dmat.design_info.column_name_indexes
    con = np.zeros(dmat.shape[1])
    if group_sep == True:
        if "__" in a and grouping_var in a:
            # a split EV such as "age__sex1": find the design column whose
            # name contains both the EV name and the grouping level
            ev_desc = a.split("__")
            for ev in evs:
                count = 0
                for desc in ev_desc:
                    if desc in ev:
                        count += 1
                if count == len(ev_desc):
                    con[evs[ev]] = 1
                    break
            else:
                # no column matched: it is a dropped term, so make all
                # other terms in that category -1
                term = a.split('[')[0]
                for ev in evs:
                    if ev.startswith(term):
                        con[evs[ev]]= -1
        elif len(a.split(grouping_var)) > 2:
            # this is if the current parsed contrast is the actual
            # grouping variable, as the Patsified name will have the
            # variable's name string in it twice
            for ev in evs:
                if a.split(".")[1] in ev:
                    con[evs[ev]] = 1
                    break
            else:
                # it is a dropped term so make all other terms in that
                # category -1
                term = a.split('[')[0]
                for ev in evs:
                    if ev.startswith(term):
                        con[evs[ev]]= -1
    # else not modeling group variances separately
    else:
        if a in evs:
            con[evs[a]] = 1
        else:
            # it is a dropped term so make all other terms in that category
            # at -1
            term = a.split('[')[0]
            for ev in evs:
                if ev.startswith(term):
                    con[evs[ev]]= -1
    if coding == "Treatment":
        # make Intercept 0
        con[0] = 0
    elif coding == "Sum":
        # make Intercept 1
        # NOTE(review): this writes column index 1, not index 0 - confirm
        # the intercept's expected position under Sum coding.
        con[1] = 1
    return con
def greater_than(dmat, a, b, coding, group_sep, grouping_var):
    """Contrast vector for "a > b": positive(a) minus positive(b)."""
    contrast_a = positive(dmat, a, coding, group_sep, grouping_var)
    contrast_b = positive(dmat, b, coding, group_sep, grouping_var)
    return contrast_a - contrast_b
def negative(dmat, a, coding, group_sep, grouping_var):
    """Contrast vector for "-a": the positive contrast for `a`, negated."""
    return 0 - positive(dmat, a, coding, group_sep, grouping_var)
def create_dummy_string(length):
    """Return `length` tab-prefixed "1.00000e+00" placeholders plus a newline.

    Used to fill the /PPheights and /RequiredEffect header fields of the
    FSL model files with dummy unit values.
    """
    return ("\t" + "%1.5e" % 1.0) * length + "\n"
def create_con_file(con_dict, col_names, file_name, current_output, out_dir):
    """Write a FLAMEO-format .con contrasts file.

    Parameters
    ----------
    con_dict : dict
        {contrast name: contrast weight vector, ...}.
    col_names : list of str
        Design-matrix column labels, written as a comment row.
    file_name : str
        Basename for the <file_name>.con file.
    current_output : str
        Unused here; kept for call-site compatibility.
    out_dir : str
        Directory the file is written into (must exist).
    """
    import os

    print("col names: ")
    print(col_names)

    con_path = os.path.join(out_dir, file_name) + ".con"
    with open(con_path, 'w+') as f:
        # header: one /ContrastName line per contrast, numbered from 1
        for num, name in enumerate(con_dict, start=1):
            f.write("/ContrastName%s\t%s\n" % (num, name))

        # sized from the last contrast's vector (all vectors share a length)
        vector_len = len(con_dict[name])
        f.write("/NumWaves\t%d\n" % vector_len)
        f.write("/NumContrasts\t%d\n" % len(con_dict))
        f.write("/PPheights%s" % create_dummy_string(vector_len))
        f.write("/RequiredEffect%s" % create_dummy_string(vector_len))
        f.write("\n\n")

        # column labels - mainly for double-checking your model
        label_row = '\n' + ''.join(col + '\t' for col in col_names)
        f.write(label_row + ' \n\n')

        # data: one row of weights per contrast
        f.write("/Matrix\n")
        for name in con_dict:
            for v in con_dict[name]:
                f.write("%1.5e\t" % v)
            f.write("\n")
def create_fts_file(ftest_list, con_dict, model_name, current_output,
                    out_dir):
    """Write a FLAMEO-format .fts f-tests file.

    Each f-test is a comma-separated string of contrast names; its matrix
    row carries 1 for every participating contrast and 0 otherwise.

    Parameters
    ----------
    ftest_list : list of str
        e.g. ["con1,con2", "con2,con3"].
    con_dict : dict
        {contrast name: weight vector, ...} (defines column order).
    model_name : str
        Basename for the <model_name>.fts file.
    current_output : str
        Only used to build the path shown in the error message.
    out_dir : str
        Directory the file is written into (must exist).

    Raises
    ------
    Exception
        If the file could not be created/written (wraps the cause).
    """
    import os
    import numpy as np

    try:
        print("\nFound f-tests in your model, writing f-tests file " \
              "(.fts)..\n")

        fts_path = os.path.join(out_dir, model_name + '.fts')
        with open(fts_path, 'w') as f:
            print('/NumWaves\t', len(con_dict), file=f)
            print('/NumContrasts\t', len(ftest_list), file=f)

            # one 0/1 row per f-test, one column per contrast
            rows = []
            for ftest_string in ftest_list:
                selected = ftest_string.split(",")
                rows.append([1 if con in selected else 0
                             for con in con_dict.keys()])
            fts_n = np.array(rows)

            # column labels - mainly for double-checking your model
            label_row = '\n' + ''.join(con + '\t' for con in con_dict.keys())
            print(label_row, '\n', file=f)

            print('/Matrix', file=f)
            for row in fts_n:
                print(' '.join(row.astype('str')), file=f)
    except Exception as e:
        filepath = os.path.join(out_dir, "model_files", current_output, \
                                model_name + '.fts')
        errmsg = "\n\n[!] CPAC says: Could not create .fts file for " \
                 "FLAMEO or write it to disk.\nAttempted filepath: %s\n" \
                 "Error details: %s\n\n" % (filepath, e)
        raise Exception(errmsg)
def create_con_ftst_file(con_file, model_name, current_output, output_dir,
                         column_names, coding_scheme, group_sep):
    """
    Create the contrasts (.con) and f-tests (.fts) files for FSL FLAMEO
    from a custom contrasts matrix CSV file.

    Parameters
    ----------
    con_file : str
        path to the custom contrasts CSV; the first column holds contrast
        names, the remaining columns are EV weights, and any trailing
        column whose header contains "f_test" defines an f-test
    model_name : str
        model name, used to build the output filenames
    current_output : str
        name of the derivative currently being modeled (kept for
        signature compatibility)
    output_dir : str
        directory the .con/.fts files are written into
    column_names : list
        EV column labels of the design matrix, used for validation
    coding_scheme : str
        "Treatment" or "Sum" (currently unused here - see TODO below)
    group_sep : bool
        whether group variances are modeled separately (currently unused
        here - see TODO below)

    Raises
    ------
    Exception
        if the CSV cannot be parsed, if f-tests are requested with fewer
        than two contrasts, or if the CSV's EV columns do not match the
        design matrix
    """
    import os
    import numpy as np

    with open(con_file, "r") as f:
        evs = f.readline()

    evs = evs.rstrip('\r\n').split(',')

    count_ftests = 0

    # TODO: this needs to be re-visited, but I think this was originally added
    # TODO: to counteract the fact that if someone was making a custom
    # TODO: contrasts matrix CSV, they wouldn't know that the design matrix
    # TODO: would have the Intercept added to it? but what if it wasn't?
    # TODO: comment out for now... but test
    # remove "Contrasts" label and replace it with "Intercept"
    #evs[0] = "Intercept"

    fTest = False

    print("evs: ")
    print(evs)

    # any "f_test" header column marks an f-test definition
    for ev in evs:
        if "f_test" in ev:
            count_ftests += 1

    if count_ftests > 0:
        fTest = True

    try:
        # encoding='utf-8' so contrast names come back as str, not bytes -
        # with dtype=None alone, NumPy on Python 3 returns b'...' names,
        # which would then be written literally (e.g. "b'con1'") into the
        # .con file
        data = np.genfromtxt(con_file, names=True, delimiter=',',
                             dtype=None, encoding='utf-8')
    except Exception as e:
        # was a bare "except:" raising a message-less Exception; surface
        # the parse error instead
        print("Error: Could not successfully read in contrast file: ",con_file)
        raise Exception("Could not parse the custom contrasts CSV file: "
                        "%s\nError details: %s\n" % (con_file, e))

    lst = data.tolist()
    # a single-row CSV yields a 0-d structured array whose tolist() is one
    # tuple rather than a list of tuples - normalize so the loop below
    # iterates over rows, not fields
    if not isinstance(lst, list):
        lst = [lst]

    ftst = []
    fts_columns = []
    contrasts = []
    contrast_names = []

    length = len(list(lst[0]))

    # lst = list of tuples, "tp"
    # tp = tuple in the format (contrast_name, 0, 0, 0, 0, ...)
    # with the zeroes being the vector of contrasts for that contrast
    for tp in lst:
        contrast_names.append(tp[0])

        # create a list of integers that is the vector for the contrast
        # ex. [0, 1, 1, 0, ..]
        con_vector = list(tp)[1:(length - count_ftests)]

        fts_vector = list(tp)[(length - count_ftests):length]
        fts_columns.append(fts_vector)

        # TODO: see note about Intercept above
        # add Intercept column
        # if not group_sep:
        #     if coding_scheme == "Treatment":
        #         con_vector.insert(0, 0)
        #     elif coding_scheme == "Sum":
        #         con_vector.insert(0, 1)

        contrasts.append(con_vector)

    # contrast_names = list of the names of the contrasts (not regressors)
    # contrasts = list of lists with the contrast vectors
    num_EVs_in_con_file = len(contrasts[0])

    # NOTE(review): float16 limits contrast weights to ~3 significant
    # digits - confirm this precision is intentional
    contrasts = np.array(contrasts, dtype=np.float16)
    fts_columns = np.array(fts_columns)

    # if there are f-tests, create the array for them
    if fTest:
        if len(contrast_names) < 2:
            errmsg = "\n\n[!] CPAC says: Not enough contrasts for running " \
                     "f-tests.\nTip: Do you have only one contrast in your " \
                     "contrasts file? f-tests require more than one contrast.\n"\
                     "Either remove the f-tests or include more contrasts.\n\n"
            raise Exception(errmsg)

        fts_n = fts_columns.T

    if len(column_names) != (num_EVs_in_con_file):
        err_string = "\n\n[!] CPAC says: The number of EVs in your model " \
                     "design matrix (found in the %s.mat file) does not " \
                     "match the number of EVs (columns) in your custom " \
                     "contrasts matrix CSV file.\n\nCustom contrasts matrix "\
                     "file: %s\n\nNumber of EVs in design matrix: %d\n" \
                     "Number of EVs in contrasts file: %d\n\nThe column " \
                     "labels in the design matrix should match those in " \
                     "your contrasts .CSV file.\nColumn labels in design " \
                     "matrix:\n%s" % (model_name, con_file, \
                     len(column_names), num_EVs_in_con_file, \
                     str(column_names))
        raise Exception(err_string)

    # substring check: CSV EV headers must appear in the corresponding
    # design-matrix column labels (Patsy labels embed the raw EV name)
    for design_mat_col, con_csv_col in zip(column_names, evs[1:]):
        if con_csv_col not in design_mat_col:
            errmsg = "\n\n[!] CPAC says: The names of the EVs in your " \
                     "custom contrasts .csv file do not match the names or " \
                     "order of the EVs in the design matrix. Please make " \
                     "sure these are consistent.\nDesign matrix EV columns: "\
                     "%s\nYour contrasts matrix columns: %s\n\n" \
                     % (column_names, evs[1:])
            raise Exception(errmsg)

    out_dir = os.path.join(output_dir, model_name + '.con')

    with open(out_dir, "wt") as f:
        idx = 1
        pp_str = '/PPheights'
        re_str = '/RequiredEffect'
        for name in contrast_names:
            print('/ContrastName%d' % idx, '\t', name, file=f)
            # PPheights/RequiredEffect are written as placeholder 1.0s
            pp_str += '\t%1.5e' % (1)
            re_str += '\t%1.5e' % (1)
            idx += 1

        print('/NumWaves\t', (contrasts.shape)[1], file=f)
        print('/NumContrasts\t', (contrasts.shape)[0], file=f)
        print(pp_str, file=f)
        print(re_str + '\n', file=f)

        # print labels for the columns - mainly for double-checking your model
        col_string = '\n'
        for ev in evs:
            col_string = col_string + ev + '\t'
        print(col_string, '\n', file=f)

        print('/Matrix', file=f)
        np.savetxt(f, contrasts, fmt='%1.5e', delimiter='\t')

    if fTest:
        print("\nFound f-tests in your model, writing f-tests file (.fts)..\n")

        ftest_out_dir = os.path.join(output_dir, model_name + '.fts')

        with open(ftest_out_dir, "wt") as f:
            print('/NumWaves\t', (contrasts.shape)[0], file=f)
            print('/NumContrasts\t', count_ftests, file=f)

            # print labels for the columns - mainly for double-checking your
            # model
            col_string = '\n'
            for con in contrast_names:
                col_string = col_string + con + '\t'
            print(col_string, '\n', file=f)

            print('/Matrix', file=f)
            for i in range(fts_n.shape[0]):
                print(' '.join(fts_n[i].astype('str')), file=f)
def process_contrast(parsed_contrast, operator, ev_selections, group_sep,
                     grouping_var, coding_scheme):
    """Split a user-entered contrast string on ``operator`` and translate
    categorical EV names into Patsy-formatted column labels.

    The Patsy dmatrix names categorical columns ``C(ev)[T.level]``
    (Treatment coding) or ``C(ev, Sum)[S.level]`` (Sum coding), so EV
    tokens taken from the contrast string must be rewritten in that form
    before they can be matched against design-matrix columns. Interaction
    terms (containing ":") that get expanded into multiple Patsy labels
    are re-joined with ":" afterwards.
    """
    translated = []

    pieces = parsed_contrast.split(operator)

    # a leading/trailing operator leaves one empty token behind
    if '' in pieces:
        pieces.remove('')

    for piece in pieces:
        matched_categorical = False

        # translate each categorical EV found inside this token
        for cat_EV in ev_selections.get('categorical', []):
            # when group variances are modeled separately, the grouping
            # variable appears embedded ("__"-joined) in other EV names;
            # don't let it trigger a rewrite of those EVs
            if group_sep == True:
                avoid = grouping_var
            else:
                avoid = "..."

            if (cat_EV not in piece) or (avoid in piece and "__" in piece):
                continue

            # for interaction terms, pull out the factor level belonging
            # to this particular categorical EV
            if ":" in piece:
                for part in piece.split(":"):
                    if cat_EV in part:
                        level = part
            else:
                level = piece

            if coding_scheme == 'Treatment':
                translated.append('C(' + cat_EV + ')[T.' + level + ']')
            elif coding_scheme == 'Sum':
                translated.append('C(' + cat_EV + ', Sum)[S.' + level + ']')

            matched_categorical = True

        if not matched_categorical:
            translated.append(piece)

        # re-join interaction terms whose factors were appended as
        # separate Patsy labels above
        if ":" in piece and len(translated) == 2:
            translated = [translated[0] + ":" + translated[1]]
        if ":" in piece and len(translated) == 3:
            translated = [translated[0],
                          translated[1] + ":" + translated[2]]

    return translated
def run(group_config, current_output, param_file=None, \
        derivative_means_dict=None, roi_means_dict=None, \
        model_out_dir=None, CPAC_run=True):
    """Build the FSL FLAME model files for one derivative of a group-level
    analysis: the .mat/.grp files (via create_design_matrix) and the
    .con/.fts files (via create_con_file / create_fts_file, or
    create_con_ftst_file when a custom contrasts CSV is supplied).

    Parameters
    ----------
    group_config : Configuration or str
        the group analysis configuration; a Configuration object when
        ``CPAC_run`` is True, otherwise a path to the group analysis
        FSL .yml config file
    current_output : str
        name of the derivative currently being modeled
    param_file : str or None
        path to the CPAC-generated power parameters file with motion
        measures, if any
    derivative_means_dict : dict or None
        per-participant means of the current derivative ("Measure_Mean")
    roi_means_dict : dict or None
        per-participant ROI means, passed through to the design matrix
    model_out_dir : str or None
        directory the model files are written into
    CPAC_run : bool
        True when invoked from inside a CPAC pipeline run
    """
    import os
    import csv
    import numpy as np

    # open the GROUP ANALYSIS FSL .YML CONFIG FILE, not the main pipeline
    # config .yml file!
    if CPAC_run:
        c = group_config
    else:
        try:
            # NOTE(review): Configuration and yaml are not imported inside
            # this function - presumably module-level imports; verify
            c = Configuration(yaml.safe_load(open(os.path.realpath(group_config), \
                'r')))
        except:
            raise Exception("Error in reading %s configuration file" \
                  % group_config)

    # pull in the gpa settings!
    ph = c.pheno_file
    sublist = c.subject_list
    ev_selections = c.ev_selections
    subject_id_label = c.subject_id_label
    formula = c.design_formula
    # coding_scheme is stored as a one-element list in the config
    coding_scheme = c.coding_scheme[0]
    group_sep = c.group_sep
    grouping_var = c.grouping_var
    contrasts = c.contrasts
    f_tests = c.f_tests
    custom_contrasts = c.custom_contrasts
    model_name = c.model_name
    output_dir = c.output_dir

    # make sure the group analysis output directory exists
    try:
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
    except:
        print('\n\n[!] CPAC says: Could not successfully create the group ' \
              'analysis output directory:\n', output_dir, '\n\nMake sure ' \
              'you have write access in this file structure.\n\n\n')
        raise Exception

    measure_dict = {}

    # extract motion measures from CPAC-generated power params file
    if param_file != None:
        measure_dict = get_measure_dict(param_file)

    # combine the motion measures dictionary with the measure_mean
    # dictionary (if it exists)
    if derivative_means_dict:
        measure_dict["Measure_Mean"] = derivative_means_dict

    # create the .mat and .grp files for FLAME
    design_matrix, regressor_names = create_design_matrix(ph, sublist, \
                                         ev_selections, formula, \
                                         subject_id_label, coding_scheme, \
                                         grouping_var, measure_dict, \
                                         roi_means_dict, model_out_dir, \
                                         model_name, current_output)

    # Create contrasts_dict dictionary for the .con file generation later
    contrasts_list = contrasts
    contrasts_dict = {}

    for contrast in contrasts_list:
        # each 'contrast' is a string the user input of the desired contrast

        # remove all spaces
        parsed_contrast = contrast.replace(' ', '')

        EVs_in_contrast = []
        parsed_EVs_in_contrast = []

        # "a>b" is implemented as greater_than(a, b), "a<b" as the same
        # call with the operands swapped
        if '>' in parsed_contrast:
            parsed_EVs_in_contrast = \
                process_contrast(parsed_contrast, '>', ev_selections, \
                                 group_sep, grouping_var, coding_scheme)
            contrasts_dict[parsed_contrast] = \
                greater_than(design_matrix, parsed_EVs_in_contrast[0], \
                             parsed_EVs_in_contrast[1], coding_scheme, \
                             group_sep, grouping_var)

        elif '<' in parsed_contrast:
            parsed_EVs_in_contrast = \
                process_contrast(parsed_contrast, '<', ev_selections, \
                                 group_sep, grouping_var, coding_scheme)
            contrasts_dict[parsed_contrast] = \
                greater_than(design_matrix, parsed_EVs_in_contrast[1], \
                             parsed_EVs_in_contrast[0], coding_scheme, \
                             group_sep, grouping_var)

        else:
            # tokenize on +/- so each EV and its sign become list items
            contrast_string = parsed_contrast.replace('+',',+,')
            contrast_string = contrast_string.replace('-',',-,')
            contrast_items = contrast_string.split(',')

            if '' in contrast_items:
                contrast_items.remove('')

            # exactly two items means a single signed EV, e.g. "+EV1"
            if '+' in contrast_items and len(contrast_items) == 2:
                parsed_EVs_in_contrast = \
                    process_contrast(parsed_contrast, '+', ev_selections, \
                                     group_sep, grouping_var, coding_scheme)
                contrasts_dict[parsed_contrast] = \
                    positive(design_matrix, parsed_EVs_in_contrast[0], \
                             coding_scheme, group_sep, grouping_var)

            elif '-' in contrast_items and len(contrast_items) == 2:
                parsed_EVs_in_contrast = \
                    process_contrast(parsed_contrast, '-', ev_selections, \
                                     group_sep, grouping_var, coding_scheme)
                contrasts_dict[parsed_contrast] = \
                    negative(design_matrix, parsed_EVs_in_contrast[0], \
                             coding_scheme, group_sep, grouping_var)

            # longer expressions, e.g. "EV1+EV2-EV3": accumulate the
            # contrast vector term by term, sign taken from the
            # preceding +/- token
            if len(contrast_items) > 2:
                idx = 0
                for item in contrast_items:

                    # they need to be put back into Patsy formatted header
                    # titles because the dmatrix gets passed into the function
                    # that writes out the contrast matrix
                    if 'categorical' in ev_selections.keys():
                        for cat_EV in ev_selections['categorical']:
                            if cat_EV in item:
                                if coding_scheme == 'Treatment':
                                    item = item.replace(item, \
                                        'C(' + cat_EV + ')[T.' + item + ']')
                                elif coding_scheme == 'Sum':
                                    item = item.replace(item, \
                                        'C(' + cat_EV + ', Sum)[S.' + item + ']')

                    if idx == 0:
                        if item != '+' and item != '-':
                            # NOTE(review): 'dmatrix' is not defined in this
                            # function - 'design_matrix' looks like the
                            # intent, and the positive() calls below pass
                            # more arguments; this branch would raise
                            # NameError as written. TODO confirm and fix.
                            contrast_vector = positive(dmatrix, item)
                            if parsed_contrast not in contrasts_dict.keys():
                                contrasts_dict[parsed_contrast] = contrast_vector
                            else:
                                contrasts_dict[parsed_contrast] += contrast_vector

                    elif idx != 0:
                        if item != '+' and item != '-':
                            if contrast_items[idx-1] == '+':
                                # NOTE(review): same undefined 'dmatrix' here
                                contrast_vector = positive(dmatrix, item, \
                                                      coding_scheme, group_sep,\
                                                      grouping_var)
                                if parsed_contrast not in contrasts_dict.keys():
                                    contrasts_dict[parsed_contrast] = contrast_vector
                                else:
                                    contrasts_dict[parsed_contrast] += contrast_vector

                            if contrast_items[idx-1] == '-':
                                # NOTE(review): same undefined 'dmatrix' here
                                contrast_vector = negative(dmatrix, item, \
                                                      coding_scheme, group_sep,\
                                                      grouping_var)
                                if parsed_contrast not in contrasts_dict.keys():
                                    contrasts_dict[parsed_contrast] = contrast_vector
                                else:
                                    contrasts_dict[parsed_contrast] += contrast_vector

                    idx += 1

    # finally write out the .con file and .fts file (if f-tests)
    if (custom_contrasts == None) or (custom_contrasts == '') or \
        ("None" in custom_contrasts):
        print("Writing contrasts file (.con) based on contrasts provided " \
              "using the group analysis model builder's contrasts editor..")
        create_con_file(contrasts_dict, regressor_names, model_name, \
                        current_output, model_out_dir)
        if f_tests:
            create_fts_file(f_tests, contrasts_dict, model_name, \
                            current_output, model_out_dir)
    else:
        print("\nWriting contrasts file (.con) based on contrasts provided " \
              "with a custom contrasts matrix CSV file..\n")
        create_con_ftst_file(custom_contrasts, model_name, current_output, \
                             model_out_dir, regressor_names, \
                             coding_scheme, group_sep)