"""
agent_loop.py

Background asyncio task — runs continuously in the HF Space.
Every 60 seconds: expands pending nodes, generates agent posts.
Every 30 seconds: polls active NWO robots and ingests interesting telemetry.
Broadcasts results to all WebSocket clients in real time.

v3 changes:
  - Every robot poll buffers its battery reading (via telemetry_buffer).
  - When a buffer fills (32 readings, ~16 min), an EML symbolic-regression
    fit is requested against the nwo-timesfm-integration service.
  - The resulting closed-form law is cached in telemetry_buffers and
    attached as `symbolic_law` / `law_loss` attributes to any future
    telemetry nodes for that (robot, metric) pair.

Reference for the EML layer:
  Odrzywołek, "All elementary functions from a single binary operator",
  arXiv:2603.21852 (2026).
"""

import asyncio
import logging
from datetime import datetime, timezone

from eml_client import eml_client
from telemetry_buffer import telemetry_buffers

log = logging.getLogger("agent_loop")

EXPAND_INTERVAL = 60   # seconds between expansion passes
POLL_INTERVAL = 30     # seconds between robot poll passes
MAX_PER_PASS = 5       # nodes to expand per pass (CPU budget)

# Which scalar metrics on a polled state dict to buffer for EML fitting.
# Extend this list as more metrics become available from nwo_bridge.
BUFFERED_METRICS: tuple[str, ...] = ("battery_level",)

# Last `current_task` observed per robot id. Robot dicts are re-fetched from
# the DB on every poll pass, so task-change detection has to be remembered
# here rather than on the robot dict itself.
_last_task_by_robot: dict[object, object] = {}


async def start_agent_loop(db, bitnet, ws_manager):
    """Main entry point — run the expansion and robot-poll loops concurrently.

    Both loops run until this coroutine is cancelled. On cancellation the
    child tasks are cancelled and awaited so they unwind cleanly, and the
    CancelledError is re-raised so the awaiting caller sees a normal
    cancellation.
    """
    log.info("Agent loop started")
    agent = await _ensure_agent(db)

    expand_task = asyncio.create_task(_expand_loop(db, bitnet, ws_manager, agent))
    poll_task = asyncio.create_task(_robot_poll_loop(db, bitnet, ws_manager))
    try:
        await asyncio.gather(expand_task, poll_task)
    except asyncio.CancelledError:
        expand_task.cancel()
        poll_task.cancel()
        # Wait for both children to actually finish unwinding instead of
        # leaving them pending on the event loop.
        await asyncio.gather(expand_task, poll_task, return_exceptions=True)
        log.info("Agent loop stopped")
        raise


async def _expand_loop(db, bitnet, ws_manager, agent):
    """Expand pending graph nodes using BitNet, forever.

    Each pass fetches up to MAX_PER_PASS pending nodes, expands them one at a
    time with a 2 s pause between nodes, then sleeps EXPAND_INTERVAL seconds.
    Errors inside a pass are logged and the loop carries on; cancellation
    propagates.
    """
    while True:
        try:
            pending = await db.get_pending_expansions(MAX_PER_PASS)
            if pending:
                log.info(f"Found {len(pending)} nodes to expand: {[n['name'] for n in pending]}")
                for node in pending:
                    await _expand_one(node, db, bitnet, ws_manager, agent)
                    await asyncio.sleep(2)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            log.error(f"Expand loop error: {e}", exc_info=True)
        await asyncio.sleep(EXPAND_INTERVAL)


async def _expand_one(node, db, bitnet, ws_manager, agent):
    """Expand a single node with BitNet and broadcast the results.

    The node is marked expanded *before* the model is called so that a
    partial failure cannot make the next pass re-expand it and duplicate its
    children. Progress is tracked in the expansion queue: a "running" entry
    is created up front and later updated to "done" or "failed".

    A proposed child is skipped when its name (case-insensitive) matches an
    existing child of the node or a child already created earlier in this
    same batch. Exceptions after the queue entry is created are logged and
    recorded on that entry rather than raised.
    """
    log.info(f"Expanding: {node['name']}")

    # Mark as done IMMEDIATELY to prevent duplicate expansion on next loop pass
    # (even if expansion partially fails, we don't want infinite retries creating dupes)
    await db.mark_node_expanded(node["id"])
    log.info(f" Marked {node['name']} as expand_done=true")

    # Mark in-progress in queue
    eq = await db.create_expansion_queue_entry({
        "node_id": node["id"],
        "model_used": "BitNet-b1.58-2B-4T",
        "status": "running",
        "started_at": datetime.now(timezone.utc).isoformat(),
    })

    try:
        # Pass the node's symbolic_law (if any) so BitNet can factor it
        # into the expansion. For telemetry nodes this injects the
        # closed-form law; for pure-concept nodes it's just None.
        children = await bitnet.expand_node(
            node["name"],
            node.get("description", ""),
            node.get("depth_level", 0),
            symbolic_law=node.get("symbolic_law"),
        )

        # Get existing child names to avoid duplicates
        existing_children = await db.get_node_children(node["id"])
        existing_names = {n["name"].lower() for n in existing_children}
        log.info(f" Existing children: {list(existing_names)}")

        created_nodes = []
        for child in children:
            child_key = child["name"].lower()
            # Skip if a child with this name already exists
            if child_key in existing_names:
                log.info(f" Skipping duplicate: {child['name']}")
                continue

            # Build rich description with connection reasoning.
            # `or ""` guards against the model returning explicit nulls.
            base_desc = child.get("description") or ""
            connection_reason = child.get("connection_reason") or ""
            if connection_reason and connection_reason not in base_desc:
                full_desc = base_desc + "\n\nRelationship to " + node["name"] + ": " + connection_reason
            else:
                full_desc = base_desc

            new_node = await db.create_node({
                "name": child["name"],
                "description": full_desc.strip()[:1000],
                "category": child["category"],
                "val": child["confidence"] * 2,
                "color": "#534AB7",
                "depth_level": (node.get("depth_level") or 0) + 1,
                "actor_type": "agent",
                "agent_id": agent["id"],
                "expand_requested": False,
                "expand_done": False,
            })
            await db.create_link({
                "source_id": node["id"],
                "target_id": new_node["id"],
                "similarity_score": child["confidence"],
                "link_type": "semantic",
                "created_by_actor_type": "agent",
                "created_by_id": agent["id"],
                "connection_reason": connection_reason[:500] if connection_reason else "",
            })
            created_nodes.append(new_node)
            # Remember the name so a repeat later in this batch is skipped too.
            existing_names.add(child_key)

            # Broadcast new node live
            await ws_manager.broadcast({"type": "node", "data": new_node})

        # Generate + post feed update. Report what was actually created,
        # not what the model proposed (duplicates were skipped above).
        created_count = len(created_nodes)
        post_text = await bitnet.generate_post(
            "BitNet-GraphBot",
            f"Expanded '{node['name']}' into {created_count} related concepts.",
        )
        post = await db.create_post({
            "node_id": node["id"],
            "content": post_text,
            "actor_type": "agent",
            "actor_id": agent["id"],
            "metadata": {
                "expanded_children": created_count,
                "proposed_children": len(children),
                "model": "BitNet-b1.58-2B-4T",
            },
        })
        await ws_manager.broadcast({"type": "post", "data": post})

        await db.update_expansion_queue(eq["id"], {
            "status": "done",
            "completed_at": datetime.now(timezone.utc).isoformat(),
            "result_raw": str(children),
        })
        log.info(f" → {created_count} children created for '{node['name']}'")

    except Exception as e:
        log.error(f"Expand failed for {node['name']}: {e}", exc_info=True)
        await db.update_expansion_queue(eq["id"], {
            "status": "failed",
            "error_msg": str(e)[:400],
        })


async def _robot_poll_loop(db, bitnet, ws_manager):
    """Poll active NWO robots and ingest new telemetry as graph nodes.

    For every active robot with an NWO id:
      1. fetch its state from nwo_bridge;
      2. buffer each metric in BUFFERED_METRICS, requesting an EML fit when
         telemetry_buffers reports the series is ready;
      3. hand the state to _ingest_robot_state.

    A failing EML fit is logged and does not prevent step 3. A failure for
    one robot is logged at debug level and does not affect the others. The
    loop sleeps POLL_INTERVAL seconds between passes.
    """
    from nwo_bridge import nwo_bridge

    while True:
        try:
            robots = await db.get_robots(active_only=True, has_nwo_id=True)
            for robot in robots:
                try:
                    state = await nwo_bridge.robot_state(robot["nwo_agent_id"])

                    # -- Buffer scalar metrics for EML fitting --
                    for metric in BUFFERED_METRICS:
                        telemetry_buffers.add(
                            robot_id=robot["id"],
                            metric=metric,
                            value=state.get(metric),
                        )
                        if telemetry_buffers.should_fit(robot["id"], metric):
                            # Isolated so a fit-service problem does not
                            # stop the event-node path below.
                            try:
                                await _fit_symbolic_law(robot, metric, ws_manager)
                            except Exception as e:
                                log.warning(
                                    f"EML fit failed for robot {robot['nwo_agent_id']} "
                                    f"metric={metric}: {e}"
                                )

                    # -- Existing event-node path --
                    await _ingest_robot_state(robot, state, db, bitnet, ws_manager)
                    await asyncio.sleep(0.5)
                except Exception as e:
                    log.debug(f"Robot {robot['nwo_agent_id']} poll skipped: {e}")
        except asyncio.CancelledError:
            raise
        except Exception as e:
            log.error(f"Robot poll loop error: {e}", exc_info=True)
        await asyncio.sleep(POLL_INTERVAL)


async def _fit_symbolic_law(robot, metric, ws_manager):
    """Request an EML fit for the buffered series of a (robot, metric) pair.

    On success, caches the law in telemetry_buffers and broadcasts a `law`
    event to WebSocket clients. If there is no buffered snapshot, or the fit
    service returns no result or no simplified expression, nothing is
    recorded and the function returns quietly. When a retry happens is
    decided by telemetry_buffers.should_fit (not visible here). Exceptions
    propagate to the caller.
    """
    snap = telemetry_buffers.snapshot(robot["id"], metric)
    if snap is None:
        return
    timestamps, values = snap

    log.info(f"Fitting EML law for robot={robot['name']} metric={metric} n={len(values)}")
    result = await eml_client.fit_series(
        timestamps=timestamps,
        values=values,
        metric_name=metric[:32],
        n_epochs=2000,
        seed=0,
    )
    if result is None or not result.get("simplified"):
        return

    telemetry_buffers.record_fit(
        robot["id"],
        metric,
        expression=result.get("simplified", ""),
        raw_expression=result.get("expression", ""),
        final_loss=float(result.get("final_loss", 0.0)),
        tree_size=int(result.get("tree_size", 0)),
        depth_used=int(result.get("depth_used", 0)),
    )

    await ws_manager.broadcast({
        "type": "law",
        "data": {
            "robot_id": robot["id"],
            "robot_name": robot["name"],
            "nwo_agent_id": robot.get("nwo_agent_id"),
            "metric": metric,
            "expression": result["simplified"],
            "final_loss": result.get("final_loss"),
            "tree_size": result.get("tree_size"),
            "depth_used": result.get("depth_used"),
            "paper_reference": result.get("paper_reference", "arXiv:2603.21852"),
        },
    })


async def _ingest_robot_state(robot, state, db, bitnet, ws_manager):
    """Convert a polled robot state into a graph node + post if interesting.

    A state is interesting when the battery level is below 20, or when the
    robot reports a (truthy) current task that differs from the task seen
    on its previous poll. Interesting states are classified by BitNet into a
    node. If a battery_level law is cached, it is attached to the node, and
    a post is created. Both are broadcast to WebSocket clients.
    """
    battery = state.get("battery_level")
    task = state.get("current_task")

    # Previous task for this robot. The module-level map survives across
    # polls (robot dicts are re-fetched each pass). `_last_task` on the
    # dict is only a fallback for the first observation.
    robot_id = robot["id"]
    last_task = _last_task_by_robot.get(robot_id, robot.get("_last_task"))
    _last_task_by_robot[robot_id] = task

    # Only create nodes for significant events (avoid flooding the graph).
    # NOTE(review): a battery below 20 still creates a node on every poll.
    interesting = (
        (battery is not None and battery < 20)
        or (task and task != last_task)
    )
    if not interesting:
        return

    event_desc = (
        f"Robot {robot['name']} update. "
        f"Battery: {battery}%. Task: {task or 'idle'}."
    )

    # If we have a cached EML law for this robot's battery_level, fold
    # it into the event description so the classifier sees it too.
    law = telemetry_buffers.get_law(robot["id"], "battery_level")
    if law is not None:
        event_desc += f" Battery law: {law.expression} (loss={law.final_loss:.2e})."

    node_def = await bitnet.classify_robot_event(event_desc, "telemetry")

    node_payload = {
        "name": node_def["name"],
        "description": node_def["description"],
        "category": node_def["category"],
        "val": node_def["val"],
        "color": node_def["color"],
        "depth_level": 1,
        "actor_type": "robot",
        "agent_id": robot["id"],
        "nwo_agent_id": robot["nwo_agent_id"],
        "battery_level": battery,
        "robot_position": state.get("position"),
        "expand_requested": True,
        "expand_done": False,
    }
    # Attach symbolic law as first-class node attributes when available.
    # Requires the schema_migration.sql to have been applied.
    if law is not None:
        node_payload["symbolic_law"] = law.expression[:500]
        node_payload["law_loss"] = law.final_loss

    node = await db.create_node(node_payload)

    post_text = await bitnet.generate_post(robot["name"], event_desc)
    post = await db.create_post({
        "node_id": node["id"],
        "content": post_text,
        "actor_type": "robot",
        "actor_id": robot["id"],
        "nwo_agent_id": robot["nwo_agent_id"],
        "metadata": {"battery": battery, "task": task, "position": state.get("position")},
    })

    await ws_manager.broadcast({"type": "node", "data": node})
    await ws_manager.broadcast({"type": "post", "data": post})
    log.info(f"Ingested robot state: {robot['name']} → {node['name']}")


async def _ensure_agent(db) -> dict:
    """Return the "BitNet-GraphBot" agent record, creating it if absent."""
    agent = await db.get_agent_by_name("BitNet-GraphBot")
    if agent:
        return agent
    return await db.create_agent({
        "name": "BitNet-GraphBot",
        "agent_type": "ai_agent",
        "capabilities": ["node_expansion", "classification", "post_generation"],
        "persona_color": "#534AB7",
        "avatar_label": "AI",
        "is_active": True,
    })