|
|
""" |
|
|
Timebase alignment helpers (Phase 1). |
|
|
|
|
|
We support: |
|
|
- validating precomputed `sync_offsets.json` (handled elsewhere) |
|
|
- computing sanity metrics from timestamps (drops/drift) |
|
|
- (optional) audio pulse alignment from WAV files (cross-correlation) |
|
|
|
|
|
All functions are dependency-light (std lib + numpy). |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import json |
|
|
import wave |
|
|
from dataclasses import dataclass |
|
|
from pathlib import Path |
|
|
from typing import Any, Dict, Optional, Tuple |
|
|
import numpy as np |
|
|
|
|
|
|
|
|
def load_timestamps_seconds(path: Path) -> np.ndarray:
    """
    Load timestamps (seconds) from a JSON file.

    Supported schemas:
    - {"t": [...]} or {"timestamps_s": [...]}
    - a bare JSON list of numbers

    Raises
    ------
    ValueError
        If the JSON matches none of the supported schemas, or the resulting
        array is not 1D.
    """
    payload = json.loads(Path(path).read_text())

    values = None
    if isinstance(payload, list):
        values = payload
    elif isinstance(payload, dict):
        # First matching key wins; a key whose value is not a list is skipped.
        for key in ("t", "timestamps_s"):
            candidate = payload.get(key)
            if isinstance(candidate, list):
                values = candidate
                break
    if values is None:
        raise ValueError(f"Unsupported timestamps JSON schema: {path}")

    out = np.asarray(values, dtype=np.float64)
    if out.ndim != 1:
        raise ValueError(f"Timestamps must be 1D, got shape {out.shape} in {path}")
    return out
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
class DropStats:
    """Summary of the dropped-frame heuristic for one timestamp stream."""

    # Total number of timestamps examined (including non-finite entries).
    num_timestamps: int
    # Baseline inter-frame interval, seconds (caller-supplied or median of
    # the positive finite diffs); 0.0 when no usable diffs exist.
    expected_dt_s: float
    # Count of intervals exceeding gap_factor * expected_dt_s.
    num_gaps: int
    # Largest positive finite inter-frame interval observed, seconds.
    max_gap_s: float
|
|
|
|
|
|
|
|
def dropped_frames_stats(
    timestamps_s: np.ndarray,
    *,
    expected_dt_s: Optional[float] = None,
    gap_factor: float = 1.5,
) -> DropStats:
    """
    Heuristic dropped-frame detection via large timestamp gaps.

    A "gap" is any positive finite inter-frame interval exceeding
    gap_factor * expected_dt_s, where expected_dt_s defaults to the median
    interval. Non-finite and non-positive diffs are ignored.
    """
    series = np.asarray(timestamps_s, dtype=np.float64)
    # Degenerate result shared by the "too few samples" and "no usable
    # diffs" early exits below.
    empty = DropStats(
        num_timestamps=int(series.size), expected_dt_s=0.0, num_gaps=0, max_gap_s=0.0
    )
    if series.size < 2:
        return empty

    deltas = np.diff(series)
    valid = deltas[np.isfinite(deltas) & (deltas > 0)]
    if valid.size == 0:
        return empty

    nominal = float(np.median(valid)) if expected_dt_s is None else float(expected_dt_s)
    gap_mask = valid > (float(gap_factor) * nominal)
    return DropStats(
        num_timestamps=int(series.size),
        expected_dt_s=nominal,
        num_gaps=int(np.sum(gap_mask)),
        max_gap_s=float(np.max(valid)),
    )
|
|
|
|
|
|
|
|
def linear_drift_fit(
    t_ref_s: np.ndarray,
    t_other_s: np.ndarray,
) -> Dict[str, float]:
    """
    Fit t_other ≈ a * t_ref + b by least squares.

    Returns a dict with slope ``a``, intercept ``b`` (seconds), clock drift
    in ppm ((a - 1) * 1e6), RMS residual ``rmse_s`` (seconds), and the
    number of paired samples ``n``. Identity fit is returned for n < 3.
    """
    ref = np.asarray(t_ref_s, dtype=np.float64)
    oth = np.asarray(t_other_s, dtype=np.float64)
    n = int(min(ref.size, oth.size))
    if n < 3:
        return {"n": float(n), "a": 1.0, "b": 0.0, "drift_ppm": 0.0, "rmse_s": 0.0}

    x = ref[:n]
    y = oth[:n]

    # Shift both series to their first sample for numerical conditioning,
    # then un-shift the intercept after the fit.
    x0, y0 = float(x[0]), float(y[0])
    design = np.stack([x - x0, np.ones(n, dtype=np.float64)], axis=1)
    coeff, *_ = np.linalg.lstsq(design, y - y0, rcond=None)
    slope = float(coeff[0])
    intercept = float(coeff[1] + y0 - slope * x0)

    residual = (slope * x + intercept) - y
    rmse = float(np.sqrt(np.mean(residual * residual)))
    return {
        "n": float(n),
        "a": slope,
        "b": intercept,
        "drift_ppm": float((slope - 1.0) * 1e6),
        "rmse_s": rmse,
    }
|
|
|
|
|
|
|
|
def _read_wav_mono(path: Path) -> Tuple[np.ndarray, int]: |
|
|
with wave.open(str(path), "rb") as wf: |
|
|
sr = int(wf.getframerate()) |
|
|
n = int(wf.getnframes()) |
|
|
chans = int(wf.getnchannels()) |
|
|
sampwidth = int(wf.getsampwidth()) |
|
|
raw = wf.readframes(n) |
|
|
if sampwidth == 2: |
|
|
a = np.frombuffer(raw, dtype=np.int16).astype(np.float32) |
|
|
a /= 32768.0 |
|
|
elif sampwidth == 4: |
|
|
a = np.frombuffer(raw, dtype=np.int32).astype(np.float32) |
|
|
a /= 2147483648.0 |
|
|
else: |
|
|
raise ValueError(f"Unsupported WAV sample width: {sampwidth}") |
|
|
if chans > 1: |
|
|
a = a.reshape(-1, chans).mean(axis=1) |
|
|
return a, sr |
|
|
|
|
|
|
|
|
def audio_offset_seconds(
    ref_wav: Path,
    other_wav: Path,
    *,
    max_lag_s: float = 1.0,
    downsample_hz: int = 2000,
) -> float:
    """
    Estimate offset between two WAVs via cross-correlation of amplitude envelopes.

    Positive means other lags ref (other should be shifted earlier by offset).

    max_lag_s bounds the lag search window; downsample_hz caps the working
    sample rate before correlation.
    """

    ref, sr_ref = _read_wav_mono(ref_wav)
    oth, sr_oth = _read_wav_mono(other_wav)
    if sr_ref != sr_oth:

        # Rates differ: decimate both toward the lower rate (capped by
        # downsample_hz) so the envelopes are on a comparable timebase.
        target = int(min(sr_ref, sr_oth, downsample_hz))
    else:
        target = int(min(sr_ref, downsample_hz))

    def downsample(x: np.ndarray, sr: int) -> np.ndarray:
        # Naive decimation (no anti-alias filter). The step is rounded, so
        # the effective rate is sr/step, which can differ slightly from
        # `target` when sr is not a multiple of it.
        # NOTE(review): the final lag is divided by nominal `target`, not
        # the effective rate — small systematic scale error; confirm this
        # is acceptable for the required sync tolerance.
        step = max(1, int(round(sr / target)))
        return x[::step]

    ref_d = downsample(ref, sr_ref)
    oth_d = downsample(oth, sr_oth)

    # Zero-mean absolute-amplitude envelopes for the correlation.
    ref_e = np.abs(ref_d)
    oth_e = np.abs(oth_d)
    ref_e -= float(np.mean(ref_e))
    oth_e -= float(np.mean(oth_e))

    # Lag search half-width in (downsampled) samples.
    max_lag = int(round(float(max_lag_s) * target))

    # FFT cross-correlation, zero-padded to a power of two that covers the
    # full linear correlation length (avoids circular wrap-around).
    n = int(2 ** int(np.ceil(np.log2(ref_e.size + oth_e.size + 1))))
    F = np.fft.rfft(ref_e, n=n)
    G = np.fft.rfft(oth_e, n=n)
    corr = np.fft.irfft(F * np.conj(G), n=n)

    # Rearrange the circular result into "full" linear correlation order:
    # negative lags (tail of corr) first, then lags 0..ref_e.size-1.
    # NOTE(review): when oth_e.size == 1 the start index is -0, which
    # selects the WHOLE array instead of nothing — degenerate single-sample
    # inputs would mis-shape this; confirm callers never pass such audio.
    corr = np.concatenate([corr[-(oth_e.size - 1) :], corr[: ref_e.size]])
    center = oth_e.size - 1  # index of zero lag after rearrangement
    lo = max(0, center - max_lag)
    hi = min(corr.size, center + max_lag + 1)
    window = corr[lo:hi]
    # Peak of the correlation within the allowed lag window.
    best = int(np.argmax(window)) + lo
    lag = best - center
    # Convert lag (samples at the nominal decimated rate) to seconds.
    return float(lag) / float(target)
|
|
|
|
|
|
|
|
def sync_sanity_from_timestamps(
    timestamps_by_device: Dict[str, np.ndarray],
    *,
    reference_device: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Compute per-device dropped-frame stats and drift relative to a reference device.

    The reference defaults to the lexicographically first device id. Returns
    {"ok": False, ...} with a reason when the input is empty or the requested
    reference id is absent.
    """
    if not timestamps_by_device:
        return {"ok": False, "reason": "no_timestamps"}

    ref_id = reference_device if reference_device else sorted(timestamps_by_device.keys())[0]
    ref = timestamps_by_device.get(ref_id)
    if ref is None:
        return {"ok": False, "reason": "missing_reference", "reference_device": ref_id}

    drop: Dict[str, Any] = {}
    drift: Dict[str, Any] = {}
    for device_id, stamps in timestamps_by_device.items():
        stats = dropped_frames_stats(stamps)
        drop[device_id] = {
            "num_timestamps": stats.num_timestamps,
            "expected_dt_s": stats.expected_dt_s,
            "num_gaps": stats.num_gaps,
            "max_gap_s": stats.max_gap_s,
        }
        if device_id == ref_id:
            # The reference compared against itself is an identity fit.
            drift[device_id] = {
                "n": float(stamps.size),
                "a": 1.0,
                "b": 0.0,
                "drift_ppm": 0.0,
                "rmse_s": 0.0,
            }
        else:
            drift[device_id] = linear_drift_fit(ref, stamps)

    return {
        "ok": True,
        "reference_device": ref_id,
        "dropped_frames": drop,
        "drift": drift,
    }
|
|
|