| | """LLM client β supports Claude API, Groq, and Ollama (local LLMs) with model routing and cost tracking.""" |
| |
|
| | from __future__ import annotations |
| |
|
| | import asyncio |
| | import json |
| | import logging |
| | import os |
| | import time |
| | from dataclasses import dataclass, field |
| | from typing import Optional |
| |
|
| | import httpx |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| | |
# Provider identifiers — used by create_llm_client() and exposed as the
# `provider` attribute on each client class.
PROVIDER_CLAUDE = "claude"
PROVIDER_OLLAMA = "ollama"
PROVIDER_GROQ = "groq"
PROVIDER_GEMINI = "gemini"
PROVIDER_HF = "hf"
PROVIDER_NN = "nn"

# Anthropic Claude model IDs (dated snapshot names).
MODEL_SONNET = "claude-sonnet-4-5-20250929"
MODEL_HAIKU = "claude-haiku-4-5-20251001"

# Ollama (local) model tags.
# NOTE(review): MODEL_LLAMA and MODEL_LLAMA_SMALL are currently the same tag —
# confirm whether a distinct smaller model was intended here.
MODEL_LLAMA = "llama3.1:8b"
MODEL_LLAMA_SMALL = "llama3.1:8b"
MODEL_MISTRAL = "mistral"
MODEL_QWEN = "qwen2.5"
MODEL_GEMMA = "gemma2"

# Groq-hosted model IDs.
MODEL_GROQ_LLAMA_8B = "llama-3.1-8b-instant"
MODEL_GROQ_LLAMA_70B = "llama-3.3-70b-versatile"
MODEL_GROQ_MIXTRAL = "mixtral-8x7b-32768"

# Gemini model IDs (OpenAI-compatible AI Studio endpoint).
MODEL_GEMINI_FLASH = "gemini-2.0-flash-lite"
MODEL_GEMINI_FLASH_FALLBACK = "gemini-2.0-flash-001"
MODEL_GEMINI_PRO = "gemini-1.5-pro"

# When a Gemini model is rejected by the endpoint, GeminiClient._map_model
# walks this chain (model id -> next model to try) until it finds one that
# has not been marked unavailable.
_GEMINI_FALLBACK_CHAIN: dict[str, str] = {
    "gemini-2.0-flash": "gemini-2.0-flash-lite",
    "gemini-2.0-flash-exp": "gemini-2.0-flash-lite",
    "gemini-2.0-flash-001": "gemini-2.0-flash-lite",
    "gemini-2.0-flash-lite": MODEL_GEMINI_FLASH_FALLBACK,
}

# Substrings in a Gemini error body that indicate the *model* (not the
# request) is unavailable on this endpoint, triggering the fallback chain.
_GEMINI_MODEL_UNAVAILABLE_KWS = (
    "not found", "not supported", "invalid argument",
    "does not exist", "unavailable", "serverless",
)

# Local neural-net (ONNX) model identifier.
MODEL_NN_SOCI = "RayMelius/soci-agent-nn"

# Default local Ollama model tag for this project.
MODEL_OLLAMA_SOCI = "soci-agent-7b"

# USD per 1M tokens, keyed by model ID. Models absent from this table are
# costed at $0 by LLMUsage.estimated_cost_usd (local/free models).
COST_PER_1M = {
    MODEL_SONNET: {"input": 3.0, "output": 15.0},
    MODEL_HAIKU: {"input": 0.80, "output": 4.0},
    MODEL_GROQ_LLAMA_8B: {"input": 0.05, "output": 0.08},
    MODEL_GROQ_LLAMA_70B: {"input": 0.59, "output": 0.79},
    MODEL_GROQ_MIXTRAL: {"input": 0.24, "output": 0.24},
}
| |
|
| |
|
@dataclass
class LLMUsage:
    """Tracks API usage and costs."""

    total_calls: int = 0
    total_input_tokens: int = 0
    total_output_tokens: int = 0
    calls_by_model: dict[str, int] = field(default_factory=dict)
    tokens_by_model: dict[str, dict[str, int]] = field(default_factory=dict)

    def record(self, model: str, input_tokens: int, output_tokens: int) -> None:
        """Fold one API call's token counts into the running totals."""
        self.total_calls += 1
        self.total_input_tokens += input_tokens
        self.total_output_tokens += output_tokens
        self.calls_by_model[model] = self.calls_by_model.get(model, 0) + 1
        per_model = self.tokens_by_model.setdefault(model, {"input": 0, "output": 0})
        per_model["input"] += input_tokens
        per_model["output"] += output_tokens

    @property
    def estimated_cost_usd(self) -> float:
        """Estimated spend in USD from COST_PER_1M rates (unknown models cost $0)."""
        cost = 0.0
        for model, tokens in self.tokens_by_model.items():
            rates = COST_PER_1M.get(model, {"input": 0.0, "output": 0.0})
            for direction in ("input", "output"):
                cost += tokens[direction] / 1_000_000 * rates[direction]
        return cost

    def summary(self) -> str:
        """Render a short multi-line human-readable usage report."""
        report = [
            f"Total API calls: {self.total_calls}",
            f"Total tokens: {self.total_input_tokens:,} in / {self.total_output_tokens:,} out",
            f"Estimated cost: ${self.estimated_cost_usd:.4f}",
        ]
        for model, count in self.calls_by_model.items():
            # Abbreviate dashed model IDs to their second segment.
            label = model.split("-")[1] if "-" in model else model
            report.append(f" {label}: {count} calls")
        return "\n".join(report)
| |
|
| |
|
| | def _parse_json_response(text: str) -> dict: |
| | """Extract JSON from an LLM response, handling markdown blocks and extra text.""" |
| | text = text.strip() |
| | if not text: |
| | return {} |
| | |
| | if text.startswith("```"): |
| | lines = text.split("\n") |
| | text = "\n".join(lines[1:-1]) if len(lines) > 2 else text |
| | text = text.strip() |
| | try: |
| | return json.loads(text) |
| | except json.JSONDecodeError: |
| | |
| | start = text.find("{") |
| | end = text.rfind("}") + 1 |
| | if start >= 0 and end > start: |
| | try: |
| | return json.loads(text[start:end]) |
| | except json.JSONDecodeError: |
| | pass |
| | logger.warning(f"Failed to parse JSON from LLM response: {text[:200]}") |
| | return {} |
| |
|
| |
|
| | |
| | |
| | |
| |
|
class ClaudeClient:
    """Wrapper around the Anthropic Claude API."""

    def __init__(
        self,
        api_key: Optional[str] = None,
        default_model: str = MODEL_HAIKU,
        max_retries: int = 3,
    ) -> None:
        # Imported lazily so the module loads without the anthropic SDK when
        # another provider is selected.
        import anthropic
        self.api_key = api_key or os.environ.get("ANTHROPIC_API_KEY", "")
        if not self.api_key:
            raise ValueError(
                "ANTHROPIC_API_KEY not set. Copy .env.example to .env and add your key."
            )
        self.client = anthropic.Anthropic(api_key=self.api_key)
        self.default_model = default_model
        self.max_retries = max_retries
        self.usage = LLMUsage()
        self.provider = PROVIDER_CLAUDE
        # Monotonic deadline until which llm_status reports "limited".
        self._rate_limited_until: float = 0.0

    async def complete(
        self,
        system: str,
        user_message: str,
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1024,
    ) -> str:
        """Send one chat completion request to Claude.

        Retries rate limits with exponential backoff and generic API errors
        with a 1s pause (re-raising the last APIError). Returns "" when all
        retries are exhausted.
        """
        import anthropic
        model = model or self.default_model

        for attempt in range(self.max_retries):
            try:
                # NOTE(review): this is the *sync* Anthropic client, so the
                # request itself blocks the event loop — consider
                # anthropic.AsyncAnthropic if true concurrency is needed.
                response = self.client.messages.create(
                    model=model,
                    max_tokens=max_tokens,
                    temperature=temperature,
                    system=system,
                    messages=[{"role": "user", "content": user_message}],
                )
                self.usage.record(
                    model=model,
                    input_tokens=response.usage.input_tokens,
                    output_tokens=response.usage.output_tokens,
                )
                return response.content[0].text

            except anthropic.RateLimitError:
                wait = 2 ** attempt
                self._rate_limited_until = time.monotonic() + wait
                logger.warning(f"Rate limited, waiting {wait}s (attempt {attempt + 1})")
                # BUG FIX: was time.sleep(wait), which blocked the entire
                # event loop inside this coroutine.
                await asyncio.sleep(wait)
            except anthropic.APIError as e:
                logger.error(f"API error: {e}")
                if attempt == self.max_retries - 1:
                    raise
                # BUG FIX: was time.sleep(1) — same event-loop blocking issue.
                await asyncio.sleep(1)
        # All retries exhausted on rate limits: back off status for a minute.
        self._rate_limited_until = time.monotonic() + 60
        return ""

    @property
    def llm_status(self) -> str:
        """"limited" while inside a rate-limit backoff window, else "active"."""
        if time.monotonic() < self._rate_limited_until:
            return "limited"
        return "active"

    async def complete_json(
        self,
        system: str,
        user_message: str,
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1024,
    ) -> dict:
        """Like complete(), but instructs the model to emit JSON and parses it."""
        json_instruction = (
            "\n\nRespond ONLY with valid JSON. No markdown, no explanation, no extra text. "
            "Just the JSON object."
        )
        text = await self.complete(
            system=system,
            user_message=user_message + json_instruction,
            model=model,
            temperature=temperature,
            max_tokens=max_tokens,
        )
        return _parse_json_response(text)
| |
|
| |
|
| | |
| | |
| | |
| |
|
class OllamaClient:
    """Wrapper around Ollama's local API for running open-source LLMs.

    Ollama serves models locally at http://localhost:11434.
    Install: https://ollama.com
    Pull a model: ollama pull llama3.1
    """

    def __init__(
        self,
        base_url: str = "http://localhost:11434",
        default_model: str = MODEL_OLLAMA_SOCI,
        max_retries: int = 2,
    ) -> None:
        self.base_url = base_url.rstrip("/")
        self.default_model = default_model
        self.max_retries = max_retries
        self.usage = LLMUsage()
        self.provider = PROVIDER_OLLAMA
        # Generous timeout: local models can be slow, especially on first load.
        self._http = httpx.AsyncClient(timeout=180.0)
        # Monotonic timestamp of the most recent failure; llm_status reports
        # "limited" for 30s afterwards.
        self._last_error: float = 0.0

    @property
    def llm_status(self) -> str:
        """"limited" for 30s after the most recent failure, else "active"."""
        if time.monotonic() - self._last_error < 30:
            return "limited"
        return "active"

    async def aclose(self) -> None:
        """Close the underlying HTTP client. Optional; not calling it keeps
        the previous behavior of relying on garbage collection."""
        await self._http.aclose()

    async def complete(
        self,
        system: str,
        user_message: str,
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1024,
    ) -> str:
        """Send a message to the local Ollama model (async).

        Raises ConnectionError when Ollama is unreachable after retries and
        ValueError when the model has not been pulled; returns "" otherwise
        on exhausted retries.
        """
        model = model or self.default_model
        model = self._map_model(model)

        payload = {
            "model": model,
            "messages": [
                {"role": "system", "content": system},
                {"role": "user", "content": user_message},
            ],
            "stream": False,
            "options": {
                "temperature": temperature,
                "num_predict": max_tokens,
            },
        }

        for attempt in range(self.max_retries):
            try:
                response = await self._http.post(
                    f"{self.base_url}/api/chat",
                    json=payload,
                )
                response.raise_for_status()
                data = response.json()

                # Ollama reports token counts as prompt_eval_count/eval_count.
                input_tokens = data.get("prompt_eval_count", 0)
                output_tokens = data.get("eval_count", 0)
                self.usage.record(model, input_tokens, output_tokens)

                return data.get("message", {}).get("content", "")

            except httpx.ConnectError:
                self._last_error = time.monotonic()
                msg = (
                    f"Cannot connect to Ollama at {self.base_url}. "
                    "Make sure Ollama is running: 'ollama serve'"
                )
                logger.error(msg)
                if attempt == self.max_retries - 1:
                    raise ConnectionError(msg)
                await asyncio.sleep(1)
            except httpx.HTTPStatusError as e:
                if e.response.status_code == 404:
                    # 404 from /api/chat means the model tag is not pulled.
                    msg = (
                        f"Model '{model}' not found in Ollama. "
                        f"Pull it first: 'ollama pull {model}'"
                    )
                    logger.error(msg)
                    raise ValueError(msg)
                logger.error(f"Ollama API error: {e}")
                if attempt == self.max_retries - 1:
                    raise
                await asyncio.sleep(1)
            except Exception as e:
                logger.error(f"Ollama error: {e}")
                if attempt == self.max_retries - 1:
                    raise
                await asyncio.sleep(1)
        return ""

    async def complete_json(
        self,
        system: str,
        user_message: str,
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1024,
    ) -> dict:
        """Send a JSON-mode request to Ollama (async, uses native format: json).

        Best-effort: returns {} on any failure instead of raising.
        """
        model = model or self.default_model
        model = self._map_model(model)

        json_instruction = (
            "\n\nRespond ONLY with valid JSON. No markdown, no explanation, no extra text. "
            "Just the JSON object."
        )

        payload = {
            "model": model,
            "messages": [
                {"role": "system", "content": system},
                {"role": "user", "content": user_message + json_instruction},
            ],
            "stream": False,
            "format": "json",
            "options": {
                "temperature": temperature,
                "num_predict": max_tokens,
            },
        }

        for attempt in range(self.max_retries):
            try:
                response = await self._http.post(
                    f"{self.base_url}/api/chat",
                    json=payload,
                )
                response.raise_for_status()
                data = response.json()

                input_tokens = data.get("prompt_eval_count", 0)
                output_tokens = data.get("eval_count", 0)
                self.usage.record(model, input_tokens, output_tokens)

                text = data.get("message", {}).get("content", "")
                return _parse_json_response(text)

            except httpx.ConnectError:
                # BUG FIX: this path previously never updated _last_error, so
                # llm_status stayed "active" even while Ollama was unreachable
                # (inconsistent with complete()).
                self._last_error = time.monotonic()
                logger.error(f"Cannot connect to Ollama at {self.base_url}")
                if attempt == self.max_retries - 1:
                    return {}
                await asyncio.sleep(1)
            except Exception as e:
                logger.error(f"Ollama JSON error: {e}")
                if attempt == self.max_retries - 1:
                    return {}
                await asyncio.sleep(1)
        return {}

    def _map_model(self, model: str) -> str:
        """Map Claude model names to Ollama equivalents so existing code works."""
        mapping = {
            MODEL_SONNET: self.default_model,
            MODEL_HAIKU: self.default_model,
        }
        return mapping.get(model, model)
| |
|
| |
|
| | |
| | |
| | |
| |
|
class GroqClient:
    """Wrapper around the Groq API for fast cloud inference.

    Groq provides extremely fast inference (~500 tok/s) with parallel request support.
    Free tier: 30 requests/min on llama-3.1-8b-instant.
    Sign up: https://console.groq.com
    """

    # Phrases in a 429 body that indicate the *daily* quota rather than the
    # per-minute throttle. Deliberately specific — see _handle_429.
    _DAILY_QUOTA_MARKERS = ("daily", "per day", "per-day", "tpd", "rpd")

    def __init__(
        self,
        api_key: Optional[str] = None,
        default_model: str = MODEL_GROQ_LLAMA_8B,
        max_retries: int = 3,
        max_rpm: int = 28,
    ) -> None:
        self.api_key = api_key or os.environ.get("GROQ_API_KEY", "")
        if not self.api_key:
            raise ValueError(
                "GROQ_API_KEY not set. Get a free key at https://console.groq.com"
            )
        self.default_model = default_model
        self.max_retries = max_retries
        self.usage = LLMUsage()
        self.provider = PROVIDER_GROQ
        # Human-readable description of the most recent failure ("" = healthy).
        self._last_error: str = ""
        self._http = httpx.AsyncClient(
            base_url="https://api.groq.com/openai/v1",
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            },
            timeout=60.0,
        )

        # Client-side pacing: default max_rpm=28 keeps us just under the
        # free tier's 30 requests/min.
        self._min_request_interval = 60.0 / max_rpm
        self._last_request_time: float = 0.0
        self._rate_lock = asyncio.Lock()

        # Circuit breaker: monotonic deadline after daily-quota exhaustion.
        self._rate_limited_until: float = 0.0

    def _is_quota_exhausted(self) -> bool:
        """Return True if we are inside a long-wait circuit-breaker window."""
        return time.monotonic() < self._rate_limited_until

    def _handle_429(self, retry_after_str: str, attempt: int, body: str = "") -> float:
        """Parse retry-after and update circuit breaker. Returns seconds to sleep.

        Short waits (≤120s, per-minute limit) → return the wait so caller retries.
        Long waits (>120s, daily quota) → arm the circuit breaker and return 0
        so the caller gives up immediately instead of blocking for minutes.
        """
        try:
            retry_after = float(retry_after_str)
        except (ValueError, TypeError):
            # Header missing/garbled: exponential backoff, at least 3s.
            retry_after = max(3.0, 2 ** attempt + 1)

        # BUG FIX: the old check also matched a bare "limit", but Groq's
        # per-minute 429 bodies say "Rate limit reached…", so *every* ordinary
        # throttle was misclassified as daily-quota exhaustion and the client
        # gave up instead of retrying. Only explicit per-day wording or a long
        # retry-after arms the circuit breaker now.
        body_lower = body.lower()
        is_daily_quota = retry_after > 120 or any(
            marker in body_lower for marker in self._DAILY_QUOTA_MARKERS
        )
        if is_daily_quota:
            self._rate_limited_until = time.monotonic() + retry_after
            logger.warning(
                f"Groq daily quota exhausted — skipping LLM calls for {retry_after:.0f}s "
                f"(until quota resets). Simulation continues without LLM."
            )
            return 0.0

        wait = min(retry_after, 60.0)
        logger.info(f"Groq per-minute rate limit — waiting {wait:.0f}s before retry")
        return wait

    async def _wait_for_rate_limit(self) -> None:
        """Wait if needed to stay under the RPM limit."""
        async with self._rate_lock:
            now = time.monotonic()
            elapsed = now - self._last_request_time
            if elapsed < self._min_request_interval:
                wait_time = self._min_request_interval - elapsed
                await asyncio.sleep(wait_time)
            self._last_request_time = time.monotonic()

    async def complete(
        self,
        system: str,
        user_message: str,
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1024,
    ) -> str:
        """Send a chat completion request to Groq (async, rate-limited).

        Returns "" when the quota circuit breaker is armed or retries are
        exhausted; raises ValueError on an invalid API key.
        """
        model = self._map_model(model or self.default_model)

        payload = {
            "model": model,
            "messages": [
                {"role": "system", "content": system},
                {"role": "user", "content": user_message},
            ],
            "temperature": temperature,
            "max_tokens": max_tokens,
        }

        if self._is_quota_exhausted():
            logger.debug("Groq quota circuit breaker active — skipping complete()")
            self._last_error = f"quota exhausted (resets in {(self._rate_limited_until - time.monotonic())/3600:.1f}h)"
            return ""

        for attempt in range(self.max_retries):
            try:
                await self._wait_for_rate_limit()
                response = await self._http.post("/chat/completions", json=payload)
                response.raise_for_status()
                data = response.json()

                usage = data.get("usage", {})
                self.usage.record(
                    model,
                    usage.get("prompt_tokens", 0),
                    usage.get("completion_tokens", 0),
                )

                self._last_error = ""
                return data["choices"][0]["message"]["content"]

            except httpx.HTTPStatusError as e:
                if e.response.status_code == 429:
                    body = e.response.text[:200] if e.response.text else ""
                    sleep_for = self._handle_429(
                        e.response.headers.get("retry-after", ""), attempt, body
                    )
                    if sleep_for == 0:
                        # Daily quota: give up immediately (circuit breaker armed).
                        self._last_error = f"429 daily quota exhausted: {body[:120]}"
                        return ""
                    await asyncio.sleep(sleep_for)
                elif e.response.status_code == 401:
                    raise ValueError("Invalid GROQ_API_KEY")
                else:
                    self._last_error = f"HTTP {e.response.status_code}: {e.response.text[:120]}"
                    logger.error(f"Groq API error: {e.response.status_code} {e.response.text[:200]}")
                    if attempt == self.max_retries - 1:
                        return ""
                    await asyncio.sleep(1)
            except Exception as e:
                self._last_error = str(e)[:120]
                logger.error(f"Groq error: {e}")
                if attempt == self.max_retries - 1:
                    return ""
                await asyncio.sleep(1)
        return ""

    async def complete_json(
        self,
        system: str,
        user_message: str,
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1024,
    ) -> dict:
        """Send a JSON-mode request to Groq. Best-effort: returns {} on failure."""
        model = self._map_model(model or self.default_model)

        json_instruction = (
            "\n\nRespond ONLY with valid JSON. No markdown, no explanation, no extra text. "
            "Just the JSON object."
        )

        payload = {
            "model": model,
            "messages": [
                {"role": "system", "content": system},
                {"role": "user", "content": user_message + json_instruction},
            ],
            "temperature": temperature,
            "max_tokens": max_tokens,
            "response_format": {"type": "json_object"},
        }

        if self._is_quota_exhausted():
            logger.debug("Groq quota circuit breaker active — skipping complete_json()")
            return {}

        for attempt in range(self.max_retries):
            try:
                await self._wait_for_rate_limit()
                response = await self._http.post("/chat/completions", json=payload)
                response.raise_for_status()
                data = response.json()

                usage = data.get("usage", {})
                self.usage.record(
                    model,
                    usage.get("prompt_tokens", 0),
                    usage.get("completion_tokens", 0),
                )

                text = data["choices"][0]["message"]["content"]
                return _parse_json_response(text)

            except httpx.HTTPStatusError as e:
                if e.response.status_code == 429:
                    body = e.response.text[:200] if e.response.text else ""
                    sleep_for = self._handle_429(
                        e.response.headers.get("retry-after", ""), attempt, body
                    )
                    if sleep_for == 0:
                        return {}
                    await asyncio.sleep(sleep_for)
                else:
                    logger.error(f"Groq JSON error: {e.response.status_code}")
                    if attempt == self.max_retries - 1:
                        return {}
                    await asyncio.sleep(1)
            except Exception as e:
                logger.error(f"Groq JSON error: {e}")
                if attempt == self.max_retries - 1:
                    return {}
                await asyncio.sleep(1)
        return {}

    def _map_model(self, model: str) -> str:
        """Map Claude/Ollama model names to Groq equivalents."""
        mapping = {
            MODEL_SONNET: self.default_model,
            MODEL_HAIKU: self.default_model,
            MODEL_LLAMA: MODEL_GROQ_LLAMA_8B,
        }
        return mapping.get(model, model)

    @property
    def llm_status(self) -> str:
        """"limited" while the quota circuit breaker is armed, else "active"."""
        return "limited" if self._is_quota_exhausted() else "active"
| |
|
| |
|
| | |
| | |
| | |
| |
|
class GeminiClient:
    """Google Gemini via the OpenAI-compatible AI Studio endpoint.

    Free tier (no credit card, as of 2026):
    - gemini-2.0-flash: 5 RPM, ~1,500 RPD, 250,000 TPM
    - Daily quota resets at midnight Pacific Time
    - Paid tier: $0.10/1M input tokens, $0.40/1M output tokens
    - Get a free key at https://aistudio.google.com/apikey
    Uses the OpenAI-compatible endpoint so no extra SDK is needed.

    Token/cost guide (typical Soci request ~1,000 input + 90 output tokens):
    - Cost per request (paid): ~$0.000133 ($0.10 input + $0.40 output per 1M)
    - 1,500 RPD free tier → $0.20/day on paid tier
    - 500 RPD usage → $0.07/day
    - Override daily limit via GEMINI_DAILY_LIMIT env var.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        default_model: str = MODEL_GEMINI_FLASH,
        max_retries: int = 3,
        max_rpm: int = 4,
        daily_limit: int = 1500,
    ) -> None:
        self.api_key = api_key or os.environ.get("GEMINI_API_KEY", "")
        if not self.api_key:
            raise ValueError(
                "GEMINI_API_KEY not set. "
                "Get a free key at https://aistudio.google.com/apikey"
            )
        self.default_model = default_model
        self.max_retries = max_retries
        self.usage = LLMUsage()
        self.provider = PROVIDER_GEMINI
        # Human-readable description of the most recent failure ("" = healthy).
        self._last_error: str = ""
        self._http = httpx.AsyncClient(
            base_url="https://generativelanguage.googleapis.com/v1beta/openai/",
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            },
            timeout=60.0,
        )
        # Client-side pacing: default max_rpm=4 stays under the 5 RPM free tier.
        self._min_request_interval = 60.0 / max_rpm
        self._last_request_time: float = 0.0
        self._rate_lock = asyncio.Lock()
        # Circuit breaker: monotonic deadline after daily-quota exhaustion.
        self._rate_limited_until: float = 0.0

        # Models this endpoint has rejected; _map_model routes around them
        # via _GEMINI_FALLBACK_CHAIN.
        self._unavailable_models: set[str] = set()

        # Local daily-request accounting (for warnings only; the server
        # enforces the real quota).
        self._daily_limit: int = int(os.environ.get("GEMINI_DAILY_LIMIT", str(daily_limit)))
        self._daily_requests: int = 0
        self._daily_date: str = ""  # Pacific-date string of the current counting window
        self._warned_thresholds: set = set()

    def _is_quota_exhausted(self) -> bool:
        # True while the daily-quota circuit breaker is armed.
        return time.monotonic() < self._rate_limited_until

    @staticmethod
    def _secs_until_pacific_midnight() -> float:
        """Seconds from now until the next midnight Pacific Time (UTC-8).

        Gemini free-tier quotas reset at midnight Pacific, so this is the
        correct circuit-breaker duration after daily quota exhaustion.

        NOTE(review): uses a fixed UTC-8 offset, so during daylight saving
        (PDT, UTC-7) the computed reset is off by one hour — confirm whether
        that imprecision is acceptable here.
        """
        import datetime as _dt
        pacific = _dt.timezone(_dt.timedelta(hours=-8))
        now = _dt.datetime.now(pacific)
        midnight = (now + _dt.timedelta(days=1)).replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        secs = (midnight - now).total_seconds()
        # Never return a tiny window right at the boundary.
        return max(secs, 60.0)

    def _track_daily_request(self) -> None:
        """Increment daily counter and log warnings at 50/70/90/99% of the daily limit."""
        import datetime as _dt

        # Roll the counter over at the Pacific date boundary (fixed UTC-8).
        pacific_offset = _dt.timezone(_dt.timedelta(hours=-8))
        today = _dt.datetime.now(pacific_offset).strftime("%Y-%m-%d")
        if today != self._daily_date:
            self._daily_date = today
            self._daily_requests = 0
            self._warned_thresholds = set()
        self._daily_requests += 1
        pct = self._daily_requests / self._daily_limit
        remaining = self._daily_limit - self._daily_requests
        # Each threshold warns at most once per Pacific day.
        for threshold in (0.50, 0.70, 0.90, 0.99):
            if pct >= threshold and threshold not in self._warned_thresholds:
                self._warned_thresholds.add(threshold)
                hrs = self._secs_until_pacific_midnight() / 3600
                logger.warning(
                    f"Gemini daily quota: {self._daily_requests}/{self._daily_limit} requests used "
                    f"({pct * 100:.0f}%) — {remaining} remaining, resets in {hrs:.1f}h (midnight Pacific)"
                )

    async def _wait_for_rate_limit(self) -> None:
        # Serialize request pacing so concurrent callers stay under max_rpm.
        async with self._rate_lock:
            now = time.monotonic()
            elapsed = now - self._last_request_time
            if elapsed < self._min_request_interval:
                await asyncio.sleep(self._min_request_interval - elapsed)
            self._last_request_time = time.monotonic()

    def _map_model(self, model: str) -> str:
        """Map Claude/Groq model names to Gemini equivalents."""
        mapping = {
            MODEL_SONNET: self.default_model,
            MODEL_HAIKU: self.default_model,
            MODEL_GROQ_LLAMA_8B: MODEL_GEMINI_FLASH,
        }
        mapped = mapping.get(model, model)
        # Walk the fallback chain past any models this endpoint has rejected.
        while mapped in self._unavailable_models:
            fallback = _GEMINI_FALLBACK_CHAIN.get(mapped)
            if fallback is None or fallback == mapped:
                break
            mapped = fallback
        return mapped

    def _handle_model_not_found(self, model: str) -> Optional[str]:
        """Mark model unavailable and return the fallback model ID, or None if no fallback."""
        self._unavailable_models.add(model)
        # Only rewrite the default when the default itself failed, so an
        # explicit per-call model choice doesn't change the client's default.
        if self.default_model == model:
            fallback = _GEMINI_FALLBACK_CHAIN.get(model)
            if fallback:
                self.default_model = fallback
                logger.warning(
                    f"Gemini model '{model}' not available on this endpoint — "
                    f"switching to '{fallback}' for all future calls"
                )
                return fallback
        return None

    @property
    def llm_status(self) -> str:
        # "limited" while the daily-quota circuit breaker is armed.
        return "limited" if self._is_quota_exhausted() else "active"

    async def complete(
        self,
        system: str,
        user_message: str,
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1024,
    ) -> str:
        """Send a chat completion request to Gemini."""
        # Fast-fail while the daily quota circuit breaker is armed.
        if self._is_quota_exhausted():
            self._last_error = f"quota exhausted (resets in {self._secs_until_pacific_midnight()/3600:.1f}h)"
            logger.debug("Gemini quota circuit breaker active — skipping complete()")
            return ""

        model = self._map_model(model or self.default_model)
        payload = {
            "model": model,
            "messages": [
                {"role": "system", "content": system},
                {"role": "user", "content": user_message},
            ],
            "temperature": temperature,
            "max_tokens": max_tokens,
        }

        for attempt in range(self.max_retries):
            try:
                await self._wait_for_rate_limit()
                resp = await self._http.post("chat/completions", json=payload)
                resp.raise_for_status()
                data = resp.json()
                usage = data.get("usage", {})
                self.usage.record(model, usage.get("prompt_tokens", 0), usage.get("completion_tokens", 0))
                self._track_daily_request()
                self._last_error = ""
                return data["choices"][0]["message"]["content"]
            except httpx.HTTPStatusError as e:
                status = e.response.status_code
                body_raw = e.response.text or ""
                # Braces are swapped for parens so the snippet is safe inside
                # later str.format/f-string templating of log lines.
                body = body_raw[:200].replace("{", "(").replace("}", ")")
                if status == 429:
                    retry_after = e.response.headers.get("retry-after", "5")
                    try:
                        wait = float(retry_after)
                    except (ValueError, TypeError):
                        wait = 5.0
                    # Distinguish per-day quota (arm circuit breaker, give up)
                    # from the per-minute throttle (short wait, retry).
                    body_lower = body_raw.lower()
                    is_daily = "per-day" in body_lower or "per day" in body_lower or "daily" in body_lower or wait > 120
                    if is_daily:
                        circuit_wait = self._secs_until_pacific_midnight()
                        self._rate_limited_until = time.monotonic() + circuit_wait
                        self._last_error = f"daily quota exhausted — resets in {circuit_wait/3600:.1f}h"
                        logger.warning(f"Gemini daily quota exhausted — circuit-breaking for {circuit_wait/3600:.1f}h (until midnight Pacific): {body}")
                        return ""
                    # Per-minute throttle: cap the wait and retry.
                    wait = min(wait, 30.0)
                    logger.info(f"Gemini per-minute rate limit — waiting {wait:.0f}s before retry")
                    await asyncio.sleep(wait)
                elif any(kw in body_raw.lower() for kw in _GEMINI_MODEL_UNAVAILABLE_KWS):
                    # Model rejected by this endpoint: try the fallback chain.
                    self._last_error = f"model unavailable ({status}): {body[:100]}"
                    fallback = self._handle_model_not_found(model)
                    if fallback:
                        model = fallback
                        payload["model"] = model
                        continue
                    logger.error(f"Gemini model '{model}' not found and no fallback: {body}")
                    return ""
                else:
                    self._last_error = f"HTTP {status}: {body[:120]}"
                    logger.error(f"Gemini HTTP error: {status} {body}")
                    if attempt == self.max_retries - 1:
                        return ""
                    await asyncio.sleep(1)
            except Exception as e:
                self._last_error = str(e)[:120]
                logger.error(f"Gemini error: {e}")
                if attempt == self.max_retries - 1:
                    return ""
                await asyncio.sleep(1)
        return ""

    async def complete_json(
        self,
        system: str,
        user_message: str,
        model: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1024,
    ) -> dict:
        """Send a JSON-mode request to Gemini."""
        # Fast-fail while the daily quota circuit breaker is armed.
        if self._is_quota_exhausted():
            self._last_error = f"quota exhausted (resets in {self._secs_until_pacific_midnight()/3600:.1f}h)"
            logger.debug("Gemini quota circuit breaker active — skipping complete_json()")
            return {}

        model = self._map_model(model or self.default_model)
        json_instruction = (
            "\n\nRespond ONLY with valid JSON. No markdown, no explanation, no extra text. "
            "Just the JSON object."
        )
        payload = {
            "model": model,
            "messages": [
                {"role": "system", "content": system},
                {"role": "user", "content": user_message + json_instruction},
            ],
            "temperature": temperature,
            "max_tokens": max_tokens,
            "response_format": {"type": "json_object"},
        }

        for attempt in range(self.max_retries):
            try:
                await self._wait_for_rate_limit()
                resp = await self._http.post("chat/completions", json=payload)
                resp.raise_for_status()
                data = resp.json()
                usage = data.get("usage", {})
                self.usage.record(model, usage.get("prompt_tokens", 0), usage.get("completion_tokens", 0))
                self._track_daily_request()
                text = data["choices"][0]["message"]["content"]
                return _parse_json_response(text)
            except httpx.HTTPStatusError as e:
                status = e.response.status_code
                body_raw = e.response.text or ""
                body = body_raw[:200].replace("{", "(").replace("}", ")")
                if status == 429:
                    retry_after = e.response.headers.get("retry-after", "5")
                    try:
                        wait = float(retry_after)
                    except (ValueError, TypeError):
                        wait = 5.0
                    body_lower = body_raw.lower()
                    is_daily = "per-day" in body_lower or "per day" in body_lower or "daily" in body_lower or wait > 120
                    if is_daily:
                        circuit_wait = self._secs_until_pacific_midnight()
                        self._rate_limited_until = time.monotonic() + circuit_wait
                        logger.warning(f"Gemini daily quota exhausted — circuit-breaking for {circuit_wait/3600:.1f}h: {body}")
                        return {}
                    wait = min(wait, 30.0)
                    logger.info(f"Gemini per-minute rate limit — waiting {wait:.0f}s before retry")
                    await asyncio.sleep(wait)
                elif any(kw in body_raw.lower() for kw in _GEMINI_MODEL_UNAVAILABLE_KWS):
                    # Model rejected by this endpoint: try the fallback chain.
                    fallback = self._handle_model_not_found(model)
                    if fallback:
                        model = fallback
                        payload["model"] = model
                        continue
                    logger.error(f"Gemini model '{model}' not found and no fallback: {body}")
                    return {}
                else:
                    logger.error(f"Gemini JSON error: {status} {body}")
                    if attempt == self.max_retries - 1:
                        return {}
                    await asyncio.sleep(1)
            except Exception as e:
                logger.error(f"Gemini JSON error: {e}")
                if attempt == self.max_retries - 1:
                    return {}
                await asyncio.sleep(1)
        return {}
| |
|
| |
|
| |
|
| |
|
| | |
| | |
| | |
| |
|
def create_llm_client(
    provider: Optional[str] = None,
    model: Optional[str] = None,
    ollama_url: str = "http://localhost:11434",
):
    """Create an LLM client based on environment or explicit config.

    Provider selection (as implemented below):
        1. Explicit ``provider`` argument.
        2. ``LLM_PROVIDER`` env var.
        3. Default → NN (local ONNX model, zero cost).

    NOTE(review): an earlier docstring described auto-detection from
    ANTHROPIC_API_KEY / GROQ_API_KEY / GEMINI_API_KEY with an Ollama
    fallback, but the code never checks those keys — it always defaults
    to NN. Confirm which behavior is intended.

    ``model`` overrides the provider's default; otherwise per-provider env
    vars (GROQ_MODEL, GEMINI_MODEL, OLLAMA_MODEL) supply the default.

    Raises:
        ValueError: for an unrecognized provider string.
    """
    if provider is None:
        provider = os.environ.get("LLM_PROVIDER", "").lower()

    if not provider:
        # No explicit choice anywhere: use the free local NN model.
        provider = PROVIDER_NN

    if provider == PROVIDER_NN:
        # Imported lazily to avoid pulling ONNX dependencies unless needed.
        from soci.engine.nn_client import NNClient
        return NNClient()
    elif provider == PROVIDER_CLAUDE:
        default_model = model or MODEL_HAIKU
        return ClaudeClient(default_model=default_model)
    elif provider == PROVIDER_GROQ:
        default_model = model or os.environ.get("GROQ_MODEL", MODEL_GROQ_LLAMA_8B)
        return GroqClient(default_model=default_model)
    elif provider == PROVIDER_GEMINI:
        default_model = model or os.environ.get("GEMINI_MODEL", MODEL_GEMINI_FLASH)
        return GeminiClient(default_model=default_model)
    elif provider == PROVIDER_OLLAMA:
        default_model = model or os.environ.get("OLLAMA_MODEL", MODEL_OLLAMA_SOCI)
        return OllamaClient(base_url=ollama_url, default_model=default_model)
    else:
        raise ValueError(f"Unknown LLM provider: {provider}. Use 'nn', 'claude', 'groq', 'gemini', or 'ollama'.")
| |
|
| |
|
| | |
| |
|
# Morning planning prompt: asks an agent to lay out its day as a short JSON plan.
# str.format placeholders: {time_str}, {day}, {context}. The doubled {{ }}
# braces are escapes so the JSON skeleton survives .format() as literal braces.
PLAN_DAY_PROMPT = """\
It is {time_str} on Day {day}. You just woke up.

{context}

Based on your personality, needs, and memories, plan your day. What will you do today?
Think about your obligations (work, responsibilities) and your desires (socializing, fun, rest).

Respond with a JSON object:
{{
    "plan": ["item 1", "item 2", ...],
    "reasoning": "brief explanation of why this plan"
}}

Keep the plan to 5-8 items. Be specific about locations and times.
"""
| |
|
# Per-tick action-selection prompt: the agent picks its next action from a
# closed verb set and returns it as JSON.
# str.format placeholders: {time_str}, {day}, {context}, {location_name},
# {last_activity}, {connected_locations}, {people_here}. Doubled {{ }} braces
# are literal braces in the rendered output.
DECIDE_ACTION_PROMPT = """\
It is {time_str} on Day {day}.

{context}

You are currently at {location_name}. You just finished: {last_activity}.

What do you do next? Consider your needs, your plan, who's around, and any events happening.

Respond with a JSON object:
{{
    "action": "move|work|eat|sleep|talk|exercise|shop|relax|wander",
    "target": "location_id or agent_id (if talking) or empty string",
    "detail": "what specifically you're doing, in first person",
    "duration": 1-4,
    "reasoning": "brief internal thought about why"
}}

Available locations you can move to: {connected_locations}
People at your current location: {people_here}
"""
| |
|
# Observation-scoring prompt: the agent rates how important a noticed event is
# (1-10) and reacts to it, returned as JSON.
# str.format placeholders: {time_str}, {day}, {context}, {observation}.
# Doubled {{ }} braces render as literal braces.
OBSERVE_PROMPT = """\
It is {time_str} on Day {day}.

{context}

You just noticed: {observation}

How important is this to you (1-10)? What do you think about it?

Respond with a JSON object:
{{
    "importance": 1-10,
    "reaction": "your brief internal thought or feeling about this"
}}
"""
| |
|
# Reflection prompt: the agent distills recent memories into 1-3 insights,
# an optional mood shift, and optional life_event / goal_update records.
# str.format placeholders: {time_str}, {day}, {context}, {recent_memories}.
# Doubled {{ }} braces render as literal braces in the JSON examples.
REFLECT_PROMPT = """\
It is {time_str} on Day {day}.

{context}

RECENT EXPERIENCES:
{recent_memories}

Take a moment to reflect on your recent experiences. What patterns do you notice?
What have you learned? How do you feel about things?

Respond with a JSON object:
{{
    "reflections": ["reflection 1", "reflection 2", ...],
    "mood_shift": -0.3 to 0.3,
    "reasoning": "why your mood shifted this way",
    "life_event": null,
    "goal_update": null
}}

Generate 1-3 reflections. Each should be a genuine insight, not just a summary.

If something truly significant happened recently (a promotion, finishing a project, personal milestone,
making a close friend, learning something important), set life_event to:
{{"type": "promotion|graduated|achievement|milestone|new_job|moved|breakup|friendship", "description": "what happened"}}
Most reflections should have life_event as null — only include when genuinely noteworthy.

If you want to set a new goal or update progress on an existing one, set goal_update to:
{{"action": "add|complete|progress", "description": "goal text", "goal_id": null}}
For "complete" or "progress", include the goal_id number. For "add", include description only.
"""
| |
|
# Conversation-reply prompt: the agent answers another agent's message and
# reports small relationship adjustments (sentiment/trust deltas) as JSON.
# str.format placeholders: {time_str}, {day}, {context}, {location_name},
# {other_name}, {relationship_context}, {conversation_history},
# {other_message}. Doubled {{ }} braces render as literal braces.
CONVERSATION_PROMPT = """\
It is {time_str} on Day {day}.

{context}

You are at {location_name}. {other_name} is here too.

WHAT YOU KNOW ABOUT {other_name}:
{relationship_context}

{conversation_history}

{other_name} says: "{other_message}"

How do you respond? Stay in character. Be natural — not every conversation is deep.
Sometimes people make small talk, sometimes they argue, sometimes they're awkward.

Respond with a JSON object:
{{
    "message": "your spoken response",
    "inner_thought": "what you're actually thinking",
    "sentiment_delta": -0.1 to 0.1,
    "trust_delta": -0.05 to 0.05
}}
"""
| |
|
# Conversation-opener prompt: the agent composes the first message to another
# agent at the same location, plus a topic label, as JSON.
# str.format placeholders: {time_str}, {day}, {context}, {location_name},
# {other_name}, {relationship_context}. Doubled {{ }} braces render as
# literal braces.
CONVERSATION_INITIATE_PROMPT = """\
It is {time_str} on Day {day}.

{context}

You are at {location_name}. {other_name} is here.

WHAT YOU KNOW ABOUT {other_name}:
{relationship_context}

You decide to start a conversation with {other_name}. What do you say?
Consider the time of day, location, your mood, and your history with them.

Respond with a JSON object:
{{
    "message": "what you say to start the conversation",
    "inner_thought": "why you're initiating this conversation",
    "topic": "brief topic label"
}}
"""
| |
|