|
|
import asyncio |
|
|
from logging import getLogger |
|
|
from time import time |
|
|
from typing import Any |
|
|
from hashlib import sha256 |
|
|
from json import dumps |
|
|
from random import randint |
|
|
from pathlib import Path |
|
|
|
|
|
from aiohttp import ClientResponseError |
|
|
from numpy import ndarray |
|
|
|
|
|
from scorevision.utils.settings import get_settings |
|
|
from scorevision.utils.bittensor_helpers import load_hotkey_keypair |
|
|
from scorevision.utils.signing import build_validator_query_params |
|
|
from scorevision.utils.data_models import SVChallenge |
|
|
from scorevision.utils.async_clients import get_async_client |
|
|
from scorevision.utils.video_processing import download_video_cached, FrameStore |
|
|
from scorevision.utils.image_processing import image_to_base64, pil_from_array |
|
|
from scorevision.chute_template.schemas import SVFrame |
|
|
from scorevision.chute_template.schemas import TVPredictInput |
|
|
from scorevision.vlm_pipeline.domain_specific_schemas.challenge_types import ( |
|
|
parse_challenge_type, |
|
|
ChallengeType, |
|
|
) |
|
|
|
|
|
logger = getLogger(__name__) |
|
|
|
|
|
|
|
|
class ScoreVisionChallengeError(Exception):
    """Raised when a ScoreVision challenge cannot be fetched or prepared."""
|
|
|
|
|
|
|
|
async def get_challenge_from_scorevision() -> tuple[SVChallenge, TVPredictInput]:
    """Fetch the next challenge from the API and build an ``SVChallenge``.

    The frame store returned by ``prepare_challenge_payload`` is unlinked
    before this function exits, whether it returns or raises after the
    payload has been prepared.

    Returns:
        A ``(challenge, payload)`` tuple: the assembled ``SVChallenge`` and
        the ``TVPredictInput`` it was built from.

    Raises:
        ScoreVisionChallengeError: If the API request fails with an HTTP
            error, ``get_next_challenge`` rejects the challenge, or no
            payload could be prepared.
        Exception: For any other failure while fetching the challenge; the
            original error is attached as ``__cause__``.
    """
    try:
        chal_api = await get_next_challenge()
    except ClientResponseError as e:
        raise ScoreVisionChallengeError(
            f"HTTP error while fetching challenge: {e}"
        ) from e
    except ScoreVisionChallengeError:
        # Already the domain error type; propagate with its traceback intact.
        raise
    except Exception as e:
        raise Exception(f"Unexpected error while fetching challenge: {e}") from e

    payload, frame_numbers, frames, flows, frame_store = (
        await prepare_challenge_payload(challenge=chal_api)
    )

    try:
        if not payload:
            raise ScoreVisionChallengeError(
                "Failed to prepare payload from challenge."
            )

        prompt = f"ScoreVision video task {chal_api.get('task_id')}"
        # The API seed (default 0) overrides any seed already in the payload meta.
        meta = payload.meta | {"seed": chal_api.get("seed", 0)}
        canonical = {
            "env": "SVEnv",
            "prompt": prompt,
            "extra": {"meta": meta, "n_frames": len(frames)},
        }

        # Challenge id: SHA-256 of the canonical (sorted, compact) JSON encoding.
        cid = sha256(
            dumps(canonical, sort_keys=True, separators=(",", ":")).encode()
        ).hexdigest()
        challenge = SVChallenge(
            env="SVEnv",
            payload=payload,
            meta=meta,
            prompt=prompt,
            challenge_id=cid,
            frame_numbers=frame_numbers,
            frames=frames,
            dense_optical_flow_frames=flows,
        )
    finally:
        # Frames are already held in memory, so the store is no longer needed.
        # Doing this in ``finally`` also covers a failure while building the
        # challenge, which previously skipped the cleanup.
        if frame_store:
            frame_store.unlink()

    return challenge, payload
|
|
|
|
|
|
|
|
async def prepare_challenge_payload(
    challenge: dict,
    batch_size: int = 64,
    *,
    video_cache: dict[str, Any] | None = None,
) -> tuple[TVPredictInput, list[int], list[ndarray], list[ndarray], FrameStore]:
    """Select a random window of frames from the challenge video and load them.

    Args:
        challenge: Challenge dict from the API. ``video_url`` (or
            ``asset_url``) is required; ``fps``, ``task_id``,
            ``challenge_type`` and ``seed`` are copied into the payload meta
            when present.
        batch_size: Stored in the payload meta under ``"batch_size"``.
        video_cache: Optional mutable dict used to reuse a frame store between
            calls. The ``"store"`` and ``"path"`` keys are read; when no store
            is cached, the newly created store and its ``video_path`` are
            written back.

    Returns:
        ``(payload, selected_frame_numbers, frames, flows, frame_store)`` where
        ``frames`` and ``flows`` hold one entry per selected frame number.

    Raises:
        ScoreVisionChallengeError: If the challenge has no video URL, the
            configured frame range is too small for the requested window, or
            no frames / flows could be loaded.
    """
    settings = get_settings()

    video_url = challenge.get("video_url") or challenge.get("asset_url")
    if not video_url:
        raise ScoreVisionChallengeError("Challenge missing video_url/asset_url")

    frame_numbers = list(
        range(
            settings.SCOREVISION_VIDEO_MIN_FRAME_NUMBER,
            settings.SCOREVISION_VIDEO_MAX_FRAME_NUMBER,
        )
    )

    n_select = settings.SCOREVISION_VLM_SELECT_N_FRAMES
    # The random value is an *index* into ``frame_numbers``, so it must be
    # bounded by the list length as well as by the configured maximum.
    # Otherwise a non-zero minimum frame number yields a short or empty window.
    last_start_index = min(
        settings.SCOREVISION_VIDEO_MAX_FRAME_NUMBER - n_select - 1,
        len(frame_numbers) - n_select,
    )
    if last_start_index < 0:
        raise ScoreVisionChallengeError(
            f"Configured frame range holds {len(frame_numbers)} frames; "
            f"cannot select {n_select} consecutive frames"
        )
    # Keep the historical lower bound of 1 whenever the range allows it.
    start_index = randint(min(1, last_start_index), last_start_index)
    selected_frame_numbers = frame_numbers[start_index : start_index + n_select]
    logger.info(f"Selected Frames for Testing: {selected_frame_numbers}")

    cached_store: FrameStore | None = None
    cached_path: Path | None = None
    if video_cache is not None:
        cached_store = video_cache.get("store")
        cached_path = video_cache.get("path")

    if cached_store is None:
        _video_name, frame_store = await download_video_cached(
            url=video_url,
            _frame_numbers=selected_frame_numbers,
            cached_path=cached_path,
        )
        if video_cache is not None:
            video_cache["store"] = frame_store
            video_cache["path"] = frame_store.video_path
    else:
        # NOTE(review): the cached store was created for an earlier frame
        # selection; presumably it can serve arbitrary frame numbers - confirm.
        frame_store = cached_store

    select_frames: list[ndarray] = []
    flow_frames: list[ndarray] = []
    for fn in selected_frame_numbers:
        # Store access runs in worker threads to keep the event loop responsive.
        frame = await asyncio.to_thread(frame_store.get_frame, fn)
        select_frames.append(frame)
        flow = await asyncio.to_thread(frame_store.get_flow, fn)
        flow_frames.append(flow)

    logger.info(f"frames {selected_frame_numbers} successful")
    if not select_frames:
        raise ScoreVisionChallengeError(
            "No Frames were successfully extracted from Video"
        )
    if not flow_frames:
        raise ScoreVisionChallengeError(
            "No Dense Optical Flows were successfully computed from Video"
        )

    height, width = select_frames[0].shape[:2]
    meta = {
        "version": 1,
        "width": width or 0,
        "height": height or 0,
        "fps": int(
            challenge.get("fps") or settings.SCOREVISION_VIDEO_FRAMES_PER_SECOND
        ),
        "task_id": challenge.get("task_id"),
        "challenge_type": challenge.get("challenge_type"),
    }
    if "seed" in challenge:
        meta["seed"] = challenge["seed"]
    meta["batch_size"] = batch_size
    meta["n_keypoints"] = 32
    payload = TVPredictInput(url=video_url, meta=meta)
    return (
        payload,
        selected_frame_numbers,
        select_frames,
        flow_frames,
        frame_store,
    )
|
|
|
|
|
|
|
|
async def get_next_challenge() -> dict:
    """Fetch the next video challenge from the ScoreVision API.

    The request goes to ``{SCOREVISION_API}/api/tasks/next/v2`` with query
    parameters built from the configured hotkey keypair.

    Returns:
        The challenge dict, normalised as follows::

            {
                "task_id": "...",        # taken from "id" when "task_id" is absent
                "video_url": "...",      # or "asset_url"; one of them is required
                "fps": 25|30,            # optional, passed through unchanged
                "seed": <int>,           # optional, passed through unchanged
                "challenge_type": "...", # ChallengeType value, FOOTBALL by default
                ...
            }

    Raises:
        ScoreVisionChallengeError: If ``SCOREVISION_API`` is not configured,
            the API returns an empty or non-object body, or the challenge has
            no video URL.
        aiohttp.ClientResponseError: If the API responds with an error status.
    """
    settings = get_settings()

    if not settings.SCOREVISION_API:
        raise ScoreVisionChallengeError("SCOREVISION_API is not set.")

    keypair = load_hotkey_keypair(
        wallet_name=settings.BITTENSOR_WALLET_COLD,
        hotkey_name=settings.BITTENSOR_WALLET_HOT,
    )
    params = build_validator_query_params(keypair)

    session = await get_async_client()
    async with session.get(
        f"{settings.SCOREVISION_API}/api/tasks/next/v2", params=params
    ) as response:
        response.raise_for_status()
        challenge = await response.json() or None

    if not challenge:
        raise ScoreVisionChallengeError("No challenge available from API")
    if not isinstance(challenge, dict):
        # A JSON list/string would otherwise fail below with an opaque
        # AttributeError/TypeError instead of a domain error.
        raise ScoreVisionChallengeError(
            f"Unexpected challenge payload type: {type(challenge).__name__}"
        )

    if "id" in challenge and "task_id" not in challenge:
        challenge["task_id"] = challenge.pop("id")

    if not (challenge.get("video_url") or challenge.get("asset_url")):
        raise ScoreVisionChallengeError("Challenge missing video url.")

    # Fall back to FOOTBALL when parsing yields a falsy result.
    ct = (
        parse_challenge_type(challenge.get("challenge_type"))
        or ChallengeType.FOOTBALL
    )
    challenge["challenge_type"] = ct.value

    logger.info(f"Fetched challenge: task_id={challenge.get('task_id')}")
    return challenge
|
|
|
|
|
|
|
|
def build_svchallenge_from_parts(
    chal_api: dict,
    payload: TVPredictInput,
    frame_numbers: list[int],
    frames: list[ndarray],
    flows: list[ndarray],
) -> SVChallenge:
    """Assemble an ``SVChallenge`` from an API challenge dict and prepared data.

    The challenge id is the SHA-256 hex digest of a canonical JSON document
    (sorted keys, compact separators) holding the environment name, the
    prompt, the merged metadata and the number of frames.

    Args:
        chal_api: Challenge dict; ``task_id``, ``seed`` and ``challenge_type``
            are read from it.
        payload: Prepared prediction input whose ``meta`` is merged with the
            challenge seed (``0`` when the challenge has none).
        frame_numbers: Frame numbers stored on the challenge.
        frames: Frames stored on the challenge; their count feeds the id.
        flows: Stored as the challenge's ``dense_optical_flow_frames``.

    Returns:
        The populated ``SVChallenge``.
    """
    task_id = chal_api.get("task_id")
    task_prompt = f"ScoreVision video task {task_id}"
    merged_meta = payload.meta | {"seed": chal_api.get("seed", 0)}

    # Canonical form: sorted keys and compact separators keep the digest stable.
    canonical_json = dumps(
        {
            "env": "SVEnv",
            "prompt": task_prompt,
            "extra": {"meta": merged_meta, "n_frames": len(frames)},
        },
        sort_keys=True,
        separators=(",", ":"),
    )
    digest = sha256(canonical_json.encode()).hexdigest()

    return SVChallenge(
        env="SVEnv",
        payload=payload,
        meta=merged_meta,
        prompt=task_prompt,
        challenge_id=digest,
        frame_numbers=frame_numbers,
        frames=frames,
        dense_optical_flow_frames=flows,
        api_task_id=task_id,
        challenge_type=parse_challenge_type(chal_api.get("challenge_type")),
    )
|
|
|
|
|
|
|
|
async def get_challenge_from_scorevision_with_source(
    *,
    video_cache: dict[str, Any] | None = None,
) -> tuple[SVChallenge, TVPredictInput, dict, FrameStore]:
    """Fetch the next challenge and return it with its source data.

    Unlike ``get_challenge_from_scorevision``, the frame store is not unlinked
    here; it is handed back to the caller.

    Args:
        video_cache: Optional cache dict forwarded to
            ``prepare_challenge_payload``.

    Returns:
        ``(challenge, payload, chal_api, frame_store)``: the assembled
        ``SVChallenge``, the prepared payload, the challenge dict returned by
        ``get_next_challenge`` and the frame store used to load the frames.

    Raises:
        ScoreVisionChallengeError: If the API request fails with an HTTP
            error, ``get_next_challenge`` rejects the challenge, or no
            payload could be prepared.
        Exception: For any other failure while fetching the challenge; the
            original error is attached as ``__cause__``.
    """
    try:
        chal_api = await get_next_challenge()
    except ClientResponseError as e:
        raise ScoreVisionChallengeError(
            f"HTTP error while fetching challenge: {e}"
        ) from e
    except ScoreVisionChallengeError:
        # Already the domain error type; propagate with its traceback intact.
        raise
    except Exception as e:
        raise Exception(f"Unexpected error while fetching challenge: {e}") from e

    payload, frame_numbers, frames, flows, frame_store = (
        await prepare_challenge_payload(
            challenge=chal_api,
            video_cache=video_cache,
        )
    )
    if not payload:
        raise ScoreVisionChallengeError("Failed to prepare payload from challenge.")

    challenge = build_svchallenge_from_parts(
        chal_api=chal_api,
        payload=payload,
        frame_numbers=frame_numbers,
        frames=frames,
        flows=flows,
    )
    return challenge, payload, chal_api, frame_store
|
|
|