Spaces:
Running
on
A10G
Running
on
A10G
| """ | |
| Audio Encoder for MagicPath Server | |
| =================================== | |
| Extracts feature vectors from audio files using the CLAP model. | |
| Uses the same encoder as DiffVox LLM. | |
| """ | |
| import torch | |
| import numpy as np | |
| from typing import List, Optional | |
| import warnings | |
| warnings.filterwarnings("ignore") | |
class AudioEncoder:
    """CLAP-based encoder that turns audio files or text into feature vectors.

    The CLAP model and processor are loaded once in the constructor. If
    loading fails, the failure is printed and ``self.model`` stays ``None``;
    the feature-extraction methods then return an empty list instead of
    raising.
    """

    # Sample rate used both when decoding audio with librosa and when
    # calling the CLAP processor, so the two always agree.
    SAMPLE_RATE = 48000

    def __init__(
        self,
        output_dim: int = 64,
        reduction_method: str = "pool",
        model_name: str = "laion/larger_clap_general",
    ):
        """Initialise the encoder and immediately try to load the CLAP model.

        Args:
            output_dim: Length of the returned feature vectors (default 64).
            reduction_method: How the CLAP embedding is resized to
                ``output_dim``. ``"pool"`` averages contiguous chunks,
                ``"linear"`` applies a ``torch.nn.Linear`` layer created in
                ``_load_model`` (no weights are loaded for it in this class).
                Any other value, including ``"pca"``, has no dedicated
                implementation and falls back to truncation / zero-padding.
            model_name: Name passed to ``from_pretrained`` for the CLAP
                model and processor.
        """
        self.output_dim = output_dim
        self.reduction_method = reduction_method
        self.model_name = model_name
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = None
        self.processor = None
        self.projection = None
        self._load_model()

    def _load_model(self):
        """Load the CLAP processor/model and, if requested, the projection.

        Failures are reported with ``print`` and never raised. On any
        failure the encoder is left in the "not loaded" state
        (``model``, ``processor`` and ``projection`` all ``None``).
        """
        try:
            # Imported lazily so the module can be imported without
            # transformers installed.
            from transformers import ClapModel, ClapProcessor

            print(f"[AudioEncoder] CLAP ๋ชจ๋ธ ๋ก๋ฉ ์ค: {self.model_name}")
            self.processor = ClapProcessor.from_pretrained(self.model_name)
            self.model = ClapModel.from_pretrained(self.model_name)
            self.model = self.model.to(self.device)
            self.model.eval()

            # Width of the embeddings produced by the CLAP projection head.
            clap_dim = self.model.config.projection_dim
            print(f"[AudioEncoder] CLAP ์ถ๋ ฅ ์ฐจ์: {clap_dim}")

            # A projection layer is only needed for the "linear" method and
            # only when the widths actually differ.
            if self.reduction_method == "linear" and clap_dim != self.output_dim:
                self.projection = torch.nn.Linear(clap_dim, self.output_dim)
                self.projection = self.projection.to(self.device)
                print(f"[AudioEncoder] Linear projection: {clap_dim} โ {self.output_dim}")
            print("[AudioEncoder] โ ๋ชจ๋ธ ๋ก๋ ์๋ฃ")
        except ImportError:
            self._reset_model_state()
            print("[AudioEncoder] โ transformers ๋ฏธ์ค์น")
            print(" pip install transformers")
        except Exception as e:
            # A failure after the model object was assigned (e.g. moving it
            # to the device) must not leave a half-initialised model behind,
            # because callers use ``self.model is None`` as the "usable" test.
            self._reset_model_state()
            print(f"[AudioEncoder] โ ๋ชจ๋ธ ๋ก๋ ์คํจ: {e}")

    def _reset_model_state(self):
        """Put the encoder back into the 'nothing loaded' state."""
        self.model = None
        self.processor = None
        self.projection = None

    def get_audio_features(self, audio_path: str) -> List[float]:
        """Extract a feature vector from an audio file.

        Args:
            audio_path: Path of the audio file to encode.

        Returns:
            The reduced feature vector as a list of floats, or an empty list
            when the model is not loaded or extraction fails (the error and
            its traceback are printed).
        """
        if self.model is None:
            print("[AudioEncoder] ๋ชจ๋ธ์ด ๋ก๋๋์ง ์์")
            return []
        try:
            import librosa

            # Decode to mono at the rate that is also given to the processor.
            audio, _ = librosa.load(audio_path, sr=self.SAMPLE_RATE, mono=True)
            inputs = self.processor(
                audios=audio,
                sampling_rate=self.SAMPLE_RATE,
                return_tensors="pt",
            )
            return self._encode(inputs, self.model.get_audio_features)
        except Exception as e:
            print(f"[AudioEncoder] ํน์ง ์ถ์ถ ์คํจ: {e}")
            import traceback

            traceback.print_exc()
            return []

    def _encode(self, inputs, encode_fn) -> List[float]:
        """Run one CLAP encoder on processor output and reduce the result.

        Args:
            inputs: Mapping of tensors returned by the processor.
            encode_fn: ``self.model.get_audio_features`` or
                ``self.model.get_text_features``.

        Returns:
            The reduced feature vector as a plain list.
        """
        on_device = {k: v.to(self.device) for k, v in inputs.items()}
        with torch.no_grad():
            embedding = encode_fn(**on_device)
        # Drop singleton dimensions and move to NumPy on the CPU.
        features = embedding.squeeze().cpu().numpy()
        return self._reduce_dimension(features).tolist()

    def _zero_pad(self, features: np.ndarray) -> np.ndarray:
        """Return ``features`` right-padded with zeros to ``output_dim``."""
        padded = np.zeros(self.output_dim, dtype=features.dtype)
        padded[: len(features)] = features
        return padded

    def _reduce_dimension(self, features: np.ndarray) -> np.ndarray:
        """Resize a feature vector to ``self.output_dim`` entries.

        Args:
            features: Feature vector to resize.

        Returns:
            ``features`` unchanged when it already has ``output_dim``
            entries. Otherwise: for ``"pool"`` the mean of ``output_dim``
            contiguous chunks (or zero-padding when the input is shorter);
            for ``"linear"`` with a projection layer, the projected vector;
            for anything else the first ``output_dim`` entries, zero-padded
            when the input is shorter.
        """
        # squeeze() upstream can yield a 0-d array; len() needs >= 1-d.
        features = np.atleast_1d(features)
        current_dim = len(features)
        if current_dim == self.output_dim:
            return features

        if self.reduction_method == "pool":
            if current_dim < self.output_dim:
                return self._zero_pad(features)
            # array_split gives the first (current_dim % output_dim) chunks
            # one extra element, so every input value is used exactly once.
            chunks = np.array_split(features, self.output_dim)
            return np.array([chunk.mean() for chunk in chunks])

        if self.reduction_method == "linear" and self.projection is not None:
            with torch.no_grad():
                features_tensor = torch.tensor(features, dtype=torch.float32).to(self.device)
                projected = self.projection(features_tensor)
            return projected.cpu().numpy()

        # Fallback for methods without a dedicated implementation: keep the
        # leading entries, and pad so the result is still output_dim long.
        if current_dim < self.output_dim:
            return self._zero_pad(features)
        return features[: self.output_dim]

    def get_text_features(self, text: str) -> List[float]:
        """Extract a feature vector from text with the CLAP text encoder.

        Args:
            text: Input text.

        Returns:
            The reduced feature vector as a list of floats, or an empty list
            when the model is not loaded or extraction fails (the error is
            printed).
        """
        if self.model is None:
            return []
        try:
            inputs = self.processor(
                text=text,
                return_tensors="pt",
                padding=True,
            )
            return self._encode(inputs, self.model.get_text_features)
        except Exception as e:
            print(f"[AudioEncoder] ํ ์คํธ ํน์ง ์ถ์ถ ์คํจ: {e}")
            return []