File size: 6,672 Bytes
e81860f
2948299
 
e81860f
bf0e73e
 
 
e81860f
2948299
e81860f
 
2948299
 
 
 
 
e81860f
 
2948299
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e81860f
bf0e73e
2948299
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bf0e73e
 
2948299
bf0e73e
2948299
bf0e73e
 
2948299
bf0e73e
2948299
 
 
 
bf0e73e
 
2948299
 
bf0e73e
2948299
 
 
 
b4d7818
2948299
 
 
 
bf0e73e
 
 
 
 
2948299
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bf0e73e
2948299
 
bf0e73e
 
2948299
bf0e73e
e81860f
 
bf0e73e
e81860f
2948299
 
 
 
 
 
 
 
 
 
 
 
bf0e73e
2948299
 
 
 
 
 
bf0e73e
2948299
bf0e73e
2948299
 
 
bf0e73e
 
 
2948299
bf0e73e
2948299
bf0e73e
2948299
bf0e73e
2948299
 
 
bf0e73e
2948299
 
 
e81860f
2948299
e81860f
2948299
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import streamlit as st
import torch
from transformers import DetrImageProcessor, DetrForObjectDetection
import cv2
import numpy as np
import tempfile
import os

# Set page configuration
# Page-level settings: browser tab title and full-width layout
st.set_page_config(
    page_title="Solar Panel Fault Detection",
    layout="wide",
)

# Page heading followed by a one-line usage hint
st.title("Solar Panel Fault Detection PoC")
st.write(
    "Upload a thermal video (MP4) of a solar panel to detect thermal, dust, and power generation faults."
)

# Load model and processor
@st.cache_resource
def load_model():
    """Load the DETR image processor and object-detection model.

    Both are created from the "facebook/detr-resnet-50" checkpoint. The
    function is wrapped in ``st.cache_resource``.

    Returns:
        A ``(processor, model)`` tuple.
    """
    checkpoint = "facebook/detr-resnet-50"
    detr_processor = DetrImageProcessor.from_pretrained(checkpoint)
    detr_model = DetrForObjectDetection.from_pretrained(checkpoint)
    return detr_processor, detr_model


# Module-level handles used by detect_faults()
processor, model = load_model()

# Function to process frame and detect faults
def detect_faults(frame):
    """Detect objects in one frame and flag them as faults by brightness.

    The frame is run through the module-level ``processor`` and ``model``.
    Every detection scoring at least 0.9 is cropped out of the frame and
    classified by the mean pixel intensity of the crop:

    * mean > 200  -> "Thermal Fault" (treated as a hotspot)
    * mean < 100  -> "Dust Fault"
    * otherwise   -> no fault, nothing is drawn

    "Power Generation Fault" is derived: it is set whenever either of the
    other two faults was found. The intensity thresholds are heuristic
    values that should be tuned to the thermal video's intensity scale.

    Args:
        frame: Image array indexed as (row, column, channel). A fourth
            channel, if present, is dropped. The caller in this file passes
            RGB frames, so the annotation colours below render as red
            (thermal) and green (dust).

    Returns:
        A tuple ``(annotated_frame, faults)`` where ``annotated_frame`` is a
        copy of the frame with a rectangle and text label drawn for each
        flagged detection, and ``faults`` maps each fault name to a bool.
    """
    # Drop a fourth channel so the model receives three-channel input
    if frame.shape[-1] == 4:
        frame = frame[:, :, :3]

    inputs = processor(images=frame, return_tensors="pt")

    # Inference only; no gradients are needed
    with torch.no_grad():
        outputs = model(**inputs)

    # Rescale predicted boxes to pixel coordinates of this frame
    target_sizes = torch.tensor([frame.shape[:2]])
    results = processor.post_process_object_detection(
        outputs, target_sizes=target_sizes, threshold=0.9
    )[0]

    faults = {"Thermal Fault": False, "Dust Fault": False, "Power Generation Fault": False}
    annotated_frame = frame.copy()
    height, width = frame.shape[:2]

    for box in results["boxes"]:
        x0, y0, x1, y1 = (int(v) for v in box.tolist())

        # Predicted boxes can extend past the image border. A negative index
        # would wrap around in a NumPy slice and select the wrong region, so
        # clamp every coordinate to the frame first.
        x0, x1 = max(0, min(x0, width)), max(0, min(x1, width))
        y0, y1 = max(0, min(y0, height)), max(0, min(y1, height))
        if x1 <= x0 or y1 <= y0:
            # Empty region: np.mean would return NaN, so skip it
            continue

        mean_intensity = np.mean(frame[y0:y1, x0:x1])

        if mean_intensity > 200:  # Adjust threshold based on thermal video scale
            fault_name, color = "Thermal Fault", (255, 0, 0)
        elif mean_intensity < 100:  # Adjust threshold
            fault_name, color = "Dust Fault", (0, 255, 0)
        else:
            continue

        faults[fault_name] = True
        cv2.rectangle(annotated_frame, (x0, y0), (x1, y1), color, 2)
        # Keep the label inside the frame when the box touches the top edge
        cv2.putText(annotated_frame, fault_name, (x0, max(y0 - 10, 10)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    # Any detected anomaly is taken to indicate reduced generation efficiency
    faults["Power Generation Fault"] = faults["Thermal Fault"] or faults["Dust Fault"]

    return annotated_frame, faults

# Function to process video and generate annotated output
def process_video(video_path):
    """Annotate every frame of a video and summarise the faults found.

    Each frame is read with OpenCV, converted from BGR to RGB, passed to
    ``detect_faults``, converted back to BGR and written to a new temporary
    ``.mp4`` file. A Streamlit spinner and progress bar are shown while the
    frames are processed.

    Args:
        video_path: Path of the video file to analyse.

    Returns:
        ``(output_path, video_faults)`` on success, where ``output_path`` is
        the annotated temporary video (the caller is responsible for
        deleting it) and ``video_faults`` maps each fault name to True if it
        was detected in any frame. Returns ``(None, None)`` and reports an
        error through ``st.error`` when the input cannot be opened or the
        output writer cannot be created.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        st.error("Error: Could not open video file.")
        return None, None

    out = None
    output_path = None
    completed = False
    try:
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Keep fractional rates such as 29.97 instead of truncating them,
        # and fall back to a default when the container reports no usable
        # rate (0 or NaN), which the writer cannot use.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = 30.0

        # Reserve a path for the output; close the handle right away so the
        # writer can open the file itself.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
            output_path = tmp.name

        # NOTE(review): "mp4v" output may not play in browser-based players,
        # which generally expect H.264 - confirm on the target deployment.
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
        if not out.isOpened():
            st.error("Error: Could not create output video file.")
            return None, None

        video_faults = {"Thermal Fault": False, "Dust Fault": False, "Power Generation Fault": False}

        frame_count = 0
        with st.spinner("Analyzing video..."):
            progress = st.progress(0)
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # OpenCV decodes to BGR; detect_faults is given RGB
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                annotated_frame, faults = detect_faults(frame_rgb)

                # A fault seen in any frame marks the whole video
                for fault in video_faults:
                    video_faults[fault] |= faults[fault]

                out.write(cv2.cvtColor(annotated_frame, cv2.COLOR_RGB2BGR))

                frame_count += 1
                # The reported frame count can be zero or smaller than the
                # number of frames actually decoded, so avoid dividing by
                # zero and keep the value within the 0.0-1.0 range.
                if total_frames > 0:
                    progress.progress(min(frame_count / total_frames, 1.0))

        completed = True
        return output_path, video_faults
    finally:
        # Release OpenCV handles even when a frame fails mid-way
        cap.release()
        if out is not None:
            out.release()
        # Do not leave a partial output file behind on failure
        if not completed and output_path is not None and os.path.exists(output_path):
            os.unlink(output_path)

# File uploader
uploaded_file = st.file_uploader("Upload a thermal video", type=["mp4"])

if uploaded_file is not None:
    video_bytes = uploaded_file.read()

    # OpenCV reads from a file path, so persist the upload to a temp file
    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tfile:
        tfile.write(video_bytes)

    output_path = None
    try:
        # Show the video from bytes rather than a path so the display does
        # not depend on the temp file still existing when it is deleted below
        st.video(video_bytes, format="video/mp4")

        # Process video
        output_path, video_faults = process_video(tfile.name)

        if output_path:
            # Display results
            st.subheader("Fault Detection Results")
            with open(output_path, "rb") as annotated_file:
                st.video(annotated_file.read(), format="video/mp4")

            # Show fault summary
            st.write("**Detected Faults in Video:**")
            for fault, detected in video_faults.items():
                status = "Detected" if detected else "Not Detected"
                color = "red" if detected else "green"
                st.markdown(f"- **{fault}**: <span style='color:{color}'>{status}</span>", unsafe_allow_html=True)

            # Provide recommendations
            if any(video_faults.values()):
                st.subheader("Recommendations")
                if video_faults["Thermal Fault"]:
                    st.write("- **Thermal Fault**: Inspect for damaged components or overheating issues.")
                if video_faults["Dust Fault"]:
                    st.write("- **Dust Fault**: Schedule cleaning to remove dust accumulation.")
                if video_faults["Power Generation Fault"]:
                    st.write("- **Power Generation Fault**: Investigate efficiency issues due to detected faults.")
            else:
                st.write("No faults detected. The solar panel appears to be functioning normally.")
    finally:
        # Remove both temp files even if processing raised or the script
        # run was interrupted part-way through
        for temp_path in (output_path, tfile.name):
            if temp_path and os.path.exists(temp_path):
                os.unlink(temp_path)

# Footer
st.markdown("---")
st.write("Built with Streamlit, Hugging Face Transformers, and OpenCV for Solar Panel Fault Detection PoC")