Source code for CPAC.sca.utils

import os
from CPAC.pipeline import nipype_pipeline_engine as pe
import nipype.interfaces.utility as util


[docs]def compute_fisher_z_score(correlation_file, timeseries_one_d): """ Computes the fisher z transform of the input correlation map If the correlation map contains data for multiple ROIs then the function returns z score for each ROI as a seperate nifti file Parameters ---------- correlation_file : string Input correlations file Returns ------- out_file : list (nifti files) list of z_scores for mask or ROI """ import nibabel as nb import numpy as np import os roi_numbers = [] if '#' in open(timeseries_one_d, 'r').readline().rstrip('\r\n'): roi_numbers = open(timeseries_one_d, 'r').readline().rstrip('\r\n').replace('#', '').split('\t') corr_img = nb.load(correlation_file) corr_data = corr_img.get_fdata() hdr = corr_img.header corr_data = np.log((1 + corr_data) / (1 - corr_data)) / 2.0 dims = corr_data.shape out_file = [] if len(dims) == 5 or len(roi_numbers) > 0: if len(dims) == 5: x, y, z, one, roi_number = dims corr_data = np.reshape(corr_data, (x * y * z, roi_number), order='F') for i in range(0, len(roi_numbers)): sub_data = corr_data if len(dims) == 5: sub_data = np.reshape(corr_data[:, i], (x, y, z), order='F') sub_img = nb.Nifti1Image(sub_data, header=corr_img.header, affine=corr_img.affine) sub_z_score_file = os.path.join(os.getcwd(), 'z_score_ROI_number_%s.nii.gz' % (roi_numbers[i])) sub_img.to_filename(sub_z_score_file) out_file.append(sub_z_score_file) else: z_score_img = nb.Nifti1Image(corr_data, header=hdr, affine=corr_img.affine) z_score_file = os.path.join(os.getcwd(), 'z_score.nii.gz') z_score_img.to_filename(z_score_file) out_file.append(z_score_file) return out_file
def check_ts(in_file): import os import numpy as np if in_file.endswith('.txt'): try: timepoints, rois = np.loadtxt(in_file).shape except ValueError: timepoints = np.loadtxt(in_file).shape[0] rois = 1 out_file = in_file elif in_file.endswith('.csv') or in_file.endswith('.1D'): csv_array = np.genfromtxt(in_file, delimiter=',') if np.isnan(csv_array[0][0]): csv_array = csv_array[1:] timepoints, rois = csv_array.shape # multiple regression (fsl_glm) needs this format for -design input out_file = os.path.join(os.getcwd(), os.path.basename(in_file).replace('.csv', '.txt')) np.savetxt(out_file, csv_array, delimiter='\t') if rois > timepoints: message = ('\n\n\n****The number of timepoints (' + str(timepoints) + ') is smaller than the number of ROIs to run (' + str(rois) + ') - therefore the GLM is' + ' underspecified and can\'t run.****\n\n\n') print(message) raise Exception(message) else: return out_file
[docs]def map_to_roi(timeseries, maps): """ Renames the outputs of the temporal multiple regression workflow for sca according to the header information of the timeseries.txt file that was passed NOTE: This is only run if the temporal regression is run as part of sca (which = 'RT') when calling the temporal regression workflow. If you run the temporal regression workflow manually, don\'t set (which = 'RT') unless you provide a timeseries.txt file with a header containing the names of the timeseries Parameters ---------- timeseries : string Input timeseries.txt file maps : List (nifti files) List of output files generated by the temporal regression workflow if (which == 'RT') Returns ------- labels : List (strings) List of names that the output files should be renamed to maps : List (nifti files) List of output files generated by the temporal regression workflow if (which == 'RT') """ import numpy as np import pandas as pd from nipype import logging logger = logging.getLogger('nipype.workflow') testMat = pd.read_csv(timeseries) timepoints, rois = testMat.shape if rois > timepoints: logger.warning('The number of timepoints is smaller than the number ' 'of ROIs to run - therefore the GLM is ' 'underspecified and can\'t run.') # pull in the ROI labels from the header of the extracted timeseries # CSV file with open(timeseries, "r") as f: roi_file_lines = f.read().splitlines() roi_err = "\n\n[!] The output of 3dROIstats, used in extracting " \ "the timeseries, was not in the expected format.\n\nROI " \ "output file: {0}\n\n".format(timeseries) for line in roi_file_lines: if "Mean_" in line: try: roi_list = line.split(",") # clear out any blank strings/non ROI labels in the list roi_list = [x for x in roi_list if "Mean" in x] # rename labels roi_list = \ [x.replace("Mean", "sca_tempreg_z_maps_roi").replace(" ", "").replace("#", "") for x in roi_list] except: raise Exception(roi_err) break else: raise Exception(roi_err) new_labels = [] for roi_label in roi_list: new_labels.append(os.path.join(os.getcwd(), roi_label)) numMaps = len(maps) maps.sort() # if not numMaps / 2 == rois: # raise Exception('You specified {0} timeseries but only {1} spatial ' # 'maps were generated'.format(str(rois), # str(numMaps/2))) maps = maps[:rois] return roi_list, maps