SafetyNet / app.py
Vansh180's picture
Update app.py
f111856 verified
import gradio as gr
from ultralytics import YOLO
import cv2
from PIL import Image
import numpy as np
import tempfile
import os
# Load the YOLO model - YOLOv11m for pothole, road damage, and garbage detection
try:
model = YOLO("model.pt")
except Exception as e:
print(f"Error loading model: {e}")
model = None
def predict_image(image, conf_threshold):
try:
if image is None or model is None:
return None, "Model not loaded or invalid image."
results = model(image, imgsz=768, conf=conf_threshold)
result = results[0]
annotated_image = result.plot()
annotated_image_rgb = cv2.cvtColor(annotated_image, cv2.COLOR_BGR2RGB)
boxes = result.boxes
class_names = result.names
if len(boxes) == 0:
detection_summary = "No civic issues detected in this image."
else:
detection_counts = {}
for box in boxes:
cls_id = int(box.cls.item() if hasattr(box.cls, "item") else box.cls[0])
cls_name = class_names.get(cls_id, f"Class {cls_id}")
detection_counts[cls_name] = detection_counts.get(cls_name, 0) + 1
summary_lines = ["**Detections:**"]
for cls_name, count in detection_counts.items():
summary_lines.append(f"- {count} {cls_name}(s)")
detection_summary = "\n".join(summary_lines)
return Image.fromarray(annotated_image_rgb), detection_summary
except Exception as e:
import traceback
error_msg = f"ERROR during prediction: {str(e)}\n\nTraceback:\n{traceback.format_exc()}"
return None, error_msg
def predict_video(video_path, conf_threshold, progress=gr.Progress()):
try:
if video_path is None or model is None:
return None, "Model not loaded or no video provided."
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return None, "Could not open video file."
# Video properties
fps = cap.get(cv2.CAP_PROP_FPS) or 25
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# Output temp file
out_path = tempfile.mktemp(suffix=".mp4")
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter(out_path, fourcc, fps, (width, height))
all_detection_counts = {}
frame_idx = 0
while True:
ret, frame = cap.read()
if not ret:
break
# Update progress
if total_frames > 0:
progress(frame_idx / total_frames, desc=f"Processing frame {frame_idx}/{total_frames}")
# Run inference on frame (BGR numpy array works directly)
results = model(frame, imgsz=768, conf=conf_threshold, verbose=False)
result = results[0]
# Annotate frame
annotated_frame = result.plot()
out.write(annotated_frame)
# Accumulate detections
for box in result.boxes:
cls_id = int(box.cls.item() if hasattr(box.cls, "item") else box.cls[0])
cls_name = result.names.get(cls_id, f"Class {cls_id}")
all_detection_counts[cls_name] = all_detection_counts.get(cls_name, 0) + 1
frame_idx += 1
cap.release()
out.release()
# Re-encode with H.264 for browser compatibility (requires ffmpeg)
final_path = tempfile.mktemp(suffix=".mp4")
os.system(f'ffmpeg -y -i "{out_path}" -vcodec libx264 -crf 23 -preset fast "{final_path}" -loglevel quiet')
if os.path.exists(final_path) and os.path.getsize(final_path) > 0:
os.remove(out_path)
out_path = final_path
# Build summary
if not all_detection_counts:
summary = f"Processed {frame_idx} frames.\nNo civic issues detected in this video."
else:
summary_lines = [f"Processed {frame_idx} frames.\n\n**Total Detections Across All Frames:**"]
for cls_name, count in sorted(all_detection_counts.items(), key=lambda x: -x[1]):
summary_lines.append(f"- {count} {cls_name}(s)")
summary = "\n".join(summary_lines)
return out_path, summary
except Exception as e:
import traceback
error_msg = f"ERROR during video prediction: {str(e)}\n\nTraceback:\n{traceback.format_exc()}"
return None, error_msg
# ── Gradio Interface ──────────────────────────────────────────────────────────
with gr.Blocks(title="PotholeNet-YOLO11m-v1 πŸ›‘") as interface:
gr.Markdown("# πŸ›‘ PotholeNet-YOLO11m-v1")
gr.Markdown(
"**Aamchi City AI Civic System** β€” Real-time pothole, road damage, and garbage detection for Indian urban roads."
)
gr.Markdown(
"Upload an image **or video** of a road to detect infrastructure issues. "
"The model was trained on 23,000+ street-level images."
)
with gr.Tabs():
# ── Image Tab ────────────────────────────────────────────────────────
with gr.TabItem("πŸ–ΌοΈ Image Detection"):
with gr.Row():
with gr.Column():
input_image = gr.Image(type="pil", label="Upload Street Image")
img_conf_slider = gr.Slider(
minimum=0.01, maximum=1.0, value=0.25, step=0.01,
label="Confidence Threshold"
)
img_submit_btn = gr.Button("Detect Civic Issues", variant="primary")
with gr.Column():
output_image = gr.Image(type="pil", label="Detection Results")
img_detection_text = gr.Textbox(
label="Detection Summary", interactive=False, lines=4
)
img_submit_btn.click(
fn=predict_image,
inputs=[input_image, img_conf_slider],
outputs=[output_image, img_detection_text],
)
# ── Video Tab ────────────────────────────────────────────────────────
with gr.TabItem("🎬 Video Detection"):
gr.Markdown(
"> ⚠️ **Note:** Video processing is frame-by-frame and may take a while depending on length and hardware."
)
with gr.Row():
with gr.Column():
input_video = gr.Video(
label="Upload Street Video",
sources=["upload"],
format="mp4",
)
vid_conf_slider = gr.Slider(
minimum=0.01, maximum=1.0, value=0.25, step=0.01,
label="Confidence Threshold"
)
vid_submit_btn = gr.Button("Detect Civic Issues in Video", variant="primary")
with gr.Column():
output_video = gr.Video(label="Annotated Video")
vid_detection_text = gr.Textbox(
label="Detection Summary", interactive=False, lines=6
)
vid_submit_btn.click(
fn=predict_video,
inputs=[input_video, vid_conf_slider],
outputs=[output_video, vid_detection_text],
)
gr.Markdown("### Intended Use")
gr.Markdown(
"Real-time pothole detection, Automated civic issue reporting, Infrastructure health monitoring."
)
gr.Markdown("**Developer:** Vansh Momaya")
if __name__ == "__main__":
interface.launch(server_name="0.0.0.0", server_port=7860)