Source code for CPAC.utils.ndmg_utils

# Functions in this file adapted from NeuroData group:

# STATEMENT OF CHANGES:
#     This file is derived from sources licensed under the Apache-2.0 terms,
#     and this file has been changed.

# CHANGES:
#     * Minor refactoring for compatibility with C-PAC

# ORIGINAL WORK'S ATTRIBUTION NOTICE:
#     Copyright 2016 NeuroData (http://neurodata.io)

#     Licensed under the Apache License, Version 2.0 (the "License");
#     you may not use this file except in compliance with the License.
#     You may obtain a copy of the License at

#         http://www.apache.org/licenses/LICENSE-2.0

#     Unless required by applicable law or agreed to in writing, software
#     distributed under the License is distributed on an "AS IS" BASIS,
#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#     See the License for the specific language governing permissions and
#     limitations under the License.

#     graph.py
#     Created by Greg Kiar on 2016-01-27.
#     Email: gkiar@jhu.edu

# Can be found here:
#     https://github.com/neurodata/m2g/blob/v0.1.0/ndmg/graph/graph.py

# Modifications Copyright (C) 2022-2024  C-PAC Developers

# This file is part of C-PAC.
import os

import numpy as np
import nibabel as nib

from CPAC.utils.monitoring.custom_logging import getLogger

logger = getLogger("nuerodata.m2g.ndmg")


[docs] def ndmg_roi_timeseries(func_file, label_file): """ Function to extract average timeseries for the voxels in each roi of the labelled atlas. Returns the roi timeseries as a numpy.ndarray. **Positional Arguments** func_file: - the path to the 4d volume to extract timeseries label_file: - the path to the labelled atlas containing labels for the voxels in the fmri image roits_file: - the path to where the roi timeseries will be saved. If None, don't save and just return the roi_timeseries. # Adapted from ndmg v0.1.1 # Copyright 2016 NeuroData (http://neurodata.io) """ labeldata = nib.load(label_file).get_fdata() # rois are all the nonzero unique values the parcellation can take rois = np.sort(np.unique(labeldata[labeldata > 0])) funcdata = nib.load(func_file).get_fdata() # initialize time series to [numrois]x[numtimepoints] roi_ts = np.zeros((len(rois), funcdata.shape[3])) # iterate over the unique possible rois. note that the rois # could have nonstandard values, so assign indices w enumerate for idx, roi in enumerate(rois): roibool = labeldata == roi # get a bool where our voxels in roi try: roi_vts = funcdata[roibool, :] except IndexError as e: err = ( "\n[!] Error: functional data and ROI mask may not be in " "the same space or be the same size.\nDetails: " f"{e}" ) raise IndexError(err) # take the mean for the voxel timeseries, and ignore voxels with # no variance ts = roi_vts[roi_vts.std(axis=1) != 0, :].mean(axis=0) if ts.size != 0: roi_ts[idx, :] = ts roits_file = os.path.join(os.getcwd(), "timeseries.npz") np.savez(roits_file, ts=roi_ts, rois=rois) return (roi_ts, rois, roits_file)
[docs] class graph(object): def __init__(self, N, rois, attr=None, sens="dwi"): """ Initializes the graph with nodes corresponding to the number of ROIs **Positional Arguments:** N: - Number of rois rois: - Set of ROIs as either an array or niftii file) attr: - Node or graph attributes. Can be a list. If 1 dimensional will be interpretted as a graph attribute. If N dimensional will be interpretted as node attributes. If it is any other dimensional, it will be ignored. # Adapted from ndmg v0.1.1 # Copyright 2016 NeuroData (http://neurodata.io) """ from collections import defaultdict import numpy as np import nibabel as nib self.N = N self.edge_dict = defaultdict(int) self.rois = nib.load(rois).get_fdata() n_ids = np.unique(self.rois) self.n_ids = n_ids[n_ids > 0] self.modal = sens pass
[docs] def make_graph(self, streamlines, attr=None): """ Takes streamlines and produces a graph **Positional Arguments:** streamlines: - Fiber streamlines either file or array in a dipy EuDX or compatible format. """ from itertools import product import time import networkx as nx import numpy as np self.g = nx.Graph( name="Generated by NeuroData's MRI Graphs (ndmg)", version="0.1.1", date=time.asctime(time.localtime()), source="http://m2g.io", region="brain", sensor=self.modal, ecount=0, vcount=len(self.n_ids), ) logger.info(self.g.graph) [str(self.g.add_node(ids)) for ids in self.n_ids] nlines = np.shape(streamlines)[0] logger.info("# of Streamlines: %s", nlines) print_id = np.max((int(nlines * 0.05), 1)) # in case nlines*.05=0 for idx, streamline in enumerate(streamlines): if (idx % print_id) == 0: logger.info(idx) points = np.round(streamline).astype(int) p = set() for point in points: try: loc = self.rois[point[0], point[1], point[2]] except IndexError: pass else: pass if loc: p.add(loc) edges = {tuple(sorted(x)) for x in product(p, p)} for edge in edges: lst = tuple(sorted([str(node) for node in edge])) self.edge_dict[lst] += 1 edge_list = [(int(k[0]), int(k[1]), v) for k, v in self.edge_dict.items()] self.g.add_weighted_edges_from(edge_list)
[docs] def cor_graph(self, timeseries, attr=None): """ Takes timeseries and produces a correlation matrix **Positional Arguments:** timeseries: -the timeseries file to extract correlation for. dimensions are [numrois]x[numtimesteps]. """ import numpy as np ts = timeseries[0] # noqa: F841 rois = timeseries[1] logger.info("Estimating correlation matrix for %s ROIs...", self.N) self.g = np.abs(np.corrcoef(timeseries)) # calculate pearson correlation self.g = np.nan_to_num(self.g).astype(object) self.n_ids = rois # roilist = self.g.nodes() # for (idx_out, roi_out) in enumerate(roilist): # for (idx_in, roi_in) in enumerate(roilist): # self.edge_dict[(roi_out, roi_in)] = float(cor[idx_out, idx_in]) # edge_list = [(str(k[0]), str(k[1]), v) for k, v in self.edge_dict.items()] # self.g.add_weighted_edges_from(edge_list) return self.g
[docs] def get_graph(self): """Returns the graph object created.""" try: return self.g except AttributeError: logger.error("The graph has not yet been defined.") pass
[docs] def as_matrix(self): """Returns the graph as a matrix.""" import networkx as nx g = self.get_graph() return nx.to_numpy_matrix(g, nodelist=np.sort(g.nodes()).tolist())
[docs] def save_graph(self, graphname): """ Saves the graph to disk **Positional Arguments:** graphname: - Filename for the graph. """ import networkx as nx import numpy as np if self.modal == "dwi": self.g.graph["ecount"] = nx.number_of_edges(self.g) nx.write_weighted_edgelist(self.g, graphname, delimiter=",") elif self.modal == "func": np.savetxt( graphname, self.g, comments="", delimiter=",", header=",".join([str(n) for n in self.n_ids]), ) else: msg = "Unsupported Modality." raise ValueError(msg)
[docs] def summary(self): """User friendly wrapping and display of graph properties.""" import networkx as nx logger.info("\n Graph Summary: %s", nx.info(self.g))
[docs] def ndmg_create_graphs(ts, labels): out_file = os.path.join(os.getcwd(), "measure-correlation.csv") connectome = graph(ts.shape[0], labels, sens="func") connectome.cor_graph(ts) connectome.save_graph(out_file) return out_file