""" AI Effector - DiffVox LLM 기반 이펙트 파라미터 예측 =================================================== V9: Compressor threshold 범위 수정 (0 ~ -5dB) """ import os import json import re import math import torch import numpy as np from typing import Dict, List, Optional, Any, Tuple from pathlib import Path from datetime import datetime import warnings warnings.filterwarnings("ignore") def sigmoid(x: float) -> float: try: return 1 / (1 + math.exp(-x)) except OverflowError: return 0.0 if x < 0 else 1.0 def minmax_transform(raw: float, min_val: float, max_val: float) -> float: return sigmoid(raw) * (max_val - min_val) + min_val PARAM_TRANSFORMS = { "eq_peak1.params.freq": {"type": "minmax", "min": 33.0, "max": 17500.0}, "eq_peak1.params.Q": {"type": "minmax", "min": 0.2, "max": 20.0}, "eq_peak1.params.gain": {"type": "none"}, "eq_peak2.params.freq": {"type": "minmax", "min": 33.0, "max": 17500.0}, "eq_peak2.params.Q": {"type": "minmax", "min": 0.2, "max": 20.0}, "eq_peak2.params.gain": {"type": "none"}, "eq_lowshelf.params.freq": {"type": "minmax", "min": 30.0, "max": 200.0}, "eq_lowshelf.params.gain": {"type": "none"}, "eq_highshelf.params.freq": {"type": "minmax", "min": 2500.0, "max": 16000.0}, "eq_highshelf.params.gain": {"type": "none"}, "delay.delay_time": {"type": "none"}, "delay.feedback": {"type": "sigmoid"}, "delay.mix": {"type": "sigmoid"}, "distortion_amount": {"type": "sigmoid_scale", "scale": 0.1}, "final_wet_mix": {"type": "sigmoid"}, } DEFAULT_PARAMETERS = { "eq_peak1.params.freq": 1000.0, "eq_peak1.params.gain": 0.0, "eq_peak1.params.Q": 1.0, "eq_peak2.params.freq": 4000.0, "eq_peak2.params.gain": 0.0, "eq_peak2.params.Q": 1.0, "eq_lowshelf.params.freq": 115.0, "eq_lowshelf.params.gain": 0.0, "eq_highshelf.params.freq": 8000.0, "eq_highshelf.params.gain": 0.0, # V9: Compressor threshold 기본값 -3dB "compressor.threshold": -3.0, "compressor.ratio": 2.0, "distortion_amount": 0.0, "delay.delay_time": 0.02, "delay.feedback": 0.15, "delay.mix": 0.1, "reverb.room_size": 0.3, "reverb.damping": 0.5, "reverb.wet_level": 0.0, "reverb.dry_level": 1.0, "final_wet_mix": 0.5 } # V9: Compressor threshold 범위 0 ~ -5dB PARAM_RANGES = { "eq_peak1.params.freq": (33.0, 17500.0), "eq_peak1.params.gain": (-12.0, 12.0), "eq_peak1.params.Q": (0.2, 20.0), "eq_peak2.params.freq": (33.0, 17500.0), "eq_peak2.params.gain": (-12.0, 12.0), "eq_peak2.params.Q": (0.2, 20.0), "eq_lowshelf.params.freq": (30.0, 200.0), "eq_lowshelf.params.gain": (-12.0, 12.0), "eq_highshelf.params.freq": (2500.0, 16000.0), "eq_highshelf.params.gain": (-12.0, 12.0), # V9: 0 ~ -5dB (가벼운 압축) "compressor.threshold": (-5.0, 0.0), "compressor.ratio": (1.5, 4.0), "distortion_amount": (0.0, 0.05), "delay.delay_time": (0.01, 0.3), "delay.feedback": (0.0, 0.25), "delay.mix": (0.0, 0.2), "reverb.room_size": (0.0, 0.6), "reverb.damping": (0.0, 1.0), "reverb.wet_level": (0.0, 0.3), "reverb.dry_level": (0.7, 1.0), "final_wet_mix": (0.3, 0.7), } SYNONYM_MAP = { "calm": "warm soft", "relaxed": "warm soft", "chill": "warm soft", "smooth": "warm", "mellow": "warm soft", "breezy": "bright spacious", "airy": "bright spacious", "light": "bright", "crisp": "bright", "clean": "bright", "dreamy": "warm spacious", "ethereal": "bright spacious", "atmospheric": "spacious", "ambient": "spacious warm", "aggressive": "saturated bright", "powerful": "saturated", "punchy": "saturated bright", "hard": "saturated", "gritty": "saturated dark", "soft": "warm", "harsh": "bright saturated", "muddy": "dark", "thin": "bright", "thick": "warm dark", "full": "warm", "reverb": "spacious", "echo": "spacious", "wet": "spacious", } # V9: Compressor threshold 0 ~ -5dB 범위 STYLE_PRESETS = { "warm": { "compressor.threshold": -3.0, "compressor.ratio": 2.0, "delay.delay_time": 0.02, "delay.feedback": 0.12, "delay.mix": 0.08, "reverb.room_size": 0.25, "reverb.wet_level": 0.1, "reverb.dry_level": 0.9, }, "bright": { "compressor.threshold": -2.0, "compressor.ratio": 2.0, "delay.delay_time": 0.02, "delay.feedback": 0.1, "delay.mix": 0.06, "reverb.room_size": 0.2, "reverb.wet_level": 0.08, "reverb.dry_level": 0.92, }, "spacious": { "compressor.threshold": -4.0, "compressor.ratio": 1.8, "delay.delay_time": 0.06, "delay.feedback": 0.2, "delay.mix": 0.15, "reverb.room_size": 0.45, "reverb.wet_level": 0.2, "reverb.dry_level": 0.8, }, "dark": { "compressor.threshold": -4.0, "compressor.ratio": 2.0, "delay.delay_time": 0.03, "delay.feedback": 0.15, "delay.mix": 0.1, "reverb.room_size": 0.35, "reverb.wet_level": 0.15, "reverb.dry_level": 0.85, }, "saturated": { "compressor.threshold": -2.0, "compressor.ratio": 3.0, "delay.delay_time": 0.02, "delay.feedback": 0.08, "delay.mix": 0.05, "reverb.room_size": 0.15, "reverb.wet_level": 0.06, "reverb.dry_level": 0.94, }, "soft": { "compressor.threshold": -5.0, "compressor.ratio": 1.5, "delay.delay_time": 0.025, "delay.feedback": 0.15, "delay.mix": 0.1, "reverb.room_size": 0.3, "reverb.wet_level": 0.12, "reverb.dry_level": 0.88, }, } class CLAPAudioEncoder: def __init__(self, output_dim: int = 64, model_name: str = "laion/larger_clap_music"): self.output_dim = output_dim self.model_name = model_name self.target_sr = 48000 self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") self.model = None self.processor = None self._load_model() def _load_model(self): try: from transformers import ClapModel, ClapProcessor print(f"[CLAPEncoder] CLAP 모델 로딩 중: {self.model_name}") self.processor = ClapProcessor.from_pretrained(self.model_name) self.model = ClapModel.from_pretrained(self.model_name) self.model = self.model.to(self.device) self.model.eval() print(f"[CLAPEncoder] ✅ CLAP 모델 로드 완료") except Exception as e: print(f"[CLAPEncoder] ❌ 모델 로드 실패: {e}") def get_audio_features(self, audio_path: str) -> List[float]: if self.model is None: return [0.0] * self.output_dim try: import librosa audio, sr = librosa.load(audio_path, sr=self.target_sr, mono=True) inputs = self.processor(audios=audio, sampling_rate=self.target_sr, return_tensors="pt", padding=True).to(self.device) with torch.no_grad(): outputs = self.model.get_audio_features(**inputs) features_512 = outputs[0].cpu().numpy() return self._reduce_dimension(features_512).tolist() except Exception as e: print(f"[CLAPEncoder] 특징 추출 실패: {e}") return [0.0] * self.output_dim def _reduce_dimension(self, features: np.ndarray) -> np.ndarray: current_dim = len(features) if current_dim == self.output_dim: return features pool_size = current_dim // self.output_dim remainder = current_dim % self.output_dim pooled = [] idx = 0 for i in range(self.output_dim): size = pool_size + (1 if i < remainder else 0) pooled.append(np.mean(features[idx:idx+size])) idx += size return np.array(pooled) def is_loaded(self) -> bool: return self.model is not None class AIEffector: def __init__(self, model_repo_id: str = "heybaeheef/KU_SW_Academy", model_subfolder: str = "checkpoints", base_model_name: str = "Qwen/Qwen3-8B", audio_feature_dim: int = 64, use_huggingface: bool = True): self.model_repo_id = model_repo_id self.model_subfolder = model_subfolder self.base_model_name = base_model_name self.audio_feature_dim = audio_feature_dim self.use_huggingface = use_huggingface self.model = None self.tokenizer = None self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") print(f"[AIEffector V9] CLAP 인코더 초기화...") self.audio_encoder = CLAPAudioEncoder(output_dim=audio_feature_dim) self.request_count = 0 self._load_model() def _load_model(self): try: from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig from peft import PeftModel print(f"[AIEffector] 베이스 모델 로딩: {self.base_model_name}") if torch.cuda.is_available(): bnb_config = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.float16, bnb_4bit_use_double_quant=True) base_model = AutoModelForCausalLM.from_pretrained(self.base_model_name, quantization_config=bnb_config, device_map="auto", trust_remote_code=True) else: base_model = AutoModelForCausalLM.from_pretrained(self.base_model_name, torch_dtype=torch.float32, device_map="auto", trust_remote_code=True) self.tokenizer = AutoTokenizer.from_pretrained(self.base_model_name, trust_remote_code=True) if self.tokenizer.pad_token is None: self.tokenizer.pad_token = self.tokenizer.eos_token print(f"[AIEffector] LoRA 어댑터 로딩...") if self.use_huggingface: self.model = PeftModel.from_pretrained(base_model, self.model_repo_id, subfolder=self.model_subfolder, torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32) else: local_path = os.path.join(self.model_repo_id, self.model_subfolder) self.model = PeftModel.from_pretrained(base_model, local_path, torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32) self.model.eval() print(f"[AIEffector] ✅ 모델 로드 성공!") except Exception as e: print(f"[AIEffector] ❌ 모델 로드 실패: {e}") import traceback traceback.print_exc() self.model = None self.tokenizer = None def is_loaded(self) -> bool: return self.model is not None def _preprocess_text(self, text: str) -> str: text_lower = text.lower() for synonym, replacement in SYNONYM_MAP.items(): if synonym in text_lower: text_lower = text_lower.replace(synonym, replacement) return text_lower def _apply_preset(self, prompt: str) -> Dict[str, float]: params = {} prompt_lower = prompt.lower() matched = [] for style_name, style_params in STYLE_PRESETS.items(): if style_name in prompt_lower: params.update(style_params) matched.append(style_name) if matched: print(f" [Preset] 매칭: {matched}") else: params.update(STYLE_PRESETS["warm"]) print(f" [Preset] 기본값 적용: warm") return params def _format_prompt(self, text_prompt: str, audio_features: List[float]) -> str: audio_state_str = json.dumps(audio_features) return f"""Task: Convert text to audio parameters. Audio: {audio_state_str} Text: {text_prompt} Parameters:""" def _preprocess_json(self, json_str: str) -> str: json_str = re.sub(r'(\d)_(\d)', r'\1\2', json_str) json_str = re.sub(r',(\s*[}\]])', r'\1', json_str) json_str = re.sub(r'\bNaN\b', '0', json_str) json_str = re.sub(r'\bInfinity\b', '999999', json_str) json_str = re.sub(r'-Infinity\b', '-999999', json_str) return json_str def _normalize_key(self, key: str) -> str: return re.sub(r'\.parametrizations\.(\w+)\.original', r'.\1', key) def _extract_json_object(self, text: str) -> Optional[str]: start = text.find('{') if start == -1: return None depth = 0 for i, char in enumerate(text[start:], start): if char == '{': depth += 1 elif char == '}': depth -= 1 if depth == 0: return text[start:i+1] return None def _convert_raw_to_actual(self, params: Dict[str, float]) -> Dict[str, float]: result = params.copy() for key, transform in PARAM_TRANSFORMS.items(): if key not in result: continue raw = result[key] transform_type = transform["type"] if transform_type == "none": actual = raw elif transform_type == "minmax": actual = minmax_transform(raw, transform["min"], transform["max"]) print(f" [MinMax] {key}: {raw:.4f} → {actual:.2f}") elif transform_type == "sigmoid": actual = sigmoid(raw) print(f" [Sigmoid] {key}: {raw:.4f} → {actual:.4f}") elif transform_type == "sigmoid_scale": actual = sigmoid(raw) * transform["scale"] print(f" [Sigmoid*{transform['scale']}] {key}: {raw:.4f} → {actual:.4f}") else: actual = raw result[key] = actual return result def _parse_output(self, output_text: str) -> Dict[str, float]: print(f" [Parse] Raw output 길이: {len(output_text)} 문자") try: text = re.sub(r'.*?', '', output_text, flags=re.DOTALL) code_match = re.search(r'```(?:json)?\s*([\s\S]*?)```', text) if code_match: text = code_match.group(1) json_str = self._extract_json_object(text) if json_str: print(f" [Parse] JSON 발견 (길이: {len(json_str)})") json_str = self._preprocess_json(json_str) raw_params = json.loads(json_str) result = DEFAULT_PARAMETERS.copy() parsed_count = 0 for key, value in raw_params.items(): try: norm_key = self._normalize_key(key) float_val = float(value) if norm_key in DEFAULT_PARAMETERS: result[norm_key] = float_val parsed_count += 1 else: for default_key in DEFAULT_PARAMETERS.keys(): norm_parts = norm_key.split('.') default_parts = default_key.split('.') if len(norm_parts) >= 3 and len(default_parts) >= 3: if norm_parts[0] == default_parts[0] and norm_parts[-1] == default_parts[-1]: result[default_key] = float_val parsed_count += 1 break except (ValueError, TypeError): pass print(f" [Parse] ✅ {parsed_count}개 파라미터 매핑됨") return result except json.JSONDecodeError as e: print(f" [Parse] ❌ JSON 에러: {e}") except Exception as e: print(f" [Parse] ❌ 예외: {e}") print(f" [Parse] ⚠️ 기본값 폴백") return DEFAULT_PARAMETERS.copy() def predict(self, audio_path: str, text_prompt: str = "") -> Dict[str, float]: self.request_count += 1 timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"\n{'='*60}") print(f"[AIEffector V9] 🎵 요청 #{self.request_count} - {timestamp}") print(f"{'='*60}") print(f" 📂 오디오: {Path(audio_path).name}") print(f" 💬 원본: '{text_prompt}'") processed_prompt = self._preprocess_text(text_prompt) print(f" 🤖 모델: {'AI' if self.is_loaded() else '프리셋'}") if not self.is_loaded(): print(f"\n ⚠️ AI 모델 미로드") params = DEFAULT_PARAMETERS.copy() params.update(self._apply_preset(processed_prompt)) self._log_parameters(params) return self._convert_to_effect_chain_format(params) try: print(f"\n 📊 [Step 1] CLAP 특징 추출...") audio_features = self.audio_encoder.get_audio_features(audio_path) if not audio_features or all(f == 0 for f in audio_features): print(f" ⚠️ 실패, 프리셋 폴백") params = DEFAULT_PARAMETERS.copy() params.update(self._apply_preset(processed_prompt)) self._log_parameters(params) return self._convert_to_effect_chain_format(params) print(f" ✅ {len(audio_features)}차원") print(f"\n 🔤 [Step 2] 프롬프트 생성...") prompt = self._format_prompt(processed_prompt, audio_features) print(f"\n 🔢 [Step 3] 토큰화...") inputs = self.tokenizer(prompt, return_tensors="pt", truncation=False).to(self.device) print(f" 토큰 수: {inputs['input_ids'].shape[1]}") print(f"\n 🧠 [Step 4] LLM 추론...") import time start = time.time() with torch.no_grad(): outputs = self.model.generate(**inputs, max_new_tokens=500, do_sample=False, temperature=0.1, pad_token_id=self.tokenizer.pad_token_id, eos_token_id=self.tokenizer.eos_token_id) print(f" 추론 시간: {time.time()-start:.2f}초") print(f"\n 📝 [Step 5] 디코딩...") gen_tokens = outputs[0][inputs['input_ids'].shape[1]:] output_text = self.tokenizer.decode(gen_tokens, skip_special_tokens=True).strip() print(f" 출력 (처음 500자):\n{output_text[:500]}") print(f"\n 🔧 [Step 6] 파싱...") raw_params = self._parse_output(output_text) print(f"\n 🔄 [Step 7] Raw → Actual 변환...") actual_params = self._convert_raw_to_actual(raw_params) print(f"\n 📐 [Step 8] 값 클램핑 (EQ만)...") eq_keys = [k for k in PARAM_RANGES.keys() if k.startswith('eq_')] for key in eq_keys: if key in actual_params: min_val, max_val = PARAM_RANGES[key] original = actual_params[key] clamped = max(min_val, min(max_val, original)) if abs(clamped - original) > 0.001: print(f" [Clamp] {key}: {original:.4f} → {clamped:.4f}") actual_params[key] = clamped print(f"\n 🎛️ [Step 9] 프리셋 적용 (Compressor/Reverb/Delay)...") preset = self._apply_preset(processed_prompt) for key in preset: actual_params[key] = preset[key] print(f" {key}: {preset[key]}") actual_params["final_wet_mix"] = max(0.3, min(0.7, actual_params.get("final_wet_mix", 0.5))) print(f" final_wet_mix: {actual_params['final_wet_mix']:.2f}") self._log_parameters(actual_params) print(f"\n ✅ 완료!") print(f"{'='*60}\n") return self._convert_to_effect_chain_format(actual_params) except Exception as e: print(f"\n ❌ 실패: {e}") import traceback traceback.print_exc() params = DEFAULT_PARAMETERS.copy() params.update(self._apply_preset(processed_prompt)) self._log_parameters(params) return self._convert_to_effect_chain_format(params) def _convert_to_effect_chain_format(self, params: Dict[str, float]) -> Dict[str, float]: result = {} for key, value in params.items(): new_key = key.replace('.Q', '.q') result[new_key] = value return result def _log_parameters(self, params: Dict[str, float]): print(f"\n 📋 최종 파라미터:") print(f" [EQ Peak 1] freq={params.get('eq_peak1.params.freq',0):.0f}Hz, gain={params.get('eq_peak1.params.gain',0):.2f}dB, Q={params.get('eq_peak1.params.Q',0):.2f}") print(f" [EQ Peak 2] freq={params.get('eq_peak2.params.freq',0):.0f}Hz, gain={params.get('eq_peak2.params.gain',0):.2f}dB, Q={params.get('eq_peak2.params.Q',0):.2f}") print(f" [Low Shelf] freq={params.get('eq_lowshelf.params.freq',0):.0f}Hz, gain={params.get('eq_lowshelf.params.gain',0):.2f}dB") print(f" [High Shelf] freq={params.get('eq_highshelf.params.freq',0):.0f}Hz, gain={params.get('eq_highshelf.params.gain',0):.2f}dB") print(f" [Compressor] threshold={params.get('compressor.threshold',-3):.1f}dB, ratio={params.get('compressor.ratio',2):.1f}") print(f" [Distortion] {params.get('distortion_amount',0):.4f}") print(f" [Delay] time={params.get('delay.delay_time',0):.3f}s, fb={params.get('delay.feedback',0):.2f}, mix={params.get('delay.mix',0):.2f}") print(f" [Reverb] room={params.get('reverb.room_size',0):.2f}, damp={params.get('reverb.damping',0):.2f}, wet={params.get('reverb.wet_level',0):.2f}, dry={params.get('reverb.dry_level',1):.2f}") print(f" [Wet Mix] {params.get('final_wet_mix',0):.2f}")