|
|
"""Smart OCR deduplication — stabilization-first approach. |
|
|
|
|
|
Core principle: **don't read text until it STOPS CHANGING**. |
|
|
Then check against read history to avoid repeats. |
|
|
|
|
|
Architecture: |
|
|
|
|
|
Phase 1 — **Snapshot Stabilization** |
|
|
Each tick compares the full OCR output (all regions merged) with the |
|
|
previous tick. If text is growing (typewriter effect), we wait. |
|
|
Only when the snapshot is identical for ``stabilize_ticks`` consecutive |
|
|
ticks do we consider it "stable" and proceed. |
|
|
|
|
|
Phase 2 — **Line History Dedup** |
|
|
Once stable, each line is fuzzy-compared against a history of previously |
|
|
emitted lines. Only genuinely new lines pass through. History entries |
|
|
expire via TTL so the same text can be re-read after a cooldown. |
|
|
|
|
|
Phase 3 — **Significance Check** |
|
|
Rejects composed output that is too short, has too few real words, |
|
|
or is mostly non-alphanumeric (OCR garbage / UI artifacts). |
|
|
|
|
|
This naturally handles: |
|
|
- **Typewriter effects**: text grows → wait → stabilize → read complete sentence |
|
|
- **Static UI** (HP bars, names): stabilizes → read once → in history → skip |
|
|
- **OCR noise**: fuzzy matching tolerates minor variations |
|
|
- **Dialog changes**: snapshot changes → re-stabilize → emit new parts only |
|
|
- **Repeated dialog**: TTL expiry allows re-reading after cooldown |
|
|
|
|
|
Usage:: |
|
|
|
|
|
from src.services.ocr.dedup import SmartDedup |
|
|
|
|
|
dedup = SmartDedup() |
|
|
text = dedup.process(region_labels, ocr_results) |
|
|
if text is not None: |
|
|
translate_and_speak(text) |
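
    # Force-read button: bypass stabilization and history checks
    text = dedup.process(region_labels, ocr_results, force=True)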
|
|
""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import time |
|
|
from collections import deque |
|
|
from dataclasses import dataclass |
|
|
from difflib import SequenceMatcher |
|
|
|
|
|
from src.services.ocr.models import OcrResult |
|
|
from src.utils.logger import logger |
|
|
|
|
|
|
|
|
|
|
|
DEFAULT_STABILIZE_TICKS: int = 3 |
|
|
DEFAULT_SNAPSHOT_SIMILARITY: float = 0.92 |
|
|
DEFAULT_LINE_SIMILARITY: float = 0.80 |
|
|
DEFAULT_LINE_TTL: float = 120.0 |
|
|
DEFAULT_HISTORY_TTL: float = 90.0 |
|
|
DEFAULT_HISTORY_SIZE: int = 30 |
|
|
DEFAULT_MIN_NEW_CHARS: int = 8 |
|
|
DEFAULT_MIN_NEW_WORDS: int = 2 |
|
|
DEFAULT_MIN_ALNUM_RATIO: float = 0.35 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass |
|
|
class HistoryEntry: |
|
|
"""An entry in the global text history ring buffer.""" |
|
|
|
|
|
norm_text: str |
|
|
original_text: str |
|
|
first_seen: float |
|
|
last_seen: float |
|
|
hit_count: int = 1 |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class DedupConfig: |
|
|
"""All tunable knobs for the dedup system. |
|
|
|
|
|
Attributes: |
|
|
stabilize_ticks: Consecutive identical ticks before text is considered "stable". |
|
|
snapshot_similarity: Fuzzy threshold for treating two snapshots as identical (0-1). |
|
|
line_similarity: Fuzzy threshold for line-level history matching (0-1). |
|
|
line_ttl: Seconds before a known line in history expires. |
|
|
history_ttl: Seconds before a global history entry expires. |
|
|
history_size: Max entries in the global history ring buffer. |
|
|
history_similarity: Alias for line_similarity (backward compat with bridge.py). |
|
|
min_new_chars: Minimum characters for a change to be significant. |
|
|
min_new_words: Minimum word count for significance. |
|
|
min_alnum_ratio: Minimum alphanumeric ratio for significance. |
|
|
        debounce_time: Legacy field; not used internally, kept for bridge compat.
        instant_mode: If True, skip the stabilization wait and emit as soon as text appears.
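
    Example (a minimal construction sketch)::

        cfg = DedupConfig(stabilize_ticks=2, line_ttl=60.0)
        dedup = SmartDedup(cfg)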
|
|
""" |
|
|
|
|
|
stabilize_ticks: int = DEFAULT_STABILIZE_TICKS |
|
|
snapshot_similarity: float = DEFAULT_SNAPSHOT_SIMILARITY |
|
|
line_similarity: float = DEFAULT_LINE_SIMILARITY |
|
|
line_ttl: float = DEFAULT_LINE_TTL |
|
|
history_ttl: float = DEFAULT_HISTORY_TTL |
|
|
history_size: int = DEFAULT_HISTORY_SIZE |
|
|
history_similarity: float = DEFAULT_LINE_SIMILARITY |
|
|
min_new_chars: int = DEFAULT_MIN_NEW_CHARS |
|
|
min_new_words: int = DEFAULT_MIN_NEW_WORDS |
|
|
min_alnum_ratio: float = DEFAULT_MIN_ALNUM_RATIO |
|
|
debounce_time: float = 0.0 |
|
|
instant_mode: bool = False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _normalize(text: str) -> str: |
|
|
"""Collapse whitespace, strip, lowercase — for comparison only.""" |
|
|
return " ".join(text.split()).strip().lower() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class LineHistory: |
|
|
"""Tracks previously emitted lines with TTL-based expiry. |
|
|
|
|
|
Each emitted line is stored (normalized) with a timestamp. |
|
|
Old entries expire after ``ttl`` seconds, allowing re-reading. |
|
|
Fuzzy matching handles OCR noise on short lines. |
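
    A minimal usage sketch::

        hist = LineHistory(ttl=60.0, similarity=0.8)
        hist.mark_emitted("Hello, traveler!")
        assert hist.is_known("Hello, traveler!")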
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
ttl: float = DEFAULT_LINE_TTL, |
|
|
similarity: float = DEFAULT_LINE_SIMILARITY, |
|
|
) -> None: |
|
|
self._entries: dict[str, float] = {} |
|
|
self._ttl = ttl |
|
|
self._similarity = similarity |
|
|
|
|
|
def is_known(self, line: str) -> bool: |
|
|
"""Check if a line was emitted recently (within TTL). |
|
|
|
|
|
Uses exact match first, then fuzzy for short lines. |
|
|
|
|
|
Args: |
|
|
line: Raw (non-normalized) line text. |
|
|
|
|
|
Returns: |
|
|
True if line is in recent history (should be skipped). |
|
|
""" |
|
|
norm = _normalize(line) |
|
|
if len(norm) < 2: |
|
|
return True |
|
|
|
|
|
now = time.monotonic() |
|
|
self._gc(now) |
|
|
|
|
|
|
|
|
        # Exact match first.
        if norm in self._entries:
|
|
return True |
|
|
|
|
|
|
|
|
        # Fuzzy fallback for short lines, where OCR noise hits hardest.
        if len(norm) < 60:
|
|
for key in self._entries: |
|
|
if abs(len(norm) - len(key)) > max(5, len(key) * 0.25): |
|
|
continue |
|
|
ratio = SequenceMatcher(None, norm, key).ratio() |
|
|
if ratio >= self._similarity: |
|
|
return True |
|
|
|
|
|
return False |
|
|
|
|
|
def mark_emitted(self, line: str) -> None: |
|
|
"""Record a line as emitted.""" |
|
|
norm = _normalize(line) |
|
|
if norm: |
|
|
self._entries[norm] = time.monotonic() |
|
|
|
|
|
def reset(self) -> None: |
|
|
"""Clear all history.""" |
|
|
self._entries.clear() |
|
|
|
|
|
@property |
|
|
def size(self) -> int: |
|
|
return len(self._entries) |
|
|
|
|
|
def _gc(self, now: float) -> None: |
|
|
"""Remove entries older than TTL.""" |
|
|
expired = [k for k, ts in self._entries.items() if now - ts > self._ttl] |
|
|
for k in expired: |
|
|
del self._entries[k] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GlobalTextHistory: |
|
|
"""Ring buffer of recently emitted text blocks with TTL. |
|
|
|
|
|
Prevents the same composed text from being re-emitted within |
|
|
the TTL window. Uses fuzzy matching to handle OCR noise. |
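
    A minimal usage sketch::

        hist = GlobalTextHistory(max_size=10, ttl=60.0)
        hist.add("You found a rare item!")
        is_dup, sim = hist.is_duplicate("You found a rare item!")
        assert is_dup and sim == 1.0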
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
max_size: int = DEFAULT_HISTORY_SIZE, |
|
|
ttl: float = DEFAULT_HISTORY_TTL, |
|
|
similarity: float = DEFAULT_LINE_SIMILARITY, |
|
|
) -> None: |
|
|
self._entries: deque[HistoryEntry] = deque(maxlen=max_size) |
|
|
self._ttl = ttl |
|
|
self._similarity = similarity |
|
|
|
|
|
def is_duplicate(self, text: str) -> tuple[bool, float]: |
|
|
"""Check whether text duplicates something in recent history. |
|
|
|
|
|
Args: |
|
|
text: Composed text block. |
|
|
|
|
|
Returns: |
|
|
``(is_dup, best_similarity)`` |
|
|
""" |
|
|
now = time.monotonic() |
|
|
norm = _normalize(text) |
|
|
if not norm: |
|
|
return (True, 1.0) |
|
|
|
|
|
best_sim = 0.0 |
|
|
for entry in self._entries: |
|
|
if now - entry.last_seen > self._ttl: |
|
|
continue |
|
|
|
|
|
if entry.norm_text == norm: |
|
|
entry.last_seen = now |
|
|
entry.hit_count += 1 |
|
|
return (True, 1.0) |
|
|
|
|
|
ratio = SequenceMatcher(None, norm, entry.norm_text).ratio() |
|
|
best_sim = max(best_sim, ratio) |
|
|
if ratio >= self._similarity: |
|
|
entry.last_seen = now |
|
|
entry.hit_count += 1 |
|
|
return (True, ratio) |
|
|
|
|
|
return (False, best_sim) |
|
|
|
|
|
def add(self, text: str) -> None: |
|
|
"""Record a new text block in history.""" |
|
|
norm = _normalize(text) |
|
|
now = time.monotonic() |
|
|
self._entries.append( |
|
|
HistoryEntry( |
|
|
norm_text=norm, |
|
|
original_text=text, |
|
|
first_seen=now, |
|
|
last_seen=now, |
|
|
) |
|
|
) |
|
|
|
|
|
def reset(self) -> None: |
|
|
self._entries.clear() |
|
|
|
|
|
@property |
|
|
def size(self) -> int: |
|
|
return len(self._entries) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ChangeDetector: |
|
|
"""Decide whether new lines constitute a meaningful change. |
|
|
|
|
|
Rejects very short text, too few words, or mostly non-alphanumeric content. |
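
    A minimal sketch of the thresholds in action::

        detector = ChangeDetector(min_chars=8, min_words=2)
        assert detector.is_significant(["Hello there, traveler!"])
        assert not detector.is_significant(["??"])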
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
min_chars: int = DEFAULT_MIN_NEW_CHARS, |
|
|
min_words: int = DEFAULT_MIN_NEW_WORDS, |
|
|
min_alnum_ratio: float = DEFAULT_MIN_ALNUM_RATIO, |
|
|
) -> None: |
|
|
self._min_chars = min_chars |
|
|
self._min_words = min_words |
|
|
self._min_alnum_ratio = min_alnum_ratio |
|
|
|
|
|
def is_significant(self, new_lines: list[str]) -> bool: |
|
|
"""Return True if the new lines represent real content, not OCR garbage.""" |
|
|
text = " ".join(line.strip() for line in new_lines).strip() |
|
|
|
|
|
if len(text) < self._min_chars: |
|
|
return False |
|
|
|
|
|
words = text.split() |
|
|
if len(words) < self._min_words: |
|
|
return False |
|
|
|
|
|
alnum = sum(1 for c in text if c.isalnum()) |
|
|
        ratio = alnum / len(text) if text else 0.0
|
|
if ratio < self._min_alnum_ratio: |
|
|
return False |
|
|
|
|
|
return True |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SmartDedup: |
|
|
"""Stabilization-first OCR deduplication. |
|
|
|
|
|
Core algorithm: |
|
|
|
|
|
1. Each tick: merge all OCR results into a single text snapshot |
|
|
2. Compare snapshot with previous tick — growing? same? different? |
|
|
3. When snapshot is identical for ``stabilize_ticks`` consecutive ticks → STABLE |
|
|
4. Extract lines, filter against read history → emit only NEW lines |
|
|
5. Significance check → reject OCR garbage |
|
|
6. Add emitted lines to history, record in global ring buffer |
|
|
|
|
|
This replaces the old per-line-tracker approach which caused: |
|
|
- Sentence fragments (read partial text too early) |
|
|
- Infinite silence (partial lines marked "known" too aggressively) |
|
|
|
|
|
Example:: |
|
|
|
|
|
dedup = SmartDedup() |
|
|
|
|
|
# On each pipeline tick: |
|
|
text = dedup.process(region_labels, ocr_results) |
|
|
if text is not None: |
|
|
await translate_and_speak(text) |
|
|
|
|
|
# On pipeline stop or config change: |
|
|
dedup.reset() |
|
|
""" |
|
|
|
|
|
def __init__(self, config: DedupConfig | None = None) -> None: |
|
|
self._cfg = config or DedupConfig() |
|
|
|
|
|
|
|
|
self._last_snapshot: str | None = None |
|
|
self._last_raw: str | None = None |
|
|
self._stable_count: int = 0 |
|
|
self._processed_snapshot: str | None = None |
|
|
|
|
|
|
|
|
|
|
|
self._last_emitted_norm: str | None = None |
|
|
|
|
|
|
|
|
self._line_history = LineHistory( |
|
|
ttl=self._cfg.line_ttl, |
|
|
similarity=self._cfg.line_similarity, |
|
|
) |
|
|
self._global_history = GlobalTextHistory( |
|
|
max_size=self._cfg.history_size, |
|
|
ttl=self._cfg.history_ttl, |
|
|
similarity=self._cfg.history_similarity, |
|
|
) |
|
|
self._change_detector = ChangeDetector( |
|
|
min_chars=self._cfg.min_new_chars, |
|
|
min_words=self._cfg.min_new_words, |
|
|
min_alnum_ratio=self._cfg.min_alnum_ratio, |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
def process( |
|
|
self, |
|
|
region_labels: list[str], |
|
|
ocr_results: list[OcrResult], |
|
|
*, |
|
|
force: bool = False, |
|
|
) -> str | None: |
|
|
"""Run stabilization-based dedup on multi-region OCR results. |
|
|
|
|
|
Args: |
|
|
region_labels: Label/ID for each region (for diagnostics). |
|
|
ocr_results: OCR result per region (same order as labels). |
|
|
force: If True, skip all dedup and return all text immediately. |
|
|
|
|
|
Returns: |
|
|
Text to translate + speak, or None if suppressed by dedup. |
|
|
""" |
|
|
|
|
|
        # Merge all non-empty region results into one raw snapshot.
        raw_parts: list[str] = []
|
|
for result in ocr_results: |
|
|
if result.error or result.is_empty: |
|
|
continue |
|
|
text = result.text.strip() |
|
|
if text: |
|
|
raw_parts.append(text) |
|
|
|
|
|
if not raw_parts: |
|
|
return None |
|
|
|
|
|
full_raw = "\n".join(raw_parts) |
|
|
full_norm = _normalize(full_raw) |
|
|
|
|
|
if not full_norm or len(full_norm) < 2: |
|
|
return None |
|
|
|
|
|
|
|
|
        # Force mode: bypass stabilization and history checks entirely.
        if force:
|
|
self._global_history.add(full_raw) |
|
|
self._mark_all_lines_known(full_raw) |
|
|
self._last_snapshot = full_norm |
|
|
self._last_raw = full_raw |
|
|
self._processed_snapshot = full_norm |
|
|
self._stable_count = 0 |
|
|
logger.info("Dedup: force read — emitting %d chars", len(full_raw)) |
|
|
return full_raw |
|
|
|
|
|
|
|
|
        # First tick ever: seed the snapshot and wait (unless instant mode).
        if self._last_snapshot is None:
|
|
|
|
|
self._last_snapshot = full_norm |
|
|
self._last_raw = full_raw |
|
|
self._stable_count = 0 |
|
|
self._processed_snapshot = None |
|
|
|
|
|
if not self._cfg.instant_mode: |
|
|
return None |
|
|
|
|
|
|
|
|
        # Compare this tick's snapshot with the previous one.
        snapshot_sim = self._snapshot_similarity(self._last_snapshot, full_norm)
|
|
|
|
|
        # Unchanged since last tick: count another stable tick.
        if snapshot_sim >= self._cfg.snapshot_similarity:
|
|
|
|
|
self._stable_count += 1 |
|
|
        # Typewriter effect: text is still being typed, keep waiting.
        elif self._is_text_growing(self._last_snapshot, full_norm):
|
|
|
|
|
self._stable_count = 0 |
|
|
self._last_snapshot = full_norm |
|
|
self._last_raw = full_raw |
|
|
self._processed_snapshot = None |
|
|
logger.debug("Dedup: text growing, waiting for stabilization") |
|
|
return None |
|
|
        # Text kept growing past what we already emitted: wait for the rest.
        elif (
|
|
self._last_emitted_norm is not None |
|
|
and self._is_text_growing(self._last_emitted_norm, full_norm) |
|
|
): |
|
|
|
|
|
|
|
|
|
|
|
self._stable_count = 0 |
|
|
self._last_snapshot = full_norm |
|
|
self._last_raw = full_raw |
|
|
self._processed_snapshot = None |
|
|
logger.debug("Dedup: post-emit growth detected, waiting for continuation") |
|
|
return None |
|
|
        # Snapshot changed outright: restart stabilization from scratch.
        else:
|
|
|
|
|
self._stable_count = 0 |
|
|
self._last_snapshot = full_norm |
|
|
self._last_raw = full_raw |
|
|
self._processed_snapshot = None |
|
|
logger.debug("Dedup: snapshot changed, waiting for stabilization") |
|
|
return None |
|
|
|
|
|
|
|
|
        # Snapshot is stable; refresh stored copies for the next tick.
        self._last_snapshot = full_norm
|
|
self._last_raw = full_raw |
|
|
|
|
|
|
|
|
        # Require enough consecutive stable ticks before reading.
        required_ticks = 1 if self._cfg.instant_mode else self._cfg.stabilize_ticks
|
|
if self._stable_count < required_ticks: |
|
|
return None |
|
|
|
|
|
|
|
|
        # Skip if this stable snapshot was already processed.
        if self._processed_snapshot is not None:
|
|
sim = self._snapshot_similarity(full_norm, self._processed_snapshot) |
|
|
if sim >= self._cfg.snapshot_similarity: |
|
|
return None |
|
|
|
|
|
|
|
|
        # Phase 2: keep only lines absent from recent line history.
        all_lines = self._extract_lines(full_raw, ocr_results)
|
|
new_lines: list[str] = [] |
|
|
|
|
|
for line in all_lines: |
|
|
if not self._line_history.is_known(line): |
|
|
new_lines.append(line) |
|
|
|
|
|
|
|
|
        # Drop composed text that duplicates recent global history.
        if new_lines:
|
|
composed = "\n".join(new_lines) |
|
|
is_dup, sim = self._global_history.is_duplicate(composed) |
|
|
if is_dup: |
|
|
logger.debug("Dedup: global history match (sim=%.3f)", sim) |
|
|
new_lines = [] |
|
|
|
|
|
        # Nothing new to read; remember this snapshot as processed.
        if not new_lines:
|
|
|
|
|
self._processed_snapshot = full_norm |
|
|
return None |
|
|
|
|
|
|
|
|
        # Phase 3: reject output that is too short or mostly OCR garbage.
        if not self._change_detector.is_significant(new_lines):
|
|
logger.debug( |
|
|
"Dedup: new lines not significant (%d lines, %d chars)", |
|
|
len(new_lines), |
|
|
sum(len(line) for line in new_lines), |
|
|
) |
|
|
self._processed_snapshot = full_norm |
|
|
return None |
|
|
|
|
|
|
|
|
composed = "\n".join(new_lines) |
|
|
self._mark_all_lines_known(composed) |
|
|
self._global_history.add(composed) |
|
|
self._processed_snapshot = full_norm |
|
|
|
|
|
        # Remember what was emitted so post-emit growth can be detected.
        self._last_emitted_norm = full_norm
|
|
|
|
|
self._stable_count = 0 |
|
|
|
|
|
logger.info( |
|
|
"Dedup: emitting %d new lines (%d chars, %d known lines in history)", |
|
|
len(new_lines), |
|
|
len(composed), |
|
|
self._line_history.size, |
|
|
) |
|
|
return composed |
|
|
|
|
|
def force_flush(self) -> str | None: |
|
|
"""Force-emit whatever raw text is pending (for force-read button).""" |
|
|
if self._last_raw: |
|
|
raw = self._last_raw |
|
|
self._global_history.add(raw) |
|
|
self._mark_all_lines_known(raw) |
|
|
return raw |
|
|
return None |
|
|
|
|
|
def update_config(self, config: DedupConfig) -> None: |
|
|
"""Apply new configuration. Rebuilds internal components.""" |
|
|
self._cfg = config |
|
|
self._line_history = LineHistory( |
|
|
ttl=config.line_ttl, |
|
|
similarity=config.line_similarity, |
|
|
) |
|
|
self._global_history = GlobalTextHistory( |
|
|
max_size=config.history_size, |
|
|
ttl=config.history_ttl, |
|
|
similarity=config.history_similarity, |
|
|
) |
|
|
self._change_detector = ChangeDetector( |
|
|
min_chars=config.min_new_chars, |
|
|
min_words=config.min_new_words, |
|
|
min_alnum_ratio=config.min_alnum_ratio, |
|
|
) |
|
|
logger.info("SmartDedup: config updated") |
|
|
|
|
|
def reset(self) -> None: |
|
|
"""Clear all state (e.g. on scene change or pipeline restart).""" |
|
|
self._last_snapshot = None |
|
|
self._last_raw = None |
|
|
self._stable_count = 0 |
|
|
self._processed_snapshot = None |
|
|
self._last_emitted_norm = None |
|
|
self._line_history.reset() |
|
|
self._global_history.reset() |
|
|
logger.info("SmartDedup: all state reset") |
|
|
|
|
|
def reset_region(self, label: str) -> None: |
|
|
"""No-op in snapshot-based approach — kept for backward compat.""" |
|
|
pass |
|
|
|
|
|
@property |
|
|
def stats(self) -> dict[str, int]: |
|
|
"""Return diagnostic stats.""" |
|
|
return { |
|
|
"tracked_regions": 0, |
|
|
"total_known_lines": self._line_history.size, |
|
|
"history_size": self._global_history.size, |
|
|
"stable_count": self._stable_count, |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@staticmethod |
|
|
def _snapshot_similarity(a: str, b: str) -> float: |
|
|
"""Fast similarity between two normalized snapshots.""" |
|
|
if a == b: |
|
|
return 1.0 |
|
|
if not a or not b: |
|
|
return 0.0 |
|
|
return SequenceMatcher(None, a, b).ratio() |
|
|
|
|
|
@staticmethod |
|
|
def _is_text_growing(old_norm: str, new_norm: str) -> bool: |
|
|
"""Check if new text is an expansion of old text (typewriter effect). |
|
|
|
|
|
Returns True if new_norm is longer AND contains most of old_norm's |
|
|
words at the beginning (prefix-like growth). |
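
        A quick illustration::

            >>> SmartDedup._is_text_growing("hello brave", "hello brave new world")
            True
            >>> SmartDedup._is_text_growing("hello world", "goodbye moon")
            False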
|
|
""" |
|
|
if len(new_norm) <= len(old_norm): |
|
|
return False |
|
|
|
|
|
|
|
|
        # Pure prefix growth: old text is literally a prefix of the new.
        if new_norm.startswith(old_norm):
|
|
return True |
|
|
|
|
|
|
|
|
old_words = old_norm.split() |
|
|
new_words = new_norm.split() |
|
|
|
|
|
if len(new_words) <= len(old_words): |
|
|
return False |
|
|
|
|
|
|
|
|
        # Count matching leading words, allowing minor OCR variation per word.
        matching = 0
|
|
for old_w, new_w in zip(old_words, new_words): |
|
|
if old_w == new_w: |
|
|
matching += 1 |
|
|
elif SequenceMatcher(None, old_w, new_w).ratio() > 0.8: |
|
|
|
|
|
matching += 1 |
|
|
|
|
|
|
|
|
return matching >= len(old_words) * 0.6 |
|
|
|
|
|
def _extract_lines( |
|
|
self, raw_text: str, ocr_results: list[OcrResult] |
|
|
) -> list[str]: |
|
|
"""Extract individual lines from OCR results. |
|
|
|
|
|
Prefers structured ``OcrResult.lines`` when available. |
|
|
Deduplicates across regions (overlapping capture areas). |
|
|
|
|
|
Args: |
|
|
raw_text: Fallback raw text (used if no structured lines). |
|
|
ocr_results: OCR results with structured lines. |
|
|
|
|
|
Returns: |
|
|
List of unique raw line texts. |
|
|
""" |
|
|
lines: list[str] = [] |
|
|
seen_norms: set[str] = set() |
|
|
|
|
|
for result in ocr_results: |
|
|
if result.error or result.is_empty: |
|
|
continue |
|
|
for ocr_line in result.lines: |
|
|
raw = ocr_line.text.strip() |
|
|
if not raw: |
|
|
continue |
|
|
norm = _normalize(raw) |
|
|
if len(norm) < 2: |
|
|
continue |
|
|
|
|
|
|
|
|
                # Exact duplicate across regions.
                if norm in seen_norms:
|
|
continue |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
                # Near-duplicate check across overlapping regions (short lines only).
                is_cross_dup = False
|
|
if len(norm) < 60: |
|
|
for seen in seen_norms: |
|
|
if abs(len(norm) - len(seen)) > 3: |
|
|
continue |
|
|
if SequenceMatcher(None, norm, seen).ratio() >= 0.95: |
|
|
is_cross_dup = True |
|
|
break |
|
|
if is_cross_dup: |
|
|
continue |
|
|
|
|
|
seen_norms.add(norm) |
|
|
lines.append(raw) |
|
|
|
|
|
|
|
|
        # Fallback: no structured lines available, split the merged raw text.
        if not lines:
|
|
for line in raw_text.split("\n"): |
|
|
stripped = line.strip() |
|
|
if stripped and len(_normalize(stripped)) >= 2: |
|
|
norm = _normalize(stripped) |
|
|
if norm not in seen_norms: |
|
|
seen_norms.add(norm) |
|
|
lines.append(stripped) |
|
|
|
|
|
return lines |
|
|
|
|
|
def _mark_all_lines_known(self, text: str) -> None: |
|
|
"""Add all lines in text to line history.""" |
|
|
for line in text.split("\n"): |
|
|
stripped = line.strip() |
|
|
if stripped and len(_normalize(stripped)) >= 2: |
|
|
self._line_history.mark_emitted(stripped) |
|
|
|