| | from fastapi import HTTPException, Header, Depends |
| | from fastapi.security import APIKeyHeader |
| | from typing import Optional |
| | from config import API_KEY, HUGGINGFACE_API_KEY, HUGGINGFACE |
| | import os |
| | import json |
| | import base64 |
| |
|
| | |
| | def validate_api_key(api_key_to_validate: str) -> bool: |
| | """ |
| | Validate the provided API key against the configured key. |
| | """ |
| | if not API_KEY: |
| | |
| | |
| | return False |
| | return api_key_to_validate == API_KEY |
| |
|
| | |
| | api_key_header = APIKeyHeader(name="Authorization", auto_error=False) |
| |
|
| | |
| | async def get_api_key( |
| | authorization: Optional[str] = Header(None), |
| | x_ip_token: Optional[str] = Header(None, alias="x-ip-token") |
| | ): |
| | |
| | if HUGGINGFACE: |
| | if x_ip_token is None: |
| | raise HTTPException( |
| | status_code=401, |
| | detail="Missing x-ip-token header. This header is required for Hugging Face authentication." |
| | ) |
| |
|
| | try: |
| | |
| | parts = x_ip_token.split('.') |
| | if len(parts) < 2: |
| | raise ValueError("Invalid JWT format: Not enough parts to extract payload.") |
| | payload_encoded = parts[1] |
| | |
| | payload_encoded += '=' * (-len(payload_encoded) % 4) |
| | decoded_payload_bytes = base64.urlsafe_b64decode(payload_encoded) |
| | payload = json.loads(decoded_payload_bytes.decode('utf-8')) |
| | except ValueError as ve: |
| | |
| | print(f"ValueError processing x-ip-token: {ve}") |
| | raise HTTPException(status_code=400, detail=f"Invalid JWT format in x-ip-token: {str(ve)}") |
| | except (json.JSONDecodeError, base64.binascii.Error, UnicodeDecodeError) as e: |
| | print(f"Error decoding/parsing x-ip-token payload: {e}") |
| | raise HTTPException(status_code=400, detail=f"Malformed x-ip-token payload: {str(e)}") |
| | except Exception as e: |
| | print(f"Unexpected error processing x-ip-token: {e}") |
| | raise HTTPException(status_code=500, detail="Internal error processing x-ip-token.") |
| |
|
| | error_in_token = payload.get("error") |
| |
|
| | if error_in_token == "InvalidAccessToken": |
| | raise HTTPException( |
| | status_code=403, |
| | detail="Access denied: x-ip-token indicates 'InvalidAccessToken'." |
| | ) |
| | elif error_in_token is None: |
| | |
| | print(f"HuggingFace authentication successful via x-ip-token (error field was null).") |
| | return HUGGINGFACE_API_KEY |
| | else: |
| | |
| | raise HTTPException( |
| | status_code=403, |
| | detail=f"Access denied: x-ip-token indicates an unhandled error: '{error_in_token}'." |
| | ) |
| | else: |
| | |
| | if authorization is None: |
| | detail_message = "Missing API key. Please include 'Authorization: Bearer YOUR_API_KEY' header." |
| | |
| | if os.getenv("HUGGINGFACE") is not None: |
| | detail_message += " (Note: HUGGINGFACE mode with x-ip-token is not currently active)." |
| | raise HTTPException( |
| | status_code=401, |
| | detail=detail_message |
| | ) |
| | |
| | |
| | if not authorization.startswith("Bearer "): |
| | raise HTTPException( |
| | status_code=401, |
| | detail="Invalid API key format. Use 'Authorization: Bearer YOUR_API_KEY'" |
| | ) |
| | |
| | |
| | api_key = authorization.replace("Bearer ", "") |
| | |
| | |
| | if not validate_api_key(api_key): |
| | raise HTTPException( |
| | status_code=401, |
| | detail="Invalid API key" |
| | ) |
| | |
| | return api_key |