youbiaokachi's picture
Upload 46 files
c1ae554 verified
from __future__ import annotations
import json
import time
import uuid
from typing import Any, Dict, Optional
import requests
from .logging import logger
from .config import (
BRIDGE_BASE_URL,
FALLBACK_BRIDGE_URLS,
WARMUP_INIT_RETRIES,
WARMUP_INIT_DELAY_S,
WARMUP_REQUEST_RETRIES,
WARMUP_REQUEST_DELAY_S,
)
from .packets import packet_template
from .state import STATE, ensure_tool_ids
def bridge_send_stream(packet: Dict[str, Any]) -> Dict[str, Any]:
last_exc: Optional[Exception] = None
for base in FALLBACK_BRIDGE_URLS:
url = f"{base}/api/warp/send_stream"
try:
wrapped_packet = {"json_data": packet, "message_type": "warp.multi_agent.v1.Request"}
try:
logger.info("[OpenAI Compat] Bridge request URL: %s", url)
logger.info("[OpenAI Compat] Bridge request payload: %s", json.dumps(wrapped_packet, ensure_ascii=False))
except Exception:
logger.info("[OpenAI Compat] Bridge request payload serialization failed for URL %s", url)
r = requests.post(url, json=wrapped_packet, timeout=(5.0, 180.0))
if r.status_code == 200:
try:
logger.info("[OpenAI Compat] Bridge response (raw text): %s", r.text)
except Exception:
pass
return r.json()
else:
txt = r.text
last_exc = Exception(f"bridge_error: HTTP {r.status_code} {txt}")
except Exception as e:
last_exc = e
continue
if last_exc:
raise last_exc
raise Exception("bridge_unreachable")
def initialize_once() -> None:
if STATE.conversation_id:
return
ensure_tool_ids()
first_task_id = STATE.baseline_task_id or str(uuid.uuid4())
STATE.baseline_task_id = first_task_id
health_urls = [f"{base}/healthz" for base in FALLBACK_BRIDGE_URLS]
last_err: Optional[str] = None
for _ in range(WARMUP_INIT_RETRIES):
try:
ok = False
last_err = None
for h in health_urls:
try:
resp = requests.get(h, timeout=5.0)
if resp.status_code == 200:
ok = True
break
else:
last_err = f"HTTP {resp.status_code} at {h}"
except Exception as he:
last_err = f"{type(he).__name__}: {he} at {h}"
if ok:
break
except Exception as e:
last_err = str(e)
time.sleep(WARMUP_INIT_DELAY_S)
else:
raise RuntimeError(f"Bridge server not ready: {last_err}")
pkt = packet_template()
pkt["task_context"]["active_task_id"] = first_task_id
pkt["input"]["user_inputs"]["inputs"].append({"user_query": {"query": "warmup"}})
last_exc: Optional[Exception] = None
for attempt in range(1, WARMUP_REQUEST_RETRIES + 1):
try:
resp = bridge_send_stream(pkt)
break
except Exception as e:
last_exc = e
logger.warning(f"[OpenAI Compat] Warmup attempt {attempt}/{WARMUP_REQUEST_RETRIES} failed: {e}")
if attempt < WARMUP_REQUEST_RETRIES:
time.sleep(WARMUP_REQUEST_DELAY_S)
else:
raise
STATE.conversation_id = resp.get("conversation_id") or STATE.conversation_id
ret_task_id = resp.get("task_id")
if isinstance(ret_task_id, str) and ret_task_id:
STATE.baseline_task_id = ret_task_id