shaliz-kong committed on
Commit
ba09259
Β·
1 Parent(s): 00f9956

Aggregated all logging in health API for Prometheus

Browse files
app/deps.py CHANGED
@@ -131,7 +131,7 @@ def get_duckdb(org_id: str) -> duckdb.DuckDBPyConnection:
131
 
132
  try:
133
  conn = duckdb.connect(str(db_file), read_only=False)
134
-
135
  # Enable VSS
136
  conn.execute("INSTALL vss;")
137
  conn.execute("LOAD vss;")
@@ -313,25 +313,72 @@ _qstash_client = None
313
  _qstash_verifier = None
314
 
315
  def get_qstash_client():
316
- """Singleton QStash client (unchanged)"""
 
 
 
 
 
317
  global _qstash_client
318
- if _qstash_client is None and QSTASH_TOKEN:
 
 
 
 
 
 
 
 
319
  from upstash_qstash import Client
320
- _qstash_client = Client(token=QSTASH_TOKEN)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
321
  return _qstash_client
322
 
323
  def get_qstash_verifier():
324
- """Singleton QStash verifier (unchanged)"""
 
 
 
 
325
  global _qstash_verifier
326
- if _qstash_verifier is None:
327
- current = os.getenv("QSTASH_CURRENT_SIGNING_KEY")
328
- next_key = os.getenv("QSTASH_NEXT_SIGNING_KEY")
329
- if current and next_key:
330
- from upstash_qstash import Receiver
331
- _qstash_verifier = Receiver({
332
- "current_signing_key": current,
333
- "next_signing_key": next_key
334
- })
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
335
  return _qstash_verifier
336
 
337
 
 
131
 
132
  try:
133
  conn = duckdb.connect(str(db_file), read_only=False)
134
+ conn.execute("SET hnsw_enable_experimental_persistence = true")
135
  # Enable VSS
136
  conn.execute("INSTALL vss;")
137
  conn.execute("LOAD vss;")
 
313
  _qstash_verifier = None
314
 
315
  def get_qstash_client():
316
+ """Singleton QStash client.
317
+
318
+ This is optional. If the `QSTASH_TOKEN` environment variable is not set
319
+ or the `upstash_qstash` package is not installed, this function will
320
+ return `None` and log a warning/info rather than raising an ImportError.
321
+ """
322
  global _qstash_client
323
+ if _qstash_client is not None:
324
+ return _qstash_client
325
+
326
+ token = os.getenv("QSTASH_TOKEN")
327
+ if not token:
328
+ logger.info("QStash token not configured; skipping QStash client initialization")
329
+ return None
330
+
331
+ try:
332
  from upstash_qstash import Client
333
+ except Exception as e:
334
+ logger.warning("upstash_qstash package not installed; QStash disabled: %s", e)
335
+ return None
336
+
337
+ try:
338
+ qstash_url = os.getenv("QSTASH_URL")
339
+ if qstash_url:
340
+ _qstash_client = Client(token=token, url=qstash_url)
341
+ else:
342
+ _qstash_client = Client(token=token)
343
+ logger.info("βœ… QStash client initialized")
344
+ except Exception as e:
345
+ logger.warning(f"Failed to initialize QStash client: {e}")
346
+ _qstash_client = None
347
+
348
  return _qstash_client
349
 
350
def get_qstash_verifier():
    """Lazily build and cache the singleton QStash signature verifier.

    Safe to call when ``upstash_qstash`` is not installed or the signing
    keys are not configured; returns ``None`` in either case.
    """
    global _qstash_verifier

    if _qstash_verifier is not None:
        return _qstash_verifier

    current = os.getenv("QSTASH_CURRENT_SIGNING_KEY")
    next_key = os.getenv("QSTASH_NEXT_SIGNING_KEY")
    if not (current and next_key):
        logger.info("QStash signing keys not configured; skipping verifier initialization")
        return None

    try:
        from upstash_qstash import Receiver
    except Exception as e:
        logger.warning("upstash_qstash package not installed; cannot create QStash verifier: %s", e)
        return None

    try:
        signing_keys = {
            "current_signing_key": current,
            "next_signing_key": next_key
        }
        _qstash_verifier = Receiver(signing_keys)
        logger.info("βœ… QStash verifier initialized")
    except Exception as e:
        logger.warning(f"Failed to initialize QStash verifier: {e}")
        _qstash_verifier = None

    return _qstash_verifier
383
 
384
 
app/mapper.py CHANGED
@@ -25,7 +25,7 @@ from app.db import get_conn, ensure_raw_table, transactional_conn, ensure_schema
25
  from app.hybrid_entity_detector import hybrid_detect_entity_type
26
  from app.core.event_hub import event_hub
27
  from app.deps import get_sre_metrics
28
-
29
  # Prometheus metrics (free tier compatible)
30
  try:
31
  from prometheus_client import Counter, Histogram, Gauge
@@ -596,7 +596,7 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
596
  Added: SRE metrics, structured logging, pub/sub events
597
  """
598
  start_time = time.time()
599
- logger.info(f"\n[MAPPER] πŸš€ Starting pipeline for {org_id}/{source_id}")
600
 
601
  # Load aliases
602
  load_dynamic_aliases()
@@ -615,7 +615,7 @@ def canonify_df(org_id: str, source_id: str, hours_window: int = 24) -> tuple[pd
615
  ORDER BY ingested_at DESC
616
  """, (cutoff_time,)).fetchall()
617
  except Exception as e:
618
- logger.error(f"[MAPPER] ❌ SQL read error: {e}")
619
  return pd.DataFrame(), "unknown", 0.0
620
 
621
  if not rows:
 
25
  from app.hybrid_entity_detector import hybrid_detect_entity_type
26
  from app.core.event_hub import event_hub
27
  from app.deps import get_sre_metrics
28
+ from app.routers.health import emit_mapper_log
29
  # Prometheus metrics (free tier compatible)
30
  try:
31
  from prometheus_client import Counter, Histogram, Gauge
 
596
  Added: SRE metrics, structured logging, pub/sub events
597
  """
598
  start_time = time.time()
599
+ emit_mapper_log("info", f"πŸš€ Starting pipeline for {org_id}/{source_id}")
600
 
601
  # Load aliases
602
  load_dynamic_aliases()
 
615
  ORDER BY ingested_at DESC
616
  """, (cutoff_time,)).fetchall()
617
  except Exception as e:
618
+ emit_mapper_log("error", f"❌ SQL read error: {e}", error=str(e))
619
  return pd.DataFrame(), "unknown", 0.0
620
 
621
  if not rows:
app/routers/health.py CHANGED
@@ -1,190 +1,428 @@
1
  """
2
- app/routers/health.py – ENTERPRISE HEALTH & MONITORING
3
- ======================================================
4
- Provides Kubernetes-compatible probes, service status checks, and per-tenant
5
- database metrics for capacity planning and alerting.
 
 
 
 
 
 
6
  """
7
 
8
- from fastapi import APIRouter, HTTPException, Depends, Path
9
- from app.deps import check_all_services, get_redis, get_vector_db, get_duckdb
10
- from app.db import get_db_stats # Import our new stats function
11
  import os
12
  import time
13
- from app.service.llm_service import LocalLLMService
14
- from typing import Dict, Any
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15
 
 
 
 
 
 
 
 
16
  router = APIRouter(tags=["health"])
17
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
18
 
19
- @router.get("/llm-status")
20
- async def llm_status():
21
- """Check if LLM is ready"""
22
- return {
23
- "loaded": llm_service.is_loaded,
24
- "loading": llm_service._is_loading,
25
- "error": llm_service.load_error,
26
- "model": llm_service.model_id
27
- }
 
 
28
 
 
 
29
 
30
- @router.get("/health")
31
- def health_check():
32
- """
33
- Basic health check for load balancers.
34
- Returns 200 if service is alive.
35
- """
36
- return {"status": "ok", "service": "analytics-engine"}
37
 
 
 
38
 
39
- @router.get("/health/detailed")
40
- def health_detailed():
41
- """
42
- Comprehensive health check for all services.
43
- Returns detailed status of each component.
44
- """
 
 
 
 
 
 
 
 
45
  start_time = time.time()
46
- statuses = check_all_services()
47
 
48
- # Determine overall health
49
- all_healthy = all("βœ…" in str(status) for status in statuses.values())
50
- http_status = 200 if all_healthy else 503
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51
 
52
  return {
53
- "status": "healthy" if all_healthy else "unhealthy",
54
- "services": statuses,
55
- "environment": "production" if os.getenv("SPACE_ID") else "development",
56
  "uptime_seconds": time.time() - start_time,
57
- "timestamp": time.time()
 
 
 
 
 
 
 
 
 
 
 
 
58
  }
59
 
 
60
 
61
- @router.get("/health/ready")
62
- def health_ready():
 
 
 
 
 
63
  """
64
- Kubernetes-style readiness probe.
65
- Returns 200 if ready to serve traffic.
 
 
 
66
  """
67
- try:
68
- # Quick smoke test: Can we connect to core services?
69
- redis = get_redis()
70
- redis.ping()
71
-
72
- # Test DuckDB with a dummy org
73
- conn = get_duckdb("health_check")
74
- conn.execute("SELECT 1")
 
 
 
 
 
75
 
76
- return {"status": "ready"}
77
- except Exception as e:
78
- raise HTTPException(
79
- status_code=503,
80
- detail=f"Not ready: {str(e)}"
81
  )
 
 
 
 
 
 
 
 
 
 
 
82
 
 
83
 
84
- @router.get("/health/live")
85
- def health_live():
86
- """
87
- Kubernetes-style liveness probe.
88
- Returns 200 if service is alive (doesn't check dependencies).
89
- """
90
- return {"status": "alive"}
91
-
92
-
93
- @router.post("/health/reload")
94
- def health_reload(_: str = Depends(check_all_services)):
95
- """
96
- Trigger reload of services (if needed).
97
- Requires API key for security.
98
- """
99
- # Clear cached connections
100
- from app.deps import _org_db_connections, _vector_db_conn, _redis_client
101
 
102
- _org_db_connections.clear()
103
- _vector_db_conn = None
104
- _redis_client = None
 
 
 
 
 
 
 
 
 
105
 
106
- return {"status": "reloaded", "message": "Connections cleared"}
 
 
 
 
 
 
 
 
 
 
 
 
107
 
 
108
 
109
- @router.get("/health/metrics")
110
- def health_metrics():
111
- """
112
- Performance metrics for monitoring.
113
- """
114
  try:
115
- import psutil
 
 
 
 
116
 
117
  return {
118
- "cpu_percent": psutil.cpu_percent(),
119
- "memory_mb": psutil.virtual_memory().used // (1024 * 1024),
120
- "disk_gb": psutil.disk_usage("/").free // (1024**3),
121
- "connections": len(_org_db_connections) if '_org_db_connections' in globals() else 0
 
 
 
 
 
 
122
  }
123
- except ImportError:
124
- return {"error": "psutil not installed"}
125
-
126
 
127
- @router.get("/health/db-stats/{org_id}")
128
- def health_db_stats(org_id: str = Path(..., description="Organization ID")):
129
- """
130
- Per-tenant database statistics for capacity planning.
131
-
132
- Returns:
133
- - DB size in GB (with quota status)
134
- - Row counts per table
135
- - Total rows ingested
136
- - Schema version info
137
 
138
- Use this for:
139
- - Monitoring tenant growth
140
- - Alerting on quota breaches
141
- - Capacity planning ahead of limits
142
- """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
143
  try:
144
- stats = get_db_stats(org_id)
 
145
 
146
- # Enrich with quota status
147
- quota_status = "ok"
148
- if stats["db_size_gb"] > MAX_DB_SIZE_GB * 0.8:
149
- quota_status = "warning"
150
- if stats["db_size_gb"] > MAX_DB_SIZE_GB:
151
- quota_status = "critical"
152
 
153
- # Add schema versions
154
- with get_conn(org_id) as conn:
155
- schema_versions = conn.execute("""
156
- SELECT version_id, table_name, status, created_at
157
- FROM main.schema_versions
158
- ORDER BY version_id DESC
159
- LIMIT 5
160
- """).fetchall()
161
-
162
- versions = [
163
- {
164
- "version_id": r[0],
165
- "table_name": r[1],
166
- "status": r[2],
167
- "created_at": r[3].isoformat() if r[3] else None
168
- }
169
- for r in schema_versions
170
- ]
 
171
 
172
  return {
173
- "org_id": org_id,
174
- "storage": {
175
- "size_gb": round(stats["db_size_gb"], 2),
176
- "quota_gb": MAX_DB_SIZE_GB,
177
- "status": quota_status,
178
- "percent_used": round((stats["db_size_gb"] / MAX_DB_SIZE_GB) * 100, 1)
179
- },
180
- "tables": stats["table_counts"],
181
- "total_rows": stats["total_rows"],
182
- "recent_schema_versions": versions,
183
- "timestamp": time.time()
184
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
185
 
 
186
  except Exception as e:
187
- raise HTTPException(
188
- status_code=500,
189
- detail=f"Failed to retrieve DB stats: {str(e)}"
190
- )
 
1
  """
2
+ app/routers/health.py – SRE LOG AGGREGATION HUB
3
+ ===============================================
4
+ Central observability endpoint aggregating logs from all refactored services:
5
+ - Analytics Worker
6
+ - Vector Service
7
+ - LLM Service
8
+ - Mapper/Detector
9
+ - Database Connections
10
+
11
+ Provides real-time logs, error rates, and service-specific diagnostics.
12
  """
13
 
14
from fastapi import APIRouter, HTTPException, Depends, Query, Path
from typing import Dict, Any, List, Optional

import os
import time
import json
import logging
import threading
import asyncio
# FIX: `import timedelta` was invalid (ImportError) and `import datetime`
# made every later `datetime.utcnow()` call fail -- both names live in the
# datetime module.
from datetime import datetime, timedelta

# torch is optional at runtime; the shutdown handler already checks
# `torch is not None` before using it.
try:
    import torch
except ImportError:
    torch = None

# NOTE(review): these imports create cycles -- mapper/llm_service/
# vector_service/analytics_worker each import emit_* back from this module.
# They currently resolve only because the emit_* defs precede any project
# import of this router; confirm import order, or move emitters to a
# dependency-free module.
from app.deps import (
    check_all_services, get_redis, get_vector_db, get_duckdb,
    get_sre_metrics, HF_API_TOKEN, close_all_connections
)
from app.db import get_db_stats
from app.service.llm_service import LocalLLMService, get_llm_service
from app.tasks.analytics_worker import get_worker_manager
from app.service.vector_service import VectorService
from app.mapper import health_check_mapper, MapperMetrics
# NOTE(review): StreamingResponse/Response normally come from
# fastapi.responses -- confirm app.core.event_hub really re-exports them.
from app.core.event_hub import StreamingResponse, Response

# Prometheus aggregation (optional dependency).
# FIX: the fallback now defines every name used later, so /health/metrics
# can fail gracefully instead of raising NameError; the duplicated second
# copy of this block (and of the mapper import / logger line) was removed.
try:
    from prometheus_client import generate_latest, CollectorRegistry, CONTENT_TYPE_LATEST, Gauge
except ImportError:
    CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8"
    generate_latest = None
    CollectorRegistry = None
    Gauge = None

logger = logging.getLogger(__name__)
53
  router = APIRouter(tags=["health"])
54
 
55
# Global log aggregator (in-memory ring buffer for recent logs)
class LogAggregator:
    """Thread-safe ring buffer storing the last ``max_size`` logs from all services.

    Entries are plain dicts with an ISO-8601 UTC ``timestamp``, ``service``,
    ``level``, ``message``, plus any extra keyword fields from the emitter.
    """

    def __init__(self, max_size: int = 1000):
        # max_size: eviction threshold for the ring buffer.
        self.max_size = max_size
        # Kept as a plain list because callers slice it directly
        # (e.g. the SSE streamer slices `buffer[last_count:]`).
        self.buffer: List[Dict[str, Any]] = []
        self.lock = threading.Lock()

    def emit(self, service: str, level: str, message: str, **kwargs):
        """Add a log entry from any service (oldest entry evicted when full)."""
        with self.lock:
            entry = {
                # NOTE(review): naive-UTC timestamps via utcnow(); kept for
                # compatibility with the string comparisons done by callers.
                "timestamp": datetime.utcnow().isoformat(),
                "service": service,
                "level": level,
                "message": message,
                **kwargs
            }
            self.buffer.append(entry)
            if len(self.buffer) > self.max_size:
                self.buffer.pop(0)

    def get_logs(self, service: Optional[str] = None, level: Optional[str] = None, limit: int = 100) -> List[Dict]:
        """Retrieve up to ``limit`` most recent logs, optionally filtered."""
        with self.lock:
            filtered = [
                log for log in self.buffer
                if (not service or log["service"] == service)
                and (not level or log["level"] == level)
            ]
            return filtered[-limit:]

    def get_error_rate(self, service: str, window_minutes: int = 5) -> float:
        """Fraction of a service's logs in the window that are error/critical.

        Returns 0.0 when the service produced no logs in the window.
        """
        cutoff_iso = (datetime.utcnow() - timedelta(minutes=window_minutes)).isoformat()
        # FIX: the original iterated self.buffer without holding the lock,
        # racing with concurrent emit() mutations.
        with self.lock:
            recent = [
                log for log in self.buffer
                if log["service"] == service and log["timestamp"] >= cutoff_iso
            ]
        if not recent:
            return 0.0
        errors = sum(1 for log in recent if log["level"] in ("error", "critical"))
        return errors / len(recent)
98
 
99
+ # Global aggregator instance
100
+ log_aggregator = LogAggregator(max_size=1000)
101
 
102
# Service-specific log emitters (to be imported by each service)

def _emit_for(service_name: str, level: str, message: str, **kwargs):
    """Forward one entry to the global aggregator under *service_name*."""
    log_aggregator.emit(service_name, level, message, **kwargs)

def emit_worker_log(level: str, message: str, **kwargs):
    """Record a log entry for the analytics worker."""
    _emit_for("analytics_worker", level, message, **kwargs)

def emit_vector_log(level: str, message: str, **kwargs):
    """Record a log entry for the vector service."""
    _emit_for("vector_service", level, message, **kwargs)

def emit_llm_log(level: str, message: str, **kwargs):
    """Record a log entry for the LLM service."""
    _emit_for("llm_service", level, message, **kwargs)

def emit_mapper_log(level: str, message: str, **kwargs):
    """Record a log entry for the mapper."""
    _emit_for("mapper", level, message, **kwargs)

def emit_deps_log(level: str, message: str, **kwargs):
    """Record a log entry for shared dependencies."""
    _emit_for("dependencies", level, message, **kwargs)
117
+
118
# ---------------------- SRE: Unified Health Endpoint ---------------------- #

@router.get("/health")
async def health_check():
    """Aggregated health status from all services.

    Probes core services plus the worker manager, LLM service and mapper,
    records a summary entry in the log aggregator, and returns a combined
    status document with per-service indicators and SRE metrics.
    """
    start_time = time.time()

    # Check all core services
    service_status = check_all_services()

    # Check worker manager health
    try:
        manager = await get_worker_manager()
        worker_metrics = manager.get_metrics()
        # FIX: /health/workers treats "active_workers" as a plain count, but
        # this handler called len() on it, which raises TypeError on an int.
        # Accept either a count or a sized collection.
        active = worker_metrics.get("active_workers", 0)
        active_count = len(active) if hasattr(active, "__len__") else int(active)
        worker_healthy = active_count < 50  # Arbitrary saturation threshold
    except Exception as e:
        worker_healthy = False
        service_status["worker_manager"] = f"❌ {e}"

    # Check LLM service
    try:
        llm = get_llm_service()
        llm_health = llm.health_check()
        llm_healthy = llm_health["status"] == "healthy"
    except Exception as e:
        llm_healthy = False
        service_status["llm_service"] = f"❌ {e}"

    # Check mapper cache health
    try:
        mapper_health = health_check_mapper()
        mapper_healthy = mapper_health["status"] == "healthy"
    except Exception as e:
        mapper_healthy = False
        service_status["mapper"] = f"❌ {e}"

    # Overall health determination
    all_healthy = (
        all("βœ…" in str(v) for v in service_status.values()) and
        worker_healthy and llm_healthy and mapper_healthy
    )

    # Emit aggregated health log
    log_aggregator.emit(
        "health_router", "info" if all_healthy else "error",
        "Health check completed",
        all_healthy=all_healthy,
        services_checked=len(service_status),
        duration_ms=(time.time() - start_time) * 1000
    )

    return {
        "status": "healthy" if all_healthy else "degraded",
        "timestamp": datetime.utcnow().isoformat(),
        # NOTE(review): this measures the duration of this request handler,
        # not process uptime; key name kept for API compatibility.
        "uptime_seconds": time.time() - start_time,
        "environment": "production" if os.getenv("SPACE_ID") else "development",
        "services": {
            **service_status,
            "worker_manager": "βœ… healthy" if worker_healthy else "❌ unhealthy",
            "llm_service": "βœ… healthy" if llm_healthy else "❌ unhealthy",
            "mapper": "βœ… healthy" if mapper_healthy else "❌ unhealthy"
        },
        "sre_metrics": get_sre_metrics(),
        "_links": {
            "logs": "/health/logs",
            "metrics": "/health/metrics",
            "status": "/health/status"
        }
    }
187
 
188
# ---------------------- SRE: Real-Time Log Streaming ---------------------- #

@router.get("/health/logs")
async def get_service_logs(
    service: Optional[str] = Query(None, description="Filter by service (analytics_worker, vector_service, llm_service, mapper, dependencies)"),
    level: Optional[str] = Query(None, description="Filter by level (info, warning, error, critical)"),
    limit: int = Query(100, ge=1, le=1000, description="Number of logs to return"),
    tail: bool = Query(False, description="Stream logs in real-time (SSE)")
):
    """
    Retrieve recent logs from all services or filter by service/level.

    Examples:
    - GET /health/logs?service=vector_service&level=error
    - GET /health/logs?service=analytics_worker&tail=true (SSE stream)
    """
    if tail:
        # SSE streaming of logs
        async def log_stream():
            last_count = len(log_aggregator.buffer)
            while True:
                # FIX: snapshot under the aggregator's lock -- the original
                # sliced the live buffer while emit() could mutate it.
                with log_aggregator.lock:
                    snapshot = list(log_aggregator.buffer)
                current_count = len(snapshot)
                # Clamp a stale cursor so slicing never goes out of range.
                # NOTE(review): once the ring buffer is full its length stops
                # growing, so a pure length-based cursor can silently miss
                # entries; a monotonically increasing sequence number on each
                # entry would be the complete fix.
                if last_count > current_count:
                    last_count = current_count
                if current_count > last_count:
                    for log in snapshot[last_count:]:
                        if (not service or log["service"] == service) and (not level or log["level"] == level):
                            yield f"data: {json.dumps(log)}\n\n"
                    last_count = current_count
                await asyncio.sleep(0.5)

        return StreamingResponse(
            log_stream(),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache"}
        )

    # Return historical logs
    logs = log_aggregator.get_logs(service=service, level=level, limit=limit)

    return {
        "status": "success",
        "logs": logs,
        "total": len(logs),
        "service": service or "all",
        "level": level or "all"
    }
+ }
234
 
235
+ # ---------------------- SRE: Error Rate Tracking ---------------------- #
236
 
237
+ @router.get("/health/error-rates")
238
+ async def get_error_rates(
239
+ window_minutes: int = Query(5, ge=1, le=60, description="Time window in minutes")
240
+ ):
241
+ """Get error rates for all services over the specified time window"""
242
+ services = ["analytics_worker", "vector_service", "llm_service", "mapper", "dependencies"]
 
 
 
 
 
 
 
 
 
 
 
243
 
244
+ rates = {}
245
+ for service in services:
246
+ rates[service] = {
247
+ "error_rate": log_aggregator.get_error_rate(service, window_minutes),
248
+ "window_minutes": window_minutes
249
+ }
250
+
251
+ # Overall system error rate
252
+ total_logs = sum(len([log for log in log_aggregator.buffer if log["timestamp"] >= (datetime.utcnow() - timedelta(minutes=window_minutes)).isoformat()]) for _ in services)
253
+ total_errors = sum(len([log for log in log_aggregator.buffer if log["level"] in ("error", "critical") and log["timestamp"] >= (datetime.utcnow() - timedelta(minutes=window_minutes)).isoformat()]) for _ in services)
254
+
255
+ overall_rate = total_errors / total_logs if total_logs > 0 else 0.0
256
 
257
+ # Alert if error rate is high
258
+ alert = overall_rate > 0.1 # 10% error rate threshold
259
+
260
+ if alert:
261
+ log_aggregator.emit("health_router", "error", "High system error rate detected", rate=overall_rate)
262
+
263
+ return {
264
+ "status": "healthy" if not alert else "alerting",
265
+ "overall_error_rate": round(overall_rate, 4),
266
+ "service_rates": rates,
267
+ "window_minutes": window_minutes,
268
+ "alert": alert
269
+ }
270
 
271
# ---------------------- SRE: Service-Specific Health ---------------------- #

@router.get("/health/workers")
async def health_workers():
    """Analytics worker health and metrics"""
    try:
        manager = await get_worker_manager()
        metrics = manager.get_metrics()

        failed = metrics.get("workers_failed", 0)
        recent = log_aggregator.get_logs(service="analytics_worker", limit=50)

        # Degraded once ten or more workers have failed.
        return {
            "status": "healthy" if failed < 10 else "degraded",
            "active_workers": metrics.get("active_workers", 0),
            "triggers_processed": metrics.get("triggers_processed", 0),
            "workers_failed": failed,
            "total_latency_ms": metrics.get("total_latency_ms", 0),
            "recent_logs": recent,
            "_links": {
                "logs": "/health/logs?service=analytics_worker",
                "stream": "/api/v1/analytics/stream/sse"
            }
        }
    except Exception as e:
        return {"status": "error", "error": str(e)}
 
297
 
298
@router.get("/health/vectors")
async def health_vectors():
    """Vector service health and metrics"""
    try:
        # Probe with a throwaway service instance bound to a sentinel org.
        probe = VectorService(org_id="health_check")
        recent = log_aggregator.get_logs(service="vector_service", limit=50)

        # NOTE(review): reaches into private members (_global_model_cache,
        # _check_circuit_breaker) -- confirm these remain stable internals
        # of VectorService.
        return {
            "status": "healthy",
            "model_cached": len(probe._global_model_cache) > 0,
            "redis_type": "tcp" if hasattr(probe.vector_conn, 'pubsub') else "upstash",
            "recent_logs": recent,
            "circuit_breaker": probe._check_circuit_breaker(),
            "_links": {
                "logs": "/health/logs?service=vector_service",
                "metrics": "/health/metrics/vector"
            }
        }
    except Exception as e:
        return {"status": "error", "error": str(e)}
321
+
322
@router.get("/health/llm")
async def health_llm():
    """LLM service health and metrics"""
    try:
        service = get_llm_service()
        report = {**service.health_check()}

        # Attach recent logs and navigation links to the service's own report.
        report["recent_logs"] = log_aggregator.get_logs(service="llm_service", limit=50)
        report["_links"] = {
            "logs": "/health/logs?service=llm_service",
            "generate": "/api/v1/generate"
        }
        return report
    except Exception as e:
        return {"status": "error", "error": str(e)}
342
+
343
@router.get("/health/mapper")
async def health_mapper():
    """Mapper service health and metrics"""
    try:
        report = health_check_mapper()
        recent = log_aggregator.get_logs(service="mapper", limit=50)

        links = {
            "logs": "/health/logs?service=mapper",
            # NOTE(review): a count inside a _links map is unusual -- kept
            # as-is to preserve the response shape.
            "canonical_columns": len(report.get("canonical_columns", []))
        }
        return {
            **report,
            "recent_logs": recent,
            "_links": links
        }
    except Exception as e:
        return {"status": "error", "error": str(e)}
362
+
363
# ---------------------- SRE: Prometheus Metrics ---------------------- #

@router.get("/health/metrics")
async def get_prometheus_metrics():
    """
    Return aggregated Prometheus metrics from all services
    Compatible with Prometheus scraping

    Raises 503 when prometheus_client is not installed.
    """
    # FIX: the original raised NameError (generate_latest/CollectorRegistry
    # undefined) when prometheus_client was absent; fail with an explicit 503.
    # Gauge is None exactly when the import at module top failed.
    if Gauge is None:
        raise HTTPException(status_code=503, detail="prometheus_client is not installed")

    registry = CollectorRegistry()

    # Aggregate metrics from all services
    sre_metrics = get_sre_metrics()

    # Create gauges for SRE metrics (dict-valued entries are keyed by org_id)
    for metric_name, values in sre_metrics.items():
        if isinstance(values, dict):
            gauge = Gauge(f'sre_{metric_name}', f'SRE {metric_name}', ['org_id'], registry=registry)
            for org_id, value in values.items():
                gauge.labels(org_id=org_id).set(value)

    # FIX: get_error_rate("all", 5) always returned 0.0 because no log entry
    # is ever emitted under the service name "all"; compute the overall rate
    # across every buffered entry in the window instead.
    cutoff_iso = (datetime.utcnow() - timedelta(minutes=5)).isoformat()
    with log_aggregator.lock:
        window_logs = [log for log in log_aggregator.buffer if log["timestamp"] >= cutoff_iso]
    error_count = sum(1 for log in window_logs if log["level"] in ("error", "critical"))
    overall_rate = error_count / len(window_logs) if window_logs else 0.0

    error_rate_gauge = Gauge('system_error_rate', 'Overall system error rate', registry=registry)
    error_rate_gauge.set(overall_rate)

    # Add service health status (healthy = error rate under 10% in 5 min)
    health_gauge = Gauge('service_health', 'Service health status (1=healthy)', ['service'], registry=registry)
    services = ["analytics_worker", "vector_service", "llm_service", "mapper", "dependencies"]
    for service in services:
        is_healthy = log_aggregator.get_error_rate(service, 5) < 0.1
        health_gauge.labels(service=service).set(1 if is_healthy else 0)

    return Response(
        content=generate_latest(registry),
        media_type=CONTENT_TYPE_LATEST
    )
398
+
399
# ---------------------- SRE: Shutdown Handler ---------------------- #

@router.post("/health/shutdown")
async def shutdown_services():
    """Graceful shutdown - close all connections.

    Releases the LLM model (and its GPU cache when torch is available),
    then shuts down the worker manager. Raises 500 on failure.
    """
    try:
        # Release the LLM model.
        # FIX: the original performed this exact block twice (before and
        # after the worker shutdown); the second pass was always a no-op
        # because hasattr() fails once the attribute is deleted.
        llm_service = get_llm_service()
        if hasattr(llm_service, '_model') and llm_service._model:
            del llm_service._model
            # Free GPU memory only when torch was importable.
            if 'torch' in globals() and torch is not None:
                torch.cuda.empty_cache()

        # Shutdown worker manager
        manager = await get_worker_manager()
        manager.shutdown()

        log_aggregator.emit("health_router", "info", "Shutdown completed")

        return {"status": "shutdown_complete"}
    except Exception as e:
        log_aggregator.emit("health_router", "error", f"Shutdown failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))
 
 
app/service/llm_service.py CHANGED
@@ -25,6 +25,7 @@ from typing import Optional, Dict, Any, List, Callable
25
  from dataclasses import dataclass, asdict
26
  import psutil # For resource monitoring
27
  from fastapi import HTTPException
 
28
  # Prometheus metrics (free tier compatible)
29
  try:
30
  from prometheus_client import Counter, Histogram, Gauge
@@ -344,7 +345,7 @@ class LocalLLMService:
344
  # βœ… SRE: Update gauge
345
  self.model_loaded_gauge.labels(org_id=self.org_id).set(1)
346
 
347
- logger.info("βœ… [BACKGROUND] LLM loaded successfully")
348
 
349
  except Exception as e:
350
  logger.error(f"❌ [BACKGROUND] LLM loading failed: {e}")
@@ -530,7 +531,7 @@ class LocalLLMService:
530
  raise
531
 
532
  except Exception as e:
533
- logger.error(f"[ASYNC] ❌ Generation error: {e}")
534
 
535
  self.inference_requests.labels(
536
  org_id=self.org_id,
 
25
  from dataclasses import dataclass, asdict
26
  import psutil # For resource monitoring
27
  from fastapi import HTTPException
28
+ from app.routers.health import emit_llm_log
29
  # Prometheus metrics (free tier compatible)
30
  try:
31
  from prometheus_client import Counter, Histogram, Gauge
 
345
  # βœ… SRE: Update gauge
346
  self.model_loaded_gauge.labels(org_id=self.org_id).set(1)
347
 
348
+ emit_llm_log("info", "βœ… LLM loaded successfully", model_id=self.model_id)
349
 
350
  except Exception as e:
351
  logger.error(f"❌ [BACKGROUND] LLM loading failed: {e}")
 
531
  raise
532
 
533
  except Exception as e:
534
+ emit_llm_log("error", f"❌ Generation failed: {e}", error=str(e))
535
 
536
  self.inference_requests.labels(
537
  org_id=self.org_id,
app/service/vector_service.py CHANGED
@@ -11,7 +11,7 @@ from sentence_transformers import SentenceTransformer
11
  import logging
12
  from datetime import datetime, timedelta
13
  from enum import Enum
14
-
15
  logger = logging.getLogger(__name__)
16
 
17
 
@@ -250,7 +250,8 @@ class VectorService:
250
  if (i // batch_size + 1) % 5 == 0:
251
  logger.debug(f"[Embed] Batch {i//batch_size + 1}/{total_batches}")
252
 
253
- logger.info(f"[Embed] βœ… Generated {len(embeddings)} embeddings")
 
254
  return embeddings
255
 
256
  # ====== REFACTORED: TCP Redis pipeline + pub/sub ======
@@ -372,7 +373,7 @@ class VectorService:
372
  }
373
  )
374
 
375
- logger.error(f"[❌ VECTOR] Redis error: {e}")
376
  return False
377
 
378
  # ====== Existing methods (polished with metrics) ======
 
11
  import logging
12
  from datetime import datetime, timedelta
13
  from enum import Enum
14
+ from app.routers.health import emit_vector_log
15
  logger = logging.getLogger(__name__)
16
 
17
 
 
250
  if (i // batch_size + 1) % 5 == 0:
251
  logger.debug(f"[Embed] Batch {i//batch_size + 1}/{total_batches}")
252
 
253
+ emit_vector_log("info", f"βœ… Generated {len(embeddings)} embeddings",
254
+ org_id=self.org_id, vector_count=len(embeddings))
255
  return embeddings
256
 
257
  # ====== REFACTORED: TCP Redis pipeline + pub/sub ======
 
373
  }
374
  )
375
 
376
+ emit_vector_log("error", f"❌ Redis error: {e}", error=str(e))
377
  return False
378
 
379
  # ====== Existing methods (polished with metrics) ======
app/tasks/analytics_worker.py CHANGED
@@ -26,6 +26,7 @@ from app.schemas.org_schema import OrgSchema
26
  from app.service.vector_service import VectorService, VectorStoreEventType, VectorMetrics
27
  from app.engine.kpi_calculators.registry import get_kpi_calculator_async
28
  from app.service.embedding_service import EmbeddingService
 
29
 
30
  # Configure structured logging for SRE tools (Loki, etc.)
31
  logging.basicConfig(
@@ -169,7 +170,7 @@ class AnalyticsWorker:
169
  if not await self._acquire_lock():
170
  return {"status": "skipped", "reason": "lock_failed"}
171
 
172
- logger.info(f"\n[WORKER] πŸš€ STARTING {worker_id}")
173
 
174
  # STEP 2: Load entity info from Redis
175
  await self._load_entity_from_redis()
@@ -245,7 +246,7 @@ class AnalyticsWorker:
245
  return results
246
 
247
  except Exception as e:
248
- logger.error(f"[WORKER] ❌ CRITICAL: {e}", exc_info=True)
249
  await self._publish_status("error", str(e))
250
 
251
  # Publish error event
 
26
  from app.service.vector_service import VectorService, VectorStoreEventType, VectorMetrics
27
  from app.engine.kpi_calculators.registry import get_kpi_calculator_async
28
  from app.service.embedding_service import EmbeddingService
29
+ from app.routers.health import emit_worker_log
30
 
31
  # Configure structured logging for SRE tools (Loki, etc.)
32
  logging.basicConfig(
 
170
  if not await self._acquire_lock():
171
  return {"status": "skipped", "reason": "lock_failed"}
172
 
173
+ emit_worker_log("info", f"πŸš€ STARTING {worker_id}", worker_id=worker_id)
174
 
175
  # STEP 2: Load entity info from Redis
176
  await self._load_entity_from_redis()
 
246
  return results
247
 
248
  except Exception as e:
249
+ emit_worker_log("error", f"❌ CRITICAL: {e}", error=str(e))
250
  await self._publish_status("error", str(e))
251
 
252
  # Publish error event