Source code for pipeline.Processing.processing

"""A module that defines the processing at the event-level.
"""

import typing
import os
import logging

import numpy as np
import numpy.typing as npt
import pandas as pd
import torch
from torch_geometric.data import Data

from .modulewise_edges import get_modulewise_edges
from .planewise_edges import get_planewise_edges, get_planewise_custom_edges
from .sortedwise_edges import get_sortedwise_edges
from .compute import compute_columns


def get_normalised_features(
    hits: pd.DataFrame,
    features: typing.List[str],
    feature_means: npt.ArrayLike,
    feature_scales: npt.ArrayLike,
) -> npt.NDArray:
    """Get the normalised features from the dataframe of hits.

    Args:
        hits: Dataframe of hits that contains the features
        features: List of the columns in the dataframe of ``hits``
            that correspond to the features
        feature_means: Array of the means to subtract from the feature values,
            in order to "centralise" them
        feature_scales: Array of the scales to divide the "centralised" feature
            values by, so that their scale is around 1

    Returns:
        Array of the normalised feature values, of shape
        ``(n_hits, n_features)``.
    """
    feature_means = np.asarray(feature_means)
    feature_scales = np.asarray(feature_scales)
    assert len(feature_means) == len(features), (
        f"[len(features) == {len(features)}] != "
        f"[len(feature_means) == {len(feature_means)}]"
    )
    assert len(feature_scales) == len(features), (
        f"[len(features) == {len(features)}] != "
        f"[len(feature_scales) == {len(feature_scales)}]"
    )
    array_features = hits[features].to_numpy()
    return (array_features - feature_means) / feature_scales
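# Illustrative usage sketch (not part of the module): how `get_normalised_features`
# might be called on a toy dataframe of hits. The column names "x", "y", "z" and the
# normalisation constants below are hypothetical, chosen only for the example.
#
# toy_hits = pd.DataFrame(
#     {
#         "x": [10.0, 20.0, 30.0],
#         "y": [-5.0, 0.0, 5.0],
#         "z": [100.0, 200.0, 300.0],
#     }
# )
# normalised = get_normalised_features(
#     toy_hits,
#     features=["x", "y", "z"],
#     feature_means=[20.0, 0.0, 200.0],
#     feature_scales=[10.0, 5.0, 100.0],
# )
# # `normalised` has shape (3, 3): each column is centred by its mean and
# # divided by its scale.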
def _get_source_target_columns(
    columns: typing.List[str | typing.Dict[str, str]]
) -> typing.Tuple[typing.List[str], typing.List[str]]:
    """Split a list of kept columns into source and target column names."""
    columns_source = []
    columns_target = []
    for column in columns:
        if isinstance(column, dict):
            # A dict maps the target (renamed) column to the source column
            first_key = next(iter(column.keys()))
            columns_source.append(column[first_key])
            columns_target.append(first_key)
        else:
            # A plain string is used both as source and target column name
            columns_source.append(column)
            columns_target.append(column)
    return columns_source, columns_target
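# Illustrative sketch (not part of the module) of the source/target convention:
# a plain string keeps its name, while a dict renames a source column (value)
# to a target column (key). The column names below are hypothetical.
#
# sources, targets = _get_source_target_columns(["plane", {"hit_x": "x"}])
# # sources == ["plane", "x"]      (columns read from the hits dataframe)
# # targets == ["plane", "hit_x"]  (attribute names in the PyTorch data object)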
def build_event(
    truncated_path: str,
    event_str: str,
    features: typing.List[str],
    feature_means: typing.List[float],
    feature_scales: typing.List[float],
    kept_hits_columns: typing.List[str | typing.Dict[str, str]],
    kept_particles_columns: typing.List[str],
    true_edges_column: str,
) -> Data:
    """Load the event and compute the necessary columns.

    Args:
        truncated_path: Path without the suffixes ``-particles.parquet``
            and ``-hits_particles.parquet``
        event_str: Event identifier, used for the file names
        features: List of the hit columns used as node features
        feature_means: Array of the means to subtract from the feature values,
            in order to "centralise" them
        feature_scales: Array of the scales to divide the "centralised" feature
            values by, so that their scale is around 1
        kept_hits_columns: Columns to keep, initially stored in the dataframe
            of hits
        kept_particles_columns: Columns to keep, initially stored in the
            dataframe of particles, but merged to the particles of hits
        true_edges_column: Strategy used to build the true edges, one of
            ``modulewise``, ``planewise``, ``sortedwise`` or ``planewise_xuvx``

    Returns:
        PyTorch data object, which will be saved for the training or inference.
    """
    # Load parquet files
    particles = pd.read_parquet(truncated_path + "-particles.parquet")
    hits_particles = pd.read_parquet(truncated_path + "-hits_particles.parquet")

    kept_hits_columns_source, kept_hits_columns_target = _get_source_target_columns(
        kept_hits_columns
    )

    merged_particles_columns = list(
        set(
            ["particle_id"]  # index
            + ["vx", "vy", "vz"]  # module-wise true edges
            # other columns to keep in the PyTorch data object
        )
    )

    # Add MC truth information to the dataframe of hits
    hits_particles = hits_particles.merge(
        right=particles[merged_particles_columns],
        on=["particle_id"],
        how="left",
    )

    # In real data, we don't know about particle ID.
    # Thus, we don't know which hit ID can have multiple particle IDs
    # -> restrain to unique hit IDs
    unique_hits = hits_particles.drop_duplicates("hit_id").copy()
    unique_hits.drop("particle_id", axis=1, inplace=True)
    unique_hits["hit_idx"] = np.arange(unique_hits.shape[0])

    # Whether the hit is fake
    hits_particles["noisy"] = hits_particles["particle_id"] == 0
    genuine_hit_ids = hits_particles[~hits_particles["noisy"]]["hit_id"].unique()
    fake_hit_ids = hits_particles[
        hits_particles["noisy"] & (~hits_particles["hit_id"].isin(genuine_hit_ids))
    ]["hit_id"].unique()
    unique_hits["fake"] = unique_hits["hit_id"].isin(fake_hit_ids)

    # Sort hits by plane if this is not already the case
    if not unique_hits["plane"].is_monotonic_increasing:
        unique_hits.sort_values("plane", inplace=True)
        unique_hits["hit_idx"] = np.arange(unique_hits.shape[0])

    # Compute columns that are not already defined
    compute_columns(
        hits=unique_hits,
        columns=features,
    )

    # Find the true edges using the dataframe of hits-particles association
    hits_particles = hits_particles.merge(
        unique_hits[["hit_id", "hit_idx"]], on="hit_id", how="left"
    )
    if true_edges_column == "modulewise":
        true_edge_indices = get_modulewise_edges(hits_particles)
    elif true_edges_column == "planewise":
        true_edge_indices = get_planewise_edges(hits_particles)
    elif true_edges_column == "sortedwise":
        true_edge_indices = get_sortedwise_edges(hits_particles)
    elif true_edges_column == "planewise_xuvx":
        true_edge_indices = get_planewise_custom_edges(hits_particles)
    else:
        raise ValueError(
            f"`true_edges_column` is `{true_edges_column}`, which is not recognised."
        )

    # Turn the indices in the hits-particles association into hit indices
    hit_particle_idx_to_new_hit_idx = hits_particles["hit_idx"].to_numpy()
    true_edge_indices = hit_particle_idx_to_new_hit_idx[true_edge_indices]
    true_edge_indices = np.unique(true_edge_indices, axis=1)

    normalised_features = get_normalised_features(
        unique_hits,
        features=features,
        feature_means=feature_means,
        feature_scales=feature_scales,
    )

    kept_columns_source = (
        # required columns
        [
            "hit_id",
            # matching
            "fake",
        ]
        # Other columns
        + kept_hits_columns_source
    )
    kept_columns_target = ["hit_id", "fake"] + kept_hits_columns_target

    particle_ids_hit_indices = torch.from_numpy(
        hits_particles[["particle_id", "hit_idx"]].to_numpy()
    )

    torch_data = Data(
        x=torch.from_numpy(normalised_features).float(),
        truncated_path=truncated_path,  # To know for sure where the data come from
        event_str=event_str,  # for the file names
        **{
            column_target: torch.from_numpy(unique_hits[column_source].to_numpy())
            for column_source, column_target in zip(
                kept_columns_source, kept_columns_target
            )
        },
        signal_true_edges=torch.from_numpy(true_edge_indices),
        particle_id_hit_idx=particle_ids_hit_indices,
        **{
            "particle_" + particle_column: torch.from_numpy(
                particles[particle_column].to_numpy()
            )
            for particle_column in kept_particles_columns
        },
    )
    if kept_particles_columns:
        torch_data["unique_particle_id"] = torch.from_numpy(
            particles["particle_id"].to_numpy()
        )

    return torch_data
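# Illustrative usage sketch (not part of the module): a possible call to
# `build_event`. The path, feature names, normalisation constants and kept
# columns are hypothetical and depend on the actual pre-processed data.
#
# example_data = build_event(
#     truncated_path="/path/to/preprocessed/event000001",
#     event_str="000001",
#     features=["x", "y", "z"],
#     feature_means=[0.0, 0.0, 0.0],
#     feature_scales=[1.0, 1.0, 1.0],
#     kept_hits_columns=["plane", {"hit_x": "x"}],
#     kept_particles_columns=["vx", "vy", "vz"],
#     true_edges_column="planewise",
# )
# # `example_data.x` holds the normalised node features and
# # `example_data.signal_true_edges` the true edge indices.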
def prepare_event(
    truncated_path: str,
    output_dir: str,
    *args,
    **kwargs,
):
    """Load one event saved during pre-processing and save it in the right
    format in a PyTorch data object.

    Args:
        truncated_path: Path of the input files, excluding
            ``-hits_particles.parquet`` and ``-particles.parquet``
        output_dir: Directory where to save all the processed PyTorch data
        args, kwargs: Passed to :py:func:`build_event`
    """
    event_str = os.path.basename(truncated_path)[len("event") :]  # Event number
    outpath = os.path.join(output_dir, str(event_str))  # Output file
    if not os.path.exists(outpath):
        logging.debug(f"Preparing event {event_str}")
        torch_data = build_event(truncated_path, event_str, *args, **kwargs)
        with open(outpath, "wb") as pickle_file:
            torch.save(torch_data, pickle_file)
    else:
        logging.debug(f"{event_str} already exists")
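# Illustrative usage sketch (not part of the module): processing one event end to
# end with `prepare_event`. The input and output paths are hypothetical; the extra
# keyword arguments are forwarded to `build_event`. An existing output file is
# skipped rather than overwritten.
#
# prepare_event(
#     "/path/to/preprocessed/event000001",
#     output_dir="/path/to/processed",
#     features=["x", "y", "z"],
#     feature_means=[0.0, 0.0, 0.0],
#     feature_scales=[1.0, 1.0, 1.0],
#     kept_hits_columns=["plane"],
#     kept_particles_columns=[],
#     true_edges_column="planewise",
# )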