MEP / node /client.py
WUAIBING
prepare
a25490a
import asyncio
import json
import websockets
import requests
import uuid
import time
import urllib.parse
from reputation import ReputationManager
from identity import MEPIdentity
class ChronosNode:
"""
Simulated Clawdbot Client (Both Consumer & Provider)
"""
def __init__(self, key_path: str, hub_url: str = "http://localhost:8000", ws_url: str = "ws://localhost:8000"):
self.identity = MEPIdentity(key_path)
self.node_id = self.identity.node_id
self.hub_url = hub_url
self.ws_url = ws_url
self.reputation = ReputationManager(storage_path=f"reputation_{self.node_id}.json")
self.is_sleeping = False
# Track pending tasks we created (Consumer)
self.my_pending_tasks: dict[str, dict] = {}
def register(self):
"""Register to get 10 SECONDS."""
print(f"[Node {self.node_id}] Registering with Hub...")
resp = requests.post(f"{self.hub_url}/register", json={"pubkey": self.identity.pub_pem, "alias": "test"})
data = resp.json()
print(f"[Node {self.node_id}] Balance: {data['balance']}s")
async def _handle_new_task(self, task_data: dict):
"""As a Provider (Sleeping), evaluate and execute the broadcasted task."""
if not self.is_sleeping:
return # Awake nodes don't mine
task_id = task_data["id"]
payload = task_data["payload"]
bounty = task_data["bounty"]
print(f"[Node {self.node_id}] Broadcast received: Task {task_id[:6]} for {bounty}s")
# 1. Check L2 Reputation of Consumer (Don't work for bad nodes)
# (Mock implementation, skipping for brevity)
# 2. Local execution using user's API keys (Mocked)
await asyncio.sleep(1) # simulate think time
result = f"Hello from {self.node_id}. I processed your payload: {payload[:20]}..."
# 3. Submit proof of work
payload_str = json.dumps({
"task_id": task_id,
"provider_id": self.node_id,
"result_payload": result
})
headers = self.identity.get_auth_headers(payload_str)
headers["Content-Type"] = "application/json"
resp = requests.post(f"{self.hub_url}/tasks/complete", data=payload_str, headers=headers)
if resp.status_code == 200:
print(f"[Node {self.node_id}] Mined {bounty}s! New Balance: {resp.json()['new_balance']}s")
async def _handle_task_result(self, result_data: dict):
"""As a Consumer, receive the result and update Reputation."""
task_id = result_data["task_id"]
provider_id = result_data["provider_id"]
result = result_data["result_payload"]
print(f"\n[Node {self.node_id} (Consumer)] Result received for {task_id[:6]}")
print(f" -> Provider: {provider_id}")
print(f" -> Result: {result}")
# L2 Reputation Evaluation
score = self.reputation.evaluate_result(provider_id, result)
print(f" -> Provider {provider_id} rated {score:.2f}/1.00 based on this result.\n")
async def listen(self):
"""Persistent WebSocket connection."""
ts = str(int(time.time()))
sig = self.identity.sign(self.node_id, ts)
sig_safe = urllib.parse.quote(sig)
uri = f"{self.ws_url}/ws/{self.node_id}?timestamp={ts}&signature={sig_safe}"
async with websockets.connect(uri) as ws:
print(f"[Node {self.node_id}] Connected to Hub via WebSocket.")
while True:
msg = await ws.recv()
data = json.loads(msg)
if data["event"] == "new_task":
asyncio.create_task(self._handle_new_task(data["data"]))
elif data["event"] == "task_result":
asyncio.create_task(self._handle_task_result(data["data"]))
async def submit_task(self, payload: str, bounty: float):
"""As a Consumer, create a task and lock SECONDS."""
payload_str = json.dumps({
"consumer_id": self.node_id,
"payload": payload,
"bounty": bounty
})
headers = self.identity.get_auth_headers(payload_str)
headers["Content-Type"] = "application/json"
resp = requests.post(f"{self.hub_url}/tasks/submit", data=payload_str, headers=headers)
if resp.status_code == 200:
task_id = resp.json()["task_id"]
print(f"[Node {self.node_id} (Consumer)] Submitted Task {task_id[:6]} for {bounty}s")
else:
print(f"Failed to submit task: {resp.text}")
async def run_demo():
# Setup two nodes
usa_node = ChronosNode(f"usa_node_{uuid.uuid4().hex[:6]}.pem")
usa_node.is_sleeping = False
usa_node.register()
asia_node = ChronosNode(f"asia_node_{uuid.uuid4().hex[:6]}.pem")
asia_node.is_sleeping = True # Asia goes to sleep and mines
asia_node.register()
# Start listening
tasks = [
asyncio.create_task(usa_node.listen()),
asyncio.create_task(asia_node.listen())
]
# Wait for connections
await asyncio.sleep(1)
# USA user creates a task
await usa_node.submit_task("Write a long chapter about the Sleeping API", bounty=5.0)
# Let the simulation run for a few seconds
await asyncio.sleep(3)
# Stop tasks
for t in tasks:
t.cancel()
if __name__ == "__main__":
asyncio.run(run_demo())