# app/main.py - ENTERPRISE ANALYTICS ENGINE v3.0
"""
MutSyncHub Analytics Engine
Enterprise-grade AI analytics platform with zero-cost inference
"""
import logging
import os
import time
import uuid
import subprocess
import asyncio
import threading
import pathlib
import json
# # βββ Third-Party ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
from fastapi import FastAPI, Depends, HTTPException, Request, Query, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
# βββ Internal Imports βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
from app.core.event_hub import event_hub
# NOTE: worker_manager is now created via async factory `get_worker_manager()`
# Old import kept as comment for reference:
# from app.core.worker_manager import worker_manager
from app.core.worker_manager import get_worker_manager
from app.deps import rate_limit_org, verify_api_key, check_all_services
from app.tasks.analytics_worker import trigger_kpi_computation
from app.service.vector_service import cleanup_expired_vectors
from app.routers import health, datasources, reports, flags, scheduler, analytics_stream,ai_query,schema
from app.service.llm_service import load_llm_service
from app.deps import get_qstash_client
from prometheus_client import make_asgi_app
# βββ Logger Configuration βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# Root logging setup: timestamped, pipe-delimited records at INFO level.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(name)s | %(levelname)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)
# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
def safe_redis_decode(value):
    """Return *value* as ``str``, decoding UTF-8 when it arrives as bytes.

    Redis clients return either ``bytes`` or ``str`` depending on their
    ``decode_responses`` setting; this normalizes bytes to text and passes
    every other value through untouched.
    """
    return value.decode("utf-8") if isinstance(value, bytes) else value
# βββ Lifespan Management βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Enterprise startup/shutdown sequence with health validation.

    Startup: assigns a unique instance id, pins every Hugging Face cache
    env var to persistent storage, runs a best-effort service health check,
    then conditionally starts the scheduler subprocess, worker-manager
    listener, LLM loader and QStash client — each guarded by a DISABLE_*
    env var so individual subsystems can be switched off.

    Shutdown: terminates the scheduler subprocess (if it was started) and
    closes all per-org and vector DB connections.
    """
    # --- Startup -----------------------------------------------------------
    logger.info("=" * 60)
    logger.info("🚀 ANALYTICS ENGINE v3.0 - STARTUP SEQUENCE")
    logger.info("=" * 60)
    app.state.instance_id = f"engine-{uuid.uuid4().hex[:8]}"
    logger.info(f"Instance ID: {app.state.instance_id}")
    logger.info("🚀 STARTUP SEQUENCE")
    # CRITICAL: point every HF cache env var at persistent storage so model
    # downloads survive container restarts.
    os.makedirs("/data/hf_cache", exist_ok=True)
    os.environ["HF_HOME"] = "/data/hf_cache"
    os.environ["TRANSFORMERS_CACHE"] = "/data/hf_cache"
    os.environ["HF_HUB_CACHE"] = "/data/hf_cache"
    # Symlink the default ~/.cache/huggingface location to the persistent
    # dir in case a library ignores the env vars above.
    cache_dir = pathlib.Path("/data/hf_cache")
    home_cache = pathlib.Path.home() / ".cache" / "huggingface"
    if not home_cache.exists():
        home_cache.parent.mkdir(parents=True, exist_ok=True)
        home_cache.symlink_to(cache_dir)
    # Validate service health on boot (best-effort: failure is logged, not fatal).
    try:
        services = check_all_services()
        # NOTE(review): classification relies on the ✅/❌ markers embedded in
        # the status strings returned by check_all_services — confirm they match.
        healthy = [k for k, v in services.items() if "✅" in str(v)]
        unhealthy = [k for k, v in services.items() if "❌" in str(v)]
        logger.info(f"✅ Healthy: {len(healthy)} services")
        for svc in healthy:
            logger.info(f"  └ {svc}: {services[svc]}")
        if unhealthy:
            logger.warning(f"⚠️ Unhealthy: {len(unhealthy)} services")
            for svc in unhealthy:
                logger.warning(f"  └ {svc}: {services[svc]}")
    except Exception as e:
        logger.error(f"🔴 Startup health check failed: {e}")
    # Start scheduler in background (optional - controllable via env).
    scheduler_process = None
    if os.getenv("DISABLE_SCHEDULER") != "1":
        try:
            scheduler_process = subprocess.Popen(["python", "/app/scheduler_loop.py"])
            logger.info(f"✅ Scheduler started (PID: {scheduler_process.pid})")
        except Exception as e:
            logger.warning(f"⚠️ Scheduler failed to start: {e}")
    else:
        logger.info("ℹ️ Scheduler start skipped (DISABLE_SCHEDULER=1)")
    logger.info("✅ Startup sequence complete")
    # Start worker-manager listener (optional).
    if os.getenv("DISABLE_WORKER_MANAGER") != "1":
        logger.info("📡 starting worker manager...")
        try:
            # Use the async factory to get the singleton manager instance.
            worker_manager = await get_worker_manager()
            asyncio.create_task(worker_manager.start_listener(), name="worker-manager")
        except Exception as e:
            logger.error(f"❌ Failed to start worker manager: {e}")
    else:
        logger.info("ℹ️ Worker manager start skipped (DISABLE_WORKER_MANAGER=1)")
    # Now load optional services (LLM, QStash).
    if os.getenv("DISABLE_LLM_LOAD") != "1":
        try:
            load_llm_service()  # Starts background loading
            logger.info("🤖 LLM service loading in background...")
        except Exception as e:
            logger.error(f"❌ LLM load failed: {e}")
    else:
        logger.info("ℹ️ LLM loading skipped (DISABLE_LLM_LOAD=1)")
    # QStash client is optional; guard behind env var.
    if os.getenv("DISABLE_QSTASH") != "1":
        try:
            get_qstash_client()  # This creates the singleton if not exists
            logger.info("✅ QStash ready")
        except RuntimeError as e:
            logger.warning(f"⚠️ QStash disabled: {e}")
    else:
        logger.info("ℹ️ QStash initialization skipped (DISABLE_QSTASH=1)")
    yield
    # --- Shutdown ----------------------------------------------------------
    logger.info("=" * 60)
    logger.info("🛑 ANALYTICS ENGINE - SHUTDOWN SEQUENCE")
    logger.info("=" * 60)
    # BUGFIX: scheduler_process is None when the scheduler was disabled or
    # failed to start — calling terminate() unguarded crashed every shutdown
    # in those configurations.
    if scheduler_process is not None:
        scheduler_process.terminate()
        logger.info("  └ Stopped scheduler")
    # Close all database connections (best-effort; never block shutdown).
    from app.deps import _org_db_connections, _vector_db_conn
    if _org_db_connections:
        for org_id, conn in _org_db_connections.items():
            try:
                conn.close()
                logger.info(f"  └ Closed DB: {org_id}")
            except Exception:
                pass
    if _vector_db_conn:
        try:
            _vector_db_conn.close()
            logger.info("  └ Closed Vector DB")
        except Exception:
            pass
    logger.info("✅ Shutdown complete")
# βββ FastAPI Application βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# FastAPI application instance. All lifecycle work (cache setup, health
# checks, worker startup/teardown) is driven by the `lifespan` context
# manager above; API docs are served under /api/*.
app = FastAPI(
    title="MutSyncHub Analytics Engine",
    version="3.0.0",
    description="""Enterprise-grade AI analytics engine with:
β’ Hybrid entity detection (Rule-based + LLM)
β’ Vector similarity search (DuckDB VSS)
β’ Zero external API costs (Local Mistral-7B)
β’ Multi-tenant data isolation
β’ Redis-backed async processing
**π All endpoints require X-API-KEY header except /health**""",
    lifespan=lifespan,
    docs_url="/api/docs",
    redoc_url="/api/redoc",
    openapi_url="/api/openapi.json",
    contact={
        "name": "MutSyncHub Enterprise",
        "email": "enterprise@mutsynchub.com"
    },
    license_info={
        "name": "MIT License",
    }
)
# Prometheus metrics exposed as a sub-application at /metrics.
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
# βββ Startup Workers βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@app.on_event("startup")
async def start_workers():
    """Start background maintenance workers.

    The old Redis listener was removed — the worker manager (started in
    ``lifespan``) now handles trigger events. This hook only spawns the
    daily vector-cleanup daemon thread.

    NOTE(review): ``@app.on_event`` is deprecated in favor of the lifespan
    handler; consider folding this into ``lifespan`` above.
    """
    # 1. Redis listener removed; worker manager now handles trigger events.
    logger.info("✅ Worker manager will handle trigger events")

    # 2. Vector cleanup (daily), run in a daemon thread so it never blocks
    # process shutdown.
    def run_cleanup():
        while True:
            try:
                cleanup_expired_vectors()
            except Exception as e:
                # BUGFIX: keep the loop alive — previously a single failed
                # sweep killed the cleanup thread permanently.
                logger.error(f"Vector cleanup failed: {e}")
            time.sleep(86400)  # 24 hours

    cleanup_thread = threading.Thread(target=run_cleanup, daemon=True)
    cleanup_thread.start()
    logger.info("✅ Vector cleanup scheduler started")
# βββ Request ID Middleware βββββββββββββββββββββββββββββββββββββββββββββββββββββ
@app.middleware("http")
async def add_request_tracking(request: Request, call_next):
    """Attach a request ID and wall-clock timing to every HTTP request.

    The ID is stored on ``request.state`` for downstream handlers and
    echoed back via the ``X-Request-ID`` header, together with the measured
    ``X-Response-Time``; each request is also logged in one line.
    """
    rid = f"req-{uuid.uuid4().hex[:12]}"
    request.state.request_id = rid
    started = time.time()
    response = await call_next(request)
    elapsed = time.time() - started
    # Observability headers for clients / log correlation.
    response.headers["X-Request-ID"] = rid
    response.headers["X-Response-Time"] = f"{elapsed:.3f}s"
    logger.info(
        f"{request.method} {request.url.path} | {response.status_code} "
        f"| {elapsed:.3f}s | {rid}"
    )
    return response
# ─── KPI Computation Endpoint ──────────────────────────────────────────────────
@app.post("/api/v1/kpi/compute")
async def compute_kpis(
    background_tasks: BackgroundTasks,
    org_id: str = Query(..., description="Organization ID"),
    source_id: str = Query(..., description="Data source ID"),
    # BUGFIX: the trailing comment here had been split onto its own line,
    # leaving bare words inside the parameter list (a syntax error).
    # verify_api_key returns a plain string, not HTTPAuthorizationCredentials.
    api_key: str = Depends(verify_api_key),
    limited_org: str = Depends(rate_limit_org(max_requests=50))
):
    """
    Trigger KPI computation.
    Returns immediately; results published to Redis stream.

    Serves a cached payload when one exists for (org_id, source_id);
    otherwise queues the computation as a background task and returns a
    poll URL. Raises HTTP 500 on any unexpected failure.
    """
    try:
        # Check cache first — avoid recomputing recent results.
        cached = event_hub.get_key(f"kpi_cache:{org_id}:{source_id}")
        if cached:
            return {
                "status": "cached",
                "org_id": org_id,
                "data": json.loads(cached),
                "rate_limit": {
                    "remaining": 50,
                    "reset_in": 60
                }
            }
        # Queue the computation; the caller polls the stream for results.
        background_tasks.add_task(trigger_kpi_computation, org_id, source_id)
        return {
            "status": "processing",
            "org_id": org_id,
            "message": "KPI computation queued. Poll /analytics/stream/recent for results.",
            "poll_url": f"/api/v1/analytics/stream/recent?org_id={org_id}&source_id={source_id}"
        }
    except Exception as e:
        logger.error(f"❌ KPI compute error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# βββ Background KPI Scheduler ββββββββββββββββββββββββββββββββββββββββββββββββββ
async def continuous_kpi_refresh():
    """
    Auto-refresh KPIs every 5 minutes for active organizations.

    Each cycle scans up to 10 ``entity:*`` keys in Redis, skips sources
    that already have a fresh KPI cache entry or an active worker lock,
    and triggers computation for the rest with a 1-second gap between
    triggers. Errors are logged and the loop keeps running.
    """
    await asyncio.sleep(10)  # let app startup complete
    while True:
        try:
            logger.debug("🔄 KPI scheduler tick...")
            for raw_key in event_hub.keys("entity:*")[:10]:  # max 10 per batch
                parts = safe_redis_decode(raw_key).split(":")
                if len(parts) < 3:
                    continue
                org_id, source_id = parts[1], parts[2]
                # Skip if recently computed.
                if event_hub.exists(f"kpi_cache:{org_id}:{source_id}"):
                    continue
                # Skip if a worker already holds the lock for this source.
                if event_hub.exists(f"worker:lock:{org_id}:{source_id}"):
                    continue
                # Trigger computation.
                logger.info(f"⏰ Auto-triggering KPIs for {org_id}/{source_id}")
                await trigger_kpi_computation(org_id, source_id)
                await asyncio.sleep(1)  # 1s gap between triggers
        except Exception as e:
            logger.error(f"❌ Scheduler error: {e}")
        await asyncio.sleep(300)  # sleep 5 minutes between cycles
@app.get("/debug/stream-content")
def debug_stream(
    org_id: str = Query(...),
    source_id: str = Query(...),
    api_key: str = Depends(verify_api_key)
):
    """Inspect what's actually in a source's Redis analytics stream.

    Returns the 10 most recent stream events together with the cached
    entity/industry payloads for the (org_id, source_id) pair — useful
    when diagnosing empty or stale dashboards.
    """
    stream_key = f"stream:analytics:{org_id}:{source_id}"
    entity_key = f"entity:{org_id}:{source_id}"
    industry_key = f"industry:{org_id}:{source_id}"
    recent_events = event_hub.read_recent_stream(stream_key, 10)
    entity_payload = event_hub.get_key(entity_key)
    industry_payload = event_hub.get_key(industry_key)
    return {
        "stream_key": stream_key,
        "events_count": len(recent_events),
        "events": recent_events,
        "entity_exists": bool(entity_payload),
        "industry_exists": bool(industry_payload),
        "entity_data": entity_payload,
        "industry_data": industry_payload,
    }
@app.post("/api/v1/cache/clear")
def clear_cache(org_id: str, source_id: str, api_key: str = Depends(verify_api_key)):
    """Clear entity/industry caches to force fresh reads"""
    cache_key = (org_id, source_id)
    # Import the in-process cache dicts lazily to avoid an import cycle.
    from app.mapper import _ENTITY_CACHE, _INDUSTRY_CACHE
    # pop() with a default removes the entry when present and is a no-op
    # otherwise — same effect as the membership-test-then-del pattern.
    _ENTITY_CACHE.pop(cache_key, None)
    _INDUSTRY_CACHE.pop(cache_key, None)
    return {"status": "cleared", "cache_key": str(cache_key)}
# βββ Root Endpoint βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@app.get("/", tags=["root"])
def read_root():
    """
    Service information and discovery.

    Returns static service metadata plus this instance's id and its
    runtime mode (SPACE_ID being set signals a deployed environment).
    """
    running_in_space = bool(os.getenv("SPACE_ID"))
    return {
        "status": "operational",
        "service": "MutSyncHub Analytics Engine",
        "version": "3.0.0",
        "mode": "production" if running_in_space else "development",
        "instance_id": app.state.instance_id,
        "endpoints": {
            "docs": "/api/docs",
            "health": "/api/health/detailed",
            "datasources": "/api/datasources",
        },
        "features": [
            "Hybrid entity detection",
            "Vector similarity search",
            "Multi-tenant isolation",
            "Redis-backed async processing"
        ]
    }
# βββ CORS Configuration ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# Browser origins allowed to call this API (production frontend + local dev).
ALLOWED_ORIGINS = [
    "https://mut-sync-hub.vercel.app",
    "http://localhost:3000",
    "https://studio.huggingface.co",
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    allow_credentials=True,  # cookies/credentials permitted from listed origins
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
    # Make the tracking headers set by add_request_tracking readable from JS.
    expose_headers=["X-Request-ID", "X-Response-Time"],
    max_age=3600,  # cache preflight responses for 1 hour
)
# βββ Global Error Handler ββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """
    Catch all uncaught exceptions and return safe error response.

    Logs the full traceback with the request path and id, then returns a
    generic JSON 500 body so internals are never leaked to clients.
    """
    # BUGFIX: request_id is set by the tracking middleware; an exception
    # raised before/outside that middleware would otherwise make this
    # handler itself crash with AttributeError.
    request_id = getattr(request.state, "request_id", "unknown")
    logger.error(
        f"🔴 Unhandled error | Path: {request.url.path} | "
        f"Request ID: {request_id} | Error: {str(exc)}",
        exc_info=True
    )
    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            "message": "An unexpected error occurred. Check server logs.",
            "request_id": request_id,
            "timestamp": time.time()
        }
    )
# ─── Router Registration ───────────────────────────────────────────────────────
# Register routers (explicitly, no loops). Every router except /health is
# protected by the verify_api_key dependency (X-API-KEY header).
app.include_router(health.router, prefix="/health")
app.include_router(datasources.router, prefix="/api/v1/datasources", dependencies=[Depends(verify_api_key)])
app.include_router(reports.router, prefix="/api/v1/reports", dependencies=[Depends(verify_api_key)])
app.include_router(flags.router, prefix="/api/v1/flags", dependencies=[Depends(verify_api_key)])
app.include_router(scheduler.router, prefix="/api/v1/scheduler", dependencies=[Depends(verify_api_key)])
# analytics_stream defines its own prefix internally (none passed here).
app.include_router(analytics_stream.router, dependencies=[Depends(verify_api_key)])
app.include_router(ai_query.router, prefix="/api/v1/ai-query", dependencies=[Depends(verify_api_key)])
app.include_router(schema.router, prefix="/api/v1/schema", dependencies=[Depends(verify_api_key)])