Source code for pipeline.utils.commonutils.config

"""A module that helps to handle the YAML configuration.
"""

from __future__ import annotations
import typing
from collections.abc import MutableMapping
import warnings
import os
import os.path as op
from pathlib import Path
import yaml

# Configuration of a single pipeline step: option name -> option value.
StepConfigDict = typing.Dict[str, typing.Any]
# Configuration of a whole pipeline: step name -> configuration of that step.
PipelineConfigDict = typing.Dict[str, StepConfigDict]


def resolve_relative_path(path: str, folder_name: str = "") -> str:
    """Anchor a relative path to the repository pointed to by ``ETX4VELO_REPO``.

    Args:
        path: path expressed relative to ``folder_name`` inside the repository
        folder_name: optional sub-folder of the repository that ``path`` is
            relative to

    Returns:
        ``<ETX4VELO_REPO>/<folder_name>/<path>`` when the environment variable
        ``ETX4VELO_REPO`` is defined, otherwise ``path`` unchanged.
    """
    repo_root = os.environ.get("ETX4VELO_REPO")
    if repo_root is not None:
        return os.path.join(repo_root, folder_name, path)
    # Without a known repository root the path is left as provided.
    return path
def load_dict(
    path_or_config: str | typing.Dict[typing.Any, typing.Any]
) -> typing.Dict[typing.Any, typing.Any]:
    """Return a dictionary, reading it from a YAML file when a path is given.

    Args:
        path_or_config: dictionary, or path to a YAML file containing a
            dictionary

    Returns:
        Content of the YAML file (parsed with the safe loader), or a shallow
        copy of the inputted dictionary.

    Raises:
        TypeError: ``path_or_config`` is neither a string nor a dictionary.
    """
    if isinstance(path_or_config, dict):
        # Shallow copy: the nested values are shared with the input.
        return path_or_config.copy()

    if not isinstance(path_or_config, str):
        raise TypeError(
            "`path_or_config` is expected either to be a string or a dictionary, "
            f"but is {type(path_or_config).__name__}"
        )

    with open(path_or_config, "r") as yaml_stream:
        return yaml.load(yaml_stream, Loader=yaml.SafeLoader)
class CommonDirs:
    """Accessor for the common configuration in ``setup/common_config.yaml``.

    The entries of the ``directories`` section listed in ``_DIRECTORY_NAMES``
    are exposed as attributes (e.g. ``cdirs.data_directory``) and are resolved
    with respect to the ``etx4velo`` folder of the repository.
    """

    # Keys of the ``directories`` section that can be accessed as attributes.
    _DIRECTORY_NAMES = (
        "data_directory",
        "artifact_directory",
        "performance_directory",
        "reference_directory",
        "analysis_directory",
        "export_directory",
    )

    def __init__(self) -> None:
        # Filled in on first access to ``common_config``.
        self._common_config = None

    @property
    def common_config(self) -> typing.Dict[str, typing.Any]:
        """Common configuration dictionary, in ``setup/common_config.yaml``.

        The YAML file is read the first time the property is accessed and the
        result is kept for the subsequent accesses.
        """
        if self._common_config is not None:
            return self._common_config

        config_path = resolve_relative_path("setup/common_config.yaml")
        with open(config_path, "r") as yaml_stream:
            self._common_config = yaml.load(yaml_stream, Loader=yaml.SafeLoader)
        return self._common_config

    def __getattribute__(self, name: str) -> str:
        """Intercept the directory attributes; defer to the default otherwise."""
        if name not in CommonDirs._DIRECTORY_NAMES:
            return super().__getattribute__(name)

        relative_directory = self.common_config["directories"][name]
        return resolve_relative_path(relative_directory, folder_name="etx4velo")

    @property
    def repository(self) -> str:
        """Path to the repository.

        Raises:
            RuntimeError: the environment variable ``ETX4VELO_REPO`` is not
                defined.
        """
        repo_root = os.environ.get("ETX4VELO_REPO")
        if repo_root is not None:
            return repo_root
        raise RuntimeError(
            "The environement variable `ETX4VELO_REPO` was not defined. "
            "Please consider running `source setup.sh`."
        )

    @property
    def test_config_path(self):
        """Path to the test configuration file."""
        return op.join(self.repository, "etx4velo", "test_samples.yaml")

    @property
    def detectors(self) -> typing.List[str]:
        """List of available detectors."""
        detector_section = self.common_config["detectors"]
        return list(detector_section.keys())

    def get_filenames_from_detector(self, detector: str) -> typing.Dict[str, str]:
        """Get the .parquet filenames for a given detector."""
        detector_section = self.common_config["detectors"]
        return detector_section[detector]
class PipelineConfig(MutableMapping):
    """Experiment configuration: mutable mapping from step name to step config.

    Besides the step configurations, the object holds a *common* configuration
    (``experiment_name``, ``detector``, ...) and resolves, for every step, the
    ``input_dir`` and ``output_dir`` entries from the ``input``,
    ``input_subdirectory`` and ``output_subdirectory`` entries.
    """

    # Names of the steps whose directories have not been resolved yet.
    # Assigned per instance in ``__init__`` so that it is never shared.
    _unresolved_steps: typing.Set[str]

    def __init__(
        self,
        path_or_config: str | PipelineConfigDict | "PipelineConfig",
        common_config: StepConfigDict | None = None,
        dir_path: str | None = None,
    ) -> None:
        """Augmented experiment configuration.

        Args:
            path_or_config: Configuration dictionary, path to a YAML file that
                contains it, or an existing :py:class:`PipelineConfig`. In the
                latter case the step configurations are reused as is (they are
                shared with the original object, not copied).
            common_config: default values of the common configuration. The
                entries of the common configuration found in ``path_or_config``
                take precedence over them.
            dir_path: path to directory paths in ``input`` are expressed
                relative to. If not provided, the directory where this
                configuration is located is used.

        Raises:
            ValueError: the experiment name is neither given in the common
                configuration nor derivable from a path.
        """
        super().__init__()
        self._unresolved_steps = set()

        if isinstance(path_or_config, PipelineConfig):
            # Already-resolved configuration: nothing is left to resolve.
            self._common_config = (
                {**common_config, **path_or_config._common_config}
                if common_config is not None
                else path_or_config._common_config
            )
            self._dir_path = (
                dir_path if dir_path is not None else path_or_config._dir_path
            )
            self._configs = path_or_config._configs
        else:
            configs: PipelineConfigDict = load_dict(path_or_config=path_or_config)

            # Extract common configuration
            if common_config is None:
                common_config = {}
            common_config = {**common_config, **configs.pop("common", {})}
            self._resolve_common_config(
                common_config=common_config,
                path=(path_or_config if isinstance(path_or_config, str) else None),
            )
            self._common_config = common_config

            if dir_path is not None:
                self._dir_path = dir_path
            elif isinstance(path_or_config, str):
                self._dir_path = str(Path(path_or_config).parent.absolute())
            else:
                self._dir_path = None  # relative to the current working directory

            self._configs = configs

            # Format steps
            # This may load extra steps from other configuration files, hence
            # the snapshot of the step names taken before iterating.
            self._unresolved_steps = set(self._configs.keys())
            steps = list(configs.keys())
            for step in steps:
                self._resolve_step_config(step=step)  # in-place

            assert not self._unresolved_steps

    def __getitem__(self, step: str) -> StepConfigDict:
        return self._configs[step]

    def __setitem__(self, step: str, step_config: StepConfigDict) -> None:
        self._configs[step] = step_config

    def __delitem__(self, step: typing.Any) -> None:
        del self._configs[step]

    def __iter__(self) -> typing.Iterator:
        return iter(self._configs)

    def __len__(self) -> int:
        return len(self._configs)

    def add_config(
        self,
        path_or_config: str | PipelineConfigDict,
        steps: typing.Sequence[str] | None = None,
    ) -> None:
        """Add a configuration to the current configuration.

        A step that already exists in the current configuration is skipped
        with a ``UserWarning``. The common parameters of the added
        configuration are only used for the keys that the current common
        configuration does not define.

        Args:
            path_or_config: configuration to add, or path to it. A path is
                interpreted relative to :py:attr:`dir_path` when the latter is
                not ``None``.
            steps: list of steps to add from this configuration. If not
                specified, all the steps are added.

        Raises:
            ValueError: a requested step does not exist in the configuration
                to add.
        """
        if (self.dir_path is not None) and isinstance(path_or_config, str):
            path_or_config = op.join(self.dir_path, path_or_config)

        config = PipelineConfig(path_or_config=path_or_config)

        if steps is None:
            steps = config.steps

        # Add the required steps to the configuration
        for step in steps:
            if (step_config := config.get(step)) is not None:
                if step in self:
                    warnings.warn(
                        (
                            f"Trying to add step {step} from the experiment "
                            f"configuration `{config.experiment_name}` "
                            "but the latter was already defined. Skipping it..."
                        ),
                        UserWarning,
                    )
                else:
                    self[step] = step_config
            else:
                raise ValueError(
                    f"The step `{step}` could not be found in the experiment "
                    f"configuration `{config.experiment_name}`"
                )

        # If a common configuration parameter is not defined in the current
        # configuration, use this configuration to define this parameter
        for key, value in config.common_config.items():
            if key not in self.common_config:
                self.common_config[key] = value

    def _handle_input_entry(
        self, input_entry: str, step: str, step_config: StepConfigDict
    ) -> None:
        """Deduce ``input_dir`` of a step from its ``input`` entry.

        The ``input`` of a step corresponds to the name of the step that needs
        to be run before this step, optionally prefixed by the path to the
        configuration that defines it (``<path>:<step>``). The output directory
        of the input step is propagated to the input directory of the current
        step.

        The function modifies ``step_config`` in place.

        Args:
            input_entry: value associated with the key ``input`` in the step
                configuration
            step: current step
            step_config: configuration of this step

        Raises:
            ValueError: ``input_dir`` or ``input_subdirectory`` is also given
                in ``step_config``, which is redundant with ``input``.
            SyntaxError: ``input_entry`` contains more than one ``:``.
        """
        # first, check that `input_dir` or `input_subdirectory` was not
        # provided in the configuration of this step (which would be redundant
        # with `input`)
        for redundant_key in ("input_dir", "input_subdirectory"):
            if redundant_key in step_config:
                raise ValueError(
                    f"Both `input` and `{redundant_key}` were provided for step {step}, "
                    "which is redundant."
                )

        if ":" in input_entry:
            input_entries = input_entry.split(":")
            if len(input_entries) != 2:
                raise SyntaxError(
                    "The `input` entry should be in the format `<path>:<step> "
                    f"but is: {input_entry}"
                )
            input_path, input_step = input_entries
        else:
            input_path = None
            input_step = input_entry

        # if input path is provided, add it to the configuration
        if input_path:
            self.add_config(path_or_config=input_path, steps=[input_step])
        else:
            # this step needs to be resolved beforehand
            self._resolve_step_config(step=input_step)

        # Deduce the input directory
        step_config["input_dir"] = self[input_step]["output_dir"]

    def _resolve_step_config(self, step: str) -> None:
        """Resolve the input and output directories of a step.

        This function

        * Deduces the ``input_dir`` from ``input`` or ``input_subdirectory``
        * Deduces the ``output_dir`` from ``output_subdirectory`` or set it
          to be equal to the name of the step.

        This function is in-place: the changes are applied to the configuration
        dictionary of the step. Nothing is done if the step is not marked as
        unresolved.

        Args:
            step: name of the step (e.g., ``embedding``, ``processing``)
        """
        if step in self._unresolved_steps:
            step_config = self._configs[step]

            # If `input` is provided, use it to deduce the input directory
            # `input_dir`. Remove `input` entry if it exists.
            # We could keep it since the latter provides more information than
            # just the `input_dir`
            input_entry: str | None = step_config.pop("input", None)
            if input_entry is not None:
                self._handle_input_entry(
                    input_entry=input_entry, step=step, step_config=step_config
                )

            # Transform `input_subdirectory` and `output_subdirectory`
            # into the actual `input_dir` and `output_dir`
            for inoutput in ["input", "output"]:
                if (
                    subdir := step_config.pop(f"{inoutput}_subdirectory", None)
                ) is not None:
                    assert f"{inoutput}_dir" not in step_config, (
                        f"`{inoutput}_subdirectory` and `{inoutput}_dir` as both "
                        f"the configuration of {step}, which might create a clash."
                    )
                    step_config[f"{inoutput}_dir"] = op.join(
                        self.data_experiment_dir,
                        subdir,
                    )

            # if `output_dir` does not exist, create it through the name of
            # the step
            if "output_dir" not in step_config:
                step_config["output_dir"] = op.join(self.data_experiment_dir, step)

            self._unresolved_steps.remove(step)

    def _resolve_common_config(
        self, common_config: StepConfigDict, path: str | None = None
    ) -> None:
        """Fill in the missing mandatory entries of the common configuration.

        This function

        * Adds ``experiment_name`` to the common configuration dictionary
          if it is missing, from the name of the pipeline configuration file
          (without its extension)
        * Add ``detector`` to the common configuration dictionary if it is
          missing, assuming it is the first in the list of available detectors.

        The function modifies ``common_config`` in place.

        Args:
            common_config: common configuration dictionary
            path: path to the current YAML configuration file (if available)

        Raises:
            ValueError: no experiment name in the common configuration but no
                input path was provided.
        """
        if "experiment_name" not in common_config:
            if path is not None:
                common_config["experiment_name"] = op.splitext(op.basename(path))[0]
            else:
                raise ValueError(
                    "The `experiment_name` was not provided in the `common` "
                    "configuration. Since the configuration was not loaded "
                    "from a path, it cannot be derived from the name of the "
                    "YAML configuration."
                )

        if "detector" not in common_config:
            common_config["detector"] = cdirs.detectors[0]

    @property
    def dir_path(self) -> str | None:
        """Path to the directory the paths in `input` are expressed w.r.t."""
        return self._dir_path

    @property
    def steps(self) -> typing.List[str]:
        """Names of the steps of this configuration."""
        return list(self.keys())

    def dict(self) -> typing.Dict[str, typing.Dict[str, typing.Any]]:
        """Turn the experiment configuration dictionary into a regular
        dictionary of dictionaries.

        The common configuration is stored under the key ``common``.
        """
        return {"common": self.common_config, **self._configs}

    @property
    def common_config(self) -> typing.Dict[str, typing.Any]:
        """Common configuration dictionary"""
        return self._common_config

    @property
    def experiment_name(self) -> str:
        """Name of the experiment"""
        return self.common_config["experiment_name"]

    @property
    def data_experiment_dir(self) -> str:
        """Path to the directory that contains all the data of the given
        experiment."""
        return op.join(cdirs.data_directory, self.experiment_name)

    @property
    def performance_dir(self) -> str:
        """Sub-directory of the common performance directory named after the
        experiment."""
        return op.join(cdirs.performance_directory, self.experiment_name)

    @property
    def detector(self) -> str:
        """Detector the pipeline is applied to."""
        return self._common_config["detector"]

    @property
    def required_test_dataset_names(self) -> typing.List[str]:
        """Value of ``test_dataset_names`` in the common configuration."""
        return self.common_config["test_dataset_names"]

    def get_test_batch_dir(self, step: str, test_dataset_name: str) -> str:
        """Get the directory of a test dataset for a given step.

        Args:
            step: name of the step
            test_dataset_name: name of the test dataset

        Returns:
            ``<data_directory>/<experiment_name>/<step>/test/<test_dataset_name>/``
            (with a trailing slash)
        """
        return op.join(
            cdirs.data_directory,
            self.experiment_name,
            step,
            "test",
            test_dataset_name + "/",
        )
# Module-level instance shared by ``PipelineConfig`` and the helper functions
# of this module. Creating it does not read any file: the common configuration
# is only loaded on first access to ``cdirs.common_config``.
cdirs = CommonDirs()
def load_config(path_or_config: str | PipelineConfigDict) -> PipelineConfigDict:
    """Load the configuration if not already.

    When a path is given, the configuration is loaded through
    :py:class:`PipelineConfig`, which replaces ``input_subdirectory`` by
    ``input_dir`` and ``output_subdirectory`` by ``output_dir``. For this
    reason, please always load the configuration using this function.

    Args:
        path_or_config: configuration dictionary, or path to the YAML file
            that contains the configuration

    Returns:
        The inputted dictionary itself (untouched) if a dictionary is given,
        otherwise the loaded configuration as a dictionary of dictionaries.
    """
    if not isinstance(path_or_config, dict):
        pipeline_config = PipelineConfig(path_or_config=path_or_config)
        return pipeline_config.dict()
    # Already a dictionary: assumed to be loaded, returned as is.
    return path_or_config
def get_performance_directory_experiment(path_or_config: str | dict) -> str:
    """Get (and create if needed) the directory where to save the plots and
    reports of metric performances of an experiment.

    Args:
        path_or_config: configuration dictionary, or path to the YAML file
            that contains the configuration

    Returns:
        Path to the directory where to save performance metric plots and
        reports
    """
    loaded_config = load_config(path_or_config)
    base_directory = cdirs.performance_directory
    experiment_directory = op.join(
        base_directory, loaded_config["common"]["experiment_name"]
    )
    # The directory is created on the fly so that callers can write into it.
    os.makedirs(experiment_directory, exist_ok=True)
    return experiment_directory
def get_pipeline_config_path(experiment_name: str) -> str:
    """Get the path to the pipeline config YAML file.

    Args:
        experiment_name: name of the experiment

    Returns:
        Path ``pipeline_configs/<experiment_name>.yaml``, resolved with
        respect to the ``etx4velo`` folder of the repository.
    """
    yaml_filename = experiment_name + ".yaml"
    relative_path = op.join("pipeline_configs", yaml_filename)
    return resolve_relative_path(relative_path, folder_name="etx4velo")
def get_detector_from_pipeline_config(path_or_config: str | dict) -> str:
    """Get the detector defined in the common section of a configuration.

    Args:
        path_or_config: configuration dictionary, or path to the YAML file
            that contains the configuration

    Returns:
        Value of ``detector`` in the ``common`` section. If it is missing
        (or ``None``), the first available detector is returned.
    """
    common_section = load_config(path_or_config)["common"]
    configured_detector = common_section.get("detector")
    if configured_detector is not None:
        return configured_detector
    # Fall back to the first detector of the common configuration.
    return cdirs.detectors[0]
def get_detector_from_experiment_name(experiment_name: str) -> str:
    """Get the detector of an experiment.

    Args:
        experiment_name: Name of an experiment

    Returns:
        Detector used in this experiment
    """
    config_path = get_pipeline_config_path(experiment_name)
    return get_detector_from_pipeline_config(path_or_config=config_path)