#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ ============================================================================== 한국어 호출어 모델 학습 스크립트 ============================================================================== openWakeWord의 Transfer Learning 방식을 사용하여 커스텀 호출어 모델을 학습합니다. 학습 프로세스: 1. positive 데이터(호출어 음성)에서 embedding 추출 2. negative 데이터(비호출어 음성)에서 embedding 추출 3. 간단한 DNN 분류기 학습 4. ONNX 모델로 내보내기 사용법: python train_model.py --positive_dir ./positive --model_name my_model """ import os import sys from pathlib import Path from typing import List, Tuple, Optional import glob import importlib.util import numpy as np import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader, TensorDataset from tqdm import tqdm import soundfile as sf from scipy import signal as scipy_signal # openWakeWord 라이브러리 경로 추가 (로컬 리포지토리 사용) sys.path.insert(0, str(Path(__file__).parent.parent / "openWakeWord")) try: from openwakeword.utils import AudioFeatures print("✅ openwakeword.utils.AudioFeatures 임포트 성공") except ImportError as e: print(f"❌ openwakeword 임포트 실패: {e}") print(" -> 'pip install openwakeword' 실행 필요") sys.exit(1) # ============================================ # 설정 # ============================================ POSITIVE_DIR = "./positive" # positive 데이터 디렉토리 MODEL_NAME = "my_model" # 출력 모델 이름 OUTPUT_DIR = "./" # 출력 디렉토리 SAMPLE_RATE = 16000 # openWakeWord 요구사항 CLIP_DURATION_SAMPLES = 32000 # 2초 클립 (16000 * 2) # 일반 대화 음성 negative 기본 경로 (Keyword-Spotting 데이터) DEFAULT_NEGATIVE_DIR = "/home/dusen0528/Keyword-Spotting/data" # 학습 하이퍼파라미터 (GPU 사용 시 BATCH_SIZE 자동 확대) EPOCHS = 50 BATCH_SIZE = 32 # CPU 기본값, GPU 시 train_model()에서 128로 확대 LEARNING_RATE = 0.001 LAYER_DIM = 128 # DNN 히든 레이어 크기 N_BLOCKS = 1 # DNN 블록 수 class WakeWordModel(nn.Module): """ 간단한 Wake Word 분류 모델 openWakeWord embedding (96차원)을 입력받아 이진 분류를 수행합니다. """ def __init__(self, input_shape: Tuple[int, int], layer_dim: int = 128, n_blocks: int = 1): """ Args: input_shape: (timesteps, features) - 예: (16, 96) layer_dim: 히든 레이어 차원 n_blocks: 추가 블록 수 """ super().__init__() self.input_shape = input_shape flat_size = input_shape[0] * input_shape[1] # 첫 번째 레이어 layers = [ nn.Flatten(), nn.Linear(flat_size, layer_dim), nn.LayerNorm(layer_dim), nn.ReLU() ] # 추가 블록 for _ in range(n_blocks): layers.extend([ nn.Linear(layer_dim, layer_dim), nn.LayerNorm(layer_dim), nn.ReLU() ]) # 출력 레이어 layers.extend([ nn.Linear(layer_dim, 1), nn.Sigmoid() ]) self.model = nn.Sequential(*layers) def forward(self, x): return self.model(x) def load_audio_files(directory: str, max_files: Optional[int] = None) -> List[np.ndarray]: """ 디렉토리에서 WAV 파일들을 로드 Args: directory: WAV 파일들이 있는 디렉토리 max_files: 최대 로드할 파일 수 (None이면 전체) Returns: 오디오 데이터 리스트 (각 요소는 16kHz, 16-bit PCM numpy array) """ wav_files = sorted(Path(directory).glob("*.wav")) if max_files: wav_files = wav_files[:max_files] audio_data = [] for wav_file in tqdm(wav_files, desc=f"{directory} 로드 중"): try: # 오디오 파일 읽기 (24kHz 등이면 float로 읽어서 리샘플) data, sr = sf.read(str(wav_file), dtype='float64') if len(data.shape) > 1: data = data[:, 0] if sr != SAMPLE_RATE: data = _resample_to_16k(data, sr) else: data = (np.clip(data, -1.0, 1.0) * 32767).astype(np.int16) # 클립 길이 조정 (패딩 또는 자르기) if len(data) < CLIP_DURATION_SAMPLES: # 앞쪽에 패딩 추가 (호출어가 끝에 오도록) padded = np.zeros(CLIP_DURATION_SAMPLES, dtype=np.int16) start_idx = CLIP_DURATION_SAMPLES - len(data) padded[start_idx:] = data data = padded elif len(data) > CLIP_DURATION_SAMPLES: # 끝부분 유지 (호출어가 끝에 있다고 가정) data = data[-CLIP_DURATION_SAMPLES:] audio_data.append(data) except Exception as e: print(f"❌ {wav_file.name} 로드 실패: {e}") return audio_data def _resample_to_16k(audio: np.ndarray, orig_sr: int) -> np.ndarray: """오디오를 16kHz로 리샘플링 후 int16 반환.""" if orig_sr == SAMPLE_RATE: return (np.clip(audio, -1.0, 1.0) * 32767).astype(np.int16) if audio.dtype == np.float64 or audio.dtype == np.float32 else audio.astype(np.int16) n = int(len(audio) * SAMPLE_RATE / orig_sr) out = scipy_signal.resample(audio, n) out = np.clip(out, -1.0, 1.0) return (out * 32767).astype(np.int16) def load_negative_from_dir( negative_dir: str, num_samples: int, clip_length: int = CLIP_DURATION_SAMPLES, ) -> List[np.ndarray]: """ 일반 대화 음성 디렉토리에서 negative 클립 로드. 파일이 44.1kHz 등이면 16kHz로 리샘플링 후, 2초(32000샘플) 구간을 랜덤 시작으로 잘라 사용. Args: negative_dir: WAV 파일들이 있는 디렉토리 num_samples: 필요한 negative 클립 수 clip_length: 클립 길이(샘플 수) Returns: 16kHz, int16, 길이 clip_length인 오디오 배열 리스트 """ wav_files = sorted(Path(negative_dir).glob("*.wav")) if not wav_files: return [] negative_data = [] rng = np.random.default_rng() for _ in tqdm(range(num_samples), desc="Negative(일반대화) 로드 중"): # 무작위 파일 선택 fpath = str(rng.choice(wav_files)) try: data, sr = sf.read(fpath, dtype="float64") except Exception: continue if len(data) == 0: continue if data.ndim > 1: data = data[:, 0] # 16kHz로 리샘플 data_16k = _resample_to_16k(data, sr) if len(data_16k) < clip_length: # 패딩 pad = np.zeros(clip_length - len(data_16k), dtype=np.int16) data_16k = np.concatenate([data_16k, pad]) else: # 랜덤 구간 슬라이스 start = rng.integers(0, len(data_16k) - clip_length + 1) data_16k = data_16k[start : start + clip_length] negative_data.append(data_16k) return negative_data def generate_negative_data(num_samples: int) -> List[np.ndarray]: """ Negative 데이터 생성 (무작위 노이즈 + 무음). negative_dir을 쓰지 않을 때만 사용. """ negative_data = [] for i in tqdm(range(num_samples), desc="Negative(노이즈) 생성 중"): if np.random.random() < 0.8: noise_level = np.random.uniform(100, 1000) data = np.random.normal(0, noise_level, CLIP_DURATION_SAMPLES).astype(np.int16) else: data = np.zeros(CLIP_DURATION_SAMPLES, dtype=np.int16) negative_data.append(data) return negative_data def extract_embeddings( audio_clips: List[np.ndarray], feature_extractor: AudioFeatures, use_gpu: bool = True, ) -> np.ndarray: """ 오디오 클립들에서 openWakeWord embedding 추출 Args: audio_clips: 오디오 데이터 리스트 feature_extractor: AudioFeatures 인스턴스 use_gpu: GPU 사용 시 True (배치 크기 확대) Returns: (N, timesteps, 96) 형태의 embedding array """ # 배열로 변환 clips_array = np.vstack([c[None, :] for c in audio_clips]) # GPU 사용 시 배치 크기 증가로 속도 향상 batch_size = 128 if (use_gpu and torch.cuda.is_available()) else 32 print(f"\n📊 Embedding 추출 중... (총 {len(clips_array)}개 클립, batch_size={batch_size})") # embed_clips 메서드 사용 embeddings = feature_extractor.embed_clips(clips_array, batch_size=batch_size) print(f" Embedding 형태: {embeddings.shape}") return embeddings def _resolve_openwakeword_resource_paths() -> Tuple[Optional[str], Optional[str]]: """ openwakeword 기본 리소스 경로를 찾는다. 1) 로컬 리포(openWakeWord/openwakeword/resources/models) 2) 현재 venv site-packages/openwakeword/resources/models """ local_models_dir = Path(__file__).parent.parent / "openWakeWord" / "openwakeword" / "resources" / "models" mel_local = local_models_dir / "melspectrogram.onnx" emb_local = local_models_dir / "embedding_model.onnx" if mel_local.exists() and emb_local.exists(): return str(mel_local), str(emb_local) # venv site-packages fallback venv_base = Path(__file__).parent / ".venv" / "lib" candidates = sorted(glob.glob(str(venv_base / "python*" / "site-packages" / "openwakeword" / "resources" / "models"))) for c in candidates: cdir = Path(c) mel = cdir / "melspectrogram.onnx" emb = cdir / "embedding_model.onnx" if mel.exists() and emb.exists(): return str(mel), str(emb) return None, None def _check_onnx_export_dependencies() -> None: """ ONNX 내보내기 필수 모듈 사전 점검. 학습 50 epoch 후 export 단계에서 실패하는 시간 낭비를 방지한다. """ missing = [] for mod in ("onnx", "onnxscript"): if importlib.util.find_spec(mod) is None: missing.append(mod) if missing: mods = ", ".join(missing) print("\n❌ ONNX export 의존성 누락:", mods) print(" 현재 Python:", sys.executable) print(" 아래를 먼저 실행하세요:") print(" python -m ensurepip --upgrade") print(" python -m pip install -U onnx onnxscript") sys.exit(1) def train_model( positive_embeddings: np.ndarray, negative_embeddings: np.ndarray, epochs: int = 50, batch_size: int = 32, learning_rate: float = 0.001, use_gpu: bool = True, ) -> Tuple[WakeWordModel, dict]: """ Wake Word 모델 학습 Args: positive_embeddings: Positive 샘플 embeddings negative_embeddings: Negative 샘플 embeddings epochs: 학습 에폭 수 batch_size: 배치 크기 learning_rate: 학습률 Returns: 학습된 모델과 학습 히스토리 """ # 입력 형태 결정 (timesteps, features) # openWakeWord 모델은 보통 16 timesteps를 사용 n_timesteps = 16 # 고정값 n_features = positive_embeddings.shape[-1] # 96 input_shape = (n_timesteps, n_features) print(f"\n{'='*60}") print(f"모델 학습 시작") print(f"Input shape: {input_shape}") print(f"Positive 샘플: {len(positive_embeddings)}개") print(f"Negative 샘플: {len(negative_embeddings)}개") print(f"{'='*60}\n") # 데이터 준비 - 마지막 16 timesteps만 사용 def prepare_features(embeddings: np.ndarray, n_timesteps: int = 16) -> np.ndarray: """Embedding에서 마지막 n_timesteps 프레임 추출""" prepared = [] for emb in embeddings: if emb.shape[0] >= n_timesteps: prepared.append(emb[-n_timesteps:]) else: # 패딩 필요한 경우 padded = np.zeros((n_timesteps, n_features)) padded[-emb.shape[0]:] = emb prepared.append(padded) return np.array(prepared) X_pos = prepare_features(positive_embeddings, n_timesteps) X_neg = prepare_features(negative_embeddings, n_timesteps) # 데이터셋 생성 X = np.vstack([X_pos, X_neg]).astype(np.float32) y = np.hstack([ np.ones(len(X_pos)), np.zeros(len(X_neg)) ]).astype(np.float32) # 셔플 indices = np.random.permutation(len(X)) X, y = X[indices], y[indices] # Train/Validation 분할 (80/20) split_idx = int(len(X) * 0.8) X_train, X_val = X[:split_idx], X[split_idx:] y_train, y_val = y[:split_idx], y[split_idx:] # 모델 생성 (use_gpu=True이고 CUDA 가능 시 GPU 사용) device = torch.device('cuda' if (use_gpu and torch.cuda.is_available()) else 'cpu') if device.type == 'cuda': print(f"🖥️ 학습 디바이스: GPU ({torch.cuda.get_device_name(0)})") batch_size = min(128, max(BATCH_SIZE, len(X_train) // 16)) # GPU 시 배치 확대 else: print(f"🖥️ 학습 디바이스: CPU") batch_size = BATCH_SIZE # PyTorch DataLoader 생성 train_dataset = TensorDataset( torch.from_numpy(X_train), torch.from_numpy(y_train) ) val_dataset = TensorDataset( torch.from_numpy(X_val), torch.from_numpy(y_val) ) train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, pin_memory=(device.type == 'cuda')) val_loader = DataLoader(val_dataset, batch_size=batch_size, pin_memory=(device.type == 'cuda')) model = WakeWordModel(input_shape=input_shape, layer_dim=LAYER_DIM, n_blocks=N_BLOCKS) model = model.to(device) # 손실 함수와 옵티마이저 criterion = nn.BCELoss() optimizer = optim.Adam(model.parameters(), lr=learning_rate) # 학습 히스토리 history = { 'train_loss': [], 'val_loss': [], 'val_accuracy': [] } best_val_loss = float('inf') best_model_state = None # 학습 루프 for epoch in range(epochs): # Training model.train() train_losses = [] for X_batch, y_batch in train_loader: X_batch = X_batch.to(device) y_batch = y_batch.to(device).unsqueeze(1) optimizer.zero_grad() outputs = model(X_batch) loss = criterion(outputs, y_batch) loss.backward() optimizer.step() train_losses.append(loss.item()) # Validation model.eval() val_losses = [] correct = 0 total = 0 with torch.no_grad(): for X_batch, y_batch in val_loader: X_batch = X_batch.to(device) y_batch = y_batch.to(device).unsqueeze(1) outputs = model(X_batch) loss = criterion(outputs, y_batch) val_losses.append(loss.item()) predicted = (outputs > 0.5).float() total += y_batch.size(0) correct += (predicted == y_batch).sum().item() avg_train_loss = np.mean(train_losses) avg_val_loss = np.mean(val_losses) val_accuracy = correct / total history['train_loss'].append(avg_train_loss) history['val_loss'].append(avg_val_loss) history['val_accuracy'].append(val_accuracy) # 최고 모델 저장 if avg_val_loss < best_val_loss: best_val_loss = avg_val_loss best_model_state = model.state_dict().copy() # 진행 상황 출력 if (epoch + 1) % 10 == 0 or epoch == 0: print(f"Epoch [{epoch+1}/{epochs}] " f"Train Loss: {avg_train_loss:.4f} | " f"Val Loss: {avg_val_loss:.4f} | " f"Val Acc: {val_accuracy:.4f}") # 최고 성능 모델로 복원 if best_model_state: model.load_state_dict(best_model_state) return model, history def export_to_onnx( model: WakeWordModel, model_name: str, output_dir: str ) -> str: """ 학습된 모델을 ONNX 형식으로 내보내기 Args: model: 학습된 PyTorch 모델 model_name: 모델 이름 output_dir: 출력 디렉토리 Returns: 저장된 ONNX 파일 경로 """ model.eval() model = model.to('cpu') # 더미 입력 생성 dummy_input = torch.randn(1, *model.input_shape) # 출력 경로 onnx_path = os.path.join(output_dir, f"{model_name}.onnx") # ONNX 내보내기 torch.onnx.export( model, dummy_input, onnx_path, export_params=True, opset_version=13, do_constant_folding=True, input_names=['input'], output_names=[model_name], dynamic_axes={ 'input': {0: 'batch_size'}, model_name: {0: 'batch_size'} } ) print(f"\n✅ ONNX 모델 저장 완료: {onnx_path}") return onnx_path def main(): """메인 함수""" import argparse parser = argparse.ArgumentParser( description="한국어 호출어 모델 학습기" ) parser.add_argument( "--positive_dir", "-p", type=str, default=POSITIVE_DIR, help=f"Positive 데이터 디렉토리 (기본값: {POSITIVE_DIR})" ) parser.add_argument( "--model_name", "-m", type=str, default=MODEL_NAME, help=f"출력 모델 이름 (기본값: {MODEL_NAME})" ) parser.add_argument( "--output_dir", "-o", type=str, default=OUTPUT_DIR, help=f"출력 디렉토리 (기본값: {OUTPUT_DIR})" ) parser.add_argument( "--epochs", "-e", type=int, default=EPOCHS, help=f"학습 에폭 수 (기본값: {EPOCHS})" ) parser.add_argument( "--negative_ratio", type=float, default=3.0, help="Positive 대비 Negative 샘플 비율 (기본값: 3.0)" ) parser.add_argument( "--negative_dir", "-n", type=str, default=DEFAULT_NEGATIVE_DIR, help=f"일반 대화 음성(negative) WAV 디렉토리. 비우면 노이즈만 사용 (기본: {DEFAULT_NEGATIVE_DIR})" ) parser.add_argument( "--no-gpu", action="store_true", help="GPU 비활성화 (CPU만 사용)" ) args = parser.parse_args() # ONNX export 의존성 사전 점검 (학습 시작 전에 실패 처리) _check_onnx_export_dependencies() # GPU 사용 여부 (--no-gpu면 CPU 강제) use_gpu_flag = torch.cuda.is_available() and not getattr(args, 'no_gpu', False) # 출력 디렉토리 생성 Path(args.output_dir).mkdir(parents=True, exist_ok=True) print("\n" + "="*60) print("🎤 한국어 호출어 모델 학습 파이프라인") print("="*60) # Step 1: Positive 데이터 로드 print("\n[Step 1/5] Positive 데이터 로드") positive_clips = load_audio_files(args.positive_dir) if len(positive_clips) == 0: print("❌ Positive 데이터가 없습니다! 먼저 generate_data.py를 실행하세요.") sys.exit(1) print(f" ✅ {len(positive_clips)}개 Positive 클립 로드 완료") # Step 2: Negative 데이터 (일반 대화 음성 우선, 없으면 노이즈 생성) print("\n[Step 2/5] Negative 데이터 준비") num_negative = int(len(positive_clips) * args.negative_ratio) negative_dir = (args.negative_dir or "").strip() if negative_dir and Path(negative_dir).is_dir() and list(Path(negative_dir).glob("*.wav")): negative_clips = load_negative_from_dir(negative_dir, num_negative) print(f" ✅ {len(negative_clips)}개 Negative 클립 로드 (일반 대화 음성: {negative_dir})") else: negative_clips = generate_negative_data(num_negative) print(f" ✅ {len(negative_clips)}개 Negative 클립 생성 (노이즈)") # Step 3: Feature 추출 (GPU 사용 가능 시 자동 사용) print("\n[Step 3/5] openWakeWord Embedding 추출") feat_device = 'gpu' if use_gpu_flag else 'cpu' if use_gpu_flag: print(f" 🖥️ GPU 사용: {torch.cuda.get_device_name(0)}") else: print(" 🖥️ CPU 사용") melspec_model_path, embedding_model_path = _resolve_openwakeword_resource_paths() if melspec_model_path and embedding_model_path: feature_extractor = AudioFeatures( inference_framework='onnx', device=feat_device, melspec_model_path=melspec_model_path, embedding_model_path=embedding_model_path, ) else: feature_extractor = AudioFeatures(inference_framework='onnx', device=feat_device) positive_embeddings = extract_embeddings(positive_clips, feature_extractor, use_gpu=use_gpu_flag) negative_embeddings = extract_embeddings(negative_clips, feature_extractor, use_gpu=use_gpu_flag) print(f" ✅ Positive embeddings: {positive_embeddings.shape}") print(f" ✅ Negative embeddings: {negative_embeddings.shape}") # Step 4: 모델 학습 print("\n[Step 4/5] 모델 학습") model, history = train_model( positive_embeddings, negative_embeddings, epochs=args.epochs, batch_size=BATCH_SIZE, learning_rate=LEARNING_RATE, use_gpu=use_gpu_flag, ) # Step 5: ONNX 내보내기 print("\n[Step 5/5] ONNX 모델 내보내기") onnx_path = export_to_onnx(model, args.model_name, args.output_dir) # 최종 결과 출력 print("\n" + "="*60) print("🎉 학습 완료!") print("="*60) print(f"📁 ONNX 모델: {onnx_path}") print(f"📊 최종 Validation Accuracy: {history['val_accuracy'][-1]:.4f}") print(f"\n💡 실시간 추론을 실행하려면:") print(f" python run_live.py --model {onnx_path}") if __name__ == "__main__": main()