| """OpenAI LLM client implementation.""" |
|
|
| import json |
| import logging |
| from typing import Any |
|
|
| from openai import AsyncOpenAI |
|
|
| from ..retry import RetryConfig, async_retry |
| from ..schema import FunctionCall, LLMResponse, Message, TokenUsage, ToolCall |
| from .base import LLMClientBase |
|
|
# Module-level logger, named after this module so handlers and levels can be
# configured per module by the application.
logger: logging.Logger = logging.getLogger(__name__)
|
|
|
|
class OpenAIClient(LLMClientBase):
    """LLM client using OpenAI's protocol.

    This client uses the official OpenAI SDK and supports:
    - Reasoning content (via reasoning_split=True)
    - Tool calling
    - Retry logic
    """

    def __init__(
        self,
        api_key: str,
        api_base: str = "https://api.minimaxi.com/v1",
        model: str = "MiniMax-M2.1",
        retry_config: RetryConfig | None = None,
    ):
        """Initialize OpenAI client.

        Args:
            api_key: API key for authentication
            api_base: Base URL for the API (default: MiniMax OpenAI endpoint)
            model: Model name to use (default: MiniMax-M2.1)
            retry_config: Optional retry configuration
        """
        super().__init__(api_key, api_base, model, retry_config)

        # Async SDK client bound to the configured key and endpoint.
        self.client = AsyncOpenAI(
            api_key=api_key,
            base_url=api_base,
        )

    async def _make_api_request(
        self,
        api_messages: list[dict[str, Any]],
        tools: list[Any] | None = None,
    ) -> Any:
        """Execute API request (core method that can be retried).

        Args:
            api_messages: List of messages in OpenAI format
            tools: Optional list of tools

        Returns:
            OpenAI ChatCompletion response (full response including usage)

        Raises:
            Exception: API call failed
        """
        params: dict[str, Any] = {
            "model": self.model,
            "messages": api_messages,
            # Non-standard field forwarded in the request body; used for
            # reasoning content (see the class docstring).
            "extra_body": {"reasoning_split": True},
        }

        # Only send "tools" when there is at least one tool to offer.
        if tools:
            params["tools"] = self._convert_tools(tools)

        return await self.client.chat.completions.create(**params)

    def _convert_tools(self, tools: list[Any]) -> list[dict[str, Any]]:
        """Convert tools to OpenAI format.

        Three input shapes are accepted:
        - a dict whose "type" is "function" (passed through unchanged),
        - any other dict, read via its "name", "input_schema" and optional
          "description" keys,
        - an object exposing ``to_openai_schema()``.

        Args:
            tools: List of Tool objects or dicts

        Returns:
            List of tools in OpenAI dict format

        Raises:
            TypeError: A tool is neither a dict nor has ``to_openai_schema``
            KeyError: A non-OpenAI dict tool lacks "name" or "input_schema"
        """
        converted: list[dict[str, Any]] = []
        for tool in tools:
            if isinstance(tool, dict):
                if tool.get("type") == "function":
                    # Already in OpenAI format.
                    converted.append(tool)
                else:
                    converted.append(
                        {
                            "type": "function",
                            "function": {
                                "name": tool["name"],
                                # A missing description should not abort the
                                # whole request, so fall back to an empty one.
                                "description": tool.get("description", ""),
                                "parameters": tool["input_schema"],
                            },
                        }
                    )
            elif hasattr(tool, "to_openai_schema"):
                converted.append(tool.to_openai_schema())
            else:
                raise TypeError(f"Unsupported tool type: {type(tool)}")
        return converted

    @staticmethod
    def _convert_assistant_message(msg: Message) -> dict[str, Any]:
        """Build the OpenAI-format dict for one assistant message."""
        assistant_msg: dict[str, Any] = {"role": "assistant"}

        # "content" is left out entirely when the message has no text.
        if msg.content:
            assistant_msg["content"] = msg.content

        if msg.tool_calls:
            assistant_msg["tool_calls"] = [
                {
                    "id": tool_call.id,
                    "type": "function",
                    "function": {
                        "name": tool_call.function.name,
                        # Arguments are sent as a JSON-encoded string.
                        "arguments": json.dumps(tool_call.function.arguments),
                    },
                }
                for tool_call in msg.tool_calls
            ]

        # Send earlier reasoning back in the same "reasoning_details" shape
        # that _parse_response reads.
        if msg.thinking:
            assistant_msg["reasoning_details"] = [{"text": msg.thinking}]

        return assistant_msg

    def _convert_messages(self, messages: list[Message]) -> tuple[str | None, list[dict[str, Any]]]:
        """Convert internal messages to OpenAI format.

        Messages whose role is not one of "system", "user", "assistant" or
        "tool" are left out of the result; a warning is logged for each.

        Args:
            messages: List of internal Message objects

        Returns:
            Tuple of (system_message, api_messages)
            Note: OpenAI includes system message in the messages array, so
            the first element is always None.
        """
        api_messages: list[dict[str, Any]] = []

        for msg in messages:
            if msg.role == "system":
                api_messages.append({"role": "system", "content": msg.content})
            elif msg.role == "user":
                api_messages.append({"role": "user", "content": msg.content})
            elif msg.role == "assistant":
                api_messages.append(self._convert_assistant_message(msg))
            elif msg.role == "tool":
                api_messages.append(
                    {
                        "role": "tool",
                        "tool_call_id": msg.tool_call_id,
                        "content": msg.content,
                    }
                )
            else:
                # Still dropped, but no longer silently.
                logger.warning("Skipping message with unsupported role: %r", msg.role)

        return None, api_messages

    def _prepare_request(
        self,
        messages: list[Message],
        tools: list[Any] | None = None,
    ) -> dict[str, Any]:
        """Prepare the request for OpenAI API.

        Args:
            messages: List of conversation messages
            tools: Optional list of available tools

        Returns:
            Dictionary with "api_messages" (converted messages) and "tools"
            (passed through unconverted; conversion happens at request time)
        """
        _, api_messages = self._convert_messages(messages)

        return {
            "api_messages": api_messages,
            "tools": tools,
        }

    @staticmethod
    def _extract_thinking(message: Any) -> str:
        """Concatenate the text of every entry in ``message.reasoning_details``."""
        details = getattr(message, "reasoning_details", None)
        if not details:
            return ""

        # NOTE(review): entries may be attribute-style objects or plain dicts
        # depending on how the SDK surfaces this non-standard field - confirm
        # against the endpoint. Both shapes are accepted here.
        texts = (
            detail.get("text") if isinstance(detail, dict) else getattr(detail, "text", None)
            for detail in details
        )
        return "".join(text for text in texts if text)

    @staticmethod
    def _parse_tool_arguments(tool_call: Any) -> Any:
        """Decode the JSON argument string of one tool call.

        Empty or missing arguments are treated as "no arguments" and yield an
        empty dict. Malformed JSON is logged and re-raised.
        """
        raw_arguments = tool_call.function.arguments
        if raw_arguments is None or not raw_arguments.strip():
            return {}

        try:
            return json.loads(raw_arguments)
        except json.JSONDecodeError:
            logger.error(
                "Tool call %s (%s) returned arguments that are not valid JSON",
                tool_call.id,
                tool_call.function.name,
            )
            raise

    def _parse_response(self, response: Any) -> LLMResponse:
        """Parse OpenAI response into LLMResponse.

        Only the first choice of the response is used.

        Args:
            response: OpenAI ChatCompletion response (full response object)

        Returns:
            LLMResponse object

        Raises:
            json.JSONDecodeError: A tool call carried malformed JSON arguments
        """
        choice = response.choices[0]
        message = choice.message

        text_content = message.content or ""
        thinking_content = self._extract_thinking(message)

        tool_calls = [
            ToolCall(
                id=tool_call.id,
                type="function",
                function=FunctionCall(
                    name=tool_call.function.name,
                    arguments=self._parse_tool_arguments(tool_call),
                ),
            )
            for tool_call in (message.tool_calls or [])
        ]

        usage = None
        response_usage = getattr(response, "usage", None)
        if response_usage:
            # Individual counters may be None; report those as 0.
            usage = TokenUsage(
                prompt_tokens=response_usage.prompt_tokens or 0,
                completion_tokens=response_usage.completion_tokens or 0,
                total_tokens=response_usage.total_tokens or 0,
            )

        return LLMResponse(
            content=text_content,
            thinking=thinking_content or None,
            tool_calls=tool_calls or None,
            # Report the finish reason carried by the choice; fall back to
            # "stop" only when the endpoint does not supply one.
            finish_reason=getattr(choice, "finish_reason", None) or "stop",
            usage=usage,
        )

    async def generate(
        self,
        messages: list[Message],
        tools: list[Any] | None = None,
    ) -> LLMResponse:
        """Generate response from OpenAI LLM.

        Args:
            messages: List of conversation messages
            tools: Optional list of available tools

        Returns:
            LLMResponse containing the generated content
        """
        request_params = self._prepare_request(messages, tools)

        # Wrap the request in the retry decorator only when retries are
        # enabled; either way there is a single call site below.
        api_call = self._make_api_request
        if self.retry_config.enabled:
            retry_decorator = async_retry(config=self.retry_config, on_retry=self.retry_callback)
            api_call = retry_decorator(api_call)

        response = await api_call(
            request_params["api_messages"],
            request_params["tools"],
        )

        return self._parse_response(response)
|
|