Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import cv2 | |
| import numpy as np | |
| import tempfile | |
| import os | |
| import torch | |
| from ultralytics import YOLO | |
| # Set page config | |
| st.set_page_config(page_title="Solar Panel Fault Detection", layout="wide") | |
| st.title("Solar Panel Fault Detection (Optimized)") | |
| st.write("Upload a thermal video (MP4) to detect thermal, dust, and power generation faults.") | |
| # Load YOLO model | |
| def load_model(): | |
| model = YOLO("yolov5s.pt") # Replace with your custom-trained model if available | |
| return model | |
| model = load_model() | |
| # Fault detection with frame & location tracking | |
| def detect_faults(frame, results, frame_number): | |
| faults = {"Thermal Fault": False, "Dust Fault": False, "Power Generation Fault": False} | |
| fault_locations = [] | |
| annotated_frame = frame.copy() | |
| for result in results: | |
| boxes = result.boxes | |
| for box in boxes: | |
| x1, y1, x2, y2 = map(int, box.xyxy[0]) | |
| conf = float(box.conf[0]) | |
| cls = int(box.cls[0]) | |
| roi = frame[y1:y2, x1:x2] | |
| if roi.size == 0: | |
| continue | |
| mean_intensity = np.mean(roi) | |
| if mean_intensity > 200: | |
| faults["Thermal Fault"] = True | |
| color = (255, 0, 0) | |
| label = "Thermal Fault" | |
| elif mean_intensity < 100: | |
| faults["Dust Fault"] = True | |
| color = (0, 255, 0) | |
| label = "Dust Fault" | |
| else: | |
| continue | |
| # Track location | |
| fault_locations.append({ | |
| "frame": frame_number, | |
| "label": label, | |
| "confidence": round(conf, 2), | |
| "intensity": round(mean_intensity, 2), | |
| "box": (x1, y1, x2, y2) | |
| }) | |
| # Annotate | |
| overlay = annotated_frame.copy() | |
| alpha = 0.3 | |
| cv2.rectangle(overlay, (x1, y1), (x2, y2), color, -1) | |
| cv2.addWeighted(overlay, alpha, annotated_frame, 1 - alpha, 0, annotated_frame) | |
| cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2) | |
| cv2.putText(annotated_frame, f"{label} ({mean_intensity:.1f})", (x1, y1 - 10), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) | |
| if faults["Thermal Fault"] or faults["Dust Fault"]: | |
| faults["Power Generation Fault"] = True | |
| return annotated_frame, faults, fault_locations | |
| # Video processing | |
| def process_video(video_path): | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| st.error("Failed to open video.") | |
| return None, None, None | |
| fps = int(cap.get(cv2.CAP_PROP_FPS)) | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| output_path = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name | |
| out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height)) | |
| frame_count = 0 | |
| video_faults = {"Thermal Fault": False, "Dust Fault": False, "Power Generation Fault": False} | |
| all_fault_locations = [] | |
| process_every_n_frames = fps # 1 frame per second | |
| with st.spinner("Processing video..."): | |
| progress = st.progress(0) | |
| while cap.isOpened(): | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| if frame_count % process_every_n_frames == 0: | |
| resized = cv2.resize(frame, (640, 480)) | |
| frame_rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB) | |
| results = model(frame_rgb, verbose=False) | |
| annotated_frame, faults, locations = detect_faults(frame, results, frame_count) | |
| all_fault_locations.extend(locations) | |
| for fault in video_faults: | |
| video_faults[fault] |= faults[fault] | |
| else: | |
| annotated_frame = frame | |
| out.write(annotated_frame) | |
| frame_count += 1 | |
| progress.progress(min(frame_count / total_frames, 1.0)) | |
| cap.release() | |
| out.release() | |
| return output_path, video_faults, all_fault_locations | |
| # File uploader | |
| uploaded_file = st.file_uploader("Upload a thermal video", type=["mp4"]) | |
| if uploaded_file: | |
| temp_input_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name | |
| with open(temp_input_path, "wb") as f: | |
| f.write(uploaded_file.read()) | |
| st.video(temp_input_path) | |
| output_path, video_faults, fault_locations = process_video(temp_input_path) | |
| if output_path: | |
| st.subheader("Detection Results") | |
| st.video(output_path) | |
| st.write("### Detected Faults:") | |
| for fault, detected in video_faults.items(): | |
| color = "red" if detected else "green" | |
| st.markdown(f"- **{fault}**: <span style='color:{color}'>{'Detected' if detected else 'Not Detected'}</span>", unsafe_allow_html=True) | |
| if any(video_faults.values()): | |
| st.subheader("Recommendations") | |
| if video_faults["Thermal Fault"]: | |
| st.write("- Check for overheating components.") | |
| if video_faults["Dust Fault"]: | |
| st.write("- Clean dust from solar panel surface.") | |
| if video_faults["Power Generation Fault"]: | |
| st.write("- Investigate potential efficiency issues.") | |
| else: | |
| st.success("No faults detected. The system seems to be functioning properly.") | |
| # Display fault locations | |
| if fault_locations: | |
| st.subheader("π Fault Locations in Video") | |
| st.dataframe([ | |
| { | |
| "Frame": loc["frame"], | |
| "Fault Type": loc["label"], | |
| "Confidence": loc["confidence"], | |
| "Intensity": loc["intensity"], | |
| "Box": f"{loc['box']}" | |
| } for loc in fault_locations | |
| ]) | |
| os.unlink(output_path) | |
| os.unlink(temp_input_path) | |
| st.markdown("---") | |
| st.caption("Built with Streamlit + YOLOv5 (Ultralytics) for fast fault detection.") | |