unesco-data-ai's picture
Deploy v2.0.0 [api-dev]
5ca88f7 verified
"""
API Authentication
Provides API key authentication with environment-based configuration.
"""
import os
import secrets
from typing import Optional, Dict
from pydantic import BaseModel
from fastapi import HTTPException, status, Header, Query, Depends
from fastapi.security import APIKeyHeader
# API Key header scheme
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
class APIKeyInfo(BaseModel):
"""API key information."""
key: str
name: str
permissions: list = []
# Cache for API keys (loaded from environment)
_valid_api_keys: Dict[str, APIKeyInfo] = {}
def load_api_keys() -> Dict[str, APIKeyInfo]:
"""
Load API keys from environment variables.
Format:
API_KEY=your-api-key
API_KEY_NAME=Production
API_KEY_PERMISSIONS=read,write
Multiple keys can be set with suffixes:
API_KEY_1=...
API_KEY_2=...
"""
global _valid_api_keys
if _valid_api_keys:
return _valid_api_keys
keys = {}
# Load primary API key
primary_key = os.getenv("API_KEY")
if primary_key:
keys[primary_key] = APIKeyInfo(
key=primary_key,
name=os.getenv("API_KEY_NAME", "Default"),
permissions=os.getenv("API_KEY_PERMISSIONS", "read,write").split(",")
)
# Load additional API keys (API_KEY_1, API_KEY_2, etc.)
for i in range(1, 10):
key = os.getenv(f"API_KEY_{i}")
if key:
keys[key] = APIKeyInfo(
key=key,
name=os.getenv(f"API_KEY_{i}_NAME", f"Key {i}"),
permissions=os.getenv(f"API_KEY_{i}_PERMISSIONS", "read,write").split(",")
)
# Fallback to demo key in development (not production)
if not keys and os.getenv("UNESDOC_PIPELINE_ENV") != "production":
demo_key = "demo-key-12345"
keys[demo_key] = APIKeyInfo(
key=demo_key,
name="Demo User",
permissions=["read", "write"]
)
_valid_api_keys = keys
return keys
def setup_demo_key():
"""Initialize API keys on startup."""
load_api_keys()
class APIKeyMissingError(HTTPException):
"""Error when API key is missing."""
def __init__(self):
super().__init__(
status_code=status.HTTP_403_FORBIDDEN,
detail="Missing API key. Include 'X-API-Key' header."
)
class APIKeyInvalidError(HTTPException):
"""Error when API key is invalid."""
def __init__(self):
super().__init__(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API key."
)
async def verify_api_key(
header_key: Optional[str] = Header(None, alias="X-API-Key"),
query_key: Optional[str] = Query(None, alias="api_key")
) -> APIKeyInfo:
"""
Verify API key from header or query parameter.
Args:
header_key: API key from X-API-Key header
query_key: API key from api_key query parameter
Returns:
APIKeyInfo if valid
Raises:
APIKeyMissingError: If no key provided
APIKeyInvalidError: If key is invalid
"""
provided_key = header_key or query_key
if not provided_key:
raise APIKeyMissingError()
# Load keys from environment
valid_keys = load_api_keys()
if not valid_keys:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="No API keys configured. Set API_KEY environment variable."
)
# Use timing-safe comparison
for valid_key, info in valid_keys.items():
if secrets.compare_digest(provided_key, valid_key):
return info
raise APIKeyInvalidError()
async def verify_api_key_optional(
header_key: Optional[str] = Header(None, alias="X-API-Key"),
query_key: Optional[str] = Query(None, alias="api_key")
) -> Optional[APIKeyInfo]:
"""
Optionally verify API key (for public endpoints with enhanced access).
Returns None if no key provided, APIKeyInfo if valid key.
"""
provided_key = header_key or query_key
if not provided_key:
return None
valid_keys = load_api_keys()
for valid_key, info in valid_keys.items():
if secrets.compare_digest(provided_key, valid_key):
return info
return None