| |
| |
| """ |
| ============================================================================== |
| ์ค์๊ฐ ํธ์ถ์ด ๊ฐ์ง ์คํฌ๋ฆฝํธ |
| ============================================================================== |
| ๋ง์ดํฌ ์
๋ ฅ์ ๋ฐ์ ํ์ต๋ ์ปค์คํ
๋ชจ๋ธ๋ก ์ค์๊ฐ ํธ์ถ์ด ๊ฐ์ง๋ฅผ ์ํํฉ๋๋ค. |
| |
| ์ฌ์ฉ๋ฒ: |
| python run_live.py --model ./my_model.onnx --threshold 0.5 |
| """ |
|
|
| import os |
| import sys |
| import time |
| import argparse |
| from pathlib import Path |
| from datetime import datetime |
| from typing import Optional |
|
|
| import numpy as np |
|
|
| |
| sys.path.insert(0, str(Path(__file__).parent.parent / "openWakeWord")) |
|
|
| try: |
| from openwakeword.model import Model |
| print("โ
openwakeword.Model ์ํฌํธ ์ฑ๊ณต") |
| except ImportError as e: |
| print(f"โ openwakeword ์ํฌํธ ์คํจ: {e}") |
| print(" -> 'pip install openwakeword' ์คํ ํ์") |
| sys.exit(1) |
|
|
| try: |
| import sounddevice as sd |
| print("โ
sounddevice ์ํฌํธ ์ฑ๊ณต") |
| except ImportError as e: |
| print(f"โ sounddevice ์ํฌํธ ์คํจ: {e}") |
| print(" -> 'pip install sounddevice' ์คํ ํ์") |
| sys.exit(1) |
|
|
|
|
| |
| |
| |
| DEFAULT_MODEL_PATH = "./my_model.onnx" |
| SAMPLE_RATE = 16000 |
| CHUNK_SIZE = 1280 |
| DEFAULT_THRESHOLD = 0.5 |
|
|
|
|
| def print_banner(): |
| """์์ ๋ฐฐ๋ ์ถ๋ ฅ""" |
| print("\n" + "="*60) |
| print("๐ค ํ๊ตญ์ด ํธ์ถ์ด ์ค์๊ฐ ๊ฐ์ง ์์คํ
") |
| print("="*60) |
| print("๋ชจ๋ธ์ ๋ก๋ํ๊ณ ๋ง์ดํฌ ์
๋ ฅ์ ์์ํฉ๋๋ค...") |
| print("์ข
๋ฃํ๋ ค๋ฉด Ctrl+C๋ฅผ ๋๋ฅด์ธ์.\n") |
|
|
|
|
| def check_microphone_permission() -> bool: |
| """ |
| ๋ง์ดํฌ ๊ถํ ํ์ธ |
| |
| Returns: |
| ๋ง์ดํฌ ์ฌ์ฉ ๊ฐ๋ฅ ์ฌ๋ถ |
| """ |
| print("๐ ๋ง์ดํฌ ๊ถํ ํ์ธ ์ค...") |
| |
| try: |
| |
| devices = sd.query_devices() |
| |
| |
| input_devices = [d for d in devices if d['max_input_channels'] > 0] |
| |
| if not input_devices: |
| print("โ ์ฌ์ฉ ๊ฐ๋ฅํ ์
๋ ฅ ๋๋ฐ์ด์ค(๋ง์ดํฌ)๊ฐ ์์ต๋๋ค!") |
| print(" ์์คํ
์ ์ค๋์ค ์ค์ ์ ํ์ธํด์ฃผ์ธ์.") |
| return False |
| |
| |
| default_input = sd.query_devices(kind='input') |
| print(f"โ
๊ธฐ๋ณธ ๋ง์ดํฌ: {default_input['name']}") |
| print(f" ์ต๋ ์
๋ ฅ ์ฑ๋: {default_input['max_input_channels']}") |
| print(f" ๊ธฐ๋ณธ ์ํ ๋ ์ดํธ: {default_input['default_samplerate']}") |
| |
| |
| print(" ๋ง์ดํฌ ํ
์คํธ ๋
น์ ์ค... ", end="", flush=True) |
| test_recording = sd.rec( |
| int(0.1 * SAMPLE_RATE), |
| samplerate=SAMPLE_RATE, |
| channels=1, |
| dtype='int16' |
| ) |
| sd.wait() |
| print("โ
์ฑ๊ณต!") |
| |
| return True |
| |
| except Exception as e: |
| print(f"โ ๋ง์ดํฌ ๊ถํ ํ์ธ ์คํจ: {e}") |
| print("\n๐ก ํด๊ฒฐ ๋ฐฉ๋ฒ:") |
| print(" 1. ๋ง์ดํฌ๊ฐ ์ฐ๊ฒฐ๋์ด ์๋์ง ํ์ธ") |
| print(" 2. ์์คํ
์ค์ ์์ ๋ง์ดํฌ ๊ถํ ํ์ฉ") |
| print(" 3. Linux: 'sudo usermod -a -G audio $USER' ์คํ ํ ์ฌ๋ก๊ทธ์ธ") |
| return False |
|
|
|
|
| def play_beep(): |
| """ |
| ๊ฐ์ง ์ ๋นํ์ ์ฌ์ |
| |
| ๊ฐ๋จํ ์ฌ์ธํ ๋นํ์์ ์์ฑํ์ฌ ์ฌ์ํฉ๋๋ค. |
| """ |
| try: |
| duration = 0.15 |
| frequency = 880 |
| |
| t = np.linspace(0, duration, int(SAMPLE_RATE * duration), False) |
| beep = np.sin(frequency * 2 * np.pi * t) * 0.3 |
| |
| |
| fade_samples = int(0.01 * SAMPLE_RATE) |
| beep[:fade_samples] *= np.linspace(0, 1, fade_samples) |
| beep[-fade_samples:] *= np.linspace(1, 0, fade_samples) |
| |
| sd.play(beep.astype(np.float32), SAMPLE_RATE) |
| |
| except Exception as e: |
| |
| pass |
|
|
|
|
| def run_detection( |
| model_path: str, |
| threshold: float = 0.5, |
| debounce_time: float = 1.0, |
| log_file: Optional[str] = None |
| ): |
| """ |
| ์ค์๊ฐ ํธ์ถ์ด ๊ฐ์ง ์คํ |
| |
| Args: |
| model_path: ONNX ๋ชจ๋ธ ํ์ผ ๊ฒฝ๋ก |
| threshold: ๊ฐ์ง ์๊ณ๊ฐ (0~1) |
| debounce_time: ์ฐ์ ๊ฐ์ง ๋ฐฉ์ง ์๊ฐ (์ด) |
| log_file: ๊ฐ์ง ๋ก๊ทธ ํ์ผ ๊ฒฝ๋ก (์ ํ์ฌํญ) |
| """ |
| |
| if not os.path.exists(model_path): |
| print(f"โ ๋ชจ๋ธ ํ์ผ์ ์ฐพ์ ์ ์์ต๋๋ค: {model_path}") |
| print(" ๋จผ์ train_model.py๋ฅผ ์คํํ์ฌ ๋ชจ๋ธ์ ํ์ตํ์ธ์.") |
| sys.exit(1) |
| |
| |
| model_name = Path(model_path).stem |
| |
| print(f"\n๐ ๋ชจ๋ธ ๋ก๋ ์ค: {model_path}") |
| print(f"๐ฏ ๋ชจ๋ธ ์ด๋ฆ: {model_name}") |
| print(f"๐ ๊ฐ์ง ์๊ณ๊ฐ: {threshold}") |
| print(f"โฑ๏ธ Debounce ์๊ฐ: {debounce_time}์ด") |
| |
| |
| |
| try: |
| oww_model = Model( |
| wakeword_models=[model_path], |
| inference_framework='onnx' |
| ) |
| print("โ
๋ชจ๋ธ ๋ก๋ ์๋ฃ!") |
| except Exception as e: |
| print(f"โ ๋ชจ๋ธ ๋ก๋ ์คํจ: {e}") |
| sys.exit(1) |
| |
| |
| log_handle = None |
| if log_file: |
| log_handle = open(log_file, 'a', encoding='utf-8') |
| log_handle.write(f"\n--- ์ธ์
์์: {datetime.now()} ---\n") |
| print(f"๐ ๋ก๊ทธ ํ์ผ: {log_file}") |
| |
| |
| detection_count = 0 |
| last_detection_time = 0 |
| |
| print("\n" + "-"*60) |
| print("๐ง ์ค์๊ฐ ๊ฐ์ง ์์! ํธ์ถ์ด๋ฅผ ๋งํด๋ณด์ธ์.") |
| print("-"*60 + "\n") |
| |
| |
| def audio_callback(indata, frames, time_info, status): |
| nonlocal detection_count, last_detection_time |
| |
| if status: |
| print(f"โ ๏ธ ์ค๋์ค ์ํ: {status}") |
| |
| |
| audio_data = (indata[:, 0] * 32767).astype(np.int16) |
| |
| |
| prediction = oww_model.predict(audio_data) |
| |
| |
| score = prediction.get(model_name, 0) |
| |
| |
| current_time = time.time() |
| if score >= threshold and (current_time - last_detection_time) > debounce_time: |
| detection_count += 1 |
| last_detection_time = current_time |
| |
| |
| timestamp = datetime.now().strftime("%H:%M:%S") |
| print(f"\n๐ฏ [{timestamp}] DETECTED! (Score: {score:.3f}) - #{detection_count}") |
| |
| |
| play_beep() |
| |
| |
| if log_handle: |
| log_handle.write(f"[{timestamp}] Score: {score:.3f}\n") |
| log_handle.flush() |
| |
| |
| if score > 0.1: |
| bar_length = int(score * 30) |
| bar = "โ" * bar_length + "โ" * (30 - bar_length) |
| print(f"\r[{bar}] {score:.3f}", end="", flush=True) |
| |
| |
| try: |
| with sd.InputStream( |
| samplerate=SAMPLE_RATE, |
| channels=1, |
| dtype='float32', |
| blocksize=CHUNK_SIZE, |
| callback=audio_callback |
| ): |
| print("(๋ง์ดํฌ ์คํธ๋ฆผ ํ์ฑํ๋จ)\n") |
| |
| |
| while True: |
| time.sleep(0.1) |
| |
| except KeyboardInterrupt: |
| print("\n\n" + "="*60) |
| print("๐ ๊ฐ์ง ์ข
๋ฃ") |
| print("="*60) |
| print(f"๐ ์ด ๊ฐ์ง ํ์: {detection_count}ํ") |
| |
| if log_handle: |
| log_handle.write(f"--- ์ธ์
์ข
๋ฃ: {datetime.now()} ---\n") |
| log_handle.write(f"์ด ๊ฐ์ง: {detection_count}ํ\n") |
| log_handle.close() |
| print(f"๐ ๋ก๊ทธ ์ ์ฅ๋จ: {log_file}") |
| |
| except Exception as e: |
| print(f"\nโ ์ค๋ฅ ๋ฐ์: {e}") |
| if log_handle: |
| log_handle.close() |
| raise |
|
|
|
|
| def main(): |
| """๋ฉ์ธ ํจ์""" |
| parser = argparse.ArgumentParser( |
| description="์ค์๊ฐ ํธ์ถ์ด ๊ฐ์ง (๋ง์ดํฌ ์
๋ ฅ)" |
| ) |
| parser.add_argument( |
| "--model", "-m", |
| type=str, |
| default=DEFAULT_MODEL_PATH, |
| help=f"ONNX ๋ชจ๋ธ ํ์ผ ๊ฒฝ๋ก (๊ธฐ๋ณธ๊ฐ: {DEFAULT_MODEL_PATH})" |
| ) |
| parser.add_argument( |
| "--threshold", "-t", |
| type=float, |
| default=DEFAULT_THRESHOLD, |
| help=f"๊ฐ์ง ์๊ณ๊ฐ 0~1 (๊ธฐ๋ณธ๊ฐ: {DEFAULT_THRESHOLD})" |
| ) |
| parser.add_argument( |
| "--debounce", "-d", |
| type=float, |
| default=1.0, |
| help="์ฐ์ ๊ฐ์ง ๋ฐฉ์ง ์๊ฐ(์ด) (๊ธฐ๋ณธ๊ฐ: 1.0)" |
| ) |
| parser.add_argument( |
| "--log", "-l", |
| type=str, |
| default=None, |
| help="๊ฐ์ง ๋ก๊ทธ ํ์ผ ๊ฒฝ๋ก (์ ํ์ฌํญ)" |
| ) |
| parser.add_argument( |
| "--list-devices", |
| action="store_true", |
| help="์ฌ์ฉ ๊ฐ๋ฅํ ์ค๋์ค ๋๋ฐ์ด์ค ๋ชฉ๋ก ์ถ๋ ฅ" |
| ) |
| |
| args = parser.parse_args() |
| |
| |
| if args.list_devices: |
| print("\n๐ ์ฌ์ฉ ๊ฐ๋ฅํ ์ค๋์ค ๋๋ฐ์ด์ค:") |
| print("-"*60) |
| print(sd.query_devices()) |
| return |
| |
| |
| print_banner() |
| |
| |
| if not check_microphone_permission(): |
| sys.exit(1) |
| |
| |
| run_detection( |
| model_path=args.model, |
| threshold=args.threshold, |
| debounce_time=args.debounce, |
| log_file=args.log |
| ) |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|