Spaces:
Running
Running
| """ | |
| Knowledge Universe — API Key Authentication + Usage Tracking | |
| ============================================================ | |
| Lightweight, Redis-backed. No Stripe yet (week 2). | |
| Key format: ku_live_{48 hex chars from secrets.token_hex(24)} (any non-free tier) | |
| ku_test_{48 hex chars from secrets.token_hex(24)} (free tier) | |
| Redis schema: | |
| ku:key:{sha256(api_key)[:32]} → JSON { | |
| "customer_id": str, | |
| "email": str, | |
| "tier": "free" | "starter" | "growth" | "pro", | |
| "calls_limit": int, | |
| "created_at": ISO str | |
| } | |
| ku:usage:{customer_id}:{YYYY-MM} → int (INCR, expires in 31 days) | |
| Tier limits (calls/month): | |
| free: 500 | |
| starter: 5_000 | |
| growth: 20_000 | |
| pro: 75_000 | |
| """ | |
| import hashlib | |
| import json | |
| import logging | |
| import secrets | |
| import uuid | |
| from datetime import datetime, timezone | |
| from typing import Dict, List, Any, Optional | |
| from fastapi import Header, HTTPException, Request | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Monthly call quota (calls/month) for each subscription tier.
TIER_LIMITS: Dict[str, int] = dict(
    free=500,
    starter=5_000,
    growth=20_000,
    pro=75_000,
)
| # ── Key helpers ────────────────────────────────────────────────────────────── | |
def generate_api_key(tier: str = "free") -> str:
    """Create a fresh random API key string for the given tier.

    The ``"free"`` tier gets the ``ku_test`` prefix; every other tier value
    gets ``ku_live``. The prefix is followed by an underscore and the 48 hex
    characters produced by ``secrets.token_hex(24)``.
    """
    if tier == "free":
        prefix = "ku_test"
    else:
        prefix = "ku_live"
    random_part = secrets.token_hex(24)
    return prefix + "_" + random_part
def hash_key(api_key: str) -> str:
    """Return the first 32 hex characters of the SHA-256 digest of *api_key*.

    The truncated digest is what gets embedded in Redis key names, so the
    plaintext API key itself is never used as a storage key.
    """
    full_digest = hashlib.sha256(api_key.encode()).hexdigest()
    return full_digest[:32]
def redis_key(api_key: str) -> str:
    """Build the Redis key (``ku:key:<hash>``) holding this API key's customer record."""
    hashed = hash_key(api_key)
    return "ku:key:" + hashed
def usage_key(customer_id: str) -> str:
    """Build the Redis key of a customer's usage counter for the current UTC month.

    The key has the form ``ku:usage:<customer_id>:<YYYY-MM>``.
    """
    now_utc = datetime.now(timezone.utc)
    return "ku:usage:{}:{}".format(customer_id, now_utc.strftime("%Y-%m"))
| # ── Core auth functions ────────────────────────────────────────────────────── | |
async def create_customer(
    redis,
    email: str,
    tier: str = "free",
) -> dict:
    """Register a new customer and persist the record in Redis.

    Two keys are written through ``redis.client.set``:

    * the customer record (JSON) under the hashed-API-key key, and
    * an email index entry ``ku:email:<hash>`` pointing at the customer id.

    Args:
        redis: Wrapper object exposing an async client as ``redis.client``.
        email: Customer email; lower-cased before hashing for the index key.
        tier: Tier name. A tier missing from ``TIER_LIMITS`` is stored as
            given but receives the free-tier call limit.

    Returns:
        The stored customer record plus an ``"api_key"`` entry carrying the
        plaintext key. Only a hash of the key is used in Redis, so this
        return value is the only place the plaintext key appears.
    """
    api_key = generate_api_key(tier)
    customer_id = str(uuid.uuid4())

    record = {
        "customer_id": customer_id,
        "email": email,
        "tier": tier,
        "calls_limit": TIER_LIMITS.get(tier, TIER_LIMITS["free"]),
        "created_at": datetime.now(timezone.utc).isoformat(),
        # Every new customer starts without half-life overrides.
        "half_life_overrides": {},
    }

    # Primary record, keyed by the hash of the API key.
    await redis.client.set(redis_key(api_key), json.dumps(record))

    # Secondary index: hashed, case-insensitive email -> customer id.
    email_digest = hashlib.sha256(email.lower().encode()).hexdigest()
    await redis.client.set("ku:email:" + email_digest[:24], customer_id)

    logger.info(f"New customer created: {customer_id} tier={tier}")
    return dict(record, api_key=api_key)
| # ============================================================ | |
| # FEATURE 7: Half-life customization per customer | |
| # src/api/auth.py — add update_half_life_overrides function | |
| # ============================================================ | |
async def update_half_life_overrides(
    redis,
    api_key: str,
    overrides: Dict[str, int],
) -> bool:
    """Replace the ``half_life_overrides`` field of a customer's Redis record.

    The record is read, updated in memory, and written back under the same
    key (a plain read-modify-write, not an atomic operation).

    Args:
        redis: Wrapper object exposing an async client as ``redis.client``.
        api_key: Plaintext API key identifying the customer record.
        overrides: New overrides mapping; it replaces the stored one wholesale.

    Returns:
        ``True`` when the record was found and rewritten; ``False`` when no
        record exists for the key or any exception occurred (logged).
    """
    try:
        record_key = redis_key(api_key)
        stored = await redis.client.get(record_key)
        if stored:
            record = json.loads(stored)
            record["half_life_overrides"] = overrides
            await redis.client.set(record_key, json.dumps(record))
            return True
        return False
    except Exception as exc:
        logger.error(f"Half-life override update failed: {exc}")
        return False
async def get_customer(redis, api_key: str) -> Optional[dict]:
    """Fetch and decode the customer record stored for *api_key*.

    Args:
        redis: Wrapper object exposing an async client as ``redis.client``.
        api_key: Plaintext API key presented by the caller.

    Returns:
        The decoded JSON record, or ``None`` when nothing is stored for the
        key or when the lookup/decoding raised (the error is logged).
    """
    try:
        stored = await redis.client.get(redis_key(api_key))
        if stored:
            return json.loads(stored)
        return None
    except Exception as exc:
        logger.error(f"Customer lookup failed: {exc}")
        return None
async def email_exists(redis, email: str) -> bool:
    """Report whether an email index entry already exists in Redis.

    The email is lower-cased, hashed with SHA-256, and the first 24 hex
    characters form the ``ku:email:<hash>`` lookup key.

    Args:
        redis: Wrapper object exposing an async client as ``redis.client``.
        email: Email address to check (case-insensitive).

    Returns:
        ``True`` if the lookup returned anything other than ``None``.
    """
    normalized = email.lower()
    digest = hashlib.sha256(normalized.encode()).hexdigest()
    lookup_key = "ku:email:" + digest[:24]
    stored_id = await redis.client.get(lookup_key)
    if stored_id is None:
        return False
    return True
async def get_usage(redis, customer_id: str) -> int:
    """Return the customer's call count for the current UTC month.

    Args:
        redis: Wrapper object exposing an async client as ``redis.client``.
        customer_id: Identifier used to build the usage counter key.

    Returns:
        The stored counter as an int, or ``0`` when the counter is absent.
        On any failure (Redis error, non-numeric value) the error is logged
        and ``0`` is returned: the lookup stays best-effort, so callers that
        compare the result against a quota fail open rather than crash.
    """
    try:
        ukey = usage_key(customer_id)
        raw = await redis.client.get(ukey)
        return int(raw) if raw else 0
    except Exception as e:
        # Previously swallowed silently; log it like the sibling helpers do
        # so a failing quota lookup leaves a trace.
        logger.error(f"Usage lookup failed: {e}")
        return 0
async def increment_usage(redis, customer_id: str) -> int:
    """Bump the customer's usage counter for the current UTC month.

    Args:
        redis: Wrapper object exposing an async client as ``redis.client``.
        customer_id: Identifier used to build the usage counter key.

    Returns:
        The counter value after the increment, or ``0`` if the operation
        raised (the error is logged).
    """
    try:
        counter_key = usage_key(customer_id)
        new_total = await redis.client.incr(counter_key)
        if new_total == 1:
            # The increment just created the key: give it a 31-day TTL
            # (expressed in seconds).
            await redis.client.expire(counter_key, 31 * 24 * 60 * 60)
        return new_total
    except Exception as exc:
        logger.error(f"Usage increment failed: {exc}")
        return 0
| # ── FastAPI dependency ─────────────────────────────────────────────────────── | |
async def require_api_key(
    request: Request,
    x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
) -> dict:
    """FastAPI dependency: authenticate the caller and enforce the monthly quota.

    Steps, in order:
      1. Reject requests without an ``X-API-Key`` header (401).
      2. Look the key up via ``get_customer``; reject unknown keys (401).
      3. Compare current-month usage with the customer's ``calls_limit``
         (falling back to the free-tier limit); reject when reached (429).
      4. Increment the usage counter and attach ``customer``,
         ``calls_used`` and ``calls_limit`` to ``request.state``.

    The quota check and the increment are separate Redis operations.

    Returns:
        The customer record dict.

    Raises:
        HTTPException: 401 for a missing/invalid key, 429 when the quota
            is exhausted.
    """
    auth_docs = "https://knowledge-universe.onrender.com/docs#authentication"

    if not x_api_key:
        missing_detail = {
            "error": "MISSING_API_KEY",
            "message": "Include your API key in the X-API-Key header.",
            "docs": auth_docs,
        }
        raise HTTPException(status_code=401, detail=missing_detail)

    store = request.app.state.redis
    account = await get_customer(store, x_api_key)
    if not account:
        invalid_detail = {
            "error": "INVALID_API_KEY",
            "message": "The API key you provided is invalid or has been revoked.",
            "docs": auth_docs,
        }
        raise HTTPException(status_code=401, detail=invalid_detail)

    quota = account.get("calls_limit", TIER_LIMITS["free"])
    account_id = account["customer_id"]

    calls_so_far = await get_usage(store, account_id)
    if calls_so_far >= quota:
        quota_detail = {
            "error": "QUOTA_EXCEEDED",
            "message": f"Monthly limit of {quota:,} calls reached.",
            "calls_used": calls_so_far,
            "calls_limit": quota,
            "resets": "1st of next month",
            "upgrade": "https://knowledge-universe.onrender.com/docs#rate-limits",
        }
        raise HTTPException(status_code=429, detail=quota_detail)

    # Only requests that passed the quota check are counted.
    updated_count = await increment_usage(store, account_id)

    # Expose auth context to downstream handlers/middleware.
    state = request.state
    state.customer = account
    state.calls_used = updated_count
    state.calls_limit = quota
    return account
| # ── Admin & RBAC dependencies ──────────────────────────────────────────────── | |
async def require_admin_key(
    request: Request,
    x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
) -> dict:
    """FastAPI dependency guarding administrative endpoints.

    Authenticates the ``X-API-Key`` header the same way as the regular
    dependency, then requires the customer's ``tier`` (case-insensitive,
    defaulting to ``"free"`` when absent) to be ``enterprise`` or ``admin``.
    Usage counters are not touched.

    Returns:
        The customer record dict, also stored on ``request.state.customer``.

    Raises:
        HTTPException: 401 for a missing/invalid key, 403 when the tier is
            not privileged.
    """
    if not x_api_key:
        missing_detail = {
            "error": "MISSING_API_KEY",
            "message": "Include your API key in the X-API-Key header.",
        }
        raise HTTPException(status_code=401, detail=missing_detail)

    store = request.app.state.redis
    customer = await get_customer(store, x_api_key)
    if not customer:
        invalid_detail = {
            "error": "INVALID_API_KEY",
            "message": "The API key you provided is invalid or has been revoked.",
        }
        raise HTTPException(status_code=401, detail=invalid_detail)

    # RBAC guard: only the enterprise/admin tiers pass; every other tier
    # (free, starter, growth, pro, ...) is rejected below.
    plan = customer.get("tier", "free").lower()
    if plan in ("enterprise", "admin"):
        request.state.customer = customer
        return customer

    forbidden_detail = {
        "error": "FORBIDDEN",
        "message": f"Your current tier ({plan}) does not have privileges to execute this action. Requires 'enterprise' tier.",
    }
    raise HTTPException(status_code=403, detail=forbidden_detail)