Spaces:
Sleeping
Sleeping
| """Console API protocol — payload builder and response parser. | |
| The ``console.x.ai/v1/responses`` endpoint shares SSO cookies with grok.com | |
| but exposes the OpenAI Responses API directly. Free/basic accounts can call | |
| all models (grok-4.3, grok-4.20-*, etc.) through this endpoint, bypassing | |
| the tier restrictions of the grok.com web chat API. | |
| The upstream API supports: | |
| - Plain string input or structured input arrays (for multimodal / chat history) | |
| - Native function calling via ``tools`` field | |
| - Reasoning summary streaming | |
| - SSE streaming with OpenAI Responses API event names | |
| Request format (string input): | |
| {"model": "grok-4.3", "input": "What is 1+1?", "stream": true} | |
| Request format (structured input + tools): | |
| { | |
| "model": "grok-4.3", | |
| "input": [ | |
| {"role": "user", "content": [ | |
| {"type": "input_text", "text": "What's the weather?"}, | |
| {"type": "input_image", "image_url": "https://...", "detail": "auto"} | |
| ]} | |
| ], | |
| "tools": [ | |
| {"type": "function", "name": "get_weather", | |
| "description": "...", "parameters": {...}} | |
| ], | |
| "tool_choice": "auto" | |
| } | |
| Response output items (non-streaming): | |
| - {"type": "reasoning", "summary": [{"type": "summary_text", "text": "..."}]} | |
| - {"type": "message", "role": "assistant", | |
| "content": [{"type": "output_text", "text": "...", "annotations": [...]}]} | |
| - {"type": "function_call", "call_id": "...", "name": "...", "arguments": "..."} | |
| """ | |
| from typing import Any | |
| import orjson | |
| from app.platform.errors import UpstreamError | |
| from app.platform.logging.logger import logger | |
| # --------------------------------------------------------------------------- | |
| # Input conversion (OpenAI Chat Completions → console.x.ai input array) | |
| # --------------------------------------------------------------------------- | |
def build_console_input(messages: list[dict[str, Any]]) -> tuple[list[dict[str, Any]], str]:
    """Translate OpenAI Chat Completions messages into console structured input.

    Returns ``(input_array, instructions)``:
      - ``input_array`` is the list sent as the Responses API ``input`` field.
      - ``instructions`` is every non-empty ``role=system`` text, stripped and
        joined with blank lines, for the separate ``instructions`` field.

    Mapping rules:
      - ``role=system`` → folded into ``instructions`` (string content, or the
        ``text`` blocks of array content)
      - ``role=tool`` → ``{type: function_call_output, call_id, output}``
      - ``role=assistant`` with ``tool_calls`` → one ``function_call`` item per
        call, then the accompanying text (if any) as an assistant message
      - any other message → ``{role, content}`` with its content converted by
        ``_convert_content_blocks``; messages that yield no blocks are dropped

    A missing or empty ``role`` is treated as ``"user"``.
    """
    system_chunks: list[str] = []
    items: list[dict[str, Any]] = []

    for message in messages:
        role = message.get("role") or "user"
        content = message.get("content")

        # -- system → instructions ----------------------------------------
        if role == "system":
            if isinstance(content, str):
                candidates = [content]
            elif isinstance(content, list):
                candidates = [
                    part.get("text") or ""
                    for part in content
                    if isinstance(part, dict) and part.get("type") == "text"
                ]
            else:
                candidates = []
            system_chunks.extend(c.strip() for c in candidates if c.strip())
            continue

        # -- tool result → function_call_output ---------------------------
        if role == "tool":
            result = content if isinstance(content, str) else _flatten_text(content)
            items.append({
                "type": "function_call_output",
                "call_id": message.get("tool_call_id") or "",
                "output": result or "",
            })
            continue

        # -- assistant with tool_calls → function_call items --------------
        calls = message.get("tool_calls")
        if role == "assistant" and calls:
            for call in calls:
                if not isinstance(call, dict):
                    continue
                spec = call.get("function") or {}
                items.append({
                    "type": "function_call",
                    # Fall back to the function name when the call has no id.
                    "call_id": call.get("id") or spec.get("name") or "",
                    "name": spec.get("name") or "",
                    "arguments": spec.get("arguments") or "{}",
                })
            # Text that accompanies the calls goes out as a normal message.
            spoken = content if isinstance(content, str) else _flatten_text(content)
            if spoken and spoken.strip():
                items.append({
                    "role": "assistant",
                    "content": [{"type": "output_text", "text": spoken.strip()}],
                })
            continue

        # -- normal user / assistant message ------------------------------
        converted = _convert_content_blocks(content, role)
        if converted:
            items.append({"role": role, "content": converted})

    return items, "\n\n".join(system_chunks).strip()
def _flatten_text(content: Any) -> str:
    """Collapse OpenAI message content into one plain-text string.

    A string is returned unchanged. A list contributes the non-empty ``text``
    of each ``{"type": "text"}`` dict block, joined with newlines. Anything
    else yields ``""``.
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return ""
    texts = [
        part.get("text") or ""
        for part in content
        if isinstance(part, dict) and part.get("type") == "text"
    ]
    return "\n".join(piece for piece in texts if piece)
def _convert_content_blocks(content: Any, role: str) -> list[dict[str, Any]]:
    """Convert one OpenAI message ``content`` (str or array) into console blocks.

    Text becomes ``output_text`` for ``role == "assistant"`` and ``input_text``
    for every other role. String content is stripped; text inside array blocks
    is kept verbatim but dropped when it is blank. ``image_url`` blocks become
    ``input_image`` blocks (``detail`` defaults to ``"auto"``). Blocks already
    typed ``input_text`` / ``output_text`` / ``input_image`` are shallow-copied
    through. Every other block type, and any non-str/non-list content, is
    ignored.
    """
    text_kind = "output_text" if role == "assistant" else "input_text"

    # Plain string content
    if isinstance(content, str):
        stripped = content.strip()
        return [{"type": text_kind, "text": stripped}] if stripped else []

    # Anything that is not an array of blocks carries nothing usable.
    if not isinstance(content, list):
        return []

    converted: list[dict[str, Any]] = []
    for part in content:
        if not isinstance(part, dict):
            continue
        kind = part.get("type")

        if kind == "text":
            raw = part.get("text") or ""
            if raw.strip():
                converted.append({"type": text_kind, "text": raw})
            continue

        if kind == "image_url":
            ref = part.get("image_url") or {}
            if isinstance(ref, str):
                # Bare URL string: there is no per-image detail setting.
                url, detail = ref, "auto"
            else:
                url = ref.get("url") or ""
                detail = ref.get("detail") or "auto"
            if url:
                converted.append({"type": "input_image", "image_url": url, "detail": detail})
            continue

        if kind in ("input_text", "output_text", "input_image"):
            # Already in console format — pass through as a copy.
            converted.append(dict(part))

    return converted
| # --------------------------------------------------------------------------- | |
| # Tool format conversion | |
| # --------------------------------------------------------------------------- | |
| def convert_openai_tools_to_console(tools: list[dict[str, Any]] | None, ) -> list[dict[str, Any]]: | |
| """Convert OpenAI Chat Completions tools β console (Responses API) tools. | |
| OpenAI Chat Completions: | |
| {"type": "function", "function": {"name", "description", "parameters"}} | |
| Console (Responses API): | |
| {"type": "function", "name", "description", "parameters"} | |
| Already-flat tools are passed through (e.g. ``web_search`` server-side | |
| tool, ``code_interpreter``, ``x_search`` etc.). | |
| """ | |
| if not tools: | |
| return [] | |
| out: list[dict[str, Any]] = [] | |
| for t in tools: | |
| if not isinstance(t, dict): | |
| continue | |
| if t.get("type") != "function": | |
| # Pass through server-side tools (web_search, x_search, etc.) | |
| out.append(dict(t)) | |
| continue | |
| fn = t.get("function") if isinstance(t.get("function"), dict) else None | |
| if fn is not None: | |
| out.append( | |
| { | |
| "type": "function", | |
| "name": fn.get("name") or "", | |
| "description": fn.get("description") or "", | |
| "parameters": fn.get("parameters") or {}, | |
| }) | |
| else: | |
| # Already flat | |
| out.append(dict(t)) | |
| return out | |
def convert_openai_tool_choice(tool_choice: Any) -> Any:
    """Convert an OpenAI ``tool_choice`` into the console form.

    OpenAI:  "none" | "auto" | "required" | {"type":"function","function":{"name":"x"}}
    Console: "none" | "auto" | "required" | {"type":"function","name":"x"}

    Only a ``function`` dict is rewritten: a non-empty nested ``function`` dict
    is flattened to ``{"type": "function", "name": ...}``, and any other
    ``function`` dict is returned as a shallow copy. Every other value
    (strings, ``None``, other dicts) is returned as-is.
    """
    is_function_choice = isinstance(tool_choice, dict) and tool_choice.get("type") == "function"
    if not is_function_choice:
        return tool_choice

    spec = tool_choice.get("function")
    if isinstance(spec, dict) and spec:
        return {"type": "function", "name": spec.get("name") or ""}
    return dict(tool_choice)
| # --------------------------------------------------------------------------- | |
| # Payload builder | |
| # --------------------------------------------------------------------------- | |
| def build_console_payload( | |
| *, | |
| console_model: str, | |
| input: Any, | |
| instructions: str = "", | |
| stream: bool = False, | |
| temperature: float | None = None, | |
| top_p: float | None = None, | |
| reasoning_effort: str | None = None, | |
| tools: list[dict[str, Any]] | None = None, | |
| tool_choice: Any = None, | |
| ) -> dict[str, Any]: | |
| """Build the JSON payload for POST /v1/responses on console.x.ai. | |
| ``input`` may be a plain string or an array of structured input items | |
| (use :func:`build_console_input` to convert OpenAI messages). | |
| ``tools`` should already be in console format (use | |
| :func:`convert_openai_tools_to_console`). | |
| """ | |
| payload: dict[str, Any] = { | |
| "model": console_model, | |
| "input": input, | |
| } | |
| if stream: | |
| payload["stream"] = True | |
| if instructions: | |
| payload["instructions"] = instructions | |
| if temperature is not None: | |
| payload["temperature"] = temperature | |
| if top_p is not None: | |
| payload["top_p"] = top_p | |
| if reasoning_effort: | |
| # Valid values: "minimal" | "low" | "medium" | "high" | |
| payload["reasoning"] = {"effort": reasoning_effort} | |
| if tools: | |
| payload["tools"] = tools | |
| if tool_choice is not None: | |
| payload["tool_choice"] = tool_choice | |
| if isinstance(input, str): | |
| msg_repr = f"len={len(input)}" | |
| elif isinstance(input, list): | |
| msg_repr = f"items={len(input)}" | |
| else: | |
| msg_repr = "unknown" | |
| logger.debug( | |
| "console payload built: model={} stream={} input_{} tools={}", | |
| console_model, | |
| stream, | |
| msg_repr, | |
| len(tools) if tools else 0, | |
| ) | |
| return payload | |
| # --------------------------------------------------------------------------- | |
| # Non-streaming response parsing | |
| # --------------------------------------------------------------------------- | |
def extract_console_text(response_json: dict[str, Any]) -> str:
    """Return the assistant text of a non-streaming response.

    Walks ``output`` in order and returns the ``text`` of the first
    ``output_text`` part found inside a ``message`` item (``""`` when that
    text is missing or empty). Returns ``""`` when no such part exists.
    """
    for entry in response_json.get("output") or []:
        if not isinstance(entry, dict) or entry.get("type") != "message":
            continue
        for part in entry.get("content") or []:
            if isinstance(part, dict) and part.get("type") == "output_text":
                return part.get("text") or ""
    return ""
def extract_console_reasoning(response_json: dict[str, Any]) -> str:
    """Return the reasoning summary text of a non-streaming response.

    Only the first ``reasoning`` output item is used. Its ``summary`` entries
    are joined with newlines: dict entries contribute their ``text`` (or
    ``content``) when non-empty, and bare string entries are taken as-is.
    Returns ``""`` when the response has no reasoning item.
    """
    for entry in response_json.get("output") or []:
        if not (isinstance(entry, dict) and entry.get("type") == "reasoning"):
            continue
        pieces: list[str] = []
        for chunk in entry.get("summary") or []:
            if isinstance(chunk, str):
                pieces.append(chunk)
            elif isinstance(chunk, dict):
                value = chunk.get("text") or chunk.get("content") or ""
                if value:
                    pieces.append(value)
        return "\n".join(pieces)
    return ""
def extract_console_tool_calls(response_json: dict[str, Any]) -> list[dict[str, Any]]:
    """Collect tool calls from a non-streaming response.

    Console responses carry each tool call as a top-level output item of type
    ``function_call`` with ``call_id``, ``name`` and a JSON-serialised
    ``arguments`` string. Each one is returned as an OpenAI Chat Completions
    tool_call dict:

        [{"id": "call_xxx", "type": "function",
          "function": {"name": "...", "arguments": "..."}}]

    ``id`` falls back to the item's ``id`` when ``call_id`` is missing, and
    ``arguments`` defaults to ``"{}"``.
    """
    return [
        {
            "id": entry.get("call_id") or entry.get("id") or "",
            "type": "function",
            "function": {
                "name": entry.get("name") or "",
                "arguments": entry.get("arguments") or "{}",
            },
        }
        for entry in response_json.get("output") or []
        if isinstance(entry, dict) and entry.get("type") == "function_call"
    ]
def extract_console_search_sources(response_json: dict[str, Any]) -> list[dict[str, Any]]:
    """Collect the deduplicated search sources of a non-streaming response.

    Returns source dicts in the format used by the existing grok.com path:

        [{"url": "https://...", "title": ""}, ...]

    Two upstream variants are handled:

    1. ``web_search_call`` output items, whose ``action`` is either
       ``{"type": "search", "sources": [{"url": "..."}, ...]}`` or
       ``{"type": "open_page", "url": "..."}``.
    2. URL citations attached as ``annotations`` to the assistant message.
       Per the original notes, multi-agent models publish citations only
       this way.

    Variant 1 is collected first. Annotation URLs are then appended only if
    they have not been seen already, so the first occurrence of a URL wins.
    """
    output_items = response_json.get("output") or []
    known_urls: set[str] = set()
    sources: list[dict[str, Any]] = []

    def remember(url: str, title: str) -> None:
        # Record a source once; empty URLs and repeats are ignored.
        if url and url not in known_urls:
            known_urls.add(url)
            sources.append({"url": url, "title": title})

    # Pass 1: explicit web_search_call items.
    for entry in output_items:
        if not isinstance(entry, dict) or entry.get("type") != "web_search_call":
            continue
        action = entry.get("action") or {}
        if not isinstance(action, dict):
            continue
        # Search action with a sources list
        for src in action.get("sources") or []:
            if isinstance(src, dict):
                remember(src.get("url") or "", src.get("title") or "")
        # Page-open action — single URL
        if action.get("type") == "open_page":
            remember(action.get("url") or "", "")

    # Pass 2: fall back to citation annotations on assistant messages.
    for entry in output_items:
        if not isinstance(entry, dict) or entry.get("type") != "message":
            continue
        for part in entry.get("content") or []:
            if not isinstance(part, dict):
                continue
            for ann in part.get("annotations") or []:
                if not isinstance(ann, dict):
                    continue
                if ann.get("type") not in (None, "url_citation"):
                    continue
                url = ann.get("url") or ""
                title = ann.get("title") or ""
                # Annotations sometimes repeat the URL as the title; blank it
                # so the source list reads cleanly.
                remember(url, "" if title == url else title)

    return sources
| def inject_web_search_tool(tools: list[dict[str, Any]] | None, ) -> list[dict[str, Any]]: | |
| """Ensure a ``web_search`` tool is present in the console tools list. | |
| If the user already supplied any ``web_search`` tool (with or without | |
| options), it's left untouched. Otherwise a default ``{"type": | |
| "web_search"}`` entry is appended. xAI charges $5/1000 calls for web | |
| search; this is consumed from the account's prepaid (trial) credits. | |
| """ | |
| existing = list(tools or []) | |
| for t in existing: | |
| if isinstance(t, dict) and t.get("type") == "web_search": | |
| return existing | |
| existing.append({"type": "web_search"}) | |
| return existing | |
def extract_console_annotations(response_json: dict[str, Any]) -> list[dict[str, Any]]:
    """Extract URL citation annotations from a non-streaming response.

    Returns a flat list of citation dicts in chat-completions format, in the
    order they appear across all ``message`` items and content parts:

        [{"url": "...", "title": "...", "start_index": 0, "end_index": 0}]

    Only annotations whose ``type`` is ``url_citation`` (or absent) and that
    carry a non-empty ``url`` are kept. Missing indices default to ``0``.

    A title that merely repeats the URL is blanked. The streaming adapter and
    :func:`extract_console_search_sources` already normalise titles this way,
    so streaming and non-streaming responses now report the same citations.
    """
    citations: list[dict[str, Any]] = []
    for entry in response_json.get("output") or []:
        if not isinstance(entry, dict) or entry.get("type") != "message":
            continue
        for part in entry.get("content") or []:
            if not isinstance(part, dict):
                continue
            for ann in part.get("annotations") or []:
                if not isinstance(ann, dict):
                    continue
                if ann.get("type") not in (None, "url_citation"):
                    continue
                url = ann.get("url") or ""
                if not url:
                    continue
                title = ann.get("title") or ""
                if title == url:
                    # Upstream sometimes copies the URL into the title.
                    title = ""
                citations.append({
                    "url": url,
                    "title": title,
                    "start_index": int(ann.get("start_index") or 0),
                    "end_index": int(ann.get("end_index") or 0),
                })
    return citations
def extract_console_usage(response_json: dict[str, Any]) -> dict[str, int]:
    """Map the ``usage`` object of a non-streaming response to token counts.

    Returns ``prompt_tokens``, ``completion_tokens``, ``total_tokens`` and
    ``reasoning_tokens`` as ints, each defaulting to ``0`` when absent.
    ``reasoning_tokens`` is read from ``output_tokens_details`` first and from
    the top level of ``usage`` as a fallback.
    """
    usage = response_json.get("usage") or {}
    renamed = (
        ("prompt_tokens", "input_tokens"),
        ("completion_tokens", "output_tokens"),
        ("total_tokens", "total_tokens"),
    )
    counts = {ours: int(usage.get(theirs) or 0) for ours, theirs in renamed}

    details = usage.get("output_tokens_details") or {}
    reasoning = details.get("reasoning_tokens") or usage.get("reasoning_tokens") or 0
    counts["reasoning_tokens"] = int(reasoning)
    return counts
def parse_console_error(status_code: int, body: str) -> UpstreamError:
    """Convert a non-200 console response into an ``UpstreamError``.

    The message always starts with ``Console upstream returned <status>``.
    When ``body`` is a JSON object, a detail is appended after ``": "``. The
    detail is the top-level ``error`` (or ``code``) value, and if that value
    is itself an object, its ``message`` (or ``code``) is used instead. A
    body that cannot be parsed is ignored. The first 400 characters of
    ``body`` are attached to the error.
    """
    try:
        parsed = orjson.loads(body) if body else {}
    except (orjson.JSONDecodeError, ValueError, TypeError):
        # Best effort only: a non-JSON body simply adds no detail.
        parsed = None

    detail: Any = ""
    if isinstance(parsed, dict):
        detail = parsed.get("error") or parsed.get("code") or ""
        if isinstance(detail, dict):
            detail = detail.get("message") or detail.get("code") or ""

    message = f"Console upstream returned {status_code}"
    if detail:
        message = f"{message}: {detail}"
    return UpstreamError(message, status=status_code, body=body[:400])
| # --------------------------------------------------------------------------- | |
| # SSE streaming event parsing | |
| # --------------------------------------------------------------------------- | |
| def classify_console_sse_line(line: str | bytes) -> tuple[str, str]: | |
| """Return (kind, payload) for one raw SSE line. | |
| kind: | |
| - 'data' β SSE data line; payload is the JSON string | |
| - 'event' β SSE event name line; payload is the event name | |
| - 'skip' β comment / blank / unrecognized | |
| """ | |
| if isinstance(line, bytes): | |
| line = line.decode("utf-8", "replace") | |
| line = line.strip() | |
| if not line: | |
| return "skip", "" | |
| if line.startswith("event:"): | |
| return "event", line[6:].strip() | |
| if line.startswith("data:"): | |
| data = line[5:].strip() | |
| return "data", data | |
| if line.startswith("{"): | |
| return "data", line | |
| return "skip", "" | |
class ConsoleStreamAdapter:
    """Parse upstream Console SSE frames and emit text/reasoning/tool deltas.

    The console.x.ai SSE protocol uses OpenAI Responses API event names.
    Events acted upon by :meth:`feed_data`:

      - response.output_text.delta              → text chunks
      - response.reasoning_summary_text.delta   → reasoning chunks
        (``response.reasoning_summary.delta`` is accepted as well)
      - response.output_item.added              → start of a function call
      - response.function_call_arguments.delta  → tool args streaming
      - response.function_call_arguments.done   → tool args complete
      - response.output_item.done               → web_search_call sources
      - response.output_text.annotation.added   → citation annotation
      - response.completed                      → usage capture, end of stream
      - response.failed / response.error / error → error report

    Every other event (``response.created``, ``response.content_part.added``,
    ``response.output_text.done``, ``response.cancelled``, ...) is reported
    as ``{"kind": "skip"}``.

    While frames are fed, the adapter accumulates ``text_buf``,
    ``thinking_buf``, ``tool_calls``, ``annotations`` and ``search_sources``
    for the caller to read once the stream ends.
    """

    __slots__ = (
        "_current_event",
        "_active_tool_index",
        "_tool_args_buf",
        "_seen_source_urls",
        "tool_calls",
        "annotations",
        "search_sources",
        "text_buf",
        "thinking_buf",
        "_usage",
    )

    def __init__(self) -> None:
        """Create an adapter with empty buffers."""
        # Name from the most recent ``event:`` line (see feed_event).
        self._current_event: str = ""
        # item_id → index into ``tool_calls``
        self._active_tool_index: dict[str, int] = {}
        # item_id → streamed argument chunks
        self._tool_args_buf: dict[str, list[str]] = {}
        # URLs already present in ``search_sources`` (dedupe key)
        self._seen_source_urls: set[str] = set()
        # Chat-completions style tool_call dicts, in announcement order
        self.tool_calls: list[dict[str, Any]] = []
        # URL citations: {url, title, start_index, end_index}
        self.annotations: list[dict[str, Any]] = []
        # Deduplicated sources: {url, title}
        self.search_sources: list[dict[str, Any]] = []
        # Raw text deltas in arrival order
        self.text_buf: list[str] = []
        # Raw reasoning-summary deltas in arrival order
        self.thinking_buf: list[str] = []
        # Token counts captured from ``response.completed``
        self._usage: dict[str, int] = {}

    def feed_event(self, event_name: str) -> None:
        """Record the most recent ``event:`` name from the SSE stream.

        The name is sticky: :meth:`feed_data` never clears it, so it applies
        to every following data frame until this method is called again.
        """
        self._current_event = event_name

    def _add_source(self, url: str, title: str) -> None:
        """Append ``{url, title}`` to ``search_sources`` unless empty or seen."""
        if url and url not in self._seen_source_urls:
            self._seen_source_urls.add(url)
            self.search_sources.append({"url": url, "title": title})

    def feed_data(self, data: str) -> dict[str, Any]:
        """Parse one SSE data frame; return the kind/content classification.

        Returns a dict like:
            {"kind": "text", "content": "Two"}
            {"kind": "thinking", "content": "Let me think..."}
            {"kind": "tool_call_start", "index": 0, "call_id": "...", "name": "..."}
            {"kind": "tool_call_args", "index": 0, "delta": "..."}
            {"kind": "tool_call_done", "index": 0}
            {"kind": "annotation", "annotation_data": {...}}
            {"kind": "done"}
            {"kind": "error", "message": "..."}
            {"kind": "skip"}

        Empty data and the ``[DONE]`` sentinel yield ``done``. Frames that are
        not a JSON object yield ``skip``. A branch fires when either the last
        recorded ``event:`` name or the frame's own ``type`` matches it, and
        branches are tried in the order of the code below.
        """
        if not data or data == "[DONE]":
            return {"kind": "done"}
        try:
            obj = orjson.loads(data)
        except (orjson.JSONDecodeError, ValueError, TypeError):
            return {"kind": "skip"}
        if not isinstance(obj, dict):
            return {"kind": "skip"}

        # Event-specific dispatch (event: line precedes data: line in SSE).
        frame_type = obj.get("type")
        ev = self._current_event or frame_type or ""

        def matches(*names: str) -> bool:
            return ev in names or frame_type in names

        # -- Text delta ---------------------------------------------------
        if matches("response.output_text.delta"):
            delta = obj.get("delta") or ""
            if isinstance(delta, str) and delta:
                self.text_buf.append(delta)
                return {"kind": "text", "content": delta}
            return {"kind": "skip"}

        # -- Reasoning summary delta (thinking) ---------------------------
        if matches(
            "response.reasoning_summary_text.delta",
            "response.reasoning_summary.delta",
        ):
            delta = obj.get("delta") or ""
            if isinstance(delta, str) and delta:
                self.thinking_buf.append(delta)
                return {"kind": "thinking", "content": delta}
            return {"kind": "skip"}

        # -- Tool call start (output_item.added with type=function_call) --
        if matches("response.output_item.added"):
            item = obj.get("item") or {}
            if isinstance(item, dict) and item.get("type") == "function_call":
                item_id = item.get("id") or item.get("call_id") or ""
                call_id = item.get("call_id") or item_id
                name = item.get("name") or ""
                idx = len(self.tool_calls)
                self._active_tool_index[item_id] = idx
                self._tool_args_buf[item_id] = []
                self.tool_calls.append({
                    "id": call_id,
                    "type": "function",
                    "function": {"name": name, "arguments": ""},
                })
                return {
                    "kind": "tool_call_start",
                    "index": idx,
                    "call_id": call_id,
                    "name": name,
                }
            return {"kind": "skip"}

        # -- Web search call done → collect sources -----------------------
        if matches("response.output_item.done"):
            item = obj.get("item") or {}
            if isinstance(item, dict) and item.get("type") == "web_search_call":
                action = item.get("action") or {}
                if isinstance(action, dict):
                    for src in action.get("sources") or []:
                        if isinstance(src, dict):
                            self._add_source(src.get("url") or "", src.get("title") or "")
                    if action.get("type") == "open_page":
                        self._add_source(action.get("url") or "", "")
            return {"kind": "skip"}

        # -- Tool call argument delta -------------------------------------
        if matches("response.function_call_arguments.delta"):
            item_id = obj.get("item_id") or ""
            delta = obj.get("delta") or ""
            if not isinstance(delta, str) or not delta:
                return {"kind": "skip"}
            idx = self._active_tool_index.get(item_id)
            if idx is None:
                # Arguments for a call that was never announced.
                return {"kind": "skip"}
            self._tool_args_buf.setdefault(item_id, []).append(delta)
            return {"kind": "tool_call_args", "index": idx, "delta": delta}

        # -- Tool call complete -------------------------------------------
        if matches("response.function_call_arguments.done"):
            item_id = obj.get("item_id") or ""
            idx = self._active_tool_index.get(item_id)
            if idx is None:
                return {"kind": "skip"}
            # Prefer upstream-provided final arguments string when present.
            final_args = obj.get("arguments")
            if not isinstance(final_args, str) or not final_args:
                final_args = "".join(self._tool_args_buf.get(item_id, []))
            self.tool_calls[idx]["function"]["arguments"] = final_args
            return {"kind": "tool_call_done", "index": idx}

        # -- URL citation annotation --------------------------------------
        if matches("response.output_text.annotation.added"):
            ann = obj.get("annotation") or {}
            if isinstance(ann, dict) and ann.get("type") in (None, "url_citation"):
                url = ann.get("url") or ""
                if url:
                    title = ann.get("title") or ""
                    if title == url:
                        # Multi-agent often duplicates URL into title; clean it.
                        title = ""
                    record = {
                        "url": url,
                        "title": title,
                        "start_index": int(ann.get("start_index") or 0),
                        "end_index": int(ann.get("end_index") or 0),
                    }
                    self.annotations.append(record)
                    # Fallback for multi-agent: the citation URL also counts
                    # as a search source unless a web_search_call already
                    # supplied it.
                    self._add_source(url, title)
                    return {"kind": "annotation", "annotation_data": record}
            return {"kind": "skip"}

        # -- Final completion frame → capture usage for accounting --------
        if matches("response.completed"):
            resp = obj.get("response") or obj
            # Guard against a non-object ``response`` / ``usage`` so a
            # malformed final frame still ends the stream cleanly.
            usage = resp.get("usage") if isinstance(resp, dict) else None
            if isinstance(usage, dict) and usage:
                details = usage.get("output_tokens_details")
                if not isinstance(details, dict):
                    details = {}
                self._usage = {
                    "prompt_tokens": int(usage.get("input_tokens") or 0),
                    "completion_tokens": int(usage.get("output_tokens") or 0),
                    "total_tokens": int(usage.get("total_tokens") or 0),
                    "reasoning_tokens": int(
                        details.get("reasoning_tokens")
                        or usage.get("reasoning_tokens")
                        or 0
                    ),
                }
            return {"kind": "done"}

        # -- Error frames -------------------------------------------------
        if matches("response.failed", "response.error", "error"):
            err = obj.get("error")
            if not err:
                # ``response`` may be present but null; only look inside it
                # when it really is an object.
                resp = obj.get("response")
                if isinstance(resp, dict):
                    err = resp.get("error")
            if isinstance(err, dict):
                msg = err.get("message") or err.get("code")
            else:
                msg = str(err) if err else None
            if not msg:
                # Bare ``error`` frames carry message/code at the top level.
                msg = obj.get("message") or obj.get("code") or "Console stream error"
            return {"kind": "error", "message": str(msg)}

        return {"kind": "skip"}

    def usage(self) -> dict[str, int]:
        """Return a copy of the collected usage tokens.

        Empty until a ``response.completed`` frame carrying usage was fed.
        """
        return dict(self._usage)
# Public API of this module, grouped by purpose.
__all__ = [
    # request building
    "build_console_input",
    "build_console_payload",
    "convert_openai_tools_to_console",
    "convert_openai_tool_choice",
    "inject_web_search_tool",
    # non-streaming response parsing
    "extract_console_text",
    "extract_console_reasoning",
    "extract_console_tool_calls",
    "extract_console_annotations",
    "extract_console_search_sources",
    "extract_console_usage",
    "parse_console_error",
    # SSE streaming
    "classify_console_sse_line",
    "ConsoleStreamAdapter",
]