# analytics-engine / app/deps.py
# Author: Peter Mutwiri — commit c87681f ("imported datetime in depts")
# (Hugging Face page chrome — "raw / history / blame / 12.8 kB" — removed;
#  it was scrape residue, not part of the module source.)
# ── Standard Library ──────────────────────────────────────────────────────────
import logging
import os
import pathlib
import time
from collections import defaultdict
from datetime import datetime, timezone
from typing import Optional, TYPE_CHECKING
# ── Third-Party ────────────────────────────────────────────────────────────────
import duckdb
from fastapi import Depends, Header, HTTPException, Request
from upstash_redis import Redis
# ── Configuration Paths ────────────────────────────────────────────────────────
# Use YOUR existing pattern from app/db.py (multi-tenant)
# One DuckDB file per org lives under this directory (see get_duckdb).
DATA_DIR = pathlib.Path("./data/duckdb")
# NOTE: directory is created eagerly at import time (side effect).
DATA_DIR.mkdir(parents=True, exist_ok=True)
# Vector database for AI embeddings (shared but org-filtered)
VECTOR_DB_PATH = DATA_DIR / "vectors.duckdb"
# Module-level logger, standard one-logger-per-module convention.
logger = logging.getLogger(__name__)
# ── Secrets Management ─────────────────────────────────────────────────────────
def get_secret(name: str, required: bool = True) -> Optional[str]:
    """
    Read a secret from the process environment.

    Args:
        name: Environment variable to look up.
        required: When True, a missing or blank value raises immediately
            so misconfiguration fails fast at startup.

    Returns:
        The secret value, or None when absent and not required.

    Raises:
        ValueError: If a required secret is missing or empty.
    """
    secret = os.getenv(name)
    is_blank = secret is None or not secret.strip()
    if required and is_blank:
        raise ValueError(f"πŸ”΄ CRITICAL: Required secret '{name}' not found in HF environment")
    return secret
# API Keys (comma-separated for multiple Vercel projects).
# FIX: the original called get_secret("API_KEYS") twice with the default
# required=True, so the `else []` fallback was dead code and a missing key
# crashed at import time — contradicting verify_api_key, which is written
# to return a 500 when API_KEYS is empty. Fetch once, optionally, and
# strip whitespace around each comma-separated key.
_raw_api_keys = get_secret("API_KEYS", required=False)
API_KEYS = [k.strip() for k in _raw_api_keys.split(",") if k.strip()] if _raw_api_keys else []
# Upstash Redis Bridge (required for Vercel ↔ HF communication)
REDIS_URL = get_secret("UPSTASH_REDIS_REST_URL")
REDIS_TOKEN = get_secret("UPSTASH_REDIS_REST_TOKEN")
# Hugging Face Token (read-only, for model download)
HF_API_TOKEN = get_secret("HF_API_TOKEN", required=False)
# QStash Token (optional, for advanced queue features)
QSTASH_TOKEN = get_secret("QSTASH_TOKEN", required=False)
# Application URL (auto-injected by HF Spaces via SPACE_HOST,
# fallback for local dev)
APP_URL = os.getenv("SPACE_HOST", "http://localhost:8000").rstrip("/")
# ── Singleton Database Connections ──────────────────────────────────────────────
# Per-org connection cache: org_id -> open DuckDB connection.
_org_db_connections = {}
# Lazily-created shared vector DB connection (see get_vector_db).
_vector_db_conn = None
def get_duckdb(org_id: str):
    """
    Multi-tenant DuckDB connection, cached per org.

    Each org gets an isolated file: ./data/duckdb/{org_id}.duckdb

    Args:
        org_id: Tenant identifier, used as the database filename stem.
            NOTE(review): value is interpolated into a filesystem path —
            callers should ensure it is a validated id (get_current_user
            checks only the "org_"/"user_" prefix, not path characters).

    Returns:
        An open duckdb connection for the org (created on first use).
    """
    if org_id not in _org_db_connections:
        db_file = DATA_DIR / f"{org_id}.duckdb"
        conn = duckdb.connect(str(db_file), read_only=False)
        # Ensure schemas exist
        conn.execute("CREATE SCHEMA IF NOT EXISTS main")
        conn.execute("CREATE SCHEMA IF NOT EXISTS vector_store")
        # Enable vector search extension — best effort; core SQL works
        # without it.
        try:
            conn.execute("INSTALL vss;")
            conn.execute("LOAD vss;")
        except Exception as e:
            # FIX: use the module logger instead of print(), consistent
            # with get_vector_db and the rest of this module.
            logger.warning("⚠️ VSS extension warning (non-critical): %s", e)
        _org_db_connections[org_id] = conn
    return _org_db_connections[org_id]
def get_vector_db():
    """
    Shared vector database with VSS extension (fault-tolerant).

    Rows are org-scoped via the org_id column (single shared file,
    unlike the per-org files from get_duckdb). VSS load failures are
    tolerated: the schema is still created; only similarity search is
    unavailable until the extension loads.

    Returns:
        The singleton duckdb connection to vectors.duckdb.
    """
    global _vector_db_conn
    if _vector_db_conn is None:
        _vector_db_conn = duckdb.connect(str(VECTOR_DB_PATH), read_only=False)
        # Install VSS — best effort, logged but non-fatal.
        try:
            _vector_db_conn.execute("INSTALL vss;")
            _vector_db_conn.execute("LOAD vss;")
            logger.info("βœ… VSS extension loaded successfully")
        except Exception as e:
            logger.warning(f"⚠️ VSS extension failed to load: {e}")
            logger.warning(" Vector search will be disabled until VSS is available")
        # Create schema and table
        _vector_db_conn.execute("CREATE SCHEMA IF NOT EXISTS vector_store")
        _vector_db_conn.execute("""
            CREATE TABLE IF NOT EXISTS vector_store.embeddings (
                id VARCHAR PRIMARY KEY,
                org_id VARCHAR NOT NULL,
                content TEXT,
                embedding FLOAT[384],
                entity_type VARCHAR,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        # Secondary lookup index. NOTE: this is a plain composite index,
        # not a VSS index — the original comment claimed it needed VSS.
        # FIX: narrowed the bare `except:` (which also swallowed
        # KeyboardInterrupt/SystemExit) and log instead of silencing.
        try:
            _vector_db_conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_org_entity
                ON vector_store.embeddings (org_id, entity_type)
            """)
        except Exception as e:
            logger.warning("⚠️ idx_org_entity creation skipped: %s", e)
        logger.info("βœ… Vector DB schema initialized")
    return _vector_db_conn
# ── Redis Singleton ────────────────────────────────────────────────────────────
_redis_client = None
def get_redis():
    """
    Upstash Redis client (singleton) for the Vercel bridge.

    The client is cached only AFTER a successful PING, so a failed first
    connection is retried on the next call instead of pinning a dead
    client for the process lifetime (the original assigned the singleton
    before testing it).

    Returns:
        The shared Redis client.

    Raises:
        RuntimeError: If the initial PING fails.
    """
    global _redis_client
    if _redis_client is None:
        client = Redis(url=REDIS_URL, token=REDIS_TOKEN)
        # Test connection before caching.
        try:
            client.ping()
        except Exception as e:
            raise RuntimeError(f"πŸ”΄ Redis connection failed: {e}") from e
        # FIX: logger instead of print(), consistent with the module.
        logger.info("βœ… Redis bridge connected")
        _redis_client = client
    return _redis_client
if TYPE_CHECKING:
    # Import only for the annotation below; the runtime import is deferred
    # inside get_qstash_client so the package stays optional.
    from upstash_qstash import Client
def get_qstash_client() -> "Client":
    """
    Initialize and return singleton QStash client for Hugging Face Spaces.

    Required HF Secrets:
        - QSTASH_TOKEN: Your QStash API token
    Optional HF Secrets:
        - QSTASH_URL: Custom QStash URL (defaults to official Upstash endpoint)

    Returns:
        Configured QStash Client instance

    Raises:
        RuntimeError: If QSTASH_TOKEN is missing, the package is not
            installed, or client initialization fails.
    """
    # Singleton pattern: store the instance as a function attribute so the
    # client is constructed at most once per process.
    if not hasattr(get_qstash_client, "_client"):
        token = os.getenv("QSTASH_TOKEN")
        if not token:
            raise RuntimeError(
                "❌ QSTASH_TOKEN not found. Please add it to HF Space Secrets."
            )
        # Dynamic import to avoid requiring the package at module load time.
        try:
            from upstash_qstash import Client
        except ImportError as e:
            raise RuntimeError(
                "❌ upstash_qstash not installed. "
                "Add to requirements.txt: upstash-qstash"
            ) from e
        # Optional: use a custom QStash URL if provided.
        qstash_url = os.getenv("QSTASH_URL")
        try:
            if qstash_url:
                get_qstash_client._client = Client(token=token, url=qstash_url)
                # FIX: logger instead of print(), consistent with the module.
                logger.info("βœ… QStash client initialized with custom URL: %s", qstash_url)
            else:
                get_qstash_client._client = Client(token=token)
                logger.info("βœ… QStash client initialized")
        except Exception as e:
            raise RuntimeError(f"❌ QStash client initialization failed: {e}") from e
    return get_qstash_client._client
def get_qstash_verifier():
    """
    Initialize QStash webhook verifier for receiving callbacks (singleton).

    Used in the /api/v1/analytics/callback endpoint to verify requests.

    Required HF Secrets:
        - QSTASH_CURRENT_SIGNING_KEY
        - QSTASH_NEXT_SIGNING_KEY

    Returns:
        QStash Receiver/Verifier instance

    Raises:
        RuntimeError: If the signing keys are missing or the verifier
            fails to initialize.
    """
    # Singleton via function attribute, mirroring get_qstash_client.
    if not hasattr(get_qstash_verifier, "_verifier"):
        current_key = os.getenv("QSTASH_CURRENT_SIGNING_KEY")
        next_key = os.getenv("QSTASH_NEXT_SIGNING_KEY")
        if not current_key or not next_key:
            raise RuntimeError(
                "❌ QStash signing keys not configured. "
                "Add QSTASH_CURRENT_SIGNING_KEY and QSTASH_NEXT_SIGNING_KEY to HF secrets."
            )
        try:
            # Deferred import keeps the module importable when
            # upstash_qstash isn't installed.
            from upstash_qstash import Receiver
            get_qstash_verifier._verifier = Receiver({
                "current_signing_key": current_key,
                "next_signing_key": next_key
            })
            # FIX: logger instead of print(), consistent with the module.
            logger.info("βœ… QStash verifier initialized")
        except Exception as e:
            raise RuntimeError(f"❌ QStash verifier initialization failed: {e}") from e
    return get_qstash_verifier._verifier
# ── API Security Dependency ────────────────────────────────────────────────────
def verify_api_key(x_api_key: str = Header(..., alias="X-API-KEY")):
    """
    FastAPI dependency guarding Vercel-facing endpoints.

    Args:
        x_api_key: Value of the X-API-KEY request header.

    Returns:
        The validated API key.

    Raises:
        HTTPException: 500 when no keys are configured, 401 on a bad key.
    """
    # Misconfiguration is a server error, not a client one.
    if not API_KEYS:
        raise HTTPException(
            status_code=500,
            detail="πŸ”΄ API_KEYS not configured in HF environment"
        )
    # Accept the key only if it matches one of the configured keys.
    if x_api_key in API_KEYS:
        return x_api_key
    raise HTTPException(
        status_code=401,
        detail="❌ Invalid API key"
    )
# ── New User Auth Dependency ──────────────────────────────────────────────────
def get_current_user(request: Request, api_key: str = Depends(verify_api_key)):
    """
    Extract org_id from query parameters (auth itself happens on Vercel).

    Stack Auth on Vercel already validated the user, so we trust the
    orgId passed in the query string. Use this in analytics endpoints
    that need org context.

    Args:
        request: Incoming request; reads `org_id` or `orgId` query params.
        api_key: Supplied by the verify_api_key dependency.

    Returns:
        Dict with org_id, api_key, an ISO-8601 UTC timestamp, and source.

    Raises:
        HTTPException: 401 when org_id is absent, 400 on bad format.
    """
    org_id = request.query_params.get("org_id") or request.query_params.get("orgId")
    if not org_id:
        raise HTTPException(
            status_code=401,
            detail="❌ org_id missing from query parameters. Vercel stack auth missing?"
        )
    # Cheap prefix allow-list (simple security check); Stack Auth ids
    # look like "org_..." or "user_...".
    if not org_id.startswith(("org_", "user_")):
        raise HTTPException(
            status_code=400,
            detail=f"❌ Invalid org_id format: {org_id}"
        )
    return {
        "org_id": org_id,
        "api_key": api_key,
        # FIX: timezone-aware UTC; datetime.utcnow() is naive and
        # deprecated since Python 3.12.
        "authenticated_at": datetime.now(timezone.utc).isoformat(),
        "source": "vercel_stack_auth",
    }
# ── Rate Limiting (Optional but Recommended) ──────────────────────────────────
# In-memory, per-org fixed-window counters: org_id -> {count, reset_at}.
# NOTE(review): process-local — resets on restart and is not shared across
# workers; fine for a single HF Space container.
_rate_limits = defaultdict(lambda: {"count": 0, "reset_at": 0})
def rate_limit_org(max_requests: int = 100, window_seconds: int = 60):
    """
    Build a FastAPI dependency that rate-limits per organization.

    Prevents one org from DOSing the analytics engine.

    Args:
        max_requests: Allowed requests per window.
        window_seconds: Window length in seconds.

    Returns:
        A dependency callable that returns the org_id or raises 429.
    """
    def dependency(user: dict = Depends(get_current_user)):
        # FIX: the original used Depends(lambda r: get_current_user(r)["org_id"]).
        # FastAPI cannot resolve an unannotated lambda parameter — `r` would be
        # treated as a query parameter and verify_api_key would never run.
        # Depending on get_current_user directly lets FastAPI wire up the
        # Request and API-key validation properly.
        org_id = user["org_id"]
        now = time.time()
        limit_data = _rate_limits[org_id]
        # Start a fresh window once the previous one has expired.
        if now > limit_data["reset_at"]:
            limit_data["count"] = 0
            limit_data["reset_at"] = now + window_seconds
        # Reject once the window's budget is spent.
        if limit_data["count"] >= max_requests:
            raise HTTPException(
                status_code=429,
                # FIX: message reflects the configured window instead of
                # hardcoding "req/min".
                detail=f"⏸️ Rate limit exceeded for {org_id}: {max_requests} req/{window_seconds}s"
            )
        limit_data["count"] += 1
        return org_id
    return dependency
# ── Health Check Utilities ─────────────────────────────────────────────────────
def check_all_services():
    """
    Comprehensive health check for the /health endpoint.

    Probes each backing service in turn and never raises: failures are
    captured in the returned mapping.

    Returns:
        Dict mapping service name -> "βœ… connected" or "❌ <error>".
    """
    # Table of probes; each callable raises on failure.
    probes = {
        "duckdb": lambda: get_duckdb("health_check").execute("SELECT 1"),
        "vector_db": lambda: get_vector_db().execute("SELECT 1"),
        "redis": lambda: get_redis().ping(),
    }
    statuses = {}
    for service, probe in probes.items():
        try:
            probe()
            statuses[service] = "βœ… connected"
        except Exception as e:
            statuses[service] = f"❌ {e}"
    return statuses