gate / app.py
evolai's picture
Upload app.py
903cc81 verified
Raw
History Blame Contribute Delete
14.4 kB
"""
Proxy API - Public-facing rate-limited API for EvolAI subnet
Designed for deployment on Hugging Face Spaces
"""
from fastapi import Depends, FastAPI, Header, HTTPException, Request, Query
from fastapi.middleware.cors import CORSMiddleware
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, ValidationError
from contextlib import asynccontextmanager
import httpx
import os
import logging
import hashlib
import time
try:
from substrateinterface import Keypair as _Keypair
except ImportError as _substrate_err: # pragma: no cover
# Hard failure: substrate-interface is required for validator auth.
# Raise immediately so the container shows a clear build/startup error
# instead of silently accepting all requests and returning 401 forever.
raise RuntimeError(
"substrate-interface is required but not installed. "
"Check that the Rust toolchain is present in the Dockerfile so "
"py-sr25519-bindings compiles correctly. "
f"Original error: {_substrate_err}"
) from _substrate_err
# Logging: INFO and above, one "time - logger - level - message" line per record.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration (read once from the environment at import time).
# Base URL of the backend API. Trailing slashes are stripped because every
# endpoint below appends a path that already begins with "/"; without this a
# value like "https://host/" would yield "https://host//datasets".
REAL_API_URL = os.getenv("REAL_API_URL", "").rstrip("/")
if not REAL_API_URL:
    raise RuntimeError("REAL_API_URL environment variable is required")
# Per-minute limit for the public read endpoints. Validated here so a typo fails
# at startup with a clear message instead of inside slowapi's limit parser.
_rate_limit_raw = os.getenv("RATE_LIMIT_PER_MINUTE", "1000")
try:
    _rate_limit_value = int(_rate_limit_raw)
except ValueError:
    _rate_limit_value = 0
if _rate_limit_value <= 0:
    raise RuntimeError(
        f"RATE_LIMIT_PER_MINUTE must be a positive integer, got {_rate_limit_raw!r}"
    )
# Kept as a string: in this file it is only interpolated into "N/minute" limit strings.
RATE_LIMIT_PER_MINUTE = str(_rate_limit_value)
CACHE_TTL_SECONDS = int(os.getenv("CACHE_TTL_SECONDS", "10"))  # TTL for read-only caches
# Owner API key, forwarded to backend when calling validator-restricted endpoints.
# Set this to the same value as OWNER_API_KEY in the manager deployment.
OWNER_API_KEY = os.getenv("OWNER_API_KEY", "")
# ==================== Validator Authentication ====================
def _verify_bittensor_signature(hotkey: str, message: str, signature_hex: str) -> bool:
    """Check that *signature_hex* is an SR25519 signature of *message* by *hotkey*.

    Args:
        hotkey: SS58-encoded public address of the claimed signer.
        message: The exact text that was signed (encoded as UTF-8 before verifying).
        signature_hex: Hex-encoded signature, with or without a leading ``0x``.

    Returns:
        True only if decoding and verification both succeed. Any failure,
        such as malformed hex, an invalid address or a verifier error, yields False.
    """
    try:
        raw_signature = bytes.fromhex(signature_hex.removeprefix("0x"))
        signer = _Keypair(ss58_address=hotkey)
        is_valid = signer.verify(message.encode(), raw_signature)
        return bool(is_valid)
    except Exception:
        # Deliberately fail closed: authentication must never raise to the caller.
        return False
def _validate_validator_message(
    hotkey: str, message: str, ttl: int = 300, max_future_skew: int = 30
) -> bool:
    """Return True if *message* is well-formed, names *hotkey*, and is fresh.

    Expected format: ``evolai_validator:{hotkey}:{unix_timestamp}``

    Args:
        hotkey: The hotkey the caller claims (from ``X-Validator-Hotkey``).
        message: The signed message (from ``X-Validator-Message``).
        ttl: Maximum accepted age of the timestamp, in seconds.
        max_future_skew: How many seconds *ahead* of this server's clock the
            timestamp may be. Without this allowance, a validator whose clock
            runs even slightly fast is rejected on every request.

    Returns:
        False for any malformed, mismatched, stale, or too-far-future message.
    """
    parts = message.split(":")
    if len(parts) != 3:
        return False
    prefix, msg_hotkey, ts_str = parts
    if prefix != "evolai_validator" or msg_hotkey != hotkey:
        return False
    try:
        # int() rejects non-numeric text. The float subtraction overflows for
        # absurdly long digit strings, which are equally invalid.
        age = time.time() - int(ts_str)
    except (ValueError, OverflowError):
        return False
    return -max_future_skew <= age <= ttl
async def require_validator_auth(
    request: Request,
    hotkey: str = Header(..., alias="X-Validator-Hotkey"),
    message: str = Header(..., alias="X-Validator-Message"),
    signature: str = Header(..., alias="X-Validator-Signature"),
) -> str:
    """FastAPI dependency: authenticate the caller by Bittensor hotkey signature.

    Required headers:
      - ``X-Validator-Hotkey``: SS58 address of the signing hotkey.
      - ``X-Validator-Message``: ``evolai_validator:{hotkey}:{unix_timestamp}``.
      - ``X-Validator-Signature``: hex-encoded SR25519 signature of the message.

    Returns:
        The verified hotkey, so handlers can attribute the request to it.

    Raises:
        HTTPException: 401 when the message is malformed or expired, or when
            the signature does not verify. The message check runs first, so the
            signature is only verified for a well-formed, fresh message.

    NOTE(review): this proves the caller holds the key for *some* hotkey.
    Nothing here checks that the hotkey is a registered validator; presumably
    the backend does, but confirm. The signature covers only the timestamped
    message, not the request body, so captured headers remain reusable until
    the message expires.
    """
    message_ok = _validate_validator_message(hotkey, message)
    if not message_ok:
        raise HTTPException(status_code=401, detail="Invalid or expired validator message.")

    signature_ok = _verify_bittensor_signature(hotkey, message, signature)
    if not signature_ok:
        raise HTTPException(status_code=401, detail="Validator signature verification failed.")

    return hotkey
# ==================== Request models (proxy validates before forwarding) ====================
class _EvalResult(BaseModel):
    """Scores reported for one miner within an evaluation submission."""

    miner_uid: int
    miner_hotkey: str
    raw_score: float
    effective_score: float
    # Free-form per-dataset breakdown; submit_evaluations forwards it via model_dump().
    # NOTE: mutable `{}` default. This relies on pydantic copying field defaults
    # per instance (unlike a plain function default); revisit if this stops
    # being a BaseModel.
    dataset_distribution: Optional[Dict[str, Any]] = {}


class _EvalSubmission(BaseModel):
    """Request body accepted by POST /evaluations/submit.

    There is intentionally no ``validator_hotkey`` field: the endpoint fills it
    from the verified signature instead of trusting the body.
    """

    evaluation_round: int
    judge_model: str
    results: List[_EvalResult]
# Simple in-process TTL cache for read endpoints (a module-level dict, so each
# worker process has its own copy).
# Maps key -> (data, expires_at), where expires_at is a time.monotonic() deadline.
_cache: Dict[str, tuple[Any, float]] = {}
_CACHE_MAX_ENTRIES = 256  # hard cap to prevent unbounded memory growth
def _cache_get(key: str):
    """Look up *key* in the TTL cache.

    Returns the stored payload while its deadline (on the ``time.monotonic()``
    clock) is still in the future. Returns None for a missing or expired key.
    Expired entries are not removed here; ``_cache_set`` purges them.
    """
    hit = _cache.get(key)
    if not hit:
        return None
    deadline = hit[1]
    if time.monotonic() < deadline:
        return hit[0]
    return None
def _cache_set(key: str, data, ttl: int = CACHE_TTL_SECONDS):
    """Store *data* under *key* for *ttl* seconds.

    Expired entries are purged first. If the cache is still at its cap and
    *key* is new, the entry with the earliest deadline is evicted to make room.
    Overwriting an existing key does not grow the cache, so it never evicts an
    unrelated entry.
    """
    now = time.monotonic()
    for stale in [k for k, (_, exp) in _cache.items() if exp <= now]:
        del _cache[stale]
    if key not in _cache and _cache and len(_cache) >= _CACHE_MAX_ENTRIES:
        # With a uniform TTL, the earliest deadline is also the oldest insert.
        soonest = min(_cache, key=lambda k: _cache[k][1])
        del _cache[soonest]
    _cache[key] = (data, now + ttl)
def get_rate_limit_key(request: Request) -> str:
    """Return the slowapi rate-limit bucket for *request* as ``"{ip}:{digest}"``.

    The digest distinguishes clients sharing an IP. It is derived from the
    ``X-Client-Fingerprint`` header when present, otherwise from ``User-Agent``.
    The raw header is hashed to a fixed 16-hex-character value so an oversized
    header cannot create an oversized limiter key. The two sources are tagged
    differently so a fingerprint value can never share a bucket with an
    identical User-Agent.

    NOTE(review): both headers are client-controlled. A client that rotates
    them gets a fresh bucket per value, so this is not a hard per-IP limit.
    """
    ip = get_remote_address(request)
    fingerprint = request.headers.get("X-Client-Fingerprint", "")
    if fingerprint:
        material = f"fp:{fingerprint}"
    else:
        material = f"ua:{request.headers.get('User-Agent', '')}"
    digest = hashlib.sha256(material.encode()).hexdigest()[:16]
    return f"{ip}:{digest}"
# Rate limiter keyed by get_rate_limit_key (composite of client IP + fingerprint).
limiter = Limiter(key_func=get_rate_limit_key)
# Shared backend HTTP client. It is None at import time; `lifespan` creates it
# (with a capped connection pool to prevent connection exhaustion under load)
# and closes it on shutdown.
http_client: Optional[httpx.AsyncClient] = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Own the shared backend ``http_client`` for the application's lifetime.

    Creates the client on startup with a capped connection pool (to prevent
    connection exhaustion under load) and closes it on shutdown. The close
    runs in ``finally`` so the pool is released even when the lifespan is
    exited with an exception; with a bare statement after ``yield`` the close
    would be skipped.

    Args:
        app: The FastAPI application (required by the lifespan signature; unused).
    """
    global http_client
    http_client = httpx.AsyncClient(
        timeout=30.0,
        limits=httpx.Limits(
            max_connections=100,
            max_keepalive_connections=20,
            keepalive_expiry=30,
        ),
    )
    try:
        yield
    finally:
        await http_client.aclose()
# FastAPI app: Swagger UI, ReDoc and the OpenAPI schema are all disabled.
app = FastAPI(
    lifespan=lifespan,
    openapi_url=None,
    redoc_url=None,
    docs_url=None,
)

# Attach the slowapi limiter to the app state and route RateLimitExceeded
# to slowapi's stock handler.
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# CORS: any origin, but without credentials (required when origins is a wildcard).
# Only GET/POST are allowed, plus the fingerprint and validator-auth headers.
app.add_middleware(
    CORSMiddleware,
    allow_headers=[
        "Content-Type",
        "X-Client-Fingerprint",
        "X-Validator-Hotkey",
        "X-Validator-Message",
        "X-Validator-Signature",
    ],
    allow_methods=["GET", "POST"],
    allow_credentials=False,
    allow_origins=["*"],
)
# ==================== Dataset Endpoints (Public Read) ====================
@app.get("/datasets")
@limiter.limit(f"{RATE_LIMIT_PER_MINUTE}/minute")
async def list_datasets(
    request: Request,
    active_only: bool = True
):
    """Proxy ``GET /datasets`` to the backend, with a short-lived in-process cache.

    Args:
        request: The incoming request; slowapi's ``@limiter.limit`` requires it.
        active_only: Forwarded to the backend as the ``active_only`` query parameter.

    Returns:
        The backend's decoded JSON body, cached per ``active_only`` value for
        the default cache TTL (``CACHE_TTL_SECONDS``).

    Raises:
        HTTPException: the backend's status code for a non-200 reply, 502 when
            a 200 reply is not valid JSON, and 503 when the backend is unreachable.
    """
    cache_key = f"datasets:{active_only}"
    # Compare with None (not truthiness) so falsy payloads such as an empty
    # list are still served from cache instead of hitting the backend.
    cached = _cache_get(cache_key)
    if cached is not None:
        return cached

    headers = {"X-API-Key": OWNER_API_KEY} if OWNER_API_KEY else {}
    try:
        response = await http_client.get(
            f"{REAL_API_URL}/datasets",
            params={"active_only": active_only},
            headers=headers,
        )
    except httpx.RequestError:
        logger.error("Backend request failed")
        raise HTTPException(status_code=503, detail="Backend unavailable")

    if response.status_code != 200:
        logger.error("Backend error: %s", response.status_code)
        raise HTTPException(status_code=response.status_code, detail="Backend error")

    try:
        data = response.json()
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        logger.error("Backend returned invalid JSON for /datasets")
        raise HTTPException(status_code=502, detail="Backend error")

    _cache_set(cache_key, data)
    return data
# ==================== Miner Rankings (Public Read) ====================
@app.get("/evaluations/rankings")
@limiter.limit(f"{RATE_LIMIT_PER_MINUTE}/minute")
async def miner_rankings(
    request: Request,
    limit: int = Query(default=100, ge=1, le=500)
):
    """Proxy ``GET /evaluations/rankings`` to the backend, with a short-lived cache.

    Args:
        request: The incoming request; slowapi's ``@limiter.limit`` requires it.
        limit: Number of rankings to request (1-500, enforced by ``Query``).

    Returns:
        The backend's decoded JSON body, cached per ``limit`` value for the
        default cache TTL (``CACHE_TTL_SECONDS``).

    Raises:
        HTTPException: the backend's status code for a non-200 reply, 502 when
            a 200 reply is not valid JSON, and 503 when the backend is unreachable.
    """
    cache_key = f"rankings:{limit}"
    # Compare with None (not truthiness) so falsy payloads such as an empty
    # list are still served from cache instead of hitting the backend.
    cached = _cache_get(cache_key)
    if cached is not None:
        return cached

    headers = {"X-API-Key": OWNER_API_KEY} if OWNER_API_KEY else {}
    try:
        response = await http_client.get(
            f"{REAL_API_URL}/evaluations/rankings",
            params={"limit": limit},
            headers=headers,
        )
    except httpx.RequestError:
        logger.error("Backend request failed")
        raise HTTPException(status_code=503, detail="Backend unavailable")

    if response.status_code != 200:
        logger.error("Backend error: %s", response.status_code)
        raise HTTPException(status_code=response.status_code, detail="Backend error")

    try:
        data = response.json()
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        logger.error("Backend returned invalid JSON for /evaluations/rankings")
        raise HTTPException(status_code=502, detail="Backend error")

    _cache_set(cache_key, data)
    return data
# ==================== Challenge Endpoint (Validator-only) ====================
@app.get("/challenge/{uid}")
@limiter.limit("120/minute")
async def get_challenge(
    request: Request,
    uid: int,
):
    """Fetch the current challenge spec for a miner UID.

    Returns a mapping of dataset names to row indices that the miner is
    evaluated on — **indices only, no text is fetched or returned here**.
    Callers load the actual text themselves directly from HuggingFace using
    these indices.

    Rate-limited (120/minute per rate-limit key) to slow bulk scraping of all
    miners' test sets. Responses are not cached.

    NOTE(review): the section header above this endpoint labels it
    "Validator-only", but no ``require_validator_auth`` dependency is attached,
    so any client can call it. Confirm whether that is intended.

    Raises:
        HTTPException: 404 when the backend has no challenge for *uid*; the
            backend's status for other non-200 replies; 502 when a 200 reply is
            not valid JSON; 503 when the backend is unreachable.
    """
    headers = {"X-API-Key": OWNER_API_KEY} if OWNER_API_KEY else {}
    try:
        response = await http_client.get(
            f"{REAL_API_URL}/challenge/{uid}",
            headers=headers,
            timeout=15.0,
        )
    except httpx.RequestError:
        logger.error("Backend unreachable for /challenge/%s", uid)
        raise HTTPException(status_code=503, detail="Backend unavailable")

    if response.status_code == 404:
        raise HTTPException(status_code=404, detail=f"No challenge for UID {uid}.")
    if response.status_code != 200:
        logger.error("Backend challenge error for UID %s: %s", uid, response.status_code)
        raise HTTPException(status_code=response.status_code, detail="Backend error")

    try:
        return response.json()
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        logger.error("Backend returned invalid JSON for /challenge/%s", uid)
        raise HTTPException(status_code=502, detail="Backend error")
# ==================== Evaluation Submission (Validator-only) ====================
@app.post("/evaluations/submit")
@limiter.limit("60/minute")
async def submit_evaluations(
    request: Request,
    _hotkey: str = Depends(require_validator_auth),
):
    """Submit evaluation results for one or more miners.

    Authenticated via hotkey signature, then forwarded to the backend with
    the owner API key so the backend does not need to re-verify the signature.
    The forwarded payload is rebuilt from the validated model fields, and
    ``validator_hotkey`` always comes from the verified signature rather than
    the request body.

    Raises:
        HTTPException: 422 for a body that is not valid JSON or fails
            validation; the backend's status for non-200/201 replies; 502 when
            a success reply is not valid JSON; 503 when the backend is unreachable.
    """
    try:
        # model_validate (rather than **raw) rejects non-object JSON such as a
        # list with a ValidationError. JSON decode errors are ValueErrors.
        body = _EvalSubmission.model_validate(await request.json())
    except (ValidationError, ValueError):
        raise HTTPException(status_code=422, detail="Invalid request body")

    payload = {
        "validator_hotkey": _hotkey,  # from verified signature — not self-reported
        "evaluation_round": body.evaluation_round,
        "judge_model": body.judge_model,
        "results": [r.model_dump() for r in body.results],
    }
    headers = {"Content-Type": "application/json"}
    if OWNER_API_KEY:
        headers["X-API-Key"] = OWNER_API_KEY

    try:
        response = await http_client.post(
            f"{REAL_API_URL}/evaluations/submit",
            json=payload,
            headers=headers,
            timeout=30.0,
        )
    except httpx.RequestError:
        logger.error("Backend unreachable for /evaluations/submit")
        raise HTTPException(status_code=503, detail="Backend unavailable")

    if response.status_code not in (200, 201):
        logger.error("Backend submission error: %s", response.status_code)
        raise HTTPException(status_code=response.status_code, detail="Backend error")

    try:
        return response.json()
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        logger.error("Backend returned invalid JSON for /evaluations/submit")
        raise HTTPException(status_code=502, detail="Backend error")
# ==================== Validator Weight Submission ====================
class _WeightSubmission(BaseModel):
    """Request body accepted by POST /weights/submit.

    There is intentionally no ``validator_hotkey`` field: the endpoint fills it
    from the verified signature instead of trusting the body.
    """

    # Miner weight vector keyed by UID. Keys are strings because they arrive as
    # JSON object keys (the 422 message advertises ``{weights: {uid: float}}``).
    weights: Dict[str, float]
    # Optional metadata, forwarded to the backend unchanged.
    netuid: Optional[int] = None
    block: Optional[int] = None
    submitted_at: Optional[str] = None
@app.post("/weights/submit")
@limiter.limit("60/minute")
async def submit_weights(
    request: Request,
    _hotkey: str = Depends(require_validator_auth),
):
    """Validator submits the exact miner-weight vector it set on chain.

    Authenticated via hotkey signature. The request is forwarded to the backend
    with validator_hotkey set from the verified signature (not self-reported).

    Raises:
        HTTPException: 422 for a body that is not valid JSON or fails
            validation. For non-200/201 backend replies, the backend's status
            with its ``detail`` when the error body is a JSON object (otherwise
            "Backend error"). 502 when a success reply is not valid JSON; 503
            when the backend is unreachable.
    """
    try:
        # model_validate (rather than **raw) rejects non-object JSON such as a
        # list with a ValidationError. JSON decode errors are ValueErrors.
        body = _WeightSubmission.model_validate(await request.json())
    except (ValidationError, ValueError):
        raise HTTPException(status_code=422, detail="Invalid request body. Expected: {weights: {uid: float}}")

    payload = {
        "validator_hotkey": _hotkey,
        "weights": body.weights,
        "netuid": body.netuid,
        "block": body.block,
        "submitted_at": body.submitted_at,
    }
    headers = {"Content-Type": "application/json"}
    if OWNER_API_KEY:
        headers["X-API-Key"] = OWNER_API_KEY

    try:
        response = await http_client.post(
            f"{REAL_API_URL}/weights/submit",
            json=payload,
            headers=headers,
            timeout=15.0,
        )
    except httpx.RequestError:
        logger.error("Backend unreachable for /weights/submit")
        raise HTTPException(status_code=503, detail="Backend unavailable")

    if response.status_code not in (200, 201):
        logger.error("Backend weight submission error: %s", response.status_code)
        # Relay the backend's "detail" only when the error body is a JSON object.
        # A non-JSON body (e.g. an HTML error page from an intermediate proxy)
        # or a non-object JSON body falls back to the generic message instead
        # of raising an unhandled exception.
        try:
            error_body = response.json()
        except ValueError:
            error_body = None
        if isinstance(error_body, dict):
            detail = error_body.get("detail", "Backend error")
        else:
            detail = "Backend error"
        raise HTTPException(status_code=response.status_code, detail=detail)

    try:
        return response.json()
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        logger.error("Backend returned invalid JSON for /weights/submit")
        raise HTTPException(status_code=502, detail="Backend error")
# ==================== Catch-all: block everything else ====================
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def catch_all(path: str, request: Request):
    """Answer every otherwise-unrouted request with a uniform 404.

    Declared after all other routes, so it only receives requests that no
    earlier route matched. ``path`` and ``request`` are accepted only to
    satisfy the route signature and are otherwise unused.
    """
    not_found = HTTPException(status_code=404, detail="Not found.")
    raise not_found