File size: 6,738 Bytes
702e56d
 
 
 
 
 
 
 
 
 
 
 
 
a25490a
 
 
 
702e56d
 
 
 
 
a25490a
 
 
702e56d
 
 
 
a25490a
702e56d
 
 
 
 
 
 
 
a25490a
702e56d
 
 
 
 
 
a25490a
 
 
 
702e56d
 
a25490a
702e56d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a25490a
702e56d
 
 
a25490a
 
 
702e56d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a25490a
702e56d
 
 
 
a25490a
 
 
702e56d
 
 
 
 
 
 
 
a25490a
 
702e56d
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#!/usr/bin/env python3
"""
MEP CLI Provider
A specialized node that routes tasks to local autonomous CLI agents 
(e.g., Aider, Claude-Code, Open-Interpreter).
"""
import asyncio
import websockets
import json
import requests
import uuid
import os
import shlex
import time
import urllib.parse
import tempfile
from identity import MEPIdentity

HUB_URL = "http://localhost:8000"
WS_URL = "ws://localhost:8000"

class MEPCLIProvider:
    def __init__(self, key_path: str):
        self.identity = MEPIdentity(key_path)
        self.node_id = self.identity.node_id
        self.balance = 0.0
        self.is_contributing = True
        self.capabilities = ["cli-agent", "bash", "python"]
        
        self.workspace_dir = os.path.join(tempfile.gettempdir(), "mep_workspaces")
        os.makedirs(self.workspace_dir, exist_ok=True)
        
    async def connect(self):
        """Connect to MEP Hub and start listening for CLI tasks."""
        print(f"[CLI Provider {self.node_id}] Starting...")
        
        # Register with hub
        try:
            resp = requests.post(f"{HUB_URL}/register", json={"pubkey": self.identity.pub_pem})
            self.balance = resp.json().get("balance", 0.0)
            print(f"[CLI Provider] Registered. Balance: {self.balance:.6f} SECONDS")
        except Exception as e:
            print(f"[CLI Provider] Registration failed: {e}")
            return
            
        ts = str(int(time.time()))
        sig = self.identity.sign(self.node_id, ts)
        sig_safe = urllib.parse.quote(sig)
        uri = f"{WS_URL}/ws/{self.node_id}?timestamp={ts}&signature={sig_safe}"
        try:
            async with websockets.connect(uri) as ws:
                print("[CLI Provider] Connected to MEP Hub. Awaiting CLI tasks...")
                while self.is_contributing:
                    try:
                        msg = await asyncio.wait_for(ws.recv(), timeout=1.0)
                        data = json.loads(msg)
                        
                        if data["event"] == "new_task":
                            await self.process_task(data["data"])
                        elif data["event"] == "rfc":
                            await self.handle_rfc(data["data"])
                            
                    except asyncio.TimeoutError:
                        continue
                    except websockets.exceptions.ConnectionClosed:
                        print("[CLI Provider] Connection closed")
                        break
        except Exception as e:
            print(f"[CLI Provider] WebSocket error: {e}")

    async def handle_rfc(self, rfc_data: dict):
        """Evaluate if we should bid on this CLI task."""
        task_id = rfc_data["id"]
        bounty = rfc_data["bounty"]
        model = rfc_data.get("model_requirement")
        
        # Only bid if the consumer specifically requested a CLI agent
        if model not in self.capabilities and model is not None:
            return
            
        print(f"[CLI Provider] Received matching RFC {task_id[:8]} for {bounty:.6f} SECONDS. Bidding...")
        
        try:
            payload_str = json.dumps({
                "task_id": task_id,
                "provider_id": self.node_id
            })
            headers = self.identity.get_auth_headers(payload_str)
            headers["Content-Type"] = "application/json"
            resp = requests.post(f"{HUB_URL}/tasks/bid", data=payload_str, headers=headers)
            
            if resp.status_code == 200:
                data = resp.json()
                if data["status"] == "accepted":
                    print(f"[CLI Provider] 🏁 BID WON! Executing CLI agent for task {task_id[:8]}...")
                    task_data = {
                        "id": task_id,
                        "payload": data["payload"],
                        "bounty": bounty,
                        "consumer_id": data["consumer_id"]
                    }
                    # Run it in background so we don't block the websocket
                    asyncio.create_task(self.process_task(task_data))
        except Exception as e:
            print(f"[CLI Provider] Error placing bid: {e}")

    async def process_task(self, task_data: dict):
        """Execute the task using a local CLI agent."""
        task_id = task_data["id"]
        payload = task_data["payload"]
        bounty = task_data["bounty"]
        
        # Create an isolated workspace for this task
        task_dir = os.path.join(self.workspace_dir, task_id)
        os.makedirs(task_dir, exist_ok=True)
        
        # Safely escape the payload to prevent shell injection
        safe_payload = shlex.quote(payload)
        
        # --- COMMAND TEMPLATE ---
        # Replace this with: f"aider --message {safe_payload}" 
        # or: f"claude-code --print {safe_payload}"
        cmd = f"echo '⚙️ Booting Autonomous CLI Agent...' && sleep 1 && echo 'Analyzing: {safe_payload}' && sleep 1 && echo '✅ Code generated and saved to workspace.'"
        
        print(f"\n[CLI Agent] Executing in {task_dir}:")
        print(f"$ {cmd[:100]}...\n")
        
        # Run the subprocess
        process = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=task_dir
        )
        
        stdout, stderr = await process.communicate()
        
        output = stdout.decode().strip()
        if stderr:
            output += "\n[Errors/Warnings]:\n" + stderr.decode().strip()
            
        print(f"[CLI Agent] Finished with exit code {process.returncode}")
        
        # Construct final result payload
        result_payload = f"```bash\n{output}\n```\n*Workspace: {task_dir}*"
        
        # Submit result back to Hub
        payload_str = json.dumps({
            "task_id": task_id,
            "provider_id": self.node_id,
            "result_payload": result_payload
        })
        headers = self.identity.get_auth_headers(payload_str)
        headers["Content-Type"] = "application/json"
        requests.post(f"{HUB_URL}/tasks/complete", data=payload_str, headers=headers)
        print(f"[CLI Provider] Result submitted! Earned {bounty:.6f} SECONDS.\n")

if __name__ == "__main__":
    print("=" * 60)
    print("MEP Autonomous CLI Provider")
    print("WARNING: This node executes shell commands. Use sandboxing!")
    print("=" * 60)
    
    key_path = f"cli_provider_{uuid.uuid4().hex[:6]}.pem"
    provider = MEPCLIProvider(key_path)
    
    try:
        asyncio.run(provider.connect())
    except KeyboardInterrupt:
        provider.is_contributing = False
        print("\n[MEP] CLI Provider shut down.")