Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import torch | |
| from transformers import DetrImageProcessor, DetrForObjectDetection | |
| import cv2 | |
| import numpy as np | |
| import tempfile | |
| import os | |
# --- Page setup ---------------------------------------------------------
# Configure the browser-tab title and use the wide page layout.
st.set_page_config(
    layout="wide",
    page_title="Solar Panel Fault Detection",
)

# --- Header -------------------------------------------------------------
# Visible page title plus a one-line description of what to upload.
st.title("Solar Panel Fault Detection PoC")
st.write(
    "Upload a thermal video (MP4) of a solar panel to detect thermal, dust, "
    "and power generation faults."
)
# Load model and processor
@st.cache_resource
def load_model():
    """Load the DETR image processor and object-detection model.

    The loader is wrapped in ``st.cache_resource`` so the checkpoint is
    loaded once and the same objects are reused, instead of being loaded
    again each time Streamlit re-executes this script.

    Returns:
        tuple: ``(processor, model)`` built from the
        ``facebook/detr-resnet-50`` checkpoint.
    """
    checkpoint = "facebook/detr-resnet-50"
    processor = DetrImageProcessor.from_pretrained(checkpoint)
    model = DetrForObjectDetection.from_pretrained(checkpoint)
    return processor, model


# Module-level handles used by detect_faults().
processor, model = load_model()
# Function to process frame and detect faults
def detect_faults(frame, thermal_threshold=200, dust_threshold=100,
                  score_threshold=0.9):
    """Run object detection on one frame and flag faults by region brightness.

    Each detected box is classified only by the mean pixel value inside it:
    above ``thermal_threshold`` is a thermal fault, below ``dust_threshold``
    is a dust fault. The detector's class labels are not used.

    Args:
        frame: Image array indexed as ``[row, column, channel]``. A fourth
            channel, if present, is dropped. The caller in this file passes
            RGB frames, so the drawing colours below are RGB tuples.
        thermal_threshold: Mean intensity above which a region is flagged as
            a thermal fault. Assumes 0-255 pixel values - TODO confirm
            against the thermal camera's actual scale.
        dust_threshold: Mean intensity below which a region is flagged as a
            dust fault (same scale assumption).
        score_threshold: Minimum detection score passed to the processor's
            post-processing step.

    Returns:
        tuple: ``(annotated_frame, faults)`` where ``annotated_frame`` is a
        copy of the frame with labelled rectangles drawn on it and ``faults``
        maps ``"Thermal Fault"``, ``"Dust Fault"`` and
        ``"Power Generation Fault"`` to booleans.
    """
    # Drop the alpha channel if the frame has four channels.
    if frame.shape[-1] == 4:
        frame = frame[:, :, :3]

    # Prepare the frame and run inference without tracking gradients.
    inputs = processor(images=frame, return_tensors="pt")
    with torch.no_grad():
        outputs = model(**inputs)

    # Rescale the predicted boxes to this frame's (height, width).
    frame_height, frame_width = frame.shape[:2]
    target_sizes = torch.tensor([[frame_height, frame_width]])
    results = processor.post_process_object_detection(
        outputs, target_sizes=target_sizes, threshold=score_threshold
    )[0]

    faults = {
        "Thermal Fault": False,
        "Dust Fault": False,
        "Power Generation Fault": False,
    }
    annotated_frame = frame.copy()

    for box in results["boxes"].tolist():
        x_min, y_min, x_max, y_max = (int(v) for v in box)

        # Clamp to the frame: a negative coordinate used as a slice index
        # would count from the far edge and select the wrong (usually empty)
        # region.
        x_min = max(0, min(x_min, frame_width))
        x_max = max(0, min(x_max, frame_width))
        y_min = max(0, min(y_min, frame_height))
        y_max = max(0, min(y_max, frame_height))
        if x_max <= x_min or y_max <= y_min:
            # Empty region: np.mean would return NaN and emit a warning.
            continue

        mean_intensity = np.mean(frame[y_min:y_max, x_min:x_max])

        if mean_intensity > thermal_threshold:
            # Thermal Fault: high intensity (hotspot).
            fault_name, color = "Thermal Fault", (255, 0, 0)
        elif mean_intensity < dust_threshold:
            # Dust Fault: low intensity.
            fault_name, color = "Dust Fault", (0, 255, 0)
        else:
            continue

        faults[fault_name] = True
        cv2.rectangle(annotated_frame, (x_min, y_min), (x_max, y_max), color, 2)
        # Keep the label inside the image when the box touches the top edge.
        label_origin = (x_min, max(y_min - 10, 10))
        cv2.putText(annotated_frame, fault_name, label_origin,
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    # Power Generation Fault: reported whenever either other fault was found.
    faults["Power Generation Fault"] = (
        faults["Thermal Fault"] or faults["Dust Fault"]
    )
    return annotated_frame, faults
# Function to process video and generate annotated output
def process_video(video_path):
    """Annotate every frame of a video and summarise the faults found.

    Reads the video at ``video_path`` frame by frame, runs
    ``detect_faults`` on each frame, writes the annotated frames to a new
    temporary ``.mp4`` file, and reports progress in the Streamlit UI.

    Args:
        video_path: Path of the video file to open with OpenCV.

    Returns:
        tuple: ``(output_path, video_faults)``. ``output_path`` is the path
        of the annotated video, which the caller is responsible for
        deleting. ``video_faults`` maps each fault name to ``True`` if it
        was seen in any frame. Returns ``(None, None)`` if the input cannot
        be opened or the output writer cannot be created; an error is shown
        in the UI in both cases.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        st.error("Error: Could not open video file.")
        return None, None

    out = None
    output_path = None
    succeeded = False
    video_faults = {
        "Thermal Fault": False,
        "Dust Fault": False,
        "Power Generation Fault": False,
    }

    try:
        # Get video properties
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Keep the frame rate as a float (e.g. 29.97) rather than truncating,
        # and fall back to a default when the container reports none.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:  # also true for NaN
            fps = 30.0
        # May be 0 or inaccurate for some files; only used for the progress bar.
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Reserve a temp path and close our handle so the writer can open it.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
            output_path = tmp.name

        # NOTE(review): "mp4v" output may not play in every browser's video
        # element - confirm st.video can play the result in the target setup.
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter(output_path, fourcc, fps,
                              (frame_width, frame_height))
        if not out.isOpened():
            st.error("Error: Could not create output video file.")
            return None, None

        frame_count = 0
        with st.spinner("Analyzing video..."):
            progress = st.progress(0)
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # OpenCV decodes to BGR; detect_faults is given RGB.
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                annotated_frame, faults = detect_faults(frame_rgb)

                # A fault seen in any frame marks the whole video.
                for fault in video_faults:
                    video_faults[fault] |= faults[fault]

                # Convert back to BGR for writing.
                out.write(cv2.cvtColor(annotated_frame, cv2.COLOR_RGB2BGR))

                frame_count += 1
                if total_frames > 0:
                    # Clamp: the reported frame count can be lower than the
                    # number of frames actually read.
                    progress.progress(min(frame_count / total_frames, 1.0))

        succeeded = True
    finally:
        # Always release native handles, even if a frame fails mid-way.
        cap.release()
        if out is not None:
            out.release()
        # Do not leave a partial output file behind on failure.
        if not succeeded and output_path and os.path.exists(output_path):
            os.unlink(output_path)

    return output_path, video_faults
# File uploader
uploaded_file = st.file_uploader("Upload a thermal video", type=["mp4"])

if uploaded_file is not None:
    # Save the upload to a temporary file: process_video() opens the video
    # by path. The with-block closes the handle before anything reads it.
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tfile:
        tfile.write(uploaded_file.read())

    output_path = None
    try:
        # Display uploaded video
        st.video(tfile.name, format="video/mp4")

        # Process video; returns (None, None) after showing an error on failure.
        output_path, video_faults = process_video(tfile.name)

        if output_path:
            # Display results
            st.subheader("Fault Detection Results")
            st.video(output_path, format="video/mp4")

            # Show fault summary
            st.write("**Detected Faults in Video:**")
            for fault, detected in video_faults.items():
                status = "Detected" if detected else "Not Detected"
                color = "red" if detected else "green"
                st.markdown(
                    f"- **{fault}**: <span style='color:{color}'>{status}</span>",
                    unsafe_allow_html=True,
                )

            # Provide recommendations, one line per detected fault.
            if any(video_faults.values()):
                st.subheader("Recommendations")
                recommendations = {
                    "Thermal Fault": "- **Thermal Fault**: Inspect for damaged components or overheating issues.",
                    "Dust Fault": "- **Dust Fault**: Schedule cleaning to remove dust accumulation.",
                    "Power Generation Fault": "- **Power Generation Fault**: Investigate efficiency issues due to detected faults.",
                }
                for fault, advice in recommendations.items():
                    if video_faults[fault]:
                        st.write(advice)
            else:
                st.write("No faults detected. The solar panel appears to be functioning normally.")
    finally:
        # Clean up both temporary files even if display or processing raised.
        for temp_path in (output_path, tfile.name):
            if temp_path and os.path.exists(temp_path):
                os.unlink(temp_path)
# Footer: a horizontal rule followed by a one-line credit.
st.markdown("---")
st.write(
    "Built with Streamlit, Hugging Face Transformers, and OpenCV "
    "for Solar Panel Fault Detection PoC"
)