Spaces:
Sleeping
Sleeping
| """ | |
| ============================================ | |
| RUHI-CORE - Process Manager (THE HEART) | |
| Auto-restart, Health Check, Live Logs | |
| ============================================ | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import uuid | |
| import signal | |
| import asyncio | |
| import subprocess | |
| import time | |
| from datetime import datetime | |
| from typing import Dict, Optional, List | |
| from pathlib import Path | |
| from collections import deque | |
| import psutil | |
| import aiosqlite | |
| from loguru import logger | |
| from core.config import settings | |
| from core.port_manager import port_manager | |
class ServiceProcess:
    """A single managed service: launch configuration, child process and logs.

    The instance keeps the configuration passed to ``__init__``, the
    ``subprocess.Popen`` handle of the running child (if any), and an
    in-memory buffer with the most recent 1000 log lines.
    """

    def __init__(self, service_id: str, name: str, **kwargs):
        """Store the service configuration and reset the runtime state.

        Args:
            service_id: Identifier stored as ``self.id``.
            name: Service name; also used for the default app directory and
                the log file name.
            **kwargs: Optional settings, each with the default shown:
                ``type`` ("web"), ``language`` ("python"),
                ``entry_file`` ("main.py"), ``command`` (""),
                ``app_dir`` (``settings.APPS_DIR / name``), ``port`` (None),
                ``env_vars`` ({}; a dict or a JSON string),
                ``auto_restart`` (True), ``max_memory_mb`` (512),
                ``description`` ("").
        """
        self.id = service_id
        self.name = name
        self.type = kwargs.get("type", "web")  # web, worker, bot, cron
        self.language = kwargs.get("language", "python")
        self.entry_file = kwargs.get("entry_file", "main.py")
        self.command = kwargs.get("command", "")
        self.app_dir = kwargs.get("app_dir", str(settings.APPS_DIR / name))
        self.port = kwargs.get("port", None)
        self.env_vars = kwargs.get("env_vars", {})
        self.auto_restart = kwargs.get("auto_restart", True)
        self.max_memory_mb = kwargs.get("max_memory_mb", 512)
        self.description = kwargs.get("description", "")

        # Runtime state
        self.process: Optional[subprocess.Popen] = None
        self.pid: int = 0
        # One of: stopped, starting, running, crashed, restarting
        self.status: str = "stopped"
        self.restart_count: int = 0
        self.last_started: Optional[datetime] = None
        self.last_stopped: Optional[datetime] = None
        self.uptime_start: Optional[float] = None

        # Log buffer (keep last 1000 lines in memory)
        self.log_buffer: deque = deque(maxlen=1000)
        self._log_task: Optional[asyncio.Task] = None
        self._monitor_task: Optional[asyncio.Task] = None

    @staticmethod
    def _split_command(command: str) -> List[str]:
        """Split a command line into argv, honouring quotes on POSIX."""
        import shlex

        if os.name == "nt":
            # shlex's POSIX rules treat backslashes as escapes and would
            # mangle Windows paths, so keep the plain whitespace split there.
            return command.split()
        try:
            return shlex.split(command)
        except ValueError:
            # Unbalanced quotes: fall back to the plain whitespace split.
            return command.split()

    def _build_command(self) -> List[str]:
        """Return the argv list used to launch the service.

        An explicit ``command`` wins; otherwise the interpreter is chosen
        from ``language`` and applied to ``app_dir/entry_file``. Unknown
        languages fall back to the current Python interpreter.
        """
        if self.command:
            return self._split_command(self.command)

        entry_path = os.path.join(self.app_dir, self.entry_file)
        if self.language in ("node", "javascript"):
            return ["node", entry_path]
        if self.language in ("shell", "bash"):
            return ["bash", entry_path]
        # "python" and any unrecognised language; -u keeps output unbuffered.
        return [sys.executable, "-u", entry_path]

    def _build_env(self) -> dict:
        """Return the child environment.

        Starts from a copy of ``os.environ``, adds the ``RUHI_*`` variables
        and ``PORT`` ("8000" when no port is set), then overlays
        ``env_vars``. ``env_vars`` may be a dict or a JSON string; anything
        that does not resolve to a dict is ignored.
        """
        env = os.environ.copy()
        env.update({
            "RUHI_SERVICE_ID": self.id,
            "RUHI_SERVICE_NAME": self.name,
            "RUHI_SERVICE_TYPE": self.type,
            "RUHI_DATA_DIR": str(settings.DATA_DIR),
            "PORT": str(self.port) if self.port else "8000",
        })

        custom = self.env_vars
        if isinstance(custom, str):
            try:
                custom = json.loads(custom)
            except json.JSONDecodeError:
                custom = None
        if isinstance(custom, dict):
            # Popen only accepts string keys/values, so coerce numbers and
            # booleans; entries set to None are skipped.
            env.update({
                str(key): str(value)
                for key, value in custom.items()
                if value is not None
            })
        return env

    async def start(self) -> bool:
        """Launch the service process.

        Returns:
            True when the process was started (or is already running),
            False when the entry file is missing or the launch failed; in
            the failure cases ``status`` is set to "crashed".
        """
        if self.status == "running" and self.process and self.process.poll() is None:
            logger.warning(f"⚠️ Service '{self.name}' is already running (PID: {self.pid})")
            return True

        # Allocate port for web services
        if self.type == "web" and not self.port:
            self.port = port_manager.allocate(self.id)

        cmd = self._build_command()
        env = self._build_env()

        # Ensure app directory exists
        os.makedirs(self.app_dir, exist_ok=True)

        # The entry file is only required when no explicit command is given.
        if not self.command:
            entry_path = os.path.join(self.app_dir, self.entry_file)
            if not os.path.exists(entry_path):
                error_msg = f"Entry file not found: {entry_path}"
                logger.error(f"❌ {error_msg}")
                self.log_buffer.append(f"[ERROR] {error_msg}")
                self.status = "crashed"
                return False

        try:
            self.status = "starting"
            log_msg = f"🚀 Starting service '{self.name}' with command: {' '.join(cmd)}"
            logger.info(log_msg)
            self.log_buffer.append(f"[SYSTEM] {log_msg}")

            self.process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                cwd=self.app_dir,
                env=env,
                bufsize=1,
                universal_newlines=True,
                # Undecodable bytes must not kill the log reader.
                errors="replace",
                # New session => new process group, so stop() can signal the
                # whole group. Replaces preexec_fn=os.setsid.
                start_new_session=(os.name != "nt"),
            )
            self.pid = self.process.pid
            self.status = "running"
            self.last_started = datetime.now()
            self.uptime_start = time.time()

            logger.info(f"✅ Service '{self.name}' started successfully (PID: {self.pid}, Port: {self.port})")
            self.log_buffer.append(f"[SYSTEM] ✅ Started successfully (PID: {self.pid})")

            # Background tasks: stream the output and watch the health.
            self._log_task = asyncio.create_task(self._read_logs())
            self._monitor_task = asyncio.create_task(self._monitor_process())

            await self._save_to_db()
            return True
        except Exception as e:
            error_msg = f"Failed to start service '{self.name}': {str(e)}"
            logger.error(f"❌ {error_msg}")
            self.log_buffer.append(f"[ERROR] {error_msg}")
            self.status = "crashed"
            return False

    @staticmethod
    def _signal_process(proc: subprocess.Popen, force: bool = False) -> None:
        """Send SIGTERM (or SIGKILL when *force*) to the child's process group.

        On Windows the Popen ``terminate``/``kill`` methods are used instead.
        A process that has already disappeared is ignored.
        """
        try:
            if os.name != "nt":
                # signal.SIGKILL does not exist on Windows, so it is only
                # referenced inside this branch.
                sig = signal.SIGKILL if force else signal.SIGTERM
                os.killpg(os.getpgid(proc.pid), sig)
            elif force:
                proc.kill()
            else:
                proc.terminate()
        except ProcessLookupError:
            pass

    async def stop(self) -> bool:
        """Stop the service process.

        Cancels the log and monitor tasks, asks the process group to
        terminate, waits up to 10 seconds and then force-kills it.

        Returns:
            True when the service is stopped (or had no process), False if
            an unexpected error occurred; ``status`` is then "crashed".
        """
        proc = self.process
        if not proc:
            self.status = "stopped"
            return True

        try:
            logger.info(f"🛑 Stopping service '{self.name}' (PID: {self.pid})...")
            self.log_buffer.append("[SYSTEM] 🛑 Stopping service...")

            # Cancel background tasks first so the monitor cannot restart
            # the process while it is being shut down.
            for task in (self._log_task, self._monitor_task):
                if task and not task.done():
                    task.cancel()

            # Only signal a live child: once poll() reports an exit code the
            # PID has been reaped and may belong to an unrelated process.
            if proc.poll() is None:
                loop = asyncio.get_running_loop()
                self._signal_process(proc)
                try:
                    # Popen.wait blocks, so run it off the event loop.
                    await loop.run_in_executor(None, lambda: proc.wait(timeout=10))
                except subprocess.TimeoutExpired:
                    logger.warning(f"⚠️ Force killing service '{self.name}'")
                    self._signal_process(proc, force=True)
                    try:
                        # Reap the killed child; bounded so a stuck process
                        # cannot hang stop() forever.
                        await loop.run_in_executor(None, lambda: proc.wait(timeout=5))
                    except subprocess.TimeoutExpired:
                        pass

            self.status = "stopped"
            self.last_stopped = datetime.now()
            self.process = None
            self.pid = 0

            # Release port
            # NOTE(review): self.port is kept, so the next start() skips
            # port_manager.allocate(); confirm the port manager allows
            # reusing a released port this way.
            if self.type == "web":
                port_manager.release(self.id)

            logger.info(f"✅ Service '{self.name}' stopped successfully")
            self.log_buffer.append("[SYSTEM] ✅ Service stopped")
            await self._save_to_db()
            return True
        except Exception as e:
            logger.error(f"❌ Error stopping service '{self.name}': {str(e)}")
            self.status = "crashed"
            return False

    async def restart(self) -> bool:
        """Stop the service, wait two seconds, and start it again.

        Increments ``restart_count`` and returns the result of ``start()``.
        """
        self.status = "restarting"
        self.log_buffer.append("[SYSTEM] 🔄 Restarting service...")
        await self.stop()
        await asyncio.sleep(2)
        self.restart_count += 1
        return await self.start()

    async def _read_logs(self):
        """Stream the child's combined stdout/stderr into the log buffer and file.

        Each line is stripped, prefixed with an ``HH:MM:SS`` timestamp,
        appended to ``log_buffer`` and to ``settings.LOGS_DIR/<name>.log``.
        Reading continues until the pipe is drained after the process exits,
        so the final output of a crashing service is kept.
        """
        # Hold our own reference: self.process is reset by stop() and by the
        # auto-restart path while this coroutine is suspended.
        proc = self.process
        if proc is None or proc.stdout is None:
            return
        try:
            log_file = settings.LOGS_DIR / f"{self.name}.log"
            loop = asyncio.get_running_loop()
            while True:
                # readline blocks, so it runs in the default executor.
                line = await loop.run_in_executor(None, proc.stdout.readline)
                if line:
                    line = line.strip()
                    timestamp = datetime.now().strftime("%H:%M:%S")
                    formatted = f"[{timestamp}] {line}"
                    self.log_buffer.append(formatted)
                    # File logging is best-effort; the in-memory buffer is
                    # still updated when the write fails.
                    try:
                        with open(log_file, "a", encoding="utf-8") as f:
                            f.write(formatted + "\n")
                    except Exception:
                        pass
                elif proc.poll() is not None:
                    # End of stream and the process is gone: fully drained.
                    break
                else:
                    await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error(f"Log reader error for '{self.name}': {str(e)}")

    async def _monitor_process(self):
        """Watch the process and auto-restart it after an unexpected exit.

        Every ``settings.HEALTH_CHECK_INTERVAL`` seconds the process is
        polled. On exit the service is marked "crashed" and, when
        ``auto_restart`` is set and ``restart_count`` is below
        ``settings.MAX_RESTART_ATTEMPTS``, started again after
        ``settings.AUTO_RESTART_DELAY`` seconds. While the process is alive
        a warning is logged if its RSS exceeds ``max_memory_mb``.
        """
        try:
            while True:
                await asyncio.sleep(settings.HEALTH_CHECK_INTERVAL)
                if self.process is None:
                    break

                exit_code = self.process.poll()
                if exit_code is not None:
                    logger.warning(f"💀 Service '{self.name}' exited with code {exit_code}")
                    self.log_buffer.append(f"[SYSTEM] 💀 Process exited with code {exit_code}")
                    self.status = "crashed"

                    if self.auto_restart and self.restart_count < settings.MAX_RESTART_ATTEMPTS:
                        logger.info(f"🔄 Auto-restarting '{self.name}' in {settings.AUTO_RESTART_DELAY}s (attempt {self.restart_count + 1})")
                        self.log_buffer.append(f"[SYSTEM] 🔄 Auto-restarting in {settings.AUTO_RESTART_DELAY}s...")
                        await asyncio.sleep(settings.AUTO_RESTART_DELAY)
                        self.restart_count += 1
                        self.process = None
                        await self.start()
                    elif self.restart_count >= settings.MAX_RESTART_ATTEMPTS:
                        self.log_buffer.append("[SYSTEM] ❌ Max restart attempts reached. Service stopped.")
                        logger.error(f"❌ Service '{self.name}' exceeded max restart attempts")
                    # A successful start() created a fresh monitor task, so
                    # this one always ends here to avoid duplicate monitors.
                    break

                # Memory check (warning only; the process is not killed).
                try:
                    proc = psutil.Process(self.pid)
                    memory_mb = proc.memory_info().rss / (1024 * 1024)
                    if self.max_memory_mb and memory_mb > self.max_memory_mb:
                        logger.warning(f"⚠️ Service '{self.name}' exceeding memory limit ({memory_mb:.0f}MB > {self.max_memory_mb}MB)")
                        self.log_buffer.append(f"[WARNING] Memory usage: {memory_mb:.0f}MB / {self.max_memory_mb}MB")
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error(f"Monitor error for '{self.name}': {str(e)}")

    async def _save_to_db(self):
        """Insert or replace this service's row in the ``services`` table.

        Errors are logged and swallowed so that a database problem never
        prevents a service from starting or stopping.
        """
        try:
            async with aiosqlite.connect(str(settings.DB_PATH)) as db:
                await db.execute("""
                    INSERT OR REPLACE INTO services
                    (id, name, type, language, entry_file, port, status, auto_restart,
                     env_vars, app_dir, command, description, max_memory_mb,
                     restart_count, last_started, last_stopped, pid, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    self.id, self.name, self.type, self.language, self.entry_file,
                    self.port, self.status, int(self.auto_restart),
                    # Dicts are stored as JSON text; strings are stored as-is.
                    json.dumps(self.env_vars) if isinstance(self.env_vars, dict) else self.env_vars,
                    self.app_dir, self.command, self.description, self.max_memory_mb,
                    self.restart_count,
                    self.last_started.isoformat() if self.last_started else None,
                    self.last_stopped.isoformat() if self.last_stopped else None,
                    self.pid, datetime.now().isoformat(),
                ))
                await db.commit()
        except Exception as e:
            logger.error(f"DB save error: {str(e)}")

    def get_info(self) -> dict:
        """Return a dict describing the service and its live resource usage.

        CPU, memory and uptime are only measured while the process is
        alive; otherwise they are reported as 0. ``recent_logs`` holds the
        last 50 buffered log lines.
        """
        uptime = 0
        cpu_percent = 0
        memory_mb = 0

        if self.process and self.process.poll() is None and self.pid:
            try:
                proc = psutil.Process(self.pid)
                # interval=0.1 makes this call block for 0.1 s per service.
                cpu_percent = proc.cpu_percent(interval=0.1)
                memory_mb = proc.memory_info().rss / (1024 * 1024)
                if self.uptime_start:
                    uptime = int(time.time() - self.uptime_start)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                pass

        return {
            "id": self.id,
            "name": self.name,
            "type": self.type,
            "language": self.language,
            "entry_file": self.entry_file,
            "command": self.command,
            "app_dir": self.app_dir,
            "port": self.port,
            "status": self.status,
            "pid": self.pid,
            "auto_restart": self.auto_restart,
            "restart_count": self.restart_count,
            "uptime_seconds": uptime,
            "uptime_formatted": self._format_uptime(uptime),
            "cpu_percent": round(cpu_percent, 1),
            "memory_mb": round(memory_mb, 1),
            "max_memory_mb": self.max_memory_mb,
            "last_started": self.last_started.isoformat() if self.last_started else None,
            "last_stopped": self.last_stopped.isoformat() if self.last_stopped else None,
            "env_vars": self.env_vars,
            "description": self.description,
            "recent_logs": list(self.log_buffer)[-50:],
        }

    @staticmethod
    def _format_uptime(seconds: int) -> str:
        """Format a duration in seconds as e.g. ``"1d 2h 3m 4s"``.

        Zero-valued day/hour/minute parts are omitted; the seconds part is
        always present. Non-positive input yields ``"0s"``.
        """
        if seconds <= 0:
            return "0s"

        days = seconds // 86400
        hours = (seconds % 86400) // 3600
        minutes = (seconds % 3600) // 60
        secs = seconds % 60

        parts = []
        if days > 0:
            parts.append(f"{days}d")
        if hours > 0:
            parts.append(f"{hours}h")
        if minutes > 0:
            parts.append(f"{minutes}m")
        parts.append(f"{secs}s")
        return " ".join(parts)
class ProcessManager:
    """Central registry and controller for all managed services.

    Services are kept in ``self.services`` keyed by service id. The manager
    loads them from the database, creates and deletes them, and forwards
    start/stop/restart requests to the individual ``ServiceProcess``.
    """

    def __init__(self):
        """Create an empty manager; call ``initialize()`` to load services."""
        self.services: Dict[str, "ServiceProcess"] = {}
        self._initialized = False
        logger.info("🧠 ProcessManager initialized")

    @staticmethod
    def _holds_process(service) -> bool:
        """Return True when *service* still needs ``stop()`` to be called.

        This covers running services and crashed ones that still hold a
        process handle, whose monitor task may be waiting to restart them.
        """
        return service.process is not None or service.status == "running"

    async def initialize(self):
        """Load the persisted services from the database (once).

        Errors are logged; the manager is marked as initialized either way
        so that a broken database does not cause repeated attempts.
        """
        if self._initialized:
            return
        try:
            await init_db_safe()
            async with aiosqlite.connect(str(settings.DB_PATH)) as db:
                db.row_factory = aiosqlite.Row
                async with db.execute("SELECT * FROM services") as cursor:
                    rows = await cursor.fetchall()
                for row in rows:
                    # NULL columns fall back to the ServiceProcess defaults
                    # instead of propagating None into the runtime.
                    service = ServiceProcess(
                        service_id=row["id"],
                        name=row["name"],
                        type=row["type"] or "web",
                        language=row["language"] or "python",
                        entry_file=row["entry_file"] or "main.py",
                        port=row["port"],
                        auto_restart=bool(row["auto_restart"]),
                        env_vars=row["env_vars"],
                        app_dir=row["app_dir"] or str(settings.APPS_DIR / row["name"]),
                        command=row["command"] or "",
                        description=row["description"] or "",
                        max_memory_mb=row["max_memory_mb"] or 512,
                    )
                    service.restart_count = row["restart_count"] or 0
                    self.services[service.id] = service
                    logger.info(f"📦 Loaded service: {service.name} ({service.type})")
            self._initialized = True
            logger.info(f"✅ ProcessManager loaded {len(self.services)} services from database")
        except Exception as e:
            logger.error(f"❌ ProcessManager initialization error: {str(e)}")
            self._initialized = True  # Mark as initialized to prevent infinite loops

    async def create_service(self, name: str, **kwargs) -> "ServiceProcess":
        """Create, register and persist a new (stopped) service.

        Args:
            name: Unique service name; must be a single path component
                because it is used as the app directory name.
            **kwargs: Forwarded to ``ServiceProcess``.

        Raises:
            ValueError: If the service limit is reached, the name is
                invalid, or a service with this name already exists.
        """
        if len(self.services) >= settings.MAX_SERVICES:
            raise ValueError(f"Maximum services limit reached ({settings.MAX_SERVICES})")

        # The name becomes a directory under APPS_DIR; reject anything that
        # could point outside of it ("..", "a/b", absolute paths, ...).
        if not name or name in (".", "..") or Path(name).name != name:
            raise ValueError(f"Invalid service name: {name!r}")

        if any(svc.name == name for svc in self.services.values()):
            raise ValueError(f"Service '{name}' already exists")

        service_id = str(uuid.uuid4())[:12]
        app_dir = str(settings.APPS_DIR / name)
        os.makedirs(app_dir, exist_ok=True)

        service = ServiceProcess(
            service_id=service_id,
            name=name,
            app_dir=app_dir,
            **kwargs
        )
        self.services[service_id] = service
        await service._save_to_db()
        logger.info(f"✅ Service '{name}' created (ID: {service_id})")
        return service

    async def delete_service(self, service_id: str) -> bool:
        """Stop and remove a service from memory and from the database.

        Raises:
            ValueError: If no service with this id exists.
        """
        service = self.services.get(service_id)
        if not service:
            raise ValueError(f"Service {service_id} not found")

        # Also stop crashed services that still hold a process: their
        # monitor task could otherwise restart them after deletion.
        if self._holds_process(service):
            await service.stop()

        port_manager.release(service_id)
        # pop() tolerates a concurrent delete that finished while stop()
        # was awaited.
        self.services.pop(service_id, None)

        try:
            async with aiosqlite.connect(str(settings.DB_PATH)) as db:
                await db.execute("DELETE FROM services WHERE id = ?", (service_id,))
                await db.execute("DELETE FROM service_logs WHERE service_id = ?", (service_id,))
                await db.commit()
        except Exception as e:
            logger.error(f"DB delete error: {str(e)}")

        logger.info(f"🗑️ Service '{service.name}' deleted")
        return True

    async def start_service(self, service_id: str) -> bool:
        """Start the service with this id; raises ValueError if unknown."""
        service = self.services.get(service_id)
        if not service:
            raise ValueError(f"Service {service_id} not found")
        return await service.start()

    async def stop_service(self, service_id: str) -> bool:
        """Stop the service with this id; raises ValueError if unknown."""
        service = self.services.get(service_id)
        if not service:
            raise ValueError(f"Service {service_id} not found")
        return await service.stop()

    async def restart_service(self, service_id: str) -> bool:
        """Restart the service with this id; raises ValueError if unknown."""
        service = self.services.get(service_id)
        if not service:
            raise ValueError(f"Service {service_id} not found")
        return await service.restart()

    async def start_all(self):
        """Start every service that has ``auto_restart`` enabled.

        Starts are staggered by one second each.
        """
        # Iterate over a snapshot: services may be created or deleted while
        # this coroutine is suspended.
        for service in list(self.services.values()):
            if service.auto_restart:
                await service.start()
                await asyncio.sleep(1)  # Stagger starts

    async def stop_all(self):
        """Stop every service that is running or still holds a process."""
        # Snapshot for the same reason as in start_all().
        for service in list(self.services.values()):
            if self._holds_process(service):
                await service.stop()

    def get_service(self, service_id: str) -> Optional["ServiceProcess"]:
        """Return the service with this id, or None."""
        return self.services.get(service_id)

    def get_service_by_name(self, name: str) -> Optional["ServiceProcess"]:
        """Return the first service with this name, or None."""
        return next(
            (svc for svc in self.services.values() if svc.name == name),
            None,
        )

    def get_all_services(self) -> List[dict]:
        """Return ``get_info()`` for every registered service."""
        return [svc.get_info() for svc in self.services.values()]

    def get_stats(self) -> dict:
        """Return service counts by status plus port-manager figures."""
        counts = {"running": 0, "stopped": 0, "crashed": 0}
        for svc in self.services.values():
            if svc.status in counts:
                counts[svc.status] += 1

        return {
            "total_services": len(self.services),
            "running": counts["running"],
            "stopped": counts["stopped"],
            "crashed": counts["crashed"],
            "max_services": settings.MAX_SERVICES,
            "available_ports": port_manager.available_count,
            "port_allocations": port_manager.get_all_allocations(),
        }
async def init_db_safe():
    """Run the project's database initialisation without ever raising.

    ``init_db`` is imported lazily from ``core.config``; any failure of the
    import or of the call itself is logged as a warning and swallowed.
    """
    try:
        from core.config import init_db as _initialise_database
        await _initialise_database()
    except Exception as err:
        logger.warning("DB init warning: " + str(err))
# Global instance: the shared ProcessManager created at import time.
# Constructing it only sets up an empty registry and logs a message; the
# persisted services are loaded later by `await process_manager.initialize()`.
process_manager = ProcessManager()