road1 / app.py
Yaswanth56's picture
Update app.py
e303948 verified
import gradio as gr
import cv2
import time
from datetime import datetime
from services.video_service import get_next_video_frame
from services.crack_detection_service import detect_cracks_and_objects
from services.overlay_service import overlay_boxes
from services.metrics_service import update_metrics
from services.map_service import generate_map
import random
import matplotlib.pyplot as plt
from collections import Counter
import numpy as np
# Globals
paused = False
frame_rate = 0.5 # You can adjust the frame rate if the video is too fast
frame_count = 0
log_entries = []
crack_counts = []
last_frame = None
last_metrics = {}
last_timestamp = ""
last_detected_images = [] # Store up to 100+ crack images
gps_coordinates = []
# Core monitor function
def monitor_feed():
global paused, frame_count, last_frame, last_metrics, last_timestamp, gps_coordinates, last_detected_images
if paused and last_frame is not None:
frame = last_frame.copy()
metrics = last_metrics.copy()
else:
try:
frame = get_next_video_frame()
except RuntimeError as e:
log_entries.append(f"Error: {str(e)}")
return None, last_metrics, "\n".join(log_entries[-10:]), None, None, last_detected_images, None
detected_items = detect_cracks_and_objects(frame)
frame = overlay_boxes(frame, detected_items)
cv2.imwrite("temp.jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
metrics = update_metrics(detected_items)
frame_count += 1
last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001]
gps_coordinates.append(gps_coord)
if any(item['type'] == 'crack' for item in detected_items):
captured_frame_path = f"captured_frames/crack_{frame_count}.jpg"
cv2.imwrite(captured_frame_path, frame)
last_detected_images.append(captured_frame_path)
if len(last_detected_images) > 100:
last_detected_images.pop(0)
last_frame = frame.copy()
last_metrics = metrics.copy()
log_entries.append(f"{last_timestamp} - Frame {frame_count} - Cracks: {len([item for item in detected_items if item['type'] == 'crack'])} - GPS: {gps_coord}")
crack_counts.append(len([item for item in detected_items if item['type'] == 'crack']))
frame = cv2.resize(last_frame, (640, 480))
cv2.putText(frame, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
cv2.putText(frame, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
map_path = generate_map(gps_coordinates[-5:], [item for item in last_metrics.get('items', []) if item['type'] == 'crack'])
return frame[:, :, ::-1], last_metrics, "\n".join(log_entries[-10:]), generate_line_chart(), generate_pie_chart(), last_detected_images, map_path
# Line chart function (for crack count over time)
def generate_line_chart():
if not crack_counts:
return None
fig, ax = plt.subplots(figsize=(4, 2))
ax.plot(crack_counts[-50:], marker='o')
ax.set_title("Cracks Over Time")
ax.set_xlabel("Frame")
ax.set_ylabel("Count")
fig.tight_layout()
chart_path = "chart_temp.png"
fig.savefig(chart_path)
plt.close(fig)
return chart_path
# Pie chart function (for crack severity)
def generate_pie_chart():
if not crack_counts:
return None
fig, ax = plt.subplots(figsize=(4, 2))
count = Counter(crack_counts[-200:])
labels, sizes = zip(*count.items())
ax.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=140)
ax.axis('equal')
fig.tight_layout()
pie_path = "pie_temp.png"
fig.savefig(pie_path)
plt.close(fig)
return pie_path
# Gradio UI
with gr.Blocks() as app:
gr.Markdown("# 🛡️ Drone Road Inspection Dashboard")
status_text = gr.Markdown("**Status:** 🟢 Running")
with gr.Row():
with gr.Column(scale=3):
video_output = gr.Image(label="Live Drone Feed", width=640, height=480)
with gr.Column(scale=1):
metrics_output = gr.Textbox(label="Crack Metrics", lines=4)
with gr.Row():
logs_output = gr.Textbox(label="Live Logs", lines=8)
chart_output = gr.Image(label="Crack Trend")
pie_output = gr.Image(label="Crack Severity")
with gr.Row():
map_output = gr.HTML(label="Crack Locations Map") # Using HTML to display the map
captured_images = gr.Gallery(label="Detected Cracks (Last 100+)", columns=4, rows=25)
with gr.Row():
pause_btn = gr.Button("⏸️ Pause")
resume_btn = gr.Button("▶️ Resume")
frame_slider = gr.Slider(0.0005, 5, value=0.5, label="Frame Interval (seconds)")
def toggle_pause():
global paused
paused = True
return "**Status:** ⏸️ Paused"
def toggle_resume():
global paused
paused = False
return "**Status:** 🟢 Running"
def set_frame_rate(val):
global frame_rate
frame_rate = val
pause_btn.click(toggle_pause, outputs=status_text)
resume_btn.click(toggle_resume, outputs=status_text)
frame_slider.change(set_frame_rate, inputs=[frame_slider])
def streaming_loop():
while True:
frame, metrics, logs, chart, pie, captured, map_path = monitor_feed()
if frame is None:
yield None, str(metrics), logs, chart, pie, captured, map_path
else:
yield frame, str(metrics), logs, chart, pie, captured, map_path
time.sleep(frame_rate)
app.load(streaming_loop, outputs=[video_output, metrics_output, logs_output, chart_output, pie_output, captured_images, map_output])
if __name__ == "__main__":
app.launch(share=False) # Share should be False on Hugging Face Spaces