"""
Unified multi-backend VLM API client with tool-use support.

Supports OpenAI (GPT-4o), Anthropic (Claude), and Together (Qwen2.5-VL).
Handles image encoding, rate limiting, retries, response normalization,
and native function/tool calling across all backends.
"""
import base64
import io
import json
import time
import logging
from collections import deque
from pathlib import Path
from abc import ABC, abstractmethod
from dataclasses import dataclass, field

from PIL import Image

import config

logger = logging.getLogger(__name__)


@dataclass
class VLMResponse:
    """Normalized response from any VLM backend, including tool calls."""

    text: str                # assistant text (may be empty when only a tool was called)
    model: str               # concrete model identifier used for the call
    backend: str             # "openai" | "anthropic" | "together"
    input_tokens: int
    output_tokens: int
    latency_ms: float
    tool_call: object | None = None  # tools.ToolCall if a tool was called


def _normalize_image_mode(img: Image.Image) -> Image.Image:
    """Normalize medical image modes to RGB-compatible formats for JPEG encoding.

    Handles alpha (composited onto white), grayscale, and high-bit-depth
    integer/float modes (min-max rescaled to 8-bit). Any other mode falls
    through to a plain RGB conversion.
    """
    if img.mode == "RGB":
        return img
    if img.mode == "RGBA":
        # JPEG has no alpha channel: composite onto a white background.
        background = Image.new("RGB", img.size, (255, 255, 255))
        background.paste(img, mask=img.split()[3])
        return background
    if img.mode == "L":
        return img.convert("RGB")
    if img.mode in ("I", "I;16", "I;16B", "I;16L", "F"):
        # High-bit-depth integer or float pixel data (common in medical
        # imaging): min-max normalize into 0-255 before 8-bit conversion.
        # (The original had two byte-identical branches for "I*" and "F".)
        import numpy as np

        arr = np.array(img, dtype=np.float64)
        if arr.max() > arr.min():
            arr = (arr - arr.min()) / (arr.max() - arr.min()) * 255.0
        else:
            arr = np.zeros_like(arr)  # constant image -> all black
        return Image.fromarray(arr.astype(np.uint8)).convert("RGB")
    return img.convert("RGB")


def encode_pil_image_to_base64(img: Image.Image, max_size: int = 1024) -> str:
    """Encode a PIL Image to base64 JPEG, downscaling so the longest side <= max_size."""
    if max(img.size) > max_size:
        ratio = max_size / max(img.size)
        new_size = (int(img.size[0] * ratio), int(img.size[1] * ratio))
        img = img.resize(new_size, Image.LANCZOS)
    img = _normalize_image_mode(img)
    buf = io.BytesIO()
    img.save(buf, format="JPEG", quality=90)
    return base64.b64encode(buf.getvalue()).decode("utf-8")


def encode_image_to_base64(image_path: str | Path, max_size: int = 1024) -> str:
    """Load and encode an image file to base64 JPEG, resizing if needed.

    Delegates to encode_pil_image_to_base64 (the two functions previously
    duplicated the resize/normalize/encode logic) and closes PIL's lazily
    opened file handle deterministically via the context manager.
    """
    with Image.open(image_path) as img:
        return encode_pil_image_to_base64(img, max_size)


class BaseVLMClient(ABC):
    """Abstract base class for VLM API clients with tool-use support."""

    def __init__(self, model: str, api_key: str, rate_limit: int = 30):
        self.model = model
        self.api_key = api_key
        self.rate_limit = rate_limit  # max calls per sliding 60-second window
        self._call_timestamps: deque[float] = deque()

    def _rate_limit_wait(self):
        """Block until a call is permitted under the sliding 60-second window."""
        now = time.time()
        # Drop timestamps that have aged out of the window.
        while self._call_timestamps and now - self._call_timestamps[0] >= 60.0:
            self._call_timestamps.popleft()
        if len(self._call_timestamps) >= self.rate_limit:
            sleep_time = 60.0 - (now - self._call_timestamps[0])
            if sleep_time > 0:
                time.sleep(sleep_time)
            # Re-prune with a fresh clock: every timestamp that expired while
            # we slept, not just the single head entry (the original popped
            # exactly one, which drifts when several expire during one sleep).
            now = time.time()
            while self._call_timestamps and now - self._call_timestamps[0] >= 60.0:
                self._call_timestamps.popleft()
        self._call_timestamps.append(time.time())

    @abstractmethod
    def call(
        self,
        system_prompt: str,
        user_text: str,
        images: list[str] | None = None,
        temperature: float = 0.1,
        max_tokens: int = 2048,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """Make a VLM API call, optionally with tools.

        Args:
            system_prompt: System/instruction text.
            user_text: User message text.
            images: Optional base64-encoded JPEG payloads (no data-URL prefix).
            temperature: Sampling temperature.
            max_tokens: Completion token cap.
            tools: Optional tool schemas (internal format; converted per backend).
        """
        pass

    def call_multiturn(
        self,
        system_prompt: str,
        messages: list[dict],
        temperature: float = 0.1,
        max_tokens: int = 2048,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """Multi-turn conversation call with tool support. Override in subclasses.

        Default fallback: flattens the conversation to a single-turn call using
        the most recent user message (text + any image_url blocks it carries).
        """
        last_user = ""
        last_images = []
        for msg in reversed(messages):
            if msg["role"] == "user":
                if isinstance(msg["content"], str):
                    last_user = msg["content"]
                elif isinstance(msg["content"], list):
                    for block in msg["content"]:
                        if block.get("type") == "text":
                            last_user = block["text"]
                        elif block.get("type") == "image_url":
                            # Strip any "data:image/jpeg;base64," prefix.
                            last_images.append(block["image_url"]["url"].split(",", 1)[-1])
                break
        return self.call(system_prompt, last_user, last_images or None, temperature, max_tokens, tools)

    def call_with_retry(
        self,
        system_prompt: str,
        user_text: str | None = None,
        images: list[str] | None = None,
        temperature: float = 0.1,
        max_tokens: int = 2048,
        max_retries: int = 3,
        messages: list[dict] | None = None,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """Call with exponential backoff retry. Supports single-turn, multi-turn, and tools.

        If `messages` is given, dispatches to call_multiturn; otherwise to call.
        Raises the last exception after `max_retries` failed attempts.
        """
        for attempt in range(max_retries):
            try:
                self._rate_limit_wait()
                if messages is not None:
                    return self.call_multiturn(system_prompt, messages, temperature, max_tokens, tools)
                return self.call(system_prompt, user_text, images, temperature, max_tokens, tools)
            except Exception as e:
                wait_time = 2 ** attempt * 5  # 5s, 10s, 20s, ...
                logger.warning(
                    f"API call failed (attempt {attempt + 1}/{max_retries}): {e}. "
                    f"Retrying in {wait_time}s..."
                )
                if attempt == max_retries - 1:
                    raise
                time.sleep(wait_time)


def _parse_tool_call_openai(response_message) -> object | None:
    """Extract a ToolCall from an OpenAI response message (None if no tool was called)."""
    from tools import ToolCall
    tool_calls = getattr(response_message, "tool_calls", None)
    if not tool_calls:
        return None
    tc = tool_calls[0]  # Take the first tool call
    try:
        arguments = json.loads(tc.function.arguments)
    except (json.JSONDecodeError, AttributeError):
        arguments = {}  # malformed arguments -> empty dict rather than crash
    return ToolCall(
        tool_name=tc.function.name,
        arguments=arguments,
        call_id=tc.id,
    )


def _parse_tool_call_anthropic(response) -> object | None:
    """Extract a ToolCall from an Anthropic response (first tool_use block, or None)."""
    from tools import ToolCall
    for block in response.content:
        if block.type == "tool_use":
            return ToolCall(
                tool_name=block.name,
                arguments=block.input,
                call_id=block.id,
            )
    return None


# ============================================================
# OpenAI Backend (GPT-4o) — with tool calling
# ============================================================

class OpenAIClient(BaseVLMClient):
    """OpenAI GPT-4o API client with native function calling."""

    def __init__(self, model: str = None, api_key: str = None, rate_limit: int = None):
        super().__init__(
            model=model or config.MODELS["openai"],
            api_key=api_key or config.OPENAI_API_KEY,
            rate_limit=rate_limit or config.RATE_LIMITS["openai"],
        )
        from openai import OpenAI
        self.client = OpenAI(api_key=self.api_key)

    def call(
        self,
        system_prompt: str,
        user_text: str,
        images: list[str] | None = None,
        temperature: float = 0.1,
        max_tokens: int = 2048,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """Single-turn OpenAI call; images go before the text block."""
        content = []
        if images:
            for img_b64 in images:
                content.append({
                    "type": "image_url",
                    "image_url": {"url": f"data:image/jpeg;base64,{img_b64}", "detail": "high"},
                })
        content.append({"type": "text", "text": user_text})
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": content},
        ]
        kwargs = {
            "model": self.model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        if tools:
            from tools import to_openai_tools
            kwargs["tools"] = to_openai_tools(tools)
            kwargs["tool_choice"] = "required"  # force a tool call when tools are given
        t0 = time.time()
        response = self.client.chat.completions.create(**kwargs)
        latency = (time.time() - t0) * 1000
        msg = response.choices[0].message
        tool_call = _parse_tool_call_openai(msg) if tools else None
        return VLMResponse(
            text=msg.content or "",
            model=self.model,
            backend="openai",
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
            latency_ms=latency,
            tool_call=tool_call,
        )

    def call_multiturn(
        self,
        system_prompt: str,
        messages: list[dict],
        temperature: float = 0.1,
        max_tokens: int = 2048,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """
        Multi-turn OpenAI call with full tool-calling protocol.

        Translates our internal message format to OpenAI's API format:
        - "user"        → role:"user" (passed through)
        - "assistant"   → role:"assistant" with tool_calls array
        - "tool_result" → role:"tool" (text result) + role:"user" (images + follow-up)

        OpenAI requires: after an assistant message with tool_calls, the next
        message MUST be role:"tool" with the matching tool_call_id. Images
        cannot go in tool messages, so we send them in a separate user message.
        """
        api_messages = [{"role": "system", "content": system_prompt}]
        for msg in messages:
            role = msg["role"]
            if role == "user":
                api_messages.append({
                    "role": "user",
                    "content": msg["content"],
                })
            elif role == "assistant":
                api_msg = {"role": "assistant"}
                if msg.get("tool_calls"):
                    tc = msg["tool_calls"][0]
                    api_msg["tool_calls"] = [{
                        "id": tc.call_id,
                        "type": "function",
                        "function": {
                            "name": tc.tool_name,
                            "arguments": json.dumps(tc.arguments),
                        },
                    }]
                    # OpenAI requires content to be null when tool_calls present
                    api_msg["content"] = msg.get("content") or None
                else:
                    api_msg["content"] = msg.get("content", "")
                api_messages.append(api_msg)
            elif role == "tool_result":
                # Step 1: Send the tool result as role:"tool"
                api_messages.append({
                    "role": "tool",
                    "tool_call_id": msg["tool_call_id"],
                    "content": msg.get("content", ""),
                })
                # Step 2: Send images + follow-up as a user message
                # (OpenAI tool messages don't support image content blocks)
                follow_up_content = []
                for img_b64 in msg.get("images", []):
                    follow_up_content.append({
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{img_b64}",
                        },
                    })
                follow_up = msg.get("follow_up", "")
                if follow_up:
                    follow_up_content.append({
                        "type": "text",
                        "text": follow_up,
                    })
                if follow_up_content:
                    api_messages.append({
                        "role": "user",
                        "content": follow_up_content,
                    })
        kwargs = {
            "model": self.model,
            "messages": api_messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        if tools:
            from tools import to_openai_tools
            kwargs["tools"] = to_openai_tools(tools)
            kwargs["tool_choice"] = "required"
        t0 = time.time()
        response = self.client.chat.completions.create(**kwargs)
        latency = (time.time() - t0) * 1000
        msg = response.choices[0].message
        tool_call = _parse_tool_call_openai(msg) if tools else None
        return VLMResponse(
            text=msg.content or "",
            model=self.model,
            backend="openai",
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
            latency_ms=latency,
            tool_call=tool_call,
        )
# ============================================================
# Anthropic Backend (Claude) — with tool use
# ============================================================

class AnthropicClient(BaseVLMClient):
    """Anthropic Claude API client with native tool use."""

    def __init__(self, model: str = None, api_key: str = None, rate_limit: int = None):
        super().__init__(
            model=model or config.MODELS["anthropic"],
            api_key=api_key or config.ANTHROPIC_API_KEY,
            rate_limit=rate_limit or config.RATE_LIMITS["anthropic"],
        )
        from anthropic import Anthropic
        self.client = Anthropic(api_key=self.api_key)

    def call(
        self,
        system_prompt: str,
        user_text: str,
        images: list[str] | None = None,
        temperature: float = 0.1,
        max_tokens: int = 2048,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """Single-turn Anthropic call; images go before the text block."""
        content = []
        if images:
            for img_b64 in images:
                content.append({
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": "image/jpeg",
                        "data": img_b64,
                    },
                })
        content.append({"type": "text", "text": user_text})
        kwargs = {
            "model": self.model,
            "system": system_prompt,
            "messages": [{"role": "user", "content": content}],
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        if tools:
            from tools import to_anthropic_tools
            kwargs["tools"] = to_anthropic_tools(tools)
            kwargs["tool_choice"] = {"type": "any"}  # force some tool call
        t0 = time.time()
        response = self.client.messages.create(**kwargs)
        latency = (time.time() - t0) * 1000
        # Extract text from response (may have both text and tool_use blocks)
        text_parts = []
        for block in response.content:
            if hasattr(block, "text"):
                text_parts.append(block.text)
        tool_call = _parse_tool_call_anthropic(response) if tools else None
        return VLMResponse(
            text="\n".join(text_parts),
            model=self.model,
            backend="anthropic",
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            latency_ms=latency,
            tool_call=tool_call,
        )

    def call_multiturn(
        self,
        system_prompt: str,
        messages: list[dict],
        temperature: float = 0.1,
        max_tokens: int = 2048,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """
        Multi-turn Anthropic call with full tool-use protocol.

        Translates our internal message format to Anthropic's API format:
        - "user"        → role:"user" (passed through)
        - "assistant"   → role:"assistant" with tool_use content blocks
        - "tool_result" → role:"user" with tool_result block + image blocks

        Anthropic's protocol: after an assistant message with a tool_use block,
        the next message MUST be role:"user" containing a tool_result block with
        the matching tool_use_id. Images and follow-up text can be included in
        the same user message as additional content blocks.
        """
        api_messages = []
        for msg in messages:
            role = msg["role"]
            if role == "user":
                content = msg["content"]
                if isinstance(content, list):
                    # Convert OpenAI-style image_url blocks to Anthropic's
                    # image format; every other block type passes through.
                    anthropic_content = []
                    for block in content:
                        if block.get("type") == "image_url":
                            url = block["image_url"]["url"]
                            # Extract base64 data from a data URL; a bare
                            # string is assumed to already be base64.
                            if url.startswith("data:"):
                                b64_data = url.split(",", 1)[-1]
                            else:
                                b64_data = url
                            anthropic_content.append({
                                "type": "image",
                                "source": {
                                    "type": "base64",
                                    "media_type": "image/jpeg",
                                    "data": b64_data,
                                },
                            })
                        else:
                            # (The original had identical elif/else branches
                            # for "text" and everything else — collapsed.)
                            anthropic_content.append(block)
                    api_messages.append({
                        "role": "user",
                        "content": anthropic_content,
                    })
                else:
                    api_messages.append({
                        "role": "user",
                        "content": content,
                    })
            elif role == "assistant":
                content_blocks = []
                if msg.get("content"):
                    content_blocks.append({
                        "type": "text",
                        "text": msg["content"],
                    })
                if msg.get("tool_calls"):
                    tc = msg["tool_calls"][0]
                    content_blocks.append({
                        "type": "tool_use",
                        "id": tc.call_id,
                        "name": tc.tool_name,
                        "input": tc.arguments,
                    })
                api_messages.append({
                    "role": "assistant",
                    "content": content_blocks,
                })
            elif role == "tool_result":
                # Anthropic: tool_result goes in a user message alongside
                # any images and follow-up text
                user_content = []
                # The tool_result block
                user_content.append({
                    "type": "tool_result",
                    "tool_use_id": msg["tool_call_id"],
                    "content": msg.get("content", ""),
                })
                # Images from the channel data
                for img_b64 in msg.get("images", []):
                    user_content.append({
                        "type": "image",
                        "source": {
                            "type": "base64",
                            "media_type": "image/jpeg",
                            "data": img_b64,
                        },
                    })
                # Follow-up text (next-step instructions)
                follow_up = msg.get("follow_up", "")
                if follow_up:
                    user_content.append({
                        "type": "text",
                        "text": follow_up,
                    })
                api_messages.append({
                    "role": "user",
                    "content": user_content,
                })
        kwargs = {
            "model": self.model,
            "system": system_prompt,
            "messages": api_messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        if tools:
            from tools import to_anthropic_tools
            kwargs["tools"] = to_anthropic_tools(tools)
            kwargs["tool_choice"] = {"type": "any"}
        t0 = time.time()
        response = self.client.messages.create(**kwargs)
        latency = (time.time() - t0) * 1000
        text_parts = []
        for block in response.content:
            if hasattr(block, "text"):
                text_parts.append(block.text)
        tool_call = _parse_tool_call_anthropic(response) if tools else None
        return VLMResponse(
            text="\n".join(text_parts),
            model=self.model,
            backend="anthropic",
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            latency_ms=latency,
            tool_call=tool_call,
        )


# ============================================================
# Together Backend (Qwen2.5-VL) — with tool calling
# ============================================================

class TogetherClient(BaseVLMClient):
    """Together AI client with function calling support."""

    def __init__(self, model: str = None, api_key: str = None, rate_limit: int = None):
        super().__init__(
            model=model or config.MODELS["together"],
            api_key=api_key or config.TOGETHER_API_KEY,
            rate_limit=rate_limit or config.RATE_LIMITS["together"],
        )
        from together import Together
        self.client = Together(api_key=self.api_key)

    def call(
        self,
        system_prompt: str,
        user_text: str,
        images: list[str] | None = None,
        temperature: float = 0.1,
        max_tokens: int = 2048,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """Single-turn Together call (OpenAI-compatible chat format)."""
        content = []
        if images:
            for img_b64 in images:
                content.append({
                    "type": "image_url",
                    "image_url": {"url": f"data:image/jpeg;base64,{img_b64}"},
                })
        content.append({"type": "text", "text": user_text})
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": content},
        ]
        kwargs = {
            "model": self.model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        if tools:
            from tools import to_openai_tools
            kwargs["tools"] = to_openai_tools(tools)
        t0 = time.time()
        response = self.client.chat.completions.create(**kwargs)
        latency = (time.time() - t0) * 1000
        msg = response.choices[0].message
        usage = response.usage
        tool_call = _parse_tool_call_openai(msg) if tools else None
        return VLMResponse(
            text=msg.content or "",
            model=self.model,
            backend="together",
            # Together's usage fields are not always populated; default to 0.
            input_tokens=getattr(usage, "prompt_tokens", 0),
            output_tokens=getattr(usage, "completion_tokens", 0),
            latency_ms=latency,
            tool_call=tool_call,
        )


# ============================================================
# Client Factory
# ============================================================

class OpenAIMiniClient(OpenAIClient):
    """OpenAI GPT-4o-mini client."""

    def __init__(self, model: str = None, api_key: str = None, rate_limit: int = None):
        # Reuse OpenAIClient.__init__ (which also constructs the SDK client)
        # instead of bypassing it via BaseVLMClient.__init__ and duplicating
        # the OpenAI(...) construction. The explicit model/rate_limit defaults
        # select the "openai_mini" config entries; api_key falls through to
        # the parent's config.OPENAI_API_KEY default.
        super().__init__(
            model=model or config.MODELS["openai_mini"],
            api_key=api_key,
            rate_limit=rate_limit or config.RATE_LIMITS["openai_mini"],
        )


def create_client(backend: str, **kwargs) -> BaseVLMClient:
    """Factory function to create a VLM client by backend name.

    Raises:
        ValueError: if `backend` is not one of the registered backends.
    """
    clients = {
        "openai": OpenAIClient,
        "openai_mini": OpenAIMiniClient,
        "anthropic": AnthropicClient,
        "together": TogetherClient,
    }
    if backend not in clients:
        raise ValueError(f"Unknown backend: {backend}. Choose from {list(clients.keys())}")
    return clients[backend](**kwargs)