| import json |
| import uuid |
| import base64 |
| import asyncio |
| import logging |
| import os |
| import sys |
| import time |
| from typing import AsyncIterator, Optional |
|
|
| from playwright.async_api import async_playwright, Browser, BrowserContext, Page, Route |
| from cryptography.hazmat.primitives.asymmetric import rsa, padding |
| from cryptography.hazmat.primitives import hashes |
| from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes |
| from cryptography.hazmat.backends import default_backend |
|
|
| logger = logging.getLogger(__name__) |
|
|
# Handle of the Xvfb display started by _ensure_display(); None until one is running.
_virtual_display = None


def _ensure_display():
    """Start a virtual X display via Xvfb when running on Linux without DISPLAY.

    Does nothing if a virtual display was already started by this function,
    if the platform is not Linux, or if the DISPLAY environment variable is
    already set.

    Raises:
        RuntimeError: pyvirtualdisplay is not installed.
        Exception: whatever ``Display.start()`` raises is propagated unchanged.
    """
    global _virtual_display
    if _virtual_display is not None:
        return
    if sys.platform != "linux" or os.environ.get("DISPLAY"):
        return

    try:
        from pyvirtualdisplay import Display
    except ImportError as exc:
        logger.warning(
            "No DISPLAY and pyvirtualdisplay not installed. "
            "Install with: pip install pyvirtualdisplay\n"
            "Also install Xvfb: apt-get install xvfb"
        )
        raise RuntimeError(
            "Headless server requires pyvirtualdisplay + xvfb. "
            "Install: pip install pyvirtualdisplay && apt-get install xvfb"
        ) from exc

    display = Display(visible=False, size=(1280, 800))
    display.start()
    # Publish the handle only after start() succeeded; otherwise a failed
    # start would make every later call return early as if a display existed.
    _virtual_display = display
    logger.info("Started virtual display: %s", os.environ.get("DISPLAY"))
|
|
# Origin each PageWorker navigates to when it creates (or recreates) its page.
| DUCK_AI_BASE = "https://duck.ai" |
# URL glob handed to page.route()/page.unroute() to intercept the chat request.
| CHAT_URL_PATTERN = "**/duckchat/v1/chat" |
# Sent as the "sec-ch-ua" extra HTTP header on the browser context.
| SEC_CH_UA = '"Not:A-Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"' |
# User-agent string applied to the browser context.
| USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36" |
|
|
# Default number of PageWorker tabs DuckAIClient creates when pool_size is not given.
| DEFAULT_POOL_SIZE = 3 |
|
|
|
|
def _int_to_base64url(n: int, length: Optional[int] = None) -> str:
    """Encode a non-negative integer as unpadded base64url of its big-endian bytes.

    Args:
        n: The integer to encode. Zero is encoded as a single ``0x00`` byte.
        length: Optional minimum byte width; shorter encodings are left-padded
            with zero bytes. Longer encodings are never truncated.

    Returns:
        The base64url text with trailing ``=`` padding removed.

    Raises:
        ValueError: ``n`` is negative (it has no unsigned big-endian form).
    """
    if n < 0:
        raise ValueError("cannot base64url-encode a negative integer")
    # max(1, ...) keeps the single zero byte for n == 0.
    raw = n.to_bytes(max(1, (n.bit_length() + 7) // 8), byteorder="big")
    if length:
        raw = raw.rjust(length, b"\x00")
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")
|
|
|
|
def _rsa_public_key_to_jwk(public_key) -> dict:
    """Describe an RSA public key as a JWK dict tagged for RSA-OAEP-256 encryption.

    The exponent is encoded at its natural width; the modulus is encoded at
    exactly the byte width implied by its bit length.
    """
    numbers = public_key.public_numbers()
    modulus = numbers.n
    modulus_width = (modulus.bit_length() + 7) // 8
    # Keyword order below fixes the key order of the resulting dict.
    return dict(
        alg="RSA-OAEP-256",
        e=_int_to_base64url(numbers.e),
        ext=True,
        key_ops=["encrypt"],
        kty="RSA",
        n=_int_to_base64url(modulus, modulus_width),
        use="enc",
    )
|
|
|
|
def _generate_rsa_keypair():
    """Create a fresh 2048-bit RSA key and return ``(private_key, public_jwk)``."""
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend(),
    )
    public_jwk = _rsa_public_key_to_jwk(private_key.public_key())
    return private_key, public_jwk
|
|
|
|
def _decrypt_data(private_key, encrypted_b64: str) -> Optional[str]:
    """Decrypt a base64 hybrid RSA-OAEP / AES-GCM payload, or return None.

    The decoded payload is read as: RSA-OAEP(SHA-256) wrapped AES key,
    then a 12-byte IV, then the ciphertext, then a 16-byte GCM tag.

    Args:
        private_key: RSA private key used to unwrap the AES key. A falsy
            value short-circuits to ``None``.
        encrypted_b64: Base64 text of the payload.

    Returns:
        The decrypted UTF-8 text, or ``None`` if the input is not a valid
        payload for this key. Failure is deliberately silent (debug log only)
        because callers probe arbitrary strings with this function.
    """
    if not private_key:
        return None
    iv_len, tag_len = 12, 16
    try:
        raw = base64.b64decode(encrypted_b64)
        # An RSA ciphertext is exactly as long as the key's modulus, so derive
        # the split point from the key instead of assuming a 2048-bit key.
        wrapped_len = (private_key.key_size + 7) // 8
        wrapped_key, payload = raw[:wrapped_len], raw[wrapped_len:]
        if len(payload) < iv_len + tag_len:
            logger.debug("Decryption failed: payload too short (%d bytes)", len(raw))
            return None
        aes_key = private_key.decrypt(
            wrapped_key,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None,
            ),
        )
        iv = payload[:iv_len]
        tag = payload[-tag_len:]
        ciphertext = payload[iv_len:-tag_len]
        decryptor = Cipher(
            algorithms.AES(aes_key), modes.GCM(iv, tag), backend=default_backend()
        ).decryptor()
        return (decryptor.update(ciphertext) + decryptor.finalize()).decode("utf-8")
    except Exception as e:
        logger.debug("Decryption failed: %s", e)
        return None
|
|
|
|
| |
| |
| |
class PageWorker:
    """One duck.ai browser tab that serves a single chat request at a time.

    The worker drives the page's own UI so that the site issues its chat
    request, then rewrites that request in flight (see ``do_chat``).

    Attributes:
        id: Numeric identifier used in log messages.
        page: The Playwright page, or ``None`` before ``init`` and while the
            page is being recreated.
        ready: ``True`` once ``init`` has run to completion.
        request_count: Number of chat requests that completed with HTTP 200.
    """

    def __init__(self, worker_id: int, context: BrowserContext):
        self.id = worker_id
        self._context = context
        self.page: Optional[Page] = None
        self.ready = False
        self.request_count = 0

    async def init(self):
        """Create page, navigate to duck.ai, handle first-visit dialog.

        Dismissing the dialogs and waiting for the textarea are best-effort:
        failures are logged and the worker is still marked ready. Errors from
        creating the page or navigating propagate to the caller.
        """
        self.page = await self._context.new_page()
        await self.page.goto(DUCK_AI_BASE, wait_until="domcontentloaded", timeout=60000)
        await self.page.wait_for_timeout(3000)

        onboarding_buttons = (
            "button:has-text('Get Started')",
            "button:has-text('Continue')",
            "button:has-text('I Agree')",
            "button:has-text('Accept')",
        )
        for selector in onboarding_buttons:
            # `except Exception` (not bare) so task cancellation is never swallowed.
            try:
                btn = self.page.locator(selector)
                if await btn.count() > 0:
                    await btn.first.click(timeout=3000)
                    await self.page.wait_for_timeout(1000)
            except Exception as e:
                logger.debug("Worker %s: could not click %s: %s", self.id, selector, e)

        try:
            await self.page.wait_for_selector("textarea:not([disabled])", timeout=15000)
        except Exception:
            logger.warning(f"Worker {self.id}: textarea not enabled after 15s")

        self.ready = True
        logger.info(f"Worker {self.id} ready")

    async def reinit(self):
        """Recreate the page (e.g. after an error)."""
        try:
            if self.page:
                await self.page.close()
        except Exception as e:
            logger.debug("Worker %s: closing old page failed: %s", self.id, e)
        self.page = None
        self.ready = False
        await self.init()

    @staticmethod
    def _trigger_text(desired_body: dict) -> str:
        """Return the short text typed into the UI to make the page send a request.

        The typed text only triggers the request; the intercepted request body
        is replaced wholesale by ``desired_body``. An empty or non-string last
        message falls back to "Hi" so something is always typed.
        """
        msgs = desired_body.get("messages") or []
        content = msgs[-1].get("content") if msgs and isinstance(msgs[-1], dict) else None
        snippet = content[:50] if isinstance(content, str) else ""
        return snippet if snippet.strip() else "Hi"

    async def _open_new_chat(self):
        """Best-effort click on a "New Chat" control so the request starts a fresh thread."""
        try:
            nc = self.page.locator(
                "a:has-text('New Chat'), button:has-text('New Chat'), a[href='/']"
            )
            if await nc.count() > 0:
                await nc.first.click(timeout=3000)
                await self.page.wait_for_timeout(1000)
        except Exception as e:
            logger.debug("Worker %s: new-chat click skipped: %s", self.id, e)

    async def do_chat(self, desired_body: dict, private_key, public_key_jwk) -> str:
        """
        Send a chat request through the page's own UI flow with route interception.
        Returns the raw SSE response body text.

        The page's outgoing chat request is intercepted; its body is replaced by
        ``desired_body`` plus the page's own ``durableStream`` object with
        ``publicKey`` set to ``public_key_jwk``. The upstream response is
        captured and the page itself is fed a stub success stream.

        Args:
            desired_body: JSON-serialisable request body to send upstream.
            private_key: Not used by this method.
            public_key_jwk: JWK dict stored under ``durableStream.publicKey``.

        Returns:
            The upstream response body decoded as UTF-8 (HTTP 200 only).

        Raises:
            RuntimeError: after the retries are used up, for interception/UI
                errors, a response timeout, rate limiting (429, or 418 with
                ``ERR_BN_LIMIT``) or any other non-200 status.
        """
        response_data = {"body": None, "status": None, "error": None}

        async def intercept_chat(route: Route):
            try:
                request = route.request
                original_body = json.loads(request.post_data) if request.post_data else {}
                durable = original_body.get("durableStream", {})
                durable["publicKey"] = public_key_jwk
                final_body = {**desired_body, "durableStream": durable}

                resp = await route.fetch(post_data=json.dumps(final_body))
                body_bytes = await resp.body()
                response_data["status"] = resp.status
                response_data["body"] = body_bytes.decode("utf-8", errors="replace")

                # The page only needs a well-formed stream to settle its UI.
                await route.fulfill(
                    status=200,
                    content_type="text/event-stream",
                    body='data: {"action":"success"}\n\ndata: [DONE]\n\n'
                )
            except Exception as e:
                response_data["error"] = str(e) or repr(e)
                logger.error(f"Worker {self.id} intercept error: {e}")
                try:
                    await route.fulfill(status=200, content_type="text/event-stream",
                                        body='data: [DONE]\n\n')
                except Exception as fulfill_err:
                    logger.debug("Worker %s: fallback fulfill failed: %s",
                                 self.id, fulfill_err)

        await self._open_new_chat()

        try:
            await self.page.wait_for_selector("textarea:not([disabled])", timeout=10000)
        except Exception:
            await self.reinit()
            await self.page.wait_for_selector("textarea:not([disabled])", timeout=10000)

        prompt = self._trigger_text(desired_body)
        max_retries = 2
        poll_ms, max_polls = 500, 120  # wait up to 60s for the intercepted response

        for attempt in range(max_retries + 1):
            # Reset in place: intercept_chat writes into this same dict.
            response_data.update(body=None, status=None, error=None)

            await self.page.route(CHAT_URL_PATTERN, intercept_chat)
            try:
                textarea = self.page.locator("textarea:not([disabled])")
                await textarea.first.click(timeout=5000)
                await textarea.first.fill(prompt)
                await self.page.wait_for_timeout(200)
                await textarea.first.press("Enter")

                for _ in range(max_polls):
                    await self.page.wait_for_timeout(poll_ms)
                    if response_data["body"] is not None or response_data["error"] is not None:
                        break
                else:
                    # Nothing arrived: report it as an error rather than letting
                    # a None status/body reach the checks below.
                    response_data["error"] = "Timed out waiting for chat response"
            except Exception as e:
                response_data["error"] = str(e) or repr(e)
            finally:
                # Runs on cancellation too, so the handler never outlives the call.
                try:
                    await self.page.unroute(CHAT_URL_PATTERN, intercept_chat)
                except Exception as e:
                    logger.debug("Worker %s: unroute failed: %s", self.id, e)

            error = response_data["error"]
            status = response_data["status"]
            body = response_data["body"] or ""
            can_retry = attempt < max_retries

            if error:
                logger.error(f"Worker {self.id} attempt {attempt+1}: {error}")
                if not can_retry:
                    raise RuntimeError(f"Chat failed: {error}")
                await self.reinit()
                continue

            if status == 429 or (status == 418 and "ERR_BN_LIMIT" in body):
                if not can_retry:
                    raise RuntimeError(f"Rate limited: {body[:200]}")
                logger.warning(f"Worker {self.id} rate limited, retrying...")
                await asyncio.sleep(5)
                await self.reinit()
                continue

            if status != 200:
                if not can_retry:
                    raise RuntimeError(f"Chat API error {status}: {body[:300]}")
                logger.warning(f"Worker {self.id} error {status}, retrying...")
                await self.reinit()
                continue

            self.request_count += 1
            return body

        raise RuntimeError("Exhausted retries")

    async def close(self):
        """Close the page if one exists; failures are logged, never raised."""
        try:
            if self.page:
                await self.page.close()
        except Exception as e:
            logger.debug("Worker %s: page close failed: %s", self.id, e)
|
|
|
|
| |
| |
| |
class DuckAIClient:
    """Chat client for duck.ai backed by a pool of Playwright browser tabs.

    The browser, its context and ``pool_size`` ``PageWorker`` tabs are created
    lazily on the first ``chat_stream`` call. Each request borrows one worker
    from an ``asyncio.Queue`` and returns it when the request finishes.

    Args:
        proxy: Optional proxy server URL applied to the browser context.
        model: Model identifier sent in the request body.
        assistant_name: Default assistant name for the customization metadata.
        system_prompt: Default custom instructions, used when a call supplies
            neither ``custom_instructions`` nor a system message.
        pool_size: Number of worker tabs to create.
    """

    def __init__(self, proxy: Optional[str] = None, model: str = "claude-haiku-4-5",
                 assistant_name: Optional[str] = None, system_prompt: Optional[str] = None,
                 pool_size: int = DEFAULT_POOL_SIZE):
        self.model = model
        self.assistant_name = assistant_name
        self.system_prompt = system_prompt
        self.proxy = proxy
        self.pool_size = pool_size
        self._playwright = None
        self._browser: Optional[Browser] = None
        self._context: Optional[BrowserContext] = None
        self._pool: Optional[asyncio.Queue] = None
        self._workers: list[PageWorker] = []
        self._init_lock = asyncio.Lock()
        self._ready = False

    async def _ensure_ready(self):
        """Launch the browser and worker pool once; later calls return immediately.

        Raises:
            RuntimeError: no worker could be initialised (or no display is
                available, see ``_ensure_display``). Anything already launched
                is torn down before the error propagates.
        """
        if self._ready:
            return
        async with self._init_lock:
            # Re-check: another task may have finished initialising while we waited.
            if self._ready:
                return
            _ensure_display()
            logger.info(f"Launching browser (pool_size={self.pool_size})...")
            try:
                await self._launch()
            except BaseException:
                # Also covers cancellation: never leave a half-started browser
                # behind, since a retry would launch a second one.
                await self._teardown()
                raise
            self._ready = True

    async def _launch(self):
        """Start Playwright, the browser, its context and the worker pool."""
        self._playwright = await async_playwright().start()

        launch_args = {
            "headless": False,
            "args": [
                "--disable-blink-features=AutomationControlled",
                "--window-position=-9999,-9999",
                "--no-sandbox",
            ],
        }
        self._browser = await self._playwright.chromium.launch(**launch_args)

        ctx_args = {
            "user_agent": USER_AGENT,
            "viewport": {"width": 1280, "height": 800},
            "locale": "en-US",
        }
        if self.proxy:
            ctx_args["proxy"] = {"server": self.proxy}
        self._context = await self._browser.new_context(**ctx_args)
        await self._context.set_extra_http_headers({"sec-ch-ua": SEC_CH_UA})

        pool = asyncio.Queue()
        candidates = [PageWorker(i, self._context) for i in range(self.pool_size)]
        # Initialise all tabs concurrently; one failing tab must not abort the rest.
        results = await asyncio.gather(
            *(w.init() for w in candidates), return_exceptions=True
        )

        self._workers = []
        for worker, res in zip(candidates, results):
            if isinstance(res, BaseException):
                logger.error(f"Worker {worker.id} init failed: {res}")
                # Drop it entirely so pool_status() does not count it as busy forever.
                await worker.close()
                continue
            self._workers.append(worker)
            pool.put_nowait(worker)
        self._pool = pool

        ready_count = pool.qsize()
        logger.info(f"Browser ready: {ready_count}/{self.pool_size} workers")
        if ready_count == 0:
            raise RuntimeError("No workers initialized successfully")

    async def _teardown(self):
        """Close workers, browser and Playwright; each step is independent and best-effort."""
        for w in self._workers:
            await w.close()
        self._workers.clear()

        browser, self._browser = self._browser, None
        self._context = None
        if browser is not None:
            try:
                await browser.close()
            except Exception as e:
                logger.debug("Browser close failed: %s", e)

        playwright, self._playwright = self._playwright, None
        if playwright is not None:
            # Separate try: a failing browser.close() must not skip this.
            try:
                await playwright.stop()
            except Exception as e:
                logger.debug("Playwright stop failed: %s", e)

    async def chat_stream(self, messages: list, web_search: bool = True,
                          custom_instructions: Optional[str] = None,
                          assistant_name: Optional[str] = None) -> AsyncIterator[dict]:
        """Send a conversation to duck.ai and yield the parsed response events.

        The whole response is fetched first and then parsed, so events are
        yielded only after the upstream request has completed.

        Args:
            messages: Dicts with ``role`` and ``content``. ``system`` messages
                are not sent as messages; their content becomes the custom
                instructions (the last one wins). Any role other than
                ``assistant`` is sent as ``user``.
            web_search: Enables the ``WebSearch`` tool and ``canUseTools``.
            custom_instructions: Overrides the client's ``system_prompt``.
            assistant_name: Overrides the client's ``assistant_name``.

        Yields:
            Event dicts with a ``type`` key, as produced by ``_parse_sse_text``.

        Raises:
            asyncio.TimeoutError: no worker became free within 120 seconds.
            RuntimeError: the worker could not complete the request.
        """
        await self._ensure_ready()

        name = assistant_name or self.assistant_name
        instructions = custom_instructions or self.system_prompt

        metadata = {}
        if name or instructions:
            customization = {}
            if name:
                customization["assistantName"] = name
                customization["shouldSeekClarity"] = False
            if instructions:
                customization["customInstructions"] = instructions
            metadata["customization"] = customization

        metadata["toolChoice"] = {
            "WebSearch": web_search, "NewsSearch": False,
            "VideosSearch": False, "LocalSearch": False, "WeatherForecast": False
        }

        duck_messages = []
        for msg in messages:
            role = msg.get("role", "user")
            content = msg.get("content", "")
            if role == "system":
                metadata.setdefault("customization", {})["customInstructions"] = content
                continue
            duck_messages.append({
                "role": "assistant" if role == "assistant" else "user",
                "content": content
            })

        if not duck_messages:
            duck_messages = [{"role": "user", "content": "Hello"}]

        desired_body = {
            "model": self.model,
            "messages": duck_messages,
            "canUseTools": web_search,
            "canUseApproxLocation": None,
            "metadata": metadata,
        }

        # Fresh key pair per request; the server encrypts the stream to it.
        private_key, public_key_jwk = _generate_rsa_keypair()

        # Keep a local reference: close() followed by re-init replaces self._pool.
        pool = self._pool
        worker: PageWorker = await asyncio.wait_for(pool.get(), timeout=120)
        try:
            t0 = time.monotonic()
            raw_body = await worker.do_chat(desired_body, private_key, public_key_jwk)
            elapsed = time.monotonic() - t0
            logger.info(f"Worker {worker.id} completed in {elapsed:.1f}s "
                        f"(total: {worker.request_count})")
        finally:
            # `finally` rather than `except Exception`: a cancelled request
            # must return its worker too, or the pool shrinks permanently.
            pool.put_nowait(worker)

        async for event in self._parse_sse_text(raw_body, private_key):
            yield event

    async def _parse_sse_text(self, text: str, private_key) -> AsyncIterator[dict]:
        """Parse an SSE response body into event dicts.

        Only ``data:`` lines are considered. Each payload is read as JSON; if
        that fails it is tried as an encrypted blob. Parsing stops at
        ``[DONE]``, which yields ``{"type": "done"}``.

        Yields dicts whose ``type`` is one of ``done``, ``text``, ``message``,
        ``search_begin``, ``search_end``, ``search_results``, ``search_source``
        or ``event`` (anything unrecognised, with the parsed payload as ``data``).
        """
        for raw_line in text.split("\n"):
            line = raw_line.strip()
            if not line.startswith("data:"):
                continue
            # The space after "data:" is optional in the SSE format.
            data = line[5:]
            if data.startswith(" "):
                data = data[1:]
            if not data:
                continue
            if data == "[DONE]":
                yield {"type": "done"}
                return

            try:
                parsed = json.loads(data)
            except json.JSONDecodeError:
                dec = _decrypt_data(private_key, data)
                if not dec:
                    continue
                try:
                    parsed = json.loads(dec)
                except json.JSONDecodeError:
                    yield {"type": "text", "data": dec}
                    continue

            if not isinstance(parsed, dict):
                # Valid JSON but not an object (number, string, list): the key
                # lookups below would raise, so pass it through as-is.
                yield {"type": "event", "data": parsed}
                continue

            if "encryptedData" in parsed:
                dec = _decrypt_data(private_key, parsed["encryptedData"])
                if dec:
                    try:
                        inner = json.loads(dec)
                        yield {"type": "message", "data": inner}
                    except json.JSONDecodeError:
                        yield {"type": "text", "data": dec}
                continue

            action = parsed.get("action")
            role = parsed.get("role", "")

            if role == "tool-invocation":
                state = parsed.get("state", "")
                if state == "call":
                    yield {"type": "search_begin", "data": parsed}
                elif state == "result":
                    yield {"type": "search_results", "data": parsed}
                else:
                    yield {"type": "event", "data": parsed}
                continue

            if role == "source":
                source = parsed.get("source", {})
                yield {"type": "search_source", "data": source}
                continue

            if action in ("search_begin", "search_end"):
                yield {"type": action, "data": parsed}
            elif action in ("search_result", "search_results"):
                yield {"type": "search_results", "data": parsed}
            elif "message" in parsed:
                msg = parsed["message"]
                if isinstance(msg, str):
                    yield {"type": "text", "data": msg}
                else:
                    yield {"type": "event", "data": parsed}
            elif "data" in parsed:
                val = parsed["data"]
                if isinstance(val, str):
                    # May be ciphertext or plain text; fall back to the raw value.
                    dec = _decrypt_data(private_key, val)
                    yield {"type": "text", "data": dec if dec else val}
                else:
                    yield {"type": "event", "data": parsed}
            else:
                yield {"type": "event", "data": parsed}

    def pool_status(self) -> dict:
        """Return current pool utilization stats."""
        if not self._ready:
            return {"ready": False}
        total = len(self._workers)
        available = self._pool.qsize() if self._pool else 0
        return {
            "ready": True,
            "total_workers": total,
            "available_workers": available,
            "busy_workers": total - available,
            "worker_stats": [
                {"id": w.id, "requests": w.request_count, "ready": w.ready}
                for w in self._workers
            ],
        }

    async def close(self):
        """Shut down all workers, the browser and Playwright; never raises for close errors."""
        self._ready = False
        await self._teardown()
|
|