| import os |
| import sys |
| from pathlib import Path |
| from typing import List, Tuple |
|
|
| from numpy import ndarray |
| from pydantic import BaseModel |
| sys.path.append(os.path.dirname(os.path.abspath(__file__))) |
|
|
| |
| import importlib.util |
| from pathlib import Path |
|
|
def manual_import(name, filename):
    """Load and execute a module from a file located next to this file.

    The module is built directly from its file location with
    ``importlib.util`` rather than through a regular ``import`` statement.
    It is registered in ``sys.modules`` under ``name`` before its code runs,
    as the importlib "importing a source file directly" recipe recommends,
    so that code inside the loaded module which looks itself up through
    ``sys.modules`` can find it.

    Args:
        name: Name given to the loaded module (``module.__name__`` and the
            ``sys.modules`` key).
        filename: File name relative to the directory containing this file.

    Returns:
        The executed module object.

    Raises:
        FileNotFoundError: If ``filename`` does not exist next to this file.
        ImportError: If no import spec (or no loader) can be built for the
            file, e.g. because of an unsupported file extension.
        Exception: Anything raised by the module's own top-level code is
            propagated unchanged.
    """
    file_path = Path(__file__).parent / filename
    if not file_path.exists():
        raise FileNotFoundError(f"Could not find {file_path}")

    spec = importlib.util.spec_from_file_location(name, file_path)
    # ``spec.loader`` is Optional in the importlib API; guard it so a missing
    # loader surfaces as ImportError instead of AttributeError on None.
    if spec is None or spec.loader is None:
        raise ImportError(f"Could not load spec for {name} from {file_path}")

    module = importlib.util.module_from_spec(spec)

    # Sentinel distinguishes "no previous entry" from an entry that is None.
    missing = object()
    previous = sys.modules.get(name, missing)
    sys.modules[name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Never leave a half-initialised module registered: put back whatever
        # was there before (or nothing) and let the error propagate.
        if previous is missing:
            sys.modules.pop(name, None)
        else:
            sys.modules[name] = previous
        raise
    return module
|
|
|
|
def manual_import_pipeline(name: str):
    """
    Manual import for pipeline module, bypassing sys.meta_path.
    Tries .py first (development), then .so (production).

    Every ``{name}*.so`` file found next to this file is attempted in sorted
    order, so one unloadable binary (for example a build for a different
    interpreter) does not hide another one that would load.

    Args:
        name: Module name; also the file stem searched for in this file's
            directory.

    Returns:
        The first module that loads successfully.

    Raises:
        ImportError: If no candidate file exists, or if every candidate
            failed to load (chained from the last failure).
    """
    curr_dir = Path(__file__).parent
    last_error = None

    # Development layout: plain source file.
    py_path = curr_dir / f"{name}.py"
    if py_path.is_file():
        try:
            return manual_import(name, py_path.name)
        except Exception as e:
            # Deliberately broad: any failure falls through to the compiled
            # candidates; the error is kept for the final report.
            last_error = e

    # Production layout: compiled extension(s). Sorted for a deterministic
    # attempt order; each one is tried until a load succeeds.
    for so_path in sorted(curr_dir.glob(f"{name}*.so")):
        try:
            return manual_import(name, so_path.name)
        except Exception as e:
            last_error = e

    if last_error:
        raise ImportError(f"Could not import {name} from {curr_dir}: {last_error}") from last_error
    raise ImportError(f"Could not find {name}.py or {name}*.so in {curr_dir}")
|
|
class BoundingBox(BaseModel):
    # Pydantic model describing a single detected box.
    # NOTE(review): x1/y1 and x2/y2 look like two opposite box corners in
    # integer pixel coordinates — confirm the convention with the producer.
    x1: int
    y1: int
    x2: int
    y2: int
    # Presumably the detector's class index — TODO confirm the label mapping.
    cls_id: int
    # Presumably the detection confidence score — TODO confirm the range.
    conf: float
    # Optional; defaults to None when not supplied.
    team_id: str | None = None
|
|
|
|
class TVFrameResult(BaseModel):
    # Pydantic model holding the per-frame output: a frame identifier, the
    # boxes found in that frame, and a list of 2-float keypoints.
    frame_id: int
    boxes: List[BoundingBox]
    # Each keypoint is a pair of floats — presumably (x, y); verify the order
    # and coordinate system against the code that fills this in.
    keypoints: List[Tuple[float, float]]
|
|
|
|
class Miner:
    """Front object that lazily builds a ``MinerPipeline`` and forwards batches to it.

    The pipeline module is not imported at construction time; it is loaded
    on the first batch that is actually processed.
    """

    def __init__(self, path_hf_repo: Path) -> None:
        """Store the repository path; no pipeline is loaded here."""
        print("model laoding")
        self.path_hf_repo = path_hf_repo
        self.pipeline = None
        # Flipped to True by the first predict_batch call.
        self.is_start = False
        self.health = 'Okay-turbo-16'

    def __repr__(self) -> str:
        """Return the health string as the object's representation."""
        return self.health

    def predict_batch(self, batch_images: List[ndarray], offset: int, n_keypoints: int) -> List[TVFrameResult]:
        """Run the pipeline on a batch of images.

        The very first call only marks the miner as started and returns an
        empty list without touching the pipeline (presumably a warm-up or
        probe call — confirm with the caller). Later calls load the
        ``pipeline`` module on demand and delegate to its
        ``MinerPipeline.predict_batch``.
        """
        if not self.is_start:
            self.is_start = True
            return []

        if self.pipeline is None:
            # Lazy one-time construction, rooted at the stored repo path.
            loaded = manual_import_pipeline("pipeline")
            self.pipeline = loaded.MinerPipeline(repo_root=self.path_hf_repo)

        return self.pipeline.predict_batch(batch_images, offset, n_keypoints)