{token[:20]}...
| Name | Key | RPM Limit | Created | Action |
|---|---|---|---|---|
| No keys found | ||||
import asyncio
import json
import os
import re
import uuid
import time
import secrets
import base64
import mimetypes
from collections import defaultdict
from typing import Optional, Dict, List
from datetime import datetime, timezone, timedelta

import uvicorn

# nodriver for undetectable browser automation (replaces Camoufox)
try:
    import nodriver
    HAS_NODRIVER = True
except ImportError:
    HAS_NODRIVER = False
    print("=" * 60)
    print("❌ ERROR: nodriver not installed!")
    print("")
    print(" PROBLEM: nodriver is required for reCAPTCHA bypassing.")
    print("")
    print(" SOLUTION:")
    print(" 1. Run: pip install nodriver")
    print(" 2. Restart LMArenaBridge")
    print("=" * 60)

from fastapi import FastAPI, HTTPException, Depends, status, Form, Request, Response, Header
from starlette.responses import HTMLResponse, RedirectResponse, StreamingResponse
from fastapi.security import APIKeyHeader
import httpx

# curl_cffi for TLS fingerprint mimicking (bypasses Cloudflare JA3 detection)
try:
    from curl_cffi.requests import AsyncSession as CurlAsyncSession
    HAS_CURL_CFFI = True
except ImportError:
    HAS_CURL_CFFI = False
    CurlAsyncSession = None
    print("⚠️ curl_cffi not installed. Install with: pip install curl_cffi")
    print(" (Falling back to httpx - may trigger bot detection)")

# ============================================================
# CONFIGURATION
# ============================================================
# Set to True for detailed logging, False for minimal logging
DEBUG = True

# Port to run the server on
PORT = int(os.environ.get("PORT", 7860))
HEADLESS = os.environ.get("HEADLESS", "false").lower() == "true"


# HTTP Status Codes
class HTTPStatus:
    """Named integer constants for the HTTP status codes this module logs."""

    # 1xx Informational
    CONTINUE = 100
    SWITCHING_PROTOCOLS = 101
    PROCESSING = 102
    EARLY_HINTS = 103

    # 2xx Success
    OK = 200
    CREATED = 201
    ACCEPTED = 202
    NON_AUTHORITATIVE_INFORMATION = 203
    NO_CONTENT = 204
    RESET_CONTENT = 205
    PARTIAL_CONTENT = 206
    MULTI_STATUS = 207

    # 3xx Redirection
    MULTIPLE_CHOICES = 300
    MOVED_PERMANENTLY = 301
    MOVED_TEMPORARILY = 302
    SEE_OTHER = 303
    NOT_MODIFIED = 304
    USE_PROXY = 305
    TEMPORARY_REDIRECT = 307
    PERMANENT_REDIRECT = 308

    # 4xx Client Errors
    BAD_REQUEST = 400
    UNAUTHORIZED = 401
    PAYMENT_REQUIRED = 402
    FORBIDDEN = 403
    NOT_FOUND = 404
    METHOD_NOT_ALLOWED = 405
    NOT_ACCEPTABLE = 406
    PROXY_AUTHENTICATION_REQUIRED = 407
    REQUEST_TIMEOUT = 408
    CONFLICT = 409
    GONE = 410
    LENGTH_REQUIRED = 411
    PRECONDITION_FAILED = 412
    REQUEST_TOO_LONG = 413
    REQUEST_URI_TOO_LONG = 414
    UNSUPPORTED_MEDIA_TYPE = 415
    REQUESTED_RANGE_NOT_SATISFIABLE = 416
    EXPECTATION_FAILED = 417
    IM_A_TEAPOT = 418
    INSUFFICIENT_SPACE_ON_RESOURCE = 419
    METHOD_FAILURE = 420
    MISDIRECTED_REQUEST = 421
    UNPROCESSABLE_ENTITY = 422
    LOCKED = 423
    FAILED_DEPENDENCY = 424
    UPGRADE_REQUIRED = 426
    PRECONDITION_REQUIRED = 428
    TOO_MANY_REQUESTS = 429
    REQUEST_HEADER_FIELDS_TOO_LARGE = 431
    UNAVAILABLE_FOR_LEGAL_REASONS = 451

    # 5xx Server Errors
    INTERNAL_SERVER_ERROR = 500
    NOT_IMPLEMENTED = 501
    BAD_GATEWAY = 502
    SERVICE_UNAVAILABLE = 503
    GATEWAY_TIMEOUT = 504
    HTTP_VERSION_NOT_SUPPORTED = 505
    INSUFFICIENT_STORAGE = 507
    NETWORK_AUTHENTICATION_REQUIRED = 511


# Status code descriptions for logging
STATUS_MESSAGES = {
    100: "Continue",
    101: "Switching Protocols",
    102: "Processing",
    103: "Early Hints",
    200: "OK - Success",
    201: "Created",
    202: "Accepted",
    203: "Non-Authoritative Information",
    204: "No Content",
    205: "Reset Content",
    206: "Partial Content",
    207: "Multi-Status",
    300: "Multiple Choices",
    301: "Moved Permanently",
    302: "Moved Temporarily",
    303: "See Other",
    304: "Not Modified",
    305: "Use Proxy",
    307: "Temporary Redirect",
    308: "Permanent Redirect",
    400: "Bad Request - Invalid request syntax",
    401: "Unauthorized - Invalid or expired token",
    402: "Payment Required",
    403: "Forbidden - Access denied",
    404: "Not Found - Resource doesn't exist",
    405: "Method Not Allowed",
    406: "Not Acceptable",
    407: "Proxy Authentication Required",
    408: "Request Timeout",
    409: "Conflict",
    410: "Gone - Resource permanently deleted",
    411: "Length Required",
    412: "Precondition Failed",
    413: "Request Too Long - Payload too large",
    414: "Request URI Too Long",
    415: "Unsupported Media Type",
    416: "Requested Range Not Satisfiable",
    417: "Expectation Failed",
    418: "I'm a Teapot",
    419: "Insufficient Space on Resource",
    420: "Method Failure",
    421: "Misdirected Request",
    422: "Unprocessable Entity",
    423: "Locked",
    424: "Failed Dependency",
    426: "Upgrade Required",
    428: "Precondition Required",
    429: "Too Many Requests - Rate limit exceeded",
    431: "Request Header Fields Too Large",
    451: "Unavailable For Legal Reasons",
    500: "Internal Server Error",
    501: "Not Implemented",
    502: "Bad Gateway",
    503: "Service Unavailable",
    504: "Gateway Timeout",
    505: "HTTP Version Not Supported",
    507: "Insufficient Storage",
    511: "Network Authentication Required",
}


def get_status_emoji(status_code: int) -> str:
    """Return the emoji used as a log prefix for ``status_code``.

    2xx, 3xx and 5xx each map to one emoji; 401, 403, 404 and 429 have
    dedicated ones, other 4xx share a generic warning, and anything outside
    200-599 gets an informational marker.
    """
    if 200 <= status_code < 300:
        return "✅"
    elif 300 <= status_code < 400:
        return "↪️"
    elif 400 <= status_code < 500:
        if status_code == 401:
            return "🔒"
        elif status_code == 403:
            return "🚫"
        elif status_code == 404:
            return "❓"
        elif status_code == 429:
            return "⏱️"
        return "⚠️"
    elif 500 <= status_code < 600:
        return "❌"
    return "ℹ️"


def log_http_status(status_code: int, context: str = ""):
    """Debug-log ``status_code`` with its description and optional ``context``."""
    emoji = get_status_emoji(status_code)
    message = STATUS_MESSAGES.get(status_code, f"Unknown Status {status_code}")
    if context:
        debug_print(f"{emoji} HTTP {status_code}: {message} ({context})")
    else:
        debug_print(f"{emoji} HTTP {status_code}: {message}")


# ============================================================

def debug_print(*args, **kwargs):
    """Print debug messages only if DEBUG is True."""
    if DEBUG:
        print(*args, **kwargs)


# --- New reCAPTCHA Functions ---

# Updated constants from gpt4free/g4f/Provider/needs_auth/LMArena.py
RECAPTCHA_SITEKEY = "6Led_uYrAAAAAKjxDIF58fgFtX3t8loNAK85bW9I"
RECAPTCHA_ACTION = "chat_submit"


async def initialize_nodriver_browser():
    """Launch Chrome through nodriver, open LMArena and wait for a first token.

    The browser runs headless or visible depending on ``HEADLESS`` and uses a
    persistent ``chrome_profile`` directory. On success the module globals
    ``NODRIVER_BROWSER``, ``NODRIVER_TAB``, ``USER_AGENT`` and
    ``BROWSER_READY`` are updated.

    Returns:
        True when the browser is (or already was) ready, False when nodriver
        is missing, the launch fails, or no token arrives within 120 seconds.
    """
    global NODRIVER_BROWSER, NODRIVER_TAB, BROWSER_READY, USER_AGENT

    if not HAS_NODRIVER:
        # The failing precondition here is the missing Python package, not a
        # missing Chrome install, so report that (same text as at import time).
        print("=" * 60)
        print("❌ ERROR: nodriver not installed!")
        print("")
        print(" PROBLEM: nodriver is required for reCAPTCHA bypassing.")
        print("")
        print(" SOLUTION:")
        print(" 1. Run: pip install nodriver")
        print(" 2. Restart LMArenaBridge")
        print("=" * 60)
        return False

    if BROWSER_READY and NODRIVER_TAB is not None:
        debug_print(" └── Browser already initialized, reusing session")
        return True

    print("")
    print("🌐 STEP 1/3: Launching Chrome browser...")
    print(" ├── Looking for Chrome installation...")

    # Create chrome profile directory path (for persistent login)
    chrome_profile_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "chrome_profile")

    try:
        # Start nodriver with a PERSISTENT profile
        NODRIVER_BROWSER = await nodriver.start(
            headless=HEADLESS,  # Toggleable via environment variable
            user_data_dir=chrome_profile_dir,  # 💾 Saves login across restarts!
            browser_args=[
                '--disable-blink-features=AutomationControlled',
                '--no-first-run',
                '--no-default-browser-check',
                '--no-sandbox',
                '--disable-dev-shm-usage',
            ]
        )
        print(" ├── ✅ Chrome launched successfully")
        print(f" ├── 💾 Using persistent profile: {chrome_profile_dir}")
        print(" └── 🔄 Navigating to lmarena.ai...")

        # Navigate to LMArena
        NODRIVER_TAB = await NODRIVER_BROWSER.get("https://arena.ai/?mode=direct")

        # Capture User-Agent from the actual browser (best effort: the
        # hard-coded fallback stays in place if this fails)
        try:
            ua = await NODRIVER_TAB.evaluate("navigator.userAgent")
            if ua:
                USER_AGENT = ua
                debug_print(f"🕵️ Captured User-Agent: {USER_AGENT[:50]}...")
        except Exception as e:
            debug_print(f"⚠️ Failed to captures User-Agent: {e}")

        # Wait for page to settle
        await asyncio.sleep(3)

        print("")
        print("⏳ STEP 2/3: Waiting for CAPTCHA verification...")
        print(" ┌────────────────────────────────────────────────────────┐")
        print(" │ 👆 ACTION REQUIRED: Please click the reCAPTCHA │")
        print(" │ checkbox in the Chrome window that just opened! │")
        print(" │ │")
        print(" │ ⏱️ Timeout in 120 seconds... │")
        print(" └────────────────────────────────────────────────────────┘")

        # Wait for reCAPTCHA library to load and get first token
        captcha_solved = await wait_for_recaptcha_ready(timeout=120)

        if captcha_solved:
            print("")
            print("✅ STEP 2/3: CAPTCHA verified successfully!")
            BROWSER_READY = True
            return True

        print("")
        print("❌ ERROR: CAPTCHA verification timed out (120 seconds)")
        print("")
        print(" PROBLEM: You didn't click the reCAPTCHA checkbox in time.")
        print("")
        print(" SOLUTION:")
        print(" 1. Restart the server: python src/main.py")
        print(" 2. When Chrome opens, quickly click the \"I'm not a robot\" checkbox")
        print(" 3. You have 2 minutes to complete this")
        return False

    except Exception as e:
        # Top-level boundary for the launch sequence: report and signal failure.
        print("❌ ERROR: Failed to launch Chrome browser!")
        print(f" └── Details: {e}")
        print("")
        print(" POSSIBLE CAUSES:")
        print(" 1. Chrome not installed → Install from google.com/chrome")
        print(" 2. Chrome in use by another process → Close other Chrome windows")
        print(" 3. Permission issue → Run as administrator")
        return False


async def wait_for_recaptcha_ready(timeout: int = 120) -> bool:
    """Poll the browser tab until a reCAPTCHA token can be obtained.

    Every ~2 seconds this checks for ``window.grecaptcha.enterprise`` and, if
    present, requests a token. The first token is stored in
    ``RECAPTCHA_TOKEN`` with a 110-second ``RECAPTCHA_EXPIRY``.

    Args:
        timeout: Maximum number of seconds to keep polling.

    Returns:
        True once a token was acquired, False if ``timeout`` elapsed first.
    """
    global RECAPTCHA_TOKEN, RECAPTCHA_EXPIRY

    start_time = time.time()
    last_status_time = 0

    while time.time() - start_time < timeout:
        elapsed = int(time.time() - start_time)

        # Print status every 10 seconds
        if elapsed > 0 and elapsed % 10 == 0 and elapsed != last_status_time:
            last_status_time = elapsed
            remaining = timeout - elapsed
            print(f"⏳ Waiting for CAPTCHA... ({elapsed}s elapsed, {remaining}s remaining)")

        try:
            # Check if grecaptcha enterprise is available
            lib_ready = await NODRIVER_TAB.evaluate(
                "!!(window.grecaptcha && window.grecaptcha.enterprise)"
            )
            if lib_ready:
                # Try to get a token
                debug_print(" └── reCAPTCHA library detected, requesting token...")
                token = await get_recaptcha_token_from_browser()
                if token:
                    RECAPTCHA_TOKEN = token
                    RECAPTCHA_EXPIRY = datetime.now(timezone.utc) + timedelta(seconds=110)
                    print(f" └── reCAPTCHA token acquired ({len(token)} chars)")
                    return True
        except Exception as e:
            # Evaluation errors are expected while the page is still loading.
            debug_print(f" └── Check failed (normal during load): {e}")

        await asyncio.sleep(2)

    return False


async def get_recaptcha_token_from_browser() -> Optional[str]:
    """Request one reCAPTCHA token from the persistent browser tab.

    Uses a side channel: a page global is set to ``'PENDING'``,
    ``grecaptcha.enterprise.execute`` is triggered without awaiting its
    Promise, and the global is polled once per second for up to 15 seconds.

    Returns:
        The token string (accepted only if longer than 100 characters), or
        None on error, unexpected result, timeout, or missing tab.
    """
    if NODRIVER_TAB is None:
        debug_print("❌ Browser tab not available")
        return None

    try:
        # Step 1: Initialize the global variable
        await NODRIVER_TAB.evaluate("window.__recaptcha_token = 'PENDING';")

        # Step 2: Trigger the reCAPTCHA execution (don't await the Promise)
        trigger_script = f"""
        (function() {{
            try {{
                window.grecaptcha.enterprise.execute('{RECAPTCHA_SITEKEY}', {{ action: '{RECAPTCHA_ACTION}' }})
                    .then(function(token) {{ window.__recaptcha_token = token; }})
                    .catch(function(err) {{ window.__recaptcha_token = 'ERROR: ' + err.toString(); }});
            }} catch (e) {{
                window.__recaptcha_token = 'SYNC_ERROR: ' + e.toString();
            }}
        }})();
        """
        await NODRIVER_TAB.evaluate(trigger_script)

        # Step 3: Poll for the result
        for _ in range(15):  # Max 15 seconds
            await asyncio.sleep(1)
            result = await NODRIVER_TAB.evaluate("window.__recaptcha_token")

            if not result or result == 'PENDING':
                continue

            if isinstance(result, str):
                if result.startswith('ERROR'):
                    debug_print(f" └── JS Error: {result}")
                    return None
                if result.startswith('SYNC_ERROR'):
                    debug_print(f" └── Sync Error: {result}")
                    return None
                if len(result) > 100:
                    # Valid token!
                    return result

            debug_print(f" └── Unexpected result: {result}")
            return None

        debug_print(" └── Token polling timed out")
        return None

    except Exception as e:
        debug_print(f" └── Token request failed: {e}")
        return None


async def get_recaptcha_v3_token() -> Optional[str]:
    """Fetch a token through the persistent browser and cache it.

    Returns None (after printing recovery instructions) when the browser is
    not ready. On success ``RECAPTCHA_TOKEN`` and ``RECAPTCHA_EXPIRY`` are
    updated and the token is returned.
    """
    global RECAPTCHA_TOKEN, RECAPTCHA_EXPIRY

    if not BROWSER_READY or NODRIVER_TAB is None:
        debug_print("❌ Browser not ready. Token refresh unavailable.")
        print("")
        print("❌ ERROR: Browser connection lost!")
        print("")
        print(" PROBLEM: The Chrome window was closed or crashed.")
        print("")
        print(" SOLUTION:")
        print(" 1. Restart the server: python src/main.py")
        print(" 2. When Chrome opens, click the CAPTCHA")
        print(" 3. DO NOT close the Chrome window while using the bridge")
        return None

    current_time = datetime.now(timezone.utc).strftime("%H:%M:%S")
    debug_print(f"🔄 [{current_time}] Token refresh triggered")
    debug_print(" ├── Requesting new reCAPTCHA token...")

    token = await get_recaptcha_token_from_browser()

    if token:
        RECAPTCHA_TOKEN = token
        RECAPTCHA_EXPIRY = datetime.now(timezone.utc) + timedelta(seconds=110)
        next_refresh = (datetime.now(timezone.utc) + timedelta(seconds=100)).strftime("%H:%M:%S")
        debug_print(f" ├── ✅ New token acquired ({len(token)} chars)")
        debug_print(f" └── Next refresh at: {next_refresh}")
        return token

    debug_print(" └── ❌ Failed to get token")
    return None


async def refresh_recaptcha_token() -> Optional[str]:
    """Get a FRESH reCAPTCHA token, retrying up to three times.

    IMPORTANT: reCAPTCHA tokens are SINGLE-USE per Google docs. Once a token
    is verified by the server it becomes invalid, so a new one is fetched for
    every LMArena API request. Failed attempts wait 2s, then 4s.

    Returns:
        The new token, or None after three failed attempts.
    """
    global RECAPTCHA_TOKEN, RECAPTCHA_EXPIRY

    time_str = datetime.now(timezone.utc).strftime("%H:%M:%S")
    debug_print(f"🔄 [{time_str}] Getting fresh reCAPTCHA token (tokens are single-use)...")

    # ALWAYS get a fresh token - tokens are single-use!
    for attempt in range(1, 4):
        new_token = await get_recaptcha_v3_token()

        if new_token:
            RECAPTCHA_TOKEN = new_token
            # Stamp the expiry from the moment of acquisition; the start-of-call
            # time is stale by the polling and retry delays.
            RECAPTCHA_EXPIRY = datetime.now(timezone.utc) + timedelta(seconds=110)
            debug_print(f"✅ [{time_str}] Fresh token acquired ({len(new_token)} chars)")
            return new_token

        if attempt < 3:
            wait_time = attempt * 2  # Shorter waits: 2s, 4s
            debug_print(f"⚠️ Token fetch failed (attempt {attempt}/3), retrying in {wait_time}s...")
            await asyncio.sleep(wait_time)

    # All attempts failed
    print("")
    print("❌ ERROR: Token refresh failed after 3 attempts!")
    print("")
    print(" PROBLEM: Cannot acquire new reCAPTCHA token.")
    print("")
    print(" SOLUTION:")
    print(" 1. Check the Chrome window - you may need to solve CAPTCHA again")
    print(" 2. If Chrome is unresponsive, restart the server")
    return None

# --- End New reCAPTCHA Functions ---


# Custom UUIDv7 implementation (using correct Unix epoch)
def uuid7():
    """Generate a UUIDv7 string from the current Unix time in milliseconds.

    Layout: 48-bit millisecond timestamp, version nibble 7 with 12 random
    bits, then variant bits ``10`` with 62 random bits.
    """
    timestamp_ms = int(time.time() * 1000)
    rand_a = secrets.randbits(12)
    rand_b = secrets.randbits(62)

    uuid_int = timestamp_ms << 80
    uuid_int |= (0x7000 | rand_a) << 64
    uuid_int |= (0x8000000000000000 | rand_b)

    hex_str = f"{uuid_int:032x}"
    return f"{hex_str[0:8]}-{hex_str[8:12]}-{hex_str[12:16]}-{hex_str[16:20]}-{hex_str[20:32]}"


# Image upload helper functions
def _parse_server_action_result(response_text: str) -> Optional[dict]:
    """Decode the first ``1:``-prefixed line of a server-action response.

    The response body is line based (``0:{...}\\n1:{...}\\n``). Returns None
    when no such line exists; JSON errors propagate to the caller.
    """
    for line in response_text.strip().split('\n'):
        if line.startswith('1:'):
            return json.loads(line[2:])
    return None


async def upload_image_to_lmarena(image_data: bytes, mime_type: str, filename: str) -> Optional[tuple]:
    """Upload an image to LMArena R2 storage and return the key and download URL.

    Three requests are made on one HTTP client: request an upload URL, PUT the
    bytes to it, then request a signed download URL for the returned key.

    Args:
        image_data: Binary image data
        mime_type: MIME type of the image (e.g., 'image/png')
        filename: Original filename for the image

    Returns:
        Tuple of (key, download_url) if successful, or None if any step fails
        (every failure is debug-logged, none is raised).
    """
    try:
        # Validate inputs
        if not image_data:
            debug_print("❌ Image data is empty")
            return None

        if not mime_type or not mime_type.startswith('image/'):
            debug_print(f"❌ Invalid MIME type: {mime_type}")
            return None

        # Step 1: Request upload URL
        debug_print(f"📤 Step 1: Requesting upload URL for {filename}")

        # Get Next-Action IDs from config
        config = get_config()
        upload_action_id = config.get("next_action_upload")
        signed_url_action_id = config.get("next_action_signed_url")

        if not upload_action_id or not signed_url_action_id:
            debug_print("❌ Next-Action IDs not found in config. Please refresh tokens from dashboard.")
            return None

        # Prepare headers for Next.js Server Action
        request_headers = get_request_headers()
        request_headers.update({
            "Accept": "text/x-component",
            "Content-Type": "text/plain;charset=UTF-8",
            "Next-Action": upload_action_id,
            "Referer": "https://arena.ai/?mode=direct",
        })

        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    "https://arena.ai/?mode=direct",
                    headers=request_headers,
                    content=json.dumps([filename, mime_type]),
                    timeout=30.0
                )
                response.raise_for_status()
            except httpx.TimeoutException:
                debug_print("❌ Timeout while requesting upload URL")
                return None
            except httpx.HTTPError as e:
                debug_print(f"❌ HTTP error while requesting upload URL: {e}")
                return None

            # Parse response - format: 0:{...}\n1:{...}\n
            try:
                upload_data = _parse_server_action_result(response.text)

                if not upload_data or not upload_data.get('success'):
                    debug_print(f"❌ Failed to get upload URL: {response.text[:200]}")
                    return None

                upload_url = upload_data['data']['uploadUrl']
                key = upload_data['data']['key']
                debug_print(f"✅ Got upload URL and key: {key}")
            except (json.JSONDecodeError, KeyError, IndexError) as e:
                debug_print(f"❌ Failed to parse upload URL response: {e}")
                return None

            # Step 2: Upload image to R2 storage
            debug_print(f"📤 Step 2: Uploading image to R2 storage ({len(image_data)} bytes)")
            try:
                response = await client.put(
                    upload_url,
                    content=image_data,
                    headers={"Content-Type": mime_type},
                    timeout=60.0
                )
                response.raise_for_status()
                debug_print("✅ Image uploaded successfully")
            except httpx.TimeoutException:
                debug_print("❌ Timeout while uploading image to R2 storage")
                return None
            except httpx.HTTPError as e:
                debug_print(f"❌ HTTP error while uploading image: {e}")
                return None

            # Step 3: Get signed download URL (uses different Next-Action)
            debug_print("📤 Step 3: Requesting signed download URL")
            request_headers_step3 = request_headers.copy()
            request_headers_step3["Next-Action"] = signed_url_action_id

            try:
                response = await client.post(
                    "https://arena.ai/?mode=direct",
                    headers=request_headers_step3,
                    content=json.dumps([key]),
                    timeout=30.0
                )
                response.raise_for_status()
            except httpx.TimeoutException:
                debug_print("❌ Timeout while requesting download URL")
                return None
            except httpx.HTTPError as e:
                debug_print(f"❌ HTTP error while requesting download URL: {e}")
                return None

            # Parse response
            try:
                download_data = _parse_server_action_result(response.text)

                if not download_data or not download_data.get('success'):
                    debug_print(f"❌ Failed to get download URL: {response.text[:200]}")
                    return None

                download_url = download_data['data']['url']
                debug_print(f"✅ Got signed download URL: {download_url[:100]}...")
                return (key, download_url)
            except (json.JSONDecodeError, KeyError, IndexError) as e:
                debug_print(f"❌ Failed to parse download URL response: {e}")
                return None

    except Exception as e:
        # Boundary for the whole upload: callers only see None on failure.
        debug_print(f"❌ Unexpected error uploading image: {type(e).__name__}: {e}")
        return None


async def _attachment_from_data_uri(url: str) -> Optional[dict]:
    """Decode a ``data:image/...;base64,...`` URI, upload it, build the attachment.

    Returns the attachment dict (``name``, ``contentType``, ``url``) or None
    when the URI is malformed, not an image, larger than 10MB, or the upload
    fails. Each rejection is debug-logged.
    """
    # Validate and parse data URI
    if ',' not in url:
        debug_print("❌ Invalid data URI format (no comma separator)")
        return None

    header, data = url.split(',', 1)

    # Parse MIME type
    if ';' not in header or ':' not in header:
        debug_print("❌ Invalid data URI header format")
        return None

    mime_type = header.split(';')[0].split(':')[1]

    # Validate MIME type
    if not mime_type.startswith('image/'):
        debug_print(f"❌ Invalid MIME type: {mime_type}")
        return None

    # Decode base64
    try:
        image_data = base64.b64decode(data)
    except Exception as e:
        debug_print(f"❌ Failed to decode base64 data: {e}")
        return None

    # Validate image size (max 10MB)
    if len(image_data) > 10 * 1024 * 1024:
        debug_print(f"❌ Image too large: {len(image_data)} bytes (max 10MB)")
        return None

    # Generate filename
    ext = mimetypes.guess_extension(mime_type) or '.png'
    filename = f"upload-{uuid.uuid4()}{ext}"

    debug_print(f"🖼️ Processing base64 image: {filename}, size: {len(image_data)} bytes")

    # Upload to LMArena
    upload_result = await upload_image_to_lmarena(image_data, mime_type, filename)
    if not upload_result:
        debug_print("⚠️ Failed to upload image, skipping")
        return None

    key, download_url = upload_result
    debug_print("✅ Image uploaded and added to attachments")
    # Attachment in LMArena format
    return {
        "name": key,
        "contentType": mime_type,
        "url": download_url,
    }


async def process_message_content(content, model_capabilities: dict) -> tuple[str, List[dict]]:
    """Split message content into text and uploaded image attachments.

    Args:
        content: Message content (string or list of content parts)
        model_capabilities: Model's capability dictionary; images are handled
            only when ``inputCapabilities.image`` is truthy

    Returns:
        Tuple of (text_content, experimental_attachments). A plain string is
        returned unchanged with no attachments; any other non-list value is
        stringified.
    """
    # Check if model supports image input
    supports_images = model_capabilities.get('inputCapabilities', {}).get('image', False)

    # If content is a string, return it as-is
    if isinstance(content, str):
        return content, []

    # Fallback for anything that is not the OpenAI multi-part list format
    if not isinstance(content, list):
        return str(content), []

    text_parts = []
    attachments = []

    for part in content:
        if not isinstance(part, dict):
            continue

        part_type = part.get('type')

        if part_type == 'text':
            text_parts.append(part.get('text', ''))
            continue

        if part_type != 'image_url':
            continue

        if not supports_images:
            debug_print("⚠️ Image provided but model doesn't support images")
            continue

        image_url = part.get('image_url', {})
        url = image_url.get('url', '') if isinstance(image_url, dict) else image_url

        # A missing/None/non-string URL used to raise AttributeError on
        # .startswith and abort the whole request; skip the part instead.
        if not isinstance(url, str):
            debug_print(f"⚠️ Ignoring image part with non-string URL: {type(url).__name__}")
            continue

        # Handle base64-encoded images
        if url.startswith('data:'):
            # Format: data:image/png;base64,iVBORw0KGgo...
            try:
                attachment = await _attachment_from_data_uri(url)
            except Exception as e:
                debug_print(f"❌ Unexpected error processing base64 image: {type(e).__name__}: {e}")
                continue
            if attachment:
                attachments.append(attachment)

        # Handle URL images (direct URLs)
        elif url.startswith(('http://', 'https://')):
            # For external URLs, we'd need to download and re-upload
            # For now, skip this case
            debug_print(f"⚠️ External image URLs not yet supported: {url[:100]}")

    # Combine text parts
    text_content = '\n'.join(text_parts).strip()
    return text_content, attachments


app = FastAPI()

# --- Constants & Global State ---
CONFIG_FILE = "config.json"
MODELS_FILE = "models.json"
API_KEY_HEADER = APIKeyHeader(name="Authorization", auto_error=False)

# In-memory stores
# { "api_key": { "conversation_id": session_data } }
chat_sessions: Dict[str, Dict[str, dict]] = defaultdict(dict)
# { "session_id": "username" }
dashboard_sessions = {}
# { "api_key": [timestamp1, timestamp2, ...] }
} api_key_usage = defaultdict(list) # { "model_id": count } model_usage_stats = defaultdict(int) # Token cycling: current index for round-robin selection current_token_index = 0 # Track which token is assigned to each conversation (conversation_id -> token) conversation_tokens: Dict[str, str] = {} # Track failed tokens per request to avoid retrying with same token request_failed_tokens: Dict[str, set] = {} # --- New Global State for reCAPTCHA --- RECAPTCHA_TOKEN: Optional[str] = None # Initialize expiry far in the past to force a refresh on startup RECAPTCHA_EXPIRY: datetime = datetime.now(timezone.utc) - timedelta(days=365) # --- nodriver Browser Instance (persistent session) --- # These stay alive for the entire server session NODRIVER_BROWSER = None # nodriver.Browser instance NODRIVER_TAB = None # nodriver.Tab instance (the page) BROWSER_READY = False # Flag to indicate browser is ready for token refresh USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" # Default fallback LMARENA_REQUEST_LOCK = asyncio.Lock() # Lock to serialize LMArena requests (prevents rate limiting) LAST_LMARENA_REQUEST_TIME = 0.0 # Timestamp of last LMArena request (for rate limiting) # --- Webshare Proxy Pool Configuration --- # Enable/disable proxy rotation (set to True when proxies are configured) PROXY_ROTATION_ENABLED = False # Will be auto-enabled when proxies are added # When True, each request creates a NEW session instead of reusing existing ones # This bypasses LMArena's per-session rate limiting (they track by session ID, not just IP) # Recommended: Enable this when using proxy rotation for unlimited parallel requests FORCE_NEW_SESSION = True # Always create fresh session (bypasses per-session rate limits) # Proxy pool - Add your Webshare proxy credentials here # Format: {"host": "IP", "port": PORT, "username": "user", "password": "pass"} WEBSHARE_PROXY_POOL = [ # Account 1 - 10 Proxies (wlnpiril) 
{"host": "142.111.48.253", "port": 7030, "username": "wlnpiril", "password": "rz8y4an5o6n1"}, # US - Los Angeles {"host": "23.95.150.145", "port": 6114, "username": "wlnpiril", "password": "rz8y4an5o6n1"}, # US - Buffalo {"host": "198.23.239.134", "port": 6540, "username": "wlnpiril", "password": "rz8y4an5o6n1"}, # US - Buffalo {"host": "107.172.163.27", "port": 6543, "username": "wlnpiril", "password": "rz8y4an5o6n1"}, # US - Bloomingdale {"host": "198.105.121.200", "port": 6462, "username": "wlnpiril", "password": "rz8y4an5o6n1"}, # UK - London {"host": "64.137.96.74", "port": 6641, "username": "wlnpiril", "password": "rz8y4an5o6n1"}, # Spain - Madrid {"host": "84.247.60.125", "port": 6095, "username": "wlnpiril", "password": "rz8y4an5o6n1"}, # Poland - Warsaw {"host": "216.10.27.159", "port": 6837, "username": "wlnpiril", "password": "rz8y4an5o6n1"}, # US - Dallas {"host": "23.26.71.145", "port": 5628, "username": "wlnpiril", "password": "rz8y4an5o6n1"}, # US - Orem {"host": "23.27.208.120", "port": 5830, "username": "wlnpiril", "password": "rz8y4an5o6n1"}, # US - Reston # Account 2 - 10 Proxies (wfpfhvqd) {"host": "142.111.48.253", "port": 7030, "username": "wfpfhvqd", "password": "akmgj7n23qgw"}, # US - Los Angeles {"host": "23.95.150.145", "port": 6114, "username": "wfpfhvqd", "password": "akmgj7n23qgw"}, # US - Buffalo {"host": "198.23.239.134", "port": 6540, "username": "wfpfhvqd", "password": "akmgj7n23qgw"}, # US - Buffalo {"host": "107.172.163.27", "port": 6543, "username": "wfpfhvqd", "password": "akmgj7n23qgw"}, # US - Bloomingdale {"host": "198.105.121.200", "port": 6462, "username": "wfpfhvqd", "password": "akmgj7n23qgw"}, # UK - London {"host": "64.137.96.74", "port": 6641, "username": "wfpfhvqd", "password": "akmgj7n23qgw"}, # Spain - Madrid {"host": "84.247.60.125", "port": 6095, "username": "wfpfhvqd", "password": "akmgj7n23qgw"}, # Poland - Warsaw {"host": "216.10.27.159", "port": 6837, "username": "wfpfhvqd", "password": "akmgj7n23qgw"}, # US - 
Dallas {"host": "23.26.71.145", "port": 5628, "username": "wfpfhvqd", "password": "akmgj7n23qgw"}, # US - Orem {"host": "23.27.208.120", "port": 5830, "username": "wfpfhvqd", "password": "akmgj7n23qgw"}, # US - Reston # Account 3 - 10 Proxies (qbwdhdrw) {"host": "142.111.48.253", "port": 7030, "username": "qbwdhdrw", "password": "9f9w1szgq7tu"}, # US - Los Angeles {"host": "23.95.150.145", "port": 6114, "username": "qbwdhdrw", "password": "9f9w1szgq7tu"}, # US - Buffalo {"host": "198.23.239.134", "port": 6540, "username": "qbwdhdrw", "password": "9f9w1szgq7tu"}, # US - Buffalo {"host": "107.172.163.27", "port": 6543, "username": "qbwdhdrw", "password": "9f9w1szgq7tu"}, # US - Bloomingdale {"host": "198.105.121.200", "port": 6462, "username": "qbwdhdrw", "password": "9f9w1szgq7tu"}, # UK - London {"host": "64.137.96.74", "port": 6641, "username": "qbwdhdrw", "password": "9f9w1szgq7tu"}, # Spain - Madrid {"host": "84.247.60.125", "port": 6095, "username": "qbwdhdrw", "password": "9f9w1szgq7tu"}, # Poland - Warsaw {"host": "216.10.27.159", "port": 6837, "username": "qbwdhdrw", "password": "9f9w1szgq7tu"}, # US - Dallas {"host": "23.26.71.145", "port": 5628, "username": "qbwdhdrw", "password": "9f9w1szgq7tu"}, # US - Orem {"host": "23.27.208.120", "port": 5830, "username": "qbwdhdrw", "password": "9f9w1szgq7tu"}, # US - Reston # Account 4 - 10 Proxies (vwqxqyew) {"host": "142.111.48.253", "port": 7030, "username": "vwqxqyew", "password": "4l6qlayr252q"}, # US - Los Angeles {"host": "23.95.150.145", "port": 6114, "username": "vwqxqyew", "password": "4l6qlayr252q"}, # US - Buffalo {"host": "198.23.239.134", "port": 6540, "username": "vwqxqyew", "password": "4l6qlayr252q"}, # US - Buffalo {"host": "107.172.163.27", "port": 6543, "username": "vwqxqyew", "password": "4l6qlayr252q"}, # US - Bloomingdale {"host": "198.105.121.200", "port": 6462, "username": "vwqxqyew", "password": "4l6qlayr252q"}, # UK - London {"host": "64.137.96.74", "port": 6641, "username": "vwqxqyew", 
"password": "4l6qlayr252q"}, # Spain - Madrid {"host": "84.247.60.125", "port": 6095, "username": "vwqxqyew", "password": "4l6qlayr252q"}, # Poland - Warsaw {"host": "216.10.27.159", "port": 6837, "username": "vwqxqyew", "password": "4l6qlayr252q"}, # US - Dallas {"host": "23.26.71.145", "port": 5628, "username": "vwqxqyew", "password": "4l6qlayr252q"}, # US - Orem {"host": "23.27.208.120", "port": 5830, "username": "vwqxqyew", "password": "4l6qlayr252q"}, # US - Reston # Account 5 - 10 Proxies (ynwjxcuz) {"host": "142.111.48.253", "port": 7030, "username": "ynwjxcuz", "password": "l90dlksfzyia"}, # US - Los Angeles {"host": "23.95.150.145", "port": 6114, "username": "ynwjxcuz", "password": "l90dlksfzyia"}, # US - Buffalo {"host": "198.23.239.134", "port": 6540, "username": "ynwjxcuz", "password": "l90dlksfzyia"}, # US - Buffalo {"host": "107.172.163.27", "port": 6543, "username": "ynwjxcuz", "password": "l90dlksfzyia"}, # US - Bloomingdale {"host": "198.105.121.200", "port": 6462, "username": "ynwjxcuz", "password": "l90dlksfzyia"}, # UK - London {"host": "64.137.96.74", "port": 6641, "username": "ynwjxcuz", "password": "l90dlksfzyia"}, # Spain - Madrid {"host": "84.247.60.125", "port": 6095, "username": "ynwjxcuz", "password": "l90dlksfzyia"}, # Poland - Warsaw {"host": "216.10.27.159", "port": 6837, "username": "ynwjxcuz", "password": "l90dlksfzyia"}, # US - Dallas {"host": "23.26.71.145", "port": 5628, "username": "ynwjxcuz", "password": "l90dlksfzyia"}, # US - Orem {"host": "23.27.208.120", "port": 5830, "username": "ynwjxcuz", "password": "l90dlksfzyia"}, # US - Reston ] # Track which proxy to use next (round-robin) CURRENT_PROXY_INDEX = 0 PROXY_USAGE_STATS = defaultdict(lambda: {"requests": 0, "errors": 0}) def get_next_proxy(): """Get the next proxy from the pool in round-robin fashion.""" global CURRENT_PROXY_INDEX if not WEBSHARE_PROXY_POOL: return None proxy = WEBSHARE_PROXY_POOL[CURRENT_PROXY_INDEX] CURRENT_PROXY_INDEX = (CURRENT_PROXY_INDEX + 1) % 
len(WEBSHARE_PROXY_POOL) # Track usage proxy_id = f"{proxy['host']}:{proxy['port']}" PROXY_USAGE_STATS[proxy_id]["requests"] += 1 return proxy def format_proxy_url(proxy: dict) -> str: """Format proxy dict into URL string for httpx/requests.""" if not proxy: return None return f"http://{proxy['username']}:{proxy['password']}@{proxy['host']}:{proxy['port']}" def get_proxy_for_browser() -> dict: """Get proxy configuration formatted for browser use.""" proxy = get_next_proxy() if not proxy: return None return { "server": f"http://{proxy['host']}:{proxy['port']}", "username": proxy['username'], "password": proxy['password'] } # Auto-enable proxy rotation if proxies are configured if WEBSHARE_PROXY_POOL: PROXY_ROTATION_ENABLED = True print(f"🔄 Proxy rotation ENABLED with {len(WEBSHARE_PROXY_POOL)} proxies") else: print("⚠️ No proxies configured. Using direct connection (rate limits may apply)") # -------------------------------------- # --- Helper Functions --- def get_config(): try: with open(CONFIG_FILE, "r") as f: config = json.load(f) except (FileNotFoundError, json.JSONDecodeError) as e: debug_print(f"⚠️ Config file error: {e}, using defaults") config = {} except Exception as e: debug_print(f"⚠️ Unexpected error reading config: {e}, using defaults") config = {} # Ensure default keys exist try: config.setdefault("password", "admin") config.setdefault("auth_token", "") config.setdefault("auth_tokens", []) # Multiple auth tokens config.setdefault("cf_clearance", "") config.setdefault("api_keys", []) config.setdefault("usage_stats", {}) except Exception as e: debug_print(f"⚠️ Error setting config defaults: {e}") return config def load_usage_stats(): """Load usage stats from config into memory""" global model_usage_stats try: config = get_config() model_usage_stats = defaultdict(int, config.get("usage_stats", {})) except Exception as e: debug_print(f"⚠️ Error loading usage stats: {e}, using empty stats") model_usage_stats = defaultdict(int) def save_config(config): try: 
# Persist in-memory stats to the config dict before saving config["usage_stats"] = dict(model_usage_stats) with open(CONFIG_FILE, "w") as f: json.dump(config, f, indent=4) except Exception as e: debug_print(f"❌ Error saving config: {e}") def get_models(): try: with open(MODELS_FILE, "r") as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): return [] def save_models(models): try: with open(MODELS_FILE, "w") as f: json.dump(models, f, indent=2) except Exception as e: debug_print(f"❌ Error saving models: {e}") def get_request_headers(): """Get request headers with the first available auth token (for compatibility)""" config = get_config() # Try to get token from auth_tokens first, then fallback to single token auth_tokens = config.get("auth_tokens", []) if auth_tokens: token = auth_tokens[0] # Just use first token for non-API requests else: token = config.get("auth_token", "").strip() if not token: raise HTTPException(status_code=500, detail="Arena auth token not set in dashboard.") return get_request_headers_with_token(token) def get_request_headers_with_token(token: str): """Get request headers with a specific auth token""" config = get_config() cf_clearance = config.get("cf_clearance", "").strip() # Check if the token is a full cookie string (contains multiple cookies or the key name) if "arena-auth-prod-v1=" in token: # User pasted the full cookie string! Use it directly. # Ensure cf_clearance is updated if present in the string? # Actually, best to just use what user gave, but we might want to ensure cf_clearance is there. # If user gave full string, it likely has everything. 
cookie_header = token else: # Standard behavior: wrap the value cookie_header = f"cf_clearance={cf_clearance}; arena-auth-prod-v1={token}" return { "Content-Type": "text/plain;charset=UTF-8", "Cookie": cookie_header, "User-Agent": USER_AGENT, } def get_next_auth_token(exclude_tokens: set = None): """Get next auth token using round-robin selection Args: exclude_tokens: Set of tokens to exclude from selection (e.g., already tried tokens) """ global current_token_index config = get_config() # Get all available tokens auth_tokens = config.get("auth_tokens", []) if not auth_tokens: raise HTTPException(status_code=500, detail="No auth tokens configured") # Filter out excluded tokens if exclude_tokens: available_tokens = [t for t in auth_tokens if t not in exclude_tokens] if not available_tokens: raise HTTPException(status_code=500, detail="No more auth tokens available to try") else: available_tokens = auth_tokens # Round-robin selection from available tokens token = available_tokens[current_token_index % len(available_tokens)] current_token_index = (current_token_index + 1) % len(auth_tokens) return token def remove_auth_token(token: str): """Remove an expired/invalid auth token from the list""" try: config = get_config() auth_tokens = config.get("auth_tokens", []) if token in auth_tokens: auth_tokens.remove(token) config["auth_tokens"] = auth_tokens save_config(config) debug_print(f"🗑️ Removed expired token from list: {token[:20]}...") except Exception as e: debug_print(f"⚠️ Error removing auth token: {e}") async def make_lmarena_request_browser(url: str, payload: dict, method: str = "POST") -> dict: """Make LMArena API request through the real Chrome browser (bypasses all bot detection) This function uses the nodriver browser to execute JavaScript fetch() calls, ensuring the request comes from a real browser with proper cookies, TLS fingerprint, and session context. 
Args: url: Full URL to the LMArena API endpoint payload: JSON payload to send method: HTTP method (POST or PUT) Returns: dict with 'status_code' and 'text' (response body) """ global NODRIVER_TAB, BROWSER_READY, LAST_LMARENA_REQUEST_TIME if not BROWSER_READY or NODRIVER_TAB is None: raise HTTPException(status_code=503, detail="Browser not ready for API calls") # Only use lock and rate limiting if proxy rotation is DISABLED # With rotating proxies, each request uses different IP = no rate limit concerns if not PROXY_ROTATION_ENABLED: # Acquire lock to serialize requests (parallel requests will queue up here) debug_print(f"🔒 Waiting to acquire request lock...") await LMARENA_REQUEST_LOCK.acquire() debug_print(f"🔓 Lock acquired!") else: proxy = get_next_proxy() proxy_id = f"{proxy['host']}:{proxy['port']}" if proxy else "direct" debug_print(f"🔄 Using rotating proxy: {proxy_id} (no lock needed)") try: # Rate limiting: only if proxy rotation is disabled if not PROXY_ROTATION_ENABLED: MIN_REQUEST_INTERVAL = 2.5 current_time = time.time() if LAST_LMARENA_REQUEST_TIME > 0: elapsed = current_time - LAST_LMARENA_REQUEST_TIME if elapsed < MIN_REQUEST_INTERVAL: wait_time = MIN_REQUEST_INTERVAL - elapsed debug_print(f"⏳ Rate limiting: waiting {wait_time:.1f}s before next request") await asyncio.sleep(wait_time) LAST_LMARENA_REQUEST_TIME = time.time() # CRITICAL: Refresh reCAPTCHA token AFTER acquiring lock # Token may have expired while waiting in queue debug_print(f"🔄 Refreshing reCAPTCHA token after lock...") fresh_token = await refresh_recaptcha_token() if fresh_token and 'recaptchaV3Token' in payload: payload['recaptchaV3Token'] = fresh_token debug_print(f"✅ Fresh token applied ({len(fresh_token)} chars)") debug_print(f"🌐 Making browser-based request to: {url}") debug_print(f"🔐 Using REAL Chrome browser (bypasses bot detection)") # Escape the payload for JavaScript payload_json = json.dumps(payload).replace('\\', '\\\\').replace("'", "\\'").replace('\n', '\\n') # Generate 
unique request ID to avoid collisions request_id = f"lmab_{int(time.time() * 1000)}" # JavaScript code that stores result in window global (since evaluate() can't return async results) js_code = f""" (function() {{ window.{request_id} = null; // Reset fetch('{url}', {{ method: '{method}', headers: {{ 'Content-Type': 'application/json' }}, body: '{payload_json}', credentials: 'include' }}) .then(async (response) => {{ const text = await response.text(); window.{request_id} = {{ status_code: response.status, text: text, ok: response.ok, done: true }}; }}) .catch((error) => {{ window.{request_id} = {{ status_code: 0, text: 'Fetch error: ' + error.message, ok: false, done: true }}; }}); return 'request_started'; }})(); """ # Start the fetch request start_result = await NODRIVER_TAB.evaluate(js_code) debug_print(f"🚀 Browser fetch started: {start_result}") # Poll for result (timeout after 120 seconds) max_wait = 120 poll_interval = 0.5 waited = 0 while waited < max_wait: await asyncio.sleep(poll_interval) waited += poll_interval # Check if result is ready result = await NODRIVER_TAB.evaluate(f"window.{request_id}") # Debug: log result type if result is not None: debug_print(f"🔍 Result type: {type(result).__name__}, value: {str(result)[:100]}") # Handle different return types from nodriver if result is not None: # nodriver returns JS objects as list of [key, {type, value}] pairs # e.g. [['status_code', {'type': 'number', 'value': 200}], ['text', {...}], ...] 
if isinstance(result, list) and len(result) > 0: # Check if it's the nodriver format: list of 2-element lists if isinstance(result[0], list) and len(result[0]) == 2: # Convert nodriver format to dict converted = {} for item in result: if isinstance(item, list) and len(item) == 2: key = item[0] value_wrapper = item[1] if isinstance(value_wrapper, dict) and 'value' in value_wrapper: converted[key] = value_wrapper['value'] else: converted[key] = value_wrapper result = converted debug_print(f"✅ Converted nodriver format to dict: {list(result.keys())}") debug_print(f" done={result.get('done')}, status={result.get('status_code')}") # If first element is a dict, take it (old handling) elif isinstance(result[0], dict): result = result[0] # Now check if it's a dict with 'done' key if isinstance(result, dict) and result.get("done"): debug_print(f"🌐 Browser response status: {result.get('status_code', 'unknown')}") # Log first 200 chars of response for debugging response_preview = str(result.get('text', ''))[:200] debug_print(f"📄 Response preview: {response_preview}...") # Clean up window variable await NODRIVER_TAB.evaluate(f"delete window.{request_id}") return { "status_code": result.get("status_code", 500), "text": result.get("text", ""), "ok": result.get("ok", False) } if waited % 5 == 0: debug_print(f"⏳ Waiting for browser response... 
({int(waited)}s)") # Timeout debug_print(f"❌ Browser fetch timed out after {max_wait}s") await NODRIVER_TAB.evaluate(f"delete window.{request_id}") return {"status_code": 504, "text": "Browser request timed out"} except Exception as e: debug_print(f"❌ Browser request failed: {type(e).__name__}: {e}") return {"status_code": 500, "text": f"Browser error: {str(e)}"} finally: # Only release lock if we acquired it (proxy rotation disabled) if not PROXY_ROTATION_ENABLED: LMARENA_REQUEST_LOCK.release() debug_print(f"🔓 Request lock released") async def make_lmarena_streaming_request_browser(url: str, payload: dict, method: str = "POST"): """Stream LMArena API response through the real Chrome browser (bypasses reCAPTCHA). This is an async generator that yields response chunks as they arrive. Uses browser's ReadableStream API to capture streaming data. Args: url: Full URL to the LMArena API endpoint payload: JSON payload to send method: HTTP method (POST or PUT) Yields: str: Each chunk of the streaming response as it arrives """ global NODRIVER_TAB, BROWSER_READY, LAST_LMARENA_REQUEST_TIME if not BROWSER_READY or NODRIVER_TAB is None: raise HTTPException(status_code=503, detail="Browser not ready for API calls") # Only use lock and rate limiting if proxy rotation is DISABLED if not PROXY_ROTATION_ENABLED: # Acquire lock to serialize requests (parallel requests will queue up here) debug_print(f"🔒 [STREAM] Waiting to acquire request lock...") await LMARENA_REQUEST_LOCK.acquire() debug_print(f"🔓 [STREAM] Lock acquired!") else: proxy = get_next_proxy() proxy_id = f"{proxy['host']}:{proxy['port']}" if proxy else "direct" debug_print(f"🔄 [STREAM] Using rotating proxy: {proxy_id} (no lock needed)") # Rate limiting: only if proxy rotation is disabled if not PROXY_ROTATION_ENABLED: MIN_REQUEST_INTERVAL = 2.5 current_time = time.time() if LAST_LMARENA_REQUEST_TIME > 0: elapsed = current_time - LAST_LMARENA_REQUEST_TIME if elapsed < MIN_REQUEST_INTERVAL: wait_time = 
MIN_REQUEST_INTERVAL - elapsed debug_print(f"⏳ Rate limiting: waiting {wait_time:.1f}s before next streaming request") await asyncio.sleep(wait_time) LAST_LMARENA_REQUEST_TIME = time.time() # CRITICAL: Refresh reCAPTCHA token AFTER acquiring lock # Token may have expired while waiting in queue debug_print(f"🔄 [STREAM] Refreshing reCAPTCHA token after lock...") fresh_token = await refresh_recaptcha_token() if fresh_token and 'recaptchaV3Token' in payload: payload['recaptchaV3Token'] = fresh_token debug_print(f"✅ [STREAM] Fresh token applied ({len(fresh_token)} chars)") debug_print(f"🌐 Making STREAMING browser request to: {url}") debug_print(f"🔐 Using REAL Chrome browser for streaming (bypasses reCAPTCHA)") # Escape the payload for JavaScript payload_json = json.dumps(payload).replace('\\', '\\\\').replace("'", "\\'").replace('\n', '\\n') # Generate unique request ID request_id = f"lmab_stream_{int(time.time() * 1000)}" # JavaScript that uses ReadableStream to collect chunks # Stores chunks in an array that Python can poll js_code = f""" (function() {{ window.{request_id} = {{ chunks: [], done: false, error: null, status_code: 0 }}; fetch('{url}', {{ method: '{method}', headers: {{ 'Content-Type': 'application/json' }}, body: '{payload_json}', credentials: 'include' }}) .then(async (response) => {{ window.{request_id}.status_code = response.status; if (!response.ok) {{ const text = await response.text(); window.{request_id}.error = text; window.{request_id}.done = true; return; }} const reader = response.body.getReader(); const decoder = new TextDecoder(); while (true) {{ const {{done, value}} = await reader.read(); if (done) {{ window.{request_id}.done = true; break; }} const chunk = decoder.decode(value, {{stream: true}}); window.{request_id}.chunks.push(chunk); }} }}) .catch((error) => {{ window.{request_id}.error = 'Fetch error: ' + error.message; window.{request_id}.done = true; }}); return 'streaming_started'; }})(); """ try: # Start the streaming fetch 
start_result = await NODRIVER_TAB.evaluate(js_code) debug_print(f"🚀 Browser streaming started: {start_result}") # Poll for chunks and yield them as they arrive max_wait = 120 poll_interval = 0.1 # Poll faster for streaming waited = 0 last_chunk_index = 0 while waited < max_wait: await asyncio.sleep(poll_interval) waited += poll_interval # Get current state state_js = f""" (function() {{ const s = window.{request_id}; if (!s) return null; return {{ chunk_count: s.chunks.length, done: s.done, error: s.error, status_code: s.status_code }}; }})(); """ state = await NODRIVER_TAB.evaluate(state_js) if state is None: continue # Handle nodriver list format conversion if isinstance(state, list): converted = {} for item in state: if isinstance(item, list) and len(item) == 2: key = item[0] value_wrapper = item[1] if isinstance(value_wrapper, dict) and 'value' in value_wrapper: converted[key] = value_wrapper['value'] else: converted[key] = value_wrapper state = converted if not isinstance(state, dict): continue chunk_count = state.get('chunk_count', 0) done = state.get('done', False) error = state.get('error') status_code = state.get('status_code', 0) # Check for error (nodriver returns {'type': 'null'} for JS null, which is truthy) # Only treat as error if it's an actual error string is_real_error = error and isinstance(error, str) and error != "" if is_real_error: debug_print(f"❌ Stream error: {error}") await NODRIVER_TAB.evaluate(f"delete window.{request_id}") raise HTTPException(status_code=status_code or 500, detail=f"Browser stream error: {error}") # Get new chunks if available if chunk_count > last_chunk_index: # Get all new chunks get_chunks_js = f""" (function() {{ const s = window.{request_id}; if (!s) return []; return s.chunks.slice({last_chunk_index}); }})(); """ new_chunks = await NODRIVER_TAB.evaluate(get_chunks_js) # Handle nodriver format for chunk array if isinstance(new_chunks, list): for chunk_item in new_chunks: # Extract chunk text if 
isinstance(chunk_item, dict) and 'value' in chunk_item: chunk_text = chunk_item['value'] elif isinstance(chunk_item, str): chunk_text = chunk_item else: chunk_text = str(chunk_item) if chunk_item else "" if chunk_text: yield chunk_text last_chunk_index = chunk_count # Check if done if done: debug_print(f"✅ Browser streaming completed. Status: {status_code}, Total chunks: {chunk_count}") break # Periodic status log if waited % 10 == 0 and waited > 0: debug_print(f"⏳ Streaming... ({int(waited)}s, {chunk_count} chunks)") # Clean up await NODRIVER_TAB.evaluate(f"delete window.{request_id}") if waited >= max_wait: debug_print(f"❌ Browser streaming timed out after {max_wait}s") raise HTTPException(status_code=504, detail="Browser streaming timed out") except HTTPException: raise except Exception as e: debug_print(f"❌ Browser streaming failed: {type(e).__name__}: {e}") try: await NODRIVER_TAB.evaluate(f"delete window.{request_id}") except: pass raise HTTPException(status_code=500, detail=f"Browser streaming error: {str(e)}") finally: # Only release lock if we acquired it (proxy rotation disabled) if not PROXY_ROTATION_ENABLED: LMARENA_REQUEST_LOCK.release() debug_print(f"🔓 [STREAM] Request lock released") # --- Dashboard Authentication --- async def get_current_session(request: Request): session_id = request.cookies.get("session_id") if session_id and session_id in dashboard_sessions: return dashboard_sessions[session_id] return None # --- API Key Authentication & Rate Limiting --- async def rate_limit_api_key( auth_header: Optional[str] = Depends(API_KEY_HEADER), x_api_key: Optional[str] = Header(None, alias="x-api-key") ): api_key_str = None # Check Authorization header (Bearer token) debug_print(f"🔑 Auth Debug: AuthHeader set? {auth_header is not None}, X-API-Key set? 
{x_api_key is not None}") if auth_header and auth_header.startswith("Bearer "): api_key_str = auth_header[7:].strip() # Check x-api-key header (Anthropic standard) if not api_key_str and x_api_key: api_key_str = x_api_key.strip() if not api_key_str: raise HTTPException( status_code=401, detail="Missing or invalid authentication. Expected 'Authorization: Bearer KEY' or 'x-api-key: KEY'" ) config = get_config() key_data = next((k for k in config["api_keys"] if k["key"] == api_key_str), None) if not key_data: raise HTTPException(status_code=401, detail="Invalid API Key.") # Rate Limiting rate_limit = key_data.get("rpm", 60) current_time = time.time() # Clean up old timestamps (older than 60 seconds) api_key_usage[api_key_str] = [t for t in api_key_usage[api_key_str] if current_time - t < 60] if len(api_key_usage[api_key_str]) >= rate_limit: # Calculate seconds until oldest request expires (60 seconds window) oldest_timestamp = min(api_key_usage[api_key_str]) retry_after = int(60 - (current_time - oldest_timestamp)) retry_after = max(1, retry_after) # At least 1 second raise HTTPException( status_code=429, detail="Rate limit exceeded. Please try again later.", headers={"Retry-After": str(retry_after)} ) api_key_usage[api_key_str].append(current_time) return key_data # --- Core Logic --- async def get_initial_data(): """ Extracts initial data from the nodriver browser session. Must be called AFTER initialize_nodriver_browser(). Extracts: cf_clearance cookie, models list. 
""" global NODRIVER_TAB print("") print("📦 STEP 3/3: Loading LMArena data...") if NODRIVER_TAB is None: print(" └── ❌ Browser not available, skipping data extraction") return try: config = get_config() # Extract cf_clearance from cookies print(" ├── Extracting Cloudflare clearance...") try: cookies = await NODRIVER_TAB.browser.cookies.get_all() cf_clearance_cookie = None for cookie in cookies: if cookie.name == "cf_clearance": cf_clearance_cookie = cookie break if cf_clearance_cookie: config["cf_clearance"] = cf_clearance_cookie.value save_config(config) print(f" ├── ✅ cf_clearance saved") else: print(" ├── ⚠️ No cf_clearance cookie found (might not be needed)") except Exception as e: debug_print(f" ├── ⚠️ Error extracting cookies: {e}") # Extract models from page content print(" ├── Extracting available models...") try: # Get the page HTML content body = await NODRIVER_TAB.get_content() # Try to find models in the page match = re.search(r'{\\\"initialModels\\\":(\\[.*?\\]),\\\"initialModel[A-Z]Id', body, re.DOTALL) if match: models_json = match.group(1).encode().decode('unicode_escape') models = json.loads(models_json) save_models(models) print(f" ├── ✅ Found {len(models)} models") else: # Try alternative pattern match2 = re.search(r'"initialModels":(\[.*?\]),"initialModel', body, re.DOTALL) if match2: models = json.loads(match2.group(1)) save_models(models) print(f" ├── ✅ Found {len(models)} models") else: print(" ├── ⚠️ Could not find models in page (using cached)") except Exception as e: debug_print(f" ├── ⚠️ Error extracting models: {e}") print(" └── ✅ Initial data extraction complete") except Exception as e: print(f" └── ❌ Error during data extraction: {e}") async def periodic_refresh_task(): """Background task to refresh cf_clearance and models every 30 minutes""" while True: try: # Wait 30 minutes (1800 seconds) await asyncio.sleep(1800) debug_print("\n" + "="*60) debug_print("🔄 Starting scheduled 30-minute refresh...") debug_print("="*60) await 
get_initial_data() debug_print("✅ Scheduled refresh completed") debug_print("="*60 + "\n") except Exception as e: debug_print(f"❌ Error in periodic refresh task: {e}") # Continue the loop even if there's an error continue @app.on_event("startup") async def startup_event(): try: # Print startup banner print("=" * 60) print("🚀 LMArena Bridge Server Starting...") print("=" * 60) # Load configuration config = get_config() save_config(config) save_models(get_models()) load_usage_stats() api_key_count = len(config.get("api_keys", [])) auth_token_count = len(config.get("auth_tokens", [])) or (1 if config.get("auth_token") else 0) print(f"📋 Configuration loaded from config.json") print(f" ├── API Keys: {api_key_count} configured") print(f" ├── Auth Tokens: {auth_token_count} configured") print(f" └── Debug Mode: {'ON' if DEBUG else 'OFF'}") # 1. Initialize browser and solve CAPTCHA (this blocks until user solves) browser_ready = await initialize_nodriver_browser() if not browser_ready: print("") print("⚠️ WARNING: Server starting without browser (limited functionality)") print(" └── reCAPTCHA token refresh will not work") print("") else: # 2. Extract initial data from the browser session await get_initial_data() # 3. 
Start background tasks asyncio.create_task(periodic_refresh_task()) # Print ready message print("") print("=" * 60) print("✅ SERVER READY!") print("=" * 60) print(f"📍 Dashboard: http://localhost:{PORT}/dashboard") print(f"🔐 Login: http://localhost:{PORT}/dash/login") print(f"📚 Universal API: http://localhost:{PORT}/v1") if browser_ready: print("💡 Chrome window will stay open (do not close it!)") print("=" * 60) print("") except Exception as e: print(f"❌ Error during startup: {e}") import traceback traceback.print_exc() # Continue anyway - server should still start # --- UI Endpoints (Login/Dashboard) --- @app.get("/") async def root(): return {"status": "online", "message": "Arena Bridge is running"} @app.get("/dash/login", response_class=HTMLResponse) async def login_page(request: Request, error: Optional[str] = None): if await get_current_session(request): return RedirectResponse(url="/dashboard") error_msg = '
' if error else '' return f"""Failed to load configuration: {str(e)}
""", status_code=500) # Render API Keys keys_html = "" for key in config["api_keys"]: created_date = time.strftime('%Y-%m-%d %H:%M', time.localtime(key.get('created', 0))) keys_html += f"""{key['key']}{token[:20]}...
| Name | Key | RPM Limit | Created | Action |
|---|---|---|---|---|
| No keys found | ||||