Spaces:
Sleeping
Sleeping
| import cv2 | |
| import numpy as np | |
| from ultralytics import YOLO | |
| from collections import defaultdict | |
| import argparse | |
class PersonCounter:
    """Count tracked people whose feet cross a horizontal line moving downward.

    Each tracked person is represented by the bottom-centre point of their
    bounding box. A person is counted once, the first time that point moves
    from above the line to on or below it between two consecutive sightings.
    """

    def __init__(self, line_position=0.5):
        """Initialize person counter.

        Args:
            line_position (float): Virtual line position as fraction of frame height (0-1)

        Raises:
            ValueError: If ``line_position`` is not within 0-1.
        """
        # Validate first so a bad argument fails before the model is loaded.
        # The negated chained comparison also rejects NaN.
        if not 0.0 <= line_position <= 1.0:
            raise ValueError(
                f"line_position must be between 0 and 1, got {line_position!r}"
            )
        self.model = YOLO("yolov8n.pt")  # Load pretrained YOLOv8 model
        self.tracker = defaultdict(list)  # track ID -> [last seen feet y]
        self.crossed_ids = set()  # Store IDs that have crossed the line
        self.line_position = line_position
        self.count = 0

    def _calculate_center(self, box):
        """Calculate center point of detection box given as (x1, y1, x2, y2)."""
        x1, y1, x2, y2 = box
        return (x1 + x2) / 2, (y1 + y2) / 2

    def _update_track(self, track_id, feet_y, line_y):
        """Record the latest feet position of a track and detect a crossing.

        Args:
            track_id: Identifier of the tracked person.
            feet_y: Current y coordinate of the bottom edge of the box.
            line_y: y coordinate of the counting line in pixels.

        Returns:
            bool: True if this update counted a new downward crossing.
        """
        crossed = False
        if track_id in self.tracker:
            prev_y = self.tracker[track_id][-1]
            # Downward crossing: previously above the line, now on or below it.
            # Each ID is counted at most once.
            if prev_y < line_y and feet_y >= line_y and track_id not in self.crossed_ids:
                self.crossed_ids.add(track_id)
                self.count += 1
                crossed = True
        # Only the most recent position is needed for the next comparison.
        self.tracker[track_id] = [feet_y]
        return crossed

    def _draw_count(self, frame):
        """Draw the running count on a filled background in the top-left corner."""
        count_text = f"Count: {self.count}"
        font = cv2.FONT_HERSHEY_SIMPLEX
        font_scale = 1.5
        thickness = 3
        (text_width, text_height), _ = cv2.getTextSize(count_text, font, font_scale, thickness)
        # Draw background rectangle
        cv2.rectangle(frame, (10, 10), (20 + text_width, 20 + text_height),
                      (0, 0, 0), -1)
        # Draw text
        cv2.putText(frame, count_text, (15, 15 + text_height),
                    font, font_scale, (0, 255, 0), thickness)

    def process_frame(self, frame):
        """Process a single frame and update count.

        The frame is annotated in place (line, boxes, tracking points, count).

        Args:
            frame: Input frame from video

        Returns:
            frame: Annotated frame
            count: Current count of people who entered
        """
        height, width = frame.shape[:2]
        line_y = int(height * self.line_position)

        # Run detection and tracking on the untouched frame, before any
        # overlay is drawn, so the model never sees our own annotations.
        results = self.model.track(frame, persist=True, classes=[0])  # class 0 is person

        # Draw counting line
        cv2.line(frame, (0, line_y), (width, line_y), (0, 255, 0), 2)

        detections = results[0].boxes
        if detections.id is not None:
            boxes = detections.xyxy.cpu().numpy()
            track_ids = detections.id.cpu().numpy().astype(int)

            for box, track_id in zip(boxes, track_ids):
                # Draw bounding box
                cv2.rectangle(frame, (int(box[0]), int(box[1])), (int(box[2]), int(box[3])),
                              (255, 0, 0), 2)

                # Tracking point: horizontal centre of the box at its bottom
                # edge (feet position).
                center_x, _ = self._calculate_center(box)
                feet_y = box[3]
                cv2.circle(frame, (int(center_x), int(feet_y)), 5, (0, 255, 255), -1)

                if self._update_track(track_id, feet_y, line_y):
                    # Draw crossing indicator
                    cv2.circle(frame, (int(center_x), int(line_y)), 8, (0, 0, 255), -1)

        self._draw_count(frame)
        return frame, self.count
def main():
    """Count people crossing a line in a video and write the annotated result.

    Parses the command line, reads the input video frame by frame, runs each
    frame through ``PersonCounter``, shows it in a window, and writes it to
    the output file. Pressing 'q' stops early. The final count is printed
    when processing ends normally.
    """
    parser = argparse.ArgumentParser(description='Count people entering through a line in video.')
    parser.add_argument('video_path', help='Path to input video file')
    parser.add_argument('--line-position', type=float, default=0.5,
                        help='Position of counting line (0-1, fraction of frame height)')
    parser.add_argument('--output', default='result.mp4', help='Path to output video file (default: result.mp4)')
    args = parser.parse_args()

    # Reject an out-of-range line up front; otherwise the line would sit
    # outside the frame and the count would silently stay at zero.
    if not 0.0 <= args.line_position <= 1.0:
        parser.error('--line-position must be between 0 and 1')

    # Initialize video capture
    cap = cv2.VideoCapture(args.video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video at {args.video_path}")
        return

    writer = None
    try:
        # Get video properties
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Keep the fractional frame rate (e.g. 29.97) so the output does not
        # drift, and fall back to a default when none is reported (0 or NaN).
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = 30.0

        # Initialize video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        writer = cv2.VideoWriter(args.output, fourcc, fps, (width, height))
        if not writer.isOpened():
            print(f"Error: Could not open output video at {args.output}")
            return

        # Initialize person counter
        counter = PersonCounter(line_position=args.line_position)

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Process frame
            processed_frame, _ = counter.process_frame(frame)

            # Display frame
            cv2.imshow('Frame', processed_frame)

            # Write processed frame to output video
            writer.write(processed_frame)

            # Break on 'q' press
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        print(f"Final count: {counter.count}")
    finally:
        # Clean up even if processing raised, so the output file is finalized
        # and the capture handle is not leaked.
        cap.release()
        if writer is not None:
            writer.release()
        cv2.destroyAllWindows()
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()