"""Alarm sound playback and detection-drawing helpers."""

import contextlib
import json
import os
import queue
import threading
import time
import wave

import numpy as np

# Pillow is only needed by draw_detections; treat it as optional in the same
# way as cv2 and pyaudio below so the module still imports without it.
try:
    from PIL import Image, ImageDraw
    PIL_AVAILABLE = True
except ImportError:
    PIL_AVAILABLE = False

# Try to import cv2, but make it optional
try:
    import cv2
    CV2_AVAILABLE = True
except ImportError:
    CV2_AVAILABLE = False

# Try to import sound libraries
try:
    import pyaudio
    AUDIO_AVAILABLE = True
except ImportError:
    AUDIO_AVAILABLE = False


# Number of frames read from a .wav file per write to the audio stream.
_WAV_CHUNK_FRAMES = 1024

# Sample rate (Hz) used for generated tones; matches generate_tone's default.
_TONE_SAMPLE_RATE = 44100

# Built-in alarm sounds: name -> sequence of (frequency in Hz, duration in s).
_SOUND_PATTERNS = {
    "Beep": [(440, 0.2), (440, 0.2)],
    "Siren": [(600, 0.1), (800, 0.1), (600, 0.1), (800, 0.1)],
    "Chime": [(523, 0.3), (659, 0.3), (784, 0.5)],
    "Alert": [(1000, 0.1), (1500, 0.1), (2000, 0.1)],
    "Buzzer": [(200, 0.5)],
    "Ring": [(800, 0.2), (600, 0.2), (800, 0.2), (600, 0.2)],
}

# Supported box colour names -> RGB tuples; unknown names fall back to red.
_BOX_COLORS = {
    "red": (255, 0, 0),
    "green": (0, 255, 0),
    "blue": (0, 0, 255),
    "yellow": (255, 255, 0),
    "purple": (128, 0, 128),
    "orange": (255, 165, 0),
}
_DEFAULT_BOX_COLOR = (255, 0, 0)

# Labels are drawn this many pixels above the top edge of their box.
_LABEL_OFFSET = 20


def generate_tone(frequency, duration, sample_rate=44100, volume=0.5):
    """Generate a simple sine tone.

    Args:
        frequency: Tone frequency in Hz.
        duration: Tone length in seconds.
        sample_rate: Samples per second.
        volume: Peak amplitude of the sine wave.

    Returns:
        A float32 NumPy array of ``int(duration * sample_rate)`` samples, or
        ``None`` when audio support (pyaudio) is not available.
    """
    if not AUDIO_AVAILABLE:
        return None

    frames = int(duration * sample_rate)
    # Vectorized form of sample[i] = volume * sin(2*pi*f*i / sample_rate).
    sample_index = np.arange(frames)
    samples = volume * np.sin(2 * np.pi * frequency * sample_index / sample_rate)
    return samples.astype(np.float32)


@contextlib.contextmanager
def _open_output_stream(audio, **stream_kwargs):
    """Open a pyaudio output stream and always stop and close it on exit."""
    stream = audio.open(output=True, **stream_kwargs)
    try:
        yield stream
    finally:
        try:
            stream.stop_stream()
        finally:
            stream.close()


def _is_wav_file(sound_type):
    """Return True if sound_type is a path to an existing .wav file."""
    return (
        isinstance(sound_type, str)
        and sound_type.lower().endswith('.wav')
        and os.path.isfile(sound_type)
    )


def _play_wav_file(audio, path):
    """Stream the .wav file at path to an output stream in fixed-size chunks."""
    with wave.open(path, 'rb') as wf:
        with _open_output_stream(
            audio,
            format=audio.get_format_from_width(wf.getsampwidth()),
            channels=wf.getnchannels(),
            rate=wf.getframerate(),
        ) as stream:
            # readframes returns b"" once the file is exhausted.
            for data in iter(lambda: wf.readframes(_WAV_CHUNK_FRAMES), b""):
                stream.write(data)


def _play_tone_pattern(audio, pattern):
    """Play a sequence of (frequency, duration) tones back to back."""
    with _open_output_stream(
        audio,
        format=pyaudio.paFloat32,
        channels=1,
        rate=_TONE_SAMPLE_RATE,
    ) as stream:
        for freq, duration in pattern:
            tone = generate_tone(freq, duration)
            if tone is not None:
                stream.write(tone.tobytes())


def play_sound(sound_type):
    """Play a built-in alarm sound or a custom .wav file.

    Args:
        sound_type: Either the name of a built-in pattern ("Beep", "Siren",
            "Chime", "Alert", "Buzzer", "Ring") or a path to an existing
            .wav file.

    Playback errors are reported with ``print`` and never raised. When audio
    support is unavailable, only a message is printed.
    """
    if not AUDIO_AVAILABLE:
        print(f"Alarm: {sound_type} (audio not available)")
        return

    p = pyaudio.PyAudio()
    try:
        if _is_wav_file(sound_type):
            _play_wav_file(p, sound_type)
        elif sound_type in _SOUND_PATTERNS:
            _play_tone_pattern(p, _SOUND_PATTERNS[sound_type])
        else:
            # Neither an existing .wav file nor a known pattern: nothing to
            # play, so do not open an audio stream at all.
            print(f"Alarm: unknown sound type {sound_type!r}")
    except Exception as e:
        print(f"Error playing sound: {e}")
    finally:
        p.terminate()


class AlarmSystem:
    """Manages alarm functionality.

    Alarms are queued and played by a daemon worker thread; a cooldown
    limits how often a new alarm may be queued.
    """

    def __init__(self):
        self.alarm_queue = queue.Queue()
        self.last_alarm_time = 0
        self.alarm_cooldown = 2  # seconds between alarms
        # Start the worker last so the instance is fully initialised first.
        self.alarm_thread = threading.Thread(target=self._alarm_worker, daemon=True)
        self.alarm_thread.start()

    def _alarm_worker(self):
        """Worker thread for playing alarms."""
        while True:
            try:
                sound_type = self.alarm_queue.get(timeout=1)
            except queue.Empty:
                continue

            try:
                if sound_type:
                    play_sound(sound_type)
            except Exception as e:
                print(f"Alarm worker error: {e}")
            finally:
                # Always balance the successful get(), even if playback
                # failed, so that alarm_queue.join() cannot hang.
                self.alarm_queue.task_done()

    def trigger_alarm(self, sound_type):
        """Trigger an alarm with cooldown.

        Args:
            sound_type: Sound name or .wav path passed on to play_sound.

        Returns:
            True if the alarm was queued, False if the previous alarm was
            queued less than ``alarm_cooldown`` seconds ago.
        """
        current_time = time.time()
        if current_time - self.last_alarm_time <= self.alarm_cooldown:
            return False

        self.alarm_queue.put(sound_type)
        self.last_alarm_time = current_time
        return True


def _draw_box(draw, bbox, color, label=None):
    """Draw one (x, y, w, h) box outline and, if given, its label above it."""
    x, y, w, h = bbox
    draw.rectangle([x, y, x + w, y + h], outline=color, width=3)
    if label is not None:
        # Clamp so labels of boxes near the top edge stay inside the image.
        draw.text((x, max(0, y - _LABEL_OFFSET)), label, fill=color)


def draw_detections(image, face_results, object_results, show_labels, box_color):
    """Draw detection boxes on image using PIL.

    Args:
        image: Array accepted by ``PIL.Image.fromarray``.
        face_results: Iterable of dicts with a "bbox" entry (x, y, w, h) and
            an optional "confidence" entry; ``None`` is treated as empty.
        object_results: Iterable of dicts with "bbox", "label" and an
            optional "confidence" entry; ``None`` is treated as empty.
        show_labels: Whether to draw a text label above each box.
        box_color: One of "red", "green", "blue", "yellow", "purple",
            "orange"; any other value falls back to red.

    Returns:
        A new NumPy array with the boxes drawn, or the input ``image``
        unchanged if drawing fails or Pillow is not installed.
    """
    if not PIL_AVAILABLE:
        print("Error drawing detections: Pillow is not installed")
        return image

    try:
        pil_image = Image.fromarray(image)
        draw = ImageDraw.Draw(pil_image)
        color = _BOX_COLORS.get(box_color, _DEFAULT_BOX_COLOR)

        # Draw face boxes
        for face in (face_results if face_results is not None else ()):
            label = f"Face {face.get('confidence', '')}" if show_labels else None
            _draw_box(draw, face["bbox"], color, label)

        # Draw object boxes
        for obj in (object_results if object_results is not None else ()):
            # Only read obj['label'] when labels are requested.
            label = (
                f"{obj['label']} {obj.get('confidence', '')}"
                if show_labels
                else None
            )
            _draw_box(draw, obj["bbox"], color, label)

        return np.array(pil_image)
    except Exception as e:
        # Best effort: a drawing failure must not lose the frame.
        print(f"Error drawing detections: {e}")
        return image


def process_image(image, face_cascade, object_net, object_classes,
                  enable_face, enable_objects, face_conf, object_conf):
    """Process image and detect faces and objects.

    Args:
        image: Input image; must provide ``copy()``.
        face_cascade: Passed through to ``models.detect_faces``.
        object_net: Passed through to ``models.detect_objects``.
        object_classes: Passed through to ``models.detect_objects``.
        enable_face: Run face detection when truthy.
        enable_objects: Run object detection when truthy.
        face_conf: Passed through to ``models.detect_faces``.
        object_conf: Passed through to ``models.detect_objects``.

    Returns:
        A tuple ``(image_copy, face_results, object_results)``; a result
        list is empty when its detector is disabled.
    """
    # NOTE(review): imported at call time rather than module level,
    # presumably to avoid a circular or heavy import - confirm.
    from models import detect_faces, detect_objects

    face_results = []
    object_results = []

    if enable_face:
        face_results = detect_faces(image, face_cascade, face_conf)

    if enable_objects:
        object_results = detect_objects(image, object_net, object_classes, object_conf)

    return image.copy(), face_results, object_results