import asyncio from logging import getLogger from time import time from typing import Any from hashlib import sha256 from json import dumps from random import randint from pathlib import Path from aiohttp import ClientResponseError from numpy import ndarray from scorevision.utils.settings import get_settings from scorevision.utils.bittensor_helpers import load_hotkey_keypair from scorevision.utils.signing import build_validator_query_params from scorevision.utils.data_models import SVChallenge from scorevision.utils.async_clients import get_async_client from scorevision.utils.video_processing import download_video_cached, FrameStore from scorevision.utils.image_processing import image_to_base64, pil_from_array from scorevision.chute_template.schemas import SVFrame from scorevision.chute_template.schemas import TVPredictInput from scorevision.vlm_pipeline.domain_specific_schemas.challenge_types import ( parse_challenge_type, ChallengeType, ) logger = getLogger(__name__) class ScoreVisionChallengeError(Exception): pass async def get_challenge_from_scorevision() -> tuple[SVChallenge, TVPredictInput]: try: chal_api = await get_next_challenge() except ClientResponseError as e: raise ScoreVisionChallengeError(f"HTTP error while fetching challenge: {e}") except ScoreVisionChallengeError as e: raise e except Exception as e: raise Exception(f"Unexpected error while fetching challenge: {e}") payload, frame_numbers, frames, flows, frame_store = ( await prepare_challenge_payload(challenge=chal_api) ) if not payload: if frame_store: frame_store.unlink() raise ScoreVisionChallengeError("Failed to prepare payload from challenge.") # SVChallenge prompt = f"ScoreVision video task {chal_api.get('task_id')}" meta = payload.meta | {"seed": chal_api.get("seed", 0)} canonical = { "env": "SVEnv", "prompt": prompt, "extra": {"meta": meta, "n_frames": len(frames)}, } cid = sha256( dumps(canonical, sort_keys=True, separators=(",", ":")).encode() ).hexdigest() challenge = SVChallenge( env="SVEnv", payload=payload, meta=meta, prompt=prompt, challenge_id=cid, frame_numbers=frame_numbers, frames=frames, dense_optical_flow_frames=flows, ) if frame_store: frame_store.unlink() return challenge, payload async def prepare_challenge_payload( challenge: dict, batch_size: int = 64, *, video_cache: dict[str, Any] | None = None, ) -> tuple[TVPredictInput, list[int], list[ndarray], list[ndarray], FrameStore]: settings = get_settings() video_url = challenge.get("video_url") or challenge.get("asset_url") if not video_url: raise ScoreVisionChallengeError("Challenge missing video_url/asset_url") frame_numbers = list( range( settings.SCOREVISION_VIDEO_MIN_FRAME_NUMBER, settings.SCOREVISION_VIDEO_MAX_FRAME_NUMBER, ) ) # shuffle(frame_numbers) start_frame_number = randint( 1, settings.SCOREVISION_VIDEO_MAX_FRAME_NUMBER - settings.SCOREVISION_VLM_SELECT_N_FRAMES - 1, ) selected_frame_numbers = frame_numbers[ start_frame_number : start_frame_number + settings.SCOREVISION_VLM_SELECT_N_FRAMES ] logger.info(f"Selected Frames for Testing: {selected_frame_numbers}") cached_store: FrameStore | None = None cached_path: Path | None = None if video_cache is not None: cached_store = video_cache.get("store") cached_path = video_cache.get("path") if cached_store is None: video_name, frame_store = await download_video_cached( url=video_url, _frame_numbers=selected_frame_numbers, cached_path=cached_path, ) if video_cache is not None: video_cache["store"] = frame_store video_cache["path"] = frame_store.video_path else: frame_store = cached_store select_frames: list[ndarray] = [] flow_frames: list[ndarray] = [] for fn in selected_frame_numbers: frame = await asyncio.to_thread(frame_store.get_frame, fn) select_frames.append(frame) flow = await asyncio.to_thread(frame_store.get_flow, fn) flow_frames.append(flow) logger.info(f"frames {selected_frame_numbers} successful") if not select_frames: raise ScoreVisionChallengeError( "No Frames were successfully extracted from Video" ) if not flow_frames: raise ScoreVisionChallengeError( "No Dense Optical Flows were successfully computed from Video" ) height, width = select_frames[0].shape[:2] meta = { "version": 1, "width": width or 0, "height": height or 0, "fps": int( challenge.get("fps") or settings.SCOREVISION_VIDEO_FRAMES_PER_SECOND ), "task_id": challenge.get("task_id"), "challenge_type": challenge.get("challenge_type"), } if "seed" in challenge: meta["seed"] = challenge["seed"] meta["batch_size"] = batch_size meta["n_keypoints"] = ( 32 # TODO: update n_keypoints based on challenge type (32 is for football) ) payload = TVPredictInput(url=video_url, meta=meta) return ( payload, selected_frame_numbers, select_frames, flow_frames, frame_store, ) async def get_next_challenge() -> dict: """ Fetches the next video challenge from ScoreVision API. Returns a dict like: { "task_id": "...", # we will propagate this end-to-end "video_url": "...", # or "asset_url" "fps": 25|30, # optional (fallback 30) "seed": , # optional ... } """ settings = get_settings() if not settings.SCOREVISION_API: raise ScoreVisionChallengeError("SCOREVISION_API is not set.") keypair = load_hotkey_keypair( wallet_name=settings.BITTENSOR_WALLET_COLD, hotkey_name=settings.BITTENSOR_WALLET_HOT, ) # Build query parameters required to authenticate with the validator API params = build_validator_query_params(keypair) session = await get_async_client() async with session.get( f"{settings.SCOREVISION_API}/api/tasks/next/v2", params=params ) as response: response.raise_for_status() challenge = await response.json() or None if not challenge: raise ScoreVisionChallengeError("No challenge available from API") if "id" in challenge and "task_id" not in challenge: challenge["task_id"] = challenge.pop("id") if not (challenge.get("video_url") or challenge.get("asset_url")): raise ScoreVisionChallengeError("Challenge missing video url.") ct = ( parse_challenge_type(challenge.get("challenge_type")) or ChallengeType.FOOTBALL ) challenge["challenge_type"] = ct.value logger.info(f"Fetched challenge: task_id={challenge.get('task_id')}") return challenge def build_svchallenge_from_parts( chal_api: dict, payload: TVPredictInput, frame_numbers: list[int], frames: list[ndarray], flows: list[ndarray], ) -> SVChallenge: prompt = f"ScoreVision video task {chal_api.get('task_id')}" meta = payload.meta | {"seed": chal_api.get("seed", 0)} canonical = { "env": "SVEnv", "prompt": prompt, "extra": {"meta": meta, "n_frames": len(frames)}, } cid = sha256( dumps(canonical, sort_keys=True, separators=(",", ":")).encode() ).hexdigest() ct = parse_challenge_type(chal_api.get("challenge_type")) return SVChallenge( env="SVEnv", payload=payload, meta=meta, prompt=prompt, challenge_id=cid, frame_numbers=frame_numbers, frames=frames, dense_optical_flow_frames=flows, api_task_id=chal_api.get("task_id"), challenge_type=ct, ) async def get_challenge_from_scorevision_with_source( *, video_cache: dict[str, Any] | None = None, ) -> tuple[SVChallenge, TVPredictInput, dict, FrameStore]: try: chal_api = await get_next_challenge() except ClientResponseError as e: raise ScoreVisionChallengeError(f"HTTP error while fetching challenge: {e}") except ScoreVisionChallengeError as e: raise e except Exception as e: raise Exception(f"Unexpected error while fetching challenge: {e}") payload, frame_numbers, frames, flows, frame_store = ( await prepare_challenge_payload( challenge=chal_api, video_cache=video_cache, ) ) if not payload: raise ScoreVisionChallengeError("Failed to prepare payload from challenge.") challenge = build_svchallenge_from_parts( chal_api=chal_api, payload=payload, frame_numbers=frame_numbers, frames=frames, flows=flows, ) return challenge, payload, chal_api, frame_store