Source code for pipeline.TrackBuilding.components
import numpy as np
import pandas as pd
from utils.tools import tarray
try:
import cupy as cp
except ImportError:
cp = None
try:
import cudf
except ImportError:
cudf = None
if cp is not None and cudf is not None:
CuDfDataFrame = cudf.DataFrame
def cure_max_node_idx(
    df_labels: "CuDfDataFrame",
    max_node_idx: int,
    node_column: str = "vertex",
    label_column: str = "labels",
) -> "CuDfDataFrame":
    """Add the missing node indices in the dataframe returned by the cugraph
    weakly connected component algorithm.

    Every node index above the largest one found in ``df_labels`` and up to
    ``max_node_idx`` (included) is appended as a row of its own, with a fresh
    label that is not used by any existing row.

    Args:
        df_labels: Dataframe of connected components with columns ``node_column``
            and ``label_column``, which indicate which connected component
            (given by ``label_column``) each node belongs to.
        max_node_idx: maximal node index. The node indices are assumed to range
            from 0 to this maximal value included.
        node_column: name of the node index column in ``df_labels``
        label_column: name of the connected component label in ``df_labels``

    Returns:
        ``df_labels`` itself when no node index is missing, otherwise a new
        dataframe made of ``df_labels`` followed by the added rows.

    Notes:
        The connected component algorithm of ``cugraph`` assumes that the maximal
        node index is connected to an edge. In order for this algorithm to
        properly return the labels of disconnected components, the maximal node
        index needs to be present in edge indices of the graph.

        The annotations are quoted because ``CuDfDataFrame`` is only defined
        when both ``cupy`` and ``cudf`` could be imported.
    """
    # The maximum of an empty column is not a usable integer, so an empty
    # dataframe is handled explicitly: every node from 0 upwards is missing.
    is_empty = len(df_labels) == 0

    highest_node_idx = -1 if is_empty else int(df_labels[node_column].max())
    n_missing = max_node_idx - highest_node_idx
    if n_missing <= 0:
        return df_labels

    # Only computed when rows actually have to be appended.
    highest_label = -1 if is_empty else int(df_labels[label_column].max())

    df_new_labels = cudf.DataFrame(
        {
            node_column: cp.arange(
                start=highest_node_idx + 1,
                stop=max_node_idx + 1,
                dtype=df_labels[node_column].to_cupy().dtype,  # type: ignore
            ),
            # One new label per added node, starting right after the largest
            # label already in use.
            label_column: cp.arange(
                start=highest_label + 1,
                stop=highest_label + 1 + n_missing,
                dtype=df_labels[label_column].to_cupy().dtype,  # type: ignore
            ),
        }
    )
    return cudf.concat((df_labels, df_new_labels), axis=0)
def _connected_components_gpu(df_edges: CuDfDataFrame, max_node_idx: int):
    """Apply the connected components algorithm on GPU.

    Args:
        df_edges: Dataframe of edges, passed as-is to
            ``cugraph.from_cudf_edgelist``.
        max_node_idx: Maximal node index, forwarded to ``cure_max_node_idx``.

    Returns:
        The output of ``cugraph.weakly_connected_components`` after it went
        through ``cure_max_node_idx``.
    """
    # Imported lazily so that ``cugraph`` is only required on the GPU path.
    import cugraph

    edge_graph = cugraph.from_cudf_edgelist(df_edges, renumber=False)
    component_labels = cugraph.weakly_connected_components(edge_graph)  # type: ignore
    return cure_max_node_idx(df_labels=component_labels, max_node_idx=max_node_idx)
def _connected_components_cpu(df_edges: pd.DataFrame, max_node_idx: int):
    """Apply the connected components algorithm on CPU.

    The graph has ``max_node_idx + 1`` nodes, indexed from 0 to ``max_node_idx``
    included, and is treated as undirected.

    Args:
        df_edges: Dataframe of edges, with columns ``source`` and
            ``destination`` holding node indices.
        max_node_idx: Maximal node index.

    Returns:
        Dataframe with one row per node and columns ``vertex`` (the node index)
        and ``labels`` (the connected component the node belongs to). Nodes that
        appear in no edge form a component of their own, which also holds when
        ``df_edges`` has no rows at all.
    """
    n_nodes = max_node_idx + 1

    if df_edges.shape[0] == 0:
        # Without edges every node is its own component. scipy is bypassed
        # because the columns of an empty dataframe may not have an integer
        # dtype usable as sparse-matrix indices.
        return pd.DataFrame(
            {
                "vertex": np.arange(n_nodes),
                # int32 to match the label dtype produced by scipy.
                "labels": np.arange(n_nodes, dtype=np.int32),
            }
        )

    # Imported lazily, and from the submodule directly rather than relying on
    # ``scipy.sparse`` exposing ``csgraph`` as an attribute.
    from scipy.sparse import coo_matrix
    from scipy.sparse.csgraph import connected_components as scipy_components

    # Adjacency matrix in COO format, one unit entry per edge.
    sparse_edges = coo_matrix(
        (
            np.ones(df_edges.shape[0]),
            (df_edges["source"].to_numpy(), df_edges["destination"].to_numpy()),
        ),
        (n_nodes, n_nodes),
    )
    _, candidate_labels = scipy_components(
        sparse_edges, directed=False, return_labels=True
    )
    return pd.DataFrame(
        {
            "vertex": np.arange(n_nodes),
            "labels": candidate_labels,
        }
    )
def connected_components(
    df_edges: tarray.DataFrame, max_node_idx: int
) -> tarray.DataFrame:
    """Apply the connected component algorithm on CPU or GPU.

    The backend is picked from the type of ``df_edges``: a pandas dataframe is
    handled by the CPU implementation, a ``cudf`` dataframe (when ``cudf`` could
    be imported) by the GPU implementation.

    Args:
        df_edges: Dataframe of edges. The CPU implementation reads the columns
            ``source`` and ``destination``.
        max_node_idx: Maximal node index

    Returns:
        Dataframe of connected components, with columns ``vertex`` and ``labels``.

    Raises:
        TypeError: if ``df_edges`` is neither a pandas nor a ``cudf`` dataframe.
    """
    if isinstance(df_edges, pd.DataFrame):
        return _connected_components_cpu(df_edges=df_edges, max_node_idx=max_node_idx)

    # ``cudf`` is None when the optional import failed; nothing can be a cudf
    # dataframe in that case.
    is_gpu_frame = cudf is not None and isinstance(df_edges, cudf.DataFrame)
    if not is_gpu_frame:
        raise TypeError(
            f"`df_edges` is a {type(df_edges).__name__} instead of a DataFrame."
        )

    return _connected_components_gpu(df_edges=df_edges, max_node_idx=max_node_idx)