Source code for pipeline.TrackBuilding.components
import numpy as np
import pandas as pd
from utils.tools import tarray
try:
    import cupy as cp
except ImportError:
    cp = None
try:
    import cudf
except ImportError:
    cudf = None
if cp is not None and cudf is not None:
    CuDfDataFrame = cudf.DataFrame
[docs]    def cure_max_node_idx(
        df_labels: CuDfDataFrame,
        max_node_idx: int,
        node_column: str = "vertex",
        label_column: str = "labels",
    ) -> CuDfDataFrame:
        """Add the missing node indices in the dataframe returned by the cugraph weakly
        connected component algorithm.
        Args:
            df_labels: Dataframe of connected components with columns ``node_column``
                and ``label_column``, which indicate which connected component
                (given by ``label_column``) each node belongs to.
            max_node_idx: maximal node index. The node indices are assumed to range from
                0 to this maximal value included.
            node_column: name of the node index column in ``df_labels``
            label_column: name of the connected component label in ``df_labels``
        Notes:
            The connected component algorithm of ``cugraph`` assumes that the maximal
            node index is connected to an edge. In order for this algorithm to properly
            return the labels of disconnected components, the maximal node index needs
            to be present in edge indices of the graph.
        """
        max_node_idx_in_df_labels = df_labels[node_column].max()  # type: ignore
        if max_node_idx > max_node_idx_in_df_labels:
            max_track_id = df_labels[label_column].max()  # type: ignore
            df_new_labels = cudf.DataFrame(
                {
                    node_column: cp.arange(
                        start=max_node_idx_in_df_labels + 1,
                        stop=max_node_idx + 1,
                        dtype=df_labels[node_column].to_cupy().dtype,  # type: ignore
                    ),
                    label_column: cp.arange(
                        start=max_track_id + 1,
                        stop=max_track_id
                        + 1
                        + (max_node_idx - max_node_idx_in_df_labels),
                        dtype=df_labels[label_column].to_cupy().dtype,  # type: ignore
                    ),
                }
            )
            return cudf.concat((df_labels, df_new_labels), axis=0)
        else:
            return df_labels 
    def _connected_components_gpu(df_edges: CuDfDataFrame, max_node_idx: int):
        """Apply the connected components algorithm on GPU."""
        import cugraph
        graph = cugraph.from_cudf_edgelist(df_edges, renumber=False)
        return cure_max_node_idx(
            df_labels=cugraph.weakly_connected_components(graph),  # type: ignore
            max_node_idx=max_node_idx,
        )
def _connected_components_cpu(df_edges: pd.DataFrame, max_node_idx: int):
    """Apply the connected components algorithm on CPU."""
    if df_edges.shape[0]:
        import scipy.sparse as sps
        sparse_edges = sps.coo_matrix(
            (
                np.ones(df_edges.shape[0]),
                (df_edges["source"].to_numpy(), df_edges["destination"].to_numpy()),
            ),
            (max_node_idx + 1, max_node_idx + 1),
        )
        _, candidate_labels = sps.csgraph.connected_components(
            sparse_edges, directed=False, return_labels=True
        )
        return pd.DataFrame(
            {
                "vertex": np.arange(max_node_idx + 1),
                "labels": candidate_labels,
            }
        )
    else:
        return pd.DataFrame(
            {
                "vertex": [],
                "labels": [],
            }
        )
[docs]def connected_components(
    df_edges: tarray.DataFrame, max_node_idx: int
) -> tarray.DataFrame:
    """Apply the connected component algorithm. If ``df_edges`` is on GPU,
    ``cugraph`` is used. Otherwise, ``scipy`` is used.
    Args:
        df_edges: Dataframe of edges, with columns
            ``hit_idx_left`` and ``hit_idx_right``
        max_node_idx: Maximal node index
    Returns:
        Dataframe of connected components, with columns ``vertex`` and ``labels``.
    """
    if isinstance(df_edges, pd.DataFrame):
        return _connected_components_cpu(df_edges=df_edges, max_node_idx=max_node_idx)
    elif cudf is not None and isinstance(df_edges, cudf.DataFrame):
        return _connected_components_gpu(df_edges=df_edges, max_node_idx=max_node_idx)
    else:
        raise TypeError(
            f"`df_edges` is a {type(df_edges).__name__} instead of a DataFrame."
        )