Spaces:
Paused
Paused
| import hashlib | |
| import json | |
| import logging | |
| import secrets | |
| from datetime import datetime, timedelta | |
| from typing import Optional, Tuple | |
| from fastapi import Depends, HTTPException, Security, status | |
| from fastapi.security.api_key import APIKeyHeader | |
| from sqlalchemy.orm import Session | |
| from core.database import get_db | |
| from core.models import APIKey | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Name of the HTTP request header that carries the caller's API key.
API_KEY_NAME = "X-API-Key"

# auto_error is disabled so that get_current_api_key can handle a missing
# header itself and raise its own 401 response.
api_key_header = APIKeyHeader(
    auto_error=False,
    name=API_KEY_NAME,
)
class APIKeyService:
    """Service for managing and validating API keys with database persistence.

    Only the SHA-256 digest of a key and the first 8 characters of its random
    secret are stored; the full raw key is handed back to the caller by
    ``generate_api_key`` and cannot be recovered from the stored digest.
    """

    def _hash_key(self, key: str) -> str:
        """Return the hex-encoded SHA-256 digest of ``key`` (the stored form)."""
        return hashlib.sha256(key.encode()).hexdigest()

    def generate_api_key(
        self,
        db: Session,
        user_id: str,
        name: str,
        description: Optional[str] = None,
        permissions: Optional[list[str]] = None,
        expires_days: int = 365,
    ) -> Tuple[str, APIKey]:
        """Create, persist and return a new API key.

        Args:
            db: Session used to insert and commit the new row.
            user_id: Owner of the key.
            name: Label stored with the key.
            description: Optional free-text description.
            permissions: Permission names, stored JSON-encoded. ``None`` or an
                empty list falls back to ``["read"]``.
            expires_days: Lifetime of the key in days, counted from now (UTC).

        Returns:
            ``(raw_key, api_key_model)`` where ``raw_key`` is ``"sk_"``
            followed by the random secret.

        Raises:
            HTTPException: 500 if anything fails; the session is rolled back
                first and the original error is chained as the cause.
        """
        try:
            # Generate safe random key: sk_...
            raw_secret = secrets.token_urlsafe(32)
            full_key = f"sk_{raw_secret}"

            api_key = APIKey(
                # The prefix is taken from the secret itself, without "sk_".
                key_prefix=raw_secret[:8],
                key_hash=self._hash_key(full_key),
                name=name,
                description=description,
                user_id=user_id,
                permissions=json.dumps(permissions or ["read"]),
                expires_at=datetime.utcnow() + timedelta(days=expires_days),
            )
            db.add(api_key)
            db.commit()
            db.refresh(api_key)

            logger.info("Generated new API key for user %s: %s", user_id, name)
            return full_key, api_key
        except Exception as e:
            db.rollback()
            logger.exception("Failed to generate API key: %s", e)
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Could not generate API key",
            ) from e

    def validate_key(self, db: Session, key: str) -> Optional[APIKey]:
        """Validate an API key and return its model if it is usable.

        A key is accepted when an active row with a matching hash exists and
        its ``expires_at`` is unset or not in the past. On success
        ``last_used_at`` is updated and committed.

        Returns:
            The matching ``APIKey``, or ``None`` when the key is empty,
            unknown, inactive, expired, or when any error occurs.
        """
        if not key:
            return None

        try:
            key_hash = self._hash_key(key)
            api_key = (
                db.query(APIKey)
                .filter(APIKey.key_hash == key_hash, APIKey.is_active)
                .first()
            )
            if not api_key:
                return None

            # One timestamp for both the expiry check and the usage stamp.
            now = datetime.utcnow()

            # Check expiration
            if api_key.expires_at and api_key.expires_at < now:
                logger.warning("Attempted use of expired API key: %s", api_key.key_prefix)
                return None

            # Update last used
            api_key.last_used_at = now
            db.commit()
            return api_key
        except Exception as e:
            logger.exception("Error validating API key: %s", e)
            # Roll back like the other methods do, so a failed query/commit
            # does not leave the session in a failed transaction. The rollback
            # is guarded so this method keeps returning None on any error.
            try:
                db.rollback()
            except Exception:
                logger.exception("Rollback failed after API key validation error")
            return None

    def revoke_key(self, db: Session, key_id: str, user_id: str) -> bool:
        """Deactivate the key ``key_id`` if it belongs to ``user_id``.

        Returns:
            ``True`` when the key was found and deactivated; ``False`` when no
            such key exists for that user or when an error occurred (the
            session is rolled back in that case).
        """
        try:
            api_key = (
                db.query(APIKey)
                .filter(APIKey.id == key_id, APIKey.user_id == user_id)
                .first()
            )
            if not api_key:
                return False

            api_key.is_active = False
            db.commit()
            logger.info("Revoked API key %s for user %s", key_id, user_id)
            return True
        except Exception as e:
            db.rollback()
            logger.exception("Error revoking API key: %s", e)
            return False

    def list_keys(self, db: Session, user_id: str) -> list[APIKey]:
        """Return the active API keys owned by ``user_id``."""
        return (
            db.query(APIKey)
            .filter(APIKey.user_id == user_id, APIKey.is_active)
            .all()
        )
# Shared module-level service instance used by the dependency defined below.
api_key_service: APIKeyService = APIKeyService()
async def get_current_api_key(api_key: str = Security(api_key_header), db: Session = Depends(get_db)) -> APIKey:
    """Resolve the request's API key header to a validated ``APIKey``.

    Raises:
        HTTPException: 401 with detail "API Key missing" when no key was
            supplied, or 401 with detail "Invalid or expired API Key" when
            the service does not accept the supplied key.
    """
    unauthorized = status.HTTP_401_UNAUTHORIZED

    # Guard: no key supplied at all.
    if not api_key:
        raise HTTPException(status_code=unauthorized, detail="API Key missing")

    key_record = api_key_service.validate_key(db, api_key)
    if key_record:
        return key_record

    # validate_key returned nothing, so the key is rejected.
    raise HTTPException(status_code=unauthorized, detail="Invalid or expired API Key")