Peter Mutwiri committed on
Commit
a0ff994
Β·
1 Parent(s): 318d2d8

feat: enterprise AI stack with hybrid entity detection

Browse files
.env DELETED
@@ -1 +0,0 @@
1
- API_KEYS=dev-analytics-key-123
 
 
app/deps.py CHANGED
@@ -1,10 +1,180 @@
 
1
  import os
 
 
 
 
 
2
  from fastapi import HTTPException, Header
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3
 
4
- API_KEYS = os.getenv("API_KEYS", "").split(",")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5
 
6
- def verify_key(x_api_key: str = Header(None, convert_underscores=True)): # ← accept any case
7
- print(f"[verify_key] received: {x_api_key}, allowed: {API_KEYS}")
8
- if not x_api_key or x_api_key not in API_KEYS:
9
- raise HTTPException(status_code=401, detail="Invalid API key")
10
- return x_api_key
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # ── Standard Library ──────────────────────────────────────────────────────────
2
  import os
3
+ from typing import Optional
4
+ import pathlib
5
+
6
+ # ── Third-Party ────────────────────────────────────────────────────────────────
7
+ import duckdb
8
  from fastapi import HTTPException, Header
9
+ from upstash_redis import Redis
10
+
11
+ # ── Configuration Paths ────────────────────────────────────────────────────────
12
+ # Use YOUR existing pattern from app/db.py (multi-tenant)
13
+ DATA_DIR = pathlib.Path("./data/duckdb")
14
+ DATA_DIR.mkdir(parents=True, exist_ok=True)
15
+
16
+ # Vector database for AI embeddings (shared but org-filtered)
17
+ VECTOR_DB_PATH = DATA_DIR / "vectors.duckdb"
18
+
19
# ── Secrets Management ─────────────────────────────────────────────────────────
def get_secret(name: str, required: bool = True) -> Optional[str]:
    """Fetch the environment variable *name*.

    Returns the raw value, or None when the variable is absent and not
    required. A missing or blank required secret raises ValueError
    immediately so misconfiguration surfaces at import time rather than
    mid-request.
    """
    value = os.getenv(name)
    missing = value is None or not value.strip()
    if required and missing:
        raise ValueError(f"πŸ”΄ CRITICAL: Required secret '{name}' not found in HF environment")
    return value
29
+
30
# API Keys (comma-separated for multiple Vercel projects).
# get_secret("API_KEYS") is required, so it either returns a value or raises —
# the old `... if get_secret("API_KEYS") else []` hit the environment twice and
# its else-branch was unreachable. Blank entries (trailing commas, stray
# whitespace) are dropped so an empty header can never match a key.
API_KEYS = [key.strip() for key in get_secret("API_KEYS").split(",") if key.strip()]

# Upstash Redis Bridge (required for Vercel ↔ HF communication)
REDIS_URL = get_secret("UPSTASH_REDIS_REST_URL")
REDIS_TOKEN = get_secret("UPSTASH_REDIS_REST_TOKEN")

# Hugging Face Token (read-only, for model download)
HF_API_TOKEN = get_secret("HF_API_TOKEN", required=False)

# QStash Token (optional, for advanced queue features)
QSTASH_TOKEN = get_secret("QSTASH_TOKEN", required=False)
42
+
43
# ── Singleton Database Connections ──────────────────────────────────────────────
_org_db_connections = {}
_vector_db_conn = None

def get_duckdb(org_id: str):
    """Return the cached multi-tenant DuckDB connection for *org_id*.

    Each org gets an isolated file: ./data/duckdb/{org_id}.duckdb. The
    connection is created on first request and reused afterwards.
    """
    cached = _org_db_connections.get(org_id)
    if cached is not None:
        return cached

    conn = duckdb.connect(str(DATA_DIR / f"{org_id}.duckdb"), read_only=False)

    # Make sure both namespaces exist before any query runs against them.
    conn.execute("CREATE SCHEMA IF NOT EXISTS main")
    conn.execute("CREATE SCHEMA IF NOT EXISTS vector_store")

    # The vector-similarity extension is best-effort: queries that don't
    # need VSS still work when the extension can't be installed.
    try:
        conn.execute("INSTALL vss;")
        conn.execute("LOAD vss;")
    except Exception as e:
        print(f"⚠️ VSS extension warning (non-critical): {e}")

    _org_db_connections[org_id] = conn
    return conn
70
+
71
def get_vector_db():
    """
    Shared vector database for AI embeddings.
    A single file serves all orgs; isolation happens in queries
    (WHERE org_id = ?).

    Fix: the global is assigned only after initialization succeeds — the old
    version set `_vector_db_conn` before running the setup statements, so a
    failure left a half-initialized connection cached for every later caller.
    """
    global _vector_db_conn
    if _vector_db_conn is not None:
        return _vector_db_conn

    conn = duckdb.connect(str(VECTOR_DB_PATH), read_only=False)

    # Vector search is mandatory here (the embeddings table relies on
    # FLOAT[384] similarity search); failures propagate to the caller.
    conn.execute("INSTALL vss;")
    conn.execute("LOAD vss;")

    # Create schema and table for cached embeddings.
    conn.execute("CREATE SCHEMA IF NOT EXISTS vector_store")
    conn.execute("""
        CREATE TABLE IF NOT EXISTS vector_store.embeddings (
            id VARCHAR PRIMARY KEY,
            org_id VARCHAR NOT NULL,
            content TEXT,
            embedding FLOAT[384],
            entity_type VARCHAR,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)

    # Performance index for org-filtered searches — best-effort only.
    try:
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_org_entity
            ON vector_store.embeddings (org_id, entity_type)
        """)
    except Exception as e:
        print(f"⚠️ Index creation warning: {e}")

    _vector_db_conn = conn
    return _vector_db_conn
107
+
108
# ── Redis Singleton ────────────────────────────────────────────────────────────
_redis_client = None

def get_redis():
    """
    Upstash Redis client (singleton) for the Vercel bridge.

    Pings once on first construction and raises RuntimeError if unreachable.
    Fix: the client is cached only after the ping succeeds — the old version
    assigned the global first, so after a failed ping every later call
    returned the broken client without re-checking.
    """
    global _redis_client
    if _redis_client is None:
        client = Redis(url=REDIS_URL, token=REDIS_TOKEN)

        # Fail loudly at first use rather than on a later request.
        try:
            client.ping()
            print("βœ… Redis bridge connected")
        except Exception as e:
            raise RuntimeError(f"πŸ”΄ Redis connection failed: {e}")

        _redis_client = client

    return _redis_client
127
 
128
# ── API Security Dependency ────────────────────────────────────────────────────
def verify_api_key(x_api_key: str = Header(..., alias="X-API-KEY")):
    """
    FastAPI dependency guarding Vercel-facing endpoints.

    Responds 500 when no keys are configured, 401 on a bad key, and returns
    the validated key so handlers can log which client called.
    """
    if not API_KEYS:
        raise HTTPException(
            status_code=500,
            detail="πŸ”΄ API_KEYS not configured in HF environment"
        )

    # Constant-time comparison: `x_api_key not in API_KEYS` short-circuits on
    # the first differing byte and can leak key prefixes via timing.
    import secrets
    if not any(secrets.compare_digest(x_api_key, key) for key in API_KEYS):
        raise HTTPException(
            status_code=401,
            detail="❌ Invalid API key"
        )

    return x_api_key
147
 
148
# ── Health Check Utilities ─────────────────────────────────────────────────────
def check_all_services():
    """
    Comprehensive health probe used by the /health endpoints.

    Returns a dict mapping service name -> human-readable status string
    ("βœ… connected" or "❌ <error>").
    """
    statuses = {}

    # Each probe raises on failure; insertion order fixes the report order.
    probes = {
        "duckdb": lambda: get_duckdb("health_check").execute("SELECT 1"),
        "vector_db": lambda: get_vector_db().execute("SELECT 1"),
        "redis": lambda: get_redis().ping(),
    }

    for service, probe in probes.items():
        try:
            probe()
            statuses[service] = "βœ… connected"
        except Exception as e:
            statuses[service] = f"❌ {e}"

    return statuses
app/hybrid_entity_detector.py ADDED
@@ -0,0 +1,33 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app/hybrid_entity_detector.py
2
+ from typing import Tuple
3
+ import pandas as pd
4
+ from app.entity_detector import detect_entity_type as rule_based_detect
5
+ from app.service.ai_service import ai_service
6
+
7
def hybrid_detect_entity_type(org_id: str, df: pd.DataFrame, filename: str) -> Tuple[str, float, bool]:
    """
    Hybrid detection: rule-based first (fast), LLM fallback (accurate).

    Returns (entity_type, confidence, is_confident); is_confident is True
    when the rule-based score clears 0.75 or the LLM reports a strictly
    higher confidence than the rules did.
    """
    # Fast path: the deterministic rule-based detector.
    entity_type, confidence = rule_based_detect(df)
    if confidence > 0.75:
        return entity_type, confidence, True

    # Ambiguous case: consult the LLM, but never let its failure (or a
    # malformed result) break ingestion — fall back to the rule verdict.
    try:
        llm_result = ai_service.detect_entity_type(org_id, list(df.columns), filename)
        if llm_result["confidence"] > confidence:
            return llm_result["entity_type"], llm_result["confidence"], True
        return entity_type, confidence, False
    except Exception as e:
        print(f"[hybrid] LLM fallback failed: {e}, using rule-based")
        return entity_type, confidence, False
app/main.py CHANGED
@@ -1,64 +1,254 @@
1
- from fastapi import FastAPI, Depends
 
 
 
 
 
 
 
 
 
 
 
 
 
2
  from fastapi.middleware.cors import CORSMiddleware
3
- from fastapi.encoders import jsonable_encoder
4
  from fastapi.responses import JSONResponse
5
- from app.routers import ingress, reports, flags, datasources, scheduler, run, health, socket
6
- from app.tasks.scheduler import start_scheduler
7
- from app.deps import verify_key
8
- from contextlib import asynccontextmanager
9
- import os
10
 
11
- # ---------- lifespan ----------
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12
  @asynccontextmanager
13
  async def lifespan(app: FastAPI):
14
- start_scheduler()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15
  yield
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16
 
17
- # ---------- app init ----------
18
  app = FastAPI(
19
  title="MutSyncHub Analytics Engine",
20
- version="2.2",
21
- lifespan=lifespan
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22
  )
23
 
24
- @app.get("/")
25
- def read_root():
26
- return {"status": "ok", "service": "analytics-engine"}
27
- # ---------- Socket.IO Mount ----------
28
- app.mount("/socket.io", socket.socket_app)
29
-
30
- # ---------- Middleware (fix order) ----------
31
  @app.middleware("http")
32
- async def serialize_all_responses(request, call_next):
33
- """Ensure all responses are safely JSON-serializable."""
 
 
 
 
 
 
34
  response = await call_next(request)
35
- if isinstance(response, dict):
36
- return JSONResponse(content=jsonable_encoder(response))
 
 
 
 
 
 
 
 
 
 
37
  return response
38
 
39
- # ---------- CORS Configuration ----------
40
- origins = [
41
- "https://mut-sync-hub.vercel.app", # live frontend
42
- "http://localhost:3000", # local dev
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
43
  ]
 
44
  app.add_middleware(
45
  CORSMiddleware,
46
- allow_origins=origins,
47
  allow_credentials=True,
48
- allow_methods=["*"],
49
  allow_headers=["*"],
 
 
50
  )
51
 
52
- # ---------- Routers ----------
53
- app.include_router(health.router) # public route (no key)
54
- app.include_router(datasources.router, dependencies=[Depends(verify_key)])
55
- app.include_router(reports.router, dependencies=[Depends(verify_key)])
56
- app.include_router(flags.router, dependencies=[Depends(verify_key)])
57
- app.include_router(scheduler.router, dependencies=[Depends(verify_key)])
58
- app.include_router(run.router, dependencies=[Depends(verify_key)])
59
- app.include_router(socket.router)
60
-
61
- # ---------- Public Health Endpoint ----------
62
- @app.get("/health")
63
- def health_check():
64
- return {"status": "ok", "service": "analytics-engine"}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app/main.py – ENTERPRISE ANALYTICS ENGINE v3.0
2
+ """
3
+ MutSyncHub Analytics Engine
4
+ Enterprise-grade AI analytics platform with zero-cost inference
5
+ """
6
+
7
+ # ─── Standard Library ─────────────────────────────────────────────────────────
8
+ import os
9
+ import time
10
+ import uuid
11
+ import logging
12
+
13
+ # ─── Third-Party ──────────────────────────────────────────────────────────────
14
+ from fastapi import FastAPI, Depends, HTTPException, Request
15
  from fastapi.middleware.cors import CORSMiddleware
 
16
  from fastapi.responses import JSONResponse
 
 
 
 
 
17
 
18
+ # ─── Router Imports ───────────────────────────────────────────────────────────
19
+ from app.routers import (
20
+ health, # Health & monitoring
21
+ datasources, # Data ingestion
22
+ reports, # Report generation
23
+ flags, # Feature flags
24
+ scheduler, # Background jobs
25
+ run, # Analytics execution
26
+ ai, # AI endpoints (NEW)
27
+ )
28
+
29
+ # ─── Dependencies ─────────────────────────────────────────────────────────────
30
+ from app.deps import verify_api_key, check_all_services
31
+
32
+ # ─── Logger Configuration ───────────────────────────────────────────────────────
33
+ logging.basicConfig(
34
+ level=logging.INFO,
35
+ format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
36
+ datefmt="%Y-%m-%d %H:%M:%S"
37
+ )
38
+ logger = logging.getLogger(__name__)
39
+
40
+ # ─── Lifespan Management ───────────────────────────────────────────────────────
41
  @asynccontextmanager
42
  async def lifespan(app: FastAPI):
43
+ """
44
+ Enterprise startup/shutdown sequence with health validation.
45
+ """
46
+ # ─── Startup ───────────────────────────────────────────────────────────────
47
+ logger.info("=" * 60)
48
+ logger.info("πŸš€ ANALYTICS ENGINE v3.0 - STARTUP SEQUENCE")
49
+ logger.info("=" * 60)
50
+
51
+ app.state.instance_id = f"engine-{uuid.uuid4().hex[:8]}"
52
+ logger.info(f"Instance ID: {app.state.instance_id}")
53
+
54
+ # Validate service health on boot
55
+ try:
56
+ services = check_all_services()
57
+ healthy = [k for k, v in services.items() if "βœ…" in str(v)]
58
+ unhealthy = [k for k, v in services.items() if "❌" in str(v)]
59
+
60
+ logger.info(f"βœ… Healthy: {len(healthy)} services")
61
+ for svc in healthy:
62
+ logger.info(f" β†’ {svc}: {services[svc]}")
63
+
64
+ if unhealthy:
65
+ logger.warning(f"⚠️ Unhealthy: {len(unhealthy)} services")
66
+ for svc in unhealthy:
67
+ logger.warning(f" β†’ {svc}: {services[svc]}")
68
+
69
+ except Exception as e:
70
+ logger.error(f"πŸ”΄ Startup health check failed: {e}")
71
+
72
+ logger.info("βœ… Startup sequence complete")
73
  yield
74
+
75
+ # ─── Shutdown ──────────────────────────────────────────────────────────────
76
+ logger.info("=" * 60)
77
+ logger.info("πŸ›‘ ANALYTICS ENGINE - SHUTDOWN SEQUENCE")
78
+ logger.info("=" * 60)
79
+
80
+ # Close all database connections
81
+ from app.deps import _org_db_connections, _vector_db_conn
82
+
83
+ if _org_db_connections:
84
+ for org_id, conn in _org_db_connections.items():
85
+ try:
86
+ conn.close()
87
+ logger.info(f" β†’ Closed DB: {org_id}")
88
+ except:
89
+ pass
90
+
91
+ if _vector_db_conn:
92
+ try:
93
+ _vector_db_conn.close()
94
+ logger.info(" β†’ Closed Vector DB")
95
+ except:
96
+ pass
97
+
98
+ logger.info("βœ… Shutdown complete")
99
 
100
# ─── FastAPI Application ───────────────────────────────────────────────────────
# Long-form API description lives in a module constant so the constructor
# call below stays readable.
_API_DESCRIPTION = """Enterprise-grade AI analytics engine with:

β€’ Hybrid entity detection (Rule-based + LLM)
β€’ Vector similarity search (DuckDB VSS)
β€’ Zero external API costs (Local Mistral-7B)
β€’ Multi-tenant data isolation
β€’ Redis-backed async processing

**πŸ”’ All endpoints require X-API-KEY header except /health**"""

app = FastAPI(
    title="MutSyncHub Analytics Engine",
    version="3.0.0",
    description=_API_DESCRIPTION,
    lifespan=lifespan,
    # Docs are served under /api/* so they sit behind the same path prefix
    # as the routers.
    docs_url="/api/docs",
    redoc_url="/api/redoc",
    openapi_url="/api/openapi.json",
    contact={
        "name": "MutSyncHub Enterprise",
        "email": "enterprise@mutsynchub.com",
    },
    license_info={
        "name": "Enterprise License",
    },
)
  )
125
 
126
# ─── Request ID Middleware ─────────────────────────────────────────────────────
@app.middleware("http")
async def add_request_tracking(request: Request, call_next):
    """
    Attach a request ID and timing headers to every response for
    observability, and emit one access-log line per request.
    """
    request_id = f"req-{uuid.uuid4().hex[:12]}"
    request.state.request_id = request_id

    # perf_counter is monotonic; time.time() can jump under NTP adjustments
    # and yield negative or garbage durations.
    start_time = time.perf_counter()
    response = await call_next(request)
    process_time = time.perf_counter() - start_time

    # Add headers
    response.headers["X-Request-ID"] = request_id
    response.headers["X-Response-Time"] = f"{process_time:.3f}s"

    # Log
    logger.info(
        f"{request.method} {request.url.path} | {response.status_code} "
        f"| {process_time:.3f}s | {request_id}"
    )

    return response
150
 
151
# ─── Root Endpoint ─────────────────────────────────────────────────────────────
@app.get("/", tags=["root"])
def read_root():
    """
    Service information and discovery.
    """
    return {
        "status": "operational",
        "service": "MutSyncHub Analytics Engine",
        "version": "3.0.0",
        "mode": "production" if os.getenv("SPACE_ID") else "development",
        # instance_id is assigned in lifespan(); fall back gracefully when the
        # app runs without lifespan (e.g. some test clients), where the old
        # attribute access raised AttributeError and 500'd the root route.
        "instance_id": getattr(app.state, "instance_id", "uninitialized"),
        "endpoints": {
            "docs": "/api/docs",
            "health": "/api/health/detailed",
            "datasources": "/api/datasources",
            "ai": "/api/ai",
        },
        "features": [
            "Hybrid entity detection",
            "Vector similarity search",
            "Multi-tenant isolation",
            "Zero-cost LLM inference",
            "Redis-backed processing"
        ]
    }
177
+
178
# ─── CORS Configuration ────────────────────────────────────────────────────────
# Explicit allow-list: credentials are enabled, so wildcard origins are not
# an option.
ALLOWED_ORIGINS = [
    "https://mut-sync-hub.vercel.app",
    "http://localhost:3000",
    "https://studio.huggingface.co",
]

# Tracking headers set by the request middleware must be exposed for browser
# clients to read them; preflight responses are cacheable for one hour.
app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
    expose_headers=["X-Request-ID", "X-Response-Time"],
    max_age=3600,
)
194
 
195
# ─── Global Error Handler ──────────────────────────────────────────────────────
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """
    Catch all uncaught exceptions and return a safe JSON error response.
    """
    # The tracking middleware may not have run yet (e.g. errors raised inside
    # other middleware), so don't assume request.state.request_id exists —
    # the old handler itself crashed with AttributeError in that case.
    request_id = getattr(request.state, "request_id", "unknown")

    logger.error(
        f"πŸ”΄ Unhandled error | Path: {request.url.path} | "
        f"Request ID: {request_id} | Error: {str(exc)}",
        exc_info=True
    )

    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            "message": "An unexpected error occurred. Check server logs.",
            "request_id": request_id,
            "timestamp": time.time()
        }
    )
216
+
217
# ─── Router Registration ───────────────────────────────────────────────────────
# Public routers (no authentication)
PUBLIC_ROUTERS = [
    (health.router, "/api"),
]

# Protected routers (require X-API-KEY)
PROTECTED_ROUTERS = [
    (datasources.router, "/api/datasources"),
    (reports.router, "/api/reports"),
    (flags.router, "/api/flags"),
    (scheduler.router, "/api/scheduler"),
    (run.router, "/api/run"),
    (ai.router, "/api/ai"),
]

for public_router, public_prefix in PUBLIC_ROUTERS:
    app.include_router(public_router, prefix=public_prefix)

for protected_router, protected_prefix in PROTECTED_ROUTERS:
    # Tag by the final path segment so OpenAPI groups endpoints per router.
    app.include_router(
        protected_router,
        prefix=protected_prefix,
        dependencies=[Depends(verify_api_key)],
        tags=[protected_prefix.split("/")[-1].title()],
    )

# Registration summary — only printed when this module is executed directly
# (under uvicorn __name__ != "__main__", so this block stays silent).
if __name__ == "__main__":
    logger.info("=" * 60)
    logger.info("πŸ“Š ROUTER REGISTRATION SUMMARY")
    logger.info("=" * 60)
    for _router, _prefix in PROTECTED_ROUTERS:
        logger.info(f"πŸ”’ {_prefix:30} β†’ PROTECTED")
    for _router, _prefix in PUBLIC_ROUTERS:
        logger.info(f"πŸ”“ {_prefix:30} β†’ PUBLIC")
    logger.info("=" * 60)
app/mapper.py CHANGED
@@ -6,6 +6,8 @@ import pandas as pd
6
  from datetime import datetime, timedelta
7
  from app.db import get_conn, ensure_raw_table
8
  from app.utils.detect_industry import _ALIAS, detect_industry
 
 
9
 
10
  # ---------------------- Canonical schema base ---------------------- #
11
  CANONICAL = {
@@ -28,13 +30,10 @@ def map_pandas_to_duck(col: str, series: pd.Series) -> str:
28
  if pd.api.types.is_datetime64_any_dtype(series): return "TIMESTAMP"
29
  return "VARCHAR"
30
 
31
- # ---------- INDUSTRY DETECTION (uses centralized detect_industry) ---------- #
32
- def ensure_canonical_table(duck: duckdb.DuckDBPyConnection, df: pd.DataFrame) -> str:
33
- """
34
- Creates single canonical table and adds missing columns dynamically.
35
- BULLETPROOF: Handles int column names, missing columns, race conditions.
36
- """
37
- table_name = "main.canonical"
38
 
39
  # Create base table if doesn't exist
40
  duck.execute(f"""
@@ -81,7 +80,7 @@ def save_dynamic_aliases() -> None:
81
  json.dump(CANONICAL, f, indent=2)
82
 
83
  # ---------- Main Canonify Function (ENTERPRISE-GRADE) ---------- #
84
- def canonify_df(org_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str, float]:
85
  """
86
  Enterprise ingestion pipeline:
87
  - Accepts ANY raw data shape
@@ -197,15 +196,17 @@ def canonify_df(org_id: str, hours_window: int = 24) -> tuple[pd.DataFrame, str,
197
  except Exception as e:
198
  print(f"[canonify] Type conversion warning (non-critical): {e}")
199
 
200
- # 6) βœ… Industry detection
201
- industry, confidence = detect_industry(df)
202
- print(f"[canonify] 🎯 Industry: {industry} ({confidence:.1%} confidence)")
 
203
 
204
  # 7) Dynamic schema evolution
205
  os.makedirs("./db", exist_ok=True)
206
  duck = duckdb.connect(f"./db/{org_id}.duckdb")
207
 
208
- table_name = ensure_canonical_table(duck, df)
 
209
 
210
  # βœ… SAFE INSERT: Match columns explicitly
211
  if not df.empty:
 
6
  from datetime import datetime, timedelta
7
  from app.db import get_conn, ensure_raw_table
8
  from app.utils.detect_industry import _ALIAS, detect_industry
9
+ # app/mapper.py (add line 1)
10
+ from app.hybrid_entity_detector import hybrid_detect_entity_type
11
 
12
  # ---------------------- Canonical schema base ---------------------- #
13
  CANONICAL = {
 
30
  if pd.api.types.is_datetime64_any_dtype(series): return "TIMESTAMP"
31
  return "VARCHAR"
32
 
33
+ # ---------- entity detection(uses ai to detect entity from the data) ---------- #
34
+ def ensure_canonical_table(duck: duckdb.DuckDBPyConnection, df: pd.DataFrame, entity_type: str) -> str:
35
+ """Creates entity-specific table: main.sales_canonical, main.inventory_canonical, etc."""
36
+ table_name = f"main.{entity_type}_canonical"
 
 
 
37
 
38
  # Create base table if doesn't exist
39
  duck.execute(f"""
 
80
  json.dump(CANONICAL, f, indent=2)
81
 
82
  # ---------- Main Canonify Function (ENTERPRISE-GRADE) ---------- #
83
+ def canonify_df(org_id: str, filename: str, hours_window: int = 24) -> tuple[pd.DataFrame, str, float]:
84
  """
85
  Enterprise ingestion pipeline:
86
  - Accepts ANY raw data shape
 
196
  except Exception as e:
197
  print(f"[canonify] Type conversion warning (non-critical): {e}")
198
 
199
+ # 6) βœ… Hybrid entity detection (rule-based + LLM fallback)
200
+ entity_type, confidence, is_confident = hybrid_detect_entity_type(org_id, df, filename)
201
+ print(f"[canonify] 🎯 Entity: {entity_type} ({confidence:.1%} confidence, AI: {not is_confident})")
202
+ industry = entity_type
203
 
204
  # 7) Dynamic schema evolution
205
  os.makedirs("./db", exist_ok=True)
206
  duck = duckdb.connect(f"./db/{org_id}.duckdb")
207
 
208
+ # 7) βœ… Entity-specific canonical table
209
+ table_name = ensure_canonical_table(duck, df, entity_type)
210
 
211
  # βœ… SAFE INSERT: Match columns explicitly
212
  if not df.empty:
app/redis_client.py ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
# app/redis_client.py
"""Module-level export of the shared Redis client."""
from app.deps import get_redis

# Export the singleton instance. get_redis() already validates the
# connection on first construction (pings, prints "βœ… Redis bridge
# connected", and raises RuntimeError when unreachable), so the old
# second ping/print here was a redundant duplicate of that check and
# produced double log lines on every import.
redis = get_redis()
app/routers/health.py CHANGED
@@ -1,7 +1,98 @@
1
- from fastapi import APIRouter
 
 
 
 
2
 
3
  router = APIRouter(tags=["health"])
4
 
5
  @router.get("/health")
6
- def health():
7
- return {"status": "ok", "service": "analytics-engine"}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app/routers/health.py – ENTERPRISE HEALTH CHECKS
2
+ from fastapi import APIRouter, HTTPException, Depends
3
+ from app.deps import check_all_services, get_redis, get_vector_db, get_duckdb
4
+ import os
5
+ import time
6
 
7
router = APIRouter(tags=["health"])

@router.get("/health")
def health_check():
    """Liveness-style basic probe used by load balancers.

    Always answers 200 with a static payload; no dependencies are touched.
    """
    return {"status": "ok", "service": "analytics-engine"}
16
+
17
+ @router.get("/health/detailed")
18
+ def health_detailed():
19
+ """
20
+ Comprehensive health check for all services.
21
+ Returns detailed status of each component.
22
+ """
23
+ start_time = time.time()
24
+ statuses = check_all_services()
25
+
26
+ # Determine overall health
27
+ all_healthy = all("βœ…" in str(status) for status in statuses.values())
28
+ http_status = 200 if all_healthy else 503
29
+
30
+ return {
31
+ "status": "healthy" if all_healthy else "unhealthy",
32
+ "services": statuses,
33
+ "environment": "production" if os.getenv("SPACE_ID") else "development",
34
+ "uptime_seconds": time.time() - start_time,
35
+ "timestamp": time.time()
36
+ }
37
+
38
+ @router.get("/health/ready")
39
+ def health_ready():
40
+ """
41
+ Kubernetes-style readiness probe.
42
+ Returns 200 if ready to serve traffic.
43
+ """
44
+ try:
45
+ # Quick smoke test: Can we connect to core services?
46
+ redis = get_redis()
47
+ redis.ping()
48
+
49
+ # Test DuckDB with a dummy org
50
+ conn = get_duckdb("health_check")
51
+ conn.execute("SELECT 1")
52
+
53
+ return {"status": "ready"}
54
+ except Exception as e:
55
+ raise HTTPException(
56
+ status_code=503,
57
+ detail=f"Not ready: {str(e)}"
58
+ )
59
+
60
+ @router.get("/health/live")
61
+ def health_live():
62
+ """
63
+ Kubernetes-style liveness probe.
64
+ Returns 200 if service is alive (doesn't check dependencies).
65
+ """
66
+ return {"status": "alive"}
67
+
68
@router.post("/health/reload")
def health_reload(_: str = Depends(check_all_services)):
    """
    Clear cached service connections so they are rebuilt on next use.

    NOTE(review): the dependency runs check_all_services, which is a health
    probe — not an API-key check despite the original comment; confirm
    whether this route should depend on verify_api_key instead.
    """
    # Rebind through the module object. The old version did
    # `from app.deps import _vector_db_conn` and assigned None to it, which
    # only rebound the local name — the cached singletons were never reset.
    import app.deps as deps

    deps._org_db_connections.clear()
    deps._vector_db_conn = None
    deps._redis_client = None

    return {"status": "reloaded", "message": "Connections cleared"}
82
+
83
+ @router.get("/health/metrics")
84
+ def health_metrics():
85
+ """
86
+ Performance metrics for monitoring.
87
+ """
88
+ try:
89
+ import psutil
90
+
91
+ return {
92
+ "cpu_percent": psutil.cpu_percent(),
93
+ "memory_mb": psutil.virtual_memory().used // (1024 * 1024),
94
+ "disk_gb": psutil.disk_usage("/").free // (1024**3),
95
+ "connections": len(_org_db_connections) if '_org_db_connections' in globals() else 0
96
+ }
97
+ except ImportError:
98
+ return {"error": "psutil not installed"}
app/service/ai_service.py ADDED
@@ -0,0 +1,61 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app/service/ai_service.py
2
+ import json
3
+ from app.deps import get_vector_db
4
+ from app.service.llm_service import llm_service
5
+ from app.service.embedding_service import embedder
6
+
7
class AIService:
    """Org-scoped AI helpers: entity detection, SQL generation, vector search."""

    def __init__(self):
        self.vector_db = get_vector_db()
        self.llm = llm_service
        self.embedder = embedder

    def detect_entity_type(self, org_id: str, columns: list[str], filename: str) -> dict:
        """Detect entity type - per-org cache"""
        columns_str = ",".join(columns)

        # Check vector cache for this org
        cached = self.vector_db.execute("""
            SELECT entity_type FROM vector_store.embeddings
            WHERE org_id = ? AND content = ?
            ORDER BY created_at DESC LIMIT 1
        """, [org_id, columns_str]).fetchone()

        if cached:
            return {"entity_type": cached[0], "confidence": 0.99, "cached": True}

        # AI detection — interpolate the real filename; the old prompt
        # hardcoded a placeholder and left the `filename` parameter unused.
        prompt = f"""Columns: {columns_str}\nFilename: {filename}\nClassify as: sales,inventory,customer,product. JSON: {{"entity_type":"...","confidence":0.95}}"""
        response = self.llm.generate(prompt, max_tokens=100)
        # NOTE(review): raises ValueError on non-JSON LLM output; callers
        # (hybrid detector) already treat any exception as a soft failure.
        result = json.loads(response)

        # Cache for this org. `id` is a NOT NULL primary key, so it must be
        # supplied explicitly — the old INSERT omitted it and the statement
        # failed on every cache write.
        import uuid
        embedding = self.embedder.generate(columns_str)
        self.vector_db.execute("""
            INSERT INTO vector_store.embeddings (id, org_id, content, embedding, entity_type)
            VALUES (?, ?, ?, ?, ?)
        """, [uuid.uuid4().hex, org_id, columns_str, embedding, result["entity_type"]])

        return result

    def generate_sql(self, org_id: str, question: str, entity_type: str, schema: dict) -> str:
        """Generate SQL for specific org"""
        prompt = f"Org: {org_id}\nSchema: {json.dumps(schema)}\nEntity: {entity_type}\nQuestion: {question}\nDuckDB SQL only:"
        sql = self.llm.generate(prompt, max_tokens=300)
        return sql.strip()

    def similarity_search(self, org_id: str, query: str, entity_type: str, top_k: int = 5) -> list[dict]:
        """Search within org's vector history"""
        query_vector = self.embedder.generate(query)

        results = self.vector_db.execute("""
            SELECT id, content, entity_type, array_cosine_similarity(embedding, ?::FLOAT[384]) as score
            FROM vector_store.embeddings
            WHERE org_id = ? AND entity_type = ?
            ORDER BY score DESC
            LIMIT ?
        """, [query_vector, org_id, entity_type, top_k]).fetchall()

        return [{"id": r[0], "content": r[1], "entity_type": r[2], "score": r[3]} for r in results]

# Module-level singleton (connects to the vector DB at import time).
ai_service = AIService()
app/service/embedding_service.py ADDED
@@ -0,0 +1,32 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app/service/embedding_service.py
2
+ import requests
3
+ from app.deps import HF_API_TOKEN
4
+
5
class EmbeddingService:
    """Sentence embeddings via the HF Inference API with a local fallback."""

    def __init__(self):
        self.api_url = "https://api-inference.huggingface.co/pipeline/feature-extraction/sentence-transformers/all-MiniLM-L6-v2"
        self.headers = {"Authorization": f"Bearer {HF_API_TOKEN}"}
        # Lazily-created SentenceTransformer, cached for the fallback path.
        self._local_model = None

    def generate(self, text: str) -> list[float]:
        """Generate embedding - uses HF free tier (10k/day)"""
        try:
            response = requests.post(
                self.api_url,
                headers=self.headers,
                json={"inputs": text, "options": {"wait_for_model": True}},
                timeout=30
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            # Fallback to local if API fails
            print(f"HF API failed, using local fallback: {e}")
            return self._local_fallback(text)

    def _local_fallback(self, text: str) -> list[float]:
        """Local embedding generation (slower but reliable).

        The model is loaded once and cached on the instance — the old
        version re-loaded all-MiniLM-L6-v2 from disk on every call.
        """
        if self._local_model is None:
            from sentence_transformers import SentenceTransformer
            self._local_model = SentenceTransformer('all-MiniLM-L6-v2')
        return self._local_model.encode(text).tolist()

embedder = EmbeddingService()
app/service/llm_service.py ADDED
@@ -0,0 +1,60 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app/service/llm_service.py
2
+ import torch
3
+ from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
4
+ from app.deps import HF_API_TOKEN
5
+
6
class LocalLLMService:
    """Local Mistral-7B-Instruct wrapper (zero external inference cost).

    Model and tokenizer are loaded once at import time via the module-level
    singleton and placed on GPU automatically (device_map="auto").
    """

    def __init__(self):
        # FREE, permissive license, fits in T4 GPU
        self.model_id = "mistralai/Mistral-7B-Instruct-v0.3"

        # NOTE(review): trust_remote_code executes code from the model repo —
        # confirm this is intended for this model.
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.model_id,
            token=HF_API_TOKEN,
            trust_remote_code=True,
        )
        self.tokenizer.pad_token = self.tokenizer.eos_token

        # Load to GPU automatically
        self.model = AutoModelForCausalLM.from_pretrained(
            self.model_id,
            token=HF_API_TOKEN,
            torch_dtype=torch.float16,
            device_map="auto",
        )

        self.pipe = pipeline(
            "text-generation",
            model=self.model,
            tokenizer=self.tokenizer,
            device_map="auto",
        )

    def generate(self, prompt: str, max_tokens: int = 500, temperature: float = 0.3) -> str:
        """Run one chat-style completion and return the assistant's text."""
        chat = [
            {"role": "system", "content": "You are a data analytics assistant. Respond with valid JSON only."},
            {"role": "user", "content": prompt},
        ]

        rendered = self.tokenizer.apply_chat_template(
            chat,
            tokenize=False,
            add_generation_prompt=True,
        )

        generations = self.pipe(
            rendered,
            max_new_tokens=max_tokens,
            temperature=temperature,
            do_sample=True,
        )

        # The pipeline echoes the prompt; keep only what follows [/INST].
        text = generations[0]["generated_text"]
        if "[/INST]" in text:
            return text.split("[/INST]")[-1].strip()
        return text.strip()

# Singleton instance
llm_service = LocalLLMService()
app/tasks/worker.py ADDED
@@ -0,0 +1,145 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app/tasks/worker.py – ENTERPRISE GRADE
2
+ import json
3
+ import time
4
+ import signal
5
+ import sys
6
+ import traceback
7
+ from typing import Dict, Any, Callable
8
+ from app.redis_client import redis
9
+ from app.service.ai_service import ai_service
10
+ from app.deps import get_duckdb
11
+
12
# ── Graceful Shutdown ──────────────────────────────────────────────────────────
def shutdown(signum, frame):
    """Signal handler: announce the stop and exit cleanly with status 0."""
    print("\n🛑 Worker shutting down gracefully...")
    sys.exit(0)

# Register the same handler for interactive (Ctrl+C) and supervisor stops.
for _sig in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_sig, shutdown)
19
+
20
# ── Task Handler Registry ─────────────────────────────────────────────────────
# Every handler takes org_id as its first positional argument; the remaining
# task args are forwarded as keyword arguments.
def _detect_entity_type(org_id, **args):
    return ai_service.detect_entity_type(org_id, **args)

def _generate_sql(org_id, **args):
    return ai_service.generate_sql(org_id, **args)

def _generate_insights(org_id, **args):
    return ai_service.generate_insights(org_id, **args)

def _similarity_search(org_id, **args):
    return ai_service.similarity_search(org_id, **args)

TASK_HANDLERS: Dict[str, Callable] = {
    "detect_entity_type": _detect_entity_type,
    "generate_sql": _generate_sql,
    "generate_insights": _generate_insights,
    "similarity_search": _similarity_search,
    # Mapper integration — lambdas defer lookup of the wrappers defined below.
    "canonify_df": lambda org_id, **args: canonify_df_with_entity(org_id, **args),
    "execute_sql": lambda org_id, **args: execute_org_sql(org_id, **args),
}
32
+
33
# ── Wrapper for Legacy Functions ──────────────────────────────────────────────
def canonify_df_with_entity(org_id: str, filename: str, hours_window: int = 24):
    """Bridge to the existing ``app.mapper.canonify_df``.

    The mapper now performs hybrid entity detection internally; this wrapper
    only adapts it to the worker's (org_id-first) handler signature.
    """
    # Deferred import, resolved at call time rather than at module load.
    from app.mapper import canonify_df as _canonify
    return _canonify(org_id, filename, hours_window)
39
+
40
def execute_org_sql(org_id: str, sql: str):
    """Execute a read-only SQL query against the given org's DuckDB.

    Args:
        org_id: tenant whose database connection is used.
        sql: the query text; only SELECT statements are accepted.

    Returns:
        All result rows from ``fetchall()``.

    Raises:
        ValueError: if the statement is not a SELECT.
    """
    conn = get_duckdb(org_id)

    # Strip a trailing semicolon so an appended LIMIT stays syntactically valid.
    stmt = sql.strip().rstrip(";")
    upper = stmt.upper()

    # Security: Only allow SELECT queries
    if not upper.startswith("SELECT"):
        raise ValueError("🔒 Only SELECT queries allowed")

    # Add LIMIT 10000 if not present to prevent overload.
    # NOTE(review): naive substring check — a query mentioning "limit" in a
    # literal or identifier also matches; acceptable as a safety cap.
    if "LIMIT" not in upper:
        stmt += " LIMIT 10000"

    # BUG FIX: execute the ORIGINAL (case-preserved) statement. The previous
    # code ran the upper-cased copy, which corrupted case-sensitive string
    # literals and quoted identifiers inside the query.
    return conn.execute(stmt).fetchall()
54
+
55
# ── Task Processing ────────────────────────────────────────────────────────────
def process_task(task_data: Dict[str, Any]):
    """Process a single task with full error handling and logging.

    Expected shape: ``{"id": ..., "function": ..., "args": {"org_id": ...}}``.
    The outcome (success or error) is written to Redis under
    ``python:response:<task_id>`` with a 1-hour TTL for the producer to poll.

    Raises:
        ValueError: if the task lacks an id/function or args lack org_id.
    """
    task_id = task_data.get("id")
    function_name = task_data.get("function")
    args = task_data.get("args", {})

    # ── Validation ─────────────────────────────────────────────────────────────
    if not task_id or not function_name:
        raise ValueError("❌ Invalid task: missing id or function")

    if "org_id" not in args:
        raise ValueError(f"❌ Task {task_id} missing required org_id")

    org_id = args["org_id"]

    # BUG FIX: handlers take org_id as their first positional argument, so it
    # must be removed from the forwarded keyword args — passing it in **args
    # as well raised "got multiple values for argument 'org_id'" on every task.
    handler_args = {k: v for k, v in args.items() if k != "org_id"}

    # ── Handler Execution ──────────────────────────────────────────────────────
    start_time = time.time()
    print(f"🔵 [{org_id}] Processing {function_name} (task: {task_id})")

    try:
        handler = TASK_HANDLERS.get(function_name)
        if not handler:
            raise ValueError(f"Unknown function: {function_name}")

        # Execute handler (org_id is passed explicitly, rest via keywords)
        result = handler(org_id, **handler_args)

        # ── Success Response ───────────────────────────────────────────────────
        duration = time.time() - start_time
        print(f"✅ [{org_id}] {function_name} completed in {duration:.2f}s")

        redis.setex(
            f"python:response:{task_id}",
            3600,  # 1 hour TTL
            json.dumps({
                "status": "success",
                "org_id": org_id,
                "function": function_name,
                "data": result,
                "duration": duration
            })
        )

    except Exception as e:
        # ── Error Response ─────────────────────────────────────────────────────
        duration = time.time() - start_time
        error_msg = f"{type(e).__name__}: {str(e)}"
        print(f"❌ [{org_id}] {function_name} failed after {duration:.2f}s: {error_msg}")
        print(traceback.format_exc())  # Full stack trace for debugging

        redis.setex(
            f"python:response:{task_id}",
            3600,
            json.dumps({
                "status": "error",
                "org_id": org_id,
                "function": function_name,
                "message": error_msg,
                "duration": duration
            })
        )
117
+
118
# ── Main Worker Loop ───────────────────────────────────────────────────────────
if __name__ == "__main__":
    print("🚀 Python worker listening on Redis queue...")
    print("Press Ctrl+C to stop")

    while True:
        try:
            # Block until a task arrives (timeout=0 waits forever).
            _queue, raw_payload = redis.brpop("python:task_queue", timeout=0)

            # Skip — but log — payloads that are not valid JSON.
            try:
                task = json.loads(raw_payload)
            except json.JSONDecodeError as decode_err:
                print(f"❌ Malformed task JSON: {decode_err}")
                continue

            process_task(task)

        except KeyboardInterrupt:
            print("\nShutting down...")
            break
        except Exception as worker_err:
            # Worker-level failure (Redis connectivity, etc.) — back off.
            print(f"🔴 Worker error: {worker_err}")
            traceback.print_exc()
            time.sleep(5)  # Longer cooldown for worker errors
fly.toml DELETED
@@ -1,23 +0,0 @@
1
- # fly.toml app configuration file generated for mutsynchub on 2025-11-06T14:44:31Z
2
- #
3
- # See https://fly.io/docs/reference/configuration/ for information about how to use this file.
4
- #
5
-
6
- app = 'mutsynchub'
7
- primary_region = 'iad'
8
-
9
- [build]
10
-
11
- [http_service]
12
- internal_port = 8080
13
- force_https = true
14
- auto_stop_machines = 'stop'
15
- auto_start_machines = true
16
- min_machines_running = 0
17
- processes = ['app']
18
-
19
- [[vm]]
20
- memory = '1gb'
21
- cpu_kind = 'shared'
22
- cpus = 1
23
- memory_mb = 1024