| | import os |
| | import secrets |
| | import hashlib |
| | from datetime import datetime |
| | from typing import Optional |
| | from fastapi import Depends, HTTPException, status, Header |
| | from sqlalchemy.orm import Session |
| | from .db import SessionLocal |
| | from .models import APIKey, User |
| |
|
| |
|
def get_db():
    """Yield a database session and close it once the consumer is done.

    A new session is created from ``SessionLocal`` on every call. The
    ``finally`` clause closes it even if the code using the session raises.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
| |
|
| |
|
def generate_api_key() -> str:
    """Create a new random API key.

    Returns:
        A string of the form ``sk_live_<hex>``, where ``<hex>`` is 64
        lowercase hexadecimal characters (32 random bytes from ``secrets``).
    """
    # token_hex(32) draws 32 random bytes and hex-encodes them (64 chars).
    return "sk_live_" + secrets.token_hex(32)
| |
|
| |
|
def hash_api_key(key: str) -> str:
    """Return the SHA-256 digest of ``key`` as a lowercase hex string.

    The key is encoded with the default ``str.encode()`` encoding (UTF-8)
    before hashing.
    """
    hasher = hashlib.sha256()
    hasher.update(key.encode())
    return hasher.hexdigest()
| |
|
| |
|
def verify_api_key(key: str, key_hash: str) -> bool:
    """Check whether ``key`` hashes to ``key_hash``.

    Args:
        key: The plaintext API key presented by the caller.
        key_hash: The stored hex digest to compare against.

    Returns:
        True if the digest of ``key`` (as computed by ``hash_api_key``)
        equals ``key_hash``, False otherwise.
    """
    candidate = hash_api_key(key)
    # Use a constant-time comparison so the time taken does not reveal how
    # many leading characters of the digest matched. Both sides are encoded
    # to bytes because compare_digest rejects non-ASCII str arguments.
    return secrets.compare_digest(candidate.encode(), key_hash.encode())
| |
|
| |
|
def get_api_key_prefix(key: str) -> str:
    """Return a shortened form of ``key`` suitable for display.

    Keys longer than 12 characters are cut to their first 12 characters
    followed by ``...``; shorter keys are returned unchanged.
    """
    if len(key) <= 12:
        return key
    return f"{key[:12]}..."
| |
|
| |
|
async def get_user_from_api_key(
    api_key: Optional[str] = Header(None, alias="X-API-Key"),
    db: Session = Depends(get_db)
) -> Optional[User]:
    """Resolve the ``X-API-Key`` request header to a ``User``.

    Args:
        api_key: Value of the ``X-API-Key`` header, or None when absent.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The user that owns the key, or None when no key was supplied
        (missing or empty header).

    Raises:
        HTTPException: 401 when no active ``APIKey`` row matches the hashed
            key, or when the matching row's ``user_id`` has no ``User``.
    """
    # No header (or an empty one) means API-key auth was not attempted.
    if not api_key:
        return None

    # Rows are matched on the digest of the presented key; only rows
    # flagged active are considered.
    digest = hash_api_key(api_key)
    active_matches = (
        db.query(APIKey)
        .filter(APIKey.key_hash == digest)
        .filter(APIKey.is_active == True)  # noqa: E712 - SQL expression, not a Python bool
    )
    record = active_matches.first()

    if not record:
        invalid_key = HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key",
            headers={"WWW-Authenticate": "Bearer"},
        )
        raise invalid_key

    # Stamp the usage time and persist it before looking up the owner.
    record.last_used_at = datetime.utcnow()
    db.commit()

    owner = db.query(User).filter(User.id == record.user_id).first()
    if owner:
        return owner

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="User not found for API key",
    )
| |
|
| |
|
async def get_current_user_or_api_key(
    api_key_user: Optional[User] = Depends(get_user_from_api_key),
) -> Optional[User]:
    """Return the user authenticated via API key, or None if there is none.

    This dependency is meant as the single hook for endpoints that accept
    either JWT or API key authentication. As written, it forwards only the
    result of ``get_user_from_api_key`` unchanged.
    """
    return api_key_user
| |
|
| |
|