Source code for hapc.ate

"""Average Treatment Effect estimation with HAPC + undersmoothing.

Provides :func:`ate_hapc`, a high-level convenience wrapper that:

1. Cross-validates the **propensity** model (binomial, ``A ~ W``) on a
   log-spaced grid
   ``(log_lambda_prop_min, log_lambda_prop_max, grid_length_prop)``
   and the **outcome** model (gaussian, ``Y ~ (A, W)``) on a separate grid
   ``(log_lambda_out_min, log_lambda_out_max, grid_length_out)``
   (each built like :func:`hapc.cv_hapc`).
2. Fixes the propensity score at its CV-best λ.
3. Computes σ = std of the ATE efficient influence function (EIF) at the
   CV configuration ``(π̂_CV, μ̂_CV)``.
4. Restricts to the **undersmoothing region** ``λ ≤ λ_CV`` (decreasing λ from
   the CV value = a more flexible outcome fit) and selects the **smallest** λ
   for which ``|mean(EIF)| ≤ σ / (√n · log n)`` — the most undersmoothed fit
   still inside the band (matching §8.3 of the manuscript).  If no λ in the
   region meets the threshold, the λ minimising ``|mean(EIF)|`` is used and a
   warning is emitted.  Warnings also fire when the CV fit already satisfies
   the band (no undersmoothing happens) or when the selection is pinned to the
   grid edge (possible over-undersmoothing).
5. Returns a **doubly robust** ATE point estimate at the undersmoothed outcome
   model and a ``(1 - alpha)`` Wald confidence interval from the EIF evaluated
   at that estimate (see Notes).

This ``method="undersmooth"`` path does not implement sample splitting /
cross-fitting: nuisances are fit on the full sample and the EIF is evaluated on
the same sample, with bias control delegated to the undersmoothing step.

Because the empirical score is evaluated **in-sample**, the undersmoothing gate
is fragile: reducing λ drives the score toward zero through in-sample
overfitting (or floors it via PC truncation) rather than through genuine bias
removal, and ``std(EIF)`` underestimates the sampling variance. The result is a
biased point estimate and/or a Wald interval that under-covers.

``method="crossfit"`` (DML-style K-fold cross-fitting) avoids this: both
nuisances are CV-selected and fit on the training folds and the EIF is evaluated
only on the held-out fold, so its mean and variance are honest and no
undersmoothing is needed. The default remains ``"undersmooth"`` for backward
compatibility; prefer ``"crossfit"`` for valid point estimates and coverage.
"""

import warnings
from typing import NamedTuple, Optional

import numpy as np

try:
    from scipy.stats import norm as _normal
except ImportError as _e:  # pragma: no cover
    raise ImportError(
        "scipy is required for ate_hapc (used for normal quantiles). "
        "It ships transitively with scikit-learn; run `pip install scipy`."
    ) from _e

from .cv import CVResult, cv_hapc
from .single import hapc as _hapc


[docs] class ATEResult(NamedTuple): """Output of :func:`ate_hapc`. Attributes ---------- estimate : float Doubly robust (AIPW-style) ATE at the undersmoothed outcome model: ``mean(A/π̂·(Y-μ̂₁)+μ̂₁ - (1-A)/(1-π̂)·(Y-μ̂₀) - μ̂₀)``, matching the efficient influence function used for the Wald interval (see Notes). lower : float Lower endpoint of the ``(1 - alpha)`` Wald confidence interval. upper : float Upper endpoint of the ``(1 - alpha)`` Wald confidence interval. """ estimate: float lower: float upper: float
def _plot_ate_diagnostics( cv_prop: CVResult, cv_out: CVResult, traj_lambdas: np.ndarray, traj_abs_mean_eif: np.ndarray, lam_prop_cv: float, lam_out_cv: float, lam_undersmooth: float, threshold: float, ) -> None: """Raise ImportError if matplotlib is missing; otherwise show diagnostic figures.""" try: import matplotlib.pyplot as plt except ImportError as e: # pragma: no cover raise ImportError( "plot_diagnostics=True requires matplotlib. " "Install with: pip install matplotlib" ) from e fig = plt.figure(figsize=(11.0, 7.5)) gs = fig.add_gridspec(2, 2, height_ratios=[1.0, 1.1], hspace=0.35, wspace=0.3) ax_prop = fig.add_subplot(gs[0, 0]) ax_out = fig.add_subplot(gs[0, 1]) ax_traj = fig.add_subplot(gs[1, :]) lp = np.asarray(cv_prop.lambdas, dtype=float) sp = np.asarray(cv_prop.mses, dtype=float) lo = np.asarray(cv_out.lambdas, dtype=float) so = np.asarray(cv_out.mses, dtype=float) ax_prop.semilogx(lp, sp, "o-", color="C1", lw=1.5, ms=5) ax_prop.axvline(lam_prop_cv, color="C3", ls="--", lw=1.5, label=f"CV λ = {lam_prop_cv:.4g}") ax_prop.set_xlabel("λ (propensity)") ax_prop.set_ylabel("Mean CV logistic deviance") ax_prop.set_title("Propensity CV (A ~ W, binomial)") ax_prop.legend(loc="best", fontsize=8) ax_prop.grid(True, alpha=0.3) ax_out.semilogx(lo, so, "o-", color="C2", lw=1.5, ms=5) ax_out.axvline(lam_out_cv, color="C3", ls="--", lw=1.5, label=f"CV λ = {lam_out_cv:.4g}") ax_out.set_xlabel("λ (outcome)") ax_out.set_ylabel("Mean CV MSE") ax_out.set_title("Outcome CV (Y ~ (A,W), gaussian)") ax_out.legend(loc="best", fontsize=8) ax_out.grid(True, alpha=0.3) tv = np.asarray(traj_lambdas, dtype=float) yv = np.asarray(traj_abs_mean_eif, dtype=float) ok = np.isfinite(tv) & np.isfinite(yv) & (tv > 0) tv, yv = tv[ok], yv[ok] order = np.argsort(tv) tv, yv = tv[order], yv[order] if tv.size: ax_traj.semilogx(tv, yv, "o-", color="C0", lw=2, ms=6, label=r"$|\mathrm{mean}(\mathrm{EIF}_{\mathrm{ATE}})|$") ax_traj.fill_between(tv, 0, threshold, alpha=0.12, color="gray") else: ax_traj.text( 0.5, 0.5, "No valid outcome fits on λ grid", transform=ax_traj.transAxes, ha="center", va="center", ) ax_traj.axhline(threshold, color="gray", lw=2, alpha=0.85, label=r"Threshold $\sigma_{\mathrm{CV}}/(\sqrt{n}\log n)$") ax_traj.axvline(lam_out_cv, color="C3", ls="--", lw=1.8, label=f"Outcome CV λ = {lam_out_cv:.4g}") ax_traj.axvline(lam_undersmooth, color="C4", ls="-", lw=2.0, label=f"Undersmoothed λ = {lam_undersmooth:.4g}") ax_traj.set_xlabel("Outcome λ (undersmoothing grid)") ax_traj.set_ylabel(r"$|\mathrm{mean}(\mathrm{EIF})|$") ax_traj.set_title("Undersmoothing trajectory (fixed propensity at its CV-λ)") ax_traj.legend(loc="best", fontsize=9, ncol=2) ax_traj.grid(True, alpha=0.3) fig.suptitle("ate_hapc diagnostics", fontsize=12, y=0.98) fig.subplots_adjust(top=0.92, bottom=0.08, hspace=0.4, wspace=0.3) plt.show() def _print_undersmoothing_table( path_abs: dict, threshold: float, sigma_cv: float, n: int, lam_out_cv: float, lam_und: float, ) -> None: """Print a ``|mean(EIF)|`` vs ``λ`` table for the undersmoothing scan (stdout). Lets the user confirm the undersmoothing gate fired correctly: every outcome ``λ`` in the undersmoothing region (``λ ≤ λ_CV``, listed from λ_CV downward = increasing flexibility), its in-sample ``|mean(EIF)|``, and whether it lies within the threshold ``τ = σ_CV / (√n · log n)`` (σ_CV is the std of the EIF at the CV outcome fit). The CV λ and the selected undersmoothed λ are flagged. """ lams = sorted(path_abs, reverse=True) # λ_CV → smallest λ (scan direction) print("\nate_hapc undersmoothing report (method='undersmooth')") print(f" n = {n} sigma_CV = {sigma_cv:.6g} " f"tau = sigma_CV / (sqrt(n) * log(n)) = {threshold:.6g}") print(f" CV outcome lambda = {lam_out_cv:.4e}") print(f" undersmoothed lambda = {lam_und:.4e}") print(f" {'lambda':>13} {'|mean EIF|':>12} {'<= tau':>7} note") print(" " + "-" * 54) for lam in lams: a = path_abs[lam] ok = "yes" if a <= threshold else "no" flags = [] if abs(lam - lam_out_cv) <= lam_out_cv * 1e-9: flags.append("CV lambda") if abs(lam - lam_und) <= lam_und * 1e-9: flags.append("<-- selected") print(f" {lam:13.4e} {a:12.6f} {ok:>7} {', '.join(flags)}") print() def _warn_if_cv_edge(best_lambda: float, lambdas, label: str, prefix: str) -> None: """Warn if the CV-selected λ sits on a grid edge (optimum likely outside). A boundary CV λ means the MSE-optimal penalty is outside the searched range, so the nuisance is mis-regularised (and, for the outcome, the undersmoothing region starts from the wrong place). ``prefix`` is the parameter stem (``"prop"`` or ``"out"``) used in the remedial hint. """ lams = np.asarray(lambdas, dtype=float) if lams.size < 2: return lo, hi = float(lams.min()), float(lams.max()) if abs(np.log(best_lambda) - np.log(hi)) < 1e-6: side, knob = "upper", f"raise log_lambda_{prefix}_max" elif abs(np.log(best_lambda) - np.log(lo)) < 1e-6: side, knob = "lower", f"lower log_lambda_{prefix}_min" else: return warnings.warn( f"ate_hapc: {label} CV λ is at the {side} edge of its grid " f"(λ={best_lambda:.3g}); the CV optimum is likely outside the range — " f"{knob} so CV selects an interior λ.", stacklevel=3, ) def _coerce_binary(A: np.ndarray) -> np.ndarray: """Return ``A`` re-encoded as floats in ``{0,1}``. Accepts ``{0,1}``, ``{-1,+1}`` (or any pair where one value is non-positive and one positive — falls back to the sign). """ A = np.asarray(A).ravel() u = set(np.unique(A).tolist()) if u.issubset({0, 1, 0.0, 1.0}): return A.astype(np.float64) if u.issubset({-1, 1, -1.0, 1.0}): return ((A > 0).astype(np.float64)) raise ValueError( f"A must be binary in {{0,1}} or {{-1,+1}}; found {sorted(u)}" ) def _make_folds(n: int, K: int, strat: np.ndarray, seed: int) -> np.ndarray: """Stratified K-fold assignment in ``{0,…,K-1}`` (balanced within each arm). Stratifying on the treatment ``strat`` keeps both arms present in every training split, which the propensity/outcome fits and positivity require. Deterministic given ``seed``. """ rng = np.random.default_rng(seed) folds = np.empty(n, dtype=np.int64) for cls in (0.0, 1.0): idx = np.where(strat == cls)[0] rng.shuffle(idx) folds[idx] = np.arange(idx.size) % K return folds def _ate_crossfit( X: np.ndarray, Y: np.ndarray, A01: np.ndarray, n: int, *, alpha: float, max_degree: int, npcs: int, log_lambda_prop_min: float, log_lambda_prop_max: float, grid_length_prop: int, log_lambda_out_min: float, log_lambda_out_max: float, grid_length_out: int, nfolds: int, norm: str, max_iter: int, tol: float, step_factor: float, verbose: bool, crit: str, center: bool, approx: bool, ini: str, cf_folds: int, cf_seed: int, ) -> "ATEResult": """Cross-fitted (DML-style) AIPW ATE; counterpart to the undersmoothing path. Both nuisances (propensity ``A~W`` and outcome ``Y~(A,W)``) are CV-selected **and** fit on the training folds, then evaluated on the held-out fold, so the efficient influence function is an honest out-of-sample quantity. This removes the in-sample bias the undersmoothing path tries to control, and the EIF variance is no longer underestimated. """ if cf_folds < 2: raise ValueError(f"cf_folds must be >= 2; got {cf_folds}") if min(int(A01.sum()), int(n - A01.sum())) < cf_folds: raise ValueError( f"cf_folds={cf_folds} exceeds the rarer treatment arm's count; " "use fewer folds or more data." ) folds = _make_folds(n, cf_folds, A01, cf_seed) pi1 = np.empty(n, dtype=np.float64) mu1 = np.empty(n, dtype=np.float64) mu0 = np.empty(n, dtype=np.float64) cv_kwargs = dict( max_degree=max_degree, nfolds=nfolds, norm=norm, max_iter=max_iter, tol=tol, step_factor=step_factor, verbose=verbose, crit=crit, center=center, approx=approx, ini=ini, ) fit_kwargs = dict( max_iter=max_iter, tol=tol, step_factor=step_factor, verbose=verbose, crit=crit, center=center, approx=approx, ini=ini, ) for k in range(cf_folds): te = np.where(folds == k)[0] tr = np.where(folds != k)[0] if te.size == 0 or tr.size == 0: raise ValueError(f"Empty cross-fitting fold {k}; reduce cf_folds.") # Cap PCs at the training-fold rank budget. npcs_k = max(1, min(int(npcs), tr.size - 1)) Xtr, Xte = X[tr], X[te] # --- propensity: CV + refit on training folds, predict on held-out ---- cv_prop = cv_hapc( Xtr, A01[tr], family="binomial", log_lambda_min=log_lambda_prop_min, log_lambda_max=log_lambda_prop_max, grid_length=grid_length_prop, npcs=npcs_k, **cv_kwargs, ) prop = _hapc( Xtr, A01[tr], family="binomial", max_degree=max_degree, npcs=npcs_k, lambda_=float(cv_prop.best_lambda), norm=norm, predict=Xte, **fit_kwargs, ) pi1[te] = np.clip(np.asarray(prop.probabilities).ravel(), 1e-8, 1 - 1e-8) # --- outcome: CV + refit on training folds, predict both arms --------- Xout_tr = np.column_stack([A01[tr], Xtr]) cv_out = cv_hapc( Xout_tr, Y[tr], family="gaussian", log_lambda_min=log_lambda_out_min, log_lambda_max=log_lambda_out_max, grid_length=grid_length_out, npcs=npcs_k, **cv_kwargs, ) m = te.size Xeval = np.vstack([ np.column_stack([np.ones(m), Xte]), np.column_stack([np.zeros(m), Xte]), ]) res = _hapc( Xout_tr, Y[tr], family="gaussian", max_degree=max_degree, npcs=npcs_k, lambda_=float(cv_out.best_lambda), norm=norm, predict=Xeval, **fit_kwargs, ) p = np.asarray(res.predictions).ravel() if p.size != 2 * m: raise RuntimeError( f"Outcome predict returned {p.size} values, expected {2 * m}." ) mu1[te], mu0[te] = p[:m], p[m:] # --- cross-fitted AIPW point estimate + Wald CI ------------------------- phi = ( (A01 / pi1) * (Y - mu1) + mu1 - ((1.0 - A01) / (1.0 - pi1)) * (Y - mu0) - mu0 ) psi = float(phi.mean()) sigma = float(np.std(phi, ddof=0)) z = float(_normal.ppf(1.0 - alpha / 2.0)) half = z * sigma / np.sqrt(n) return ATEResult(estimate=psi, lower=psi - half, upper=psi + half)
[docs] def ate_hapc(X: np.ndarray, Y: np.ndarray, A: np.ndarray, alpha: float = 0.05, max_degree: int = 1, npcs: Optional[int] = None, log_lambda_prop_min: float = -5, log_lambda_prop_max: float = 2, grid_length_prop: int = 14, log_lambda_out_min: float = -10, log_lambda_out_max: float = 2, grid_length_out: int = 22, nfolds: int = 5, norm: str = "sv", predict: Optional[np.ndarray] = None, max_iter: int = 5000, tol: float = 1e-3, step_factor: float = 0.8, verbose: bool = False, crit: str = "grad", center: bool = True, approx: bool = False, ini: str = "1", plot_diagnostics: bool = False, method: str = "undersmooth", cf_folds: int = 5, cf_seed: int = 0, report_undersmoothing: bool = False) -> ATEResult: """ATE estimate with HAPC nuisances and outcome undersmoothing. Parameters ---------- X : np.ndarray, shape (n, p) Covariate matrix ``W`` (do NOT include the treatment column). Y : np.ndarray, shape (n,) Continuous outcome. A : np.ndarray, shape (n,) Binary treatment in ``{0,1}`` or ``{-1,+1}``. alpha : float, default 0.05 Significance level. The returned interval has confidence ``1 - alpha``. max_degree, npcs, nfolds, norm, predict, max_iter, tol, step_factor,\ verbose, crit, center, approx, ini : Same meaning and defaults as in :func:`hapc.cv_hapc` (except λ grids, see below). ``predict`` is accepted for signature parity with :func:`cv_hapc` and is currently ignored (``ate_hapc`` always evaluates the EIF on the training sample). log_lambda_prop_min, log_lambda_prop_max, grid_length_prop : Equally spaced log-λ grid for **propensity** cross-validation (``A ~ W``, binomial), same rule as :func:`cv_hapc`. log_lambda_out_min, log_lambda_out_max, grid_length_out : Log-λ grid (defaults ``-10``, ``2``, ``22``) for **outcome** cross-validation ``Y ~ (A, W)`` (gaussian) and for the **undersmoothing** scan over ``λ ≤ λ_CV``. The grid deliberately reaches very small λ: for ``method="undersmooth"`` the (nearly) unbiased regime typically sits well below the CV-optimal λ, so a grid that stops too high (e.g. the old ``[e^{-5}, e^{-3}]``) never reaches it and no undersmoothing effectively occurs. Undersmoothing only removes bias when the PC basis is rich enough to represent the outcome surface, so use the **full basis** (``npcs = n``, the default); a truncated basis floors the bias at an approximation level that no λ can remove. plot_diagnostics : bool, default False If True, open a matplotlib figure with (1) propensity CV curve (logistic deviance vs λ), (2) outcome CV curve (MSE vs λ), and (3) the undersmoothing path: ``|mean(EIF)|`` vs outcome λ with the threshold line and vertical markers for the CV and selected undersmoothed λ. Requires ``matplotlib`` (``pip install matplotlib``). Only meaningful for ``method="undersmooth"`` (ignored otherwise). method : {"undersmooth", "crossfit"}, default "undersmooth" Bias-control strategy. * ``"undersmooth"`` — the original single-sample procedure: fit the nuisances on the full sample, evaluate the EIF in-sample, and pick the outcome λ by the undersmoothing gate (Notes). Kept as the default for backward compatibility. * ``"crossfit"`` — DML-style K-fold **cross-fitting**: CV-select and fit both nuisances on the training folds and evaluate the EIF only on the held-out fold. No undersmoothing is performed (CV-optimal λ is used per fold); the outcome λ grid is still used for the per-fold CV. This makes the EIF mean and variance honest, so the AIPW estimate is √n-unbiased and the Wald CI attains nominal coverage without relying on the undersmoothing gate. Prefer this unless you specifically need the single-sample undersmoothed estimator. cf_folds : int, default 5 Number of cross-fitting folds (only used when ``method="crossfit"``). Folds are stratified by treatment so both arms appear in every split. cf_seed : int, default 0 Seed for the (deterministic) cross-fitting fold assignment. report_undersmoothing : bool, default False If True (and ``method="undersmooth"``), print a table to stdout of ``|mean(EIF)|`` versus outcome λ over the undersmoothing region (``λ ≤ λ_CV``), with the threshold ``τ = σ_CV / (√n · log n)``, so the user can confirm the undersmoothing gate fired correctly. Ignored for ``method="crossfit"`` (which evaluates the EIF out-of-fold). Returns ------- ATEResult Named tuple with three fields ``(estimate, lower, upper)``. Notes ----- The procedure is: 1. Cross-validate the propensity ``A ~ W`` (binomial) on its grid and the outcome ``Y ~ (A, W)`` (gaussian) on the outcome grid (independently specified). 2. Fix the propensity at its CV-best λ; refit on the full sample to obtain ``π̂(W_i) = P(A=1 | W_i)``. 3. At the CV-best outcome λ, compute a **plugin-centered** influence vector (same mean as the DR EIF at :math:`\\psi=\\overline{\\mu}_1-\\overline{\\mu}_0`) and let ``σ = std(·)``. 4. Threshold ``τ = σ / (√n · log n)``. 5. Among the **outcome** λ in the undersmoothing region ``λ ≤ λ_CV``, pick the **smallest** λ for which ``|mean(EIF_diff)| ≤ τ`` (the most undersmoothed fit still inside the band) — call it ``λ_u``. If none qualifies, ``λ_u`` minimises ``|mean(EIF_diff)|`` (with a warning). 6. **Doubly robust** point estimate at the **undersmoothed** outcome fit ``(π̂, μ̂₁^{λ_u}, μ̂₀^{λ_u})``: ``ψ̂ = mean(A/π̂·(Y-μ̂₁)+μ̂₁ - (1-A)/(1-π̂)·(Y-μ̂₀) - μ̂₀)``. The CI uses the one-step influence function evaluated at the **CV** outcome fit ``(μ̂₁^{λ_CV}, μ̂₀^{λ_CV})`` (centered at ``ψ̂``): ``φ_i = A_i/π̂_i·(Y_i-μ̂_{1i}) + μ̂_{1i} - (1-A_i)/(1-π̂_i)·(Y_i-μ̂_{0i}) - μ̂_{0i} - ψ̂``, and ``CI: ψ̂ ± z_{1-α/2} · std(φ) / √n``. The CV (rather than undersmoothed) fit is used for ``std(φ)`` because a heavily undersmoothed, near-interpolating fit has artificially small in-sample residuals that collapse the SE and destroy coverage; the CV fit yields honest residuals. (For ``method="crossfit"`` no such issue arises — the EIF is evaluated out-of-fold.) This contrasts with **plug-in** G-computation ``mean(μ̂₁(W)-μ̂₀(W))``, which can be materially biased when both nuisances are estimated on the same sample and the outcome regressions are regularized. The DR ``ψ̂`` is consistent if **either** the propensity **or** the pair ``(μ̂₁, μ̂₀)`` is correctly specified (standard double robustness). Examples -------- >>> import numpy as np >>> from hapc import ate_hapc >>> rng = np.random.default_rng(0) >>> n = 200 >>> W = np.column_stack([rng.uniform(-2, 2, n), rng.normal(0, 0.5, n)]) >>> p = 1.0 / (1.0 + np.exp(-(W[:, 0] + 0.5 * W[:, 1]))) >>> A = rng.binomial(1, p, n) >>> Y = 2 * W[:, 0] + 0.5 + rng.normal(0, 0.5, n) # truth: ATE=0 >>> res = ate_hapc(W, Y, A, alpha=0.05, max_degree=2, npcs=50, ... grid_length_prop=4, grid_length_out=4, nfolds=3, ... norm="2") >>> bool(res.lower <= res.estimate <= res.upper) True """ if not (0.0 < alpha < 1.0): raise ValueError(f"alpha must be in (0,1); got {alpha}") if method not in {"undersmooth", "crossfit"}: raise ValueError( f"method must be 'undersmooth' or 'crossfit'; got '{method}'" ) # --- Coerce inputs ------------------------------------------------------ X = np.ascontiguousarray(np.asarray(X, dtype=np.float64)) if X.ndim != 2: raise ValueError(f"X must be 2-D; got shape {X.shape}") Y = np.asarray(Y, dtype=np.float64).ravel() A01 = _coerce_binary(A) n, _p = X.shape if Y.size != n or A01.size != n: raise ValueError("X, Y, A must all have the same number of rows.") if npcs is None: npcs = int(n) if method == "crossfit": if plot_diagnostics: warnings.warn( "plot_diagnostics is only available for method='undersmooth'; " "ignoring it for method='crossfit'.", stacklevel=2, ) return _ate_crossfit( X, Y, A01, n, alpha=alpha, max_degree=max_degree, npcs=npcs, log_lambda_prop_min=log_lambda_prop_min, log_lambda_prop_max=log_lambda_prop_max, grid_length_prop=grid_length_prop, log_lambda_out_min=log_lambda_out_min, log_lambda_out_max=log_lambda_out_max, grid_length_out=grid_length_out, nfolds=nfolds, norm=norm, max_iter=max_iter, tol=tol, step_factor=step_factor, verbose=verbose, crit=crit, center=center, approx=approx, ini=ini, cf_folds=cf_folds, cf_seed=cf_seed, ) lambdas_out = np.exp( np.linspace(log_lambda_out_min, log_lambda_out_max, grid_length_out)) cv_kwargs_base = dict( max_degree=max_degree, npcs=npcs, nfolds=nfolds, norm=norm, max_iter=max_iter, tol=tol, step_factor=step_factor, verbose=verbose, crit=crit, center=center, approx=approx, ini=ini, ) # --- 1. CV propensity (binomial) --------------------------------------- cv_prop = cv_hapc( X, A01, family="binomial", log_lambda_min=log_lambda_prop_min, log_lambda_max=log_lambda_prop_max, grid_length=grid_length_prop, **cv_kwargs_base, ) lam_prop_cv = float(cv_prop.best_lambda) # Refit propensity at CV λ on full data, predict in-sample probabilities. prop = _hapc( X, A01, family="binomial", max_degree=max_degree, npcs=npcs, lambda_=lam_prop_cv, norm=norm, predict=X, max_iter=max_iter, tol=tol, step_factor=step_factor, verbose=verbose, crit=crit, center=center, approx=approx, ini=ini, ) pi1 = np.clip(np.asarray(prop.probabilities).ravel(), 1e-8, 1 - 1e-8) # --- 2. CV outcome (gaussian on [A,W]) --------------------------------- Xout = np.column_stack([A01, X]) cv_out = cv_hapc( Xout, Y, family="gaussian", log_lambda_min=log_lambda_out_min, log_lambda_max=log_lambda_out_max, grid_length=grid_length_out, **cv_kwargs_base, ) lam_out_cv = float(cv_out.best_lambda) # Stacked design for one-shot prediction at both arms. Xmu1 = np.column_stack([np.ones(n), X]) Xmu0 = np.column_stack([np.zeros(n), X]) Xeval = np.vstack([Xmu1, Xmu0]) def _mu_pair(lam: float): """Refit outcome at λ on full data, return (μ̂_1, μ̂_0) on training W.""" res = _hapc( Xout, Y, family="gaussian", max_degree=max_degree, npcs=npcs, lambda_=float(lam), norm=norm, predict=Xeval, max_iter=max_iter, tol=tol, step_factor=step_factor, verbose=verbose, crit=crit, center=center, approx=approx, ini=ini, ) p = np.asarray(res.predictions).ravel() if p.size != 2 * n: raise RuntimeError( f"Outcome predict returned {p.size} values, expected {2 * n}." ) return p[:n], p[n:] def _eif_plugin_centered(mu1: np.ndarray, mu0: np.ndarray) -> np.ndarray: """Plugin-centered influence vector (undersmoothing gate only). Its mean matches the DR EIF evaluated at plug-in :math:`\\psi=\\overline{\\mu}_1-\\overline{\\mu}_0`. The returned ATE uses ``_psi_dr`` / ``_eif_dr`` instead. """ eif1 = (A01 / pi1) * (Y - mu1) - (mu1 - mu1.mean()) eif0 = ((1.0 - A01) / (1.0 - pi1)) * (Y - mu0) - (mu0 - mu0.mean()) return eif1 - eif0 def _psi_dr(mu1: np.ndarray, mu0: np.ndarray) -> float: return float( np.mean( (A01 / pi1) * (Y - mu1) + mu1 - ((1.0 - A01) / (1.0 - pi1)) * (Y - mu0) - mu0 ) ) def _eif_dr(mu1: np.ndarray, mu0: np.ndarray, psi: float) -> np.ndarray: return ( (A01 / pi1) * (Y - mu1) + mu1 - ((1.0 - A01) / (1.0 - pi1)) * (Y - mu0) - mu0 - psi ) # --- 3. σ at CV configuration → threshold τ ---------------------------- mu1_cv, mu0_cv = _mu_pair(lam_out_cv) eif_cv = _eif_plugin_centered(mu1_cv, mu0_cv) sigma_cv = float(np.std(eif_cv, ddof=0)) threshold = sigma_cv / (np.sqrt(n) * np.log(n)) # --- 4. Undersmoothing sweep over the undersmoothing region λ ≤ λ_CV ---- # Faithful to §8.3: starting at the CV λ, decrease λ (more flexible outcome # fit) and select the *smallest* λ whose |mean(EIF)| ≤ τ — the most # undersmoothed fit still inside the band. Only λ ≤ λ_CV are eligible; # larger λ would be *over*-smoothing, not undersmoothing. The EIF is # evaluated in-sample, so the gate is fragile: the warnings below flag the # degenerate regimes (no undersmoothing region, band never met, the CV fit # already inside the band, or undersmoothing pinned to the grid edge). und_lams = np.sort(lambdas_out[lambdas_out <= lam_out_cv * (1.0 + 1e-9)]) if und_lams.size == 0: # numerical safety; CV λ is always a grid point und_lams = np.array([lam_out_cv], dtype=float) path_abs: dict = {} path_mu: dict = {} for lam in und_lams: try: mu1, mu0 = _mu_pair(float(lam)) except Exception: continue path_mu[float(lam)] = (mu1, mu0) path_abs[float(lam)] = float(abs(_eif_plugin_centered(mu1, mu0).mean())) if not path_mu: raise RuntimeError("Outcome model failed to fit on the entire λ grid.") fitted = sorted(path_mu) # ascending λ within = [lam for lam in fitted if path_abs[lam] <= threshold] if within: lam_und = float(min(within)) # smallest λ inside the band (§8.3 rule) if lam_und >= lam_out_cv * (1.0 - 1e-9): warnings.warn( "ate_hapc(method='undersmooth'): the CV outcome fit already " "satisfies the EIF threshold τ, so no undersmoothing occurred " "and any in-sample plug-in/DR bias is left uncorrected. Prefer " "method='crossfit', or lower log_lambda_out_min.", stacklevel=2, ) elif len(fitted) > 1 and lam_und <= float(fitted[0]) * (1.0 + 1e-9): warnings.warn( "ate_hapc(method='undersmooth'): undersmoothing reached the " "smallest outcome λ in the grid (the band stays satisfied down " "to the grid edge), which may over-undersmooth toward " "interpolation. Inspect plot_diagnostics or prefer " "method='crossfit'.", stacklevel=2, ) else: # Band never reached in the undersmoothing region → best effort. lam_und = float(min(path_abs, key=path_abs.get)) warnings.warn( "ate_hapc(method='undersmooth'): undersmoothing never brought " "|mean(EIF)| within the threshold τ along the outcome λ grid; using " "the λ that minimises it. The in-sample EIF gate is uninformative " "here — prefer method='crossfit'.", stacklevel=2, ) mu1_und, mu0_und = path_mu[lam_und] if report_undersmoothing: _print_undersmoothing_table( path_abs, threshold, sigma_cv, n, lam_out_cv, lam_und, ) if plot_diagnostics: t_lams: list[float] = [] t_abs: list[float] = [] for lam in np.sort(lambdas_out): try: mu1, mu0 = _mu_pair(float(lam)) except Exception: continue eif = _eif_plugin_centered(mu1, mu0) t_lams.append(float(lam)) t_abs.append(float(np.abs(eif.mean()))) _plot_ate_diagnostics( cv_prop, cv_out, np.asarray(t_lams), np.asarray(t_abs), lam_prop_cv, lam_out_cv, lam_und, threshold, ) # --- 5. Doubly robust point estimate + (1 - alpha) Wald CI -------------- # Point estimate at the undersmoothed fit (low bias), but the SE is the # std of the EIF at the **CV** outcome fit, not the undersmoothed one: a # heavily undersmoothed (near-interpolating) fit has artificially small # in-sample residuals (Y − μ̂) that collapse std(EIF) and destroy coverage. # The CV fit gives honest residuals, so std(EIF) is a consistent estimate # of the efficient SE. (When no undersmoothing occurs the two coincide.) psi = _psi_dr(mu1_und, mu0_und) eif_se = _eif_dr(mu1_cv, mu0_cv, psi) sigma = float(np.std(eif_se, ddof=0)) z = float(_normal.ppf(1.0 - alpha / 2.0)) half = z * sigma / np.sqrt(n) return ATEResult(estimate=psi, lower=psi - half, upper=psi + half)
__all__ = ["ATEResult", "ate_hapc"]