J / src /cross_referencer.py
Andro0s's picture
Upload 13 files
85fa7d2 verified
"""
Cross-Referencer - Correlación inteligente de resultados entre múltiples motores
Este módulo es la clave para unir hallazgos de Yandex, Bing y PimEyes.
"""
from typing import List, Dict, Set, Tuple
from urllib.parse import urlparse, parse_qs
import re
from difflib import SequenceMatcher
from collections import defaultdict
from loguru import logger
import hashlib
class CrossReferencer:
"""
Sistema de correlación que une resultados de múltiples fuentes.
Si Yandex encuentra una foto y el OCR de PimEyes detecta el mismo dominio,
este módulo los vincula automáticamente.
"""
def __init__(self, domain_similarity_threshold: float = 0.85):
"""
Args:
domain_similarity_threshold: Umbral de similitud para considerar dominios iguales (0.0-1.0)
"""
self.domain_threshold = domain_similarity_threshold
self.domain_cache = {} # Cache de dominios normalizados
def normalize_domain(self, url_or_domain: str) -> str:
"""
Normaliza un dominio o URL para comparación.
Args:
url_or_domain: URL completa o dominio
Returns:
Dominio normalizado
"""
# Usar cache
if url_or_domain in self.domain_cache:
return self.domain_cache[url_or_domain]
# Limpiar
cleaned = url_or_domain.lower().strip()
# Si es una URL, extraer dominio
if cleaned.startswith(('http://', 'https://')):
parsed = urlparse(cleaned)
domain = parsed.netloc
else:
domain = cleaned
# Remover www.
domain = re.sub(r'^www\.', '', domain)
# Remover puerto si existe
domain = re.sub(r':\d+$', '', domain)
# Remover subdominios comunes que no son relevantes
domain = re.sub(r'^(m\.|mobile\.|static\.|cdn\.)', '', domain)
# Cache
self.domain_cache[url_or_domain] = domain
return domain
def extract_domain_from_url(self, url: str) -> str:
"""
Extrae el dominio principal de una URL.
"""
try:
parsed = urlparse(url)
domain = parsed.netloc
# Remover www
domain = re.sub(r'^www\.', '', domain)
# Obtener dominio principal (sin subdominios)
parts = domain.split('.')
if len(parts) >= 2:
return '.'.join(parts[-2:])
return domain
except Exception as e:
logger.debug(f"Error extrayendo dominio de {url}: {e}")
return ""
def calculate_domain_similarity(self, domain1: str, domain2: str) -> float:
"""
Calcula la similitud entre dos dominios.
Returns:
Similitud entre 0.0 y 1.0
"""
# Normalizar ambos
d1 = self.normalize_domain(domain1)
d2 = self.normalize_domain(domain2)
# Comparación exacta
if d1 == d2:
return 1.0
# Comparación difusa usando SequenceMatcher
similarity = SequenceMatcher(None, d1, d2).ratio()
return similarity
def find_cross_references(self, all_results: Dict[str, List[Dict]],
ocr_results: Dict = None) -> List[Dict]:
"""
Encuentra correlaciones entre resultados de diferentes motores.
Args:
all_results: Diccionario con resultados por motor {'yandex': [...], 'bing': [...], ...}
ocr_results: Resultados de OCR de miniaturas censuradas
Returns:
Lista de resultados correlacionados y enriquecidos
"""
logger.info("Iniciando cross-referencing de resultados")
# Índice de dominios
domain_index = defaultdict(list)
# Indexar todos los resultados por dominio
for source, results in all_results.items():
for idx, result in enumerate(results):
# Extraer dominio
if 'url' in result:
domain = self.extract_domain_from_url(result['url'])
elif 'domain' in result:
domain = self.normalize_domain(result['domain'])
else:
continue
# Añadir al índice
result['_original_source'] = source
result['_original_index'] = idx
domain_index[domain].append(result)
# Si hay resultados de OCR, añadirlos al índice
if ocr_results:
for ocr_item in ocr_results:
domain = self.normalize_domain(ocr_item.get('domain', ''))
ocr_item['_is_ocr'] = True
domain_index[domain].append(ocr_item)
# Encontrar correlaciones
cross_referenced_results = []
processed_domains = set()
for domain, items in domain_index.items():
if domain in processed_domains or not domain:
continue
# Si hay múltiples fuentes para el mismo dominio, es una correlación
sources = set(item.get('_original_source') for item in items if '_original_source' in item)
has_ocr = any(item.get('_is_ocr', False) for item in items)
if len(sources) > 1 or has_ocr:
# Crear resultado correlacionado
correlation = self._create_correlation(domain, items, sources)
cross_referenced_results.append(correlation)
logger.info(f"Correlación encontrada: {domain} en {sources}")
processed_domains.add(domain)
# Añadir resultados sin correlación pero verificados
for source, results in all_results.items():
for result in results:
domain = self.extract_domain_from_url(result.get('url', ''))
if domain not in processed_domains:
result['cross_referenced'] = False
result['sources'] = [source]
cross_referenced_results.append(result)
# Ordenar por número de fuentes (más fuentes = más confiable)
cross_referenced_results.sort(
key=lambda x: (
len(x.get('sources', [])),
x.get('ocr_verified', False),
x.get('confidence', 0)
),
reverse=True
)
logger.success(f"Cross-referencing completado: {len(cross_referenced_results)} resultados procesados")
return cross_referenced_results
def _create_correlation(self, domain: str, items: List[Dict], sources: Set[str]) -> Dict:
"""
Crea un resultado correlacionado unificado.
"""
# Separar items de OCR y de búsqueda
ocr_items = [i for i in items if i.get('_is_ocr', False)]
search_items = [i for i in items if not i.get('_is_ocr', False)]
# Tomar el mejor resultado de búsqueda (primero de Yandex si existe)
primary_result = None
for source in ['yandex', 'bing', 'google', 'pimeyes']:
candidates = [i for i in search_items if i.get('_original_source') == source]
if candidates:
primary_result = candidates[0]
break
if not primary_result and search_items:
primary_result = search_items[0]
# Crear resultado unificado
correlation = {
'domain': domain,
'cross_referenced': True,
'sources': list(sources),
'ocr_verified': len(ocr_items) > 0,
'confidence': self._calculate_correlation_confidence(sources, ocr_items),
}
# Añadir datos del resultado primario
if primary_result:
correlation.update({
'url': primary_result.get('url'),
'thumbnail_url': primary_result.get('thumbnail_url'),
'primary_source': primary_result.get('_original_source'),
})
# Añadir datos de OCR
if ocr_items:
correlation['ocr_data'] = {
'extracted_domains': [i.get('domain') for i in ocr_items],
'avg_confidence': sum(i.get('confidence', 0) for i in ocr_items) / len(ocr_items),
'extraction_methods': [i.get('method', 'unknown') for i in ocr_items],
}
# Añadir todas las URLs alternativas
all_urls = [i.get('url') for i in search_items if i.get('url')]
if all_urls:
correlation['alternative_urls'] = list(set(all_urls))
return correlation
def _calculate_correlation_confidence(self, sources: Set[str], ocr_items: List[Dict]) -> float:
"""
Calcula la confianza de una correlación basada en número de fuentes y OCR.
Returns:
Confianza entre 0.0 y 1.0
"""
base_confidence = 0.5
# Bonus por cada fuente adicional (máx 0.15 por fuente)
source_bonus = min(len(sources) * 0.15, 0.45)
# Bonus si hay verificación OCR
ocr_bonus = 0.0
if ocr_items:
avg_ocr_confidence = sum(i.get('confidence', 0) for i in ocr_items) / len(ocr_items)
ocr_bonus = avg_ocr_confidence * 0.2 # Máx 0.2
total_confidence = min(base_confidence + source_bonus + ocr_bonus, 1.0)
return round(total_confidence, 3)
def match_pimeyes_with_search(self, pimeyes_results: List[Dict],
search_results: List[Dict],
ocr_domains: List[str]) -> List[Dict]:
"""
Método especializado para correlacionar PimEyes (censurado) con búsquedas abiertas.
Este es el "truco" principal: si PimEyes tiene una miniatura censurada pero el OCR
detecta "ejemplo.com", y Yandex encuentra "ejemplo.com/foto.jpg", los unimos.
Args:
pimeyes_results: Resultados de PimEyes (censurados)
search_results: Resultados de Yandex/Bing (abiertos)
ocr_domains: Dominios extraídos por OCR de miniaturas de PimEyes
Returns:
Lista de matches con URLs desbloquedas
"""
logger.info("Matching PimEyes censurado con búsquedas abiertas")
matches = []
for ocr_domain in ocr_domains:
normalized_ocr = self.normalize_domain(ocr_domain)
# Buscar en resultados de búsqueda
for search_result in search_results:
search_domain = self.extract_domain_from_url(search_result.get('url', ''))
# Si los dominios coinciden
if self.calculate_domain_similarity(normalized_ocr, search_domain) >= self.domain_threshold:
match = {
'pimeyes_domain_ocr': ocr_domain,
'matched_url': search_result.get('url'),
'thumbnail_url': search_result.get('thumbnail_url'),
'source': search_result.get('source', 'unknown'),
'match_confidence': self.calculate_domain_similarity(normalized_ocr, search_domain),
'unlocked': True, # Desbloqueado!
}
matches.append(match)
logger.success(f"✓ PimEyes censurado desbloqueado: {ocr_domain}{search_result['url']}")
return matches
def deduplicate_results(self, results: List[Dict]) -> List[Dict]:
"""
Elimina resultados duplicados basándose en URL y hash de imagen.
Args:
results: Lista de resultados
Returns:
Lista sin duplicados
"""
seen_urls = set()
seen_hashes = set()
unique_results = []
for result in results:
url = result.get('url', '')
# Hash del URL
url_hash = hashlib.md5(url.encode()).hexdigest() if url else None
# Hash de thumbnail si existe
thumb_hash = None
if result.get('thumbnail_url'):
thumb_hash = hashlib.md5(result['thumbnail_url'].encode()).hexdigest()
# Verificar duplicados
is_duplicate = False
if url and url in seen_urls:
is_duplicate = True
if url_hash and url_hash in seen_hashes:
is_duplicate = True
if thumb_hash and thumb_hash in seen_hashes:
is_duplicate = True
if not is_duplicate:
unique_results.append(result)
if url:
seen_urls.add(url)
if url_hash:
seen_hashes.add(url_hash)
if thumb_hash:
seen_hashes.add(thumb_hash)
logger.info(f"Deduplicación: {len(results)}{len(unique_results)} únicos")
return unique_results
def generate_final_report(self, cross_referenced_results: List[Dict]) -> Dict:
"""
Genera un reporte final unificado con estadísticas.
Returns:
Diccionario con reporte completo
"""
# Estadísticas
total_results = len(cross_referenced_results)
cross_ref_count = sum(1 for r in cross_referenced_results if r.get('cross_referenced', False))
ocr_verified_count = sum(1 for r in cross_referenced_results if r.get('ocr_verified', False))
# Agrupar por fuente
by_source = defaultdict(int)
for result in cross_referenced_results:
for source in result.get('sources', []):
by_source[source] += 1
# Dominios únicos
unique_domains = set()
for result in cross_referenced_results:
domain = result.get('domain')
if domain:
unique_domains.add(domain)
# Resultados de alta confianza (>0.8)
high_confidence = [r for r in cross_referenced_results if r.get('confidence', 0) > 0.8]
report = {
'summary': {
'total_results': total_results,
'cross_referenced': cross_ref_count,
'ocr_verified': ocr_verified_count,
'unique_domains': len(unique_domains),
'high_confidence_results': len(high_confidence),
},
'by_source': dict(by_source),
'results': cross_referenced_results,
'top_matches': cross_referenced_results[:10], # Top 10
}
logger.info(f"Reporte generado: {total_results} resultados, {cross_ref_count} correlacionados")
return report
# Función de utilidad
def quick_cross_reference(yandex_results: List[Dict],
bing_results: List[Dict],
pimeyes_ocr_domains: List[str]) -> List[Dict]:
"""
Función de conveniencia para correlacionar rápidamente.
Args:
yandex_results: Resultados de Yandex
bing_results: Resultados de Bing
pimeyes_ocr_domains: Dominios extraídos de PimEyes por OCR
Returns:
Lista de resultados correlacionados
"""
xref = CrossReferencer()
all_results = {
'yandex': yandex_results,
'bing': bing_results,
}
# Convertir dominios OCR al formato esperado
ocr_results = [{'domain': d, 'confidence': 0.8} for d in pimeyes_ocr_domains]
return xref.find_cross_references(all_results, ocr_results)
if __name__ == "__main__":
# Ejemplo de uso
xref = CrossReferencer()
# Resultados de ejemplo
yandex = [
{'url': 'https://example.com/photo1.jpg', 'source': 'yandex'},
{'url': 'https://test.com/image.png', 'source': 'yandex'},
]
bing = [
{'url': 'https://example.com/photo2.jpg', 'source': 'bing'},
{'url': 'https://another.com/pic.jpg', 'source': 'bing'},
]
ocr_domains = ['example.com', 'test.com']
# Cross-reference
results = quick_cross_reference(yandex, bing, ocr_domains)
print(f"\nResultados correlacionados: {len(results)}")
for r in results:
print(f" • {r.get('domain')} - Fuentes: {r.get('sources')} - OCR: {r.get('ocr_verified')}")