Spaces:
Running
Running
| """ | |
| app.py | |
| ------ | |
| Streamlit Dashboard for Autonomous Vehicle Obstacle Detection. | |
| Deployed on Hugging Face Spaces (Streamlit SDK). | |
| Sections: | |
| πΌοΈ Image Detection β Upload & analyse images | |
| π¬ Video Detection β Process video files | |
| π· Webcam β Real-time live detection | |
| π Analytics β Class distribution & confidence charts | |
| """ | |
| # βββ Standard Library ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| import gc | |
| import io | |
| import sys | |
| import tempfile | |
| import time | |
| import warnings | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| warnings.filterwarnings("ignore") | |
| # βββ Third-Party βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| import cv2 | |
| import numpy as np | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| import streamlit as st | |
| from PIL import Image | |
| # βββ Project root on sys.path βββββββββββββββββββββββββββββββββββββββββββββββββ | |
| ROOT = Path(__file__).parent | |
| if str(ROOT) not in sys.path: | |
| sys.path.insert(0, str(ROOT)) | |
| # βββ Page Config (must be first Streamlit call) βββββββββββββββββββββββββββββββ | |
| st.set_page_config( | |
| page_title="π Obstacle Detection Dashboard", | |
| page_icon="π", | |
| layout="wide", | |
| initial_sidebar_state="expanded", | |
| menu_items={ | |
| "Get Help": "https://github.com/pun33th45/autonomous-vehicle-obstacle-detection-yolo", | |
| "Report a bug": "https://github.com/pun33th45/autonomous-vehicle-obstacle-detection-yolo/issues", | |
| "About": "YOLOv8-powered Autonomous Vehicle Obstacle Detection System", | |
| }, | |
| ) | |
| # βββ Constants ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Automotive COCO class names as returned by ultralytics/YOLOv8 | |
| CLASS_NAMES: List[str] = [ | |
| "person", "bicycle", "car", "motorcycle", | |
| "bus", "truck", "traffic light", "stop sign", | |
| ] | |
| CLASS_ICONS: Dict[str, str] = { | |
| "person": "πΆ", | |
| "bicycle": "π²", | |
| "car": "π", | |
| "motorcycle": "ποΈ", | |
| "bus": "π", | |
| "truck": "π", | |
| "traffic light": "π¦", | |
| "stop sign": "π", | |
| } | |
| # Distinct colour palette (BGR for OpenCV, RGB for display) | |
| CLASS_COLORS_BGR: List[Tuple[int, int, int]] = [ | |
| (0, 200, 50), # person β green | |
| (255, 140, 0), # bicycle β orange | |
| (30, 80, 255), # car β blue | |
| (200, 0, 200), # motorcycle β magenta | |
| (0, 220, 220), # bus β cyan | |
| (150, 0, 150), # truck β purple | |
| (0, 200, 255), # traffic light β yellow-blue | |
| (50, 255, 150), # stop sign β teal | |
| ] | |
| CLASS_COLORS_HEX: List[str] = [ | |
| "#32C832", "#FF8C00", "#1E50FF", "#C800C8", | |
| "#00DCDC", "#960096", "#00C8FF", "#32FF96", | |
| ] | |
| # Resize images longer than this before inference (keeps UI responsive) | |
| MAX_INFER_SIZE = 640 | |
| # βββ Inline CSS βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| st.markdown(""" | |
| <style> | |
| .main { background-color: #0e1117; } | |
| [data-testid="metric-container"] { | |
| background: linear-gradient(135deg, #1a1f2e, #252d40); | |
| border: 1px solid #2d3548; | |
| border-radius: 12px; | |
| padding: 16px 20px; | |
| box-shadow: 0 4px 15px rgba(0,0,0,0.3); | |
| } | |
| [data-testid="metric-container"] label { | |
| color: #8b9dc3 !important; | |
| font-size: 0.82rem !important; | |
| text-transform: uppercase; | |
| letter-spacing: 0.08em; | |
| } | |
| [data-testid="metric-container"] [data-testid="stMetricValue"] { | |
| color: #e0e6f0 !important; | |
| font-size: 1.9rem !important; | |
| font-weight: 700; | |
| } | |
| .det-card { | |
| background: #1a1f2e; | |
| border-left: 4px solid; | |
| border-radius: 8px; | |
| padding: 10px 14px; | |
| margin: 6px 0; | |
| } | |
| .section-header { | |
| background: linear-gradient(90deg, #1a237e, #283593); | |
| color: white; | |
| padding: 10px 20px; | |
| border-radius: 10px; | |
| margin-bottom: 16px; | |
| font-size: 1.1rem; | |
| font-weight: 600; | |
| } | |
| [data-testid="stSidebar"] { | |
| background: linear-gradient(180deg, #0d1117 0%, #161b27 100%); | |
| border-right: 1px solid #21262d; | |
| } | |
| .stTabs [data-baseweb="tab"] { | |
| color: #8b9dc3; | |
| font-weight: 600; | |
| font-size: 0.95rem; | |
| } | |
| .stTabs [aria-selected="true"] { | |
| color: #58a6ff !important; | |
| border-bottom: 2px solid #58a6ff !important; | |
| } | |
| .stButton>button { | |
| background: linear-gradient(135deg, #1565c0, #1976d2); | |
| color: white; | |
| border: none; | |
| border-radius: 8px; | |
| font-weight: 600; | |
| transition: all 0.2s; | |
| } | |
| .stButton>button:hover { | |
| background: linear-gradient(135deg, #1976d2, #1e88e5); | |
| box-shadow: 0 4px 12px rgba(21,101,192,0.4); | |
| transform: translateY(-1px); | |
| } | |
| </style> | |
| """, unsafe_allow_html=True) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Model Loading β cached singleton | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def load_model(): | |
| """Load YOLOv8n once and cache it for the session lifetime.""" | |
| from ultralytics import YOLO | |
| model = YOLO("yolov8n.pt") | |
| model.to("cpu") | |
| return model | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Inference helpers | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _resize_for_inference(img: np.ndarray) -> np.ndarray: | |
| """Resize so the longest edge β€ MAX_INFER_SIZE (keeps inference snappy).""" | |
| h, w = img.shape[:2] | |
| if max(h, w) > MAX_INFER_SIZE: | |
| scale = MAX_INFER_SIZE / max(h, w) | |
| img = cv2.resize(img, (int(w * scale), int(h * scale)), | |
| interpolation=cv2.INTER_AREA) | |
| return img | |
| def run_inference( | |
| model, | |
| image: np.ndarray, | |
| conf: float, | |
| iou: float, | |
| ) -> Tuple[np.ndarray, List[Dict[str, Any]], float]: | |
| """ | |
| Run YOLOv8 inference on a BGR image. | |
| Returns: | |
| (annotated_image, list_of_dets, inference_ms) | |
| """ | |
| image = _resize_for_inference(image) | |
| t0 = time.perf_counter() | |
| results = model.predict(image, conf=conf, iou=iou, device="cpu", verbose=False) | |
| inf_ms = (time.perf_counter() - t0) * 1000 | |
| detections: List[Dict[str, Any]] = [] | |
| annotated = image.copy() | |
| for result in results: | |
| if result.boxes is None: | |
| continue | |
| for box in result.boxes: | |
| cls_id = int(box.cls.item()) | |
| conf_val = float(box.conf.item()) | |
| x1, y1, x2, y2 = [int(v) for v in box.xyxy[0].tolist()] | |
| cls_name = result.names.get(cls_id, str(cls_id)) if result.names else str(cls_id) | |
| color = CLASS_COLORS_BGR[cls_id % len(CLASS_COLORS_BGR)] | |
| cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2) | |
| label = f"{cls_name} {conf_val:.2f}" | |
| (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.55, 1) | |
| cv2.rectangle(annotated, (x1, y1 - th - 6), (x1 + tw + 4, y1), color, -1) | |
| cv2.putText(annotated, label, (x1 + 2, y1 - 3), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.55, (255, 255, 255), 1) | |
| detections.append({ | |
| "class_id": cls_id, | |
| "class_name": cls_name, | |
| "confidence": round(conf_val, 4), | |
| "bbox": [x1, y1, x2, y2], | |
| }) | |
| del results | |
| gc.collect() | |
| return annotated, detections, inf_ms | |
| def bgr_to_rgb(img: np.ndarray) -> np.ndarray: | |
| return cv2.cvtColor(img, cv2.COLOR_BGR2RGB) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Sidebar | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def render_sidebar() -> Dict[str, Any]: | |
| with st.sidebar: | |
| st.markdown(""" | |
| <div style="text-align:center; padding: 20px 0 10px;"> | |
| <div style="font-size:3rem;">π</div> | |
| <div style="color:#58a6ff; font-size:1.1rem; font-weight:700; | |
| letter-spacing:0.05em;">OBSTACLE DETECTION</div> | |
| <div style="color:#6b7280; font-size:0.75rem;">Powered by YOLOv8n</div> | |
| </div> | |
| <hr style="border-color:#21262d; margin:0 0 20px;"/> | |
| """, unsafe_allow_html=True) | |
| # ββ Model Settings ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| st.markdown("### βοΈ Model Settings") | |
| st.caption("π `yolov8n.pt`") | |
| st.info("π» Inference on **CPU** via Ultralytics YOLOv8", icon="βΉοΈ") | |
| st.divider() | |
| # ββ Detection Thresholds ββββββββββββββββββββββββββββββββββββββββββββββ | |
| st.markdown("### π― Detection Settings") | |
| conf_threshold = st.slider( | |
| "Confidence Threshold", | |
| min_value=0.10, max_value=0.95, value=0.35, step=0.05, | |
| help="Minimum confidence score to show a detection.", | |
| ) | |
| iou_threshold = st.slider( | |
| "IoU Threshold (NMS)", | |
| min_value=0.10, max_value=0.95, value=0.45, step=0.05, | |
| help="Non-maximum suppression IoU threshold.", | |
| ) | |
| st.divider() | |
| # ββ Class Filter ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| st.markdown("### π Class Filter") | |
| show_all = st.checkbox("Show all classes", value=True) | |
| selected_classes = CLASS_NAMES | |
| if not show_all: | |
| selected_classes = st.multiselect( | |
| "Select classes to display", | |
| options=CLASS_NAMES, | |
| default=CLASS_NAMES, | |
| format_func=lambda x: f"{CLASS_ICONS.get(x,'')} {x}", | |
| ) | |
| st.divider() | |
| st.markdown(""" | |
| <div style="color:#6b7280; font-size:0.78rem; text-align:center;"> | |
| <b>Autonomous Obstacle Detection</b><br/> | |
| YOLOv8n Β· Ultralytics Β· OpenCV<br/> | |
| <a href="https://github.com/pun33th45/autonomous-vehicle-obstacle-detection-yolo" | |
| style="color:#58a6ff;">GitHub β</a> | |
| </div> | |
| """, unsafe_allow_html=True) | |
| return { | |
| "conf_threshold": conf_threshold, | |
| "iou_threshold": iou_threshold, | |
| "selected_classes": selected_classes, | |
| } | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Analytics helpers β no pandas, plotly accepts plain lists/dicts | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _chart_layout() -> Dict: | |
| return dict( | |
| plot_bgcolor="rgba(0,0,0,0)", | |
| paper_bgcolor="rgba(0,0,0,0)", | |
| font_color="#c9d1d9", | |
| showlegend=False, | |
| margin=dict(t=40, b=20), | |
| ) | |
| def render_detection_stats(detections: List[Dict], inf_ms: float) -> None: | |
| if not detections: | |
| st.info("π No obstacles detected above the confidence threshold.") | |
| return | |
| total = len(detections) | |
| avg_conf = sum(d["confidence"] for d in detections) / total | |
| classes = list({d["class_name"] for d in detections}) | |
| c1, c2, c3, c4 = st.columns(4) | |
| c1.metric("π― Detections", total) | |
| c2.metric("π Avg Confidence", f"{avg_conf:.1%}") | |
| c3.metric("β‘ Inference", f"{inf_ms:.1f} ms") | |
| c4.metric("π·οΈ Unique Classes", len(classes)) | |
| st.divider() | |
| col_chart, col_table = st.columns([3, 2]) | |
| with col_chart: | |
| # Bar chart β class counts | |
| class_counts: Dict[str, int] = {} | |
| for d in detections: | |
| class_counts[d["class_name"]] = class_counts.get(d["class_name"], 0) + 1 | |
| sorted_classes = sorted(class_counts, key=class_counts.__getitem__, reverse=True) | |
| color_map = {n: CLASS_COLORS_HEX[i % len(CLASS_COLORS_HEX)] | |
| for i, n in enumerate(CLASS_NAMES)} | |
| fig_bar = px.bar( | |
| x=sorted_classes, | |
| y=[class_counts[c] for c in sorted_classes], | |
| color=sorted_classes, | |
| color_discrete_map=color_map, | |
| labels={"x": "Class", "y": "Count"}, | |
| title="Detections per Class", | |
| text=[class_counts[c] for c in sorted_classes], | |
| ) | |
| fig_bar.update_layout(**_chart_layout()) | |
| fig_bar.update_traces(textposition="outside", marker_line_width=0) | |
| fig_bar.update_xaxes(showgrid=False) | |
| fig_bar.update_yaxes(gridcolor="#21262d") | |
| st.plotly_chart(fig_bar, use_container_width=True) | |
| # Box plot β confidence distribution per class | |
| x_vals = [d["class_name"] for d in detections] | |
| y_vals = [d["confidence"] for d in detections] | |
| fig_box = px.box( | |
| x=x_vals, y=y_vals, | |
| color=x_vals, | |
| color_discrete_map=color_map, | |
| labels={"x": "Class", "y": "Confidence"}, | |
| title="Confidence Score Distribution", | |
| points="all", | |
| ) | |
| fig_box.update_layout(**_chart_layout()) | |
| fig_box.update_yaxes(range=[0, 1.05], gridcolor="#21262d") | |
| fig_box.update_xaxes(showgrid=False) | |
| st.plotly_chart(fig_box, use_container_width=True) | |
| with col_table: | |
| st.markdown("#### π Detection Details") | |
| for det in sorted(detections, key=lambda d: -d["confidence"]): | |
| icon = CLASS_ICONS.get(det["class_name"], "π·") | |
| color = CLASS_COLORS_HEX[det["class_id"] % len(CLASS_COLORS_HEX)] | |
| conf_pct = int(det["confidence"] * 100) | |
| x1, y1, x2, y2 = det["bbox"] | |
| st.markdown( | |
| f"""<div class="det-card" style="border-left-color:{color};"> | |
| <span style="font-size:1.2rem;">{icon}</span> | |
| <strong style="color:{color}; margin-left:6px;"> | |
| {det['class_name'].replace('_',' ').title()} | |
| </strong> | |
| <br/> | |
| <span style="color:#8b9dc3; font-size:0.82rem;"> | |
| Conf: <b style="color:#e0e6f0;">{conf_pct}%</b> | |
| Size: <b style="color:#e0e6f0;">{x2-x1}Γ{y2-y1}px</b> | |
| </span> | |
| </div>""", | |
| unsafe_allow_html=True, | |
| ) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Tab 1 β Image Detection | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def tab_image_detection(model, cfg: Dict) -> None: | |
| st.markdown( | |
| '<div class="section-header">πΌοΈ Image Obstacle Detection</div>', | |
| unsafe_allow_html=True, | |
| ) | |
| col_upload, col_options = st.columns([3, 1]) | |
| with col_options: | |
| st.markdown("#### Options") | |
| show_original = st.checkbox("Show original side-by-side", value=True) | |
| download_result = st.checkbox("Enable result download", value=True) | |
| with col_upload: | |
| uploaded = st.file_uploader( | |
| "Upload an image", | |
| type=["jpg", "jpeg", "png", "bmp", "webp"], | |
| label_visibility="collapsed", | |
| ) | |
| if uploaded is None: | |
| st.markdown(""" | |
| <div style="border:2px dashed #21262d; border-radius:12px; | |
| padding:40px; text-align:center; color:#6b7280; margin:20px 0;"> | |
| <div style="font-size:3rem;">πΈ</div> | |
| <div style="font-size:1.1rem; margin:10px 0;"> | |
| Upload an image to detect road obstacles | |
| </div> | |
| <div style="font-size:0.85rem;">Supports JPG Β· PNG Β· BMP Β· WEBP</div> | |
| </div> | |
| """, unsafe_allow_html=True) | |
| return | |
| if model is None: | |
| st.error("β Model not loaded. Check the weights path in the sidebar.") | |
| return | |
| file_bytes = np.frombuffer(uploaded.read(), dtype=np.uint8) | |
| img_bgr = cv2.imdecode(file_bytes, cv2.IMREAD_COLOR) | |
| if img_bgr is None: | |
| st.error("β Could not decode image.") | |
| return | |
| with st.spinner("π Running detectionβ¦"): | |
| annotated_bgr, dets, inf_ms = run_inference( | |
| model, img_bgr, cfg["conf_threshold"], cfg["iou_threshold"], | |
| ) | |
| # Free original before displaying | |
| del file_bytes | |
| gc.collect() | |
| dets = [d for d in dets if d["class_name"] in cfg["selected_classes"]] | |
| if show_original: | |
| col_orig, col_det = st.columns(2) | |
| with col_orig: | |
| st.markdown("##### Original") | |
| st.image(bgr_to_rgb(img_bgr), use_column_width=True) | |
| with col_det: | |
| st.markdown(f"##### Detected β {len(dets)} obstacle(s)") | |
| st.image(bgr_to_rgb(annotated_bgr), use_column_width=True) | |
| else: | |
| st.image(bgr_to_rgb(annotated_bgr), | |
| caption=f"Detected: {len(dets)} obstacle(s)", | |
| use_column_width=True) | |
| if download_result: | |
| _, buf = cv2.imencode(".png", annotated_bgr) | |
| st.download_button( | |
| "β¬οΈ Download Annotated Image", | |
| data=buf.tobytes(), | |
| file_name=f"detected_{uploaded.name}", | |
| mime="image/png", | |
| ) | |
| del img_bgr, annotated_bgr | |
| gc.collect() | |
| st.divider() | |
| st.markdown("### π Detection Analytics") | |
| render_detection_stats(dets, inf_ms) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Tab 2 β Video Detection | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def tab_video_detection(model, cfg: Dict) -> None: | |
| st.markdown( | |
| '<div class="section-header">π¬ Video Obstacle Detection</div>', | |
| unsafe_allow_html=True, | |
| ) | |
| col_up, col_opt = st.columns([3, 1]) | |
| with col_opt: | |
| st.markdown("#### Options") | |
| frame_skip = st.slider( | |
| "Frame Skip", min_value=1, max_value=10, value=2, | |
| help="Process every N-th frame (higher = faster, less RAM).", | |
| ) | |
| max_frames = st.number_input( | |
| "Max Frames", min_value=10, max_value=500, value=150, | |
| help="Cap frames to process (keeps memory bounded).", | |
| ) | |
| with col_up: | |
| uploaded_video = st.file_uploader( | |
| "Upload a video", | |
| type=["mp4", "avi", "mov", "mkv"], | |
| label_visibility="collapsed", | |
| ) | |
| if uploaded_video is None: | |
| st.markdown(""" | |
| <div style="border:2px dashed #21262d; border-radius:12px; | |
| padding:40px; text-align:center; color:#6b7280; margin:20px 0;"> | |
| <div style="font-size:3rem;">π¬</div> | |
| <div style="font-size:1.1rem; margin:10px 0;"> | |
| Upload a video to detect obstacles frame by frame | |
| </div> | |
| <div style="font-size:0.85rem;">Supports MP4 Β· AVI Β· MOV Β· MKV</div> | |
| </div> | |
| """, unsafe_allow_html=True) | |
| return | |
| if model is None: | |
| st.error("β Model not loaded.") | |
| return | |
| if st.button("βΆοΈ Process Video", type="primary", use_container_width=True): | |
| _process_and_display_video(uploaded_video, model, cfg, frame_skip, int(max_frames)) | |
| def _process_and_display_video(uploaded_video, model, cfg, frame_skip, max_frames): | |
| with tempfile.NamedTemporaryFile( | |
| suffix=Path(uploaded_video.name).suffix, delete=False | |
| ) as tmp: | |
| tmp.write(uploaded_video.read()) | |
| tmp_path = Path(tmp.name) | |
| cap = cv2.VideoCapture(str(tmp_path)) | |
| if not cap.isOpened(): | |
| st.error("β Cannot open video file.") | |
| tmp_path.unlink(missing_ok=True) | |
| return | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| src_fps = cap.get(cv2.CAP_PROP_FPS) or 30.0 | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| # Clamp output size to MAX_INFER_SIZE to save disk + RAM | |
| scale = min(1.0, MAX_INFER_SIZE / max(width, height, 1)) | |
| out_w, out_h = int(width * scale), int(height * scale) | |
| st.info( | |
| f"πΉ **{uploaded_video.name}** | {width}Γ{height} β {out_w}Γ{out_h} " | |
| f"| {src_fps:.0f} FPS | {total_frames} frames" | |
| ) | |
| out_path = tmp_path.with_name("output.mp4") | |
| fourcc = cv2.VideoWriter_fourcc(*"mp4v") | |
| writer = cv2.VideoWriter(str(out_path), fourcc, src_fps, (out_w, out_h)) | |
| progress_bar = st.progress(0, text="Processing framesβ¦") | |
| status_text = st.empty() | |
| preview_slot = st.empty() | |
| all_dets: List[Dict] = [] | |
| fps_times: List[float] = [] | |
| processed = 0 | |
| frame_idx = 0 | |
| frames_to_process = min(max_frames, total_frames) | |
| try: | |
| while processed < frames_to_process: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| if frame_idx % max(1, frame_skip) == 0: | |
| t0 = time.perf_counter() | |
| annotated, dets, _ = run_inference( | |
| model, frame, cfg["conf_threshold"], cfg["iou_threshold"], | |
| ) | |
| fps_times.append(time.perf_counter() - t0) | |
| fps_val = 1.0 / (fps_times[-1] + 1e-9) | |
| cv2.putText(annotated, f"FPS: {fps_val:.1f}", | |
| (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2) | |
| writer.write(annotated) | |
| dets_filtered = [d for d in dets if d["class_name"] in cfg["selected_classes"]] | |
| all_dets.extend(dets_filtered) | |
| processed += 1 | |
| if processed % 10 == 0: | |
| pct = processed / frames_to_process | |
| progress_bar.progress(pct, text=f"Processing⦠{processed}/{frames_to_process}") | |
| status_text.markdown( | |
| f"**Frame {frame_idx}** | Detections: {len(dets_filtered)} | Total: {len(all_dets)}" | |
| ) | |
| preview_slot.image(bgr_to_rgb(annotated), | |
| caption=f"Frame {frame_idx}", | |
| use_column_width=True) | |
| del annotated, dets | |
| gc.collect() | |
| else: | |
| # Write resized frame for skipped frames | |
| writer.write(cv2.resize(frame, (out_w, out_h))) | |
| frame_idx += 1 | |
| finally: | |
| cap.release() | |
| writer.release() | |
| tmp_path.unlink(missing_ok=True) | |
| progress_bar.progress(1.0, text="β Processing complete!") | |
| status_text.empty() | |
| avg_ms = sum(fps_times) / max(1, len(fps_times)) * 1000 | |
| avg_fps = 1000 / avg_ms if avg_ms > 0 else 0 | |
| st.success( | |
| f"β Processed **{processed}** frames | " | |
| f"Avg: **{avg_fps:.1f} FPS** ({avg_ms:.1f} ms) | " | |
| f"Total detections: **{len(all_dets)}**" | |
| ) | |
| if out_path.exists(): | |
| with open(out_path, "rb") as f: | |
| st.download_button( | |
| "β¬οΈ Download Annotated Video", | |
| data=f, | |
| file_name=f"detected_{uploaded_video.name}", | |
| mime="video/mp4", | |
| ) | |
| out_path.unlink(missing_ok=True) | |
| if all_dets: | |
| st.divider() | |
| st.markdown("### π Video Detection Analytics") | |
| render_detection_stats(all_dets, avg_ms) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Tab 3 β Webcam Detection | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def tab_webcam_detection(model, cfg: Dict) -> None: | |
| st.markdown( | |
| '<div class="section-header">π· Live Webcam Detection</div>', | |
| unsafe_allow_html=True, | |
| ) | |
| col_ctrl, col_info = st.columns([1, 2]) | |
| with col_ctrl: | |
| camera_index = st.number_input("Camera Index", min_value=0, max_value=10, value=0) | |
| max_webcam_frames = st.slider("Capture Frames", min_value=10, max_value=300, value=60) | |
| run_webcam = st.button("π· Start Webcam Detection", type="primary", | |
| use_container_width=True) | |
| with col_info: | |
| st.info(""" | |
| **π Instructions:** | |
| 1. Select your camera index (0 for default) | |
| 2. Set the number of frames to capture | |
| 3. Click **Start Webcam Detection** | |
| > β οΈ Webcam access requires a local browser session. | |
| > On Render / cloud deployments use Image or Video mode instead. | |
| """) | |
| if not run_webcam or model is None: | |
| return | |
| cap = cv2.VideoCapture(int(camera_index)) | |
| if not cap.isOpened(): | |
| st.error(f"β Cannot open camera (index {camera_index}).") | |
| return | |
| st.success(f"β Camera opened (index {camera_index})") | |
| frame_slot = st.empty() | |
| metrics_slot = st.empty() | |
| stop_btn = st.button("βΉ Stop", key="stop_webcam") | |
| all_dets: List[Dict] = [] | |
| fps_times: List[float] = [] | |
| frame_num = 0 | |
| try: | |
| while frame_num < max_webcam_frames and not stop_btn: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| t0 = time.perf_counter() | |
| annotated, dets, inf_ms = run_inference( | |
| model, frame, cfg["conf_threshold"], cfg["iou_threshold"], | |
| ) | |
| fps_times.append(time.perf_counter() - t0) | |
| fps_val = 1.0 / (fps_times[-1] + 1e-9) | |
| dets_filtered = [d for d in dets if d["class_name"] in cfg["selected_classes"]] | |
| all_dets.extend(dets_filtered) | |
| cv2.putText(annotated, f"FPS: {fps_val:.1f} Frame: {frame_num}", | |
| (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2) | |
| frame_slot.image(bgr_to_rgb(annotated), | |
| caption=f"Frame {frame_num} | {len(dets_filtered)} detection(s)", | |
| use_column_width=True) | |
| with metrics_slot.container(): | |
| m1, m2, m3 = st.columns(3) | |
| m1.metric("Frame", frame_num) | |
| m2.metric("FPS", f"{fps_val:.1f}") | |
| m3.metric("Detections", len(dets_filtered)) | |
| del annotated, dets | |
| gc.collect() | |
| frame_num += 1 | |
| finally: | |
| cap.release() | |
| avg_fps = len(fps_times) / (sum(fps_times) + 1e-9) | |
| st.success( | |
| f"β Session ended | Frames: **{frame_num}** | " | |
| f"Avg FPS: **{avg_fps:.1f}** | Total detections: **{len(all_dets)}**" | |
| ) | |
| if all_dets: | |
| st.divider() | |
| avg_ms = sum(fps_times) / max(1, len(fps_times)) * 1000 | |
| render_detection_stats(all_dets, avg_ms) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Tab 4 β Analytics (static benchmarks, no pandas required) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def tab_analytics(cfg: Dict) -> None: | |
| st.markdown( | |
| '<div class="section-header">π Model & Dataset Analytics</div>', | |
| unsafe_allow_html=True, | |
| ) | |
| # ββ YOLOv8 variant comparison βββββββββββββββββββββββββββββββββββββββββββββ | |
| st.markdown("#### π€ YOLOv8 Variant Comparison") | |
| variants = ["YOLOv8n", "YOLOv8s", "YOLOv8m", "YOLOv8l", "YOLOv8x"] | |
| params = [3.2, 11.2, 25.9, 43.7, 68.2] | |
| fps_vals = [310, 200, 142, 95, 68] | |
| map_vals = [0.65, 0.70, 0.74, 0.76, 0.78] | |
| lat_vals = [3.2, 5.0, 7.0, 10.5, 14.7] | |
| col_a, col_b = st.columns(2) | |
| with col_a: | |
| fig_fps = px.bar( | |
| x=variants, y=fps_vals, color=variants, text=fps_vals, | |
| title="Inference Speed (FPS)", | |
| labels={"x": "Variant", "y": "FPS (GPU)"}, | |
| color_discrete_sequence=px.colors.sequential.Blues_r, | |
| ) | |
| fig_fps.update_layout(**_chart_layout()) | |
| fig_fps.update_traces(textposition="outside") | |
| st.plotly_chart(fig_fps, use_container_width=True) | |
| with col_b: | |
| fig_map = px.bar( | |
| x=variants, y=map_vals, color=variants, text=map_vals, | |
| title="mAP@50 Score", | |
| labels={"x": "Variant", "y": "mAP@50"}, | |
| color_discrete_sequence=px.colors.sequential.Greens_r, | |
| ) | |
| fig_map.update_layout(**_chart_layout()) | |
| fig_map.update_traces(textfont_size=11, textposition="outside") | |
| fig_map.update_yaxes(range=[0, 0.95]) | |
| st.plotly_chart(fig_map, use_container_width=True) | |
| fig_scatter = px.scatter( | |
| x=map_vals, y=fps_vals, | |
| size=params, color=variants, text=variants, | |
| hover_name=variants, | |
| title="Speed vs Accuracy Trade-off (bubble size = model parameters)", | |
| labels={"x": "mAP@50", "y": "FPS (GPU)"}, | |
| size_max=50, | |
| ) | |
| fig_scatter.update_layout( | |
| plot_bgcolor="rgba(0,0,0,0)", paper_bgcolor="rgba(0,0,0,0)", | |
| font_color="#c9d1d9", margin=dict(t=40, b=20), | |
| ) | |
| fig_scatter.update_traces(textposition="top center") | |
| st.plotly_chart(fig_scatter, use_container_width=True) | |
| # ββ Deployment benchmark ββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| st.markdown("#### β‘ Export Format Benchmark (YOLOv8m)") | |
| fmt_names = ["PyTorch FP32", "PyTorch FP16", "ONNX FP32", "ONNX FP16", "TensorRT FP16"] | |
| fmt_fps = [85, 142, 95, 160, 310] | |
| fmt_lat = [11.8, 7.0, 10.5, 6.2, 3.2] | |
| fmt_map = [0.72, 0.72, 0.72, 0.72, 0.71] | |
| col_c, col_d = st.columns(2) | |
| with col_c: | |
| fig_deploy = px.bar( | |
| x=fmt_names, y=fmt_fps, | |
| color=fmt_fps, color_continuous_scale="RdYlGn", | |
| text=fmt_fps, title="Inference Speed by Export Format", | |
| labels={"x": "Format", "y": "FPS"}, | |
| ) | |
| fig_deploy.update_layout(**_chart_layout()) | |
| fig_deploy.update_xaxes(tickangle=-30) | |
| st.plotly_chart(fig_deploy, use_container_width=True) | |
| with col_d: | |
| st.markdown("##### Benchmark Summary") | |
| st.table({ | |
| "Format": fmt_names, | |
| "FPS": fmt_fps, | |
| "Latency (ms)": fmt_lat, | |
| "mAP@50": [f"{v:.2f}" for v in fmt_map], | |
| }) | |
| # ββ Per-class metrics βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| st.divider() | |
| st.markdown("#### π·οΈ Per-Class Detection Metrics (YOLOv8n β COCO)") | |
| ap_vals = [0.78, 0.64, 0.81, 0.68, 0.74, 0.69, 0.62, 0.75] | |
| prec_vals = [0.82, 0.70, 0.86, 0.73, 0.79, 0.74, 0.68, 0.81] | |
| rec_vals = [0.74, 0.60, 0.77, 0.64, 0.70, 0.65, 0.57, 0.71] | |
| f1_vals = [0.78, 0.65, 0.81, 0.68, 0.74, 0.69, 0.62, 0.76] | |
| metrics_to_plot = ["AP@50", "Precision", "Recall", "F1"] | |
| metrics_data = {"AP@50": ap_vals, "Precision": prec_vals, "Recall": rec_vals, "F1": f1_vals} | |
| fig_radar = go.Figure() | |
| for i, cls_name in enumerate(CLASS_NAMES): | |
| r_vals = [metrics_data[m][i] for m in metrics_to_plot] | |
| fig_radar.add_trace(go.Scatterpolar( | |
| r=r_vals + [r_vals[0]], | |
| theta=metrics_to_plot + [metrics_to_plot[0]], | |
| name=f"{CLASS_ICONS.get(cls_name,'')} {cls_name}", | |
| mode="lines", | |
| line_width=1.5, | |
| )) | |
| fig_radar.update_layout( | |
| polar=dict( | |
| radialaxis=dict(visible=True, range=[0, 1], color="#6b7280"), | |
| angularaxis=dict(color="#c9d1d9"), | |
| bgcolor="rgba(0,0,0,0)", | |
| ), | |
| plot_bgcolor="rgba(0,0,0,0)", | |
| paper_bgcolor="rgba(0,0,0,0)", | |
| font_color="#c9d1d9", | |
| title="Per-Class Metrics Radar Chart", | |
| legend=dict(orientation="h", y=-0.15), | |
| margin=dict(t=60, b=80), | |
| showlegend=True, | |
| ) | |
| st.plotly_chart(fig_radar, use_container_width=True) | |
| st.table({ | |
| "Class": CLASS_NAMES, | |
| "AP@50": [f"{v:.3f}" for v in ap_vals], | |
| "Precision": [f"{v:.3f}" for v in prec_vals], | |
| "Recall": [f"{v:.3f}" for v in rec_vals], | |
| "F1": [f"{v:.3f}" for v in f1_vals], | |
| }) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Main App | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def main() -> None: | |
| st.markdown(""" | |
| <div style="text-align:center; padding: 24px 0 16px;"> | |
| <h1 style="color:#58a6ff; font-size:2.4rem; font-weight:800; | |
| letter-spacing:-0.02em; margin:0;"> | |
| π Autonomous Vehicle Obstacle Detection | |
| </h1> | |
| <p style="color:#8b9dc3; font-size:1.05rem; margin:8px 0 0;"> | |
| Real-Time YOLOv8n Deep Learning Detection System | |
| </p> | |
| </div> | |
| """, unsafe_allow_html=True) | |
| cfg = render_sidebar() | |
| model = load_model() | |
| if model is not None: | |
| col_s1, col_s2, col_s3, col_s4 = st.columns(4) | |
| col_s1.metric("π€ Model", "YOLOv8n") | |
| col_s2.metric("π― Confidence", f"{cfg['conf_threshold']:.0%}") | |
| col_s3.metric("π IoU Threshold", f"{cfg['iou_threshold']:.0%}") | |
| col_s4.metric("π» Device", "CPU") | |
| st.divider() | |
| else: | |
| st.warning("β οΈ Model failed to load. Check the application logs for details.") | |
| tab1, tab2, tab3, tab4 = st.tabs([ | |
| "πΌοΈ Image Detection", | |
| "π¬ Video Detection", | |
| "π· Webcam", | |
| "π Analytics", | |
| ]) | |
| with tab1: | |
| tab_image_detection(model, cfg) | |
| with tab2: | |
| tab_video_detection(model, cfg) | |
| with tab3: | |
| tab_webcam_detection(model, cfg) | |
| with tab4: | |
| tab_analytics(cfg) | |
| st.markdown(""" | |
| <hr style="border-color:#21262d; margin:40px 0 10px;"/> | |
| <div style="text-align:center; color:#6b7280; font-size:0.8rem; padding-bottom:20px;"> | |
| Autonomous Vehicle Obstacle Detection Β· | |
| YOLOv8n Β· Ultralytics Β· OpenCV Β· Streamlit<br/> | |
| <a href="https://github.com/pun33th45/autonomous-vehicle-obstacle-detection-yolo" | |
| style="color:#58a6ff;">β GitHub Repository</a> | |
| </div> | |
| """, unsafe_allow_html=True) | |
| if __name__ == "__main__": | |
| main() | |