| """ |
| API Authentication |
| |
| Provides API key authentication with environment-based configuration. |
| """ |
|
|
| import os |
| import secrets |
| from typing import Optional, Dict |
| from pydantic import BaseModel |
| from fastapi import HTTPException, status, Header, Query, Depends |
| from fastapi.security import APIKeyHeader |
|
|
| |
# Security scheme object for the "X-API-Key" request header. With
# auto_error disabled the scheme is expected to yield None rather than
# raise when the header is absent (see FastAPI's APIKeyHeader reference).
api_key_header = APIKeyHeader(
    name="X-API-Key",
    auto_error=False,
)
|
|
|
|
class APIKeyInfo(BaseModel):
    """Record describing a single configured API key."""

    # The key string itself, as presented by clients.
    key: str
    # Human-readable label for the key.
    name: str
    # Permission entries attached to the key; empty when none are given.
    permissions: list = []
|
|
|
|
| |
# Module-level cache of configured keys, indexed by the key string.
# It starts out empty and is assigned by load_api_keys().
_valid_api_keys: Dict[str, APIKeyInfo] = dict()
|
|
|
|
def _parse_permissions(raw: str) -> list:
    """Split a comma-separated permission string, trimming blanks and empties."""
    return [entry.strip() for entry in raw.split(",") if entry.strip()]


def load_api_keys() -> Dict[str, APIKeyInfo]:
    """
    Load API keys from environment variables.

    Format:
        API_KEY=your-api-key
        API_KEY_NAME=Production
        API_KEY_PERMISSIONS=read,write

    Additional keys can be set with the numeric suffixes 1 through 9,
    each with its own optional name and permissions:
        API_KEY_1=...
        API_KEY_1_NAME=...
        API_KEY_1_PERMISSIONS=...

    Permissions are a comma-separated list; whitespace around entries is
    ignored and empty entries are dropped. When no permissions variable is
    set, a key gets "read" and "write".

    If no key is configured and UNESDOC_PIPELINE_ENV is not "production",
    a fixed demo key is registered instead.

    Returns:
        Mapping of key string to APIKeyInfo. A non-empty result is cached
        in the module-level ``_valid_api_keys`` and returned as-is by later
        calls; an empty result is not cached, so the environment is read
        again on the next call.
    """
    global _valid_api_keys

    if _valid_api_keys:
        return _valid_api_keys

    # (key variable, name variable, permissions variable, default name).
    # The primary key comes first, followed by the numbered ones; if two
    # variables hold the same key string, the later entry wins.
    sources = [("API_KEY", "API_KEY_NAME", "API_KEY_PERMISSIONS", "Default")]
    sources.extend(
        (
            f"API_KEY_{i}",
            f"API_KEY_{i}_NAME",
            f"API_KEY_{i}_PERMISSIONS",
            f"Key {i}",
        )
        for i in range(1, 10)
    )

    keys = {}
    for key_var, name_var, permissions_var, default_name in sources:
        key = os.getenv(key_var)
        if not key:
            continue
        keys[key] = APIKeyInfo(
            key=key,
            name=os.getenv(name_var, default_name),
            permissions=_parse_permissions(
                os.getenv(permissions_var, "read,write")
            ),
        )

    # NOTE(review): this fallback is fail-open. A deployment that forgets to
    # set UNESDOC_PIPELINE_ENV=production and configures no keys will accept
    # the well-known demo key below.
    if not keys and os.getenv("UNESDOC_PIPELINE_ENV") != "production":
        demo_key = "demo-key-12345"
        keys[demo_key] = APIKeyInfo(
            key=demo_key,
            name="Demo User",
            permissions=["read", "write"],
        )

    _valid_api_keys = keys
    return keys
|
|
|
|
def setup_demo_key():
    """Run the API key loader once; meant to be called at startup."""
    load_api_keys()
    return None
|
|
|
|
class APIKeyMissingError(HTTPException):
    """HTTP 403 error raised when a request carries no API key."""

    def __init__(self):
        message = "Missing API key. Include 'X-API-Key' header."
        super().__init__(status_code=status.HTTP_403_FORBIDDEN, detail=message)
|
|
|
|
class APIKeyInvalidError(HTTPException):
    """HTTP 401 error raised when the supplied API key matches no known key."""

    def __init__(self):
        message = "Invalid API key."
        super().__init__(status_code=status.HTTP_401_UNAUTHORIZED, detail=message)
|
|
|
|
async def verify_api_key(
    header_key: Optional[str] = Header(None, alias="X-API-Key"),
    query_key: Optional[str] = Query(None, alias="api_key")
) -> APIKeyInfo:
    """
    Verify API key from header or query parameter.

    The header value takes precedence; the query parameter is used only
    when the header is absent or empty.

    Args:
        header_key: API key from X-API-Key header
        query_key: API key from api_key query parameter

    Returns:
        APIKeyInfo if valid

    Raises:
        APIKeyMissingError: If no key provided
        HTTPException: 503 if no API keys are configured at all
        APIKeyInvalidError: If key is invalid
    """
    provided_key = header_key or query_key

    if not provided_key:
        raise APIKeyMissingError()

    valid_keys = load_api_keys()

    if not valid_keys:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="No API keys configured. Set API_KEY environment variable."
        )

    # compare_digest() accepts str arguments only when they are pure ASCII
    # and raises TypeError otherwise. Comparing UTF-8 bytes keeps a
    # non-ASCII key sent by a client from surfacing as a 500 response
    # instead of a clean rejection.
    provided_bytes = provided_key.encode("utf-8")
    for valid_key, info in valid_keys.items():
        if secrets.compare_digest(provided_bytes, valid_key.encode("utf-8")):
            return info

    raise APIKeyInvalidError()
|
|
|
|
async def verify_api_key_optional(
    header_key: Optional[str] = Header(None, alias="X-API-Key"),
    query_key: Optional[str] = Query(None, alias="api_key")
) -> Optional[APIKeyInfo]:
    """
    Optionally verify API key (for public endpoints with enhanced access).

    The header value takes precedence; the query parameter is used only
    when the header is absent or empty.

    Args:
        header_key: API key from X-API-Key header
        query_key: API key from api_key query parameter

    Returns:
        APIKeyInfo if a valid key was provided; None if no key was
        provided or the provided key matches no configured key. This
        function never rejects the request itself.
    """
    provided_key = header_key or query_key

    if not provided_key:
        return None

    valid_keys = load_api_keys()

    # compare_digest() accepts str arguments only when they are pure ASCII
    # and raises TypeError otherwise. Comparing UTF-8 bytes means a
    # non-ASCII key is simply treated as "no match" rather than crashing
    # a public endpoint.
    provided_bytes = provided_key.encode("utf-8")
    for valid_key, info in valid_keys.items():
        if secrets.compare_digest(provided_bytes, valid_key.encode("utf-8")):
            return info

    return None
|
|