Spaces:
Build error
Build error
| # app.py (All modules now in same directory) | |
| import streamlit as st | |
| import cv2 | |
| from video_service import get_video_frame | |
| from detection_service import detect_objects | |
| from thermal_service import detect_thermal_anomalies | |
| from shadow_detection import detect_shadow_coverage | |
| from salesforce_dispatcher import send_to_salesforce | |
| import os | |
| st.set_page_config(page_title="Solar Surveillance", layout="wide") | |
| st.title("π Solar Panel Surveillance - Drone + AI + Salesforce") | |
| video_option = st.selectbox("Choose a Simulation Feed", [ | |
| "drone_day.mp4", "night_intrusion.mp4", "thermal_hotspot.mp4", "shadow_dust_issue.mp4" | |
| ]) | |
| if st.button("Start Monitoring"): | |
| frame_gen = get_video_frame(f"data/{video_option}") | |
| frame_placeholder = st.empty() | |
| for frame in frame_gen: | |
| temp_path = "temp.jpg" | |
| cv2.imwrite(temp_path, frame) | |
| detections = detect_objects(temp_path) | |
| thermal = detect_thermal_anomalies(temp_path) | |
| shadow_flag = detect_shadow_coverage(temp_path) | |
| alert_payload = { | |
| "detections": detections, | |
| "thermal": bool(thermal), | |
| "shadow_issue": shadow_flag, | |
| } | |
| send_to_salesforce(alert_payload) | |
| frame_placeholder.image(frame, channels="BGR") | |
| # video_service.py | |
| import cv2 | |
| def get_video_frame(video_path): | |
| cap = cv2.VideoCapture(video_path) | |
| while cap.isOpened(): | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| yield frame | |
| cap.release() | |
| # detection_service.py | |
| from transformers import pipeline | |
| import cv2 | |
| object_detector = pipeline("object-detection", model="facebook/detr-resnet-50") | |
| def detect_objects(image_path): | |
| image = cv2.imread(image_path) | |
| results = object_detector(image) | |
| return [r for r in results if r['score'] > 0.7] | |
| # thermal_service.py | |
| try: | |
| from ultralytics import YOLO | |
| thermal_model = YOLO("models/thermal_model.pt") | |
| except Exception as e: | |
| thermal_model = None | |
| print("β οΈ Thermal model could not be loaded:", e) | |
| def detect_thermal_anomalies(image_path): | |
| if not thermal_model: | |
| return [] | |
| results = thermal_model(image_path) | |
| flagged = [] | |
| for r in results: | |
| if hasattr(r, 'temperature') and r.temperature > 75: | |
| flagged.append(r) | |
| return flagged | |
| # shadow_detection.py | |
| import random | |
| def detect_shadow_coverage(image_path): | |
| shadow_percent = random.randint(25, 40) | |
| return shadow_percent > 30 | |
| # salesforce_dispatcher.py | |
| import requests | |
| import json | |
| SALESFORCE_WEBHOOK_URL = "https://your-salesforce-instance/services/web-to-case" | |
| def send_to_salesforce(payload): | |
| alert_type = "Intrusion" if any(d["label"] == "person" for d in payload["detections"]) else "Anomaly" | |
| summary = { | |
| "Alert_Type__c": alert_type, | |
| "ThermalFlag__c": payload["thermal"], | |
| "ShadowFlag__c": payload["shadow_issue"], | |
| "Confidence_Score__c": max([d["score"] for d in payload["detections"]], default=0) | |
| } | |
| headers = {"Content-Type": "application/json"} | |
| try: | |
| response = requests.post(SALESFORCE_WEBHOOK_URL, json=summary, headers=headers) | |
| response.raise_for_status() | |
| except requests.exceptions.RequestException as e: | |
| print("β Error sending to Salesforce:", e) | |