anycoder-5932b618 / utils.py
kamcio1989's picture
Upload 8 files
c289710 verified
import numpy as np
from PIL import Image, ImageDraw
import wave
import os
import json
import time
import threading
import queue
# Try to import cv2, but make it optional
try:
import cv2
CV2_AVAILABLE = True
except ImportError:
CV2_AVAILABLE = False
# Try to import sound libraries
try:
import pyaudio
import numpy as np
AUDIO_AVAILABLE = True
except ImportError:
AUDIO_AVAILABLE = False
def generate_tone(frequency, duration, sample_rate=44100, volume=0.5):
    """Generate a mono sine-wave tone as float32 samples.

    Args:
        frequency: Tone frequency in Hz.
        duration: Tone length in seconds. The sample count is
            ``int(duration * sample_rate)``, i.e. truncated toward zero.
        sample_rate: Number of samples per second.
        volume: Amplitude multiplier applied to the unit sine wave.

    Returns:
        A 1-D ``numpy.float32`` array with one sample per frame, or
        ``None`` when ``AUDIO_AVAILABLE`` is false.

    Raises:
        ValueError: If the computed sample count is negative.
    """
    if not AUDIO_AVAILABLE:
        return None

    frames = int(duration * sample_rate)
    if frames < 0:
        # np.arange would silently return an empty array here; keep the
        # ValueError that allocating a negative-length buffer used to raise.
        raise ValueError("duration must be non-negative")

    # Evaluate the sine for every sample index in one vectorized expression
    # instead of filling the array element by element in a Python loop.
    sample_index = np.arange(frames)
    samples = volume * np.sin(2 * np.pi * frequency * sample_index / sample_rate)
    return samples.astype(np.float32)
def play_sound(sound_type):
    """Play a named alarm pattern or a custom ``.wav`` file.

    Args:
        sound_type: Either the name of a built-in pattern ("Beep", "Siren",
            "Chime", "Alert", "Buzzer", "Ring") or a path to an existing
            file whose name ends in ``.wav`` (the suffix check is
            case-sensitive). Any other value produces no sound.

    Returns:
        None. When ``AUDIO_AVAILABLE`` is false a message is printed
        instead of playing anything. Errors raised during playback are
        caught and printed rather than propagated.
    """
    if not AUDIO_AVAILABLE:
        print(f"Alarm: {sound_type} (audio not available)")
        return

    tone_rate = 44100
    # Each pattern is a sequence of (frequency in Hz, duration in seconds).
    sound_patterns = {
        "Beep": [(440, 0.2), (440, 0.2)],
        "Siren": [(600, 0.1), (800, 0.1), (600, 0.1), (800, 0.1)],
        "Chime": [(523, 0.3), (659, 0.3), (784, 0.5)],
        "Alert": [(1000, 0.1), (1500, 0.1), (2000, 0.1)],
        "Buzzer": [(200, 0.5)],
        "Ring": [(800, 0.2), (600, 0.2), (800, 0.2), (600, 0.2)]
    }

    p = pyaudio.PyAudio()
    stream = None
    try:
        # Check if sound_type is a path to a custom .wav file
        if isinstance(sound_type, str) and sound_type.endswith('.wav') and os.path.exists(sound_type):
            with wave.open(sound_type, 'rb') as wf:
                stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                                channels=wf.getnchannels(),
                                rate=wf.getframerate(),
                                output=True)
                data = wf.readframes(1024)
                while data:
                    stream.write(data)
                    data = wf.readframes(1024)
        elif sound_type in sound_patterns:
            # Only open an output stream when there is something to play.
            stream = p.open(format=pyaudio.paFloat32,
                            channels=1,
                            rate=tone_rate,
                            output=True)
            for freq, duration in sound_patterns[sound_type]:
                tone = generate_tone(freq, duration, sample_rate=tone_rate)
                if tone is not None:
                    stream.write(tone.tobytes())
    except Exception as e:
        print(f"Error playing sound: {e}")
    finally:
        # Release the stream even when playback failed part-way through,
        # and always terminate the PyAudio instance afterwards.
        try:
            if stream is not None:
                stream.stop_stream()
                stream.close()
        except Exception as e:
            print(f"Error playing sound: {e}")
        finally:
            p.terminate()
class AlarmSystem:
    """Queue alarm sounds and play them on a background daemon thread.

    Attributes are plain instance fields:
      - alarm_queue: ``queue.Queue`` of pending sound types.
      - alarm_thread: daemon thread running ``_alarm_worker``.
      - last_alarm_time: ``time.time()`` value of the last accepted alarm
        (0 until the first one).
      - alarm_cooldown: minimum number of seconds between accepted alarms.
    """

    def __init__(self):
        """Set up the queue and cooldown state, then start the worker."""
        self.alarm_queue = queue.Queue()
        self.last_alarm_time = 0
        self.alarm_cooldown = 2  # seconds between alarms
        # Start the worker last so every attribute exists before the
        # thread begins running.
        self.alarm_thread = threading.Thread(target=self._alarm_worker, daemon=True)
        self.alarm_thread.start()

    def _alarm_worker(self):
        """Worker loop: take queued sound types and play them, forever."""
        while True:
            try:
                sound_type = self.alarm_queue.get(timeout=1)
            except queue.Empty:
                continue
            try:
                # Falsy entries (e.g. None or "") are consumed but not played.
                if sound_type:
                    play_sound(sound_type)
            except Exception as e:
                print(f"Alarm worker error: {e}")
            finally:
                # Mark the item done even if playback raised, so the
                # queue's unfinished-task count stays balanced.
                self.alarm_queue.task_done()

    def trigger_alarm(self, sound_type):
        """Queue *sound_type* unless the cooldown has not yet elapsed.

        Args:
            sound_type: Value handed to the worker for playback.

        Returns:
            True if the alarm was queued, False if it was suppressed because
            no more than ``alarm_cooldown`` seconds have passed since the
            last accepted alarm.
        """
        current_time = time.time()
        if current_time - self.last_alarm_time <= self.alarm_cooldown:
            return False
        self.alarm_queue.put(sound_type)
        self.last_alarm_time = current_time
        return True
def draw_detections(image, face_results, object_results, show_labels, box_color):
    """Draw detection boxes (and optional labels) on an image using PIL.

    Args:
        image: Array accepted by ``PIL.Image.fromarray``.
        face_results: Iterable of dicts with a ``"bbox"`` entry unpacking to
            ``(x, y, w, h)`` and an optional ``"confidence"`` entry.
        object_results: Iterable of dicts with ``"bbox"``, ``"label"`` and
            an optional ``"confidence"`` entry.
        show_labels: When truthy, a text label is drawn for each box.
        box_color: Color name ("red", "green", "blue", "yellow", "purple",
            "orange"), matched case-insensitively. Unknown names fall back
            to red.

    Returns:
        A new NumPy array with the boxes drawn. If anything fails the error
        is printed and the original ``image`` is returned unchanged.
    """
    try:
        pil_image = Image.fromarray(image)
        draw = ImageDraw.Draw(pil_image)

        # Convert color name to RGB
        color_map = {
            "red": (255, 0, 0),
            "green": (0, 255, 0),
            "blue": (0, 0, 255),
            "yellow": (255, 255, 0),
            "purple": (128, 0, 128),
            "orange": (255, 165, 0)
        }
        # Accept "Red", " red " and similar spellings of the same name.
        color_key = box_color.strip().lower() if isinstance(box_color, str) else box_color
        color = color_map.get(color_key, (255, 0, 0))

        # Draw face boxes
        for face in face_results:
            label = f"Face {face.get('confidence', '')}" if show_labels else None
            _draw_labeled_box(draw, face["bbox"], label, color)

        # Draw object boxes
        for obj in object_results:
            label = f"{obj['label']} {obj.get('confidence', '')}" if show_labels else None
            _draw_labeled_box(draw, obj["bbox"], label, color)

        return np.array(pil_image)
    except Exception as e:
        print(f"Error drawing detections: {e}")
        return image


def _draw_labeled_box(draw, bbox, label, color):
    """Outline one ``(x, y, w, h)`` box and caption it when *label* is set."""
    x, y, w, h = bbox
    draw.rectangle([x, y, x + w, y + h], outline=color, width=3)
    if label is not None:
        # The caption normally sits 20px above the box; clamp it so boxes
        # touching the top edge still get a visible label.
        draw.text((x, max(y - 20, 0)), label, fill=color)
def process_image(image, face_cascade, object_net, object_classes, enable_face, enable_objects, face_conf, object_conf):
    """Run the enabled detectors on *image* and collect their results.

    Face detection is delegated to ``models.detect_faces`` and object
    detection to ``models.detect_objects``; each runs only when its
    ``enable_*`` flag is truthy. A disabled detector contributes an empty
    list.

    Returns:
        A tuple ``(image.copy(), face_results, object_results)``.
    """
    # Imported at call time, as in the rest of this function's history;
    # presumably to avoid a circular import with models - TODO confirm.
    from models import detect_faces, detect_objects

    faces = detect_faces(image, face_cascade, face_conf) if enable_face else []
    objects = (
        detect_objects(image, object_net, object_classes, object_conf)
        if enable_objects
        else []
    )
    return image.copy(), faces, objects