import glob
import os
import shutil
import subprocess
import time
from pathlib import Path

import cv2
import matplotlib.image as mpimg
import matplotlib.pyplot as plt
from ultralytics import YOLO
|
|
| |
|
|
def detectTrafficObjects(imageFile, confidence_score):
    """Run the traffic-sign YOLO model on one image and return the annotated image.

    The model is asked to save its annotated output, every detected box is
    printed, and the saved file is then read back from the prediction
    directory.

    Args:
        imageFile: Path or file name of the image to analyse.
        confidence_score: Value forwarded to ``model.predict`` as ``conf``.

    Returns:
        The annotated image as loaded by ``matplotlib.image.imread``.

    Raises:
        FileNotFoundError: If the prediction directory contains no file whose
            stem matches the input image.
    """
    print(f"Current working directory: {os.getcwd()}")
    print(f"Input image: {imageFile}")

    model = YOLO("./yolov8n_trained_traffic_signs.pt")

    # Single source of truth for where annotated output is expected.
    # NOTE(review): whether ``predict`` honours ``save_dir`` may depend on the
    # installed Ultralytics version -- confirm that output really lands here.
    save_dir = "/app/runs/detect/predict"
    pred_dir = Path(save_dir)

    results = model.predict(
        source=str(imageFile),
        save=True,
        conf=confidence_score,
        save_dir=save_dir,
        exist_ok=True,
    )

    for result in results:
        for box in result.boxes:
            print(f"Class: {model.names[int(box.cls)]}, Confidence: {box.conf}, Coordinates: {box.xyxy}")

    # The saved file may not keep the input's extension, so match on the stem.
    # The stem is escaped so that characters such as "[" or "*" in the file
    # name are matched literally instead of being treated as glob syntax.
    stem = Path(str(imageFile)).stem
    # Sorted so the chosen file is deterministic when several share the stem.
    matches = sorted(pred_dir.glob(f"{glob.escape(stem)}.*"))

    if not matches:
        raise FileNotFoundError(f"No output image found for {stem} in {pred_dir}")

    return mpimg.imread(str(matches[0]))
| |
| |
| |
| |
|
|
def detectTrafficVideo(videoFile, confidence_score):
    """Annotate every frame of a video with YOLO detections and re-encode it.

    Frames are read with OpenCV, passed through the traffic-sign model, and
    the plotted frames are written to ``annotated_<name>`` using the ``mp4v``
    codec. That file is then re-encoded with ffmpeg (libx264) into
    ``repaired_annotated_<name>``.

    Args:
        videoFile: Path or file name of the input video.
        confidence_score: Value forwarded to the model as ``conf`` for every
            frame.

    Returns:
        The path of the re-encoded video. The path is returned even when the
        ffmpeg step failed or was skipped; in that case a message is printed
        and the file may not exist.
    """
    model = YOLO("./yolov8n_trained_traffic_signs.pt", verbose=False)

    # Build the output names from the file name only. Prefixing the whole
    # input path would turn "/data/clip.mp4" into "annotated_/data/clip.mp4",
    # which points into a directory that does not exist.
    output_path = f"annotated_{Path(videoFile).name}"
    repaired_path = f"repaired_{output_path}"

    cap = cv2.VideoCapture(videoFile)
    out = None
    try:
        # Keep the fractional frame rate (e.g. 29.97); truncating it to an int
        # changes the duration of the output. Fall back to 30 when the
        # container reports no usable value.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = 30.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Apply the caller's confidence threshold on every frame.
            results = model(frame, conf=confidence_score)
            out.write(results[0].plot())
    finally:
        # Release both handles even if inference fails, and before ffmpeg
        # reads the annotated file.
        cap.release()
        if out is not None:
            out.release()

    command = [
        'ffmpeg', '-y',
        '-i', output_path,
        '-c:v', 'libx264',
        '-c:a', 'aac',
        repaired_path
    ]

    if retry_file_access(output_path):
        try:
            subprocess.run(command, check=True)
            print("Video processed successfully")
        except subprocess.CalledProcessError as e:
            print(f"Error occurred: {e}")

    return repaired_path
| |
| |
|
|
def retry_file_access(file_path, retries=3, delay=2):
    """Check whether a file can be opened for reading, retrying on failure.

    Args:
        file_path: Path of the file to open in binary read mode.
        retries: Maximum number of open attempts.
        delay: Seconds to wait between two consecutive attempts.

    Returns:
        True as soon as the file opens successfully, False if every attempt
        fails (or if ``retries`` is not positive).
    """
    for attempt in range(1, retries + 1):
        try:
            with open(file_path, 'rb'):
                return True
        except OSError:
            print(f"File is not ready yet. Retrying... {attempt}/{retries}")
            # Only wait when another attempt follows; sleeping after the last
            # failure would just delay the negative result.
            if attempt < retries:
                time.sleep(delay)
    print("File is not accessible after multiple retries.")
    return False
|
|
|
|