import gradio as gr import pandas as pd import torch from datasets import Dataset, DatasetDict from transformers import ( AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer, DataCollatorWithPadding ) from peft import ( LoraConfig, AdaLoraConfig, AdaptionPromptConfig, PromptTuningConfig, PrefixTuningConfig, get_peft_model, TaskType, PeftModel ) from sklearn.model_selection import train_test_split from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix from sklearn.utils import resample import numpy as np import json from datetime import datetime import os import gc import random from huggingface_hub import login # ==================== 全域變數 ==================== LAST_MODEL_PATH = None LAST_TOKENIZER = None MAX_LENGTH = 512 RANDOM_SEED = 42 # ⭐ 全域隨機種子 # ==================== 🎲 完整的隨機種子控制 ==================== def set_seed(seed=42): """ ⭐ 設定所有隨機種子以確保結果完全可重現 ⭐ 這個函數會設定: 1. Python 內建 random 模組 2. NumPy 隨機數生成器 3. PyTorch CPU 隨機數生成器 4. PyTorch CUDA 隨機數生成器(所有 GPU) 5. CUDA 確定性行為 6. 環境變數 注意:開啟確定性模式可能會降低 10-20% 的訓練速度 """ print(f"\n{'='*70}") print(f"🎲 設定隨機種子以確保可重現性") print(f"{'='*70}") # 1. Python 內建 random random.seed(seed) print(f"✅ Python random.seed({seed})") # 2. NumPy np.random.seed(seed) print(f"✅ NumPy random.seed({seed})") # 3. PyTorch CPU torch.manual_seed(seed) print(f"✅ PyTorch manual_seed({seed})") # 4. PyTorch CUDA(所有 GPU) if torch.cuda.is_available(): torch.cuda.manual_seed(seed) torch.cuda.manual_seed_all(seed) print(f"✅ PyTorch CUDA seed({seed}) - 適用於所有 GPU") # 5. CUDA 確定性設定 torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False print(f"✅ CUDA deterministic mode: ON") print(f"✅ CUDA benchmark: OFF") # 6. 環境變數 os.environ['PYTHONHASHSEED'] = str(seed) os.environ['CUBLAS_WORKSPACE_CONFIG'] = ':4096:8' print(f"✅ PYTHONHASHSEED = {seed}") print(f"✅ CUBLAS_WORKSPACE_CONFIG = :4096:8") # 7. PyTorch 確定性操作 try: torch.use_deterministic_algorithms(True) print(f"✅ PyTorch deterministic algorithms: ON") except Exception as e: print(f"⚠️ PyTorch deterministic algorithms: 不支援 ({e})") print(f"{'='*70}") print(f"✅ 隨機種子設定完成!結果應該完全可重現") print(f"⚠️ 注意:確定性模式可能會稍微降低訓練速度") print(f"{'='*70}\n") # ==================== 程式啟動時立即設定種子 ==================== set_seed(RANDOM_SEED) # ==================== HF Token 登入 ==================== print("🔐 檢查 Hugging Face Token...") if "HF_TOKEN" in os.environ: try: login(token=os.environ["HF_TOKEN"]) print("✅ 已使用 HF Token 登入") except Exception as e: print(f"⚠️ Token 登入失敗: {e}") else: print("⚠️ 未找到 HF_TOKEN,可能無法下載 Llama 模型") # 檢測設備 device = "cuda" if torch.cuda.is_available() else "cpu" print(f"🖥️ 使用設備: {device}") # ==================== 核心訓練函數(你的原始邏輯) ==================== def run_llama_training( file_path, model_name, target_samples, use_class_weights, num_epochs, batch_size, learning_rate, tuning_method, lora_r, lora_alpha, lora_dropout, lora_target_modules, adalora_init_r, adalora_target_r, adalora_alpha, adalora_tinit, adalora_tfinal, adalora_delta_t, adapter_reduction_factor, prompt_tuning_num_tokens, prefix_tuning_num_tokens, best_metric ): """ 你的原始 Llama 訓練邏輯,加入多種微調方法選擇 """ global LAST_MODEL_PATH, LAST_TOKENIZER # ⭐ 訓練前重新確保隨機種子設定 print("\n" + "="*70) print("🔄 訓練前重新確認隨機種子...") print("="*70) set_seed(RANDOM_SEED) # ==================== 清空記憶體(訓練前) ==================== torch.cuda.empty_cache() gc.collect() print("🧹 記憶體已清空") # ==================== 1. 載入數據 ==================== print("📂 載入訓練數據...") df = pd.read_csv(file_path) print(f"✅ 成功載入 {len(df)} 筆數據") # 自動偵測文本和標籤欄位 text_col = None label_col = None # 支援的文本欄位名稱 if 'Text' in df.columns: text_col = 'Text' elif 'text' in df.columns: text_col = 'text' # 支援的標籤欄位名稱 if 'Label' in df.columns: label_col = 'Label' elif 'label' in df.columns: label_col = 'label' if text_col is None or label_col is None: raise ValueError( f"❌ 無法偵測到正確的欄位名稱!\n" f"📋 您的 CSV 欄位: {list(df.columns)}\n\n" f"✅ 請使用以下欄位名稱:\n" f" 文本欄位: 'Text' 或 'text'\n" f" 標籤欄位: 'Label' 或 'label'" ) print(f" ✅ 偵測到文本欄位: '{text_col}'") print(f" ✅ 偵測到標籤欄位: '{label_col}'") # 統一重命名為標準欄位名 df = df.rename(columns={text_col: 'Text', label_col: 'nbcd'}) print(f" 原始 Class 0: {(df['nbcd']==0).sum()} 筆") print(f" 原始 Class 1: {(df['nbcd']==1).sum()} 筆") # ==================== 2. 資料平衡處理 ==================== print("\n⚖️ 執行資料平衡...") df_class_0 = df[df['nbcd'] == 0] df_class_1 = df[df['nbcd'] == 1] target_n = int(target_samples) # 欠採樣 Class 0 if len(df_class_0) > target_n: df_class_0_balanced = resample(df_class_0, n_samples=target_n, random_state=42, replace=False) print(f"✅ Class 0 欠採樣: {len(df_class_0)} → {len(df_class_0_balanced)} 筆") else: df_class_0_balanced = df_class_0 print(f"⚠️ Class 0 樣本數不足,保持 {len(df_class_0)} 筆") # 過採樣 Class 1 if len(df_class_1) < target_n: df_class_1_balanced = resample(df_class_1, n_samples=target_n, random_state=42, replace=True) print(f"✅ Class 1 過採樣: {len(df_class_1)} → {len(df_class_1_balanced)} 筆") else: df_class_1_balanced = df_class_1 print(f"⚠️ Class 1 樣本數充足,保持 {len(df_class_1)} 筆") df_balanced = pd.concat([df_class_0_balanced, df_class_1_balanced]) df_balanced = df_balanced.sample(frac=1, random_state=42).reset_index(drop=True) print(f"\n📊 平衡後數據:") print(f" 總樣本數: {len(df_balanced)} 筆") print(f" Class 0: {(df_balanced['nbcd']==0).sum()} 筆") print(f" Class 1: {(df_balanced['nbcd']==1).sum()} 筆") # ==================== 3. 計算類別權重 ==================== if use_class_weights: print("\n⚖️ 計算類別權重...") class_counts = df_balanced['nbcd'].value_counts().sort_index() total = len(df_balanced) num_classes = 2 class_weight_0 = total / (num_classes * class_counts[0]) class_weight_1 = total / (num_classes * class_counts[1]) class_weights = torch.tensor([class_weight_0, class_weight_1], dtype=torch.float32) print(f"✅ 類別權重計算完成:") print(f" Class 0 權重: {class_weight_0:.4f}") print(f" Class 1 權重: {class_weight_1:.4f}") if device == "cuda": class_weights = class_weights.to(device) else: class_weights = None print("\n⚠️ 未使用類別權重") # ==================== 4. 分割數據 ==================== print("\n✂️ 分割訓練集和測試集...") train_df, test_df = train_test_split( df_balanced, test_size=0.2, stratify=df_balanced['nbcd'], random_state=42 ) print(f"✅ 訓練集: {len(train_df)} 筆 (Class 0: {(train_df['nbcd']==0).sum()}, Class 1: {(train_df['nbcd']==1).sum()})") print(f"✅ 測試集: {len(test_df)} 筆 (Class 0: {(test_df['nbcd']==0).sum()}, Class 1: {(test_df['nbcd']==1).sum()})") dataset = DatasetDict({ 'train': Dataset.from_pandas(train_df[['Text', 'nbcd']]), 'test': Dataset.from_pandas(test_df[['Text', 'nbcd']]) }) # ==================== 5. 載入模型和 Tokenizer ==================== print("\n🤖 載入 Llama 模型和 Tokenizer...") tokenizer = AutoTokenizer.from_pretrained(model_name) if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token tokenizer.pad_token_id = tokenizer.eos_token_id # ==================== 6. 載入未微調的基礎模型 (Baseline) ==================== print("\n📦 載入未微調的基礎模型 (Baseline)...") baseline_model = AutoModelForSequenceClassification.from_pretrained( model_name, num_labels=2, torch_dtype=torch.float16 if device == "cuda" else torch.float32, device_map="auto" if device == "cuda" else None ) baseline_model.config.pad_token_id = tokenizer.pad_token_id print("✅ Baseline 模型載入完成") # ==================== 7. 載入要微調的模型 ==================== print("\n🔧 載入用於微調的模型...") base_model = AutoModelForSequenceClassification.from_pretrained( model_name, num_labels=2, torch_dtype=torch.float16 if device == "cuda" else torch.float32, device_map="auto" if device == "cuda" else None ) base_model.config.pad_token_id = tokenizer.pad_token_id print("✅ 基礎模型載入完成") # ==================== 8. 配置微調方法 ==================== print(f"\n🔧 配置 {tuning_method}...") if tuning_method == "LoRA": # LoRA 配置 - 使用完整參數 target_modules_map = { "query,value": ["q_proj", "v_proj"], "query,key,value": ["q_proj", "k_proj", "v_proj"], "all": ["q_proj", "k_proj", "v_proj", "o_proj"] } peft_config = LoraConfig( task_type=TaskType.SEQ_CLS, r=int(lora_r), lora_alpha=int(lora_alpha), lora_dropout=float(lora_dropout), target_modules=target_modules_map.get(lora_target_modules, ["q_proj", "v_proj"]), bias="none" ) print(f"✅ LoRA 配置完成") print(f" LoRA rank (r): {lora_r}") print(f" LoRA alpha: {lora_alpha}") print(f" LoRA dropout: {lora_dropout}") print(f" 目標模組: {lora_target_modules}") elif tuning_method == "AdaLoRA": # AdaLoRA 配置 - 使用獨立參數 try: peft_config = AdaLoraConfig( task_type=TaskType.SEQ_CLS, inference_mode=False, r=int(adalora_target_r), lora_alpha=int(adalora_alpha), lora_dropout=0.1, target_modules=["q_proj", "v_proj"], # AdaLoRA 特定參數 init_r=int(adalora_init_r), target_r=int(adalora_target_r), tinit=int(adalora_tinit), tfinal=int(adalora_tfinal), deltaT=int(adalora_delta_t), ) print(f"✅ AdaLoRA 配置完成") print(f" 初始 rank: {adalora_init_r}") print(f" 目標 rank: {adalora_target_r}") print(f" Alpha: {adalora_alpha}") print(f" Tinit: {adalora_tinit}, Tfinal: {adalora_tfinal}") print(f" Delta T: {adalora_delta_t}") print(f" 自適應秩調整: 啟用") except Exception as e: print(f"⚠️ AdaLoRA 配置失敗,回退到 LoRA: {e}") peft_config = LoraConfig( task_type=TaskType.SEQ_CLS, r=int(adalora_target_r), lora_alpha=int(adalora_alpha), lora_dropout=0.1, target_modules=["q_proj", "v_proj"], bias="none" ) elif tuning_method == "Adapter": # Adapter (Bottleneck Adapters) peft_config = AdaptionPromptConfig( task_type=TaskType.SEQ_CLS, adapter_len=10, adapter_layers=30, reduction_factor=int(adapter_reduction_factor) ) print(f"✅ Adapter 配置完成") print(f" Reduction factor: {adapter_reduction_factor}") elif tuning_method == "Prompt Tuning": # Soft Prompt Tuning peft_config = PromptTuningConfig( task_type=TaskType.SEQ_CLS, num_virtual_tokens=int(prompt_tuning_num_tokens), prompt_tuning_init="TEXT", prompt_tuning_init_text="Classify if the following text indicates NBCD:", tokenizer_name_or_path=model_name ) print(f"✅ Prompt Tuning 配置完成") print(f" Virtual tokens: {prompt_tuning_num_tokens}") elif tuning_method == "Prefix Tuning": # Prefix Tuning - 可能有兼容性問題,但仍然嘗試 print(f"⚠️ Prefix Tuning 在某些環境可能有兼容性問題") print(f" 如果遇到錯誤,建議使用 Prompt Tuning 替代") try: # 先禁用模型的緩存功能 base_model.config.use_cache = False peft_config = PrefixTuningConfig( task_type=TaskType.SEQ_CLS, num_virtual_tokens=int(prefix_tuning_num_tokens), prefix_projection=False, inference_mode=False ) print(f"✅ Prefix Tuning 配置完成") print(f" Virtual tokens: {prefix_tuning_num_tokens}") print(f" 已禁用緩存") except Exception as e: print(f"❌ Prefix Tuning 配置失敗: {e}") raise ValueError( f"Prefix Tuning 配置失敗,原因: {e}\n" f"建議使用 Prompt Tuning 作為替代方案" ) elif tuning_method == "BitFit": # BitFit: 只訓練 bias 參數 - 完全修復版 model = base_model # 凍結所有參數 for param in model.parameters(): param.requires_grad = False # 只解凍 bias 和 分類頭 trainable_params_list = [] for name, param in model.named_parameters(): if 'bias' in name or 'score' in name or 'classifier' in name: param.requires_grad = True trainable_params_list.append(name) print(f"✅ BitFit 配置完成") print(f" 僅訓練 bias 和分類頭參數") print(f" 可訓練參數: {', '.join(trainable_params_list[:5])}...") # 應用 PEFT 配置(BitFit 除外) if tuning_method != "BitFit": model = get_peft_model(base_model, peft_config) # Prefix Tuning 額外設置 if tuning_method == "Prefix Tuning": model.config.use_cache = False # 計算可訓練參數 trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad) total_params = sum(p.numel() for p in model.parameters()) print(f" 可訓練參數: {trainable_params:,} / {total_params:,} ({trainable_params/total_params*100:.2f}%)") # ==================== 9. 預處理數據 ==================== print("\n🔄 預處理數據...") def preprocess_function(examples): return tokenizer( examples['Text'], truncation=True, padding='max_length', max_length=MAX_LENGTH ) tokenized_dataset = dataset.map(preprocess_function, batched=True, remove_columns=['Text']) tokenized_dataset = tokenized_dataset.rename_column("nbcd", "labels") print("✅ 數據預處理完成") # ==================== 10. 評估指標函數 ==================== def compute_metrics(eval_pred): predictions, labels = eval_pred predictions = np.argmax(predictions, axis=1) accuracy = accuracy_score(labels, predictions) precision, recall, f1, _ = precision_recall_fscore_support( labels, predictions, average='binary', zero_division=0 ) # 計算混淆矩陣以得到 sensitivity 和 specificity cm = confusion_matrix(labels, predictions) if cm.shape == (2, 2): tn, fp, fn, tp = cm.ravel() sensitivity = tp / (tp + fn) if (tp + fn) > 0 else 0 # 敏感度 = Recall specificity = tn / (tn + fp) if (tn + fp) > 0 else 0 # 特異性 else: sensitivity = 0 specificity = 0 return { 'accuracy': accuracy, 'precision': precision, 'recall': recall, 'f1': f1, 'sensitivity': sensitivity, 'specificity': specificity } # ==================== 11. 評估 Baseline 模型 ==================== print("\n" + "="*70) print("📊 評估未微調的 Baseline 模型...") print("="*70) baseline_trainer = Trainer( model=baseline_model, args=TrainingArguments( output_dir="./temp_baseline_llama", per_device_eval_batch_size=int(batch_size), bf16=(device == "cuda"), report_to="none", seed=RANDOM_SEED # ⭐ Baseline 也使用相同種子 ), tokenizer=tokenizer, data_collator=DataCollatorWithPadding(tokenizer=tokenizer), compute_metrics=compute_metrics ) baseline_test_results = baseline_trainer.evaluate(eval_dataset=tokenized_dataset['test']) print("\n📝 Baseline 模型 - 測試集結果:") print(f" Accuracy: {baseline_test_results['eval_accuracy']:.4f}") print(f" Precision: {baseline_test_results['eval_precision']:.4f}") print(f" Recall: {baseline_test_results['eval_recall']:.4f}") print(f" F1 Score: {baseline_test_results['eval_f1']:.4f}") print(f" Sensitivity: {baseline_test_results['eval_sensitivity']:.4f}") print(f" Specificity: {baseline_test_results['eval_specificity']:.4f}") # 計算 Baseline 混淆矩陣 baseline_predictions = baseline_trainer.predict(tokenized_dataset['test']) baseline_pred_labels = np.argmax(baseline_predictions.predictions, axis=1) baseline_true_labels = baseline_predictions.label_ids baseline_cm = confusion_matrix(baseline_true_labels, baseline_pred_labels) # 清空 baseline 模型記憶體 del baseline_model del baseline_trainer torch.cuda.empty_cache() gc.collect() # ==================== 12. 自定義 Trainer ==================== if use_class_weights: class WeightedTrainer(Trainer): def __init__(self, *args, class_weights=None, **kwargs): super().__init__(*args, **kwargs) self.class_weights = class_weights def compute_loss(self, model, inputs, return_outputs=False, **kwargs): labels = inputs.pop("labels") outputs = model(**inputs) logits = outputs.logits loss_fct = torch.nn.CrossEntropyLoss(weight=self.class_weights) loss = loss_fct(logits.view(-1, self.model.config.num_labels), labels.view(-1)) return (loss, outputs) if return_outputs else loss TrainerClass = WeightedTrainer else: TrainerClass = Trainer # ==================== 13. 訓練配置 ==================== print("\n" + "="*70) print("⚙️ 配置微調訓練器...") print("="*70) # 指標映射 metric_map = { "f1": "f1", "accuracy": "accuracy", "precision": "precision", "recall": "recall", "sensitivity": "sensitivity", "specificity": "specificity" } output_dir = f'./llama_nbcd_{datetime.now().strftime("%Y%m%d_%H%M%S")}' training_args = TrainingArguments( output_dir=output_dir, num_train_epochs=int(num_epochs), per_device_train_batch_size=int(batch_size), per_device_eval_batch_size=int(batch_size), learning_rate=float(learning_rate), weight_decay=0.01, eval_strategy="epoch", save_strategy="epoch", load_best_model_at_end=True, metric_for_best_model=metric_map.get(best_metric, "recall"), logging_dir=f"{output_dir}/logs", logging_steps=10, bf16=(device == "cuda"), gradient_accumulation_steps=2, warmup_steps=50, report_to="none", seed=RANDOM_SEED, # ⭐ 使用全域種子 data_seed=RANDOM_SEED, # ⭐ 資料載入種子 dataloader_num_workers=0 # ⭐ 單執行緒以確保可重現 ) if use_class_weights: trainer = TrainerClass( model=model, args=training_args, train_dataset=tokenized_dataset['train'], eval_dataset=tokenized_dataset['test'], tokenizer=tokenizer, data_collator=DataCollatorWithPadding(tokenizer=tokenizer), compute_metrics=compute_metrics, class_weights=class_weights ) else: trainer = TrainerClass( model=model, args=training_args, train_dataset=tokenized_dataset['train'], eval_dataset=tokenized_dataset['test'], tokenizer=tokenizer, data_collator=DataCollatorWithPadding(tokenizer=tokenizer), compute_metrics=compute_metrics ) # ==================== 14. 開始訓練 ==================== print("\n" + "="*70) print("🚀 開始微調訓練...") print("="*70 + "\n") start_time = datetime.now() train_result = trainer.train() end_time = datetime.now() duration = (end_time - start_time).total_seconds() / 60 print("\n" + "="*70) print(f"✅ 訓練完成!") print(f" 耗時: {duration:.1f} 分鐘") print("="*70) # ==================== 15. 評估微調後的模型 ==================== print("\n" + "="*70) print("📊 評估微調後的模型...") print("="*70) finetuned_test_results = trainer.evaluate(eval_dataset=tokenized_dataset['test']) print("\n📝 微調模型 - 測試集結果:") print(f" Accuracy: {finetuned_test_results['eval_accuracy']:.4f}") print(f" Precision: {finetuned_test_results['eval_precision']:.4f}") print(f" Recall: {finetuned_test_results['eval_recall']:.4f}") print(f" F1 Score: {finetuned_test_results['eval_f1']:.4f}") print(f" Sensitivity: {finetuned_test_results['eval_sensitivity']:.4f}") print(f" Specificity: {finetuned_test_results['eval_specificity']:.4f}") # 計算微調模型混淆矩陣 finetuned_predictions = trainer.predict(tokenized_dataset['test']) finetuned_pred_labels = np.argmax(finetuned_predictions.predictions, axis=1) finetuned_true_labels = finetuned_predictions.label_ids finetuned_cm = confusion_matrix(finetuned_true_labels, finetuned_pred_labels) # ==================== 16. 保存模型和結果 ==================== print("\n💾 保存模型和結果...") trainer.save_model() tokenizer.save_pretrained(output_dir) # 儲存模型資訊到 JSON 檔案 model_info = { 'model_path': output_dir, 'model_name': model_name, 'tuning_method': tuning_method, 'best_metric': best_metric, 'best_metric_value': float(finetuned_test_results[f'eval_{metric_map.get(best_metric, "recall")}']), 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'target_samples': target_samples, 'epochs': num_epochs, 'batch_size': batch_size, 'learning_rate': learning_rate, 'lora_r': lora_r if tuning_method in ["LoRA", "AdaLoRA"] else None, 'lora_alpha': lora_alpha if tuning_method in ["LoRA", "AdaLoRA"] else None } # 讀取現有的模型列表 models_list_file = './saved_llama_models_list.json' if os.path.exists(models_list_file): with open(models_list_file, 'r') as f: models_list = json.load(f) else: models_list = [] # 加入新模型資訊 models_list.append(model_info) # 儲存更新後的列表 with open(models_list_file, 'w') as f: json.dump(models_list, f, indent=2) # 更新全域變數 LAST_MODEL_PATH = output_dir LAST_TOKENIZER = tokenizer print(f"✅ 模型已儲存至: {output_dir}") # ==================== 清空記憶體(訓練後) ==================== del model del trainer torch.cuda.empty_cache() gc.collect() print("🧹 訓練後記憶體已清空") # 準備返回結果 results = { 'baseline_results': baseline_test_results, 'finetuned_results': finetuned_test_results, 'baseline_cm': baseline_cm, 'finetuned_cm': finetuned_cm, 'model_path': output_dir, 'duration': duration, 'best_metric': best_metric, 'model_name': model_name, 'tuning_method': tuning_method } return results # ==================== Gradio Wrapper 函數 ==================== def train_wrapper( file, model_name, target_samples, use_class_weights, num_epochs, batch_size, learning_rate, tuning_method, lora_r, lora_alpha, lora_dropout, lora_target_modules, adalora_init_r, adalora_target_r, adalora_alpha, adalora_tinit, adalora_tfinal, adalora_delta_t, adapter_reduction_factor, prompt_tuning_num_tokens, prefix_tuning_num_tokens, best_metric ): """包裝函數,處理 Gradio 的輸入輸出""" if file is None: return "請上傳 CSV 檔案", "", "" try: # 呼叫訓練函數 results = run_llama_training( file_path=file.name, model_name=model_name, target_samples=target_samples, use_class_weights=use_class_weights, num_epochs=num_epochs, batch_size=batch_size, learning_rate=learning_rate, tuning_method=tuning_method, lora_r=lora_r, lora_alpha=lora_alpha, lora_dropout=lora_dropout, lora_target_modules=lora_target_modules, adalora_init_r=adalora_init_r, adalora_target_r=adalora_target_r, adalora_alpha=adalora_alpha, adalora_tinit=adalora_tinit, adalora_tfinal=adalora_tfinal, adalora_delta_t=adalora_delta_t, adapter_reduction_factor=adapter_reduction_factor, prompt_tuning_num_tokens=prompt_tuning_num_tokens, prefix_tuning_num_tokens=prefix_tuning_num_tokens, best_metric=best_metric ) baseline_results = results['baseline_results'] finetuned_results = results['finetuned_results'] baseline_cm = results['baseline_cm'] finetuned_cm = results['finetuned_cm'] # 第一格:資料資訊 data_info = f""" # 📊 資料資訊 ## 🔧 訓練配置 - **模型**: {results['model_name']} - **微調方法**: {results['tuning_method']} - **最佳化指標**: {results['best_metric']} - **訓練時長**: {results['duration']:.1f} 分鐘 ## ⚙️ 訓練參數 - **目標樣本數**: {target_samples} 筆/類別 - **使用類別權重**: {'是' if use_class_weights else '否'} - **訓練輪數**: {num_epochs} - **批次大小**: {batch_size} - **學習率**: {learning_rate} ✅ 訓練完成!模型已儲存,可在「預測」頁面使用! """ # 第二格:未微調 Llama baseline_output = f""" # 🔵 未微調 Llama (Baseline) ## 未經訓練 ### 📈 評估指標 | 指標 | 數值 | |------|------| | **Accuracy** | {baseline_results['eval_accuracy']:.4f} | | **Precision** | {baseline_results['eval_precision']:.4f} | | **Recall** | {baseline_results['eval_recall']:.4f} | | **F1 Score** | {baseline_results['eval_f1']:.4f} | | **Sensitivity** | {baseline_results['eval_sensitivity']:.4f} | | **Specificity** | {baseline_results['eval_specificity']:.4f} | ### 📊 混淆矩陣 (Confusion Matrix) | | 預測: 存活 (0) | 預測: 死亡 (1) | |------|------|------| | **實際: 存活 (0)** | {baseline_cm[0][0]} | {baseline_cm[0][1]} | | **實際: 死亡 (1)** | {baseline_cm[1][0]} | {baseline_cm[1][1]} | - **True Negatives (TN)**: {baseline_cm[0][0]} - 正確預測為存活 - **False Positives (FP)**: {baseline_cm[0][1]} - 誤判為死亡 - **False Negatives (FN)**: {baseline_cm[1][0]} - 誤判為存活 - **True Positives (TP)**: {baseline_cm[1][1]} - 正確預測為死亡 """ # 第三格:微調後 Llama finetuned_output = f""" # 🟢 微調後 Llama ## {results['tuning_method']} ### 📈 評估指標 | 指標 | 數值 | |------|------| | **Accuracy** | {finetuned_results['eval_accuracy']:.4f} | | **Precision** | {finetuned_results['eval_precision']:.4f} | | **Recall** | {finetuned_results['eval_recall']:.4f} | | **F1 Score** | {finetuned_results['eval_f1']:.4f} | | **Sensitivity** | {finetuned_results['eval_sensitivity']:.4f} | | **Specificity** | {finetuned_results['eval_specificity']:.4f} | ### 📊 混淆矩陣 (Confusion Matrix) | | 預測: 存活 (0) | 預測: 死亡 (1) | |------|------|------| | **實際: 存活 (0)** | {finetuned_cm[0][0]} | {finetuned_cm[0][1]} | | **實際: 死亡 (1)** | {finetuned_cm[1][0]} | {finetuned_cm[1][1]} | - **True Negatives (TN)**: {finetuned_cm[0][0]} - 正確預測為存活 - **False Positives (FP)**: {finetuned_cm[0][1]} - 誤判為死亡 - **False Negatives (FN)**: {finetuned_cm[1][0]} - 誤判為存活 - **True Positives (TP)**: {finetuned_cm[1][1]} - 正確預測為死亡 """ return data_info, baseline_output, finetuned_output except Exception as e: import traceback error_msg = f"❌ 錯誤:{str(e)}\n\n詳細錯誤訊息:\n{traceback.format_exc()}" return error_msg, "", "" # ==================== 預測函數 ==================== def predict_text(model_choice, text_input): """ 預測功能 - 支持選擇已訓練的模型,並同時顯示未微調和微調的預測結果 """ if not text_input or text_input.strip() == "": return "請輸入文本", "請輸入文本" try: # 從選擇中解析模型名稱 if model_choice == "請先訓練模型": # 只顯示未微調的預測 pass else: # 解析選擇的模型路徑和名稱 model_path = model_choice.split(" | ")[0].replace("路徑: ", "") # 從 JSON 讀取模型資訊 with open('./saved_llama_models_list.json', 'r') as f: models_list = json.load(f) selected_model_info = None for model_info in models_list: if model_info['model_path'] == model_path: selected_model_info = model_info break if selected_model_info is None: return "找不到模型資訊", "找不到模型資訊" model_name = selected_model_info['model_name'] # ==================== 未微調的 Llama 預測 ==================== print("\n使用未微調 Llama 預測...") # 載入 tokenizer if model_choice != "請先訓練模型": baseline_tokenizer = AutoTokenizer.from_pretrained(model_name) else: baseline_tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.2-1B") model_name = "meta-llama/Llama-3.2-1B" if baseline_tokenizer.pad_token is None: baseline_tokenizer.pad_token = baseline_tokenizer.eos_token baseline_tokenizer.pad_token_id = baseline_tokenizer.eos_token_id baseline_model = AutoModelForSequenceClassification.from_pretrained( model_name, num_labels=2, torch_dtype=torch.float16 if device == "cuda" else torch.float32, device_map="auto" if device == "cuda" else None ) baseline_model.config.pad_token_id = baseline_tokenizer.pad_token_id baseline_model.eval() # Tokenize 輸入(未微調) baseline_inputs = baseline_tokenizer( text_input, return_tensors="pt", truncation=True, max_length=MAX_LENGTH ) if device == "cuda": baseline_inputs = {k: v.to(baseline_model.device) for k, v in baseline_inputs.items()} # 預測(未微調) with torch.no_grad(): baseline_outputs = baseline_model(**baseline_inputs) baseline_probs = torch.nn.functional.softmax(baseline_outputs.logits, dim=-1) baseline_pred_class = torch.argmax(baseline_probs, dim=-1).item() baseline_confidence = baseline_probs[0][baseline_pred_class].item() baseline_result = "存活" if baseline_pred_class == 0 else "死亡" baseline_prob_class0 = baseline_probs[0][0].item() baseline_prob_class1 = baseline_probs[0][1].item() baseline_output = f""" # 🔵 未微調 Llama 預測結果 ## 預測類別: **{baseline_result}** ## 信心度: **{baseline_confidence:.1%}** ## 機率分布: - **存活機率**: {baseline_prob_class0:.2%} - **死亡機率**: {baseline_prob_class1:.2%} --- **說明**: 此為原始 Llama 模型,未經任何領域資料訓練 """ # 清空記憶體 del baseline_model del baseline_tokenizer torch.cuda.empty_cache() # ==================== 微調後的 Llama 預測 ==================== if model_choice == "請先訓練模型": finetuned_output = """ # 🟢 微調 Llama 預測結果 ❌ 尚未訓練任何模型,請先在「模型訓練」頁面訓練模型 """ return baseline_output, finetuned_output print(f"\n使用微調模型: {model_path}") # 載入 tokenizer finetuned_tokenizer = AutoTokenizer.from_pretrained(model_path) if finetuned_tokenizer.pad_token is None: finetuned_tokenizer.pad_token = finetuned_tokenizer.eos_token finetuned_tokenizer.pad_token_id = finetuned_tokenizer.eos_token_id # 載入 PEFT 模型(根據微調方法) base_model = AutoModelForSequenceClassification.from_pretrained( model_name, num_labels=2, torch_dtype=torch.float16 if device == "cuda" else torch.float32, device_map="auto" if device == "cuda" else None ) # 根據微調方法載入模型 tuning_method = selected_model_info.get('tuning_method', 'LoRA') if tuning_method == "BitFit": # BitFit 直接載入完整模型 finetuned_model = AutoModelForSequenceClassification.from_pretrained( model_path, num_labels=2, torch_dtype=torch.float16 if device == "cuda" else torch.float32, device_map="auto" if device == "cuda" else None ) else: # 其他方法使用 PEFT finetuned_model = PeftModel.from_pretrained(base_model, model_path) # Prefix Tuning 需要禁用緩存 if tuning_method == "Prefix Tuning": finetuned_model.config.use_cache = False finetuned_model.config.pad_token_id = finetuned_tokenizer.pad_token_id finetuned_model.eval() # Tokenize 輸入(微調) finetuned_inputs = finetuned_tokenizer( text_input, return_tensors="pt", truncation=True, max_length=MAX_LENGTH ) if device == "cuda": finetuned_inputs = {k: v.to(finetuned_model.device) for k, v in finetuned_inputs.items()} # 預測(微調) with torch.no_grad(): finetuned_outputs = finetuned_model(**finetuned_inputs) finetuned_probs = torch.nn.functional.softmax(finetuned_outputs.logits, dim=-1) finetuned_pred_class = torch.argmax(finetuned_probs, dim=-1).item() finetuned_confidence = finetuned_probs[0][finetuned_pred_class].item() finetuned_result = "存活" if finetuned_pred_class == 0 else "死亡" finetuned_prob_class0 = finetuned_probs[0][0].item() finetuned_prob_class1 = finetuned_probs[0][1].item() finetuned_output = f""" # 🟢 微調 Llama 預測結果 ## 預測類別: **{finetuned_result}** ## 信心度: **{finetuned_confidence:.1%}** ## 機率分布: - **存活機率**: {finetuned_prob_class0:.2%} - **死亡機率**: {finetuned_prob_class1:.2%} --- ### 模型資訊: - **模型名稱**: {selected_model_info['model_name']} - **微調方法**: {selected_model_info['tuning_method']} - **最佳化指標**: {selected_model_info['best_metric']} - **訓練時間**: {selected_model_info['timestamp']} - **模型路徑**: {model_path} --- **注意**: 此預測僅供參考。 """ # 清空記憶體 del finetuned_model del finetuned_tokenizer torch.cuda.empty_cache() return baseline_output, finetuned_output except Exception as e: import traceback error_msg = f"❌ 預測錯誤:{str(e)}\n\n詳細錯誤訊息:\n{traceback.format_exc()}" return error_msg, error_msg def get_available_models(): """ 取得所有已訓練的模型列表 """ models_list_file = './saved_llama_models_list.json' if not os.path.exists(models_list_file): return ["請先訓練模型"] with open(models_list_file, 'r') as f: models_list = json.load(f) if len(models_list) == 0: return ["請先訓練模型"] # 格式化模型選項 model_choices = [] for i, model_info in enumerate(models_list, 1): choice = f"路徑: {model_info['model_path']} | 模型: {model_info['model_name']} | 時間: {model_info['timestamp']}" model_choices.append(choice) return model_choices # ==================== Gradio 介面 ==================== with gr.Blocks(title="Llama NBCD 訓練與預測平台", theme=gr.themes.Soft()) as demo: gr.Markdown(""" # 🦙 Llama乳癌存活預測大型微調應用(Fine-tuning) ### 🌟 功能特色: - 🎯 使用多種 PEFT 方法進行參數高效微調 (LoRA, **AdaLoRA**, Adapter, BitFit, Prompt Tuning) - 📊 自動比較有/無微調的表現差異 - 🎨 可選擇最佳化指標(F1、Accuracy、Precision、Recall) - 🔮 訓練後可直接預測新樣本 - 💾 自動儲存最佳模型 - 🧹 自動記憶體管理 """) with gr.Tab("🎯 模型訓練"): with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 📤 資料上傳") file_input = gr.File( label="上傳 CSV 檔案", file_types=[".csv"] ) gr.Markdown("### 🤖 模型選擇") model_name_input = gr.Textbox( value="meta-llama/Llama-3.2-1B", label="Hugging Face 模型名稱", info="例如: meta-llama/Llama-3.2-1B" ) gr.Markdown("### 🔧 微調方法選擇") tuning_method = gr.Radio( choices=["LoRA", "AdaLoRA", "Adapter", "BitFit", "Prompt Tuning"], value="LoRA", label="選擇微調方法", info="不同的參數效率微調方法" ) gr.Markdown("### 🎯 最佳模型選擇") best_metric = gr.Dropdown( choices=["f1", "accuracy", "precision", "recall", "sensitivity", "specificity"], value="recall", label="選擇最佳化指標", info="模型會根據此指標選擇最佳檢查點" ) gr.Markdown("### ⚙️ 資料平衡參數") target_samples_input = gr.Number( value=700, label="目標樣本數(每類別)" ) use_weights_checkbox = gr.Checkbox( value=True, label="使用類別權重", info="在損失函數中使用類別權重" ) gr.Markdown("### ⚙️ 訓練參數") epochs_input = gr.Number( value=3, label="訓練輪數 (Epochs)" ) batch_size_input = gr.Number( value=4, label="批次大小 (Batch Size)" ) lr_input = gr.Number( value=1e-4, label="學習率 (Learning Rate)" ) gr.Markdown("---") # ==================== LoRA 參數 ==================== with gr.Column(visible=True) as lora_params: gr.Markdown("### 🔷 LoRA 參數") lora_r_input = gr.Slider( minimum=4, maximum=64, value=16, step=4, label="LoRA Rank (r)", info="低秩分解的秩" ) lora_alpha_input = gr.Slider( minimum=8, maximum=128, value=32, step=8, label="LoRA Alpha", info="LoRA 縮放參數" ) lora_dropout_input = gr.Slider( minimum=0.0, maximum=0.5, value=0.1, step=0.05, label="LoRA Dropout", info="Dropout 率" ) lora_target_input = gr.Dropdown( choices=["query,value", "query,key,value", "all"], value="query,value", label="目標模組", info="用逗號分隔" ) # ==================== AdaLoRA 參數 ==================== with gr.Column(visible=False) as adalora_params: gr.Markdown("### 🔶 AdaLoRA 參數") adalora_init_r_input = gr.Slider( minimum=4, maximum=64, value=12, step=4, label="初始 Rank", info="訓練開始時的秩" ) adalora_target_r_input = gr.Slider( minimum=4, maximum=64, value=8, step=4, label="目標 Rank", info="訓練結束時的目標秩" ) adalora_alpha_input = gr.Slider( minimum=8, maximum=128, value=32, step=8, label="LoRA Alpha", info="縮放參數" ) adalora_tinit_input = gr.Number( value=0, label="Tinit", info="開始剪枝的步數" ) adalora_tfinal_input = gr.Number( value=0, label="Tfinal", info="結束剪枝的步數" ) adalora_delta_t_input = gr.Number( value=1, label="Delta T", info="剪枝頻率" ) # ==================== Adapter 參數 ==================== with gr.Column(visible=False) as adapter_params: gr.Markdown("### 🔶 Adapter 參數") adapter_reduction_input = gr.Slider( minimum=2, maximum=64, value=16, step=2, label="Reduction Factor", info="降維因子,越大參數越少" ) with gr.Column(visible=False) as prompt_tuning_params: gr.Markdown("### 🔷 Prompt Tuning 參數") prompt_tokens_input = gr.Slider( minimum=1, maximum=100, value=20, step=1, label="Virtual Tokens 數量" ) with gr.Column(visible=False) as prefix_tuning_params: gr.Markdown("### 🔶 Prefix Tuning 參數") gr.Markdown("⚠️ **注意**: 目前版本可能有兼容性問題,建議使用 Prompt Tuning") prefix_tokens_input = gr.Slider( minimum=1, maximum=100, value=30, step=1, label="Virtual Tokens 數量" ) train_button = gr.Button( "🚀 開始訓練", variant="primary", size="lg" ) with gr.Column(scale=2): gr.Markdown("### 📊 訓練結果與比較") # 第一格:資料資訊 data_info_output = gr.Markdown( value="### 等待訓練...\n\n訓練完成後會顯示資料資訊和訓練配置", label="資料資訊" ) # 第二和第三格:並排顯示 with gr.Row(): # 第二格:未微調 Llama baseline_output = gr.Markdown( value="### 未微調 Llama\n等待訓練完成...", label="未微調 Llama" ) # 第三格:微調後 Llama finetuned_output = gr.Markdown( value="### 微調後 Llama\n等待訓練完成...", label="微調後 Llama" ) with gr.Tab("🔮 模型預測"): gr.Markdown(""" ### 使用訓練好的模型進行預測 選擇已訓練的模型,輸入文本進行預測。會同時顯示未微調和微調模型的預測結果以供比較。 """) with gr.Row(): with gr.Column(): # 模型選擇下拉選單 model_dropdown = gr.Dropdown( label="選擇模型", choices=["請先訓練模型"], value="請先訓練模型", info="選擇要使用的已訓練模型" ) refresh_button = gr.Button( "🔄 重新整理模型列表", size="sm" ) text_input = gr.Textbox( label="輸入文本", placeholder="請輸入要預測的文本...", lines=10 ) predict_button = gr.Button( "🔮 開始預測", variant="primary", size="lg" ) with gr.Column(): gr.Markdown("### 預測結果比較") # 上框:未微調 Llama 預測結果 baseline_prediction_output = gr.Markdown( label="未微調 Llama", value="等待預測..." ) # 下框:微調 Llama 預測結果 finetuned_prediction_output = gr.Markdown( label="微調 Llama", value="等待預測..." ) with gr.Tab("📖 使用說明"): gr.Markdown(""" ## 🔧 微調方法說明 ### 五種參數高效微調方法 (已測試可用) | 方法 | 參數量 | 記憶體 | 訓練速度 | 適用場景 | |------|--------|--------|----------|----------| | **LoRA** | 很少 (~1%) | 低 | 快 | 通用,效果好 | | **AdaLoRA** | 很少 (~1%) | 低 | 快 | 自適應,效果更優 | | **Adapter** | 少 (~2-5%) | 低 | 中 | 多任務學習 | | **BitFit** | 極少 (~0.1%) | 極低 | 極快 | 快速微調 | | **Prompt Tuning** | 極少 (可調) | 極低 | 快 | 小數據集 | ### 方法詳解 #### 🔷 LoRA (Low-Rank Adaptation) - **原理**: 在原模型旁加入低秩矩陣 - **優點**: 效果接近全參數微調,參數量極少 - **參數**: rank (r) 和 alpha 控制適配器大小 - **推薦**: 最平衡的選擇,適合大多數任務 #### 🔷 AdaLoRA (Adaptive LoRA) ✅ 已修復 - **原理**: 基於 LoRA,但動態調整每個模組的秩 - **優點**: 比固定秩的 LoRA 效果更好,自動優化參數分配 - **參數**: 與 LoRA 相同,但會自動調整秩的分配 - **推薦**: 追求最佳效果,且願意稍微增加訓練時間 #### 🔶 Adapter (Bottleneck Adapters) - **原理**: 在 Transformer 層之間插入小型神經網路 - **優點**: 適合多任務學習,可以為不同任務訓練不同 adapter - **參數**: reduction factor 控制瓶頸層大小 - **推薦**: 需要處理多個相關任務時使用 #### 🔷 BitFit ✅ 已修復 - **原理**: 僅訓練模型中的 bias 參數 - **優點**: 參數量最少,訓練最快 - **缺點**: 效果可能略遜於其他方法 - **推薦**: 資源極度受限或需要快速實驗時使用 #### 🔶 Prompt Tuning (建議用來替代 Prefix Tuning) - **原理**: 在輸入前加入可學習的 soft prompts - **優點**: 參數量極少,不修改原模型 - **參數**: virtual tokens 數量 - **推薦**: 小數據集或想保持原模型完整時使用 ### 📊 指標說明 - **F1 Score**: 精確率和召回率的調和平均,平衡指標 - **Accuracy**: 整體準確率 - **Precision**: 預測為正類中的準確率 - **Recall**: 實際正類中被正確識別的比例 - **Sensitivity**: 敏感度,等同於 Recall - **Specificity**: 特異性,正確識別負類的能力 ### 📊 混淆矩陣說明 混淆矩陣顯示模型預測結果與實際結果的對照: - **True Negatives (TN)**: 實際為存活,預測也為存活 ✅ - **False Positives (FP)**: 實際為存活,但預測為死亡 ❌ - **False Negatives (FN)**: 實際為死亡,但預測為存活 ❌ - **True Positives (TP)**: 實際為死亡,預測也為死亡 ✅ """) # 根據選擇的微調方法顯示/隱藏相應參數 def update_params_visibility(method): if method == "LoRA": return ( gr.update(visible=True), # lora_params gr.update(visible=False), # adalora_params gr.update(visible=False), # adapter_params gr.update(visible=False), # prompt_tuning_params gr.update(visible=False) # prefix_tuning_params ) elif method == "AdaLoRA": return ( gr.update(visible=False), # lora_params gr.update(visible=True), # adalora_params gr.update(visible=False), # adapter_params gr.update(visible=False), # prompt_tuning_params gr.update(visible=False) # prefix_tuning_params ) elif method == "Adapter": return ( gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), gr.update(visible=False), gr.update(visible=False) ) elif method == "Prompt Tuning": return ( gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), gr.update(visible=False) ) elif method == "Prefix Tuning": return ( gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=True) ) else: # BitFit return ( gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False) ) tuning_method.change( fn=update_params_visibility, inputs=[tuning_method], outputs=[lora_params, adalora_params, adapter_params, prompt_tuning_params, prefix_tuning_params] ) # 設定訓練按鈕動作 train_button.click( fn=train_wrapper, inputs=[ file_input, model_name_input, target_samples_input, use_weights_checkbox, epochs_input, batch_size_input, lr_input, tuning_method, lora_r_input, lora_alpha_input, lora_dropout_input, lora_target_input, adalora_init_r_input, adalora_target_r_input, adalora_alpha_input, adalora_tinit_input, adalora_tfinal_input, adalora_delta_t_input, adapter_reduction_input, prompt_tokens_input, prefix_tokens_input, best_metric ], outputs=[data_info_output, baseline_output, finetuned_output] ) # 重新整理模型列表按鈕 def refresh_models(): return gr.update(choices=get_available_models(), value=get_available_models()[0]) refresh_button.click( fn=refresh_models, inputs=[], outputs=[model_dropdown] ) # 預測按鈕動作 predict_button.click( fn=predict_text, inputs=[model_dropdown, text_input], outputs=[baseline_prediction_output, finetuned_prediction_output] ) if __name__ == "__main__": demo.launch()