surveillance12 / app.py
lokesh341's picture
Update app.py
00d4eb1 verified
import os
import gradio as gr
import cv2
import time
import json
import random
import logging
import matplotlib.pyplot as plt
import shutil
from datetime import datetime
from collections import Counter
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"
try:
from services.video_service import get_next_video_frame, reset_video_index, preload_video, release_video
from services.metrics_service import update_metrics
from services.salesforce_dispatcher import send_to_salesforce
from services.shadow_detection import detect_shadow_coverage
from services.thermal_service import process_thermal
from services.map_service import generate_map
from services.plantation.plant_count import process_plants
from services.plantation.plant_health import process_plant_health
from services.plantation.missing_patch_check import process_missing_patches
except ImportError as e:
print(f"Failed to import service modules: {str(e)}")
logging.error(f"Import error: {str(e)}")
exit(1)
logging.basicConfig(
filename="app.log",
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
paused: bool = False
frame_rate: float = 0.3
frame_count: int = 0
log_entries: List[str] = []
detected_counts: List[int] = []
last_frame: Optional[np.ndarray] = None
last_metrics: Dict[str, Any] = {}
last_timestamp: str = ""
detected_plants: List[str] = []
gps_coordinates: List[List[float]] = []
media_loaded: bool = False
active_service: Optional[str] = None
is_video: bool = True
static_image: Optional[np.ndarray] = None
enabled_services: List[str] = []
DEFAULT_VIDEO_PATH = "sample.mp4"
TEMP_IMAGE_PATH = os.path.abspath("temp.jpg")
CAPTURED_FRAMES_DIR = os.path.abspath("captured_frames")
OUTPUT_DIR = os.path.abspath("outputs")
TEMP_MEDIA_DIR = os.path.abspath("temp_media")
for directory in [CAPTURED_FRAMES_DIR, OUTPUT_DIR, TEMP_MEDIA_DIR]:
os.makedirs(directory, exist_ok=True)
os.chmod(directory, 0o777)
def initialize_media(media_file: Optional[Any] = None) -> str:
global media_loaded, is_video, static_image, log_entries, frame_count
release_video()
static_image = None
frame_count = 0
if media_file is None:
media_path = DEFAULT_VIDEO_PATH
log_entries.append(f"No media uploaded, attempting to load default: {media_path}")
logging.info(f"No media uploaded, attempting to load default: {media_path}")
else:
if not hasattr(media_file, 'name') or not media_file.name:
status = "Error: Invalid media file uploaded."
log_entries.append(status)
logging.error(status)
media_loaded = False
return status
original_path = media_file.name
file_extension = os.path.splitext(original_path)[1].lower()
temp_media_path = os.path.join(TEMP_MEDIA_DIR, f"uploaded_media{file_extension}")
try:
shutil.copy(original_path, temp_media_path)
media_path = temp_media_path
log_entries.append(f"Copied uploaded file to: {media_path}")
logging.info(f"Copied uploaded file to: {media_path}")
except Exception as e:
status = f"Error copying uploaded file: {str(e)}"
log_entries.append(status)
logging.error(status)
media_loaded = False
return status
if not os.path.exists(media_path):
status = f"Error: Media file '{media_path}' not found."
log_entries.append(status)
logging.error(status)
media_loaded = False
return status
try:
if file_extension in (".mp4", ".avi"):
is_video = True
preload_video(media_path)
media_loaded = True
status = f"Successfully loaded video: {media_path}"
elif file_extension in (".jpg", ".jpeg", ".png"):
is_video = False
static_image = cv2.imread(media_path)
if static_image is None:
raise RuntimeError(f"Failed to load image: {media_path}")
static_image = cv2.resize(static_image, (320, 240))
media_loaded = True
status = f"Successfully loaded image: {media_path}"
else:
media_loaded = False
status = "Error: Unsupported file format. Use .mp4, .avi, .jpg, .jpeg, or .png."
log_entries.append(status)
logging.error(status)
return status
log_entries.append(status)
logging.info(status)
return status
except Exception as e:
media_loaded = False
status = f"Error loading media: {str(e)}"
log_entries.append(status)
logging.error(status)
return status
def set_active_service(pl_val: bool) -> Tuple[Optional[str], str]:
global active_service, enabled_services
enabled_services = []
if pl_val:
enabled_services.append("plantation")
if not enabled_services:
active_service = None
log_entries.append("Plantation service disabled.")
logging.info("Plantation service disabled.")
return None, "No Service Enabled"
active_service = "plantation"
log_entries.append("Enabled service: Plantation")
logging.info("Enabled service: Plantation")
return active_service, "Enabled: Plantation"
def generate_line_chart() -> Optional[str]:
if not detected_counts:
return None
fig, ax = plt.subplots(figsize=(4, 2))
ax.plot(detected_counts[-50:], marker='o', color='#FF8C00')
ax.set_title("Detections Over Time")
ax.set_xlabel("Frame")
ax.set_ylabel("Count")
ax.grid(True)
fig.tight_layout()
chart_path = "chart_temp.png"
try:
fig.savefig(chart_path)
plt.close(fig)
return chart_path
except Exception as e:
log_entries.append(f"Error generating chart: {str(e)}")
logging.error(f"Error generating chart: {str(e)}")
return None
def monitor_feed() -> Tuple[
Optional[np.ndarray],
str,
str,
List[str],
Optional[str],
Optional[str]
]:
global paused, frame_count, last_frame, last_metrics, last_timestamp
global gps_coordinates, detected_plants, media_loaded
global is_video, static_image, enabled_services
if not media_loaded:
log_entries.append("Cannot start processing: Media not loaded successfully.")
logging.error("Media not loaded successfully.")
return (
None,
json.dumps({"error": "Media not loaded. Please upload a video or image file."}, indent=2),
"\n".join(log_entries[-10:]),
detected_plants,
None,
None
)
if paused and last_frame is not None:
frame = last_frame.copy()
metrics = last_metrics.copy()
else:
max_retries = 3
start_time = time.time()
for attempt in range(max_retries):
try:
if is_video:
frame = get_next_video_frame()
if frame is None:
log_entries.append(f"Frame retrieval failed on attempt {attempt + 1}, resetting video.")
logging.warning(f"Frame retrieval failed on attempt {attempt + 1}, resetting video.")
reset_video_index()
continue
break
else:
frame = static_image.copy()
break
except Exception as e:
log_entries.append(f"Frame retrieval error on attempt {attempt + 1}: {str(e)}")
logging.error(f"Frame retrieval error on attempt {attempt + 1}: {str(e)}")
if attempt == max_retries - 1:
return (
None,
json.dumps(last_metrics, indent=2),
"\n".join(log_entries[-10:]),
detected_plants,
None,
None
)
else:
log_entries.append("Failed to retrieve frame after maximum retries.")
logging.error("Failed to retrieve frame after maximum retries.")
return (
None,
json.dumps(last_metrics, indent=2),
"\n".join(log_entries[-10:]),
detected_plants,
None,
None
)
detection_frame = cv2.resize(frame, (512, 320))
all_detected_items: List[Dict[str, Any]] = []
shadow_issue = False
thermal_flag = False
try:
if "plantation" in enabled_services:
plant_dets, detection_frame = process_plants(detection_frame)
health_dets, detection_frame = process_plant_health(detection_frame)
missing_dets, detection_frame = process_missing_patches(detection_frame)
all_detected_items.extend(plant_dets + health_dets + missing_dets)
try:
cv2.imwrite(TEMP_IMAGE_PATH, detection_frame)
shadow_issue = detect_shadow_coverage(TEMP_IMAGE_PATH)
except Exception as e:
log_entries.append(f"Error saving temp image for shadow detection: {str(e)}")
logging.error(f"Error saving temp image: {str(e)}")
shadow_issue = False
if len(detection_frame.shape) == 2:
thermal_results = process_thermal(detection_frame)
thermal_dets = thermal_results["detections"]
detection_frame = thermal_results["frame"]
all_detected_items.extend(thermal_dets)
thermal_flag = bool(thermal_dets)
orig_h, orig_w = frame.shape[:2]
det_h, det_w = detection_frame.shape[:2]
scale_x, scale_y = orig_w / det_w, orig_h / det_h
for item in all_detected_items:
if "box" in item:
box = item["box"]
item["box"] = [
int(box[0] * scale_x),
int(box[1] * scale_y),
int(box[2] * scale_x),
int(box[3] * scale_y)
]
for item in all_detected_items:
box = item.get("box", [])
if not box:
continue
x_min, y_min, x_max, y_max = box
label = item.get("label", "")
dtype = item.get("type", "")
health = item.get("health", "")
if dtype == "plant":
color = (0, 255, 255) # Cyan
if health == "healthy":
color = (255, 165, 0) # Orange
elif dtype == "missing_patch":
color = (255, 0, 255) # Magenta
else:
continue
cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, 3)
(text_w, text_h), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
label_background = frame[y_min - text_h - 15:y_min - 5, x_min:x_min + text_w + 10]
if label_background.size > 0:
overlay = label_background.copy()
cv2.rectangle(overlay, (0, 0), (text_w + 10, text_h + 10), (0, 0, 0), -1)
alpha = 0.5
cv2.addWeighted(overlay, alpha, label_background, 1 - alpha, 0, label_background)
cv2.putText(frame, label, (x_min + 5, y_min - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
try:
cv2.imwrite(TEMP_IMAGE_PATH, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
except Exception as e:
log_entries.append(f"Error saving temp image: {str(e)}")
logging.error(f"Error saving temp image: {str(e)}")
except Exception as e:
log_entries.append(f"Processing Error: {str(e)}")
logging.error(f"Processing error: {str(e)}")
all_detected_items = []
metrics = update_metrics(all_detected_items)
gps_coord = [17.385044 + random.uniform(-0.001, 0.001), 78.486671 + frame_count * 0.0001]
gps_coordinates.append(gps_coord)
for item in all_detected_items:
item["gps"] = gps_coord
detection_types = {item.get("type") for item in all_detected_items if "type" in item}
if detection_types:
try:
captured_frame_path = os.path.join(CAPTURED_FRAMES_DIR, f"detected_{frame_count}.jpg")
success = cv2.imwrite(captured_frame_path, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 100])
if not success:
raise RuntimeError(f"Failed to save captured frame: {captured_frame_path}")
for item in all_detected_items:
if item.get("type") == "plant":
detected_plants.append(captured_frame_path)
if len(detected_plants) > 100:
detected_plants.pop(0)
except Exception as e:
log_entries.append(f"Error saving captured frame: {str(e)}")
logging.error(f"Error saving captured frame: {str(e)}")
all_detections = {
"detections": all_detected_items,
"metrics": metrics,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"frame_count": frame_count,
"gps_coordinates": gps_coord,
"shadow_issue": shadow_issue,
"thermal": thermal_flag
}
try:
send_to_salesforce(all_detections)
except Exception as e:
log_entries.append(f"Salesforce Dispatch Error: {str(e)}")
logging.error(f"Salesforce dispatch error: {str(e)}")
try:
frame_path = os.path.join(OUTPUT_DIR, f"frame_{frame_count:04d}.jpg")
success = cv2.imwrite(frame_path, frame, [int(cv2.IMWRITE_JPEG_QUALITY), 100])
if not success:
raise RuntimeError(f"Failed to save output frame: {frame_path}")
except Exception as e:
log_entries.append(f"Error saving output frame: {str(e)}")
logging.error(f"Error saving output frame: {str(e)}")
frame_count += 1
last_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
last_frame = frame.copy()
last_metrics = metrics
plant_detected = len([item for item in all_detected_items if item.get("type") == "plant"])
missing_detected = len([item for item in all_detected_items if item.get("type") == "missing_patch"])
detected_counts.append(plant_detected + missing_detected)
processing_time = time.time() - start_time
detection_summary = {
"timestamp": last_timestamp,
"frame": frame_count,
"plants": plant_detected,
"missing_patches": missing_detected,
"gps": gps_coord,
"processing_time_ms": processing_time * 1000
}
log_message = json.dumps(detection_summary, indent=2)
log_entries.append(log_message)
logging.info(log_message)
if len(log_entries) > 100:
log_entries.pop(0)
if len(detected_counts) > 500:
detected_counts.pop(0)
frame = cv2.resize(last_frame, (640, 480))
cv2.putText(frame, f"Frame: {frame_count}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
cv2.putText(frame, f"{last_timestamp}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
map_items = [item for item in last_metrics.get("items", []) if item.get("type") in ["plant", "missing_patch"]]
map_path = generate_map(gps_coordinates[-5:], map_items)
return (
frame[:, :, ::-1],
json.dumps(last_metrics, indent=2),
"\n".join(log_entries[-10:]),
detected_plants,
generate_line_chart(),
map_path
)
with gr.Blocks(theme=gr.themes.Soft(primary_hue="orange", secondary_hue="amber")) as app:
gr.Markdown(
"""
# 🌳 Plantation Inspection Dashboard
Monitor plantation conditions in real-time using drone footage or static images.
"""
)
with gr.Row():
with gr.Column(scale=3):
media_input = gr.File(label="Upload Media File (e.g., sample.mp4, image.jpg)", file_types=[".mp4", ".avi", ".jpg", ".jpeg", ".png"])
load_button = gr.Button("Load Media", variant="primary")
with gr.Column(scale=1):
media_status = gr.Textbox(
label="Media Load Status",
value="Please upload a video/image file or ensure 'sample.mp4' exists in the root directory.",
interactive=False
)
with gr.Row():
with gr.Column():
pl_toggle = gr.Checkbox(label="Enable Plantation Services", value=False)
pl_status = gr.Textbox(label="Plantation Status", value="Disabled", interactive=False)
status_text = gr.Markdown("**Status:** 🟢 Ready (Upload a media file and enable the service to start)")
with gr.Row():
with gr.Column(scale=3):
media_output = gr.Image(label="Live Feed", width=640, height=480, elem_id="live-feed")
with gr.Column(scale=1):
metrics_output = gr.Textbox(
label="Detection Metrics",
lines=10,
interactive=False,
placeholder="Detection metrics, counts will appear here."
)
with gr.Row():
with gr.Column(scale=2):
logs_output = gr.Textbox(label="Live Logs", lines=8, interactive=False)
with gr.Column(scale=1):
plant_images = gr.Gallery(label="Detected Plants (Last 100+)", columns=4, rows=13, height="auto")
with gr.Row():
chart_output = gr.Image(label="Detection Trend")
map_output = gr.Image(label="Issue Locations Map")
with gr.Row():
pause_btn = gr.Button("⏸️ Pause", variant="secondary")
resume_btn = gr.Button("▶️ Resume", variant="primary")
frame_slider = gr.Slider(0.05, 1.0, value=0.3, label="Frame Interval (seconds)", step=0.05)
gr.HTML("""
<style>
body {
background-color: #FFDAB9 !important;
}
#live-feed {
border: 2px solid #FF8C00;
border-radius: 10px;
}
.gr-button-primary {
background-color: #FF8C00 !important;
}
.gr-button-secondary {
background-color: #FF6347 !important;
}
</style>
""")
def toggle_pause() -> str:
global paused
paused = True
return "**Status:** ⏸️ Paused"
def toggle_resume() -> str:
global paused
paused = False
return "**Status:** 🟢 Streaming"
def set_frame_rate(val: float) -> None:
global frame_rate
frame_rate = val
media_status.value = initialize_media()
load_button.click(
initialize_media,
inputs=[media_input],
outputs=[media_status]
)
def update_toggles(pl_val: bool) -> Tuple[str, str]:
active, status_message = set_active_service(pl_val)
pl_status_val = "Enabled" if pl_val else "Disabled"
return pl_status_val, status_message
pl_toggle.change(update_toggles, inputs=[pl_toggle], outputs=[pl_status, status_text])
pause_btn.click(toggle_pause, outputs=status_text)
resume_btn.click(toggle_resume, outputs=status_text)
frame_slider.change(set_frame_rate, inputs=[frame_slider])
def streaming_loop():
while True:
if not media_loaded:
yield None, json.dumps({"error": "Media not loaded. Please upload a video or image file."}, indent=2), "\n".join(log_entries[-10:]), detected_plants, None, None
else:
frame, metrics, logs, plants, chart, map_path = monitor_feed()
if frame is None:
yield None, metrics, logs, plants, chart, map_path
else:
yield frame, metrics, logs, plants, chart, map_path
if not is_video:
break
time.sleep(frame_rate)
app.load(streaming_loop, outputs=[media_output, metrics_output, logs_output, plant_images, chart_output, map_output])
if __name__ == "__main__":
app.launch(share=True)