Source code for pipeline.Preprocessing.inputloader

"""A module that defines the input loader, which allows looping over events
scattered across different parquet or CSV files.
"""

import typing
import os


[docs]def get_indirs( input_dir: str | None = None, subdirs: int | str | typing.List[str] | typing.Dict[str, int] | None = None, condition: typing.Callable[[str], bool] | None = None, ): """Get the input directories that can be used as input of the preprocessing. Args: input_dir: A single input directory if ``subdirs`` is ``None``, or the main directory where sub-directories are subdirs: * If ``subdirs`` is None, there is a single input directory, ``input_dir`` * If ``subdirs`` is a string or a list of strings, they specify \ the sub-directories with respect to ``input_dir``. If ``input_dir`` \ is ``None``, then they are the (list of) input directories directly, which \ can be useful if the input directories are not at the same location \ (even though it is discouraged) * If ``subdirs`` is an integer, it corresponds to the the name of the last \ sub-directory to consider (i.e., from 0 to ``subdirs``). If ``subdirs`` \ is ``-1``, all the sub-directories are considered as input. * If ``subdirs`` is a dictionary, the keys ``start`` and ``stop`` specify \ the first and last sub-directories to consider as input. Returns: List of input directories that can be considered. """ if input_dir is None: if isinstance(subdirs, str): return [subdirs] elif isinstance(subdirs, list): return [str(subdir) for subdir in subdirs] else: raise TypeError( "`input_dir` is `None` but `subdirs` is neither a string nor " "a list of strings, so the input directories of the preprocessing " "cannot be determined." 
) else: # Get the list of all the sub-directories inside ``input_dir`` # Filter this list according to ``subdirs`` if subdirs is None: return [input_dir] elif isinstance(subdirs, (int, dict)): available_subdirs = sorted( [ int(file_or_dir.name) for file_or_dir in os.scandir(input_dir) if file_or_dir.is_dir() and (condition is None or condition(file_or_dir.path)) ] ) if subdirs == -1: final_subdirs = available_subdirs else: if isinstance(subdirs, int): start = 0 stop = subdirs else: # dict start = subdirs.get("start", 0) stop = subdirs["stop"] assert ( stop >= start ), f"`start` ({start}) is strictly higher than `stop ({stop})" final_subdirs = [ subdir for subdir in available_subdirs if subdir >= start and subdir <= stop ] elif isinstance(subdirs, str): final_subdirs = [subdirs] elif isinstance(subdirs, list): final_subdirs = subdirs else: raise ValueError( f"`input_dir` is not `None` and `subdirs` is `{subdirs}`, which are " "not valid inputs." ) return [os.path.join(input_dir, str(subdir)) for subdir in final_subdirs]