Solar_Panel_V_C / app.py
Naveenkumar1546's picture
Update app.py
12e349c verified
import streamlit as st
import cv2
import numpy as np
import tempfile
import os
import torch
import pandas as pd
import base64
from ultralytics import YOLO
# ---- Page setup ------------------------------------------------------------
# Set the browser-tab title and the wide layout, then render the page heading
# and a one-line usage hint.
st.set_page_config(
    page_title="Solar Panel Fault Detection",
    layout="wide",
)
st.title("Solar Panel Fault Detection (Enhanced)")
st.write(
    "Upload a thermal video (MP4) to detect faults with location, snapshots, and export options."
)
@st.cache_resource
def load_model():
    """Build and return the YOLO detector from the ``yolov5s.pt`` weights.

    The function is wrapped in ``st.cache_resource``; presumably this keeps a
    single detector instance alive across Streamlit script reruns.
    """
    detector = YOLO("yolov5s.pt")
    return detector


# Module-level detector used by the video-processing code.
model = load_model()
def detect_faults(frame, results, frame_idx, fps):
    """Classify each detected box on ``frame`` by its mean pixel intensity.

    Args:
        frame: Image array (height x width, optionally with channels). It is
            sampled but not modified; annotations are drawn on a copy.
        results: Iterable of detection results. Each item exposes ``boxes``
            whose entries provide ``xyxy`` and ``conf``. When an item also
            exposes ``orig_shape`` as ``(height, width)`` of the image the
            detector saw, box coordinates are rescaled from that size to the
            size of ``frame``.
        frame_idx: Index of ``frame`` within the video.
        fps: Frames per second used to turn ``frame_idx`` into seconds. A
            falsy value (0 / None) yields a timestamp of 0.0 instead of
            raising ``ZeroDivisionError``.

    Returns:
        ``(annotated_frame, faults_found)`` where ``faults_found`` is a list
        of dicts with the keys ``Frame``, ``Time (s)``, ``Fault Type``,
        ``Confidence``, ``Intensity`` and ``Box``. Boxes whose mean intensity
        lies in the 100..200 range are not reported.
    """
    faults_found = []
    annotated_frame = frame.copy()
    frame_h, frame_w = frame.shape[:2]
    timestamp = round(frame_idx / fps, 2) if fps else 0.0

    for result in results:
        boxes = result.boxes
        if boxes is None:
            continue

        # The detector may have seen a differently sized image than `frame`
        # (e.g. a resized copy); map its coordinates back onto `frame`.
        src_shape = getattr(result, "orig_shape", None)
        if src_shape is not None and src_shape[0] and src_shape[1]:
            scale_y = frame_h / src_shape[0]
            scale_x = frame_w / src_shape[1]
        else:
            scale_x = scale_y = 1.0

        for box in boxes:
            bx1, by1, bx2, by2 = (float(v) for v in box.xyxy[0])
            # Clamp to the frame: negative indices would otherwise wrap
            # around in NumPy slicing and sample the wrong region.
            x1 = min(max(int(bx1 * scale_x), 0), frame_w)
            y1 = min(max(int(by1 * scale_y), 0), frame_h)
            x2 = min(max(int(bx2 * scale_x), 0), frame_w)
            y2 = min(max(int(by2 * scale_y), 0), frame_h)

            roi = frame[y1:y2, x1:x2]
            if roi.size == 0:
                continue

            mean_intensity = float(np.mean(roi))
            area = (x2 - x1) * (y2 - y1)

            # Fault classification logic (colours are BGR, matching the
            # OpenCV frames they are drawn on).
            if mean_intensity > 240:
                label = "Burned Solar Panel" if area >= 10000 else "Burned Solar Cell"
                color = (0, 0, 255)  # red
            elif mean_intensity > 200:
                label = "Overheat"
                color = (0, 165, 255)  # orange in BGR order
            elif mean_intensity < 100:
                label = "Dust Fault"
                color = (0, 255, 0)  # green
            else:
                continue

            faults_found.append({
                "Frame": frame_idx,
                "Time (s)": timestamp,
                "Fault Type": label,
                "Confidence": round(float(box.conf[0]), 2) if box.conf is not None else None,
                "Intensity": round(mean_intensity, 2),
                "Box": f"({x1}, {y1}, {x2}, {y2})"
            })

            # Annotate frame; keep the label inside the image when the box
            # touches the top edge.
            cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(annotated_frame, label, (x1, max(y1 - 10, 15)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    return annotated_frame, faults_found
def process_video(video_path):
    """Run fault detection over a video and write an annotated copy.

    Roughly one frame per second of video is passed to the detector; all
    other frames are copied to the output unchanged.

    Args:
        video_path: Path of the video file to read.

    Returns:
        ``(output_path, fault_log, snapshots, heatmap)``:
        path of the annotated video, list of fault dicts, list of snapshot
        image paths (one per analysed frame that had faults), and a float32
        array of shape (height, width) counting how often each pixel fell
        inside a fault box. All four are ``None`` if the video cannot be
        opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        st.error("Failed to open video.")
        return None, None, None, None

    out = None
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not (fps > 0):
            # Some containers report 0/NaN; fall back to a common rate so
            # the sampling stride and the writer stay valid.
            fps = 30.0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        heatmap = np.zeros((height, width), dtype=np.float32)
        snapshot_dir = tempfile.mkdtemp()
        # Only the name is needed; close the handle right away so the writer
        # can open the path itself.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_out:
            output_path = tmp_out.name
        # NOTE(review): "mp4v" output may not play in every browser; confirm
        # st.video renders it on the deployment target.
        out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))

        fault_log = []
        snapshots = []
        frame_count = 0
        # Analyse about one frame per second; never let the stride hit zero.
        process_every_n_frames = max(1, int(fps))

        with st.spinner("Processing video..."):
            progress = st.progress(0)
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                if frame_count % process_every_n_frames == 0:
                    # Feed the original BGR frame so the returned boxes are
                    # in the same coordinate space as the frame they are
                    # drawn on; the detector handles its own resizing.
                    results = model(frame, verbose=False)
                    annotated_frame, faults_found = detect_faults(frame, results, frame_count, fps)
                    fault_log.extend(faults_found)

                    if faults_found:
                        snapshot_path = os.path.join(snapshot_dir, f"frame_{frame_count}.jpg")
                        cv2.imwrite(snapshot_path, annotated_frame)
                        snapshots.append(snapshot_path)

                    for fault in faults_found:
                        x1, y1, x2, y2 = map(int, fault["Box"].strip("()").split(","))
                        heatmap[y1:y2, x1:x2] += 1
                else:
                    annotated_frame = frame

                out.write(annotated_frame)
                frame_count += 1
                # The frame count can be reported as 0; skip the ratio then.
                if total_frames > 0:
                    progress.progress(min(frame_count / total_frames, 1.0))
    finally:
        # Release the capture and writer even if detection raised.
        cap.release()
        if out is not None:
            out.release()

    return output_path, fault_log, snapshots, heatmap
def get_heatmap_overlay(heatmap, base_frame):
    """Blend a colour-mapped rendering of ``heatmap`` onto ``base_frame``.

    The heatmap is min-max normalised to 0..255, converted to 8-bit, coloured
    with OpenCV's JET colour map, and mixed with the base frame using the
    weights 0.6 (frame) and 0.4 (heatmap).
    """
    scaled = cv2.normalize(heatmap, None, 0, 255, cv2.NORM_MINMAX)
    colored = cv2.applyColorMap(scaled.astype(np.uint8), cv2.COLORMAP_JET)
    return cv2.addWeighted(base_frame, 0.6, colored, 0.4, 0)
def convert_df_to_csv(df):
    """Serialise ``df`` to CSV (without the index column) as UTF-8 bytes."""
    csv_text = df.to_csv(index=False)
    return csv_text.encode("utf-8")
# ---- Upload, process, and present results ----------------------------------
uploaded_file = st.file_uploader("Upload a thermal video", type=["mp4"])

if uploaded_file:
    # Persist the upload to disk; the processing step reads from a path.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_in:
        tmp_in.write(uploaded_file.read())
        temp_input_path = tmp_in.name

    output_path = None
    snapshots = None
    try:
        st.video(temp_input_path)
        output_path, fault_log, snapshots, heatmap = process_video(temp_input_path)

        if output_path:
            st.subheader("Detection Results")
            st.video(output_path)

            if fault_log:
                df = pd.DataFrame(fault_log)
                st.write("### Detected Faults Table")
                st.dataframe(df)
                st.download_button(
                    "Download Fault Log as CSV",
                    data=convert_df_to_csv(df),
                    file_name="fault_log.csv",
                    mime="text/csv",
                )

                st.write("### Snapshots of Faults")
                cols = st.columns(4)
                for i, path in enumerate(snapshots):
                    with cols[i % 4]:
                        st.image(path, use_column_width=True)

                st.write("### Fault Location Heatmap")
                # Use the first frame of the input as the backdrop.
                cap = cv2.VideoCapture(temp_input_path)
                ret, frame = cap.read()
                cap.release()
                if ret:
                    heatmap_overlay = get_heatmap_overlay(heatmap, frame)
                    # The overlay is built from OpenCV (BGR) images; tell
                    # Streamlit so the colours are not channel-swapped.
                    st.image(
                        heatmap_overlay,
                        caption="Fault Heatmap",
                        use_column_width=True,
                        channels="BGR",
                    )
            else:
                st.success("No faults detected.")
    finally:
        # Remove temporary artefacts even if processing or rendering failed.
        for leftover in (output_path, temp_input_path):
            if leftover and os.path.exists(leftover):
                os.unlink(leftover)
        if snapshots:
            # Snapshot files and their temp directory are no longer needed
            # once the images have been handed to Streamlit.
            for path in snapshots:
                try:
                    os.unlink(path)
                except OSError:
                    pass
            try:
                os.rmdir(os.path.dirname(snapshots[0]))
            except OSError:
                pass

st.markdown("---")
st.caption("Built with Streamlit + YOLOv5 (Ultralytics) + OpenCV")