File size: 4,404 Bytes
5ca88f7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
"""
API Authentication

Provides API key authentication with environment-based configuration.
"""

import os
import secrets
from typing import Optional, Dict
from pydantic import BaseModel
from fastapi import HTTPException, status, Header, Query, Depends
from fastapi.security import APIKeyHeader

# API key header scheme for the "X-API-Key" header.
# NOTE(review): auto_error=False presumably makes the scheme yield None instead
# of raising when the header is absent — confirm against the FastAPI docs.
# The verification functions visible in this file read the header directly via
# Header(...) and do not reference this object.
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)


class APIKeyInfo(BaseModel):
    """Metadata describing one configured API key.

    Instances are created by ``load_api_keys`` and returned to callers by the
    verification functions once a provided key matches.
    """
    # The raw API key string; also used as the dictionary key in the key cache.
    key: str
    # Human-readable label for the key (e.g. "Default", "Demo User").
    name: str
    # Permission names granted to this key (e.g. "read", "write").
    # NOTE(review): the mutable default presumably relies on pydantic copying
    # field defaults per instance — confirm for the pydantic version in use.
    permissions: list = []


# Module-level cache mapping each raw API key string to its APIKeyInfo.
# Filled by load_api_keys() on its first successful load and reused on later
# calls for as long as it is non-empty.
_valid_api_keys: Dict[str, APIKeyInfo] = {}


def _parse_permissions(raw: str) -> list:
    """Split a comma-separated permission string into clean names.

    Surrounding whitespace is stripped and empty entries are dropped, so
    ``"read, write"`` yields ``["read", "write"]`` and ``""`` yields ``[]``.
    """
    return [name for name in (part.strip() for part in raw.split(",")) if name]


def _read_key_from_env(var_name: str, default_name: str) -> Optional[APIKeyInfo]:
    """Build an APIKeyInfo from ``var_name`` and its ``_NAME``/``_PERMISSIONS`` companions.

    Returns None when ``var_name`` is unset or empty.
    """
    key = os.getenv(var_name)
    if not key:
        return None
    return APIKeyInfo(
        key=key,
        name=os.getenv(f"{var_name}_NAME", default_name),
        permissions=_parse_permissions(
            os.getenv(f"{var_name}_PERMISSIONS", "read,write")
        ),
    )


def load_api_keys() -> Dict[str, APIKeyInfo]:
    """
    Load API keys from environment variables.

    Format:
        API_KEY=your-api-key
        API_KEY_NAME=Production
        API_KEY_PERMISSIONS=read,write

    Multiple keys can be set with numeric suffixes 1 through 9, each with
    its own optional name and permissions:
        API_KEY_1=...
        API_KEY_1_NAME=...
        API_KEY_1_PERMISSIONS=read
        API_KEY_2=...

    Permissions default to "read,write"; entries are whitespace-stripped and
    empty entries are ignored.

    If no key is configured and UNESDOC_PIPELINE_ENV is not "production",
    a built-in demo key is registered instead.

    Returns:
        Mapping of raw key string to its APIKeyInfo. The result is cached at
        module level and returned as-is on later calls while it is non-empty.
    """
    global _valid_api_keys

    # Reuse the cached mapping; an empty cache triggers a fresh load.
    if _valid_api_keys:
        return _valid_api_keys

    keys: Dict[str, APIKeyInfo] = {}

    # Primary key first, then API_KEY_1 .. API_KEY_9. A later variable holding
    # the same key string overrides the earlier entry.
    sources = [("API_KEY", "Default")]
    sources.extend((f"API_KEY_{i}", f"Key {i}") for i in range(1, 10))
    for var_name, default_name in sources:
        info = _read_key_from_env(var_name, default_name)
        if info is not None:
            keys[info.key] = info

    # Fallback to demo key in development (not production)
    if not keys and os.getenv("UNESDOC_PIPELINE_ENV") != "production":
        demo_key = "demo-key-12345"
        keys[demo_key] = APIKeyInfo(
            key=demo_key,
            name="Demo User",
            permissions=["read", "write"]
        )

    _valid_api_keys = keys
    return keys


def setup_demo_key() -> None:
    """Initialize API keys on startup.

    Despite the name, this simply calls ``load_api_keys``, which loads every
    key configured in the environment and only registers the demo key when
    none are set and ``UNESDOC_PIPELINE_ENV`` is not ``"production"``.
    """
    load_api_keys()


class APIKeyMissingError(HTTPException):
    """HTTP 403 error raised when a request carries no API key."""

    def __init__(self):
        message = "Missing API key. Include 'X-API-Key' header."
        super().__init__(status_code=status.HTTP_403_FORBIDDEN, detail=message)


class APIKeyInvalidError(HTTPException):
    """HTTP 401 error raised when the supplied API key matches no known key."""

    def __init__(self):
        message = "Invalid API key."
        super().__init__(status_code=status.HTTP_401_UNAUTHORIZED, detail=message)


async def verify_api_key(
    header_key: Optional[str] = Header(None, alias="X-API-Key"),
    query_key: Optional[str] = Query(None, alias="api_key")
) -> APIKeyInfo:
    """
    Verify API key from header or query parameter.

    The header value takes precedence; the query parameter is used only when
    the header is absent or empty.

    Args:
        header_key: API key from X-API-Key header
        query_key: API key from api_key query parameter

    Returns:
        APIKeyInfo if valid

    Raises:
        APIKeyMissingError: If no key provided
        HTTPException: 503 if no API keys are configured at all
        APIKeyInvalidError: If key is invalid
    """
    provided_key = header_key or query_key

    if not provided_key:
        raise APIKeyMissingError()

    # Load keys from environment
    valid_keys = load_api_keys()

    if not valid_keys:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="No API keys configured. Set API_KEY environment variable."
        )

    # Use timing-safe comparison. Compare as bytes: compare_digest raises
    # TypeError for str arguments containing non-ASCII characters, which
    # would turn a malformed client key into an unhandled server error
    # instead of a clean "invalid key" response.
    provided_bytes = provided_key.encode("utf-8")
    for valid_key, info in valid_keys.items():
        if secrets.compare_digest(provided_bytes, valid_key.encode("utf-8")):
            return info

    raise APIKeyInvalidError()


async def verify_api_key_optional(
    header_key: Optional[str] = Header(None, alias="X-API-Key"),
    query_key: Optional[str] = Query(None, alias="api_key")
) -> Optional[APIKeyInfo]:
    """
    Optionally verify API key (for public endpoints with enhanced access).

    The header value takes precedence; the query parameter is used only when
    the header is absent or empty.

    Args:
        header_key: API key from X-API-Key header
        query_key: API key from api_key query parameter

    Returns:
        APIKeyInfo if a valid key was provided; None if no key was provided
        or the provided key matches no configured key. Never raises for a
        bad key.
    """
    provided_key = header_key or query_key

    if not provided_key:
        return None

    valid_keys = load_api_keys()

    # Timing-safe comparison on bytes: compare_digest raises TypeError for
    # str arguments containing non-ASCII characters, which would crash this
    # "optional" check on a malformed client key instead of returning None.
    provided_bytes = provided_key.encode("utf-8")
    for valid_key, info in valid_keys.items():
        if secrets.compare_digest(provided_bytes, valid_key.encode("utf-8")):
            return info

    return None