|
|
""" |
|
|
Stealth Engine - Motor de scraping con anti-detección
|
|
Bypasea las protecciones de sitios como PimEyes, OnlyFans, etc. |
|
|
""" |
|
|
|
|
|
from playwright.async_api import async_playwright, Browser, Page |
|
|
from playwright_stealth import stealth_async |
|
|
from typing import List, Dict, Optional |
|
|
import asyncio |
|
|
import random |
|
|
from loguru import logger |
|
|
from fake_useragent import UserAgent |
|
|
import json |
|
|
|
|
|
|
|
|
class StealthSearch:
    """Reverse-image search driver built on a stealth-configured Playwright browser.

    Each public ``search_*`` coroutine launches its own Chromium instance,
    uploads the given image to one search site, scrapes the result elements
    and closes the browser (and its Playwright driver) before returning.

    NOTE(review): this class automates access to third-party sites and
    processes images of people; confirm that use is permitted by the target
    sites' terms and by applicable privacy law before running it.
    """

    USER_AGENTS = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
    ]

    def __init__(self, headless: bool = True, proxy: Optional[str] = None):
        """Store the browser configuration.

        Args:
            headless: Run the browser without a GUI.
            proxy: Proxy server to use (format: "http://ip:port"), or None.
        """
        self.headless = headless
        self.proxy = proxy
        # Not read by any method of this class; kept so existing attribute
        # access from outside keeps working.
        self.ua_generator = UserAgent()
        # Playwright driver handles keyed by id(browser), so that
        # _close_browser can stop the driver that launched a given browser.
        self._drivers = {}

    async def _create_stealth_browser(self) -> "tuple[Browser, Page]":
        """Launch Chromium and return ``(browser, page)`` with stealth patches applied.

        The Playwright driver started here is remembered internally; release
        both the browser and the driver with ``_close_browser(browser)``.
        If any setup step fails, everything started so far is torn down and
        the exception is re-raised.

        Returns:
            The launched browser and a new page in a fresh context.
        """
        launch_options = {
            'headless': self.headless,
            # NOTE(review): --no-sandbox and --disable-web-security switch off
            # Chromium's process sandbox and same-origin enforcement for every
            # page this browser loads; confirm they are really required.
            'args': [
                '--disable-blink-features=AutomationControlled',
                '--disable-dev-shm-usage',
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-web-security',
                '--disable-features=IsolateOrigins,site-per-process',
            ]
        }
        if self.proxy:
            launch_options['proxy'] = {'server': self.proxy}

        playwright = await async_playwright().start()
        browser = None
        try:
            browser = await playwright.chromium.launch(**launch_options)
            context = await browser.new_context(
                user_agent=random.choice(self.USER_AGENTS),
                viewport={'width': 1920, 'height': 1080},
                locale='en-US',
                timezone_id='America/New_York',
                permissions=['geolocation'],
                geolocation={'latitude': 40.7128, 'longitude': -74.0060},
                color_scheme='light',
                device_scale_factor=1,
            )
            page = await context.new_page()
            await stealth_async(page)
            await self._inject_evasion_scripts(page)
        except BaseException:
            # Setup failed (or was cancelled): do not leave a browser or a
            # driver process behind, then let the caller see the error.
            try:
                if browser is not None:
                    await browser.close()
            finally:
                await playwright.stop()
            raise

        self._drivers[id(browser)] = playwright
        logger.info("Navegador stealth creado exitosamente")
        return browser, page

    async def _close_browser(self, browser: "Browser") -> None:
        """Close *browser* and stop the Playwright driver that launched it."""
        try:
            await browser.close()
        finally:
            playwright = self._drivers.pop(id(browser), None)
            if playwright is not None:
                await playwright.stop()

    async def _inject_evasion_scripts(self, page: "Page"):
        """Register the init scripts that run in *page* before site scripts.

        Five scripts are added, in this order: ``navigator.webdriver``,
        ``navigator.plugins``, ``navigator.languages``, ``window.chrome`` and
        ``navigator.permissions.query``.
        """
        await page.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
        """)

        await page.add_init_script("""
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3, 4, 5]
            });
        """)

        await page.add_init_script("""
            Object.defineProperty(navigator, 'languages', {
                get: () => ['en-US', 'en']
            });
        """)

        await page.add_init_script("""
            window.chrome = {
                runtime: {}
            };
        """)

        await page.add_init_script("""
            const originalQuery = window.navigator.permissions.query;
            window.navigator.permissions.query = (parameters) => (
                parameters.name === 'notifications' ?
                    Promise.resolve({ state: Notification.permission }) :
                    originalQuery(parameters)
            );
        """)

    async def _human_behavior(self, page: "Page"):
        """Scroll to a random offset, pause 0.5-2.0 s, then move the mouse.

        The scroll target is a random value below 500 px and the mouse target
        is a random point with both coordinates in [100, 500].
        """
        await page.evaluate("""
            window.scrollTo({
                top: Math.random() * 500,
                behavior: 'smooth'
            });
        """)

        await asyncio.sleep(random.uniform(0.5, 2.0))

        await page.mouse.move(
            random.randint(100, 500),
            random.randint(100, 500)
        )

    @staticmethod
    async def _scroll_page(page: "Page", steps: int, pixels: int, pause: float) -> None:
        """Scroll *page* down by *pixels*, *steps* times, sleeping *pause* seconds after each."""
        for _ in range(steps):
            await page.evaluate(f'window.scrollBy(0, {pixels})')
            await asyncio.sleep(pause)

    @staticmethod
    async def _click_if_present(page: "Page", selector: str) -> bool:
        """Click the first element matching *selector*; return whether the click happened.

        A missing element or a failing click yields False instead of raising,
        so optional UI steps cannot abort a search. Only ``Exception`` is
        caught, so task cancellation still propagates.
        """
        try:
            element = await page.query_selector(selector)
            if element is None:
                return False
            await element.click()
            return True
        except Exception as e:
            logger.debug(f"Click en '{selector}' falló: {e}")
            return False

    @staticmethod
    def _is_censored(parent_html: str) -> bool:
        """Return True when *parent_html* contains 'blur' or 'premium' (case-insensitive)."""
        lowered = parent_html.lower()
        return 'blur' in lowered or 'premium' in lowered

    @staticmethod
    def _merge_engine_results(engines, outcomes) -> Dict[str, List[Dict]]:
        """Pair engine names with gather() outcomes, replacing failures by ``[]``.

        ``BaseException`` is checked rather than ``Exception`` because
        ``asyncio.gather(..., return_exceptions=True)`` can also hand back
        ``CancelledError`` instances, which do not derive from ``Exception``.
        """
        return {
            engine: [] if isinstance(outcome, BaseException) else outcome
            for engine, outcome in zip(engines, outcomes)
        }

    async def search_pimeyes_free(self, image_path: str) -> List[Dict]:
        """Upload an image to pimeyes.com and collect the result thumbnails.

        Args:
            image_path: Path of the image file to upload.

        Returns:
            One dict per ``.result-item img`` element with the keys
            ``thumbnail_url``, ``index``, ``text_content``, ``screenshot``,
            ``source`` (always ``'pimeyes'``) and ``censored``. Errors raised
            after the browser exists are logged and whatever was collected so
            far is returned; a failure to create the browser propagates.
        """
        logger.info("Iniciando búsqueda stealth en PimEyes")

        browser, page = await self._create_stealth_browser()
        results = []

        try:
            await page.goto('https://pimeyes.com/en', wait_until='networkidle')
            logger.info("Página PimEyes cargada")

            await self._human_behavior(page)

            # The "Accept" button is optional; its absence must not stop the search.
            try:
                await page.click('button:has-text("Accept")', timeout=3000)
            except Exception as e:
                logger.debug(f"Botón 'Accept' no disponible en PimEyes: {e}")

            upload_button = await page.query_selector('input[type="file"]')
            if not upload_button:
                logger.error("No se encontró el botón de upload en PimEyes")
                return results

            await upload_button.set_input_files(image_path)
            logger.info("Imagen subida, esperando resultados...")

            await page.wait_for_selector('.results-container', timeout=30000)
            await self._scroll_page(page, steps=3, pixels=500, pause=1)

            thumbnails = await page.query_selector_all('.result-item img')

            for idx, thumb in enumerate(thumbnails):
                # One broken thumbnail is logged and skipped, not fatal.
                try:
                    thumb_url = await thumb.get_attribute('src')

                    parent = await thumb.evaluate_handle('el => el.closest(".result-item")')
                    parent_html = await parent.inner_html()
                    text_content = await parent.inner_text()

                    screenshot = await thumb.screenshot()

                    results.append({
                        'thumbnail_url': thumb_url,
                        'index': idx,
                        'text_content': text_content,
                        'screenshot': screenshot,
                        'source': 'pimeyes',
                        'censored': self._is_censored(parent_html)
                    })

                    logger.debug(f"Miniatura {idx} extraída")

                except Exception as e:
                    logger.warning(f"Error extrayendo miniatura {idx}: {e}")
                    continue

            logger.success(f"PimEyes: {len(results)} miniaturas extraídas")

        except Exception as e:
            logger.error(f"Error en búsqueda de PimEyes: {e}")

        finally:
            await self._close_browser(browser)

        return results

    async def search_yandex_reverse(self, image_path: str) -> List[Dict]:
        """Upload an image to Yandex Images and collect up to 50 result items.

        Args:
            image_path: Path of the image file to upload.

        Returns:
            One dict per ``.serp-item`` that has a link, with the keys
            ``url``, ``thumbnail_url``, ``domain``, ``source`` (always
            ``'yandex'``) and ``index``. Errors raised after the browser
            exists are logged and the partial list is returned; a failure to
            create the browser propagates.
        """
        logger.info("Iniciando búsqueda stealth en Yandex")

        browser, page = await self._create_stealth_browser()
        results = []

        try:
            await page.goto('https://yandex.com/images/', wait_until='networkidle')

            await self._human_behavior(page)

            if await self._click_if_present(page, '.cbir-panel__button'):
                await asyncio.sleep(1)
            else:
                logger.warning("No se pudo hacer click en botón de cámara")

            file_input = await page.query_selector('input[type="file"]')
            if not file_input:
                logger.error("No se encontró el input de archivo en Yandex")
                return results

            await file_input.set_input_files(image_path)
            logger.info("Imagen subida a Yandex")

            await page.wait_for_selector('.serp-item', timeout=15000)
            await self._scroll_page(page, steps=5, pixels=800, pause=0.5)

            items = await page.query_selector_all('.serp-item')

            for idx, item in enumerate(items[:50]):
                try:
                    link_elem = await item.query_selector('a.serp-item__link')
                    url = await link_elem.get_attribute('href') if link_elem else None

                    img_elem = await item.query_selector('img.serp-item__thumb')
                    thumb_url = await img_elem.get_attribute('src') if img_elem else None

                    domain_elem = await item.query_selector('.serp-item__domain')
                    domain = await domain_elem.inner_text() if domain_elem else None

                    # Items without a link are skipped.
                    if url:
                        results.append({
                            'url': url,
                            'thumbnail_url': thumb_url,
                            'domain': domain,
                            'source': 'yandex',
                            'index': idx
                        })

                except Exception as e:
                    logger.debug(f"Error extrayendo item {idx}: {e}")
                    continue

            logger.success(f"Yandex: {len(results)} resultados extraídos")

        except Exception as e:
            logger.error(f"Error en búsqueda de Yandex: {e}")

        finally:
            await self._close_browser(browser)

        return results

    async def search_bing_reverse(self, image_path: str) -> List[Dict]:
        """Upload an image to Bing Images and collect up to 50 result items.

        Args:
            image_path: Path of the image file to upload.

        Returns:
            One dict per ``.imgpt`` element that has a link, with the keys
            ``url``, ``thumbnail_url``, ``source`` (always ``'bing'``) and
            ``index``. Errors raised after the browser exists are logged and
            the partial list is returned; a failure to create the browser
            propagates.
        """
        logger.info("Iniciando búsqueda stealth en Bing")

        browser, page = await self._create_stealth_browser()
        results = []

        try:
            await page.goto('https://www.bing.com/images', wait_until='networkidle')

            await self._human_behavior(page)

            if await self._click_if_present(page, '.cameraIcon'):
                await asyncio.sleep(1)
            else:
                logger.warning("No se encontró icono de cámara en Bing")

            file_input = await page.query_selector('input[type="file"]')
            if not file_input:
                logger.error("No se encontró el input de archivo en Bing")
                return results

            await file_input.set_input_files(image_path)

            await page.wait_for_selector('.imgpt', timeout=15000)
            await self._scroll_page(page, steps=3, pixels=1000, pause=1)

            items = await page.query_selector_all('.imgpt')

            for idx, item in enumerate(items[:50]):
                try:
                    link_elem = await item.query_selector('a')
                    url = await link_elem.get_attribute('href') if link_elem else None

                    img_elem = await item.query_selector('img')
                    thumb_url = await img_elem.get_attribute('src') if img_elem else None

                    # Items without a link are skipped.
                    if url:
                        results.append({
                            'url': url,
                            'thumbnail_url': thumb_url,
                            'source': 'bing',
                            'index': idx
                        })

                except Exception as e:
                    logger.debug(f"Error: {e}")
                    continue

            logger.success(f"Bing: {len(results)} resultados")

        except Exception as e:
            logger.error(f"Error en Bing: {e}")

        finally:
            await self._close_browser(browser)

        return results

    async def search_all_engines(self, image_path: str) -> Dict[str, List[Dict]]:
        """Run the PimEyes, Yandex and Bing searches concurrently.

        Args:
            image_path: Path of the image file to upload.

        Returns:
            Dict with the keys ``'pimeyes'``, ``'yandex'`` and ``'bing'``,
            each mapping to that engine's result list. An engine whose
            coroutine raised (or was cancelled) is logged and mapped to ``[]``.
        """
        logger.info("Iniciando búsqueda multi-motor")

        searches = {
            'pimeyes': self.search_pimeyes_free,
            'yandex': self.search_yandex_reverse,
            'bing': self.search_bing_reverse,
        }

        outcomes = await asyncio.gather(
            *(search(image_path) for search in searches.values()),
            return_exceptions=True,
        )

        for engine, outcome in zip(searches, outcomes):
            if isinstance(outcome, BaseException):
                logger.error(f"Error en motor {engine}: {outcome!r}")

        all_results = self._merge_engine_results(tuple(searches), outcomes)

        total = sum(len(v) for v in all_results.values())
        logger.success(f"Total de resultados: {total}")

        return all_results
|
|
|
|
|
|
|
|
async def test_stealth():
    """Manual smoke test: run the PimEyes and Yandex searches on a random image.

    A 200x200 RGB noise image is written into a temporary directory, passed
    to both searches in turn, and the number of results from each is
    printed. The directory and image are removed when the function returns.
    """
    import os
    import tempfile

    import numpy as np
    from PIL import Image

    stealth = StealthSearch(headless=True)

    # A private temporary directory replaces the fixed '/tmp/test.jpg' path:
    # it exists on every platform, is not predictable, and is cleaned up.
    with tempfile.TemporaryDirectory() as tmp_dir:
        image_path = os.path.join(tmp_dir, 'test.jpg')

        # randint's upper bound is exclusive, so 256 covers the full uint8 range.
        test_img = np.random.randint(0, 256, (200, 200, 3), dtype=np.uint8)
        Image.fromarray(test_img).save(image_path)

        results = await stealth.search_pimeyes_free(image_path)
        print(f"PimEyes: {len(results)} resultados")

        results = await stealth.search_yandex_reverse(image_path)
        print(f"Yandex: {len(results)} resultados")
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: drive the smoke-test coroutine to completion.
    smoke_test = test_stealth()
    asyncio.run(smoke_test)
|
|
|