# MEP / hub /main.py
# WUAIBING
# prepare
# a25490a
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Request, Header, Depends
from typing import Dict, List, Optional
import uuid
import time
import os
import db
import auth
from logger import log_event, log_audit
from models import NodeRegistration, TaskCreate, TaskResult, TaskBid
# FastAPI application object; the route decorators further down register handlers on it.
app = FastAPI(title="Chronos Protocol L1 Hub", description="The Time Exchange Clearinghouse", version="0.1.2")
# In-memory storage for active tasks
active_tasks: Dict[str, dict] = {} # task_id -> task_details
completed_tasks: Dict[str, dict] = {} # task_id -> finished task dict (carries its "result" entry)
connected_nodes: Dict[str, WebSocket] = {} # node_id -> websocket
# Rate-limit key -> time.time() stamps of the requests accepted for that key.
rate_limits: Dict[str, List[float]] = {}
# Upper bound on len() of the raw request body checked in verify_request.
MAX_BODY_BYTES = 200_000
# Upper bound on len() of a task payload or result payload.
MAX_PAYLOAD_CHARS = 20_000
# Width of the rate-limit window in seconds, and how many requests one key may make inside it.
RATE_LIMIT_WINDOW = 10.0
RATE_LIMIT_MAX = 50
# Largest tolerated gap, in seconds, between a request timestamp and the Hub clock.
MAX_SKEW_SECONDS = 300
# Comma-separated allowlist read from MEP_ALLOWED_IPS; an empty list turns IP filtering off.
ALLOWED_IPS = [ip.strip() for ip in os.getenv("MEP_ALLOWED_IPS", "").split(",") if ip.strip()]
# Seed the in-memory task table at import time from whatever the database
# reports as active. The database column "task_id" is exposed as "id" in the
# in-memory record; every other field keeps its column name.
for task in db.get_active_tasks():
    task_data = {"id": task["task_id"]}
    task_data.update({
        column: task[column]
        for column in (
            "consumer_id",
            "payload",
            "bounty",
            "status",
            "target_node",
            "model_requirement",
            "provider_id",
        )
    })
    active_tasks[task_data["id"]] = task_data
# --- IDENTITY VERIFICATION MIDDLEWARE ---
def _is_allowed_ip(host: Optional[str]) -> bool:
    """Report whether ``host`` passes the IP allowlist.

    When ``ALLOWED_IPS`` is empty, filtering is disabled and every host
    (including ``None``) is accepted. Otherwise ``host`` must be one of the
    listed entries.
    """
    return not ALLOWED_IPS or host in ALLOWED_IPS
def _apply_rate_limit(key: str):
    """Record one request for ``key`` or reject it with HTTP 429.

    Only stamps no older than ``RATE_LIMIT_WINDOW`` seconds are counted. If
    ``RATE_LIMIT_MAX`` of them already exist, the request is refused and the
    stored history is left untouched; otherwise the pruned history plus the
    current stamp replaces the stored one.

    Raises:
        HTTPException: 429 when the key has used up its budget for the window.
    """
    current = time.time()
    cutoff = current - RATE_LIMIT_WINDOW
    recent = [stamp for stamp in rate_limits.get(key, []) if stamp >= cutoff]
    if len(recent) >= RATE_LIMIT_MAX:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    rate_limits[key] = recent + [current]
def _validate_timestamp(ts: str):
    """Check that ``ts`` is an integer Unix time close to the Hub clock.

    Raises:
        HTTPException: 400 when ``ts`` cannot be parsed by ``int()``;
            401 when it differs from the current time by more than
            ``MAX_SKEW_SECONDS`` in either direction.
    """
    try:
        claimed = int(ts)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid timestamp")
    drift = abs(int(time.time()) - claimed)
    if drift > MAX_SKEW_SECONDS:
        raise HTTPException(status_code=401, detail="Timestamp out of allowed window")
async def verify_request(
    request: Request,
    x_mep_nodeid: str = Header(...),
    x_mep_timestamp: str = Header(...),
    x_mep_signature: str = Header(...)
) -> str:
    """FastAPI dependency that authenticates a signed request.

    Checks run in this order: IP allowlist, body size, rate limit, timestamp
    freshness, UTF-8 decoding of the body, node lookup, signature check.

    Args:
        request: Incoming request; its raw body is the signed message.
        x_mep_nodeid: Node ID the caller claims to be.
        x_mep_timestamp: Unix time (as a string) that was signed with the body.
        x_mep_signature: Signature passed to ``auth.verify_signature``.

    Returns:
        The node ID from ``x_mep_nodeid`` once the signature has verified.

    Raises:
        HTTPException: 403 (IP not allowed), 413 (body too large), 429 (rate
            limit), 400 (bad timestamp or body that is not valid UTF-8),
            401 (stale timestamp, unknown node or invalid signature).
    """
    client_host = request.client.host if request.client else None
    if not _is_allowed_ip(client_host):
        raise HTTPException(status_code=403, detail="Client IP not allowed")
    body = await request.body()
    if len(body) > MAX_BODY_BYTES:
        raise HTTPException(status_code=413, detail="Payload too large")
    # NOTE(review): the key is built from a header that has not been
    # authenticated yet, so a caller can pick arbitrary node IDs to dodge the
    # limit or to burn another node's budget. Left unchanged here; consider
    # also keying on client_host.
    _apply_rate_limit(f"{x_mep_nodeid}:{request.url.path}")
    _validate_timestamp(x_mep_timestamp)
    try:
        payload_str = body.decode("utf-8")
    except UnicodeDecodeError:
        # The body is attacker-controlled; malformed bytes are a client error,
        # not an internal server error.
        raise HTTPException(status_code=400, detail="Request body is not valid UTF-8") from None
    pub_pem = db.get_pub_pem(x_mep_nodeid)
    if not pub_pem:
        raise HTTPException(status_code=401, detail="Unknown Node ID. Please register first.")
    if not auth.verify_signature(pub_pem, payload_str, x_mep_timestamp, x_mep_signature):
        raise HTTPException(status_code=401, detail="Invalid cryptographic signature.")
    return x_mep_nodeid
@app.post("/register")
async def register_node(node: NodeRegistration, request: Request):
    """Register a node from its public key and report its starting balance.

    The caller must pass the IP allowlist and the per-host rate limit for
    ``/register``. The node ID is not supplied by the caller; it is derived
    from ``node.pubkey``.

    Raises:
        HTTPException: 403 when the client IP is not allowed; 429 when the
            host exceeds the registration rate limit.
    """
    peer = request.client
    client_host = peer.host if peer else None
    if not _is_allowed_ip(client_host):
        raise HTTPException(status_code=403, detail="Client IP not allowed")
    _apply_rate_limit(f"{client_host}:/register")

    # The Node ID is computed from the submitted Public Key PEM.
    node_id = auth.derive_node_id(node.pubkey)
    balance = db.register_node(node_id, node.pubkey)

    log_event(
        "node_registered",
        f"Node {node_id} registered with starting balance {balance}",
        node_id=node_id,
        starting_balance=balance,
    )
    log_audit("REGISTER", node_id, balance, balance, "START_BONUS")

    return {"status": "success", "node_id": node_id, "balance": balance}
@app.get("/balance/{node_id}")
async def get_balance(node_id: str):
    """Return the stored balance for ``node_id``.

    Raises:
        HTTPException: 404 when the database has no balance for the node.
    """
    seconds = db.get_balance(node_id)
    if seconds is not None:
        return {"node_id": node_id, "balance_seconds": seconds}
    raise HTTPException(status_code=404, detail="Node not found")
@app.post("/tasks/submit")
async def submit_task(
    task: TaskCreate,
    authenticated_node: str = Depends(verify_request),
    x_mep_idempotency_key: Optional[str] = Header(default=None)
):
    """Accept a task from a consumer, escrow its bounty and route it.

    A task with ``target_node`` set is sent straight to that node and skips
    bidding; any other task is announced to every connected node except the
    consumer as an ``rfc`` event (without the payload).

    A positive bounty is deducted from the consumer up front. A direct task
    whose target is offline is refused *before* any funds move or any task is
    stored, and a direct task whose delivery fails is rolled back with a
    refund, so an error response never leaves the consumer's bounty locked.

    Args:
        task: Task description; ``task.consumer_id`` must equal the signer.
        authenticated_node: Node ID established by ``verify_request``.
        x_mep_idempotency_key: Optional key; when a response is already stored
            for it, that response is returned and nothing else happens.

    Returns:
        ``{"status": "success", "task_id": ...}`` (plus ``"routed_to"`` for a
        direct task), or ``{"status": "error", "detail": ...}`` when a direct
        task cannot be delivered.

    Raises:
        HTTPException: 403 (signer is not the consumer), 413 (payload too
            large), 404 (consumer unknown), 400 (insufficient balance).
    """
    # Verify the signer is actually the consumer claiming to submit the task
    if authenticated_node != task.consumer_id:
        raise HTTPException(status_code=403, detail="Cannot submit tasks on behalf of another node")
    if len(task.payload) > MAX_PAYLOAD_CHARS:
        raise HTTPException(status_code=413, detail="Task payload too large")
    if x_mep_idempotency_key:
        existing = db.get_idempotency(authenticated_node, "/tasks/submit", x_mep_idempotency_key)
        if existing:
            return existing["response"]
    consumer_balance = db.get_balance(task.consumer_id)
    if consumer_balance is None:
        raise HTTPException(status_code=404, detail="Consumer node not found")
    # If bounty is positive, consumer is PAYING. Check consumer balance.
    if task.bounty > 0 and consumer_balance < task.bounty:
        raise HTTPException(status_code=400, detail="Insufficient SECONDS balance to pay for task")

    # A direct message to an offline node can never be delivered, so refuse it
    # before escrowing the bounty or persisting a task nobody can pick up.
    if task.target_node and task.target_node not in connected_nodes:
        log_event("task_rejected", f"Target node {task.target_node} is not connected", consumer_id=task.consumer_id, target_node=task.target_node)
        return {"status": "error", "detail": "Target node not currently connected to Hub"}

    task_id = str(uuid.uuid4())
    now = time.time()
    # Note: If bounty is negative, consumer is SELLING data. We don't deduct here.
    # We will deduct from the provider when they complete the task.
    if task.bounty > 0:
        success = db.deduct_balance(task.consumer_id, task.bounty)
        if not success:
            log_event("task_rejected", f"Node {task.consumer_id} lacks SECONDS to submit task", consumer_id=task.consumer_id, bounty=task.bounty)
            raise HTTPException(status_code=400, detail="Insufficient SECONDS balance")
        new_balance = db.get_balance(task.consumer_id)
        log_audit("ESCROW", task.consumer_id, -task.bounty, new_balance, task_id)
    log_event("task_submitted", f"Task {task_id[:8]} broadcasted by {task.consumer_id} for {task.bounty}", consumer_id=task.consumer_id, task_id=task_id, bounty=task.bounty)
    task_data = {
        "id": task_id,
        "consumer_id": task.consumer_id,
        "payload": task.payload,
        "bounty": task.bounty,
        "status": "bidding",
        "target_node": task.target_node,
        "model_requirement": task.model_requirement
    }
    db.create_task(task_id, task.consumer_id, task.payload, task.bounty, "bidding", task.target_node, task.model_requirement, now)
    active_tasks[task_id] = task_data

    # Target specific node if requested (Direct Message skips bidding)
    if task.target_node:
        try:
            task_data["status"] = "assigned"
            task_data["provider_id"] = task.target_node
            db.update_task_assignment(task_id, task.target_node, "assigned", time.time())
            await connected_nodes[task.target_node].send_json({"event": "new_task", "data": task_data})
        except Exception:
            # Delivery failed after the bounty was escrowed: undo the
            # submission so the consumer is not charged for a task that the
            # target never received.
            active_tasks.pop(task_id, None)
            # NOTE(review): "failed" is a new status value; confirm the db
            # layer accepts it and that db.get_active_tasks() excludes it.
            db.update_task_assignment(task_id, task.target_node, "failed", time.time())
            if task.bounty > 0:
                db.add_balance(task.consumer_id, task.bounty)
                refunded_balance = db.get_balance(task.consumer_id)
                log_audit("REFUND", task.consumer_id, task.bounty, refunded_balance, task_id)
            log_event("task_delivery_failed", f"Task {task_id[:8]} could not be delivered to {task.target_node}", consumer_id=task.consumer_id, task_id=task_id, target_node=task.target_node)
            return {"status": "error", "detail": "Target node disconnected"}
        response_payload = {"status": "success", "task_id": task_id, "routed_to": task.target_node}
        if x_mep_idempotency_key:
            db.set_idempotency(authenticated_node, "/tasks/submit", x_mep_idempotency_key, response_payload, 200, time.time())
        return response_payload

    # Phase 2: Broadcast RFC (Request For Compute) to all connected nodes EXCEPT the consumer
    rfc_data = {
        "id": task_id,
        "consumer_id": task.consumer_id,
        "bounty": task.bounty,
        "model_requirement": task.model_requirement
    }
    for node_id, ws in list(connected_nodes.items()):
        if node_id == task.consumer_id:
            continue
        try:
            await ws.send_json({"event": "rfc", "data": rfc_data})
        except Exception:
            # Best effort: one unreachable node must not block the broadcast.
            log_event("rfc_send_failed", f"Could not send RFC for task {task_id[:8]} to {node_id}", task_id=task_id, node_id=node_id)
    response_payload = {"status": "success", "task_id": task_id}
    if x_mep_idempotency_key:
        db.set_idempotency(authenticated_node, "/tasks/submit", x_mep_idempotency_key, response_payload, 200, time.time())
    return response_payload
@app.post("/tasks/bid")
async def place_bid(bid: TaskBid, authenticated_node: str = Depends(verify_request)):
    """Let a provider claim a task that is still open for bidding.

    The first bid on a task in the ``"bidding"`` state wins: the task is
    marked ``"assigned"`` to the bidder and the full payload is returned.

    Returns:
        An ``"accepted"`` response carrying the payload, or a ``"rejected"``
        response when the task is no longer open for bidding.

    Raises:
        HTTPException: 403 when the signer is not ``bid.provider_id``;
            404 when the task is not in the in-memory active table.
    """
    if authenticated_node != bid.provider_id:
        raise HTTPException(status_code=403, detail="Cannot bid on behalf of another node")

    auction = active_tasks.get(bid.task_id)
    if not auction:
        raise HTTPException(status_code=404, detail="Task not found or already completed")

    still_open = auction["status"] == "bidding"
    if not still_open:
        return {"status": "rejected", "detail": "Task already assigned to another node"}

    # Phase 2 Fast Auction: the first valid bid takes the task.
    auction.update(status="assigned", provider_id=bid.provider_id)
    db.update_task_assignment(bid.task_id, bid.provider_id, "assigned", time.time())
    log_event(
        "bid_accepted",
        f"Task {bid.task_id[:8]} assigned to {bid.provider_id}",
        task_id=bid.task_id,
        provider_id=bid.provider_id,
        bounty=auction["bounty"],
    )

    # The winner receives the full payload.
    return dict(
        status="accepted",
        payload=auction["payload"],
        consumer_id=auction["consumer_id"],
        model_requirement=auction.get("model_requirement"),
    )
@app.post("/tasks/complete")
async def complete_task(
    result: TaskResult,
    authenticated_node: str = Depends(verify_request),
    x_mep_idempotency_key: Optional[str] = Header(default=None)
):
    """Settle a task: move SECONDS, store the result and notify the consumer.

    Only the node the task is currently assigned to may complete it. Task IDs
    are sent to every connected node in the ``rfc`` broadcast, so without this
    check any registered node could complete a task it never won and collect
    the bounty.

    With a non-negative bounty the provider is credited. With a negative
    bounty the provider pays ``abs(bounty)`` and the consumer is credited.

    Args:
        result: Completion report; ``result.provider_id`` must equal the signer.
        authenticated_node: Node ID established by ``verify_request``.
        x_mep_idempotency_key: Optional key; when a response is already stored
            for it, that response is returned and nothing else happens.

    Returns:
        ``{"status": "success", "earned": <bounty>, "new_balance": <balance>}``.

    Raises:
        HTTPException: 403 (signer is not the provider, or the task is not
            assigned to this provider), 413 (result too large), 404 (task
            unknown or no longer open), 400 (provider cannot afford a
            negative-bounty task).
    """
    if authenticated_node != result.provider_id:
        raise HTTPException(status_code=403, detail="Cannot complete tasks on behalf of another node")
    if len(result.result_payload) > MAX_PAYLOAD_CHARS:
        raise HTTPException(status_code=413, detail="Result payload too large")
    if x_mep_idempotency_key:
        existing = db.get_idempotency(authenticated_node, "/tasks/complete", x_mep_idempotency_key)
        if existing:
            return existing["response"]
    task = active_tasks.get(result.task_id)
    if not task:
        # Not in memory: fall back to the database copy of the task.
        db_task = db.get_task(result.task_id)
        if not db_task or db_task["status"] not in ("bidding", "assigned"):
            raise HTTPException(status_code=404, detail="Task not found or already claimed")
        task = {
            "id": db_task["task_id"],
            "consumer_id": db_task["consumer_id"],
            "payload": db_task["payload"],
            "bounty": db_task["bounty"],
            "status": db_task["status"],
            "target_node": db_task["target_node"],
            "model_requirement": db_task["model_requirement"],
            "provider_id": db_task["provider_id"]
        }
        active_tasks[result.task_id] = task

    # The caller must be the node that won the bid (or was targeted directly).
    # .get() is used because a freshly submitted task has no "provider_id" yet.
    if task.get("status") != "assigned" or task.get("provider_id") != result.provider_id:
        raise HTTPException(status_code=403, detail="Task is not assigned to this node")

    provider_balance = db.get_balance(result.provider_id)
    if provider_balance is None:
        db.set_balance(result.provider_id, 0.0)
    # Transfer SECONDS based on positive or negative bounty
    bounty = task["bounty"]
    if bounty >= 0:
        # Standard Compute Market: Provider earns SECONDS
        db.add_balance(result.provider_id, bounty)
        new_balance = db.get_balance(result.provider_id)
        log_audit("EARN_COMPUTE", result.provider_id, bounty, new_balance, result.task_id)
    else:
        # Data Market: Provider PAYS to receive this payload/task
        cost = abs(bounty)
        success = db.deduct_balance(result.provider_id, cost)
        if not success:
            log_event("data_purchase_failed", f"Provider {result.provider_id} lacks SECONDS to buy {result.task_id}", task_id=result.task_id, provider_id=result.provider_id, cost=cost)
            raise HTTPException(status_code=400, detail="Provider lacks SECONDS to buy this data")
        p_balance = db.get_balance(result.provider_id)
        log_audit("BUY_DATA", result.provider_id, -cost, p_balance, result.task_id)
        db.add_balance(task["consumer_id"], cost) # The sender earns SECONDS
        c_balance = db.get_balance(task["consumer_id"])
        log_audit("SELL_DATA", task["consumer_id"], cost, c_balance, result.task_id)
    log_event("task_completed", f"Task {result.task_id[:8]} completed by {result.provider_id}", task_id=result.task_id, provider_id=result.provider_id, bounty=bounty)
    # Move task to completed
    task["status"] = "completed"
    task["provider_id"] = result.provider_id
    task["result"] = result.result_payload
    completed_tasks[result.task_id] = task
    del active_tasks[result.task_id]
    db.update_task_result(result.task_id, result.provider_id, result.result_payload, "completed", time.time())
    # ROUTE RESULT BACK TO CONSUMER VIA WEBSOCKET
    consumer_id = task["consumer_id"]
    if consumer_id in connected_nodes:
        try:
            await connected_nodes[consumer_id].send_json({
                "event": "task_result",
                "data": {
                    "task_id": result.task_id,
                    "provider_id": result.provider_id,
                    "result_payload": result.result_payload,
                    "bounty_spent": task["bounty"]
                }
            })
        except Exception:
            # Best effort: the result is already stored, so the consumer can
            # still read it from GET /tasks/result/{task_id}.
            log_event("result_push_failed", f"Could not push result of task {result.task_id[:8]} to {consumer_id}", task_id=result.task_id, consumer_id=consumer_id)
    response_payload = {"status": "success", "earned": task["bounty"], "new_balance": db.get_balance(result.provider_id)}
    if x_mep_idempotency_key:
        db.set_idempotency(authenticated_node, "/tasks/complete", x_mep_idempotency_key, response_payload, 200, time.time())
    return response_payload
@app.get("/tasks/result/{task_id}")
async def get_task_result(task_id: str, authenticated_node: str = Depends(verify_request)):
    """Return the stored result of a completed task to one of its two parties.

    Raises:
        HTTPException: 404 when the task is missing or its status is not
            ``"completed"``; 403 when the signer is neither the task's
            consumer nor its provider.
    """
    task = db.get_task(task_id)
    is_completed = bool(task) and task["status"] == "completed"
    if not is_completed:
        raise HTTPException(status_code=404, detail="Task not found or not completed")

    is_consumer = authenticated_node == task["consumer_id"]
    is_provider = authenticated_node == task["provider_id"]
    if not (is_consumer or is_provider):
        raise HTTPException(status_code=403, detail="Not authorized to view this result")

    exposed_fields = ("task_id", "consumer_id", "provider_id", "bounty", "result_payload")
    return {field: task[field] for field in exposed_fields}
@app.get("/health")
async def health_check():
    """Liveness probe: always answers with a fixed ``{"status": "ok"}`` body."""
    return dict(status="ok")
@app.websocket("/ws/{node_id}")
async def websocket_endpoint(websocket: WebSocket, node_id: str, timestamp: str, signature: str):
    """Authenticate a node's WebSocket and keep it registered while it is open.

    The connection is refused with a close code when a check fails:
    4003 (IP not allowed), 4004 (rate limit or bad/stale timestamp),
    4001 (unknown node ID), 4002 (invalid signature). The signature is
    verified over ``node_id`` and ``timestamp``.

    Once accepted, the socket is stored in ``connected_nodes`` so other
    handlers can push events to the node. Incoming text frames are read and
    discarded; the loop exists only to notice when the peer goes away.

    The registry entry is removed on *any* exit from the receive loop, and
    only if it still points at this socket, so a late disconnect of an old
    connection cannot unregister a newer one for the same node.
    """
    client_host = websocket.client.host if websocket.client else None
    if not _is_allowed_ip(client_host):
        await websocket.close(code=4003, reason="Client IP not allowed")
        return
    try:
        _apply_rate_limit(f"{node_id}:/ws")
        _validate_timestamp(timestamp)
    except HTTPException as exc:
        await websocket.close(code=4004, reason=exc.detail)
        return
    pub_pem = db.get_pub_pem(node_id)
    if not pub_pem:
        await websocket.close(code=4001, reason="Unknown Node ID")
        return
    if not auth.verify_signature(pub_pem, node_id, timestamp, signature):
        await websocket.close(code=4002, reason="Invalid Signature")
        return
    await websocket.accept()
    connected_nodes[node_id] = websocket
    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        # Normal termination: the peer closed the connection.
        pass
    finally:
        # Runs for disconnects and for unexpected errors alike, so a broken
        # socket never lingers in the registry.
        if connected_nodes.get(node_id) is websocket:
            del connected_nodes[node_id]