RemanenetSpy
Rebrand to Kaal — rename from Chronos OS, add Logo component with serif seal design, swap all Hexagon icons for Logo
4303a4c
Raw
History Blame Contribute Delete
3.59 kB
"""
KAAL — API Key Authentication
=====================================
Middleware to validate X-API-Key headers against the SQLite api_keys table.
Supports tier-based rate limiting and usage tracking.
"""
from __future__ import annotations
import hashlib
import secrets
import logging
from typing import Optional
from fastapi import HTTPException, Security, Request
from fastapi.security import APIKeyHeader, HTTPBearer, HTTPAuthorizationCredentials
from chronos_core.models import TierName, TIER_LIMITS
from .deps import get_memory_store
logger = logging.getLogger("chronos.auth")
# ---------------------------------------------------------------------------
# API Key header scheme
# ---------------------------------------------------------------------------
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
bearer_scheme = HTTPBearer(auto_error=False)
def hash_api_key(key: str) -> str:
"""Hash an API key for storage (SHA-256)."""
return hashlib.sha256(key.encode()).hexdigest()
def generate_api_key() -> str:
"""Generate a new secure API key."""
return f"chrn_{secrets.token_urlsafe(32)}"
async def verify_api_key(
request: Request,
api_key_header: Optional[str] = Security(api_key_header),
bearer_token: Optional[HTTPAuthorizationCredentials] = Security(bearer_scheme),
) -> dict:
"""
FastAPI dependency: validate the API key from either X-API-Key or Bearer token.
Raises 401 if missing, 403 if invalid.
"""
api_key = api_key_header
if not api_key and bearer_token:
api_key = bearer_token.credentials
if not api_key:
raise HTTPException(
status_code=401,
detail="Missing API key. Include X-API-Key header or Authorization: Bearer token.",
)
key_hash = hash_api_key(api_key)
store = get_memory_store()
key_info = await store.validate_api_key(key_hash)
if not key_info:
raise HTTPException(
status_code=403,
detail="Invalid API key.",
)
return key_info
async def check_event_quota(source_id: str, event_count: int = 1) -> None:
"""
Check if the source has enough event quota remaining.
Raises 429 if quota exceeded on Explorer tier (hard cap).
"""
store = get_memory_store()
usage = await store.get_usage(source_id)
if not usage:
return # No usage record = no limits enforced yet
tier = usage.tier
limits = TIER_LIMITS[tier]
if tier == TierName.EXPLORER and usage.events_used + event_count > limits.events_per_month:
raise HTTPException(
status_code=429,
detail=(
f"Event quota exceeded ({usage.events_used}/{limits.events_per_month}). "
f"Upgrade to Builder ($49/mo) for 500k events/month."
),
)
async def check_orchestration_quota(source_id: str) -> None:
"""
Check if the source has enough orchestration quota remaining.
Raises 429 if quota exceeded on Explorer tier (hard cap).
"""
store = get_memory_store()
usage = await store.get_usage(source_id)
if not usage:
return
tier = usage.tier
limits = TIER_LIMITS[tier]
if tier == TierName.EXPLORER and usage.orchestration_used >= limits.orchestration_per_month:
raise HTTPException(
status_code=429,
detail=(
f"Orchestration quota exceeded ({usage.orchestration_used}/{limits.orchestration_per_month}). "
f"Upgrade to Builder ($49/mo) for 10k calls/month."
),
)