Nodes¶
A Nipype Node has an initialization parameter mem_gb that differs from the commandline option --mem_gb. While the commandline option is a limit, the Node initialization parameter is an estimate of the most memory that Node will consume when run. The Node parameter is not a limit; rather, this value is used to allocate system resources at runtime. Conversely, the commandline option --n_cpus is a limit, and the Node initialization parameter n_procs is likewise a limit on the maximum number of threads a Node will be permitted to consume.
C-PAC automatically creates a JSON-like file called callback.log (via the function log_nodes_cb) when running. This file includes, for each Node:
- estimated memory,
- memory usage observed at runtime,
- specified maximum number of threads per Node, and
- threads used at runtime.
When a developer creates or modifies a Node in C-PAC, mem_gb and n_procs arguments should be provided unless the respective defaults of 0.2 and None (the number of available system cores) are expected to be sufficient. When testing, the mem_gb and n_procs arguments should be adjusted if the observed memory and/or thread usage of a Node exceeds the estimate, as in the sketch below.
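For instance, a minimal sketch of constructing a Node with both arguments (the interface and values here are illustrative, not prescribed defaults):

from CPAC.pipeline.nipype_pipeline_engine import Node
from nipype.interfaces import fsl

# Hypothetical estimates: ~1.5 GB of memory and at most 2 threads.
# mem_gb is an estimate used to schedule resources; n_procs is a hard limit.
skullstrip = Node(fsl.BET(), name='skullstrip', mem_gb=1.5, n_procs=2)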
For Nodes that will use a varying amount of memory depending on their input data, the optional parameter mem_x takes a tuple of (memory multiplier, input file, multiplier mode), where memory multiplier is a number and input file is the string name of the Node input to multiply, such that the memory estimate returned by the mem_gb attribute is the mem_gb argument plus memory multiplier times the dimensions of the input file as specified by the multiplier mode: xyzt (spatial × temporal; the default), xyz (spatial), or t (temporal).
Note
mem_x
is a new parameter in C-PAC v1.8.1 and subject to change in future releases as we continue to develop methods for setting data- and operation-dependent memory estimates.
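As a hedged sketch, a Node whose memory scales with its input image might combine a constant mem_gb with a mem_x tuple (the multiplier, interface, and input name here are illustrative):

from CPAC.pipeline.nipype_pipeline_engine import Node
from nipype.interfaces import fsl

# Allocate 0.3 GB plus (3e-8 GB × voxels × timepoints) of whatever file
# is connected to 'in_file'; 'xyzt' multiplies spatial and temporal dimensions.
smooth = Node(fsl.IsotropicSmooth(), name='smooth',
              mem_gb=0.3,
              mem_x=(3e-8, 'in_file', 'xyzt'))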
- class CPAC.pipeline.nipype_pipeline_engine.Node(name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, n_procs=None, mem_gb=2.0, *, mem_x, **kwargs)[source]¶
Wraps interface objects for use in pipeline
A Node creates a sandbox-like directory for executing the underlying interface. It will copy or link inputs into this directory to ensure that input data are not overwritten. A hash of the input state is used to determine if the Node inputs have changed and whether the node needs to be re-executed.
Examples
>>> from nipype import Node
>>> from nipype.interfaces import spm
>>> realign = Node(spm.Realign(), 'realign')
>>> realign.inputs.in_files = 'functional.nii'
>>> realign.inputs.register_to_mean = True
>>> realign.run()
- Attributes:
fullname
Build the full name down the hierarchy.
inputs
Return the inputs of the underlying interface
interface
Return the underlying interface object
itername
Get the name of the expanded iterable.
mem_gb
Get estimated memory (GB)
mem_x
Get dict of ‘multiplier’ (memory multiplier), ‘file’ (input file) and multiplier mode (spatial * temporal, spatial only or temporal only).
n_procs
Get the estimated number of processes/threads
name
Set the unique name of this workflow element.
- needed_outputs
outputs
Return the output fields of the underlying interface
result
Get result from result file (do not hold it in memory)
Methods
clone(name)
Clone an EngineBase object.
get_output(parameter)
Retrieve a particular output of the node
hash_exists([updatehash])
Decorate the new is_cached method with hash updating to maintain backwards compatibility.
help()
Print interface help
is_cached([rm_outdated])
Check if the interface has been run previously, and whether cached results are up-to-date.
load(filename)
Load this workflow element from a file.
output_dir()
Return the location of the output directory for the node
run([updatehash])
Execute the node in its directory.
save([filename])
Store this workflow element to a file.
set_input(parameter, val)
Set interface input value
update(**opts)
Update inputs
- __init__(name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, n_procs=None, mem_gb=2.0, *, mem_x, **kwargs)[source]¶
- Parameters:
- interface : interface object
node specific interface (fsl.Bet(), spm.Coregister())
- name : alphanumeric string
node specific name
- iterables : generator
Input field and list to iterate using the pipeline engine, for example to iterate over different frac values in fsl.Bet(). For a single field the input can be a tuple; otherwise, a list of tuples:

node.iterables = ('frac', [0.5, 0.6, 0.7])
node.iterables = [('fwhm', [2, 4]), ('fieldx', [0.5, 0.6, 0.7])]

If this node has an itersource, then the iterables values is a dictionary which maps an iterable source field value to the target iterables field values, e.g.:

inputspec.iterables = ('images', ['img1.nii', 'img2.nii'])
node.itersource = ('inputspec', ['frac'])
node.iterables = ('frac', {'img1.nii': [0.5, 0.6],
                           'img2.nii': [0.6, 0.7]})

If this node's synchronize flag is set, then an alternate form of the iterables is a [fields, values] list, where fields is the list of iterated fields and values is the list of value tuples for the given fields, e.g.:

node.synchronize = True
node.iterables = [('frac', 'threshold'),
                  [(0.5, True),
                   (0.6, False)]]

- itersource : tuple
The (name, fields) iterables source which specifies the name of the predecessor iterable node and the input fields to use from that source node. The output field values comprise the key to the iterables parameter value mapping dictionary.
- synchronize : boolean
Flag indicating whether iterables are synchronized. If the iterables are synchronized, then this iterable node is expanded once per iteration over all of the iterables values. Otherwise, this iterable node is expanded once per each permutation of the iterables values.
- overwrite : boolean
Whether to overwrite contents of output directory if it already exists. If the directory exists and the hash matches, it is assumed that the process has been executed.
- needed_outputs : list of output_names
Force the node to keep only specific outputs. By default all outputs are kept. Setting this attribute will delete any output files and directories from the node's working directory that are not part of the needed_outputs.
- run_without_submitting : boolean
Run the node without submitting to a job engine or to a multiprocessing pool
- mem_gb : int or float
Estimate (in GB) of constant memory to allocate for this node.
- mem_x : 2-tuple or 3-tuple
(multiplier, input_file) (int or float, str) or
(multiplier, input_file, mode) (int or float, str, str)
Multiplier for memory allocation such that multiplier times the mode of the 4-D file at input_file plus self._mem_gb equals the total memory allocation for the node. input_file can be a Node input string or an actual path. mode can be any one of 'xyzt' (spatial × temporal; default if not specified), 'xyz' (spatial) or 't' (temporal).
Note: This parameter (mem_x) is likely to change in a future release as we incorporate more factors into memory estimates.
- See also: ⚡️ Setting data- and operation-dependent memory-estimates
GitHub epic of issues related to improving Node memory estimates based on the data and operations involved.
- base_dir¶
Define the work directory for this instance of workflow element.
- clone(name)¶
Clone an EngineBase object.
- Parameters:
- name : string (mandatory)
A clone of node or workflow must have a new name
- property fullname¶
Build the full name down the hierarchy.
- hash_exists(updatehash=False)[source]¶
Decorate the new is_cached method with hash updating to maintain backwards compatibility.
- property inputs¶
Return the inputs of the underlying interface
- property interface¶
Return the underlying interface object
- is_cached(rm_outdated=False)[source]¶
Check if the interface has been run previously, and whether cached results are up-to-date.
- property itername¶
Get the name of the expanded iterable.
- static load(filename)¶
Load this workflow element from a file.
- property mem_gb¶
Get estimated memory (GB)
- property mem_x¶
Get dict of 'multiplier' (memory multiplier), 'file' (input file) and multiplier mode (spatial * temporal, spatial only or temporal only). Returns None if already consumed or not set.
- property n_procs¶
Get the estimated number of processes/threads
- property name¶
Set the unique name of this workflow element.
- property outputs¶
Return the output fields of the underlying interface
- property result¶
Get result from result file (do not hold it in memory)
- run(updatehash=False)[source]¶
Execute the node in its directory.
- Parameters:
- updatehash: boolean
When the hash stored in the output directory as a result of a previous run does not match that calculated for this execution, updatehash=True only updates the hash without re-running.
- save(filename=None)¶
Store this workflow element to a file.
- class CPAC.pipeline.nipype_pipeline_engine.MapNode(name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, n_procs=None, mem_gb=2.0, **kwargs)[source]¶
Wraps interface objects that need to be iterated on a list of inputs.
Examples
>>> from nipype import MapNode
>>> from nipype.interfaces import fsl
>>> realign = MapNode(fsl.MCFLIRT(), 'in_file', 'realign')
>>> realign.inputs.in_file = ['functional.nii',
...                           'functional2.nii',
...                           'functional3.nii']
>>> realign.run()
- Attributes:
fullname
Build the full name down the hierarchy.
inputs
Return the inputs of the underlying interface
interface
Return the underlying interface object
itername
Get the name of the expanded iterable.
mem_gb
Get estimated memory (GB)
mem_x
Get dict of ‘multiplier’ (memory multiplier), ‘file’ (input file) and multiplier mode (spatial * temporal, spatial only or temporal only).
n_procs
Get the estimated number of processes/threads
name
Set the unique name of this workflow element.
- needed_outputs
outputs
Return the output fields of the underlying interface
result
Get result from result file (do not hold it in memory)
Methods
clone(name)
Clone an EngineBase object.
get_output(parameter)
Retrieve a particular output of the node
get_subnodes()
Generate subnodes of a mapnode and write pre-execution report
hash_exists([updatehash])
Decorate the new is_cached method with hash updating to maintain backwards compatibility.
help()
Print interface help
is_cached([rm_outdated])
Check if the interface has been run previously, and whether cached results are up-to-date.
load(filename)
Load this workflow element from a file.
num_subnodes()
Get the number of subnodes to iterate in this MapNode
output_dir()
Return the location of the output directory for the node
run([updatehash])
Execute the node in its directory.
save([filename])
Store this workflow element to a file.
set_input(parameter, val)
Set interface input value or nodewrapper attribute. Priority goes to interface.
update(**opts)
Update inputs
- __init__(name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, n_procs=None, mem_gb=2.0, **kwargs)[source]¶
- Parameters:
- interface : interface object
node specific interface (fsl.Bet(), spm.Coregister())
- name : alphanumeric string
node specific name
- iterables : generator
Input field and list to iterate using the pipeline engine, for example to iterate over different frac values in fsl.Bet(). For a single field the input can be a tuple; otherwise, a list of tuples:

node.iterables = ('frac', [0.5, 0.6, 0.7])
node.iterables = [('fwhm', [2, 4]), ('fieldx', [0.5, 0.6, 0.7])]

If this node has an itersource, then the iterables values is a dictionary which maps an iterable source field value to the target iterables field values, e.g.:

inputspec.iterables = ('images', ['img1.nii', 'img2.nii'])
node.itersource = ('inputspec', ['frac'])
node.iterables = ('frac', {'img1.nii': [0.5, 0.6],
                           'img2.nii': [0.6, 0.7]})

If this node's synchronize flag is set, then an alternate form of the iterables is a [fields, values] list, where fields is the list of iterated fields and values is the list of value tuples for the given fields, e.g.:

node.synchronize = True
node.iterables = [('frac', 'threshold'),
                  [(0.5, True),
                   (0.6, False)]]

- itersource : tuple
The (name, fields) iterables source which specifies the name of the predecessor iterable node and the input fields to use from that source node. The output field values comprise the key to the iterables parameter value mapping dictionary.
- synchronize : boolean
Flag indicating whether iterables are synchronized. If the iterables are synchronized, then this iterable node is expanded once per iteration over all of the iterables values. Otherwise, this iterable node is expanded once per each permutation of the iterables values.
- overwrite : boolean
Whether to overwrite contents of output directory if it already exists. If the directory exists and the hash matches, it is assumed that the process has been executed.
- needed_outputs : list of output_names
Force the node to keep only specific outputs. By default all outputs are kept. Setting this attribute will delete any output files and directories from the node's working directory that are not part of the needed_outputs.
- run_without_submitting : boolean
Run the node without submitting to a job engine or to a multiprocessing pool
- mem_gb : int or float
Estimate (in GB) of constant memory to allocate for this node.
- mem_x : 2-tuple or 3-tuple
(multiplier, input_file) (int or float, str) or
(multiplier, input_file, mode) (int or float, str, str)
Multiplier for memory allocation such that multiplier times the mode of the 4-D file at input_file plus self._mem_gb equals the total memory allocation for the node. input_file can be a Node input string or an actual path. mode can be any one of 'xyzt' (spatial × temporal; default if not specified), 'xyz' (spatial) or 't' (temporal).
Note: This parameter (mem_x) is likely to change in a future release as we incorporate more factors into memory estimates.
- See also: ⚡️ Setting data- and operation-dependent memory-estimates
GitHub epic of issues related to improving Node memory estimates based on the data and operations involved.
- base_dir¶
Define the work directory for this instance of workflow element.
- clone(name)¶
Clone an EngineBase object.
- Parameters:
- name : string (mandatory)
A clone of node or workflow must have a new name
- property fullname¶
Build the full name down the hierarchy.
- get_output(parameter)¶
Retrieve a particular output of the node
- hash_exists(updatehash=False)¶
Decorate the new is_cached method with hash updating to maintain backwards compatibility.
- help()¶
Print interface help
- property inputs¶
Return the inputs of the underlying interface
- property interface¶
Return the underlying interface object
- is_cached(rm_outdated=False)¶
Check if the interface has been run previously, and whether cached results are up-to-date.
- property itername¶
Get the name of the expanded iterable.
- static load(filename)¶
Load this workflow element from a file.
- property mem_gb¶
Get estimated memory (GB)
- property mem_x¶
Get dict of 'multiplier' (memory multiplier), 'file' (input file) and multiplier mode (spatial * temporal, spatial only or temporal only). Returns None if already consumed or not set.
- property n_procs¶
Get the estimated number of processes/threads
- property name¶
Set the unique name of this workflow element.
- output_dir()¶
Return the location of the output directory for the node
- property outputs¶
Return the output fields of the underlying interface
- property result¶
Get result from result file (do not hold it in memory)
- run(updatehash=False)¶
Execute the node in its directory.
- Parameters:
- updatehash: boolean
When the hash stored in the output directory as a result of a previous run does not match that calculated for this execution, updatehash=True only updates the hash without re-running.
- save(filename=None)¶
Store this workflow element to a file.
- set_input(parameter, val)[source]¶
Set interface input value or nodewrapper attribute. Priority goes to interface.
- update(**opts)¶
Update inputs
- class nipype.interfaces.base.core.Interface[source]¶
This is an abstract definition for Interface objects.
It provides no functionality. It defines the necessary attributes and methods all Interface objects should have.
- Attributes:
always_run
Should the interface be always run even if the inputs were not changed? Only applies to interfaces being run within a workflow context.
can_resume
Defines if the interface can reuse partial results after interruption.
- input_spec
- output_spec
version
interfaces should implement a version property
Methods
aggregate_outputs([runtime, needed_outputs])
Called to populate outputs
help([returnhelp])
Prints class help
run()
Execute the command.
See also
Nipype: Interface Specifications
- property always_run¶
Should the interface be always run even if the inputs were not changed? Only applies to interfaces being run within a workflow context.
- property can_resume¶
Defines if the interface can reuse partial results after interruption. Only applies to interfaces being run within a workflow context.
- property version¶
interfaces should implement a version property
- class CPAC.utils.interfaces.function.Function(input_names=None, output_names='out', function=None, imports=None, as_module=False, **inputs)[source]¶
Runs arbitrary function as an interface
Examples
>>> func = 'def func(arg1, arg2=5): return arg1 + arg2'
>>> fi = Function(input_names=['arg1', 'arg2'], output_names=['out'])
>>> fi.inputs.function_str = func
>>> res = fi.run(arg1=1)
>>> res.outputs.out
6
- Attributes:
always_run
Should the interface be always run even if the inputs were not changed? Only applies to interfaces being run within a workflow context.
can_resume
Defines if the interface can reuse partial results after interruption.
version
interfaces should implement a version property
Methods
aggregate_outputs([runtime, needed_outputs])
Collate expected outputs and apply output traits validation.
help([returnhelp])
Prints class help
input_spec
alias of CPAC.utils.interfaces.function.FunctionInputSpec
load_inputs_from_json(json_file[, overwrite])
A convenient way to load pre-set inputs from a JSON file.
output_spec
alias of nipype.interfaces.base.specs.DynamicTraitedSpec
run([cwd, ignore_exception])
Execute this interface.
save_inputs_to_json(json_file)
A convenient way to save current inputs to a JSON file.
- __init__(input_names=None, output_names='out', function=None, imports=None, as_module=False, **inputs)[source]¶
- Parameters:
- input_names : single str or list or None
names corresponding to function inputs; if None, derive input names from function argument names
- output_names : single str or list
names corresponding to function outputs (default: 'out'); if list of length > 1, has to match the number of outputs
- function : callable
callable python object. must be able to execute in an isolated namespace (possibly in concert with the imports parameter)
- imports : list of strings
list of import statements that allow the function to execute in an otherwise empty namespace
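A brief sketch of wrapping a live callable with this interface (the function and values are illustrative):

from CPAC.utils.interfaces.function import Function

def scale(x, factor=2):
    """Multiply a number by a factor."""
    return x * factor

# 'imports' would list any statements the function body needs, since it
# executes in an otherwise empty namespace; none are needed here.
fi = Function(input_names=['x', 'factor'], output_names=['scaled'],
              function=scale, imports=[])
fi.inputs.x = 3
res = fi.run()  # res.outputs.scaled == 6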
- aggregate_outputs(runtime=None, needed_outputs=None)¶
Collate expected outputs and apply output traits validation.
- property always_run¶
Should the interface be always run even if the inputs were not changed? Only applies to interfaces being run within a workflow context.
- property can_resume¶
Defines if the interface can reuse partial results after interruption. Only applies to interfaces being run within a workflow context.
- classmethod help(returnhelp=False)¶
Prints class help
- input_spec¶
alias of CPAC.utils.interfaces.function.FunctionInputSpec
- load_inputs_from_json(json_file, overwrite=True)¶
A convenient way to load pre-set inputs from a JSON file.
- output_spec¶
alias of nipype.interfaces.base.specs.DynamicTraitedSpec
- run(cwd=None, ignore_exception=None, **inputs)¶
Execute this interface.
This interface will not raise an exception if runtime.returncode is non-zero.
- Parameters:
- cwd : specify a folder where the interface should be run
- inputs : allows the interface settings to be updated
- Returns:
- results : nipype.interfaces.base.support.InterfaceResult
A copy of the instance that was executed, provenance information and, if successful, results
- save_inputs_to_json(json_file)¶
A convenient way to save current inputs to a JSON file.
- property version¶
interfaces should implement a version property
- class CPAC.pipeline.nipype_pipeline_engine.Workflow(name, base_dir=None)[source]¶
- Attributes:
Methods
add_nodes(nodes)
Add nodes to a workflow
clone(name)
Clone a workflow
connect(*args, **kwargs)
Connect nodes in the pipeline.
disconnect(*args)
Disconnect nodes. See the docstring for connect for format.
export([filename, prefix, format, ...])
Export object into a different format
get_node(name)
Return an internal node by name
list_node_names()
List names of all nodes in a workflow
load(filename)
Load this workflow element from a file.
remove_nodes(nodes)
Remove nodes from a workflow
run([plugin, plugin_args, updatehash])
Execute the workflow
save([filename])
Store this workflow element to a file.
write_graph([dotfilename, graph2use, ...])
Generates a graphviz dot file and a png file
write_hierarchical_dotfile
- __init__(name, base_dir=None)[source]¶
Create a workflow object.
- Parameters:
- name : alphanumeric string
unique identifier for the workflow
- base_dir : string, optional
path to workflow storage
- add_nodes(nodes)[source]¶
Add nodes to a workflow
- Parameters:
- nodes : list
A list of EngineBase-based objects
- base_dir¶
Define the work directory for this instance of workflow element.
- clone(name)[source]¶
Clone a workflow
Note
Will reset attributes used for executing workflow. See _init_runtime_fields.
- Parameters:
- name: alphanumeric name
unique name for the workflow
- connect(*args, **kwargs)[source]¶
Connect nodes in the pipeline.
This routine also checks if inputs and outputs are actually provided by the nodes that are being connected.
Creates edges in the directed graph using the nodes and edges specified in the connection_list. Uses the NetworkX method DiGraph.add_edges_from.
- Parameters:
- args : list or a set of four positional arguments
Four positional arguments of the form:

connect(source, sourceoutput, dest, destinput)

source : nodewrapper node
sourceoutput : string (must be in source.outputs)
dest : nodewrapper node
destinput : string (must be in dest.inputs)

A list of 3-tuples of the following form:

[(source, target,
  [('sourceoutput/attribute', 'targetinput'),
   ...]),
 ...]

Or:

[(source, target,
  [(('sourceoutput1', func, arg2, ...), 'targetinput'),
   ...]),
 ...]

sourceoutput1 will always be the first argument to func, and func will be evaluated and the results sent to targetinput. Currently func needs to define all its needed imports within the function, as we use the inspect module to get at the source code and execute it remotely.
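For example, a hedged sketch of both forms (the workflow and node names are illustrative):

# Four positional arguments connect one output to one input:
workflow.connect(realign, 'realigned_files', smooth, 'in_file')

# The list-of-tuples form connects several fields at once:
workflow.connect([(realign, smooth, [('realigned_files', 'in_file')]),
                  (smooth, sink, [('smoothed_files', 'in_file')])])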
- export(filename=None, prefix='output', format='python', include_config=False)[source]¶
Export object into a different format
- Parameters:
- filename: string
file to save the code to; overrides prefix
- prefix: string
prefix to use for output file
- format: string
one of "python"
- include_config: boolean
whether to include node and workflow config values
- property fullname¶
Build the full name down the hierarchy.
- property itername¶
Get the name of the expanded iterable.
- static load(filename)¶
Load this workflow element from a file.
- property name¶
Set the unique name of this workflow element.
- remove_nodes(nodes)[source]¶
Remove nodes from a workflow
- Parameters:
- nodes : list
A list of EngineBase-based objects
- run(plugin=None, plugin_args=None, updatehash=False)[source]¶
Execute the workflow
- Parameters:
- plugin: plugin name or object
Plugin to use for execution. You can create your own plugins for execution.
- plugin_args : dictionary
dictionary containing arguments to be sent to the plugin constructor; see individual plugin doc strings for details.
- save(filename=None)¶
Store this workflow element to a file.
- write_graph(dotfilename='graph.dot', graph2use='hierarchical', format='png', simple_form=True)[source]¶
Generates a graphviz dot file and a png file
- Parameters:
- graph2use: ‘orig’, ‘hierarchical’ (default), ‘flat’, ‘exec’, ‘colored’
orig - creates a top level graph without expanding internal workflow nodes; flat - expands workflow nodes recursively; hierarchical - expands workflow nodes recursively with a notion on hierarchy; colored - expands workflow nodes recursively with a notion on hierarchy in color; exec - expands workflows to depict iterables
- format: ‘png’, ‘svg’
- simple_form: boolean (default: True)
Determines if the node name used in the graph should be of the form ‘nodename (package)’ when True or ‘nodename.Class.package’ when False.
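A minimal usage sketch (the workflow name, path, and node are illustrative):

wf = Workflow(name='example_wf', base_dir='/tmp/work')
wf.add_nodes([skullstrip])  # e.g. the illustrative Node from earlier
wf.write_graph(dotfilename='graph.dot', graph2use='hierarchical', format='png')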
The Nipype utility function log_to_dict reads a log file generated by log_nodes_cb into a Python dictionary.
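A small sketch of loading such a log, assuming the upstream Nipype location of log_to_dict (the path is illustrative):

from nipype.utils.draw_gantt_chart import log_to_dict

# Each parsed entry holds one Node's logged statistics, e.g. estimated
# vs. observed memory and threads.
nodes = log_to_dict('callback.log')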
- CPAC.utils.monitoring.log_nodes_cb(node, status)[source]¶
Function to record node run statistics to a log file as json dictionaries
- Parameters:
- node : nipype.pipeline.engine.Node
the node being logged
- status : string
acceptable values are 'start' and 'end'; any other value is considered an error
- Returns:
- None
this function does not return any values, it logs the node status info to the callback logger
Module to draw an HTML gantt chart from the logfile produced by CPAC.utils.monitoring.log_nodes_cb(). See https://nipype.readthedocs.io/en/latest/api/generated/nipype.utils.draw_gantt_chart.html
- CPAC.utils.monitoring.draw_gantt_chart.calculate_resource_timeseries(events, resource)[source]¶
Given an event dictionary, calculate the resources used as a timeseries
- Parameters:
- events : dictionary
a dictionary of event-based node dictionaries of the workflow execution statistics
- resource : string
the resource of interest to return the time-series of; e.g. 'runtime_memory_gb', 'estimated_threads', etc.
- Returns:
- time_series : pandas Series
a pandas Series object that contains timestamps as the indices and the resource amount as values
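A one-line usage sketch (assumes an events dictionary such as the one returned by create_event_dict, documented below):

from CPAC.utils.monitoring.draw_gantt_chart import calculate_resource_timeseries

mem_usage = calculate_resource_timeseries(events, 'runtime_memory_gb')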
- CPAC.utils.monitoring.draw_gantt_chart.create_event_dict(start_time, nodes_list)[source]¶
Function to generate a dictionary of event (start/finish) nodes from the nodes list
- Parameters:
- start_time : datetime.datetime
a datetime object of the pipeline start time
- nodes_list : list
a list of the node dictionaries that were run in the pipeline
- Returns:
- events : dictionary
a dictionary where the key is the timedelta from the start of the pipeline execution to the value node it accompanies
- CPAC.utils.monitoring.draw_gantt_chart.draw_nodes(start, nodes_list, cores, minute_scale, space_between_minutes, colors)[source]¶
Function to return the html-string of the node drawings for the gantt chart
- Parameters:
- start : datetime.datetime obj
start time for first node
- nodes_list : list
a list of the node dictionaries
- cores : integer
the number of cores given to the workflow via the 'n_procs' plugin arg
- total_duration : float
total duration of the workflow execution (in seconds)
- minute_scale : integer
the scale, in minutes, at which to plot line markers for the gantt chart; for example, minute_scale=10 means there are lines drawn at every 10 minute interval from start to finish
- space_between_minutes : integer
scale factor in pixel spacing between minute line markers
- colors : list
a list of colors to choose from when coloring the nodes in the gantt chart
- Returns:
- result : string
the html-formatted string for producing the minutes-based time line markers
- CPAC.utils.monitoring.draw_gantt_chart.generate_gantt_chart(logfile, cores, minute_scale=10, space_between_minutes=50, colors=['#7070FF', '#4E4EB2', '#2D2D66', '#9B9BFF'])[source]¶
Generates a gantt chart in html showing the workflow execution based on a callback log file. This script was intended to be used with the MultiprocPlugin. The following code shows how to set up the workflow in order to generate the log file:
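A sketch of such a setup, following the upstream Nipype example (the workflow object and values are illustrative):

import logging
from CPAC.utils.monitoring import log_nodes_cb

# Attach a file handler to the 'callback' logger so node statistics are
# written to callback.log as the workflow runs.
handler = logging.FileHandler('callback.log')
logger = logging.getLogger('callback')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

workflow.run(plugin='MultiProc',
             plugin_args={'n_procs': 8, 'memory_gb': 12,
                          'status_callback': log_nodes_cb})

generate_gantt_chart('callback.log', cores=8)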
- Parameters:
- logfile : string
filepath to the callback log file to plot the gantt chart of
- cores : integer
the number of cores given to the workflow via the 'n_procs' plugin arg
- minute_scale : integer (optional); default=10
the scale, in minutes, at which to plot line markers for the gantt chart; for example, minute_scale=10 means there are lines drawn at every 10 minute interval from start to finish
- space_between_minutes : integer (optional); default=50
scale factor in pixel spacing between minute line markers
- colors : list (optional)
a list of colors to choose from when coloring the nodes in the gantt chart
- Returns:
- None
the function does not return any value but writes out an html file in the same directory as the callback log path passed in
- CPAC.utils.monitoring.draw_gantt_chart.resource_overusage_report(cblog)[source]¶
Function to parse through a callback log for memory and/or thread usage above estimates / limits.
- Parameters:
- cblog: str
path to callback.log
- Returns:
- text_report: str
- excessive: dict
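A usage sketch (the log path is illustrative):

from CPAC.utils.monitoring.draw_gantt_chart import resource_overusage_report

# text_report summarizes Nodes whose observed usage exceeded estimates
# or limits; excessive holds the corresponding details.
text_report, excessive = resource_overusage_report('callback.log')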
- CPAC.utils.monitoring.draw_gantt_chart.resource_report(callback_log, num_cores, logger=None)[source]¶
Function to attempt to warn about any excessive resource usage and generate an interactive HTML chart.
- Parameters:
- callback_log: str
path to callback.log
- num_cores: int
- logger: Logger
https://docs.python.org/3/library/logging.html#logger-objects
- Returns:
- None
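And a matching sketch for this wrapper (the arguments are illustrative):

from CPAC.utils.monitoring.draw_gantt_chart import resource_report

# Logs warnings for usage above estimates/limits and generates an
# interactive HTML chart alongside the callback log.
resource_report('callback.log', num_cores=8)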