Source code for pipeline.utils.graphutils.truths

"""A module that implements various ways of getting the intersection of the
predicted and truth graphs in order to get the target ``y``.

The Exa.TrkX function remains the fastest.
"""

from __future__ import annotations
import numpy as np
import numpy.typing as npt
import torch
from utils.tools import tarray
import scipy as sp


# Device picked by default: the GPU when CUDA is usable, the CPU otherwise.
default_device = "cpu" if not torch.cuda.is_available() else "cuda"


def get_truths_exatrkx(
    edge_indices: torch.Tensor,
    true_edge_indices: torch.Tensor,
    device: str | torch.device | None = None,
) -> tuple[torch.Tensor, torch.Tensor]:
    """Get the targets of each edge in ``edge_indices`` given the true edges
    in ``true_edge_indices``.

    Args:
        edge_indices: predicted edge indices, one edge per column
            (row 0 and row 1 hold the two node indices of each edge)
        true_edge_indices: true edge indices, laid out like ``edge_indices``
        device: device the returned tensors are placed on. Defaults to the
            device of ``edge_indices``. ``"cuda:0"`` is rewritten to
            ``"cuda"``.

    Returns:
        Edge indices (might be in a different order) and corresponding
        boolean targets, both on ``device``.

    Notes:
        The function turns the tensors into numpy arrays, on CPU.

        When both inputs are non-empty, the edges go through a SciPy sparse
        matrix, so a predicted edge that appears several times is returned
        only once. When either input is empty, ``edge_indices`` is returned
        as is (only moved to ``device``) with all-``False`` targets.
    """
    if device is None:
        device = edge_indices.device
    elif str(device) == "cuda:0":
        device = "cuda"

    n_edges = edge_indices.shape[1]
    if n_edges == 0 or true_edge_indices.shape[1] == 0:
        # Nothing can match: every predicted edge (if there is any) is fake.
        # The outputs are put on ``device`` like in the general case below.
        return (
            edge_indices.to(device),
            torch.zeros(size=(n_edges,), dtype=torch.bool, device=device),
        )

    edge_indices_np: npt.NDArray = edge_indices.cpu().numpy()
    true_edge_indices_np: npt.NDArray = true_edge_indices.cpu().numpy()

    # Both adjacency matrices need the same square shape to be combined.
    n_nodes = max(edge_indices_np.max(), true_edge_indices_np.max()) + 1
    predicted_csr = sp.sparse.coo_matrix(
        (np.ones(edge_indices_np.shape[1]), edge_indices_np),
        shape=(n_nodes, n_nodes),
    ).tocsr()
    true_csr = sp.sparse.coo_matrix(
        (np.ones(true_edge_indices_np.shape[1]), true_edge_indices_np),
        shape=(n_nodes, n_nodes),
    ).tocsr()

    # The element-wise product is positive only for edges present in both
    # graphs. Subtracting the boolean mask stores a negative value for the
    # edges that are predicted but not true, which keeps them in the sparse
    # result. Edges that are only in the truth graph are stored in neither
    # term and therefore do not appear in the output.
    intersection_csr = predicted_csr.multiply(true_csr) - (
        (predicted_csr - true_csr) > 0
    )
    intersection_coo = intersection_csr.tocoo()

    new_edge_indices = (
        torch.from_numpy(np.vstack([intersection_coo.row, intersection_coo.col]))
        .long()
        .to(device)
    )
    # Positive entries are the genuine edges, negative ones the fake edges.
    y = torch.from_numpy(intersection_coo.data > 0).to(device)
    return new_edge_indices, y
def get_truths_pytorch(
    edge_indices: torch.Tensor, true_edge_indices: torch.Tensor
) -> torch.Tensor:
    """Flag the edges of ``edge_indices`` that also appear in
    ``true_edge_indices``.

    Every predicted edge is compared with every true edge through
    broadcasting, so the intermediate tensor grows with the product of the
    two numbers of edges.

    Args:
        edge_indices: predicted edge indices
        true_edge_indices: true edge indices

    Returns:
        Boolean edge targets, in the same order as ``edge_indices``.
    """
    # Add a trailing axis to the predictions and an axis before the last one
    # to the truth, so that the comparison pairs every predicted edge with
    # every true edge.
    predicted = edge_indices[..., None]
    truth = true_edge_indices[..., None, :]
    same_node = predicted == truth

    # A pair of edges matches when both of their node indices agree.
    same_edge = torch.all(same_node, dim=0)

    # A predicted edge is true as soon as it matches one true edge.
    return torch.any(same_edge, dim=-1)
def get_truth_cudf(
    edge_indices: torch.Tensor,
    true_edge_indices: torch.Tensor,
) -> torch.Tensor:
    """Get the truth array ``y`` with a dataframe left-merge.

    The predicted edges are left-merged with the true edges on their two node
    indices; an edge is flagged as true when the merge indicator reports
    that the row was found on both sides.

    This function is approximately 7 times slower than going to the CSR
    representation.

    Args:
        edge_indices: predicted edge indices
        true_edge_indices: true edge indices

    Returns:
        The output of ``tarray.series_to_tensor`` applied to the boolean
        merge result.
    """
    # The CUDA-backed helpers are requested only when the edges are on a GPU.
    on_gpu = edge_indices.device.type == "cuda"
    xp = tarray.get_numpy_or_cupy(use_cuda=on_gpu)

    # NOTE(review): ``edge_idx`` is stored but never read after the merge.
    # It looks like it was meant to restore the original edge order, and a
    # repeated true edge would also yield several rows for one predicted
    # edge -- confirm that the merge output lines up with ``edge_indices``.
    predicted_columns = {
        "left_idx": edge_indices[0],
        "right_idx": edge_indices[1],
        "edge_idx": xp.arange(edge_indices.shape[1]),
    }
    predicted_df = tarray.to_dataframe(predicted_columns, use_cuda=on_gpu)

    true_columns = {
        "left_idx": true_edge_indices[0],
        "right_idx": true_edge_indices[1],
    }
    true_df = tarray.to_dataframe(true_columns, use_cuda=on_gpu)

    merged_df = predicted_df.merge(
        true_df,
        on=["left_idx", "right_idx"],
        how="left",
        indicator=True,
    )
    # "both" marks the rows whose edge was found among the true edges.
    is_true_edge = merged_df["_merge"] == "both"
    return tarray.series_to_tensor(is_true_edge)