Spaces:
Running
Running
| """Tool calling and multimodal message parsing.""" | |
| import json | |
| import re | |
| import uuid | |
| import base64 | |
| import io | |
MAX_IMAGE_B64_SIZE = 50000  # ~37KB raw image


def _compress_b64_if_needed(b64: str) -> str:
    """Shrink a base64-encoded image that is too large for text embedding.

    Args:
        b64: Base64 text of an encoded image.

    Returns:
        ``b64`` unchanged when it is at most ``MAX_IMAGE_B64_SIZE`` characters.
        Otherwise the image is downscaled so its longest side is at most
        256 px, re-encoded as a quality-60 JPEG and returned as base64.
        If that fails for any reason (Pillow missing, undecodable data, ...),
        the input truncated to ``MAX_IMAGE_B64_SIZE`` characters is returned,
        so the caller receives partial data rather than an exception.
    """
    if len(b64) <= MAX_IMAGE_B64_SIZE:
        return b64
    try:
        # Imported lazily so the module still loads without Pillow installed.
        from PIL import Image

        img = Image.open(io.BytesIO(base64.b64decode(b64)))
        # Resize to max 256px on the longest side (never upscale).
        max_dim = 256
        ratio = min(max_dim / img.width, max_dim / img.height)
        if ratio < 1:
            # Clamp to 1 px: for very elongated images the short side would
            # otherwise round down to 0 and make resize() fail.
            new_size = (
                max(1, int(img.width * ratio)),
                max(1, int(img.height * ratio)),
            )
            img = img.resize(new_size, Image.LANCZOS)
        # JPEG has no alpha channel, hence the RGB conversion.
        buf = io.BytesIO()
        img.convert("RGB").save(buf, format="JPEG", quality=60)
        return base64.b64encode(buf.getvalue()).decode()
    except Exception:
        # Deliberate best-effort: any failure falls back to truncation
        # (the model will get partial data).
        return b64[:MAX_IMAGE_B64_SIZE]
def _build_tool_choice_instruction(tool_choice, tool_defs: list) -> str:
    """Build the prompt suffix that enforces an OpenAI-style ``tool_choice``.

    tool_choice values:
    - "none": do not call any tool
    - "auto" / None: decide whether to call tools (default, no constraint)
    - "required": must call at least one tool
    - {"type": "function", "function": {"name": "xxx"}}: must call that tool
    - {"type": "function", "name": "xxx"}: flat form of the same constraint

    Args:
        tool_choice: One of the values above; anything else adds no constraint.
        tool_defs: Currently unused; kept so the signature stays stable.

    Returns:
        The instruction text (starting with a blank line), or "" when no
        constraint applies.
    """
    if tool_choice == "none":
        return "\n\nIMPORTANT: Do NOT call any tools. Respond with text only."
    if tool_choice == "required":
        return "\n\nIMPORTANT: You MUST call at least one tool. Do not respond with text only."
    if isinstance(tool_choice, dict):
        fn = tool_choice.get("function")
        # "function" may be missing or null; only read it when it is a dict.
        fn_name = fn.get("name") if isinstance(fn, dict) else None
        if not fn_name:
            # Flat form: the tool name sits directly on the tool_choice object.
            fn_name = tool_choice.get("name")
        if fn_name:
            return f'\n\nIMPORTANT: You MUST call the tool "{fn_name}". Do not call other tools.'
    return ""
_IMAGE_UNSUPPORTED_NOTE = (
    "[Note: Image input not supported in this API. Please describe the image in text.]"
)


def _content_to_text(content):
    """Flatten an OpenAI message ``content`` value into plain text.

    ``None`` becomes "", a list of content parts is reduced to its text parts
    (image parts are replaced by a fixed note) joined with spaces, and any
    other value is returned unchanged.
    """
    if content is None:
        return ""
    if not isinstance(content, list):
        return content
    text_parts = []
    for item in content:
        if isinstance(item, str):
            text_parts.append(item)
            continue
        if not isinstance(item, dict):
            continue
        kind = item.get("type")
        if kind in ("text", "input_text"):
            text_parts.append(item.get("text") or "")
        elif kind in ("image_url", "image"):
            text_parts.append(_IMAGE_UNSUPPORTED_NOTE)
    return " ".join(text_parts)


def _render_tool_call(fn: dict) -> str:
    """Render one assistant tool call as a fenced ``tool_call`` block of valid JSON."""
    raw_args = fn.get("arguments", "{}")
    if isinstance(raw_args, str):
        # OpenAI sends arguments as a JSON string; decode it so it is embedded
        # as an object. Unparseable text is kept as a JSON string value.
        try:
            args = json.loads(raw_args) if raw_args.strip() else {}
        except json.JSONDecodeError:
            args = raw_args
    elif raw_args is None:
        args = {}
    else:
        args = raw_args
    payload = json.dumps(
        {"name": fn.get("name", ""), "arguments": args}, ensure_ascii=False
    )
    return f"```tool_call\n{payload}\n```"


def messages_to_prompt(messages: list, tools: list = None, tool_choice=None) -> tuple:
    """Convert OpenAI chat messages (and optional tools) into a single prompt.

    Args:
        messages: OpenAI-style message dicts (``role``, ``content``, and
            optionally ``tool_calls`` / ``tool_call_id`` / ``name``).
        tools: Optional tool definitions, either wrapped
            (``{"type": "function", "function": {...}}``) or flat.
        tool_choice: OpenAI ``tool_choice`` value; ``"none"`` suppresses the
            tool section entirely.

    Returns:
        ``(prompt, images)``. ``images`` is always an empty list here: image
        parts are replaced by a textual note instead of being extracted.
    """
    parts = []
    images = []

    if tools and tool_choice != "none":
        tool_defs = []
        for tool in tools:
            fn = tool.get("function", tool) if tool.get("type") == "function" else tool
            tool_defs.append({
                "name": fn.get("name", tool.get("name", "")),
                "description": fn.get("description", tool.get("description", "")),
                "parameters": fn.get("parameters", tool.get("parameters", {})),
            })
        if tool_defs:
            constraint = _build_tool_choice_instruction(tool_choice, tool_defs)
            parts.append(
                "# Tool Use\n\n"
                "You can call the following tools. Call format:\n"
                '```tool_call\n{"name": "func_name", "arguments": {...}}\n```\n'
                "When calling tools, output ONLY the tool_call block(s).\n\n"
                f"Available tools:\n{json.dumps(tool_defs, indent=2)}"
                f"{constraint}"
            )

    # Maps tool_call id -> function name so tool results can be labelled even
    # when the tool message carries only ``tool_call_id`` and no ``name``.
    call_names = {}

    for msg in messages:
        role = msg.get("role", "user")
        content = _content_to_text(msg.get("content"))

        if role == "system":
            parts.append(f"[System instruction]: {content}")
        elif role == "assistant":
            tool_calls = msg.get("tool_calls")
            if tool_calls:
                blocks = []
                for tc in tool_calls:
                    fn = tc.get("function") or {}
                    if tc.get("id"):
                        call_names[tc["id"]] = fn.get("name", "")
                    blocks.append(_render_tool_call(fn))
                parts.append(f"[Assistant]: {content or ''}\n" + "\n".join(blocks))
            else:
                parts.append(f"[Assistant]: {content}")
        elif role == "tool":
            name = msg.get("name") or call_names.get(msg.get("tool_call_id"), "")
            parts.append(f"[Tool result for {name}]: {content}")
        else:
            parts.append(content if content else "")

    prompt = "\n\n".join(p for p in parts if p)
    return prompt, images
_TOOL_CALL_BLOCK_RE = re.compile(r'```tool_call\s*\n(.*?)\n```', re.DOTALL)


def _encode_tool_arguments(arguments) -> str:
    """Return *arguments* as the JSON string the OpenAI ``function.arguments`` field carries."""
    if arguments is None:
        return "{}"
    if isinstance(arguments, str):
        # Arguments emitted as an already-encoded JSON string are passed
        # through unchanged instead of being encoded a second time.
        try:
            json.loads(arguments)
        except json.JSONDecodeError:
            pass
        else:
            return arguments
    return json.dumps(arguments, ensure_ascii=False)


def parse_tool_calls(text: str) -> tuple:
    """Extract fenced ``tool_call`` blocks from model output.

    Args:
        text: Raw model output.

    Returns:
        ``(clean_text, tool_calls)``. ``clean_text`` is ``text`` with every
        ``tool_call`` block removed and surrounding whitespace stripped.
        ``tool_calls`` is a list of OpenAI-style tool call dicts with a
        generated ``call_<8 hex>`` id and ``arguments`` as a JSON string.
        Blocks whose payload is not a JSON object with a ``"name"`` key are
        removed from the text but produce no tool call.
    """
    tool_calls = []
    kept = []
    cursor = 0
    for m in _TOOL_CALL_BLOCK_RE.finditer(text):
        kept.append(text[cursor:m.start()])
        cursor = m.end()
        try:
            data = json.loads(m.group(1).strip())
        except json.JSONDecodeError:
            continue
        # Valid JSON that is not an object (list, string, number) cannot be a
        # call; indexing it by "name" would raise TypeError.
        if not isinstance(data, dict) or "name" not in data:
            continue
        tool_calls.append({
            "id": f"call_{uuid.uuid4().hex[:8]}",
            "type": "function",
            "function": {
                "name": data["name"],
                "arguments": _encode_tool_arguments(data.get("arguments", {})),
            },
        })
    kept.append(text[cursor:])
    clean = "".join(kept).strip()
    return clean, tool_calls
| # ─── Google Native API helpers ───────────────────────────────────────────────── | |
def build_tool_prompt(tool_defs: list) -> str:
    """Render the tool-use section of a Gemini Web prompt.

    The text introduces the tools, shows the fenced ``function_call`` format
    the model must use, lists the calling rules, and ends with ``tool_defs``
    serialized as indented JSON (non-ASCII characters kept as-is).
    """
    intro = (
        "# Tool Use\n\n"
        "You can call the following tools to help accomplish tasks. "
        "These tools connect to the user's local environment and will execute when called.\n\n"
    )
    call_format = (
        "Call format (use this exact format):\n"
        "```function_call\n"
        '{"name": "<tool_name>", "args": {<arguments>}}\n'
        "```\n\n"
    )
    rules = (
        "When calling tools:\n"
        "- Output ONLY the function_call block(s), nothing else\n"
        "- You may call multiple tools with multiple blocks\n"
        "- After receiving a [Tool result for ...], use that data to answer the user\n\n"
    )
    rendered_tools = json.dumps(tool_defs, indent=2, ensure_ascii=False)
    return "".join([intro, call_format, rules, "Available tools:\n", rendered_tools])
def _google_tool_choice_instruction(req: dict) -> str:
    """Build the prompt suffix that enforces a Google API ``toolConfig``.

    Reads ``req["toolConfig"]["functionCallingConfig"]``; missing or null
    levels are treated as the default (mode ``"AUTO"``, no allowed-name list).

    Returns:
        - mode ``"NONE"``: an instruction forbidding tool calls
        - mode ``"ANY"``: an instruction requiring a tool call, restricted to
          ``allowedFunctionNames`` when that list is non-empty
        - any other mode: ""
    """
    # ``or {}`` rather than a .get() default: the key may be present but null.
    tool_config = req.get("toolConfig") or {}
    fc_config = tool_config.get("functionCallingConfig") or {}
    mode = fc_config.get("mode") or "AUTO"
    allowed = fc_config.get("allowedFunctionNames") or []
    if mode == "NONE":
        return "\n\nIMPORTANT: Do NOT call any tools. Respond with text only."
    if mode == "ANY":
        if allowed:
            names = ", ".join(f'"{n}"' for n in allowed)
            return f"\n\nIMPORTANT: You MUST call one of these tools: {names}. Do not respond with text only."
        return "\n\nIMPORTANT: You MUST call at least one tool. Do not respond with text only."
    return ""
def google_contents_to_prompt(req: dict) -> tuple:
    """Convert a Google API request (contents/tools/systemInstruction) to a prompt.

    Args:
        req: Request body with optional ``systemInstruction``, ``tools``,
            ``toolConfig`` and ``contents`` keys.

    Returns:
        ``(prompt, images)`` where ``images`` is a list of
        ``(bytes, mime_type)`` tuples decoded from ``inlineData`` parts.

    Raises:
        KeyError: an ``inlineData`` part has no ``data`` field.
        binascii.Error: an ``inlineData`` payload is not valid base64.
    """
    parts = []
    images = []

    # ``or {}`` rather than .get() defaults: keys may be present but null.
    fc_config = (req.get("toolConfig") or {}).get("functionCallingConfig") or {}
    fc_mode = fc_config.get("mode") or "AUTO"

    tool_defs = []
    if fc_mode != "NONE":
        for tool_group in req.get("tools") or []:
            for fn in tool_group.get("functionDeclarations") or []:
                td = {"name": fn.get("name", ""), "description": fn.get("description", "")}
                params = fn.get("parameters") or fn.get("parametersJsonSchema")
                if params:
                    td["parameters"] = params
                tool_defs.append(td)

    sys_inst = req.get("systemInstruction")
    if isinstance(sys_inst, str):
        sys_text = sys_inst
    elif sys_inst:
        sys_text = " ".join(
            p.get("text", "") for p in sys_inst.get("parts") or [] if p.get("text")
        )
    else:
        sys_text = ""

    # System text and the tool section are emitted independently, so the tool
    # section is never lost when the system instruction is absent or empty.
    # Adjacent entries are joined with a blank line by the final join below.
    if sys_text:
        parts.append(sys_text)
    if tool_defs:
        parts.append(build_tool_prompt(tool_defs) + _google_tool_choice_instruction(req))

    for content in req.get("contents") or []:
        role = content.get("role", "user")
        msg_parts = []
        for p in content.get("parts") or []:
            if p.get("text"):
                msg_parts.append(p["text"])
            elif p.get("inlineData"):
                data = p["inlineData"]
                mime = data.get("mimeType", "image/png")
                images.append((base64.b64decode(data["data"]), mime))
            elif p.get("functionCall"):
                fc = p["functionCall"]
                call_json = json.dumps(
                    {"name": fc.get("name", ""), "args": fc.get("args", {})},
                    ensure_ascii=False,
                )
                msg_parts.append(f"```function_call\n{call_json}\n```")
            elif p.get("functionResponse"):
                fr = p["functionResponse"]
                result_json = json.dumps(fr.get("response", {}), ensure_ascii=False)
                msg_parts.append(f'[Tool result for {fr.get("name", "")}]: {result_json}')
        text = "\n".join(msg_parts)
        if role == "model":
            parts.append(f"[Assistant]: {text}")
        else:
            parts.append(text)

    return "\n\n".join(p for p in parts if p), images
_FENCED_FUNCTION_CALL_RE = re.compile(r'```function_call\s*\n(.*?)\n```', re.DOTALL)
# Matches only the "function_call" header line; the JSON that follows is read
# with a real JSON decoder so nested braces are handled correctly.
_BARE_FUNCTION_CALL_RE = re.compile(r'(?:^|\n)function_call\s*\n(?=\{)')


def _to_function_call(data):
    """Return ``{"name", "args"}`` for a decoded call object, or None if it is not one."""
    if not isinstance(data, dict) or "name" not in data:
        return None
    return {
        "name": data["name"],
        "args": data.get("args", data.get("arguments", {})),
    }


def parse_google_function_calls(text: str) -> tuple:
    """Extract function_call blocks from model output.

    Handles 3 formats:
    1. ```function_call\\n{...}\\n``` (standard)
    2. function_call\\n{...} (without backticks)
    3. Raw JSON with "name" + "args"/"arguments" keys (only when the whole
       remaining text is that JSON and no other call was found)

    Returns (clean_text, [{"name": ..., "args": ...}]), where ``clean_text``
    is the input with recognized call blocks removed and whitespace stripped.
    Fenced blocks are always removed, even when their payload is not a valid
    call; a bare ``function_call`` header followed by unparseable JSON is left
    in the text untouched.
    """
    function_calls = []

    # Format 1: fenced blocks.
    for body in _FENCED_FUNCTION_CALL_RE.findall(text):
        try:
            call = _to_function_call(json.loads(body.strip()))
        except json.JSONDecodeError:
            continue
        if call is not None:
            function_calls.append(call)
    clean = _FENCED_FUNCTION_CALL_RE.sub('', text).strip()

    # Format 2: bare header followed by a JSON object.
    decoder = json.JSONDecoder()
    kept = []
    cursor = 0
    for m in _BARE_FUNCTION_CALL_RE.finditer(clean):
        if m.start() < cursor:
            # Header text that sits inside an already-consumed JSON value.
            continue
        try:
            data, end = decoder.raw_decode(clean, m.end())
        except json.JSONDecodeError:
            continue
        kept.append(clean[cursor:m.start()])
        cursor = end
        call = _to_function_call(data)
        if call is not None:
            function_calls.append(call)
    kept.append(clean[cursor:])
    clean = "".join(kept).strip()

    # Format 3: the whole remaining output is a single raw JSON call.
    if not function_calls and clean.startswith("{"):
        try:
            data = json.loads(clean)
        except json.JSONDecodeError:
            data = None
        if isinstance(data, dict) and "name" in data and ("args" in data or "arguments" in data):
            function_calls.append(_to_function_call(data))
            clean = ""

    return clean, function_calls