Spaces:
Sleeping
Sleeping
| import cv2 | |
| import pandas as pd | |
| import math | |
| from ultralytics import YOLO | |
| from tracker import Tracker | |
| from datetime import datetime | |
| def estimate_speed(p1, p2, fps, ppm=8): | |
| """Estimate speed in km/h from two points, frames per second, and pixels-per-meter scale.""" | |
| distance_pixels = math.hypot(p2[0] - p1[0], p2[1] - p1[1]) | |
| distance_meters = distance_pixels / ppm | |
| speed_mps = distance_meters * fps | |
| speed_kmph = speed_mps * 3.6 | |
| return speed_kmph | |
| def run_speed_estimation(video_path, output_path): | |
| model = YOLO('yolov8s.pt') | |
| with open("coco.txt", "r") as my_file: | |
| class_list = my_file.read().split("\n") | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| print(f"Error opening video file: {video_path}") | |
| return | |
| frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| fps = int(cap.get(cv2.CAP_PROP_FPS)) | |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
| out = cv2.VideoWriter(output_path, fourcc, fps, (1020, 500)) | |
| tracker = Tracker() | |
| count = 0 | |
| log_data = [] | |
| while True: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| count += 1 | |
| if count % 3 != 0: | |
| continue | |
| frame = cv2.resize(frame, (1020, 500)) | |
| timestamp = count / fps | |
| results = model.predict(frame) | |
| boxes = results[0].boxes.data | |
| px = pd.DataFrame(boxes).astype("float") | |
| det_list = [] | |
| for _, row in px.iterrows(): | |
| x1, y1, x2, y2 = map(int, row[:4]) | |
| class_id = int(row[5]) | |
| label = class_list[class_id] | |
| if 'car' in label: | |
| det_list.append([x1, y1, x2, y2]) | |
| bbox_id = tracker.update(det_list) | |
| for bbox in bbox_id: | |
| x3, y3, x4, y4, obj_id = bbox | |
| cx = (x3 + x4) // 2 | |
| cy = (y3 + y4) // 2 | |
| center = (cx, cy) | |
| # Estimate speed | |
| speed = 0 | |
| history = tracker.track_history[obj_id] | |
| if len(history) >= 2: | |
| speed = estimate_speed(history[-2], history[-1], fps) | |
| speed = round(speed, 2) | |
| # Draw info | |
| cv2.circle(frame, center, 4, (0, 0, 255), -1) | |
| cv2.putText(frame, f"ID {obj_id} | {speed} km/h", (cx, cy), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2) | |
| # Save to CSV log | |
| log_data.append({ | |
| 'timestamp_sec': round(timestamp, 2), | |
| 'vehicle_id': obj_id, | |
| 'x': cx, | |
| 'y': cy, | |
| 'speed_kmph': speed | |
| }) | |
| out.write(frame) | |
| cap.release() | |
| out.release() | |
| # Save CSV log | |
| df = pd.DataFrame(log_data) | |
| df.to_csv('static/vehicle_log.csv', index=False) | |
| print(f"Processing complete. Output saved to {output_path}") | |