opencode-ai
feat: cherry-pick PR #542 - console.x.ai routing + native tool calling
53a0992
"""Reverse pipeline planner — build_plan() from model spec and request metadata.
Determines the endpoint URL, transport kind, pool/mode IDs, and timeout
for a given operation. Does NOT execute anything — pure data transform.
"""
from typing import Any
from app.control.model.spec import ModelSpec
from app.dataplane.reverse.runtime.endpoint_table import (
CHAT,
CONSOLE_RESPONSES,
MEDIA_POST,
WS_IMAGINE,
)
from .types import ReversePlan, TransportKind
# ---------------------------------------------------------------------------
# Profile defaults (timeout / content-type per transport)
# ---------------------------------------------------------------------------
_DEFAULTS: dict[TransportKind, dict[str, Any]] = {
TransportKind.HTTP_SSE: {
"timeout_s": 120.0,
"content_type": "application/json"
},
TransportKind.HTTP_JSON: {
"timeout_s": 30.0,
"content_type": "application/json"
},
TransportKind.WEBSOCKET: {
"timeout_s": 300.0,
"content_type": "application/json"
},
TransportKind.GRPC_WEB: {
"timeout_s": 15.0,
"content_type": "application/grpc-web+proto"
},
}
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_plan(spec: ModelSpec, request: dict[str, Any] | None = None) -> ReversePlan:
"""Produce a ReversePlan for the given model spec.
``request`` is the raw API request body — used to refine the plan
(e.g. detect image-edit vs image-gen) but may be ``None``.
"""
endpoint, tkind = _resolve_endpoint(spec, request or {})
defaults = _DEFAULTS.get(tkind, _DEFAULTS[TransportKind.HTTP_JSON])
return ReversePlan(
endpoint=endpoint,
transport_kind=tkind,
pool_candidates=spec.pool_candidates(),
mode_id=int(spec.mode_id),
timeout_s=defaults["timeout_s"],
content_type=defaults["content_type"],
)
# ---------------------------------------------------------------------------
# Internal routing logic
# ---------------------------------------------------------------------------
def _resolve_endpoint(
spec: ModelSpec,
request: dict[str, Any],
) -> tuple[str, TransportKind]:
"""Determine (endpoint_url, transport_kind) for the given capability."""
# Console models route through console.x.ai/v1/responses (OpenAI Responses API)
if spec.is_console() and spec.is_chat():
# When stream=true the response is SSE; otherwise plain JSON.
# Use HTTP_SSE as the default since both streaming and non-streaming
# share the same content-type and the timeout profile is more
# permissive (long-running responses).
return CONSOLE_RESPONSES, TransportKind.HTTP_SSE
if spec.is_chat():
return CHAT, TransportKind.HTTP_SSE
if spec.is_image():
return WS_IMAGINE, TransportKind.WEBSOCKET
if spec.is_image_edit():
return CHAT, TransportKind.HTTP_SSE
if spec.is_video():
return MEDIA_POST, TransportKind.HTTP_JSON
if spec.is_voice():
# LiveKit negotiation is HTTP JSON, actual voice is WebSocket.
return CHAT, TransportKind.HTTP_SSE
# Fallback: treat as chat.
return CHAT, TransportKind.HTTP_SSE
__all__ = ["build_plan"]