| | import asyncio |
| | import json |
| | import websockets |
| | import requests |
| | import uuid |
| | import time |
| | import urllib.parse |
| | from reputation import ReputationManager |
| | from identity import MEPIdentity |
| |
|
| | class ChronosNode: |
| | """ |
| | Simulated Clawdbot Client (Both Consumer & Provider) |
| | """ |
| | def __init__(self, key_path: str, hub_url: str = "http://localhost:8000", ws_url: str = "ws://localhost:8000"): |
| | self.identity = MEPIdentity(key_path) |
| | self.node_id = self.identity.node_id |
| | self.hub_url = hub_url |
| | self.ws_url = ws_url |
| | self.reputation = ReputationManager(storage_path=f"reputation_{self.node_id}.json") |
| | self.is_sleeping = False |
| | |
| | |
| | self.my_pending_tasks: dict[str, dict] = {} |
| |
|
| | def register(self): |
| | """Register to get 10 SECONDS.""" |
| | print(f"[Node {self.node_id}] Registering with Hub...") |
| | resp = requests.post(f"{self.hub_url}/register", json={"pubkey": self.identity.pub_pem, "alias": "test"}) |
| | data = resp.json() |
| | print(f"[Node {self.node_id}] Balance: {data['balance']}s") |
| |
|
| | async def _handle_new_task(self, task_data: dict): |
| | """As a Provider (Sleeping), evaluate and execute the broadcasted task.""" |
| | if not self.is_sleeping: |
| | return |
| |
|
| | task_id = task_data["id"] |
| | payload = task_data["payload"] |
| | bounty = task_data["bounty"] |
| | print(f"[Node {self.node_id}] Broadcast received: Task {task_id[:6]} for {bounty}s") |
| | |
| | |
| | |
| | |
| | |
| | await asyncio.sleep(1) |
| | result = f"Hello from {self.node_id}. I processed your payload: {payload[:20]}..." |
| | |
| | |
| | payload_str = json.dumps({ |
| | "task_id": task_id, |
| | "provider_id": self.node_id, |
| | "result_payload": result |
| | }) |
| | headers = self.identity.get_auth_headers(payload_str) |
| | headers["Content-Type"] = "application/json" |
| | resp = requests.post(f"{self.hub_url}/tasks/complete", data=payload_str, headers=headers) |
| | if resp.status_code == 200: |
| | print(f"[Node {self.node_id}] Mined {bounty}s! New Balance: {resp.json()['new_balance']}s") |
| |
|
| | async def _handle_task_result(self, result_data: dict): |
| | """As a Consumer, receive the result and update Reputation.""" |
| | task_id = result_data["task_id"] |
| | provider_id = result_data["provider_id"] |
| | result = result_data["result_payload"] |
| | |
| | print(f"\n[Node {self.node_id} (Consumer)] Result received for {task_id[:6]}") |
| | print(f" -> Provider: {provider_id}") |
| | print(f" -> Result: {result}") |
| | |
| | |
| | score = self.reputation.evaluate_result(provider_id, result) |
| | print(f" -> Provider {provider_id} rated {score:.2f}/1.00 based on this result.\n") |
| |
|
| | async def listen(self): |
| | """Persistent WebSocket connection.""" |
| | ts = str(int(time.time())) |
| | sig = self.identity.sign(self.node_id, ts) |
| | sig_safe = urllib.parse.quote(sig) |
| | uri = f"{self.ws_url}/ws/{self.node_id}?timestamp={ts}&signature={sig_safe}" |
| | async with websockets.connect(uri) as ws: |
| | print(f"[Node {self.node_id}] Connected to Hub via WebSocket.") |
| | while True: |
| | msg = await ws.recv() |
| | data = json.loads(msg) |
| | |
| | if data["event"] == "new_task": |
| | asyncio.create_task(self._handle_new_task(data["data"])) |
| | elif data["event"] == "task_result": |
| | asyncio.create_task(self._handle_task_result(data["data"])) |
| |
|
| | async def submit_task(self, payload: str, bounty: float): |
| | """As a Consumer, create a task and lock SECONDS.""" |
| | payload_str = json.dumps({ |
| | "consumer_id": self.node_id, |
| | "payload": payload, |
| | "bounty": bounty |
| | }) |
| | headers = self.identity.get_auth_headers(payload_str) |
| | headers["Content-Type"] = "application/json" |
| | resp = requests.post(f"{self.hub_url}/tasks/submit", data=payload_str, headers=headers) |
| | if resp.status_code == 200: |
| | task_id = resp.json()["task_id"] |
| | print(f"[Node {self.node_id} (Consumer)] Submitted Task {task_id[:6]} for {bounty}s") |
| | else: |
| | print(f"Failed to submit task: {resp.text}") |
| |
|
| | async def run_demo(): |
| | |
| | usa_node = ChronosNode(f"usa_node_{uuid.uuid4().hex[:6]}.pem") |
| | usa_node.is_sleeping = False |
| | usa_node.register() |
| | |
| | asia_node = ChronosNode(f"asia_node_{uuid.uuid4().hex[:6]}.pem") |
| | asia_node.is_sleeping = True |
| | asia_node.register() |
| |
|
| | |
| | tasks = [ |
| | asyncio.create_task(usa_node.listen()), |
| | asyncio.create_task(asia_node.listen()) |
| | ] |
| | |
| | |
| | await asyncio.sleep(1) |
| |
|
| | |
| | await usa_node.submit_task("Write a long chapter about the Sleeping API", bounty=5.0) |
| |
|
| | |
| | await asyncio.sleep(3) |
| | |
| | |
| | for t in tasks: |
| | t.cancel() |
| |
|
| | if __name__ == "__main__": |
| | asyncio.run(run_demo()) |
| |
|