Spaces:
Running on Zero
Running on Zero
"""Pure data-assembly for v3 usage-log JSON columns.

Each build_* function takes explicit inputs (no globals, no thread-locals) so
it's unit-testable without the pipeline. The pipeline reads thread-locals via
helper getters in `zero_gpu` / `pipeline.py` and passes the values down.

Shape spec: `docs/usage-logging-v3.md`.
"""
| from __future__ import annotations | |
| from typing import Any, Optional | |
| def _r(v: Optional[float], n: int = 3) -> Optional[float]: | |
| """Round-or-None helper. Preserves `None` so downstream JSON serializes null.""" | |
| if v is None: | |
| return None | |
| try: | |
| return round(float(v), n) | |
| except (TypeError, ValueError): | |
| return None | |
| # --------------------------------------------------------------------------- | |
| # settings | |
| # --------------------------------------------------------------------------- | |
def build_settings(min_silence_ms: int, min_speech_ms: int, pad_ms: int,
                   asr_model_id: str, asr_model_label: Optional[str],
                   device: str, url_source: Optional[str] = None) -> dict:
    """Assemble the `settings` block for one usage-log row.

    Besides the caller-supplied values, the alignment constants and the VAD
    batch size are read from `config` on a best-effort basis: if the import
    or a conversion fails, the corresponding field is logged as None.

    Returns:
        dict with keys `min_silence_ms`, `min_speech_ms`, `pad_ms`,
        `asr_model`, `asr_model_label`, `device`, `url_source`,
        `align_config` and `vad_batch_size`.
    """
    # Wrap / DP-repetition constants and the acceptance thresholds (primary
    # and retry) live in config.py. They are logged per row so historical
    # analyses stay interpretable after the values are retuned.
    try:
        from config import (
            WRAP_PENALTY, WRAP_SPAN_WEIGHT, WRAP_SCORE_COST,
            MAX_EDIT_DISTANCE, MAX_EDIT_DISTANCE_RELAXED,
        )
        named_constants = (
            ("wrap_penalty", WRAP_PENALTY),
            ("wrap_span_weight", WRAP_SPAN_WEIGHT),
            ("wrap_score_cost", WRAP_SCORE_COST),
            ("max_edit_distance", MAX_EDIT_DISTANCE),
            ("max_edit_distance_relaxed", MAX_EDIT_DISTANCE_RELAXED),
        )
        align_config = {key: float(value) for key, value in named_constants}
    except Exception:
        align_config = None

    try:
        from config import SEGMENTER_BATCH_SIZE
        # CPU path always runs VAD at bs=1 (see src/segmenter/vad.py).
        on_cpu = str(device).lower().startswith("cpu")
        vad_batch_size = 1 if on_cpu else int(SEGMENTER_BATCH_SIZE)
    except Exception:
        vad_batch_size = None

    settings = {
        "min_silence_ms": int(min_silence_ms),
        "min_speech_ms": int(min_speech_ms),
        "pad_ms": int(pad_ms),
        "asr_model": asr_model_id,
        "asr_model_label": asr_model_label,
        "device": device,
        "url_source": url_source,
        "align_config": align_config,
        "vad_batch_size": vad_batch_size,
    }
    return settings
| # --------------------------------------------------------------------------- | |
| # timing (unified CPU + GPU + stage block) | |
| # --------------------------------------------------------------------------- | |
def build_timing(profiling, cpu_stats: Optional[dict], worker_dispatch: Optional[dict],
                 lease_stats: Optional[dict], estimate_given_s: Optional[float],
                 device: str, estimate_formula_s: Optional[float] = None) -> dict:
    """Assemble the v3 `timing` block.

    Args:
        profiling: ProfilingData instance (stage timings + aggregates). Read
            only through `getattr` with defaults, so missing attributes are
            tolerated; None-valued attributes are logged as null (or 0 for
            the segment count) instead of raising.
        cpu_stats: `_CPU_STATS_TLS.info` dict, or None (GPU path or no dispatch).
        worker_dispatch: `worker_pool._DISPATCH_TLS.info` dict, or None (not
            using remote pool). Stored as-is under `timing.worker_dispatch`.
        lease_stats: `_LEASE_STATS_TLS.info` dict, or None (CPU path — no lease).
        estimate_given_s: Ceil-to-5 value handed to the user — populates
            `timing.estimate_given_s`.
        device: "gpu" / "cpu".
        estimate_formula_s: Raw formula output pre-ceil — populates
            `timing.estimate_formula_s`. Lets estimator tuning separate ceiling
            error from slope/intercept error.

    Returns:
        dict with the lease fields, both estimates, `wall_total_s`, the
        `stages` / `vad` / `asr` / `dp` sub-blocks, `cpu` (dict or None) and
        `worker_dispatch`.
    """
    def _ms(attr_name: str) -> Optional[float]:
        # Seconds -> milliseconds. A None attribute stays None instead of
        # raising TypeError on `1000 * None`.
        seconds = getattr(profiling, attr_name, 0.0)
        return _r(None if seconds is None else 1000 * seconds)

    # Lease fields are only meaningful when a GPU lease was actually taken.
    is_gpu = device.lower() == "gpu" and lease_stats is not None

    # stages = non-lease, non-per-batch wall buckets (always present)
    stages = {
        "resample_s": _r(getattr(profiling, "resample_time", 0.0)),
        "anchor_s": _r(getattr(profiling, "anchor_time", 0.0)),
        "match_wall_s": _r(getattr(profiling, "match_wall_time", 0.0)),
        "result_build_s": _r(getattr(profiling, "result_build_time", 0.0)),
        "result_audio_encode_s": _r(getattr(profiling, "result_audio_encode_time", 0.0)),
    }

    # VAD / ASR: inference dominates (~99.5%). Stage decomposition collapsed
    # to wall_s + queue_s only. Per-batch ASR detail (incl QK^T) still lives
    # in `asr_batches[]` for the L3-cache-cliff signal.
    vad_wall = getattr(profiling, "vad_wall_time", 0.0) or 0.0
    vad_gpu = getattr(profiling, "vad_gpu_time", 0.0) or 0.0
    asr_wall = getattr(profiling, "asr_time", 0.0) or 0.0
    asr_gpu = getattr(profiling, "asr_gpu_time", 0.0) or 0.0
    # queue_s = wall time not accounted for by the GPU time, clamped at 0.
    vad = {
        "wall_s": _r(vad_wall),
        "queue_s": _r(max(0.0, vad_wall - vad_gpu)),
    }
    asr = {
        "wall_s": _r(asr_wall),
        "queue_s": _r(max(0.0, asr_wall - asr_gpu)),
    }

    # DP aggregate block
    dp = {
        "total_s": _r(getattr(profiling, "phoneme_dp_total_time", 0.0)),
        "avg_ms_per_seg": _ms("phoneme_dp_avg_time"),
        "min_ms_per_seg": _ms("phoneme_dp_min_time"),
        "max_ms_per_seg": _ms("phoneme_dp_max_time"),
        "window_setup_s_total": _r(getattr(profiling, "phoneme_window_setup_time", 0.0)),
        "ref_build_s": _r(getattr(profiling, "phoneme_ref_build_time", 0.0)),
        # `or 0` guards against a None counter, matching how counters are
        # read in build_results_summary.
        "num_segments_aligned": int(getattr(profiling, "phoneme_num_segments", 0) or 0),
    }

    timing = {
        "lease_type": (lease_stats or {}).get("lease_type") if is_gpu else ("none" if device.lower() == "cpu" else None),
        "lease_requested_s": (lease_stats or {}).get("requested_s") if is_gpu else None,
        "lease_cap_hit": (lease_stats or {}).get("cap_hit") if is_gpu else None,
        "estimate_given_s": _r(estimate_given_s),
        "estimate_formula_s": _r(estimate_formula_s),
        "wall_total_s": _r(getattr(profiling, "total_time", 0.0)),
        "stages": stages,
        "vad": vad,
        "asr": asr,
        "dp": dp,
    }

    # CPU block — local subprocess dispatch (the production CPU path)
    if cpu_stats:
        timing["cpu"] = {
            "strategy": cpu_stats.get("strategy"),
            "worker_mode": cpu_stats.get("worker_mode"),
            "dtype": cpu_stats.get("dtype"),
            "concurrency_cap": cpu_stats.get("concurrency_cap"),
            "queue_wait_s": _r(cpu_stats.get("queue_wait_s")),
            "compute_s": _r(cpu_stats.get("compute_s")),
            "peers_at_acquire": cpu_stats.get("peers_at_acquire"),
            "peers_at_release": cpu_stats.get("peers_at_release"),
            "subprocess_spawn_s": _r(cpu_stats.get("subprocess_spawn_s")),
        }
    else:
        timing["cpu"] = None

    # Remote worker-pool dispatch (when CPU_STRATEGY=workers). Kept alongside
    # the local `cpu` block; post-hoc analysis can pick whichever is populated.
    timing["worker_dispatch"] = worker_dispatch
    return timing
| # --------------------------------------------------------------------------- | |
| # asr_batches (pass-through of already-shaped profiling.asr_batch_profiling) | |
| # --------------------------------------------------------------------------- | |
def build_asr_batches(profiling) -> list[dict]:
    """Return a shallow copy of `profiling.asr_batch_profiling` as a list.

    The entries are passed through unchanged (already in v3-compatible shape).
    A missing, None or empty attribute yields an empty list.
    """
    batches = getattr(profiling, "asr_batch_profiling", None)
    if not batches:
        return []
    return list(batches)
| # --------------------------------------------------------------------------- | |
| # segments (with dp_debug) | |
| # --------------------------------------------------------------------------- | |
def build_segments(seg_infos, debug_collector,
                   word_counts: list, ayah_spans: list,
                   all_special_refs: set, include_dp_debug: bool = True,
                   audio=None, sample_rate: int = 16000,
                   noise_floor_rms: Optional[float] = None) -> list[dict]:
    """Build one log entry per segment in `seg_infos`, in order.

    The DP trace is looked up by `seg._original_alignment_idx` (preserved
    across `_split_fused_segments`) rather than by the post-split position,
    so segments that survived a Basmala/Isti'adha split still find their
    trace. When that attribute is absent or None, the position in
    `seg_infos` is used instead.

    `debug_collector.to_dp_debug(idx)` is only called when `include_dp_debug`
    is true and a collector was supplied, which avoids the `|`-separator
    build cost when disabled via `USAGE_LOG_DISABLE_DP_DEBUG=1`.

    `word_counts` / `ayah_spans` are indexed by position; positions past the
    end of either list are logged as 0. `audio_stats` is attached only when
    `audio` is given.
    """
    want_trace = include_dp_debug and debug_collector is not None
    rows = []
    for pos, seg in enumerate(seg_infos):
        # Only refs listed in `all_special_refs` are tagged as specials.
        special = seg.matched_ref if seg.matched_ref in all_special_refs else None
        row = {
            "idx": pos + 1,  # 1-based in the log
            "start": round(seg.start_time, 3),
            "end": round(seg.end_time, 3),
            "duration": round(seg.end_time - seg.start_time, 3),
            "ref": seg.matched_ref or "",
            "confidence": round(seg.match_score, 3),
            "word_count": word_counts[pos] if pos < len(word_counts) else 0,
            "ayah_span": ayah_spans[pos] if pos < len(ayah_spans) else 0,
            "has_repeated_words": seg.has_repeated_words,
            "has_missing_words": seg.has_missing_words,
            "special_type": special,
        }

        # Repetition details are optional keys, present only when non-empty.
        if seg.repeated_ranges:
            row["repeated_ranges"] = seg.repeated_ranges
        if seg.repeated_text:
            row["repeated_text"] = seg.repeated_text

        if want_trace:
            original_idx = getattr(seg, "_original_alignment_idx", None)
            # Fall back to the enumerate position for unsplit segments.
            lookup_idx = pos if original_idx is None else original_idx
            trace = debug_collector.to_dp_debug(lookup_idx)
            if trace is not None:
                row["dp_debug"] = trace

        # Per-segment audio stats (3.1.3+) — rms/peak/snr_db computed on
        # the segment slice. SNR uses clip-level noise floor from the
        # non-speech VAD concat.
        if audio is not None:
            from .audio_analytics import segment_audio_stats
            row["audio_stats"] = segment_audio_stats(
                audio, sample_rate,
                seg.start_time, seg.end_time,
                noise_floor_rms,
            )

        rows.append(row)
    return rows
| # --------------------------------------------------------------------------- | |
| # events (pass-through of DebugCollector.events) | |
| # --------------------------------------------------------------------------- | |
def build_events(debug_collector) -> list[dict]:
    """Return a list copy of `debug_collector.events`, or [] without a collector."""
    return [] if debug_collector is None else list(debug_collector.events)
| # --------------------------------------------------------------------------- | |
| # anchor (pass-through of DebugCollector.anchor) | |
| # --------------------------------------------------------------------------- | |
def build_anchor(debug_collector) -> dict:
    """Return a dict copy of `debug_collector.anchor`, or {} without a collector."""
    return {} if debug_collector is None else dict(debug_collector.anchor)
| # --------------------------------------------------------------------------- | |
| # reciter_stats (adds audio_rms / audio_peak; drops pps) | |
| # --------------------------------------------------------------------------- | |
def build_reciter_stats(wpm: float, avg_seg_dur: float, std_seg_dur: float,
                        avg_pause_dur: float, std_pause_dur: float,
                        audio,
                        audio_analytics: Optional[dict] = None) -> dict:
    """Assemble the `reciter_stats` block (pace, pause and amplitude figures).

    `audio` is the float32 mono 16kHz waveform already in scope at log time.

    When `audio_analytics` carries a non-empty `whole` block (3.1.3+), rms,
    peak and the additive amplitude fields are taken from it instead of
    calling `audio_rms_peak` a second time. Otherwise rms/peak come from
    `audio_rms_peak(audio)` (0.0 for both when `audio` is None) and the
    additive amplitude fields are logged as None.
    """
    analytics_whole = (audio_analytics or {}).get("whole") or {}
    # Additive amplitude fields; they stay None on the fallback path.
    amplitude = dict.fromkeys(("dc_offset", "p99", "p01", "crest", "dyn_range_db"))

    if analytics_whole:
        rms = analytics_whole.get("rms", 0.0)
        peak = analytics_whole.get("peak", 0.0)
        for field in amplitude:
            amplitude[field] = analytics_whole.get(field)
    else:
        from .audio_stats import audio_rms_peak
        if audio is not None:
            rms, peak = audio_rms_peak(audio)
        else:
            rms, peak = 0.0, 0.0

    stats = {
        "wpm": _r(wpm, 2),
        "avg_seg_dur": _r(avg_seg_dur, 2),
        "std_seg_dur": _r(std_seg_dur, 2),
        "avg_pause_dur": _r(avg_pause_dur, 2),
        "std_pause_dur": _r(std_pause_dur, 2),
        "audio_rms": _r(rms, 5),
        "audio_peak": _r(peak, 5),
        "audio_dc_offset": _r(amplitude["dc_offset"], 6),
        "audio_p99": _r(amplitude["p99"], 5),
        "audio_p01": _r(amplitude["p01"], 5),
        "audio_crest": _r(amplitude["crest"], 3),
        "audio_dyn_range_db": _r(amplitude["dyn_range_db"], 2),
    }
    return stats
| # --------------------------------------------------------------------------- | |
| # results_summary | |
| # --------------------------------------------------------------------------- | |
| def _parse_detected_surahs(segments) -> list[int]: | |
| """Sorted distinct surah numbers across all aligned segments. | |
| Derived from each segment's `matched_ref` leading "surah:" prefix. | |
| Skips specials (Basmala/Isti'adha/Amin/etc.) and empty refs. | |
| """ | |
| seen: set[int] = set() | |
| for s in segments: | |
| ref = getattr(s, "matched_ref", None) or "" | |
| if not ref or ":" not in ref: | |
| continue # special or no match | |
| head = ref.split("-", 1)[0] # handle "37:151:3-37:152:2" | |
| try: | |
| seen.add(int(head.split(":", 1)[0])) | |
| except ValueError: | |
| continue | |
| return sorted(seen) | |
def build_results_summary(segments: list, profiling,
                          total_speech_s: float,
                          missing_word_count: int) -> dict:
    """Assemble the `results_summary` block.

    Confidence statistics are computed over every segment's `match_score`
    (both are 0.0 when there are no segments). Counters are read from
    `profiling` and logged as 0 when the attribute is missing or falsy.
    """
    def _count(attr_name: str) -> int:
        # Missing / None / 0 counters all log as 0.
        return int(getattr(profiling, attr_name, 0) or 0)

    scores = [seg.match_score for seg in segments]
    if scores:
        mean_conf = sum(scores) / len(scores)
        min_conf = min(scores)
    else:
        mean_conf = 0.0
        min_conf = 0.0
    passed = _count("segments_passed")

    return {
        "detected_surahs": _parse_detected_surahs(segments),
        "num_segments": len(segments),
        "missing_word_count": int(missing_word_count),
        "total_speech_s": round(float(total_speech_s), 3),
        "mean_confidence": _r(mean_conf, 3),
        "min_confidence": _r(min_conf, 3),
        "segments_passed": passed,
        "retry_attempts": _count("retry_attempts"),
        "retry_passed": _count("retry_passed"),
        "reanchors": _count("consec_reanchors"),
        "special_merges": _count("special_merges"),
        "transition_skips": _count("transition_skips"),
        "wraps_detected": _count("phoneme_wraps_detected"),
    }
| # --------------------------------------------------------------------------- | |
| # gpu_memory (small one-liner — inlined at call site but exposed for parity) | |
| # --------------------------------------------------------------------------- | |
def build_gpu_memory(profiling, device: str) -> Optional[dict]:
    """Return the `gpu_memory` block, or None when `device` is not "gpu".

    The device comparison is case-insensitive. VRAM figures are read from
    `profiling` (defaulting to 0.0 when absent) and rounded to 2 decimals.
    """
    if device.lower() == "gpu":
        peak_mb = getattr(profiling, "gpu_peak_vram_mb", 0.0)
        reserved_mb = getattr(profiling, "gpu_reserved_vram_mb", 0.0)
        return {
            "peak_vram_mb": _r(peak_mb, 2),
            "reserved_vram_mb": _r(reserved_mb, 2),
        }
    return None