Nodes

Warning

A Nipype Node has an initialization parameter mem_gb that differs from the commandline option --mem_gb. While the commandline option is a limit, the Node initialization parameter is an estimate of the most memory that Node will consume when run. The Node parameter is not a limit; rather, this value is used to allocate system resources at runtime.

By contrast, both resource settings for threads are limits: the commandline option --n_cpus is a limit, and the Node initialization parameter n_procs is also a limit on the maximum number of threads a Node will be permitted to consume.
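To make the estimate-versus-limit distinction concrete, here is a minimal sketch of how a scheduler might weigh per-Node mem_gb estimates and n_procs values against overall budgets. It is not Nipype's MultiProc scheduler: the greedy policy and the names `NodeSpec` and `schedule_batch` are hypothetical, and mem_limit_gb and cpu_limit merely play the role of --mem_gb and --n_cpus.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class NodeSpec:
    """Hypothetical stand-in for a Node's resource request."""

    name: str
    mem_gb: float  # estimated peak memory; used for bookkeeping, not enforced
    n_procs: int   # maximum number of threads the Node may use


def schedule_batch(pending: List[NodeSpec], mem_limit_gb: float, cpu_limit: int) -> List[NodeSpec]:
    """Greedily choose Nodes to start together without exceeding either budget.

    The mem_gb values are summed as estimates only: nothing here stops a
    running Node from using more than its estimate, which is why an
    underestimate can oversubscribe the machine.
    """
    batch: List[NodeSpec] = []
    mem_planned, cpus_planned = 0.0, 0
    for node in pending:
        fits_mem = mem_planned + node.mem_gb <= mem_limit_gb
        fits_cpu = cpus_planned + node.n_procs <= cpu_limit
        if fits_mem and fits_cpu:
            batch.append(node)
            mem_planned += node.mem_gb
            cpus_planned += node.n_procs
    return batch


nodes = [NodeSpec("a", 4.0, 2), NodeSpec("b", 3.0, 1), NodeSpec("c", 2.0, 4)]
print([n.name for n in schedule_batch(nodes, mem_limit_gb=8.0, cpu_limit=4)])  # ['a', 'b']
```

Node "c" is held back because adding it would push the planned memory to 9 GB and the planned threads to 7, both over budget.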

C-PAC automatically creates a JSON-like file called callback.log (via the function log_nodes_cb) when running. This file includes for each Node:

  • estimated memory,

  • memory usage observed at runtime,

  • specified maximum number of threads per Node, and

  • threads used at runtime.

The Nipype utility function log_to_dict reads a log file generated by log_nodes_cb into a list of Python dictionaries, one per logged node.
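As a rough illustration of that parsing step, the standard-library sketch below treats each line of the log as one JSON object and converts the start/finish timestamps to datetime objects. The field names ("name", "start", "finish", "runtime_memory_gb") and the ISO-8601 timestamp format are assumptions about the log layout, and `parse_callback_lines` / `read_callback_log` are hypothetical helpers; use log_to_dict itself for real logs.

```python
import json
from datetime import datetime
from typing import Any, Dict, Iterable, List


def parse_callback_lines(lines: Iterable[str]) -> List[Dict[str, Any]]:
    """Parse JSON-lines text into a list of node dictionaries.

    Hypothetical helper: assumes one JSON object per line and ISO-8601
    timestamps under the "start" and "finish" keys.
    """
    records: List[Dict[str, Any]] = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        record = json.loads(line)
        for key in ("start", "finish"):
            if key in record:
                record[key] = datetime.fromisoformat(record[key])
        records.append(record)
    return records


def read_callback_log(path: str) -> List[Dict[str, Any]]:
    """Read a callback log file from disk and parse it line by line."""
    with open(path, encoding="utf-8") as log_file:
        return parse_callback_lines(log_file)
```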

When a developer creates or modifies a Node in C-PAC, mem_gb and n_procs arguments should be provided unless the respective defaults of 2.0 (as shown in the Node signature below) and None (number of available system cores) are expected to be sufficient. When testing, adjust the mem_gb and n_procs arguments if the observed memory and/or thread usage of a Node exceeds the estimate.

class CPAC.pipeline.nipype_pipeline_engine.Node(interface, name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, n_procs=None, *, mem_gb=2.0, **kwargs)

Wraps interface objects for use in a pipeline.

A Node creates a sandbox-like directory for executing the underlying interface. It will copy or link inputs into this directory to ensure that input data are not overwritten. A hash of the input state is used to determine if the Node inputs have changed and whether the node needs to be re-executed.
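The caching idea can be sketched with the standard library: serialize the inputs deterministically, hash them, and re-run only when the hash differs from the one stored in the working directory. This is a simplified stand-in, not Nipype's hashing scheme (which also has to account for input files); `inputs_hash`, `needs_rerun`, `record_hash`, and the `_inputs.hash` filename are all hypothetical.

```python
import hashlib
import json
from pathlib import Path
from typing import Any, Dict

HASH_FILENAME = "_inputs.hash"  # hypothetical name for the stored hash


def inputs_hash(inputs: Dict[str, Any]) -> str:
    """Return a stable SHA-1 hex digest of the inputs."""
    # sort_keys makes the digest independent of dict insertion order
    payload = json.dumps(inputs, sort_keys=True, default=str)
    return hashlib.sha1(payload.encode("utf-8")).hexdigest()


def needs_rerun(node_dir: Path, inputs: Dict[str, Any]) -> bool:
    """True if no hash is stored in node_dir or the stored one differs."""
    stored = node_dir / HASH_FILENAME
    return not stored.exists() or stored.read_text() != inputs_hash(inputs)


def record_hash(node_dir: Path, inputs: Dict[str, Any]) -> None:
    """Store the current inputs hash after a successful run."""
    node_dir.mkdir(parents=True, exist_ok=True)
    (node_dir / HASH_FILENAME).write_text(inputs_hash(inputs))
```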

Examples

>>> from nipype import Node
>>> from nipype.interfaces import spm
>>> realign = Node(spm.Realign(), 'realign')
>>> realign.inputs.in_files = 'functional.nii'  
>>> realign.inputs.register_to_mean = True
>>> realign.run()

Attributes
fullname

Build the full name down the hierarchy.

inputs

Return the inputs of the underlying interface

interface

Return the underlying interface object

itername

Get the name of the expanded iterable.

mem_gb

Get estimated memory (GB)

n_procs

Get the estimated number of processes/threads

name

Set the unique name of this workflow element.

needed_outputs
outputs

Return the output fields of the underlying interface

result

Get result from result file (do not hold it in memory)

Methods

clone(name)

Clone an EngineBase object.

get_output(parameter)

Retrieve a particular output of the node

hash_exists([updatehash])

Decorate the new is_cached method with hash updating to maintain backwards compatibility.

help()

Print interface help

is_cached([rm_outdated])

Check if the interface has been run previously, and whether cached results are up-to-date.

load(filename)

Load this workflow element from a file.

output_dir()

Return the location of the output directory for the node

run([updatehash])

Execute the node in its directory.

save([filename])

Store this workflow element to a file.

set_input(parameter, val)

Set interface input value

update(**opts)

Update inputs

base_dir

Define the work directory for this instance of workflow element.

clone(name)

Clone an EngineBase object.

Parameters
name: string (mandatory)

A clone of node or workflow must have a new name

property fullname

Build the full name down the hierarchy.

get_output(parameter)

Retrieve a particular output of the node

hash_exists(updatehash=False)

Decorate the new is_cached method with hash updating to maintain backwards compatibility.

help()

Print interface help

property inputs

Return the inputs of the underlying interface

property interface

Return the underlying interface object

is_cached(rm_outdated=False)

Check if the interface has been run previously, and whether cached results are up-to-date.

property itername

Get the name of the expanded iterable.

static load(filename)

Load this workflow element from a file.

property mem_gb

Get estimated memory (GB)

property n_procs

Get the estimated number of processes/threads

property name

Set the unique name of this workflow element.

output_dir()

Return the location of the output directory for the node

property outputs

Return the output fields of the underlying interface

property result

Get result from result file (do not hold it in memory)

run(updatehash=False)

Execute the node in its directory.

Parameters
updatehash: boolean

When the hash stored in the output directory as a result of a previous run does not match that calculated for this execution, updatehash=True only updates the hash without re-running.
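The decision that updatehash controls can be written out as a small function. This is a simplified model with a hypothetical name (`run_action`) and string labels; it ignores details such as forced re-execution, and treating a missing previous hash as "rerun" is an assumption.

```python
from typing import Optional


def run_action(stored_hash: Optional[str], current_hash: str, updatehash: bool = False) -> str:
    """Decide how a cached node run should proceed (simplified, hypothetical).

    "reuse":       the stored hash matches, so cached results are used
    "update_hash": hashes differ but updatehash=True, so only the stored
                   hash is rewritten and the node is not re-executed
    "rerun":       hashes differ (or nothing is cached), so the node runs
    """
    if stored_hash is None:
        return "rerun"  # no previous run to fall back on
    if stored_hash == current_hash:
        return "reuse"
    return "update_hash" if updatehash else "rerun"


print(run_action("abc123", "def456", updatehash=True))  # update_hash
```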

save(filename=None)

Store this workflow element to a file.

set_input(parameter, val)

Set interface input value

update(**opts)

Update inputs

class CPAC.pipeline.nipype_pipeline_engine.MapNode(interface, iterfield, name, serial=False, nested=False, **kwargs)

Default resource settings: mem_gb=2.0, n_procs=1

Wraps interface objects that need to be iterated on a list of inputs.
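Conceptually, a MapNode runs one copy of its interface per element of each iterfield input, holds the other inputs fixed, and collects the outputs into a list. The plain-Python sketch below mimics that behavior with an ordinary function in place of an interface. `map_node` and `smooth` are hypothetical, and the sketch assumes all iterfield inputs have the same length, mirroring the non-nested case.

```python
from typing import Any, Callable, Dict, List, Sequence


def map_node(func: Callable[..., Any], iterfield: Sequence[str], **inputs: Any) -> List[Any]:
    """Call func once per element of the iterfield inputs (hypothetical sketch).

    Inputs named in iterfield must be lists of equal length; every other
    input is passed unchanged to each call, and results keep input order.
    """
    if not iterfield:
        raise ValueError("iterfield must name at least one input")
    lengths = {len(inputs[field]) for field in iterfield}
    if len(lengths) != 1:
        raise ValueError("All iterfield inputs must have the same length")
    n_subnodes = lengths.pop()
    results: List[Any] = []
    for i in range(n_subnodes):
        call_inputs: Dict[str, Any] = dict(inputs)
        for field in iterfield:
            call_inputs[field] = inputs[field][i]  # i-th element goes to subnode i
        results.append(func(**call_inputs))
    return results


def smooth(in_file: str, fwhm: int) -> str:
    """Toy stand-in for an interface: returns a descriptive string."""
    return f"{in_file} smoothed at {fwhm} mm"


print(map_node(smooth, ["in_file"], in_file=["functional.nii", "functional2.nii"], fwhm=6))
# ['functional.nii smoothed at 6 mm', 'functional2.nii smoothed at 6 mm']
```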

Examples

>>> from nipype import MapNode
>>> from nipype.interfaces import fsl
>>> realign = MapNode(fsl.MCFLIRT(), 'in_file', 'realign')
>>> realign.inputs.in_file = ['functional.nii',
...                           'functional2.nii',
...                           'functional3.nii']  
>>> realign.run()

Attributes
fullname

Build the full name down the hierarchy.

inputs

Return the inputs of the underlying interface

interface

Return the underlying interface object

itername

Get the name of the expanded iterable.

mem_gb

Get estimated memory (GB)

n_procs

Get the estimated number of processes/threads

name

Set the unique name of this workflow element.

needed_outputs
outputs

Return the output fields of the underlying interface

result

Get result from result file (do not hold it in memory)

Methods

clone(name)

Clone an EngineBase object.

get_output(parameter)

Retrieve a particular output of the node

get_subnodes()

Generate subnodes of a mapnode and write pre-execution report

hash_exists([updatehash])

Decorate the new is_cached method with hash updating to maintain backwards compatibility.

help()

Print interface help

is_cached([rm_outdated])

Check if the interface has been run previously, and whether cached results are up-to-date.

load(filename)

Load this workflow element from a file.

num_subnodes()

Get the number of subnodes to iterate in this MapNode

output_dir()

Return the location of the output directory for the node

run([updatehash])

Execute the node in its directory.

save([filename])

Store this workflow element to a file.

set_input(parameter, val)

Set interface input value or nodewrapper attribute. Priority goes to interface.

update(**opts)

Update inputs

base_dir

Define the work directory for this instance of workflow element.

clone(name)

Clone an EngineBase object.

Parameters
name: string (mandatory)

A clone of node or workflow must have a new name

property fullname

Build the full name down the hierarchy.

get_output(parameter)

Retrieve a particular output of the node

get_subnodes()

Generate subnodes of a mapnode and write pre-execution report

hash_exists(updatehash=False)

Decorate the new is_cached method with hash updating to maintain backwards compatibility.

help()

Print interface help

property inputs

Return the inputs of the underlying interface

property interface

Return the underlying interface object

is_cached(rm_outdated=False)

Check if the interface has been run previously, and whether cached results are up-to-date.

property itername

Get the name of the expanded iterable.

static load(filename)

Load this workflow element from a file.

property mem_gb

Get estimated memory (GB)

property n_procs

Get the estimated number of processes/threads

property name

Set the unique name of this workflow element.

num_subnodes()

Get the number of subnodes to iterate in this MapNode

output_dir()

Return the location of the output directory for the node

property outputs

Return the output fields of the underlying interface

property result

Get result from result file (do not hold it in memory)

run(updatehash=False)

Execute the node in its directory.

Parameters
updatehash: boolean

When the hash stored in the output directory as a result of a previous run does not match that calculated for this execution, updatehash=True only updates the hash without re-running.

save(filename=None)

Store this workflow element to a file.

set_input(parameter, val)

Set interface input value or nodewrapper attribute. Priority goes to interface.

update(**opts)

Update inputs

CPAC.utils.monitoring.log_nodes_cb(node, status)

Function to record node run statistics to a log file as JSON dictionaries

Parameters
node: nipype.pipeline.engine.Node

the node being logged

status: string

acceptable values are ‘start’ and ‘end’; any other value is considered an error

Returns
None

this function does not return any values; it logs the node status info to the callback logger
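A status callback of this shape can be sketched with the standard library: it receives a node and a status string and writes one JSON dictionary per event to a dedicated logger. This is not C-PAC's implementation; the logger name "callback", the recorded fields, and the choice to log both events are assumptions, and `log_status` is hypothetical.

```python
import json
import logging
from datetime import datetime
from typing import Any

# Assumed logger name; attach a handler (e.g. logging.FileHandler) to persist output.
callback_logger = logging.getLogger("callback")
callback_logger.setLevel(logging.DEBUG)


def log_status(node: Any, status: str) -> None:
    """Write one JSON dictionary per node event (hypothetical sketch).

    node only needs name, mem_gb, and n_procs attributes here.
    """
    if status not in ("start", "end"):
        raise ValueError(f"Unknown status: {status!r}")
    record = {
        "id": node.name,
        "event": status,
        "estimated_memory_gb": node.mem_gb,
        "num_threads": node.n_procs,
        "timestamp": datetime.now().isoformat(),
    }
    callback_logger.debug(json.dumps(record))
```

Any object with name, mem_gb, and n_procs attributes works for trying it out, for example `types.SimpleNamespace(name="realign", mem_gb=2.0, n_procs=1)`.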

draw_gantt_chart

Module to draw an HTML gantt chart from the log file produced by CPAC.utils.monitoring.log_nodes_cb().

See https://nipype.readthedocs.io/en/latest/api/generated/nipype.utils.draw_gantt_chart.html

CPAC.utils.monitoring.draw_gantt_chart.calculate_resource_timeseries(events, resource)

Given an event dictionary, calculate the resources used as a timeseries

Parameters
events: dictionary

a dictionary of event-based node dictionaries of the workflow execution statistics

resource: string

the resource of interest to return the time-series of; e.g. ‘runtime_memory_gb’, ‘estimated_threads’, etc.

Returns
time_series: pandas Series

a pandas Series object that contains timestamps as the indices and the resource amount as values
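The idea behind this calculation can be sketched without pandas: walk the events in time order, add a node's resource amount when it starts, subtract it when it finishes, and record the running total at each time point. The event layout assumed here (offsets as keys, each value a dict with an "event" key of "start" or "finish" plus the resource field) is a simplification, and `resource_timeseries` is hypothetical; it returns a list of pairs rather than a pandas Series.

```python
from typing import Any, Dict, List, Tuple


def resource_timeseries(events: Dict[float, Dict[str, Any]], resource: str) -> List[Tuple[float, float]]:
    """Return (time offset, running total) pairs for one resource.

    Hypothetical stdlib sketch: missing resource fields count as 0.
    """
    series: List[Tuple[float, float]] = []
    total = 0.0
    for offset, event in sorted(events.items()):
        amount = float(event.get(resource, 0.0))
        if event["event"] == "start":
            total += amount  # node begins using the resource
        elif event["event"] == "finish":
            total -= amount  # node releases the resource
        series.append((offset, total))
    return series
```

For two overlapping nodes using 2.0 GB and 1.0 GB, the total rises to 3.0 GB while both run and drops back to 0.0 once both have finished.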

CPAC.utils.monitoring.draw_gantt_chart.create_event_dict(start_time, nodes_list)

Function to generate a dictionary of event (start/finish) nodes from the nodes list

Parameters
start_time: datetime.datetime

a datetime object of the pipeline start time

nodes_list: list

a list of the node dictionaries that were run in the pipeline

Returns
events: dictionary

a dictionary whose keys are time offsets from the start of pipeline execution and whose values are the node (start or finish) events occurring at those offsets
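A minimal sketch of that transformation: each node record is split into a "start" event and a "finish" event, keyed by elapsed time since the pipeline began. Here the keys are offsets in seconds (floats); that key type, the "start"/"finish" field names, and the refusal of duplicate offsets are assumptions, and `make_event_dict` is hypothetical.

```python
from datetime import datetime
from typing import Any, Dict, List


def make_event_dict(start_time: datetime, nodes: List[Dict[str, Any]]) -> Dict[float, Dict[str, Any]]:
    """Split each node record into "start" and "finish" events (hypothetical sketch).

    Each node must carry datetime values under "start" and "finish".
    """
    events: Dict[float, Dict[str, Any]] = {}
    for node in nodes:
        for kind in ("start", "finish"):
            offset = (node[kind] - start_time).total_seconds()
            if offset in events:
                raise KeyError(f"Two events logged at offset {offset} s")
            events[offset] = {**node, "event": kind}  # copy, tagged with its event type
    return events
```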

CPAC.utils.monitoring.draw_gantt_chart.draw_nodes(start, nodes_list, cores, minute_scale, space_between_minutes, colors)

Function to return the HTML string of the node drawings for the gantt chart

Parameters
start: datetime.datetime obj

start time of the first node

nodes_list: list

a list of the node dictionaries

cores: integer

the number of cores given to the workflow via the ‘n_procs’ plugin arg

minute_scale: integer

the scale, in minutes, at which to plot line markers for the gantt chart; for example, minute_scale=10 means there are lines drawn at every 10-minute interval from start to finish

space_between_minutes: integer

scale factor in pixel spacing between minute line markers

colors: list

a list of colors to choose from when coloring the nodes in the gantt chart

Returns
result: string

the HTML-formatted string for drawing the nodes in the gantt chart
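To see how minute_scale and space_between_minutes interact, here is one plausible way to turn a node's start and finish times into pixel positions: minute_scale minutes of wall time span space_between_minutes pixels. This is an illustrative layout rule, not necessarily the exact arithmetic draw_nodes uses, and `node_box` is hypothetical.

```python
from datetime import datetime
from typing import Tuple


def node_box(start: datetime, node_start: datetime, node_finish: datetime,
             minute_scale: int = 10, space_between_minutes: int = 50) -> Tuple[float, float]:
    """Return (offset_px, length_px) of a node's bar along the time axis.

    minute_scale minutes of wall time map to space_between_minutes pixels.
    """
    px_per_minute = space_between_minutes / minute_scale
    offset_min = (node_start - start).total_seconds() / 60.0
    duration_min = (node_finish - node_start).total_seconds() / 60.0
    return offset_min * px_per_minute, duration_min * px_per_minute


# A node starting 20 minutes in and running 10 minutes, with the defaults:
print(node_box(datetime(2024, 1, 1), datetime(2024, 1, 1, 0, 20), datetime(2024, 1, 1, 0, 30)))
# (100.0, 50.0)
```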

CPAC.utils.monitoring.draw_gantt_chart.generate_gantt_chart(logfile, cores, minute_scale=10, space_between_minutes=50, colors=['#7070FF', '#4E4EB2', '#2D2D66', '#9B9BFF'])

Generates a gantt chart in HTML showing the workflow execution based on a callback log file. This function was intended to be used with the MultiProcPlugin, with the workflow run configured so that log_nodes_cb writes the callback log file.

Parameters
logfile: string

filepath to the callback log file to plot the gantt chart of

cores: integer

the number of cores given to the workflow via the ‘n_procs’ plugin arg

minute_scale: integer (optional); default=10

the scale, in minutes, at which to plot line markers for the gantt chart; for example, minute_scale=10 means there are lines drawn at every 10-minute interval from start to finish

space_between_minutes: integer (optional); default=50

scale factor in pixel spacing between minute line markers

colors: list (optional)

a list of colors to choose from when coloring the nodes in the gantt chart

Returns
None

the function does not return any value but writes out an HTML file in the same directory as the callback log path passed in
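generate_gantt_chart needs a callback log to read. A minimal sketch of the logging side, using only the standard library, routes the "callback" logger to a file so that each JSON line emitted by the status callback lands in the log. The logger name and `setup_callback_log` are assumptions; the plugin_args in the comment follow Nipype's MultiProc plugin (n_procs, memory_gb, status_callback) and should be checked against your Nipype version.

```python
import logging


def setup_callback_log(log_path: str) -> logging.Logger:
    """Send messages from the "callback" logger to log_path, one per line."""
    logger = logging.getLogger("callback")  # assumed logger name
    logger.setLevel(logging.DEBUG)
    handler = logging.FileHandler(log_path)
    handler.setFormatter(logging.Formatter("%(message)s"))  # raw JSON only, no prefixes
    logger.addHandler(handler)
    return logger


# With a Nipype workflow `wf` (not constructed here), a run might look like:
#   setup_callback_log("/path/to/callback.log")
#   wf.run(plugin="MultiProc",
#          plugin_args={"n_procs": 8, "memory_gb": 12,
#                       "status_callback": log_nodes_cb})
#   generate_gantt_chart("/path/to/callback.log", cores=8)
```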

CPAC.utils.monitoring.draw_gantt_chart.resource_overusage_report(cblog)

Function to parse through a callback log for memory and/or thread usage above estimates / limits.

Parameters
cblog: str

path to callback.log

Returns
text_report: str
excessive: dict
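The comparison behind such a report can be sketched as follows: for each logged node, compare observed memory and threads against the estimate and the thread limit, and collect the amounts by which they were exceeded. The field names (runtime_memory_gb, estimated_memory_gb, runtime_threads, num_threads) are assumptions about the callback log layout, and `find_overusage` is hypothetical; unlike resource_overusage_report it returns only a dictionary, not a text report.

```python
from typing import Any, Dict, List


def find_overusage(records: List[Dict[str, Any]]) -> Dict[str, Dict[str, float]]:
    """Map node id -> {resource: amount by which runtime use exceeded the plan}."""
    checks = {
        "memory_gb": ("runtime_memory_gb", "estimated_memory_gb"),
        "threads": ("runtime_threads", "num_threads"),
    }
    excessive: Dict[str, Dict[str, float]] = {}
    for record in records:
        for label, (used_key, planned_key) in checks.items():
            try:
                used = float(record[used_key])
                planned = float(record[planned_key])
            except (KeyError, TypeError, ValueError):
                continue  # field missing or not numeric, e.g. "Unknown"
            if used > planned:
                node_id = str(record.get("id", record.get("name", "?")))
                excessive.setdefault(node_id, {})[label] = used - planned
    return excessive
```

A Node flagged here is a candidate for a larger mem_gb or n_procs value.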

CPAC.utils.monitoring.draw_gantt_chart.resource_report(callback_log, num_cores, logger=None)

Function to attempt to warn of any excessive resource usage and to generate an interactive HTML chart.

Parameters
callback_log: str

path to callback.log

num_cores: int
logger: Logger

https://docs.python.org/3/library/logging.html#logger-objects

Returns
None