Source code for pipeline.Preprocessing.inputloader
"""A module that defines the input loader, which allows looping over events
scattered across different parquet or CSV files.
"""
import typing
import os
[docs]def get_indirs(
input_dir: str | None = None,
subdirs: int | str | typing.List[str] | typing.Dict[str, int] | None = None,
condition: typing.Callable[[str], bool] | None = None,
):
"""Get the input directories that can be used as input of the preprocessing.
Args:
input_dir: A single input directory if ``subdirs`` is ``None``,
or the main directory where sub-directories are
subdirs:
* If ``subdirs`` is None, there is a single input directory, ``input_dir``
* If ``subdirs`` is a string or a list of strings, they specify \
the sub-directories with respect to ``input_dir``. If ``input_dir`` \
is ``None``, then they are the (list of) input directories directly, which \
can be useful if the input directories are not at the same location \
(even though it is discouraged)
* If ``subdirs`` is an integer, it corresponds to the the name of the last \
sub-directory to consider (i.e., from 0 to ``subdirs``). If ``subdirs`` \
is ``-1``, all the sub-directories are considered as input.
* If ``subdirs`` is a dictionary, the keys ``start`` and ``stop`` specify \
the first and last sub-directories to consider as input.
Returns:
List of input directories that can be considered.
"""
if input_dir is None:
if isinstance(subdirs, str):
return [subdirs]
elif isinstance(subdirs, list):
return [str(subdir) for subdir in subdirs]
else:
raise TypeError(
"`input_dir` is `None` but `subdirs` is neither a string nor "
"a list of strings, so the input directories of the preprocessing "
"cannot be determined."
)
else:
# Get the list of all the sub-directories inside ``input_dir``
# Filter this list according to ``subdirs``
if subdirs is None:
return [input_dir]
elif isinstance(subdirs, (int, dict)):
available_subdirs = sorted(
[
int(file_or_dir.name)
for file_or_dir in os.scandir(input_dir)
if file_or_dir.is_dir()
and (condition is None or condition(file_or_dir.path))
]
)
if subdirs == -1:
final_subdirs = available_subdirs
else:
if isinstance(subdirs, int):
start = 0
stop = subdirs
else: # dict
start = subdirs.get("start", 0)
stop = subdirs["stop"]
assert (
stop >= start
), f"`start` ({start}) is strictly higher than `stop ({stop})"
final_subdirs = [
subdir
for subdir in available_subdirs
if subdir >= start and subdir <= stop
]
elif isinstance(subdirs, str):
final_subdirs = [subdirs]
elif isinstance(subdirs, list):
final_subdirs = subdirs
else:
raise ValueError(
f"`input_dir` is not `None` and `subdirs` is `{subdirs}`, which are "
"not valid inputs."
)
return [os.path.join(input_dir, str(subdir)) for subdir in final_subdirs]