Spaces:
Sleeping
Sleeping
"""Reverse pipeline planner — build_plan() from a model spec and request metadata.

Determines the endpoint URL, transport kind, pool candidates, mode ID, timeout,
and content type for a given operation. Does NOT execute anything — it is a
pure data transform.
"""
| from typing import Any | |
| from app.control.model.spec import ModelSpec | |
| from app.dataplane.reverse.runtime.endpoint_table import ( | |
| CHAT, | |
| CONSOLE_RESPONSES, | |
| MEDIA_POST, | |
| WS_IMAGINE, | |
| ) | |
| from .types import ReversePlan, TransportKind | |
| # --------------------------------------------------------------------------- | |
| # Profile defaults (timeout / content-type per transport) | |
| # --------------------------------------------------------------------------- | |
# Default request profile for each transport kind: ``timeout_s`` and the
# ``content_type`` placed on the plan. build_plan() falls back to the
# HTTP_JSON entry for any transport kind missing from this table.
_DEFAULTS: dict[TransportKind, dict[str, Any]] = {
    # Streaming chat / Responses API: generous timeout for long generations.
    TransportKind.HTTP_SSE: dict(timeout_s=120.0, content_type="application/json"),
    TransportKind.HTTP_JSON: dict(timeout_s=30.0, content_type="application/json"),
    TransportKind.WEBSOCKET: dict(timeout_s=300.0, content_type="application/json"),
    TransportKind.GRPC_WEB: dict(
        timeout_s=15.0,
        content_type="application/grpc-web+proto",
    ),
}
| # --------------------------------------------------------------------------- | |
| # Public API | |
| # --------------------------------------------------------------------------- | |
def build_plan(spec: ModelSpec, request: dict[str, Any] | None = None) -> ReversePlan:
    """Build the ReversePlan describing how to call upstream for ``spec``.

    Args:
        spec: Model specification supplying the capability predicates, the
            pool candidates and the mode id.
        request: Raw API request body, or ``None``. A falsy value is replaced
            by an empty dict before being handed to the endpoint resolver.

    Returns:
        A ReversePlan carrying:
        - the resolved endpoint and transport kind;
        - the spec's pool candidates and its mode id coerced to ``int``;
        - the timeout and content type from that transport's default profile,
          or from the HTTP_JSON profile when the transport has no entry.
    """
    body = request if request else {}
    endpoint, transport = _resolve_endpoint(spec, body)

    fallback_profile = _DEFAULTS[TransportKind.HTTP_JSON]
    profile = _DEFAULTS.get(transport, fallback_profile)

    # Query the spec in the same order the plan fields are laid out.
    candidates = spec.pool_candidates()
    mode = int(spec.mode_id)

    return ReversePlan(
        endpoint=endpoint,
        transport_kind=transport,
        pool_candidates=candidates,
        mode_id=mode,
        timeout_s=profile["timeout_s"],
        content_type=profile["content_type"],
    )
| # --------------------------------------------------------------------------- | |
| # Internal routing logic | |
| # --------------------------------------------------------------------------- | |
def _resolve_endpoint(
    spec: ModelSpec,
    request: dict[str, Any],
) -> tuple[str, TransportKind]:
    """Determine ``(endpoint_url, transport_kind)`` for the spec's capability.

    Routing is decided by the spec's capability predicates alone. ``request``
    is accepted so the caller can pass the request body through, but it is
    not read here.

    Args:
        spec: Model specification whose ``is_*`` predicates select the route.
        request: Raw request body (currently unused).

    Returns:
        The endpoint URL constant and the transport kind. The transport kind
        also selects the timeout/content-type profile.
    """
    # Console chat models route through console.x.ai/v1/responses
    # (OpenAI Responses API).
    if spec.is_console() and spec.is_chat():
        # Streaming responses are SSE and non-streaming ones are plain JSON.
        # HTTP_SSE is used for both because they share a content type and its
        # timeout profile is the more permissive one (long-running responses).
        return CONSOLE_RESPONSES, TransportKind.HTTP_SSE
    if spec.is_chat():
        return CHAT, TransportKind.HTTP_SSE
    # Test the more specific image-edit capability before plain image
    # generation. If a spec reports both, image-edit must still go through
    # CHAT rather than the imagine WebSocket; checking is_image() first would
    # make this branch unreachable for such specs.
    if spec.is_image_edit():
        return CHAT, TransportKind.HTTP_SSE
    if spec.is_image():
        return WS_IMAGINE, TransportKind.WEBSOCKET
    if spec.is_video():
        return MEDIA_POST, TransportKind.HTTP_JSON
    if spec.is_voice():
        # NOTE(review): LiveKit negotiation is described as HTTP JSON and the
        # voice stream itself as WebSocket, yet voice is planned as
        # CHAT / HTTP_SSE here — confirm this is intended.
        return CHAT, TransportKind.HTTP_SSE
    # Fallback: treat as chat.
    return CHAT, TransportKind.HTTP_SSE
# Only the planner entry point is public; routing helpers and the
# profile table stay module-private.
__all__ = [
    "build_plan",
]