File size: 6,738 Bytes
702e56d a25490a 702e56d a25490a 702e56d a25490a 702e56d a25490a 702e56d a25490a 702e56d a25490a 702e56d a25490a 702e56d a25490a 702e56d a25490a 702e56d a25490a 702e56d a25490a 702e56d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 | #!/usr/bin/env python3
"""
MEP CLI Provider
A specialized node that routes tasks to local autonomous CLI agents
(e.g., Aider, Claude-Code, Open-Interpreter).
"""
import asyncio
import websockets
import json
import requests
import uuid
import os
import shlex
import time
import urllib.parse
import tempfile
from identity import MEPIdentity
# Base URL of the hub's HTTP API; the provider posts to /register,
# /tasks/bid and /tasks/complete underneath it.
HUB_URL = "http://localhost:8000"
# Base URL of the hub's WebSocket endpoint; the provider connects to
# /ws/<node_id> underneath it.
WS_URL = "ws://localhost:8000"
class MEPCLIProvider:
    """Provider node that bids on hub tasks and runs them via a local shell command.

    The node registers with the hub over HTTP, listens on a WebSocket for
    ``new_task`` and ``rfc`` events, executes each accepted task inside its
    own directory under ``workspace_dir`` and posts the captured output back
    to the hub.
    """

    # Upper bound, in seconds, for any single HTTP request to the hub so a
    # stalled hub cannot hang the provider indefinitely.
    HTTP_TIMEOUT = 10.0

    def __init__(self, key_path: str):
        """Load the node identity and create the workspace root directory.

        Args:
            key_path: Path passed to ``MEPIdentity`` (presumably the node's
                PEM key file -- confirm against the identity module).
        """
        self.identity = MEPIdentity(key_path)
        self.node_id = self.identity.node_id
        self.balance = 0.0
        # The receive loop in connect() keeps running while this is True.
        self.is_contributing = True
        self.capabilities = ["cli-agent", "bash", "python"]
        self.workspace_dir = os.path.join(tempfile.gettempdir(), "mep_workspaces")
        os.makedirs(self.workspace_dir, exist_ok=True)
        # Strong references to in-flight background tasks; the event loop
        # itself only keeps weak references to tasks.
        self._background_tasks = set()

    # ------------------------------------------------------------------
    # Pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _workspace_name(task_id) -> str:
        """Map a hub-supplied task id to a single safe directory name.

        Every character other than alphanumerics, ``-`` and ``_`` becomes
        ``_``, so path separators and ``..`` cannot escape the workspace
        root. An empty id falls back to ``"task"``.
        """
        cleaned = "".join(
            ch if ch.isalnum() or ch in "-_" else "_" for ch in str(task_id)
        )
        return cleaned or "task"

    @staticmethod
    def _build_command(payload) -> str:
        """Build the shell command line that is executed for ``payload``.

        The whole "Analyzing: ..." message is quoted as ONE shell word.
        Interpolating an already-quoted payload inside another pair of single
        quotes would let the payload's own quotes close the outer ones and
        expose its content to the shell.
        """
        # --- COMMAND TEMPLATE ---
        # Replace this with: f"aider --message {shlex.quote(str(payload))}"
        # or: f"claude-code --print {shlex.quote(str(payload))}"
        analyzing = shlex.quote(f"Analyzing: {payload}")
        return (
            "echo '⚙️ Booting Autonomous CLI Agent...' && sleep 1 && "
            f"echo {analyzing} && sleep 1 && "
            "echo '✅ Code generated and saved to workspace.'"
        )

    @staticmethod
    def _format_result(stdout: bytes, stderr: bytes, task_dir: str) -> str:
        """Render captured process output as the result text sent to the hub.

        Bytes that are not valid UTF-8 are replaced instead of raising, so
        unusual agent output cannot abort result submission.
        """
        output = stdout.decode(errors="replace").strip()
        if stderr:
            output += "\n[Errors/Warnings]:\n" + stderr.decode(errors="replace").strip()
        return f"```bash\n{output}\n```\n*Workspace: {task_dir}*"

    # ------------------------------------------------------------------
    # Hub I/O helpers
    # ------------------------------------------------------------------
    async def _post(self, path: str, **kwargs):
        """POST to ``HUB_URL + path`` without blocking the event loop.

        ``requests`` is synchronous, so the call runs in the default executor.
        A timeout of ``HTTP_TIMEOUT`` is applied unless the caller passes one.
        """
        kwargs.setdefault("timeout", self.HTTP_TIMEOUT)
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, lambda: requests.post(f"{HUB_URL}{path}", **kwargs)
        )

    async def _post_signed(self, path: str, body: dict):
        """Serialize ``body`` to JSON, attach the identity's auth headers, POST it."""
        payload_str = json.dumps(body)
        headers = self.identity.get_auth_headers(payload_str)
        headers["Content-Type"] = "application/json"
        return await self._post(path, data=payload_str, headers=headers)

    def _spawn(self, coro):
        """Schedule ``coro`` as a background task and keep it referenced until done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def _dispatch(self, raw_message):
        """Decode one WebSocket message and route it to the matching handler.

        Malformed messages and handler failures are reported and skipped so a
        single bad message does not tear down the hub connection.
        """
        try:
            data = json.loads(raw_message)
        except ValueError as e:
            print(f"[CLI Provider] Ignoring undecodable message: {e}")
            return

        event = data.get("event") if isinstance(data, dict) else None
        if event == "new_task":
            handler = self.process_task
        elif event == "rfc":
            handler = self.handle_rfc
        else:
            # Unknown (or missing) event types are ignored.
            return

        try:
            await handler(data["data"])
        except Exception as e:
            print(f"[CLI Provider] Error handling '{event}' message: {e}")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    async def connect(self):
        """Connect to MEP Hub and start listening for CLI tasks.

        Registers the node's public key, opens the signed WebSocket
        connection and processes incoming events until ``is_contributing``
        becomes False or the connection closes. Returns early (after printing
        the reason) if registration fails.
        """
        print(f"[CLI Provider {self.node_id}] Starting...")

        # Register with hub
        try:
            resp = await self._post("/register", json={"pubkey": self.identity.pub_pem})
            self.balance = resp.json().get("balance", 0.0)
            print(f"[CLI Provider] Registered. Balance: {self.balance:.6f} SECONDS")
        except Exception as e:
            print(f"[CLI Provider] Registration failed: {e}")
            return

        # The WebSocket URL carries a timestamp and a signature over
        # (node_id, timestamp) as query parameters.
        ts = str(int(time.time()))
        sig = self.identity.sign(self.node_id, ts)
        sig_safe = urllib.parse.quote(sig)
        uri = f"{WS_URL}/ws/{self.node_id}?timestamp={ts}&signature={sig_safe}"

        try:
            async with websockets.connect(uri) as ws:
                print("[CLI Provider] Connected to MEP Hub. Awaiting CLI tasks...")
                while self.is_contributing:
                    try:
                        # Short timeout so the is_contributing flag is
                        # re-checked about once per second.
                        msg = await asyncio.wait_for(ws.recv(), timeout=1.0)
                    except asyncio.TimeoutError:
                        continue
                    except websockets.exceptions.ConnectionClosed:
                        print("[CLI Provider] Connection closed")
                        break
                    await self._dispatch(msg)
        except Exception as e:
            print(f"[CLI Provider] WebSocket error: {e}")

    async def handle_rfc(self, rfc_data: dict):
        """Evaluate if we should bid on this CLI task.

        Args:
            rfc_data: RFC message body; reads ``id``, ``bounty`` and the
                optional ``model_requirement``.

        A bid is placed when the RFC names no model requirement or names one
        of ``self.capabilities``. If the hub accepts the bid, the task is
        started in the background.
        """
        task_id = rfc_data["id"]
        bounty = rfc_data["bounty"]
        model = rfc_data.get("model_requirement")

        # Skip RFCs that explicitly require something we do not offer.
        if model is not None and model not in self.capabilities:
            return

        print(f"[CLI Provider] Received matching RFC {task_id[:8]} for {bounty:.6f} SECONDS. Bidding...")
        try:
            resp = await self._post_signed(
                "/tasks/bid",
                {"task_id": task_id, "provider_id": self.node_id},
            )
            if resp.status_code == 200:
                data = resp.json()
                if data["status"] == "accepted":
                    print(f"[CLI Provider] 🏁 BID WON! Executing CLI agent for task {task_id[:8]}...")
                    task_data = {
                        "id": task_id,
                        "payload": data["payload"],
                        "bounty": bounty,
                        "consumer_id": data["consumer_id"],
                    }
                    # Run it in background so we don't block the websocket
                    self._spawn(self.process_task(task_data))
        except Exception as e:
            print(f"[CLI Provider] Error placing bid: {e}")

    async def process_task(self, task_data: dict):
        """Execute the task using a local CLI agent.

        Args:
            task_data: Task description; reads ``id``, ``payload`` and
                ``bounty``.

        Runs the command from ``_build_command`` in a per-task workspace
        directory, then submits the captured output to the hub. Submission
        failures are printed rather than raised.
        """
        task_id = task_data["id"]
        payload = task_data["payload"]
        bounty = task_data["bounty"]

        # Create an isolated workspace for this task. The id comes from the
        # hub, so it is reduced to a safe directory name first.
        task_dir = os.path.join(self.workspace_dir, self._workspace_name(task_id))
        os.makedirs(task_dir, exist_ok=True)

        cmd = self._build_command(payload)
        print(f"\n[CLI Agent] Executing in {task_dir}:")
        print(f"$ {cmd[:100]}...\n")

        # Run the subprocess
        process = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=task_dir,
        )
        stdout, stderr = await process.communicate()
        print(f"[CLI Agent] Finished with exit code {process.returncode}")

        # Construct final result payload
        result_payload = self._format_result(stdout, stderr, task_dir)

        # Submit result back to Hub (with the original, unsanitized task id).
        try:
            resp = await self._post_signed(
                "/tasks/complete",
                {
                    "task_id": task_id,
                    "provider_id": self.node_id,
                    "result_payload": result_payload,
                },
            )
        except Exception as e:
            print(f"[CLI Provider] Failed to submit result for task {str(task_id)[:8]}: {e}")
            return

        if resp.status_code == 200:
            print(f"[CLI Provider] Result submitted! Earned {bounty:.6f} SECONDS.\n")
        else:
            print(
                f"[CLI Provider] Hub rejected result for task {str(task_id)[:8]} "
                f"(HTTP {resp.status_code})."
            )
if __name__ == "__main__":
    # Startup banner: title and safety warning framed by two 60-character rules.
    rule = "=" * 60
    print(
        rule,
        "MEP Autonomous CLI Provider",
        "WARNING: This node executes shell commands. Use sandboxing!",
        rule,
        sep="\n",
    )

    # Each launch uses a key file name with a fresh random 6-hex-digit suffix.
    key_path = "cli_provider_" + uuid.uuid4().hex[:6] + ".pem"
    provider = MEPCLIProvider(key_path)

    try:
        asyncio.run(provider.connect())
    except KeyboardInterrupt:
        # Ctrl-C: mark the provider as stopped and say goodbye.
        provider.is_contributing = False
        print("\n[MEP] CLI Provider shut down.")
|