Nodes

A Nipype Node has an initialization parameter mem_gb that differs from the command-line option --mem_gb. The command-line option is a limit; the Node initialization parameter is an estimate of the peak memory the Node will consume when run. Rather than enforcing a cap, this estimate is used to allocate system resources at runtime.

By contrast, the command-line option --n_cpus and the Node initialization parameter n_procs are both limits: n_procs is the maximum number of threads a Node will be permitted to consume.

C-PAC automatically creates a file of JSON dictionaries called callback.log (via the function log_nodes_cb) when running. For each Node, this file includes:

  • estimated memory,

  • memory usage observed at runtime,

  • specified maximum number of threads per Node, and

  • threads used at runtime.
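For illustration, a single completed-node entry in callback.log looks roughly like the following (a sketch based on Nipype's log_nodes_cb output; exact field names vary by version and by whether resource monitoring is enabled):

{"name": "brain_mask", "id": "pipeline.brain_mask", "start": "2021-06-01T10:00:00", "finish": "2021-06-01T10:03:27", "estimated_memory_gb": 2.5, "runtime_memory_gb": 1.9, "num_threads": 1, "runtime_threads": 1}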

When a developer creates or modifies a Node in C-PAC, mem_gb and n_procs arguments should be provided unless the respective defaults of 2.0 and None (the number of available system cores) are expected to be sufficient. When testing, the mem_gb and n_procs arguments should be adjusted if the observed memory and/or thread usage of a Node exceeds the estimate.
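A minimal sketch of providing these estimates when constructing a Node (the interface and values here are illustrative):

>>> from CPAC.pipeline.nipype_pipeline_engine import Node
>>> from nipype.interfaces import fsl
>>> skullstrip = Node(fsl.BET(), 'skullstrip',
...                   mem_gb=1.5,  # estimated peak memory (GB), not a limit
...                   n_procs=1)   # maximum threads this Node may use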

For Nodes whose memory usage varies with the input data, the optional parameter mem_x takes a tuple of (memory multiplier, input file[, multiplier mode]), where the memory multiplier is a number and the input file is the string name of the Node input to scale by. The memory estimate returned by the mem_gb attribute is then the mem_gb argument plus the memory multiplier times the dimensions of the input file, as selected by the multiplier mode: 'xyzt' (spatial × temporal; the default), 'xyz' (spatial only), or 't' (temporal only).
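For example, a Node whose memory footprint grows with the size of its in_file input might be declared as follows (a sketch; the interface, multiplier value, and input name are illustrative):

>>> from CPAC.pipeline.nipype_pipeline_engine import Node
>>> from nipype.interfaces import afni
>>> resample = Node(afni.Resample(), 'resample',
...                 mem_gb=0.2,               # constant portion of the estimate
...                 mem_x=(1e-7, 'in_file'))  # + 1e-7 GB × (x · y · z · t) of in_file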

Note

mem_x is a new parameter in C-PAC v1.8.1 and subject to change in future releases as we continue to develop methods for setting data- and operation-dependent memory estimates.

class CPAC.pipeline.nipype_pipeline_engine.Node(name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, n_procs=None, mem_gb=2.0, *, mem_x, **kwargs)[source]

Wraps interface objects for use in pipeline

A Node creates a sandbox-like directory for executing the underlying interface. It will copy or link inputs into this directory to ensure that input data are not overwritten. A hash of the input state is used to determine if the Node inputs have changed and whether the node needs to be re-executed.

Examples

>>> from nipype import Node
>>> from nipype.interfaces import spm
>>> realign = Node(spm.Realign(), 'realign')
>>> realign.inputs.in_files = 'functional.nii'  
>>> realign.inputs.register_to_mean = True
>>> realign.run() 
Attributes:
fullname

Build the full name down the hierarchy.

inputs

Return the inputs of the underlying interface

interface

Return the underlying interface object

itername

Get the name of the expanded iterable.

mem_gb

Get estimated memory (GB)

mem_x

Get dict of ‘multiplier’ (memory multiplier), ‘file’ (input file) and multiplier mode (spatial * temporal, spatial only or temporal only).

n_procs

Get the estimated number of processes/threads

name

Set the unique name of this workflow element.

needed_outputs
outputs

Return the output fields of the underlying interface

result

Get result from result file (do not hold it in memory)

Methods

clone(name)

Clone an EngineBase object.

get_output(parameter)

Retrieve a particular output of the node

hash_exists([updatehash])

Decorate the new is_cached method with hash updating to maintain backwards compatibility.

help()

Print interface help

is_cached([rm_outdated])

Check if the interface has been run previously, and whether cached results are up-to-date.

load(filename)

Load this workflow element from a file.

output_dir()

Return the location of the output directory for the node

run([updatehash])

Execute the node in its directory.

save([filename])

Store this workflow element to a file.

set_input(parameter, val)

Set interface input value

update(**opts)

Update inputs

__init__(name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, n_procs=None, mem_gb=2.0, *, mem_x, **kwargs)[source]
Parameters:
interface : interface object

node specific interface (fsl.Bet(), spm.Coregister())

name : alphanumeric string

node specific name

iterables : generator

Input field and list to iterate using the pipeline engine, for example to iterate over different frac values in fsl.Bet(). For a single field the input can be a tuple; otherwise, a list of tuples:

node.iterables = ('frac', [0.5, 0.6, 0.7])
node.iterables = [('fwhm', [2, 4]), ('fieldx', [0.5, 0.6, 0.7])]

If this node has an itersource, then the iterables value is a dictionary that maps an iterable source field value to the target iterables field values, e.g.:

inputspec.iterables = ('images', ['img1.nii', 'img2.nii'])
node.itersource = ('inputspec', ['frac'])
node.iterables = ('frac', {'img1.nii': [0.5, 0.6],
                           'img2.nii': [0.6, 0.7]})

If this node's synchronize flag is set, then an alternate form of the iterables is a [fields, values] list, where fields is the list of iterated fields and values is the list of value tuples for the given fields, e.g.:

node.synchronize = True
node.iterables = [('frac', 'threshold'),
                  [(0.5, True),
                   (0.6, False)]]

itersource : tuple

The (name, fields) iterables source which specifies the name of the predecessor iterable node and the input fields to use from that source node. The output field values comprise the key to the iterables parameter value mapping dictionary.

synchronize : boolean

Flag indicating whether iterables are synchronized. If the iterables are synchronized, then this iterable node is expanded once per iteration over all of the iterables values. Otherwise, this iterable node is expanded once per each permutation of the iterables values.

overwrite : Boolean

Whether to overwrite contents of output directory if it already exists. If directory exists and hash matches it assumes that process has been executed

needed_outputs : list of output_names

Force the node to keep only specific outputs. By default all outputs are kept. Setting this attribute will delete any output files and directories from the node's working directory that are not part of the needed_outputs.

run_without_submitting : boolean

Run the node without submitting to a job engine or to a multiprocessing pool

mem_gb : int or float

Estimate (in GB) of constant memory to allocate for this node.

mem_x : 2-tuple or 3-tuple

(multiplier, input_file) (int or float, str)

(multiplier, input_file, mode) (int or float, str, str)

Note: This parameter (mem_x) is likely to change in a future release as we incorporate more factors into memory estimates.

See also: ⚡️ Setting data- and operation-dependent memory-estimates

GitHub epic of issues related to improving Node memory estimates based on the data and operations involved.

Multiplier for memory allocation such that multiplier times the mode of the 4-D file at input_file plus self._mem_gb equals the total memory allocation for the node. input_file can be a Node input string or an actual path.

mode can be any one of:

  • 'xyzt' (spatial × temporal) (default if not specified)

  • 'xyz' (spatial)

  • 't' (temporal)

base_dir

Define the work directory for this instance of workflow element.

clone(name)

Clone an EngineBase object.

Parameters:
name : string (mandatory)

A clone of node or workflow must have a new name

property fullname

Build the full name down the hierarchy.

get_output(parameter)[source]

Retrieve a particular output of the node

hash_exists(updatehash=False)[source]

Decorate the new is_cached method with hash updating to maintain backwards compatibility.

help()[source]

Print interface help

property inputs

Return the inputs of the underlying interface

property interface

Return the underlying interface object

is_cached(rm_outdated=False)[source]

Check if the interface has been run previously, and whether cached results are up-to-date.

property itername

Get the name of the expanded iterable.

static load(filename)

Load this workflow element from a file.

property mem_gb

Get estimated memory (GB)

property mem_x

Get dict of ‘multiplier’ (memory multiplier), ‘file’ (input file) and multiplier mode (spatial * temporal, spatial only or temporal only). Returns None if already consumed or not set.

property n_procs

Get the estimated number of processes/threads

property name

Set the unique name of this workflow element.

output_dir()[source]

Return the location of the output directory for the node

property outputs

Return the output fields of the underlying interface

property result

Get result from result file (do not hold it in memory)

run(updatehash=False)[source]

Execute the node in its directory.

Parameters:
updatehash: boolean

When the hash stored in the output directory as a result of a previous run does not match that calculated for this execution, updatehash=True only updates the hash without re-running.

save(filename=None)

Store this workflow element to a file.

set_input(parameter, val)[source]

Set interface input value

update(**opts)[source]

Update inputs

class CPAC.pipeline.nipype_pipeline_engine.MapNode(name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, n_procs=None, mem_gb=2.0, **kwargs)[source]

Wraps interface objects that need to be iterated on a list of inputs.

Examples

>>> from nipype import MapNode
>>> from nipype.interfaces import fsl
>>> realign = MapNode(fsl.MCFLIRT(), 'in_file', 'realign')
>>> realign.inputs.in_file = ['functional.nii',
...                           'functional2.nii',
...                           'functional3.nii']  
>>> realign.run() 
Attributes:
fullname

Build the full name down the hierarchy.

inputs

Return the inputs of the underlying interface

interface

Return the underlying interface object

itername

Get the name of the expanded iterable.

mem_gb

Get estimated memory (GB)

mem_x

Get dict of ‘multiplier’ (memory multiplier), ‘file’ (input file) and multiplier mode (spatial * temporal, spatial only or temporal only).

n_procs

Get the estimated number of processes/threads

name

Set the unique name of this workflow element.

needed_outputs
outputs

Return the output fields of the underlying interface

result

Get result from result file (do not hold it in memory)

Methods

clone(name)

Clone an EngineBase object.

get_output(parameter)

Retrieve a particular output of the node

get_subnodes()

Generate subnodes of a mapnode and write pre-execution report

hash_exists([updatehash])

Decorate the new is_cached method with hash updating to maintain backwards compatibility.

help()

Print interface help

is_cached([rm_outdated])

Check if the interface has been run previously, and whether cached results are up-to-date.

load(filename)

Load this workflow element from a file.

num_subnodes()

Get the number of subnodes to iterate in this MapNode

output_dir()

Return the location of the output directory for the node

run([updatehash])

Execute the node in its directory.

save([filename])

Store this workflow element to a file.

set_input(parameter, val)

Set interface input value or nodewrapper attribute. Priority goes to interface.

update(**opts)

Update inputs

__init__(name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, n_procs=None, mem_gb=2.0, **kwargs)[source]
Parameters:
interface : interface object

node specific interface (fsl.Bet(), spm.Coregister())

name : alphanumeric string

node specific name

iterables : generator

Input field and list to iterate using the pipeline engine, for example to iterate over different frac values in fsl.Bet(). For a single field the input can be a tuple; otherwise, a list of tuples:

node.iterables = ('frac', [0.5, 0.6, 0.7])
node.iterables = [('fwhm', [2, 4]), ('fieldx', [0.5, 0.6, 0.7])]

If this node has an itersource, then the iterables value is a dictionary that maps an iterable source field value to the target iterables field values, e.g.:

inputspec.iterables = ('images', ['img1.nii', 'img2.nii'])
node.itersource = ('inputspec', ['frac'])
node.iterables = ('frac', {'img1.nii': [0.5, 0.6],
                           'img2.nii': [0.6, 0.7]})

If this node's synchronize flag is set, then an alternate form of the iterables is a [fields, values] list, where fields is the list of iterated fields and values is the list of value tuples for the given fields, e.g.:

node.synchronize = True
node.iterables = [('frac', 'threshold'),
                  [(0.5, True),
                   (0.6, False)]]

itersource : tuple

The (name, fields) iterables source which specifies the name of the predecessor iterable node and the input fields to use from that source node. The output field values comprise the key to the iterables parameter value mapping dictionary.

synchronize : boolean

Flag indicating whether iterables are synchronized. If the iterables are synchronized, then this iterable node is expanded once per iteration over all of the iterables values. Otherwise, this iterable node is expanded once per each permutation of the iterables values.

overwrite : Boolean

Whether to overwrite contents of output directory if it already exists. If directory exists and hash matches it assumes that process has been executed

needed_outputs : list of output_names

Force the node to keep only specific outputs. By default all outputs are kept. Setting this attribute will delete any output files and directories from the node's working directory that are not part of the needed_outputs.

run_without_submitting : boolean

Run the node without submitting to a job engine or to a multiprocessing pool

mem_gb : int or float

Estimate (in GB) of constant memory to allocate for this node.

mem_x : 2-tuple or 3-tuple

(multiplier, input_file) (int or float, str)

(multiplier, input_file, mode) (int or float, str, str)

Note: This parameter (mem_x) is likely to change in a future release as we incorporate more factors into memory estimates.

See also: ⚡️ Setting data- and operation-dependent memory-estimates

GitHub epic of issues related to improving Node memory estimates based on the data and operations involved.

Multiplier for memory allocation such that multiplier times the mode of the 4-D file at input_file plus self._mem_gb equals the total memory allocation for the node. input_file can be a Node input string or an actual path.

mode can be any one of:

  • 'xyzt' (spatial × temporal) (default if not specified)

  • 'xyz' (spatial)

  • 't' (temporal)

base_dir

Define the work directory for this instance of workflow element.

clone(name)

Clone an EngineBase object.

Parameters:
name : string (mandatory)

A clone of node or workflow must have a new name

property fullname

Build the full name down the hierarchy.

get_output(parameter)

Retrieve a particular output of the node

get_subnodes()[source]

Generate subnodes of a mapnode and write pre-execution report

hash_exists(updatehash=False)

Decorate the new is_cached method with hash updating to maintain backwards compatibility.

help()

Print interface help

property inputs

Return the inputs of the underlying interface

property interface

Return the underlying interface object

is_cached(rm_outdated=False)

Check if the interface has been run previously, and whether cached results are up-to-date.

property itername

Get the name of the expanded iterable.

static load(filename)

Load this workflow element from a file.

property mem_gb

Get estimated memory (GB)

property mem_x

Get dict of ‘multiplier’ (memory multiplier), ‘file’ (input file) and multiplier mode (spatial * temporal, spatial only or temporal only). Returns None if already consumed or not set.

property n_procs

Get the estimated number of processes/threads

property name

Set the unique name of this workflow element.

num_subnodes()[source]

Get the number of subnodes to iterate in this MapNode

output_dir()

Return the location of the output directory for the node

property outputs

Return the output fields of the underlying interface

property result

Get result from result file (do not hold it in memory)

run(updatehash=False)

Execute the node in its directory.

Parameters:
updatehash: boolean

When the hash stored in the output directory as a result of a previous run does not match that calculated for this execution, updatehash=True only updates the hash without re-running.

save(filename=None)

Store this workflow element to a file.

set_input(parameter, val)[source]

Set interface input value or nodewrapper attribute. Priority goes to interface.

update(**opts)

Update inputs

class nipype.interfaces.base.core.Interface[source]

This is an abstract definition for Interface objects.

It provides no functionality. It defines the necessary attributes and methods all Interface objects should have.

Attributes:
always_run

Should the interface be always run even if the inputs were not changed? Only applies to interfaces being run within a workflow context.

can_resume

Defines if the interface can reuse partial results after interruption.

input_spec
output_spec
version

interfaces should implement a version property

Methods

aggregate_outputs([runtime, needed_outputs])

Called to populate outputs

help([returnhelp])

Prints class help

run()

Execute the command.

See also

Nipype: Interface Specifications

__init__()[source]

Subclasses must implement __init__

aggregate_outputs(runtime=None, needed_outputs=None)[source]

Called to populate outputs

property always_run

Should the interface be always run even if the inputs were not changed? Only applies to interfaces being run within a workflow context.

property can_resume

Defines if the interface can reuse partial results after interruption. Only applies to interfaces being run within a workflow context.

classmethod help(returnhelp=False)[source]

Prints class help

run()[source]

Execute the command.

property version

interfaces should implement a version property

class CPAC.utils.interfaces.function.Function(input_names=None, output_names='out', function=None, imports=None, as_module=False, **inputs)[source]

Runs arbitrary function as an interface

Examples

>>> func = 'def func(arg1, arg2=5): return arg1 + arg2'
>>> fi = Function(input_names=['arg1', 'arg2'], output_names=['out'])
>>> fi.inputs.function_str = func
>>> res = fi.run(arg1=1)
>>> res.outputs.out
6
Attributes:
always_run

Should the interface be always run even if the inputs were not changed? Only applies to interfaces being run within a workflow context.

can_resume

Defines if the interface can reuse partial results after interruption.

version

interfaces should implement a version property

Methods

aggregate_outputs([runtime, needed_outputs])

Collate expected outputs and apply output traits validation.

help([returnhelp])

Prints class help

input_spec

alias of CPAC.utils.interfaces.function.FunctionInputSpec

load_inputs_from_json(json_file[, overwrite])

A convenient way to load pre-set inputs from a JSON file.

output_spec

alias of nipype.interfaces.base.specs.DynamicTraitedSpec

run([cwd, ignore_exception])

Execute this interface.

save_inputs_to_json(json_file)

A convenient way to save current inputs to a JSON file.

__init__(input_names=None, output_names='out', function=None, imports=None, as_module=False, **inputs)[source]
Parameters:
input_names : single str or list or None

names corresponding to function inputs; if None, derive input names from function argument names

output_names : single str or list

names corresponding to function outputs (default: 'out'); if a list of length > 1, it has to match the number of outputs

function : callable

callable Python object; must be able to execute in an isolated namespace (possibly in concert with the imports parameter)

imports : list of strings

list of import statements that allow the function to execute in an otherwise empty namespace
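As a sketch of the imports parameter (the counting function below is hypothetical), the listed import statements are executed before the function body runs in its isolated namespace:

>>> from CPAC.utils.interfaces.function import Function
>>> def count_voxels(in_file):
...     # np and nib are provided by the imports parameter below
...     return int(np.prod(nib.load(in_file).shape[:3]))
>>> count = Function(input_names=['in_file'], output_names=['out'],
...                  function=count_voxels,
...                  imports=['import numpy as np',
...                           'import nibabel as nib'])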

aggregate_outputs(runtime=None, needed_outputs=None)

Collate expected outputs and apply output traits validation.

property always_run

Should the interface be always run even if the inputs were not changed? Only applies to interfaces being run within a workflow context.

property can_resume

Defines if the interface can reuse partial results after interruption. Only applies to interfaces being run within a workflow context.

classmethod help(returnhelp=False)

Prints class help

input_spec

alias of CPAC.utils.interfaces.function.FunctionInputSpec

load_inputs_from_json(json_file, overwrite=True)

A convenient way to load pre-set inputs from a JSON file.

output_spec

alias of nipype.interfaces.base.specs.DynamicTraitedSpec

run(cwd=None, ignore_exception=None, **inputs)

Execute this interface.

This interface will not raise an exception if runtime.returncode is non-zero.

Parameters:
cwd : specify a folder where the interface should be run
inputs : allows the interface settings to be updated
Returns:
results : nipype.interfaces.base.support.InterfaceResult

A copy of the instance that was executed, provenance information and, if successful, results

save_inputs_to_json(json_file)

A convenient way to save current inputs to a JSON file.

property version

interfaces should implement a version property

class CPAC.pipeline.nipype_pipeline_engine.Workflow(name, base_dir=None)[source]
Attributes:
fullname

Build the full name down the hierarchy.

inputs
itername

Get the name of the expanded iterable.

name

Set the unique name of this workflow element.

outputs

Methods

add_nodes(nodes)

Add nodes to a workflow

clone(name)

Clone a workflow

connect(*args, **kwargs)

Connect nodes in the pipeline.

disconnect(*args)

Disconnect nodes. See the docstring for connect for format.

export([filename, prefix, format, ...])

Export object into a different format

get_node(name)

Return an internal node by name

list_node_names()

List names of all nodes in a workflow

load(filename)

Load this workflow element from a file.

remove_nodes(nodes)

Remove nodes from a workflow

run([plugin, plugin_args, updatehash])

Execute the workflow

save([filename])

Store this workflow element to a file.

write_graph([dotfilename, graph2use, ...])

Generates a graphviz dot file and a png file

write_hierarchical_dotfile

__init__(name, base_dir=None)[source]

Create a workflow object.

Parameters:
name : alphanumeric string

unique identifier for the workflow

base_dir : string, optional

path to workflow storage

add_nodes(nodes)[source]

Add nodes to a workflow

Parameters:
nodes : list

A list of EngineBase-based objects

base_dir

Define the work directory for this instance of workflow element.

clone(name)[source]

Clone a workflow

Note

Will reset attributes used for executing workflow. See _init_runtime_fields.

Parameters:
name: alphanumeric name

unique name for the workflow

connect(*args, **kwargs)[source]

Connect nodes in the pipeline.

This routine also checks if inputs and outputs are actually provided by the nodes that are being connected.

Creates edges in the directed graph using the nodes and edges specified in the connection_list. Uses the NetworkX method DiGraph.add_edges_from.

Parameters:
args : list or a set of four positional arguments

Four positional arguments of the form:

connect(source, sourceoutput, dest, destinput)

source : nodewrapper node
sourceoutput : string (must be in source.outputs)
dest : nodewrapper node
destinput : string (must be in dest.inputs)

A list of 3-tuples of the following form:

[(source, target,
  [('sourceoutput/attribute', 'targetinput'),
   ...]),
 ...]

Or:

[(source, target, [(('sourceoutput1', func, arg2, ...),
                    'targetinput'), ...]),
 ...]

sourceoutput1 will always be the first argument to func, and func will be evaluated and the results sent to targetinput. Currently func needs to define all its needed imports within the function, as we use the inspect module to get at the source code and execute it remotely.
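A minimal sketch of the four-positional-argument form (the interfaces and field names here are illustrative):

>>> from CPAC.pipeline.nipype_pipeline_engine import Node, Workflow
>>> from nipype.interfaces import fsl
>>> mcflirt = Node(fsl.MCFLIRT(), 'mcflirt')
>>> smooth = Node(fsl.IsotropicSmooth(fwhm=4), 'smooth')
>>> wf = Workflow('preproc')
>>> wf.connect(mcflirt, 'out_file', smooth, 'in_file')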
disconnect(*args)[source]

Disconnect nodes. See the docstring for connect for format.

export(filename=None, prefix='output', format='python', include_config=False)[source]

Export object into a different format

Parameters:
filename: string

file to save the code to; overrides prefix

prefix: string

prefix to use for output file

format: string

one of “python”

include_config: boolean

whether to include node and workflow config values

property fullname

Build the full name down the hierarchy.

get_node(name)[source]

Return an internal node by name

property itername

Get the name of the expanded iterable.

list_node_names()[source]

List names of all nodes in a workflow

static load(filename)

Load this workflow element from a file.

property name

Set the unique name of this workflow element.

remove_nodes(nodes)[source]

Remove nodes from a workflow

Parameters:
nodes : list

A list of EngineBase-based objects

run(plugin=None, plugin_args=None, updatehash=False)[source]

Execute the workflow

Parameters:
plugin: plugin name or object

Plugin to use for execution. You can create your own plugins for execution.

plugin_args : dictionary

arguments to be sent to the plugin constructor; see individual plugin doc strings for details.

save(filename=None)

Store this workflow element to a file.

write_graph(dotfilename='graph.dot', graph2use='hierarchical', format='png', simple_form=True)[source]

Generates a graphviz dot file and a png file

Parameters:
graph2use: ‘orig’, ‘hierarchical’ (default), ‘flat’, ‘exec’, ‘colored’

  • orig - creates a top level graph without expanding internal workflow nodes

  • flat - expands workflow nodes recursively

  • hierarchical - expands workflow nodes recursively with a notion of hierarchy

  • colored - expands workflow nodes recursively with a notion of hierarchy, in color

  • exec - expands workflows to depict iterables

format: ‘png’, ‘svg’
simple_form: boolean (default: True)

Determines if the node name used in the graph should be of the form ‘nodename (package)’ when True or ‘nodename.Class.package’ when False.
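Continuing the workflow sketch above, rendering a hierarchical PNG might look like this (requires Graphviz on the system):

>>> wf.write_graph(dotfilename='graph.dot', graph2use='hierarchical',
...                format='png', simple_form=True)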

The Nipype utility function log_to_dict reads a log file generated by log_nodes_cb into Python dictionaries.
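A minimal sketch of reading a callback log back in (assuming log_to_dict is importable from nipype.utils.draw_gantt_chart; its location may vary by Nipype version):

>>> from nipype.utils.draw_gantt_chart import log_to_dict
>>> nodes_list = log_to_dict('callback.log')  # parsed per-node entries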

CPAC.utils.monitoring.log_nodes_cb(node, status)[source]

Function to record node run statistics to a log file as json dictionaries

Parameters:
node : nipype.pipeline.engine.Node

the node being logged

status : string

acceptable values are 'start', 'end'; otherwise it is considered an error

Returns:
None

this function does not return any values; it logs the node status info to the callback logger
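A sketch of wiring log_nodes_cb into a workflow run, following the standard Nipype callback-logging pattern (the path and plugin arguments are illustrative):

>>> import logging
>>> from CPAC.utils.monitoring import log_nodes_cb
>>> callback_logger = logging.getLogger('callback')
>>> callback_logger.setLevel(logging.DEBUG)
>>> callback_logger.addHandler(logging.FileHandler('callback.log'))
>>> wf.run(plugin='MultiProc',
...        plugin_args={'n_procs': 8, 'memory_gb': 16,
...                     'status_callback': log_nodes_cb})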

Module to draw an html gantt chart from a log file produced by CPAC.utils.monitoring.log_nodes_cb().

See https://nipype.readthedocs.io/en/latest/api/generated/nipype.utils.draw_gantt_chart.html

CPAC.utils.monitoring.draw_gantt_chart.calculate_resource_timeseries(events, resource)[source]

Given an event dictionary, calculate the resources used as a timeseries

Parameters:
events : dictionary

a dictionary of event-based node dictionaries of the workflow execution statistics

resource : string

the resource of interest to return the time-series of; e.g. 'runtime_memory_gb', 'estimated_threads', etc

Returns:
time_series : pandas Series

a pandas Series object that contains timestamps as the indices and the resource amount as values

CPAC.utils.monitoring.draw_gantt_chart.create_event_dict(start_time, nodes_list)[source]

Function to generate a dictionary of event (start/finish) nodes from the nodes list

Parameters:
start_time : datetime.datetime

a datetime object of the pipeline start time

nodes_list : list

a list of the node dictionaries that were run in the pipeline

Returns:
events : dictionary

a dictionary where each key is the timedelta from the start of the pipeline execution and each value is the node it accompanies

CPAC.utils.monitoring.draw_gantt_chart.draw_nodes(start, nodes_list, cores, minute_scale, space_between_minutes, colors)[source]

Function to return the html-string of the node drawings for the gantt chart

Parameters:
start : datetime.datetime obj

start time for first node

nodes_list : list

a list of the node dictionaries

cores : integer

the number of cores given to the workflow via the 'n_procs' plugin arg

total_duration : float

total duration of the workflow execution (in seconds)

minute_scale : integer

the scale, in minutes, at which to plot line markers for the gantt chart; for example, minute_scale=10 means there are lines drawn at every 10 minute interval from start to finish

space_between_minutes : integer

scale factor in pixel spacing between minute line markers

colors : list

a list of colors to choose from when coloring the nodes in the gantt chart

Returns:
result : string

the html-formatted string of the node drawings for the gantt chart

CPAC.utils.monitoring.draw_gantt_chart.generate_gantt_chart(logfile, cores, minute_scale=10, space_between_minutes=50, colors=['#7070FF', '#4E4EB2', '#2D2D66', '#9B9BFF'])[source]

Generates a gantt chart in html showing the workflow execution based on a callback log file. This script was intended to be used with the MultiProc plugin; see the callback-logging sketch above for how to set up the workflow so that it generates the log file.

Parameters:
logfile : string

filepath to the callback log file to plot the gantt chart of

cores : integer

the number of cores given to the workflow via the 'n_procs' plugin arg

minute_scale : integer (optional); default=10

the scale, in minutes, at which to plot line markers for the gantt chart; for example, minute_scale=10 means there are lines drawn at every 10 minute interval from start to finish

space_between_minutes : integer (optional); default=50

scale factor in pixel spacing between minute line markers

colors : list (optional)

a list of colors to choose from when coloring the nodes in the gantt chart

Returns:
None

the function does not return any value but writes out an html file in the same directory as the callback log path passed in
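For example, after a run logged with log_nodes_cb as sketched above:

>>> from CPAC.utils.monitoring.draw_gantt_chart import generate_gantt_chart
>>> generate_gantt_chart('callback.log', cores=8)  # writes an html chart next to the log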

CPAC.utils.monitoring.draw_gantt_chart.resource_overusage_report(cblog)[source]

Function to parse through a callback log for memory and/or thread usage above estimates / limits.

Parameters:
cblog: str

path to callback.log

Returns:
text_report: str
excessive: dict

CPAC.utils.monitoring.draw_gantt_chart.resource_report(callback_log, num_cores, logger=None)[source]

Function to warn about excessive resource usage and generate an interactive HTML chart.

Parameters:
callback_log: str

path to callback.log

num_cores: int
logger: Logger

https://docs.python.org/3/library/logging.html#logger-objects

Returns:
None