File size: 9,206 Bytes
f60a6c1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
import asyncio
from logging import getLogger
from time import time
from typing import Any
from hashlib import sha256
from json import dumps
from random import randint
from pathlib import Path

from aiohttp import ClientResponseError
from numpy import ndarray

from scorevision.utils.settings import get_settings
from scorevision.utils.bittensor_helpers import load_hotkey_keypair
from scorevision.utils.signing import build_validator_query_params
from scorevision.utils.data_models import SVChallenge
from scorevision.utils.async_clients import get_async_client
from scorevision.utils.video_processing import download_video_cached, FrameStore
from scorevision.utils.image_processing import image_to_base64, pil_from_array
from scorevision.chute_template.schemas import SVFrame
from scorevision.chute_template.schemas import TVPredictInput
from scorevision.vlm_pipeline.domain_specific_schemas.challenge_types import (
    parse_challenge_type,
    ChallengeType,
)

logger = getLogger(__name__)


class ScoreVisionChallengeError(Exception):
    pass


async def get_challenge_from_scorevision() -> tuple[SVChallenge, TVPredictInput]:
    try:
        chal_api = await get_next_challenge()
    except ClientResponseError as e:
        raise ScoreVisionChallengeError(f"HTTP error while fetching challenge: {e}")
    except ScoreVisionChallengeError as e:
        raise e
    except Exception as e:
        raise Exception(f"Unexpected error while fetching challenge: {e}")

    payload, frame_numbers, frames, flows, frame_store = (
        await prepare_challenge_payload(challenge=chal_api)
    )
    if not payload:
        if frame_store:
            frame_store.unlink()
        raise ScoreVisionChallengeError("Failed to prepare payload from challenge.")

    # SVChallenge
    prompt = f"ScoreVision video task {chal_api.get('task_id')}"
    meta = payload.meta | {"seed": chal_api.get("seed", 0)}
    canonical = {
        "env": "SVEnv",
        "prompt": prompt,
        "extra": {"meta": meta, "n_frames": len(frames)},
    }

    cid = sha256(
        dumps(canonical, sort_keys=True, separators=(",", ":")).encode()
    ).hexdigest()
    challenge = SVChallenge(
        env="SVEnv",
        payload=payload,
        meta=meta,
        prompt=prompt,
        challenge_id=cid,
        frame_numbers=frame_numbers,
        frames=frames,
        dense_optical_flow_frames=flows,
    )
    if frame_store:
        frame_store.unlink()
    return challenge, payload


async def prepare_challenge_payload(
    challenge: dict,
    batch_size: int = 64,
    *,
    video_cache: dict[str, Any] | None = None,
) -> tuple[TVPredictInput, list[int], list[ndarray], list[ndarray], FrameStore]:
    settings = get_settings()

    video_url = challenge.get("video_url") or challenge.get("asset_url")
    if not video_url:
        raise ScoreVisionChallengeError("Challenge missing video_url/asset_url")

    frame_numbers = list(
        range(
            settings.SCOREVISION_VIDEO_MIN_FRAME_NUMBER,
            settings.SCOREVISION_VIDEO_MAX_FRAME_NUMBER,
        )
    )
    # shuffle(frame_numbers)
    start_frame_number = randint(
        1,
        settings.SCOREVISION_VIDEO_MAX_FRAME_NUMBER
        - settings.SCOREVISION_VLM_SELECT_N_FRAMES
        - 1,
    )
    selected_frame_numbers = frame_numbers[
        start_frame_number : start_frame_number
        + settings.SCOREVISION_VLM_SELECT_N_FRAMES
    ]
    logger.info(f"Selected Frames for Testing: {selected_frame_numbers}")

    cached_store: FrameStore | None = None
    cached_path: Path | None = None
    if video_cache is not None:
        cached_store = video_cache.get("store")
        cached_path = video_cache.get("path")

    if cached_store is None:
        video_name, frame_store = await download_video_cached(
            url=video_url,
            _frame_numbers=selected_frame_numbers,
            cached_path=cached_path,
        )
        if video_cache is not None:
            video_cache["store"] = frame_store
            video_cache["path"] = frame_store.video_path
    else:
        frame_store = cached_store

    select_frames: list[ndarray] = []
    flow_frames: list[ndarray] = []
    for fn in selected_frame_numbers:
        frame = await asyncio.to_thread(frame_store.get_frame, fn)
        select_frames.append(frame)
        flow = await asyncio.to_thread(frame_store.get_flow, fn)
        flow_frames.append(flow)

    logger.info(f"frames {selected_frame_numbers} successful")
    if not select_frames:
        raise ScoreVisionChallengeError(
            "No Frames were successfully extracted from Video"
        )
    if not flow_frames:
        raise ScoreVisionChallengeError(
            "No Dense Optical Flows were successfully computed from Video"
        )

    height, width = select_frames[0].shape[:2]
    meta = {
        "version": 1,
        "width": width or 0,
        "height": height or 0,
        "fps": int(
            challenge.get("fps") or settings.SCOREVISION_VIDEO_FRAMES_PER_SECOND
        ),
        "task_id": challenge.get("task_id"),
        "challenge_type": challenge.get("challenge_type"),
    }
    if "seed" in challenge:
        meta["seed"] = challenge["seed"]
    meta["batch_size"] = batch_size
    meta["n_keypoints"] = (
        32  # TODO: update n_keypoints based on challenge type (32 is for football)
    )
    payload = TVPredictInput(url=video_url, meta=meta)
    return (
        payload,
        selected_frame_numbers,
        select_frames,
        flow_frames,
        frame_store,
    )


async def get_next_challenge() -> dict:
    """
    Fetches the next video challenge from ScoreVision API.
    Returns a dict like:
      {
        "task_id": "...",     # we will propagate this end-to-end
        "video_url": "...",   # or "asset_url"
        "fps": 25|30,         # optional (fallback 30)
        "seed": <int>,        # optional
        ...
      }
    """
    settings = get_settings()

    if not settings.SCOREVISION_API:
        raise ScoreVisionChallengeError("SCOREVISION_API is not set.")

    keypair = load_hotkey_keypair(
        wallet_name=settings.BITTENSOR_WALLET_COLD,
        hotkey_name=settings.BITTENSOR_WALLET_HOT,
    )

    # Build query parameters required to authenticate with the validator API
    params = build_validator_query_params(keypair)

    session = await get_async_client()
    async with session.get(
        f"{settings.SCOREVISION_API}/api/tasks/next/v2", params=params
    ) as response:
        response.raise_for_status()
        challenge = await response.json() or None
        if not challenge:
            raise ScoreVisionChallengeError("No challenge available from API")

        if "id" in challenge and "task_id" not in challenge:
            challenge["task_id"] = challenge.pop("id")

        if not (challenge.get("video_url") or challenge.get("asset_url")):
            raise ScoreVisionChallengeError("Challenge missing video url.")

        ct = (
            parse_challenge_type(challenge.get("challenge_type"))
            or ChallengeType.FOOTBALL
        )
        challenge["challenge_type"] = ct.value

        logger.info(f"Fetched challenge: task_id={challenge.get('task_id')}")
        return challenge


def build_svchallenge_from_parts(
    chal_api: dict,
    payload: TVPredictInput,
    frame_numbers: list[int],
    frames: list[ndarray],
    flows: list[ndarray],
) -> SVChallenge:
    prompt = f"ScoreVision video task {chal_api.get('task_id')}"
    meta = payload.meta | {"seed": chal_api.get("seed", 0)}
    canonical = {
        "env": "SVEnv",
        "prompt": prompt,
        "extra": {"meta": meta, "n_frames": len(frames)},
    }
    cid = sha256(
        dumps(canonical, sort_keys=True, separators=(",", ":")).encode()
    ).hexdigest()
    ct = parse_challenge_type(chal_api.get("challenge_type"))
    return SVChallenge(
        env="SVEnv",
        payload=payload,
        meta=meta,
        prompt=prompt,
        challenge_id=cid,
        frame_numbers=frame_numbers,
        frames=frames,
        dense_optical_flow_frames=flows,
        api_task_id=chal_api.get("task_id"),
        challenge_type=ct,
    )


async def get_challenge_from_scorevision_with_source(
    *,
    video_cache: dict[str, Any] | None = None,
) -> tuple[SVChallenge, TVPredictInput, dict, FrameStore]:
    try:
        chal_api = await get_next_challenge()
    except ClientResponseError as e:
        raise ScoreVisionChallengeError(f"HTTP error while fetching challenge: {e}")
    except ScoreVisionChallengeError as e:
        raise e
    except Exception as e:
        raise Exception(f"Unexpected error while fetching challenge: {e}")

    payload, frame_numbers, frames, flows, frame_store = (
        await prepare_challenge_payload(
            challenge=chal_api,
            video_cache=video_cache,
        )
    )
    if not payload:
        raise ScoreVisionChallengeError("Failed to prepare payload from challenge.")

    challenge = build_svchallenge_from_parts(
        chal_api=chal_api,
        payload=payload,
        frame_numbers=frame_numbers,
        frames=frames,
        flows=flows,
    )
    return challenge, payload, chal_api, frame_store