from __future__ import annotations
import typing
import os
import logging
from functools import reduce
from pathlib import Path
import yaml
from joblib import Parallel, delayed
from tqdm.auto import tqdm
from tqdm.asyncio import tqdm_asyncio
import numpy as np
import pandas as pd
from . import process_custom
from .inputloader import get_indirs
from utils.loaderutils.preprocessing import load_dataframes
from utils.scriptutils.loghandler import configure_logger
configure_logger()
def enough_true_hits(
event_hits_particles: pd.DataFrame,
num_true_hits_threshold: int,
event_id_str: str | None = None,
num_events: int | None = None,
required_num_events: int | None = None,
) -> bool:
"""Check whether an event has enough true hits to be saved.
Args:
event_hits_particles: DataFrame of all hits for an event.
num_true_hits_threshold: Minimum number of true hits required for the event
to be saved.
event_id_str: String representation of the event ID.
num_events: The current number of saved events.
required_num_events: The desired number of saved events.
Returns:
``True`` if the event has enough true hits to be saved, ``False`` otherwise.
Notes:
The function checks the number of true hits in an event and compares it
with the given threshold.
If the number of true hits is equal to 0, the event is discarded as
it contains only fake hits.
If the number of true hits is below the given threshold,
the event is discarded as it does not contain enough true hits.
Otherwise, the event is saved as the function returns ``True``.
"""
#: Number of true hits for this event
num_true_hits = len(
event_hits_particles[event_hits_particles["particle_id"] != 0].drop_duplicates(
subset=["hit_id"]
)
)
if num_true_hits == 0:
logging.debug(f"Discarding event {event_id_str}, contains only fake hits.")
return False
elif num_true_hits < num_true_hits_threshold:
        logging.debug(
            f"Discarding event {event_id_str}, contains only {num_true_hits} true hits, "
            f"below the {num_true_hits_threshold} threshold."
        )
return False
else:
        logging.debug(
            f"Saving event {event_id_str}, {num_events}/{required_num_events}, "
            f"contains {num_true_hits} true hits."
        )
return True
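

# Illustrative sketch (not executed, hypothetical data): ``enough_true_hits`` counts the
# unique hits attached to a genuine particle (``particle_id != 0``) and compares that
# count with the threshold. For instance:
#
#     >>> demo = pd.DataFrame({"hit_id": [1, 2, 3, 3], "particle_id": [0, 101, 102, 102]})
#     >>> enough_true_hits(demo, num_true_hits_threshold=2, event_id_str="demo")
#     True
#
# Hit 1 is fake (``particle_id == 0``) and hit 3 is counted once after deduplication,
# leaving 2 true hits, which meets the threshold of 2.
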
def apply_custom_processing(
hits_particles: pd.DataFrame,
particles: pd.DataFrame,
processing: str | typing.Sequence[str] | None = None,
) -> typing.Tuple[pd.DataFrame, pd.DataFrame]:
"""Apply custom processing to the dataframe of hits-particles and particles.
The custom processing functions are defined in
:py:mod:`pipeline.Preprocessing.process_custom`.
Args:
hits_particles: dataframe of hits-particles
particles: dataframe of particles
        processing: Name(s) of the processing function(s) to apply to the dataframes,
            as defined in :py:mod:`pipeline.Preprocessing.process_custom`.
Returns:
Processed dataframe of hits-particles and particles
"""
if processing is None:
return hits_particles, particles
else:
# Get name of processing functions
if isinstance(processing, str):
processing_fct_names = [processing]
else:
processing_fct_names = [
str(processing_fct_name) for processing_fct_name in processing
]
if processing_fct_names:
# Apply processing
hits_particles_noise = hits_particles[hits_particles["particle_id"] == 0]
hits_particles_no_noise = hits_particles[hits_particles["particle_id"] != 0]
n_particles = particles.shape[0]
for processing_fct_name in processing_fct_names:
logging.info(f"Apply `{processing_fct_name}`")
processing_fct: process_custom.SelectionFunction = getattr(
process_custom, processing_fct_name
)
hits_particles_no_noise, particles = processing_fct(
hits_particles_no_noise, particles
)
logging.info(
f"{processing_fct_name}: {particles.shape[0]/n_particles:.1%} "
+ "particles kept"
)
n_particles = particles.shape[0]
hits_particles = pd.concat(
(hits_particles_noise, hits_particles_no_noise), axis=0
)
return hits_particles, particles
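

# Illustrative sketch (hypothetical function, not an actual member of ``process_custom``):
# a custom processing function takes the noise-free hits-particles dataframe and the
# particles dataframe, and returns both after applying a selection. Assuming both
# dataframes carry a ``particle_id`` column, a cut keeping particles with at least
# 3 hits could look like:
#
#     def keep_long_particles(hits_particles, particles):
#         n_hits = hits_particles.groupby("particle_id")["hit_id"].nunique()
#         kept = n_hits[n_hits >= 3].index
#         return (
#             hits_particles[hits_particles["particle_id"].isin(kept)],
#             particles[particles["particle_id"].isin(kept)],
#         )
#
# Defined in :py:mod:`pipeline.Preprocessing.process_custom`, it would be applied by
# passing ``processing="keep_long_particles"``.
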
def dump_if_enough_hits(
event_hits_particles: pd.DataFrame,
event_particles: pd.DataFrame,
event_id: int,
output_dir: str,
num_true_hits_threshold: int | None = None,
pbar: tqdm_asyncio | None = None,
) -> bool:
    """Save the hits-particles and particles dataframes of an event to parquet files,
    provided the event has enough true hits.

    Args:
        event_hits_particles: DataFrame of all hits for the event.
        event_particles: DataFrame of the particles of the event.
        event_id: Event ID, used to build the output file names.
        output_dir: Output directory where the parquet files are written.
        num_true_hits_threshold: Minimum number of true hits required for the event
            to be saved. If ``None``, no threshold is applied.
        pbar: Optional progress bar, updated when an event is saved.

    Returns:
        ``True`` if the event was saved, ``False`` otherwise.
    """
#: String representation of the event ID
event_id_str = str(event_id).zfill(18)
no_hits = event_hits_particles.shape[0] == 0
if not no_hits and ( # skip events with no hits
(num_true_hits_threshold is None)
or enough_true_hits(
event_hits_particles=event_hits_particles,
num_true_hits_threshold=num_true_hits_threshold,
event_id_str=event_id_str,
)
):
# Save
event_hits_particles.to_parquet(
f"{output_dir}/event{event_id_str}-hits_particles.parquet",
)
event_particles.to_parquet(
f"{output_dir}/event{event_id_str}-particles.parquet",
)
if pbar is not None:
pbar.update()
return True
else:
return False
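

# Illustrative note (hypothetical path): for ``event_id=42`` and
# ``output_dir="/tmp/preprocessed"``, ``dump_if_enough_hits`` would write
#
#     /tmp/preprocessed/event000000000000000042-hits_particles.parquet
#     /tmp/preprocessed/event000000000000000042-particles.parquet
#
# since the event ID is zero-padded to 18 digits.
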
def preprocess(
input_dir: str,
output_dir: str,
hits_particles_filename: str | None = None,
particles_filename: str | None = None,
    subdirs: int | str | typing.List[str] | typing.Dict[str, int] | None = None,
n_events: int = -1,
processing: str | typing.List[str] | None = None,
num_true_hits_threshold: int | None = None,
hits_particles_columns: typing.List[str] | None = None,
particles_columns: typing.List[str] | None = None,
n_workers: int = 1,
raise_enough_events: bool = True,
):
"""Preprocess the first ``n_events`` events in the input files and save the events
in separate parquet files called ``event{event_id}-hits_particles.parquet``
    and ``event{event_id}-particles.parquet``.
Args:
        input_dir: A single input directory if ``subdirs`` is ``None``,
            or the main directory containing the sub-directories otherwise.
output_dir: the output directory where the parquet files are saved.
        hits_particles_filename: Name of the hits-particles file
            (without the ``.parquet.lz4`` extension). Default is ``hits_velo``.
        particles_filename: Name of the particles file
            (without the ``.parquet.lz4`` extension). Default is ``mc_particles``.
subdirs:
            * If ``subdirs`` is ``None``, there is a single input directory, ``input_dir``
* If ``subdirs`` is a string or a list of strings, they specify \
the sub-directories with respect to ``input_dir``. If ``input_dir`` \
is ``None``, then they are the (list of) input directories directly, which \
can be useful if the input directories are not at the same location \
(even though it is discouraged)
            * If ``subdirs`` is an integer, it corresponds to the name of the last \
sub-directory to consider (i.e., from 0 to ``subdirs``). If ``subdirs`` \
is ``-1``, all the sub-directories are considered as input.
* If ``subdirs`` is a dictionary, the keys ``start`` and ``stop`` specify \
the first and last sub-directories to consider as input.
n_events: Number of events to save. For ``n_workers`` higher than 1, more events
may be produced.
        processing: Name(s) of the processing function(s) to apply to the dataframes,
            as defined in :py:mod:`pipeline.Preprocessing.process_custom`.
num_true_hits_threshold: Minimum number of true hits required for the event
to be saved.
hits_particles_columns: columns to load for the dataframe of hits
and the hits-particles association information
particles_columns: columns to load for the dataframe of particles
n_workers: If greater than 1, the input dataframes are all loaded and processed
in parallel.
        raise_enough_events: Whether to raise an error if fewer events than required
            were generated.
"""
pd.set_option("chained_assignment", None) # disable chaine assignment warning
os.makedirs(output_dir, exist_ok=True)
logging.info(f"Preprocessing: output will be written in {output_dir}")
def load_and_process_dataframes_reduced(indir: str) -> typing.List[pd.DataFrame]:
"""Load the dataframes of hits-particles and particles located in ``indir``,
and apply the custom processing functions defined by ``processing``.
Args:
indir: input directory where the dataframes of hits-particles and particles
are saved
Returns:
List of two dataframes: the dataframe of hits-particles and the dataframe
of particles
"""
logging.info(f"Load dataframes in {indir}")
hits_particles, particles = load_dataframes(
indir=indir,
hits_particles_filename=hits_particles_filename,
particles_filename=particles_filename,
particles_columns=particles_columns,
hits_particles_columns=hits_particles_columns,
)
hits_particles, particles = apply_custom_processing(
hits_particles=hits_particles,
particles=particles,
processing=processing,
)
return [hits_particles, particles]
    def load_and_dump_if_enough_hits(indir: str, output_dir: str) -> int:
        """Load and process the dataframes located in ``indir``, split them by event,
        and save every event that has enough true hits to ``output_dir``.

        Returns:
            The number of saved events.
        """
dataframes = load_and_process_dataframes_reduced(indir=indir)
grouped_by_dataframes = [
dataframe.groupby(by=["event_id"]) for dataframe in dataframes
]
event_ids = reduce(
np.intersect1d, [dataframe["event_id"].unique() for dataframe in dataframes]
)
n_output_saved = 0
for event_id in event_ids:
current_dataframes = [
grouped_by_dataframe.get_group(event_id)
for grouped_by_dataframe in grouped_by_dataframes
]
event_hits_particles, event_particles = current_dataframes
n_output_saved += dump_if_enough_hits(
event_hits_particles=event_hits_particles,
event_particles=event_particles,
event_id=event_id,
output_dir=output_dir,
num_true_hits_threshold=num_true_hits_threshold,
)
return n_output_saved
    def valid_indir(indir: str) -> bool:
        """Return ``True`` if ``indir`` contains a ``log.yaml`` file whose
        ``returncode`` entry is 0, i.e., a successfully produced input directory."""
log_path = os.path.join(indir, "log.yaml")
if os.path.exists(log_path):
with open(log_path, "r") as log_file:
log_status = yaml.load(log_file, Loader=yaml.SafeLoader)
return log_status["returncode"] == 0
else:
print(indir, "invalid")
return False
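
    # Illustrative note: ``valid_indir`` only requires ``log.yaml`` to expose a
    # ``returncode`` entry; e.g., a minimal valid log file could contain
    #
    #     returncode: 0
    #
    # (any non-zero value marks the input directory as invalid).
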
n_required_events = np.inf if n_events == -1 else n_events
indirs = get_indirs(input_dir=input_dir, subdirs=subdirs, condition=valid_indir)
if len(indirs) == 0:
raise ValueError("No input directories.")
list_n_output_saved: typing.List[int] = Parallel( # type: ignore
n_jobs=n_workers, batch_size=1 # type: ignore
)(
delayed(load_and_dump_if_enough_hits)(indir=indir, output_dir=output_dir)
for indir in tqdm(indirs)
)
n_output_saved = sum(list_n_output_saved)
pd.set_option("chained_assignment", "warn") # re-enable chained-assignment warning
    Path(os.path.join(output_dir, "done")).touch()  # mark the preprocessing as done
if raise_enough_events and n_output_saved < n_required_events:
        raise RuntimeError(
            "Not enough events found with at least "
            f"{num_true_hits_threshold} true hits "
            f"({n_output_saved}/{n_required_events} saved)."
        )
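

# Illustrative usage sketch (hypothetical paths and values): preprocess the events found
# in sub-directories 0 to 9 of ``/data/raw``, requiring 1000 saved events with at least
# 5 true hits each, using 4 workers:
#
#     preprocess(
#         input_dir="/data/raw",
#         output_dir="/data/preprocessed",
#         subdirs={"start": 0, "stop": 9},
#         n_events=1000,
#         num_true_hits_threshold=5,
#         n_workers=4,
#     )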