Spaces:
Sleeping
Sleeping
| import numpy as np | |
| from PIL import Image, ImageDraw | |
| import wave | |
| import os | |
| import json | |
| import time | |
| import threading | |
| import queue | |
| # Try to import cv2, but make it optional | |
| try: | |
| import cv2 | |
| CV2_AVAILABLE = True | |
| except ImportError: | |
| CV2_AVAILABLE = False | |
| # Try to import sound libraries | |
| try: | |
| import pyaudio | |
| import numpy as np | |
| AUDIO_AVAILABLE = True | |
| except ImportError: | |
| AUDIO_AVAILABLE = False | |
def generate_tone(frequency, duration, sample_rate=44100, volume=0.5):
    """Generate a mono sine-wave tone as float32 samples.

    Args:
        frequency: Tone frequency in Hz.
        duration: Tone length in seconds.
        sample_rate: Number of samples generated per second of tone.
        volume: Amplitude multiplier applied to the sine wave.

    Returns:
        A 1-D ``numpy.float32`` array holding ``int(duration * sample_rate)``
        samples (empty when that count is not positive), or ``None`` when
        audio support is unavailable.
    """
    if not AUDIO_AVAILABLE:
        return None
    frames = int(duration * sample_rate)
    # Evaluate every sample in one vectorised expression instead of a
    # Python-level loop over tens of thousands of sample indices.
    sample_indices = np.arange(max(frames, 0))
    samples = volume * np.sin(2 * np.pi * frequency * sample_indices / sample_rate)
    return samples.astype(np.float32)
def play_sound(sound_type):
    """Play a built-in alarm pattern or a custom ``.wav`` file.

    Args:
        sound_type: Either the name of a built-in pattern ("Beep", "Siren",
            "Chime", "Alert", "Buzzer", "Ring") or a path to an existing
            file whose name ends in ``.wav``. Any other value plays nothing.

    Returns:
        None. When audio support is unavailable a message is printed
        instead of playing sound. Errors raised during playback are caught
        and printed rather than propagated.
    """
    if not AUDIO_AVAILABLE:
        print(f"Alarm: {sound_type} (audio not available)")
        return

    # Built-in alarms, each a sequence of (frequency_hz, duration_s) tones.
    sound_patterns = {
        "Beep": [(440, 0.2), (440, 0.2)],
        "Siren": [(600, 0.1), (800, 0.1), (600, 0.1), (800, 0.1)],
        "Chime": [(523, 0.3), (659, 0.3), (784, 0.5)],
        "Alert": [(1000, 0.1), (1500, 0.1), (2000, 0.1)],
        "Buzzer": [(200, 0.5)],
        "Ring": [(800, 0.2), (600, 0.2), (800, 0.2), (600, 0.2)]
    }

    p = pyaudio.PyAudio()
    stream = None
    try:
        # A path to an existing .wav file takes precedence over pattern names.
        if isinstance(sound_type, str) and sound_type.endswith('.wav') and os.path.exists(sound_type):
            with wave.open(sound_type, 'rb') as wf:
                stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                                channels=wf.getnchannels(),
                                rate=wf.getframerate(),
                                output=True)
                # readframes() returns b'' once the file is exhausted.
                for chunk in iter(lambda: wf.readframes(1024), b''):
                    stream.write(chunk)
        else:
            pattern = sound_patterns.get(sound_type)
            if pattern is not None:
                # Only open the audio device when there is something to play.
                stream = p.open(format=pyaudio.paFloat32,
                                channels=1,
                                rate=44100,
                                output=True)
                for freq, duration in pattern:
                    tone = generate_tone(freq, duration)
                    if tone is not None:
                        stream.write(tone.tobytes())
    except Exception as e:
        print(f"Error playing sound: {e}")
    finally:
        # Release the stream even when playback failed part-way through,
        # then always shut down the PyAudio instance.
        if stream is not None:
            try:
                stream.stop_stream()
                stream.close()
            except Exception as e:
                print(f"Error playing sound: {e}")
        p.terminate()
class AlarmSystem:
    """Queue alarm sounds and play them on a background daemon thread.

    Alarms are rate-limited: ``trigger_alarm`` only enqueues a sound when
    more than ``alarm_cooldown`` seconds have passed since the last
    accepted alarm.
    """

    def __init__(self):
        """Initialise the alarm state and start the worker thread."""
        self.alarm_queue = queue.Queue()
        self.last_alarm_time = 0
        self.alarm_cooldown = 2  # seconds between alarms
        # Start the worker last so it never runs against a half-built instance.
        self.alarm_thread = threading.Thread(target=self._alarm_worker, daemon=True)
        self.alarm_thread.start()

    def _alarm_worker(self):
        """Play queued alarms forever; intended to run on the daemon thread."""
        while True:
            try:
                # The timeout keeps the loop from blocking indefinitely.
                sound_type = self.alarm_queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                if sound_type:
                    play_sound(sound_type)
            except Exception as e:
                print(f"Alarm worker error: {e}")
            finally:
                # Every successful get() must be balanced by task_done(),
                # even when playback fails, or queue.join() would never return.
                self.alarm_queue.task_done()

    def trigger_alarm(self, sound_type):
        """Enqueue an alarm unless the cooldown is still active.

        Args:
            sound_type: Value handed to ``play_sound`` by the worker thread.

        Returns:
            True if the alarm was queued, False if it was suppressed because
            the cooldown period has not yet elapsed.
        """
        current_time = time.time()
        if current_time - self.last_alarm_time > self.alarm_cooldown:
            self.alarm_queue.put(sound_type)
            self.last_alarm_time = current_time
            return True
        return False
def _draw_labeled_box(draw, bbox, label, color):
    """Draw one (x, y, w, h) box outline and, if given, its label above it."""
    x, y, w, h = bbox
    draw.rectangle([x, y, x + w, y + h], outline=color, width=3)
    if label is not None:
        # Clamp to the top edge so labels of boxes near the top of the
        # image are not drawn off-canvas.
        draw.text((x, max(y - 20, 0)), label, fill=color)


def draw_detections(image, face_results, object_results, show_labels, box_color):
    """Draw detection boxes on image using PIL.

    Args:
        image: Array-like image accepted by ``PIL.Image.fromarray``.
        face_results: Iterable of dicts with a ``"bbox"`` entry of
            ``(x, y, w, h)`` and an optional ``"confidence"`` entry.
        object_results: Iterable of dicts with ``"bbox"`` and ``"label"``
            entries and an optional ``"confidence"`` entry.
        show_labels: When truthy, a text label is drawn above each box.
        box_color: Colour name ("red", "green", "blue", "yellow", "purple",
            "orange"); any other value falls back to red.

    Returns:
        A new NumPy array with the boxes drawn. If anything fails, the
        error is printed and the original ``image`` is returned unchanged.
    """
    try:
        pil_image = Image.fromarray(image)
        draw = ImageDraw.Draw(pil_image)
        # Convert color name to RGB
        color_map = {
            "red": (255, 0, 0),
            "green": (0, 255, 0),
            "blue": (0, 0, 255),
            "yellow": (255, 255, 0),
            "purple": (128, 0, 128),
            "orange": (255, 165, 0)
        }
        color = color_map.get(box_color, (255, 0, 0))
        # Labels are only built when requested, so results lacking label
        # fields still get their boxes drawn when labels are off.
        for face in face_results:
            label = f"Face {face.get('confidence', '')}" if show_labels else None
            _draw_labeled_box(draw, face["bbox"], label, color)
        for obj in object_results:
            label = f"{obj['label']} {obj.get('confidence', '')}" if show_labels else None
            _draw_labeled_box(draw, obj["bbox"], label, color)
        return np.array(pil_image)
    except Exception as e:
        print(f"Error drawing detections: {e}")
        return image
def process_image(image, face_cascade, object_net, object_classes, enable_face, enable_objects, face_conf, object_conf):
    """Run the enabled detectors on an image.

    ``detect_faces`` and ``detect_objects`` are imported from ``models``
    at call time. Each detector is invoked only when its enable flag is
    truthy; a disabled detector contributes an empty list.

    Returns:
        A tuple ``(image_copy, face_results, object_results)`` where
        ``image_copy`` is ``image.copy()``.
    """
    from models import detect_faces, detect_objects

    faces = detect_faces(image, face_cascade, face_conf) if enable_face else []
    objects = (
        detect_objects(image, object_net, object_classes, object_conf)
        if enable_objects
        else []
    )
    return image.copy(), faces, objects