File size: 6,192 Bytes
ec6677f 52b7689 ec6677f 52b7689 ec6677f fa9eeff 95613a0 52b7689 ec6677f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
from pathlib import Path
from ultralytics import YOLO
from numpy import ndarray
from pydantic import BaseModel
#-----------------------
import importlib.util
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
def manual_import(name, filename):
"""
Manually loads a module (.so, .pyc, or .py) from a specific file path,
bypassing sys.meta_path import hooks.
"""
# Locate the file relative to the current miner.py
curr_dir = Path(__file__).parent
file_path = curr_dir / filename
if not file_path.exists():
raise FileNotFoundError(f"Could not find {file_path}")
# Load the spec directly from the file path
spec = importlib.util.spec_from_file_location(name, file_path)
if spec is None:
raise ImportError(f"Could not load spec for {name} from {file_path}")
# Create the module and register it in sys.modules
module = importlib.util.module_from_spec(spec)
sys.modules[name] = module
# Execute the module
spec.loader.exec_module(module)
return module
inference = manual_import("foobar", "foobar.py")
# import foobar
#-----------------------
class BoundingBox(BaseModel):
x1: int
y1: int
x2: int
y2: int
cls_id: int
conf: float
class TVFrameResult(BaseModel):
frame_id: int
boxes: list[BoundingBox]
keypoints: list[tuple[int, int]]
class Miner:
"""
This class is responsible for:
- Loading ML models.
- Running batched predictions on images.
- Parsing ML model outputs into structured results (TVFrameResult).
This class can be modified, but it must have the following to be compatible with the chute:
- be named `Miner`
- have a `predict_batch` function with the inputs and outputs specified
- be stored in a file called `miner.py` which lives in the root of the HFHub repo
"""
def __init__(self, path_hf_repo: Path) -> None:
"""
Loads all ML models from the repository.
-----(Adjust as needed)----
Args:
path_hf_repo (Path):
Path to the downloaded HuggingFace Hub repository
Returns:
None
"""
self.bbox_model = YOLO(path_hf_repo / "football-player-detection.pt")
print(f"✅ BBox Model Loaded")
self.keypoints_model = YOLO(path_hf_repo / "football-pitch-detection.pt")
print(f"✅ Keypoints Model Loaded")
def __repr__(self) -> str:
"""
Information about miner returned in the health endpoint
to inspect the loaded ML models (and their types)
-----(Adjust as needed)----
"""
return f"BBox Model: {type(self.bbox_model).__name__}\nKeypoints Model: {type(self.keypoints_model).__name__}"
def predict_batch(
self,
batch_images: list[ndarray],
offset: int,
n_keypoints: int,
) -> list[TVFrameResult]:
"""
Miner prediction for a batch of images.
Handles the orchestration of ML models and any preprocessing and postprocessing
-----(Adjust as needed)----
Args:
batch_images (list[np.ndarray]):
A list of images (as NumPy arrays) to process in this batch.
offset (int):
The frame number corresponding to the first image in the batch.
Used to correctly index frames in the output results.
n_keypoints (int):
The number of keypoints expected for each frame in this challenge type.
Returns:
list[TVFrameResult]:
A list of predictions for each image in the batch
"""
bboxes: dict[int, list[BoundingBox]] = {}
bbox_model_results = self.bbox_model.predict(batch_images)
if bbox_model_results is not None:
for frame_number_in_batch, detection in enumerate(bbox_model_results):
if not hasattr(detection, "boxes") or detection.boxes is None:
continue
boxes = []
for box in detection.boxes.data:
x1, y1, x2, y2, conf, cls_id = box.tolist()
boxes.append(
BoundingBox(
x1=int(x1),
y1=int(y1),
x2=int(x2),
y2=int(y2),
cls_id=int(cls_id),
conf=float(conf),
)
)
bboxes[offset + frame_number_in_batch] = boxes
print("✅ BBoxes predicted")
keypoints: dict[int, tuple[int, int]] = {}
keypoints_model_results = self.keypoints_model.predict(batch_images)
if keypoints_model_results is not None:
for frame_number_in_batch, detection in enumerate(keypoints_model_results):
if not hasattr(detection, "keypoints") or detection.keypoints is None:
continue
frame_keypoints: list[tuple[int, int]] = []
for part_points in detection.keypoints.data:
for x, y, _ in part_points:
frame_keypoints.append((int(x), int(y)))
if len(frame_keypoints) < n_keypoints:
frame_keypoints.extend(
[(0, 0)] * (n_keypoints - len(frame_keypoints))
)
else:
frame_keypoints = frame_keypoints[:n_keypoints]
keypoints[offset + frame_number_in_batch] = frame_keypoints
print("✅ Keypoints predicted")
results: list[TVFrameResult] = []
for frame_number in range(offset, offset + len(batch_images)):
results.append(
TVFrameResult(
frame_id=frame_number,
boxes=bboxes.get(frame_number, []),
keypoints=keypoints.get(
frame_number, [(0, 0) for _ in range(n_keypoints)]
),
)
)
print("✅ Combined results as TVFrameResult")
return results |