Source code for CPAC.nuisance.bandpass

import os
from pathlib import Path

import numpy as np
from numpy.typing import NDArray
import nibabel as nib
from scipy.fftpack import fft, ifft


def ideal_bandpass(data, sample_period, bandpass_freqs):
    # Derived from YAN Chao-Gan 120504 based on REST.
    sample_freq = 1.0 / sample_period
    sample_length = data.shape[0]

    data_p = np.zeros(int(2 ** np.ceil(np.log2(sample_length))))
    data_p[:sample_length] = data

    LowCutoff, HighCutoff = bandpass_freqs

    if LowCutoff is None:  # No lower cutoff (low-pass filter)
        low_cutoff_i = 0
    elif LowCutoff > sample_freq / 2.0:
        # Cutoff beyond fs/2 (all-stop filter)
        low_cutoff_i = int(data_p.shape[0] / 2)
    else:
        low_cutoff_i = np.ceil(LowCutoff * data_p.shape[0] * sample_period).astype(
            "int"
        )

    if HighCutoff > sample_freq / 2.0 or HighCutoff is None:
        # Cutoff beyond fs/2 or unspecified (become a highpass filter)
        high_cutoff_i = int(data_p.shape[0] / 2)
    else:
        high_cutoff_i = np.fix(HighCutoff * data_p.shape[0] * sample_period).astype(
            "int"
        )

    freq_mask = np.zeros_like(data_p, dtype="bool")
    freq_mask[low_cutoff_i : high_cutoff_i + 1] = True
    freq_mask[data_p.shape[0] - high_cutoff_i : data_p.shape[0] + 1 - low_cutoff_i] = (
        True
    )

    f_data = fft(data_p)
    f_data[freq_mask is not True] = 0.0
    return np.real_if_close(ifft(f_data)[:sample_length])


def read_1D(one_D: Path | str) -> tuple[list[str], NDArray]:
    """Parse a header from a 1D file, returing that header and a Numpy Array."""
    header = []
    with open(one_D, "r") as _f:
        # Each leading line that doesn't start with a number goes into the header
        for line in _f.readlines():
            try:
                float(line.split()[0])
                break
            except ValueError:
                header.append(line)

    regressor = np.loadtxt(one_D, skiprows=len(header))
    return header, regressor


[docs] def bandpass_voxels(realigned_file, regressor_file, bandpass_freqs, sample_period=None): """Performs ideal bandpass filtering on each voxel time-series. Parameters ---------- realigned_file : string Path of a realigned nifti file. bandpass_freqs : tuple Tuple containing the bandpass frequencies. (LowCutoff_HighPass HighCutoff_LowPass) sample_period : float, optional Length of sampling period in seconds. If not specified, this value is read from the nifti file provided. Returns ------- bandpassed_file : string Path of filtered output (nifti file). """ nii = nib.load(realigned_file) data = nii.get_fdata().astype("float64") mask = (data != 0).sum(-1) != 0 Y = data[mask].T Yc = Y - np.tile(Y.mean(0), (Y.shape[0], 1)) if not sample_period: hdr = nii.header sample_period = float(hdr.get_zooms()[3]) # Sketchy check to convert TRs in millisecond units if sample_period > 20.0: sample_period /= 1000.0 Y_bp = np.zeros_like(Y) for j in range(Y.shape[1]): Y_bp[:, j] = ideal_bandpass(Yc[:, j], sample_period, bandpass_freqs) data[mask] = Y_bp.T img = nib.Nifti1Image(data, header=nii.header, affine=nii.affine) bandpassed_file = os.path.join(os.getcwd(), "bandpassed_demeaned_filtered.nii.gz") img.to_filename(bandpassed_file) regressor_bandpassed_file = None if regressor_file is not None: if regressor_file.endswith(".nii.gz") or regressor_file.endswith(".nii"): nii = nib.load(regressor_file) data = nii.get_fdata().astype("float64") mask = (data != 0).sum(-1) != 0 Y = data[mask].T Yc = Y - np.tile(Y.mean(0), (Y.shape[0], 1)) Y_bp = np.zeros_like(Y) for j in range(Y.shape[1]): Y_bp[:, j] = ideal_bandpass(Yc[:, j], sample_period, bandpass_freqs) data[mask] = Y_bp.T img = nib.Nifti1Image(data, header=nii.header, affine=nii.affine) regressor_bandpassed_file = os.path.join( os.getcwd(), "regressor_bandpassed_demeaned_filtered.nii.gz" ) img.to_filename(regressor_bandpassed_file) else: header: list[str] regressor: NDArray header, regressor = read_1D(regressor_file) Yc = regressor - np.tile(regressor.mean(0), (regressor.shape[0], 1)) Y_bp = np.zeros_like(Yc) # Modify to allow just 1 regressor column shape = ( regressor.shape[0] if len(regressor.shape) < 1 else regressor.shape[1] ) for j in range(shape): Y_bp[:, j] = ideal_bandpass(Yc[:, j], sample_period, bandpass_freqs) regressor_bandpassed_file = os.path.join( os.getcwd(), "regressor_bandpassed_demeaned_filtered.1D" ) with open(regressor_bandpassed_file, "w") as ofd: # write out the header information for line in header: ofd.write(line) nuisance_regressors = np.array(Y_bp) np.savetxt(ofd, nuisance_regressors, fmt="%.18f", delimiter="\t") return bandpassed_file, regressor_bandpassed_file
def afni_1dBandpass(in_file, highpass, lowpass, tr=1): """ Perform AFNI 1dBandpass. Parameters ---------- in_file : string Path of an input 1D file highpass : float LowCutoff/HighPass lowpass : float HighCutoff/LowPass Returns ------- out_file : string Path of an output 1D file """ import os basename = os.path.basename(in_file) filename, file_extension = os.path.splitext(basename) out_file = os.path.join(os.getcwd(), filename + "_bp" + file_extension) cmd = "1dBandpass -dt %f %f %f %s > %s" % (tr, highpass, lowpass, in_file, out_file) os.system(cmd) return out_file