# Page header captured along with the source (not code): "Spaces: Running / Running"
import copy
import ctypes
import json
import logging
import os
import socket
import threading
import time
class JarvisClusterNode:
    """Manages Imperial Fleet UDP discovery and shared cluster configuration.

    A node advertises itself with JSON heartbeats over UDP (broadcast plus
    direct unicast to known peers), listens for heartbeats from other nodes,
    and keeps three views of the cluster in agreement:

    * ``self.active_nodes`` - in-memory state learned from heartbeats,
    * ``JARVIS_CLUSTER.json`` inside ``workspace_dir`` (file mode only),
    * the main node's HTTP API at ``main_url`` (cloud mode only).

    ``self._lock`` is a plain (non-reentrant) ``threading.Lock``. It guards
    ``self.active_nodes`` and the cluster file. Methods that take it never
    call another lock-taking method while holding it.
    """

    # Hostname that the API/cloud code paths treat as the MAIN node.
    _MAIN_HOSTNAME = "ga403wr"
    # Hostnames that default to the MAIN role when JARVIS_NODE_ROLE is unset.
    _DEFAULT_MAIN_HOSTNAMES = ("ga403wr", "ga403")
    # Subnet broadcast addresses that are always tried in addition to the
    # ones derived from the local interfaces.
    _FALLBACK_BROADCASTS = ('192.168.0.255', '192.168.1.255', '172.16.0.255', '192.168.56.255')
    # Entries older than this many seconds are dropped during file sync.
    _DEAD_NODE_SECONDS = 300

    def __init__(self, workspace_dir=None, port=18791, broadcast_interval=5, main_url=None):
        """Prepare the node without touching the network.

        Args:
            workspace_dir: Directory holding ``JARVIS_CLUSTER.json``. When
                falsy the node runs without a local file ("Cloud Mode").
            port: UDP port used for both sending and receiving heartbeats.
            broadcast_interval: Seconds between heartbeats.
            main_url: Base URL of the main node's API. Falls back to the
                ``JARVIS_MAIN_URL`` environment variable.
        """
        self.port = port
        self.broadcast_interval = broadcast_interval
        self.workspace_dir = workspace_dir
        self.main_url = main_url or os.environ.get("JARVIS_MAIN_URL")
        if self.workspace_dir:
            self.cluster_file = os.path.join(self.workspace_dir, "JARVIS_CLUSTER.json")
        else:
            self.cluster_file = None  # Cloud Mode: no local file dependency

        # No socket/requests calls here: IP detection is deferred to start()
        # so that constructing the node can never hang on the network.
        self.hostname = os.environ.get("JARVIS_NODE_ID", socket.gethostname())
        self.ip_address = "Detecting..."  # Replaced by the background boot step
        self.local_ips = ["127.0.0.1"]    # Replaced by the background boot step
        self.active_nodes = {}            # {hostname: {ip, last_seen, role, ...}}
        self.manual_peers = []            # Explicit IPs to heartbeat towards
        self._running = False
        self._lock = threading.Lock()
        self._last_file_sync = 0          # Kept for compatibility; not read in this class
        self._last_self_confirm = 0       # Last time the self-link message was printed

        # Print the absolute registry path so the user can verify it.
        if self.cluster_file:
            print(f"🛰️ [Imperial Fleet] Cluster Registry: {os.path.abspath(self.cluster_file)}")
            self._ensure_config()
        else:
            print(f"🛰️ [Imperial Fleet] Cloud Mode Active: Syncing with {self.main_url}")

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _last_seen(entry):
        """Return ``entry['last_seen']`` as a number, or 0 when missing/invalid."""
        value = entry.get("last_seen") if isinstance(entry, dict) else None
        return value if isinstance(value, (int, float)) else 0

    def _self_role(self):
        """Return this node's role: ``JARVIS_NODE_ROLE`` or a hostname-based default."""
        default_role = "MAIN" if self.hostname in self._DEFAULT_MAIN_HOSTNAMES else "WORKER"
        return os.environ.get("JARVIS_NODE_ROLE", default_role)

    def _get_local_ip(self):
        """Return the local IPv4 address used for outbound traffic.

        Connecting a UDP socket sends no packet; it only makes the OS pick a
        source address. Returns ``"127.0.0.1"`` when that fails.
        """
        try:
            # The context manager closes the socket even if connect() raises.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
                probe.connect(("8.8.8.8", 80))
                return probe.getsockname()[0]
        except OSError:
            return "127.0.0.1"

    def _get_all_local_ips(self):
        """Get all IPv4 addresses assigned to this machine to filter out self-packets."""
        ips = {"127.0.0.1", self._get_local_ip()}
        try:
            # Addresses associated with the hostname (includes virtual adapters).
            _, _, ip_list = socket.gethostbyname_ex(socket.gethostname())
            ips.update(ip_list)
        except OSError:
            pass
        return list(ips)

    # ------------------------------------------------------------------
    # Configuration (file / API / memory)
    # ------------------------------------------------------------------
    def _ensure_config(self):
        """Initialize the JARVIS_CLUSTER.json if it doesn't exist."""
        if os.path.exists(self.cluster_file):
            return
        default_config = {
            "nodes": {
                self.hostname: {
                    "ip": self.ip_address,
                    "role": "MAIN",
                    "last_seen": int(time.time()),
                },
            },
            "allocations": {
                "orpheus": self.hostname,  # Default to self
                "aizen": "ANY",
                "gilgamesh": "ANY",
            },
        }
        self._save_config(default_config)

    def get_config(self):
        """Read the shared cluster configuration and merge it with live memory state.

        In cloud mode (``main_url`` set and either no workspace or this host is
        not the main hostname) the main node's API response is returned as-is
        when it is a JSON object. Otherwise, or when the API call fails, the
        cluster file (if any) is read and the in-memory ``active_nodes`` are
        merged on top.

        Returns:
            dict: Configuration with at least the keys ``nodes``,
            ``allocations`` and ``allocations_status`` on the local path.
        """
        config = {"nodes": {}, "allocations": {}, "allocations_status": {}}

        if self.main_url and (not self.workspace_dir or self.hostname != self._MAIN_HOSTNAME):
            try:
                import requests
                response = requests.get(f"{self.main_url.rstrip('/')}/api/cluster/nodes", timeout=3)
                if response.ok:
                    remote = response.json()
                    # Callers treat the result as a dict; anything else falls
                    # through to the local file/memory state.
                    if isinstance(remote, dict):
                        return remote
            except Exception:
                # API unavailable: fall back to memory or file if available.
                pass

        with self._lock:
            if self.cluster_file:
                self._load_config_file(config)
            # Always merge live nodes so callers see them immediately, even
            # when writing the file is failing.
            self._merge_live_nodes(config)
            # Ensure self is always present.
            if self.hostname not in config["nodes"]:
                config["nodes"][self.hostname] = {
                    "ip": self.ip_address,
                    "role": self._self_role(),
                    "last_seen": int(time.time()),
                }
        return config

    def _load_config_file(self, config):
        """Merge the cluster file into ``config``. Caller must hold ``self._lock``."""
        max_retries = 3
        for attempt in range(max_retries):
            last_attempt = attempt == max_retries - 1
            try:
                if os.path.exists(self.cluster_file):
                    # An empty file usually means another node is mid-write.
                    if os.path.getsize(self.cluster_file) == 0 and not last_attempt:
                        time.sleep(0.1)
                        continue
                    with open(self.cluster_file, 'r', encoding='utf-8') as f:
                        data = json.load(f)
                    if isinstance(data, dict):
                        config.update(data)
                break
            except (ValueError, OSError):
                # ValueError covers both invalid JSON and undecodable bytes.
                if not last_attempt:
                    time.sleep(0.1)
        # A damaged file must not replace the three sections with non-dicts.
        for key in ("nodes", "allocations", "allocations_status"):
            if not isinstance(config.get(key), dict):
                config[key] = {}

    def _merge_live_nodes(self, config):
        """Overlay ``self.active_nodes`` onto ``config``. Caller must hold ``self._lock``."""
        for host, data in self.active_nodes.items():
            if not isinstance(data, dict):
                continue
            known = config["nodes"].get(host)
            if known is None or self._last_seen(data) > self._last_seen(known):
                # Copy so callers editing the result cannot alter memory state.
                config["nodes"][host] = dict(data)

            role = data.get("role")
            if role == "MAIN":
                # Adopt the command state of a MAIN node heard over UDP.
                if self._last_seen(data) > 0:
                    if "allocations" in data:
                        config["allocations"] = dict(data["allocations"])
                    # Global status is only taken when this host is the main hostname.
                    if self.hostname == self._MAIN_HOSTNAME and "allocations_status" in data:
                        for guardian, status in data.get("allocations_status", {}).items():
                            config["allocations_status"][guardian] = status
            elif role == "WORKER":
                for guardian, status in data.get("allocations_status", {}).items():
                    # Only trust the node actually allocated to run this guardian.
                    if config.get("allocations", {}).get(guardian) == host:
                        config["allocations_status"][guardian] = status

    def _save_config(self, config):
        """Write the shared cluster configuration to disk atomically, with retries.

        The data is written to ``<cluster_file>.tmp`` and then moved over the
        real file with ``os.replace``, so readers never observe a missing or
        half-written file. Errors are printed, not raised. No-op without a
        cluster file.
        """
        if not self.cluster_file:
            return
        with self._lock:
            temp_file = self.cluster_file + ".tmp"
            try:
                parent = os.path.dirname(self.cluster_file)
                if parent:
                    os.makedirs(parent, exist_ok=True)
                with open(temp_file, 'w', encoding='utf-8') as f:
                    json.dump(config, f, indent=4, ensure_ascii=False)

                # Retry because the replace can be refused while another
                # process has the file open (WinError 5, Access Denied).
                replace_retries = 10
                for attempt in range(replace_retries):
                    try:
                        os.replace(temp_file, self.cluster_file)
                        return
                    except OSError:
                        if attempt == replace_retries - 1:
                            raise
                        time.sleep(0.3)
            except Exception as e:
                print(f"🛑 [Imperial Fleet] Error saving cluster config: {e}")
                if os.path.exists(temp_file):
                    try:
                        os.remove(temp_file)
                    except OSError:
                        pass

    def _purge_stagnant_nodes(self):
        """Purge nodes that haven't been seen in > 24h to prevent ghost mappings."""
        if not self.cluster_file or not os.path.exists(self.cluster_file):
            return
        try:
            config = self.get_config()
            now = int(time.time())
            stale_threshold = 86400  # 24 hours
            nodes = config.get("nodes", {})
            to_remove = [
                host for host, data in nodes.items()
                if host != self.hostname and now - self._last_seen(data) > stale_threshold
            ]
            if to_remove:
                print(f"🧹 [Imperial Fleet] Purging {len(to_remove)} stagnant nodes: {to_remove}")
                for host in to_remove:
                    del nodes[host]
                self._save_config(config)
        except Exception as e:
            print(f"⚠️ [Imperial Fleet] Stagnant purge failed: {e}")

    def update_allocation(self, guardian_name, target_hostname):
        """Update the guardian allocation mapping and reset its status to pending."""
        config = self.get_config()
        config.setdefault("allocations", {})[guardian_name] = target_hostname
        config.setdefault("allocations_status", {})[guardian_name] = "Pending (배치 전송 중...)"
        self._save_config(config)

    def update_allocation_status(self, guardian_name, status_message):
        """Update the deployment status message for a specific guardian."""
        # 1. Memory state (the only store in cloud mode / on workers).
        with self._lock:
            own = self.active_nodes.setdefault(
                self.hostname, {"ip": self.ip_address, "allocations_status": {}}
            )
            own.setdefault("allocations_status", {})[guardian_name] = status_message

        # 2. File config, if applicable. Must run after the lock is released
        #    because get_config/_save_config take the same non-reentrant lock.
        config = self.get_config()
        config.setdefault("allocations_status", {})[guardian_name] = status_message
        self._save_config(config)

    def get_guardian_host(self, guardian_name):
        """Get the allocated hostname for a guardian; defaults to this host."""
        allocations = self.get_config().get("allocations", {})
        return allocations.get(guardian_name, self.hostname)

    def get_node_ip(self, target_hostname):
        """Get the IP address of a specific node; ``"127.0.0.1"`` when unknown."""
        nodes = self.get_config().get("nodes", {})
        return nodes.get(target_hostname, {}).get("ip", "127.0.0.1")

    # ------------------------------------------------------------------
    # UDP discovery
    # ------------------------------------------------------------------
    def _get_broadcast_addresses(self):
        """Get a list of potential broadcast addresses across all local interfaces."""
        bcasts = {'255.255.255.255'}
        # The common defaults are a fail-safe, so they are added even when
        # the hostname lookup below fails.
        bcasts.update(self._FALLBACK_BROADCASTS)
        try:
            _, _, ip_list = socket.gethostbyname_ex(socket.gethostname())
            for ip in ip_list:
                parts = ip.split('.')
                if len(parts) == 4:
                    # Generic /24 subnet broadcast for this interface.
                    bcasts.add(f"{parts[0]}.{parts[1]}.{parts[2]}.255")
        except Exception as e:
            print(f"[Imperial Fleet] Error deriving broadcast addresses: {e}")
        return list(bcasts)

    def update_manual_peers(self, ips):
        """Replace the list of explicit heartbeat targets.

        Returns:
            bool: True when ``ips`` was a list and was applied, else False.
        """
        if not isinstance(ips, list):
            return False
        self.manual_peers = list(set(ips))
        print(f"📡 [Imperial Fleet] Manual peering updated: {self.manual_peers}")
        return True

    def _send_heartbeat(self, sock):
        """Send one heartbeat to every broadcast address and every known peer."""
        config = self.get_config()
        payload = {
            "hostname": self.hostname,
            "ip": self.ip_address,
            "timestamp": int(time.time()),
            "type": "heartbeat",
            "role": config.get("nodes", {}).get(self.hostname, {}).get("role", "WORKER"),
            "allocations": config.get("allocations", {}),
            "allocations_status": config.get("allocations_status", {}),
        }
        message = json.dumps(payload).encode('utf-8')

        # Snapshot peer IPs under the lock; the listener thread mutates
        # active_nodes concurrently.
        with self._lock:
            peer_ips = [
                data["ip"] for data in self.active_nodes.values()
                if isinstance(data, dict) and isinstance(data.get("ip"), str)
            ]
        direct_targets = set(self.manual_peers)
        direct_targets.update(peer_ips)
        direct_targets.discard(self.ip_address)  # Don't send to self

        for addr in list(self._get_broadcast_addresses()) + list(direct_targets):
            try:
                sock.sendto(message, (addr, self.port))
            except Exception:
                # Best effort: one unreachable target must not block the rest.
                pass

    def _broadcast_presence(self):
        """Broadcast this node's presence until ``stop()`` is called.

        Each cycle sends a heartbeat, syncs the cluster file when a workspace
        directory exists, and reports to the main node's API when configured.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            print(f"📡 [Imperial Fleet] UDP Discovery Active on port {self.port} (Broadcast to {', '.join(self._get_broadcast_addresses())})")
            while self._running:
                try:
                    self._send_heartbeat(sock)
                    # File-based discovery acts as a fallback when UDP fails.
                    if self.workspace_dir and os.path.exists(self.workspace_dir):
                        self._sync_active_nodes_to_config()
                    # Workers report to the main node via its API.
                    if self.main_url and self.hostname != self._MAIN_HOSTNAME:
                        self._sync_to_cloud()
                except Exception as e:
                    print(f"🛑 [Imperial Fleet] Error in broadcast loop: {e}", flush=True)
                time.sleep(self.broadcast_interval)
        finally:
            sock.close()

    def _handle_datagram(self, data, sender_ip):
        """Process one received UDP payload.

        Args:
            data: Raw datagram bytes.
            sender_ip: Source address of the datagram.

        Returns:
            bool: True when a valid heartbeat was recorded, else False.
        """
        try:
            message = json.loads(data.decode('utf-8'))
        except ValueError:
            # Not UTF-8 JSON: ignore quietly instead of logging and backing off.
            return False
        if not isinstance(message, dict):
            return False

        # Reciprocal direct link: remember the sender so heartbeats also go
        # to it by unicast. Done only for well-formed messages so stray
        # traffic on the port cannot register targets.
        if sender_ip not in self.manual_peers:
            self.manual_peers.append(sender_ip)
            print(f"📡 [Imperial Radar] Reciprocal link established with: {sender_ip}")

        if message.get("type") != "heartbeat":
            return False
        peer_host = message.get("hostname")
        if not peer_host or not isinstance(peer_host, str):
            return False

        peer_ip = message.get("ip")
        peer_time = message.get("timestamp")
        if isinstance(peer_time, bool) or not isinstance(peer_time, (int, float)):
            # last_seen is compared numerically elsewhere; a missing or
            # malformed timestamp falls back to the local receipt time.
            peer_time = int(time.time())
        peer_allocations = message.get("allocations")
        peer_status = message.get("allocations_status")

        with self._lock:
            newly_seen = peer_host not in self.active_nodes
            self.active_nodes[peer_host] = {
                "ip": peer_ip,
                "last_seen": peer_time,
                "role": message.get("role", "WORKER"),
                "allocations": peer_allocations if isinstance(peer_allocations, dict) else {},
                "allocations_status": peer_status if isinstance(peer_status, dict) else {},
            }

        if peer_host != self.hostname:
            if newly_seen:
                print(f"📡 [Imperial Radar] Node discovered: {peer_host} ({peer_ip})")
        elif time.time() - getattr(self, '_last_self_confirm', 0) > 120:
            # Periodic self-confirm so the user can see the listener is alive.
            print(f"🛡️ [Imperial Radar] Self-link OK (Listener is active)")
            self._last_self_confirm = time.time()
        return True

    def _listen_for_peers(self):
        """Listen for heartbeats from other nodes until ``stop()`` is called."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # Bind to '' (all interfaces) so broadcast packets are received.
            sock.bind(('', self.port))
            sock.settimeout(2.0)  # Lets the loop notice stop() promptly
            while self._running:
                try:
                    # 65535 is the largest possible UDP datagram, so big
                    # heartbeats are never truncated.
                    data, addr = sock.recvfrom(65535)
                    # Skip our own packets arriving via any local interface.
                    if addr[0] in self.local_ips:
                        continue
                    self._handle_datagram(data, addr[0])
                except socket.timeout:
                    continue
                except Exception as e:
                    print(f"🛑 [Imperial Fleet] UDP Listener Error: {e}")
                    time.sleep(2)  # Backoff on error
        finally:
            sock.close()

    # ------------------------------------------------------------------
    # Synchronisation
    # ------------------------------------------------------------------
    def _sync_active_nodes_to_config(self):
        """Write the latest active node state into the JSON config.

        Also learns about peers from the file and drops entries not seen for
        ``_DEAD_NODE_SECONDS``. The file is rewritten on every call because
        this node's own ``last_seen`` changes every time.
        """
        config = self.get_config()
        nodes = config.setdefault("nodes", {})
        config.setdefault("allocations", {})
        statuses = config.setdefault("allocations_status", {})
        now = int(time.time())

        if self.hostname not in nodes:
            nodes[self.hostname] = {"ip": self.ip_address, "role": self._self_role()}
        nodes[self.hostname]["last_seen"] = now

        with self._lock:
            # Learn about peers from the shared file (fallback for UDP);
            # only adopt entries that are fresh enough.
            for host, data in nodes.items():
                if host == self.hostname:
                    continue
                known = self.active_nodes.get(host)
                if known is None or self._last_seen(data) > self._last_seen(known):
                    if now - self._last_seen(data) < self._DEAD_NODE_SECONDS:
                        self.active_nodes[host] = data

            # Add/update discovered peers.
            for peer_host, peer_data in self.active_nodes.items():
                entry = nodes.get(peer_host)
                if entry is None:
                    entry = nodes[peer_host] = {
                        "ip": peer_data.get("ip", "unknown"),
                        "role": peer_data.get("role", "WORKER"),
                    }
                peer_last_seen = self._last_seen(peer_data)
                if self._last_seen(entry) < peer_last_seen:
                    entry["last_seen"] = peer_last_seen
                    entry["ip"] = peer_data.get("ip", "unknown")
                    entry["role"] = peer_data.get("role", "WORKER")

                # Allocations: only a MAIN node dictates who runs what.
                if peer_data.get("role") == "MAIN" and peer_data.get("allocations"):
                    if config["allocations"] != peer_data["allocations"]:
                        config["allocations"] = copy.deepcopy(peer_data["allocations"])

                # Status: only recorded from the peer the guardian is allocated to.
                for g_name, g_status in (peer_data.get("allocations_status") or {}).items():
                    if config.get("allocations", {}).get(g_name) == peer_host:
                        statuses[g_name] = g_status

            # Clean up dead nodes (never self).
            dead_nodes = [
                host for host, data in nodes.items()
                if host != self.hostname
                and now - self._last_seen(data) > self._DEAD_NODE_SECONDS
            ]
            for host in dead_nodes:
                del nodes[host]
                self.active_nodes.pop(host, None)

        # Saved outside the lock: _save_config acquires it itself.
        self._save_config(config)

    def _sync_to_cloud(self):
        """Worker node: push current state to the main node's API.

        Failures are swallowed on purpose; this runs on every heartbeat.
        """
        if not self.main_url:
            return
        try:
            import requests
            payload = {
                "hostname": self.hostname,
                "ip": self.ip_address,
                "role": "WORKER",
                "last_seen": int(time.time()),
                "allocations_status": {},
            }
            # Collect statuses of the guardians assigned to this node.
            config = self.get_config()
            local_status = {}
            with self._lock:
                own_status = self.active_nodes.get(self.hostname, {}).get("allocations_status", {})
                for guardian, host in config.get("allocations", {}).items():
                    if host != self.hostname:
                        continue
                    # Priority: memory status > last known config status.
                    status = own_status.get(guardian)
                    if not status:
                        status = config.get("allocations_status", {}).get(guardian, "Connecting...")
                    local_status[guardian] = status
            payload["allocations_status"] = local_status
            requests.post(f"{self.main_url.rstrip('/')}/api/cluster/register", json=payload, timeout=3)
        except Exception:
            pass

    def get_orpheus_url(self):
        """Return the URL of the node currently assigned to run Orpheus TTS.

        Falls back to ``http://127.0.0.1:18800`` when Orpheus is assigned to
        this node, is unassigned, or the target node's IP is unknown.
        """
        lg = logging.getLogger(__name__)
        config = self.get_config()
        target_node = config.get("allocations", {}).get("orpheus")
        lg.info("[Fleet] Routing discovery: Self='%s', Target='%s'", self.hostname, target_node)

        local_url = "http://127.0.0.1:18800"
        if not target_node or target_node == self.hostname:
            lg.info("[Fleet] Orpheus is LOCAL (or target not found). Using 127.0.0.1")
            return local_url

        node_data = config.get("nodes", {}).get(target_node)
        if node_data and "ip" in node_data:
            url = f"http://{node_data['ip']}:18800"
            lg.info("[Fleet] Orpheus is REMOTE. Routing to %s (%s)", url, target_node)
            return url

        lg.warning("[Fleet] Target node '%s' IP not found in nodes list. Falling back to local.", target_node)
        return local_url

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def _boot_and_broadcast(self):
        """Detect the network context, purge stale nodes, then heartbeat."""
        try:
            # 1. Detect IP and local IPs (the slow socket part).
            self.ip_address = self._get_local_ip()
            self.local_ips = self._get_all_local_ips()
            print(f"📡 [Imperial Fleet] Network context detected. Host:{self.hostname} IP:{self.ip_address}")
            print(f"🛰️ [Imperial Fleet] Active local IPs for filtering: {self.local_ips}")
            # 2. Purge stagnant nodes (the slow network/file part).
            if self.cluster_file:
                self._purge_stagnant_nodes()
        except Exception as e:
            print(f"⚠️ [Imperial Fleet] Initial background sync failed: {e}")
        # 3. Heartbeats start regardless of whether the steps above succeeded.
        self._broadcast_presence()

    def start(self):
        """Start the orchestration daemon threads (non-blocking).

        Starts one listener thread and one broadcaster thread. The
        broadcaster performs IP detection and the stale-node purge first, so
        exactly one heartbeat loop runs and it advertises the detected IP.
        Calling ``start()`` while already running is a no-op.
        """
        if self._running:
            return
        self._running = True
        threading.Thread(target=self._listen_for_peers, daemon=True, name="ClusterListener").start()
        threading.Thread(target=self._boot_and_broadcast, daemon=True, name="ClusterBroadcaster").start()
        print(f"[BOOT] 🔱 Orchestration Engine Ignition Sequence Completed.")

    def stop(self):
        """Ask the background threads to exit after their current cycle."""
        self._running = False