Nodes#
A Nipype Node has an initialization parameter mem_gb that differs from the commandline option --mem_gb. While the commandline option is a limit, the Node initialization parameter is an estimate of the most memory that Node will consume when run. The Node parameter is not a limit; rather, this value is used to allocate system resources at runtime.
Conversely, the commandline option --n_cpus is a limit, and the Node initialization parameter n_procs is likewise a limit on the maximum number of threads a Node will be permitted to consume.
C-PAC automatically creates a JSON-like file called callback.log (via the function log_nodes_cb) when running. For each Node, this file includes:
- estimated memory,
- memory usage observed at runtime,
- specified maximum number of threads per Node, and
- threads used at runtime.
A callback.log can be provided to the pipeline configuration file (see Computer Settings) or with the commandline flag --runtime_usage. If a callback log is provided in the pipeline configuration, nodes with names that match nodes recorded in that log will have their memory estimates overridden by the values in the callback log plus a buffer percentage (provided with the --runtime_buffer flag or in the pipeline configuration file).
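For illustration, here is a minimal sketch of reading a callback.log back into Python, one JSON dictionary per line. The field names below (estimated_memory_gb, runtime_memory_gb, etc.) are assumed from Nipype's log_nodes_cb output and should be checked against an actual log:

```python
import json

def read_callback_log(lines):
    """Parse callback.log content: one JSON dict per line, one per node event."""
    return [json.loads(line) for line in lines if line.strip()]

# Hypothetical two-line log for a single node (field names assumed from
# Nipype's log_nodes_cb; verify against a real callback.log).
sample = [
    '{"id": "wf.smooth", "estimated_memory_gb": 3.0, "num_threads": 1, "start": "2021-01-01T00:00:00"}',
    '{"id": "wf.smooth", "runtime_memory_gb": 2.4, "runtime_threads": 1, "finish": "2021-01-01T00:01:00"}',
]
entries = read_callback_log(sample)

# Collapse the start/finish records into one summary per node id.
summary = {}
for entry in entries:
    summary.setdefault(entry["id"], {}).update(entry)
print(summary["wf.smooth"]["runtime_memory_gb"])  # observed usage at runtime
```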
When a developer creates or modifies a Node in C-PAC, mem_gb and n_procs arguments should be provided unless the respective defaults of 0.2 and None (number of available system cores) are expected to be sufficient. When testing, the mem_gb and n_procs arguments should be adjusted if the observed memory and/or thread usage of a Node exceeds the estimate.
For Nodes that will use a varying amount of memory depending on their input data, the optional parameter mem_x takes a tuple of (memory multiplier, input file, multiplier mode), where memory multiplier is a number and input file is the string name of the Node input to multiply, such that the memory estimate returned by the mem_gb attribute is the mem_gb argument plus memory multiplier times the dimensions of the input file as specified in the multiplier mode (xyzt (spatial × temporal; default), xyz (spatial), or just t (temporal)).
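The arithmetic above can be sketched as follows (a standalone illustration with hypothetical numbers, not C-PAC's actual implementation):

```python
def estimate_mem_gb(mem_gb, multiplier, dims, mode="xyzt"):
    """Sketch of the mem_x arithmetic: constant estimate plus
    multiplier times the selected dimensions of the input file."""
    x, y, z, t = dims
    scale = {"xyzt": x * y * z * t, "xyz": x * y * z, "t": t}[mode]
    return mem_gb + multiplier * scale

# Hypothetical 4-D BOLD image: 91 x 109 x 91 voxels, 200 timepoints,
# with a constant estimate of 0.5 GB and a multiplier of 2e-8 GB per voxel-timepoint.
print(estimate_mem_gb(0.5, 2e-8, (91, 109, 91, 200)))
```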
Note
mem_x is a new parameter in C-PAC v1.8.1 and subject to change in future releases as we continue to develop methods for setting data- and operation-dependent memory estimates.
- class CPAC.pipeline.nipype_pipeline_engine.Node(name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, n_procs=None, mem_gb=2.0, *, mem_x, **kwargs)[source]#
Wraps interface objects for use in pipeline
A Node creates a sandbox-like directory for executing the underlying interface. It will copy or link inputs into this directory to ensure that input data are not overwritten. A hash of the input state is used to determine if the Node inputs have changed and whether the node needs to be re-executed.
Examples
>>> from nipype import Node
>>> from nipype.interfaces import spm
>>> realign = Node(spm.Realign(), 'realign')
>>> realign.inputs.in_files = 'functional.nii'
>>> realign.inputs.register_to_mean = True
>>> realign.run()
- __init__(name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, n_procs=None, mem_gb=2.0, *, mem_x, **kwargs)[source]#
- Parameters
interface (interface object) – node specific interface (fsl.Bet(), spm.Coregister())
name (alphanumeric string) – node specific name
iterables (generator) –
Input field and list to iterate using the pipeline engine; for example, to iterate over different frac values in fsl.Bet(). For a single field the input can be a tuple; otherwise, a list of tuples:
node.iterables = ('frac', [0.5, 0.6, 0.7])
node.iterables = [('fwhm', [2, 4]), ('fieldx', [0.5, 0.6, 0.7])]
If this node has an itersource, then the iterables values is a dictionary which maps an iterable source field value to the target iterables field values, e.g.:
inputspec.iterables = ('images', ['img1.nii', 'img2.nii'])
node.itersource = ('inputspec', ['frac'])
node.iterables = ('frac', {'img1.nii': [0.5, 0.6],
                           'img2.nii': [0.6, 0.7]})
If this node's synchronize flag is set, then an alternate form of the iterables is a [fields, values] list, where fields is the list of iterated fields and values is the list of value tuples for the given fields, e.g.:
node.synchronize = True
node.iterables = [('frac', 'threshold'),
                  [(0.5, True),
                   (0.6, False)]]
itersource (tuple) – The (name, fields) iterables source which specifies the name of the predecessor iterable node and the input fields to use from that source node. The output field values comprise the key to the iterables parameter value mapping dictionary.
synchronize (boolean) – Flag indicating whether iterables are synchronized. If the iterables are synchronized, then this iterable node is expanded once per iteration over all of the iterables values. Otherwise, this iterable node is expanded once per each permutation of the iterables values.
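The difference between synchronized and unsynchronized expansion can be sketched in plain Python (an illustration of the expansion counts only, not engine code):

```python
from itertools import product

iterables = {"frac": [0.5, 0.6, 0.7], "fwhm": [2, 4]}

# Unsynchronized: one expansion per permutation of the iterables values.
permutations = list(product(*iterables.values()))
print(len(permutations))  # 3 * 2 = 6 expansions

# Synchronized: one expansion per iteration over paired values.
synchronized = list(zip([0.5, 0.6], [True, False]))
print(len(synchronized))  # 2 expansions
```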
overwrite (Boolean) – Whether to overwrite contents of output directory if it already exists. If the directory exists and the hash matches, it is assumed that the process has been executed.
needed_outputs (list of output_names) – Force the node to keep only specific outputs. By default all outputs are kept. Setting this attribute will delete any output files and directories from the node’s working directory that are not part of the needed_outputs.
run_without_submitting (boolean) – Run the node without submitting to a job engine or to a multiprocessing pool
mem_gb (int or float) – Estimate (in GB) of constant memory to allocate for this node.
mem_x (2-tuple or 3-tuple) –
(multiplier, input_file) (int or float, str)
(multiplier, input_file, mode) (int or float, str, str)
Multiplier for memory allocation such that multiplier times the mode of the 4-D file at input_file plus self._mem_gb equals the total memory allocation for the node. input_file can be a Node input string or an actual path. mode can be any one of 'xyzt' (spatial × temporal; default if not specified), 'xyz' (spatial), or 't' (temporal).
Note
This parameter (mem_x) is likely to change in a future release as we incorporate more factors into memory estimates.
See also
⚡️ Setting data- and operation-dependent memory-estimates – GitHub epic of issues related to improving Node memory estimates based on the data and operations involved.
- base_dir#
Define the work directory for this instance of workflow element.
- clone(name)#
Clone an EngineBase object.
- Parameters
name (string (mandatory)) – A clone of node or workflow must have a new name
- property fullname#
Build the full name down the hierarchy.
- hash_exists(updatehash=False)[source]#
Decorate the new is_cached method with hash updating to maintain backwards compatibility.
- property inputs#
Return the inputs of the underlying interface
- property interface#
Return the underlying interface object
- is_cached(rm_outdated=False)[source]#
Check if the interface has been run previously, and whether cached results are up-to-date.
- property itername#
Get the name of the expanded iterable.
- static load(filename)#
Load this workflow element from a file.
- property mem_gb#
Get estimated memory (GB)
- property mem_x#
Get dict of 'multiplier' (memory multiplier), 'file' (input file) and multiplier mode (spatial × temporal, spatial only, or temporal only). Returns None if already consumed or not set.
- property n_procs#
Get the estimated number of processes/threads
- property name#
Set the unique name of this workflow element.
- property outputs#
Return the output fields of the underlying interface
- override_mem_gb(new_mem_gb)[source]#
Override the Node’s memory estimate with a new value.
- Parameters
new_mem_gb (int or float) – new memory estimate in GB
- property result#
Get result from result file (do not hold it in memory)
- run(updatehash=False)[source]#
Execute the node in its directory.
- Parameters
updatehash (boolean) – When the hash stored in the output directory as a result of a previous run does not match that calculated for this execution, updatehash=True only updates the hash without re-running.
- save(filename=None)#
Store this workflow element to a file.
- class CPAC.pipeline.nipype_pipeline_engine.MapNode(name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, n_procs=None, mem_gb=2.0, **kwargs)[source]#
Wraps interface objects that need to be iterated on a list of inputs.
Examples
>>> from nipype import MapNode
>>> from nipype.interfaces import fsl
>>> realign = MapNode(fsl.MCFLIRT(), 'in_file', 'realign')
>>> realign.inputs.in_file = ['functional.nii',
...                           'functional2.nii',
...                           'functional3.nii']
>>> realign.run()
- __init__(name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, n_procs=None, mem_gb=2.0, **kwargs)[source]#
- Parameters
interface (interface object) – node specific interface (fsl.Bet(), spm.Coregister())
name (alphanumeric string) – node specific name
iterables (generator) –
Input field and list to iterate using the pipeline engine; for example, to iterate over different frac values in fsl.Bet(). For a single field the input can be a tuple; otherwise, a list of tuples:
node.iterables = ('frac', [0.5, 0.6, 0.7])
node.iterables = [('fwhm', [2, 4]), ('fieldx', [0.5, 0.6, 0.7])]
If this node has an itersource, then the iterables values is a dictionary which maps an iterable source field value to the target iterables field values, e.g.:
inputspec.iterables = ('images', ['img1.nii', 'img2.nii'])
node.itersource = ('inputspec', ['frac'])
node.iterables = ('frac', {'img1.nii': [0.5, 0.6],
                           'img2.nii': [0.6, 0.7]})
If this node's synchronize flag is set, then an alternate form of the iterables is a [fields, values] list, where fields is the list of iterated fields and values is the list of value tuples for the given fields, e.g.:
node.synchronize = True
node.iterables = [('frac', 'threshold'),
                  [(0.5, True),
                   (0.6, False)]]
itersource (tuple) – The (name, fields) iterables source which specifies the name of the predecessor iterable node and the input fields to use from that source node. The output field values comprise the key to the iterables parameter value mapping dictionary.
synchronize (boolean) – Flag indicating whether iterables are synchronized. If the iterables are synchronized, then this iterable node is expanded once per iteration over all of the iterables values. Otherwise, this iterable node is expanded once per each permutation of the iterables values.
overwrite (Boolean) – Whether to overwrite contents of output directory if it already exists. If the directory exists and the hash matches, it is assumed that the process has been executed.
needed_outputs (list of output_names) – Force the node to keep only specific outputs. By default all outputs are kept. Setting this attribute will delete any output files and directories from the node’s working directory that are not part of the needed_outputs.
run_without_submitting (boolean) – Run the node without submitting to a job engine or to a multiprocessing pool
mem_gb (int or float) – Estimate (in GB) of constant memory to allocate for this node.
mem_x (2-tuple or 3-tuple) –
(multiplier, input_file) (int or float, str)
(multiplier, input_file, mode) (int or float, str, str)
Multiplier for memory allocation such that multiplier times the mode of the 4-D file at input_file plus self._mem_gb equals the total memory allocation for the node. input_file can be a Node input string or an actual path. mode can be any one of 'xyzt' (spatial × temporal; default if not specified), 'xyz' (spatial), or 't' (temporal).
Note
This parameter (mem_x) is likely to change in a future release as we incorporate more factors into memory estimates.
See also
⚡️ Setting data- and operation-dependent memory-estimates – GitHub epic of issues related to improving Node memory estimates based on the data and operations involved.
- base_dir#
Define the work directory for this instance of workflow element.
- clone(name)#
Clone an EngineBase object.
- Parameters
name (string (mandatory)) – A clone of node or workflow must have a new name
- property fullname#
Build the full name down the hierarchy.
- get_output(parameter)#
Retrieve a particular output of the node
- hash_exists(updatehash=False)#
Decorate the new is_cached method with hash updating to maintain backwards compatibility.
- help()#
Print interface help
- property inputs#
Return the inputs of the underlying interface
- property interface#
Return the underlying interface object
- is_cached(rm_outdated=False)#
Check if the interface has been run previously, and whether cached results are up-to-date.
- property itername#
Get the name of the expanded iterable.
- static load(filename)#
Load this workflow element from a file.
- property mem_gb#
Get estimated memory (GB)
- property mem_x#
Get dict of 'multiplier' (memory multiplier), 'file' (input file) and multiplier mode (spatial × temporal, spatial only, or temporal only). Returns None if already consumed or not set.
- property n_procs#
Get the estimated number of processes/threads
- property name#
Set the unique name of this workflow element.
- output_dir()#
Return the location of the output directory for the node
- property outputs#
Return the output fields of the underlying interface
- override_mem_gb(new_mem_gb)#
Override the Node’s memory estimate with a new value.
- Parameters
new_mem_gb (int or float) – new memory estimate in GB
- property result#
Get result from result file (do not hold it in memory)
- run(updatehash=False)#
Execute the node in its directory.
- Parameters
updatehash (boolean) – When the hash stored in the output directory as a result of a previous run does not match that calculated for this execution, updatehash=True only updates the hash without re-running.
- save(filename=None)#
Store this workflow element to a file.
- set_input(parameter, val)[source]#
Set interface input value or nodewrapper attribute. Priority goes to interface.
- update(**opts)#
Update inputs
- class nipype.interfaces.base.core.Interface[source]#
This is an abstract definition for Interface objects.
It provides no functionality. It defines the necessary attributes and methods all Interface objects should have.
- property always_run#
Should the interface be always run even if the inputs were not changed? Only applies to interfaces being run within a workflow context.
- property can_resume#
Defines if the interface can reuse partial results after interruption. Only applies to interfaces being run within a workflow context.
- property version#
Interfaces should implement a version property.
- class CPAC.utils.interfaces.function.Function(input_names=None, output_names='out', function=None, imports=None, as_module=False, **inputs)[source]#
Runs arbitrary function as an interface
Examples
>>> func = 'def func(arg1, arg2=5): return arg1 + arg2'
>>> fi = Function(input_names=['arg1', 'arg2'], output_names=['out'])
>>> fi.inputs.function_str = func
>>> res = fi.run(arg1=1)
>>> res.outputs.out
6
- __init__(input_names=None, output_names='out', function=None, imports=None, as_module=False, **inputs)[source]#
- Parameters
input_names (single str or list or None) – names corresponding to function inputs; if None, derive input names from function argument names
output_names (single str or list) – names corresponding to function outputs (default: 'out'); if a list of length > 1, it has to match the number of outputs
function (callable) – callable python object. must be able to execute in an isolated namespace (possibly in concert with the imports parameter)
imports (list of strings) – list of import statements that allow the function to execute in an otherwise empty namespace
- aggregate_outputs(runtime=None, needed_outputs=None)#
Collate expected outputs and apply output traits validation.
- property always_run#
Should the interface be always run even if the inputs were not changed? Only applies to interfaces being run within a workflow context.
- property can_resume#
Defines if the interface can reuse partial results after interruption. Only applies to interfaces being run within a workflow context.
- classmethod help(returnhelp=False)#
Prints class help
- input_spec#
alias of CPAC.utils.interfaces.function.function.FunctionInputSpec
- load_inputs_from_json(json_file, overwrite=True)#
A convenient way to load pre-set inputs from a JSON file.
- output_spec#
alias of nipype.interfaces.base.specs.DynamicTraitedSpec
- run(cwd=None, ignore_exception=None, **inputs)#
Execute this interface.
This interface will not raise an exception if runtime.returncode is non-zero.
- Parameters
cwd (specify a folder where the interface should be run) –
inputs (allows the interface settings to be updated) –
- Returns
results – A copy of the instance that was executed, provenance information and, if successful, results
- Return type
nipype.interfaces.base.support.InterfaceResult
- save_inputs_to_json(json_file)#
A convenient way to save current inputs to a JSON file.
- property version#
Interfaces should implement a version property.
- class CPAC.pipeline.nipype_pipeline_engine.Workflow(name, base_dir=None, debug=False)[source]#
Controls the setup and execution of a pipeline of processes.
- __init__(name, base_dir=None, debug=False)[source]#
Create a workflow object.
- Parameters
name (alphanumeric string) – unique identifier for the workflow
base_dir (string, optional) – path to workflow storage
debug (boolean, optional) – enable verbose debug-level logging
- add_nodes(nodes)[source]#
Add nodes to a workflow
- Parameters
nodes (list) – A list of EngineBase-based objects
- base_dir#
Define the work directory for this instance of workflow element.
- clone(name)[source]#
Clone a workflow
Note
Will reset attributes used for executing workflow. See _init_runtime_fields.
- Parameters
name (alphanumeric name) – unique name for the workflow
- connect(*args, **kwargs)[source]#
Connect nodes in the pipeline.
This routine also checks if inputs and outputs are actually provided by the nodes that are being connected.
Creates edges in the directed graph using the nodes and edges specified in the connection_list. Uses the NetworkX method DiGraph.add_edges_from.
- Parameters
args (list or a set of four positional arguments) –
Four positional arguments of the form:
connect(source, sourceoutput, dest, destinput)
source : nodewrapper node
sourceoutput : string (must be in source.outputs)
dest : nodewrapper node
destinput : string (must be in dest.inputs)
A list of 3-tuples of the following form:
[(source, target, [('sourceoutput/attribute', 'targetinput'), ...]), ...]
Or:
[(source, target, [(('sourceoutput1', func, arg2, ...), 'targetinput'), ...]), ...]
sourceoutput1 will always be the first argument to func, and func will be evaluated and the results sent to targetinput. Currently, func needs to define all its needed imports within the function, as we use the inspect module to get at the source code and execute it remotely.
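The func form of the connection list can be illustrated in plain Python (a sketch of the data shape and delivery semantics; node and field names here are hypothetical):

```python
def scale(value, factor):
    """Hypothetical transformation applied to a source output in transit."""
    return value * factor

# Shape of a connection list: (source, target, [(output_spec, target_input), ...]).
# An output_spec is either a plain output name or a (name, func, *args) tuple.
connections = [
    ("smooth_node", "stats_node", [("smoothed_file", "in_file")]),
    ("smooth_node", "stats_node", [(("fwhm", scale, 2), "doubled_fwhm")]),
]

# Emulate delivery of the func form: the source output is func's first argument.
output_spec, target_input = connections[1][2][0]
name, func, *extra = output_spec
delivered = func(4, *extra)  # source output 'fwhm' value 4, scaled by 2
print(target_input, delivered)
```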
- export(filename=None, prefix='output', format='python', include_config=False)[source]#
Export object into a different format
- Parameters
filename (string) – file to save the code to; overrides prefix
prefix (string) – prefix to use for output file
format (string) – one of “python”
include_config (boolean) – whether to include node and workflow config values
- property fullname#
Build the full name down the hierarchy.
- property itername#
Get the name of the expanded iterable.
- static load(filename)#
Load this workflow element from a file.
- property name#
Set the unique name of this workflow element.
- remove_nodes(nodes)[source]#
Remove nodes from a workflow
- Parameters
nodes (list) – A list of EngineBase-based objects
- run(plugin=None, plugin_args=None, updatehash=False)[source]#
Execute the workflow
- Parameters
plugin (plugin name or object) – Plugin to use for execution. You can create your own plugins for execution.
plugin_args (dictionary containing arguments to be sent to plugin) – constructor. see individual plugin doc strings for details.
- save(filename=None)#
Store this workflow element to a file.
- write_graph(dotfilename='graph.dot', graph2use='hierarchical', format='png', simple_form=True)[source]#
Generates a graphviz dot file and a png file
- Parameters
graph2use ('orig', 'hierarchical' (default), 'flat', 'exec', 'colored') – orig - creates a top level graph without expanding internal workflow nodes; flat - expands workflow nodes recursively; hierarchical - expands workflow nodes recursively with a notion on hierarchy; colored - expands workflow nodes recursively with a notion on hierarchy in color; exec - expands workflows to depict iterables
format ('png', 'svg') –
simple_form (boolean (default: True)) – Determines if the node name used in the graph should be of the form ‘nodename (package)’ when True or ‘nodename.Class.package’ when False.
The Nipype utility function log_to_dict reads a log file generated by log_nodes_cb into a Python dictionary.
- CPAC.utils.monitoring.log_nodes_cb(node, status)[source]#
Function to record node run statistics to a log file as json dictionaries
- Parameters
node (nipype.pipeline.engine.Node) – the node being logged
status (string) – acceptable values are 'start', 'end'; otherwise it is considered an error
- Returns
this function does not return any values; it logs the node status info to the callback logger
- Return type
None
Module to draw an html gantt chart from a logfile produced by CPAC.utils.monitoring.log_nodes_cb().
See https://nipype.readthedocs.io/en/latest/api/generated/nipype.utils.draw_gantt_chart.html
- CPAC.utils.monitoring.draw_gantt_chart.calculate_resource_timeseries(events, resource)[source]#
Given an event dictionary, calculate the resources used as a timeseries
- Parameters
events (dictionary) – a dictionary of event-based node dictionaries of the workflow execution statistics
resource (string) – the resource of interest to return the time-series of; e.g. ‘runtime_memory_gb’, ‘estimated_threads’, etc
- Returns
time_series – a pandas Series object that contains timestamps as the indices and the resource amount as values
- Return type
pandas Series
- CPAC.utils.monitoring.draw_gantt_chart.create_event_dict(start_time, nodes_list)[source]#
Function to generate a dictionary of event (start/finish) nodes from the nodes list
- Parameters
start_time (datetime.datetime) – a datetime object of the pipeline start time
nodes_list (list) – a list of the node dictionaries that were run in the pipeline
- Returns
events – a dictionary where the key is the timedelta from the start of the pipeline execution to the value node it accompanies
- Return type
dictionary
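The event-dictionary idea can be sketched in plain Python (illustrative field names; each start/finish event is keyed by its timedelta from pipeline start):

```python
from datetime import datetime, timedelta

def create_events(start_time, nodes_list):
    """Key each node's start and finish event by its offset from pipeline start."""
    events = {}
    for node in nodes_list:
        for key in ("start", "finish"):
            offset = node[key] - start_time
            events[offset] = {**node, "event": key}
    return events

# Hypothetical single-node run: starts 5 s in, finishes at 65 s.
start = datetime(2021, 1, 1, 0, 0, 0)
nodes = [{"id": "wf.smooth",
          "start": start + timedelta(seconds=5),
          "finish": start + timedelta(seconds=65)}]
events = create_events(start, nodes)
print(sorted(events.keys()))
```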
- CPAC.utils.monitoring.draw_gantt_chart.draw_nodes(start, nodes_list, cores, minute_scale, space_between_minutes, colors)[source]#
Function to return the html-string of the node drawings for the gantt chart
- Parameters
start (datetime.datetime obj) – start time for first node
nodes_list (list) – a list of the node dictionaries
cores (integer) – the number of cores given to the workflow via the ‘n_procs’ plugin arg
total_duration (float) – total duration of the workflow execution (in seconds)
minute_scale (integer) – the scale, in minutes, at which to plot line markers for the gantt chart; for example, minute_scale=10 means there are lines drawn at every 10 minute interval from start to finish
space_between_minutes (integer) – scale factor in pixel spacing between minute line markers
colors (list) – a list of colors to choose from when coloring the nodes in the gantt chart
- Returns
result – the html-formatted string for producing the minutes-based time line markers
- Return type
string
- CPAC.utils.monitoring.draw_gantt_chart.generate_gantt_chart(logfile, cores, minute_scale=10, space_between_minutes=50, colors=['#7070FF', '#4E4EB2', '#2D2D66', '#9B9BFF'])[source]#
Generates a gantt chart in html showing the workflow execution based on a callback log file. This script was intended to be used with the MultiprocPlugin. The following code shows how to set up the workflow in order to generate the log file:
- Parameters
logfile (string) – filepath to the callback log file to plot the gantt chart of
cores (integer) – the number of cores given to the workflow via the ‘n_procs’ plugin arg
minute_scale (integer (optional); default=10) – the scale, in minutes, at which to plot line markers for the gantt chart; for example, minute_scale=10 means there are lines drawn at every 10 minute interval from start to finish
space_between_minutes (integer (optional); default=50) – scale factor in pixel spacing between minute line markers
colors (list (optional)) – a list of colors to choose from when coloring the nodes in the gantt chart
- Returns
None – the function does not return any value but writes out an html file in the same directory as the callback log path passed in
Usage
-----
import logging
import logging.handlers
from nipype.utils.profiler import log_nodes_cb

log_filename = 'callback.log'
logger = logging.getLogger('callback')
logger.setLevel(logging.DEBUG)
handler = logging.FileHandler(log_filename)
logger.addHandler(handler)

# create workflow
workflow = ...

workflow.run(plugin='MultiProc',
             plugin_args={'n_procs': 8, 'memory': 12,
                          'status_callback': log_nodes_cb})

generate_gantt_chart('callback.log', 8)
- CPAC.utils.monitoring.draw_gantt_chart.resource_overusage_report(cblog)[source]#
Function to parse through a callback log for memory and/or thread usage above estimates / limits.
- Parameters
cblog (str) – path to callback.log
- Returns
text_report (str)
excessive (dict)
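The kind of check such a report performs can be sketched as follows (an illustration using assumed callback.log field names; not the actual C-PAC implementation):

```python
def find_overusage(entries, buffer_pct=0.0):
    """Flag nodes whose observed memory exceeds the (buffered) estimate."""
    excessive = {}
    for entry in entries:
        est = entry.get("estimated_memory_gb")
        used = entry.get("runtime_memory_gb")
        if est is not None and used is not None and used > est * (1 + buffer_pct / 100):
            excessive[entry["id"]] = {"estimated": est, "observed": used}
    return excessive

# Hypothetical parsed callback.log entries: one node within its estimate,
# one exceeding it even with a 10% buffer.
entries = [
    {"id": "wf.smooth", "estimated_memory_gb": 3.0, "runtime_memory_gb": 2.4},
    {"id": "wf.warp", "estimated_memory_gb": 1.0, "runtime_memory_gb": 1.7},
]
print(find_overusage(entries, buffer_pct=10))
```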
- CPAC.utils.monitoring.draw_gantt_chart.resource_report(callback_log, num_cores, logger=None)[source]#
Function to attempt to warn any excessive resource usage and generate an interactive HTML chart.
- Parameters
callback_log (str) – path to callback.log
num_cores (int) –
logger (Logger) – https://docs.python.org/3/library/logging.html#logger-objects
- Returns
- Return type
None