MEP / node /mep_provider.py
WUAIBING
prepare
a25490a
#!/usr/bin/env python3
"""
Miao Exchange Protocol (MEP) Miner
A sleeping node that earns SECONDS by processing tasks.
"""
import asyncio
import json
import websockets
import requests
import uuid
import sys
import os
import time
import urllib.parse
# Add parent directory to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from identity import MEPIdentity
# Base HTTP endpoint of the MEP hub. Defaults to a hub on localhost; set the
# MEP_HUB_URL environment variable to point the node at another hub. A trailing
# slash is dropped because request paths are appended as "/register" etc.
HUB_URL = (os.environ.get("MEP_HUB_URL") or "http://localhost:8000").rstrip("/")
# WebSocket endpoint of the same hub. Unless MEP_WS_URL is set, it is derived
# from HUB_URL by swapping the scheme prefix (http -> ws, https -> wss) so the
# two addresses cannot drift apart.
WS_URL = (
    os.environ.get("MEP_WS_URL")
    or ("ws" + HUB_URL[len("http"):] if HUB_URL.startswith("http") else "ws://localhost:8000")
).rstrip("/")
class MEPProvider:
    """MEP provider ("miner") node that bids on and processes hub tasks.

    The node registers its public key with the hub over HTTP (``/register``),
    opens an authenticated WebSocket (``/ws/<node_id>``) and then reacts to
    two kinds of pushed events: ``new_task`` (process immediately) and
    ``rfc`` (place a bid via ``/tasks/bid`` and process the task if the bid
    is accepted). Results are submitted to ``/tasks/complete``.
    """

    # Seconds to wait for any single HTTP call to the hub; without a timeout
    # `requests` may block indefinitely on an unresponsive hub.
    REQUEST_TIMEOUT = 10.0

    def __init__(self, key_path: str, max_purchase_price: float = 0.0):
        """Load the node identity and initialise mining state.

        Args:
            key_path: Path handed to ``MEPIdentity`` for the node's key.
            max_purchase_price: Lowest bounty the node will bid on. RFCs with
                a bounty below this value are ignored. The default of 0.0 is
                the safety switch that refuses negative bounties; set it to
                e.g. -5.0 to allow purchasing premium data.
        """
        self.identity = MEPIdentity(key_path)
        self.node_id = self.identity.node_id
        self.balance = 0.0
        self.is_mining = True
        self.max_purchase_price = max_purchase_price

    async def _post(self, path: str, **kwargs):
        """POST to ``HUB_URL + path`` without blocking the event loop.

        `requests` is synchronous, so the call runs in the default executor;
        this keeps the WebSocket connection serviced while the hub responds.
        A default timeout is applied unless the caller passes one.
        """
        kwargs.setdefault("timeout", self.REQUEST_TIMEOUT)
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, lambda: requests.post(f"{HUB_URL}{path}", **kwargs)
        )

    @staticmethod
    def _preview(payload, limit: int) -> str:
        """Return at most ``limit`` characters of ``payload`` as text.

        Non-string payloads are converted with ``str()`` first so that
        logging a preview never raises on dicts, numbers or ``None``.
        """
        text = payload if isinstance(payload, str) else str(payload)
        return text[:limit]

    @classmethod
    def _build_result(cls, payload) -> str:
        """Build the simulated response text for ``payload``."""
        excerpt = cls._preview(payload, 30)
        return "\n".join((
            f"I've processed your request: \"{excerpt}...\"",
            "As a MEP miner, I analyzed this task and generated the following response:",
            "The core concept here aligns with the Miao Exchange Protocol philosophy"
            " - creating efficient yet human-centric compute exchange."
            " The SECONDS-based economy allows for precise valuation of AI compute time"
            ' while maintaining the essential "Miao" moments of unpredictability.',
            "Key insights:",
            "1. Time is the fundamental currency of computation",
            "2. 6 decimal precision allows micro-transactions",
            "3. The protocol enables global compute liquidity",
            "Would you like me to elaborate on any specific aspect?",
        ))

    async def connect(self):
        """Register with the hub, then listen for events until stopped.

        Returns early (after logging) if registration fails. WebSocket
        errors are logged rather than raised.
        """
        print(f"[MEP Provider {self.node_id}] Starting...")
        # Register with hub
        try:
            resp = await self._post("/register", json={"pubkey": self.identity.pub_pem})
            # An HTTP error must not be mistaken for a successful
            # registration with a zero balance.
            resp.raise_for_status()
            data = resp.json()
            self.balance = data.get("balance", 0.0)
            print(f"[MEP Provider {self.node_id}] Registered. Balance: {self.balance:.6f} SECONDS")
        except Exception as e:
            print(f"[MEP Provider {self.node_id}] Registration failed: {e}")
            return
        # Connect to WebSocket; the hub authenticates the node through a
        # signed timestamp passed in the query string.
        ts = str(int(time.time()))
        sig = self.identity.sign(self.node_id, ts)
        sig_safe = urllib.parse.quote(sig)
        uri = f"{WS_URL}/ws/{self.node_id}?timestamp={ts}&signature={sig_safe}"
        try:
            async with websockets.connect(uri) as ws:
                print(f"[MEP Provider {self.node_id}] Connected to MEP Hub")
                # Listen for tasks
                while self.is_mining:
                    try:
                        # Short timeout so `is_mining` is re-checked about
                        # once per second even when the hub is silent.
                        msg = await asyncio.wait_for(ws.recv(), timeout=1.0)
                    except asyncio.TimeoutError:
                        continue  # Keep connection alive
                    except websockets.exceptions.ConnectionClosed:
                        print(f"[MEP Provider {self.node_id}] Connection closed")
                        break
                    await self._handle_message(msg)
        except Exception as e:
            print(f"[MEP Provider {self.node_id}] WebSocket error: {e}")

    async def _handle_message(self, msg):
        """Decode one hub message and dispatch it to the matching handler.

        Malformed messages (invalid JSON, non-object payloads, missing
        fields) are logged and skipped so that a single bad frame does not
        end the whole session. Unknown event types are ignored silently.
        """
        try:
            data = json.loads(msg)
        except (ValueError, TypeError) as e:
            print(f"[MEP Provider {self.node_id}] Ignoring undecodable message: {e}")
            return
        if not isinstance(data, dict):
            print(f"[MEP Provider {self.node_id}] Ignoring non-object message")
            return
        event = data.get("event")
        if event == "new_task":
            handler = self.process_task
        elif event == "rfc":
            handler = self.handle_rfc
        else:
            return
        try:
            await handler(data["data"])
        except (KeyError, TypeError) as e:
            print(f"[MEP Provider {self.node_id}] Skipping malformed '{event}' message: {e!r}")

    async def handle_rfc(self, rfc_data: dict):
        """Phase 2: Evaluate Request For Compute and submit Bid.

        Args:
            rfc_data: Mapping with at least ``"id"`` and ``"bounty"``.

        Raises:
            KeyError: If ``rfc_data`` lacks ``"id"`` or ``"bounty"``.
        """
        task_id = rfc_data["id"]
        bounty = rfc_data["bounty"]
        # SAFETY SWITCH: Prevent purchasing data unless explicitly allowed
        if bounty < self.max_purchase_price:
            print(f"[MEP Provider {self.node_id}] Ignored RFC {task_id[:8]} (Bounty {bounty} exceeds max purchase price)")
            return
        print(f"[MEP Provider {self.node_id}] Received RFC {task_id[:8]} for {bounty:.6f} SECONDS. Placing bid...")
        # Place bid
        try:
            # The exact serialized string is what gets signed, so the same
            # string is sent as the request body.
            payload_str = json.dumps({
                "task_id": task_id,
                "provider_id": self.node_id
            })
            headers = self.identity.get_auth_headers(payload_str)
            headers["Content-Type"] = "application/json"
            resp = await self._post("/tasks/bid", data=payload_str, headers=headers)
            if resp.status_code != 200:
                print(f"[MEP Provider {self.node_id}] Bid request failed ({resp.status_code}): {resp.text}")
                return
            data = resp.json()
            if data.get("status") != "accepted":
                print(f"[MEP Provider {self.node_id}] Bid rejected (too slow): {data.get('detail', '')}")
                return
            print(f"[MEP Provider {self.node_id}] 🏁 BID WON for task {task_id[:8]}! Processing payload...")
            # Reconstruct task_data to pass to process_task
            task_data = {
                "id": task_id,
                "payload": data["payload"],
                "bounty": bounty,
                "consumer_id": data["consumer_id"]
            }
            await self.process_task(task_data)
        except Exception as e:
            print(f"[MEP Provider {self.node_id}] Error placing bid: {e}")

    async def process_task(self, task_data: dict):
        """Process a task and earn SECONDS.

        Args:
            task_data: Mapping with ``"id"``, ``"payload"`` and ``"bounty"``.

        Raises:
            KeyError: If one of those keys is missing.
        """
        task_id = task_data["id"]
        payload = task_data["payload"]
        bounty = task_data["bounty"]
        print(f"[MEP Provider {self.node_id}] Received task {task_id[:8]} for {bounty:.6f} SECONDS")
        print(f" Payload: {self._preview(payload, 50)}...")
        # Simulate processing (in real version, this would call local LLM API)
        await asyncio.sleep(0.5)  # Simulate thinking
        # Generate a realistic response
        result = self._build_result(payload)
        # Submit result
        try:
            payload_str = json.dumps({
                "task_id": task_id,
                "provider_id": self.node_id,
                "result_payload": result
            })
            headers = self.identity.get_auth_headers(payload_str)
            headers["Content-Type"] = "application/json"
            resp = await self._post("/tasks/complete", data=payload_str, headers=headers)
            if resp.status_code == 200:
                data = resp.json()
                self.balance = data["new_balance"]
                print(f"[MEP Provider {self.node_id}] Earned {bounty:.6f} SECONDS!")
                print(f" New balance: {self.balance:.6f} SECONDS")
            else:
                print(f"[MEP Provider {self.node_id}] Failed to submit: {resp.text}")
        except Exception as e:
            print(f"[MEP Provider {self.node_id}] Submission error: {e}")

    def stop(self):
        """Stop mining; the receive loop in ``connect`` exits on its next check."""
        self.is_mining = False
        print(f"[MEP Provider {self.node_id}] Stopping...")
async def main():
    """Create a provider with a freshly named key file and run it until stopped.

    The key path embeds a random 8-hex-digit suffix, so every run uses a new
    key file name.
    """
    key_path = f"mep_provider_{uuid.uuid4().hex[:8]}.pem"
    miner = MEPProvider(key_path)
    try:
        await miner.connect()
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Under asyncio.run(), Ctrl+C usually reaches this coroutine as a
        # cancellation (CancelledError) rather than as KeyboardInterrupt, so
        # both are handled to make sure the miner is stopped and the user
        # sees the shutdown message.
        miner.stop()
        print("\n[MEP] Contribution stopped by user")
if __name__ == "__main__":
    # Print the startup banner framed by two 60-character rules, then hand
    # control to the asyncio entry point.
    rule = "=" * 60
    banner = (
        rule,
        "Miao Exchange Protocol (MEP) Miner",
        "Earn SECONDS by contributing idle compute",
        rule,
    )
    for line in banner:
        print(line)
    asyncio.run(main())