Source code for stereocomplex.physics.cmo_physical

"""Geometrically constrained Common Main Objective stereo model.

This module implements a compact paraxial CMO model for ray-space model
selection and bundle-adjustment experiments. It is intentionally distinct from
the polynomial non-central surrogate in :mod:`stereocomplex.physics.cmo`: the
two channels share a main-objective geometry, and their chief rays converge at
the working plane instead of being represented by independent origins.

Scientific background:

* Olympus US 7,564,619 describes a CMO architecture with decentered aperture
  stops near the image-side focal plane of the common main objective.
* Wang et al., Optics and Lasers in Engineering 134 (2020), describe
  common-main-objective stereo microscopes as having parallel optical paths and
  image planes parallel to the focal plane.
* Schreier, Garcia and Sutton, Experimental Mechanics 44(3), 278-288 (2004),
  and Pan, Wang and Cheng, Optics Express 22(15), 18373-18387 (2014), discuss
  calibration and 3D measurement with stereo light microscopes.

The implementation is a first-order ray model, not a full lens-design model.
It is meant to test whether a measured rayfield is compatible with a compact
CMO-like shared-rig parameterization.
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Literal

import numpy as np
from scipy.optimize import least_squares  # type: ignore

from stereocomplex.physics.central_models import undistort_brown_normalized
from stereocomplex.physics.parallel_plate_fit import rayfield_two_plane_residuals

Array = np.ndarray


def transverse_gauge_origin(origin: Array, d: Array) -> Array:
    """Project origin to be transverse to direction (line-representation gauge).

    The physical sub-pupil does NOT satisfy O·d=0.  This function returns
    O_perp = O - (O·d)d, which represents the SAME 3D line as (O,d) but
    with an origin that satisfies the transverse gauge convention used by
    the Zernike rayfield.  Use ONLY for comparison with Zernike fields,
    NOT inside the physical model's ray() method.
    """
    O_arr = np.asarray(origin, dtype=np.float64)
    d_arr = np.asarray(d, dtype=np.float64)
    return O_arr - np.sum(O_arr * d_arr, axis=-1, keepdims=True) * d_arr


def _normalize(v: Array, eps: float = 1e-15) -> Array:
    arr = np.asarray(v, dtype=np.float64)
    n = np.linalg.norm(arr, axis=-1, keepdims=True)
    return arr / np.maximum(n, float(eps))


def _roty(angle_rad: float) -> Array:
    c = float(np.cos(angle_rad))
    s = float(np.sin(angle_rad))
    return np.array([[c, 0.0, s], [0.0, 1.0, 0.0], [-s, 0.0, c]], dtype=np.float64)


def _rotx(angle_rad: float) -> Array:
    c = float(np.cos(angle_rad))
    s = float(np.sin(angle_rad))
    return np.array([[1.0, 0.0, 0.0], [0.0, c, -s], [0.0, s, c]], dtype=np.float64)


def _grid_pixels(image_size: tuple[int, int], grid_shape: tuple[int, int]) -> Array:
    width, height = image_size
    nx, ny = grid_shape
    u = np.linspace(0.0, float(width - 1), int(nx))
    v = np.linspace(0.0, float(height - 1), int(ny))
    uu, vv = np.meshgrid(u, v, indexing="xy")
    return np.column_stack([uu.reshape(-1), vv.reshape(-1)])


def _ray_rms(residuals: Array) -> float:
    r = np.asarray(residuals, dtype=np.float64).reshape(-1, 6)
    return float(np.sqrt(np.mean(np.linalg.norm(r, axis=1) ** 2)))


def _aic_bic(
    rss: float, n_residual_scalars: int, n_observations: int, p: int
) -> tuple[float, float]:
    rss_per_scalar = max(float(rss) / max(int(n_residual_scalars), 1), 1e-30)
    n_scalar = float(max(int(n_residual_scalars), 1))
    n_obs = float(max(int(n_observations), 1))
    return (
        float(2.0 * float(p) + n_scalar * np.log(rss_per_scalar)),
        float(float(p) * np.log(n_obs) + n_scalar * np.log(rss_per_scalar)),
    )


[docs] @dataclass(frozen=True) class CMOPhysicalStereoModel: """Shared-rig paraxial CMO stereo model. The model uses a common main objective with two decentered effective sub-pupils separated by ``b_mm``. For the central pixel, the left and right chief rays converge to the optical axis at ``working_distance_mm``. The chief-ray angle is controlled by ``b_mm / (2 f_obj_mm)`` because the entrance-pupil plane is placed one objective focal length before the working plane. Sensor coordinates are converted to object-plane coordinates through the tube-lens focal length and pixel pitch. The per-channel distortion is an effective direction distortion ``D_c``, parameterized by five Brown-Conrady-like coefficients applied to normalized angular coordinates. It absorbs residual aberrations of the tube lens and main objective; it is not a derivation from one specific physical aberration model. """ f_obj_mm: float working_distance_mm: float b_mm: float f_tube_mm: float cx_principal_px: float cy_principal_px: float pixel_pitch_mm: float theta_axis_tilt_rad: float = 0.0 theta_pitch_rad: float = 0.0 telecentric_offset_mm: float = 0.0 distortion_left: tuple[float, float, float, float, float] = (0.0, 0.0, 0.0, 0.0, 0.0) distortion_right: tuple[float, float, float, float, float] = (0.0, 0.0, 0.0, 0.0, 0.0) image_size: tuple[int, int] | None = None share_principal_point: bool = True delta_cx_diff_px: float = 0.0 delta_cy_diff_px: float = 0.0 name: str = "cmo_physical_stereo" is_stereo_shared: bool = True @property def n_parameters(self) -> int: """Number of free parameters in the flattened parameter vector. Returns 19 when ``share_principal_point=True`` (a single principal point serves both channels) or 21 when each channel carries its own principal-point offset via ``delta_cx_diff_px`` and ``delta_cy_diff_px``. The layout is: 6 shared optical parameters (``f_obj_mm``, ``working_distance_mm``, ``b_mm``, ``f_tube_mm``, ``cx``, ``cy``), optionally 2 principal-point deltas, 2 global tilt angles (``theta_axis_tilt_rad``, ``theta_pitch_rad``), one axial telecentric offset/depth parameter (``telecentric_offset_mm``), and 5 Brown-Conrady distortion coefficients per channel (10 total). """ return 19 if self.share_principal_point else 21
[docs] @classmethod def from_parameter_vector(cls, x: Array, **kwargs) -> CMOPhysicalStereoModel: """Reconstruct the model from a flat parameter vector. Inverse of :meth:`parameter_vector`. The parameter layout depends on ``share_principal_point``: *Shared PP (19 params)* | 0: ``f_obj_mm`` | 1: ``working_distance_mm`` | 2: ``b_mm`` | 3: ``f_tube_mm`` | 4: ``cx_principal_px`` | 5: ``cy_principal_px`` | 6: ``theta_axis_tilt_rad`` | 7: ``theta_pitch_rad`` | 8: ``telecentric_offset_mm`` | 9-13: ``distortion_left`` (k1, k2, p1, p2, k3) | 14-18: ``distortion_right`` *Per-channel PP (21 params)* | 0-5: as above | 6: ``delta_cx_diff_px`` | 7: ``delta_cy_diff_px`` | 8: ``theta_axis_tilt_rad`` | 9: ``theta_pitch_rad`` | 10: ``telecentric_offset_mm`` | 11-15: ``distortion_left`` | 16-20: ``distortion_right`` Parameters ---------- x : ndarray of float, shape (19,) or (21,) Flattened parameter vector. **kwargs Must include ``pixel_pitch_mm`` (float, mm). May include ``image_size``, ``share_principal_point``. Returns ------- CMOPhysicalStereoModel """ arr = np.asarray(x, dtype=np.float64).reshape(-1) if "pixel_pitch_mm" not in kwargs: raise ValueError( "pixel_pitch_mm must be provided as a fixed CMOPhysicalStereoModel parameter" ) share_principal_point = bool(kwargs.get("share_principal_point", True)) expected = 19 if share_principal_point else 21 if arr.size != expected: raise ValueError(f"CMOPhysicalStereoModel expects {expected} parameters") if share_principal_point: delta_cx_diff = 0.0 delta_cy_diff = 0.0 theta_idx = 6 pitch_idx = 7 tele_idx = 8 left_start = 9 else: delta_cx_diff = float(arr[6]) delta_cy_diff = float(arr[7]) theta_idx = 8 pitch_idx = 9 tele_idx = 10 left_start = 11 return cls( f_obj_mm=float(arr[0]), working_distance_mm=float(arr[1]), b_mm=float(arr[2]), f_tube_mm=float(arr[3]), cx_principal_px=float(arr[4]), cy_principal_px=float(arr[5]), pixel_pitch_mm=float(kwargs["pixel_pitch_mm"]), theta_axis_tilt_rad=float(arr[theta_idx]), theta_pitch_rad=float(arr[pitch_idx]), telecentric_offset_mm=float(arr[tele_idx]), distortion_left=tuple(float(v) for v in arr[left_start : left_start + 5]), # type: ignore[arg-type] distortion_right=tuple(float(v) for v in arr[left_start + 5 : left_start + 10]), # type: ignore[arg-type] image_size=kwargs.get("image_size"), share_principal_point=share_principal_point, delta_cx_diff_px=delta_cx_diff, delta_cy_diff_px=delta_cy_diff, )
[docs] def parameter_vector(self) -> Array: """Serialize the model to a flat parameter vector. Inverse of :meth:`from_parameter_vector`. The order matches the layout documented there: shared optical params, optional PP deltas, tilt angles, then left and right distortion tuples. Returns ------- ndarray of float, shape (19,) or (21,) """ common = [ self.f_obj_mm, self.working_distance_mm, self.b_mm, self.f_tube_mm, self.cx_principal_px, self.cy_principal_px, ] if not self.share_principal_point: common.extend([self.delta_cx_diff_px, self.delta_cy_diff_px]) common.append(self.theta_axis_tilt_rad) common.append(self.theta_pitch_rad) common.append(self.telecentric_offset_mm) return np.r_[ np.asarray(common, dtype=np.float64), np.asarray(self.distortion_left, dtype=np.float64), np.asarray(self.distortion_right, dtype=np.float64), ].astype(np.float64)
[docs] def flat_parameter_dict(self) -> dict[str, float]: """Return all free parameters as a flat dictionary keyed by name. Distortion coefficients are named ``{channel}_{k}`` where *channel* is ``left`` or ``right`` and *k* is one of ``k1, k2, p1, p2, k3`` (Brown-Conrady convention). Principal-point deltas are included only when ``share_principal_point=False``. Returns ------- dict Mapping from parameter name (str) to value (float, in its native unit: mm, px, or rad). """ keys = ("k1", "k2", "p1", "p2", "k3") params = { "f_obj_mm": float(self.f_obj_mm), "working_distance_mm": float(self.working_distance_mm), "b_mm": float(self.b_mm), "f_tube_mm": float(self.f_tube_mm), "cx_principal_px": float(self.cx_principal_px), "cy_principal_px": float(self.cy_principal_px), "theta_axis_tilt_rad": float(self.theta_axis_tilt_rad), "theta_pitch_rad": float(self.theta_pitch_rad), "telecentric_offset_mm": float(self.telecentric_offset_mm), } if not self.share_principal_point: params.update( { "delta_cx_diff_px": float(self.delta_cx_diff_px), "delta_cy_diff_px": float(self.delta_cy_diff_px), } ) params.update( {f"left_{k}": float(v) for k, v in zip(keys, self.distortion_left, strict=True)} ) params.update( {f"right_{k}": float(v) for k, v in zip(keys, self.distortion_right, strict=True)} ) return params
[docs] def parameter_dict(self) -> dict[str, object]: """Return the parameter set split into free and fixed groups. Fixed parameters are quantities treated as known sensor constants: ``pixel_pitch_mm``, ``image_width``, ``image_height``, and ``share_principal_point``. Free parameters are the optical, geometric, and distortion coefficients (see :meth:`flat_parameter_dict`). Returns ------- dict ``{"free": {...}, "fixed": {...}}``. All values carry their native units (mm, px, or rad for free; mm or px for fixed). """ fixed = { "pixel_pitch_mm": float(self.pixel_pitch_mm), "image_width": float(self.image_size[0]) if self.image_size is not None else float("nan"), "image_height": float(self.image_size[1]) if self.image_size is not None else float("nan"), "share_principal_point": bool(self.share_principal_point), } return {"free": self.flat_parameter_dict(), "fixed": fixed}
[docs] def channel(self, channel: Literal["left", "right"]) -> CMOPhysicalChannelModel: """Return a single-channel facade for the given channel name. The returned :class:`CMOPhysicalChannelModel` delegates all ray-tracing and parameter queries to this shared rig, transparently passing the channel identifier. This lets downstream code treat each channel as an independent ray model while the underlying parameters remain coupled through the shared CMO geometry. Parameters ---------- channel : {"left", "right"} Which stereo channel to wrap. Returns ------- CMOPhysicalChannelModel """ return CMOPhysicalChannelModel(rig=self, channel=channel)
[docs] def principal_point_for_channel(self, channel: Literal["left", "right"]) -> tuple[float, float]: """Compute the principal point in pixels for a given channel. When ``share_principal_point=True``, both channels share the same ``(cx_principal_px, cy_principal_px)``. When ``False``, the left channel shifts by ``-0.5 * delta`` and the right by ``+0.5 * delta``, so that ``delta_cx_diff_px`` / ``delta_cy_diff_px`` encode the total separation between the two principal points. Parameters ---------- channel : {"left", "right"} Returns ------- (float, float) Principal-point coordinates ``(cx, cy)`` in px. """ if self.share_principal_point: return float(self.cx_principal_px), float(self.cy_principal_px) sign = -1.0 if channel == "left" else 1.0 return ( float(self.cx_principal_px) + sign * 0.5 * float(self.delta_cx_diff_px), float(self.cy_principal_px) + sign * 0.5 * float(self.delta_cy_diff_px), )
[docs] def ray(self, u: Array, v: Array, channel: Literal["left", "right"]) -> tuple[Array, Array]: """Compute 3-D rays for pixel coordinates through the CMO paraxial model. This is the core optical forward model. For each pixel ``(u, v)`` the pipeline is: 1. Convert pixel displacement from the principal point to angular coordinates via the tube lens: ``alpha = (pixel - principal) * pixel_pitch / f_tube``. 2. Undo Brown-Conrady distortion to obtain undistorted angular coordinates ``(alpha_x, alpha_y)``. 3. Place the effective sub-pupil (ray origin) at ``(±b/2, 0, z_pupil)`` where ``z_pupil = working_distance - f_obj + telecentric_offset``. The sign is negative for the left channel, positive for right. 4. Determine the object-plane intersection at ``(WD * alpha_x, WD * alpha_y, WD)``. 5. The ray direction is the unit vector from pupil to object-plane point. 6. If ``theta_axis_tilt_rad`` or ``theta_pitch_rad`` is non-zero, rotate both origin and direction by the corresponding yaw and pitch matrices. .. warning:: ``f_obj_mm`` and ``telecentric_offset_mm`` are **exactly degenerate** — only their difference enters ``z_pupil``. Do not assert either value independently; use ``working_distance_mm``, ``b_mm``, ``f_tube_mm``, and ``f_obj_mm - telecentric_offset_mm``. Parameters ---------- u : ndarray Horizontal pixel coordinates (column index, 0-based). v : ndarray Vertical pixel coordinates (row index, 0-based). channel : {"left", "right"} Returns ------- origin : ndarray, shape (..., 3) Ray origin in mm (the effective sub-pupil position). direction : ndarray, shape (..., 3) Unit ray direction vector (dimensionless). """ uu, vv = np.broadcast_arrays( np.asarray(u, dtype=np.float64), np.asarray(v, dtype=np.float64) ) shape = uu.shape uf = uu.reshape(-1) vf = vv.reshape(-1) cx, cy = self.principal_point_for_channel(channel) alpha_x_d = (uf - cx) * float(self.pixel_pitch_mm) / float(self.f_tube_mm) alpha_y_d = (vf - cy) * float(self.pixel_pitch_mm) / float(self.f_tube_mm) coeffs = self.distortion_left if channel == "left" else self.distortion_right alpha_x, alpha_y = undistort_brown_normalized(alpha_x_d, alpha_y_d, *coeffs, n_iter=10) sign = -1.0 if channel == "left" else 1.0 z_pupil = ( float(self.working_distance_mm) - float(self.f_obj_mm) + float(self.telecentric_offset_mm) ) pupil = np.column_stack( [ np.full_like(alpha_x, sign * 0.5 * float(self.b_mm)), np.zeros_like(alpha_y), np.full_like(alpha_x, z_pupil), ] ) object_plane_point = np.column_stack( [ float(self.working_distance_mm) * alpha_x, float(self.working_distance_mm) * alpha_y, np.full_like(alpha_x, float(self.working_distance_mm)), ] ) directions = _normalize(object_plane_point - pupil) if self.theta_axis_tilt_rad != 0.0: R = _roty(float(self.theta_axis_tilt_rad)) pupil = (R @ pupil.T).T directions = (R @ directions.T).T if self.theta_pitch_rad != 0.0: R = _rotx(float(self.theta_pitch_rad)) pupil = (R @ pupil.T).T directions = (R @ directions.T).T return pupil.reshape((*shape, 3)), directions.reshape((*shape, 3))
[docs] @dataclass(frozen=True) class CMOPhysicalChannelModel: """Single-channel facade for a shared CMO physical rig.""" rig: CMOPhysicalStereoModel channel: Literal["left", "right"] name: str = "cmo_physical_channel" is_stereo_shared: bool = True @property def n_parameters(self) -> int: """Number of free parameters, delegated to the shared rig. See :attr:`CMOPhysicalStereoModel.n_parameters`. """ return self.rig.n_parameters
[docs] def parameter_vector(self) -> Array: """Serialize to a flat parameter vector, delegated to the shared rig. See :meth:`CMOPhysicalStereoModel.parameter_vector`. """ return self.rig.parameter_vector()
[docs] @classmethod def from_parameter_vector(cls, x: Array, **kwargs) -> CMOPhysicalChannelModel: """Reconstruct a channel model from a flat parameter vector. Builds the shared :class:`CMOPhysicalStereoModel` from *x*, then wraps it with the channel given by ``kwargs["channel"]`` (default ``"left"``). Parameters ---------- x : ndarray of float, shape (19,) or (21,) Flattened parameter vector (see :meth:`CMOPhysicalStereoModel.from_parameter_vector`). **kwargs Must include ``pixel_pitch_mm``. May include ``image_size``, ``share_principal_point``, and ``channel`` (``"left"`` or ``"right"``). Returns ------- CMOPhysicalChannelModel """ rig = CMOPhysicalStereoModel.from_parameter_vector( x, image_size=kwargs.get("image_size"), pixel_pitch_mm=kwargs.get("pixel_pitch_mm"), share_principal_point=kwargs.get("share_principal_point", True), ) channel = kwargs.get("channel", "left") if channel not in {"left", "right"}: raise ValueError("channel must be 'left' or 'right'") return cls(rig=rig, channel=channel)
[docs] def parameter_dict(self) -> dict[str, object]: """Return free/fixed parameter groups, delegated to the shared rig. See :meth:`CMOPhysicalStereoModel.parameter_dict`. """ return self.rig.parameter_dict()
[docs] def ray(self, u: Array, v: Array) -> tuple[Array, Array]: """Compute 3-D rays, delegating to the shared rig with this channel. See :meth:`CMOPhysicalStereoModel.ray`. """ return self.rig.ray(u, v, self.channel)
[docs] def project_point(self, X: Array, max_iter: int = 20) -> tuple[Array, bool]: """Project a 3-D point with the non-tilted paraxial channel approximation. Given a point *X*, find the pixel whose approximate ray passes through it. This helper is intentionally narrower than :meth:`CMOPhysicalStereoModel.ray`: it uses the sub-pupil ``z = working_distance_mm - f_obj_mm`` and does not invert ``telecentric_offset_mm``, ``theta_axis_tilt_rad`` or ``theta_pitch_rad``. Use it only for the legacy non-tilted projection path; use ``ray()`` for the full fitted 19/21-parameter model. Returns ``(uv, ok)`` where *uv* is (2,) and *ok* is True if the point projects inside the sensor. """ import numpy as np from stereocomplex.physics.central_models import ( brown_conrady_distort_normalized, ) X_arr = np.asarray(X, dtype=np.float64).reshape(3) rig = self.rig cx, cy = rig.principal_point_for_channel(self.channel) sign = -1.0 if self.channel == "left" else 1.0 S = np.array([sign * 0.5 * rig.b_mm, 0.0, rig.working_distance_mm - rig.f_obj_mm]) d = X_arr - S if abs(d[2]) < 1e-12: return np.full(2, np.nan), False lam = (rig.working_distance_mm - S[2]) / d[2] P = S + lam * d alpha_x_u = P[0] / rig.working_distance_mm alpha_y_u = P[1] / rig.working_distance_mm coeffs = rig.distortion_left if self.channel == "left" else rig.distortion_right # Apply distortion (forward) to go from undistorted to distorted angular coords alpha_x_d, alpha_y_d = brown_conrady_distort_normalized( np.array([alpha_x_u]), np.array([alpha_y_u]), *coeffs, ) u = cx + alpha_x_d[0] * rig.f_tube_mm / rig.pixel_pitch_mm v = cy + alpha_y_d[0] * rig.f_tube_mm / rig.pixel_pitch_mm W, H = rig.image_size or (int(2 * cx), int(2 * cy)) ok = (0 <= float(u) <= W - 1) and (0 <= float(v) <= H - 1) return np.array([float(u), float(v)]), bool(ok)
[docs] @dataclass(frozen=True) class CMOPhysicalStereoFitResult: """Result of fitting a shared physical CMO model to two rayfields.""" model: CMOPhysicalStereoModel success: bool message: str n_parameters: int n_samples: int n_residual_scalars: int rss: float left_rms_mm: float right_rms_mm: float rms_mm: float aic: float bic: float parameter_vector: Array parameter_dict: dict[str, object]
[docs] def fit_cmo_physical_stereo_model_to_rayfields( left_field, right_field, image_size: tuple[int, int], initial_parameters: Array, bounds: tuple[Array, Array] | None = None, *, pixel_pitch_mm: float, z_planes: tuple[float, float] = (50.0, 250.0), grid_shape: tuple[int, int] = (17, 13), support_pixels_left: Array | None = None, support_pixels_right: Array | None = None, support_weight: float = 1.0, full_grid_weight: float = 0.25, robust_loss: str = "huber", max_nfev: int = 2000, ) -> CMOPhysicalStereoFitResult: """Fit the shared physical CMO rig to left and right measured Zernike rayfields. This entry point fits the shared 19/21-parameter paraxial CMO model: baseline, working distance, objective focal length, tube focal length, principal-point terms, two global tilt angles, one axial telecentric offset/depth parameter, and per-channel Brown-Conrady distortion. It does **not** fit the paper's 26-parameter CMO + per-arm SE(3) bundle-adjustment model. Parameters ---------- left_field : ZernikeRayField Measured left-channel rayfield (origin + direction). right_field : ZernikeRayField Measured right-channel rayfield. image_size : (int, int) Sensor dimensions in pixels (width, height). initial_parameters : ndarray, shape (19,) or (21,) Starting parameter vector. 19 = shared PP, 21 = aligned PP. bounds : tuple of ndarray, optional Lower and upper bounds for the parameters. pixel_pitch_mm : float Sensor pixel pitch in millimetres. z_planes : (float, float) Two z-planes in mm (default 50, 250) where ray intersections are evaluated. grid_shape : (int, int) Subsampling grid for the full-image residual. support_pixels_left, support_pixels_right : ndarray, optional Observed-pixel coords (N, 2) whose ray gap is up-weighted. support_weight : float Relative weight of support-pixel vs full-grid residuals. full_grid_weight : float Relative weight of the full-grid residual term. robust_loss : str ``"huber"`` or ``"soft_l1"`` — robust loss for least_squares. max_nfev : int Maximum number of function evaluations for the optimiser. Returns ------- CMOPhysicalStereoFitResult Dataclass result object with ``parameter_vector`` (optimal params), ``message``, ``success``, ``model`` (the fitted CMO stereo model), and diagnostics. Notes ----- The two-zone loss (sparse support + subsampled full grid) balances fidelity to observation pixels with smooth ray-space behaviour across the image. It is a rayfield-to-model fit, not the final per-arm SE(3) paper BA. """ x0 = np.asarray(initial_parameters, dtype=np.float64).reshape(-1) if x0.size not in {19, 21}: raise ValueError("initial_parameters must contain 19 shared-PP or 21 aligned-PP values") share_principal_point = x0.size == 19 full = _grid_pixels(image_size, grid_shape) support_l = ( full if support_pixels_left is None else np.asarray(support_pixels_left, dtype=np.float64).reshape(-1, 2) ) support_r = ( full if support_pixels_right is None else np.asarray(support_pixels_right, dtype=np.float64).reshape(-1, 2) ) include_full = full_grid_weight > 0 and ( support_pixels_left is not None or support_pixels_right is not None ) def model_at(x: Array) -> CMOPhysicalStereoModel: """Reconstruct a :class:`CMOPhysicalStereoModel` from parameter vector *x*. Convenience closure that supplies the fixed arguments (``image_size``, ``pixel_pitch_mm``, ``share_principal_point``) captured from the enclosing fit context. """ return CMOPhysicalStereoModel.from_parameter_vector( x, image_size=image_size, pixel_pitch_mm=pixel_pitch_mm, share_principal_point=share_principal_point, ) def residuals(x: Array) -> Array: """Evaluate the stacked ray-to-rayfield residual for parameter vector *x*. For each channel, computes the two-plane rayfield residual (:func:`rayfield_two_plane_residuals`) at the support pixel set and, optionally, at the full grid pixels (weighted by ``full_grid_weight``). The left and right residual blocks are concatenated into a single 1-D array for the least-squares solver. """ model = model_at(x) left = model.channel("left") right = model.channel("right") blocks = [ float(support_weight) * rayfield_two_plane_residuals(left_field, left, support_l, z_planes=z_planes), float(support_weight) * rayfield_two_plane_residuals(right_field, right, support_r, z_planes=z_planes), ] if include_full: blocks.extend( [ float(full_grid_weight) * rayfield_two_plane_residuals(left_field, left, full, z_planes=z_planes), float(full_grid_weight) * rayfield_two_plane_residuals(right_field, right, full, z_planes=z_planes), ] ) return np.concatenate(blocks) if bounds is None: if share_principal_point: lower_common = [1.0, 1.0, 0.0, 1.0, -np.inf, -np.inf, -0.25, -0.25, -500.0] upper_common = [500.0, 1000.0, 200.0, 1000.0, np.inf, np.inf, 0.25, 0.25, 500.0] else: lower_common = [ 1.0, 1.0, 0.0, 1.0, -np.inf, -np.inf, -50.0, -50.0, -0.25, -0.25, -500.0, ] upper_common = [ 500.0, 1000.0, 200.0, 1000.0, np.inf, np.inf, 50.0, 50.0, 0.25, 0.25, 500.0, ] lower = np.array( lower_common + [-1.0, -1.0, -0.1, -0.1, -1.0] * 2, dtype=np.float64, ) upper = np.array( upper_common + [1.0, 1.0, 0.1, 0.1, 1.0] * 2, dtype=np.float64, ) bounds = (lower, upper) sol = least_squares( residuals, x0=x0, bounds=bounds, loss=robust_loss, f_scale=1.0, max_nfev=int(max_nfev), xtol=1e-10, ftol=1e-10, gtol=1e-10, ) fitted = model_at(sol.x) left_res = rayfield_two_plane_residuals( left_field, fitted.channel("left"), support_l, z_planes=z_planes ) right_res = rayfield_two_plane_residuals( right_field, fitted.channel("right"), support_r, z_planes=z_planes ) combined = residuals(sol.x) rss = float(np.sum(combined * combined)) n_res = int(combined.size) n_samples = int( support_l.shape[0] + support_r.shape[0] + (2 * full.shape[0] if include_full else 0) ) aic, bic = _aic_bic(rss, n_res, n_samples, fitted.n_parameters) left_rms = _ray_rms(left_res) right_rms = _ray_rms(right_res) return CMOPhysicalStereoFitResult( model=fitted, success=bool(sol.success), message=str(sol.message), n_parameters=fitted.n_parameters, n_samples=n_samples, n_residual_scalars=n_res, rss=rss, left_rms_mm=left_rms, right_rms_mm=right_rms, rms_mm=float(np.sqrt(0.5 * (left_rms**2 + right_rms**2))), aic=aic, bic=bic, parameter_vector=np.asarray(sol.x, dtype=np.float64).copy(), parameter_dict=fitted.parameter_dict(), )
[docs] @dataclass(frozen=True) class CMOTelecentricStereoModel: """Quasi-telecentric CMO model with rigid sub-pupil + affine direction field. Each channel has a **constant origin** :math:`S_c` (the effective sub-pupil) and a direction field that varies **affinely** across pixels. This captures the quasi-telecentric character of infinity-corrected CMO microscopes where the direction is nearly constant across the field of view. The direction field is: .. math:: d_c(u,v) = \\operatorname{normalize}\\left( d_{c,0} + s_x \\tilde{u}\\, e_x + s_y \\tilde{v}\\, e_y \\right) where :math:`\\tilde{u} = (u - c_x) \\cdot p_{\\text{pix}} / f_{\\text{obj}}` and :math:`\\tilde{v} = (v - c_y) \\cdot p_{\\text{pix}} / f_{\\text{obj}}`. """ f_obj_mm: float # for sub-pupil depth: z_pupil = WD - f_obj working_distance_mm: float b_mm: float cx_principal_px: float cy_principal_px: float pixel_pitch_mm: float f_angular_mm: float = 0.0 # pixel→angle normalisation (≈ f_obj, but decoupled) theta_convergence_half_rad: float = 0.0 d_y_common: float = 0.0 s_x_L: float = 0.0 s_y_L: float = 0.0 s_x_R: float = 0.0 s_y_R: float = 0.0 s_xy_L: float = 0.0 # cross-term: d_x += s_xy * ṽ s_yx_L: float = 0.0 # cross-term: d_y += s_yx * ũ s_xy_R: float = 0.0 s_yx_R: float = 0.0 shared_slopes: bool = True # Pupil shear: small affine origin variation (transverse to direction) rho_x_L: float = 0.0 rho_y_L: float = 0.0 rho_x_R: float = 0.0 rho_y_R: float = 0.0 shared_shear: bool = True # Quadratic direction correction (centered, before normalization) q_xx_L: float = 0.0 q_yy_L: float = 0.0 q_xx_R: float = 0.0 q_yy_R: float = 0.0 shared_quadratic: bool = True image_size: tuple[int, int] | None = None name: str = "cmo_telecentric" is_stereo_shared: bool = True # All .ray() methods return rays in the internal OpenCV frame # (u → +X, v → +Y, Z forward). See core/conventions.py. frame_convention: str = "opencv_y_down" @property def n_parameters(self) -> int: """Number of free parameters in the flattened vector. Public parameter vectors have 12, 14, or 16 entries. The 12-parameter variant uses shared slopes and shared shear; disabling shared slopes yields 14 entries, and disabling shared shear yields 16. There is no supported 10-parameter public vector. See :meth:`from_parameter_vector` for the exact layout. """ n = 12 if self.shared_slopes else 14 if not self.shared_shear: n += 2 # per-channel rho if not self.shared_quadratic: n += 2 # per-channel q return n
[docs] @classmethod def from_parameter_vector( cls, x: Array, **kwargs: Any, ) -> CMOTelecentricStereoModel: """Reconstruct the telecentric model from a flat parameter vector. Layout (12 params, shared slopes + shared shear):: 0 f_obj_mm 1 working_distance_mm 2 b_mm 3 cx_principal_px 4 cy_principal_px 5 f_angular_mm 6 theta_convergence_half_rad 7 d_y_common 8 s_x_L (also s_x_R when shared_slopes) 9 s_y_L (also s_y_R when shared_slopes) 10 rho_x_L (also rho_x_R when shared_shear) 11 rho_y_L (also rho_y_R when shared_shear) When ``shared_slopes=False`` (14 params), indices 10-11 are ``s_x_R, s_y_R`` and shear starts at 12. Parameters ---------- x : ndarray of float, shape (12,), (14,), or (16,) **kwargs Must include ``pixel_pitch_mm`` (float, mm). May include ``image_size``. Returns ------- CMOTelecentricStereoModel """ arr = np.asarray(x, dtype=np.float64).reshape(-1) if arr.size not in {12, 14, 16}: raise ValueError( f"CMOTelecentricStereoModel expects 12/14/16 parameters, got {arr.size}" ) sh_slope = arr.size == 12 # shared slopes sh_shear = arr.size in {12, 14} # shared shear (12=shared both, 14=per-ch slopes+sh shear) sh_quad = True # quadratic always shared for now # Slope extraction sxL = float(arr[8]) syL = float(arr[9]) if sh_slope: sxR = sxL syR = syL rho_idx = 10 else: sxR = float(arr[10]) syR = float(arr[11]) rho_idx = 12 # Shear extraction if sh_shear: rxL = float(arr[rho_idx]) ryL = float(arr[rho_idx + 1]) rxR = rxL ryR = ryL q_idx = rho_idx + 2 else: rxL = float(arr[rho_idx]) ryL = float(arr[rho_idx + 1]) rxR = float(arr[rho_idx + 2]) ryR = float(arr[rho_idx + 3]) q_idx = rho_idx + 4 # Quadratic extraction (only if present in vector) if arr.size > q_idx + 1: qxL = float(arr[q_idx]) qyL = float(arr[q_idx + 1]) qxR = qxL qyR = qyL # shared quadratic else: qxL = 0.0 qyL = 0.0 qxR = 0.0 qyR = 0.0 return cls( f_obj_mm=float(arr[0]), working_distance_mm=float(arr[1]), b_mm=float(arr[2]), cx_principal_px=float(arr[3]), cy_principal_px=float(arr[4]), pixel_pitch_mm=float(kwargs["pixel_pitch_mm"]), f_angular_mm=float(arr[5]), theta_convergence_half_rad=float(arr[6]), d_y_common=float(arr[7]), s_x_L=sxL, s_y_L=syL, s_x_R=sxR, s_y_R=syR, shared_slopes=sh_slope, rho_x_L=rxL, rho_y_L=ryL, rho_x_R=rxR, rho_y_R=ryR, shared_shear=sh_shear, q_xx_L=qxL, q_yy_L=qyL, q_xx_R=qxR, q_yy_R=qyR, shared_quadratic=sh_quad, image_size=kwargs.get("image_size"), )
[docs] def parameter_vector(self) -> Array: """Serialize the telecentric model to a flat parameter vector. Inverse of :meth:`from_parameter_vector`. The order is: base optical params (10), optionally per-channel slopes (2), then shear coefficients (2 shared or 4 per-channel). Returns ------- ndarray of float, shape (12,), (14,), or (16,) """ vec = [ self.f_obj_mm, self.working_distance_mm, self.b_mm, self.cx_principal_px, self.cy_principal_px, self.f_angular_mm, self.theta_convergence_half_rad, self.d_y_common, self.s_x_L, self.s_y_L, ] if not self.shared_slopes: vec.extend([self.s_x_R, self.s_y_R]) if self.shared_shear: vec.extend([self.rho_x_L, self.rho_y_L]) else: vec.extend([self.rho_x_L, self.rho_y_L, self.rho_x_R, self.rho_y_R]) return np.array(vec, dtype=np.float64)
[docs] def parameter_dict(self) -> dict[str, object]: """Return free/fixed parameter groups. Fixed keys: ``pixel_pitch_mm``, ``image_width``, ``image_height`` (sensor constants). Free keys include all optical/geometric parameters with ``rho`` consolidated when ``shared_shear=True`` and ``s_x``/``s_y`` consolidated when ``shared_slopes=True``. Returns ------- dict ``{"free": {...}, "fixed": {...}}``. """ free = { "f_obj_mm": float(self.f_obj_mm), "working_distance_mm": float(self.working_distance_mm), "b_mm": float(self.b_mm), "cx_principal_px": float(self.cx_principal_px), "cy_principal_px": float(self.cy_principal_px), "f_angular_mm": float(self.f_angular_mm), "theta_convergence_half_deg": float(np.degrees(self.theta_convergence_half_rad)), "d_y_common": float(self.d_y_common), "s_x_L": float(self.s_x_L), "s_y_L": float(self.s_y_L), "shared_slopes": bool(self.shared_slopes), "shared_shear": bool(self.shared_shear), } if not self.shared_slopes: free["s_x_R"] = float(self.s_x_R) free["s_y_R"] = float(self.s_y_R) else: free["s_x"] = float(self.s_x_L) free["s_y"] = float(self.s_y_L) free["rho_x_L"] = float(self.rho_x_L) free["rho_y_L"] = float(self.rho_y_L) if not self.shared_shear: free["rho_x_R"] = float(self.rho_x_R) free["rho_y_R"] = float(self.rho_y_R) else: free["rho_x"] = float(self.rho_x_L) free["rho_y"] = float(self.rho_y_L) return { "free": free, "fixed": { "pixel_pitch_mm": float(self.pixel_pitch_mm), "image_width": float(self.image_size[0]) if self.image_size else float("nan"), "image_height": float(self.image_size[1]) if self.image_size else float("nan"), }, }
[docs] def channel(self, channel: Literal["left", "right"]) -> CMOTelecentricChannelModel: """Return a single-channel facade for the given channel name. See :meth:`CMOPhysicalStereoModel.channel` — the pattern is identical: the returned :class:`CMOTelecentricChannelModel` delegates to this shared rig. Parameters ---------- channel : {"left", "right"} Returns ------- CMOTelecentricChannelModel """ return CMOTelecentricChannelModel(rig=self, channel=channel)
[docs] def ray(self, u: Array, v: Array, channel: Literal["left", "right"]) -> tuple[Array, Array]: """Compute 3-D rays through the quasi-telecentric CMO model. For each pixel ``(u, v)``: 1. Convert pixel displacement to normalised angular coordinates :math:`\\tilde{u}, \\tilde{v}` using ``f_angular_mm`` (or ``f_obj_mm`` as fallback). 2. Set the chief-ray direction :math:`d_0` from the convergence half-angle ``theta_convergence_half_rad`` and common vertical offset ``d_y_common``. The left channel tilts rightward (:math:`+\\sin\\theta`), the right channel leftward (:math:`-\\sin\\theta`). 3. Apply an affine + cross-term + centered-quadratic correction to the direction's *x* and *y* components, then re-normalize. 4. Place the ray origin at the rigid sub-pupil :math:`(\\pm b/2, 0, z_{\\text{pupil}})` plus a small affine pupil shear :math:`(\\rho_x \\tilde{u}, \\rho_y \\tilde{v}, 0)`. This model captures the quasi-telecentric character of infinity-corrected stereo microscopes: the direction field varies only slowly across the field of view, and the origin is nearly constant. Parameters ---------- u : ndarray Horizontal pixel coordinates (0-based column index). v : ndarray Vertical pixel coordinates (0-based row index). channel : {"left", "right"} Returns ------- origin : ndarray, shape (..., 3) Ray origin in mm at the sub-pupil plane. direction : ndarray, shape (..., 3) Unit ray direction vector (dimensionless). """ uu, vv = np.broadcast_arrays( np.asarray(u, dtype=np.float64), np.asarray(v, dtype=np.float64) ) shape = uu.shape uf, vf = uu.reshape(-1), vv.reshape(-1) # Normalised angular coordinates (using f_angular, decoupled from sub-pupil depth) f_ang = float(self.f_angular_mm) if float(self.f_angular_mm) > 0.1 else float(self.f_obj_mm) tilde_u = (uf - float(self.cx_principal_px)) * float(self.pixel_pitch_mm) / f_ang tilde_v = (vf - float(self.cy_principal_px)) * float(self.pixel_pitch_mm) / f_ang # Chief-ray direction (symmetric X, shared Y) # Left channel: ray from (-b/2, 0, z_pupil) to (0, 0, WD) → d_x > 0 # Right channel: ray from (+b/2, 0, z_pupil) to (0, 0, WD) → d_x < 0 dir_sign = 1.0 if channel == "left" else -1.0 sin_th = float(np.sin(float(self.theta_convergence_half_rad))) cos_th = float(np.cos(float(self.theta_convergence_half_rad))) dy = float(self.d_y_common) d0 = np.column_stack( [ np.full_like(uf, dir_sign * sin_th), np.full_like(uf, dy), np.full_like(uf, cos_th), ] ) # Affine + cross + quadratic correction (per-channel) sx_ch = float(self.s_x_L) if channel == "left" else float(self.s_x_R) sy_ch = float(self.s_y_L) if channel == "left" else float(self.s_y_R) sxy_ch = float(self.s_xy_L) if channel == "left" else float(self.s_xy_R) syx_ch = float(self.s_yx_L) if channel == "left" else float(self.s_yx_R) qx_ch = float(self.q_xx_L) if channel == "left" else float(self.q_xx_R) qy_ch = float(self.q_yy_L) if channel == "left" else float(self.q_yy_R) # Centered quadratics umean2 = float(np.mean(tilde_u**2)) vmean2 = float(np.mean(tilde_v**2)) d_raw = np.column_stack( [ d0[:, 0] + sx_ch * tilde_u + sxy_ch * tilde_v + qx_ch * (tilde_u**2 - umean2), d0[:, 1] + sy_ch * tilde_v + syx_ch * tilde_u + qy_ch * (tilde_v**2 - vmean2), d0[:, 2], ] ) directions = _normalize(d_raw) # Origin: sub-pupil (left at -b/2, right at +b/2) + pupil shear pupil_sign = -1.0 if channel == "left" else 1.0 z_pupil = float(self.working_distance_mm) - float(self.f_obj_mm) rho_x = float(self.rho_x_L) if channel == "left" else float(self.rho_x_R) rho_y = float(self.rho_y_L) if channel == "left" else float(self.rho_y_R) O_rigid = np.column_stack( [ np.full_like(uf, pupil_sign * 0.5 * float(self.b_mm)), np.zeros_like(uf), np.full_like(uf, z_pupil), ] ) # Pupil shear: affine origin variation delta_O = np.column_stack([rho_x * tilde_u, rho_y * tilde_v, np.zeros_like(uf)]) pupil = O_rigid + delta_O # physical origin, NOT gauge-projected return pupil.reshape((*shape, 3)), directions.reshape((*shape, 3))
[docs] @dataclass(frozen=True) class CMOTelecentricNModel: """Named collection of telecentric CMO channels for N-camera workflows.""" channels: tuple[CMOTelecentricChannelModel, ...] name: str = "cmo_telecentric_n" is_stereo_shared: bool = False def __post_init__(self) -> None: if not self.channels: raise ValueError("at least one channel is required") names = [channel.channel for channel in self.channels] if len(set(names)) != len(names): raise ValueError("channel names must be unique")
[docs] @classmethod def from_stereo(cls, stereo: CMOTelecentricStereoModel) -> CMOTelecentricNModel: """Wrap a stereo telecentric model as a 2-channel N-camera container. This is a convenience constructor for the N-camera migration: it takes an existing :class:`CMOTelecentricStereoModel` and returns a :class:`CMOTelecentricNModel` with channels ``("left", "right")``, allowing stereo models to be used wherever an N-channel container is expected. Parameters ---------- stereo : CMOTelecentricStereoModel Returns ------- CMOTelecentricNModel """ return cls((stereo.channel("left"), stereo.channel("right")))
@property def channel_names(self) -> tuple[str, ...]: """Names of all channels, in order. Returns ------- tuple of str """ return tuple(channel.channel for channel in self.channels) @property def n_channels(self) -> int: """Number of channels in the collection. Returns ------- int """ return len(self.channels) @property def n_parameters(self) -> int: """Total free parameters summed across all channels. Returns ------- int """ return sum(channel.n_parameters for channel in self.channels)
[docs] def channel(self, name: str) -> CMOTelecentricChannelModel: """Look up a channel by name. Parameters ---------- name : str Channel name (e.g. ``"left"``). Returns ------- CMOTelecentricChannelModel Raises ------ KeyError If no channel matches *name*. """ for channel in self.channels: if channel.channel == name: return channel raise KeyError(name)
[docs] def ray(self, u: Array, v: Array, channel: str) -> tuple[Array, Array]: """Compute 3-D rays for the named channel. Delegates to the appropriate :class:`CMOTelecentricChannelModel`. Parameters ---------- u : ndarray Horizontal pixel coordinates. v : ndarray Vertical pixel coordinates. channel : str Channel name. Returns ------- origin : ndarray, shape (..., 3) direction : ndarray, shape (..., 3) """ return self.channel(channel).ray(u, v)
[docs] @dataclass(frozen=True) class CMOTelecentricChannelModel: """Single-channel facade for a shared telecentric CMO rig.""" rig: CMOTelecentricStereoModel channel: Literal["left", "right"] name: str = "cmo_telecentric_channel" is_stereo_shared: bool = True @property def n_parameters(self) -> int: """Number of free parameters, delegated to the shared rig. See :attr:`CMOTelecentricStereoModel.n_parameters`. """ return self.rig.n_parameters
[docs] def parameter_vector(self) -> Array: """Serialize to a flat parameter vector, delegated to the shared rig. See :meth:`CMOTelecentricStereoModel.parameter_vector`. """ return self.rig.parameter_vector()
[docs] @classmethod def from_parameter_vector(cls, x: Array, **kwargs: Any) -> CMOTelecentricChannelModel: """Reconstruct a channel model from a flat parameter vector. Builds the shared :class:`CMOTelecentricStereoModel` from *x*, then wraps it with the channel given by ``kwargs["channel"]`` (default ``"left"``). Parameters ---------- x : ndarray of float Flattened parameter vector (see :meth:`CMOTelecentricStereoModel.from_parameter_vector`). **kwargs Must include ``pixel_pitch_mm``. May include ``image_size`` and ``channel``. Returns ------- CMOTelecentricChannelModel """ rig = CMOTelecentricStereoModel.from_parameter_vector( x, pixel_pitch_mm=kwargs["pixel_pitch_mm"], image_size=kwargs.get("image_size"), ) channel = kwargs.get("channel", "left") if channel not in {"left", "right"}: raise ValueError("channel must be 'left' or 'right'") return cls(rig=rig, channel=channel)
[docs] def parameter_dict(self) -> dict[str, object]: """Return free/fixed parameter groups, delegated to the shared rig. See :meth:`CMOTelecentricStereoModel.parameter_dict`. """ return self.rig.parameter_dict()
[docs] def ray(self, u: Array, v: Array) -> tuple[Array, Array]: """Compute 3-D rays, delegating to the shared rig with this channel. See :meth:`CMOTelecentricStereoModel.ray`. """ return self.rig.ray(u, v, self.channel)
[docs] def fit_cmo_telecentric_model_to_rayfields( left_field, right_field, image_size: tuple[int, int], initial_parameters: Array, *, pixel_pitch_mm: float, z_planes: tuple[float, float] = (50.0, 250.0), grid_shape: tuple[int, int] = (17, 13), full_grid_weight: float = 0.0, max_nfev: int = 500, ) -> CMOPhysicalStereoFitResult: """Fit the telecentric CMO model to left and right measured Zernike rayfields. This variant assumes telecentricity in object space, reducing the parameter count compared to the full physical model. It fits baseline, working distance, and per-channel sub-pupil positions while keeping the objective focal length fixed at infinity in object space. Parameters ---------- left_field : ZernikeRayField Measured left-channel rayfield (origin field + direction field). right_field : ZernikeRayField Measured right-channel rayfield. image_size : (int, int) Sensor dimensions in pixels (width, height). initial_parameters : ndarray, shape (12,), (14,), or (16,) Starting parameter vector. The supported variants are shared slopes/shear (12), per-channel slopes with shared shear (14), and per-channel slopes plus per-channel shear (16). pixel_pitch_mm : float Sensor pixel pitch in millimetres. z_planes : (float, float) Two z-planes in mm (default 50, 250) for ray intersection evaluation. grid_shape : (int, int) Subsampling grid (width, height) for the full-image residual. full_grid_weight : float Relative weight of the full-grid residual term (0 = only support pixels). max_nfev : int Maximum number of function evaluations for the optimiser. Returns ------- CMOPhysicalStereoFitResult Dataclass result object with ``parameter_vector`` (optimal parameters), ``message``, ``success``, and ``model`` (the fitted CMOTelecentricStereoModel). """ x0 = np.asarray(initial_parameters, dtype=np.float64).reshape(-1) if x0.size not in {12, 14, 16}: raise ValueError(f"initial_parameters must contain 12, 14, or 16 values, got {x0.size}") full = _grid_pixels(image_size, grid_shape) support_l = full support_r = full include_full = full_grid_weight > 0 def model_at(x: Array) -> CMOTelecentricStereoModel: """Reconstruct a :class:`CMOTelecentricStereoModel` from parameter vector *x*. Convenience closure supplying ``pixel_pitch_mm`` and ``image_size`` captured from the enclosing fit context. """ return CMOTelecentricStereoModel.from_parameter_vector( x, pixel_pitch_mm=pixel_pitch_mm, image_size=image_size ) def residuals(x: Array) -> Array: """Evaluate stacked two-plane rayfield residuals for parameter vector *x*. Computes left and right channel residuals via :func:`rayfield_two_plane_residuals`, optionally adding a full-grid block weighted by ``full_grid_weight``. """ model = model_at(x) left = model.channel("left") right = model.channel("right") blocks = [ rayfield_two_plane_residuals(left_field, left, support_l, z_planes=z_planes), rayfield_two_plane_residuals(right_field, right, support_r, z_planes=z_planes), ] if include_full: blocks.extend( [ float(full_grid_weight) * rayfield_two_plane_residuals(left_field, left, full, z_planes=z_planes), float(full_grid_weight) * rayfield_two_plane_residuals(right_field, right, full, z_planes=z_planes), ] ) return np.concatenate(blocks) # Bounds: physically reasonable ranges, extended for larger parameter vectors. # cx / cy are constrained to the central half of the sensor so the affine # model cannot push the principal point to a corner and compensate with sign # flips in the slope / shear parameters — that degenerate solution inverts # the world-frame Y axis relative to the Zernike reference. cx_center, cy_center = 0.5 * image_size[0], 0.5 * image_size[1] pp_margin = 0.25 * min(image_size) lower = np.array( [1.0, 1.0, 0.0, cx_center - pp_margin, cy_center - pp_margin, 1.0, 0.0, -0.3, -10.0, -10.0], dtype=np.float64 ) upper = np.array( [500.0, 1000.0, 200.0, cx_center + pp_margin, cy_center + pp_margin, 500.0, 0.5, 0.3, 10.0, 10.0], dtype=np.float64 ) if x0.size >= 12: lower = np.concatenate([lower, [-10.0, -10.0]]) upper = np.concatenate([upper, [10.0, 10.0]]) if x0.size >= 14: lower = np.concatenate([lower, [-10.0, -10.0]]) upper = np.concatenate([upper, [10.0, 10.0]]) if x0.size >= 16: lower = np.concatenate([lower, [-10.0, -10.0]]) upper = np.concatenate([upper, [10.0, 10.0]]) sol = least_squares( residuals, x0=x0, bounds=(lower, upper), loss="huber", f_scale=1.0, max_nfev=int(max_nfev), xtol=1e-10, ftol=1e-10, gtol=1e-10, ) fitted = model_at(sol.x) left_res = rayfield_two_plane_residuals( left_field, fitted.channel("left"), support_l, z_planes=z_planes ) right_res = rayfield_two_plane_residuals( right_field, fitted.channel("right"), support_r, z_planes=z_planes ) combined = residuals(sol.x) rss = float(np.sum(combined * combined)) n_res = int(combined.size) n_samples = int(support_l.shape[0] + support_r.shape[0]) left_rms = _ray_rms(left_res) right_rms = _ray_rms(right_res) aic, bic = _aic_bic(rss, n_res, n_samples, fitted.n_parameters) return CMOPhysicalStereoFitResult( model=fitted, success=bool(sol.success), message=str(sol.message), n_parameters=fitted.n_parameters, n_samples=n_samples, n_residual_scalars=n_res, rss=rss, left_rms_mm=left_rms, right_rms_mm=right_rms, rms_mm=float(np.sqrt(0.5 * (left_rms**2 + right_rms**2))), aic=aic, bic=bic, parameter_vector=np.asarray(sol.x, dtype=np.float64).copy(), parameter_dict=fitted.parameter_dict(), )
# ═══════════════════════════════════════════════════════════════════════ # Polynomial pre-warp helpers # ═══════════════════════════════════════════════════════════════════════ def _poly_terms_2d(level: int) -> list[tuple[int, int]]: """Monomial exponent pairs (pu, pv) for a 2D polynomial of given level. Level 0: constant (1 term). Level 1: + u, v (3 terms). Level 2: + u^2, uv, v^2 (6 terms). Level 3: + u^3, u^2 v, u v^2, v^3 (10 terms). """ terms: list[tuple[int, int]] = [(0, 0)] if level >= 1: terms.extend([(1, 0), (0, 1)]) if level >= 2: terms.extend([(2, 0), (1, 1), (0, 2)]) if level >= 3: terms.extend([(3, 0), (2, 1), (1, 2), (0, 3)]) return terms def _n_warp_coeff_per_axis(level: int) -> int: return len(_poly_terms_2d(level)) def _n_warp_coeff_total(level: int, shared: bool) -> int: """Total warp coefficients across both axes and channels.""" if level == 0: return 0 # identity warp, no coefficients per_axis = _n_warp_coeff_per_axis(level) per_channel = per_axis * 2 # xi + eta return per_channel if shared else per_channel * 2 def _polyval_2d(u: Array, v: Array, coeffs: tuple[float, ...], level: int) -> Array: """Evaluate a 2D polynomial at (u, v).""" u_arr = np.asarray(u, dtype=np.float64) v_arr = np.asarray(v, dtype=np.float64) result = np.zeros_like(u_arr) terms = _poly_terms_2d(level) for (pu, pv), c in zip(terms, coeffs, strict=True): result = result + float(c) * (u_arr ** float(pu)) * (v_arr ** float(pv)) return result # ═══════════════════════════════════════════════════════════════════════ # CMOWarpedStereoModel # ═══════════════════════════════════════════════════════════════════════
[docs] @dataclass(frozen=True) class CMOWarpedStereoModel: """CMO model with configurable polynomial pixel pre-warp. Pipeline per channel:: (xi, eta) = W_c(u, v) -- polynomial pre-warp (level 0..3) d = normalize(d_chief + affine correction on warped coords) O = S_c + pupil shear on warped coords The warp maps pixel coordinates to optical field coordinates before the telecentric direction model acts. This captures non-radial, asymmetric field-angle distortions that a pure affine direction field cannot. """ # --- Telecentric stereo base --- f_obj_mm: float working_distance_mm: float b_mm: float cx_principal_px: float cy_principal_px: float pixel_pitch_mm: float f_angular_mm: float = 0.0 theta_convergence_half_rad: float = 0.0 d_y_common: float = 0.0 s_x_L: float = 0.0 s_y_L: float = 0.0 s_x_R: float = 0.0 s_y_R: float = 0.0 s_xy_L: float = 0.0 s_yx_L: float = 0.0 s_xy_R: float = 0.0 s_yx_R: float = 0.0 shared_slopes: bool = True rho_x_L: float = 0.0 rho_y_L: float = 0.0 rho_x_R: float = 0.0 rho_y_R: float = 0.0 shared_shear: bool = True q_xx_L: float = 0.0 q_yy_L: float = 0.0 q_xx_R: float = 0.0 q_yy_R: float = 0.0 shared_quadratic: bool = True image_size: tuple[int, int] | None = None name: str = "cmo_warped_stereo" is_stereo_shared: bool = True # --- Warp fields --- warp_xi_L: tuple[float, ...] = () warp_eta_L: tuple[float, ...] = () warp_xi_R: tuple[float, ...] = () warp_eta_R: tuple[float, ...] = () warp_level: int = 0 shared_warp: bool = True def __post_init__(self) -> None: if int(self.warp_level) == 0: return # level 0 = identity, no coeff validation needed per_axis = _n_warp_coeff_per_axis(int(self.warp_level)) for name, tup in [ ("warp_xi_L", self.warp_xi_L), ("warp_eta_L", self.warp_eta_L), ("warp_xi_R", self.warp_xi_R), ("warp_eta_R", self.warp_eta_R), ]: if len(tup) != per_axis: raise ValueError( f"{name} has {len(tup)} coeffs, expected {per_axis} for level {self.warp_level}" ) @property def n_parameters(self) -> int: """Number of free parameters including warp coefficients. The base count is the telecentric parameter count (12, 14, or 16 depending on shared flags). Added to this is the total warp coefficient count from :func:`_n_warp_coeff_total`, which depends on ``warp_level`` and ``shared_warp``. """ n = 12 if self.shared_slopes else 14 if not self.shared_shear: n += 2 if not self.shared_quadratic: n += 2 n += _n_warp_coeff_total(int(self.warp_level), bool(self.shared_warp)) return n def _get_warp_coeffs( self, channel: Literal["left", "right"] ) -> tuple[tuple[float, ...], tuple[float, ...]]: if channel == "left": return self.warp_xi_L, self.warp_eta_L return self.warp_xi_R, self.warp_eta_R
[docs] def parameter_vector(self) -> Array: """Serialize the warped model to a flat parameter vector. The vector begins with the telecentric base parameters (see :meth:`CMOTelecentricStereoModel.parameter_vector`), followed by the warp polynomial coefficients: ``warp_xi_L``, ``warp_eta_L``, and optionally ``warp_xi_R``, ``warp_eta_R`` when ``shared_warp=False``. Returns ------- ndarray of float """ vec: list[float] = [ float(self.f_obj_mm), float(self.working_distance_mm), float(self.b_mm), float(self.cx_principal_px), float(self.cy_principal_px), float(self.f_angular_mm), float(self.theta_convergence_half_rad), float(self.d_y_common), float(self.s_x_L), float(self.s_y_L), ] if not self.shared_slopes: vec.extend([float(self.s_x_R), float(self.s_y_R)]) if self.shared_shear: vec.extend([float(self.rho_x_L), float(self.rho_y_L)]) else: vec.extend( [float(self.rho_x_L), float(self.rho_y_L), float(self.rho_x_R), float(self.rho_y_R)] ) if self.warp_level > 0: vec.extend(self.warp_xi_L) vec.extend(self.warp_eta_L) if not self.shared_warp: vec.extend(self.warp_xi_R) vec.extend(self.warp_eta_R) return np.array(vec, dtype=np.float64)
[docs] @classmethod def from_parameter_vector(cls, x: Array, **kwargs: Any) -> CMOWarpedStereoModel: """Reconstruct the warped model from a flat parameter vector. The vector layout is: telecentric base (see :meth:`CMOTelecentricStereoModel.from_parameter_vector`), then warp polynomial coefficients in order ``warp_xi_L, warp_eta_L`` (and ``warp_xi_R, warp_eta_R`` if ``shared_warp=False``). The number of coefficients per polynomial axis is determined by ``warp_level`` (via :func:`_poly_terms_2d`). At level 0 (identity warp), no warp coefficients are present. Parameters ---------- x : ndarray of float **kwargs Must include ``pixel_pitch_mm`` (float, mm). Must include ``warp_level`` (int) and ``shared_warp`` (bool) for non-zero warp levels. May include ``image_size``. Returns ------- CMOWarpedStereoModel """ arr = np.asarray(x, dtype=np.float64).reshape(-1) pixel_pitch_mm = float(kwargs["pixel_pitch_mm"]) image_size = kwargs.get("image_size") warp_level = int(kwargs.get("warp_level", 0)) shared_warp = bool(kwargs.get("shared_warp", True)) n_warp = _n_warp_coeff_total(warp_level, shared_warp) n_base = int(arr.size) - n_warp if n_base not in {12, 14, 16}: raise ValueError( f"CMOWarpedStereoModel: invalid base param count {n_base} " f"(warp_level={warp_level}, shared_warp={shared_warp})" ) shared_slopes = n_base == 12 shared_shear = n_base in {12, 14} base = arr[:n_base] sx_L = float(base[8]) sy_L = float(base[9]) if shared_slopes: sx_R, sy_R = sx_L, sy_L rho_idx = 10 else: sx_R = float(base[10]) sy_R = float(base[11]) rho_idx = 12 if shared_shear: rx_L = float(base[rho_idx]) ry_L = float(base[rho_idx + 1]) rx_R, ry_R = rx_L, ry_L else: rx_L = float(base[rho_idx]) ry_L = float(base[rho_idx + 1]) rx_R = float(base[rho_idx + 2]) ry_R = float(base[rho_idx + 3]) warp_flat = arr[n_base:] if warp_level > 0 and n_warp > 0: per_axis = _n_warp_coeff_per_axis(warp_level) xi_L = tuple(float(v) for v in warp_flat[:per_axis]) eta_L = tuple(float(v) for v in warp_flat[per_axis : 2 * per_axis]) if shared_warp: xi_R, eta_R = xi_L, eta_L else: xi_R = tuple(float(v) for v in warp_flat[2 * per_axis : 3 * per_axis]) eta_R = tuple(float(v) for v in warp_flat[3 * per_axis : 4 * per_axis]) else: xi_L = eta_L = xi_R = eta_R = () return cls( f_obj_mm=float(base[0]), working_distance_mm=float(base[1]), b_mm=float(base[2]), cx_principal_px=float(base[3]), cy_principal_px=float(base[4]), pixel_pitch_mm=pixel_pitch_mm, f_angular_mm=float(base[5]), theta_convergence_half_rad=float(base[6]), d_y_common=float(base[7]), s_x_L=sx_L, s_y_L=sy_L, s_x_R=sx_R, s_y_R=sy_R, shared_slopes=shared_slopes, rho_x_L=rx_L, rho_y_L=ry_L, rho_x_R=rx_R, rho_y_R=ry_R, shared_shear=shared_shear, image_size=image_size, warp_xi_L=xi_L, warp_eta_L=eta_L, warp_xi_R=xi_R, warp_eta_R=eta_R, warp_level=warp_level, shared_warp=shared_warp, )
[docs] def parameter_dict(self) -> dict[str, object]: """Return free/fixed parameter groups with warp coefficients. Fixed keys are sensor constants. Free keys include telecentric base parameters plus per-channel, per-monomial warp coefficients keyed as ``warp_xi_{L|R}_u{pu}v{pv}`` and ``warp_eta_{L|R}_u{pu}v{pv}``. Returns ------- dict ``{"free": {...}, "fixed": {...}}``. """ d: dict[str, object] = { "f_obj_mm": float(self.f_obj_mm), "working_distance_mm": float(self.working_distance_mm), "b_mm": float(self.b_mm), "cx_principal_px": float(self.cx_principal_px), "cy_principal_px": float(self.cy_principal_px), "f_angular_mm": float(self.f_angular_mm), "theta_convergence_half_rad": float(self.theta_convergence_half_rad), "theta_convergence_half_deg": float(np.degrees(float(self.theta_convergence_half_rad))), "d_y_common": float(self.d_y_common), "warp_level": int(self.warp_level), "shared_warp": bool(self.shared_warp), } free: dict[str, float] = {} free["s_x_L"] = float(self.s_x_L) free["s_y_L"] = float(self.s_y_L) if not self.shared_slopes: free["s_x_R"] = float(self.s_x_R) free["s_y_R"] = float(self.s_y_R) if self.shared_shear: free["rho_x"] = float(self.rho_x_L) free["rho_y"] = float(self.rho_y_L) else: free["rho_x_L"] = float(self.rho_x_L) free["rho_y_L"] = float(self.rho_y_L) free["rho_x_R"] = float(self.rho_x_R) free["rho_y_R"] = float(self.rho_y_R) d["free"] = free terms = _poly_terms_2d(int(self.warp_level)) for ch, xi_tup, eta_tup in [ ("L", self.warp_xi_L, self.warp_eta_L), ("R", self.warp_xi_R, self.warp_eta_R), ]: for i, (pu, pv) in enumerate(terms): if i < len(xi_tup): d[f"warp_xi_{ch}_u{pu}v{pv}"] = float(xi_tup[i]) d[f"warp_eta_{ch}_u{pu}v{pv}"] = float(eta_tup[i]) return d
[docs] def channel(self, channel: Literal["left", "right"]) -> CMOWarpedChannelModel: """Return a single-channel facade for the given channel name. See :meth:`CMOPhysicalStereoModel.channel` — identical pattern. Parameters ---------- channel : {"left", "right"} Returns ------- CMOWarpedChannelModel """ return CMOWarpedChannelModel(rig=self, channel=channel)
[docs] def ray(self, u: Array, v: Array, channel: Literal["left", "right"]) -> tuple[Array, Array]: """Compute 3-D rays through the warped CMO model. Extends :meth:`CMOTelecentricStereoModel.ray` with an initial pixel-domain polynomial warp: 1. If ``warp_level > 0``, map pixel coordinates ``(u, v)`` through the polynomial warp :math:`W_c`: ``(xi, eta) = W_c(u, v)``. Otherwise ``(xi, eta) = (u, v)``. 2. Convert the warped coordinates to normalised angular coordinates, then proceed with the telecentric direction model (chief-ray direction + affine/cross/quadratic correction + normalisation, sub-pupil origin + pupil shear). The warp captures residual field-angle distortions that the affine direction-field model alone cannot represent, while the telecentric base handles the dominant quasi-telecentric behaviour. Parameters ---------- u : ndarray Horizontal pixel coordinates. v : ndarray Vertical pixel coordinates. channel : {"left", "right"} Returns ------- origin : ndarray, shape (..., 3) Ray origin in mm. direction : ndarray, shape (..., 3) Unit ray direction vector. """ uu, vv = np.broadcast_arrays( np.asarray(u, dtype=np.float64), np.asarray(v, dtype=np.float64) ) shape = uu.shape uf, vf = uu.reshape(-1), vv.reshape(-1) if self.warp_level > 0: xi_c, eta_c = self._get_warp_coeffs(channel) xi = _polyval_2d(uf, vf, xi_c, int(self.warp_level)) eta = _polyval_2d(uf, vf, eta_c, int(self.warp_level)) else: xi, eta = uf.astype(np.float64), vf.astype(np.float64) f_ang = float(self.f_angular_mm) if float(self.f_angular_mm) > 0.1 else float(self.f_obj_mm) pp = float(self.pixel_pitch_mm) / f_ang tilde_u = (xi - float(self.cx_principal_px)) * pp tilde_v = (eta - float(self.cy_principal_px)) * pp dir_sign = 1.0 if channel == "left" else -1.0 sin_th = float(np.sin(float(self.theta_convergence_half_rad))) cos_th = float(np.cos(float(self.theta_convergence_half_rad))) dy = float(self.d_y_common) d0 = np.column_stack( [ np.full_like(uf, dir_sign * sin_th), np.full_like(uf, dy), np.full_like(uf, cos_th), ] ) sx_ch = float(self.s_x_L) if channel == "left" else float(self.s_x_R) sy_ch = float(self.s_y_L) if channel == "left" else float(self.s_y_R) sxy_ch = float(self.s_xy_L) if channel == "left" else float(self.s_xy_R) syx_ch = float(self.s_yx_L) if channel == "left" else float(self.s_yx_R) qx_ch = float(self.q_xx_L) if channel == "left" else float(self.q_xx_R) qy_ch = float(self.q_yy_L) if channel == "left" else float(self.q_yy_R) umean2 = float(np.mean(tilde_u**2)) vmean2 = float(np.mean(tilde_v**2)) d_raw = np.column_stack( [ d0[:, 0] + sx_ch * tilde_u + sxy_ch * tilde_v + qx_ch * (tilde_u**2 - umean2), d0[:, 1] + sy_ch * tilde_v + syx_ch * tilde_u + qy_ch * (tilde_v**2 - vmean2), d0[:, 2], ] ) directions = _normalize(d_raw) pupil_sign = -1.0 if channel == "left" else 1.0 z_pupil = float(self.working_distance_mm) - float(self.f_obj_mm) rho_x = float(self.rho_x_L) if channel == "left" else float(self.rho_x_R) rho_y = float(self.rho_y_L) if channel == "left" else float(self.rho_y_R) O_rigid = np.column_stack( [ np.full_like(uf, pupil_sign * 0.5 * float(self.b_mm)), np.zeros_like(uf), np.full_like(uf, z_pupil), ] ) delta_O = np.column_stack([rho_x * tilde_u, rho_y * tilde_v, np.zeros_like(uf)]) pupil = O_rigid + delta_O return pupil.reshape((*shape, 3)), directions.reshape((*shape, 3))
[docs] @dataclass(frozen=True) class CMOWarpedChannelModel: """Single-channel facade for a shared warped CMO rig.""" rig: CMOWarpedStereoModel channel: Literal["left", "right"] name: str = "cmo_warped_channel" is_stereo_shared: bool = True @property def n_parameters(self) -> int: """Number of free parameters, delegated to the shared rig. See :attr:`CMOWarpedStereoModel.n_parameters`. """ return self.rig.n_parameters
[docs] def parameter_vector(self) -> Array: """Serialize to a flat parameter vector, delegated to the shared rig. See :meth:`CMOWarpedStereoModel.parameter_vector`. """ return self.rig.parameter_vector()
[docs] @classmethod def from_parameter_vector(cls, x: Array, **kwargs: Any) -> CMOWarpedChannelModel: """Reconstruct a channel model from a flat parameter vector. Builds the shared :class:`CMOWarpedStereoModel` from *x*, then wraps it with the channel given by ``kwargs["channel"]``. Parameters ---------- x : ndarray of float Flattened parameter vector (see :meth:`CMOWarpedStereoModel.from_parameter_vector`). **kwargs Must include ``pixel_pitch_mm``, ``warp_level``, and ``shared_warp``. May include ``image_size`` and ``channel``. Returns ------- CMOWarpedChannelModel """ rig = CMOWarpedStereoModel.from_parameter_vector(x, **kwargs) channel = kwargs.get("channel", "left") if channel not in {"left", "right"}: raise ValueError("channel must be 'left' or 'right'") return cls(rig=rig, channel=channel)
[docs] def parameter_dict(self) -> dict[str, object]: """Return free/fixed parameter groups, delegated to the shared rig. See :meth:`CMOWarpedStereoModel.parameter_dict`. """ return self.rig.parameter_dict()
[docs] def ray(self, u: Array, v: Array) -> tuple[Array, Array]: """Compute 3-D rays, delegating to the shared rig with this channel. See :meth:`CMOWarpedStereoModel.ray`. """ return self.rig.ray(u, v, self.channel)
# ═══════════════════════════════════════════════════════════════════════ # Fitting and residual analysis # ═══════════════════════════════════════════════════════════════════════
[docs] def fit_cmo_warped_model_to_rayfields( left_field, right_field, image_size: tuple[int, int], initial_parameters: Array, *, pixel_pitch_mm: float, z_planes: tuple[float, float] = (50.0, 250.0), grid_shape: tuple[int, int] = (17, 13), max_nfev: int = 500, warp_level: int = 0, shared_warp: bool = True, ) -> CMOPhysicalStereoFitResult: """Fit the warped CMO model with sensor-plane polynomial corrections. The most flexible CMO variant. It adds a per-channel 2-D polynomial warp on the sensor plane before ray evaluation, capturing residual non-idealities beyond the physical model. Parameters ---------- left_field : ZernikeRayField Measured left-channel rayfield. right_field : ZernikeRayField Measured right-channel rayfield. image_size : (int, int) Sensor dimensions in pixels (width, height). initial_parameters : ndarray Starting parameter vector including CMO rig + warp coefficients. pixel_pitch_mm : float Sensor pixel pitch in millimetres. z_planes : (float, float) Two z-planes in mm (default 50, 250) for ray intersection evaluation. grid_shape : (int, int) Subsampling grid for the full-image residual. max_nfev : int Maximum number of function evaluations. warp_level : int Polynomial level for the sensor-plane warp. shared_warp : bool If True, left and right channels share the same warp coefficients. Returns ------- CMOPhysicalStereoFitResult Dataclass result object with ``parameter_vector`` (optimal params), ``message``, ``success``, and ``model`` (the fitted CMOWarpedStereoModel). """ x0 = np.asarray(initial_parameters, dtype=np.float64).reshape(-1) full = _grid_pixels(image_size, grid_shape) def model_at(x: Array) -> CMOWarpedStereoModel: """Reconstruct a :class:`CMOWarpedStereoModel` from parameter vector *x*. Convenience closure supplying ``pixel_pitch_mm``, ``image_size``, ``warp_level``, and ``shared_warp`` captured from the enclosing fit context. """ return CMOWarpedStereoModel.from_parameter_vector( x, pixel_pitch_mm=pixel_pitch_mm, image_size=image_size, warp_level=warp_level, shared_warp=shared_warp, ) def residuals(x: Array) -> Array: """Evaluate stacked two-plane rayfield residuals for parameter vector *x*. Computes left and right channel residuals via :func:`rayfield_two_plane_residuals` at the full grid. """ model = model_at(x) left = model.channel("left") right = model.channel("right") return np.concatenate( [ rayfield_two_plane_residuals(left_field, left, full, z_planes=z_planes), rayfield_two_plane_residuals(right_field, right, full, z_planes=z_planes), ] ) # Bounds tele_lo = [1.0, 1.0, 0.0, -np.inf, -np.inf, 1.0, 0.0, -0.3, -10.0, -10.0] tele_hi = [500.0, 1000.0, 200.0, np.inf, np.inf, 500.0, 0.5, 0.3, 10.0, 10.0] if x0.size > 10: n_extra = x0.size - 10 tele_lo.extend([-np.inf] * n_extra) tele_hi.extend([np.inf] * n_extra) n_warp = _n_warp_coeff_total(warp_level, shared_warp) warp_lo: list[float] = [] warp_hi: list[float] = [] if n_warp > 0: for pu, pv in _poly_terms_2d(warp_level): deg = pu + pv lim = {0: 500.0, 1: 10.0, 2: 0.01, 3: 1e-4}.get(deg, 1e-4) warp_lo.append(-lim) warp_hi.append(lim) n_repeats = 2 if shared_warp else 4 warp_lo = warp_lo * n_repeats warp_hi = warp_hi * n_repeats n_tele = x0.size - n_warp lower = np.array(tele_lo[:n_tele] + warp_lo, dtype=np.float64) upper = np.array(tele_hi[:n_tele] + warp_hi, dtype=np.float64) sol = least_squares( residuals, x0=x0, bounds=(lower, upper), loss="huber", f_scale=1.0, max_nfev=int(max_nfev), xtol=1e-10, ftol=1e-10, gtol=1e-10, ) fitted = model_at(sol.x) left_res = rayfield_two_plane_residuals( left_field, fitted.channel("left"), full, z_planes=z_planes ) right_res = rayfield_two_plane_residuals( right_field, fitted.channel("right"), full, z_planes=z_planes ) combined = residuals(sol.x) rss = float(np.sum(combined * combined)) n_res = int(combined.size) n_samples = int(full.shape[0] * 2) left_rms = _ray_rms(left_res) right_rms = _ray_rms(right_res) aic, bic = _aic_bic(rss, n_res, n_samples, fitted.n_parameters) return CMOPhysicalStereoFitResult( model=fitted, success=bool(sol.success), message=str(sol.message), n_parameters=fitted.n_parameters, n_samples=n_samples, n_residual_scalars=n_res, rss=rss, left_rms_mm=left_rms, right_rms_mm=right_rms, rms_mm=float(np.sqrt(0.5 * (left_rms**2 + right_rms**2))), aic=aic, bic=bic, parameter_vector=np.asarray(sol.x, dtype=np.float64).copy(), parameter_dict=fitted.parameter_dict(), )
[docs] def compute_cmo_zernike_residuals( cmo_model, zernike_left, zernike_right, grid_shape: tuple[int, int] = (17, 13), image_size: tuple[int, int] | None = None, zernike_order: int = 4, ) -> dict[str, Any]: """Compute direction/moment residuals between a CMO model and Zernike rayfield. Returns delta_d (deg), delta_m (mm), and Zernike modal projection of the direction residual field. """ if image_size is None: raise ValueError("image_size is required") W, H = int(image_size[0]), int(image_size[1]) pixels = _grid_pixels(image_size, grid_shape) uf, vf = pixels[:, 0], pixels[:, 1] OzL, dzL = zernike_left.ray(uf, vf) OzR, dzR = zernike_right.ray(uf, vf) if hasattr(cmo_model, "channel"): OmL, dmL = cmo_model.ray(uf, vf, "left") OmR, dmR = cmo_model.ray(uf, vf, "right") else: OmL, dmL = cmo_model.ray(uf, vf) OmR, dmR = OmL, dmL dot_L = np.clip(np.sum(dzL * dmL, axis=1), -1.0, 1.0) dot_R = np.clip(np.sum(dzR * dmR, axis=1), -1.0, 1.0) ang_L = np.degrees(np.arccos(dot_L)) ang_R = np.degrees(np.arccos(dot_R)) mzL = np.cross(OzL, dzL) mmL = np.cross(OmL, dmL) mzR = np.cross(OzR, dzR) mmR = np.cross(OmR, dmR) mom_L = np.linalg.norm(mzL - mmL, axis=1) mom_R = np.linalg.norm(mzR - mmR, axis=1) delta_d_L = dzL - dmL delta_d_R = dzR - dmR from stereocomplex.core.model_compact.zernike import zernike_modes, eval_real_zernike modes = zernike_modes(int(zernike_order)) xi = 2.0 * uf / float(W - 1) - 1.0 zeta = 2.0 * vf / float(H - 1) - 1.0 rho = np.sqrt(xi * xi + zeta * zeta) / np.sqrt(2.0) theta = np.arctan2(zeta, xi) B = np.empty((uf.size, len(modes)), dtype=np.float64) for j, mode in enumerate(modes): B[:, j] = eval_real_zernike(mode, rho, theta) BtB_inv = np.linalg.inv(B.T @ B + 1e-10 * np.eye(B.shape[1])) B_pinv = BtB_inv @ B.T projection: dict[str, float] = {} for ch_label, dd in [("L", delta_d_L), ("R", delta_d_R)]: for comp, cl in enumerate(["dx", "dy", "dz"]): c = B_pinv @ dd[:, comp] for j, mode in enumerate(modes): projection[f"{ch_label}_n{mode.n}_m{mode.m}_{mode.kind}_{cl}"] = float(c[j]) Bsq = np.sum(B**2, axis=0) var_total_L = float(np.sum(delta_d_L**2)) var_total_R = float(np.sum(delta_d_R**2)) mode_contribs = [] for j, mode in enumerate(modes): cLx = projection.get(f"L_n{mode.n}_m{mode.m}_{mode.kind}_dx", 0.0) cLy = projection.get(f"L_n{mode.n}_m{mode.m}_{mode.kind}_dy", 0.0) cLz = projection.get(f"L_n{mode.n}_m{mode.m}_{mode.kind}_dz", 0.0) cRx = projection.get(f"R_n{mode.n}_m{mode.m}_{mode.kind}_dx", 0.0) cRy = projection.get(f"R_n{mode.n}_m{mode.m}_{mode.kind}_dy", 0.0) cRz = projection.get(f"R_n{mode.n}_m{mode.m}_{mode.kind}_dz", 0.0) frac_L = float((cLx**2 + cLy**2 + cLz**2) * Bsq[j]) / max(var_total_L, 1e-16) frac_R = float((cRx**2 + cRy**2 + cRz**2) * Bsq[j]) / max(var_total_R, 1e-16) mode_contribs.append( { "mode": f"Z_{mode.n}^{mode.m}({mode.kind})", "n": mode.n, "m": mode.m, "kind": mode.kind, "frac_var_L": frac_L, "frac_var_R": frac_R, } ) mode_contribs.sort(key=lambda x: abs(x["frac_var_L"]) + abs(x["frac_var_R"]), reverse=True) return { "dir_rms_deg_L": float(np.sqrt(np.mean(ang_L**2))), "dir_rms_deg_R": float(np.sqrt(np.mean(ang_R**2))), "dir_rms_deg_total": float(np.sqrt(np.mean(np.concatenate([ang_L, ang_R]) ** 2))), "mom_rms_mm_L": float(np.sqrt(np.mean(mom_L**2))), "mom_rms_mm_R": float(np.sqrt(np.mean(mom_R**2))), "mom_rms_mm_total": float(np.sqrt(np.mean(np.concatenate([mom_L, mom_R]) ** 2))), "zernike_projection": projection, "top_direction_modes": mode_contribs[:8], "all_mode_contributions": mode_contribs, "n_points": int(pixels.shape[0]), "grid_shape": grid_shape, }
__all__ = [ "CMOPhysicalChannelModel", "CMOPhysicalStereoFitResult", "CMOPhysicalStereoModel", "CMOTelecentricChannelModel", "CMOTelecentricNModel", "CMOTelecentricStereoModel", "fit_cmo_physical_stereo_model_to_rayfields", "fit_cmo_telecentric_model_to_rayfields", ]