File size: 6,192 Bytes
ec6677f
 
 
 
 
 
52b7689
ec6677f
52b7689
 
 
ec6677f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fa9eeff
95613a0
52b7689
ec6677f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
from pathlib import Path

from ultralytics import YOLO
from numpy import ndarray
from pydantic import BaseModel

#-----------------------
import importlib.util
import sys
import os 
# Add the directory containing this file to sys.path so that modules stored
# next to it can be resolved by ordinary import statements.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

def manual_import(name, filename):
    """
    Manually loads a module (.so, .pyc, or .py) from a specific file path,
    bypassing sys.meta_path import hooks.

    Args:
        name (str):
            Name under which the module is registered in sys.modules.
        filename (str | Path):
            File to load. Relative names are resolved against the directory
            that contains this file; an absolute path is used as given.
    Returns:
        module:
            The executed module object.
    Raises:
        FileNotFoundError:
            If the resolved file does not exist.
        ImportError:
            If no spec or loader can be built for the file.
        Exception:
            Anything raised while the module body executes is propagated
            after sys.modules has been restored to its previous state.
    """
    # Locate the file relative to the directory of the current file
    file_path = Path(__file__).parent / filename
    if not file_path.exists():
        raise FileNotFoundError(f"Could not find {file_path}")

    # Build the spec directly from the file path; a spec without a loader
    # cannot be executed, so treat it like a missing spec
    spec = importlib.util.spec_from_file_location(name, file_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Could not load spec for {name} from {file_path}")

    module = importlib.util.module_from_spec(spec)

    # Register before executing so the module can be found under `name`
    # while its own body runs, but remember what was there before
    absent = object()
    previous = sys.modules.get(name, absent)
    sys.modules[name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-initialised module behind for later lookups
        if previous is absent:
            sys.modules.pop(name, None)
        else:
            sys.modules[name] = previous
        raise
    return module

# Load foobar.py from this file's directory at import time. manual_import
# registers it as sys.modules["foobar"] and the module object is bound to
# `inference`.
# NOTE(review): `inference` is not referenced anywhere else in the visible
# code — presumably the module is loaded for its side effects; confirm.
inference = manual_import("foobar", "foobar.py")
# import foobar
#-----------------------

class BoundingBox(BaseModel):
    # One detected box. Miner.predict_batch fills these fields from a row of
    # the box model output unpacked as (x1, y1, x2, y2, conf, cls_id), with
    # the coordinates and class id truncated to int.
    # NOTE(review): presumably (x1, y1) / (x2, y2) are opposite corners in
    # image pixel coordinates — confirm against the model's output format.
    x1: int
    y1: int
    x2: int
    y2: int
    cls_id: int  # class index reported by the box model
    conf: float  # confidence score reported by the box model


class TVFrameResult(BaseModel):
    # Prediction for a single frame, as returned by Miner.predict_batch.
    frame_id: int  # batch offset + position of the image inside the batch
    boxes: list[BoundingBox]  # empty when no box result exists for the frame
    # (x, y) integer pairs; predict_batch pads with (0, 0) or truncates so the
    # list always has exactly n_keypoints entries.
    keypoints: list[tuple[int, int]]


class Miner:
    """
    This class is responsible for:
    - Loading ML models.
    - Running batched predictions on images.
    - Parsing ML model outputs into structured results (TVFrameResult).
    This class can be modified, but it must have the following to be compatible with the chute:
        - be named `Miner`
        - have a `predict_batch` function with the inputs and outputs specified
        - be stored in a file called `miner.py` which lives in the root of the HFHub repo
    """

    def __init__(self, path_hf_repo: Path) -> None:
        """
        Loads all ML models from the repository.
        -----(Adjust as needed)----
        Args:
            path_hf_repo (Path):
                Path to the downloaded HuggingFace Hub repository. The weight
                files "football-player-detection.pt" and
                "football-pitch-detection.pt" are read from its root.
        Returns:
            None
        """
        self.bbox_model = YOLO(path_hf_repo / "football-player-detection.pt")
        print("✅ BBox Model Loaded")
        self.keypoints_model = YOLO(path_hf_repo / "football-pitch-detection.pt")
        print("✅ Keypoints Model Loaded")

    def __repr__(self) -> str:
        """
        Information about miner returned in the health endpoint
        to inspect the loaded ML models (and their types)
        -----(Adjust as needed)----
        """
        return f"BBox Model: {type(self.bbox_model).__name__}\nKeypoints Model: {type(self.keypoints_model).__name__}"

    def _predict_boxes(
        self,
        batch_images: list[ndarray],
        offset: int,
    ) -> dict[int, list[BoundingBox]]:
        """
        Runs the box model and groups its detections by absolute frame number.
        Args:
            batch_images (list[np.ndarray]):
                Images to run the box model on.
            offset (int):
                Frame number of the first image in the batch.
        Returns:
            dict[int, list[BoundingBox]]:
                Boxes per frame number. Frames whose result carries no
                `boxes` object are left out of the mapping.
        """
        boxes_by_frame: dict[int, list[BoundingBox]] = {}
        detections = self.bbox_model.predict(batch_images)
        if detections is None:
            return boxes_by_frame
        for index_in_batch, detection in enumerate(detections):
            raw_boxes = getattr(detection, "boxes", None)
            if raw_boxes is None:
                continue
            # One tolist() per frame instead of one conversion per box.
            # Each row is unpacked as (x1, y1, x2, y2, conf, cls_id).
            boxes_by_frame[offset + index_in_batch] = [
                BoundingBox(
                    x1=int(x1),
                    y1=int(y1),
                    x2=int(x2),
                    y2=int(y2),
                    cls_id=int(cls_id),
                    conf=float(conf),
                )
                for x1, y1, x2, y2, conf, cls_id in raw_boxes.data.tolist()
            ]
        return boxes_by_frame

    def _predict_keypoints(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> dict[int, list[tuple[int, int]]]:
        """
        Runs the keypoints model and groups its points by absolute frame number.
        Args:
            batch_images (list[np.ndarray]):
                Images to run the keypoints model on.
            offset (int):
                Frame number of the first image in the batch.
            n_keypoints (int):
                Exact number of keypoints to return for each frame.
        Returns:
            dict[int, list[tuple[int, int]]]:
                Keypoints per frame number, padded with (0, 0) or truncated to
                `n_keypoints` entries. Frames whose result carries no
                `keypoints` object are left out of the mapping.
        """
        keypoints_by_frame: dict[int, list[tuple[int, int]]] = {}
        detections = self.keypoints_model.predict(batch_images)
        if detections is None:
            return keypoints_by_frame
        for index_in_batch, detection in enumerate(detections):
            raw_keypoints = getattr(detection, "keypoints", None)
            if raw_keypoints is None:
                continue
            frame_keypoints: list[tuple[int, int]] = []
            # Points of every detected instance are flattened in order.
            for instance_points in raw_keypoints.data.tolist():
                for point in instance_points:
                    # Only x and y are used; indexing (rather than a fixed
                    # 3-way unpack) also accepts rows without a third value.
                    frame_keypoints.append((int(point[0]), int(point[1])))
            missing = n_keypoints - len(frame_keypoints)
            if missing > 0:
                frame_keypoints.extend([(0, 0)] * missing)
            else:
                frame_keypoints = frame_keypoints[:n_keypoints]
            keypoints_by_frame[offset + index_in_batch] = frame_keypoints
        return keypoints_by_frame

    def predict_batch(
        self,
        batch_images: list[ndarray],
        offset: int,
        n_keypoints: int,
    ) -> list[TVFrameResult]:
        """
        Miner prediction for a batch of images.
        Handles the orchestration of ML models and any preprocessing and postprocessing
        -----(Adjust as needed)----
        Args:
            batch_images (list[np.ndarray]):
                A list of images (as NumPy arrays) to process in this batch.
            offset (int):
                The frame number corresponding to the first image in the batch.
                Used to correctly index frames in the output results.
            n_keypoints (int):
                The number of keypoints expected for each frame in this challenge type.
        Returns:
            list[TVFrameResult]:
                A list of predictions for each image in the batch, in batch
                order. An empty batch yields an empty list without invoking
                the models.
        """
        # Nothing to predict: skip both model calls entirely.
        if len(batch_images) == 0:
            return []

        bboxes = self._predict_boxes(batch_images, offset)
        print("✅ BBoxes predicted")

        keypoints = self._predict_keypoints(batch_images, offset, n_keypoints)
        print("✅ Keypoints predicted")

        results: list[TVFrameResult] = []
        for frame_number in range(offset, offset + len(batch_images)):
            # Frames missing from either mapping fall back to "no boxes" and
            # an all-(0, 0) keypoint list of the requested length.
            frame_keypoints = keypoints.get(frame_number)
            if frame_keypoints is None:
                frame_keypoints = [(0, 0)] * n_keypoints
            results.append(
                TVFrameResult(
                    frame_id=frame_number,
                    boxes=bboxes.get(frame_number, []),
                    keypoints=frame_keypoints,
                )
            )
        print("✅ Combined results as TVFrameResult")
        return results