Spaces:
Running
Running
| from datetime import datetime, timedelta, UTC | |
| from enum import StrEnum | |
| from supabase import create_client | |
| import os | |
def _client():
    """Build a Supabase client from environment configuration.

    Reads ``SUPABASE_URL`` and ``SUPABASE_KEY`` from ``os.environ`` and passes
    them to ``create_client``. A new client is constructed on every call.

    Raises:
        KeyError: if either environment variable is not set.
    """
    env = os.environ
    url = env["SUPABASE_URL"]
    key = env["SUPABASE_KEY"]
    return create_client(url, key)
| class Tier(StrEnum): | |
| FREE = "free" | |
| PRO = "pro" | |
| SCHOLAR = "scholar" | |
# Time-to-live per tier; get_expiry() adds this to the current UTC time.
TTL: dict[Tier, timedelta] = {
    Tier.FREE: timedelta(hours=3),
    Tier.PRO: timedelta(weeks=1),
    Tier.SCHOLAR: timedelta(days=30),
}

# Largest single upload allowed per tier, in megabytes.
FILE_LIMIT_MB: dict[Tier, int] = {
    Tier.FREE: 5,
    Tier.PRO: 25,
    Tier.SCHOLAR: 50,
}

# Maximum number of stored documents per tier (None = no cap).
DOC_LIMIT: dict[Tier, int | None] = {
    Tier.FREE: 1,
    Tier.PRO: 10,
    Tier.SCHOLAR: None,
}

# General message limits: counted per session for FREE, per UTC day for paid
# tiers (None = no cap).
MSG_LIMIT: dict[Tier, int | None] = {
    Tier.FREE: 5,
    Tier.PRO: 200,
    Tier.SCHOLAR: None,
}

# DeepMind daily message limits, split by submode:
#   "pro"   submode = Groq-powered (fast, context-limited)
#   "super" submode = Cerebras-powered (ultra-fast, full-context)
DEEPMIND_PRO_LIMIT: dict[Tier, int | None] = {
    Tier.FREE: 5,       # Pro messages/day
    Tier.PRO: 100,      # Pro messages/day
    Tier.SCHOLAR: 300,  # Pro messages/day
}
DEEPMIND_SUPER_LIMIT: dict[Tier, int | None] = {
    Tier.FREE: 2,       # Super messages/day (Free users)
    Tier.PRO: 50,       # Super messages/day
    Tier.SCHOLAR: 200,  # Super messages/day
}
def get_user_tier(user_id: str) -> Tier:
    """Return the user's tier, falling back to FREE when it cannot be determined.

    Looks up the ``tier`` column of the ``profiles`` row whose ``id`` equals
    ``user_id``. When the lookup succeeds but finds no row, a default
    ``{"id": user_id, "tier": "free"}`` row is inserted on a best-effort basis
    so later calls find it.

    This function never raises: any failure is logged and FREE is returned.

    Args:
        user_id: Value matched against ``profiles.id``.

    Returns:
        The stored tier, or ``Tier.FREE`` if the row is missing, the lookup
        failed, or the stored value is empty / not a known tier.
    """
    # Function-scope import: the module does not import logging at file level.
    import logging

    log = logging.getLogger(__name__)

    try:
        client = _client()
        rows = (
            client.table("profiles")
            .select("tier")
            .eq("id", user_id)
            .limit(1)
            .execute()
            .data
        ) or []
    except Exception:
        # The read itself failed (network, auth, missing table, ...). We do not
        # know whether a row exists, so we must NOT write a "free" row here:
        # doing so could overwrite an existing paid tier.
        log.warning("Could not read tier for user %s; defaulting to free", user_id, exc_info=True)
        return Tier.FREE

    if not rows:
        # Confirmed: no profile row yet. Create the default one, best effort.
        # ignore_duplicates=True keeps a row created concurrently by someone
        # else (possibly with a paid tier) from being overwritten.
        try:
            client.table("profiles").upsert(
                {"id": user_id, "tier": "free"},
                ignore_duplicates=True,
            ).execute()
        except Exception:
            log.warning("Could not create default profile for user %s", user_id, exc_info=True)
        return Tier.FREE

    stored = rows[0].get("tier") or Tier.FREE.value
    try:
        return Tier(stored)
    except ValueError:
        # Unknown tier string in the database: fall back for this call only and
        # leave the stored value untouched.
        log.warning("Unknown tier %r for user %s; treating as free", stored, user_id)
        return Tier.FREE
def get_expiry(tier: Tier) -> datetime:
    """Return the timezone-aware UTC instant that lies ``TTL[tier]`` from now."""
    now = datetime.now(UTC)
    lifetime = TTL[tier]
    return now + lifetime
def can_upload(user_id: str, file_bytes: int) -> tuple[bool, str]:
    """Decide whether the user may upload a file of the given size.

    Two checks are applied in order, based on the user's tier:
      1. the file size against ``FILE_LIMIT_MB``;
      2. the number of rows in ``documents`` for this user against
         ``DOC_LIMIT`` (skipped when the tier's limit is ``None``).

    Args:
        user_id: Owner whose tier and document count are checked.
        file_bytes: Size of the candidate upload, in bytes.

    Returns:
        ``(True, "ok")`` when allowed, otherwise ``(False, reason)`` with a
        user-facing explanation.
    """
    tier = get_user_tier(user_id)

    limit_mb = FILE_LIMIT_MB[tier]
    if file_bytes > limit_mb * 1024 * 1024:
        return False, f"File exceeds {limit_mb}MB limit on {tier} plan."

    max_docs = DOC_LIMIT[tier]
    if max_docs is None:
        # Unlimited documents on this tier: no need to query the count.
        return True, "ok"

    # `.count` can come back as None; treat that as zero instead of letting
    # the comparison below raise TypeError (same guard as check_deepmind_limit).
    stored = (
        _client()
        .table("documents")
        .select("id", count="exact")
        .eq("user_id", user_id)
        .execute()
        .count
    ) or 0

    if stored >= max_docs:
        return False, f"{tier.capitalize()} allows {max_docs} doc(s). Upgrade to store more."
    return True, "ok"
def check_message_limit(user_id: str, session_id: str) -> tuple[bool, str]:
    """Check the general message quota for the user's tier.

    The quota comes from ``MSG_LIMIT``; ``None`` means unlimited. Only
    ``chat_history`` rows with ``role == "user"`` are counted:
      * FREE tier: rows belonging to ``session_id`` (per-session quota);
      * paid tiers: rows for ``user_id`` created since the start of the
        current UTC day (per-day quota).

    Args:
        user_id: User whose tier (and, for paid tiers, usage) is checked.
        session_id: Chat session used for the FREE-tier count.

    Returns:
        ``(True, "ok")`` while under the limit, otherwise ``(False, reason)``.
    """
    tier = get_user_tier(user_id)
    limit = MSG_LIMIT[tier]
    if limit is None:
        return True, "ok"

    query = _client().table("chat_history").select("id", count="exact")
    if tier == Tier.FREE:
        # FREE: count per session
        query = query.eq("session_id", session_id)
    else:
        # Paid: count per day (UTC)
        today = datetime.now(UTC).date().isoformat()
        query = query.eq("user_id", user_id).gte("created_at", today)

    # `.count` can come back as None; treat that as zero instead of letting
    # the comparison below raise TypeError (same guard as check_deepmind_limit).
    used = query.eq("role", "user").execute().count or 0

    if used >= limit:
        return False, f"Message limit reached on {tier} plan. Upgrade to continue."
    return True, "ok"
def check_deepmind_limit(user_id: str, submode: str = "pro") -> tuple[bool, str]:
    """Check the user's DeepMind daily limit for the given submode.

    Submode 'pro'   -> Groq-powered     - limits: Free=5, Pro=100, Scholar=300
    Submode 'super' -> Cerebras-powered - limits: Free=2, Pro=50,  Scholar=200

    All counts are per UTC calendar day and include only ``chat_history`` rows
    with ``role == "user"`` and ``is_deepmind`` set.

    Any ``submode`` other than ``"super"`` is treated as ``"pro"`` for the
    limit lookup, the usage count and the message label alike. Counting under
    the raw value would always find zero rows for an unrecognised submode and
    so let it bypass the limit.

    Args:
        user_id: User whose tier and usage are checked.
        submode: ``"pro"`` (default) or ``"super"``.

    Returns:
        ``(True, "ok")`` while under the limit, otherwise ``(False, reason)``.
    """
    is_super = submode == "super"
    mode = "super" if is_super else "pro"

    tier = get_user_tier(user_id)
    limit = (DEEPMIND_SUPER_LIMIT if is_super else DEEPMIND_PRO_LIMIT)[tier]
    if limit is None:
        return True, "ok"

    today = datetime.now(UTC).date().isoformat()
    used = (
        _client()
        .table("chat_history")
        .select("id", count="exact")
        .eq("user_id", user_id)
        .eq("role", "user")
        .eq("is_deepmind", True)
        .eq("deepmind_submode", mode)
        .gte("created_at", today)
        .execute()
        .count
    ) or 0  # `.count` may be None; treat as zero

    if used < limit:
        return True, "ok"

    tier_label = tier.capitalize()
    mode_label = "Super" if is_super else "Pro"
    return False, (
        f"DeepMind {mode_label} daily limit reached ({limit} messages/day on {tier_label} plan). "
        "Resets at midnight UTC."
    )
def get_deepmind_usage(user_id: str) -> dict:
    """Return per-submode DeepMind usage stats for the current UTC day.

    Usage is the number of ``chat_history`` rows for this user with
    ``role == "user"``, ``is_deepmind`` set and the matching
    ``deepmind_submode``, created since the start of today (UTC).

    Returns:
        {
            tier: str,
            pro:   { used, limit, remaining },
            super: { used, limit, remaining },
        }
        ``limit`` and ``remaining`` are ``None`` when the tier has no limit.
        ``remaining`` is never negative: it is clamped at 0 when usage already
        exceeds the limit (for example after a tier change during the day).
    """
    tier = get_user_tier(user_id)
    client = _client()
    today = datetime.now(UTC).date().isoformat()

    def _count(submode: str) -> int:
        # Number of DeepMind user messages sent today in the given submode.
        return (
            client
            .table("chat_history")
            .select("id", count="exact")
            .eq("user_id", user_id)
            .eq("role", "user")
            .eq("is_deepmind", True)
            .eq("deepmind_submode", submode)
            .gte("created_at", today)
            .execute()
            .count
        ) or 0  # `.count` may be None; treat as zero

    def _stats(used: int, limit: int | None) -> dict:
        # Assemble one submode's entry; clamp so "remaining" cannot go below 0.
        remaining = None if limit is None else max(0, limit - used)
        return {"used": used, "limit": limit, "remaining": remaining}

    return {
        "tier": str(tier),
        "pro": _stats(_count("pro"), DEEPMIND_PRO_LIMIT[tier]),
        "super": _stats(_count("super"), DEEPMIND_SUPER_LIMIT[tier]),
    }