# NOTE: "Spaces: Sleeping" status residue from a Hugging Face Spaces paste was
# preserved here as a comment so the module remains valid Python.
import asyncio
import functools
import json
import logging
import os
import signal
import sys
import threading
import time
import uuid
from abc import ABC, abstractmethod
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Any, List, Optional, Union, Callable

import psutil
# ==========================================
# 1. ADVANCED CONFIGURATION & CONSTANTS
# ==========================================
@dataclass
class SystemConfig:
    """Central runtime configuration with sane defaults.

    The original was a plain class carrying annotated class attributes;
    ``@dataclass`` (already imported in this file) keeps ``SystemConfig()``
    working unchanged while additionally allowing per-instance overrides,
    e.g. ``SystemConfig(timeout=5)``.
    """
    version: str = "2.0.1-Stable"
    # Fall back to 4 workers when os.cpu_count() returns None (some containers).
    max_workers: int = os.cpu_count() or 4
    timeout: int = 60
    memory_limit_mb: int = 512
    db_path: str = "data/system_state.json"
    log_file: str = "logs/aumcore_main.log"
    diagnostics_log: str = "logs/diagnostics.log"
    enable_telemetry: bool = True
# ==========================================
# 2. ENHANCED LOGGING SYSTEM
# ==========================================
class AumLogger:
    """Professional Logging System with Multiple Outputs (file + console).

    Fix over the original: ``logging.getLogger(name)`` returns the SAME
    logger object for a given name, so constructing ``AumLogger`` more
    than once with the same name used to stack a new pair of handlers each
    time, emitting every record multiple times.  Handlers are now attached
    only when the logger has none yet.
    """

    def __init__(self, name: str, log_file: str):
        # Create the directory the log file actually lives in (the original
        # hard-coded 'logs' and crashed for files in any other directory).
        log_dir = os.path.dirname(log_file)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.DEBUG)
        if not self.logger.handlers:  # idempotent handler setup
            formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s')
            # File Handler
            fh = logging.FileHandler(log_file)
            fh.setFormatter(formatter)
            self.logger.addHandler(fh)
            # Console Handler
            ch = logging.StreamHandler()
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

    def info(self, msg): self.logger.info(msg)
    def error(self, msg): self.logger.error(msg)
    def warning(self, msg): self.logger.warning(msg)
# ==========================================
# 3. CUSTOM EXCEPTIONS & ERROR HANDLING
# ==========================================
class AumCoreError(Exception):
    """Base exception for AumCore system.

    Carries a numeric ``error_code`` alongside the human-readable message
    so callers can branch on it programmatically.
    """

    def __init__(self, message: str, error_code: int):
        # Stash the code first, then delegate message handling to Exception.
        self.error_code = error_code
        super().__init__(message)
# ==========================================
# 4. PERFORMANCE TRACKING DECORATORS
# ==========================================
def monitor_system_resources(func: Callable):
    """Decorator to monitor wall-clock time and RSS delta of an async callable.

    Fixes over the original:
      * ``functools.wraps`` preserves the wrapped coroutine's ``__name__``
        and docstring (the original reported every method as ``wrapper``).
      * measurement and logging moved into ``finally`` so the timing line
        is still emitted when ``func`` raises.
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        process = psutil.Process(os.getpid())
        mem_before = process.memory_info().rss / (1024 * 1024)
        start_time = time.perf_counter()
        try:
            return await func(*args, **kwargs)
        finally:
            end_time = time.perf_counter()
            mem_after = process.memory_info().rss / (1024 * 1024)
            logging.info(f"Method {func.__name__} executed. Time: {end_time-start_time:.2f}s | Mem Delta: {mem_after-mem_before:.2f}MB")
    return wrapper
# ==========================================
# 5. CORE INTERFACES (CONTRACTS)
# ==========================================
class IEngine(ABC):
    """Contract for engine components driven by the orchestrator.

    The original inherited from ABC but left the methods as plain no-ops,
    so the "contract" was never enforced; ``@abstractmethod`` makes a
    subclass that forgets one of them fail at instantiation time.
    """

    @abstractmethod
    async def startup(self):
        """Prepare the engine before it accepts work."""

    @abstractmethod
    async def shutdown(self):
        """Release any resources the engine holds."""

    @abstractmethod
    async def process_request(self, data: Any):
        """Handle a single unit of work and return its result."""
# ==========================================
# 6. DATA PERSISTENCE LAYER
# ==========================================
class StateManager:
    """Thread-safe JSON persistence for the system state file.

    All reads and writes are serialized through one lock so concurrent
    threads never observe or produce a half-written file.
    """

    def __init__(self, path: str):
        self.path = path
        self._lock = threading.Lock()
        self._init_storage()

    def _init_storage(self):
        """Create the parent directory and seed an empty state file if missing."""
        parent = os.path.dirname(self.path)
        # The original called os.makedirs(dirname) unguarded, which raises
        # for a bare filename (dirname == '') and races if the directory
        # appears between the exists() check and makedirs().
        if parent:
            os.makedirs(parent, exist_ok=True)
        if not os.path.exists(self.path):
            self.save_state({"sessions": [], "metrics": {}, "diagnostics_history": []})

    def save_state(self, data: Dict):
        """Persist ``data`` as pretty-printed JSON (overwrites the file)."""
        with self._lock:
            with open(self.path, 'w') as f:
                json.dump(data, f, indent=4)

    def load_state(self) -> Dict:
        """Read and return the current state dict from disk."""
        with self._lock:
            with open(self.path, 'r') as f:
                return json.load(f)
# ==========================================
# 7. SYSTEM DIAGNOSTICS ENGINE (LEVEL 2 ADDITION)
# ==========================================
class DiagnosticsEngine:
    """Complete System Health Monitoring and Diagnostics.

    Aggregates five health sections (system resources, application,
    external services, log analysis, performance) into one report with a
    0-100 health score.  A ``StateManager`` may be passed at construction
    or injected later as ``self.state``; when absent, history persistence
    is skipped gracefully instead of raising.
    """

    def __init__(self, config: SystemConfig, state: Optional["StateManager"] = None):
        self.config = config
        self.logger = AumLogger("Diagnostics", config.diagnostics_log)
        self.diagnostics_history = []
        # Optional persistence layer; AumCoreMaster assigns this attribute
        # after construction, so external injection still works.
        self.state = state

    async def run_full_diagnostics(self) -> Dict:
        """Run complete system health check with detailed report."""
        self.logger.info("Starting comprehensive system diagnostics...")
        report = {
            "timestamp": datetime.now().isoformat(),
            "system_id": f"AUMCORE-{uuid.uuid4().hex[:8]}",
            "status": "RUNNING",
            "health_score": 0,
            "sections": {}
        }
        # 1. SYSTEM RESOURCES CHECK
        report["sections"]["system_resources"] = await self._check_system_resources()
        # 2. APPLICATION HEALTH CHECK
        report["sections"]["application"] = await self._check_application_health()
        # 3. EXTERNAL SERVICES CHECK
        report["sections"]["external_services"] = await self._check_external_services()
        # 4. LOGS ANALYSIS
        report["sections"]["logs_analysis"] = await self._analyze_logs()
        # 5. PERFORMANCE METRICS
        report["sections"]["performance"] = await self._collect_performance_metrics()
        # Calculate overall health score (0-100)
        report["health_score"] = self._calculate_health_score(report)
        report["status"] = "HEALTHY" if report["health_score"] >= 80 else "DEGRADED" if report["health_score"] >= 50 else "CRITICAL"
        # Save to history
        self._save_diagnostics_to_history(report)
        self.logger.info(f"Diagnostics completed. Health Score: {report['health_score']}/100")
        return report

    async def _check_system_resources(self) -> Dict:
        """Check CPU, Memory, Disk, Network; returns {"error": ...} on failure."""
        try:
            cpu_usage = psutil.cpu_percent(interval=1)
            memory = psutil.virtual_memory()
            disk = psutil.disk_usage('/')
            net_io = psutil.net_io_counters()
            # p.info['name'] can be None for some processes (e.g. access
            # denied); guard before calling .lower() on it.
            python_procs = len([
                p for p in psutil.process_iter(['name'])
                if 'python' in (p.info['name'] or '').lower()
            ])
            return {
                "cpu": {
                    "usage_percent": cpu_usage,
                    "cores": psutil.cpu_count(),
                    # os.getloadavg is unavailable on Windows.
                    "load_avg": os.getloadavg() if hasattr(os, 'getloadavg') else "N/A"
                },
                "memory": {
                    "total_gb": round(memory.total / (1024**3), 2),
                    "available_gb": round(memory.available / (1024**3), 2),
                    "used_percent": memory.percent,
                    "free_percent": 100 - memory.percent
                },
                "disk": {
                    "total_gb": round(disk.total / (1024**3), 2),
                    "used_gb": round(disk.used / (1024**3), 2),
                    "free_gb": round(disk.free / (1024**3), 2),
                    "used_percent": disk.percent
                },
                "network": {
                    "bytes_sent": net_io.bytes_sent,
                    "bytes_recv": net_io.bytes_recv
                },
                "processes": {
                    "total": len(psutil.pids()),
                    "aumcore_processes": python_procs
                }
            }
        except Exception as e:
            self.logger.error(f"System resources check failed: {e}")
            return {"error": str(e)}

    async def _check_application_health(self) -> Dict:
        """Check application specific health (process, dirs, recent errors)."""
        try:
            # Check if app.py is running.  BUG FIX: psutil's ``Process.info``
            # is a dict attribute, not a method — the original ``p.info()``
            # raised TypeError on every iteration.  ``cmdline`` may also be
            # None when the attribute could not be read.
            app_running = any(
                'app.py' in (p.info.get('cmdline') or [])
                for p in psutil.process_iter(['cmdline'])
            )
            # Check expected working directories
            logs_exist = os.path.exists('logs')
            data_dir_exist = os.path.exists('data')
            # Count ERROR lines among the last 100 lines of the main log
            error_count = 0
            if os.path.exists(self.config.log_file):
                with open(self.config.log_file, 'r') as f:
                    lines = f.readlines()[-100:]  # Last 100 lines
                error_count = sum(1 for line in lines if 'ERROR' in line.upper())
            return {
                "application_running": app_running,
                "directories": {
                    "logs": logs_exist,
                    "data": data_dir_exist
                },
                "recent_errors": error_count,
                "uptime_estimate": "N/A"  # Can be enhanced with process start time
            }
        except Exception as e:
            self.logger.error(f"Application health check failed: {e}")
            return {"error": str(e)}

    async def _check_external_services(self) -> Dict:
        """Check Groq API, TiDB, and other external services.

        Both probes are best-effort: a missing module or failed call is
        reported in the returned dict rather than raised.
        """
        services = {
            "groq_api": {"status": "UNKNOWN", "latency_ms": 0},
            "tidb_database": {"status": "UNKNOWN", "connected": False}
        }
        # Check Groq API with a minimal one-token "ping" completion.
        try:
            from groq import Groq  # local import: optional dependency
            start = time.time()
            client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
            client.chat.completions.create(
                model="llama-3.3-70b-versatile",
                messages=[{"role": "user", "content": "ping"}],
                max_tokens=1,
                timeout=10
            )
            services["groq_api"] = {
                "status": "HEALTHY",
                "latency_ms": round((time.time() - start) * 1000, 2),
                "model_available": True
            }
        except Exception as e:
            services["groq_api"] = {
                "status": "UNHEALTHY",
                "error": str(e),
                "latency_ms": 0
            }
        # Check TiDB (importable memory_db module is treated as "connected" —
        # NOTE(review): this only proves the module imports, not connectivity).
        try:
            from memory_db import tidb_memory
            services["tidb_database"] = {
                "status": "HEALTHY",
                "connected": True,
                "type": "TiDB Cloud"
            }
        except ImportError:
            services["tidb_database"] = {
                "status": "NOT_CONFIGURED",
                "connected": False,
                "note": "memory_db module not found"
            }
        except Exception as e:
            services["tidb_database"] = {
                "status": "UNHEALTHY",
                "connected": False,
                "error": str(e)
            }
        return services

    async def _analyze_logs(self) -> Dict:
        """Analyze the last 500 main-log lines for level counts and recent errors."""
        try:
            if not os.path.exists(self.config.log_file):
                return {"error": "Log file not found", "file": self.config.log_file}
            with open(self.config.log_file, 'r') as f:
                lines = f.readlines()[-500:]  # Last 500 lines
            analysis = {
                "total_lines_analyzed": len(lines),
                "error_count": sum(1 for line in lines if 'ERROR' in line.upper()),
                "warning_count": sum(1 for line in lines if 'WARNING' in line.upper()),
                "info_count": sum(1 for line in lines if 'INFO' in line.upper()),
                "recent_errors": [],
                "log_file_size_mb": round(os.path.getsize(self.config.log_file) / (1024*1024), 3)
            }
            # Extract up to 5 error lines from the last 20 lines
            for line in lines[-20:]:
                if 'ERROR' in line.upper():
                    analysis["recent_errors"].append(line.strip())
                    if len(analysis["recent_errors"]) >= 5:
                        break
            return analysis
        except Exception as e:
            return {"error": f"Log analysis failed: {str(e)}"}

    async def _collect_performance_metrics(self) -> Dict:
        """Collect performance metrics from the persisted state (best effort)."""
        try:
            # Falls back to an empty dict when no StateManager is attached.
            state = self.state.load_state() if self.state is not None else {}
            return {
                "total_sessions": len(state.get("sessions", [])),
                "total_tasks_processed": len(state.get("metrics", {}).get("tasks", [])),
                "average_response_time": "N/A",  # Can be calculated from actual data
                "peak_memory_usage_mb": "N/A",
                "system_uptime": "N/A",
                "last_diagnostics": state.get("diagnostics_history", [])[-1]["timestamp"] if state.get("diagnostics_history") else "N/A"
            }
        except Exception as e:
            return {"error": str(e)}

    def _calculate_health_score(self, report: Dict) -> int:
        """Calculate overall health score 0-100 by deducting points per issue."""
        score = 100
        if report["sections"]["system_resources"].get("memory", {}).get("used_percent", 0) > 90:
            score -= 20
        if report["sections"]["system_resources"].get("disk", {}).get("used_percent", 0) > 90:
            score -= 15
        if report["sections"]["application"].get("recent_errors", 0) > 5:
            # Up to -100 when error counts are high (capped at 10 errors).
            score -= 10 * min(report["sections"]["application"]["recent_errors"], 10)
        if report["sections"]["external_services"].get("groq_api", {}).get("status") != "HEALTHY":
            score -= 25
        if report["sections"]["external_services"].get("tidb_database", {}).get("status") != "HEALTHY":
            score -= 15
        return max(0, min(100, score))

    def _save_diagnostics_to_history(self, report: Dict):
        """Append a compact summary of ``report`` to the persisted history.

        A deliberate no-op when no StateManager is attached; the original's
        hasattr fallback still called ``self.state.save_state`` and raised
        AttributeError every time, logging a spurious error.
        """
        if self.state is None:
            return
        try:
            state = self.state.load_state()
            history = state.get("diagnostics_history", [])
            history.append({
                "timestamp": report["timestamp"],
                "health_score": report["health_score"],
                "status": report["status"],
                "summary": f"Health: {report['health_score']}/100, Status: {report['status']}"
            })
            # Keep only last 10 reports
            if len(history) > 10:
                history = history[-10:]
            state["diagnostics_history"] = history
            self.state.save_state(state)
        except Exception as e:
            self.logger.error(f"Failed to save diagnostics history: {e}")
# ==========================================
# 8. ASYNC AI TASK PROCESSOR
# ==========================================
class TaskProcessor(IEngine):
    """Async task engine that offloads CPU-heavy work to a process pool."""

    def __init__(self, config: SystemConfig):
        self.config = config
        self.logger = AumLogger("TaskProcessor", config.log_file)
        # Thread pool for blocking calls, process pool for CPU-bound work.
        self.thread_pool = ThreadPoolExecutor(max_workers=config.max_workers)
        self.process_pool = ProcessPoolExecutor(max_workers=2)
        self.active_tasks = 0  # requests currently in flight

    async def startup(self):
        """Log version/worker info and perform the simulated hardware check."""
        self.logger.info(f"System Version {self.config.version} starting up...")
        self.logger.info(f"CPU Workers available: {self.config.max_workers}")
        await asyncio.sleep(1)  # Simulated hardware check

    async def shutdown(self):
        """Drain and release both executor pools."""
        self.logger.info("Cleaning up resources...")
        self.thread_pool.shutdown(wait=True)
        self.process_pool.shutdown(wait=True)

    async def process_request(self, payload: Dict):
        """Process one payload; returns a result dict or an error dict."""
        self.active_tasks += 1
        request_id = payload.get("id", str(uuid.uuid4()))
        try:
            loop = asyncio.get_running_loop()
            # Offloading heavy computation to process pool to avoid blocking Event Loop
            result = await loop.run_in_executor(
                self.process_pool,
                self._compute_heavy_logic,
                payload.get("data")
            )
            return {"id": request_id, "result": result, "timestamp": str(datetime.now())}
        except Exception as e:
            self.logger.error(f"Processing failed: {str(e)}")
            return {"error": "Processing Error", "id": request_id}
        finally:
            self.active_tasks -= 1

    @staticmethod
    def _compute_heavy_logic(data: str) -> str:
        """Isolated function for CPU intensive work.

        BUG FIX: the original defined this with ``data`` as the only
        parameter but accessed it as ``self._compute_heavy_logic``, so the
        bound method received the instance in ``data`` and the real
        argument raised TypeError.  ``@staticmethod`` fixes the call and
        also keeps the callable picklable, which ProcessPoolExecutor
        requires to ship it to worker processes.
        """
        time.sleep(2)  # Simulated Neural Network Inference
        return f"PROCESSED_DATA_{data.upper()}"
# ==========================================
# 9. MASTER ORCHESTRATOR (UPDATED WITH DIAGNOSTICS)
# ==========================================
class AumCoreMaster:
    """Top-level orchestrator wiring config, state, processor and diagnostics."""

    def __init__(self):
        self.config = SystemConfig()
        self.logger = AumLogger("Master", self.config.log_file)
        self.state = StateManager(self.config.db_path)
        self.processor = TaskProcessor(self.config)
        self.diagnostics = DiagnosticsEngine(self.config)  # Diagnostics Engine
        self.diagnostics.state = self.state  # Share state manager
        self.is_running = True
        # Register OS signals for graceful exit.  BUG FIX: signal.signal()
        # raises ValueError outside the main thread — get_system_diagnostics()
        # constructs a master inside web-server worker threads — so register
        # handlers only where it is legal.
        if threading.current_thread() is threading.main_thread():
            signal.signal(signal.SIGINT, self._signal_handler)
            signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, sig, frame):
        """Flip the run flag so the main loop exits on its next iteration."""
        self.logger.warning(f"Signal {sig} received. Shutting down...")
        self.is_running = False

    async def run_forever(self):
        """Service loop: startup, initial diagnostics, then the task cycle."""
        await self.processor.startup()
        self.logger.info("Master loop started. Listening for tasks...")
        # Run initial diagnostics (best effort; a failure must not block boot)
        try:
            initial_report = await self.diagnostics.run_full_diagnostics()
            self.logger.info(f"Initial diagnostics: Health Score {initial_report['health_score']}/100")
        except Exception as e:
            self.logger.error(f"Initial diagnostics failed: {e}")
        try:
            while self.is_running:
                # In a real app, this would be a web server or message queue listener
                dummy_input = f"input_batch_{int(time.time())}"
                response = await self.processor.process_request({"data": dummy_input})
                # Persist the result to the shared state file
                current_state = self.state.load_state()
                current_state["sessions"].append(response)
                self.state.save_state(current_state)
                self.logger.info(f"Task Handled: {response['id']}")
                await asyncio.sleep(5)  # Throttling for demo
        except Exception as e:
            self.logger.error(f"Critical System Failure: {str(e)}")
        finally:
            await self.processor.shutdown()
# ==========================================
# 10. DIAGNOSTICS API ENDPOINT FUNCTION
# ==========================================
async def get_system_diagnostics() -> Dict:
    """Public function to get system diagnostics (for app.py integration).

    Builds a fresh ``AumCoreMaster`` per call and returns a success/error
    envelope around its full diagnostics report.
    """
    try:
        report = await AumCoreMaster().diagnostics.run_full_diagnostics()
    except Exception as exc:
        # Surface the failure in the envelope instead of raising to the caller.
        return {
            "success": False,
            "error": str(exc),
            "timestamp": datetime.now().isoformat()
        }
    return {
        "success": True,
        "diagnostics": report,
        "timestamp": datetime.now().isoformat()
    }
# ==========================================
# 11. EXECUTION ENTRY POINT
# ==========================================
async def bootstrap():
    """Main Entry point with proper lifecycle management.

    Constructs the orchestrator and hands control to its main loop until
    a signal or fatal error stops it.
    """
    await AumCoreMaster().run_forever()
def _main() -> None:
    """Configure the event loop policy and run the service to completion."""
    # Windows needs the selector-based loop policy for this workload.
    if sys.platform == 'win32':
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(bootstrap())


if __name__ == "__main__":
    try:
        _main()
    except KeyboardInterrupt:
        pass  # Ctrl-C is a normal exit, not an error
    except Exception as e:
        print(f"FATAL ERROR: {e}")
# ==========================================
# 12. REGISTER MODULE FUNCTION (FIXED LINE 539)
# ==========================================
def register_module(app, client, username):
    """Register orchestrator module with FastAPI app.

    BUG FIX: the original defined both endpoint coroutines and a router
    but never attached the coroutines to it, so ``include_router`` added
    zero routes and the paths advertised in the health payload were never
    served.  The ``@router.get`` decorators below wire them up.
    """
    from fastapi import APIRouter
    router = APIRouter()

    @router.get("/orchestrator/health")
    async def orchestrator_health():
        """Static module status payload."""
        return {
            "module": "orchestrator",
            "status": "active",
            "version": "2.0.1-Stable",
            "endpoints": ["/orchestrator/health", "/orchestrator/diagnostics", "/system/task", "/system/status"]
        }

    @router.get("/orchestrator/diagnostics")
    async def module_diagnostics():
        """Diagnostics endpoint - FIXED: No circular import"""
        # Delegates to the module-level function defined above.
        return await get_system_diagnostics()

    app.include_router(router)
    print("✅ Orchestrator module registered with FastAPI")