#!/usr/bin/env python3
# Copyright (C) 2022-2024 C-PAC Developers
# This file is part of C-PAC.
# C-PAC is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
# C-PAC is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
"""Functions to create YAML configuration files from templates."""
from copy import deepcopy
from datetime import datetime
from hashlib import sha1
import os
import re
from typing import Optional
from click import BadParameter
import yaml
from CPAC.utils.configuration import Configuration, preconfig_yaml, Preconfiguration
from CPAC.utils.monitoring import UTLOGGER
from CPAC.utils.utils import update_config_dict, update_pipeline_values_1_8, YAML_BOOLS
# Reverse lookup: each spelling listed in a YAML_BOOLS value maps back to the
# YAML_BOOLS key it is listed under (presumably True/False -- confirm against
# CPAC.utils.utils.YAML_BOOLS).
YAML_LOOKUP = {
    spelling: canonical
    for canonical, spellings in YAML_BOOLS.items()
    for spelling in spellings
}
[docs]
class YamlTemplate:  # pylint: disable=too-few-public-methods
    """A class to link YAML comments to the contents of a YAML file.

    Attributes
    ----------
    comments : dict
        Flat dictionary with ``'.'``-delimited pseudo-nested structure.
        E.g., comments for ``{'pipeline_setup': {'pipeline_name': value}}``
        would be keyed
        ``{'pipeline_setup': comment0, 'pipeline_setup.pipeline_name': comment1}``
        to allow comments at each level of depth.

    template : str or dict
        The raw template (YAML text, or an already-loaded dictionary).

    dump : method

    get_nested : method
    """

    def __init__(self, original_yaml, base_config=None):
        """Initialize a YamlTemplate.

        Parameters
        ----------
        original_yaml : str or dict
            raw YAML, path to a YAML file, name of a preconfig, or an
            already-loaded dictionary (a dictionary carries no comments)

        base_config : Configuration, optional
            if given, its ``dict()`` is used as the fallback dictionary
            instead of the parsed template

        Raises
        ------
        TypeError
            if the template resolves to something other than text or a
            dictionary
        """
        if not isinstance(original_yaml, dict):
            # A preconfig name resolves to its file path; anything else
            # raises BadParameter and is treated as a path or raw YAML.
            try:
                original_yaml = preconfig_yaml(original_yaml)
            except BadParameter:
                pass
            if os.path.exists(original_yaml):
                with open(original_yaml, "r", encoding="utf-8") as _f:
                    original_yaml = _f.read()
        if not isinstance(original_yaml, (str, dict)):
            msg = (
                "YamlTemplate requires YAML text, a path, a preconfig name or "
                f"a dict, not {type(original_yaml).__name__}"
            )
            raise TypeError(msg)
        self.comments = {}
        self.template = original_yaml
        if base_config is not None:
            self._dict = base_config.dict()
        elif isinstance(self.template, dict):
            self._dict = self.template
        else:
            self._dict = yaml.safe_load(self.template)
        self._parse_comments()

    get_nested = Configuration.get_nested

    def _lookup_fsldir(self, new_dict):
        """Return the FSLDIR string to abbreviate in dumped values, if any.

        The value in ``new_dict`` takes precedence over the one in the
        template / base dictionary. ``None`` is returned when neither holds
        a non-empty string, in which case no substitution is performed.
        """
        fsldir_keys = ["pipeline_setup", "system_config", "FSLDIR"]
        for source in (new_dict, self._dict):
            try:
                fsldir = self.get_nested(source, fsldir_keys)
            except KeyError:
                continue
            # An empty string would match (and corrupt) every value, and a
            # non-string cannot be searched for in a string.
            if isinstance(fsldir, str) and fsldir:
                return fsldir
        return None

    def dump(self, new_dict, parents=None):
        """Dump YAML from a new dictionary with comments from template dictionary.

        Parameters
        ----------
        new_dict : dict
            the values to write; not modified by this method

        parents : list of str, optional
            keys leading to the nested dictionary to dump. Omit to dump the
            whole document, including the YAML directive and preamble.

        Returns
        -------
        str
        """
        # SSOT FSLDIR
        fsldir = self._lookup_fsldir(new_dict)
        # Add YAML version directive to top of document and ensure
        # C-PAC version comment and 'FROM' are at the top of the YAML
        # output
        if parents is None:
            parents = []
            _dump = ["%YAML 1.1", "---"]
            # Work on a shallow copy so the caller's dictionary is left
            # untouched, with 'pipeline_setup' guaranteed to exist and to
            # be iterated first (the preamble comment and 'FROM' are
            # emitted when that key is reached).
            new_dict = {
                "pipeline_setup": new_dict.get("pipeline_setup"),
                **new_dict,
            }
        else:
            _dump = []
        # Prepare for indentation
        line_level = len(parents)
        # Get a safely mutable copy of this level (only its keys are used,
        # so a shallow copy is sufficient)
        loop_dict = dict(self.get_nested(new_dict, parents) if parents else new_dict)
        # Grab special key to print first
        import_from = loop_dict.pop("FROM", None)
        # Iterate through mutated dict
        for key in loop_dict:
            # List of progressively-indented key strings
            keys = [*parents, key]
            # Comments are stored in a flat dictionary with
            # '.'-delimited pseudonested keys
            comment = self.comments.get(".".join(keys))
            # This exception should only happen from mutations
            # introduced by this function
            try:
                value = self.get_nested(new_dict, keys)
            except KeyError:  # exclude unincluded keys
                continue
            # Print comment if there's one above this key in the template
            if comment:
                if key != "pipeline_setup":
                    _dump += [""]  # Add a blank line above the comment
                _dump += [indent(line_level, 0) + line for line in comment]
            # Print 'FROM' between preamble comment and rest of config
            # if applicable
            if key == "pipeline_setup" and import_from is not None:
                _dump += [f"FROM: {import_from}", ""]
            # Apply indentation to key
            indented_key = f"{indent(line_level, 0)}{key}:"
            # Print YAML-formatted value
            if value is not None:
                # SSOT FSLDIR
                if (
                    fsldir is not None
                    and isinstance(value, str)
                    and key != "FSLDIR"
                    and fsldir in value
                ):
                    # Replace the literal path, then collapse any '$$FSLDIR'
                    # produced when the value already used the variable.
                    value = re.sub(
                        r"\$*FSLDIR", "$FSLDIR", value.replace(fsldir, "$FSLDIR")
                    )
                if isinstance(value, dict):
                    _dump += [indented_key, self.dump(new_dict, keys)]
                elif isinstance(value, list):
                    list_line = _format_list_items(value, line_level)
                    if "\n" in list_line:
                        _dump += [indented_key, *list_line.split("\n")]
                    else:
                        _dump += [f"{indented_key} {list_line}"]
                elif isinstance(value, bool) or (
                    isinstance(value, str) and value.lower() in YAML_LOOKUP
                ):
                    if isinstance(value, str):
                        value = YAML_LOOKUP[value.lower()]
                    value = "On" if value is True else "Off"
                    _dump += [f"{indented_key} {value}"]
                else:
                    _dump += [f"{indented_key} {value}"]
            elif key != "pipeline_setup":
                _dump += [indented_key]
        # Normalize line spacing and return YAML string
        return re.sub("\n{3,}", "\n\n", "\n".join(_dump)).rstrip() + "\n"

    def _parse_comments(self):
        """Collect the comment lines that precede each key in the template.

        Results are stored in ``self.comments`` keyed by the ``'.'``-joined
        path of the key. Comment lines accumulate until the next line that
        contains a ``':'``; nesting depth is taken from the line's
        indentation (two spaces per level, see ``_count_indent``).
        """
        if not isinstance(self.template, str):
            # A dictionary template has no text to take comments from
            return
        # Initialize comment and key
        comment = []
        key = []
        for line in self.template.split("\n"):
            # Calculate indentation
            line_level = _count_indent(line)
            # Remove indentation and trailing whitespace
            stripped_line = line.strip()
            # Collect a line of a comment
            if stripped_line.startswith("#"):
                comment.append(stripped_line)
            # If a line is neither a comment nor a YAML directive / document
            # marker, and it is a key
            elif not stripped_line.startswith(("%YAML", "---")) and (
                ":" in stripped_line
            ):
                # Set the key for the comments dictionary
                line_key = stripped_line.split(":", 1)[0].strip()
                if line_level == 0:
                    key = [line_key]
                else:
                    key = [*key[:line_level], line_key]
                # Store the full list of comment lines
                self.comments[".".join(key)] = comment
                # Reset the comment variable to collect the next comment
                comment = []
def _count_indent(line):
"""Determine indentation level.
Parameters
----------
line : str
Returns
-------
number_of_indents : int
Examples
--------
>>> _count_indent('No indent')
0
>>> _count_indent(' Four spaces')
2
"""
return (len(line) - len(line.lstrip())) // 2
[docs]
def create_yaml_from_template(
    d: Configuration | dict,  # pylint: disable=invalid-name
    template: str = "default",
    import_from: Optional[str] = None,
    skip_env_check: Optional[bool] = False,
) -> str:
    """Render a configuration as a YAML string laid out like a template.

    Comments and their placement are taken from the template.

    It may not be fully robust to YAML structures, but it works for C-PAC
    config files!

    Parameters
    ----------
    d : dict or Configuration

    template : str
        path to template, name of preconfig, or YAML as a string

    import_from : str, optional
        name of a preconfig. When given, only the differences from that
        preconfig are written, preceded by a ``FROM`` entry. Full config is
        generated if omitted

    skip_env_check : bool, optional
        skip environment check (for validating a config without running)

    Returns
    -------
    str

    Examples
    --------
    >>> import yaml
    >>> from CPAC.utils.configuration import Configuration, Preconfiguration
    >>> Configuration(yaml.safe_load(create_yaml_from_template({}))).dict(
    ...     ) == Configuration({}).dict()
    True
    >>> fmriprep_options = Preconfiguration('fmriprep-options')
    >>> fmriprep_options - Configuration({}) != {}
    True
    >>> fmriprep_options - fmriprep_options
    {}
    >>> fmriprep_options - Preconfiguration('fmriprep-options')
    {}
    >>> fmriprep_options - Configuration({'FROM': 'fmriprep-options'})
    {}
    >>> fmriprep_options - Configuration(yaml.safe_load(
    ...     create_yaml_from_template(fmriprep_options, import_from=None)))
    {}
    >>> fmriprep_options - Configuration(yaml.safe_load(
    ...     create_yaml_from_template(fmriprep_options,
    ...                               import_from='default')))
    {}
    >>> fmriprep_options - Configuration(yaml.safe_load(
    ...     create_yaml_from_template(fmriprep_options, import_from='blank')))
    {}
    >>> different_sca = Configuration({'pipeline_setup': {
    ...     'pipeline_name': 'different_SCA'},
    ...     'seed_based_correlation_analysis': {'run': 'y',
    ...     'norm_timeseries_for_DR': 'Off'}})
    >>> (Configuration(yaml.safe_load(create_yaml_from_template(
    ...     different_sca))) - Configuration()).get(
    ...     'seed_based_correlation_analysis') not in (None, {})
    True
    """
    if import_from is not None:
        # Config expressed relative to a preconfig: keep only this config's
        # side of the differences and record which preconfig it builds on.
        config = d if isinstance(d, Configuration) else Configuration(d)
        base_config = Preconfiguration(import_from, skip_env_check=skip_env_check)
        d = (config - base_config).left
        d.update({"FROM": import_from})
    else:
        # Full config: dump a plain dictionary against the bare template.
        base_config = None
        if isinstance(d, Configuration):
            d = d.dict()
    return YamlTemplate(template, base_config).dump(new_dict=d)
def _format_list_items(
    l: list,  # noqa: E741  # pylint:disable=invalid-name
    line_level: int,
    short_list_length: int = 50,
) -> str:
    """Format a list for inclusion in the dumped YAML.

    Parameters
    ----------
    l : list

    line_level : int
        nesting level of the key that owns the list

    short_list_length : int
        a list of only str / bool / int / float items is written inline in
        square brackets when its string form is shorter than this

    Returns
    -------
    yaml : str

    Examples
    --------
    >>> print(_format_list_items([1, 2, {'nested': 3}], 0))
      - 1
      - 2
      - nested: 3
    >>> print(
    ...     _format_list_items([1, 2, {'nested': [3, {'deep': [4]}]}], 1))
        - 1
        - 2
        - nested:
          - 3
          - deep:
            - 4
    """
    simple_types = (str, bool, int, float)
    # Short lists of scalars stay inline, with quote characters removed
    if all(isinstance(item, simple_types) for item in l):
        inline = str([yaml_bool(item) for item in l])
        if len(inline) < short_list_length:
            return inline.replace("'", "").replace('"', "")
    # Everything else becomes a block sequence, one indented row per line
    block = yaml.dump(yaml_bool(l), sort_keys=False)
    block = block.replace("'On'", "On").replace("'Off'", "Off")
    prefix = indent(line_level)
    return "\n".join(prefix + row for row in block.split("\n")).rstrip()
[docs]
def hash_data_config(sub_list):
    """Generate a short SHA1 hash from a data config subject list of dicts.

    The ``site_id``, ``subject_id`` and ``unique_id`` of every entry are
    combined (values of one key joined by ``','``, the three groups joined
    by ``'_'``) and hashed. A missing key or a ``None`` value contributes an
    empty string; any other non-string value (e.g. a numeric subject ID
    parsed from YAML) contributes its ``str()`` form instead of raising.

    Parameters
    ----------
    sub_list : list of dicts

    Returns
    -------
    data_config_hash : str, len(8)

    Examples
    --------
    >>> sub_list = [{'site_id': f'site{i}', 'subject_id': f'sub{i}',
    ...              'unique_id': f'uid{i}'} for i in range(1, 4)]
    >>> sub_list[0]
    {'site_id': 'site1', 'subject_id': 'sub1', 'unique_id': 'uid1'}
    >>> hash_data_config(sub_list)
    '6f49a278'
    """
    id_keys = ("site_id", "subject_id", "unique_id")

    def _as_text(value):
        """Coerce an ID to text so ``str.join`` cannot fail on it."""
        return "" if value is None else str(value)

    joined_ids = "_".join(
        ",".join(_as_text(run.get(key, "")) for run in sub_list) for key in id_keys
    )
    return sha1(joined_ids.encode("utf-8")).hexdigest()[:8]
[docs]
def indent(line_level, plus=2):
    """Build the run of spaces used to indent a line at a given level.

    Parameters
    ----------
    line_level : int
        The level of indentation to return; each level is two spaces wide

    plus : int
        Number of extra spaces added on top of the level's own width

    Returns
    -------
    str
        The string of spaces to use for indentation
    """
    width = 2 * line_level + plus
    return " " * width
[docs]
def yaml_bool(value):
    """Convert booleans (and boolean-like strings) to ``'On'`` / ``'Off'``.

    Lists are converted item by item and dictionaries value by value. In a
    dictionary, a ``'Name'`` entry is moved to the front and its value is
    left as it is. Anything else is returned unchanged.

    Parameters
    ----------
    value : any

    Returns
    -------
    value : any

    Examples
    --------
    >>> yaml_bool(True)
    'On'
    >>> yaml_bool([False, 'On', True])
    ['Off', 'On', 'On']
    """
    if isinstance(value, list):
        return [yaml_bool(item) for item in value]
    if isinstance(value, dict):
        converted = {k: yaml_bool(v) for k, v in value.items() if k != "Name"}
        if "Name" in value:
            # if 'Name' is a key, promote that item to the top
            return {"Name": value["Name"], **converted}
        return converted
    if isinstance(value, str):
        # Recognized spellings become whatever YAML_LOOKUP maps them to;
        # other strings pass through untouched
        value = YAML_LOOKUP.get(value.lower(), value)
    if isinstance(value, bool):
        return "On" if value else "Off"
    return value
[docs]
def upgrade_pipeline_to_1_8(path):
    """Upgrade a C-PAC 1.7 pipeline config to C-PAC 1.8.

    The file is always backed up first. It is only rewritten when it parses
    to a mapping that contains a ``pipelineName`` key; an empty file or a
    file whose top level is not a mapping is left as it is.

    Parameters
    ----------
    path : str

    Returns
    -------
    None

    Outputs
    -------
    {path}.{now}.bak
        original file

    path
        upgraded file

    {path}.rem
        leftover keys reported by ``update_config_dict``, if any
    """
    # back up original config
    now = datetime.isoformat(datetime.now()).replace(":", "_")
    backup = f"{path}.{now}.bak"
    UTLOGGER.info("Backing up %s to %s and upgrading to C-PAC 1.8", path, backup)
    with open(path, "r", encoding="utf-8") as _f:
        original = _f.read()
    with open(backup, "w", encoding="utf-8") as _f:
        _f.write(original)
    orig_dict = yaml.safe_load(original)
    # An empty document loads as None and a top-level list has no keys to
    # upgrade; neither can be a 1.7 pipeline config.
    if not isinstance(orig_dict, dict):
        return
    # set Regressor 'Name's if not provided
    regressors = orig_dict.get("Regressors")
    if isinstance(regressors, list):
        for number, regressor in enumerate(regressors, start=1):
            if isinstance(regressor, dict) and "Name" not in regressor:
                regressor["Name"] = f"Regressor-{number}"
    if "pipelineName" not in orig_dict:
        return
    # upgrade and overwrite
    middle_dict, leftovers_dict, _complete_dict = update_config_dict(orig_dict)
    # Render before opening the target so that a failed conversion cannot
    # leave a truncated config behind.
    upgraded = create_yaml_from_template(update_pipeline_values_1_8(middle_dict))
    with open(path, "w", encoding="utf-8") as _f:
        _f.write(upgraded)
    if leftovers_dict:
        with open(f"{path}.rem", "w", encoding="utf-8") as _f:
            _f.write(yaml.dump(leftovers_dict))
[docs]
def update_a_preconfig(preconfig, import_from):
    """Regenerate a preconfig's YAML file relative to another config.

    The preconfig is loaded, rendered with ``create_yaml_from_template``
    using ``import_from`` as its base, and written back over its own file.

    Parameters
    ----------
    preconfig : str
        name of the preconfig to rewrite

    import_from : str
        name of the preconfig to express it relative to (``None`` for a
        full config)
    """
    UTLOGGER.info("Updating %s preconfig…", preconfig)
    current = Preconfiguration(preconfig, skip_env_check=True)
    rendered = create_yaml_from_template(
        current, import_from=import_from, skip_env_check=True
    )
    target_path = preconfig_yaml(preconfig)
    with open(target_path, "w", encoding="utf-8") as handle:
        handle.write(rendered)
[docs]
def update_all_preconfigs():
    """Regenerate the bundled preconfig files.

    ``blank`` is written as a full config; the preconfigs listed below are
    written relative to their named base; every remaining entry of
    ``ALL_PIPELINE_CONFIGS`` is written relative to ``blank``. ``default``
    itself is not rewritten.
    """
    from CPAC.pipeline import ALL_PIPELINE_CONFIGS

    # (base, preconfigs written relative to that base), in processing order
    explicit_bases = (
        (None, ("blank",)),
        ("default", ("anat-only", "preproc")),
        ("fmriprep-options", ("fx-options", "rbc-options")),
        ("monkey", ("nhp-macaque",)),
    )
    # Preconfigs that must not be rebuilt on top of 'blank'
    not_from_blank = (
        "anat-only",
        "blank",
        "default",
        "fx-options",
        "nhp-macaque",
        "preproc",
        "rbc-options",
    )
    for base, preconfigs in explicit_bases:
        for preconfig in preconfigs:
            update_a_preconfig(preconfig, base)
    for preconfig in ALL_PIPELINE_CONFIGS:
        if preconfig not in not_from_blank:
            update_a_preconfig(preconfig, "blank")
# Running this module as a script regenerates the preconfig files.
if __name__ == "__main__":
    update_all_preconfigs()