Source code for CPAC.utils.strategy

# Copyright (C) 2018-2022  C-PAC Developers

# This file is part of C-PAC.

# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.

# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.

# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
import logging
import six
from CPAC.pipeline.engine import ResourcePool
logger = logging.getLogger('nipype.workflow')


[docs]class Strategy: def __init__(self): self._resource_pool = ResourcePool({}) self.leaf_node = None self.leaf_out_file = None self.name = []
[docs] def append_name(self, name): self.name.append(name)
[docs] def get_name(self): return self.name
[docs] def set_leaf_properties(self, node, out_file): self.leaf_node = node self.leaf_out_file = out_file
[docs] def get_leaf_properties(self): return self.leaf_node, self.leaf_out_file
[docs] def get_resource_pool(self): return self.resource_pool
[docs] def get_nodes_names(self): pieces = [n.split('_') for n in self.name] assert all(p[-1].isdigit() for p in pieces) return ['_'.join(p[:-1]) for p in pieces]
[docs] def get_node_from_resource_pool(self, resource_key): try: return self.resource_pool[resource_key] except: logger.error('No node for output: %s', resource_key) raise
@property def resource_pool(self): '''Strategy's ResourcePool dict''' return self._resource_pool.get_entire_rpool() @property def rpool(self): '''Strategy's ResourcePool''' return self._resource_pool
[docs] def update_resource_pool(self, resources, override=False): for key, value in resources.items(): if key in self.resource_pool and not override: raise Exception( 'Key %s already exists in resource pool, ' 'replacing with %s ' % (key, value) ) self.resource_pool[key] = value
[docs] def get(self, resource_key): return self.resource_pool.get(resource_key)
def __getitem__(self, resource_key): assert isinstance(resource_key, six.string_types) try: return self.resource_pool[resource_key] except: logger.error('No node for output: %s', resource_key) raise def __contains__(self, resource_key): assert isinstance(resource_key, six.string_types) return resource_key in self.resource_pool
[docs] def fork(self): fork = Strategy() fork.resource_pool = dict(self.resource_pool) fork.leaf_node = self.leaf_node fork.out_file = str(self.leaf_out_file) fork.leaf_out_file = str(self.leaf_out_file) fork.name = list(self.name) return fork
[docs] @staticmethod def get_forking_points(strategies): forking_points = [] for strat in strategies: strat_node_names = set(strat.get_nodes_names()) strat_forking = [] for counter_strat in strategies: counter_strat_node_names = set(counter_strat.get_nodes_names()) strat_forking += list(strat_node_names - counter_strat_node_names) strat_forking = list(set(strat_forking)) forking_points += [strat_forking] return forking_points
[docs] @staticmethod def get_forking_labels(strategies): fork_names = [] # fork_points is a list of lists, each list containing node names of # nodes run in that strat/fork that are unique to that strat/fork fork_points = Strategy.get_forking_points(strategies) for fork_point in fork_points: fork_point.sort() fork_name = [] for fork in fork_point: fork_label = '' # TODO: reorganize labels # registration if 'anat_mni_ants_register' in fork: fork_label = 'anat-ants' if 'anat_mni_fnirt_register' in fork: fork_label = 'anat-fnirt' if 'anat_mni_flirt_register' in fork: fork_label = 'anat-flirt' if 'func_to_epi_ants' in fork: fork_label = 'func-ants' if 'func_to_epi_fsl' in fork: fork_label = 'func-fsl' # skullstripping if 'func_preproc_afni' in fork: fork_label = 'func-3dautomask' if 'func_preproc_fsl' in fork: fork_label = 'func-bet' if 'func_preproc_fsl_afni' in fork: fork_label = 'func-bet-3dautomask' # motion correction reference if 'mean' in fork: fork_label = 'mean' if 'median' in fork: fork_label = 'median' if 'selected_volume' in fork: fork_label = 'selected_volume' # motion correction if 'mcflirt' in fork: fork_label += '_func-mcflirt' if '3dvolreg' in fork: fork_label += '_func-3dvolreg' if 'anat_refined' in fork: fork_label = 'func-anat-refined' if 'motion_filter' in fork: fork_label = 'motion-filter' # distortion correction if 'epi_distcorr' in fork: fork_label = 'dist-corr' if 'bbreg' in fork: fork_label = 'bbreg' if 'aroma' in fork: fork_label = 'aroma' if 'nuisance' in fork: fork_label = 'nuisance' if 'frequency_filter' in fork: fork_label = 'freq-filter' if 'gen_motion_stats_before_stc' in fork: fork_label = 'motion_stats_before_stc' if 'despike' in fork: fork_label = 'despike' if 'slice' in fork: fork_label = 'slice' if 'anat_preproc_afni' in fork: fork_label = 'anat-afni' if 'anat_preproc_bet' in fork: fork_label = 'anat-bet' if 'anat_preproc_ants' in fork: fork_label = 'anat-ants' if 'anat_preproc_unet' in fork: fork_label = 'anat-unet' fork_name += [fork_label] fork_names.append('_'.join(sorted(set(fork_name)))) return dict(zip(strategies, fork_names))