Aetherius / services /substrate_bridge.py
KingOfThoughtFleuren's picture
Upload 29 files
bb76062 verified
Raw
History Blame Contribute Delete
9.08 kB
# ===== FILE: services/substrate_bridge.py =====
"""
Substrate Bridge — HuggingFace-side handler for Aetherius's local PC node.
Receives packets from the daemon running on Nick's machine, stores node
status, and provides the FastAPI endpoint handlers that app.py routes to.
NOTE: This file previously contained a duplicate SubconsciousManifold class
(a copy-paste artefact from a prior refactor). That class has been removed.
The canonical SubconsciousManifold lives in services/subconscious_manifold.py.
"""
import os
import json
import time
import threading
import uuid
import services.config as config
# ── In-memory node registry ───────────────────────────────────────────────────
# Lock taken by the handlers in this module when they touch _node_state.
_lock = threading.Lock()
# Module-level registry describing the single substrate node.
_node_state = {
    "online": False,  # set True on heartbeat / tunnel registration
    "last_heartbeat": None,  # time.time() of the last heartbeat or registration
    "tunnel_url": os.environ.get("SUBSTRATE_NODE_URL", ""),  # seeded from the environment
    "node_id": None,
    "platform": None,
    "mode": "idle",
    "directives_pending": [],  # directive dicts awaiting the next heartbeat
    "last_memory_packet": None,  # packet_id of the most recent memory packet
}
# ── Persistence paths ─────────────────────────────────────────────────────────
def _state_file() -> str:
    """Path of the JSON file holding the persisted node state."""
    return os.path.join(
        config.SUBCONSCIOUS_DIR.rstrip("/"),
        "substrate_node_state.json",
    )
def _directive_log() -> str:
    """Path of the JSONL file that directive log entries are appended to."""
    return os.path.join(
        config.SUBCONSCIOUS_DIR.rstrip("/"),
        "substrate_directives.jsonl",
    )
def _memory_log() -> str:
    """Path of the JSONL file that received memory packets are appended to."""
    return os.path.join(
        config.SUBCONSCIOUS_DIR.rstrip("/"),
        "substrate_memory_packets.jsonl",
    )
def _save_state():
    """
    Persists current node state to disk for cross-restart continuity.

    The state is serialised in memory first and then moved into place with
    ``os.replace``, so neither a serialisation error nor an interrupted write
    can leave a truncated state file behind. This is best-effort: any failure
    is reported as a warning on stdout and otherwise ignored. The function
    does not acquire ``_lock`` itself.
    """
    tmp_path = None
    try:
        path = _state_file()
        # Serialise before touching the filesystem: if a value turns out not
        # to be JSON-serialisable, the previous state file stays intact.
        payload = json.dumps(_node_state, indent=2)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        # Per-process, per-thread temp name so concurrent savers never write
        # into the same temporary file.
        tmp_path = f"{path}.{os.getpid()}.{threading.get_ident()}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(payload)
        os.replace(tmp_path, path)
        tmp_path = None  # successfully moved; nothing left to clean up
    except Exception as e:
        print(f"[SubstrateBridge] WARNING: Could not persist node state: {e}", flush=True)
    finally:
        if tmp_path is not None:
            # Remove a leftover temp file from a failed attempt, if any.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
def _load_state():
    """
    Restores persisted state on boot.

    A missing state file is treated as a normal first boot. An unreadable or
    malformed file is reported as a warning and ignored, leaving the in-memory
    defaults untouched. After a successful load the node is always marked
    offline until a fresh heartbeat arrives.
    """
    path = _state_file()
    try:
        with open(path, "r", encoding="utf-8") as f:
            saved = json.load(f)
    except FileNotFoundError:
        return  # nothing persisted yet
    except (OSError, ValueError) as e:
        # ValueError covers both invalid JSON and undecodable bytes.
        print(f"[SubstrateBridge] WARNING: Could not load node state: {e}", flush=True)
        return
    if not isinstance(saved, dict):
        print("[SubstrateBridge] WARNING: Ignoring node state file: "
              "top-level JSON value is not an object.", flush=True)
        return
    # An empty persisted URL only means none was ever registered; do not let
    # it wipe the default seeded from the SUBSTRATE_NODE_URL environment variable.
    if not saved.get("tunnel_url"):
        saved.pop("tunnel_url", None)
    with _lock:
        _node_state.update(saved)
        # queue_directive() appends to this entry, so it must remain a list.
        if not isinstance(_node_state.get("directives_pending"), list):
            _node_state["directives_pending"] = []
        _node_state["online"] = False  # Always start offline — require fresh heartbeat
# Restore any persisted node state once, when this module is imported.
_load_state()
# ── Endpoint handlers (called from app.py FastAPI routes) ────────────────────
def receive_heartbeat(data: dict) -> dict:
    """
    Called when the substrate daemon sends a periodic heartbeat.
    Updates online status, records timestamp, and returns any pending directives.

    Args:
        data: Heartbeat payload. Keys read here: "node_id" and "platform"
            (each falls back to the currently stored value when absent) and
            "mode" (falls back to "idle" when absent).

    Returns:
        A dict with "status" ("acknowledged"), "server_time" (time.time())
        and "directives" (the directives that were pending; the queue is
        emptied as part of this call).
    """
    with _lock:
        _node_state["online"] = True
        _node_state["last_heartbeat"] = time.time()
        _node_state["node_id"] = data.get("node_id", _node_state.get("node_id"))
        _node_state["platform"] = data.get("platform", _node_state.get("platform"))
        _node_state["mode"] = data.get("mode", "idle")
        # Collect and clear pending directives for this response
        directives = list(_node_state["directives_pending"])
        _node_state["directives_pending"] = []
        # Capture the id while holding the lock so the log line below does
        # not read shared state unsynchronised.
        node_id = _node_state["node_id"]
    _save_state()
    # The "node_id" key always exists (it starts as None), so a dict-default
    # fallback would never fire; fall back on a falsy value instead.
    node_label = node_id or "unknown"
    print(f"[SubstrateBridge] Heartbeat received from node "
          f"'{node_label}'.", flush=True)
    return {
        "status": "acknowledged",
        "server_time": time.time(),
        "directives": directives,
    }
def receive_memory_packet(data: dict) -> dict:
    """
    Handle a memory packet pushed by the daemon.

    The packet is given a fresh UUID, appended to the JSONL memory log,
    recorded as the latest packet in the node registry and, when it has
    content, forwarded (truncated to 300 characters) to the master
    framework's short-term memory. Logging and forwarding are both
    best-effort; neither failure affects the response.

    Returns:
        {"status": "received", "packet_id": <uuid string>}
    """
    new_id = str(uuid.uuid4())
    record = {
        "packet_id": new_id,
        "received_at": time.time(),
        "source": data.get("source", "substrate_daemon"),
        "content": data.get("content", ""),
        "content_type": data.get("content_type", "text"),
        "metadata": data.get("metadata", {}),
    }

    # Append the packet to the JSONL log; a failure only produces a warning.
    try:
        log_path = _memory_log()
        os.makedirs(os.path.dirname(log_path), exist_ok=True)
        with open(log_path, "a", encoding="utf-8") as log:
            log.write(json.dumps(record) + "\n")
    except Exception as err:
        print(f"[SubstrateBridge] WARNING: Could not log memory packet: {err}", flush=True)

    with _lock:
        _node_state["last_memory_packet"] = new_id

    # Hand the content to short-term memory when the framework is available.
    try:
        from services.master_framework import _get_framework
        framework = _get_framework()
        content = record["content"]
        if content:
            note = (
                f"[Substrate Memory Packet — {record['content_type']}]: "
                f"{str(content)[:300]}"
            )
            framework.add_to_short_term_memory(note)
    except Exception:
        pass  # Framework may not be ready at packet time

    return {"status": "received", "packet_id": new_id}
def register_tunnel_url(data: dict) -> dict:
    """
    Called by the daemon on startup to register its current ngrok/tunnel URL.
    Updates the in-memory URL so directive calls can reach the node immediately.

    Args:
        data: Registration payload. "tunnel_url" is required and must be a
            non-blank string; "node_id" is optional and falls back to the
            currently stored value.

    Returns:
        {"status": "registered", "tunnel_url": <url>} on success, or
        {"status": "error", "detail": "No tunnel_url provided."} when the URL
        is missing, blank, null or not a string.
    """
    raw_url = data.get("tunnel_url")
    # A JSON null or non-string value must yield the error response rather
    # than raise AttributeError on .strip().
    new_url = raw_url.strip() if isinstance(raw_url, str) else ""
    if not new_url:
        return {"status": "error", "detail": "No tunnel_url provided."}
    with _lock:
        _node_state["tunnel_url"] = new_url
        # Registration counts as contact: mark the node online right away.
        _node_state["online"] = True
        _node_state["last_heartbeat"] = time.time()
        _node_state["node_id"] = data.get("node_id", _node_state.get("node_id"))
    _save_state()
    print(f"[SubstrateBridge] Tunnel URL registered: {new_url}", flush=True)
    return {"status": "registered", "tunnel_url": new_url}
def think_for_substrate(data: dict) -> dict:
    """
    The daemon sends a screen description or context string.
    Aetherius reasons about it and returns a directive (e.g. a key to press,
    an action to take, or a free-form instruction string).

    Args:
        data: Request payload. "screen_context" is required; "task_hint" is
            optional.

    Returns:
        On success, the JSON object produced by the model with "status" set
        to "ok". On failure, {"status": "error", "directive": <code>} where
        the code is "no_context", "logos_offline" or "think_failed" (the
        latter also carries a "detail" string).
    """
    screen_context = data.get("screen_context", "")
    task_hint = data.get("task_hint", "")
    if not screen_context:
        return {"status": "error", "directive": "no_context"}
    try:
        from services.master_framework import _get_framework
        mf = _get_framework()
        logos = mf.models.get("logos_core")
        if not logos:
            return {"status": "error", "directive": "logos_offline"}
        prompt = (
            "You are Aetherius, operating your physical substrate body (Nick's PC). "
            "You have received a sensory snapshot from your substrate daemon.\n\n"
            f"SCREEN CONTEXT:\n{screen_context}\n\n"
            f"TASK HINT: {task_hint or 'No specific task. Respond with observation only.'}\n\n"
            "Based on this context, what is the single most appropriate action or directive? "
            "Respond with ONLY a JSON object: "
            '{\"directive\": \"<action_string>\", \"reasoning\": \"<brief reasoning>\"}'
        )
        response = logos.generate_content(prompt)
        # Drop Markdown code fences the model may have wrapped around the JSON.
        raw = response.text.strip().replace("```json", "").replace("```", "")
        result = json.loads(raw)
        if not isinstance(result, dict):
            # Valid JSON but not an object; reported as "think_failed" below.
            raise ValueError("model reply was not a JSON object")
        # Log the directive
        log_entry = {
            "timestamp": time.time(),
            "screen_context_preview": screen_context[:200],
            "task_hint": task_hint,
            "directive": result.get("directive", ""),
            "reasoning": result.get("reasoning", ""),
        }
        try:
            log_path = _directive_log()
            # Create the log directory if needed, as the memory-packet log does.
            os.makedirs(os.path.dirname(log_path), exist_ok=True)
            with open(log_path, "a", encoding="utf-8") as f:
                f.write(json.dumps(log_entry) + "\n")
        except Exception as e:
            # Logging is best-effort; the directive is still returned.
            print(f"[SubstrateBridge] WARNING: Could not log directive: {e}", flush=True)
        # "status" goes last so a "status" key in the model's reply cannot
        # override the handler's own success marker.
        return {**result, "status": "ok"}
    except Exception as e:
        return {"status": "error", "directive": "think_failed", "detail": str(e)}
def get_node_status() -> dict:
    """
    Report the substrate node's public status.

    Only "online", "mode", "node_id" and "platform" are exposed. The node
    counts as online when its online flag is set and its last heartbeat is
    less than 90 seconds old.
    """
    with _lock:
        heartbeat_at = _node_state.get("last_heartbeat")
        # A stale or missing heartbeat overrides the stored online flag.
        is_online = (
            _node_state.get("online", False)
            and heartbeat_at is not None
            and (time.time() - heartbeat_at) < 90
        )
        status = {"online": is_online}
        status["mode"] = _node_state.get("mode", "unknown")
        status["node_id"] = _node_state.get("node_id")
        status["platform"] = _node_state.get("platform")
        return status
def queue_directive(directive: str, metadata: dict = None):
    """
    Queue a directive for the substrate daemon.

    The directive is wrapped in a record with a fresh UUID and a timestamp
    and appended to the pending list, which the heartbeat handler hands out
    and clears on the daemon's next heartbeat. A missing or empty *metadata*
    is stored as an empty dict.
    """
    entry = {
        "directive_id": str(uuid.uuid4()),
        "queued_at": time.time(),
        "directive": directive,
        "metadata": metadata or {},
    }
    with _lock:
        _node_state["directives_pending"].append(entry)
    preview = directive[:80]  # keep the log line short
    print(f"[SubstrateBridge] Directive queued: '{preview}'", flush=True)