Source code for pipeline.utils.plotutils.performance_mpl
"""Plot general performance metrics using matplotlib only.
"""
from __future__ import annotations
import typing
import os.path as op
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
from matplotlib.figure import Figure
from matplotlib.axes import Axes
from matplotlib.ticker import MaxNLocator
import numpy as np
import pandas as pd
from uncertainties import ufloat, unumpy
import torch
from utils.commonutils.config import get_performance_directory_experiment
from utils.modelutils.metrics import compute_classification_efficiency_purity
from GNN.triplet_gnn_base import TripletGNNBase
from .plotools import save_fig, add_text
from .plotconfig import partition_to_color, partition_to_label
metric_labels = {
    "eff": "Average edge efficiency per event",
    "pur": "Average edge purity per event",
}
metric_colors = {"eff": "purple", "pur": "blue"}
[docs]def get_auto_output_dir(
    path_or_config: str | dict | None = None, step: str | None = None
) -> str | None:
    if step is not None:
        if path_or_config is None:
            raise ValueError("`step` was provided but `path_or_config` was not.")
        else:
            output_dir = op.join(
                get_performance_directory_experiment(path_or_config), step
            )
    else:
        output_dir = None
    return output_dir
[docs]def plot_metric_epochs(
    metric_name: str,
    metrics: pd.DataFrame,
    metric_label: str | None = None,
    ax: Axes | None = None,
    marker: str = ".",
    lhcb: bool = False,
    path_or_config: str | dict | None = None,
    step: str | None = None,
    output_path: str | None = None,
    **kwargs,
) -> typing.Tuple[Figure | None, Axes]:
    """Plot a metric as a function of the epoch number
    Args:
        metric_name: name of the metric to plot in the dataframe of ``metrics``
        metrics: dataframe of metric values computed during training. It must contain
            the two columns ``train_loss`` and ``val_loss``
        name: Name of the step (e.g., ``gnn``, ``embedding``). If not given,
            the plot is not saved.
        metric_label: Label of the metric. Used in the y-axis
        marker: Marker format used in the plot
        ax: Matplotlib Axes to plot on.
        **kwargs: Other arguments passed to :py:func:`matplotlib.axes.Axes.plot`
    Returns:
        Figure and Axes of the plot
    """
    if ax is None:
        fig, ax_ = plt.subplots(figsize=(8, 6))
    else:
        fig = None
        ax_ = ax
    ax_.plot(metrics["epoch"], metrics[metric_name], marker=marker, **kwargs)
    ax_.set_xlabel("Sub-epoch")
    ax_.set_ylabel(metric_name if metric_label is None else metric_label)
    ax_.xaxis.set_major_locator(MaxNLocator(integer=True))
    ax_.grid(color="grey", alpha=0.5)
    if lhcb:
        add_text(ax=ax_, ha="right", y=0.7)
    if fig is not None:
        if output_path is None:
            output_dir = get_auto_output_dir(path_or_config=path_or_config, step=step)
            output_path = (
                op.join(output_dir, metric_name) if output_dir is not None else None
            )
        if output_path is not None:
            save_fig(fig, output_path)
    return fig, ax_
[docs]def plot_loss(
    metrics,
    output_path: str | None = None,
    path_or_config: str | dict | None = None,
    step: str | None = None,
    lhcb: bool = False,
) -> typing.Tuple[Figure, Axes]:
    """Plot the training and validation loss on the same plot.
    Args:
        metrics: dataframe of metric values computed during training. It must contain
            the two columns ``train_loss`` and ``val_loss``
        path_or_config: pipeline configuration
        step: Name of the step (e.g., ``gnn``, ``embedding``). If not given,
            and ``output_path`` is not given, the plot is not saved
        lhcb: whether to add "LHCb Simulation" to the plot.
    Returns:
        Figure and Axes of the plot
    """
    fig, ax = plt.subplots(figsize=(8, 6))
    for partition in ["train", "val"]:
        plot_metric_epochs(
            metric_name=f"{partition}_loss",
            metric_label="Loss",
            metrics=metrics,
            color=partition_to_color[partition],
            label=partition_to_label[partition],
            ax=ax,
        )
    ax.legend()
    ax.grid(color="grey", alpha=0.5)
    if lhcb:
        add_text(ax, ha="right", y=0.7)
    if output_path is None:
        output_dir = get_auto_output_dir(path_or_config=path_or_config, step=step)
        output_path = op.join(output_dir, "loss") if output_dir is not None else None
    if output_path is not None:
        save_fig(fig=fig, path=op.join(output_path))
    return fig, ax
[docs]def plot_edge_performance(
    model: TripletGNNBase,
    edge_score_cuts: typing.Sequence[float],
    path_or_config: str | dict | None = None,
    identifier: str | None = None,
    max_n_events: int | None = None,
    output_path: str | None = None,
    lhcb: bool = False,
    edge_partitions: typing.List[str] | None = None,
):
    """Plot the triplet selection efficiency and purity.
    Args:
        edge_score_cuts: Sequence of miminal edge scores to try
        path_or_config: Configuration of the current pipeline
        identifier: string to put in the name of the file
        max_nevents: Maximal number of events to use for this evaluation
        output_path: path where to save the plot
        lhcb: whether to add the ``LHCb Simulation`` text
    Returns:
        Dictionary of pandas dataframes that provides the efficiency and purity
        as a function of the edge score cut.
    """
    if identifier is None:
        identifier = ""
    if edge_partitions is None:
        edge_partitions = ["__all__"]
    results = {partition: {"eff": [], "pur": []} for partition in edge_partitions}
    assert model.testset is not None
    testset = model.testset if max_n_events is None else model.testset[:max_n_events]
    efficiencies = {
        partition: np.zeros(shape=(len(testset), len(edge_score_cuts)))
        for partition in edge_partitions
    }
    purities = {
        partition: np.zeros(shape=(len(testset), len(edge_score_cuts)))
        for partition in edge_partitions
    }
    for data_idx, test_data in enumerate(tqdm(testset)):
        with torch.no_grad():
            outputs = model.inference(
                batch=test_data.to(device=model.device), with_triplets=False  # type: ignore
            )
            score = torch.sigmoid(outputs["edge_output"])
        for cut_idx, cut in enumerate(edge_score_cuts):
            preds = score > cut
            for partition in edge_partitions:
                if partition == "__all__":
                    preds_part = preds
                    truths_part = test_data.y
                else:
                    partition_mask = outputs["edge_partitions"][partition]
                    preds_part = preds[partition_mask]
                    truths_part = test_data.y[partition_mask]
                eff, pur = compute_classification_efficiency_purity(
                    predictions=preds_part, truths=truths_part
                )
                efficiencies[partition][data_idx, cut_idx] = eff
                purities[partition][data_idx, cut_idx] = pur
    for partition in edge_partitions:
        for cut_idx, cut in enumerate(edge_score_cuts):
            efficiencies_cut = efficiencies[partition][:, cut_idx]
            purities_cut = purities[partition][:, cut_idx]
            results[partition]["eff"].append(
                ufloat(efficiencies_cut.mean(), efficiencies_cut.std())
            )
            results[partition]["pur"].append(
                ufloat(purities_cut.mean(), purities_cut.std())
            )
    results = {
        partition: pd.DataFrame({"score_cut": edge_score_cuts, **results[partition]})
        for partition in edge_partitions
    }
    fig, ax = plt.subplots(figsize=(8, 6))
    ax.set_xlabel(r"$s_{\text{edge, min}}$")
    ax.set_ylabel("Value")
    for metric_name in ["eff", "pur"]:
        for partition in edge_partitions:
            label_suffix = "" if partition == "__all__" else f": {partition}"
            ax.errorbar(
                x=results[partition]["score_cut"],
                y=unumpy.nominal_values(results[partition][metric_name]),
                yerr=unumpy.std_devs(results[partition][metric_name]),
                label=metric_labels[metric_name] + label_suffix,
                color=metric_colors[metric_name] if partition == "__all__" else None,
                alpha=1.0 if partition == "__all__" else 0.6,
            )
        ax.grid(color="grey", alpha=0.5, which="both", axis="both")
    ax.legend(loc="lower right")
    if lhcb:
        add_text(ax=ax, ha="right", y=0.3)
    if output_path is None:
        assert path_or_config is not None
        output_path = op.join(
            get_performance_directory_experiment(path_or_config),
            "gnn",
            f"edge_performance{identifier}",
        )
    save_fig(fig, output_path)
    return results
[docs]def plot_triplet_performance(
    model,
    edge_score_cut: float,
    triplet_score_cuts: typing.Sequence[float],
    path_or_config: str | dict | None = None,
    identifier: str | None = None,
    max_n_events: int | None = None,
    output_path: str | None = None,
    lhcb: bool = False,
):
    """Plot the triplet selection efficiency and purity.
    Args:
        edge_score_cut: minimal edge score to require to build the triplets
        triplet_score_cuts: Sequence of miminal triplet scores to try
        path_or_config: Configuration of the current pipeline
        identifier: string to put in the name of the file
        max_nevents: Maximal number of events to use for this evaluation
        output_path: Path where to save the plot
        lhcb: whether to add the ``LHCb Simulation`` text
    Returns:
        Dictionary of pandas dataframes that provides the efficiency and purity
        as a function of the triplet score cut.
    """
    if identifier is None:
        identifier = ""
    model.hparams["edge_score_cut"] = edge_score_cut
    testset = model.testset if max_n_events is None else model.testset[:max_n_events]
    triplet_names = ["articulation", "elbow_left", "elbow_right"]
    dict_efficiencies = {
        triplet_name: np.zeros(shape=(len(testset), len(triplet_score_cuts)))
        for triplet_name in triplet_names
    }
    dict_purities = {
        triplet_name: np.zeros(shape=(len(testset), len(triplet_score_cuts)))
        for triplet_name in triplet_names
    }
    for data_idx, test_data in enumerate(tqdm(testset)):
        with torch.no_grad():
            test_data = test_data.to(model.device)
            outputs = model.inference(
                batch=test_data,
                with_triplets=True,
                edge_score_cut=edge_score_cut,
                with_triplet_truths=True,
            )
            triplet_truths = outputs["triplet_truths"]
            triplet_scores = (
                {
                    triplet_name: torch.sigmoid(triplet_output)
                    for triplet_name, triplet_output in outputs[
                        "triplet_outputs"
                    ].items()
                }
                if "triplet_scores" not in outputs
                else outputs["triplet_scores"]
            )
        for triplet_name in triplet_names:
            for cut_idx, cut in enumerate(triplet_score_cuts):
                eff, pur = compute_classification_efficiency_purity(
                    predictions=triplet_scores[triplet_name] > cut,
                    truths=triplet_truths[triplet_name],
                )
                dict_efficiencies[triplet_name][data_idx, cut_idx] = eff
                dict_purities[triplet_name][data_idx, cut_idx] = pur
    dict_results = {}
    for triplet_name in triplet_names:
        triplet_results = {
            "eff": [],
            "pur": [],
            "triplet_score_cut": triplet_score_cuts,
        }
        for cut_idx, cut in enumerate(triplet_score_cuts):
            efficiencies_cut = dict_efficiencies[triplet_name][:, cut_idx]
            purities_cut = dict_purities[triplet_name][:, cut_idx]
            triplet_results["eff"].append(
                ufloat(efficiencies_cut.mean(), efficiencies_cut.std())
            )
            triplet_results["pur"].append(
                ufloat(purities_cut.mean(), purities_cut.std())
            )
        dict_results[triplet_name] = pd.DataFrame(triplet_results)
    for triplet_name, results in dict_results.items():
        fig, ax = plt.subplots(figsize=(8, 6))
        ax.set_xlabel(r"$s_{\text{triplet, min}}$")
        ax.set_ylabel("Value")
        for metric_name in ["eff", "pur"]:
            ax.errorbar(
                x=results["triplet_score_cut"],
                y=unumpy.nominal_values(results[metric_name]),
                yerr=unumpy.std_devs(results[metric_name]),
                label=metric_labels[metric_name],
                color=metric_colors[metric_name],
            )
            ax.grid(color="grey", alpha=0.5, which="both", axis="both")
        ax.legend()
        if lhcb:
            add_text(ax, ha="left", y=0.3)
        if output_path is None:
            assert path_or_config is not None
            output_path_ = op.join(
                get_performance_directory_experiment(path_or_config),
                "gnn",
                f"triplet_performance{identifier}_{triplet_name}.png",
            )
        else:
            output_path_ = output_path.format(triplet_name=triplet_name)
        save_fig(fig, output_path_)
    return dict_results
