"""
Timebase alignment helpers (Phase 1).

We support:
- validating precomputed `sync_offsets.json` (handled elsewhere)
- computing sanity metrics from timestamps (dropped frames / clock drift)
- (optional) audio pulse alignment from WAV files (cross-correlation)

All functions are dependency-light (std lib + numpy).
"""

from __future__ import annotations

import json
import wave
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

import numpy as np


def load_timestamps_seconds(path: Path) -> np.ndarray:
    """
    Load timestamps (seconds) from a JSON file.

    Supported schemas:
    - {"t": [...]} or {"timestamps_s": [...]}  ("t" is preferred when both are lists)
    - [...]  (a bare list of numbers)

    Args:
        path: JSON file to read; decoded as UTF-8 (the JSON standard encoding).

    Returns:
        1-D float64 array of timestamps.

    Raises:
        ValueError: if the document matches none of the supported schemas,
            or the resulting array is not one-dimensional.
    """
    obj = json.loads(Path(path).read_text(encoding="utf-8"))

    values: Optional[list] = None
    if isinstance(obj, dict):
        for key in ("t", "timestamps_s"):
            if isinstance(obj.get(key), list):
                values = obj[key]
                break
    elif isinstance(obj, list):
        values = obj

    if values is None:
        raise ValueError(f"Unsupported timestamps JSON schema: {path}")

    arr = np.asarray(values, dtype=np.float64)
    if arr.ndim != 1:
        raise ValueError(f"Timestamps must be 1D, got shape {arr.shape} in {path}")
    return arr


@dataclass(frozen=True)
class DropStats:
    """Summary of timestamp-gap analysis produced by `dropped_frames_stats`."""

    num_timestamps: int  # number of timestamps supplied (before any filtering)
    expected_dt_s: float  # nominal frame interval used as the gap baseline (0.0 if unknown)
    num_gaps: int  # intervals longer than gap_factor * expected_dt_s
    max_gap_s: float  # largest positive finite interval (0.0 if none)


def dropped_frames_stats(
    timestamps_s: np.ndarray,
    *,
    expected_dt_s: Optional[float] = None,
    gap_factor: float = 1.5,
) -> DropStats:
    """
    Heuristic dropped-frame detection via large timestamp gaps.

    Consecutive differences are computed. Non-finite and non-positive
    intervals (NaNs, duplicates, backwards jumps) are discarded. Every
    remaining interval longer than ``gap_factor * expected_dt_s`` counts as
    one gap.

    Args:
        timestamps_s: 1-D sequence of timestamps in seconds.
        expected_dt_s: nominal frame interval. Defaults to the median of the
            valid intervals.
        gap_factor: multiple of ``expected_dt_s`` above which an interval is a gap.

    Returns:
        DropStats. When there are fewer than two timestamps, or no valid
        interval, every field except ``num_timestamps`` is zero.
    """
    ts = np.asarray(timestamps_s, dtype=np.float64)
    empty = DropStats(num_timestamps=int(ts.size), expected_dt_s=0.0, num_gaps=0, max_gap_s=0.0)
    if ts.size < 2:
        return empty

    dts = np.diff(ts)
    dts = dts[np.isfinite(dts) & (dts > 0)]
    if dts.size == 0:
        return empty

    exp = float(expected_dt_s) if expected_dt_s is not None else float(np.median(dts))
    gaps = dts > (float(gap_factor) * exp)
    return DropStats(
        num_timestamps=int(ts.size),
        expected_dt_s=exp,
        num_gaps=int(np.sum(gaps)),
        max_gap_s=float(np.max(dts)),  # dts is non-empty here
    )


def linear_drift_fit(
    t_ref_s: np.ndarray,
    t_other_s: np.ndarray,
) -> Dict[str, float]:
    """
    Least-squares fit of ``t_other ≈ a * t_ref + b``.

    Samples are paired BY INDEX: the first ``min(len(t_ref), len(t_other))``
    entries of each series are assumed to describe the same events. No gap
    matching is attempted, so dropped frames in either series bias the fit.

    Returns:
        dict with keys:
        - "n": number of paired samples (as float)
        - "a", "b": fit coefficients
        - "drift_ppm": (a - 1) * 1e6
        - "rmse_s": root-mean-square residual in seconds

        With fewer than 3 pairs, the identity fit is returned
        (a=1, b=0, drift 0, rmse 0).
    """
    a_ref = np.asarray(t_ref_s, dtype=np.float64)
    a_oth = np.asarray(t_other_s, dtype=np.float64)
    n = int(min(a_ref.size, a_oth.size))
    if n < 3:
        return {"n": float(n), "a": 1.0, "b": 0.0, "drift_ppm": 0.0, "rmse_s": 0.0}

    x = a_ref[:n]
    y = a_oth[:n]
    # Subtract the initial times so large absolute timestamps (e.g. epoch
    # seconds) do not ruin the conditioning of the least-squares system.
    x0 = float(x[0])
    y0 = float(y[0])
    x = x - x0
    y = y - y0
    A = np.stack([x, np.ones_like(x)], axis=1)
    coeff, *_ = np.linalg.lstsq(A, y, rcond=None)
    a = float(coeff[0])
    # Undo the shift: y - y0 = a (x - x0) + c  =>  b = c + y0 - a * x0
    b = float(coeff[1] + y0 - a * x0)

    y_hat = a * (a_ref[:n]) + b
    resid = y_hat - a_oth[:n]
    rmse = float(np.sqrt(np.mean(resid * resid)))
    drift_ppm = float((a - 1.0) * 1e6)
    return {"n": float(n), "a": a, "b": b, "drift_ppm": drift_ppm, "rmse_s": rmse}


def _read_wav_mono(path: Path) -> Tuple[np.ndarray, int]:
    """
    Read an integer-PCM WAV file and mix it down to mono.

    Supported sample widths:
    - 8-bit (unsigned, midpoint 128)
    - 16-, 24- and 32-bit (signed, little-endian as stored in WAV)

    Multi-channel audio is averaged across channels.

    Returns:
        (samples, sample_rate): float32 samples scaled to [-1, 1), and the
        frame rate in Hz.

    Raises:
        ValueError: for an unsupported sample width.
    """
    with wave.open(str(path), "rb") as wf:
        sr = int(wf.getframerate())
        chans = int(wf.getnchannels())
        sampwidth = int(wf.getsampwidth())
        raw = wf.readframes(wf.getnframes())

    # A truncated file can end mid-frame; drop the partial frame so the
    # buffer decodes and reshapes cleanly.
    frame_bytes = chans * sampwidth
    raw = raw[: len(raw) - len(raw) % frame_bytes]

    # Explicit little-endian dtypes: WAV data is little-endian regardless of host.
    if sampwidth == 1:
        a = (np.frombuffer(raw, dtype=np.uint8).astype(np.float32) - 128.0) / 128.0
    elif sampwidth == 2:
        a = np.frombuffer(raw, dtype="<i2").astype(np.float32) / 32768.0
    elif sampwidth == 3:
        b = np.frombuffer(raw, dtype=np.uint8).reshape(-1, 3).astype(np.int32)
        v = b[:, 0] | (b[:, 1] << 8) | (b[:, 2] << 16)
        v = np.where(v >= (1 << 23), v - (1 << 24), v)  # sign-extend 24 -> 32 bits
        a = v.astype(np.float32) / 8388608.0
    elif sampwidth == 4:
        a = np.frombuffer(raw, dtype="<i4").astype(np.float32) / 2147483648.0
    else:
        raise ValueError(f"Unsupported WAV sample width: {sampwidth}")

    if chans > 1:
        a = a.reshape(-1, chans).mean(axis=1)
    return a, sr


def _envelope_at_rate(x: np.ndarray, sr: int, target: int) -> np.ndarray:
    """Rectify `x` and linearly resample it onto an exact `target`-Hz grid starting at t=0."""
    env = np.abs(np.asarray(x, dtype=np.float64))
    if env.size == 0:
        return env
    n_out = (env.size - 1) * target // sr + 1
    # Fractional source-sample positions of each output sample. For integer
    # sr/target ratios these are whole numbers, i.e. plain decimation.
    positions = np.arange(n_out, dtype=np.float64) * (float(sr) / float(target))
    return np.interp(positions, np.arange(env.size, dtype=np.float64), env)


def audio_offset_seconds(
    ref_wav: Path,
    other_wav: Path,
    *,
    max_lag_s: float = 1.0,
    downsample_hz: int = 2000,
) -> float:
    """
    Estimate the offset between two WAV files by cross-correlating envelopes.

    Steps:
    1. Mix each signal to mono and rectify it (|x|) as a crude amplitude envelope.
    2. Linearly resample both onto an exact common grid of
       ``min(sr_ref, sr_other, downsample_hz)`` Hz.
    3. Remove the mean from each and cross-correlate.
    4. Return the best lag within +/- ``max_lag_s``.

    Sign convention: a positive result means `other` lags `ref` (the same
    event occurs later in `other`). `other` should therefore be shifted
    earlier by the returned offset to line up with `ref`.

    No low-pass filter is applied before resampling, so features much
    narrower than one output sample may be attenuated or missed.

    Args:
        ref_wav: reference recording.
        other_wav: recording whose offset relative to `ref_wav` is estimated.
        max_lag_s: largest |offset| searched, in seconds.
        downsample_hz: upper bound on the correlation rate; the result is
            quantized to 1 / min(sr_ref, sr_other, downsample_hz) seconds.

    Returns:
        Offset in seconds.

    Raises:
        ValueError: if either file contains no samples, if ``max_lag_s`` is
            negative, if the correlation rate is not positive, or if a WAV
            has an unsupported sample width.
    """
    if float(max_lag_s) < 0:
        raise ValueError(f"max_lag_s must be non-negative, got {max_lag_s}")

    ref, sr_ref = _read_wav_mono(ref_wav)
    oth, sr_oth = _read_wav_mono(other_wav)
    if ref.size == 0 or oth.size == 0:
        raise ValueError("Cannot estimate audio offset from an empty WAV file")

    target = int(min(sr_ref, sr_oth, downsample_hz))
    if target <= 0:
        raise ValueError(f"Correlation rate must be positive, got {target} Hz")

    ref_e = _envelope_at_rate(ref, sr_ref, target)
    oth_e = _envelope_at_rate(oth, sr_oth, target)
    ref_e -= float(np.mean(ref_e))
    oth_e -= float(np.mean(oth_e))

    # FFT-based linear cross-correlation (zero-padded so nothing wraps):
    #   corr[k mod n_fft] = sum_m ref_e[m + k] * oth_e[m],  k in [-(len(oth)-1), len(ref)-1]
    # A peak at k means the event at index m in `other` is at index m + k in `ref`.
    n_fft = int(2 ** int(np.ceil(np.log2(ref_e.size + oth_e.size + 1))))
    spec = np.fft.rfft(ref_e, n=n_fft) * np.conj(np.fft.rfft(oth_e, n=n_fft))
    corr = np.fft.irfft(spec, n=n_fft)

    max_lag = int(round(float(max_lag_s) * target))
    lags = np.arange(max(-(oth_e.size - 1), -max_lag), min(ref_e.size - 1, max_lag) + 1)
    best_k = int(lags[int(np.argmax(corr[lags % n_fft]))])

    # best_k = t_ref - t_other, so `other` lags `ref` by -best_k samples.
    return float(-best_k) / float(target)


def sync_sanity_from_timestamps(
    timestamps_by_device: Dict[str, np.ndarray],
    *,
    reference_device: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Compute per-device dropped-frame stats and drift relative to a reference device.

    Args:
        timestamps_by_device: device id -> 1-D timestamps in seconds (array-like).
        reference_device: id of the reference device. Falsy values select the
            lexicographically smallest device id.

    Returns:
        On success:
            {"ok": True, "reference_device": id,
             "dropped_frames": {id: DropStats fields},
             "drift": {id: linear_drift_fit(...) result}}
            The reference device's own drift entry is the identity fit.
        On failure:
            {"ok": False, "reason": ...}
            The reason is "no_timestamps" for empty input, or
            "missing_reference" when the reference id is absent or maps to
            None.
    """
    if not timestamps_by_device:
        return {"ok": False, "reason": "no_timestamps"}

    ref_id = reference_device or min(timestamps_by_device)
    ref = timestamps_by_device.get(ref_id)
    if ref is None:
        return {"ok": False, "reason": "missing_reference", "reference_device": ref_id}

    drop: Dict[str, Any] = {}
    drift: Dict[str, Any] = {}
    for did, ts in timestamps_by_device.items():
        drop[did] = asdict(dropped_frames_stats(ts))
        if did == ref_id:
            # np.asarray so plain lists work, matching the helpers above.
            n_ref = float(np.asarray(ts).size)
            drift[did] = {"n": n_ref, "a": 1.0, "b": 0.0, "drift_ppm": 0.0, "rmse_s": 0.0}
        else:
            drift[did] = linear_drift_fit(ref, ts)

    return {
        "ok": True,
        "reference_device": ref_id,
        "dropped_frames": drop,
        "drift": drift,
    }