File size: 4,833 Bytes
a63c61f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import json
import logging
import requests
from typing import AsyncGenerator

from src.core.ports.llm_port import LlmPort
from src.core.config import settings

logger = logging.getLogger(__name__)

# Try these in order until one works β€” covers all API key vintages
_MODEL_FALLBACK_CHAIN = [
    "gemini-2.0-flash",
    "gemini-2.0-flash-lite",
    "gemini-1.5-flash",
    "gemini-1.5-flash-latest",
    "gemini-flash-latest",
    "gemini-pro",
]

_BASE = "https://generativelanguage.googleapis.com/v1beta/models"


class GeminiAdapter(LlmPort):
    """
    Google Gemini adapter using the native generateContent REST API.
    Auto-discovers the first working model for the given API key.
    Free tier: 15 RPM, 1M TPM, 1500 RPD β€” https://aistudio.google.com/apikey
    """

    def __init__(self):
        self.api_key = settings.GEMINI_API_KEY
        self.model: str | None = None

        if not self.api_key or self.api_key == "your-gemini-api-key-here":
            self.api_key = None
            logger.warning("GEMINI_API_KEY not set β€” Gemini adapter disabled.")
            return

        # Prefer whatever is explicitly configured β€” but reject obvious non-model values
        configured = (settings.GEMINI_MODEL or "").strip()
        # Reject if it looks like an API key (starts with known prefixes or is too long)
        if configured and (
            configured.startswith("key_") or
            configured.startswith("gsk_") or
            configured.startswith("AIza") or
            len(configured) > 60
        ):
            logger.warning(f"GEMINI_MODEL='{configured}' looks like an API key, ignoring it.")
            configured = ""

        chain = ([configured] if configured else []) + _MODEL_FALLBACK_CHAIN

        for model in chain:
            url = f"{_BASE}/{model}:generateContent"
            try:
                resp = requests.post(
                    url,
                    params={"key": self.api_key},
                    json={"contents": [{"parts": [{"text": "hi"}]}]},
                    timeout=10,
                )
                if resp.status_code == 200:
                    self.model = model
                    logger.info(f"βœ… Gemini adapter ready β€” model: {self.model}")
                    break
                elif resp.status_code == 404:
                    logger.debug(f"Gemini model {model} not available (404), trying next...")
                    continue
                else:
                    # 429 rate-limit etc β€” model exists, use it
                    self.model = model
                    logger.info(f"βœ… Gemini adapter ready β€” model: {self.model} (status {resp.status_code})")
                    break
            except Exception as e:
                logger.debug(f"Gemini probe failed for {model}: {e}")
                continue

        if not self.model:
            logger.error("❌ No working Gemini model found for this API key.")

    def _url(self) -> str:
        return f"{_BASE}/{self.model}:generateContent"

    def _call(self, prompt: str) -> str:
        resp = requests.post(
            self._url(),
            params={"key": self.api_key},
            json={"contents": [{"parts": [{"text": prompt}]}]},
            timeout=60,
        )
        resp.raise_for_status()
        return resp.json()["candidates"][0]["content"]["parts"][0]["text"]

    def generate(self, prompt: str) -> str:
        if not self.api_key or not self.model:
            return "Gemini not available."
        try:
            return self._call(prompt)
        except requests.HTTPError as e:
            code = e.response.status_code if e.response else 0
            if code == 429:
                return "Gemini rate limit reached. Please try again shortly."
            logger.error(f"Gemini HTTP error: {e}")
            return f"Gemini error: {e}"
        except Exception as e:
            logger.error(f"Gemini generate error: {e}")
            return f"Gemini error: {e}"

    async def generate_stream(self, prompt: str) -> AsyncGenerator[str, None]:
        if not self.api_key or not self.model:
            yield f"data: {json.dumps({'token': 'Gemini not available.'})}\n\n"
            yield "data: [DONE]\n\n"
            return
        try:
            answer = self._call(prompt)
            yield f"data: {json.dumps({'token': answer})}\n\n"
            yield "data: [DONE]\n\n"
        except requests.HTTPError as e:
            code = e.response.status_code if e.response else 0
            msg = "Gemini rate limit reached." if code == 429 else f"Gemini error: {e}"
            yield f"data: {json.dumps({'token': msg})}\n\n"
            yield "data: [DONE]\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'token': f'Gemini error: {e}'})}\n\n"
            yield "data: [DONE]\n\n"