| from google.generativeai.embedding import embed_content |
| from google.generativeai.client import configure |
| from google.generativeai.generative_models import GenerativeModel |
| from loguru import logger |
| from .request_limit_manager import RequestLimitManager |
| from typing import List, Optional |
|
|
| class GeminiClient: |
| def __init__(self): |
| self.limit_manager = RequestLimitManager("gemini") |
|
|
| def generate_text(self, prompt: str, **kwargs) -> str: |
| last_error = None |
| for key, model in self.limit_manager.iterate_key_model(): |
| try: |
| configure(api_key=key) |
| _model = GenerativeModel(model) |
| response = _model.generate_content(prompt, **kwargs) |
| self.limit_manager.log_request(key, model, success=True) |
| if hasattr(response, 'usage_metadata'): |
| logger.info(f"[GEMINI][USAGE] Prompt Token Count: {response.usage_metadata.prompt_token_count} - Candidate Token Count: {response.usage_metadata.candidates_token_count} - Total Token Count: {response.usage_metadata.total_token_count}") |
| if hasattr(response, 'text'): |
| logger.info(f"[GEMINI][TEXT_RESPONSE] {response.text}") |
| return response.text |
| elif hasattr(response, 'candidates') and response.candidates: |
| logger.info(f"[GEMINI][CANDIDATES_RESPONSE] {response.candidates[0].content.parts[0].text}") |
| return response.candidates[0].content.parts[0].text |
| logger.info(f"[GEMINI][RAW_RESPONSE] {response}") |
| return str(response) |
| except Exception as e: |
| import re |
| msg = str(e) |
| if "429" in msg or "rate limit" in msg.lower(): |
| retry_delay = 60 |
| m = re.search(r'retry_delay.*?seconds: (\d+)', msg) |
| if m: |
| retry_delay = int(m.group(1)) |
| self.limit_manager.log_request(key, model, success=False, retry_delay=retry_delay) |
| last_error = e |
| continue |
| raise last_error or RuntimeError("No available Gemini API key/model") |
|
|
| def count_tokens(self, prompt: str) -> int: |
| for key, model in self.limit_manager.iterate_key_model(): |
| try: |
| configure(api_key=key) |
| _model = GenerativeModel(model) |
| return _model.count_tokens(prompt).total_tokens |
| except Exception: |
| continue |
| return 0 |
|
|
| def create_embedding(self, text: str, model: Optional[str] = None) -> list: |
| last_error = None |
| for key, m in self.limit_manager.iterate_key_model(): |
| m = m or "" |
| use_model = model if model not in (None, "") else m |
| if not use_model: |
| continue |
| use_model = str(use_model) |
| try: |
| configure(api_key=key) |
| response = embed_content( |
| model=use_model, |
| content=text, |
| task_type="retrieval_query" |
| ) |
| self.limit_manager.log_request(key, use_model, success=True) |
| logger.info(f"[GEMINI][EMBEDDING][RAW_RESPONSE] {response['embedding'][:10]} ..... {response['embedding'][-10:]}") |
| return response['embedding'] |
| except Exception as e: |
| import re |
| msg = str(e) |
| if "429" in msg or "rate limit" in msg.lower(): |
| retry_delay = 60 |
| m_retry = re.search(r'retry_delay.*?seconds: (\d+)', msg) |
| if m_retry: |
| retry_delay = int(m_retry.group(1)) |
| self.limit_manager.log_request(key, use_model, success=False, retry_delay=retry_delay) |
| last_error = e |
| continue |
| raise last_error or RuntimeError("No available Gemini API key/model") |