Peterase's picture
feat(rag): implement hybrid search with live sources and production-grade intent classification
a63c61f
import asyncio
import json
import logging
from typing import AsyncGenerator

from langchain_openai import ChatOpenAI

from src.core.config import settings
from src.core.ports.llm_port import LlmPort

logger = logging.getLogger(__name__)
class GroqAdapter(LlmPort):
    """
    Groq LLM adapter using the OpenAI-compatible API.

    Free tier: ~14,400 RPD on llama-3.3-70b-versatile, 200+ tok/s.
    Get your key: https://console.groq.com/keys

    When Groq answers with a rate-limit/quota error, requests fall back to
    Gemini, then Together AI, then HuggingFace (each only if its credential
    is configured in ``settings``).
    """

    GROQ_BASE_URL = "https://api.groq.com/openai/v1"
    DEFAULT_MODEL = "llama-3.3-70b-versatile"

    def __init__(self):
        """Create the Groq client; ``self.llm`` stays ``None`` if that is not possible."""
        self.llm = None
        if not self._is_configured(settings.GROQ_API_KEY, "your-groq-api-key-here"):
            logger.warning("GROQ_API_KEY not set — Groq adapter disabled.")
            return

        model = settings.GROQ_MODEL or self.DEFAULT_MODEL
        try:
            self.llm = ChatOpenAI(
                api_key=settings.GROQ_API_KEY,
                base_url=self.GROQ_BASE_URL,
                model=model,
                temperature=0.2,
                max_tokens=1024,
            )
        except Exception as e:
            # Best effort: a broken client must not prevent the app from starting.
            logger.error("Failed to initialize Groq adapter: %s", e)
        else:
            logger.info("✅ Groq adapter initialized with model: %s", model)

    @staticmethod
    def _is_configured(value, placeholder: str) -> bool:
        """Return True when *value* is set and is not the template *placeholder*."""
        return bool(value) and value != placeholder

    @staticmethod
    def _sse_token(text) -> str:
        """Wrap *text* in a ``data: {"token": ...}`` server-sent-event frame."""
        return f"data: {json.dumps({'token': text})}\n\n"

    def _is_rate_limit(self, error_msg: str) -> bool:
        """Return True if *error_msg* mentions a rate limit, HTTP 429 or a quota."""
        lowered = error_msg.lower()
        return any(marker in lowered for marker in ("rate_limit", "429", "quota"))

    def generate(self, prompt: str) -> str:
        """
        Return the model's answer to *prompt*.

        Never raises: configuration problems and provider errors are reported
        as plain-text messages in the return value. Rate-limit errors trigger
        the fallback provider chain instead.
        """
        if not self.llm:
            return "Groq API key not configured."
        try:
            return self.llm.invoke(prompt).content
        except Exception as e:
            error_msg = str(e)
            if self._is_rate_limit(error_msg):
                logger.warning("Groq rate limit hit — trying fallback providers.")
                return self._fallback_generate(prompt)
            logger.error("Groq generate error: %s", e)
            return f"Error generating response: {error_msg}"

    def _fallback_generate(self, prompt: str) -> str:
        """Try Gemini → Together AI → HuggingFace → error message."""
        if self._is_configured(settings.GEMINI_API_KEY, "your-gemini-api-key-here"):
            try:
                # Imported lazily, as in the original code, so the adapter is
                # only loaded when this fallback is actually used.
                from src.infrastructure.adapters.gemini_adapter import GeminiAdapter

                result = GeminiAdapter().generate(prompt)
                # NOTE(review): a reply starting with "Gemini" is treated as an
                # error string from the adapter — confirm against GeminiAdapter.
                # Empty replies are rejected too, matching the HuggingFace check.
                if result and not result.startswith("Gemini"):
                    logger.info("Gemini fallback succeeded.")
                    return result
                logger.warning("Gemini fallback returned error: %s", result)
            except Exception as e:
                logger.warning("Gemini fallback failed: %s", e)

        # Try Together AI
        if self._is_configured(settings.TOGETHER_API_KEY, "your-together-api-key-here"):
            try:
                together = ChatOpenAI(
                    api_key=settings.TOGETHER_API_KEY,
                    base_url="https://api.together.xyz/v1",
                    model=settings.TOGETHER_MODEL or "meta-llama/Llama-3.3-70B-Instruct-Turbo",
                    temperature=0.2,
                    max_tokens=1024,
                )
                logger.info("Falling back to Together AI.")
                return together.invoke(prompt).content
            except Exception as e:
                logger.warning("Together AI fallback failed: %s", e)

        # Try HuggingFace Inference API
        if self._is_configured(settings.HF_TOKEN, "your-hf-token-here"):
            try:
                from src.infrastructure.adapters.huggingface_adapter import HuggingFaceAdapter

                result = HuggingFaceAdapter().generate(prompt)
                if result and not result.startswith("HuggingFace"):
                    logger.info("Falling back to HuggingFace.")
                    return result
                logger.warning("HuggingFace fallback returned: %s", result)
            except Exception as e:
                logger.warning("HuggingFace fallback failed: %s", e)

        return "All LLM providers are currently unavailable. Please try again in a few minutes."

    async def generate_stream(self, prompt: str) -> AsyncGenerator[str, None]:
        """
        Stream the answer to *prompt* as server-sent-event frames.

        Yields ``data: {"token": ...}`` frames followed by a final
        ``data: [DONE]`` frame. Errors are reported as a token frame rather
        than raised; rate-limit errors are answered by the fallback providers
        in a single frame.
        """
        if not self.llm:
            yield self._sse_token("Groq API key not configured.")
            yield "data: [DONE]\n\n"
            return

        try:
            # Use the async streaming API so the event loop is not blocked
            # while waiting for tokens.
            async for chunk in self.llm.astream(prompt):
                text = getattr(chunk, "content", None)
                if text:
                    yield self._sse_token(text)
        except Exception as e:
            error_msg = str(e)
            if self._is_rate_limit(error_msg):
                logger.warning("Groq rate limit hit during stream — falling back.")
                # The fallback chain is synchronous network I/O; run it in the
                # default executor to keep the event loop responsive.
                # NOTE(review): if tokens were already emitted before the error,
                # the full fallback answer is appended after them.
                loop = asyncio.get_running_loop()
                fallback_answer = await loop.run_in_executor(
                    None, self._fallback_generate, prompt
                )
                yield self._sse_token(fallback_answer)
            else:
                logger.error("Groq stream error: %s", e)
                yield self._sse_token(f"Error: {error_msg}")

        yield "data: [DONE]\n\n"