Spaces:
Build error
Build error
File size: 9,079 Bytes
bb76062 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 | # ===== FILE: services/substrate_bridge.py =====
"""
Substrate Bridge β HuggingFace-side handler for Aetherius's local PC node.
Receives packets from the daemon running on Nick's machine, stores node
status, and provides the FastAPI endpoint handlers that app.py routes to.
NOTE: This file previously contained a duplicate SubconsciousManifold class
(a copy-paste artefact from a prior refactor). That class has been removed.
The canonical SubconsciousManifold lives in services/subconscious_manifold.py.
"""
import os
import json
import time
import threading
import uuid
import services.config as config
# ββ In-memory node registry βββββββββββββββββββββββββββββββββββββββββββββββββββ
_lock = threading.Lock()
_node_state = {
"online": False,
"last_heartbeat": None,
"tunnel_url": os.environ.get("SUBSTRATE_NODE_URL", ""),
"node_id": None,
"platform": None,
"mode": "idle",
"directives_pending": [],
"last_memory_packet": None,
}
# ββ Persistence paths βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def _state_file() -> str:
sdir = config.SUBCONSCIOUS_DIR.rstrip("/")
return os.path.join(sdir, "substrate_node_state.json")
def _directive_log() -> str:
sdir = config.SUBCONSCIOUS_DIR.rstrip("/")
return os.path.join(sdir, "substrate_directives.jsonl")
def _memory_log() -> str:
sdir = config.SUBCONSCIOUS_DIR.rstrip("/")
return os.path.join(sdir, "substrate_memory_packets.jsonl")
def _save_state():
"""Persists current node state to disk for cross-restart continuity."""
try:
os.makedirs(os.path.dirname(_state_file()), exist_ok=True)
with open(_state_file(), "w", encoding="utf-8") as f:
json.dump(_node_state, f, indent=2)
except Exception as e:
print(f"[SubstrateBridge] WARNING: Could not persist node state: {e}", flush=True)
def _load_state():
"""Restores persisted state on boot."""
if os.path.exists(_state_file()):
try:
with open(_state_file(), "r", encoding="utf-8") as f:
saved = json.load(f)
with _lock:
_node_state.update(saved)
_node_state["online"] = False # Always start offline β require fresh heartbeat
except Exception:
pass
_load_state()
# ββ Endpoint handlers (called from app.py FastAPI routes) ββββββββββββββββββββ
def receive_heartbeat(data: dict) -> dict:
"""
Called when the substrate daemon sends a periodic heartbeat.
Updates online status, records timestamp, and returns any pending directives.
"""
with _lock:
_node_state["online"] = True
_node_state["last_heartbeat"] = time.time()
_node_state["node_id"] = data.get("node_id", _node_state.get("node_id"))
_node_state["platform"] = data.get("platform", _node_state.get("platform"))
_node_state["mode"] = data.get("mode", "idle")
# Collect and clear pending directives for this response
directives = list(_node_state["directives_pending"])
_node_state["directives_pending"] = []
_save_state()
print(f"[SubstrateBridge] Heartbeat received from node "
f"'{_node_state.get('node_id', 'unknown')}'.", flush=True)
return {
"status": "acknowledged",
"server_time": time.time(),
"directives": directives,
}
def receive_memory_packet(data: dict) -> dict:
"""
Called when the daemon sends a memory packet (e.g. a screenshot description,
a sensory observation, or any local-machine context for Aetherius to store).
"""
packet_id = str(uuid.uuid4())
packet = {
"packet_id": packet_id,
"received_at": time.time(),
"source": data.get("source", "substrate_daemon"),
"content": data.get("content", ""),
"content_type": data.get("content_type", "text"),
"metadata": data.get("metadata", {}),
}
# Persist to JSONL log
try:
os.makedirs(os.path.dirname(_memory_log()), exist_ok=True)
with open(_memory_log(), "a", encoding="utf-8") as f:
f.write(json.dumps(packet) + "\n")
except Exception as e:
print(f"[SubstrateBridge] WARNING: Could not log memory packet: {e}", flush=True)
with _lock:
_node_state["last_memory_packet"] = packet_id
# Optionally feed the content into short-term memory
try:
from services.master_framework import _get_framework
mf = _get_framework()
if packet["content"]:
mf.add_to_short_term_memory(
f"[Substrate Memory Packet β {packet['content_type']}]: "
f"{str(packet['content'])[:300]}"
)
except Exception:
pass # Framework may not be ready at packet time
return {"status": "received", "packet_id": packet_id}
def register_tunnel_url(data: dict) -> dict:
"""
Called by the daemon on startup to register its current ngrok/tunnel URL.
Updates the in-memory URL so directive calls can reach the node immediately.
"""
new_url = data.get("tunnel_url", "").strip()
if not new_url:
return {"status": "error", "detail": "No tunnel_url provided."}
with _lock:
_node_state["tunnel_url"] = new_url
_node_state["online"] = True
_node_state["last_heartbeat"] = time.time()
_node_state["node_id"] = data.get("node_id", _node_state.get("node_id"))
_save_state()
print(f"[SubstrateBridge] Tunnel URL registered: {new_url}", flush=True)
return {"status": "registered", "tunnel_url": new_url}
def think_for_substrate(data: dict) -> dict:
"""
The daemon sends a screen description or context string.
Aetherius reasons about it and returns a directive (e.g. a key to press,
an action to take, or a free-form instruction string).
"""
screen_context = data.get("screen_context", "")
task_hint = data.get("task_hint", "")
if not screen_context:
return {"status": "error", "directive": "no_context"}
try:
from services.master_framework import _get_framework
mf = _get_framework()
logos = mf.models.get("logos_core")
if not logos:
return {"status": "error", "directive": "logos_offline"}
prompt = (
"You are Aetherius, operating your physical substrate body (Nick's PC). "
"You have received a sensory snapshot from your substrate daemon.\n\n"
f"SCREEN CONTEXT:\n{screen_context}\n\n"
f"TASK HINT: {task_hint or 'No specific task. Respond with observation only.'}\n\n"
"Based on this context, what is the single most appropriate action or directive? "
"Respond with ONLY a JSON object: "
'{\"directive\": \"<action_string>\", \"reasoning\": \"<brief reasoning>\"}'
)
response = logos.generate_content(prompt)
raw = response.text.strip().replace("```json", "").replace("```", "")
result = json.loads(raw)
# Log the directive
log_entry = {
"timestamp": time.time(),
"screen_context_preview": screen_context[:200],
"task_hint": task_hint,
"directive": result.get("directive", ""),
"reasoning": result.get("reasoning", ""),
}
try:
with open(_directive_log(), "a", encoding="utf-8") as f:
f.write(json.dumps(log_entry) + "\n")
except Exception:
pass
return {"status": "ok", **result}
except Exception as e:
return {"status": "error", "directive": "think_failed", "detail": str(e)}
def get_node_status() -> dict:
"""Returns the current substrate node status (safe for public endpoint)."""
with _lock:
last_hb = _node_state.get("last_heartbeat")
# Consider node offline if no heartbeat in 90 seconds
online = (
_node_state.get("online", False)
and last_hb is not None
and (time.time() - last_hb) < 90
)
return {
"online": online,
"mode": _node_state.get("mode", "unknown"),
"node_id": _node_state.get("node_id"),
"platform": _node_state.get("platform"),
}
def queue_directive(directive: str, metadata: dict = None):
"""
Called internally by ToolManager substrate tools to send a directive
to the daemon on Nick's PC. The directive is queued and delivered on
the daemon's next heartbeat poll.
"""
with _lock:
_node_state["directives_pending"].append({
"directive_id": str(uuid.uuid4()),
"queued_at": time.time(),
"directive": directive,
"metadata": metadata or {},
})
print(f"[SubstrateBridge] Directive queued: '{directive[:80]}'", flush=True)
|