|
|
import os, time, socket, asyncio, logging, signal, gc, threading |
|
|
from typing import Tuple |
|
|
|
|
|
from aiohttp import web |
|
|
import bittensor as bt |
|
|
|
|
|
from scorevision.utils.settings import get_settings |
|
|
|
|
|
# Logger shared by every function in this module.
logger = logging.getLogger("sv-signer")

# Subnet id; overridable through the SCOREVISION_NETUID environment variable.
NETUID = int(os.environ.get("SCOREVISION_NETUID", "44"))
# Mechanism id used when neither the request nor the settings provide one.
MECHID = 1

# Set by the shutdown signal handler; run_signer() waits on it.
shutdown_event = asyncio.Event()

# Lazily created subtensor clients, each cached behind its own lock.
_ASYNC_SUBTENSOR: bt.AsyncSubtensor | None = None
_ASYNC_SUBTENSOR_LOCK = asyncio.Lock()
_SYNC_SUBTENSOR: bt.Subtensor | None = None
_SYNC_SUBTENSOR_LOCK = threading.Lock()
|
|
|
|
|
|
|
|
async def get_subtensor():
    """Return the shared async subtensor client, creating it on first use.

    The primary endpoint from the settings is tried first, then the fallback
    endpoint. The first client whose ``initialize()`` succeeds is cached in
    ``_ASYNC_SUBTENSOR`` and returned by later calls until the cache is
    cleared. A client that fails to initialise is closed (best-effort) before
    the next endpoint is tried, so a failed attempt does not leave anything
    open behind it.

    Returns:
        The cached or newly initialised async subtensor client.

    Raises:
        RuntimeError: If no endpoint could be initialised.
    """
    global _ASYNC_SUBTENSOR
    async with _ASYNC_SUBTENSOR_LOCK:
        if _ASYNC_SUBTENSOR is not None:
            return _ASYNC_SUBTENSOR

        settings = get_settings()
        ep = settings.BITTENSOR_SUBTENSOR_ENDPOINT
        fb = settings.BITTENSOR_SUBTENSOR_FALLBACK
        for endpoint in (ep, fb):
            st = None
            try:
                st = bt.async_subtensor(endpoint)
                await st.initialize()
            except Exception as e:
                logger.warning("Subtensor init failed for %s: %s", endpoint, e)
                # The constructor may have succeeded even though initialize()
                # failed; release that half-built client before moving on.
                if st is not None:
                    try:
                        await st.close()
                    except Exception as close_err:
                        logger.debug(
                            "Ignoring error while closing failed subtensor for %s: %s",
                            endpoint,
                            close_err,
                        )
                continue

            _ASYNC_SUBTENSOR = st
            if endpoint != ep:
                logger.warning("Subtensor init fell back to %s", endpoint)
            break

        if _ASYNC_SUBTENSOR is None:
            raise RuntimeError("Unable to initialize async subtensor")
        return _ASYNC_SUBTENSOR
|
|
|
|
|
|
|
|
def _get_sync_subtensor() -> bt.Subtensor:
    """Return the shared sync subtensor client, building it on first use.

    Tries the primary endpoint from the settings, then the fallback one, and
    caches the first client that can be constructed.

    Raises:
        RuntimeError: If a client could not be constructed for any endpoint.
    """
    global _SYNC_SUBTENSOR
    with _SYNC_SUBTENSOR_LOCK:
        if _SYNC_SUBTENSOR is None:
            cfg = get_settings()
            primary = cfg.BITTENSOR_SUBTENSOR_ENDPOINT
            fallback = cfg.BITTENSOR_SUBTENSOR_FALLBACK
            for candidate in (primary, fallback):
                try:
                    _SYNC_SUBTENSOR = bt.subtensor(candidate)
                    if candidate != primary:
                        logger.warning(
                            "Sync subtensor init fell back to %s", candidate
                        )
                    break
                except Exception as exc:
                    logger.warning(
                        "Sync subtensor init failed for %s: %s", candidate, exc
                    )
            if _SYNC_SUBTENSOR is None:
                raise RuntimeError("Unable to initialize sync subtensor")
        return _SYNC_SUBTENSOR
|
|
|
|
|
|
|
|
async def _reset_async_subtensor():
    """Close and forget the cached async subtensor client, if there is one.

    The cached reference is cleared *before* ``close()`` is awaited, so the
    cache is empty afterwards even if closing fails or is cancelled. Closing
    itself is best-effort: an error is logged at debug level and not raised.
    """
    global _ASYNC_SUBTENSOR
    async with _ASYNC_SUBTENSOR_LOCK:
        st, _ASYNC_SUBTENSOR = _ASYNC_SUBTENSOR, None
        if st is None:
            return
        try:
            await st.close()
        except Exception as e:
            logger.debug("Ignoring error while closing async subtensor: %s", e)
|
|
|
|
|
|
|
|
def _reset_sync_subtensor():
    """Close and forget the cached sync subtensor client, if there is one.

    The cached reference is cleared before ``close()`` is called, so the
    cache is empty afterwards regardless of how closing goes. Closing is
    best-effort: an error is logged at debug level and not raised.
    """
    global _SYNC_SUBTENSOR
    with _SYNC_SUBTENSOR_LOCK:
        st, _SYNC_SUBTENSOR = _SYNC_SUBTENSOR, None
        if st is None:
            return
        try:
            st.close()
        except Exception as e:
            logger.debug("Ignoring error while closing sync subtensor: %s", e)
|
|
|
|
|
|
|
|
def _set_weights(
    *,
    wallet: "bt.wallet",
    netuid: int,
    mechid: int,
    uids: list[int],
    weights: list[float],
    wait_for_inclusion: bool,
    wait_for_finalization: bool,
    log_prefix: str = "[signer]",
) -> bool:
    """Submit weights through the shared sync subtensor client.

    A result whose message contains ``SettingWeightsTooFast`` (whether it is
    raised as an exception or returned alongside a falsy success flag) is
    reported as success.

    Args:
        wallet: Wallet passed through to ``set_weights``.
        netuid: Subnet id passed through to ``set_weights``.
        mechid: Mechanism id passed through to ``set_weights``.
        uids: Target uids, parallel to ``weights``.
        weights: Weight per uid.
        wait_for_inclusion: Passed through to ``set_weights``.
        wait_for_finalization: Passed through to ``set_weights``.
        log_prefix: Prefix put in front of every log line from this call.

    Returns:
        True on success or ``SettingWeightsTooFast``; False on any other
        failure or exception from ``set_weights``.

    Raises:
        Exception: Whatever the second ``_get_sync_subtensor()`` attempt
            raises when no client can be obtained.
    """
    try:
        st = _get_sync_subtensor()
    except Exception:
        # One retry from a clean cache before giving up.
        _reset_sync_subtensor()
        st = _get_sync_subtensor()

    try:
        success, message = st.set_weights(
            wallet=wallet,
            netuid=netuid,
            mechid=mechid,
            uids=uids,
            weights=weights,
            wait_for_inclusion=wait_for_inclusion,
            wait_for_finalization=wait_for_finalization,
        )
    except Exception as e:
        msg_str = f"{type(e).__name__}: {e}"
        if "SettingWeightsTooFast" in msg_str:
            logger.error(
                "%s SettingWeightsTooFast (exception) → treating as success.",
                log_prefix,
            )
            return True
        logger.warning("%s set_weights exception: %s", log_prefix, msg_str)
        # The cached client may be the reason the call blew up. Drop it so
        # the next call builds a fresh one instead of reusing it forever.
        _reset_sync_subtensor()
        return False

    msg_str = str(message or "")

    if success:
        logger.info("%s set_weights success.", log_prefix)
        return True

    if "SettingWeightsTooFast" in msg_str:
        logger.error(
            "%s SettingWeightsTooFast (return) → treating as success.",
            log_prefix,
        )
        return True

    logger.warning(
        "%s set_weights failed: %s", log_prefix, msg_str or "unknown error"
    )
    return False
|
|
|
|
|
|
|
|
async def run_signer() -> None:
    """Serve the signer HTTP API until a shutdown signal is received.

    Endpoints:
        GET  /healthz      returns ``{"ok": true}``.
        POST /sign         signs each string under ``payloads`` (or ``data``)
                           with the wallet hotkey and returns hex signatures.
        POST /set_weights  normalises ``uids``/``weights`` and submits them
                           through ``_set_weights``.

    Host, port and wallet names are read from ``get_settings()``. On SIGTERM
    or SIGINT the HTTP runner is cleaned up and both cached subtensor clients
    are reset before the coroutine returns.
    """
    settings = get_settings()
    host = settings.SIGNER_HOST
    port = settings.SIGNER_PORT
    loop = asyncio.get_running_loop()

    def signal_handler():
        logger.info("Received shutdown signal, stopping signer...")
        shutdown_event.set()

    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            # Handlers registered on the loop run as normal loop callbacks,
            # so setting the asyncio.Event reliably wakes the waiter below.
            loop.add_signal_handler(sig, signal_handler)
        except NotImplementedError:
            # Loops without signal support: use signal.signal, but hand the
            # callback to the loop thread-safely so the loop is woken up.
            signal.signal(
                sig, lambda s, f: loop.call_soon_threadsafe(signal_handler)
            )

    cold = settings.BITTENSOR_WALLET_COLD
    hot = settings.BITTENSOR_WALLET_HOT
    wallet = bt.wallet(name=cold, hotkey=hot)

    # _set_weights runs in a worker thread (see set_weights_handler); this
    # lock keeps submissions strictly one-at-a-time, as they were when the
    # call blocked the event loop.
    set_weights_lock = asyncio.Lock()

    def _is_zero(value, types) -> bool:
        """Return True for a genuine numeric zero of ``types`` (never a bool)."""
        return (
            isinstance(value, types)
            and not isinstance(value, bool)
            and value == 0
        )

    @web.middleware
    async def access_log(request: web.Request, handler):
        t0 = time.monotonic()
        status = "?"
        try:
            resp = await handler(request)
            status = getattr(resp, "status", "?")
            return resp
        except web.HTTPException as exc:
            # e.g. 404/405 raised by the router still carry a status code.
            status = exc.status
            raise
        finally:
            dt = (time.monotonic() - t0) * 1000
            logger.info(
                "[signer] %s %s -> %s %.1fms",
                request.method,
                request.path,
                status,
                dt,
            )

    async def health(_req: web.Request):
        return web.json_response({"ok": True})

    async def sign_handler(req: web.Request):
        try:
            payload = await req.json()
            data = payload.get("payloads") or payload.get("data") or []
            if isinstance(data, str):
                data = [data]
            sigs = [(wallet.hotkey.sign(data=d.encode("utf-8"))).hex() for d in data]
            return web.json_response(
                {
                    "success": True,
                    "signatures": sigs,
                    "hotkey": wallet.hotkey.ss58_address,
                }
            )
        except Exception as e:
            logger.error("[sign] error: %s", e)
            return web.json_response({"success": False, "error": str(e)}, status=500)

    async def set_weights_handler(req: web.Request):
        try:
            payload = await req.json()
            netuid = int(payload.get("netuid", NETUID))
            default_mechid = getattr(settings, "SCOREVISION_MECHID", MECHID)
            mechid = int(payload.get("mechid", default_mechid))
            raw_uids = payload.get("uids")
            raw_wgts = payload.get("weights")
            # A bare ``x or []`` would also throw away a scalar 0, which is a
            # legitimate uid / weight; every other falsy value still maps to [].
            uids = raw_uids if _is_zero(raw_uids, int) else (raw_uids or [])
            wgts = (
                raw_wgts
                if _is_zero(raw_wgts, (int, float))
                else (raw_wgts or [])
            )
            wfi = bool(payload.get("wait_for_inclusion", False))
            wff = bool(payload.get("wait_for_finalization", False))

            if isinstance(uids, int):
                uids = [uids]
            if isinstance(wgts, (int, float, str)):
                try:
                    wgts = [float(wgts)]
                except Exception:
                    wgts = [0.0]
            if not isinstance(uids, list):
                uids = list(uids)
            if not isinstance(wgts, list):
                wgts = list(wgts)
            try:
                uids = [int(u) for u in uids]
            except Exception:
                uids = []
            try:
                wgts = [float(w) for w in wgts]
            except Exception:
                wgts = []

            if len(uids) != len(wgts) or not uids:
                return web.json_response(
                    {"success": False, "error": "uids/weights mismatch or empty"},
                    status=400,
                )

            # _set_weights is synchronous; running it in a worker thread keeps
            # the event loop free to answer /healthz and /sign meanwhile.
            async with set_weights_lock:
                ok = await asyncio.to_thread(
                    _set_weights,
                    wallet=wallet,
                    netuid=netuid,
                    mechid=mechid,
                    uids=uids,
                    weights=wgts,
                    wait_for_inclusion=wfi,
                    wait_for_finalization=wff,
                    log_prefix="[signer]",
                )
            return web.json_response(
                (
                    {"success": True}
                    if ok
                    else {"success": False, "error": "set_weights failed"}
                ),
                status=200 if ok else 500,
            )
        except Exception as e:
            logger.error("[set_weights] error: %s", e)
            return web.json_response({"success": False, "error": str(e)}, status=500)
        finally:
            gc.collect()

    app = web.Application(middlewares=[access_log])
    app.add_routes(
        [
            web.get("/healthz", health),
            web.post("/sign", sign_handler),
            web.post("/set_weights", set_weights_handler),
        ]
    )
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, host=host, port=port)
    await site.start()

    try:
        hn = socket.gethostname()
        ip = socket.gethostbyname(hn)
    except Exception:
        hn, ip = ("?", "?")
    logger.info(
        "Signer listening on http://%s:%s hostname=%s ip=%s", host, port, hn, ip
    )

    try:
        await shutdown_event.wait()
    except asyncio.CancelledError:
        pass
    finally:
        logger.info("Shutting down signer...")
        try:
            await runner.cleanup()
        finally:
            # Reset the cached clients even if the HTTP cleanup failed.
            await _reset_async_subtensor()
            _reset_sync_subtensor()
            gc.collect()
|
|
|