AumCoreAI committed on
Commit
3d0d89c
·
verified ·
1 Parent(s): cebfe45

Update system_orchestrator.py

Browse files
Files changed (1) hide show
  1. system_orchestrator.py +294 -4
system_orchestrator.py CHANGED
@@ -25,6 +25,7 @@ class SystemConfig:
25
  memory_limit_mb: int = 512
26
  db_path: str = "data/system_state.json"
27
  log_file: str = "logs/aumcore_main.log"
 
28
  enable_telemetry: bool = True
29
 
30
  # ==========================================
@@ -109,7 +110,7 @@ class StateManager:
109
  if not os.path.exists(os.path.dirname(self.path)):
110
  os.makedirs(os.path.dirname(self.path))
111
  if not os.path.exists(self.path):
112
- self.save_state({"sessions": [], "metrics": {}})
113
 
114
  def save_state(self, data: Dict):
115
  with self._lock:
@@ -122,7 +123,267 @@ class StateManager:
122
  return json.load(f)
123
 
124
  # ==========================================
125
- # 7. ASYNC AI TASK PROCESSOR
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
126
  # ==========================================
127
  class TaskProcessor(IEngine):
128
  def __init__(self, config: SystemConfig):
@@ -169,7 +430,7 @@ class TaskProcessor(IEngine):
169
  return f"PROCESSED_DATA_{data.upper()}"
170
 
171
  # ==========================================
172
- # 8. MASTER ORCHESTRATOR
173
  # ==========================================
174
  class AumCoreMaster:
175
  def __init__(self):
@@ -177,6 +438,8 @@ class AumCoreMaster:
177
  self.logger = AumLogger("Master", self.config.log_file)
178
  self.state = StateManager(self.config.db_path)
179
  self.processor = TaskProcessor(self.config)
 
 
180
  self.is_running = True
181
 
182
  # Register OS Signals for Graceful Exit
@@ -191,6 +454,13 @@ class AumCoreMaster:
191
  await self.processor.startup()
192
  self.logger.info("Master loop started. Listening for tasks...")
193
 
 
 
 
 
 
 
 
194
  try:
195
  while self.is_running:
196
  # In a real app, this would be a web server or message queue listener
@@ -210,7 +480,27 @@ class AumCoreMaster:
210
  await self.processor.shutdown()
211
 
212
  # ==========================================
213
- # 9. EXECUTION ENTRY POINT
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
214
  # ==========================================
215
  async def bootstrap():
216
  """Main Entry point with proper lifecycle management"""
 
25
  memory_limit_mb: int = 512
26
  db_path: str = "data/system_state.json"
27
  log_file: str = "logs/aumcore_main.log"
28
+ diagnostics_log: str = "logs/diagnostics.log"
29
  enable_telemetry: bool = True
30
 
31
  # ==========================================
 
110
  if not os.path.exists(os.path.dirname(self.path)):
111
  os.makedirs(os.path.dirname(self.path))
112
  if not os.path.exists(self.path):
113
+ self.save_state({"sessions": [], "metrics": {}, "diagnostics_history": []})
114
 
115
  def save_state(self, data: Dict):
116
  with self._lock:
 
123
  return json.load(f)
124
 
125
  # ==========================================
126
+ # 7. SYSTEM DIAGNOSTICS ENGINE (LEVEL 2 ADDITION)
127
+ # ==========================================
128
class DiagnosticsEngine:
    """Build a system health report and keep a short history of past reports.

    A report is assembled from five sections (system resources, application,
    external services, log analysis, performance metrics), scored 0-100 and
    labelled HEALTHY / DEGRADED / CRITICAL.

    An optional state manager may be attached as ``engine.state`` after
    construction; it must expose ``load_state()`` and ``save_state(dict)``.
    When none is attached, report history is kept in memory only.
    """

    HISTORY_LIMIT = 10          # summaries kept in memory and in persisted state
    APP_LOG_TAIL = 100          # lines scanned by the application health check
    ANALYSIS_LOG_TAIL = 500     # lines scanned by the log analysis
    RECENT_ERROR_WINDOW = 20    # trailing lines searched for "recent" errors
    RECENT_ERROR_LIMIT = 5      # maximum recent error lines reported

    def __init__(self, config: "SystemConfig"):
        """Store the configuration and open the diagnostics logger.

        Args:
            config: System configuration; ``diagnostics_log`` and ``log_file``
                are the fields read by this class.
        """
        self.config = config
        self.logger = AumLogger("Diagnostics", config.diagnostics_log)
        self.diagnostics_history = []
        # Optional state manager, assigned by the owner after construction.
        self.state = None

    async def run_full_diagnostics(self) -> Dict:
        """Run every health check in order and return the combined report.

        Returns:
            A dict with ``timestamp``, ``system_id``, ``status``,
            ``health_score`` and a ``sections`` dict holding one entry per
            check. A failed check contributes ``{"error": ...}`` instead of
            raising.
        """
        self.logger.info("Starting comprehensive system diagnostics...")

        report = {
            "timestamp": datetime.now().isoformat(),
            "system_id": f"AUMCORE-{uuid.uuid4().hex[:8]}",
            "status": "RUNNING",
            "health_score": 0,
            "sections": {},
        }

        # Checks run sequentially, in this fixed order.
        checks = (
            ("system_resources", self._check_system_resources),
            ("application", self._check_application_health),
            ("external_services", self._check_external_services),
            ("logs_analysis", self._analyze_logs),
            ("performance", self._collect_performance_metrics),
        )
        for section, check in checks:
            report["sections"][section] = await check()

        report["health_score"] = self._calculate_health_score(report)
        report["status"] = self._status_for_score(report["health_score"])

        self._save_diagnostics_to_history(report)

        self.logger.info(f"Diagnostics completed. Health Score: {report['health_score']}/100")
        return report

    @staticmethod
    def _status_for_score(score: int) -> str:
        """Map a 0-100 health score to HEALTHY (>=80), DEGRADED (>=50) or CRITICAL."""
        if score >= 80:
            return "HEALTHY"
        if score >= 50:
            return "DEGRADED"
        return "CRITICAL"

    @staticmethod
    def _is_app_process(cmdline, script_name: str = "app.py") -> bool:
        """Return True when any command-line argument names ``script_name``.

        Arguments are compared by basename so ``python /srv/app.py`` matches
        as well as ``python app.py``. ``cmdline`` may be None or empty, which
        psutil reports for processes it is not allowed to inspect.
        """
        return any(os.path.basename(arg) == script_name for arg in (cmdline or ()))

    @staticmethod
    def _tail_lines(path: str, count: int) -> list:
        """Return the last ``count`` lines of ``path`` without loading the whole file."""
        from collections import deque

        # errors="replace" keeps one undecodable byte from failing the whole check.
        with open(path, 'r', errors='replace') as f:
            return list(deque(f, maxlen=count))

    @staticmethod
    def _sample_system_resources() -> Dict:
        """Blocking psutil snapshot of CPU, memory, disk, network and processes."""
        gib = 1024 ** 3
        cpu_usage = psutil.cpu_percent(interval=1)  # blocks for the 1 s interval
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        net_io = psutil.net_io_counters()

        # A process name can be None when psutil is denied access to it.
        python_processes = sum(
            1
            for proc in psutil.process_iter(['name'])
            if 'python' in (proc.info.get('name') or '').lower()
        )

        return {
            "cpu": {
                "usage_percent": cpu_usage,
                "cores": psutil.cpu_count(),
                "load_avg": os.getloadavg() if hasattr(os, 'getloadavg') else "N/A",
            },
            "memory": {
                "total_gb": round(memory.total / gib, 2),
                "available_gb": round(memory.available / gib, 2),
                "used_percent": memory.percent,
                "free_percent": 100 - memory.percent,
            },
            "disk": {
                "total_gb": round(disk.total / gib, 2),
                "used_gb": round(disk.used / gib, 2),
                "free_gb": round(disk.free / gib, 2),
                "used_percent": disk.percent,
            },
            "network": {
                "bytes_sent": net_io.bytes_sent,
                "bytes_recv": net_io.bytes_recv,
            },
            "processes": {
                "total": len(psutil.pids()),
                # Counts every process whose name contains "python".
                "aumcore_processes": python_processes,
            },
        }

    async def _check_system_resources(self) -> Dict:
        """Check CPU, memory, disk and network usage.

        The psutil sampling blocks for about a second, so it runs in the
        default executor to keep the event loop responsive.

        Returns:
            The resource snapshot, or ``{"error": ...}`` if sampling failed.
        """
        import asyncio

        try:
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(None, self._sample_system_resources)
        except Exception as e:
            self.logger.error(f"System resources check failed: {e}")
            return {"error": str(e)}

    async def _check_application_health(self) -> Dict:
        """Check whether app.py is running, key directories exist, and count recent errors.

        Returns:
            A dict with ``application_running``, ``directories``,
            ``recent_errors`` and ``uptime_estimate``, or ``{"error": ...}``
            on failure.
        """
        try:
            # proc.info is a dict; "cmdline" may be None for inaccessible processes.
            app_running = any(
                self._is_app_process(proc.info.get('cmdline'))
                for proc in psutil.process_iter(['cmdline'])
            )

            error_count = 0
            if os.path.exists(self.config.log_file):
                tail = self._tail_lines(self.config.log_file, self.APP_LOG_TAIL)
                error_count = sum(1 for line in tail if 'ERROR' in line.upper())

            return {
                "application_running": app_running,
                "directories": {
                    "logs": os.path.exists('logs'),
                    "data": os.path.exists('data'),
                },
                "recent_errors": error_count,
                "uptime_estimate": "N/A",  # Can be enhanced with process start time
            }
        except Exception as e:
            self.logger.error(f"Application health check failed: {e}")
            return {"error": str(e)}

    @staticmethod
    def _ping_groq() -> float:
        """Send a one-token completion request to Groq and return the latency in ms.

        Blocking; raises whatever the Groq client raises (including
        ImportError when the ``groq`` package is not installed).
        """
        from groq import Groq

        start = time.perf_counter()
        client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
        client.chat.completions.create(
            model="llama-3.3-70b-versatile",
            messages=[{"role": "user", "content": "ping"}],
            max_tokens=1,
            timeout=10,
        )
        return round((time.perf_counter() - start) * 1000, 2)

    async def _check_external_services(self) -> Dict:
        """Check the Groq API and the TiDB memory module.

        Returns:
            A dict with ``groq_api`` and ``tidb_database`` entries, each
            carrying a ``status`` plus detail fields.
        """
        import asyncio

        services = {
            "groq_api": {"status": "UNKNOWN", "latency_ms": 0},
            "tidb_database": {"status": "UNKNOWN", "connected": False},
        }

        # The Groq client call is synchronous (up to the 10 s timeout), so it
        # runs in the default executor rather than on the event loop.
        try:
            loop = asyncio.get_running_loop()
            latency_ms = await loop.run_in_executor(None, self._ping_groq)
            services["groq_api"] = {
                "status": "HEALTHY",
                "latency_ms": latency_ms,
                "model_available": True,
            }
        except Exception as e:
            services["groq_api"] = {
                "status": "UNHEALTHY",
                "error": str(e),
                "latency_ms": 0,
            }

        # NOTE(review): this only verifies that memory_db imports; no query is
        # issued, so "connected" is not an actual connectivity probe.
        try:
            from memory_db import tidb_memory  # noqa: F401
            services["tidb_database"] = {
                "status": "HEALTHY",
                "connected": True,
                "type": "TiDB Cloud",
            }
        except ImportError:
            services["tidb_database"] = {
                "status": "NOT_CONFIGURED",
                "connected": False,
                "note": "memory_db module not found",
            }
        except Exception as e:
            services["tidb_database"] = {
                "status": "UNHEALTHY",
                "connected": False,
                "error": str(e),
            }

        return services

    async def _analyze_logs(self) -> Dict:
        """Summarise the tail of the main log file.

        Returns:
            Counts of ERROR / WARNING / INFO lines in the last 500 lines, the
            most recent (up to 5) error lines found in the last 20 lines, and
            the log file size; or ``{"error": ...}`` when the file is missing
            or cannot be read.
        """
        try:
            log_path = self.config.log_file
            if not os.path.exists(log_path):
                return {"error": "Log file not found", "file": log_path}

            lines = self._tail_lines(log_path, self.ANALYSIS_LOG_TAIL)
            upper_lines = [line.upper() for line in lines]

            # Collect every error in the trailing window, then keep the newest ones.
            window_errors = [
                line.strip()
                for line in lines[-self.RECENT_ERROR_WINDOW:]
                if 'ERROR' in line.upper()
            ]

            return {
                "total_lines_analyzed": len(lines),
                "error_count": sum(1 for text in upper_lines if 'ERROR' in text),
                "warning_count": sum(1 for text in upper_lines if 'WARNING' in text),
                "info_count": sum(1 for text in upper_lines if 'INFO' in text),
                "recent_errors": window_errors[-self.RECENT_ERROR_LIMIT:],
                "log_file_size_mb": round(os.path.getsize(log_path) / (1024 * 1024), 3),
            }
        except Exception as e:
            return {"error": f"Log analysis failed: {str(e)}"}

    async def _collect_performance_metrics(self) -> Dict:
        """Collect session/task counts and the last diagnostics timestamp from state.

        Returns:
            A metrics dict; values not derivable from stored state are "N/A".
            ``{"error": ...}`` is returned if reading state fails.
        """
        try:
            state_manager = getattr(self, "state", None)
            state = state_manager.load_state() if state_manager is not None else {}
            history = state.get("diagnostics_history")

            return {
                "total_sessions": len(state.get("sessions", [])),
                "total_tasks_processed": len(state.get("metrics", {}).get("tasks", [])),
                "average_response_time": "N/A",  # Can be calculated from actual data
                "peak_memory_usage_mb": "N/A",
                "system_uptime": "N/A",
                "last_diagnostics": history[-1]["timestamp"] if history else "N/A",
            }
        except Exception as e:
            return {"error": str(e)}

    def _calculate_health_score(self, report: Dict) -> int:
        """Calculate the overall health score (0-100) from the report sections.

        Deductions: memory > 90% used (-20), disk > 90% used (-15), more than
        5 recent log errors (-10 per error, counted up to 10), Groq not
        HEALTHY (-25), TiDB not HEALTHY (-15). A section that failed and only
        carries ``{"error": ...}`` triggers no resource/error deduction.
        """
        sections = report["sections"]
        resources = sections["system_resources"]
        application = sections["application"]
        services = sections["external_services"]

        score = 100
        if resources.get("memory", {}).get("used_percent", 0) > 90:
            score -= 20
        if resources.get("disk", {}).get("used_percent", 0) > 90:
            score -= 15

        # NOTE(review): the penalty starts at 60 points for 6 errors and
        # reaches 100 at 10 - confirm this steepness is intended.
        recent_errors = application.get("recent_errors", 0)
        if recent_errors > 5:
            score -= 10 * min(recent_errors, 10)

        if services.get("groq_api", {}).get("status") != "HEALTHY":
            score -= 25
        if services.get("tidb_database", {}).get("status") != "HEALTHY":
            score -= 15

        return max(0, min(100, score))

    def _save_diagnostics_to_history(self, report: Dict):
        """Record a summary of ``report`` in memory and, if possible, in persisted state.

        Only the last ``HISTORY_LIMIT`` summaries are kept in either place.
        Persistence is skipped when no state manager is attached; persistence
        failures are logged, not raised.
        """
        entry = {
            "timestamp": report["timestamp"],
            "health_score": report["health_score"],
            "status": report["status"],
            "summary": f"Health: {report['health_score']}/100, Status: {report['status']}",
        }

        self.diagnostics_history.append(entry)
        self.diagnostics_history = self.diagnostics_history[-self.HISTORY_LIMIT:]

        state_manager = getattr(self, "state", None)
        if state_manager is None:
            return

        try:
            state = state_manager.load_state()
            history = state.get("diagnostics_history", [])
            history.append(dict(entry))
            state["diagnostics_history"] = history[-self.HISTORY_LIMIT:]
            state_manager.save_state(state)
        except Exception as e:
            self.logger.error(f"Failed to save diagnostics history: {e}")
384
+
385
+ # ==========================================
386
+ # 8. ASYNC AI TASK PROCESSOR
387
  # ==========================================
388
  class TaskProcessor(IEngine):
389
  def __init__(self, config: SystemConfig):
 
430
  return f"PROCESSED_DATA_{data.upper()}"
431
 
432
  # ==========================================
433
+ # 9. MASTER ORCHESTRATOR (UPDATED WITH DIAGNOSTICS)
434
  # ==========================================
435
  class AumCoreMaster:
436
  def __init__(self):
 
438
  self.logger = AumLogger("Master", self.config.log_file)
439
  self.state = StateManager(self.config.db_path)
440
  self.processor = TaskProcessor(self.config)
441
+ self.diagnostics = DiagnosticsEngine(self.config) # NEW: Diagnostics Engine
442
+ self.diagnostics.state = self.state # Share state manager
443
  self.is_running = True
444
 
445
  # Register OS Signals for Graceful Exit
 
454
  await self.processor.startup()
455
  self.logger.info("Master loop started. Listening for tasks...")
456
 
457
+ # Run initial diagnostics
458
+ try:
459
+ initial_report = await self.diagnostics.run_full_diagnostics()
460
+ self.logger.info(f"Initial diagnostics: Health Score {initial_report['health_score']}/100")
461
+ except Exception as e:
462
+ self.logger.error(f"Initial diagnostics failed: {e}")
463
+
464
  try:
465
  while self.is_running:
466
  # In a real app, this would be a web server or message queue listener
 
480
  await self.processor.shutdown()
481
 
482
  # ==========================================
483
+ # 10. DIAGNOSTICS API ENDPOINT FUNCTION
484
+ # ==========================================
485
async def get_system_diagnostics() -> Dict:
    """Run a full diagnostics pass and wrap the result for app.py integration.

    The orchestrator is built on the first successful call and reused
    afterwards, so repeated calls do not re-create the logger, state manager
    and task processor or re-register OS signal handlers each time.

    Returns:
        ``{"success": True, "diagnostics": <report>, "timestamp": ...}`` on
        success, or ``{"success": False, "error": <message>, "timestamp": ...}``
        if constructing the orchestrator or running diagnostics raised.
    """
    try:
        master = getattr(get_system_diagnostics, "_master", None)
        if master is None:
            master = AumCoreMaster()
            # Cached only after construction succeeds, so a failed attempt is retried.
            get_system_diagnostics._master = master
        report = await master.diagnostics.run_full_diagnostics()
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "timestamp": datetime.now().isoformat(),
        }
    return {
        "success": True,
        "diagnostics": report,
        "timestamp": datetime.now().isoformat(),
    }
501
+
502
+ # ==========================================
503
+ # 11. EXECUTION ENTRY POINT
504
  # ==========================================
505
  async def bootstrap():
506
  """Main Entry point with proper lifecycle management"""