Nodes
Warning
A Nipype Node has an initialization parameter mem_gb that differs from the commandline option --mem_gb. While the commandline option is a limit, the Node initialization parameter is an estimate of the most memory that Node will consume when run. The Node parameter is not a limit; rather, this value is used to allocate system resources at runtime.
In contrast, the commandline option --n_cpus is a limit, and the Node initialization parameter n_procs is also a limit: the maximum number of threads a Node will be permitted to consume.
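The difference between an estimate and a limit can be illustrated with a minimal sketch of how per-Node declarations might be used to decide which Nodes start. This is not C-PAC's or Nipype's actual scheduler; `NodeEstimate` and `pick_runnable` are hypothetical names introduced only for illustration.

```python
from dataclasses import dataclass


@dataclass
class NodeEstimate:
    """Hypothetical stand-in for a Node's resource declarations."""
    name: str
    mem_gb: float  # estimate of peak memory; nothing enforces it at runtime
    n_procs: int   # maximum number of threads the Node may use


def pick_runnable(pending, free_mem_gb, free_procs):
    """Greedily start Nodes whose declared resources fit in what is free.

    free_mem_gb and free_procs stand for the global limits minus whatever
    the already running Nodes have reserved.
    """
    started = []
    for node in pending:
        if node.mem_gb <= free_mem_gb and node.n_procs <= free_procs:
            started.append(node.name)
            free_mem_gb -= node.mem_gb
            free_procs -= node.n_procs
    return started, free_mem_gb, free_procs


pending = [
    NodeEstimate("warp", mem_gb=6.0, n_procs=4),
    NodeEstimate("smooth", mem_gb=3.0, n_procs=1),
    NodeEstimate("mask", mem_gb=2.0, n_procs=1),
]
started, mem_left, procs_left = pick_runnable(pending, free_mem_gb=8.0, free_procs=6)
print(started, mem_left, procs_left)
```

In this sketch, a Node whose mem_gb estimate is too low would still be started next to other Nodes, so the real memory use could exceed the global limit even though every reservation fit. That is why the estimate should reflect the most memory the Node is expected to consume.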
C-PAC automatically creates a JSON-like file called callback.log (via the function log_nodes_cb) when running. This file includes for each Node:
estimated memory,
memory usage observed at runtime,
specified maximum number of threads per Node, and
threads used at runtime.
The Nipype utility function log_to_dict reads a log file generated by log_nodes_cb to a Python dictionary.
When a developer creates or modifies a Node in C-PAC, mem_gb and n_procs arguments should be provided unless the respective defaults of 2.0 and None (number of available system cores) are expected to be sufficient. When testing, the mem_gb and n_procs arguments should be adjusted if the observed memory and/or thread usage of a Node exceeds the estimate.
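As a rough illustration of checking estimates against observations, the sketch below parses a JSON-lines log and flags Nodes whose observed memory exceeded the estimate. It uses only the standard library rather than the Nipype utility, and the field names (`id`, `runtime_memory_gb`, `estimated_memory_gb`) and the sample entries are assumptions; check them against a real callback.log before relying on them.

```python
import json


def read_callback_log(text):
    """Parse one JSON object per non-empty line; skip lines that are not JSON."""
    entries = []
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            entries.append(json.loads(line))
        except json.JSONDecodeError:
            continue
    return entries


def over_estimate(entries):
    """Return {node id: (observed GB, estimated GB)} where observed > estimated."""
    flagged = {}
    for entry in entries:
        observed = entry.get("runtime_memory_gb")
        estimated = entry.get("estimated_memory_gb")
        both_numeric = isinstance(observed, (int, float)) and isinstance(
            estimated, (int, float)
        )
        if both_numeric and observed > estimated:
            flagged[entry.get("id", "unknown")] = (observed, estimated)
    return flagged


# Hypothetical log excerpt; the field names are assumed, not taken from C-PAC.
sample = """
{"id": "wf.resample", "start": "2021-01-01T00:00:00", "estimated_memory_gb": 2.0}
{"id": "wf.resample", "finish": "2021-01-01T00:01:00", "runtime_memory_gb": 3.4, "estimated_memory_gb": 2.0}
{"id": "wf.mask", "finish": "2021-01-01T00:02:00", "runtime_memory_gb": 0.5, "estimated_memory_gb": 2.0}
"""
flagged = over_estimate(read_callback_log(sample))
print(flagged)
```

A Node that shows up in `flagged` is a candidate for a larger mem_gb argument.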
-
class CPAC.pipeline.nipype_pipeline_engine.Node(interface, name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, n_procs=None, *, mem_gb=2.0, **kwargs)
Wraps interface objects for use in a pipeline.
A Node creates a sandbox-like directory for executing the underlying interface. It will copy or link inputs into this directory to ensure that input data are not overwritten. A hash of the input state is used to determine if the Node inputs have changed and whether the node needs to be re-executed.
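The idea of hashing the input state to decide on re-execution can be sketched as follows. This is a simplified illustration, not Nipype's implementation, which is more involved; `input_hash` and `needs_rerun` are hypothetical helpers.

```python
import hashlib
import json


def input_hash(inputs):
    """Hash a dict of inputs deterministically (sorted keys, JSON-encoded)."""
    encoded = json.dumps(inputs, sort_keys=True).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


def needs_rerun(inputs, stored_hash):
    """Rerun when no hash was stored or the current inputs hash differently."""
    return stored_hash is None or input_hash(inputs) != stored_hash


inputs = {"in_files": "functional.nii", "register_to_mean": True}
stored = input_hash(inputs)  # pretend a previous run saved this value

unchanged = needs_rerun(inputs, stored)
changed = needs_rerun(dict(inputs, register_to_mean=False), stored)
print(unchanged, changed)
```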
Examples
>>> from nipype import Node
>>> from nipype.interfaces import spm
>>> realign = Node(spm.Realign(), 'realign')
>>> realign.inputs.in_files = 'functional.nii'
>>> realign.inputs.register_to_mean = True
>>> realign.run()
- Attributes
fullname: Build the full name down the hierarchy.
inputs: Return the inputs of the underlying interface
interface: Return the underlying interface object
itername: Get the name of the expanded iterable.
mem_gb: Get estimated memory (GB)
n_procs: Get the estimated number of processes/threads
name: Set the unique name of this workflow element.
needed_outputs
outputs: Return the output fields of the underlying interface
result: Get result from result file (do not hold it in memory)
Methods
clone(name): Clone an EngineBase object.
get_output(parameter): Retrieve a particular output of the node
hash_exists([updatehash]): Decorate the new is_cached method with hash updating to maintain backwards compatibility.
help(): Print interface help
is_cached([rm_outdated]): Check if the interface has been run previously, and whether cached results are up-to-date.
load(filename): Load this workflow element from a file.
output_dir(): Return the location of the output directory for the node
run([updatehash]): Execute the node in its directory.
save([filename]): Store this workflow element to a file.
set_input(parameter, val): Set interface input value
update(**opts): Update inputs
-
base_dir
Define the work directory for this instance of workflow element.
-
clone(name)
Clone an EngineBase object.
- Parameters
- name: string (mandatory)
A clone of node or workflow must have a new name
-
property fullname
Build the full name down the hierarchy.
-
hash_exists(updatehash=False)
Decorate the new is_cached method with hash updating to maintain backwards compatibility.
-
property inputs
Return the inputs of the underlying interface
-
property interface
Return the underlying interface object
-
is_cached(rm_outdated=False)
Check if the interface has been run previously, and whether cached results are up-to-date.
-
property itername
Get the name of the expanded iterable.
-
static load(filename)
Load this workflow element from a file.
-
property mem_gb
Get estimated memory (GB)
-
property n_procs
Get the estimated number of processes/threads
-
property name
Set the unique name of this workflow element.
-
property outputs
Return the output fields of the underlying interface
-
property result
Get result from result file (do not hold it in memory)
-
run(updatehash=False)
Execute the node in its directory.
- Parameters
- updatehash: boolean
When the hash stored in the output directory as a result of a previous run does not match that calculated for this execution, updatehash=True only updates the hash without re-running.
-
save(filename=None)
Store this workflow element to a file.
-
class CPAC.pipeline.nipype_pipeline_engine.MapNode(interface, iterfield, name, serial=False, nested=False, **kwargs)
Wraps interface objects that need to be iterated on a list of inputs.
Default resource values: mem_gb=2.0, n_procs=1.
Examples
>>> from nipype import MapNode
>>> from nipype.interfaces import fsl
>>> realign = MapNode(fsl.MCFLIRT(), 'in_file', 'realign')
>>> realign.inputs.in_file = ['functional.nii',
...                           'functional2.nii',
...                           'functional3.nii']
>>> realign.run()
- Attributes
fullname: Build the full name down the hierarchy.
inputs: Return the inputs of the underlying interface
interface: Return the underlying interface object
itername: Get the name of the expanded iterable.
mem_gb: Get estimated memory (GB)
n_procs: Get the estimated number of processes/threads
name: Set the unique name of this workflow element.
needed_outputs
outputs: Return the output fields of the underlying interface
result: Get result from result file (do not hold it in memory)
Methods
clone(name): Clone an EngineBase object.
get_output(parameter): Retrieve a particular output of the node
get_subnodes(): Generate subnodes of a mapnode and write pre-execution report
hash_exists([updatehash]): Decorate the new is_cached method with hash updating to maintain backwards compatibility.
help(): Print interface help
is_cached([rm_outdated]): Check if the interface has been run previously, and whether cached results are up-to-date.
load(filename): Load this workflow element from a file.
num_subnodes(): Get the number of subnodes to iterate in this MapNode
output_dir(): Return the location of the output directory for the node
run([updatehash]): Execute the node in its directory.
save([filename]): Store this workflow element to a file.
set_input(parameter, val): Set interface input value or nodewrapper attribute. Priority goes to interface.
update(**opts): Update inputs
-
base_dir
Define the work directory for this instance of workflow element.
-
clone(name)
Clone an EngineBase object.
- Parameters
- name: string (mandatory)
A clone of node or workflow must have a new name
-
property fullname
Build the full name down the hierarchy.
-
get_output(parameter)
Retrieve a particular output of the node
-
hash_exists(updatehash=False)
Decorate the new is_cached method with hash updating to maintain backwards compatibility.
-
help()
Print interface help
-
property inputs
Return the inputs of the underlying interface
-
property interface
Return the underlying interface object
-
is_cached(rm_outdated=False)
Check if the interface has been run previously, and whether cached results are up-to-date.
-
property itername
Get the name of the expanded iterable.
-
static load(filename)
Load this workflow element from a file.
-
property mem_gb
Get estimated memory (GB)
-
property n_procs
Get the estimated number of processes/threads
-
property name
Set the unique name of this workflow element.
-
output_dir()
Return the location of the output directory for the node
-
property outputs
Return the output fields of the underlying interface
-
property result
Get result from result file (do not hold it in memory)
-
run(updatehash=False)
Execute the node in its directory.
- Parameters
- updatehash: boolean
When the hash stored in the output directory as a result of a previous run does not match that calculated for this execution, updatehash=True only updates the hash without re-running.
-
save(filename=None)
Store this workflow element to a file.
-
set_input(parameter, val)
Set interface input value or nodewrapper attribute. Priority goes to interface.
-
update(**opts)
Update inputs
-
CPAC.utils.monitoring.log_nodes_cb(node, status)
Function to record node run statistics to a log file as JSON dictionaries
- Parameters
- node: nipype.pipeline.engine.Node
the node being logged
- status: string
acceptable values are ‘start’ and ‘end’; any other value is considered an error
- Returns
- None
this function does not return any values; it logs the node status info to the callback logger
draw_gantt_chart
Module to draw an HTML Gantt chart from the logfile produced by CPAC.utils.monitoring.log_nodes_cb().
See https://nipype.readthedocs.io/en/latest/api/generated/nipype.utils.draw_gantt_chart.html
-
CPAC.utils.monitoring.draw_gantt_chart.calculate_resource_timeseries(events, resource)
Given an event dictionary, calculate the resources used as a time series
- Parameters
- events: dictionary
a dictionary of event-based node dictionaries of the workflow execution statistics
- resource: string
the resource of interest to return the time series of; e.g. ‘runtime_memory_gb’, ‘estimated_threads’, etc.
- Returns
- time_series: pandas Series
a pandas Series object that contains timestamps as the indices and the resource amount as values
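The calculation described above can be approximated with a running total that rises when a node starts and falls when it finishes. The sketch below is a standard-library illustration, not the C-PAC function: it returns a list of tuples instead of a pandas Series, and the `event` key and the sample values are assumptions.

```python
def resource_timeseries(events, resource):
    """Return [(time offset, amount in use)] as nodes start and finish.

    `events` is assumed to map a time offset in seconds to a node
    dictionary carrying an "event" key ("start" or "finish").
    """
    in_use = 0.0
    series = []
    for offset in sorted(events):
        event = events[offset]
        amount = float(event.get(resource, 0.0))
        if event["event"] == "start":
            in_use += amount
        elif event["event"] == "finish":
            in_use -= amount
        series.append((offset, in_use))
    return series


# Hypothetical events for two overlapping nodes.
events = {
    0.0: {"event": "start", "id": "a", "runtime_memory_gb": 2.0},
    5.0: {"event": "start", "id": "b", "runtime_memory_gb": 1.5},
    20.0: {"event": "finish", "id": "a", "runtime_memory_gb": 2.0},
    30.0: {"event": "finish", "id": "b", "runtime_memory_gb": 1.5},
}
series = resource_timeseries(events, "runtime_memory_gb")
print(series)
```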
-
CPAC.utils.monitoring.draw_gantt_chart.create_event_dict(start_time, nodes_list)
Function to generate a dictionary of event (start/finish) nodes from the nodes list
- Parameters
- start_time: datetime.datetime
a datetime object of the pipeline start time
- nodes_list: list
a list of the node dictionaries that were run in the pipeline
- Returns
- events: dictionary
a dictionary where the key is the timedelta from the start of the pipeline execution to the value node it accompanies
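A minimal sketch of building such an event dictionary is shown below. It is an illustration under assumptions rather than the C-PAC code: `build_events` is a hypothetical name, each node dictionary is assumed to hold `start` and `finish` datetimes, and nudging colliding keys is a choice made for this sketch only.

```python
from datetime import datetime


def build_events(start_time, nodes_list):
    """Map seconds since start_time to a copy of each node tagged with its event."""
    events = {}
    for node in nodes_list:
        for kind in ("start", "finish"):
            offset = (node[kind] - start_time).total_seconds()
            # Nudge colliding keys so one event never overwrites another.
            while offset in events:
                offset += 1e-6
            events[offset] = dict(node, event=kind)
    return events


pipeline_start = datetime(2021, 1, 1, 0, 0, 0)
nodes = [
    {"id": "a", "start": datetime(2021, 1, 1, 0, 0, 0),
     "finish": datetime(2021, 1, 1, 0, 0, 20)},
    {"id": "b", "start": datetime(2021, 1, 1, 0, 0, 5),
     "finish": datetime(2021, 1, 1, 0, 0, 30)},
]
events = build_events(pipeline_start, nodes)
print(sorted(events))
```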
-
CPAC.utils.monitoring.draw_gantt_chart.draw_nodes(start, nodes_list, cores, minute_scale, space_between_minutes, colors)
Function to return the HTML string of the node drawings for the Gantt chart
- Parameters
- start: datetime.datetime obj
start time for first node
- nodes_list: list
a list of the node dictionaries
- cores: integer
the number of cores given to the workflow via the ‘n_procs’ plugin arg
- total_duration: float
total duration of the workflow execution (in seconds)
- minute_scale: integer
the scale, in minutes, at which to plot line markers for the Gantt chart; for example, minute_scale=10 means there are lines drawn at every 10 minute interval from start to finish
- space_between_minutes: integer
scale factor in pixel spacing between minute line markers
- colors: list
a list of colors to choose from when coloring the nodes in the Gantt chart
- Returns
- result: string
the HTML-formatted string of the node drawings for the Gantt chart
-
CPAC.utils.monitoring.draw_gantt_chart.generate_gantt_chart(logfile, cores, minute_scale=10, space_between_minutes=50, colors=['#7070FF', '#4E4EB2', '#2D2D66', '#9B9BFF'])
Generates a Gantt chart in HTML showing the workflow execution based on a callback log file. This script was intended to be used with the MultiprocPlugin. To generate the log file, the workflow must be set up so that CPAC.utils.monitoring.log_nodes_cb() records node statistics to the callback log.
- Parameters
- logfile: string
filepath to the callback log file to plot the Gantt chart of
- cores: integer
the number of cores given to the workflow via the ‘n_procs’ plugin arg
- minute_scale: integer (optional); default=10
the scale, in minutes, at which to plot line markers for the Gantt chart; for example, minute_scale=10 means there are lines drawn at every 10 minute interval from start to finish
- space_between_minutes: integer (optional); default=50
scale factor in pixel spacing between minute line markers
- colors: list (optional)
a list of colors to choose from when coloring the nodes in the Gantt chart
- Returns
- None
the function does not return any value but writes out an HTML file in the same directory as the callback log path passed in
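The logging side of that setup can be sketched with the standard library alone. The example assumes the callback writes to a logger named "callback" (an assumption based on Nipype's convention); `attach_callback_log` and `example_status_callback` are hypothetical stand-ins, not C-PAC functions.

```python
import json
import logging
import os
import tempfile


def attach_callback_log(path):
    """Send records from the "callback" logger to `path`, one message per line."""
    logger = logging.getLogger("callback")
    logger.setLevel(logging.DEBUG)
    handler = logging.FileHandler(path)
    logger.addHandler(handler)
    return logger, handler


def example_status_callback(node_info, status):
    """Hypothetical stand-in for log_nodes_cb: log one JSON dict per event."""
    if status not in ("start", "end"):
        return  # other statuses are simply ignored in this sketch
    message = json.dumps(dict(node_info, status=status))
    logging.getLogger("callback").debug(message)


log_path = os.path.join(tempfile.mkdtemp(), "callback.log")
logger, handler = attach_callback_log(log_path)
example_status_callback({"id": "wf.mask", "estimated_memory_gb": 2.0}, "start")
example_status_callback({"id": "wf.mask", "estimated_memory_gb": 2.0}, "end")
handler.close()
logger.removeHandler(handler)

with open(log_path) as log_file:
    lines = log_file.read().splitlines()
print(len(lines), json.loads(lines[0])["status"])
```

In a real run, the actual callback would be handed to the workflow runner (with Nipype's MultiProc plugin, typically through the `status_callback` entry of `plugin_args`), and the resulting file path would then be passed to generate_gantt_chart as logfile.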
-
CPAC.utils.monitoring.draw_gantt_chart.resource_overusage_report(cblog)
Function to parse through a callback log for memory and/or thread usage above estimates / limits.
- Parameters
- cblog: str
path to callback.log
- Returns
- text_report: str
- excessive: dict
-
CPAC.utils.monitoring.draw_gantt_chart.resource_report(callback_log, num_cores, logger=None)
Function to attempt to warn of any excessive resource usage and generate an interactive HTML chart.
- Parameters
- callback_log: str
path to callback.log
- num_cores: int
- logger: Logger
https://docs.python.org/3/library/logging.html#logger-objects
- Returns
- None