shadowbrain / shadow_brain_core /brain /cluster_node.py
taemin1980's picture
🔱 Imperial Deployment: Shadow Brain Core ignition
d50a68d verified
Raw
History Blame Contribute Delete
28.8 kB
import socket
import threading
import json
import time
import os
import ctypes
class JarvisClusterNode:
    """Manages Imperial Fleet UDP discovery and shared cluster configuration.

    Cluster state is assembled from up to three sources on every read:

    * ``JARVIS_CLUSTER.json`` inside ``workspace_dir`` (file mode), which other
      nodes may also read and write (file-based discovery when UDP fails);
    * ``self.active_nodes``, the in-memory view fed by UDP heartbeats;
    * the MAIN node's HTTP API at ``main_url`` (cloud mode).

    ``__init__`` performs no network calls; :meth:`start` detects local
    addresses and spawns the background broadcaster/listener threads.
    """

    # Placeholder IP until start() has detected the real outbound address.
    _IP_PENDING = "Detecting..."
    # Hostnames that default to the MAIN role when JARVIS_NODE_ROLE is unset.
    _MAIN_HOSTNAMES = ("ga403wr", "ga403")
    # Broadcast addresses always tried, even when interface discovery fails.
    _FALLBACK_BROADCASTS = ("192.168.0.255", "192.168.1.255", "172.16.0.255", "192.168.56.255")
    # Seconds without a heartbeat after which a peer leaves the live view
    # (generous to tolerate clock skew between PCs).
    _LIVE_WINDOW = 300

    def __init__(self, workspace_dir=None, port=18791, broadcast_interval=5, main_url=None):
        """Create a node; no sockets are opened until :meth:`start`.

        Args:
            workspace_dir: directory holding ``JARVIS_CLUSTER.json``; ``None``
                selects cloud mode (no local file).
            port: UDP port used for both heartbeats and listening.
            broadcast_interval: seconds between heartbeat cycles.
            main_url: MAIN node API base URL; defaults to ``$JARVIS_MAIN_URL``.
        """
        self.port = port
        self.broadcast_interval = broadcast_interval
        self.workspace_dir = workspace_dir
        self.main_url = main_url or os.environ.get("JARVIS_MAIN_URL")
        if self.workspace_dir:
            self.cluster_file = os.path.join(self.workspace_dir, "JARVIS_CLUSTER.json")
        else:
            self.cluster_file = None  # Cloud Mode: No local file dependency

        # [🔱 V4.3 Ultimate Hang Fix] DO NOT call ANY network functions (socket/requests) in __init__.
        # Hostname remains safe as it is usually a cached system property.
        self.hostname = os.environ.get("JARVIS_NODE_ID", socket.gethostname())
        self.ip_address = self._IP_PENDING  # Deferred to start()
        self.local_ips = ["127.0.0.1"]  # Deferred to start()
        self.active_nodes = {}  # {hostname: {ip, last_seen, role, ...}}
        self.manual_peers = []  # List of explicit IPs to heartbeat towards
        self._running = False
        self._lock = threading.Lock()  # Guards cluster-file I/O and own status updates
        self._last_file_sync = 0  # Intended to track file sync frequency (not read in this class)

        # Absolute path log for user verification
        if self.cluster_file:
            print(f"🛰️ [Imperial Fleet] Cluster Registry: {os.path.abspath(self.cluster_file)}")
        else:
            print(f"🛰️ [Imperial Fleet] Cloud Mode Active: Syncing with {self.main_url}")

        # Ensure default config exists (only if file-based)
        if self.cluster_file:
            self._ensure_config()
        # [🔱 V4.3 Hang Fix] _purge_stagnant_nodes runs in the start() background thread.

    # ------------------------------------------------------------------
    # Local network helpers
    # ------------------------------------------------------------------
    def _get_local_ip(self):
        """Return the local IPv4 address used for outbound traffic, or ``127.0.0.1``.

        Connecting a UDP socket only selects a route; no datagram is sent here.
        The socket is always closed, even when the connect fails.
        """
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.connect(("8.8.8.8", 80))
                return s.getsockname()[0]
        except OSError:
            return "127.0.0.1"

    def _get_all_local_ips(self):
        """Get all IPv4 addresses assigned to this machine to filter out self-packets."""
        ips = {"127.0.0.1"}
        try:
            ips.add(self._get_local_ip())  # primary outbound address
            # All addresses registered for the hostname (includes virtual adapters)
            _, _, host_ips = socket.gethostbyname_ex(socket.gethostname())
            ips.update(host_ips)
        except (OSError, UnicodeError):
            pass  # best effort: keep whatever was collected so far
        return list(ips)

    def _get_broadcast_addresses(self):
        """Return candidate broadcast addresses for all local interfaces.

        Always includes the limited broadcast address and the fallback
        subnets, plus a /24 broadcast guessed for each resolved local IP.
        """
        bcasts = {"255.255.255.255", *self._FALLBACK_BROADCASTS}
        try:
            _, _, ip_list = socket.gethostbyname_ex(socket.gethostname())
        except (OSError, UnicodeError) as e:
            print(f"[Imperial Fleet] Error deriving broadcast addresses: {e}")
        else:
            for ip in ip_list:
                parts = ip.split('.')
                if len(parts) == 4:
                    # Generic /24 subnet broadcast for this interface
                    bcasts.add(f"{parts[0]}.{parts[1]}.{parts[2]}.255")
        return list(bcasts)

    def _default_role(self):
        """Role for this node: ``$JARVIS_NODE_ROLE`` or MAIN for the main-PC hostnames, else WORKER."""
        default_role = "MAIN" if self.hostname in self._MAIN_HOSTNAMES else "WORKER"
        return os.environ.get("JARVIS_NODE_ROLE", default_role)

    # ------------------------------------------------------------------
    # Cluster file / config handling
    # ------------------------------------------------------------------
    def _ensure_config(self):
        """Initialize the JARVIS_CLUSTER.json if it doesn't exist."""
        if os.path.exists(self.cluster_file):
            return
        default_config = {
            "nodes": {
                self.hostname: {
                    "ip": self.ip_address,
                    "role": "MAIN",
                    "last_seen": int(time.time()),
                }
            },
            "allocations": {
                "orpheus": self.hostname,  # Default to self
                "aizen": "ANY",
                "gilgamesh": "ANY",
            },
        }
        self._save_config(default_config)

    def _read_cluster_file(self, max_retries=3, retry_delay=0.1):
        """Read and parse the cluster file, retrying briefly on empty or bad content.

        Intended to be called with ``self._lock`` held.

        Returns:
            The parsed dict, or ``None`` if the file is missing, unreadable,
            not valid JSON after all retries, or not a JSON object.
        """
        for attempt in range(max_retries):
            last_attempt = attempt == max_retries - 1
            try:
                if not os.path.exists(self.cluster_file):
                    return None
                # An empty file usually means another writer is mid-update.
                if os.path.getsize(self.cluster_file) == 0 and not last_attempt:
                    time.sleep(retry_delay)
                    continue
                with open(self.cluster_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                return data if isinstance(data, dict) else None
            except (ValueError, OSError):  # JSONDecodeError/UnicodeDecodeError are ValueErrors
                if not last_attempt:
                    time.sleep(retry_delay)
        return None

    def get_config(self):
        """Read the shared cluster configuration and merge in live memory state.

        Cloud path: when ``main_url`` is set and either there is no workspace
        or this host is not ``ga403wr``, ``GET {main_url}/api/cluster/nodes``
        is tried first and its JSON is returned unchanged on success.

        Otherwise the local cluster file (if any) is read and the
        UDP-discovered ``active_nodes`` are merged on top, so the UI sees
        peers immediately even if file saving is failing.

        Returns:
            dict with ``nodes``, ``allocations`` and ``allocations_status``
            (the cloud path returns whatever the API sent). Node entries and
            allocations taken from memory are copies, so callers may mutate
            the result without touching ``active_nodes``.
        """
        config = {"nodes": {}, "allocations": {}, "allocations_status": {}}

        # CLOUD MODE: If main_url is set and no workspace_dir (or worker role), pull from API
        if self.main_url and (not self.workspace_dir or self.hostname != "ga403wr"):
            try:
                import requests
                response = requests.get(f"{self.main_url.rstrip('/')}/api/cluster/nodes", timeout=3)
                if response.ok:
                    return response.json()
            except Exception:
                pass  # API unavailable: fall back to the file / memory view below

        if self.cluster_file:
            with self._lock:
                file_data = self._read_cluster_file()
            if file_data:
                config.update(file_data)
        # A truncated or hand-edited file may contain null / non-dict sections.
        for key in ("nodes", "allocations", "allocations_status"):
            if not isinstance(config.get(key), dict):
                config[key] = {}

        # Snapshot: the listener thread inserts into active_nodes concurrently.
        for host, data in list(self.active_nodes.items()):
            peer_time = data.get("last_seen") or 0
            known = config["nodes"].get(host)
            if not isinstance(known, dict) or peer_time > (known.get("last_seen") or 0):
                config["nodes"][host] = dict(data)

            role = data.get("role")
            if role == "MAIN":
                # ADOPTION LOGIC: a timestamped MAIN heartbeat dictates allocations
                # (handles a file system lagging behind the network).
                if peer_time > 0:
                    if isinstance(data.get("allocations"), dict):
                        config["allocations"] = dict(data["allocations"])
                    # Only the main PC adopts the MAIN node's global status map.
                    if self.hostname == "ga403wr" and "allocations_status" in data:
                        config["allocations_status"].update(data.get("allocations_status") or {})
            elif role == "WORKER":
                # Take a worker's status only for guardians actually allocated to it.
                for guardian, status in (data.get("allocations_status") or {}).items():
                    if config["allocations"].get(guardian) == host:
                        config["allocations_status"][guardian] = status

        # Ensure self is always present
        if self.hostname not in config["nodes"]:
            config["nodes"][self.hostname] = {
                "ip": self.ip_address,
                "role": self._default_role(),
                "last_seen": int(time.time()),
            }
        return config

    def _save_config(self, config):
        """Atomically write ``config`` to the cluster file (no-op in cloud mode).

        The JSON is written to ``<file>.tmp`` and moved into place with
        ``os.replace``. Readers therefore see either the old or the new file,
        never a missing or partial one. The replace is retried because
        Windows can transiently deny access (WinError 5). Errors are printed,
        not raised.
        """
        if not self.cluster_file:
            return
        with self._lock:
            temp_file = self.cluster_file + ".tmp"
            try:
                directory = os.path.dirname(self.cluster_file)
                if directory:
                    os.makedirs(directory, exist_ok=True)
                with open(temp_file, 'w', encoding='utf-8') as f:
                    json.dump(config, f, indent=4, ensure_ascii=False)
                replace_retries = 10
                for attempt in range(replace_retries):
                    try:
                        os.replace(temp_file, self.cluster_file)
                        return  # Success
                    except OSError:  # includes PermissionError
                        if attempt == replace_retries - 1:
                            raise  # Re-raise on final failure
                        time.sleep(0.3)
            except Exception as e:
                print(f"🛑 [Imperial Fleet] Error saving cluster config: {e}")
                if os.path.exists(temp_file):
                    try:
                        os.remove(temp_file)
                    except OSError:
                        pass

    def _purge_stagnant_nodes(self):
        """[🔱 Imperial] Purge nodes that haven't been seen in > 24h to prevent ghost mappings."""
        if not self.cluster_file or not os.path.exists(self.cluster_file):
            return
        try:
            config = self.get_config()
            now = int(time.time())
            stale_threshold = 86400  # 24 hours
            nodes = config.get("nodes", {})
            to_remove = [
                host for host, data in nodes.items()
                if host != self.hostname and now - (data.get("last_seen") or 0) > stale_threshold
            ]
            if to_remove:
                print(f"🧹 [Imperial Fleet] Purging {len(to_remove)} stagnant nodes: {to_remove}")
                for host in to_remove:
                    del nodes[host]
                self._save_config(config)
        except Exception as e:
            print(f"⚠️ [Imperial Fleet] Stagnant purge failed: {e}")

    # ------------------------------------------------------------------
    # Public allocation API
    # ------------------------------------------------------------------
    def update_allocation(self, guardian_name, target_hostname):
        """Assign ``guardian_name`` to ``target_hostname`` and mark its status as pending."""
        config = self.get_config()
        config.setdefault("allocations", {})[guardian_name] = target_hostname
        # When allocation changes, the previous status no longer applies.
        config.setdefault("allocations_status", {})[guardian_name] = "Pending (배치 전송 중...)"
        self._save_config(config)

    def update_allocation_status(self, guardian_name, status_message):
        """Record a deployment status message for ``guardian_name``.

        The status goes into this node's in-memory entry (read by
        ``_sync_to_cloud``). It is also saved to the cluster file when one
        exists.
        """
        # 1. Update memory state (Critical for Cloud Mode/Workers)
        with self._lock:
            own = self.active_nodes.setdefault(self.hostname, {"ip": self.ip_address, "allocations_status": {}})
            own.setdefault("allocations_status", {})[guardian_name] = status_message
        # 2. Update local file config if applicable (Main node or shared mode)
        config = self.get_config()
        config.setdefault("allocations_status", {})[guardian_name] = status_message
        self._save_config(config)

    def get_guardian_host(self, guardian_name):
        """Get the allocated hostname for a guardian, defaulting to this node."""
        return self.get_config().get("allocations", {}).get(guardian_name, self.hostname)

    def get_node_ip(self, target_hostname):
        """Get the IP address of a node, or ``127.0.0.1`` if it is unknown."""
        node = self.get_config().get("nodes", {}).get(target_hostname, {})
        return node.get("ip", "127.0.0.1")

    def update_manual_peers(self, ips):
        """Replace the explicit heartbeat targets with the unique entries of ``ips``.

        Accepts a list, tuple or set (duplicates dropped, first-seen order
        kept). Returns ``True`` on success and ``False`` for any other type,
        including a bare string.
        """
        if isinstance(ips, (list, tuple, set, frozenset)):
            self.manual_peers = list(dict.fromkeys(ips))
            print(f"📡 [Imperial Fleet] Manual peering updated: {self.manual_peers}")
            return True
        return False

    # ------------------------------------------------------------------
    # Heartbeat sender
    # ------------------------------------------------------------------
    def _send_datagram(self, sock, message, host):
        """Best-effort ``sendto`` of ``message`` to ``host`` on the fleet port."""
        try:
            sock.sendto(message, (host, self.port))
        except (OSError, TypeError, ValueError):
            pass  # unreachable/invalid targets must not stop the heartbeat

    def _send_heartbeat(self, sock):
        """Build one heartbeat datagram and send it via broadcast and direct unicast."""
        config = self.get_config()
        role_val = config.get("nodes", {}).get(self.hostname, {}).get("role", "WORKER")
        payload = {
            "hostname": self.hostname,
            "ip": self.ip_address,
            "timestamp": int(time.time()),
            "type": "heartbeat",
            "role": role_val,
            "allocations": config.get("allocations", {}),
            "allocations_status": config.get("allocations_status", {}),
        }
        message = json.dumps(payload).encode('utf-8')

        # 1. Send to all broadcast addresses
        for addr in self._get_broadcast_addresses():
            self._send_datagram(sock, message, addr)

        # 2. Direct heartbeats to manual peers & known active peers (failsafe).
        #    Snapshot active_nodes: the listener thread mutates it concurrently.
        targets = set(self.manual_peers)
        targets.update(data["ip"] for data in list(self.active_nodes.values()) if data.get("ip"))
        targets.difference_update(self.local_ips)  # never unicast to ourselves
        targets.discard(self.ip_address)
        for peer_ip in targets:
            self._send_datagram(sock, message, peer_ip)

    def _broadcast_presence(self):
        """Run the heartbeat loop every ``broadcast_interval`` seconds until stopped.

        Each cycle does three things:

        1. Sends a heartbeat.
        2. Refreshes the shared cluster file when the workspace exists.
        3. On non-``ga403wr`` hosts with ``main_url`` set, pushes status to the
           MAIN node's API.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            # Log to terminal once to confirm broadcasting starts
            print(f"📡 [Imperial Fleet] UDP Discovery Active on port {self.port} (Broadcast to {', '.join(self._get_broadcast_addresses())})")
            while self._running:
                try:
                    self._send_heartbeat(sock)
                    # File-based discovery contribution (failsafe if UDP is blocked)
                    if self.workspace_dir and os.path.exists(self.workspace_dir):
                        self._sync_active_nodes_to_config()
                    # CLOUD SYNC: Worker reports to Main PC via API
                    if self.main_url and self.hostname != "ga403wr":
                        self._sync_to_cloud()
                except Exception as e:
                    print(f"🛑 [Imperial Fleet] Error in broadcast loop: {e}", flush=True)
                time.sleep(self.broadcast_interval)

    # ------------------------------------------------------------------
    # Heartbeat listener
    # ------------------------------------------------------------------
    def _handle_datagram(self, data, sender_ip):
        """Record one received heartbeat in ``active_nodes``.

        Datagrams from local addresses, or ones that are not a JSON
        ``heartbeat`` object, are ignored. Foreign traffic on the port
        therefore never becomes a peer.
        """
        # Filter out all local IPs to avoid terminal spam from multi-interface packets
        if sender_ip in self.local_ips:
            return
        try:
            message = json.loads(data.decode('utf-8'))
        except ValueError:  # UnicodeDecodeError and JSONDecodeError
            return
        if not isinstance(message, dict) or message.get("type") != "heartbeat":
            return

        # Reciprocal Direct Link: unicast back to every fleet sender so the link
        # works in both directions even where broadcast is filtered.
        if sender_ip not in self.manual_peers:
            self.manual_peers.append(sender_ip)
            print(f"📡 [Imperial Radar] Reciprocal link established with: {sender_ip}")

        peer_host = message.get("hostname")
        if not peer_host:
            return
        peer_ip = message.get("ip")
        if peer_host not in self.active_nodes and peer_host != self.hostname:
            print(f"📡 [Imperial Radar] Node discovered: {peer_host} ({peer_ip})")
        elif peer_host == self.hostname:
            # Periodic self-confirm to help user verify the listener is alive
            if time.time() - getattr(self, "_last_self_confirm", 0) > 120:
                print(f"🛡️ [Imperial Radar] Self-link OK (Listener is active)")
                self._last_self_confirm = time.time()

        # Memory only; the broadcaster persists it to the file on its own schedule.
        self.active_nodes[peer_host] = {
            "ip": peer_ip,
            "last_seen": message.get("timestamp"),
            "role": message.get("role", "WORKER"),
            "allocations": message.get("allocations", {}),
            "allocations_status": message.get("allocations_status", {}),
        }

    def _listen_for_peers(self):
        """Listen on ``self.port`` for fleet heartbeats until stopped.

        The socket is always closed on exit. A bind failure is reported and
        ends the listener. Unexpected errors back off for 2 seconds.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # Bind to '' (all interfaces) to correctly intercept broadcast packets on Windows
            try:
                sock.bind(('', self.port))
            except OSError as e:
                print(f"🛑 [Imperial Fleet] UDP Listener Error: {e}")
                return
            sock.settimeout(2.0)  # lets the loop notice stop() within ~2s
            while self._running:
                try:
                    data, addr = sock.recvfrom(4096)
                except socket.timeout:
                    continue
                except OSError as e:
                    print(f"🛑 [Imperial Fleet] UDP Listener Error: {e}")
                    time.sleep(2)  # Backoff on error
                    continue
                try:
                    self._handle_datagram(data, addr[0])
                except Exception as e:
                    print(f"🛑 [Imperial Fleet] UDP Listener Error: {e}")
                    time.sleep(2)  # Backoff on error
        finally:
            sock.close()

    # ------------------------------------------------------------------
    # State synchronisation
    # ------------------------------------------------------------------
    def _sync_active_nodes_to_config(self):
        """Merge live node state into the cluster file and learn peers from it.

        Steps:

        * Refreshes this node's entry (IP once detected, ``last_seen``).
        * Adopts fresh peers found only in the file.
        * Copies peer presence, MAIN allocations and each worker's own
          guardian status into the config.
        * Drops peers unseen for more than ``_LIVE_WINDOW`` seconds.

        Always saves, because the own heartbeat timestamp changes on every
        call.
        """
        import copy

        config = self.get_config()
        nodes = config.setdefault("nodes", {})
        config.setdefault("allocations", {})
        config.setdefault("allocations_status", {})
        now = int(time.time())

        # Self entry: create if missing, keep the IP current once it is known.
        own = nodes.get(self.hostname)
        if own is None:
            own = {"ip": self.ip_address, "role": self._default_role()}
            nodes[self.hostname] = own
        elif self.ip_address and self.ip_address != self._IP_PENDING:
            own["ip"] = self.ip_address
        own["last_seen"] = now

        # Bi-directional discovery: learn about fresh peers from the shared file (failsafe for UDP)
        for host, data in list(nodes.items()):
            if host == self.hostname:
                continue
            file_seen = data.get("last_seen") or 0
            known = self.active_nodes.get(host)
            is_newer = known is None or file_seen > (known.get("last_seen") or 0)
            if is_newer and now - file_seen < self._LIVE_WINDOW:
                self.active_nodes[host] = dict(data)

        # Add/update discovered peers (snapshot: the listener mutates active_nodes)
        for peer_host, peer_data in list(self.active_nodes.items()):
            node = nodes.get(peer_host)
            if node is None:
                node = {"ip": peer_data.get("ip", "unknown"), "role": peer_data.get("role", "WORKER")}
                nodes[peer_host] = node
            peer_last_seen = peer_data.get("last_seen") or 0
            if (node.get("last_seen") or 0) < peer_last_seen:
                node["last_seen"] = peer_last_seen
                node["ip"] = peer_data.get("ip", "unknown")
                node["role"] = peer_data.get("role", "WORKER")

            # Sync 1: Allocations (Only MAIN node dictates who runs what)
            if peer_data.get("role") == "MAIN" and peer_data.get("allocations"):
                if config["allocations"] != peer_data["allocations"]:
                    config["allocations"] = copy.deepcopy(peer_data["allocations"])

            # Sync 2: Allocations Status - only from the peer allocated to run that guardian
            for g_name, g_status in (peer_data.get("allocations_status") or {}).items():
                if config["allocations"].get(g_name) == peer_host:
                    config["allocations_status"][g_name] = g_status

        # Clean up dead peers (never self)
        dead_nodes = [
            host for host, data in nodes.items()
            if host != self.hostname and now - (data.get("last_seen") or 0) > self._LIVE_WINDOW
        ]
        for host in dead_nodes:
            del nodes[host]
            self.active_nodes.pop(host, None)

        self._save_config(config)

    def _sync_to_cloud(self):
        """Worker Node: push this node's guardian statuses to the MAIN node's API.

        For each guardian allocated to this host, the in-memory status is
        preferred. Otherwise the last known config status is used, or
        ``"Connecting..."``. Best-effort: every failure is silently ignored so
        the heartbeat loop keeps running.
        """
        if not self.main_url:
            return
        try:
            import requests
            last_seen = int(time.time())
            config = self.get_config()
            known_status = config.get("allocations_status", {})
            local_status = {}
            with self._lock:
                memory_status = self.active_nodes.get(self.hostname, {}).get("allocations_status", {})
                for guardian, host in config.get("allocations", {}).items():
                    if host == self.hostname:
                        local_status[guardian] = memory_status.get(guardian) or known_status.get(guardian, "Connecting...")
            payload = {
                "hostname": self.hostname,
                "ip": self.ip_address,
                "role": "WORKER",
                "last_seen": last_seen,
                "allocations_status": local_status,
            }
            requests.post(f"{self.main_url.rstrip('/')}/api/cluster/register", json=payload, timeout=3)
        except Exception:
            pass  # Silence errors in heartbeat

    def get_orpheus_url(self):
        """Returns the URL of the node currently assigned to run Orpheus TTS.

        Falls back to the local endpoint when Orpheus is allocated to this
        node, is unallocated, or its node has no known IP.
        """
        import logging
        lg = logging.getLogger(__name__)

        config = self.get_config()
        target_node = config.get("allocations", {}).get("orpheus")
        # [🔱 V4.3 Routing Log]
        lg.info(f"[Fleet] Routing discovery: Self='{self.hostname}', Target='{target_node}'")

        if not target_node or target_node == self.hostname:
            lg.info(f"[Fleet] Orpheus is LOCAL (or target not found). Using 127.0.0.1")
            return "http://127.0.0.1:18800"

        node_data = config.get("nodes", {}).get(target_node)
        if node_data and "ip" in node_data:
            url = f"http://{node_data['ip']}:18800"
            lg.info(f"[Fleet] Orpheus is REMOTE. Routing to {url} ({target_node})")
            return url

        lg.warning(f"[Fleet] Target node '{target_node}' IP not found in nodes list. Falling back to local.")
        return "http://127.0.0.1:18800"

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def start(self):
        """Starts the Imperial Fleet orchestration daemon (Non-blocking).

        The listener starts immediately. A background initializer first
        detects the local IPs and purges stagnant nodes, then starts the
        single heartbeat thread. Heartbeats therefore never advertise the
        placeholder IP, and only one broadcast loop ever runs.
        """
        if self._running:
            return
        self._running = True

        # [🔱 V4.3 Ultimate Hang Fix] Completely asynchronous boot.
        def initial_run():
            try:
                # 1. Detect IP and Local IPs (The slow socket part)
                self.ip_address = self._get_local_ip()
                self.local_ips = self._get_all_local_ips()
                print(f"📡 [Imperial Fleet] Network context detected. Host:{self.hostname} IP:{self.ip_address}")
                print(f"🛰️ [Imperial Fleet] Active local IPs for filtering: {self.local_ips}")
                # 2. Perform stagnant nodes purge (The slow network/file part)
                if self.cluster_file:
                    self._purge_stagnant_nodes()
            except Exception as e:
                print(f"⚠️ [Imperial Fleet] Initial background sync failed: {e}")
            finally:
                # 3. Heartbeat loop (first heartbeat goes out immediately), started exactly once.
                if self._running:
                    threading.Thread(target=self._broadcast_presence, daemon=True, name="ClusterBroadcaster").start()

        threading.Thread(target=initial_run, daemon=True, name="ClusterInit").start()
        threading.Thread(target=self._listen_for_peers, daemon=True, name="ClusterListener").start()
        print(f"[BOOT] 🔱 Orchestration Engine Ignition Sequence Completed.")

    def stop(self):
        """Signal the background loops to exit; each stops at its next loop check."""
        self._running = False