import functools
import heapq
import re
import unicodedata
from collections import defaultdict
from typing import List, Tuple, Dict, Set

from loguru import logger


class SpanishFuzzyMatcher:
    """Fuzzy matcher that maps OCR-damaged tokens onto a Spanish word list.

    The matcher loads a one-word-per-line dictionary file, indexes the words
    by length and by character trigram, and ranks candidate corrections with
    a weighted blend of Levenshtein, Damerau-Levenshtein and Jaro-Winkler
    similarity, boosted for common words and for typical character
    confusions.
    """

    # Anything outside this character class is removed from dictionary lines.
    _NON_WORD_CHARS = re.compile(r"[^a-záéíóúüñç\-']")
    # Splits a sentence into alternating word and non-word tokens.
    _TOKEN_PATTERN = re.compile(r'\b\w+\b|\W+')
    _WORD_PATTERN = re.compile(r'\b\w+\b')

    # (ocr_char, dictionary_char) pairs treated as typical confusions.
    # Stored as pairs rather than a dict because one character can be
    # confused with several others (e.g. 's' with both 'c' and 'z').
    _OCR_SUBSTITUTIONS = frozenset({
        ('b', 'v'), ('v', 'b'),
        ('c', 's'), ('s', 'c'),
        ('z', 's'), ('s', 'z'),
        ('j', 'g'), ('g', 'j'),
        ('y', 'i'), ('i', 'y'),
        ('u', 'n'), ('n', 'u'),
        ('ñ', 'n'), ('n', 'ñ'),
    })

    # Seed list of frequent words; only those present in the loaded
    # dictionary end up in ``self.common_words``.
    _COMMON_SPANISH_WORDS = frozenset({
        'el', 'la', 'de', 'que', 'y', 'a', 'en', 'un', 'es', 'se', 'no',
        'te', 'lo', 'le', 'da', 'su', 'por', 'son', 'con', 'para', 'al',
        'las', 'del', 'los', 'una', 'mi', 'muy', 'mas', 'me', 'si', 'ya',
        'todo', 'como', 'pero', 'hay', 'o', 'cuando', 'esta', 'ser',
        'tiene', 'estar', 'hacer', 'sobre', 'entre', 'poder', 'antes',
        'tiempo', 'año', 'casa', 'día', 'vida', 'trabajo', 'hombre',
        'mujer', 'mundo', 'parte', 'momento', 'lugar', 'país', 'forma',
        'manera', 'estado', 'caso', 'grupo', 'agua', 'punto', 'vez',
        'donde', 'quien', 'haber', 'tener', 'decir', 'ir', 'ver', 'dar',
        'saber', 'querer', 'llegar', 'pasar', 'deber', 'poner', 'parecer',
        'quedar', 'creer', 'hablar', 'llevar', 'dejar', 'seguir',
        'encontrar', 'llamar', 'venir', 'pensar', 'salir', 'volver',
        'tomar', 'conocer', 'vivir', 'sentir', 'tratar', 'mirar', 'contar',
        'empezar', 'esperar', 'buscar', 'existir', 'entrar', 'trabajar',
        'escribir', 'perder', 'producir', 'ocurrir', 'entender', 'pedir',
        'recibir', 'recordar', 'terminar', 'permitir', 'aparecer',
        'conseguir', 'comenzar', 'servir', 'sacar', 'necesitar',
        'mantener', 'resultar', 'leer', 'caer', 'cambiar', 'presentar',
        'crear', 'abrir', 'considerar', 'oír', 'acabar', 'convertir',
        'ganar', 'traer', 'realizar', 'suponer', 'comprender', 'explicar',
        'dedicar', 'andar', 'estudiar', 'mano', 'cabeza', 'ojo', 'cara',
        'pie', 'corazón', 'palabra', 'número', 'color', 'mesa', 'silla',
        'libro', 'papel', 'coche', 'calle', 'puerta', 'ventana', 'ciudad',
        'pueblo', 'escuela', 'hospital', 'iglesia', 'tienda', 'mercado',
        'banco', 'hotel', 'restaurante', 'café', 'bar', 'teatro', 'cine',
        'museo', 'parque', 'jardín', 'playa', 'montaña', 'río', 'mar',
        'lago', 'bosque', 'árbol', 'flor', 'animal', 'perro', 'gato',
        'pájaro', 'pez', 'comida', 'pan', 'carne', 'pollo', 'pescado',
        'leche', 'huevo', 'queso', 'fruta', 'verdura', 'patata', 'tomate',
        'cebolla', 'ajo', 'sal', 'azúcar', 'aceite', 'vino', 'cerveza',
        'té', 'fuego', 'aire', 'tierra', 'sol', 'luna',
        'estrella', 'nube', 'lluvia', 'nieve', 'viento', 'calor', 'frío',
        'luz', 'sombra', 'mañana', 'tarde', 'noche', 'hoy', 'ayer',
        'semana', 'mes', 'hora', 'minuto', 'segundo', 'lunes', 'martes',
        'miércoles', 'jueves', 'viernes', 'sábado', 'domingo', 'enero',
        'febrero', 'marzo', 'abril', 'mayo', 'junio', 'julio', 'agosto',
        'septiembre', 'octubre', 'noviembre', 'diciembre', 'primavera',
        'verano', 'otoño', 'invierno', 'bueno', 'malo', 'grande',
        'pequeño', 'alto', 'bajo', 'largo', 'corto', 'ancho', 'estrecho',
        'grueso', 'delgado', 'fuerte', 'débil', 'rápido', 'lento', 'fácil',
        'difícil', 'nuevo', 'viejo', 'joven', 'mayor', 'blanco', 'negro',
        'rojo', 'azul', 'verde', 'amarillo', 'gris', 'marrón', 'rosa',
        'naranja', 'morado', 'feliz', 'triste', 'contento', 'enfadado',
        'cansado', 'aburrido', 'interesante', 'divertido', 'importante',
        'necesario', 'posible', 'imposible', 'seguro', 'peligroso', 'rico',
        'pobre', 'caro', 'barato', 'limpio', 'sucio', 'sano', 'enfermo',
        'vivo', 'muerto', 'lleno', 'vacío', 'abierto', 'cerrado',
        'caliente', 'seco', 'mojado', 'duro', 'blando', 'suave', 'áspero',
        'dulce', 'amargo', 'salado', 'picante', 'conocerte', 'tengas',
    })

    def __init__(self, dictionary_path: str):
        """Load the dictionary at *dictionary_path* and build all indexes.

        Raises:
            FileNotFoundError: if the dictionary file does not exist.
            Exception: for any other failure while reading the file.
        """
        # Every accepted dictionary word (lower-case, cleaned).
        self.dictionary: Set[str] = set()
        # Word length -> dictionary words of that length, in file order.
        self.word_by_length: Dict[int, List[str]] = defaultdict(list)
        # "$"-padded character trigram -> dictionary words containing it.
        self.ngram_index: Dict[str, Set[str]] = defaultdict(set)
        # Frequent words that are also present in the dictionary.
        self.common_words: Set[str] = set()

        self._load_dictionary(dictionary_path)
        self._build_indexes()
        self._load_common_words()

    def _detect_encoding(self, path: str) -> str:
        """Return the text encoding to use when reading *path*.

        A UTF-16 byte-order mark selects ``'utf-16'``. Otherwise the whole
        file is validated as UTF-8; if that fails, ``'latin1'`` is returned
        because it can decode any byte sequence.
        """
        with open(path, 'rb') as raw:
            if raw.read(2) in (b'\xff\xfe', b'\xfe\xff'):
                return 'utf-16'

        try:
            with open(path, 'r', encoding='utf-8') as f:
                # Validate the entire file, not just its head: a Latin-1 word
                # list may be pure ASCII for a long stretch, and a wrong
                # "utf-8" verdict would make the loader drop accented letters.
                while f.read(65536):
                    pass
            return 'utf-8'
        except UnicodeError:
            return 'latin1'

    def _load_dictionary(self, path: str) -> None:
        """Read one word per line from *path* into the dictionary.

        Each line is stripped, lower-cased and reduced to Spanish letters,
        hyphens and apostrophes. Entries shorter than two characters are
        skipped, and repeated entries are stored once.

        Raises:
            FileNotFoundError: if *path* does not exist.
            Exception: for any other error, chained to the original cause.
        """
        try:
            encoding = self._detect_encoding(path)
            print(f"Detected encoding: {encoding}")

            with open(path, 'r', encoding=encoding, errors='ignore') as f:
                for line in f:
                    # Compose accents first so a decomposed "a" + combining
                    # acute survives the character filter as "á".
                    word = unicodedata.normalize('NFC', line.strip().lower())
                    if len(word) <= 1:
                        continue

                    cleaned_word = self._NON_WORD_CHARS.sub("", word)
                    if len(cleaned_word) <= 1 or cleaned_word in self.dictionary:
                        continue

                    self.dictionary.add(cleaned_word)
                    self.word_by_length[len(cleaned_word)].append(cleaned_word)

            print(f"Loaded {len(self.dictionary)} words from dictionary")

        except FileNotFoundError as e:
            raise FileNotFoundError(f"Dictionary file not found: {path}") from e
        except Exception as e:
            raise Exception(f"Error loading dictionary: {e}") from e

    def _load_common_words(self) -> None:
        """Populate ``common_words`` with seed words found in the dictionary."""
        self.common_words = {
            word for word in self._COMMON_SPANISH_WORDS if word in self.dictionary
        }
        print(f"Loaded {len(self.common_words)} common words")

    def _is_common_spanish_error(self, ocr_word: str, dict_word: str) -> bool:
        """Return True if the two words differ by one typical confusion.

        Recognised confusions are a single substitution listed in
        ``_OCR_SUBSTITUTIONS`` (same-length words differing in exactly one
        position), or a single ``ll``/``y`` swap (lengths differing by one).
        Comparison is case-insensitive.
        """
        ocr_lower = ocr_word.lower()
        dict_lower = dict_word.lower()

        if len(ocr_lower) == len(dict_lower):
            mismatches = [
                (a, b) for a, b in zip(ocr_lower, dict_lower) if a != b
            ]
            return len(mismatches) == 1 and mismatches[0] in self._OCR_SUBSTITUTIONS

        # "ll" <-> "y" changes the length by one, so it cannot be detected
        # by the position-wise comparison above.
        if abs(len(ocr_lower) - len(dict_lower)) == 1:
            return ocr_lower.replace('ll', 'y') == dict_lower.replace('ll', 'y')

        return False

    def _build_indexes(self) -> None:
        """Index every dictionary word under each of its padded trigrams."""
        for word in self.dictionary:
            padded_word = f"${word}$"
            for i in range(len(padded_word) - 2):
                self.ngram_index[padded_word[i:i + 3]].add(word)

    def _normalize_text(self, text: str) -> str:
        """Return *text* lower-cased with all combining marks removed.

        Note that this also turns ``ñ`` into ``n`` and ``ü`` into ``u``.
        """
        decomposed = unicodedata.normalize('NFD', text)
        stripped = ''.join(
            c for c in decomposed if unicodedata.category(c) != 'Mn'
        )
        return stripped.lower()

    def _levenshtein_distance(self, s1: str, s2: str) -> int:
        """Return the Levenshtein edit distance between *s1* and *s2*."""
        # Keep the shorter string as the inner (row) dimension.
        if len(s1) < len(s2):
            s1, s2 = s2, s1

        if not s2:
            return len(s1)

        previous_row = list(range(len(s2) + 1))
        for i, c1 in enumerate(s1):
            current_row = [i + 1]
            for j, c2 in enumerate(s2):
                current_row.append(min(
                    previous_row[j + 1] + 1,       # drop c1
                    current_row[j] + 1,            # drop c2
                    previous_row[j] + (c1 != c2),  # substitute / keep
                ))
            previous_row = current_row

        return previous_row[-1]

    def _damerau_levenshtein_distance(self, s1: str, s2: str) -> int:
        """Return the Damerau-Levenshtein distance between *s1* and *s2*.

        Like Levenshtein, but swapping two characters counts as one edit.
        """
        len1, len2 = len(s1), len(s2)

        # Last row of s1 in which each character was seen (0 = not yet).
        last_row_seen = {char: 0 for char in s1 + s2}

        # The table has one extra sentinel row/column filled with max_dist
        # so that transposition look-ups at index 0 are never chosen.
        max_dist = len1 + len2
        h = [[max_dist] * (len2 + 2) for _ in range(len1 + 2)]
        for i in range(len1 + 1):
            h[i + 1][1] = i
        for j in range(len2 + 1):
            h[1][j + 1] = j

        for i in range(1, len1 + 1):
            last_match_col = 0
            for j in range(1, len2 + 1):
                k = last_row_seen[s2[j - 1]]
                col = last_match_col
                if s1[i - 1] == s2[j - 1]:
                    cost = 0
                    last_match_col = j
                else:
                    cost = 1

                h[i + 1][j + 1] = min(
                    h[i][j] + cost,                              # substitute
                    h[i + 1][j] + 1,                             # insert
                    h[i][j + 1] + 1,                             # delete
                    h[k][col] + (i - k - 1) + 1 + (j - col - 1),  # transpose
                )

            last_row_seen[s1[i - 1]] = i

        return h[len1 + 1][len2 + 1]

    def _jaro_similarity(self, s1: str, s2: str) -> float:
        """Return the Jaro similarity of *s1* and *s2* in ``[0.0, 1.0]``."""
        if s1 == s2:
            return 1.0

        len1, len2 = len(s1), len(s2)
        if len1 == 0 or len2 == 0:
            return 0.0

        match_window = max(max(len1, len2) // 2 - 1, 0)
        s1_matches = [False] * len1
        s2_matches = [False] * len2
        matches = 0

        for i, char in enumerate(s1):
            start = max(0, i - match_window)
            end = min(i + match_window + 1, len2)
            for j in range(start, end):
                if s2_matches[j] or char != s2[j]:
                    continue
                s1_matches[i] = s2_matches[j] = True
                matches += 1
                break

        if matches == 0:
            return 0.0

        # Matched characters that appear in a different order in the two
        # strings; each such pair counts as half a transposition below.
        matched1 = [c for c, hit in zip(s1, s1_matches) if hit]
        matched2 = [c for c, hit in zip(s2, s2_matches) if hit]
        transpositions = sum(a != b for a, b in zip(matched1, matched2))

        return (
            matches / len1
            + matches / len2
            + (matches - transpositions / 2) / matches
        ) / 3

    def _jaro_winkler_similarity(self, s1: str, s2: str) -> float:
        """Return the Jaro-Winkler similarity of *s1* and *s2*.

        The Jaro score is boosted by 0.1 per shared leading character, for
        at most four characters.
        """
        jaro = self._jaro_similarity(s1, s2)

        prefix_len = 0
        for a, b in zip(s1[:4], s2[:4]):
            if a != b:
                break
            prefix_len += 1

        return jaro + (0.1 * prefix_len * (1 - jaro))

    def _get_candidates(self, word: str, max_candidates: int = 200) -> Set[str]:
        """Collect dictionary words worth scoring against *word*.

        Candidates come from three sources: common words whose length is
        within two of *word*, the first ``max_candidates // 3`` words of
        each length within two of *word*, and the ``max_candidates // 2``
        words sharing the most padded trigrams with *word*.
        """
        word_len = len(word)

        candidates = {
            common_word for common_word in self.common_words
            if abs(len(common_word) - word_len) <= 2
        }

        per_length_limit = max_candidates // 3
        for length in range(max(1, word_len - 2), word_len + 3):
            # .get() avoids creating empty buckets in the defaultdict.
            candidates.update(self.word_by_length.get(length, ())[:per_length_limit])

        padded_word = f"${word}$"
        trigram_scores: Dict[str, int] = defaultdict(int)
        for i in range(len(padded_word) - 2):
            # .get() for the same reason as above.
            for candidate in self.ngram_index.get(padded_word[i:i + 3], ()):
                trigram_scores[candidate] += 1

        # Highest overlap first; ties are broken alphabetically so the
        # selection does not depend on set iteration order.
        best_by_trigram = heapq.nsmallest(
            max_candidates // 2,
            trigram_scores.items(),
            key=lambda item: (-item[1], item[0]),
        )
        candidates.update(candidate for candidate, _ in best_by_trigram)

        return candidates

    def _calculate_composite_score(self, word1: str, word2: str) -> float:
        """Score how well dictionary word *word2* explains OCR token *word1*.

        The base score is ``0.25 * levenshtein + 0.45 * damerau +
        0.25 * jaro_winkler + 0.05 * length_penalty``, all computed on
        accent-stripped, lower-cased text. It is multiplied by 1.3 for
        common words, 1.2 for a typical character confusion and 1.1 for
        equal length, then capped at 1.0.
        """
        norm_word1 = self._normalize_text(word1)
        norm_word2 = self._normalize_text(word2)

        max_len = max(len(norm_word1), len(norm_word2))
        if max_len == 0:
            return 1.0

        levenshtein = self._levenshtein_distance(norm_word1, norm_word2)
        damerau = self._damerau_levenshtein_distance(norm_word1, norm_word2)
        jaro_winkler = self._jaro_winkler_similarity(norm_word1, norm_word2)

        levenshtein_sim = 1 - (levenshtein / max_len)
        damerau_sim = 1 - (damerau / max_len)

        length_diff = abs(len(norm_word1) - len(norm_word2))
        length_penalty = 1 - (length_diff / max_len)

        # common_words holds accented dictionary forms, so the candidate
        # itself must be tested; the accent-stripped form is also accepted
        # to keep the bonus for candidates that previously received it.
        is_common = (
            word2.lower() in self.common_words
            or norm_word2 in self.common_words
        )
        frequency_bonus = 1.3 if is_common else 1.0
        spanish_error_bonus = (
            1.2 if self._is_common_spanish_error(word1, word2) else 1.0
        )
        exact_length_bonus = 1.1 if length_diff == 0 else 1.0

        base_score = (
            0.25 * levenshtein_sim
            + 0.45 * damerau_sim
            + 0.25 * jaro_winkler
            + 0.05 * length_penalty
        )
        final_score = (
            base_score * frequency_bonus * spanish_error_bonus * exact_length_bonus
        )

        return min(final_score, 1.0)

    def find_best_matches(self, word: str, top_k: int = 5,
                          threshold: float = 0.4) -> List[Tuple[str, float]]:
        """Return up to *top_k* ``(candidate, score)`` pairs for *word*.

        Results are ordered by descending score, ties alphabetically. If
        *word* is already a dictionary word (as typed, lower-cased, or
        accent-stripped) a single exact match with score 1.0 is returned.
        Words shorter than two characters yield an empty list.
        """
        if not word or len(word) < 2:
            return []

        normalized_word = self._normalize_text(word)
        if normalized_word in self.dictionary:
            return [(word, 1.0)]

        lowered = word.lower()
        if lowered in self.dictionary:
            return [(lowered, 1.0)]

        scored = []
        for candidate in self._get_candidates(normalized_word):
            score = self._calculate_composite_score(word, candidate)
            if score >= threshold:
                # Negated so the "smallest" entries are the best matches.
                scored.append((-score, candidate))

        return [
            (candidate, -negated)
            for negated, candidate in heapq.nsmallest(top_k, scored)
        ]

    @staticmethod
    def _restore_case(original: str, replacement: str) -> str:
        """Return *replacement* with the capitalisation style of *original*."""
        if not original or not replacement or replacement == original:
            return replacement
        if len(original) > 1 and original.isupper():
            return replacement.upper()
        if original[0].isupper():
            return replacement[0].upper() + replacement[1:]
        return replacement

    def correct_sentence(self, sentence: str, confidence_threshold: float = 0.6) -> str:
        """Return *sentence* with low-quality word tokens replaced.

        Each word token is replaced by its best dictionary match when that
        match scores at least *confidence_threshold*. Whitespace and
        punctuation are copied through, tokens without any letter (such as
        numbers) are left alone, and replacements keep the capitalisation
        of the token they replace.
        """
        corrected_words = []

        for token in self._TOKEN_PATTERN.findall(sentence):
            is_word = self._WORD_PATTERN.match(token) is not None
            if not is_word or not any(ch.isalpha() for ch in token):
                corrected_words.append(token)
                continue

            matches = self.find_best_matches(token, top_k=1, threshold=0.3)
            if matches and matches[0][1] >= confidence_threshold:
                corrected_words.append(self._restore_case(token, matches[0][0]))
            else:
                corrected_words.append(token)

        return ''.join(corrected_words)


@functools.lru_cache(maxsize=4)
def _get_matcher(dictionary_path):
    """Build the matcher for *dictionary_path* once and reuse it afterwards.

    Failed constructions raise and are not cached, so a later call retries.
    """
    return SpanishFuzzyMatcher(dictionary_path)


def PostProcessing(ocr_sentence, dictionary_path='Diccionario.Espanol.136k.palabras.txt'):
    """Return *ocr_sentence* with likely OCR misspellings corrected.

    Args:
        ocr_sentence: Raw sentence text produced by OCR.
        dictionary_path: Word-list file used to build the matcher. The
            matcher is cached per path, so the file is only read on the
            first call for that path.

    Returns:
        The corrected sentence, or *ocr_sentence* unchanged if anything
        fails (the failure is logged rather than raised).
    """
    try:
        logger.info("Post processing started......")
        matcher = _get_matcher(dictionary_path)
        logger.info("Dictionary loaded successfully!")

        corrected = matcher.correct_sentence(ocr_sentence, confidence_threshold=0.6)
        logger.info("Post processing completed successfully!")
        return corrected

    except Exception as e:
        # Best effort: never let post-processing break the OCR pipeline.
        logger.exception(f"Post processing failed: {e}")
        return ocr_sentence