Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python3 | |
| """ | |
| SafetyMaster Pro - Gradio Interface | |
| Real-time safety equipment detection with modern web UI | |
| Optimized for easy deployment on Hugging Face Spaces, Gradio Cloud, and other platforms | |
| """ | |
| import gradio as gr | |
| import cv2 | |
| import numpy as np | |
| import PIL.Image | |
| import time | |
| import json | |
| import os | |
| from datetime import datetime | |
| from typing import Dict, List, Tuple, Optional | |
| import threading | |
| import queue | |
| # Import our existing safety detector | |
| from safety_detector import SafetyDetector | |
| from camera_manager import CameraManager | |
class SafetyMasterGradio:
    """Gradio front end for SafetyMaster Pro.

    Wraps a ``SafetyDetector`` (and, for live monitoring, a ``CameraManager``)
    behind a four-tab ``gr.Blocks`` UI: single-image analysis, live camera
    monitoring, a rolling violation log, and model information.
    """

    # Upper bound on the number of entries kept in ``violation_log``.
    MAX_LOG_ENTRIES = 100
    # How many of the most recent entries ``get_violation_log`` renders.
    LOG_DISPLAY_COUNT = 10

    def __init__(self):
        """Create empty monitoring state and try to load the detector."""
        self.detector = None            # SafetyDetector, or None if loading failed
        self.camera_manager = None      # CameraManager while monitoring is active
        self.monitoring_active = False  # flag polled by the background loop
        self.violation_log = []         # list of dicts written by _log_violation
        # Hand-off buffer between the monitoring thread and the UI refresh.
        self.frame_queue = queue.Queue(maxsize=10)
        self._initialize_detector()

    def _initialize_detector(self):
        """Instantiate ``SafetyDetector``.

        Returns:
            True on success; False if construction raised (the error is
            printed and ``self.detector`` stays ``None``).
        """
        try:
            print("π€ Loading AI model for safety detection...")
            self.detector = SafetyDetector()
            print("β Safety detector initialized successfully")
            return True
        except Exception as e:
            print(f"β Error initializing detector: {e}")
            return False

    # ------------------------------------------------------------------
    # Image analysis
    # ------------------------------------------------------------------
    @staticmethod
    def _error_json(message: str) -> str:
        """Return ``message`` wrapped as a valid JSON object string.

        The second output of ``detect_safety_violations_image`` is bound to a
        ``gr.JSON`` component, so every value sent there must parse as JSON.
        """
        return json.dumps({"error": message})

    @staticmethod
    def _format_summary(results: Dict) -> str:
        """Render a detection result dict as the human-readable summary."""
        violations = results.get('violations', [])
        people_count = results.get('people_count', 0)
        safety_equipment = results.get('safety_equipment', {})

        summary_parts = [
            f"π₯ People Detected: {people_count}",
            f"β‘ Processing Time: {results.get('processing_time', 0):.3f}s",
        ]
        if safety_equipment:
            summary_parts.append("\nπ‘οΈ Safety Equipment Detected:")
            summary_parts.extend(
                f" β’ {equipment.replace('_', ' ').title()}: {count}"
                for equipment, count in safety_equipment.items()
                if count > 0
            )
        if violations:
            summary_parts.append(f"\nβ οΈ Safety Violations Found: {len(violations)}")
            for violation in violations:
                severity_emoji = "π΄" if violation.get('severity') == 'high' else "π‘"
                summary_parts.append(
                    f" {severity_emoji} {violation.get('description', 'Unknown violation')}"
                )
        else:
            summary_parts.append("\nβ No Safety Violations Detected")
        return "\n".join(summary_parts)

    def detect_safety_violations_image(
        self, image: "PIL.Image.Image"
    ) -> "Tuple[PIL.Image.Image, str, str]":
        """Detect safety violations in an uploaded image.

        Args:
            image: PIL image from Gradio, or ``None`` when nothing was uploaded.

        Returns:
            Tuple of ``(annotated_image, violations_json, summary_text)``.
            ``violations_json`` is always a valid JSON string; on failure it
            is ``{"error": "..."}`` and the input image is returned unchanged.
        """
        if image is None:
            return None, self._error_json("No image provided"), "Please upload an image"
        if self.detector is None:
            return (
                image,
                self._error_json("Detector not initialized"),
                "Error: AI model not loaded",
            )
        try:
            # The detector works on OpenCV (BGR) arrays; Gradio hands us RGB.
            cv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
            results = self.detector.detect_safety_violations(cv_image)
            annotated_frame = self.detector.draw_detections(cv_image, results)
            annotated_image = PIL.Image.fromarray(
                cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)
            )

            violations = results.get('violations', [])
            violations_json = json.dumps({
                'people_detected': results.get('people_count', 0),
                'safety_equipment_detected': results.get('safety_equipment', {}),
                'violations': violations,
                'processing_time': results.get('processing_time', 0),
                'timestamp': datetime.now().isoformat(),
            }, indent=2)
            summary_text = self._format_summary(results)

            if violations:
                self._log_violation(violations, 'image_upload')
            return annotated_image, violations_json, summary_text
        except Exception as e:
            # UI boundary: report the failure in the outputs instead of raising.
            error_msg = f"Error processing image: {str(e)}"
            return image, self._error_json(error_msg), f"β {error_msg}"

    # ------------------------------------------------------------------
    # Live camera monitoring
    # ------------------------------------------------------------------
    def _drain_frames(self):
        """Empty ``frame_queue`` and return its newest entry (or ``None``)."""
        latest = None
        while True:
            try:
                latest = self.frame_queue.get_nowait()
            except queue.Empty:
                return latest

    def start_camera_monitoring(self) -> Tuple[str, str]:
        """Start real-time camera monitoring.

        Returns:
            ``(camera_status, detail)`` strings for the two status boxes.
        """
        try:
            if self.monitoring_active:
                return "β οΈ Monitoring already active", "Camera monitoring is already running"
            # Without a detector the worker loop could only fail on every frame.
            if self.detector is None:
                return "β Detector not initialized", "Error: AI model not loaded"

            camera = CameraManager(source=0)
            if not camera.start_capture():
                return "β Failed to start camera", "Could not access camera. Please check permissions."

            # Drop frames left over from a previous session before publishing
            # the new camera to the worker.
            self._drain_frames()
            self.camera_manager = camera
            self.monitoring_active = True
            monitor_thread = threading.Thread(
                target=self._camera_monitoring_loop, daemon=True
            )
            monitor_thread.start()
            return "β Camera monitoring started", "Real-time safety monitoring is now active"
        except Exception as e:
            return f"β Error: {str(e)}", f"Failed to start monitoring: {str(e)}"

    def stop_camera_monitoring(self) -> Tuple[str, str]:
        """Stop real-time camera monitoring and release the camera.

        Returns:
            ``(camera_status, detail)`` strings for the two status boxes.
        """
        try:
            self.monitoring_active = False
            if self.camera_manager:
                self.camera_manager.stop_capture()
                self.camera_manager = None
            self._drain_frames()
            return "π Camera monitoring stopped", "Real-time monitoring has been stopped"
        except Exception as e:
            return f"β Error: {str(e)}", f"Failed to stop monitoring: {str(e)}"

    def _camera_monitoring_loop(self):
        """Background loop: grab frames, run detection, queue annotated results.

        Runs until ``monitoring_active`` is cleared or ``camera_manager`` is
        replaced, so a worker from an earlier session cannot keep running
        after a quick stop/start.
        """
        # Pin the manager locally: stop_camera_monitoring() may set the
        # attribute to None between the loop test and the call below.
        camera = self.camera_manager
        while (
            self.monitoring_active
            and camera is not None
            and self.camera_manager is camera
        ):
            try:
                frame_data = camera.get_latest_frame()
                if frame_data is not None:
                    frame, _timestamp = frame_data
                    results = self.detector.detect_safety_violations(frame)
                    annotated_frame = self.detector.draw_detections(frame, results)
                    pil_image = PIL.Image.fromarray(
                        cv2.cvtColor(annotated_frame, cv2.COLOR_BGR2RGB)
                    )
                    try:
                        self.frame_queue.put_nowait((pil_image, results))
                    except queue.Full:
                        # Make room by discarding the oldest frame.
                        try:
                            self.frame_queue.get_nowait()
                            self.frame_queue.put_nowait((pil_image, results))
                        except (queue.Empty, queue.Full):
                            # The UI thread raced us; dropping one frame is fine.
                            pass
                    if results.get('violations'):
                        self._log_violation(results['violations'], 'camera_monitoring')
                time.sleep(0.1)  # caps the loop at roughly 10 iterations/s
            except Exception as e:
                print(f"Error in camera monitoring: {e}")
                time.sleep(1)

    def get_camera_frame(self) -> "Tuple[PIL.Image.Image, str]":
        """Return the newest annotated camera frame and a one-line status.

        All queued frames are consumed and only the most recent one is
        returned, so the display does not lag behind by the queue depth.

        Returns:
            ``(image, status_text)``; ``image`` is ``None`` when monitoring
            is inactive, no frame is available yet, or an error occurred.
        """
        try:
            if not self.monitoring_active:
                return None, "Camera monitoring not active"

            latest = self._drain_frames()
            if latest is None:
                return None, "Waiting for camera frame..."

            pil_image, results = latest
            people_count = results.get('people_count', 0)
            violations = results.get('violations', [])
            status_parts = [
                f"π₯ People: {people_count}",
                f"β οΈ Violations: {len(violations)}",
                f"π {datetime.now().strftime('%H:%M:%S')}",
                "π΄ SAFETY VIOLATIONS DETECTED!" if violations else "β All Clear",
            ]
            return pil_image, " | ".join(status_parts)
        except Exception as e:
            return None, f"Error: {str(e)}"

    # ------------------------------------------------------------------
    # Violation log
    # ------------------------------------------------------------------
    def _log_violation(self, violations: List[Dict], source: str):
        """Append ``violations`` to ``violation_log``, keeping the newest 100.

        Args:
            violations: Violation dicts; ``type``, ``description`` and
                ``severity`` are read with defaults when missing.
            source: Label stored with each entry (e.g. ``'image_upload'``).
        """
        timestamp = datetime.now().isoformat()
        self.violation_log.extend(
            {
                'timestamp': timestamp,
                'source': source,
                'type': violation.get('type', 'unknown'),
                'description': violation.get('description', 'Unknown violation'),
                'severity': violation.get('severity', 'medium'),
            }
            for violation in violations
        )
        # Trim in place so the list object other threads hold stays the same.
        del self.violation_log[:-self.MAX_LOG_ENTRIES]

    def get_violation_log(self) -> str:
        """Return the ten most recent violations as text, newest first."""
        # Snapshot: the monitoring thread may append while we format.
        entries = list(self.violation_log)
        if not entries:
            return "No violations recorded"

        chunks = ["π Recent Safety Violations:\n\n"]
        recent_violations = entries[-self.LOG_DISPLAY_COUNT:]
        for i, violation in enumerate(reversed(recent_violations), 1):
            timestamp = datetime.fromisoformat(violation['timestamp']).strftime('%H:%M:%S')
            severity_emoji = "π΄" if violation['severity'] == 'high' else "π‘"
            chunks.append(f"{i}. [{timestamp}] {severity_emoji} {violation['description']}\n")
            chunks.append(f" Source: {violation['source']} | Type: {violation['type']}\n\n")
        if len(entries) > self.LOG_DISPLAY_COUNT:
            chunks.append(
                f"... and {len(entries) - self.LOG_DISPLAY_COUNT} more violations\n"
            )
        chunks.append(f"\nTotal violations logged: {len(entries)}")
        return "".join(chunks)

    # ------------------------------------------------------------------
    # Model information
    # ------------------------------------------------------------------
    def get_model_info(self) -> str:
        """Return a Markdown description of the loaded model.

        Accepts either a sequence of class names or an id-to-name mapping
        from ``detector.get_model_classes()``.
        """
        if self.detector is None:
            return "β Detector not initialized"
        try:
            classes = self.detector.get_model_classes()
            if isinstance(classes, dict):
                classes = classes.values()
            class_names = [str(name) for name in classes]
            device = getattr(self.detector, 'device', 'unknown')
            info_lines = [
                "π€ **SafetyMaster Pro AI Model Information**",
                f"**Device**: {device}",
                "**Model Type**: YOLOv8 PPE Detection",
                f"**Classes Detected**: {len(class_names)} total",
                "**Safety Equipment**:",
                "β’ Hard Hats / Helmets",
                "β’ Safety Vests",
                "β’ Face Masks",
                "β’ Safety Glasses",
                "β’ Gloves",
                "β’ Hearing Protection",
                "**Violations Detected**:",
                "β’ Missing Hard Hat",
                "β’ Missing Safety Vest",
                "β’ Missing Face Mask",
                "β’ Person without PPE",
                f"**Model Classes**: {', '.join(class_names[:10])}"
                f"{'...' if len(class_names) > 10 else ''}",
            ]
            return "\n".join(info_lines)
        except Exception as e:
            return f"β Error getting model info: {str(e)}"

    # ------------------------------------------------------------------
    # UI construction
    # ------------------------------------------------------------------
    @staticmethod
    def _schedule_refresh(interface, fn, outputs, seconds):
        """Run ``fn`` every ``seconds`` and write its result to ``outputs``.

        Uses ``gr.Timer`` when the installed Gradio provides it and falls back
        to ``interface.load(..., every=...)`` otherwise.
        NOTE(review): confirm against the Gradio version pinned for deployment.
        """
        timer_cls = getattr(gr, "Timer", None)
        if timer_cls is not None:
            timer_cls(value=seconds).tick(fn=fn, outputs=outputs)
        else:
            interface.load(fn=fn, outputs=outputs, every=seconds)

    def _build_image_tab(self):
        """Populate the image-upload analysis tab."""
        gr.Markdown("### Upload an image to detect safety equipment and violations")
        with gr.Row():
            with gr.Column(scale=1):
                input_image = gr.Image(type="pil", label="Upload Image", height=400)
                detect_btn = gr.Button(
                    "π Analyze Safety Compliance", variant="primary", size="lg"
                )
            with gr.Column(scale=1):
                output_image = gr.Image(label="Detection Results", height=400)
        with gr.Row():
            with gr.Column():
                summary_text = gr.Textbox(label="π Summary", lines=8, max_lines=15)
            with gr.Column():
                violations_json = gr.JSON(label="π Detailed Results", height=300)
        detect_btn.click(
            fn=self.detect_safety_violations_image,
            inputs=[input_image],
            outputs=[output_image, violations_json, summary_text],
        )

    def _build_camera_tab(self, interface):
        """Populate the live-monitoring tab and its 2-second feed refresh."""
        # NOTE(review): CameraManager(source=0) is opened by this process, not
        # by the visitor's browser — confirm that matches the footer note.
        gr.Markdown("### Real-time safety monitoring using your camera")
        with gr.Row():
            start_btn = gr.Button("βΆοΈ Start Monitoring", variant="primary")
            stop_btn = gr.Button("βΉοΈ Stop Monitoring", variant="stop")
        with gr.Row():
            camera_status = gr.Textbox(
                label="π‘ Camera Status", value="Camera not started", interactive=False
            )
            frame_status = gr.Textbox(
                label="π Live Status", value="No data", interactive=False
            )
        live_image = gr.Image(label="π΄ Live Camera Feed", height=500)

        start_btn.click(
            fn=self.start_camera_monitoring, outputs=[camera_status, frame_status]
        )
        stop_btn.click(
            fn=self.stop_camera_monitoring, outputs=[camera_status, frame_status]
        )
        self._schedule_refresh(
            interface, self.get_camera_frame, [live_image, frame_status], 2
        )

    def _build_log_tab(self, interface):
        """Populate the violation-log tab and its 10-second refresh."""
        gr.Markdown("### Recent safety violations and compliance history")
        refresh_log_btn = gr.Button("π Refresh Log", variant="secondary")
        violation_log_display = gr.Textbox(
            label="π Violation History",
            lines=20,
            max_lines=30,
            value="No violations recorded",
        )
        refresh_log_btn.click(fn=self.get_violation_log, outputs=[violation_log_display])
        self._schedule_refresh(
            interface, self.get_violation_log, [violation_log_display], 10
        )

    def _build_model_tab(self):
        """Populate the model-information tab."""
        gr.Markdown("### Information about the AI detection model")
        model_info_display = gr.Markdown(value=self.get_model_info())
        refresh_model_btn = gr.Button("π Refresh Model Info")
        refresh_model_btn.click(fn=self.get_model_info, outputs=[model_info_display])

    def create_interface(self) -> "gr.Blocks":
        """Build and return the complete ``gr.Blocks`` application."""
        css = """
        .gradio-container {
            max-width: 1200px !important;
        }
        .violation-box {
            background-color: #fee;
            border: 2px solid #f88;
            border-radius: 8px;
            padding: 10px;
        }
        .success-box {
            background-color: #efe;
            border: 2px solid #8f8;
            border-radius: 8px;
            padding: 10px;
        }
        """
        header_md = "\n".join([
            "# π‘οΈ SafetyMaster Pro - AI Safety Monitoring",
            "**Real-time PPE detection and safety compliance monitoring**",
            "Detects: Hard Hats, Safety Vests, Face Masks, Safety Glasses, and Safety Violations",
        ])
        footer_md = "\n".join([
            "---",
            "**SafetyMaster Pro** - Powered by YOLOv8 AI Detection | Built with β€οΈ for workplace safety",
            "β οΈ **Note**: For camera monitoring, please allow camera access when prompted by your browser.",
        ])

        with gr.Blocks(
            title="SafetyMaster Pro - AI Safety Monitoring",
            theme=gr.themes.Soft(),
            css=css,
        ) as interface:
            gr.Markdown(header_md)
            with gr.Tabs():
                with gr.Tab("π· Image Analysis"):
                    self._build_image_tab()
                with gr.Tab("πΉ Live Camera Monitoring"):
                    self._build_camera_tab(interface)
                with gr.Tab("π Violation Log"):
                    self._build_log_tab(interface)
                with gr.Tab("π€ AI Model Info"):
                    self._build_model_tab()
            gr.Markdown(footer_md)
        return interface
def main():
    """Build the SafetyMaster Pro UI and serve it with Gradio.

    Listens on all interfaces, on the port given by the ``PORT`` environment
    variable (default 7860). Blocks until the server stops.
    """
    print("π Starting SafetyMaster Pro - Gradio Interface")

    app = SafetyMasterGradio()
    interface = app.create_interface()

    # The UI registers periodic refresh events. Enabling the queue explicitly
    # is harmless where it is already on by default.
    # NOTE(review): older Gradio releases are understood to reject periodic
    # events without a queue — confirm against the pinned version.
    interface.queue()

    port = int(os.environ.get("PORT", 7860))
    launch_kwargs = {
        "server_name": "0.0.0.0",  # Allow external access
        "server_port": port,       # PORT env var or default
        "share": False,            # Set to True for public sharing
        "debug": False,
        "show_error": True,
        "quiet": False,
    }
    print(f"π Launching on port {port}")
    # Report the port actually used rather than a hard-coded 7860.
    print(f"π± Access the app at: http://localhost:{port}")

    interface.launch(**launch_kwargs)


if __name__ == "__main__":
    main()