# Wild Fire Tracker - Fire Detection MCP Server
# Copyright (c) 2024 Wild Fire Tracker
# Licensed under MIT License - see LICENSE file for details
import gradio as gr
import cv2
import numpy as np
import threading
import time
from datetime import datetime
from PIL import Image
from transformers import BlipProcessor, BlipForQuestionAnswering
import torch
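
# Runtime dependencies (all imported above): gradio, opencv-python, numpy,
# pillow, transformers, torch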

# Load BLIP VQA model (note: Salesforce/blip-vqa-base is BLIP, not BLIP-2)
print("Loading BLIP VQA model...")
device = "cuda" if torch.cuda.is_available() else "cpu"
vqa_processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base")
# float16 inference is only reliable on GPU; fall back to float32 on CPU
vqa_model = BlipForQuestionAnswering.from_pretrained(
    "Salesforce/blip-vqa-base",
    torch_dtype=torch.float16 if device == "cuda" else torch.float32,
)
vqa_model = vqa_model.to(device)
vqa_model.eval()
print(f"Model loaded on {device}")
class FireDetectionMCP:
    def __init__(self):
        self.running = False
        self.current_frame = None
        self.status = "No video source"
        self.status_color = "#808080"  # Gray
        self.last_analysis_time = 0
        self.frame_count = 0
        self.last_detection_time = None
        self.display_status = "No video source"  # For video overlay (no emojis)
        self.thread = None  # Worker thread handle (used by stop_monitoring)
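
    # Threading model: monitor_video() runs in a daemon worker thread and
    # writes current_frame / status; the Gradio timer polls get_frame() /
    # get_status(). Attribute assignment is atomic under CPython's GIL, so
    # no explicit lock is used.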
    def analyze_frame(self, frame):
        """Analyze a single frame for fire/smoke."""
        try:
            # Convert BGR (OpenCV) to an RGB PIL Image
            image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            image = image.resize((224, 224))
            # Ask separate questions for better detection
            fire_question = "Is there fire or flames in this image?"
            smoke_question = "Is there smoke in this image?"
            # Check for fire with confidence
            fire_inputs = vqa_processor(image, fire_question, return_tensors="pt").to(device)
            # Match the model's dtype (a float16 model would otherwise reject float32 pixels)
            fire_inputs["pixel_values"] = fire_inputs["pixel_values"].to(vqa_model.dtype)
            with torch.no_grad():
                fire_outputs = vqa_model.generate(**fire_inputs, max_length=10, return_dict_in_generate=True, output_scores=True)
            fire_answer = vqa_processor.decode(fire_outputs.sequences[0], skip_special_tokens=True).lower()
            fire_confidence = torch.softmax(fire_outputs.scores[0][0], dim=0).max().item() * 100
            # Check for smoke with confidence
            smoke_inputs = vqa_processor(image, smoke_question, return_tensors="pt").to(device)
            smoke_inputs["pixel_values"] = smoke_inputs["pixel_values"].to(vqa_model.dtype)
            with torch.no_grad():
                smoke_outputs = vqa_model.generate(**smoke_inputs, max_length=10, return_dict_in_generate=True, output_scores=True)
            smoke_answer = vqa_processor.decode(smoke_outputs.sequences[0], skip_special_tokens=True).lower()
            smoke_confidence = torch.softmax(smoke_outputs.scores[0][0], dim=0).max().item() * 100
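            # Note: "confidence" here is the softmax probability of the first
            # generated token only; it is a rough proxy, not a calibrated
            # detection score.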
            # Determine result
            has_fire = 'yes' in fire_answer or 'fire' in fire_answer or 'flame' in fire_answer
            has_smoke = 'yes' in smoke_answer or 'smoke' in smoke_answer
            if has_fire and has_smoke:
                status_with_emoji = f"🔥💨 FIRE & SMOKE DETECTED (F:{fire_confidence:.0f}% S:{smoke_confidence:.0f}%)"
                status_no_emoji = f"FIRE & SMOKE DETECTED (F:{fire_confidence:.0f}% S:{smoke_confidence:.0f}%)"
                return status_with_emoji, status_no_emoji, "#FF0000"  # Red
            elif has_fire:
                status_with_emoji = f"🔥 FIRE DETECTED ({fire_confidence:.0f}%)"
                status_no_emoji = f"FIRE DETECTED ({fire_confidence:.0f}%)"
                return status_with_emoji, status_no_emoji, "#FF4500"  # Orange
            elif has_smoke:
                status_with_emoji = f"💨 SMOKE DETECTED ({smoke_confidence:.0f}%)"
                status_no_emoji = f"SMOKE DETECTED ({smoke_confidence:.0f}%)"
                return status_with_emoji, status_no_emoji, "#696969"  # Gray
            else:
                status_with_emoji = f"✅ ALL CLEAR (F:{fire_confidence:.0f}% S:{smoke_confidence:.0f}%)"
                status_no_emoji = f"ALL CLEAR (F:{fire_confidence:.0f}% S:{smoke_confidence:.0f}%)"
                return status_with_emoji, status_no_emoji, "#32CD32"  # Green
        except Exception as e:
            # Callers unpack three values, so return matching status strings
            return f"❌ ERROR: {str(e)}", f"ERROR: {str(e)}", "#FF0000"  # Red
    def monitor_video(self, video_source):
        """Monitor a video source until stopped."""
        if video_source.isdigit():
            cap = cv2.VideoCapture(int(video_source))
        else:
            cap = cv2.VideoCapture(video_source)
        if not cap.isOpened():
            self.status = "❌ Cannot open video source"
            self.status_color = "#FF0000"
            return
        # Check if MP4 for looping
        is_mp4 = isinstance(video_source, str) and video_source.lower().endswith('.mp4')
        self.running = True
        self.frame_count = 0
        while self.running:
            ret, frame = cap.read()
            # Loop MP4 files back to the first frame
            if not ret and is_mp4:
                cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                ret, frame = cap.read()
                self.frame_count = 0
            if not ret:
                break
            self.frame_count += 1
            current_time = time.time()
            # Resize for display
            display_frame = cv2.resize(frame, (640, 480))
            # Analyze every 10 seconds (only if still running)
            if self.running and current_time - self.last_analysis_time >= 10.0:
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Analyzing frame {self.frame_count}...")
                self.status, self.display_status, self.status_color = self.analyze_frame(frame)
                self.last_analysis_time = current_time
                # datetime.now() is timezone-naive, so %Z would render empty; omit it
                self.last_detection_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Result: {self.status}")
            # Add status overlay (black banner across the top)
            cv2.rectangle(display_frame, (0, 0), (640, 80), (0, 0, 0), -1)
            # Convert hex color to BGR
            if self.status_color == "#32CD32":  # Green
                color = (50, 205, 50)
            elif self.status_color == "#FF4500":  # Orange
                color = (0, 69, 255)
            elif self.status_color == "#696969":  # Gray
                color = (105, 105, 105)
            else:  # Red
                color = (0, 0, 255)
            # cv2.putText cannot render emoji, so draw the emoji-free status
            cv2.putText(display_frame, self.display_status, (10, 60),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
            # Add full timestamp (local time; naive, so no timezone suffix)
            timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            cv2.putText(display_frame, f"Time: {timestamp}", (10, 460),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
            # Store frame (RGB) for the UI to pick up
            self.current_frame = cv2.cvtColor(display_frame, cv2.COLOR_BGR2RGB)
            time.sleep(0.04)  # ~25 FPS
        cap.release()
        self.status = "Monitoring stopped"
        self.status_color = "#808080"
    def start_monitoring(self, video_source):
        """Start monitoring in a background thread."""
        if self.running:
            return "Already monitoring"
        if not video_source or (isinstance(video_source, str) and not video_source.strip()):
            return "Please provide a video source"
        self.thread = threading.Thread(target=self.monitor_video, args=(video_source,), daemon=True)
        self.thread.start()
        return f"✅ Started monitoring: {video_source}"

    def stop_monitoring(self):
        """Stop monitoring and wait briefly for the worker thread to exit."""
        self.running = False
        # Join so restarting with a new source does not race the old thread
        if self.thread is not None:
            self.thread.join(timeout=2.0)
            self.thread = None
        self.current_frame = None
        self.status = "🛑 Monitoring stopped"
        self.display_status = "Monitoring stopped"
        self.status_color = "#808080"
        return "🛑 Monitoring stopped"
    def get_frame(self):
        """Get the current frame, or a placeholder if none is available."""
        if self.current_frame is not None:
            return self.current_frame
        else:
            # Placeholder while waiting for the stream to start
            placeholder = np.zeros((480, 640, 3), dtype=np.uint8)
            cv2.putText(placeholder, "Waiting for video stream...", (150, 240),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 2)
            return placeholder

    def get_status(self):
        """Get the current status string."""
        if self.last_detection_time:
            return f"{self.status} (Last check: {self.last_detection_time})"
        return self.status
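
# Headless usage sketch (no UI), assuming a local file "fire.mp4" exists:
#   detector = FireDetectionMCP()
#   detector.start_monitoring("fire.mp4")
#   time.sleep(15)                 # allow at least one 10-second analysis pass
#   print(detector.get_status())
#   detector.stop_monitoring()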

# Initialize MCP server
mcp_server = FireDetectionMCP()

def create_interface():
    """Create the Gradio interface."""
    with gr.Blocks(title="🔥 Fire Detection MCP Server", theme=gr.themes.Soft()) as interface:
        gr.Markdown("# 🔥 Fire Detection MCP Server")
        gr.Markdown("Real-time fire and smoke detection from video streams (analyzes every 10 seconds)")
        gr.Markdown("⚠️ **Usage**: Upload your own video file or use live sources (webcam/RTSP). It may take a few seconds to load the stream and show the analysis. Webcam may not work on HF Spaces.")
        gr.Markdown("🔗 **Sample Videos**: [Fire Test Video](https://www.pexels.com/video/a-man-carrying-gear-walking-away-from-a-controlled-fire-8552246/) | [Smoke Test Video](https://www.pexels.com/video/aerial-view-of-controlled-forest-fire-in-spring-31361444/)")
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### Video Source Options")
                with gr.Tabs():
                    with gr.Tab("📁 Upload Video"):
                        video_upload = gr.File(
                            label="Upload MP4 Video",
                            file_types=[".mp4", ".avi", ".mov"],
                            type="filepath"
                        )
                        upload_btn = gr.Button("🚀 Start Monitoring", variant="primary")
                    with gr.Tab("📹 Live Sources"):
                        video_input = gr.Textbox(
                            label="Video Source",
                            placeholder="0 (webcam), rtsp://url, or path/to/video.mp4",
                            value="0"
                        )
                        live_btn = gr.Button("🚀 Start Monitoring", variant="primary")
                stop_btn = gr.Button("🛑 Stop Monitoring", variant="secondary")
                control_output = gr.Textbox(label="Control Status", interactive=False)
                gr.Markdown("### Detection Status")
                status_display = gr.Textbox(label="Current Status", interactive=False)
                gr.Markdown("### Status Legend")
                gr.Markdown("🟢 Clear | 🟠 Fire | ⚫ Smoke | 🔴 Error")
            with gr.Column(scale=2):
                gr.Markdown("### Live Video Stream")
                video_display = gr.Image(
                    label="Video Feed",
                    height=480,
                    width=640,
                    interactive=False
                )
        # Update functions
        def update_display():
            frame = mcp_server.get_frame()
            status = mcp_server.get_status()
            return frame, status

        # Event handlers
        def start_from_upload(video_file):
            mcp_server.stop_monitoring()  # Stop any current stream first
            if video_file is None:
                return "❌ Please upload a video file first"
            return mcp_server.start_monitoring(video_file)

        def start_live_source(video_source):
            mcp_server.stop_monitoring()  # Stop any current stream first
            return mcp_server.start_monitoring(video_source)

        upload_btn.click(
            fn=start_from_upload,
            inputs=video_upload,
            outputs=control_output
        )
        live_btn.click(
            fn=start_live_source,
            inputs=video_input,
            outputs=control_output
        )
        stop_btn.click(
            fn=mcp_server.stop_monitoring,
            outputs=control_output
        )

        # Auto-refresh every 0.5 seconds
        timer = gr.Timer(0.5)
        timer.tick(
            fn=update_display,
            outputs=[video_display, status_display]
        )
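        # Note: gr.Timer and launch(mcp_server=True) assume a recent Gradio
        # release; older Gradio versions do not provide these APIs.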
    return interface

if __name__ == "__main__":
    interface = create_interface()
    interface.launch(mcp_server=True, server_port=7860, share=False)