# OpenCV-based multi-object video tracker: detects moving or colored objects,
# associates them across frames, and exports normalized 3D trajectories.
| import cv2 | |
| import numpy as np | |
| from collections import defaultdict | |
class VideoTracker:
    """Track moving objects across video frames and export their trajectories."""

    def __init__(self, video_path, detection_method='background'):
        """
        Args:
            video_path: Path to the input video file.
            detection_method: 'background' selects background subtraction;
                any other value selects color-based detection.
        """
        self.video_path = video_path
        self.detection_method = detection_method
        # NOTE(review): never written to elsewhere in this class — kept only
        # for interface compatibility; confirm whether callers rely on it.
        self.trajectories = defaultdict(list)
| def detect_objects_background(self, frame, bg_subtractor): | |
| """Detect moving objects using background subtraction""" | |
| fg_mask = bg_subtractor.apply(frame) | |
| fg_mask = cv2.GaussianBlur(fg_mask, (21, 21), 0) | |
| _, thresh = cv2.threshold(fg_mask, 25, 255, cv2.THRESH_BINARY) | |
| # Find contours | |
| contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| detections = [] | |
| for contour in contours: | |
| area = cv2.contourArea(contour) | |
| if area > 500: # Filter small objects | |
| x, y, w, h = cv2.boundingRect(contour) | |
| center_x = x + w // 2 | |
| center_y = y + h // 2 | |
| detections.append({ | |
| 'bbox': [x, y, w, h], | |
| 'center': [center_x, center_y], | |
| 'area': area | |
| }) | |
| return detections | |
| def detect_objects_color(self, frame, color_range): | |
| """Detect objects by color (example: detecting red objects)""" | |
| hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) | |
| # Default: detect red objects | |
| lower = np.array([0, 100, 100]) | |
| upper = np.array([10, 255, 255]) | |
| mask = cv2.inRange(hsv, lower, upper) | |
| mask = cv2.GaussianBlur(mask, (21, 21), 0) | |
| contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| detections = [] | |
| for contour in contours: | |
| area = cv2.contourArea(contour) | |
| if area > 300: | |
| x, y, w, h = cv2.boundingRect(contour) | |
| center_x = x + w // 2 | |
| center_y = y + h // 2 | |
| detections.append({ | |
| 'bbox': [x, y, w, h], | |
| 'center': [center_x, center_y], | |
| 'area': area | |
| }) | |
| return detections | |
| def match_detections(self, prev_objects, curr_detections, max_distance=50): | |
| """Match current detections with previous objects""" | |
| matched = [] | |
| unmatched_detections = list(range(len(curr_detections))) | |
| for obj_id, prev_pos in prev_objects.items(): | |
| if not prev_pos: | |
| continue | |
| last_pos = prev_pos[-1]['center'] | |
| min_dist = float('inf') | |
| best_match = None | |
| for idx in unmatched_detections: | |
| curr_pos = curr_detections[idx]['center'] | |
| dist = np.sqrt((curr_pos[0] - last_pos[0])**2 + | |
| (curr_pos[1] - last_pos[1])**2) | |
| if dist < min_dist and dist < max_distance: | |
| min_dist = dist | |
| best_match = idx | |
| if best_match is not None: | |
| matched.append((obj_id, best_match)) | |
| unmatched_detections.remove(best_match) | |
| return matched, unmatched_detections | |
| def process_video(self): | |
| """Process video and extract trajectories""" | |
| cap = cv2.VideoCapture(self.video_path) | |
| if not cap.isOpened(): | |
| raise Exception("Could not open video file") | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| bg_subtractor = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=16) | |
| tracked_objects = {} | |
| next_id = 0 | |
| frame_idx = 0 | |
| while True: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| # Detect objects | |
| if self.detection_method == 'background': | |
| detections = self.detect_objects_background(frame, bg_subtractor) | |
| else: | |
| detections = self.detect_objects_color(frame, None) | |
| # Match with existing objects | |
| matched, unmatched = self.match_detections(tracked_objects, detections) | |
| # Update matched objects | |
| for obj_id, det_idx in matched: | |
| detection = detections[det_idx] | |
| tracked_objects[obj_id].append({ | |
| 'frame': frame_idx, | |
| 'time': frame_idx / fps, | |
| 'center': detection['center'], | |
| 'bbox': detection['bbox'] | |
| }) | |
| # Create new objects for unmatched detections | |
| for det_idx in unmatched: | |
| detection = detections[det_idx] | |
| tracked_objects[next_id] = [{ | |
| 'frame': frame_idx, | |
| 'time': frame_idx / fps, | |
| 'center': detection['center'], | |
| 'bbox': detection['bbox'] | |
| }] | |
| next_id += 1 | |
| frame_idx += 1 | |
| cap.release() | |
| # Convert to 3D trajectories (add z-axis based on object size/distance) | |
| trajectories = [] | |
| for obj_id, points in tracked_objects.items(): | |
| if len(points) > 5: # Filter short trajectories | |
| trajectory = { | |
| 'id': obj_id, | |
| 'points': [] | |
| } | |
| for point in points: | |
| # Normalize coordinates to [-1, 1] | |
| x = (point['center'][0] / width) * 2 - 1 | |
| y = -((point['center'][1] / height) * 2 - 1) # Flip y | |
| # Estimate z based on object size (larger = closer) | |
| bbox_area = point['bbox'][2] * point['bbox'][3] | |
| z = (bbox_area / (width * height)) * 2 - 0.5 | |
| trajectory['points'].append({ | |
| 'x': x, | |
| 'y': y, | |
| 'z': z, | |
| 'time': point['time'], | |
| 'frame': point['frame'] | |
| }) | |
| trajectories.append(trajectory) | |
| return { | |
| 'trajectories': trajectories, | |
| 'metadata': { | |
| 'fps': fps, | |
| 'frame_count': frame_count, | |
| 'width': width, | |
| 'height': height, | |
| 'num_objects': len(trajectories) | |
| } | |
| } |