| |
import logging
import os
import pathlib
import time
from collections import defaultdict
from datetime import datetime, timezone
from typing import Optional, TYPE_CHECKING

import duckdb
from fastapi import Depends, Header, HTTPException, Request
from upstash_redis import Redis
|
|
|
|
| |
| |
# Root directory for all local DuckDB files; created eagerly so connection
# calls never fail on a missing directory.
DATA_DIR = pathlib.Path("./data/duckdb")
DATA_DIR.mkdir(parents=True, exist_ok=True)

# Single shared vector-store database file (rows are scoped per org via the
# org_id column of vector_store.embeddings).
VECTOR_DB_PATH = DATA_DIR / "vectors.duckdb"
logger = logging.getLogger(__name__)
| |
def get_secret(name: str, required: bool = True) -> Optional[str]:
    """
    Centralized secret retrieval with validation.

    Fails fast on missing required secrets so misconfiguration surfaces at
    import time rather than mid-request.

    Args:
        name: Environment variable holding the secret.
        required: When True, a missing or blank value raises.

    Returns:
        The raw value, or None when the secret is absent and not required.

    Raises:
        ValueError: If the secret is required but missing or blank.
    """
    value = os.getenv(name)
    # Treat whitespace-only values as missing — they are never valid secrets.
    if required and (not value or value.strip() == ""):
        raise ValueError(f"🔴 CRITICAL: Required secret '{name}' not found in HF environment")
    return value
|
|
| |
# Comma-separated API keys shared with the Vercel frontend.  Optional at
# import time: the original `get_secret("API_KEYS").split(",") if
# get_secret("API_KEYS") else []` called the getter twice and its else-branch
# was dead (required=True raised first), even though verify_api_key()
# explicitly handles an empty key list.  Fetch once, as optional.
_raw_api_keys = get_secret("API_KEYS", required=False)
API_KEYS = _raw_api_keys.split(",") if _raw_api_keys else []

# Upstash Redis bridge credentials — required; import fails fast without them.
REDIS_URL = get_secret("UPSTASH_REDIS_REST_URL")
REDIS_TOKEN = get_secret("UPSTASH_REDIS_REST_TOKEN")

# Optional integrations.
HF_API_TOKEN = get_secret("HF_API_TOKEN", required=False)
QSTASH_TOKEN = get_secret("QSTASH_TOKEN", required=False)

# Public base URL of this service (HF Space host, or local dev fallback).
APP_URL = os.getenv("SPACE_HOST", "http://localhost:8000").rstrip("/")
|
|
| |
# Cache of per-org DuckDB connections: org_id -> open connection.
# Populated lazily by get_duckdb(); lives for the process lifetime.
_org_db_connections = {}
# Lazily-created singleton connection to the shared vector DB (get_vector_db()).
_vector_db_conn = None
|
|
def get_duckdb(org_id: str):
    """
    Multi-tenant DuckDB connection (cached per org).

    Each org gets an isolated database file: ./data/duckdb/{org_id}.duckdb.

    NOTE(review): org_id is interpolated into a filesystem path — callers must
    pass validated ids (get_current_user enforces the org_/user_ prefix).

    Args:
        org_id: Tenant identifier; doubles as the database file stem.

    Returns:
        An open duckdb connection for this org.
    """
    if org_id not in _org_db_connections:
        db_file = DATA_DIR / f"{org_id}.duckdb"
        conn = duckdb.connect(str(db_file), read_only=False)

        conn.execute("CREATE SCHEMA IF NOT EXISTS main")
        conn.execute("CREATE SCHEMA IF NOT EXISTS vector_store")

        # VSS (vector similarity search) is optional; the connection is still
        # usable without it.  Use the module logger, not print().
        try:
            conn.execute("INSTALL vss;")
            conn.execute("LOAD vss;")
        except Exception as e:
            logger.warning("⚠️ VSS extension warning (non-critical): %s", e)

        _org_db_connections[org_id] = conn

    return _org_db_connections[org_id]
|
|
| |
def get_vector_db():
    """
    Shared vector database with VSS extension (fault-tolerant singleton).

    On first call: connects to vectors.duckdb, tries to load the VSS
    extension (non-fatal on failure — vector search is simply disabled),
    and ensures the vector_store.embeddings table exists.

    Returns:
        The open duckdb connection to the shared vector database.
    """
    global _vector_db_conn
    if _vector_db_conn is None:
        _vector_db_conn = duckdb.connect(str(VECTOR_DB_PATH), read_only=False)

        # VSS is optional: log and continue without vector search on failure.
        try:
            _vector_db_conn.execute("INSTALL vss;")
            _vector_db_conn.execute("LOAD vss;")
            logger.info("✅ VSS extension loaded successfully")
        except Exception as e:
            logger.warning("⚠️ VSS extension failed to load: %s", e)
            logger.warning("   Vector search will be disabled until VSS is available")

        _vector_db_conn.execute("CREATE SCHEMA IF NOT EXISTS vector_store")
        _vector_db_conn.execute("""
            CREATE TABLE IF NOT EXISTS vector_store.embeddings (
                id VARCHAR PRIMARY KEY,
                org_id VARCHAR NOT NULL,
                content TEXT,
                embedding FLOAT[384],
                entity_type VARCHAR,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # Best-effort secondary index; narrow the original bare `except:` so
        # unexpected errors are at least visible in debug logs.
        try:
            _vector_db_conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_org_entity
                ON vector_store.embeddings (org_id, entity_type)
            """)
        except Exception as e:
            logger.debug("Index idx_org_entity not created: %s", e)

        logger.info("✅ Vector DB schema initialized")

    return _vector_db_conn
|
|
| |
| _redis_client = None |
|
|
def get_redis():
    """
    Upstash Redis client (singleton) for the Vercel bridge.

    The client is ping-verified before being cached.  The original assigned
    the global first, so a failed ping left a dead client cached forever;
    now a failure leaves the singleton unset and a later call can retry.

    Returns:
        The verified Redis client.

    Raises:
        RuntimeError: If the initial connectivity check fails.
    """
    global _redis_client
    if _redis_client is None:
        client = Redis(url=REDIS_URL, token=REDIS_TOKEN)
        try:
            client.ping()
        except Exception as e:
            raise RuntimeError(f"🔴 Redis connection failed: {e}") from e
        logger.info("✅ Redis bridge connected")
        _redis_client = client

    return _redis_client
|
|
|
|
|
|
if TYPE_CHECKING:
    # Only needed for the return annotation; the runtime import is lazy.
    from upstash_qstash import Client


def get_qstash_client() -> "Client":
    """
    Initialize and return singleton QStash client for Hugging Face Spaces.

    Required HF Secrets:
        - QSTASH_TOKEN: Your QStash API token

    Optional HF Secrets:
        - QSTASH_URL: Custom QStash URL (defaults to official Upstash endpoint)

    Returns:
        Configured QStash Client instance

    Raises:
        RuntimeError: If QSTASH_TOKEN is missing, the package is not
            installed, or client initialization fails
    """
    # Singleton stored as a function attribute so the client is built once.
    if not hasattr(get_qstash_client, "_client"):
        token = os.getenv("QSTASH_TOKEN")
        if not token:
            raise RuntimeError(
                "❌ QSTASH_TOKEN not found. Please add it to HF Space Secrets."
            )

        # Imported lazily so the app can boot without the package installed.
        try:
            from upstash_qstash import Client
        except ImportError as e:
            raise RuntimeError(
                "❌ upstash_qstash not installed. "
                "Add to requirements.txt: upstash-qstash"
            ) from e

        qstash_url = os.getenv("QSTASH_URL")

        try:
            if qstash_url:
                get_qstash_client._client = Client(token=token, url=qstash_url)
                logger.info("✅ QStash client initialized with custom URL: %s", qstash_url)
            else:
                get_qstash_client._client = Client(token=token)
                logger.info("✅ QStash client initialized")
        except Exception as e:
            raise RuntimeError(f"❌ QStash client initialization failed: {e}") from e

    return get_qstash_client._client
|
|
|
|
def get_qstash_verifier():
    """
    Initialize QStash webhook verifier for receiving callbacks.

    Used in the /api/v1/analytics/callback endpoint to verify requests.

    Required HF Secrets:
        - QSTASH_CURRENT_SIGNING_KEY
        - QSTASH_NEXT_SIGNING_KEY

    Returns:
        QStash Receiver/Verifier instance

    Raises:
        RuntimeError: If signing keys are missing or the verifier cannot
            be constructed (e.g. package not installed).
    """
    # Singleton stored as a function attribute, mirroring get_qstash_client.
    if not hasattr(get_qstash_verifier, "_verifier"):
        current_key = os.getenv("QSTASH_CURRENT_SIGNING_KEY")
        next_key = os.getenv("QSTASH_NEXT_SIGNING_KEY")

        if not current_key or not next_key:
            raise RuntimeError(
                "❌ QStash signing keys not configured. "
                "Add QSTASH_CURRENT_SIGNING_KEY and QSTASH_NEXT_SIGNING_KEY to HF secrets."
            )

        try:
            from upstash_qstash import Receiver

            get_qstash_verifier._verifier = Receiver({
                "current_signing_key": current_key,
                "next_signing_key": next_key
            })
            logger.info("✅ QStash verifier initialized")
        except Exception as e:
            raise RuntimeError(f"❌ QStash verifier initialization failed: {e}") from e

    return get_qstash_verifier._verifier
|
|
| |
def verify_api_key(x_api_key: str = Header(..., alias="X-API-KEY")):
    """
    FastAPI dependency for Vercel endpoints.

    Args:
        x_api_key: Value of the X-API-KEY request header (injected by FastAPI).

    Returns:
        The validated API key.

    Raises:
        HTTPException: 500 when no keys are configured at all; 401 when the
            presented key is not in the configured list.
    """
    if not API_KEYS:
        raise HTTPException(
            status_code=500,
            detail="🔴 API_KEYS not configured in HF environment"
        )

    if x_api_key not in API_KEYS:
        raise HTTPException(
            status_code=401,
            detail="❌ Invalid API key"
        )

    return x_api_key
|
|
| |
|
|
|
|
def get_current_user(request: Request, api_key: str = Depends(verify_api_key)):
    """
    Extracts org_id from query parameters (since auth happens on Vercel).

    Use this in analytics endpoints that need org context.  Stack Auth on
    Vercel already validated the user, so we trust the orgId passed in the
    query — here we only check its presence and shape.

    Args:
        request: Incoming request; org_id/orgId is read from its query string.
        api_key: Validated API key (injected by verify_api_key).

    Returns:
        Dict with org_id, api_key, an aware-UTC authenticated_at timestamp,
        and the auth source marker.

    Raises:
        HTTPException: 401 when org_id is absent; 400 when it is malformed.
    """
    org_id = request.query_params.get("org_id") or request.query_params.get("orgId")

    if not org_id:
        raise HTTPException(
            status_code=401,
            detail="❌ org_id missing from query parameters. Vercel stack auth missing?"
        )

    # Stack Auth ids carry a known prefix; rejecting anything else also keeps
    # the get_duckdb() file path constrained to expected id shapes.
    if not org_id.startswith(("org_", "user_")):
        raise HTTPException(
            status_code=400,
            detail=f"❌ Invalid org_id format: {org_id}"
        )

    return {
        "org_id": org_id,
        "api_key": api_key,
        # timezone-aware UTC: datetime.utcnow() is deprecated (3.12+) and
        # produced a naive timestamp.
        "authenticated_at": datetime.now(timezone.utc).isoformat(),
        "source": "vercel_stack_auth"
    }
|
|
| |
|
|
| |
| _rate_limits = defaultdict(lambda: {"count": 0, "reset_at": 0}) |
|
|
def rate_limit_org(max_requests: int = 100, window_seconds: int = 60):
    """
    Rate limiter per organization (fixed window, in-process).

    Prevents one org from DOSing the analytics engine.

    Args:
        max_requests: Allowed requests per window.
        window_seconds: Window length in seconds.

    Returns:
        A FastAPI dependency that yields the org_id, or raises 429 when the
        org has exhausted its window.
    """
    def dependency(user: dict = Depends(get_current_user)):
        # NOTE(review): the original used Depends(lambda r: ...) with an
        # unannotated parameter, which FastAPI cannot resolve as a Request;
        # depending on get_current_user directly is the supported pattern.
        org_id = user["org_id"]
        now = time.time()
        limit_data = _rate_limits[org_id]

        # Start a fresh window once the previous one has expired.
        if now > limit_data["reset_at"]:
            limit_data["count"] = 0
            limit_data["reset_at"] = now + window_seconds

        if limit_data["count"] >= max_requests:
            raise HTTPException(
                status_code=429,
                detail=f"⏸️ Rate limit exceeded for {org_id}: "
                       f"{max_requests} req/{window_seconds}s"
            )

        limit_data["count"] += 1
        return org_id

    return dependency
| |
def check_all_services():
    """
    Comprehensive health check for /health endpoint.

    Returns:
        Dict mapping service name ("duckdb", "vector_db", "redis") to either
        "✅ connected" or "❌ <error>".  Never raises: every probe failure is
        captured as a status string.
    """
    statuses = {}

    # Per-org DuckDB, probed via a throwaway "health_check" org database.
    try:
        conn = get_duckdb("health_check")
        conn.execute("SELECT 1")
        statuses["duckdb"] = "✅ connected"
    except Exception as e:
        statuses["duckdb"] = f"❌ {e}"

    # Shared vector database.
    try:
        vdb = get_vector_db()
        vdb.execute("SELECT 1")
        statuses["vector_db"] = "✅ connected"
    except Exception as e:
        statuses["vector_db"] = f"❌ {e}"

    # Upstash Redis bridge.
    try:
        r = get_redis()
        r.ping()
        statuses["redis"] = "✅ connected"
    except Exception as e:
        statuses["redis"] = f"❌ {e}"

    return statuses