| | """ |
| | Cross-Referencer - Correlación inteligente de resultados entre múltiples motores |
| | Este módulo es la clave para unir hallazgos de Yandex, Bing y PimEyes. |
| | """ |
| |
|
import hashlib
import re
from collections import defaultdict
from difflib import SequenceMatcher
from typing import Dict, List, Optional, Set, Tuple
from urllib.parse import parse_qs, urlparse

from loguru import logger
| |
|
| |
|
class CrossReferencer:
    """
    Correlation system that links results coming from multiple sources.

    If Yandex finds a photo and the PimEyes OCR detects the same domain,
    this module links the two automatically.
    """

    def __init__(self, domain_similarity_threshold: float = 0.85):
        """
        Args:
            domain_similarity_threshold: Similarity threshold above which two
                domains are considered equal (0.0-1.0).
        """
        # Threshold consulted when comparing OCR domains against search hits.
        self.domain_threshold = domain_similarity_threshold
        # Memoization for normalize_domain: raw input -> normalized domain.
        self.domain_cache = {}
| | def normalize_domain(self, url_or_domain: str) -> str: |
| | """ |
| | Normaliza un dominio o URL para comparación. |
| | |
| | Args: |
| | url_or_domain: URL completa o dominio |
| | |
| | Returns: |
| | Dominio normalizado |
| | """ |
| | |
| | if url_or_domain in self.domain_cache: |
| | return self.domain_cache[url_or_domain] |
| | |
| | |
| | cleaned = url_or_domain.lower().strip() |
| | |
| | |
| | if cleaned.startswith(('http://', 'https://')): |
| | parsed = urlparse(cleaned) |
| | domain = parsed.netloc |
| | else: |
| | domain = cleaned |
| | |
| | |
| | domain = re.sub(r'^www\.', '', domain) |
| | |
| | |
| | domain = re.sub(r':\d+$', '', domain) |
| | |
| | |
| | domain = re.sub(r'^(m\.|mobile\.|static\.|cdn\.)', '', domain) |
| | |
| | |
| | self.domain_cache[url_or_domain] = domain |
| | |
| | return domain |
| | |
| | def extract_domain_from_url(self, url: str) -> str: |
| | """ |
| | Extrae el dominio principal de una URL. |
| | """ |
| | try: |
| | parsed = urlparse(url) |
| | domain = parsed.netloc |
| | |
| | |
| | domain = re.sub(r'^www\.', '', domain) |
| | |
| | |
| | parts = domain.split('.') |
| | if len(parts) >= 2: |
| | return '.'.join(parts[-2:]) |
| | |
| | return domain |
| | |
| | except Exception as e: |
| | logger.debug(f"Error extrayendo dominio de {url}: {e}") |
| | return "" |
| | |
| | def calculate_domain_similarity(self, domain1: str, domain2: str) -> float: |
| | """ |
| | Calcula la similitud entre dos dominios. |
| | |
| | Returns: |
| | Similitud entre 0.0 y 1.0 |
| | """ |
| | |
| | d1 = self.normalize_domain(domain1) |
| | d2 = self.normalize_domain(domain2) |
| | |
| | |
| | if d1 == d2: |
| | return 1.0 |
| | |
| | |
| | similarity = SequenceMatcher(None, d1, d2).ratio() |
| | |
| | return similarity |
| | |
| | def find_cross_references(self, all_results: Dict[str, List[Dict]], |
| | ocr_results: Dict = None) -> List[Dict]: |
| | """ |
| | Encuentra correlaciones entre resultados de diferentes motores. |
| | |
| | Args: |
| | all_results: Diccionario con resultados por motor {'yandex': [...], 'bing': [...], ...} |
| | ocr_results: Resultados de OCR de miniaturas censuradas |
| | |
| | Returns: |
| | Lista de resultados correlacionados y enriquecidos |
| | """ |
| | logger.info("Iniciando cross-referencing de resultados") |
| | |
| | |
| | domain_index = defaultdict(list) |
| | |
| | |
| | for source, results in all_results.items(): |
| | for idx, result in enumerate(results): |
| | |
| | if 'url' in result: |
| | domain = self.extract_domain_from_url(result['url']) |
| | elif 'domain' in result: |
| | domain = self.normalize_domain(result['domain']) |
| | else: |
| | continue |
| | |
| | |
| | result['_original_source'] = source |
| | result['_original_index'] = idx |
| | domain_index[domain].append(result) |
| | |
| | |
| | if ocr_results: |
| | for ocr_item in ocr_results: |
| | domain = self.normalize_domain(ocr_item.get('domain', '')) |
| | ocr_item['_is_ocr'] = True |
| | domain_index[domain].append(ocr_item) |
| | |
| | |
| | cross_referenced_results = [] |
| | processed_domains = set() |
| | |
| | for domain, items in domain_index.items(): |
| | if domain in processed_domains or not domain: |
| | continue |
| | |
| | |
| | sources = set(item.get('_original_source') for item in items if '_original_source' in item) |
| | has_ocr = any(item.get('_is_ocr', False) for item in items) |
| | |
| | if len(sources) > 1 or has_ocr: |
| | |
| | correlation = self._create_correlation(domain, items, sources) |
| | cross_referenced_results.append(correlation) |
| | |
| | logger.info(f"Correlación encontrada: {domain} en {sources}") |
| | |
| | processed_domains.add(domain) |
| | |
| | |
| | for source, results in all_results.items(): |
| | for result in results: |
| | domain = self.extract_domain_from_url(result.get('url', '')) |
| | if domain not in processed_domains: |
| | result['cross_referenced'] = False |
| | result['sources'] = [source] |
| | cross_referenced_results.append(result) |
| | |
| | |
| | cross_referenced_results.sort( |
| | key=lambda x: ( |
| | len(x.get('sources', [])), |
| | x.get('ocr_verified', False), |
| | x.get('confidence', 0) |
| | ), |
| | reverse=True |
| | ) |
| | |
| | logger.success(f"Cross-referencing completado: {len(cross_referenced_results)} resultados procesados") |
| | |
| | return cross_referenced_results |
| | |
| | def _create_correlation(self, domain: str, items: List[Dict], sources: Set[str]) -> Dict: |
| | """ |
| | Crea un resultado correlacionado unificado. |
| | """ |
| | |
| | ocr_items = [i for i in items if i.get('_is_ocr', False)] |
| | search_items = [i for i in items if not i.get('_is_ocr', False)] |
| | |
| | |
| | primary_result = None |
| | for source in ['yandex', 'bing', 'google', 'pimeyes']: |
| | candidates = [i for i in search_items if i.get('_original_source') == source] |
| | if candidates: |
| | primary_result = candidates[0] |
| | break |
| | |
| | if not primary_result and search_items: |
| | primary_result = search_items[0] |
| | |
| | |
| | correlation = { |
| | 'domain': domain, |
| | 'cross_referenced': True, |
| | 'sources': list(sources), |
| | 'ocr_verified': len(ocr_items) > 0, |
| | 'confidence': self._calculate_correlation_confidence(sources, ocr_items), |
| | } |
| | |
| | |
| | if primary_result: |
| | correlation.update({ |
| | 'url': primary_result.get('url'), |
| | 'thumbnail_url': primary_result.get('thumbnail_url'), |
| | 'primary_source': primary_result.get('_original_source'), |
| | }) |
| | |
| | |
| | if ocr_items: |
| | correlation['ocr_data'] = { |
| | 'extracted_domains': [i.get('domain') for i in ocr_items], |
| | 'avg_confidence': sum(i.get('confidence', 0) for i in ocr_items) / len(ocr_items), |
| | 'extraction_methods': [i.get('method', 'unknown') for i in ocr_items], |
| | } |
| | |
| | |
| | all_urls = [i.get('url') for i in search_items if i.get('url')] |
| | if all_urls: |
| | correlation['alternative_urls'] = list(set(all_urls)) |
| | |
| | return correlation |
| | |
| | def _calculate_correlation_confidence(self, sources: Set[str], ocr_items: List[Dict]) -> float: |
| | """ |
| | Calcula la confianza de una correlación basada en número de fuentes y OCR. |
| | |
| | Returns: |
| | Confianza entre 0.0 y 1.0 |
| | """ |
| | base_confidence = 0.5 |
| | |
| | |
| | source_bonus = min(len(sources) * 0.15, 0.45) |
| | |
| | |
| | ocr_bonus = 0.0 |
| | if ocr_items: |
| | avg_ocr_confidence = sum(i.get('confidence', 0) for i in ocr_items) / len(ocr_items) |
| | ocr_bonus = avg_ocr_confidence * 0.2 |
| | |
| | total_confidence = min(base_confidence + source_bonus + ocr_bonus, 1.0) |
| | |
| | return round(total_confidence, 3) |
| | |
| | def match_pimeyes_with_search(self, pimeyes_results: List[Dict], |
| | search_results: List[Dict], |
| | ocr_domains: List[str]) -> List[Dict]: |
| | """ |
| | Método especializado para correlacionar PimEyes (censurado) con búsquedas abiertas. |
| | |
| | Este es el "truco" principal: si PimEyes tiene una miniatura censurada pero el OCR |
| | detecta "ejemplo.com", y Yandex encuentra "ejemplo.com/foto.jpg", los unimos. |
| | |
| | Args: |
| | pimeyes_results: Resultados de PimEyes (censurados) |
| | search_results: Resultados de Yandex/Bing (abiertos) |
| | ocr_domains: Dominios extraídos por OCR de miniaturas de PimEyes |
| | |
| | Returns: |
| | Lista de matches con URLs desbloquedas |
| | """ |
| | logger.info("Matching PimEyes censurado con búsquedas abiertas") |
| | |
| | matches = [] |
| | |
| | for ocr_domain in ocr_domains: |
| | normalized_ocr = self.normalize_domain(ocr_domain) |
| | |
| | |
| | for search_result in search_results: |
| | search_domain = self.extract_domain_from_url(search_result.get('url', '')) |
| | |
| | |
| | if self.calculate_domain_similarity(normalized_ocr, search_domain) >= self.domain_threshold: |
| | match = { |
| | 'pimeyes_domain_ocr': ocr_domain, |
| | 'matched_url': search_result.get('url'), |
| | 'thumbnail_url': search_result.get('thumbnail_url'), |
| | 'source': search_result.get('source', 'unknown'), |
| | 'match_confidence': self.calculate_domain_similarity(normalized_ocr, search_domain), |
| | 'unlocked': True, |
| | } |
| | |
| | matches.append(match) |
| | logger.success(f"✓ PimEyes censurado desbloqueado: {ocr_domain} → {search_result['url']}") |
| | |
| | return matches |
| | |
| | def deduplicate_results(self, results: List[Dict]) -> List[Dict]: |
| | """ |
| | Elimina resultados duplicados basándose en URL y hash de imagen. |
| | |
| | Args: |
| | results: Lista de resultados |
| | |
| | Returns: |
| | Lista sin duplicados |
| | """ |
| | seen_urls = set() |
| | seen_hashes = set() |
| | unique_results = [] |
| | |
| | for result in results: |
| | url = result.get('url', '') |
| | |
| | |
| | url_hash = hashlib.md5(url.encode()).hexdigest() if url else None |
| | |
| | |
| | thumb_hash = None |
| | if result.get('thumbnail_url'): |
| | thumb_hash = hashlib.md5(result['thumbnail_url'].encode()).hexdigest() |
| | |
| | |
| | is_duplicate = False |
| | |
| | if url and url in seen_urls: |
| | is_duplicate = True |
| | |
| | if url_hash and url_hash in seen_hashes: |
| | is_duplicate = True |
| | |
| | if thumb_hash and thumb_hash in seen_hashes: |
| | is_duplicate = True |
| | |
| | if not is_duplicate: |
| | unique_results.append(result) |
| | |
| | if url: |
| | seen_urls.add(url) |
| | if url_hash: |
| | seen_hashes.add(url_hash) |
| | if thumb_hash: |
| | seen_hashes.add(thumb_hash) |
| | |
| | logger.info(f"Deduplicación: {len(results)} → {len(unique_results)} únicos") |
| | |
| | return unique_results |
| | |
| | def generate_final_report(self, cross_referenced_results: List[Dict]) -> Dict: |
| | """ |
| | Genera un reporte final unificado con estadísticas. |
| | |
| | Returns: |
| | Diccionario con reporte completo |
| | """ |
| | |
| | total_results = len(cross_referenced_results) |
| | cross_ref_count = sum(1 for r in cross_referenced_results if r.get('cross_referenced', False)) |
| | ocr_verified_count = sum(1 for r in cross_referenced_results if r.get('ocr_verified', False)) |
| | |
| | |
| | by_source = defaultdict(int) |
| | for result in cross_referenced_results: |
| | for source in result.get('sources', []): |
| | by_source[source] += 1 |
| | |
| | |
| | unique_domains = set() |
| | for result in cross_referenced_results: |
| | domain = result.get('domain') |
| | if domain: |
| | unique_domains.add(domain) |
| | |
| | |
| | high_confidence = [r for r in cross_referenced_results if r.get('confidence', 0) > 0.8] |
| | |
| | report = { |
| | 'summary': { |
| | 'total_results': total_results, |
| | 'cross_referenced': cross_ref_count, |
| | 'ocr_verified': ocr_verified_count, |
| | 'unique_domains': len(unique_domains), |
| | 'high_confidence_results': len(high_confidence), |
| | }, |
| | 'by_source': dict(by_source), |
| | 'results': cross_referenced_results, |
| | 'top_matches': cross_referenced_results[:10], |
| | } |
| | |
| | logger.info(f"Reporte generado: {total_results} resultados, {cross_ref_count} correlacionados") |
| | |
| | return report |
| |
|
| |
|
| | |
def quick_cross_reference(yandex_results: List[Dict],
                          bing_results: List[Dict],
                          pimeyes_ocr_domains: List[str]) -> List[Dict]:
    """
    Convenience helper to correlate results in a single call.

    Args:
        yandex_results: Yandex results.
        bing_results: Bing results.
        pimeyes_ocr_domains: Domains extracted from PimEyes by OCR.

    Returns:
        List of correlated results.
    """
    referencer = CrossReferencer()

    # Wrap the bare OCR domains in the dict shape find_cross_references
    # expects, with a fixed 0.8 confidence.
    ocr_entries = [{'domain': name, 'confidence': 0.8} for name in pimeyes_ocr_domains]

    return referencer.find_cross_references(
        {'yandex': yandex_results, 'bing': bing_results},
        ocr_entries,
    )
|
| |
|
| | if __name__ == "__main__": |
| | |
| | xref = CrossReferencer() |
| | |
| | |
| | yandex = [ |
| | {'url': 'https://example.com/photo1.jpg', 'source': 'yandex'}, |
| | {'url': 'https://test.com/image.png', 'source': 'yandex'}, |
| | ] |
| | |
| | bing = [ |
| | {'url': 'https://example.com/photo2.jpg', 'source': 'bing'}, |
| | {'url': 'https://another.com/pic.jpg', 'source': 'bing'}, |
| | ] |
| | |
| | ocr_domains = ['example.com', 'test.com'] |
| | |
| | |
| | results = quick_cross_reference(yandex, bing, ocr_domains) |
| | |
| | print(f"\nResultados correlacionados: {len(results)}") |
| | for r in results: |
| | print(f" • {r.get('domain')} - Fuentes: {r.get('sources')} - OCR: {r.get('ocr_verified')}") |
| |
|