"""
Functions in this file adapted from NeuroData group:
STATEMENT OF CHANGES:
This file is derived from sources licensed under the Apache-2.0 terms,
and this file has been changed.
CHANGES:
* Minor refactoring for compatibility with C-PAC
ORIGINAL WORK'S ATTRIBUTION NOTICE:
Copyright 2016 NeuroData (http://neurodata.io)
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
graph.py
Created by Greg Kiar on 2016-01-27.
Email: gkiar@jhu.edu
Can be found here:
https://github.com/neurodata/m2g/blob/v0.1.0/ndmg/graph/graph.py
Modifications Copyright (C) 2022 C-PAC Developers
This file is part of C-PAC.
"""
import os
import nibabel as nb
import numpy as np
[docs]def ndmg_roi_timeseries(func_file, label_file):
"""
Function to extract average timeseries for the voxels in each
roi of the labelled atlas.
Returns the roi timeseries as a numpy.ndarray.
**Positional Arguments**
func_file:
- the path to the 4d volume to extract timeseries
label_file:
- the path to the labelled atlas containing labels
for the voxels in the fmri image
roits_file:
- the path to where the roi timeseries will be saved. If
None, don't save and just return the roi_timeseries.
# Adapted from ndmg v0.1.1
# Copyright 2016 NeuroData (http://neurodata.io)
"""
labeldata = nb.load(label_file).get_fdata()
# rois are all the nonzero unique values the parcellation can take
rois = np.sort(np.unique(labeldata[labeldata > 0]))
funcdata = nb.load(func_file).get_fdata()
# initialize time series to [numrois]x[numtimepoints]
roi_ts = np.zeros((len(rois), funcdata.shape[3]))
# iterate over the unique possible rois. note that the rois
# could have nonstandard values, so assign indices w enumerate
for idx, roi in enumerate(rois):
roibool = labeldata == roi # get a bool where our voxels in roi
try:
roi_vts = funcdata[roibool, :]
except IndexError as e:
err = '\n[!] Error: functional data and ROI mask may not be in ' \
'the same space or be the same size.\nDetails: ' \
'{0}'.format(e)
raise IndexError(err)
# take the mean for the voxel timeseries, and ignore voxels with
# no variance
ts = roi_vts[roi_vts.std(axis=1) != 0, :].mean(axis=0)
if ts.size != 0:
roi_ts[idx, :] = ts
roits_file = os.path.join(os.getcwd(), 'timeseries.npz')
np.savez(roits_file, ts=roi_ts, rois=rois)
return (roi_ts, rois, roits_file)
[docs]class graph(object):
def __init__(self, N, rois, attr=None, sens="dwi"):
"""
Initializes the graph with nodes corresponding to the number of ROIs
**Positional Arguments:**
N:
- Number of rois
rois:
- Set of ROIs as either an array or niftii file)
attr:
- Node or graph attributes. Can be a list. If 1 dimensional
will be interpretted as a graph attribute. If N
dimensional will be interpretted as node attributes. If
it is any other dimensional, it will be ignored.
# Adapted from ndmg v0.1.1
# Copyright 2016 NeuroData (http://neurodata.io)
"""
import numpy as np
import nibabel as nb
from collections import defaultdict
self.N = N
self.edge_dict = defaultdict(int)
self.rois = nb.load(rois).get_fdata()
n_ids = np.unique(self.rois)
self.n_ids = n_ids[n_ids > 0]
self.modal = sens
pass
[docs] def make_graph(self, streamlines, attr=None):
"""
Takes streamlines and produces a graph
**Positional Arguments:**
streamlines:
- Fiber streamlines either file or array in a dipy EuDX
or compatible format.
"""
import time
import numpy as np
import networkx as nx
from itertools import product
self.g = nx.Graph(name="Generated by NeuroData's MRI Graphs (ndmg)",
version='0.1.1',
date=time.asctime(time.localtime()),
source="http://m2g.io",
region="brain",
sensor=self.modal,
ecount=0,
vcount=len(self.n_ids)
)
print(self.g.graph)
[str(self.g.add_node(ids)) for ids in self.n_ids]
nlines = np.shape(streamlines)[0]
print("# of Streamlines: " + str(nlines))
print_id = np.max((int(nlines*0.05), 1)) # in case nlines*.05=0
for idx, streamline in enumerate(streamlines):
if (idx % print_id) == 0:
print(idx)
points = np.round(streamline).astype(int)
p = set()
for point in points:
try:
loc = self.rois[point[0], point[1], point[2]]
except IndexError:
pass
else:
pass
if loc:
p.add(loc)
edges = set([tuple(sorted(x)) for x in product(p, p)])
for edge in edges:
lst = tuple(sorted([str(node) for node in edge]))
self.edge_dict[lst] += 1
edge_list = [(int(k[0]), int(k[1]), v) for k, v in self.edge_dict.items()]
self.g.add_weighted_edges_from(edge_list)
[docs] def cor_graph(self, timeseries, attr=None):
"""
Takes timeseries and produces a correlation matrix
**Positional Arguments:**
timeseries:
-the timeseries file to extract correlation for.
dimensions are [numrois]x[numtimesteps]
"""
import numpy as np
ts = timeseries[0]
rois = timeseries[1]
print("Estimating correlation matrix for {} ROIs...".format(self.N))
self.g = np.abs(np.corrcoef(timeseries)) # calculate pearson correlation
self.g = np.nan_to_num(self.g).astype(object)
self.n_ids = rois
# roilist = self.g.nodes()
# for (idx_out, roi_out) in enumerate(roilist):
# for (idx_in, roi_in) in enumerate(roilist):
# self.edge_dict[(roi_out, roi_in)] = float(cor[idx_out, idx_in])
# edge_list = [(str(k[0]), str(k[1]), v) for k, v in self.edge_dict.items()]
# self.g.add_weighted_edges_from(edge_list)
return self.g
[docs] def get_graph(self):
"""
Returns the graph object created
"""
try:
return self.g
except AttributeError:
print("Error: the graph has not yet been defined.")
pass
[docs] def as_matrix(self):
"""
Returns the graph as a matrix.
"""
import networkx as nx
g = self.get_graph()
return nx.to_numpy_matrix(g, nodelist=np.sort(g.nodes()).tolist())
[docs] def save_graph(self, graphname):
"""
Saves the graph to disk
**Positional Arguments:**
graphname:
- Filename for the graph
"""
import numpy as np
import networkx as nx
if self.modal == 'dwi':
self.g.graph['ecount'] = nx.number_of_edges(self.g)
nx.write_weighted_edgelist(self.g, graphname, delimiter=",")
elif self.modal == 'func':
np.savetxt(graphname, self.g, comments='', delimiter=',',
header=','.join([str(n) for n in self.n_ids]))
else:
raise ValueError("Unsupported Modality.")
pass
[docs] def summary(self):
"""
User friendly wrapping and display of graph properties
"""
import networkx as nx
print("\n Graph Summary:")
print(nx.info(self.g))
pass
[docs]def ndmg_create_graphs(ts, labels):
out_file = os.path.join(os.getcwd(), 'measure-correlation.csv')
connectome = graph(ts.shape[0], labels, sens="func")
conn = connectome.cor_graph(ts)
connectome.save_graph(out_file)
return out_file