"""Tool calling and multimodal message parsing.""" import json import re import uuid import base64 import io MAX_IMAGE_B64_SIZE = 50000 # ~37KB raw image def _compress_b64_if_needed(b64: str) -> str: """Compress image if base64 is too large for text embedding.""" if len(b64) <= MAX_IMAGE_B64_SIZE: return b64 try: from PIL import Image img_data = base64.b64decode(b64) img = Image.open(io.BytesIO(img_data)) # Resize to max 256px on longest side max_dim = 256 ratio = min(max_dim / img.width, max_dim / img.height) if ratio < 1: img = img.resize((int(img.width * ratio), int(img.height * ratio)), Image.LANCZOS) # Convert to JPEG with quality reduction buf = io.BytesIO() img.convert("RGB").save(buf, format="JPEG", quality=60) compressed = base64.b64encode(buf.getvalue()).decode() return compressed except Exception: # If PIL not available, truncate (model will get partial data) return b64[:MAX_IMAGE_B64_SIZE] def _build_tool_choice_instruction(tool_choice, tool_defs: list) -> str: """Build tool_choice constraint instruction. tool_choice values: - "none": do not call any tool - "auto": decide whether to call tools (default) - "required": must call at least one tool - {"type": "function", "function": {"name": "xxx"}}: must call specific tool """ if tool_choice == "none": return "\n\nIMPORTANT: Do NOT call any tools. Respond with text only." if tool_choice == "required": return "\n\nIMPORTANT: You MUST call at least one tool. Do not respond with text only." if isinstance(tool_choice, dict): fn_name = tool_choice.get("function", {}).get("name", "") if fn_name: return f'\n\nIMPORTANT: You MUST call the tool "{fn_name}". Do not call other tools.' return "" def messages_to_prompt(messages: list, tools: list = None, tool_choice=None) -> tuple: """Convert OpenAI messages to (prompt_str, images_list). Returns (prompt, images) where images is a list of (bytes, mime_type) tuples. """ parts = [] images = [] if tools and tool_choice != "none": tool_defs = [] for tool in tools: fn = tool.get("function", tool) if tool.get("type") == "function" else tool tool_defs.append({ "name": fn.get("name", tool.get("name", "")), "description": fn.get("description", tool.get("description", "")), "parameters": fn.get("parameters", tool.get("parameters", {})), }) if tool_defs: constraint = _build_tool_choice_instruction(tool_choice, tool_defs) parts.append( "# Tool Use\n\n" "You can call the following tools. Call format:\n" '```tool_call\n{"name": "func_name", "arguments": {...}}\n```\n' "When calling tools, output ONLY the tool_call block(s).\n\n" f"Available tools:\n{json.dumps(tool_defs, indent=2)}" f"{constraint}" ) for msg in messages: role = msg.get("role", "user") content = msg.get("content", "") if isinstance(content, list): text_parts = [] for c in content: if c.get("type") in ("text", "input_text"): text_parts.append(c.get("text", "")) elif c.get("type") == "image_url": text_parts.append("[Note: Image input not supported in this API. Please describe the image in text.]") elif c.get("type") == "image": text_parts.append("[Note: Image input not supported in this API. Please describe the image in text.]") content = " ".join(text_parts) if role == "system": parts.append(f"[System instruction]: {content}") elif role == "assistant": if msg.get("tool_calls"): tc_strs = [] for tc in msg["tool_calls"]: fn = tc.get("function", {}) tc_strs.append( f'```tool_call\n{{"name": "{fn.get("name")}", ' f'"arguments": {fn.get("arguments", "{}")}}}\n```' ) parts.append(f"[Assistant]: {content or ''}\n" + "\n".join(tc_strs)) else: parts.append(f"[Assistant]: {content}") elif role == "tool": parts.append(f"[Tool result for {msg.get('name', '')}]: {content}") else: parts.append(content if content else "") prompt = "\n\n".join(p for p in parts if p) return prompt, images def parse_tool_calls(text: str) -> tuple: """Extract tool_call blocks. Returns (clean_text, tool_calls_list).""" tool_calls = [] pattern = r'```tool_call\s*\n(.*?)\n```' clean_parts = [] last_end = 0 for m in re.finditer(pattern, text, re.DOTALL): clean_parts.append(text[last_end:m.start()]) last_end = m.end() try: data = json.loads(m.group(1).strip()) tool_calls.append({ "id": f"call_{uuid.uuid4().hex[:8]}", "type": "function", "function": { "name": data["name"], "arguments": json.dumps(data.get("arguments", {}), ensure_ascii=False), }, }) except (json.JSONDecodeError, KeyError): pass clean_parts.append(text[last_end:]) clean = "".join(clean_parts).strip() return clean, tool_calls # ─── Google Native API helpers ───────────────────────────────────────────────── def build_tool_prompt(tool_defs: list) -> str: """Build natural tool-use prompt for Gemini Web that avoids prompt-injection detection.""" tool_spec = json.dumps(tool_defs, indent=2, ensure_ascii=False) return ( "# Tool Use\n\n" "You can call the following tools to help accomplish tasks. " "These tools connect to the user's local environment and will execute when called.\n\n" "Call format (use this exact format):\n" "```function_call\n" '{"name": "", "args": {}}\n' "```\n\n" "When calling tools:\n" "- Output ONLY the function_call block(s), nothing else\n" "- You may call multiple tools with multiple blocks\n" "- After receiving a [Tool result for ...], use that data to answer the user\n\n" f"Available tools:\n{tool_spec}" ) def _google_tool_choice_instruction(req: dict) -> str: """Extract tool_choice constraint from Google API toolConfig.""" tool_config = req.get("toolConfig", {}) fc_config = tool_config.get("functionCallingConfig", {}) mode = fc_config.get("mode", "AUTO") allowed = fc_config.get("allowedFunctionNames", []) if mode == "NONE": return "\n\nIMPORTANT: Do NOT call any tools. Respond with text only." if mode == "ANY": if allowed: names = ", ".join(f'"{n}"' for n in allowed) return f"\n\nIMPORTANT: You MUST call one of these tools: {names}. Do not respond with text only." return "\n\nIMPORTANT: You MUST call at least one tool. Do not respond with text only." return "" def google_contents_to_prompt(req: dict) -> tuple: """Convert Google API contents/tools/systemInstruction to (prompt_str, images_list). Returns (prompt, images) where images is a list of (bytes, mime_type) tuples. """ parts = [] images = [] tool_config = req.get("toolConfig", {}) fc_mode = tool_config.get("functionCallingConfig", {}).get("mode", "AUTO") tools = req.get("tools") tool_defs = [] if tools and fc_mode != "NONE": for tool_group in tools: for fn in tool_group.get("functionDeclarations", []): td = {"name": fn.get("name", ""), "description": fn.get("description", "")} params = fn.get("parameters") or fn.get("parametersJsonSchema") if params: td["parameters"] = params tool_defs.append(td) sys_inst = req.get("systemInstruction") if sys_inst: sys_parts = sys_inst.get("parts", []) sys_text = " ".join(p.get("text", "") for p in sys_parts if p.get("text")) if sys_text: if tool_defs: constraint = _google_tool_choice_instruction(req) parts.append(sys_text + "\n\n" + build_tool_prompt(tool_defs) + constraint) else: parts.append(sys_text) elif tool_defs: constraint = _google_tool_choice_instruction(req) parts.append(build_tool_prompt(tool_defs) + constraint) for content in req.get("contents", []): role = content.get("role", "user") msg_parts = [] for p in content.get("parts", []): if p.get("text"): msg_parts.append(p["text"]) elif p.get("inlineData"): data = p["inlineData"] mime = data.get("mimeType", "image/png") images.append((base64.b64decode(data["data"]), mime)) elif p.get("functionCall"): fc = p["functionCall"] msg_parts.append( f'```function_call\n{json.dumps({"name": fc["name"], "args": fc.get("args", {})}, ensure_ascii=False)}\n```' ) elif p.get("functionResponse"): fr = p["functionResponse"] msg_parts.append( f'[Tool result for {fr.get("name", "")}]: {json.dumps(fr.get("response", {}), ensure_ascii=False)}' ) text = "\n".join(msg_parts) if role == "model": parts.append(f"[Assistant]: {text}") else: parts.append(text) return "\n\n".join(p for p in parts if p), images def parse_google_function_calls(text: str) -> tuple: """Extract function_call blocks from model output. Handles 3 formats: 1. ```function_call\\n{...}\\n``` (standard) 2. function_call\\n{...} (without backticks) 3. Raw JSON with "name" + "args" keys Returns (clean_text, [{"name": ..., "args": ...}]) """ function_calls = [] pattern1 = r'```function_call\s*\n(.*?)\n```' pattern2 = r'(?:^|\n)function_call\s*\n(\{[^`]*?\})' clean = text for pattern in [pattern1, pattern2]: for match in re.findall(pattern, clean, re.DOTALL): try: data = json.loads(match.strip()) if "name" in data: function_calls.append({ "name": data["name"], "args": data.get("args", data.get("arguments", {})), }) except (json.JSONDecodeError, KeyError): pass clean = re.sub(pattern, '', clean, flags=re.DOTALL).strip() if not function_calls and clean.strip().startswith("{"): try: data = json.loads(clean.strip()) if "name" in data and ("args" in data or "arguments" in data): function_calls.append({ "name": data["name"], "args": data.get("args", data.get("arguments", {})), }) clean = "" except (json.JSONDecodeError, KeyError): pass return clean, function_calls