Skip to content

Metrics

online_cp.metrics.Metric

Base class for online evaluation metrics.

Subclasses must implement _score(self, y, Gamma, **kw) which returns a single scalar for one observation.

Source code in src/online_cp/metrics.py
class Metric:
    """Shared machinery for metrics evaluated one observation at a time.

    A concrete metric only needs to provide ``_score(self, y, Gamma, **kw)``,
    which returns a single scalar for one observation. History keeping,
    the running mean and composition via ``+`` are implemented here.
    """

    def __init__(self) -> None:
        self._values: list[float] = []  # per-observation scores, in arrival order
        self._sum = 0.0  # running total of all recorded scores
        self._n = 0  # number of recorded observations

    @property
    def name(self) -> str:
        """Name of the metric, taken from its class name."""
        return self.__class__.__name__

    def update(self, y: Any = None, Gamma: Any = None, **kw: Any) -> float:
        """Score one observation and add it to the history.

        Parameters
        ----------
        y : scalar
            True label / response.
        Gamma : ConformalPredictionSet or ConformalPredictionInterval
            Prediction output from a conformal predictor.
        **kw : dict
            Extra keyword arguments (p_values, cpd, epsilon, etc.) that are
            forwarded unchanged to ``_score``; each metric picks what it needs.

        Returns
        -------
        float
            The metric value for this observation.
        """
        score = self._score(y=y, Gamma=Gamma, **kw)
        self._values.append(score)
        self._sum += score
        self._n += 1
        return score

    def _score(self, y, Gamma, **kw):
        """Compute the scalar score for one observation (subclass hook)."""
        raise NotImplementedError

    def get(self) -> float:
        """Return the running mean, or 0.0 when nothing has been recorded."""
        return self._sum / self._n if self._n else 0.0

    @property
    def values(self) -> NDArray[np.floating[Any]]:
        """The recorded per-step scores as a numpy array."""
        history = self._values
        return np.asarray(history)

    def cumulative_mean(self) -> NDArray[np.floating[Any]]:
        """Running mean after each step (one entry per recorded score)."""
        step_numbers = np.arange(1, self._n + 1)
        return np.cumsum(self._values) / step_numbers

    def reset(self) -> None:
        """Discard the history and return to the freshly constructed state."""
        self._values = []
        self._sum = 0.0
        self._n = 0

    def __repr__(self) -> str:
        return "{}: {:.4f}".format(self.name, self.get())

    def __add__(self, other: Metric | Metrics) -> Metrics:
        # Adding to a composite prepends this metric; adding two single
        # metrics starts a new composite.
        if isinstance(other, Metrics):
            members = [self, *other._metrics]
        elif isinstance(other, Metric):
            members = [self, other]
        else:
            return NotImplemented
        return Metrics(members)

    def __radd__(self, other):
        # ``0 + metric`` is the first step of the built-in sum().
        return self if other == 0 else NotImplemented

values: NDArray[np.floating[Any]] property

Per-step history as a numpy array.

update(y: Any = None, Gamma: Any = None, **kw: Any) -> float

Record one observation.

Parameters:

Name Type Description Default
y scalar

True label / response.

None
Gamma ConformalPredictionSet or ConformalPredictionInterval

Prediction output from a conformal predictor.

None
**kw dict

Additional keyword arguments (p_values, cpd, epsilon, etc.). Each metric picks what it needs.

{}

Returns:

Type Description
float

The metric value for this observation.

Source code in src/online_cp/metrics.py
def update(self, y: Any = None, Gamma: Any = None, **kw: Any) -> float:
    """Score one observation and add it to the history.

    Parameters
    ----------
    y : scalar
        True label / response.
    Gamma : ConformalPredictionSet or ConformalPredictionInterval
        Prediction output from a conformal predictor.
    **kw : dict
        Extra keyword arguments (p_values, cpd, epsilon, etc.) that are
        forwarded unchanged to ``_score``; each metric picks what it needs.

    Returns
    -------
    float
        The metric value for this observation.
    """
    score = self._score(y=y, Gamma=Gamma, **kw)
    self._values.append(score)
    self._sum += score
    self._n += 1
    return score

get() -> float

Return the running mean of the metric.

Source code in src/online_cp/metrics.py
def get(self) -> float:
    """Return the running mean, or 0.0 when nothing has been recorded."""
    return self._sum / self._n if self._n else 0.0

cumulative_mean() -> NDArray[np.floating[Any]]

Cumulative running mean at each step.

Source code in src/online_cp/metrics.py
def cumulative_mean(self) -> NDArray[np.floating[Any]]:
    """Running mean after each step (one entry per recorded score)."""
    step_numbers = np.arange(1, self._n + 1)
    return np.cumsum(self._values) / step_numbers

reset() -> None

Reset the metric to its initial state.

Source code in src/online_cp/metrics.py
def reset(self) -> None:
    """Discard the history and return to the freshly constructed state."""
    self._values, self._sum, self._n = [], 0.0, 0

online_cp.metrics.Metrics

Composite of multiple metrics, created via the + operator.

Example

metric = ErrorRate() + IntervalWidth()
metric.update(y=1.0, Gamma=interval)

Source code in src/online_cp/metrics.py
class Metrics:
    """Composite of multiple metrics, created via the ``+`` operator.

    The composite forwards ``update`` and ``reset`` to every contained
    metric and reports their running means keyed by metric name.

    Example
    -------
    >>> metric = ErrorRate() + IntervalWidth()
    >>> metric.update(y=1.0, Gamma=interval)
    """

    def __init__(self, metrics: list[Metric]) -> None:
        # Copy so later changes to the caller's list do not affect us.
        self._metrics = list(metrics)

    def update(self, y: Any = None, Gamma: Any = None, **kw: Any) -> None:
        """Update all contained metrics with the same observation."""
        for m in self._metrics:
            m.update(y=y, Gamma=Gamma, **kw)

    def get(self) -> dict[str, float]:
        """Return a dict of {name: running_mean} for all metrics.

        Metrics that share a name occupy a single key; the one added last
        wins.
        """
        return {m.name: m.get() for m in self._metrics}

    def reset(self) -> None:
        """Reset all metrics."""
        for m in self._metrics:
            m.reset()

    def __repr__(self) -> str:
        return "\n".join(repr(m) for m in self._metrics)

    def __add__(self, other: Metrics | Metric) -> Metrics:
        if isinstance(other, Metrics):
            return Metrics(self._metrics + other._metrics)
        if isinstance(other, Metric):
            return Metrics(self._metrics + [other])
        return NotImplemented

    def __radd__(self, other):
        # The built-in sum() starts from the int 0. Accepting ``0 + self``
        # lets sum() combine composites the same way it combines single
        # metrics.
        if other == 0:
            return self
        return NotImplemented

    def __getitem__(self, key):
        """Access a metric by name or index.

        Raises
        ------
        KeyError
            If ``key`` is not an int and no contained metric has that name.
        """
        if isinstance(key, int):
            return self._metrics[key]
        for m in self._metrics:
            if m.name == key:
                return m
        raise KeyError(f"Metric '{key}' not found")

    def __iter__(self):
        """Iterate over the contained metrics in insertion order."""
        return iter(self._metrics)

    def __len__(self):
        """Number of contained metrics."""
        return len(self._metrics)

update(y: Any = None, Gamma: Any = None, **kw: Any) -> None

Update all contained metrics.

Source code in src/online_cp/metrics.py
def update(self, y: Any = None, Gamma: Any = None, **kw: Any) -> None:
    """Forward one observation to every contained metric, in order."""
    for member in self._metrics:
        member.update(y=y, Gamma=Gamma, **kw)

get() -> dict[str, float]

Return a dict of {name: running_mean} for all metrics.

Source code in src/online_cp/metrics.py
def get(self) -> dict[str, float]:
    """Collect the running mean of every contained metric, keyed by name."""
    means = {}
    for member in self._metrics:
        key = member.name
        means[key] = member.get()
    return means

reset() -> None

Reset all metrics.

Source code in src/online_cp/metrics.py
def reset(self) -> None:
    """Call ``reset()`` on every contained metric, in order."""
    for member in self._metrics:
        member.reset()

__getitem__(key)

Access a metric by name or index.

Source code in src/online_cp/metrics.py
def __getitem__(self, key):
    """Look up a contained metric by list position (int) or by its name."""
    if not isinstance(key, int):
        # Name lookup: the first metric whose name matches wins.
        for candidate in self._metrics:
            if candidate.name == key:
                return candidate
        raise KeyError(f"Metric '{key}' not found")
    return self._metrics[key]

online_cp.metrics.ErrorRate

Bases: Metric

Fraction of times the true label falls outside the prediction set.

Works for both classifiers (prediction sets) and regressors (intervals).

Source code in src/online_cp/metrics.py
class ErrorRate(Metric):
    """Miss indicator: 1.0 when the true label is not in the prediction, else 0.0.

    Averaged over a stream, this is the fraction of observations whose true
    label fell outside the prediction set. Only the ``in`` operator is used
    on ``Gamma``, so it works for both classifiers (prediction sets) and
    regressors (intervals).
    """

    def _score(self, y, Gamma, **kw):
        covered = y in Gamma
        return 0.0 if covered else 1.0

online_cp.metrics.ObservedExcess

Bases: Metric

Number of incorrect labels in the prediction set (OE).

For classifiers: |Gamma| - 1 if y in Gamma, else |Gamma|. A conditionally proper efficiency criterion.

Source code in src/online_cp/metrics.py
class ObservedExcess(Metric):
    """Observed excess (OE): how many incorrect labels the prediction set holds.

    For classifiers this is |Gamma| - 1 when y is in Gamma and |Gamma|
    otherwise. A conditionally proper efficiency criterion.
    """

    def _score(self, y, Gamma, **kw):
        # The true label, if present, is the only member that is not an error.
        contains_truth = y in Gamma
        size = len(Gamma)
        return float(size - 1) if contains_truth else float(size)

online_cp.metrics.ObservedFuzziness

Bases: Metric

Sum of p-values for incorrect labels (OF).

Requires p_values keyword argument (dict: label -> p-value). A conditionally proper efficiency criterion independent of epsilon.

Source code in src/online_cp/metrics.py
class ObservedFuzziness(Metric):
    """Observed fuzziness (OF): total p-value mass on the incorrect labels.

    Needs the ``p_values`` keyword argument, a dict mapping label -> p-value.
    A conditionally proper efficiency criterion independent of epsilon.
    """

    def _score(self, y, Gamma=None, *, p_values=None, **kw):
        if p_values is None:
            raise ValueError("ObservedFuzziness requires p_values keyword argument")
        total = 0
        for label, p_value in p_values.items():
            if label != y:
                total += p_value
        return float(total)

online_cp.metrics.SetSize

Bases: Metric

Size of the prediction set (for classifiers).

Source code in src/online_cp/metrics.py
class SetSize(Metric):
    """Number of labels in the prediction set (for classifiers)."""

    def _score(self, y, Gamma, **kw):
        n_labels = len(Gamma)
        return float(n_labels)

online_cp.metrics.IntervalWidth

Bases: Metric

Width of the prediction interval (for regressors).

Source code in src/online_cp/metrics.py
class IntervalWidth(Metric):
    """Width of the prediction interval, as reported by ``Gamma.width()`` (for regressors)."""

    def _score(self, y, Gamma, **kw):
        interval_width = Gamma.width()
        return float(interval_width)

online_cp.metrics.WinklerScore

Bases: Metric

Winkler interval score — a proper scoring rule for interval forecasts.

Requires the prediction interval to have .lower and .upper attributes, and epsilon to be provided.

Source code in src/online_cp/metrics.py
class WinklerScore(Metric):
    """Winkler interval score — a proper scoring rule for interval forecasts.

    The score is the interval width, plus ``(2 / epsilon)`` times the
    distance by which ``y`` falls outside the interval when it is not
    covered. An interval with a non-finite bound scores ``inf``.

    Requires the prediction interval to have ``.lower`` and ``.upper``
    attributes. ``epsilon`` is taken from the ``epsilon`` keyword argument
    if given, otherwise from ``Gamma.epsilon`` if that attribute exists,
    otherwise it defaults to 0.1.
    """

    def _score(self, y, Gamma, *, epsilon=None, **kw):
        if epsilon is None:
            epsilon = getattr(Gamma, "epsilon", 0.1)
        lower = Gamma.lower
        upper = Gamma.upper
        if not np.isfinite(lower) or not np.isfinite(upper):
            return np.inf
        width = upper - lower
        # Distance by which y misses the interval; stays 0 when y is covered.
        if y < lower:
            miss = lower - y
        elif y > upper:
            miss = y - upper
        else:
            # Coerce to a built-in float, as the other metrics do, so the
            # running sum does not inherit the dtype of the interval bounds.
            return float(width)
        return float(width + (2.0 / epsilon) * miss)

online_cp.metrics.CRPS

Bases: Metric

Continuous Ranked Probability Score for conformal predictive distributions.

Deprecated: this class delegates to TruncatedCRPS. Prefer using TruncatedCRPS or ConformalCRPS explicitly.

Requires cpd keyword argument (a conformal predictive distribution object).

Source code in src/online_cp/metrics.py
class CRPS(Metric):
    """Continuous Ranked Probability Score for conformal predictive distributions.

    .. deprecated::
        Kept as a thin wrapper that forwards every call to
        :class:`TruncatedCRPS`. Prefer using ``TruncatedCRPS`` or
        ``ConformalCRPS`` explicitly.

    Requires ``cpd`` keyword argument (a conformal predictive distribution object).
    """

    def _score(self, y, Gamma=None, *, cpd=None, **kw):
        import warnings

        message = "CRPS is deprecated. Use TruncatedCRPS or ConformalCRPS instead."
        warnings.warn(message, DeprecationWarning, stacklevel=4)
        # A fresh TruncatedCRPS instance does the actual scoring.
        delegate = TruncatedCRPS()
        return delegate._score(y, Gamma, cpd=cpd, **kw)

Venn Prediction Metrics

online_cp.metrics.BrierScore

Bases: Metric

Brier score for Venn predictor outputs.

Evaluates the aggregated point probability from a VennPrediction using the standard Brier score: $(p_{\text{point}} - \mathbf{1}\{y = k\})^2$ summed over all labels $k$.

Requires venn keyword argument (a VennPrediction object).

Source code in src/online_cp/metrics.py
class BrierScore(Metric):
    """Brier score for Venn predictor outputs.

    Evaluates the aggregated point probability from a ``VennPrediction``
    using the standard Brier score: :math:`(p_{\\text{point}} - \\mathbf{1}\\{y = k\\})^2`
    summed over all labels.

    Requires ``venn`` keyword argument (a ``VennPrediction`` object).

    Raises
    ------
    ValueError
        If ``venn`` is missing, or if ``y`` is not one of the labels in
        ``venn.label_space``.
    """

    def _score(self, y, Gamma=None, *, venn=None, **kw):
        if venn is None:
            raise ValueError("BrierScore requires venn keyword argument")
        # Locate y by equality. np.searchsorted only returns an insertion
        # point, so a label missing from label_space would be silently
        # scored as a neighbouring class or index out of range.
        matches = np.flatnonzero(np.asarray(venn.label_space) == y)
        if matches.size == 0:
            raise ValueError(f"Label {y!r} not found in venn.label_space")
        # One-hot vector for the true label, one entry per label.
        indicator = np.zeros(len(venn.label_space))
        indicator[matches[0]] = 1.0
        return float(np.sum((venn.point - indicator) ** 2))

online_cp.metrics.LogLoss

Bases: Metric

Log loss for Venn predictor outputs.

Evaluates the aggregated point probability from a VennPrediction using negative log-likelihood: $-\log(p_{\text{point}}[y])$.

Requires venn keyword argument (a VennPrediction object).

Source code in src/online_cp/metrics.py
class LogLoss(Metric):
    """Log loss for Venn predictor outputs.

    Evaluates the aggregated point probability from a ``VennPrediction``
    using negative log-likelihood: :math:`-\\log(p_{\\text{point}}[y])`.
    The probability is clipped to ``[_EPS, 1 - _EPS]`` before taking the
    logarithm.

    Requires ``venn`` keyword argument (a ``VennPrediction`` object).

    Raises
    ------
    ValueError
        If ``venn`` is missing, or if ``y`` is not one of the labels in
        ``venn.label_space``.
    """

    _EPS = 1e-15  # clip to avoid log(0)

    def _score(self, y, Gamma=None, *, venn=None, **kw):
        if venn is None:
            raise ValueError("LogLoss requires venn keyword argument")
        # Locate y by equality. np.searchsorted only returns an insertion
        # point, so a label missing from label_space would be silently
        # scored as a neighbouring class or index out of range.
        matches = np.flatnonzero(np.asarray(venn.label_space) == y)
        if matches.size == 0:
            raise ValueError(f"Label {y!r} not found in venn.label_space")
        prob_y = np.clip(venn.point[matches[0]], self._EPS, 1.0 - self._EPS)
        return float(-np.log(prob_y))

online_cp.metrics.Width

Bases: Metric

Width (sharpness) of a Venn multiprobability prediction.

For binary predictions: $p_1 - p_0$. For multiclass: mean over labels of (max − min) probability across hypotheses.

Requires venn keyword argument (a VennPrediction object).

Source code in src/online_cp/metrics.py
class Width(Metric):
    """Width (sharpness) of a Venn multiprobability prediction.

    For binary predictions: :math:`p_1 - p_0`.
    For multiclass: the (max − min) probability across hypotheses is
    computed for each label and then averaged over the labels.

    Requires ``venn`` keyword argument (a ``VennPrediction`` object).
    """

    def _score(self, y, Gamma=None, *, venn=None, **kw):
        if venn is None:
            raise ValueError("Width requires venn keyword argument")
        # venn.probs: one row per hypothesis, one column per label.
        table = venn.probs
        per_label_spread = table.max(axis=0) - table.min(axis=0)
        return float(per_label_spread.mean())

online_cp.metrics.CalibrationError

Bases: Metric

Expected Calibration Error (ECE) for Venn predictor outputs.

Accumulates (predicted probability, true indicator) pairs from a stream of VennPrediction objects, enabling post-hoc ECE computation via binning.

Two modes:

  • use_hypothesis=False (default): evaluates the point estimate from venn.point. This is the aggregated probability and is typically well-calibrated empirically.
  • use_hypothesis=True: evaluates the correct-hypothesis probability $P^y(y)$, which is theoretically calibrated by the Venn validity guarantee (ALRW2 Theorem 6.4).

The per-step _score() returns $|p - \mathbf{1}\{y = k\}|$ (absolute calibration gap), so metric.get() gives the running mean absolute error. Use the ece() method for the standard binned ECE.

For binary classification, the predicted probability is $P(y=1)$. For multiclass, the listed implementation tracks a single class one-vs-rest: the label at index 1 of label_space is treated as the positive class.

Requires venn keyword argument (a VennPrediction object).

Parameters:

Name Type Description Default
use_hypothesis bool

If True, use the correct-hypothesis probability $P^y(y)$ instead of the point estimate.

False
max_history int or None

Maximum number of (predicted, observed) pairs to store. If None, stores all. When exceeded, oldest pairs are discarded.

None
Source code in src/online_cp/metrics.py
class CalibrationError(Metric):
    """Expected Calibration Error (ECE) for Venn predictor outputs.

    Accumulates (predicted probability, true indicator) pairs from a
    stream of ``VennPrediction`` objects, enabling post-hoc ECE
    computation via binning.

    Two modes:

    - ``use_hypothesis=False`` (default): evaluates the *point estimate*
      from ``venn.point``. This is the aggregated probability and is
      typically well-calibrated empirically.
    - ``use_hypothesis=True``: evaluates the correct-hypothesis probability
      :math:`P^y(y)`, which is *theoretically calibrated* by the Venn
      validity guarantee (ALRW2 Theorem 6.4).

    The per-step ``_score()`` returns :math:`|p - \\mathbf{1}\\{y = k\\}|`
    (absolute calibration gap), so ``metric.get()`` gives the running mean
    absolute error. Use :meth:`ece` for the standard binned ECE.

    The tracked "positive" class ``k`` is the label at index 1 of
    ``venn.label_space`` (index 0 when the label space has a single label).
    For binary classification the predicted probability is therefore
    :math:`P(y=1)`. For multiclass only that one class is tracked
    (one-vs-rest); other classes have to be evaluated externally.

    Requires ``venn`` keyword argument (a ``VennPrediction`` object).

    Parameters
    ----------
    use_hypothesis : bool, default False
        If True, use the correct-hypothesis probability :math:`P^y(y)`
        instead of the point estimate.
    max_history : int or None, default None
        Maximum number of (predicted, observed) pairs to store.
        If None, stores all. When exceeded, oldest pairs are discarded.
    """

    def __init__(self, use_hypothesis: bool = False, max_history: int | None = None) -> None:
        super().__init__()
        self.use_hypothesis = use_hypothesis
        self.max_history = max_history
        self._pairs: list[tuple[float, int]] = []  # (predicted_prob, true_indicator)

    def reset(self) -> None:
        """Reset the running statistics and drop all stored calibration pairs."""
        super().reset()
        # Without this, ece() and bin_data() would keep reporting on
        # observations recorded before the reset.
        self._pairs = []

    def _score(self, y, Gamma=None, *, venn=None, **kw):
        if venn is None:
            raise ValueError("CalibrationError requires venn keyword argument")

        # Locate y by equality. np.searchsorted only returns an insertion
        # point, so a label missing from label_space would silently be
        # recorded as a neighbouring class.
        matches = np.flatnonzero(np.asarray(venn.label_space) == y)
        if matches.size == 0:
            raise ValueError(f"Label {y!r} not found in venn.label_space")
        label_idx = int(matches[0])

        # For calibration we track P(y = positive_class) vs 1{y = positive_class}.
        # The positive class is index 1, or index 0 when there is only one label.
        pos_idx = min(1, len(venn.label_space) - 1)

        if self.use_hypothesis:
            # P^y(positive_class): probability of positive class under correct hypothesis
            pred_prob = float(venn.probs[label_idx, pos_idx])
        else:
            # Point estimate probability for positive class
            pred_prob = float(venn.point[pos_idx])

        indicator = int(label_idx == pos_idx)
        self._pairs.append((pred_prob, indicator))

        # One pair is added per call, so dropping the oldest one keeps the
        # history at max_history once the cap has been reached.
        if self.max_history is not None and len(self._pairs) > self.max_history:
            self._pairs.pop(0)

        return abs(pred_prob - indicator)

    @property
    def predicted(self) -> NDArray:
        """Array of stored predicted probabilities."""
        if not self._pairs:
            return np.array([])
        return np.array([p for p, _ in self._pairs])

    @property
    def observed(self) -> NDArray:
        """Array of stored true indicators (1 if the true label is the positive class, else 0)."""
        if not self._pairs:
            return np.array([])
        return np.array([o for _, o in self._pairs])

    def _binned(self, n_bins: int, strategy: str, unknown_strategy_msg: str) -> list[tuple[NDArray, NDArray, Any]]:
        """Split the stored pairs into bins and return the non-empty ones.

        Each returned tuple is ``(predicted_in_bin, observed_in_bin, count)``.
        Bins are half-open ``[lo, hi)`` except the last, which also includes
        its upper edge. Raises ``ValueError(unknown_strategy_msg)`` for an
        unrecognised ``strategy``.
        """
        predicted = self.predicted
        observed = self.observed

        if strategy == "uniform":
            bin_edges = np.linspace(0.0, 1.0, n_bins + 1)
        elif strategy == "quantile":
            quantiles = np.linspace(0.0, 1.0, n_bins + 1)
            bin_edges = np.quantile(predicted, quantiles)
            # Pin the outer edges so the bins always span [0, 1].
            bin_edges[0] = 0.0
            bin_edges[-1] = 1.0
        else:
            raise ValueError(unknown_strategy_msg)

        bins = []
        for i in range(n_bins):
            if i < n_bins - 1:
                mask = (predicted >= bin_edges[i]) & (predicted < bin_edges[i + 1])
            else:
                mask = (predicted >= bin_edges[i]) & (predicted <= bin_edges[i + 1])

            n_bin = mask.sum()
            if n_bin == 0:
                continue
            bins.append((predicted[mask], observed[mask], n_bin))
        return bins

    def ece(self, n_bins: int = 10, strategy: str = "uniform") -> float:
        """Compute binned Expected Calibration Error.

        Parameters
        ----------
        n_bins : int, default 10
            Number of bins.
        strategy : str, default "uniform"
            Binning strategy: ``"uniform"`` (equal-width) or
            ``"quantile"`` (equal-mass).

        Returns
        -------
        float
            Weighted average of |mean_predicted - fraction_positive| across
            bins, weighted by bin count. 0.0 when no pairs are stored.

        Raises
        ------
        ValueError
            If ``strategy`` is not recognised (only checked when pairs are
            stored).
        """
        if not self._pairs:
            return 0.0

        bins = self._binned(
            n_bins,
            strategy,
            f"Unknown strategy {strategy!r}. Choose 'uniform' or 'quantile'.",
        )

        ece_val = 0.0
        n_total = len(self._pairs)
        for pred_in_bin, obs_in_bin, n_bin in bins:
            ece_val += (n_bin / n_total) * abs(pred_in_bin.mean() - obs_in_bin.mean())

        return float(ece_val)

    def bin_data(self, n_bins: int = 10, strategy: str = "uniform") -> tuple[NDArray, NDArray, NDArray]:
        """Return binned calibration data for plotting.

        Empty bins are omitted, so the returned arrays may be shorter than
        ``n_bins``.

        Parameters
        ----------
        n_bins : int, default 10
            Number of bins.
        strategy : str, default "uniform"
            Binning strategy: ``"uniform"`` or ``"quantile"``.

        Returns
        -------
        mean_predicted : ndarray
            Mean predicted probability per bin.
        fraction_positive : ndarray
            Fraction of positive outcomes per bin.
        bin_counts : ndarray
            Number of samples per bin.

        Raises
        ------
        ValueError
            If ``strategy`` is not recognised (only checked when pairs are
            stored).
        """
        if not self._pairs:
            return np.array([]), np.array([]), np.array([])

        bins = self._binned(n_bins, strategy, f"Unknown strategy {strategy!r}.")

        mean_preds = [pred_in_bin.mean() for pred_in_bin, _, _ in bins]
        frac_pos = [obs_in_bin.mean() for _, obs_in_bin, _ in bins]
        counts = [n_bin for _, _, n_bin in bins]

        return np.array(mean_preds), np.array(frac_pos), np.array(counts)

predicted: NDArray property

Array of stored predicted probabilities.

observed: NDArray property

Array of stored true indicators (1 if the true label is the positive class, else 0).

ece(n_bins: int = 10, strategy: str = 'uniform') -> float

Compute binned Expected Calibration Error.

Parameters:

Name Type Description Default
n_bins int

Number of bins.

10
strategy str

Binning strategy: "uniform" (equal-width) or "quantile" (equal-mass).

"uniform"

Returns:

Type Description
float

Weighted average of |mean_predicted - fraction_positive| across bins, weighted by bin count.

Source code in src/online_cp/metrics.py
def ece(self, n_bins: int = 10, strategy: str = "uniform") -> float:
    """Binned Expected Calibration Error over the stored pairs.

    Parameters
    ----------
    n_bins : int, default 10
        Number of bins.
    strategy : str, default "uniform"
        How bin edges are chosen: ``"uniform"`` (equal-width) or
        ``"quantile"`` (equal-mass).

    Returns
    -------
    float
        Sum over non-empty bins of
        (bin count / total count) * |mean_predicted - fraction_positive|;
        0.0 when no pairs are stored.
    """
    if not self._pairs:
        return 0.0

    probs = self.predicted
    outcomes = self.observed

    if strategy == "uniform":
        edges = np.linspace(0.0, 1.0, n_bins + 1)
    elif strategy == "quantile":
        edges = np.quantile(probs, np.linspace(0.0, 1.0, n_bins + 1))
        # Pin the outer edges so the bins always span [0, 1].
        edges[0], edges[-1] = 0.0, 1.0
    else:
        raise ValueError(f"Unknown strategy {strategy!r}. Choose 'uniform' or 'quantile'.")

    total = len(probs)
    last = n_bins - 1
    weighted_gap = 0.0

    for idx in range(n_bins):
        lo, hi = edges[idx], edges[idx + 1]
        # Bins are half-open, except the last one which includes its upper edge.
        under_hi = (probs <= hi) if idx == last else (probs < hi)
        in_bin = (probs >= lo) & under_hi

        size = in_bin.sum()
        if size:
            gap = abs(probs[in_bin].mean() - outcomes[in_bin].mean())
            weighted_gap += (size / total) * gap

    return float(weighted_gap)

bin_data(n_bins: int = 10, strategy: str = 'uniform') -> tuple[NDArray, NDArray, NDArray]

Return binned calibration data for plotting.

Parameters:

Name Type Description Default
n_bins int

Number of bins.

10
strategy str

Binning strategy: "uniform" or "quantile".

"uniform"

Returns:

Name Type Description
mean_predicted ndarray

Mean predicted probability per bin.

fraction_positive ndarray

Fraction of positive outcomes per bin.

bin_counts ndarray

Number of samples per bin.

Source code in src/online_cp/metrics.py
def bin_data(self, n_bins: int = 10, strategy: str = "uniform") -> tuple[NDArray, NDArray, NDArray]:
    """Summarise the stored pairs per bin, e.g. for a reliability plot.

    Empty bins are left out, so the arrays may be shorter than ``n_bins``.

    Parameters
    ----------
    n_bins : int, default 10
        Number of bins.
    strategy : str, default "uniform"
        How bin edges are chosen: ``"uniform"`` or ``"quantile"``.

    Returns
    -------
    mean_predicted : ndarray
        Mean predicted probability per bin.
    fraction_positive : ndarray
        Fraction of positive outcomes per bin.
    bin_counts : ndarray
        Number of samples per bin.
    """
    if not self._pairs:
        return np.array([]), np.array([]), np.array([])

    probs = self.predicted
    outcomes = self.observed

    if strategy == "uniform":
        edges = np.linspace(0.0, 1.0, n_bins + 1)
    elif strategy == "quantile":
        edges = np.quantile(probs, np.linspace(0.0, 1.0, n_bins + 1))
        # Pin the outer edges so the bins always span [0, 1].
        edges[0], edges[-1] = 0.0, 1.0
    else:
        raise ValueError(f"Unknown strategy {strategy!r}.")

    bin_means = []
    bin_fractions = []
    bin_sizes = []
    last = n_bins - 1

    for idx in range(n_bins):
        lo, hi = edges[idx], edges[idx + 1]
        # Bins are half-open, except the last one which includes its upper edge.
        under_hi = (probs <= hi) if idx == last else (probs < hi)
        in_bin = (probs >= lo) & under_hi

        size = in_bin.sum()
        if size:
            bin_means.append(probs[in_bin].mean())
            bin_fractions.append(outcomes[in_bin].mean())
            bin_sizes.append(size)

    return np.array(bin_means), np.array(bin_fractions), np.array(bin_sizes)