Spaces:
Paused
Paused
| import json | |
| import uuid | |
| import base64 | |
| import asyncio | |
| import logging | |
| import os | |
| import sys | |
| import time | |
| from typing import AsyncIterator, Optional | |
| from playwright.async_api import async_playwright, Browser, BrowserContext, Page, Route | |
| from cryptography.hazmat.primitives.asymmetric import rsa, padding | |
| from cryptography.hazmat.primitives import hashes | |
| from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes | |
| from cryptography.hazmat.backends import default_backend | |
logger = logging.getLogger(__name__)

# pyvirtualdisplay Display started by _ensure_display(); None until one has
# been started successfully.
_virtual_display = None


def _ensure_display():
    """Make sure an X display exists for the headed Chromium browser.

    On Linux with no ``DISPLAY`` environment variable, start a virtual display
    through ``pyvirtualdisplay`` (which needs the Xvfb binary) and remember it
    in the module-level ``_virtual_display``. This is a no-op on non-Linux
    platforms, when ``DISPLAY`` is already set, or when an earlier call has
    already started a virtual display.

    Raises:
        RuntimeError: a virtual display is needed but ``pyvirtualdisplay`` is
            not installed.
        Any exception raised while constructing or starting the display is
        propagated unchanged. In that case nothing is recorded, so a later
        call will try again instead of assuming a display is running.
    """
    global _virtual_display
    if _virtual_display is not None:
        return
    if sys.platform != "linux":
        return  # Windows/macOS always have a display
    if os.environ.get("DISPLAY"):
        return  # Display already available
    try:
        from pyvirtualdisplay import Display
    except ImportError as err:
        logger.warning(
            "No DISPLAY and pyvirtualdisplay not installed. "
            "Install with: pip install pyvirtualdisplay\n"
            "Also install Xvfb: apt-get install xvfb"
        )
        raise RuntimeError(
            "Headless server requires pyvirtualdisplay + xvfb. "
            "Install: pip install pyvirtualdisplay && apt-get install xvfb"
        ) from err
    display = Display(visible=False, size=(1280, 800))
    # Record the display only after it has really started; otherwise a failed
    # start would be mistaken for success by every later call.
    display.start()
    _virtual_display = display
    logger.info("Started virtual display: %s", os.environ.get("DISPLAY"))
# Web app origin that every worker tab loads.
DUCK_AI_BASE = "https://duck.ai"

# Playwright URL glob for the chat API call that PageWorker intercepts.
CHAT_URL_PATTERN = "**/duckchat/v1/chat"

# Client-hint header and User-Agent presented by the browser context; both
# advertise Chrome 133 and should be updated together.
SEC_CH_UA = (
    '"Not:A-Brand";v="99", '
    '"Google Chrome";v="133", '
    '"Chromium";v="133"'
)
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/133.0.0.0 Safari/537.36"
)

# Number of worker tabs DuckAIClient opens when no pool_size is given.
DEFAULT_POOL_SIZE = 3
def _int_to_base64url(n: int, length: int = None) -> str:
    """Encode a non-negative integer as unpadded base64url (big-endian bytes).

    Positive values use their minimal byte length; any non-positive value is
    encoded as a single zero byte. When ``length`` is given and larger than
    that byte string, it is left-padded with zero bytes to ``length`` bytes.
    """
    if n > 0:
        raw = n.to_bytes((n.bit_length() + 7) // 8, byteorder='big')
    else:
        raw = b'\x00'
    if length:
        # rjust never truncates, so a shorter `length` leaves raw unchanged.
        raw = raw.rjust(length, b'\x00')
    encoded = base64.urlsafe_b64encode(raw)
    return encoded.rstrip(b'=').decode('ascii')
def _rsa_public_key_to_jwk(public_key) -> dict:
    """Describe an RSA public key as a JWK dict usable for RSA-OAEP-256 encryption.

    The modulus ``n`` is encoded padded to its full byte length; the public
    exponent ``e`` is encoded minimally.
    """
    numbers = public_key.public_numbers()
    modulus_len = (numbers.n.bit_length() + 7) // 8
    return dict(
        alg="RSA-OAEP-256",
        e=_int_to_base64url(numbers.e),
        ext=True,
        key_ops=["encrypt"],
        kty="RSA",
        n=_int_to_base64url(numbers.n, modulus_len),
        use="enc",
    )
def _generate_rsa_keypair():
    """Create a fresh 2048-bit RSA key pair with public exponent 65537.

    Returns:
        ``(private_key, jwk)``: the private key object and its public half
        as a JWK dict (see ``_rsa_public_key_to_jwk``).
    """
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend(),
    )
    return private_key, _rsa_public_key_to_jwk(private_key.public_key())
def _decrypt_data(private_key, encrypted_b64: str) -> Optional[str]:
    """Decrypt a hybrid RSA-OAEP + AES-GCM payload addressed to ``private_key``.

    The base64-decoded bytes are parsed as::

        [RSA-OAEP(SHA-256) wrapped AES key][12-byte IV][ciphertext][16-byte GCM tag]

    where the wrapped key is as long as the RSA modulus in bytes.

    Returns:
        The plaintext decoded as UTF-8, or None when no key is given or any
        step fails (bad base64, wrong key, GCM authentication failure,
        non-UTF-8 plaintext). Failures are logged at debug level.
    """
    if not private_key:
        return None
    try:
        raw = base64.b64decode(encrypted_b64)
        # An RSA-OAEP ciphertext is exactly the modulus size in bytes, so
        # derive it from the key rather than assuming a 2048-bit key.
        wrapped_key_len = (private_key.key_size + 7) // 8
        enc_key, rest = raw[:wrapped_key_len], raw[wrapped_key_len:]
        aes_key = private_key.decrypt(
            enc_key,
            padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()),
                         algorithm=hashes.SHA256(), label=None)
        )
        iv, tag, ct = rest[:12], rest[-16:], rest[12:-16]
        cipher = Cipher(algorithms.AES(aes_key), modes.GCM(iv, tag), backend=default_backend())
        dec = cipher.decryptor()
        return (dec.update(ct) + dec.finalize()).decode('utf-8')
    except Exception as e:
        # Best effort: callers treat None as "not decryptable" and fall back.
        logger.debug(f"Decryption failed: {e}")
        return None
# ---------------------------------------------------------------------------
# PageWorker: one browser tab that can handle one chat request at a time
# ---------------------------------------------------------------------------
class PageWorker:
    """A single duck.ai browser tab that serves one chat request at a time.

    A chat request is made by driving the page's own UI (type a prompt, press
    Enter). Meanwhile a Playwright route handler intercepts the outgoing chat
    API call, substitutes the caller's request body, and captures the raw
    upstream response for the caller.
    """

    # do_chat polls for the intercepted response this many times, this many
    # milliseconds apart (120 x 500 ms = 60 s in total).
    _RESPONSE_POLL_ATTEMPTS = 120
    _RESPONSE_POLL_INTERVAL_MS = 500

    # The chat input box, once the page allows typing into it.
    _TEXTAREA_SELECTOR = "textarea:not([disabled])"

    def __init__(self, worker_id: int, context: BrowserContext):
        """Store the worker id and browser context; call ``init()`` to open the tab."""
        self.id = worker_id
        self._context = context
        self.page: Optional[Page] = None
        self.ready = False
        self.request_count = 0  # successful do_chat calls

    async def init(self):
        """Create page, navigate to duck.ai, handle first-visit dialog."""
        self.page = await self._context.new_page()
        await self.page.goto(DUCK_AI_BASE, wait_until="domcontentloaded", timeout=60000)
        await self.page.wait_for_timeout(3000)
        for selector in (
            "button:has-text('Get Started')",
            "button:has-text('Continue')",
            "button:has-text('I Agree')",
            "button:has-text('Accept')",
        ):
            try:
                btn = self.page.locator(selector)
                if await btn.count() > 0:
                    await btn.first.click(timeout=3000)
                    await self.page.wait_for_timeout(1000)
            except Exception:
                # Best effort: a missing/detached dialog button must not abort init.
                pass
        try:
            await self.page.wait_for_selector(self._TEXTAREA_SELECTOR, timeout=15000)
        except Exception:
            logger.warning(f"Worker {self.id}: textarea not enabled after 15s")
        self.ready = True
        logger.info(f"Worker {self.id} ready")

    async def reinit(self):
        """Recreate the page (e.g. after an error)."""
        await self.close()
        self.page = None
        self.ready = False
        await self.init()

    async def do_chat(self, desired_body: dict, private_key, public_key_jwk) -> str:
        """Send one chat request through this tab and return the raw SSE response body.

        Args:
            desired_body: request body to send upstream in place of the one the
                page builds. Its last message's content (first 50 chars) is
                typed into the UI only to trigger the request.
            private_key: not used by this method; accepted for call compatibility.
            public_key_jwk: JWK injected as ``durableStream.publicKey`` in the
                outgoing body.

        Returns:
            The upstream response body text (HTTP 200).

        Raises:
            RuntimeError: after up to 2 retries (each on a freshly re-initialized
                page), with "Chat failed: ..." (interception error or timeout),
                "Rate limited: ..." (429, or 418 with ERR_BN_LIMIT) or
                "Chat API error ..." (any other non-200 status).
        """
        # Shared with the interceptor below; reset in place for every attempt.
        response_data = {"body": None, "status": None, "error": None}

        async def intercept_chat(route: Route):
            try:
                request = route.request
                original_body = json.loads(request.post_data) if request.post_data else {}
                durable = original_body.get("durableStream", {})
                durable["publicKey"] = public_key_jwk
                final_body = {**desired_body, "durableStream": durable}
                resp = await route.fetch(post_data=json.dumps(final_body))
                body_bytes = await resp.body()
                response_data["status"] = resp.status
                response_data["body"] = body_bytes.decode("utf-8", errors="replace")
                # Give the page a trivial successful stream so its UI settles;
                # the real response goes back to the caller instead.
                await route.fulfill(
                    status=200,
                    content_type="text/event-stream",
                    body='data: {"action":"success"}\n\ndata: [DONE]\n\n'
                )
            except Exception as e:
                response_data["error"] = str(e) or type(e).__name__
                logger.error(f"Worker {self.id} intercept error: {e}")
                try:
                    await route.fulfill(status=200, content_type="text/event-stream",
                                        body='data: [DONE]\n\n')
                except Exception:
                    pass

        await self._start_new_chat()
        await self._ensure_textarea()

        max_retries = 2
        for attempt in range(max_retries + 1):
            is_last = attempt == max_retries
            response_data.update(body=None, status=None, error=None)
            await self.page.route(CHAT_URL_PATTERN, intercept_chat)
            try:
                await self._submit_prompt(desired_body)
                await self._wait_for_response(response_data)
            except Exception as e:
                response_data["error"] = str(e) or type(e).__name__
            finally:
                try:
                    await self.page.unroute(CHAT_URL_PATTERN, intercept_chat)
                except Exception:
                    pass

            error = response_data["error"]
            if error is None and response_data["body"] is None:
                # Neither a response nor an error arrived within the poll window.
                error = "Timed out waiting for chat response"
            status = response_data["status"]
            body = response_data["body"] or ""

            if error is not None:
                logger.error(f"Worker {self.id} attempt {attempt+1}: {error}")
                if is_last:
                    raise RuntimeError(f"Chat failed: {error}")
            elif status == 429 or (status == 418 and "ERR_BN_LIMIT" in body):
                if is_last:
                    raise RuntimeError(f"Rate limited: {body[:200]}")
                logger.warning(f"Worker {self.id} rate limited, retrying...")
                await asyncio.sleep(5)
            elif status != 200:
                if is_last:
                    raise RuntimeError(f"Chat API error {status}: {body[:300]}")
                logger.warning(f"Worker {self.id} error {status}, retrying...")
            else:
                self.request_count += 1
                return body
            # Every retry starts from a fresh page.
            await self.reinit()
        raise RuntimeError("Exhausted retries")

    async def _start_new_chat(self) -> None:
        """Click a "New Chat" control if one is present (best effort)."""
        try:
            nc = self.page.locator("a:has-text('New Chat'), button:has-text('New Chat'), a[href='/']")
            if await nc.count() > 0:
                await nc.first.click(timeout=3000)
                await self.page.wait_for_timeout(1000)
        except Exception:
            pass

    async def _ensure_textarea(self) -> None:
        """Wait for an enabled textarea, re-initializing the page once if it never appears."""
        try:
            await self.page.wait_for_selector(self._TEXTAREA_SELECTOR, timeout=10000)
        except Exception:
            await self.reinit()
            await self.page.wait_for_selector(self._TEXTAREA_SELECTOR, timeout=10000)

    async def _submit_prompt(self, desired_body: dict) -> None:
        """Type a short placeholder prompt and press Enter to make the page send a request.

        The typed text only triggers the request; the interceptor replaces the
        body. It is the last message's content truncated to 50 chars, or "Hi"
        when there are no messages or that content is empty (an empty input
        would not be submitted).
        """
        textarea = self.page.locator(self._TEXTAREA_SELECTOR).first
        await textarea.click(timeout=5000)
        msgs = desired_body.get("messages", [])
        last_msg = str(msgs[-1].get("content") or "") if msgs else ""
        await textarea.fill(last_msg[:50] or "Hi")
        await self.page.wait_for_timeout(200)
        await textarea.press("Enter")

    async def _wait_for_response(self, response_data: dict) -> None:
        """Poll until the interceptor records a body or an error, or the poll window ends."""
        for _ in range(self._RESPONSE_POLL_ATTEMPTS):
            await self.page.wait_for_timeout(self._RESPONSE_POLL_INTERVAL_MS)
            if response_data["body"] is not None or response_data["error"] is not None:
                return

    async def close(self):
        """Close this worker's page, if any, logging (not raising) any error."""
        self.ready = False
        if self.page is None:
            return
        try:
            await self.page.close()
        except Exception as e:
            logger.debug(f"Worker {self.id}: page close failed: {e}")
# ---------------------------------------------------------------------------
# DuckAIClient: manages browser + pool of PageWorkers
# ---------------------------------------------------------------------------
class DuckAIClient:
    """duck.ai chat client backed by one Chromium browser and a pool of tabs.

    The browser and its PageWorkers are created lazily on the first request
    (see ``_ensure_ready``). Each ``chat_stream`` call borrows one worker from
    an ``asyncio.Queue``. When all ``pool_size`` workers are busy, further
    requests wait (up to 120 s) for one to be returned.
    """

    def __init__(self, proxy: str = None, model: str = "claude-haiku-4-5",
                 assistant_name: str = None, system_prompt: str = None,
                 pool_size: int = DEFAULT_POOL_SIZE):
        """
        Args:
            proxy: optional proxy server passed to the browser context.
            model: model identifier placed in every request body.
            assistant_name: default assistant name; a per-request
                ``assistant_name`` overrides it.
            system_prompt: default custom instructions, used when a request
                passes no ``custom_instructions``. System messages in
                ``messages`` still take precedence.
            pool_size: number of browser tabs (concurrent requests).
        """
        self.model = model
        self.assistant_name = assistant_name
        self.system_prompt = system_prompt
        self.proxy = proxy
        self.pool_size = pool_size
        self._playwright = None
        self._browser: Optional[Browser] = None
        self._context: Optional[BrowserContext] = None
        self._pool: Optional[asyncio.Queue] = None
        self._workers: list[PageWorker] = []
        self._init_lock = asyncio.Lock()
        self._ready = False

    async def _ensure_ready(self):
        """Launch the browser and worker pool once (double-checked under a lock).

        If launching fails, everything started so far is torn down before the
        exception propagates, so a later call retries from a clean state
        instead of leaking browser processes.
        """
        if self._ready:
            return
        async with self._init_lock:
            if self._ready:
                return
            _ensure_display()
            logger.info(f"Launching browser (pool_size={self.pool_size})...")
            try:
                await self._launch()
            except BaseException:
                await self._shutdown()
                raise
            self._ready = True

    async def _launch(self):
        """Start Playwright and Chromium, create the context and initialize the workers.

        Raises:
            RuntimeError: if no worker initialized successfully.
        """
        self._playwright = await async_playwright().start()
        launch_args = {
            "headless": False,
            "args": [
                "--disable-blink-features=AutomationControlled",
                "--window-position=-9999,-9999",
                "--no-sandbox",
            ],
        }
        self._browser = await self._playwright.chromium.launch(**launch_args)
        ctx_args = {
            "user_agent": USER_AGENT,
            "viewport": {"width": 1280, "height": 800},
            "locale": "en-US",
        }
        if self.proxy:
            ctx_args["proxy"] = {"server": self.proxy}
        self._context = await self._browser.new_context(**ctx_args)
        await self._context.set_extra_http_headers({"sec-ch-ua": SEC_CH_UA})

        # Initialize all workers in parallel; a failed worker is dropped, not fatal.
        candidates = [PageWorker(i, self._context) for i in range(self.pool_size)]
        results = await asyncio.gather(*(w.init() for w in candidates),
                                       return_exceptions=True)
        self._pool = asyncio.Queue()
        self._workers = []
        for worker, result in zip(candidates, results):
            # BaseException, not Exception: a CancelledError is not a success.
            if isinstance(result, BaseException):
                logger.error(f"Worker {worker.id} init failed: {result}")
                await worker.close()
                continue
            self._workers.append(worker)
            self._pool.put_nowait(worker)
        ready_count = self._pool.qsize()
        logger.info(f"Browser ready: {ready_count}/{self.pool_size} workers")
        if ready_count == 0:
            raise RuntimeError("No workers initialized successfully")

    def _build_request_body(self, messages: list, web_search: bool,
                            custom_instructions: Optional[str],
                            assistant_name: Optional[str]) -> dict:
        """Translate role/content message dicts into a duck.ai chat request body.

        "system" messages are not forwarded as messages; their content becomes
        ``metadata.customization.customInstructions`` (the last one wins and
        overrides ``custom_instructions``/``system_prompt``). Every other role
        is mapped to "user" unless it is "assistant". An empty conversation
        becomes a single "Hello" user message.
        """
        name = assistant_name or self.assistant_name
        instructions = custom_instructions or self.system_prompt
        metadata: dict = {}
        if name or instructions:
            customization: dict = {}
            if name:
                customization["assistantName"] = name
                customization["shouldSeekClarity"] = False
            if instructions:
                customization["customInstructions"] = instructions
            metadata["customization"] = customization
        metadata["toolChoice"] = {
            "WebSearch": web_search, "NewsSearch": False,
            "VideosSearch": False, "LocalSearch": False, "WeatherForecast": False
        }
        duck_messages = []
        for msg in messages:
            role = msg.get("role", "user")
            content = msg.get("content", "")
            if role == "system":
                metadata.setdefault("customization", {})["customInstructions"] = content
                continue
            duck_messages.append({
                "role": "assistant" if role == "assistant" else "user",
                "content": content
            })
        if not duck_messages:
            duck_messages = [{"role": "user", "content": "Hello"}]
        return {
            "model": self.model,
            "messages": duck_messages,
            "canUseTools": web_search,
            "canUseApproxLocation": None,
            # Never empty: toolChoice is always present.
            "metadata": metadata,
        }

    async def chat_stream(self, messages: list, web_search: bool = True,
                          custom_instructions: str = None,
                          assistant_name: str = None) -> AsyncIterator[dict]:
        """Send a conversation through a pooled tab and yield parsed response events.

        Args:
            messages: dicts with "role" and "content" keys.
            web_search: enable the WebSearch tool (also sets ``canUseTools``).
            custom_instructions: per-request instructions (falls back to the
                client's ``system_prompt``).
            assistant_name: per-request assistant name (falls back to the
                client's ``assistant_name``).

        Yields:
            Event dicts as produced by ``_parse_sse_text``.

        Raises:
            asyncio.TimeoutError: no worker became free within 120 s.
            RuntimeError: propagated from ``PageWorker.do_chat``.
        """
        await self._ensure_ready()
        desired_body = self._build_request_body(
            messages, web_search, custom_instructions, assistant_name)

        # Per-request RSA key pair. Key generation is CPU-bound, so run it in
        # the default executor instead of blocking the event loop.
        loop = asyncio.get_running_loop()
        private_key, public_key_jwk = await loop.run_in_executor(None, _generate_rsa_keypair)

        # Keep a local reference so a concurrent close() cannot break the return.
        pool = self._pool
        # Acquire a worker from the pool (blocks if all busy)
        worker: PageWorker = await asyncio.wait_for(pool.get(), timeout=120)
        try:
            t0 = time.monotonic()
            raw_body = await worker.do_chat(desired_body, private_key, public_key_jwk)
            elapsed = time.monotonic() - t0
            logger.info(f"Worker {worker.id} completed in {elapsed:.1f}s "
                        f"(total: {worker.request_count})")
        finally:
            # Return the worker on success, failure and cancellation alike.
            pool.put_nowait(worker)

        async for event in self._parse_sse_text(raw_body, private_key):
            yield event

    async def _parse_sse_text(self, text: str, private_key) -> AsyncIterator[dict]:
        """Convert a complete SSE response body into typed event dicts.

        Only ``data: `` lines are considered; ``data: [DONE]`` yields
        ``{"type": "done"}`` and stops. Payloads that are not JSON, and
        ``encryptedData``/string ``data`` fields, are decrypted with
        ``private_key`` when possible. Yielded types: "done", "text",
        "message", "search_begin", "search_end", "search_results",
        "search_source" and the catch-all "event".
        """
        for line in text.split("\n"):
            line = line.strip()
            # Blank lines, ":" comments and "event:" lines never start with "data: ".
            if not line.startswith("data: "):
                continue
            data = line[6:]
            if data == "[DONE]":
                yield {"type": "done"}
                return
            try:
                parsed = json.loads(data)
            except json.JSONDecodeError:
                # A non-JSON payload may be an entire encrypted frame.
                dec = _decrypt_data(private_key, data)
                if not dec:
                    continue
                try:
                    parsed = json.loads(dec)
                except json.JSONDecodeError:
                    yield {"type": "text", "data": dec}
                    continue
            if not isinstance(parsed, dict):
                # Bare JSON scalars/lists carry no fields to dispatch on.
                yield {"type": "event", "data": parsed}
                continue
            if "encryptedData" in parsed:
                dec = _decrypt_data(private_key, parsed["encryptedData"])
                if dec:
                    try:
                        inner = json.loads(dec)
                        yield {"type": "message", "data": inner}
                    except json.JSONDecodeError:
                        yield {"type": "text", "data": dec}
                continue
            action = parsed.get("action")
            role = parsed.get("role", "")
            # Tool invocation events (WebSearch, etc.)
            if role == "tool-invocation":
                state = parsed.get("state", "")
                if state == "call":
                    yield {"type": "search_begin", "data": parsed}
                elif state == "result":
                    yield {"type": "search_results", "data": parsed}
                else:
                    yield {"type": "event", "data": parsed}
                continue
            # Source/citation events from web search
            if role == "source":
                source = parsed.get("source", {})
                yield {"type": "search_source", "data": source}
                continue
            if action in ("search_begin", "search_end"):
                yield {"type": action, "data": parsed}
            elif action in ("search_result", "search_results"):
                yield {"type": "search_results", "data": parsed}
            elif "message" in parsed:
                msg = parsed["message"]
                if isinstance(msg, str):
                    yield {"type": "text", "data": msg}
                else:
                    yield {"type": "event", "data": parsed}
            elif "data" in parsed:
                val = parsed["data"]
                if isinstance(val, str):
                    dec = _decrypt_data(private_key, val)
                    yield {"type": "text", "data": dec if dec else val}
                else:
                    yield {"type": "event", "data": parsed}
            else:
                yield {"type": "event", "data": parsed}

    def pool_status(self) -> dict:
        """Return current pool utilization stats."""
        if not self._ready:
            return {"ready": False}
        total = len(self._workers)
        available = self._pool.qsize() if self._pool else 0
        return {
            "ready": True,
            "total_workers": total,
            "available_workers": available,
            "busy_workers": total - available,
            "worker_stats": [
                {"id": w.id, "requests": w.request_count, "ready": w.ready}
                for w in self._workers
            ],
        }

    async def _shutdown(self):
        """Best-effort teardown of workers, browser and Playwright; resets all state."""
        for w in self._workers:
            await w.close()
        self._workers.clear()
        self._pool = None
        self._context = None
        browser, self._browser = self._browser, None
        playwright, self._playwright = self._playwright, None
        # Separate try blocks so a failing browser.close() cannot leave the
        # Playwright driver running.
        if browser is not None:
            try:
                await browser.close()
            except Exception as e:
                logger.debug(f"Browser close failed: {e}")
        if playwright is not None:
            try:
                await playwright.stop()
            except Exception as e:
                logger.debug(f"Playwright stop failed: {e}")
        self._ready = False

    async def close(self):
        """Close all workers, the browser and Playwright. The next request relaunches."""
        await self._shutdown()