Spaces:
Runtime error
Runtime error
File size: 11,072 Bytes
59f8356 f9291b6 62dde81 59f8356 6731aa1 59f8356 26831a6 59f8356 6731aa1 f9291b6 d06bb22 f9291b6 59f8356 62b138f 59f8356 7b0883c 62b138f f9291b6 d06bb22 59f8356 f9291b6 d06bb22 13928c5 d06bb22 f9291b6 62b138f 13928c5 59f8356 f9291b6 d06bb22 59f8356 d06bb22 59f8356 f9291b6 59f8356 7b0883c 59f8356 7b0883c 59f8356 7b0883c 59f8356 d06bb22 59f8356 f9291b6 d06bb22 176b140 7b0883c f9291b6 59f8356 f9291b6 59f8356 f9291b6 59f8356 f9291b6 59f8356 ae655aa f9291b6 59f8356 f9291b6 a3ecb21 59f8356 a3ecb21 f9291b6 a3ecb21 f9291b6 59f8356 f9291b6 a3ecb21 f9291b6 d06bb22 7b0883c f9291b6 59f8356 a3ecb21 59f8356 a3ecb21 f9291b6 a3ecb21 f9291b6 a3ecb21 f9291b6 59f8356 a3ecb21 59f8356 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 |
import gradio as gr
import cv2
import numpy as np
import time
from datetime import datetime
from collections import deque, defaultdict
import matplotlib.pyplot as plt
from services.video_service import VideoService
from services.detection_service import DetectionService
from services.thermal_service import ThermalService
from services.shadow_detection import ShadowDetection
from services.salesforce_dispatcher import SalesforceDispatcher
import os
# Shared service singletons, created once at import time and reused by the feed.
video_service = VideoService()
detection_service = DetectionService(model_name="facebook/detr-resnet-50")
thermal_service = ThermalService()
shadow_detection = ShadowDetection()
salesforce_dispatcher = SalesforceDispatcher()

# Directory holding this script; sample clips are resolved relative to it so
# the app does not depend on the current working directory.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Feed name -> absolute path of its sample clip under <BASE_DIR>/data.
# NOTE: a "Shadow/Dust Feed" entry (data/shadow_dust_issue.mp4) is currently
# disabled and therefore absent from this table.
VIDEO_PATHS = {
    feed: os.path.join(BASE_DIR, "data", clip)
    for feed, clip in (
        ("Day Feed", "alert_response (1).mp4"),
        ("Night Feed", "night_intrusion (1).mp4"),
        ("Thermal Feed", "thermal_hotspot (1).mp4"),
    )
}

# Startup diagnostics: report each resolved path and whether the file exists.
for feed_name, path in VIDEO_PATHS.items():
    print(f"Video path for '{feed_name}': {path}")
    print(f"Exists? {os.path.exists(path)}")
# State for live feed
class LiveFeedState:
    """Mutable bookkeeping for a single live-feed run.

    Attributes:
        frame_count: Number of frames processed so far.
        total_detected: Running total of anomalies across all frames.
        anomaly_types: Per-label anomaly counts (missing labels read as 0).
        anomaly_history: Anomaly count per frame, bounded to the last 100.
        captured_events: Frames that contained anomalies, bounded to the last 5.
        logs: Human-readable log lines, bounded to the last 10.
    """

    def __init__(self):
        # Plain counters.
        self.frame_count = 0
        self.total_detected = 0

        # Label -> occurrences; defaultdict lets callers increment unseen labels.
        self.anomaly_types = defaultdict(int)

        # Bounded buffers: the oldest entries are discarded automatically.
        self.anomaly_history = deque(maxlen=100)
        self.captured_events = deque(maxlen=5)
        self.logs = deque(maxlen=10)
def simple_test_generator():
    """Yield an endless stream of placeholder outputs for smoke-testing the UI.

    Each item is a 7-tuple: a black 480x640 RGB frame, a metrics string and a
    logs string carrying the running counter, ``None`` for the plot, a fixed
    anomaly-types string, an empty list of events, and a fixed timestamp
    string. The generator pauses for half a second after each item is consumed.
    """
    blank_frame = np.zeros((480, 640, 3), dtype=np.uint8)  # black image
    tick = 0
    while True:
        tick += 1
        payload = (
            blank_frame,
            f"Test metrics count: {tick}",
            f"Test logs count: {tick}",
            None,
            "Test anomaly types",
            [],
            "Test timestamp",
        )
        yield payload
        # Throttle so the UI updates roughly twice per second.
        time.sleep(0.5)
# Frame rate assumed when the video container reports no usable FPS value.
_FALLBACK_FPS = 30.0


def _feed_error(message):
    """Return a 7-slot output tuple that shows *message* in the logs slot."""
    # The message goes to the third (logs) output; the first output is an
    # image component, where a plain string would be treated as a file path.
    return None, None, gr.update(value=message), None, None, None, None


def _run_detection(video_type, frame_pil, confidence_threshold):
    """Run the detector matching *video_type*; return (detections, alert_type)."""
    if video_type == "Thermal Feed":
        detections = thermal_service.detect_hotspots(frame_pil, detection_service, confidence_threshold)
        alert_type = "Overheating"
    elif video_type == "Shadow/Dust Feed":
        detections = shadow_detection.detect_shadow_dust(frame_pil, detection_service, confidence_threshold)
        alert_type = "Shadow/Dust"
    else:
        detections = detection_service.detect_objects(frame_pil, confidence_threshold)
        alert_type = "General"
    return detections, alert_type


def _dispatch_alert(alert_type, video_type, frame_number, detections):
    """Open a Salesforce case for the frame, then e-mail and notify about it."""
    case_id = salesforce_dispatcher.create_case(
        subject=f"{alert_type} Detected in {video_type} (Frame {frame_number})",
        description=str(detections)
    )
    salesforce_dispatcher.send_email(
        to="admin@solarplant.com",
        subject=f"Alert: {alert_type} in {video_type}",
        body=f"Case ID: {case_id}\nDetails: {detections}\nFrame: {frame_number}"
    )
    salesforce_dispatcher.notify_security_team(
        message=f"Alert: {alert_type} detected in {video_type}. Case ID: {case_id}, Frame: {frame_number}"
    )


def _format_metrics(detections, total_detected):
    """Render the bounding boxes of *detections* plus the running total."""
    coords = [
        f"[{d['box']['xmin']},{d['box']['ymin']},{d['box']['xmax']},{d['box']['ymax']}]"
        for d in detections
    ]
    return f"Coordinates: {coords}\nTotal Detected: {total_detected}"


def _build_trend_plot(history):
    """Plot per-frame anomaly counts and return the (already closed) figure."""
    fig = plt.figure(figsize=(4, 2))
    plt.plot(list(history), marker='o', color='yellow')
    plt.title("Anomalies Over Time", color='white')
    plt.xlabel("Frame", color='white')
    plt.ylabel("Count", color='white')
    plt.grid(True, color='gray')
    plt.tick_params(colors='white')
    # Unregister the figure from pyplot so one figure per frame does not
    # accumulate; the Figure object itself is still returned to the caller.
    plt.close(fig)
    return fig


def live_feed_generator(video_type, confidence_threshold=0.9):
    """Generator for live feed with real-time detection.

    Plays the clip registered for ``video_type`` in ``VIDEO_PATHS`` in an
    endless loop, runs the matching detector on every frame, raises
    Salesforce alerts for frames with detections, and yields UI updates.

    Args:
        video_type: Feed name selected in the UI; looked up in ``VIDEO_PATHS``
            and used to choose the detector.
        confidence_threshold: Threshold forwarded to the detection services.

    Yields:
        A 7-tuple matching the outputs wired to the start button, in order:
        annotated RGB frame, metrics text, log text, trend plot, anomaly-type
        summary, captured event frames, timestamp. On failure (missing file,
        capture cannot be opened, no readable frames) a single tuple carrying
        an error message in the log slot is yielded and the generator stops.
    """
    state = LiveFeedState()
    video_path = VIDEO_PATHS.get(video_type)
    print(f"Selected video path: {video_path}")
    if not video_path or not os.path.exists(video_path):
        print("Video file not found or path is incorrect.")
        yield _feed_error("Video file not found.")
        return

    cap = cv2.VideoCapture(video_path)
    # try/finally guarantees the capture is released even when the consumer
    # closes the generator at a yield or an exception escapes the loop.
    try:
        if not cap.isOpened():
            print("[DEBUG] Unable to open video capture.")
            yield _feed_error("Unable to open video file.")
            return

        fps = cap.get(cv2.CAP_PROP_FPS)
        # The backend may report 0 (or NaN) for FPS; `not fps > 0` catches
        # both and avoids a ZeroDivisionError below.
        if not fps > 0:
            fps = _FALLBACK_FPS
        frame_interval = 1 / fps  # Time between frames for real-time simulation

        # Apply the dark plotting theme once instead of on every frame.
        plt.style.use('dark_background')

        print(f"[DEBUG] Starting live feed for {video_type} at path: {video_path}")
        frames_this_pass = 0  # Frames read since the last rewind
        while cap.isOpened():
            ret, frame = cap.read()
            print(f"[DEBUG] Frame read success: {ret}, frame count: {state.frame_count}")
            if not ret or frame is None or frame.size == 0:
                if frames_this_pass == 0:
                    # A failed read straight after a rewind means the clip has
                    # no readable frames; rewinding again would spin forever.
                    print("[DEBUG] No readable frames in video. Stopping feed.")
                    yield _feed_error("Unable to read frames from video file.")
                    break
                print("[DEBUG] End of video reached or cannot read frame. Looping video.")
                cap.set(cv2.CAP_PROP_POS_FRAMES, 0)  # Loop the video
                frames_this_pass = 0
                continue

            frames_this_pass += 1
            print(f"Frame shape: {frame.shape} at frame count {state.frame_count}")
            state.frame_count += 1

            # Convert frame to PIL Image
            frame_pil = video_service.frame_to_pil(frame)
            if frame_pil is None:
                print("[DEBUG] Failed to convert frame to PIL image. Skipping frame.")
                continue
            print(f"[DEBUG] Processing frame {state.frame_count}")

            # Perform detection based on video type
            detections, alert_type = _run_detection(video_type, frame_pil, confidence_threshold)
            print(f"Detections for Frame {state.frame_count}: {detections}")

            # Draw detections on frame. No cv2.imshow here: this runs inside a
            # web server, where opening a GUI window fails on headless hosts.
            annotated_frame = video_service.draw_detections(frame, detections)
            annotated_frame_rgb = cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)
            print(f"[DEBUG] Yielding frame {state.frame_count}, shape: {annotated_frame_rgb.shape}")

            # Update state
            num_anomalies = len(detections)
            state.anomaly_history.append(num_anomalies)
            state.total_detected += num_anomalies
            for detection in detections:
                state.anomaly_types[detection["label"]] += 1

            # Log detection
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            state.logs.append(f"{timestamp} - Frame {state.frame_count} - Anomalies: {num_anomalies}")

            if num_anomalies > 0:
                # Capture the frame and raise the Salesforce case/notifications
                state.captured_events.append(annotated_frame_rgb)
                _dispatch_alert(alert_type, video_type, state.frame_count, detections)

            metrics_str = _format_metrics(detections, state.total_detected)
            trend_plot = _build_trend_plot(state.anomaly_history)
            anomaly_types_str = "\n".join([f"{k}: {v}" for k, v in state.anomaly_types.items()])
            timestamp_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            # Yield updated UI components
            yield (
                gr.update(value=annotated_frame_rgb),
                gr.update(value=metrics_str),
                gr.update(value="\n".join(state.logs)),
                gr.update(value=trend_plot),
                gr.update(value=anomaly_types_str),
                gr.update(value=list(state.captured_events)),
                gr.update(value=timestamp_str)
            )

            # Simulate real-time by sleeping between frames
            time.sleep(frame_interval)
    finally:
        cap.release()
# Custom CSS for dark theme and styling.
# Passed to gr.Blocks(css=...) when the interface is built. The #id selectors
# correspond to the elem_id values given to the components (live-feed,
# live-metrics, live-logs, anomaly-types, detection-trend, captured-events,
# timestamp) and to the id used in the status HTML (status-indicator).
# Every rule is marked !important.
custom_css = """
body, .gradio-container {
background-color: #1a1a1a !important;
color: white !important;
font-family: Arial, sans-serif !important;
}
h1, h2, h3, label {
color: white !important;
font-weight: bold !important;
}
.gradio-row, .gradio-column {
background-color: #2b2b2b !important;
border-radius: 8px !important;
padding: 10px !important;
margin: 5px !important;
}
#live-feed {
border: 2px solid #444 !important;
border-radius: 8px !important;
}
#live-metrics, #live-logs, #anomaly-types {
background-color: #333 !important;
color: white !important;
border: 1px solid #555 !important;
border-radius: 8px !important;
padding: 10px !important;
height: 100px !important;
overflow-y: auto !important;
}
#detection-trend, #captured-events {
background-color: #333 !important;
border: 1px solid #555 !important;
border-radius: 8px !important;
padding: 10px !important;
}
#status-indicator {
color: #00ff00 !important;
font-size: 14px !important;
}
#timestamp {
font-size: 16px !important;
color: #cccccc !important;
}
"""
# Gradio Interface
# Layout: a wide left column (feed selection, controls, live image, status)
# and a narrower right column (metrics, logs, trend plot, anomaly summary,
# captured events). The start button streams live_feed_generator's yields
# into the seven output components listed in the click() call.
with gr.Blocks(css=custom_css) as demo:
    gr.Markdown("# Fault Detection")
    timestamp = gr.Textbox(label="", value=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), elem_id="timestamp")
    with gr.Row():
        # Left Panel: Live Feed and Controls
        with gr.Column(scale=7):
            with gr.Row():
                video_type = gr.Dropdown(
                    # Offer only feeds that have a registered video path, so
                    # every selectable option can actually be played.
                    choices=list(VIDEO_PATHS),
                    label="Select Drone Feed",
                    value="Thermal Feed"
                )
                confidence_threshold = gr.Slider(0.5, 1.0, value=0.9, label="Confidence Threshold")
                start_button = gr.Button("Start Live Feed")
            # Output-only image: `streaming=True` is a webcam-input option and
            # is rejected by Gradio for a component like this one.
            live_feed = gr.Image(label="Live Video Feed", elem_id="live-feed")
            status_indicator = gr.HTML(
                # &bull; replaces a mis-encoded bullet character.
                '<p id="status-indicator">Status: <span style="color: green;">Running</span> &bull;</p>',
                label=""
            )
        # Right Panel: Analytics
        with gr.Column(scale=3):
            live_metrics = gr.Textbox(label="Live Metrics", elem_id="live-metrics")
            live_logs = gr.Textbox(label="Live Logs", elem_id="live-logs")
            detection_trend = gr.Plot(label="Detection Trend", elem_id="detection-trend")
            anomaly_types = gr.Textbox(label="Anomaly Types", elem_id="anomaly-types")
            captured_events = gr.Gallery(label="Captured Events (Last 5)", elem_id="captured-events")
    start_button.click(
        fn=live_feed_generator,
        inputs=[video_type, confidence_threshold],
        outputs=[live_feed, live_metrics, live_logs, detection_trend, anomaly_types, captured_events, timestamp]
    )

# Generator handlers stream their yields through the queue; enabling it
# explicitly is required on older Gradio releases and harmless on newer ones.
demo.queue()
demo.launch()