Solar_Panel_V_F / app.py
Naveenkumar1546's picture
Update app.py
384126b verified
# Full code integrating:
# - Fault timestamps
# - CSV export
# - Clickable snapshots
# - Heatmap overlay
import streamlit as st
import cv2
import numpy as np
import tempfile
import os
import torch
import pandas as pd
import base64
from ultralytics import YOLO
st.set_page_config(page_title="Solar Panel Fault Detection", layout="wide")
st.title("Solar Panel Fault Detection (Enhanced)")
st.write("Upload a thermal video (MP4) to detect faults with location, snapshots, and export options.")
@st.cache_resource
def load_model():
return YOLO("yolov5s.pt")
model = load_model()
def detect_faults(frame, results, frame_idx, fps):
faults_found = []
annotated_frame = frame.copy()
for result in results:
for box in result.boxes:
x1, y1, x2, y2 = map(int, box.xyxy[0])
roi = frame[y1:y2, x1:x2]
if roi.size == 0:
continue
mean_intensity = np.mean(roi)
if mean_intensity > 200:
label = "Thermal Fault"
color = (255, 0, 0)
elif mean_intensity < 100:
label = "Dust Fault"
color = (0, 255, 0)
else:
continue
timestamp = round(frame_idx / fps, 2)
faults_found.append({
"Frame": frame_idx,
"Time (s)": timestamp,
"Fault Type": label,
"X1": x1, "Y1": y1, "X2": x2, "Y2": y2
})
cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)
cv2.putText(annotated_frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
return annotated_frame, faults_found
def process_video(video_path):
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
st.error("Failed to open video.")
return None, None, None, None
fps = int(cap.get(cv2.CAP_PROP_FPS))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
heatmap = np.zeros((height, width), dtype=np.float32)
snapshot_dir = tempfile.mkdtemp()
output_path = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name
out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))
fault_log = []
snapshots = []
frame_count = 0
process_every_n_frames = fps
with st.spinner("Processing video..."):
progress = st.progress(0)
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
if frame_count % process_every_n_frames == 0:
resized = cv2.resize(frame, (640, 480))
frame_rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
results = model(frame_rgb, verbose=False)
annotated_frame, faults_found = detect_faults(frame, results, frame_count, fps)
fault_log.extend(faults_found)
if faults_found:
snapshot_path = os.path.join(snapshot_dir, f"frame_{frame_count}.jpg")
cv2.imwrite(snapshot_path, annotated_frame)
snapshots.append(snapshot_path)
for fault in faults_found:
heatmap[fault["Y1"]:fault["Y2"], fault["X1"]:fault["X2"]] += 1
else:
annotated_frame = frame
out.write(annotated_frame)
frame_count += 1
progress.progress(min(frame_count / total_frames, 1.0))
cap.release()
out.release()
return output_path, fault_log, snapshots, heatmap
def get_heatmap_overlay(heatmap, base_frame):
normalized = cv2.normalize(heatmap, None, 0, 255, cv2.NORM_MINMAX)
heatmap_img = cv2.applyColorMap(normalized.astype(np.uint8), cv2.COLORMAP_JET)
overlay = cv2.addWeighted(base_frame, 0.6, heatmap_img, 0.4, 0)
return overlay
def convert_df_to_csv(df):
return df.to_csv(index=False).encode('utf-8')
uploaded_file = st.file_uploader("Upload a thermal video", type=["mp4"])
if uploaded_file:
temp_input_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
with open(temp_input_path, "wb") as f:
f.write(uploaded_file.read())
st.video(temp_input_path)
output_path, fault_log, snapshots, heatmap = process_video(temp_input_path)
if output_path:
st.subheader("Detection Results")
st.video(output_path)
if fault_log:
df = pd.DataFrame(fault_log)
st.write("### Detected Faults Table")
st.dataframe(df)
st.download_button("Download Fault Log as CSV", data=convert_df_to_csv(df), file_name="fault_log.csv", mime="text/csv")
st.write("### Snapshots of Faults")
cols = st.columns(4)
for i, path in enumerate(snapshots):
with cols[i % 4]:
st.image(path, use_column_width=True)
st.write("### Fault Location Heatmap")
cap = cv2.VideoCapture(temp_input_path)
ret, frame = cap.read()
cap.release()
if ret:
heatmap_overlay = get_heatmap_overlay(heatmap, frame)
st.image(heatmap_overlay, caption="Fault Heatmap", use_column_width=True)
else:
st.success("No faults detected.")
os.unlink(output_path)
os.unlink(temp_input_path)
st.markdown("---")
st.caption("Built with Streamlit + YOLOv5 (Ultralytics) + OpenCV")