|
|
from ultralytics import YOLO |
|
|
import matplotlib.pyplot as plt |
|
|
import matplotlib.image as mpimg |
|
|
import os |
|
|
import shutil |
|
|
import cv2 |
|
|
import subprocess |
|
|
import time |
|
|
|
|
|
|
|
|
|
|
|
def detectObjects(imageFile, confidence_score):
    """Run YOLOv8 object detection on one image and return the annotated copy.

    The ``yolov8n.pt`` weights are loaded, prediction is run with
    ``save=True`` so the library writes an annotated image to disk, every
    detected box is printed, and the saved image is read back.

    Args:
        imageFile: Path (or bare file name) of the image to analyse.
        confidence_score: Value forwarded to ``model.predict`` as ``conf``.

    Returns:
        The annotated image as returned by ``matplotlib.image.imread``.

    Raises:
        FileNotFoundError: If no annotated image is found in the expected
            output directory after prediction.
    """
    current_path = os.getcwd()
    print(f"Current working directory: {current_path}")

    model = YOLO("yolov8n.pt", verbose=False)

    # Directory the annotated image is expected in.
    # NOTE(review): whether Ultralytics honours a ``save_dir`` keyword depends
    # on the installed version; this path also matches the library's usual
    # ``runs/detect/predict`` location when run from the current directory.
    custom_save_dir = f"{current_path}/runs/detect/predict"

    results = model.predict(
        source=f"{imageFile}",
        save=True,
        conf=confidence_score,
        save_dir=custom_save_dir,
        exist_ok=True,
    )

    for result in results:
        for box in result.boxes:
            print(f"Class: {model.names[int(box.cls)]}, Confidence: {box.conf}, Coordinates: {box.xyxy}")

    # The saved file is named after the input's base name, so strip any
    # directory part instead of appending the whole input path.
    image_path = os.path.join(custom_save_dir, os.path.basename(f"{imageFile}"))

    if not os.path.exists(image_path):
        # NOTE(review): some Ultralytics releases appear to write annotated
        # images with a ".jpg" suffix regardless of the input extension;
        # confirm against the installed version.
        jpg_candidate = os.path.splitext(image_path)[0] + ".jpg"
        if os.path.exists(jpg_candidate):
            image_path = jpg_candidate

    img = mpimg.imread(image_path)
    return img
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def detectVideo(videoFile, confidence_score):
    """Annotate every frame of a video with YOLOv8 detections.

    Frames are read with OpenCV, passed through the ``yolov8n.pt`` model,
    and the plotted result is written to ``annotated_<name>`` using the
    ``mp4v`` codec. That file is then re-encoded with ffmpeg (``libx264``)
    into ``repaired_annotated_<name>``. Both files are placed in the same
    directory as the input; for a bare file name that is the current
    working directory.

    Args:
        videoFile: Path (or bare file name) of the video to process.
        confidence_score: Value forwarded to the model as ``conf`` for
            every frame.

    Returns:
        The path of the re-encoded video. The path is returned even when
        the ffmpeg step was skipped or failed, in which case the file may
        not exist.
    """
    # Prefix the file name only, so inputs that include a directory still
    # yield a valid output location.
    folder, file_name = os.path.split(f"{videoFile}")
    output_path = os.path.join(folder, f"annotated_{file_name}")
    repaired_path = os.path.join(folder, f"repaired_annotated_{file_name}")

    model = YOLO("yolov8n.pt", verbose=False)

    cap = cv2.VideoCapture(videoFile)
    out = None
    try:
        # Keep the frame rate as a float so fractional rates (e.g. 29.97)
        # are not truncated; fall back to 30 when the container reports none.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps or fps <= 0:
            fps = 30.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            results = model(frame, conf=confidence_score)
            annotated_frame = results[0].plot()
            out.write(annotated_frame)
    finally:
        # Release both handles exactly once, and before ffmpeg reads the
        # annotated file, even if inference raised part-way through.
        cap.release()
        if out is not None:
            out.release()

    command = [
        'ffmpeg', '-y',
        '-i', output_path,
        '-c:v', 'libx264',
        '-c:a', 'aac',
        repaired_path,
    ]

    # Only start ffmpeg once the annotated file can actually be opened.
    if retry_file_access(output_path):
        try:
            subprocess.run(command, check=True)
        except subprocess.CalledProcessError as e:
            print(f"Error occurred: {e}")
        else:
            print("Video processed successfully")

    return repaired_path
|
|
|
|
|
|
|
|
|
|
|
def retry_file_access(file_path, retries=3, delay=2):
    """Report whether *file_path* can be opened for reading, with retries.

    The file is opened in binary read mode up to ``retries`` times. A
    failed attempt prints a progress message and waits ``delay`` seconds
    before the next one; no wait follows the final attempt.

    Args:
        file_path: Path of the file to probe.
        retries: Maximum number of open attempts.
        delay: Seconds to wait between consecutive attempts.

    Returns:
        True as soon as one attempt opens the file, False when every
        attempt failed (or ``retries`` is not positive).
    """
    for attempt in range(1, retries + 1):
        try:
            with open(file_path, 'rb'):
                return True
        except OSError:
            print(f"File is not ready yet. Retrying... {attempt}/{retries}")
            # Waiting only makes sense if another attempt follows.
            if attempt < retries:
                time.sleep(delay)

    print("File is not accessible after multiple retries.")
    return False
|
|
|
|
|
|