import cv2
import numpy as np
from collections import defaultdict


class VideoTracker:
    """Track moving objects in a video and export per-object trajectories.

    Objects are detected per frame (background subtraction or HSV color
    thresholding), greedily matched frame-to-frame by nearest center, and
    finally converted to normalized pseudo-3D trajectories where z is
    estimated from apparent object size (larger bbox => closer).
    """

    # Default HSV bounds used by the color detector when no range is given
    # (low-hue red; note red also wraps to H~170-180, which this default
    # deliberately does not cover to preserve original behavior).
    _DEFAULT_COLOR_RANGE = (np.array([0, 100, 100]), np.array([10, 255, 255]))

    def __init__(self, video_path, detection_method='background'):
        """
        Args:
            video_path: Path to the input video file.
            detection_method: 'background' for MOG2 background subtraction;
                any other value selects color-based detection.
        """
        self.video_path = video_path
        self.detection_method = detection_method
        # Kept for backward compatibility with external users of this attr.
        self.trajectories = defaultdict(list)

    @staticmethod
    def _extract_detections(mask, min_area):
        """Convert a binary mask into bounding-box detections.

        Args:
            mask: Single-channel binary image.
            min_area: Contours with area <= min_area are discarded as noise.

        Returns:
            List of dicts with 'bbox' [x, y, w, h], 'center' [cx, cy], 'area'.
        """
        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL,
                                       cv2.CHAIN_APPROX_SIMPLE)
        detections = []
        for contour in contours:
            area = cv2.contourArea(contour)
            if area <= min_area:
                continue
            x, y, w, h = cv2.boundingRect(contour)
            detections.append({
                'bbox': [x, y, w, h],
                'center': [x + w // 2, y + h // 2],
                'area': area,
            })
        return detections

    def detect_objects_background(self, frame, bg_subtractor):
        """Detect moving objects using background subtraction.

        Args:
            frame: BGR video frame.
            bg_subtractor: An OpenCV background subtractor (e.g. MOG2).

        Returns:
            List of detection dicts (see _extract_detections).
        """
        fg_mask = bg_subtractor.apply(frame)
        # Blur before thresholding to merge fragmented foreground blobs.
        fg_mask = cv2.GaussianBlur(fg_mask, (21, 21), 0)
        _, thresh = cv2.threshold(fg_mask, 25, 255, cv2.THRESH_BINARY)
        return self._extract_detections(thresh, min_area=500)

    def detect_objects_color(self, frame, color_range):
        """Detect objects within an HSV color range.

        Args:
            frame: BGR video frame.
            color_range: Optional (lower, upper) pair of HSV bounds
                (array-likes of 3 values each). If None, defaults to red.
                BUG FIX: previously this parameter was silently ignored
                and red was always used.

        Returns:
            List of detection dicts (see _extract_detections).
        """
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        if color_range is None:
            lower, upper = self._DEFAULT_COLOR_RANGE
        else:
            lower = np.asarray(color_range[0])
            upper = np.asarray(color_range[1])
        mask = cv2.inRange(hsv, lower, upper)
        mask = cv2.GaussianBlur(mask, (21, 21), 0)
        return self._extract_detections(mask, min_area=300)

    def match_detections(self, prev_objects, curr_detections, max_distance=50):
        """Greedily match current detections to tracked objects by distance.

        Each tracked object claims its nearest unclaimed detection within
        max_distance pixels (greedy, in dict iteration order — not globally
        optimal, but cheap and adequate for sparse scenes).

        Args:
            prev_objects: Mapping of object id -> list of history points,
                each with a 'center' [x, y]; empty histories are skipped.
            curr_detections: List of detection dicts with 'center'.
            max_distance: Maximum pixel distance for a valid match.

        Returns:
            (matched, unmatched_detections) where matched is a list of
            (object_id, detection_index) pairs and unmatched_detections is
            the list of detection indices left unclaimed.
        """
        matched = []
        unmatched_detections = list(range(len(curr_detections)))
        for obj_id, history in prev_objects.items():
            if not history:
                continue
            last_x, last_y = history[-1]['center']
            min_dist = float('inf')
            best_match = None
            for idx in unmatched_detections:
                cx, cy = curr_detections[idx]['center']
                dist = np.sqrt((cx - last_x) ** 2 + (cy - last_y) ** 2)
                if dist < min_dist and dist < max_distance:
                    min_dist = dist
                    best_match = idx
            if best_match is not None:
                matched.append((obj_id, best_match))
                unmatched_detections.remove(best_match)
        return matched, unmatched_detections

    def _to_normalized_trajectories(self, tracked_objects, width, height):
        """Convert tracked pixel paths into normalized pseudo-3D trajectories.

        x, y are mapped to [-1, 1] (y flipped so +y is up); z is estimated
        from relative bounding-box area (larger object => closer => larger z).
        Tracks with 5 or fewer points are dropped as noise.
        """
        frame_area = width * height
        trajectories = []
        for obj_id, points in tracked_objects.items():
            if len(points) <= 5:  # filter short-lived tracks
                continue
            norm_points = []
            for point in points:
                cx, cy = point['center']
                x = (cx / width) * 2 - 1
                y = -((cy / height) * 2 - 1)  # flip y: image y grows downward
                bbox_area = point['bbox'][2] * point['bbox'][3]
                z = (bbox_area / frame_area) * 2 - 0.5
                norm_points.append({
                    'x': x,
                    'y': y,
                    'z': z,
                    'time': point['time'],
                    'frame': point['frame'],
                })
            trajectories.append({'id': obj_id, 'points': norm_points})
        return trajectories

    def process_video(self):
        """Process the whole video and extract object trajectories.

        Returns:
            Dict with 'trajectories' (list of {'id', 'points'}) and
            'metadata' (fps, frame_count, width, height, num_objects).

        Raises:
            IOError: If the video file cannot be opened.
        """
        cap = cv2.VideoCapture(self.video_path)
        if not cap.isOpened():
            # IOError is a subclass of Exception, so existing callers that
            # caught the old generic Exception still work.
            raise IOError(f"Could not open video file: {self.video_path}")
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            if fps <= 0:
                # Some containers report 0 fps; guard the frame_idx / fps
                # division below with a conventional fallback.
                fps = 30.0
            frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            bg_subtractor = cv2.createBackgroundSubtractorMOG2(
                history=500, varThreshold=16)

            tracked_objects = {}
            next_id = 0
            frame_idx = 0

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                if self.detection_method == 'background':
                    detections = self.detect_objects_background(
                        frame, bg_subtractor)
                else:
                    detections = self.detect_objects_color(frame, None)

                matched, unmatched = self.match_detections(
                    tracked_objects, detections)

                # Extend histories of matched objects.
                for obj_id, det_idx in matched:
                    detection = detections[det_idx]
                    tracked_objects[obj_id].append({
                        'frame': frame_idx,
                        'time': frame_idx / fps,
                        'center': detection['center'],
                        'bbox': detection['bbox'],
                    })

                # Spawn a new track for each unclaimed detection.
                for det_idx in unmatched:
                    detection = detections[det_idx]
                    tracked_objects[next_id] = [{
                        'frame': frame_idx,
                        'time': frame_idx / fps,
                        'center': detection['center'],
                        'bbox': detection['bbox'],
                    }]
                    next_id += 1

                frame_idx += 1
        finally:
            # Always release the capture, even if detection/matching raises.
            cap.release()

        trajectories = self._to_normalized_trajectories(
            tracked_objects, width, height)

        return {
            'trajectories': trajectories,
            'metadata': {
                'fps': fps,
                'frame_count': frame_count,
                'width': width,
                'height': height,
                'num_objects': len(trajectories),
            },
        }