Source code for CPAC.func_preproc.utils

import math
import subprocess

from matplotlib import pyplot as plt
import numpy as np
import nibabel as nib
from scipy.signal import filtfilt, firwin, freqz, iirnotch


[docs] def nullify(value, function=None): from traits.trait_base import Undefined if value is None: return Undefined if function: return function(value) return value
[docs] def chunk_ts(func_file, n_chunks=None, chunk_size=None): func_img = nib.load(func_file) trs = func_img.shape[3] TR_ranges = [] if n_chunks: chunk_size = trs / n_chunks elif chunk_size: n_chunks = int(trs / chunk_size) else: msg = ( "\n[!] Dev error: Either 'n_chunks' or 'chunk_size' " "arguments must be passed to 'chunk_ts' function.\n" ) raise Exception(msg) for chunk_idx in range(0, n_chunks): if chunk_idx == n_chunks - 1: TR_ranges.append((int(chunk_idx * chunk_size), int(trs - 1))) else: TR_ranges.append( (int(chunk_idx * chunk_size), int((chunk_idx + 1) * chunk_size - 1)) ) return TR_ranges
[docs] def split_ts_chunks(func_file, tr_ranges): if ".nii" in func_file: ext = ".nii" if ".nii.gz" in func_file: ext = ".nii.gz" split_funcs = [] for chunk_idx, tr_range in enumerate(tr_ranges): out_file = os.path.join( os.getcwd(), os.path.basename(func_file).replace(ext, f"_{chunk_idx}{ext}"), ) in_file = f"{func_file}[{tr_range[0]}..{tr_range[1]}]" cmd = ["3dcalc", "-a", in_file, "-expr", "a", "-prefix", out_file] subprocess.check_output(cmd) split_funcs.append(out_file) return split_funcs
[docs] def oned_text_concat(in_files): out_file = os.path.join( os.getcwd(), os.path.basename(in_files[0].replace("_0", "")) ) out_txt = [] for txt in in_files: with open(txt, "r") as f: txt_lines = f.readlines() if not out_txt: out_txt = list(txt_lines) else: for line in txt_lines: if "#" in line: continue out_txt.append(line) with open(out_file, "wt") as f: for line in out_txt: f.write(line) return out_file
def degrees_to_mm(degrees, head_radius): # function to convert degrees of motion to mm return 2 * math.pi * head_radius * (degrees / 360) def mm_to_degrees(mm, head_radius): # function to convert mm of motion to degrees return 360 * mm / (2 * math.pi * head_radius) def degrees_to_mm(degrees, head_radius): # function to convert degrees of motion to mm return 2 * math.pi * head_radius * (degrees / 360) def mm_to_degrees(mm, head_radius): # function to convert mm of motion to degrees return 360 * mm / (2 * math.pi * head_radius)
[docs] def degrees_to_mm(degrees, head_radius): # function to convert degrees of motion to mm return 2 * math.pi * head_radius * (degrees / 360)
[docs] def mm_to_degrees(mm, head_radius): # function to convert mm of motion to degrees return 360 * mm / (2 * math.pi * head_radius)
[docs] def notch_filter_motion( motion_params, filter_type, TR, fc_RR_min=None, fc_RR_max=None, center_freq=None, freq_bw=None, lowpass_cutoff=None, filter_order=4, ): # Adapted from DCAN Labs: # https://github.com/DCAN-Labs/dcan_bold_processing/blob/c120097/matlab_code/filtered_movement_regressors.m if "ms" in TR: TR = float(TR.replace("ms", "")) / 1000 elif "ms" not in TR and "s" in TR: TR = float(TR.replace("s", "")) params_data = np.loadtxt(motion_params) # Sampling frequency fs = 1 / TR # Nyquist frequency fNy = fs / 2 if filter_type == "notch": # Respiratory Rate if fc_RR_min and fc_RR_max: rr = [float(fc_RR_min) / float(60), float(fc_RR_max) / float(60)] rr_fNy = [rr[0] + fNy, rr[1] + fNy] fa = abs(rr - np.floor(np.divide(rr_fNy, fs)) * fs) elif center_freq and freq_bw: tail = float(freq_bw) / float(2) fa = [center_freq - tail, center_freq + tail] W_notch = np.divide(fa, fNy) Wn = np.mean(W_notch) bw = np.diff(W_notch) # for filter info center_freq = Wn * fNy bandwidth = fa[1] - fa[0] Q = Wn / bw [b_filt, a_filt] = iirnotch(Wn, Q) np.floor(filter_order / 2) filter_info = ( f"Motion estimate filter information\n\nType: Notch\n" f"\nCenter freq: {center_freq}\nBandwidth: {bandwidth}\n\n" f"Wn: {Wn}\nQ: {Q}\n\n" f"Based on:\nSampling freq: {fs}\nNyquist freq: {fNy}" ) elif filter_type == "lowpass": if fc_RR_min: rr = float(fc_RR_min) / float(60) rr_fNy = rr + fNy fa = abs(rr - np.floor(np.divide(rr_fNy, fs)) * fs) elif lowpass_cutoff: fa = lowpass_cutoff Wn = fa / fNy if filter_order: b_filt = firwin(filter_order + 1, Wn) a_filt = 1 filter_info = ( f"Motion estimate filter information\n\nType: Lowpass" f"\n\nCutoff freq: {fa}\nWn: {Wn}\n\n" f"Based on:\nSampling freq: {fs}\nNyquist freq: {fNy}" ) filter_design = os.path.join(os.getcwd(), "motion_estimate_filter_design.txt") filter_plot = os.path.join(os.getcwd(), "motion_estimate_filter_freq-response.png") # plot frequency response for user info w, h = freqz(b_filt, a_filt, fs=fs) fig, ax1 = plt.subplots() ax1.set_title("Motion estimate filter frequency response") ax1.plot(w, 20 * np.log10(abs(h)), "b") ax1.set_ylabel("Amplitude [dB]", color="b") ax1.set_xlabel("Frequency [Hz]") plt.savefig(filter_plot) with open(filter_design, "wt") as f: f.write(filter_info) # convert rotation params from degrees to mm params_data[:, 0:3] = degrees_to_mm(params_data[:, 0:3], head_radius=50) filtered_params = filtfilt(b_filt, a_filt, params_data.T) # back rotation params to degrees filtered_params[0:3, :] = mm_to_degrees(filtered_params[0:3, :], head_radius=50) # back rotation params to degrees filtered_params[0:3, :] = mm_to_degrees(filtered_params[0:3, :], head_radius=50) filtered_motion_params = os.path.join( os.getcwd(), f"{os.path.basename(motion_params)}_filtered.1D" ) np.savetxt(filtered_motion_params, filtered_params.T, fmt="%f") return (filtered_motion_params, filter_design, filter_plot)