|
|
"""Smart OCR deduplication — multi-layer heuristic to avoid re-reading the same text. |
|
|
|
|
|
Architecture (3 layers): |
|
|
|
|
|
Layer 1 — **Per-Region Line Tracker** |
|
|
Each capture region keeps a dict of known OCR lines (normalized text → metadata). |
|
|
New OCR results are compared line-by-line; only genuinely new lines pass through. |
|
|
Stale entries expire after ``line_ttl`` seconds. |
|
|
|
|
|
Layer 2 — **Global Text History** (ring buffer) |
|
|
After composing new lines into a text block, the block is fuzzy-matched against |
|
|
a bounded history of recently emitted texts. TTL-based expiry allows the same |
|
|
dialog to be read again after a configurable cooldown. |
|
|
|
|
|
Layer 3 — **Semantic Change Detector** |
|
|
Rejects composed text that is too short, has too few real words, or is mostly |
|
|
non-alphanumeric (OCR garbage / UI artifacts). |
|
|
|
|
|
Debounce (optional) |
|
|
When text grows incrementally (typewriter effect), the emitter waits for |
|
|
stabilization before yielding the final text. |
|
|
|
|
|
Usage:: |
|
|
|
|
|
from src.services.ocr.dedup import SmartDedup |
|
|
|
|
|
dedup = SmartDedup() |
|
|
text = dedup.process(regions, ocr_results) |
|
|
if text is not None: |
|
|
translate_and_speak(text) |
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import time |
|
|
from collections import deque |
|
|
from dataclasses import dataclass |
|
|
from difflib import SequenceMatcher |
|
|
|
|
|
from src.services.ocr.models import OcrResult |
|
|
from src.utils.logger import logger |
|
|
|
|
|
|
|
|
|
|
|
# Layer 1 — per-region line tracker defaults.
DEFAULT_LINE_TTL: float = 120.0
DEFAULT_LINE_SIMILARITY: float = 0.80

# Layer 2 — global text history (ring buffer) defaults.
DEFAULT_HISTORY_SIZE: int = 30
DEFAULT_HISTORY_TTL: float = 90.0
DEFAULT_HISTORY_SIMILARITY: float = 0.82

# Layer 3 — semantic change detector defaults.
DEFAULT_MIN_NEW_CHARS: int = 8
DEFAULT_MIN_NEW_WORDS: int = 2
DEFAULT_MIN_ALNUM_RATIO: float = 0.35

# Debounce is disabled by default (0 = pass-through).
DEFAULT_DEBOUNCE_TIME: float = 0.0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
class KnownLine:
    """A line previously seen by a RegionLineTracker."""

    # Normalized line text (whitespace-collapsed, lowercased).
    text: str
    # time.monotonic() timestamp of the first sighting.
    first_seen: float
    # time.monotonic() timestamp of the most recent sighting (drives TTL expiry).
    last_seen: float
    # How many times this line has been observed.
    hit_count: int = 1
|
|
|
|
|
|
|
|
@dataclass
class HistoryEntry:
    """An entry in the global text history ring buffer."""

    # Normalized text used for comparison.
    norm_text: str
    # Original (un-normalized) text as it was emitted.
    original_text: str
    # time.monotonic() timestamp of the first sighting.
    first_seen: float
    # time.monotonic() timestamp of the most recent match (drives TTL expiry).
    last_seen: float
    # How many times this block has been matched.
    hit_count: int = 1
|
|
|
|
|
|
|
|
@dataclass
class DedupConfig:
    """All tunable knobs for the dedup system.

    Attributes:
        line_ttl: Seconds before a known line expires (Layer 1).
        line_similarity: Fuzzy threshold for line-level dedup (0-1).
        history_size: Max entries in global ring buffer (Layer 2).
        history_ttl: Seconds before a global history entry expires.
        history_similarity: Fuzzy threshold for global dedup (0-1).
        min_new_chars: Minimum characters for a change to be significant (Layer 3).
        min_new_words: Minimum word count for significance.
        min_alnum_ratio: Minimum alphanumeric ratio for significance.
        debounce_time: Seconds to wait for text stabilization (0 = off).
    """

    # Layer 1 — per-region line tracker.
    line_ttl: float = DEFAULT_LINE_TTL
    line_similarity: float = DEFAULT_LINE_SIMILARITY
    # Layer 2 — global text history.
    history_size: int = DEFAULT_HISTORY_SIZE
    history_ttl: float = DEFAULT_HISTORY_TTL
    history_similarity: float = DEFAULT_HISTORY_SIMILARITY
    # Layer 3 — semantic change detector.
    min_new_chars: int = DEFAULT_MIN_NEW_CHARS
    min_new_words: int = DEFAULT_MIN_NEW_WORDS
    min_alnum_ratio: float = DEFAULT_MIN_ALNUM_RATIO
    # Debounce (0 disables it).
    debounce_time: float = DEFAULT_DEBOUNCE_TIME
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _normalize(text: str) -> str: |
|
|
"""Collapse whitespace, strip, lowercase — for comparison only.""" |
|
|
return " ".join(text.split()).strip().lower() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class RegionLineTracker:
    """Track known lines for a single capture region.

    Lines already seen (exact or fuzzy match) are filtered out.
    Entries expire after ``line_ttl`` seconds so the same text
    can be re-read after a cooldown.
    """

    def __init__(
        self,
        similarity: float = DEFAULT_LINE_SIMILARITY,
        line_ttl: float = DEFAULT_LINE_TTL,
    ) -> None:
        # Normalized text -> metadata for every line seen recently.
        self._known: dict[str, KnownLine] = {}
        self._similarity = similarity
        self._line_ttl = line_ttl

    def extract_new_lines(self, ocr_result: OcrResult) -> list[str]:
        """Return only lines that are NOT already known.

        Args:
            ocr_result: OCR result with ``.lines`` populated.

        Returns:
            List of *original* (non-normalized) line texts that are new.
        """
        now = time.monotonic()
        self._gc(now)

        fresh: list[str] = []
        for ocr_line in ocr_result.lines:
            original = ocr_line.text.strip()
            if not original:
                continue
            norm = _normalize(original)
            if len(norm) < 2:
                # Single characters are almost always OCR noise.
                continue

            known = self._lookup(norm)
            if known is not None:
                # Already tracked: refresh recency and move on.
                known.last_seen = now
                known.hit_count += 1
                continue

            self._known[norm] = KnownLine(
                text=norm, first_seen=now, last_seen=now
            )
            fresh.append(original)

        return fresh

    def _lookup(self, norm: str) -> KnownLine | None:
        """Find an exact or fuzzy match among known lines, or ``None``."""
        exact = self._known.get(norm)
        if exact is not None:
            return exact
        # Fuzzy matching is only attempted for short lines — for long text
        # the pairwise SequenceMatcher calls get too expensive.
        if len(norm) >= 60:
            return None
        for key, entry in self._known.items():
            # Cheap length gate before the costly ratio computation.
            if abs(len(norm) - len(key)) > max(5, len(key) * 0.2):
                continue
            if SequenceMatcher(None, norm, key).ratio() >= self._similarity:
                return entry
        return None

    def reset(self) -> None:
        """Clear all known lines (e.g. on scene change)."""
        self._known.clear()

    @property
    def known_count(self) -> int:
        """Number of tracked lines."""
        return len(self._known)

    def _gc(self, now: float) -> None:
        """Remove lines not seen for longer than TTL."""
        cutoff = now - self._line_ttl
        stale = [key for key, entry in self._known.items() if entry.last_seen < cutoff]
        for key in stale:
            del self._known[key]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GlobalTextHistory:
    """Ring buffer of recently emitted text blocks with TTL.

    Prevents the same composed text from being processed twice
    within the TTL window, even if it comes from different regions
    or after a brief interruption.
    """

    def __init__(
        self,
        max_size: int = DEFAULT_HISTORY_SIZE,
        ttl: float = DEFAULT_HISTORY_TTL,
        similarity: float = DEFAULT_HISTORY_SIMILARITY,
    ) -> None:
        # deque(maxlen=...) evicts the oldest entry automatically.
        self._entries: deque[HistoryEntry] = deque(maxlen=max_size)
        self._ttl = ttl
        self._similarity = similarity

    def is_duplicate(self, text: str) -> tuple[bool, float]:
        """Check whether *text* duplicates something in recent history.

        Args:
            text: Composed text block (already new-line joined).

        Returns:
            ``(is_dup, best_similarity)`` — whether it matched and how closely.
        """
        now = time.monotonic()
        norm = _normalize(text)
        if not norm:
            # Empty text is trivially a "duplicate" — nothing worth emitting.
            return (True, 1.0)

        best = 0.0
        for entry in self._entries:
            if now - entry.last_seen > self._ttl:
                # Expired — ignored here; deque eviction removes it later.
                continue

            if entry.norm_text == norm:
                self._touch(entry, now)
                return (True, 1.0)

            score = SequenceMatcher(None, norm, entry.norm_text).ratio()
            if score > best:
                best = score
            if score >= self._similarity:
                self._touch(entry, now)
                return (True, score)

        return (False, best)

    @staticmethod
    def _touch(entry: HistoryEntry, now: float) -> None:
        """Refresh an entry's recency and hit counter."""
        entry.last_seen = now
        entry.hit_count += 1

    def add(self, text: str) -> None:
        """Record a new text block in history."""
        now = time.monotonic()
        entry = HistoryEntry(
            norm_text=_normalize(text),
            original_text=text,
            first_seen=now,
            last_seen=now,
        )
        self._entries.append(entry)

    def reset(self) -> None:
        """Clear all history entries."""
        self._entries.clear()

    @property
    def size(self) -> int:
        """Number of entries currently held in the ring buffer."""
        return len(self._entries)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ChangeDetector:
    """Decide whether a set of new lines constitutes a meaningful change.

    Rejects:
    - Very short text (< ``min_chars`` printable characters)
    - Too few words (< ``min_words``)
    - Mostly non-alphanumeric (ratio < ``min_alnum_ratio``)
    """

    def __init__(
        self,
        min_chars: int = DEFAULT_MIN_NEW_CHARS,
        min_words: int = DEFAULT_MIN_NEW_WORDS,
        min_alnum_ratio: float = DEFAULT_MIN_ALNUM_RATIO,
    ) -> None:
        self._min_chars = min_chars
        self._min_words = min_words
        self._min_alnum_ratio = min_alnum_ratio

    def is_significant(self, new_lines: list[str]) -> bool:
        """Return ``True`` if the new lines represent a real content change."""
        text = " ".join(part.strip() for part in new_lines).strip()

        # Guard clauses: each rejection criterion fails fast.
        if len(text) < self._min_chars:
            return False
        if len(text.split()) < self._min_words:
            return False

        # Mostly symbols / punctuation usually means OCR garbage.
        ratio = (sum(ch.isalnum() for ch in text) / len(text)) if text else 0
        return ratio >= self._min_alnum_ratio
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DebouncedEmitter:
    """Buffer text and only yield it after stabilization.

    Useful for typewriter-effect dialogs where text appears incrementally.
    If ``stabilize_time`` is 0, debouncing is disabled (pass-through).
    """

    def __init__(self, stabilize_time: float = DEFAULT_DEBOUNCE_TIME) -> None:
        self._stabilize = stabilize_time
        self._pending: str | None = None
        self._pending_since: float = 0.0

    def feed(self, text: str) -> str | None:
        """Feed new text. Returns the text once it has been stable long enough.

        Args:
            text: The candidate text to emit.

        Returns:
            The stabilized text, or ``None`` if still waiting.
        """
        # Pass-through mode when debouncing is disabled.
        if self._stabilize <= 0:
            return text

        now = time.monotonic()

        changed = (
            self._pending is None
            or _normalize(text) != _normalize(self._pending)
        )
        if changed:
            # New or modified text: restart the stabilization clock.
            self._pending = text
            self._pending_since = now
            return None

        if now - self._pending_since < self._stabilize:
            # Unchanged, but not yet stable long enough.
            return None

        emitted, self._pending = self._pending, None
        return emitted

    def flush(self) -> str | None:
        """Force-emit whatever is pending (used on pipeline stop / force-read)."""
        emitted, self._pending = self._pending, None
        return emitted

    def reset(self) -> None:
        """Discard pending text."""
        self._pending = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CrossRegionPool:
    """Tracks lines across regions within a single tick to prevent cross-region duplication.

    Within a single pipeline tick, if region A already yielded line X,
    region B should skip it.
    """

    def __init__(self, similarity: float = DEFAULT_LINE_SIMILARITY) -> None:
        # Normalized text -> original line, for this tick only.
        self._seen: dict[str, str] = {}
        self._similarity = similarity

    def is_seen(self, line: str) -> bool:
        """Check if this line was already yielded by another region this tick."""
        norm = _normalize(line)
        if not norm:
            # Empty lines are never worth yielding at all.
            return True

        if norm in self._seen:
            return True

        # Fuzzy comparison only for reasonably short lines (cost control).
        if len(norm) >= 60:
            return False
        for key in self._seen:
            # Cheap length gate before the costly ratio computation.
            if abs(len(norm) - len(key)) > max(4, len(key) * 0.2):
                continue
            if SequenceMatcher(None, norm, key).ratio() >= self._similarity:
                return True

        return False

    def mark(self, line: str) -> None:
        """Record a line as yielded this tick."""
        norm = _normalize(line)
        if norm:
            self._seen[norm] = line

    def clear(self) -> None:
        """Reset for next tick."""
        self._seen.clear()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SmartDedup:
    """Three-layer OCR deduplication with debounce and cross-region awareness.

    Replaces the old single-``_last_ocr_text`` comparison in ``bridge.py``.

    Example::

        dedup = SmartDedup()

        # On each pipeline tick:
        text = dedup.process(region_labels, ocr_results)
        if text is not None:
            await translate_and_speak(text)

        # On pipeline stop or config change:
        dedup.reset()
    """

    def __init__(self, config: DedupConfig | None = None) -> None:
        self._cfg = config or DedupConfig()
        # Layer 1: one tracker per region label, created lazily.
        self._region_trackers: dict[str, RegionLineTracker] = {}
        # Layer 2: ring buffer of recently emitted text blocks.
        self._global_history = GlobalTextHistory(
            max_size=self._cfg.history_size,
            ttl=self._cfg.history_ttl,
            similarity=self._cfg.history_similarity,
        )
        # Layer 3: rejects short / low-content text.
        self._change_detector = ChangeDetector(
            min_chars=self._cfg.min_new_chars,
            min_words=self._cfg.min_new_words,
            min_alnum_ratio=self._cfg.min_alnum_ratio,
        )
        self._debouncer = DebouncedEmitter(stabilize_time=self._cfg.debounce_time)
        self._cross_pool = CrossRegionPool(similarity=self._cfg.line_similarity)

    def process(
        self,
        region_labels: list[str],
        ocr_results: list[OcrResult],
        *,
        force: bool = False,
    ) -> str | None:
        """Run all dedup layers on multi-region OCR results.

        Args:
            region_labels: Label/ID for each region (used as tracker key).
            ocr_results: OCR result per region (same order as labels).
            force: If ``True``, skip all dedup and return all text.

        Returns:
            Text to translate + speak, or ``None`` if dedup suppressed it.
        """
        if force:
            texts = [r.text.strip() for r in ocr_results if r.text.strip()]
            combined = "\n".join(texts) if texts else None
            if combined:
                self._global_history.add(combined)

            # Mark all current lines as known so the next normal tick does
            # not immediately re-emit what was just force-read.
            for label, result in zip(region_labels, ocr_results):
                tracker = self._get_tracker(label)
                tracker.extract_new_lines(result)
            # Discard any pending debounced text — it is superseded by the
            # force-read output.  (Was previously bound to an unused variable.)
            self._debouncer.flush()
            return combined

        # Layer 1 + cross-region pool: collect genuinely new lines.
        self._cross_pool.clear()
        all_new_lines: list[str] = []

        for label, result in zip(region_labels, ocr_results):
            if result.error or result.is_empty:
                continue
            tracker = self._get_tracker(label)
            region_new = tracker.extract_new_lines(result)

            for line in region_new:
                if not self._cross_pool.is_seen(line):
                    self._cross_pool.mark(line)
                    all_new_lines.append(line)

        if not all_new_lines:
            return None

        # Layer 3: reject insignificant / garbage changes early.
        if not self._change_detector.is_significant(all_new_lines):
            logger.debug(
                "Dedup: new lines not significant (%d lines, %d chars)",
                len(all_new_lines),
                sum(len(line) for line in all_new_lines),
            )
            return None

        composed = "\n".join(all_new_lines)

        # Layer 2: fuzzy-match against recently emitted text blocks.
        is_dup, sim = self._global_history.is_duplicate(composed)
        if is_dup:
            logger.debug("Dedup: global history match (sim=%.3f)", sim)
            return None

        # Optional debounce for typewriter-effect dialogs.
        stabilized = self._debouncer.feed(composed)
        if stabilized is None:
            logger.debug("Dedup: waiting for text stabilization")
            return None

        self._global_history.add(stabilized)
        return stabilized

    def force_flush(self) -> str | None:
        """Force-emit any debounced pending text."""
        pending = self._debouncer.flush()
        if pending:
            self._global_history.add(pending)
        return pending

    def update_config(self, config: DedupConfig) -> None:
        """Apply new configuration. Recreates internal components."""
        self._cfg = config

        self._global_history = GlobalTextHistory(
            max_size=config.history_size,
            ttl=config.history_ttl,
            similarity=config.history_similarity,
        )
        self._change_detector = ChangeDetector(
            min_chars=config.min_new_chars,
            min_words=config.min_new_words,
            min_alnum_ratio=config.min_alnum_ratio,
        )
        self._debouncer = DebouncedEmitter(stabilize_time=config.debounce_time)
        self._cross_pool = CrossRegionPool(similarity=config.line_similarity)

        # Existing trackers are kept (their known lines stay valid); only
        # their knobs are retuned.  NOTE(review): this reaches into private
        # fields — consider an explicit RegionLineTracker.configure() method.
        for tracker in self._region_trackers.values():
            tracker._similarity = config.line_similarity
            tracker._line_ttl = config.line_ttl

    def reset(self) -> None:
        """Clear all state (e.g. on scene change or pipeline restart)."""
        for tracker in self._region_trackers.values():
            tracker.reset()
        self._global_history.reset()
        self._debouncer.reset()
        self._cross_pool.clear()
        logger.info("SmartDedup: all layers reset")

    def reset_region(self, label: str) -> None:
        """Reset a specific region tracker."""
        if label in self._region_trackers:
            self._region_trackers[label].reset()

    @property
    def stats(self) -> dict[str, int]:
        """Return diagnostic stats."""
        return {
            "tracked_regions": len(self._region_trackers),
            "total_known_lines": sum(
                t.known_count for t in self._region_trackers.values()
            ),
            "history_size": self._global_history.size,
        }

    def _get_tracker(self, label: str) -> RegionLineTracker:
        """Get or create a line tracker for the given region label."""
        if label not in self._region_trackers:
            self._region_trackers[label] = RegionLineTracker(
                similarity=self._cfg.line_similarity,
                line_ttl=self._cfg.line_ttl,
            )
        return self._region_trackers[label]
|
|
|