# src/infrastructure/adapters/gemini_adapter.py
# feat(rag): implement hybrid search with live sources and production-grade
# intent classification (commit a63c61f)
import json
import logging
import requests
from typing import AsyncGenerator
from src.core.ports.llm_port import LlmPort
from src.core.config import settings
logger = logging.getLogger(__name__)
# Try these in order until one works β€” covers all API key vintages
_MODEL_FALLBACK_CHAIN = [
"gemini-2.0-flash",
"gemini-2.0-flash-lite",
"gemini-1.5-flash",
"gemini-1.5-flash-latest",
"gemini-flash-latest",
"gemini-pro",
]
_BASE = "https://generativelanguage.googleapis.com/v1beta/models"
class GeminiAdapter(LlmPort):
    """
    Google Gemini adapter using the native generateContent REST API.

    On construction it auto-discovers the first working model for the given
    API key by probing the fallback chain (newest first). If no key is set
    or no model works, the adapter stays disabled and returns/streams a
    "Gemini not available." message instead of raising.

    Free tier: 15 RPM, 1M TPM, 1500 RPD — https://aistudio.google.com/apikey
    """

    def __init__(self):
        # `model is None` doubles as the "adapter disabled" flag.
        self.api_key = settings.GEMINI_API_KEY
        self.model: str | None = None
        if not self.api_key or self.api_key == "your-gemini-api-key-here":
            self.api_key = None
            logger.warning("GEMINI_API_KEY not set — Gemini adapter disabled.")
            return
        self.model = self._discover_model()
        if not self.model:
            logger.error("❌ No working Gemini model found for this API key.")

    def _configured_model(self) -> str:
        """Return the explicitly configured model name, or "" when it is
        unset or looks like an API key pasted into the wrong setting."""
        configured = (settings.GEMINI_MODEL or "").strip()
        # Reject values with known API-key prefixes, or implausibly long
        # for a model id — a common misconfiguration.
        if configured and (
            configured.startswith("key_")
            or configured.startswith("gsk_")
            or configured.startswith("AIza")
            or len(configured) > 60
        ):
            logger.warning(f"GEMINI_MODEL='{configured}' looks like an API key, ignoring it.")
            return ""
        return configured

    def _discover_model(self) -> str | None:
        """Probe candidate models in order; return the first usable one.

        Semantics per probe:
          * 200 — model works, use it.
          * 404 — model does not exist for this key, try the next.
          * any other status (e.g. 429 rate limit) — the model exists,
            accept it anyway.
          * network error — skip to the next candidate.
        Returns None when nothing in the chain responds.
        """
        configured = self._configured_model()
        chain = ([configured] if configured else []) + _MODEL_FALLBACK_CHAIN
        for model in chain:
            url = f"{_BASE}/{model}:generateContent"
            try:
                resp = requests.post(
                    url,
                    params={"key": self.api_key},
                    json={"contents": [{"parts": [{"text": "hi"}]}]},
                    timeout=10,
                )
            except Exception as e:
                logger.debug(f"Gemini probe failed for {model}: {e}")
                continue
            if resp.status_code == 200:
                logger.info(f"✅ Gemini adapter ready — model: {model}")
                return model
            if resp.status_code == 404:
                logger.debug(f"Gemini model {model} not available (404), trying next...")
                continue
            # 429 rate-limit etc — model exists, use it
            logger.info(f"✅ Gemini adapter ready — model: {model} (status {resp.status_code})")
            return model
        return None

    def _url(self) -> str:
        """generateContent endpoint URL for the discovered model."""
        return f"{_BASE}/{self.model}:generateContent"

    def _call(self, prompt: str) -> str:
        """Synchronous generateContent call; returns the first candidate's text.

        Raises:
            requests.HTTPError: on a non-2xx response.
            ValueError: when a 200 response carries no candidates (e.g. the
                prompt was blocked by safety filters) — previously this
                surfaced as an opaque KeyError.
        """
        resp = requests.post(
            self._url(),
            params={"key": self.api_key},
            json={"contents": [{"parts": [{"text": prompt}]}]},
            timeout=60,
        )
        resp.raise_for_status()
        data = resp.json()
        candidates = data.get("candidates") or []
        if not candidates:
            raise ValueError(f"Gemini returned no candidates: {data}")
        return candidates[0]["content"]["parts"][0]["text"]

    def generate(self, prompt: str) -> str:
        """Return a completion for *prompt*, or a human-readable error string."""
        if not self.api_key or not self.model:
            return "Gemini not available."
        try:
            return self._call(prompt)
        except requests.HTTPError as e:
            # BUGFIX: requests.Response is falsy for 4xx/5xx statuses, so the
            # previous `if e.response` test always fell through to 0 and the
            # 429 branch was unreachable — compare against None instead.
            code = e.response.status_code if e.response is not None else 0
            if code == 429:
                return "Gemini rate limit reached. Please try again shortly."
            logger.error(f"Gemini HTTP error: {e}")
            return f"Gemini error: {e}"
        except Exception as e:
            logger.error(f"Gemini generate error: {e}")
            return f"Gemini error: {e}"

    async def generate_stream(self, prompt: str) -> AsyncGenerator[str, None]:
        """Yield the answer as SSE frames (single-chunk pseudo-stream),
        always terminated by a `data: [DONE]` frame.

        NOTE(review): _call() is a blocking requests call inside an async
        generator, so it can stall the event loop for up to 60s — consider
        offloading it to a thread executor.
        """
        if not self.api_key or not self.model:
            yield f"data: {json.dumps({'token': 'Gemini not available.'})}\n\n"
            yield "data: [DONE]\n\n"
            return
        try:
            answer = self._call(prompt)
            yield f"data: {json.dumps({'token': answer})}\n\n"
            yield "data: [DONE]\n\n"
        except requests.HTTPError as e:
            # Same falsy-Response fix as in generate().
            code = e.response.status_code if e.response is not None else 0
            msg = "Gemini rate limit reached." if code == 429 else f"Gemini error: {e}"
            yield f"data: {json.dumps({'token': msg})}\n\n"
            yield "data: [DONE]\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'token': f'Gemini error: {e}'})}\n\n"
            yield "data: [DONE]\n\n"