import asyncio
import functools
import json
import logging
import os
import signal
import sys
import threading
import time
import uuid
from abc import ABC, abstractmethod
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Union

import psutil

# ==========================================
# 1. ADVANCED CONFIGURATION & CONSTANTS
# ==========================================

@dataclass
class SystemConfig:
    """Runtime configuration shared by every AumCore subsystem."""
    version: str = "2.0.1-Stable"
    # Evaluated once at class-definition time; fine for a per-host value.
    max_workers: int = os.cpu_count() or 4
    timeout: int = 60
    memory_limit_mb: int = 512
    db_path: str = "data/system_state.json"
    log_file: str = "logs/aumcore_main.log"
    diagnostics_log: str = "logs/diagnostics.log"
    enable_telemetry: bool = True

# ==========================================
# 2. ENHANCED LOGGING SYSTEM
# ==========================================

class AumLogger:
    """Professional logging system with file and console outputs.

    Fix: handlers are attached only when the named logger has none yet.
    The original added a fresh FileHandler/StreamHandler on every
    instantiation, so constructing two AumLoggers with the same name
    duplicated every log record (logging.getLogger returns a shared
    singleton per name).
    """

    def __init__(self, name: str, log_file: str):
        # Derive the directory from the actual log path instead of the
        # hard-coded 'logs' the original used (broke for custom paths).
        log_dir = os.path.dirname(log_file)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.DEBUG)
        if not self.logger.handlers:  # attach handlers at most once per name
            formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s')
            # File handler
            fh = logging.FileHandler(log_file)
            fh.setFormatter(formatter)
            self.logger.addHandler(fh)
            # Console handler
            ch = logging.StreamHandler()
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

    def info(self, msg):
        self.logger.info(msg)

    def error(self, msg):
        self.logger.error(msg)

    def warning(self, msg):
        self.logger.warning(msg)

# ==========================================
# 3. CUSTOM EXCEPTIONS & ERROR HANDLING
# ==========================================

class AumCoreError(Exception):
    """Base exception for the AumCore system, carrying a numeric error code."""

    def __init__(self, message: str, error_code: int):
        super().__init__(message)
        self.error_code = error_code
# ==========================================
# 4. PERFORMANCE TRACKING DECORATORS
# ==========================================

def monitor_system_resources(func: Callable):
    """Decorator to monitor CPU time and memory usage of an async callable.

    Logs wall-clock duration and RSS delta (MB) after each awaited call.
    Only valid on coroutine functions: the wrapper awaits ``func``.
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        process = psutil.Process(os.getpid())
        mem_before = process.memory_info().rss / (1024 * 1024)
        start_time = time.perf_counter()
        result = await func(*args, **kwargs)
        end_time = time.perf_counter()
        mem_after = process.memory_info().rss / (1024 * 1024)
        logging.info(
            f"Method {func.__name__} executed. "
            f"Time: {end_time-start_time:.2f}s | Mem Delta: {mem_after-mem_before:.2f}MB"
        )
        return result
    return wrapper

# ==========================================
# 5. CORE INTERFACES (CONTRACTS)
# ==========================================

class IEngine(ABC):
    """Contract every engine must fulfil: lifecycle hooks plus request handling."""

    @abstractmethod
    async def startup(self):
        pass

    @abstractmethod
    async def shutdown(self):
        pass

    @abstractmethod
    async def process_request(self, data: Any):
        pass

# ==========================================
# 6. DATA PERSISTENCE LAYER
# ==========================================

class StateManager:
    """Thread-safe JSON persistence for system state.

    Fix: the original called ``os.makedirs(os.path.dirname(path))``
    unconditionally, which raises FileNotFoundError when the path has no
    directory component (dirname is ''), and raced against concurrent
    directory creation. ``exist_ok=True`` plus an empty-dirname guard
    handles both.
    """

    def __init__(self, path: str):
        self.path = path
        self._lock = threading.Lock()
        self._init_storage()

    def _init_storage(self):
        """Ensure the parent directory exists and seed an empty state file."""
        directory = os.path.dirname(self.path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        if not os.path.exists(self.path):
            self.save_state({"sessions": [], "metrics": {}, "diagnostics_history": []})

    def save_state(self, data: Dict):
        """Overwrite the state file (serialized under the instance lock)."""
        with self._lock:
            with open(self.path, 'w') as f:
                json.dump(data, f, indent=4)

    def load_state(self) -> Dict:
        """Read and parse the state file (serialized under the instance lock)."""
        with self._lock:
            with open(self.path, 'r') as f:
                return json.load(f)
# ==========================================
# 7. SYSTEM DIAGNOSTICS ENGINE (LEVEL 2 ADDITION)
# ==========================================

class DiagnosticsEngine:
    """Complete system health monitoring and diagnostics.

    Produces a structured report covering host resources, application
    health, external services, log analysis and performance metrics,
    plus a 0-100 health score. A StateManager may be injected via the
    ``state`` attribute to persist a rolling history of reports.
    """

    def __init__(self, config: SystemConfig):
        self.config = config
        self.logger = AumLogger("Diagnostics", config.diagnostics_log)
        self.diagnostics_history: List[Dict] = []
        # Injected by the orchestrator after construction; None means
        # "no persistence available". The original relied on hasattr()
        # checks and crashed in _save_diagnostics_to_history when the
        # attribute was absent.
        self.state = None

    async def run_full_diagnostics(self) -> Dict:
        """Run the complete health check and return a detailed report."""
        self.logger.info("Starting comprehensive system diagnostics...")
        report = {
            "timestamp": datetime.now().isoformat(),
            "system_id": f"AUMCORE-{uuid.uuid4().hex[:8]}",
            "status": "RUNNING",
            "health_score": 0,
            "sections": {}
        }
        # 1. SYSTEM RESOURCES CHECK
        report["sections"]["system_resources"] = await self._check_system_resources()
        # 2. APPLICATION HEALTH CHECK
        report["sections"]["application"] = await self._check_application_health()
        # 3. EXTERNAL SERVICES CHECK
        report["sections"]["external_services"] = await self._check_external_services()
        # 4. LOGS ANALYSIS
        report["sections"]["logs_analysis"] = await self._analyze_logs()
        # 5. PERFORMANCE METRICS
        report["sections"]["performance"] = await self._collect_performance_metrics()
        # Calculate overall health score (0-100) and derived status
        report["health_score"] = self._calculate_health_score(report)
        report["status"] = (
            "HEALTHY" if report["health_score"] >= 80
            else "DEGRADED" if report["health_score"] >= 50
            else "CRITICAL"
        )
        # Save to history
        self._save_diagnostics_to_history(report)
        self.logger.info(f"Diagnostics completed. Health Score: {report['health_score']}/100")
        return report

    async def _check_system_resources(self) -> Dict:
        """Check CPU, memory, disk, and network utilisation."""
        try:
            # cpu_percent(interval=1) sleeps for a full second; run it in a
            # worker thread so the event loop is not stalled (the original
            # called it inline and blocked every other coroutine).
            loop = asyncio.get_running_loop()
            cpu_usage = await loop.run_in_executor(
                None, functools.partial(psutil.cpu_percent, 1)
            )
            memory = psutil.virtual_memory()
            disk = psutil.disk_usage('/')
            net_io = psutil.net_io_counters()
            return {
                "cpu": {
                    "usage_percent": cpu_usage,
                    "cores": psutil.cpu_count(),
                    "load_avg": os.getloadavg() if hasattr(os, 'getloadavg') else "N/A"
                },
                "memory": {
                    "total_gb": round(memory.total / (1024**3), 2),
                    "available_gb": round(memory.available / (1024**3), 2),
                    "used_percent": memory.percent,
                    "free_percent": 100 - memory.percent
                },
                "disk": {
                    "total_gb": round(disk.total / (1024**3), 2),
                    "used_gb": round(disk.used / (1024**3), 2),
                    "free_gb": round(disk.free / (1024**3), 2),
                    "used_percent": disk.percent
                },
                "network": {
                    "bytes_sent": net_io.bytes_sent,
                    "bytes_recv": net_io.bytes_recv
                },
                "processes": {
                    "total": len(psutil.pids()),
                    # Guard: a process name can be None on some platforms;
                    # the original crashed on None.lower().
                    "aumcore_processes": len([
                        p for p in psutil.process_iter(['name'])
                        if p.info['name'] and 'python' in p.info['name'].lower()
                    ])
                }
            }
        except Exception as e:
            self.logger.error(f"System resources check failed: {e}")
            return {"error": str(e)}

    async def _check_application_health(self) -> Dict:
        """Check application-specific health (process, dirs, recent errors)."""
        try:
            # Detect a running app.py. FIX: the original called p.info()
            # but psutil's Process.info is a dict attribute, not a method,
            # so the check always raised (silently swallowed below).
            app_running = False
            for p in psutil.process_iter(['cmdline']):
                try:
                    cmdline = p.info.get('cmdline') or []
                    if any('app.py' in part for part in cmdline):
                        app_running = True
                        break
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    continue
            # Check expected directories
            logs_exist = os.path.exists('logs')
            data_dir_exist = os.path.exists('data')
            # Count errors in the last 100 lines of the main log
            error_count = 0
            if os.path.exists(self.config.log_file):
                with open(self.config.log_file, 'r') as f:
                    lines = f.readlines()[-100:]
                error_count = sum(1 for line in lines if 'ERROR' in line.upper())
            return {
                "application_running": app_running,
                "directories": {
                    "logs": logs_exist,
                    "data": data_dir_exist
                },
                "recent_errors": error_count,
                "uptime_estimate": "N/A"  # Can be enhanced with process start time
            }
        except Exception as e:
            self.logger.error(f"Application health check failed: {e}")
            return {"error": str(e)}

    async def _check_external_services(self) -> Dict:
        """Check Groq API, TiDB, and other external services."""
        services = {
            "groq_api": {"status": "UNKNOWN", "latency_ms": 0},
            "tidb_database": {"status": "UNKNOWN", "connected": False}
        }
        # Check Groq API with a minimal one-token round trip.
        try:
            from groq import Groq
            start = time.time()
            client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
            client.chat.completions.create(
                model="llama-3.3-70b-versatile",
                messages=[{"role": "user", "content": "ping"}],
                max_tokens=1,
                timeout=10
            )
            services["groq_api"] = {
                "status": "HEALTHY",
                "latency_ms": round((time.time() - start) * 1000, 2),
                "model_available": True
            }
        except Exception as e:
            services["groq_api"] = {
                "status": "UNHEALTHY",
                "error": str(e),
                "latency_ms": 0
            }
        # Check TiDB: the import succeeding is taken as "configured".
        try:
            from memory_db import tidb_memory  # noqa: F401 - presence check only
            services["tidb_database"] = {
                "status": "HEALTHY",
                "connected": True,
                "type": "TiDB Cloud"
            }
        except ImportError:
            services["tidb_database"] = {
                "status": "NOT_CONFIGURED",
                "connected": False,
                "note": "memory_db module not found"
            }
        except Exception as e:
            services["tidb_database"] = {
                "status": "UNHEALTHY",
                "connected": False,
                "error": str(e)
            }
        return services

    async def _analyze_logs(self) -> Dict:
        """Analyze the main application log for error/warning patterns."""
        try:
            if not os.path.exists(self.config.log_file):
                return {"error": "Log file not found", "file": self.config.log_file}
            with open(self.config.log_file, 'r') as f:
                lines = f.readlines()[-500:]  # Last 500 lines
            analysis = {
                "total_lines_analyzed": len(lines),
                "error_count": sum(1 for line in lines if 'ERROR' in line.upper()),
                "warning_count": sum(1 for line in lines if 'WARNING' in line.upper()),
                "info_count": sum(1 for line in lines if 'INFO' in line.upper()),
                "recent_errors": [],
                "log_file_size_mb": round(os.path.getsize(self.config.log_file) / (1024*1024), 3)
            }
            # Extract up to 5 recent errors from the last 20 lines
            for line in lines[-20:]:
                if 'ERROR' in line.upper():
                    analysis["recent_errors"].append(line.strip())
                    if len(analysis["recent_errors"]) >= 5:
                        break
            return analysis
        except Exception as e:
            return {"error": f"Log analysis failed: {str(e)}"}

    async def _collect_performance_metrics(self) -> Dict:
        """Collect coarse performance statistics from persisted state."""
        try:
            state = self.state.load_state() if self.state is not None else {}
            history = state.get("diagnostics_history", [])
            return {
                "total_sessions": len(state.get("sessions", [])),
                "total_tasks_processed": len(state.get("metrics", {}).get("tasks", [])),
                "average_response_time": "N/A",  # Can be calculated from actual data
                "peak_memory_usage_mb": "N/A",
                "system_uptime": "N/A",
                "last_diagnostics": history[-1]["timestamp"] if history else "N/A"
            }
        except Exception as e:
            return {"error": str(e)}

    def _calculate_health_score(self, report: Dict) -> int:
        """Derive the overall 0-100 health score from the report sections."""
        score = 100
        sections = report["sections"]
        # Deduct points for detected issues
        if sections["system_resources"].get("memory", {}).get("used_percent", 0) > 90:
            score -= 20
        if sections["system_resources"].get("disk", {}).get("used_percent", 0) > 90:
            score -= 15
        if sections["application"].get("recent_errors", 0) > 5:
            # Up to 100 points for sustained errors; the final clamp keeps
            # the score within [0, 100].
            score -= 10 * min(sections["application"]["recent_errors"], 10)
        if sections["external_services"].get("groq_api", {}).get("status") != "HEALTHY":
            score -= 25
        if sections["external_services"].get("tidb_database", {}).get("status") != "HEALTHY":
            score -= 15
        return max(0, min(100, score))

    def _save_diagnostics_to_history(self, report: Dict):
        """Append a summary of the report to the persisted rolling history."""
        try:
            if self.state is None:
                # No StateManager injected -> nothing to persist. The
                # original fabricated a dict and then crashed calling
                # self.state.save_state anyway.
                return
            state = self.state.load_state()
            history = state.get("diagnostics_history", [])
            history.append({
                "timestamp": report["timestamp"],
                "health_score": report["health_score"],
                "status": report["status"],
                "summary": f"Health: {report['health_score']}/100, Status: {report['status']}"
            })
            # Keep only the last 10 reports
            if len(history) > 10:
                history = history[-10:]
            state["diagnostics_history"] = history
            self.state.save_state(state)
        except Exception as e:
            self.logger.error(f"Failed to save diagnostics history: {e}")

# ==========================================
# 8. ASYNC AI TASK PROCESSOR
# ==========================================

class TaskProcessor(IEngine):
    """Async request processor that offloads CPU-bound work to a process pool."""

    def __init__(self, config: SystemConfig):
        self.config = config
        self.logger = AumLogger("TaskProcessor", config.log_file)
        self.thread_pool = ThreadPoolExecutor(max_workers=config.max_workers)
        self.process_pool = ProcessPoolExecutor(max_workers=2)
        self.active_tasks = 0

    async def startup(self):
        """Announce startup and run the (simulated) hardware check."""
        self.logger.info(f"System Version {self.config.version} starting up...")
        self.logger.info(f"CPU Workers available: {self.config.max_workers}")
        await asyncio.sleep(1)  # Simulated hardware check

    async def shutdown(self):
        """Drain and release both executor pools."""
        self.logger.info("Cleaning up resources...")
        self.thread_pool.shutdown(wait=True)
        self.process_pool.shutdown(wait=True)

    @monitor_system_resources
    async def process_request(self, payload: Dict):
        """Process one request dict; returns a result dict or an error dict."""
        self.active_tasks += 1
        request_id = payload.get("id", str(uuid.uuid4()))
        try:
            loop = asyncio.get_running_loop()
            # Offload heavy computation to the process pool so the event
            # loop is never blocked by CPU-bound work.
            result = await loop.run_in_executor(
                self.process_pool,
                self._compute_heavy_logic,
                payload.get("data")
            )
            return {"id": request_id, "result": result, "timestamp": str(datetime.now())}
        except Exception as e:
            self.logger.error(f"Processing failed: {str(e)}")
            return {"error": "Processing Error", "id": request_id}
        finally:
            self.active_tasks -= 1

    @staticmethod
    def _compute_heavy_logic(data: Optional[str]) -> str:
        """Isolated function for CPU intensive work (runs in a subprocess)."""
        time.sleep(2)  # Simulated Neural Network Inference
        # Coerce to str so a missing 'data' key no longer raises
        # AttributeError on None.upper() (previously surfaced as a
        # generic "Processing Error").
        text = "" if data is None else str(data)
        return f"PROCESSED_DATA_{text.upper()}"
# ==========================================
# 9. MASTER ORCHESTRATOR (UPDATED WITH DIAGNOSTICS)
# ==========================================

class AumCoreMaster:
    """Top-level orchestrator wiring config, state, processing and diagnostics."""

    def __init__(self):
        self.config = SystemConfig()
        self.logger = AumLogger("Master", self.config.log_file)
        self.state = StateManager(self.config.db_path)
        self.processor = TaskProcessor(self.config)
        self.diagnostics = DiagnosticsEngine(self.config)  # NEW: Diagnostics Engine
        self.diagnostics.state = self.state  # Share state manager
        self.is_running = True
        # Register OS signals for graceful exit. FIX: signal.signal()
        # raises ValueError when called outside the main thread (e.g.
        # when this class is instantiated from a web-server worker
        # thread), so only register from the main thread.
        if threading.current_thread() is threading.main_thread():
            signal.signal(signal.SIGINT, self._signal_handler)
            signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, sig, frame):
        """Flag the main loop to stop on SIGINT/SIGTERM."""
        self.logger.warning(f"Signal {sig} received. Shutting down...")
        self.is_running = False

    async def run_forever(self):
        """Start the processor and loop until a shutdown signal arrives."""
        await self.processor.startup()
        self.logger.info("Master loop started. Listening for tasks...")
        # Run initial diagnostics
        try:
            initial_report = await self.diagnostics.run_full_diagnostics()
            self.logger.info(f"Initial diagnostics: Health Score {initial_report['health_score']}/100")
        except Exception as e:
            self.logger.error(f"Initial diagnostics failed: {e}")
        try:
            while self.is_running:
                # In a real app, this would be a web server or message queue listener
                dummy_input = f"input_batch_{int(time.time())}"
                response = await self.processor.process_request({"data": dummy_input})
                # Update state. NOTE(review): "sessions" grows without
                # bound across runs; consider capping it like
                # diagnostics_history does.
                current_state = self.state.load_state()
                current_state["sessions"].append(response)
                self.state.save_state(current_state)
                self.logger.info(f"Task Handled: {response['id']}")
                await asyncio.sleep(5)  # Throttling for demo
        except Exception as e:
            self.logger.error(f"Critical System Failure: {str(e)}")
        finally:
            await self.processor.shutdown()
# ==========================================
# 10. DIAGNOSTICS API ENDPOINT FUNCTION
# ==========================================

async def get_system_diagnostics() -> Dict:
    """Public function to get system diagnostics (for app.py integration).

    Returns a dict with ``success``, ``diagnostics``/``error`` and
    ``timestamp`` keys; never raises.

    Fix: the original built a full AumCoreMaster per call, which created
    a ThreadPoolExecutor and a ProcessPoolExecutor that were never shut
    down (leaked on every API hit) and attempted signal registration.
    A lightweight DiagnosticsEngine with an injected StateManager is all
    that run_full_diagnostics needs.
    """
    try:
        config = SystemConfig()
        engine = DiagnosticsEngine(config)
        engine.state = StateManager(config.db_path)  # enable history persistence
        report = await engine.run_full_diagnostics()
        return {
            "success": True,
            "diagnostics": report,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }

# ==========================================
# 11. EXECUTION ENTRY POINT
# ==========================================

async def bootstrap():
    """Main entry point with proper lifecycle management."""
    master = AumCoreMaster()
    await master.run_forever()

if __name__ == "__main__":
    try:
        if sys.platform == 'win32':
            # Proactor loop interacts badly with some libraries; use selector.
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        asyncio.run(bootstrap())
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(f"FATAL ERROR: {e}")

# ==========================================
# 12. REGISTER MODULE FUNCTION
# ==========================================

def register_module(app, client, username):
    """Register orchestrator routes with a FastAPI app.

    ``client`` and ``username`` are accepted for signature compatibility
    with the module-registration convention; they are not used here.
    """
    from fastapi import APIRouter
    router = APIRouter()

    @router.get("/orchestrator/health")
    async def orchestrator_health():
        return {
            "module": "orchestrator",
            "status": "active",
            # Keep version consistent with SystemConfig instead of a
            # second hard-coded literal.
            "version": SystemConfig().version,
            "endpoints": ["/orchestrator/health", "/orchestrator/diagnostics", "/system/task", "/system/status"]
        }

    @router.get("/orchestrator/diagnostics")
    async def module_diagnostics():
        """Diagnostics endpoint - delegates to get_system_diagnostics (no circular import)."""
        return await get_system_diagnostics()

    app.include_router(router)
    print("✅ Orchestrator module registered with FastAPI")