"""A module that defines the processing at the event-level.
"""
import typing
import os
import logging
import numpy as np
import numpy.typing as npt
import pandas as pd
import torch
from torch_geometric.data import Data
from .modulewise_edges import get_modulewise_edges
from .planewise_edges import get_planewise_edges, get_planewise_custom_edges
from .sortedwise_edges import get_sortedwise_edges
from .compute import compute_columns
def get_normalised_features(
hits: pd.DataFrame,
features: typing.List[str],
feature_means: npt.ArrayLike,
feature_scales: npt.ArrayLike,
) -> npt.NDArray:
"""Get the the normalised features from the dataframe of hits.
Args:
hits: Dataframe of hits that contains the features
features: list of the columns in the dataframe of ``hits``, which correspond to
the features
        feature_means: Array of the means to subtract from the feature values,
            in order to "centralise" them
feature_scales: Array of the scales to divide the "centralised" feature values
by, so that their scale is around 1
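
    Returns:
        Array of the normalised feature values, of shape ``(n_hits, n_features)``.

    Example (illustrative values only; the column names and statistics are made up)::

        >>> hits = pd.DataFrame({"x": [1.0, 3.0], "y": [2.0, 6.0]})
        >>> get_normalised_features(
        ...     hits,
        ...     features=["x", "y"],
        ...     feature_means=[2.0, 4.0],
        ...     feature_scales=[1.0, 2.0],
        ... )
        array([[-1., -1.],
               [ 1.,  1.]])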
"""
feature_means = np.asarray(feature_means)
feature_scales = np.asarray(feature_scales)
assert len(feature_means) == len(features), (
f"[len(features) == {len(features)}] != "
f"[len(feature_means) == {len(feature_means)}]"
)
assert len(feature_scales) == len(features), (
f"[len(features) == {len(features)}] != "
f"[len(feature_scales) == {len(feature_scales)}]"
)
array_features = hits[features].to_numpy()
return (array_features - feature_means) / feature_scales
def _get_source_target_columns(
columns: typing.List[str | typing.Dict[str, str]]
) -> typing.Tuple[typing.List[str], typing.List[str]]:
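    """Split a list of column specifications into source and target column names.

    A plain string keeps the same column name; a single-entry dictionary
    ``{target: source}`` renames the ``source`` column of the dataframe of hits to
    ``target`` in the PyTorch data object.

    Example (hypothetical column names)::

        >>> _get_source_target_columns(["x", {"hit_plane": "plane"}])
        (['x', 'plane'], ['x', 'hit_plane'])
    """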
columns_source = []
columns_target = []
for column in columns:
if isinstance(column, dict):
first_key = next(iter(column.keys()))
columns_source.append(column[first_key])
columns_target.append(first_key)
else:
columns_source.append(column)
columns_target.append(column)
return columns_source, columns_target
def build_event(
truncated_path: str,
event_str: str,
features: typing.List[str],
feature_means: typing.List[float],
feature_scales: typing.List[float],
kept_hits_columns: typing.List[str | typing.Dict[str, str]],
kept_particles_columns: typing.List[str],
true_edges_column: str,
) -> Data:
"""Load the event, compute the necessary columns.
Args:
        truncated_path: Path without the suffixes ``-particles.parquet``
            and ``-hits_particles.parquet``
        event_str: Event identifier, stored in the data object and used for file names
        features: Columns of the dataframe of hits used as the node features
            (computed first if not already present)
        feature_means: Array of the means to subtract from the feature values,
            in order to "centralise" them
        feature_scales: Array of the scales to divide the "centralised" feature values
            by, so that their scale is around 1
        kept_hits_columns: Columns of the dataframe of hits to keep in the data object,
            given either as a column name or as a ``{target: source}`` renaming
        kept_particles_columns: Columns of the dataframe of particles to keep in the
            data object, stored with a ``particle_`` prefix
        true_edges_column: Strategy used to build the true edges, one of
            ``"modulewise"``, ``"planewise"``, ``"sortedwise"`` or ``"planewise_xuvx"``
Returns:
PyTorch data object, which will be saved for the training or inference.
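
    Example (a hypothetical call; the paths and column names are illustrative only)::

        event = build_event(
            truncated_path="/data/preprocessed/event000001",
            event_str="000001",
            features=["x", "y"],
            feature_means=[0.0, 0.0],
            feature_scales=[1.0, 1.0],
            kept_hits_columns=["plane", {"hit_x": "x"}],
            kept_particles_columns=["vz"],
            true_edges_column="planewise",
        )
        event.x                  # normalised node features
        event.signal_true_edges  # true edge indices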
"""
    # Load the Parquet files
particles = pd.read_parquet(truncated_path + "-particles.parquet")
hits_particles = pd.read_parquet(truncated_path + "-hits_particles.parquet")
kept_hits_columns_source, kept_hits_columns_target = _get_source_target_columns(
kept_hits_columns
)
merged_particles_columns = list(
set(
["particle_id"] # index
+ ["vx", "vy", "vz"] # module-wise true edges
# other columns to keep in the PyTorch data object
)
)
# Add MC truth information to the dataframe of hits
hits_particles = hits_particles.merge(
right=particles[merged_particles_columns],
on=["particle_id"],
how="left",
)
    # In real data, we don't know the particle IDs.
    # Thus, we don't know which hit IDs are associated with several particle IDs
    # -> restrict to unique hit IDs
unique_hits = hits_particles.drop_duplicates("hit_id").copy()
unique_hits.drop("particle_id", axis=1, inplace=True)
unique_hits["hit_idx"] = np.arange(unique_hits.shape[0])
    # Flag noise hits; a hit is fake if it is never associated with a genuine particle
hits_particles["noisy"] = hits_particles["particle_id"] == 0
genuine_hit_ids = hits_particles[~hits_particles["noisy"]]["hit_id"].unique()
fake_hit_ids = hits_particles[
hits_particles["noisy"] & (~hits_particles["hit_id"].isin(genuine_hit_ids))
]["hit_id"].unique()
unique_hits["fake"] = unique_hits["hit_id"].isin(fake_hit_ids)
    # Sort the hits by plane if they are not already sorted
if not unique_hits["plane"].is_monotonic_increasing:
unique_hits.sort_values("plane", inplace=True)
unique_hits["hit_idx"] = np.arange(unique_hits.shape[0])
# Compute columns that are not already defined
compute_columns(
hits=unique_hits,
columns=features,
)
# Find the true edges using the dataframe of hits-particles association
hits_particles = hits_particles.merge(
unique_hits[["hit_id", "hit_idx"]], on="hit_id", how="left"
)
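    # Build the true edges with the strategy requested by ``true_edges_column``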
if true_edges_column == "modulewise":
true_edge_indices = get_modulewise_edges(hits_particles)
elif true_edges_column == "planewise":
true_edge_indices = get_planewise_edges(hits_particles)
elif true_edges_column == "sortedwise":
true_edge_indices = get_sortedwise_edges(hits_particles)
elif true_edges_column == "planewise_xuvx":
true_edge_indices = get_planewise_custom_edges(hits_particles)
else:
raise ValueError(
f"`true_edges_column` is `{true_edges_column}`, which is not recognised."
)
# Turn the indices in the hits-particles association into hit indices
hit_particle_idx_to_new_hit_idx = hits_particles["hit_idx"].to_numpy()
true_edge_indices = hit_particle_idx_to_new_hit_idx[true_edge_indices]
true_edge_indices = np.unique(true_edge_indices, axis=1)
normalised_features = get_normalised_features(
unique_hits,
features=features,
feature_means=feature_means,
feature_scales=feature_scales,
)
kept_columns_source = (
# required columns
[
"hit_id", # matching
"fake",
]
# Other columns
+ kept_hits_columns_source
)
kept_columns_target = ["hit_id", "fake"] + kept_hits_columns_target
particle_ids_hit_indices = torch.from_numpy(
hits_particles[["particle_id", "hit_idx"]].to_numpy()
)
torch_data = Data(
x=torch.from_numpy(normalised_features).float(),
truncated_path=truncated_path, # To know for sure where the data come from
event_str=event_str, # for the file names
**{
column_target: torch.from_numpy(unique_hits[column_source].to_numpy())
for column_source, column_target in zip(
kept_columns_source, kept_columns_target
)
},
signal_true_edges=torch.from_numpy(true_edge_indices),
particle_id_hit_idx=particle_ids_hit_indices,
**{
"particle_"
+ particle_column: torch.from_numpy(particles[particle_column].to_numpy())
for particle_column in kept_particles_columns
},
)
if kept_particles_columns:
torch_data["unique_particle_id"] = torch.from_numpy(
particles["particle_id"].to_numpy()
)
return torch_data
def prepare_event(
truncated_path: str,
output_dir: str,
*args,
**kwargs,
):
"""Load one event saved during pre-processing and save it in the right format
    in a PyTorch data object.

Args:
        truncated_path: Path of the input files, excluding ``-hits_particles.parquet``
            and ``-particles.parquet``
        output_dir: Directory where to save all the processed PyTorch data.
            If the output file already exists, the event is skipped.
args, kwargs: passed to :py:func:`build_event`
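
    Example (hypothetical paths; the keyword arguments are forwarded to
    :py:func:`build_event`)::

        prepare_event(
            "/data/preprocessed/event000001",
            output_dir="/data/processed",
            features=["x", "y"],
            feature_means=[0.0, 0.0],
            feature_scales=[1.0, 1.0],
            kept_hits_columns=["plane"],
            kept_particles_columns=["vz"],
            true_edges_column="planewise",
        )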
"""
event_str = os.path.basename(truncated_path)[len("event") :] # Event number
outpath = os.path.join(output_dir, str(event_str)) # Output file
if not os.path.exists(outpath):
logging.debug(f"Preparing event {event_str}")
torch_data = build_event(truncated_path, event_str, *args, **kwargs)
with open(outpath, "wb") as pickle_file:
torch.save(torch_data, pickle_file)
else:
logging.debug(f"{event_str} already exists")