import os import cv2 import numpy as np import logging from ultralytics import YOLO from pathlib import Path logger = logging.getLogger(__name__) class MultiTracker: def __init__(self): logger.info("Initializing Multi Tracker (Group Centroid)") # Determine paths # Get the directory containing the current script (multi_tracker.py) base_dir = Path(__file__).parent.parent # Model folder is now inside backend/ detector_model_path = base_dir / "Model" / "yolov8n-face.pt" print(detector_model_path,"de") try: self.model = YOLO(detector_model_path) logger.info(f"Loaded YOLO model from {detector_model_path}") except Exception as e: logger.error(f"Failed to load YOLO model: {e}") self.model = None def process_frame(self, frame): """ Process a single BGR image frame for group object tracking. Returns a dictionary with tracking results (individual boxes + aggregate box). """ results_data = { "individual_boxes": [], "aggregate_box": None, "centroid": None, "error": None, "frame_width": int(frame.shape[1]), "frame_height": int(frame.shape[0]) } if self.model is None: results_data["error"] = "Model not initialized" return results_data try: # RUN BYTETRACK (Detection + Tracking) results = self.model.track(frame, persist=True, tracker="bytetrack.yaml", verbose=False) if results and len(results) > 0 and results[0].boxes.id is not None: boxes = results[0].boxes.xyxy.cpu().numpy().astype(int) track_ids = results[0].boxes.id.cpu().numpy().astype(int) all_x1, all_y1, all_x2, all_y2 = [], [], [], [] for box, track_id in zip(boxes, track_ids): x1, y1, x2, y2 = box.tolist() all_x1.append(x1) all_y1.append(y1) all_x2.append(x2) all_y2.append(y2) results_data["individual_boxes"].append({ "id": int(track_id), "x1": int(x1), "y1": int(y1), "x2": int(x2), "y2": int(y2) }) # Calculate Aggregate Bounding Box if faces exist if len(all_x1) > 0: agg_x1 = int(min(all_x1)) agg_y1 = int(min(all_y1)) agg_x2 = int(max(all_x2)) agg_y2 = int(max(all_y2)) # Aggregate Centroid agg_cx = (agg_x1 + agg_x2) // 2 agg_cy = (agg_y1 + agg_y2) // 2 results_data["aggregate_box"] = { "x1": agg_x1, "y1": agg_y1, "x2": agg_x2, "y2": agg_y2 } results_data["centroid"] = { "cx": agg_cx, "cy": agg_cy } except Exception as e: logger.error(f"Error during ByteTrack: {e}") results_data["error"] = str(e) return results_data