borderless / ui /agent /respond.py
spagestic's picture
switched to qwen
71a7158
Raw
History Blame Contribute Delete
8.41 kB
# ui/agent/respond.py
from __future__ import annotations
import json
import os
import time
import uuid
from typing import Any
import gradio as gr
from gradio import ChatMessage
from huggingface_hub import InferenceClient
from huggingface_hub.errors import HfHubHTTPError
from .completion import complete_turn
from .config import MAX_TOOL_ROUNDS, MODEL_ID, THINK_TOOLS, TOOLS, hub_token_available
from .messages import history_to_api_messages, multimodal_input_to_api_content
from .streaming import yield_response
from .tools import emit_thinking_message, execute_tool_calls, should_emit_reasoning
from .synthesis import build_structured_final_answer, findings_from_ui_messages
# User-facing notice used when no Hugging Face credentials are available at all.
AUTH_REQUIRED_MESSAGE = (
    "Please sign in with your Hugging Face account "
    "to use Borderless. "
    "Click **Log in with Hugging Face** at the top of the page, "
    "then try again."
)

# User-facing notice used when credentials exist but are no longer accepted.
SESSION_EXPIRED_MESSAGE = (
    "Your Hugging Face session has expired. "
    "Click **Log in with Hugging Face** at the top of the page "
    "to sign in again, then retry your message."
)
def _auth_error_message(exc: Exception) -> str | None:
    """Map an exception to the session-expired notice when it looks like an auth failure.

    Two checks are applied in order:

    1. A ``HfHubHTTPError`` whose attached response has status code 401.
    2. A text heuristic on ``str(exc)`` (case-insensitive) for errors that do
       not carry a usable response object.

    Args:
        exc: The exception raised while talking to the inference backend.

    Returns:
        ``SESSION_EXPIRED_MESSAGE`` when the error is classified as an
        authentication failure, otherwise ``None`` so the caller can fall back
        to a generic error message.
    """
    import re  # function-scope import: the module does not import ``re`` at top level

    if (
        isinstance(exc, HfHubHTTPError)
        and exc.response is not None
        and exc.response.status_code == 401
    ):
        return SESSION_EXPIRED_MESSAGE

    text = str(exc).lower()
    if (
        # Match 401 only as a standalone token. A bare substring test also
        # fires on unrelated text that merely contains the digits (for example
        # a hex request id like "...a401f..." or a number such as "14012"),
        # which would wrongly tell the user that their session expired.
        re.search(r"\b401\b", text)
        or "unauthorized" in text
        or "oauth token has expired" in text
    ):
        return SESSION_EXPIRED_MESSAGE
    return None
def respond(
    message: str | dict[str, Any],
    history: list[dict[str, Any]],
    system_message,
    max_tokens,
    temperature,
    top_p,
    globe_state: dict[str, Any],
    hf_token: gr.OAuthToken | None,
):
    """Run one chat turn as a generator: plan, call tools, then stream an answer.

    Flow:

    1. Reject the request early when no credentials are available, when the
       OAuth token has expired, or when the user input converts to empty
       content.
    2. Planning turn: call the model with ``THINK_TOOLS`` and
       ``tool_choice="required"``. If no tool call comes back, a synthetic
       ``think`` call is built from the returned content/reasoning (or a
       default thought) so the tool pipeline always runs once.
    3. Up to ``MAX_TOOL_ROUNDS`` rounds with ``TOOLS`` and
       ``tool_choice="auto"``. A round with tool calls executes them and moves
       to the next round; a round with text content ends the turn. An empty
       round is retried with ``tool_choice="required"`` and then with tools
       disabled; if that is still empty a generic failure message is sent.
    4. If every round ended in tool calls, request one tool-free synthesis
       turn. If that fails or is empty, send a structured fallback built from
       the findings collected in the UI messages.

    Any exception raised during steps 2-3 is turned into a user-facing message
    (the session-expired notice for auth failures, otherwise a generic error
    that includes the exception text).

    Args:
        message: Raw user input (plain text or a multimodal dict); converted
            with ``multimodal_input_to_api_content``.
        history: Prior chat messages; converted with
            ``history_to_api_messages``.
        system_message: Content of the system message sent to the model.
        max_tokens: Forwarded to ``complete_turn``.
        temperature: Forwarded to ``complete_turn``.
        top_p: Forwarded to ``complete_turn``.
        globe_state: UI state passed to the thinking/tool/response streams and
            replaced by whatever state those streams yield.
        hf_token: OAuth token of the signed-in user, or ``None``; in that case
            the ``HF_TOKEN`` environment variable is used as the API key.

    Yields:
        ``(ui_msg, globe_state)`` pairs from the thinking and tool streams, and
        the items produced by ``yield_response`` (presumably the same shape;
        confirm in ``.streaming``).
    """
    # --- Credential guards -------------------------------------------------
    if hf_token is None and not hub_token_available():
        yield from yield_response([], AUTH_REQUIRED_MESSAGE, globe_state)
        return
    if hf_token is not None and hf_token.expires_at <= time.time():
        yield from yield_response([], SESSION_EXPIRED_MESSAGE, globe_state)
        return

    api_token = hf_token.token if hf_token is not None else os.environ.get("HF_TOKEN", "")
    client = InferenceClient(api_key=api_token, model=MODEL_ID)

    # --- Input validation --------------------------------------------------
    user_content = multimodal_input_to_api_content(message)
    if not user_content:
        yield from yield_response(
            [],
            "Please enter a message or attach a file.",
            globe_state,
        )
        return

    api_messages: list[dict[str, Any]] = [
        {"role": "system", "content": system_message},
        *history_to_api_messages(history),
        {"role": "user", "content": user_content},
    ]
    ui_messages: list[ChatMessage] = []

    # NOTE: the ``for ui_msg, globe_state in ...`` loops below are deliberately
    # kept inline. The loop target rebinds ``globe_state`` on every item, so if
    # a stream raises part-way through, the ``except`` handler still reports
    # the most recently yielded state.
    try:
        # --- Planning turn: the model must call a thinking tool -------------
        content, reasoning, tool_calls = complete_turn(
            client,
            api_messages,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            tools=THINK_TOOLS,
            tool_choice="required",
        )
        if not tool_calls:
            # The provider ignored the "required" tool choice: synthesize a
            # ``think`` call so the tool pipeline still runs once.
            thought = content or reasoning or "Plan the response before taking action."
            tool_calls = [
                {
                    "id": f"call_{uuid.uuid4().hex}",
                    "type": "function",
                    "function": {
                        "name": "think",
                        "arguments": json.dumps({"thought": thought}),
                    },
                }
            ]
            content = ""
        if should_emit_reasoning(reasoning, tool_calls):
            for ui_msg, globe_state in emit_thinking_message(
                ui_messages, reasoning, globe_state
            ):
                yield ui_msg, globe_state
        for ui_msg, globe_state in execute_tool_calls(
            api_messages,
            ui_messages,
            tool_calls,
            content,
            globe_state,
        ):
            yield ui_msg, globe_state

        # --- Tool rounds ----------------------------------------------------
        for _ in range(MAX_TOOL_ROUNDS):
            content, reasoning, tool_calls = complete_turn(
                client,
                api_messages,
                max_tokens=max_tokens,
                temperature=temperature,
                top_p=top_p,
                tools=TOOLS,
                tool_choice="auto",
            )
            if should_emit_reasoning(reasoning, tool_calls):
                for ui_msg, globe_state in emit_thinking_message(
                    ui_messages, reasoning, globe_state
                ):
                    yield ui_msg, globe_state
            if tool_calls:
                for ui_msg, globe_state in execute_tool_calls(
                    api_messages,
                    ui_messages,
                    tool_calls,
                    content,
                    globe_state,
                ):
                    yield ui_msg, globe_state
                continue
            if content:
                yield from yield_response(ui_messages, content, globe_state)
                return

            # Retry once with a required tool call when the model returned nothing.
            content, reasoning, tool_calls = complete_turn(
                client,
                api_messages,
                max_tokens=max_tokens,
                temperature=temperature,
                top_p=top_p,
                tools=TOOLS,
                tool_choice="required",
            )
            if should_emit_reasoning(reasoning, tool_calls):
                for ui_msg, globe_state in emit_thinking_message(
                    ui_messages, reasoning, globe_state
                ):
                    yield ui_msg, globe_state
            if tool_calls:
                for ui_msg, globe_state in execute_tool_calls(
                    api_messages,
                    ui_messages,
                    tool_calls,
                    content,
                    globe_state,
                ):
                    yield ui_msg, globe_state
                continue
            if content:
                yield from yield_response(ui_messages, content, globe_state)
                return

            # Some providers return an empty first turn when tools are enabled.
            content, _, _ = complete_turn(
                client,
                api_messages,
                max_tokens=max_tokens,
                temperature=temperature,
                top_p=top_p,
                tools=None,
            )
            if content:
                yield from yield_response(ui_messages, content, globe_state)
                return

            yield from yield_response(
                ui_messages,
                "I could not generate a response. Please try again.",
                globe_state,
            )
            return
    except Exception as exc:
        # Top-level boundary for the model/tool phase: always answer the user.
        # A separate name is used so the ``message`` parameter is not shadowed.
        error_text = _auth_error_message(exc) or (
            f"Sorry, something went wrong while generating a response: {exc}"
        )
        yield from yield_response(ui_messages, error_text, globe_state)
        return

    # --- Round budget exhausted: ask for a tool-free synthesis ---------------
    try:
        content, _, _ = complete_turn(
            client,
            api_messages
            + [
                {
                    "role": "user",
                    "content": (
                        "Synthesize the research gathered so far into the best "
                        "possible partial answer. Clearly label any items that "
                        "still need verification on official sources."
                    ),
                }
            ],
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            tools=None,
        )
        if content:
            yield from yield_response(ui_messages, content, globe_state)
            return
    except Exception:
        # Best effort: the structured fallback below still answers the user,
        # but the failure is recorded instead of being discarded silently.
        import logging  # function-scope import: not imported at module level

        logging.getLogger(__name__).exception(
            "Final synthesis turn failed; using the structured fallback answer."
        )

    # --- Last resort: structured summary of what the tools found -------------
    findings = findings_from_ui_messages(ui_messages)
    fallback_answer = build_structured_final_answer(
        profile_summary="",
        findings=findings,
        preamble=(
            "I could not complete model synthesis, but here is the structured "
            "research gathered so far."
        ),
    )
    yield from yield_response(ui_messages, fallback_answer, globe_state)