# 3d_model/ylff/services/sync_alignment.py
# Author: Azan
# Clean deployment build (squashed) — commit 7a87926
"""
Timebase alignment helpers (Phase 1).
We support:
- validating precomputed `sync_offsets.json` (handled elsewhere)
- computing sanity metrics from timestamps (drops/drift)
- (optional) audio pulse alignment from WAV files (cross-correlation)
All functions are dependency-light (std lib + numpy).
"""
from __future__ import annotations
import json
import wave
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
import numpy as np
def load_timestamps_seconds(path: Path) -> np.ndarray:
    """
    Read a 1-D array of timestamps (in seconds) from a JSON file.

    Accepted schemas:
      * ``{"t": [...]}``
      * ``{"timestamps_s": [...]}``
      * a bare JSON list of numbers

    Raises:
        ValueError: if the JSON matches no accepted schema, or the
            resulting array is not one-dimensional.
    """
    payload = json.loads(Path(path).read_text())
    values = None
    if isinstance(payload, list):
        values = payload
    elif isinstance(payload, dict):
        # First matching key wins; the value must itself be a list.
        for key in ("t", "timestamps_s"):
            if isinstance(payload.get(key), list):
                values = payload[key]
                break
    if values is None:
        raise ValueError(f"Unsupported timestamps JSON schema: {path}")
    out = np.asarray(values, dtype=np.float64)
    if out.ndim != 1:
        raise ValueError(f"Timestamps must be 1D, got shape {out.shape} in {path}")
    return out
@dataclass(frozen=True)
class DropStats:
    """Summary of timestamp-gap (dropped frame) analysis."""

    num_timestamps: int   # number of timestamps examined
    expected_dt_s: float  # nominal inter-frame interval used for the gap test
    num_gaps: int         # intervals exceeding gap_factor * expected_dt_s
    max_gap_s: float      # largest positive finite interval observed


def dropped_frames_stats(
    timestamps_s: np.ndarray,
    *,
    expected_dt_s: Optional[float] = None,
    gap_factor: float = 1.5,
) -> DropStats:
    """
    Detect likely dropped frames by flagging unusually large timestamp gaps.

    Args:
        timestamps_s: 1-D timestamps in seconds.
        expected_dt_s: nominal frame interval; inferred as the median of the
            positive finite diffs when omitted.
        gap_factor: a diff counts as a gap when it exceeds
            gap_factor * expected_dt_s.
    """
    ts = np.asarray(timestamps_s, dtype=np.float64)
    empty = DropStats(num_timestamps=int(ts.size), expected_dt_s=0.0, num_gaps=0, max_gap_s=0.0)
    if ts.size < 2:
        return empty
    # Keep only meaningful intervals: drop NaN/inf and non-increasing pairs.
    diffs = np.diff(ts)
    diffs = diffs[np.isfinite(diffs) & (diffs > 0)]
    if diffs.size == 0:
        return empty
    nominal = float(np.median(diffs)) if expected_dt_s is None else float(expected_dt_s)
    gap_mask = diffs > float(gap_factor) * nominal
    return DropStats(
        num_timestamps=int(ts.size),
        expected_dt_s=nominal,
        num_gaps=int(gap_mask.sum()),
        max_gap_s=float(diffs.max()),
    )
def linear_drift_fit(
    t_ref_s: np.ndarray,
    t_other_s: np.ndarray,
) -> Dict[str, float]:
    """
    Least-squares fit of t_other ≈ a * t_ref + b.

    Returns a dict with:
        n: number of paired samples used (as a float),
        a: slope; b: intercept, expressed in the original coordinates,
        drift_ppm: (a - 1) * 1e6, i.e. clock drift in parts per million,
        rmse_s: root-mean-square residual of the fit, in seconds.

    Fewer than 3 paired samples yields the identity fit (a=1, b=0).
    """
    ref = np.asarray(t_ref_s, dtype=np.float64)
    oth = np.asarray(t_other_s, dtype=np.float64)
    n = int(min(ref.size, oth.size))
    if n < 3:
        return {"n": float(n), "a": 1.0, "b": 0.0, "drift_ppm": 0.0, "rmse_s": 0.0}
    xs, ys = ref[:n], oth[:n]
    # Shift both axes to start at zero so the normal equations stay
    # well-conditioned for large absolute timestamps (e.g. epoch seconds).
    x_shift, y_shift = float(xs[0]), float(ys[0])
    design = np.column_stack([xs - x_shift, np.ones(n)])
    (slope, icept), *_ = np.linalg.lstsq(design, ys - y_shift, rcond=None)
    slope = float(slope)
    # Undo the shift so the intercept refers to the original coordinates.
    icept = float(icept) + y_shift - slope * x_shift
    residuals = (slope * xs + icept) - ys
    return {
        "n": float(n),
        "a": slope,
        "b": icept,
        "drift_ppm": (slope - 1.0) * 1e6,
        "rmse_s": float(np.sqrt(np.mean(residuals * residuals))),
    }
def _read_wav_mono(path: Path) -> Tuple[np.ndarray, int]:
with wave.open(str(path), "rb") as wf:
sr = int(wf.getframerate())
n = int(wf.getnframes())
chans = int(wf.getnchannels())
sampwidth = int(wf.getsampwidth())
raw = wf.readframes(n)
if sampwidth == 2:
a = np.frombuffer(raw, dtype=np.int16).astype(np.float32)
a /= 32768.0
elif sampwidth == 4:
a = np.frombuffer(raw, dtype=np.int32).astype(np.float32)
a /= 2147483648.0
else:
raise ValueError(f"Unsupported WAV sample width: {sampwidth}")
if chans > 1:
a = a.reshape(-1, chans).mean(axis=1)
return a, sr
def audio_offset_seconds(
ref_wav: Path,
other_wav: Path,
*,
max_lag_s: float = 1.0,
downsample_hz: int = 2000,
) -> float:
"""
Estimate offset between two WAVs via cross-correlation of amplitude envelopes.
Positive means other lags ref (other should be shifted earlier by offset).
"""
ref, sr_ref = _read_wav_mono(ref_wav)
oth, sr_oth = _read_wav_mono(other_wav)
if sr_ref != sr_oth:
# crude resample by decimation to a shared low rate
target = int(min(sr_ref, sr_oth, downsample_hz))
else:
target = int(min(sr_ref, downsample_hz))
def downsample(x: np.ndarray, sr: int) -> np.ndarray:
step = max(1, int(round(sr / target)))
return x[::step]
ref_d = downsample(ref, sr_ref)
oth_d = downsample(oth, sr_oth)
# Use absolute amplitude as simple envelope.
ref_e = np.abs(ref_d)
oth_e = np.abs(oth_d)
ref_e -= float(np.mean(ref_e))
oth_e -= float(np.mean(oth_e))
max_lag = int(round(float(max_lag_s) * target))
# FFT-based correlation
n = int(2 ** int(np.ceil(np.log2(ref_e.size + oth_e.size + 1))))
F = np.fft.rfft(ref_e, n=n)
G = np.fft.rfft(oth_e, n=n)
corr = np.fft.irfft(F * np.conj(G), n=n)
# corr[k] corresponds to lag k (oth shifted by k)
corr = np.concatenate([corr[-(oth_e.size - 1) :], corr[: ref_e.size]])
center = oth_e.size - 1
lo = max(0, center - max_lag)
hi = min(corr.size, center + max_lag + 1)
window = corr[lo:hi]
best = int(np.argmax(window)) + lo
lag = best - center
return float(lag) / float(target)
def sync_sanity_from_timestamps(
    timestamps_by_device: Dict[str, np.ndarray],
    *,
    reference_device: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Summarize per-device timing health: dropped-frame statistics for every
    device plus linear clock drift of each device against a reference.

    Args:
        timestamps_by_device: device id -> 1-D timestamps in seconds.
        reference_device: id of the drift reference; defaults to the
            lexicographically first device id.

    Returns:
        {"ok": False, "reason": ...} on bad input, otherwise
        {"ok": True, "reference_device": ..., "dropped_frames": {...},
         "drift": {...}} keyed by device id.
    """
    if not timestamps_by_device:
        return {"ok": False, "reason": "no_timestamps"}
    ref_id = reference_device or sorted(timestamps_by_device)[0]
    ref_ts = timestamps_by_device.get(ref_id)
    if ref_ts is None:
        return {"ok": False, "reason": "missing_reference", "reference_device": ref_id}
    dropped: Dict[str, Any] = {}
    drift: Dict[str, Any] = {}
    for dev_id, dev_ts in timestamps_by_device.items():
        stats = dropped_frames_stats(dev_ts)
        dropped[dev_id] = {
            "num_timestamps": stats.num_timestamps,
            "expected_dt_s": stats.expected_dt_s,
            "num_gaps": stats.num_gaps,
            "max_gap_s": stats.max_gap_s,
        }
        if dev_id == ref_id:
            # The reference is trivially drift-free against itself.
            drift[dev_id] = {
                "n": float(dev_ts.size),
                "a": 1.0,
                "b": 0.0,
                "drift_ppm": 0.0,
                "rmse_s": 0.0,
            }
        else:
            drift[dev_id] = linear_drift_fit(ref_ts, dev_ts)
    return {
        "ok": True,
        "reference_device": ref_id,
        "dropped_frames": dropped,
        "drift": drift,
    }