Source code for pipeline.utils.loaderutils.tracks

import typing
import os
import logging

from tqdm.auto import tqdm
import pandas as pd
import torch
from torch_geometric.data import Data


from utils.commonutils.ctests import get_required_test_dataset_names
from utils.commonutils.config import load_config, get_detector_from_pipeline_config
from .preprocessing import load_preprocessed_dataframes
from Preprocessing.preprocessing_paths import get_truncated_paths_for_partition


def get_tracks_from_batch(batch: Data) -> pd.DataFrame:
    """Build the dataframe of tracks from a PyTorch Geometric data object."""
    return pd.DataFrame(
        {
            "event_id": int(batch.event_str),
            "hit_id": batch.hit_id,
            "track_id": batch.labels,
        }
    )
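
# A minimal sketch of the input ``get_tracks_from_batch`` expects, assuming
# the track-building stage stores the event number in ``event_str`` and the
# per-hit track labels in ``labels`` (the values below are made up for
# illustration, not taken from the pipeline):
#
#     batch = Data(
#         event_str="42",
#         hit_id=torch.arange(4),
#         labels=torch.tensor([0, 0, 1, 1]),
#     )
#     df = get_tracks_from_batch(batch)
#     # -> columns: event_id (42 on every row), hit_id, track_id
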
def load_tracks_event(input_path: str) -> pd.DataFrame:
    """Load the dataframe of tracks out of track building.

    Args:
        input_path: Path to the PyTorch Geometric data pickle file that
            contains the graph together with the reconstructed tracks.

    Returns:
        Dataframe with columns ``event_id``, ``hit_id``, ``track_id``.
    """
    graph = torch.load(input_path, map_location="cpu")
    return get_tracks_from_batch(graph)
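
# Usage sketch for ``load_tracks_event``. The file name below is
# hypothetical; the real files are whatever the track-building stage writes:
#
#     df_tracks = load_tracks_event("output/track_building/train/event_42.pt")
#     print(df_tracks.head())  # event_id, hit_id, track_id
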
def load_tracks(
    input_dir: str,
) -> pd.DataFrame:
    """Load the tracks from graphs.

    Args:
        input_dir: Input directory where the PyTorch Geometric data objects
            that contain the reconstructed tracks are saved.

    Returns:
        Dataframe with columns ``event_id``, ``hit_id``, ``track_id``,
        for all the events in ``input_dir``.
    """
    logging.info(f"Load tracks in {input_dir}.")
    #: List of dataframes of tracks (one dataframe = one event)
    list_df_tracks = []
    # Loop over the graphs (one graph = one event)
    input_paths = [
        entry.path
        for entry in os.scandir(input_dir)
        if entry.is_file() and entry.name != "done"
    ]
    for input_path in tqdm(input_paths):
        # Load the dataframe of tracks for this event
        df_tracks_event = load_tracks_event(input_path=input_path)
        list_df_tracks.append(df_tracks_event)
    # Return the concatenated dataframe without duplicates
    return pd.concat(list_df_tracks).drop_duplicates()
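
# Usage sketch for ``load_tracks`` (hypothetical directory; the ``done``
# entry skipped above is assumed to be a sentinel file marking a finished
# stage, not an event graph):
#
#     df_tracks = load_tracks(input_dir="output/track_building/train")
#     n_tracks_per_event = df_tracks.groupby("event_id")["track_id"].nunique()
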
def get_tracks_input_directory(
    path_or_config: str | dict, partition: str, suffix: str | None = None
) -> str:
    """Get the input directory where the tracks are stored for the given partition."""
    if suffix is None:
        suffix = ""
    config = load_config(path_or_config=path_or_config)
    test_dataset_names = get_required_test_dataset_names(config)
    if partition in ["train", "val"]:
        tracks_input_dir = os.path.join(
            config[f"track_building{suffix}"]["output_dir"], partition
        )
    elif partition in test_dataset_names:
        tracks_input_dir = os.path.join(
            config[f"track_building{suffix}"]["output_dir"], "test", partition
        )
    else:
        raise ValueError(
            "`partition` is not recognised. It can either be `train`, `val` "
            "or the name of a test dataset: " + str(test_dataset_names)
        )
    return tracks_input_dir
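
# Sketch of ``get_tracks_input_directory`` on a hand-written config dict
# (hypothetical keys and values; this assumes ``load_config`` passes a dict
# through unchanged and that only ``track_building.output_dir`` plus whatever
# ``get_required_test_dataset_names`` reads need to be present):
#
#     config = {"track_building": {"output_dir": "output/track_building"}}
#     get_tracks_input_directory(config, partition="val")
#     # -> "output/track_building/val"
#     get_tracks_input_directory(config, partition="minbias")
#     # -> "output/track_building/test/minbias", if "minbias" is among the
#     #    required test dataset names
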
def load_tracks_preprocessed_dataframes_given_partition(
    path_or_config: str | dict,
    partition: str,
    suffix: str = "",
) -> typing.Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Load the tracks together with the preprocessed hits-particles
    and particles dataframes for the given partition.

    Returns:
        Tuple ``(df_tracks, df_hits_particles, df_particles)``.
    """
    config = load_config(path_or_config=path_or_config)
    detector = get_detector_from_pipeline_config(config)
    # Load tracks
    tracks_input_dir = get_tracks_input_directory(
        config, partition=partition, suffix=suffix
    )
    df_tracks = load_tracks(input_dir=tracks_input_dir)
    # Load dataframe of hits-particles association and dataframe of particles
    truncated_paths = get_truncated_paths_for_partition(
        path_or_config=config, partition=partition
    )
    if detector == "velo":
        coordinates = ["x", "y", "z"]
    elif detector in ("scifi", "scifi_xz"):
        coordinates = ["zatyeq0", "xatyeq0", "dxdy"]
    else:
        raise ValueError(f"Detector {detector} is not supported")
    df_hits_particles = load_preprocessed_dataframes(
        truncated_paths=truncated_paths,
        ending="-hits_particles",
        columns=["particle_id", "hit_id", "plane"] + coordinates,
    )
    df_particles = load_preprocessed_dataframes(
        truncated_paths=truncated_paths, ending="-particles"
    )
    return df_tracks, df_hits_particles, df_particles
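
# End-to-end usage sketch (hypothetical config path and partition name):
#
#     df_tracks, df_hits_particles, df_particles = (
#         load_tracks_preprocessed_dataframes_given_partition(
#             path_or_config="pipeline_config.yaml",
#             partition="val",
#         )
#     )
#     # df_tracks:         event_id, hit_id, track_id
#     # df_hits_particles: particle_id, hit_id, plane + detector coordinates
#     # df_particles:      one row per true particle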