# Testing_PT / src/streamlit_app.py
# Retrieved from the Hugging Face repository file view (raw / history / blame).
# Author: NaveenKumar5 -- commit 645f7e3 (verified), "Update src/streamlit_app.py", 3.86 kB.
import streamlit as st
import cv2
import numpy as np
import tempfile
import os
import pandas as pd
from PIL import Image
import torch
from transformers import DetrImageProcessor, DetrForObjectDetection
# === Load Hugging Face model using the default cache (handles permissions safely) ===
# The image processor and the DETR detector are both fetched from the same
# Hub repository. The detector is switched to eval mode right away because
# this app only performs inference.
# NOTE(review): Streamlit re-executes the script on each interaction, so these
# loads run again on every rerun -- consider caching them.
_MODEL_REPO = "NaveenKumar5/Solar_panel_fault_detection"
processor = DetrImageProcessor.from_pretrained(_MODEL_REPO)
model = DetrForObjectDetection.from_pretrained(_MODEL_REPO).eval()
# === Streamlit App Setup ===
# Page configuration is issued before any other Streamlit element is rendered.
st.set_page_config(page_title="Solar Panel Fault Detection", layout="wide")
# The leading emoji of the title had been stored mis-encoded (UTF-8 bytes
# decoded with a single-byte code page). It is restored here as an escape
# sequence (U+1F50D, magnifying glass) so the source stays ASCII-only and
# cannot be mangled the same way again.
st.title("\U0001F50D Solar Panel Fault Detection (DETR - Hugging Face)")
st.write("Upload a thermal video (MP4). Faults will be detected using a DETR model from Hugging Face.")
# === Fault Detection Function ===
def detect_faults(frame, frame_idx, fps):
    """Run the DETR detector on one video frame and annotate its detections.

    Args:
        frame: BGR image array as read by OpenCV. Boxes and labels are drawn
            onto it in place.
        frame_idx: Zero-based index of the frame within the video.
        fps: Frames per second of the video, used to turn ``frame_idx`` into
            a timestamp. A missing or non-positive value yields a timestamp
            of 0.0 instead of raising ``ZeroDivisionError``.

    Returns:
        A ``(frame, faults)`` tuple: the annotated frame and a list with one
        dict per detection above the 0.7 score threshold, holding the keys
        ``"Frame"``, ``"Time (s)"``, ``"Fault Type"``, ``"Confidence"`` and
        ``"Box"``.
    """
    image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    inputs = processor(images=image, return_tensors="pt")
    with torch.no_grad():
        outputs = model(**inputs)

    # PIL reports (width, height); the reversed tuple gives (height, width).
    target_sizes = torch.tensor([image.size[::-1]])
    results = processor.post_process_object_detection(
        outputs, target_sizes=target_sizes, threshold=0.7
    )[0]

    # The timestamp is the same for every detection in this frame, so compute
    # it once; guard against a container that reports no frame rate.
    timestamp = round(frame_idx / fps, 2) if fps and fps > 0 else 0.0
    color = (0, 0, 255)  # red in OpenCV's BGR channel order

    faults = []
    for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
        x1, y1, x2, y2 = map(int, box.tolist())
        conf = score.item()
        label_id = label.item()
        label_name = f"class_{label_id}"

        # Draw on frame
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
        # Keep the caption baseline inside the image when the box touches the
        # top edge; otherwise the text would be drawn off-frame.
        text_y = max(y1 - 5, 12)
        cv2.putText(frame, f"{label_name} ({conf:.2f})", (x1, text_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        faults.append({
            "Frame": frame_idx,
            "Time (s)": timestamp,
            "Fault Type": label_name,
            "Confidence": round(conf, 2),
            "Box": f"({x1},{y1},{x2},{y2})",
        })
    return frame, faults
# === Video Processing Function ===
def process_video(video_path):
    """Write an annotated copy of a video and collect the detected faults.

    Detection runs on every ``sample_every``-th frame (about one frame per
    second of video); every frame, annotated or not, is written to the
    output file. A Streamlit progress bar is updated while frames are read.

    Args:
        video_path: Path of the input video, opened with ``cv2.VideoCapture``.

    Returns:
        ``(output_path, fault_log)``: the path of a temporary ``.mp4`` file
        holding the annotated video (the caller is responsible for deleting
        it) and the list of fault dicts gathered from ``detect_faults``.
    """
    cap = cv2.VideoCapture(video_path)
    writer = None
    fault_log = []
    completed = False

    # Reserve the output path and close the handle immediately so the
    # VideoWriter can open the file itself and no descriptor is leaked.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_out:
        output_path = tmp_out.name

    try:
        # Keep the real (possibly fractional) frame rate for the writer and
        # the timestamps; truncating e.g. 29.97 to 29 changes playback speed.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps or fps <= 0:
            # Frame rate not reported: use an arbitrary sane default rather
            # than dividing by zero below.
            fps = 30.0
        sample_every = max(1, int(round(fps)))

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        writer = cv2.VideoWriter(
            output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (width, height)
        )
        progress = st.progress(0)

        # Read until the decoder reports end of stream instead of trusting the
        # container's frame count, which can be zero or inaccurate.
        frame_idx = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_idx % sample_every == 0:
                frame, faults = detect_faults(frame, frame_idx, fps)
                fault_log.extend(faults)
            writer.write(frame)
            frame_idx += 1
            if total_frames > 0:
                progress.progress(min(frame_idx / total_frames, 1.0))

        # Make sure the bar ends at 100% even when the frame count was unknown.
        progress.progress(1.0)
        completed = True
    finally:
        # Release native handles on both the success and the failure path.
        cap.release()
        if writer is not None:
            writer.release()
        if not completed:
            # On failure the caller never receives output_path, so remove the
            # partial file here; cleanup is best-effort by design.
            try:
                os.unlink(output_path)
            except OSError:
                pass
    return output_path, fault_log
# === CSV Helper ===
def convert_df(df):
    """Serialize ``df`` to CSV text without the index column, as UTF-8 bytes."""
    csv_text = df.to_csv(index=False)
    return bytes(csv_text, "utf-8")
# === Streamlit Interface ===
# The emoji in the UI strings below had been stored mis-encoded; they are
# restored as escape sequences so the source stays ASCII-only:
#   U+1F4E4 outbox tray, U+1F9EA test tube, U+1F4CA bar chart,
#   U+1F4E5 inbox tray, U+2705 check mark.
uploaded_file = st.file_uploader("\U0001F4E4 Upload thermal video", type=["mp4"])
if uploaded_file:
    st.video(uploaded_file)

    temp_input_path = None
    output_path = None
    try:
        # OpenCV needs a real file path, so spill the upload to a temp file.
        # The path is recorded before writing so the cleanup below still
        # removes the file if the write fails.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_in:
            temp_input_path = tmp_in.name
            # getvalue() returns the whole buffer regardless of the current
            # read position of the uploaded stream.
            tmp_in.write(uploaded_file.getvalue())

        output_path, log = process_video(temp_input_path)

        st.subheader("\U0001F9EA Processed Output")
        # Hand Streamlit the bytes rather than the path so the player does
        # not depend on the temp file that is removed below.
        with open(output_path, "rb") as processed:
            st.video(processed.read())

        if log:
            df = pd.DataFrame(log)
            st.write("### \U0001F4CA Detected Faults Table")
            st.dataframe(df)
            st.download_button("\U0001F4E5 Download Fault Log CSV", convert_df(df), "fault_log.csv", "text/csv")
        else:
            st.success("\u2705 No faults detected.")
    finally:
        # Remove the temp files even when processing or rendering raised.
        for stale_path in (temp_input_path, output_path):
            if stale_path and os.path.exists(stale_path):
                os.unlink(stale_path)

st.markdown("---")
st.caption("Built with Streamlit + Hugging Face DETR + OpenCV")