""" ╔══════════════════════════════════════════════════════════════════╗ ║ HIBP PRO MONITOR — GOD TIER EDITION ║ ║ Camoufox + Async + Smart Proxy Pool + Stealth Maximum ║ ╚══════════════════════════════════════════════════════════════════╝ """ from fastapi import FastAPI, Body, HTTPException, Header, Request from fastapi.responses import JSONResponse from fastapi.middleware.cors import CORSMiddleware import gradio as gr import asyncio import random import time import hmac import hashlib import logging from typing import Optional, List, Dict, Any from datetime import datetime, timedelta from dataclasses import dataclass, field from contextlib import asynccontextmanager import aiohttp from camoufox.async_api import AsyncCamoufox import uvicorn import nest_asyncio nest_asyncio.apply() # ================================================================ # LOGGING — Clean & structured # ================================================================ logging.basicConfig( level=logging.INFO, format="%(asctime)s │ %(levelname)-7s │ %(message)s", datefmt="%H:%M:%S" ) log = logging.getLogger("hibp") # ================================================================ # CONFIGURATION # ================================================================ class Config: # --- API --- API_KEY: str = "CHANGE_ME_TO_A_STRONG_SECRET_KEY" HOST: str = "0.0.0.0" PORT: int = 7860 # --- Proxy --- PROXY_API_BASE: str = "https://voxxium-proxpy.hf.space" PROXY_FETCH_LIMIT: int = 80 PROXY_TEST_TIMEOUT: float = 6.0 PROXY_CACHE_TTL: int = 180 # seconds avant refresh du pool PROXY_MAX_CONCURRENT_TESTS: int = 50 # --- Scraper --- MAX_RETRIES: int = 8 # tentatives par email MAX_CONCURRENT_EMAILS: int = 3 # emails scrappés en parallèle PAGE_TIMEOUT: int = 35_000 # ms NAVIGATION_TIMEOUT: int = 45_000 # ms # --- Rate Limiting --- RATE_LIMIT_WINDOW: int = 60 # seconds RATE_LIMIT_MAX: int = 30 # max requests par fenêtre # ================================================================ # RATE LIMITER — 
# In-memory sliding window
# ================================================================
class RateLimiter:
    """Per-key sliding-window rate limiter kept entirely in memory.

    NOTE(review): state is process-local — with multiple workers each
    process enforces its own independent limit.
    """

    def __init__(self, window: int, max_requests: int):
        self.window = window              # window size in seconds
        self.max_requests = max_requests  # max hits allowed per window
        self._requests: Dict[str, List[float]] = {}

    def is_allowed(self, key: str) -> bool:
        """Record a hit for *key* and report whether it is within quota."""
        now = time.time()
        cutoff = now - self.window
        # Drop timestamps that have slid out of the window; .get avoids
        # creating an entry for keys we end up rejecting.
        recent = [t for t in self._requests.get(key, []) if t > cutoff]
        if len(recent) >= self.max_requests:
            self._requests[key] = recent
            return False
        recent.append(now)
        self._requests[key] = recent
        return True


# ================================================================
# PROXY POOL — Async, cached, sorted by latency
# ================================================================
@dataclass
class ProxyPool:
    """Self-refreshing pool of working HTTP proxies, sorted fastest-first."""

    _proxies: List[Dict[str, Any]] = field(default_factory=list)
    _last_refresh: float = 0.0
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    @property
    def is_stale(self) -> bool:
        # True once the cached pool is older than the configured TTL.
        return time.time() - self._last_refresh > Config.PROXY_CACHE_TTL

    @property
    def urls(self) -> List[str]:
        return [p["url"] for p in self._proxies]

    async def _test_single(
        self,
        session: aiohttp.ClientSession,
        proxy_url: str,
    ) -> Optional[Dict]:
        """Probe one proxy via aiohttp — non-blocking; None on any failure."""
        try:
            start = time.monotonic()
            async with session.get(
                "https://api.ipify.org?format=json",
                proxy=proxy_url,
                timeout=aiohttp.ClientTimeout(total=Config.PROXY_TEST_TIMEOUT),
                ssl=False,
            ) as resp:
                if resp.status == 200:
                    latency = round(time.monotonic() - start, 3)
                    return {"url": proxy_url, "latency": latency}
        except Exception:
            return None
        return None  # non-200 status: treated the same as a failed probe

    async def refresh(self) -> List[str]:
        """Fetch + test all proxies in parallel; returns the working URLs.

        Serialized behind a lock so concurrent callers don't refresh twice;
        on any error the previously cached pool is returned unchanged.
        """
        async with self._lock:
            if not self.is_stale and self._proxies:
                return self.urls

            log.info("🔄 Proxy pool refresh started...")
            try:
                async with aiohttp.ClientSession() as session:
                    # 1) Fetch the candidate list.
                    async with session.get(
                        f"{Config.PROXY_API_BASE}/all",
                        params={
                            "protocol": "http",
                            "verified": "true",
                            "limit": Config.PROXY_FETCH_LIMIT,
                        },
                        timeout=aiohttp.ClientTimeout(total=15),
                    ) as resp:
                        data = await resp.json()

                    items = data if isinstance(data, list) else data.get("proxies", [])
                    raw = []
                    for p in items:
                        url = p.get("proxy_url") or p.get("proxy") or p.get("url")
                        if url:
                            raw.append(url)

                    log.info(f" Fetched {len(raw)} raw proxies, testing...")

                    # 2) Test candidates in parallel, bounded by a semaphore.
                    sem = asyncio.Semaphore(Config.PROXY_MAX_CONCURRENT_TESTS)

                    async def _bounded_test(proxy_url: str):
                        async with sem:
                            return await self._test_single(session, proxy_url)

                    results = await asyncio.gather(
                        *[_bounded_test(p) for p in raw],
                        return_exceptions=True,
                    )

                    # isinstance(r, dict) already excludes None and exceptions.
                    working = [r for r in results if isinstance(r, dict)]
                    working.sort(key=lambda x: x["latency"])

                    self._proxies = working
                    self._last_refresh = time.time()

                    log.info(
                        f" ✅ {len(working)} working proxies (best: {working[0]['latency']}s)"
                        if working
                        else " ⚠️ No working proxies found"
                    )
                    return self.urls

            except Exception as e:
                log.error(f" ❌ Proxy refresh failed: {e}")
                return self.urls  # return the existing cache


# ================================================================
# HUMAN BEHAVIOR SIMULATION
# ================================================================
async def human_type(page, selector: str, text: str):
    """Type like a real human — variable timing, random micro-pauses."""
    locator = page.locator(selector).first
    await locator.wait_for(state="visible", timeout=15_000)

    # Click with a small random offset inside the element's bounding box.
    box = await locator.bounding_box()
    if box:
        x = box["x"] + box["width"] * random.uniform(0.2, 0.8)
        y = box["y"] + box["height"] * random.uniform(0.3, 0.7)
        await page.mouse.click(x, y)
    else:
        await locator.click()

    await asyncio.sleep(random.uniform(0.3, 0.8))

    for char in text:
        # Per-keystroke speed variation.
        base_delay = random.uniform(45, 140)
        # Random micro-pauses (like a human pausing to think).
        if random.random() < 0.08:
            await asyncio.sleep(random.uniform(0.2, 0.6))
        await locator.type(char, delay=base_delay)


async def human_delay(min_s: float = 1.0, max_s: float = 3.0):
    """Humanized pause of a random duration in [min_s, max_s] seconds."""
    await asyncio.sleep(random.uniform(min_s, max_s))


# ================================================================
# BREACH EXTRACTOR
# ================================================================
async def extract_breaches(page) -> List[Dict]:
    """Robust extraction of breach entries from the HIBP timeline.

    Returns a list of dicts with name/date/description/compromised keys;
    per-item failures are logged and skipped rather than aborting the run.
    """
    breaches = []
    try:
        log.info(" 📋 Waiting for breach timeline...")
        await page.wait_for_selector(".timeline-item", timeout=20_000)
        await asyncio.sleep(1.5)  # let the render animation settle

        items = await page.locator(".timeline-item").all()

        for item in items:
            try:
                # Fetch all fields of one item concurrently.
                name_p = item.locator(".timeline-title h5").inner_text()
                date_p = item.locator(".timeline-date-text").all_inner_texts()
                desc_p = item.locator(".timeline-content p").first.inner_text(timeout=5_000)
                comp_p = item.locator(".timeline-details-list li").all_inner_texts()

                name, date_texts, desc, comp = await asyncio.gather(
                    name_p, date_p, desc_p, comp_p, return_exceptions=True
                )

                breaches.append({
                    "name": name.strip() if isinstance(name, str) else "Unknown",
                    "date": " ".join(date_texts).strip() if isinstance(date_texts, list) else "",
                    "description": desc.strip() if isinstance(desc, str) else "",
                    "compromised": [
                        x.strip()
                        for x in (comp if isinstance(comp, list) else [])
                        if x.strip()
                    ],
                })
            except Exception as e:
                log.debug(f" ⚠️ Item extraction error: {e}")
                continue

        log.info(f" ✅ {len(breaches)} breaches extracted")
    except Exception as e:
        log.warning(f" ❌ Timeline extraction failed: {e}")

    return breaches


# ================================================================
# CORE SCRAPER ENGINE — Camoufox powered
# ================================================================
async def check_single_email(
    email: str,
    proxy_pool: ProxyPool,
    use_proxy: bool = True,
) -> Dict:
    """
    Check one email against HIBP with Camoufox.
    Retries with proxy rotation; the last attempt is always direct.
    Returns a result dict; on total failure "pwned" is None with an "error" key.
    """
    log.info(f"🔍 Checking: {email}")

    proxies = []
    if use_proxy:
        proxy_urls = await proxy_pool.refresh()
        proxies = random.sample(proxy_urls, min(Config.MAX_RETRIES - 1, len(proxy_urls)))

    # Always finish with a direct (proxy-less) attempt.
    attempts = proxies + [None]

    for attempt_num, proxy_url in enumerate(attempts, 1):
        proxy_label = proxy_url or "DIRECT"
        log.info(f" [{attempt_num}/{len(attempts)}] via {proxy_label}")

        try:
            # ═══════════════════════════════════════
            # CAMOUFOX LAUNCH — Anti-detect maximum
            # ═══════════════════════════════════════
            launch_kwargs = {
                "headless": True,
                "humanize": True,       # built-in natural mouse movement
                "block_images": True,   # saves bandwidth + speed
                "block_webrtc": True,   # prevents IP leaks
                "os": ["windows", "macos"],  # realistic fingerprint (linux is rare in real traffic)
                "i_know_what_im_doing": True,
            }
            if proxy_url:
                launch_kwargs["proxy"] = {"server": proxy_url}

            async with AsyncCamoufox(**launch_kwargs) as browser:
                page = await browser.new_page()

                # Timeouts
                page.set_default_timeout(Config.PAGE_TIMEOUT)
                page.set_default_navigation_timeout(Config.NAVIGATION_TIMEOUT)

                # Block unnecessary resources for SPEED.
                await page.route(
                    "**/*",
                    lambda route: (
                        route.abort()
                        if route.request.resource_type in ("image", "media", "font", "stylesheet")
                        else route.continue_()
                    ),
                )

                # Navigation
                await page.goto(
                    "https://haveibeenpwned.com/",
                    wait_until="domcontentloaded",
                )
                await human_delay(2.0, 4.5)

                # Human-like email entry
                await human_type(page, 'input[type="email"], #emailInput', email)
                await human_delay(0.5, 1.5)

                # Click the submit button
                btn = page.locator('#checkButton, button[type="submit"]').first
                await btn.click()

                # Wait for either verdict element to become visible
                await page.wait_for_selector(
                    '#email-result-good:not(.d-none), #email-result-bad:not(.d-none)',
                    timeout=40_000,
                )

                is_safe = await page.locator('#email-result-good:not(.d-none)').count() > 0

                if is_safe:
                    log.info(f" ✅ SAFE — {email}")
                    return {
                        "email": email,
                        "pwned": False,
                        "breach_count": 0,
                        "breaches": [],
                        # NOTE(review): naive UTC timestamp; datetime.utcnow() is
                        # deprecated — migrate to datetime.now(timezone.utc) once
                        # consumers tolerate the "+00:00" suffix.
                        "checked_at": datetime.utcnow().isoformat(),
                    }
                else:
                    breaches = await extract_breaches(page)
                    log.info(f" 🔴 PWNED — {email} ({len(breaches)} breaches)")
                    return {
                        "email": email,
                        "pwned": True,
                        "breach_count": len(breaches),
                        "breaches": breaches,
                        "checked_at": datetime.utcnow().isoformat(),
                    }

        except Exception as e:
            log.warning(f" ⚠️ Attempt {attempt_num} failed: {str(e)[:120]}")
            continue

    # All attempts exhausted
    log.error(f" 💀 ALL ATTEMPTS FAILED for {email}")
    return {
        "email": email,
        "pwned": None,
        "error": "All attempts failed",
        "checked_at": datetime.utcnow().isoformat(),
    }


# ================================================================
# APPLICATION SETUP
# ================================================================
proxy_pool = ProxyPool()
rate_limiter = RateLimiter(Config.RATE_LIMIT_WINDOW, Config.RATE_LIMIT_MAX)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Pre-warm proxy pool on startup."""
    log.info("🚀 HIBP Pro Monitor — GOD TIER EDITION starting...")
    await proxy_pool.refresh()
    yield
    log.info("👋 Shutting down...")


app = FastAPI(
    title="HIBP Pro Monitor — GOD TIER",
    version="2.0.0",
    lifespan=lifespan,
)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["POST", "GET"],
    allow_headers=["*"],
)


# ================================================================
# AUTH — Timing-safe token comparison
# ================================================================
def verify_token(token: Optional[str]) -> bool:
    """Constant-time comparison of the presented token against the API key."""
    if not token:
        return False
    return hmac.compare_digest(token, Config.API_KEY)


# ================================================================
# API ENDPOINTS
# ================================================================
@app.post("/check")
async def api_check(
    request: Request,
    payload: dict = Body(...),
    x_token: str = Header(None),
):
    """Check up to 20 emails; returns 207 if any individual check failed."""
    # request.client can be None (e.g. some ASGI test clients / proxies).
    client_ip = request.client.host if request.client else "unknown"

    # ── Auth ──
    if not verify_token(x_token):
        log.warning(f"🚫 Auth failed from {client_ip}")
        raise HTTPException(status_code=403, detail="Invalid or missing token")

    # ── Rate Limiting ──
    if not rate_limiter.is_allowed(client_ip):
        raise HTTPException(status_code=429, detail="Rate limit exceeded")

    # ── Validation ──
    emails = payload.get("emails", [])
    use_proxy = payload.get("use_proxy", True)

    # Reject malformed payloads with a 400 instead of crashing with a 500.
    if not isinstance(emails, list) or not all(isinstance(e, str) for e in emails):
        raise HTTPException(status_code=400, detail="emails must be a list of strings")
    if not emails:
        raise HTTPException(status_code=400, detail="No emails provided")
    if len(emails) > 20:
        raise HTTPException(status_code=400, detail="Max 20 emails per request")

    # ── Processing with a semaphore for controlled parallelism ──
    sem = asyncio.Semaphore(Config.MAX_CONCURRENT_EMAILS)

    async def _bounded_check(email: str):
        async with sem:
            return await check_single_email(email, proxy_pool, use_proxy)

    results = await asyncio.gather(
        *[_bounded_check(e.strip()) for e in emails if e.strip()],
    )

    # Collect the checks that exhausted all attempts.
    failed = [r for r in results if r.get("pwned") is None]

    return JSONResponse(
        content={
            "results": results,
            "total": len(results),
            "failed": len(failed),
            "timestamp": datetime.utcnow().isoformat(),
        },
        status_code=200 if not failed else 207,  # 207 = Multi-Status
    )


@app.get("/health")
async def health():
    """Unauthenticated liveness probe with proxy-pool stats."""
    return {
        "status": "operational",
        "proxy_pool_size": len(proxy_pool._proxies),
        "proxy_pool_age": round(time.time() - proxy_pool._last_refresh, 1),
        "uptime": "ok",
    }


@app.post("/proxies/refresh")
async def force_proxy_refresh(x_token: str = Header(None)):
    """Authenticated endpoint forcing an immediate proxy-pool refresh."""
    if not verify_token(x_token):
        raise HTTPException(status_code=403, detail="Unauthorized")
    proxy_pool._last_refresh = 0  # force stale
    urls = await proxy_pool.refresh()
    return {"refreshed": len(urls)}


# ================================================================
# GRADIO UI
# ================================================================
async def gradio_check(txt: str, use_proxies: bool) -> str:
    """Run checks for the emails pasted in the UI textbox (first 10 only)."""
    if not txt.strip():
        return "⚠️ Enter at least one email."

    emails = [e.strip() for e in txt.splitlines() if e.strip()]
    lines = []
    for email in emails[:10]:  # UI limit
        result = await check_single_email(email, proxy_pool, use_proxies)
        if result.get("pwned") is None:
            lines.append(f"⚫ {email} — ERROR: {result.get('error')}")
        elif result["pwned"]:
            breach_names = ", ".join(b["name"] for b in result["breaches"][:5])
            lines.append(
                f"🔴 {email} — PWNED ({result['breach_count']} breaches: {breach_names})"
            )
        else:
            lines.append(f"🟢 {email} — SAFE ✅")
    return "\n".join(lines)


with gr.Blocks(title="HIBP Pro Monitor", theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        """
        # 🛡️ HIBP Pro Monitor — GOD TIER
        **Camoufox-powered** breach detection with stealth anti-fingerprinting.
        """
    )
    with gr.Row():
        with gr.Column(scale=2):
            emails_input = gr.Textbox(
                lines=6,
                label="📧 Emails (one per line)",
                placeholder="john@example.com\njane@test.com",
            )
            proxy_toggle = gr.Checkbox(label="🌐 Use Proxy Rotation", value=True)
            check_btn = gr.Button("🔍 Check Breaches", variant="primary", size="lg")
        with gr.Column(scale=3):
            output = gr.Textbox(
                label="📊 Results",
                lines=10,
                interactive=False,
            )

    check_btn.click(gradio_check, [emails_input, proxy_toggle], output)

app = gr.mount_gradio_app(app, demo, path="/")

# ================================================================
# ENTRYPOINT
# ================================================================
if __name__ == "__main__":
    uvicorn.run(
        app,
        host=Config.HOST,
        port=Config.PORT,
        log_level="info",
        access_log=False,  # we do our own logging
    )