# oneocr/_archive/dedup.py
# OneOCR Dev
# OneOCR - reverse engineering complete, ONNX pipeline 53% match rate
# ce847d4
"""Smart OCR deduplication — stabilization-first approach.
Core principle: **don't read text until it STOPS CHANGING**.
Then check against read history to avoid repeats.
Architecture:
Phase 1 — **Snapshot Stabilization**
Each tick compares the full OCR output (all regions merged) with the
previous tick. If text is growing (typewriter effect), we wait.
Only when the snapshot is identical for ``stabilize_ticks`` consecutive
ticks do we consider it "stable" and proceed.
Phase 2 — **Line History Dedup**
Once stable, each line is fuzzy-compared against a history of previously
emitted lines. Only genuinely new lines pass through. History entries
expire via TTL so the same text can be re-read after a cooldown.
Phase 3 — **Significance Check**
Rejects composed output that is too short, has too few real words,
or is mostly non-alphanumeric (OCR garbage / UI artifacts).
This naturally handles:
- **Typewriter effects**: text grows → wait → stabilize → read complete sentence
- **Static UI** (HP bars, names): stabilizes → read once → in history → skip
- **OCR noise**: fuzzy matching tolerates minor variations
- **Dialog changes**: snapshot changes → re-stabilize → emit new parts only
- **Repeated dialog**: TTL expiry allows re-reading after cooldown
Usage::
from src.services.ocr.dedup import SmartDedup
dedup = SmartDedup()
text = dedup.process(region_labels, ocr_results)
if text is not None:
translate_and_speak(text)
"""
from __future__ import annotations
import time
from collections import deque
from dataclasses import dataclass
from difflib import SequenceMatcher
from src.services.ocr.models import OcrResult
from src.utils.logger import logger
# ── Constants (sensible defaults) ────────────────────────────────
DEFAULT_STABILIZE_TICKS: int = 3  # consecutive identical ticks before a snapshot counts as stable
DEFAULT_SNAPSHOT_SIMILARITY: float = 0.92  # fuzzy ratio at/above which two snapshots are "the same"
DEFAULT_LINE_SIMILARITY: float = 0.80  # fuzzy ratio for line-vs-history matching
DEFAULT_LINE_TTL: float = 120.0  # seconds before an emitted line may be re-read
DEFAULT_HISTORY_TTL: float = 90.0  # seconds before a global history entry expires
DEFAULT_HISTORY_SIZE: int = 30  # ring-buffer capacity for emitted text blocks
DEFAULT_MIN_NEW_CHARS: int = 8  # minimum characters for new text to be significant
DEFAULT_MIN_NEW_WORDS: int = 2  # minimum words for new text to be significant
DEFAULT_MIN_ALNUM_RATIO: float = 0.35  # minimum alphanumeric share — below this it is OCR garbage
# ── Data classes ─────────────────────────────────────────────────
@dataclass
class HistoryEntry:
    """An entry in the global text history ring buffer."""

    norm_text: str  # normalized form (lowercased, whitespace-collapsed) used for matching
    original_text: str  # text exactly as it was emitted
    first_seen: float  # monotonic timestamp when the entry was first recorded
    last_seen: float  # monotonic timestamp of the most recent match (refreshed on hit)
    hit_count: int = 1  # how many times incoming text matched this entry
@dataclass
class DedupConfig:
    """All tunable knobs for the dedup system.

    Attributes:
        stabilize_ticks: Consecutive identical ticks before text is considered "stable".
        snapshot_similarity: Fuzzy threshold for treating two snapshots as identical (0-1).
        line_similarity: Fuzzy threshold for line-level history matching (0-1).
        line_ttl: Seconds before a known line in history expires.
        history_ttl: Seconds before a global history entry expires.
        history_size: Max entries in the global history ring buffer.
        history_similarity: Alias for line_similarity (backward compat with bridge.py).
        min_new_chars: Minimum characters for a change to be significant.
        min_new_words: Minimum word count for significance.
        min_alnum_ratio: Minimum alphanumeric ratio for significance.
        debounce_time: Legacy field — not used internally, kept for bridge compat.
        instant_mode: If True, stabilization waiting is skipped and text is
            emitted on the first identical tick.
    """

    stabilize_ticks: int = DEFAULT_STABILIZE_TICKS
    snapshot_similarity: float = DEFAULT_SNAPSHOT_SIMILARITY
    line_similarity: float = DEFAULT_LINE_SIMILARITY
    line_ttl: float = DEFAULT_LINE_TTL
    history_ttl: float = DEFAULT_HISTORY_TTL
    history_size: int = DEFAULT_HISTORY_SIZE
    history_similarity: float = DEFAULT_LINE_SIMILARITY
    min_new_chars: int = DEFAULT_MIN_NEW_CHARS
    min_new_words: int = DEFAULT_MIN_NEW_WORDS
    min_alnum_ratio: float = DEFAULT_MIN_ALNUM_RATIO
    debounce_time: float = 0.0  # legacy — mapped to stabilize_ticks externally
    instant_mode: bool = False  # skip stabilization — emit text on first identical tick
# ── Helpers ──────────────────────────────────────────────────────
def _normalize(text: str) -> str:
"""Collapse whitespace, strip, lowercase — for comparison only."""
return " ".join(text.split()).strip().lower()
# ── Line History ─────────────────────────────────────────────────
class LineHistory:
    """Recently emitted lines with TTL-based expiry.

    Each emitted line is stored in normalized form together with its
    emission timestamp. Entries older than ``ttl`` seconds are garbage
    collected, so the same line can be read again after the cooldown.
    Short lines additionally get fuzzy matching to absorb OCR noise.
    """

    def __init__(
        self,
        ttl: float = DEFAULT_LINE_TTL,
        similarity: float = DEFAULT_LINE_SIMILARITY,
    ) -> None:
        self._ttl = ttl
        self._similarity = similarity
        # norm_line → monotonic timestamp of the last emission
        self._entries: dict[str, float] = {}

    def is_known(self, line: str) -> bool:
        """Return True when *line* was emitted recently (within TTL).

        Exact match is tried first; short lines fall through to a fuzzy
        pass because OCR noise matters most there.

        Args:
            line: Raw (non-normalized) line text.

        Returns:
            True if the line is in recent history (should be skipped).
        """
        norm = _normalize(line)
        # Anything under 2 chars is treated as garbage → "known" → skipped.
        if len(norm) < 2:
            return True
        self._gc(time.monotonic())
        # Fast path: exact lookup.
        if norm in self._entries:
            return True
        if len(norm) >= 60:
            return False
        # Slow path: fuzzy comparison against every stored short line.
        for known in self._entries:
            # Prune candidates whose length differs too much to ever match.
            if abs(len(norm) - len(known)) > max(5, len(known) * 0.25):
                continue
            if SequenceMatcher(None, norm, known).ratio() >= self._similarity:
                return True
        return False

    def mark_emitted(self, line: str) -> None:
        """Record *line* as just emitted."""
        norm = _normalize(line)
        if not norm:
            return
        self._entries[norm] = time.monotonic()

    def reset(self) -> None:
        """Forget every recorded line."""
        self._entries.clear()

    @property
    def size(self) -> int:
        return len(self._entries)

    def _gc(self, now: float) -> None:
        """Drop entries whose last emission is older than the TTL."""
        stale = [key for key, stamp in self._entries.items() if now - stamp > self._ttl]
        for key in stale:
            del self._entries[key]
# ── Global Text History (ring buffer for full text blocks) ───────
class GlobalTextHistory:
    """TTL-bounded ring buffer of recently emitted text blocks.

    Stops an identical (or fuzzily near-identical) composed block from
    being re-emitted inside the TTL window. Fuzzy matching absorbs
    OCR noise between near-duplicate captures.
    """

    def __init__(
        self,
        max_size: int = DEFAULT_HISTORY_SIZE,
        ttl: float = DEFAULT_HISTORY_TTL,
        similarity: float = DEFAULT_LINE_SIMILARITY,
    ) -> None:
        self._ttl = ttl
        self._similarity = similarity
        # Oldest entries fall off automatically once maxlen is reached.
        self._entries: deque[HistoryEntry] = deque(maxlen=max_size)

    def is_duplicate(self, text: str) -> tuple[bool, float]:
        """Check whether *text* duplicates something in recent history.

        On a hit, the matching entry's ``last_seen``/``hit_count`` are
        refreshed as a side effect.

        Args:
            text: Composed text block.

        Returns:
            ``(is_dup, best_similarity)``
        """
        now = time.monotonic()
        norm = _normalize(text)
        if not norm:
            # Nothing to emit → trivially a duplicate.
            return (True, 1.0)
        best = 0.0
        for entry in self._entries:
            # Expired entries are skipped (deque eviction removes them later).
            if now - entry.last_seen > self._ttl:
                continue
            if entry.norm_text == norm:
                entry.last_seen = now
                entry.hit_count += 1
                return (True, 1.0)
            sim = SequenceMatcher(None, norm, entry.norm_text).ratio()
            best = max(best, sim)
            if sim >= self._similarity:
                entry.last_seen = now
                entry.hit_count += 1
                return (True, sim)
        return (False, best)

    def add(self, text: str) -> None:
        """Record *text* as a fresh history entry."""
        norm = _normalize(text)
        stamp = time.monotonic()
        entry = HistoryEntry(
            norm_text=norm,
            original_text=text,
            first_seen=stamp,
            last_seen=stamp,
        )
        self._entries.append(entry)

    def reset(self) -> None:
        self._entries.clear()

    @property
    def size(self) -> int:
        return len(self._entries)
# ── Significance Check ───────────────────────────────────────────
class ChangeDetector:
    """Judge whether freshly extracted lines amount to a meaningful change.

    Filters out very short text, single-word fragments, and text that is
    mostly punctuation/symbols (typical OCR garbage or UI artifacts).
    """

    def __init__(
        self,
        min_chars: int = DEFAULT_MIN_NEW_CHARS,
        min_words: int = DEFAULT_MIN_NEW_WORDS,
        min_alnum_ratio: float = DEFAULT_MIN_ALNUM_RATIO,
    ) -> None:
        self._min_chars = min_chars
        self._min_words = min_words
        self._min_alnum_ratio = min_alnum_ratio

    def is_significant(self, new_lines: list[str]) -> bool:
        """Return True if the new lines represent real content, not OCR garbage."""
        text = " ".join(part.strip() for part in new_lines).strip()
        if len(text) < self._min_chars:
            return False
        if len(text.split()) < self._min_words:
            return False
        alnum_count = sum(char.isalnum() for char in text)
        # ``text`` is non-empty whenever min_chars > 0; the guard keeps a
        # zero-min_chars configuration from dividing by zero.
        ratio = alnum_count / len(text) if text else 0
        return ratio >= self._min_alnum_ratio
# ── Main Facade: SmartDedup ──────────────────────────────────────
class SmartDedup:
    """Stabilization-first OCR deduplication.

    Core algorithm:
    1. Each tick: merge all OCR results into a single text snapshot
    2. Compare snapshot with previous tick — growing? same? different?
    3. When snapshot is identical for ``stabilize_ticks`` consecutive ticks → STABLE
    4. Extract lines, filter against read history → emit only NEW lines
    5. Significance check → reject OCR garbage
    6. Add emitted lines to history, record in global ring buffer

    This replaces the old per-line-tracker approach which caused:
    - Sentence fragments (read partial text too early)
    - Infinite silence (partial lines marked "known" too aggressively)

    Example::

        dedup = SmartDedup()
        # On each pipeline tick:
        text = dedup.process(region_labels, ocr_results)
        if text is not None:
            await translate_and_speak(text)
        # On pipeline stop or config change:
        dedup.reset()
    """

    def __init__(self, config: DedupConfig | None = None) -> None:
        self._cfg = config or DedupConfig()
        # Stabilization state
        self._last_snapshot: str | None = None       # normalized snapshot from the previous tick
        self._last_raw: str | None = None            # raw text matching _last_snapshot
        self._stable_count: int = 0                  # consecutive ticks the snapshot stayed the same
        self._processed_snapshot: str | None = None  # last snapshot already evaluated for emission
        # Why: track last emitted text to detect post-emit growth
        # (e.g. we emitted 2 lines, then lines 3-4 appear → continuation, not new text)
        self._last_emitted_norm: str | None = None
        # History layers
        self._line_history = LineHistory(
            ttl=self._cfg.line_ttl,
            similarity=self._cfg.line_similarity,
        )
        self._global_history = GlobalTextHistory(
            max_size=self._cfg.history_size,
            ttl=self._cfg.history_ttl,
            similarity=self._cfg.history_similarity,
        )
        self._change_detector = ChangeDetector(
            min_chars=self._cfg.min_new_chars,
            min_words=self._cfg.min_new_words,
            min_alnum_ratio=self._cfg.min_alnum_ratio,
        )

    # ── Public API ───────────────────────────────────────────────
    def process(
        self,
        region_labels: list[str],
        ocr_results: list[OcrResult],
        *,
        force: bool = False,
    ) -> str | None:
        """Run stabilization-based dedup on multi-region OCR results.

        Args:
            region_labels: Label/ID for each region (for diagnostics;
                currently unused by the snapshot-based algorithm).
            ocr_results: OCR result per region (same order as labels).
            force: If True, skip all dedup and return all text immediately.

        Returns:
            Text to translate + speak, or None if suppressed by dedup.
        """
        # ── Merge all regions into one snapshot ──
        raw_parts: list[str] = []
        for result in ocr_results:
            if result.error or result.is_empty:
                continue
            text = result.text.strip()
            if text:
                raw_parts.append(text)
        if not raw_parts:
            return None
        full_raw = "\n".join(raw_parts)
        full_norm = _normalize(full_raw)
        if not full_norm or len(full_norm) < 2:
            return None
        # ── Force read: bypass all dedup ──
        if force:
            # Record everything so the forced text is not repeated later.
            self._global_history.add(full_raw)
            self._mark_all_lines_known(full_raw)
            self._last_snapshot = full_norm
            self._last_raw = full_raw
            self._processed_snapshot = full_norm
            self._stable_count = 0
            logger.info("Dedup: force read — emitting %d chars", len(full_raw))
            return full_raw
        # ── Phase 1: Stabilization check ──
        if self._last_snapshot is None:
            # First tick — record snapshot, wait for next
            self._last_snapshot = full_norm
            self._last_raw = full_raw
            self._stable_count = 0
            self._processed_snapshot = None
            # Why: in instant mode, skip waiting — proceed on the very first tick
            # (the snapshot then compares against itself below, counting as stable).
            if not self._cfg.instant_mode:
                return None
        # Compare current snapshot with previous
        snapshot_sim = self._snapshot_similarity(self._last_snapshot, full_norm)
        if snapshot_sim >= self._cfg.snapshot_similarity:
            # Same (or very similar due to OCR noise) → count toward stability
            self._stable_count += 1
        elif self._is_text_growing(self._last_snapshot, full_norm):
            # Text is expanding (typewriter effect) → reset, keep waiting
            self._stable_count = 0
            self._last_snapshot = full_norm
            self._last_raw = full_raw
            self._processed_snapshot = None
            logger.debug("Dedup: text growing, waiting for stabilization")
            return None
        elif (
            self._last_emitted_norm is not None
            and self._is_text_growing(self._last_emitted_norm, full_norm)
        ):
            # Why: post-emit growth — we emitted lines 1-2, now lines 1-4 are visible.
            # The new snapshot is a SUPERSET of what we emitted → continuation.
            # Reset stability and wait for the full text to settle.
            self._stable_count = 0
            self._last_snapshot = full_norm
            self._last_raw = full_raw
            self._processed_snapshot = None
            logger.debug("Dedup: post-emit growth detected, waiting for continuation")
            return None
        else:
            # Completely different content → new text, start fresh
            self._stable_count = 0
            self._last_snapshot = full_norm
            self._last_raw = full_raw
            self._processed_snapshot = None
            logger.debug("Dedup: snapshot changed, waiting for stabilization")
            return None
        # Update raw text (keep latest version even during stability counting)
        self._last_snapshot = full_norm
        self._last_raw = full_raw
        # Not stable yet?
        required_ticks = 1 if self._cfg.instant_mode else self._cfg.stabilize_ticks
        if self._stable_count < required_ticks:
            return None
        # ── Already processed this exact snapshot? ──
        if self._processed_snapshot is not None:
            sim = self._snapshot_similarity(full_norm, self._processed_snapshot)
            if sim >= self._cfg.snapshot_similarity:
                return None  # already evaluated, nothing new
        # ── Phase 2: Text is STABLE — extract new lines ──
        all_lines = self._extract_lines(full_raw, ocr_results)
        new_lines: list[str] = []
        for line in all_lines:
            if not self._line_history.is_known(line):
                new_lines.append(line)
        # Also check against global text history (full text block dedup)
        if new_lines:
            composed = "\n".join(new_lines)
            is_dup, sim = self._global_history.is_duplicate(composed)
            if is_dup:
                logger.debug("Dedup: global history match (sim=%.3f)", sim)
                new_lines = []
        if not new_lines:
            # All lines already known — mark snapshot as processed
            self._processed_snapshot = full_norm
            return None
        # ── Phase 3: Significance check ──
        if not self._change_detector.is_significant(new_lines):
            logger.debug(
                "Dedup: new lines not significant (%d lines, %d chars)",
                len(new_lines),
                sum(len(line) for line in new_lines),
            )
            self._processed_snapshot = full_norm
            return None
        # ── EMIT! ──
        composed = "\n".join(new_lines)
        self._mark_all_lines_known(composed)
        self._global_history.add(composed)
        self._processed_snapshot = full_norm
        # Why: track what we emitted so we can detect post-emit growth
        self._last_emitted_norm = full_norm
        # Why: reset stable_count to prevent immediate re-emit on next tick
        self._stable_count = 0
        logger.info(
            "Dedup: emitting %d new lines (%d chars, %d known lines in history)",
            len(new_lines),
            len(composed),
            self._line_history.size,
        )
        return composed

    def force_flush(self) -> str | None:
        """Force-emit whatever raw text is pending (for force-read button).

        NOTE(review): unlike the ``force=True`` path in :meth:`process`,
        this does not update ``_processed_snapshot`` or ``_stable_count`` —
        confirm whether that asymmetry is intentional.
        """
        if self._last_raw:
            raw = self._last_raw
            self._global_history.add(raw)
            self._mark_all_lines_known(raw)
            return raw
        return None

    def update_config(self, config: DedupConfig) -> None:
        """Apply new configuration. Rebuilds internal components.

        Why: histories are rebuilt from scratch, so previously known
        lines/blocks are forgotten when the config changes.
        """
        self._cfg = config
        self._line_history = LineHistory(
            ttl=config.line_ttl,
            similarity=config.line_similarity,
        )
        self._global_history = GlobalTextHistory(
            max_size=config.history_size,
            ttl=config.history_ttl,
            similarity=config.history_similarity,
        )
        self._change_detector = ChangeDetector(
            min_chars=config.min_new_chars,
            min_words=config.min_new_words,
            min_alnum_ratio=config.min_alnum_ratio,
        )
        logger.info("SmartDedup: config updated")

    def reset(self) -> None:
        """Clear all state (e.g. on scene change or pipeline restart)."""
        self._last_snapshot = None
        self._last_raw = None
        self._stable_count = 0
        self._processed_snapshot = None
        self._last_emitted_norm = None
        self._line_history.reset()
        self._global_history.reset()
        logger.info("SmartDedup: all state reset")

    def reset_region(self, label: str) -> None:
        """No-op in snapshot-based approach — kept for backward compat."""
        pass

    @property
    def stats(self) -> dict[str, int]:
        """Return diagnostic stats.

        ``tracked_regions`` is always 0 — kept for compatibility with the
        old per-region tracker's stats shape.
        """
        return {
            "tracked_regions": 0,
            "total_known_lines": self._line_history.size,
            "history_size": self._global_history.size,
            "stable_count": self._stable_count,
        }

    # ── Internal ─────────────────────────────────────────────────
    @staticmethod
    def _snapshot_similarity(a: str, b: str) -> float:
        """Fast similarity between two normalized snapshots.

        Short-circuits on equality/emptiness before paying for
        ``SequenceMatcher``.
        """
        if a == b:
            return 1.0
        if not a or not b:
            return 0.0
        return SequenceMatcher(None, a, b).ratio()

    @staticmethod
    def _is_text_growing(old_norm: str, new_norm: str) -> bool:
        """Check if new text is an expansion of old text (typewriter effect).

        Returns True if new_norm is longer AND contains most of old_norm's
        words at the beginning (prefix-like growth).
        """
        if len(new_norm) <= len(old_norm):
            return False
        # Simple prefix check — covers most typewriter cases
        if new_norm.startswith(old_norm):
            return True
        # Word-level check: old words appear at the start of new word sequence
        old_words = old_norm.split()
        new_words = new_norm.split()
        if len(new_words) <= len(old_words):
            return False
        # Count matching words at the beginning
        matching = 0
        for old_w, new_w in zip(old_words, new_words):
            if old_w == new_w:
                matching += 1
            elif SequenceMatcher(None, old_w, new_w).ratio() > 0.8:
                # Why: OCR noise may corrupt already-visible words slightly
                matching += 1
        # Why: 60% threshold — allows some OCR noise in the matching portion
        return matching >= len(old_words) * 0.6

    def _extract_lines(
        self, raw_text: str, ocr_results: list[OcrResult]
    ) -> list[str]:
        """Extract individual lines from OCR results.

        Prefers structured ``OcrResult.lines`` when available.
        Deduplicates across regions (overlapping capture areas).

        Args:
            raw_text: Fallback raw text (used if no structured lines).
            ocr_results: OCR results with structured lines.

        Returns:
            List of unique raw line texts.
        """
        lines: list[str] = []
        seen_norms: set[str] = set()
        for result in ocr_results:
            if result.error or result.is_empty:
                continue
            for ocr_line in result.lines:
                raw = ocr_line.text.strip()
                if not raw:
                    continue
                norm = _normalize(raw)
                if len(norm) < 2:
                    continue
                # Why: skip duplicate lines across regions (overlapping capture areas)
                if norm in seen_norms:
                    continue
                # Fuzzy cross-region dedup for short lines
                # Why: high threshold (0.95) because overlapping regions produce
                # near-identical text, not merely similar text
                is_cross_dup = False
                if len(norm) < 60:
                    for seen in seen_norms:
                        if abs(len(norm) - len(seen)) > 3:
                            continue
                        if SequenceMatcher(None, norm, seen).ratio() >= 0.95:
                            is_cross_dup = True
                            break
                if is_cross_dup:
                    continue
                seen_norms.add(norm)
                lines.append(raw)
        # Fallback: if no structured lines, split raw text
        if not lines:
            for line in raw_text.split("\n"):
                stripped = line.strip()
                if stripped and len(_normalize(stripped)) >= 2:
                    norm = _normalize(stripped)
                    if norm not in seen_norms:
                        seen_norms.add(norm)
                        lines.append(stripped)
        return lines

    def _mark_all_lines_known(self, text: str) -> None:
        """Add all lines in text to line history (so they won't re-emit)."""
        for line in text.split("\n"):
            stripped = line.strip()
            if stripped and len(_normalize(stripped)) >= 2:
                self._line_history.mark_emitted(stripped)