File size: 6,989 Bytes
7a87926
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
"""
Timebase alignment helpers (Phase 1).

We support:
- validating precomputed `sync_offsets.json` (handled elsewhere)
- computing sanity metrics from timestamps (drops/drift)
- (optional) audio pulse alignment from WAV files (cross-correlation)

All functions are dependency-light (std lib + numpy).
"""

from __future__ import annotations

import json
import wave
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
import numpy as np


def load_timestamps_seconds(path: Path) -> np.ndarray:
    """
    Load timestamps (seconds) from JSON.

    Supported:
    - {"t": [...]} or {"timestamps_s": [...]}
    - [...] (list of numbers)
    """

    obj = json.loads(Path(path).read_text())
    if isinstance(obj, dict):
        if "t" in obj and isinstance(obj["t"], list):
            arr = np.asarray(obj["t"], dtype=np.float64)
        elif "timestamps_s" in obj and isinstance(obj["timestamps_s"], list):
            arr = np.asarray(obj["timestamps_s"], dtype=np.float64)
        else:
            raise ValueError(f"Unsupported timestamps JSON schema: {path}")
    elif isinstance(obj, list):
        arr = np.asarray(obj, dtype=np.float64)
    else:
        raise ValueError(f"Unsupported timestamps JSON schema: {path}")

    if arr.ndim != 1:
        raise ValueError(f"Timestamps must be 1D, got shape {arr.shape} in {path}")
    return arr


@dataclass(frozen=True)
class DropStats:
    num_timestamps: int
    expected_dt_s: float
    num_gaps: int
    max_gap_s: float


def dropped_frames_stats(
    timestamps_s: np.ndarray,
    *,
    expected_dt_s: Optional[float] = None,
    gap_factor: float = 1.5,
) -> DropStats:
    """
    Heuristic dropped-frame detection via large timestamp gaps.
    """

    ts = np.asarray(timestamps_s, dtype=np.float64)
    if ts.size < 2:
        return DropStats(num_timestamps=int(ts.size), expected_dt_s=0.0, num_gaps=0, max_gap_s=0.0)

    dts = np.diff(ts)
    dts = dts[np.isfinite(dts) & (dts > 0)]
    if dts.size == 0:
        return DropStats(num_timestamps=int(ts.size), expected_dt_s=0.0, num_gaps=0, max_gap_s=0.0)

    exp = float(expected_dt_s) if expected_dt_s is not None else float(np.median(dts))
    gaps = dts > (float(gap_factor) * exp)
    max_gap = float(np.max(dts)) if dts.size else 0.0
    return DropStats(
        num_timestamps=int(ts.size),
        expected_dt_s=exp,
        num_gaps=int(np.sum(gaps)),
        max_gap_s=max_gap,
    )


def linear_drift_fit(
    t_ref_s: np.ndarray,
    t_other_s: np.ndarray,
) -> Dict[str, float]:
    """
    Fit t_other ≈ a * t_ref + b. Returns drift in ppm and residual stats.
    """

    a_ref = np.asarray(t_ref_s, dtype=np.float64)
    a_oth = np.asarray(t_other_s, dtype=np.float64)
    n = int(min(a_ref.size, a_oth.size))
    if n < 3:
        return {"n": float(n), "a": 1.0, "b": 0.0, "drift_ppm": 0.0, "rmse_s": 0.0}

    x = a_ref[:n]
    y = a_oth[:n]
    # subtract initial time to reduce conditioning
    x0 = float(x[0])
    y0 = float(y[0])
    x = x - x0
    y = y - y0

    A = np.stack([x, np.ones_like(x)], axis=1)
    coeff, *_ = np.linalg.lstsq(A, y, rcond=None)
    a = float(coeff[0])
    b = float(coeff[1] + y0 - a * x0)

    y_hat = a * (a_ref[:n]) + b
    resid = y_hat - a_oth[:n]
    rmse = float(np.sqrt(np.mean(resid * resid)))
    drift_ppm = float((a - 1.0) * 1e6)
    return {"n": float(n), "a": a, "b": b, "drift_ppm": drift_ppm, "rmse_s": rmse}


def _read_wav_mono(path: Path) -> Tuple[np.ndarray, int]:
    with wave.open(str(path), "rb") as wf:
        sr = int(wf.getframerate())
        n = int(wf.getnframes())
        chans = int(wf.getnchannels())
        sampwidth = int(wf.getsampwidth())
        raw = wf.readframes(n)
    if sampwidth == 2:
        a = np.frombuffer(raw, dtype=np.int16).astype(np.float32)
        a /= 32768.0
    elif sampwidth == 4:
        a = np.frombuffer(raw, dtype=np.int32).astype(np.float32)
        a /= 2147483648.0
    else:
        raise ValueError(f"Unsupported WAV sample width: {sampwidth}")
    if chans > 1:
        a = a.reshape(-1, chans).mean(axis=1)
    return a, sr


def audio_offset_seconds(
    ref_wav: Path,
    other_wav: Path,
    *,
    max_lag_s: float = 1.0,
    downsample_hz: int = 2000,
) -> float:
    """
    Estimate offset between two WAVs via cross-correlation of amplitude envelopes.
    Positive means other lags ref (other should be shifted earlier by offset).
    """

    ref, sr_ref = _read_wav_mono(ref_wav)
    oth, sr_oth = _read_wav_mono(other_wav)
    if sr_ref != sr_oth:
        # crude resample by decimation to a shared low rate
        target = int(min(sr_ref, sr_oth, downsample_hz))
    else:
        target = int(min(sr_ref, downsample_hz))

    def downsample(x: np.ndarray, sr: int) -> np.ndarray:
        step = max(1, int(round(sr / target)))
        return x[::step]

    ref_d = downsample(ref, sr_ref)
    oth_d = downsample(oth, sr_oth)

    # Use absolute amplitude as simple envelope.
    ref_e = np.abs(ref_d)
    oth_e = np.abs(oth_d)
    ref_e -= float(np.mean(ref_e))
    oth_e -= float(np.mean(oth_e))

    max_lag = int(round(float(max_lag_s) * target))
    # FFT-based correlation
    n = int(2 ** int(np.ceil(np.log2(ref_e.size + oth_e.size + 1))))
    F = np.fft.rfft(ref_e, n=n)
    G = np.fft.rfft(oth_e, n=n)
    corr = np.fft.irfft(F * np.conj(G), n=n)
    # corr[k] corresponds to lag k (oth shifted by k)
    corr = np.concatenate([corr[-(oth_e.size - 1) :], corr[: ref_e.size]])
    center = oth_e.size - 1
    lo = max(0, center - max_lag)
    hi = min(corr.size, center + max_lag + 1)
    window = corr[lo:hi]
    best = int(np.argmax(window)) + lo
    lag = best - center
    return float(lag) / float(target)


def sync_sanity_from_timestamps(
    timestamps_by_device: Dict[str, np.ndarray],
    *,
    reference_device: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Compute per-device dropped-frame stats and drift relative to a reference device.
    """

    if not timestamps_by_device:
        return {"ok": False, "reason": "no_timestamps"}
    ref_id = reference_device or sorted(timestamps_by_device.keys())[0]
    ref = timestamps_by_device.get(ref_id)
    if ref is None:
        return {"ok": False, "reason": "missing_reference", "reference_device": ref_id}

    drop: Dict[str, Any] = {}
    drift: Dict[str, Any] = {}
    for did, ts in timestamps_by_device.items():
        ds = dropped_frames_stats(ts)
        drop[did] = {
            "num_timestamps": ds.num_timestamps,
            "expected_dt_s": ds.expected_dt_s,
            "num_gaps": ds.num_gaps,
            "max_gap_s": ds.max_gap_s,
        }
        drift[did] = (
            linear_drift_fit(ref, ts)
            if did != ref_id
            else {"n": float(ts.size), "a": 1.0, "b": 0.0, "drift_ppm": 0.0, "rmse_s": 0.0}
        )

    return {
        "ok": True,
        "reference_device": ref_id,
        "dropped_frames": drop,
        "drift": drift,
    }