ultravision-01 / cli /runner.py
mohantesting's picture
Add files using upload-large-folder tool
f60a6c1 verified
from logging import getLogger
import os
import random
import asyncio
import signal
import gc
from pathlib import Path
from typing import Any
from scorevision.utils.settings import get_settings
from scorevision.utils.challenges import (
get_challenge_from_scorevision,
get_challenge_from_scorevision_with_source,
prepare_challenge_payload,
build_svchallenge_from_parts,
)
from scorevision.utils.data_models import SVChallenge
from scorevision.chute_template.schemas import TVPredictInput
from scorevision.utils.predict import call_miner_model_on_chutes
from scorevision.utils.evaluate import post_vlm_ranking
from scorevision.utils.cloudflare_helpers import emit_shard
from scorevision.utils.async_clients import close_http_clients
from scorevision.vlm_pipeline.vlm_annotator import (
generate_annotations_for_select_frames,
)
from scorevision.utils.miner_registry import get_miners_from_registry, Miner
from scorevision.utils.bittensor_helpers import get_subtensor, reset_subtensor
from scorevision.vlm_pipeline.non_vlm_scoring.smoothness import (
filter_low_quality_pseudo_gt_annotations,
)
from scorevision.utils.chutes_helpers import warmup_chute
from scorevision.utils.prometheus import (
RUNNER_BLOCK_HEIGHT,
RUNNER_RUNS_TOTAL,
RUNNER_WARMUP_TOTAL,
RUNNER_PGT_RETRY_TOTAL,
RUNNER_PGT_FRAMES,
RUNNER_MINER_CALLS_TOTAL,
RUNNER_MINER_LATENCY_MS,
RUNNER_EVALUATION_SCORE,
RUNNER_EVALUATION_FAIL_TOTAL,
RUNNER_SHARDS_EMITTED_TOTAL,
RUNNER_ACTIVE_MINERS,
RUNNER_LAST_RUN_DURATION_SECONDS,
RUNNER_LAST_PGT_DURATION_SECONDS,
RUNNER_MINER_LAST_DURATION_SECONDS,
)
from scorevision.utils.video_processing import FrameStore
# Module-level logger named after this module.
logger = getLogger(__name__)
# Global shutdown event for graceful shutdown.
# It is set by the signal handler installed in runner_loop(), and
# runner_loop() checks it at the top of every iteration to decide whether
# to keep running.
shutdown_event = asyncio.Event()
def _chute_id_for_miner(m: Miner) -> str | None:
    """Return the miner's ``chute_id``, falling back to its ``slug``.

    A missing or falsy ``chute_id`` falls through to ``slug``; when ``slug``
    is missing as well the result is ``None``.
    """
    chute_id = getattr(m, "chute_id", None)
    if chute_id:
        return chute_id
    return getattr(m, "slug", None)
async def _build_pgt_with_retries(
    chal_api: dict,
    *,
    required_n_frames: int,
    max_bbox_retries: int = 5,
    max_quality_retries: int = 5,
    video_cache: dict[str, Any] | None = None,
) -> tuple[SVChallenge, TVPredictInput, list]:
    """Build a challenge and its pseudo ground truth (PGT), retrying until it passes the gates.

    Every attempt calls ``prepare_challenge_payload`` again, builds an
    ``SVChallenge`` from the parts, generates pseudo-GT annotations and then
    filters them.  An attempt is accepted only if:

    1. at least ``required_n_frames`` frames were prepared;
    2. the raw annotations contain enough frames with enough bounding boxes
       (thresholds from ``SV_MIN_BBOXES_PER_FRAME``, default 6, and
       ``SV_MIN_BBOX_FRAMES_REQUIRED``, default ``required_n_frames``);
    3. after ``filter_low_quality_pseudo_gt_annotations`` at least
       ``required_n_frames`` frames still meet the per-frame bbox minimum.

    Up to ``max_quality_retries * max_bbox_retries`` attempts are made; the
    two levels differ only in what is logged and counted in metrics.

    Args:
        chal_api: Challenge description forwarded to the payload/challenge builders.
        required_n_frames: Minimum number of usable frames.
        max_bbox_retries: Attempts per quality round.
        max_quality_retries: Number of quality rounds.
        video_cache: Cache dict shared with ``prepare_challenge_payload``.  When
            omitted a private cache is created, and the file stored under its
            ``"path"`` key is removed before returning (on success and failure).

    Returns:
        ``(challenge, payload, filtered_annotations)`` of the first accepted attempt.

    Raises:
        RuntimeError: If no attempt passed all gates.  The last exception seen
            during the attempts (if any) is attached as ``__cause__``.
    """
    created_local_cache = video_cache is None
    if video_cache is None:
        video_cache = {}
    min_bboxes_per_frame = int(os.getenv("SV_MIN_BBOXES_PER_FRAME", "6"))
    min_frames_required = int(
        os.getenv("SV_MIN_BBOX_FRAMES_REQUIRED", str(required_n_frames))
    )
    last_err: Exception | None = None
    try:
        for quality_attempt in range(max_quality_retries):
            logger.info(
                f"[PGT] Starting quality attempt {quality_attempt+1}/{max_quality_retries}"
            )
            for bbox_attempt in range(max_bbox_retries):
                try:
                    payload, frame_numbers, frames, flows, _frame_store = (
                        await prepare_challenge_payload(
                            challenge=chal_api,
                            video_cache=video_cache,
                        )
                    )
                    # Gate 1: enough frames were extracted at all.
                    if len(frames) < required_n_frames:
                        logger.warning(
                            f"[PGT] Not enough frames ({len(frames)}/{required_n_frames}) "
                            f"bbox attempt {bbox_attempt+1}/{max_bbox_retries}"
                        )
                        RUNNER_PGT_RETRY_TOTAL.labels(
                            reason="insufficient_frames"
                        ).inc()
                        continue
                    challenge = build_svchallenge_from_parts(
                        chal_api=chal_api,
                        payload=payload,
                        frame_numbers=frame_numbers,
                        frames=frames,
                        flows=flows,
                    )
                    pseudo_gt_annotations = (
                        await generate_annotations_for_select_frames(
                            video_name=challenge.challenge_id,
                            frames=challenge.frames,
                            flow_frames=challenge.dense_optical_flow_frames,
                            frame_numbers=challenge.frame_numbers,
                        )
                    )
                    n_frames = len(pseudo_gt_annotations)
                    logger.info(
                        f"[PGT] {n_frames} pseudo-GT annotations generated "
                        f"(bbox attempt {bbox_attempt+1}/{max_bbox_retries})"
                    )
                    # Gate 2: raw annotations must be dense enough before filtering.
                    if not _enough_bboxes_per_frame(
                        pseudo_gt_annotations,
                        min_bboxes_per_frame=min_bboxes_per_frame,
                        min_frames_required=min_frames_required,
                    ):
                        logger.warning(
                            f"[PGT] Too few bboxes per frame. bbox retry "
                            f"{bbox_attempt+1}/{max_bbox_retries}"
                        )
                        RUNNER_PGT_RETRY_TOTAL.labels(reason="too_few_bboxes").inc()
                        continue
                    filtered = filter_low_quality_pseudo_gt_annotations(
                        annotations=pseudo_gt_annotations
                    )
                    logger.info(f"[PGT] {len(filtered)} filtered annotations kept")
                    # Gate 3: the filtered set is checked against required_n_frames,
                    # not against the env-configurable threshold used for gate 2.
                    if _enough_bboxes_per_frame(
                        filtered,
                        min_bboxes_per_frame=min_bboxes_per_frame,
                        min_frames_required=required_n_frames,
                    ):
                        RUNNER_PGT_FRAMES.set(len(filtered))
                        logger.info(
                            f"[PGT] Success: enough filtered frames "
                            f"(quality attempt {quality_attempt+1}/{max_quality_retries}, "
                            f"bbox attempt {bbox_attempt+1}/{max_bbox_retries})"
                        )
                        return challenge, payload, filtered
                    logger.warning(
                        f"[PGT] Not enough quality frames after filtering "
                        f"({len(filtered)}/{required_n_frames}), "
                        f"quality attempt {quality_attempt+1}/{max_quality_retries}, "
                        f"bbox attempt {bbox_attempt+1}/{max_bbox_retries}"
                    )
                    RUNNER_PGT_RETRY_TOTAL.labels(reason="too_few_filtered").inc()
                except Exception as e:
                    # Any failure in an attempt is treated as retryable; the last
                    # one is kept so the final error can report and chain it.
                    last_err = e
                    logger.warning(
                        f"[PGT] Exception during bbox attempt {bbox_attempt+1}/{max_bbox_retries}: {e}"
                    )
                    RUNNER_PGT_RETRY_TOTAL.labels(reason="exception").inc()
                    continue
            logger.warning(
                f"[PGT] Bbox phase failed after {max_bbox_retries} retries "
                f"→ new quality attempt ({quality_attempt+1}/{max_quality_retries})"
            )
            RUNNER_PGT_RETRY_TOTAL.labels(reason="bbox_phase_failed").inc()
        # Chain the last underlying failure so its traceback is not lost.
        raise RuntimeError(
            f"Failed to prepare high-quality PGT after {max_quality_retries} quality attempts "
            f"× {max_bbox_retries} bbox retries. Last error: {last_err}"
        ) from last_err
    finally:
        # Only clean up a cache this call created; a caller-supplied cache is
        # the caller's responsibility.
        if created_local_cache:
            _remove_cached_video(video_cache.get("path"))


def _remove_cached_video(cached_path: Any) -> None:
    """Best-effort removal of a cached video file; failures are only logged at debug level."""
    if not cached_path:
        return
    try:
        # Accept either a path-like object exposing ``unlink`` or a plain string path.
        target = cached_path if hasattr(cached_path, "unlink") else Path(cached_path)
        target.unlink(missing_ok=True)
    except Exception as e:
        logger.debug(f"Failed to remove cached video {cached_path}: {e}")
def _enough_bboxes_per_frame(
    pseudo_gt_annotations: list,
    *,
    min_bboxes_per_frame: int,
    min_frames_required: int,
) -> bool:
    """Tell whether enough frames carry at least a minimum number of bboxes.

    A frame qualifies when its ``annotation.bboxes`` has at least
    ``min_bboxes_per_frame`` entries; a missing or ``None`` ``bboxes`` counts
    as zero boxes.  Returns ``True`` when the number of qualifying frames is
    at least ``min_frames_required``.
    """
    qualifying_frames = sum(
        1
        for item in pseudo_gt_annotations
        if len(getattr(item.annotation, "bboxes", None) or [])
        >= min_bboxes_per_frame
    )
    return qualifying_frames >= min_frames_required
async def runner(slug: str | None = None) -> None:
    """Run one validation round: build a challenge, query every miner, score and emit.

    The round:

    1. loads the miners registered for the configured netuid;
    2. fetches a challenge, then rebuilds it together with quality-gated
       pseudo ground truth via ``_build_pgt_with_retries``;
    3. for each miner: calls its model, ranks the output with
       ``post_vlm_ranking`` and emits a shard with ``emit_shard``;
    4. always removes the cached video, records run metrics and closes the
       HTTP clients.

    A failure for one miner is logged and counted, and the remaining miners
    are still processed.  Exceptions raised inside the run are logged and
    reflected in the ``RUNNER_RUNS_TOTAL`` result label instead of being
    propagated.

    Args:
        slug: Currently unused by this function; kept so existing callers
            that pass it keep working.
    """
    settings = get_settings()
    loop = asyncio.get_running_loop()
    run_start = loop.time()
    last_pgt_duration = 0.0
    NETUID = settings.SCOREVISION_NETUID
    # NOTE(review): SV_MAX_MINERS_PER_RUN, the SV_WARMUP_* settings and a
    # separate SV_PGT_MAX_RETRIES local used to be parsed here but were never
    # applied; the dead reads were dropped so a malformed value in an unused
    # setting can no longer abort the run.
    REQUIRED_PGT_FRAMES = int(getattr(settings, "SCOREVISION_VLM_SELECT_N_FRAMES", 3))
    MAX_PGT_BBOX_RETRIES = int(
        os.getenv("SV_PGT_MAX_BBOX_RETRIES", os.getenv("SV_PGT_MAX_RETRIES", "3"))
    )
    MAX_PGT_QUALITY_RETRIES = int(os.getenv("SV_PGT_MAX_QUALITY_RETRIES", "4"))
    video_cache: dict[str, Any] = {}
    frame_store: FrameStore | None = None
    run_result = "success"
    try:
        miners = await get_miners_from_registry(NETUID)
        if not miners:
            logger.warning("No eligible miners found on-chain.")
            RUNNER_ACTIVE_MINERS.set(0)
            run_result = "no_miners"
            return
        challenge, payload, chal_api, frame_store = (
            await get_challenge_from_scorevision_with_source(video_cache=video_cache)
        )
        miner_list = list(miners.values())
        RUNNER_ACTIVE_MINERS.set(len(miner_list))
        pgt_build_start = loop.time()
        try:
            # Replaces the initial challenge/payload with the quality-gated ones.
            challenge, payload, pseudo_gt_annotations = await _build_pgt_with_retries(
                chal_api=chal_api,
                required_n_frames=REQUIRED_PGT_FRAMES,
                max_bbox_retries=MAX_PGT_BBOX_RETRIES,
                max_quality_retries=MAX_PGT_QUALITY_RETRIES,
                video_cache=video_cache,
            )
            last_pgt_duration = loop.time() - pgt_build_start
            RUNNER_LAST_PGT_DURATION_SECONDS.set(last_pgt_duration)
        except Exception as e:
            logger.warning(
                f"PGT quality gating failed after retries, skipping challenge: {e}"
            )
            last_pgt_duration = loop.time() - pgt_build_start
            RUNNER_LAST_PGT_DURATION_SECONDS.set(last_pgt_duration)
            run_result = "pgt_failed"
            return
        for m in miner_list:
            miner_label = getattr(m, "slug", None) or str(getattr(m, "uid", "?"))
            miner_output: TVPredictInput | None = None
            miner_total_start = loop.time()
            try:
                start = loop.time()
                miner_output = await call_miner_model_on_chutes(
                    slug=m.slug,
                    chute_id=m.chute_id,
                    payload=payload,
                )
                latency_ms = (loop.time() - start) * 1000.0
                RUNNER_MINER_LATENCY_MS.labels(miner=miner_label).set(latency_ms)
                RUNNER_MINER_CALLS_TOTAL.labels(outcome="success").inc()
                try:
                    evaluation = post_vlm_ranking(
                        payload=payload,
                        miner_run=miner_output,
                        challenge=challenge,
                        pseudo_gt_annotations=pseudo_gt_annotations,
                        frame_store=frame_store,
                    )
                except Exception:
                    RUNNER_EVALUATION_FAIL_TOTAL.labels(stage="ranking").inc()
                    raise
                logger.info(f"Evaluation: {evaluation}")
                if getattr(evaluation, "score", None) is not None:
                    RUNNER_EVALUATION_SCORE.labels(miner=miner_label).set(
                        getattr(evaluation, "score", 0.0)
                    )
                emit_start = loop.time()
                try:
                    await emit_shard(
                        slug=m.slug,
                        challenge=challenge,
                        miner_run=miner_output,
                        evaluation=evaluation,
                        miner_hotkey_ss58=m.hotkey,
                    )
                except Exception:
                    dt_emit = (loop.time() - emit_start) * 1000.0
                    logger.exception(
                        "[emit] FAILED for %s in %.1fms", miner_label, dt_emit
                    )
                    # Count the failure right here.  The previous version set a
                    # flag that an inner ``finally`` cleared before the outer
                    # handler could read it, so this metric was never updated.
                    RUNNER_SHARDS_EMITTED_TOTAL.labels(status="error").inc()
                    raise
                else:
                    dt_emit = (loop.time() - emit_start) * 1000.0
                    logger.info("[emit] success for %s in %.1fms", miner_label, dt_emit)
                    RUNNER_SHARDS_EMITTED_TOTAL.labels(status="success").inc()
            except Exception as e:
                # One miner failing must not abort the round for the others.
                logger.warning(
                    "Miner uid=%s slug=%s failed: %s",
                    getattr(m, "uid", "?"),
                    getattr(m, "slug", "?"),
                    e,
                )
                # No output means the model call itself failed.
                if miner_output is None:
                    RUNNER_MINER_CALLS_TOTAL.labels(outcome="exception").inc()
            finally:
                duration = loop.time() - miner_total_start
                RUNNER_MINER_LAST_DURATION_SECONDS.labels(miner=miner_label).set(
                    duration
                )
    except Exception as e:
        # Keep the traceback; the message alone is rarely enough to debug a run.
        logger.error(e, exc_info=True)
        run_result = "error"
    finally:
        run_duration = loop.time() - run_start
        RUNNER_LAST_RUN_DURATION_SECONDS.set(run_duration)
        # Prefer the store object (cached or returned with the challenge);
        # fall back to deleting the raw cached path.
        store_obj = video_cache.get("store") or frame_store
        if store_obj:
            try:
                store_obj.unlink()
            except Exception as err:
                logger.debug(
                    f"Failed to remove cached video {getattr(store_obj, 'video_path', '?')}: {err}"
                )
        elif video_cache.get("path"):
            cached_path = Path(video_cache["path"])
            try:
                cached_path.unlink(missing_ok=True)
            except Exception as err:
                logger.debug(f"Failed to remove cached video {cached_path}: {err}")
        video_cache.clear()
        RUNNER_RUNS_TOTAL.labels(result=run_result).inc()
        # NOTE(review): called synchronously; if close_http_clients is a
        # coroutine function this needs an ``await`` — confirm against its definition.
        close_http_clients()
        gc.collect()
async def runner_loop():
    """Run `runner()` every 300 blocks, with stall fallbacks and graceful shutdown.

    The loop polls the current block from the subtensor client and triggers
    ``runner()``:

    * on the first successfully read block;
    * whenever at least ``TEMPO`` blocks passed since the last trigger;
    * as a fallback when no block progress, or no trigger, happened for
      ``STALL_SECS_FALLBACK`` seconds.

    Between triggers it waits for the next block.  Timeouts and connection
    errors reset the subtensor client, which is re-created on the next
    iteration.  SIGTERM/SIGINT set ``shutdown_event``, which ends the loop at
    the next iteration boundary; task cancellation also ends the loop.

    Timeouts and delays are configurable through
    ``SUBTENSOR_GET_BLOCK_TIMEOUT_S``, ``SUBTENSOR_WAIT_BLOCK_TIMEOUT_S`` and
    ``SUBTENSOR_RECONNECT_DELAY_S``.
    """
    # NOTE(review): the returned settings are not used here; the call is kept
    # because it may have load-time side effects — confirm and drop if not.
    get_settings()
    TEMPO = 300
    STALL_SECS_FALLBACK = 5400
    GET_BLOCK_TIMEOUT = float(os.getenv("SUBTENSOR_GET_BLOCK_TIMEOUT_S", "15.0"))
    WAIT_BLOCK_TIMEOUT = float(os.getenv("SUBTENSOR_WAIT_BLOCK_TIMEOUT_S", "15.0"))
    RECONNECT_DELAY_S = float(os.getenv("SUBTENSOR_RECONNECT_DELAY_S", "5.0"))
    loop = asyncio.get_running_loop()

    def signal_handler() -> None:
        logger.warning("Received shutdown signal, stopping runner...")
        shutdown_event.set()

    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            # Register through the event loop: unlike signal.signal() handlers,
            # these callbacks are allowed to interact with the loop, so setting
            # the asyncio.Event reliably wakes anything waiting on it.
            loop.add_signal_handler(sig, signal_handler)
        except NotImplementedError:
            # Event loops without signal support: fall back to the plain handler.
            signal.signal(sig, lambda s, f: signal_handler())

    st = None
    last_trigger_block = None
    last_seen_block = None
    last_progress_time = loop.time()
    last_trigger_time = loop.time()
    logger.warning("[RunnerLoop] starting, TEMPO=%s blocks", TEMPO)
    while not shutdown_event.is_set():
        try:
            if st is None:
                logger.warning("[RunnerLoop] (re)connecting subtensor…")
                try:
                    st = await get_subtensor()
                except Exception as e:
                    logger.warning(
                        "[RunnerLoop] subtensor connect failed: %s → retrying in %.1fs",
                        e, RECONNECT_DELAY_S
                    )
                    reset_subtensor()
                    st = None
                    await asyncio.sleep(RECONNECT_DELAY_S)
                    continue
            try:
                block = await asyncio.wait_for(
                    st.get_current_block(), timeout=GET_BLOCK_TIMEOUT
                )
            except asyncio.TimeoutError:
                logger.warning(
                    "[RunnerLoop] get_current_block() timed out after %.1fs → resetting subtensor",
                    GET_BLOCK_TIMEOUT,
                )
                reset_subtensor()
                st = None
                await asyncio.sleep(2.0)
                continue
            except (KeyError, ConnectionError, RuntimeError) as err:
                logger.warning(
                    "[RunnerLoop] get_current_block error (%s) → resetting subtensor",
                    err,
                )
                reset_subtensor()
                st = None
                await asyncio.sleep(2.0)
                continue
            RUNNER_BLOCK_HEIGHT.set(block)
            now = loop.time()
            # Track when the chain last advanced, for the stall fallback below.
            if last_seen_block is None or block > last_seen_block:
                last_seen_block = block
                last_progress_time = now
            should_trigger = False
            if last_trigger_block is None:
                should_trigger = True
                logger.warning("[RunnerLoop] first trigger at block %s", block)
            else:
                if block - last_trigger_block >= TEMPO:
                    should_trigger = True
                if (now - last_progress_time) >= STALL_SECS_FALLBACK:
                    logger.warning(
                        "[RunnerLoop] no block progress for %.0fs → fallback trigger",
                        now - last_progress_time,
                    )
                    should_trigger = True
                    # Restart the stall timer so the fallback does not fire on
                    # every subsequent iteration.
                    last_progress_time = now
                if (now - last_trigger_time) >= STALL_SECS_FALLBACK:
                    logger.warning(
                        "[RunnerLoop] no run triggered for %.0fs (wall clock) → fallback trigger",
                        now - last_trigger_time,
                    )
                    should_trigger = True
            if should_trigger:
                logger.warning(
                    "[RunnerLoop] Triggering runner at block %s (last_trigger_block=%s)",
                    block,
                    last_trigger_block,
                )
                await runner()
                gc.collect()
                last_trigger_block = block
                last_trigger_time = loop.time()
            else:
                try:
                    await asyncio.wait_for(
                        st.wait_for_block(), timeout=WAIT_BLOCK_TIMEOUT
                    )
                except asyncio.TimeoutError:
                    # No new block within the timeout: re-poll the chain.
                    continue
                except (KeyError, ConnectionError, RuntimeError) as err:
                    logger.warning(
                        "[RunnerLoop] wait_for_block error (%s); resetting subtensor",
                        err,
                    )
                    reset_subtensor()
                    st = None
                    await asyncio.sleep(2.0)
                    continue
        except asyncio.CancelledError:
            # Cancellation is treated as a shutdown request.
            break
        except Exception as e:
            logger.warning(
                "[RunnerLoop] Error: %s; resetting subtensor and retrying…", e
            )
            reset_subtensor()
            st = None
            # Back off for up to two minutes, waking immediately on shutdown.
            try:
                await asyncio.wait_for(shutdown_event.wait(), timeout=120.0)
            except asyncio.TimeoutError:
                pass
    logger.warning("Runner loop shutting down gracefully...")