Source code for stereocomplex.physics.model_selection

from __future__ import annotations

from dataclasses import dataclass
from typing import Any

import numpy as np

from stereocomplex.physics.base import PhysicalRayFieldModel
from stereocomplex.physics.central_models import CentralBrownConradyModel, CentralPinholeModel
from stereocomplex.physics.parallel_plate_fit import (
    PinholeParallelPlateModel,
    rayfield_two_plane_residuals,
)


[docs] @dataclass(frozen=True) class PhysicalModelSpec: """Specification for one physical candidate to fit or score.""" name: str model_class: type initial_parameters: np.ndarray bounds: tuple[np.ndarray, np.ndarray] | None = None model_kwargs: dict[str, Any] | None = None
[docs] @dataclass(frozen=True) class PhysicalModelFitResult: """Fit metrics and optimal parameters for one physical rayfield model.""" model_name: str model: PhysicalRayFieldModel success: bool message: str n_parameters: int n_samples: int n_residual_scalars: int rss: float rms_mm: float median_mm: float p95_mm: float support_rms_mm: float full_grid_rms_mm: float aic: float bic: float parameter_vector: np.ndarray parameter_dict: dict[str, Any]
[docs] @dataclass(frozen=True) class OpticalModelSelectionReport: """AIC/BIC/RMS comparison across fitted optical model candidates.""" target_name: str candidates: list[PhysicalModelFitResult] best_by_aic: str best_by_bic: str best_by_rms: str warnings: list[str]
[docs] def rows(self) -> list[dict[str, float | int | str | bool]]: """Return model comparison rows as a list of dicts (suitable for DataFrame).""" return [ { "model": c.model_name, "parameters": c.n_parameters, "rms_mm": c.rms_mm, "support_rms_mm": c.support_rms_mm, "full_grid_rms_mm": c.full_grid_rms_mm, "bic": c.bic, "aic": c.aic, "selected_bic": c.model_name == self.best_by_bic, } for c in self.candidates ]
[docs] @dataclass(frozen=True) class MultiChannelOpticalModelSelectionReport: """Aggregated model-selection report across named camera channels.""" channel_reports: dict[str, OpticalModelSelectionReport] best_by_bic: str bic_by_model: dict[str, float] @property def channel_names(self) -> tuple[str, ...]: """Return channel name strings.""" return tuple(self.channel_reports) @property def n_channels(self) -> int: """Number of channels in the rayfield.""" return len(self.channel_reports)
[docs] def rows(self) -> list[dict[str, float | int | str | bool]]: """Return model comparison rows as a list of dicts (suitable for DataFrame).""" return [ { "model": model, "bic": bic, "selected_bic": model == self.best_by_bic, "n_channels": self.n_channels, } for model, bic in sorted(self.bic_by_model.items()) ]
[docs] def aggregate_model_selection_reports( channel_reports: dict[str, OpticalModelSelectionReport], ) -> MultiChannelOpticalModelSelectionReport: """Aggregate per-channel model-selection reports by summing BIC values.""" if not channel_reports: raise ValueError("at least one channel report is required") common_models: set[str] | None = None for report in channel_reports.values(): names = {candidate.model_name for candidate in report.candidates} common_models = names if common_models is None else common_models.intersection(names) if not common_models: raise ValueError("channel reports do not share any candidate model names") bic_by_model: dict[str, float] = {} for model_name in sorted(common_models): bic_by_model[model_name] = float( sum( next( candidate.bic for candidate in report.candidates if candidate.model_name == model_name ) for report in channel_reports.values() ) ) best_by_bic = min(bic_by_model, key=bic_by_model.__getitem__) return MultiChannelOpticalModelSelectionReport( channel_reports=channel_reports, best_by_bic=best_by_bic, bic_by_model=bic_by_model, )
[docs] def reprojection_guard_penalty( px_rms: float, *, threshold_px: float = 1.5, n_pixel_observations: int, hard_penalty: float = 1.0e6, alpha: float = 1.0, ) -> float: """Penalty term for models exceeding a pixel reprojection threshold. Returns 0 if ``px_rms <= threshold_px``. Otherwise returns a hard barrier plus a log-ratio term that ranks non-usable models among themselves:: penalty = hard_penalty + alpha * n_obs * log((px_rms / threshold)^2) This is designed to be added to a ray-space BIC to produce an operational ``bic_usable`` that enforces a usability constraint. Notes ----- This penalty is not a statistical likelihood term. It is an operational guard: a model whose direct pixel reprojection RMS is above ``threshold_px`` is considered non-usable for calibration, even if it explains the rayfield compactly. """ px = float(px_rms) if not np.isfinite(px): return float(hard_penalty) if px <= threshold_px: return 0.0 ratio2 = (px / float(threshold_px)) ** 2 return float(hard_penalty) + float(alpha) * float(n_pixel_observations) * np.log( max(ratio2, 1.0 + 1e-10) )
[docs] def usable_bic( bic_ray: float, px_rms: float, *, threshold_px: float = 1.5, n_pixel_observations: int, hard_penalty: float = 1.0e6, alpha: float = 1.0, ) -> float: """Return ray-space BIC plus an operational pixel-RMS usability guard. ``bic_ray`` remains the Gaussian BIC computed from ray-space residuals. ``usable_bic`` adds a hard penalty when the same model is not usable in direct reprojection, according to ``threshold_px``. This deliberately separates two questions: - ray-space BIC: which optical family explains the measured rayfield? - usable BIC: which model is accurate enough for calibration use? """ return float(bic_ray) + reprojection_guard_penalty( px_rms, threshold_px=threshold_px, n_pixel_observations=n_pixel_observations, hard_penalty=hard_penalty, alpha=alpha, )
def _grid_pixels(image_size: tuple[int, int], grid_shape: tuple[int, int]) -> np.ndarray: width, height = image_size nx, ny = grid_shape u = np.linspace(0.0, float(width - 1), int(nx)) v = np.linspace(0.0, float(height - 1), int(ny)) uu, vv = np.meshgrid(u, v) return np.column_stack([uu.reshape(-1), vv.reshape(-1)]) def _residual_norm_stats(residuals: np.ndarray) -> tuple[float, float, float]: r = np.asarray(residuals, dtype=np.float64).reshape(-1, 6) norms = np.linalg.norm(r, axis=1) return ( float(np.sqrt(np.mean(norms**2))), float(np.median(norms)), float(np.percentile(norms, 95)), ) def _aic_bic( rss: float, n_residual_scalars: int, n_observations: int, p: int ) -> tuple[float, float]: """Return Gaussian AIC/BIC for the weighted ray-space objective. The likelihood term uses the scalar residual count because the two-plane ray distance contributes six metric residual components per sampled pixel. The BIC parameter penalty uses the number of sampled pixel observations, which is the defensible independent-sample count for model selection. """ rss_per_scalar = max(float(rss) / max(int(n_residual_scalars), 1), 1e-30) n_scalar = float(max(int(n_residual_scalars), 1)) n_obs = float(max(int(n_observations), 1)) aic = 2.0 * float(p) + n_scalar * np.log(rss_per_scalar) bic = float(p) * np.log(n_obs) + n_scalar * np.log(rss_per_scalar) return float(aic), float(bic)
[docs] def default_physical_model_specs(*, include_telecentric: bool = False) -> list[PhysicalModelSpec]: """Return the standard candidate set used in the parallel-plate notebook. Set *include_telecentric* to True to add the quasi-telecentric CMO candidate. """ specs = [ PhysicalModelSpec( name="central_pinhole", model_class=CentralPinholeModel, initial_parameters=np.zeros(0, dtype=np.float64), ), PhysicalModelSpec( name="central_brown_conrady", model_class=CentralBrownConradyModel, initial_parameters=np.zeros(5, dtype=np.float64), bounds=( np.array([-1.0, -1.0, -0.1, -0.1, -1.0], dtype=np.float64), np.array([1.0, 1.0, 0.1, 0.1, 1.0], dtype=np.float64), ), ), PhysicalModelSpec( name="pinhole_parallel_plate", model_class=PinholeParallelPlateModel, initial_parameters=np.array([0.0, 0.0, 8.0], dtype=np.float64), bounds=( np.array([-30.0, -30.0, 0.0], dtype=np.float64), np.array([30.0, 30.0, 50.0], dtype=np.float64), ), model_kwargs={"eta": 1.5, "d1_mm": 80.0}, ), ] if include_telecentric: from stereocomplex.physics.cmo_physical import CMOTelecentricChannelModel # noqa: PLC0415 specs.append( PhysicalModelSpec( name="cmo_telecentric", model_class=CMOTelecentricChannelModel, initial_parameters=np.array( [62.0, 65.0, 25.0, 1024.0, 1024.0, 62.0, 0.2, 0.06, 0.0, 0.0], dtype=np.float64, ), model_kwargs={"pixel_pitch_mm": 0.0055}, ), ) return specs
def _make_model( model_class: type, x: np.ndarray, K: np.ndarray, model_kwargs: dict[str, Any] ) -> PhysicalRayFieldModel: if hasattr(model_class, "from_parameter_vector"): return model_class.from_parameter_vector(x, K=K, **model_kwargs) return model_class(K=K, **model_kwargs) def _as_stereo_selection_result( *, name: str, left: PhysicalModelFitResult, right: PhysicalModelFitResult, ) -> PhysicalModelFitResult: rss = float(left.rss + right.rss) n_res = int(left.n_residual_scalars + right.n_residual_scalars) n_samples = int(left.n_samples + right.n_samples) p = int(left.n_parameters + right.n_parameters) aic, bic = _aic_bic(rss, n_res, n_samples, p) rms = float(np.sqrt(0.5 * (left.rms_mm**2 + right.rms_mm**2))) support_rms = float(np.sqrt(0.5 * (left.support_rms_mm**2 + right.support_rms_mm**2))) full_rms = float(np.sqrt(0.5 * (left.full_grid_rms_mm**2 + right.full_grid_rms_mm**2))) median = float(0.5 * (left.median_mm + right.median_mm)) p95 = float(max(left.p95_mm, right.p95_mm)) return PhysicalModelFitResult( model_name=name, model=left.model, success=bool(left.success and right.success), message=f"left: {left.message}; right: {right.message}", n_parameters=p, n_samples=n_samples, n_residual_scalars=n_res, rss=rss, rms_mm=rms, median_mm=median, p95_mm=p95, support_rms_mm=support_rms, full_grid_rms_mm=full_rms, aic=aic, bic=bic, parameter_vector=np.r_[left.parameter_vector, right.parameter_vector], parameter_dict={"left": left.parameter_dict, "right": right.parameter_dict}, ) def _as_shared_cmo_selection_result(name: str, result: Any) -> PhysicalModelFitResult: return PhysicalModelFitResult( model_name=name, model=result.model, success=bool(result.success), message=str(result.message), n_parameters=int(result.n_parameters), n_samples=int(result.n_samples), n_residual_scalars=int(result.n_residual_scalars), rss=float(result.rss), rms_mm=float(result.rms_mm), median_mm=float("nan"), p95_mm=float("nan"), support_rms_mm=float(result.rms_mm), full_grid_rms_mm=float(result.rms_mm), aic=float(result.aic), bic=float(result.bic), parameter_vector=np.asarray(result.parameter_vector, dtype=np.float64), parameter_dict=result.parameter_dict, )
[docs] def fit_physical_model_to_rayfield( model_class: type, target_field, K: np.ndarray, image_size: tuple[int, int], initial_parameters: np.ndarray | None = None, bounds: tuple[np.ndarray, np.ndarray] | None = None, z_planes: tuple[float, float] = (100.0, 1000.0), grid_shape: tuple[int, int] = (25, 19), support_pixels: np.ndarray | None = None, support_weight: float = 1.0, full_grid_weight: float = 0.25, robust_loss: str = "huber", max_nfev: int = 2000, name: str | None = None, **model_kwargs, ) -> PhysicalModelFitResult: """Fit a physical model candidate to a measured rayfield in ray space. This is the core model selection routine. Given a measured rayfield and a physical model class, it optimises the model parameters so that the model rays best reproduce the measured rays, then returns a fit result with RMS error and parameter estimates. Parameters ---------- model_class : type Physical model class with ``ray(u,v) -> (origin, direction)``. target_field : ZernikeRayField Measured rayfield to match (origin + direction field). K : ndarray, shape (3, 3) Camera matrix. image_size : (int, int) Sensor dimensions in pixels (width, height). initial_parameters : ndarray, optional Starting parameter vector (defaults to model_class default). bounds : (lo, hi), optional Lower and upper bounds for the parameters. z_planes : (float, float) Two z-planes in mm (default 100, 1000) for ray intersection evaluation. grid_shape : (int, int) Subsampling grid for the full-image residual. support_pixels : ndarray, optional Observed-pixel coords (N, 2) whose ray gap is up-weighted. support_weight : float Relative weight of support-pixel residuals. full_grid_weight : float Relative weight of the full-grid residual term. robust_loss : str ``"huber"`` or ``"soft_l1"`` — robust loss for least_squares. max_nfev : int Maximum number of function evaluations. name : str, optional Human-readable model name for reporting. **model_kwargs Additional keyword arguments forwarded to the model constructor. Returns ------- PhysicalModelFitResult Dataclass result object with ``parameter_vector`` (optimal parameters), ``rms_mm``, ``success``, and ``model`` (the fitted instance). """ from scipy.optimize import least_squares # type: ignore K_arr = np.asarray(K, dtype=np.float64).reshape(3, 3) full_pixels = _grid_pixels(image_size, grid_shape) if support_pixels is None: support = full_pixels include_full_grid = False else: support = np.asarray(support_pixels, dtype=np.float64).reshape(-1, 2) include_full_grid = full_grid_weight > 0 if support.size == 0: raise ValueError("support_pixels must not be empty") if support_weight <= 0: raise ValueError("support_weight must be strictly positive") if full_grid_weight < 0: raise ValueError("full_grid_weight must be non-negative") if initial_parameters is None: initial_parameters = np.zeros(0, dtype=np.float64) x0 = np.asarray(initial_parameters, dtype=np.float64).reshape(-1) p = int(x0.size) def model_at(x: np.ndarray) -> PhysicalRayFieldModel: """Return a copy of the model evaluated at a specific parameter vector.""" return _make_model(model_class, x, K_arr, model_kwargs) def combined_residuals(x: np.ndarray) -> np.ndarray: """Combine support-grid and optional full-grid ray-space residuals.""" model = model_at(x) blocks = [ float(support_weight) * rayfield_two_plane_residuals(target_field, model, support, z_planes=z_planes), ] if include_full_grid: blocks.append( float(full_grid_weight) * rayfield_two_plane_residuals(target_field, model, full_pixels, z_planes=z_planes) ) return np.concatenate(blocks) if p == 0: sol_x = x0 success = True message = "zero-parameter model scored without optimization" else: if bounds is None: bounds = (-np.inf * np.ones_like(x0), np.inf * np.ones_like(x0)) sol = least_squares( combined_residuals, x0=x0, bounds=bounds, loss=robust_loss, f_scale=1.0, max_nfev=int(max_nfev), xtol=1e-10, ftol=1e-10, gtol=1e-10, ) sol_x = np.asarray(sol.x, dtype=np.float64) success = bool(sol.success) message = str(sol.message) fitted_model = model_at(sol_x) combined = combined_residuals(sol_x) rss = float(np.sum(combined * combined)) n_res = int(combined.size) n_samples = int(support.shape[0] + (full_pixels.shape[0] if include_full_grid else 0)) aic, bic = _aic_bic(rss, n_res, n_samples, p) # Slice combined to recover per-region unweighted residuals without extra # rayfield evaluations. Layout: [sw*support | fgw*full] (when fgw > 0). n_support_flat = support.shape[0] * 6 support_stats = _residual_norm_stats(combined[:n_support_flat] / float(support_weight)) rms, median, p95 = support_stats support_rms = support_stats[0] if include_full_grid: full_rms = _residual_norm_stats(combined[n_support_flat:] / float(full_grid_weight))[0] elif support_pixels is None: full_rms = support_rms else: full_rms = _residual_norm_stats( rayfield_two_plane_residuals(target_field, fitted_model, full_pixels, z_planes=z_planes) )[0] parameter_dict = ( fitted_model.parameter_dict() if hasattr(fitted_model, "parameter_dict") else {} ) model_name = ( name if name is not None else str(getattr(fitted_model, "name", model_class.__name__)) ) return PhysicalModelFitResult( model_name=model_name, model=fitted_model, success=success, message=message, n_parameters=p, n_samples=n_samples, n_residual_scalars=n_res, rss=rss, rms_mm=rms, median_mm=median, p95_mm=p95, support_rms_mm=support_rms, full_grid_rms_mm=full_rms, aic=aic, bic=bic, parameter_vector=sol_x, parameter_dict=parameter_dict, )
[docs] def select_physical_model_from_rayfield( target_field, candidate_specs: list[PhysicalModelSpec] | None, K: np.ndarray, image_size: tuple[int, int], z_planes: tuple[float, float] = (100.0, 1000.0), grid_shape: tuple[int, int] = (25, 19), support_pixels: np.ndarray | None = None, support_weight: float = 1.0, full_grid_weight: float = 0.25, max_nfev: int = 2000, target_right=None, K_right: np.ndarray | None = None, support_pixels_right: np.ndarray | None = None, ) -> OpticalModelSelectionReport: """Fit candidate physical models and select the best in ray space. With only ``target_field`` this keeps the historical single-channel behavior. When ``target_right`` is provided, non-shared candidates are fitted independently to both channels and aggregated, while stereo-shared candidates such as ``CMOPhysicalStereoModel`` are fitted once with their shared parameter vector. """ from stereocomplex.physics.cmo_physical import ( CMOPhysicalStereoModel, fit_cmo_physical_stereo_model_to_rayfields, ) specs = default_physical_model_specs() if candidate_specs is None else list(candidate_specs) results: list[PhysicalModelFitResult] = [] warnings: list[str] = [] K_r = np.asarray(K if K_right is None else K_right, dtype=np.float64).reshape(3, 3) stereo_mode = target_right is not None for spec in specs: kwargs = dict(spec.model_kwargs or {}) try: if stereo_mode and bool(getattr(spec.model_class, "is_stereo_shared", False)): pixel_pitch = kwargs.pop("pixel_pitch_mm", None) if pixel_pitch is None: raise ValueError("pixel_pitch_mm is required for stereo-shared CMO selection") if ( spec.model_class is CMOPhysicalStereoModel or spec.model_class.__name__ == "CMOPhysicalStereoModel" ): result = _as_shared_cmo_selection_result( spec.name, fit_cmo_physical_stereo_model_to_rayfields( left_field=target_field, right_field=target_right, image_size=image_size, initial_parameters=spec.initial_parameters, bounds=spec.bounds, pixel_pitch_mm=float(pixel_pitch), z_planes=z_planes, grid_shape=grid_shape, support_pixels_left=support_pixels, support_pixels_right=support_pixels_right, support_weight=support_weight, full_grid_weight=full_grid_weight, max_nfev=max_nfev, ), ) elif spec.model_class.__name__ in ( "CMOTelecentricStereoModel", "CMOTelecentricChannelModel", ): from stereocomplex.physics.cmo_physical import ( fit_cmo_telecentric_model_to_rayfields, ) result = _as_shared_cmo_selection_result( spec.name, fit_cmo_telecentric_model_to_rayfields( left_field=target_field, right_field=target_right, image_size=image_size, initial_parameters=spec.initial_parameters, pixel_pitch_mm=float(pixel_pitch), z_planes=z_planes, grid_shape=grid_shape, full_grid_weight=full_grid_weight, max_nfev=max_nfev, ), ) else: raise NotImplementedError( f"stereo-shared dispatch is not implemented for {spec.model_class}" ) elif stereo_mode: left = fit_physical_model_to_rayfield( model_class=spec.model_class, target_field=target_field, K=K, image_size=image_size, initial_parameters=spec.initial_parameters, bounds=spec.bounds, z_planes=z_planes, grid_shape=grid_shape, support_pixels=support_pixels, support_weight=support_weight, full_grid_weight=full_grid_weight, max_nfev=max_nfev, name=f"{spec.name}_left", **kwargs, ) right = fit_physical_model_to_rayfield( model_class=spec.model_class, target_field=target_right, K=K_r, image_size=image_size, initial_parameters=spec.initial_parameters, bounds=spec.bounds, z_planes=z_planes, grid_shape=grid_shape, support_pixels=support_pixels_right, support_weight=support_weight, full_grid_weight=full_grid_weight, max_nfev=max_nfev, name=f"{spec.name}_right", **kwargs, ) result = _as_stereo_selection_result(name=spec.name, left=left, right=right) else: result = fit_physical_model_to_rayfield( model_class=spec.model_class, target_field=target_field, K=K, image_size=image_size, initial_parameters=spec.initial_parameters, bounds=spec.bounds, z_planes=z_planes, grid_shape=grid_shape, support_pixels=support_pixels, support_weight=support_weight, full_grid_weight=full_grid_weight, max_nfev=max_nfev, name=spec.name, **kwargs, ) except Exception as exc: # pragma: no cover - surfaced in report for users warnings.append(f"{spec.name} failed: {exc}") continue results.append(result) if not results: raise RuntimeError("all physical candidate fits failed") return OpticalModelSelectionReport( target_name=str(getattr(target_field, "name", target_field.__class__.__name__)), candidates=results, best_by_aic=min(results, key=lambda r: r.aic).model_name, best_by_bic=min(results, key=lambda r: r.bic).model_name, best_by_rms=min(results, key=lambda r: r.rms_mm).model_name, warnings=warnings, )
__all__ = [ "OpticalModelSelectionReport", "PhysicalModelFitResult", "PhysicalModelSpec", "default_physical_model_specs", "fit_physical_model_to_rayfield", "reprojection_guard_penalty", "select_physical_model_from_rayfield", "usable_bic", ]