PhantomEye / api /auth.py
Abu-Sameer-66's picture
Upload folder using huggingface_hub
0ab99bb verified
import os
import uuid
import hashlib
import hmac
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import HTTPException, Security, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
# Config
SECRET_KEY = os.getenv("SECRET_KEY", "phantomeye-secret-key-change-in-production-2026")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # 24 hours
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
bearer_scheme = HTTPBearer()
# In-memory API key store — replace with PostgreSQL in production
API_KEYS_DB: dict[str, dict] = {}
# Pre-registered demo keys for testing
DEMO_KEYS = {
"PE-DEMO-KEY-2026-FREE": {
"client": "demo_user",
"tier": "free",
"rate_limit": 100,
"calls_today": 0,
"created_at": datetime.utcnow().isoformat(),
"active": True,
},
"PE-PROD-KEY-2026-PRO": {
"client": "pro_user",
"tier": "pro",
"rate_limit": 10000,
"calls_today": 0,
"created_at": datetime.utcnow().isoformat(),
"active": True,
}
}
API_KEYS_DB.update(DEMO_KEYS)
def generate_api_key(client_name: str, tier: str = "free") -> str:
"""Generate a unique PhantomEye API key."""
unique = str(uuid.uuid4()).replace("-", "").upper()[:16]
key = f"PE-{tier.upper()}-{unique}"
API_KEYS_DB[key] = {
"client": client_name,
"tier": tier,
"rate_limit": 100 if tier == "free" else 10000,
"calls_today": 0,
"created_at": datetime.utcnow().isoformat(),
"active": True,
}
return key
def validate_api_key(api_key: str) -> dict:
"""Validate API key and check rate limit."""
if api_key not in API_KEYS_DB:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API key"
)
key_data = API_KEYS_DB[api_key]
if not key_data["active"]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="API key is disabled"
)
if key_data["calls_today"] >= key_data["rate_limit"]:
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail=f"Rate limit exceeded — {key_data['rate_limit']} calls/day for {key_data['tier']} tier"
)
API_KEYS_DB[api_key]["calls_today"] += 1
return key_data
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Create JWT access token."""
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def verify_token(credentials: HTTPAuthorizationCredentials = Security(bearer_scheme)) -> dict:
"""Verify JWT token from Authorization header."""
try:
payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
client = payload.get("sub")
if client is None:
raise HTTPException(status_code=401, detail="Invalid token")
return payload
except JWTError:
raise HTTPException(status_code=401, detail="Invalid or expired token")
def get_api_key_stats() -> dict:
"""Return stats about all registered API keys."""
return {
"total_keys": len(API_KEYS_DB),
"active_keys": sum(1 for k in API_KEYS_DB.values() if k["active"]),
"keys": [
{
"key": k[:12] + "****",
"client": v["client"],
"tier": v["tier"],
"calls_today": v["calls_today"],
"rate_limit": v["rate_limit"],
"active": v["active"],
}
for k, v in API_KEYS_DB.items()
]
}