Source code for CPAC.qc.xcp

"""`Generate eXtensible Connectivity Pipeline-style quality control files <https://fcp-indi.github.io/docs/user/xcpqc>`_

Columns
-------
sub : str
    subject label :cite:`cite-BIDS21`
ses : str
    session label :cite:`cite-BIDS21`
task : str
    task label :cite:`cite-BIDS21`
run : int
    run index :cite:`cite-BIDS21`
desc : str
    description :cite:`cite-BIDS21`
space : str
    space label :cite:`cite-BIDS21`
meanFD : float
    mean Jenkinson framewise displacement :cite:`cite-Jenk02` :func:`CPAC.generate_motion_statistics.calculate_FD_J`
relMeansRMSMotion : float
    "mean value of RMS motion" :cite:`cite-Ciri19`
relMaxRMSMotion : float
    "maximum vaue of RMS motion" :cite:`cite-Ciri19`
meanDVInit : float
    "mean DVARS" :cite:`cite-Ciri19`
meanDVFinal : float
    "mean DVARS" :cite:`cite-Ciri19`
nVolCensored : int
    "total number of volume(s) censored :cite:`cite-Ciri19`
nVolsRemoved : int
    number of volumes in derivative minus number of volumes in original
    functional scan
motionDVCorrInit : float
    "correlation of RMS and DVARS before regresion" :cite:`cite-Ciri19`
motionDVCorrFinal : float
    "correlation of RMS and DVARS after regresion" :cite:`cite-Ciri19`
coregDice : float
    "Coregsitration of Functional and T1w:[…] Dice index" :cite:`cite-Ciri19` :cite:`cite-Penn19`
coregJaccard : float
    "Coregsitration of Functional and T1w:[…] Jaccard index" :cite:`cite-Ciri19` :cite:`cite-Penn19`
coregCrossCorr : float
    "Coregsitration of Functional and T1w:[…] cross correlation" :cite:`cite-Ciri19` :cite:`cite-Penn19`
coregCoverag : float
    "Coregsitration of Functional and T1w:[…] Coverage index" :cite:`cite-Ciri19` :cite:`cite-Penn19`
normDice : float
    "Normalization of T1w/Functional to Template:[…] Dice index" :cite:`cite-Ciri19` :cite:`cite-Penn19`
normJaccard : float
    "Normalization of T1w/Functional to Template:[…] Jaccard index" :cite:`cite-Ciri19` :cite:`cite-Penn19`
normCrossCorr : float
    "Normalization of T1w/Functional to Template:[…] cross correlation" :cite:`cite-Ciri19` :cite:`cite-Penn19`
normCoverage : float
    "Normalization of T1w/Functional to Template:[…] Coverage index" :cite:`cite-Ciri19` :cite:`cite-Penn19`
"""  # noqa: E501  # pylint: disable=line-too-long
import os
import re
from io import BufferedReader

import nibabel as nb
import numpy as np
import pandas as pd
from CPAC.generate_motion_statistics.generate_motion_statistics import \
    motion_power_statistics
from CPAC.pipeline import nipype_pipeline_engine as pe
from CPAC.utils.interfaces.function import Function
from CPAC.utils.utils import check_prov_for_motion_tool

motion_params = ['movement-parameters', 'dvars',
                 'framewise-displacement-jenkinson']
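

# A minimal sketch (not part of the original module) of how the generated QC
# file could be inspected after a pipeline run. ``xcpqc.tsv`` matches the
# filename that ``generate_xcp_qc`` writes to its working directory, but the
# path used here is hypothetical; the column names are those documented in
# the module docstring above:
#
#     import pandas as pd
#     qc = pd.read_csv('xcpqc.tsv', sep='\t')
#     print(qc[['sub', 'ses', 'task', 'run', 'meanFD', 'nVolCensored']])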


def calculate_overlap(image_pair):
    '''Function to calculate Dice, Jaccard, CrossCorr and Coverage
    :cite:`cite-Penn19` from a pair of arrays

    Parameters
    ----------
    image_pair : 2-tuple
        arrays from which to calculate overlap metrics

    Returns
    -------
    coefficients : dict
        coefficients['dice'] : float
            Dice index
        coefficients['jaccard'] : float
            Jaccard index
        coefficients['cross_corr'] : float
            cross-correlation
        coefficients['coverage'] : float
            coverage index

    Examples
    --------
    >>> import numpy as np
    >>> a1 = np.array([0, 0, 0, 1, 1, 1])
    >>> a2 = np.array([0, 0, 1, 1, 0, 1])
    >>> tuple(calculate_overlap((a1, a2)).values())
    (0.6666666666666666, 0.5, 0.33333333333333326, 0.6666666666666666)
    >>> tuple(calculate_overlap((a1, a1)).values())
    (1.0, 1.0, 0.9999999999999998, 1.0)
    >>> tuple(calculate_overlap((a2, a2)).values())
    (1.0, 1.0, 0.9999999999999998, 1.0)
    '''  # noqa: E501  # pylint: disable=line-too-long
    if len(image_pair) != 2:
        raise IndexError('`calculate_overlap` requires 2 images, but '
                         f'{len(image_pair)} were provided')
    if len(image_pair[0]) != len(image_pair[1]):
        image_pair = _repeat_shorter(image_pair)
    image_pair = tuple(image.astype(bool) for image in image_pair)
    intersect = image_pair[0] * image_pair[1]
    vols = [np.sum(image) for image in image_pair]
    vol_intersect = np.sum(intersect)
    vol_sum = sum(vols)
    vol_union = vol_sum - vol_intersect
    coefficients = {
        'dice': 2 * vol_intersect / vol_sum,
        'jaccard': vol_intersect / vol_union,
        'cross_corr': np.corrcoef(image_pair)[0, 1],
        'coverage': vol_intersect / min(vols)
    }
    for name, coefficient in coefficients.items():
        if not 1 >= abs(coefficient) >= 0 and not np.isnan(coefficient):
            raise ValueError(f'Valid range for {name} is [0, 1] but value '
                             f'{coefficient} was calculated.')
    return coefficients
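

# Worked example (a sketch restating the first doctest above): for a1 and a2,
# |A| = |B| = 3 and |A ∩ B| = 2, so
#   dice     = 2 * 2 / (3 + 3)   = 0.666...
#   jaccard  = 2 / (3 + 3 - 2)   = 0.5
#   coverage = 2 / min(3, 3)     = 0.666...
# ``cross_corr`` is the Pearson correlation of the two boolean arrays.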


def _connect_motion(wf, strat_pool, qc_file, brain_mask_key, final, pipe_num):
    # pylint: disable=invalid-name, too-many-arguments
    try:
        nodes = {'censor-indices': strat_pool.node_data('censor-indices')}
        wf.connect(nodes['censor-indices'].node, nodes['censor-indices'].out,
                   qc_file, 'censor_indices')
    except LookupError:
        nodes = {}
        qc_file.inputs.censor_indices = []
    motion_prov = strat_pool.get_cpac_provenance('movement-parameters')
    motion_correct_tool = check_prov_for_motion_tool(motion_prov)
    gen_motion_stats = motion_power_statistics('motion_stats-after_'
                                               f'{pipe_num}',
                                               motion_correct_tool)
    nodes = {
        **nodes,
        **{node_data: strat_pool.node_data(node_data) for node_data in [
            'subject', 'scan', brain_mask_key, 'max-displacement',
            *motion_params]}}
    if motion_correct_tool == '3dvolreg' and strat_pool.check_rpool(
            'coordinate-transformation'):
        nodes['coordinate-transformation'] = strat_pool.node_data(
            'coordinate-transformation')
        wf.connect(nodes['coordinate-transformation'].node,
                   nodes['coordinate-transformation'].out,
                   gen_motion_stats, 'inputspec.transformations')
    elif motion_correct_tool == 'mcflirt' and strat_pool.check_rpool(
            'rels-displacement'):
        nodes['rels-displacement'] = strat_pool.node_data('rels-displacement')
        wf.connect(nodes['rels-displacement'].node,
                   nodes['rels-displacement'].out,
                   gen_motion_stats, 'inputspec.rels_displacement')
    wf.connect([
        (final['func'].node, gen_motion_stats, [
            (final['func'].out, 'inputspec.motion_correct')]),
        (nodes['subject'].node, gen_motion_stats, [
            (nodes['subject'].out, 'inputspec.subject_id')]),
        (nodes['scan'].node, gen_motion_stats, [
            (nodes['scan'].out, 'inputspec.scan_id')]),
        (nodes['movement-parameters'].node, gen_motion_stats, [
            (nodes['movement-parameters'].out,
             'inputspec.movement_parameters')]),
        (nodes['max-displacement'].node, gen_motion_stats, [
            (nodes['max-displacement'].out, 'inputspec.max_displacement')]),
        (nodes[brain_mask_key].node, gen_motion_stats, [
            (nodes[brain_mask_key].out, 'inputspec.mask')]),
        (gen_motion_stats, qc_file, [
            ('outputspec.DVARS_1D', 'dvars_after'),
            ('outputspec.FDJ_1D', 'fdj_after')]),
        *[(nodes[node].node, qc_file, [
            (nodes[node].out, node.replace('-', '_'))
        ]) for node in motion_params]])
    return wf


def _connect_xcp(wf, strat_pool, qc_file, original, final, t1w_bold,
                 brain_mask_key, output_key, pipe_num):
    # pylint: disable=invalid-name, too-many-arguments
    if (
        strat_pool.check_rpool('movement-parameters') and
        strat_pool.check_rpool(brain_mask_key)
    ):
        wf = _connect_motion(wf, strat_pool, qc_file, brain_mask_key, final,
                             pipe_num)
    else:
        qc_file.inputs.censor_indices = []
        for key in [*motion_params, 'movement_parameters',
                    'framewise_displacement_jenkinson', 'dvars_after',
                    'fdj_after']:
            setattr(qc_file.inputs, key, None)
    wf.connect([
        (original['anat'].node, qc_file, [
            (original['anat'].out, 'original_anat')]),
        (original['func'].node, qc_file, [
            (original['func'].out, 'original_func')]),
        (final['anat'].node, qc_file, [(final['anat'].out, 'final_anat')]),
        (final['func'].node, qc_file, [(final['func'].out, 'final_func')]),
        (t1w_bold.node, qc_file, [(t1w_bold.out, 'space_T1w_bold')])])
    outputs = {output_key: (qc_file, 'qc_file')}
    return wf, outputs


def dvcorr(dvars, fdj):
    """Function to correlate DVARS and FD-J"""
    dvars = np.loadtxt(dvars)
    fdj = np.loadtxt(fdj)
    if len(dvars) != len(fdj) - 1:
        raise ValueError(
            'len(DVARS) should be 1 less than len(FDJ), but their respective '
            f'lengths are {len(dvars)} and {len(fdj)}.'
        )
    return np.corrcoef(dvars, fdj[1:])[0, 1]
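

# A minimal usage sketch for ``dvcorr`` (the file names and values are
# hypothetical): DVARS has one fewer timepoint than FD-J, so the first FD-J
# value is dropped before correlating, e.g.
#
#     import numpy as np
#     np.savetxt('dvars_example.1D', np.array([1.0, 2.0, 3.0]))
#     np.savetxt('fdj_example.1D', np.array([0.0, 1.0, 2.0, 3.0]))
#     dvcorr('dvars_example.1D', 'fdj_example.1D')  # -> 1.0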


def generate_xcp_qc(space, desc, original_anat, final_anat, original_func,
                    final_func, space_T1w_bold, movement_parameters, dvars,
                    censor_indices, framewise_displacement_jenkinson,
                    dvars_after, fdj_after, template=None):
    # pylint: disable=too-many-arguments, too-many-locals, invalid-name
    """Function to generate an RBC-style QC TSV

    Parameters
    ----------
    space : str
        'native' or 'template'

    desc : str
        description string

    original_anat : str
        path to original 'T1w' image

    final_anat : str
        path to 'desc-preproc_T1w' image

    original_func : str
        path to original 'bold' image

    final_func : str
        path to 'desc-preproc_bold' image

    space_T1w_bold : str
        path to 'space-T1w_desc-mean_bold' image

    movement_parameters : str
        path to movement parameters

    dvars : str
        path to DVARS before motion correction

    censor_indices : list
        list of indices of censored volumes

    framewise_displacement_jenkinson : str
        path to framewise displacement (Jenkinson) before motion correction

    dvars_after : str
        path to DVARS on final 'bold' image

    fdj_after : str
        path to framewise displacement (Jenkinson) on final 'bold' image

    template : str
        path to template

    Returns
    -------
    str
        path to desc-xcp_quality TSV
    """
    columns = (
        'sub,ses,task,run,desc,space,meanFD,relMeansRMSMotion,'
        'relMaxRMSMotion,meanDVInit,meanDVFinal,nVolCensored,nVolsRemoved,'
        'motionDVCorrInit,motionDVCorrFinal,coregDice,coregJaccard,'
        'coregCrossCorr,coregCoverage,normDice,normJaccard,normCrossCorr,'
        'normCoverage'.split(',')
    )

    images = {
        'original_anat': nb.load(original_anat),
        'original_func': nb.load(original_func),
        'final_anat': nb.load(final_anat),
        'final_func': nb.load(final_func),
        'space-T1w_bold': nb.load(space_T1w_bold)
    }
    if template is not None:
        images['template'] = nb.load(template)

    # `sub` through `desc`
    from_bids = {
        **strings_from_bids(original_func),
        'space': space,
        'desc': desc
    }

    # `nVolCensored` & `nVolsRemoved`
    n_vols_censored = len(
        censor_indices) if censor_indices is not None else 'unknown'
    shape_params = {'nVolCensored': n_vols_censored,
                    'nVolsRemoved': images['final_func'].shape[3] -
                    images['original_func'].shape[3]}

    if isinstance(final_func, BufferedReader):
        final_func = final_func.name
    qc_filepath = os.path.join(os.getcwd(), 'xcpqc.tsv')

    desc_span = re.search(r'_desc-.*_', final_func)
    if desc_span:
        desc_span = desc_span.span()
        final_func = '_'.join([
            final_func[:desc_span[0]],
            final_func[desc_span[1]:]
        ])
    del desc_span

    if dvars:
        # `meanFD (Jenkinson)`
        power_params = {'meanFD': np.mean(np.loadtxt(
            framewise_displacement_jenkinson))}

        # `relMeansRMSMotion` & `relMaxRMSMotion`
        mot = np.genfromtxt(movement_parameters).T
        # Relative RMS of translation
        rms = np.sqrt(mot[3] ** 2 + mot[4] ** 2 + mot[5] ** 2)
        rms_params = {
            'relMeansRMSMotion': [np.mean(rms)],
            'relMaxRMSMotion': [np.max(rms)]
        }

        # `meanDVInit` & `meanDVFinal`
        meanDV = {'meanDVInit': np.mean(np.loadtxt(dvars))}
        try:
            meanDV['motionDVCorrInit'] = dvcorr(
                dvars, framewise_displacement_jenkinson)
        except ValueError as value_error:
            meanDV['motionDVCorrInit'] = f'ValueError({str(value_error)})'
        if dvars_after:
            if not fdj_after:
                fdj_after = framewise_displacement_jenkinson
            meanDV['meanDVFinal'] = np.mean(np.loadtxt(dvars_after))
            try:
                meanDV['motionDVCorrFinal'] = dvcorr(dvars_after, fdj_after)
            except ValueError as value_error:
                meanDV['motionDVCorrFinal'] = f'ValueError({str(value_error)})'
    else:
        meanDV = _na_dict([f'{dv}DV{"Corr" + ts if dv == "motion" else ts}'
                           for dv in ['mean', 'motion']
                           for ts in ['Init', 'Final']])
        power_params = {'meanFD': 'n/a'}
        rms_params = _na_dict(['relMeansRMSMotion', 'relMaxRMSMotion'])

    # Overlap
    overlap_images = {variable: image.get_fdata().ravel() for
                      variable, image in images.items() if
                      variable in ['space-T1w_bold', 'original_anat',
                                   'template']}
    overlap_params = {}
    (overlap_params['coregDice'], overlap_params['coregJaccard'],
     overlap_params['coregCrossCorr'], overlap_params['coregCoverage']
     ) = [[item] for item in calculate_overlap(
        (overlap_images['space-T1w_bold'], overlap_images['original_anat'])
    ).values()]
    if space == 'native':
        for key in ['normDice', 'normJaccard', 'normCrossCorr',
                    'normCoverage']:
            overlap_params[key] = ['N/A: native space']
    elif template is not None:
        (overlap_params['normDice'], overlap_params['normJaccard'],
         overlap_params['normCrossCorr'], overlap_params['normCoverage']
         ) = [[item] for item in calculate_overlap(
            (images['final_func'].get_fdata().ravel(),
             overlap_images['template'])).values()]
    else:
        overlap_params = _na_dict(['normDice', 'normJaccard', 'normCrossCorr',
                                   'normCoverage'])

    qc_dict = {
        **from_bids,
        **power_params,
        **rms_params,
        **shape_params,
        **overlap_params,
        **meanDV
    }
    df = pd.DataFrame(qc_dict, columns=columns)
    df.to_csv(qc_filepath, sep='\t', index=False)
    return qc_filepath
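

# A hypothetical standalone invocation of ``generate_xcp_qc`` (every path
# below is an illustrative placeholder, not a file shipped with C-PAC); in
# the pipeline the same arguments are wired in through the nipype
# ``Function`` node built in ``_prep_qc_xcp`` below:
#
#     qc_tsv = generate_xcp_qc(
#         space='native', desc='preproc',
#         original_anat='sub-1_T1w.nii.gz',
#         final_anat='sub-1_desc-preproc_T1w.nii.gz',
#         original_func='sub-1_task-rest_bold.nii.gz',
#         final_func='sub-1_task-rest_desc-preproc_bold.nii.gz',
#         space_T1w_bold='sub-1_task-rest_space-T1w_desc-mean_bold.nii.gz',
#         movement_parameters='movement_parameters.1D',
#         dvars='dvars.1D', censor_indices=[],
#         framewise_displacement_jenkinson='fdj.1D',
#         dvars_after='dvars_after.1D', fdj_after='fdj_after.1D')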


def _na_dict(keys):
    return {key: 'n/a' for key in keys}


def _prep_qc_xcp(strat_pool, pipe_num, space):
    qc_file = pe.Node(Function(input_names=['subject', 'scan', 'space',
                                            'desc', 'template',
                                            'original_func', 'final_func',
                                            'original_anat', 'final_anat',
                                            'space_T1w_bold',
                                            'movement_parameters',
                                            'censor_indices', 'dvars',
                                            'framewise_displacement_'
                                            'jenkinson',
                                            'dvars_after', 'fdj_after'],
                               output_names=['qc_file'],
                               function=generate_xcp_qc,
                               as_module=True),
                      name=f'xcpqc-{space}_{pipe_num}')
    qc_file.inputs.desc = 'preproc'
    qc_file.inputs.space = space
    original = {}
    final = {}
    original['anat'] = strat_pool.node_data('T1w')
    original['func'] = strat_pool.node_data('bold')
    final['anat'] = strat_pool.node_data('desc-preproc_T1w')
    t1w_bold = strat_pool.node_data('space-T1w_desc-mean_bold')
    return qc_file, original, final, t1w_bold


def qc_xcp_native(wf, cfg, strat_pool, pipe_num, opt=None):
    # pylint: disable=invalid-name, unused-argument
    """
    {'name': 'qc_xcp_native',
     'config': ['pipeline_setup', 'output_directory', 'quality_control'],
     'switch': ['generate_xcpqc_files'],
     'option_key': 'None',
     'option_val': 'None',
     'inputs': [('bold', 'subject', 'scan', 'T1w', 'max-displacement',
                 'dvars', 'censor-indices', 'desc-preproc_bold',
                 'desc-preproc_T1w', 'space-T1w_desc-mean_bold',
                 'space-bold_desc-brain_mask', 'movement-parameters',
                 'framewise-displacement-jenkinson', 'rels-displacement',
                 'coordinate-transformation')],
     'outputs': ['desc-xcp_quality']}
    """
    space = 'native'
    qc_file, original, final, t1w_bold = _prep_qc_xcp(strat_pool, pipe_num,
                                                      space)
    final['func'] = strat_pool.node_data('desc-preproc_bold')
    return _connect_xcp(wf, strat_pool, qc_file, original, final, t1w_bold,
                        'space-bold_desc-brain_mask', 'desc-xcp_quality',
                        pipe_num)


def qc_xcp_skullstripped(wf, cfg, strat_pool, pipe_num, opt=None):
    # pylint: disable=invalid-name, unused-argument
    r"""
    Same as ``qc_xcp_native`` except no motion inputs.

    Node Block:
    {'name': 'qc_xcp_skullstripped',
     'config': ['pipeline_setup', 'output_directory', 'quality_control'],
     'switch': ['generate_xcpqc_files'],
     'option_key': 'None',
     'option_val': 'None',
     'inputs': [('bold', 'subject', 'scan', 'T1w', 'desc-preproc_bold',
                 'desc-preproc_T1w', 'space-T1w_desc-mean_bold',
                 'space-bold_desc-brain_mask')],
     'outputs': ['desc-xcp_quality']}
    """
    # If strat has dvars, it should be captured by 'qc_xcp_native'
    if 'dvars' in strat_pool.get('desc-preproc_bold').get(
            'json', {}).get('Sources', {}):
        return wf, {}
    return qc_xcp_native(wf, cfg, strat_pool, pipe_num, opt)


def qc_xcp_template(wf, cfg, strat_pool, pipe_num, opt=None):
    # pylint: disable=invalid-name, unused-argument
    """
    {'name': 'qc_xcp_template',
     'config': ['pipeline_setup', 'output_directory', 'quality_control'],
     'switch': ['generate_xcpqc_files'],
     'option_key': 'None',
     'option_val': 'None',
     'inputs': [('bold', 'subject', 'scan', 'T1w',
                 'T1w-brain-template-funcreg', 'space-T1w_desc-mean_bold',
                 'space-template_desc-preproc_bold', 'desc-preproc_T1w',
                 'space-template_desc-bold_mask')],
     'outputs': ['space-template_desc-xcp_quality']}
    """
    space = 'template'
    qc_file, original, final, t1w_bold = _prep_qc_xcp(strat_pool, pipe_num,
                                                      space)
    final['func'] = strat_pool.node_data('space-template_desc-preproc_bold')
    template = strat_pool.node_data('T1w-brain-template-funcreg')
    wf.connect(template.node, template.out, qc_file, 'template')
    return _connect_xcp(wf, strat_pool, qc_file, original, final, t1w_bold,
                        'space-template_desc-bold_mask',
                        'space-template_desc-xcp_quality', pipe_num)


def _repeat_shorter(images):
    '''
    Parameters
    ----------
    images : 2-tuple

    Returns
    -------
    images : 2-tuple

    Examples
    --------
    >>> _repeat_shorter((np.array([1, 2, 3]), np.array([1])))
    (array([1, 2, 3]), array([1, 1, 1]))
    >>> _repeat_shorter((np.array([0, 0]), (np.array([1, 1, 2, 2, 3, 4]))))
    (array([0, 0, 0, 0, 0, 0]), array([1, 1, 2, 2, 3, 4]))
    '''
    lens = (len(images[0]), len(images[1]))
    if lens[1] > lens[0] and lens[1] % lens[0] == 0:
        return (np.tile(images[0], lens[1] // lens[0]), images[1])
    if lens[0] > lens[1] and lens[0] % lens[1] == 0:
        return (images[0], np.tile(images[1], lens[0] // lens[1]))
    raise ValueError('operands could not be broadcast together with shapes '
                     f'({lens[0]},) ({lens[1]},)')


def strings_from_bids(final_func):
    """
    Function to gather BIDS entities into a dictionary

    Parameters
    ----------
    final_func : str

    Returns
    -------
    dict

    Examples
    --------
    >>> fake_path = (
    ...     '/path/to/sub-fakeSubject_ses-fakeSession_task-peer_run-3_'
    ...     'atlas-Schaefer400_space-MNI152NLin6_res-1x1x1_'
    ...     'desc-NilearnPearson_connectome.tsv')
    >>> strings_from_bids(fake_path)['desc']
    'NilearnPearson'
    >>> strings_from_bids(fake_path)['space']
    'MNI152NLin6'
    """
    from_bids = dict(
        tuple(entity.split('-', 1)) if '-' in entity else ('suffix', entity)
        for entity in final_func.split('/')[-1].split('_')
    )
    from_bids = {k: from_bids[k] for k in from_bids}
    if 'space' not in from_bids:
        from_bids['space'] = 'native'
    return from_bids