| """
|
| LLM Provider Abstraction Layer.
|
|
|
| This module provides a unified interface for multiple LLM providers:
|
| - OpenAI (ChatGPT)
|
| - Google (Gemini)
|
| - Anthropic (Claude)
|
| - xAI (Grok)
|
| - Moonshot (Kimi)
|
| - Deepseek
|
| - Zhipu (智谱清言 GLM)
|
|
|
| Users can select their preferred provider and configure API keys.
|
| """
|
|
|
| import os
|
| import httpx
|
| import json
|
| from abc import ABC, abstractmethod
|
| from typing import Optional, Dict, Any, List
|
| from dataclasses import dataclass
|
| from enum import Enum
|
|
|
|
|
class LLMProvider(str, Enum):
    """Supported LLM providers.

    Subclasses ``str`` so members compare equal to — and serialize as —
    their plain string values (e.g. ``LLMProvider.OPENAI == "openai"``).
    """
    OPENAI = "openai"      # OpenAI (ChatGPT)
    GEMINI = "gemini"      # Google Gemini
    CLAUDE = "claude"      # Anthropic Claude
    GROK = "grok"          # xAI Grok
    KIMI = "kimi"          # Moonshot Kimi
    DEEPSEEK = "deepseek"  # Deepseek
    ZHIPU = "zhipu"        # Zhipu 智谱清言 (GLM)
|
|
|
|
|
@dataclass
class LLMConfig:
    """Configuration for LLM providers."""
    provider: LLMProvider           # which backend to talk to
    api_key: str                    # provider API key, sent on every request
    model: Optional[str] = None     # model name; falls back to DEFAULT_MODELS[provider]
    base_url: Optional[str] = None  # endpoint override (NOTE(review): not honored by GeminiClient)
    temperature: float = 0.3        # default sampling temperature (per-call override possible)
    max_tokens: int = 4096          # completion token cap sent to the API
    timeout: int = 120              # HTTP request timeout, seconds
|
|
|
|
|
|
|
# Default model name per provider, used by BaseLLMClient.__init__ when
# LLMConfig.model is not set.
DEFAULT_MODELS = {
    LLMProvider.OPENAI: "gpt-5.2",
    LLMProvider.GEMINI: "gemini-3-pro-preview",
    LLMProvider.CLAUDE: "claude-3-5-sonnet-20241022",
    LLMProvider.GROK: "grok-2-latest",
    LLMProvider.KIMI: "kimi-k2-0905-preview",
    LLMProvider.DEEPSEEK: "deepseek-chat",
    LLMProvider.ZHIPU: "glm-4-plus",
}
|
|
|
|
|
# Chat-completion endpoint per provider. The Gemini entry contains a
# ``{model}`` placeholder filled in by GeminiClient.chat; the others are
# used verbatim (LLMConfig.base_url, when set, takes precedence).
API_ENDPOINTS = {
    LLMProvider.OPENAI: "https://api.openai.com/v1/chat/completions",
    LLMProvider.GEMINI: "https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent",
    LLMProvider.CLAUDE: "https://api.anthropic.com/v1/messages",
    LLMProvider.GROK: "https://api.x.ai/v1/chat/completions",
    LLMProvider.KIMI: "https://api.moonshot.cn/v1/chat/completions",
    LLMProvider.DEEPSEEK: "https://api.deepseek.com/chat/completions",
    LLMProvider.ZHIPU: "https://open.bigmodel.cn/api/paas/v4/chat/completions",
}
|
|
|
|
|
# Environment variable read for each provider's API key; iteration order
# here also defines the priority used by detect_available_provider().
API_KEY_ENV_VARS = {
    LLMProvider.OPENAI: "OPENAI_API_KEY",
    LLMProvider.GEMINI: "GEMINI_API_KEY",
    LLMProvider.CLAUDE: "ANTHROPIC_API_KEY",
    LLMProvider.GROK: "GROK_API_KEY",
    LLMProvider.KIMI: "MOONSHOT_API_KEY",
    LLMProvider.DEEPSEEK: "DEEPSEEK_API_KEY",
    LLMProvider.ZHIPU: "ZHIPU_API_KEY",
}
|
|
|
|
|
class BaseLLMClient(ABC):
    """Abstract base class for LLM clients.

    Subclasses implement the async ``chat`` method against a concrete
    provider API; ``chat_sync`` is a blocking convenience wrapper that is
    safe to call from both sync and async contexts.
    """

    def __init__(self, config: "LLMConfig"):
        self.config = config
        # Fall back to the provider's default model when none is configured.
        self.model = config.model or DEFAULT_MODELS.get(config.provider)

    @abstractmethod
    async def chat(
        self,
        messages: List[Dict[str, str]],
        temperature: Optional[float] = None,
    ) -> str:
        """Send chat messages and get response.

        Args:
            messages: OpenAI-style message dicts with "role" and "content".
            temperature: Optional per-call override of the configured value.

        Returns:
            The assistant's reply text.
        """
        pass

    def chat_sync(
        self,
        messages: List[Dict[str, str]],
        temperature: Optional[float] = None,
    ) -> str:
        """Synchronous version of chat."""
        import asyncio

        try:
            # Probe for a running loop; asyncio.get_event_loop() is
            # deprecated for this purpose and warns/raises on 3.10+.
            asyncio.get_running_loop()
        except RuntimeError:
            # No event loop running in this thread: create one and run.
            return asyncio.run(self.chat(messages, temperature))

        # A loop is already running (we were called from async code), so
        # asyncio.run() would raise here. Execute the coroutine on a fresh
        # loop in a worker thread and block on the result instead.
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            future = executor.submit(
                asyncio.run,
                self.chat(messages, temperature),
            )
            return future.result()
|
|
|
|
|
class OpenAICompatibleClient(BaseLLMClient):
    """
    Client for OpenAI-compatible APIs.

    Works with: OpenAI, Grok, Kimi, Deepseek, Zhipu
    These all use the same API format.
    """

    async def chat(
        self,
        messages: List[Dict[str, str]],
        temperature: Optional[float] = None,
    ) -> str:
        """Send chat request to OpenAI-compatible endpoint.

        Args:
            messages: OpenAI-style message dicts ("role"/"content").
            temperature: Optional per-call override of the configured value.

        Returns:
            The assistant message content of the first choice.

        Raises:
            Exception: On non-200 responses, timeout, or transport errors.
        """

        endpoint = self.config.base_url or API_ENDPOINTS.get(self.config.provider)

        headers = {
            "Authorization": f"Bearer {self.config.api_key}",
            "Content-Type": "application/json",
        }

        payload = {
            "model": self.model,
            "messages": messages,
            # BUGFIX: explicit None check so an override of 0.0 is honored
            # ("temperature or ..." silently discarded falsy overrides).
            "temperature": temperature if temperature is not None else self.config.temperature,
            "max_tokens": self.config.max_tokens,
        }

        print(f"[LLM] Calling {self.config.provider.value} API...")
        print(f"[LLM] Endpoint: {endpoint}")
        print(f"[LLM] Model: {self.model}")

        try:
            async with httpx.AsyncClient(timeout=self.config.timeout) as client:
                response = await client.post(
                    endpoint,
                    headers=headers,
                    json=payload,
                )

                if response.status_code != 200:
                    error_text = response.text
                    print(f"[LLM] Error response: {error_text}")
                    raise Exception(f"API Error {response.status_code}: {error_text}")

                data = response.json()
                content = data["choices"][0]["message"]["content"]
                print(f"[LLM] Success! Response length: {len(content)} chars")
                return content

        except httpx.TimeoutException:
            raise Exception(f"Request timeout after {self.config.timeout}s")
        except httpx.RequestError as e:
            raise Exception(f"Request failed: {str(e)}")
|
|
|
|
|
class ClaudeClient(BaseLLMClient):
    """Client for Anthropic Claude API."""

    async def chat(
        self,
        messages: List[Dict[str, str]],
        temperature: Optional[float] = None,
    ) -> str:
        """Send chat request to Claude API.

        Args:
            messages: OpenAI-style message dicts; any "system" message is
                lifted into Claude's top-level ``system`` field.
            temperature: Optional per-call override of the configured value.

        Returns:
            Text of the first content block in the response.

        Raises:
            Exception: On non-200 responses, timeout, or transport errors.
        """

        endpoint = self.config.base_url or API_ENDPOINTS[LLMProvider.CLAUDE]

        # Claude takes the system prompt as a top-level field rather than a
        # message. If several system messages are present, the last one wins
        # (unchanged from previous behavior).
        system_content = ""
        user_messages = []

        for msg in messages:
            if msg["role"] == "system":
                system_content = msg["content"]
            else:
                user_messages.append(msg)

        headers = {
            "x-api-key": self.config.api_key,
            "Content-Type": "application/json",
            "anthropic-version": "2023-06-01",
        }

        payload = {
            "model": self.model,
            "max_tokens": self.config.max_tokens,
            "messages": user_messages,
            # BUGFIX: explicit None check so an override of 0.0 is honored
            # ("temperature or ..." silently discarded falsy overrides).
            "temperature": temperature if temperature is not None else self.config.temperature,
        }

        if system_content:
            payload["system"] = system_content

        print(f"[LLM] Calling Claude API...")
        print(f"[LLM] Model: {self.model}")

        try:
            async with httpx.AsyncClient(timeout=self.config.timeout) as client:
                response = await client.post(
                    endpoint,
                    headers=headers,
                    json=payload,
                )

                if response.status_code != 200:
                    error_text = response.text
                    print(f"[LLM] Error response: {error_text}")
                    raise Exception(f"API Error {response.status_code}: {error_text}")

                data = response.json()
                content = data["content"][0]["text"]
                print(f"[LLM] Success! Response length: {len(content)} chars")
                return content

        except httpx.TimeoutException:
            raise Exception(f"Request timeout after {self.config.timeout}s")
        except httpx.RequestError as e:
            raise Exception(f"Request failed: {str(e)}")
|
|
|
|
|
class GeminiClient(BaseLLMClient):
    """Client for Google Gemini API."""

    async def chat(
        self,
        messages: List[Dict[str, str]],
        temperature: Optional[float] = None,
    ) -> str:
        """Send chat request to Gemini API.

        Args:
            messages: OpenAI-style message dicts; "assistant" is mapped to
                Gemini's "model" role, "system" to ``systemInstruction``.
            temperature: Optional per-call override of the configured value.

        Returns:
            Text of the first part of the first candidate.

        Raises:
            Exception: On non-200 responses, timeout, or transport errors.

        NOTE(review): unlike the other clients, config.base_url is ignored
        here — the endpoint always comes from API_ENDPOINTS.
        """

        endpoint = API_ENDPOINTS[LLMProvider.GEMINI].format(model=self.model)
        # Gemini authenticates via a query parameter, not a header.
        endpoint = f"{endpoint}?key={self.config.api_key}"

        # Translate OpenAI-style messages into Gemini's "contents" format.
        contents = []
        system_instruction = None

        for msg in messages:
            if msg["role"] == "system":
                system_instruction = msg["content"]
            elif msg["role"] == "user":
                contents.append({
                    "role": "user",
                    "parts": [{"text": msg["content"]}]
                })
            elif msg["role"] == "assistant":
                contents.append({
                    "role": "model",
                    "parts": [{"text": msg["content"]}]
                })

        payload = {
            "contents": contents,
            "generationConfig": {
                # BUGFIX: explicit None check so an override of 0.0 is honored
                # ("temperature or ..." silently discarded falsy overrides).
                "temperature": temperature if temperature is not None else self.config.temperature,
                "maxOutputTokens": self.config.max_tokens,
            }
        }

        if system_instruction:
            payload["systemInstruction"] = {
                "parts": [{"text": system_instruction}]
            }

        headers = {"Content-Type": "application/json"}

        print(f"[LLM] Calling Gemini API...")
        print(f"[LLM] Model: {self.model}")

        try:
            async with httpx.AsyncClient(timeout=self.config.timeout) as client:
                response = await client.post(
                    endpoint,
                    headers=headers,
                    json=payload,
                )

                if response.status_code != 200:
                    error_text = response.text
                    print(f"[LLM] Error response: {error_text}")
                    raise Exception(f"API Error {response.status_code}: {error_text}")

                data = response.json()
                content = data["candidates"][0]["content"]["parts"][0]["text"]
                print(f"[LLM] Success! Response length: {len(content)} chars")
                return content

        except httpx.TimeoutException:
            raise Exception(f"Request timeout after {self.config.timeout}s")
        except httpx.RequestError as e:
            raise Exception(f"Request failed: {str(e)}")
|
|
|
|
|
def create_llm_client(config: LLMConfig) -> BaseLLMClient:
    """
    Factory function to create appropriate LLM client.

    Args:
        config: LLM configuration

    Returns:
        Appropriate LLM client instance
    """
    print(f"[LLM] Creating client for provider: {config.provider.value}")

    # Claude and Gemini speak their own protocols; every other provider
    # uses the OpenAI-compatible wire format.
    dedicated_clients = {
        LLMProvider.CLAUDE: ClaudeClient,
        LLMProvider.GEMINI: GeminiClient,
    }
    client_cls = dedicated_clients.get(config.provider, OpenAICompatibleClient)
    return client_cls(config)
|
|
|
|
|
def get_available_providers() -> List[Dict[str, str]]:
    """
    Get list of available providers with their display names.

    Returns:
        List of provider info dicts
    """
    # (id, display name, API-key environment variable) per provider.
    catalog = [
        ("openai", "OpenAI (ChatGPT)", "OPENAI_API_KEY"),
        ("gemini", "Google Gemini", "GEMINI_API_KEY"),
        ("claude", "Anthropic Claude", "ANTHROPIC_API_KEY"),
        ("grok", "xAI Grok", "GROK_API_KEY"),
        ("kimi", "Moonshot Kimi", "MOONSHOT_API_KEY"),
        ("deepseek", "Deepseek", "DEEPSEEK_API_KEY"),
        ("zhipu", "智谱清言 (GLM)", "ZHIPU_API_KEY"),
    ]
    return [
        {"id": pid, "name": display, "env_var": env}
        for pid, display, env in catalog
    ]
|
|
|
|
|
def detect_available_provider() -> Optional[LLMProvider]:
    """
    Auto-detect which provider has an API key configured.

    Returns:
        First available provider, or None
    """
    # Walk providers in declaration order; first non-empty key wins.
    for candidate, env_var in API_KEY_ENV_VARS.items():
        if not os.environ.get(env_var):
            continue
        print(f"[LLM] Auto-detected provider: {candidate.value} (from {env_var})")
        return candidate
    return None
|
|
|
|
|
def create_client_from_env(
    provider: Optional[LLMProvider] = None
) -> Optional[BaseLLMClient]:
    """
    Create LLM client from environment variables.

    Args:
        provider: Specific provider to use, or None to auto-detect

    Returns:
        LLM client or None if no API key found
    """
    selected = provider if provider is not None else detect_available_provider()
    if selected is None:
        # Nothing requested and nothing detected.
        return None

    env_var = API_KEY_ENV_VARS.get(selected)
    api_key = os.environ.get(env_var, "")
    if not api_key:
        # Provider chosen but its key is absent/empty.
        return None

    return create_llm_client(LLMConfig(provider=selected, api_key=api_key))
|
|
|
|
|
|
|
def test_provider(provider_name: str, api_key: str, test_message: str = "Hello, respond with 'OK' if you receive this.") -> Dict[str, Any]:
    """
    Test a specific LLM provider.

    Args:
        provider_name: Provider name (openai, gemini, etc.)
        api_key: API key
        test_message: Test message to send

    Returns:
        Dict with success status and response/error
    """
    import asyncio

    try:
        client = create_llm_client(
            LLMConfig(
                provider=LLMProvider(provider_name.lower()),
                api_key=api_key,
            )
        )

        reply = asyncio.run(
            client.chat([{"role": "user", "content": test_message}])
        )

        return {
            "success": True,
            "provider": provider_name,
            "model": client.model,
            # Cap the echoed response at 500 chars (slicing a shorter
            # string returns it unchanged).
            "response": reply[:500],
        }

    except Exception as e:
        # Deliberate catch-all: this is a diagnostic probe that must
        # always report a result dict rather than raise.
        return {
            "success": False,
            "provider": provider_name,
            "error": str(e),
        }
|
|
|