Spaces:
No application file
No application file
| import streamlit as st | |
| import cv2 | |
| import numpy as np | |
| import tempfile | |
| import os | |
| import pandas as pd | |
| from PIL import Image | |
| import torch | |
| from transformers import DetrImageProcessor, DetrForObjectDetection | |
| # === Load Hugging Face Model using default cache (handles permissions safely) === | |
| processor = DetrImageProcessor.from_pretrained("NaveenKumar5/Solar_panel_fault_detection") | |
| model = DetrForObjectDetection.from_pretrained("NaveenKumar5/Solar_panel_fault_detection") | |
| model.eval() | |
| # === Streamlit App Setup === | |
| st.set_page_config(page_title="Solar Panel Fault Detection", layout="wide") | |
| st.title("π Solar Panel Fault Detection (DETR - Hugging Face)") | |
| st.write("Upload a thermal video (MP4). Faults will be detected using a DETR model from Hugging Face.") | |
| # === Fault Detection Function === | |
| def detect_faults(frame, frame_idx, fps): | |
| image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) | |
| inputs = processor(images=image, return_tensors="pt") | |
| with torch.no_grad(): | |
| outputs = model(**inputs) | |
| target_sizes = torch.tensor([image.size[::-1]]) | |
| results = processor.post_process_object_detection(outputs, target_sizes=target_sizes, threshold=0.7)[0] | |
| faults = [] | |
| for score, label, box in zip(results["scores"], results["labels"], results["boxes"]): | |
| x1, y1, x2, y2 = map(int, box.tolist()) | |
| conf = score.item() | |
| label_id = label.item() | |
| label_name = f"class_{label_id}" | |
| # Draw on frame | |
| color = (0, 0, 255) | |
| cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) | |
| cv2.putText(frame, f"{label_name} ({conf:.2f})", (x1, y1 - 5), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) | |
| faults.append({ | |
| "Frame": frame_idx, | |
| "Time (s)": round(frame_idx / fps, 2), | |
| "Fault Type": label_name, | |
| "Confidence": round(conf, 2), | |
| "Box": f"({x1},{y1},{x2},{y2})" | |
| }) | |
| return frame, faults | |
| # === Video Processing Function === | |
| def process_video(video_path): | |
| cap = cv2.VideoCapture(video_path) | |
| fps = int(cap.get(cv2.CAP_PROP_FPS)) | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| output_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name | |
| writer = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height)) | |
| fault_log = [] | |
| progress = st.progress(0) | |
| for frame_idx in range(total_frames): | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| if frame_idx % fps == 0: | |
| frame, faults = detect_faults(frame, frame_idx, fps) | |
| fault_log.extend(faults) | |
| writer.write(frame) | |
| progress.progress(min(frame_idx / total_frames, 1.0)) | |
| cap.release() | |
| writer.release() | |
| return output_path, fault_log | |
| # === CSV Helper === | |
| def convert_df(df): | |
| return df.to_csv(index=False).encode('utf-8') | |
| # === Streamlit Interface === | |
| uploaded_file = st.file_uploader("π€ Upload thermal video", type=["mp4"]) | |
| if uploaded_file: | |
| st.video(uploaded_file) | |
| temp_input_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name | |
| with open(temp_input_path, "wb") as f: | |
| f.write(uploaded_file.read()) | |
| output_path, log = process_video(temp_input_path) | |
| st.subheader("π§ͺ Processed Output") | |
| st.video(output_path) | |
| if log: | |
| df = pd.DataFrame(log) | |
| st.write("### π Detected Faults Table") | |
| st.dataframe(df) | |
| st.download_button("π₯ Download Fault Log CSV", convert_df(df), "fault_log.csv", "text/csv") | |
| else: | |
| st.success("β No faults detected.") | |
| os.unlink(temp_input_path) | |
| os.unlink(output_path) | |
| st.markdown("---") | |
| st.caption("Built with Streamlit + Hugging Face DETR + OpenCV") | |