Spaces:
Running
Running
| """ | |
| agent_loop.py | |
| Background asyncio task — runs continuously in the HF Space. | |
| Every 60 seconds: expands pending nodes, generates agent posts. | |
| Broadcasts results to all WebSocket clients in real time. | |
| v3 changes: | |
| - Every robot poll buffers its battery reading (via telemetry_buffer). | |
| - When a buffer fills (32 readings, ~16 min), an EML symbolic-regression | |
| fit is requested against the nwo-timesfm-integration service. | |
| - The resulting closed-form law is cached in telemetry_buffers and | |
| attached as `symbolic_law` / `law_loss` attributes to any future | |
| telemetry nodes for that (robot, metric) pair. | |
| Reference for the EML layer: | |
| Odrzywołek, "All elementary functions from a single binary operator", | |
| arXiv:2603.21852 (2026). | |
| """ | |
| import asyncio | |
| import logging | |
| from datetime import datetime, timezone | |
| from eml_client import eml_client | |
| from telemetry_buffer import telemetry_buffers | |
# Module-wide logger; every coroutine in this file logs through it.
log = logging.getLogger("agent_loop")

# Scheduling knobs for the two background loops.
EXPAND_INTERVAL = 60  # seconds between expansion passes
POLL_INTERVAL = 30  # seconds between robot poll passes
MAX_PER_PASS = 5  # nodes to expand per pass (CPU budget)

# Which scalar metrics on a polled state dict to buffer for EML fitting.
# Extend this list as more metrics become available from nwo_bridge.
BUFFERED_METRICS: tuple[str, ...] = ("battery_level",)
async def start_agent_loop(db, bitnet, ws_manager):
    """Main entry point: run the expansion loop and the robot-poll loop together.

    Looks up (or creates) the ``BitNet-GraphBot`` agent record, starts
    ``_expand_loop`` and ``_robot_poll_loop`` as tasks, and waits on both.

    Args:
        db: Database facade, handed through to both loops.
        bitnet: Model client, handed through to both loops.
        ws_manager: WebSocket manager, handed through to both loops.

    Raises:
        Exception: Whatever either loop raises if it terminates abnormally
            (for example a failing import inside ``_robot_poll_loop``).
            Cancellation of this coroutine is absorbed, as it was before,
            so a caller that cancels and then awaits it sees a plain return.
    """
    log.info("Agent loop started")
    agent = await _ensure_agent(db)
    tasks = (
        asyncio.create_task(_expand_loop(db, bitnet, ws_manager, agent)),
        asyncio.create_task(_robot_poll_loop(db, bitnet, ws_manager)),
    )
    try:
        await asyncio.gather(*tasks)
    except asyncio.CancelledError:
        # Kept for backward compatibility: shutdown via cancel() has always
        # ended this coroutine normally instead of re-raising.
        pass
    finally:
        # Runs on every exit path. If one loop died with an exception the
        # sibling would otherwise keep running with nobody awaiting it.
        for task in tasks:
            task.cancel()
        # Drain both tasks so their cancellation has actually completed
        # (and any exception has been retrieved) before we return.
        await asyncio.gather(*tasks, return_exceptions=True)
        log.info("Agent loop stopped")
async def _expand_loop(db, bitnet, ws_manager, agent):
    """Repeatedly pull pending graph nodes and expand them with BitNet.

    Each pass asks the database for at most ``MAX_PER_PASS`` pending nodes,
    expands them one after another with a 2-second pause after each, then
    sleeps ``EXPAND_INTERVAL`` seconds. Any non-cancellation error in a pass
    is logged with its traceback and the loop carries on; cancellation is
    re-raised so the task can be stopped.
    """
    while True:
        try:
            batch = await db.get_pending_expansions(MAX_PER_PASS)
            if batch:
                names = [entry['name'] for entry in batch]
                log.info(f"Found {len(batch)} nodes to expand: {names}")
            for entry in batch:
                await _expand_one(entry, db, bitnet, ws_manager, agent)
                # Short breather between nodes.
                await asyncio.sleep(2)
        except asyncio.CancelledError:
            raise
        except Exception as err:
            log.error(f"Expand loop error: {err}", exc_info=True)
        await asyncio.sleep(EXPAND_INTERVAL)
def _compose_child_description(parent_name, child):
    """Return ``(description, connection_reason)`` for a generated child node.

    The description is the child's own text, followed by a
    "Relationship to <parent>" paragraph when a connection reason exists and
    is not already contained in the text; it is stripped and capped at 1000
    characters. Missing or ``None`` fields are treated as empty strings.
    """
    base_desc = child.get("description") or ""
    connection_reason = child.get("connection_reason") or ""
    if connection_reason and connection_reason not in base_desc:
        base_desc = (
            base_desc + "\n\nRelationship to " + parent_name + ": " + connection_reason
        )
    return base_desc.strip()[:1000], connection_reason


async def _expand_one(node, db, bitnet, ws_manager, agent):
    """Expand a single node into child nodes and broadcast the results.

    Steps: flag the node as expanded, open a "running" expansion-queue entry,
    ask BitNet for children, create a node and a link for every child whose
    name is new, broadcast each new node, publish a feed post, and close the
    queue entry as "done". Any exception after the queue entry was opened is
    logged and recorded on the entry as "failed" rather than propagated.

    Args:
        node: Row of the node to expand; ``id`` and ``name`` are required,
            ``description``, ``depth_level`` and ``symbolic_law`` are optional.
        db: Database facade used for all reads and writes.
        bitnet: Model client providing ``expand_node`` and ``generate_post``.
        ws_manager: WebSocket manager used for live broadcasts.
        agent: Agent row whose ``id`` is stamped on everything created here.
    """
    log.info(f"Expanding: {node['name']}")
    # Mark as done IMMEDIATELY to prevent duplicate expansion on next loop pass
    # (even if expansion partially fails, we don't want infinite retries creating dupes)
    await db.mark_node_expanded(node["id"])
    log.info(f"  Marked {node['name']} as expand_done=true")

    # Mark in-progress in queue
    eq = await db.create_expansion_queue_entry({
        "node_id": node["id"],
        "model_used": "BitNet-b1.58-2B-4T",
        "status": "running",
        "started_at": datetime.now(timezone.utc).isoformat(),
    })

    try:
        # Pass the node's symbolic_law (if any) so BitNet can factor it
        # into the expansion. For telemetry nodes this injects the
        # closed-form law; for pure-concept nodes it's just None.
        children = await bitnet.expand_node(
            node["name"],
            node.get("description", ""),
            node.get("depth_level", 0),
            symbolic_law=node.get("symbolic_law"),
        )

        # Names (case-insensitive) that must not be created again. Seeded
        # from the children already stored, then extended as we create new
        # ones so repeats inside this same batch are filtered too.
        existing_children = await db.get_node_children(node["id"])
        seen_names = {n["name"].lower() for n in existing_children}
        log.info(f"  Existing children: {list(seen_names)}")

        created_nodes = []
        for child in children:
            name_key = child["name"].lower()
            if name_key in seen_names:
                log.info(f"  Skipping duplicate: {child['name']}")
                continue

            # Build rich description with connection reasoning
            full_desc, connection_reason = _compose_child_description(node["name"], child)

            new_node = await db.create_node({
                "name": child["name"],
                "description": full_desc,
                "category": child["category"],
                "val": child["confidence"] * 2,
                "color": "#534AB7",
                "depth_level": (node.get("depth_level") or 0) + 1,
                "actor_type": "agent",
                "agent_id": agent["id"],
                "expand_requested": False,
                "expand_done": False,
            })
            seen_names.add(name_key)

            await db.create_link({
                "source_id": node["id"],
                "target_id": new_node["id"],
                "similarity_score": child["confidence"],
                "link_type": "semantic",
                "created_by_actor_type": "agent",
                "created_by_id": agent["id"],
                "connection_reason": connection_reason[:500] if connection_reason else "",
            })
            created_nodes.append(new_node)

            # Broadcast new node live
            await ws_manager.broadcast({"type": "node", "data": new_node})

        # Generate + post feed update
        post_text = await bitnet.generate_post(
            "BitNet-GraphBot",
            f"Expanded '{node['name']}' into {len(children)} related concepts.",
        )
        post = await db.create_post({
            "node_id": node["id"],
            "content": post_text,
            "actor_type": "agent",
            "actor_id": agent["id"],
            "metadata": {"expanded_children": len(children), "model": "BitNet-b1.58-2B-4T"},
        })
        await ws_manager.broadcast({"type": "post", "data": post})

        await db.update_expansion_queue(eq["id"], {
            "status": "done",
            "completed_at": datetime.now(timezone.utc).isoformat(),
            "result_raw": str(children),
        })
        # Report what was actually written, not what the model proposed.
        log.info(
            f"  → {len(created_nodes)} of {len(children)} children created for '{node['name']}'"
        )
    except Exception as e:
        # Include the traceback, matching the error logging in _expand_loop.
        log.error(f"Expand failed for {node['name']}: {e}", exc_info=True)
        await db.update_expansion_queue(eq["id"], {
            "status": "failed",
            "error_msg": str(e)[:400],
        })
async def _robot_poll_loop(db, bitnet, ws_manager):
    """Poll active NWO robots forever and feed their state into the graph.

    Every pass fetches the active robots that have an NWO id and, for each
    one, reads its state, buffers the metrics named in ``BUFFERED_METRICS``
    (requesting a symbolic-law fit whenever the buffer says one is due), and
    passes the state to ``_ingest_robot_state``. A failure for one robot is
    logged at debug level and the next robot is tried; a failure of the pass
    itself is logged as an error. The loop then sleeps ``POLL_INTERVAL``
    seconds. Cancellation is re-raised.
    """
    from nwo_bridge import nwo_bridge

    async def _poll_one(robot):
        # Full handling of one robot; any exception is dealt with by the caller.
        state = await nwo_bridge.robot_state(robot["nwo_agent_id"])

        # -- Buffer scalar metrics for EML fitting --
        for metric in BUFFERED_METRICS:
            telemetry_buffers.add(
                robot_id=robot["id"],
                metric=metric,
                value=state.get(metric),
            )
            if telemetry_buffers.should_fit(robot["id"], metric):
                await _fit_symbolic_law(robot, metric, ws_manager)

        # -- Existing event-node path --
        await _ingest_robot_state(robot, state, db, bitnet, ws_manager)
        await asyncio.sleep(0.5)

    while True:
        try:
            for robot in await db.get_robots(active_only=True, has_nwo_id=True):
                try:
                    await _poll_one(robot)
                except Exception as err:
                    log.debug(f"Robot {robot['nwo_agent_id']} poll skipped: {err}")
        except asyncio.CancelledError:
            raise
        except Exception as err:
            log.error(f"Robot poll loop error: {err}")
        await asyncio.sleep(POLL_INTERVAL)
async def _fit_symbolic_law(robot, metric, ws_manager):
    """Request an EML fit for the buffered series of a (robot, metric) pair.

    On success, caches the law in telemetry_buffers and broadcasts a
    `law` event to WebSocket clients. Failures of the fit request are logged
    and ignored so that a fit problem never aborts the rest of the caller's
    work for this robot.

    Args:
        robot: Robot row; ``id`` and ``name`` are read, ``nwo_agent_id`` is
            optional and forwarded in the broadcast.
        metric: Name of the buffered metric to fit.
        ws_manager: WebSocket manager used to broadcast the ``law`` event.
    """
    snap = telemetry_buffers.snapshot(robot["id"], metric)
    if snap is None:
        return
    timestamps, values = snap

    log.info(f"Fitting EML law for robot={robot['name']} metric={metric} n={len(values)}")
    try:
        result = await eml_client.fit_series(
            timestamps=timestamps,
            values=values,
            metric_name=metric[:32],
            n_epochs=2000,
            seed=0,
        )
    except asyncio.CancelledError:
        # Never absorb cancellation; the poll loop must stay stoppable.
        raise
    except Exception as e:
        # Honour the "logged and ignored" contract locally instead of
        # relying on the client to swallow its own errors.
        log.warning(f"EML fit failed for robot={robot['name']} metric={metric}: {e}")
        return

    if not result or not result.get("simplified"):
        return

    # `or` fallbacks: a key that is present but null would otherwise make
    # float()/int() raise TypeError.
    telemetry_buffers.record_fit(
        robot["id"],
        metric,
        expression=result.get("simplified", ""),
        raw_expression=result.get("expression") or "",
        final_loss=float(result.get("final_loss") or 0.0),
        tree_size=int(result.get("tree_size") or 0),
        depth_used=int(result.get("depth_used") or 0),
    )

    await ws_manager.broadcast({
        "type": "law",
        "data": {
            "robot_id": robot["id"],
            "robot_name": robot["name"],
            "nwo_agent_id": robot.get("nwo_agent_id"),
            "metric": metric,
            "expression": result["simplified"],
            "final_loss": result.get("final_loss"),
            "tree_size": result.get("tree_size"),
            "depth_used": result.get("depth_used"),
            "paper_reference": result.get("paper_reference", "arXiv:2603.21852"),
        },
    })
# Last task ingested/observed per robot id. Nothing in this module ever wrote
# the `_last_task` key that the change check reads, so without this cache
# every poll with a non-empty task looked like a task change.
_last_task_by_robot: dict = {}


async def _ingest_robot_state(robot, state, db, bitnet, ws_manager):
    """Convert a polled robot state into a graph node + post if interesting.

    A state is interesting when the battery is below 20 or when the robot
    reports a non-empty task different from the last one seen for it. For an
    interesting state the event is classified by BitNet, stored as a node
    (carrying the cached battery law when one exists), announced with a
    post, and both are broadcast to WebSocket clients.

    Args:
        robot: Robot row; ``id``, ``name`` and ``nwo_agent_id`` are read. An
            optional ``_last_task`` key, when present, takes precedence over
            the module's own last-task cache.
        state: Polled state dict; ``battery_level``, ``current_task`` and
            ``position`` are read.
        db: Database facade used to create the node and the post.
        bitnet: Model client providing ``classify_robot_event`` and
            ``generate_post``.
        ws_manager: WebSocket manager used for live broadcasts.
    """
    battery = state.get("battery_level")
    task = state.get("current_task")

    if "_last_task" in robot:
        previous_task = robot["_last_task"]
    else:
        previous_task = _last_task_by_robot.get(robot["id"])

    # Only create nodes for significant events (avoid flooding the graph)
    low_battery = battery is not None and battery < 20
    task_changed = bool(task) and task != previous_task
    if not (low_battery or task_changed):
        _last_task_by_robot[robot["id"]] = task
        return

    event_desc = (
        f"Robot {robot['name']} update. "
        f"Battery: {battery}%. Task: {task or 'idle'}."
    )

    # If we have a cached EML law for this robot's battery_level, fold
    # it into the event description so the classifier sees it too.
    law = telemetry_buffers.get_law(robot["id"], "battery_level")
    if law is not None:
        event_desc += f" Battery law: {law.expression} (loss={law.final_loss:.2e})."

    node_def = await bitnet.classify_robot_event(event_desc, "telemetry")

    node_payload = {
        "name": node_def["name"],
        "description": node_def["description"],
        "category": node_def["category"],
        "val": node_def["val"],
        "color": node_def["color"],
        "depth_level": 1,
        "actor_type": "robot",
        "agent_id": robot["id"],
        "nwo_agent_id": robot["nwo_agent_id"],
        "battery_level": battery,
        "robot_position": state.get("position"),
        "expand_requested": True,
        "expand_done": False,
    }
    # Attach symbolic law as first-class node attributes when available.
    # Requires the schema_migration.sql to have been applied.
    if law is not None:
        node_payload["symbolic_law"] = law.expression[:500]
        node_payload["law_loss"] = law.final_loss

    node = await db.create_node(node_payload)
    # Remember the task only once the node exists, so a failed write is
    # retried on the next poll instead of silently dropping the event.
    _last_task_by_robot[robot["id"]] = task

    post_text = await bitnet.generate_post(robot["name"], event_desc)
    post = await db.create_post({
        "node_id": node["id"],
        "content": post_text,
        "actor_type": "robot",
        "actor_id": robot["id"],
        "nwo_agent_id": robot["nwo_agent_id"],
        "metadata": {"battery": battery, "task": task, "position": state.get("position")},
    })

    await ws_manager.broadcast({"type": "node", "data": node})
    await ws_manager.broadcast({"type": "post", "data": post})
    log.info(f"Ingested robot state: {robot['name']} → {node['name']}")
async def _ensure_agent(db) -> dict:
    """Return the ``BitNet-GraphBot`` agent record, creating it if absent.

    Looks the agent up by name first; only when that lookup yields nothing
    truthy is a new agent row created and returned instead.
    """
    record = await db.get_agent_by_name("BitNet-GraphBot")
    if not record:
        record = await db.create_agent({
            "name": "BitNet-GraphBot",
            "agent_type": "ai_agent",
            "capabilities": ["node_expansion", "classification", "post_generation"],
            "persona_color": "#534AB7",
            "avatar_label": "AI",
            "is_active": True,
        })
    return record