Source code for pipeline.Preprocessing.preprocessing

from __future__ import annotations
import typing
import os
import logging
from functools import reduce
from pathlib import Path

import yaml
from joblib import Parallel, delayed
from tqdm.auto import tqdm
from tqdm.asyncio import tqdm_asyncio
import numpy as np
import pandas as pd

from . import process_custom
from .inputloader import get_indirs
from utils.loaderutils.preprocessing import load_dataframes
from utils.scriptutils.loghandler import configure_logger

configure_logger()


def enough_true_hits(
    event_hits_particles: pd.DataFrame,
    num_true_hits_threshold: int,
    event_id_str: str | None = None,
    num_events: int | None = None,
    required_num_events: int | None = None,
) -> bool:
    """Check whether an event has enough true hits to be saved.

    Args:
        event_hits_particles: DataFrame of all hits for an event.
        num_true_hits_threshold: Minimum number of true hits required for the
            event to be saved.
        event_id_str: String representation of the event ID.
        num_events: The current number of saved events.
        required_num_events: The desired number of saved events.

    Returns:
        ``True`` if the event has enough true hits to be saved, ``False``
        otherwise.

    Notes:
        The function counts the true hits in the event and compares that count
        with the given threshold. If the count is 0, the event is discarded as
        it contains only fake hits. If the count is below the threshold, the
        event is discarded as it does not contain enough true hits. Otherwise,
        the event is kept and the function returns ``True``.
    """
    #: Number of true hits for this event
    num_true_hits = len(
        event_hits_particles[event_hits_particles["particle_id"] != 0].drop_duplicates(
            subset=["hit_id"]
        )
    )
    if num_true_hits == 0:
        logging.debug(f"Discarding event {event_id_str}, contains only fake hits.")
        return False
    elif num_true_hits < num_true_hits_threshold:
        logging.debug(
            f"Discarding event {event_id_str}, contains only {num_true_hits} true hits, "
            f"below the {num_true_hits_threshold} threshold."
        )
        return False
    else:
        logging.debug(
            f"Saving event {event_id_str}, {num_events}/{required_num_events}, "
            f"contains {num_true_hits} true hits."
        )
        return True
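
# Hedged usage sketch (not part of the original module): shows how
# `enough_true_hits` behaves on a toy event. The column names mirror the ones
# used above ("hit_id", "particle_id"); the values are purely illustrative.
def _example_enough_true_hits() -> bool:
    toy_event = pd.DataFrame(
        {
            "hit_id": [1, 2, 3, 4],
            "particle_id": [0, 11, 11, 12],  # particle_id == 0 marks fake hits
        }
    )
    # 3 unique true hits >= threshold of 2, so the event would be kept
    return enough_true_hits(
        event_hits_particles=toy_event,
        num_true_hits_threshold=2,
        event_id_str="toy",
    )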
def apply_custom_processing(
    hits_particles: pd.DataFrame,
    particles: pd.DataFrame,
    processing: str | typing.Sequence[str] | None = None,
) -> typing.Tuple[pd.DataFrame, pd.DataFrame]:
    """Apply custom processing to the dataframes of hits-particles and particles.

    The custom processing functions are defined in
    :py:mod:`pipeline.Preprocessing.process_custom`.

    Args:
        hits_particles: Dataframe of hits-particles.
        particles: Dataframe of particles.
        processing: Name(s) of the processing function(s) to apply to the
            dataframes, as defined in
            :py:mod:`pipeline.Preprocessing.process_custom`.

    Returns:
        Processed dataframes of hits-particles and particles.
    """
    if processing is None:
        return hits_particles, particles
    else:
        # Get the names of the processing functions
        if isinstance(processing, str):
            processing_fct_names = [processing]
        else:
            processing_fct_names = [
                str(processing_fct_name) for processing_fct_name in processing
            ]
        if processing_fct_names:
            # Apply the processing to the non-noise hits only
            hits_particles_noise = hits_particles[hits_particles["particle_id"] == 0]
            hits_particles_no_noise = hits_particles[hits_particles["particle_id"] != 0]
            n_particles = particles.shape[0]
            for processing_fct_name in processing_fct_names:
                logging.info(f"Apply `{processing_fct_name}`")
                processing_fct: process_custom.SelectionFunction = getattr(
                    process_custom, processing_fct_name
                )
                hits_particles_no_noise, particles = processing_fct(
                    hits_particles_no_noise, particles
                )
                logging.info(
                    f"{processing_fct_name}: {particles.shape[0] / n_particles:.1%} "
                    "particles kept"
                )
                n_particles = particles.shape[0]
            hits_particles = pd.concat(
                (hits_particles_noise, hits_particles_no_noise), axis=0
            )
        return hits_particles, particles
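
# Hedged usage sketch: applying a single named selection from
# :py:mod:`pipeline.Preprocessing.process_custom` to already-loaded dataframes.
# The selection name "keep_long_tracks" is hypothetical; any attribute of
# `process_custom` implementing the ``SelectionFunction`` protocol would do.
def _example_apply_custom_processing(
    hits_particles: pd.DataFrame, particles: pd.DataFrame
) -> typing.Tuple[pd.DataFrame, pd.DataFrame]:
    return apply_custom_processing(
        hits_particles=hits_particles,
        particles=particles,
        processing=["keep_long_tracks"],  # hypothetical selection name
    )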
def dump_if_enough_hits(
    event_hits_particles: pd.DataFrame,
    event_particles: pd.DataFrame,
    event_id: int,
    output_dir: str,
    num_true_hits_threshold: int | None = None,
    pbar: tqdm_asyncio | None = None,
) -> bool:
    """Save the hits-particles and particles dataframes of one event to parquet
    files in ``output_dir``, provided the event has hits and passes the
    ``num_true_hits_threshold`` requirement.

    Returns:
        ``True`` if the event was saved, ``False`` otherwise.
    """
    #: String representation of the event ID
    event_id_str = str(event_id).zfill(18)
    no_hits = event_hits_particles.shape[0] == 0
    if not no_hits and (  # skip events with no hits
        (num_true_hits_threshold is None)
        or enough_true_hits(
            event_hits_particles=event_hits_particles,
            num_true_hits_threshold=num_true_hits_threshold,
            event_id_str=event_id_str,
        )
    ):
        # Save
        event_hits_particles.to_parquet(
            f"{output_dir}/event{event_id_str}-hits_particles.parquet",
        )
        event_particles.to_parquet(
            f"{output_dir}/event{event_id_str}-particles.parquet",
        )
        if pbar is not None:
            pbar.update()
        return True
    else:
        return False
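
# Hedged usage sketch: dumping a single toy event to a scratch directory. The
# dataframes only carry the columns the function actually touches here; real
# events would contain the full set of hit and particle columns.
def _example_dump_if_enough_hits(scratch_dir: str) -> bool:
    toy_hits_particles = pd.DataFrame(
        {"hit_id": [1, 2], "particle_id": [42, 42], "event_id": [7, 7]}
    )
    toy_particles = pd.DataFrame({"particle_id": [42], "event_id": [7]})
    # 2 true hits >= threshold of 1, so the event is written and True returned
    return dump_if_enough_hits(
        event_hits_particles=toy_hits_particles,
        event_particles=toy_particles,
        event_id=7,
        output_dir=scratch_dir,
        num_true_hits_threshold=1,
    )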
def preprocess(
    input_dir: str,
    output_dir: str,
    hits_particles_filename: str | None = None,
    particles_filename: str | None = None,
    subdirs: int | str | typing.List[str] | None = None,
    n_events: int = -1,
    processing: str | typing.List[str] | None = None,
    num_true_hits_threshold: int | None = None,
    hits_particles_columns: typing.List[str] | None = None,
    particles_columns: typing.List[str] | None = None,
    n_workers: int = 1,
    raise_enough_events: bool = True,
):
    """Preprocess the first ``n_events`` events in the input files and save the
    events in separate parquet files called
    ``event{event_id}-hits_particles.parquet`` and
    ``event{event_id}-particles.parquet``.

    Args:
        input_dir: A single input directory if ``subdirs`` is ``None``, or the
            main directory where the sub-directories are located.
        output_dir: The output directory where the parquet files are saved.
        hits_particles_filename: Name of the hits-particles file (without the
            ``.parquet.lz4`` extension). Default is ``hits_velo``.
        particles_filename: Name of the particles file (without the
            ``.parquet.lz4`` extension). Default is ``mc_particles``.
        subdirs:
            * If ``subdirs`` is ``None``, there is a single input directory,
              ``input_dir``.
            * If ``subdirs`` is a string or a list of strings, they specify \
              the sub-directories with respect to ``input_dir``. If ``input_dir`` \
              is ``None``, then they are the (list of) input directories directly, \
              which can be useful if the input directories are not at the same \
              location (even though this is discouraged).
            * If ``subdirs`` is an integer, it corresponds to the name of the last \
              sub-directory to consider (i.e., from 0 to ``subdirs``). If ``subdirs`` \
              is ``-1``, all the sub-directories are considered as input.
            * If ``subdirs`` is a dictionary, the keys ``start`` and ``stop`` specify \
              the first and last sub-directories to consider as input.
        n_events: Number of events to save. For ``n_workers`` higher than 1,
            more events may be produced.
        processing: Name(s) of the processing function(s) to apply to the
            dataframes, as defined in
            :py:mod:`pipeline.Preprocessing.process_custom`.
        num_true_hits_threshold: Minimum number of true hits required for an
            event to be saved.
        hits_particles_columns: Columns to load for the dataframe of hits and
            the hits-particles association information.
        particles_columns: Columns to load for the dataframe of particles.
        n_workers: If greater than 1, the input dataframes are loaded and
            processed in parallel.
        raise_enough_events: Whether to raise an error if fewer events than
            required were generated.
    """
    pd.set_option("chained_assignment", None)  # disable chained-assignment warning
    os.makedirs(output_dir, exist_ok=True)
    logging.info(f"Preprocessing: output will be written in {output_dir}")

    def load_and_process_dataframes_reduced(indir: str) -> typing.List[pd.DataFrame]:
        """Load the dataframes of hits-particles and particles located in
        ``indir``, and apply the custom processing functions defined by
        ``processing``.

        Args:
            indir: Input directory where the dataframes of hits-particles and
                particles are saved.

        Returns:
            List of two dataframes: the dataframe of hits-particles and the
            dataframe of particles.
        """
        logging.info(f"Load dataframes in {indir}")
        hits_particles, particles = load_dataframes(
            indir=indir,
            hits_particles_filename=hits_particles_filename,
            particles_filename=particles_filename,
            particles_columns=particles_columns,
            hits_particles_columns=hits_particles_columns,
        )
        hits_particles, particles = apply_custom_processing(
            hits_particles=hits_particles,
            particles=particles,
            processing=processing,
        )
        return [hits_particles, particles]

    def load_and_dump_if_enough_hits(indir: str, output_dir: str) -> int:
        """Load and process the dataframes in ``indir``, dump every event that
        passes the hit requirements, and return the number of saved events.
        """
        dataframes = load_and_process_dataframes_reduced(indir=indir)
        grouped_by_dataframes = [
            dataframe.groupby(by=["event_id"]) for dataframe in dataframes
        ]
        event_ids = reduce(
            np.intersect1d,
            [dataframe["event_id"].unique() for dataframe in dataframes],
        )
        n_output_saved = 0
        for event_id in event_ids:
            current_dataframes = [
                grouped_by_dataframe.get_group(event_id)
                for grouped_by_dataframe in grouped_by_dataframes
            ]
            event_hits_particles, event_particles = current_dataframes
            n_output_saved += dump_if_enough_hits(
                event_hits_particles=event_hits_particles,
                event_particles=event_particles,
                event_id=event_id,
                output_dir=output_dir,
                num_true_hits_threshold=num_true_hits_threshold,
            )
        return n_output_saved

    def valid_indir(indir: str) -> bool:
        """An input directory is valid if it contains a ``log.yaml`` file
        reporting a return code of 0.
        """
        log_path = os.path.join(indir, "log.yaml")
        if os.path.exists(log_path):
            with open(log_path, "r") as log_file:
                log_status = yaml.load(log_file, Loader=yaml.SafeLoader)
            return log_status["returncode"] == 0
        else:
            logging.warning(f"Invalid input directory {indir}: no log.yaml found")
            return False

    n_required_events = np.inf if n_events == -1 else n_events
    indirs = get_indirs(input_dir=input_dir, subdirs=subdirs, condition=valid_indir)
    if len(indirs) == 0:
        raise ValueError("No input directories.")

    list_n_output_saved: typing.List[int] = Parallel(  # type: ignore
        n_jobs=n_workers, batch_size=1  # type: ignore
    )(
        delayed(load_and_dump_if_enough_hits)(indir=indir, output_dir=output_dir)
        for indir in tqdm(indirs)
    )
    n_output_saved = sum(list_n_output_saved)

    pd.set_option("chained_assignment", "warn")  # re-enable chained-assignment warning
    Path(os.path.join(output_dir, "done")).touch()

    if raise_enough_events and n_output_saved < n_required_events:
        raise Exception(
            "Not enough events found with at least "
            f"{num_true_hits_threshold} true hits"
        )
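
# Hedged usage sketch: a plausible call to `preprocess` for a single input
# directory. All paths are placeholders and "remove_low_p_particles" is a
# hypothetical entry in `process_custom`; the default filenames come from the
# docstring above.
def _example_preprocess() -> None:
    preprocess(
        input_dir="/path/to/simulation_output",  # placeholder path
        output_dir="/path/to/preprocessed",  # placeholder path
        hits_particles_filename="hits_velo",
        particles_filename="mc_particles",
        n_events=1000,
        processing="remove_low_p_particles",  # hypothetical selection
        num_true_hits_threshold=3,
        n_workers=4,
    )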