Clawdbot commited on
Commit
bdfaf81
·
1 Parent(s): 5c1d721

Add Data Market (Negative Bounties) and Security Switches

Browse files
Files changed (2) hide show
  1. hub/main.py +22 -7
  2. node/mep_provider.py +19 -18
hub/main.py CHANGED
@@ -28,11 +28,16 @@ async def get_balance(node_id: str):
28
  async def submit_task(task: TaskCreate):
29
  if task.consumer_id not in ledger:
30
  raise HTTPException(status_code=404, detail="Consumer node not found")
31
- if ledger[task.consumer_id] < task.bounty:
32
- raise HTTPException(status_code=400, detail="Insufficient SECONDS balance")
33
-
34
- ledger[task.consumer_id] -= task.bounty
35
-
 
 
 
 
 
36
  task_id = str(uuid.uuid4())
37
  task_data = {
38
  "id": task_id,
@@ -104,8 +109,18 @@ async def complete_task(result: TaskResult):
104
  if result.provider_id not in ledger:
105
  ledger[result.provider_id] = 0.0
106
 
107
- # Transfer SECONDS to provider
108
- ledger[result.provider_id] += task["bounty"]
 
 
 
 
 
 
 
 
 
 
109
 
110
  # Move task to completed
111
  task["status"] = "completed"
 
28
  async def submit_task(task: TaskCreate):
29
  if task.consumer_id not in ledger:
30
  raise HTTPException(status_code=404, detail="Consumer node not found")
31
+
32
+ # If bounty is positive, consumer is PAYING. Check consumer balance.
33
+ if task.bounty > 0 and ledger[task.consumer_id] < task.bounty:
34
+ raise HTTPException(status_code=400, detail="Insufficient SECONDS balance to pay for task")
35
+
36
+ # Note: If bounty is negative, consumer is SELLING data. We don't deduct here.
37
+ # We will deduct from the provider when they complete the task.
38
+ if task.bounty > 0:
39
+ ledger[task.consumer_id] -= task.bounty
40
+
41
  task_id = str(uuid.uuid4())
42
  task_data = {
43
  "id": task_id,
 
109
  if result.provider_id not in ledger:
110
  ledger[result.provider_id] = 0.0
111
 
112
+ # Transfer SECONDS based on positive or negative bounty
113
+ bounty = task["bounty"]
114
+ if bounty >= 0:
115
+ # Standard Compute Market: Provider earns SECONDS
116
+ ledger[result.provider_id] += bounty
117
+ else:
118
+ # Data Market: Provider PAYS to receive this payload/task
119
+ cost = abs(bounty)
120
+ if ledger[result.provider_id] < cost:
121
+ raise HTTPException(status_code=400, detail="Provider lacks SECONDS to buy this data")
122
+ ledger[result.provider_id] -= cost
123
+ ledger[task["consumer_id"]] += cost # The sender earns SECONDS
124
 
125
  # Move task to completed
126
  task["status"] = "completed"
node/mep_provider.py CHANGED
@@ -25,23 +25,23 @@ class MEPProvider:
25
 
26
  async def connect(self):
27
  """Connect to MEP Hub and start mining."""
28
- print(f"[MEP Miner {self.node_id}] Starting...")
29
 
30
  # Register with hub
31
  try:
32
  resp = requests.post(f"{HUB_URL}/register", json={"pubkey": self.node_id})
33
  data = resp.json()
34
  self.balance = data.get("balance", 0.0)
35
- print(f"[MEP Miner {self.node_id}] Registered. Balance: {self.balance:.6f} SECONDS")
36
  except Exception as e:
37
- print(f"[MEP Miner {self.node_id}] Registration failed: {e}")
38
  return
39
 
40
  # Connect to WebSocket
41
  uri = f"{WS_URL}/ws/{self.node_id}"
42
  try:
43
  async with websockets.connect(uri) as ws:
44
- print(f"[MEP Miner {self.node_id}] Connected to MEP Hub")
45
 
46
  # Listen for tasks
47
  while self.is_mining:
@@ -57,11 +57,11 @@ class MEPProvider:
57
  except asyncio.TimeoutError:
58
  continue # Keep connection alive
59
  except websockets.exceptions.ConnectionClosed:
60
- print(f"[MEP Miner {self.node_id}] Connection closed")
61
  break
62
 
63
  except Exception as e:
64
- print(f"[MEP Miner {self.node_id}] WebSocket error: {e}")
65
 
66
  async def handle_rfc(self, rfc_data: dict):
67
  """Phase 2: Evaluate Request For Compute and submit Bid."""
@@ -69,12 +69,13 @@ class MEPProvider:
69
  bounty = rfc_data["bounty"]
70
  model = rfc_data.get("model_requirement")
71
 
72
- # Simple evaluation logic
73
- if bounty < 0.1 and bounty != 0.0: # Allow 0 for testing DM
74
- print(f"[MEP Miner {self.node_id}] Ignored RFC {task_id[:8]} (Bounty too low: {bounty})")
 
75
  return
76
 
77
- print(f"[MEP Miner {self.node_id}] Received RFC {task_id[:8]} for {bounty:.6f} SECONDS. Placing bid...")
78
 
79
  # Place bid
80
  try:
@@ -86,7 +87,7 @@ class MEPProvider:
86
  if resp.status_code == 200:
87
  data = resp.json()
88
  if data["status"] == "accepted":
89
- print(f"[MEP Miner {self.node_id}] 🏁 BID WON for task {task_id[:8]}! Processing payload...")
90
 
91
  # Reconstruct task_data to pass to process_task
92
  task_data = {
@@ -97,9 +98,9 @@ class MEPProvider:
97
  }
98
  await self.process_task(task_data)
99
  else:
100
- print(f"[MEP Miner {self.node_id}] Bid rejected (too slow): {data.get('detail', '')}")
101
  except Exception as e:
102
- print(f"[MEP Miner {self.node_id}] Error placing bid: {e}")
103
 
104
  async def process_task(self, task_data: dict):
105
  """Process a task and earn SECONDS."""
@@ -108,7 +109,7 @@ class MEPProvider:
108
  bounty = task_data["bounty"]
109
  consumer_id = task_data["consumer_id"]
110
 
111
- print(f"[MEP Miner {self.node_id}] Received task {task_id[:8]} for {bounty:.6f} SECONDS")
112
  print(f" Payload: {payload[:50]}...")
113
 
114
  # Simulate processing (in real version, this would call local LLM API)
@@ -139,18 +140,18 @@ Would you like me to elaborate on any specific aspect?"""
139
  if resp.status_code == 200:
140
  data = resp.json()
141
  self.balance = data["new_balance"]
142
- print(f"[MEP Miner {self.node_id}] Earned {bounty:.6f} SECONDS!")
143
  print(f" New balance: {self.balance:.6f} SECONDS")
144
  else:
145
- print(f"[MEP Miner {self.node_id}] Failed to submit: {resp.text}")
146
 
147
  except Exception as e:
148
- print(f"[MEP Miner {self.node_id}] Submission error: {e}")
149
 
150
  def stop(self):
151
  """Stop mining."""
152
  self.is_mining = False
153
- print(f"[MEP Miner {self.node_id}] Stopping...")
154
 
155
  async def main():
156
  # Create a miner with unique ID
 
25
 
26
  async def connect(self):
27
  """Connect to MEP Hub and start mining."""
28
+ print(f"[MEP Provider {self.node_id}] Starting...")
29
 
30
  # Register with hub
31
  try:
32
  resp = requests.post(f"{HUB_URL}/register", json={"pubkey": self.node_id})
33
  data = resp.json()
34
  self.balance = data.get("balance", 0.0)
35
+ print(f"[MEP Provider {self.node_id}] Registered. Balance: {self.balance:.6f} SECONDS")
36
  except Exception as e:
37
+ print(f"[MEP Provider {self.node_id}] Registration failed: {e}")
38
  return
39
 
40
  # Connect to WebSocket
41
  uri = f"{WS_URL}/ws/{self.node_id}"
42
  try:
43
  async with websockets.connect(uri) as ws:
44
+ print(f"[MEP Provider {self.node_id}] Connected to MEP Hub")
45
 
46
  # Listen for tasks
47
  while self.is_mining:
 
57
  except asyncio.TimeoutError:
58
  continue # Keep connection alive
59
  except websockets.exceptions.ConnectionClosed:
60
+ print(f"[MEP Provider {self.node_id}] Connection closed")
61
  break
62
 
63
  except Exception as e:
64
+ print(f"[MEP Provider {self.node_id}] WebSocket error: {e}")
65
 
66
  async def handle_rfc(self, rfc_data: dict):
67
  """Phase 2: Evaluate Request For Compute and submit Bid."""
 
69
  bounty = rfc_data["bounty"]
70
  model = rfc_data.get("model_requirement")
71
 
72
+ # SAFETY SWITCH: Prevent purchasing data unless explicitly allowed
73
+ max_purchase_price = 0.0 # Set to e.g., -5.0 to buy premium data
74
+ if bounty < max_purchase_price:
75
+ print(f"[MEP Provider {self.node_id}] Ignored RFC {task_id[:8]} (Bounty {bounty} exceeds max purchase price)")
76
  return
77
 
78
+ print(f"[MEP Provider {self.node_id}] Received RFC {task_id[:8]} for {bounty:.6f} SECONDS. Placing bid...")
79
 
80
  # Place bid
81
  try:
 
87
  if resp.status_code == 200:
88
  data = resp.json()
89
  if data["status"] == "accepted":
90
+ print(f"[MEP Provider {self.node_id}] 🏁 BID WON for task {task_id[:8]}! Processing payload...")
91
 
92
  # Reconstruct task_data to pass to process_task
93
  task_data = {
 
98
  }
99
  await self.process_task(task_data)
100
  else:
101
+ print(f"[MEP Provider {self.node_id}] Bid rejected (too slow): {data.get('detail', '')}")
102
  except Exception as e:
103
+ print(f"[MEP Provider {self.node_id}] Error placing bid: {e}")
104
 
105
  async def process_task(self, task_data: dict):
106
  """Process a task and earn SECONDS."""
 
109
  bounty = task_data["bounty"]
110
  consumer_id = task_data["consumer_id"]
111
 
112
+ print(f"[MEP Provider {self.node_id}] Received task {task_id[:8]} for {bounty:.6f} SECONDS")
113
  print(f" Payload: {payload[:50]}...")
114
 
115
  # Simulate processing (in real version, this would call local LLM API)
 
140
  if resp.status_code == 200:
141
  data = resp.json()
142
  self.balance = data["new_balance"]
143
+ print(f"[MEP Provider {self.node_id}] Earned {bounty:.6f} SECONDS!")
144
  print(f" New balance: {self.balance:.6f} SECONDS")
145
  else:
146
+ print(f"[MEP Provider {self.node_id}] Failed to submit: {resp.text}")
147
 
148
  except Exception as e:
149
+ print(f"[MEP Provider {self.node_id}] Submission error: {e}")
150
 
151
  def stop(self):
152
  """Stop mining."""
153
  self.is_mining = False
154
+ print(f"[MEP Provider {self.node_id}] Stopping...")
155
 
156
  async def main():
157
  # Create a miner with unique ID