""" Cross-Referencer - Correlación inteligente de resultados entre múltiples motores Este módulo es la clave para unir hallazgos de Yandex, Bing y PimEyes. """ from typing import List, Dict, Set, Tuple from urllib.parse import urlparse, parse_qs import re from difflib import SequenceMatcher from collections import defaultdict from loguru import logger import hashlib class CrossReferencer: """ Sistema de correlación que une resultados de múltiples fuentes. Si Yandex encuentra una foto y el OCR de PimEyes detecta el mismo dominio, este módulo los vincula automáticamente. """ def __init__(self, domain_similarity_threshold: float = 0.85): """ Args: domain_similarity_threshold: Umbral de similitud para considerar dominios iguales (0.0-1.0) """ self.domain_threshold = domain_similarity_threshold self.domain_cache = {} # Cache de dominios normalizados def normalize_domain(self, url_or_domain: str) -> str: """ Normaliza un dominio o URL para comparación. Args: url_or_domain: URL completa o dominio Returns: Dominio normalizado """ # Usar cache if url_or_domain in self.domain_cache: return self.domain_cache[url_or_domain] # Limpiar cleaned = url_or_domain.lower().strip() # Si es una URL, extraer dominio if cleaned.startswith(('http://', 'https://')): parsed = urlparse(cleaned) domain = parsed.netloc else: domain = cleaned # Remover www. domain = re.sub(r'^www\.', '', domain) # Remover puerto si existe domain = re.sub(r':\d+$', '', domain) # Remover subdominios comunes que no son relevantes domain = re.sub(r'^(m\.|mobile\.|static\.|cdn\.)', '', domain) # Cache self.domain_cache[url_or_domain] = domain return domain def extract_domain_from_url(self, url: str) -> str: """ Extrae el dominio principal de una URL. """ try: parsed = urlparse(url) domain = parsed.netloc # Remover www domain = re.sub(r'^www\.', '', domain) # Obtener dominio principal (sin subdominios) parts = domain.split('.') if len(parts) >= 2: return '.'.join(parts[-2:]) return domain except Exception as e: logger.debug(f"Error extrayendo dominio de {url}: {e}") return "" def calculate_domain_similarity(self, domain1: str, domain2: str) -> float: """ Calcula la similitud entre dos dominios. Returns: Similitud entre 0.0 y 1.0 """ # Normalizar ambos d1 = self.normalize_domain(domain1) d2 = self.normalize_domain(domain2) # Comparación exacta if d1 == d2: return 1.0 # Comparación difusa usando SequenceMatcher similarity = SequenceMatcher(None, d1, d2).ratio() return similarity def find_cross_references(self, all_results: Dict[str, List[Dict]], ocr_results: Dict = None) -> List[Dict]: """ Encuentra correlaciones entre resultados de diferentes motores. Args: all_results: Diccionario con resultados por motor {'yandex': [...], 'bing': [...], ...} ocr_results: Resultados de OCR de miniaturas censuradas Returns: Lista de resultados correlacionados y enriquecidos """ logger.info("Iniciando cross-referencing de resultados") # Índice de dominios domain_index = defaultdict(list) # Indexar todos los resultados por dominio for source, results in all_results.items(): for idx, result in enumerate(results): # Extraer dominio if 'url' in result: domain = self.extract_domain_from_url(result['url']) elif 'domain' in result: domain = self.normalize_domain(result['domain']) else: continue # Añadir al índice result['_original_source'] = source result['_original_index'] = idx domain_index[domain].append(result) # Si hay resultados de OCR, añadirlos al índice if ocr_results: for ocr_item in ocr_results: domain = self.normalize_domain(ocr_item.get('domain', '')) ocr_item['_is_ocr'] = True domain_index[domain].append(ocr_item) # Encontrar correlaciones cross_referenced_results = [] processed_domains = set() for domain, items in domain_index.items(): if domain in processed_domains or not domain: continue # Si hay múltiples fuentes para el mismo dominio, es una correlación sources = set(item.get('_original_source') for item in items if '_original_source' in item) has_ocr = any(item.get('_is_ocr', False) for item in items) if len(sources) > 1 or has_ocr: # Crear resultado correlacionado correlation = self._create_correlation(domain, items, sources) cross_referenced_results.append(correlation) logger.info(f"Correlación encontrada: {domain} en {sources}") processed_domains.add(domain) # Añadir resultados sin correlación pero verificados for source, results in all_results.items(): for result in results: domain = self.extract_domain_from_url(result.get('url', '')) if domain not in processed_domains: result['cross_referenced'] = False result['sources'] = [source] cross_referenced_results.append(result) # Ordenar por número de fuentes (más fuentes = más confiable) cross_referenced_results.sort( key=lambda x: ( len(x.get('sources', [])), x.get('ocr_verified', False), x.get('confidence', 0) ), reverse=True ) logger.success(f"Cross-referencing completado: {len(cross_referenced_results)} resultados procesados") return cross_referenced_results def _create_correlation(self, domain: str, items: List[Dict], sources: Set[str]) -> Dict: """ Crea un resultado correlacionado unificado. """ # Separar items de OCR y de búsqueda ocr_items = [i for i in items if i.get('_is_ocr', False)] search_items = [i for i in items if not i.get('_is_ocr', False)] # Tomar el mejor resultado de búsqueda (primero de Yandex si existe) primary_result = None for source in ['yandex', 'bing', 'google', 'pimeyes']: candidates = [i for i in search_items if i.get('_original_source') == source] if candidates: primary_result = candidates[0] break if not primary_result and search_items: primary_result = search_items[0] # Crear resultado unificado correlation = { 'domain': domain, 'cross_referenced': True, 'sources': list(sources), 'ocr_verified': len(ocr_items) > 0, 'confidence': self._calculate_correlation_confidence(sources, ocr_items), } # Añadir datos del resultado primario if primary_result: correlation.update({ 'url': primary_result.get('url'), 'thumbnail_url': primary_result.get('thumbnail_url'), 'primary_source': primary_result.get('_original_source'), }) # Añadir datos de OCR if ocr_items: correlation['ocr_data'] = { 'extracted_domains': [i.get('domain') for i in ocr_items], 'avg_confidence': sum(i.get('confidence', 0) for i in ocr_items) / len(ocr_items), 'extraction_methods': [i.get('method', 'unknown') for i in ocr_items], } # Añadir todas las URLs alternativas all_urls = [i.get('url') for i in search_items if i.get('url')] if all_urls: correlation['alternative_urls'] = list(set(all_urls)) return correlation def _calculate_correlation_confidence(self, sources: Set[str], ocr_items: List[Dict]) -> float: """ Calcula la confianza de una correlación basada en número de fuentes y OCR. Returns: Confianza entre 0.0 y 1.0 """ base_confidence = 0.5 # Bonus por cada fuente adicional (máx 0.15 por fuente) source_bonus = min(len(sources) * 0.15, 0.45) # Bonus si hay verificación OCR ocr_bonus = 0.0 if ocr_items: avg_ocr_confidence = sum(i.get('confidence', 0) for i in ocr_items) / len(ocr_items) ocr_bonus = avg_ocr_confidence * 0.2 # Máx 0.2 total_confidence = min(base_confidence + source_bonus + ocr_bonus, 1.0) return round(total_confidence, 3) def match_pimeyes_with_search(self, pimeyes_results: List[Dict], search_results: List[Dict], ocr_domains: List[str]) -> List[Dict]: """ Método especializado para correlacionar PimEyes (censurado) con búsquedas abiertas. Este es el "truco" principal: si PimEyes tiene una miniatura censurada pero el OCR detecta "ejemplo.com", y Yandex encuentra "ejemplo.com/foto.jpg", los unimos. Args: pimeyes_results: Resultados de PimEyes (censurados) search_results: Resultados de Yandex/Bing (abiertos) ocr_domains: Dominios extraídos por OCR de miniaturas de PimEyes Returns: Lista de matches con URLs desbloquedas """ logger.info("Matching PimEyes censurado con búsquedas abiertas") matches = [] for ocr_domain in ocr_domains: normalized_ocr = self.normalize_domain(ocr_domain) # Buscar en resultados de búsqueda for search_result in search_results: search_domain = self.extract_domain_from_url(search_result.get('url', '')) # Si los dominios coinciden if self.calculate_domain_similarity(normalized_ocr, search_domain) >= self.domain_threshold: match = { 'pimeyes_domain_ocr': ocr_domain, 'matched_url': search_result.get('url'), 'thumbnail_url': search_result.get('thumbnail_url'), 'source': search_result.get('source', 'unknown'), 'match_confidence': self.calculate_domain_similarity(normalized_ocr, search_domain), 'unlocked': True, # Desbloqueado! } matches.append(match) logger.success(f"✓ PimEyes censurado desbloqueado: {ocr_domain} → {search_result['url']}") return matches def deduplicate_results(self, results: List[Dict]) -> List[Dict]: """ Elimina resultados duplicados basándose en URL y hash de imagen. Args: results: Lista de resultados Returns: Lista sin duplicados """ seen_urls = set() seen_hashes = set() unique_results = [] for result in results: url = result.get('url', '') # Hash del URL url_hash = hashlib.md5(url.encode()).hexdigest() if url else None # Hash de thumbnail si existe thumb_hash = None if result.get('thumbnail_url'): thumb_hash = hashlib.md5(result['thumbnail_url'].encode()).hexdigest() # Verificar duplicados is_duplicate = False if url and url in seen_urls: is_duplicate = True if url_hash and url_hash in seen_hashes: is_duplicate = True if thumb_hash and thumb_hash in seen_hashes: is_duplicate = True if not is_duplicate: unique_results.append(result) if url: seen_urls.add(url) if url_hash: seen_hashes.add(url_hash) if thumb_hash: seen_hashes.add(thumb_hash) logger.info(f"Deduplicación: {len(results)} → {len(unique_results)} únicos") return unique_results def generate_final_report(self, cross_referenced_results: List[Dict]) -> Dict: """ Genera un reporte final unificado con estadísticas. Returns: Diccionario con reporte completo """ # Estadísticas total_results = len(cross_referenced_results) cross_ref_count = sum(1 for r in cross_referenced_results if r.get('cross_referenced', False)) ocr_verified_count = sum(1 for r in cross_referenced_results if r.get('ocr_verified', False)) # Agrupar por fuente by_source = defaultdict(int) for result in cross_referenced_results: for source in result.get('sources', []): by_source[source] += 1 # Dominios únicos unique_domains = set() for result in cross_referenced_results: domain = result.get('domain') if domain: unique_domains.add(domain) # Resultados de alta confianza (>0.8) high_confidence = [r for r in cross_referenced_results if r.get('confidence', 0) > 0.8] report = { 'summary': { 'total_results': total_results, 'cross_referenced': cross_ref_count, 'ocr_verified': ocr_verified_count, 'unique_domains': len(unique_domains), 'high_confidence_results': len(high_confidence), }, 'by_source': dict(by_source), 'results': cross_referenced_results, 'top_matches': cross_referenced_results[:10], # Top 10 } logger.info(f"Reporte generado: {total_results} resultados, {cross_ref_count} correlacionados") return report # Función de utilidad def quick_cross_reference(yandex_results: List[Dict], bing_results: List[Dict], pimeyes_ocr_domains: List[str]) -> List[Dict]: """ Función de conveniencia para correlacionar rápidamente. Args: yandex_results: Resultados de Yandex bing_results: Resultados de Bing pimeyes_ocr_domains: Dominios extraídos de PimEyes por OCR Returns: Lista de resultados correlacionados """ xref = CrossReferencer() all_results = { 'yandex': yandex_results, 'bing': bing_results, } # Convertir dominios OCR al formato esperado ocr_results = [{'domain': d, 'confidence': 0.8} for d in pimeyes_ocr_domains] return xref.find_cross_references(all_results, ocr_results) if __name__ == "__main__": # Ejemplo de uso xref = CrossReferencer() # Resultados de ejemplo yandex = [ {'url': 'https://example.com/photo1.jpg', 'source': 'yandex'}, {'url': 'https://test.com/image.png', 'source': 'yandex'}, ] bing = [ {'url': 'https://example.com/photo2.jpg', 'source': 'bing'}, {'url': 'https://another.com/pic.jpg', 'source': 'bing'}, ] ocr_domains = ['example.com', 'test.com'] # Cross-reference results = quick_cross_reference(yandex, bing, ocr_domains) print(f"\nResultados correlacionados: {len(results)}") for r in results: print(f" • {r.get('domain')} - Fuentes: {r.get('sources')} - OCR: {r.get('ocr_verified')}")