# Copyright (C) 2012-2024 C-PAC Developers
# This file is part of C-PAC.
# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
from collections.abc import Iterable
import os
from pathlib import Path
from typing import Optional
import nibabel as nib
from CPAC.pipeline.nipype_pipeline_engine import Node
from CPAC.pipeline.schema import valid_options
from CPAC.utils.docs import docstring_parameter
from CPAC.utils.interfaces.function import Function
from CPAC.utils.monitoring import IFLOGGER
[docs]
def convert_pvalue_to_r(datafile, p_value, two_tailed=False):
    """
    Calculate correlation threshold from p_value.

    Parameters
    ----------
    datafile : string
        filepath to dataset to extract number of time pts from
    p_value : float
        significance threshold p-value
    two_tailed : boolean (optional); default=False
        flag to indicate whether to calculate the two-tailed t-test
        threshold for the returned correlation value

    Returns
    -------
    r_value : float
        correlation threshold value
    """
    # imports live inside the function so it can run as a standalone
    # nipype Function node
    import nibabel as nib
    import numpy as np
    import scipy.stats

    # Halve the significance level for a two-tailed test
    if two_tailed:
        p_value /= 2

    # Number of time points is the size of the dataset's last axis
    timepoints = nib.load(datafile).get_fdata().shape[-1]

    # Pearson correlation has N-2 degrees of freedom (two sample means)
    dof = timepoints - 2

    # Inverse Survival Function (inverse of SF): the survival function is the
    # complementary CDF, SF(x) = P(X > x) = 1 - CDF(x); its inverse maps a
    # tail probability p back to the critical t-value t = SF^-1(p)
    critical_t = scipy.stats.t.isf(p_value, dof)

    # Convert the critical t-value to the equivalent correlation coefficient
    return np.sqrt(critical_t**2 / (dof + critical_t**2))
[docs]
def merge_lists(
    deg_list: Optional[list[str]] = None,
    eig_list: Optional[list[str]] = None,
    lfcd_list: Optional[list[str]] = None,
):
    """Merge the centrality output lists and route each path to a named slot.

    Parameters
    ----------
    deg_list : list of str, optional
        paths to degree centrality outputs
    eig_list : list of str, optional
        paths to eigenvector centrality outputs
    lfcd_list : list of str, optional
        paths to local functional connectivity density outputs

    Returns
    -------
    degree_weighted : str or None
        path to weighted degree centrality output
    degree_binarized : str or None
        path to binarized degree centrality output
    eigen_weighted : str or None
        path to weighted eigenvector centrality output
    eigen_binarized : str or None
        path to binarized eigenvector centrality output
    lfcd_weighted : str or None
        path to weighted local functional connectivity density output
    lfcd_binarized : str or None
        path to binarized local functional connectivity density output
    """
    # (measure substring, weighting substring) pairs, in output order;
    # the first pair that matches a path wins for that path
    selectors = (
        ("degree", "Weighted"),
        ("degree", "Binarize"),
        ("eigen", "Weighted"),
        ("eigen", "Binarize"),
        ("local_functional", "Weighted"),
        ("local_functional", "Binarize"),
    )
    matched: list[Optional[str]] = [None] * len(selectors)
    # nominally a list merge, but the real job is parsing out the files
    every_path = [*(deg_list or []), *(eig_list or []), *(lfcd_list or [])]
    for path in every_path:
        for slot, (measure, weighting) in enumerate(selectors):
            if measure in path and weighting in path:
                matched[slot] = path
                break
    return tuple(matched)
[docs]
def create_merge_node(pipe_num: int) -> Node:
    """Build the Function Node that gathers centrality outputs by type.

    Parameters
    ----------
    pipe_num : int
        pipeline number, used to make the node name unique

    Returns
    -------
    Node
        a Function Node wrapping ``merge_lists`` for the centrality workflow

    Notes
    -----
    Node Inputs::

        deg_list : list of strings
            list of paths to degree centrality outputs

        eig_list : list of strings
            list of paths to eigenvector centrality outputs

        lfcd_list : list of strings
            list of paths to local functional connectivity density outputs

    Node Outputs::

        degree_weighted : string
            path to weighted degree centrality output
        degree_binarized : string
            path to binarized degree centrality output
        eigen_weighted : string
            path to weighted eigenvector centrality output
        eigen_binarized : string
            path to binarized eigenvector centrality output
        lfcd_weighted : string
            path to weighted local functional connectivity density output
        lfcd_binarized : string
            path to binarized local functional connectivity density output
    """
    # one output field per (measure, weighting) combination, in the same
    # order as merge_lists returns them
    output_fields = [
        f"{measure}_{weighting}"
        for measure in ("degree", "eigen", "lfcd")
        for weighting in ("weighted", "binarized")
    ]
    merge_function = Function(
        input_names=["deg_list", "eig_list", "lfcd_list"],
        output_names=output_fields,
        function=merge_lists,
        as_module=True,
    )
    return Node(merge_function, name=f"centrality_merge_node_{pipe_num}")
[docs]
@Function.sig_imports(
    [
        "from collections.abc import Iterable",
        "import os",
        "from pathlib import Path",
        "import nibabel as nib",
        "from CPAC.pipeline.schema import valid_options",
        "from CPAC.utils.docs import docstring_parameter",
    ]
)
@docstring_parameter(
    weight_options=tuple(valid_options["centrality"]["weight_options"])
)
def sep_nifti_subbriks(nifti_file: Path | str, out_names: Iterable[str]) -> list[str]:
    """Separate sub-briks of niftis and save specified out.

    Parameters
    ----------
    nifti_file : ~pathlib.Path or str
        path to NIfTI output of an AFNI centrality tool
    out_names : iterable of str
        an iterable of strings, each ending with one of {weight_options}

    Returns
    -------
    list of str
        paths to each of the specified outputs as its own file

    Raises
    ------
    ValueError
        if the number of requested outputs does not match the number of
        sub-briks in the NIfTI file
    """
    output_niftis = []
    weight_options = valid_options["centrality"]["weight_options"]
    # materialize once so any iterable (including a generator) can be both
    # sized and traversed
    out_names = list(out_names)
    # map each name's final "_"-delimited segment (its weight option) to the
    # full output name
    selected_options = {name.rsplit("_", 1)[-1]: name for name in out_names}
    nii_img = nib.load(nifti_file)
    nii_arr = nii_img.get_fdata()
    nii_affine = nii_img.affine
    nii_dims = nii_arr.shape
    if nii_dims[-1] != len(weight_options):
        # a 3D image with a single requested output is the only
        # acceptable mismatch
        if not (len(nii_dims) == 3 and len(out_names) == 1):  # noqa: PLR2004
            err_msg = "out_names must have same number of elements as nifti sub-briks"
            raise ValueError(err_msg)
    for brik, option in enumerate(weight_options):
        if option in selected_options:
            if len(nii_dims) == 3:  # noqa: PLR2004
                # a 3D image has no sub-briks; use the whole volume
                brik_arr = nii_arr
            elif len(nii_dims) > 3:  # noqa: PLR2004
                # NOTE(review): assumes sub-briks live in the last axis of a
                # 5D (x, y, z, 1, brik) array — confirm against AFNI output
                brik_arr = nii_arr[:, :, :, 0, brik]
            out_file = os.path.join(os.getcwd(), selected_options[option] + ".nii.gz")
            out_img = nib.Nifti1Image(brik_arr, nii_affine)
            out_img.to_filename(out_file)
            output_niftis.append(out_file)
    return output_niftis
[docs]
@docstring_parameter(
    m_options=valid_options["centrality"]["method_options"],
    t_options=valid_options["centrality"]["threshold_options"],
)
def check_centrality_params(method_option, threshold_option, threshold):
    """
    Check and normalize the centrality parameters.

    Parameters
    ----------
    method_option : str or int
        one of {m_options} or index of option
    threshold_option : str
        one of {t_options} or index of option
    threshold : float
        threshold value to validate against the chosen threshold option

    Returns
    -------
    method_option : str
        one of {m_options}
    threshold_option : str
        one of {t_options}

    Raises
    ------
    MethodOptionError
        if the method option index is out of range or the string is not a
        recognized method
    ThresholdOptionError
        if the threshold option index is out of range, the string is not a
        recognized threshold option, or the option is invalid for the method
    ThresholdError
        if the threshold value is outside the option's valid range
    TypeError
        if either option is neither a string nor an integer index
    """
    # Check method option: an integer is treated as an index into the
    # canonical list of method options
    if isinstance(method_option, int):
        if method_option < len(valid_options["centrality"]["method_options"]):
            method_option = valid_options["centrality"]["method_options"][method_option]
        else:
            raise MethodOptionError(method_option)
    elif not isinstance(method_option, str):
        raise TypeError(
            "Method option must be a string, but type "
            f"'{type(method_option).__name__}' provided"
        )
    # Check threshold option: unwrap a single-element list, then treat an
    # integer as an index into the canonical list of threshold options
    if isinstance(threshold_option, list):
        threshold_option = threshold_option[0]
    if isinstance(threshold_option, int):
        if threshold_option < len(valid_options["centrality"]["threshold_options"]):
            threshold_option = valid_options["centrality"]["threshold_options"][
                threshold_option
            ]
        else:
            raise ThresholdOptionError(threshold_option, method_option)
    elif not isinstance(threshold_option, str):
        raise TypeError(
            "Threshold option must be a string, but type "
            f"'{type(threshold_option).__name__}' provided"
        )
    # Format input strings: map short v1-style method names onto the full
    # canonical names, and expand bare option words to "<word> threshold"
    method_option = method_option.lower().rstrip(" ")
    method_options_v1 = ["degree", "eigenvector", "lfcd"]
    if method_option in method_options_v1:
        method_option = valid_options["centrality"]["method_options"][
            method_options_v1.index(method_option)
        ]
    if " " not in threshold_option:
        threshold_option = " ".join([threshold_option, "threshold"])
    threshold_option = threshold_option.capitalize().rstrip(" ")
    # Check for strings properly formatted
    if method_option not in valid_options["centrality"]["method_options"]:
        raise MethodOptionError(method_option)
    # Check for strings properly formatted
    if threshold_option not in valid_options["centrality"]["threshold_options"]:
        raise ThresholdOptionError(threshold_option, method_option)
    # Check for invalid combinations of method_option + threshold_option
    if (
        method_option == "local_functional_connectivity_density"
        and threshold_option == "Sparsity threshold"
    ):
        raise ThresholdOptionError(threshold_option, method_option)
    # If it's significance/sparsity thresholding, check for (0,1]
    if threshold_option in ("Significance threshold", "Sparsity threshold"):
        if threshold <= 0 or threshold > 1:
            raise ThresholdError(threshold_option, threshold)
    # If it's correlation, check for [-1,1]
    elif threshold_option == "Correlation threshold":
        if threshold < -1 or threshold > 1:
            raise ThresholdError(threshold_option, threshold)
    else:
        raise ThresholdOptionError(threshold_option, method_option)
    # Return valid method and threshold options
    return method_option, threshold_option
[docs]
class MethodOptionError(ValueError):
    """Raised when a selected centrality method option is not supported."""

    def __init__(self, method_option):
        """Store the offending option and build the error message.

        Parameters
        ----------
        method_option : str or int
            the unsupported centrality method option (or index)
        """
        self.method_option = method_option
        # f-string keeps the rendered message identical to the old %-format
        self.message = f"Method option '{method_option}' not supported"
        super().__init__(self.message)
[docs]
class ThresholdError(ValueError):
    """Selected threshold value is not supported for selected threshold option."""

    def __init__(self, threshold_option, threshold):
        """Build a message describing the valid range for the option.

        Parameters
        ----------
        threshold_option : str
            the threshold option the value was supplied for
        threshold : float
            the out-of-range threshold value

        Raises
        ------
        ThresholdOptionError
            if ``threshold_option`` is not an option with a defined range
        """
        self.threshold_option = threshold_option
        self.threshold = threshold
        # log the value's type to help diagnose mis-typed config entries
        IFLOGGER.error("%s", type(threshold))
        self.message = f"For '{threshold_option}', threshold value must be "
        if threshold_option in ("Significance threshold", "Sparsity threshold"):
            self.message += "a positive number greater than 0 "
        elif threshold_option == "Correlation threshold":
            self.message += "greater than or equal to -1 "
        else:
            raise ThresholdOptionError(threshold_option)
        self.message += (
            f"and less than or equal to 1.\n Currently it is set at {threshold}"
        )
        super().__init__(self.message)
[docs]
class ThresholdOptionError(ValueError):
    """Selected threshold option is not supported for selected centrality measure."""

    def __init__(self, threshold_option, method_option=None):
        """Build a message naming the unsupported option (and measure, if given).

        Parameters
        ----------
        threshold_option : str or int
            the unsupported threshold option (or index)
        method_option : str, optional
            the centrality measure the option was selected for
        """
        self.method_option = method_option
        self.threshold_option = threshold_option
        self.message = f"Threshold option '{threshold_option}' not supported"
        if self.method_option:
            self.message += f" for network centrality measure '{method_option}'"
        self.message += "; fix this in the pipeline config"
        # lFCD + sparsity is a known-invalid combination; list the
        # alternatives explicitly in the message
        if (
            method_option == "local_functional_connectivity_density"
            and threshold_option == "Sparsity threshold"
        ):
            _valid_options = " or ".join(
                [
                    f"'{t}'"
                    for t in valid_options["centrality"]["threshold_options"]
                    if t != threshold_option
                ]
            )
            self.message += f". '{method_option}' must use {_valid_options}."
        super().__init__(self.message)