lokesh341's picture
Update app.py
f91a82f verified
import gradio as gr
import cv2
import time
import os
import random
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime
from collections import Counter
from services.video_service import get_next_video_frame, reset_video_index
from services.crack_detection_service import detect_cracks_and_objects
from services.overlay_service import overlay_boxes
from services.metrics_service import update_metrics
from services.map_service import generate_map
# Globals
paused = False
frame_rate = 0.5 # Faster frame rate for quicker video reading
frame_count = 0
log_entries = []
crack_counts = []
crack_severity_all = []
last_frame = None
last_metrics = {}
last_timestamp = ""
last_detected_images = [] # Store up to 100+ crack images
gps_coordinates = []
# Constants
TEMP_IMAGE_PATH = "temp.jpg"
CAPTURED_FRAMES_DIR = "captured_frames"
os.makedirs(CAPTURED_FRAMES_DIR, exist_ok=True)
# Core monitor function
def monitor_feed():
global paused, frame_count, last_frame, last_metrics, last_timestamp, gps_coordinates, last_detected_images
if paused and last_frame is not None:
frame = last_frame.copy()
metrics = last_metrics.copy()
else:
try:
frame = get_next_video_frame()
except RuntimeError as e:
log_entries.append(f"Error: {str(e)}")
return None, last_metrics, "\n".join(log_entries[-10:]), None, None, last_detected_images, None
detected_items = detect_cracks_and_objects(frame) # Includes cracks and objects
frame = overlay_boxes(frame, detected_items)
cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
metrics = update_metrics(detected_items)
frame_count += 1
last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001]
gps_coordinates.append(gps_coord)
if any(item['type'] == 'crack' for item in detected_items):
captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"crack_{frame_count}.jpg")
cv2.imwrite(captured_frame_path, frame)
last_detected_images.append(captured_frame_path)
if len(last_detected_images) > 100: # Store up to 100 images
last_detected_images.pop(0)
last_frame = frame.copy()
last_metrics = metrics.copy()
# Update logs and stats
crack_detected = len([item for item in last_metrics.get('items', []) if item['type'] == 'crack'])
crack_severity_all.extend([
item['severity']
for item in last_metrics.get('items', [])
if item['type'] == 'crack' and isinstance(item, dict) and 'severity' in item
])
log_entries.append(f"{last_timestamp} - Frame {frame_count} - Cracks: {crack_detected} - GPS: {gps_coord}")
crack_counts.append(crack_detected)
if len(log_entries) > 100:
log_entries.pop(0)
if len(crack_counts) > 500:
crack_counts.pop(0)
if len(crack_severity_all) > 500:
crack_severity_all.pop(0)
frame = cv2.resize(last_frame, (640, 480))
cv2.putText(frame, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
cv2.putText(frame, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
map_path = generate_map(gps_coordinates[-5:], [item for item in last_metrics.get('items', []) if item['type'] == 'crack'])
return frame[:, :, ::-1], last_metrics, "\n".join(log_entries[-10:]), generate_line_chart(), generate_pie_chart(), last_detected_images, map_path
# Line chart
def generate_line_chart():
if not crack_counts:
return None
fig, ax = plt.subplots(figsize=(4, 2))
ax.plot(crack_counts[-50:], marker='o')
ax.set_title("Cracks Over Time")
ax.set_xlabel("Frame")
ax.set_ylabel("Count")
fig.tight_layout()
chart_path = "chart_temp.png"
fig.savefig(chart_path)
plt.close(fig)
return chart_path
# Pie chart for crack severity
def generate_pie_chart():
if not crack_severity_all:
return None
fig, ax = plt.subplots(figsize=(4, 2))
count = Counter(crack_severity_all[-200:])
labels, sizes = zip(*count.items())
ax.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=140)
ax.axis('equal')
fig.tight_layout()
pie_path = "pie_temp.png"
fig.savefig(pie_path)
plt.close(fig)
return pie_path
# Gradio UI
with gr.Blocks(theme=gr.themes.Soft()) as app:
gr.Markdown("# 🛡️ Drone Road Inspection Dashboard")
status_text = gr.Markdown("**Status:** 🟢 Running")
with gr.Row():
with gr.Column(scale=3):
video_output = gr.Image(label="Live Drone Feed", width=640, height=480)
with gr.Column(scale=1):
metrics_output = gr.Textbox(label="Crack Metrics", lines=4)
with gr.Row():
logs_output = gr.Textbox(label="Live Logs", lines=8)
chart_output = gr.Image(label="Crack Trend")
pie_output = gr.Image(label="Crack Severity")
with gr.Row():
map_output = gr.Image(label="Crack Locations Map")
captured_images = gr.Gallery(label="Detected Cracks (Last 100+)", columns=4, rows=25)
with gr.Row():
pause_btn = gr.Button("⏸️ Pause")
resume_btn = gr.Button("▶️ Resume")
frame_slider = gr.Slider(0.0005, 5, value=0.5, label="Frame Interval (seconds)")
def toggle_pause():
global paused
paused = True
return "**Status:** ⏸️ Paused"
def toggle_resume():
global paused
paused = False
return "**Status:** 🟢 Running"
def set_frame_rate(val):
global frame_rate
frame_rate = val
pause_btn.click(toggle_pause, outputs=status_text)
resume_btn.click(toggle_resume, outputs=status_text)
frame_slider.change(set_frame_rate, inputs=[frame_slider])
def streaming_loop():
while True:
frame, metrics, logs, chart, pie, captured, map_path = monitor_feed()
if frame is None:
yield None, str(metrics), logs, chart, pie, captured, map_path
else:
yield frame, str(metrics), logs, chart, pie, captured, map_path
time.sleep(frame_rate)
app.load(streaming_loop, outputs=[video_output, metrics_output, logs_output, chart_output, pie_output, captured_images, map_output])
if __name__ == "__main__":
app.launch(share=True)