from collections import defaultdict from datetime import datetime, timedelta, timezone from dateutil import parser from src.common.logger import logger from src.db.supabase_client import get_license, supabase groq_keys_cache = [] last_key_fetch = None CACHE_TTL = 60 key_usage_tracker = defaultdict(lambda: {"count": 0, "reset_at": None}) # Per-user rate limit USER_RATE_LIMIT = 20 user_request_tracker = {} def get_groq_keys(): global groq_keys_cache, last_key_fetch now = datetime.now(timezone.utc) # Reset per-key usage for key in list(key_usage_tracker.keys()): if ( key_usage_tracker[key]["reset_at"] and now >= key_usage_tracker[key]["reset_at"] ): key_usage_tracker[key] = {"count": 0, "reset_at": None} # Use cached keys if groq_keys_cache and last_key_fetch: if (now - last_key_fetch).total_seconds() < CACHE_TTL: return sorted(groq_keys_cache, key=lambda k: key_usage_tracker[k]["count"]) # Reload keys try: result = ( supabase.table("groq_api_keys") .select("api_key") .eq("is_active", True) .order("sort_order") .execute() ) groq_keys_cache = [row["api_key"] for row in result.data] last_key_fetch = now logger.info(f"Loaded {len(groq_keys_cache)} Groq API keys from database") return sorted(groq_keys_cache, key=lambda k: key_usage_tracker[k]["count"]) except Exception as e: logger.error(f"Error fetching Groq keys: {str(e)}") return [] def track_key_usage(api_key: str): now = datetime.now(timezone.utc) if key_usage_tracker[api_key]["reset_at"] is None: key_usage_tracker[api_key]["reset_at"] = now + timedelta(minutes=1) key_usage_tracker[api_key]["count"] += 1 def check_user_rate_limit(email: str) -> bool: now = datetime.now(timezone.utc) if email not in user_request_tracker: user_request_tracker[email] = { "count": 0, "reset_at": now + timedelta(minutes=1), } return True tracker = user_request_tracker[email] if now >= tracker["reset_at"]: tracker["count"] = 0 tracker["reset_at"] = now + timedelta(minutes=1) if tracker["count"] >= USER_RATE_LIMIT: return False tracker["count"] += 1 return True def verify_user_license(email: str, license_key: str): lic = get_license(license_key) if not lic: return None, "LICENSE_NOT_FOUND" if lic["user_id"] != email: return None, "EMAIL_NOT_MATCH" if lic["status"] != "active": return None, "LICENSE_INACTIVE" if lic["expires_at"]: exp = parser.isoparse(lic["expires_at"]) if exp < datetime.now(timezone.utc): return None, "LICENSE_EXPIRED" return lic, None