# activemedagent-demo / api_client.py
# (uploaded via huggingface_hub; commit a1aaf30)
"""
Unified multi-backend VLM API client with tool-use support.
Supports OpenAI (GPT-4o), Anthropic (Claude), and Together (Qwen2.5-VL).
Handles image encoding, rate limiting, retries, response normalization,
and native function/tool calling across all backends.
"""
import base64
import io
import json
import time
import logging
from collections import deque
from pathlib import Path
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from PIL import Image
import config
logger = logging.getLogger(__name__)
@dataclass
class VLMResponse:
    """Normalized response from any VLM backend, including tool calls."""
    text: str                # assistant text content ("" when only a tool call came back)
    model: str               # model identifier that served the request
    backend: str             # backend name: "openai", "anthropic", or "together"
    input_tokens: int        # prompt-token count as reported by the provider
    output_tokens: int       # completion-token count as reported by the provider
    latency_ms: float        # wall-clock round-trip latency in milliseconds
    tool_call: object | None = None  # tools.ToolCall if a tool was called
def _normalize_image_mode(img: Image.Image) -> Image.Image:
    """Normalize medical image modes to RGB-compatible formats for JPEG encoding.

    Handles:
      - "RGB": returned unchanged.
      - "RGBA": alpha-composited onto a white background.
      - "L": plain grayscale-to-RGB conversion.
      - High-bit-depth integer ("I", "I;16", "I;16B", "I;16L") and float ("F")
        modes common in medical imaging: min-max rescaled to uint8.
      - Anything else: falls through to a plain RGB conversion.
    """
    if img.mode == "RGB":
        return img
    if img.mode == "RGBA":
        # Composite onto white using the alpha channel as the paste mask.
        background = Image.new("RGB", img.size, (255, 255, 255))
        background.paste(img, mask=img.split()[3])
        return background
    if img.mode == "L":
        return img.convert("RGB")
    if img.mode in ("I", "I;16", "I;16B", "I;16L", "F"):
        # The original code had two byte-identical branches for the integer
        # and float cases; they are merged here. Min-max rescale to [0, 255].
        import numpy as np
        arr = np.array(img, dtype=np.float64)
        if arr.max() > arr.min():
            arr = (arr - arr.min()) / (arr.max() - arr.min()) * 255.0
        else:
            # Constant-valued image: map to all-black instead of dividing by zero.
            arr = np.zeros_like(arr)
        return Image.fromarray(arr.astype(np.uint8)).convert("RGB")
    return img.convert("RGB")
def encode_image_to_base64(image_path: str | Path, max_size: int = 1024) -> str:
    """Load an image from disk and encode it to base64 JPEG, resizing if needed.

    Args:
        image_path: Path to the image file.
        max_size: Maximum edge length in pixels; larger images are downscaled
            proportionally before encoding.

    Returns:
        Base64-encoded JPEG bytes as an ASCII string (no data-URL prefix).
    """
    # Delegate to the PIL-object encoder so the resize/normalize/encode
    # pipeline lives in exactly one place instead of being duplicated here.
    return encode_pil_image_to_base64(Image.open(image_path), max_size=max_size)
def encode_pil_image_to_base64(img: Image.Image, max_size: int = 1024) -> str:
    """Encode a PIL Image object to a base64 JPEG string.

    Images whose longest edge exceeds ``max_size`` are downscaled
    proportionally; the mode is normalized to RGB before JPEG encoding.
    """
    longest_edge = max(img.size)
    if longest_edge > max_size:
        scale = max_size / longest_edge
        img = img.resize(
            (int(img.size[0] * scale), int(img.size[1] * scale)),
            Image.LANCZOS,
        )
    jpeg_buffer = io.BytesIO()
    _normalize_image_mode(img).save(jpeg_buffer, format="JPEG", quality=90)
    return base64.b64encode(jpeg_buffer.getvalue()).decode("utf-8")
class BaseVLMClient(ABC):
    """Abstract base class for VLM API clients with tool-use support.

    Provides sliding-window rate limiting and exponential-backoff retry;
    concrete backends implement `call` and may override `call_multiturn`.
    """

    def __init__(self, model: str, api_key: str, rate_limit: int = 30):
        self.model = model            # backend model identifier
        self.api_key = api_key        # provider API key
        self.rate_limit = rate_limit  # max calls allowed per 60-second window
        # Timestamps of recent calls, oldest first.
        self._call_timestamps: deque[float] = deque()

    def _rate_limit_wait(self):
        """Enforce rate limiting using a sliding window over the last 60 seconds.

        Blocks until a call slot is free, then records the current call time.
        """
        now = time.time()
        # Evict timestamps that have aged out of the 60 s window.
        while self._call_timestamps and now - self._call_timestamps[0] >= 60.0:
            self._call_timestamps.popleft()
        if len(self._call_timestamps) >= self.rate_limit:
            # Sleep until the oldest call in the window expires, then free its slot.
            sleep_time = 60.0 - (now - self._call_timestamps[0])
            if sleep_time > 0:
                time.sleep(sleep_time)
            self._call_timestamps.popleft()
        self._call_timestamps.append(time.time())

    @abstractmethod
    def call(
        self,
        system_prompt: str,
        user_text: str,
        images: list[str] | None = None,
        temperature: float = 0.1,
        max_tokens: int = 2048,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """Make a single-turn VLM API call, optionally with tools."""
        pass

    def call_multiturn(
        self,
        system_prompt: str,
        messages: list[dict],
        temperature: float = 0.1,
        max_tokens: int = 2048,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """Multi-turn conversation call with tool support. Override in subclasses.

        Fallback implementation: collapses the conversation to the most recent
        user message (text + any images) and issues a single-turn `call`.
        """
        last_user = ""
        last_images = []
        for msg in reversed(messages):
            if msg["role"] == "user":
                if isinstance(msg["content"], str):
                    last_user = msg["content"]
                elif isinstance(msg["content"], list):
                    for block in msg["content"]:
                        if block.get("type") == "text":
                            last_user = block["text"]
                        elif block.get("type") == "image_url":
                            # Strip any "data:image/...;base64," prefix.
                            last_images.append(block["image_url"]["url"].split(",", 1)[-1])
                break
        return self.call(system_prompt, last_user, last_images or None, temperature, max_tokens, tools)

    def call_with_retry(
        self,
        system_prompt: str,
        user_text: str | None = None,
        images: list[str] | None = None,
        temperature: float = 0.1,
        max_tokens: int = 2048,
        max_retries: int = 3,
        messages: list[dict] | None = None,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """Call with exponential backoff retry. Supports single-turn, multi-turn, and tools.

        If `messages` is given, routes to `call_multiturn`; otherwise uses the
        single-turn `call` with `user_text`/`images`. Backoff is 5, 10, 20, ... s.

        Raises:
            Exception: re-raises the last backend error after `max_retries` attempts.
        """
        for attempt in range(max_retries):
            try:
                self._rate_limit_wait()
                if messages is not None:
                    return self.call_multiturn(system_prompt, messages, temperature, max_tokens, tools)
                return self.call(system_prompt, user_text, images, temperature, max_tokens, tools)
            except Exception as e:
                if attempt == max_retries - 1:
                    # Out of retries: log the final failure accurately (the
                    # original code misleadingly logged "Retrying..." here)
                    # and surface the error to the caller.
                    logger.warning(
                        f"API call failed (attempt {attempt + 1}/{max_retries}): {e}. Giving up."
                    )
                    raise
                wait_time = 2 ** attempt * 5
                logger.warning(
                    f"API call failed (attempt {attempt + 1}/{max_retries}): {e}. "
                    f"Retrying in {wait_time}s..."
                )
                time.sleep(wait_time)
def _parse_tool_call_openai(response_message) -> object | None:
    """Extract a ToolCall from an OpenAI response message.

    Returns a tools.ToolCall built from the first entry in the message's
    `tool_calls` list, or None when no tool was called. Malformed or
    missing argument JSON degrades to an empty arguments dict.
    """
    from tools import ToolCall
    calls = getattr(response_message, "tool_calls", None)
    if not calls:
        return None
    first = calls[0]  # Only the first tool call is honored.
    try:
        parsed_args = json.loads(first.function.arguments)
    except (json.JSONDecodeError, AttributeError):
        parsed_args = {}
    return ToolCall(
        tool_name=first.function.name,
        arguments=parsed_args,
        call_id=first.id,
    )
def _parse_tool_call_anthropic(response) -> object | None:
    """Extract a ToolCall from an Anthropic response.

    Scans the response content blocks for the first "tool_use" block;
    returns None when the model did not invoke a tool.
    """
    from tools import ToolCall
    tool_block = next(
        (block for block in response.content if block.type == "tool_use"),
        None,
    )
    if tool_block is None:
        return None
    return ToolCall(
        tool_name=tool_block.name,
        arguments=tool_block.input,
        call_id=tool_block.id,
    )
# ============================================================
# OpenAI Backend (GPT-4o) — with tool calling
# ============================================================
class OpenAIClient(BaseVLMClient):
    """OpenAI GPT-4o API client with native function calling."""
    def __init__(self, model: str = None, api_key: str = None, rate_limit: int = None):
        # Any argument left as None falls back to the project config.
        super().__init__(
            model=model or config.MODELS["openai"],
            api_key=api_key or config.OPENAI_API_KEY,
            rate_limit=rate_limit or config.RATE_LIMITS["openai"],
        )
        # SDK import is deferred to construction time, so importing this
        # module does not itself require the `openai` package.
        from openai import OpenAI
        self.client = OpenAI(api_key=self.api_key)
    def call(
        self,
        system_prompt: str,
        user_text: str,
        images: list[str] | None = None,
        temperature: float = 0.1,
        max_tokens: int = 2048,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """Single-turn chat completion, optionally with images and tools.

        Args:
            system_prompt: System message content.
            user_text: User message text (appended after any image blocks).
            images: Optional base64-encoded JPEG strings (no data-URL prefix).
            temperature: Sampling temperature.
            max_tokens: Completion token cap.
            tools: Optional tool schemas; when given, tool_choice is forced
                to "required" so the model must invoke a tool.

        Returns:
            VLMResponse with text, token usage, latency, and any tool call.
        """
        # Image blocks go first, then the text prompt.
        content = []
        if images:
            for img_b64 in images:
                content.append({
                    "type": "image_url",
                    "image_url": {"url": f"data:image/jpeg;base64,{img_b64}", "detail": "high"},
                })
        content.append({"type": "text", "text": user_text})
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": content},
        ]
        kwargs = {
            "model": self.model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        if tools:
            from tools import to_openai_tools
            kwargs["tools"] = to_openai_tools(tools)
            # Force a tool invocation rather than leaving it to the model.
            kwargs["tool_choice"] = "required"
        t0 = time.time()
        response = self.client.chat.completions.create(**kwargs)
        latency = (time.time() - t0) * 1000
        msg = response.choices[0].message
        # Only attempt tool-call extraction when tools were supplied.
        tool_call = _parse_tool_call_openai(msg) if tools else None
        return VLMResponse(
            text=msg.content or "",
            model=self.model,
            backend="openai",
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
            latency_ms=latency,
            tool_call=tool_call,
        )
    def call_multiturn(
        self,
        system_prompt: str,
        messages: list[dict],
        temperature: float = 0.1,
        max_tokens: int = 2048,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """
        Multi-turn OpenAI call with full tool-calling protocol.
        Translates our internal message format to OpenAI's API format:
        - "user" → role:"user" (passed through)
        - "assistant" → role:"assistant" with tool_calls array
        - "tool_result" → role:"tool" (text result) + role:"user" (images + follow-up)
        OpenAI requires: after an assistant message with tool_calls, the next
        message MUST be role:"tool" with the matching tool_call_id. Images
        cannot go in tool messages, so we send them in a separate user message.
        """
        api_messages = [{"role": "system", "content": system_prompt}]
        for msg in messages:
            role = msg["role"]
            if role == "user":
                # User messages pass through unchanged (string or block list).
                api_messages.append({
                    "role": "user",
                    "content": msg["content"],
                })
            elif role == "assistant":
                api_msg = {"role": "assistant"}
                if msg.get("tool_calls"):
                    # Only the first recorded tool call is re-serialized.
                    tc = msg["tool_calls"][0]
                    api_msg["tool_calls"] = [{
                        "id": tc.call_id,
                        "type": "function",
                        "function": {
                            "name": tc.tool_name,
                            # Arguments are stored as a dict internally;
                            # OpenAI expects a JSON string.
                            "arguments": json.dumps(tc.arguments),
                        },
                    }]
                    # OpenAI requires content to be null when tool_calls present
                    api_msg["content"] = msg.get("content") or None
                else:
                    api_msg["content"] = msg.get("content", "")
                api_messages.append(api_msg)
            elif role == "tool_result":
                # Step 1: Send the tool result as role:"tool"
                api_messages.append({
                    "role": "tool",
                    "tool_call_id": msg["tool_call_id"],
                    "content": msg.get("content", ""),
                })
                # Step 2: Send images + follow-up as a user message
                # (OpenAI tool messages don't support image content blocks)
                follow_up_content = []
                for img_b64 in msg.get("images", []):
                    follow_up_content.append({
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{img_b64}",
                        },
                    })
                follow_up = msg.get("follow_up", "")
                if follow_up:
                    follow_up_content.append({
                        "type": "text",
                        "text": follow_up,
                    })
                # Skip the extra user message entirely when there is nothing
                # to attach (no images and no follow-up text).
                if follow_up_content:
                    api_messages.append({
                        "role": "user",
                        "content": follow_up_content,
                    })
        kwargs = {
            "model": self.model,
            "messages": api_messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        if tools:
            from tools import to_openai_tools
            kwargs["tools"] = to_openai_tools(tools)
            # Same policy as single-turn: force a tool invocation.
            kwargs["tool_choice"] = "required"
        t0 = time.time()
        response = self.client.chat.completions.create(**kwargs)
        latency = (time.time() - t0) * 1000
        msg = response.choices[0].message
        tool_call = _parse_tool_call_openai(msg) if tools else None
        return VLMResponse(
            text=msg.content or "",
            model=self.model,
            backend="openai",
            input_tokens=response.usage.prompt_tokens,
            output_tokens=response.usage.completion_tokens,
            latency_ms=latency,
            tool_call=tool_call,
        )
# ============================================================
# Anthropic Backend (Claude) — with tool use
# ============================================================
class AnthropicClient(BaseVLMClient):
    """Anthropic Claude API client with native tool use."""
    def __init__(self, model: str = None, api_key: str = None, rate_limit: int = None):
        # Any argument left as None falls back to the project config.
        super().__init__(
            model=model or config.MODELS["anthropic"],
            api_key=api_key or config.ANTHROPIC_API_KEY,
            rate_limit=rate_limit or config.RATE_LIMITS["anthropic"],
        )
        # SDK import is deferred to construction time, so importing this
        # module does not itself require the `anthropic` package.
        from anthropic import Anthropic
        self.client = Anthropic(api_key=self.api_key)
    def call(
        self,
        system_prompt: str,
        user_text: str,
        images: list[str] | None = None,
        temperature: float = 0.1,
        max_tokens: int = 2048,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """Single-turn Anthropic Messages call, optionally with images and tools.

        Args:
            system_prompt: Passed via the top-level `system` parameter
                (Anthropic has no system role in `messages`).
            user_text: User message text (appended after any image blocks).
            images: Optional base64-encoded JPEG strings (no data-URL prefix).
            temperature: Sampling temperature.
            max_tokens: Completion token cap.
            tools: Optional tool schemas; when given, tool_choice
                {"type": "any"} forces the model to use some tool.

        Returns:
            VLMResponse with text, token usage, latency, and any tool call.
        """
        # Image blocks go first, then the text prompt.
        content = []
        if images:
            for img_b64 in images:
                content.append({
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": "image/jpeg",
                        "data": img_b64,
                    },
                })
        content.append({"type": "text", "text": user_text})
        kwargs = {
            "model": self.model,
            "system": system_prompt,
            "messages": [{"role": "user", "content": content}],
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        if tools:
            from tools import to_anthropic_tools
            kwargs["tools"] = to_anthropic_tools(tools)
            kwargs["tool_choice"] = {"type": "any"}
        t0 = time.time()
        response = self.client.messages.create(**kwargs)
        latency = (time.time() - t0) * 1000
        # Extract text from response (may have both text and tool_use blocks)
        text_parts = []
        for block in response.content:
            if hasattr(block, "text"):
                text_parts.append(block.text)
        tool_call = _parse_tool_call_anthropic(response) if tools else None
        return VLMResponse(
            text="\n".join(text_parts),
            model=self.model,
            backend="anthropic",
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            latency_ms=latency,
            tool_call=tool_call,
        )
    def call_multiturn(
        self,
        system_prompt: str,
        messages: list[dict],
        temperature: float = 0.1,
        max_tokens: int = 2048,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """
        Multi-turn Anthropic call with full tool-use protocol.
        Translates our internal message format to Anthropic's API format:
        - "user" → role:"user" (passed through)
        - "assistant" → role:"assistant" with tool_use content blocks
        - "tool_result" → role:"user" with tool_result block + image blocks
        Anthropic's protocol: after an assistant message with a tool_use block,
        the next message MUST be role:"user" containing a tool_result block
        with the matching tool_use_id. Images and follow-up text can be
        included in the same user message as additional content blocks.
        """
        api_messages = []
        for msg in messages:
            role = msg["role"]
            if role == "user":
                content = msg["content"]
                # Convert image_url format to Anthropic's image format
                if isinstance(content, list):
                    anthropic_content = []
                    for block in content:
                        if block.get("type") == "image_url":
                            url = block["image_url"]["url"]
                            # Extract base64 data from data URL
                            if url.startswith("data:"):
                                b64_data = url.split(",", 1)[-1]
                            else:
                                # Already bare base64 (no data-URL prefix).
                                b64_data = url
                            anthropic_content.append({
                                "type": "image",
                                "source": {
                                    "type": "base64",
                                    "media_type": "image/jpeg",
                                    "data": b64_data,
                                },
                            })
                        elif block.get("type") == "text":
                            anthropic_content.append(block)
                        else:
                            # Unknown block types are passed through unchanged
                            # (same as text blocks above).
                            anthropic_content.append(block)
                    api_messages.append({
                        "role": "user",
                        "content": anthropic_content,
                    })
                else:
                    # Plain-string content passes through as-is.
                    api_messages.append({
                        "role": "user",
                        "content": content,
                    })
            elif role == "assistant":
                # Assistant turns become a list of content blocks:
                # optional text block followed by an optional tool_use block.
                content_blocks = []
                if msg.get("content"):
                    content_blocks.append({
                        "type": "text",
                        "text": msg["content"],
                    })
                if msg.get("tool_calls"):
                    # Only the first recorded tool call is re-serialized.
                    tc = msg["tool_calls"][0]
                    content_blocks.append({
                        "type": "tool_use",
                        "id": tc.call_id,
                        "name": tc.tool_name,
                        "input": tc.arguments,
                    })
                api_messages.append({
                    "role": "assistant",
                    "content": content_blocks,
                })
            elif role == "tool_result":
                # Anthropic: tool_result goes in a user message alongside
                # any images and follow-up text
                user_content = []
                # The tool_result block
                user_content.append({
                    "type": "tool_result",
                    "tool_use_id": msg["tool_call_id"],
                    "content": msg.get("content", ""),
                })
                # Images from the channel data
                for img_b64 in msg.get("images", []):
                    user_content.append({
                        "type": "image",
                        "source": {
                            "type": "base64",
                            "media_type": "image/jpeg",
                            "data": img_b64,
                        },
                    })
                # Follow-up text (next-step instructions)
                follow_up = msg.get("follow_up", "")
                if follow_up:
                    user_content.append({
                        "type": "text",
                        "text": follow_up,
                    })
                api_messages.append({
                    "role": "user",
                    "content": user_content,
                })
        kwargs = {
            "model": self.model,
            "system": system_prompt,
            "messages": api_messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        if tools:
            from tools import to_anthropic_tools
            kwargs["tools"] = to_anthropic_tools(tools)
            # Force the model to use some tool (same policy as single-turn).
            kwargs["tool_choice"] = {"type": "any"}
        t0 = time.time()
        response = self.client.messages.create(**kwargs)
        latency = (time.time() - t0) * 1000
        # Collect all text blocks; tool_use blocks have no .text attribute.
        text_parts = []
        for block in response.content:
            if hasattr(block, "text"):
                text_parts.append(block.text)
        tool_call = _parse_tool_call_anthropic(response) if tools else None
        return VLMResponse(
            text="\n".join(text_parts),
            model=self.model,
            backend="anthropic",
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            latency_ms=latency,
            tool_call=tool_call,
        )
# ============================================================
# Together Backend (Qwen2.5-VL) — with tool calling
# ============================================================
class TogetherClient(BaseVLMClient):
    """Together AI client with function calling support."""

    def __init__(self, model: str = None, api_key: str = None, rate_limit: int = None):
        # Any argument left as None falls back to the project config.
        super().__init__(
            model=model or config.MODELS["together"],
            api_key=api_key or config.TOGETHER_API_KEY,
            rate_limit=rate_limit or config.RATE_LIMITS["together"],
        )
        # SDK import deferred to construction time.
        from together import Together
        self.client = Together(api_key=self.api_key)

    def call(
        self,
        system_prompt: str,
        user_text: str,
        images: list[str] | None = None,
        temperature: float = 0.1,
        max_tokens: int = 2048,
        tools: list[dict] | None = None,
    ) -> VLMResponse:
        """Single-turn Together chat completion, optionally with images/tools.

        Image blocks precede the text prompt in the user content. Unlike the
        OpenAI backend, no tool_choice is set, so tool use is left to the
        model's discretion.
        """
        user_content = [
            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{img_b64}"}}
            for img_b64 in (images or [])
        ]
        user_content.append({"type": "text", "text": user_text})

        request = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_content},
            ],
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        if tools:
            from tools import to_openai_tools
            request["tools"] = to_openai_tools(tools)

        start = time.time()
        response = self.client.chat.completions.create(**request)
        elapsed_ms = (time.time() - start) * 1000

        message = response.choices[0].message
        usage = response.usage
        # Usage fields may be absent on some responses; default to 0.
        return VLMResponse(
            text=message.content or "",
            model=self.model,
            backend="together",
            input_tokens=getattr(usage, "prompt_tokens", 0),
            output_tokens=getattr(usage, "completion_tokens", 0),
            latency_ms=elapsed_ms,
            tool_call=_parse_tool_call_openai(message) if tools else None,
        )
# ============================================================
# Client Factory
# ============================================================
class OpenAIMiniClient(OpenAIClient):
    """OpenAI GPT-4o-mini client.

    Same wire protocol as OpenAIClient; only the default model and
    rate limit differ.
    """

    def __init__(self, model: str = None, api_key: str = None, rate_limit: int = None):
        # Resolve mini-specific defaults here, then reuse OpenAIClient's
        # constructor instead of duplicating its SDK-client setup (the
        # original bypassed it via BaseVLMClient.__init__ and repeated the
        # `from openai import OpenAI` boilerplate). Passing api_key through
        # unresolved lets OpenAIClient apply the same OPENAI_API_KEY default.
        super().__init__(
            model=model or config.MODELS["openai_mini"],
            api_key=api_key,
            rate_limit=rate_limit or config.RATE_LIMITS["openai_mini"],
        )
def create_client(backend: str, **kwargs) -> BaseVLMClient:
    """Factory function to create a VLM client by backend name.

    Args:
        backend: One of "openai", "openai_mini", "anthropic", "together".
        **kwargs: Forwarded to the selected client's constructor.

    Raises:
        ValueError: If `backend` is not a known backend name.
    """
    registry = {
        "openai": OpenAIClient,
        "openai_mini": OpenAIMiniClient,
        "anthropic": AnthropicClient,
        "together": TogetherClient,
    }
    try:
        client_cls = registry[backend]
    except KeyError:
        raise ValueError(
            f"Unknown backend: {backend}. Choose from {list(registry.keys())}"
        ) from None
    return client_cls(**kwargs)