# KU_SW_Academy / models / ai_effector.py
# Uploaded by heybaeheef (commit 17a7958, verified)
"""
AI Effector - DiffVox LLM 기반 μ΄νŽ™νŠΈ νŒŒλΌλ―Έν„° 예츑
===================================================
V9: Compressor threshold λ²”μœ„ μˆ˜μ • (0 ~ -5dB)
"""
import os
import json
import re
import math
import torch
import numpy as np
from typing import Dict, List, Optional, Any, Tuple
from pathlib import Path
from datetime import datetime
import warnings
warnings.filterwarnings("ignore")
def sigmoid(x: float) -> float:
    """Logistic function with overflow protection.

    For large-magnitude negative inputs ``math.exp(-x)`` overflows; in that
    case the saturated value (0 for negative x, 1 otherwise) is returned.
    """
    try:
        exp_neg = math.exp(-x)
    except OverflowError:
        return 0.0 if x < 0 else 1.0
    return 1 / (1 + exp_neg)
def minmax_transform(raw: float, min_val: float, max_val: float) -> float:
    """Map an unbounded raw value into [min_val, max_val] via the logistic curve.

    The sigmoid squash (with the same overflow guard as ``sigmoid``) is
    inlined here so the function is self-contained.
    """
    try:
        unit = 1 / (1 + math.exp(-raw))
    except OverflowError:
        unit = 0.0 if raw < 0 else 1.0
    return unit * (max_val - min_val) + min_val
# Maps each LLM-output parameter key to the transform applied in
# AIEffector._convert_raw_to_actual to turn the raw (unbounded) model output
# into a usable value. Transform types:
#   "none"          - pass through unchanged
#   "minmax"        - sigmoid(raw) rescaled into [min, max]
#   "sigmoid"       - sigmoid(raw), squashed into (0, 1)
#   "sigmoid_scale" - sigmoid(raw) * scale
PARAM_TRANSFORMS = {
    "eq_peak1.params.freq": {"type": "minmax", "min": 33.0, "max": 17500.0},
    "eq_peak1.params.Q": {"type": "minmax", "min": 0.2, "max": 20.0},
    "eq_peak1.params.gain": {"type": "none"},
    "eq_peak2.params.freq": {"type": "minmax", "min": 33.0, "max": 17500.0},
    "eq_peak2.params.Q": {"type": "minmax", "min": 0.2, "max": 20.0},
    "eq_peak2.params.gain": {"type": "none"},
    "eq_lowshelf.params.freq": {"type": "minmax", "min": 30.0, "max": 200.0},
    "eq_lowshelf.params.gain": {"type": "none"},
    "eq_highshelf.params.freq": {"type": "minmax", "min": 2500.0, "max": 16000.0},
    "eq_highshelf.params.gain": {"type": "none"},
    "delay.delay_time": {"type": "none"},
    "delay.feedback": {"type": "sigmoid"},
    "delay.mix": {"type": "sigmoid"},
    "distortion_amount": {"type": "sigmoid_scale", "scale": 0.1},
    "final_wet_mix": {"type": "sigmoid"},
}
# Fallback parameter set used when the LLM is unavailable or parsing fails;
# also the base dict that parsed model outputs are merged onto in
# AIEffector._parse_output.
DEFAULT_PARAMETERS = {
    "eq_peak1.params.freq": 1000.0,
    "eq_peak1.params.gain": 0.0,
    "eq_peak1.params.Q": 1.0,
    "eq_peak2.params.freq": 4000.0,
    "eq_peak2.params.gain": 0.0,
    "eq_peak2.params.Q": 1.0,
    "eq_lowshelf.params.freq": 115.0,
    "eq_lowshelf.params.gain": 0.0,
    "eq_highshelf.params.freq": 8000.0,
    "eq_highshelf.params.gain": 0.0,
    # V9: compressor threshold default is -3 dB
    "compressor.threshold": -3.0,
    "compressor.ratio": 2.0,
    "distortion_amount": 0.0,
    "delay.delay_time": 0.02,
    "delay.feedback": 0.15,
    "delay.mix": 0.1,
    "reverb.room_size": 0.3,
    "reverb.damping": 0.5,
    "reverb.wet_level": 0.0,
    "reverb.dry_level": 1.0,
    "final_wet_mix": 0.5
}
# V9: compressor threshold range 0 to -5 dB
# (min, max) bounds per parameter. NOTE: AIEffector.predict only clamps the
# eq_* entries (and final_wet_mix, hard-coded there) against these ranges;
# the remaining entries document the intended operating range.
PARAM_RANGES = {
    "eq_peak1.params.freq": (33.0, 17500.0),
    "eq_peak1.params.gain": (-12.0, 12.0),
    "eq_peak1.params.Q": (0.2, 20.0),
    "eq_peak2.params.freq": (33.0, 17500.0),
    "eq_peak2.params.gain": (-12.0, 12.0),
    "eq_peak2.params.Q": (0.2, 20.0),
    "eq_lowshelf.params.freq": (30.0, 200.0),
    "eq_lowshelf.params.gain": (-12.0, 12.0),
    "eq_highshelf.params.freq": (2500.0, 16000.0),
    "eq_highshelf.params.gain": (-12.0, 12.0),
    # V9: 0 to -5 dB (light compression)
    "compressor.threshold": (-5.0, 0.0),
    "compressor.ratio": (1.5, 4.0),
    "distortion_amount": (0.0, 0.05),
    "delay.delay_time": (0.01, 0.3),
    "delay.feedback": (0.0, 0.25),
    "delay.mix": (0.0, 0.2),
    "reverb.room_size": (0.0, 0.6),
    "reverb.damping": (0.0, 1.0),
    "reverb.wet_level": (0.0, 0.3),
    "reverb.dry_level": (0.7, 1.0),
    "final_wet_mix": (0.3, 0.7),
}
# Free-text adjectives rewritten into the canonical style words used by
# STYLE_PRESETS. Applied by AIEffector._preprocess_text as plain substring
# replacement in dict order, so earlier replacements can be rewritten again
# by later entries (e.g. "calm" -> "warm soft", then "soft" -> "warm").
SYNONYM_MAP = {
    "calm": "warm soft", "relaxed": "warm soft", "chill": "warm soft",
    "smooth": "warm", "mellow": "warm soft", "breezy": "bright spacious",
    "airy": "bright spacious", "light": "bright", "crisp": "bright",
    "clean": "bright", "dreamy": "warm spacious", "ethereal": "bright spacious",
    "atmospheric": "spacious", "ambient": "spacious warm",
    "aggressive": "saturated bright", "powerful": "saturated",
    "punchy": "saturated bright", "hard": "saturated",
    "gritty": "saturated dark", "soft": "warm", "harsh": "bright saturated",
    "muddy": "dark", "thin": "bright", "thick": "warm dark",
    "full": "warm", "reverb": "spacious", "echo": "spacious", "wet": "spacious",
}
# V9: compressor threshold range 0 to -5 dB
# Per-style compressor/delay/reverb overrides. AIEffector._apply_preset
# merges the dict of every style word found in the prompt (in this dict's
# order, so later matches override earlier ones); "warm" is the default
# when nothing matches.
STYLE_PRESETS = {
    "warm": {
        "compressor.threshold": -3.0,
        "compressor.ratio": 2.0,
        "delay.delay_time": 0.02,
        "delay.feedback": 0.12,
        "delay.mix": 0.08,
        "reverb.room_size": 0.25,
        "reverb.wet_level": 0.1,
        "reverb.dry_level": 0.9,
    },
    "bright": {
        "compressor.threshold": -2.0,
        "compressor.ratio": 2.0,
        "delay.delay_time": 0.02,
        "delay.feedback": 0.1,
        "delay.mix": 0.06,
        "reverb.room_size": 0.2,
        "reverb.wet_level": 0.08,
        "reverb.dry_level": 0.92,
    },
    "spacious": {
        "compressor.threshold": -4.0,
        "compressor.ratio": 1.8,
        "delay.delay_time": 0.06,
        "delay.feedback": 0.2,
        "delay.mix": 0.15,
        "reverb.room_size": 0.45,
        "reverb.wet_level": 0.2,
        "reverb.dry_level": 0.8,
    },
    "dark": {
        "compressor.threshold": -4.0,
        "compressor.ratio": 2.0,
        "delay.delay_time": 0.03,
        "delay.feedback": 0.15,
        "delay.mix": 0.1,
        "reverb.room_size": 0.35,
        "reverb.wet_level": 0.15,
        "reverb.dry_level": 0.85,
    },
    "saturated": {
        "compressor.threshold": -2.0,
        "compressor.ratio": 3.0,
        "delay.delay_time": 0.02,
        "delay.feedback": 0.08,
        "delay.mix": 0.05,
        "reverb.room_size": 0.15,
        "reverb.wet_level": 0.06,
        "reverb.dry_level": 0.94,
    },
    "soft": {
        "compressor.threshold": -5.0,
        "compressor.ratio": 1.5,
        "delay.delay_time": 0.025,
        "delay.feedback": 0.15,
        "delay.mix": 0.1,
        "reverb.room_size": 0.3,
        "reverb.wet_level": 0.12,
        "reverb.dry_level": 0.88,
    },
}
class CLAPAudioEncoder:
    """Wraps a LAION-CLAP model to extract fixed-size audio feature vectors.

    The CLAP audio embedding is mean-pool reduced to ``output_dim`` values.
    If the model cannot be loaded or extraction fails, feature extraction
    degrades gracefully to an all-zero vector so callers can fall back to
    preset parameters.
    """

    def __init__(self, output_dim: int = 64, model_name: str = "laion/larger_clap_music"):
        """Store configuration and attempt to load the CLAP model.

        Args:
            output_dim: length of the pooled feature vector returned.
            model_name: HF Hub id of the CLAP checkpoint.
        """
        self.output_dim = output_dim
        self.model_name = model_name
        self.target_sr = 48000  # sample rate the audio is resampled to for CLAP
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = None
        self.processor = None
        self._load_model()

    def _load_model(self) -> None:
        """Load the CLAP model/processor; on any failure leave them as None."""
        try:
            from transformers import ClapModel, ClapProcessor
            print(f"[CLAPEncoder] CLAP λͺ¨λΈ λ‘œλ”© 쀑: {self.model_name}")
            self.processor = ClapProcessor.from_pretrained(self.model_name)
            self.model = ClapModel.from_pretrained(self.model_name)
            self.model = self.model.to(self.device)
            self.model.eval()
            print(f"[CLAPEncoder] βœ… CLAP λͺ¨λΈ λ‘œλ“œ μ™„λ£Œ")
        except Exception as e:
            print(f"[CLAPEncoder] ❌ λͺ¨λΈ λ‘œλ“œ μ‹€νŒ¨: {e}")

    def get_audio_features(self, audio_path: str) -> List[float]:
        """Return a pooled ``output_dim``-length feature list for a file.

        Returns an all-zero list (the caller's failure sentinel) when the
        model is not loaded or feature extraction raises.
        """
        if self.model is None:
            return [0.0] * self.output_dim
        try:
            import librosa
            audio, sr = librosa.load(audio_path, sr=self.target_sr, mono=True)
            inputs = self.processor(audios=audio, sampling_rate=self.target_sr, return_tensors="pt", padding=True).to(self.device)
            with torch.no_grad():
                outputs = self.model.get_audio_features(**inputs)
            features_512 = outputs[0].cpu().numpy()
            return self._reduce_dimension(features_512).tolist()
        except Exception as e:
            print(f"[CLAPEncoder] νŠΉμ§• μΆ”μΆœ μ‹€νŒ¨: {e}")
            return [0.0] * self.output_dim

    def _reduce_dimension(self, features: np.ndarray) -> np.ndarray:
        """Resample a 1-D feature vector to exactly ``output_dim`` entries.

        Downsampling uses contiguous mean-pooling, distributing the remainder
        over the first bins (unchanged from the original behavior).

        Fix: when the input is *shorter* than ``output_dim``, the old pooling
        computed means of empty slices and silently produced NaNs (hidden by
        the module-wide warnings filter). Short inputs are now linearly
        resampled, and an empty input yields a zero vector.
        """
        current_dim = len(features)
        if current_dim == self.output_dim:
            return features
        if current_dim == 0:
            # No data at all: return the neutral (zero) vector.
            return np.zeros(self.output_dim)
        if current_dim < self.output_dim:
            # Upsample by linear interpolation instead of emitting NaNs.
            src = np.linspace(0.0, 1.0, num=current_dim)
            dst = np.linspace(0.0, 1.0, num=self.output_dim)
            return np.interp(dst, src, features)
        pool_size = current_dim // self.output_dim
        remainder = current_dim % self.output_dim
        pooled = []
        idx = 0
        for i in range(self.output_dim):
            # The first `remainder` bins absorb one extra element each.
            size = pool_size + (1 if i < remainder else 0)
            pooled.append(np.mean(features[idx:idx + size]))
            idx += size
        return np.array(pooled)

    def is_loaded(self) -> bool:
        """True when the CLAP model was loaded successfully."""
        return self.model is not None
class AIEffector:
    """DiffVox V9: predicts audio effect parameters from audio + text.

    Combines a CLAP audio embedding with a user style prompt, feeds both to
    a LoRA-fine-tuned causal LM, and post-processes the generated JSON into
    clamped, preset-blended effect-chain parameters. Falls back to keyword
    presets whenever the model is unavailable or any pipeline step fails.
    """

    def __init__(self, model_repo_id: str = "heybaeheef/KU_SW_Academy", model_subfolder: str = "checkpoints", base_model_name: str = "Qwen/Qwen3-8B", audio_feature_dim: int = 64, use_huggingface: bool = True):
        """Set up the CLAP encoder and attempt to load the LoRA-adapted LLM.

        Args:
            model_repo_id: HF Hub repo id (or local root dir when
                ``use_huggingface`` is False) containing the LoRA adapter.
            model_subfolder: subfolder holding the adapter checkpoint.
            base_model_name: base causal LM the adapter was trained on.
            audio_feature_dim: length of the pooled CLAP feature vector.
            use_huggingface: True to fetch the adapter from the Hub,
                False to read it from a local path.
        """
        self.model_repo_id = model_repo_id
        self.model_subfolder = model_subfolder
        self.base_model_name = base_model_name
        self.audio_feature_dim = audio_feature_dim
        self.use_huggingface = use_huggingface
        self.model = None      # PeftModel once loaded, else None (preset fallback)
        self.tokenizer = None
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"[AIEffector V9] CLAP 인코더 μ΄ˆκΈ°ν™”...")
        self.audio_encoder = CLAPAudioEncoder(output_dim=audio_feature_dim)
        self.request_count = 0  # incremented per predict() call, for logging
        self._load_model()

    def _load_model(self) -> None:
        """Load the base LM (4-bit NF4 on CUDA) and attach the LoRA adapter.

        On any failure, ``model``/``tokenizer`` are reset to None so
        ``predict`` uses the preset fallback path.
        """
        try:
            from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
            from peft import PeftModel
            print(f"[AIEffector] 베이슀 λͺ¨λΈ λ‘œλ”©: {self.base_model_name}")
            if torch.cuda.is_available():
                # NF4 4-bit quantization keeps the 8B base model within GPU memory.
                bnb_config = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.float16, bnb_4bit_use_double_quant=True)
                base_model = AutoModelForCausalLM.from_pretrained(self.base_model_name, quantization_config=bnb_config, device_map="auto", trust_remote_code=True)
            else:
                base_model = AutoModelForCausalLM.from_pretrained(self.base_model_name, torch_dtype=torch.float32, device_map="auto", trust_remote_code=True)
            self.tokenizer = AutoTokenizer.from_pretrained(self.base_model_name, trust_remote_code=True)
            if self.tokenizer.pad_token is None:
                # Some causal LMs ship without a pad token; reuse EOS for padding.
                self.tokenizer.pad_token = self.tokenizer.eos_token
            print(f"[AIEffector] LoRA μ–΄λŒ‘ν„° λ‘œλ”©...")
            if self.use_huggingface:
                self.model = PeftModel.from_pretrained(base_model, self.model_repo_id, subfolder=self.model_subfolder, torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32)
            else:
                local_path = os.path.join(self.model_repo_id, self.model_subfolder)
                self.model = PeftModel.from_pretrained(base_model, local_path, torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32)
            self.model.eval()
            print(f"[AIEffector] βœ… λͺ¨λΈ λ‘œλ“œ 성곡!")
        except Exception as e:
            print(f"[AIEffector] ❌ λͺ¨λΈ λ‘œλ“œ μ‹€νŒ¨: {e}")
            import traceback
            traceback.print_exc()
            self.model = None
            self.tokenizer = None

    def is_loaded(self) -> bool:
        """True when the LoRA-adapted LLM was loaded successfully."""
        return self.model is not None

    def _preprocess_text(self, text: str) -> str:
        """Lowercase the prompt and rewrite synonyms into canonical style words.

        Substring replacement in SYNONYM_MAP order; later entries may rewrite
        the output of earlier replacements.
        """
        text_lower = text.lower()
        for synonym, replacement in SYNONYM_MAP.items():
            if synonym in text_lower:
                text_lower = text_lower.replace(synonym, replacement)
        return text_lower

    def _apply_preset(self, prompt: str) -> Dict[str, float]:
        """Merge the STYLE_PRESETS dicts of every style word in the prompt.

        Matches are merged in STYLE_PRESETS order (later styles override
        earlier ones on shared keys); "warm" is applied when nothing matches.
        """
        params = {}
        prompt_lower = prompt.lower()
        matched = []
        for style_name, style_params in STYLE_PRESETS.items():
            if style_name in prompt_lower:
                params.update(style_params)
                matched.append(style_name)
        if matched:
            print(f" [Preset] λ§€μΉ­: {matched}")
        else:
            params.update(STYLE_PRESETS["warm"])
            print(f" [Preset] κΈ°λ³Έκ°’ 적용: warm")
        return params

    def _format_prompt(self, text_prompt: str, audio_features: List[float]) -> str:
        """Build the LLM prompt embedding the CLAP features and styled text."""
        audio_state_str = json.dumps(audio_features)
        # Continuation lines are deliberately unindented: they are part of
        # the prompt string sent to the model.
        return f"""Task: Convert text to audio parameters.
Audio: {audio_state_str}
Text: {text_prompt}
Parameters:"""

    def _preprocess_json(self, json_str: str) -> str:
        """Repair common LLM JSON glitches before json.loads.

        Joins digit groups split by underscores, strips trailing commas,
        and replaces NaN/Infinity tokens with finite numbers.
        """
        json_str = re.sub(r'(\d)_(\d)', r'\1\2', json_str)
        json_str = re.sub(r',(\s*[}\]])', r'\1', json_str)
        json_str = re.sub(r'\bNaN\b', '0', json_str)
        json_str = re.sub(r'\bInfinity\b', '999999', json_str)
        json_str = re.sub(r'-Infinity\b', '-999999', json_str)
        return json_str

    def _normalize_key(self, key: str) -> str:
        """Collapse '.parametrizations.<name>.original' key segments to '.<name>'."""
        return re.sub(r'\.parametrizations\.(\w+)\.original', r'.\1', key)

    def _extract_json_object(self, text: str) -> Optional[str]:
        """Return the first brace-balanced {...} substring of text, or None."""
        start = text.find('{')
        if start == -1:
            return None
        depth = 0
        for i, char in enumerate(text[start:], start):
            if char == '{':
                depth += 1
            elif char == '}':
                depth -= 1
                if depth == 0:
                    return text[start:i+1]
        return None

    def _convert_raw_to_actual(self, params: Dict[str, float]) -> Dict[str, float]:
        """Apply PARAM_TRANSFORMS to map raw LLM outputs into usable values.

        Keys absent from PARAM_TRANSFORMS (or with type "none"/unknown) are
        passed through unchanged.
        """
        result = params.copy()
        for key, transform in PARAM_TRANSFORMS.items():
            if key not in result:
                continue
            raw = result[key]
            transform_type = transform["type"]
            if transform_type == "none":
                actual = raw
            elif transform_type == "minmax":
                actual = minmax_transform(raw, transform["min"], transform["max"])
                print(f" [MinMax] {key}: {raw:.4f} β†’ {actual:.2f}")
            elif transform_type == "sigmoid":
                actual = sigmoid(raw)
                print(f" [Sigmoid] {key}: {raw:.4f} β†’ {actual:.4f}")
            elif transform_type == "sigmoid_scale":
                actual = sigmoid(raw) * transform["scale"]
                print(f" [Sigmoid*{transform['scale']}] {key}: {raw:.4f} β†’ {actual:.4f}")
            else:
                actual = raw
            result[key] = actual
        return result

    def _parse_output(self, output_text: str) -> Dict[str, float]:
        """Parse the LLM output into a parameter dict on top of defaults.

        Strips <think> blocks and code fences, extracts the first balanced
        JSON object, repairs it, then maps each key onto DEFAULT_PARAMETERS
        (exact match first, then a fuzzy match on first + last dotted
        segments). Returns a copy of DEFAULT_PARAMETERS on any failure.
        """
        print(f" [Parse] Raw output 길이: {len(output_text)} 문자")
        try:
            # Remove chain-of-thought blocks, then prefer fenced code content.
            text = re.sub(r'<think>.*?</think>', '', output_text, flags=re.DOTALL)
            code_match = re.search(r'```(?:json)?\s*([\s\S]*?)```', text)
            if code_match:
                text = code_match.group(1)
            json_str = self._extract_json_object(text)
            if json_str:
                print(f" [Parse] JSON 발견 (길이: {len(json_str)})")
                json_str = self._preprocess_json(json_str)
                raw_params = json.loads(json_str)
                result = DEFAULT_PARAMETERS.copy()
                parsed_count = 0
                for key, value in raw_params.items():
                    try:
                        norm_key = self._normalize_key(key)
                        float_val = float(value)
                        if norm_key in DEFAULT_PARAMETERS:
                            result[norm_key] = float_val
                            parsed_count += 1
                        else:
                            # Fuzzy match: same first and last dotted segment
                            # (both keys must have >= 3 segments).
                            for default_key in DEFAULT_PARAMETERS.keys():
                                norm_parts = norm_key.split('.')
                                default_parts = default_key.split('.')
                                if len(norm_parts) >= 3 and len(default_parts) >= 3:
                                    if norm_parts[0] == default_parts[0] and norm_parts[-1] == default_parts[-1]:
                                        result[default_key] = float_val
                                        parsed_count += 1
                                        break
                    except (ValueError, TypeError):
                        # Non-numeric value or unconvertible key: skip it.
                        pass
                print(f" [Parse] βœ… {parsed_count}개 νŒŒλΌλ―Έν„° 맀핑됨")
                return result
        except json.JSONDecodeError as e:
            print(f" [Parse] ❌ JSON μ—λŸ¬: {e}")
        except Exception as e:
            print(f" [Parse] ❌ μ˜ˆμ™Έ: {e}")
        print(f" [Parse] ⚠️ κΈ°λ³Έκ°’ 폴백")
        return DEFAULT_PARAMETERS.copy()

    def predict(self, audio_path: str, text_prompt: str = "") -> Dict[str, float]:
        """Predict effect parameters for an audio file and style prompt.

        Pipeline: CLAP feature extraction -> prompt build -> LLM generation
        -> JSON parse -> raw-to-actual transforms -> EQ clamping -> preset
        overrides -> effect-chain key formatting. Any failure falls back to
        DEFAULT_PARAMETERS merged with keyword presets.

        Args:
            audio_path: path to the input audio file.
            text_prompt: free-text style description (may be empty).

        Returns:
            Parameter dict in effect-chain key format ('.Q' -> '.q').
        """
        self.request_count += 1
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"\n{'='*60}")
        print(f"[AIEffector V9] 🎡 μš”μ²­ #{self.request_count} - {timestamp}")
        print(f"{'='*60}")
        print(f" πŸ“‚ μ˜€λ””μ˜€: {Path(audio_path).name}")
        print(f" πŸ’¬ 원본: '{text_prompt}'")
        processed_prompt = self._preprocess_text(text_prompt)
        print(f" πŸ€– λͺ¨λΈ: {'AI' if self.is_loaded() else '프리셋'}")
        if not self.is_loaded():
            # LLM unavailable: preset-only fallback.
            print(f"\n ⚠️ AI λͺ¨λΈ λ―Έλ‘œλ“œ")
            params = DEFAULT_PARAMETERS.copy()
            params.update(self._apply_preset(processed_prompt))
            self._log_parameters(params)
            return self._convert_to_effect_chain_format(params)
        try:
            print(f"\n πŸ“Š [Step 1] CLAP νŠΉμ§• μΆ”μΆœ...")
            audio_features = self.audio_encoder.get_audio_features(audio_path)
            if not audio_features or all(f == 0 for f in audio_features):
                # An all-zero vector is the encoder's failure sentinel.
                print(f" ⚠️ μ‹€νŒ¨, 프리셋 폴백")
                params = DEFAULT_PARAMETERS.copy()
                params.update(self._apply_preset(processed_prompt))
                self._log_parameters(params)
                return self._convert_to_effect_chain_format(params)
            print(f" βœ… {len(audio_features)}차원")
            print(f"\n πŸ”€ [Step 2] ν”„λ‘¬ν”„νŠΈ 생성...")
            prompt = self._format_prompt(processed_prompt, audio_features)
            print(f"\n πŸ”’ [Step 3] 토큰화...")
            inputs = self.tokenizer(prompt, return_tensors="pt", truncation=False).to(self.device)
            print(f" 토큰 수: {inputs['input_ids'].shape[1]}")
            print(f"\n 🧠 [Step 4] LLM μΆ”λ‘ ...")
            import time
            start = time.time()
            with torch.no_grad():
                # NOTE(review): temperature is ignored when do_sample=False
                # (greedy decoding) - recent transformers versions warn about
                # this combination; confirm against the installed version.
                outputs = self.model.generate(**inputs, max_new_tokens=500, do_sample=False, temperature=0.1, pad_token_id=self.tokenizer.pad_token_id, eos_token_id=self.tokenizer.eos_token_id)
            print(f" μΆ”λ‘  μ‹œκ°„: {time.time()-start:.2f}초")
            print(f"\n πŸ“ [Step 5] λ””μ½”λ”©...")
            # Keep only newly generated tokens (drop the echoed prompt).
            gen_tokens = outputs[0][inputs['input_ids'].shape[1]:]
            output_text = self.tokenizer.decode(gen_tokens, skip_special_tokens=True).strip()
            print(f" 좜λ ₯ (처음 500자):\n{output_text[:500]}")
            print(f"\n πŸ”§ [Step 6] νŒŒμ‹±...")
            raw_params = self._parse_output(output_text)
            print(f"\n πŸ”„ [Step 7] Raw β†’ Actual λ³€ν™˜...")
            actual_params = self._convert_raw_to_actual(raw_params)
            print(f"\n πŸ“ [Step 8] κ°’ ν΄λž¨ν•‘ (EQ만)...")
            # Only EQ parameters are clamped against PARAM_RANGES here; the
            # compressor/reverb/delay values are replaced by presets below.
            eq_keys = [k for k in PARAM_RANGES.keys() if k.startswith('eq_')]
            for key in eq_keys:
                if key in actual_params:
                    min_val, max_val = PARAM_RANGES[key]
                    original = actual_params[key]
                    clamped = max(min_val, min(max_val, original))
                    if abs(clamped - original) > 0.001:
                        print(f" [Clamp] {key}: {original:.4f} β†’ {clamped:.4f}")
                    actual_params[key] = clamped
            print(f"\n πŸŽ›οΈ [Step 9] 프리셋 적용 (Compressor/Reverb/Delay)...")
            # Preset values override the LLM output for dynamics/space effects.
            preset = self._apply_preset(processed_prompt)
            for key in preset:
                actual_params[key] = preset[key]
                print(f" {key}: {preset[key]}")
            actual_params["final_wet_mix"] = max(0.3, min(0.7, actual_params.get("final_wet_mix", 0.5)))
            print(f" final_wet_mix: {actual_params['final_wet_mix']:.2f}")
            self._log_parameters(actual_params)
            print(f"\n βœ… μ™„λ£Œ!")
            print(f"{'='*60}\n")
            return self._convert_to_effect_chain_format(actual_params)
        except Exception as e:
            # Any failure anywhere in the pipeline falls back to presets.
            print(f"\n ❌ μ‹€νŒ¨: {e}")
            import traceback
            traceback.print_exc()
            params = DEFAULT_PARAMETERS.copy()
            params.update(self._apply_preset(processed_prompt))
            self._log_parameters(params)
            return self._convert_to_effect_chain_format(params)

    def _convert_to_effect_chain_format(self, params: Dict[str, float]) -> Dict[str, float]:
        """Rename '.Q' key segments to '.q' for the downstream effect chain."""
        result = {}
        for key, value in params.items():
            new_key = key.replace('.Q', '.q')
            result[new_key] = value
        return result

    def _log_parameters(self, params: Dict[str, float]) -> None:
        """Pretty-print the final parameter set grouped by effect."""
        print(f"\n πŸ“‹ μ΅œμ’… νŒŒλΌλ―Έν„°:")
        print(f" [EQ Peak 1] freq={params.get('eq_peak1.params.freq',0):.0f}Hz, gain={params.get('eq_peak1.params.gain',0):.2f}dB, Q={params.get('eq_peak1.params.Q',0):.2f}")
        print(f" [EQ Peak 2] freq={params.get('eq_peak2.params.freq',0):.0f}Hz, gain={params.get('eq_peak2.params.gain',0):.2f}dB, Q={params.get('eq_peak2.params.Q',0):.2f}")
        print(f" [Low Shelf] freq={params.get('eq_lowshelf.params.freq',0):.0f}Hz, gain={params.get('eq_lowshelf.params.gain',0):.2f}dB")
        print(f" [High Shelf] freq={params.get('eq_highshelf.params.freq',0):.0f}Hz, gain={params.get('eq_highshelf.params.gain',0):.2f}dB")
        print(f" [Compressor] threshold={params.get('compressor.threshold',-3):.1f}dB, ratio={params.get('compressor.ratio',2):.1f}")
        print(f" [Distortion] {params.get('distortion_amount',0):.4f}")
        print(f" [Delay] time={params.get('delay.delay_time',0):.3f}s, fb={params.get('delay.feedback',0):.2f}, mix={params.get('delay.mix',0):.2f}")
        print(f" [Reverb] room={params.get('reverb.room_size',0):.2f}, damp={params.get('reverb.damping',0):.2f}, wet={params.get('reverb.wet_level',0):.2f}, dry={params.get('reverb.dry_level',1):.2f}")
        print(f" [Wet Mix] {params.get('final_wet_mix',0):.2f}")