Spaces:
Paused
Paused
| """ | |
| API key management system for enterprise security | |
| """ | |
| import hashlib | |
| import json | |
| import logging | |
| import os | |
| import secrets | |
| import time | |
| from datetime import datetime, timedelta | |
| from typing import Any | |
| from fastapi import HTTPException | |
| logger = logging.getLogger(__name__) | |
class APIKeyManager:
    """Enterprise API key management backed by an optional Redis client.

    Raw keys are never stored. Only a peppered SHA-256 digest is written to
    Redis, under ``api_key:<digest>``, together with a JSON metadata record.

    The Redis client is expected to expose awaitable ``get``, ``setex`` and
    ``keys`` methods; those are the only calls made on it.

    Configuration is read from the environment at construction time:

    * ``HASH_ALGORITHM`` (default ``"HS256"``): stored on ``self.algorithm``;
      no method of this class consults it.
    * ``HASH_PEPPER`` (default empty): bytes appended to a key before hashing.
    * ``KEY_LENGTH`` (default ``32``): random bytes used per generated key.
    """

    # Revoked records are rewritten with this TTL so they stay listable.
    REVOKED_RETENTION_SECONDS = 365 * 24 * 3600

    def __init__(self, redis_client=None):
        """Store the Redis client and read hashing settings from the environment.

        Args:
            redis_client: Async Redis client, or ``None`` to run without
                storage (validation, revocation and listing then report
                failure or return empty results).
        """
        self.redis = redis_client
        self.algorithm = os.getenv("HASH_ALGORITHM", "HS256")
        self.pepper = os.getenv("HASH_PEPPER", "").encode()
        self.key_length = int(os.getenv("KEY_LENGTH", "32"))

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Stored timestamps are naive ISO-8601 strings, so the tzinfo is
        stripped to keep new records comparable with existing ones.
        """
        # Local import: the module only imports datetime and timedelta.
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    async def generate_api_key(self, name: str, permissions: list[str], expires_days: int = 365) -> dict[str, Any]:
        """Generate a new API key and store its hashed record in Redis.

        Args:
            name: Human-readable label; also used to build the description.
            permissions: Permission strings granted to the key.
            expires_days: Lifetime in days, used for both the ``expires_at``
                field and the Redis TTL.

        Returns:
            A dict with the raw ``api_key`` (only ever available here),
            ``key_hash``, ``permissions``, ``expires_at`` and ``description``.

        Raises:
            HTTPException: With status 500 if anything fails while
                generating or storing the key.
        """
        try:
            # Honour the configured KEY_LENGTH instead of a hard-coded size.
            raw_key = secrets.token_urlsafe(self.key_length)
            key_hash = self._hash_key(raw_key)

            # One timestamp so expires_at is exactly created_at + expires_days.
            now = self._utcnow()
            key_data = {
                "name": name,
                "key_hash": key_hash,
                "permissions": permissions,
                "created_at": now.isoformat(),
                "expires_at": (now + timedelta(days=expires_days)).isoformat(),
                "last_used_at": None,
                "usage_count": 0,
                "is_active": True,
                "description": f"API key for {name}",
            }

            if self.redis:
                await self.redis.setex(
                    f"api_key:{key_hash}",
                    expires_days * 24 * 3600,  # Convert days to seconds
                    json.dumps(key_data),
                )
            else:
                # Without storage the returned key can never be validated.
                logger.warning(
                    "Redis not available; API key %s... was generated but not stored",
                    key_hash[:8],
                )

            return {
                "api_key": raw_key,
                "key_hash": key_hash,
                "permissions": permissions,
                "expires_at": key_data["expires_at"],
                "description": key_data["description"],
            }
        except Exception as e:
            logger.error("Error generating API key: %s", e)
            # HTTPException takes the status code first; the message is the detail.
            raise HTTPException(status_code=500, detail="API key generation failed") from e

    def _hash_key(self, key: str) -> str:
        """Return the hex SHA-256 digest of ``key`` followed by the pepper."""
        return hashlib.sha256(key.encode() + self.pepper).hexdigest()

    def _verify_key(self, key: str, key_hash: str) -> bool:
        """Return whether ``key`` hashes to ``key_hash``.

        The comparison is constant-time, and is done on bytes so that a
        non-ASCII ``key_hash`` yields ``False`` rather than an error.
        """
        expected = self._hash_key(key)
        return secrets.compare_digest(expected.encode(), key_hash.encode())

    async def validate_api_key(self, key_hash: str, required_permissions: list[str]) -> dict[str, Any]:
        """Validate a stored key and check it holds the required permissions.

        On success the record's ``last_used_at`` and ``usage_count`` are
        updated and written back.

        Args:
            key_hash: Digest identifying the key record.
            required_permissions: Permissions that must all be present.

        Returns:
            ``{"valid": True, "key_data": ..., "permissions": [...]}`` on
            success; otherwise ``{"valid": False, "error": ...}``, with an
            extra ``"missing"`` list when permissions are insufficient.
            Errors are logged and reported in the result, never raised.
        """
        if not self.redis:
            return {"valid": False, "error": "Redis not available"}

        try:
            redis_key = f"api_key:{key_hash}"
            raw_record = await self.redis.get(redis_key)
            if not raw_record:
                return {"valid": False, "error": "API key not found"}

            key_data = json.loads(raw_record)
            now = self._utcnow()

            expires_at = datetime.fromisoformat(key_data["expires_at"])
            if expires_at < now:
                return {"valid": False, "error": "API key expired"}

            if not key_data.get("is_active"):
                return {"valid": False, "error": "API key is disabled"}

            key_permissions = set(key_data.get("permissions", []))
            required_set = set(required_permissions)
            if not required_set.issubset(key_permissions):
                # Sorted so the message and list are deterministic.
                return {
                    "valid": False,
                    "error": (
                        f"Insufficient permissions. Required: {required_permissions}, "
                        f"Has: {sorted(key_permissions)}"
                    ),
                    "missing": sorted(required_set - key_permissions),
                }

            key_data["last_used_at"] = now.isoformat()
            key_data["usage_count"] = key_data.get("usage_count", 0) + 1

            # Keep the record expiring at expires_at instead of restarting a
            # one-year TTL on every use. SETEX needs a positive TTL.
            remaining_seconds = max(int((expires_at - now).total_seconds()), 1)
            await self.redis.setex(redis_key, remaining_seconds, json.dumps(key_data))

            return {"valid": True, "key_data": key_data, "permissions": sorted(key_permissions)}
        except Exception as e:
            logger.error("Error validating API key: %s", e)
            return {"valid": False, "error": "API key validation failed"}

    async def revoke_api_key(self, key_hash: str) -> bool:
        """Mark a key inactive and keep its record for the retention period.

        Returns:
            ``True`` if the record existed and was rewritten as revoked;
            ``False`` if Redis is unavailable, the key is unknown, or an
            error occurred.
        """
        if not self.redis:
            return False

        try:
            redis_key = f"api_key:{key_hash}"
            raw_record = await self.redis.get(redis_key)
            if not raw_record:
                return False

            key_data = json.loads(raw_record)
            key_data["is_active"] = False
            key_data["revoked_at"] = self._utcnow().isoformat()
            await self.redis.setex(
                redis_key,
                self.REVOKED_RETENTION_SECONDS,  # Keep for 1 year
                json.dumps(key_data),
            )
            logger.info("API key %s... revoked", key_hash[:8])
            return True
        except Exception as e:
            logger.error("Error revoking API key: %s", e)
            return False

    async def list_api_keys(self, include_revoked: bool = False) -> list[dict[str, Any]]:
        """List stored API key records.

        Args:
            include_revoked: When ``False``, only records whose ``is_active``
                field is truthy are returned.

        Returns:
            One dict per record, each with a ``key_hash`` entry plus the
            stored fields. Empty if Redis is unavailable or listing fails.
        """
        if not self.redis:
            return []

        # NOTE(review): KEYS scans the whole keyspace; consider SCAN if the
        # client supports it and the number of keys is large.
        try:
            redis_keys = await self.redis.keys("api_key:*")
        except Exception as e:
            logger.error("Error listing API keys: %s", e)
            return []

        api_keys = []
        for redis_key in redis_keys:
            try:
                raw_record = await self.redis.get(redis_key)
                if raw_record is None:
                    # The key expired between KEYS and GET; nothing to list.
                    continue
                key_data = json.loads(raw_record)
                if include_revoked or key_data.get("is_active"):
                    # Clients not configured to decode responses return bytes.
                    key_name = redis_key.decode() if isinstance(redis_key, bytes) else redis_key
                    api_keys.append({"key_hash": key_name.split(":", 1)[1], **key_data})
            except Exception as e:
                logger.error("Error loading API key data: %s", e)
        return api_keys

    def generate_jwt_token(self, user_id: str, key_hash: str, expires_minutes: int = 60) -> str:
        """Return an opaque bearer-style token string for an API key.

        NOTE(review): despite the name this is not a JWT. The result is
        ``"Bearer <64 random hex chars>_<sha256 hex of the claims JSON>"``.
        The claims are not recoverable from it and it is not signed, so it
        cannot be verified later. The format is unchanged for compatibility;
        replace it with a real JWT library before relying on it.

        Args:
            user_id: Value placed in the hashed ``user_id`` claim.
            key_hash: Value placed in the hashed ``api_key_hash`` claim.
            expires_minutes: Offset used for the hashed ``exp`` claim.

        Raises:
            HTTPException: With status 500 if the token cannot be built.
        """
        try:
            issued_at = int(time.time())
            token_data = {
                "user_id": user_id,
                "api_key_hash": key_hash,
                "exp": issued_at + (expires_minutes * 60),
                "iat": issued_at,
                "sub": "api_access",
            }
            claims_digest = hashlib.sha256(json.dumps(token_data).encode()).hexdigest()
            return f"Bearer {secrets.token_hex(32)}_{claims_digest}"
        except Exception as e:
            logger.error("Error generating JWT token: %s", e)
            raise HTTPException(status_code=500, detail="Token generation failed") from e
# Permission levels: role name -> permission strings granted to that role.
PERMISSIONS = {
    role: list(granted)
    for role, granted in (
        ("admin", ("read", "write", "delete", "manage_users")),
        ("user", ("read", "write")),
        ("read_only", ("read",)),
        ("analytics", ("read",)),
        ("billing", ("read", "write")),
        ("api_full", ("read", "write", "delete")),
    )
}
# Global API key manager instance, shared by the module-level helpers below.
# It is constructed without a Redis client, so as written here its `redis`
# attribute is None and validation reports "Redis not available".
# NOTE(review): presumably a client is assigned to `api_key_manager.redis`
# elsewhere at startup -- confirm.
api_key_manager = APIKeyManager()
# API key validation function
async def verify_api_key(key_hash: str, required_permission: str = None):
    """FastAPI dependency for API key validation.

    Delegates to the module-level ``api_key_manager``.

    Args:
        key_hash: Digest identifying the stored key record.
        required_permission: Single permission the key must hold. When
            omitted or empty, no specific permission is required and only
            existence, expiry and active status are checked.

    Returns:
        The result dict produced by ``APIKeyManager.validate_api_key``.
    """
    # Always bind the list: previously it was only assigned inside the `if`,
    # so calling without a permission raised UnboundLocalError.
    required_permissions = [required_permission] if required_permission else []
    return await api_key_manager.validate_api_key(key_hash, required_permissions)
# API key permission decorator
def require_api_key_permissions(permissions: list[str]):
    """Decorator factory meant to require specific API key permissions.

    NOTE(review): ``permissions`` is accepted but never checked. The wrapper
    only awaits the wrapped callable and returns its result, so decorating a
    handler with this does not restrict access. Enforcement needs to be
    added once it is decided how the API key reaches the handler.

    Args:
        permissions: Permission strings the caller is expected to hold
            (currently unused).

    Returns:
        A decorator that wraps an async callable. The wrapper keeps the
        wrapped callable's name, docstring and signature, which frameworks
        that inspect handler signatures depend on.
    """
    # Local import: functools is not imported at module level.
    import functools

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            return await func(*args, **kwargs)

        return wrapper

    return decorator