duck / duck_client.py
Spooker's picture
Upload 8 files
4d2e96d verified
import json
import uuid
import base64
import asyncio
import logging
import os
import sys
import time
from typing import AsyncIterator, Optional
from playwright.async_api import async_playwright, Browser, BrowserContext, Page, Route
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Virtual display started by _ensure_display(); stays None until one is started.
_virtual_display = None
def _ensure_display():
    """On Linux without DISPLAY, start a virtual display via Xvfb.

    Does nothing when a virtual display was already started by this
    function, when not running on Linux, or when the DISPLAY environment
    variable is already set.

    Raises:
        RuntimeError: if pyvirtualdisplay cannot be imported.
        Exception: whatever ``Display.start()`` raises is propagated as-is.
    """
    global _virtual_display
    if _virtual_display is not None:
        return
    if sys.platform != "linux":
        return  # Windows/macOS always have a display
    if os.environ.get("DISPLAY"):
        return  # Display already available
    try:
        from pyvirtualdisplay import Display
    except ImportError as exc:
        logger.warning(
            "No DISPLAY and pyvirtualdisplay not installed. "
            "Install with: pip install pyvirtualdisplay\n"
            "Also install Xvfb: apt-get install xvfb"
        )
        raise RuntimeError(
            "Headless server requires pyvirtualdisplay + xvfb. "
            "Install: pip install pyvirtualdisplay && apt-get install xvfb"
        ) from exc
    display = Display(visible=False, size=(1280, 800))
    display.start()
    # Publish the global only after start() succeeded; otherwise a failed
    # start would make every later call return early as if a display existed.
    _virtual_display = display
    logger.info(f"Started virtual display: {os.environ.get('DISPLAY')}")
# Origin each PageWorker navigates to when its tab is created.
DUCK_AI_BASE = "https://duck.ai"
# URL glob registered with page.route() to intercept the chat request.
CHAT_URL_PATTERN = "**/duckchat/v1/chat"
# Value sent as the sec-ch-ua header on the browser context.
SEC_CH_UA = '"Not:A-Brand";v="99", "Google Chrome";v="133", "Chromium";v="133"'
# User-Agent string applied to the browser context.
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
# Default number of PageWorker tabs a DuckAIClient opens.
DEFAULT_POOL_SIZE = 3
def _int_to_base64url(n: int, length: Optional[int] = None) -> str:
    """Encode a non-negative integer as unpadded base64url.

    The integer is serialized big-endian using the minimum number of bytes
    (zero becomes a single zero byte) before being base64url-encoded.

    Args:
        n: Value to encode. Must be >= 0.
        length: Optional minimum width in bytes. Shorter encodings are
            left-padded with zero bytes; longer ones are never truncated.

    Returns:
        The base64url text with trailing ``=`` padding removed.

    Raises:
        ValueError: if ``n`` is negative (previously this was silently
            encoded as zero).
    """
    if n < 0:
        raise ValueError("cannot base64url-encode a negative integer")
    width = max((n.bit_length() + 7) // 8, 1)
    if length:
        width = max(width, length)
    raw = n.to_bytes(width, byteorder="big")
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")
def _rsa_public_key_to_jwk(public_key) -> dict:
    """Describe an RSA public key as a JWK dictionary.

    The modulus and exponent are read from ``public_key.public_numbers()``
    and base64url-encoded; the remaining fields are fixed values marking
    the key as an RSA-OAEP-256 encryption key.
    """
    numbers = public_key.public_numbers()
    modulus = numbers.n
    exponent = numbers.e
    # Encode the modulus at its full byte width.
    modulus_len = (modulus.bit_length() + 7) // 8
    encoded_e = _int_to_base64url(exponent)
    encoded_n = _int_to_base64url(modulus, modulus_len)
    return dict(
        alg="RSA-OAEP-256",
        e=encoded_e,
        ext=True,
        key_ops=["encrypt"],
        kty="RSA",
        n=encoded_n,
        use="enc",
    )
def _generate_rsa_keypair():
    """Create a fresh 2048-bit RSA key (e=65537) and its public JWK.

    Returns:
        A ``(private_key, public_jwk)`` tuple.
    """
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend(),
    )
    public_jwk = _rsa_public_key_to_jwk(private_key.public_key())
    return private_key, public_jwk
def _decrypt_data(private_key, encrypted_b64: str) -> Optional[str]:
    """Decrypt a base64 payload wrapped with RSA-OAEP(SHA-256) + AES-GCM.

    After base64-decoding, the payload is read as:
    RSA-encrypted AES key (one RSA modulus long) | 12-byte IV |
    ciphertext | 16-byte GCM tag.

    Args:
        private_key: RSA private key used to unwrap the AES key. A falsy
            value disables decryption.
        encrypted_b64: Base64 text of the encrypted payload.

    Returns:
        The decrypted UTF-8 text, or ``None`` when no key was given or any
        step of decoding/decryption fails (the failure is logged at debug
        level).
    """
    if not private_key:
        return None
    try:
        raw = base64.b64decode(encrypted_b64)
        # An RSA ciphertext is exactly as long as the key's modulus, so
        # derive the split point from the key instead of assuming RSA-2048.
        wrapped_len = (getattr(private_key, "key_size", 2048) + 7) // 8
        enc_key = raw[:wrapped_len]
        rest = raw[wrapped_len:]
        aes_key = private_key.decrypt(
            enc_key,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None,
            ),
        )
        iv, tag, ct = rest[:12], rest[-16:], rest[12:-16]
        cipher = Cipher(algorithms.AES(aes_key), modes.GCM(iv, tag), backend=default_backend())
        dec = cipher.decryptor()
        return (dec.update(ct) + dec.finalize()).decode('utf-8')
    except Exception as e:
        # Best-effort: callers treat None as "not decryptable".
        logger.debug(f"Decryption failed: {e}")
        return None
# ---------------------------------------------------------------------------
# PageWorker: one browser tab that can handle one chat request at a time
# ---------------------------------------------------------------------------
class PageWorker:
    """One browser tab that serves a single chat request at a time.

    The worker types into the page's own chat box to make the page issue
    its chat request, intercepts that request, sends the caller's body
    upstream in its place and captures the raw response for the caller.
    """

    TEXTAREA_SELECTOR = "textarea:not([disabled])"
    MAX_RETRIES = 2          # extra attempts after the first one
    POLL_INTERVAL_MS = 500   # how often to check for a captured response
    POLL_ATTEMPTS = 120      # 120 * 500 ms = 60 s per attempt

    def __init__(self, worker_id: int, context: BrowserContext):
        self.id = worker_id
        self._context = context
        self.page: Optional[Page] = None
        self.ready = False
        self.request_count = 0  # number of successfully completed chats

    async def init(self):
        """Create page, navigate to duck.ai, handle first-visit dialog."""
        self.page = await self._context.new_page()
        await self.page.goto(DUCK_AI_BASE, wait_until="domcontentloaded", timeout=60000)
        await self.page.wait_for_timeout(3000)
        # Dismiss whichever first-visit button happens to be present.
        for selector in (
            "button:has-text('Get Started')",
            "button:has-text('Continue')",
            "button:has-text('I Agree')",
            "button:has-text('Accept')",
        ):
            try:
                btn = self.page.locator(selector)
                if await btn.count() > 0:
                    await btn.first.click(timeout=3000)
                    await self.page.wait_for_timeout(1000)
            except Exception as e:
                # Best-effort: a missing or unclickable button is not fatal.
                logger.debug(f"Worker {self.id}: could not click {selector!r}: {e}")
        try:
            await self.page.wait_for_selector(self.TEXTAREA_SELECTOR, timeout=15000)
        except Exception:
            logger.warning(f"Worker {self.id}: textarea not enabled after 15s")
        self.ready = True
        logger.info(f"Worker {self.id} ready")

    async def reinit(self):
        """Recreate the page (e.g. after an error)."""
        try:
            if self.page:
                await self.page.close()
        except Exception as e:
            # The old page may already be gone; a fresh one is created below.
            logger.debug(f"Worker {self.id}: closing old page failed: {e}")
        self.page = None
        self.ready = False
        await self.init()

    async def _restart_with_route(self, handler) -> None:
        """Recreate the page and register ``handler`` on the chat endpoint."""
        await self.reinit()
        await self.page.route(CHAT_URL_PATTERN, handler)

    async def do_chat(self, desired_body: dict, private_key, public_key_jwk) -> str:
        """
        Send a chat request through the page's own UI flow with route interception.

        Args:
            desired_body: JSON body to send upstream in place of the page's own.
            private_key: Accepted for interface compatibility; not used here.
            public_key_jwk: Stored as ``durableStream.publicKey`` in the
                outgoing body.

        Returns:
            The raw SSE response body text.

        Raises:
            RuntimeError: when every attempt fails, times out, is rate
                limited, or ends with a non-200 status.
        """
        response_data = {"body": None, "status": None, "error": None}

        async def intercept_chat(route: Route):
            try:
                request = route.request
                original_body = json.loads(request.post_data) if request.post_data else {}
                # Keep the page's own durableStream settings, swap in our key.
                durable = original_body.get("durableStream", {})
                durable["publicKey"] = public_key_jwk
                final_body = {**desired_body, "durableStream": durable}
                resp = await route.fetch(post_data=json.dumps(final_body))
                body_bytes = await resp.body()
                response_data["status"] = resp.status
                response_data["body"] = body_bytes.decode("utf-8", errors="replace")
                # Give the page a harmless canned stream; the real response
                # is handed to the caller instead.
                await route.fulfill(
                    status=200,
                    content_type="text/event-stream",
                    body='data: {"action":"success"}\n\ndata: [DONE]\n\n'
                )
            except Exception as e:
                if response_data["body"] is None:
                    response_data["error"] = str(e) or repr(e)
                # If the upstream response was already captured, a failure
                # to answer the page must not discard it.
                logger.error(f"Worker {self.id} intercept error: {e}")
                try:
                    await route.fulfill(status=200, content_type="text/event-stream",
                                        body='data: [DONE]\n\n')
                except Exception:
                    pass  # route already handled or page gone

        # Click "New Chat" if available
        try:
            nc = self.page.locator("a:has-text('New Chat'), button:has-text('New Chat'), a[href='/']")
            if await nc.count() > 0:
                await nc.first.click(timeout=3000)
                await self.page.wait_for_timeout(1000)
        except Exception as e:
            logger.debug(f"Worker {self.id}: New Chat click skipped: {e}")

        # Ensure textarea
        try:
            await self.page.wait_for_selector(self.TEXTAREA_SELECTOR, timeout=10000)
        except Exception:
            await self.reinit()
            await self.page.wait_for_selector(self.TEXTAREA_SELECTOR, timeout=10000)

        # The typed text only triggers the page's request; the body sent
        # upstream is desired_body. It must be non-empty.
        msgs = desired_body.get("messages") or []
        last_msg = msgs[-1].get("content") if msgs else None
        trigger = last_msg.strip()[:50] if isinstance(last_msg, str) else ""
        trigger = trigger or "Hi"

        await self.page.route(CHAT_URL_PATTERN, intercept_chat)

        for attempt in range(self.MAX_RETRIES + 1):
            response_data = {"body": None, "status": None, "error": None}
            can_retry = attempt < self.MAX_RETRIES
            try:
                textarea = self.page.locator(self.TEXTAREA_SELECTOR)
                await textarea.first.click(timeout=5000)
                await textarea.first.fill(trigger)
                await self.page.wait_for_timeout(200)
                await textarea.first.press("Enter")
                for _ in range(self.POLL_ATTEMPTS):
                    await self.page.wait_for_timeout(self.POLL_INTERVAL_MS)
                    if response_data["body"] is not None or response_data["error"] is not None:
                        break
                else:
                    # Nothing was intercepted: report it as an error rather
                    # than falling through with status/body still None.
                    response_data["error"] = "Timed out waiting for chat response"
            except Exception as e:
                response_data["error"] = str(e) or repr(e)

            try:
                await self.page.unroute(CHAT_URL_PATTERN, intercept_chat)
            except Exception:
                pass  # page may have been closed; a retry re-routes anyway

            if response_data["error"]:
                logger.error(f"Worker {self.id} attempt {attempt+1}: {response_data['error']}")
                if can_retry:
                    await self._restart_with_route(intercept_chat)
                    continue
                raise RuntimeError(f"Chat failed: {response_data['error']}")

            status = response_data["status"] or 0
            body = response_data["body"] or ""

            if status == 429 or (status == 418 and "ERR_BN_LIMIT" in body):
                if can_retry:
                    logger.warning(f"Worker {self.id} rate limited, retrying...")
                    await asyncio.sleep(5)
                    await self._restart_with_route(intercept_chat)
                    continue
                raise RuntimeError(f"Rate limited: {body[:200]}")

            if status != 200:
                if can_retry:
                    logger.warning(f"Worker {self.id} error {status}, retrying...")
                    await self._restart_with_route(intercept_chat)
                    continue
                raise RuntimeError(f"Chat API error {status}: {body[:300]}")

            self.request_count += 1
            return body

        raise RuntimeError("Exhausted retries")

    async def close(self):
        """Close the worker's page, ignoring errors from an already-dead page."""
        try:
            if self.page:
                await self.page.close()
        except Exception as e:
            logger.debug(f"Worker {self.id}: page close failed: {e}")
# ---------------------------------------------------------------------------
# DuckAIClient: manages browser + pool of PageWorkers
# ---------------------------------------------------------------------------
class DuckAIClient:
    """Chat client that owns one browser and a pool of PageWorker tabs.

    The browser is launched lazily on the first ``chat_stream`` call. Each
    request borrows one worker from the pool, runs the chat through it and
    yields the parsed server-sent events.
    """

    def __init__(self, proxy: str = None, model: str = "claude-haiku-4-5",
                 assistant_name: str = None, system_prompt: str = None,
                 pool_size: int = DEFAULT_POOL_SIZE):
        self.model = model
        self.assistant_name = assistant_name
        # NOTE(review): system_prompt is stored but no method of this class
        # reads it; confirm whether it should feed customInstructions.
        self.system_prompt = system_prompt
        self.proxy = proxy
        self.pool_size = pool_size

        self._playwright = None
        self._browser: Optional[Browser] = None
        self._context: Optional[BrowserContext] = None
        self._pool: Optional[asyncio.Queue] = None  # idle workers
        self._workers: list[PageWorker] = []        # every created worker
        self._init_lock = asyncio.Lock()
        self._ready = False

    async def _ensure_ready(self):
        """Launch the browser and worker pool once; later calls are no-ops.

        Raises:
            RuntimeError: if no worker could be initialized.
        """
        if self._ready:
            return
        async with self._init_lock:
            if self._ready:  # another task finished startup while we waited
                return
            try:
                await self._start()
            except BaseException:
                # Don't leak a half-started browser/driver; the next call
                # would otherwise launch a second one on top of it.
                await self.close()
                raise
            self._ready = True

    async def _start(self):
        """Launch Chromium, create the context and initialize all workers."""
        _ensure_display()

        logger.info(f"Launching browser (pool_size={self.pool_size})...")
        self._playwright = await async_playwright().start()

        launch_args = {
            "headless": False,
            "args": [
                "--disable-blink-features=AutomationControlled",
                "--window-position=-9999,-9999",
                "--no-sandbox",
            ],
        }
        self._browser = await self._playwright.chromium.launch(**launch_args)

        ctx_args = {
            "user_agent": USER_AGENT,
            "viewport": {"width": 1280, "height": 800},
            "locale": "en-US",
        }
        if self.proxy:
            ctx_args["proxy"] = {"server": self.proxy}
        self._context = await self._browser.new_context(**ctx_args)
        await self._context.set_extra_http_headers({"sec-ch-ua": SEC_CH_UA})

        # Create page workers
        self._pool = asyncio.Queue()
        self._workers = [PageWorker(i, self._context) for i in range(self.pool_size)]

        # Initialize all workers in parallel
        results = await asyncio.gather(
            *(w.init() for w in self._workers), return_exceptions=True
        )
        for worker, res in zip(self._workers, results):
            if isinstance(res, Exception):
                logger.error(f"Worker {worker.id} init failed: {res}")
            else:
                self._pool.put_nowait(worker)

        ready_count = self._pool.qsize()
        logger.info(f"Browser ready: {ready_count}/{self.pool_size} workers")
        if ready_count == 0:
            raise RuntimeError("No workers initialized successfully")

    def _build_body(self, messages: list, web_search: bool,
                    custom_instructions: Optional[str],
                    assistant_name: Optional[str]) -> dict:
        """Translate chat messages and options into the request body dict."""
        metadata = {}
        name = assistant_name or self.assistant_name
        if name or custom_instructions:
            customization = {}
            if name:
                customization["assistantName"] = name
                customization["shouldSeekClarity"] = False
            if custom_instructions:
                customization["customInstructions"] = custom_instructions
            metadata["customization"] = customization

        metadata["toolChoice"] = {
            "WebSearch": web_search, "NewsSearch": False,
            "VideosSearch": False, "LocalSearch": False, "WeatherForecast": False
        }

        duck_messages = []
        for msg in messages:
            role = msg.get("role", "user")
            content = msg.get("content", "")
            if role == "system":
                # A system message replaces any custom instructions; the
                # last system message wins.
                metadata.setdefault("customization", {})["customInstructions"] = content
                continue
            duck_messages.append({
                "role": "assistant" if role == "assistant" else "user",
                "content": content
            })

        if not duck_messages:
            duck_messages = [{"role": "user", "content": "Hello"}]

        return {
            "model": self.model,
            "messages": duck_messages,
            "canUseTools": web_search,
            "canUseApproxLocation": None,
            "metadata": metadata,
        }

    async def chat_stream(self, messages: list, web_search: bool = True,
                          custom_instructions: str = None,
                          assistant_name: str = None) -> AsyncIterator[dict]:
        """Run one chat request and yield its parsed events.

        Args:
            messages: Dicts with ``role`` and ``content``. ``system``
                messages become custom instructions; any role other than
                ``assistant`` is sent as ``user``.
            web_search: Enables the WebSearch tool and ``canUseTools``.
            custom_instructions: Optional custom instructions text.
            assistant_name: Overrides the client's ``assistant_name``.

        Yields:
            Event dicts with a ``type`` key, as produced by ``_parse_sse_text``.

        Raises:
            asyncio.TimeoutError: if no worker becomes free within 120 s.
            RuntimeError: if startup or the chat itself fails.
        """
        await self._ensure_ready()

        desired_body = self._build_body(
            messages, web_search, custom_instructions, assistant_name
        )

        # Generate per-request RSA keypair
        private_key, public_key_jwk = _generate_rsa_keypair()

        # Acquire a worker from the pool (blocks if all busy)
        worker: PageWorker = await asyncio.wait_for(self._pool.get(), timeout=120)
        try:
            t0 = time.time()
            raw_body = await worker.do_chat(desired_body, private_key, public_key_jwk)
            elapsed = time.time() - t0
            logger.info(f"Worker {worker.id} completed in {elapsed:.1f}s "
                        f"(total: {worker.request_count})")
        finally:
            # Hand the worker back on success and on failure, before parsing,
            # so a slow consumer does not hold a tab.
            self._pool.put_nowait(worker)

        # Parse SSE response
        async for event in self._parse_sse_text(raw_body, private_key):
            yield event

    async def _parse_sse_text(self, text: str, private_key) -> AsyncIterator[dict]:
        """Turn a raw SSE body into event dicts.

        Only ``data:`` lines are considered. ``[DONE]`` yields
        ``{"type": "done"}`` and stops. Payloads that are not JSON are
        passed to ``_decrypt_data``; undecryptable ones are skipped.
        """
        for line in text.split("\n"):
            line = line.strip()
            # Skips blank lines, ":" comments, "event:" and other fields.
            if not line.startswith("data:"):
                continue
            data = line[5:]
            if data.startswith(" "):
                # The space after the colon is optional in SSE.
                data = data[1:]
            if not data:
                continue
            if data == "[DONE]":
                yield {"type": "done"}
                return

            try:
                parsed = json.loads(data)
            except json.JSONDecodeError:
                dec = _decrypt_data(private_key, data)
                if not dec:
                    continue
                try:
                    parsed = json.loads(dec)
                except json.JSONDecodeError:
                    yield {"type": "text", "data": dec}
                    continue

            if not isinstance(parsed, dict):
                # Valid JSON but not an object (string, number, list, null):
                # the key lookups below would raise on these.
                kind = "text" if isinstance(parsed, str) else "event"
                yield {"type": kind, "data": parsed}
                continue

            if "encryptedData" in parsed:
                dec = _decrypt_data(private_key, parsed["encryptedData"])
                if dec:
                    try:
                        inner = json.loads(dec)
                        yield {"type": "message", "data": inner}
                    except json.JSONDecodeError:
                        yield {"type": "text", "data": dec}
                continue

            action = parsed.get("action")
            role = parsed.get("role", "")

            # Tool invocation events (WebSearch, etc.)
            if role == "tool-invocation":
                state = parsed.get("state", "")
                if state == "call":
                    yield {"type": "search_begin", "data": parsed}
                elif state == "result":
                    yield {"type": "search_results", "data": parsed}
                else:
                    yield {"type": "event", "data": parsed}
                continue

            # Source/citation events from web search
            if role == "source":
                source = parsed.get("source", {})
                yield {"type": "search_source", "data": source}
                continue

            if action in ("search_begin", "search_end"):
                yield {"type": action, "data": parsed}
            elif action in ("search_result", "search_results"):
                yield {"type": "search_results", "data": parsed}
            elif "message" in parsed:
                msg = parsed["message"]
                if isinstance(msg, str):
                    yield {"type": "text", "data": msg}
                else:
                    yield {"type": "event", "data": parsed}
            elif "data" in parsed:
                val = parsed["data"]
                if isinstance(val, str):
                    # Try decrypting; fall back to the raw string.
                    dec = _decrypt_data(private_key, val)
                    yield {"type": "text", "data": dec if dec else val}
                else:
                    yield {"type": "event", "data": parsed}
            else:
                yield {"type": "event", "data": parsed}

    def pool_status(self) -> dict:
        """Return current pool utilization stats."""
        if not self._ready:
            return {"ready": False}
        total = len(self._workers)
        available = self._pool.qsize() if self._pool else 0
        return {
            "ready": True,
            "total_workers": total,
            "available_workers": available,
            # Workers whose init failed never enter the pool and are
            # therefore counted here as well.
            "busy_workers": total - available,
            "worker_stats": [
                {"id": w.id, "requests": w.request_count, "ready": w.ready}
                for w in self._workers
            ],
        }

    async def close(self):
        """Close all workers, the browser and the Playwright driver.

        Each teardown step is attempted independently so that a failure to
        close the browser does not prevent the driver from being stopped.
        """
        for w in self._workers:
            await w.close()
        self._workers.clear()
        if self._browser:
            try:
                await self._browser.close()
            except Exception as e:
                logger.debug(f"Browser close failed: {e}")
        if self._playwright:
            try:
                await self._playwright.stop()
            except Exception as e:
                logger.debug(f"Playwright stop failed: {e}")
        self._browser = None
        self._context = None
        self._playwright = None
        self._ready = False