Spaces:
Sleeping
Sleeping
| import os | |
| import secrets | |
| import hashlib | |
| from datetime import datetime | |
| from typing import Optional | |
| from fastapi import Depends, HTTPException, status, Header | |
| from sqlalchemy.orm import Session | |
| from .db import SessionLocal | |
| from .models import APIKey, User | |
| def get_db(): | |
| """Database dependency.""" | |
| db = SessionLocal() | |
| try: | |
| yield db | |
| finally: | |
| db.close() | |
| def generate_api_key() -> str: | |
| """ | |
| Generate a secure API key. | |
| Format: sk_live_<random_64_char_hex> | |
| """ | |
| random_bytes = secrets.token_bytes(32) | |
| random_hex = random_bytes.hex() | |
| return f"sk_live_{random_hex}" | |
| def hash_api_key(key: str) -> str: | |
| """Hash an API key using SHA-256.""" | |
| return hashlib.sha256(key.encode()).hexdigest() | |
| def verify_api_key(key: str, key_hash: str) -> bool: | |
| """Verify an API key against its hash.""" | |
| return hash_api_key(key) == key_hash | |
| def get_api_key_prefix(key: str) -> str: | |
| """Get the prefix of an API key for display purposes.""" | |
| return key[:12] + "..." if len(key) > 12 else key | |
| async def get_user_from_api_key( | |
| api_key: Optional[str] = Header(None, alias="X-API-Key"), | |
| db: Session = Depends(get_db) | |
| ) -> Optional[User]: | |
| """ | |
| Authenticate user from API key header. | |
| Returns User if valid, None if no API key provided. | |
| Raises HTTPException if API key is invalid. | |
| """ | |
| if not api_key: | |
| return None | |
| # Hash the provided key | |
| key_hash = hash_api_key(api_key) | |
| # Find the API key in database | |
| api_key_record = ( | |
| db.query(APIKey) | |
| .filter(APIKey.key_hash == key_hash) | |
| .filter(APIKey.is_active == True) | |
| .first() | |
| ) | |
| if not api_key_record: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Invalid API key", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| # Update last used timestamp | |
| api_key_record.last_used_at = datetime.utcnow() | |
| db.commit() | |
| # Get the user | |
| user = db.query(User).filter(User.id == api_key_record.user_id).first() | |
| if not user: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="User not found for API key", | |
| ) | |
| return user | |
| async def get_current_user_or_api_key( | |
| api_key_user: Optional[User] = Depends(get_user_from_api_key), | |
| # JWT auth will be handled separately in main.py | |
| ) -> Optional[User]: | |
| """ | |
| Dependency that returns user from API key if provided, otherwise None. | |
| This allows endpoints to support both JWT and API key authentication. | |
| """ | |
| return api_key_user | |