"""Gradio dashboard that streams a drone video feed annotated with crack detections."""

import os
import random
import time
from collections import Counter
from datetime import datetime

import cv2
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np

from services.crack_detection_service import detect_cracks_and_objects
from services.map_service import generate_map
from services.metrics_service import update_metrics
from services.overlay_service import overlay_boxes
from services.video_service import get_next_video_frame

# Globals
paused = False
frame_rate = 0.5  # Seconds slept between frames; raise it if the video plays too fast
frame_count = 0
log_entries = []
crack_counts = []
last_frame = None
last_metrics = {}
last_timestamp = ""
last_detected_images = []  # Paths of the most recent crack captures (capped below)
gps_coordinates = []

CAPTURE_DIR = "captured_frames"  # Where frames containing cracks are written
MAX_CAPTURED_IMAGES = 100  # Gallery keeps at most this many capture paths
# The UI only ever reads the last 10 log lines, 200 crack counts and 5 GPS
# points, so keeping 1000 of each is ample and stops the lists growing forever.
MAX_HISTORY = 1000


def _trim_history(history, limit=MAX_HISTORY):
    """Drop the oldest entries of *history* in place so at most *limit* remain."""
    if len(history) > limit:
        del history[:-limit]


def _count_cracks(detected_items):
    """Return how many detections in *detected_items* have type ``'crack'``."""
    return sum(1 for item in detected_items if item['type'] == 'crack')


def _save_crack_capture(frame):
    """Write *frame* to the capture directory and register it for the gallery.

    The directory is created on demand because ``cv2.imwrite`` does not create
    missing folders; it just returns ``False``. A path is only added to
    ``last_detected_images`` when the write succeeded, so the gallery never
    points at a file that does not exist.
    """
    os.makedirs(CAPTURE_DIR, exist_ok=True)
    captured_frame_path = f"{CAPTURE_DIR}/crack_{frame_count}.jpg"
    if not cv2.imwrite(captured_frame_path, frame):
        log_entries.append(f"Warning: could not save {captured_frame_path}")
        return
    last_detected_images.append(captured_frame_path)
    if len(last_detected_images) > MAX_CAPTURED_IMAGES:
        last_detected_images.pop(0)


# Core monitor function
def monitor_feed():
    """Produce one dashboard update.

    Unless the feed is paused (and a frame has already been processed), the
    next video frame is fetched, run through detection and overlay, and the
    module-level state (frame counter, timestamp, metrics, logs, crack counts,
    simulated GPS track, crack captures) is updated. The most recently
    processed frame is then resized and stamped with the frame number and
    timestamp.

    Returns:
        A 7-tuple ``(frame, metrics, logs, line_chart, pie_chart, captures,
        map)``. ``frame`` is the 640x480 image with its channel axis reversed
        (OpenCV BGR to RGB), ``logs`` is the last 10 log lines joined by
        newlines, the chart entries are image paths (or ``None`` when no
        counts exist yet), ``captures`` is the list of crack capture paths
        and ``map`` is whatever ``generate_map`` returns. If fetching a frame
        raises ``RuntimeError``, the error is logged and the frame, both
        charts and the map are ``None``.
    """
    global frame_count, last_frame, last_metrics, last_timestamp

    # While paused, skip acquisition entirely and re-render the cached frame.
    # The very first call always processes a frame so there is one to show.
    if not paused or last_frame is None:
        try:
            frame = get_next_video_frame()
        except RuntimeError as e:
            log_entries.append(f"Error: {str(e)}")
            _trim_history(log_entries)
            return None, last_metrics, "\n".join(log_entries[-10:]), None, None, last_detected_images, None

        detected_items = detect_cracks_and_objects(frame)
        frame = overlay_boxes(frame, detected_items)
        cv2.imwrite("temp.jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
        metrics = update_metrics(detected_items)

        frame_count += 1
        last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Simulated position: random latitude jitter, longitude drifting with the frame count.
        gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001]
        gps_coordinates.append(gps_coord)
        _trim_history(gps_coordinates)

        crack_total = _count_cracks(detected_items)
        if crack_total:
            _save_crack_capture(frame)

        last_frame = frame.copy()
        last_metrics = metrics.copy()

        log_entries.append(f"{last_timestamp} - Frame {frame_count} - Cracks: {crack_total} - GPS: {gps_coord}")
        _trim_history(log_entries)
        crack_counts.append(crack_total)
        _trim_history(crack_counts)

    # cv2.resize returns a new array, so the cached frame is never drawn on.
    frame = cv2.resize(last_frame, (640, 480))
    cv2.putText(frame, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    cv2.putText(frame, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

    # NOTE(review): assumes the metrics dict may carry an 'items' list of detections - confirm in update_metrics.
    crack_items = [item for item in last_metrics.get('items', []) if item['type'] == 'crack']
    map_path = generate_map(gps_coordinates[-5:], crack_items)

    return (
        frame[:, :, ::-1],
        last_metrics,
        "\n".join(log_entries[-10:]),
        generate_line_chart(),
        generate_pie_chart(),
        last_detected_images,
        map_path,
    )


# Line chart function (for crack count over time)
def generate_line_chart():
    """Plot the last 50 per-frame crack counts and return the saved image path.

    Returns ``None`` when no counts have been recorded yet. The image is
    written to ``chart_temp.png`` and overwritten on every call.
    """
    if not crack_counts:
        return None
    fig, ax = plt.subplots(figsize=(4, 2))
    ax.plot(crack_counts[-50:], marker='o')
    ax.set_title("Cracks Over Time")
    ax.set_xlabel("Frame")
    ax.set_ylabel("Count")
    fig.tight_layout()
    chart_path = "chart_temp.png"
    fig.savefig(chart_path)
    plt.close(fig)  # Release the figure; pyplot otherwise keeps every figure alive
    return chart_path


# Pie chart function (shown under the "Crack Severity" label)
def generate_pie_chart():
    """Plot how often each per-frame crack count occurred in the last 200 frames.

    Each slice is labelled with a crack count and sized by the number of
    frames that had that count. Returns the saved image path, or ``None``
    when no counts have been recorded yet. The image is written to
    ``pie_temp.png`` and overwritten on every call.
    """
    if not crack_counts:
        return None
    fig, ax = plt.subplots(figsize=(4, 2))
    count = Counter(crack_counts[-200:])
    labels, sizes = zip(*count.items())
    ax.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=140)
    ax.axis('equal')
    fig.tight_layout()
    pie_path = "pie_temp.png"
    fig.savefig(pie_path)
    plt.close(fig)  # Release the figure; pyplot otherwise keeps every figure alive
    return pie_path


# Gradio UI
with gr.Blocks() as app:
    gr.Markdown("# 🛡️ Drone Road Inspection Dashboard")
    status_text = gr.Markdown("**Status:** 🟢 Running")

    with gr.Row():
        with gr.Column(scale=3):
            video_output = gr.Image(label="Live Drone Feed", width=640, height=480)
        with gr.Column(scale=1):
            metrics_output = gr.Textbox(label="Crack Metrics", lines=4)

    with gr.Row():
        logs_output = gr.Textbox(label="Live Logs", lines=8)
        chart_output = gr.Image(label="Crack Trend")
        pie_output = gr.Image(label="Crack Severity")

    with gr.Row():
        map_output = gr.HTML(label="Crack Locations Map")  # Using HTML to display the map
        captured_images = gr.Gallery(label="Detected Cracks (Last 100+)", columns=4, rows=25)

    with gr.Row():
        pause_btn = gr.Button("⏸️ Pause")
        resume_btn = gr.Button("▶️ Resume")
        frame_slider = gr.Slider(0.0005, 5, value=0.5, label="Frame Interval (seconds)")

    def toggle_pause():
        """Freeze the feed on the last processed frame and report the new status."""
        global paused
        paused = True
        return "**Status:** ⏸️ Paused"

    def toggle_resume():
        """Resume fetching new frames and report the new status."""
        global paused
        paused = False
        return "**Status:** 🟢 Running"

    def set_frame_rate(val):
        """Store the slider value as the delay (in seconds) between updates."""
        global frame_rate
        frame_rate = val

    pause_btn.click(toggle_pause, outputs=status_text)
    resume_btn.click(toggle_resume, outputs=status_text)
    frame_slider.change(set_frame_rate, inputs=[frame_slider])

    def streaming_loop():
        """Yield dashboard updates forever, sleeping ``frame_rate`` seconds between them."""
        while True:
            frame, metrics, logs, chart, pie, captured, map_path = monitor_feed()
            # On a read error ``frame`` is None; it is passed through unchanged.
            yield frame, str(metrics), logs, chart, pie, captured, map_path
            time.sleep(frame_rate)

    app.load(
        streaming_loop,
        outputs=[video_output, metrics_output, logs_output, chart_output, pie_output, captured_images, map_output],
    )

if __name__ == "__main__":
    app.launch(share=False)  # Share should be False on Hugging Face Spaces