File size: 18,106 Bytes
a0ff994 9d87f4c ce0938c a0ff994 eb5f1af ae09122 5f7588b 458e9ae a0ff994 3b5e1d5 71de6ef 472833f 8aaa8a5 472833f ae09122 b9f0345 6be95d4 90ddfb7 72194b1 b9f0345 7002f8a b39a40c 698a188 644db45 049be5a a0ff994 ae09122 a0ff994 50fc282 a0ff994 472833f a0ff994 f77f60f a0ff994 f77f60f a0ff994 ce0938c eb5f1af a0ff994 0bd628a ce0938c 6be95d4 ce0938c b9f0345 ce0938c 472833f a0ff994 eb5f1af a0ff994 7f15715 a0ff994 7f15715 305eb68 a0ff994 472833f a0ff994 472833f a0ff994 784ea15 a0ff994 472833f 049be5a 472833f ae09122 b9f0345 ae09122 a0ff994 472833f a0ff994 472833f a0ff994 472833f ae09122 1e5d7a4 ae09122 4d17e4d 72194b1 4d17e4d 0bd628a 4d17e4d 0bd628a 4d17e4d ae09122 0bd628a b9f0345 72194b1 0bd628a eb81ac1 0bd628a ae09122 0bd628a 472833f ae09122 0bd628a 94311f6 0bd628a b9f0345 94311f6 50fc282 0bd628a b9f0345 ae09122 0bd628a b9f0345 0bd628a b9f0345 94311f6 0bd628a 94311f6 0bd628a 94311f6 98c2088 5549d4c a0ff994 ae09122 a0ff994 472833f a0ff994 472833f a0ff994 472833f a0ff994 472833f a0ff994 472833f a0ff994 ae09122 307409b 6b41c9e 12e51aa 1e5d7a4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 
265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 | # app/main.py β ENTERPRISE ANALYTICS ENGINE v3.0
"""
MutSyncHub Analytics Engine
Enterprise-grade AI analytics platform with zero-cost inference
# """
import logging
import os
import time
import uuid
import subprocess
import asyncio
import threading
import pathlib
import json
# ─── Third-Party ───────────────────────────────────────────────────────────────
from fastapi import FastAPI, Depends, HTTPException, Request, Query, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
# βββ Internal Imports βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
from app.core.event_hub import event_hub
# NOTE: worker_manager is now created via async factory `get_worker_manager()`
# Old import kept as comment for reference:
# from app.core.worker_manager import worker_manager
from app.core.worker_manager import get_worker_manager
from app.deps import rate_limit_org, verify_api_key, check_all_services
from app.tasks.analytics_worker import trigger_kpi_computation
from app.service.vector_service import cleanup_expired_vectors
from app.routers import health, datasources, reports, flags, scheduler, analytics_stream,ai_query,schema
from app.service.llm_service import load_llm_service
from app.deps import get_qstash_client
from prometheus_client import make_asgi_app
# ─── Logger Configuration ──────────────────────────────────────────────────────
# Root logging config: timestamped, pipe-delimited single-line records at INFO.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(name)s | %(levelname)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)
# Module-level logger, stdlib convention: one logger per module via __name__.
logger = logging.getLogger(__name__)
def safe_redis_decode(value):
    """Return *value* as text where possible.

    Redis clients may hand back either ``bytes`` or ``str`` depending on
    their decode settings; bytes are UTF-8 decoded, everything else is
    passed through unchanged.
    """
    return value.decode("utf-8") if isinstance(value, bytes) else value
# βββ Lifespan Management βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Enterprise startup/shutdown sequence with health validation.

    Startup order: instance id -> Hugging Face cache relocation ->
    service health check -> scheduler subprocess -> worker-manager
    listener -> background LLM load -> QStash singleton.  Each optional
    stage is guarded by a DISABLE_* env flag so deployments can opt out.
    Shutdown terminates the scheduler (if it was started) and closes all
    cached database connections.
    """
    # ─── Startup ───────────────────────────────────────────────────────────
    logger.info("=" * 60)
    logger.info("🚀 ANALYTICS ENGINE v3.0 - STARTUP SEQUENCE")
    logger.info("=" * 60)

    app.state.instance_id = f"engine-{uuid.uuid4().hex[:8]}"
    logger.info(f"Instance ID: {app.state.instance_id}")
    logger.info("🚀 STARTUP SEQUENCE")

    # CRITICAL: point every HF cache env var at persistent storage so model
    # downloads survive container restarts.
    os.makedirs("/data/hf_cache", exist_ok=True)
    os.environ["HF_HOME"] = "/data/hf_cache"
    os.environ["TRANSFORMERS_CACHE"] = "/data/hf_cache"
    os.environ["HF_HUB_CACHE"] = "/data/hf_cache"

    # Symlink ~/.cache/huggingface -> /data/hf_cache for libraries that
    # ignore the env vars.  BUGFIX: guarded — symlink creation can fail on
    # read-only home directories and must not abort startup.
    cache_dir = pathlib.Path("/data/hf_cache")
    home_cache = pathlib.Path.home() / ".cache" / "huggingface"
    if not home_cache.exists():
        try:
            home_cache.parent.mkdir(parents=True, exist_ok=True)
            home_cache.symlink_to(cache_dir)
        except OSError as e:
            logger.warning(f"⚠️ Could not symlink HF cache: {e}")

    # Validate service health on boot (non-fatal: we log and continue).
    # NOTE(review): healthy/unhealthy detection matches on the status-emoji
    # inside the strings returned by check_all_services — confirm markers.
    try:
        services = check_all_services()
        healthy = [k for k, v in services.items() if "✅" in str(v)]
        unhealthy = [k for k, v in services.items() if "❌" in str(v)]
        logger.info(f"✅ Healthy: {len(healthy)} services")
        for svc in healthy:
            logger.info(f"  → {svc}: {services[svc]}")
        if unhealthy:
            logger.warning(f"⚠️ Unhealthy: {len(unhealthy)} services")
            for svc in unhealthy:
                logger.warning(f"  → {svc}: {services[svc]}")
    except Exception as e:
        logger.error(f"🔴 Startup health check failed: {e}")

    # Start scheduler subprocess in the background (optional via env).
    scheduler_process = None
    if os.getenv("DISABLE_SCHEDULER") != "1":
        try:
            scheduler_process = subprocess.Popen(["python", "/app/scheduler_loop.py"])
            logger.info(f"✅ Scheduler started (PID: {scheduler_process.pid})")
        except Exception as e:
            logger.warning(f"⚠️ Scheduler failed to start: {e}")
    else:
        logger.info("ℹ️ Scheduler start skipped (DISABLE_SCHEDULER=1)")

    logger.info("✅ Startup sequence complete")

    # Start the worker-manager listener (optional via env).
    if os.getenv("DISABLE_WORKER_MANAGER") != "1":
        logger.info("🚀 starting worker manager...")
        try:
            # Async factory returns the singleton manager instance.
            worker_manager = await get_worker_manager()
            asyncio.create_task(worker_manager.start_listener(), name="worker-manager")
        except Exception as e:
            logger.error(f"❌ Failed to start worker manager: {e}")
    else:
        logger.info("ℹ️ Worker manager start skipped (DISABLE_WORKER_MANAGER=1)")

    # Load optional services (LLM, QStash).
    if os.getenv("DISABLE_LLM_LOAD") != "1":
        try:
            load_llm_service()  # starts background loading, returns immediately
            logger.info("🤖 LLM service loading in background...")
        except Exception as e:
            logger.error(f"❌ LLM load failed: {e}")
    else:
        logger.info("ℹ️ LLM loading skipped (DISABLE_LLM_LOAD=1)")

    # QStash client is optional; guard behind env var.
    if os.getenv("DISABLE_QSTASH") != "1":
        try:
            get_qstash_client()  # creates the singleton if it does not exist
            logger.info("✅ QStash ready")
        except RuntimeError as e:
            logger.warning(f"⚠️ QStash disabled: {e}")
    else:
        logger.info("ℹ️ QStash initialization skipped (DISABLE_QSTASH=1)")

    yield

    # ─── Shutdown ──────────────────────────────────────────────────────────
    logger.info("=" * 60)
    logger.info("🛑 ANALYTICS ENGINE - SHUTDOWN SEQUENCE")
    logger.info("=" * 60)

    # BUGFIX: scheduler_process is None when the scheduler was disabled or
    # failed to start; the previous unconditional terminate() raised
    # AttributeError during every such shutdown.
    if scheduler_process is not None:
        scheduler_process.terminate()
        logger.info("  → Stopped scheduler")

    # Close all cached database connections (best-effort per connection).
    from app.deps import _org_db_connections, _vector_db_conn
    if _org_db_connections:
        for org_id, conn in _org_db_connections.items():
            try:
                conn.close()
                logger.info(f"  → Closed DB: {org_id}")
            except Exception:
                pass
    if _vector_db_conn:
        try:
            _vector_db_conn.close()
            logger.info("  → Closed Vector DB")
        except Exception:
            pass

    logger.info("✅ Shutdown complete")
# ─── FastAPI Application ───────────────────────────────────────────────────────
# Single application instance; interactive docs served under /api/*.
app = FastAPI(
    title="MutSyncHub Analytics Engine",
    version="3.0.0",
    description="""Enterprise-grade AI analytics engine with:
β’ Hybrid entity detection (Rule-based + LLM)
β’ Vector similarity search (DuckDB VSS)
β’ Zero external API costs (Local Mistral-7B)
β’ Multi-tenant data isolation
β’ Redis-backed async processing
**π All endpoints require X-API-KEY header except /health**""",
    lifespan=lifespan,  # startup/shutdown sequence defined above
    docs_url="/api/docs",
    redoc_url="/api/redoc",
    openapi_url="/api/openapi.json",
    contact={
        "name": "MutSyncHub Enterprise",
        "email": "enterprise@mutsynchub.com"
    },
    license_info={
        "name": "MIT License",
    }
)
# Expose Prometheus metrics as a mounted ASGI sub-application at /metrics.
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
# βββ Startup Workers βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@app.on_event("startup")
async def start_workers():
    """Start background workers (currently: the daily vector-cleanup thread).

    The old Redis listener was removed; trigger events are handled by the
    worker manager started in ``lifespan``.

    NOTE(review): ``@app.on_event`` is deprecated in recent FastAPI in
    favor of the lifespan handler this file already uses — consider
    folding this logic into ``lifespan``; kept here to preserve the
    existing registration.
    """
    logger.info("✅ Worker manager will handle trigger events")

    def run_cleanup():
        # Daemon loop: purge expired vectors once every 24h.
        # BUGFIX: each pass is exception-guarded — previously the first
        # error from cleanup_expired_vectors() killed the thread forever.
        while True:
            try:
                cleanup_expired_vectors()
            except Exception as e:
                logger.error(f"❌ Vector cleanup failed: {e}")
            time.sleep(86400)  # 24 hours

    cleanup_thread = threading.Thread(target=run_cleanup, daemon=True)
    cleanup_thread.start()
    logger.info("✅ Vector cleanup scheduler started")
# βββ Request ID Middleware βββββββββββββββββββββββββββββββββββββββββββββββββββββ
@app.middleware("http")
async def add_request_tracking(request: Request, call_next):
    """
    Attach a request ID and timing information to every response.

    Sets ``request.state.request_id``, adds ``X-Request-ID`` and
    ``X-Response-Time`` response headers, and logs one summary line per
    request for observability.
    """
    request_id = f"req-{uuid.uuid4().hex[:12]}"
    request.state.request_id = request_id

    # BUGFIX: perf_counter() is monotonic; time.time() can step backwards
    # under NTP adjustment and report negative/garbage durations.
    start_time = time.perf_counter()
    response = await call_next(request)
    process_time = time.perf_counter() - start_time

    # Add tracing headers
    response.headers["X-Request-ID"] = request_id
    response.headers["X-Response-Time"] = f"{process_time:.3f}s"

    # One-line access log
    logger.info(
        f"{request.method} {request.url.path} | {response.status_code} "
        f"| {process_time:.3f}s | {request_id}"
    )
    return response
# ─── KPI Computation Endpoint ──────────────────────────────────────────────────
@app.post("/api/v1/kpi/compute")
async def compute_kpis(
    background_tasks: BackgroundTasks,
    org_id: str = Query(..., description="Organization ID"),
    source_id: str = Query(..., description="Data source ID"),
    api_key: str = Depends(verify_api_key),  # yields the key string, not HTTPAuthorizationCredentials
    limited_org: str = Depends(rate_limit_org(max_requests=50))
):
    """
    Trigger KPI computation for one org/source pair.

    Returns the cached payload when a recent computation exists;
    otherwise queues the computation and returns immediately — results
    are published to the Redis analytics stream (see ``poll_url``).
    """
    try:
        # Serve from cache first to avoid recomputation.
        cached = event_hub.get_key(f"kpi_cache:{org_id}:{source_id}")
        if cached:
            return {
                "status": "cached",
                "org_id": org_id,
                # Redis may return bytes; normalize before JSON parsing.
                "data": json.loads(safe_redis_decode(cached)),
                "rate_limit": {
                    # NOTE(review): static values, not the live counter from
                    # rate_limit_org — confirm whether real remaining/reset
                    # numbers should be surfaced here.
                    "remaining": 50,
                    "reset_in": 60
                }
            }

        # Not cached: queue the computation and respond immediately.
        background_tasks.add_task(trigger_kpi_computation, org_id, source_id)
        return {
            "status": "processing",
            "org_id": org_id,
            "message": "KPI computation queued. Poll /analytics/stream/recent for results.",
            "poll_url": f"/api/v1/analytics/stream/recent?org_id={org_id}&source_id={source_id}"
        }
    except Exception as e:
        logger.error(f"❌ KPI compute error: {e}")
        raise HTTPException(status_code=500, detail=str(e))
# βββ Background KPI Scheduler ββββββββββββββββββββββββββββββββββββββββββββββββββ
async def continuous_kpi_refresh():
    """
    Auto-refresh KPIs every 5 minutes for active organizations.

    Each cycle scans up to 10 ``entity:*`` keys and triggers a KPI
    computation for any org/source pair that has neither a fresh cache
    entry nor a running worker lock.
    """
    await asyncio.sleep(10)  # let app startup complete
    while True:
        try:
            logger.debug("🔄 KPI scheduler tick...")
            active_keys = event_hub.keys("entity:*")
            for key in active_keys[:10]:  # max 10 per batch
                # BUGFIX: per-key guard — previously one bad key aborted
                # the whole batch for this cycle.
                try:
                    key_parts = safe_redis_decode(key).split(":")
                    if len(key_parts) < 3:
                        continue
                    org_id, source_id = key_parts[1], key_parts[2]
                    # Skip if recently computed.
                    if event_hub.exists(f"kpi_cache:{org_id}:{source_id}"):
                        continue
                    # Skip if a worker already holds the lock.
                    if event_hub.exists(f"worker:lock:{org_id}:{source_id}"):
                        continue
                    # Trigger computation.
                    logger.info(f"⏰ Auto-triggering KPIs for {org_id}/{source_id}")
                    await trigger_kpi_computation(org_id, source_id)
                    await asyncio.sleep(1)  # 1s gap between triggers
                except Exception as e:
                    logger.error(f"❌ KPI trigger failed for {key!r}: {e}")
        except Exception as e:
            logger.error(f"❌ Scheduler error: {e}")
        # CRITICAL: always sleep 5 minutes between cycles — placed at loop
        # level so a persistent failure cannot turn this into a busy-loop.
        await asyncio.sleep(300)
@app.get("/debug/stream-content")
def debug_stream(
    org_id: str = Query(...),
    source_id: str = Query(...),
    api_key: str = Depends(verify_api_key)
):
    """Inspect the raw analytics Redis stream plus the entity/industry
    cache keys for one org/source pair (debugging aid)."""
    stream_key = f"stream:analytics:{org_id}:{source_id}"
    entity_key = f"entity:{org_id}:{source_id}"
    industry_key = f"industry:{org_id}:{source_id}"

    # Read each value once and reuse it for both the existence flag and
    # the returned data.
    recent_events = event_hub.read_recent_stream(stream_key, 10)
    entity_value = event_hub.get_key(entity_key)
    industry_value = event_hub.get_key(industry_key)

    return {
        "stream_key": stream_key,
        "events_count": len(recent_events),
        "events": recent_events,
        "entity_exists": bool(entity_value),
        "industry_exists": bool(industry_value),
        "entity_data": entity_value,
        "industry_data": industry_value,
    }
@app.post("/api/v1/cache/clear")
def clear_cache(org_id: str, source_id: str, api_key: str = Depends(verify_api_key)):
    """Clear the in-process entity/industry caches for one org/source
    pair so the next lookup reads fresh data."""
    cache_key = (org_id, source_id)
    # Local import — presumably avoids a module-level cycle with app.mapper.
    from app.mapper import _ENTITY_CACHE, _INDUSTRY_CACHE
    # pop(..., None) is the idiomatic single-lookup form of
    # "if key in d: del d[key]" — same behavior, no double lookup.
    _ENTITY_CACHE.pop(cache_key, None)
    _INDUSTRY_CACHE.pop(cache_key, None)
    return {"status": "cleared", "cache_key": str(cache_key)}
# βββ Root Endpoint βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@app.get("/", tags=["root"])
def read_root():
    """Service discovery endpoint: status, version, deployment mode,
    instance id, and the main entry-point URLs."""
    # SPACE_ID is set by the hosting platform in production deployments.
    running_in_space = bool(os.getenv("SPACE_ID"))
    return {
        "status": "operational",
        "service": "MutSyncHub Analytics Engine",
        "version": "3.0.0",
        "mode": "production" if running_in_space else "development",
        "instance_id": app.state.instance_id,
        "endpoints": {
            "docs": "/api/docs",
            "health": "/api/health/detailed",
            "datasources": "/api/datasources",
        },
        "features": [
            "Hybrid entity detection",
            "Vector similarity search",
            "Multi-tenant isolation",
            "Redis-backed async processing",
        ],
    }
# ─── CORS Configuration ────────────────────────────────────────────────────────
# Explicit origin allow-list: production frontend, local dev, HF studio.
ALLOWED_ORIGINS = [
    "https://mut-sync-hub.vercel.app",
    "http://localhost:3000",
    "https://studio.huggingface.co",
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    allow_credentials=True,  # cookies/auth headers permitted from listed origins
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
    # Let browsers read the tracing headers set by the HTTP middleware.
    expose_headers=["X-Request-ID", "X-Response-Time"],
    max_age=3600,  # cache preflight responses for one hour
)
# βββ Global Error Handler ββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """
    Catch-all handler: log the failure with traceback and return a safe,
    non-leaking 500 response carrying the request id.
    """
    # BUGFIX: request.state.request_id is only set by the tracking
    # middleware; for exceptions raised before/outside it, the handler
    # itself previously crashed with AttributeError.  Fall back gracefully.
    request_id = getattr(request.state, "request_id", "unknown")
    logger.error(
        f"🔴 Unhandled error | Path: {request.url.path} | "
        f"Request ID: {request_id} | Error: {str(exc)}",
        exc_info=True
    )
    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            "message": "An unexpected error occurred. Check server logs.",
            "request_id": request_id,
            "timestamp": time.time()
        }
    )
# ─── Router Registration ───────────────────────────────────────────────────────
# Register routers explicitly (no loops).  Every router except health is
# protected by the X-API-KEY dependency.
# NOTE(review): health is mounted at /health while read_root advertises
# /api/health/detailed — confirm the intended prefix.
app.include_router(health.router, prefix="/health")
app.include_router(datasources.router, prefix="/api/v1/datasources", dependencies=[Depends(verify_api_key)])
app.include_router(reports.router, prefix="/api/v1/reports", dependencies=[Depends(verify_api_key)])
app.include_router(flags.router, prefix="/api/v1/flags", dependencies=[Depends(verify_api_key)])
app.include_router(scheduler.router, prefix="/api/v1/scheduler", dependencies=[Depends(verify_api_key)])
# No prefix here — presumably defined inside the router; verify.
app.include_router(analytics_stream.router, dependencies=[Depends(verify_api_key)])
app.include_router(ai_query.router, prefix="/api/v1/ai-query", dependencies=[Depends(verify_api_key)])
app.include_router(schema.router, prefix="/api/v1/schema", dependencies=[Depends(verify_api_key)])