MEP / node /test_dm.py
WUAIBING
prepare
a25490a
import asyncio
import websockets
import json
import requests
import uuid
import time
import urllib.parse
from identity import MEPIdentity
HUB_URL = "http://localhost:8000"
WS_URL = "ws://localhost:8000"
async def test_direct_message():
print("=== Testing MEP Direct Messaging (Zero Bounty) ===")
# 1. Start Alice (Provider)
alice = MEPIdentity(f"alice_{uuid.uuid4().hex[:6]}.pem")
# 2. Start Bob (Consumer)
bob = MEPIdentity(f"bob_{uuid.uuid4().hex[:6]}.pem")
requests.post(f"{HUB_URL}/register", json={"pubkey": alice.pub_pem})
requests.post(f"{HUB_URL}/register", json={"pubkey": bob.pub_pem})
print(f"βœ… Registered Alice ({alice.node_id}) and Bob ({bob.node_id})")
async def alice_listen():
ts = str(int(time.time()))
sig = alice.sign(alice.node_id, ts)
sig_safe = urllib.parse.quote(sig)
async with websockets.connect(f"{WS_URL}/ws/{alice.node_id}?timestamp={ts}&signature={sig_safe}") as ws:
print("πŸ‘§ Alice: Online and listening...")
msg = await asyncio.wait_for(ws.recv(), timeout=5)
data = json.loads(msg)
print("πŸ‘§ Alice: Received DIRECT MESSAGE!")
print(f"πŸ‘§ Alice: Payload: {data['data']['payload']}")
print(f"πŸ‘§ Alice: Bounty: {data['data']['bounty']} SECONDS")
# Alice replies for free
payload_str = json.dumps({
"task_id": data['data']['id'],
"provider_id": alice.node_id,
"result_payload": "Yes Bob, I am available for a meeting tomorrow at 2 PM. Free of charge! 🐱"
})
headers = alice.get_auth_headers(payload_str)
headers["Content-Type"] = "application/json"
requests.post(f"{HUB_URL}/tasks/complete", data=payload_str, headers=headers)
print("πŸ‘§ Alice: Sent reply!")
async def bob_listen():
ts = str(int(time.time()))
sig = bob.sign(bob.node_id, ts)
sig_safe = urllib.parse.quote(sig)
async with websockets.connect(f"{WS_URL}/ws/{bob.node_id}?timestamp={ts}&signature={sig_safe}") as ws:
# Bob submits a direct task to Alice with 0 bounty
await asyncio.sleep(1) # Let Alice connect first
print("πŸ‘¦ Bob: Sending Direct Message to Alice (0.0 SECONDS)...")
payload_str = json.dumps({
"consumer_id": bob.node_id,
"payload": "Hey Alice, are you free for a meeting tomorrow at 2 PM?",
"bounty": 0.0,
"target_node": alice.node_id
})
headers = bob.get_auth_headers(payload_str)
headers["Content-Type"] = "application/json"
requests.post(f"{HUB_URL}/tasks/submit", data=payload_str, headers=headers)
# Bob waits for Alice's reply
msg = await asyncio.wait_for(ws.recv(), timeout=5)
data = json.loads(msg)
print(f"πŸ‘¦ Bob: Received reply from {data['data']['provider_id']}:")
print(f"πŸ‘¦ Bob: \"{data['data']['result_payload']}\"")
await asyncio.gather(alice_listen(), bob_listen())
print("=== Direct Messaging Test Complete! ===")
if __name__ == "__main__":
asyncio.run(test_direct_message())