""" Proxy API - Public-facing rate-limited API for EvolAI subnet Designed for deployment on Hugging Face Spaces """ from fastapi import Depends, FastAPI, Header, HTTPException, Request, Query from fastapi.middleware.cors import CORSMiddleware from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded from typing import Any, Dict, List, Optional from pydantic import BaseModel, ValidationError from contextlib import asynccontextmanager import httpx import os import logging import hashlib import time try: from substrateinterface import Keypair as _Keypair except ImportError as _substrate_err: # pragma: no cover # Hard failure: substrate-interface is required for validator auth. # Raise immediately so the container shows a clear build/startup error # instead of silently accepting all requests and returning 401 forever. raise RuntimeError( "substrate-interface is required but not installed. " "Check that the Rust toolchain is present in the Dockerfile so " "py-sr25519-bindings compiles correctly. " f"Original error: {_substrate_err}" ) from _substrate_err # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Configuration REAL_API_URL = os.getenv("REAL_API_URL", "") if not REAL_API_URL: raise RuntimeError("REAL_API_URL environment variable is required") RATE_LIMIT_PER_MINUTE = os.getenv("RATE_LIMIT_PER_MINUTE", "1000") CACHE_TTL_SECONDS = int(os.getenv("CACHE_TTL_SECONDS", "10")) # TTL for read-only caches # Owner API key, forwarded to backend when calling validator-restricted endpoints. # Set this to the same value as OWNER_API_KEY in the manager deployment. OWNER_API_KEY = os.getenv("OWNER_API_KEY", "") # ==================== Validator Authentication ==================== def _verify_bittensor_signature(hotkey: str, message: str, signature_hex: str) -> bool: """Return True if *signature_hex* is a valid SR25519 signature of *message* produced by *hotkey*. """ try: sig_bytes = bytes.fromhex(signature_hex.removeprefix("0x")) kp = _Keypair(ss58_address=hotkey) return bool(kp.verify(message.encode(), sig_bytes)) except Exception: return False def _validate_validator_message(hotkey: str, message: str, ttl: int = 300) -> bool: """Return True if *message* has the correct format and is not older than *ttl* seconds. Expected format: ``evolai_validator:{hotkey}:{unix_timestamp}`` """ try: prefix, msg_hotkey, ts_str = message.split(":") if prefix != "evolai_validator": return False if msg_hotkey != hotkey: return False age = time.time() - int(ts_str) return 0 <= age <= ttl except Exception: return False async def require_validator_auth( request: Request, hotkey: str = Header(..., alias="X-Validator-Hotkey"), message: str = Header(..., alias="X-Validator-Message"), signature: str = Header(..., alias="X-Validator-Signature"), ) -> str: """FastAPI dependency that enforces Bittensor hotkey signature authentication. Callers must include three headers: - ``X-Validator-Hotkey``: SS58-encoded hotkey address - ``X-Validator-Message``: ``evolai_validator:{hotkey}:{unix_timestamp}`` - ``X-Validator-Signature``: hex-encoded SR25519 signature of the message """ if not _validate_validator_message(hotkey, message): raise HTTPException(status_code=401, detail="Invalid or expired validator message.") if not _verify_bittensor_signature(hotkey, message, signature): raise HTTPException(status_code=401, detail="Validator signature verification failed.") return hotkey # ==================== Request models (proxy validates before forwarding) ==================== class _EvalResult(BaseModel): miner_uid: int miner_hotkey: str raw_score: float effective_score: float dataset_distribution: Optional[Dict[str, Any]] = {} class _EvalSubmission(BaseModel): evaluation_round: int judge_model: str results: List[_EvalResult] # Simple in-process TTL cache for read endpoints _cache: Dict[str, tuple] = {} # key -> (data, expires_at) _CACHE_MAX_ENTRIES = 256 # hard cap to prevent unbounded memory growth def _cache_get(key: str): entry = _cache.get(key) if entry and time.monotonic() < entry[1]: return entry[0] return None def _cache_set(key: str, data, ttl: int = CACHE_TTL_SECONDS): # Evict expired entries first; if still over cap, drop oldest. now = time.monotonic() expired = [k for k, (_, exp) in _cache.items() if exp <= now] for k in expired: del _cache[k] if len(_cache) >= _CACHE_MAX_ENTRIES: # Drop the entry that expires soonest (oldest effective entry) oldest = min(_cache, key=lambda k: _cache[k][1]) del _cache[oldest] _cache[key] = (data, now + ttl) def get_rate_limit_key(request: Request) -> str: ip = get_remote_address(request) fingerprint = request.headers.get("X-Client-Fingerprint", "") if not fingerprint: user_agent = request.headers.get("User-Agent", "") fingerprint = hashlib.md5(user_agent.encode()).hexdigest()[:8] return f"{ip}:{fingerprint}" # Rate limiter with composite key (IP + fingerprint) limiter = Limiter(key_func=get_rate_limit_key) # HTTP client — pool capped to prevent connection exhaustion under load http_client: httpx.AsyncClient = None # initialised in lifespan @asynccontextmanager async def lifespan(app: FastAPI): global http_client http_client = httpx.AsyncClient( timeout=30.0, limits=httpx.Limits( max_connections=100, max_keepalive_connections=20, keepalive_expiry=30, ), ) yield await http_client.aclose() # FastAPI app app = FastAPI( docs_url=None, redoc_url=None, openapi_url=None, lifespan=lifespan, ) # Add rate limit exception handler app.state.limiter = limiter app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) # CORS — credentials must be False when origins is wildcard app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=False, allow_methods=["GET", "POST"], allow_headers=[ "Content-Type", "X-Client-Fingerprint", "X-Validator-Hotkey", "X-Validator-Message", "X-Validator-Signature", ], ) # ==================== Dataset Endpoints (Public Read) ==================== @app.get("/datasets") @limiter.limit(f"{RATE_LIMIT_PER_MINUTE}/minute") async def list_datasets( request: Request, active_only: bool = True ): cache_key = f"datasets:{active_only}" if cached := _cache_get(cache_key): return cached try: headers = {"X-API-Key": OWNER_API_KEY} if OWNER_API_KEY else {} response = await http_client.get( f"{REAL_API_URL}/datasets", params={"active_only": active_only}, headers=headers, ) if response.status_code == 200: data = response.json() _cache_set(cache_key, data) return data else: logger.error(f"Backend error: {response.status_code}") raise HTTPException(status_code=response.status_code, detail="Backend error") except httpx.RequestError: logger.error("Backend request failed") raise HTTPException(status_code=503, detail="Backend unavailable") # ==================== Miner Rankings (Public Read) ==================== @app.get("/evaluations/rankings") @limiter.limit(f"{RATE_LIMIT_PER_MINUTE}/minute") async def miner_rankings( request: Request, limit: int = Query(default=100, ge=1, le=500) ): cache_key = f"rankings:{limit}" if cached := _cache_get(cache_key): return cached try: headers = {"X-API-Key": OWNER_API_KEY} if OWNER_API_KEY else {} response = await http_client.get( f"{REAL_API_URL}/evaluations/rankings", params={"limit": limit}, headers=headers, ) if response.status_code == 200: data = response.json() _cache_set(cache_key, data) return data else: logger.error(f"Backend error: {response.status_code}") raise HTTPException(status_code=response.status_code, detail="Backend error") except httpx.RequestError: logger.error("Backend request failed") raise HTTPException(status_code=503, detail="Backend unavailable") # ==================== Challenge Endpoint (Validator-only) ==================== @app.get("/challenge/{uid}") @limiter.limit("120/minute") async def get_challenge( request: Request, uid: int, ): """Fetch the current challenge spec for a miner UID. Returns a mapping of dataset names to row indices that the miner is evaluated on — **indices only, no text is fetched or returned here**. Callers load the actual text themselves directly from HuggingFace using these indices. Rate-limited to prevent bulk scraping of all miners' test sets. """ try: headers = {"X-API-Key": OWNER_API_KEY} if OWNER_API_KEY else {} response = await http_client.get( f"{REAL_API_URL}/challenge/{uid}", headers=headers, timeout=15.0, ) if response.status_code == 404: raise HTTPException(status_code=404, detail=f"No challenge for UID {uid}.") if response.status_code != 200: logger.error(f"Backend challenge error for UID {uid}: {response.status_code}") raise HTTPException(status_code=response.status_code, detail="Backend error") return response.json() except HTTPException: raise except httpx.RequestError: logger.error(f"Backend unreachable for /challenge/{uid}") raise HTTPException(status_code=503, detail="Backend unavailable") # ==================== Evaluation Submission (Validator-only) ==================== @app.post("/evaluations/submit") @limiter.limit("60/minute") async def submit_evaluations( request: Request, _hotkey: str = Depends(require_validator_auth), ): """Submit evaluation results for one or more miners. Authenticated via hotkey signature, then forwarded to the backend with the owner API key so the backend does not need to re-verify the signature. """ try: raw = await request.json() body = _EvalSubmission(**raw) except (ValidationError, Exception): raise HTTPException(status_code=422, detail="Invalid request body") payload = { "validator_hotkey": _hotkey, # from verified signature — not self-reported "evaluation_round": body.evaluation_round, "judge_model": body.judge_model, "results": [r.model_dump() for r in body.results], } try: headers = {"Content-Type": "application/json"} if OWNER_API_KEY: headers["X-API-Key"] = OWNER_API_KEY response = await http_client.post( f"{REAL_API_URL}/evaluations/submit", json=payload, headers=headers, timeout=30.0, ) if response.status_code not in (200, 201): logger.error(f"Backend submission error: {response.status_code}") raise HTTPException(status_code=response.status_code, detail="Backend error") return response.json() except HTTPException: raise except httpx.RequestError: logger.error("Backend unreachable for /evaluations/submit") raise HTTPException(status_code=503, detail="Backend unavailable") # ==================== Validator Weight Submission ==================== class _WeightSubmission(BaseModel): weights: Dict[str, float] netuid: Optional[int] = None block: Optional[int] = None submitted_at: Optional[str] = None @app.post("/weights/submit") @limiter.limit("60/minute") async def submit_weights( request: Request, _hotkey: str = Depends(require_validator_auth), ): """Validator submits the exact miner-weight vector it set on chain. Authenticated via hotkey signature; forwarded to backend with validator_hotkey set from the verified signature (not self-reported). """ try: raw = await request.json() body = _WeightSubmission(**raw) except Exception: raise HTTPException(status_code=422, detail="Invalid request body. Expected: {weights: {uid: float}}") payload = { "validator_hotkey": _hotkey, "weights": body.weights, "netuid": body.netuid, "block": body.block, "submitted_at": body.submitted_at, } try: headers = {"Content-Type": "application/json"} if OWNER_API_KEY: headers["X-API-Key"] = OWNER_API_KEY response = await http_client.post( f"{REAL_API_URL}/weights/submit", json=payload, headers=headers, timeout=15.0, ) if response.status_code not in (200, 201): logger.error(f"Backend weight submission error: {response.status_code}") raise HTTPException(status_code=response.status_code, detail=response.json().get("detail", "Backend error")) return response.json() except HTTPException: raise except httpx.RequestError: logger.error("Backend unreachable for /weights/submit") raise HTTPException(status_code=503, detail="Backend unavailable") # ==================== Catch-all: block everything else ==================== @app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"]) async def catch_all(path: str, request: Request): raise HTTPException(status_code=404, detail="Not found.")