File size: 4,404 Bytes
"""
API Authentication
Provides API key authentication with environment-based configuration.
"""
import os
import secrets
from typing import Optional, Dict
from pydantic import BaseModel
from fastapi import HTTPException, status, Header, Query, Depends
from fastapi.security import APIKeyHeader
# API key header scheme: declares the "X-API-Key" request header as a FastAPI
# security scheme object.
# NOTE(review): auto_error=False presumably makes the scheme yield None rather
# than raise when the header is absent -- confirm against the FastAPI docs.
# NOTE(review): nothing in the visible part of this file references
# api_key_header; the verify_* dependencies read the header via Header(...).
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
class APIKeyInfo(BaseModel):
    """Metadata describing one configured API key.

    Instances are built by load_api_keys() from environment variables (or from
    its built-in demo fallback) and are returned by the verify_* dependencies.
    """
    # The key string itself, exactly as read from the environment.
    key: str
    # Human-readable label for the key (e.g. "Default", "Key 1", "Demo User").
    name: str
    # Permission names attached to the key; load_api_keys() derives them from a
    # comma-separated string such as "read,write".
    # NOTE(review): a literal [] default is normally a shared-mutable hazard;
    # pydantic is expected to copy field defaults per instance -- confirm for
    # the pinned pydantic version.
    permissions: list = []
# Module-level cache of the API keys loaded from the environment, mapping each
# key string to its APIKeyInfo. Starts empty and is filled in (and rebound) by
# load_api_keys().
_valid_api_keys: Dict[str, APIKeyInfo] = {}
def _parse_permissions(raw: str) -> list:
    """Split a comma-separated permission string into trimmed, non-empty names."""
    # "read, write" must yield ["read", "write"], and "" must yield [] rather
    # than [""], so permission checks never see padded or empty names.
    return [name for name in (part.strip() for part in raw.split(",")) if name]


def _api_key_from_env(var_name: str, default_name: str) -> Optional[APIKeyInfo]:
    """Build the APIKeyInfo for env var ``var_name``; None if unset or empty.

    The label and permissions are read from ``<var_name>_NAME`` and
    ``<var_name>_PERMISSIONS`` (default "read,write").
    """
    key = os.getenv(var_name)
    if not key:
        return None
    return APIKeyInfo(
        key=key,
        name=os.getenv(f"{var_name}_NAME", default_name),
        permissions=_parse_permissions(
            os.getenv(f"{var_name}_PERMISSIONS", "read,write")
        ),
    )


def load_api_keys() -> Dict[str, APIKeyInfo]:
    """
    Load API keys from environment variables.

    Format:
        API_KEY=your-api-key
        API_KEY_NAME=Production
        API_KEY_PERMISSIONS=read,write

    Additional keys can be set with numeric suffixes 1 through 9, each with
    its own optional _NAME / _PERMISSIONS companions:
        API_KEY_1=...
        API_KEY_1_NAME=...
        API_KEY_1_PERMISSIONS=read
        ...
        API_KEY_9=...

    Permission lists are comma-separated; surrounding whitespace is trimmed
    and empty entries are dropped.

    If no key is configured and UNESDOC_PIPELINE_ENV is not exactly
    "production", a fixed demo key is registered instead.

    Returns:
        Mapping of key string to APIKeyInfo. A non-empty result is cached in
        the module-level ``_valid_api_keys`` and returned as-is on later
        calls; an empty result is not cached, so the environment is read
        again on the next call.
    """
    global _valid_api_keys
    if _valid_api_keys:
        return _valid_api_keys

    keys: Dict[str, APIKeyInfo] = {}

    # Primary key first, then API_KEY_1 .. API_KEY_9. If two variables hold the
    # same key string, the later one replaces the earlier entry.
    candidates = [("API_KEY", "Default")]
    candidates += [(f"API_KEY_{i}", f"Key {i}") for i in range(1, 10)]
    for var_name, default_name in candidates:
        info = _api_key_from_env(var_name, default_name)
        if info is not None:
            keys[info.key] = info

    # Fallback to demo key in development (not production).
    # NOTE(review): any deployment where UNESDOC_PIPELINE_ENV is unset or
    # misspelled accepts this hard-coded, publicly visible key with write
    # permission. Consider requiring an explicit opt-in instead.
    if not keys and os.getenv("UNESDOC_PIPELINE_ENV") != "production":
        demo_key = "demo-key-12345"
        keys[demo_key] = APIKeyInfo(
            key=demo_key,
            name="Demo User",
            permissions=["read", "write"],
        )

    _valid_api_keys = keys
    return keys
def setup_demo_key() -> None:
    """Initialize API keys on startup.

    Thin wrapper that calls load_api_keys() and discards its return value.
    Despite the name, it loads whatever load_api_keys() resolves from the
    environment, not only the demo key.
    """
    load_api_keys()
class APIKeyMissingError(HTTPException):
    """HTTP 403 error raised when a request carries no API key."""

    def __init__(self):
        # Fixed response fields: this error takes no per-request details.
        error_fields = {
            "status_code": status.HTTP_403_FORBIDDEN,
            "detail": "Missing API key. Include 'X-API-Key' header.",
        }
        super().__init__(**error_fields)
class APIKeyInvalidError(HTTPException):
    """HTTP 401 error raised when the supplied API key matches no known key."""

    def __init__(self):
        # The message is deliberately generic and never echoes the rejected key.
        message = "Invalid API key."
        code = status.HTTP_401_UNAUTHORIZED
        super().__init__(status_code=code, detail=message)
async def verify_api_key(
    header_key: Optional[str] = Header(None, alias="X-API-Key"),
    query_key: Optional[str] = Query(None, alias="api_key")
) -> APIKeyInfo:
    """
    Verify API key from header or query parameter.

    The header value wins when both are supplied; an empty header falls back
    to the query parameter.

    Args:
        header_key: API key from X-API-Key header
        query_key: API key from api_key query parameter

    Returns:
        APIKeyInfo of the matching configured key

    Raises:
        APIKeyMissingError: If no key provided (HTTP 403)
        HTTPException: HTTP 503 if no API keys are configured at all
        APIKeyInvalidError: If key matches no configured key (HTTP 401)
    """
    provided_key = header_key or query_key
    if not provided_key:
        raise APIKeyMissingError()

    # Load keys from environment (cached after the first non-empty load).
    valid_keys = load_api_keys()
    if not valid_keys:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="No API keys configured. Set API_KEY environment variable."
        )

    # Timing-safe comparison on bytes, not str: compare_digest raises
    # TypeError for str arguments containing non-ASCII characters, which would
    # let a client turn a bad key into an unhandled 500 instead of a 401.
    # "surrogatepass" keeps encode() total even for surrogate-escaped
    # environment values.
    provided_bytes = provided_key.encode("utf-8", "surrogatepass")
    for valid_key, info in valid_keys.items():
        candidate = valid_key.encode("utf-8", "surrogatepass")
        if secrets.compare_digest(provided_bytes, candidate):
            return info

    raise APIKeyInvalidError()
async def verify_api_key_optional(
    header_key: Optional[str] = Header(None, alias="X-API-Key"),
    query_key: Optional[str] = Query(None, alias="api_key")
) -> Optional[APIKeyInfo]:
    """
    Optionally verify API key (for public endpoints with enhanced access).

    The header value wins when both are supplied; an empty header falls back
    to the query parameter. This dependency never raises for a missing or
    unknown key.

    Args:
        header_key: API key from X-API-Key header
        query_key: API key from api_key query parameter

    Returns:
        APIKeyInfo if the provided key matches a configured key; None if no
        key was provided or the provided key matches nothing.
    """
    provided_key = header_key or query_key
    if not provided_key:
        return None

    valid_keys = load_api_keys()

    # Timing-safe comparison on bytes, not str: compare_digest raises
    # TypeError for str arguments containing non-ASCII characters, which would
    # make this "never fails" dependency crash with a 500 on such input.
    # "surrogatepass" keeps encode() total even for surrogate-escaped
    # environment values.
    provided_bytes = provided_key.encode("utf-8", "surrogatepass")
    for valid_key, info in valid_keys.items():
        candidate = valid_key.encode("utf-8", "surrogatepass")
        if secrets.compare_digest(provided_bytes, candidate):
            return info

    return None
|