| | from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect, Request, Header, Depends |
| | from typing import Dict, List, Optional |
| | import uuid |
| | import time |
| | import os |
| | import db |
| | import auth |
| | from logger import log_event, log_audit |
| |
|
| | from models import NodeRegistration, TaskCreate, TaskResult, TaskBid |
| |
|
| | app = FastAPI(title="Chronos Protocol L1 Hub", description="The Time Exchange Clearinghouse", version="0.1.2") |
| |
|
| | |
| | active_tasks: Dict[str, dict] = {} |
| | completed_tasks: Dict[str, dict] = {} |
| | connected_nodes: Dict[str, WebSocket] = {} |
| | rate_limits: Dict[str, List[float]] = {} |
| | MAX_BODY_BYTES = 200_000 |
| | MAX_PAYLOAD_CHARS = 20_000 |
| | RATE_LIMIT_WINDOW = 10.0 |
| | RATE_LIMIT_MAX = 50 |
| | MAX_SKEW_SECONDS = 300 |
| | ALLOWED_IPS = [ip.strip() for ip in os.getenv("MEP_ALLOWED_IPS", "").split(",") if ip.strip()] |
| | for task in db.get_active_tasks(): |
| | task_data = { |
| | "id": task["task_id"], |
| | "consumer_id": task["consumer_id"], |
| | "payload": task["payload"], |
| | "bounty": task["bounty"], |
| | "status": task["status"], |
| | "target_node": task["target_node"], |
| | "model_requirement": task["model_requirement"], |
| | "provider_id": task["provider_id"] |
| | } |
| | active_tasks[task_data["id"]] = task_data |
| |
|
| | |
| | def _is_allowed_ip(host: Optional[str]) -> bool: |
| | if not ALLOWED_IPS: |
| | return True |
| | return host in ALLOWED_IPS |
| |
|
| | def _apply_rate_limit(key: str): |
| | now = time.time() |
| | window_start = now - RATE_LIMIT_WINDOW |
| | timestamps = rate_limits.get(key, []) |
| | timestamps = [t for t in timestamps if t >= window_start] |
| | if len(timestamps) >= RATE_LIMIT_MAX: |
| | raise HTTPException(status_code=429, detail="Rate limit exceeded") |
| | timestamps.append(now) |
| | rate_limits[key] = timestamps |
| |
|
| | def _validate_timestamp(ts: str): |
| | try: |
| | ts_int = int(ts) |
| | except ValueError: |
| | raise HTTPException(status_code=400, detail="Invalid timestamp") |
| | now = int(time.time()) |
| | if abs(now - ts_int) > MAX_SKEW_SECONDS: |
| | raise HTTPException(status_code=401, detail="Timestamp out of allowed window") |
| |
|
| | async def verify_request( |
| | request: Request, |
| | x_mep_nodeid: str = Header(...), |
| | x_mep_timestamp: str = Header(...), |
| | x_mep_signature: str = Header(...) |
| | ) -> str: |
| | client_host = request.client.host if request.client else None |
| | if not _is_allowed_ip(client_host): |
| | raise HTTPException(status_code=403, detail="Client IP not allowed") |
| |
|
| | body = await request.body() |
| | if len(body) > MAX_BODY_BYTES: |
| | raise HTTPException(status_code=413, detail="Payload too large") |
| |
|
| | _apply_rate_limit(f"{x_mep_nodeid}:{request.url.path}") |
| | _validate_timestamp(x_mep_timestamp) |
| |
|
| | payload_str = body.decode('utf-8') |
| |
|
| | pub_pem = db.get_pub_pem(x_mep_nodeid) |
| | if not pub_pem: |
| | raise HTTPException(status_code=401, detail="Unknown Node ID. Please register first.") |
| |
|
| | if not auth.verify_signature(pub_pem, payload_str, x_mep_timestamp, x_mep_signature): |
| | raise HTTPException(status_code=401, detail="Invalid cryptographic signature.") |
| |
|
| | return x_mep_nodeid |
| |
|
| | @app.post("/register") |
| | async def register_node(node: NodeRegistration, request: Request): |
| | client_host = request.client.host if request.client else None |
| | if not _is_allowed_ip(client_host): |
| | raise HTTPException(status_code=403, detail="Client IP not allowed") |
| | _apply_rate_limit(f"{client_host}:/register") |
| | |
| | node_id = auth.derive_node_id(node.pubkey) |
| | balance = db.register_node(node_id, node.pubkey) |
| |
|
| | log_event("node_registered", f"Node {node_id} registered with starting balance {balance}", node_id=node_id, starting_balance=balance) |
| | log_audit("REGISTER", node_id, balance, balance, "START_BONUS") |
| |
|
| | return {"status": "success", "node_id": node_id, "balance": balance} |
| |
|
| | @app.get("/balance/{node_id}") |
| | async def get_balance(node_id: str): |
| | balance = db.get_balance(node_id) |
| | if balance is None: |
| | raise HTTPException(status_code=404, detail="Node not found") |
| | return {"node_id": node_id, "balance_seconds": balance} |
| |
|
| | @app.post("/tasks/submit") |
| | async def submit_task( |
| | task: TaskCreate, |
| | authenticated_node: str = Depends(verify_request), |
| | x_mep_idempotency_key: Optional[str] = Header(default=None) |
| | ): |
| | |
| | if authenticated_node != task.consumer_id: |
| | raise HTTPException(status_code=403, detail="Cannot submit tasks on behalf of another node") |
| |
|
| | if len(task.payload) > MAX_PAYLOAD_CHARS: |
| | raise HTTPException(status_code=413, detail="Task payload too large") |
| | |
| | if x_mep_idempotency_key: |
| | existing = db.get_idempotency(authenticated_node, "/tasks/submit", x_mep_idempotency_key) |
| | if existing: |
| | return existing["response"] |
| |
|
| | consumer_balance = db.get_balance(task.consumer_id) |
| | if consumer_balance is None: |
| | raise HTTPException(status_code=404, detail="Consumer node not found") |
| |
|
| | |
| | if task.bounty > 0 and consumer_balance < task.bounty: |
| | raise HTTPException(status_code=400, detail="Insufficient SECONDS balance to pay for task") |
| |
|
| | task_id = str(uuid.uuid4()) |
| | now = time.time() |
| |
|
| | |
| | |
| | if task.bounty > 0: |
| | success = db.deduct_balance(task.consumer_id, task.bounty) |
| | if not success: |
| | log_event("task_rejected", f"Node {task.consumer_id} lacks SECONDS to submit task", consumer_id=task.consumer_id, bounty=task.bounty) |
| | raise HTTPException(status_code=400, detail="Insufficient SECONDS balance") |
| | |
| | new_balance = db.get_balance(task.consumer_id) |
| | log_audit("ESCROW", task.consumer_id, -task.bounty, new_balance, task_id) |
| | |
| | log_event("task_submitted", f"Task {task_id[:8]} broadcasted by {task.consumer_id} for {task.bounty}", consumer_id=task.consumer_id, task_id=task_id, bounty=task.bounty) |
| |
|
| | task_data = { |
| | "id": task_id, |
| | "consumer_id": task.consumer_id, |
| | "payload": task.payload, |
| | "bounty": task.bounty, |
| | "status": "bidding", |
| | "target_node": task.target_node, |
| | "model_requirement": task.model_requirement |
| | } |
| | db.create_task(task_id, task.consumer_id, task.payload, task.bounty, "bidding", task.target_node, task.model_requirement, now) |
| | active_tasks[task_id] = task_data |
| |
|
| | |
| | if task.target_node: |
| | if task.target_node in connected_nodes: |
| | try: |
| | task_data["status"] = "assigned" |
| | task_data["provider_id"] = task.target_node |
| | db.update_task_assignment(task_id, task.target_node, "assigned", time.time()) |
| | await connected_nodes[task.target_node].send_json({"event": "new_task", "data": task_data}) |
| | response_payload = {"status": "success", "task_id": task_id, "routed_to": task.target_node} |
| | if x_mep_idempotency_key: |
| | db.set_idempotency(authenticated_node, "/tasks/submit", x_mep_idempotency_key, response_payload, 200, time.time()) |
| | return response_payload |
| | except Exception: |
| | return {"status": "error", "detail": "Target node disconnected"} |
| | else: |
| | return {"status": "error", "detail": "Target node not currently connected to Hub"} |
| |
|
| | |
| | rfc_data = { |
| | "id": task_id, |
| | "consumer_id": task.consumer_id, |
| | "bounty": task.bounty, |
| | "model_requirement": task.model_requirement |
| | } |
| | for node_id, ws in list(connected_nodes.items()): |
| | if node_id != task.consumer_id: |
| | try: |
| | await ws.send_json({"event": "rfc", "data": rfc_data}) |
| | except Exception: |
| | pass |
| |
|
| | response_payload = {"status": "success", "task_id": task_id} |
| | if x_mep_idempotency_key: |
| | db.set_idempotency(authenticated_node, "/tasks/submit", x_mep_idempotency_key, response_payload, 200, time.time()) |
| | return response_payload |
| |
|
| | @app.post("/tasks/bid") |
| | async def place_bid(bid: TaskBid, authenticated_node: str = Depends(verify_request)): |
| | if authenticated_node != bid.provider_id: |
| | raise HTTPException(status_code=403, detail="Cannot bid on behalf of another node") |
| |
|
| | task = active_tasks.get(bid.task_id) |
| | if not task: |
| | raise HTTPException(status_code=404, detail="Task not found or already completed") |
| |
|
| | if task["status"] != "bidding": |
| | return {"status": "rejected", "detail": "Task already assigned to another node"} |
| |
|
| | |
| | task["status"] = "assigned" |
| | task["provider_id"] = bid.provider_id |
| | db.update_task_assignment(bid.task_id, bid.provider_id, "assigned", time.time()) |
| |
|
| | log_event("bid_accepted", f"Task {bid.task_id[:8]} assigned to {bid.provider_id}", task_id=bid.task_id, provider_id=bid.provider_id, bounty=task["bounty"]) |
| |
|
| | |
| | return { |
| | "status": "accepted", |
| | "payload": task["payload"], |
| | "consumer_id": task["consumer_id"], |
| | "model_requirement": task.get("model_requirement") |
| | } |
| |
|
| | @app.post("/tasks/complete") |
| | async def complete_task( |
| | result: TaskResult, |
| | authenticated_node: str = Depends(verify_request), |
| | x_mep_idempotency_key: Optional[str] = Header(default=None) |
| | ): |
| | if authenticated_node != result.provider_id: |
| | raise HTTPException(status_code=403, detail="Cannot complete tasks on behalf of another node") |
| |
|
| | if len(result.result_payload) > MAX_PAYLOAD_CHARS: |
| | raise HTTPException(status_code=413, detail="Result payload too large") |
| | |
| | if x_mep_idempotency_key: |
| | existing = db.get_idempotency(authenticated_node, "/tasks/complete", x_mep_idempotency_key) |
| | if existing: |
| | return existing["response"] |
| |
|
| | task = active_tasks.get(result.task_id) |
| | if not task: |
| | db_task = db.get_task(result.task_id) |
| | if not db_task or db_task["status"] not in ("bidding", "assigned"): |
| | raise HTTPException(status_code=404, detail="Task not found or already claimed") |
| | task = { |
| | "id": db_task["task_id"], |
| | "consumer_id": db_task["consumer_id"], |
| | "payload": db_task["payload"], |
| | "bounty": db_task["bounty"], |
| | "status": db_task["status"], |
| | "target_node": db_task["target_node"], |
| | "model_requirement": db_task["model_requirement"], |
| | "provider_id": db_task["provider_id"] |
| | } |
| | active_tasks[result.task_id] = task |
| |
|
| | provider_balance = db.get_balance(result.provider_id) |
| | if provider_balance is None: |
| | db.set_balance(result.provider_id, 0.0) |
| |
|
| | |
| | bounty = task["bounty"] |
| | if bounty >= 0: |
| | |
| | db.add_balance(result.provider_id, bounty) |
| | new_balance = db.get_balance(result.provider_id) |
| | log_audit("EARN_COMPUTE", result.provider_id, bounty, new_balance, result.task_id) |
| | else: |
| | |
| | cost = abs(bounty) |
| | success = db.deduct_balance(result.provider_id, cost) |
| | if not success: |
| | log_event("data_purchase_failed", f"Provider {result.provider_id} lacks SECONDS to buy {result.task_id}", task_id=result.task_id, provider_id=result.provider_id, cost=cost) |
| | raise HTTPException(status_code=400, detail="Provider lacks SECONDS to buy this data") |
| |
|
| | p_balance = db.get_balance(result.provider_id) |
| | log_audit("BUY_DATA", result.provider_id, -cost, p_balance, result.task_id) |
| |
|
| | db.add_balance(task["consumer_id"], cost) |
| | c_balance = db.get_balance(task["consumer_id"]) |
| | log_audit("SELL_DATA", task["consumer_id"], cost, c_balance, result.task_id) |
| |
|
| | log_event("task_completed", f"Task {result.task_id[:8]} completed by {result.provider_id}", task_id=result.task_id, provider_id=result.provider_id, bounty=bounty) |
| |
|
| | |
| | task["status"] = "completed" |
| | task["provider_id"] = result.provider_id |
| | task["result"] = result.result_payload |
| | completed_tasks[result.task_id] = task |
| | del active_tasks[result.task_id] |
| | db.update_task_result(result.task_id, result.provider_id, result.result_payload, "completed", time.time()) |
| |
|
| | |
| | consumer_id = task["consumer_id"] |
| | if consumer_id in connected_nodes: |
| | try: |
| | await connected_nodes[consumer_id].send_json({ |
| | "event": "task_result", |
| | "data": { |
| | "task_id": result.task_id, |
| | "provider_id": result.provider_id, |
| | "result_payload": result.result_payload, |
| | "bounty_spent": task["bounty"] |
| | } |
| | }) |
| | except Exception: |
| | pass |
| |
|
| | response_payload = {"status": "success", "earned": task["bounty"], "new_balance": db.get_balance(result.provider_id)} |
| | if x_mep_idempotency_key: |
| | db.set_idempotency(authenticated_node, "/tasks/complete", x_mep_idempotency_key, response_payload, 200, time.time()) |
| | return response_payload |
| |
|
| | @app.get("/tasks/result/{task_id}") |
| | async def get_task_result(task_id: str, authenticated_node: str = Depends(verify_request)): |
| | task = db.get_task(task_id) |
| | if not task or task["status"] != "completed": |
| | raise HTTPException(status_code=404, detail="Task not found or not completed") |
| | if authenticated_node not in (task["consumer_id"], task["provider_id"]): |
| | raise HTTPException(status_code=403, detail="Not authorized to view this result") |
| | return { |
| | "task_id": task["task_id"], |
| | "consumer_id": task["consumer_id"], |
| | "provider_id": task["provider_id"], |
| | "bounty": task["bounty"], |
| | "result_payload": task["result_payload"] |
| | } |
| |
|
| | @app.get("/health") |
| | async def health_check(): |
| | return {"status": "ok"} |
| |
|
| | @app.websocket("/ws/{node_id}") |
| | async def websocket_endpoint(websocket: WebSocket, node_id: str, timestamp: str, signature: str): |
| | client_host = websocket.client.host if websocket.client else None |
| | if not _is_allowed_ip(client_host): |
| | await websocket.close(code=4003, reason="Client IP not allowed") |
| | return |
| |
|
| | try: |
| | _apply_rate_limit(f"{node_id}:/ws") |
| | _validate_timestamp(timestamp) |
| | except HTTPException as exc: |
| | await websocket.close(code=4004, reason=exc.detail) |
| | return |
| |
|
| | pub_pem = db.get_pub_pem(node_id) |
| | if not pub_pem: |
| | await websocket.close(code=4001, reason="Unknown Node ID") |
| | return |
| |
|
| | if not auth.verify_signature(pub_pem, node_id, timestamp, signature): |
| | await websocket.close(code=4002, reason="Invalid Signature") |
| | return |
| |
|
| | await websocket.accept() |
| | connected_nodes[node_id] = websocket |
| | try: |
| | while True: |
| | await websocket.receive_text() |
| | except WebSocketDisconnect: |
| | if node_id in connected_nodes: |
| | del connected_nodes[node_id] |
| |
|