# opencode-ai
# feat: cherry-pick PR #542 - console.x.ai routing + native tool calling
# 53a0992
"""Console API protocol — payload builder and response parser.
The ``console.x.ai/v1/responses`` endpoint shares SSO cookies with grok.com
but exposes the OpenAI Responses API directly. Free/basic accounts can call
all models (grok-4.3, grok-4.20-*, etc.) through this endpoint, bypassing
the tier restrictions of the grok.com web chat API.
The upstream API supports:
- Plain string input or structured input arrays (for multimodal / chat history)
- Native function calling via ``tools`` field
- Reasoning summary streaming
- SSE streaming with OpenAI Responses API event names
Request format (string input):
{"model": "grok-4.3", "input": "What is 1+1?", "stream": true}
Request format (structured input + tools):
{
"model": "grok-4.3",
"input": [
{"role": "user", "content": [
{"type": "input_text", "text": "What's the weather?"},
{"type": "input_image", "image_url": "https://...", "detail": "auto"}
]}
],
"tools": [
{"type": "function", "name": "get_weather",
"description": "...", "parameters": {...}}
],
"tool_choice": "auto"
}
Response output items (non-streaming):
- {"type": "reasoning", "summary": [{"type": "summary_text", "text": "..."}]}
- {"type": "message", "role": "assistant",
"content": [{"type": "output_text", "text": "...", "annotations": [...]}]}
- {"type": "function_call", "call_id": "...", "name": "...", "arguments": "..."}
"""
from typing import Any
import orjson
from app.platform.errors import UpstreamError
from app.platform.logging.logger import logger
# ---------------------------------------------------------------------------
# Input conversion (OpenAI Chat Completions → console.x.ai input array)
# ---------------------------------------------------------------------------
def build_console_input(messages: list[dict[str, Any]]) -> tuple[list[dict[str, Any]], str]:
    """Translate Chat Completions messages into console ``input`` items.

    Returns a ``(input_array, instructions)`` pair:

    - ``input_array`` is the list sent as the Responses API ``input`` field.
    - ``instructions`` is every ``role=system`` text joined with blank
      lines, meant for the separate Responses API ``instructions`` field.

    Per-role handling:

    - ``system``: string content, or the ``text`` blocks of list content,
      is stripped and folded into ``instructions``; nothing is emitted
      into the input array.
    - ``tool``: becomes ``{type: function_call_output, call_id, output}``.
    - ``assistant`` with ``tool_calls``: one ``function_call`` item per
      call, followed by an assistant ``output_text`` message when the
      message also carries non-blank text.
    - anything else: ``{role, content}`` with blocks converted by
      ``_convert_content_blocks``; messages with no usable blocks are
      dropped.
    """
    system_chunks: list[str] = []
    items: list[dict[str, Any]] = []

    for message in messages:
        role = message.get("role") or "user"
        body = message.get("content")

        # ── system → instructions ────────────────────────────────────────
        if role == "system":
            if isinstance(body, str):
                candidates = [body]
            elif isinstance(body, list):
                candidates = [
                    part.get("text") or ""
                    for part in body
                    if isinstance(part, dict) and part.get("type") == "text"
                ]
            else:
                candidates = []
            system_chunks.extend(c.strip() for c in candidates if c.strip())
            continue

        # ── tool result → function_call_output ───────────────────────────
        if role == "tool":
            result = body if isinstance(body, str) else _flatten_text(body)
            items.append({
                "type": "function_call_output",
                "call_id": message.get("tool_call_id") or "",
                "output": result or "",
            })
            continue

        # ── assistant with tool_calls → function_call items ──────────────
        requested_calls = message.get("tool_calls")
        if role == "assistant" and requested_calls:
            for call in requested_calls:
                if not isinstance(call, dict):
                    continue
                spec = call.get("function") or {}
                fn_name = spec.get("name")
                items.append({
                    "type": "function_call",
                    # Fall back to the function name when the call has no id.
                    "call_id": call.get("id") or fn_name or "",
                    "name": fn_name or "",
                    "arguments": spec.get("arguments") or "{}",
                })
            # Text accompanying the tool calls is emitted after them.
            trailing = body if isinstance(body, str) else _flatten_text(body)
            if trailing and trailing.strip():
                items.append({
                    "role": "assistant",
                    "content": [{"type": "output_text", "text": trailing.strip()}],
                })
            continue

        # ── normal user / assistant message ──────────────────────────────
        converted = _convert_content_blocks(body, role)
        if converted:
            items.append({"role": role, "content": converted})

    return items, "\n\n".join(system_chunks).strip()
def _flatten_text(content: Any) -> str:
"""Flatten an OpenAI content array into a single text string."""
if isinstance(content, str):
return content
if not isinstance(content, list):
return ""
parts: list[str] = []
for block in content:
if not isinstance(block, dict):
continue
if block.get("type") == "text":
text = block.get("text") or ""
if text:
parts.append(text)
return "\n".join(parts)
def _convert_content_blocks(
content: Any,
role: str,
) -> list[dict[str, Any]]:
"""Convert one OpenAI message content (str or array) β†’ console blocks."""
text_type = "output_text" if role == "assistant" else "input_text"
# Plain string content
if isinstance(content, str):
text = content.strip()
if not text:
return []
return [{"type": text_type, "text": text}]
# Already-structured array
if not isinstance(content, list):
return []
blocks: list[dict[str, Any]] = []
for block in content:
if not isinstance(block, dict):
continue
btype = block.get("type")
if btype == "text":
text = block.get("text") or ""
if text.strip():
blocks.append({"type": text_type, "text": text})
elif btype == "image_url":
inner = block.get("image_url") or {}
if isinstance(inner, str):
url, detail = inner, "auto"
else:
url = inner.get("url") or ""
detail = inner.get("detail") or "auto"
if url:
blocks.append({
"type": "input_image",
"image_url": url,
"detail": detail,
})
elif btype in ("input_text", "output_text", "input_image"):
# Already in console format β€” pass through
blocks.append(dict(block))
return blocks
# ---------------------------------------------------------------------------
# Tool format conversion
# ---------------------------------------------------------------------------
def convert_openai_tools_to_console(tools: list[dict[str, Any]] | None, ) -> list[dict[str, Any]]:
"""Convert OpenAI Chat Completions tools β†’ console (Responses API) tools.
OpenAI Chat Completions:
{"type": "function", "function": {"name", "description", "parameters"}}
Console (Responses API):
{"type": "function", "name", "description", "parameters"}
Already-flat tools are passed through (e.g. ``web_search`` server-side
tool, ``code_interpreter``, ``x_search`` etc.).
"""
if not tools:
return []
out: list[dict[str, Any]] = []
for t in tools:
if not isinstance(t, dict):
continue
if t.get("type") != "function":
# Pass through server-side tools (web_search, x_search, etc.)
out.append(dict(t))
continue
fn = t.get("function") if isinstance(t.get("function"), dict) else None
if fn is not None:
out.append(
{
"type": "function",
"name": fn.get("name") or "",
"description": fn.get("description") or "",
"parameters": fn.get("parameters") or {},
})
else:
# Already flat
out.append(dict(t))
return out
def convert_openai_tool_choice(tool_choice: Any) -> Any:
    """Translate a Chat Completions ``tool_choice`` to the console shape.

    OpenAI:  "none" | "auto" | "required" | {"type":"function","function":{"name":"x"}}
    Console: "none" | "auto" | "required" | {"type":"function","name":"x"}

    Only dicts typed ``function`` are rewritten: one with a non-empty
    nested ``function`` dict is flattened to ``{type, name}``, any other
    is returned as a shallow copy. Every other value (strings, ``None``,
    other dicts) is returned as-is.
    """
    if not isinstance(tool_choice, dict) or tool_choice.get("type") != "function":
        return tool_choice
    target = tool_choice.get("function")
    if isinstance(target, dict) and target:
        return {"type": "function", "name": target.get("name") or ""}
    return dict(tool_choice)
# ---------------------------------------------------------------------------
# Payload builder
# ---------------------------------------------------------------------------
def build_console_payload(
    *,
    console_model: str,
    input: Any,
    instructions: str = "",
    stream: bool = False,
    temperature: float | None = None,
    top_p: float | None = None,
    reasoning_effort: str | None = None,
    tools: list[dict[str, Any]] | None = None,
    tool_choice: Any = None,
) -> dict[str, Any]:
    """Assemble the JSON body for POST /v1/responses on console.x.ai.

    ``input`` is either a plain string or a list of structured input items
    (see :func:`build_console_input`). ``tools`` is expected to be in
    console format already (see :func:`convert_openai_tools_to_console`).

    ``model`` and ``input`` are always present. The remaining fields are
    added only when set: ``stream``, ``instructions``, ``reasoning`` and
    ``tools`` when truthy; ``temperature``, ``top_p`` and ``tool_choice``
    when not ``None``. A debug line summarising the payload is logged.
    """
    body: dict[str, Any] = {"model": console_model, "input": input}

    # (field, value, include?) listed in the order fields enter the body.
    optional_fields = (
        ("stream", True, bool(stream)),
        ("instructions", instructions, bool(instructions)),
        ("temperature", temperature, temperature is not None),
        ("top_p", top_p, top_p is not None),
        # Valid effort values: "minimal" | "low" | "medium" | "high"
        ("reasoning", {"effort": reasoning_effort}, bool(reasoning_effort)),
        ("tools", tools, bool(tools)),
        ("tool_choice", tool_choice, tool_choice is not None),
    )
    for field, value, wanted in optional_fields:
        if wanted:
            body[field] = value

    # Log only the size of the input, not its contents.
    if isinstance(input, str):
        input_summary = f"len={len(input)}"
    else:
        input_summary = f"items={len(input)}" if isinstance(input, list) else "unknown"
    tool_count = len(tools) if tools else 0
    logger.debug(
        "console payload built: model={} stream={} input_{} tools={}",
        console_model,
        stream,
        input_summary,
        tool_count,
    )
    return body
# ---------------------------------------------------------------------------
# Non-streaming response parsing
# ---------------------------------------------------------------------------
def extract_console_text(response_json: dict[str, Any]) -> str:
    """Extract the assistant's text from a non-streaming response.

    Concatenates, in order, the ``text`` of every ``output_text`` part of
    every ``message`` output item. Collecting all parts (rather than
    stopping at the first one) keeps the result consistent with the
    streaming path, which accumulates every ``response.output_text.delta``
    regardless of which item it belongs to.

    Returns ``""`` when the response carries no text. Non-dict items and
    parts, and non-string ``text`` values, are ignored.
    """
    pieces: list[str] = []
    for item in response_json.get("output") or []:
        if not isinstance(item, dict) or item.get("type") != "message":
            continue
        for part in item.get("content") or []:
            if not isinstance(part, dict) or part.get("type") != "output_text":
                continue
            text = part.get("text")
            if isinstance(text, str) and text:
                pieces.append(text)
    return "".join(pieces)
def extract_console_reasoning(response_json: dict[str, Any]) -> str:
    """Extract reasoning summary text from a non-streaming response.

    Walks every ``reasoning`` output item and joins all summary entries
    with ``"\\n"``. Collecting across items (rather than returning after
    the first one) means an empty leading reasoning item no longer hides
    later ones, and matches the streaming path, which accumulates every
    reasoning delta.

    A summary entry may be a dict (its ``text``, else ``content``, is
    used when it is a non-empty string) or a bare string. Returns ``""``
    when no reasoning summary is present.
    """
    lines: list[str] = []
    for item in response_json.get("output") or []:
        if not isinstance(item, dict) or item.get("type") != "reasoning":
            continue
        for entry in item.get("summary") or []:
            if isinstance(entry, dict):
                text = entry.get("text") or entry.get("content") or ""
                # Guard the join below against non-string payloads.
                if isinstance(text, str) and text:
                    lines.append(text)
            elif isinstance(entry, str):
                lines.append(entry)
    return "\n".join(lines)
def extract_console_tool_calls(response_json: dict[str, Any]) -> list[dict[str, Any]]:
    """Collect function calls from a non-streaming response.

    Each top-level output item of type ``function_call`` is converted to
    an OpenAI Chat Completions tool_call dict:
        [{"id": "call_xxx", "type": "function",
          "function": {"name": "...", "arguments": "..."}}]

    The id is taken from ``call_id``, falling back to ``id`` and then
    ``""``; missing ``arguments`` default to ``"{}"``.
    """
    return [
        {
            "id": item.get("call_id") or item.get("id") or "",
            "type": "function",
            "function": {
                "name": item.get("name") or "",
                "arguments": item.get("arguments") or "{}",
            },
        }
        for item in response_json.get("output") or []
        if isinstance(item, dict) and item.get("type") == "function_call"
    ]
def extract_console_search_sources(response_json: dict[str, Any]) -> list[dict[str, Any]]:
    """Build a deduplicated source list from a non-streaming response.

    The result uses the ``[{"url": "https://...", "title": ""}, ...]``
    shape. URLs are gathered in two passes over ``output``:

    1. ``web_search_call`` items: every entry of ``action.sources``, plus
       ``action.url`` when the action type is ``open_page`` (recorded with
       an empty title).
    2. ``message`` items: the URL of each annotation whose type is
       ``url_citation`` or absent. This is the fallback for responses that
       publish citations only as annotations. A title equal to its URL is
       blanked.

    The first occurrence of a URL wins, so sources from pass 1 take
    precedence over annotation duplicates whatever the item order.
    """
    items = [it for it in response_json.get("output") or [] if isinstance(it, dict)]
    known: set[str] = set()
    sources: list[dict[str, Any]] = []

    def remember(url: Any, title: Any) -> None:
        # Record a URL once; empty URLs are never recorded.
        if url and url not in known:
            known.add(url)
            sources.append({"url": url, "title": title})

    # Pass 1 — explicit web_search_call items.
    for it in items:
        if it.get("type") != "web_search_call":
            continue
        action = it.get("action") or {}
        if not isinstance(action, dict):
            continue
        for src in action.get("sources") or []:
            if isinstance(src, dict):
                remember(src.get("url") or "", src.get("title") or "")
        if action.get("type") == "open_page":
            remember(action.get("url") or "", "")

    # Pass 2 — citation annotations on assistant messages.
    for it in items:
        if it.get("type") != "message":
            continue
        for part in it.get("content") or []:
            if not isinstance(part, dict):
                continue
            for ann in part.get("annotations") or []:
                if not isinstance(ann, dict) or ann.get("type") not in (None, "url_citation"):
                    continue
                url = ann.get("url") or ""
                title = ann.get("title") or ""
                remember(url, "" if title == url else title)

    return sources
def inject_web_search_tool(tools: list[dict[str, Any]] | None, ) -> list[dict[str, Any]]:
"""Ensure a ``web_search`` tool is present in the console tools list.
If the user already supplied any ``web_search`` tool (with or without
options), it's left untouched. Otherwise a default ``{"type":
"web_search"}`` entry is appended. xAI charges $5/1000 calls for web
search; this is consumed from the account's prepaid (trial) credits.
"""
existing = list(tools or [])
for t in existing:
if isinstance(t, dict) and t.get("type") == "web_search":
return existing
existing.append({"type": "web_search"})
return existing
def extract_console_annotations(response_json: dict[str, Any]) -> list[dict[str, Any]]:
    """Extract URL citation annotations from a non-streaming response.

    Returns a flat list of citation dicts in chat-completions format:
        [{"url": "...", "title": "...", "start_index": 0, "end_index": 0}]

    Only annotations on ``message`` output items are considered, and only
    those whose type is ``url_citation`` or absent and that carry a
    non-empty ``url``. Missing indices default to ``0``.

    A title that merely repeats the URL is blanked, so non-streaming
    citations match the records produced by the streaming adapter for
    ``response.output_text.annotation.added`` frames.
    """
    citations: list[dict[str, Any]] = []
    for item in response_json.get("output") or []:
        if not isinstance(item, dict) or item.get("type") != "message":
            continue
        for part in item.get("content") or []:
            if not isinstance(part, dict):
                continue
            for ann in part.get("annotations") or []:
                if not isinstance(ann, dict):
                    continue
                if ann.get("type") not in (None, "url_citation"):
                    continue
                url = ann.get("url") or ""
                if not url:
                    continue
                title = ann.get("title") or ""
                if title == url:
                    # Upstream sometimes duplicates the URL into the title.
                    title = ""
                citations.append({
                    "url": url,
                    "title": title,
                    "start_index": int(ann.get("start_index") or 0),
                    "end_index": int(ann.get("end_index") or 0),
                })
    return citations
def extract_console_usage(response_json: dict[str, Any]) -> dict[str, int]:
    """Map the response ``usage`` object to chat-completions token counts.

    ``input_tokens`` / ``output_tokens`` / ``total_tokens`` are reported
    as ``prompt_tokens`` / ``completion_tokens`` / ``total_tokens``.
    ``reasoning_tokens`` is read from ``output_tokens_details`` first and
    from the top level of ``usage`` otherwise. Missing or falsy values
    become ``0``.
    """
    usage = response_json.get("usage") or {}

    def count(field: str) -> int:
        return int(usage.get(field) or 0)

    counts = {
        target: count(source)
        for target, source in (
            ("prompt_tokens", "input_tokens"),
            ("completion_tokens", "output_tokens"),
            ("total_tokens", "total_tokens"),
        )
    }
    details = usage.get("output_tokens_details") or {}
    counts["reasoning_tokens"] = int(
        details.get("reasoning_tokens") or usage.get("reasoning_tokens") or 0
    )
    return counts
def parse_console_error(status_code: int, body: str) -> UpstreamError:
    """Build an ``UpstreamError`` describing a non-200 console response.

    The message always starts with ``Console upstream returned <status>``.
    When the body is a JSON object, a detail is appended: the ``error``
    value (or ``code`` when ``error`` is falsy); if that value is itself
    an object, its ``message`` (or ``code``) is used instead. A body that
    is not valid JSON contributes no detail. The first 400 characters of
    the body are attached to the error.
    """
    detail: Any = ""
    try:
        parsed = orjson.loads(body) if body else {}
        if isinstance(parsed, dict):
            detail = parsed.get("error") or parsed.get("code") or ""
            if isinstance(detail, dict):
                detail = detail.get("message") or detail.get("code") or ""
    except (orjson.JSONDecodeError, ValueError, TypeError):
        # Best effort: an unparseable body just means no detail suffix.
        pass

    summary = f"Console upstream returned {status_code}"
    if detail:
        summary = f"{summary}: {detail}"
    return UpstreamError(summary, status=status_code, body=body[:400])
# ---------------------------------------------------------------------------
# SSE streaming event parsing
# ---------------------------------------------------------------------------
def classify_console_sse_line(line: str | bytes) -> tuple[str, str]:
"""Return (kind, payload) for one raw SSE line.
kind:
- 'data' β€” SSE data line; payload is the JSON string
- 'event' β€” SSE event name line; payload is the event name
- 'skip' β€” comment / blank / unrecognized
"""
if isinstance(line, bytes):
line = line.decode("utf-8", "replace")
line = line.strip()
if not line:
return "skip", ""
if line.startswith("event:"):
return "event", line[6:].strip()
if line.startswith("data:"):
data = line[5:].strip()
return "data", data
if line.startswith("{"):
return "data", line
return "skip", ""
class ConsoleStreamAdapter:
    """Parse upstream Console SSE frames and emit text/reasoning/tool deltas.

    The console.x.ai SSE protocol uses OpenAI Responses API event names.
    Frames that this adapter acts upon, and the ``kind`` it reports:

    - response.output_text.delta               → ``text``
    - response.reasoning_summary_text.delta /
      response.reasoning_summary.delta         → ``thinking``
    - response.output_item.added               → ``tool_call_start``
      (only for ``function_call`` items)
    - response.output_item.done                → ``skip``; sources of
      ``web_search_call`` items are collected as a side effect
    - response.function_call_arguments.delta   → ``tool_call_args``
    - response.function_call_arguments.done    → ``tool_call_done``
    - response.output_text.annotation.added    → ``annotation``
    - response.completed                       → ``done`` (usage captured)
    - response.failed / response.error / error → ``error``

    Every other frame (response.created, response.content_part.added,
    response.output_text.done, response.cancelled, ...) is reported as
    ``skip``.

    A frame is identified by the preceding SSE ``event:`` name when one
    was fed, and by the ``type`` field of its JSON payload.
    """

    __slots__ = (
        "_current_event",
        "_active_tool_index",
        "_tool_args_buf",
        "_seen_source_urls",
        "tool_calls",
        "annotations",
        "search_sources",
        "text_buf",
        "thinking_buf",
        "_usage",
    )

    # Message reported when an error frame carries no usable detail.
    _DEFAULT_ERROR = "Console stream error"

    def __init__(self) -> None:
        # Event name announced for the next data frame ("" when none).
        self._current_event: str = ""
        self._active_tool_index: dict[str, int] = {}  # item_id → index
        self._tool_args_buf: dict[str, list[str]] = {}  # item_id → args chunks
        # URLs already present in ``search_sources`` (for deduplication).
        self._seen_source_urls: set[str] = set()
        # Accumulated results, readable by the caller at any time.
        self.tool_calls: list[dict[str, Any]] = []
        self.annotations: list[dict[str, Any]] = []
        self.search_sources: list[dict[str, Any]] = []
        self.text_buf: list[str] = []
        self.thinking_buf: list[str] = []
        self._usage: dict[str, int] = {}

    def feed_event(self, event_name: str) -> None:
        """Record the ``event:`` name that applies to the next data frame."""
        self._current_event = event_name

    def feed_data(self, data: str) -> dict[str, Any]:
        """Parse one SSE data frame; return the kind/content classification.

        Returns a dict like:
            {"kind": "text", "content": "Two"}
            {"kind": "thinking", "content": "Let me think..."}
            {"kind": "tool_call_start", "index": 0, "call_id": "...", "name": "..."}
            {"kind": "tool_call_args", "index": 0, "delta": "..."}
            {"kind": "tool_call_done", "index": 0}
            {"kind": "annotation", "annotation_data": {...}}
            {"kind": "done"}
            {"kind": "error", "message": "..."}
            {"kind": "skip"}

        An empty frame or the literal ``[DONE]`` yields ``done``. Frames
        that are not a JSON object yield ``skip``.
        """
        # An SSE event name describes a single dispatch. Consume it here so
        # a later frame that arrives without its own ``event:`` line is
        # classified by its payload ``type`` instead of a stale name.
        event_hint, self._current_event = self._current_event, ""

        if not data or data == "[DONE]":
            return {"kind": "done"}
        try:
            obj = orjson.loads(data)
        except (orjson.JSONDecodeError, ValueError, TypeError):
            return {"kind": "skip"}
        if not isinstance(obj, dict):
            return {"kind": "skip"}

        # Event-specific dispatch (event: line precedes data: line in SSE).
        frame_type = obj.get("type")
        ev = event_hint or frame_type or ""

        def is_event(*names: str) -> bool:
            return ev in names or frame_type in names

        if is_event("response.output_text.delta"):
            return self._on_text_delta(obj)
        if is_event(
            "response.reasoning_summary_text.delta",
            "response.reasoning_summary.delta",
        ):
            return self._on_reasoning_delta(obj)
        if is_event("response.output_item.added"):
            return self._on_item_added(obj)
        if is_event("response.output_item.done"):
            return self._on_item_done(obj)
        if is_event("response.function_call_arguments.delta"):
            return self._on_tool_args_delta(obj)
        if is_event("response.function_call_arguments.done"):
            return self._on_tool_args_done(obj)
        if is_event("response.output_text.annotation.added"):
            return self._on_annotation(obj)
        if is_event("response.completed"):
            return self._on_completed(obj)
        if is_event("response.failed", "response.error", "error"):
            return self._on_error(obj)
        return {"kind": "skip"}

    # ── Per-event handlers ────────────────────────────────────────────────

    def _on_text_delta(self, obj: dict[str, Any]) -> dict[str, Any]:
        """Buffer and report one assistant text chunk."""
        delta = obj.get("delta") or ""
        if isinstance(delta, str) and delta:
            self.text_buf.append(delta)
            return {"kind": "text", "content": delta}
        return {"kind": "skip"}

    def _on_reasoning_delta(self, obj: dict[str, Any]) -> dict[str, Any]:
        """Buffer and report one reasoning-summary chunk."""
        delta = obj.get("delta") or ""
        if isinstance(delta, str) and delta:
            self.thinking_buf.append(delta)
            return {"kind": "thinking", "content": delta}
        return {"kind": "skip"}

    def _on_item_added(self, obj: dict[str, Any]) -> dict[str, Any]:
        """Register a newly announced ``function_call`` item as a tool call."""
        item = obj.get("item") or {}
        if not isinstance(item, dict) or item.get("type") != "function_call":
            return {"kind": "skip"}
        # Argument frames refer to the item by ``item_id``; the caller-facing
        # identifier is ``call_id``. Each falls back to the other.
        item_id = item.get("id") or item.get("call_id") or ""
        call_id = item.get("call_id") or item_id
        name = item.get("name") or ""
        idx = len(self.tool_calls)
        self._active_tool_index[item_id] = idx
        self._tool_args_buf[item_id] = []
        self.tool_calls.append({
            "id": call_id,
            "type": "function",
            "function": {"name": name, "arguments": ""},
        })
        return {
            "kind": "tool_call_start",
            "index": idx,
            "call_id": call_id,
            "name": name,
        }

    def _on_item_done(self, obj: dict[str, Any]) -> dict[str, Any]:
        """Collect sources from a completed ``web_search_call`` item."""
        item = obj.get("item") or {}
        if isinstance(item, dict) and item.get("type") == "web_search_call":
            action = item.get("action") or {}
            if isinstance(action, dict):
                for src in action.get("sources") or []:
                    if isinstance(src, dict):
                        self._add_source(src.get("url") or "", src.get("title") or "")
                if action.get("type") == "open_page":
                    self._add_source(action.get("url") or "", "")
        return {"kind": "skip"}

    def _on_tool_args_delta(self, obj: dict[str, Any]) -> dict[str, Any]:
        """Buffer and report one chunk of a tool call's JSON arguments."""
        item_id = obj.get("item_id") or ""
        delta = obj.get("delta") or ""
        if not isinstance(delta, str) or not delta:
            return {"kind": "skip"}
        idx = self._active_tool_index.get(item_id)
        if idx is None:
            # Arguments for an item that was never announced.
            return {"kind": "skip"}
        self._tool_args_buf.setdefault(item_id, []).append(delta)
        return {"kind": "tool_call_args", "index": idx, "delta": delta}

    def _on_tool_args_done(self, obj: dict[str, Any]) -> dict[str, Any]:
        """Finalize the arguments string of a tool call."""
        item_id = obj.get("item_id") or ""
        idx = self._active_tool_index.get(item_id)
        if idx is None:
            return {"kind": "skip"}
        # Prefer upstream-provided final arguments string when present;
        # otherwise reassemble it from the buffered deltas.
        final_args = obj.get("arguments")
        if not isinstance(final_args, str) or not final_args:
            final_args = "".join(self._tool_args_buf.get(item_id, []))
        self.tool_calls[idx]["function"]["arguments"] = final_args
        return {"kind": "tool_call_done", "index": idx}

    def _on_annotation(self, obj: dict[str, Any]) -> dict[str, Any]:
        """Record a URL citation annotation and mirror it into the sources."""
        ann = obj.get("annotation") or {}
        if not isinstance(ann, dict) or ann.get("type") not in (None, "url_citation"):
            return {"kind": "skip"}
        url = ann.get("url") or ""
        if not url:
            return {"kind": "skip"}
        title = ann.get("title") or ""
        if title == url:
            # Multi-agent often duplicates URL into title; clean it.
            title = ""
        record = {
            "url": url,
            "title": title,
            "start_index": int(ann.get("start_index") or 0),
            "end_index": int(ann.get("end_index") or 0),
        }
        self.annotations.append(record)
        # Fallback for multi-agent: harvest the citation URL into
        # search_sources too, deduplicated against web_search_call sources.
        self._add_source(url, title)
        return {"kind": "annotation", "annotation_data": record}

    def _on_completed(self, obj: dict[str, Any]) -> dict[str, Any]:
        """Capture usage from the final frame and signal completion."""
        resp = obj.get("response")
        if not isinstance(resp, dict) or not resp:
            # Usage may sit directly on the frame instead of under "response".
            resp = obj
        usage = resp.get("usage")
        if isinstance(usage, dict) and usage:
            details = usage.get("output_tokens_details")
            if not isinstance(details, dict):
                details = {}
            self._usage = {
                "prompt_tokens": int(usage.get("input_tokens") or 0),
                "completion_tokens": int(usage.get("output_tokens") or 0),
                "total_tokens": int(usage.get("total_tokens") or 0),
                "reasoning_tokens": int(
                    details.get("reasoning_tokens") or usage.get("reasoning_tokens") or 0
                ),
            }
        return {"kind": "done"}

    def _on_error(self, obj: dict[str, Any]) -> dict[str, Any]:
        """Turn an error frame into ``{"kind": "error", "message": ...}``.

        The detail is looked up in ``error``, then ``response.error``, and
        finally in ``message`` / ``code`` at the top level of the frame.
        """
        err = obj.get("error")
        if not err:
            resp = obj.get("response")
            # "response" may be present but null; only dig into a dict.
            if isinstance(resp, dict):
                err = resp.get("error")

        if isinstance(err, dict):
            detail = err.get("message") or err.get("code")
        elif err:
            detail = str(err)
        else:
            detail = None

        detail = detail or obj.get("message") or obj.get("code") or self._DEFAULT_ERROR
        return {"kind": "error", "message": str(detail)}

    def _add_source(self, url: Any, title: Any) -> None:
        """Append a search source unless its URL is empty or already known."""
        if url and url not in self._seen_source_urls:
            self._seen_source_urls.add(url)
            self.search_sources.append({"url": url, "title": title})

    @property
    def usage(self) -> dict[str, int]:
        """Return a copy of the usage captured from ``response.completed``.

        Empty until a completion frame carrying usage has been fed.
        """
        return dict(self._usage)
# Public API of this module. The underscore-prefixed helpers
# (_flatten_text, _convert_content_blocks) are deliberately not exported.
__all__ = [
"build_console_input",
"build_console_payload",
"convert_openai_tools_to_console",
"convert_openai_tool_choice",
"inject_web_search_tool",
"extract_console_text",
"extract_console_reasoning",
"extract_console_tool_calls",
"extract_console_annotations",
"extract_console_search_sources",
"extract_console_usage",
"parse_console_error",
"classify_console_sse_line",
"ConsoleStreamAdapter",
]