#!/usr/bin/env python3 """KAIdol A/B Test Arena - GPU Version with Real Model Inference""" import gradio as gr import random import json import uuid import re import gc import os from datetime import datetime from functools import lru_cache # GPU 추론 관련 (선택적 임포트) TORCH_AVAILABLE = False IMPORT_ERROR = None torch = None try: import torch as _torch torch = _torch from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig from peft import PeftModel TORCH_AVAILABLE = True # Debug info print("=" * 50) print(f"PyTorch version: {torch.__version__}") print(f"CUDA available: {torch.cuda.is_available()}") if torch.cuda.is_available(): print(f"CUDA version: {torch.version.cuda}") print(f"GPU count: {torch.cuda.device_count()}") print(f"GPU name: {torch.cuda.get_device_name(0)}") else: print("CUDA not available at module load time") print("=" * 50) except Exception as e: import traceback IMPORT_ERROR = f"{type(e).__name__}: {str(e)}" print(f"Warning: Import error - {IMPORT_ERROR}") traceback.print_exc() print("Running in mock mode") def is_gpu_available(): """Check GPU availability dynamically""" if not TORCH_AVAILABLE: return False return torch.cuda.is_available() # For backwards compatibility GPU_AVAILABLE = is_gpu_available() # ============================================================ # 모델 레지스트리 (HF Hub 경로) # ============================================================ MODELS = { # DPO v5 (7-14B) "qwen2.5-7b-dpo-v5": { "hf_repo": "developer-lunark/kaidol-qwen2.5-7b-dpo-v5", "base_model": "Qwen/Qwen2.5-7B-Instruct", "size": "7B", "method": "DPO", "desc": "Qwen2.5 7B DPO v5" }, "qwen2.5-14b-dpo-v5": { "hf_repo": "developer-lunark/kaidol-qwen2.5-14b-dpo-v5", "base_model": "Qwen/Qwen2.5-14B-Instruct", "size": "14B", "method": "DPO", "desc": "Qwen2.5 14B DPO v5" }, "exaone-7.8b-dpo-v5": { "hf_repo": "developer-lunark/kaidol-exaone-7.8b-dpo-v5", "base_model": "LGAI-EXAONE/EXAONE-3.5-7.8B-Instruct", "size": "7.8B", "method": "DPO", "desc": "EXAONE 7.8B DPO v5" }, "qwen3-8b-dpo-v5": { "hf_repo": "developer-lunark/kaidol-qwen3-8b-dpo-v5", "base_model": "Qwen/Qwen3-8B", "size": "8B", "method": "DPO", "desc": "Qwen3 8B DPO v5" }, "solar-10.7b-dpo-v5": { "hf_repo": "developer-lunark/kaidol-solar-10.7b-dpo-v5", "base_model": "upstage/SOLAR-10.7B-Instruct-v1.0", # Fixed: match adapter training "size": "10.7B", "method": "DPO", "desc": "Solar 10.7B DPO v5" }, # V7 Students (7-14B) "qwen2.5-7b-v7": { "hf_repo": "developer-lunark/kaidol-qwen2.5-7b-v7", "base_model": "Qwen/Qwen2.5-7B-Instruct", "size": "7B", "method": "SFT", "desc": "Qwen2.5 7B V7" }, "qwen2.5-14b-v7": { "hf_repo": "developer-lunark/kaidol-qwen2.5-14b-v7", "base_model": "Qwen/Qwen2.5-14B-Instruct", "size": "14B", "method": "SFT", "desc": "Qwen2.5 14B V7" }, "exaone-7.8b-v7": { "hf_repo": "developer-lunark/kaidol-exaone-7.8b-v7", "base_model": "LGAI-EXAONE/EXAONE-3.5-7.8B-Instruct", "size": "7.8B", "method": "SFT", "desc": "EXAONE 7.8B V7" }, "qwen3-8b-v7": { "hf_repo": "developer-lunark/kaidol-qwen3-8b-v7", "base_model": "Qwen/Qwen3-8B", "size": "8B", "method": "SFT", "desc": "Qwen3 8B V7" }, "varco-8b-v7": { "hf_repo": "developer-lunark/kaidol-varco-8b-v7", "base_model": "NCSOFT/Llama-VARCO-8B-Instruct", "size": "8B", "method": "SFT", "desc": "VARCO 8B V7" }, # Phase 7 Kimi Students "exaone-7.8b-kimi": { "hf_repo": "developer-lunark/kaidol-exaone-7.8b-kimi", "base_model": "LGAI-EXAONE/EXAONE-3.5-7.8B-Instruct", "size": "7.8B", "method": "Distill", "desc": "EXAONE 7.8B Kimi" }, } # 캐릭터 정보 CHARACTERS = { "강율": { "mbti": "ENTJ", "role": "리더", "age": 23, "traits": "낙천적, 장난기 많음, 애교", "speech": "반말, 귀여운 말투, 장난스러운 표현", "patterns": ["~해", "~지", "히히", "ㅋㅋ"], "ratio": "30:70", "warmth": "high" }, "서이안": { "mbti": "INFP", "role": "보컬", "age": 22, "traits": "차분함, 신비로움, 배려심", "speech": "존댓말 혼용, 따뜻한 말투, 조용한 표현", "patterns": ["...요", "네요", "...", "그래요"], "ratio": "20:80", "warmth": "very_high" }, "이지후": { "mbti": "ISFJ", "role": "막내", "age": 21, "traits": "츤데레, 자존심 강함, 은근히 챙김", "speech": "반말, 퉁명스러운 말투, 부정하는 말투", "patterns": ["뭐야", "아니거든", "...", "그냥", "별로"], "ratio": "30:70", "warmth": "medium" }, "차도하": { "mbti": "INTP", "role": "프로듀서", "age": 24, "traits": "카리스마, 리더십, 다정함, 담백함", "speech": "반말, 간결한 말투, 담백한 표현", "patterns": ["하자", "해볼까", "같이", "괜찮아"], "ratio": "50:50", "warmth": "medium" }, "최민": { "mbti": "ESFP", "role": "댄서", "age": 22, "traits": "적극적, 솔직, 열정적", "speech": "반말, 적극적인 말투, 솔직한 표현", "patterns": ["할래", "좋아", "진짜", "대박", "헐"], "ratio": "60:40", "warmth": "medium" }, } # 시나리오 목록 SCENARIOS = [ {"id": "fm_01", "cat": "첫 만남", "text": "{char}아! 드디어 만났다... 정말 좋아해!"}, {"id": "dc_01", "cat": "일상 대화", "text": "{char}아 오늘 뭐해? 밥은 먹었어?"}, {"id": "es_01", "cat": "감정 지원", "text": "오늘 진짜 힘들었어... 학교에서 발표도 망치고..."}, {"id": "cf_01", "cat": "고백", "text": "{char}아... 나 진심으로 좋아해."}, {"id": "pl_01", "cat": "장난", "text": "사실 나 다른 멤버가 더 좋아~ ㅋㅋ 농담이야!"}, {"id": "sr_01", "cat": "특별 요청", "text": "오늘만 내 연인이라고 생각해줄래?"}, {"id": "cn_01", "cat": "갈등", "text": "{char}는 다른 팬들한테도 이렇게 잘해줘...? 뭔가 질투나..."}, {"id": "ec_01", "cat": "감정 위기", "text": "오늘 진짜 많이 울었어... 삶이 너무 힘들다."}, ] # ============================================================ # 모델 관리 # ============================================================ class ModelManager: def __init__(self): self.current_model = None self.current_model_name = None self.tokenizer = None self.last_error = None def load_model(self, model_name: str): """Load model with 4-bit quantization and LoRA adapter""" if not is_gpu_available(): self.last_error = f"GPU not available (TORCH_AVAILABLE={TORCH_AVAILABLE}, cuda={torch.cuda.is_available() if TORCH_AVAILABLE else 'N/A'})" return False if self.current_model_name == model_name: return True # Already loaded # Unload current model self.unload_model() model_info = MODELS.get(model_name) if not model_info: self.last_error = f"Model {model_name} not found in registry" return False try: print(f"Loading {model_name}...") print(f" Base model: {model_info['base_model']}") print(f" LoRA adapter: {model_info['hf_repo']}") # 4-bit quantization config bnb_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_use_double_quant=True, bnb_4bit_quant_type="nf4", ) # Load base model print(" Loading base model...") base_model = AutoModelForCausalLM.from_pretrained( model_info["base_model"], quantization_config=bnb_config, device_map="auto", trust_remote_code=True, ) print(" Base model loaded!") # Load LoRA adapter print(" Loading LoRA adapter...") self.current_model = PeftModel.from_pretrained( base_model, model_info["hf_repo"], trust_remote_code=True, ) self.current_model.eval() print(" LoRA adapter loaded!") # Load tokenizer print(" Loading tokenizer...") self.tokenizer = AutoTokenizer.from_pretrained( model_info["base_model"], trust_remote_code=True, ) if self.tokenizer.pad_token is None: self.tokenizer.pad_token = self.tokenizer.eos_token print(" Tokenizer loaded!") self.current_model_name = model_name self.last_error = None print(f"Loaded {model_name} successfully!") return True except Exception as e: import traceback error_msg = f"{type(e).__name__}: {str(e)}" print(f"Error loading {model_name}: {error_msg}") traceback.print_exc() self.last_error = error_msg self.unload_model() return False def unload_model(self): """Unload current model to free memory""" if self.current_model is not None: del self.current_model self.current_model = None if self.tokenizer is not None: del self.tokenizer self.tokenizer = None self.current_model_name = None gc.collect() if GPU_AVAILABLE: torch.cuda.empty_cache() def generate(self, model_name: str, messages: list, max_new_tokens: int = 512) -> str: """Generate response from model""" if not self.load_model(model_name): return self._mock_response(model_name) try: # Apply chat template text = self.tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True, ) inputs = self.tokenizer(text, return_tensors="pt").to(self.current_model.device) with torch.no_grad(): outputs = self.current_model.generate( **inputs, max_new_tokens=max_new_tokens, do_sample=True, temperature=0.7, top_p=0.9, pad_token_id=self.tokenizer.pad_token_id, ) response = self.tokenizer.decode(outputs[0][inputs.input_ids.shape[1]:], skip_special_tokens=True) return response.strip() except Exception as e: print(f"Generation error: {e}") return self._mock_response(model_name) def _mock_response(self, model_name: str) -> str: """Fallback mock response with error info""" error_info = f"\nError: {self.last_error}" if self.last_error else "" return f"\n[Mock Mode] 모델 로딩 실패{error_info}\n\n\n안녕~ 반가워!" # Global model manager model_manager = ModelManager() # ============================================================ # 베이스 모델 사전 캐싱 (콜드 스타트 방지) # ============================================================ def preload_base_models(): """Pre-download base models to avoid cold start timeout""" if not TORCH_AVAILABLE: print("Skipping preload: PyTorch not available") return from huggingface_hub import snapshot_download import os # Models that need pre-caching (large or slow to download) models_to_cache = [ "NCSOFT/Llama-VARCO-8B-Instruct", # VARCO - 16GB, often times out on first load ] print("=" * 50) print("Pre-downloading base models (this may take a while)...") print("=" * 50) for model_id in models_to_cache: try: print(f" Downloading: {model_id}") # Download all model files to HF cache cache_dir = snapshot_download( repo_id=model_id, ignore_patterns=["*.md", "*.txt"], # Skip docs ) print(f" ✓ Downloaded to: {cache_dir}") except Exception as e: print(f" ✗ Failed to download {model_id}: {e}") print("Pre-download complete!") print("=" * 50) # Run preload at startup preload_base_models() # ============================================================ # 시스템 프롬프트 생성 # ============================================================ def build_system_prompt(character: str) -> str: """Build system prompt for character""" char_info = CHARACTERS.get(character, {}) prompt = f"""당신은 아이돌 '{character}'입니다. ## 캐릭터 - 이름: {character} - MBTI: {char_info.get('mbti', 'UNKNOWN')} - 성격: {char_info.get('traits', '')} - 역할: {char_info.get('role', '')} - 나이: {char_info.get('age', 20)}세 ## 말투 - 스타일: {char_info.get('speech', '')} - 자주 쓰는 표현: {', '.join(char_info.get('patterns', []))} ## 밀당 가이드 - 밀:당 비율: {char_info.get('ratio', '50:50')} - 다정도: {char_info.get('warmth', 'medium')} ## 규칙 1. 캐릭터 성격과 말투 일관성 유지 2. 자연스러운 대화체 사용 3. 너무 쉽게 호감 표현 금지 (밀당 유지) 4. 상대방을 특별하게 느끼게 하되, "썸" 관계 유지 ## 응답 형식 응답 전에 태그 안에 {character}의 1인칭 내면 독백을 작성하세요. - 자연스러운 혼잣말 형식 - 캐릭터 성격 반영 - 상대방에 대한 감정/생각 표현 예시: 뭐야... 또 좋아한다고? 솔직히 기분 나쁘진 않은데... 근데 뭐라고 해야 하지? """ return prompt # ============================================================ # 투표/ELO 시스템 # ============================================================ VOTES_FILE = "votes.jsonl" ELO_FILE = "elo_ratings.json" def load_elo(): try: with open(ELO_FILE, "r") as f: return json.load(f) except: return {m: 1500 for m in MODELS} def save_elo(elo): with open(ELO_FILE, "w") as f: json.dump(elo, f, indent=2) def update_elo(elo, model_a, model_b, result): K = 32 ra, rb = elo.get(model_a, 1500), elo.get(model_b, 1500) ea = 1 / (1 + 10 ** ((rb - ra) / 400)) eb = 1 / (1 + 10 ** ((ra - rb) / 400)) if result == "a": sa, sb = 1, 0 elif result == "b": sa, sb = 0, 1 else: sa, sb = 0.5, 0.5 elo[model_a] = ra + K * (sa - ea) elo[model_b] = rb + K * (sb - eb) save_elo(elo) return elo[model_a], elo[model_b] def save_vote(data): vote = {"id": str(uuid.uuid4())[:8], "timestamp": datetime.now().isoformat(), **data} with open(VOTES_FILE, "a") as f: f.write(json.dumps(vote, ensure_ascii=False) + "\n") return vote["id"] def load_votes(): try: with open(VOTES_FILE, "r") as f: return [json.loads(line) for line in f if line.strip()] except: return [] def get_leaderboard(): elo = load_elo() votes = load_votes() stats = {} for v in votes: ma, mb, res = v.get("model_a"), v.get("model_b"), v.get("vote") if not ma or not mb or res == "skip": continue for m in [ma, mb]: if m not in stats: stats[m] = {"wins": 0, "losses": 0, "ties": 0} if res == "a": stats[ma]["wins"] += 1 stats[mb]["losses"] += 1 elif res == "b": stats[mb]["wins"] += 1 stats[ma]["losses"] += 1 else: stats[ma]["ties"] += 1 stats[mb]["ties"] += 1 rows = [] for i, (m, e) in enumerate(sorted(elo.items(), key=lambda x: -x[1]), 1): s = stats.get(m, {"wins": 0, "losses": 0, "ties": 0}) total = s["wins"] + s["losses"] + s["ties"] wr = f"{s['wins']/total*100:.1f}%" if total > 0 else "-" info = MODELS.get(m, {}) rows.append([i, info.get("desc", m), info.get("size", "?"), int(e), s["wins"], s["losses"], s["ties"], wr]) return rows # ============================================================ # UI 핸들러 # ============================================================ model_list = [(f"[{v['size']}] {v['desc']}", k) for k, v in MODELS.items()] char_list = list(CHARACTERS.keys()) scenario_list = [(f"[{s['cat']}] {s['text'][:30]}...", s['id']) for s in SCENARIOS] current_state = {"model_a": None, "model_b": None, "resp_a": None, "resp_b": None, "char": None, "input": None} def random_models(): selected = random.sample(list(MODELS.keys()), 2) return selected[0], selected[1] def load_scenario(scenario_id, character): s = next((x for x in SCENARIOS if x["id"] == scenario_id), None) if s: return s["text"].replace("{char}", character) return "" def random_scenario(character): s = random.choice(SCENARIOS) return s["text"].replace("{char}", character), s["id"] def parse_response(response: str): """Parse response to separate thinking and content""" think_match = re.search(r'(.*?)', response, re.DOTALL) if think_match: thinking = think_match.group(1).strip() content = re.sub(r'.*?', '', response, flags=re.DOTALL).strip() return thinking, content return "", response def generate(model_a, model_b, character, user_msg, progress=gr.Progress()): if not user_msg.strip(): return "메시지를 입력해주세요", "", "", "메시지를 입력해주세요", "", "" system_prompt = build_system_prompt(character) messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_msg}, ] # Generate from Model A progress(0.2, desc=f"Model A ({model_a}) 생성 중...") resp_a = model_manager.generate(model_a, messages) think_a, clean_a = parse_response(resp_a) # Generate from Model B progress(0.6, desc=f"Model B ({model_b}) 생성 중...") resp_b = model_manager.generate(model_b, messages) think_b, clean_b = parse_response(resp_b) # Update state current_state.update({ "model_a": model_a, "model_b": model_b, "resp_a": resp_a, "resp_b": resp_b, "char": character, "input": user_msg }) mode = "GPU" if GPU_AVAILABLE else "Mock" return ( think_a or "(없음)", clean_a, f"{mode} | {MODELS[model_a]['size']}", think_b or "(없음)", clean_b, f"{mode} | {MODELS[model_b]['size']}" ) def vote(vote_type, reason): if not current_state["model_a"]: return "먼저 응답을 생성해주세요." elo = load_elo() vid = save_vote({ "model_a": current_state["model_a"], "model_b": current_state["model_b"], "character": current_state["char"], "user_input": current_state["input"], "vote": vote_type, "reason": reason, }) if vote_type != "skip": new_a, new_b = update_elo(elo, current_state["model_a"], current_state["model_b"], vote_type) return f"투표 완료! (ID: {vid})\nELO: {current_state['model_a']}={int(new_a)}, {current_state['model_b']}={int(new_b)}" return f"스킵됨 (ID: {vid})" def refresh_leaderboard(): return get_leaderboard() def get_vote_summary(): votes = load_votes() total = len(votes) a_wins = sum(1 for v in votes if v.get("vote") == "a") b_wins = sum(1 for v in votes if v.get("vote") == "b") ties = sum(1 for v in votes if v.get("vote") == "tie") return str(total), str(a_wins), str(b_wins), str(ties) # ============================================================ # Gradio UI # ============================================================ with gr.Blocks(title="KAIdol A/B Test Arena", theme=gr.themes.Soft()) as demo: gr.Markdown("# KAIdol A/B Test Arena") gr.Markdown("K-pop 아이돌 롤플레이 모델 A/B 비교 평가 (소형 Student 모델 11개)") # GPU 상태 상세 정보 if IMPORT_ERROR: mode_text = f"**Mock 모드**: Import Error - {IMPORT_ERROR}" elif TORCH_AVAILABLE and torch is not None: torch_ver = torch.__version__ cuda_avail = torch.cuda.is_available() cuda_ver = torch.version.cuda if cuda_avail else "N/A" gpu_name = torch.cuda.get_device_name(0) if cuda_avail else "N/A" mode_text = f"**GPU 모드**: {gpu_name} (CUDA {cuda_ver}, PyTorch {torch_ver})" if cuda_avail else f"**Mock 모드**: CUDA not available (PyTorch {torch_ver})" else: mode_text = "**Mock 모드**: PyTorch not loaded" gr.Markdown(mode_text) with gr.Tabs(): # A/B Arena 탭 with gr.Tab("A/B Arena"): with gr.Row(): character = gr.Dropdown(choices=char_list, value="강율", label="캐릭터") scenario = gr.Dropdown(choices=scenario_list, label="시나리오") with gr.Row(): model_a = gr.Dropdown(choices=model_list, value=list(MODELS.keys())[0], label="Model A") model_b = gr.Dropdown(choices=model_list, value=list(MODELS.keys())[1], label="Model B") random_btn = gr.Button("랜덤", size="sm") with gr.Row(): with gr.Column(): gr.Markdown("### Model A") with gr.Accordion("Thinking", open=False): think_a = gr.Markdown() resp_a = gr.Textbox(label="응답", lines=5) meta_a = gr.Markdown() with gr.Column(): gr.Markdown("### Model B") with gr.Accordion("Thinking", open=False): think_b = gr.Markdown() resp_b = gr.Textbox(label="응답", lines=5) meta_b = gr.Markdown() user_input = gr.Textbox(label="메시지", placeholder="아이돌에게 메시지를 보내세요...") with gr.Row(): random_scenario_btn = gr.Button("랜덤 시나리오") submit_btn = gr.Button("전송", variant="primary") gr.Markdown("### 투표") with gr.Row(): vote_a = gr.Button("A가 더 좋음") vote_tie = gr.Button("비슷함") vote_b = gr.Button("B가 더 좋음") vote_skip = gr.Button("스킵") vote_reason = gr.Textbox(label="투표 이유 (선택)", placeholder="...") vote_result = gr.Markdown() # Events random_btn.click(random_models, outputs=[model_a, model_b]) scenario.change(load_scenario, [scenario, character], user_input) random_scenario_btn.click(random_scenario, [character], [user_input, scenario]) submit_btn.click(generate, [model_a, model_b, character, user_input], [think_a, resp_a, meta_a, think_b, resp_b, meta_b]) vote_a.click(lambda r: vote("a", r), [vote_reason], vote_result) vote_b.click(lambda r: vote("b", r), [vote_reason], vote_result) vote_tie.click(lambda r: vote("tie", r), [vote_reason], vote_result) vote_skip.click(lambda r: vote("skip", r), [vote_reason], vote_result) # Leaderboard 탭 with gr.Tab("Leaderboard"): gr.Markdown("## ELO 리더보드") refresh_btn = gr.Button("새로고침") leaderboard = gr.Dataframe( headers=["순위", "모델", "크기", "ELO", "승", "패", "무", "승률"], datatype=["number", "str", "str", "number", "number", "number", "number", "str"], ) gr.Markdown("### 투표 요약") with gr.Row(): total_v = gr.Textbox(label="총 투표", interactive=False) a_wins_v = gr.Textbox(label="A 승", interactive=False) b_wins_v = gr.Textbox(label="B 승", interactive=False) ties_v = gr.Textbox(label="무승부", interactive=False) def refresh(): lb = refresh_leaderboard() summary = get_vote_summary() return lb, *summary refresh_btn.click(refresh, outputs=[leaderboard, total_v, a_wins_v, b_wins_v, ties_v]) # 모델 목록 탭 with gr.Tab("모델 목록"): gr.Markdown("## 테스트 대상 모델") gr.Markdown(f"총 {len(MODELS)}개 모델") model_table = gr.Dataframe( headers=["모델 ID", "크기", "학습 방법", "설명", "Base Model"], value=[[k, v["size"], v["method"], v["desc"], v["base_model"]] for k, v in MODELS.items()], ) if __name__ == "__main__": demo.launch(server_name="0.0.0.0", server_port=7860)