# DSatishchandra's picture
# Update app.py
# 62dde81 verified
import gradio as gr
import cv2
import numpy as np
import time
from datetime import datetime
from collections import deque, defaultdict
import matplotlib.pyplot as plt
from services.video_service import VideoService
from services.detection_service import DetectionService
from services.thermal_service import ThermalService
from services.shadow_detection import ShadowDetection
from services.salesforce_dispatcher import SalesforceDispatcher
import os
# Initialize services
# These module-level instances are created once when the module is loaded and
# are shared by every call to live_feed_generator.
# Converts frames to PIL images and draws detections onto frames.
video_service = VideoService()
# Object detector; constructed with the "facebook/detr-resnet-50" model name.
detection_service = DetectionService(model_name="facebook/detr-resnet-50")
# Hotspot detection for the "Thermal Feed"; receives detection_service per call.
thermal_service = ThermalService()
# Shadow/dust detection for the "Shadow/Dust Feed"; receives detection_service per call.
shadow_detection = ShadowDetection()
# Creates cases and sends email / security-team notifications for detections.
salesforce_dispatcher = SalesforceDispatcher()
# Maps each feed label offered in the UI to the recorded clip that is replayed
# for it. Insertion order is the order the labels are listed in.
VIDEO_PATHS = {
    "Day Feed": "data/drone_day.mp4",
    "Night Feed": "data/night_intrusion.mp4",
    "Thermal Feed": "data/thermal_hotspot.mp4",
    "Shadow/Dust Feed": "data/shadow_dust_issue.mp4",
}
# State for live feed
class LiveFeedState:
    """Mutable bookkeeping for one run of the live feed.

    A fresh instance is created per feed run, so counters and buffers are not
    shared between runs.

    Args:
        history_size: Number of most recent per-frame anomaly counts kept for
            the trend plot.
        max_events: Number of most recent annotated frames kept as captured
            events.
        max_logs: Number of most recent log lines kept.
    """

    def __init__(self, history_size=100, max_events=5, max_logs=10):
        # Bounded deques drop their oldest entry automatically when full.
        self.anomaly_history = deque(maxlen=history_size)  # Per-frame anomaly counts
        self.anomaly_types = defaultdict(int)  # Count of each anomaly label
        self.captured_events = deque(maxlen=max_events)  # Frames that had anomalies
        self.total_detected = 0  # Running total of anomalies detected
        self.logs = deque(maxlen=max_logs)  # Most recent log entries
        self.frame_count = 0  # Number of frames processed so far
# Frame rate assumed when the video does not report a usable one.
_DEFAULT_FPS = 30.0
# Consecutive failed reads tolerated before the feed gives up. One failure is
# expected at the end of every pass through the clip (it triggers the rewind).
_MAX_CONSECUTIVE_READ_FAILURES = 5


def _frame_interval(fps, default_fps=_DEFAULT_FPS):
    """Return the delay in seconds between frames for the reported ``fps``."""
    # A reported rate of 0 (or NaN) would otherwise divide by zero or produce
    # a meaningless delay, so fall back to the default rate.
    if fps is None or not fps > 0:
        fps = default_fps
    return 1.0 / fps


def _feed_error_outputs(message):
    """Return the seven UI outputs that clear the panels and show ``message`` in the logs."""
    # Output order: live feed, metrics, logs, trend, anomaly types,
    # captured events, timestamp. The message goes to the logs textbox; the
    # first slot is an image component and cannot display text.
    return None, None, gr.update(value=message), None, None, None, None


def _run_detection(video_type, frame_pil, confidence_threshold):
    """Run the detector that matches ``video_type``; return ``(detections, alert_type)``."""
    if video_type == "Thermal Feed":
        detections = thermal_service.detect_hotspots(
            frame_pil, detection_service, confidence_threshold
        )
        return detections, "Overheating"
    if video_type == "Shadow/Dust Feed":
        detections = shadow_detection.detect_shadow_dust(
            frame_pil, detection_service, confidence_threshold
        )
        return detections, "Shadow/Dust"
    return detection_service.detect_objects(frame_pil, confidence_threshold), "General"


def _dispatch_alerts(alert_type, video_type, detections, frame_number):
    """Create a Salesforce case for the detections and notify admin and security."""
    case_id = salesforce_dispatcher.create_case(
        subject=f"{alert_type} Detected in {video_type} (Frame {frame_number})",
        description=str(detections),
    )
    salesforce_dispatcher.send_email(
        to="admin@solarplant.com",
        subject=f"Alert: {alert_type} in {video_type}",
        body=f"Case ID: {case_id}\nDetails: {detections}\nFrame: {frame_number}",
    )
    salesforce_dispatcher.notify_security_team(
        message=f"Alert: {alert_type} detected in {video_type}. Case ID: {case_id}, Frame: {frame_number}"
    )


def _build_trend_plot(history):
    """Return a matplotlib figure plotting the per-frame anomaly counts in ``history``."""
    fig, ax = plt.subplots(figsize=(4, 2))
    ax.plot(list(history), marker='o', color='yellow')
    ax.set_title("Anomalies Over Time", color='white')
    ax.set_xlabel("Frame", color='white')
    ax.set_ylabel("Count", color='white')
    ax.grid(True, color='gray')
    ax.tick_params(colors='white')
    # Deregister the figure from pyplot so one figure per frame does not
    # accumulate; the returned Figure object itself stays usable.
    plt.close(fig)
    return fig


def live_feed_generator(video_type, confidence_threshold=0.9):
    """Generator for live feed with real-time detection.

    Replays the clip mapped to ``video_type`` in an endless loop, runs the
    matching detector on each frame and yields one UI update per frame.

    Args:
        video_type: Feed label; looked up in ``VIDEO_PATHS``.
        confidence_threshold: Threshold passed through to the detection call.

    Yields:
        A 7-tuple in the order: live video frame, live metrics text, live
        logs text, detection trend figure, anomaly types text, captured
        events list, timestamp text. If the video is missing, cannot be
        opened, or repeatedly fails to decode, a single tuple carrying an
        error message in the logs slot is yielded and the generator ends.
    """
    state = LiveFeedState()
    video_path = VIDEO_PATHS.get(video_type)
    if not video_path or not os.path.exists(video_path):
        yield _feed_error_outputs("Video file not found.")
        return

    cap = cv2.VideoCapture(video_path)
    # try/finally guarantees the capture is released even when the consumer
    # closes the generator early or a service call raises mid-stream.
    try:
        if not cap.isOpened():
            yield _feed_error_outputs("Unable to open video file.")
            return

        # Time between frames for real-time simulation.
        frame_interval = _frame_interval(cap.get(cv2.CAP_PROP_FPS))
        # The style is global matplotlib state; setting it once is enough.
        plt.style.use('dark_background')

        failed_reads = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                failed_reads += 1
                if failed_reads > _MAX_CONSECUTIVE_READ_FAILURES:
                    # Rewinding did not help: stop instead of spinning forever.
                    yield _feed_error_outputs("Unable to read frames from video file.")
                    return
                cap.set(cv2.CAP_PROP_POS_FRAMES, 0)  # Loop the video
                continue
            failed_reads = 0

            state.frame_count += 1
            frame_pil = video_service.frame_to_pil(frame)
            detections, alert_type = _run_detection(
                video_type, frame_pil, confidence_threshold
            )

            # Draw detections on the frame and convert BGR -> RGB for display.
            annotated_frame = video_service.draw_detections(frame, detections)
            annotated_frame_rgb = cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)

            # Update running statistics.
            num_anomalies = len(detections)
            state.anomaly_history.append(num_anomalies)
            state.total_detected += num_anomalies
            for detection in detections:
                state.anomaly_types[detection["label"]] += 1

            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            state.logs.append(
                f"{timestamp} - Frame {state.frame_count} - Anomalies: {num_anomalies}"
            )

            if num_anomalies > 0:
                state.captured_events.append(annotated_frame_rgb)
                # NOTE(review): this files a case and sends notifications for
                # every frame that has a detection; consider throttling.
                _dispatch_alerts(alert_type, video_type, detections, state.frame_count)

            # Live metrics: bounding-box coordinates of this frame's detections.
            boxes = (detection["box"] for detection in detections)
            metrics = [
                f"[{box['xmin']},{box['ymin']},{box['xmax']},{box['ymax']}]"
                for box in boxes
            ]
            metrics_str = f"Coordinates: {metrics}\nTotal Detected: {state.total_detected}"

            trend_plot = _build_trend_plot(state.anomaly_history)
            anomaly_types_str = "\n".join(
                f"{k}: {v}" for k, v in state.anomaly_types.items()
            )
            timestamp_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            yield (
                gr.update(value=annotated_frame_rgb),  # Live Video Feed
                gr.update(value=metrics_str),  # Live Metrics
                gr.update(value="\n".join(state.logs)),  # Live Logs
                gr.update(value=trend_plot),  # Detection Trend
                gr.update(value=anomaly_types_str),  # Anomaly Types
                gr.update(value=list(state.captured_events)),  # Captured Events
                gr.update(value=timestamp_str),  # Timestamp
            )

            # Simulate real-time by sleeping between frames.
            time.sleep(frame_interval)
    finally:
        cap.release()
# Custom CSS for dark theme and styling.
# Passed to gr.Blocks(css=...) when the interface is built. The #id selectors
# correspond to the elem_id values given to the components (live-feed,
# live-metrics, live-logs, anomaly-types, detection-trend, captured-events,
# timestamp) and to the id on the status <p> element.
custom_css = """
body, .gradio-container {
background-color: #1a1a1a !important;
color: white !important;
font-family: Arial, sans-serif !important;
}
h1, h2, h3, label {
color: white !important;
font-weight: bold !important;
}
.gradio-row, .gradio-column {
background-color: #2b2b2b !important;
border-radius: 8px !important;
padding: 10px !important;
margin: 5px !important;
}
#live-feed {
border: 2px solid #444 !important;
border-radius: 8px !important;
}
#live-metrics, #live-logs, #anomaly-types {
background-color: #333 !important;
color: white !important;
border: 1px solid #555 !important;
border-radius: 8px !important;
padding: 10px !important;
height: 100px !important;
overflow-y: auto !important;
}
#detection-trend, #captured-events {
background-color: #333 !important;
border: 1px solid #555 !important;
border-radius: 8px !important;
padding: 10px !important;
}
#status-indicator {
color: #00ff00 !important;
font-size: 14px !important;
}
#timestamp {
font-size: 16px !important;
color: #cccccc !important;
}
"""
# Gradio Interface
# Layout: a header and timestamp on top, then a wide left column with the feed
# controls and live video, and a narrow right column with the analytics panels.
with gr.Blocks(css=custom_css) as demo:
    gr.Markdown("# Fault Detection")
    timestamp = gr.Textbox(
        label="",
        value=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        elem_id="timestamp",
    )
    with gr.Row():
        # Left Panel: Live Feed and Controls
        with gr.Column(scale=7):
            with gr.Row():
                video_type = gr.Dropdown(
                    # Derived from VIDEO_PATHS so the menu cannot drift from
                    # the set of feeds that actually have a video file mapped.
                    choices=list(VIDEO_PATHS),
                    label="Select Drone Feed",
                    value="Thermal Feed",
                )
                confidence_threshold = gr.Slider(
                    0.5, 1.0, value=0.9, label="Confidence Threshold"
                )
                start_button = gr.Button("Start Live Feed")
            # Output-only image: frames are pushed by the generator, so the
            # webcam-oriented `streaming` flag is not set here.
            live_feed = gr.Image(label="Live Video Feed", elem_id="live-feed")
            status_indicator = gr.HTML(
                '<p id="status-indicator">Status: <span style="color: green;">Running</span> •</p>',
                label="",
            )
        # Right Panel: Analytics
        with gr.Column(scale=3):
            live_metrics = gr.Textbox(label="Live Metrics", elem_id="live-metrics")
            live_logs = gr.Textbox(label="Live Logs", elem_id="live-logs")
            detection_trend = gr.Plot(label="Detection Trend", elem_id="detection-trend")
            anomaly_types = gr.Textbox(label="Anomaly Types", elem_id="anomaly-types")
            captured_events = gr.Gallery(
                label="Captured Events (Last 5)", elem_id="captured-events"
            )
    # The output order must match the tuple yielded by live_feed_generator.
    start_button.click(
        fn=live_feed_generator,
        inputs=[video_type, confidence_threshold],
        outputs=[
            live_feed,
            live_metrics,
            live_logs,
            detection_trend,
            anomaly_types,
            captured_events,
            timestamp,
        ],
    )

# Generator handlers stream their updates through the queue; enabling it
# explicitly keeps this working on releases where it is not on by default.
demo.queue()
demo.launch()