# Functions in this file adapted from NeuroData group:
# STATEMENT OF CHANGES:
# This file is derived from sources licensed under the Apache-2.0 terms,
# and this file has been changed.
# CHANGES:
# * Minor refactoring for compatibility with C-PAC
# ORIGINAL WORK'S ATTRIBUTION NOTICE:
# Copyright 2016 NeuroData (http://neurodata.io)
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# graph.py
# Created by Greg Kiar on 2016-01-27.
# Email: gkiar@jhu.edu
# Can be found here:
# https://github.com/neurodata/m2g/blob/v0.1.0/ndmg/graph/graph.py
# Modifications Copyright (C) 2022-2024 C-PAC Developers
# This file is part of C-PAC.
import os
import numpy as np
import nibabel as nib
from CPAC.utils.monitoring.custom_logging import getLogger
logger = getLogger("nuerodata.m2g.ndmg")
[docs]
def ndmg_roi_timeseries(func_file, label_file):
"""
Function to extract average timeseries for the voxels in each
roi of the labelled atlas.
Returns the roi timeseries as a numpy.ndarray.
**Positional Arguments**
func_file:
- the path to the 4d volume to extract timeseries
label_file:
- the path to the labelled atlas containing labels
for the voxels in the fmri image
roits_file:
- the path to where the roi timeseries will be saved. If
None, don't save and just return the roi_timeseries.
# Adapted from ndmg v0.1.1
# Copyright 2016 NeuroData (http://neurodata.io)
"""
labeldata = nib.load(label_file).get_fdata()
# rois are all the nonzero unique values the parcellation can take
rois = np.sort(np.unique(labeldata[labeldata > 0]))
funcdata = nib.load(func_file).get_fdata()
# initialize time series to [numrois]x[numtimepoints]
roi_ts = np.zeros((len(rois), funcdata.shape[3]))
# iterate over the unique possible rois. note that the rois
# could have nonstandard values, so assign indices w enumerate
for idx, roi in enumerate(rois):
roibool = labeldata == roi # get a bool where our voxels in roi
try:
roi_vts = funcdata[roibool, :]
except IndexError as e:
err = (
"\n[!] Error: functional data and ROI mask may not be in "
"the same space or be the same size.\nDetails: "
f"{e}"
)
raise IndexError(err)
# take the mean for the voxel timeseries, and ignore voxels with
# no variance
ts = roi_vts[roi_vts.std(axis=1) != 0, :].mean(axis=0)
if ts.size != 0:
roi_ts[idx, :] = ts
roits_file = os.path.join(os.getcwd(), "timeseries.npz")
np.savez(roits_file, ts=roi_ts, rois=rois)
return (roi_ts, rois, roits_file)
[docs]
class graph(object):
def __init__(self, N, rois, attr=None, sens="dwi"):
"""
Initializes the graph with nodes corresponding to the number of ROIs
**Positional Arguments:**
N:
- Number of rois
rois:
- Set of ROIs as either an array or niftii file)
attr:
- Node or graph attributes. Can be a list. If 1 dimensional
will be interpretted as a graph attribute. If N
dimensional will be interpretted as node attributes. If
it is any other dimensional, it will be ignored.
# Adapted from ndmg v0.1.1
# Copyright 2016 NeuroData (http://neurodata.io)
"""
from collections import defaultdict
import numpy as np
import nibabel as nib
self.N = N
self.edge_dict = defaultdict(int)
self.rois = nib.load(rois).get_fdata()
n_ids = np.unique(self.rois)
self.n_ids = n_ids[n_ids > 0]
self.modal = sens
pass
[docs]
def make_graph(self, streamlines, attr=None):
"""
Takes streamlines and produces a graph
**Positional Arguments:**
streamlines:
- Fiber streamlines either file or array in a dipy EuDX
or compatible format.
"""
from itertools import product
import time
import networkx as nx
import numpy as np
self.g = nx.Graph(
name="Generated by NeuroData's MRI Graphs (ndmg)",
version="0.1.1",
date=time.asctime(time.localtime()),
source="http://m2g.io",
region="brain",
sensor=self.modal,
ecount=0,
vcount=len(self.n_ids),
)
logger.info(self.g.graph)
[str(self.g.add_node(ids)) for ids in self.n_ids]
nlines = np.shape(streamlines)[0]
logger.info("# of Streamlines: %s", nlines)
print_id = np.max((int(nlines * 0.05), 1)) # in case nlines*.05=0
for idx, streamline in enumerate(streamlines):
if (idx % print_id) == 0:
logger.info(idx)
points = np.round(streamline).astype(int)
p = set()
for point in points:
try:
loc = self.rois[point[0], point[1], point[2]]
except IndexError:
pass
else:
pass
if loc:
p.add(loc)
edges = {tuple(sorted(x)) for x in product(p, p)}
for edge in edges:
lst = tuple(sorted([str(node) for node in edge]))
self.edge_dict[lst] += 1
edge_list = [(int(k[0]), int(k[1]), v) for k, v in self.edge_dict.items()]
self.g.add_weighted_edges_from(edge_list)
[docs]
def cor_graph(self, timeseries, attr=None):
"""
Takes timeseries and produces a correlation matrix
**Positional Arguments:**
timeseries:
-the timeseries file to extract correlation for.
dimensions are [numrois]x[numtimesteps].
"""
import numpy as np
ts = timeseries[0] # noqa: F841
rois = timeseries[1]
logger.info("Estimating correlation matrix for %s ROIs...", self.N)
self.g = np.abs(np.corrcoef(timeseries)) # calculate pearson correlation
self.g = np.nan_to_num(self.g).astype(object)
self.n_ids = rois
# roilist = self.g.nodes()
# for (idx_out, roi_out) in enumerate(roilist):
# for (idx_in, roi_in) in enumerate(roilist):
# self.edge_dict[(roi_out, roi_in)] = float(cor[idx_out, idx_in])
# edge_list = [(str(k[0]), str(k[1]), v) for k, v in self.edge_dict.items()]
# self.g.add_weighted_edges_from(edge_list)
return self.g
[docs]
def get_graph(self):
"""Returns the graph object created."""
try:
return self.g
except AttributeError:
logger.error("The graph has not yet been defined.")
pass
[docs]
def as_matrix(self):
"""Returns the graph as a matrix."""
import networkx as nx
g = self.get_graph()
return nx.to_numpy_matrix(g, nodelist=np.sort(g.nodes()).tolist())
[docs]
def save_graph(self, graphname):
"""
Saves the graph to disk
**Positional Arguments:**
graphname:
- Filename for the graph.
"""
import networkx as nx
import numpy as np
if self.modal == "dwi":
self.g.graph["ecount"] = nx.number_of_edges(self.g)
nx.write_weighted_edgelist(self.g, graphname, delimiter=",")
elif self.modal == "func":
np.savetxt(
graphname,
self.g,
comments="",
delimiter=",",
header=",".join([str(n) for n in self.n_ids]),
)
else:
msg = "Unsupported Modality."
raise ValueError(msg)
[docs]
def summary(self):
"""User friendly wrapping and display of graph properties."""
import networkx as nx
logger.info("\n Graph Summary: %s", nx.info(self.g))
[docs]
def ndmg_create_graphs(ts, labels):
out_file = os.path.join(os.getcwd(), "measure-correlation.csv")
connectome = graph(ts.shape[0], labels, sens="func")
connectome.cor_graph(ts)
connectome.save_graph(out_file)
return out_file