Skip to content

Conformal Predictive Systems

online_cp.CPS.RidgePredictionMachine

Bases: ConformalPredictiveSystem

Conformal predictive system based on ridge regression.

Uses studentised residuals as the conformity measure.

Parameters:

Name Type Description Default
a float

Ridge regularisation parameter (default 0).

0
warnings bool

Whether to raise warnings on rank-deficient matrices (default True).

True
autotune bool

Whether to automatically tune the ridge parameter via GCV (default False).

False
verbose int

Verbosity level (default 0).

0
rnd_state int or None

Random seed for reproducibility.

None
epsilon float

Default significance level (default 0.1).

default_epsilon
Source code in src/online_cp/CPS.py
class RidgePredictionMachine(ConformalPredictiveSystem):
    """Conformal predictive system based on ridge regression.

    Uses studentised residuals as the conformity measure.

    Parameters
    ----------
    a : float, optional
        Ridge regularisation parameter (default 0).
    warnings : bool, optional
        Whether to raise warnings on rank-deficient matrices (default True).
    autotune : bool, optional
        Whether to automatically tune the ridge parameter via GCV (default False).
    verbose : int, optional
        Verbosity level (default 0).
    rnd_state : int or None, optional
        Random seed for reproducibility.
    epsilon : float, optional
        Default significance level (default 0.1).
    """

    def __init__(self, a=0, warnings=True, autotune=False, verbose=0, rnd_state=None, epsilon=default_epsilon):
        super().__init__(epsilon=epsilon)
        self.a = a
        self.X = None
        self.y = None
        self.p = None
        self.Id = None
        self.XTXinv = None

        # Should we raise warnings
        self.warnings = warnings
        # Do we autotune ridge prarmeter on warning
        self.autotune = autotune

        self.verbose = verbose
        self.rnd_gen = np.random.default_rng(rnd_state)

    def learn_initial_training_set(self, X: NDArray[np.floating[Any]], y: NDArray[np.floating[Any]]) -> None:
        self.X = X
        self.y = y
        self.p = X.shape[1]
        self.Id = np.identity(self.p)
        if self.autotune:
            self._tune_ridge_parameter()
        else:
            self.XTXinv = np.linalg.inv(self.X.T @ self.X + self.a * self.Id)

    def learn_one(self, x: NDArray[np.floating[Any]], y: float, precomputed: dict[str, Any] | None = None) -> None:
        """
        Learn a single example. If we have already computed X and XTXinv, use them for update. Then the last row of X is the object with label y.
        >>> cps = RidgePredictionMachine()
        >>> cps.learn_one(np.array([1, 0]), 1)
        >>> cps.X
        array([[1, 0]])
        >>> cps.y
        array([1])
        """
        # Learn label y
        if self.y is None:
            self.y = np.array([y])
        else:
            if hasattr(self, "h"):
                self.y = np.append(self.y, y.reshape(1, self.h), axis=0)
            else:
                self.y = np.append(self.y, y)

        if precomputed is not None:
            X = precomputed["X"]
            XTXinv = precomputed["XTXinv"]

            if X is not None:
                self.X = X
                self.p = self.X.shape[1]
                self.Id = np.identity(self.p)

            if XTXinv is not None:
                self.XTXinv = XTXinv

            else:
                if self.X.shape[0] == 1:
                    self.XTXinv = np.linalg.inv(self.X.T @ self.X + self.a * self.Id)
                else:
                    # Update XTX_inv (inverse of Kernel matrix plus regularisation) Use the Sherman-Morrison formula to update the hat matrix
                    # https://en.wikipedia.org/wiki/Sherman%E2%80%93Morrison_formula
                    self.XTXinv -= (self.XTXinv @ np.outer(x, x) @ self.XTXinv) / (1 + x.T @ self.XTXinv @ x)

                    # Check the rank
                    if self.warnings:
                        self.check_matrix_rank(self.XTXinv)

        else:
            # Learn object x
            if self.X is None:
                self.X = x.reshape(1, -1)
                self.p = self.X.shape[1]
                self.Id = np.identity(self.p)
            elif self.X.shape[0] == 1:
                self.X = np.append(self.X, x.reshape(1, -1), axis=0)
                self.XTXinv = np.linalg.inv(self.X.T @ self.X + self.a * self.Id)
            else:
                self.X = np.append(self.X, x.reshape(1, -1), axis=0)
                # Update XTX_inv (inverse of Kernel matrix plus regularisation) Use the Sherman-Morrison formula to update the hat matrix
                # https://en.wikipedia.org/wiki/Sherman%E2%80%93Morrison_formula
                self.XTXinv -= (self.XTXinv @ np.outer(x, x) @ self.XTXinv) / (1 + x.T @ self.XTXinv @ x)

                # Check the rank
                if self.warnings:
                    self.check_matrix_rank(self.XTXinv)

    def change_ridge_parameter(self, a):
        """
        Change the ridge parameter
        >>> cps = RidgePredictionMachine()
        >>> cps.learn_one(np.array([1, 0]), 1)
        >>> cps.change_ridge_parameter(1)
        >>> cps.a
        1
        """
        self.a = a
        if self.X is not None:
            self.XTXinv = np.linalg.inv(self.X.T @ self.X + self.a * self.Id)

    def check_matrix_rank(self, M):
        """
        Check if a matrix has full rank <==> is invertible
        Returns False if matrix is rank deficient
        NOTE In numerical linear algebra it is a bit more subtle. The condition number can tell us more.

        >>> cps = RidgePredictionMachine(warnings=False)
        >>> cps.check_matrix_rank(np.array([[1, 0], [1, 0]]))
        False
        >>> cps.check_matrix_rank(np.array([[1, 0], [0, 1]]))
        True
        """
        if np.linalg.matrix_rank(M) < M.shape[0]:
            if self.warnings:
                warnings.warn(
                    f"The matrix X is rank deficient. Condition number: {np.linalg.cond(M)}. Consider changing the ridge parameter",
                    stacklevel=2,
                )
            return False
        else:
            return True

    def _tune_ridge_parameter(self, a0=None):
        """
        Tune ridge parameter with Generalized cross validation https://pages.stat.wisc.edu/~wahba/stat860public/pdf1/golub.heath.wahba.pdf
        """
        XTX = self.X.T @ self.X
        n = self.X.shape[0]
        In = np.identity(n)

        def GCV(a):
            try:
                A = self.X @ np.linalg.inv(XTX + a * self.Id) @ self.X.T
                max_diag_H = np.max(np.diag(A))  # Maximum diagonal element of the hat matrix
                if max_diag_H > 1:
                    return np.inf
                return (1 / n) * np.linalg.norm((In - A) @ self.y) ** 2 / ((1 / n) * np.trace(In - A)) ** 2
            except (np.linalg.LinAlgError, ZeroDivisionError):
                return np.inf

        # Initial guess
        if a0 is None:
            a0 = 1e-6  # Just a small pertubation to avoid numerical issues

        # Bounds to ensure a >= 0
        res = minimize(
            GCV, x0=a0, bounds=Bounds(lb=1e-6, keep_feasible=True)
        )  # May be relevant to pass some arguments here, or even use another minimizer.
        a = res.x[0]

        if self.verbose > 0:
            print(f"New ridge parameter: {a}")
        self.change_ridge_parameter(a)

    def predict_cpd(self, x, return_update=False, save_time=False):
        def build_precomputed(X, XTXinv, C):
            computed = {
                "X": X,  # The updated matrix of objects
                "XTXinv": XTXinv,  # The updated kernel matrix
                "C": C,
            }
            return computed

        tic = time.time()
        # Add row to X matrix
        X = np.append(self.X, x.reshape(1, -1), axis=0)
        n = X.shape[0]
        y = self.y

        tic = time.time()
        # Update XTX_inv (inverse of Kernel matrix plus regularisation) Use the Sherman-Morrison formula to update the hat matrix
        # https://en.wikipedia.org/wiki/Sherman%E2%80%93Morrison_formula

        XTXinv = self.XTXinv - (self.XTXinv @ np.outer(x, x) @ self.XTXinv) / (1 + x.T @ self.XTXinv @ x)
        toc_update_XTXinv = time.time() - tic

        tic = time.time()

        # Efficient computation avoiding full O(n²d) hat matrix.
        # Only compute the diagonal, last row, and H[:-1,:-1]@y in O(nd²).
        XTXinv_x = XTXinv @ x  # (d,)   — O(d²)
        h = np.sum((X @ XTXinv) * X, axis=1)  # diag(H) — O(nd²)
        h_last_row = X[:-1] @ XTXinv_x  # H[-1, :-1] — O(nd)
        Hy = X[:-1] @ (XTXinv @ (X[:-1].T @ y))  # H[:-1,:-1] @ y — O(nd)

        sqrt_one_minus_h = np.sqrt(1 - h[:-1])
        A = np.dot(h_last_row, y) / np.sqrt(1 - h[-1]) + (y - Hy) / sqrt_one_minus_h
        B = np.sqrt(1 - h[-1]) * np.ones(n - 1) + h_last_row / sqrt_one_minus_h
        C = np.zeros(n + 1)
        C[1:-1] = A / B
        C[0] = -np.inf
        C[-1] = np.inf
        C.sort()

        toc_compute_C = time.time() - tic

        time_dict = {"Update hat matrix": toc_update_XTXinv, "Compute C": toc_compute_C}
        time_dict = time_dict if save_time else None
        cpd = RidgePredictiveDistributionFunction(C=C, time_dict=time_dict, epsilon=self.epsilon)

        if return_update:
            return cpd, build_precomputed(X, XTXinv, C)
        else:
            return cpd

learn_one(x: NDArray[np.floating[Any]], y: float, precomputed: dict[str, Any] | None = None) -> None

Learn a single example. If we have already computed X and XTXinv, use them for update. Then the last row of X is the object with label y.

cps = RidgePredictionMachine() cps.learn_one(np.array([1, 0]), 1) cps.X array([[1, 0]]) cps.y array([1])

Source code in src/online_cp/CPS.py
def learn_one(self, x: NDArray[np.floating[Any]], y: float, precomputed: dict[str, Any] | None = None) -> None:
    """
    Learn a single example. If we have already computed X and XTXinv, use them for update. Then the last row of X is the object with label y.
    >>> cps = RidgePredictionMachine()
    >>> cps.learn_one(np.array([1, 0]), 1)
    >>> cps.X
    array([[1, 0]])
    >>> cps.y
    array([1])
    """
    # Learn label y
    if self.y is None:
        self.y = np.array([y])
    else:
        if hasattr(self, "h"):
            self.y = np.append(self.y, y.reshape(1, self.h), axis=0)
        else:
            self.y = np.append(self.y, y)

    if precomputed is not None:
        X = precomputed["X"]
        XTXinv = precomputed["XTXinv"]

        if X is not None:
            self.X = X
            self.p = self.X.shape[1]
            self.Id = np.identity(self.p)

        if XTXinv is not None:
            self.XTXinv = XTXinv

        else:
            if self.X.shape[0] == 1:
                self.XTXinv = np.linalg.inv(self.X.T @ self.X + self.a * self.Id)
            else:
                # Update XTX_inv (inverse of Kernel matrix plus regularisation) Use the Sherman-Morrison formula to update the hat matrix
                # https://en.wikipedia.org/wiki/Sherman%E2%80%93Morrison_formula
                self.XTXinv -= (self.XTXinv @ np.outer(x, x) @ self.XTXinv) / (1 + x.T @ self.XTXinv @ x)

                # Check the rank
                if self.warnings:
                    self.check_matrix_rank(self.XTXinv)

    else:
        # Learn object x
        if self.X is None:
            self.X = x.reshape(1, -1)
            self.p = self.X.shape[1]
            self.Id = np.identity(self.p)
        elif self.X.shape[0] == 1:
            self.X = np.append(self.X, x.reshape(1, -1), axis=0)
            self.XTXinv = np.linalg.inv(self.X.T @ self.X + self.a * self.Id)
        else:
            self.X = np.append(self.X, x.reshape(1, -1), axis=0)
            # Update XTX_inv (inverse of Kernel matrix plus regularisation) Use the Sherman-Morrison formula to update the hat matrix
            # https://en.wikipedia.org/wiki/Sherman%E2%80%93Morrison_formula
            self.XTXinv -= (self.XTXinv @ np.outer(x, x) @ self.XTXinv) / (1 + x.T @ self.XTXinv @ x)

            # Check the rank
            if self.warnings:
                self.check_matrix_rank(self.XTXinv)

change_ridge_parameter(a)

Change the ridge parameter

cps = RidgePredictionMachine() cps.learn_one(np.array([1, 0]), 1) cps.change_ridge_parameter(1) cps.a 1

Source code in src/online_cp/CPS.py
def change_ridge_parameter(self, a):
    """
    Change the ridge parameter
    >>> cps = RidgePredictionMachine()
    >>> cps.learn_one(np.array([1, 0]), 1)
    >>> cps.change_ridge_parameter(1)
    >>> cps.a
    1
    """
    self.a = a
    if self.X is not None:
        self.XTXinv = np.linalg.inv(self.X.T @ self.X + self.a * self.Id)

check_matrix_rank(M)

Check if a matrix has full rank <==> is invertible Returns False if matrix is rank deficient NOTE In numerical linear algebra it is a bit more subtle. The condition number can tell us more.

cps = RidgePredictionMachine(warnings=False) cps.check_matrix_rank(np.array([[1, 0], [1, 0]])) False cps.check_matrix_rank(np.array([[1, 0], [0, 1]])) True

Source code in src/online_cp/CPS.py
def check_matrix_rank(self, M):
    """
    Check if a matrix has full rank <==> is invertible
    Returns False if matrix is rank deficient
    NOTE In numerical linear algebra it is a bit more subtle. The condition number can tell us more.

    >>> cps = RidgePredictionMachine(warnings=False)
    >>> cps.check_matrix_rank(np.array([[1, 0], [1, 0]]))
    False
    >>> cps.check_matrix_rank(np.array([[1, 0], [0, 1]]))
    True
    """
    if np.linalg.matrix_rank(M) < M.shape[0]:
        if self.warnings:
            warnings.warn(
                f"The matrix X is rank deficient. Condition number: {np.linalg.cond(M)}. Consider changing the ridge parameter",
                stacklevel=2,
            )
        return False
    else:
        return True

online_cp.CPS.KernelRidgePredictionMachine

Bases: ConformalPredictiveSystem

This conformal predictive system uses the "studentised residuals" as conformity measure. Algorithm 7.3 in Algorithmic Learning in a Random World (2nd edition).

Source code in src/online_cp/CPS.py
class KernelRidgePredictionMachine(ConformalPredictiveSystem):
    """
    This conformal predictive system uses the "studentised residuals" as conformity measure.
    Algorithm 7.3 in Algorithmic Learning in a Random World (2nd edition).
    """

    def __init__(self, kernel, a=0, warnings=True, autotune=False, verbose=0, rnd_state=None, epsilon=default_epsilon):
        super().__init__(epsilon=epsilon)

        self.kernel = kernel

        self.a = a
        self.X = None
        self.y = None

        # Should we raise warnings
        self.warnings = warnings
        # Do we autotune ridge prarmeter on warning
        self.autotune = autotune

        self.verbose = verbose
        self.rnd_gen = np.random.default_rng(rnd_state)

    def learn_initial_training_set(self, X: NDArray[np.floating[Any]], y: NDArray[np.floating[Any]]) -> None:
        self.X = X
        self.y = y
        Id = np.identity(self.X.shape[0])
        self.K = self.kernel(self.X)
        if self.autotune:
            self._tune_ridge_parameter()
        else:
            self.Kinv = np.linalg.inv(self.K + self.a * Id)
        H = self.K @ self.Kinv
        self.h_diag = H.diagonal().copy()
        self.Hy = H @ y

    def _tune_ridge_parameter(self, a0=None):
        """
        Tune ridge parameter with Generalized Cross Validation (GCV) in the kernel space.
        """
        n = self.K.shape[0]
        In = np.identity(n)

        def GCV(a):
            try:
                A = self.K @ np.linalg.inv(self.K + a * In)
                max_diag_H = np.max(np.diag(A))  # Maximum diagonal element of the hat matrix
                if max_diag_H > 1:
                    return np.inf
                return (1 / n) * np.linalg.norm((In - A) @ self.y) ** 2 / ((1 / n) * np.trace(In - A)) ** 2
            except (np.linalg.LinAlgError, ZeroDivisionError):
                return np.inf

        # Initial guess
        if a0 is None:
            a0 = 1e-6  # Small perturbation to avoid numerical issues

        # Bounds to ensure a >= 0
        res = minimize(GCV, x0=a0, bounds=Bounds(lb=1e-6, keep_feasible=True))
        a = res.x[0]

        if self.verbose > 0:
            print(f"New ridge parameter: {a}")
        self.change_ridge_parameter(a)

    def change_ridge_parameter(self, a):
        """
        Change the ridge parameter
        >>> cps = RidgePredictionMachine()
        >>> cps.learn_one(np.array([1, 0]), 1)
        >>> cps.change_ridge_parameter(1)
        >>> cps.a
        1
        """
        self.a = a
        if self.X is not None:
            Id = np.identity(self.X.shape[0])

            self.K = self.kernel(self.X)
            self.Kinv = np.linalg.inv(self.K + self.a * Id)
            H = self.K @ self.Kinv
            self.h_diag = H.diagonal().copy()
            self.Hy = H @ self.y

    def _update_Kinv(self, Kinv, k, d):
        return np.block([[Kinv + d * Kinv @ k @ k.T @ Kinv, -d * Kinv @ k], [-d * k.T @ Kinv, d]])

    @staticmethod
    def _update_K(K, k, kappa):
        return np.block([[K, k], [k.T, kappa]])

    def learn_one(self, x: NDArray[np.floating[Any]], y: float, precomputed: dict[str, Any] | None = None) -> None:
        """
        Learn a single example
        """
        x = np.atleast_2d(x)
        # Learn label y
        if self.y is None:
            self.y = np.array([y])
        else:
            self.y = np.append(self.y, y)

        if precomputed is not None:
            # Incremental update of h_diag and Hy from precomputed intermediates
            v = precomputed["v"]
            d_val = precomputed["d"]
            k = precomputed["k"]
            kappa = precomputed["kappa"]
            a_d = self.a * d_val

            # h_diag_new[:-1] = h_diag_old - a_d * v^2
            # h_diag_new[-1] = 1 - a_d  (= d*kappa - d*k^T v = d*(kappa - k^T Kinv k))
            v_flat = v.ravel()
            new_h_last = (d_val * kappa - d_val * (k.T @ v)).item()
            self.h_diag = np.append(self.h_diag - a_d * v_flat**2, new_h_last)

            # Hy_new[:-1] = Hy_old - a_d * v * (v^T @ y_old) + a_d * v * y_new
            #             = Hy_old + a_d * v * (y_new - v^T @ y_old)
            # Hy_new[-1]  = a_d * (v^T @ y_old) + new_h_last * y_new
            # But y_new is the label we just appended: self.y[-1] = y
            y_old = self.y[:-1]
            vTy_old = float(v_flat @ y_old)
            self.Hy = np.append(self.Hy + a_d * v_flat * (y - vTy_old), a_d * vTy_old + new_h_last * y)

            self.K = self._update_K(self.K, k, kappa)
            self.Kinv = self._update_Kinv(self.Kinv, k, d_val)
            self.X = np.append(self.X, x.reshape(1, -1), axis=0)
        else:
            if self.X is None:
                self.X = x.reshape(1, -1)
                Id = np.identity(self.X.shape[0])
                self.K = self.kernel(self.X)
                self.Kinv = np.linalg.inv(self.K + self.a * Id)
                H = self.K @ self.Kinv
                self.h_diag = H.diagonal().copy()
                self.Hy = H @ self.y
            else:
                k = self.kernel(self.X, x).reshape(-1, 1)
                kappa = self.kernel(x, x)
                d_val = (1 / (kappa + self.a - k.T @ self.Kinv @ k)).item()
                a_d = self.a * d_val

                # Compute v = Kinv @ k
                v = self.Kinv @ k  # (n, 1)
                v_flat = v.ravel()

                # Incremental update of h_diag and Hy
                new_h_last = (d_val * kappa - d_val * (k.T @ v)).item()
                self.h_diag = np.append(self.h_diag - a_d * v_flat**2, new_h_last)

                y_old = self.y[:-1]
                vTy_old = float(v_flat @ y_old)
                self.Hy = np.append(self.Hy + a_d * v_flat * (y - vTy_old), a_d * vTy_old + new_h_last * y)

                self.K = self._update_K(self.K, k, kappa)
                self.Kinv = self._update_Kinv(self.Kinv, k, d_val)
                self.X = np.append(self.X, x.reshape(1, -1), axis=0)

    def predict_cpd(self, x, return_update=False, save_time=False):

        def build_precomputed(v, d_val, k, kappa):
            computed = {
                "v": v,
                "d": d_val,
                "k": k,
                "kappa": kappa,
            }
            return computed

        x = np.atleast_2d(x)
        # Temporarily update kernel matrix
        k = self.kernel(self.X, x).reshape(-1, 1)
        kappa = self.kernel(x, x)
        d_val = (1 / (kappa + self.a - k.T @ self.Kinv @ k)).item()
        a_d = self.a * d_val
        y = self.y

        # Compute v = Kinv @ k (the key intermediate)
        v = self.Kinv @ k  # (n, 1)
        v_flat = v.ravel()

        # Efficient O(n) computation — avoid forming full (n+1)×(n+1) hat matrix.
        # H_new diagonal: h_diag_new[:-1] = h_diag_old - a_d * v^2, h_new[-1] = d*kappa - d*k^T*v
        h_train = self.h_diag - a_d * v_flat**2
        h_last = (d_val * kappa - d_val * (k.T @ v)).item()

        # H_new last row (= last col by symmetry): H[-1, :-1] = a_d * v^T
        h_last_row = a_d * v_flat

        # H_new[:-1,:-1] @ y = Hy_old - a_d * v * (v^T @ y)
        Hy_train = self.Hy - a_d * v_flat * float(v_flat @ y)

        n = len(y) + 1  # augmented size

        sqrt_one_minus_h = np.sqrt(1 - h_train)
        A = np.dot(h_last_row, y) / np.sqrt(1 - h_last) + (y - Hy_train) / sqrt_one_minus_h
        B = np.sqrt(1 - h_last) * np.ones(n - 1) + h_last_row / sqrt_one_minus_h

        C = np.zeros(n + 1)
        C[1:-1] = A / B
        C[0] = -np.inf
        C[-1] = np.inf
        assert not np.isnan(C).any(), "C contains NaN values"
        C.sort()

        time_dict = None
        cpd = RidgePredictiveDistributionFunction(C=C, time_dict=time_dict, epsilon=self.epsilon)

        if return_update:
            return cpd, build_precomputed(v, d_val, k, kappa)
        else:
            return cpd

change_ridge_parameter(a)

Change the ridge parameter

cps = RidgePredictionMachine() cps.learn_one(np.array([1, 0]), 1) cps.change_ridge_parameter(1) cps.a 1

Source code in src/online_cp/CPS.py
def change_ridge_parameter(self, a):
    """
    Change the ridge parameter
    >>> cps = RidgePredictionMachine()
    >>> cps.learn_one(np.array([1, 0]), 1)
    >>> cps.change_ridge_parameter(1)
    >>> cps.a
    1
    """
    self.a = a
    if self.X is not None:
        Id = np.identity(self.X.shape[0])

        self.K = self.kernel(self.X)
        self.Kinv = np.linalg.inv(self.K + self.a * Id)
        H = self.K @ self.Kinv
        self.h_diag = H.diagonal().copy()
        self.Hy = H @ self.y

learn_one(x: NDArray[np.floating[Any]], y: float, precomputed: dict[str, Any] | None = None) -> None

Learn a single example

Source code in src/online_cp/CPS.py
def learn_one(self, x: NDArray[np.floating[Any]], y: float, precomputed: dict[str, Any] | None = None) -> None:
    """
    Learn a single example
    """
    x = np.atleast_2d(x)
    # Learn label y
    if self.y is None:
        self.y = np.array([y])
    else:
        self.y = np.append(self.y, y)

    if precomputed is not None:
        # Incremental update of h_diag and Hy from precomputed intermediates
        v = precomputed["v"]
        d_val = precomputed["d"]
        k = precomputed["k"]
        kappa = precomputed["kappa"]
        a_d = self.a * d_val

        # h_diag_new[:-1] = h_diag_old - a_d * v^2
        # h_diag_new[-1] = 1 - a_d  (= d*kappa - d*k^T v = d*(kappa - k^T Kinv k))
        v_flat = v.ravel()
        new_h_last = (d_val * kappa - d_val * (k.T @ v)).item()
        self.h_diag = np.append(self.h_diag - a_d * v_flat**2, new_h_last)

        # Hy_new[:-1] = Hy_old - a_d * v * (v^T @ y_old) + a_d * v * y_new
        #             = Hy_old + a_d * v * (y_new - v^T @ y_old)
        # Hy_new[-1]  = a_d * (v^T @ y_old) + new_h_last * y_new
        # But y_new is the label we just appended: self.y[-1] = y
        y_old = self.y[:-1]
        vTy_old = float(v_flat @ y_old)
        self.Hy = np.append(self.Hy + a_d * v_flat * (y - vTy_old), a_d * vTy_old + new_h_last * y)

        self.K = self._update_K(self.K, k, kappa)
        self.Kinv = self._update_Kinv(self.Kinv, k, d_val)
        self.X = np.append(self.X, x.reshape(1, -1), axis=0)
    else:
        if self.X is None:
            self.X = x.reshape(1, -1)
            Id = np.identity(self.X.shape[0])
            self.K = self.kernel(self.X)
            self.Kinv = np.linalg.inv(self.K + self.a * Id)
            H = self.K @ self.Kinv
            self.h_diag = H.diagonal().copy()
            self.Hy = H @ self.y
        else:
            k = self.kernel(self.X, x).reshape(-1, 1)
            kappa = self.kernel(x, x)
            d_val = (1 / (kappa + self.a - k.T @ self.Kinv @ k)).item()
            a_d = self.a * d_val

            # Compute v = Kinv @ k
            v = self.Kinv @ k  # (n, 1)
            v_flat = v.ravel()

            # Incremental update of h_diag and Hy
            new_h_last = (d_val * kappa - d_val * (k.T @ v)).item()
            self.h_diag = np.append(self.h_diag - a_d * v_flat**2, new_h_last)

            y_old = self.y[:-1]
            vTy_old = float(v_flat @ y_old)
            self.Hy = np.append(self.Hy + a_d * v_flat * (y - vTy_old), a_d * vTy_old + new_h_last * y)

            self.K = self._update_K(self.K, k, kappa)
            self.Kinv = self._update_Kinv(self.Kinv, k, d_val)
            self.X = np.append(self.X, x.reshape(1, -1), axis=0)

online_cp.CPS.NearestNeighboursPredictionMachine

Bases: ConformalPredictiveSystem

Source code in src/online_cp/CPS.py
class NearestNeighboursPredictionMachine(ConformalPredictiveSystem):
    def __init__(
        self,
        k,
        distance="euclidean",
        distance_func=None,
        warnings=True,
        verbose=0,
        rnd_state=None,
        epsilon=default_epsilon,
    ):
        """
        Consider adding possibility to update self.k as the training set grows, e.g. by some heuristic or something.
        Two rules of thumb are quite simple:
            1. Choose k close to sqrt(n) where n is the training set size
            2. If the data has large variance, choose k larger. If the variance is small, choose k smaller. This is less clear, however.
        """
        # TODO: The sorting of conformity scores is the most time consuming step. Can it be done with parallel processing to speed things up?
        super().__init__(epsilon=epsilon)

        self.k = k

        self.distance = distance
        if distance_func is None:
            self.distance_func = self._standard_distance_func
        else:
            self.distance_func = distance_func
            self.distance = "custom"

        self.X = None
        self.y = None
        self.D = None

        # Should we raise warnings
        self.warnings = warnings

        self.verbose = verbose
        self.rnd_gen = np.random.default_rng(rnd_state)

    def _standard_distance_func(self, X, y=None):
        """
        By default we use scipy to compute distances
        """
        # TODO: Can this be done using parallel processing?
        X = np.atleast_2d(X)
        if y is None:
            dists = squareform(pdist(X, metric=self.distance))
        else:
            y = np.atleast_2d(y)
            dists = cdist(X, y, metric=self.distance)
        return dists

    def learn_initial_training_set(self, X: NDArray[np.floating[Any]], y: NDArray[np.floating[Any]]) -> None:
        """
        The Nearest neighbours prediction machine assumes all labels are unique. If they are not, we add noise to break ties.

        >>> cps = NearestNeighboursPredictionMachine(k=3)
        >>> X = np.array([[1], [2]])
        >>> y = np.array([1, 2])
        >>> cps.learn_initial_training_set(X, y)
        >>> cps.X
        array([[1],
               [2]])
        >>> cps.y
        array([1, 2])
        >>> cps.D
        array([[0., 1.],
               [1., 0.]])
        """
        self.X = X
        self.D = self.distance_func(X)
        self.y = y

    @staticmethod
    def update_distance_matrix(D, d):
        return np.block([[D, d], [d.T, np.array([0])]])

    def learn_one(self, x: NDArray[np.floating[Any]], y: float, precomputed: dict[str, Any] | None = None) -> None:
        """
        The Nearest neighbours prediction machine assumes all labels are unique. If they are not, we add noise to break ties.
        precomputed is a dictionary
        {
            'X': X,
            'D': D,
        }
        >>> cps = NearestNeighboursPredictionMachine(k=3, rnd_state=2024)
        >>> X = np.array([[1], [2]])
        >>> y = np.array([1, 2])
        >>> cps.learn_initial_training_set(X, y)
        >>> cps.learn_one(np.array([3]), 3)
        >>> cps.y
        array([1, 2, 3])
        >>> cps.X
        array([[1],
               [2],
               [3]])
        >>> cps.D
        array([[0., 1., 2.],
               [1., 0., 1.],
               [2., 1., 0.]])
        """
        # Learn label y
        if self.y is None:
            self.y = np.array([y])
        else:
            self.y = np.append(self.y, y)

        if precomputed is None:
            # Learn object x
            if self.X is None:
                self.X = x.reshape(1, -1)
                self.D = self.distance_func(self.X)
            else:
                d = self.distance_func(self.X, x)
                self.D = self.update_distance_matrix(self.D, d)
                self.X = np.append(self.X, x.reshape(1, -1), axis=0)
        else:
            self.X = precomputed["X"]
            self.D = precomputed["D"]

    def predict_cpd(self, x, return_update=False, save_time=False):
        """
        >>> import numpy as np
        >>> rnd_gen = np.random.default_rng(2024)
        >>> X = rnd_gen.normal(loc=0, scale=1, size=(100, 4))
        >>> beta = np.array([2, 1, 0, 0])
        >>> Y = X @ beta + rnd_gen.normal(loc=0, scale=1, size=100)
        >>> cps = NearestNeighboursPredictionMachine(k=3)
        >>> cps.learn_initial_training_set(X, Y)
        >>> x = rnd_gen.normal(loc=0, scale=1, size=(1, 4))
        >>> cpd = cps.predict_cpd(x)
        >>> cpd.L
        array([0.        , 0.        , 0.18811881, 0.53465347, 0.76237624])
        >>> cpd.U
        array([0.17821782, 0.18811881, 0.54455446, 0.76237624, 1.        ])
        """
        tic = time.time()
        # Temporarily update the distance matrix
        if self.X.shape[0] < self.k:
            # FIXME: Make some graceful error handling here
            raise Exception("Training set is too small...")
        else:
            d = self.distance_func(self.X, x)
            D = self.update_distance_matrix(self.D, d)
            y = np.append(self.y, -np.inf)  # Initialise label as -inf
        toc_dist = time.time() - tic

        tic = time.time()
        # Find all neighbours and semi-neighbours
        # Use argpartition for O(n) selection of k+1 smallest, then sort them
        # for deterministic tie-breaking consistent with full argsort.
        top_k1 = np.argpartition(D, self.k + 1, axis=0)[: self.k + 1]
        for col in range(D.shape[1]):
            idx = top_k1[:, col]
            order = np.argsort(D[idx, col])
            top_k1[:, col] = idx[order]
        k_nearest = top_k1[1:]  # skip self (distance=0, always first after sort)
        toc_sort = time.time() - tic

        tic = time.time()
        n = self.X.shape[0]

        full_neighbours = set()
        single_neighbours = set()
        semi_neighbours = set()

        k_nearest_of_n = set(k_nearest.T[-1])

        for i, col in enumerate(k_nearest.T):
            i_is_neighbour_of_n = i in k_nearest_of_n
            n_is_neighbour_of_i = n in col
            if i_is_neighbour_of_n and n_is_neighbour_of_i:
                full_neighbours.add(i)
            elif i_is_neighbour_of_n:
                single_neighbours.add(i)
            elif n_is_neighbour_of_i:
                semi_neighbours.add(i)

        neighbours = full_neighbours | single_neighbours
        full_or_semi = full_neighbours | semi_neighbours
        idx_all_neighbours_and_semi_neighbours = np.array(
            sorted(full_neighbours | single_neighbours | semi_neighbours)
        )
        toc_find_neighbours = time.time() - tic

        # Line 1
        Kprime = len(idx_all_neighbours_and_semi_neighbours)
        # Line 2 and 3
        Y = np.zeros(shape=Kprime + 2)
        Y[0] = -np.inf
        Y[-1] = np.inf
        Y[1:-1] = y[idx_all_neighbours_and_semi_neighbours]
        idx_mem = {i: idx_all_neighbours_and_semi_neighbours[i - 1] for i in range(1, Kprime + 1)}
        sorted_indices = np.argsort(Y)[1:-1]
        Y.sort()

        # Line 4: conformity scores and histogram
        Alpha = np.array([(y[k_nearest.T[i]] <= y_i).sum() for i, y_i in enumerate(y)])
        N = np.array([(Alpha == k).sum() for k in range(self.k + 1)])

        # Line 5
        L = -np.inf * np.ones(Kprime + 1)
        U = -np.inf * np.ones(Kprime + 1)
        L[0] = 0
        U[0] = N[0] / (n + 1)

        tic = time.time()
        # Line 6
        for k in range(1, Kprime + 1):
            idx = idx_mem[sorted_indices[k - 1]]
            if idx in neighbours:
                N[Alpha[-1]] -= 1
                Alpha[-1] += 1
                N[Alpha[-1]] += 1
            if idx in full_or_semi:
                N[Alpha[idx]] -= 1
                Alpha[idx] -= 1
                N[Alpha[idx]] += 1
            L[k] = N[: Alpha[-1]].sum() / (n + 1) if Alpha[-1] != 0 else 0
            U[k] = N[: Alpha[-1] + 1].sum() / (n + 1) if Alpha[-1] != 0 else N[0] / (n + 1)
        toc_loop = time.time() - tic

        time_dict = {
            "Compute distance matrix": toc_dist,
            "Sort distance matrix": toc_sort,
            "Find all neighbours and semi-neighbours": toc_find_neighbours,
            "Loop": toc_loop,
        }
        time_dict = time_dict if save_time else None
        # Line 12
        cpd = NearestNeighboursPredictiveDistributionFunction(L, U, Y, time_dict, epsilon=self.epsilon)

        if return_update:
            X = np.append(self.X, x.reshape(1, -1), axis=0)
            return cpd, {"X": X, "D": D}
        else:
            return cpd

__init__(k, distance='euclidean', distance_func=None, warnings=True, verbose=0, rnd_state=None, epsilon=default_epsilon)

Consider adding possibility to update self.k as the training set grows, e.g. by some heuristic or something. Two rules of thumb are quite simple: 1. Choose k close to sqrt(n) where n is the training set size 2. If the data has large variance, choose k larger. If the variance is small, choose k smaller. This is less clear, however.

Source code in src/online_cp/CPS.py
def __init__(
    self,
    k,
    distance="euclidean",
    distance_func=None,
    warnings=True,
    verbose=0,
    rnd_state=None,
    epsilon=default_epsilon,
):
    """
    Consider adding possibility to update self.k as the training set grows, e.g. by some heuristic or something.
    Two rules of thumb are quite simple:
        1. Choose k close to sqrt(n) where n is the training set size
        2. If the data has large variance, choose k larger. If the variance is small, choose k smaller. This is less clear, however.
    """
    # TODO: The sorting of conformity scores is the most time consuming step. Can it be done with parallel processing to speed things up?
    super().__init__(epsilon=epsilon)

    self.k = k

    self.distance = distance
    if distance_func is None:
        self.distance_func = self._standard_distance_func
    else:
        self.distance_func = distance_func
        self.distance = "custom"

    self.X = None
    self.y = None
    self.D = None

    # Should we raise warnings
    self.warnings = warnings

    self.verbose = verbose
    self.rnd_gen = np.random.default_rng(rnd_state)

learn_initial_training_set(X: NDArray[np.floating[Any]], y: NDArray[np.floating[Any]]) -> None

The Nearest neighbours prediction machine assumes all labels are unique. If they are not, we add noise to break ties.

cps = NearestNeighboursPredictionMachine(k=3) X = np.array([[1], [2]]) y = np.array([1, 2]) cps.learn_initial_training_set(X, y) cps.X array([[1], [2]]) cps.y array([1, 2]) cps.D array([[0., 1.], [1., 0.]])

Source code in src/online_cp/CPS.py
def learn_initial_training_set(self, X: NDArray[np.floating[Any]], y: NDArray[np.floating[Any]]) -> None:
    """
    The Nearest neighbours prediction machine assumes all labels are unique. If they are not, we add noise to break ties.

    >>> cps = NearestNeighboursPredictionMachine(k=3)
    >>> X = np.array([[1], [2]])
    >>> y = np.array([1, 2])
    >>> cps.learn_initial_training_set(X, y)
    >>> cps.X
    array([[1],
           [2]])
    >>> cps.y
    array([1, 2])
    >>> cps.D
    array([[0., 1.],
           [1., 0.]])
    """
    self.X = X
    self.D = self.distance_func(X)
    self.y = y

learn_one(x: NDArray[np.floating[Any]], y: float, precomputed: dict[str, Any] | None = None) -> None

The Nearest neighbours prediction machine assumes all labels are unique. If they are not, we add noise to break ties. precomputed is a dictionary { 'X': X, 'D': D, }

cps = NearestNeighboursPredictionMachine(k=3, rnd_state=2024) X = np.array([[1], [2]]) y = np.array([1, 2]) cps.learn_initial_training_set(X, y) cps.learn_one(np.array([3]), 3) cps.y array([1, 2, 3]) cps.X array([[1], [2], [3]]) cps.D array([[0., 1., 2.], [1., 0., 1.], [2., 1., 0.]])

Source code in src/online_cp/CPS.py
def learn_one(self, x: NDArray[np.floating[Any]], y: float, precomputed: dict[str, Any] | None = None) -> None:
    """
    The Nearest neighbours prediction machine assumes all labels are unique. If they are not, we add noise to break ties.
    precomputed is a dictionary
    {
        'X': X,
        'D': D,
    }
    >>> cps = NearestNeighboursPredictionMachine(k=3, rnd_state=2024)
    >>> X = np.array([[1], [2]])
    >>> y = np.array([1, 2])
    >>> cps.learn_initial_training_set(X, y)
    >>> cps.learn_one(np.array([3]), 3)
    >>> cps.y
    array([1, 2, 3])
    >>> cps.X
    array([[1],
           [2],
           [3]])
    >>> cps.D
    array([[0., 1., 2.],
           [1., 0., 1.],
           [2., 1., 0.]])
    """
    # Learn label y
    if self.y is None:
        self.y = np.array([y])
    else:
        self.y = np.append(self.y, y)

    if precomputed is None:
        # Learn object x
        if self.X is None:
            self.X = x.reshape(1, -1)
            self.D = self.distance_func(self.X)
        else:
            d = self.distance_func(self.X, x)
            self.D = self.update_distance_matrix(self.D, d)
            self.X = np.append(self.X, x.reshape(1, -1), axis=0)
    else:
        self.X = precomputed["X"]
        self.D = precomputed["D"]

predict_cpd(x, return_update=False, save_time=False)

import numpy as np rnd_gen = np.random.default_rng(2024) X = rnd_gen.normal(loc=0, scale=1, size=(100, 4)) beta = np.array([2, 1, 0, 0]) Y = X @ beta + rnd_gen.normal(loc=0, scale=1, size=100) cps = NearestNeighboursPredictionMachine(k=3) cps.learn_initial_training_set(X, Y) x = rnd_gen.normal(loc=0, scale=1, size=(1, 4)) cpd = cps.predict_cpd(x) cpd.L array([0. , 0. , 0.18811881, 0.53465347, 0.76237624]) cpd.U array([0.17821782, 0.18811881, 0.54455446, 0.76237624, 1. ])

Source code in src/online_cp/CPS.py
def predict_cpd(self, x, return_update=False, save_time=False):
    """
    >>> import numpy as np
    >>> rnd_gen = np.random.default_rng(2024)
    >>> X = rnd_gen.normal(loc=0, scale=1, size=(100, 4))
    >>> beta = np.array([2, 1, 0, 0])
    >>> Y = X @ beta + rnd_gen.normal(loc=0, scale=1, size=100)
    >>> cps = NearestNeighboursPredictionMachine(k=3)
    >>> cps.learn_initial_training_set(X, Y)
    >>> x = rnd_gen.normal(loc=0, scale=1, size=(1, 4))
    >>> cpd = cps.predict_cpd(x)
    >>> cpd.L
    array([0.        , 0.        , 0.18811881, 0.53465347, 0.76237624])
    >>> cpd.U
    array([0.17821782, 0.18811881, 0.54455446, 0.76237624, 1.        ])
    """
    tic = time.time()
    # Temporarily update the distance matrix
    if self.X.shape[0] < self.k:
        # FIXME: Make some graceful error handling here
        raise Exception("Training set is too small...")
    else:
        d = self.distance_func(self.X, x)
        D = self.update_distance_matrix(self.D, d)
        y = np.append(self.y, -np.inf)  # Initialise label as -inf
    toc_dist = time.time() - tic

    tic = time.time()
    # Find all neighbours and semi-neighbours
    # Use argpartition for O(n) selection of k+1 smallest, then sort them
    # for deterministic tie-breaking consistent with full argsort.
    top_k1 = np.argpartition(D, self.k + 1, axis=0)[: self.k + 1]
    for col in range(D.shape[1]):
        idx = top_k1[:, col]
        order = np.argsort(D[idx, col])
        top_k1[:, col] = idx[order]
    k_nearest = top_k1[1:]  # skip self (distance=0, always first after sort)
    toc_sort = time.time() - tic

    tic = time.time()
    n = self.X.shape[0]

    full_neighbours = set()
    single_neighbours = set()
    semi_neighbours = set()

    k_nearest_of_n = set(k_nearest.T[-1])

    for i, col in enumerate(k_nearest.T):
        i_is_neighbour_of_n = i in k_nearest_of_n
        n_is_neighbour_of_i = n in col
        if i_is_neighbour_of_n and n_is_neighbour_of_i:
            full_neighbours.add(i)
        elif i_is_neighbour_of_n:
            single_neighbours.add(i)
        elif n_is_neighbour_of_i:
            semi_neighbours.add(i)

    neighbours = full_neighbours | single_neighbours
    full_or_semi = full_neighbours | semi_neighbours
    idx_all_neighbours_and_semi_neighbours = np.array(
        sorted(full_neighbours | single_neighbours | semi_neighbours)
    )
    toc_find_neighbours = time.time() - tic

    # Line 1
    Kprime = len(idx_all_neighbours_and_semi_neighbours)
    # Line 2 and 3
    Y = np.zeros(shape=Kprime + 2)
    Y[0] = -np.inf
    Y[-1] = np.inf
    Y[1:-1] = y[idx_all_neighbours_and_semi_neighbours]
    idx_mem = {i: idx_all_neighbours_and_semi_neighbours[i - 1] for i in range(1, Kprime + 1)}
    sorted_indices = np.argsort(Y)[1:-1]
    Y.sort()

    # Line 4: conformity scores and histogram
    Alpha = np.array([(y[k_nearest.T[i]] <= y_i).sum() for i, y_i in enumerate(y)])
    N = np.array([(Alpha == k).sum() for k in range(self.k + 1)])

    # Line 5
    L = -np.inf * np.ones(Kprime + 1)
    U = -np.inf * np.ones(Kprime + 1)
    L[0] = 0
    U[0] = N[0] / (n + 1)

    tic = time.time()
    # Line 6
    for k in range(1, Kprime + 1):
        idx = idx_mem[sorted_indices[k - 1]]
        if idx in neighbours:
            N[Alpha[-1]] -= 1
            Alpha[-1] += 1
            N[Alpha[-1]] += 1
        if idx in full_or_semi:
            N[Alpha[idx]] -= 1
            Alpha[idx] -= 1
            N[Alpha[idx]] += 1
        L[k] = N[: Alpha[-1]].sum() / (n + 1) if Alpha[-1] != 0 else 0
        U[k] = N[: Alpha[-1] + 1].sum() / (n + 1) if Alpha[-1] != 0 else N[0] / (n + 1)
    toc_loop = time.time() - tic

    time_dict = {
        "Compute distance matrix": toc_dist,
        "Sort distance matrix": toc_sort,
        "Find all neighbours and semi-neighbours": toc_find_neighbours,
        "Loop": toc_loop,
    }
    time_dict = time_dict if save_time else None
    # Line 12
    cpd = NearestNeighboursPredictiveDistributionFunction(L, U, Y, time_dict, epsilon=self.epsilon)

    if return_update:
        X = np.append(self.X, x.reshape(1, -1), axis=0)
        return cpd, {"X": X, "D": D}
    else:
        return cpd

online_cp.CPS.DempsterHillConformalPredictiveSystem

Bases: ConformalPredictiveSystem

Source code in src/online_cp/CPS.py
class DempsterHillConformalPredictiveSystem(ConformalPredictiveSystem):
    def __init__(self, verbose=0, rnd_state=None, epsilon=default_epsilon):
        """
        The Dempster-Hill conformal predictive system uses only the labels of the examples, so the latter can be ignored.
        """
        super().__init__(epsilon=epsilon)
        self.y = None
        self.verbose = verbose
        self.rnd_gen = np.random.default_rng(rnd_state)

    def learn_initial_training_set(self, y):
        self.y = y

    def learn_one(self, y):
        self.y = np.append(self.y, y)

    def predict_cpd(self, save_time=False):
        tic = time.time()
        Y = np.zeros(shape=self.y.shape[0] + 2)
        Y[0] = -np.inf
        Y[-1] = np.inf
        Y[1:-1] = self.y
        Y.sort()
        time_sort = time.time() - tic

        time_dict = {"Sort labels": time_sort} if save_time else None

        return DempsterHillConformalPredictiveDistribution(Y, time_dict, epsilon=self.epsilon)

__init__(verbose=0, rnd_state=None, epsilon=default_epsilon)

The Dempster-Hill conformal predictive system uses only the labels of the examples, so the latter can be ignored.

Source code in src/online_cp/CPS.py
def __init__(self, verbose=0, rnd_state=None, epsilon=default_epsilon):
    """
    The Dempster-Hill conformal predictive system uses only the labels of the examples, so the latter can be ignored.
    """
    super().__init__(epsilon=epsilon)
    self.y = None
    self.verbose = verbose
    self.rnd_gen = np.random.default_rng(rnd_state)