Spaces:
Sleeping
Sleeping
| import os | |
| import requests | |
| import datetime | |
| from fastapi import HTTPException | |
| from collections import defaultdict | |
| # Rate limits | |
| # Rate limits | |
| FREE_DAILY_LIMIT = 3 | |
| # In-memory storage for rate limiting (reset on restart is fine for MVP) | |
| USAGE_DB = defaultdict(lambda: {"count": 0, "date": datetime.date.today().isoformat()}) | |
| # Dodo Payments API | |
| DODO_API_BASE = "https://live.dodopayments.com" | |
| # Supabase Config (Backend needs these env vars) | |
| SUPABASE_URL = os.getenv("SUPABASE_URL") or os.getenv("NEXT_PUBLIC_SUPABASE_URL") | |
| SUPABASE_KEY = os.getenv("SUPABASE_ANON_KEY") or os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY") | |
| def verify_dodo_license(license_key: str) -> dict: | |
| """ | |
| Verify a license key using Dodo Payments API. | |
| """ | |
| try: | |
| response = requests.post( | |
| f"{DODO_API_BASE}/licenses/validate", | |
| json={"license_key": license_key}, | |
| headers={"Content-Type": "application/json"}, | |
| timeout=10 | |
| ) | |
| if response.status_code == 200: | |
| data = response.json() | |
| if data.get("valid") == True: | |
| return {"valid": True, "message": "License verified!", "remaining": 9999} | |
| else: | |
| return {"valid": False, "message": "Invalid license."} | |
| elif response.status_code == 404: | |
| return {"valid": False, "message": "License key not found."} | |
| else: | |
| return {"valid": False, "message": "Invalid license key."} | |
| except Exception as e: | |
| print(f"Dodo license verification error: {e}") | |
| return {"valid": False, "message": "Verification service unavailable."} | |
| def check_supabase_auth(authorization: str) -> dict: | |
| """ | |
| Verify Supabase Bearer token and check 'is_pro' status in profiles. | |
| Returns: {"is_pro": bool, "user_id": str} | |
| """ | |
| if not authorization or not authorization.startswith("Bearer "): | |
| return {"is_pro": False} | |
| token = authorization.split(" ")[1] | |
| if not SUPABASE_URL or not SUPABASE_KEY: | |
| print("Warning: Supabase credentials not set in backend.") | |
| return {"is_pro": False} | |
| try: | |
| # 1. Verify User (Get User ID) | |
| user_res = requests.get( | |
| f"{SUPABASE_URL}/auth/v1/user", | |
| headers={"Authorization": f"Bearer {token}", "apikey": SUPABASE_KEY}, | |
| timeout=5 | |
| ) | |
| if user_res.status_code != 200: | |
| return {"is_pro": False} | |
| user_id = user_res.json().get("id") | |
| if not user_id: | |
| return {"is_pro": False} | |
| # 2. Check Profile (is_pro status) | |
| # Using REST API to query profiles table | |
| profile_res = requests.get( | |
| f"{SUPABASE_URL}/rest/v1/profiles", | |
| params={"id": f"eq.{user_id}", "select": "is_pro"}, | |
| headers={"Authorization": f"Bearer {token}", "apikey": SUPABASE_KEY}, | |
| timeout=5 | |
| ) | |
| if profile_res.status_code == 200: | |
| profiles = profile_res.json() | |
| if profiles and profiles[0].get("is_pro") is True: | |
| return {"is_pro": True, "user_id": user_id} | |
| return {"is_pro": False, "user_id": user_id} | |
| except Exception as e: | |
| print(f"Supabase auth check failed: {e}") | |
| return {"is_pro": False} | |
| # Alias for backward compatibility | |
| verify_gumroad_license = verify_dodo_license | |
| def check_rate_limit(ip_address: str, license_key: str = None, fingerprint: str = None, authorization: str = None) -> dict: | |
| """ | |
| Check availability. | |
| Priority: | |
| 1. License Key (Pro) | |
| 2. Supabase Auth (Pro) | |
| 3. Browser Fingerprint (Free Limit) | |
| 4. IP Address (Free Limit) | |
| """ | |
| # 1. Check license if provided | |
| if license_key: | |
| verification = verify_dodo_license(license_key) | |
| if verification["valid"]: | |
| return {"allowed": True, "reason": "licensed", "remaining": 9999} | |
| # 2. Check Supabase Pro status | |
| if authorization: | |
| pro_status = check_supabase_auth(authorization) | |
| if pro_status["is_pro"]: | |
| return {"allowed": True, "reason": "pro_user", "remaining": 9999} | |
| # 3. Determine user identifier - prefer fingerprint | |
| user_id = fingerprint if fingerprint else ip_address | |
| # 4. Check daily limit | |
| today = datetime.date.today().isoformat() | |
| user_data = USAGE_DB[user_id] | |
| # Reset if new day | |
| if user_data["date"] != today: | |
| user_data["date"] = today | |
| user_data["count"] = 0 | |
| # Check limit | |
| if user_data["count"] < FREE_DAILY_LIMIT: | |
| user_data["count"] += 1 | |
| return { | |
| "allowed": True, | |
| "reason": "free_tier", | |
| "remaining": FREE_DAILY_LIMIT - user_data["count"] | |
| } | |
| # Limit reached | |
| return { | |
| "allowed": False, | |
| "reason": "limit_reached", | |
| "message": f"Daily limit of {FREE_DAILY_LIMIT} analyses reached." | |
| } | |
| def get_remaining_analyses(ip_address: str, license_key: str = None, fingerprint: str = None, authorization: str = None) -> dict: | |
| """ | |
| Get remaining analyses WITHOUT incrementing the counter. | |
| Used to show remaining count on page load. | |
| """ | |
| # 1. Check license | |
| if license_key: | |
| verification = verify_dodo_license(license_key) | |
| if verification["valid"]: | |
| return {"remaining": 9999, "is_pro": True, "limit": FREE_DAILY_LIMIT} | |
| # 2. Check Supabase Pro status | |
| if authorization: | |
| pro_status = check_supabase_auth(authorization) | |
| if pro_status["is_pro"]: | |
| return {"remaining": 9999, "is_pro": True, "limit": FREE_DAILY_LIMIT} | |
| # 3. Identifier | |
| user_id = fingerprint if fingerprint else ip_address | |
| # 4. Usage | |
| today = datetime.date.today().isoformat() | |
| user_data = USAGE_DB.get(user_id, {"count": 0, "date": today}) | |
| if user_data.get("date") != today: | |
| return {"remaining": FREE_DAILY_LIMIT, "is_pro": False, "limit": FREE_DAILY_LIMIT} | |
| remaining = max(0, FREE_DAILY_LIMIT - user_data.get("count", 0)) | |
| return {"remaining": remaining, "is_pro": False, "limit": FREE_DAILY_LIMIT} | |