from logging import getLogger
import os
import random
import asyncio
import signal
import gc
from pathlib import Path
from typing import Any

from scorevision.utils.settings import get_settings
from scorevision.utils.challenges import (
    get_challenge_from_scorevision,
    get_challenge_from_scorevision_with_source,
    prepare_challenge_payload,
    build_svchallenge_from_parts,
)
from scorevision.utils.data_models import SVChallenge
from scorevision.chute_template.schemas import TVPredictInput
from scorevision.utils.predict import call_miner_model_on_chutes
from scorevision.utils.evaluate import post_vlm_ranking
from scorevision.utils.cloudflare_helpers import emit_shard
from scorevision.utils.async_clients import close_http_clients
from scorevision.vlm_pipeline.vlm_annotator import (
    generate_annotations_for_select_frames,
)
from scorevision.utils.miner_registry import get_miners_from_registry, Miner
from scorevision.utils.bittensor_helpers import get_subtensor, reset_subtensor
from scorevision.vlm_pipeline.non_vlm_scoring.smoothness import (
    filter_low_quality_pseudo_gt_annotations,
)
from scorevision.utils.chutes_helpers import warmup_chute
from scorevision.utils.prometheus import (
    RUNNER_BLOCK_HEIGHT,
    RUNNER_RUNS_TOTAL,
    RUNNER_WARMUP_TOTAL,
    RUNNER_PGT_RETRY_TOTAL,
    RUNNER_PGT_FRAMES,
    RUNNER_MINER_CALLS_TOTAL,
    RUNNER_MINER_LATENCY_MS,
    RUNNER_EVALUATION_SCORE,
    RUNNER_EVALUATION_FAIL_TOTAL,
    RUNNER_SHARDS_EMITTED_TOTAL,
    RUNNER_ACTIVE_MINERS,
    RUNNER_LAST_RUN_DURATION_SECONDS,
    RUNNER_LAST_PGT_DURATION_SECONDS,
    RUNNER_MINER_LAST_DURATION_SECONDS,
)
from scorevision.utils.video_processing import FrameStore

logger = getLogger(__name__)

# Global shutdown event used by runner_loop() for graceful termination
# (set by the SIGTERM/SIGINT handlers installed in runner_loop).
shutdown_event = asyncio.Event()


def _chute_id_for_miner(m: Miner) -> str | None:
    """Return the best available chute identifier for *m*.

    Prefers an explicit ``chute_id`` attribute, falling back to ``slug``;
    returns None when the miner exposes neither.
    """
    return getattr(m, "chute_id", None) or getattr(m, "slug", None)


async def _build_pgt_with_retries(
    chal_api: dict,
    *,
    required_n_frames: int,
    max_bbox_retries: int = 5,
    max_quality_retries: int = 5,
    video_cache: dict[str, Any] | None = None,
) -> tuple[SVChallenge, TVPredictInput, list]:
    """Build a pseudo-ground-truth (PGT) annotation set with a two-level retry loop.

    Outer loop: up to ``max_quality_retries`` "quality" attempts.
    Inner loop: up to ``max_bbox_retries`` "bbox" attempts, each of which
    prepares a fresh challenge payload, generates VLM annotations, and gates
    them on a minimum number of bounding boxes per frame (controlled by the
    ``SV_MIN_BBOXES_PER_FRAME`` / ``SV_MIN_BBOX_FRAMES_REQUIRED`` env vars)
    both before and after low-quality filtering.

    Args:
        chal_api: Raw challenge description from the ScoreVision API
            (opaque dict; passed straight to the challenge helpers).
        required_n_frames: Minimum number of annotated frames required for
            success; also the default for ``SV_MIN_BBOX_FRAMES_REQUIRED``.
        max_bbox_retries: Inner-loop attempts per quality attempt.
        max_quality_retries: Outer-loop attempts before giving up.
        video_cache: Optional shared cache for the downloaded video. When
            omitted, a private cache is created and its cached file (if any)
            is deleted on exit.

    Returns:
        ``(challenge, payload, filtered_annotations)`` on success.

    Raises:
        RuntimeError: If every attempt fails; the message includes the last
            exception observed (if any).
    """
    created_local_cache = video_cache is None
    if video_cache is None:
        video_cache = {}
    MIN_BBOXES_PER_FRAME = int(os.getenv("SV_MIN_BBOXES_PER_FRAME", "6"))
    MIN_FRAMES_REQUIRED = int(
        os.getenv("SV_MIN_BBOX_FRAMES_REQUIRED", str(required_n_frames))
    )
    last_err = None
    try:
        for quality_attempt in range(max_quality_retries):
            logger.info(
                f"[PGT] Starting quality attempt {quality_attempt+1}/{max_quality_retries}"
            )
            for bbox_attempt in range(max_bbox_retries):
                try:
                    payload, frame_numbers, frames, flows, _frame_store = (
                        await prepare_challenge_payload(
                            challenge=chal_api,
                            video_cache=video_cache,
                        )
                    )
                    if len(frames) < required_n_frames:
                        logger.warning(
                            f"[PGT] Not enough frames ({len(frames)}/{required_n_frames}) "
                            f"bbox attempt {bbox_attempt+1}/{max_bbox_retries}"
                        )
                        RUNNER_PGT_RETRY_TOTAL.labels(
                            reason="insufficient_frames"
                        ).inc()
                        continue
                    challenge = build_svchallenge_from_parts(
                        chal_api=chal_api,
                        payload=payload,
                        frame_numbers=frame_numbers,
                        frames=frames,
                        flows=flows,
                    )
                    pseudo_gt_annotations = (
                        await generate_annotations_for_select_frames(
                            video_name=challenge.challenge_id,
                            frames=challenge.frames,
                            flow_frames=challenge.dense_optical_flow_frames,
                            frame_numbers=challenge.frame_numbers,
                        )
                    )
                    n_frames = len(pseudo_gt_annotations)
                    logger.info(
                        f"[PGT] {n_frames} pseudo-GT annotations generated "
                        f"(bbox attempt {bbox_attempt+1}/{max_bbox_retries})"
                    )
                    # Pre-filter gate: raw annotations must already be dense enough.
                    if not _enough_bboxes_per_frame(
                        pseudo_gt_annotations,
                        min_bboxes_per_frame=MIN_BBOXES_PER_FRAME,
                        min_frames_required=MIN_FRAMES_REQUIRED,
                    ):
                        logger.warning(
                            f"[PGT] Too few bboxes per frame. bbox retry "
                            f"{bbox_attempt+1}/{max_bbox_retries}"
                        )
                        RUNNER_PGT_RETRY_TOTAL.labels(reason="too_few_bboxes").inc()
                        continue
                    filtered = filter_low_quality_pseudo_gt_annotations(
                        annotations=pseudo_gt_annotations
                    )
                    logger.info(f"[PGT] {len(filtered)} filtered annotations kept")
                    # Post-filter gate. NOTE(review): this uses required_n_frames
                    # while the pre-filter gate uses MIN_FRAMES_REQUIRED (env
                    # override) — confirm the asymmetry is intentional.
                    if _enough_bboxes_per_frame(
                        filtered,
                        min_bboxes_per_frame=MIN_BBOXES_PER_FRAME,
                        min_frames_required=required_n_frames,
                    ):
                        RUNNER_PGT_FRAMES.set(len(filtered))
                        logger.info(
                            f"[PGT] Success: enough filtered frames "
                            f"(quality attempt {quality_attempt+1}/{max_quality_retries}, "
                            f"bbox attempt {bbox_attempt+1}/{max_bbox_retries})"
                        )
                        return challenge, payload, filtered
                    logger.warning(
                        f"[PGT] Not enough quality frames after filtering "
                        f"({len(filtered)}/{required_n_frames}), "
                        f"quality attempt {quality_attempt+1}/{max_quality_retries}, "
                        f"bbox attempt {bbox_attempt+1}/{max_bbox_retries}"
                    )
                    RUNNER_PGT_RETRY_TOTAL.labels(reason="too_few_filtered").inc()
                except Exception as e:
                    last_err = e
                    logger.warning(
                        f"[PGT] Exception during bbox attempt {bbox_attempt+1}/{max_bbox_retries}: {e}"
                    )
                    RUNNER_PGT_RETRY_TOTAL.labels(reason="exception").inc()
                    continue
            logger.warning(
                f"[PGT] Bbox phase failed after {max_bbox_retries} retries "
                f"→ new quality attempt ({quality_attempt+1}/{max_quality_retries})"
            )
            RUNNER_PGT_RETRY_TOTAL.labels(reason="bbox_phase_failed").inc()
        raise RuntimeError(
            f"Failed to prepare high-quality PGT after {max_quality_retries} quality attempts "
            f"× {max_bbox_retries} bbox retries. Last error: {last_err}"
        )
    finally:
        # Best-effort cleanup of the cached video, but only when this call
        # created the cache (a caller-supplied cache is the caller's to manage).
        if created_local_cache and video_cache:
            cached_path = video_cache.get("path")
            if cached_path:
                try:
                    # cached_path may be a str or an os.PathLike; Path is
                    # already imported at module top.
                    path_obj = (
                        cached_path
                        if hasattr(cached_path, "unlink")
                        else Path(cached_path)
                    )
                    path_obj.unlink(missing_ok=True)
                except Exception as e:
                    logger.debug(f"Failed to remove cached video {cached_path}: {e}")


def _enough_bboxes_per_frame(
    pseudo_gt_annotations: list,
    *,
    min_bboxes_per_frame: int,
    min_frames_required: int,
) -> bool:
    """Return True when at least *min_frames_required* annotations carry
    *min_bboxes_per_frame* or more bounding boxes each.

    Each item is expected to expose ``.annotation.bboxes`` (missing/None is
    treated as zero boxes).
    """
    ok_frames = 0
    for pgt in pseudo_gt_annotations:
        n = len(getattr(pgt.annotation, "bboxes", []) or [])
        if n >= min_bboxes_per_frame:
            ok_frames += 1
    return ok_frames >= min_frames_required


async def runner(slug: str | None = None) -> None:
    """Execute one full validation run.

    Fetches the miner registry, obtains a challenge video, builds a
    quality-gated pseudo-GT via :func:`_build_pgt_with_retries`, then for each
    miner: calls its model, ranks the output against the pseudo-GT, and emits
    a result shard. All phases update Prometheus metrics; cached video files
    are removed in the ``finally`` block.

    Args:
        slug: Currently unused; kept for interface compatibility.
    """
    settings = get_settings()
    loop = asyncio.get_running_loop()
    run_start = loop.time()
    last_pgt_duration = 0.0
    NETUID = settings.SCOREVISION_NETUID
    # NOTE(review): MAX_MINERS / WARMUP_* / MAX_PGT_RETRIES are read but not
    # used below — confirm whether the miner cap and chute warm-up phase were
    # removed intentionally (see warmup_chute / RUNNER_WARMUP_TOTAL imports).
    MAX_MINERS = int(os.getenv("SV_MAX_MINERS_PER_RUN", "60"))
    WARMUP_ENABLED = os.getenv("SV_WARMUP_BEFORE_RUN", "1") not in (
        "0",
        "false",
        "False",
    )
    WARMUP_CONC = int(os.getenv("SV_WARMUP_CONCURRENCY", "8"))
    WARMUP_TIMEOUT = int(os.getenv("SV_WARMUP_TIMEOUT_S", "60"))
    REQUIRED_PGT_FRAMES = int(getattr(settings, "SCOREVISION_VLM_SELECT_N_FRAMES", 3))
    MAX_PGT_RETRIES = int(os.getenv("SV_PGT_MAX_RETRIES", "3"))
    MAX_PGT_BBOX_RETRIES = int(
        os.getenv("SV_PGT_MAX_BBOX_RETRIES", os.getenv("SV_PGT_MAX_RETRIES", "3"))
    )
    MAX_PGT_QUALITY_RETRIES = int(os.getenv("SV_PGT_MAX_QUALITY_RETRIES", "4"))
    video_cache: dict[str, Any] = {}
    frame_store: FrameStore | None = None
    run_result = "success"
    try:
        miners = await get_miners_from_registry(NETUID)
        if not miners:
            logger.warning("No eligible miners found on-chain.")
            RUNNER_ACTIVE_MINERS.set(0)
            run_result = "no_miners"
            return
        challenge, payload, chal_api, frame_store = (
            await get_challenge_from_scorevision_with_source(video_cache=video_cache)
        )
        miner_list = list(miners.values())
        RUNNER_ACTIVE_MINERS.set(len(miner_list))
        try:
            pgt_build_start = loop.time()
            # challenge/payload are rebuilt here from the quality-gated PGT.
            challenge, payload, pseudo_gt_annotations = await _build_pgt_with_retries(
                chal_api=chal_api,
                required_n_frames=REQUIRED_PGT_FRAMES,
                max_bbox_retries=MAX_PGT_BBOX_RETRIES,
                max_quality_retries=MAX_PGT_QUALITY_RETRIES,
                video_cache=video_cache,
            )
            last_pgt_duration = loop.time() - pgt_build_start
            RUNNER_LAST_PGT_DURATION_SECONDS.set(last_pgt_duration)
        except Exception as e:
            logger.warning(
                f"PGT quality gating failed after retries, skipping challenge: {e}"
            )
            last_pgt_duration = loop.time() - pgt_build_start
            RUNNER_LAST_PGT_DURATION_SECONDS.set(last_pgt_duration)
            run_result = "pgt_failed"
            return
        for m in miner_list:
            miner_label = getattr(m, "slug", None) or str(getattr(m, "uid", "?"))
            miner_output: TVPredictInput | None = None
            # True while an emit_shard attempt is in flight and unresolved,
            # so the outer except can classify the failure as an emit error.
            emission_started = False
            miner_total_start = loop.time()
            try:
                start = loop.time()
                miner_output = await call_miner_model_on_chutes(
                    slug=m.slug,
                    chute_id=m.chute_id,
                    payload=payload,
                )
                latency_ms = (loop.time() - start) * 1000.0
                RUNNER_MINER_LATENCY_MS.labels(miner=miner_label).set(latency_ms)
                RUNNER_MINER_CALLS_TOTAL.labels(outcome="success").inc()
                try:
                    evaluation = post_vlm_ranking(
                        payload=payload,
                        miner_run=miner_output,
                        challenge=challenge,
                        pseudo_gt_annotations=pseudo_gt_annotations,
                        frame_store=frame_store,
                    )
                except Exception:
                    RUNNER_EVALUATION_FAIL_TOTAL.labels(stage="ranking").inc()
                    raise
                logger.info(f"Evaluation: {evaluation}")
                if getattr(evaluation, "score", None) is not None:
                    RUNNER_EVALUATION_SCORE.labels(miner=miner_label).set(
                        getattr(evaluation, "score", 0.0)
                    )
                emission_started = True
                emit_start = loop.time()
                try:
                    await emit_shard(
                        slug=m.slug,
                        challenge=challenge,
                        miner_run=miner_output,
                        evaluation=evaluation,
                        miner_hotkey_ss58=m.hotkey,
                    )
                except Exception:
                    dt_emit = (loop.time() - emit_start) * 1000.0
                    logger.exception(
                        "[emit] FAILED for %s in %.1fms", miner_label, dt_emit
                    )
                    raise
                else:
                    dt_emit = (loop.time() - emit_start) * 1000.0
                    logger.info("[emit] success for %s in %.1fms", miner_label, dt_emit)
                    RUNNER_SHARDS_EMITTED_TOTAL.labels(status="success").inc()
                    # FIX: clear the flag only on success. The previous code
                    # cleared it in a `finally`, so the error counter below
                    # could never fire when emit_shard raised.
                    emission_started = False
            except Exception as e:
                logger.warning(
                    "Miner uid=%s slug=%s failed: %s",
                    getattr(m, "uid", "?"),
                    getattr(m, "slug", "?"),
                    e,
                )
                if miner_output is None:
                    RUNNER_MINER_CALLS_TOTAL.labels(outcome="exception").inc()
                if emission_started:
                    RUNNER_SHARDS_EMITTED_TOTAL.labels(status="error").inc()
                continue
            finally:
                duration = loop.time() - miner_total_start
                RUNNER_MINER_LAST_DURATION_SECONDS.labels(miner=miner_label).set(
                    duration
                )
    except Exception as e:
        logger.error(e)
        run_result = "error"
    finally:
        run_duration = loop.time() - run_start
        RUNNER_LAST_RUN_DURATION_SECONDS.set(run_duration)
        # Prefer the FrameStore object (it knows its own video path);
        # fall back to a bare cached file path.
        store_obj = video_cache.get("store") or frame_store
        if store_obj:
            try:
                store_obj.unlink()
            except Exception as err:
                logger.debug(
                    f"Failed to remove cached video {getattr(store_obj, 'video_path', '?')}: {err}"
                )
        elif video_cache.get("path"):
            cached_path = Path(video_cache["path"])
            try:
                cached_path.unlink(missing_ok=True)
            except Exception as err:
                logger.debug(f"Failed to remove cached video {cached_path}: {err}")
        video_cache.clear()
        RUNNER_RUNS_TOTAL.labels(result=run_result).inc()
        close_http_clients()
        gc.collect()


async def runner_loop():
    """Runs `runner()` every 300 blocks, with robust triggering.

    Polls the subtensor for the current block and triggers a run when TEMPO
    blocks have elapsed since the last trigger. Two wall-clock fallbacks
    (STALL_SECS_FALLBACK) guard against a stalled chain or a stalled trigger.
    Subtensor errors/timeouts reset the connection and retry. SIGTERM/SIGINT
    set the module-level shutdown_event for a graceful exit.
    """
    settings = get_settings()
    TEMPO = 300
    STALL_SECS_FALLBACK = 5400
    GET_BLOCK_TIMEOUT = float(os.getenv("SUBTENSOR_GET_BLOCK_TIMEOUT_S", "15.0"))
    WAIT_BLOCK_TIMEOUT = float(os.getenv("SUBTENSOR_WAIT_BLOCK_TIMEOUT_S", "15.0"))
    RECONNECT_DELAY_S = float(os.getenv("SUBTENSOR_RECONNECT_DELAY_S", "5.0"))

    def signal_handler():
        logger.warning("Received shutdown signal, stopping runner...")
        shutdown_event.set()

    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, lambda s, f: signal_handler())

    st = None
    last_trigger_block = None
    last_seen_block = None
    loop = asyncio.get_running_loop()
    last_progress_time = loop.time()
    last_trigger_time = loop.time()
    logger.warning("[RunnerLoop] starting, TEMPO=%s blocks", TEMPO)
    while not shutdown_event.is_set():
        try:
            if st is None:
                logger.warning("[RunnerLoop] (re)connecting subtensor…")
                try:
                    st = await get_subtensor()
                except Exception as e:
                    logger.warning(
                        "[RunnerLoop] subtensor connect failed: %s → retrying in %.1fs",
                        e,
                        RECONNECT_DELAY_S,
                    )
                    reset_subtensor()
                    st = None
                    await asyncio.sleep(RECONNECT_DELAY_S)
                    continue
            try:
                block = await asyncio.wait_for(
                    st.get_current_block(), timeout=GET_BLOCK_TIMEOUT
                )
            except asyncio.TimeoutError:
                logger.warning(
                    "[RunnerLoop] get_current_block() timed out after %.1fs → resetting subtensor",
                    GET_BLOCK_TIMEOUT,
                )
                reset_subtensor()
                st = None
                await asyncio.sleep(2.0)
                continue
            except (KeyError, ConnectionError, RuntimeError) as err:
                logger.warning(
                    "[RunnerLoop] get_current_block error (%s) → resetting subtensor",
                    err,
                )
                reset_subtensor()
                st = None
                await asyncio.sleep(2.0)
                continue
            RUNNER_BLOCK_HEIGHT.set(block)
            now = loop.time()
            if last_seen_block is None or block > last_seen_block:
                last_seen_block = block
                last_progress_time = now
            should_trigger = False
            if last_trigger_block is None:
                should_trigger = True
                logger.warning("[RunnerLoop] first trigger at block %s", block)
            else:
                if block - last_trigger_block >= TEMPO:
                    should_trigger = True
                # Fallback 1: chain stopped producing new blocks.
                if (now - last_progress_time) >= STALL_SECS_FALLBACK:
                    logger.warning(
                        "[RunnerLoop] no block progress for %.0fs → fallback trigger",
                        now - last_progress_time,
                    )
                    should_trigger = True
                    last_progress_time = now
                # Fallback 2: no run triggered for too long regardless of blocks.
                if (now - last_trigger_time) >= STALL_SECS_FALLBACK:
                    logger.warning(
                        "[RunnerLoop] no run triggered for %.0fs (wall clock) → fallback trigger",
                        now - last_trigger_time,
                    )
                    should_trigger = True
            if should_trigger:
                logger.warning(
                    "[RunnerLoop] Triggering runner at block %s (last_trigger_block=%s)",
                    block,
                    last_trigger_block,
                )
                await runner()
                gc.collect()
                last_trigger_block = block
                last_trigger_time = loop.time()
            else:
                try:
                    await asyncio.wait_for(
                        st.wait_for_block(), timeout=WAIT_BLOCK_TIMEOUT
                    )
                except asyncio.TimeoutError:
                    continue
                except (KeyError, ConnectionError, RuntimeError) as err:
                    logger.warning(
                        "[RunnerLoop] wait_for_block error (%s); resetting subtensor",
                        err,
                    )
                    reset_subtensor()
                    st = None
                    await asyncio.sleep(2.0)
                    continue
        except asyncio.CancelledError:
            break
        except Exception as e:
            logger.warning(
                "[RunnerLoop] Error: %s; resetting subtensor and retrying…", e
            )
            reset_subtensor()
            st = None
            # Back off up to 120s, but wake immediately on shutdown.
            try:
                await asyncio.wait_for(shutdown_event.wait(), timeout=120.0)
            except asyncio.TimeoutError:
                pass
    logger.warning("Runner loop shutting down gracefully...")