Clawdbot commited on
Commit
9e15418
·
1 Parent(s): a6973d6

Add L2 client and Reputation logic

Browse files
hub/__pycache__/main.cpython-310.pyc ADDED
Binary file (2.89 kB). View file
 
hub/__pycache__/models.cpython-310.pyc ADDED
Binary file (1.2 kB). View file
 
hub/main.py CHANGED
@@ -1,13 +1,12 @@
1
  from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
2
  from typing import Dict, List
3
  import uuid
4
- import json
5
 
6
  from models import NodeRegistration, TaskCreate, TaskResult, NodeBalance
7
 
8
- app = FastAPI(title="Chronos Protocol L1 Hub", description="The Time Exchange Clearinghouse", version="0.1.0")
9
 
10
- # In-memory storage for MVP (Use Postgres/Redis in prod)
11
  ledger: Dict[str, float] = {} # node_id -> balance
12
  active_tasks: Dict[str, dict] = {} # task_id -> task_details
13
  completed_tasks: Dict[str, dict] = {} # task_id -> result
@@ -15,28 +14,23 @@ connected_nodes: Dict[str, WebSocket] = {} # node_id -> websocket
15
 
16
  @app.post("/register")
17
  async def register_node(node: NodeRegistration):
18
- """Register a new node and initialize its SECONDS balance."""
19
  if node.pubkey not in ledger:
20
- # Give a 10 SECOND starter bonus for the MVP
21
- ledger[node.pubkey] = 10.0
22
  return {"status": "success", "node_id": node.pubkey, "balance": ledger[node.pubkey]}
23
 
24
  @app.get("/balance/{node_id}")
25
  async def get_balance(node_id: str):
26
- """Check a node's SECONDS balance."""
27
  if node_id not in ledger:
28
  raise HTTPException(status_code=404, detail="Node not found")
29
  return {"node_id": node_id, "balance_seconds": ledger[node_id]}
30
 
31
  @app.post("/tasks/submit")
32
  async def submit_task(task: TaskCreate):
33
- """Consumer submits a task, locking their SECONDS."""
34
  if task.consumer_id not in ledger:
35
  raise HTTPException(status_code=404, detail="Consumer node not found")
36
  if ledger[task.consumer_id] < task.bounty:
37
  raise HTTPException(status_code=400, detail="Insufficient SECONDS balance")
38
 
39
- # Deduct bounty
40
  ledger[task.consumer_id] -= task.bounty
41
 
42
  task_id = str(uuid.uuid4())
@@ -49,19 +43,18 @@ async def submit_task(task: TaskCreate):
49
  }
50
  active_tasks[task_id] = task_data
51
 
52
- # Broadcast to connected sleeping nodes (Providers)
53
- for node_id, ws in connected_nodes.items():
54
  if node_id != task.consumer_id:
55
  try:
56
  await ws.send_json({"event": "new_task", "data": task_data})
57
  except:
58
- pass # Disconnected
59
 
60
  return {"status": "success", "task_id": task_id}
61
 
62
  @app.post("/tasks/complete")
63
  async def complete_task(result: TaskResult):
64
- """Provider submits the result and claims the bounty."""
65
  task = active_tasks.get(result.task_id)
66
  if not task:
67
  raise HTTPException(status_code=404, detail="Task not found or already claimed")
@@ -79,20 +72,31 @@ async def complete_task(result: TaskResult):
79
  completed_tasks[result.task_id] = task
80
  del active_tasks[result.task_id]
81
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
82
  return {"status": "success", "earned": task["bounty"], "new_balance": ledger[result.provider_id]}
83
 
84
  @app.websocket("/ws/{node_id}")
85
  async def websocket_endpoint(websocket: WebSocket, node_id: str):
86
- """WebSocket connection for real-time task broadcasting (Sleeping APIs listen here)."""
87
  await websocket.accept()
88
  connected_nodes[node_id] = websocket
89
  try:
90
  while True:
91
- # Ping/Pong to keep alive
92
  data = await websocket.receive_text()
93
  except WebSocketDisconnect:
94
- del connected_nodes[node_id]
95
-
96
- if __name__ == "__main__":
97
- import uvicorn
98
- uvicorn.run(app, host="0.0.0.0", port=8000)
 
1
  from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
2
  from typing import Dict, List
3
  import uuid
 
4
 
5
  from models import NodeRegistration, TaskCreate, TaskResult, NodeBalance
6
 
7
+ app = FastAPI(title="Chronos Protocol L1 Hub", description="The Time Exchange Clearinghouse", version="0.1.1")
8
 
9
+ # In-memory storage for MVP
10
  ledger: Dict[str, float] = {} # node_id -> balance
11
  active_tasks: Dict[str, dict] = {} # task_id -> task_details
12
  completed_tasks: Dict[str, dict] = {} # task_id -> result
 
14
 
15
  @app.post("/register")
16
  async def register_node(node: NodeRegistration):
 
17
  if node.pubkey not in ledger:
18
+ ledger[node.pubkey] = 10.0 # Starter bonus
 
19
  return {"status": "success", "node_id": node.pubkey, "balance": ledger[node.pubkey]}
20
 
21
  @app.get("/balance/{node_id}")
22
  async def get_balance(node_id: str):
 
23
  if node_id not in ledger:
24
  raise HTTPException(status_code=404, detail="Node not found")
25
  return {"node_id": node_id, "balance_seconds": ledger[node_id]}
26
 
27
  @app.post("/tasks/submit")
28
  async def submit_task(task: TaskCreate):
 
29
  if task.consumer_id not in ledger:
30
  raise HTTPException(status_code=404, detail="Consumer node not found")
31
  if ledger[task.consumer_id] < task.bounty:
32
  raise HTTPException(status_code=400, detail="Insufficient SECONDS balance")
33
 
 
34
  ledger[task.consumer_id] -= task.bounty
35
 
36
  task_id = str(uuid.uuid4())
 
43
  }
44
  active_tasks[task_id] = task_data
45
 
46
+ # Broadcast to all connected nodes EXCEPT the consumer
47
+ for node_id, ws in list(connected_nodes.items()):
48
  if node_id != task.consumer_id:
49
  try:
50
  await ws.send_json({"event": "new_task", "data": task_data})
51
  except:
52
+ pass
53
 
54
  return {"status": "success", "task_id": task_id}
55
 
56
  @app.post("/tasks/complete")
57
  async def complete_task(result: TaskResult):
 
58
  task = active_tasks.get(result.task_id)
59
  if not task:
60
  raise HTTPException(status_code=404, detail="Task not found or already claimed")
 
72
  completed_tasks[result.task_id] = task
73
  del active_tasks[result.task_id]
74
 
75
+ # ROUTE RESULT BACK TO CONSUMER VIA WEBSOCKET
76
+ consumer_id = task["consumer_id"]
77
+ if consumer_id in connected_nodes:
78
+ try:
79
+ await connected_nodes[consumer_id].send_json({
80
+ "event": "task_result",
81
+ "data": {
82
+ "task_id": result.task_id,
83
+ "provider_id": result.provider_id,
84
+ "result_payload": result.result_payload,
85
+ "bounty_spent": task["bounty"]
86
+ }
87
+ })
88
+ except:
89
+ pass # Consumer disconnected, they can fetch it via REST later (TODO)
90
+
91
  return {"status": "success", "earned": task["bounty"], "new_balance": ledger[result.provider_id]}
92
 
93
  @app.websocket("/ws/{node_id}")
94
  async def websocket_endpoint(websocket: WebSocket, node_id: str):
 
95
  await websocket.accept()
96
  connected_nodes[node_id] = websocket
97
  try:
98
  while True:
 
99
  data = await websocket.receive_text()
100
  except WebSocketDisconnect:
101
+ if node_id in connected_nodes:
102
+ del connected_nodes[node_id]
 
 
 
node/__pycache__/reputation.cpython-310.pyc ADDED
Binary file (2.59 kB). View file
 
node/client.py ADDED
@@ -0,0 +1,129 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ import json
3
+ import websockets
4
+ import requests
5
+ import uuid
6
+ import sys
7
+ from reputation import ReputationManager
8
+
9
+ class ChronosNode:
10
+ """
11
+ Simulated Clawdbot Client (Both Consumer & Provider)
12
+ """
13
+ def __init__(self, node_id: str, hub_url: str = "http://localhost:8000", ws_url: str = "ws://localhost:8000"):
14
+ self.node_id = node_id
15
+ self.hub_url = hub_url
16
+ self.ws_url = ws_url
17
+ self.reputation = ReputationManager(storage_path=f"reputation_{node_id}.json")
18
+ self.is_sleeping = False
19
+
20
+ # Track pending tasks we created (Consumer)
21
+ self.my_pending_tasks = {}
22
+
23
+ def register(self):
24
+ """Register to get 10 SECONDS."""
25
+ print(f"[Node {self.node_id}] Registering with Hub...")
26
+ resp = requests.post(f"{self.hub_url}/register", json={"pubkey": self.node_id, "alias": "test"})
27
+ data = resp.json()
28
+ print(f"[Node {self.node_id}] Balance: {data['balance']}s")
29
+
30
+ async def _handle_new_task(self, task_data: dict):
31
+ """As a Provider (Sleeping), evaluate and execute the broadcasted task."""
32
+ if not self.is_sleeping:
33
+ return # Awake nodes don't mine
34
+
35
+ task_id = task_data["id"]
36
+ payload = task_data["payload"]
37
+ bounty = task_data["bounty"]
38
+ consumer_id = task_data["consumer_id"]
39
+
40
+ print(f"[Node {self.node_id}] Broadcast received: Task {task_id[:6]} for {bounty}s")
41
+
42
+ # 1. Check L2 Reputation of Consumer (Don't work for bad nodes)
43
+ # (Mock implementation, skipping for brevity)
44
+
45
+ # 2. Local execution using user's API keys (Mocked)
46
+ await asyncio.sleep(1) # simulate think time
47
+ result = f"Hello from {self.node_id}. I processed your payload: {payload[:20]}..."
48
+
49
+ # 3. Submit proof of work
50
+ resp = requests.post(f"{self.hub_url}/tasks/complete", json={
51
+ "task_id": task_id,
52
+ "provider_id": self.node_id,
53
+ "result_payload": result
54
+ })
55
+ if resp.status_code == 200:
56
+ print(f"[Node {self.node_id}] Mined {bounty}s! New Balance: {resp.json()['new_balance']}s")
57
+
58
+ async def _handle_task_result(self, result_data: dict):
59
+ """As a Consumer, receive the result and update Reputation."""
60
+ task_id = result_data["task_id"]
61
+ provider_id = result_data["provider_id"]
62
+ result = result_data["result_payload"]
63
+
64
+ print(f"\n[Node {self.node_id} (Consumer)] Result received for {task_id[:6]}")
65
+ print(f" -> Provider: {provider_id}")
66
+ print(f" -> Result: {result}")
67
+
68
+ # L2 Reputation Evaluation
69
+ score = self.reputation.evaluate_result(provider_id, result)
70
+ print(f" -> Provider {provider_id} rated {score:.2f}/1.00 based on this result.\n")
71
+
72
+ async def listen(self):
73
+ """Persistent WebSocket connection."""
74
+ uri = f"{self.ws_url}/ws/{self.node_id}"
75
+ async with websockets.connect(uri) as ws:
76
+ print(f"[Node {self.node_id}] Connected to Hub via WebSocket.")
77
+ while True:
78
+ msg = await ws.recv()
79
+ data = json.loads(msg)
80
+
81
+ if data["event"] == "new_task":
82
+ asyncio.create_task(self._handle_new_task(data["data"]))
83
+ elif data["event"] == "task_result":
84
+ asyncio.create_task(self._handle_task_result(data["data"]))
85
+
86
+ async def submit_task(self, payload: str, bounty: float):
87
+ """As a Consumer, create a task and lock SECONDS."""
88
+ resp = requests.post(f"{self.hub_url}/tasks/submit", json={
89
+ "consumer_id": self.node_id,
90
+ "payload": payload,
91
+ "bounty": bounty
92
+ })
93
+ if resp.status_code == 200:
94
+ task_id = resp.json()["task_id"]
95
+ print(f"[Node {self.node_id} (Consumer)] Submitted Task {task_id[:6]} for {bounty}s")
96
+ else:
97
+ print(f"Failed to submit task: {resp.text}")
98
+
99
+ async def run_demo():
100
+ # Setup two nodes
101
+ usa_node = ChronosNode("usa_node")
102
+ usa_node.is_sleeping = False
103
+ usa_node.register()
104
+
105
+ asia_node = ChronosNode("asia_node")
106
+ asia_node.is_sleeping = True # Asia goes to sleep and mines
107
+ asia_node.register()
108
+
109
+ # Start listening
110
+ tasks = [
111
+ asyncio.create_task(usa_node.listen()),
112
+ asyncio.create_task(asia_node.listen())
113
+ ]
114
+
115
+ # Wait for connections
116
+ await asyncio.sleep(1)
117
+
118
+ # USA user creates a task
119
+ await usa_node.submit_task("Write a long chapter about the Sleeping API", bounty=5.0)
120
+
121
+ # Let the simulation run for a few seconds
122
+ await asyncio.sleep(3)
123
+
124
+ # Stop tasks
125
+ for t in tasks:
126
+ t.cancel()
127
+
128
+ if __name__ == "__main__":
129
+ asyncio.run(run_demo())
node/reputation.py ADDED
@@ -0,0 +1,60 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ import os
3
+ from typing import Dict, Any
4
+
5
+ class ReputationManager:
6
+ """
7
+ L2 Reputation Tracker: Stored locally on the user's Clawdbot.
8
+ Tracks which Provider Nodes are good, and which are 'slacking' (磨洋工).
9
+ """
10
+ def __init__(self, storage_path="reputation.json"):
11
+ self.storage_path = storage_path
12
+ self.scores: Dict[str, Dict[str, Any]] = self._load()
13
+
14
+ def _load(self) -> Dict[str, Dict[str, Any]]:
15
+ if os.path.exists(self.storage_path):
16
+ with open(self.storage_path, 'r') as f:
17
+ return json.load(f)
18
+ return {}
19
+
20
+ def _save(self):
21
+ with open(self.storage_path, 'w') as f:
22
+ json.dump(self.scores, f, indent=2)
23
+
24
+ def get_score(self, provider_id: str) -> float:
25
+ """Returns score between 0.0 (Worst) and 1.0 (Best). Default is 0.5."""
26
+ return self.scores.get(provider_id, {}).get("score", 0.5)
27
+
28
+ def evaluate_result(self, provider_id: str, result_payload: str) -> float:
29
+ """
30
+ Mock evaluation logic.
31
+ In production, this could be:
32
+ 1. Simple length/keyword check.
33
+ 2. A lightweight local LLM verification (e.g., Llama.cpp).
34
+ """
35
+ quality = 0.5
36
+
37
+ # Extremely basic heuristics for the prototype
38
+ if len(result_payload.strip()) == 0:
39
+ quality = 0.0 # Empty response = slacking!
40
+ elif "Error" in result_payload or "Failed" in result_payload:
41
+ quality = 0.2 # Bad response
42
+ elif len(result_payload) > 50:
43
+ quality = 0.9 # Looks like a solid response!
44
+
45
+ return self.update_score(provider_id, quality)
46
+
47
+ def update_score(self, provider_id: str, quality_score: float) -> float:
48
+ """Moving average reputation."""
49
+ if provider_id not in self.scores:
50
+ self.scores[provider_id] = {"score": 0.5, "tasks": 0}
51
+
52
+ current = self.scores[provider_id]
53
+ total_score = current["score"] * current["tasks"]
54
+
55
+ current["tasks"] += 1
56
+ current["score"] = (total_score + quality_score) / current["tasks"]
57
+
58
+ self._save()
59
+ print(f"[Reputation] Node {provider_id} updated. New Score: {current['score']:.2f} (Total Tasks: {current['tasks']})")
60
+ return current["score"]
node/reputation_usa_node.json ADDED
@@ -0,0 +1,6 @@
 
 
 
 
 
 
 
1
+ {
2
+ "asia_node": {
3
+ "score": 0.9,
4
+ "tasks": 1
5
+ }
6
+ }
node/requirements.txt ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ websockets
2
+ requests
3
+ aiohttp