# Naveenkumar1546's picture
# Update app.py
# 2948299 verified
import streamlit as st
import torch
from transformers import DetrImageProcessor, DetrForObjectDetection
import cv2
import numpy as np
import tempfile
import os
# Configure the browser tab title and use the wide page layout.
st.set_page_config(
    layout="wide",
    page_title="Solar Panel Fault Detection",
)

# Page heading followed by a one-line usage hint for the uploader below.
st.title("Solar Panel Fault Detection PoC")
st.write(
    "Upload a thermal video (MP4) of a solar panel to detect thermal, dust, "
    "and power generation faults."
)
# Build the DETR processor/model pair once; the loader is wrapped in
# st.cache_resource.
@st.cache_resource
def load_model():
    """Return ``(processor, model)`` loaded from ``facebook/detr-resnet-50``.

    Returns:
        A 2-tuple of the ``DetrImageProcessor`` and the
        ``DetrForObjectDetection`` instance, in that order.
    """
    checkpoint = "facebook/detr-resnet-50"
    return (
        DetrImageProcessor.from_pretrained(checkpoint),
        DetrForObjectDetection.from_pretrained(checkpoint),
    )


# Module-level handles used by detect_faults().
processor, model = load_model()
def detect_faults(frame, thermal_threshold=200, dust_threshold=100, score_threshold=0.9):
    """Detect objects in one frame and classify each box by mean intensity.

    The module-level ``processor``/``model`` pair produces bounding boxes;
    each box is then labelled purely from the mean pixel value of the region
    it covers (this is a heuristic stand-in, not a trained fault classifier).

    Args:
        frame: Image as a NumPy array of shape (H, W, 3) or (H, W, 4). A
            fourth (alpha) channel is dropped before inference.
        thermal_threshold: A region whose mean intensity is strictly above
            this value is flagged as a "Thermal Fault".
        dust_threshold: A region whose mean intensity is strictly below this
            value is flagged as a "Dust Fault".
        score_threshold: Minimum detection score passed to the processor's
            post-processing step.

    Returns:
        ``(annotated_frame, faults)`` where ``annotated_frame`` is a copy of
        the (3-channel) frame with rectangles and labels drawn on it, and
        ``faults`` maps "Thermal Fault", "Dust Fault" and
        "Power Generation Fault" to booleans.
    """
    # Drop the alpha channel; the ndim guard keeps a 2-D frame whose width
    # happens to be 4 from being sliced with three indices.
    if frame.ndim == 3 and frame.shape[-1] == 4:
        frame = frame[:, :, :3]

    inputs = processor(images=frame, return_tensors="pt")
    with torch.no_grad():
        outputs = model(**inputs)

    # Boxes are rescaled to the frame's own (height, width).
    height, width = frame.shape[:2]
    target_sizes = torch.tensor([[height, width]])
    results = processor.post_process_object_detection(
        outputs, target_sizes=target_sizes, threshold=score_threshold
    )[0]

    faults = {"Thermal Fault": False, "Dust Fault": False, "Power Generation Fault": False}
    annotated_frame = frame.copy()

    for box in results["boxes"]:
        x_min, y_min, x_max, y_max = (int(v) for v in box.tolist())

        # Clamp to the image: a negative coordinate would otherwise wrap
        # around under NumPy slicing and select the wrong region.
        x_min = min(max(x_min, 0), width)
        x_max = min(max(x_max, 0), width)
        y_min = min(max(y_min, 0), height)
        y_max = min(max(y_max, 0), height)

        # An empty region has no meaningful mean (np.mean would yield NaN).
        if x_max <= x_min or y_max <= y_min:
            continue

        mean_intensity = float(np.mean(frame[y_min:y_max, x_min:x_max]))

        # Colour tuples are in the frame's channel order.
        if mean_intensity > thermal_threshold:
            fault_name, colour = "Thermal Fault", (255, 0, 0)
        elif mean_intensity < dust_threshold:
            fault_name, colour = "Dust Fault", (0, 255, 0)
        else:
            continue

        faults[fault_name] = True
        cv2.rectangle(annotated_frame, (x_min, y_min), (x_max, y_max), colour, 2)
        # Keep the label baseline inside the image for boxes at the top edge.
        cv2.putText(annotated_frame, fault_name, (x_min, max(y_min - 10, 10)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, colour, 2)

    # Any thermal or dust finding is also reported as a power generation fault.
    faults["Power Generation Fault"] = faults["Thermal Fault"] or faults["Dust Fault"]
    return annotated_frame, faults
def process_video(video_path):
    """Annotate every frame of a video and summarise the faults found.

    Each frame is converted BGR -> RGB, passed to ``detect_faults``, converted
    back to BGR and written to a new temporary ``.mp4`` file. Progress is
    reported through a Streamlit spinner and progress bar.

    Args:
        video_path: Path of the input video, opened with ``cv2.VideoCapture``.

    Returns:
        ``(output_path, video_faults)`` on success, where ``output_path`` is
        the temporary annotated video (the caller is responsible for deleting
        it) and ``video_faults`` maps each fault name to ``True`` if it was
        seen in any frame. ``(None, None)`` if the input cannot be opened or
        the output writer cannot be created; an error is shown via
        ``st.error`` in both cases.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        st.error("Error: Could not open video file.")
        return None, None

    out = None
    output_path = None
    succeeded = False
    try:
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Keep the fractional rate (e.g. 29.97) instead of truncating it, and
        # fall back to a fixed rate when the container reports 0 or NaN.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = 30.0

        # Close our own handle right away; only the path is needed.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
            output_path = tmp.name

        # NOTE(review): "mp4v" output may not play in every browser's video
        # element - confirm against the deployment target.
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
        if not out.isOpened():
            st.error("Error: Could not create output video file.")
            return None, None

        video_faults = {"Thermal Fault": False, "Dust Fault": False, "Power Generation Fault": False}
        frame_count = 0
        with st.spinner("Analyzing video..."):
            progress = st.progress(0)
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                annotated_frame, faults = detect_faults(frame_rgb)

                # A fault seen in any frame stays set for the whole video.
                for fault in video_faults:
                    video_faults[fault] |= faults[fault]

                out.write(cv2.cvtColor(annotated_frame, cv2.COLOR_RGB2BGR))

                frame_count += 1
                # The reported frame count can be 0 or an underestimate, so
                # avoid dividing by zero and cap the ratio at 1.0.
                if total_frames > 0:
                    progress.progress(min(frame_count / total_frames, 1.0))

        succeeded = True
        return output_path, video_faults
    finally:
        # Release handles first so the output file can be removed on failure.
        cap.release()
        if out is not None:
            out.release()
        if not succeeded and output_path is not None and os.path.exists(output_path):
            os.unlink(output_path)
# File uploader
uploaded_file = st.file_uploader("Upload a thermal video", type=["mp4"])
if uploaded_file is not None:
    # process_video() opens the video by path, so the upload is first written
    # to a temporary file; the with-block closes it before it is reopened.
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tfile:
        tfile.write(uploaded_file.read())

    output_path = None
    try:
        # Display uploaded video
        st.video(tfile.name, format="video/mp4")

        # Process video; output_path is None when processing could not start.
        output_path, video_faults = process_video(tfile.name)
        if output_path:
            # Display results
            st.subheader("Fault Detection Results")
            st.video(output_path, format="video/mp4")

            # Show fault summary (only fixed strings are interpolated into
            # the HTML below).
            st.write("**Detected Faults in Video:**")
            for fault, detected in video_faults.items():
                status = "Detected" if detected else "Not Detected"
                color = "red" if detected else "green"
                st.markdown(f"- **{fault}**: <span style='color:{color}'>{status}</span>", unsafe_allow_html=True)

            # Provide one recommendation per detected fault type.
            if any(video_faults.values()):
                st.subheader("Recommendations")
                if video_faults["Thermal Fault"]:
                    st.write("- **Thermal Fault**: Inspect for damaged components or overheating issues.")
                if video_faults["Dust Fault"]:
                    st.write("- **Dust Fault**: Schedule cleaning to remove dust accumulation.")
                if video_faults["Power Generation Fault"]:
                    st.write("- **Power Generation Fault**: Investigate efficiency issues due to detected faults.")
            else:
                st.write("No faults detected. The solar panel appears to be functioning normally.")
    finally:
        # Remove both temporary files even if display or processing raised.
        for temp_path in (output_path, tfile.name):
            if temp_path and os.path.exists(temp_path):
                os.unlink(temp_path)
# Footer: a horizontal rule, then a credits line.
st.markdown("---")
st.write(
    "Built with Streamlit, Hugging Face Transformers, and OpenCV "
    "for Solar Panel Fault Detection PoC"
)