Spaces:
Sleeping
Sleeping
| # Full code integrating: | |
| # - Fault timestamps | |
| # - CSV export | |
| # - Clickable snapshots | |
| # - Heatmap overlay | |
| import streamlit as st | |
| import cv2 | |
| import numpy as np | |
| import tempfile | |
| import os | |
| import torch | |
| import pandas as pd | |
| import base64 | |
| from ultralytics import YOLO | |
| st.set_page_config(page_title="Solar Panel Fault Detection", layout="wide") | |
| st.title("Solar Panel Fault Detection (Enhanced)") | |
| st.write("Upload a thermal video (MP4) to detect faults with location, snapshots, and export options.") | |
| def load_model(): | |
| return YOLO("yolov5s.pt") | |
| model = load_model() | |
| def detect_faults(frame, results, frame_idx, fps): | |
| faults_found = [] | |
| annotated_frame = frame.copy() | |
| for result in results: | |
| for box in result.boxes: | |
| x1, y1, x2, y2 = map(int, box.xyxy[0]) | |
| roi = frame[y1:y2, x1:x2] | |
| if roi.size == 0: | |
| continue | |
| mean_intensity = np.mean(roi) | |
| if mean_intensity > 200: | |
| label = "Thermal Fault" | |
| color = (255, 0, 0) | |
| elif mean_intensity < 100: | |
| label = "Dust Fault" | |
| color = (0, 255, 0) | |
| else: | |
| continue | |
| timestamp = round(frame_idx / fps, 2) | |
| faults_found.append({ | |
| "Frame": frame_idx, | |
| "Time (s)": timestamp, | |
| "Fault Type": label, | |
| "X1": x1, "Y1": y1, "X2": x2, "Y2": y2 | |
| }) | |
| cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2) | |
| cv2.putText(annotated_frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) | |
| return annotated_frame, faults_found | |
| def process_video(video_path): | |
| cap = cv2.VideoCapture(video_path) | |
| if not cap.isOpened(): | |
| st.error("Failed to open video.") | |
| return None, None, None, None | |
| fps = int(cap.get(cv2.CAP_PROP_FPS)) | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| heatmap = np.zeros((height, width), dtype=np.float32) | |
| snapshot_dir = tempfile.mkdtemp() | |
| output_path = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name | |
| out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height)) | |
| fault_log = [] | |
| snapshots = [] | |
| frame_count = 0 | |
| process_every_n_frames = fps | |
| with st.spinner("Processing video..."): | |
| progress = st.progress(0) | |
| while cap.isOpened(): | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| if frame_count % process_every_n_frames == 0: | |
| resized = cv2.resize(frame, (640, 480)) | |
| frame_rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB) | |
| results = model(frame_rgb, verbose=False) | |
| annotated_frame, faults_found = detect_faults(frame, results, frame_count, fps) | |
| fault_log.extend(faults_found) | |
| if faults_found: | |
| snapshot_path = os.path.join(snapshot_dir, f"frame_{frame_count}.jpg") | |
| cv2.imwrite(snapshot_path, annotated_frame) | |
| snapshots.append(snapshot_path) | |
| for fault in faults_found: | |
| heatmap[fault["Y1"]:fault["Y2"], fault["X1"]:fault["X2"]] += 1 | |
| else: | |
| annotated_frame = frame | |
| out.write(annotated_frame) | |
| frame_count += 1 | |
| progress.progress(min(frame_count / total_frames, 1.0)) | |
| cap.release() | |
| out.release() | |
| return output_path, fault_log, snapshots, heatmap | |
| def get_heatmap_overlay(heatmap, base_frame): | |
| normalized = cv2.normalize(heatmap, None, 0, 255, cv2.NORM_MINMAX) | |
| heatmap_img = cv2.applyColorMap(normalized.astype(np.uint8), cv2.COLORMAP_JET) | |
| overlay = cv2.addWeighted(base_frame, 0.6, heatmap_img, 0.4, 0) | |
| return overlay | |
| def convert_df_to_csv(df): | |
| return df.to_csv(index=False).encode('utf-8') | |
| uploaded_file = st.file_uploader("Upload a thermal video", type=["mp4"]) | |
| if uploaded_file: | |
| temp_input_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name | |
| with open(temp_input_path, "wb") as f: | |
| f.write(uploaded_file.read()) | |
| st.video(temp_input_path) | |
| output_path, fault_log, snapshots, heatmap = process_video(temp_input_path) | |
| if output_path: | |
| st.subheader("Detection Results") | |
| st.video(output_path) | |
| if fault_log: | |
| df = pd.DataFrame(fault_log) | |
| st.write("### Detected Faults Table") | |
| st.dataframe(df) | |
| st.download_button("Download Fault Log as CSV", data=convert_df_to_csv(df), file_name="fault_log.csv", mime="text/csv") | |
| st.write("### Snapshots of Faults") | |
| cols = st.columns(4) | |
| for i, path in enumerate(snapshots): | |
| with cols[i % 4]: | |
| st.image(path, use_column_width=True) | |
| st.write("### Fault Location Heatmap") | |
| cap = cv2.VideoCapture(temp_input_path) | |
| ret, frame = cap.read() | |
| cap.release() | |
| if ret: | |
| heatmap_overlay = get_heatmap_overlay(heatmap, frame) | |
| st.image(heatmap_overlay, caption="Fault Heatmap", use_column_width=True) | |
| else: | |
| st.success("No faults detected.") | |
| os.unlink(output_path) | |
| os.unlink(temp_input_path) | |
| st.markdown("---") | |
| st.caption("Built with Streamlit + YOLOv5 (Ultralytics) + OpenCV") | |