"""
AI Effector - DiffVox LLM 기반 이펙트 파라미터 예측
===================================================
V2: 학습과 동일한 CLAP 인코더 + 프롬프트 형식 사용
"""
import os
import json
import re
import torch
import numpy as np
from typing import Dict, List, Optional, Any
from pathlib import Path
from datetime import datetime
import warnings
warnings.filterwarnings("ignore")
# 기본 파라미터 (모델 로드 실패 시 사용)
DEFAULT_PARAMETERS = {
"eq_peak1.params.freq": 1000.0,
"eq_peak1.params.gain": 0.0,
"eq_peak1.params.Q": 1.0, # 대문자 Q (학습 데이터와 일치)
"eq_peak2.params.freq": 4000.0,
"eq_peak2.params.gain": 0.0,
"eq_peak2.params.Q": 1.0,
"eq_lowshelf.params.freq": 200.0,
"eq_lowshelf.params.gain": 0.0,
"eq_highshelf.params.freq": 8000.0,
"eq_highshelf.params.gain": 0.0,
"distortion_amount": 0.0,
"delay.delay_time": 0.02,
"delay.feedback": 0.3,
"delay.mix": 0.2,
"final_wet_mix": 0.5
}
# 스타일 프리셋 (AI 없이도 작동)
STYLE_PRESETS = {
"warm": {
"eq_lowshelf.params.gain": 3.0,
"eq_highshelf.params.gain": -1.0,
"distortion_amount": 0.05,
},
"bright": {
"eq_highshelf.params.gain": 4.0,
"eq_peak2.params.gain": 2.0,
"eq_lowshelf.params.gain": -1.0,
},
"vintage": {
"eq_lowshelf.params.gain": 2.0,
"eq_highshelf.params.gain": -2.0,
"distortion_amount": 0.1,
"delay.mix": 0.15,
},
"modern": {
"eq_peak1.params.gain": 2.0,
"eq_peak2.params.gain": 3.0,
"eq_highshelf.params.gain": 2.0,
},
"spacious": {
"delay.delay_time": 0.05,
"delay.feedback": 0.4,
"delay.mix": 0.35,
},
"dry": {
"final_wet_mix": 0.2,
"delay.mix": 0.0,
},
"saturated": {
"distortion_amount": 0.15,
"eq_lowshelf.params.gain": 1.0,
}
}
class CLAPAudioEncoder:
"""
CLAP 기반 오디오 인코더 (학습 시와 동일)
laion/larger_clap_music 모델 사용, 512→64 pooling
"""
def __init__(self, output_dim: int = 64, model_name: str = "laion/larger_clap_music"):
self.output_dim = output_dim
self.model_name = model_name
self.target_sr = 48000 # CLAP은 48kHz 사용
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model = None
self.processor = None
self._load_model()
def _load_model(self):
"""CLAP 모델 로드"""
try:
from transformers import ClapModel, ClapProcessor
print(f"[CLAPEncoder] CLAP 모델 로딩 중: {self.model_name}")
self.processor = ClapProcessor.from_pretrained(self.model_name)
self.model = ClapModel.from_pretrained(self.model_name)
self.model = self.model.to(self.device)
self.model.eval()
print(f"[CLAPEncoder] ✅ CLAP 모델 로드 완료 (512→{self.output_dim} pooling)")
except ImportError:
print("[CLAPEncoder] ❌ transformers 미설치")
print(" pip install transformers")
except Exception as e:
print(f"[CLAPEncoder] ❌ 모델 로드 실패: {e}")
import traceback
traceback.print_exc()
def get_audio_features(self, audio_path: str) -> List[float]:
"""
오디오 파일에서 64차원 특징 벡터 추출 (학습과 동일한 방식)
"""
if self.model is None:
print("[CLAPEncoder] 모델이 로드되지 않음, 빈 특징 반환")
return [0.0] * self.output_dim
try:
import librosa
# 1. 오디오 로드 (48kHz로 리샘플링 - CLAP 요구사항)
audio, sr = librosa.load(audio_path, sr=self.target_sr, mono=True)
# 2. CLAP 입력 준비
inputs = self.processor(
audios=audio,
sampling_rate=self.target_sr,
return_tensors="pt",
padding=True
).to(self.device)
# 3. 특징 추출
with torch.no_grad():
outputs = self.model.get_audio_features(**inputs)
# [1, 512] 형태의 텐서
features_512 = outputs[0].cpu().numpy()
# 4. 512 → 64 차원 축소 (평균 풀링, 학습과 동일)
features_64 = self._reduce_dimension(features_512)
return features_64.tolist()
except Exception as e:
print(f"[CLAPEncoder] 특징 추출 실패: {e}")
import traceback
traceback.print_exc()
return [0.0] * self.output_dim
def _reduce_dimension(self, features: np.ndarray) -> np.ndarray:
"""512차원 → 64차원 평균 풀링 (학습과 동일한 방식)"""
current_dim = len(features)
if current_dim == self.output_dim:
return features
# 평균 풀링: 8개씩 묶어서 평균 (512 / 64 = 8)
pool_size = current_dim // self.output_dim
remainder = current_dim % self.output_dim
pooled = []
idx = 0
for i in range(self.output_dim):
size = pool_size + (1 if i < remainder else 0)
pooled.append(np.mean(features[idx:idx+size]))
idx += size
return np.array(pooled)
def is_loaded(self) -> bool:
return self.model is not None
class AIEffector:
"""AI 기반 이펙터 파라미터 예측 (V2: 학습과 동일한 설정)"""
def __init__(
self,
model_repo_id: str = "heybaeheef/KU_SW_Academy",
model_subfolder: str = "checkpoints",
base_model_name: str = "Qwen/Qwen3-8B",
audio_feature_dim: int = 64,
use_huggingface: bool = True
):
self.model_repo_id = model_repo_id
self.model_subfolder = model_subfolder
self.base_model_name = base_model_name
self.audio_feature_dim = audio_feature_dim
self.use_huggingface = use_huggingface
self.model = None
self.tokenizer = None
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# ★★★ 핵심 수정: CLAP 오디오 인코더 사용 (학습과 동일) ★★★
print(f"[AIEffector] CLAP 오디오 인코더 초기화 중...")
self.audio_encoder = CLAPAudioEncoder(output_dim=audio_feature_dim)
# 요청 카운터
self.request_count = 0
# 모델 로드 시도
self._load_model()
def _load_model(self):
"""모델 로드"""
try:
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from peft import PeftModel
print(f"[AIEffector] 베이스 모델 로딩 중: {self.base_model_name}")
# 4bit 양자화 설정
if torch.cuda.is_available():
bnb_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_quant_type="nf4",
bnb_4bit_compute_dtype=torch.float16,
bnb_4bit_use_double_quant=True
)
base_model = AutoModelForCausalLM.from_pretrained(
self.base_model_name,
quantization_config=bnb_config,
device_map="auto",
trust_remote_code=True
)
else:
base_model = AutoModelForCausalLM.from_pretrained(
self.base_model_name,
torch_dtype=torch.float32,
device_map="auto",
trust_remote_code=True
)
self.tokenizer = AutoTokenizer.from_pretrained(
self.base_model_name,
trust_remote_code=True
)
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
print(f"[AIEffector] LoRA 어댑터 로딩 중...")
if self.use_huggingface:
print(f"[AIEffector] HuggingFace에서 LoRA 로딩: {self.model_repo_id}/{self.model_subfolder}")
self.model = PeftModel.from_pretrained(
base_model,
self.model_repo_id,
subfolder=self.model_subfolder,
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
)
else:
local_path = os.path.join(self.model_repo_id, self.model_subfolder)
print(f"[AIEffector] 로컬에서 LoRA 어댑터 로딩: {local_path}")
self.model = PeftModel.from_pretrained(
base_model,
local_path,
torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
)
self.model.eval()
print(f"[AIEffector] ✅ 모델 로드 성공!")
except Exception as e:
print(f"[AIEffector] ❌ 모델 로드 실패: {e}")
import traceback
traceback.print_exc()
print(f"[AIEffector] 폴백 모드로 전환 (프리셋 기반)")
self.model = None
self.tokenizer = None
def is_loaded(self) -> bool:
"""모델 로드 여부"""
return self.model is not None
def _apply_preset(self, prompt: str) -> Dict[str, float]:
"""프롬프트에서 프리셋 매칭"""
params = DEFAULT_PARAMETERS.copy()
prompt_lower = prompt.lower()
matched_presets = []
for style_name, style_params in STYLE_PRESETS.items():
if style_name in prompt_lower:
params.update(style_params)
matched_presets.append(style_name)
if matched_presets:
print(f" [Preset] 매칭된 프리셋: {matched_presets}")
return params
def _format_prompt(self, text_prompt: str, audio_features: List[float]) -> str:
"""
★★★ 핵심 수정: 학습 시와 동일한 프롬프트 형식 사용 ★★★
train_model.py의 243-246줄과 동일한 형식
"""
audio_state_str = json.dumps(audio_features)
# 학습 시와 완전히 동일한 형식!
prompt = f"""Task: Convert text to audio parameters.
Audio: {audio_state_str}
Text: {text_prompt}
Parameters:"""
return prompt
def _parse_output(self, output_text: str) -> Dict[str, float]:
"""LLM 출력에서 파라미터 추출 (향상된 버전)"""
print(f" [Parse] Raw output 길이: {len(output_text)} 문자")
try:
text = output_text
# 1. ... 태그 제거 (Qwen3 thinking mode)
text = re.sub(r'.*?', '', text, flags=re.DOTALL)
# 2. 마크다운 코드블록 추출
code_block_match = re.search(r'```(?:json)?\s*([\s\S]*?)```', text)
if code_block_match:
text = code_block_match.group(1)
print(f" [Parse] 코드블록에서 JSON 추출")
# 3. JSON 객체 찾기 (중첩 브레이스 지원)
json_str = self._extract_json_object(text)
if json_str:
print(f" [Parse] 추출된 JSON (처음 200자):\n{json_str[:200]}...")
# 4. JSON 전처리
json_str = self._preprocess_json(json_str)
# 5. 파싱 시도
params = json.loads(json_str)
# 6. 결과 검증 및 매핑
result = DEFAULT_PARAMETERS.copy()
for key, value in params.items():
# 키 정규화 (대소문자 처리)
normalized_key = self._normalize_key(key)
if normalized_key in result:
try:
result[normalized_key] = float(value)
except (ValueError, TypeError):
pass
print(f" [Parse] ✅ 파싱 성공! {len(params)}개 파라미터 추출")
return result
else:
print(f" [Parse] ❌ JSON 객체를 찾을 수 없음")
except json.JSONDecodeError as e:
print(f" [Parse] ❌ JSON 파싱 에러: {e}")
if json_str:
print(f" [Parse] 문제 위치 근처: ...{json_str[max(0, e.pos-20):e.pos+20]}...")
except Exception as e:
print(f" [Parse] ❌ 예외 발생: {e}")
print(f" [Parse] ⚠️ 기본값으로 폴백")
return DEFAULT_PARAMETERS.copy()
def _normalize_key(self, key: str) -> str:
"""파라미터 키 정규화 (대소문자 처리)"""
# Q/q 정규화
if key.endswith('.q'):
return key[:-2] + '.Q'
return key
def _extract_json_object(self, text: str) -> Optional[str]:
"""텍스트에서 JSON 객체 추출 (중첩 브레이스 지원)"""
start = text.find('{')
if start == -1:
return None
depth = 0
for i, char in enumerate(text[start:], start):
if char == '{':
depth += 1
elif char == '}':
depth -= 1
if depth == 0:
return text[start:i+1]
return None
def _preprocess_json(self, json_str: str) -> str:
"""JSON 문자열 전처리"""
# Trailing comma 제거
json_str = re.sub(r',(\s*[}\]])', r'\1', json_str)
# NaN, Infinity 처리
json_str = re.sub(r'\bNaN\b', '0', json_str)
json_str = re.sub(r'\bInfinity\b', '999999', json_str)
json_str = re.sub(r'-Infinity\b', '-999999', json_str)
return json_str
def predict(self, audio_path: str, text_prompt: str = "") -> Dict[str, float]:
"""파라미터 예측"""
self.request_count += 1
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"\n{'='*60}")
print(f"[AIEffector] 🎵 요청 #{self.request_count} - {timestamp}")
print(f"{'='*60}")
print(f" 📂 오디오 파일: {Path(audio_path).name}")
print(f" 💬 텍스트 프롬프트: '{text_prompt}'")
print(f" 🤖 모델 상태: {'AI 모드' if self.is_loaded() else '프리셋 모드'}")
print(f" 🎧 인코더: CLAP (학습과 동일)")
# 모델이 없으면 프리셋 사용
if not self.is_loaded():
print(f"\n ⚠️ AI 모델 미로드 - 프리셋 모드 사용")
params = self._apply_preset(text_prompt)
self._log_parameters(params)
return self._convert_to_effect_chain_format(params)
try:
# 1. CLAP 오디오 특징 추출 (학습과 동일)
print(f"\n 📊 [Step 1] CLAP 오디오 특징 추출 중...")
audio_features = self.audio_encoder.get_audio_features(audio_path)
if not audio_features or all(f == 0 for f in audio_features):
print(f" ⚠️ 특징 추출 실패, 프리셋으로 폴백")
params = self._apply_preset(text_prompt)
self._log_parameters(params)
return self._convert_to_effect_chain_format(params)
print(f" ✅ {len(audio_features)}차원 특징 추출 완료")
print(f" - 특징 벡터 (처음 8개): {[round(v, 3) for v in audio_features[:8]]}")
# 2. LLM 프롬프트 생성 (학습과 동일한 형식)
print(f"\n 🔤 [Step 2] LLM 프롬프트 생성 중 (학습 형식)...")
prompt = self._format_prompt(text_prompt, audio_features)
print(f" - 프롬프트 길이: {len(prompt)} 문자")
# 3. 토큰화
print(f"\n 🔢 [Step 3] 토큰화 중...")
inputs = self.tokenizer(
prompt,
return_tensors="pt",
truncation=True,
max_length=1500 # 학습 시와 동일
).to(self.device)
print(f" - 입력 토큰 수: {inputs['input_ids'].shape[1]}")
# 4. LLM 생성
print(f"\n 🧠 [Step 4] LLM 추론 중...")
import time
start_time = time.time()
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_new_tokens=500,
do_sample=False,
temperature=0.1,
pad_token_id=self.tokenizer.pad_token_id,
eos_token_id=self.tokenizer.eos_token_id,
)
inference_time = time.time() - start_time
print(f" - 추론 시간: {inference_time:.2f}초")
# 5. 디코딩 (생성된 부분만)
print(f"\n 📝 [Step 5] 출력 디코딩 중...")
generated_tokens = outputs[0][inputs['input_ids'].shape[1]:]
output_text = self.tokenizer.decode(generated_tokens, skip_special_tokens=True).strip()
print(f" - LLM 출력 (처음 300자):\n{output_text[:300]}")
# 6. 파싱
print(f"\n 🔧 [Step 6] 파라미터 파싱 중...")
params = self._parse_output(output_text)
# 7. 결과 로깅
self._log_parameters(params)
print(f"\n ✅ AI 예측 완료!")
print(f"{'='*60}\n")
# effect_chain.py 형식으로 변환
return self._convert_to_effect_chain_format(params)
except Exception as e:
print(f"\n ❌ 예측 실패: {e}")
import traceback
traceback.print_exc()
print(f" ⚠️ 프리셋으로 폴백...")
params = self._apply_preset(text_prompt)
self._log_parameters(params)
return self._convert_to_effect_chain_format(params)
def _convert_to_effect_chain_format(self, params: Dict[str, float]) -> Dict[str, float]:
"""
학습 데이터 형식 → effect_chain.py 형식으로 변환
주로 Q/q 대소문자 처리
"""
result = {}
for key, value in params.items():
# Q → q 변환 (effect_chain.py는 소문자 q 사용)
new_key = key.replace('.Q', '.q')
result[new_key] = value
return result
def _log_parameters(self, params: Dict[str, float]):
"""예측된 파라미터 로깅"""
print(f"\n 📋 예측된 파라미터:")
print(f" [EQ Peak 1]")
print(f" - Freq: {params.get('eq_peak1.params.freq', 0):.1f} Hz")
print(f" - Gain: {params.get('eq_peak1.params.gain', 0):.2f} dB")
print(f" - Q: {params.get('eq_peak1.params.Q', params.get('eq_peak1.params.q', 0)):.2f}")
print(f" [EQ Peak 2]")
print(f" - Freq: {params.get('eq_peak2.params.freq', 0):.1f} Hz")
print(f" - Gain: {params.get('eq_peak2.params.gain', 0):.2f} dB")
print(f" - Q: {params.get('eq_peak2.params.Q', params.get('eq_peak2.params.q', 0)):.2f}")
print(f" [Low Shelf]")
print(f" - Freq: {params.get('eq_lowshelf.params.freq', 0):.1f} Hz")
print(f" - Gain: {params.get('eq_lowshelf.params.gain', 0):.2f} dB")
print(f" [High Shelf]")
print(f" - Freq: {params.get('eq_highshelf.params.freq', 0):.1f} Hz")
print(f" - Gain: {params.get('eq_highshelf.params.gain', 0):.2f} dB")
print(f" [Effects]")
print(f" - Distortion: {params.get('distortion_amount', 0):.3f}")
print(f" - Delay Time: {params.get('delay.delay_time', 0):.3f}s")
print(f" - Delay Feedback: {params.get('delay.feedback', 0):.2f}")
print(f" - Delay Mix: {params.get('delay.mix', 0):.2f}")
print(f" - Final Wet Mix: {params.get('final_wet_mix', 0):.2f}")