File size: 5,858 Bytes
f8f1764
 
c289710
 
f8f1764
c289710
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f8f1764
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c289710
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import numpy as np
from PIL import Image, ImageDraw
import wave
import os
import json
import time
import threading
import queue

# Try to import cv2, but make it optional
try:
    import cv2
    CV2_AVAILABLE = True
except ImportError:
    CV2_AVAILABLE = False

# Try to import sound libraries
try:
    import pyaudio
    import numpy as np
    AUDIO_AVAILABLE = True
except ImportError:
    AUDIO_AVAILABLE = False

def generate_tone(frequency, duration, sample_rate=44100, volume=0.5):
    """Generate a simple tone."""
    if not AUDIO_AVAILABLE:
        return None
    
    frames = int(duration * sample_rate)
    arr = np.zeros(frames)
    for i in range(frames):
        arr[i] = volume * np.sin(2 * np.pi * frequency * i / sample_rate)
    return arr.astype(np.float32)

def play_sound(sound_type):
    """Play different alarm sounds or a custom audio file."""
    if not AUDIO_AVAILABLE:
        print(f"Alarm: {sound_type} (audio not available)")
        return
    
    p = pyaudio.PyAudio()
    
    try:
        # Check if sound_type is a path to a custom .wav file
        if isinstance(sound_type, str) and sound_type.endswith('.wav') and os.path.exists(sound_type):
            with wave.open(sound_type, 'rb') as wf:
                stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                               channels=wf.getnchannels(),
                               rate=wf.getframerate(),
                               output=True)
                
                data = wf.readframes(1024)
                while data:
                    stream.write(data)
                    data = wf.readframes(1024)

                stream.stop_stream()
                stream.close()
        else:
            # Existing tone generation logic
            sound_patterns = {
                "Beep": [(440, 0.2), (440, 0.2)],
                "Siren": [(600, 0.1), (800, 0.1), (600, 0.1), (800, 0.1)],
                "Chime": [(523, 0.3), (659, 0.3), (784, 0.5)],
                "Alert": [(1000, 0.1), (1500, 0.1), (2000, 0.1)],
                "Buzzer": [(200, 0.5)],
                "Ring": [(800, 0.2), (600, 0.2), (800, 0.2), (600, 0.2)]
            }
            
            stream = p.open(format=pyaudio.paFloat32,
                           channels=1,
                           rate=44100,
                           output=True)
            
            if sound_type in sound_patterns:
                for freq, duration in sound_patterns[sound_type]:
                    tone = generate_tone(freq, duration)
                    if tone is not None:
                        stream.write(tone.tobytes())
            
            stream.stop_stream()
            stream.close()

    except Exception as e:
        print(f"Error playing sound: {e}")
    finally:
        p.terminate()

class AlarmSystem:
    """Manages alarm functionality."""
    def __init__(self):
        self.alarm_queue = queue.Queue()
        self.alarm_thread = threading.Thread(target=self._alarm_worker, daemon=True)
        self.alarm_thread.start()
        self.last_alarm_time = 0
        self.alarm_cooldown = 2  # seconds between alarms
    
    def _alarm_worker(self):
        """Worker thread for playing alarms."""
        while True:
            try:
                sound_type = self.alarm_queue.get(timeout=1)
                if sound_type:
                    play_sound(sound_type)
                self.alarm_queue.task_done()
            except queue.Empty:
                continue
            except Exception as e:
                print(f"Alarm worker error: {e}")
    
    def trigger_alarm(self, sound_type):
        """Trigger an alarm with cooldown."""
        current_time = time.time()
        if current_time - self.last_alarm_time > self.alarm_cooldown:
            self.alarm_queue.put(sound_type)
            self.last_alarm_time = current_time
            return True
        return False

def draw_detections(image, face_results, object_results, show_labels, box_color):
    """Draw detection boxes on image using PIL."""
    try:
        pil_image = Image.fromarray(image)
        draw = ImageDraw.Draw(pil_image)
        
        # Convert color name to RGB
        color_map = {
            "red": (255, 0, 0),
            "green": (0, 255, 0),
            "blue": (0, 0, 255),
            "yellow": (255, 255, 0),
            "purple": (128, 0, 128),
            "orange": (255, 165, 0)
        }
        color = color_map.get(box_color, (255, 0, 0))
        
        # Draw face boxes
        for face in face_results:
            x, y, w, h = face["bbox"]
            draw.rectangle([x, y, x + w, y + h], outline=color, width=3)
            if show_labels:
                label = f"Face {face.get('confidence', '')}"
                draw.text((x, y - 20), label, fill=color)
        
        # Draw object boxes
        for obj in object_results:
            x, y, w, h = obj["bbox"]
            draw.rectangle([x, y, x + w, y + h], outline=color, width=3)
            if show_labels:
                label = f"{obj['label']} {obj.get('confidence', '')}"
                draw.text((x, y - 20), label, fill=color)
        
        return np.array(pil_image)
    except Exception as e:
        print(f"Error drawing detections: {e}")
        return image

def process_image(image, face_cascade, object_net, object_classes, enable_face, enable_objects, face_conf, object_conf):
    """Process image and detect faces and objects."""
    from models import detect_faces, detect_objects
    
    face_results = []
    object_results = []
    
    if enable_face:
        face_results = detect_faces(image, face_cascade, face_conf)
    
    if enable_objects:
        object_results = detect_objects(image, object_net, object_classes, object_conf)
    
    return image.copy(), face_results, object_results