import gradio as gr import cv2 import numpy as np import time from datetime import datetime from collections import deque, defaultdict import matplotlib.pyplot as plt from services.video_service import VideoService from services.detection_service import DetectionService from services.thermal_service import ThermalService from services.shadow_detection import ShadowDetection from services.salesforce_dispatcher import SalesforceDispatcher import os # Initialize services video_service = VideoService() detection_service = DetectionService(model_name="facebook/detr-resnet-50") thermal_service = ThermalService() shadow_detection = ShadowDetection() salesforce_dispatcher = SalesforceDispatcher() # Paths to video files VIDEO_PATHS = { "Day Feed": "data/drone_day.mp4", "Night Feed": "data/night_intrusion.mp4", "Thermal Feed": "data/thermal_hotspot.mp4", "Shadow/Dust Feed": "data/shadow_dust_issue.mp4", } # State for live feed class LiveFeedState: def __init__(self): self.anomaly_history = deque(maxlen=100) # Last 100 frames for trend self.anomaly_types = defaultdict(int) # Count of each anomaly type self.captured_events = deque(maxlen=5) # Last 5 events with frames self.total_detected = 0 # Total anomalies detected self.logs = deque(maxlen=10) # Last 10 log entries self.frame_count = 0 # Frame counter def live_feed_generator(video_type, confidence_threshold=0.9): """Generator for live feed with real-time detection.""" state = LiveFeedState() video_path = VIDEO_PATHS.get(video_type) if not video_path or not os.path.exists(video_path): yield gr.update(value="Video file not found."), None, None, None, None, None, None return cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) frame_interval = 1 / fps # Time between frames for real-time simulation while cap.isOpened(): ret, frame = cap.read() if not ret: cap.set(cv2.CAP_PROP_POS_FRAMES, 0) # Loop the video continue state.frame_count += 1 frame_pil = video_service.frame_to_pil(frame) # Perform detection based on video type if video_type == "Thermal Feed": detections = thermal_service.detect_hotspots(frame_pil, detection_service, confidence_threshold) alert_type = "Overheating" elif video_type == "Shadow/Dust Feed": detections = shadow_detection.detect_shadow_dust(frame_pil, detection_service, confidence_threshold) alert_type = "Shadow/Dust" else: detections = detection_service.detect_objects(frame_pil, confidence_threshold) alert_type = "General" # Draw detections on frame annotated_frame = video_service.draw_detections(frame, detections) annotated_frame_rgb = cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB) # Update state num_anomalies = len(detections) state.anomaly_history.append(num_anomalies) state.total_detected += num_anomalies # Update anomaly types for detection in detections: label = detection["label"] state.anomaly_types[label] += 1 # Log detection timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") log_entry = f"{timestamp} - Frame {state.frame_count} - Anomalies: {num_anomalies}" state.logs.append(log_entry) # Capture events (frames with anomalies) if num_anomalies > 0: state.captured_events.append(annotated_frame_rgb) # Generate Salesforce case and notifications if num_anomalies > 0: case_id = salesforce_dispatcher.create_case( subject=f"{alert_type} Detected in {video_type} (Frame {state.frame_count})", description=str(detections) ) salesforce_dispatcher.send_email( to="admin@solarplant.com", subject=f"Alert: {alert_type} in {video_type}", body=f"Case ID: {case_id}\nDetails: {detections}\nFrame: {state.frame_count}" ) salesforce_dispatcher.notify_security_team( message=f"Alert: {alert_type} detected in {video_type}. Case ID: {case_id}, Frame: {state.frame_count}" ) # Generate live metrics metrics = [] for detection in detections: box = detection["box"] coords = f"[{box['xmin']},{box['ymin']},{box['xmax']},{box['ymax']}]" metrics.append(coords) metrics_str = f"Coordinates: {metrics}\nTotal Detected: {state.total_detected}" # Generate detection trend plot with dark theme plt.style.use('dark_background') plt.figure(figsize=(4, 2)) plt.plot(list(state.anomaly_history), marker='o', color='yellow') plt.title("Anomalies Over Time", color='white') plt.xlabel("Frame", color='white') plt.ylabel("Count", color='white') plt.grid(True, color='gray') plt.tick_params(colors='white') trend_plot = plt.gcf() plt.close() # Generate anomaly types summary anomaly_types_str = "\n".join([f"{k}: {v}" for k, v in state.anomaly_types.items()]) # Update timestamp timestamp_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # Yield updated UI components yield ( gr.update(value=annotated_frame_rgb), # Live Video Feed gr.update(value=metrics_str), # Live Metrics gr.update(value="\n".join(state.logs)), # Live Logs gr.update(value=trend_plot), # Detection Trend gr.update(value=anomaly_types_str), # Anomaly Types gr.update(value=list(state.captured_events)), # Captured Events gr.update(value=timestamp_str) # Timestamp ) # Simulate real-time by sleeping between frames time.sleep(frame_interval) cap.release() # Custom CSS for dark theme and styling custom_css = """ body, .gradio-container { background-color: #1a1a1a !important; color: white !important; font-family: Arial, sans-serif !important; } h1, h2, h3, label { color: white !important; font-weight: bold !important; } .gradio-row, .gradio-column { background-color: #2b2b2b !important; border-radius: 8px !important; padding: 10px !important; margin: 5px !important; } #live-feed { border: 2px solid #444 !important; border-radius: 8px !important; } #live-metrics, #live-logs, #anomaly-types { background-color: #333 !important; color: white !important; border: 1px solid #555 !important; border-radius: 8px !important; padding: 10px !important; height: 100px !important; overflow-y: auto !important; } #detection-trend, #captured-events { background-color: #333 !important; border: 1px solid #555 !important; border-radius: 8px !important; padding: 10px !important; } #status-indicator { color: #00ff00 !important; font-size: 14px !important; } #timestamp { font-size: 16px !important; color: #cccccc !important; } """ # Gradio Interface with gr.Blocks(css=custom_css) as demo: gr.Markdown("# Fault Detection") timestamp = gr.Textbox(label="", value=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), elem_id="timestamp") with gr.Row(): # Left Panel: Live Feed and Controls with gr.Column(scale=7): with gr.Row(): video_type = gr.Dropdown( choices=["Day Feed", "Night Feed", "Thermal Feed", "Shadow/Dust Feed"], label="Select Drone Feed", value="Thermal Feed" ) confidence_threshold = gr.Slider(0.5, 1.0, value=0.9, label="Confidence Threshold") start_button = gr.Button("Start Live Feed") live_feed = gr.Image(label="Live Video Feed", streaming=True, elem_id="live-feed") status_indicator = gr.HTML( '

Status: Running

', label="" ) # Right Panel: Analytics with gr.Column(scale=3): live_metrics = gr.Textbox(label="Live Metrics", elem_id="live-metrics") live_logs = gr.Textbox(label="Live Logs", elem_id="live-logs") detection_trend = gr.Plot(label="Detection Trend", elem_id="detection-trend") anomaly_types = gr.Textbox(label="Anomaly Types", elem_id="anomaly-types") captured_events = gr.Gallery(label="Captured Events (Last 5)", elem_id="captured-events") start_button.click( fn=live_feed_generator, inputs=[video_type, confidence_threshold], outputs=[live_feed, live_metrics, live_logs, detection_trend, anomaly_types, captured_events, timestamp] ) demo.launch()