"""Smart OCR deduplication — stabilization-first approach. Core principle: **don't read text until it STOPS CHANGING**. Then check against read history to avoid repeats. Architecture: Phase 1 — **Snapshot Stabilization** Each tick compares the full OCR output (all regions merged) with the previous tick. If text is growing (typewriter effect), we wait. Only when the snapshot is identical for ``stabilize_ticks`` consecutive ticks do we consider it "stable" and proceed. Phase 2 — **Line History Dedup** Once stable, each line is fuzzy-compared against a history of previously emitted lines. Only genuinely new lines pass through. History entries expire via TTL so the same text can be re-read after a cooldown. Phase 3 — **Significance Check** Rejects composed output that is too short, has too few real words, or is mostly non-alphanumeric (OCR garbage / UI artifacts). This naturally handles: - **Typewriter effects**: text grows → wait → stabilize → read complete sentence - **Static UI** (HP bars, names): stabilizes → read once → in history → skip - **OCR noise**: fuzzy matching tolerates minor variations - **Dialog changes**: snapshot changes → re-stabilize → emit new parts only - **Repeated dialog**: TTL expiry allows re-reading after cooldown Usage:: from src.services.ocr.dedup import SmartDedup dedup = SmartDedup() text = dedup.process(region_labels, ocr_results) if text is not None: translate_and_speak(text) """ from __future__ import annotations import time from collections import deque from dataclasses import dataclass from difflib import SequenceMatcher from src.services.ocr.models import OcrResult from src.utils.logger import logger # ── Constants (sensible defaults) ──────────────────────────────── DEFAULT_STABILIZE_TICKS: int = 3 DEFAULT_SNAPSHOT_SIMILARITY: float = 0.92 DEFAULT_LINE_SIMILARITY: float = 0.80 DEFAULT_LINE_TTL: float = 120.0 DEFAULT_HISTORY_TTL: float = 90.0 DEFAULT_HISTORY_SIZE: int = 30 DEFAULT_MIN_NEW_CHARS: int = 8 DEFAULT_MIN_NEW_WORDS: int = 2 DEFAULT_MIN_ALNUM_RATIO: float = 0.35 # ── Data classes ───────────────────────────────────────────────── @dataclass class HistoryEntry: """An entry in the global text history ring buffer.""" norm_text: str original_text: str first_seen: float last_seen: float hit_count: int = 1 @dataclass class DedupConfig: """All tunable knobs for the dedup system. Attributes: stabilize_ticks: Consecutive identical ticks before text is considered "stable". snapshot_similarity: Fuzzy threshold for treating two snapshots as identical (0-1). line_similarity: Fuzzy threshold for line-level history matching (0-1). line_ttl: Seconds before a known line in history expires. history_ttl: Seconds before a global history entry expires. history_size: Max entries in the global history ring buffer. history_similarity: Alias for line_similarity (backward compat with bridge.py). min_new_chars: Minimum characters for a change to be significant. min_new_words: Minimum word count for significance. min_alnum_ratio: Minimum alphanumeric ratio for significance. debounce_time: Legacy field — not used internally, kept for bridge compat. """ stabilize_ticks: int = DEFAULT_STABILIZE_TICKS snapshot_similarity: float = DEFAULT_SNAPSHOT_SIMILARITY line_similarity: float = DEFAULT_LINE_SIMILARITY line_ttl: float = DEFAULT_LINE_TTL history_ttl: float = DEFAULT_HISTORY_TTL history_size: int = DEFAULT_HISTORY_SIZE history_similarity: float = DEFAULT_LINE_SIMILARITY min_new_chars: int = DEFAULT_MIN_NEW_CHARS min_new_words: int = DEFAULT_MIN_NEW_WORDS min_alnum_ratio: float = DEFAULT_MIN_ALNUM_RATIO debounce_time: float = 0.0 # legacy — mapped to stabilize_ticks externally instant_mode: bool = False # skip stabilization — emit text on first identical tick # ── Helpers ────────────────────────────────────────────────────── def _normalize(text: str) -> str: """Collapse whitespace, strip, lowercase — for comparison only.""" return " ".join(text.split()).strip().lower() # ── Line History ───────────────────────────────────────────────── class LineHistory: """Tracks previously emitted lines with TTL-based expiry. Each emitted line is stored (normalized) with a timestamp. Old entries expire after ``ttl`` seconds, allowing re-reading. Fuzzy matching handles OCR noise on short lines. """ def __init__( self, ttl: float = DEFAULT_LINE_TTL, similarity: float = DEFAULT_LINE_SIMILARITY, ) -> None: self._entries: dict[str, float] = {} # norm_line → last_emitted_at self._ttl = ttl self._similarity = similarity def is_known(self, line: str) -> bool: """Check if a line was emitted recently (within TTL). Uses exact match first, then fuzzy for short lines. Args: line: Raw (non-normalized) line text. Returns: True if line is in recent history (should be skipped). """ norm = _normalize(line) if len(norm) < 2: return True # too short → treat as known (skip garbage) now = time.monotonic() self._gc(now) # Fast path: exact match if norm in self._entries: return True # Slow path: fuzzy match (short lines where OCR noise matters) if len(norm) < 60: for key in self._entries: if abs(len(norm) - len(key)) > max(5, len(key) * 0.25): continue ratio = SequenceMatcher(None, norm, key).ratio() if ratio >= self._similarity: return True return False def mark_emitted(self, line: str) -> None: """Record a line as emitted.""" norm = _normalize(line) if norm: self._entries[norm] = time.monotonic() def reset(self) -> None: """Clear all history.""" self._entries.clear() @property def size(self) -> int: return len(self._entries) def _gc(self, now: float) -> None: """Remove entries older than TTL.""" expired = [k for k, ts in self._entries.items() if now - ts > self._ttl] for k in expired: del self._entries[k] # ── Global Text History (ring buffer for full text blocks) ─────── class GlobalTextHistory: """Ring buffer of recently emitted text blocks with TTL. Prevents the same composed text from being re-emitted within the TTL window. Uses fuzzy matching to handle OCR noise. """ def __init__( self, max_size: int = DEFAULT_HISTORY_SIZE, ttl: float = DEFAULT_HISTORY_TTL, similarity: float = DEFAULT_LINE_SIMILARITY, ) -> None: self._entries: deque[HistoryEntry] = deque(maxlen=max_size) self._ttl = ttl self._similarity = similarity def is_duplicate(self, text: str) -> tuple[bool, float]: """Check whether text duplicates something in recent history. Args: text: Composed text block. Returns: ``(is_dup, best_similarity)`` """ now = time.monotonic() norm = _normalize(text) if not norm: return (True, 1.0) best_sim = 0.0 for entry in self._entries: if now - entry.last_seen > self._ttl: continue if entry.norm_text == norm: entry.last_seen = now entry.hit_count += 1 return (True, 1.0) ratio = SequenceMatcher(None, norm, entry.norm_text).ratio() best_sim = max(best_sim, ratio) if ratio >= self._similarity: entry.last_seen = now entry.hit_count += 1 return (True, ratio) return (False, best_sim) def add(self, text: str) -> None: """Record a new text block in history.""" norm = _normalize(text) now = time.monotonic() self._entries.append( HistoryEntry( norm_text=norm, original_text=text, first_seen=now, last_seen=now, ) ) def reset(self) -> None: self._entries.clear() @property def size(self) -> int: return len(self._entries) # ── Significance Check ─────────────────────────────────────────── class ChangeDetector: """Decide whether new lines constitute a meaningful change. Rejects very short text, too few words, or mostly non-alphanumeric content. """ def __init__( self, min_chars: int = DEFAULT_MIN_NEW_CHARS, min_words: int = DEFAULT_MIN_NEW_WORDS, min_alnum_ratio: float = DEFAULT_MIN_ALNUM_RATIO, ) -> None: self._min_chars = min_chars self._min_words = min_words self._min_alnum_ratio = min_alnum_ratio def is_significant(self, new_lines: list[str]) -> bool: """Return True if the new lines represent real content, not OCR garbage.""" text = " ".join(line.strip() for line in new_lines).strip() if len(text) < self._min_chars: return False words = text.split() if len(words) < self._min_words: return False alnum = sum(1 for c in text if c.isalnum()) ratio = alnum / len(text) if text else 0 if ratio < self._min_alnum_ratio: return False return True # ── Main Facade: SmartDedup ────────────────────────────────────── class SmartDedup: """Stabilization-first OCR deduplication. Core algorithm: 1. Each tick: merge all OCR results into a single text snapshot 2. Compare snapshot with previous tick — growing? same? different? 3. When snapshot is identical for ``stabilize_ticks`` consecutive ticks → STABLE 4. Extract lines, filter against read history → emit only NEW lines 5. Significance check → reject OCR garbage 6. Add emitted lines to history, record in global ring buffer This replaces the old per-line-tracker approach which caused: - Sentence fragments (read partial text too early) - Infinite silence (partial lines marked "known" too aggressively) Example:: dedup = SmartDedup() # On each pipeline tick: text = dedup.process(region_labels, ocr_results) if text is not None: await translate_and_speak(text) # On pipeline stop or config change: dedup.reset() """ def __init__(self, config: DedupConfig | None = None) -> None: self._cfg = config or DedupConfig() # Stabilization state self._last_snapshot: str | None = None self._last_raw: str | None = None self._stable_count: int = 0 self._processed_snapshot: str | None = None # Why: track last emitted text to detect post-emit growth # (e.g. we emitted 2 lines, then lines 3-4 appear → continuation, not new text) self._last_emitted_norm: str | None = None # History layers self._line_history = LineHistory( ttl=self._cfg.line_ttl, similarity=self._cfg.line_similarity, ) self._global_history = GlobalTextHistory( max_size=self._cfg.history_size, ttl=self._cfg.history_ttl, similarity=self._cfg.history_similarity, ) self._change_detector = ChangeDetector( min_chars=self._cfg.min_new_chars, min_words=self._cfg.min_new_words, min_alnum_ratio=self._cfg.min_alnum_ratio, ) # ── Public API ─────────────────────────────────────────────── def process( self, region_labels: list[str], ocr_results: list[OcrResult], *, force: bool = False, ) -> str | None: """Run stabilization-based dedup on multi-region OCR results. Args: region_labels: Label/ID for each region (for diagnostics). ocr_results: OCR result per region (same order as labels). force: If True, skip all dedup and return all text immediately. Returns: Text to translate + speak, or None if suppressed by dedup. """ # ── Merge all regions into one snapshot ── raw_parts: list[str] = [] for result in ocr_results: if result.error or result.is_empty: continue text = result.text.strip() if text: raw_parts.append(text) if not raw_parts: return None full_raw = "\n".join(raw_parts) full_norm = _normalize(full_raw) if not full_norm or len(full_norm) < 2: return None # ── Force read: bypass all dedup ── if force: self._global_history.add(full_raw) self._mark_all_lines_known(full_raw) self._last_snapshot = full_norm self._last_raw = full_raw self._processed_snapshot = full_norm self._stable_count = 0 logger.info("Dedup: force read — emitting %d chars", len(full_raw)) return full_raw # ── Phase 1: Stabilization check ── if self._last_snapshot is None: # First tick — record snapshot, wait for next self._last_snapshot = full_norm self._last_raw = full_raw self._stable_count = 0 self._processed_snapshot = None # Why: in instant mode, skip waiting — proceed on the very first tick if not self._cfg.instant_mode: return None # Compare current snapshot with previous snapshot_sim = self._snapshot_similarity(self._last_snapshot, full_norm) if snapshot_sim >= self._cfg.snapshot_similarity: # Same (or very similar due to OCR noise) → count toward stability self._stable_count += 1 elif self._is_text_growing(self._last_snapshot, full_norm): # Text is expanding (typewriter effect) → reset, keep waiting self._stable_count = 0 self._last_snapshot = full_norm self._last_raw = full_raw self._processed_snapshot = None logger.debug("Dedup: text growing, waiting for stabilization") return None elif ( self._last_emitted_norm is not None and self._is_text_growing(self._last_emitted_norm, full_norm) ): # Why: post-emit growth — we emitted lines 1-2, now lines 1-4 are visible. # The new snapshot is a SUPERSET of what we emitted → continuation. # Reset stability and wait for the full text to settle. self._stable_count = 0 self._last_snapshot = full_norm self._last_raw = full_raw self._processed_snapshot = None logger.debug("Dedup: post-emit growth detected, waiting for continuation") return None else: # Completely different content → new text, start fresh self._stable_count = 0 self._last_snapshot = full_norm self._last_raw = full_raw self._processed_snapshot = None logger.debug("Dedup: snapshot changed, waiting for stabilization") return None # Update raw text (keep latest version even during stability counting) self._last_snapshot = full_norm self._last_raw = full_raw # Not stable yet? required_ticks = 1 if self._cfg.instant_mode else self._cfg.stabilize_ticks if self._stable_count < required_ticks: return None # ── Already processed this exact snapshot? ── if self._processed_snapshot is not None: sim = self._snapshot_similarity(full_norm, self._processed_snapshot) if sim >= self._cfg.snapshot_similarity: return None # already evaluated, nothing new # ── Phase 2: Text is STABLE — extract new lines ── all_lines = self._extract_lines(full_raw, ocr_results) new_lines: list[str] = [] for line in all_lines: if not self._line_history.is_known(line): new_lines.append(line) # Also check against global text history (full text block dedup) if new_lines: composed = "\n".join(new_lines) is_dup, sim = self._global_history.is_duplicate(composed) if is_dup: logger.debug("Dedup: global history match (sim=%.3f)", sim) new_lines = [] if not new_lines: # All lines already known — mark snapshot as processed self._processed_snapshot = full_norm return None # ── Phase 3: Significance check ── if not self._change_detector.is_significant(new_lines): logger.debug( "Dedup: new lines not significant (%d lines, %d chars)", len(new_lines), sum(len(line) for line in new_lines), ) self._processed_snapshot = full_norm return None # ── EMIT! ── composed = "\n".join(new_lines) self._mark_all_lines_known(composed) self._global_history.add(composed) self._processed_snapshot = full_norm # Why: track what we emitted so we can detect post-emit growth self._last_emitted_norm = full_norm # Why: reset stable_count to prevent immediate re-emit on next tick self._stable_count = 0 logger.info( "Dedup: emitting %d new lines (%d chars, %d known lines in history)", len(new_lines), len(composed), self._line_history.size, ) return composed def force_flush(self) -> str | None: """Force-emit whatever raw text is pending (for force-read button).""" if self._last_raw: raw = self._last_raw self._global_history.add(raw) self._mark_all_lines_known(raw) return raw return None def update_config(self, config: DedupConfig) -> None: """Apply new configuration. Rebuilds internal components.""" self._cfg = config self._line_history = LineHistory( ttl=config.line_ttl, similarity=config.line_similarity, ) self._global_history = GlobalTextHistory( max_size=config.history_size, ttl=config.history_ttl, similarity=config.history_similarity, ) self._change_detector = ChangeDetector( min_chars=config.min_new_chars, min_words=config.min_new_words, min_alnum_ratio=config.min_alnum_ratio, ) logger.info("SmartDedup: config updated") def reset(self) -> None: """Clear all state (e.g. on scene change or pipeline restart).""" self._last_snapshot = None self._last_raw = None self._stable_count = 0 self._processed_snapshot = None self._last_emitted_norm = None self._line_history.reset() self._global_history.reset() logger.info("SmartDedup: all state reset") def reset_region(self, label: str) -> None: """No-op in snapshot-based approach — kept for backward compat.""" pass @property def stats(self) -> dict[str, int]: """Return diagnostic stats.""" return { "tracked_regions": 0, "total_known_lines": self._line_history.size, "history_size": self._global_history.size, "stable_count": self._stable_count, } # ── Internal ───────────────────────────────────────────────── @staticmethod def _snapshot_similarity(a: str, b: str) -> float: """Fast similarity between two normalized snapshots.""" if a == b: return 1.0 if not a or not b: return 0.0 return SequenceMatcher(None, a, b).ratio() @staticmethod def _is_text_growing(old_norm: str, new_norm: str) -> bool: """Check if new text is an expansion of old text (typewriter effect). Returns True if new_norm is longer AND contains most of old_norm's words at the beginning (prefix-like growth). """ if len(new_norm) <= len(old_norm): return False # Simple prefix check — covers most typewriter cases if new_norm.startswith(old_norm): return True # Word-level check: old words appear at the start of new word sequence old_words = old_norm.split() new_words = new_norm.split() if len(new_words) <= len(old_words): return False # Count matching words at the beginning matching = 0 for old_w, new_w in zip(old_words, new_words): if old_w == new_w: matching += 1 elif SequenceMatcher(None, old_w, new_w).ratio() > 0.8: # Why: OCR noise may corrupt already-visible words slightly matching += 1 # Why: 60% threshold — allows some OCR noise in the matching portion return matching >= len(old_words) * 0.6 def _extract_lines( self, raw_text: str, ocr_results: list[OcrResult] ) -> list[str]: """Extract individual lines from OCR results. Prefers structured ``OcrResult.lines`` when available. Deduplicates across regions (overlapping capture areas). Args: raw_text: Fallback raw text (used if no structured lines). ocr_results: OCR results with structured lines. Returns: List of unique raw line texts. """ lines: list[str] = [] seen_norms: set[str] = set() for result in ocr_results: if result.error or result.is_empty: continue for ocr_line in result.lines: raw = ocr_line.text.strip() if not raw: continue norm = _normalize(raw) if len(norm) < 2: continue # Why: skip duplicate lines across regions (overlapping capture areas) if norm in seen_norms: continue # Fuzzy cross-region dedup for short lines # Why: high threshold (0.95) because overlapping regions produce # near-identical text, not merely similar text is_cross_dup = False if len(norm) < 60: for seen in seen_norms: if abs(len(norm) - len(seen)) > 3: continue if SequenceMatcher(None, norm, seen).ratio() >= 0.95: is_cross_dup = True break if is_cross_dup: continue seen_norms.add(norm) lines.append(raw) # Fallback: if no structured lines, split raw text if not lines: for line in raw_text.split("\n"): stripped = line.strip() if stripped and len(_normalize(stripped)) >= 2: norm = _normalize(stripped) if norm not in seen_norms: seen_norms.add(norm) lines.append(stripped) return lines def _mark_all_lines_known(self, text: str) -> None: """Add all lines in text to line history.""" for line in text.split("\n"): stripped = line.strip() if stripped and len(_normalize(stripped)) >= 2: self._line_history.mark_emitted(stripped)