|
|
from __future__ import annotations |
|
|
|
|
|
import threading |
|
|
from datetime import datetime, timezone |
|
|
from typing import Dict, Tuple, Any |
|
|
|
|
|
from .config import settings |
|
|
|
|
|
# Guards every read-modify-write of ``_state`` so concurrent threads in this
# process cannot lose an increment.
_lock = threading.Lock()

# Process-local fallback counter: ``date`` holds the day key the count belongs
# to (``None`` until the first increment) and ``count`` the number of
# increments recorded for that day.
_state: Dict[str, Any] = dict(date=None, count=0)
|
|
|
|
|
|
|
|
def _today_key() -> str:
    """Return the current UTC calendar date formatted as ``YYYYMMDD``."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.strftime("%Y%m%d")
|
|
|
|
|
|
|
|
def _in_memory_increment(limit: int) -> Tuple[bool, Dict[str, Any]]:
    """Increment the process-local daily counter and report cap status.

    Args:
        limit: Maximum number of increments allowed per UTC day.

    Returns:
        A ``(allowed, info)`` tuple. ``allowed`` is ``True`` while the
        post-increment count does not exceed ``limit``. ``info`` carries
        ``limit``, ``count`` (the value after this increment) and
        ``remaining`` (clamped so it is never negative).
    """
    with _lock:
        current_day = _today_key()
        # A different day key means the stored count belongs to an earlier
        # day, so start again from zero before counting this call.
        if _state["date"] != current_day:
            _state.update(date=current_day, count=0)
        _state["count"] += 1
        used = int(_state["count"])

    report: Dict[str, Any] = {
        "limit": limit,
        "count": used,
        "remaining": max(0, limit - used),
    }
    return used <= limit, report
|
|
|
|
|
|
|
|
def _redis_increment(limit: int) -> Tuple[bool, Dict[str, Any]]:
    """Increment today's counter in Redis and report cap status.

    The counter is stored under a key that embeds the current UTC day, so
    each day gets its own counter.

    Args:
        limit: Maximum number of increments allowed per UTC day.

    Returns:
        A ``(allowed, info)`` tuple. ``allowed`` is ``True`` while the
        post-increment count does not exceed ``limit``. ``info`` carries
        ``limit``, ``count`` (the value after this increment) and
        ``remaining`` (clamped so it is never negative).

    Raises:
        Exception: Nothing is caught here; a failed ``redis`` import or any
            connection/command error propagates to the caller.
    """
    # Imported inside the function so that importing this module does not
    # itself require the ``redis`` package.
    import redis

    client = redis.from_url(settings.redis_url, decode_responses=True)
    key = f"toxrai:ai_cap:{_today_key()}"
    ttl_seconds = 60 * 60 * 48

    # INCR and EXPIRE are sent together in one MULTI/EXEC transaction. Issuing
    # EXPIRE as a separate call only when the count is 1 could leave the key
    # without a TTL forever if the process died between the two commands.
    # Re-applying the TTL on every increment is harmless because the key is
    # date-scoped and stops receiving increments once the day rolls over.
    with client.pipeline(transaction=True) as pipe:
        pipe.incr(key)
        pipe.expire(key, ttl_seconds)
        results = pipe.execute()
    count = int(results[0])

    info: Dict[str, Any] = {
        "limit": limit,
        "count": count,
        "remaining": max(0, limit - count),
    }
    return count <= limit, info
|
|
|
|
|
|
|
|
def check_and_increment_global_ai_cap() -> Tuple[bool, Dict[str, Any]]:
    """Count one use against the global daily cap and say whether it is allowed.

    The cap is read from ``settings.max_ai_summaries_per_day``; a missing,
    zero or negative value disables the cap. When ``settings.redis_url`` is
    set the Redis counter is used, and any error from it falls back to the
    in-memory counter.

    Returns:
        A ``(allowed, info)`` tuple. ``info`` contains ``limit``, ``count``
        and ``remaining`` (``None`` when the cap is disabled), plus
        ``redis_error`` with the error text when the Redis path failed and
        the in-memory counter was used instead.
    """
    daily_cap = int(settings.max_ai_summaries_per_day or 0)
    if daily_cap <= 0:
        # Cap disabled: always allow and do not touch any counter.
        return True, {"limit": daily_cap, "count": 0, "remaining": None}

    if not settings.redis_url:
        return _in_memory_increment(daily_cap)

    try:
        return _redis_increment(daily_cap)
    except Exception as exc:
        # Best-effort: a Redis failure must not block the request, so count
        # locally and surface the error text to the caller.
        permitted, details = _in_memory_increment(daily_cap)
        details["redis_error"] = str(exc)
        return permitted, details
|
|
|