Source code for pipeline.utils.loaderutils.tracks
import typing
import os
import logging
from tqdm.auto import tqdm
import pandas as pd
import torch
from torch_geometric.data import Data
from utils.commonutils.ctests import get_required_test_dataset_names
from utils.commonutils.config import load_config, get_detector_from_pipeline_config
from .preprocessing import load_preprocessed_dataframes
from Preprocessing.preprocessing_paths import get_truncated_paths_for_partition


def get_tracks_from_batch(batch: Data) -> pd.DataFrame:
    """Convert a PyTorch Geometric data object into a dataframe of tracks.

    Args:
        batch: PyTorch Geometric data object of one event, holding the
            ``event_str``, ``hit_id`` and ``labels`` attributes.

    Returns:
        Dataframe with columns ``event_id``, ``hit_id``, ``track_id``.
    """
    return pd.DataFrame(
        {
            "event_id": int(batch.event_str),
            "hit_id": batch.hit_id,
            "track_id": batch.labels,
        }
    )
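

# A minimal sketch (hypothetical helper and made-up values, not part of the
# pipeline) of the data layout that get_tracks_from_batch expects: one event
# with three hits, the first two belonging to the same track.
def _example_get_tracks_from_batch() -> pd.DataFrame:
    batch = Data(
        event_str="42",
        hit_id=torch.tensor([0, 1, 2]),
        labels=torch.tensor([0, 0, 1]),
    )
    # One row per hit; event_id (42) is broadcast to every row
    return get_tracks_from_batch(batch)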


def load_tracks_event(input_path: str) -> pd.DataFrame:
    """Load the dataframe of tracks produced by track building for one event.

    Args:
        input_path: Path to the PyTorch Geometric data pickle file that
            contains the graph together with the reconstructed tracks.

    Returns:
        Dataframe with columns ``event_id``, ``hit_id``, ``track_id``.
    """
graph = torch.load(input_path, map_location="cpu")
df_tracks = get_tracks_from_batch(graph)
return df_tracks
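

# A minimal sketch (hypothetical helper and path) of the round trip that
# load_tracks_event expects: an event graph written with torch.save and
# read back from disk.
def _example_load_tracks_event(path: str = "/tmp/event_42.pt") -> pd.DataFrame:
    graph = Data(
        event_str="42",
        hit_id=torch.tensor([0, 1, 2]),
        labels=torch.tensor([0, 0, 1]),
    )
    torch.save(graph, path)
    return load_tracks_event(input_path=path)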


def load_tracks(
    input_dir: str,
) -> pd.DataFrame:
    """Load the tracks from graphs.

    Args:
        input_dir: Input directory where the PyTorch Geometric data objects
            containing the reconstructed tracks are saved.

    Returns:
        Dataframe with columns ``event_id``, ``hit_id``, ``track_id``,
        for all the events in ``input_dir``.
    """
    logging.info(f"Load tracks in {input_dir}.")
    # List of dataframes of tracks (one dataframe = one event)
    list_df_tracks = []
    # Loop over the graphs (one graph = one event),
    # skipping the "done" marker file
    input_paths = [
        entry.path
        for entry in os.scandir(input_dir)
        if entry.is_file() and entry.name != "done"
    ]
    for input_path in tqdm(input_paths):
        # Load the dataframe of tracks of this event
        df_tracks_event = load_tracks_event(input_path=input_path)
        list_df_tracks.append(df_tracks_event)
    # Return the dataframes of all events concatenated into one
    return pd.concat(list_df_tracks).drop_duplicates()
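

# A minimal sketch (hypothetical helper and directory) of the layout that
# load_tracks expects: one saved graph per event in a flat directory,
# optionally alongside a "done" marker file that is skipped.
def _example_load_tracks(input_dir: str = "/tmp/tracks") -> pd.DataFrame:
    os.makedirs(input_dir, exist_ok=True)
    for event in ("42", "43"):
        graph = Data(
            event_str=event,
            hit_id=torch.tensor([0, 1, 2]),
            labels=torch.tensor([0, 0, 1]),
        )
        torch.save(graph, os.path.join(input_dir, f"event_{event}.pt"))
    return load_tracks(input_dir=input_dir)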


def load_tracks_preprocessed_dataframes_given_partition(
    path_or_config: str | dict,
    partition: str,
    suffix: str = "",
) -> typing.Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Load the tracks and preprocessed dataframes of a partition.

    Args:
        path_or_config: Path to the pipeline configuration file,
            or the configuration itself.
        partition: Name of the partition to load.
        suffix: Suffix used to resolve the tracks input directory.

    Returns:
        Tuple of 3 dataframes: tracks, hits-particles association
        and particles.
    """
config = load_config(path_or_config=path_or_config)
detector = get_detector_from_pipeline_config(config)
# Load tracks
tracks_input_dir = get_tracks_input_directory(
config, partition=partition, suffix=suffix
)
df_tracks = load_tracks(input_dir=tracks_input_dir)
# Load dataframe of hits-particles association and dataframe of particles
truncated_paths = get_truncated_paths_for_partition(
path_or_config=config, partition=partition
)
    if detector == "velo":
        # VELO hits are described by their Cartesian coordinates
        coordinates = ["x", "y", "z"]
    elif detector in ("scifi", "scifi_xz"):
        # SciFi hits are described by their position at y = 0 and slope
        coordinates = ["zatyeq0", "xatyeq0", "dxdy"]
    else:
        raise ValueError(f"Detector {detector} is not supported")
df_hits_particles = load_preprocessed_dataframes(
truncated_paths=truncated_paths,
ending="-hits_particles",
columns=["particle_id", "hit_id", "plane"] + coordinates,
)
df_particles = load_preprocessed_dataframes(
truncated_paths=truncated_paths, ending="-particles"
)
return df_tracks, df_hits_particles, df_particles
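

# A minimal usage sketch of the partition-level loader above; the
# configuration path and partition name are hypothetical and should be
# adapted to the actual pipeline layout.
def _example_load_partition() -> typing.Tuple[
    pd.DataFrame, pd.DataFrame, pd.DataFrame
]:
    return load_tracks_preprocessed_dataframes_given_partition(
        path_or_config="pipeline_config.yaml",
        partition="test",
    )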