"""Streamlit app that flags suspected solar-panel faults in a thermal video.

Every sampled frame is passed through a YOLO detector; each detected box is
then classified by the mean pixel intensity (and area) of the region it
covers. Results are shown as an annotated video, a fault table with CSV
export, per-frame snapshots and a fault-location heatmap.
"""
import base64
import os
import tempfile

import cv2
import numpy as np
import pandas as pd
import streamlit as st
import torch
from ultralytics import YOLO

# Mean-intensity thresholds (0-255 scale) used to classify a detected region.
BURN_INTENSITY = 240
OVERHEAT_INTENSITY = 200
DUST_INTENSITY = 100
# Burned regions at least this large (in pixels) are reported as a whole panel.
PANEL_MIN_AREA = 10000
# Used when the container does not report a usable frame rate.
DEFAULT_FPS = 25.0

# Annotation colours are BGR because OpenCV draws in BGR order.
COLOR_BURNED = (0, 0, 255)
COLOR_OVERHEAT = (0, 165, 255)  # orange; (255, 165, 0) would render blue-ish
COLOR_DUST = (0, 255, 0)

st.set_page_config(page_title="Solar Panel Fault Detection", layout="wide")
st.title("Solar Panel Fault Detection (Enhanced)")
st.write("Upload a thermal video (MP4) to detect faults with location, snapshots, and export options.")


@st.cache_resource
def load_model():
    """Load the YOLO detector once and share it across Streamlit reruns."""
    return YOLO("yolov5s.pt")


model = load_model()


def _classify_fault(mean_intensity, area):
    """Return ``(label, bgr_color)`` for a region, or ``None`` if it looks normal."""
    if mean_intensity > BURN_INTENSITY:
        label = "Burned Solar Panel" if area >= PANEL_MIN_AREA else "Burned Solar Cell"
        return label, COLOR_BURNED
    if mean_intensity > OVERHEAT_INTENSITY:
        return "Overheat", COLOR_OVERHEAT
    if mean_intensity < DUST_INTENSITY:
        return "Dust Fault", COLOR_DUST
    return None


def _parse_box(box_text):
    """Convert a ``"(x1, y1, x2, y2)"`` log string back into four ints."""
    return tuple(int(part) for part in box_text.strip("()").split(","))


def detect_faults(frame, results, frame_idx, fps):
    """Classify every detected box in ``results`` and annotate a copy of ``frame``.

    Args:
        frame: Image array the detections refer to; box coordinates must be in
            this frame's pixel space.
        results: Iterable of detection results, each exposing ``.boxes`` whose
            items provide ``xyxy`` and ``conf``.
        frame_idx: Index of the frame within the video.
        fps: Frames per second, used to derive the timestamp. A falsy value
            yields a timestamp of 0.0 instead of dividing by zero.

    Returns:
        ``(annotated_frame, faults_found)`` where ``faults_found`` is a list of
        dicts with the keys ``Frame``, ``Time (s)``, ``Fault Type``,
        ``Confidence``, ``Intensity`` and ``Box``.
    """
    faults_found = []
    annotated_frame = frame.copy()
    frame_h, frame_w = frame.shape[:2]
    timestamp = round(frame_idx / fps, 2) if fps else 0.0

    for result in results:
        for box in result.boxes:
            x1, y1, x2, y2 = (int(v) for v in box.xyxy[0])
            # Clamp to the frame: negative indices would otherwise wrap around
            # in the numpy slice and select the wrong pixels.
            x1, x2 = max(0, x1), min(frame_w, x2)
            y1, y2 = max(0, y1), min(frame_h, y2)

            roi = frame[y1:y2, x1:x2]
            if roi.size == 0:
                continue

            mean_intensity = float(np.mean(roi))
            area = (x2 - x1) * (y2 - y1)

            fault = _classify_fault(mean_intensity, area)
            if fault is None:
                continue
            label, color = fault

            faults_found.append({
                "Frame": frame_idx,
                "Time (s)": timestamp,
                "Fault Type": label,
                "Confidence": round(float(box.conf[0]), 2) if box.conf is not None else None,
                "Intensity": round(mean_intensity, 2),
                "Box": f"({x1}, {y1}, {x2}, {y2})",
            })

            cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)
            # Keep the label inside the image when the box touches the top edge.
            cv2.putText(annotated_frame, label, (x1, max(y1 - 10, 10)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    return annotated_frame, faults_found


def process_video(video_path, snapshot_dir=None):
    """Run fault detection over a video and write an annotated copy.

    Roughly one frame per second is analysed; all frames are written to the
    output so the annotated video keeps the original length.

    Args:
        video_path: Path of the input video.
        snapshot_dir: Directory for snapshot JPEGs of frames with faults. When
            omitted a new temporary directory is created and the caller is
            responsible for removing it.

    Returns:
        ``(output_path, fault_log, snapshots, heatmap)``, or four ``None``
        values if the input cannot be opened or the output cannot be created.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        st.error("Failed to open video.")
        return None, None, None, None

    out = None
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:  # also catches NaN
            fps = DEFAULT_FPS
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        heatmap = np.zeros((height, width), dtype=np.float32)
        if snapshot_dir is None:
            snapshot_dir = tempfile.mkdtemp()

        # Close our own handle right away; OpenCV reopens the path itself.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_out:
            output_path = tmp_out.name
        # NOTE(review): "mp4v" output may not play in every browser - confirm.
        out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
        if not out.isOpened():
            out.release()
            out = None
            os.unlink(output_path)
            st.error("Failed to create output video.")
            return None, None, None, None

        fault_log = []
        snapshots = []
        frame_count = 0
        process_every_n_frames = max(1, int(round(fps)))

        with st.spinner("Processing video..."):
            progress = st.progress(0)
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                if frame_count % process_every_n_frames == 0:
                    # Feed the original BGR frame: Ultralytics expects BGR numpy
                    # input and returns boxes in the coordinates of the image
                    # it was given, so they line up with `frame` directly.
                    results = model(frame, verbose=False)
                    annotated_frame, faults_found = detect_faults(frame, results, frame_count, fps)
                    fault_log.extend(faults_found)

                    if faults_found:
                        snapshot_path = os.path.join(snapshot_dir, f"frame_{frame_count}.jpg")
                        cv2.imwrite(snapshot_path, annotated_frame)
                        snapshots.append(snapshot_path)

                    for fault in faults_found:
                        x1, y1, x2, y2 = _parse_box(fault["Box"])
                        heatmap[y1:y2, x1:x2] += 1
                else:
                    annotated_frame = frame

                out.write(annotated_frame)
                frame_count += 1
                # Some containers report a frame count of 0.
                if total_frames > 0:
                    progress.progress(min(frame_count / total_frames, 1.0))

        return output_path, fault_log, snapshots, heatmap
    finally:
        cap.release()
        if out is not None:
            out.release()


def get_heatmap_overlay(heatmap, base_frame):
    """Blend a colour-mapped ``heatmap`` over ``base_frame`` and return the BGR image."""
    normalized = cv2.normalize(heatmap, None, 0, 255, cv2.NORM_MINMAX)
    heatmap_img = cv2.applyColorMap(normalized.astype(np.uint8), cv2.COLORMAP_JET)
    # addWeighted needs identical sizes; resize if the decoded frame differs
    # from the dimensions the heatmap was allocated with.
    if heatmap_img.shape[:2] != base_frame.shape[:2]:
        heatmap_img = cv2.resize(heatmap_img, (base_frame.shape[1], base_frame.shape[0]))
    overlay = cv2.addWeighted(base_frame, 0.6, heatmap_img, 0.4, 0)
    return overlay


def convert_df_to_csv(df):
    """Serialise ``df`` to UTF-8 encoded CSV bytes without the index column."""
    return df.to_csv(index=False).encode('utf-8')


uploaded_file = st.file_uploader("Upload a thermal video", type=["mp4"])

if uploaded_file:
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_in:
        tmp_in.write(uploaded_file.read())
        temp_input_path = tmp_in.name

    output_path = None
    try:
        st.video(temp_input_path)

        # Snapshots only need to live until they have been rendered below.
        with tempfile.TemporaryDirectory() as snapshot_dir:
            output_path, fault_log, snapshots, heatmap = process_video(
                temp_input_path, snapshot_dir=snapshot_dir
            )

            if output_path:
                st.subheader("Detection Results")
                st.video(output_path)

                if fault_log:
                    df = pd.DataFrame(fault_log)
                    st.write("### Detected Faults Table")
                    st.dataframe(df)

                    st.download_button("Download Fault Log as CSV", data=convert_df_to_csv(df),
                                       file_name="fault_log.csv", mime="text/csv")

                    st.write("### Snapshots of Faults")
                    cols = st.columns(4)
                    for i, path in enumerate(snapshots):
                        with cols[i % 4]:
                            st.image(path, use_column_width=True)

                    st.write("### Fault Location Heatmap")
                    cap = cv2.VideoCapture(temp_input_path)
                    ret, frame = cap.read()
                    cap.release()
                    if ret:
                        heatmap_overlay = get_heatmap_overlay(heatmap, frame)
                        # The overlay is an OpenCV (BGR) image.
                        st.image(heatmap_overlay, caption="Fault Heatmap",
                                 channels="BGR", use_column_width=True)
                else:
                    st.success("No faults detected.")
    finally:
        # Remove the temp files even when processing failed or raised.
        for path in (output_path, temp_input_path):
            if path and os.path.exists(path):
                os.unlink(path)

st.markdown("---")
st.caption("Built with Streamlit + YOLOv5 (Ultralytics) + OpenCV")