| |
| """ Manages interactions with all external LLM and search APIs. """ |
|
|
| import os |
| import logging |
| from typing import Dict, Any, Generator, List |
|
|
| from dotenv import load_dotenv |
| from huggingface_hub import InferenceClient |
| from tavily import TavilyClient |
| from groq import Groq |
| import fireworks.client as Fireworks |
| import openai |
| import google.generativeai as genai |
|
|
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
| load_dotenv() |
|
|
| |
| HF_TOKEN = os.getenv("HF_TOKEN") |
| TAVILY_API_KEY = os.getenv("TAVILY_API_KEY") |
| GROQ_API_KEY = os.getenv("GROQ_API_KEY") |
| FIREWORKS_API_KEY = os.getenv("FIREWORKS_API_KEY") |
| OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") |
| GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") |
| DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY") |
|
|
| Messages = List[Dict[str, Any]] |
|
|
| class LLMService: |
| """A multi-provider wrapper for LLM Inference APIs.""" |
| def __init__(self): |
| self.hf_client = InferenceClient(token=HF_TOKEN) if HF_TOKEN else None |
| self.groq_client = Groq(api_key=GROQ_API_KEY) if GROQ_API_KEY else None |
| self.openai_client = openai.OpenAI(api_key=OPENAI_API_KEY) if OPENAI_API_KEY else None |
| |
| if DEEPSEEK_API_KEY: |
| self.deepseek_client = openai.OpenAI(api_key=DEEPSEEK_API_KEY, base_url="https://api.deepseek.com/v1") |
| else: |
| self.deepseek_client = None |
|
|
| if FIREWORKS_API_KEY: |
| Fireworks.api_key = FIREWORKS_API_KEY |
| self.fireworks_client = Fireworks |
| else: |
| self.fireworks_client = None |
|
|
| if GEMINI_API_KEY: |
| genai.configure(api_key=GEMINI_API_KEY) |
| self.gemini_model = genai.GenerativeModel('gemini-1.5-pro-latest') |
| else: |
| self.gemini_model = None |
|
|
| def _prepare_messages_for_gemini(self, messages: Messages) -> List[Dict[str, Any]]: |
| gemini_messages = [] |
| for msg in messages: |
| if msg['role'] == 'system': continue |
| role = 'model' if msg['role'] == 'assistant' else 'user' |
| gemini_messages.append({'role': role, 'parts': [msg['content']]}) |
| return gemini_messages |
|
|
| def generate_code_stream(self, model_id: str, messages: Messages, max_tokens: int = 8192) -> Generator[str, None, None]: |
| provider, model_name = model_id.split('/', 1) |
| logging.info(f"Dispatching to provider: {provider} for model: {model_name}") |
|
|
| try: |
| if provider in ['openai', 'groq', 'deepseek', 'fireworks']: |
| client_map = {'openai': self.openai_client, 'groq': self.groq_client, 'deepseek': self.deepseek_client, 'fireworks': self.fireworks_client.ChatCompletion if self.fireworks_client else None} |
| client = client_map.get(provider) |
| if not client: raise ValueError(f"{provider.capitalize()} API key not configured.") |
| |
| stream = client.create(model=model_name, messages=messages, stream=True, max_tokens=max_tokens) if provider == 'fireworks' else client.chat.completions.create(model=model_name, messages=messages, stream=True, max_tokens=max_tokens) |
| for chunk in stream: |
| if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content: yield chunk.choices[0].delta.content |
| |
| elif provider == 'gemini': |
| if not self.gemini_model: raise ValueError("Gemini API key not configured.") |
| system_prompt = next((msg['content'] for msg in messages if msg['role'] == 'system'), "") |
| gemini_messages = self._prepare_messages_for_gemini(messages) |
| |
| if system_prompt and gemini_messages and gemini_messages[0]['role'] == 'user': |
| gemini_messages[0]['parts'][0] = f"{system_prompt}\n\n{gemini_messages[0]['parts'][0]}" |
| stream = self.gemini_model.generate_content(gemini_messages, stream=True) |
| for chunk in stream: yield chunk.text |
|
|
| elif provider == 'huggingface': |
| if not self.hf_client: raise ValueError("Hugging Face API token not configured.") |
| hf_model_id = model_id.split('/', 1)[1] |
| stream = self.hf_client.chat_completion(model=hf_model_id, messages=messages, stream=True, max_tokens=max_tokens) |
| for chunk in stream: |
| if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content: yield chunk.choices[0].delta.content |
| else: |
| raise ValueError(f"Unknown provider: {provider}") |
| except Exception as e: |
| logging.error(f"LLM API Error with provider {provider}: {e}") |
| yield f"Error from {provider.capitalize()}: {str(e)}" |
|
|
| class SearchService: |
| def __init__(self, api_key: str = TAVILY_API_KEY): |
| self.client = TavilyClient(api_key=api_key) if api_key else None |
| if not self.client: logging.warning("TAVILY_API_KEY not set. Web search will be disabled.") |
| def is_available(self) -> bool: return self.client is not None |
| def search(self, query: str, max_results: int = 5) -> str: |
| if not self.is_available(): return "Web search is not available." |
| try: |
| response = self.client.search(query, search_depth="advanced", max_results=min(max(1, max_results), 10)) |
| return "Web Search Results:\n\n" + "\n---\n".join([f"Title: {res.get('title', 'N/A')}\nURL: {res.get('url', 'N/A')}\nContent: {res.get('content', 'N/A')}" for res in response.get('results', [])]) |
| except Exception as e: return f"Search error: {str(e)}" |
|
|
| llm_service = LLMService() |
| search_service = SearchService() |