grok2api / app / dataplane / reverse / planner.py
FUCAT's picture
Deploy grok2api to HF Spaces (Docker)
7e55e53
Raw
History Blame Contribute Delete
2.86 kB
"""Reverse pipeline planner — build_plan() from model spec and request metadata.
Determines the endpoint URL, transport kind, pool/mode IDs, and timeout
for a given operation. Does NOT execute anything — pure data transform.
"""
from typing import Any
from app.control.model.spec import ModelSpec
from app.dataplane.reverse.runtime.endpoint_table import (
CHAT, MEDIA_POST, WS_IMAGINE,
)
from .types import ReversePlan, TransportKind
# ---------------------------------------------------------------------------
# Profile defaults (timeout / content-type per transport)
# ---------------------------------------------------------------------------
# Per-transport profile consumed by build_plan(): the ``timeout_s`` and
# ``content_type`` values copied into the resulting plan.
_DEFAULTS: dict[TransportKind, dict[str, Any]] = {
    kind: {"timeout_s": timeout_s, "content_type": content_type}
    for kind, timeout_s, content_type in (
        (TransportKind.HTTP_SSE, 120.0, "application/json"),
        (TransportKind.HTTP_JSON, 30.0, "application/json"),
        (TransportKind.WEBSOCKET, 300.0, "application/json"),
        (TransportKind.GRPC_WEB, 15.0, "application/grpc-web+proto"),
    )
}
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def build_plan(spec: ModelSpec, request: dict[str, Any] | None = None) -> ReversePlan:
    """Assemble the ReversePlan describing how to reach the upstream for ``spec``.

    Nothing is executed here; the function only gathers values:

    * the endpoint and transport kind come from ``_resolve_endpoint``;
    * the timeout and content type come from the ``_DEFAULTS`` profile of
      that transport kind (the HTTP_JSON profile is used when the kind has
      no entry);
    * pool candidates and the mode id are read from ``spec``.

    Args:
        spec: Model specification the plan is built for.
        request: Raw API request body. May be ``None`` (or empty), in which
            case an empty dict is handed to the endpoint resolver.

    Returns:
        A populated ``ReversePlan``.
    """
    body = request if request else {}
    endpoint, kind = _resolve_endpoint(spec, body)

    # Unknown transport kinds fall back to the HTTP_JSON profile.
    fallback_profile = _DEFAULTS[TransportKind.HTTP_JSON]
    profile = _DEFAULTS.get(kind, fallback_profile)

    pools = spec.pool_candidates()
    mode = int(spec.mode_id)
    timeout = profile["timeout_s"]
    content_type = profile["content_type"]

    return ReversePlan(
        endpoint=endpoint,
        transport_kind=kind,
        pool_candidates=pools,
        mode_id=mode,
        timeout_s=timeout,
        content_type=content_type,
    )
# ---------------------------------------------------------------------------
# Internal routing logic
# ---------------------------------------------------------------------------
def _resolve_endpoint(
    spec: ModelSpec,
    request: dict[str, Any],
) -> tuple[str, TransportKind]:
    """Pick the ``(endpoint, transport_kind)`` pair for ``spec``.

    The spec's capability predicates are consulted in a fixed order —
    chat, image, image-edit, video, voice — and the first one that returns
    a truthy value decides the route. When none matches, the chat route is
    used.

    Args:
        spec: Model specification whose ``is_*()`` predicates drive routing.
        request: Request body; accepted for the caller's convenience but not
            consulted by the current routing rules.

    Returns:
        A tuple of the endpoint constant and the transport kind.
    """
    chat_route = (CHAT, TransportKind.HTTP_SSE)

    if spec.is_chat():
        route = chat_route
    elif spec.is_image():
        route = (WS_IMAGINE, TransportKind.WEBSOCKET)
    elif spec.is_image_edit():
        # Image editing goes through the chat endpoint.
        route = chat_route
    elif spec.is_video():
        route = (MEDIA_POST, TransportKind.HTTP_JSON)
    elif spec.is_voice():
        # NOTE(review): voice currently shares the chat SSE route. An earlier
        # comment here mentioned LiveKit negotiation over HTTP JSON with the
        # actual voice stream over WebSocket — confirm the intended route.
        route = chat_route
    else:
        # No capability matched: treat the request as chat.
        route = chat_route
    return route
# Public surface of this module: only build_plan is exported by
# ``from ... import *``; the underscore-prefixed helpers stay internal.
__all__ = ["build_plan"]