File size: 8,080 Bytes
c0f6caf
 
 
 
5ea6336
 
 
c0f6caf
 
 
 
 
 
 
 
5ea6336
 
c0f6caf
 
 
5ea6336
c0f6caf
 
5ea6336
c0f6caf
 
5ea6336
c0f6caf
 
5ea6336
c0f6caf
 
 
 
 
 
 
 
5ea6336
c0f6caf
 
 
5ea6336
c0f6caf
5ea6336
c0f6caf
5ea6336
c0f6caf
 
 
 
 
5ea6336
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c0f6caf
 
5ea6336
 
 
 
 
 
c0f6caf
5ea6336
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f111856
 
 
 
 
5ea6336
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c0f6caf
5ea6336
 
 
c0f6caf
 
 
5ea6336
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import gradio as gr
from ultralytics import YOLO
import cv2
from PIL import Image
import numpy as np
import tempfile
import os

# Module-level detector shared by the image and video handlers.
# Weights are read from "model.pt" (YOLOv11m for pothole, road damage, and
# garbage detection). If loading fails for any reason the error is printed
# and `model` stays None so the handlers can report it instead of crashing.
model = None
try:
    model = YOLO("model.pt")
except Exception as load_error:
    print(f"Error loading model: {load_error}")


def predict_image(image, conf_threshold):
    """Detect civic issues in a single image and describe the findings.

    Args:
        image: Image passed straight to the model; ``None`` is rejected.
        conf_threshold: Confidence cutoff forwarded to the model as ``conf``.

    Returns:
        A ``(annotated_image, summary)`` tuple. ``annotated_image`` is a PIL
        image with the detections drawn on it, and ``summary`` is a
        Markdown-style text listing the number of boxes per class. On any
        failure the first element is ``None`` and the second is an error
        message (including a traceback for unexpected exceptions).
    """
    try:
        if image is None or model is None:
            return None, "Model not loaded or invalid image."

        prediction = model(image, imgsz=768, conf=conf_threshold)[0]

        # The plotted array is converted from BGR to RGB before handing it
        # to PIL.
        rendered_rgb = cv2.cvtColor(prediction.plot(), cv2.COLOR_BGR2RGB)

        detections = prediction.boxes
        label_lookup = prediction.names

        if len(detections) == 0:
            summary_text = "No civic issues detected in this image."
        else:
            # Tally boxes per class name, keeping first-seen order.
            per_class = {}
            for det in detections:
                cls_id = int(det.cls.item() if hasattr(det.cls, "item") else det.cls[0])
                label = label_lookup.get(cls_id, f"Class {cls_id}")
                per_class.setdefault(label, 0)
                per_class[label] += 1

            bullet_lines = [f"- {count} {label}(s)" for label, count in per_class.items()]
            summary_text = "\n".join(["**Detections:**", *bullet_lines])

        return Image.fromarray(rendered_rgb), summary_text

    except Exception as exc:
        import traceback
        failure_text = f"ERROR during prediction: {str(exc)}\n\nTraceback:\n{traceback.format_exc()}"
        return None, failure_text


def _new_temp_mp4():
    """Create an empty temporary ``.mp4`` file and return its path.

    Uses ``mkstemp`` so the file is created atomically (unlike the deprecated,
    race-prone ``mktemp``). The descriptor is closed immediately because the
    file is written later by OpenCV / ffmpeg through its path.
    """
    fd, path = tempfile.mkstemp(suffix=".mp4")
    os.close(fd)
    return path


def _annotate_frames(cap, writer, conf_threshold, total_frames, progress):
    """Run detection on every frame from *cap* and write annotated frames.

    Returns ``(frames_processed, counts)`` where ``counts`` maps a class name
    to the number of boxes of that class summed over all frames.
    """
    counts = {}
    frame_idx = 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        if total_frames > 0:
            # Clamp to 1.0: if the reported frame count is lower than the
            # number of frames actually read, the raw ratio would exceed 100%.
            progress(
                min(frame_idx / total_frames, 1.0),
                desc=f"Processing frame {frame_idx}/{total_frames}",
            )

        # Run inference on frame (BGR numpy array works directly)
        result = model(frame, imgsz=768, conf=conf_threshold, verbose=False)[0]
        writer.write(result.plot())

        for box in result.boxes:
            cls_id = int(box.cls.item() if hasattr(box.cls, "item") else box.cls[0])
            cls_name = result.names.get(cls_id, f"Class {cls_id}")
            counts[cls_name] = counts.get(cls_name, 0) + 1

        frame_idx += 1

    return frame_idx, counts


def _reencode_h264(src_path):
    """Re-encode *src_path* to H.264 with ffmpeg for browser compatibility.

    Returns the path of the re-encoded file (and deletes *src_path*) when
    ffmpeg produced a non-empty output. If ffmpeg is missing or fails, the
    placeholder output is removed and *src_path* is returned unchanged.
    """
    import subprocess

    dst_path = _new_temp_mp4()
    command = [
        "ffmpeg", "-y", "-loglevel", "quiet",
        "-i", src_path,
        "-vcodec", "libx264", "-crf", "23", "-preset", "fast",
        dst_path,
    ]
    try:
        # Argument list, no shell: paths are never interpreted by a shell.
        subprocess.run(command, check=False)
    except OSError:
        # ffmpeg is not installed / not executable; keep the mp4v file.
        pass

    if os.path.exists(dst_path) and os.path.getsize(dst_path) > 0:
        os.remove(src_path)
        return dst_path

    # Re-encode failed: drop the empty placeholder and fall back.
    if os.path.exists(dst_path):
        os.remove(dst_path)
    return src_path


def predict_video(video_path, conf_threshold, progress=gr.Progress()):
    """Detect civic issues frame-by-frame in a video and summarise them.

    Args:
        video_path: Path of the input video; ``None`` is rejected.
        conf_threshold: Confidence cutoff forwarded to the model as ``conf``.
        progress: Gradio progress tracker, called once per frame with the
            fraction of frames processed.

    Returns:
        A ``(output_path, summary)`` tuple. ``output_path`` points to the
        annotated video (H.264 if the ffmpeg re-encode succeeded, otherwise
        the original mp4v encoding) and ``summary`` lists the total number of
        boxes per class across all frames, most frequent first. On failure
        the first element is ``None`` and the second is an error message
        (including a traceback for unexpected exceptions).
    """
    try:
        if video_path is None or model is None:
            return None, "Model not loaded or no video provided."

        cap = cv2.VideoCapture(video_path)
        out = None
        out_path = None
        finished = False
        try:
            if not cap.isOpened():
                return None, "Could not open video file."

            # Video properties (fall back to 25 fps when the container
            # reports 0).
            fps = cap.get(cv2.CAP_PROP_FPS) or 25
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            out_path = _new_temp_mp4()
            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            out = cv2.VideoWriter(out_path, fourcc, fps, (width, height))
            if not out.isOpened():
                return None, "Could not create output video file."

            frame_idx, all_detection_counts = _annotate_frames(
                cap, out, conf_threshold, total_frames, progress
            )
            finished = True
        finally:
            # Always release the capture/writer, even if inference raised,
            # and do it before ffmpeg reads the file so it is fully flushed.
            cap.release()
            if out is not None:
                out.release()
            if not finished and out_path is not None:
                # Best-effort cleanup of the partial output file.
                try:
                    os.remove(out_path)
                except OSError:
                    pass

        # Re-encode with H.264 for browser compatibility (requires ffmpeg)
        out_path = _reencode_h264(out_path)

        # Build summary
        if not all_detection_counts:
            summary = f"Processed {frame_idx} frames.\nNo civic issues detected in this video."
        else:
            summary_lines = [f"Processed {frame_idx} frames.\n\n**Total Detections Across All Frames:**"]
            for cls_name, count in sorted(all_detection_counts.items(), key=lambda x: -x[1]):
                summary_lines.append(f"- {count} {cls_name}(s)")
            summary = "\n".join(summary_lines)

        return out_path, summary

    except Exception as e:
        import traceback
        error_msg = f"ERROR during video prediction: {str(e)}\n\nTraceback:\n{traceback.format_exc()}"
        return None, error_msg


# ── Gradio Interface ──────────────────────────────────────────────────────────
# Two-tab UI: one tab for single images, one for videos. Each tab wires its
# inputs (media + confidence slider) to the matching predict_* handler and
# shows the annotated media next to a text summary.
with gr.Blocks(title="PotholeNet-YOLO11m-v1 🛑") as interface:
    gr.Markdown("# 🛑 PotholeNet-YOLO11m-v1")
    gr.Markdown(
        "**Aamchi City AI Civic System** — Real-time pothole, road damage, and garbage detection for Indian urban roads."
    )
    gr.Markdown(
        "Upload an image **or video** of a road to detect infrastructure issues. "
        "The model was trained on 23,000+ street-level images."
    )

    with gr.Tabs():
        # ── Image Tab ────────────────────────────────────────────────────────
        with gr.TabItem("🖼️ Image Detection"):
            with gr.Row():
                # Left column: inputs.
                with gr.Column():
                    input_image = gr.Image(type="pil", label="Upload Street Image")
                    img_conf_slider = gr.Slider(
                        minimum=0.01, maximum=1.0, value=0.25, step=0.01,
                        label="Confidence Threshold"
                    )
                    img_submit_btn = gr.Button("Detect Civic Issues", variant="primary")

                # Right column: outputs.
                with gr.Column():
                    output_image = gr.Image(type="pil", label="Detection Results")
                    img_detection_text = gr.Textbox(
                        label="Detection Summary", interactive=False, lines=4
                    )

            img_submit_btn.click(
                fn=predict_image,
                inputs=[input_image, img_conf_slider],
                outputs=[output_image, img_detection_text],
            )

        # ── Video Tab ────────────────────────────────────────────────────────
        with gr.TabItem("🎬 Video Detection"):
            gr.Markdown(
                "> ⚠️ **Note:** Video processing is frame-by-frame and may take a while depending on length and hardware."
            )
            with gr.Row():
                # Left column: inputs (upload only).
                with gr.Column():
                    input_video = gr.Video(
                        label="Upload Street Video",
                        sources=["upload"],
                        format="mp4",
                    )
                    vid_conf_slider = gr.Slider(
                        minimum=0.01, maximum=1.0, value=0.25, step=0.01,
                        label="Confidence Threshold"
                    )
                    vid_submit_btn = gr.Button("Detect Civic Issues in Video", variant="primary")

                # Right column: outputs.
                with gr.Column():
                    output_video = gr.Video(label="Annotated Video")
                    vid_detection_text = gr.Textbox(
                        label="Detection Summary", interactive=False, lines=6
                    )

            vid_submit_btn.click(
                fn=predict_video,
                inputs=[input_video, vid_conf_slider],
                outputs=[output_video, vid_detection_text],
            )

    gr.Markdown("### Intended Use")
    gr.Markdown(
        "Real-time pothole detection, Automated civic issue reporting, Infrastructure health monitoring."
    )
    gr.Markdown("**Developer:** Vansh Momaya")

if __name__ == "__main__":
    # Script entry point: serve the UI on every network interface, port 7860.
    launch_kwargs = {"server_name": "0.0.0.0", "server_port": 7860}
    interface.launch(**launch_kwargs)