emergency_any_test / run_live.py
dusen0528's picture
Upload folder using huggingface_hub
77c8117 verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
==============================================================================
์‹ค์‹œ๊ฐ„ ํ˜ธ์ถœ์–ด ๊ฐ์ง€ ์Šคํฌ๋ฆฝํŠธ
==============================================================================
๋งˆ์ดํฌ ์ž…๋ ฅ์„ ๋ฐ›์•„ ํ•™์Šต๋œ ์ปค์Šคํ…€ ๋ชจ๋ธ๋กœ ์‹ค์‹œ๊ฐ„ ํ˜ธ์ถœ์–ด ๊ฐ์ง€๋ฅผ ์ˆ˜ํ–‰ํ•ฉ๋‹ˆ๋‹ค.
์‚ฌ์šฉ๋ฒ•:
python run_live.py --model ./my_model.onnx --threshold 0.5
"""
import os
import sys
import time
import argparse
from pathlib import Path
from datetime import datetime
from typing import Optional
import numpy as np
# openWakeWord ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ ๊ฒฝ๋กœ ์ถ”๊ฐ€
sys.path.insert(0, str(Path(__file__).parent.parent / "openWakeWord"))
try:
from openwakeword.model import Model
print("โœ… openwakeword.Model ์ž„ํฌํŠธ ์„ฑ๊ณต")
except ImportError as e:
print(f"โŒ openwakeword ์ž„ํฌํŠธ ์‹คํŒจ: {e}")
print(" -> 'pip install openwakeword' ์‹คํ–‰ ํ•„์š”")
sys.exit(1)
try:
import sounddevice as sd
print("โœ… sounddevice ์ž„ํฌํŠธ ์„ฑ๊ณต")
except ImportError as e:
print(f"โŒ sounddevice ์ž„ํฌํŠธ ์‹คํŒจ: {e}")
print(" -> 'pip install sounddevice' ์‹คํ–‰ ํ•„์š”")
sys.exit(1)
# ============================================
# ์„ค์ •
# ============================================
DEFAULT_MODEL_PATH = "./my_model.onnx"
SAMPLE_RATE = 16000 # openWakeWord ์š”๊ตฌ์‚ฌํ•ญ: 16kHz
CHUNK_SIZE = 1280 # 80ms ํ”„๋ ˆ์ž„ (16000 * 0.08)
DEFAULT_THRESHOLD = 0.5 # ๊ฐ์ง€ ์ž„๊ณ„๊ฐ’
def print_banner():
"""์‹œ์ž‘ ๋ฐฐ๋„ˆ ์ถœ๋ ฅ"""
print("\n" + "="*60)
print("๐ŸŽค ํ•œ๊ตญ์–ด ํ˜ธ์ถœ์–ด ์‹ค์‹œ๊ฐ„ ๊ฐ์ง€ ์‹œ์Šคํ…œ")
print("="*60)
print("๋ชจ๋ธ์„ ๋กœ๋“œํ•˜๊ณ  ๋งˆ์ดํฌ ์ž…๋ ฅ์„ ์‹œ์ž‘ํ•ฉ๋‹ˆ๋‹ค...")
print("์ข…๋ฃŒํ•˜๋ ค๋ฉด Ctrl+C๋ฅผ ๋ˆ„๋ฅด์„ธ์š”.\n")
def check_microphone_permission() -> bool:
"""
๋งˆ์ดํฌ ๊ถŒํ•œ ํ™•์ธ
Returns:
๋งˆ์ดํฌ ์‚ฌ์šฉ ๊ฐ€๋Šฅ ์—ฌ๋ถ€
"""
print("๐Ÿ” ๋งˆ์ดํฌ ๊ถŒํ•œ ํ™•์ธ ์ค‘...")
try:
# ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•œ ์˜ค๋””์˜ค ๋””๋ฐ”์ด์Šค ๋ชฉ๋ก ํ™•์ธ
devices = sd.query_devices()
# ์ž…๋ ฅ ๋””๋ฐ”์ด์Šค ์ฐพ๊ธฐ
input_devices = [d for d in devices if d['max_input_channels'] > 0]
if not input_devices:
print("โŒ ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•œ ์ž…๋ ฅ ๋””๋ฐ”์ด์Šค(๋งˆ์ดํฌ)๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค!")
print(" ์‹œ์Šคํ…œ์˜ ์˜ค๋””์˜ค ์„ค์ •์„ ํ™•์ธํ•ด์ฃผ์„ธ์š”.")
return False
# ๊ธฐ๋ณธ ์ž…๋ ฅ ๋””๋ฐ”์ด์Šค ์ •๋ณด ์ถœ๋ ฅ
default_input = sd.query_devices(kind='input')
print(f"โœ… ๊ธฐ๋ณธ ๋งˆ์ดํฌ: {default_input['name']}")
print(f" ์ตœ๋Œ€ ์ž…๋ ฅ ์ฑ„๋„: {default_input['max_input_channels']}")
print(f" ๊ธฐ๋ณธ ์ƒ˜ํ”Œ ๋ ˆ์ดํŠธ: {default_input['default_samplerate']}")
# ์งง์€ ํ…Œ์ŠคํŠธ ๋…น์Œ
print(" ๋งˆ์ดํฌ ํ…Œ์ŠคํŠธ ๋…น์Œ ์ค‘... ", end="", flush=True)
test_recording = sd.rec(
int(0.1 * SAMPLE_RATE), # 100ms
samplerate=SAMPLE_RATE,
channels=1,
dtype='int16'
)
sd.wait()
print("โœ… ์„ฑ๊ณต!")
return True
except Exception as e:
print(f"โŒ ๋งˆ์ดํฌ ๊ถŒํ•œ ํ™•์ธ ์‹คํŒจ: {e}")
print("\n๐Ÿ’ก ํ•ด๊ฒฐ ๋ฐฉ๋ฒ•:")
print(" 1. ๋งˆ์ดํฌ๊ฐ€ ์—ฐ๊ฒฐ๋˜์–ด ์žˆ๋Š”์ง€ ํ™•์ธ")
print(" 2. ์‹œ์Šคํ…œ ์„ค์ •์—์„œ ๋งˆ์ดํฌ ๊ถŒํ•œ ํ—ˆ์šฉ")
print(" 3. Linux: 'sudo usermod -a -G audio $USER' ์‹คํ–‰ ํ›„ ์žฌ๋กœ๊ทธ์ธ")
return False
def play_beep():
"""
๊ฐ์ง€ ์‹œ ๋น„ํ”„์Œ ์žฌ์ƒ
๊ฐ„๋‹จํ•œ ์‚ฌ์ธํŒŒ ๋น„ํ”„์Œ์„ ์ƒ์„ฑํ•˜์—ฌ ์žฌ์ƒํ•ฉ๋‹ˆ๋‹ค.
"""
try:
duration = 0.15 # 150ms
frequency = 880 # Hz (A5 ์Œ๊ณ„)
t = np.linspace(0, duration, int(SAMPLE_RATE * duration), False)
beep = np.sin(frequency * 2 * np.pi * t) * 0.3
# Fade in/out ์ ์šฉ
fade_samples = int(0.01 * SAMPLE_RATE)
beep[:fade_samples] *= np.linspace(0, 1, fade_samples)
beep[-fade_samples:] *= np.linspace(1, 0, fade_samples)
sd.play(beep.astype(np.float32), SAMPLE_RATE)
except Exception as e:
# ๋น„ํ”„์Œ ์‹คํŒจํ•ด๋„ ๊ณ„์† ์ง„ํ–‰
pass
def run_detection(
model_path: str,
threshold: float = 0.5,
debounce_time: float = 1.0,
log_file: Optional[str] = None
):
"""
์‹ค์‹œ๊ฐ„ ํ˜ธ์ถœ์–ด ๊ฐ์ง€ ์‹คํ–‰
Args:
model_path: ONNX ๋ชจ๋ธ ํŒŒ์ผ ๊ฒฝ๋กœ
threshold: ๊ฐ์ง€ ์ž„๊ณ„๊ฐ’ (0~1)
debounce_time: ์—ฐ์† ๊ฐ์ง€ ๋ฐฉ์ง€ ์‹œ๊ฐ„ (์ดˆ)
log_file: ๊ฐ์ง€ ๋กœ๊ทธ ํŒŒ์ผ ๊ฒฝ๋กœ (์„ ํƒ์‚ฌํ•ญ)
"""
# ๋ชจ๋ธ ํŒŒ์ผ ํ™•์ธ
if not os.path.exists(model_path):
print(f"โŒ ๋ชจ๋ธ ํŒŒ์ผ์„ ์ฐพ์„ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค: {model_path}")
print(" ๋จผ์ € train_model.py๋ฅผ ์‹คํ–‰ํ•˜์—ฌ ๋ชจ๋ธ์„ ํ•™์Šตํ•˜์„ธ์š”.")
sys.exit(1)
# ๋ชจ๋ธ ์ด๋ฆ„ ์ถ”์ถœ (ํŒŒ์ผ๋ช…์—์„œ)
model_name = Path(model_path).stem
print(f"\n๐Ÿ“ ๋ชจ๋ธ ๋กœ๋“œ ์ค‘: {model_path}")
print(f"๐ŸŽฏ ๋ชจ๋ธ ์ด๋ฆ„: {model_name}")
print(f"๐Ÿ”Š ๊ฐ์ง€ ์ž„๊ณ„๊ฐ’: {threshold}")
print(f"โฑ๏ธ Debounce ์‹œ๊ฐ„: {debounce_time}์ดˆ")
# openWakeWord ๋ชจ๋ธ ๋กœ๋“œ
# inference_framework์„ 'onnx'๋กœ ์„ค์ •ํ•˜์—ฌ ONNX ๋ชจ๋ธ ์‚ฌ์šฉ
try:
oww_model = Model(
wakeword_models=[model_path],
inference_framework='onnx'
)
print("โœ… ๋ชจ๋ธ ๋กœ๋“œ ์™„๋ฃŒ!")
except Exception as e:
print(f"โŒ ๋ชจ๋ธ ๋กœ๋“œ ์‹คํŒจ: {e}")
sys.exit(1)
# ๋กœ๊ทธ ํŒŒ์ผ ์„ค์ •
log_handle = None
if log_file:
log_handle = open(log_file, 'a', encoding='utf-8')
log_handle.write(f"\n--- ์„ธ์…˜ ์‹œ์ž‘: {datetime.now()} ---\n")
print(f"๐Ÿ“ ๋กœ๊ทธ ํŒŒ์ผ: {log_file}")
# ๊ฐ์ง€ ์นด์šดํ„ฐ
detection_count = 0
last_detection_time = 0
print("\n" + "-"*60)
print("๐ŸŽง ์‹ค์‹œ๊ฐ„ ๊ฐ์ง€ ์‹œ์ž‘! ํ˜ธ์ถœ์–ด๋ฅผ ๋งํ•ด๋ณด์„ธ์š”.")
print("-"*60 + "\n")
# ์ฝœ๋ฐฑ ํ•จ์ˆ˜ ์ •์˜
def audio_callback(indata, frames, time_info, status):
nonlocal detection_count, last_detection_time
if status:
print(f"โš ๏ธ ์˜ค๋””์˜ค ์ƒํƒœ: {status}")
# ์˜ค๋””์˜ค ๋ฐ์ดํ„ฐ ์ „์ฒ˜๋ฆฌ (int16์œผ๋กœ ๋ณ€ํ™˜)
audio_data = (indata[:, 0] * 32767).astype(np.int16)
# ์˜ˆ์ธก ์ˆ˜ํ–‰
prediction = oww_model.predict(audio_data)
# ๋ชจ๋ธ ์ ์ˆ˜ ํ™•์ธ
score = prediction.get(model_name, 0)
# ์ž„๊ณ„๊ฐ’ ์ดˆ๊ณผ ๋ฐ debounce ํ™•์ธ
current_time = time.time()
if score >= threshold and (current_time - last_detection_time) > debounce_time:
detection_count += 1
last_detection_time = current_time
# ์ฝ˜์†” ์ถœ๋ ฅ
timestamp = datetime.now().strftime("%H:%M:%S")
print(f"\n๐ŸŽฏ [{timestamp}] DETECTED! (Score: {score:.3f}) - #{detection_count}")
# ๋น„ํ”„์Œ ์žฌ์ƒ
play_beep()
# ๋กœ๊ทธ ๊ธฐ๋ก
if log_handle:
log_handle.write(f"[{timestamp}] Score: {score:.3f}\n")
log_handle.flush()
# ํ˜„์žฌ ์ ์ˆ˜ ์‹ค์‹œ๊ฐ„ ํ‘œ์‹œ (๋†’์€ ์ ์ˆ˜๋งŒ)
if score > 0.1:
bar_length = int(score * 30)
bar = "โ–ˆ" * bar_length + "โ–‘" * (30 - bar_length)
print(f"\r[{bar}] {score:.3f}", end="", flush=True)
# ์˜ค๋””์˜ค ์ŠคํŠธ๋ฆผ ์‹œ์ž‘
try:
with sd.InputStream(
samplerate=SAMPLE_RATE,
channels=1,
dtype='float32',
blocksize=CHUNK_SIZE,
callback=audio_callback
):
print("(๋งˆ์ดํฌ ์ŠคํŠธ๋ฆผ ํ™œ์„ฑํ™”๋จ)\n")
# ๋ฌดํ•œ ๋ฃจํ”„ (Ctrl+C๋กœ ์ข…๋ฃŒ)
while True:
time.sleep(0.1)
except KeyboardInterrupt:
print("\n\n" + "="*60)
print("๐Ÿ›‘ ๊ฐ์ง€ ์ข…๋ฃŒ")
print("="*60)
print(f"๐Ÿ“Š ์ด ๊ฐ์ง€ ํšŸ์ˆ˜: {detection_count}ํšŒ")
if log_handle:
log_handle.write(f"--- ์„ธ์…˜ ์ข…๋ฃŒ: {datetime.now()} ---\n")
log_handle.write(f"์ด ๊ฐ์ง€: {detection_count}ํšŒ\n")
log_handle.close()
print(f"๐Ÿ“ ๋กœ๊ทธ ์ €์žฅ๋จ: {log_file}")
except Exception as e:
print(f"\nโŒ ์˜ค๋ฅ˜ ๋ฐœ์ƒ: {e}")
if log_handle:
log_handle.close()
raise
def main():
"""๋ฉ”์ธ ํ•จ์ˆ˜"""
parser = argparse.ArgumentParser(
description="์‹ค์‹œ๊ฐ„ ํ˜ธ์ถœ์–ด ๊ฐ์ง€ (๋งˆ์ดํฌ ์ž…๋ ฅ)"
)
parser.add_argument(
"--model", "-m",
type=str,
default=DEFAULT_MODEL_PATH,
help=f"ONNX ๋ชจ๋ธ ํŒŒ์ผ ๊ฒฝ๋กœ (๊ธฐ๋ณธ๊ฐ’: {DEFAULT_MODEL_PATH})"
)
parser.add_argument(
"--threshold", "-t",
type=float,
default=DEFAULT_THRESHOLD,
help=f"๊ฐ์ง€ ์ž„๊ณ„๊ฐ’ 0~1 (๊ธฐ๋ณธ๊ฐ’: {DEFAULT_THRESHOLD})"
)
parser.add_argument(
"--debounce", "-d",
type=float,
default=1.0,
help="์—ฐ์† ๊ฐ์ง€ ๋ฐฉ์ง€ ์‹œ๊ฐ„(์ดˆ) (๊ธฐ๋ณธ๊ฐ’: 1.0)"
)
parser.add_argument(
"--log", "-l",
type=str,
default=None,
help="๊ฐ์ง€ ๋กœ๊ทธ ํŒŒ์ผ ๊ฒฝ๋กœ (์„ ํƒ์‚ฌํ•ญ)"
)
parser.add_argument(
"--list-devices",
action="store_true",
help="์‚ฌ์šฉ ๊ฐ€๋Šฅํ•œ ์˜ค๋””์˜ค ๋””๋ฐ”์ด์Šค ๋ชฉ๋ก ์ถœ๋ ฅ"
)
args = parser.parse_args()
# ๋””๋ฐ”์ด์Šค ๋ชฉ๋ก ์ถœ๋ ฅ ์˜ต์…˜
if args.list_devices:
print("\n๐Ÿ“‹ ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•œ ์˜ค๋””์˜ค ๋””๋ฐ”์ด์Šค:")
print("-"*60)
print(sd.query_devices())
return
# ๋ฐฐ๋„ˆ ์ถœ๋ ฅ
print_banner()
# ๋งˆ์ดํฌ ๊ถŒํ•œ ํ™•์ธ
if not check_microphone_permission():
sys.exit(1)
# ์‹ค์‹œ๊ฐ„ ๊ฐ์ง€ ์‹คํ–‰
run_detection(
model_path=args.model,
threshold=args.threshold,
debounce_time=args.debounce,
log_file=args.log
)
if __name__ == "__main__":
main()