| """
|
| Proxy API - Public-facing rate-limited API for EvolAI subnet
|
| Designed for deployment on Hugging Face Spaces
|
| """
|
|
|
| from fastapi import Depends, FastAPI, Header, HTTPException, Request, Query
|
| from fastapi.middleware.cors import CORSMiddleware
|
| from slowapi import Limiter, _rate_limit_exceeded_handler
|
| from slowapi.util import get_remote_address
|
| from slowapi.errors import RateLimitExceeded
|
| from typing import Any, Dict, List, Optional
|
| from pydantic import BaseModel, ValidationError
|
| from contextlib import asynccontextmanager
|
| import httpx
|
| import os
|
| import logging
|
| import hashlib
|
| import time
|
|
|
# The SR25519 keypair class is needed to verify validator signatures, so a
# missing dependency is turned into a startup failure with an actionable hint.
try:
    from substrateinterface import Keypair as _Keypair
except ImportError as import_failure:
    raise RuntimeError(
        " ".join(
            (
                "substrate-interface is required but not installed.",
                "Check that the Rust toolchain is present in the Dockerfile so",
                "py-sr25519-bindings compiles correctly.",
                f"Original error: {import_failure}",
            )
        )
    ) from import_failure
|
|
|
|
|
# Root logging configuration: INFO and above, with timestamp, logger name and
# level in front of every message.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger used by every route in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
# Base URL of the backend that this proxy forwards to. It is mandatory, so the
# module refuses to load without it. Surrounding whitespace and trailing
# slashes are removed because every route appends "/<path>" to this value; a
# trailing slash would otherwise produce "//<path>" request URLs.
REAL_API_URL = os.getenv("REAL_API_URL", "").strip().rstrip("/")
if not REAL_API_URL:
    raise RuntimeError("REAL_API_URL environment variable is required")

# Per-minute request budget for the cached read endpoints. Kept as a string
# because it is interpolated into "<n>/minute" limit expressions. A variable
# that is set but blank falls back to the default instead of yielding the
# malformed expression "/minute".
RATE_LIMIT_PER_MINUTE = os.getenv("RATE_LIMIT_PER_MINUTE", "").strip() or "1000"

# Lifetime, in seconds, of entries in the in-process response cache. A blank
# value falls back to the default rather than crashing in int("").
CACHE_TTL_SECONDS = int(os.getenv("CACHE_TTL_SECONDS", "").strip() or "10")

# API key attached to backend requests as "X-API-Key". When it is empty the
# routes send no such header.
OWNER_API_KEY = os.getenv("OWNER_API_KEY", "")
|
|
|
|
|
|
|
def _verify_bittensor_signature(hotkey: str, message: str, signature_hex: str) -> bool:
    """Check that *signature_hex* signs *message* under the key *hotkey*.

    *signature_hex* is hex text, optionally prefixed with ``0x``. *hotkey* is
    passed to the keypair class as an SS58 address.

    Any failure along the way (malformed hex, an address the keypair class
    rejects, an error raised by the verifier) is reported as ``False`` rather
    than raised, so the check fails closed.
    """
    try:
        hex_digits = signature_hex.removeprefix("0x")
        raw_signature = bytes.fromhex(hex_digits)
        verifier = _Keypair(ss58_address=hotkey)
        outcome = verifier.verify(message.encode(), raw_signature)
        return bool(outcome)
    except Exception:
        return False
|
|
|
|
|
def _validate_validator_message(hotkey: str, message: str, ttl: int = 300) -> bool:
    """Check the shape and freshness of a validator auth message.

    A valid message is exactly ``evolai_validator:{hotkey}:{unix_timestamp}``:
    three colon-separated fields, the literal tag, the same hotkey the caller
    claims, and an integer timestamp whose age is between 0 and *ttl* seconds
    inclusive. Timestamps in the future are therefore rejected.

    Returns ``False`` for anything else, including input that cannot be parsed.
    """
    try:
        fields = message.split(":")
        if len(fields) != 3:
            return False
        tag, claimed_hotkey, raw_timestamp = fields
        if (tag, claimed_hotkey) != ("evolai_validator", hotkey):
            return False
        elapsed = time.time() - int(raw_timestamp)
        return 0 <= elapsed <= ttl
    except Exception:
        return False
|
|
|
|
|
async def require_validator_auth(
    request: Request,
    hotkey: str = Header(..., alias="X-Validator-Hotkey"),
    message: str = Header(..., alias="X-Validator-Message"),
    signature: str = Header(..., alias="X-Validator-Signature"),
) -> str:
    """FastAPI dependency: authenticate a validator by hotkey signature.

    Required request headers:

    - ``X-Validator-Hotkey``: SS58-encoded hotkey address
    - ``X-Validator-Message``: ``evolai_validator:{hotkey}:{unix_timestamp}``
    - ``X-Validator-Signature``: hex-encoded SR25519 signature of the message

    Returns the hotkey once both checks pass.

    Raises:
        HTTPException: 401 when the message is malformed or expired, or when
            the signature does not verify.
    """
    # The cheap format/freshness check runs first; the signature is only
    # verified for messages that already passed it.
    message_ok = _validate_validator_message(hotkey, message)
    if not message_ok:
        raise HTTPException(status_code=401, detail="Invalid or expired validator message.")

    signature_ok = _verify_bittensor_signature(hotkey, message, signature)
    if not signature_ok:
        raise HTTPException(status_code=401, detail="Validator signature verification failed.")

    return hotkey
|
|
|
|
|
|
|
|
|
class _EvalResult(BaseModel):
    """One miner's entry inside an evaluation submission."""

    # Which miner the entry is about.
    miner_uid: int
    miner_hotkey: str

    # Both scores are required.
    raw_score: float
    effective_score: float

    # Optional free-form mapping; defaults to an empty dict when omitted.
    dataset_distribution: Optional[Dict[str, Any]] = {}
|
|
|
|
|
class _EvalSubmission(BaseModel):
    """Request body accepted by ``POST /evaluations/submit``."""

    # Round number and judge model named by the submitter.
    evaluation_round: int
    judge_model: str

    # One entry per evaluated miner.
    results: List[_EvalResult]
|
|
|
|
|
|
|
# In-process response cache: key -> (payload, expiry on the monotonic clock).
_cache: Dict[str, tuple] = {}

# Maximum number of entries the cache is allowed to hold.
_CACHE_MAX_ENTRIES = 256


def _cache_get(key: str):
    """Look up *key* in the response cache.

    Returns the stored payload while its expiry time is still in the future,
    and ``None`` when the key is unknown or the entry has expired. Expired
    entries are left in place here.
    """
    hit = _cache.get(key)
    if not hit:
        return None
    if time.monotonic() < hit[1]:
        return hit[0]
    return None
|
|
|
def _cache_set(key: str, data, ttl: int = CACHE_TTL_SECONDS):
    """Store *data* under *key* for *ttl* seconds.

    Before inserting, every expired entry is purged. If the cache is still at
    its size cap and *key* is not already present, the entry closest to
    expiry is evicted to make room. Overwriting an existing key never evicts
    another entry, because it does not grow the cache.
    """
    now = time.monotonic()

    # Collect first, delete second: the dict must not change size while it is
    # being iterated.
    stale_keys = [k for k, (_, deadline) in _cache.items() if deadline <= now]
    for stale_key in stale_keys:
        del _cache[stale_key]

    if key not in _cache and len(_cache) >= _CACHE_MAX_ENTRIES:
        soonest_to_expire = min(_cache, key=lambda k: _cache[k][1])
        del _cache[soonest_to_expire]

    _cache[key] = (data, now + ttl)
|
|
|
|
|
def get_rate_limit_key(request: Request) -> str:
    """Build the rate-limit bucket key for *request*.

    The key is ``"{ip}:{fingerprint}"``. The fingerprint is the first eight
    hex characters of an MD5 digest of the ``X-Client-Fingerprint`` header
    when the client sent a non-empty one, otherwise of the ``User-Agent``
    header (or of the empty string when that is missing too).

    The client-supplied fingerprint is hashed instead of being embedded
    verbatim, so the key has a fixed, short length and never carries arbitrary
    header text into the limiter's storage.
    """
    ip = get_remote_address(request)
    client_hint = request.headers.get("X-Client-Fingerprint", "")
    if not client_hint:
        client_hint = request.headers.get("User-Agent", "")
    # MD5 is used purely to bucket clients, not for any security property.
    fingerprint = hashlib.md5(client_hint.encode()).hexdigest()[:8]
    # NOTE(review): both headers are chosen by the client, so changing either
    # one yields a new bucket for the same IP. Confirm this is acceptable for
    # the limits enforced with this key.
    return f"{ip}:{fingerprint}"


# Limiter shared by every rate-limited route in this module.
limiter = Limiter(key_func=get_rate_limit_key)
|
|
|
|
|
# Shared HTTP client for all backend calls. It is None until ``lifespan``
# creates it at application startup.
http_client: Optional[httpx.AsyncClient] = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the shared backend HTTP client for the lifetime of the app.

    The client is built before the application starts serving and is closed
    when the context exits. The close sits in a ``finally`` clause so it also
    runs when an exception is raised into the context.
    """
    global http_client
    client = httpx.AsyncClient(
        timeout=30.0,
        limits=httpx.Limits(
            max_connections=100,
            max_keepalive_connections=20,
            keepalive_expiry=30,
        ),
    )
    http_client = client
    try:
        yield
    finally:
        await client.aclose()
|
|
|
|
|
# Application instance. The docs, ReDoc and OpenAPI schema URLs are all set to
# None, and startup/shutdown is delegated to ``lifespan``.
app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None, lifespan=lifespan)

# Attach the limiter to the application state and register slowapi's handler
# for RateLimitExceeded.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# CORS policy: any origin, no credentials, GET and POST only, and only the
# request headers that the routes in this module read.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["GET", "POST"],
    allow_headers=[
        "Content-Type",
        "X-Client-Fingerprint",
        "X-Validator-Hotkey",
        "X-Validator-Message",
        "X-Validator-Signature",
    ],
)
|
|
|
|
|
|
|
|
|
@app.get("/datasets")
@limiter.limit(f"{RATE_LIMIT_PER_MINUTE}/minute")
async def list_datasets(
    request: Request,
    active_only: bool = True,
):
    """Return the backend's ``/datasets`` response, using a short-lived cache.

    Args:
        request: Incoming request; accepted for the rate limiter.
        active_only: Forwarded to the backend as the ``active_only`` query
            parameter and included in the cache key.

    Raises:
        HTTPException: 503 when the backend cannot be reached, 502 when it
            answers 200 with a body that is not JSON, or the backend's own
            status code for any other non-200 answer.
    """
    cache_key = f"datasets:{active_only}"
    # Compare against None, not truthiness: an empty list or dict is a valid
    # cached answer and must not trigger a fresh backend call.
    cached = _cache_get(cache_key)
    if cached is not None:
        return cached

    headers = {"X-API-Key": OWNER_API_KEY} if OWNER_API_KEY else {}
    try:
        response = await http_client.get(
            f"{REAL_API_URL}/datasets",
            params={"active_only": active_only},
            headers=headers,
        )
    except httpx.RequestError:
        logger.error("Backend request failed")
        raise HTTPException(status_code=503, detail="Backend unavailable")

    if response.status_code != 200:
        logger.error("Backend error: %s", response.status_code)
        raise HTTPException(status_code=response.status_code, detail="Backend error")

    try:
        data = response.json()
    except ValueError:
        # A 200 with an undecodable body is a backend fault, not a proxy crash.
        logger.error("Backend returned a non-JSON body for /datasets")
        raise HTTPException(status_code=502, detail="Backend error")

    _cache_set(cache_key, data)
    return data
|
|
|
|
|
|
|
|
|
@app.get("/evaluations/rankings")
@limiter.limit(f"{RATE_LIMIT_PER_MINUTE}/minute")
async def miner_rankings(
    request: Request,
    limit: int = Query(default=100, ge=1, le=500),
):
    """Return the backend's ``/evaluations/rankings`` response, with caching.

    Args:
        request: Incoming request; accepted for the rate limiter.
        limit: Number of rankings to request (1-500). Forwarded to the backend
            as the ``limit`` query parameter and included in the cache key.

    Raises:
        HTTPException: 503 when the backend cannot be reached, 502 when it
            answers 200 with a body that is not JSON, or the backend's own
            status code for any other non-200 answer.
    """
    cache_key = f"rankings:{limit}"
    # Compare against None, not truthiness: an empty ranking list is a valid
    # cached answer and must not trigger a fresh backend call.
    cached = _cache_get(cache_key)
    if cached is not None:
        return cached

    headers = {"X-API-Key": OWNER_API_KEY} if OWNER_API_KEY else {}
    try:
        response = await http_client.get(
            f"{REAL_API_URL}/evaluations/rankings",
            params={"limit": limit},
            headers=headers,
        )
    except httpx.RequestError:
        logger.error("Backend request failed")
        raise HTTPException(status_code=503, detail="Backend unavailable")

    if response.status_code != 200:
        logger.error("Backend error: %s", response.status_code)
        raise HTTPException(status_code=response.status_code, detail="Backend error")

    try:
        data = response.json()
    except ValueError:
        # A 200 with an undecodable body is a backend fault, not a proxy crash.
        logger.error("Backend returned a non-JSON body for /evaluations/rankings")
        raise HTTPException(status_code=502, detail="Backend error")

    _cache_set(cache_key, data)
    return data
|
|
|
|
|
|
|
|
|
@app.get("/challenge/{uid}")
@limiter.limit("120/minute")
async def get_challenge(
    request: Request,
    uid: int,
):
    """Fetch the current challenge spec for a miner UID.

    Returns a mapping of dataset names to row indices that the miner is
    evaluated on — **indices only, no text is fetched or returned here**.
    Callers load the actual text themselves directly from HuggingFace using
    these indices.

    Rate-limited to prevent bulk scraping of all miners' test sets. The
    response is not cached.

    Raises:
        HTTPException: 404 when the backend has no challenge for *uid*, 503
            when the backend cannot be reached, 502 when it answers 200 with
            a body that is not JSON, or the backend's own status code for any
            other non-200 answer.
    """
    headers = {"X-API-Key": OWNER_API_KEY} if OWNER_API_KEY else {}
    try:
        response = await http_client.get(
            f"{REAL_API_URL}/challenge/{uid}",
            headers=headers,
            timeout=15.0,
        )
    except httpx.RequestError:
        logger.error("Backend unreachable for /challenge/%s", uid)
        raise HTTPException(status_code=503, detail="Backend unavailable")

    if response.status_code == 404:
        raise HTTPException(status_code=404, detail=f"No challenge for UID {uid}.")
    if response.status_code != 200:
        logger.error("Backend challenge error for UID %s: %s", uid, response.status_code)
        raise HTTPException(status_code=response.status_code, detail="Backend error")

    try:
        return response.json()
    except ValueError:
        # A 200 with an undecodable body is a backend fault, not a proxy crash.
        logger.error("Backend returned a non-JSON body for /challenge/%s", uid)
        raise HTTPException(status_code=502, detail="Backend error")
|
|
|
|
|
|
|
|
|
@app.post("/evaluations/submit")
@limiter.limit("60/minute")
async def submit_evaluations(
    request: Request,
    _hotkey: str = Depends(require_validator_auth),
):
    """Submit evaluation results for one or more miners.

    Authenticated via hotkey signature, then forwarded to the backend with
    the owner API key so the backend does not need to re-verify the signature.
    ``validator_hotkey`` in the forwarded payload is the authenticated hotkey,
    not a value taken from the request body.

    Raises:
        HTTPException: 422 when the body is not valid JSON or does not match
            the submission schema, 503 when the backend cannot be reached,
            502 when it reports success with a body that is not JSON, or the
            backend's own status code when that is neither 200 nor 201.
    """
    try:
        raw = await request.json()
        body = _EvalSubmission(**raw)
    except Exception:
        # Deliberately broad: undecodable JSON, a non-object payload and
        # schema violations are all reported as the same client error.
        raise HTTPException(status_code=422, detail="Invalid request body")

    payload = {
        "validator_hotkey": _hotkey,
        "evaluation_round": body.evaluation_round,
        "judge_model": body.judge_model,
        "results": [r.model_dump() for r in body.results],
    }
    headers = {"Content-Type": "application/json"}
    if OWNER_API_KEY:
        headers["X-API-Key"] = OWNER_API_KEY

    try:
        response = await http_client.post(
            f"{REAL_API_URL}/evaluations/submit",
            json=payload,
            headers=headers,
            timeout=30.0,
        )
    except httpx.RequestError:
        logger.error("Backend unreachable for /evaluations/submit")
        raise HTTPException(status_code=503, detail="Backend unavailable")

    if response.status_code not in (200, 201):
        logger.error("Backend submission error: %s", response.status_code)
        raise HTTPException(status_code=response.status_code, detail="Backend error")

    try:
        return response.json()
    except ValueError:
        # A success status with an undecodable body is a backend fault.
        logger.error("Backend returned a non-JSON body for /evaluations/submit")
        raise HTTPException(status_code=502, detail="Backend error")
|
|
|
|
|
|
|
|
|
class _WeightSubmission(BaseModel):
    """Request body accepted by ``POST /weights/submit``."""

    # Required mapping of string keys to float weights.
    weights: Dict[str, float]

    # Optional metadata; each field is forwarded as None when omitted.
    netuid: Optional[int] = None
    block: Optional[int] = None
    submitted_at: Optional[str] = None
|
|
|
|
|
@app.post("/weights/submit")
@limiter.limit("60/minute")
async def submit_weights(
    request: Request,
    _hotkey: str = Depends(require_validator_auth),
):
    """Validator submits the exact miner-weight vector it set on chain.

    Authenticated via hotkey signature; forwarded to backend with
    validator_hotkey set from the verified signature (not self-reported).

    Raises:
        HTTPException: 422 when the body is not valid JSON or does not match
            the weight schema, 503 when the backend cannot be reached, 502
            when it reports success with a body that is not JSON, or the
            backend's own status code when that is neither 200 nor 201. In
            that last case the backend's ``detail`` field is passed through
            when its error body is a JSON object that has one.
    """
    try:
        raw = await request.json()
        body = _WeightSubmission(**raw)
    except Exception:
        # Deliberately broad: undecodable JSON, a non-object payload and
        # schema violations are all reported as the same client error.
        raise HTTPException(status_code=422, detail="Invalid request body. Expected: {weights: {uid: float}}")

    payload = {
        "validator_hotkey": _hotkey,
        "weights": body.weights,
        "netuid": body.netuid,
        "block": body.block,
        "submitted_at": body.submitted_at,
    }
    headers = {"Content-Type": "application/json"}
    if OWNER_API_KEY:
        headers["X-API-Key"] = OWNER_API_KEY

    try:
        response = await http_client.post(
            f"{REAL_API_URL}/weights/submit",
            json=payload,
            headers=headers,
            timeout=15.0,
        )
    except httpx.RequestError:
        logger.error("Backend unreachable for /weights/submit")
        raise HTTPException(status_code=503, detail="Backend unavailable")

    if response.status_code not in (200, 201):
        logger.error("Backend weight submission error: %s", response.status_code)
        # The error body is not guaranteed to be a JSON object, so extract the
        # detail defensively instead of letting the error path itself crash.
        detail = "Backend error"
        try:
            error_body = response.json()
        except ValueError:
            error_body = None
        if isinstance(error_body, dict):
            detail = error_body.get("detail", detail)
        raise HTTPException(status_code=response.status_code, detail=detail)

    try:
        return response.json()
    except ValueError:
        # A success status with an undecodable body is a backend fault.
        logger.error("Backend returned a non-JSON body for /weights/submit")
        raise HTTPException(status_code=502, detail="Backend error")
|
|
|
|
|
|
|
|
|
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def catch_all(path: str, request: Request):
    """Wildcard route: answer any path that reaches it with a 404.

    Raises:
        HTTPException: always, with status 404 and detail ``"Not found."``.
    """
    not_found = HTTPException(status_code=404, detail="Not found.")
    raise not_found
|
|
|