import json
import uuid
import base64
import asyncio
import logging
import os
import sys
import time
from typing import AsyncIterator, Optional

from playwright.async_api import async_playwright, Browser, BrowserContext, Page, Route
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend

logger = logging.getLogger(__name__)

# Virtual display started by _ensure_display(); stays None until a display
# has been started successfully.
_virtual_display = None


def _ensure_display():
    """On Linux without DISPLAY, start a virtual display via Xvfb.

    Does nothing on non-Linux platforms, when ``DISPLAY`` is already set, or
    when an earlier call already started a virtual display.

    Raises:
        RuntimeError: ``pyvirtualdisplay`` is not installed.
        Exception: anything raised by ``Display.start()`` propagates
            unchanged; the module state is left untouched so a later call
            can try again.
    """
    global _virtual_display
    if _virtual_display is not None:
        return
    if sys.platform != "linux":
        return  # Windows/macOS always have a display
    if os.environ.get("DISPLAY"):
        return  # Display already available

    try:
        from pyvirtualdisplay import Display
        display = Display(visible=False, size=(1280, 800))
    except ImportError as err:
        logger.warning(
            "No DISPLAY and pyvirtualdisplay not installed. "
            "Install with: pip install pyvirtualdisplay\n"
            "Also install Xvfb: apt-get install xvfb"
        )
        raise RuntimeError(
            "Headless server requires pyvirtualdisplay + xvfb. "
            "Install: pip install pyvirtualdisplay && apt-get install xvfb"
        ) from err

    display.start()
    # Remember the display only once it is running: recording it before
    # start() would make a failed start look like success on the next call.
    _virtual_display = display
    logger.info("Started virtual display: %s", os.environ.get("DISPLAY"))


DUCK_AI_BASE = "https://duck.ai"
# Glob passed to Page.route() to intercept the chat API request.
CHAT_URL_PATTERN = "**/duckchat/v1/chat"
SEC_CH_UA = '"Not:A-Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"'
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
DEFAULT_POOL_SIZE = 3


def _int_to_base64url(n: int, length: Optional[int] = None) -> str:
    """Encode a non-negative int as unpadded big-endian base64url.

    Args:
        n: Value to encode; ``0`` is encoded as a single zero byte.
        length: If given, left-pad the big-endian bytes with zeros to at
            least this many bytes before encoding.
    """
    raw = n.to_bytes((n.bit_length() + 7) // 8, byteorder="big") if n > 0 else b"\x00"
    if length and len(raw) < length:
        raw = raw.rjust(length, b"\x00")
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _rsa_public_key_to_jwk(public_key) -> dict:
    """Return *public_key* as an RSA-OAEP-256 encryption JWK dict."""
    pub_numbers = public_key.public_numbers()
    n_bytes = (pub_numbers.n.bit_length() + 7) // 8
    return {
        "alg": "RSA-OAEP-256",
        "e": _int_to_base64url(pub_numbers.e),
        "ext": True,
        "key_ops": ["encrypt"],
        "kty": "RSA",
        "n": _int_to_base64url(pub_numbers.n, n_bytes),
        "use": "enc",
    }


def _generate_rsa_keypair():
    """Generate a fresh 2048-bit RSA key.

    Returns:
        ``(private_key, public_jwk)`` where ``public_jwk`` is the public half
        formatted by ``_rsa_public_key_to_jwk``.
    """
    priv = rsa.generate_private_key(
        public_exponent=65537, key_size=2048, backend=default_backend()
    )
    jwk = _rsa_public_key_to_jwk(priv.public_key())
    return priv, jwk


def _decrypt_data(private_key, encrypted_b64: str) -> Optional[str]:
    """Decrypt a base64 hybrid RSA-OAEP / AES-GCM payload into UTF-8 text.

    Decoded byte layout::

        [RSA-OAEP(SHA-256) wrapped AES key | 12-byte IV | ciphertext | 16-byte GCM tag]

    The wrapped-key length is the RSA modulus size in bytes (256 for the
    2048-bit keys made by ``_generate_rsa_keypair``).

    Returns:
        The plaintext, or None when *private_key* is falsy or any step fails
        (bad base64, wrong key, authentication failure, non-UTF-8 output).
        Failures are logged at DEBUG level instead of raised.
    """
    if not private_key:
        return None
    try:
        raw = base64.b64decode(encrypted_b64)
        key_len = (private_key.key_size + 7) // 8
        enc_key, rest = raw[:key_len], raw[key_len:]
        aes_key = private_key.decrypt(
            enc_key,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None,
            ),
        )
        iv, tag, ct = rest[:12], rest[-16:], rest[12:-16]
        decryptor = Cipher(
            algorithms.AES(aes_key), modes.GCM(iv, tag), backend=default_backend()
        ).decryptor()
        return (decryptor.update(ct) + decryptor.finalize()).decode("utf-8")
    except Exception as e:
        logger.debug("Decryption failed: %s", e)
        return None
# ---------------------------------------------------------------------------
# PageWorker: one browser tab that can handle one chat request at a time
# ---------------------------------------------------------------------------

class PageWorker:
    """One duck.ai browser tab that serves a single chat request at a time.

    A request is triggered through the page's own UI: the worker types into
    the chat textarea and presses Enter. The outgoing ``CHAT_URL_PATTERN``
    request is intercepted, its JSON body is replaced with the caller's
    payload, it is forwarded with ``route.fetch``, and the upstream response
    body is captured for the caller. The page itself receives a stub stream.
    """

    # First-visit / consent buttons that may need dismissing after load.
    _ONBOARDING_BUTTONS = (
        "button:has-text('Get Started')",
        "button:has-text('Continue')",
        "button:has-text('I Agree')",
        "button:has-text('Accept')",
    )
    _TEXTAREA = "textarea:not([disabled])"
    _NEW_CHAT = "a:has-text('New Chat'), button:has-text('New Chat'), a[href='/']"

    def __init__(self, worker_id: int, context: BrowserContext):
        self.id = worker_id
        self._context = context
        self.page: Optional[Page] = None
        self.ready = False  # set True at the end of init()
        self.request_count = 0  # number of successful do_chat() calls

    async def init(self):
        """Create page, navigate to duck.ai, handle first-visit dialog."""
        self.page = await self._context.new_page()
        await self.page.goto(DUCK_AI_BASE, wait_until="domcontentloaded", timeout=60000)
        await self.page.wait_for_timeout(3000)

        for selector in self._ONBOARDING_BUTTONS:
            try:
                btn = self.page.locator(selector)
                if await btn.count() > 0:
                    await btn.first.click(timeout=3000)
                    await self.page.wait_for_timeout(1000)
            except Exception as e:
                # Best effort: a vanished or unclickable button must not abort init.
                logger.debug("Worker %s: could not click %s: %s", self.id, selector, e)

        try:
            await self.page.wait_for_selector(self._TEXTAREA, timeout=15000)
        except Exception:
            # Not fatal here; do_chat() re-checks and reinitialises if needed.
            logger.warning("Worker %s: textarea not enabled after 15s", self.id)
        self.ready = True
        logger.info("Worker %s ready", self.id)

    async def reinit(self):
        """Recreate the page (e.g. after an error)."""
        await self.close()
        await self.init()

    async def do_chat(self, desired_body: dict, private_key, public_key_jwk) -> str:
        """Send one chat request through the page's UI with route interception.

        Args:
            desired_body: JSON body to send upstream instead of the page's own.
                Its ``durableStream`` is taken from the page's request, with
                ``publicKey`` replaced by *public_key_jwk*.
            private_key: Not used by this method; decryption of the returned
                body is done by the caller.
            public_key_jwk: Public key (JWK dict) injected into ``durableStream``.

        Returns:
            The raw upstream SSE response body (UTF-8, undecodable bytes
            replaced) when the upstream status is 200.

        Raises:
            RuntimeError: all ``max_retries + 1`` attempts failed because of an
                interception/UI error or timeout, rate limiting (429, or 418 with
                ``ERR_BN_LIMIT``), or another non-200 status.
        """
        # Rebound at the start of each attempt; the interceptor writes into
        # whichever dict is current when it fires.
        response_data = {"body": None, "status": None, "error": None}

        async def intercept_chat(route: Route):
            try:
                request = route.request
                original_body = json.loads(request.post_data) if request.post_data else {}
                # Keep the page's own durableStream settings, but swap in our
                # public key so the response can be decrypted locally.
                durable = original_body.get("durableStream") or {}
                durable["publicKey"] = public_key_jwk
                final_body = {**desired_body, "durableStream": durable}

                resp = await route.fetch(post_data=json.dumps(final_body))
                body_bytes = await resp.body()
                response_data["status"] = resp.status
                response_data["body"] = body_bytes.decode("utf-8", errors="replace")
                # The page only gets a stub stream; the real body goes to the caller.
                await route.fulfill(
                    status=200,
                    content_type="text/event-stream",
                    body='data: {"action":"success"}\n\ndata: [DONE]\n\n',
                )
            except Exception as e:
                response_data["error"] = self._describe_error(e)
                logger.error("Worker %s intercept error: %s", self.id, e)
                try:
                    await route.fulfill(
                        status=200, content_type="text/event-stream", body="data: [DONE]\n\n"
                    )
                except Exception:
                    pass  # route may already be handled or the page gone

        await self._start_new_chat()
        await self._ensure_textarea()
        prompt_text = self._trigger_text(desired_body)

        max_retries = 2
        for attempt in range(max_retries + 1):
            response_data = {"body": None, "status": None, "error": None}
            await self.page.route(CHAT_URL_PATTERN, intercept_chat)
            try:
                await self._send_prompt(prompt_text)
                # Poll for up to 60 s (120 x 500 ms) for the interceptor to
                # record either a response or an error.
                for _ in range(120):
                    await self.page.wait_for_timeout(500)
                    if response_data["body"] is not None or response_data["error"] is not None:
                        break
                else:
                    response_data["error"] = "Timed out waiting for chat response"
            except Exception as e:
                response_data["error"] = self._describe_error(e)
            finally:
                # Also runs on cancellation, so no stale handler stays registered.
                await self._unroute(intercept_chat)

            error = response_data["error"]
            if error:
                logger.error("Worker %s attempt %s: %s", self.id, attempt + 1, error)
                if attempt < max_retries:
                    await self.reinit()
                    continue
                raise RuntimeError(f"Chat failed: {error}")

            status = response_data["status"]
            body = response_data["body"] or ""
            if status == 429 or (status == 418 and "ERR_BN_LIMIT" in body):
                if attempt < max_retries:
                    logger.warning("Worker %s rate limited, retrying...", self.id)
                    await asyncio.sleep(5)
                    await self.reinit()
                    continue
                raise RuntimeError(f"Rate limited: {body[:200]}")

            if status != 200:
                if attempt < max_retries:
                    logger.warning("Worker %s error %s, retrying...", self.id, status)
                    await self.reinit()
                    continue
                raise RuntimeError(f"Chat API error {status}: {body[:300]}")

            self.request_count += 1
            return body

        raise RuntimeError("Exhausted retries")

    @staticmethod
    def _describe_error(exc: BaseException) -> str:
        """Return a non-empty description of *exc* (``str()`` can be empty)."""
        return str(exc) or type(exc).__name__

    @staticmethod
    def _trigger_text(desired_body: dict) -> str:
        """Pick the text typed into the UI to trigger the chat request.

        Only the first 50 characters of the last message are used, since the
        intercepted body is replaced anyway. Falls back to "Hi" when there is
        no usable non-blank string content.
        """
        msgs = desired_body.get("messages") or []
        content = msgs[-1].get("content") if msgs else None
        if isinstance(content, str) and content.strip():
            return content[:50]
        return "Hi"

    async def _start_new_chat(self):
        """Click a "New Chat" control if one is present (best effort)."""
        try:
            new_chat = self.page.locator(self._NEW_CHAT)
            if await new_chat.count() > 0:
                await new_chat.first.click(timeout=3000)
                await self.page.wait_for_timeout(1000)
        except Exception as e:
            logger.debug("Worker %s: new chat click skipped: %s", self.id, e)

    async def _ensure_textarea(self):
        """Wait for an enabled textarea, reinitialising the page once if needed."""
        try:
            await self.page.wait_for_selector(self._TEXTAREA, timeout=10000)
        except Exception:
            await self.reinit()
            await self.page.wait_for_selector(self._TEXTAREA, timeout=10000)

    async def _send_prompt(self, text: str):
        """Type *text* into the chat textarea and press Enter."""
        textarea = self.page.locator(self._TEXTAREA).first
        await textarea.click(timeout=5000)
        await textarea.fill(text)
        await self.page.wait_for_timeout(200)
        await textarea.press("Enter")

    async def _unroute(self, handler):
        """Remove *handler* from the chat route (best effort)."""
        try:
            await self.page.unroute(CHAT_URL_PATTERN, handler)
        except Exception as e:
            logger.debug("Worker %s: unroute failed: %s", self.id, e)

    async def close(self):
        """Close the page (best effort) and mark the worker not ready."""
        page, self.page = self.page, None
        self.ready = False
        if page is None:
            return
        try:
            await page.close()
        except Exception as e:
            logger.debug("Worker %s: page close failed: %s", self.id, e)


# ---------------------------------------------------------------------------
# DuckAIClient: manages browser + pool of PageWorkers
# ---------------------------------------------------------------------------

class DuckAIClient:
    """Owns one Chromium browser/context and a pool of ``PageWorker`` tabs.

    The browser is launched lazily on first use. Each ``chat_stream`` call
    borrows one worker from the pool for the duration of the upstream request.
    """

    def __init__(self, proxy: Optional[str] = None, model: str = "claude-haiku-4-5",
                 assistant_name: Optional[str] = None, system_prompt: Optional[str] = None,
                 pool_size: int = DEFAULT_POOL_SIZE):
        self.model = model
        self.assistant_name = assistant_name
        # Default custom instructions, used when chat_stream gets none.
        self.system_prompt = system_prompt
        self.proxy = proxy
        self.pool_size = pool_size

        self._playwright = None
        self._browser: Optional[Browser] = None
        self._context: Optional[BrowserContext] = None
        self._pool: Optional[asyncio.Queue] = None
        self._workers: list = []
        self._init_lock = asyncio.Lock()
        self._ready = False

    async def _ensure_ready(self):
        """Launch the browser and worker pool once; safe to call concurrently."""
        if self._ready:
            return
        async with self._init_lock:
            if self._ready:
                return
            try:
                await self._launch()
            except BaseException:
                # Don't leak a half-started browser/driver; a later call can
                # then retry from scratch.
                await self.close()
                raise
            self._ready = True

    async def _launch(self):
        """Start Playwright, the browser, the context and all page workers.

        Raises:
            RuntimeError: no worker initialised successfully.
        """
        _ensure_display()
        logger.info("Launching browser (pool_size=%s)...", self.pool_size)
        self._playwright = await async_playwright().start()

        self._browser = await self._playwright.chromium.launch(
            headless=False,
            args=[
                "--disable-blink-features=AutomationControlled",
                "--window-position=-9999,-9999",
                "--no-sandbox",
            ],
        )

        ctx_args = {
            "user_agent": USER_AGENT,
            "viewport": {"width": 1280, "height": 800},
            "locale": "en-US",
        }
        if self.proxy:
            ctx_args["proxy"] = {"server": self.proxy}
        self._context = await self._browser.new_context(**ctx_args)
        await self._context.set_extra_http_headers({"sec-ch-ua": SEC_CH_UA})

        # Initialise all workers in parallel; only successful ones enter the pool.
        self._pool = asyncio.Queue()
        self._workers = [PageWorker(i, self._context) for i in range(self.pool_size)]
        results = await asyncio.gather(
            *(w.init() for w in self._workers), return_exceptions=True
        )
        for worker, result in zip(self._workers, results):
            # BaseException also catches a CancelledError returned by gather().
            if isinstance(result, BaseException):
                logger.error("Worker %s init failed: %s", worker.id, result)
                await worker.close()
            else:
                self._pool.put_nowait(worker)

        ready_count = self._pool.qsize()
        logger.info("Browser ready: %s/%s workers", ready_count, self.pool_size)
        if ready_count == 0:
            raise RuntimeError("No workers initialized successfully")

    def _build_request_body(self, messages: list, web_search: bool,
                            custom_instructions: Optional[str],
                            assistant_name: Optional[str]) -> dict:
        """Translate chat messages and options into a duck.ai request body.

        ``system`` messages become ``metadata.customization.customInstructions``
        (the last one wins). Every non-assistant role is sent as ``user``.
        """
        metadata = {}
        name = assistant_name or self.assistant_name
        instructions = custom_instructions or self.system_prompt
        if name or instructions:
            customization = {}
            if name:
                customization["assistantName"] = name
                customization["shouldSeekClarity"] = False
            if instructions:
                customization["customInstructions"] = instructions
            metadata["customization"] = customization
        metadata["toolChoice"] = {
            "WebSearch": web_search,
            "NewsSearch": False,
            "VideosSearch": False,
            "LocalSearch": False,
            "WeatherForecast": False,
        }

        duck_messages = []
        for msg in messages or []:
            role = msg.get("role", "user")
            content = msg.get("content", "")
            if role == "system":
                metadata.setdefault("customization", {})["customInstructions"] = content
                continue
            duck_messages.append({
                "role": "assistant" if role == "assistant" else "user",
                "content": content,
            })
        if not duck_messages:
            duck_messages = [{"role": "user", "content": "Hello"}]

        desired_body = {
            "model": self.model,
            "messages": duck_messages,
            "canUseTools": web_search,
            "canUseApproxLocation": None,
        }
        if metadata:
            desired_body["metadata"] = metadata
        return desired_body

    async def chat_stream(self, messages: list, web_search: bool = True,
                          custom_instructions: Optional[str] = None,
                          assistant_name: Optional[str] = None) -> AsyncIterator[dict]:
        """Send *messages* to duck.ai and yield parsed response events.

        The upstream response is fully buffered by a worker before any event
        is yielded (see ``_parse_sse_text`` for the event shapes).

        Raises:
            asyncio.TimeoutError: no worker became free within 120 s.
            RuntimeError: the worker's request failed (see ``PageWorker.do_chat``).
        """
        await self._ensure_ready()
        desired_body = self._build_request_body(
            messages, web_search, custom_instructions, assistant_name
        )

        # RSA key generation is CPU-bound; keep it off the event loop.
        loop = asyncio.get_running_loop()
        private_key, public_key_jwk = await loop.run_in_executor(None, _generate_rsa_keypair)

        # Acquire a worker from the pool (blocks if all busy).
        worker: PageWorker = await asyncio.wait_for(self._pool.get(), timeout=120)
        try:
            t0 = time.monotonic()
            raw_body = await worker.do_chat(desired_body, private_key, public_key_jwk)
            logger.info("Worker %s completed in %.1fs (total: %s)",
                        worker.id, time.monotonic() - t0, worker.request_count)
        finally:
            # Always return the worker, including on cancellation (BaseException).
            self._pool.put_nowait(worker)

        async for event in self._parse_sse_text(raw_body, private_key):
            yield event

    async def _parse_sse_text(self, text: str, private_key) -> AsyncIterator[dict]:
        """Convert a buffered SSE body into normalised event dicts.

        Each yielded dict has a ``type`` of ``text``, ``message``, ``event``,
        ``search_begin``, ``search_end``, ``search_results`` or
        ``search_source``, plus ``data``. A final ``{"type": "done"}`` is
        yielded when ``[DONE]`` is seen, and parsing stops there. Payloads that
        are not JSON are decrypted with *private_key*; those that cannot be
        decrypted are skipped.
        """
        for raw_line in (text or "").split("\n"):
            line = raw_line.strip()
            # Only "data:" fields carry payloads; blank lines, ":" comments and
            # other fields such as "event:" are ignored.
            if not line.startswith("data:"):
                continue
            data = line[len("data:"):]
            if data.startswith(" "):
                data = data[1:]
            if not data:
                continue
            if data == "[DONE]":
                yield {"type": "done"}
                return

            try:
                parsed = json.loads(data)
            except json.JSONDecodeError:
                dec = _decrypt_data(private_key, data)
                if not dec:
                    continue
                try:
                    parsed = json.loads(dec)
                except json.JSONDecodeError:
                    yield {"type": "text", "data": dec}
                    continue

            # JSON scalars/arrays carry no action/role keys to dispatch on.
            if not isinstance(parsed, dict):
                yield {"type": "text" if isinstance(parsed, str) else "event", "data": parsed}
                continue

            if "encryptedData" in parsed:
                dec = _decrypt_data(private_key, parsed["encryptedData"])
                if dec:
                    try:
                        inner = json.loads(dec)
                    except json.JSONDecodeError:
                        yield {"type": "text", "data": dec}
                    else:
                        yield {"type": "message", "data": inner}
                    continue

            action = parsed.get("action")
            role = parsed.get("role", "")

            # Tool invocation events (WebSearch, etc.)
            if role == "tool-invocation":
                state = parsed.get("state", "")
                if state == "call":
                    yield {"type": "search_begin", "data": parsed}
                elif state == "result":
                    yield {"type": "search_results", "data": parsed}
                else:
                    yield {"type": "event", "data": parsed}
                continue

            # Source/citation events from web search
            if role == "source":
                yield {"type": "search_source", "data": parsed.get("source", {})}
                continue

            if action in ("search_begin", "search_end"):
                yield {"type": action, "data": parsed}
            elif action in ("search_result", "search_results"):
                yield {"type": "search_results", "data": parsed}
            elif "message" in parsed:
                msg = parsed["message"]
                if isinstance(msg, str):
                    yield {"type": "text", "data": msg}
                else:
                    yield {"type": "event", "data": parsed}
            elif "data" in parsed:
                val = parsed["data"]
                if isinstance(val, str):
                    dec = _decrypt_data(private_key, val)
                    yield {"type": "text", "data": dec or val}
                else:
                    yield {"type": "event", "data": parsed}
            else:
                yield {"type": "event", "data": parsed}

    def pool_status(self) -> dict:
        """Return current pool utilization stats."""
        if not self._ready:
            return {"ready": False}
        total = len(self._workers)
        available = self._pool.qsize() if self._pool else 0
        return {
            "ready": True,
            "total_workers": total,
            "available_workers": available,
            "busy_workers": total - available,
            "worker_stats": [
                {"id": w.id, "requests": w.request_count, "ready": w.ready}
                for w in self._workers
            ],
        }

    async def close(self):
        """Close all workers, the browser and Playwright (each step best effort)."""
        for w in self._workers:
            await w.close()
        self._workers.clear()

        browser, self._browser = self._browser, None
        self._context = None
        if browser is not None:
            try:
                await browser.close()
            except Exception as e:
                logger.debug("Browser close failed: %s", e)

        # Stopped separately so a failing browser.close() can't leak the driver.
        playwright, self._playwright = self._playwright, None
        if playwright is not None:
            try:
                await playwright.stop()
            except Exception as e:
                logger.debug("Playwright stop failed: %s", e)

        self._ready = False