import os
import re
from collections import OrderedDict
import nipype.interfaces.ants as ants
import nipype.interfaces.fsl as fsl
import nipype.interfaces.utility as util
from CPAC.pipeline import nipype_pipeline_engine as pe
from nipype.interfaces import afni
from nipype import logging
from CPAC.nuisance.utils.compcor import calc_compcor_components
from CPAC.nuisance.utils.crc import encode as crc_encode
from CPAC.utils.interfaces.fsl import Merge as fslMerge
from CPAC.utils.interfaces.function import Function
from CPAC.registration.utils import check_transforms, generate_inverse_transform_flags
logger = logging.getLogger('nipype.workflow')
def find_offending_time_points(fd_j_file_path=None, fd_p_file_path=None, dvars_file_path=None,
                               fd_j_threshold=None, fd_p_threshold=None, dvars_threshold=None,
                               number_of_previous_trs_to_censor=0,
                               number_of_subsequent_trs_to_censor=0):
"""
    Applies the given thresholds to find time points whose FD (Jenkinson or
    Power) or DVARS (or any combination thereof) is above threshold.
    :param fd_j_file_path: path to TSV containing framewise displacement
        (Jenkinson) as a single column. If not specified, it will not be used.
    :param fd_p_file_path: path to TSV containing framewise displacement
        (Power) as a single column. If not specified, it will not be used.
    :param dvars_file_path: path to TSV containing DVARS as a single column.
        If not specified, it will not be used.
:param fd_j_threshold: threshold to apply to framewise displacement (Jenkinson),
it can be a value such as 0.2 or a floating point multiple of the
standard deviation specified as, e.g. '1.5SD'.
:param fd_p_threshold: threshold to apply to framewise displacement (Power),
it can be a value such as 0.2 or a floating point multiple of the
standard deviation specified as, e.g. '1.5SD'.
:param dvars_threshold: threshold to apply to DVARS, can be a value such
as 0.5 or a floating point multiple of the standard deviation specified
as, e.g. '1.5SD'.
    :param number_of_previous_trs_to_censor: number of TRs to censor
        immediately before each flagged time point.
    :param number_of_subsequent_trs_to_censor: number of TRs to censor
        immediately after each flagged time point.
    :return: file path to a TSV file containing a single binary column
        ('censor'): 1 keeps a volume, 0 censors it.
"""
import numpy as np
import os
import re
offending_time_points = set()
time_course_len = 0
types = ['FDJ', 'FDP', 'DVARS']
file_paths = [fd_j_file_path, fd_p_file_path, dvars_file_path]
thresholds = [fd_j_threshold, fd_p_threshold, dvars_threshold]
    for metric_type, file_path, threshold in zip(types, file_paths, thresholds):
if not file_path:
continue
if not os.path.isfile(file_path):
raise ValueError(
"File {0} could not be found."
.format(file_path)
)
        if not threshold:
            raise ValueError("Method requires the specification of a "
                             "threshold; none received.")
metric = np.loadtxt(file_path)
        if metric_type == 'DVARS':
            # DVARS is undefined for the first volume; prepend a zero so the
            # metric lines up with the full time course.
            metric = np.array([0.0] + metric.tolist())
if not time_course_len:
time_course_len = metric.shape[0]
else:
            assert time_course_len == metric.shape[0], \
                "Threshold metric files do not all have the same length."
try:
threshold_sd = \
re.match(r"([0-9]*\.*[0-9]*)\s*SD", str(threshold))
if threshold_sd:
threshold_sd = float(threshold_sd.groups()[0])
threshold = metric.mean() + \
threshold_sd * metric.std()
else:
threshold = float(threshold)
        except (TypeError, ValueError):
raise ValueError("Could not translate threshold {0} into a "
"meaningful value".format(threshold))
offending_time_points |= \
set(np.where(metric > threshold)[0].tolist())
extended_censors = []
for censor in offending_time_points:
extended_censors += list(range(
(censor - number_of_previous_trs_to_censor),
(censor + number_of_subsequent_trs_to_censor + 1)))
extended_censors = [
censor
for censor in np.unique(extended_censors)
if 0 <= censor < time_course_len
]
censor_vector = np.ones((time_course_len, 1))
censor_vector[extended_censors] = 0
out_file_path = os.path.join(os.getcwd(), "censors.tsv")
np.savetxt(
out_file_path, censor_vector, fmt='%d', header='censor', comments=''
)
return out_file_path
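# Example usage (a minimal sketch; the file names and thresholds below are
# hypothetical, not C-PAC defaults):
#
#     censor_tsv = find_offending_time_points(
#         fd_j_file_path='fd_jenkinson.1D',
#         fd_j_threshold='1.5SD',
#         dvars_file_path='dvars.1D',
#         dvars_threshold=0.5,
#         number_of_previous_trs_to_censor=1,
#         number_of_subsequent_trs_to_censor=2,
#     )
#     # censor_tsv points to a one-column TSV of 1s (keep) and 0s (censor).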
def compute_threshold(in_file, mask, threshold):
    # Pass-through: the unused in_file/mask arguments keep the signature
    # consistent with the other threshold functions used as Function nodes.
    return threshold
def compute_pct_threshold(in_file, mask, threshold_pct):
import nibabel as nb
import numpy as np
m = nb.load(mask).get_fdata().astype(bool)
if not np.any(m):
return 0.0
d = nb.load(in_file).get_fdata()[m]
return np.percentile(
d,
100.0 - threshold_pct
)
def compute_sd_threshold(in_file, mask, threshold_sd):
import nibabel as nb
import numpy as np
m = nb.load(mask).get_fdata().astype(bool)
if not np.any(m):
return 0.0
d = nb.load(in_file).get_fdata()[m]
return d.mean() + threshold_sd * d.std()
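# The helpers above compute a scalar cutoff from a variance image restricted
# to a mask. A sketch of the SD variant (file names are hypothetical):
#
#     cutoff = compute_sd_threshold('variance.nii.gz', 'brain_mask.nii.gz', 1.5)
#     # equivalent to: mean(var[mask]) + 1.5 * std(var[mask])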
def temporal_variance_mask(threshold, by_slice=False, erosion=False, degree=1):
    threshold_method = "VAR"
    # Start from the raw threshold so numeric inputs survive the
    # string-matching block below.
    threshold_value = threshold
    if isinstance(threshold, str):
regex_match = {
"SD": r"([0-9]+(\.[0-9]+)?)\s*SD",
"PCT": r"([0-9]+(\.[0-9]+)?)\s*PCT",
}
for method, regex in regex_match.items():
matched = re.match(regex, threshold)
if matched:
threshold_method = method
threshold_value = matched.groups()[0]
try:
threshold_value = float(threshold_value)
    except (TypeError, ValueError):
raise ValueError("Error converting threshold value {0} from {1} to a "
"floating point number. The threshold value can "
"contain SD or PCT for selecting a threshold based on "
"the variance distribution, otherwise it should be a "
"floating point number.".format(threshold_value,
threshold))
if threshold_value < 0:
raise ValueError("Threshold value should be positive, instead of {0}."
.format(threshold_value))
if threshold_method == "PCT" and threshold_value >= 100.0:
raise ValueError("Percentile should be less than 100, received {0}."
.format(threshold_value))
threshold = threshold_value
wf = pe.Workflow(name='tcompcor')
input_node = pe.Node(util.IdentityInterface(fields=['functional_file_path', 'mask_file_path']), name='inputspec')
output_node = pe.Node(util.IdentityInterface(fields=['mask']), name='outputspec')
    # C-PAC's default is linear regression, while nipype's is quadratic;
    # pass the polynomial order explicitly to keep the C-PAC behavior.
detrend = pe.Node(afni.Detrend(args='-polort {0}'.format(degree), outputtype='NIFTI'), name='detrend')
wf.connect(input_node, 'functional_file_path', detrend, 'in_file')
std = pe.Node(afni.TStat(args='-nzstdev', outputtype='NIFTI'), name='std')
wf.connect(input_node, 'mask_file_path', std, 'mask')
wf.connect(detrend, 'out_file', std, 'in_file')
var = pe.Node(afni.Calc(expr='a*a', outputtype='NIFTI'), name='var')
wf.connect(std, 'out_file', var, 'in_file_a')
if by_slice:
slices = pe.Node(fsl.Slice(), name='slicer')
wf.connect(var, 'out_file', slices, 'in_file')
mask_slices = pe.Node(fsl.Slice(), name='mask_slicer')
wf.connect(input_node, 'mask_file_path', mask_slices, 'in_file')
mapper = pe.MapNode(util.IdentityInterface(fields=['out_file', 'mask_file']), name='slice_mapper', iterfield=['out_file', 'mask_file'])
wf.connect(slices, 'out_files', mapper, 'out_file')
wf.connect(mask_slices, 'out_files', mapper, 'mask_file')
else:
mapper_list = pe.Node(util.Merge(1), name='slice_mapper_list')
wf.connect(var, 'out_file', mapper_list, 'in1')
mask_mapper_list = pe.Node(util.Merge(1), name='slice_mask_mapper_list')
wf.connect(input_node, 'mask_file_path', mask_mapper_list, 'in1')
mapper = pe.Node(util.IdentityInterface(fields=['out_file', 'mask_file']), name='slice_mapper')
wf.connect(mapper_list, 'out', mapper, 'out_file')
wf.connect(mask_mapper_list, 'out', mapper, 'mask_file')
if threshold_method == "PCT":
threshold_node = pe.MapNode(Function(input_names=['in_file', 'mask', 'threshold_pct'],
output_names=['threshold'],
function=compute_pct_threshold, as_module=True),
name='threshold_value', iterfield=['in_file', 'mask'])
threshold_node.inputs.threshold_pct = threshold_value
wf.connect(mapper, 'out_file', threshold_node, 'in_file')
wf.connect(mapper, 'mask_file', threshold_node, 'mask')
elif threshold_method == "SD":
threshold_node = pe.MapNode(Function(input_names=['in_file', 'mask', 'threshold_sd'],
output_names=['threshold'],
function=compute_sd_threshold, as_module=True),
name='threshold_value', iterfield=['in_file', 'mask'])
threshold_node.inputs.threshold_sd = threshold_value
wf.connect(mapper, 'out_file', threshold_node, 'in_file')
wf.connect(mapper, 'mask_file', threshold_node, 'mask')
else:
threshold_node = pe.MapNode(Function(input_names=['in_file', 'mask', 'threshold'],
output_names=['threshold'],
function=compute_threshold, as_module=True),
name='threshold_value', iterfield=['in_file', 'mask'])
threshold_node.inputs.threshold = threshold_value
wf.connect(mapper, 'out_file', threshold_node, 'in_file')
wf.connect(mapper, 'mask_file', threshold_node, 'mask')
threshold_mask = pe.MapNode(interface=fsl.maths.Threshold(), name='threshold', iterfield=['in_file', 'thresh'])
threshold_mask.inputs.args = '-bin'
wf.connect(mapper, 'out_file', threshold_mask, 'in_file')
wf.connect(threshold_node, 'threshold', threshold_mask, 'thresh')
merge_slice_masks = pe.Node(interface=fslMerge(), name='merge_slice_masks')
merge_slice_masks.inputs.dimension = 'z'
wf.connect(
threshold_mask, 'out_file',
merge_slice_masks, 'in_files'
)
wf.connect(merge_slice_masks, 'merged_file', output_node, 'mask')
return wf
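# Example wiring of the tCompCor mask workflow (a sketch; the input files are
# hypothetical):
#
#     tcc_wf = temporal_variance_mask('2 PCT', by_slice=True)
#     tcc_wf.inputs.inputspec.functional_file_path = 'func.nii.gz'
#     tcc_wf.inputs.inputspec.mask_file_path = 'brain_mask.nii.gz'
#     tcc_wf.run()  # the binary high-variance mask is exposed as outputspec.mask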
def generate_summarize_tissue_mask(nuisance_wf,
                                   pipeline_resource_pool,
                                   regressor_descriptor,
                                   regressor_selector,
                                   csf_mask_exist,
                                   use_ants=True,
                                   ventricle_mask_exist=True,
                                   all_bold=False):
"""
Add tissue mask generation into pipeline according to the selector.
:param nuisance_wf: Nuisance regressor workflow.
:param pipeline_resource_pool: dictionary of available resources.
:param regressor_descriptor: dictionary of steps to build, including keys:
'tissue', 'resolution', 'erosion'
    :param regressor_selector: dictionary with the original selector
    :param csf_mask_exist: whether a CSF mask from segmentation is available
    :param use_ants: whether ANTs (rather than FSL) transforms are used
    :param ventricle_mask_exist: whether a lateral-ventricle mask is available
    :param all_bold: whether the masks are already in BOLD space
    :return: tuple of the updated resource pool and the key under which the
        generated mask is registered in it.
"""
steps = [
key
for key in ['tissue', 'resolution', 'erosion']
if key in regressor_descriptor
]
full_mask_key = "_".join(
regressor_descriptor[s]
for s in steps
)
for step_i, step in enumerate(steps):
mask_key = "_".join(
regressor_descriptor[s]
for s in steps[:step_i+1]
)
if mask_key in pipeline_resource_pool:
continue
node_mask_key = re.sub(r"[^\w]", "_", mask_key)
prev_mask_key = "_".join(
regressor_descriptor[s]
for s in steps[:step_i]
)
        if step == 'tissue':
            # tissue masks enter the resource pool upstream; nothing to do here
            pass
elif step == 'resolution':
if all_bold:
pass
if csf_mask_exist:
mask_to_epi = pe.Node(interface=fsl.FLIRT(),
name='{}_flirt'.format(node_mask_key),
mem_gb=3.63,
mem_x=(3767129957844731 / 1208925819614629174706176,
'in_file'))
mask_to_epi.inputs.interp = 'nearestneighbour'
if regressor_selector['extraction_resolution'] == "Functional":
# apply anat2func matrix
mask_to_epi.inputs.apply_xfm = True
mask_to_epi.inputs.output_type = 'NIFTI_GZ'
nuisance_wf.connect(*(
pipeline_resource_pool['Functional_mean'] +
(mask_to_epi, 'reference')
))
nuisance_wf.connect(*(
pipeline_resource_pool['Transformations']['anat_to_func_linear_xfm'] +
(mask_to_epi, 'in_matrix_file')
))
else:
resolution = regressor_selector['extraction_resolution']
mask_to_epi.inputs.apply_isoxfm = resolution
nuisance_wf.connect(*(
pipeline_resource_pool['Anatomical_{}mm'
.format(resolution)] +
(mask_to_epi, 'reference')
))
nuisance_wf.connect(*(
pipeline_resource_pool[prev_mask_key] +
(mask_to_epi, 'in_file')
))
pipeline_resource_pool[mask_key] = \
(mask_to_epi, 'out_file')
if full_mask_key.startswith('CerebrospinalFluid'):
pipeline_resource_pool = generate_summarize_tissue_mask_ventricles_masking(
nuisance_wf,
pipeline_resource_pool,
regressor_descriptor,
regressor_selector,
node_mask_key,
csf_mask_exist,
use_ants,
ventricle_mask_exist
)
elif step == 'erosion':
erode_mask_node = pe.Node(
afni.Calc(args='-b a+i -c a-i -d a+j -e a-j -f a+k -g a-k', expr='a*(1-amongst(0,b,c,d,e,f,g))', outputtype='NIFTI_GZ'),
                name=node_mask_key
)
nuisance_wf.connect(*(
pipeline_resource_pool[prev_mask_key] +
(erode_mask_node, 'in_file_a')
))
pipeline_resource_pool[mask_key] = \
(erode_mask_node, 'out_file')
return pipeline_resource_pool, full_mask_key
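# Entries in `pipeline_resource_pool` are (node, output_name) tuples, so the
# returned key can be spliced straight into a connection. A sketch (the
# workflow and downstream node names are hypothetical):
#
#     pool, mask_key = generate_summarize_tissue_mask(
#         nuisance_wf, pool, descriptor, selector, csf_mask_exist=True)
#     nuisance_wf.connect(*(pool[mask_key] + (summarize_node, 'mask')))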
def generate_summarize_tissue_mask_ventricles_masking(nuisance_wf,
pipeline_resource_pool,
regressor_descriptor,
regressor_selector,
mask_key,
csf_mask_exist,
use_ants=True,
ventricle_mask_exist=True):
    if not csf_mask_exist:
        logger.warning('Segmentation is off; using lateral_ventricle_mask '
                       'as the CerebrospinalFluid mask.')
# Mask CSF with Ventricles
if '{}_Unmasked'.format(mask_key) not in pipeline_resource_pool:
if ventricle_mask_exist:
ventricles_key = 'VentriclesToAnat'
if 'resolution' in regressor_descriptor:
ventricles_key += '_{}'.format(regressor_descriptor['resolution'])
if ventricles_key not in pipeline_resource_pool:
transforms = pipeline_resource_pool['Transformations']
                if use_ants:
# perform the transform using ANTS
collect_linear_transforms = pe.Node(util.Merge(3), name='{}_ants_transforms'.format(ventricles_key))
nuisance_wf.connect(*(transforms['mni_to_anat_linear_xfm'] + (collect_linear_transforms, 'in1')))
# generate inverse transform flags, which depends on the number of transforms
inverse_transform_flags = pe.Node(util.Function(input_names=['transform_list'],
output_names=['inverse_transform_flags'],
function=generate_inverse_transform_flags),
name='{0}_inverse_transform_flags'.format(ventricles_key))
nuisance_wf.connect(collect_linear_transforms, 'out', inverse_transform_flags, 'transform_list')
lat_ven_mni_to_anat = pe.Node(
interface=ants.ApplyTransforms(),
name='{}_ants'.format(ventricles_key),
mem_gb=0.683,
mem_x=(3811976743057169 / 302231454903657293676544,
'input_image'))
lat_ven_mni_to_anat.inputs.interpolation = 'NearestNeighbor'
lat_ven_mni_to_anat.inputs.dimension = 3
nuisance_wf.connect(inverse_transform_flags, 'inverse_transform_flags', lat_ven_mni_to_anat, 'invert_transform_flags')
nuisance_wf.connect(collect_linear_transforms, 'out', lat_ven_mni_to_anat, 'transforms')
nuisance_wf.connect(*(pipeline_resource_pool['Ventricles'] + (lat_ven_mni_to_anat, 'input_image')))
resolution = regressor_selector['extraction_resolution']
if csf_mask_exist:
nuisance_wf.connect(*(
pipeline_resource_pool[mask_key] +
(lat_ven_mni_to_anat, 'reference_image')))
elif resolution == 'Functional':
nuisance_wf.connect(*(
pipeline_resource_pool['Functional_mean'] +
(lat_ven_mni_to_anat, 'reference_image')))
else:
nuisance_wf.connect(*(
pipeline_resource_pool['Anatomical_{}mm'.format(resolution)] +
(lat_ven_mni_to_anat, 'reference_image')))
pipeline_resource_pool[ventricles_key] = (lat_ven_mni_to_anat, 'output_image')
else:
                    # perform the transform using FSL ApplyWarp
lat_ven_mni_to_anat = pe.Node(interface=fsl.ApplyWarp(),
name='{}_fsl_applywarp'.format(ventricles_key))
lat_ven_mni_to_anat.inputs.interp = 'nn'
nuisance_wf.connect(*(transforms['mni_to_anat_linear_xfm'] + (lat_ven_mni_to_anat, 'field_file')))
nuisance_wf.connect(*(pipeline_resource_pool['Ventricles'] + (lat_ven_mni_to_anat, 'in_file')))
nuisance_wf.connect(*(pipeline_resource_pool[mask_key] + (lat_ven_mni_to_anat, 'ref_file')))
pipeline_resource_pool[ventricles_key] = (lat_ven_mni_to_anat, 'out_file')
if csf_mask_exist:
# reduce CSF mask to the lateral ventricles
mask_csf_with_lat_ven = pe.Node(interface=afni.Calc(outputtype='NIFTI_GZ'),
name='{}_Ventricles'.format(mask_key))
mask_csf_with_lat_ven.inputs.expr = 'a*b'
mask_csf_with_lat_ven.inputs.out_file = 'csf_lat_ven_mask.nii.gz'
nuisance_wf.connect(*(pipeline_resource_pool[ventricles_key] + (mask_csf_with_lat_ven, 'in_file_a')))
nuisance_wf.connect(*(pipeline_resource_pool[mask_key] + (mask_csf_with_lat_ven, 'in_file_b')))
pipeline_resource_pool['{}_Unmasked'.format(mask_key)] = pipeline_resource_pool[mask_key]
pipeline_resource_pool[mask_key] = (mask_csf_with_lat_ven, 'out_file')
else:
pipeline_resource_pool[mask_key] = pipeline_resource_pool[ventricles_key]
return pipeline_resource_pool
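# Effect on the resource pool when a CSF mask exists (a sketch; the mask key
# below is illustrative):
#
#     pool = generate_summarize_tissue_mask_ventricles_masking(
#         wf, pool, descriptor, selector, 'CerebrospinalFluid_2mm',
#         csf_mask_exist=True)
#     # pool['CerebrospinalFluid_2mm'] now yields the intersection of CSF and
#     # the ventricles; the original mask is preserved under
#     # pool['CerebrospinalFluid_2mm_Unmasked'].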
class NuisanceRegressor(object):
def __init__(self, selector):
self.selector = selector
if 'Bandpass' in self.selector:
s = self.selector['Bandpass']
            if not isinstance(s, dict) or \
               (not s.get('bottom_frequency') and
                not s.get('top_frequency')):
del self.selector['Bandpass']
def get(self, key, default=None):
return self.selector.get(key, default)
def __contains__(self, key):
return key in self.selector
def __getitem__(self, key):
return self.selector[key]
@staticmethod
def _derivative_params(selector):
nr_repr = ''
if not selector:
return nr_repr
if selector.get('include_squared'):
nr_repr += 'S'
if selector.get('include_delayed'):
nr_repr += 'D'
if selector.get('include_delayed_squared'):
nr_repr += 'B'
if selector.get('include_backdiff'):
nr_repr += 'V'
if selector.get('include_backdiff_squared'):
nr_repr += 'C'
return nr_repr
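    # For example, per the flag letters above (a sketch):
    #
    #     NuisanceRegressor._derivative_params(
    #         {'include_squared': True, 'include_delayed': True})  # -> 'SD'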
@staticmethod
def _summary_params(selector):
summ = selector['summary']
methods = {
'PC': 'PC',
'DetrendPC': 'DPC',
'Mean': 'M',
'NormMean': 'NM',
'DetrendMean': 'DM',
'DetrendNormMean': 'DNM',
}
        if isinstance(summ, dict):
method = summ['method']
rep = methods[method]
if method in ['DetrendPC', 'PC']:
rep += "%d" % summ['components']
else:
rep = methods[summ]
return rep
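    # For example (a sketch):
    #
    #     NuisanceRegressor._summary_params(
    #         {'summary': {'method': 'DetrendPC', 'components': 5}})  # -> 'DPC5'
    #     NuisanceRegressor._summary_params({'summary': 'Mean'})      # -> 'M'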
@staticmethod
def encode(selector):
regs = OrderedDict([
('GreyMatter', 'GM'),
('WhiteMatter', 'WM'),
('CerebrospinalFluid', 'CSF'),
('tCompCor', 'tC'),
('aCompCor', 'aC'),
('GlobalSignal', 'G'),
('Motion', 'M'),
('Custom', 'T'),
('PolyOrt', 'P'),
('Bandpass', 'BP'),
('Censor', 'C')
])
tissues = ['GreyMatter', 'WhiteMatter', 'CerebrospinalFluid']
selectors_representations = []
# tC-1.5PT-PC5S-SDB
# aC-WC-2mmE-PC5-SDB
# WM-2mmE-PC5-SDB
# CSF-2mmE-M-SDB
# GM-2mmE-DNM-SDB
# G-PC5-SDB
# M-SDB
        # C-S-0+0-FD-J1.5SD-DV1.5SD
        # P-2
        # BP-B0.01-T0.1
for r in regs.keys():
if r not in selector:
continue
s = selector[r]
pieces = [regs[r]]
if r in tissues:
if s.get('extraction_resolution') and s['extraction_resolution'] != 'Functional':
res = "%.2gmm" % s['extraction_resolution']
if s.get('erode_mask'):
res += 'E'
pieces += [res]
pieces += [NuisanceRegressor._summary_params(s)]
pieces += [NuisanceRegressor._derivative_params(s)]
elif r == 'tCompCor':
threshold = ""
if s.get('by_slice'):
threshold += 'S'
t = s.get('threshold')
if t:
                    if not isinstance(t, str):
                        t = "%.2f" % t
threshold += t
if s.get('erode_mask'):
threshold += 'E'
if s.get('degree'):
d = s.get('degree')
threshold += str(d)
pieces += [threshold]
pieces += [NuisanceRegressor._summary_params(s)]
pieces += [NuisanceRegressor._derivative_params(s)]
elif r == 'aCompCor':
if s.get('tissues'):
pieces += ["+".join([regs[t] for t in sorted(s['tissues'])])]
if s.get('extraction_resolution'):
res = "%.2gmm" % s['extraction_resolution']
if s.get('erode_mask'):
res += 'E'
pieces += [res]
pieces += [NuisanceRegressor._summary_params(s)]
pieces += [NuisanceRegressor._derivative_params(s)]
elif r == 'Custom':
for ss in s:
pieces += [
os.path.basename(ss['file'])[0:5] +
crc_encode(ss['file'])
]
elif r == 'GlobalSignal':
pieces += [NuisanceRegressor._summary_params(s)]
pieces += [NuisanceRegressor._derivative_params(s)]
elif r == 'Motion':
pieces += [NuisanceRegressor._derivative_params(s)]
elif r == 'PolyOrt':
pieces += ['%d' % s['degree']]
elif r == 'Bandpass':
if s.get('bottom_frequency'):
pieces += ['B%.2g' % s['bottom_frequency']]
if s.get('top_frequency'):
pieces += ['T%.2g' % s['top_frequency']]
elif r == 'Censor':
censoring = {
'Kill': 'K',
'Zero': 'Z',
'Interpolate': 'I',
'SpikeRegression': 'S',
}
thresholds = {
'FD_J': 'FD-J',
'FD_P': 'FD-P',
'DVARS': 'DV',
}
pieces += [censoring[s['method']]]
trs_range = ['0', '0']
if s.get('number_of_previous_trs_to_censor'):
trs_range[0] = '%d' % s['number_of_previous_trs_to_censor']
if s.get('number_of_subsequent_trs_to_censor'):
trs_range[1] = '%d' % s['number_of_subsequent_trs_to_censor']
pieces += ['+'.join(trs_range)]
threshs = sorted(s['thresholds'], reverse=True, key=lambda d: d['type'])
for st in threshs:
thresh = thresholds[st['type']]
                if isinstance(st['value'], str):
thresh += st['value']
else:
thresh += "%.2g" % st['value']
pieces += [thresh]
selectors_representations += ['-'.join([_f for _f in pieces if _f])]
return "_".join(selectors_representations)
def __repr__(self):
return NuisanceRegressor.encode(self.selector)