# zenith-backend/app/services/infrastructure/api_key_service.py
# Author: teoat
# Last commit: fix(backend): fix port and health check robustness
# Revision: d29a5a0 (verified)
import hashlib
import json
import logging
import secrets
from datetime import datetime, timedelta
from typing import Optional, Tuple
from fastapi import Depends, HTTPException, Security, status
from fastapi.security.api_key import APIKeyHeader
from sqlalchemy.orm import Session
from core.database import get_db
from core.models import APIKey
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Name of the HTTP request header that carries the caller's API key.
API_KEY_NAME = "X-API-Key"
# Security scheme that reads the key from the header named above.
# NOTE(review): auto_error=False is expected to make the scheme pass None to the
# dependency when the header is absent (confirm against the FastAPI docs);
# get_current_api_key checks for an empty value and raises its own 401.
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)
class APIKeyService:
    """Service for managing and validating API keys with database persistence.

    Keys are looked up by the SHA-256 digest produced by ``_hash_key``; the raw
    key is only available as the return value of ``generate_api_key``.
    """

    def _hash_key(self, key: str) -> str:
        """Return the hex-encoded SHA-256 digest of ``key``."""
        return hashlib.sha256(key.encode()).hexdigest()

    def generate_api_key(
        self,
        db: Session,
        user_id: str,
        name: str,
        description: Optional[str] = None,
        permissions: Optional[list[str]] = None,
        expires_days: int = 365,
    ) -> Tuple[str, APIKey]:
        """Generate and persist a new API key.

        Args:
            db: Session used to add, commit and refresh the new row.
            user_id: Owner of the key.
            name: Label stored with the key.
            description: Optional free-text description.
            permissions: Permission names, stored as a JSON string. ``None``
                or an empty list falls back to ``["read"]``.
            expires_days: Number of days from now until the key expires.

        Returns:
            ``(raw_key, api_key_model)``. The raw key has the form
            ``sk_<random>``; only its hash is stored on the model.

        Raises:
            HTTPException: 500 if anything fails while building or saving the
                key. The session is rolled back first and the original error
                is attached as the cause.
        """
        try:
            # Generate safe random key: sk_...
            raw_secret = secrets.token_urlsafe(32)
            full_key = f"sk_{raw_secret}"

            api_key = APIKey(
                # The prefix is taken from the random part, without the "sk_" tag.
                key_prefix=raw_secret[:8],
                key_hash=self._hash_key(full_key),
                name=name,
                description=description,
                user_id=user_id,
                permissions=json.dumps(permissions or ["read"]),
                expires_at=datetime.utcnow() + timedelta(days=expires_days),
            )
            db.add(api_key)
            db.commit()
            db.refresh(api_key)

            logger.info("Generated new API key for user %s: %s", user_id, name)
            return full_key, api_key
        except Exception as e:
            db.rollback()
            logger.error("Failed to generate API key: %s", e)
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Could not generate API key",
            ) from e

    def validate_key(self, db: Session, key: str) -> Optional[APIKey]:
        """Validate an API key and return its model if it is usable.

        Args:
            db: Session used for the lookup and for saving ``last_used_at``.
            key: Raw key as presented by the caller.

        Returns:
            The matching ``APIKey`` when the key is known, active and not
            expired; ``None`` for an empty key, no match, an expired key, or
            any error raised during the lookup or commit.
        """
        if not key:
            return None

        try:
            key_hash = self._hash_key(key)
            api_key = db.query(APIKey).filter(APIKey.key_hash == key_hash, APIKey.is_active).first()
            if not api_key:
                return None

            # Read the clock once so the expiry check and last_used_at agree.
            # NOTE(review): expires_at is presumably stored as naive UTC, which
            # is why utcnow() is kept — confirm before moving to aware datetimes.
            now = datetime.utcnow()

            # Check expiration
            if api_key.expires_at and api_key.expires_at < now:
                logger.warning("Attempted use of expired API key: %s", api_key.key_prefix)
                return None

            # Update last used
            api_key.last_used_at = now
            db.commit()
            return api_key
        except Exception as e:
            # Roll back so a failed query/commit does not leave the caller's
            # session in a failed transaction, matching the other write paths.
            db.rollback()
            logger.error("Error validating API key: %s", e)
            return None

    def revoke_key(self, db: Session, key_id: str, user_id: str) -> bool:
        """Deactivate the key ``key_id`` if it belongs to ``user_id``.

        Returns:
            ``True`` when the key was found and marked inactive; ``False``
            when no such key exists for that user or the update failed (the
            session is rolled back in that case).
        """
        try:
            api_key = db.query(APIKey).filter(APIKey.id == key_id, APIKey.user_id == user_id).first()
            if not api_key:
                return False

            api_key.is_active = False
            db.commit()
            logger.info("Revoked API key %s for user %s", key_id, user_id)
            return True
        except Exception as e:
            db.rollback()
            logger.error("Error revoking API key: %s", e)
            return False

    def list_keys(self, db: Session, user_id: str) -> list[APIKey]:
        """Return the active API keys that belong to ``user_id``."""
        return db.query(APIKey).filter(APIKey.user_id == user_id, APIKey.is_active).all()
# Shared module-level service instance; get_current_api_key uses it to validate keys.
api_key_service = APIKeyService()
async def get_current_api_key(api_key: str = Security(api_key_header), db: Session = Depends(get_db)) -> APIKey:
    """Resolve the request's API key header to a validated ``APIKey``.

    Raises:
        HTTPException: 401 when the header value is empty, or when
            ``api_key_service.validate_key`` does not return a key.
    """
    if api_key:
        key_record = api_key_service.validate_key(db, api_key)
        if key_record:
            return key_record
        failure_detail = "Invalid or expired API Key"
    else:
        # No header value was supplied, so there is nothing to validate.
        failure_detail = "API Key missing"

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=failure_detail,
    )