"""UDP discovery and shared cluster configuration for Imperial Fleet nodes."""

import copy
import ctypes
import json
import logging
import os
import socket
import threading
import time


class JarvisClusterNode:
    """Manages Imperial Fleet UDP discovery and shared cluster configuration.

    A node announces itself with periodic UDP heartbeats, listens for the
    heartbeats of its peers, and mirrors the resulting view into a shared
    ``JARVIS_CLUSTER.json`` file (when ``workspace_dir`` is given) and/or to
    the main node's HTTP API (when ``main_url`` is given).

    Threading: ``self._lock`` is a plain, non-reentrant ``threading.Lock``.
    ``get_config`` and ``_save_config`` both acquire it, so neither may be
    called while the lock is already held by the calling thread.
    """

    # Hostname that is treated as the MAIN node by default.
    MAIN_HOSTNAME = "ga403wr"
    # Port used when building the Orpheus TTS URL.
    ORPHEUS_PORT = 18800
    # Peers not seen for this many seconds are dropped during file sync.
    NODE_TTL_SECONDS = 300
    # Registry entries older than this are purged once at startup (24 hours).
    STALE_NODE_SECONDS = 86400

    def __init__(self, workspace_dir=None, port=18791, broadcast_interval=5, main_url=None):
        """Prepare the node without performing any network I/O.

        Args:
            workspace_dir: Directory holding the shared ``JARVIS_CLUSTER.json``.
                When falsy the node runs without a local registry file.
            port: UDP port used for both broadcasting and listening.
            broadcast_interval: Seconds between heartbeats.
            main_url: Base URL of the main node's API. Falls back to the
                ``JARVIS_MAIN_URL`` environment variable.
        """
        self.port = port
        self.broadcast_interval = broadcast_interval
        self.workspace_dir = workspace_dir
        self.main_url = main_url or os.environ.get("JARVIS_MAIN_URL")

        if self.workspace_dir:
            self.cluster_file = os.path.join(self.workspace_dir, "JARVIS_CLUSTER.json")
        else:
            self.cluster_file = None  # Cloud mode: no local file dependency

        # [V4.3 hang fix] No socket/requests calls here other than the hostname
        # lookup; IP detection is deferred to start().
        self.hostname = os.environ.get("JARVIS_NODE_ID", socket.gethostname())
        self.ip_address = "Detecting..."  # Replaced by start()
        self.local_ips = ["127.0.0.1"]  # Replaced by start()

        self.active_nodes = {}  # {hostname: {ip, last_seen, role, ...}}
        self.manual_peers = []  # Explicit IPs that receive direct heartbeats
        self._running = False
        self._lock = threading.Lock()
        self._last_file_sync = 0  # Reserved for file-sync throttling (unused)
        self._last_self_confirm = 0  # Last time the self-link message was printed

        # Absolute path log for user verification
        if self.cluster_file:
            print(f"πŸ›°οΈ [Imperial Fleet] Cluster Registry: {os.path.abspath(self.cluster_file)}")
        else:
            print(f"πŸ›°οΈ [Imperial Fleet] Cloud Mode Active: Syncing with {self.main_url}")

        # Ensure default config exists (only if file-based)
        if self.cluster_file:
            self._ensure_config()

    def _get_local_ip(self):
        """Return the IPv4 address of the default outbound interface.

        Connecting a UDP socket only selects a route; no packet is sent.
        Returns ``"127.0.0.1"`` when the address cannot be determined.
        """
        try:
            # The context manager closes the socket even when connect() fails.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
                probe.connect(("8.8.8.8", 80))
                return probe.getsockname()[0]
        except Exception:
            return "127.0.0.1"

    def _get_all_local_ips(self):
        """Return every IPv4 address of this machine, used to filter self-packets."""
        ips = {"127.0.0.1"}
        try:
            ips.add(self._get_local_ip())
            # Addresses bound to the hostname (includes virtual adapters)
            _, _, ip_list = socket.gethostbyname_ex(socket.gethostname())
            ips.update(ip_list)
        except Exception:
            pass
        return list(ips)

    def _ensure_config(self):
        """Create ``JARVIS_CLUSTER.json`` with a default layout if it is missing.

        When called from ``__init__`` the recorded IP is still the
        ``"Detecting..."`` placeholder; ``_sync_active_nodes_to_config``
        replaces it once the real address is known.
        """
        if os.path.exists(self.cluster_file):
            return
        default_config = {
            "nodes": {
                self.hostname: {
                    "ip": self.ip_address,
                    "role": "MAIN",
                    "last_seen": int(time.time()),
                }
            },
            "allocations": {
                "orpheus": self.hostname,  # Default to self
                "aizen": "ANY",
                "gilgamesh": "ANY",
            },
        }
        self._save_config(default_config)

    def _load_config_file(self, config):
        """Merge the registry file into ``config`` in place, retrying briefly.

        Retries cover a file that is empty or half-written at the moment it
        is read. Does not acquire ``self._lock`` itself.
        """
        max_retries = 3
        for attempt in range(max_retries):
            is_last = attempt == max_retries - 1
            try:
                if os.path.exists(self.cluster_file):
                    if os.path.getsize(self.cluster_file) == 0 and not is_last:
                        time.sleep(0.1)
                        continue
                    with open(self.cluster_file, "r", encoding="utf-8") as f:
                        data = json.load(f)
                    if isinstance(data, dict):
                        config.update(data)
                break
            except (json.JSONDecodeError, IOError):
                if not is_last:
                    time.sleep(0.1)

    def get_config(self):
        """Return the cluster configuration merged with the live in-memory state.

        Sources, in order: the main node's API (when ``main_url`` is set and
        this node has no workspace or is not the main host), then the local
        registry file, then ``self.active_nodes``.

        Returns:
            dict with at least the keys ``nodes``, ``allocations`` and
            ``allocations_status``.
        """
        config = {"nodes": {}, "allocations": {}, "allocations_status": {}}

        # CLOUD MODE: pull from the API
        if self.main_url and (not self.workspace_dir or self.hostname != self.MAIN_HOSTNAME):
            try:
                import requests

                response = requests.get(
                    f"{self.main_url.rstrip('/')}/api/cluster/nodes", timeout=3
                )
                if response.ok:
                    remote = response.json()
                    # Only a JSON object is usable; merging it over the
                    # defaults guarantees the three top-level keys exist.
                    if isinstance(remote, dict):
                        config.update(remote)
                        return config
            except Exception:
                # API unavailable: fall back to file and memory below.
                pass

        with self._lock:
            if self.cluster_file:
                self._load_config_file(config)

            # Always merge live nodes from memory so callers see them even
            # when saving the file is failing.
            for host, data in self.active_nodes.items():
                known = config["nodes"].get(host)
                if known is None or data.get("last_seen", 0) > known.get("last_seen", 0):
                    config["nodes"][host] = data

                role = data.get("role")
                if role == "MAIN":
                    # Adopt the command state announced by a MAIN node.
                    if data.get("last_seen", 0) > 0:
                        if "allocations" in data:
                            config["allocations"] = data["allocations"]
                        # Take the global status only on the main host itself.
                        if self.hostname == self.MAIN_HOSTNAME and "allocations_status" in data:
                            for guardian, status in data.get("allocations_status", {}).items():
                                config["allocations_status"][guardian] = status
                elif role == "WORKER":
                    # A worker's status counts only for guardians allocated to it.
                    for guardian, status in data.get("allocations_status", {}).items():
                        if config.get("allocations", {}).get(guardian) == host:
                            config["allocations_status"][guardian] = status

            # Ensure self is always present
            if self.hostname not in config["nodes"]:
                config["nodes"][self.hostname] = {
                    "ip": self.ip_address,
                    "role": "MAIN" if self.hostname == self.MAIN_HOSTNAME else "WORKER",
                    "last_seen": int(time.time()),
                }

        return config

    def _save_config(self, config):
        """Write the cluster configuration to disk via a temp file and atomic swap.

        Errors are reported on stdout and swallowed; the temp file is removed
        on failure. No-op when the node has no registry file.
        """
        if not self.cluster_file:
            return

        with self._lock:
            temp_file = self.cluster_file + ".tmp"
            try:
                parent = os.path.dirname(self.cluster_file)
                if parent:
                    os.makedirs(parent, exist_ok=True)

                with open(temp_file, "w", encoding="utf-8") as f:
                    json.dump(config, f, indent=4, ensure_ascii=False)

                # os.replace swaps the file in one step, so readers never
                # observe a missing registry. Retry because the destination
                # may be briefly locked by another process (WinError 5).
                replace_retries = 10
                for attempt in range(replace_retries):
                    try:
                        os.replace(temp_file, self.cluster_file)
                        return
                    except OSError:
                        if attempt < replace_retries - 1:
                            time.sleep(0.3)
                            continue
                        raise
            except Exception as e:
                print(f"πŸ›‘ [Imperial Fleet] Error saving cluster config: {e}")
                if os.path.exists(temp_file):
                    try:
                        os.remove(temp_file)
                    except OSError:
                        pass

    def _purge_stagnant_nodes(self):
        """Drop registry entries (other than self) not seen for over 24 hours."""
        if not self.cluster_file or not os.path.exists(self.cluster_file):
            return

        try:
            config = self.get_config()
            now = int(time.time())
            to_remove = [
                host
                for host, data in config.get("nodes", {}).items()
                if host != self.hostname
                and now - data.get("last_seen", 0) > self.STALE_NODE_SECONDS
            ]
            if to_remove:
                print(f"🧹 [Imperial Fleet] Purging {len(to_remove)} stagnant nodes: {to_remove}")
                for host in to_remove:
                    del config["nodes"][host]
                self._save_config(config)
        except Exception as e:
            print(f"⚠️ [Imperial Fleet] Stagnant purge failed: {e}")

    def update_allocation(self, guardian_name, target_hostname):
        """Assign ``guardian_name`` to ``target_hostname`` and mark it pending."""
        config = self.get_config()
        config.setdefault("allocations", {})[guardian_name] = target_hostname
        # A changed allocation resets the deployment status.
        config.setdefault("allocations_status", {})[guardian_name] = "Pending (배치 전솑 쀑...)"
        self._save_config(config)

    def update_allocation_status(self, guardian_name, status_message):
        """Record the deployment status message for ``guardian_name``."""
        # 1. Update memory state (what cloud-mode workers report from)
        with self._lock:
            own = self.active_nodes.setdefault(
                self.hostname, {"ip": self.ip_address, "allocations_status": {}}
            )
            own.setdefault("allocations_status", {})[guardian_name] = status_message

        # 2. Update the file config. Must run outside the lock because
        #    get_config/_save_config acquire it themselves.
        config = self.get_config()
        config.setdefault("allocations_status", {})[guardian_name] = status_message
        self._save_config(config)

    def get_guardian_host(self, guardian_name):
        """Return the hostname allocated to ``guardian_name`` (self if unknown)."""
        allocations = self.get_config().get("allocations", {})
        return allocations.get(guardian_name, self.hostname)

    def get_node_ip(self, target_hostname):
        """Return the recorded IP of ``target_hostname`` (``127.0.0.1`` if unknown)."""
        nodes = self.get_config().get("nodes", {})
        return nodes.get(target_hostname, {}).get("ip", "127.0.0.1")

    def _get_broadcast_addresses(self):
        """Return candidate broadcast addresses across the local interfaces."""
        bcasts = {"255.255.255.255"}
        try:
            _, _, ip_list = socket.gethostbyname_ex(socket.gethostname())
            for ip in ip_list:
                parts = ip.split(".")
                if len(parts) == 4:
                    # Generic /24 subnet broadcast for this interface
                    bcasts.add(f"{parts[0]}.{parts[1]}.{parts[2]}.255")

            # Additional common defaults as fail-safe
            bcasts.update(
                ["192.168.0.255", "192.168.1.255", "172.16.0.255", "192.168.56.255"]
            )
        except Exception as e:
            print(f"[Imperial Fleet] Error deriving broadcast addresses: {e}")
        return list(bcasts)

    def update_manual_peers(self, ips):
        """Replace the explicit heartbeat targets.

        Returns:
            True when ``ips`` is a list and was applied, False otherwise.
        """
        if not isinstance(ips, list):
            return False
        self.manual_peers = list(set(ips))
        print(f"πŸ“‘ [Imperial Fleet] Manual peering updated: {self.manual_peers}")
        return True

    def _broadcast_presence(self):
        """Send heartbeats until ``stop()`` is called (blocking loop).

        Each cycle broadcasts the heartbeat, sends it directly to manual and
        known peers, syncs the registry file, and reports to the main API.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

            # Logged once to confirm broadcasting starts
            print(f"πŸ“‘ [Imperial Fleet] UDP Discovery Active on port {self.port} (Broadcast to {', '.join(self._get_broadcast_addresses())})")

            while self._running:
                try:
                    config = self.get_config()
                    role_val = config.get("nodes", {}).get(self.hostname, {}).get("role", "WORKER")

                    payload = {
                        "hostname": self.hostname,
                        "ip": self.ip_address,
                        "timestamp": int(time.time()),
                        "type": "heartbeat",
                        "role": role_val,
                        "allocations": config.get("allocations", {}),
                        "allocations_status": config.get("allocations_status", {}),
                    }
                    message = json.dumps(payload).encode("utf-8")

                    # 1. Send to all broadcast addresses (best-effort)
                    for addr in self._get_broadcast_addresses():
                        try:
                            sock.sendto(message, (addr, self.port))
                        except Exception:
                            pass

                    # 1.1 Direct heartbeats to manual peers and known peers.
                    #     Snapshot under the lock: the listener thread adds
                    #     entries to active_nodes concurrently.
                    targets = set(self.manual_peers)
                    with self._lock:
                        known_nodes = list(self.active_nodes.values())
                    for data in known_nodes:
                        if "ip" in data:
                            targets.add(data["ip"])

                    for peer_ip in targets:
                        if peer_ip == self.ip_address:  # Don't send to self
                            continue
                        try:
                            sock.sendto(message, (peer_ip, self.port))
                        except Exception:
                            pass

                    # 2. File-based discovery as a fallback when UDP fails
                    if self.workspace_dir and os.path.exists(self.workspace_dir):
                        self._sync_active_nodes_to_config()

                    # 3. CLOUD SYNC: worker reports to the main node via API
                    if self.main_url and self.hostname != self.MAIN_HOSTNAME:
                        self._sync_to_cloud()

                except Exception as e:
                    print(f"πŸ›‘ [Imperial Fleet] Error in broadcast loop: {e}", flush=True)

                time.sleep(self.broadcast_interval)
        finally:
            sock.close()

    def _listen_for_peers(self):
        """Receive heartbeats from other nodes until ``stop()`` is called.

        Raises:
            OSError: if the UDP port cannot be bound.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # Bind to '' (all interfaces) so broadcast packets are received
            sock.bind(("", self.port))
            sock.settimeout(2.0)

            while self._running:
                try:
                    data, addr = sock.recvfrom(4096)
                    sender_ip = addr[0]

                    # Ignore our own packets arriving via any local interface
                    if sender_ip in self.local_ips:
                        continue

                    # Packets come from the network: drop anything that is
                    # not a JSON object before it can influence our state.
                    try:
                        message = json.loads(data.decode("utf-8"))
                    except (UnicodeDecodeError, json.JSONDecodeError):
                        continue
                    if not isinstance(message, dict):
                        continue

                    # Reciprocal direct link: heartbeat back to whoever
                    # reached us so the link works in both directions.
                    if sender_ip not in self.manual_peers:
                        self.manual_peers.append(sender_ip)
                        print(f"πŸ“‘ [Imperial Radar] Reciprocal link established with: {addr[0]}")

                    if message.get("type") != "heartbeat":
                        continue

                    peer_host = message.get("hostname")
                    if not peer_host:
                        continue
                    peer_ip = message.get("ip")

                    # last_seen is compared numerically elsewhere, so a
                    # missing or malformed timestamp falls back to local time.
                    peer_time = message.get("timestamp")
                    if isinstance(peer_time, bool) or not isinstance(peer_time, (int, float)):
                        peer_time = int(time.time())

                    if peer_host == self.hostname:
                        # Periodic self-confirm so the user can verify the listener
                        last_confirm = getattr(self, "_last_self_confirm", 0)
                        if time.time() - last_confirm > 120:
                            print(f"πŸ›‘οΈ [Imperial Radar] Self-link OK (Listener is active)")
                            self._last_self_confirm = time.time()
                    elif peer_host not in self.active_nodes:
                        print(f"πŸ“‘ [Imperial Radar] Node discovered: {peer_host} ({peer_ip})")

                    # Other threads iterate active_nodes under this lock.
                    with self._lock:
                        self.active_nodes[peer_host] = {
                            "ip": peer_ip,
                            "last_seen": peer_time,
                            "role": message.get("role", "WORKER"),
                            "allocations": message.get("allocations", {}),
                            "allocations_status": message.get("allocations_status", {}),
                        }

                except socket.timeout:
                    continue
                except Exception as e:
                    print(f"πŸ›‘ [Imperial Fleet] UDP Listener Error: {e}")
                    time.sleep(2)  # Backoff on error
        finally:
            sock.close()

    def _sync_active_nodes_to_config(self):
        """Merge the in-memory peer table with the registry file and save it.

        Also learns about peers from the file, refreshes this node's own
        entry, and evicts nodes not seen within ``NODE_TTL_SECONDS``. The
        file is written on every call because the own ``last_seen`` always
        changes.
        """
        config = self.get_config()
        config.setdefault("nodes", {})
        config.setdefault("allocations", {})
        config.setdefault("allocations_status", {})

        now = int(time.time())

        # Add/refresh self
        if self.hostname not in config["nodes"]:
            # Role comes from the environment, else from the hostname.
            is_main_host = self.hostname in (self.MAIN_HOSTNAME, "ga403")
            default_role = "MAIN" if is_main_host else "WORKER"
            role_val = os.environ.get("JARVIS_NODE_ROLE", default_role)
            config["nodes"][self.hostname] = {"ip": self.ip_address, "role": role_val}
        own_entry = config["nodes"][self.hostname]
        own_entry["last_seen"] = now
        # Keep the own IP current; the entry may still hold the
        # "Detecting..." placeholder or an address from a previous network.
        own_entry["ip"] = self.ip_address

        with self._lock:
            # Bi-directional discovery: learn peers from the shared file
            for host, data in config["nodes"].items():
                if host == self.hostname:
                    continue
                seen = data.get("last_seen", 0)
                known = self.active_nodes.get(host)
                if known is None or seen > known.get("last_seen", 0):
                    # Only adopt entries that are fresh enough
                    if now - seen < self.NODE_TTL_SECONDS:
                        self.active_nodes[host] = data
            # Snapshot so the merge below runs without holding the lock.
            peers = list(self.active_nodes.items())

        # Add/update discovered peers
        for peer_host, peer_data in peers:
            if peer_host not in config["nodes"]:
                config["nodes"][peer_host] = {
                    "ip": peer_data.get("ip", "unknown"),
                    "role": peer_data.get("role", "WORKER"),
                }
            entry = config["nodes"][peer_host]

            peer_last_seen = peer_data.get("last_seen", 0)
            if entry.get("last_seen", 0) < peer_last_seen:
                entry["last_seen"] = peer_last_seen
                entry["ip"] = peer_data.get("ip", "unknown")
                entry["role"] = peer_data.get("role", "WORKER")

            # Sync 1: allocations (only a MAIN node dictates who runs what)
            if peer_data.get("role") == "MAIN" and peer_data.get("allocations"):
                if config["allocations"] != peer_data["allocations"]:
                    config["allocations"] = copy.deepcopy(peer_data["allocations"])

            # Sync 2: allocation status, accepted only from the peer that
            # is actually allocated to run the guardian.
            for g_name, g_status in peer_data.get("allocations_status", {}).items():
                if config.get("allocations", {}).get(g_name) == peer_host:
                    config["allocations_status"][g_name] = g_status

        # Clean up dead nodes (never self)
        dead_nodes = [
            host
            for host, data in config["nodes"].items()
            if host != self.hostname
            and now - data.get("last_seen", 0) > self.NODE_TTL_SECONDS
        ]
        if dead_nodes:
            for host in dead_nodes:
                del config["nodes"][host]
            with self._lock:
                for host in dead_nodes:
                    self.active_nodes.pop(host, None)

        self._save_config(config)

    def _sync_to_cloud(self):
        """Worker node: push the current state to the main node's API.

        Failures are silenced; this runs as part of every heartbeat.
        """
        if not self.main_url:
            return

        try:
            import requests

            # 1. Basic host info
            payload = {
                "hostname": self.hostname,
                "ip": self.ip_address,
                "role": "WORKER",
                "last_seen": int(time.time()),
                "allocations_status": {},
            }

            # 2. Statuses of the guardians assigned to this node
            config = self.get_config()
            global_status = config.get("allocations_status", {})
            local_status = {}
            with self._lock:
                memory_status = self.active_nodes.get(self.hostname, {}).get("allocations_status", {})
                for guardian, host in config.get("allocations", {}).items():
                    if host != self.hostname:
                        continue
                    # Priority: memory status > last known global status
                    status = memory_status.get(guardian)
                    if not status:
                        status = global_status.get(guardian, "Connecting...")
                    local_status[guardian] = status

            payload["allocations_status"] = local_status

            # 3. Push to the main node
            requests.post(
                f"{self.main_url.rstrip('/')}/api/cluster/register", json=payload, timeout=3
            )
        except Exception:
            # Silence errors in heartbeat
            pass

    def get_orpheus_url(self):
        """Return the base URL of the node assigned to run Orpheus TTS.

        Falls back to ``http://127.0.0.1:18800`` when Orpheus is assigned to
        this node, is unassigned, or the target's IP is unknown.
        """
        config = self.get_config()
        target_node = config.get("allocations", {}).get("orpheus")
        local_url = f"http://127.0.0.1:{self.ORPHEUS_PORT}"

        lg = logging.getLogger(__name__)
        lg.info("[Fleet] Routing discovery: Self='%s', Target='%s'", self.hostname, target_node)

        # Assigned to self or not found: local
        if not target_node or target_node == self.hostname:
            lg.info("[Fleet] Orpheus is LOCAL (or target not found). Using 127.0.0.1")
            return local_url

        # Assigned to another node: look up its IP
        node_data = config.get("nodes", {}).get(target_node)
        if node_data and "ip" in node_data:
            url = f"http://{node_data['ip']}:{self.ORPHEUS_PORT}"
            lg.info("[Fleet] Orpheus is REMOTE. Routing to %s (%s)", url, target_node)
            return url

        lg.warning(
            "[Fleet] Target node '%s' IP not found in nodes list. Falling back to local.",
            target_node,
        )
        return local_url

    def start(self):
        """Start the broadcaster and listener threads (non-blocking).

        Calling it again while running is a no-op. Slow work (IP detection,
        stale-node purge) runs on the broadcaster thread, followed by the
        heartbeat loop. A single thread is used so that only one heartbeat
        loop exists and no heartbeat carries the placeholder IP.
        """
        if self._running:
            return

        self._running = True

        def broadcaster():
            try:
                # 1. Detect IP and local IPs (the slow socket part)
                self.ip_address = self._get_local_ip()
                self.local_ips = self._get_all_local_ips()
                print(f"πŸ“‘ [Imperial Fleet] Network context detected. Host:{self.hostname} IP:{self.ip_address}")
                print(f"πŸ›°οΈ [Imperial Fleet] Active local IPs for filtering: {self.local_ips}")

                # 2. Purge stagnant nodes (the slow network/file part)
                if self.cluster_file:
                    self._purge_stagnant_nodes()
            except Exception as e:
                print(f"⚠️ [Imperial Fleet] Initial background sync failed: {e}")

            # 3. Heartbeat loop; runs even if initialisation failed.
            self._broadcast_presence()

        threading.Thread(target=broadcaster, daemon=True, name="ClusterBroadcaster").start()
        threading.Thread(target=self._listen_for_peers, daemon=True, name="ClusterListener").start()
        print(f"[BOOT] πŸ”± Orchestration Engine Ignition Sequence Completed.")

    def stop(self):
        """Ask the background loops to exit after their current cycle."""
        self._running = False