Source code for hapc.cv

"""
Cross-validated HAPC fitting (high-level Python API).

Provides :func:`cv_hapc` (counterpart of R ``cv.hapc()``) and the lower-level
helpers :func:`pcghal_cv` and :func:`fasthal_cv`.

All three Python CV variants delegate to a pure-C++ implementation that uses
the **same** fold partitioning as R (block partition with last fold absorbing
the remainder, then shuffled with ``std::mt19937(12345)``).  This guarantees
that ``cv.hapc()`` and ``cv_hapc()`` produce identical CV scores / best-λ / α
for the same inputs across languages (gaussian and binomial ``norm="sv"``;
binomial ``norm`` in ``{"1","2"}`` uses the same squared-error CV as R).
"""

from typing import NamedTuple, Optional

import numpy as np

from . import hapc_core
from .core import _C, cross_kernel_hapc, design_hapc
from .single import (
    _check_binomial_labels,
    _to_soft01,
    single_pcghal_classification_lasso,
)


[docs] class CVResult(NamedTuple): """Output of all CV variants. Attributes ---------- mses : np.ndarray, shape (L,) Mean cross-validated risk per λ. For binomial CV this is logistic deviance (the field is still called ``mses`` for API uniformity). lambdas : np.ndarray, shape (L,) λ grid that was searched. best_lambda : float λ minimising ``mses``. best_model_alpha : np.ndarray, shape (k,) α refit on the full training data at ``best_lambda``. predictions : np.ndarray or None Predictions on ``predict`` (if supplied). For ``pcghal_cv_classi`` these are probabilities; for ``pcghal_cv`` / ``fasthal_cv`` they match the underlying regression head (continuous). """ mses: np.ndarray lambdas: np.ndarray best_lambda: float best_model_alpha: np.ndarray predictions: Optional[np.ndarray]
def _grid(lambdas: Optional[np.ndarray], log_lambda_min: float, log_lambda_max: float, grid_length: int) -> np.ndarray: if lambdas is not None: return np.asarray(lambdas, dtype=np.float64) return np.exp(np.linspace(log_lambda_min, log_lambda_max, grid_length)) # --------------------------------------------------------------------------- # norm="sv" (gaussian) → C++ pcghal_cv_fit (init + PGD per fold) # ---------------------------------------------------------------------------
[docs] def pcghal_cv(X: np.ndarray, Y: np.ndarray, max_degree: int, npcs: int, lambdas: Optional[np.ndarray] = None, log_lambda_min: float = -5, log_lambda_max: float = -3, grid_length: int = 10, nfolds: int = 5, predict: Optional[np.ndarray] = None, center: bool = True, approx: bool = False, verbose: bool = False, max_iter: int = 5000, tol: float = 1e-3, step_factor: float = 0.8, crit: str = "grad", ini: str = "1") -> CVResult: """k-fold CV for gaussian HAPC with ``norm="sv"``. Per fold: initialise α with LASSO (``ini="1"``, default) or ridge (``ini="2"``), then projected gradient descent on squared error — same pipeline as R ``cv.hapc(family="gaussian", norm="sv")``. Delegates to C++ ``pcghal_cv_fit`` (shared with R), including fold IDs. Parameters ---------- X : np.ndarray, shape (n, p) Features. Y : np.ndarray, shape (n,) Continuous response. max_degree : int HAL interaction order. npcs : int Number of PCs. lambdas : np.ndarray, optional Explicit λ grid. If ``None``, built from ``log_*`` and ``grid_length``. log_lambda_min, log_lambda_max : float Log-space grid endpoints when ``lambdas`` is ``None``. grid_length : int, default 10 Number of grid points when ``lambdas`` is ``None``. nfolds : int, default 5 CV folds. predict : np.ndarray, optional, shape (m, p) Test matrix for predictions at the winning λ. center, approx : bool Passed through to design / kernel (``approx`` reserved). verbose : bool Print PGD progress. max_iter, tol, step_factor : float / int PGD budget and step-size factor. crit : {"grad", "risk"}, default "grad" PGD stopping rule (matches R default). ini : {"1", "2"}, default "1" ``"1"`` = LASSO initialisation, ``"2"`` = ridge. Returns ------- CVResult ``mses`` holds mean CV MSE per λ; ``best_model_alpha`` is α refit on all data at ``best_lambda``. """ if ini not in {"1", "2"}: raise ValueError(f"ini must be '1' or '2'; got '{ini}'") if crit not in {"grad", "risk"}: raise ValueError(f"crit must be 'grad' or 'risk'; got '{crit}'") X = _C(X) Y = _C(Y).ravel() n, p = X.shape if Y.size != n: raise ValueError(f"Y must have {n} elements, got {Y.size}") lams = _grid(lambdas, log_lambda_min, log_lambda_max, grid_length) if predict is not None: Xte = _C(predict) if Xte.shape[1] != p: raise ValueError(f"predict must have {p} columns") else: Xte = np.empty((0, p), dtype=np.float64) res = hapc_core.pcghal_cv_fit( X, Y, int(max_degree), int(npcs), lams.tolist(), int(nfolds), Xte, int(max_iter), float(tol), float(step_factor), bool(verbose), str(crit), bool(center), bool(approx), str(ini), ) predictions = (np.asarray(res.predictions) if predict is not None and res.predictions.size > 0 else None) return CVResult( mses=np.asarray(res.mses), lambdas=np.asarray(res.lambdas), best_lambda=float(res.best_lambda), best_model_alpha=np.asarray(res.best_alpha), predictions=predictions, )
# --------------------------------------------------------------------------- # norm in {"1","2"} (gaussian) → C++ fasthal_cv_fit # ---------------------------------------------------------------------------
[docs] def fasthal_cv(X: np.ndarray, Y: np.ndarray, npcs: int, lambdas: np.ndarray, nfolds: int = 5, predict: Optional[np.ndarray] = None, max_degree: int = 1, center: bool = True, approx: bool = False, l1: bool = True) -> CVResult: """k-fold CV with closed-form LASSO or ridge in the PC basis per fold. Used for gaussian ``norm="1"`` / ``norm="2"`` and, in R, also for ``family="binomial"`` with those norms (squared-error CV on ``Y``). Calls the same C++ routine as R ``fasthal_cv_call`` with identical fold construction (``std::mt19937(12345)`` shuffle), so CV scores match R. Parameters ---------- X : np.ndarray, shape (n, p) Training features. Y : np.ndarray, shape (n,) Response (continuous, or binary labels if using the binomial branch in :func:`cv_hapc`). npcs : int Number of principal components. lambdas : np.ndarray, shape (L,) Penalty grid (must be supplied explicitly; :func:`cv_hapc` builds it). nfolds : int, default 5 Number of CV folds. predict : np.ndarray, optional, shape (m, p) Test features for out-of-sample predictions at the CV-winning λ. max_degree : int, default 1 HAL interaction order. center : bool, default True Centre kernel / response like training. approx : bool, default False If True, approximate top eigenvectors (same flag as R). l1 : bool, default True ``True`` for LASSO (``norm="1"``), ``False`` for ridge (``norm="2"``). Returns ------- CVResult ``mses`` are mean squared errors on held-out folds. """ X = _C(X) Y = _C(Y).ravel() n, p = X.shape if Y.size != n: raise ValueError(f"Y must have {n} elements, got {Y.size}") lams = np.asarray(lambdas, dtype=np.float64) if predict is not None: Xte = _C(predict) if Xte.shape[1] != p: raise ValueError(f"predict must have {p} columns") else: Xte = np.empty((0, p), dtype=np.float64) res = hapc_core.fasthal_cv_fit( X, Y, int(npcs), lams.tolist(), int(nfolds), Xte, int(max_degree), bool(center), bool(approx), bool(l1), ) predictions = (np.asarray(res.predictions) if predict is not None and res.predictions.size > 0 else None) return CVResult( mses=np.asarray(res.mses), lambdas=np.asarray(res.lambdas), best_lambda=float(res.best_lambda), best_model_alpha=np.asarray(res.best_alpha), predictions=predictions, )
# --------------------------------------------------------------------------- # norm="sv" (binomial) → C++ pcghal_cv_classi_fit # ---------------------------------------------------------------------------
[docs] def pcghal_cv_classi(X: np.ndarray, Y: np.ndarray, max_degree: int, npcs: int, lambdas: Optional[np.ndarray] = None, log_lambda_min: float = -5, log_lambda_max: float = -3, grid_length: int = 10, nfolds: int = 5, predict: Optional[np.ndarray] = None, center: bool = True, verbose: bool = False, max_iter: int = 5000, tol: float = 1e-3, step_factor: float = 0.8, with_pgd: bool = True) -> CVResult: """k-fold logistic-loss CV for binomial HAPC. Calls the shared C++ logistic CV used by R's ``cv.hapc(family="binomial")``. Risk is **always logistic deviance**; we never use squared error on a binary response. Parameters ---------- X, Y : np.ndarray Features and 0/1 response. with_pgd : bool, default True ``True`` → ``norm="sv"`` (logistic ridge initialiser + projected gradient descent on logistic loss, per fold). ``False`` → ``norm="2"`` (logistic ridge only, no PGD), still scored with logistic deviance. max_degree, npcs, lambdas, log_lambda_min, log_lambda_max, grid_length,\ nfolds, predict, center, verbose, max_iter, tol, step_factor : See :func:`cv_hapc`. Returns ------- CVResult ``mses`` holds mean per-fold logistic deviance. ``predictions`` are probabilities on ``predict``. """ X = _C(X) Y = _C(Y).ravel() n, p = X.shape if Y.size != n: raise ValueError(f"Y must have {n} elements, got {Y.size}") lams = _grid(lambdas, log_lambda_min, log_lambda_max, grid_length) if predict is not None: Xte = _C(predict) if Xte.shape[1] != p: raise ValueError(f"predict must have {p} columns") else: Xte = np.empty((0, p), dtype=np.float64) res = hapc_core.pcghal_cv_classi_fit( X, Y, int(max_degree), int(npcs), lams.tolist(), int(nfolds), Xte, int(max_iter), float(tol), float(step_factor), bool(verbose), bool(center), bool(with_pgd), ) predictions = (np.asarray(res.predictions) if predict is not None and res.predictions.size > 0 else None) return CVResult( mses=np.asarray(res.deviances), lambdas=np.asarray(res.lambdas), best_lambda=float(res.best_lambda), best_model_alpha=np.asarray(res.best_alpha), predictions=predictions, )
# --------------------------------------------------------------------------- # norm="1" (binomial) → glmnet-style logistic LASSO CV (sklearn liblinear) # --------------------------------------------------------------------------- def _native_folds(n: int, K: int, seed: int = 12345) -> np.ndarray: """Block partition with last fold absorbing the remainder, then shuffled deterministically with a numpy MT19937 seeded by ``seed``. The shuffle algorithm is numpy's, not libstdc++'s, so this does **not** match the C++ ``make_folds`` output bit-for-bit — but it is deterministic and gives equally-sized folds. Used only by the binomial+norm="1" path, where the solver itself differs from the C++ paths anyway. """ fold_size = n // K folds = np.empty(n, dtype=np.int64) for i in range(n): folds[i] = (i // fold_size) + 1 for i in range(fold_size * K, n): folds[i] = K rng = np.random.default_rng(seed) rng.shuffle(folds) return folds
[docs] def pcghal_cv_classi_lasso(X: np.ndarray, Y: np.ndarray, max_degree: int, npcs: int, lambdas: Optional[np.ndarray] = None, log_lambda_min: float = -5, log_lambda_max: float = -3, grid_length: int = 10, nfolds: int = 5, predict: Optional[np.ndarray] = None, center: bool = True, verbose: bool = False, max_iter: int = 1000) -> CVResult: """k-fold logistic-LASSO CV for binomial HAPC, ``norm="1"``. Per fold, fits :func:`hapc.single_pcghal_classification_lasso` (sklearn LogisticRegression with ``penalty="l1"``, ``solver="liblinear"``, ``fit_intercept=False``) on the training portion and scores the held-out portion with logistic deviance. Mirrors R ``cv.hapc(family="binomial", norm="1")``, which uses ``glmnet`` per fold. Parameters ---------- See :func:`cv_hapc`. Returns ------- CVResult ``mses`` holds mean per-fold logistic deviance. ``best_model_alpha`` is the LASSO α refit on the full data at ``best_lambda``. ``predictions`` are probabilities at ``best_lambda`` on ``predict``. """ X = _C(X) Y = _C(Y).ravel() n, p = X.shape if Y.size != n: raise ValueError(f"Y must have {n} elements, got {Y.size}") lams = _grid(lambdas, log_lambda_min, log_lambda_max, grid_length) if not np.all(lams > 0): raise ValueError("All lambdas must be > 0 for logistic LASSO.") # Soft target in [0,1] used for the held-out cross-entropy deviance # (accepts hard {0,1}/{-1,+1} or fractional EM-HAL posteriors). q = _to_soft01(Y) folds = _native_folds(n, int(nfolds)) L = lams.size fold_dev = np.full((int(nfolds), L), np.nan) for k in range(1, int(nfolds) + 1): te = np.where(folds == k)[0] tr = np.where(folds != k)[0] if te.size == 0 or tr.size == 0: continue Xtr, Ytr = X[tr], Y[tr] Xte, Yte = X[te], q[te] for j, lam in enumerate(lams): res = single_pcghal_classification_lasso( Xtr, Ytr, max_degree=int(max_degree), npcs=int(npcs), lambda_=float(lam), predict=Xte, center=bool(center), verbose=bool(verbose), max_iter=int(max_iter), ) probs = np.clip(res.probabilities, 1e-15, 1 - 1e-15) dev = -(Yte * np.log(probs) + (1 - Yte) * np.log(1 - probs)) fold_dev[k - 1, j] = float(dev.mean()) deviances = np.nanmean(fold_dev, axis=0) best_idx = int(np.nanargmin(deviances)) best_lambda = float(lams[best_idx]) full = single_pcghal_classification_lasso( X, Y, max_degree=int(max_degree), npcs=int(npcs), lambda_=best_lambda, predict=predict, center=bool(center), verbose=bool(verbose), max_iter=int(max_iter), ) predictions = full.probabilities if predict is not None else None return CVResult( mses=np.asarray(deviances), lambdas=np.asarray(lams), best_lambda=best_lambda, best_model_alpha=np.asarray(full.alpha), predictions=predictions, )
# --------------------------------------------------------------------------- # High-level dispatcher # ---------------------------------------------------------------------------
[docs] def cv_hapc(X: np.ndarray, Y: np.ndarray, family: str = "gaussian", max_degree: int = 1, npcs: Optional[int] = None, log_lambda_min: float = -5, log_lambda_max: float = -3, grid_length: int = 10, nfolds: int = 5, norm: str = "sv", predict: Optional[np.ndarray] = None, max_iter: int = 5000, tol: float = 1e-3, step_factor: float = 0.8, verbose: bool = False, crit: str = "grad", center: bool = True, approx: bool = False, ini: str = "1", Delta: Optional[np.ndarray] = None, time_grid: Optional[np.ndarray] = None) -> CVResult: """k-fold cross-validated HAPC fit (Python counterpart of R ``cv.hapc()``). Parameters ---------- X : np.ndarray, shape (n, p) Features. Y : np.ndarray, shape (n,) Response. family : {"gaussian", "binomial", "logit-hazard"}, default "gaussian" Loss family. For ``"binomial"`` the loss is **always logistic** (deviance CV, not MSE). ``norm="sv"`` runs logistic ridge + PGD per fold; ``norm="2"`` runs logistic ridge only; ``norm="1"`` runs logistic LASSO via ``sklearn.linear_model.LogisticRegression`` with ``solver="liblinear"`` (mirrors R ``glmnet`` for the binomial+norm="1" path). ``"logit-hazard"`` is a discrete-time logistic hazard model: ``Y`` is then the observed time ``T = min(T_event, C)``, ``Delta`` (event indicator) is required, and the call forwards to :func:`hapc.hazard_hapc` (returning a :class:`hapc.hazard.HazardResult`, not a :class:`CVResult`). ``norm="sv"`` is not supported for this family. Delta : np.ndarray, optional Event indicator (0/1), required only for ``family="logit-hazard"``. time_grid : np.ndarray, optional Discrete time grid for ``family="logit-hazard"`` (see :func:`hapc.hazard_hapc`). max_degree : int, default 1 Maximum interaction order. npcs : int, optional Number of PCs. Defaults to ``n``. log_lambda_min, log_lambda_max : float Bounds of the log-λ grid (defaults ``-5``, ``-3``). grid_length : int, default 10 Number of grid points. nfolds : int, default 5 Number of CV folds. Identical fold partitions to R. norm : {"sv", "1", "2"}, default "sv" Norm constraint. See :func:`hapc.single.hapc`. predict : np.ndarray, optional Test features. max_iter, tol, step_factor, crit : optional PGD parameters used when ``norm="sv"``. center, approx, verbose, ini : optional See :func:`hapc.single.hapc`. Returns ------- CVResult Examples -------- >>> import numpy as np >>> from hapc import cv_hapc >>> rng = np.random.default_rng(0) >>> X = rng.standard_normal((100, 3)) >>> Y = np.sin(np.pi * X[:, 0]) + rng.standard_normal(100) * 0.1 >>> cv = cv_hapc(X, Y, max_degree=2, npcs=99, nfolds=5) >>> bool(cv.best_lambda > 0) True """ # family = "logit-hazard": discrete-time logistic hazard. The second # positional argument Y carries the observed times T = min(T^event, C); the # event indicator Delta must be supplied. norm = "sv" is flagged inside # hazard_hapc. See hapc.hazard_hapc. if family == "logit-hazard": if Delta is None: raise ValueError( "family='logit-hazard' requires 'Delta' (event indicator); pass " "the observed times T as Y and Delta separately. See hazard_hapc." ) from .hazard import hazard_hapc # local import avoids circular import return hazard_hapc( X, T=Y, Delta=Delta, norm=norm, max_degree=max_degree, npcs=npcs, time_grid=time_grid, log_lambda_min=log_lambda_min, log_lambda_max=log_lambda_max, grid_length=grid_length, nfolds=nfolds, predict=predict, center=center, verbose=verbose, max_iter=max_iter, tol=tol, step_factor=step_factor, ) if family not in {"gaussian", "binomial"}: raise ValueError( f"family must be 'gaussian', 'binomial', or 'logit-hazard'; got '{family}'" ) if npcs is None: npcs = int(X.shape[0]) lams = _grid(None, log_lambda_min, log_lambda_max, grid_length) if family == "binomial": # Validate labels; allow soft labels in [0,1] only for norm in {"1","2"}. _check_binomial_labels(Y, norm) if norm in {"sv", "2"}: return pcghal_cv_classi( X, Y, max_degree=max_degree, npcs=npcs, lambdas=lams, nfolds=nfolds, predict=predict, center=center, verbose=verbose, max_iter=max_iter, tol=tol, step_factor=step_factor, with_pgd=(norm == "sv"), ) if norm == "1": return pcghal_cv_classi_lasso( X, Y, max_degree=max_degree, npcs=npcs, lambdas=lams, nfolds=nfolds, predict=predict, center=center, verbose=verbose, ) raise ValueError( f"family='binomial' supports norm in {{'sv','1','2'}}; got '{norm}'" ) if norm == "sv": return pcghal_cv( X, Y, max_degree=max_degree, npcs=npcs, lambdas=lams, nfolds=nfolds, predict=predict, center=center, approx=approx, verbose=verbose, max_iter=max_iter, tol=tol, step_factor=step_factor, crit=crit, ini=ini, ) if norm in {"1", "2"}: return fasthal_cv( X, Y, npcs=npcs, lambdas=lams, nfolds=nfolds, predict=predict, max_degree=max_degree, center=center, approx=approx, l1=(norm == "1"), ) raise ValueError(f"norm must be one of {{'sv','1','2'}}; got '{norm}'")
__all__ = [ "CVResult", "pcghal_cv", "pcghal_cv_classi", "pcghal_cv_classi_lasso", "fasthal_cv", "cv_hapc", ]