| | |
| | |
| | """ |
| | ============================================================================== |
| | ํ๊ตญ์ด ํธ์ถ์ด ๋ชจ๋ธ ํ์ต ์คํฌ๋ฆฝํธ |
| | ============================================================================== |
| | openWakeWord์ Transfer Learning ๋ฐฉ์์ ์ฌ์ฉํ์ฌ ์ปค์คํ
ํธ์ถ์ด ๋ชจ๋ธ์ ํ์ตํฉ๋๋ค. |
| | |
| | ํ์ต ํ๋ก์ธ์ค: |
| | 1. positive ๋ฐ์ดํฐ(ํธ์ถ์ด ์์ฑ)์์ embedding ์ถ์ถ |
| | 2. negative ๋ฐ์ดํฐ(๋นํธ์ถ์ด ์์ฑ)์์ embedding ์ถ์ถ |
| | 3. ๊ฐ๋จํ DNN ๋ถ๋ฅ๊ธฐ ํ์ต |
| | 4. ONNX ๋ชจ๋ธ๋ก ๋ด๋ณด๋ด๊ธฐ |
| | |
| | ์ฌ์ฉ๋ฒ: |
| | python train_model.py --positive_dir ./positive --model_name my_model |
| | """ |
| |
|
| | import os |
| | import sys |
| | from pathlib import Path |
| | from typing import List, Tuple, Optional |
| | import glob |
| | import importlib.util |
| |
|
| | import numpy as np |
| | import torch |
| | import torch.nn as nn |
| | import torch.optim as optim |
| | from torch.utils.data import DataLoader, TensorDataset |
| | from tqdm import tqdm |
| | import soundfile as sf |
| | from scipy import signal as scipy_signal |
| |
|
| | |
| | sys.path.insert(0, str(Path(__file__).parent.parent / "openWakeWord")) |
| |
|
| | try: |
| | from openwakeword.utils import AudioFeatures |
| | print("โ
openwakeword.utils.AudioFeatures ์ํฌํธ ์ฑ๊ณต") |
| | except ImportError as e: |
| | print(f"โ openwakeword ์ํฌํธ ์คํจ: {e}") |
| | print(" -> 'pip install openwakeword' ์คํ ํ์") |
| | sys.exit(1) |
| |
|
| |
|
| | |
| | |
| | |
# --- Data / I-O defaults ---------------------------------------------------
POSITIVE_DIR = "./positive"    # default directory of positive (wake-word) WAV clips
MODEL_NAME = "my_model"        # default output model name
OUTPUT_DIR = "./"              # default directory for the exported ONNX model
SAMPLE_RATE = 16000            # target sample rate (Hz) expected by openWakeWord
CLIP_DURATION_SAMPLES = 32000  # fixed clip length: 2 seconds at 16 kHz

# Directory of generic-speech WAVs used as negatives; when absent the
# pipeline falls back to synthetic noise.  NOTE(review): machine-specific
# absolute path — consider making this relative or env-configurable.
DEFAULT_NEGATIVE_DIR = "/home/dusen0528/Keyword-Spotting/data"

# --- Training hyper-parameters ---------------------------------------------
EPOCHS = 50            # number of training epochs
BATCH_SIZE = 32        # base batch size (scaled up on GPU in train_model)
LEARNING_RATE = 0.001  # Adam learning rate
LAYER_DIM = 128        # hidden-layer width of the classifier
N_BLOCKS = 1           # number of extra hidden blocks in the classifier
| |
|
| |
|
class WakeWordModel(nn.Module):
    """Binary wake-word classifier head.

    Consumes openWakeWord embeddings of shape (timesteps, features) —
    e.g. (16, 96) — flattens them, and emits a single sigmoid
    probability per example.
    """

    def __init__(self, input_shape: Tuple[int, int], layer_dim: int = 128, n_blocks: int = 1):
        """
        Args:
            input_shape: (timesteps, features), e.g. (16, 96).
            layer_dim: width of the hidden layers.
            n_blocks: number of additional hidden blocks.
        """
        super().__init__()

        self.input_shape = input_shape
        timesteps, features = input_shape

        # Input projection: flatten -> linear -> norm -> activation.
        modules = [
            nn.Flatten(),
            nn.Linear(timesteps * features, layer_dim),
            nn.LayerNorm(layer_dim),
            nn.ReLU(),
        ]

        # Optional extra hidden blocks, all of the same width.
        for _ in range(n_blocks):
            modules += [
                nn.Linear(layer_dim, layer_dim),
                nn.LayerNorm(layer_dim),
                nn.ReLU(),
            ]

        # Output head: single unit squashed to a probability.
        modules += [nn.Linear(layer_dim, 1), nn.Sigmoid()]

        self.model = nn.Sequential(*modules)

    def forward(self, x):
        # x: (batch, timesteps, features) -> (batch, 1) probability.
        return self.model(x)
| |
|
| |
|
def load_audio_files(directory: str, max_files: Optional[int] = None) -> List[np.ndarray]:
    """Load the WAV files in *directory* as fixed-length int16 clips.

    Each clip is converted to 16 kHz / int16 and normalised to exactly
    CLIP_DURATION_SAMPLES samples: short clips are left-padded with
    silence (audio kept at the end), long clips keep only their tail.

    Args:
        directory: directory containing the *.wav files.
        max_files: optional cap on the number of files loaded.

    Returns:
        List of 16 kHz int16 numpy arrays of length CLIP_DURATION_SAMPLES.
    """
    wav_files = sorted(Path(directory).glob("*.wav"))
    if max_files:
        wav_files = wav_files[:max_files]

    clips: List[np.ndarray] = []
    for wav_file in tqdm(wav_files, desc=f"{directory} ๋ก๋ ์ค"):
        try:
            samples, sr = sf.read(str(wav_file), dtype='float64')
            # Keep only the first channel of multi-channel audio.
            if samples.ndim > 1:
                samples = samples[:, 0]
            # Convert to 16 kHz int16, resampling when needed.
            if sr == SAMPLE_RATE:
                samples = (np.clip(samples, -1.0, 1.0) * 32767).astype(np.int16)
            else:
                samples = _resample_to_16k(samples, sr)

            n = len(samples)
            if n < CLIP_DURATION_SAMPLES:
                # Left-pad with silence so the audio sits at the clip's end.
                clip = np.zeros(CLIP_DURATION_SAMPLES, dtype=np.int16)
                clip[CLIP_DURATION_SAMPLES - n:] = samples
            else:
                # Keep the most recent CLIP_DURATION_SAMPLES samples.
                clip = samples[-CLIP_DURATION_SAMPLES:]

            clips.append(clip)

        except Exception as e:
            print(f"โ {wav_file.name} ๋ก๋ ์คํจ: {e}")

    return clips
| |
|
| |
|
def _resample_to_16k(audio: np.ndarray, orig_sr: int) -> np.ndarray:
    """Resample *audio* to 16 kHz and return it as int16 PCM.

    Floating-point input is assumed to lie in [-1, 1] and is scaled to
    the int16 range; integer input already at the target rate is passed
    through with only a dtype cast.

    Args:
        audio: 1-D audio samples (float in [-1, 1] or integer PCM).
        orig_sr: sample rate of *audio* in Hz.

    Returns:
        int16 numpy array sampled at SAMPLE_RATE (16 kHz).
    """
    if orig_sr == SAMPLE_RATE:
        # Generalized: accept any floating dtype (float16/32/64) instead of
        # the previous explicit float32/float64 check.
        if np.issubdtype(audio.dtype, np.floating):
            return (np.clip(audio, -1.0, 1.0) * 32767).astype(np.int16)
        return audio.astype(np.int16)
    # FFT-based resampling to the proportional target length, then
    # clip/rescale into the int16 range.
    n = int(len(audio) * SAMPLE_RATE / orig_sr)
    out = scipy_signal.resample(audio, n)
    out = np.clip(out, -1.0, 1.0)
    return (out * 32767).astype(np.int16)
| |
|
| |
|
def load_negative_from_dir(
    negative_dir: str,
    num_samples: int,
    clip_length: int = CLIP_DURATION_SAMPLES,
) -> List[np.ndarray]:
    """Sample negative clips from a directory of generic speech WAVs.

    Files are resampled to 16 kHz when needed; a random window of
    *clip_length* samples is cut from each chosen file (short files are
    right-padded with silence instead).

    Args:
        negative_dir: directory containing the *.wav files.
        num_samples: number of negative clips wanted.
        clip_length: clip length in samples.

    Returns:
        List of 16 kHz int16 arrays of length *clip_length*; may hold
        fewer than *num_samples* entries when some files fail to load.
    """
    wav_files = sorted(Path(negative_dir).glob("*.wav"))
    if not wav_files:
        return []

    rng = np.random.default_rng()
    clips: List[np.ndarray] = []

    for _ in tqdm(range(num_samples), desc="Negative(์ผ๋ฐ๋ํ) ๋ก๋ ์ค"):
        # Draw a fresh random source file for every clip.
        fpath = str(rng.choice(wav_files))
        try:
            audio, sr = sf.read(fpath, dtype="float64")
        except Exception:
            continue  # unreadable file: just try another next iteration
        if len(audio) == 0:
            continue
        if audio.ndim > 1:
            audio = audio[:, 0]  # first channel only

        audio_16k = _resample_to_16k(audio, sr)
        if len(audio_16k) >= clip_length:
            # Random window of exactly clip_length samples.
            start = rng.integers(0, len(audio_16k) - clip_length + 1)
            audio_16k = audio_16k[start : start + clip_length]
        else:
            # Too short: right-pad with silence.
            padding = np.zeros(clip_length - len(audio_16k), dtype=np.int16)
            audio_16k = np.concatenate([audio_16k, padding])
        clips.append(audio_16k)

    return clips
| |
|
| |
|
def generate_negative_data(num_samples: int) -> List[np.ndarray]:
    """Synthesise negative clips (random Gaussian noise plus silence).

    Used only as a fallback when no negative_dir is available.

    Args:
        num_samples: number of clips to create.

    Returns:
        List of int16 arrays of length CLIP_DURATION_SAMPLES.
    """
    clips: List[np.ndarray] = []
    for _ in tqdm(range(num_samples), desc="Negative(๋ธ์ด์ฆ) ์์ฑ ์ค"):
        # 80 % Gaussian noise of random amplitude, 20 % pure silence.
        if np.random.random() < 0.8:
            amplitude = np.random.uniform(100, 1000)
            clip = np.random.normal(0, amplitude, CLIP_DURATION_SAMPLES).astype(np.int16)
        else:
            clip = np.zeros(CLIP_DURATION_SAMPLES, dtype=np.int16)
        clips.append(clip)
    return clips
| |
|
| |
|
def extract_embeddings(
    audio_clips: List[np.ndarray],
    feature_extractor: AudioFeatures,
    use_gpu: bool = True,
) -> np.ndarray:
    """Compute openWakeWord embeddings for a list of audio clips.

    Args:
        audio_clips: equal-length int16 audio clips.
        feature_extractor: an openwakeword AudioFeatures instance.
        use_gpu: when True and CUDA is available, use a larger batch.

    Returns:
        Embedding array of shape (N, timesteps, 96).
    """
    # Stack the 1-D clips into a single (N, samples) matrix.
    clips_array = np.stack(audio_clips)

    # Larger batches on GPU, conservative batches on CPU.
    gpu_available = use_gpu and torch.cuda.is_available()
    batch_size = 128 if gpu_available else 32
    print(f"\n๐ Embedding ์ถ์ถ ์ค... (์ด {len(clips_array)}๊ฐ ํด๋ฆฝ, batch_size={batch_size})")

    embeddings = feature_extractor.embed_clips(clips_array, batch_size=batch_size)

    print(f" Embedding ํํ: {embeddings.shape}")

    return embeddings
| |
|
| |
|
| | def _resolve_openwakeword_resource_paths() -> Tuple[Optional[str], Optional[str]]: |
| | """ |
| | openwakeword ๊ธฐ๋ณธ ๋ฆฌ์์ค ๊ฒฝ๋ก๋ฅผ ์ฐพ๋๋ค. |
| | 1) ๋ก์ปฌ ๋ฆฌํฌ(openWakeWord/openwakeword/resources/models) |
| | 2) ํ์ฌ venv site-packages/openwakeword/resources/models |
| | """ |
| | local_models_dir = Path(__file__).parent.parent / "openWakeWord" / "openwakeword" / "resources" / "models" |
| | mel_local = local_models_dir / "melspectrogram.onnx" |
| | emb_local = local_models_dir / "embedding_model.onnx" |
| | if mel_local.exists() and emb_local.exists(): |
| | return str(mel_local), str(emb_local) |
| |
|
| | |
| | venv_base = Path(__file__).parent / ".venv" / "lib" |
| | candidates = sorted(glob.glob(str(venv_base / "python*" / "site-packages" / "openwakeword" / "resources" / "models"))) |
| | for c in candidates: |
| | cdir = Path(c) |
| | mel = cdir / "melspectrogram.onnx" |
| | emb = cdir / "embedding_model.onnx" |
| | if mel.exists() and emb.exists(): |
| | return str(mel), str(emb) |
| |
|
| | return None, None |
| |
|
| |
|
def _check_onnx_export_dependencies() -> None:
    """Fail fast when modules required for ONNX export are missing.

    Checking up front avoids wasting a full training run only to crash
    at the export step.  Exits the process (status 1) when 'onnx' or
    'onnxscript' cannot be imported.
    """
    required = ("onnx", "onnxscript")
    missing = [mod for mod in required if importlib.util.find_spec(mod) is None]

    if not missing:
        return

    print("\nโ ONNX export ์์กด์ฑ ๋๋ฝ:", ", ".join(missing))
    print(" ํ์ฌ Python:", sys.executable)
    print(" ์๋๋ฅผ ๋จผ์ ์คํํ์ธ์:")
    print(" python -m ensurepip --upgrade")
    print(" python -m pip install -U onnx onnxscript")
    sys.exit(1)
| |
|
| |
|
def train_model(
    positive_embeddings: np.ndarray,
    negative_embeddings: np.ndarray,
    epochs: int = 50,
    batch_size: int = 32,
    learning_rate: float = 0.001,
    use_gpu: bool = True,
) -> Tuple[WakeWordModel, dict]:
    """Train the wake-word classifier on pre-computed embeddings.

    Args:
        positive_embeddings: (N, T, F) embeddings of wake-word clips.
        negative_embeddings: (M, T, F) embeddings of non-wake-word clips.
        epochs: number of training epochs.
        batch_size: base batch size (scaled up automatically on GPU).
        learning_rate: Adam learning rate.
        use_gpu: train on CUDA when available.

    Returns:
        (model, history): the model with its best-validation-loss weights
        restored, and a dict with 'train_loss', 'val_loss' and
        'val_accuracy' lists (one entry per epoch).
    """
    # The classifier always consumes the last 16 embedding frames.
    n_timesteps = 16
    n_features = positive_embeddings.shape[-1]
    input_shape = (n_timesteps, n_features)

    print(f"\n{'='*60}")
    print(f"๋ชจ๋ธ ํ์ต ์์")
    print(f"Input shape: {input_shape}")
    print(f"Positive ์ํ: {len(positive_embeddings)}๊ฐ")
    print(f"Negative ์ํ: {len(negative_embeddings)}๊ฐ")
    print(f"{'='*60}\n")

    def prepare_features(embeddings: np.ndarray, n_timesteps: int = 16) -> np.ndarray:
        """Keep the trailing n_timesteps frames of each embedding,
        zero-padding at the front when a clip has fewer frames."""
        prepared = []
        for emb in embeddings:
            if emb.shape[0] >= n_timesteps:
                prepared.append(emb[-n_timesteps:])
            else:
                padded = np.zeros((n_timesteps, n_features))
                padded[-emb.shape[0]:] = emb
                prepared.append(padded)
        return np.array(prepared)

    X_pos = prepare_features(positive_embeddings, n_timesteps)
    X_neg = prepare_features(negative_embeddings, n_timesteps)

    # Combined dataset: label 1 for positives, 0 for negatives.
    X = np.vstack([X_pos, X_neg]).astype(np.float32)
    y = np.hstack([
        np.ones(len(X_pos)),
        np.zeros(len(X_neg))
    ]).astype(np.float32)

    # Shuffle, then hold out the last 20 % for validation.
    indices = np.random.permutation(len(X))
    X, y = X[indices], y[indices]

    split_idx = int(len(X) * 0.8)
    X_train, X_val = X[:split_idx], X[split_idx:]
    y_train, y_val = y[:split_idx], y[split_idx:]

    device = torch.device('cuda' if (use_gpu and torch.cuda.is_available()) else 'cpu')
    if device.type == 'cuda':
        print(f"๐ฅ๏ธ ํ์ต ๋๋ฐ์ด์ค: GPU ({torch.cuda.get_device_name(0)})")
        # BUGFIX: scale from the *batch_size* argument; it was previously
        # ignored in favour of the module-level BATCH_SIZE constant.
        batch_size = min(128, max(batch_size, len(X_train) // 16))
    else:
        print(f"๐ฅ๏ธ ํ์ต ๋๋ฐ์ด์ค: CPU")
        # BUGFIX: honour the caller's batch_size (was overwritten with
        # BATCH_SIZE).  The default (32) keeps the old behaviour.

    train_dataset = TensorDataset(
        torch.from_numpy(X_train),
        torch.from_numpy(y_train)
    )
    val_dataset = TensorDataset(
        torch.from_numpy(X_val),
        torch.from_numpy(y_val)
    )

    pin = device.type == 'cuda'  # pinned host memory speeds H2D copies
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, pin_memory=pin)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, pin_memory=pin)

    model = WakeWordModel(input_shape=input_shape, layer_dim=LAYER_DIM, n_blocks=N_BLOCKS)
    model = model.to(device)

    # The model ends in a Sigmoid, so plain BCE is the matching loss.
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    history = {
        'train_loss': [],
        'val_loss': [],
        'val_accuracy': []
    }

    best_val_loss = float('inf')
    best_model_state = None

    for epoch in range(epochs):
        # --- training pass ---
        model.train()
        train_losses = []

        for X_batch, y_batch in train_loader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device).unsqueeze(1)

            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()

            train_losses.append(loss.item())

        # --- validation pass ---
        model.eval()
        val_losses = []
        correct = 0
        total = 0

        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                X_batch = X_batch.to(device)
                y_batch = y_batch.to(device).unsqueeze(1)

                outputs = model(X_batch)
                loss = criterion(outputs, y_batch)
                val_losses.append(loss.item())

                predicted = (outputs > 0.5).float()
                total += y_batch.size(0)
                correct += (predicted == y_batch).sum().item()

        avg_train_loss = np.mean(train_losses)
        avg_val_loss = np.mean(val_losses)
        # Guard: a tiny dataset can leave the validation split empty.
        val_accuracy = correct / total if total else 0.0

        history['train_loss'].append(avg_train_loss)
        history['val_loss'].append(avg_val_loss)
        history['val_accuracy'].append(val_accuracy)

        # Track the best weights by validation loss.
        # BUGFIX: state_dict().copy() only copied the *dict*; the tensors
        # inside still aliased the live parameters, so the saved "best"
        # state silently tracked every later update and restoring it was
        # a no-op.  Clone each tensor to actually snapshot the epoch.
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            best_model_state = {k: v.detach().clone() for k, v in model.state_dict().items()}

        if (epoch + 1) % 10 == 0 or epoch == 0:
            print(f"Epoch [{epoch+1}/{epochs}] "
                  f"Train Loss: {avg_train_loss:.4f} | "
                  f"Val Loss: {avg_val_loss:.4f} | "
                  f"Val Acc: {val_accuracy:.4f}")

    # Restore the best-performing weights before returning.
    if best_model_state:
        model.load_state_dict(best_model_state)

    return model, history
| |
|
| |
|
def export_to_onnx(
    model: WakeWordModel,
    model_name: str,
    output_dir: str
) -> str:
    """Export a trained model to ONNX format.

    Args:
        model: trained PyTorch model (must expose .input_shape).
        model_name: base name for the output file and output tensor.
        output_dir: directory to write the .onnx file into.

    Returns:
        Path of the saved ONNX file.
    """
    # Export must happen on CPU with the model in eval mode.
    model.eval()
    model = model.to('cpu')

    # Dummy batch of one, matching the model's expected input shape.
    example_input = torch.randn(1, *model.input_shape)

    onnx_path = os.path.join(output_dir, f"{model_name}.onnx")

    torch.onnx.export(
        model,
        example_input,
        onnx_path,
        export_params=True,
        opset_version=13,
        do_constant_folding=True,
        input_names=['input'],
        output_names=[model_name],
        # Allow arbitrary batch sizes at inference time.
        dynamic_axes={
            'input': {0: 'batch_size'},
            model_name: {0: 'batch_size'}
        }
    )

    print(f"\nโ ONNX ๋ชจ๋ธ ์ ์ฅ ์๋ฃ: {onnx_path}")

    return onnx_path
| |
|
| |
|
def main():
    """CLI entry point: load data, extract embeddings, train, export.

    Pipeline:
      1. load positive (wake-word) clips
      2. prepare negatives (real speech dir, or synthetic-noise fallback)
      3. extract openWakeWord embeddings for both sets
      4. train the classifier
      5. export the trained model to ONNX
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="ํ๊ตญ์ด ํธ์ถ์ด ๋ชจ๋ธ ํ์ต๊ธฐ"
    )
    parser.add_argument(
        "--positive_dir", "-p",
        type=str,
        default=POSITIVE_DIR,
        help=f"Positive ๋ฐ์ดํฐ ๋๋ ํ ๋ฆฌ (๊ธฐ๋ณธ๊ฐ: {POSITIVE_DIR})"
    )
    parser.add_argument(
        "--model_name", "-m",
        type=str,
        default=MODEL_NAME,
        help=f"์ถ๋ ฅ ๋ชจ๋ธ ์ด๋ฆ (๊ธฐ๋ณธ๊ฐ: {MODEL_NAME})"
    )
    parser.add_argument(
        "--output_dir", "-o",
        type=str,
        default=OUTPUT_DIR,
        help=f"์ถ๋ ฅ ๋๋ ํ ๋ฆฌ (๊ธฐ๋ณธ๊ฐ: {OUTPUT_DIR})"
    )
    parser.add_argument(
        "--epochs", "-e",
        type=int,
        default=EPOCHS,
        help=f"ํ์ต ์ํญ ์ (๊ธฐ๋ณธ๊ฐ: {EPOCHS})"
    )
    parser.add_argument(
        "--negative_ratio",
        type=float,
        default=3.0,
        help="Positive ๋๋น Negative ์ํ ๋น์จ (๊ธฐ๋ณธ๊ฐ: 3.0)"
    )
    parser.add_argument(
        "--negative_dir", "-n",
        type=str,
        default=DEFAULT_NEGATIVE_DIR,
        help=f"์ผ๋ฐ ๋ํ ์์ฑ(negative) WAV ๋๋ ํ ๋ฆฌ. ๋น์ฐ๋ฉด ๋ธ์ด์ฆ๋ง ์ฌ์ฉ (๊ธฐ๋ณธ: {DEFAULT_NEGATIVE_DIR})"
    )
    parser.add_argument(
        "--no-gpu",
        action="store_true",
        help="GPU ๋นํ์ฑํ (CPU๋ง ์ฌ์ฉ)"
    )

    args = parser.parse_args()

    # Fail fast on missing ONNX-export deps before spending time training.
    _check_onnx_export_dependencies()

    # argparse stores "--no-gpu" as args.no_gpu; getattr is a defensive guard.
    use_gpu_flag = torch.cuda.is_available() and not getattr(args, 'no_gpu', False)

    # Make sure the export destination exists.
    Path(args.output_dir).mkdir(parents=True, exist_ok=True)

    print("\n" + "="*60)
    print("๐ค ํ๊ตญ์ด ํธ์ถ์ด ๋ชจ๋ธ ํ์ต ํ์ดํ๋ผ์ธ")
    print("="*60)

    # Step 1: positive clips are mandatory — abort when none are found.
    print("\n[Step 1/5] Positive ๋ฐ์ดํฐ ๋ก๋")
    positive_clips = load_audio_files(args.positive_dir)

    if len(positive_clips) == 0:
        print("โ Positive ๋ฐ์ดํฐ๊ฐ ์์ต๋๋ค! ๋จผ์ generate_data.py๋ฅผ ์คํํ์ธ์.")
        sys.exit(1)

    print(f" โ {len(positive_clips)}๊ฐ Positive ํด๋ฆฝ ๋ก๋ ์๋ฃ")

    # Step 2: prefer real speech negatives; fall back to synthetic noise
    # when the directory is empty/missing.
    print("\n[Step 2/5] Negative ๋ฐ์ดํฐ ์ค๋น")
    num_negative = int(len(positive_clips) * args.negative_ratio)
    negative_dir = (args.negative_dir or "").strip()
    if negative_dir and Path(negative_dir).is_dir() and list(Path(negative_dir).glob("*.wav")):
        negative_clips = load_negative_from_dir(negative_dir, num_negative)
        print(f" โ {len(negative_clips)}๊ฐ Negative ํด๋ฆฝ ๋ก๋ (์ผ๋ฐ ๋ํ ์์ฑ: {negative_dir})")
    else:
        negative_clips = generate_negative_data(num_negative)
        print(f" โ {len(negative_clips)}๊ฐ Negative ํด๋ฆฝ ์์ฑ (๋ธ์ด์ฆ)")

    # Step 3: build the feature extractor, preferring explicitly resolved
    # bundled model paths over openwakeword's own defaults.
    print("\n[Step 3/5] openWakeWord Embedding ์ถ์ถ")
    feat_device = 'gpu' if use_gpu_flag else 'cpu'
    if use_gpu_flag:
        print(f" ๐ฅ๏ธ GPU ์ฌ์ฉ: {torch.cuda.get_device_name(0)}")
    else:
        print(" ๐ฅ๏ธ CPU ์ฌ์ฉ")
    melspec_model_path, embedding_model_path = _resolve_openwakeword_resource_paths()
    if melspec_model_path and embedding_model_path:
        feature_extractor = AudioFeatures(
            inference_framework='onnx',
            device=feat_device,
            melspec_model_path=melspec_model_path,
            embedding_model_path=embedding_model_path,
        )
    else:
        # Fall back to whatever openwakeword resolves internally.
        feature_extractor = AudioFeatures(inference_framework='onnx', device=feat_device)

    positive_embeddings = extract_embeddings(positive_clips, feature_extractor, use_gpu=use_gpu_flag)
    negative_embeddings = extract_embeddings(negative_clips, feature_extractor, use_gpu=use_gpu_flag)

    print(f" โ Positive embeddings: {positive_embeddings.shape}")
    print(f" โ Negative embeddings: {negative_embeddings.shape}")

    # Step 4: train the DNN classifier on the embeddings.
    print("\n[Step 4/5] ๋ชจ๋ธ ํ์ต")
    model, history = train_model(
        positive_embeddings,
        negative_embeddings,
        epochs=args.epochs,
        batch_size=BATCH_SIZE,
        learning_rate=LEARNING_RATE,
        use_gpu=use_gpu_flag,
    )

    # Step 5: export the trained model for inference.
    print("\n[Step 5/5] ONNX ๋ชจ๋ธ ๋ด๋ณด๋ด๊ธฐ")
    onnx_path = export_to_onnx(model, args.model_name, args.output_dir)

    # Final summary for the user.
    print("\n" + "="*60)
    print("๐ ํ์ต ์๋ฃ!")
    print("="*60)
    print(f"๐ ONNX ๋ชจ๋ธ: {onnx_path}")
    print(f"๐ ์ต์ข Validation Accuracy: {history['val_accuracy'][-1]:.4f}")
    print(f"\n๐ก ์ค์๊ฐ ์ถ๋ก ์ ์คํํ๋ ค๋ฉด:")
    print(f" python run_live.py --model {onnx_path}")
| |
|
| |
|
# Script entry point: run the full training pipeline.
if __name__ == "__main__":
    main()
| |
|