#!/usr/bin/env python3 """ MEP CLI Provider A specialized node that routes tasks to local autonomous CLI agents (e.g., Aider, Claude-Code, Open-Interpreter). """ import asyncio import websockets import json import requests import uuid import os import shlex import time import urllib.parse import tempfile from identity import MEPIdentity HUB_URL = "http://localhost:8000" WS_URL = "ws://localhost:8000" class MEPCLIProvider: def __init__(self, key_path: str): self.identity = MEPIdentity(key_path) self.node_id = self.identity.node_id self.balance = 0.0 self.is_contributing = True self.capabilities = ["cli-agent", "bash", "python"] self.workspace_dir = os.path.join(tempfile.gettempdir(), "mep_workspaces") os.makedirs(self.workspace_dir, exist_ok=True) async def connect(self): """Connect to MEP Hub and start listening for CLI tasks.""" print(f"[CLI Provider {self.node_id}] Starting...") # Register with hub try: resp = requests.post(f"{HUB_URL}/register", json={"pubkey": self.identity.pub_pem}) self.balance = resp.json().get("balance", 0.0) print(f"[CLI Provider] Registered. Balance: {self.balance:.6f} SECONDS") except Exception as e: print(f"[CLI Provider] Registration failed: {e}") return ts = str(int(time.time())) sig = self.identity.sign(self.node_id, ts) sig_safe = urllib.parse.quote(sig) uri = f"{WS_URL}/ws/{self.node_id}?timestamp={ts}&signature={sig_safe}" try: async with websockets.connect(uri) as ws: print("[CLI Provider] Connected to MEP Hub. Awaiting CLI tasks...") while self.is_contributing: try: msg = await asyncio.wait_for(ws.recv(), timeout=1.0) data = json.loads(msg) if data["event"] == "new_task": await self.process_task(data["data"]) elif data["event"] == "rfc": await self.handle_rfc(data["data"]) except asyncio.TimeoutError: continue except websockets.exceptions.ConnectionClosed: print("[CLI Provider] Connection closed") break except Exception as e: print(f"[CLI Provider] WebSocket error: {e}") async def handle_rfc(self, rfc_data: dict): """Evaluate if we should bid on this CLI task.""" task_id = rfc_data["id"] bounty = rfc_data["bounty"] model = rfc_data.get("model_requirement") # Only bid if the consumer specifically requested a CLI agent if model not in self.capabilities and model is not None: return print(f"[CLI Provider] Received matching RFC {task_id[:8]} for {bounty:.6f} SECONDS. Bidding...") try: payload_str = json.dumps({ "task_id": task_id, "provider_id": self.node_id }) headers = self.identity.get_auth_headers(payload_str) headers["Content-Type"] = "application/json" resp = requests.post(f"{HUB_URL}/tasks/bid", data=payload_str, headers=headers) if resp.status_code == 200: data = resp.json() if data["status"] == "accepted": print(f"[CLI Provider] 🏁 BID WON! Executing CLI agent for task {task_id[:8]}...") task_data = { "id": task_id, "payload": data["payload"], "bounty": bounty, "consumer_id": data["consumer_id"] } # Run it in background so we don't block the websocket asyncio.create_task(self.process_task(task_data)) except Exception as e: print(f"[CLI Provider] Error placing bid: {e}") async def process_task(self, task_data: dict): """Execute the task using a local CLI agent.""" task_id = task_data["id"] payload = task_data["payload"] bounty = task_data["bounty"] # Create an isolated workspace for this task task_dir = os.path.join(self.workspace_dir, task_id) os.makedirs(task_dir, exist_ok=True) # Safely escape the payload to prevent shell injection safe_payload = shlex.quote(payload) # --- COMMAND TEMPLATE --- # Replace this with: f"aider --message {safe_payload}" # or: f"claude-code --print {safe_payload}" cmd = f"echo '⚙️ Booting Autonomous CLI Agent...' && sleep 1 && echo 'Analyzing: {safe_payload}' && sleep 1 && echo '✅ Code generated and saved to workspace.'" print(f"\n[CLI Agent] Executing in {task_dir}:") print(f"$ {cmd[:100]}...\n") # Run the subprocess process = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=task_dir ) stdout, stderr = await process.communicate() output = stdout.decode().strip() if stderr: output += "\n[Errors/Warnings]:\n" + stderr.decode().strip() print(f"[CLI Agent] Finished with exit code {process.returncode}") # Construct final result payload result_payload = f"```bash\n{output}\n```\n*Workspace: {task_dir}*" # Submit result back to Hub payload_str = json.dumps({ "task_id": task_id, "provider_id": self.node_id, "result_payload": result_payload }) headers = self.identity.get_auth_headers(payload_str) headers["Content-Type"] = "application/json" requests.post(f"{HUB_URL}/tasks/complete", data=payload_str, headers=headers) print(f"[CLI Provider] Result submitted! Earned {bounty:.6f} SECONDS.\n") if __name__ == "__main__": print("=" * 60) print("MEP Autonomous CLI Provider") print("WARNING: This node executes shell commands. Use sandboxing!") print("=" * 60) key_path = f"cli_provider_{uuid.uuid4().hex[:6]}.pem" provider = MEPCLIProvider(key_path) try: asyncio.run(provider.connect()) except KeyboardInterrupt: provider.is_contributing = False print("\n[MEP] CLI Provider shut down.")