# Source path: quranic-universal-aligner/src/core/debug_collector.py
# Hugging Face page header (scrape residue): hetchyy's picture;
# commit message "Upload folder using huggingface_hub"; revision 602b5d3 (verified).
"""Thread-local debug data collector for the hidden debug API.
When active, pipeline stages append structured debug data to the collector
instead of (or in addition to) printing to stdout. The collector is the
single source of truth for the debug endpoint response.
"""
import threading
# Per-thread storage for the active collector. The `collector` attribute is
# assigned by start_debug_collection(), read by get_debug_collector(), and
# reset to None by stop_debug_collection(); a thread that never started a
# collection has no such attribute at all.
_ctx = threading.local()
class DebugCollector:
    """Container for the structured debug output of the pipeline stages.

    Each public attribute is a plain dict or list that stages either fill in
    directly or extend through the ``add_*`` helpers. ``to_dict`` exposes the
    collected data as a single mapping.
    """

    __slots__ = ("vad", "asr", "anchor", "specials", "alignment", "events", "_profiling")

    def __init__(self):
        """Create an empty collector with every section initialised."""
        # Stage summaries (populated directly by the stages).
        self.vad = dict()      # raw/cleaned intervals, counts, params
        self.asr = dict()      # per-segment phonemes, model info
        self.anchor = dict()   # voting results, surah ranking, best run

        # Special segment detection bookkeeping.
        self.specials = {
            "candidates_tested": [],
            "detected": [],
            "first_quran_idx": 0,
        }

        # Append-only logs.
        self.alignment = list()  # per-segment DP results
        self.events = list()     # reanchors, chapter transitions, retries, gaps, etc.

        # ProfilingData set by pipeline after completion; not part of to_dict().
        self._profiling = None

    def add_event(self, event_type, **kwargs):
        """Log one pipeline event (gap, retry, reanchor, transition, etc.).

        The stored record starts with a ``"type"`` key holding *event_type*
        and is then updated with all keyword arguments.
        """
        record = {"type": event_type}
        record.update(kwargs)
        self.events.append(record)

    def add_special_candidate(self, segment_idx, candidate_type, edit_distance,
                              threshold, matched):
        """Log one special/transition detection attempt.

        *edit_distance* is stored rounded to 4 decimal places; the remaining
        arguments are stored unchanged.
        """
        attempt = {
            "segment_idx": segment_idx,
            "type": candidate_type,
            "edit_distance": round(edit_distance, 4),
            "threshold": threshold,
            "matched": matched,
        }
        self.specials["candidates_tested"].append(attempt)

    def add_special_detected(self, segment_idx, special_type, confidence):
        """Log a confirmed special segment; *confidence* is rounded to 4 decimals."""
        detection = {
            "segment_idx": segment_idx,
            "type": special_type,
            "confidence": round(confidence, 4),
        }
        self.specials["detected"].append(detection)

    def add_alignment_result(self, segment_idx, asr_phonemes, window,
                             expected_pointer, result=None, timing=None,
                             retry_tier=None, failed_reason=None):
        """Log the alignment outcome for one segment.

        Args:
            segment_idx: Index of the segment being recorded.
            asr_phonemes: Sequence of phoneme strings. Only the first 60 are
                kept in the space-joined preview (suffixed with ``"..."``
                when truncated); the full length is stored separately.
            window: Stored unchanged under ``"window"``.
            expected_pointer: Stored unchanged under ``"expected_pointer"``.
            result: Optional; stored under ``"result"`` when not None.
            timing: Optional mapping with ``window_setup_time``, ``dp_time``
                and ``result_build_time`` entries (missing ones count as 0).
                Each value is multiplied by 1000 and rounded to 3 decimals
                (presumably seconds converted to milliseconds).
            retry_tier: Always stored, even when None.
            failed_reason: Optional; stored under ``"failed_reason"`` when
                not None.
        """
        preview_limit = 60
        preview = " ".join(asr_phonemes[:preview_limit])
        if len(asr_phonemes) > preview_limit:
            preview += "..."

        entry = {
            "segment_idx": segment_idx,
            "asr_phonemes": preview,
            "asr_phoneme_count": len(asr_phonemes),
            "window": window,
            "expected_pointer": expected_pointer,
            "retry_tier": retry_tier,
        }

        # Optional sections are attached in a fixed order so the resulting
        # key order stays stable: result, timing, failed_reason.
        if result is not None:
            entry["result"] = result

        if timing is not None:
            # (output key, key looked up in the timing mapping)
            timing_fields = (
                ("window_setup_ms", "window_setup_time"),
                ("dp_ms", "dp_time"),
                ("result_build_ms", "result_build_time"),
            )
            entry["timing"] = {
                out_key: round(timing.get(src_key, 0) * 1000, 3)
                for out_key, src_key in timing_fields
            }

        if failed_reason is not None:
            entry["failed_reason"] = failed_reason

        self.alignment.append(entry)

    def to_dict(self):
        """Return the collected sections as one dict.

        The values are the collector's own containers (no copies are made).
        The per-segment alignment list is exposed as ``"alignment_detail"``.
        """
        payload = {
            section: getattr(self, section)
            for section in ("vad", "asr", "anchor", "specials")
        }
        payload["alignment_detail"] = self.alignment
        payload["events"] = self.events
        return payload
def start_debug_collection():
    """Install a fresh DebugCollector as the current thread's active collector.

    Any collector previously stored for this thread is replaced.
    """
    fresh_collector = DebugCollector()
    _ctx.collector = fresh_collector
def get_debug_collector():
    """Return this thread's active collector, or None when debug mode is off."""
    try:
        return _ctx.collector
    except AttributeError:
        # No collection was ever started on this thread.
        return None
def stop_debug_collection():
    """Clear this thread's collector slot and hand back what was stored there.

    Returns None when no collection was active on the current thread.
    """
    try:
        finished = _ctx.collector
    except AttributeError:
        # Nothing was ever started on this thread.
        finished = None
    _ctx.collector = None
    return finished