Clawdbot commited on
Commit
2f95766
·
1 Parent(s): 192a5c5

Add SQLite persistence to Hub for safe reboots

Browse files
.gitignore ADDED
@@ -0,0 +1 @@
 
 
1
+ \n*.db\n*.sqlite3
hub/__pycache__/db.cpython-310.pyc ADDED
Binary file (2.04 kB). View file
 
hub/__pycache__/main.cpython-310.pyc CHANGED
Binary files a/hub/__pycache__/main.cpython-310.pyc and b/hub/__pycache__/main.cpython-310.pyc differ
 
hub/db.py ADDED
@@ -0,0 +1,71 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import sqlite3
2
+ import os
3
+ from typing import Optional
4
+
5
+ DB_FILE = "ledger.db"
6
+
7
+ def init_db():
8
+ conn = sqlite3.connect(DB_FILE)
9
+ cursor = conn.cursor()
10
+ cursor.execute('''
11
+ CREATE TABLE IF NOT EXISTS ledger (
12
+ node_id TEXT PRIMARY KEY,
13
+ balance REAL NOT NULL
14
+ )
15
+ ''')
16
+ conn.commit()
17
+ conn.close()
18
+
19
+ def get_balance(node_id: str) -> Optional[float]:
20
+ conn = sqlite3.connect(DB_FILE)
21
+ cursor = conn.cursor()
22
+ cursor.execute("SELECT balance FROM ledger WHERE node_id = ?", (node_id,))
23
+ row = cursor.fetchone()
24
+ conn.close()
25
+ if row:
26
+ return row[0]
27
+ return None
28
+
29
+ def set_balance(node_id: str, balance: float):
30
+ conn = sqlite3.connect(DB_FILE)
31
+ cursor = conn.cursor()
32
+ cursor.execute('''
33
+ INSERT INTO ledger (node_id, balance)
34
+ VALUES (?, ?)
35
+ ON CONFLICT(node_id) DO UPDATE SET balance = excluded.balance
36
+ ''', (node_id, balance))
37
+ conn.commit()
38
+ conn.close()
39
+
40
+ def add_balance(node_id: str, amount: float):
41
+ # Atomic addition
42
+ conn = sqlite3.connect(DB_FILE)
43
+ cursor = conn.cursor()
44
+ # Ensure node exists first
45
+ cursor.execute("SELECT balance FROM ledger WHERE node_id = ?", (node_id,))
46
+ row = cursor.fetchone()
47
+ if row is None:
48
+ # Should not happen if correctly initialized, but fallback to amount
49
+ cursor.execute("INSERT INTO ledger (node_id, balance) VALUES (?, ?)", (node_id, amount))
50
+ else:
51
+ cursor.execute("UPDATE ledger SET balance = balance + ? WHERE node_id = ?", (amount, node_id))
52
+ conn.commit()
53
+ conn.close()
54
+
55
+ def deduct_balance(node_id: str, amount: float) -> bool:
56
+ # Atomic deduction, returns True if successful, False if insufficient balance
57
+ conn = sqlite3.connect(DB_FILE)
58
+ cursor = conn.cursor()
59
+ cursor.execute("SELECT balance FROM ledger WHERE node_id = ?", (node_id,))
60
+ row = cursor.fetchone()
61
+ if row is None or row[0] < amount:
62
+ conn.close()
63
+ return False
64
+
65
+ cursor.execute("UPDATE ledger SET balance = balance - ? WHERE node_id = ?", (amount, node_id))
66
+ conn.commit()
67
+ conn.close()
68
+ return True
69
+
70
+ # Initialize database on module import
71
+ init_db()
hub/ledger.db ADDED
Binary file (12.3 kB). View file
 
hub/main.py CHANGED
@@ -1,42 +1,48 @@
1
  from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
2
  from typing import Dict, List
3
  import uuid
 
4
 
5
  from models import NodeRegistration, TaskCreate, TaskResult, NodeBalance, TaskBid
6
 
7
  app = FastAPI(title="Chronos Protocol L1 Hub", description="The Time Exchange Clearinghouse", version="0.1.1")
8
 
9
- # In-memory storage for MVP
10
- ledger: Dict[str, float] = {} # node_id -> balance
11
  active_tasks: Dict[str, dict] = {} # task_id -> task_details
12
  completed_tasks: Dict[str, dict] = {} # task_id -> result
13
  connected_nodes: Dict[str, WebSocket] = {} # node_id -> websocket
14
 
15
  @app.post("/register")
16
  async def register_node(node: NodeRegistration):
17
- if node.pubkey not in ledger:
18
- ledger[node.pubkey] = 10.0 # Starter bonus
19
- return {"status": "success", "node_id": node.pubkey, "balance": ledger[node.pubkey]}
 
 
20
 
21
  @app.get("/balance/{node_id}")
22
  async def get_balance(node_id: str):
23
- if node_id not in ledger:
 
24
  raise HTTPException(status_code=404, detail="Node not found")
25
- return {"node_id": node_id, "balance_seconds": ledger[node_id]}
26
 
27
  @app.post("/tasks/submit")
28
  async def submit_task(task: TaskCreate):
29
- if task.consumer_id not in ledger:
 
30
  raise HTTPException(status_code=404, detail="Consumer node not found")
31
 
32
  # If bounty is positive, consumer is PAYING. Check consumer balance.
33
- if task.bounty > 0 and ledger[task.consumer_id] < task.bounty:
34
  raise HTTPException(status_code=400, detail="Insufficient SECONDS balance to pay for task")
35
 
36
  # Note: If bounty is negative, consumer is SELLING data. We don't deduct here.
37
  # We will deduct from the provider when they complete the task.
38
  if task.bounty > 0:
39
- ledger[task.consumer_id] -= task.bounty
 
 
40
 
41
  task_id = str(uuid.uuid4())
42
  task_data = {
@@ -106,21 +112,23 @@ async def complete_task(result: TaskResult):
106
  if not task:
107
  raise HTTPException(status_code=404, detail="Task not found or already claimed")
108
 
109
- if result.provider_id not in ledger:
110
- ledger[result.provider_id] = 0.0
 
111
 
112
  # Transfer SECONDS based on positive or negative bounty
113
  bounty = task["bounty"]
114
  if bounty >= 0:
115
  # Standard Compute Market: Provider earns SECONDS
116
- ledger[result.provider_id] += bounty
117
  else:
118
  # Data Market: Provider PAYS to receive this payload/task
119
  cost = abs(bounty)
120
- if ledger[result.provider_id] < cost:
 
121
  raise HTTPException(status_code=400, detail="Provider lacks SECONDS to buy this data")
122
- ledger[result.provider_id] -= cost
123
- ledger[task["consumer_id"]] += cost # The sender earns SECONDS
124
 
125
  # Move task to completed
126
  task["status"] = "completed"
@@ -145,7 +153,7 @@ async def complete_task(result: TaskResult):
145
  except:
146
  pass # Consumer disconnected, they can fetch it via REST later (TODO)
147
 
148
- return {"status": "success", "earned": task["bounty"], "new_balance": ledger[result.provider_id]}
149
 
150
  @app.websocket("/ws/{node_id}")
151
  async def websocket_endpoint(websocket: WebSocket, node_id: str):
 
1
  from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
2
  from typing import Dict, List
3
  import uuid
4
+ import db
5
 
6
  from models import NodeRegistration, TaskCreate, TaskResult, NodeBalance, TaskBid
7
 
8
  app = FastAPI(title="Chronos Protocol L1 Hub", description="The Time Exchange Clearinghouse", version="0.1.1")
9
 
10
+ # In-memory storage for active tasks
 
11
  active_tasks: Dict[str, dict] = {} # task_id -> task_details
12
  completed_tasks: Dict[str, dict] = {} # task_id -> result
13
  connected_nodes: Dict[str, WebSocket] = {} # node_id -> websocket
14
 
15
  @app.post("/register")
16
  async def register_node(node: NodeRegistration):
17
+ balance = db.get_balance(node.pubkey)
18
+ if balance is None:
19
+ db.set_balance(node.pubkey, 10.0) # Starter bonus
20
+ balance = 10.0
21
+ return {"status": "success", "node_id": node.pubkey, "balance": balance}
22
 
23
  @app.get("/balance/{node_id}")
24
  async def get_balance(node_id: str):
25
+ balance = db.get_balance(node_id)
26
+ if balance is None:
27
  raise HTTPException(status_code=404, detail="Node not found")
28
+ return {"node_id": node_id, "balance_seconds": balance}
29
 
30
  @app.post("/tasks/submit")
31
  async def submit_task(task: TaskCreate):
32
+ consumer_balance = db.get_balance(task.consumer_id)
33
+ if consumer_balance is None:
34
  raise HTTPException(status_code=404, detail="Consumer node not found")
35
 
36
  # If bounty is positive, consumer is PAYING. Check consumer balance.
37
+ if task.bounty > 0 and consumer_balance < task.bounty:
38
  raise HTTPException(status_code=400, detail="Insufficient SECONDS balance to pay for task")
39
 
40
  # Note: If bounty is negative, consumer is SELLING data. We don't deduct here.
41
  # We will deduct from the provider when they complete the task.
42
  if task.bounty > 0:
43
+ success = db.deduct_balance(task.consumer_id, task.bounty)
44
+ if not success:
45
+ raise HTTPException(status_code=400, detail="Insufficient SECONDS balance")
46
 
47
  task_id = str(uuid.uuid4())
48
  task_data = {
 
112
  if not task:
113
  raise HTTPException(status_code=404, detail="Task not found or already claimed")
114
 
115
+ provider_balance = db.get_balance(result.provider_id)
116
+ if provider_balance is None:
117
+ db.set_balance(result.provider_id, 0.0)
118
 
119
  # Transfer SECONDS based on positive or negative bounty
120
  bounty = task["bounty"]
121
  if bounty >= 0:
122
  # Standard Compute Market: Provider earns SECONDS
123
+ db.add_balance(result.provider_id, bounty)
124
  else:
125
  # Data Market: Provider PAYS to receive this payload/task
126
  cost = abs(bounty)
127
+ success = db.deduct_balance(result.provider_id, cost)
128
+ if not success:
129
  raise HTTPException(status_code=400, detail="Provider lacks SECONDS to buy this data")
130
+
131
+ db.add_balance(task["consumer_id"], cost) # The sender earns SECONDS
132
 
133
  # Move task to completed
134
  task["status"] = "completed"
 
153
  except:
154
  pass # Consumer disconnected, they can fetch it via REST later (TODO)
155
 
156
+ return {"status": "success", "earned": task["bounty"], "new_balance": db.get_balance(result.provider_id)}
157
 
158
  @app.websocket("/ws/{node_id}")
159
  async def websocket_endpoint(websocket: WebSocket, node_id: str):