J
File size: 16,767 Bytes
85fa7d2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
"""
Stealth Engine - Motor de scraping con anti-detección
Bypasea las protecciones de sitios como PimEyes, OnlyFans, etc.
"""

from playwright.async_api import async_playwright, Browser, Page
from playwright_stealth import stealth_async
from typing import List, Dict, Optional
import asyncio
import random
from loguru import logger
from fake_useragent import UserAgent
import json


class StealthSearch:
    """
    Motor de búsqueda con capacidades de evasión anti-bot.
    Implementa técnicas avanzadas para parecer un usuario real.
    """
    
    # User agents rotativos
    USER_AGENTS = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15',
    ]
    
    def __init__(self, headless: bool = True, proxy: Optional[str] = None):
        """
        Inicializa el motor de búsqueda stealth.
        
        Args:
            headless: Ejecutar navegador sin GUI
            proxy: Proxy a usar (formato: "http://ip:port")
        """
        self.headless = headless
        self.proxy = proxy
        self.ua_generator = UserAgent()
        
    async def _create_stealth_browser(self) -> tuple[Browser, Page]:
        """
        Crea un navegador con todas las protecciones anti-detección activadas.
        """
        playwright = await async_playwright().start()
        
        # Configuración del navegador
        launch_options = {
            'headless': self.headless,
            'args': [
                '--disable-blink-features=AutomationControlled',
                '--disable-dev-shm-usage',
                '--no-sandbox',
                '--disable-setuid-sandbox',
                '--disable-web-security',
                '--disable-features=IsolateOrigins,site-per-process',
            ]
        }
        
        if self.proxy:
            launch_options['proxy'] = {'server': self.proxy}
        
        browser = await playwright.chromium.launch(**launch_options)
        
        # Crear contexto con fingerprint realista
        context = await browser.new_context(
            user_agent=random.choice(self.USER_AGENTS),
            viewport={'width': 1920, 'height': 1080},
            locale='en-US',
            timezone_id='America/New_York',
            permissions=['geolocation'],
            geolocation={'latitude': 40.7128, 'longitude': -74.0060},  # NYC
            color_scheme='light',
            device_scale_factor=1,
        )
        
        # Crear página
        page = await context.new_page()
        
        # Aplicar playwright-stealth
        await stealth_async(page)
        
        # Inyectar scripts adicionales de evasión
        await self._inject_evasion_scripts(page)
        
        logger.info("Navegador stealth creado exitosamente")
        
        return browser, page
    
    async def _inject_evasion_scripts(self, page: Page):
        """
        Inyecta scripts JavaScript para evadir detección adicional.
        """
        # Sobrescribir navigator.webdriver
        await page.add_init_script("""
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
        """)
        
        # Sobrescribir navigator.plugins
        await page.add_init_script("""
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3, 4, 5]
            });
        """)
        
        # Sobrescribir navigator.languages
        await page.add_init_script("""
            Object.defineProperty(navigator, 'languages', {
                get: () => ['en-US', 'en']
            });
        """)
        
        # Chrome runtime mock
        await page.add_init_script("""
            window.chrome = {
                runtime: {}
            };
        """)
        
        # Permissions mock
        await page.add_init_script("""
            const originalQuery = window.navigator.permissions.query;
            window.navigator.permissions.query = (parameters) => (
                parameters.name === 'notifications' ?
                    Promise.resolve({ state: Notification.permission }) :
                    originalQuery(parameters)
            );
        """)
    
    async def _human_behavior(self, page: Page):
        """
        Simula comportamiento humano: movimientos de mouse, scrolls, etc.
        """
        # Scroll aleatorio
        await page.evaluate("""
            window.scrollTo({
                top: Math.random() * 500,
                behavior: 'smooth'
            });
        """)
        
        # Espera aleatoria
        await asyncio.sleep(random.uniform(0.5, 2.0))
        
        # Movimiento de mouse aleatorio
        await page.mouse.move(
            random.randint(100, 500),
            random.randint(100, 500)
        )
    
    async def search_pimeyes_free(self, image_path: str) -> List[Dict]:
        """
        Busca en PimEyes sin pagar, extrayendo las miniaturas censuradas.
        
        Args:
            image_path: Ruta a la imagen a buscar
            
        Returns:
            Lista de resultados con miniaturas y datos extraíbles
        """
        logger.info("Iniciando búsqueda stealth en PimEyes")
        
        browser, page = await self._create_stealth_browser()
        results = []
        
        try:
            # Navegar a PimEyes
            await page.goto('https://pimeyes.com/en', wait_until='networkidle')
            logger.info("Página PimEyes cargada")
            
            # Simular comportamiento humano
            await self._human_behavior(page)
            
            # Aceptar cookies si aparecen
            try:
                await page.click('button:has-text("Accept")', timeout=3000)
            except:
                pass
            
            # Buscar el botón de upload
            upload_button = await page.query_selector('input[type="file"]')
            
            if upload_button:
                # Subir imagen
                await upload_button.set_input_files(image_path)
                logger.info("Imagen subida, esperando resultados...")
                
                # Esperar a que carguen los resultados
                await page.wait_for_selector('.results-container', timeout=30000)
                
                # Simular scroll para que carguen más imágenes
                for _ in range(3):
                    await page.evaluate('window.scrollBy(0, 500)')
                    await asyncio.sleep(1)
                
                # Extraer miniaturas
                thumbnails = await page.query_selector_all('.result-item img')
                
                for idx, thumb in enumerate(thumbnails):
                    try:
                        # Extraer URL de la miniatura
                        thumb_url = await thumb.get_attribute('src')
                        
                        # Extraer contenedor padre para obtener metadata
                        parent = await thumb.evaluate_handle('el => el.closest(".result-item")')
                        parent_html = await parent.inner_html()
                        
                        # Buscar texto visible (puede contener dominio)
                        text_content = await parent.inner_text()
                        
                        # Tomar screenshot de la miniatura individual
                        screenshot = await thumb.screenshot()
                        
                        results.append({
                            'thumbnail_url': thumb_url,
                            'index': idx,
                            'text_content': text_content,
                            'screenshot': screenshot,
                            'source': 'pimeyes',
                            'censored': 'blur' in parent_html.lower() or 'premium' in parent_html.lower()
                        })
                        
                        logger.debug(f"Miniatura {idx} extraída")
                        
                    except Exception as e:
                        logger.warning(f"Error extrayendo miniatura {idx}: {e}")
                        continue
                
                logger.success(f"PimEyes: {len(results)} miniaturas extraídas")
            
            else:
                logger.error("No se encontró el botón de upload en PimEyes")
        
        except Exception as e:
            logger.error(f"Error en búsqueda de PimEyes: {e}")
        
        finally:
            await browser.close()
        
        return results
    
    async def search_yandex_reverse(self, image_path: str) -> List[Dict]:
        """
        Búsqueda reversa en Yandex Images con stealth.
        
        Args:
            image_path: Ruta a la imagen
            
        Returns:
            Lista de resultados
        """
        logger.info("Iniciando búsqueda stealth en Yandex")
        
        browser, page = await self._create_stealth_browser()
        results = []
        
        try:
            # Navegar a Yandex Images
            await page.goto('https://yandex.com/images/', wait_until='networkidle')
            
            # Simular comportamiento humano
            await self._human_behavior(page)
            
            # Click en el botón de búsqueda por imagen
            try:
                camera_button = await page.query_selector('.cbir-panel__button')
                await camera_button.click()
                await asyncio.sleep(1)
            except:
                logger.warning("No se pudo hacer click en botón de cámara")
            
            # Subir imagen
            file_input = await page.query_selector('input[type="file"]')
            if file_input:
                await file_input.set_input_files(image_path)
                logger.info("Imagen subida a Yandex")
                
                # Esperar resultados
                await page.wait_for_selector('.serp-item', timeout=15000)
                
                # Scroll para cargar más resultados
                for _ in range(5):
                    await page.evaluate('window.scrollBy(0, 800)')
                    await asyncio.sleep(0.5)
                
                # Extraer resultados
                items = await page.query_selector_all('.serp-item')
                
                for idx, item in enumerate(items[:50]):
                    try:
                        # Extraer link
                        link_elem = await item.query_selector('a.serp-item__link')
                        url = await link_elem.get_attribute('href') if link_elem else None
                        
                        # Extraer miniatura
                        img_elem = await item.query_selector('img.serp-item__thumb')
                        thumb_url = await img_elem.get_attribute('src') if img_elem else None
                        
                        # Extraer dominio
                        domain_elem = await item.query_selector('.serp-item__domain')
                        domain = await domain_elem.inner_text() if domain_elem else None
                        
                        if url:
                            results.append({
                                'url': url,
                                'thumbnail_url': thumb_url,
                                'domain': domain,
                                'source': 'yandex',
                                'index': idx
                            })
                        
                    except Exception as e:
                        logger.debug(f"Error extrayendo item {idx}: {e}")
                        continue
                
                logger.success(f"Yandex: {len(results)} resultados extraídos")
        
        except Exception as e:
            logger.error(f"Error en búsqueda de Yandex: {e}")
        
        finally:
            await browser.close()
        
        return results
    
    async def search_bing_reverse(self, image_path: str) -> List[Dict]:
        """
        Búsqueda reversa en Bing Images con stealth.
        """
        logger.info("Iniciando búsqueda stealth en Bing")
        
        browser, page = await self._create_stealth_browser()
        results = []
        
        try:
            # Navegar a Bing Images
            await page.goto('https://www.bing.com/images', wait_until='networkidle')
            
            await self._human_behavior(page)
            
            # Click en búsqueda por imagen
            try:
                camera_icon = await page.query_selector('.cameraIcon')
                await camera_icon.click()
                await asyncio.sleep(1)
            except:
                logger.warning("No se encontró icono de cámara en Bing")
            
            # Subir imagen
            file_input = await page.query_selector('input[type="file"]')
            if file_input:
                await file_input.set_input_files(image_path)
                
                # Esperar resultados
                await page.wait_for_selector('.imgpt', timeout=15000)
                
                # Scroll
                for _ in range(3):
                    await page.evaluate('window.scrollBy(0, 1000)')
                    await asyncio.sleep(1)
                
                # Extraer resultados
                items = await page.query_selector_all('.imgpt')
                
                for idx, item in enumerate(items[:50]):
                    try:
                        link_elem = await item.query_selector('a')
                        url = await link_elem.get_attribute('href') if link_elem else None
                        
                        img_elem = await item.query_selector('img')
                        thumb_url = await img_elem.get_attribute('src') if img_elem else None
                        
                        if url:
                            results.append({
                                'url': url,
                                'thumbnail_url': thumb_url,
                                'source': 'bing',
                                'index': idx
                            })
                    
                    except Exception as e:
                        logger.debug(f"Error: {e}")
                        continue
                
                logger.success(f"Bing: {len(results)} resultados")
        
        except Exception as e:
            logger.error(f"Error en Bing: {e}")
        
        finally:
            await browser.close()
        
        return results
    
    async def search_all_engines(self, image_path: str) -> Dict[str, List[Dict]]:
        """
        Busca en todos los motores simultáneamente.
        
        Args:
            image_path: Ruta a la imagen
            
        Returns:
            Diccionario con resultados por motor
        """
        logger.info("Iniciando búsqueda multi-motor")
        
        # Ejecutar búsquedas en paralelo
        tasks = [
            self.search_pimeyes_free(image_path),
            self.search_yandex_reverse(image_path),
            self.search_bing_reverse(image_path),
        ]
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        all_results = {
            'pimeyes': results[0] if not isinstance(results[0], Exception) else [],
            'yandex': results[1] if not isinstance(results[1], Exception) else [],
            'bing': results[2] if not isinstance(results[2], Exception) else [],
        }
        
        total = sum(len(v) for v in all_results.values())
        logger.success(f"Total de resultados: {total}")
        
        return all_results


async def test_stealth():
    """
    Función de prueba
    """
    stealth = StealthSearch(headless=True)
    
    # Crear imagen de prueba
    import numpy as np
    from PIL import Image
    
    test_img = np.random.randint(0, 255, (200, 200, 3), dtype=np.uint8)
    Image.fromarray(test_img).save('/tmp/test.jpg')
    
    # Probar PimEyes
    results = await stealth.search_pimeyes_free('/tmp/test.jpg')
    print(f"PimEyes: {len(results)} resultados")
    
    # Probar Yandex
    results = await stealth.search_yandex_reverse('/tmp/test.jpg')
    print(f"Yandex: {len(results)} resultados")


if __name__ == "__main__":
    asyncio.run(test_stealth())