Source code for CPAC.func_preproc.utils


import numpy as np
from scipy.signal import iirnotch, firwin, filtfilt, freqz
from matplotlib import pyplot as plt
import nibabel as nb
import subprocess
import math


[docs]def nullify(value, function=None): from traits.trait_base import Undefined if value is None: return Undefined if function: return function(value) return value
[docs]def chunk_ts(func_file, n_chunks=None, chunk_size=None): func_img = nb.load(func_file) trs = func_img.shape[3] TR_ranges = [] if n_chunks: chunk_size = trs/n_chunks elif chunk_size: n_chunks = int(trs/chunk_size) else: raise Exception("\n[!] Dev error: Either 'n_chunks' or 'chunk_size' " "arguments must be passed to 'chunk_ts' function.\n") for chunk_idx in range(0, n_chunks): if chunk_idx == n_chunks - 1: TR_ranges.append((int(chunk_idx*chunk_size), int(trs - 1))) else: TR_ranges.append((int(chunk_idx*chunk_size), int((chunk_idx+1)*chunk_size - 1))) return TR_ranges
[docs]def split_ts_chunks(func_file, tr_ranges): if '.nii' in func_file: ext = '.nii' if '.nii.gz' in func_file: ext = '.nii.gz' split_funcs = [] for chunk_idx, tr_range in enumerate(tr_ranges): out_file = os.path.join(os.getcwd(), os.path.basename(func_file).replace(ext, "_{0}{1}".format(chunk_idx, ext))) in_file = "{0}[{1}..{2}]".format(func_file, tr_range[0], tr_range[1]) cmd = ["3dcalc", "-a", in_file, "-expr", "a", "-prefix", out_file] retcode = subprocess.check_output(cmd) split_funcs.append(out_file) return split_funcs
[docs]def oned_text_concat(in_files): out_file = os.path.join(os.getcwd(), os.path.basename(in_files[0].replace("_0", ""))) out_txt = [] for txt in in_files: with open(txt, 'r') as f: txt_lines = f.readlines() if not out_txt: out_txt = [x for x in txt_lines] else: for line in txt_lines: if "#" in line: continue out_txt.append(line) with open(out_file, 'wt') as f: for line in out_txt: f.write(line) return out_file
def degrees_to_mm(degrees, head_radius): # function to convert degrees of motion to mm mm = 2*math.pi*head_radius*(degrees/360) return mm def mm_to_degrees(mm, head_radius): # function to convert mm of motion to degrees degrees = 360*mm/(2*math.pi*head_radius) return degrees def degrees_to_mm(degrees, head_radius): # function to convert degrees of motion to mm mm = 2*math.pi*head_radius*(degrees/360) return mm def mm_to_degrees(mm, head_radius): # function to convert mm of motion to degrees degrees = 360*mm/(2*math.pi*head_radius) return degrees
[docs]def degrees_to_mm(degrees, head_radius): # function to convert degrees of motion to mm mm = 2*math.pi*head_radius*(degrees/360) return mm
[docs]def mm_to_degrees(mm, head_radius): # function to convert mm of motion to degrees degrees = 360*mm/(2*math.pi*head_radius) return degrees
[docs]def notch_filter_motion(motion_params, filter_type, TR, fc_RR_min=None, fc_RR_max=None, center_freq=None, freq_bw=None, lowpass_cutoff=None, filter_order=4): # Adapted from DCAN Labs: # https://github.com/DCAN-Labs/dcan_bold_processing/blob/master/ # ...matlab_code/filtered_movement_regressors.m if "ms" in TR: TR = float(TR.replace("ms", ""))/1000 elif "ms" not in TR and "s" in TR: TR = float(TR.replace("s", "")) params_data = np.loadtxt(motion_params) # Sampling frequency fs = 1 / TR # Nyquist frequency fNy = fs / 2 if filter_type == "notch": # Respiratory Rate if fc_RR_min and fc_RR_max: rr = [float(fc_RR_min) / float(60), float(fc_RR_max) / float(60)] rr_fNy = [rr[0] + fNy, rr[1] + fNy] fa = abs(rr - np.floor(np.divide(rr_fNy, fs)) * fs) elif center_freq and freq_bw: tail = float(freq_bw)/float(2) fa = [center_freq-tail, center_freq+tail] W_notch = np.divide(fa, fNy) Wn = np.mean(W_notch) bw = np.diff(W_notch) # for filter info center_freq = Wn * fNy bandwidth = fa[1] - fa[0] Q = Wn/bw [b_filt, a_filt] = iirnotch(Wn, Q) num_f_apply = np.floor(filter_order / 2) filter_info = f"Motion estimate filter information\n\nType: Notch\n" \ f"\nCenter freq: {center_freq}\nBandwidth: {bandwidth}\n\n" \ f"Wn: {Wn}\nQ: {Q}\n\n" \ f"Based on:\nSampling freq: {fs}\nNyquist freq: {fNy}" elif filter_type == "lowpass": if fc_RR_min: rr = float(fc_RR_min) / float(60) rr_fNy = rr + fNy fa = abs(rr - np.floor(np.divide(rr_fNy, fs)) * fs) elif lowpass_cutoff: fa = lowpass_cutoff Wn = fa/fNy if filter_order: b_filt = firwin(filter_order+1, Wn) a_filt = 1 num_f_apply = 0 filter_info = f"Motion estimate filter information\n\nType: Lowpass" \ f"\n\nCutoff freq: {fa}\nWn: {Wn}\n\n" \ f"Based on:\nSampling freq: {fs}\nNyquist freq: {fNy}" filter_design = os.path.join(os.getcwd(), "motion_estimate_filter_design.txt") filter_plot = os.path.join(os.getcwd(), "motion_estimate_filter_freq-response.png") # plot frequency response for user info w, h = freqz(b_filt, a_filt, fs=fs) fig, ax1 = plt.subplots() ax1.set_title('Motion estimate filter frequency response') ax1.plot(w, 20 * np.log10(abs(h)), 'b') ax1.set_ylabel('Amplitude [dB]', color='b') ax1.set_xlabel('Frequency [Hz]') plt.savefig(filter_plot) with open(filter_design, 'wt') as f: f.write(filter_info) # convert rotation params from degrees to mm params_data[:, 0:3] = degrees_to_mm(params_data[:, 0:3], head_radius=50) filtered_params = filtfilt(b_filt, a_filt, params_data.T) # back rotation params to degrees filtered_params[0:3,:] = mm_to_degrees(filtered_params[0:3,:], head_radius = 50) # back rotation params to degrees filtered_params[0:3,:] = mm_to_degrees(filtered_params[0:3,:], head_radius = 50) filtered_motion_params = os.path.join(os.getcwd(), "{0}_filtered.1D".format(os.path.basename(motion_params))) np.savetxt(filtered_motion_params, filtered_params.T, fmt='%f') return (filtered_motion_params, filter_design, filter_plot)