"""
LLM Provider Abstraction Layer.
This module provides a unified interface for multiple LLM providers:
- OpenAI (ChatGPT)
- Google (Gemini)
- Anthropic (Claude)
- xAI (Grok)
- Moonshot (Kimi)
- Deepseek
- Zhipu (智谱清言 GLM)
Users can select their preferred provider and configure API keys.
"""
import os
import httpx
import json
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
from enum import Enum
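
# Quick-start sketch (comments only; nothing here runs on import). Assumes at
# least one of the API-key environment variables listed in API_KEY_ENV_VARS
# below is set:
#
#     client = create_client_from_env()   # auto-detects a configured provider
#     reply = client.chat_sync([{"role": "user", "content": "Hello"}])
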
class LLMProvider(str, Enum):
"""Supported LLM providers."""
OPENAI = "openai" # ChatGPT
GEMINI = "gemini" # Google Gemini
CLAUDE = "claude" # Anthropic Claude
GROK = "grok" # xAI Grok
KIMI = "kimi" # Moonshot Kimi
DEEPSEEK = "deepseek" # Deepseek
ZHIPU = "zhipu" # 智谱清言 GLM
@dataclass
class LLMConfig:
"""Configuration for LLM providers."""
provider: LLMProvider
api_key: str
model: Optional[str] = None # If None, use default model
base_url: Optional[str] = None # Custom API endpoint
temperature: float = 0.3
max_tokens: int = 4096
timeout: int = 120
# Default models for each provider (Updated 2026-01)
DEFAULT_MODELS = {
LLMProvider.OPENAI: "gpt-5.2", # Verified: Latest GPT model
LLMProvider.GEMINI: "gemini-3-pro-preview", # Verified: Preview API name
LLMProvider.CLAUDE: "claude-3-5-sonnet-20241022", # Reverting to known stable for reliability
LLMProvider.GROK: "grok-2-latest", # Reverting to verified latest
LLMProvider.KIMI: "kimi-k2-0905-preview", # Verified: Correct Kimi K2 API name
LLMProvider.DEEPSEEK: "deepseek-chat", # Points to latest (V3) automatically
LLMProvider.ZHIPU: "glm-4-plus", # Verified: Stable release
}
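
# DEFAULT_MODELS is only a fallback: an explicit LLMConfig.model always wins
# (see BaseLLMClient.__init__). Sketch, with an illustrative model name:
#
#     LLMConfig(provider=LLMProvider.KIMI, api_key="...", model="moonshot-v1-8k")
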
# API endpoints for each provider
API_ENDPOINTS = {
LLMProvider.OPENAI: "https://api.openai.com/v1/chat/completions",
LLMProvider.GEMINI: "https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent",
LLMProvider.CLAUDE: "https://api.anthropic.com/v1/messages",
LLMProvider.GROK: "https://api.x.ai/v1/chat/completions",
LLMProvider.KIMI: "https://api.moonshot.cn/v1/chat/completions",
LLMProvider.DEEPSEEK: "https://api.deepseek.com/chat/completions", # Fixed: removed /v1
LLMProvider.ZHIPU: "https://open.bigmodel.cn/api/paas/v4/chat/completions",
}
# Environment variable names for API keys
API_KEY_ENV_VARS = {
LLMProvider.OPENAI: "OPENAI_API_KEY",
LLMProvider.GEMINI: "GEMINI_API_KEY",
LLMProvider.CLAUDE: "ANTHROPIC_API_KEY",
LLMProvider.GROK: "GROK_API_KEY",
LLMProvider.KIMI: "MOONSHOT_API_KEY",
LLMProvider.DEEPSEEK: "DEEPSEEK_API_KEY",
LLMProvider.ZHIPU: "ZHIPU_API_KEY",
}
class BaseLLMClient(ABC):
"""Abstract base class for LLM clients."""
def __init__(self, config: LLMConfig):
self.config = config
self.model = config.model or DEFAULT_MODELS.get(config.provider)
@abstractmethod
async def chat(
self,
messages: List[Dict[str, str]],
temperature: Optional[float] = None,
) -> str:
"""Send chat messages and get response."""
pass
    def chat_sync(
        self,
        messages: List[Dict[str, str]],
        temperature: Optional[float] = None,
    ) -> str:
        """Synchronous version of chat; safe with or without a running event loop."""
        import asyncio
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No event loop is running in this thread, so it is safe to start one.
            return asyncio.run(self.chat(messages, temperature))
        # A loop is already running (e.g. in a notebook or an async web framework).
        # asyncio.run() would raise here, so run the coroutine on a fresh loop
        # in a worker thread instead.
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future = executor.submit(asyncio.run, self.chat(messages, temperature))
            return future.result()
class OpenAICompatibleClient(BaseLLMClient):
"""
Client for OpenAI-compatible APIs.
Works with: OpenAI, Grok, Kimi, Deepseek, Zhipu
These all use the same API format.
"""
async def chat(
self,
messages: List[Dict[str, str]],
temperature: Optional[float] = None,
) -> str:
"""Send chat request to OpenAI-compatible endpoint."""
endpoint = self.config.base_url or API_ENDPOINTS.get(self.config.provider)
headers = {
"Authorization": f"Bearer {self.config.api_key}",
"Content-Type": "application/json",
}
payload = {
"model": self.model,
"messages": messages,
"temperature": temperature or self.config.temperature,
"max_tokens": self.config.max_tokens,
}
print(f"[LLM] Calling {self.config.provider.value} API...")
print(f"[LLM] Endpoint: {endpoint}")
print(f"[LLM] Model: {self.model}")
try:
async with httpx.AsyncClient(timeout=self.config.timeout) as client:
response = await client.post(
endpoint,
headers=headers,
json=payload,
)
if response.status_code != 200:
error_text = response.text
print(f"[LLM] Error response: {error_text}")
raise Exception(f"API Error {response.status_code}: {error_text}")
data = response.json()
content = data["choices"][0]["message"]["content"]
print(f"[LLM] Success! Response length: {len(content)} chars")
return content
except httpx.TimeoutException:
raise Exception(f"Request timeout after {self.config.timeout}s")
except httpx.RequestError as e:
raise Exception(f"Request failed: {str(e)}")
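
# Wire-format sketch for the OpenAI-compatible providers above (shapes only;
# values are illustrative):
#
#   request:  {"model": "...", "messages": [{"role": "user", "content": "..."}],
#              "temperature": 0.3, "max_tokens": 4096}
#   response: {"choices": [{"message": {"role": "assistant", "content": "..."}}]}
#
# which is why the parsing path above is data["choices"][0]["message"]["content"].
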
class ClaudeClient(BaseLLMClient):
"""Client for Anthropic Claude API."""
async def chat(
self,
messages: List[Dict[str, str]],
temperature: Optional[float] = None,
) -> str:
"""Send chat request to Claude API."""
endpoint = self.config.base_url or API_ENDPOINTS[LLMProvider.CLAUDE]
# Extract system message if present
system_content = ""
user_messages = []
for msg in messages:
if msg["role"] == "system":
system_content = msg["content"]
else:
user_messages.append(msg)
headers = {
"x-api-key": self.config.api_key,
"Content-Type": "application/json",
"anthropic-version": "2023-06-01",
}
payload = {
"model": self.model,
"max_tokens": self.config.max_tokens,
"messages": user_messages,
"temperature": temperature or self.config.temperature,
}
if system_content:
payload["system"] = system_content
print(f"[LLM] Calling Claude API...")
print(f"[LLM] Model: {self.model}")
try:
async with httpx.AsyncClient(timeout=self.config.timeout) as client:
response = await client.post(
endpoint,
headers=headers,
json=payload,
)
if response.status_code != 200:
error_text = response.text
print(f"[LLM] Error response: {error_text}")
raise Exception(f"API Error {response.status_code}: {error_text}")
data = response.json()
content = data["content"][0]["text"]
print(f"[LLM] Success! Response length: {len(content)} chars")
return content
except httpx.TimeoutException:
raise Exception(f"Request timeout after {self.config.timeout}s")
except httpx.RequestError as e:
raise Exception(f"Request failed: {str(e)}")
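
# Claude differs from the OpenAI-compatible shape in two ways handled above:
# the system prompt travels as a top-level "system" field rather than as a
# message, and the response body is {"content": [{"type": "text", "text": ...}]},
# hence data["content"][0]["text"].
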
class GeminiClient(BaseLLMClient):
"""Client for Google Gemini API."""
async def chat(
self,
messages: List[Dict[str, str]],
temperature: Optional[float] = None,
) -> str:
"""Send chat request to Gemini API."""
endpoint = API_ENDPOINTS[LLMProvider.GEMINI].format(model=self.model)
endpoint = f"{endpoint}?key={self.config.api_key}"
# Convert messages to Gemini format
contents = []
system_instruction = None
for msg in messages:
if msg["role"] == "system":
system_instruction = msg["content"]
elif msg["role"] == "user":
contents.append({
"role": "user",
"parts": [{"text": msg["content"]}]
})
elif msg["role"] == "assistant":
contents.append({
"role": "model",
"parts": [{"text": msg["content"]}]
})
payload = {
"contents": contents,
"generationConfig": {
"temperature": temperature or self.config.temperature,
"maxOutputTokens": self.config.max_tokens,
}
}
if system_instruction:
payload["systemInstruction"] = {
"parts": [{"text": system_instruction}]
}
headers = {"Content-Type": "application/json"}
print(f"[LLM] Calling Gemini API...")
print(f"[LLM] Model: {self.model}")
try:
async with httpx.AsyncClient(timeout=self.config.timeout) as client:
response = await client.post(
endpoint,
headers=headers,
json=payload,
)
if response.status_code != 200:
error_text = response.text
print(f"[LLM] Error response: {error_text}")
raise Exception(f"API Error {response.status_code}: {error_text}")
data = response.json()
content = data["candidates"][0]["content"]["parts"][0]["text"]
print(f"[LLM] Success! Response length: {len(content)} chars")
return content
except httpx.TimeoutException:
raise Exception(f"Request timeout after {self.config.timeout}s")
except httpx.RequestError as e:
raise Exception(f"Request failed: {str(e)}")
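
# Gemini mapping recap: "assistant" becomes role "model", the system prompt
# moves to "systemInstruction", and text comes back at
# data["candidates"][0]["content"]["parts"][0]["text"]. Caveat (unverified
# edge case): a safety-blocked response may omit "candidates", which would
# surface here as a KeyError.
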
def create_llm_client(config: LLMConfig) -> BaseLLMClient:
"""
Factory function to create appropriate LLM client.
Args:
config: LLM configuration
Returns:
Appropriate LLM client instance
"""
print(f"[LLM] Creating client for provider: {config.provider.value}")
if config.provider == LLMProvider.CLAUDE:
return ClaudeClient(config)
elif config.provider == LLMProvider.GEMINI:
return GeminiClient(config)
else:
# OpenAI, Grok, Kimi, Deepseek, Zhipu all use OpenAI-compatible API
return OpenAICompatibleClient(config)
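
# Sketch: constructing a client for one provider explicitly (the key value is
# a placeholder, not a real credential):
#
#     cfg = LLMConfig(provider=LLMProvider.DEEPSEEK, api_key="sk-...")
#     client = create_llm_client(cfg)   # -> OpenAICompatibleClient
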
def get_available_providers() -> List[Dict[str, str]]:
"""
Get list of available providers with their display names.
Returns:
List of provider info dicts
"""
return [
{"id": "openai", "name": "OpenAI (ChatGPT)", "env_var": "OPENAI_API_KEY"},
{"id": "gemini", "name": "Google Gemini", "env_var": "GEMINI_API_KEY"},
{"id": "claude", "name": "Anthropic Claude", "env_var": "ANTHROPIC_API_KEY"},
{"id": "grok", "name": "xAI Grok", "env_var": "GROK_API_KEY"},
{"id": "kimi", "name": "Moonshot Kimi", "env_var": "MOONSHOT_API_KEY"},
{"id": "deepseek", "name": "Deepseek", "env_var": "DEEPSEEK_API_KEY"},
{"id": "zhipu", "name": "智谱清言 (GLM)", "env_var": "ZHIPU_API_KEY"},
]
def detect_available_provider() -> Optional[LLMProvider]:
"""
Auto-detect which provider has an API key configured.
Returns:
First available provider, or None
"""
for provider, env_var in API_KEY_ENV_VARS.items():
if os.environ.get(env_var):
print(f"[LLM] Auto-detected provider: {provider.value} (from {env_var})")
return provider
return None
def create_client_from_env(
provider: Optional[LLMProvider] = None
) -> Optional[BaseLLMClient]:
"""
Create LLM client from environment variables.
Args:
provider: Specific provider to use, or None to auto-detect
Returns:
LLM client or None if no API key found
"""
if provider is None:
provider = detect_available_provider()
if provider is None:
return None
env_var = API_KEY_ENV_VARS.get(provider)
api_key = os.environ.get(env_var, "")
if not api_key:
return None
config = LLMConfig(
provider=provider,
api_key=api_key,
)
return create_llm_client(config)
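
# Sketch: pinning a provider instead of auto-detecting; returns None when the
# matching environment variable is unset:
#
#     client = create_client_from_env(LLMProvider.GEMINI)
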
# Test function for debugging
def test_provider(provider_name: str, api_key: str, test_message: str = "Hello, respond with 'OK' if you receive this.") -> Dict[str, Any]:
"""
Test a specific LLM provider.
Args:
provider_name: Provider name (openai, gemini, etc.)
api_key: API key
test_message: Test message to send
Returns:
Dict with success status and response/error
"""
import asyncio
try:
provider = LLMProvider(provider_name.lower())
config = LLMConfig(
provider=provider,
api_key=api_key,
)
client = create_llm_client(config)
messages = [
{"role": "user", "content": test_message}
]
response = asyncio.run(client.chat(messages))
return {
"success": True,
"provider": provider_name,
"model": client.model,
"response": response[:500] if len(response) > 500 else response,
}
except Exception as e:
return {
"success": False,
"provider": provider_name,
"error": str(e),
}
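

if __name__ == "__main__":
    # Minimal manual smoke test (a sketch): this makes a real API call, so it
    # needs a valid key for the chosen provider.
    import sys
    if len(sys.argv) >= 3:
        msg = sys.argv[3] if len(sys.argv) > 3 else "Hello, respond with 'OK' if you receive this."
        print(json.dumps(test_provider(sys.argv[1], sys.argv[2], msg), ensure_ascii=False, indent=2))
    else:
        print("usage: python llm_providers.py <provider> <api_key> [message]")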