ultravision-01 / cli /signer_api.py
mohantesting's picture
Add files using upload-large-folder tool
f60a6c1 verified
import os, time, socket, asyncio, logging, signal, gc, threading
from typing import Tuple
from aiohttp import web
import bittensor as bt
from scorevision.utils.settings import get_settings
logger = logging.getLogger("sv-signer")
NETUID = int(os.getenv("SCOREVISION_NETUID", "44"))
MECHID = 1
# Global shutdown event
shutdown_event = asyncio.Event()
_ASYNC_SUBTENSOR: bt.AsyncSubtensor | None = None
_ASYNC_SUBTENSOR_LOCK = asyncio.Lock()
_SYNC_SUBTENSOR: bt.Subtensor | None = None
_SYNC_SUBTENSOR_LOCK = threading.Lock()
async def get_subtensor():
global _ASYNC_SUBTENSOR
async with _ASYNC_SUBTENSOR_LOCK:
if _ASYNC_SUBTENSOR is not None:
return _ASYNC_SUBTENSOR
settings = get_settings()
ep = settings.BITTENSOR_SUBTENSOR_ENDPOINT
fb = settings.BITTENSOR_SUBTENSOR_FALLBACK
for endpoint in (ep, fb):
try:
st = bt.async_subtensor(endpoint)
await st.initialize()
_ASYNC_SUBTENSOR = st
if endpoint != ep:
logger.warning("Subtensor init fell back to %s", endpoint)
break
except Exception as e:
logger.warning("Subtensor init failed for %s: %s", endpoint, e)
continue
if _ASYNC_SUBTENSOR is None:
raise RuntimeError("Unable to initialize async subtensor")
return _ASYNC_SUBTENSOR
def _get_sync_subtensor() -> bt.Subtensor:
global _SYNC_SUBTENSOR
with _SYNC_SUBTENSOR_LOCK:
if _SYNC_SUBTENSOR is not None:
return _SYNC_SUBTENSOR
settings = get_settings()
ep = settings.BITTENSOR_SUBTENSOR_ENDPOINT
fb = settings.BITTENSOR_SUBTENSOR_FALLBACK
for endpoint in (ep, fb):
try:
st = bt.subtensor(endpoint)
_SYNC_SUBTENSOR = st
if endpoint != ep:
logger.warning("Sync subtensor init fell back to %s", endpoint)
break
except Exception as e:
logger.warning("Sync subtensor init failed for %s: %s", endpoint, e)
continue
if _SYNC_SUBTENSOR is None:
raise RuntimeError("Unable to initialize sync subtensor")
return _SYNC_SUBTENSOR
async def _reset_async_subtensor():
global _ASYNC_SUBTENSOR
async with _ASYNC_SUBTENSOR_LOCK:
if _ASYNC_SUBTENSOR is not None:
try:
await _ASYNC_SUBTENSOR.close()
except Exception:
pass
_ASYNC_SUBTENSOR = None
def _reset_sync_subtensor():
global _SYNC_SUBTENSOR
with _SYNC_SUBTENSOR_LOCK:
if _SYNC_SUBTENSOR is not None:
try:
_SYNC_SUBTENSOR.close()
except Exception:
pass
_SYNC_SUBTENSOR = None
def _set_weights(
*,
wallet: "bt.wallet",
netuid: int,
mechid: int,
uids: list[int],
weights: list[float],
wait_for_inclusion: bool,
wait_for_finalization: bool,
log_prefix: str = "[signer]",
) -> bool:
""" """
try:
st = _get_sync_subtensor()
except Exception:
_reset_sync_subtensor()
st = _get_sync_subtensor()
try:
success, message = st.set_weights(
wallet=wallet,
netuid=netuid,
mechid=mechid,
uids=uids,
weights=weights,
wait_for_inclusion=wait_for_inclusion,
wait_for_finalization=wait_for_finalization,
)
msg_str = str(message or "")
except Exception as e:
msg_str = f"{type(e).__name__}: {e}"
if "SettingWeightsTooFast" in msg_str:
logger.error(
f"{log_prefix} SettingWeightsTooFast (exception) → treating as success."
)
return True
logger.warning(f"{log_prefix} set_weights exception: {msg_str}")
return False
if success:
logger.info(f"{log_prefix} set_weights success.")
return True
if "SettingWeightsTooFast" in msg_str:
logger.error(
f"{log_prefix} SettingWeightsTooFast (return) → treating as success."
)
return True
logger.warning(f"{log_prefix} set_weights failed: {msg_str or 'unknown error'}")
return False
async def run_signer() -> None:
settings = get_settings()
host = settings.SIGNER_HOST
port = settings.SIGNER_PORT
def signal_handler():
logger.info("Received shutdown signal, stopping signer...")
shutdown_event.set()
for sig in (signal.SIGTERM, signal.SIGINT):
signal.signal(sig, lambda s, f: signal_handler())
cold = settings.BITTENSOR_WALLET_COLD
hot = settings.BITTENSOR_WALLET_HOT
wallet = bt.wallet(name=cold, hotkey=hot)
@web.middleware
async def access_log(request: web.Request, handler):
t0 = time.monotonic()
try:
resp = await handler(request)
return resp
finally:
dt = (time.monotonic() - t0) * 1000
logger.info(
"[signer] %s %s -> %s %.1fms",
request.method,
request.path,
getattr(getattr(request, "response", None), "status", "?"),
dt,
)
async def health(_req: web.Request):
return web.json_response({"ok": True})
async def sign_handler(req: web.Request):
try:
payload = await req.json()
data = payload.get("payloads") or payload.get("data") or []
if isinstance(data, str):
data = [data]
sigs = [(wallet.hotkey.sign(data=d.encode("utf-8"))).hex() for d in data]
return web.json_response(
{
"success": True,
"signatures": sigs,
"hotkey": wallet.hotkey.ss58_address,
}
)
except Exception as e:
logger.error("[sign] error: %s", e)
return web.json_response({"success": False, "error": str(e)}, status=500)
async def set_weights_handler(req: web.Request):
try:
payload = await req.json()
netuid = int(payload.get("netuid", NETUID))
default_mechid = getattr(settings, "SCOREVISION_MECHID", MECHID)
mechid = int(payload.get("mechid", default_mechid))
uids = payload.get("uids") or []
wgts = payload.get("weights") or []
wfi = bool(payload.get("wait_for_inclusion", False))
wff = bool(payload.get("wait_for_finalization", False))
if isinstance(uids, int):
uids = [uids]
if isinstance(wgts, (int, float, str)):
try:
wgts = [float(wgts)]
except Exception:
wgts = [0.0]
if not isinstance(uids, list):
uids = list(uids)
if not isinstance(wgts, list):
wgts = list(wgts)
try:
uids = [int(u) for u in uids]
except Exception:
uids = []
try:
wgts = [float(w) for w in wgts]
except Exception:
wgts = []
if len(uids) != len(wgts) or not uids:
return web.json_response(
{"success": False, "error": "uids/weights mismatch or empty"},
status=400,
)
ok = _set_weights(
wallet=wallet,
netuid=netuid,
mechid=mechid,
uids=uids,
weights=wgts,
wait_for_inclusion=wfi,
wait_for_finalization=wff,
log_prefix="[signer]",
)
return web.json_response(
(
{"success": True}
if ok
else {"success": False, "error": "set_weights failed"}
),
status=200 if ok else 500,
)
except Exception as e:
logger.error("[set_weights] error: %s", e)
return web.json_response({"success": False, "error": str(e)}, status=500)
finally:
gc.collect()
app = web.Application(middlewares=[access_log])
app.add_routes(
[
web.get("/healthz", health),
web.post("/sign", sign_handler),
web.post("/set_weights", set_weights_handler),
]
)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, host=host, port=port)
await site.start()
try:
hn = socket.gethostname()
ip = socket.gethostbyname(hn)
except Exception:
hn, ip = ("?", "?")
logger.info(
"Signer listening on http://%s:%s hostname=%s ip=%s", host, port, hn, ip
)
# Wait for shutdown signal instead of infinite loop
try:
await shutdown_event.wait()
except asyncio.CancelledError:
pass
finally:
logger.info("Shutting down signer...")
await runner.cleanup()
await _reset_async_subtensor()
_reset_sync_subtensor()
gc.collect()