"""A module that helps to handle the YAML configuration.
"""
from __future__ import annotations
import typing
from collections.abc import MutableMapping
import warnings
import os
import os.path as op
from pathlib import Path
import yaml
# Configuration of a single step (or of the ``common`` section):
# option name -> value.
StepConfigDict = typing.Dict[str, typing.Any]
# Configuration of a whole pipeline: section name (a step name, or the special
# ``common`` entry) -> configuration dictionary of that section.
PipelineConfigDict = typing.Dict[str, StepConfigDict]
def resolve_relative_path(path: str, folder_name: str = "") -> str:
    """Anchor ``path`` inside the repository pointed to by ``ETX4VELO_REPO``.

    Args:
        path: path to resolve
        folder_name: folder inserted between the repository root and ``path``

    Returns:
        ``path`` joined below ``<ETX4VELO_REPO>/<folder_name>``. If the environment
        variable ``ETX4VELO_REPO`` is not set, ``path`` is returned unchanged.
    """
    try:
        repo_root = os.environ["ETX4VELO_REPO"]
    except KeyError:
        # No repository root available: leave the path untouched.
        return path
    return os.path.join(repo_root, folder_name, path)
def load_dict(
    path_or_config: str | typing.Dict[typing.Any, typing.Any]
) -> typing.Dict[typing.Any, typing.Any]:
    """Return a dictionary, reading it from a YAML file when a path is given.

    Args:
        path_or_config: dictionary, or path to a YAML file containing a dictionary

    Returns:
        Content of the YAML file (parsed with the safe loader) if a path was
        provided, otherwise a shallow copy of the inputted dictionary.

    Raises:
        TypeError: ``path_or_config`` is neither a string nor a dictionary.
    """
    # Already a dictionary: hand back a shallow copy so that top-level
    # insertions/removals by the caller do not affect the input.
    if isinstance(path_or_config, dict):
        return path_or_config.copy()

    if not isinstance(path_or_config, str):
        raise TypeError(
            "`path_or_config` is expected either to be a string or a dictionary, "
            f"but is {type(path_or_config).__name__}"
        )

    with open(path_or_config, "r") as stream:
        return yaml.safe_load(stream)
class CommonDirs:
    """A class that handles the common configuration in ``setup/common_config.yaml``.

    The YAML file is only read the first time :py:attr:`common_config` is needed.
    The directories listed in the ``directories`` section of that file are exposed
    as attributes (``data_directory``, ``artifact_directory``, ...), resolved with
    respect to the ``etx4velo`` folder of the repository.
    """

    # Attribute names that are looked up in the ``directories`` section of the
    # common configuration instead of on the instance (see ``__getattribute__``).
    _DIRECTORY_ATTRIBUTES: typing.Tuple[str, ...] = (
        "data_directory",
        "artifact_directory",
        "performance_directory",
        "reference_directory",
        "analysis_directory",
        "export_directory",
    )

    def __init__(self) -> None:
        # Filled in lazily by ``common_config``; no file is read at construction.
        self._common_config: typing.Optional[typing.Dict[str, typing.Any]] = None

    @property
    def common_config(self) -> typing.Dict[str, typing.Any]:
        """Common configuration dictionary, in ``setup/common_config.yaml``."""
        if self._common_config is None:
            with open(
                resolve_relative_path("setup/common_config.yaml"), "r"
            ) as common_config_file:
                self._common_config = yaml.load(
                    common_config_file, Loader=yaml.SafeLoader
                )
        return self._common_config

    def __getattribute__(self, name: str) -> typing.Any:
        """Resolve the directory attributes from the common configuration.

        Args:
            name: name of the attribute that is accessed

        Returns:
            For one of the directory attributes, the corresponding entry of the
            ``directories`` section of the common configuration, resolved with
            respect to the ``etx4velo`` folder. Otherwise, the regular attribute.
        """
        # The tuple is read from the class: reading it through ``self`` would
        # re-enter this very method.
        if name in type(self)._DIRECTORY_ATTRIBUTES:
            return resolve_relative_path(
                self.common_config["directories"][name], folder_name="etx4velo"
            )
        return super().__getattribute__(name)

    @property
    def repository(self) -> str:
        """Path to the repository.

        Raises:
            RuntimeError: the environment variable ``ETX4VELO_REPO`` is not defined.
        """
        repository = os.environ.get("ETX4VELO_REPO")
        if repository is None:
            raise RuntimeError(
                "The environement variable `ETX4VELO_REPO` was not defined. "
                "Please consider running `source setup.sh`."
            )
        return repository

    @property
    def test_config_path(self) -> str:
        """Path to the test configuration file."""
        return op.join(self.repository, "etx4velo", "test_samples.yaml")

    @property
    def detectors(self) -> typing.List[str]:
        """List of available detectors."""
        return list(self.common_config["detectors"].keys())

    def get_filenames_from_detector(self, detector: str) -> typing.Dict[str, str]:
        """Get the .parquet filenames for a given detector.

        Args:
            detector: name of a detector, as listed in the ``detectors`` section
                of the common configuration

        Returns:
            Entry of the ``detectors`` section associated with ``detector``.
        """
        return self.common_config["detectors"][detector]
class PipelineConfig(MutableMapping):
    """Augmented experiment configuration.

    The object behaves as a mutable mapping from the name of a step to the
    configuration dictionary of that step. The ``common`` section is stored apart
    and is available through :py:attr:`common_config`.

    When built from a path or a dictionary, every step is resolved: the entries
    ``input``, ``input_subdirectory`` and ``output_subdirectory`` are replaced by
    ``input_dir`` and ``output_dir``.
    """

    # Names of the steps that still have to be resolved. The set is assigned per
    # instance in ``__init__`` so that no mutable set is shared between instances.
    _unresolved_steps: typing.Set[str]

    def __init__(
        self,
        path_or_config: str | PipelineConfigDict | "PipelineConfig",
        common_config: StepConfigDict | None = None,
        dir_path: str | None = None,
    ) -> None:
        """Build the configuration from a path, a dictionary or another instance.

        Args:
            path_or_config: Configuration dictionary or path to a YAML file
                that contains it, or an already-built :py:class:`PipelineConfig`
            common_config: default values of the common configuration. The values
                defined by ``path_or_config`` take precedence over them.
            dir_path: path to directory paths in ``input`` are expressed relative to.
                If not provided, the directory where this configuration is located
                is used.
        """
        super().__init__()
        self._unresolved_steps = set()

        if isinstance(path_or_config, PipelineConfig):
            # The steps of an existing configuration are already resolved.
            # NOTE: the step dictionaries are shared with ``path_or_config``,
            # they are not copied.
            self._common_config = (
                {**common_config, **path_or_config._common_config}
                if common_config is not None
                else path_or_config._common_config
            )
            self._dir_path = (
                dir_path if dir_path is not None else path_or_config._dir_path
            )
            self._configs = path_or_config._configs
        else:
            configs: PipelineConfigDict = load_dict(path_or_config=path_or_config)

            # Extract common configuration. An empty ``common:`` section is
            # parsed as ``None`` by YAML and is treated as an empty dictionary.
            if common_config is None:
                common_config = {}
            common_config = {**common_config, **(configs.pop("common", None) or {})}
            self._resolve_common_config(
                common_config=common_config,
                path=(path_or_config if isinstance(path_or_config, str) else None),
            )
            self._common_config = common_config

            if dir_path is not None:
                self._dir_path = dir_path
            elif isinstance(path_or_config, str):
                self._dir_path = str(Path(path_or_config).parent.absolute())
            else:
                self._dir_path = None  # relative to the current working directory

            self._configs = configs

            # Format steps
            # This may load extra steps from other configuration files, hence
            # the snapshot of the step names before iterating.
            self._unresolved_steps = set(self._configs)
            steps = list(configs.keys())
            for step in steps:
                self._resolve_step_config(step=step)  # in-place

            assert not self._unresolved_steps

    def __getitem__(self, step: str) -> StepConfigDict:
        return self._configs[step]

    def __setitem__(self, step: str, step_config: StepConfigDict) -> None:
        self._configs[step] = step_config

    def __delitem__(self, step: typing.Any) -> None:
        del self._configs[step]

    def __iter__(self) -> typing.Iterator[str]:
        return iter(self._configs)

    def __len__(self) -> int:
        return len(self._configs)

    def add_config(
        self,
        path_or_config: str | PipelineConfigDict,
        steps: typing.Sequence[str] | None = None,
    ) -> None:
        """Add a configuration to the current configuration.

        A step that is already defined in the current configuration is not
        overwritten: a ``UserWarning`` is emitted and the step is skipped.

        Args:
            path_or_config: configuration to add, or path to it. A path is
                interpreted relative to :py:attr:`dir_path` when the latter is set.
            steps: list of steps to add from this configuration. If not specified,
                all the steps are added.

        Raises:
            ValueError: a requested step could not be found in the configuration
                to add.
        """
        if (self.dir_path is not None) and isinstance(path_or_config, str):
            path_or_config = op.join(self.dir_path, path_or_config)

        config = PipelineConfig(path_or_config=path_or_config)
        if steps is None:
            steps = config.steps

        # Add the required steps to the configuration
        for step in steps:
            if (step_config := config.get(step)) is None:
                raise ValueError(
                    f"The step `{step}` could not be found in the experiment "
                    f"configuration `{config.experiment_name}`"
                )
            if step in self:
                warnings.warn(
                    (
                        f"Trying to add step {step} from the experiment "
                        f"configuration `{config.experiment_name}` "
                        "but the latter was already defined. Skipping it..."
                    ),
                    UserWarning,
                )
            else:
                self[step] = step_config

        # If a common configuration parameter is not defined in the current
        # configuration, use this configuration to define this parameter
        for key, value in config.common_config.items():
            if key not in self.common_config:
                self.common_config[key] = value

    def _handle_input_entry(
        self, input_entry: str, step: str, step_config: StepConfigDict
    ) -> None:
        """Deduce the ``input_dir`` of a step from its ``input`` entry.

        The ``input`` of a step corresponds to the name of the step that
        needs to be run before this step, optionally prefixed by the path to the
        configuration file that defines it (``<path>:<step>``).
        The output directory of the input step is looked up and used as the
        input directory of the current step.

        The function modifies ``step_config`` in place.

        Args:
            input_entry: value associated with the key ``input`` in the step
                configuration
            step: current step
            step_config: configuration of this step

        Raises:
            ValueError: ``input_dir`` or ``input_subdirectory`` is also provided
                in the step configuration.
            SyntaxError: ``input_entry`` contains more than one ``:``.
        """
        # first, check that `input_dir` or `input_subdirectory` was not
        # provided (which would be redundant with `input`).
        # The check is made on the configuration of the step, not on the
        # pipeline mapping whose keys are step names.
        for redundant_key in ("input_dir", "input_subdirectory"):
            if redundant_key in step_config:
                raise ValueError(
                    f"Both `input` and `{redundant_key}` were provided for step "
                    f"{step}, which is redundant."
                )

        if ":" in input_entry:
            input_entries = input_entry.split(":")
            if len(input_entries) != 2:
                raise SyntaxError(
                    "The `input` entry should be in the format `<path>:<step> "
                    f"but is: {input_entry}"
                )
            input_path, input_step = input_entries
        else:
            input_path = None
            input_step = input_entry

        if input_path:
            # the input step is defined in another configuration file
            self.add_config(path_or_config=input_path, steps=[input_step])
        else:
            # this step needs to be resolved beforehand
            self._resolve_step_config(step=input_step)

        # Deduce the input directory
        step_config["input_dir"] = self[input_step]["output_dir"]

    def _resolve_step_config(self, step: str) -> None:
        """Resolve the input and output directories of a step.

        This function

        * Deduces the ``input_dir`` from ``input`` or ``input_subdirectory``
        * Deduces the ``output_dir`` from ``output_subdirectory`` or set it to be
          equal to the name of the step.

        This function is in-place: the changes are applied to the configuration
        dictionary stored for ``step``. Nothing is done if the step is not in the
        set of unresolved steps.

        Args:
            step: name of the step (e.g., ``embedding``, ``processing``)
        """
        if step not in self._unresolved_steps:
            return

        step_config = self._configs[step]

        # If `input` is provided, use it to deduce the input directory `input_dir`
        # Remove `input` entry if it exists.
        # We could keep it since the latter provides more information than just
        # the `input_dir`
        input_entry: str | None = step_config.pop("input", None)
        if input_entry is not None:
            self._handle_input_entry(
                input_entry=input_entry, step=step, step_config=step_config
            )

        # Transform `input_subdirectory` and `output_subdirectory`
        # into the actual `input_dir` and `output_dir`
        for inoutput in ["input", "output"]:
            if (
                subdir := step_config.pop(f"{inoutput}_subdirectory", None)
            ) is not None:
                assert f"{inoutput}_dir" not in step_config, (
                    f"`{inoutput}_subdirectory` and `{inoutput}_dir` as both "
                    f"the configuration of {step}, which might create a clash."
                )
                step_config[f"{inoutput}_dir"] = op.join(
                    self.data_experiment_dir,
                    subdir,
                )

        # if `output_dir` does not exist, create it through the name of the step
        if "output_dir" not in step_config:
            step_config["output_dir"] = op.join(self.data_experiment_dir, step)

        self._unresolved_steps.remove(step)

    def _resolve_common_config(
        self, common_config: StepConfigDict, path: str | None = None
    ) -> None:
        """Fill in the missing entries of the common configuration, in place.

        This function

        * Adds ``experiment_name`` to the common configuration dictionary if it is
          missing, from the name of the pipeline configuration
        * Add ``detector`` to the common configuration dictionary if it is missing,
          assuming it is the first in the list of available detectors.

        Args:
            common_config: common configuration dictionary
            path: path to the current YAML configuration file (if available)

        Raises:
            ValueError: no experiment name in the common configuration but
                no input path was provided.
        """
        if "experiment_name" not in common_config:
            if path is None:
                raise ValueError(
                    "The `experiment_name` was not provided in the `common` "
                    "configuration. Since the configuration was not loaded "
                    "from a path, it cannot be derived from the name of the "
                    "YAML configuration."
                )
            # name of the YAML file, without its extension
            common_config["experiment_name"] = op.splitext(op.basename(path))[0]

        if "detector" not in common_config:
            common_config["detector"] = cdirs.detectors[0]

    @property
    def dir_path(self) -> str | None:
        """Path to the directory the paths in `input` are expressed w.r.t."""
        return self._dir_path

    @property
    def steps(self) -> typing.List[str]:
        """Names of the steps of the pipeline."""
        return list(self.keys())

    def dict(self) -> typing.Dict[str, typing.Dict[str, typing.Any]]:
        """Turn the experiment configuration dictionary into a regular dictionary
        of dictionaries, with the common configuration under the key ``common``.
        """
        return {"common": self.common_config, **self._configs}

    @property
    def common_config(self) -> typing.Dict[str, typing.Any]:
        """Common configuration dictionary"""
        return self._common_config

    @property
    def experiment_name(self) -> str:
        """Name of the experiment"""
        return self.common_config["experiment_name"]

    @property
    def data_experiment_dir(self) -> str:
        """Path to the directory that contains all the data of the given experiment."""
        return op.join(cdirs.data_directory, self.experiment_name)

    @property
    def performance_dir(self) -> str:
        """Sub-directory of the performance directory named after the experiment."""
        return op.join(cdirs.performance_directory, self.experiment_name)

    @property
    def detector(self) -> str:
        """Detector the pipeline is applied to."""
        return self._common_config["detector"]

    @property
    def required_test_dataset_names(self) -> typing.List[str]:
        """Entry ``test_dataset_names`` of the common configuration."""
        return self.common_config["test_dataset_names"]

    def get_test_batch_dir(self, step: str, test_dataset_name: str) -> str:
        """Get the directory of a test dataset for a given step.

        Args:
            step: name of the step
            test_dataset_name: name of the test dataset

        Returns:
            ``<data directory>/<experiment name>/<step>/test/<test_dataset_name>/``
        """
        return op.join(
            cdirs.data_directory,
            self.experiment_name,
            step,
            "test",
            test_dataset_name + "/",
        )
# Module-level instance of ``CommonDirs`` shared by the functions and classes of
# this module. Creating it does not read any file: the common configuration is
# only loaded when it is first needed.
cdirs = CommonDirs()
def load_config(path_or_config: str | PipelineConfigDict) -> PipelineConfigDict:
    """Load the configuration if not already.

    A dictionary is considered as already loaded and is returned as is.
    Otherwise, the configuration is loaded through :py:class:`PipelineConfig`,
    which also replaces ``input_subdirectory`` by ``input_dir`` and
    ``output_subdirectory`` by ``output_dir``. For this reason,
    please always load the configuration using this function.

    Args:
        path_or_config: configuration dictionary, or path to a YAML file that
            contains it

    Returns:
        Configuration as a regular dictionary of dictionaries.
    """
    already_loaded = isinstance(path_or_config, dict)
    if already_loaded:
        return path_or_config

    pipeline_config = PipelineConfig(path_or_config=path_or_config)
    return pipeline_config.dict()
def get_pipeline_config_path(experiment_name: str) -> str:
    """Get the path to the pipeline config YAML file.

    Args:
        experiment_name: name of the experiment

    Returns:
        Path where the YAML file that contains the configuration of
        ``experiment_name`` is stored, i.e.,
        ``pipeline_configs/<experiment_name>.yaml`` resolved with respect to the
        ``etx4velo`` folder.
    """
    yaml_filename = experiment_name + ".yaml"
    relative_path = op.join("pipeline_configs", yaml_filename)
    return resolve_relative_path(relative_path, folder_name="etx4velo")
def get_detector_from_pipeline_config(path_or_config: str | dict) -> str:
    """Get the detector a pipeline configuration is applied to.

    Args:
        path_or_config: configuration dictionary, or path to a YAML file that
            contains it

    Returns:
        Value of ``detector`` in the ``common`` section of the configuration.
        If it is not defined, the first of the available detectors is returned.
    """
    # An already-loaded dictionary is passed through by ``load_config`` and may
    # not define a ``common`` section at all: fall back to the default detector
    # in that case too, instead of failing with a ``KeyError``.
    common_config = load_config(path_or_config).get("common") or {}
    detector = common_config.get("detector")
    if detector is None:
        return cdirs.detectors[0]
    return detector
def get_detector_from_experiment_name(experiment_name: str) -> str:
    """Get the detector of an experiment.

    The pipeline configuration of the experiment is located from its name, and
    the detector is read from it.

    Args:
        experiment_name: Name of an experiment

    Returns:
        Detector used in this experiment
    """
    pipeline_config_path = get_pipeline_config_path(experiment_name)
    return get_detector_from_pipeline_config(path_or_config=pipeline_config_path)