File size: 7,293 Bytes
84506ca
 
 
 
 
 
 
 
 
 
 
 
a25490a
 
84506ca
 
 
a25490a
84506ca
 
 
 
c412eea
a25490a
 
 
84506ca
 
 
 
 
bdfaf81
84506ca
 
 
a25490a
84506ca
 
bdfaf81
84506ca
bdfaf81
84506ca
 
 
a25490a
 
 
 
84506ca
 
bdfaf81
84506ca
 
 
 
 
 
 
 
 
a890f8f
 
84506ca
 
 
 
bdfaf81
84506ca
 
 
bdfaf81
84506ca
a890f8f
 
 
 
bdfaf81
 
 
 
a890f8f
 
bdfaf81
a890f8f
 
 
a25490a
a890f8f
 
 
a25490a
 
 
a890f8f
 
 
 
bdfaf81
a890f8f
 
 
 
 
 
 
 
 
 
bdfaf81
a890f8f
bdfaf81
a890f8f
84506ca
 
 
 
 
bdfaf81
84506ca
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a25490a
84506ca
 
 
 
a25490a
 
 
84506ca
 
 
 
bdfaf81
84506ca
 
bdfaf81
84506ca
 
bdfaf81
84506ca
 
 
 
bdfaf81
84506ca
 
a25490a
 
84506ca
 
 
 
 
c412eea
84506ca
 
 
 
c412eea
84506ca
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#!/usr/bin/env python3
"""
Miao Exchange Protocol (MEP) Miner
A sleeping node that earns SECONDS by processing tasks.
"""
import asyncio
import json
import websockets
import requests
import uuid
import sys
import os
import time
import urllib.parse

# Add parent directory to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from identity import MEPIdentity

HUB_URL = "http://localhost:8000"
WS_URL = "ws://localhost:8000"

class MEPProvider:
    def __init__(self, key_path: str):
        self.identity = MEPIdentity(key_path)
        self.node_id = self.identity.node_id
        self.balance = 0.0
        self.is_mining = True
        
    async def connect(self):
        """Connect to MEP Hub and start mining."""
        print(f"[MEP Provider {self.node_id}] Starting...")
        
        # Register with hub
        try:
            resp = requests.post(f"{HUB_URL}/register", json={"pubkey": self.identity.pub_pem})
            data = resp.json()
            self.balance = data.get("balance", 0.0)
            print(f"[MEP Provider {self.node_id}] Registered. Balance: {self.balance:.6f} SECONDS")
        except Exception as e:
            print(f"[MEP Provider {self.node_id}] Registration failed: {e}")
            return
        
        # Connect to WebSocket
        ts = str(int(time.time()))
        sig = self.identity.sign(self.node_id, ts)
        sig_safe = urllib.parse.quote(sig)
        uri = f"{WS_URL}/ws/{self.node_id}?timestamp={ts}&signature={sig_safe}"
        try:
            async with websockets.connect(uri) as ws:
                print(f"[MEP Provider {self.node_id}] Connected to MEP Hub")
                
                # Listen for tasks
                while self.is_mining:
                    try:
                        msg = await asyncio.wait_for(ws.recv(), timeout=1.0)
                        data = json.loads(msg)
                        
                        if data["event"] == "new_task":
                            await self.process_task(data["data"])
                        elif data["event"] == "rfc":
                            await self.handle_rfc(data["data"])
                            
                    except asyncio.TimeoutError:
                        continue  # Keep connection alive
                    except websockets.exceptions.ConnectionClosed:
                        print(f"[MEP Provider {self.node_id}] Connection closed")
                        break
                        
        except Exception as e:
            print(f"[MEP Provider {self.node_id}] WebSocket error: {e}")
    
    async def handle_rfc(self, rfc_data: dict):
        """Phase 2: Evaluate Request For Compute and submit Bid."""
        task_id = rfc_data["id"]
        bounty = rfc_data["bounty"]
        # SAFETY SWITCH: Prevent purchasing data unless explicitly allowed
        max_purchase_price = 0.0 # Set to e.g., -5.0 to buy premium data
        if bounty < max_purchase_price:
            print(f"[MEP Provider {self.node_id}] Ignored RFC {task_id[:8]} (Bounty {bounty} exceeds max purchase price)")
            return
            
        print(f"[MEP Provider {self.node_id}] Received RFC {task_id[:8]} for {bounty:.6f} SECONDS. Placing bid...")
        
        # Place bid
        try:
            payload_str = json.dumps({
                "task_id": task_id,
                "provider_id": self.node_id
            })
            headers = self.identity.get_auth_headers(payload_str)
            headers["Content-Type"] = "application/json"
            resp = requests.post(f"{HUB_URL}/tasks/bid", data=payload_str, headers=headers)
            
            if resp.status_code == 200:
                data = resp.json()
                if data["status"] == "accepted":
                    print(f"[MEP Provider {self.node_id}] 🏁 BID WON for task {task_id[:8]}! Processing payload...")
                    
                    # Reconstruct task_data to pass to process_task
                    task_data = {
                        "id": task_id,
                        "payload": data["payload"],
                        "bounty": bounty,
                        "consumer_id": data["consumer_id"]
                    }
                    await self.process_task(task_data)
                else:
                    print(f"[MEP Provider {self.node_id}] Bid rejected (too slow): {data.get('detail', '')}")
        except Exception as e:
            print(f"[MEP Provider {self.node_id}] Error placing bid: {e}")

    async def process_task(self, task_data: dict):
        """Process a task and earn SECONDS."""
        task_id = task_data["id"]
        payload = task_data["payload"]
        bounty = task_data["bounty"]
        print(f"[MEP Provider {self.node_id}] Received task {task_id[:8]} for {bounty:.6f} SECONDS")
        print(f"  Payload: {payload[:50]}...")
        
        # Simulate processing (in real version, this would call local LLM API)
        await asyncio.sleep(0.5)  # Simulate thinking
        
        # Generate a realistic response
        result = f"""I've processed your request: "{payload[:30]}..."

As a MEP miner, I analyzed this task and generated the following response:

The core concept here aligns with the Miao Exchange Protocol philosophy - creating efficient yet human-centric compute exchange. The SECONDS-based economy allows for precise valuation of AI compute time while maintaining the essential "Miao" moments of unpredictability.

Key insights:
1. Time is the fundamental currency of computation
2. 6 decimal precision allows micro-transactions
3. The protocol enables global compute liquidity

Would you like me to elaborate on any specific aspect?"""
        
        # Submit result
        try:
            payload_str = json.dumps({
                "task_id": task_id,
                "provider_id": self.node_id,
                "result_payload": result
            })
            headers = self.identity.get_auth_headers(payload_str)
            headers["Content-Type"] = "application/json"
            resp = requests.post(f"{HUB_URL}/tasks/complete", data=payload_str, headers=headers)
            
            if resp.status_code == 200:
                data = resp.json()
                self.balance = data["new_balance"]
                print(f"[MEP Provider {self.node_id}] Earned {bounty:.6f} SECONDS!")
                print(f"  New balance: {self.balance:.6f} SECONDS")
            else:
                print(f"[MEP Provider {self.node_id}] Failed to submit: {resp.text}")
                
        except Exception as e:
            print(f"[MEP Provider {self.node_id}] Submission error: {e}")
    
    def stop(self):
        """Stop mining."""
        self.is_mining = False
        print(f"[MEP Provider {self.node_id}] Stopping...")

async def main():
    key_path = f"mep_provider_{uuid.uuid4().hex[:8]}.pem"
    miner = MEPProvider(key_path)
    
    try:
        await miner.connect()
    except KeyboardInterrupt:
        miner.stop()
        print("\n[MEP] Contribution stopped by user")

if __name__ == "__main__":
    print("=" * 60)
    print("Miao Exchange Protocol (MEP) Miner")
    print("Earn SECONDS by contributing idle compute")
    print("=" * 60)
    asyncio.run(main())