peijun1's picture
Deploy AI Studio Proxy API to Hugging Face Spaces
a5784e9
Raw
History Blame Contribute Delete
32.6 kB
import base64
import json
import logging
import os
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple, Union, cast
from urllib.parse import unquote, urlparse
from api_utils.utils_ext.files import extract_data_url_to_local, save_blob_to_local
from api_utils.utils_ext.function_calling_orchestrator import should_skip_tool_injection
from logging_utils import set_request_id
from models import Message
if TYPE_CHECKING:
from api_utils.utils_ext.function_calling_orchestrator import FunctionCallingState
def prepare_combined_prompt(
    messages: List[Message],
    req_id: str,
    tools: Optional[List[Dict[str, Any]]] = None,
    tool_choice: Optional[Union[str, Dict[str, Any]]] = None,
    fc_state: Optional["FunctionCallingState"] = None,
) -> Tuple[str, List[str]]:
    """Flatten a chat history into one prompt string plus local attachment paths.

    The prompt is assembled, in order, from:

    1. an optional "Available Tools Catalog" block (only when ``tools`` is a
       non-empty list and ``should_skip_tool_injection`` returns a falsy value),
    2. the first system message, if its content is a non-blank string, and
    3. every remaining non-system message, rendered as a role header followed
       by its text, assistant tool-call visualizations and/or tool results.
       Turns are separated by a ``---`` line.

    While walking message content, attachment references (``data:`` URLs,
    ``file:`` URLs, existing absolute paths, and raw base64 audio/video data)
    are resolved to local file paths and collected. Any other URL is ignored.

    Args:
        messages: Conversation messages. Each is read for ``role``, ``content``
            and ``tool_calls``; ``tool_call_id`` is read via ``getattr``.
        req_id: Request identifier. Passed to ``set_request_id``, used in log
            messages, and forwarded to the helpers that save attachments.
        tools: Optional tool definitions. Each entry may carry a nested
            ``"function"`` dict or flat ``"name"`` / ``"parameters"`` keys.
        tool_choice: Optional tool choice. A dict (nested or flat form) or a
            string; the strings ``auto``, ``none``, ``no``, ``off``,
            ``required`` and ``any`` (case-insensitive) do not name a function.
        fc_state: Forwarded unchanged to ``should_skip_tool_injection``.

    Returns:
        ``(final_prompt, files_list)``. ``final_prompt`` ends with a newline
        when non-empty; ``files_list`` holds the collected local file paths in
        the order they were encountered.
    """
    logger = logging.getLogger("AIStudioProxyServer")
    set_request_id(req_id)

    # Summary stats for the consolidated log line at the end.
    _has_system_prompt = False
    _msg_count = len(messages)

    # Do not clear upload_files here; it is cleared by the upper layer at the
    # start of each request as needed, to avoid "file not found" errors caused
    # by loss of historical attachments.
    combined_parts: List[str] = []
    system_prompt_content: Optional[str] = None
    processed_system_message_indices: Set[int] = set()
    # Local file paths to be uploaded (images, videos, PDFs, etc.).
    files_list: List[str] = []

    # If tools are declared, inject a tool catalog before the prompt so the
    # model knows which functions exist. Injection is skipped when
    # should_skip_tool_injection says so (fc_state is passed through to it).
    if isinstance(tools, list) and tools:
        if should_skip_tool_injection(tools, fc_state=fc_state):
            logger.debug(
                f"[{req_id}] Skipping tool catalog injection - native mode active and configured"
            )
        else:
            try:
                tool_lines: List[str] = ["Available Tools Catalog:"]
                for t in tools:
                    name: Optional[str] = None
                    params_schema: Optional[Dict[str, Any]] = None
                    # Accept both {"function": {...}} and a flat tool dict.
                    fn_val: Any = t.get("function") if "function" in t else t
                    if isinstance(fn_val, dict):
                        typed_fn: Dict[str, Any] = cast(Dict[str, Any], fn_val)
                        name_raw: Any = typed_fn.get("name") or t.get("name")
                        if isinstance(name_raw, str):
                            name = name_raw
                        params_raw: Any = typed_fn.get("parameters")
                        if isinstance(params_raw, dict):
                            params_schema = cast(Dict[str, Any], params_raw)
                    else:
                        # "function" is present but not a dict: fall back to
                        # the name on the tool entry itself.
                        name_raw = t.get("name")
                        if isinstance(name_raw, str):
                            name = name_raw
                    if name:
                        tool_lines.append(f"- Function: {name}")
                        if params_schema:
                            try:
                                tool_lines.append(
                                    f"  Parameter Schema: {json.dumps(params_schema, ensure_ascii=False)}"
                                )
                            except Exception as e:
                                # Best effort: list the function without its schema.
                                logger.debug(
                                    f"[{req_id}] Could not serialize parameter schema for tool '{name}': {e}"
                                )
                if tool_choice:
                    # Explicitly requested or suggested function name.
                    chosen_name: Optional[str] = None
                    if isinstance(tool_choice, dict):
                        typed_tool_choice: Dict[str, Any] = tool_choice
                        choice_fn: Any = typed_tool_choice.get("function")
                        if isinstance(choice_fn, dict):
                            # Standard format: {"type": "function", "function": {"name": "..."}}
                            choice_name: Any = cast(Dict[str, Any], choice_fn).get("name")
                            if isinstance(choice_name, str):
                                chosen_name = choice_name
                        elif "name" in typed_tool_choice:
                            # Flat format: {"type": "function", "name": "..."}
                            choice_name = typed_tool_choice.get("name")
                            if isinstance(choice_name, str):
                                chosen_name = choice_name
                    # The isinstance guard keeps an unexpected tool_choice type
                    # from raising and discarding the whole catalog.
                    elif isinstance(tool_choice, str) and tool_choice.lower() not in (
                        "auto",
                        "none",
                        "no",
                        "off",
                        "required",
                        "any",
                    ):
                        chosen_name = tool_choice
                    if chosen_name:
                        tool_lines.append(f"Recommended function to use: {chosen_name}")
                combined_parts.append("\n".join(tool_lines) + "\n---\n")
            except Exception as e:
                # Catalog injection is best effort, but the failure is logged
                # so a missing catalog can be diagnosed.
                logger.warning(
                    f"[{req_id}] Failed to build tool catalog; continuing without it: {e}"
                )

    # Process the first system message only; later ones are skipped below.
    for i, msg in enumerate(messages):
        if msg.role == "system":
            content = msg.content
            if isinstance(content, str) and content.strip():
                system_prompt_content = content.strip()
                processed_system_message_indices.add(i)
                _has_system_prompt = True
                logger.debug(
                    f"Found system prompt at index {i}: {system_prompt_content[:80]}..."
                )
                system_instr_prefix = "System Instructions:\n"
                combined_parts.append(f"{system_instr_prefix}{system_prompt_content}")
            else:
                logger.debug(f"Ignoring empty system message at index {i}")
                processed_system_message_indices.add(i)
            break

    role_map_ui = {
        "user": "User",
        "assistant": "Assistant",
        "system": "System",
        "tool": "Tool",
    }
    turn_separator = "\n---\n"

    # Process the remaining (non-system) messages.
    for i, msg in enumerate(messages):
        if i in processed_system_message_indices:
            continue
        if msg.role == "system":
            logger.debug(f"Skipping subsequent system message at index {i}")
            continue

        # NOTE(review): the separator is appended before the turn is known to
        # be non-empty, so a skipped empty message leaves an extra separator
        # in the prompt. Kept as-is to avoid changing the prompt format.
        if combined_parts:
            combined_parts.append(turn_separator)

        role = msg.role or "unknown"
        role_prefix_ui = f"{role_map_ui.get(role, role.capitalize())}:\n"
        current_turn_parts: List[str] = [role_prefix_ui]
        content = msg.content or ""
        content_str: str = ""

        if isinstance(content, str):
            content_str = content.strip()
        elif isinstance(content, list):
            # Multimodal content: collect text, resolve attachments.
            text_parts: List[str] = []
            for item in content:
                # Determine the item type from an attribute or a dict key.
                item_type: Optional[str] = None
                try:
                    # Guard against property exceptions when using hasattr/getattr.
                    if hasattr(item, "type"):
                        item_type = item.type
                except Exception:
                    item_type = None
                if item_type is None and isinstance(item, dict):
                    typed_item: Dict[str, Any] = cast(Dict[str, Any], item)
                    item_type_raw: Any = typed_item.get("type")
                    if isinstance(item_type_raw, str):
                        item_type = item_type_raw

                if item_type == "text":
                    if hasattr(item, "text"):
                        text_parts.append(getattr(item, "text", "") or "")
                    elif isinstance(item, dict):
                        typed_item = cast(Dict[str, Any], item)
                        text_raw: Any = typed_item.get("text", "")
                        text_parts.append(str(text_raw))
                    continue

                # Image / file / media URL item.
                if item_type in (
                    "image_url",
                    "file_url",
                    "media_url",
                    "input_image",
                ) or (
                    isinstance(item, dict)
                    and (
                        "image_url" in item
                        or "input_image" in item
                        or "file_url" in item
                        or "media_url" in item
                        or "url" in item
                    )
                ):
                    try:
                        url_value: Optional[str] = None
                        # Object attributes first.
                        if hasattr(item, "image_url") and item.image_url:
                            url_value = item.image_url.url
                            try:
                                detail_val: Optional[str] = getattr(
                                    item.image_url, "detail", None
                                )
                                if detail_val:
                                    text_parts.append(
                                        f"[Image Details: detail={detail_val}]"
                                    )
                            except Exception:
                                # "detail" is optional; ignore access errors.
                                pass
                        elif hasattr(item, "input_image") and item.input_image:
                            url_value = item.input_image.url
                            try:
                                detail_val = getattr(item.input_image, "detail", None)
                                if detail_val:
                                    text_parts.append(
                                        f"[Image Details: detail={detail_val}]"
                                    )
                            except Exception:
                                # "detail" is optional; ignore access errors.
                                pass
                        elif hasattr(item, "file_url") and item.file_url:
                            url_value = item.file_url.url
                        elif hasattr(item, "media_url") and item.media_url:
                            url_value = item.media_url.url
                        elif hasattr(item, "url") and item.url:
                            url_value = item.url

                        # Dictionary structure (backwards compatibility).
                        if url_value is None and isinstance(item, dict):
                            typed_item = cast(Dict[str, Any], item)
                            image_url_raw: Any = typed_item.get("image_url")
                            input_image_raw: Any = typed_item.get("input_image")
                            if isinstance(image_url_raw, dict):
                                typed_img_url: Dict[str, Any] = cast(
                                    Dict[str, Any], image_url_raw
                                )
                                url_raw: Any = typed_img_url.get("url")
                                if isinstance(url_raw, str):
                                    url_value = url_raw
                                detail_raw: Any = typed_img_url.get("detail")
                                if isinstance(detail_raw, str):
                                    text_parts.append(
                                        f"[Image Details: detail={detail_raw}]"
                                    )
                            elif isinstance(image_url_raw, str):
                                url_value = image_url_raw
                            elif isinstance(input_image_raw, dict):
                                typed_input_img: Dict[str, Any] = cast(
                                    Dict[str, Any], input_image_raw
                                )
                                url_raw = typed_input_img.get("url")
                                if isinstance(url_raw, str):
                                    url_value = url_raw
                                detail_raw = typed_input_img.get("detail")
                                if isinstance(detail_raw, str):
                                    text_parts.append(
                                        f"[Image Details: detail={detail_raw}]"
                                    )
                            elif isinstance(input_image_raw, str):
                                url_value = input_image_raw
                            else:
                                # Check the other URL-bearing fields.
                                file_url_raw: Any = typed_item.get("file_url")
                                media_url_raw: Any = typed_item.get("media_url")
                                file_raw: Any = typed_item.get("file")
                                if isinstance(file_url_raw, dict):
                                    url_raw = cast(Dict[str, Any], file_url_raw).get("url")
                                    if isinstance(url_raw, str):
                                        url_value = url_raw
                                elif isinstance(file_url_raw, str):
                                    url_value = file_url_raw
                                elif isinstance(media_url_raw, dict):
                                    url_raw = cast(Dict[str, Any], media_url_raw).get("url")
                                    if isinstance(url_raw, str):
                                        url_value = url_raw
                                elif isinstance(media_url_raw, str):
                                    url_value = media_url_raw
                                elif "url" in typed_item:
                                    url_raw = typed_item.get("url")
                                    if isinstance(url_raw, str):
                                        url_value = url_raw
                                elif isinstance(file_raw, dict):
                                    # Compatible with a general "file" field.
                                    typed_file: Dict[str, Any] = cast(
                                        Dict[str, Any], file_raw
                                    )
                                    url_raw = typed_file.get("url") or typed_file.get("path")
                                    if isinstance(url_raw, str):
                                        url_value = url_raw

                        url_value = (url_value or "").strip()
                        if not url_value:
                            continue

                        # Normalize to a local file path and record it.
                        if url_value.startswith("data:"):
                            file_path = extract_data_url_to_local(
                                url_value, req_id=req_id
                            )
                            if file_path:
                                files_list.append(file_path)
                                logger.debug(
                                    f"(Prepare Prompt) Identified and added data:URL attachment: {file_path}"
                                )
                        elif url_value.startswith("file:"):
                            parsed = urlparse(url_value)
                            local_path = unquote(parsed.path)
                            if os.path.exists(local_path):
                                files_list.append(local_path)
                                logger.debug(
                                    f"(Prepare Prompt) Identified and added local attachment (file://): {local_path}"
                                )
                            else:
                                logger.warning(
                                    f"(Prepare Prompt) Local file pointed to by file URL does not exist: {local_path}"
                                )
                        elif os.path.isabs(url_value) and os.path.exists(url_value):
                            files_list.append(url_value)
                            logger.debug(
                                f"(Prepare Prompt) Identified and added local attachment (absolute path): {url_value}"
                            )
                        else:
                            logger.debug(
                                f"(Prepare Prompt) Ignoring non-local attachment URL: {url_value}"
                            )
                    except Exception as e:
                        logger.warning(
                            f"(Prepare Prompt) Error processing attachment URL: {e}"
                        )
                    continue

                # Audio / video input.
                if item_type in ("input_audio", "input_video"):
                    try:
                        inp: Any = None
                        if hasattr(item, "input_audio") and item.input_audio:
                            inp = item.input_audio
                        elif hasattr(item, "input_video") and item.input_video:
                            inp = item.input_video
                        elif isinstance(item, dict):
                            typed_item = cast(Dict[str, Any], item)
                            inp = typed_item.get("input_audio") or typed_item.get(
                                "input_video"
                            )
                        if inp:
                            url_value = None
                            data_val: Optional[str] = None
                            mime_val: Optional[str] = None
                            fmt_val: Optional[str] = None
                            if isinstance(inp, dict):
                                typed_inp: Dict[str, Any] = cast(Dict[str, Any], inp)
                                url_raw = typed_inp.get("url")
                                if isinstance(url_raw, str):
                                    url_value = url_raw
                                data_raw: Any = typed_inp.get("data")
                                if isinstance(data_raw, str):
                                    data_val = data_raw
                                mime_raw: Any = typed_inp.get("mime_type")
                                if isinstance(mime_raw, str):
                                    mime_val = mime_raw
                                fmt_raw: Any = typed_inp.get("format")
                                if isinstance(fmt_raw, str):
                                    fmt_val = fmt_raw
                            else:
                                # Object with attributes.
                                url_attr: Any = getattr(inp, "url", None)
                                if isinstance(url_attr, str):
                                    url_value = url_attr
                                data_attr: Any = getattr(inp, "data", None)
                                if isinstance(data_attr, str):
                                    data_val = data_attr
                                mime_attr: Any = getattr(inp, "mime_type", None)
                                if isinstance(mime_attr, str):
                                    mime_val = mime_attr
                                fmt_attr: Any = getattr(inp, "format", None)
                                if isinstance(fmt_attr, str):
                                    fmt_val = fmt_attr

                            if url_value:
                                if url_value.startswith("data:"):
                                    saved = extract_data_url_to_local(
                                        url_value, req_id=req_id
                                    )
                                    if saved:
                                        files_list.append(saved)
                                        logger.debug(
                                            f"(Prepare Prompt) Identified and added audio/video data:URL attachment: {saved}"
                                        )
                                elif url_value.startswith("file:"):
                                    parsed = urlparse(url_value)
                                    local_path = unquote(parsed.path)
                                    if os.path.exists(local_path):
                                        files_list.append(local_path)
                                        logger.debug(
                                            f"(Prepare Prompt) Identified and added local audio/video attachment (file://): {local_path}"
                                        )
                                elif os.path.isabs(url_value) and os.path.exists(
                                    url_value
                                ):
                                    files_list.append(url_value)
                                    logger.debug(
                                        f"(Prepare Prompt) Identified and added local audio/video attachment (absolute path): {url_value}"
                                    )
                            elif data_val:
                                if data_val.startswith("data:"):
                                    saved = extract_data_url_to_local(
                                        data_val, req_id=req_id
                                    )
                                    if saved:
                                        files_list.append(saved)
                                        logger.debug(
                                            f"(Prepare Prompt) Identified and added audio/video data:URL attachment: {saved}"
                                        )
                                else:
                                    # Treat as pure base64 data.
                                    try:
                                        raw = base64.b64decode(data_val)
                                        saved = save_blob_to_local(
                                            raw, mime_val, fmt_val, req_id=req_id
                                        )
                                        if saved:
                                            files_list.append(saved)
                                            logger.debug(
                                                f"(Prepare Prompt) Identified and added audio/video base64 attachment: {saved}"
                                            )
                                    except Exception as e:
                                        # Best effort: drop this attachment but
                                        # record why instead of failing silently.
                                        logger.warning(
                                            f"(Prepare Prompt) Failed to decode/save audio/video base64 attachment: {e}"
                                        )
                    except Exception as e:
                        logger.warning(
                            f"(Prepare Prompt) Error processing audio/video input: {e}"
                        )
                    continue

                # Any other item type is logged and ignored.
                logger.warning(
                    f"(Prepare Prompt) Warning: Ignoring non-text or unknown type content item in message at index {i}"
                )
            content_str = "\n".join(text_parts).strip()
        elif isinstance(content, dict):
            # Dictionary content; may contain 'attachments'/'images'/'media'/'files'.
            typed_content: Dict[str, Any] = cast(Dict[str, Any], content)
            text_parts = []
            attachments_keys = ["attachments", "images", "media", "files"]
            for key in attachments_keys:
                items: Any = typed_content.get(key)
                if isinstance(items, list):
                    for it in items:
                        url_value = None
                        if isinstance(it, str):
                            url_value = it
                        elif isinstance(it, dict):
                            typed_it: Dict[str, Any] = cast(Dict[str, Any], it)
                            url_raw = typed_it.get("url") or typed_it.get("path")
                            if isinstance(url_raw, str):
                                url_value = url_raw
                            if not url_value:
                                image_url_raw = typed_it.get("image_url")
                                input_image_raw = typed_it.get("input_image")
                                if isinstance(image_url_raw, dict):
                                    url_from_image: Any = cast(
                                        Dict[str, Any], image_url_raw
                                    ).get("url")
                                    if isinstance(url_from_image, str):
                                        url_value = url_from_image
                                elif isinstance(input_image_raw, dict):
                                    url_from_input: Any = cast(
                                        Dict[str, Any], input_image_raw
                                    ).get("url")
                                    if isinstance(url_from_input, str):
                                        url_value = url_from_input
                        if not url_value:
                            continue
                        url_value = url_value.strip()
                        if not url_value:
                            continue
                        if url_value.startswith("data:"):
                            # Pass req_id like every other call site so the
                            # saved file is associated with this request.
                            fp = extract_data_url_to_local(url_value, req_id=req_id)
                            if fp:
                                files_list.append(fp)
                                logger.debug(
                                    f"(Prepare Prompt) Identified and added dict attachment data:URL: {fp}"
                                )
                        elif url_value.startswith("file:"):
                            parsed = urlparse(url_value)
                            lp = unquote(parsed.path)
                            if os.path.exists(lp):
                                files_list.append(lp)
                                logger.debug(
                                    f"(Prepare Prompt) Identified and added dict attachment file://: {lp}"
                                )
                        elif os.path.isabs(url_value) and os.path.exists(url_value):
                            files_list.append(url_value)
                            logger.debug(
                                f"(Prepare Prompt) Identified and added dict attachment absolute path: {url_value}"
                            )
                        else:
                            logger.debug(
                                f"(Prepare Prompt) Ignoring non-local URL for dict attachment: {url_value}"
                            )
            # Also pick up a plain-text description stored in the dictionary.
            text_field: Any = typed_content.get("text")
            if isinstance(text_field, str):
                text_parts.append(text_field)
            content_str = "\n".join(text_parts).strip()
        else:
            logger.warning(
                f"(Prepare Prompt) Warning: Unexpected content type for role {role} at index {i} ({type(content)}) or is None."
            )
            content_str = str(content or "").strip()

        if content_str:
            current_turn_parts.append(content_str)

        # Tool calls are only visualized here, never executed, to avoid
        # conflicting with client-side execution in the conversational loop.
        tool_calls = msg.tool_calls
        if role == "assistant" and tool_calls:
            if content_str:
                current_turn_parts.append("\n")
            tool_call_visualizations: List[str] = []
            for tool_call in tool_calls:
                if hasattr(tool_call, "type") and tool_call.type == "function":
                    function_call = tool_call.function
                    func_name = function_call.name if function_call else None
                    func_args_str = function_call.arguments if function_call else None
                    try:
                        parsed_args = json.loads(
                            func_args_str if func_args_str else "{}"
                        )
                        formatted_args = json.dumps(
                            parsed_args, indent=2, ensure_ascii=False
                        )
                    except (json.JSONDecodeError, TypeError):
                        # Arguments are not valid JSON: show them verbatim.
                        formatted_args = (
                            func_args_str if func_args_str is not None else "{}"
                        )
                    tool_call_visualizations.append(
                        f"Request function call: {func_name}\nParameters:\n{formatted_args}"
                    )
            if tool_call_visualizations:
                current_turn_parts.append("\n".join(tool_call_visualizations))

        # Tool result messages (role = 'tool') are included so the model sees
        # the tool output.
        # NOTE(review): for string/list content the text was already added via
        # content_str above, so the tool output appears twice in the turn.
        # Kept as-is to avoid changing the prompt format.
        if role == "tool":
            tool_result_lines: List[str] = []
            # Standard OpenAI style: content is a string and tool_call_id
            # associates it with the previous call.
            tool_call_id = getattr(msg, "tool_call_id", None)
            if tool_call_id:
                tool_result_lines.append(f"Tool result (tool_call_id={tool_call_id}):")
            if isinstance(msg.content, str):
                tool_result_lines.append(msg.content)
            elif isinstance(msg.content, list):
                # Some clients put results in a list.
                try:
                    merged_parts: List[str] = []
                    for it in msg.content:
                        if isinstance(it, dict) and it.get("type") == "text":
                            merged_parts.append(str(it.get("text", "")))
                        else:
                            merged_parts.append(str(it))
                    tool_result_lines.append("\n".join(merged_parts))
                except Exception:
                    tool_result_lines.append(str(msg.content))
            else:
                tool_result_lines.append(str(msg.content))
            if tool_result_lines:
                if content_str:
                    current_turn_parts.append("\n")
                current_turn_parts.append("\n".join(tool_result_lines))

        # current_turn_parts always holds the role prefix, so a length of 1
        # means the turn has no body.
        if len(current_turn_parts) > 1 or (role == "assistant" and tool_calls):
            combined_parts.append("".join(current_turn_parts))
        elif not combined_parts:
            logger.debug(
                f"(Prepare Prompt) Skipping empty message for role {role} at index {i} (prefix only)."
            )

    final_prompt = "".join(combined_parts)
    if final_prompt:
        final_prompt += "\n"

    # Consolidated summary log line.
    sys_indicator = "Yes" if _has_system_prompt else "No"
    attach_info = f", {len(files_list)} attachments" if files_list else ""
    logger.debug(
        f"[Prompt] Built messages: {_msg_count} (System: {sys_indicator}), "
        f"Total {len(final_prompt):,} characters{attach_info}"
    )
    return final_prompt, files_list