# agent-ui/backend/agent.py — author: lvwerra (HF Staff)
# Commit 4e3db9c: "Refactor backend: extract async stream helper, direct tool
# registry, nudge_for_result via call_llm"
"""
Web agent backend - autonomous agent with web tools (search, read, screenshot).
Uses the same tool-calling loop pattern as code.py:
LLM call → parse tool_calls → execute → update history → repeat
"""
import json
import logging
import re
from typing import List, Dict, Optional

from .image import resize_image_for_vlm
from .tools import (
    web_search, read_url,
    execute_web_search, execute_read_url,
    execute_screenshot_url,
    extract_and_download_images,
)
# Module-level logger following the package convention.
logger = logging.getLogger(__name__)

# Tool schemas exposed to the LLM on every call.
# NOTE(review): screenshot_url is handled by execute_tool but is not registered
# here — confirm whether it should also be exposed to the model.
TOOLS = [web_search, read_url]

# Hard cap on agent loop iterations to prevent runaway tool-calling.
MAX_TURNS = 20
def execute_tool(tool_name: str, args: dict, serper_key: str) -> dict:
    """
    Execute a tool by name and return a result dict.

    Args:
        tool_name: One of "web_search", "read_url", "screenshot_url".
            Any other name yields an "Unknown tool" error result.
        args: Parsed tool-call arguments from the LLM; missing keys fall
            back to safe defaults.
        serper_key: API key forwarded to the web-search backend.

    Returns:
        dict with keys:
            - "content": str result for the LLM
            - "image": optional base64 PNG (only for screenshot_url success)
            - "display": dict with display-friendly data for the frontend
    """
    if tool_name == "web_search":
        query = args.get("query", "")
        num_results = args.get("num_results", 5)
        result_str = execute_web_search(query, serper_key, num_results)
        return {
            "content": result_str,
            "display": {"type": "search", "query": query, "results": result_str}
        }
    elif tool_name == "read_url":
        url = args.get("url", "")
        chunk = args.get("chunk", 0)
        use_html = args.get("use_html", False)
        content = execute_read_url(url, chunk=chunk, use_html=use_html)
        return {
            "content": content,
            "display": {"type": "page", "url": url, "length": len(content), "markdown": content}
        }
    elif tool_name == "screenshot_url":
        url = args.get("url", "")
        # BUG FIX: execute_screenshot_url was called here without being
        # imported from .tools, so this branch raised NameError; the import
        # block now brings it into scope.
        base64_png = execute_screenshot_url(url)
        if base64_png:
            return {
                "content": "Screenshot captured successfully. The image is attached.",
                "image": base64_png,
                "display": {"type": "screenshot", "url": url}
            }
        else:
            # Falsy return from the screenshot helper means capture failed;
            # report it to the LLM as text instead of raising.
            return {
                "content": f"Failed to take screenshot of {url}. The page may require JavaScript or be inaccessible.",
                "display": {"type": "screenshot_error", "url": url}
            }
    # Fallback for any tool name the registry does not recognize.
    return {"content": f"Unknown tool: {tool_name}", "display": {"type": "error"}}
def stream_agent_execution(
    client,
    model: str,
    messages: List[Dict],
    serper_key: str,
    extra_params: Optional[Dict] = None,
    abort_event=None,
    multimodal: bool = False
):
    """
    Run the agent tool-calling loop.

    Repeatedly calls the LLM, executes any requested tools via execute_tool,
    appends the results to ``messages`` (mutated in place), and yields
    SSE-style event dicts until the model emits a <result> tag, stops
    calling tools, or MAX_TURNS is reached. If the loop ends without a
    <result>, nudge_for_result is invoked to ask the model for one.

    Args:
        client: LLM client, passed through to call_llm.
        model: Model identifier, passed through to call_llm.
        messages: Conversation history; MUTATED IN PLACE as the loop runs.
        serper_key: API key forwarded to web-search tool execution.
        extra_params: Optional extra parameters forwarded to call_llm.
        abort_event: Optional threading.Event-like object; when set, the
            generator yields {"type": "aborted"} and returns immediately.
        multimodal: When True, screenshots and extracted page images are
            attached to tool responses as image_url parts so a VLM can
            see them.

    Yields dicts with SSE event types:
    - thinking: { content }
    - content: { content }
    - tool_start: { tool, args }
    - tool_result: { tool, result, image? }
    - result_preview: { content }
    - result: { content }
    - generating: {}
    - retry: { attempt, max_attempts, delay, message }
    - error: { content }
    - done: {}
    """
    # Imported lazily; presumably avoids a circular import with .agents.
    from .agents import call_llm
    turns = 0
    done = False
    has_result = False       # set once a <result> block has been emitted
    debug_call_number = 0    # running LLM call counter threaded through call_llm for debug events
    while not done and turns < MAX_TURNS:
        # Check abort before each turn
        if abort_event and abort_event.is_set():
            yield {"type": "aborted"}
            return
        turns += 1
        # LLM call with retries and debug events
        response = None
        for event in call_llm(client, model, messages, tools=TOOLS, extra_params=extra_params, abort_event=abort_event, call_number=debug_call_number):
            # call_llm interleaves UI events with one sentinel dict that
            # carries the actual response object under "_response".
            if "_response" in event:
                response = event["_response"]
                debug_call_number = event["_call_number"]
            else:
                yield event
                if event.get("type") in ("error", "aborted"):
                    return
        if response is None:
            # call_llm finished without ever yielding a response sentinel
            # (e.g. retries exhausted); its own error event was forwarded above.
            return
        # --- Parse response ---
        assistant_message = response.choices[0].message
        content = assistant_message.content or ""
        tool_calls = assistant_message.tool_calls or []
        # Check for <result> tags — the model's final-answer marker.
        result_match = re.search(r'<result>(.*?)</result>', content, re.DOTALL | re.IGNORECASE)
        result_content = None
        thinking_content = content
        if result_match:
            result_content = result_match.group(1).strip()
            # Strip the <result> block so the remainder can be surfaced as thinking.
            thinking_content = re.sub(r'<result>.*?</result>', '', content, flags=re.DOTALL | re.IGNORECASE).strip()
        # Send thinking/content: text accompanying tool calls is "thinking",
        # text with no tool calls is final assistant "content".
        if thinking_content.strip():
            if tool_calls:
                yield {"type": "thinking", "content": thinking_content}
            else:
                yield {"type": "content", "content": thinking_content}
        # Send result preview
        if result_content:
            yield {"type": "result_preview", "content": result_content}
        # --- Handle tool calls ---
        if tool_calls:
            for tool_call in tool_calls:
                # Check abort between tool calls
                if abort_event and abort_event.is_set():
                    yield {"type": "aborted"}
                    return
                func_name = tool_call.function.name
                # Parse arguments
                try:
                    args = json.loads(tool_call.function.arguments)
                except json.JSONDecodeError as e:
                    # Feed the parse error back to the model as the tool
                    # result so it can retry with valid JSON next turn.
                    output = f"Error parsing arguments: {e}"
                    messages.append({
                        "role": "assistant",
                        "content": content,
                        "tool_calls": [{"id": tool_call.id, "type": "function", "function": {"name": func_name, "arguments": tool_call.function.arguments}}]
                    })
                    messages.append({"role": "tool", "tool_call_id": tool_call.id, "content": output})
                    yield {"type": "error", "content": output}
                    continue
                # Signal tool start (include IDs for history reconstruction)
                yield {
                    "type": "tool_start",
                    "tool": func_name,
                    "args": args,
                    "tool_call_id": tool_call.id,
                    "arguments": tool_call.function.arguments,
                    "thinking": content,
                }
                # Execute tool
                result = execute_tool(func_name, args, serper_key)
                # Build tool response content for LLM
                if result.get("image") and multimodal:
                    # Send screenshot as multimodal content so VLM can see it
                    vlm_image = resize_image_for_vlm(result["image"])
                    tool_response_content = [
                        {"type": "text", "text": result["content"]},
                        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{vlm_image}"}}
                    ]
                elif func_name == "read_url" and multimodal:
                    # Extract and include page images so VLM can see them
                    page_images = extract_and_download_images(result["content"])
                    if page_images:
                        tool_response_content = [{"type": "text", "text": result["content"]}]
                        for img_b64 in page_images:
                            vlm_img = resize_image_for_vlm(img_b64)
                            tool_response_content.append({
                                "type": "image_url",
                                "image_url": {"url": f"data:image/jpeg;base64,{vlm_img}"}
                            })
                    else:
                        # Page had no extractable images; plain text response.
                        tool_response_content = result["content"]
                else:
                    tool_response_content = result["content"]
                # Add to message history.
                # NOTE(review): the assistant message (carrying the same
                # `content`) is appended once PER tool call, each with a single
                # tool_call entry, rather than once per turn with all
                # tool_calls attached. OpenAI-style histories normally group
                # tool_calls under one assistant message — confirm the target
                # API accepts this shape when the model issues multiple calls.
                messages.append({
                    "role": "assistant",
                    "content": content,
                    "tool_calls": [{"id": tool_call.id, "type": "function", "function": {"name": func_name, "arguments": tool_call.function.arguments}}]
                })
                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": tool_response_content
                })
                # Signal tool result to frontend (include response for history)
                tool_result_event = {
                    "type": "tool_result",
                    "tool": func_name,
                    "tool_call_id": tool_call.id,
                    "result": result.get("display", {}),
                    "response": result.get("content", ""),
                }
                if result.get("image"):
                    tool_result_event["image"] = result["image"]
                yield tool_result_event
        else:
            # No tool calls — we're done
            messages.append({"role": "assistant", "content": content})
            done = True
        # Send result if found
        if result_content:
            has_result = True
            yield {"type": "result", "content": result_content}
        # Signal between-turn processing
        if not done:
            yield {"type": "generating"}
    # If agent finished without a <result>, nudge it for one
    if not has_result:
        from .agents import nudge_for_result
        yield from nudge_for_result(client, model, messages, extra_params=extra_params, call_number=debug_call_number)
    yield {"type": "done"}