import streamlit as st import torch from transformers import DetrImageProcessor, DetrForObjectDetection import cv2 import numpy as np import tempfile import os # Set page configuration st.set_page_config(page_title="Solar Panel Fault Detection", layout="wide") # Title and description st.title("Solar Panel Fault Detection PoC") st.write("Upload a thermal video (MP4) of a solar panel to detect thermal, dust, and power generation faults.") # Load model and processor @st.cache_resource def load_model(): processor = DetrImageProcessor.from_pretrained("facebook/detr-resnet-50") model = DetrForObjectDetection.from_pretrained("facebook/detr-resnet-50") return processor, model processor, model = load_model() # Function to process frame and detect faults def detect_faults(frame): # Convert frame to RGB if necessary if frame.shape[-1] == 4: frame = frame[:, :, :3] # Prepare frame for model inputs = processor(images=frame, return_tensors="pt") # Run inference with torch.no_grad(): outputs = model(**inputs) # Post-process outputs target_sizes = torch.tensor([frame.shape[:2]]) results = processor.post_process_object_detection(outputs, target_sizes=target_sizes, threshold=0.9)[0] # Initialize fault detection faults = {"Thermal Fault": False, "Dust Fault": False, "Power Generation Fault": False} annotated_frame = frame.copy() # Analyze frame for faults for score, label, box in zip(results["scores"], results["labels"], results["boxes"]): box = [int(i) for i in box.tolist()] # Simulate fault detection based on bounding box and pixel intensity roi = frame[box[1]:box[3], box[0]:box[2]] mean_intensity = np.mean(roi) # Thermal Fault: High intensity (hotspot) if mean_intensity > 200: # Adjust threshold based on thermal video scale faults["Thermal Fault"] = True cv2.rectangle(annotated_frame, (box[0], box[1]), (box[2], box[3]), (255, 0, 0), 2) cv2.putText(annotated_frame, "Thermal Fault", (box[0], box[1]-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2) # Dust Fault: Low intensity or irregular patterns elif mean_intensity < 100: # Adjust threshold faults["Dust Fault"] = True cv2.rectangle(annotated_frame, (box[0], box[1]), (box[2], box[3]), (0, 255, 0), 2) cv2.putText(annotated_frame, "Dust Fault", (box[0], box[1]-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2) # Power Generation Fault: Any detected anomaly may indicate reduced efficiency if faults["Thermal Fault"] or faults["Dust Fault"]: faults["Power Generation Fault"] = True return annotated_frame, faults # Function to process video and generate annotated output def process_video(video_path): # Open video cap = cv2.VideoCapture(video_path) if not cap.isOpened(): st.error("Error: Could not open video file.") return None, None # Get video properties frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = int(cap.get(cv2.CAP_PROP_FPS)) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) # Create temporary file for output video output_path = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name fourcc = cv2.VideoWriter_fourcc(*"mp4v") out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height)) # Initialize fault summary video_faults = {"Thermal Fault": False, "Dust Fault": False, "Power Generation Fault": False} # Process each frame frame_count = 0 with st.spinner("Analyzing video..."): progress = st.progress(0) while cap.isOpened(): ret, frame = cap.read() if not ret: break # Convert BGR to RGB frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # Detect faults in frame annotated_frame, faults = detect_faults(frame_rgb) # Update video faults for fault in video_faults: video_faults[fault] |= faults[fault] # Convert back to BGR for writing annotated_frame_bgr = cv2.cvtColor(annotated_frame, cv2.COLOR_RGB2BGR) out.write(annotated_frame_bgr) # Update progress frame_count += 1 progress.progress(frame_count / total_frames) cap.release() out.release() return output_path, video_faults # File uploader uploaded_file = st.file_uploader("Upload a thermal video", type=["mp4"]) if uploaded_file is not None: # Save uploaded video to temporary file tfile = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) tfile.write(uploaded_file.read()) tfile.close() # Display uploaded video st.video(tfile.name, format="video/mp4") # Process video output_path, video_faults = process_video(tfile.name) if output_path: # Display results st.subheader("Fault Detection Results") st.video(output_path, format="video/mp4") # Show fault summary st.write("**Detected Faults in Video:**") for fault, detected in video_faults.items(): status = "Detected" if detected else "Not Detected" color = "red" if detected else "green" st.markdown(f"- **{fault}**: {status}", unsafe_allow_html=True) # Provide recommendations if any(video_faults.values()): st.subheader("Recommendations") if video_faults["Thermal Fault"]: st.write("- **Thermal Fault**: Inspect for damaged components or overheating issues.") if video_faults["Dust Fault"]: st.write("- **Dust Fault**: Schedule cleaning to remove dust accumulation.") if video_faults["Power Generation Fault"]: st.write("- **Power Generation Fault**: Investigate efficiency issues due to detected faults.") else: st.write("No faults detected. The solar panel appears to be functioning normally.") # Clean up temporary files os.unlink(output_path) # Clean up uploaded file os.unlink(tfile.name) # Footer st.markdown("---") st.write("Built with Streamlit, Hugging Face Transformers, and OpenCV for Solar Panel Fault Detection PoC")