# Source code for pipeline.utils.graphutils.truths
"""A module that implements various ways of getting the intersection of the
predicted and truth graphs in order to get the target ``y``.
The Exa.TrkX function remains the fastest.
"""
from __future__ import annotations
import numpy as np
import numpy.typing as npt
import torch
from utils.tools import tarray
import scipy as sp
# Device name used by default: CUDA when PyTorch reports it as available,
# otherwise the CPU.
if torch.cuda.is_available():
    default_device = "cuda"
else:
    default_device = "cpu"
def get_truths_exatrkx(
    edge_indices: torch.Tensor,
    true_edge_indices: torch.Tensor,
    device: str | torch.device | None = None,
) -> tuple[torch.Tensor, torch.Tensor]:
    """Get the targets of each edge in ``edge_indices`` given the true edges
    in ``true_edge_indices``.

    Args:
        edge_indices: predicted edge indices. Row 0 and row 1 hold the two
            end points of each edge, one edge per column.
        true_edge_indices: true edge indices, with the same layout.
        device: device on which the returned tensors are placed. Defaults to
            the device of ``edge_indices``.

    Returns:
        Edge indices (might be in a different order) and corresponding targets.
        The targets are a boolean tensor with one entry per returned edge.

    Notes:
        The function turns the tensors into numpy arrays, on CPU.
        The edges are stored in sparse matrices, and repeated entries are
        summed when converting from COO to CSR, so a predicted edge that
        appears several times is returned only once.
    """
    # Imported here so that ``scipy.sparse`` is loaded explicitly: a bare
    # ``import scipy`` does not expose the submodule on older SciPy releases.
    from scipy import sparse

    if device is None:
        device = edge_indices.device
    elif str(device) == "cuda:0":
        device = "cuda"

    n_edges = edge_indices.shape[1]
    if n_edges == 0 or true_edge_indices.shape[1] == 0:
        # Nothing to intersect: no predicted edge can be a true edge.
        # The outputs honour ``device`` exactly like the general case below.
        return edge_indices.to(device), torch.zeros(
            size=(n_edges,), dtype=torch.bool, device=device
        )

    edge_indices_np = edge_indices.cpu().numpy()
    true_edge_indices_np = true_edge_indices.cpu().numpy()
    # Both matrices must share a shape that covers every node index.
    n_nodes = int(max(edge_indices_np.max(), true_edge_indices_np.max())) + 1

    predicted_csr = sparse.coo_matrix(
        (np.ones(edge_indices_np.shape[1]), edge_indices_np),
        shape=(n_nodes, n_nodes),
    ).tocsr()
    true_csr = sparse.coo_matrix(
        (np.ones(true_edge_indices_np.shape[1]), true_edge_indices_np),
        shape=(n_nodes, n_nodes),
    ).tocsr()

    # The element-wise product is positive only where an edge is both
    # predicted and true. Subtracting the ``> 0`` mask stores -1 for edges
    # that are predicted but not true, so they stay in the sparse structure
    # and can be labelled False. Edges that are only true are never stored.
    intersection_csr = predicted_csr.multiply(true_csr) - (
        (predicted_csr - true_csr) > 0
    )
    intersection_coo = intersection_csr.tocoo()

    new_edge_indices = (
        torch.from_numpy(np.vstack([intersection_coo.row, intersection_coo.col]))
        .long()
        .to(device)
    )
    y = torch.from_numpy(intersection_coo.data > 0).to(device)
    return new_edge_indices, y
def get_truths_pytorch(
    edge_indices: torch.Tensor, true_edge_indices: torch.Tensor
) -> torch.Tensor:
    """Get the targets of each edge in ``edge_indices`` given the true edges
    in ``true_edge_indices``.

    Every predicted edge is compared with every true edge through
    broadcasting, so an intermediate boolean tensor of shape ``[n, m]`` is
    built, where ``n`` and ``m`` are the numbers of predicted and true edges.

    Args:
        edge_indices: predicted edge indices, of shape ``[2, n]``
        true_edge_indices: true edge indices, of shape ``[2, m]``

    Returns:
        Edge targets: a boolean tensor of shape ``[n]``, in the same order as
        the columns of ``edge_indices``.
    """
    pred_graph_expanded = edge_indices.unsqueeze(-1)  # Shape: [2, n, 1]
    truth_graph_expanded = true_edge_indices.unsqueeze(-2)  # Shape: [2, 1, m]

    # Two edges are equal only when both of their end points are equal, hence
    # the logical AND over dimension 0 (the two rows).
    equal_edges = (pred_graph_expanded == truth_graph_expanded).all(
        dim=0
    )  # Shape: [n, m]

    # A predicted edge is true when it matches at least one true edge.
    return equal_edges.any(dim=-1)
def get_truth_cudf(
    edge_indices: torch.Tensor,
    true_edge_indices: torch.Tensor,
) -> torch.Tensor:
    """Get the truth array ``y``. This function is approximately 7 times slower than
    going to the CSR representation.

    The predicted edges are left-joined with the true edges on their two end
    points; a predicted edge is true when the join finds a match.

    Args:
        edge_indices: predicted edge indices. Row 0 and row 1 hold the two
            end points of each edge.
        true_edge_indices: true edge indices, with the same layout.

    Returns:
        The truth value of each predicted edge, in the order of the columns
        of ``edge_indices``, converted by ``tarray.series_to_tensor``.
    """
    use_cuda = edge_indices.device.type == "cuda"
    np_or_cp = tarray.get_numpy_or_cupy(use_cuda=use_cuda)

    df_edges = tarray.to_dataframe(
        {
            "left_idx": edge_indices[0],
            "right_idx": edge_indices[1],
            # Original position of each edge, used to restore the order below.
            "edge_idx": np_or_cp.arange(edge_indices.shape[1]),
        },
        use_cuda=use_cuda,
    )
    # A true edge listed several times would make the left join emit several
    # rows for the matching predicted edge, so duplicates are removed first.
    df_true_edges = tarray.to_dataframe(
        {
            "left_idx": true_edge_indices[0],
            "right_idx": true_edge_indices[1],
        },
        use_cuda=use_cuda,
    ).drop_duplicates()

    # NOTE(review): the join is not assumed to keep the row order of
    # ``df_edges`` on every dataframe backend (cuDF documents join output
    # order as unspecified - confirm), hence the explicit sort on ``edge_idx``.
    df_merged_edges = df_edges.merge(
        df_true_edges,
        on=["left_idx", "right_idx"],
        how="left",
        indicator=True,
    ).sort_values("edge_idx")

    edge_truths = df_merged_edges["_merge"] == "both"
    return tarray.series_to_tensor(edge_truths)