# Copyright (C) 2012-2024 C-PAC Developers
# This file is part of C-PAC.
# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
"""Functions for calculating motion parameters."""
# pylint: disable=ungrouped-imports,wrong-import-order,wrong-import-position
from nipype.interfaces import afni, fsl, utility as util
from nipype.interfaces.afni import preprocess, utils as afni_utils
from CPAC.func_preproc.utils import (
chunk_ts,
notch_filter_motion,
oned_text_concat,
split_ts_chunks,
)
from CPAC.generate_motion_statistics import (
affine_file_from_params_file,
motion_power_statistics,
)
from CPAC.pipeline import nipype_pipeline_engine as pe
from CPAC.pipeline.nodeblock import nodeblock
from CPAC.pipeline.schema import valid_options
from CPAC.utils.interfaces.function import Function
from CPAC.utils.utils import check_prov_for_motion_tool
[docs]
@nodeblock(
name="calc_motion_stats",
switch=[
["functional_preproc", "run"],
["functional_preproc", "motion_estimates_and_correction", "run"],
],
inputs=[
(
"desc-preproc_bold",
"space-bold_desc-brain_mask",
"desc-movementParameters_motion",
"max-displacement",
"rels-displacement",
"filtered-coordinate-transformation",
"coordinate-transformation",
),
"subject",
"scan",
],
outputs=[
"dvars",
"framewise-displacement-power",
"framewise-displacement-jenkinson",
"power-params",
"motion-params",
"motion",
"desc-summary_motion",
],
)
def calc_motion_stats(wf, cfg, strat_pool, pipe_num, opt=None):
"""Calculate motion statistics for motion parameters."""
motion_prov = strat_pool.get_cpac_provenance("desc-movementParameters_motion")
motion_correct_tool = check_prov_for_motion_tool(motion_prov)
coordinate_transformation = [
"filtered-coordinate-transformation",
"coordinate-transformation",
]
gen_motion_stats = motion_power_statistics(
name=f"gen_motion_stats_{pipe_num}",
motion_correct_tool=motion_correct_tool,
filtered=strat_pool.filtered_movement,
)
# Special case where the workflow is not getting outputs from
# resource pool but is connected to functional datasource
wf.connect(
*strat_pool.get_data("desc-preproc_bold"),
gen_motion_stats,
"inputspec.motion_correct",
)
wf.connect(
*strat_pool.get_data("space-bold_desc-brain_mask"),
gen_motion_stats,
"inputspec.mask",
)
wf.connect(
*strat_pool.get_data("desc-movementParameters_motion"),
gen_motion_stats,
"inputspec.movement_parameters",
)
wf.connect(
*strat_pool.get_data("max-displacement"),
gen_motion_stats,
"inputspec.max_displacement",
)
if strat_pool.check_rpool("rels-displacement"):
wf.connect(
*strat_pool.get_data("rels-displacement"),
gen_motion_stats,
"inputspec.rels_displacement",
)
if strat_pool.check_rpool(coordinate_transformation):
wf.connect(
*strat_pool.get_data(coordinate_transformation),
gen_motion_stats,
"inputspec.transformations",
)
outputs = {
"framewise-displacement-power": (gen_motion_stats, "outputspec.FDP_1D"),
"framewise-displacement-jenkinson": (gen_motion_stats, "outputspec.FDJ_1D"),
"dvars": (gen_motion_stats, "outputspec.DVARS_1D"),
"power-params": (gen_motion_stats, "outputspec.power_params"),
"motion-params": (gen_motion_stats, "outputspec.motion_params"),
"motion": (gen_motion_stats, "outputspec.motion"),
"desc-summary_motion": (gen_motion_stats, "outputspec.desc-summary_motion"),
}
return wf, outputs
[docs]
def estimate_reference_image(in_file):
"""fMRIPrep-style BOLD reference.
Generate a reference 3D map from BOLD and SBRef EPI images for BOLD datasets.
Given a 4D BOLD file[...], estimate a reference
image for subsequent motion estimation and coregistration steps.
For the case of BOLD datasets, it estimates a number of T1w saturated volumes
(non-steady state at the beginning of the scan) and calculates the median
across them.
Otherwise (SBRefs or detected zero non-steady state frames), a median of
of a subset of motion corrected volumes is used.
Ref: https://github.com/nipreps/niworkflows/blob/6d4f2b5/niworkflows/interfaces/registration.py#L446-L549
"""
# STATEMENT OF CHANGES:
# This function is derived from sources licensed under the Apache-2.0 terms,
# and this function has been changed.
# CHANGES:
# * Converted from a nipype.interfaces.base.SimpleInterface class to a function
# * Removed 3D image capabilities (now requires 4D image for in_file)
# * Modified docstring to reflect local changes
# * Updated style to match C-PAC codebase
# ORIGINAL WORK'S ATTRIBUTION NOTICE:
# Copyright (c) 2016, the CRN developers team.
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of niworkflows nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Modifications copyright (C) 2021 - 2024 C-PAC Developers
import os
import numpy as np
import nibabel as nib
ref_input = [in_file]
mc_out_file = "bold_mc.nii.gz"
# Build the nibabel spatial image we will work with
ref_im = []
for im_i in ref_input:
max_new_volumes = 50 - len(ref_im)
if max_new_volumes <= 0:
break
nib_i = nib.squeeze_image(nib.load(im_i))
if nib_i.dataobj.ndim == 3: # noqa: PLR2004
ref_im.append(nib_i)
elif nib_i.dataobj.ndim == 4: # noqa: PLR2004
ref_im += nib.four_to_three(nib_i.slicer[..., :max_new_volumes])
ref_im = nib.squeeze_image(nib.concat_images(ref_im))
out_file = os.path.join(os.getcwd(), "ref_bold.nii.gz")
# Slicing may induce inconsistencies with shape-dependent values in extensions.
# For now, remove all. If this turns out to be a mistake, we can select extensions
# that don't break pipeline stages.
ref_im.header.extensions.clear()
if ref_im.shape[-1] > 40: # noqa: PLR2004
ref_im = nib.Nifti1Image(
ref_im.dataobj[:, :, :, 20:40], ref_im.affine, ref_im.header
)
ref_name = os.path.join(os.getcwd(), "slice.nii.gz")
ref_im.to_filename(ref_name)
os.system(f"3dvolreg -Fourier -twopass -zpad 4 -prefix {mc_out_file} {ref_name}")
mc_slice_nii = nib.load(mc_out_file)
median_image_data = np.median(mc_slice_nii.get_fdata(), axis=3)
nib.Nifti1Image(median_image_data, ref_im.affine, ref_im.header).to_filename(
out_file
)
return out_file
_MOTION_CORRECTED_OUTPUTS = {
"desc-preproc_bold": {"Description": "Motion-corrected BOLD time-series."},
"desc-motion_bold": {"Description": "Motion-corrected BOLD time-series."},
}
# the "filtered" outputs here are just for maintaining expecting
# forking and connections and will not be output
_MOTION_PARAM_OUTPUTS = {
"max-displacement": {},
"rels-displacement": {},
"desc-movementParameters_motion": {
"Description": "Each line contains for one timepoint a 6-DOF "
"rigid transform parameters in the format "
"defined by AFNI's 3dvolreg: [roll, pitch, yaw, "
"superior displacement, left displacement, "
"posterior displacement]. Rotation parameters "
"are in degrees counterclockwise, and translation "
"parameters are in millimeters."
},
"filtered-coordinate-transformation": {"Description": "UNFILTERED"},
"coordinate-transformation": {
"Description": "Each row contains for one timepoint the first "
"12 values of a 4x4 affine matrix"
},
}
[docs]
@nodeblock(
name="motion_correction",
switch=["functional_preproc", "motion_estimates_and_correction", "run"],
option_key=[
"functional_preproc",
"motion_estimates_and_correction",
"motion_correction",
"using",
],
option_val=["3dvolreg", "mcflirt"],
inputs=[("desc-preproc_bold", "motion-basefile")],
outputs={**_MOTION_CORRECTED_OUTPUTS, **_MOTION_PARAM_OUTPUTS},
)
def func_motion_correct(wf, cfg, strat_pool, pipe_num, opt=None):
wf, outputs = motion_correct_connections(wf, cfg, strat_pool, pipe_num, opt)
return wf, outputs
[docs]
@nodeblock(
name="motion_correction_only",
switch=["functional_preproc", "motion_estimates_and_correction", "run"],
option_key=[
"functional_preproc",
"motion_estimates_and_correction",
"motion_correction",
"using",
],
option_val=["3dvolreg", "mcflirt"],
inputs=[("desc-preproc_bold", "motion-basefile")],
outputs=_MOTION_CORRECTED_OUTPUTS,
)
def func_motion_correct_only(wf, cfg, strat_pool, pipe_num, opt=None):
wf, wf_outputs = motion_correct_connections(wf, cfg, strat_pool, pipe_num, opt)
outputs = {
"desc-preproc_bold": wf_outputs["desc-motion_bold"],
"desc-motion_bold": wf_outputs["desc-motion_bold"],
}
return (wf, outputs)
[docs]
@nodeblock(
name="motion_estimates",
switch=["functional_preproc", "motion_estimates_and_correction", "run"],
option_key=[
"functional_preproc",
"motion_estimates_and_correction",
"motion_correction",
"using",
],
option_val=["3dvolreg", "mcflirt"],
inputs=[("desc-preproc_bold", "motion-basefile")],
outputs=_MOTION_PARAM_OUTPUTS,
)
def func_motion_estimates(wf, cfg, strat_pool, pipe_num, opt=None):
"""Calculate motion estimates using 3dVolReg or MCFLIRT."""
from CPAC.pipeline.utils import present_outputs
wf, wf_outputs = motion_correct_connections(wf, cfg, strat_pool, pipe_num, opt)
return (
wf,
present_outputs(
wf_outputs,
[
"coordinate-transformation",
"filtered-coordinate-transformation",
"max-displacement",
"desc-movementParameters_motion",
"rels-displacement",
],
),
)
[docs]
def get_mcflirt_rms_abs(rms_files):
for path in rms_files:
if "abs.rms" in path:
abs_file = path
if "rel.rms" in path:
rels_file = path
return abs_file, rels_file
[docs]
@nodeblock(
name="get_motion_ref",
switch=["functional_preproc", "motion_estimates_and_correction", "run"],
option_key=[
"functional_preproc",
"motion_estimates_and_correction",
"motion_correction",
"motion_correction_reference",
],
option_val=["mean", "median", "selected_volume", "fmriprep_reference"],
inputs=["desc-preproc_bold", "desc-reorient_bold"],
outputs=["motion-basefile"],
)
def get_motion_ref(wf, cfg, strat_pool, pipe_num, opt=None):
if opt not in get_motion_ref.option_val:
msg = (
"\n\n[!] Error: The 'motion_correction_reference' "
"parameter of the 'motion_correction' workflow "
"must be one of:\n\t{0}.\n\nTool input: '{1}'"
"\n\n".format(
" or ".join([f"'{val}'" for val in get_motion_ref.option_val]), opt
)
)
raise ValueError(msg)
if opt == "mean":
func_get_RPI = pe.Node(
interface=afni_utils.TStat(),
name=f"func_get_mean_RPI_{pipe_num}",
mem_gb=0.48,
mem_x=(1435097126797993 / 302231454903657293676544, "in_file"),
)
func_get_RPI.inputs.options = "-mean"
func_get_RPI.inputs.outputtype = "NIFTI_GZ"
node, out = strat_pool.get_data("desc-preproc_bold")
wf.connect(node, out, func_get_RPI, "in_file")
elif opt == "median":
func_get_RPI = pe.Node(
interface=afni_utils.TStat(), name=f"func_get_median_RPI_{pipe_num}"
)
func_get_RPI.inputs.options = "-median"
func_get_RPI.inputs.outputtype = "NIFTI_GZ"
node, out = strat_pool.get_data("desc-preproc_bold")
wf.connect(node, out, func_get_RPI, "in_file")
elif opt == "selected_volume":
func_get_RPI = pe.Node(
interface=afni.Calc(), name=f"func_get_selected_RPI_{pipe_num}"
)
func_get_RPI.inputs.set(
expr="a",
single_idx=cfg.functional_preproc["motion_estimates_and_correction"][
"motion_correction"
]["motion_correction_reference_volume"],
outputtype="NIFTI_GZ",
)
node, out = strat_pool.get_data("desc-preproc_bold")
wf.connect(node, out, func_get_RPI, "in_file_a")
elif opt == "fmriprep_reference":
func_get_RPI = pe.Node(
Function(
input_names=["in_file"],
output_names=["out_file"],
function=estimate_reference_image,
),
name=f"func_get_fmriprep_ref_{pipe_num}",
)
node, out = strat_pool.get_data("desc-reorient_bold")
wf.connect(node, out, func_get_RPI, "in_file")
outputs = {"motion-basefile": (func_get_RPI, "out_file")}
return (wf, outputs)
[docs]
def motion_correct_3dvolreg(wf, cfg, strat_pool, pipe_num):
"""Calculate motion parameters with 3dvolreg."""
if int(cfg.pipeline_setup["system_config"]["max_cores_per_participant"]) > 1:
chunk_imports = ["import nibabel as nib"]
chunk = pe.Node(
Function(
input_names=["func_file", "n_chunks", "chunk_size"],
output_names=["TR_ranges"],
function=chunk_ts,
imports=chunk_imports,
),
name=f"chunk_{pipe_num}",
)
# chunk.inputs.n_chunks = int(cfg.pipeline_setup['system_config'][
# 'max_cores_per_participant'])
# 10-TR sized chunks
chunk.inputs.chunk_size = 10
node, out = strat_pool.get_data("desc-preproc_bold")
wf.connect(node, out, chunk, "func_file")
split_imports = ["import os", "import subprocess"]
split = pe.Node(
Function(
input_names=["func_file", "tr_ranges"],
output_names=["split_funcs"],
function=split_ts_chunks,
imports=split_imports,
),
name=f"split_{pipe_num}",
)
node, out = strat_pool.get_data("desc-preproc_bold")
wf.connect(node, out, split, "func_file")
wf.connect(chunk, "TR_ranges", split, "tr_ranges")
out_split_func = pe.Node(
interface=util.IdentityInterface(fields=["out_file"]),
name=f"out_split_func_{pipe_num}",
)
wf.connect(split, "split_funcs", out_split_func, "out_file")
func_motion_correct = pe.MapNode(
interface=preprocess.Volreg(),
name=f"func_generate_ref_{pipe_num}",
iterfield=["in_file"],
)
wf.connect(out_split_func, "out_file", func_motion_correct, "in_file")
func_concat = pe.Node(
interface=afni_utils.TCat(), name=f"func_concat_{pipe_num}"
)
func_concat.inputs.outputtype = "NIFTI_GZ"
wf.connect(func_motion_correct, "out_file", func_concat, "in_files")
out_motion = pe.Node(
interface=util.IdentityInterface(fields=["out_file"]),
name=f"out_motion_{pipe_num}",
)
wf.connect(func_concat, "out_file", out_motion, "out_file")
else:
out_split_func = pe.Node(
interface=util.IdentityInterface(fields=["out_file"]),
name=f"out_split_func_{pipe_num}",
)
node, out = strat_pool.get_data("desc-preproc_bold")
wf.connect(node, out, out_split_func, "out_file")
func_motion_correct = pe.Node(
interface=preprocess.Volreg(), name=f"func_generate_ref_{pipe_num}"
)
wf.connect(out_split_func, "out_file", func_motion_correct, "in_file")
out_motion = pe.Node(
interface=util.IdentityInterface(fields=["out_file"]),
name=f"out_motion_{pipe_num}",
)
wf.connect(func_motion_correct, "out_file", out_motion, "out_file")
func_motion_correct.inputs.zpad = 4
func_motion_correct.inputs.outputtype = "NIFTI_GZ"
args = "-Fourier"
if cfg.functional_preproc["motion_estimates_and_correction"]["motion_correction"][
"AFNI-3dvolreg"
]["functional_volreg_twopass"]:
args = f"-twopass {args}"
func_motion_correct.inputs.args = args
# Calculate motion parameters
func_motion_correct_A = func_motion_correct.clone(
f"func_motion_correct_3dvolreg_{pipe_num}"
)
func_motion_correct_A.inputs.md1d_file = "max_displacement.1D"
func_motion_correct_A.inputs.args = args
wf.connect(out_split_func, "out_file", func_motion_correct_A, "in_file")
node, out = strat_pool.get_data("motion-basefile")
wf.connect(node, out, func_motion_correct_A, "basefile")
if int(cfg.pipeline_setup["system_config"]["max_cores_per_participant"]) > 1:
motion_concat = pe.Node(
interface=afni_utils.TCat(), name=f"motion_concat_{pipe_num}"
)
motion_concat.inputs.outputtype = "NIFTI_GZ"
wf.connect(func_motion_correct_A, "out_file", motion_concat, "in_files")
out_motion_A = pe.Node(
interface=util.IdentityInterface(fields=["out_file"]),
name=f"out_motion_A_{pipe_num}",
)
wf.connect(motion_concat, "out_file", out_motion_A, "out_file")
concat_imports = ["import os"]
md1d_concat = pe.Node(
Function(
input_names=["in_files"],
output_names=["out_file"],
function=oned_text_concat,
imports=concat_imports,
),
name=f"md1d_concat_{pipe_num}",
)
wf.connect(func_motion_correct_A, "md1d_file", md1d_concat, "in_files")
oned_concat = pe.Node(
Function(
input_names=["in_files"],
output_names=["out_file"],
function=oned_text_concat,
imports=concat_imports,
),
name=f"oned_concat_{pipe_num}",
)
wf.connect(func_motion_correct_A, "oned_file", oned_concat, "in_files")
oned_matrix_concat = pe.Node(
Function(
input_names=["in_files"],
output_names=["out_file"],
function=oned_text_concat,
imports=concat_imports,
),
name=f"oned_matrix_concat_{pipe_num}",
)
wf.connect(
func_motion_correct_A, "oned_matrix_save", oned_matrix_concat, "in_files"
)
out_md1d = pe.Node(
interface=util.IdentityInterface(fields=["out_file"]),
name=f"out_md1d_{pipe_num}",
)
wf.connect(md1d_concat, "out_file", out_md1d, "out_file")
out_oned = pe.Node(
interface=util.IdentityInterface(fields=["out_file"]),
name=f"out_oned_{pipe_num}",
)
wf.connect(oned_concat, "out_file", out_oned, "out_file")
out_oned_matrix = pe.Node(
interface=util.IdentityInterface(fields=["out_file"]),
name=f"out_oned_matrix_{pipe_num}",
)
wf.connect(oned_matrix_concat, "out_file", out_oned_matrix, "out_file")
else:
out_motion_A = pe.Node(
interface=util.IdentityInterface(fields=["out_file"]),
name=f"out_motion_A_{pipe_num}",
)
wf.connect(func_motion_correct_A, "out_file", out_motion_A, "out_file")
out_md1d = pe.Node(
interface=util.IdentityInterface(fields=["out_file"]),
name=f"out_md1d_{pipe_num}",
)
wf.connect(func_motion_correct_A, "md1d_file", out_md1d, "out_file")
out_oned = pe.Node(
interface=util.IdentityInterface(fields=["out_file"]),
name=f"out_oned_{pipe_num}",
)
wf.connect(func_motion_correct_A, "oned_file", out_oned, "out_file")
out_oned_matrix = pe.Node(
interface=util.IdentityInterface(fields=["out_file"]),
name=f"out_oned_matrix_{pipe_num}",
)
wf.connect(
func_motion_correct_A, "oned_matrix_save", out_oned_matrix, "out_file"
)
outputs = {
"desc-preproc_bold": (out_motion_A, "out_file"),
"desc-motion_bold": (out_motion_A, "out_file"),
"max-displacement": (out_md1d, "out_file"),
"desc-movementParameters_motion": (out_oned, "out_file"),
"coordinate-transformation": (out_oned_matrix, "out_file"),
"filtered-coordinate-transformation": (out_oned_matrix, "out_file"),
}
return wf, outputs
[docs]
def motion_correct_mcflirt(wf, cfg, strat_pool, pipe_num):
"""Calculate motion parameters with MCFLIRT."""
func_motion_correct_A = pe.Node(
interface=fsl.MCFLIRT(save_mats=True, save_plots=True),
name=f"func_motion_correct_mcflirt_{pipe_num}",
mem_gb=2.5,
)
func_motion_correct_A.inputs.save_mats = True
func_motion_correct_A.inputs.save_plots = True
func_motion_correct_A.inputs.save_rms = True
node, out = strat_pool.get_data("desc-preproc_bold")
wf.connect(node, out, func_motion_correct_A, "in_file")
node, out = strat_pool.get_data("motion-basefile")
wf.connect(node, out, func_motion_correct_A, "ref_file")
normalize_motion_params = pe.Node(
Function(
input_names=["in_file"],
output_names=["out_file"],
function=normalize_motion_parameters,
),
name=f"norm_motion_params_{pipe_num}",
)
wf.connect(func_motion_correct_A, "par_file", normalize_motion_params, "in_file")
get_rms_abs = pe.Node(
Function(
input_names=["rms_files"],
output_names=["abs_file", "rels_file"],
function=get_mcflirt_rms_abs,
),
name=f"get_mcflirt_rms_abs_{pipe_num}",
)
wf.connect(func_motion_correct_A, "rms_files", get_rms_abs, "rms_files")
outputs = {
"desc-preproc_bold": (func_motion_correct_A, "out_file"),
"desc-motion_bold": (func_motion_correct_A, "out_file"),
"max-displacement": (get_rms_abs, "abs_file"),
"rels-displacement": (get_rms_abs, "rels_file"),
"desc-movementParameters_motion": (normalize_motion_params, "out_file"),
"coordinate-transformation": (func_motion_correct_A, "mat_file"),
"filtered-coordinate-transformation": (func_motion_correct_A, "mat_file"),
}
return wf, outputs
motion_correct = {
"3dvolreg": motion_correct_3dvolreg,
"mcflirt": motion_correct_mcflirt,
}
[docs]
def motion_correct_connections(wf, cfg, strat_pool, pipe_num, opt):
"""Check opt for valid option, then connect that option."""
motion_correct_options = valid_options["motion_correction"]
if opt not in motion_correct_options:
msg = (
"\n\n[!] Error: The 'tool' parameter of the "
"'motion_correction' workflow must be one of "
f"{str(motion_correct_options).strip('[{()}]')}"
f".\n\nTool input: {opt}\n\n"
)
raise KeyError(msg)
return motion_correct[opt](wf, cfg, strat_pool, pipe_num)
[docs]
@nodeblock(
name="motion_estimate_filter",
config=[
"functional_preproc",
"motion_estimates_and_correction",
"motion_estimate_filter",
],
switch=["run"],
option_key="filters",
option_val="USER-DEFINED",
inputs=[
(
"desc-preproc_bold",
"space-bold_desc-brain_mask",
"max-displacement",
"rels-displacement",
"coordinate-transformation",
"desc-movementParameters_motion",
),
"TR",
],
outputs={
"filtered-coordinate-transformation": {
"Description": "Affine matrix regenerated from"
" filtered motion parameters. Note:"
" the translation vector does not"
" account for recentering inherent"
" in rotation; this omission does"
" not seem to affect framewise"
" displacement calculation, for which"
" this matrix is used."
},
"desc-movementParameters_motion": {
"Description": "Filtered movement parameters"
" (3 rotation, 3 translation)."
},
"desc-movementParametersUnfiltered_motion": {
"Description": "Unfiltered movement parameters"
" (3 rotation, 3 translation)."
},
"motion-filter-info": {},
"motion-filter-plot": {},
},
)
def motion_estimate_filter(wf, cfg, strat_pool, pipe_num, opt=None):
"""Filter motion parameters.
.. versionchanged:: 1.8.6
Beginning with version 1.8.6, C-PAC outputs both the unfiltered
and the filtered motion parameters and uses the unfiltered
parameters in QC. Previous versions only reported the filtered
parameters and used the filtered parameters for QC.
"""
notch_imports = [
"import os",
"import numpy as np",
"from scipy.signal import iirnotch, filtfilt, firwin, freqz",
"from matplotlib import pyplot as plt",
"from CPAC.func_preproc.utils import degrees_to_mm, mm_to_degrees",
]
notch = pe.Node(
Function(
input_names=[
"motion_params",
"filter_type",
"TR",
"fc_RR_min",
"fc_RR_max",
"center_freq",
"freq_bw",
"lowpass_cutoff",
"filter_order",
],
output_names=["filtered_motion_params", "filter_info", "filter_plot"],
function=notch_filter_motion,
imports=notch_imports,
),
name=f'filter_motion_params_{opt["Name"]}_{pipe_num}',
)
notch.inputs.filter_type = opt.get("filter_type")
notch.inputs.fc_RR_min = opt.get("breathing_rate_min")
notch.inputs.fc_RR_max = opt.get("breathing_rate_max")
notch.inputs.center_freq = opt.get("center_frequency")
notch.inputs.freq_bw = opt.get("filter_bandwidth")
notch.inputs.lowpass_cutoff = opt.get("lowpass_cutoff")
notch.inputs.filter_order = opt.get("filter_order")
movement_parameters = strat_pool.node_data("desc-movementParameters_motion")
wf.connect(
movement_parameters.node, movement_parameters.out, notch, "motion_params"
)
node, out = strat_pool.get_data("TR")
wf.connect(node, out, notch, "TR")
affine = pe.Node(
Function(
input_names=["params_file"],
output_names=["affine_file"],
function=affine_file_from_params_file,
),
name='affine_from_filtered_params_' f'{opt["Name"]}_{pipe_num}',
)
wf.connect(notch, "filtered_motion_params", affine, "params_file")
outputs = {
"filtered-coordinate-transformation": (affine, "affine_file"),
"motion-filter-info": (notch, "filter_info"),
"motion-filter-plot": (notch, "filter_plot"),
"desc-movementParameters_motion": (notch, "filtered_motion_params"),
}
if not cfg.switch_is_off(
[
"functional_preproc",
"motion_estimates_and_correction",
"motion_estimate_filter",
"run",
]
):
outputs["desc-movementParametersUnfiltered_motion"] = (
movement_parameters.node,
movement_parameters.out,
)
return (wf, outputs)
[docs]
def normalize_motion_parameters(in_file):
"""Convert FSL mcflirt motion parameters to AFNI space."""
import os
import numpy as np
motion_params = np.genfromtxt(in_file).T
motion_params = np.vstack(
(
motion_params[2, :] * 180 / np.pi,
motion_params[0, :] * 180 / np.pi,
-motion_params[1, :] * 180 / np.pi,
motion_params[5, :],
motion_params[3, :],
-motion_params[4, :],
)
)
motion_params = np.transpose(motion_params)
out_file = os.path.join(os.getcwd(), "motion_params.tsv")
np.savetxt(out_file, motion_params)
return out_file