import asyncio
import json
import logging
from typing import AsyncGenerator

from langchain_openai import ChatOpenAI

from src.core.config import settings
from src.core.ports.llm_port import LlmPort

logger = logging.getLogger(__name__)


class GroqAdapter(LlmPort):
    """
    Groq LLM adapter using the OpenAI-compatible API.
    Free tier: ~14,400 RPD on llama-3.3-70b-versatile, 200+ tok/s.
    Get your key: https://console.groq.com/keys

    When Groq reports a rate limit, requests are retried against the other
    configured providers (Gemini, Together AI, HuggingFace) in that order.
    Errors are reported to the caller as plain-text messages rather than
    raised exceptions.
    """

    GROQ_BASE_URL = "https://api.groq.com/openai/v1"
    DEFAULT_MODEL = "llama-3.3-70b-versatile"

    # Terminal frame of every SSE stream produced by ``generate_stream``.
    _SSE_DONE = "data: [DONE]\n\n"

    def __init__(self) -> None:
        """Build the Groq client, or leave ``self.llm`` as ``None`` if unusable."""
        self.llm = None
        if not self._is_configured(settings.GROQ_API_KEY, "your-groq-api-key-here"):
            logger.warning("GROQ_API_KEY not set — Groq adapter disabled.")
            return

        model = settings.GROQ_MODEL or self.DEFAULT_MODEL
        try:
            self.llm = ChatOpenAI(
                api_key=settings.GROQ_API_KEY,
                base_url=self.GROQ_BASE_URL,
                model=model,
                temperature=0.2,
                max_tokens=1024,
            )
            logger.info("✅ Groq adapter initialized with model: %s", model)
        except Exception as e:
            # Best-effort: a broken client must not prevent the app from starting.
            logger.error("Failed to initialize Groq adapter: %s", e)

    @staticmethod
    def _is_configured(value, placeholder: str) -> bool:
        """Return True when *value* is set and is not the template placeholder."""
        return bool(value) and value != placeholder

    @staticmethod
    def _sse_token(text: str) -> str:
        """Wrap *text* in a single ``data: {"token": ...}`` SSE frame."""
        return f"data: {json.dumps({'token': text})}\n\n"

    def _is_rate_limit(self, error_msg: str) -> bool:
        """Heuristically detect a rate-limit/quota error from its message text."""
        lowered = error_msg.lower()
        return "rate_limit" in lowered or "429" in error_msg or "quota" in lowered

    def generate(self, prompt: str) -> str:
        """
        Return the model's answer to *prompt* as a string.

        Never raises for provider errors: a rate limit triggers the fallback
        chain, and any other failure is returned as an error message.
        """
        if not self.llm:
            return "Groq API key not configured."
        try:
            return self.llm.invoke(prompt).content
        except Exception as e:
            error_msg = str(e)
            if self._is_rate_limit(error_msg):
                logger.warning("Groq rate limit hit — trying fallback providers.")
                return self._fallback_generate(prompt)
            logger.error("Groq generate error: %s", e)
            return f"Error generating response: {error_msg}"

    def _fallback_generate(self, prompt: str) -> str:
        """Try Gemini → Together AI → HuggingFace → error message.

        Each provider is attempted only when its key is configured. A failure
        in one provider is logged and the next one is tried.
        """
        if self._is_configured(settings.GEMINI_API_KEY, "your-gemini-api-key-here"):
            try:
                from src.infrastructure.adapters.gemini_adapter import GeminiAdapter

                result = GeminiAdapter().generate(prompt)
                # A result starting with "Gemini" is treated as an error report
                # from the adapter; an empty result is not a usable answer
                # either (same guard as the HuggingFace branch below).
                if result and not result.startswith("Gemini"):
                    logger.info("Gemini fallback succeeded.")
                    return result
                logger.warning("Gemini fallback returned error: %s", result)
            except Exception as e:
                logger.warning("Gemini fallback failed: %s", e)

        # Try Together AI
        if self._is_configured(settings.TOGETHER_API_KEY, "your-together-api-key-here"):
            try:
                together = ChatOpenAI(
                    api_key=settings.TOGETHER_API_KEY,
                    base_url="https://api.together.xyz/v1",
                    model=settings.TOGETHER_MODEL or "meta-llama/Llama-3.3-70B-Instruct-Turbo",
                    temperature=0.2,
                    max_tokens=1024,
                )
                logger.info("Falling back to Together AI.")
                return together.invoke(prompt).content
            except Exception as e:
                logger.warning("Together AI fallback failed: %s", e)

        # Try HuggingFace Inference API
        if self._is_configured(settings.HF_TOKEN, "your-hf-token-here"):
            try:
                from src.infrastructure.adapters.huggingface_adapter import HuggingFaceAdapter

                result = HuggingFaceAdapter().generate(prompt)
                if result and not result.startswith("HuggingFace"):
                    logger.info("Falling back to HuggingFace.")
                    return result
                logger.warning("HuggingFace fallback returned: %s", result)
            except Exception as e:
                logger.warning("HuggingFace fallback failed: %s", e)

        return "All LLM providers are currently unavailable. Please try again in a few minutes."

    async def generate_stream(self, prompt: str) -> AsyncGenerator[str, None]:
        """
        Stream the answer to *prompt* as Server-Sent-Event frames.

        Yields ``data: {"token": ...}`` frames followed by one final
        ``data: [DONE]`` frame. Errors are emitted as a token frame instead of
        being raised. On a rate limit the complete fallback answer is sent as
        one token; if some tokens were already emitted before the failure, the
        fallback answer follows them.
        """
        if not self.llm:
            yield self._sse_token("Groq API key not configured.")
            yield self._SSE_DONE
            return

        try:
            # Use the async streaming API so the event loop is not blocked
            # while waiting for tokens.
            async for chunk in self.llm.astream(prompt):
                content = getattr(chunk, "content", None)
                if content:
                    yield self._sse_token(content)
        except Exception as e:
            error_msg = str(e)
            if self._is_rate_limit(error_msg):
                logger.warning("Groq rate limit hit during stream — falling back.")
                # The fallback chain is synchronous and does network I/O; run
                # it in the default executor to keep the loop responsive.
                loop = asyncio.get_running_loop()
                fallback_answer = await loop.run_in_executor(
                    None, self._fallback_generate, prompt
                )
                yield self._sse_token(fallback_answer)
            else:
                logger.error("Groq stream error: %s", e)
                yield self._sse_token(f"Error: {error_msg}")
        yield self._SSE_DONE