Spaces:
Running
Running
| import os | |
| import uuid | |
| import hashlib | |
| import hmac | |
| from datetime import datetime, timedelta | |
| from typing import Optional | |
| from jose import JWTError, jwt | |
| from passlib.context import CryptContext | |
| from fastapi import HTTPException, Security, status | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| # Config | |
| SECRET_KEY = os.getenv("SECRET_KEY", "phantomeye-secret-key-change-in-production-2026") | |
| ALGORITHM = "HS256" | |
| ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24 hours | |
| pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
| bearer_scheme = HTTPBearer() | |
| # In-memory API key store — replace with PostgreSQL in production | |
| API_KEYS_DB: dict[str, dict] = {} | |
| # Pre-registered demo keys for testing | |
| DEMO_KEYS = { | |
| "PE-DEMO-KEY-2026-FREE": { | |
| "client": "demo_user", | |
| "tier": "free", | |
| "rate_limit": 100, | |
| "calls_today": 0, | |
| "created_at": datetime.utcnow().isoformat(), | |
| "active": True, | |
| }, | |
| "PE-PROD-KEY-2026-PRO": { | |
| "client": "pro_user", | |
| "tier": "pro", | |
| "rate_limit": 10000, | |
| "calls_today": 0, | |
| "created_at": datetime.utcnow().isoformat(), | |
| "active": True, | |
| } | |
| } | |
| API_KEYS_DB.update(DEMO_KEYS) | |
| def generate_api_key(client_name: str, tier: str = "free") -> str: | |
| """Generate a unique PhantomEye API key.""" | |
| unique = str(uuid.uuid4()).replace("-", "").upper()[:16] | |
| key = f"PE-{tier.upper()}-{unique}" | |
| API_KEYS_DB[key] = { | |
| "client": client_name, | |
| "tier": tier, | |
| "rate_limit": 100 if tier == "free" else 10000, | |
| "calls_today": 0, | |
| "created_at": datetime.utcnow().isoformat(), | |
| "active": True, | |
| } | |
| return key | |
| def validate_api_key(api_key: str) -> dict: | |
| """Validate API key and check rate limit.""" | |
| if api_key not in API_KEYS_DB: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid API key" | |
| ) | |
| key_data = API_KEYS_DB[api_key] | |
| if not key_data["active"]: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail="API key is disabled" | |
| ) | |
| if key_data["calls_today"] >= key_data["rate_limit"]: | |
| raise HTTPException( | |
| status_code=status.HTTP_429_TOO_MANY_REQUESTS, | |
| detail=f"Rate limit exceeded — {key_data['rate_limit']} calls/day for {key_data['tier']} tier" | |
| ) | |
| API_KEYS_DB[api_key]["calls_today"] += 1 | |
| return key_data | |
| def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: | |
| """Create JWT access token.""" | |
| to_encode = data.copy() | |
| expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)) | |
| to_encode.update({"exp": expire}) | |
| return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) | |
| def verify_token(credentials: HTTPAuthorizationCredentials = Security(bearer_scheme)) -> dict: | |
| """Verify JWT token from Authorization header.""" | |
| try: | |
| payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM]) | |
| client = payload.get("sub") | |
| if client is None: | |
| raise HTTPException(status_code=401, detail="Invalid token") | |
| return payload | |
| except JWTError: | |
| raise HTTPException(status_code=401, detail="Invalid or expired token") | |
| def get_api_key_stats() -> dict: | |
| """Return stats about all registered API keys.""" | |
| return { | |
| "total_keys": len(API_KEYS_DB), | |
| "active_keys": sum(1 for k in API_KEYS_DB.values() if k["active"]), | |
| "keys": [ | |
| { | |
| "key": k[:12] + "****", | |
| "client": v["client"], | |
| "tier": v["tier"], | |
| "calls_today": v["calls_today"], | |
| "rate_limit": v["rate_limit"], | |
| "active": v["active"], | |
| } | |
| for k, v in API_KEYS_DB.items() | |
| ] | |
| } |