"""Reverse pipeline planner — build_plan() from model spec and request metadata. Determines the endpoint URL, transport kind, pool/mode IDs, and timeout for a given operation. Does NOT execute anything — pure data transform. """ from typing import Any from app.control.model.spec import ModelSpec from app.dataplane.reverse.runtime.endpoint_table import ( CHAT, MEDIA_POST, WS_IMAGINE, ) from .types import ReversePlan, TransportKind # --------------------------------------------------------------------------- # Profile defaults (timeout / content-type per transport) # --------------------------------------------------------------------------- _DEFAULTS: dict[TransportKind, dict[str, Any]] = { TransportKind.HTTP_SSE: {"timeout_s": 120.0, "content_type": "application/json"}, TransportKind.HTTP_JSON: {"timeout_s": 30.0, "content_type": "application/json"}, TransportKind.WEBSOCKET: {"timeout_s": 300.0, "content_type": "application/json"}, TransportKind.GRPC_WEB: {"timeout_s": 15.0, "content_type": "application/grpc-web+proto"}, } # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def build_plan(spec: ModelSpec, request: dict[str, Any] | None = None) -> ReversePlan: """Produce a ReversePlan for the given model spec. ``request`` is the raw API request body — used to refine the plan (e.g. detect image-edit vs image-gen) but may be ``None``. """ endpoint, tkind = _resolve_endpoint(spec, request or {}) defaults = _DEFAULTS.get(tkind, _DEFAULTS[TransportKind.HTTP_JSON]) return ReversePlan( endpoint = endpoint, transport_kind = tkind, pool_candidates = spec.pool_candidates(), mode_id = int(spec.mode_id), timeout_s = defaults["timeout_s"], content_type = defaults["content_type"], ) # --------------------------------------------------------------------------- # Internal routing logic # --------------------------------------------------------------------------- def _resolve_endpoint( spec: ModelSpec, request: dict[str, Any], ) -> tuple[str, TransportKind]: """Determine (endpoint_url, transport_kind) for the given capability.""" if spec.is_chat(): return CHAT, TransportKind.HTTP_SSE if spec.is_image(): return WS_IMAGINE, TransportKind.WEBSOCKET if spec.is_image_edit(): return CHAT, TransportKind.HTTP_SSE if spec.is_video(): return MEDIA_POST, TransportKind.HTTP_JSON if spec.is_voice(): # LiveKit negotiation is HTTP JSON, actual voice is WebSocket. return CHAT, TransportKind.HTTP_SSE # Fallback: treat as chat. return CHAT, TransportKind.HTTP_SSE __all__ = ["build_plan"]