import os
import tempfile

import cv2
import numpy as np
import pandas as pd
import streamlit as st
import torch
from PIL import Image
from transformers import DetrImageProcessor, DetrForObjectDetection

MODEL_ID = "NaveenKumar5/Solar_panel_fault_detection"


# === Load Hugging Face Model using default cache (handles permissions safely) ===
@st.cache_resource(show_spinner="Loading DETR model...")
def load_model():
    """Load and cache the DETR processor and model.

    Wrapped in st.cache_resource so the expensive download/load happens once
    per server process instead of on every Streamlit script rerun.
    """
    processor = DetrImageProcessor.from_pretrained(MODEL_ID)
    model = DetrForObjectDetection.from_pretrained(MODEL_ID)
    model.eval()
    return processor, model


processor, model = load_model()

# === Streamlit App Setup ===
st.set_page_config(page_title="Solar Panel Fault Detection", layout="wide")
st.title("🔍 Solar Panel Fault Detection (DETR - Hugging Face)")
st.write("Upload a thermal video (MP4). Faults will be detected using a DETR model from Hugging Face.")


# === Fault Detection Function ===
def detect_faults(frame, frame_idx, fps):
    """Run DETR object detection on one BGR frame and annotate it in place.

    Args:
        frame: OpenCV BGR image (numpy array); boxes/labels are drawn onto it.
        frame_idx: index of the frame within the video, used for the log.
        fps: frames per second, used to compute the timestamp column.

    Returns:
        (frame, faults): the annotated frame and a list of log-row dicts,
        one per detection above the 0.7 confidence threshold.
    """
    image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    inputs = processor(images=image, return_tensors="pt")
    with torch.no_grad():
        outputs = model(**inputs)

    # PIL gives (width, height); post-processing expects (height, width).
    target_sizes = torch.tensor([image.size[::-1]])
    results = processor.post_process_object_detection(
        outputs, target_sizes=target_sizes, threshold=0.7
    )[0]

    faults = []
    for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
        x1, y1, x2, y2 = map(int, box.tolist())
        conf = score.item()
        label_id = label.item()
        # Prefer the model's own label map; fall back to a generic class id.
        label_name = model.config.id2label.get(label_id, f"class_{label_id}")

        # Draw on frame (red box in BGR).
        color = (0, 0, 255)
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
        cv2.putText(frame, f"{label_name} ({conf:.2f})", (x1, y1 - 5),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        faults.append({
            "Frame": frame_idx,
            "Time (s)": round(frame_idx / fps, 2),
            "Fault Type": label_name,
            "Confidence": round(conf, 2),
            "Box": f"({x1},{y1},{x2},{y2})",
        })
    return frame, faults


# === Video Processing Function ===
def process_video(video_path):
    """Decode a video, run detection roughly once per second, re-encode it.

    Args:
        video_path: path to the input MP4 file.

    Returns:
        (output_path, fault_log): path of the annotated MP4 written to a temp
        file (caller is responsible for unlinking it) and the list of all
        detection log rows.
    """
    cap = cv2.VideoCapture(video_path)
    # Some containers report 0 fps / 0 frames; guard against division by zero.
    fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    output_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
    writer = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height))

    fault_log = []
    progress = st.progress(0)
    try:
        for frame_idx in range(total_frames):
            ret, frame = cap.read()
            if not ret:
                break
            # Run the (slow) model only once per second of video.
            if frame_idx % fps == 0:
                frame, faults = detect_faults(frame, frame_idx, fps)
                fault_log.extend(faults)
            writer.write(frame)
            if total_frames > 0:
                progress.progress(min(frame_idx / total_frames, 1.0))
    finally:
        # Always release the decoder/encoder, even if detection raises.
        cap.release()
        writer.release()
    progress.progress(1.0)
    return output_path, fault_log


# === CSV Helper ===
def convert_df(df):
    """Serialize a DataFrame to UTF-8 CSV bytes for st.download_button."""
    return df.to_csv(index=False).encode('utf-8')


# === Streamlit Interface ===
uploaded_file = st.file_uploader("📤 Upload thermal video", type=["mp4"])
if uploaded_file:
    st.video(uploaded_file)

    # Persist the upload to disk so cv2.VideoCapture can open it by path.
    temp_input_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
    with open(temp_input_path, "wb") as f:
        f.write(uploaded_file.read())

    output_path, log = process_video(temp_input_path)

    st.subheader("🧪 Processed Output")
    # Read the bytes up front so playback still works after the temp file
    # is unlinked below.
    with open(output_path, "rb") as video_file:
        st.video(video_file.read())

    if log:
        df = pd.DataFrame(log)
        st.write("### 📊 Detected Faults Table")
        st.dataframe(df)
        st.download_button("📥 Download Fault Log CSV", convert_df(df), "fault_log.csv", "text/csv")
    else:
        st.success("✅ No faults detected.")

    os.unlink(temp_input_path)
    os.unlink(output_path)

st.markdown("---")
st.caption("Built with Streamlit + Hugging Face DETR + OpenCV")