Spaces:
Running
Running
| """ | |
| Unified multi-backend VLM API client with tool-use support. | |
| Supports OpenAI (GPT-4o), Anthropic (Claude), and Together (Qwen2.5-VL). | |
| Handles image encoding, rate limiting, retries, response normalization, | |
| and native function/tool calling across all backends. | |
| """ | |
| import base64 | |
| import io | |
| import json | |
| import time | |
| import logging | |
| from collections import deque | |
| from pathlib import Path | |
| from abc import ABC, abstractmethod | |
| from dataclasses import dataclass, field | |
| from PIL import Image | |
| import config | |
| logger = logging.getLogger(__name__) | |
| class VLMResponse: | |
| """Normalized response from any VLM backend, including tool calls.""" | |
| text: str | |
| model: str | |
| backend: str | |
| input_tokens: int | |
| output_tokens: int | |
| latency_ms: float | |
| tool_call: object | None = None # tools.ToolCall if a tool was called | |
def _normalize_image_mode(img: Image.Image) -> Image.Image:
    """Normalize medical image modes to RGB-compatible formats for JPEG encoding.

    Args:
        img: Source PIL image in any mode.

    Returns:
        An RGB image safe to save as JPEG. The input is returned unchanged
        only when it is already RGB.
    """
    if img.mode == "RGB":
        return img
    if img.mode == "RGBA":
        # JPEG has no alpha channel: flatten transparency onto white.
        background = Image.new("RGB", img.size, (255, 255, 255))
        background.paste(img, mask=img.split()[3])
        return background
    if img.mode == "L":
        return img.convert("RGB")
    if img.mode in ("I", "I;16", "I;16B", "I;16L", "F"):
        # High-bit-depth integer or float pixel data (the "I*" and "F"
        # cases previously had duplicated identical code): min-max scale
        # to 0-255 before converting to 8-bit RGB, so a direct convert()
        # doesn't clip everything to black/white.
        import numpy as np

        arr = np.array(img, dtype=np.float64)
        if arr.max() > arr.min():
            arr = (arr - arr.min()) / (arr.max() - arr.min()) * 255.0
        else:
            # Constant image: avoid division by zero, emit all-black.
            arr = np.zeros_like(arr)
        return Image.fromarray(arr.astype(np.uint8)).convert("RGB")
    # Any other mode (P, CMYK, ...): rely on Pillow's converter.
    return img.convert("RGB")
def encode_image_to_base64(image_path: str | Path, max_size: int = 1024) -> str:
    """Load an image file and encode it to a base64 JPEG string.

    Args:
        image_path: Path to the image file on disk.
        max_size: Maximum allowed edge length in pixels; larger images are
            downscaled proportionally.

    Returns:
        Base64-encoded JPEG bytes as an ASCII string.
    """
    # Delegate to the PIL-level encoder so the resize/normalize/encode
    # pipeline lives in one place; the context manager closes the file
    # handle (the previous version leaked it).
    with Image.open(image_path) as img:
        img.load()  # force pixel data into memory while the file is open
        return encode_pil_image_to_base64(img, max_size=max_size)
def encode_pil_image_to_base64(img: Image.Image, max_size: int = 1024) -> str:
    """Encode a PIL Image object to a base64 JPEG string.

    The image is proportionally downscaled so its longest edge does not
    exceed *max_size*, normalized to an RGB-compatible mode, saved as
    JPEG (quality 90), and base64-encoded.
    """
    longest_edge = max(img.size)
    if longest_edge > max_size:
        scale = max_size / longest_edge
        width, height = img.size
        img = img.resize((int(width * scale), int(height * scale)), Image.LANCZOS)
    normalized = _normalize_image_mode(img)
    buffer = io.BytesIO()
    normalized.save(buffer, format="JPEG", quality=90)
    return base64.b64encode(buffer.getvalue()).decode("utf-8")
class BaseVLMClient(ABC):
    """Abstract base class for VLM API clients with tool-use support.

    Subclasses implement `call` (single-turn) and may override
    `call_multiturn`; this base provides the shared sliding-window rate
    limiter and exponential-backoff retry wrapper.
    """

    def __init__(self, model: str, api_key: str, rate_limit: int = 30):
        self.model = model            # backend-specific model identifier
        self.api_key = api_key        # provider API key
        self.rate_limit = rate_limit  # max calls per 60-second window
        # Timestamps of recent calls, oldest first (sliding window).
        self._call_timestamps: deque[float] = deque()

    def _rate_limit_wait(self):
        """Enforce rate limiting using a sliding window over the last 60 seconds."""
        now = time.time()
        # Evict timestamps that have aged out of the 60-second window.
        while self._call_timestamps and now - self._call_timestamps[0] >= 60.0:
            self._call_timestamps.popleft()
        if len(self._call_timestamps) >= self.rate_limit:
            # At capacity: sleep until the oldest call leaves the window.
            sleep_time = 60.0 - (now - self._call_timestamps[0])
            if sleep_time > 0:
                time.sleep(sleep_time)
            self._call_timestamps.popleft()
        self._call_timestamps.append(time.time())

    @abstractmethod
    def call(
        self,
        system_prompt: str,
        user_text: str,
        images: list[str] | None = None,
        temperature: float = 0.1,
        max_tokens: int = 2048,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """Make a VLM API call, optionally with tools.

        Marked abstract (the `abstractmethod` import was previously unused
        and this method had only a silent `pass` body, so a subclass that
        forgot to override it would silently return None).
        """

    def call_multiturn(
        self,
        system_prompt: str,
        messages: list[dict],
        temperature: float = 0.1,
        max_tokens: int = 2048,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """Multi-turn conversation call with tool support. Override in subclasses.

        Fallback implementation: collapses the conversation to the most
        recent user message (its text plus any base64 images) and issues a
        single-turn `call`.
        """
        last_user = ""
        last_images = []
        for msg in reversed(messages):
            if msg["role"] == "user":
                if isinstance(msg["content"], str):
                    last_user = msg["content"]
                elif isinstance(msg["content"], list):
                    for block in msg["content"]:
                        if block.get("type") == "text":
                            last_user = block["text"]
                        elif block.get("type") == "image_url":
                            # Strip a "data:image/...;base64," prefix if present.
                            last_images.append(block["image_url"]["url"].split(",", 1)[-1])
                break
        return self.call(system_prompt, last_user, last_images or None, temperature, max_tokens, tools)

    def call_with_retry(
        self,
        system_prompt: str,
        user_text: str | None = None,
        images: list[str] | None = None,
        temperature: float = 0.1,
        max_tokens: int = 2048,
        max_retries: int = 3,
        messages: list[dict] | None = None,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """Call with exponential backoff retry. Supports single-turn, multi-turn, and tools.

        If `messages` is given, dispatches to `call_multiturn`; otherwise a
        single-turn `call` with `user_text`/`images`. Re-raises the last
        exception after `max_retries` failed attempts.
        """
        for attempt in range(max_retries):
            try:
                self._rate_limit_wait()
                if messages is not None:
                    return self.call_multiturn(system_prompt, messages, temperature, max_tokens, tools)
                return self.call(system_prompt, user_text, images, temperature, max_tokens, tools)
            except Exception as e:
                if attempt == max_retries - 1:
                    # Out of retries: propagate the original exception
                    # (previously a misleading "Retrying in Ns..." line was
                    # logged even on this final, non-retried attempt).
                    raise
                wait_time = 2 ** attempt * 5  # 5s, 10s, 20s, ...
                logger.warning(
                    f"API call failed (attempt {attempt + 1}/{max_retries}): {e}. "
                    f"Retrying in {wait_time}s..."
                )
                time.sleep(wait_time)
def _parse_tool_call_openai(response_message) -> object | None:
    """Extract a ToolCall from an OpenAI response message.

    Returns None when the message carries no tool calls; otherwise builds
    a ToolCall from the first tool call, parsing its JSON arguments and
    falling back to an empty dict on malformed or missing payloads.
    """
    from tools import ToolCall

    tool_calls = getattr(response_message, "tool_calls", None)
    if not tool_calls:
        return None
    first = tool_calls[0]  # only the first tool call is honored
    try:
        parsed_args = json.loads(first.function.arguments)
    except (json.JSONDecodeError, AttributeError):
        parsed_args = {}
    return ToolCall(tool_name=first.function.name, arguments=parsed_args, call_id=first.id)
def _parse_tool_call_anthropic(response) -> object | None:
    """Extract a ToolCall from an Anthropic response.

    Scans the response content blocks and returns a ToolCall built from
    the first "tool_use" block, or None when no tool was used.
    """
    from tools import ToolCall

    tool_use = next((b for b in response.content if b.type == "tool_use"), None)
    if tool_use is None:
        return None
    return ToolCall(
        tool_name=tool_use.name,
        arguments=tool_use.input,
        call_id=tool_use.id,
    )
| # ============================================================ | |
| # OpenAI Backend (GPT-4o) — with tool calling | |
| # ============================================================ | |
class OpenAIClient(BaseVLMClient):
    """OpenAI GPT-4o API client with native function calling."""

    def __init__(self, model: str = None, api_key: str = None, rate_limit: int = None):
        # Any argument left as None falls back to the project-level config.
        super().__init__(
            model=model or config.MODELS["openai"],
            api_key=api_key or config.OPENAI_API_KEY,
            rate_limit=rate_limit or config.RATE_LIMITS["openai"],
        )
        # Imported lazily so the module imports even when the openai
        # package is absent (e.g. when only another backend is used).
        from openai import OpenAI
        self.client = OpenAI(api_key=self.api_key)

    def call(
        self,
        system_prompt: str,
        user_text: str,
        images: list[str] | None = None,
        temperature: float = 0.1,
        max_tokens: int = 2048,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """Single-turn call: base64 JPEG images first, then the text prompt.

        When `tools` is given they are converted to OpenAI's function
        schema and `tool_choice="required"` forces the model to call one.
        """
        content = []
        if images:
            for img_b64 in images:
                content.append({
                    "type": "image_url",
                    # "detail": "high" requests full-resolution image analysis.
                    "image_url": {"url": f"data:image/jpeg;base64,{img_b64}", "detail": "high"},
                })
        content.append({"type": "text", "text": user_text})
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": content},
        ]
        kwargs = {
            "model": self.model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        if tools:
            from tools import to_openai_tools
            kwargs["tools"] = to_openai_tools(tools)
            kwargs["tool_choice"] = "required"
        t0 = time.time()
        response = self.client.chat.completions.create(**kwargs)
        latency = (time.time() - t0) * 1000
        msg = response.choices[0].message
        # Only parse tool calls when tools were offered on this request.
        tool_call = _parse_tool_call_openai(msg) if tools else None
        return VLMResponse(
            text=msg.content or "",
            model=self.model,
            backend="openai",
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
            latency_ms=latency,
            tool_call=tool_call,
        )

    def call_multiturn(
        self,
        system_prompt: str,
        messages: list[dict],
        temperature: float = 0.1,
        max_tokens: int = 2048,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """
        Multi-turn OpenAI call with full tool-calling protocol.
        Translates our internal message format to OpenAI's API format:
        - "user" → role:"user" (passed through)
        - "assistant" → role:"assistant" with tool_calls array
        - "tool_result" → role:"tool" (text result) + role:"user" (images + follow-up)
        OpenAI requires: after an assistant message with tool_calls, the next
        message MUST be role:"tool" with the matching tool_call_id. Images
        cannot go in tool messages, so we send them in a separate user message.
        """
        api_messages = [{"role": "system", "content": system_prompt}]
        for msg in messages:
            role = msg["role"]
            if role == "user":
                # User content is already in OpenAI's block format.
                api_messages.append({
                    "role": "user",
                    "content": msg["content"],
                })
            elif role == "assistant":
                api_msg = {"role": "assistant"}
                if msg.get("tool_calls"):
                    # Only the first recorded tool call is replayed; the
                    # arguments dict is re-serialized to a JSON string as
                    # OpenAI's wire format expects.
                    tc = msg["tool_calls"][0]
                    api_msg["tool_calls"] = [{
                        "id": tc.call_id,
                        "type": "function",
                        "function": {
                            "name": tc.tool_name,
                            "arguments": json.dumps(tc.arguments),
                        },
                    }]
                    # OpenAI requires content to be null when tool_calls present
                    api_msg["content"] = msg.get("content") or None
                else:
                    api_msg["content"] = msg.get("content", "")
                api_messages.append(api_msg)
            elif role == "tool_result":
                # Step 1: Send the tool result as role:"tool"
                api_messages.append({
                    "role": "tool",
                    "tool_call_id": msg["tool_call_id"],
                    "content": msg.get("content", ""),
                })
                # Step 2: Send images + follow-up as a user message
                # (OpenAI tool messages don't support image content blocks)
                follow_up_content = []
                for img_b64 in msg.get("images", []):
                    follow_up_content.append({
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{img_b64}",
                        },
                    })
                follow_up = msg.get("follow_up", "")
                if follow_up:
                    follow_up_content.append({
                        "type": "text",
                        "text": follow_up,
                    })
                # Skip the extra user message entirely when there is
                # nothing to show (no images and no follow-up text).
                if follow_up_content:
                    api_messages.append({
                        "role": "user",
                        "content": follow_up_content,
                    })
        kwargs = {
            "model": self.model,
            "messages": api_messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        if tools:
            from tools import to_openai_tools
            kwargs["tools"] = to_openai_tools(tools)
            kwargs["tool_choice"] = "required"
        t0 = time.time()
        response = self.client.chat.completions.create(**kwargs)
        latency = (time.time() - t0) * 1000
        msg = response.choices[0].message
        tool_call = _parse_tool_call_openai(msg) if tools else None
        return VLMResponse(
            text=msg.content or "",
            model=self.model,
            backend="openai",
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
            latency_ms=latency,
            tool_call=tool_call,
        )
| # ============================================================ | |
| # Anthropic Backend (Claude) — with tool use | |
| # ============================================================ | |
class AnthropicClient(BaseVLMClient):
    """Anthropic Claude API client with native tool use."""

    def __init__(self, model: str = None, api_key: str = None, rate_limit: int = None):
        # Any argument left as None falls back to the project-level config.
        super().__init__(
            model=model or config.MODELS["anthropic"],
            api_key=api_key or config.ANTHROPIC_API_KEY,
            rate_limit=rate_limit or config.RATE_LIMITS["anthropic"],
        )
        # Imported lazily so the module imports even when the anthropic
        # package is absent (e.g. when only another backend is used).
        from anthropic import Anthropic
        self.client = Anthropic(api_key=self.api_key)

    def call(
        self,
        system_prompt: str,
        user_text: str,
        images: list[str] | None = None,
        temperature: float = 0.1,
        max_tokens: int = 2048,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """Single-turn call using Anthropic's content-block format.

        Images go in as base64 "image" blocks ahead of the text block;
        `tool_choice={"type": "any"}` forces a tool call when tools are
        supplied.
        """
        content = []
        if images:
            for img_b64 in images:
                content.append({
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": "image/jpeg",
                        "data": img_b64,
                    },
                })
        content.append({"type": "text", "text": user_text})
        kwargs = {
            "model": self.model,
            # Anthropic takes the system prompt as a top-level field,
            # not as a message.
            "system": system_prompt,
            "messages": [{"role": "user", "content": content}],
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        if tools:
            from tools import to_anthropic_tools
            kwargs["tools"] = to_anthropic_tools(tools)
            kwargs["tool_choice"] = {"type": "any"}
        t0 = time.time()
        response = self.client.messages.create(**kwargs)
        latency = (time.time() - t0) * 1000
        # Extract text from response (may have both text and tool_use blocks)
        text_parts = []
        for block in response.content:
            if hasattr(block, "text"):
                text_parts.append(block.text)
        tool_call = _parse_tool_call_anthropic(response) if tools else None
        return VLMResponse(
            text="\n".join(text_parts),
            model=self.model,
            backend="anthropic",
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            latency_ms=latency,
            tool_call=tool_call,
        )

    def call_multiturn(
        self,
        system_prompt: str,
        messages: list[dict],
        temperature: float = 0.1,
        max_tokens: int = 2048,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """
        Multi-turn Anthropic call with full tool-use protocol.
        Translates our internal message format to Anthropic's API format:
        - "user" → role:"user" (passed through)
        - "assistant" → role:"assistant" with tool_use content blocks
        - "tool_result" → role:"user" with tool_result block + image blocks
        Anthropic's protocol: after an assistant message with a tool_use block,
        the next message MUST be role:"user" containing a tool_result block
        with the matching tool_use_id. Images and follow-up text can be
        included in the same user message as additional content blocks.
        """
        api_messages = []
        for msg in messages:
            role = msg["role"]
            if role == "user":
                content = msg["content"]
                # Convert image_url format to Anthropic's image format
                if isinstance(content, list):
                    anthropic_content = []
                    for block in content:
                        if block.get("type") == "image_url":
                            url = block["image_url"]["url"]
                            # Extract base64 data from data URL
                            if url.startswith("data:"):
                                b64_data = url.split(",", 1)[-1]
                            else:
                                # Already bare base64 (no data-URL prefix).
                                b64_data = url
                            anthropic_content.append({
                                "type": "image",
                                "source": {
                                    "type": "base64",
                                    "media_type": "image/jpeg",
                                    "data": b64_data,
                                },
                            })
                        elif block.get("type") == "text":
                            # Text blocks share the same shape in both APIs.
                            anthropic_content.append(block)
                        else:
                            # Unknown block types are forwarded untouched.
                            anthropic_content.append(block)
                    api_messages.append({
                        "role": "user",
                        "content": anthropic_content,
                    })
                else:
                    # Plain string content passes through unchanged.
                    api_messages.append({
                        "role": "user",
                        "content": content,
                    })
            elif role == "assistant":
                content_blocks = []
                if msg.get("content"):
                    content_blocks.append({
                        "type": "text",
                        "text": msg["content"],
                    })
                if msg.get("tool_calls"):
                    # Only the first recorded tool call is replayed.
                    tc = msg["tool_calls"][0]
                    content_blocks.append({
                        "type": "tool_use",
                        "id": tc.call_id,
                        "name": tc.tool_name,
                        "input": tc.arguments,
                    })
                api_messages.append({
                    "role": "assistant",
                    "content": content_blocks,
                })
            elif role == "tool_result":
                # Anthropic: tool_result goes in a user message alongside
                # any images and follow-up text
                user_content = []
                # The tool_result block
                user_content.append({
                    "type": "tool_result",
                    "tool_use_id": msg["tool_call_id"],
                    "content": msg.get("content", ""),
                })
                # Images from the channel data
                for img_b64 in msg.get("images", []):
                    user_content.append({
                        "type": "image",
                        "source": {
                            "type": "base64",
                            "media_type": "image/jpeg",
                            "data": img_b64,
                        },
                    })
                # Follow-up text (next-step instructions)
                follow_up = msg.get("follow_up", "")
                if follow_up:
                    user_content.append({
                        "type": "text",
                        "text": follow_up,
                    })
                api_messages.append({
                    "role": "user",
                    "content": user_content,
                })
        kwargs = {
            "model": self.model,
            "system": system_prompt,
            "messages": api_messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        if tools:
            from tools import to_anthropic_tools
            kwargs["tools"] = to_anthropic_tools(tools)
            kwargs["tool_choice"] = {"type": "any"}
        t0 = time.time()
        response = self.client.messages.create(**kwargs)
        latency = (time.time() - t0) * 1000
        # Collect text from all text-bearing blocks in the reply.
        text_parts = []
        for block in response.content:
            if hasattr(block, "text"):
                text_parts.append(block.text)
        tool_call = _parse_tool_call_anthropic(response) if tools else None
        return VLMResponse(
            text="\n".join(text_parts),
            model=self.model,
            backend="anthropic",
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            latency_ms=latency,
            tool_call=tool_call,
        )
| # ============================================================ | |
| # Together Backend (Qwen2.5-VL) — with tool calling | |
| # ============================================================ | |
class TogetherClient(BaseVLMClient):
    """Together AI client with function calling support.

    Uses Together's OpenAI-compatible chat-completions endpoint, so tool
    schemas are converted with `to_openai_tools` and responses are parsed
    with the OpenAI tool-call parser.
    """

    def __init__(self, model: str = None, api_key: str = None, rate_limit: int = None):
        # Any argument left as None falls back to the project-level config.
        super().__init__(
            model=model or config.MODELS["together"],
            api_key=api_key or config.TOGETHER_API_KEY,
            rate_limit=rate_limit or config.RATE_LIMITS["together"],
        )
        # Lazy import keeps the module importable without the together SDK.
        from together import Together
        self.client = Together(api_key=self.api_key)

    def call(
        self,
        system_prompt: str,
        user_text: str,
        images: list[str] | None = None,
        temperature: float = 0.1,
        max_tokens: int = 2048,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """Single-turn call: base64 JPEG images first, then the text prompt."""
        user_content = [
            {
                "type": "image_url",
                "image_url": {"url": f"data:image/jpeg;base64,{img_b64}"},
            }
            for img_b64 in (images or [])
        ]
        user_content.append({"type": "text", "text": user_text})
        request = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_content},
            ],
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        if tools:
            from tools import to_openai_tools
            request["tools"] = to_openai_tools(tools)
        start = time.time()
        response = self.client.chat.completions.create(**request)
        elapsed_ms = (time.time() - start) * 1000
        message = response.choices[0].message
        usage = response.usage
        # Some Together responses may omit usage fields; default to 0.
        return VLMResponse(
            text=message.content or "",
            model=self.model,
            backend="together",
            input_tokens=getattr(usage, "prompt_tokens", 0),
            output_tokens=getattr(usage, "completion_tokens", 0),
            latency_ms=elapsed_ms,
            tool_call=_parse_tool_call_openai(message) if tools else None,
        )
| # ============================================================ | |
| # Client Factory | |
| # ============================================================ | |
class OpenAIMiniClient(OpenAIClient):
    """OpenAI GPT-4o-mini client.

    Identical behavior to OpenAIClient, but defaults to the "openai_mini"
    model and rate limit from config.
    """

    def __init__(self, model: str = None, api_key: str = None, rate_limit: int = None):
        # Resolve the mini-specific defaults here, then reuse
        # OpenAIClient.__init__ (which only fills in its own defaults for
        # None arguments) instead of calling BaseVLMClient.__init__
        # directly and duplicating the OpenAI SDK client construction.
        super().__init__(
            model=model or config.MODELS["openai_mini"],
            api_key=api_key,
            rate_limit=rate_limit or config.RATE_LIMITS["openai_mini"],
        )
def create_client(backend: str, **kwargs) -> BaseVLMClient:
    """Factory function to create a VLM client by backend name.

    Args:
        backend: One of "openai", "openai_mini", "anthropic", "together".
        **kwargs: Forwarded to the selected client's constructor.

    Returns:
        A constructed client for the requested backend.

    Raises:
        ValueError: If `backend` is not a known backend name.
    """
    registry = {
        "openai": OpenAIClient,
        "openai_mini": OpenAIMiniClient,
        "anthropic": AnthropicClient,
        "together": TogetherClient,
    }
    try:
        client_cls = registry[backend]
    except KeyError:
        raise ValueError(f"Unknown backend: {backend}. Choose from {list(registry.keys())}") from None
    return client_cls(**kwargs)