# app.py (All modules now in same directory) import streamlit as st import cv2 from video_service import get_video_frame from detection_service import detect_objects from thermal_service import detect_thermal_anomalies from shadow_detection import detect_shadow_coverage from salesforce_dispatcher import send_to_salesforce import os st.set_page_config(page_title="Solar Surveillance", layout="wide") st.title("🔭 Solar Panel Surveillance - Drone + AI + Salesforce") video_option = st.selectbox("Choose a Simulation Feed", [ "drone_day.mp4", "night_intrusion.mp4", "thermal_hotspot.mp4", "shadow_dust_issue.mp4" ]) if st.button("Start Monitoring"): frame_gen = get_video_frame(f"data/{video_option}") frame_placeholder = st.empty() for frame in frame_gen: temp_path = "temp.jpg" cv2.imwrite(temp_path, frame) detections = detect_objects(temp_path) thermal = detect_thermal_anomalies(temp_path) shadow_flag = detect_shadow_coverage(temp_path) alert_payload = { "detections": detections, "thermal": bool(thermal), "shadow_issue": shadow_flag, } send_to_salesforce(alert_payload) frame_placeholder.image(frame, channels="BGR") # video_service.py import cv2 def get_video_frame(video_path): cap = cv2.VideoCapture(video_path) while cap.isOpened(): ret, frame = cap.read() if not ret: break yield frame cap.release() # detection_service.py from transformers import pipeline import cv2 object_detector = pipeline("object-detection", model="facebook/detr-resnet-50") def detect_objects(image_path): image = cv2.imread(image_path) results = object_detector(image) return [r for r in results if r['score'] > 0.7] # thermal_service.py try: from ultralytics import YOLO thermal_model = YOLO("models/thermal_model.pt") except Exception as e: thermal_model = None print("⚠️ Thermal model could not be loaded:", e) def detect_thermal_anomalies(image_path): if not thermal_model: return [] results = thermal_model(image_path) flagged = [] for r in results: if hasattr(r, 'temperature') and r.temperature > 75: flagged.append(r) return flagged # shadow_detection.py import random def detect_shadow_coverage(image_path): shadow_percent = random.randint(25, 40) return shadow_percent > 30 # salesforce_dispatcher.py import requests import json SALESFORCE_WEBHOOK_URL = "https://your-salesforce-instance/services/web-to-case" def send_to_salesforce(payload): alert_type = "Intrusion" if any(d["label"] == "person" for d in payload["detections"]) else "Anomaly" summary = { "Alert_Type__c": alert_type, "ThermalFlag__c": payload["thermal"], "ShadowFlag__c": payload["shadow_issue"], "Confidence_Score__c": max([d["score"] for d in payload["detections"]], default=0) } headers = {"Content-Type": "application/json"} try: response = requests.post(SALESFORCE_WEBHOOK_URL, json=summary, headers=headers) response.raise_for_status() except requests.exceptions.RequestException as e: print("❌ Error sending to Salesforce:", e)