Spaces:
Sleeping
Sleeping
| import asyncio | |
| import logging | |
| import time | |
| from collections.abc import AsyncGenerator | |
| import google.generativeai as genai | |
| from app.config import get_settings | |
| logger = logging.getLogger(__name__) | |
class GeminiService:
    """Wrapper around a Gemini ``GenerativeModel`` with client-side request pacing.

    Consecutive calls are spaced at least ``60 / rpm_limit`` seconds apart.
    Pacing state is per instance and is updated without any ``await`` between
    the read and the write, so coroutines sharing one event loop each get a
    distinct slot. No lock is taken, so calls from multiple OS threads are
    not synchronized.
    """

    def __init__(self, api_key: str, model_name: str, rpm_limit: int = 15):
        """Configure the SDK and create the model handle.

        Args:
            api_key: Key passed to ``genai.configure``.
            model_name: Model identifier passed to ``genai.GenerativeModel``.
            rpm_limit: Maximum requests per minute; must be positive.

        Raises:
            ValueError: If ``rpm_limit`` is zero or negative.
        """
        # Validate before touching the SDK so a bad setting fails fast with a
        # clear message instead of a ZeroDivisionError (0) or a negative
        # interval that silently disables pacing (< 0).
        if rpm_limit <= 0:
            raise ValueError(f"rpm_limit must be positive, got {rpm_limit!r}")
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel(model_name)
        self.model_name = model_name
        self._min_interval = 60.0 / rpm_limit
        # -inf means "never called": the first request is never delayed,
        # whatever the monotonic clock's reference point happens to be.
        self._last_call_time = float("-inf")
        logger.info(f"Initialized Gemini '{model_name}' (RPM limit: {rpm_limit})")

    def _reserve_slot(self) -> float:
        """Claim the next permitted call time and return the seconds to wait for it.

        The slot is recorded *before* the caller sleeps, so callers that arrive
        while another one is still waiting queue up behind it instead of all
        computing the same wait and firing together.
        """
        # Monotonic clock: wall-clock adjustments must not stretch or skip waits.
        now = time.monotonic()
        slot = max(now, self._last_call_time + self._min_interval)
        self._last_call_time = slot
        return slot - now

    def _wait_for_rate_limit(self) -> None:
        """Block the calling thread until this call's reserved slot arrives."""
        wait = self._reserve_slot()
        if wait > 0:
            logger.debug(f"Rate limiting: waiting {wait:.1f}s")
            time.sleep(wait)

    async def _async_wait_for_rate_limit(self) -> None:
        """Suspend the calling coroutine until this call's reserved slot arrives."""
        wait = self._reserve_slot()
        if wait > 0:
            logger.debug(f"Rate limiting: waiting {wait:.1f}s")
            await asyncio.sleep(wait)

    def generate(self, prompt: str, temperature: float = 0.3, max_tokens: int = 2048) -> str:
        """Generate a complete response synchronously and return its text.

        Args:
            prompt: Content passed to ``generate_content``.
            temperature: Sampling temperature for the generation config.
            max_tokens: Value used for ``max_output_tokens``.

        Returns:
            The ``text`` attribute of the SDK response.

        Raises:
            Exception: Any error from the SDK call or from reading
                ``response.text`` is logged and re-raised unchanged.
        """
        self._wait_for_rate_limit()
        try:
            response = self.model.generate_content(
                prompt,
                generation_config=genai.types.GenerationConfig(
                    temperature=temperature,
                    max_output_tokens=max_tokens,
                ),
            )
            return response.text
        except Exception as e:
            logger.error(f"Gemini generation failed: {e}")
            raise

    async def generate_stream(
        self, prompt: str, temperature: float = 0.3, max_tokens: int = 2048
    ) -> AsyncGenerator[str, None]:
        """Stream a response, yielding each non-empty text chunk as it arrives.

        Uses the SDK's async API so the event loop stays free while waiting on
        the network; the synchronous streaming call would block every other
        coroutine for the duration of the stream.

        Args:
            prompt: Content passed to ``generate_content_async``.
            temperature: Sampling temperature for the generation config.
            max_tokens: Value used for ``max_output_tokens``.

        Yields:
            The ``text`` of each streamed chunk whose text is non-empty.

        Raises:
            Exception: Any error raised while requesting or consuming the
                stream is logged and re-raised unchanged.
        """
        await self._async_wait_for_rate_limit()
        try:
            response = await self.model.generate_content_async(
                prompt,
                generation_config=genai.types.GenerationConfig(
                    temperature=temperature,
                    max_output_tokens=max_tokens,
                ),
                stream=True,
            )
            async for chunk in response:
                if chunk.text:
                    yield chunk.text
        except Exception as e:
            logger.error(f"Gemini streaming failed: {e}")
            raise
# Lazily created, process-wide service instance; populated by get_llm().
_llm: GeminiService | None = None


def get_llm() -> GeminiService:
    """Return the shared ``GeminiService``, constructing it on first use.

    The instance is built from the application settings (API key, model name
    and RPM limit) and cached in the module-level ``_llm`` for later calls.
    """
    global _llm
    if _llm is not None:
        return _llm

    cfg = get_settings()
    _llm = GeminiService(
        api_key=cfg.gemini_api_key,
        model_name=cfg.gemini_model,
        rpm_limit=cfg.gemini_rpm_limit,
    )
    return _llm