Source code for pipeline.TrackBuilding.components

import numpy as np
import pandas as pd
from utils.tools import tarray

try:
    import cupy as cp

except ImportError:
    cp = None

try:
    import cudf

except ImportError:
    cudf = None

# Alias used in the signature annotations of the GPU helpers defined in this module.
if cp is not None and cudf is not None:
    CuDfDataFrame = cudf.DataFrame
else:
    # Signature annotations are evaluated when the functions are defined, so the
    # alias must be bound even when the optional GPU packages are missing;
    # otherwise importing this module raises ``NameError``. ``None`` mirrors the
    # placeholders used for ``cp`` and ``cudf`` themselves.
    CuDfDataFrame = None

def cure_max_node_idx(
    df_labels: CuDfDataFrame,
    max_node_idx: int,
    node_column: str = "vertex",
    label_column: str = "labels",
) -> CuDfDataFrame:
    """Append the node indices missing from a connected-component dataframe.

    Every node index strictly above the largest one found in ``df_labels`` and
    up to ``max_node_idx`` (included) is appended as a new row, each with its
    own fresh label numbered right after the largest label already present.

    Args:
        df_labels: Dataframe of connected components with columns
            ``node_column`` and ``label_column``, which indicate which
            connected component (given by ``label_column``) each node
            belongs to.
        max_node_idx: maximal node index. The node indices are assumed to
            range from 0 to this maximal value included.
        node_column: name of the node index column in ``df_labels``
        label_column: name of the connected component label in ``df_labels``

    Returns:
        ``df_labels`` itself when ``max_node_idx`` is not greater than the
        largest node index it contains; otherwise the concatenation of
        ``df_labels`` and the newly created rows.

    Notes:
        The connected component algorithm of ``cugraph`` assumes that the
        maximal node index is connected to an edge. In order for this
        algorithm to properly return the labels of disconnected components,
        the maximal node index needs to be present in edge indices of the
        graph.
    """
    highest_node = df_labels[node_column].max()  # type: ignore

    # Deliberately written as a negated ``>`` (not ``<=``): if the comparison
    # is unordered, the dataframe is still returned unchanged.
    if not (max_node_idx > highest_node):
        return df_labels

    highest_label = df_labels[label_column].max()  # type: ignore
    n_missing = max_node_idx - highest_node
    first_new_label = highest_label + 1

    # New columns reuse the dtypes of the existing ones.
    node_dtype = df_labels[node_column].to_cupy().dtype  # type: ignore
    missing_nodes = cp.arange(
        start=highest_node + 1,
        stop=max_node_idx + 1,
        dtype=node_dtype,
    )
    label_dtype = df_labels[label_column].to_cupy().dtype  # type: ignore
    missing_labels = cp.arange(
        start=first_new_label,
        stop=first_new_label + n_missing,
        dtype=label_dtype,
    )

    df_missing = cudf.DataFrame(
        {
            node_column: missing_nodes,
            label_column: missing_labels,
        }
    )
    return cudf.concat((df_labels, df_missing), axis=0)
def _connected_components_gpu(df_edges: CuDfDataFrame, max_node_idx: int):
    """Label the weakly connected components of an edge list on GPU.

    A ``cugraph`` graph is built from ``df_edges`` with ``renumber=False``, and
    the labels returned by ``cugraph.weakly_connected_components`` are passed
    through ``cure_max_node_idx`` together with ``max_node_idx``.

    Args:
        df_edges: edge list forwarded as-is to ``cugraph.from_cudf_edgelist``.
        max_node_idx: maximal node index.

    Returns:
        The dataframe returned by ``cure_max_node_idx``.
    """
    import cugraph

    edge_graph = cugraph.from_cudf_edgelist(df_edges, renumber=False)
    component_labels = cugraph.weakly_connected_components(edge_graph)  # type: ignore
    return cure_max_node_idx(df_labels=component_labels, max_node_idx=max_node_idx)


def _connected_components_cpu(df_edges: pd.DataFrame, max_node_idx: int):
    """Label the connected components of an edge list on CPU with ``scipy``.

    Args:
        df_edges: edge list; its ``source`` and ``destination`` columns are
            read as node indices.
        max_node_idx: maximal node index; the graph is built over
            ``max_node_idx + 1`` nodes.

    Returns:
        Dataframe with columns ``vertex`` and ``labels``. It has one row per
        node index from 0 to ``max_node_idx`` when ``df_edges`` has at least
        one row, and no row at all when ``df_edges`` is empty.
    """
    n_edges = df_edges.shape[0]
    if not n_edges:
        # No edge at all: an empty result is returned and scipy is not used.
        return pd.DataFrame(
            {
                "vertex": [],
                "labels": [],
            }
        )

    import scipy.sparse as sps

    n_nodes = max_node_idx + 1
    edge_weights = np.ones(n_edges)
    sources = df_edges["source"].to_numpy()
    destinations = df_edges["destination"].to_numpy()

    # Square sparse matrix with one unit entry per edge.
    adjacency = sps.coo_matrix(
        (edge_weights, (sources, destinations)),
        (n_nodes, n_nodes),
    )
    _, candidate_labels = sps.csgraph.connected_components(
        adjacency, directed=False, return_labels=True
    )
    return pd.DataFrame(
        {
            "vertex": np.arange(n_nodes),
            "labels": candidate_labels,
        }
    )
def connected_components(
    df_edges: tarray.DataFrame, max_node_idx: int
) -> tarray.DataFrame:
    """Apply the connected component algorithm.

    If ``df_edges`` is on GPU, ``cugraph`` is used. Otherwise, ``scipy`` is
    used.

    Args:
        df_edges: Dataframe of edges. The CPU implementation reads its
            ``source`` and ``destination`` columns. The GPU implementation
            hands the dataframe to ``cugraph.from_cudf_edgelist`` without
            naming the columns, so presumably the same two names are expected
            there as well (to be confirmed against the ``cugraph`` defaults).
        max_node_idx: Maximal node index

    Returns:
        Dataframe of connected components, with columns ``vertex`` and
        ``labels``.

    Raises:
        TypeError: if ``df_edges`` is neither a ``pandas`` dataframe nor, when
            ``cudf`` could be imported, a ``cudf`` dataframe.
    """
    if isinstance(df_edges, pd.DataFrame):
        return _connected_components_cpu(df_edges=df_edges, max_node_idx=max_node_idx)

    # ``cudf`` is ``None`` when the package is not installed, so it is checked
    # before being used in ``isinstance``.
    if cudf is not None and isinstance(df_edges, cudf.DataFrame):
        return _connected_components_gpu(df_edges=df_edges, max_node_idx=max_node_idx)

    raise TypeError(
        f"`df_edges` is a {type(df_edges).__name__} instead of a DataFrame."
    )