# AumCore-AI / modules/orchestrator.py
# Source: Hugging Face repo "AumCore-AI", commit bcde1c0 (verified), ~21.3 kB
import asyncio
import logging
import json
import os
import sys
import uuid
import signal
import time
import psutil
import threading
import functools
from abc import ABC, abstractmethod
from datetime import datetime
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from typing import Dict, Any, List, Optional, Union, Callable
from dataclasses import dataclass, field
# ==========================================
# 1. ADVANCED CONFIGURATION & CONSTANTS
# ==========================================
@dataclass
class SystemConfig:
    """Central runtime configuration for the orchestrator, with defaults.

    Paths are relative to the process working directory; other modules
    (AumLogger, StateManager, DiagnosticsEngine) read them verbatim.
    """
    version: str = "2.0.1-Stable"
    # Fall back to 4 workers when os.cpu_count() returns None (unknown host).
    max_workers: int = os.cpu_count() or 4
    timeout: int = 60
    memory_limit_mb: int = 512
    db_path: str = "data/system_state.json"
    log_file: str = "logs/aumcore_main.log"
    diagnostics_log: str = "logs/diagnostics.log"
    # NOTE(review): enable_telemetry is never read in this file — presumably
    # consumed by another module; confirm before removing.
    enable_telemetry: bool = True
# ==========================================
# 2. ENHANCED LOGGING SYSTEM
# ==========================================
class AumLogger:
    """Professional logging system with file and console outputs.

    Wraps the stdlib ``logging`` registry, so two AumLogger instances created
    with the same ``name`` share one underlying logger.

    BUG FIX: the original attached a fresh file handler and console handler on
    every construction; because ``logging.getLogger(name)`` returns the same
    object, repeated construction (e.g. each ``AumCoreMaster()`` call)
    duplicated every log line. Handlers are now attached only once.
    """

    def __init__(self, name: str, log_file: str):
        # Ensure the log directory exists. os.path.dirname() is "" for a bare
        # filename, in which case fall back to the legacy 'logs' directory.
        os.makedirs(os.path.dirname(log_file) or "logs", exist_ok=True)
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.DEBUG)
        if not self.logger.handlers:
            formatter = logging.Formatter(
                '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
            )
            # File Handler
            fh = logging.FileHandler(log_file)
            fh.setFormatter(formatter)
            self.logger.addHandler(fh)
            # Console Handler
            ch = logging.StreamHandler()
            ch.setFormatter(formatter)
            self.logger.addHandler(ch)

    def info(self, msg):
        self.logger.info(msg)

    def error(self, msg):
        self.logger.error(msg)

    def warning(self, msg):
        self.logger.warning(msg)
# ==========================================
# 3. CUSTOM EXCEPTIONS & ERROR HANDLING
# ==========================================
class AumCoreError(Exception):
    """Root of the AumCore exception hierarchy.

    Carries a numeric ``error_code`` alongside the standard exception
    message so callers can branch on machine-readable codes.
    """

    def __init__(self, message: str, error_code: int):
        super().__init__(message)
        # Numeric code preserved for programmatic handling by callers.
        self.error_code = error_code
# ==========================================
# 4. PERFORMANCE TRACKING DECORATORS
# ==========================================
def monitor_system_resources(func: Callable):
    """Decorator for async functions: log wall time and RSS memory delta.

    BUG FIX: the original only logged metrics on the success path, so a
    failing call left no resource trace. Measurement and logging now sit in
    a ``try/finally`` and run even when the wrapped coroutine raises; the
    exception still propagates unchanged.
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        process = psutil.Process(os.getpid())
        mem_before = process.memory_info().rss / (1024 * 1024)
        start_time = time.perf_counter()
        try:
            return await func(*args, **kwargs)
        finally:
            end_time = time.perf_counter()
            mem_after = process.memory_info().rss / (1024 * 1024)
            logging.info(
                f"Method {func.__name__} executed. "
                f"Time: {end_time-start_time:.2f}s | "
                f"Mem Delta: {mem_after-mem_before:.2f}MB"
            )
    return wrapper
# ==========================================
# 5. CORE INTERFACES (CONTRACTS)
# ==========================================
class IEngine(ABC):
    """Lifecycle contract that every processing engine must implement."""

    @abstractmethod
    async def startup(self):
        """Prepare resources before the engine accepts work."""

    @abstractmethod
    async def shutdown(self):
        """Release all resources; called once during teardown."""

    @abstractmethod
    async def process_request(self, data: Any):
        """Handle a single unit of work and return its result."""
# ==========================================
# 6. DATA PERSISTENCE LAYER
# ==========================================
class StateManager:
    """Thread-safe JSON persistence for the system state document.

    All reads and writes hold a lock, so concurrent callers never interleave
    partial writes. On first use, the storage directory and a default state
    document are created.

    BUG FIX: the original ran ``os.makedirs(os.path.dirname(path))`` behind an
    ``exists`` check — that races with concurrent creators, and for a bare
    filename ``dirname`` is "" so ``makedirs("")`` raised FileNotFoundError.
    """

    def __init__(self, path: str):
        self.path = path
        self._lock = threading.Lock()
        self._init_storage()

    def _init_storage(self):
        """Create the parent directory (if any) and seed a default state file."""
        parent = os.path.dirname(self.path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        if not os.path.exists(self.path):
            self.save_state({"sessions": [], "metrics": {}, "diagnostics_history": []})

    def save_state(self, data: Dict):
        """Serialize *data* as pretty-printed JSON (lock held for the write)."""
        with self._lock:
            with open(self.path, 'w') as f:
                json.dump(data, f, indent=4)

    def load_state(self) -> Dict:
        """Read the whole state document back as a dict (lock held)."""
        with self._lock:
            with open(self.path, 'r') as f:
                return json.load(f)
# ==========================================
# 7. SYSTEM DIAGNOSTICS ENGINE (LEVEL 2 ADDITION)
# ==========================================
class DiagnosticsEngine:
    """Complete system health monitoring and diagnostics.

    Builds a structured report covering host resources, application state,
    external services (Groq API / TiDB), log analysis and stored performance
    metrics, then condenses everything into a single 0-100 health score.

    NOTE(review): ``self.state`` (a StateManager) is injected externally by
    AumCoreMaster; every method that touches it degrades gracefully to empty
    data when it has not been injected, so the engine also works stand-alone.
    """

    def __init__(self, config: SystemConfig):
        self.config = config
        self.logger = AumLogger("Diagnostics", config.diagnostics_log)
        self.diagnostics_history = []

    async def run_full_diagnostics(self) -> Dict:
        """Run the complete system health check and return the detailed report."""
        self.logger.info("Starting comprehensive system diagnostics...")
        report = {
            "timestamp": datetime.now().isoformat(),
            "system_id": f"AUMCORE-{uuid.uuid4().hex[:8]}",
            "status": "RUNNING",
            "health_score": 0,
            "sections": {}
        }
        # Each probe is independent: a failing section reports an "error" key
        # instead of aborting the whole diagnostics run.
        report["sections"]["system_resources"] = await self._check_system_resources()
        report["sections"]["application"] = await self._check_application_health()
        report["sections"]["external_services"] = await self._check_external_services()
        report["sections"]["logs_analysis"] = await self._analyze_logs()
        report["sections"]["performance"] = await self._collect_performance_metrics()
        # Overall health score (0-100) and the derived status band.
        report["health_score"] = self._calculate_health_score(report)
        report["status"] = ("HEALTHY" if report["health_score"] >= 80
                            else "DEGRADED" if report["health_score"] >= 50
                            else "CRITICAL")
        self._save_diagnostics_to_history(report)
        self.logger.info(f"Diagnostics completed. Health Score: {report['health_score']}/100")
        return report

    async def _check_system_resources(self) -> Dict:
        """Check CPU, memory, disk, network and process counts."""
        try:
            cpu_usage = psutil.cpu_percent(interval=1)
            memory = psutil.virtual_memory()
            disk = psutil.disk_usage('/')
            net_io = psutil.net_io_counters()
            # BUG FIX: p.info['name'] can be None for zombie/system processes;
            # calling .lower() on it crashed the whole probe.
            python_procs = len([
                p for p in psutil.process_iter(['name'])
                if 'python' in (p.info['name'] or '').lower()
            ])
            return {
                "cpu": {
                    "usage_percent": cpu_usage,
                    "cores": psutil.cpu_count(),
                    # getloadavg() does not exist on Windows.
                    "load_avg": os.getloadavg() if hasattr(os, 'getloadavg') else "N/A"
                },
                "memory": {
                    "total_gb": round(memory.total / (1024**3), 2),
                    "available_gb": round(memory.available / (1024**3), 2),
                    "used_percent": memory.percent,
                    "free_percent": 100 - memory.percent
                },
                "disk": {
                    "total_gb": round(disk.total / (1024**3), 2),
                    "used_gb": round(disk.used / (1024**3), 2),
                    "free_gb": round(disk.free / (1024**3), 2),
                    "used_percent": disk.percent
                },
                "network": {
                    "bytes_sent": net_io.bytes_sent,
                    "bytes_recv": net_io.bytes_recv
                },
                "processes": {
                    "total": len(psutil.pids()),
                    "aumcore_processes": python_procs
                }
            }
        except Exception as e:
            self.logger.error(f"System resources check failed: {e}")
            return {"error": str(e)}

    async def _check_application_health(self) -> Dict:
        """Check application-specific health: process, directories, recent errors."""
        try:
            # BUG FIX: psutil exposes pre-fetched attributes as the ``info``
            # dict *attribute*; the original called it (``p.info()``), which
            # raised TypeError on every process. cmdline may also be None.
            app_running = any(
                any('app.py' in part for part in (p.info.get('cmdline') or []))
                for p in psutil.process_iter(['cmdline'])
            )
            # Check working directories the rest of the system relies on.
            logs_exist = os.path.exists('logs')
            data_dir_exist = os.path.exists('data')
            # Count ERROR lines in the tail of the main log.
            error_count = 0
            if os.path.exists(self.config.log_file):
                with open(self.config.log_file, 'r') as f:
                    lines = f.readlines()[-100:]  # Last 100 lines
                error_count = sum(1 for line in lines if 'ERROR' in line.upper())
            return {
                "application_running": app_running,
                "directories": {
                    "logs": logs_exist,
                    "data": data_dir_exist
                },
                "recent_errors": error_count,
                "uptime_estimate": "N/A"  # Can be enhanced with process start time
            }
        except Exception as e:
            self.logger.error(f"Application health check failed: {e}")
            return {"error": str(e)}

    async def _check_external_services(self) -> Dict:
        """Probe Groq API and TiDB; each service degrades independently."""
        services = {
            "groq_api": {"status": "UNKNOWN", "latency_ms": 0},
            "tidb_database": {"status": "UNKNOWN", "connected": False}
        }
        # Groq API: send a 1-token ping and measure round-trip latency.
        try:
            from groq import Groq
            start = time.time()
            client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
            client.chat.completions.create(
                model="llama-3.3-70b-versatile",
                messages=[{"role": "user", "content": "ping"}],
                max_tokens=1,
                timeout=10
            )
            services["groq_api"] = {
                "status": "HEALTHY",
                "latency_ms": round((time.time() - start) * 1000, 2),
                "model_available": True
            }
        except Exception as e:
            services["groq_api"] = {
                "status": "UNHEALTHY",
                "error": str(e),
                "latency_ms": 0
            }
        # TiDB: importable memory_db module is treated as "connected".
        try:
            from memory_db import tidb_memory
            services["tidb_database"] = {
                "status": "HEALTHY",
                "connected": True,
                "type": "TiDB Cloud"
            }
        except ImportError:
            services["tidb_database"] = {
                "status": "NOT_CONFIGURED",
                "connected": False,
                "note": "memory_db module not found"
            }
        except Exception as e:
            services["tidb_database"] = {
                "status": "UNHEALTHY",
                "connected": False,
                "error": str(e)
            }
        return services

    async def _analyze_logs(self) -> Dict:
        """Analyze the tail of the application log for error/warning patterns."""
        try:
            if not os.path.exists(self.config.log_file):
                return {"error": "Log file not found", "file": self.config.log_file}
            with open(self.config.log_file, 'r') as f:
                lines = f.readlines()[-500:]  # Last 500 lines
            analysis = {
                "total_lines_analyzed": len(lines),
                "error_count": sum(1 for line in lines if 'ERROR' in line.upper()),
                "warning_count": sum(1 for line in lines if 'WARNING' in line.upper()),
                "info_count": sum(1 for line in lines if 'INFO' in line.upper()),
                "recent_errors": [],
                "log_file_size_mb": round(os.path.getsize(self.config.log_file) / (1024*1024), 3)
            }
            # Collect up to 5 ERROR lines from the last 20 lines.
            for line in lines[-20:]:
                if 'ERROR' in line.upper():
                    analysis["recent_errors"].append(line.strip())
                    if len(analysis["recent_errors"]) >= 5:
                        break
            return analysis
        except Exception as e:
            return {"error": f"Log analysis failed: {str(e)}"}

    async def _collect_performance_metrics(self) -> Dict:
        """Collect performance statistics from the persisted state (if injected)."""
        try:
            # getattr (not hasattr) so an injected None also falls back safely.
            state_mgr = getattr(self, "state", None)
            state = state_mgr.load_state() if state_mgr else {}
            return {
                "total_sessions": len(state.get("sessions", [])),
                "total_tasks_processed": len(state.get("metrics", {}).get("tasks", [])),
                "average_response_time": "N/A",  # Can be calculated from actual data
                "peak_memory_usage_mb": "N/A",
                "system_uptime": "N/A",
                "last_diagnostics": state.get("diagnostics_history", [])[-1]["timestamp"] if state.get("diagnostics_history") else "N/A"
            }
        except Exception as e:
            return {"error": str(e)}

    def _calculate_health_score(self, report: Dict) -> int:
        """Deduct points per detected issue; result is clamped to 0-100."""
        score = 100
        if report["sections"]["system_resources"].get("memory", {}).get("used_percent", 0) > 90:
            score -= 20
        if report["sections"]["system_resources"].get("disk", {}).get("used_percent", 0) > 90:
            score -= 15
        # NOTE: with >5 recent errors this deducts 10 per error (capped at 10
        # errors), which alone can zero the score — intentional severity bias.
        if report["sections"]["application"].get("recent_errors", 0) > 5:
            score -= 10 * min(report["sections"]["application"]["recent_errors"], 10)
        if report["sections"]["external_services"].get("groq_api", {}).get("status") != "HEALTHY":
            score -= 25
        if report["sections"]["external_services"].get("tidb_database", {}).get("status") != "HEALTHY":
            score -= 15
        return max(0, min(100, score))

    def _save_diagnostics_to_history(self, report: Dict):
        """Append a report summary to the persisted history (last 10 kept)."""
        try:
            state_mgr = getattr(self, "state", None)
            if state_mgr is None:
                # No persistence configured; skip quietly instead of raising
                # AttributeError on save (which the original then logged).
                return
            state = state_mgr.load_state()
            history = state.get("diagnostics_history", [])
            history.append({
                "timestamp": report["timestamp"],
                "health_score": report["health_score"],
                "status": report["status"],
                "summary": f"Health: {report['health_score']}/100, Status: {report['status']}"
            })
            if len(history) > 10:
                history = history[-10:]
            state["diagnostics_history"] = history
            state_mgr.save_state(state)
        except Exception as e:
            self.logger.error(f"Failed to save diagnostics history: {e}")
# ==========================================
# 8. ASYNC AI TASK PROCESSOR
# ==========================================
class TaskProcessor(IEngine):
    """Async task engine.

    Offloads CPU-heavy work to a process pool so the event loop stays
    responsive, and tracks the number of in-flight tasks. Implements the
    IEngine lifecycle contract.
    """

    def __init__(self, config: SystemConfig):
        self.config = config
        self.logger = AumLogger("TaskProcessor", config.log_file)
        self.thread_pool = ThreadPoolExecutor(max_workers=config.max_workers)
        self.process_pool = ProcessPoolExecutor(max_workers=2)
        # In-flight request counter; only touched from the event loop thread.
        self.active_tasks = 0

    async def startup(self):
        """Log version/worker info and simulate a hardware check."""
        self.logger.info(f"System Version {self.config.version} starting up...")
        self.logger.info(f"CPU Workers available: {self.config.max_workers}")
        await asyncio.sleep(1)  # Simulated hardware check

    async def shutdown(self):
        """Drain and release both executor pools."""
        self.logger.info("Cleaning up resources...")
        self.thread_pool.shutdown(wait=True)
        self.process_pool.shutdown(wait=True)

    @monitor_system_resources
    async def process_request(self, payload: Dict):
        """Process one request dict; returns a result envelope or an error envelope."""
        self.active_tasks += 1
        request_id = payload.get("id", str(uuid.uuid4()))
        try:
            loop = asyncio.get_running_loop()
            # Offloading heavy computation to process pool to avoid blocking Event Loop
            result = await loop.run_in_executor(
                self.process_pool,
                self._compute_heavy_logic,
                payload.get("data")
            )
            return {"id": request_id, "result": result, "timestamp": str(datetime.now())}
        except Exception as e:
            self.logger.error(f"Processing failed: {str(e)}")
            return {"error": "Processing Error", "id": request_id}
        finally:
            self.active_tasks -= 1

    @staticmethod
    def _compute_heavy_logic(data) -> str:
        """Isolated CPU-intensive work (must stay picklable for the process pool).

        BUG FIX: the original called ``data.upper()`` directly, which raised
        AttributeError whenever the payload lacked a "data" key (None) or
        carried a non-string value; coerce via str() first.
        """
        time.sleep(2)  # Simulated Neural Network Inference
        return f"PROCESSED_DATA_{str(data).upper()}"
# ==========================================
# 9. MASTER ORCHESTRATOR (UPDATED WITH DIAGNOSTICS)
# ==========================================
class AumCoreMaster:
    """Top-level orchestrator.

    Wires config, persistence, the task processor and the diagnostics engine
    together, registers graceful-shutdown signal handlers, and drives the
    main processing loop.
    """

    def __init__(self):
        self.config = SystemConfig()
        self.logger = AumLogger("Master", self.config.log_file)
        self.state = StateManager(self.config.db_path)
        self.processor = TaskProcessor(self.config)
        self.diagnostics = DiagnosticsEngine(self.config)
        self.diagnostics.state = self.state  # Share state manager
        self.is_running = True
        # Register OS signals for graceful exit. BUG FIX: signal.signal()
        # raises ValueError when called outside the main thread (e.g. when
        # this class is built from a web-server worker), so registration is
        # best-effort instead of fatal.
        try:
            signal.signal(signal.SIGINT, self._signal_handler)
            signal.signal(signal.SIGTERM, self._signal_handler)
        except ValueError:
            self.logger.warning("Signal handlers not registered (not in main thread)")

    def _signal_handler(self, sig, frame):
        """Flip the run flag so run_forever() exits after the current task."""
        self.logger.warning(f"Signal {sig} received. Shutting down...")
        self.is_running = False

    async def run_forever(self):
        """Start the processor, run initial diagnostics, then loop on tasks."""
        await self.processor.startup()
        self.logger.info("Master loop started. Listening for tasks...")
        # Initial diagnostics is best-effort: failure must not stop startup.
        try:
            initial_report = await self.diagnostics.run_full_diagnostics()
            self.logger.info(f"Initial diagnostics: Health Score {initial_report['health_score']}/100")
        except Exception as e:
            self.logger.error(f"Initial diagnostics failed: {e}")
        try:
            while self.is_running:
                # In a real app, this would be a web server or message queue listener
                dummy_input = f"input_batch_{int(time.time())}"
                response = await self.processor.process_request({"data": dummy_input})
                # Persist the response into the session history.
                current_state = self.state.load_state()
                current_state["sessions"].append(response)
                self.state.save_state(current_state)
                self.logger.info(f"Task Handled: {response['id']}")
                await asyncio.sleep(5)  # Throttling for demo
        except Exception as e:
            self.logger.error(f"Critical System Failure: {str(e)}")
        finally:
            await self.processor.shutdown()
# ==========================================
# 10. DIAGNOSTICS API ENDPOINT FUNCTION
# ==========================================
async def get_system_diagnostics() -> Dict:
    """Public function to get system diagnostics (for app.py integration).

    BUG FIX: the original constructed a full AumCoreMaster per call, which
    spun up two executor pools and tried to register OS signal handlers —
    signal.signal() raises ValueError off the main thread, so every call
    from a web-server worker failed. Build only what diagnostics needs:
    the engine plus a StateManager for history persistence.
    """
    try:
        config = SystemConfig()
        engine = DiagnosticsEngine(config)
        engine.state = StateManager(config.db_path)  # enable history persistence
        report = await engine.run_full_diagnostics()
        return {
            "success": True,
            "diagnostics": report,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }
# ==========================================
# 11. EXECUTION ENTRY POINT
# ==========================================
async def bootstrap():
    """Program entry coroutine: build the orchestrator and hand it control."""
    orchestrator = AumCoreMaster()
    await orchestrator.run_forever()
if __name__ == "__main__":
    try:
        # Windows needs the selector event loop for subprocess/executor use.
        if sys.platform == 'win32':
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        asyncio.run(bootstrap())
    except KeyboardInterrupt:
        # Ctrl-C is a normal way to stop the demo loop; exit quietly.
        pass
    except Exception as e:
        print(f"FATAL ERROR: {e}")
# ==========================================
# 12. REGISTER MODULE FUNCTION (FIXED LINE 539)
# ==========================================
def register_module(app, client, username):
    """Attach the orchestrator's HTTP endpoints to a FastAPI application.

    ``client`` and ``username`` are accepted for registration-API symmetry
    with other modules but are not used here.
    """
    from fastapi import APIRouter

    router = APIRouter()

    @router.get("/orchestrator/health")
    async def orchestrator_health():
        # Static liveness payload describing this module and its routes.
        return {
            "module": "orchestrator",
            "status": "active",
            "version": "2.0.1-Stable",
            "endpoints": ["/orchestrator/health", "/orchestrator/diagnostics", "/system/task", "/system/status"]
        }

    @router.get("/orchestrator/diagnostics")
    async def module_diagnostics():
        """Full diagnostics report, delegated to the module-level helper."""
        return await get_system_diagnostics()

    app.include_router(router)
    print("✅ Orchestrator module registered with FastAPI")