"""
Stealth Engine - scraping engine with anti-detection.

Bypasses the protections of sites such as PimEyes, OnlyFans, etc.

NOTE(review): automated access of this kind may violate the target sites'
terms of service and applicable privacy law; confirm authorization before
running it against any third-party service.
"""

import asyncio
import json
import os
import random
import tempfile
from typing import Dict, List, Optional

from fake_useragent import UserAgent
from loguru import logger
from playwright.async_api import Browser, Page, Playwright, async_playwright
from playwright_stealth import stealth_async


class StealthSearch:
    """
    Reverse-image search engine driver with bot-detection evasion.

    Each public ``search_*`` coroutine launches its own Chromium instance,
    uploads the image, scrapes the result list and always tears the browser
    (and its Playwright driver) down again before returning.
    """

    # Pool of user agents; one is picked at random per browser context.
    USER_AGENTS = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
    ]

    def __init__(self, headless: bool = True, proxy: Optional[str] = None):
        """
        Initialise the stealth search engine.

        Args:
            headless: Run the browser without a GUI.
            proxy: Proxy to use (format: "http://ip:port").
        """
        self.headless = headless
        self.proxy = proxy
        # Kept for backward compatibility; nothing in this module reads it.
        self.ua_generator = UserAgent()
        # Playwright driver per launched browser, keyed by id(browser), so
        # _close_browser can stop the driver that _create_stealth_browser
        # started without changing that method's (browser, page) return.
        self._drivers: Dict[int, Playwright] = {}

    async def _create_stealth_browser(self) -> tuple[Browser, Page]:
        """
        Launch Chromium and return a page with the evasion layers applied.

        The caller must release the browser with ``_close_browser`` so that
        the Playwright driver started here is stopped as well.

        Returns:
            The launched browser and a new page inside a fresh context.

        Raises:
            Exception: Whatever Playwright raises while launching or
                configuring the browser; partially created resources are
                released before the exception propagates.
        """
        playwright = await async_playwright().start()
        browser = None
        try:
            # NOTE(review): --no-sandbox / --disable-web-security weaken the
            # browser's isolation while it visits third-party sites; confirm
            # they are really required.
            launch_options = {
                'headless': self.headless,
                'args': [
                    '--disable-blink-features=AutomationControlled',
                    '--disable-dev-shm-usage',
                    '--no-sandbox',
                    '--disable-setuid-sandbox',
                    '--disable-web-security',
                    '--disable-features=IsolateOrigins,site-per-process',
                ]
            }

            if self.proxy:
                launch_options['proxy'] = {'server': self.proxy}

            browser = await playwright.chromium.launch(**launch_options)

            # Context with a fixed desktop profile (New York locale/timezone).
            context = await browser.new_context(
                user_agent=random.choice(self.USER_AGENTS),
                viewport={'width': 1920, 'height': 1080},
                locale='en-US',
                timezone_id='America/New_York',
                permissions=['geolocation'],
                geolocation={'latitude': 40.7128, 'longitude': -74.0060},  # NYC
                color_scheme='light',
                device_scale_factor=1,
            )

            page = await context.new_page()

            # playwright-stealth first, then this module's own init scripts.
            await stealth_async(page)
            await self._inject_evasion_scripts(page)
        except BaseException:
            # Do not leak the browser/driver when setup fails or is cancelled.
            try:
                if browser is not None:
                    await browser.close()
            finally:
                await playwright.stop()
            raise

        self._drivers[id(browser)] = playwright
        logger.info("Navegador stealth creado exitosamente")
        return browser, page

    async def _close_browser(self, browser: Browser) -> None:
        """
        Close ``browser`` and stop the Playwright driver that launched it.

        Args:
            browser: A browser returned by ``_create_stealth_browser``.
        """
        driver = self._drivers.pop(id(browser), None)
        try:
            await browser.close()
        finally:
            if driver is not None:
                await driver.stop()

    async def _inject_evasion_scripts(self, page: Page):
        """
        Register init scripts that override automation-revealing properties.

        Args:
            page: Page on which the scripts are registered; they run on every
                subsequent navigation of that page.
        """
        # Override navigator.webdriver
        await page.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
        """)

        # Override navigator.plugins
        await page.add_init_script("""
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3, 4, 5]
            });
        """)

        # Override navigator.languages
        await page.add_init_script("""
            Object.defineProperty(navigator, 'languages', {
                get: () => ['en-US', 'en']
            });
        """)

        # Chrome runtime mock
        await page.add_init_script("""
            window.chrome = {
                runtime: {}
            };
        """)

        # Permissions mock
        await page.add_init_script("""
            const originalQuery = window.navigator.permissions.query;
            window.navigator.permissions.query = (parameters) => (
                parameters.name === 'notifications' ?
                    Promise.resolve({ state: Notification.permission }) :
                    originalQuery(parameters)
            );
        """)

    async def _human_behavior(self, page: Page):
        """
        Perform a random scroll, a random pause and a random mouse move.

        Args:
            page: Page to interact with.
        """
        # Random smooth scroll within the first 500 px.
        await page.evaluate("""
            window.scrollTo({
                top: Math.random() * 500,
                behavior: 'smooth'
            });
        """)

        # Random pause between 0.5 and 2 seconds.
        await asyncio.sleep(random.uniform(0.5, 2.0))

        # Random mouse position inside the (100..500, 100..500) square.
        await page.mouse.move(
            random.randint(100, 500),
            random.randint(100, 500)
        )

    @staticmethod
    async def _scroll_page(page: Page, steps: int, delta: int, pause: float) -> None:
        """Scroll down ``steps`` times by ``delta`` px, sleeping ``pause`` s after each."""
        for _ in range(steps):
            await page.evaluate(f'window.scrollBy(0, {delta})')
            await asyncio.sleep(pause)

    @staticmethod
    async def _click_if_present(page: Page, selector: str) -> bool:
        """Click the first element matching ``selector``; return whether it was clicked."""
        try:
            element = await page.query_selector(selector)
            if element is None:
                return False
            await element.click()
        except Exception as e:
            # Best-effort click: cancellation still propagates because
            # CancelledError is not an Exception subclass.
            logger.debug(f"Click en {selector} falló: {e}")
            return False
        return True

    async def search_pimeyes_free(self, image_path: str) -> List[Dict]:
        """
        Search PimEyes without paying, extracting the censored thumbnails.

        Args:
            image_path: Path of the image to search for.

        Returns:
            One dict per thumbnail with keys ``thumbnail_url``, ``index``,
            ``text_content``, ``screenshot``, ``source`` and ``censored``.
            Errors after the browser is up are logged and yield whatever was
            collected so far (possibly an empty list).
        """
        logger.info("Iniciando búsqueda stealth en PimEyes")

        browser, page = await self._create_stealth_browser()
        results: List[Dict] = []

        try:
            await page.goto('https://pimeyes.com/en', wait_until='networkidle')
            logger.info("Página PimEyes cargada")

            await self._human_behavior(page)

            # Accept the cookie banner if one shows up; absence is not an error.
            try:
                await page.click('button:has-text("Accept")', timeout=3000)
            except Exception as e:
                logger.debug(f"Banner de cookies no aceptado: {e}")

            upload_button = await page.query_selector('input[type="file"]')
            if upload_button is None:
                logger.error("No se encontró el botón de upload en PimEyes")
                return results

            await upload_button.set_input_files(image_path)
            logger.info("Imagen subida, esperando resultados...")

            await page.wait_for_selector('.results-container', timeout=30000)

            # Scroll so that more images get loaded.
            await self._scroll_page(page, steps=3, delta=500, pause=1)

            thumbnails = await page.query_selector_all('.result-item img')

            for idx, thumb in enumerate(thumbnails):
                try:
                    thumb_url = await thumb.get_attribute('src')

                    # Enclosing result card, used for metadata.
                    parent = await thumb.evaluate_handle('el => el.closest(".result-item")')
                    parent_html = await parent.inner_html()

                    # Visible text (may contain the domain).
                    text_content = await parent.inner_text()

                    # Screenshot of this single thumbnail.
                    screenshot = await thumb.screenshot()

                    markup = parent_html.lower()
                    results.append({
                        'thumbnail_url': thumb_url,
                        'index': idx,
                        'text_content': text_content,
                        'screenshot': screenshot,
                        'source': 'pimeyes',
                        'censored': 'blur' in markup or 'premium' in markup
                    })

                    logger.debug(f"Miniatura {idx} extraída")

                except Exception as e:
                    # One bad thumbnail must not abort the whole extraction.
                    logger.warning(f"Error extrayendo miniatura {idx}: {e}")

            logger.success(f"PimEyes: {len(results)} miniaturas extraídas")

        except Exception as e:
            logger.error(f"Error en búsqueda de PimEyes: {e}")

        finally:
            await self._close_browser(browser)

        return results

    async def search_yandex_reverse(self, image_path: str) -> List[Dict]:
        """
        Reverse image search on Yandex Images.

        Args:
            image_path: Path of the image.

        Returns:
            Up to 50 dicts with keys ``url``, ``thumbnail_url``, ``domain``,
            ``source`` and ``index``; items without a link are skipped.
        """
        logger.info("Iniciando búsqueda stealth en Yandex")

        browser, page = await self._create_stealth_browser()
        results: List[Dict] = []

        try:
            await page.goto('https://yandex.com/images/', wait_until='networkidle')

            await self._human_behavior(page)

            # Open the search-by-image panel.
            if await self._click_if_present(page, '.cbir-panel__button'):
                await asyncio.sleep(1)
            else:
                logger.warning("No se pudo hacer click en botón de cámara")

            file_input = await page.query_selector('input[type="file"]')
            if file_input is None:
                logger.error("No se encontró el input de archivo en Yandex")
                return results

            await file_input.set_input_files(image_path)
            logger.info("Imagen subida a Yandex")

            await page.wait_for_selector('.serp-item', timeout=15000)

            # Scroll to load more results.
            await self._scroll_page(page, steps=5, delta=800, pause=0.5)

            items = await page.query_selector_all('.serp-item')

            for idx, item in enumerate(items[:50]):
                try:
                    link_elem = await item.query_selector('a.serp-item__link')
                    url = await link_elem.get_attribute('href') if link_elem else None

                    img_elem = await item.query_selector('img.serp-item__thumb')
                    thumb_url = await img_elem.get_attribute('src') if img_elem else None

                    domain_elem = await item.query_selector('.serp-item__domain')
                    domain = await domain_elem.inner_text() if domain_elem else None

                    if url:
                        results.append({
                            'url': url,
                            'thumbnail_url': thumb_url,
                            'domain': domain,
                            'source': 'yandex',
                            'index': idx
                        })

                except Exception as e:
                    logger.debug(f"Error extrayendo item {idx}: {e}")

            logger.success(f"Yandex: {len(results)} resultados extraídos")

        except Exception as e:
            logger.error(f"Error en búsqueda de Yandex: {e}")

        finally:
            await self._close_browser(browser)

        return results

    async def search_bing_reverse(self, image_path: str) -> List[Dict]:
        """
        Reverse image search on Bing Images.

        Args:
            image_path: Path of the image.

        Returns:
            Up to 50 dicts with keys ``url``, ``thumbnail_url``, ``source``
            and ``index``; items without a link are skipped.
        """
        logger.info("Iniciando búsqueda stealth en Bing")

        browser, page = await self._create_stealth_browser()
        results: List[Dict] = []

        try:
            await page.goto('https://www.bing.com/images', wait_until='networkidle')

            await self._human_behavior(page)

            # Open the search-by-image panel.
            if await self._click_if_present(page, '.cameraIcon'):
                await asyncio.sleep(1)
            else:
                logger.warning("No se encontró icono de cámara en Bing")

            file_input = await page.query_selector('input[type="file"]')
            if file_input is None:
                logger.error("No se encontró el input de archivo en Bing")
                return results

            await file_input.set_input_files(image_path)

            await page.wait_for_selector('.imgpt', timeout=15000)

            await self._scroll_page(page, steps=3, delta=1000, pause=1)

            items = await page.query_selector_all('.imgpt')

            for idx, item in enumerate(items[:50]):
                try:
                    link_elem = await item.query_selector('a')
                    url = await link_elem.get_attribute('href') if link_elem else None

                    img_elem = await item.query_selector('img')
                    thumb_url = await img_elem.get_attribute('src') if img_elem else None

                    if url:
                        results.append({
                            'url': url,
                            'thumbnail_url': thumb_url,
                            'source': 'bing',
                            'index': idx
                        })

                except Exception as e:
                    logger.debug(f"Error: {e}")

            logger.success(f"Bing: {len(results)} resultados")

        except Exception as e:
            logger.error(f"Error en Bing: {e}")

        finally:
            await self._close_browser(browser)

        return results

    async def search_all_engines(self, image_path: str) -> Dict[str, List[Dict]]:
        """
        Run the PimEyes, Yandex and Bing searches concurrently.

        Args:
            image_path: Path of the image.

        Returns:
            Dict with keys ``pimeyes``, ``yandex`` and ``bing``; an engine
            whose coroutine raised contributes an empty list.
        """
        logger.info("Iniciando búsqueda multi-motor")

        engines = {
            'pimeyes': self.search_pimeyes_free,
            'yandex': self.search_yandex_reverse,
            'bing': self.search_bing_reverse,
        }

        outcomes = await asyncio.gather(
            *(search(image_path) for search in engines.values()),
            return_exceptions=True,
        )

        all_results: Dict[str, List[Dict]] = {}
        for name, outcome in zip(engines, outcomes):
            # BaseException, not Exception: gather may hand back a
            # CancelledError, which would otherwise break len() below.
            if isinstance(outcome, BaseException):
                logger.error(f"Error en motor {name}: {outcome}")
                all_results[name] = []
            else:
                all_results[name] = outcome

        total = sum(len(v) for v in all_results.values())
        logger.success(f"Total de resultados: {total}")

        return all_results


async def test_stealth():
    """
    Manual smoke test: search PimEyes and Yandex with a random-noise image.
    """
    import numpy as np
    from PIL import Image

    stealth = StealthSearch(headless=True)

    # Temporary directory instead of a hard-coded /tmp path: portable and
    # removed automatically once the searches are done.
    with tempfile.TemporaryDirectory() as tmp_dir:
        image_path = os.path.join(tmp_dir, 'test.jpg')

        # Upper bound is exclusive, so 256 covers the full uint8 range.
        test_img = np.random.randint(0, 256, (200, 200, 3), dtype=np.uint8)
        Image.fromarray(test_img).save(image_path)

        results = await stealth.search_pimeyes_free(image_path)
        print(f"PimEyes: {len(results)} resultados")

        results = await stealth.search_yandex_reverse(image_path)
        print(f"Yandex: {len(results)} resultados")


if __name__ == "__main__":
    asyncio.run(test_stealth())