"""Global daily usage cap for AI summaries.

The counter is kept in Redis when ``settings.redis_url`` is set; otherwise
(or when the Redis call fails) a process-local, lock-protected counter is used.
"""

from __future__ import annotations

import functools
import threading
from datetime import datetime, timezone
from typing import Any, Dict, Tuple

from .config import settings

# Day-scoped Redis keys are kept for 48 hours: long enough to outlive the UTC
# day they count, short enough that stale keys do not pile up.
_REDIS_KEY_TTL_SECONDS = 60 * 60 * 48

# Process-local fallback counter; every access must hold ``_lock``.
_lock = threading.Lock()
_state: Dict[str, Any] = {"date": None, "count": 0}


def _today_key() -> str:
    """Return the current UTC date formatted as ``YYYYMMDD``."""
    return datetime.now(timezone.utc).strftime("%Y%m%d")


def _usage_info(limit: int, count: int) -> Dict[str, Any]:
    """Build the usage dict (``limit``, ``count``, ``remaining``) for callers."""
    return {
        "limit": limit,
        "count": count,
        # Clamp at zero so over-limit calls never report a negative remainder.
        "remaining": max(0, limit - count),
    }


def _in_memory_increment(limit: int) -> Tuple[bool, Dict[str, Any]]:
    """Increment the process-local daily counter.

    The counter is reset the first time it is touched on a new UTC date.

    Args:
        limit: Maximum number of allowed increments per day.

    Returns:
        ``(allowed, info)`` where ``allowed`` is True while the new count is
        within ``limit`` and ``info`` holds ``limit``, ``count``, ``remaining``.
    """
    with _lock:
        today = _today_key()
        if _state["date"] != today:
            # First call of a new UTC day: start counting from zero again.
            _state["date"] = today
            _state["count"] = 0
        _state["count"] += 1
        count = int(_state["count"])
    return count <= limit, _usage_info(limit, count)


@functools.lru_cache(maxsize=1)
def _get_redis_client(url: str) -> Any:
    """Return a Redis client for ``url``, reusing it across calls.

    Caching avoids creating a new client (and connection pool) on every
    request. A failed import or construction raises and is not cached.
    """
    import redis  # local import to keep the dependency optional

    return redis.from_url(url, decode_responses=True)


def _redis_increment(limit: int) -> Tuple[bool, Dict[str, Any]]:
    """Increment today's counter in Redis.

    Args:
        limit: Maximum number of allowed increments per day.

    Returns:
        ``(allowed, info)`` with the same shape as ``_in_memory_increment``.

    Raises:
        Exception: Whatever the Redis client raises (import, connection or
            command errors); the caller decides how to fall back.
    """
    client = _get_redis_client(settings.redis_url)
    key = f"toxrai:ai_cap:{_today_key()}"
    # Send INCR and EXPIRE in one MULTI/EXEC transaction so a key can never be
    # created without a TTL. Refreshing the TTL on every call is harmless
    # because the key name is date-scoped and stops being written after its day.
    with client.pipeline(transaction=True) as pipe:
        pipe.incr(key)
        pipe.expire(key, _REDIS_KEY_TTL_SECONDS)
        raw_count, _ = pipe.execute()
    count = int(raw_count)
    return count <= limit, _usage_info(limit, count)


def check_and_increment_global_ai_cap() -> Tuple[bool, Dict[str, Any]]:
    """Global daily cap to prevent overuse in public demos.

    Reads the limit from ``settings.max_ai_summaries_per_day``. A missing or
    non-positive limit disables the cap.

    Returns:
        ``(allowed, info)``. ``info`` always contains ``limit``, ``count`` and
        ``remaining`` (``remaining`` is None when the cap is disabled), plus
        ``redis_error`` when Redis was configured but the call failed.
    """
    limit = int(settings.max_ai_summaries_per_day or 0)
    if limit <= 0:
        return True, {"limit": limit, "count": 0, "remaining": None}

    if settings.redis_url:
        try:
            return _redis_increment(limit)
        except Exception as e:  # deliberate best-effort: never block on Redis
            # Fall back to in-memory, but report the error for debugging.
            allowed, info = _in_memory_increment(limit)
            info["redis_error"] = str(e)
            return allowed, info

    return _in_memory_increment(limit)