Spaces:
Paused
Paused
| import base64 | |
| import json | |
| import logging | |
| import os | |
| from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple, Union, cast | |
| from urllib.parse import unquote, urlparse | |
| from api_utils.utils_ext.files import extract_data_url_to_local, save_blob_to_local | |
| from api_utils.utils_ext.function_calling_orchestrator import should_skip_tool_injection | |
| from logging_utils import set_request_id | |
| from models import Message | |
| if TYPE_CHECKING: | |
| from api_utils.utils_ext.function_calling_orchestrator import FunctionCallingState | |
def prepare_combined_prompt(
    messages: List[Message],
    req_id: str,
    tools: Optional[List[Dict[str, Any]]] = None,
    tool_choice: Optional[Union[str, Dict[str, Any]]] = None,
    fc_state: Optional["FunctionCallingState"] = None,
) -> Tuple[str, List[str]]:
    """Flatten a chat history into one prompt string plus local attachment paths.

    The prompt is assembled in this order:

    1. An optional "Available Tools Catalog" section built from ``tools``.
       It is omitted when ``should_skip_tool_injection(tools, fc_state=...)``
       returns a truthy value.
    2. The first ``system`` message, when its content is a non-blank string.
       Only the first system message is considered; later ones are skipped.
    3. Every remaining message, rendered as a role header followed by its
       text. Assistant ``tool_calls`` of type ``function`` are rendered as
       text, and ``tool`` messages are rendered as a "Tool result" section.

    Turns are joined with a ``---`` separator line. A separator is only
    written between two sections that are actually emitted, so messages that
    produce no text (for example attachment-only messages) leave no stray
    separator behind.

    Attachments referenced by ``data:`` URLs, ``file:`` URLs, existing
    absolute paths, or inline base64 audio/video data are resolved to local
    file paths and collected; other URLs are ignored.

    Args:
        messages: Conversation messages. Each one is read through its
            ``role``, ``content`` and ``tool_calls`` attributes (and
            ``tool_call_id`` when present).
        req_id: Request identifier, registered with ``set_request_id`` and
            forwarded to the attachment helpers.
        tools: Optional tool declarations, either nested under a
            ``"function"`` key or flat dictionaries carrying ``"name"``.
        tool_choice: Optional tool choice. A dictionary naming a function, or
            a string other than auto/none/no/off/required/any, adds a
            "Recommended function to use" line to the catalog.
        fc_state: Optional function-calling state passed through to
            ``should_skip_tool_injection``.

    Returns:
        A ``(prompt, files)`` tuple. ``prompt`` ends with a newline when it is
        non-empty; ``files`` lists the local attachment paths in the order
        they were encountered.
    """
    logger = logging.getLogger("AIStudioProxyServer")
    set_request_id(req_id)

    # Track summary stats for consolidated logging
    _has_system_prompt = False
    _msg_count = len(messages)

    # Do not clear upload_files here; it is cleared by the upper layer at the
    # start of each request as needed, to avoid "file not found" errors caused
    # by loss of historical attachments.
    combined_parts: List[str] = []
    system_prompt_content: Optional[str] = None
    processed_system_message_indices: Set[int] = set()
    # Local file paths to be uploaded (images, videos, PDFs, etc.)
    files_list: List[str] = []

    turn_separator = "\n---\n"
    # True while the last emitted part already ends with a separator, so the
    # next turn must not add a second one.
    ends_with_separator = False

    # If tools are declared, inject the tool catalog before the prompt so the
    # model knows the available functions. Injection is skipped in native
    # function calling mode; fc_state is passed so AUTO-mode fallback is
    # decided by the helper.
    if isinstance(tools, list) and len(tools) > 0:
        if should_skip_tool_injection(tools, fc_state=fc_state):
            logger.debug(
                f"[{req_id}] Skipping tool catalog injection - native mode active and configured"
            )
        else:
            try:
                tool_lines: List[str] = ["Available Tools Catalog:"]
                for t in tools:
                    name: Optional[str] = None
                    params_schema: Optional[Dict[str, Any]] = None
                    # The definition is either nested under "function" or is
                    # the tool dictionary itself.
                    fn_val: Any = t.get("function") if "function" in t else t
                    if isinstance(fn_val, dict):
                        typed_fn: Dict[str, Any] = cast(Dict[str, Any], fn_val)
                        name_raw: Any = typed_fn.get("name") or t.get("name")
                        if isinstance(name_raw, str):
                            name = name_raw
                        params_raw: Any = typed_fn.get("parameters")
                        if isinstance(params_raw, dict):
                            params_schema = cast(Dict[str, Any], params_raw)
                    else:
                        # fn_val is not a dict, take the name directly from t
                        name_raw = t.get("name")
                        if isinstance(name_raw, str):
                            name = name_raw
                    if not name:
                        continue
                    tool_lines.append(f"- Function: {name}")
                    if params_schema:
                        try:
                            tool_lines.append(
                                f" Parameter Schema: {json.dumps(params_schema, ensure_ascii=False)}"
                            )
                        except Exception as e:
                            # Best effort: keep the function line, drop the schema
                            logger.debug(
                                f"[{req_id}] Could not serialize parameter schema for tool {name}: {e}"
                            )
                if tool_choice:
                    # Explicitly request or suggest a callable function name
                    chosen_name: Optional[str] = None
                    if isinstance(tool_choice, dict):
                        typed_tool_choice: Dict[str, Any] = tool_choice
                        choice_fn: Any = typed_tool_choice.get("function")
                        if isinstance(choice_fn, dict):
                            # Standard format: {"type": "function", "function": {"name": "..."}}
                            choice_name: Any = choice_fn.get("name")
                            if isinstance(choice_name, str):
                                chosen_name = choice_name
                        elif "name" in typed_tool_choice:
                            # Flat format: {"type": "function", "name": "..."}
                            choice_name = typed_tool_choice.get("name")
                            if isinstance(choice_name, str):
                                chosen_name = choice_name
                    elif isinstance(tool_choice, str) and tool_choice.lower() not in (
                        "auto",
                        "none",
                        "no",
                        "off",
                        "required",
                        "any",
                    ):
                        chosen_name = tool_choice
                    if chosen_name:
                        tool_lines.append(f"Recommended function to use: {chosen_name}")
                combined_parts.append("\n".join(tool_lines) + turn_separator)
                ends_with_separator = True
            except Exception as e:
                # Best effort: a malformed tool list must not fail the request,
                # but the dropped catalog should be visible in the logs.
                logger.warning(
                    f"[{req_id}] Failed to build tool catalog, skipping injection: {e}"
                )

    # Process system messages: only the first one is considered
    for i, msg in enumerate(messages):
        if msg.role == "system":
            content = msg.content
            if isinstance(content, str) and content.strip():
                system_prompt_content = content.strip()
                processed_system_message_indices.add(i)
                _has_system_prompt = True
                logger.debug(
                    f"Found system prompt at index {i}: {system_prompt_content[:80]}..."
                )
                system_instr_prefix = "System Instructions:\n"
                combined_parts.append(f"{system_instr_prefix}{system_prompt_content}")
                ends_with_separator = False
            else:
                logger.debug(f"Ignoring empty system message at index {i}")
                processed_system_message_indices.add(i)
            break

    role_map_ui = {
        "user": "User",
        "assistant": "Assistant",
        "system": "System",
        "tool": "Tool",
    }

    # Process other messages
    for i, msg in enumerate(messages):
        if i in processed_system_message_indices:
            continue
        if msg.role == "system":
            logger.debug(f"Skipping subsequent system message at index {i}")
            continue

        role = msg.role or "unknown"
        role_prefix_ui = f"{role_map_ui.get(role, role.capitalize())}:\n"
        current_turn_parts: List[str] = [role_prefix_ui]
        content = msg.content or ""
        content_str: str = ""

        if isinstance(content, str):
            content_str = content.strip()
        elif isinstance(content, list):
            # Process multimodal content
            text_parts: List[str] = []
            for item in content:
                # Determine the item type from an attribute or a dict key
                item_type: Optional[str] = None
                try:
                    # Guard against property exceptions when using hasattr/getattr
                    if hasattr(item, "type"):
                        item_type = item.type
                except Exception:
                    item_type = None
                if item_type is None and isinstance(item, dict):
                    item_type_raw: Any = cast(Dict[str, Any], item).get("type")
                    if isinstance(item_type_raw, str):
                        item_type = item_type_raw

                if item_type == "text":
                    # Text item
                    if hasattr(item, "text"):
                        text_parts.append(getattr(item, "text", "") or "")
                    elif isinstance(item, dict):
                        text_raw: Any = cast(Dict[str, Any], item).get("text", "")
                        text_parts.append(str(text_raw))
                    continue

                # Image/File/Media URL item
                if item_type in (
                    "image_url",
                    "file_url",
                    "media_url",
                    "input_image",
                ) or (
                    isinstance(item, dict)
                    and (
                        "image_url" in item
                        or "input_image" in item
                        or "file_url" in item
                        or "media_url" in item
                        or "url" in item
                    )
                ):
                    try:
                        url_value: Optional[str] = None
                        # Object attributes (e.g. model instances)
                        if hasattr(item, "image_url") and item.image_url:
                            url_value = item.image_url.url
                            try:
                                detail_val: Optional[str] = getattr(
                                    item.image_url, "detail", None
                                )
                                if detail_val:
                                    text_parts.append(
                                        f"[Image Details: detail={detail_val}]"
                                    )
                            except Exception:
                                # The detail hint is optional; ignore failures
                                pass
                        elif hasattr(item, "input_image") and item.input_image:
                            url_value = item.input_image.url
                            try:
                                detail_val = getattr(item.input_image, "detail", None)
                                if detail_val:
                                    text_parts.append(
                                        f"[Image Details: detail={detail_val}]"
                                    )
                            except Exception:
                                # The detail hint is optional; ignore failures
                                pass
                        elif hasattr(item, "file_url") and item.file_url:
                            url_value = item.file_url.url
                        elif hasattr(item, "media_url") and item.media_url:
                            url_value = item.media_url.url
                        elif hasattr(item, "url") and item.url:
                            url_value = item.url

                        # Dictionary structure (backwards compatibility)
                        if url_value is None and isinstance(item, dict):
                            typed_item: Dict[str, Any] = cast(Dict[str, Any], item)
                            image_url_raw: Any = typed_item.get("image_url")
                            input_image_raw: Any = typed_item.get("input_image")
                            if isinstance(image_url_raw, dict):
                                url_raw: Any = image_url_raw.get("url")
                                if isinstance(url_raw, str):
                                    url_value = url_raw
                                detail_raw: Any = image_url_raw.get("detail")
                                if isinstance(detail_raw, str):
                                    text_parts.append(
                                        f"[Image Details: detail={detail_raw}]"
                                    )
                            elif isinstance(image_url_raw, str):
                                url_value = image_url_raw
                            elif isinstance(input_image_raw, dict):
                                url_raw = input_image_raw.get("url")
                                if isinstance(url_raw, str):
                                    url_value = url_raw
                                detail_raw = input_image_raw.get("detail")
                                if isinstance(detail_raw, str):
                                    text_parts.append(
                                        f"[Image Details: detail={detail_raw}]"
                                    )
                            elif isinstance(input_image_raw, str):
                                url_value = input_image_raw
                            else:
                                # Check other URL fields
                                file_url_raw: Any = typed_item.get("file_url")
                                media_url_raw: Any = typed_item.get("media_url")
                                file_raw: Any = typed_item.get("file")
                                if isinstance(file_url_raw, dict):
                                    url_raw = file_url_raw.get("url")
                                    if isinstance(url_raw, str):
                                        url_value = url_raw
                                elif isinstance(file_url_raw, str):
                                    url_value = file_url_raw
                                elif isinstance(media_url_raw, dict):
                                    url_raw = media_url_raw.get("url")
                                    if isinstance(url_raw, str):
                                        url_value = url_raw
                                elif isinstance(media_url_raw, str):
                                    url_value = media_url_raw
                                elif "url" in typed_item:
                                    url_raw = typed_item.get("url")
                                    if isinstance(url_raw, str):
                                        url_value = url_raw
                                elif isinstance(file_raw, dict):
                                    # Compatible with general file field
                                    url_raw = file_raw.get("url") or file_raw.get("path")
                                    if isinstance(url_raw, str):
                                        url_value = url_raw

                        url_value = (url_value or "").strip()
                        if not url_value:
                            continue

                        # Normalize to local file list and log
                        if url_value.startswith("data:"):
                            file_path = extract_data_url_to_local(
                                url_value, req_id=req_id
                            )
                            if file_path:
                                files_list.append(file_path)
                                logger.debug(
                                    f"(Prepare Prompt) Identified and added data:URL attachment: {file_path}"
                                )
                        elif url_value.startswith("file:"):
                            parsed = urlparse(url_value)
                            local_path = unquote(parsed.path)
                            if os.path.exists(local_path):
                                files_list.append(local_path)
                                logger.debug(
                                    f"(Prepare Prompt) Identified and added local attachment (file://): {local_path}"
                                )
                            else:
                                logger.warning(
                                    f"(Prepare Prompt) Local file pointed to by file URL does not exist: {local_path}"
                                )
                        elif os.path.isabs(url_value) and os.path.exists(url_value):
                            files_list.append(url_value)
                            logger.debug(
                                f"(Prepare Prompt) Identified and added local attachment (absolute path): {url_value}"
                            )
                        else:
                            logger.debug(
                                f"(Prepare Prompt) Ignoring non-local attachment URL: {url_value}"
                            )
                    except Exception as e:
                        logger.warning(
                            f"(Prepare Prompt) Error processing attachment URL: {e}"
                        )
                    continue

                # Audio/Video input
                if item_type in ("input_audio", "input_video"):
                    try:
                        inp: Any = None
                        if hasattr(item, "input_audio") and item.input_audio:
                            inp = item.input_audio
                        elif hasattr(item, "input_video") and item.input_video:
                            inp = item.input_video
                        elif isinstance(item, dict):
                            typed_item = cast(Dict[str, Any], item)
                            inp = typed_item.get("input_audio") or typed_item.get(
                                "input_video"
                            )
                        if inp:
                            url_value = None
                            data_val: Optional[str] = None
                            mime_val: Optional[str] = None
                            fmt_val: Optional[str] = None
                            if isinstance(inp, dict):
                                typed_inp: Dict[str, Any] = cast(Dict[str, Any], inp)
                                url_raw = typed_inp.get("url")
                                data_raw: Any = typed_inp.get("data")
                                mime_raw: Any = typed_inp.get("mime_type")
                                fmt_raw: Any = typed_inp.get("format")
                            else:
                                # Object with attributes
                                url_raw = getattr(inp, "url", None)
                                data_raw = getattr(inp, "data", None)
                                mime_raw = getattr(inp, "mime_type", None)
                                fmt_raw = getattr(inp, "format", None)
                            # Only string values are accepted for each field
                            if isinstance(url_raw, str):
                                url_value = url_raw
                            if isinstance(data_raw, str):
                                data_val = data_raw
                            if isinstance(mime_raw, str):
                                mime_val = mime_raw
                            if isinstance(fmt_raw, str):
                                fmt_val = fmt_raw

                            if url_value:
                                if url_value.startswith("data:"):
                                    saved = extract_data_url_to_local(
                                        url_value, req_id=req_id
                                    )
                                    if saved:
                                        files_list.append(saved)
                                        logger.debug(
                                            f"(Prepare Prompt) Identified and added audio/video data:URL attachment: {saved}"
                                        )
                                elif url_value.startswith("file:"):
                                    parsed = urlparse(url_value)
                                    local_path = unquote(parsed.path)
                                    if os.path.exists(local_path):
                                        files_list.append(local_path)
                                        logger.debug(
                                            f"(Prepare Prompt) Identified and added local audio/video attachment (file://): {local_path}"
                                        )
                                elif os.path.isabs(url_value) and os.path.exists(
                                    url_value
                                ):
                                    files_list.append(url_value)
                                    logger.debug(
                                        f"(Prepare Prompt) Identified and added local audio/video attachment (absolute path): {url_value}"
                                    )
                            elif data_val:
                                if data_val.startswith("data:"):
                                    saved = extract_data_url_to_local(
                                        data_val, req_id=req_id
                                    )
                                    if saved:
                                        files_list.append(saved)
                                        logger.debug(
                                            f"(Prepare Prompt) Identified and added audio/video data:URL attachment: {saved}"
                                        )
                                else:
                                    # Treat as pure base64 data
                                    try:
                                        raw = base64.b64decode(data_val)
                                        saved = save_blob_to_local(
                                            raw, mime_val, fmt_val, req_id=req_id
                                        )
                                        if saved:
                                            files_list.append(saved)
                                            logger.debug(
                                                f"(Prepare Prompt) Identified and added audio/video base64 attachment: {saved}"
                                            )
                                    except Exception as e:
                                        # Best effort: skip the attachment but
                                        # leave a trace of why it was dropped.
                                        logger.warning(
                                            f"(Prepare Prompt) Failed to decode or save base64 audio/video data: {e}"
                                        )
                    except Exception as e:
                        logger.warning(
                            f"(Prepare Prompt) Error processing audio/video input: {e}"
                        )
                    continue

                # Other unknown items: log without affecting the prompt
                logger.warning(
                    f"(Prepare Prompt) Warning: Ignoring non-text or unknown type content item in message at index {i}"
                )
            content_str = "\n".join(text_parts).strip()
        elif isinstance(content, dict):
            # Dictionary content may carry 'attachments'/'images'/'media'/'files'
            typed_content: Dict[str, Any] = cast(Dict[str, Any], content)
            text_parts = []
            attachments_keys = ["attachments", "images", "media", "files"]
            for key in attachments_keys:
                items: Any = typed_content.get(key)
                if not isinstance(items, list):
                    continue
                for it in items:
                    url_value = None
                    if isinstance(it, str):
                        url_value = it
                    elif isinstance(it, dict):
                        typed_it: Dict[str, Any] = cast(Dict[str, Any], it)
                        url_raw = typed_it.get("url") or typed_it.get("path")
                        if isinstance(url_raw, str):
                            url_value = url_raw
                        if not url_value:
                            image_url_raw = typed_it.get("image_url")
                            input_image_raw = typed_it.get("input_image")
                            if isinstance(image_url_raw, dict):
                                url_from_image: Any = image_url_raw.get("url")
                                if isinstance(url_from_image, str):
                                    url_value = url_from_image
                            elif isinstance(input_image_raw, dict):
                                url_from_input: Any = input_image_raw.get("url")
                                if isinstance(url_from_input, str):
                                    url_value = url_from_input
                    if not url_value:
                        continue
                    url_value = url_value.strip()
                    if not url_value:
                        continue
                    if url_value.startswith("data:"):
                        # Forward req_id like every other call site does
                        fp = extract_data_url_to_local(url_value, req_id=req_id)
                        if fp:
                            files_list.append(fp)
                            logger.debug(
                                f"(Prepare Prompt) Identified and added dict attachment data:URL: {fp}"
                            )
                    elif url_value.startswith("file:"):
                        parsed = urlparse(url_value)
                        lp = unquote(parsed.path)
                        if os.path.exists(lp):
                            files_list.append(lp)
                            logger.debug(
                                f"(Prepare Prompt) Identified and added dict attachment file://: {lp}"
                            )
                    elif os.path.isabs(url_value) and os.path.exists(url_value):
                        files_list.append(url_value)
                        logger.debug(
                            f"(Prepare Prompt) Identified and added dict attachment absolute path: {url_value}"
                        )
                    else:
                        logger.debug(
                            f"(Prepare Prompt) Ignoring non-local URL for dict attachment: {url_value}"
                        )
            # Also append potential plain text description in dictionary
            text_field: Any = typed_content.get("text")
            if isinstance(text_field, str):
                text_parts.append(text_field)
            content_str = "\n".join(text_parts).strip()
        else:
            logger.warning(
                f"(Prepare Prompt) Warning: Unexpected content type for role {role} at index {i} ({type(content)}) or is None."
            )
            content_str = str(content or "").strip()

        # Tool messages are rendered exactly once by the tool-result block
        # below; adding their text here as well would duplicate the output.
        if content_str and role != "tool":
            current_turn_parts.append(content_str)

        # Handle tool calls (visualize only, do not execute here, to avoid
        # conflict with client execution in the conversational loop)
        tool_calls = msg.tool_calls
        if role == "assistant" and tool_calls:
            if content_str:
                current_turn_parts.append("\n")
            tool_call_visualizations: List[str] = []
            for tool_call in tool_calls:
                if hasattr(tool_call, "type") and tool_call.type == "function":
                    function_call = tool_call.function
                    func_name = function_call.name if function_call else None
                    func_args_str = function_call.arguments if function_call else None
                    try:
                        parsed_args = json.loads(
                            func_args_str if func_args_str else "{}"
                        )
                        formatted_args = json.dumps(
                            parsed_args, indent=2, ensure_ascii=False
                        )
                    except (json.JSONDecodeError, TypeError):
                        # Arguments are not valid JSON: show them verbatim
                        formatted_args = (
                            func_args_str if func_args_str is not None else "{}"
                        )
                    tool_call_visualizations.append(
                        f"Request function call: {func_name}\nParameters:\n{formatted_args}"
                    )
            if tool_call_visualizations:
                current_turn_parts.append("\n".join(tool_call_visualizations))

        # Handle tool result messages (role = 'tool'): include in prompt so
        # the model sees the tool output
        if role == "tool":
            tool_result_lines: List[str] = []
            # tool_call_id associates the result with a previous call
            tool_call_id = getattr(msg, "tool_call_id", None)
            if tool_call_id:
                tool_result_lines.append(f"Tool result (tool_call_id={tool_call_id}):")
            if isinstance(msg.content, str):
                tool_result_lines.append(msg.content)
            elif isinstance(msg.content, list):
                # Compatible with clients putting results in a list
                try:
                    merged_parts: List[str] = []
                    for it in msg.content:
                        if isinstance(it, dict) and it.get("type") == "text":
                            merged_parts.append(str(it.get("text", "")))
                        else:
                            merged_parts.append(str(it))
                    tool_result_lines.append("\n".join(merged_parts))
                except Exception:
                    tool_result_lines.append(str(msg.content))
            elif msg.content is not None:
                # A missing result must not show up as the literal text "None"
                tool_result_lines.append(str(msg.content))
            if tool_result_lines:
                current_turn_parts.append("\n".join(tool_result_lines))

        if len(current_turn_parts) > 1 or (role == "assistant" and tool_calls):
            # Write the separator only now that the turn is known to be
            # emitted, and never directly after another separator.
            if combined_parts and not ends_with_separator:
                combined_parts.append(turn_separator)
            combined_parts.append("".join(current_turn_parts))
            ends_with_separator = False
        else:
            logger.debug(
                f"(Prepare Prompt) Skipping empty message for role {role} at index {i} (prefix only)."
            )

    final_prompt = "".join(combined_parts)
    if final_prompt:
        final_prompt += "\n"

    # Consolidated summary log
    sys_indicator = "Yes" if _has_system_prompt else "No"
    attach_info = f", {len(files_list)} attachments" if files_list else ""
    logger.debug(
        f"[Prompt] Built messages: {_msg_count} (System: {sys_indicator}), "
        f"Total {len(final_prompt):,} characters{attach_info}"
    )
    return final_prompt, files_list