Spaces:
Running
Running
File size: 6,784 Bytes
"""SSE streaming for local web_search / web_fetch server tool results."""
from __future__ import annotations
import uuid
from collections.abc import AsyncIterator
from datetime import UTC, datetime
from typing import Any
from api.models.anthropic import MessagesRequest
from core.anthropic.server_tool_sse import (
SERVER_TOOL_USE,
WEB_FETCH_TOOL_ERROR,
WEB_FETCH_TOOL_RESULT,
WEB_SEARCH_TOOL_RESULT,
WEB_SEARCH_TOOL_RESULT_ERROR,
)
from core.anthropic.sse import format_sse_event
from .constants import _MAX_FETCH_CHARS
from .egress import WebFetchEgressPolicy
from .parsers import extract_query, extract_url
from .request import (
forced_server_tool_name,
forced_tool_turn_text,
has_tool_named,
)
def _search_summary(query: str, results: list[dict[str, str]]) -> str:
if not results:
return f"No web search results found for: {query}"
lines = [f"Search results for: {query}"]
for index, result in enumerate(results, start=1):
lines.append(f"{index}. {result['title']}\n{result['url']}")
return "\n\n".join(lines)
async def stream_web_server_tool_response(
    request: MessagesRequest,
    input_tokens: int,
    *,
    web_fetch_egress: WebFetchEgressPolicy,
    verbose_client_errors: bool = False,
) -> AsyncIterator[str]:
    """Emit one synthetic assistant turn, as SSE events, for a forced web tool call.

    Local fallback used when `ENABLE_WEB_SERVER_TOOLS` is on: the proxy runs
    `web_search` / `web_fetch` itself and shapes the outcome like an Anthropic
    message stream. It is not the hosted citation or encrypted-content pipeline.

    Event order: `message_start`; a server-tool-use block (index 0); the tool
    result block (index 1); a text block (index 2) carrying the summary as a
    single `text_delta`; `message_delta` with `stop_reason` "end_turn" and the
    usage counters; `message_stop`.

    Nothing is yielded when `forced_server_tool_name` returns None, or when
    `has_tool_named` reports that the request does not declare that tool.

    Any `Exception` raised while running the tool is logged through `outbound`
    and reported in-stream as an error result with `error_code` "unavailable";
    it is not re-raised.

    Args:
        request: Incoming messages request; its `model` is echoed in `message_start`.
        input_tokens: Input token count echoed in both usage payloads.
        web_fetch_egress: Policy object handed to the fetch runner unchanged.
        verbose_client_errors: Forwarded as `verbose=` when building the
            client-facing error summary.

    Yields:
        The strings returned by `format_sse_event`, one per event.
    """
    # Imported at call time rather than at module import.
    # NOTE(review): presumably to avoid an import cycle or to keep `outbound`
    # patchable — confirm.
    from . import outbound

    tool_name = forced_server_tool_name(request)
    if tool_name is None:
        return
    if not has_tool_named(request, tool_name):
        return

    prompt_text = forced_tool_turn_text(request)
    message_id = f"msg_{uuid.uuid4()}"
    tool_id = f"srvtoolu_{uuid.uuid4().hex}"

    # Anything that is not `web_search` is handled as a fetch from here on.
    is_search = tool_name == "web_search"
    if is_search:
        usage_key = "web_search_requests"
        tool_input = {"query": extract_query(prompt_text)}
    else:
        usage_key = "web_fetch_requests"
        tool_input = {"url": extract_url(prompt_text)}

    # Per tool: (result block type, error payload type) used when the run fails.
    failure_shapes = {
        "web_search": (WEB_SEARCH_TOOL_RESULT, WEB_SEARCH_TOOL_RESULT_ERROR),
        "web_fetch": (WEB_FETCH_TOOL_RESULT, WEB_FETCH_TOOL_ERROR),
    }

    opening_message = {
        "id": message_id,
        "type": "message",
        "role": "assistant",
        "content": [],
        "model": request.model,
        "stop_reason": None,
        "stop_sequence": None,
        "usage": {"input_tokens": input_tokens, "output_tokens": 1},
    }
    yield format_sse_event(
        "message_start", {"type": "message_start", "message": opening_message}
    )

    # Block 0: announce the tool invocation together with its full input.
    tool_use_block = {
        "type": SERVER_TOOL_USE,
        "id": tool_id,
        "name": tool_name,
        "input": tool_input,
    }
    yield format_sse_event(
        "content_block_start",
        {"type": "content_block_start", "index": 0, "content_block": tool_use_block},
    )
    yield format_sse_event(
        "content_block_stop", {"type": "content_block_stop", "index": 0}
    )

    result_content: Any
    try:
        if is_search:
            query = str(tool_input["query"])
            hits = await outbound._run_web_search(query)
            result_content = [
                {"type": "web_search_result", "title": hit["title"], "url": hit["url"]}
                for hit in hits
            ]
            summary = _search_summary(query, hits)
            result_block_type = WEB_SEARCH_TOOL_RESULT
        else:
            page = await outbound._run_web_fetch(
                str(tool_input["url"]), web_fetch_egress
            )
            result_content = {
                "type": "web_fetch_result",
                "url": page["url"],
                "content": {
                    "type": "document",
                    "source": {
                        "type": "text",
                        "media_type": page["media_type"],
                        "data": page["data"],
                    },
                    "title": page["title"],
                    "citations": {"enabled": True},
                },
                "retrieved_at": datetime.now(UTC).isoformat(),
            }
            # The text summary is a capped prefix of the fetched body.
            summary = page["data"][:_MAX_FETCH_CHARS]
            result_block_type = WEB_FETCH_TOOL_RESULT
    except Exception as error:
        # Deliberate catch-all: a failed tool run still has to end in a
        # well-formed stream, so it is logged and turned into an error result.
        fetch_url = None
        if tool_name == "web_fetch":
            fetch_url = str(tool_input["url"])
        outbound._log_web_tool_failure(tool_name, error, fetch_url=fetch_url)
        result_block_type, error_payload_type = failure_shapes[tool_name]
        result_content = {"type": error_payload_type, "error_code": "unavailable"}
        summary = outbound._web_tool_client_error_summary(
            tool_name, error, verbose=verbose_client_errors
        )

    # Approximate output size as one token per four characters, never below 1.
    output_tokens = max(1, len(summary) // 4)

    # Block 1: the tool result (or error payload), tied back to block 0 by id.
    result_block = {
        "type": result_block_type,
        "tool_use_id": tool_id,
        "content": result_content,
    }
    yield format_sse_event(
        "content_block_start",
        {"type": "content_block_start", "index": 1, "content_block": result_block},
    )
    yield format_sse_event(
        "content_block_stop", {"type": "content_block_stop", "index": 1}
    )

    # Block 2: the model-facing summary. It opens empty and the text arrives as a
    # `text_delta`, because CLI/transcript code reads deltas rather than eager
    # `text` on `content_block_start`.
    empty_text_block = {"type": "text", "text": ""}
    yield format_sse_event(
        "content_block_start",
        {"type": "content_block_start", "index": 2, "content_block": empty_text_block},
    )
    summary_delta = {"type": "text_delta", "text": summary}
    yield format_sse_event(
        "content_block_delta",
        {"type": "content_block_delta", "index": 2, "delta": summary_delta},
    )
    yield format_sse_event(
        "content_block_stop", {"type": "content_block_stop", "index": 2}
    )

    final_usage = {
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "server_tool_use": {usage_key: 1},
    }
    yield format_sse_event(
        "message_delta",
        {
            "type": "message_delta",
            "delta": {"stop_reason": "end_turn", "stop_sequence": None},
            "usage": final_usage,
        },
    )
    yield format_sse_event("message_stop", {"type": "message_stop"})
|