# Hugging Face Space — runs on ZeroGPU ("Running on Zero") hardware.
| """CCTV Customer Analytics - Advanced Object Counting System | |
| This Space provides accurate object detection, tracking, and counting | |
| across a user-defined line. Optimized for counting large numbers of | |
| animals (sheep, cows) and vehicles in crowded scenes. | |
| Key Features: | |
| - RT-DETR and YOLOv8 model support | |
| - Optimized ByteTrack for dense scenes | |
| - Proper geometric line crossing detection | |
| - Multi-class object support | |
| """ | |
import os
import subprocess
import tempfile
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

import cv2
import gradio as gr
import numpy as np
import spaces
import supervision as sv
from ultralytics import RTDETR, YOLO
# Detection modes keyed by UI label; each maps COCO class ids to names.
def _build_mode(labels: Dict[int, str]) -> Dict[str, object]:
    """Assemble one detection-mode entry from its COCO id -> label map."""
    return {"class_ids": sorted(labels), "labels": labels}


DETECTION_MODES = {
    "All Objects (Street)": _build_mode({
        0: "person", 1: "bicycle", 2: "car", 3: "motorcycle",
        5: "bus", 7: "truck", 17: "horse", 18: "sheep", 19: "cow",
    }),
    "People Only": _build_mode({0: "person"}),
    "Vehicles Only": _build_mode({
        1: "bicycle", 2: "car", 3: "motorcycle", 5: "bus", 7: "truck",
    }),
    "Animals (Sheep/Cow/Horse)": _build_mode({17: "horse", 18: "sheep", 19: "cow"}),
    "Sheep Only": _build_mode({18: "sheep"}),
}
# Loaded model instances keyed by their UI display name.
MODEL_CACHE: Dict[str, object] = {}


def get_model(model_name: str):
    """Return the detection model for *model_name*, loading it on first use.

    Unknown names fall back to YOLOv8s. Loaded models are kept in
    MODEL_CACHE so repeated runs reuse the same weights.
    """
    cached = MODEL_CACHE.get(model_name)
    if cached is not None:
        return cached

    model_map = {
        "YOLOv8n (Fast)": ("yolov8n.pt", "yolo"),
        "YOLOv8s (Balanced)": ("yolov8s.pt", "yolo"),
        "YOLOv8m (Accurate)": ("yolov8m.pt", "yolo"),
        "YOLOv8x (Best YOLO)": ("yolov8x.pt", "yolo"),
        "RT-DETR-L (Dense Scenes)": ("rtdetr-l.pt", "rtdetr"),
    }
    weights, family = model_map.get(model_name, ("yolov8s.pt", "yolo"))
    loader = RTDETR if family == "rtdetr" else YOLO
    MODEL_CACHE[model_name] = loader(weights)
    return MODEL_CACHE[model_name]
def point_side(point: Tuple[float, float], line: Tuple[Tuple[float, float], Tuple[float, float]]) -> float:
    """Signed cross product telling which side of *line* the point is on.

    Opposite signs mean opposite sides; 0 means the point lies exactly on
    the infinite line through the two endpoints.
    """
    (ax, ay), (bx, by) = line
    px, py = point
    return (px - ax) * (by - ay) - (py - ay) * (bx - ax)


def crossed_line(prev_point: Tuple[float, float], curr_point: Tuple[float, float],
                 line: Tuple[Tuple[float, float], Tuple[float, float]]) -> bool:
    """True when moving prev -> curr switches strictly from one side to the other.

    A point sitting exactly on the line (sign 0) never counts as a crossing.
    """
    return point_side(prev_point, line) * point_side(curr_point, line) < 0
def bbox_center(bbox: Tuple[int, int, int, int]) -> Tuple[float, float]:
    """Midpoint of an (x1, y1, x2, y2) bounding box."""
    left, top, right, bottom = bbox
    return ((left + right) / 2.0, (top + bottom) / 2.0)
def determine_outside_side(line: Tuple[Tuple[float, float], Tuple[float, float]],
                           frame_height: int) -> float:
    """Sign of the 'outside' half-plane relative to the counting line.

    Uses the line's vertical midpoint: when the line sits in the upper half
    of the frame, 'outside' is toward the top edge (y = 0); otherwise it is
    toward the bottom edge (y = frame_height). Returns the side sign of a
    reference point placed on that edge.
    """
    (x1, y1), (x2, y2) = line
    mid_x = (x1 + x2) / 2.0
    mid_y = (y1 + y2) / 2.0
    ref_y = 0.0 if mid_y < frame_height / 2.0 else float(frame_height)
    # Cross product with the same sign convention as point_side((mid_x, ref_y), line).
    return (mid_x - x1) * (y2 - y1) - (ref_y - y1) * (x2 - x1)
def process_video(
    video_path: str,
    detection_model: str,
    detection_mode: str,
    confidence: float,
    line_position: float,
    track_buffer: int,
    activation_threshold: float,
):
    """Detect, track, and count objects crossing a horizontal line in a video.

    Args:
        video_path: Input video path (None when the user uploaded nothing).
        detection_model: Display name understood by get_model().
        detection_mode: Key into DETECTION_MODES; unknown keys fall back to
            "All Objects (Street)".
        confidence: Detection confidence threshold passed to the model.
        line_position: Counting-line height as a fraction of the frame
            (0 = top, 1 = bottom).
        track_buffer: ByteTrack lost-track buffer, in frames.
        activation_threshold: ByteTrack track-activation threshold.

    Returns:
        (annotated_video_path, markdown_stats) on success, or
        (None, error_message) when the input is missing or unreadable.
    """
    # NOTE(review): on a ZeroGPU Space this entry point would normally carry
    # a @spaces.GPU decorator — confirm GPU allocation is handled elsewhere.
    if video_path is None:
        return None, "Please upload a video file."

    # Resolve model and the set of COCO class ids to keep.
    model = get_model(detection_model)
    mode_config = DETECTION_MODES.get(detection_mode, DETECTION_MODES["All Objects (Street)"])
    target_class_ids = set(mode_config["class_ids"])
    class_labels = mode_config["labels"]

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return None, "Failed to open video file."
    fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30  # some files report 0 fps
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    # mkstemp instead of the deprecated, race-prone tempfile.mktemp.
    fd, output_path = tempfile.mkstemp(suffix=".mp4")
    os.close(fd)
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    # ByteTrack tuned for dense scenes.
    tracker = sv.ByteTrack(
        track_activation_threshold=activation_threshold,
        lost_track_buffer=track_buffer,
        minimum_matching_threshold=0.7,
        frame_rate=fps,
    )

    # Horizontal counting line in absolute pixel coordinates.
    line_y = int(height * line_position)
    line_start = (0, line_y)
    line_end = (width, line_y)
    abs_line = ((0.0, float(line_y)), (float(width), float(line_y)))
    outside_side = determine_outside_side(abs_line, height)

    box_annotator = sv.BoxAnnotator(thickness=2)
    label_annotator = sv.LabelAnnotator(text_scale=0.4, text_thickness=1)
    trace_annotator = sv.TraceAnnotator(thickness=1, trace_length=50)

    # Per-track state: last seen center, class label, and whether the track
    # has already been counted (each track is counted at most once).
    track_last_center: Dict[int, Tuple[float, float]] = {}
    track_class: Dict[int, str] = {}
    counted_tracks: set = set()

    total_in, total_out = 0, 0
    class_counts: Dict[str, Dict[str, int]] = defaultdict(lambda: {"in": 0, "out": 0})
    frame_idx = 0
    max_simultaneous = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        results = model.predict(frame, conf=confidence, verbose=False)[0]

        # Keep only detections whose class id belongs to the selected mode.
        boxes = results.boxes
        if boxes is not None and len(boxes) > 0:
            mask = np.array([int(cls) in target_class_ids for cls in boxes.cls])
            if mask.any():
                filtered_boxes = boxes[mask]
                detections = sv.Detections(
                    xyxy=filtered_boxes.xyxy.cpu().numpy(),
                    confidence=filtered_boxes.conf.cpu().numpy(),
                    class_id=filtered_boxes.cls.cpu().numpy().astype(int),
                )
            else:
                detections = sv.Detections.empty()
        else:
            detections = sv.Detections.empty()

        detections = tracker.update_with_detections(detections)
        max_simultaneous = max(max_simultaneous, len(detections))

        # Geometric line-crossing check per tracked object.
        if detections.tracker_id is not None:
            for idx in range(len(detections)):
                track_id = int(detections.tracker_id[idx])
                x1, y1, x2, y2 = detections.xyxy[idx]
                class_id = int(detections.class_id[idx]) if detections.class_id is not None else 0
                class_name = class_labels.get(class_id, f"class_{class_id}")
                current_center = bbox_center((int(x1), int(y1), int(x2), int(y2)))
                track_class[track_id] = class_name

                if track_id in track_last_center and track_id not in counted_tracks:
                    prev_center = track_last_center[track_id]
                    if crossed_line(prev_center, current_center, abs_line):
                        prev_side = point_side(prev_center, abs_line)
                        curr_side = point_side(current_center, abs_line)
                        # Moving from the "outside" half-plane inward counts
                        # as IN; the reverse direction counts as OUT.
                        if prev_side * outside_side >= 0 and curr_side * outside_side < 0:
                            total_in += 1
                            class_counts[class_name]["in"] += 1
                        elif prev_side * outside_side < 0 and curr_side * outside_side >= 0:
                            total_out += 1
                            class_counts[class_name]["out"] += 1
                        counted_tracks.add(track_id)

                track_last_center[track_id] = current_center

        # Draw counting line, traces, boxes, labels, and a stats overlay.
        annotated = frame.copy()
        cv2.line(annotated, line_start, line_end, (0, 0, 255), 3)
        cv2.putText(annotated, "COUNTING LINE", (10, line_y - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2)
        annotated = trace_annotator.annotate(annotated, detections)
        annotated = box_annotator.annotate(annotated, detections)
        labels = []
        if detections.tracker_id is not None:
            for idx in range(len(detections)):
                class_id = int(detections.class_id[idx]) if detections.class_id is not None else 0
                class_name = class_labels.get(class_id, f"class_{class_id}")
                track_id = int(detections.tracker_id[idx])
                labels.append(f"{class_name} #{track_id}")
        annotated = label_annotator.annotate(annotated, detections, labels)

        overlay_h = 80
        cv2.rectangle(annotated, (5, 5), (300, overlay_h), (0, 0, 0), -1)
        cv2.putText(annotated, f"IN: {total_in} | OUT: {total_out}", (15, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        cv2.putText(annotated, f"Net: {total_in - total_out} | Now: {len(detections)}", (15, 55),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        cv2.putText(annotated, f"Frame: {frame_idx}/{total_frames}", (15, 75),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.4, (200, 200, 200), 1)

        out.write(annotated)
        frame_idx += 1

    cap.release()
    out.release()

    # Re-encode to H.264 for browser playback. subprocess.run with an
    # argument list (no shell) replaces the previous os.system() f-string,
    # which broke on paths containing spaces and was injection-prone.
    fd, final_path = tempfile.mkstemp(suffix=".mp4")
    os.close(fd)
    subprocess.run(
        ["ffmpeg", "-y", "-loglevel", "quiet", "-i", output_path,
         "-c:v", "libx264", "-preset", "fast", "-crf", "23", final_path],
        check=False,
    )
    if os.path.exists(final_path) and os.path.getsize(final_path) > 0:
        os.remove(output_path)
        output_path = final_path

    # Build the markdown statistics report.
    unique_tracks = len(track_last_center)
    stats = "## Counting Results\n\n"
    stats += f"**Total Entered:** {total_in}\n"
    stats += f"**Total Exited:** {total_out}\n"
    stats += f"**Net Count:** {total_in - total_out}\n"
    stats += f"**Unique Tracks:** {unique_tracks}\n"
    stats += f"**Max Simultaneous:** {max_simultaneous}\n\n"
    if class_counts:
        stats += "### By Class\n"
        for cls, counts in sorted(class_counts.items()):
            net = counts['in'] - counts['out']
            stats += f"- **{cls}**: IN={counts['in']}, OUT={counts['out']}, Net={net}\n"
    stats += "\n### Video Info\n"
    stats += f"- Frames: {frame_idx}\n"
    stats += f"- Resolution: {width}x{height}\n"
    stats += f"- FPS: {fps}\n"
    return output_path, stats
# ---------------------------------------------------------------------------
# Gradio interface: left column holds inputs/settings, right column results.
# ---------------------------------------------------------------------------
with gr.Blocks(analytics_enabled=False, title="CCTV Customer Analytics") as demo:
    # Intro / usage tips shown at the top of the page.
    gr.Markdown("""
    # CCTV Customer Analytics
    Advanced object detection, tracking, and counting system.
    Optimized for counting large numbers of animals and vehicles in crowded scenes.
    **Tips for best results:**
    - Use **RT-DETR** model for dense/crowded scenes (sheep flocks, traffic)
    - Lower **confidence** (0.15-0.25) to detect more objects
    - Increase **track buffer** (60-90) for objects that temporarily disappear
    - Adjust **line position** to where objects cross most clearly
    """)
    with gr.Row():
        with gr.Column(scale=1):
            # Input video and model/mode selection; choices must match the
            # keys handled by get_model() and DETECTION_MODES.
            video_input = gr.Video(label="Upload Video")
            model_dropdown = gr.Dropdown(
                choices=[
                    "YOLOv8n (Fast)",
                    "YOLOv8s (Balanced)",
                    "YOLOv8m (Accurate)",
                    "YOLOv8x (Best YOLO)",
                    "RT-DETR-L (Dense Scenes)",
                ],
                value="YOLOv8s (Balanced)",
                label="Detection Model",
            )
            mode_dropdown = gr.Dropdown(
                choices=list(DETECTION_MODES.keys()),
                value="All Objects (Street)",
                label="Detection Mode",
            )
            confidence_slider = gr.Slider(
                0.05, 0.9, value=0.25, step=0.05,
                label="Confidence Threshold",
                info="Lower = more detections, higher = fewer false positives"
            )
            line_slider = gr.Slider(
                0.1, 0.9, value=0.5, step=0.05,
                label="Line Position",
                info="Vertical position of counting line (0=top, 1=bottom)"
            )
            # Tracker knobs forwarded straight to sv.ByteTrack in process_video.
            with gr.Accordion("Advanced Tracking Settings", open=False):
                track_buffer = gr.Slider(
                    10, 120, value=45, step=5,
                    label="Track Buffer",
                    info="Frames to keep lost tracks (higher for crowded scenes)"
                )
                activation_threshold = gr.Slider(
                    0.1, 0.5, value=0.2, step=0.05,
                    label="Track Activation Threshold",
                    info="Lower = easier to start new tracks"
                )
            submit_btn = gr.Button("Process Video", variant="primary", size="lg")
        with gr.Column(scale=1):
            # Annotated output video and the markdown statistics report.
            video_output = gr.Video(label="Processed Video")
            stats_output = gr.Markdown(label="Statistics")
    # Wire the button to the processing pipeline; input order must match the
    # process_video signature. api_name=False keeps it out of the API docs.
    submit_btn.click(
        fn=process_video,
        inputs=[
            video_input, model_dropdown, mode_dropdown,
            confidence_slider, line_slider, track_buffer, activation_threshold
        ],
        outputs=[video_output, stats_output],
        api_name=False,
    )
    # Footer with model/mode reference notes.
    gr.Markdown("""
    ---
    **Models:**
    - **YOLOv8n/s/m/x**: General purpose, good for most scenarios
    - **RT-DETR-L**: Transformer-based, better for dense/crowded scenes (recommended for sheep counting)
    **Detection Modes:**
    - **All Objects**: People + vehicles + animals
    - **Animals**: Sheep, cows, horses
    - **Sheep Only**: Optimized for sheep counting
    """)

if __name__ == "__main__":
    # Launch the app when executed directly (the Spaces runtime imports this module).
    demo.launch()