Spaces:
Sleeping
Sleeping
File size: 5,931 Bytes
adfbf25 e81860f bf0e73e 35aea9a 384126b 35aea9a e81860f 384126b e81860f 384126b 35aea9a 384126b bf0e73e 35aea9a 384126b 35aea9a c38239c 35aea9a c38239c 65c2d6d b6d1ef9 35aea9a 384126b 35aea9a 384126b 65c2d6d 35aea9a 384126b 35aea9a 384126b bf0e73e 35aea9a 384126b 35aea9a bf0e73e 35aea9a 384126b bf0e73e 35aea9a 384126b 2948299 384126b 35aea9a bf0e73e 35aea9a 4cd65a2 35aea9a 384126b 35aea9a 384126b 12e349c 35aea9a 4cd65a2 35aea9a bf0e73e 35aea9a bf0e73e e81860f 384126b e81860f 384126b 35aea9a 384126b 35aea9a bf0e73e 35aea9a 384126b bf0e73e 384126b 4cd65a2 bf0e73e 384126b e81860f 1d3cb9a 384126b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
import streamlit as st
import cv2
import numpy as np
import tempfile
import os
import torch
import pandas as pd
import base64
from ultralytics import YOLO
# Page chrome: browser-tab title and wide layout first, then the visible
# heading and the upload instructions. Keep set_page_config ahead of the other
# st.* calls (NOTE(review): Streamlit documents it as a first-command call).
st.set_page_config(page_title="Solar Panel Fault Detection", layout="wide")
st.title("Solar Panel Fault Detection (Enhanced)")
st.write("Upload a thermal video (MP4) to detect faults with location, snapshots, and export options.")
@st.cache_resource
def load_model():
    """Build the YOLO detector from the ``yolov5s.pt`` weights and return it.

    The function is wrapped in ``st.cache_resource`` so that Streamlit can hand
    back the already-constructed model instead of rebuilding it.
    """
    detector = YOLO("yolov5s.pt")
    return detector
# Module-level detector instance; process_video reads this global for inference.
model = load_model()
def _classify_fault(mean_intensity, area):
    """Map a region's mean pixel intensity (and box area) to a fault verdict.

    Returns:
        ``(label, color)`` where ``color`` is a BGR tuple for OpenCV drawing,
        or ``None`` when the intensity lies in the 100-200 band that is not
        reported as a fault.
    """
    if mean_intensity > 240:
        label = "Burned Solar Panel" if area >= 10000 else "Burned Solar Cell"
        return label, (0, 0, 255)
    if mean_intensity > 200:
        # Orange written in OpenCV's BGR order; (255, 165, 0) would draw blue.
        return "Overheat", (0, 165, 255)
    if mean_intensity < 100:
        return "Dust Fault", (0, 255, 0)
    return None


def detect_faults(frame, results, frame_idx, fps):
    """Classify every detected box on a frame by brightness and annotate faults.

    Args:
        frame: Image array the boxes refer to (indexed as ``frame[y, x]``);
            it is not modified.
        results: Iterable of detection results, each exposing ``.boxes`` whose
            items provide ``xyxy[0]`` (four coordinates) and ``conf``.
        frame_idx: Index of the frame within the video.
        fps: Frames per second used to turn ``frame_idx`` into a timestamp.

    Returns:
        ``(annotated_frame, faults_found)``: a copy of ``frame`` with a
        rectangle and label drawn for each fault, and a list of dicts with the
        keys ``Frame``, ``Time (s)``, ``Fault Type``, ``Confidence``,
        ``Intensity`` and ``Box``.
    """
    faults_found = []
    annotated_frame = frame.copy()
    frame_h, frame_w = frame.shape[:2]

    # Same for every box on this frame. A missing/zero frame rate leaves the
    # time undefined, so report None instead of dividing by zero.
    timestamp = round(frame_idx / fps, 2) if fps else None

    for result in results:
        for box in result.boxes:
            x1, y1, x2, y2 = map(int, box.xyxy[0])
            # Clamp to the frame: NumPy would read a negative coordinate as an
            # offset from the opposite edge and sample the wrong region.
            x1, x2 = (min(max(v, 0), frame_w) for v in (x1, x2))
            y1, y2 = (min(max(v, 0), frame_h) for v in (y1, y2))

            roi = frame[y1:y2, x1:x2]
            if roi.size == 0:
                continue

            # Plain float keeps NumPy scalars out of the exported log.
            mean_intensity = float(np.mean(roi))
            verdict = _classify_fault(mean_intensity, (x2 - x1) * (y2 - y1))
            if verdict is None:
                continue
            label, color = verdict

            confidence = None
            if box.conf is not None:
                confidence = round(float(box.conf[0]), 2)

            faults_found.append({
                "Frame": frame_idx,
                "Time (s)": timestamp,
                "Fault Type": label,
                "Confidence": confidence,
                "Intensity": round(mean_intensity, 2),
                "Box": f"({x1}, {y1}, {x2}, {y2})",
            })

            # Annotate frame; keep the label baseline inside the image when the
            # box touches the top edge.
            cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(
                annotated_frame,
                label,
                (x1, max(y1 - 10, 15)),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.5,
                color,
                2,
            )

    return annotated_frame, faults_found
def process_video(video_path):
    """Run fault detection over a video and write an annotated copy of it.

    Roughly one frame per second of footage (every ``round(fps)``-th frame) is
    passed to the module-level ``model`` and to ``detect_faults``; all other
    frames are copied to the output unchanged.

    Args:
        video_path: Path of the input video, opened with ``cv2.VideoCapture``.

    Returns:
        ``(output_path, fault_log, snapshots, heatmap)``:
        path of the annotated ``.mp4`` temp file, list of fault dicts, list of
        snapshot image paths (one per analysed frame that had faults), and a
        ``(height, width)`` float32 array counting faults per pixel.
        ``(None, None, None, None)`` when the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        st.error("Failed to open video.")
        return None, None, None, None

    out = None
    try:
        # Keep the real (possibly fractional) frame rate for timestamps and the
        # writer; fall back to an assumed 30 fps when the container reports
        # nothing usable, so the modulo/division below cannot hit zero.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not np.isfinite(fps) or fps <= 0:
            fps = 30.0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        heatmap = np.zeros((height, width), dtype=np.float32)
        snapshot_dir = tempfile.mkdtemp()
        # Reserve a path and close the handle right away; only the name is
        # needed, and an open handle can block the writer on some platforms.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_out:
            output_path = tmp_out.name
        out = cv2.VideoWriter(
            output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height)
        )

        fault_log = []
        snapshots = []
        frame_count = 0
        process_every_n_frames = max(1, int(round(fps)))

        with st.spinner("Processing video..."):
            progress = st.progress(0)
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                if frame_count % process_every_n_frames == 0:
                    # Feed the original BGR frame: Ultralytics resizes
                    # internally and reports boxes in the coordinates of the
                    # array it was given, so they line up with `frame` when
                    # detect_faults samples and annotates it.
                    results = model(frame, verbose=False)
                    annotated_frame, faults_found = detect_faults(
                        frame, results, frame_count, fps
                    )
                    fault_log.extend(faults_found)
                    if faults_found:
                        snapshot_path = os.path.join(
                            snapshot_dir, f"frame_{frame_count}.jpg"
                        )
                        cv2.imwrite(snapshot_path, annotated_frame)
                        snapshots.append(snapshot_path)
                        for fault in faults_found:
                            # "Box" is the "(x1, y1, x2, y2)" string built by
                            # detect_faults; clamp at 0 so a negative value is
                            # not treated as an index from the far edge.
                            x1, y1, x2, y2 = (
                                max(0, int(part))
                                for part in fault["Box"].strip("()").split(",")
                            )
                            heatmap[y1:y2, x1:x2] += 1
                else:
                    annotated_frame = frame

                out.write(annotated_frame)
                frame_count += 1
                # Some containers report no frame count; skip the ratio then.
                if total_frames > 0:
                    progress.progress(min(frame_count / total_frames, 1.0))
            progress.progress(1.0)
    finally:
        # Release native handles even if inference or writing raised.
        cap.release()
        if out is not None:
            out.release()

    return output_path, fault_log, snapshots, heatmap
def get_heatmap_overlay(heatmap, base_frame):
    """Blend a JET-coloured rendering of ``heatmap`` onto ``base_frame``.

    The heatmap is min-max normalised to 0-255, converted to ``uint8``,
    colour-mapped with ``cv2.COLORMAP_JET`` and mixed with the frame at
    weights 0.6 (frame) and 0.4 (heatmap). Returns the blended image.
    """
    scaled = cv2.normalize(heatmap, None, 0, 255, cv2.NORM_MINMAX)
    as_bytes = scaled.astype(np.uint8)
    colored = cv2.applyColorMap(as_bytes, cv2.COLORMAP_JET)
    return cv2.addWeighted(base_frame, 0.6, colored, 0.4, 0)
def convert_df_to_csv(df):
    """Serialise ``df`` to CSV text without the index column, as UTF-8 bytes."""
    csv_text = df.to_csv(index=False)
    return csv_text.encode('utf-8')
# --- Upload, processing and results -----------------------------------------
uploaded_file = st.file_uploader("Upload a thermal video", type=["mp4"])
if uploaded_file:
    # Persist the upload to disk: process_video opens it by path. The `with`
    # closes the handle so only the file (not an open descriptor) is left.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_in:
        tmp_in.write(uploaded_file.read())
        temp_input_path = tmp_in.name

    output_path = None
    try:
        st.video(temp_input_path)
        output_path, fault_log, snapshots, heatmap = process_video(temp_input_path)

        if output_path:
            st.subheader("Detection Results")
            st.video(output_path)

            if fault_log:
                df = pd.DataFrame(fault_log)
                st.write("### Detected Faults Table")
                st.dataframe(df)
                st.download_button(
                    "Download Fault Log as CSV",
                    data=convert_df_to_csv(df),
                    file_name="fault_log.csv",
                    mime="text/csv",
                )

                st.write("### Snapshots of Faults")
                cols = st.columns(4)
                for i, path in enumerate(snapshots):
                    with cols[i % 4]:
                        st.image(path, use_column_width=True)

                st.write("### Fault Location Heatmap")
                # The first frame of the input serves as the backdrop.
                cap = cv2.VideoCapture(temp_input_path)
                ret, frame = cap.read()
                cap.release()
                if ret:
                    heatmap_overlay = get_heatmap_overlay(heatmap, frame)
                    # The overlay is built with OpenCV and is therefore BGR;
                    # tell Streamlit so red and blue are not swapped.
                    st.image(
                        heatmap_overlay,
                        caption="Fault Heatmap",
                        channels="BGR",
                        use_column_width=True,
                    )
            else:
                st.success("No faults detected.")
    finally:
        # Remove the temp videos whether or not processing succeeded.
        for temp_path in (output_path, temp_input_path):
            if temp_path and os.path.exists(temp_path):
                os.unlink(temp_path)

st.markdown("---")
st.caption("Built with Streamlit + YOLOv5 (Ultralytics) + OpenCV")
|