Nodes

A Nipype Node has an initialization parameter mem_gb that differs from the commandline option --mem-gb. While the commandline option is a limit, the Node initialization parameter is an estimate of the most memory that Node will consume when run. The Node parameter is not a limit; rather, this value is used to allocate system resources at runtime.

Conversely, the commandline option --n-cpus is a limit and the Node initialization parameter n_procs is also a limit of the maximum number of threads a Node will be permmitted to consume.

C-PAC automatically creates a JSON-like file called callback.log (via the function log_nodes_cb) when running. This file includes for each Node:

  • estimated memory,

  • memory usage observed at runtime,

  • specified maximum number of threads per Node, and

  • threads used at runtime.

A callback.log can be provided to the pipeline configuration file (see Computer Settings) or with the commandline flag --runtime-usage. If a callback log is provided in the pipeline configuration, nodes with names that match nodes recorded in that pipeline log will have their memory estimates overridden by the values in the callback log plus a buffer percent (provided with the --runtime-buffer flag or in the pipeline configuration file).

When a developer creates or modifies a Node in C-PAC, a mem_gb and n_procs argument should be provided unless the respective defaults of 0.2 and None (number of available system cores) are expected to be sufficient. When testing, the mem_gb and n_procs arguments should be adjusted if the observed memory and/or thread usage of a Node exceeds the estimate.

For nodes that will use a varying amount of memory depending on the node’s input data, the optional parameter mem_x takes a tuple of (memory multiplier, input file, multiplier mode) where memory multiplier is a number and input file is the string name of the Node input to multiply such that the memory estimate returned by the mem_gb attribute is the mem_gb argument plus memory multiplier times the dimensions of the input file as specified in the multiplier mode (xyzt (spatial × temporal; default), xyz (spatial), or just t (temporal)).

Note

mem_x is a new parameter in C-PAC v1.8.1 and subject to change in future releases as we continue to develop methods for setting data- and operation-dependent memory estimates.

class CPAC.pipeline.nipype_pipeline_engine.Node(name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, n_procs=None, mem_gb: float | None = 2.0, *, mem_x=None, throttle: bool | None = False, **kwargs)[source]

Wraps interface objects for use in pipeline

A Node creates a sandbox-like directory for executing the underlying interface. It will copy or link inputs into this directory to ensure that input data are not overwritten. A hash of the input state is used to determine if the Node inputs have changed and whether the node needs to be re-executed.

Examples

>>> from nipype import Node
>>> from nipype.interfaces import spm
>>> realign = Node(spm.Realign(), 'realign')
>>> realign.inputs.in_files = 'functional.nii'  
>>> realign.inputs.register_to_mean = True
>>> realign.run() 
__init__(name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, n_procs=None, mem_gb: float | None = 2.0, *, mem_x=None, throttle: bool | None = False, **kwargs) None[source]
Parameters:
  • interface (interface object) – node specific interface (fsl.Bet(), spm.Coregister())

  • name (alphanumeric string) – node specific name

  • iterables (generator) –

    Input field and list to iterate using the pipeline engine for example to iterate over different frac values in fsl.Bet() for a single field the input can be a tuple, otherwise a list of tuples : :

    node.iterables = (‘frac’,[0.5,0.6,0.7]) node.iterables = [(‘fwhm’,[2,4]),(‘fieldx’,[0.5,0.6,0.7])]

    If this node has an itersource, then the iterables values is a dictionary which maps an iterable source field value to the target iterables field values, e.g. : : :

    inputspec.iterables = (‘images’,[‘img1.nii’, ‘img2.nii’]]) node.itersource = (‘inputspec’, [‘frac’]) node.iterables = (‘frac’, {‘img1.nii’ : [0.5, 0.6],

    ’img2.nii’ : [0.6, 0.7]})

    If this node’s synchronize flag is set, then an alternate form of the iterables is a [fields, values] list, where fields is the list of iterated fields and values is the list of value tuples for the given fields, e.g. : : :

    node.synchronize = True node.iterables = [(‘frac’, ‘threshold’),

    [(0.5, True),

    (0.6, False)]]

  • itersource (tuple) – The (name, fields) iterables source which specifies the name of the predecessor iterable node and the input fields to use from that source node. The output field values comprise the key to the iterables parameter value mapping dictionary.

  • synchronize (boolean) – Flag indicating whether iterables are synchronized. If the iterables are synchronized, then this iterable node is expanded once per iteration over all of the iterables values. Otherwise, this iterable node is expanded once per each permutation of the iterables values.

  • overwrite (Boolean) – Whether to overwrite contents of output directory if it already exists. If directory exists and hash matches it assumes that process has been executed

  • needed_outputs (list of output_names) – Force the node to keep only specific outputs. By default all outputs are kept. Setting this attribute will delete any output files and directories from the node’s working directory that are not part of the needed_outputs.

  • run_without_submitting (boolean) – Run the node without submitting to a job engine or to a multiprocessing pool

  • mem_gb (int or float) – Estimate (in GB) of constant memory to allocate for this node.

  • mem_x (2-tuple or 3-tuple) –

    (multiplier, input_file) (int or float, str)

    (multiplier, input_file, mode) (int or float, str, str)

    Note This parameter (mem_x) is likely to change in a future release as we incorporate more factors into memory estimates.

    See also⚡️ Setting data- and operation-dependent memory-estimates

    GitHub epic of issues related to improving Node memory estimates based on the data and operations involved.

    Multiplier for memory allocation such that multiplier times mode of 4-D file at input_file plus self._mem_gb equals the total memory allocation for the node. input_file can be a Node input string or an actual path.

    mode can be any one of * ‘xyzt’ (spatial * temporal) (default if not specified) * ‘xyz’ (spatial) * ‘t’ (temporal)

  • throttle (bool, optional) – Assume this Node will use all available memory if no observation run is provided.

base_dir

Define the work directory for this instance of workflow element.

clone(name)

Clone an EngineBase object.

Parameters:

name (string (mandatory)) – A clone of node or workflow must have a new name

property fullname

Build the full name down the hierarchy.

get_output(parameter)[source]

Retrieve a particular output of the node

hash_exists(updatehash=False)[source]

Decorate the new is_cached method with hash updating to maintain backwards compatibility.

help()[source]

Print interface help

property inputs

Return the inputs of the underlying interface

property interface

Return the underlying interface object

is_cached(rm_outdated=False)[source]

Check if the interface has been run previously, and whether cached results are up-to-date.

property itername

Get the name of the expanded iterable.

static load(filename)

Load this workflow element from a file.

property mem_gb

Get estimated memory (GB)

property mem_x

Get dict of ‘multiplier’ (memory multiplier), ‘file’ (input file) and multiplier mode (spatial * temporal, spatial only or temporal only). Returns None if already consumed or not set.

property n_procs

Get the estimated number of processes/threads

property name

Set the unique name of this workflow element.

output_dir()[source]

Return the location of the output directory for the node

property outputs

Return the output fields of the underlying interface

override_mem_gb(new_mem_gb)[source]

Override the Node’s memory estimate with a new value.

Parameters:

new_mem_gb (int or float) – new memory estimate in GB

property result

Get result from result file (do not hold it in memory)

run(updatehash=False)[source]

Execute the node in its directory.

Parameters:

updatehash (boolean) – When the hash stored in the output directory as a result of a previous run does not match that calculated for this execution, updatehash=True only updates the hash without re-running.

save(filename=None)

Store this workflow element to a file.

set_input(parameter, val)[source]

Set interface input value

update(**opts)[source]

Update inputs

class CPAC.pipeline.nipype_pipeline_engine.MapNode(name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, n_procs=None, throttle=False, **kwargs)[source]

Wraps interface objects that need to be iterated on a list of inputs.

Examples

>>> from nipype import MapNode
>>> from nipype.interfaces import fsl
>>> realign = MapNode(fsl.MCFLIRT(), 'in_file', 'realign')
>>> realign.inputs.in_file = ['functional.nii',
...                           'functional2.nii',
...                           'functional3.nii']  
>>> realign.run() 
__init__(name, iterables=None, itersource=None, synchronize=False, overwrite=None, needed_outputs=None, run_without_submitting=False, n_procs=None, throttle=False, **kwargs)[source]
Parameters:
  • interface (interface object) – node specific interface (fsl.Bet(), spm.Coregister())

  • name (alphanumeric string) – node specific name

  • iterables (generator) –

    Input field and list to iterate using the pipeline engine for example to iterate over different frac values in fsl.Bet() for a single field the input can be a tuple, otherwise a list of tuples : :

    node.iterables = (‘frac’,[0.5,0.6,0.7]) node.iterables = [(‘fwhm’,[2,4]),(‘fieldx’,[0.5,0.6,0.7])]

    If this node has an itersource, then the iterables values is a dictionary which maps an iterable source field value to the target iterables field values, e.g. : : :

    inputspec.iterables = (‘images’,[‘img1.nii’, ‘img2.nii’]]) node.itersource = (‘inputspec’, [‘frac’]) node.iterables = (‘frac’, {‘img1.nii’ : [0.5, 0.6],

    ’img2.nii’ : [0.6, 0.7]})

    If this node’s synchronize flag is set, then an alternate form of the iterables is a [fields, values] list, where fields is the list of iterated fields and values is the list of value tuples for the given fields, e.g. : : :

    node.synchronize = True node.iterables = [(‘frac’, ‘threshold’),

    [(0.5, True),

    (0.6, False)]]

  • itersource (tuple) – The (name, fields) iterables source which specifies the name of the predecessor iterable node and the input fields to use from that source node. The output field values comprise the key to the iterables parameter value mapping dictionary.

  • synchronize (boolean) – Flag indicating whether iterables are synchronized. If the iterables are synchronized, then this iterable node is expanded once per iteration over all of the iterables values. Otherwise, this iterable node is expanded once per each permutation of the iterables values.

  • overwrite (Boolean) – Whether to overwrite contents of output directory if it already exists. If directory exists and hash matches it assumes that process has been executed

  • needed_outputs (list of output_names) – Force the node to keep only specific outputs. By default all outputs are kept. Setting this attribute will delete any output files and directories from the node’s working directory that are not part of the needed_outputs.

  • run_without_submitting (boolean) – Run the node without submitting to a job engine or to a multiprocessing pool

  • mem_gb (int or float) – Estimate (in GB) of constant memory to allocate for this node.

  • mem_x (2-tuple or 3-tuple) –

    (multiplier, input_file) (int or float, str)

    (multiplier, input_file, mode) (int or float, str, str)

    Note This parameter (mem_x) is likely to change in a future release as we incorporate more factors into memory estimates.

    See also⚡️ Setting data- and operation-dependent memory-estimates

    GitHub epic of issues related to improving Node memory estimates based on the data and operations involved.

    Multiplier for memory allocation such that multiplier times mode of 4-D file at input_file plus self._mem_gb equals the total memory allocation for the node. input_file can be a Node input string or an actual path.

    mode can be any one of * ‘xyzt’ (spatial * temporal) (default if not specified) * ‘xyz’ (spatial) * ‘t’ (temporal)

  • throttle (bool, optional) – Assume this Node will use all available memory if no observation run is provided.

base_dir

Define the work directory for this instance of workflow element.

clone(name)

Clone an EngineBase object.

Parameters:

name (string (mandatory)) – A clone of node or workflow must have a new name

property fullname

Build the full name down the hierarchy.

get_output(parameter)

Retrieve a particular output of the node

get_subnodes()[source]

Generate subnodes of a mapnode and write pre-execution report

hash_exists(updatehash=False)

Decorate the new is_cached method with hash updating to maintain backwards compatibility.

help()

Print interface help

property inputs

Return the inputs of the underlying interface

property interface

Return the underlying interface object

is_cached(rm_outdated=False)

Check if the interface has been run previously, and whether cached results are up-to-date.

property itername

Get the name of the expanded iterable.

static load(filename)

Load this workflow element from a file.

property mem_gb

Get estimated memory (GB)

property mem_x

Get dict of ‘multiplier’ (memory multiplier), ‘file’ (input file) and multiplier mode (spatial * temporal, spatial only or temporal only). Returns None if already consumed or not set.

property n_procs

Get the estimated number of processes/threads

property name

Set the unique name of this workflow element.

num_subnodes()[source]

Get the number of subnodes to iterate in this MapNode

output_dir()

Return the location of the output directory for the node

property outputs

Return the output fields of the underlying interface

override_mem_gb(new_mem_gb)

Override the Node’s memory estimate with a new value.

Parameters:

new_mem_gb (int or float) – new memory estimate in GB

property result

Get result from result file (do not hold it in memory)

run(updatehash=False)

Execute the node in its directory.

Parameters:

updatehash (boolean) – When the hash stored in the output directory as a result of a previous run does not match that calculated for this execution, updatehash=True only updates the hash without re-running.

save(filename=None)

Store this workflow element to a file.

set_input(parameter, val)[source]

Set interface input value or nodewrapper attribute Priority goes to interface.

update(**opts)

Update inputs

class nipype.interfaces.base.core.Interface[source]

This is an abstract definition for Interface objects.

It provides no functionality. It defines the necessary attributes and methods all Interface objects should have.

__init__()[source]

Subclasses must implement __init__

aggregate_outputs(runtime=None, needed_outputs=None)[source]

Called to populate outputs

property always_run

Should the interface be always run even if the inputs were not changed? Only applies to interfaces being run within a workflow context.

property can_resume

Defines if the interface can reuse partial results after interruption. Only applies to interfaces being run within a workflow context.

classmethod help(returnhelp=False)[source]

Prints class help

input_spec = None

The specification of the input, defined by a HasTraits class.

output_spec = None

The specification of the output, defined by a HasTraits class.

run()[source]

Execute the command.

property version

interfaces should implement a version property

class CPAC.utils.interfaces.function.Function(input_names=None, output_names='out', function=None, imports=None, as_module=False, **inputs)[source]

Runs arbitrary function as an interface

Examples

>>> func = 'def func(arg1, arg2=5): return arg1 + arg2'
>>> fi = Function(input_names=['arg1', 'arg2'], output_names=['out'])
>>> fi.inputs.function_str = func
>>> res = fi.run(arg1=1)
>>> res.outputs.out
6
__init__(input_names=None, output_names='out', function=None, imports=None, as_module=False, **inputs)[source]
Parameters:
  • input_names (single str or list or None) – names corresponding to function inputs if None, derive input names from function argument names

  • output_names (single str or list) – names corresponding to function outputs (default: ‘out’). if list of length > 1, has to match the number of outputs

  • function (callable) – callable python object. must be able to execute in an isolated namespace (possibly in concert with the imports parameter)

  • imports (list of strings) – list of import statements that allow the function to execute in an otherwise empty namespace. If these collide with imports defined via the Function.sig_imports decorator, the imports given as a parameter here will take precedence over those from the decorator.

aggregate_outputs(runtime=None, needed_outputs=None)

Collate expected outputs and apply output traits validation.

property always_run

Should the interface be always run even if the inputs were not changed? Only applies to interfaces being run within a workflow context.

property can_resume

Defines if the interface can reuse partial results after interruption. Only applies to interfaces being run within a workflow context.

classmethod help(returnhelp=False)

Prints class help

input_spec

alias of FunctionInputSpec

load_inputs_from_json(json_file, overwrite=True)

A convenient way to load pre-set inputs from a JSON file.

output_spec

alias of DynamicTraitedSpec

run(cwd=None, ignore_exception=None, **inputs)

Execute the command.

save_inputs_to_json(json_file)

A convenient way to save current inputs to a JSON file.

static sig_imports(imports: List[str]) Callable[source]

Sets an ns_imports attribute on a function for Function-node functions. This can be useful for classes needed for decorators, typehints and for avoiding redefinitions.

Parameters:

imports (list of str) – import statements to import the function in an otherwise empty namespace. If these collide with imports defined via the Function.__init__ initialization method, the imports given as a parameter here will be overridden by those from the initializer.

Returns:

func

Return type:

function

Examples

See the defintion of calculate_FD_J to see the decorator tested here being applied. >>> from CPAC.generate_motion_statistics import calculate_FD_J >>> calc_fdj = Function(input_names=[‘in_file’, ‘calc_from’, ‘center’], … output_names=[‘out_file’], … function=calculate_FD_J, … as_module=True) >>> calc_fdj.imports # doctest: +NORMALIZE_WHITESPACE [‘from CPAC.utils.interfaces.function import Function’,

‘import os’, ‘import sys’, ‘from typing import Optional’, ‘import numpy as np’, ‘from CPAC.utils.pytest import skipif’, ‘from CPAC.utils.typing import LITERAL, TUPLE’]

>>> from inspect import signature
>>> from nipype.utils.functions import (getsource,
...     create_function_from_source)
>>> f = create_function_from_source(getsource(calculate_FD_J),
...                                 calc_fdj.imports)
>>> inspect.signature(calculate_FD_J) == inspect.signature(f)
True
property version

interfaces should implement a version property

class CPAC.pipeline.nipype_pipeline_engine.Workflow(name, base_dir=None, debug=False)[source]

Controls the setup and execution of a pipeline of processes.

__init__(name, base_dir=None, debug=False)[source]

Create a workflow object. :param name: unique identifier for the workflow :type name: alphanumeric string :param base_dir: path to workflow storage :type base_dir: string, optional :param debug: enable verbose debug-level logging :type debug: boolean, optional

add_nodes(nodes)[source]

Add nodes to a workflow

Parameters:

nodes (list) – A list of EngineBase-based objects

base_dir

Define the work directory for this instance of workflow element.

clone(name)[source]

Clone a workflow

Note

Will reset attributes used for executing workflow. See _init_runtime_fields.

Parameters:

name (alphanumeric name) – unique name for the workflow

connect(*args, **kwargs)[source]

Connect nodes in the pipeline.

This routine also checks if inputs and outputs are actually provided by the nodes that are being connected.

Creates edges in the directed graph using the nodes and edges specified in the connection_list. Uses the NetworkX method DiGraph.add_edges_from.

Parameters:

args (list or a set of four positional arguments) –

Four positional arguments of the form:

connect(source, sourceoutput, dest, destinput)

source : nodewrapper node sourceoutput : string (must be in source.outputs) dest : nodewrapper node destinput : string (must be in dest.inputs)

A list of 3-tuples of the following form:

[(source, target,
    [('sourceoutput/attribute', 'targetinput'),
    ...]),
...]

Or:

[(source, target, [(('sourceoutput1', func, arg2, ...),
                            'targetinput'), ...]),
...]
sourceoutput1 will always be the first argument to func
and func will be evaluated and the results sent to targetinput

currently func needs to define all its needed imports within the
function as we use the inspect module to get at the source code
and execute it remotely

disconnect(*args)[source]

Disconnect nodes See the docstring for connect for format.

export(filename=None, prefix='output', format='python', include_config=False)[source]

Export object into a different format

Parameters:
  • filename (string) – file to save the code to; overrides prefix

  • prefix (string) – prefix to use for output file

  • format (string) – one of “python”

  • include_config (boolean) – whether to include node and workflow config values

property fullname

Build the full name down the hierarchy.

get_node(name)[source]

Return an internal node by name

property itername

Get the name of the expanded iterable.

list_node_names()[source]

List names of all nodes in a workflow

static load(filename)

Load this workflow element from a file.

property name

Set the unique name of this workflow element.

remove_nodes(nodes)[source]

Remove nodes from a workflow

Parameters:

nodes (list) – A list of EngineBase-based objects

run(plugin=None, plugin_args=None, updatehash=False)[source]

Execute the workflow

Parameters:
  • plugin (plugin name or object) – Plugin to use for execution. You can create your own plugins for execution.

  • plugin_args (dictionary containing arguments to be sent to plugin) – constructor. see individual plugin doc strings for details.

save(filename=None)

Store this workflow element to a file.

write_graph(dotfilename='graph.dot', graph2use='hierarchical', format='png', simple_form=True)[source]

Generates a graphviz dot file and a png file

Parameters:
  • graph2use ('orig', 'hierarchical' (default), 'flat', 'exec', 'colored') – orig - creates a top level graph without expanding internal workflow nodes; flat - expands workflow nodes recursively; hierarchical - expands workflow nodes recursively with a notion on hierarchy; colored - expands workflow nodes recursively with a notion on hierarchy in color; exec - expands workflows to depict iterables

  • format ('png', 'svg') –

  • simple_form (boolean (default: True)) – Determines if the node name used in the graph should be of the form ‘nodename (package)’ when True or ‘nodename.Class.package’ when False.

The Nipype utility function log_to_dict reads a log file generated by log_nodes_cb to a Python dictionary.

CPAC.utils.monitoring.log_nodes_cb(node, status)[source]

Function to record node run statistics to a log file as json dictionaries

Parameters:
  • node (nipype.pipeline.engine.Node) – the node being logged

  • status (string) – acceptable values are ‘start’, ‘end’; otherwise it is considered and error

Returns:

this function does not return any values, it logs the node status info to the callback logger

Return type:

None

Module to draw an html gantt chart from logfile produced by CPAC.utils.monitoring.log_nodes_cb().

See https://nipype.readthedocs.io/en/latest/api/generated/nipype.utils.draw_gantt_chart.html

CPAC.utils.monitoring.draw_gantt_chart.calculate_resource_timeseries(events, resource)[source]

Given as event dictionary, calculate the resources used as a timeseries

Parameters:
  • events (dictionary) – a dictionary of event-based node dictionaries of the workflow execution statistics

  • resource (string) – the resource of interest to return the time-series of; e.g. ‘runtime_memory_gb’, ‘estimated_threads’, etc

Returns:

time_series – a pandas Series object that contains timestamps as the indices and the resource amount as values

Return type:

pandas Series

CPAC.utils.monitoring.draw_gantt_chart.create_event_dict(start_time, nodes_list)[source]

Function to generate a dictionary of event (start/finish) nodes from the nodes list

Parameters:
  • start_time (datetime.datetime) – a datetime object of the pipeline start time

  • nodes_list (list) – a list of the node dictionaries that were run in the pipeline

Returns:

events – a dictionary where the key is the timedelta from the start of the pipeline execution to the value node it accompanies

Return type:

dictionary

CPAC.utils.monitoring.draw_gantt_chart.draw_nodes(start, nodes_list, cores, minute_scale, space_between_minutes, colors)[source]

Function to return the html-string of the node drawings for the gantt chart

Parameters:
  • start (datetime.datetime obj) – start time for first node

  • nodes_list (list) – a list of the node dictionaries

  • cores (integer) – the number of cores given to the workflow via the ‘n_procs’ plugin arg

  • total_duration (float) – total duration of the workflow execution (in seconds)

  • minute_scale (integer) – the scale, in minutes, at which to plot line markers for the gantt chart; for example, minute_scale=10 means there are lines drawn at every 10 minute interval from start to finish

  • space_between_minutes (integer) – scale factor in pixel spacing between minute line markers

  • colors (list) – a list of colors to choose from when coloring the nodes in the gantt chart

Returns:

result – the html-formatted string for producing the minutes-based time line markers

Return type:

string

CPAC.utils.monitoring.draw_gantt_chart.generate_gantt_chart(logfile, cores, minute_scale=10, space_between_minutes=50, colors=['#7070FF', '#4E4EB2', '#2D2D66', '#9B9BFF'])[source]

Generates a gantt chart in html showing the workflow execution based on a callback log file. This script was intended to be used with the MultiprocPlugin. The following code shows how to set up the workflow in order to generate the log file:

Parameters:
  • logfile (string) – filepath to the callback log file to plot the gantt chart of

  • cores (integer) – the number of cores given to the workflow via the ‘n_procs’ plugin arg

  • minute_scale (integer (optional); default=10) – the scale, in minutes, at which to plot line markers for the gantt chart; for example, minute_scale=10 means there are lines drawn at every 10 minute interval from start to finish

  • space_between_minutes (integer (optional); default=50) – scale factor in pixel spacing between minute line markers

  • colors (list (optional)) – a list of colors to choose from when coloring the nodes in the gantt chart

Returns:

  • None – the function does not return any value but writes out an html file in the same directory as the callback log path passed in

  • Usage

  • -----

  • # import logging

  • # import logging.handlers

  • # from nipype.utils.profiler import log_nodes_cb

  • # log_filename = 'callback.log'

  • # logger = logging.getLogger(``’callback’``)

  • # logger.setLevel(logging.DEBUG)

  • # handler = logging.FileHandler(log_filename)

  • # logger.addHandler(handler)

  • # #create workflow

  • # workflow = ...

  • # workflow.run(plugin=``’MultiProc’``,

  • # plugin_args={‘n_procs’ (8, 'memory':12, 'status_callback': log_nodes_cb``}``:class:))

  • # generate_gantt_chart(``’callback.log’``, 8)

CPAC.utils.monitoring.draw_gantt_chart.resource_overusage_report(cblog)[source]

Function to parse through a callback log for memory and/or thread usage above estimates / limits.

Parameters:

cblog (str) – path to callback.log

Returns:

  • text_report (str)

  • excessive (dict)

CPAC.utils.monitoring.draw_gantt_chart.resource_report(callback_log, num_cores, logger=None)[source]

Function to attempt to warn any excessive resource usage and generate an interactive HTML chart.

Parameters:
Return type:

None