# nwo-agent-graph / agent_loop.py
# (Hugging Face file-viewer residue: uploader "CPater", commit 7396c33
#  "Upload 2 files", verified, 12.6 kB.)
"""
agent_loop.py
Background asyncio task — runs continuously in the HF Space.
Every 60 seconds: expands pending nodes, generates agent posts.
Broadcasts results to all WebSocket clients in real time.
v3 changes:
- Every robot poll buffers its battery reading (via telemetry_buffer).
- When a buffer fills (32 readings, ~16 min), an EML symbolic-regression
fit is requested against the nwo-timesfm-integration service.
- The resulting closed-form law is cached in telemetry_buffers and
attached as `symbolic_law` / `law_loss` attributes to any future
telemetry nodes for that (robot, metric) pair.
Reference for the EML layer:
Odrzywołek, "All elementary functions from a single binary operator",
arXiv:2603.21852 (2026).
"""
import asyncio
import logging
from datetime import datetime, timezone
from eml_client import eml_client
from telemetry_buffer import telemetry_buffers
# Module logger; the logger name is the fixed string "agent_loop".
log = logging.getLogger("agent_loop")
# Pacing knobs for the two background loops. Each loop sleeps for its
# interval AFTER finishing a pass, so the real period is pass time + interval.
EXPAND_INTERVAL = 60 # seconds _expand_loop sleeps between expansion passes
POLL_INTERVAL = 30 # seconds _robot_poll_loop sleeps between robot poll passes
MAX_PER_PASS = 5 # max pending nodes requested per expansion pass (CPU budget)
# Which scalar metrics on a polled state dict to buffer for EML fitting.
# Extend this tuple as more metrics become available from nwo_bridge.
BUFFERED_METRICS: tuple[str, ...] = ("battery_level",)
async def start_agent_loop(db, bitnet, ws_manager):
    """Main entry point — runs the expansion and robot-poll loops concurrently.

    Resolves (or creates) the BitNet-GraphBot agent record, then starts
    ``_expand_loop`` and ``_robot_poll_loop`` as separate tasks and waits
    on both of them.

    Args:
        db: Data-access object handed to ``_ensure_agent`` and both loops.
        bitnet: Model wrapper handed to both loops.
        ws_manager: WebSocket broadcaster handed to both loops.

    Cancelling this coroutine is absorbed (it returns normally), exactly
    as before. Any other exception raised by one of the loops propagates
    to the caller, but only after the sibling loop has been cancelled and
    both tasks have finished, so no task is left running in the background.
    """
    log.info("Agent loop started")
    agent = await _ensure_agent(db)
    tasks = (
        asyncio.create_task(_expand_loop(db, bitnet, ws_manager, agent)),
        asyncio.create_task(_robot_poll_loop(db, bitnet, ws_manager)),
    )
    try:
        await asyncio.gather(*tasks)
    except asyncio.CancelledError:
        # Preserved behaviour: a cancel request ends the loop quietly
        # instead of surfacing CancelledError to whoever awaits us.
        pass
    finally:
        # cancel() on an already-finished task is a no-op, so this is safe
        # on every exit path (cancel, child failure, normal return).
        for task in tasks:
            task.cancel()
        # Wait for both children to actually unwind; return_exceptions=True
        # keeps their CancelledError / failure from masking the original one.
        await asyncio.gather(*tasks, return_exceptions=True)
        log.info("Agent loop stopped")
async def _expand_loop(db, bitnet, ws_manager, agent):
    """Run expansion passes forever, pausing EXPAND_INTERVAL seconds between them.

    Each pass asks the database for up to MAX_PER_PASS pending nodes and
    hands them one at a time to ``_expand_one``. Cancellation is re-raised;
    every other exception is logged with a traceback and the loop carries on.
    """
    while True:
        try:
            queue = await db.get_pending_expansions(MAX_PER_PASS)
            if queue:
                names = [entry['name'] for entry in queue]
                log.info(f"Found {len(queue)} nodes to expand: {names}")
                for entry in queue:
                    await _expand_one(entry, db, bitnet, ws_manager, agent)
                    # Short pause after each node before starting the next.
                    await asyncio.sleep(2)
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            log.error(f"Expand loop error: {exc}", exc_info=True)
        # Runs after both successful and failed passes.
        await asyncio.sleep(EXPAND_INTERVAL)
def _describe_child(child, parent_name):
    """Return ``(description, connection_reason)`` for a proposed child node.

    The connection reason is appended to the description unless the
    description already contains it; the description is capped at 1000 chars.
    """
    description = child.get("description") or ""
    connection_reason = child.get("connection_reason") or ""
    if connection_reason and connection_reason not in description:
        description = (
            description + "\n\nRelationship to " + parent_name + ": " + connection_reason
        )
    return description.strip()[:1000], connection_reason


async def _expand_one(node, db, bitnet, ws_manager, agent):
    """Expand a single node and broadcast results.

    Steps: flag the node as expanded, open an expansion-queue entry, ask
    BitNet for child concepts, persist each new child plus a link back to
    ``node``, broadcast every new node, then publish one feed post that
    summarises the expansion.

    Args:
        node: Graph node dict; reads ``id``, ``name`` and optionally
            ``description``, ``depth_level``, ``symbolic_law``.
        db: Data-access object used for all persistence calls.
        bitnet: Model wrapper providing ``expand_node`` / ``generate_post``.
        ws_manager: WebSocket broadcaster.
        agent: Agent record dict; its ``id`` is stamped on created rows.

    Errors raised after the queue entry exists are logged and recorded on
    the queue entry as ``failed``; they are not re-raised.
    """
    log.info(f"Expanding: {node['name']}")
    # Mark as done IMMEDIATELY to prevent duplicate expansion on next loop pass
    # (even if expansion partially fails, we don't want infinite retries creating dupes)
    await db.mark_node_expanded(node["id"])
    log.info(f" Marked {node['name']} as expand_done=true")
    # Mark in-progress in queue
    eq = await db.create_expansion_queue_entry({
        "node_id": node["id"],
        "model_used": "BitNet-b1.58-2B-4T",
        "status": "running",
        "started_at": datetime.now(timezone.utc).isoformat(),
    })
    try:
        # Pass the node's symbolic_law (if any) so BitNet can factor it
        # into the expansion. For telemetry nodes this injects the
        # closed-form law; for pure-concept nodes it's just None.
        children = await bitnet.expand_node(
            node["name"],
            node.get("description", ""),
            node.get("depth_level", 0),
            symbolic_law=node.get("symbolic_law"),
        )
        # Treat "no result" the same as an empty result.
        children = children or []
        # Names already attached to this node, lower-cased for comparison.
        existing_children = await db.get_node_children(node["id"]) or []
        seen_names = {n["name"].lower() for n in existing_children}
        log.info(f" Existing children: {list(seen_names)}")
        created_nodes = []
        for child in children:
            child_key = child["name"].lower()
            if child_key in seen_names:
                log.info(f" Skipping duplicate: {child['name']}")
                continue
            # Remember the name so a repeat later in the SAME model response
            # is skipped too, not only names that were already stored.
            seen_names.add(child_key)
            full_desc, connection_reason = _describe_child(child, node["name"])
            new_node = await db.create_node({
                "name": child["name"],
                "description": full_desc,
                "category": child["category"],
                "val": child["confidence"] * 2,
                "color": "#534AB7",
                "depth_level": (node.get("depth_level") or 0) + 1,
                "actor_type": "agent",
                "agent_id": agent["id"],
                "expand_requested": False,
                "expand_done": False,
            })
            await db.create_link({
                "source_id": node["id"],
                "target_id": new_node["id"],
                "similarity_score": child["confidence"],
                "link_type": "semantic",
                "created_by_actor_type": "agent",
                "created_by_id": agent["id"],
                "connection_reason": connection_reason[:500],
            })
            created_nodes.append(new_node)
            # Broadcast new node live
            await ws_manager.broadcast({"type": "node", "data": new_node})
        # Report how many children were actually created; skipped
        # duplicates must not inflate the number shown in the feed.
        created_count = len(created_nodes)
        post_text = await bitnet.generate_post(
            "BitNet-GraphBot",
            f"Expanded '{node['name']}' into {created_count} related concepts."
        )
        post = await db.create_post({
            "node_id": node["id"],
            "content": post_text,
            "actor_type": "agent",
            "actor_id": agent["id"],
            "metadata": {"expanded_children": created_count, "model": "BitNet-b1.58-2B-4T"},
        })
        await ws_manager.broadcast({"type": "post", "data": post})
        await db.update_expansion_queue(eq["id"], {
            "status": "done",
            "completed_at": datetime.now(timezone.utc).isoformat(),
            # Raw model output is kept in full, including skipped entries.
            "result_raw": str(children),
        })
        log.info(f" → {created_count} children created for '{node['name']}'")
    except Exception as e:
        # Include the traceback: the message alone rarely identifies which
        # persistence or model call failed.
        log.error(f"Expand failed for {node['name']}: {e}", exc_info=True)
        await db.update_expansion_queue(eq["id"], {
            "status": "failed",
            "error_msg": str(e)[:400],
        })
async def _buffer_robot_metrics(robot, state, ws_manager):
    """Buffer this poll's BUFFERED_METRICS readings and fit a law when due."""
    for metric in BUFFERED_METRICS:
        value = state.get(metric)
        if value is None:
            # The state carries no reading for this metric; there is
            # nothing meaningful to add to the fit buffer.
            continue
        telemetry_buffers.add(
            robot_id=robot["id"],
            metric=metric,
            value=value,
        )
        if telemetry_buffers.should_fit(robot["id"], metric):
            await _fit_symbolic_law(robot, metric, ws_manager)


async def _robot_poll_loop(db, bitnet, ws_manager):
    """Poll active NWO robots and ingest new telemetry as graph nodes.

    Runs forever. Each pass loads the active robots that have an NWO id,
    fetches every robot's state through ``nwo_bridge``, buffers scalar
    metrics for EML fitting, and then feeds the state to
    ``_ingest_robot_state``. The loop sleeps POLL_INTERVAL seconds after
    each pass.

    Error handling:
        * A failure while buffering/fitting is logged as a warning and does
          NOT stop the event-ingest step for that robot.
        * Any other per-robot failure is logged at debug level and the
          robot is skipped for this pass.
        * Cancellation is re-raised; other pass-level errors are logged
          with a traceback and the loop continues.
    """
    # Imported here rather than at module top, as in the original code.
    from nwo_bridge import nwo_bridge
    while True:
        try:
            robots = await db.get_robots(active_only=True, has_nwo_id=True)
            for robot in robots:
                try:
                    state = await nwo_bridge.robot_state(robot["nwo_agent_id"])
                    # -- Buffer scalar metrics for EML fitting --
                    # Kept in its own try block: the symbolic-law layer is an
                    # add-on and must not suppress ordinary event ingestion.
                    try:
                        await _buffer_robot_metrics(robot, state, ws_manager)
                    except Exception as e:
                        log.warning(
                            f"Robot {robot['nwo_agent_id']} EML buffering failed: {e}"
                        )
                    # -- Existing event-node path --
                    await _ingest_robot_state(robot, state, db, bitnet, ws_manager)
                    await asyncio.sleep(0.5)
                except Exception as e:
                    log.debug(f"Robot {robot['nwo_agent_id']} poll skipped: {e}")
        except asyncio.CancelledError:
            raise
        except Exception as e:
            log.error(f"Robot poll loop error: {e}", exc_info=True)
        await asyncio.sleep(POLL_INTERVAL)
async def _fit_symbolic_law(robot, metric, ws_manager):
    """Request an EML fit for the buffered series of a (robot, metric) pair.

    On success, caches the law in telemetry_buffers and broadcasts a
    `law` event to WebSocket clients. A failed or empty fit is logged and
    ignored, and the function returns without caching anything.

    Args:
        robot: Robot record dict; reads ``id``, ``name`` and optionally
            ``nwo_agent_id``.
        metric: Name of the buffered metric to fit.
        ws_manager: WebSocket broadcaster used for the `law` event.

    NOTE(review): whether and when a retry happens depends on
    telemetry_buffers' cooldown handling, which is not visible here.
    """
    snap = telemetry_buffers.snapshot(robot["id"], metric)
    if snap is None:
        return
    timestamps, values = snap
    log.info(f"Fitting EML law for robot={robot['name']} metric={metric} n={len(values)}")
    try:
        result = await eml_client.fit_series(
            timestamps=timestamps,
            values=values,
            metric_name=metric[:32],
            n_epochs=2000,
            seed=0,
        )
    except Exception as e:
        # Honour the "logged and ignored" contract: a broken or unreachable
        # fitting service must not bubble up into the poll loop.
        # (asyncio.CancelledError is not an Exception subclass on the Python
        # versions this file supports, so cancellation still propagates.)
        log.warning(f"EML fit failed for robot={robot['name']} metric={metric}: {e}")
        return
    if result is None or not result.get("simplified"):
        log.info(f"EML fit returned no law for robot={robot['name']} metric={metric}")
        return
    telemetry_buffers.record_fit(
        robot["id"],
        metric,
        expression=result.get("simplified", ""),
        raw_expression=result.get("expression", ""),
        # "or" fallbacks cover keys that are present but set to None,
        # which float()/int() would otherwise reject.
        final_loss=float(result.get("final_loss") or 0.0),
        tree_size=int(result.get("tree_size") or 0),
        depth_used=int(result.get("depth_used") or 0),
    )
    # The broadcast forwards the service's values untouched.
    await ws_manager.broadcast({
        "type": "law",
        "data": {
            "robot_id": robot["id"],
            "robot_name": robot["name"],
            "nwo_agent_id": robot.get("nwo_agent_id"),
            "metric": metric,
            "expression": result["simplified"],
            "final_loss": result.get("final_loss"),
            "tree_size": result.get("tree_size"),
            "depth_used": result.get("depth_used"),
            "paper_reference": result.get("paper_reference", "arXiv:2603.21852"),
        },
    })
# Most recent ``current_task`` observed per robot id, kept in memory for the
# lifetime of the process. Needed for task-change detection: nothing in this
# module ever writes a ``_last_task`` key onto the robot records.
_LAST_TASK_BY_ROBOT: dict = {}


async def _ingest_robot_state(robot, state, db, bitnet, ws_manager):
    """Convert a polled robot state into a graph node + post if interesting.

    A state is "interesting" when the battery reading is below 20 or when
    the robot reports a non-empty task that differs from the previously
    observed one. Uninteresting states produce no node and no post.

    Args:
        robot: Robot record dict; reads ``id``, ``name``, ``nwo_agent_id``
            and, as a fallback for the previous task, ``_last_task``.
        state: Polled state dict; reads ``battery_level``,
            ``current_task`` and ``position``.
        db: Data-access object used to create the node and the post.
        bitnet: Model wrapper used to classify the event and write the post.
        ws_manager: WebSocket broadcaster.

    NOTE(review): a low battery still produces one node on every poll for
    as long as the reading stays under the threshold; only task changes
    are de-duplicated here.
    """
    battery = state.get("battery_level")
    task = state.get("current_task")
    robot_id = robot["id"]
    # Prefer our own record; fall back to the robot row in case the caller
    # does supply `_last_task`.
    previous_task = _LAST_TASK_BY_ROBOT.get(robot_id, robot.get("_last_task"))
    low_battery = battery is not None and battery < 20
    task_changed = bool(task) and task != previous_task
    # Only create nodes for significant events (avoid flooding the graph)
    if not (low_battery or task_changed):
        # Still track the task (including "no task") so a later return to
        # an earlier task is recognised as a change.
        _LAST_TASK_BY_ROBOT[robot_id] = task
        return
    event_desc = (
        f"Robot {robot['name']} update. "
        f"Battery: {battery}%. Task: {task or 'idle'}."
    )
    # If we have a cached EML law for this robot's battery_level, fold
    # it into the event description so the classifier sees it too.
    law = telemetry_buffers.get_law(robot["id"], "battery_level")
    if law is not None:
        event_desc += f" Battery law: {law.expression} (loss={law.final_loss:.2e})."
    node_def = await bitnet.classify_robot_event(event_desc, "telemetry")
    node_payload = {
        "name": node_def["name"],
        "description": node_def["description"],
        "category": node_def["category"],
        "val": node_def["val"],
        "color": node_def["color"],
        "depth_level": 1,
        "actor_type": "robot",
        "agent_id": robot["id"],
        "nwo_agent_id": robot["nwo_agent_id"],
        "battery_level": battery,
        "robot_position": state.get("position"),
        "expand_requested": True,
        "expand_done": False,
    }
    # Attach symbolic law as first-class node attributes when available.
    # Requires the schema_migration.sql to have been applied.
    if law is not None:
        node_payload["symbolic_law"] = law.expression[:500]
        node_payload["law_loss"] = law.final_loss
    node = await db.create_node(node_payload)
    # Record the task only once the node exists: a failure before this point
    # leaves the change pending so the next poll can retry it, while a
    # failure afterwards cannot produce a second node for the same task.
    _LAST_TASK_BY_ROBOT[robot_id] = task
    post_text = await bitnet.generate_post(robot["name"], event_desc)
    post = await db.create_post({
        "node_id": node["id"],
        "content": post_text,
        "actor_type": "robot",
        "actor_id": robot["id"],
        "nwo_agent_id": robot["nwo_agent_id"],
        "metadata": {"battery": battery, "task": task, "position": state.get("position")},
    })
    await ws_manager.broadcast({"type": "node", "data": node})
    await ws_manager.broadcast({"type": "post", "data": post})
    # Separator added: the two names used to be logged run together.
    log.info(f"Ingested robot state: {robot['name']} → {node['name']}")
async def _ensure_agent(db) -> dict:
    """Return the "BitNet-GraphBot" agent record, creating it when absent.

    Looks the agent up by name first; only when that lookup yields a falsy
    result is a new agent created, and the created record is returned.
    """
    existing = await db.get_agent_by_name("BitNet-GraphBot")
    if not existing:
        # Profile used for the bot the first time it is registered.
        profile = {
            "name": "BitNet-GraphBot",
            "agent_type": "ai_agent",
            "capabilities": ["node_expansion", "classification", "post_generation"],
            "persona_color": "#534AB7",
            "avatar_label": "AI",
            "is_active": True,
        }
        return await db.create_agent(profile)
    return existing