hetchyy's picture
deploy
bfecf3d verified
"""Pure data-assembly for v3 usage-log JSON columns.
Each build_* function takes explicit inputs (no globals, no thread-locals) so
it's unit-testable without the pipeline. The pipeline reads thread-locals via
helper getters in `zero_gpu` / `pipeline.py` and passes the values down.
Shape spec: `docs/usage-logging-v3.md`.
"""
from __future__ import annotations

import math
from typing import Any, Optional
def _r(v: Optional[float], n: int = 3) -> Optional[float]:
"""Round-or-None helper. Preserves `None` so downstream JSON serializes null."""
if v is None:
return None
try:
return round(float(v), n)
except (TypeError, ValueError):
return None
# ---------------------------------------------------------------------------
# settings
# ---------------------------------------------------------------------------
def _align_config_snapshot() -> Optional[dict]:
    """Snapshot the wrap / DP-repetition constants and acceptance thresholds.

    These live in `config.py` and are logged per row so historical analyses
    stay interpretable after tuning. Both the primary and the relaxed (retry)
    edit-distance thresholds are included so `dp_debug.norm_dist` can be
    correlated against whichever thresholds were active, even when they are
    retuned between runs. Returns None when any value cannot be imported or
    converted to float.
    """
    try:
        from config import (
            WRAP_PENALTY, WRAP_SPAN_WEIGHT, WRAP_SCORE_COST,
            MAX_EDIT_DISTANCE, MAX_EDIT_DISTANCE_RELAXED,
        )
        return {
            "wrap_penalty": float(WRAP_PENALTY),
            "wrap_span_weight": float(WRAP_SPAN_WEIGHT),
            "wrap_score_cost": float(WRAP_SCORE_COST),
            "max_edit_distance": float(MAX_EDIT_DISTANCE),
            "max_edit_distance_relaxed": float(MAX_EDIT_DISTANCE_RELAXED),
        }
    except Exception:
        return None


def _vad_batch_size_for(device) -> Optional[int]:
    """VAD batch size in effect for `device`; None if `config` is unavailable.

    The CPU path always runs VAD at bs=1 (see src/segmenter/vad.py); any
    other device uses `config.SEGMENTER_BATCH_SIZE`.
    """
    try:
        from config import SEGMENTER_BATCH_SIZE
        if str(device).lower().startswith("cpu"):
            return 1
        return int(SEGMENTER_BATCH_SIZE)
    except Exception:
        return None


def build_settings(min_silence_ms: int, min_speech_ms: int, pad_ms: int,
                   asr_model_id: str, asr_model_label: Optional[str],
                   device: str, url_source: Optional[str] = None) -> dict:
    """Assemble the v3 `settings` block.

    The segmentation parameters are coerced to int. The model id/label,
    device and URL source are passed through unchanged. `align_config` and
    `vad_batch_size` are best-effort snapshots of `config` values and are
    None when they cannot be read.
    """
    align_config = _align_config_snapshot()
    vad_batch_size = _vad_batch_size_for(device)
    settings = {
        "min_silence_ms": int(min_silence_ms),
        "min_speech_ms": int(min_speech_ms),
        "pad_ms": int(pad_ms),
        "asr_model": asr_model_id,
        "asr_model_label": asr_model_label,
        "device": device,
        "url_source": url_source,
        "align_config": align_config,
        "vad_batch_size": vad_batch_size,
    }
    return settings
# ---------------------------------------------------------------------------
# timing (unified CPU + GPU + stage block)
# ---------------------------------------------------------------------------
def build_timing(profiling, cpu_stats: Optional[dict], worker_dispatch: Optional[dict],
                 lease_stats: Optional[dict], estimate_given_s: Optional[float],
                 device: str, estimate_formula_s: Optional[float] = None) -> dict:
    """Assemble the v3 `timing` block.

    Args:
        profiling: ProfilingData instance (stage timings + aggregates). Every
            attribute is read via `getattr` with a default, so missing fields
            are tolerated.
        cpu_stats: `_CPU_STATS_TLS.info` dict, or None (GPU path or no dispatch).
        worker_dispatch: `worker_pool._DISPATCH_TLS.info` dict, or None (not
            using remote pool). Passed through unchanged.
        lease_stats: `_LEASE_STATS_TLS.info` dict, or None (CPU path — no lease).
        estimate_given_s: Ceil-to-5 value handed to the user — populates
            `timing.estimate_given_s`.
        device: "gpu" / "cpu" (compared case-insensitively).
        estimate_formula_s: Raw formula output pre-ceil — populates
            `timing.estimate_formula_s`. Lets estimator tuning separate ceiling
            error from slope/intercept error.

    Returns:
        The `timing` dict. The lease fields are populated only on the GPU
        path when `lease_stats` is present. On CPU, `lease_type` is "none".
        The DP per-segment millisecond fields are null, rather than raising,
        when the underlying profiling value is None.
    """
    dev = device.lower()
    is_gpu = dev == "gpu" and lease_stats is not None

    def _sec(attr: str) -> Optional[float]:
        # Seconds, rounded; a None attribute stays None (JSON null).
        return _r(getattr(profiling, attr, 0.0))

    def _ms(attr: str) -> Optional[float]:
        # Seconds -> milliseconds. Guard None explicitly: `1000 * None`
        # would raise TypeError and abort the whole usage-log row.
        v = getattr(profiling, attr, 0.0)
        return None if v is None else _r(1000 * v)

    # stages = non-lease, non-per-batch wall buckets (always present)
    stages = {
        "resample_s": _sec("resample_time"),
        "anchor_s": _sec("anchor_time"),
        "match_wall_s": _sec("match_wall_time"),
        "result_build_s": _sec("result_build_time"),
        "result_audio_encode_s": _sec("result_audio_encode_time"),
    }

    # VAD / ASR: inference dominates (~99.5%). Stage decomposition collapsed
    # to wall_s + queue_s only. Per-batch ASR detail (incl QK^T) still lives
    # in `asr_batches[]` for the L3-cache-cliff signal.
    vad_wall = getattr(profiling, "vad_wall_time", 0.0) or 0.0
    vad_gpu = getattr(profiling, "vad_gpu_time", 0.0) or 0.0
    asr_wall = getattr(profiling, "asr_time", 0.0) or 0.0
    asr_gpu = getattr(profiling, "asr_gpu_time", 0.0) or 0.0
    # queue_s = wall time minus GPU time, floored at 0.
    vad = {
        "wall_s": _r(vad_wall),
        "queue_s": _r(max(0.0, vad_wall - vad_gpu)),
    }
    asr = {
        "wall_s": _r(asr_wall),
        "queue_s": _r(max(0.0, asr_wall - asr_gpu)),
    }

    # DP aggregate block
    dp = {
        "total_s": _sec("phoneme_dp_total_time"),
        "avg_ms_per_seg": _ms("phoneme_dp_avg_time"),
        "min_ms_per_seg": _ms("phoneme_dp_min_time"),
        "max_ms_per_seg": _ms("phoneme_dp_max_time"),
        "window_setup_s_total": _sec("phoneme_window_setup_time"),
        "ref_build_s": _sec("phoneme_ref_build_time"),
        # `or 0` so a None counter logs 0 instead of raising inside int().
        "num_segments_aligned": int(getattr(profiling, "phoneme_num_segments", 0) or 0),
    }

    if is_gpu:
        lease_type = lease_stats.get("lease_type")
        lease_requested_s = lease_stats.get("requested_s")
        lease_cap_hit = lease_stats.get("cap_hit")
    else:
        lease_type = "none" if dev == "cpu" else None
        lease_requested_s = None
        lease_cap_hit = None

    timing = {
        "lease_type": lease_type,
        "lease_requested_s": lease_requested_s,
        "lease_cap_hit": lease_cap_hit,
        "estimate_given_s": _r(estimate_given_s),
        "estimate_formula_s": _r(estimate_formula_s),
        "wall_total_s": _sec("total_time"),
        "stages": stages,
        "vad": vad,
        "asr": asr,
        "dp": dp,
    }

    # CPU block — local subprocess dispatch (the production CPU path)
    if cpu_stats:
        timing["cpu"] = {
            "strategy": cpu_stats.get("strategy"),
            "worker_mode": cpu_stats.get("worker_mode"),
            "dtype": cpu_stats.get("dtype"),
            "concurrency_cap": cpu_stats.get("concurrency_cap"),
            "queue_wait_s": _r(cpu_stats.get("queue_wait_s")),
            "compute_s": _r(cpu_stats.get("compute_s")),
            "peers_at_acquire": cpu_stats.get("peers_at_acquire"),
            "peers_at_release": cpu_stats.get("peers_at_release"),
            "subprocess_spawn_s": _r(cpu_stats.get("subprocess_spawn_s")),
        }
    else:
        timing["cpu"] = None

    # Remote worker-pool dispatch (when CPU_STRATEGY=workers). Kept alongside
    # the local `cpu` block; post-hoc analysis can pick whichever is populated.
    timing["worker_dispatch"] = worker_dispatch
    return timing
# ---------------------------------------------------------------------------
# asr_batches (pass-through of already-shaped profiling.asr_batch_profiling)
# ---------------------------------------------------------------------------
def build_asr_batches(profiling) -> list[dict]:
    """Copy of `profiling.asr_batch_profiling` (already v3-shaped).

    A missing, None or empty attribute yields a fresh empty list.
    """
    batches = getattr(profiling, "asr_batch_profiling", None)
    if not batches:
        return []
    return list(batches)
# ---------------------------------------------------------------------------
# segments (with dp_debug)
# ---------------------------------------------------------------------------
def build_segments(seg_infos, debug_collector,
                   word_counts: list, ayah_spans: list,
                   all_special_refs: set, include_dp_debug: bool = True,
                   audio=None, sample_rate: int = 16000,
                   noise_floor_rms: Optional[float] = None) -> list[dict]:
    """Per-segment log entries, one row per seg in `seg_infos`.

    The DP-trace lookup uses `seg._original_alignment_idx`, which is
    preserved across `_split_fused_segments`, and not the post-split
    enumerate index. Segments that survived a Basmala/Isti'adha split
    therefore still find their trace. Segments without that attribute fall
    back to their enumerate index.

    `debug_collector.to_dp_debug(idx)` is called lazily — only when
    `include_dp_debug` is True and a collector is supplied. This avoids the
    `|`-separator build cost when disabled via `USAGE_LOG_DISABLE_DP_DEBUG=1`.

    `word_counts` / `ayah_spans` are indexed by the post-split position.
    Missing trailing entries log as 0. `audio_stats` is added only when
    `audio` is supplied.
    """
    # Loop-invariant work, hoisted: whether dp_debug is wanted, and the
    # audio-stats helper. The import stays lazy — it only runs when audio is
    # supplied.
    want_dp_debug = include_dp_debug and debug_collector is not None
    segment_audio_stats = None
    if audio is not None:
        from .audio_analytics import segment_audio_stats

    out = []
    for i, seg in enumerate(seg_infos):
        sp_type = seg.matched_ref if seg.matched_ref in all_special_refs else None
        entry = {
            "idx": i + 1,
            "start": round(seg.start_time, 3),
            "end": round(seg.end_time, 3),
            "duration": round(seg.end_time - seg.start_time, 3),
            "ref": seg.matched_ref or "",
            "confidence": round(seg.match_score, 3),
            "word_count": word_counts[i] if i < len(word_counts) else 0,
            "ayah_span": ayah_spans[i] if i < len(ayah_spans) else 0,
            "has_repeated_words": seg.has_repeated_words,
            "has_missing_words": seg.has_missing_words,
            "special_type": sp_type,
        }
        if seg.repeated_ranges:
            entry["repeated_ranges"] = seg.repeated_ranges
        if seg.repeated_text:
            entry["repeated_text"] = seg.repeated_text
        if want_dp_debug:
            trace_idx = getattr(seg, "_original_alignment_idx", None)
            if trace_idx is None:
                trace_idx = i  # fallback for unsplit segments
            dbg = debug_collector.to_dp_debug(trace_idx)
            if dbg is not None:
                entry["dp_debug"] = dbg
        # Per-segment audio stats (3.1.3+) — rms/peak/snr_db computed on
        # the segment slice. SNR uses clip-level noise floor from the
        # non-speech VAD concat. Unlocks segment-granular correlations in
        # 07/08/10 (previously only clip-level `audio_rms` available).
        if segment_audio_stats is not None:
            entry["audio_stats"] = segment_audio_stats(
                audio, sample_rate,
                seg.start_time, seg.end_time,
                noise_floor_rms,
            )
        out.append(entry)
    return out
# ---------------------------------------------------------------------------
# events (pass-through of DebugCollector.events)
# ---------------------------------------------------------------------------
def build_events(debug_collector) -> list[dict]:
    """Shallow copy of `debug_collector.events`; [] when there is no collector."""
    return [] if debug_collector is None else list(debug_collector.events)
# ---------------------------------------------------------------------------
# anchor (pass-through of DebugCollector.anchor)
# ---------------------------------------------------------------------------
def build_anchor(debug_collector) -> dict:
    """Shallow copy of `debug_collector.anchor`; {} when there is no collector."""
    return {} if debug_collector is None else dict(debug_collector.anchor)
# ---------------------------------------------------------------------------
# reciter_stats (adds audio_rms / audio_peak; drops pps)
# ---------------------------------------------------------------------------
def build_reciter_stats(wpm: float, avg_seg_dur: float, std_seg_dur: float,
                        avg_pause_dur: float, std_pause_dur: float,
                        audio,
                        audio_analytics: Optional[dict] = None) -> dict:
    """Assemble the v3 `reciter_stats` block (pace + clip amplitude stats).

    `audio` is the float32 mono 16kHz waveform already in scope at log time.

    When `audio_analytics` carries a non-empty `whole` block (3.1.3+),
    rms/peak and the extended amplitude fields are read from it. The
    extended fields are dc offset, p99, p01, crest and dynamic range.
    This avoids a second `audio_rms_peak` pass.

    Otherwise, rms/peak come from `audio_rms_peak` (0.0 when `audio` is
    None) and the extended fields are logged as null.
    """
    whole = (audio_analytics or {}).get("whole") or {}
    if whole:
        rms = whole.get("rms", 0.0)
        peak = whole.get("peak", 0.0)
    else:
        from .audio_stats import audio_rms_peak
        rms, peak = (0.0, 0.0) if audio is None else audio_rms_peak(audio)
    # In the fallback branch `whole` is an empty dict, so each extended-field
    # lookup below yields None.
    fields = (
        ("wpm", wpm, 2),
        ("avg_seg_dur", avg_seg_dur, 2),
        ("std_seg_dur", std_seg_dur, 2),
        ("avg_pause_dur", avg_pause_dur, 2),
        ("std_pause_dur", std_pause_dur, 2),
        ("audio_rms", rms, 5),
        ("audio_peak", peak, 5),
        ("audio_dc_offset", whole.get("dc_offset"), 6),
        ("audio_p99", whole.get("p99"), 5),
        ("audio_p01", whole.get("p01"), 5),
        ("audio_crest", whole.get("crest"), 3),
        ("audio_dyn_range_db", whole.get("dyn_range_db"), 2),
    )
    return {key: _r(value, digits) for key, value, digits in fields}
# ---------------------------------------------------------------------------
# results_summary
# ---------------------------------------------------------------------------
def _parse_detected_surahs(segments) -> list[int]:
"""Sorted distinct surah numbers across all aligned segments.
Derived from each segment's `matched_ref` leading "surah:" prefix.
Skips specials (Basmala/Isti'adha/Amin/etc.) and empty refs.
"""
seen: set[int] = set()
for s in segments:
ref = getattr(s, "matched_ref", None) or ""
if not ref or ":" not in ref:
continue # special or no match
head = ref.split("-", 1)[0] # handle "37:151:3-37:152:2"
try:
seen.add(int(head.split(":", 1)[0]))
except ValueError:
continue
return sorted(seen)
def build_results_summary(segments: list, profiling,
                          total_speech_s: float,
                          missing_word_count: int) -> dict:
    """Assemble the v3 `results_summary` block.

    Confidence statistics come from each segment's `match_score`. Both the
    mean and the min are 0.0 when there are no segments. The counters are
    read from `profiling`; missing or falsy attributes log as 0.
    """
    def counter(attr: str) -> int:
        return int(getattr(profiling, attr, 0) or 0)

    scores = [seg.match_score for seg in segments]
    if scores:
        mean_conf = sum(scores) / len(scores)
        min_conf = min(scores)
    else:
        mean_conf = min_conf = 0.0
    segments_passed = counter("segments_passed")
    return {
        "detected_surahs": _parse_detected_surahs(segments),
        "num_segments": len(segments),
        "missing_word_count": int(missing_word_count),
        "total_speech_s": round(float(total_speech_s), 3),
        "mean_confidence": _r(mean_conf, 3),
        "min_confidence": _r(min_conf, 3),
        "segments_passed": segments_passed,
        "retry_attempts": counter("retry_attempts"),
        "retry_passed": counter("retry_passed"),
        "reanchors": counter("consec_reanchors"),
        "special_merges": counter("special_merges"),
        "transition_skips": counter("transition_skips"),
        "wraps_detected": counter("phoneme_wraps_detected"),
    }
# ---------------------------------------------------------------------------
# gpu_memory (small one-liner — inlined at call site but exposed for parity)
# ---------------------------------------------------------------------------
def build_gpu_memory(profiling, device: str) -> Optional[dict]:
    """Peak/reserved VRAM (MB, 2 decimals) for GPU runs; None on other devices."""
    if device.lower() == "gpu":
        return {
            "peak_vram_mb": _r(getattr(profiling, "gpu_peak_vram_mb", 0.0), 2),
            "reserved_vram_mb": _r(getattr(profiling, "gpu_reserved_vram_mb", 0.0), 2),
        }
    return None