File size: 19,842 Bytes
f60a6c1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 |
from logging import getLogger
import os
import random
import asyncio
import signal
import gc
from pathlib import Path
from typing import Any
from scorevision.utils.settings import get_settings
from scorevision.utils.challenges import (
get_challenge_from_scorevision,
get_challenge_from_scorevision_with_source,
prepare_challenge_payload,
build_svchallenge_from_parts,
)
from scorevision.utils.data_models import SVChallenge
from scorevision.chute_template.schemas import TVPredictInput
from scorevision.utils.predict import call_miner_model_on_chutes
from scorevision.utils.evaluate import post_vlm_ranking
from scorevision.utils.cloudflare_helpers import emit_shard
from scorevision.utils.async_clients import close_http_clients
from scorevision.vlm_pipeline.vlm_annotator import (
generate_annotations_for_select_frames,
)
from scorevision.utils.miner_registry import get_miners_from_registry, Miner
from scorevision.utils.bittensor_helpers import get_subtensor, reset_subtensor
from scorevision.vlm_pipeline.non_vlm_scoring.smoothness import (
filter_low_quality_pseudo_gt_annotations,
)
from scorevision.utils.chutes_helpers import warmup_chute
from scorevision.utils.prometheus import (
RUNNER_BLOCK_HEIGHT,
RUNNER_RUNS_TOTAL,
RUNNER_WARMUP_TOTAL,
RUNNER_PGT_RETRY_TOTAL,
RUNNER_PGT_FRAMES,
RUNNER_MINER_CALLS_TOTAL,
RUNNER_MINER_LATENCY_MS,
RUNNER_EVALUATION_SCORE,
RUNNER_EVALUATION_FAIL_TOTAL,
RUNNER_SHARDS_EMITTED_TOTAL,
RUNNER_ACTIVE_MINERS,
RUNNER_LAST_RUN_DURATION_SECONDS,
RUNNER_LAST_PGT_DURATION_SECONDS,
RUNNER_MINER_LAST_DURATION_SECONDS,
)
from scorevision.utils.video_processing import FrameStore
logger = getLogger(__name__)
# Global shutdown event for graceful shutdown
shutdown_event = asyncio.Event()
def _chute_id_for_miner(m: Miner) -> str | None:
return getattr(m, "chute_id", None) or getattr(m, "slug", None)
async def _build_pgt_with_retries(
chal_api: dict,
*,
required_n_frames: int,
max_bbox_retries: int = 5,
max_quality_retries: int = 5,
video_cache: dict[str, Any] | None = None,
) -> tuple[SVChallenge, TVPredictInput, list]:
""" """
created_local_cache = video_cache is None
if video_cache is None:
video_cache = {}
MIN_BBOXES_PER_FRAME = int(os.getenv("SV_MIN_BBOXES_PER_FRAME", "6"))
MIN_FRAMES_REQUIRED = int(
os.getenv("SV_MIN_BBOX_FRAMES_REQUIRED", str(required_n_frames))
)
last_err = None
try:
for quality_attempt in range(max_quality_retries):
logger.info(
f"[PGT] Starting quality attempt {quality_attempt+1}/{max_quality_retries}"
)
for bbox_attempt in range(max_bbox_retries):
try:
payload, frame_numbers, frames, flows, _frame_store = (
await prepare_challenge_payload(
challenge=chal_api,
video_cache=video_cache,
)
)
if len(frames) < required_n_frames:
logger.warning(
f"[PGT] Not enough frames ({len(frames)}/{required_n_frames}) "
f"bbox attempt {bbox_attempt+1}/{max_bbox_retries}"
)
RUNNER_PGT_RETRY_TOTAL.labels(
reason="insufficient_frames"
).inc()
continue
challenge = build_svchallenge_from_parts(
chal_api=chal_api,
payload=payload,
frame_numbers=frame_numbers,
frames=frames,
flows=flows,
)
pseudo_gt_annotations = (
await generate_annotations_for_select_frames(
video_name=challenge.challenge_id,
frames=challenge.frames,
flow_frames=challenge.dense_optical_flow_frames,
frame_numbers=challenge.frame_numbers,
)
)
n_frames = len(pseudo_gt_annotations)
logger.info(
f"[PGT] {n_frames} pseudo-GT annotations generated "
f"(bbox attempt {bbox_attempt+1}/{max_bbox_retries})"
)
if not _enough_bboxes_per_frame(
pseudo_gt_annotations,
min_bboxes_per_frame=MIN_BBOXES_PER_FRAME,
min_frames_required=MIN_FRAMES_REQUIRED,
):
logger.warning(
f"[PGT] Too few bboxes per frame. bbox retry "
f"{bbox_attempt+1}/{max_bbox_retries}"
)
RUNNER_PGT_RETRY_TOTAL.labels(reason="too_few_bboxes").inc()
continue
filtered = filter_low_quality_pseudo_gt_annotations(
annotations=pseudo_gt_annotations
)
logger.info(f"[PGT] {len(filtered)} filtered annotations kept")
if _enough_bboxes_per_frame(
filtered,
min_bboxes_per_frame=MIN_BBOXES_PER_FRAME,
min_frames_required=required_n_frames,
):
RUNNER_PGT_FRAMES.set(len(filtered))
logger.info(
f"[PGT] Success: enough filtered frames "
f"(quality attempt {quality_attempt+1}/{max_quality_retries}, "
f"bbox attempt {bbox_attempt+1}/{max_bbox_retries})"
)
return challenge, payload, filtered
logger.warning(
f"[PGT] Not enough quality frames after filtering "
f"({len(filtered)}/{required_n_frames}), "
f"quality attempt {quality_attempt+1}/{max_quality_retries}, "
f"bbox attempt {bbox_attempt+1}/{max_bbox_retries}"
)
RUNNER_PGT_RETRY_TOTAL.labels(reason="too_few_filtered").inc()
except Exception as e:
last_err = e
logger.warning(
f"[PGT] Exception during bbox attempt {bbox_attempt+1}/{max_bbox_retries}: {e}"
)
RUNNER_PGT_RETRY_TOTAL.labels(reason="exception").inc()
continue
logger.warning(
f"[PGT] Bbox phase failed after {max_bbox_retries} retries "
f"→ new quality attempt ({quality_attempt+1}/{max_quality_retries})"
)
RUNNER_PGT_RETRY_TOTAL.labels(reason="bbox_phase_failed").inc()
raise RuntimeError(
f"Failed to prepare high-quality PGT after {max_quality_retries} quality attempts "
f"× {max_bbox_retries} bbox retries. Last error: {last_err}"
)
finally:
if created_local_cache and video_cache:
cached_path = video_cache.get("path")
if cached_path:
try:
from pathlib import Path as _Path
(
_Path(cached_path)
if not hasattr(cached_path, "unlink")
else cached_path
).unlink(missing_ok=True)
except Exception as e:
logger.debug(f"Failed to remove cached video {cached_path}: {e}")
def _enough_bboxes_per_frame(
pseudo_gt_annotations: list,
*,
min_bboxes_per_frame: int,
min_frames_required: int,
) -> bool:
ok_frames = 0
for pgt in pseudo_gt_annotations:
n = len(getattr(pgt.annotation, "bboxes", []) or [])
if n >= min_bboxes_per_frame:
ok_frames += 1
return ok_frames >= min_frames_required
async def runner(slug: str | None = None) -> None:
settings = get_settings()
loop = asyncio.get_running_loop()
run_start = loop.time()
last_pgt_duration = 0.0
NETUID = settings.SCOREVISION_NETUID
MAX_MINERS = int(os.getenv("SV_MAX_MINERS_PER_RUN", "60"))
WARMUP_ENABLED = os.getenv("SV_WARMUP_BEFORE_RUN", "1") not in (
"0",
"false",
"False",
)
WARMUP_CONC = int(os.getenv("SV_WARMUP_CONCURRENCY", "8"))
WARMUP_TIMEOUT = int(os.getenv("SV_WARMUP_TIMEOUT_S", "60"))
REQUIRED_PGT_FRAMES = int(getattr(settings, "SCOREVISION_VLM_SELECT_N_FRAMES", 3))
MAX_PGT_RETRIES = int(os.getenv("SV_PGT_MAX_RETRIES", "3"))
MAX_PGT_BBOX_RETRIES = int(
os.getenv("SV_PGT_MAX_BBOX_RETRIES", os.getenv("SV_PGT_MAX_RETRIES", "3"))
)
MAX_PGT_QUALITY_RETRIES = int(os.getenv("SV_PGT_MAX_QUALITY_RETRIES", "4"))
video_cache: dict[str, Any] = {}
frame_store: FrameStore | None = None
run_result = "success"
try:
miners = await get_miners_from_registry(NETUID)
if not miners:
logger.warning("No eligible miners found on-chain.")
RUNNER_ACTIVE_MINERS.set(0)
run_result = "no_miners"
return
challenge, payload, chal_api, frame_store = (
await get_challenge_from_scorevision_with_source(video_cache=video_cache)
)
miner_list = list(miners.values())
RUNNER_ACTIVE_MINERS.set(len(miner_list))
try:
pgt_build_start = loop.time()
challenge, payload, pseudo_gt_annotations = await _build_pgt_with_retries(
chal_api=chal_api,
required_n_frames=REQUIRED_PGT_FRAMES,
max_bbox_retries=MAX_PGT_BBOX_RETRIES,
max_quality_retries=MAX_PGT_QUALITY_RETRIES,
video_cache=video_cache,
)
last_pgt_duration = loop.time() - pgt_build_start
RUNNER_LAST_PGT_DURATION_SECONDS.set(last_pgt_duration)
except Exception as e:
logger.warning(
f"PGT quality gating failed after retries, skipping challenge: {e}"
)
last_pgt_duration = loop.time() - pgt_build_start
RUNNER_LAST_PGT_DURATION_SECONDS.set(last_pgt_duration)
run_result = "pgt_failed"
return
for m in miner_list:
miner_label = getattr(m, "slug", None) or str(getattr(m, "uid", "?"))
miner_output: TVPredictInput | None = None
emission_started = False
miner_total_start = loop.time()
try:
loop = asyncio.get_running_loop()
start = loop.time()
miner_output = await call_miner_model_on_chutes(
slug=m.slug,
chute_id=m.chute_id,
payload=payload,
)
latency_ms = (loop.time() - start) * 1000.0
RUNNER_MINER_LATENCY_MS.labels(miner=miner_label).set(latency_ms)
RUNNER_MINER_CALLS_TOTAL.labels(outcome="success").inc()
try:
evaluation = post_vlm_ranking(
payload=payload,
miner_run=miner_output,
challenge=challenge,
pseudo_gt_annotations=pseudo_gt_annotations,
frame_store=frame_store,
)
except Exception:
RUNNER_EVALUATION_FAIL_TOTAL.labels(stage="ranking").inc()
raise
logger.info(f"Evaluation: {evaluation}")
if getattr(evaluation, "score", None) is not None:
RUNNER_EVALUATION_SCORE.labels(miner=miner_label).set(
getattr(evaluation, "score", 0.0)
)
emission_started = True
emit_start = loop.time()
try:
await emit_shard(
slug=m.slug,
challenge=challenge,
miner_run=miner_output,
evaluation=evaluation,
miner_hotkey_ss58=m.hotkey,
)
except Exception:
dt_emit = (loop.time() - emit_start) * 1000.0
logger.exception(
"[emit] FAILED for %s in %.1fms", miner_label, dt_emit
)
raise
else:
dt_emit = (loop.time() - emit_start) * 1000.0
logger.info("[emit] success for %s in %.1fms", miner_label, dt_emit)
RUNNER_SHARDS_EMITTED_TOTAL.labels(status="success").inc()
finally:
emission_started = False
except Exception as e:
logger.warning(
"Miner uid=%s slug=%s failed: %s",
getattr(m, "uid", "?"),
getattr(m, "slug", "?"),
e,
)
if miner_output is None:
RUNNER_MINER_CALLS_TOTAL.labels(outcome="exception").inc()
if emission_started:
RUNNER_SHARDS_EMITTED_TOTAL.labels(status="error").inc()
continue
finally:
duration = loop.time() - miner_total_start
RUNNER_MINER_LAST_DURATION_SECONDS.labels(miner=miner_label).set(
duration
)
except Exception as e:
logger.error(e)
run_result = "error"
finally:
loop_now = asyncio.get_running_loop()
run_duration = loop_now.time() - run_start
RUNNER_LAST_RUN_DURATION_SECONDS.set(run_duration)
store_obj = video_cache.get("store") or frame_store
if store_obj:
try:
store_obj.unlink()
except Exception as err:
logger.debug(
f"Failed to remove cached video {getattr(store_obj, 'video_path', '?')}: {err}"
)
elif video_cache.get("path"):
cached_path = Path(video_cache["path"])
try:
cached_path.unlink(missing_ok=True)
except Exception as err:
logger.debug(f"Failed to remove cached video {cached_path}: {err}")
video_cache.clear()
RUNNER_RUNS_TOTAL.labels(result=run_result).inc()
close_http_clients()
gc.collect()
async def runner_loop():
"""Runs `runner()` every 300 blocks, with robust triggering."""
settings = get_settings()
TEMPO = 300
STALL_SECS_FALLBACK = 5400
GET_BLOCK_TIMEOUT = float(os.getenv("SUBTENSOR_GET_BLOCK_TIMEOUT_S", "15.0"))
WAIT_BLOCK_TIMEOUT = float(os.getenv("SUBTENSOR_WAIT_BLOCK_TIMEOUT_S", "15.0"))
RECONNECT_DELAY_S = float(os.getenv("SUBTENSOR_RECONNECT_DELAY_S", "5.0"))
def signal_handler():
logger.warning("Received shutdown signal, stopping runner...")
shutdown_event.set()
for sig in (signal.SIGTERM, signal.SIGINT):
signal.signal(sig, lambda s, f: signal_handler())
st = None
last_trigger_block = None
last_seen_block = None
loop = asyncio.get_running_loop()
last_progress_time = loop.time()
last_trigger_time = loop.time()
logger.warning("[RunnerLoop] starting, TEMPO=%s blocks", TEMPO)
while not shutdown_event.is_set():
try:
if st is None:
logger.warning("[RunnerLoop] (re)connecting subtensor…")
try:
st = await get_subtensor()
except Exception as e:
logger.warning(
"[RunnerLoop] subtensor connect failed: %s → retrying in %.1fs",
e, RECONNECT_DELAY_S
)
reset_subtensor()
st = None
await asyncio.sleep(RECONNECT_DELAY_S)
continue
try:
block = await asyncio.wait_for(
st.get_current_block(), timeout=GET_BLOCK_TIMEOUT
)
except asyncio.TimeoutError:
logger.warning(
"[RunnerLoop] get_current_block() timed out after %.1fs → resetting subtensor",
GET_BLOCK_TIMEOUT,
)
reset_subtensor()
st = None
await asyncio.sleep(2.0)
continue
except (KeyError, ConnectionError, RuntimeError) as err:
logger.warning(
"[RunnerLoop] get_current_block error (%s) → resetting subtensor",
err,
)
reset_subtensor()
st = None
await asyncio.sleep(2.0)
continue
RUNNER_BLOCK_HEIGHT.set(block)
now = loop.time()
if last_seen_block is None or block > last_seen_block:
last_seen_block = block
last_progress_time = now
should_trigger = False
if last_trigger_block is None:
should_trigger = True
logger.warning("[RunnerLoop] first trigger at block %s", block)
else:
if block - last_trigger_block >= TEMPO:
should_trigger = True
if (now - last_progress_time) >= STALL_SECS_FALLBACK:
logger.warning(
"[RunnerLoop] no block progress for %.0fs → fallback trigger",
now - last_progress_time,
)
should_trigger = True
last_progress_time = now
if (now - last_trigger_time) >= STALL_SECS_FALLBACK:
logger.warning(
"[RunnerLoop] no run triggered for %.0fs (wall clock) → fallback trigger",
now - last_trigger_time,
)
should_trigger = True
if should_trigger:
logger.warning(
"[RunnerLoop] Triggering runner at block %s (last_trigger_block=%s)",
block,
last_trigger_block,
)
await runner()
gc.collect()
last_trigger_block = block
last_trigger_time = loop.time()
else:
try:
await asyncio.wait_for(
st.wait_for_block(), timeout=WAIT_BLOCK_TIMEOUT
)
except asyncio.TimeoutError:
continue
except (KeyError, ConnectionError, RuntimeError) as err:
logger.warning(
"[RunnerLoop] wait_for_block error (%s); resetting subtensor",
err,
)
reset_subtensor()
st = None
await asyncio.sleep(2.0)
continue
except asyncio.CancelledError:
break
except Exception as e:
logger.warning(
"[RunnerLoop] Error: %s; resetting subtensor and retrying…", e
)
reset_subtensor()
st = None
try:
await asyncio.wait_for(shutdown_event.wait(), timeout=120.0)
except asyncio.TimeoutError:
pass
logger.warning("Runner loop shutting down gracefully...")
|