Skip to content

Classifiers

online_cp.classifiers.ConformalNearestNeighboursClassifier

Bases: ConformalClassifier

Classifier using nearest neighbours as the nonconformity measure.

cp = ConformalNearestNeighboursClassifier(k=1, label_space=[-1, 1], rnd_state=1337, epsilon=0.1) Gamma, p_values = cp.predict(3, return_p_values=True) Gamma # predict both labels, as this is the first array([-1, 1]) [round(p_values[i], 4) for i in [-1, 1]][0.8781, 0.8781]

cp.learn_one(np.int64(3), 1)

Gamma, p_values = cp.predict(-2, return_p_values=True) Gamma # predict both labels, as this is the first array([-1, 1]) [round(p_values[i], 4) for i in [-1, 1]][0.1855, 0.1855]

Source code in src/online_cp/classifiers.py
class ConformalNearestNeighboursClassifier(ConformalClassifier):
    """
    Classifier using nearest neighbours as the nonconformity measure.

    >>> cp = ConformalNearestNeighboursClassifier(k=1, label_space=[-1, 1], rnd_state=1337, epsilon=0.1)
    >>> Gamma, p_values = cp.predict(3, return_p_values=True)
    >>> Gamma  # predict both labels, as this is the first
    array([-1,  1])
    >>> [round(p_values[i], 4) for i in [-1, 1]]
    [0.8781, 0.8781]

    >>> cp.learn_one(np.int64(3), 1)

    >>> Gamma, p_values = cp.predict(-2, return_p_values=True)
    >>> Gamma  # predict both labels, as this is the first
    array([-1,  1])
    >>> [round(p_values[i], 4) for i in [-1, 1]]
    [0.1855, 0.1855]
    """

    # TODO: implement: cp.learn_several([[3,1],[4,7],[5,2]], [1, -1, 1])

    # TODO Write tests

    def __init__(
        self,
        k=1,
        label_space=None,
        distance="euclidean",
        distance_func=None,
        aggregation="mean",
        verbose=0,
        rnd_state=None,
        n_jobs=None,
        epsilon=default_epsilon,
    ):
        super().__init__(epsilon=epsilon)
        self._label_space_fixed = label_space is not None
        self.label_space = np.asarray(label_space) if label_space is not None else None

        self.k = k

        if aggregation not in ("mean", "median"):
            raise ValueError(f"aggregation must be 'mean' or 'median', got '{aggregation}'")
        self.aggregation = aggregation

        self.distance = distance
        if distance_func is None:
            self.distance_func = self._standard_distance_func
        else:
            self.distance_func = distance_func
            self.distance = "custom"

        self.y = np.empty(0)
        self.X = None
        self.D = None
        self._label_indices = {}

        self.verbose = verbose
        self.rnd_gen = np.random.default_rng(rnd_state)

        self.n_jobs = n_jobs

    def reset(self):

        self.__init__(self.k, self.label_space)

    def _standard_distance_func(self, X, y=None):
        """
        By default we use scipy to compute distances
        """
        X = np.atleast_2d(X)
        if y is None:
            dists = squareform(pdist(X, metric=self.distance))
        else:
            y = np.atleast_2d(y)
            dists = cdist(X, y, metric=self.distance)
        return dists

    def learn_initial_training_set(self, X, y):
        if X.shape[0] > 0:
            self.X = X
            self.y = y
            self.D = self.distance_func(X)
            self._label_indices = self._build_label_indices(y)
            if self._label_space_fixed:
                unknown = set(np.unique(y)) - set(self.label_space)
                if unknown:
                    raise ValueError(
                        f"Labels {sorted(unknown)} not in declared label_space "
                        f"{self.label_space.tolist()}"
                    )
            elif self.label_space is None:
                self.label_space = np.unique(y)
            else:
                self.label_space = np.sort(
                    np.unique(np.concatenate([self.label_space, np.unique(y)]))
                )

    @staticmethod
    def update_distance_matrix(D, d):
        d = np.asarray(d).reshape(-1)
        n = D.shape[0]
        D_new = np.empty((n + 1, n + 1), dtype=np.result_type(D.dtype, d.dtype))
        D_new[:n, :n] = D
        D_new[:n, n] = d
        D_new[n, :n] = d
        D_new[n, n] = 0
        return D_new

    @staticmethod
    def _build_label_indices(y):
        return {label: np.flatnonzero(y == label) for label in np.unique(y)}

    @staticmethod
    def _extend_label_indices(label_indices, label, new_index):
        extended = label_indices.copy()
        if label in extended:
            extended[label] = np.concatenate((extended[label], np.array([new_index], dtype=int)))
        else:
            extended[label] = np.array([new_index], dtype=int)
        return extended

    def _find_nearest_distances(self, D, y=None, label_indices=None):
        """Vectorized nearest same/different class distances for any k.

        Aggregates the k nearest distances using self.aggregation ('mean' or 'median').
        This extends the 1-NN nonconformity measure of ALRW2 §2.3 to k-NN.
        """
        n = D.shape[0]
        k = self.k
        agg_func = np.mean if self.aggregation == "mean" else np.median
        same_label_distances = np.full(n, np.inf)
        different_label_distances = np.full(n, np.inf)

        if label_indices is None:
            if y is None:
                raise ValueError("Either y or label_indices must be provided")
            label_indices = self._build_label_indices(y)

        all_idx = np.arange(n)
        for idx in label_indices.values():
            not_mask = np.ones(n, dtype=bool)
            not_mask[idx] = False
            not_idx = all_idx[not_mask]

            # Same-class: for points of this label, k nearest same-label neighbors
            if len(idx) > 1:
                D_sub = D[np.ix_(idx, idx)].copy()
                np.fill_diagonal(D_sub, np.inf)
                m = len(idx) - 1  # available neighbors (excluding self)
                if m >= k:
                    same_label_distances[idx] = agg_func(np.partition(D_sub, k - 1, axis=1)[:, :k], axis=1)
                else:
                    # Fewer than k same-class neighbors: use all available
                    same_label_distances[idx] = agg_func(np.sort(D_sub, axis=1)[:, :m], axis=1)

            # Different-class: for points OF this label, k nearest among all other labels
            if len(idx) > 0 and len(not_idx) > 0:
                D_sub = D[np.ix_(idx, not_idx)]
                if len(not_idx) >= k:
                    different_label_distances[idx] = agg_func(np.partition(D_sub, k - 1, axis=1)[:, :k], axis=1)
                else:
                    different_label_distances[idx] = agg_func(D_sub, axis=1)

        return same_label_distances, different_label_distances

    def learn_one(self, x: NDArray[np.floating[Any]], y: Any, precomputed: NDArray[np.floating[Any]] | None = None, *, D: NDArray[np.floating[Any]] | None = None) -> None:
        if D is not None:
            warnings.warn(
                "The 'D' parameter is deprecated, use 'precomputed' instead",
                DeprecationWarning,
                stacklevel=2,
            )
            if precomputed is None:
                precomputed = D
        new_index = 0 if self.X is None else self.X.shape[0]

        # Enforce label-space policy
        if self._label_space_fixed:
            if y not in self.label_space:
                raise ValueError(
                    f"Label {y} not in declared label_space "
                    f"{self.label_space.tolist()}"
                )
        elif self.label_space is None:
            self.label_space = np.array([y])
        elif y not in self.label_space:
            self.label_space = np.sort(np.append(self.label_space, y))

        # Learn label y
        self.y = np.append(self.y, y)
        if y in self._label_indices:
            self._label_indices[y] = np.concatenate((self._label_indices[y], np.array([new_index], dtype=int)))
        else:
            self._label_indices[y] = np.array([new_index], dtype=int)

        # Learn object
        if self.X is None:
            self.X = x.reshape(1, -1)
            self.D = self.distance_func(self.X)
        else:
            if precomputed is None:
                d = self.distance_func(self.X, x)
                precomputed = self.update_distance_matrix(self.D, d)
            self.D = precomputed
            self.X = np.append(self.X, x.reshape(1, -1), axis=0)

    def compute_p_value(self, x: NDArray[np.floating[Any]], y: Any, return_update: bool = False) -> float | tuple[float, NDArray[np.floating[Any]] | None]:
        """Compute conformal p-value for a single (x, y) pair.

        Only tests the given label y (not the full label space),
        making this faster than predict() when only one p-value is needed.

        Parameters
        ----------
        x : array-like
            Test object.
        y : scalar
            Hypothesized label.
        return_update : bool
            If True, also return the updated distance matrix D.

        Returns
        -------
        p_value : float
            Smoothed conformal p-value for the hypothesis that x has label y.
        D : ndarray, optional
            Updated distance matrix (only if return_update=True).
        """
        tau = self.rnd_gen.uniform(0, 1)

        if self.y.shape[0] >= 1:
            d = self.distance_func(self.X, x)
            D = self.update_distance_matrix(self.D, d)
            label_indices = self._extend_label_indices(self._label_indices, y, D.shape[0] - 1)
            same_label_distances, different_label_distances = self._find_nearest_distances(D, label_indices=label_indices)
            Alpha = np.nan_to_num(same_label_distances / different_label_distances, nan=np.inf)
            p_value = self._compute_p_value(Alpha, tau, "nonconformity")
        else:
            D = None
            p_value = self._compute_p_value(np.array([np.inf]), tau, "nonconformity")

        if return_update:
            return p_value, D
        return p_value

    def predict(self, x: NDArray[np.floating[Any]], epsilon: float | NDArray[np.floating[Any]] | None = None, return_p_values: bool = False, return_update: bool = False, verbose: int = 0) -> ConformalPredictionSet | MultiLevelPredictionSet:
        p_values = {}
        tau = self.rnd_gen.uniform(0, 1)

        if epsilon is None:
            epsilon = self.epsilon

        if self.y.shape[0] >= 1:
            tic = time.time()
            d = self.distance_func(self.X, x)
            D = self.update_distance_matrix(self.D, d)
            time_update_D = time.time() - tic
            base_label_indices = self._label_indices
            test_index = D.shape[0] - 1

            tic = time.time()
            if self.n_jobs is not None:

                def process_label(label):
                    label_indices = self._extend_label_indices(base_label_indices, label, test_index)
                    same_label_distances, different_label_distances = self._find_nearest_distances(
                        D, label_indices=label_indices
                    )

                    Alpha = same_label_distances / different_label_distances
                    if verbose > 10:
                        print(f"Nonconformity scores for hypothesis y={label}: {Alpha}")
                        _, string = self._compute_p_value(Alpha, tau, "nonconformity", return_string=True)
                        print(f"p-value for hypothesis y={label}: {string}")

                    return label, self._compute_p_value(Alpha, tau, "nonconformity")

                results = Parallel(n_jobs=self.n_jobs)(delayed(process_label)(label) for label in self.label_space)
                p_values = dict(results)
            else:
                for label in self.label_space:
                    label_indices = self._extend_label_indices(base_label_indices, label, test_index)

                    same_label_distances, different_label_distances = self._find_nearest_distances(
                        D, label_indices=label_indices
                    )

                    Alpha = np.nan_to_num(same_label_distances / different_label_distances, nan=np.inf)

                    if verbose > 10:
                        print(f"Nonconformity scores for hypothesis y={label}: {Alpha}")
                        p_values[label], string = self._compute_p_value(Alpha, tau, "nonconformity", return_string=True)
                        print(f"p-value for hypothesis y={label}: {string}")

                    p_values[label] = self._compute_p_value(Alpha, tau, "nonconformity")
            time_compute_p_values = time.time() - tic

            tic = time.time()
            Gamma = self._compute_Gamma(p_values, epsilon)
            time_Gamma = time.time() - tic

            self.time_dict = {
                "Update distance matrix": time_update_D,
                "Compute p-values": time_compute_p_values,
                "Compute Gamma": time_Gamma,
            }

        else:
            for label in self.label_space:
                Alpha = np.array([np.inf])
                if verbose > 10:
                    print(f"Nonconformity scores for hypothesis y={label}: {Alpha}")
                    p_values[label], string = self._compute_p_value(Alpha, tau, "nonconformity", return_string=True)
                    print(f"p-value for hypothesis y={label}: {string}")
                p_values[label] = self._compute_p_value(Alpha, tau, "nonconformity")
            Gamma = self._compute_Gamma(p_values, epsilon)
            D = None
            self.time_dict = {}

        if return_update:
            if return_p_values:
                return Gamma, p_values, D
            else:
                return Gamma, D
        else:
            if return_p_values:
                return Gamma, p_values
            else:
                return Gamma

compute_p_value(x: NDArray[np.floating[Any]], y: Any, return_update: bool = False) -> float | tuple[float, NDArray[np.floating[Any]] | None]

Compute conformal p-value for a single (x, y) pair.

Only tests the given label y (not the full label space), making this faster than predict() when only one p-value is needed.

Parameters:

Name Type Description Default
x array - like

Test object.

required
y scalar

Hypothesized label.

required
return_update bool

If True, also return the updated distance matrix D.

False

Returns:

Name Type Description
p_value float

Smoothed conformal p-value for the hypothesis that x has label y.

D (ndarray, optional)

Updated distance matrix (only if return_update=True).

Source code in src/online_cp/classifiers.py
def compute_p_value(self, x: NDArray[np.floating[Any]], y: Any, return_update: bool = False) -> float | tuple[float, NDArray[np.floating[Any]] | None]:
    """Compute conformal p-value for a single (x, y) pair.

    Only tests the given label y (not the full label space),
    making this faster than predict() when only one p-value is needed.

    Parameters
    ----------
    x : array-like
        Test object.
    y : scalar
        Hypothesized label.
    return_update : bool
        If True, also return the updated distance matrix D.

    Returns
    -------
    p_value : float
        Smoothed conformal p-value for the hypothesis that x has label y.
    D : ndarray, optional
        Updated distance matrix (only if return_update=True).
    """
    tau = self.rnd_gen.uniform(0, 1)

    if self.y.shape[0] >= 1:
        d = self.distance_func(self.X, x)
        D = self.update_distance_matrix(self.D, d)
        label_indices = self._extend_label_indices(self._label_indices, y, D.shape[0] - 1)
        same_label_distances, different_label_distances = self._find_nearest_distances(D, label_indices=label_indices)
        Alpha = np.nan_to_num(same_label_distances / different_label_distances, nan=np.inf)
        p_value = self._compute_p_value(Alpha, tau, "nonconformity")
    else:
        D = None
        p_value = self._compute_p_value(np.array([np.inf]), tau, "nonconformity")

    if return_update:
        return p_value, D
    return p_value

online_cp.classifiers.ConformalSupportVectorMachine

Bases: ConformalClassifier

Conformal classifier using the Support Vector Machine with Lagrange multiplier nonconformity measure (ALRW Ch. 3).

For each candidate label, one-vs-rest binarization is applied and the SVM dual is solved on the augmented training set. The Lagrange multiplier alpha_i is the nonconformity score for example i: alpha_i = 0 means well inside the margin (conforming), alpha_i = C means on the margin boundary or misclassified (maximally nonconforming).

Supports multi-class classification via one-vs-rest decomposition. The Gram matrix is label-independent and reused across all candidate labels.

Parameters:

Name Type Description Default
kernel Kernel, callable, or str
  • An online_cp.kernels.Kernel instance (native).
  • A callable f(X, Y) -> (n, m) Gram matrix (sklearn-style).
  • A string: 'linear', 'rbf', 'poly'.
'rbf'
C float

Regularization parameter (upper bound on alpha_i). Default 1.0.

1.0
label_space array - like

The set of possible labels. Supports any number of classes. Default [-1, 1].

None
sigma float

Bandwidth for RBF kernel when kernel='rbf'. Default 1.0.

1.0
degree int

Degree for polynomial kernel when kernel='poly'. Default 3.

3
coef0 float

Constant for polynomial kernel. Default 1.0.

1.0
smo_tol float

Tolerance for SMO convergence. Default 1e-4.

0.001
smo_max_iter int

Maximum SMO iterations. Default 5000.

5000
epsilon float

Significance level. Default 0.1.

default_epsilon
rnd_state int or None

Random seed.

None

Examples:

>>> import numpy as np
>>> np.random.seed(42)
>>> X = np.vstack([np.random.normal(loc=-1, size=(20, 2)), np.random.normal(loc=1, size=(20, 2))])
>>> y = np.array([-1] * 20 + [1] * 20)
>>> svm = ConformalSupportVectorMachine(kernel="rbf", sigma=1.0, C=10.0)
>>> svm.learn_initial_training_set(X[:30], y[:30])
>>> Gamma = svm.predict(X[30])
>>> y[30] in Gamma
True
Source code in src/online_cp/classifiers.py
class ConformalSupportVectorMachine(ConformalClassifier):
    """
    Conformal classifier using the Support Vector Machine with Lagrange
    multiplier nonconformity measure (ALRW Ch. 3).

    For each candidate label, one-vs-rest binarization is applied and the
    SVM dual is solved on the augmented training set. The Lagrange multiplier
    alpha_i is the nonconformity score for example i: alpha_i = 0 means well
    inside the margin (conforming), alpha_i = C means on the margin boundary
    or misclassified (maximally nonconforming).

    Supports multi-class classification via one-vs-rest decomposition.
    The Gram matrix is label-independent and reused across all candidate labels.

    Parameters
    ----------
    kernel : Kernel, callable, or str
        - An online_cp.kernels.Kernel instance (native).
        - A callable f(X, Y) -> (n, m) Gram matrix (sklearn-style).
        - A string: 'linear', 'rbf', 'poly'.
    C : float
        Regularization parameter (upper bound on alpha_i). Default 1.0.
    label_space : array-like
        The set of possible labels. Supports any number of classes.
        Default [-1, 1].
    sigma : float
        Bandwidth for RBF kernel when kernel='rbf'. Default 1.0.
    degree : int
        Degree for polynomial kernel when kernel='poly'. Default 3.
    coef0 : float
        Constant for polynomial kernel. Default 1.0.
    smo_tol : float
        Tolerance for SMO convergence. Default 1e-4.
    smo_max_iter : int
        Maximum SMO iterations. Default 5000.
    epsilon : float
        Significance level. Default 0.1.
    rnd_state : int or None
        Random seed.

    Examples
    --------
    >>> import numpy as np
    >>> np.random.seed(42)
    >>> X = np.vstack([np.random.normal(loc=-1, size=(20, 2)), np.random.normal(loc=1, size=(20, 2))])
    >>> y = np.array([-1] * 20 + [1] * 20)
    >>> svm = ConformalSupportVectorMachine(kernel="rbf", sigma=1.0, C=10.0)
    >>> svm.learn_initial_training_set(X[:30], y[:30])
    >>> Gamma = svm.predict(X[30])
    >>> y[30] in Gamma
    True
    """

    def __init__(
        self,
        kernel="rbf",
        C=1.0,
        label_space=None,
        sigma=1.0,
        degree=3,
        coef0=1.0,
        smo_tol=1e-3,
        smo_max_iter=5000,
        epsilon=default_epsilon,
        rnd_state=None,
    ):
        super().__init__(epsilon=epsilon)
        self.C = C
        self._label_space_fixed = label_space is not None
        self.label_space = np.asarray(label_space) if label_space is not None else None
        self.sigma = sigma
        self.degree = degree
        self.coef0 = coef0
        self.smo_tol = smo_tol
        self.smo_max_iter = smo_max_iter
        self.rnd_gen = np.random.default_rng(rnd_state)

        self.X = None
        self.y = np.empty(0)
        self.K = None  # Cached Gram matrix

        # Resolve kernel
        self._kernel = self._resolve_kernel(kernel)

    def _resolve_kernel(self, kernel):
        """Resolve kernel specification into a callable with our interface."""
        try:
            from online_cp.kernels import GaussianKernel, Kernel, LinearKernel, PolynomialKernel
        except ModuleNotFoundError:
            from kernels import GaussianKernel, Kernel, LinearKernel, PolynomialKernel

        if isinstance(kernel, Kernel):
            return kernel
        elif isinstance(kernel, str):
            if kernel == "linear":
                return LinearKernel()
            elif kernel == "rbf":
                return GaussianKernel(sigma=self.sigma)
            elif kernel == "poly":
                return PolynomialKernel(d=self.degree, c=self.coef0)
            else:
                raise ValueError(f"Unknown kernel string: '{kernel}'. Use 'linear', 'rbf', or 'poly'.")
        elif callable(kernel):
            # Wrap sklearn-style callable: f(X, Y) -> matrix
            return _SklearnKernelAdapter(kernel)
        else:
            raise TypeError(f"kernel must be a Kernel instance, callable, or string, got {type(kernel)}")

    def _compute_gram(self, X):
        """Compute full Gram matrix."""
        return self._kernel(X)

    def _compute_kernel_row(self, X, x):
        """Compute kernel between all rows of X and a single point x."""
        return self._kernel(X, x).ravel()

    def learn_initial_training_set(self, X: NDArray[np.floating[Any]], y: NDArray[Any]) -> None:
        """Store training data and precompute Gram matrix."""
        if self._label_space_fixed:
            unknown = set(np.unique(y)) - set(self.label_space)
            if unknown:
                raise ValueError(
                    f"Labels {sorted(unknown)} not in declared label_space "
                    f"{self.label_space.tolist()}"
                )
        elif self.label_space is None:
            self.label_space = np.unique(y)
        else:
            self.label_space = np.sort(
                np.unique(np.concatenate([self.label_space, np.unique(y)]))
            )
        self.X = X.copy()
        self.y = y.copy().astype(float)
        self.K = self._compute_gram(X)

    def learn_one(self, x: NDArray[np.floating[Any]], y: Any) -> None:
        """Learn a new example, updating stored data and Gram matrix."""
        x = np.atleast_1d(x).ravel()

        # Enforce label-space policy
        if self._label_space_fixed:
            if y not in self.label_space:
                raise ValueError(
                    f"Label {y} not in declared label_space "
                    f"{self.label_space.tolist()}"
                )
        elif self.label_space is None:
            self.label_space = np.array([y])
        elif y not in self.label_space:
            self.label_space = np.sort(np.append(self.label_space, y))

        if self.X is None:
            self.X = x.reshape(1, -1)
            self.y = np.array([y], dtype=float)
            self.K = self._compute_gram(self.X)
        else:
            # Compute new kernel row
            k_row = self._compute_kernel_row(self.X, x)
            kappa = (
                self._kernel(x.reshape(1, -1)).item()
                if self._kernel(x.reshape(1, -1)).ndim > 0
                else self._kernel(x.reshape(1, -1))
            )
            # Extend Gram matrix
            n = self.K.shape[0]
            K_new = np.empty((n + 1, n + 1))
            K_new[:n, :n] = self.K
            K_new[:n, n] = k_row
            K_new[n, :n] = k_row
            K_new[n, n] = kappa
            self.K = K_new
            self.X = np.vstack([self.X, x.reshape(1, -1)])
            self.y = np.append(self.y, float(y))

    def predict(self, x: NDArray[np.floating[Any]], epsilon: float | NDArray[np.floating[Any]] | None = None, return_p_values: bool = False) -> ConformalPredictionSet | MultiLevelPredictionSet:
        """
        Predict the conformal prediction set for object x.

        For each candidate label, augment the training set with (x, label),
        solve the SVM dual, and use alpha_i as nonconformity scores.
        """
        if epsilon is None:
            epsilon = self.epsilon

        x = np.atleast_1d(x).ravel()
        tau = self.rnd_gen.uniform()
        p_values = {}

        if self.X is None or self.y.shape[0] == 0:
            # No training data — predict all labels
            for label in self.label_space:
                p_values[label] = 1.0
            Gamma = self._compute_Gamma(p_values, epsilon)
            if return_p_values:
                return Gamma, p_values
            return Gamma

        # Compute kernel row between training set and test point
        k_row = self._compute_kernel_row(self.X, x)
        kappa = self._kernel(x.reshape(1, -1))
        if np.ndim(kappa) > 0:
            kappa = kappa.item()

        # Build augmented Gram matrix (n+1 x n+1)
        n = self.K.shape[0]
        K_aug = np.empty((n + 1, n + 1))
        K_aug[:n, :n] = self.K
        K_aug[:n, n] = k_row
        K_aug[n, :n] = k_row
        K_aug[n, n] = kappa

        # For each candidate label, solve SVM and compute p-value
        for label in self.label_space:
            y_aug = np.append(self.y, float(label))

            # Binarize: one-vs-rest (label -> +1, everything else -> -1)
            y_binary = np.where(y_aug == label, 1.0, -1.0)

            alpha, _ = _smo_solve(K_aug, y_binary, self.C, tol=self.smo_tol, max_iter=self.smo_max_iter)

            # NCM = alpha_i (nonconformity: larger alpha = more nonconforming)
            # For multiclass (>2 labels), use only same-class alphas for the
            # p-value. The one-vs-rest binarization makes the Gram matrix Q
            # depend on the hypothesised label, so the NCM is only equivariant
            # to permutations within the positive class.  For binary problems
            # Q is invariant to the choice of reference label, so all alphas
            # are exchangeable and we use the full vector.
            if len(self.label_space) > 2:
                alpha = alpha[y_binary == 1.0]
            p_values[label] = self._compute_p_value(alpha, tau, "nonconformity")

        Gamma = self._compute_Gamma(p_values, epsilon)

        if return_p_values:
            return Gamma, p_values
        return Gamma

    def compute_p_value(self, x, y):
        """Compute the conformal p-value for (x, y) given current training set."""
        x = np.atleast_1d(x).ravel()
        tau = self.rnd_gen.uniform()

        if self.X is None or self.y.shape[0] == 0:
            return 1.0

        # Build augmented Gram matrix
        k_row = self._compute_kernel_row(self.X, x)
        kappa = self._kernel(x.reshape(1, -1))
        if np.ndim(kappa) > 0:
            kappa = kappa.item()

        n = self.K.shape[0]
        K_aug = np.empty((n + 1, n + 1))
        K_aug[:n, :n] = self.K
        K_aug[:n, n] = k_row
        K_aug[n, :n] = k_row
        K_aug[n, n] = kappa

        y_aug = np.append(self.y, float(y))

        # Binarize: one-vs-rest (label -> +1, everything else -> -1)
        y_binary = np.where(y_aug == y, 1.0, -1.0)

        alpha, _ = _smo_solve(K_aug, y_binary, self.C, tol=self.smo_tol, max_iter=self.smo_max_iter)

        # For multiclass, use only same-class alphas (see predict method comment)
        if len(self.label_space) > 2:
            alpha = alpha[y_binary == 1.0]
        return self._compute_p_value(alpha, tau, "nonconformity")

learn_initial_training_set(X: NDArray[np.floating[Any]], y: NDArray[Any]) -> None

Store training data and precompute Gram matrix.

Source code in src/online_cp/classifiers.py
def learn_initial_training_set(self, X: NDArray[np.floating[Any]], y: NDArray[Any]) -> None:
    """Store training data and precompute Gram matrix."""
    if self._label_space_fixed:
        unknown = set(np.unique(y)) - set(self.label_space)
        if unknown:
            raise ValueError(
                f"Labels {sorted(unknown)} not in declared label_space "
                f"{self.label_space.tolist()}"
            )
    elif self.label_space is None:
        self.label_space = np.unique(y)
    else:
        self.label_space = np.sort(
            np.unique(np.concatenate([self.label_space, np.unique(y)]))
        )
    self.X = X.copy()
    self.y = y.copy().astype(float)
    self.K = self._compute_gram(X)

learn_one(x: NDArray[np.floating[Any]], y: Any) -> None

Learn a new example, updating stored data and Gram matrix.

Source code in src/online_cp/classifiers.py
def learn_one(self, x: NDArray[np.floating[Any]], y: Any) -> None:
    """Learn a new example, updating stored data and Gram matrix."""
    x = np.atleast_1d(x).ravel()

    # Enforce label-space policy
    if self._label_space_fixed:
        if y not in self.label_space:
            raise ValueError(
                f"Label {y} not in declared label_space "
                f"{self.label_space.tolist()}"
            )
    elif self.label_space is None:
        self.label_space = np.array([y])
    elif y not in self.label_space:
        self.label_space = np.sort(np.append(self.label_space, y))

    if self.X is None:
        self.X = x.reshape(1, -1)
        self.y = np.array([y], dtype=float)
        self.K = self._compute_gram(self.X)
    else:
        # Compute new kernel row
        k_row = self._compute_kernel_row(self.X, x)
        kappa = (
            self._kernel(x.reshape(1, -1)).item()
            if self._kernel(x.reshape(1, -1)).ndim > 0
            else self._kernel(x.reshape(1, -1))
        )
        # Extend Gram matrix
        n = self.K.shape[0]
        K_new = np.empty((n + 1, n + 1))
        K_new[:n, :n] = self.K
        K_new[:n, n] = k_row
        K_new[n, :n] = k_row
        K_new[n, n] = kappa
        self.K = K_new
        self.X = np.vstack([self.X, x.reshape(1, -1)])
        self.y = np.append(self.y, float(y))

predict(x: NDArray[np.floating[Any]], epsilon: float | NDArray[np.floating[Any]] | None = None, return_p_values: bool = False) -> ConformalPredictionSet | MultiLevelPredictionSet

Predict the conformal prediction set for object x.

For each candidate label, augment the training set with (x, label), solve the SVM dual, and use alpha_i as nonconformity scores.

Source code in src/online_cp/classifiers.py
def predict(self, x: NDArray[np.floating[Any]], epsilon: float | NDArray[np.floating[Any]] | None = None, return_p_values: bool = False) -> ConformalPredictionSet | MultiLevelPredictionSet:
    """
    Predict the conformal prediction set for object x.

    For each candidate label, augment the training set with (x, label),
    solve the SVM dual, and use alpha_i as nonconformity scores.
    """
    if epsilon is None:
        epsilon = self.epsilon

    x = np.atleast_1d(x).ravel()
    tau = self.rnd_gen.uniform()
    p_values = {}

    if self.X is None or self.y.shape[0] == 0:
        # No training data — predict all labels
        for label in self.label_space:
            p_values[label] = 1.0
        Gamma = self._compute_Gamma(p_values, epsilon)
        if return_p_values:
            return Gamma, p_values
        return Gamma

    # Compute kernel row between training set and test point
    k_row = self._compute_kernel_row(self.X, x)
    kappa = self._kernel(x.reshape(1, -1))
    if np.ndim(kappa) > 0:
        kappa = kappa.item()

    # Build augmented Gram matrix (n+1 x n+1)
    n = self.K.shape[0]
    K_aug = np.empty((n + 1, n + 1))
    K_aug[:n, :n] = self.K
    K_aug[:n, n] = k_row
    K_aug[n, :n] = k_row
    K_aug[n, n] = kappa

    # For each candidate label, solve SVM and compute p-value
    for label in self.label_space:
        y_aug = np.append(self.y, float(label))

        # Binarize: one-vs-rest (label -> +1, everything else -> -1)
        y_binary = np.where(y_aug == label, 1.0, -1.0)

        alpha, _ = _smo_solve(K_aug, y_binary, self.C, tol=self.smo_tol, max_iter=self.smo_max_iter)

        # NCM = alpha_i (nonconformity: larger alpha = more nonconforming)
        # For multiclass (>2 labels), use only same-class alphas for the
        # p-value. The one-vs-rest binarization makes the Gram matrix Q
        # depend on the hypothesised label, so the NCM is only equivariant
        # to permutations within the positive class.  For binary problems
        # Q is invariant to the choice of reference label, so all alphas
        # are exchangeable and we use the full vector.
        if len(self.label_space) > 2:
            alpha = alpha[y_binary == 1.0]
        p_values[label] = self._compute_p_value(alpha, tau, "nonconformity")

    Gamma = self._compute_Gamma(p_values, epsilon)

    if return_p_values:
        return Gamma, p_values
    return Gamma

compute_p_value(x, y)

Compute the conformal p-value for (x, y) given current training set.

Source code in src/online_cp/classifiers.py
def compute_p_value(self, x, y):
    """Compute the conformal p-value for (x, y) given current training set."""
    x = np.atleast_1d(x).ravel()
    tau = self.rnd_gen.uniform()

    if self.X is None or self.y.shape[0] == 0:
        return 1.0

    # Build augmented Gram matrix
    k_row = self._compute_kernel_row(self.X, x)
    kappa = self._kernel(x.reshape(1, -1))
    if np.ndim(kappa) > 0:
        kappa = kappa.item()

    n = self.K.shape[0]
    K_aug = np.empty((n + 1, n + 1))
    K_aug[:n, :n] = self.K
    K_aug[:n, n] = k_row
    K_aug[n, :n] = k_row
    K_aug[n, n] = kappa

    y_aug = np.append(self.y, float(y))

    # Binarize: one-vs-rest (label -> +1, everything else -> -1)
    y_binary = np.where(y_aug == y, 1.0, -1.0)

    alpha, _ = _smo_solve(K_aug, y_binary, self.C, tol=self.smo_tol, max_iter=self.smo_max_iter)

    # For multiclass, use only same-class alphas (see predict method comment)
    if len(self.label_space) > 2:
        alpha = alpha[y_binary == 1.0]
    return self._compute_p_value(alpha, tau, "nonconformity")