# cloudflare_provider.py
# (Hugging Face viewer header removed — originally: "Adarshu07 — Update
#  cloudflare_provider.py — commit 596f075 verified")
"""
╔═══════════════════════════════════════════════════════════════╗
β•‘ cloudflare_provider.py β•‘
β•‘ Cloudflare AI Playground β€” Reverse Engineered Provider β•‘
β•‘ β•‘
β•‘ Display modes (auto-detected, no config needed): β•‘
β•‘ XVFB_EXTERNAL=1 + DISPLAY=:99 β•‘
β•‘ β†’ entrypoint.sh already started Xvfb; just use DISPLAY β•‘
β•‘ VR_DISPLAY=1 β•‘
β•‘ β†’ pyvirtualdisplay starts its own Xvfb (dev/local use) β•‘
β•‘ Neither β•‘
β•‘ β†’ direct Python WS only; browser fallback disabled β•‘
β•‘ β•‘
β•‘ NOTE: Chrome --headless is intentionally NEVER used. β•‘
β•‘ Cloudflare blocks headless user agents. β•‘
β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•
"""
import atexit
import json
import os
import sys
import time
import uuid
import random
import string
import threading
from pathlib import Path
from typing import Generator, Optional
# ═══════════════════════════════════════════════════════════
# Β§1 β€” AUTO INSTALL
# ═══════════════════════════════════════════════════════════
def _install(pkg, pip_name=None):
try:
__import__(pkg)
except ImportError:
import subprocess
subprocess.check_call(
[sys.executable, "-m", "pip", "install", "-q", pip_name or pkg]
)
# Hard dependency: websocket-client supplies the raw WebSocket transport.
_install("websocket", "websocket-client")
import websocket as _ws_mod
# Optional dependency: DrissionPage drives a real (headed) Chrome for the
# browser-relay fallback.  If it cannot be imported or installed, the
# provider still works — direct Python WebSocket mode only.
_HAS_BROWSER = False
try:
    from DrissionPage import ChromiumPage, ChromiumOptions
    _HAS_BROWSER = True
except Exception:
    try:
        # Not importable — try a one-shot pip install, then retry the import.
        _install("DrissionPage")
        from DrissionPage import ChromiumPage, ChromiumOptions
        _HAS_BROWSER = True
    except Exception as _br_err:
        print(f"[cloudflare] ⚠ DrissionPage unavailable: {_br_err}", file=sys.stderr)
# ═══════════════════════════════════════════════════════════
# Β§1b β€” DISPLAY DETECTION
#
# Three modes, detected in priority order:
# 1. XVFB_EXTERNAL=1 β†’ entrypoint.sh manages Xvfb, DISPLAY is set
# 2. VR_DISPLAY=1 β†’ we spawn pyvirtualdisplay ourselves
# 3. DISPLAY is set β†’ assume a real display exists (local dev)
# ═══════════════════════════════════════════════════════════
def _parse_bool_env(key: str, default: bool = False) -> bool:
val = os.environ.get(key, "").strip().lower()
if not val:
return default
return val in ("1", "true", "yes", "on", "enable", "enabled")
XVFB_EXTERNAL = _parse_bool_env("XVFB_EXTERNAL", default=False)  # entrypoint.sh owns Xvfb
VR_DISPLAY = _parse_bool_env("VR_DISPLAY", default=False)        # we may spawn Xvfb ourselves
# Is any display available?
_DISPLAY_ENV = os.environ.get("DISPLAY", "").strip()
HAS_DISPLAY = bool(_DISPLAY_ENV) or XVFB_EXTERNAL
_HAS_VIRTUAL_DISPLAY = False  # becomes True once pyvirtualdisplay imports cleanly
_Display = None               # pyvirtualdisplay.Display class, bound below on demand
# Only self-manage Xvfb when asked to (VR_DISPLAY) and nobody else is (XVFB_EXTERNAL).
if VR_DISPLAY and not XVFB_EXTERNAL:
    try:
        # Note: PyPI distribution name differs from the import name.
        _install("pyvirtualdisplay", "PyVirtualDisplay")
        from pyvirtualdisplay import Display as _Display
        _HAS_VIRTUAL_DISPLAY = True
    except Exception as _vd_err:
        print(
            f"[cloudflare] ⚠ pyvirtualdisplay failed: {_vd_err}",
            file=sys.stderr, flush=True,
        )
# ═══════════════════════════════════════════════════════════
# Β§2 β€” CONSTANTS
# ═══════════════════════════════════════════════════════════
_SITE = "https://playground.ai.cloudflare.com"                     # page origin (also sent as WS Origin)
_WS_BASE = "wss://playground.ai.cloudflare.com/agents/playground"  # agent WebSocket endpoint
_CACHE = Path(__file__).resolve().parent / "cache"                 # cache dir lives next to this file
_MFILE = _CACHE / "cloudflare_models.json"                         # cached model list
_CHARS = string.ascii_letters + string.digits                      # alphabet for message/session ids
_LOWER = string.ascii_lowercase + string.digits                    # alphabet for assistant-id suffixes
# Desktop Chrome UA — Cloudflare rejects headless/bot user agents.
_UA = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/136.0.0.0 Safari/537.36"
)
_CACHE_TTL = 6 * 3600  # 6 hours
# ═══════════════════════════════════════════════════════════
# Β§3 β€” MODEL TABLE
# ═══════════════════════════════════════════════════════════
# Static alias table: short model name → full playground model path.
# Used as a fallback by _resolve_model()/_resolve() when the live model
# list (getModels RPC) is unavailable or does not contain the alias.
_SHORT_TO_FULL: dict[str, str] = {
    "gpt-oss-120b": "@cf/openai/gpt-oss-120b",
    "gpt-oss-20b": "@cf/openai/gpt-oss-20b",
    "qwen1.5-0.5b-chat": "@cf/qwen/qwen1.5-0.5b-chat",
    "qwen1.5-1.8b-chat": "@cf/qwen/qwen1.5-1.8b-chat",
    "qwen1.5-7b-chat-awq": "@cf/qwen/qwen1.5-7b-chat-awq",
    "qwen1.5-14b-chat-awq": "@cf/qwen/qwen1.5-14b-chat-awq",
    "qwen2.5-coder-32b-instruct": "@cf/qwen/qwen2.5-coder-32b-instruct",
    "qwq-32b": "@cf/qwen/qwq-32b",
    "qwen3-30b-a3b-fp8": "@cf/qwen/qwen3-30b-a3b-fp8",
    "gemma-2b-it-lora": "@cf/google/gemma-2b-it-lora",
    "gemma-7b-it-lora": "@cf/google/gemma-7b-it-lora",
    "gemma-3-12b-it": "@cf/google/gemma-3-12b-it",
    "gemma-7b-it": "@hf/google/gemma-7b-it",
    "starling-lm-7b-beta": "@hf/nexusflow/starling-lm-7b-beta",
    "llama-3-8b-instruct": "@cf/meta/llama-3-8b-instruct",
    "llama-3-8b-instruct-awq": "@cf/meta/llama-3-8b-instruct-awq",
    "llama-3.2-3b-instruct": "@cf/meta/llama-3.2-3b-instruct",
    "llama-3.2-1b-instruct": "@cf/meta/llama-3.2-1b-instruct",
    "llama-3.2-11b-vision-instruct": "@cf/meta/llama-3.2-11b-vision-instruct",
    "llama-3.3-70b-instruct-fp8-fast": "@cf/meta/llama-3.3-70b-instruct-fp8-fast",
    "llama-3.1-8b-instruct-fp8": "@cf/meta/llama-3.1-8b-instruct-fp8",
    "llama-3.1-8b-instruct-awq": "@cf/meta/llama-3.1-8b-instruct-awq",
    "llama-3.1-70b-instruct": "@cf/meta/llama-3.1-70b-instruct",
    "llama-4-scout-17b-16e-instruct": "@cf/meta/llama-4-scout-17b-16e-instruct",
    "llama-2-7b-chat-fp16": "@cf/meta/llama-2-7b-chat-fp16",
    "llama-2-7b-chat-int8": "@cf/meta/llama-2-7b-chat-int8",
    "llama-2-7b-chat-hf-lora": "@cf/meta-llama/llama-2-7b-chat-hf-lora",
    "llama-guard-3-8b": "@cf/meta/llama-guard-3-8b",
    "mistral-7b-instruct-v0.1": "@cf/mistral/mistral-7b-instruct-v0.1",
    "mistral-7b-instruct-v0.2-lora": "@cf/mistral/mistral-7b-instruct-v0.2-lora",
    "mistral-7b-instruct-v0.2": "@hf/mistral/mistral-7b-instruct-v0.2",
    "mistral-7b-instruct-v0.1-awq": "@hf/thebloke/mistral-7b-instruct-v0.1-awq",
    "mistral-small-3.1-24b-instruct": "@cf/mistralai/mistral-small-3.1-24b-instruct",
    "deepseek-r1-distill-qwen-32b": "@cf/deepseek-ai/deepseek-r1-distill-qwen-32b",
    "deepseek-math-7b-instruct": "@cf/deepseek-ai/deepseek-math-7b-instruct",
    "deepseek-coder-6.7b-base-awq": "@hf/thebloke/deepseek-coder-6.7b-base-awq",
    "deepseek-coder-6.7b-instruct-awq": "@hf/thebloke/deepseek-coder-6.7b-instruct-awq",
    "tinyllama-1.1b-chat-v1.0": "@cf/tinyllama/tinyllama-1.1b-chat-v1.0",
    "falcon-7b-instruct": "@cf/tiiuae/falcon-7b-instruct",
    "hermes-2-pro-mistral-7b": "@hf/nousresearch/hermes-2-pro-mistral-7b",
    "neural-chat-7b-v3-1-awq": "@hf/thebloke/neural-chat-7b-v3-1-awq",
    "openhermes-2.5-mistral-7b-awq": "@hf/thebloke/openhermes-2.5-mistral-7b-awq",
    "openchat-3.5-0106": "@cf/openchat/openchat-3.5-0106",
    "llama-2-13b-chat-awq": "@hf/thebloke/llama-2-13b-chat-awq",
    "zephyr-7b-beta-awq": "@hf/thebloke/zephyr-7b-beta-awq",
    "discolm-german-7b-v1-awq": "@cf/thebloke/discolm-german-7b-v1-awq",
    "una-cybertron-7b-v2-bf16": "@cf/fblgit/una-cybertron-7b-v2-bf16",
    "sqlcoder-7b-2": "@cf/defog/sqlcoder-7b-2",
    "phi-2": "@cf/microsoft/phi-2",
    "nemotron-3-120b-a12b": "@cf/nvidia/nemotron-3-120b-a12b",
    "gemma-sea-lion-v4-27b-it": "@cf/aisingapore/gemma-sea-lion-v4-27b-it",
    "glm-4.7-flash": "@cf/zai-org/glm-4.7-flash",
    "granite-4.0-h-micro": "@cf/ibm-granite/granite-4.0-h-micro",
    "kimi-k2.5": "@cf/moonshotai/kimi-k2.5",
}
# ═══════════════════════════════════════════════════════════
# Β§4 β€” HELPERS
# ═══════════════════════════════════════════════════════════
def _rid(n=8):
    """Return a random id of *n* characters drawn from _CHARS."""
    picks = random.choices(_CHARS, k=n)
    return "".join(picks)
def _rid_lower(n=9):
    """Return a random id of *n* characters drawn from _LOWER."""
    picks = random.choices(_LOWER, k=n)
    return "".join(picks)
def _make_sid():
    """Session id in the playground's 'Cloudflare-AI-Playground-<21 chars>' shape."""
    suffix = _rid(21)
    return "Cloudflare-AI-Playground-" + suffix
def _make_pk():
return str(uuid.uuid4())
def _make_ws_url(sid, pk):
    """Compose the agent WebSocket URL for session *sid* and token *pk*."""
    base = "/".join([_WS_BASE, sid])
    return base + f"?_pk={pk}"
def _asst_id():
    """Assistant-message id: 'assistant_<epoch-ms>_<9 lowercase chars>'."""
    millis = int(time.time() * 1000)
    return f"assistant_{millis}_{_rid_lower(9)}"
def _resolve_model(name: str) -> str:
if not name:
return name
if name.startswith("@cf/") or name.startswith("@hf/"):
return name
return _SHORT_TO_FULL.get(name, name)
# ═══════════════════════════════════════════════════════════
# Β§5 β€” CONVERTER + BUILDER
# ═══════════════════════════════════════════════════════════
class _Conv:
@staticmethod
def to_cf(msgs):
sys_p, out = "", []
for m in msgs:
r, c = m.get("role", ""), m.get("content", "")
if r == "system":
sys_p = c
elif r == "user":
out.append({
"role": "user",
"parts": [{"type": "text", "text": c}],
"id": _rid(16),
})
elif r == "assistant":
out.append({
"id": _asst_id(),
"role": "assistant",
"parts": [
{"type": "step-start"},
{"type": "text", "text": c, "state": "done"},
],
})
return sys_p, out
@staticmethod
def to_openai(cf_msgs, system=""):
out = []
if system:
out.append({"role": "system", "content": system})
for m in cf_msgs:
r = m.get("role", "")
t = next(
(p.get("text", "") for p in m.get("parts", [])
if p.get("type") == "text"),
"",
)
if r in ("user", "assistant") and t:
out.append({"role": r, "content": t})
return out
class _Build:
    """Factories for the playground's wire-format message objects."""

    @staticmethod
    def user(text):
        """Build a user message carrying one text part and a fresh id."""
        return {
            "role": "user",
            "parts": [{"type": "text", "text": text}],
            "id": _rid(16),
        }

    @staticmethod
    def asst(text, reason=""):
        """Build an assistant message; *reason* adds a finished reasoning part."""
        parts = [{"type": "step-start"}]
        if reason:
            parts.append({"type": "reasoning", "text": reason, "state": "done"})
        parts.append({"type": "text", "text": text, "state": "done"})
        return {"id": _asst_id(), "role": "assistant", "parts": parts}

    @staticmethod
    def req(msgs):
        """Wrap *msgs* in a cf_agent_use_chat_request envelope."""
        payload = json.dumps(
            {"messages": msgs, "trigger": "submit-message"},
            ensure_ascii=False,
        )
        return {
            "id": _rid(8),
            "init": {"method": "POST", "body": payload},
            "type": "cf_agent_use_chat_request",
        }
# ═══════════════════════════════════════════════════════════
# Β§6 β€” MODEL CACHE (with TTL)
# ═══════════════════════════════════════════════════════════
class _Cache:
    """Tiny JSON file cache for the model list, with a freshness TTL."""

    @staticmethod
    def save(models):
        """Write *models* plus machine/human timestamps to the cache file."""
        _CACHE.mkdir(parents=True, exist_ok=True)
        payload = {
            "ts": time.time(),
            "ts_human": time.strftime("%Y-%m-%d %H:%M:%S"),
            "models": models,
        }
        _MFILE.write_text(json.dumps(payload, indent=2, ensure_ascii=False))

    @staticmethod
    def load(ttl: int = _CACHE_TTL):
        """Return cached models, or None when the file is missing/stale/corrupt."""
        if not _MFILE.exists():
            return None
        try:
            data = json.loads(_MFILE.read_text())
            age = time.time() - data.get("ts", 0)
            if age > ttl:
                return None  # past the TTL — treat as stale
            return data.get("models")
        except Exception:
            return None

    @staticmethod
    def clear():
        """Delete the cache file if present."""
        if _MFILE.exists():
            _MFILE.unlink()
# ═══════════════════════════════════════════════════════════
# Β§6b β€” VIRTUAL DISPLAY MANAGER
# Only used when VR_DISPLAY=1 AND XVFB_EXTERNAL=0.
# When XVFB_EXTERNAL=1, the entrypoint.sh manages Xvfb.
# ═══════════════════════════════════════════════════════════
class _VirtualDisplayManager:
    """Process-wide singleton wrapping a pyvirtualdisplay-managed Xvfb.

    Only active when VR_DISPLAY=1, pyvirtualdisplay imported cleanly, and
    XVFB_EXTERNAL=0.  When XVFB_EXTERNAL=1 an externally managed Xvfb owns
    DISPLAY and start()/stop() become no-ops.
    """
    _instance = None            # singleton slot
    _lock = threading.Lock()    # guards singleton creation and start/stop
    def __init__(self):
        self._display = None    # live pyvirtualdisplay.Display once started
        self._running = False
        self._enabled = VR_DISPLAY and _HAS_VIRTUAL_DISPLAY and not XVFB_EXTERNAL
    @classmethod
    def instance(cls) -> "_VirtualDisplayManager":
        """Return the shared manager (double-checked locking)."""
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance
    @property
    def enabled(self) -> bool:
        # Whether this process may spawn its own Xvfb.
        return self._enabled
    @property
    def running(self) -> bool:
        # If external Xvfb is managing the display, report as "running"
        return self._running or XVFB_EXTERNAL
    def start(self, width=1920, height=1080, depth=24):
        """Start Xvfb via pyvirtualdisplay (idempotent; no-op when disabled).

        On success, exports DISPLAY for child processes (Chrome) unless it
        was already set.  On failure, disables itself for the process life.
        """
        if XVFB_EXTERNAL:
            _log_vd("External Xvfb detected (XVFB_EXTERNAL=1) — skipping pyvirtualdisplay")
            return
        if not self._enabled:
            return
        if self._running:
            return
        with self._lock:
            if self._running:
                return
            try:
                self._display = _Display(
                    visible=False,
                    size=(width, height),
                    color_depth=depth,
                    backend="xvfb",
                )
                self._display.start()
                self._running = True
                # Make sure DISPLAY env is set for child processes
                if not os.environ.get("DISPLAY"):
                    os.environ["DISPLAY"] = f":{self._display.display}"
                _log_vd(f"✓ Started pyvirtualdisplay on DISPLAY={os.environ.get('DISPLAY')}")
            except Exception as exc:
                _log_vd(f"✗ pyvirtualdisplay failed: {exc}")
                self._enabled = False
    def stop(self):
        """Stop a display we started; never touches an external Xvfb."""
        if XVFB_EXTERNAL or not self._running:
            return
        with self._lock:
            if not self._running:
                return
            try:
                if self._display:
                    self._display.stop()
                    _log_vd("✓ Virtual display stopped")
            except Exception as exc:
                _log_vd(f"⚠ Error stopping virtual display: {exc}")
            finally:
                # Always clear state so a later start() can retry cleanly.
                self._display = None
                self._running = False
    def __repr__(self):
        if XVFB_EXTERNAL:
            return f"VirtualDisplay(external DISPLAY={_DISPLAY_ENV})"
        state = "running" if self._running else ("idle" if self._enabled else "disabled")
        return f"VirtualDisplay({state})"
def _log_vd(msg: str):
print(f"[cloudflare:vdisplay] {msg}", file=sys.stderr, flush=True)
# ═══════════════════════════════════════════════════════════
# Β§7 β€” TRANSPORT LAYER
# ═══════════════════════════════════════════════════════════
# ── 7a: Direct Python WebSocket ──────────────────────────
class _DirectTransport:
def __init__(self, debug=False):
self._ws = None
self._inbox = []
self._lock = threading.Lock()
self._running = False
self._thread = None
self._debug = debug
def connect(self, url: str, cookies: str = "") -> bool:
self._ws = _ws_mod.WebSocket()
headers = [f"User-Agent: {_UA}"]
if cookies:
headers.append(f"Cookie: {cookies}")
self._ws.connect(
url,
origin = _SITE,
header = headers,
timeout = 15,
)
self._running = True
self._thread = threading.Thread(target=self._loop, daemon=True)
self._thread.start()
return True
def _loop(self):
self._ws.settimeout(0.05)
while self._running:
try:
data = self._ws.recv()
if data:
with self._lock:
self._inbox.append(data)
except _ws_mod.WebSocketTimeoutException:
continue
except _ws_mod.WebSocketConnectionClosedException:
self._running = False
break
except Exception:
if self._running:
self._running = False
break
def send(self, data: str) -> bool:
try:
if self._ws and self._ws.connected:
self._ws.send(data)
return True
except Exception:
pass
return False
def recv(self) -> list[str]:
with self._lock:
msgs = self._inbox[:]
self._inbox.clear()
return msgs
@property
def alive(self) -> bool:
return self._running and self._ws is not None and self._ws.connected
def close(self):
self._running = False
if self._ws:
try:
self._ws.close()
except Exception:
pass
self._ws = None
if self._thread and self._thread.is_alive():
self._thread.join(timeout=2)
self._thread = None
# ── 7b: Browser WebSocket (Xvfb display required) ────────
# In-page JavaScript shim injected into the playground tab.  It creates
# window.__cfws, a minimal WebSocket wrapper that Python polls via run_js:
#   connect(u) — dial the agent WS from the page's own origin
#   send(d)    — push one frame (readyState 1 required)
#   drain()    — return-and-clear buffered frames as a JSON array string
#   kill()     — close the socket
# Executed verbatim; do not edit the string without testing in Chrome.
_BROWSER_JS = """
(function(){
if(window.__cfws) return 'exists';
window.__cfws = {
sock:null, alive:false, inbox:[], error:null,
connect:function(u){
var s=this; s.error=null; s.alive=false; s.inbox=[];
s.sock=new WebSocket(u);
s.sock.onopen=function(){s.alive=true;s.error=null};
s.sock.onmessage=function(e){s.inbox.push(e.data)};
s.sock.onerror=function(){s.error='ws_error'};
s.sock.onclose=function(e){
s.alive=false;
if(e.code!==1000&&e.code!==1005)s.error='closed_'+e.code
};
},
send:function(d){
if(this.sock&&this.sock.readyState===1){
this.sock.send(typeof d==='string'?d:JSON.stringify(d));
return true}return false},
drain:function(){
if(!this.inbox.length)return null;
var r=JSON.stringify(this.inbox);this.inbox=[];return r},
kill:function(){if(this.sock)this.sock.close();this.alive=false}
};
return 'ok';
})();
"""
class _BrowserTransport:
    """
    Chrome WebSocket relay.
    Display requirements (in priority order):
      1. XVFB_EXTERNAL=1 — entrypoint.sh started Xvfb, DISPLAY=:99 is set
      2. VR_DISPLAY=1 — pyvirtualdisplay spawns its own Xvfb
      3. DISPLAY env var — any real display (local dev)
    Chrome --headless is NEVER used. Cloudflare blocks headless agents.
    """
    def __init__(self, debug=False):
        self._page = None   # ChromiumPage handle once connect() succeeds
        self._debug = debug
        self._vd_mgr = _VirtualDisplayManager.instance()
    def _ensure_display(self):
        """Make sure a display is available before launching Chrome."""
        display = os.environ.get("DISPLAY", "")
        if display:
            # Display already set (external Xvfb or local X11)
            _log_vd(f"Using existing display DISPLAY={display}")
            return
        # Try to start pyvirtualdisplay
        if self._vd_mgr.enabled:
            self._vd_mgr.start()
            display = os.environ.get("DISPLAY", "")
            if display:
                return
        raise RuntimeError(
            "No X display available for Chrome.\n"
            " On servers: set XVFB_EXTERNAL=1 and start Xvfb before this process,\n"
            " or set VR_DISPLAY=1 to let pyvirtualdisplay manage Xvfb.\n"
            " Headless Chrome is intentionally disabled (Cloudflare blocks it)."
        )
    def connect(self, url: str, **_) -> bool:
        """Launch headed Chrome, load the playground, dial *url* via __cfws.

        Raises RuntimeError when DrissionPage/Chrome is unavailable and
        ConnectionError when the in-page socket fails or times out (20 s).
        """
        if not _HAS_BROWSER:
            raise RuntimeError(
                "DrissionPage not installed — cannot use browser fallback.\n"
                "Install: pip install DrissionPage"
            )
        self._ensure_display()
        # Anti-automation-detection + container-friendly Chrome flags.
        opts = ChromiumOptions()
        opts.set_argument("--disable-blink-features=AutomationControlled")
        opts.set_argument("--no-sandbox")
        opts.set_argument("--disable-dev-shm-usage")
        opts.set_argument("--disable-gpu")
        opts.set_argument("--disable-extensions")
        opts.set_argument("--disable-infobars")
        opts.set_argument("--window-size=1280,720")
        opts.set_argument("--disable-background-networking")
        opts.set_argument("--disable-sync")
        opts.set_argument("--metrics-recording-only")
        opts.set_argument("--mute-audio")
        # ── NO --headless flag — Cloudflare blocks headless ──
        _log_vd(f"Launching Chrome on DISPLAY={os.environ.get('DISPLAY', 'unset')}")
        try:
            self._page = ChromiumPage(addr_or_opts=opts)
        except Exception as e:
            raise RuntimeError(f"Chrome failed to launch: {e}") from e
        self._page.get(_SITE)
        time.sleep(4)  # let the page (and any Cloudflare checks) settle
        self._page.run_js(_BROWSER_JS)
        self._page.run_js(f"window.__cfws.connect('{url}');")
        # Poll until the in-page socket reports open, errored, or times out.
        deadline = time.time() + 20
        while time.time() < deadline:
            if self._page.run_js("return window.__cfws.alive;"):
                _log_vd("✓ Browser WebSocket connected")
                return True
            err = self._page.run_js("return window.__cfws.error;")
            if err:
                raise ConnectionError(f"Browser WS failed: {err}")
            time.sleep(0.15)
        raise ConnectionError("Browser WS timed out waiting for open state")
    def send(self, data: str) -> bool:
        """Relay one frame through the in-page socket; False on any failure."""
        try:
            return bool(
                self._page.run_js(f"return window.__cfws.send({json.dumps(data)});")
            )
        except Exception:
            return False
    def recv(self) -> list[str]:
        """Drain buffered frames from the page; [] when none or on error."""
        try:
            raw = self._page.run_js("return window.__cfws.drain();")
        except Exception:
            return []
        if not raw:
            return []
        try:
            batch = json.loads(raw)
            return batch if isinstance(batch, list) else []
        except (json.JSONDecodeError, TypeError):
            return []
    @property
    def alive(self) -> bool:
        # False whenever the page is gone or the JS bridge errors out.
        try:
            return bool(self._page.run_js("return window.__cfws.alive;"))
        except Exception:
            return False
    def close(self):
        """Close the in-page socket and quit Chrome (both best-effort)."""
        if self._page:
            try:
                self._page.run_js("if(window.__cfws) window.__cfws.kill();")
            except Exception:
                pass
            try:
                self._page.quit()
            except Exception:
                pass
            self._page = None
# ═══════════════════════════════════════════════════════════
# Β§8 β€” PROVIDER
# ═══════════════════════════════════════════════════════════
class CloudflareProvider:
    """
    ☁️ Cloudflare AI Playground — fully modular provider.

    Boots a playground session over the agent WebSocket (direct Python
    socket first, Chrome relay as fallback), performs the greeting
    handshake, fetches the model list via RPC, and streams completions.

    Display modes (auto-detected):
      • XVFB_EXTERNAL=1 — entrypoint.sh manages Xvfb on DISPLAY=:99
      • VR_DISPLAY=1 — pyvirtualdisplay spawns Xvfb internally
      • DISPLAY set — use existing display (local dev)
    """
    def __init__(
        self,
        model: str = "@cf/moonshotai/kimi-k2.5",
        system: str = "You are a helpful assistant.",
        temperature: float = 1.0,
        max_tokens: Optional[int] = None,
        timeout_init: int = 120,
        timeout_idle: int = 30,
        use_cache: bool = True,
        debug: bool = False,
    ):
        # Accept short aliases ("kimi-k2.5") as well as full "@cf/..." paths.
        self.model = _resolve_model(model)
        self.system = system
        self.temperature = temperature
        self.max_tokens = max_tokens        # None → filled from model context window in _boot()
        self.timeout_init = timeout_init    # max seconds to wait for the FIRST token
        self.timeout_idle = timeout_idle    # max seconds of silence mid-stream
        self.use_cache = use_cache
        self.debug = debug
        self.history: list[dict] = []       # Cloudflare-format conversation history
        self.models: list[dict] = []        # raw records from the getModels RPC
        self._chat_models: list[dict] = []  # subset whose task is "Text Generation"
        self.last_response: str = ""
        self.last_reasoning: str = ""
        self._sid: str = ""                 # playground session id
        self._pk: str = ""                  # per-session _pk token
        self._transport = None              # _DirectTransport or _BrowserTransport
        self._mode: str = ""                # "direct" | "browser"
        self._on: bool = False              # True once connected
        self._boot()
        atexit.register(self.close)
    def _d(self, *a):
        # Debug logging to stderr (only when debug=True).
        if self.debug:
            print("[cloudflare]", *a, file=sys.stderr, flush=True)
    def _pull(self) -> list[str]:
        # Drain every pending frame from the transport (logs when debugging).
        msgs = self._transport.recv()
        if self.debug:
            for m in msgs:
                self._d("←", str(m)[:160])
        return msgs
    def _push(self, obj):
        # Serialize and send one frame; raise when the transport refuses it.
        raw = json.dumps(obj, ensure_ascii=False)
        self._d("→", raw[:300])
        if not self._transport.send(raw):
            raise RuntimeError("WebSocket send failed")
    # ─────────────────────────────────────────────────
    # Boot
    # ─────────────────────────────────────────────────
    def _boot(self):
        """Open a session: connect (direct first, then browser relay),
        run the greeting handshake, then load models and sync settings."""
        self._sid = _make_sid()
        self._pk = _make_pk()
        url = _make_ws_url(self._sid, self._pk)
        # ── Attempt 1: direct Python WS ─────────────
        direct_err = None
        try:
            self._d("Trying direct Python WebSocket...")
            t = _DirectTransport(debug=self.debug)
            t.connect(url)
            time.sleep(0.4)  # give the server a moment to reject/close
            if t.alive:
                self._transport = t
                self._mode = "direct"
                self._d("✓ Direct WebSocket connected")
            else:
                t.close()
                raise ConnectionError("WS not alive after connect")
        except Exception as e:
            direct_err = e
            self._d(f"Direct failed: {e}")
        # ── Attempt 2: Browser relay via Xvfb ───────
        if self._transport is None:
            browser_err = None
            try:
                self._d("Trying browser transport (Xvfb Chrome)...")
                t = _BrowserTransport(debug=self.debug)
                t.connect(url)
                self._transport = t
                self._mode = "browser"
                self._d(f"✓ Browser transport connected (DISPLAY={os.environ.get('DISPLAY', '?')})")
            except Exception as e:
                browser_err = e
                self._d(f"Browser failed: {e}")
            if self._transport is None:
                raise ConnectionError(
                    f"All connection methods failed.\n"
                    f" Direct: {direct_err}\n"
                    f" Browser: {browser_err}\n"
                    f" → Check network connectivity and DISPLAY / XVFB_EXTERNAL env vars."
                )
        self._on = True
        # ── Handshake ──────────────────────────────
        # Wait (≤12 s) for the greeting frames the agent sends on open.
        want = {"cf_agent_identity", "cf_agent_state", "cf_agent_mcp_servers"}
        seen = set()
        deadline = time.time() + 12
        while time.time() < deadline and seen < want:
            for raw in self._pull():
                try:
                    seen.add(json.loads(raw).get("type", ""))
                except Exception:
                    pass
            time.sleep(0.05)
        self._d(f"Handshake received: {seen}")
        self._push({"type": "cf_agent_stream_resume_request"})
        time.sleep(0.3)
        self._pull()  # discard any resume response
        # ── Models ─────────────────────────────────
        self._load_models()
        if self.max_tokens is None:
            self.max_tokens = self._ctx_window(self.model)
        self._sync()
    # ─────────────────────────────────────────────────
    # Models
    # ─────────────────────────────────────────────────
    def _load_models(self):
        """Populate self.models from the cache when fresh, else via RPC."""
        if self.use_cache:
            cached = _Cache.load()
            if cached:
                self.models = cached
                self._chat_models = [
                    m for m in self.models
                    if m.get("task", {}).get("name") == "Text Generation"
                ]
                self._d(f"Loaded {len(self._chat_models)} chat models from cache")
                return
        self._fetch_models()
        if self.models and self.use_cache:
            _Cache.save(self.models)
    def _fetch_models(self):
        """Issue the 'getModels' RPC and wait ≤15 s for its reply."""
        rid = str(uuid.uuid4())
        self._push({"args": [], "id": rid, "method": "getModels", "type": "rpc"})
        deadline = time.time() + 15
        while time.time() < deadline:
            for raw in self._pull():
                try:
                    d = json.loads(raw)
                except Exception:
                    continue
                # Match our request id; only accept a successful final frame.
                if (d.get("type") == "rpc" and d.get("id") == rid
                        and d.get("done") and d.get("success")):
                    self.models = d.get("result", [])
                    self._chat_models = [
                        m for m in self.models
                        if m.get("task", {}).get("name") == "Text Generation"
                    ]
                    self._d(f"Fetched {len(self._chat_models)} chat models")
                    return
            time.sleep(0.05)
        self._d("Warning: model fetch timed out")
    def _ctx_window(self, model_name: str) -> int:
        """Context-window size for *model_name*; 4096 when unknown."""
        for m in self._chat_models:
            if m.get("name") == model_name:
                for p in m.get("properties", []):
                    if p.get("property_id") == "context_window":
                        try:
                            return int(p["value"])
                        except (ValueError, KeyError):
                            pass
        return 4096
    def _resolve(self, name: str) -> str:
        """Resolve *name* against the live model list, then the static table."""
        if not name:
            return name
        if name.startswith("@cf/") or name.startswith("@hf/"):
            return name
        for m in self._chat_models:
            full = m.get("name", "")
            short = full.rsplit("/", 1)[-1]
            if short == name or full == name:
                return full
        return _SHORT_TO_FULL.get(name, name)
    def _sync(self):
        """Push the current model/temperature/system settings to the agent."""
        self._push({
            "type": "cf_agent_state",
            "state": {
                "model": self.model,
                "temperature": self.temperature,
                "stream": True,
                "system": self.system,
                "useExternalProvider": False,
                "externalProvider": "openai",
                "authMethod": "provider-key",
            },
        })
        time.sleep(0.15)
        self._pull()  # discard the state acknowledgement
    # ─────────────────────────────────────────────────
    # Setters
    # ─────────────────────────────────────────────────
    def set_model(self, name: str) -> "CloudflareProvider":
        """Switch model (alias or full path); returns self for chaining."""
        full = self._resolve(name)
        self.model = full
        self.max_tokens = self._ctx_window(full)
        if self._on:
            self._sync()
        return self
    def set_system(self, prompt: str) -> "CloudflareProvider":
        """Replace the system prompt; returns self for chaining."""
        self.system = prompt
        if self._on:
            self._sync()
        return self
    def set_temperature(self, t: float) -> "CloudflareProvider":
        """Set temperature (clamped to [0, 2]); returns self for chaining."""
        self.temperature = max(0.0, min(2.0, t))
        if self._on:
            self._sync()
        return self
    def set_max_tokens(self, n: int) -> "CloudflareProvider":
        """Override max_tokens locally (not pushed to the server)."""
        self.max_tokens = n
        return self
    def clear_history(self):
        """Drop the local conversation history."""
        self.history.clear()
    def get_history(self) -> list[dict]:
        """Return the history in OpenAI chat format (system prompt included)."""
        return _Conv.to_openai(self.history, self.system)
    def list_models(self) -> list[dict]:
        """Summarize available chat models (name/short/context/active)."""
        return [{
            "name": m.get("name", ""),
            "short": m.get("name", "").rsplit("/", 1)[-1],
            "context": self._ctx_window(m.get("name", "")),
            "active": m.get("name") == self.model,
        } for m in self._chat_models]
    def refresh_models(self):
        """Drop the cache and re-fetch the model list over RPC."""
        _Cache.clear()
        self._fetch_models()
        if self.models and self.use_cache:
            _Cache.save(self.models)
    # ═══════════════════════════════════════════════════
    # ★ CHAT (streaming generator)
    # ═══════════════════════════════════════════════════
    def chat(
        self,
        data: Optional[str] = None,
        messages: Optional[list[dict]] = None,
        model: Optional[str] = None,
        temperature: Optional[float] = None,
        system: Optional[str] = None,
        max_tokens: Optional[int] = None,
    ) -> Generator[str, None, None]:
        """Stream a completion, yielding text deltas as they arrive.

        Either *data* (one user turn appended to history) or *messages*
        (full OpenAI-style conversation replacing history) is required.
        Reasoning tokens are wrapped in <think>...</think>.  Per-call
        overrides (model/temperature/system) are synced to the server
        before sending.  Timeouts and errors yield bracketed notices
        instead of raising.
        """
        if not self._on:
            raise RuntimeError("Not connected")
        if not messages and not data:
            raise ValueError("Provide 'messages' or 'data'")
        # Apply per-call overrides; only re-sync when something changed.
        changed = False
        if model:
            full = self._resolve(model)
            if full != self.model:
                self.model = full
                self.max_tokens = self._ctx_window(full)
                changed = True
        if temperature is not None and temperature != self.temperature:
            self.temperature = max(0.0, min(2.0, temperature))
            changed = True
        if system and system != self.system:
            self.system = system
            changed = True
        if max_tokens is not None:
            self.max_tokens = max_tokens
        if messages:
            # Full conversation supplied — it replaces the local history.
            sys_p, cf_msgs = _Conv.to_cf(messages)
            if sys_p and sys_p != self.system:
                self.system = sys_p
                changed = True
            self.history = cf_msgs
        else:
            self.history.append(_Build.user(data))
        if changed:
            self._sync()
        self._push(_Build.req(self.history))
        # ── Stream loop state ──
        full_text = ""        # accumulated answer text
        reasoning = ""        # accumulated reasoning text
        error = None
        got_first = False     # switches timeout_init → timeout_idle
        done = False
        last_data = time.time()
        reasoning_open = False  # inside an unclosed <think> block
        while not done:
            if not self._transport.alive:
                if not full_text:
                    yield "[Connection lost]\n"
                break
            msgs = self._pull()
            if not msgs:
                elapsed = time.time() - last_data
                limit = self.timeout_idle if got_first else self.timeout_init
                if elapsed > limit:
                    if not full_text:
                        yield "[Timeout — no response]\n"
                    break
                # Poll faster once the stream has started.
                time.sleep(0.015 if got_first else 0.04)
                continue
            last_data = time.time()
            for raw in msgs:
                try:
                    f = json.loads(raw)
                except Exception:
                    continue
                if f.get("type") != "cf_agent_use_chat_response":
                    continue
                body_str = f.get("body", "")
                if body_str:
                    try:
                        b = json.loads(body_str)
                    except Exception:
                        continue
                    bt = b.get("type", "")
                    if bt == "reasoning-start":
                        reasoning_open = True
                        got_first = True
                        yield "<think>\n"
                    elif bt == "reasoning-delta":
                        delta = b.get("delta", "")
                        if delta:
                            reasoning += delta
                            got_first = True
                            yield delta
                    elif bt == "reasoning-end":
                        if reasoning_open:
                            reasoning_open = False
                            yield "\n</think>\n\n"
                    elif bt == "text-delta":
                        delta = b.get("delta", "")
                        # Answer text arriving implicitly ends reasoning.
                        if reasoning_open:
                            reasoning_open = False
                            yield "\n</think>\n\n"
                        if delta:
                            full_text += delta
                            got_first = True
                            yield delta
                    elif bt == "error":
                        error = b.get("message", str(b))
                # The final frame carries done=True (body may be absent).
                if f.get("done", False):
                    done = True
                    break
        # Close a dangling <think> block if the stream ended mid-reasoning.
        if reasoning_open:
            yield "\n</think>\n\n"
        if error:
            self._d("Server error:", error)
            if not full_text:
                yield f"\n[Error: {error}]\n"
        if full_text:
            self.history.append(_Build.asst(full_text, reasoning))
        self.last_response = full_text
        self.last_reasoning = reasoning
    def ask(self, prompt: str, **kwargs) -> str:
        """Blocking convenience wrapper: join the chat() stream into one string."""
        return "".join(self.chat(data=prompt, **kwargs))
    def new_session(self):
        """Tear down the connection and history, then reconnect fresh."""
        self._close_transport()
        self.history.clear()
        self._boot()
    def _close_transport(self):
        # Best-effort transport shutdown; marks the provider offline.
        if self._transport:
            try:
                self._transport.close()
            except Exception:
                pass
            self._transport = None
        self._on = False
    def close(self):
        """Close the transport and, for browser mode, our own Xvfb."""
        self._close_transport()
        if self._mode == "browser" and not XVFB_EXTERNAL:
            _VirtualDisplayManager.instance().stop()
        self._d("Closed.")
    def __enter__(self):
        return self
    def __exit__(self, *_):
        self.close()
    def __del__(self):
        # Guard: interpreter teardown may have already torn down globals.
        try:
            self.close()
        except Exception:
            pass
    def __repr__(self):
        s = "✅" if self._on else "❌"
        vd = _VirtualDisplayManager.instance()
        return (
            f"CloudflareProvider({s} mode={self._mode!r} "
            f"model={self.model!r} {vd})"
        )
# ═══════════════════════════════════════════════════════════
# Β§9 β€” PROCESS-EXIT CLEANUP
# ═══════════════════════════════════════════════════════════
def _cleanup_virtual_display():
try:
if not XVFB_EXTERNAL:
_VirtualDisplayManager.instance().stop()
except Exception:
pass
atexit.register(_cleanup_virtual_display)