Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import pandas as pd | |
| import torch | |
| from transformers import ( | |
| BertTokenizer, BertForSequenceClassification, | |
| AutoTokenizer, AutoModelForSequenceClassification, | |
| TrainingArguments, Trainer, DataCollatorWithPadding | |
| ) | |
| from peft import ( | |
| LoraConfig, AdaLoraConfig, AdaptionPromptConfig, PrefixTuningConfig, | |
| get_peft_model, TaskType, prepare_model_for_kbit_training | |
| ) | |
| from datasets import Dataset, DatasetDict | |
| from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.utils import resample | |
| from torch import nn | |
| import os | |
| from datetime import datetime | |
| import gc | |
| import numpy as np | |
# Environment tweaks: silence tokenizers fork warnings and reduce CUDA
# allocator fragmentation for repeated train/free cycles.
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "max_split_size_mb:512"
torch.backends.cudnn.benchmark = False
if torch.cuda.is_available():
    torch.cuda.empty_cache()
# Global state shared across the Gradio callbacks
trained_models = {}  # model_id -> {'model', 'tokenizer', 'results', 'baseline', 'config', ...}
model_counter = 0  # count of first-stage models trained in this session
baseline_results = {}  # "<base_model>_baseline" -> baseline metric dict
baseline_model_cache = {}  # model_name -> loaded un-finetuned model (reused by predict/eval)
baseline_performance_cache = {}  # "<base_model>_baseline" -> cached baseline metric dict
second_stage_models = {}  # stores second-stage (stage-2) fine-tuned models
def calculate_improvement(baseline_val, finetuned_val):
    """Return the relative improvement of finetuned_val over baseline_val, in percent.

    A zero baseline cannot be used as a divisor: return +inf when the
    fine-tuned value rose above it, and 0.0 when there was no change.
    """
    if baseline_val == 0:
        return float('inf') if finetuned_val > 0 else 0.0
    return (finetuned_val - baseline_val) / baseline_val * 100
def format_improve(val):
    """Render an improvement percentage as a signed string.

    +inf means the baseline metric was zero, so no ratio can be reported.
    """
    return "N/A (baseline=0)" if val == float('inf') else f"{val:+.1f}%"
def thorough_memory_cleanup():
    """Aggressively reclaim Python and CUDA memory between training runs."""
    gc.collect()
    if not torch.cuda.is_available():
        return
    torch.cuda.empty_cache()
    torch.cuda.ipc_collect()
    torch.cuda.synchronize()
def compute_metrics(pred):
    """Compute binary-classification metrics from a Trainer EvalPrediction.

    Returns accuracy/f1/precision/recall plus sensitivity, specificity and
    the four raw confusion-matrix counts. Any failure yields an all-zero
    metric dict so evaluation never crashes the training loop.
    """
    try:
        y_true = pred.label_ids
        y_pred = pred.predictions.argmax(-1)
        prec, rec, f1, _ = precision_recall_fscore_support(
            y_true, y_pred, average='binary', pos_label=1, zero_division=0
        )
        acc = accuracy_score(y_true, y_pred)
        cm = confusion_matrix(y_true, y_pred)
        # Only unpack when both classes appear; otherwise zero all counts.
        tn, fp, fn, tp = cm.ravel() if cm.shape == (2, 2) else (0, 0, 0, 0)
        sens = tp / (tp + fn) if (tp + fn) > 0 else 0
        spec = tn / (tn + fp) if (tn + fp) > 0 else 0
        return {
            'accuracy': acc, 'f1': f1, 'precision': prec, 'recall': rec,
            'sensitivity': sens, 'specificity': spec,
            'tp': int(tp), 'tn': int(tn), 'fp': int(fp), 'fn': int(fn)
        }
    except Exception as e:
        print(f"Error in compute_metrics: {e}")
        return {
            'accuracy': 0, 'f1': 0, 'precision': 0, 'recall': 0,
            'sensitivity': 0, 'specificity': 0, 'tp': 0, 'tn': 0, 'fp': 0, 'fn': 0
        }
def evaluate_baseline(model, tokenizer, test_dataset, device, is_llama=False):
    """Run inference over a pre-tokenised dataset and return the same
    metric dict schema as compute_metrics (no gradient, batch_size=16).

    For Llama the batch follows ``model.device`` (the model may be sharded
    via ``device_map``); otherwise the supplied ``device`` is used.
    """
    from torch.utils.data import DataLoader
    model.eval()
    collected_preds = []
    collected_labels = []

    def _collate(items):
        # Stack the pre-tokenised fields into batch tensors.
        return {
            'input_ids': torch.stack([torch.tensor(it['input_ids']) for it in items]),
            'attention_mask': torch.stack([torch.tensor(it['attention_mask']) for it in items]),
            'labels': torch.tensor([it['label'] for it in items])
        }

    loader = DataLoader(test_dataset, batch_size=16, collate_fn=_collate)
    with torch.no_grad():
        for batch in loader:
            gold = batch.pop('labels')
            dest = model.device if is_llama else device
            out = model(**{k: v.to(dest) for k, v in batch.items()})
            collected_preds.extend(torch.argmax(out.logits, dim=-1).cpu().numpy())
            collected_labels.extend(gold.numpy())
    prec, rec, f1, _ = precision_recall_fscore_support(
        collected_labels, collected_preds, average='binary', pos_label=1, zero_division=0
    )
    acc = accuracy_score(collected_labels, collected_preds)
    cm = confusion_matrix(collected_labels, collected_preds)
    tn, fp, fn, tp = cm.ravel() if cm.shape == (2, 2) else (0, 0, 0, 0)
    sens = tp / (tp + fn) if (tp + fn) > 0 else 0
    spec = tn / (tn + fp) if (tn + fp) > 0 else 0
    return {
        'accuracy': acc, 'f1': f1, 'precision': prec, 'recall': rec,
        'sensitivity': sens, 'specificity': spec,
        'tp': int(tp), 'tn': int(tn), 'fp': int(fp), 'fn': int(fn)
    }
class WeightedTrainer(Trainer):
    """Trainer variant supporting class-weighted cross-entropy and an
    optional focal-loss modulation for the 2-class setup."""

    def __init__(self, *args, class_weights=None, use_focal_loss=False, focal_gamma=2.0, **kwargs):
        super().__init__(*args, **kwargs)
        # Optional tensor of per-class weights (None = unweighted loss).
        self.class_weights = class_weights
        self.use_focal_loss = use_focal_loss
        self.focal_gamma = focal_gamma

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Weighted CE (optionally focal) over 2-class logits."""
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits
        weights = None
        if self.class_weights is not None:
            # Match the logits' dtype/device (e.g. fp16 on GPU for Llama).
            weights = self.class_weights.to(logits.dtype).to(logits.device)
        if self.use_focal_loss:
            per_sample_ce = nn.CrossEntropyLoss(reduction='none')(
                logits.view(-1, 2), labels.view(-1)
            )
            # Focal modulation (1 - p_t)^gamma down-weights easy examples.
            modulator = (1 - torch.exp(-per_sample_ce)) ** self.focal_gamma
            focal = modulator * per_sample_ce
            if weights is not None:
                focal = focal * weights[labels.view(-1)]
            loss = focal.mean()
        else:
            loss = nn.CrossEntropyLoss(weight=weights)(logits.view(-1, 2), labels.view(-1))
        return (loss, outputs) if return_outputs else loss
def train_model(csv_file, base_model, method, num_epochs, batch_size, learning_rate,
                weight_decay, dropout, lora_r, lora_alpha, lora_dropout,
                adalora_init_r, adalora_tinit, adalora_tfinal, adalora_deltaT,
                adapter_len, prefix_len, best_metric):
    """Train a PEFT fine-tuned classifier and compare it against the un-tuned baseline.

    Args:
        csv_file: uploaded file object exposing ``.name``; the CSV must hold a
            text column (``Text``/``text``) and a label column (``label``/``nbcd``).
        base_model: UI model choice ("BERT-base" or "Llama-3.2-1B").
        method: PEFT method key ("lora", "adalora", "adapter", "prefix",
            "prompt", "bitfit"); anything else falls back to full fine-tuning.
        num_epochs / batch_size / learning_rate / weight_decay / dropout:
            standard training hyper-parameters (UI values, coerced below).
        lora_r / lora_alpha / lora_dropout: LoRA hyper-parameters.
        adalora_init_r / adalora_tinit / adalora_tfinal / adalora_deltaT:
            AdaLoRA rank-allocation schedule parameters.
        adapter_len / prefix_len: adapter length / number of virtual tokens.
        best_metric: metric key used to select the best checkpoint.

    Returns:
        Tuple of four strings: (training log, baseline report, fine-tuned
        report, comparison report). On error the first element carries the
        traceback and the other three are empty.

    BUGFIX: the Llama info line used to interpolate an undefined
    ``weight_boost`` variable, raising NameError on every Llama run.
    """
    global trained_models, model_counter, baseline_results, baseline_performance_cache
    thorough_memory_cleanup()
    print(f"🧹 GPU 記憶體清理完成")
    # UI label -> Hugging Face hub model id.
    model_mapping = {
        "BERT-base": "bert-base-uncased",
        "Llama-3.2-1B": "meta-llama/Llama-3.2-1B",
    }
    model_name = model_mapping.get(base_model, "bert-base-uncased")
    is_llama = "llama" in model_name.lower()
    try:
        if csv_file is None:
            return "❌ 請上傳 CSV", "", "", ""
        df = pd.read_csv(csv_file.name)
        # Accept either capitalised or lower-case column names.
        text_col = 'Text' if 'Text' in df.columns else 'text'
        label_col = 'label' if 'label' in df.columns else 'nbcd'
        if text_col not in df.columns or label_col not in df.columns:
            return f"❌ 需要 {text_col} 和 {label_col} 欄位", "", "", ""
        df_clean = pd.DataFrame({
            'text': df[text_col].astype(str),
            'label': df[label_col].astype(int)
        }).dropna()
        avg_length = df_clean['text'].str.len().mean()
        min_length = df_clean['text'].str.len().min()
        max_length = df_clean['text'].str.len().max()
        n0_original = int(sum(df_clean['label'] == 0))
        n1_original = int(sum(df_clean['label'] == 1))
        if n1_original == 0:
            return "❌ 無死亡樣本", "", "", ""
        ratio_original = n0_original / n1_original
        info = f"📊 原始資料: {len(df_clean)} 筆\n"
        info += f"📏 文本長度: 平均 {avg_length:.0f} | 最小 {min_length} | 最大 {max_length}\n"
        info += f"📈 原始分布 - 存活: {n0_original} | 死亡: {n1_original} (比例 {ratio_original:.2f}:1)\n"
        # Class balancing is applied for every base model (the original code
        # had an always-true `if True:` left over from a Llama-only gate).
        info += f"\n⚖️ 資料平衡策略:執行平衡處理...\n"
        df_class_0 = df_clean[df_clean['label'] == 0]
        df_class_1 = df_clean[df_clean['label'] == 1]
        # BERT and Llama use different balancing targets.
        target_n = 500 if not is_llama else 700
        if len(df_class_0) > target_n:
            # Undersample the majority (survivor) class without replacement.
            df_class_0_balanced = resample(df_class_0, n_samples=target_n, random_state=42, replace=False)
            info += f" ✅ Class 0 欠採樣: {len(df_class_0)} → {len(df_class_0_balanced)} 筆\n"
        else:
            df_class_0_balanced = df_class_0
            info += f" ⚠️ Class 0 樣本數不足,保持 {len(df_class_0)} 筆\n"
        if len(df_class_1) < target_n:
            # Oversample the minority (death) class with replacement.
            df_class_1_balanced = resample(df_class_1, n_samples=target_n, random_state=42, replace=True)
            info += f" ✅ Class 1 過採樣: {len(df_class_1)} → {len(df_class_1_balanced)} 筆\n"
        else:
            df_class_1_balanced = df_class_1
            info += f" ⚠️ Class 1 樣本數充足,保持 {len(df_class_1)} 筆\n"
        df_clean = pd.concat([df_class_0_balanced, df_class_1_balanced])
        df_clean = df_clean.sample(frac=1, random_state=42).reset_index(drop=True)
        n0 = int(sum(df_clean['label'] == 0))
        n1 = int(sum(df_clean['label'] == 1))
        ratio = n0 / n1
        info += f"\n📊 平衡後資料: {len(df_clean)} 筆\n"
        info += f"📈 平衡後分布 - 存活: {n0} | 死亡: {n1} (比例 {ratio:.2f}:1)\n"
        # Data is balanced, so both classes get equal loss weight.
        w0 = 1.0
        w1 = 1.0
        info += f"🎯 類別權重: {w0:.4f} / {w1:.4f} (資料已平衡,使用相等權重)\n"
        info += f"🤖 模型: {base_model}\n"
        info += f"🔧 方法: {method.upper()}"
        if is_llama:
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            if tokenizer.pad_token is None:
                # Llama ships without a pad token; reuse EOS for padding.
                tokenizer.pad_token = tokenizer.eos_token
                tokenizer.pad_token_id = tokenizer.eos_token_id
            max_length = 512
        else:
            tokenizer = BertTokenizer.from_pretrained(model_name)
            max_length = 256
        dataset = Dataset.from_pandas(df_clean[['text', 'label']])
        def preprocess(ex):
            return tokenizer(ex['text'], truncation=True, padding='max_length', max_length=max_length)
        tokenized = dataset.map(preprocess, batched=True, remove_columns=['text'])
        split = tokenized.train_test_split(test_size=0.2, seed=42)
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        info += f"\n裝置: {'GPU ✅' if torch.cuda.is_available() else 'CPU ⚠️'}"
        # Evaluate the untouched baseline once per base model, then cache it.
        baseline_key = f"{base_model}_baseline"
        if baseline_key in baseline_performance_cache:
            info += f"\n✅ 使用快取的 Baseline 評估結果\n"
            baseline_perf = baseline_performance_cache[baseline_key]
        else:
            info += f"\n🔍 首次評估 Baseline 模型...\n"
            if is_llama:
                baseline_model = AutoModelForSequenceClassification.from_pretrained(
                    model_name, num_labels=2,
                    torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                    device_map="auto" if torch.cuda.is_available() else None
                )
                baseline_model.config.pad_token_id = tokenizer.pad_token_id
            else:
                baseline_model = BertForSequenceClassification.from_pretrained(model_name, num_labels=2)
                baseline_model = baseline_model.to(device)
            baseline_perf = evaluate_baseline(baseline_model, tokenizer, split['test'], device, is_llama=is_llama)
            baseline_performance_cache[baseline_key] = baseline_perf
            baseline_results[baseline_key] = baseline_perf
            del baseline_model
            thorough_memory_cleanup()
        info += f"\n\n🔧 套用 {method.upper()} 微調..."
        if is_llama:
            model = AutoModelForSequenceClassification.from_pretrained(
                model_name, num_labels=2,
                torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                device_map="auto" if torch.cuda.is_available() else None
            )
            model.config.pad_token_id = tokenizer.pad_token_id
        else:
            model = BertForSequenceClassification.from_pretrained(
                model_name, num_labels=2,
                hidden_dropout_prob=dropout,
                attention_probs_dropout_prob=dropout
            )
        peft_applied = False
        # Dispatch on the requested PEFT method.
        if method == "lora":
            if is_llama:
                config = LoraConfig(
                    task_type=TaskType.SEQ_CLS,
                    r=int(lora_r),
                    lora_alpha=int(lora_alpha),
                    lora_dropout=lora_dropout,
                    target_modules=["q_proj", "v_proj"],
                    bias="none"
                )
            else:
                config = LoraConfig(
                    task_type=TaskType.SEQ_CLS,
                    r=int(lora_r),
                    lora_alpha=int(lora_alpha),
                    lora_dropout=lora_dropout,
                    target_modules=["query", "value"],
                    bias="none"
                )
            model = get_peft_model(model, config)
            peft_applied = True
            info += f"\n✅ LoRA 已套用(r={int(lora_r)}, alpha={int(lora_alpha)})"
        elif method == "adalora":
            # Clamp the AdaLoRA rank-allocation schedule to the real step budget.
            steps_per_epoch = len(split['train']) // int(batch_size)
            total_steps = steps_per_epoch * int(num_epochs)
            adjusted_tinit = min(int(adalora_tinit), int(total_steps * 0.2))
            adjusted_tfinal = min(int(adalora_tfinal), int(total_steps * 0.9))
            if adjusted_tinit >= adjusted_tfinal:
                adjusted_tinit = int(total_steps * 0.1)
                adjusted_tfinal = int(total_steps * 0.8)
            info += f"\n📊 AdaLoRA 步數調整:\n"
            info += f" 總訓練步數: {total_steps}\n"
            info += f" tinit: {int(adalora_tinit)} → {adjusted_tinit}\n"
            info += f" tfinal: {int(adalora_tfinal)} → {adjusted_tfinal}\n"
            if is_llama:
                config = AdaLoraConfig(
                    task_type=TaskType.SEQ_CLS,
                    r=int(lora_r),
                    lora_alpha=int(lora_alpha),
                    lora_dropout=lora_dropout,
                    target_modules=["q_proj", "v_proj"],
                    init_r=int(adalora_init_r),
                    tinit=adjusted_tinit,
                    tfinal=adjusted_tfinal,
                    deltaT=int(adalora_deltaT)
                )
            else:
                config = AdaLoraConfig(
                    task_type=TaskType.SEQ_CLS,
                    r=int(lora_r),
                    lora_alpha=int(lora_alpha),
                    lora_dropout=lora_dropout,
                    target_modules=["query", "value"],
                    init_r=int(adalora_init_r),
                    tinit=adjusted_tinit,
                    tfinal=adjusted_tfinal,
                    deltaT=int(adalora_deltaT)
                )
            model = get_peft_model(model, config)
            peft_applied = True
            info += f"\n✅ AdaLoRA 已套用(r={int(lora_r)}, alpha={int(lora_alpha)}, init_r={int(adalora_init_r)})"
        elif method == "adapter":
            # LLaMA-Adapter-style adaption prompt (only defined for Llama).
            if is_llama:
                config = AdaptionPromptConfig(
                    task_type=TaskType.SEQ_CLS,
                    adapter_len=int(adapter_len),
                    adapter_layers=30  # assumes the Llama layer count — TODO confirm for the 1B model
                )
                model = get_peft_model(model, config)
                peft_applied = True
                info += f"\n✅ Adapter 已套用(length={int(adapter_len)})"
            else:
                # BERT has no adaption-prompt support; fall back to LoRA.
                info += f"\n⚠️ Adapter 僅支援 Llama,改用 LoRA"
                config = LoraConfig(
                    task_type=TaskType.SEQ_CLS,
                    r=int(lora_r),
                    lora_alpha=int(lora_alpha),
                    lora_dropout=lora_dropout,
                    target_modules=["query", "value"],
                    bias="none"
                )
                model = get_peft_model(model, config)
                peft_applied = True
        elif method == "prefix":
            config = PrefixTuningConfig(
                task_type=TaskType.SEQ_CLS,
                num_virtual_tokens=int(prefix_len),
                prefix_projection=True  # MLP reparameterisation of the prefix
            )
            model = get_peft_model(model, config)
            peft_applied = True
            info += f"\n✅ Prefix Tuning 已套用(tokens={int(prefix_len)})"
        elif method == "prompt":
            # "Prompt tuning" approximated via prefix tuning with no projection.
            config = PrefixTuningConfig(
                task_type=TaskType.SEQ_CLS,
                num_virtual_tokens=int(prefix_len),
                prefix_projection=False
            )
            model = get_peft_model(model, config)
            peft_applied = True
            info += f"\n✅ Prompt Tuning 已套用(tokens={int(prefix_len)})"
        elif method == "bitfit":
            # BitFit: freeze everything except bias parameters.
            for name, param in model.named_parameters():
                if 'bias' not in name:
                    param.requires_grad = False
            peft_applied = True
            info += f"\n✅ BitFit 已套用(僅訓練 bias 參數)"
        if not peft_applied:
            info += f"\n⚠️ 警告:{method} 方法未被識別,使用 Full Fine-tuning"
        if not is_llama:
            # Llama placement is handled by device_map="auto" above.
            model = model.to(device)
        total = sum(p.numel() for p in model.parameters())
        trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
        info += f"\n\n💾 參數量\n總參數: {total:,}\n可訓練: {trainable:,}\n比例: {trainable/total*100:.2f}%"
        if is_llama:
            weight_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
            weights = torch.tensor([w0, w1], dtype=weight_dtype).to(model.device)
        else:
            weights = torch.tensor([w0, w1], dtype=torch.float32).to(device)
        info += f"\n⚖️ 權重 dtype: {weights.dtype} | device: {weights.device}\n"
        # Only 'loss' is minimised; every other supported metric is maximised.
        metrics_lower_is_better = ['loss']
        is_greater_better = best_metric not in metrics_lower_is_better
        args = TrainingArguments(
            output_dir='./results',
            num_train_epochs=int(num_epochs),
            per_device_train_batch_size=int(batch_size),
            per_device_eval_batch_size=int(batch_size)*2,
            learning_rate=float(learning_rate),
            weight_decay=float(weight_decay),
            evaluation_strategy="epoch",
            save_strategy="epoch",
            load_best_model_at_end=True,
            metric_for_best_model=best_metric,
            greater_is_better=is_greater_better,
            report_to="none",
            logging_steps=10,
            warmup_steps=100 if is_llama else 50,
            warmup_ratio=0.1 if is_llama else 0.0,
            logging_first_step=True,
            bf16=(torch.cuda.is_available() and is_llama),
            gradient_accumulation_steps=4 if is_llama else 1,
            gradient_checkpointing=True if is_llama else False,
            optim="adamw_torch",
            seed=42,
            max_grad_norm=0.3 if is_llama else 1.0,
        )
        info += f"\n📊 最佳模型選擇: {best_metric} ({'越大越好' if is_greater_better else '越小越好'})\n"
        focal_gamma = 2.0
        trainer = WeightedTrainer(
            model=model,
            args=args,
            train_dataset=split['train'],
            eval_dataset=split['test'],
            compute_metrics=compute_metrics,
            class_weights=weights,
            use_focal_loss=True,
            focal_gamma=focal_gamma
        )
        if is_llama:
            # BUGFIX: the original message interpolated an undefined
            # `weight_boost` name, crashing with NameError for Llama runs.
            info += f"\n⚡ Llama 使用 Focal Loss (gamma={focal_gamma})"
        info += "\n\n⏳ 開始訓練..."
        info += f"\n📊 訓練前檢查:"
        info += f"\n - 訓練樣本: {len(split['train'])}"
        info += f"\n - 測試樣本: {len(split['test'])}"
        info += f"\n - 批次數/epoch: {len(split['train']) // int(batch_size)}"
        train_result = trainer.train()
        info += f"\n\n✅ 訓練完成!"
        info += f"\n📉 最終 Training Loss: {train_result.training_loss:.4f}"
        results = trainer.evaluate()
        model_counter += 1
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_id = f"{base_model}_{method}_{timestamp}"
        trained_models[model_id] = {
            'model': model,
            'tokenizer': tokenizer,
            'results': results,
            'baseline': baseline_perf,
            'config': {
                'type': base_model,
                'model_name': model_name,
                'method': method,
                'metric': best_metric,
                'is_llama': is_llama
            },
            'timestamp': timestamp,
            'stage': 1  # mark as a first-stage fine-tune
        }
        metric_name_map = {
            'f1': 'F1',
            'accuracy': 'Accuracy',
            'precision': 'Precision',
            'recall': 'Recall',
            'sensitivity': 'Sensitivity',
            'specificity': 'Specificity'
        }
        baseline_val = baseline_perf[best_metric]
        finetuned_val = results[f'eval_{best_metric}']
        improvement = calculate_improvement(baseline_val, finetuned_val)
        baseline_output = f"🔬 純 {base_model}(未微調)\n\n"
        baseline_output += f"📊 {metric_name_map[best_metric]} 表現\n"
        baseline_output += f"{metric_name_map[best_metric]}: {baseline_val:.4f}\n\n"
        baseline_output += f"混淆矩陣\n"
        baseline_output += f"TP: {baseline_perf['tp']} | TN: {baseline_perf['tn']}\n"
        baseline_output += f"FP: {baseline_perf['fp']} | FN: {baseline_perf['fn']}"
        finetuned_output = f"✅ 微調 {base_model}\n"
        finetuned_output += f"模型: {model_id}\n\n"
        finetuned_output += f"📊 {metric_name_map[best_metric]} 表現\n"
        finetuned_output += f"{metric_name_map[best_metric]}: {finetuned_val:.4f}\n\n"
        finetuned_output += f"混淆矩陣\n"
        finetuned_output += f"TP: {results['eval_tp']} | TN: {results['eval_tn']}\n"
        finetuned_output += f"FP: {results['eval_fp']} | FN: {results['eval_fn']}"
        comparison_output = f"📊 純 {base_model} vs 微調 {base_model} 比較\n\n"
        comparison_output += f"🎯 選擇的評估指標: {metric_name_map[best_metric]}\n\n"
        comparison_output += f"{metric_name_map[best_metric]} 改善:\n"
        comparison_output += f"{baseline_val:.4f} → {finetuned_val:.4f} ({format_improve(improvement)})\n\n"
        comparison_output += f"混淆矩陣變化:\n"
        comparison_output += f"TP: {baseline_perf['tp']} → {results['eval_tp']} ({results['eval_tp'] - baseline_perf['tp']:+d})\n"
        comparison_output += f"TN: {baseline_perf['tn']} → {results['eval_tn']} ({results['eval_tn'] - baseline_perf['tn']:+d})\n"
        comparison_output += f"FP: {baseline_perf['fp']} → {results['eval_fp']} ({results['eval_fp'] - baseline_perf['fp']:+d})\n"
        comparison_output += f"FN: {baseline_perf['fn']} → {results['eval_fn']} ({results['eval_fn'] - baseline_perf['fn']:+d})"
        info += "\n\n✅ 訓練完成!"
        thorough_memory_cleanup()
        return info, baseline_output, finetuned_output, comparison_output
    except Exception as e:
        thorough_memory_cleanup()
        import traceback
        error_msg = f"❌ 錯誤: {str(e)}\n\n{traceback.format_exc()}"
        return error_msg, "", "", ""
def second_stage_train(first_model_id, csv_file, num_epochs, batch_size, learning_rate, best_metric):
    """Second-stage fine-tune: continue training an already fine-tuned model
    on a new CSV, then report stage-1 vs stage-2 vs baseline metrics.

    Args:
        first_model_id: key into the global ``trained_models`` registry.
        csv_file: uploaded file object exposing ``.name``; the CSV must hold a
            text column (``Text``/``text``) and a label column (``label``/``nbcd``).
        num_epochs / batch_size / learning_rate: stage-2 hyper-parameters
            (the learning rate is halved below).
        best_metric: metric key used to pick the best checkpoint.

    Returns:
        Tuple of three strings: (log, stage-1 report, stage-2 report);
        on failure the first element carries the traceback.
    """
    global trained_models, second_stage_models
    if not first_model_id or first_model_id not in trained_models:
        return "❌ 請選擇第一階段模型", "", ""
    if csv_file is None:
        return "❌ 請上傳新的訓練資料", "", ""
    try:
        thorough_memory_cleanup()
        # Reuse the stage-1 model object in place (training mutates it).
        first_model_info = trained_models[first_model_id]
        model = first_model_info['model']
        tokenizer = first_model_info['tokenizer']
        config = first_model_info['config']
        is_llama = config['is_llama']
        info = f"🔄 二次微調\n"
        info += f"基於模型: {first_model_id}\n"
        info += f"方法: {config['method'].upper()}\n\n"
        # Load the new training data.
        df = pd.read_csv(csv_file.name)
        text_col = 'Text' if 'Text' in df.columns else 'text'
        label_col = 'label' if 'label' in df.columns else 'nbcd'
        df_clean = pd.DataFrame({
            'text': df[text_col].astype(str),
            'label': df[label_col].astype(int)
        }).dropna()
        n0 = int(sum(df_clean['label'] == 0))
        n1 = int(sum(df_clean['label'] == 1))
        info += f"📊 新資料: {len(df_clean)} 筆\n"
        info += f"📈 分布 - 存活: {n0} | 死亡: {n1}\n\n"
        # Tokenise and split the dataset (same lengths as stage 1).
        max_length = 512 if is_llama else 256
        dataset = Dataset.from_pandas(df_clean[['text', 'label']])
        def preprocess(ex):
            return tokenizer(ex['text'], truncation=True, padding='max_length', max_length=max_length)
        tokenized = dataset.map(preprocess, batched=True, remove_columns=['text'])
        split = tokenized.train_test_split(test_size=0.2, seed=42)
        # Class weights. NOTE(review): unlike stage 1 this path does not
        # rebalance the data, so the minority class is up-weighted instead;
        # n1 == 0 would raise ZeroDivisionError here, caught by the except.
        if is_llama:
            w0 = 1.0
            w1 = (n0 / n1) * 1.5
            weight_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
            weights = torch.tensor([w0, w1], dtype=weight_dtype).to(model.device)
        else:
            w0 = 1.0
            w1 = min((n0 / n1) * 0.8, 15.0)
            device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
            weights = torch.tensor([w0, w1], dtype=torch.float32).to(device)
        info += f"🎯 類別權重: {w0:.4f} / {w1:.4f}\n"
        # Training configuration.
        args = TrainingArguments(
            output_dir='./results_stage2',
            num_train_epochs=int(num_epochs),
            per_device_train_batch_size=int(batch_size),
            per_device_eval_batch_size=int(batch_size)*2,
            learning_rate=float(learning_rate) * 0.5,  # halved LR for the second stage
            weight_decay=0.01,
            evaluation_strategy="epoch",
            save_strategy="epoch",
            load_best_model_at_end=True,
            metric_for_best_model=best_metric,
            # NOTE(review): hard-coded True assumes best_metric is
            # higher-is-better; train_model derives this dynamically.
            greater_is_better=True,
            report_to="none",
            logging_steps=10,
            seed=43  # a different seed than stage 1
        )
        info += f"\n⏳ 開始二次微調(學習率減半)...\n"
        trainer = WeightedTrainer(
            model=model,
            args=args,
            train_dataset=split['train'],
            eval_dataset=split['test'],
            compute_metrics=compute_metrics,
            class_weights=weights,
            use_focal_loss=is_llama
        )
        train_result = trainer.train()
        results = trainer.evaluate()
        info += f"\n✅ 二次微調完成!\n"
        info += f"📉 Training Loss: {train_result.training_loss:.4f}\n"
        # Register the stage-2 model.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        model_id = f"{first_model_id}_stage2_{timestamp}"
        second_stage_models[model_id] = {
            'model': model,
            'tokenizer': tokenizer,
            'results': results,
            'first_stage_id': first_model_id,
            'first_stage_results': first_model_info['results'],
            'baseline': first_model_info['baseline'],
            'config': config,
            'timestamp': timestamp,
            'stage': 2
        }
        # Also expose it through trained_models so prediction can use it.
        trained_models[model_id] = second_stage_models[model_id]
        metric_name_map = {
            'f1': 'F1', 'accuracy': 'Accuracy', 'precision': 'Precision',
            'recall': 'Recall', 'sensitivity': 'Sensitivity', 'specificity': 'Specificity'
        }
        # Compare baseline vs stage-1 vs stage-2 on the chosen metric.
        baseline_val = first_model_info['baseline'][best_metric]
        stage1_val = first_model_info['results'][f'eval_{best_metric}']
        stage2_val = results[f'eval_{best_metric}']
        stage1_improve = calculate_improvement(baseline_val, stage1_val)
        stage2_improve = calculate_improvement(stage1_val, stage2_val)
        total_improve = calculate_improvement(baseline_val, stage2_val)
        stage1_output = f"🥇 第一階段微調結果\n\n"
        stage1_output += f"模型: {first_model_id}\n"
        stage1_output += f"{metric_name_map[best_metric]}: {stage1_val:.4f}\n"
        stage1_output += f"較 Baseline 改善: {format_improve(stage1_improve)}\n\n"
        stage1_output += f"混淆矩陣\n"
        stage1_output += f"TP: {first_model_info['results']['eval_tp']} | TN: {first_model_info['results']['eval_tn']}\n"
        stage1_output += f"FP: {first_model_info['results']['eval_fp']} | FN: {first_model_info['results']['eval_fn']}"
        stage2_output = f"🥈 第二階段微調結果\n\n"
        stage2_output += f"模型: {model_id}\n"
        stage2_output += f"{metric_name_map[best_metric]}: {stage2_val:.4f}\n"
        stage2_output += f"較第一階段改善: {format_improve(stage2_improve)}\n"
        stage2_output += f"較 Baseline 總改善: {format_improve(total_improve)}\n\n"
        stage2_output += f"混淆矩陣\n"
        stage2_output += f"TP: {results['eval_tp']} | TN: {results['eval_tn']}\n"
        stage2_output += f"FP: {results['eval_fp']} | FN: {results['eval_fn']}"
        thorough_memory_cleanup()
        return info, stage1_output, stage2_output
    except Exception as e:
        thorough_memory_cleanup()
        import traceback
        return f"❌ 錯誤: {str(e)}\n\n{traceback.format_exc()}", "", ""
def evaluate_on_new_data(csv_file, selected_models):
    """Evaluate several stored models (plus the matching un-tuned baseline)
    on a brand-new CSV and render a markdown comparison report.

    Args:
        csv_file: uploaded file object exposing ``.name``; the CSV must hold a
            text column (``Text``/``text``) and a label column (``label``/``nbcd``).
        selected_models: list of model ids previously stored in ``trained_models``.

    Returns:
        Markdown report string, or an error-message string.

    BUGFIX: the confusion-matrix separator row contained a stray backslash
    and one missing column, breaking the markdown table. Also guards the
    case where every selected id is stale (previously crashed in max()).
    """
    global trained_models, baseline_model_cache
    if csv_file is None:
        return "❌ 請上傳測試資料"
    if not selected_models:
        return "❌ 請至少選擇一個模型"
    try:
        # Load the held-out test data.
        df = pd.read_csv(csv_file.name)
        text_col = 'Text' if 'Text' in df.columns else 'text'
        label_col = 'label' if 'label' in df.columns else 'nbcd'
        df_clean = pd.DataFrame({
            'text': df[text_col].astype(str),
            'label': df[label_col].astype(int)
        }).dropna()
        output = f"# 📊 全新資料評估報告\n\n"
        output += f"## 測試資料概況\n"
        output += f"- 總樣本數: {len(df_clean)}\n"
        output += f"- 存活 (0): {sum(df_clean['label']==0)}\n"
        output += f"- 死亡 (1): {sum(df_clean['label']==1)}\n\n"
        output += f"## 模型表現比較\n\n"
        results_table = []
        for model_id in selected_models:
            if model_id not in trained_models:
                continue  # silently skip ids that no longer exist
            info = trained_models[model_id]
            model = info['model']
            tokenizer = info['tokenizer']
            config = info['config']
            is_llama = config['is_llama']
            # Tokenise the new data with this model's own tokenizer/length.
            max_length = 512 if is_llama else 256
            dataset = Dataset.from_pandas(df_clean[['text', 'label']])
            def preprocess(ex):
                return tokenizer(ex['text'], truncation=True, padding='max_length', max_length=max_length)
            tokenized = dataset.map(preprocess, batched=True, remove_columns=['text'])
            device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
            perf = evaluate_baseline(model, tokenizer, tokenized, device, is_llama=is_llama)
            stage = info.get('stage', 1)
            stage_label = "🔬 Baseline" if "baseline" in model_id else f"🥇 Stage {stage}"
            results_table.append({
                'model': model_id,
                'stage': stage_label,
                'method': config['method'].upper(),
                'f1': perf['f1'],
                'acc': perf['accuracy'],
                'prec': perf['precision'],
                'recall': perf['recall'],
                'sens': perf['sensitivity'],
                'spec': perf['specificity'],
                'tp': perf['tp'],
                'tn': perf['tn'],
                'fp': perf['fp'],
                'fn': perf['fn']
            })
        # Robustness: if every selected id was stale, fail with a clear
        # message instead of crashing further down.
        if not results_table:
            return "❌ 所選模型不存在,請重新選擇"
        # Also evaluate the corresponding un-finetuned baseline, cached by
        # hub model name. Key off the first *existing* selection (the
        # original indexed selected_models[0], which could be stale).
        first_valid_id = next(m for m in selected_models if m in trained_models)
        first_model = trained_models[first_valid_id]
        config = first_model['config']
        model_name = config['model_name']
        is_llama = config['is_llama']
        cache_key = model_name
        if cache_key not in baseline_model_cache:
            if is_llama:
                baseline_model = AutoModelForSequenceClassification.from_pretrained(
                    model_name, num_labels=2,
                    torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                    device_map="auto" if torch.cuda.is_available() else None
                )
                baseline_model.config.pad_token_id = first_model['tokenizer'].pad_token_id
            else:
                baseline_model = BertForSequenceClassification.from_pretrained(model_name, num_labels=2)
                device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
                baseline_model = baseline_model.to(device)
            baseline_model.eval()
            baseline_model_cache[cache_key] = baseline_model
        else:
            baseline_model = baseline_model_cache[cache_key]
        tokenizer = first_model['tokenizer']
        max_length = 512 if is_llama else 256
        dataset = Dataset.from_pandas(df_clean[['text', 'label']])
        def preprocess(ex):
            return tokenizer(ex['text'], truncation=True, padding='max_length', max_length=max_length)
        tokenized = dataset.map(preprocess, batched=True, remove_columns=['text'])
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        baseline_perf = evaluate_baseline(baseline_model, tokenizer, tokenized, device, is_llama=is_llama)
        results_table.insert(0, {
            'model': f"{config['type']}-Baseline",
            'stage': "🔬 Baseline",
            'method': "None",
            'f1': baseline_perf['f1'],
            'acc': baseline_perf['accuracy'],
            'prec': baseline_perf['precision'],
            'recall': baseline_perf['recall'],
            'sens': baseline_perf['sensitivity'],
            'spec': baseline_perf['specificity'],
            'tp': baseline_perf['tp'],
            'tn': baseline_perf['tn'],
            'fp': baseline_perf['fp'],
            'fn': baseline_perf['fn']
        })
        # Render the metrics table.
        output += "| 模型 | 階段 | 方法 | F1 | Acc | Prec | Recall | Sens | Spec |\n"
        output += "|------|------|------|-----|-----|------|--------|------|------|\n"
        for r in results_table:
            output += f"| {r['model'][:30]} | {r['stage']} | {r['method']} | "
            output += f"{r['f1']:.4f} | {r['acc']:.4f} | {r['prec']:.4f} | "
            output += f"{r['recall']:.4f} | {r['sens']:.4f} | {r['spec']:.4f} |\n"
        output += "\n## 混淆矩陣\n\n"
        output += "| 模型 | TP | TN | FP | FN |\n"
        # BUGFIX: was "|------|----|----|\----|" — stray backslash and a
        # missing column broke markdown rendering of the 5-column table.
        output += "|------|----|----|----|----|\n"
        for r in results_table:
            output += f"| {r['model'][:30]} | {r['tp']} | {r['tn']} | {r['fp']} | {r['fn']} |\n"
        # Pick the best model per headline metric (baseline is row 0).
        output += "\n## 🏆 最佳模型\n\n"
        for metric in ['f1', 'acc', 'sens', 'spec']:
            best = max(results_table, key=lambda x: x[metric])
            baseline_val = results_table[0][metric]
            improve = calculate_improvement(baseline_val, best[metric])
            metric_names = {'f1': 'F1', 'acc': 'Accuracy', 'sens': 'Sensitivity', 'spec': 'Specificity'}
            output += f"**{metric_names[metric]}**: {best['model'][:30]} ({best[metric]:.4f}, 較 Baseline 改善 {format_improve(improve)})\n\n"
        return output
    except Exception as e:
        import traceback
        return f"❌ 錯誤: {str(e)}\n\n{traceback.format_exc()}"
def predict(model_id, text):
    """Predict survival for one case text and compare against the untuned baseline.

    Runs the selected fine-tuned model and the corresponding un-finetuned base
    checkpoint on the same input, then renders a side-by-side report.

    Args:
        model_id: Key into the global ``trained_models`` registry.
        text: Case description to classify (class 0 = 存活, class 1 = 死亡).

    Returns:
        A human-readable report string, or an error message starting with ❌.
    """
    global baseline_model_cache
    # Guard clauses: need a registered model and non-empty input text.
    if not model_id or model_id not in trained_models:
        return "❌ 請選擇模型"
    if not text:
        return "❌ 請輸入文字"
    try:
        info = trained_models[model_id]
        model, tokenizer = info['model'], info['tokenizer']
        config = info['config']
        is_llama = config.get('is_llama', False)
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        # Llama runs use a longer context window than BERT.
        max_length = 512 if is_llama else 256
        inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=max_length)
        if not is_llama:
            inputs_cuda = {k: v.to(device) for k, v in inputs.items()}
        else:
            # Llama may be sharded via device_map, so follow the model's own device.
            inputs_cuda = {k: v.to(model.device) for k, v in inputs.items()}
        model.eval()
        with torch.no_grad():
            outputs = model(**inputs_cuda)
            probs_finetuned = torch.nn.functional.softmax(outputs.logits, dim=-1)
            pred_finetuned = torch.argmax(probs_finetuned, dim=-1).item()
        # Label convention used throughout this app: 0 = survived, 1 = deceased.
        result_finetuned = "存活" if pred_finetuned == 0 else "死亡"
        # Lazily load and memoize the untuned baseline for this base checkpoint,
        # so repeated predictions do not re-download / re-instantiate it.
        cache_key = config['model_name']
        if cache_key not in baseline_model_cache:
            if is_llama:
                baseline_model = AutoModelForSequenceClassification.from_pretrained(
                    config['model_name'], num_labels=2,
                    torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                    device_map="auto" if torch.cuda.is_available() else None
                )
                baseline_model.config.pad_token_id = tokenizer.pad_token_id
            else:
                baseline_model = BertForSequenceClassification.from_pretrained(config['model_name'], num_labels=2)
                baseline_model = baseline_model.to(device)
            baseline_model.eval()
            baseline_model_cache[cache_key] = baseline_model
        else:
            baseline_model = baseline_model_cache[cache_key]
        with torch.no_grad():
            if is_llama:
                # Re-map inputs to the baseline's device (may differ from the tuned model's).
                inputs_baseline = {k: v.to(baseline_model.device) for k, v in inputs.items()}
            else:
                inputs_baseline = inputs_cuda
            outputs_baseline = baseline_model(**inputs_baseline)
            probs_baseline = torch.nn.functional.softmax(outputs_baseline.logits, dim=-1)
            pred_baseline = torch.argmax(probs_baseline, dim=-1).item()
        result_baseline = "存活" if pred_baseline == 0 else "死亡"
        agreement = "✅ 一致" if pred_finetuned == pred_baseline else "⚠️ 不一致"
        # Display names for the metric the model was selected on.
        metric_name_map = {
            'f1': 'F1',
            'accuracy': 'Accuracy',
            'precision': 'Precision',
            'recall': 'Recall',
            'sensitivity': 'Sensitivity',
            'specificity': 'Specificity'
        }
        selected_metric = config['metric']
        metric_display = metric_name_map[selected_metric]
        baseline_metric_val = info['baseline'][selected_metric]
        finetuned_metric_val = info['results'][f'eval_{selected_metric}']
        improvement = calculate_improvement(baseline_metric_val, finetuned_metric_val)
        stage = info.get('stage', 1)
        stage_label = f"Stage {stage}" if stage > 1 else "微調"
        output = f"""🔮 預測結果比較
📝 輸入文字: {text[:100]}{'...' if len(text) > 100 else ''}
{'='*50}
🧬 {stage_label}模型 ({model_id})
預測: {result_finetuned}
信心: {probs_finetuned[0][pred_finetuned].item():.2%}
機率分布:
• 存活: {probs_finetuned[0][0].item():.2%}
• 死亡: {probs_finetuned[0][1].item():.2%}
{'='*50}
🔬 基準模型(未微調 {config['type']})
預測: {result_baseline}
信心: {probs_baseline[0][pred_baseline].item():.2%}
機率分布:
• 存活: {probs_baseline[0][0].item():.2%}
• 死亡: {probs_baseline[0][1].item():.2%}
{'='*50}
📊 結論
兩模型預測: {agreement}
"""
        # Extra commentary only when the two models disagree.
        if pred_finetuned != pred_baseline:
            output += f"\n💡 分析: {stage_label}模型預測為【{result_finetuned}】,而基準模型預測為【{result_baseline}】"
            output += f"\n 這顯示了 fine-tuning 對此案例的影響!"
        output += f"""
📈 模型表現(基於 {metric_display})
{stage_label}模型 {metric_display}: {finetuned_metric_val:.4f}
基準模型 {metric_display}: {baseline_metric_val:.4f}
改善幅度: {format_improve(improvement)}
"""
        return output
    except Exception as e:
        import traceback
        return f"❌ 錯誤: {str(e)}\n\n{traceback.format_exc()}"
def compare():
    """Render a markdown report comparing every fine-tuned model with its baseline.

    Returns an error string when no model has been trained yet; otherwise a
    markdown document with a fine-tuned table, a deduplicated baseline table,
    and the best model per metric with its improvement over baseline.
    """
    if not trained_models:
        return "❌ 尚未訓練模型"
    parts = ["# 📊 模型比較\n\n"]
    # Section 1: every fine-tuned model's evaluation metrics.
    parts.append("## 微調模型表現\n\n")
    parts.append("| 模型 | 階段 | 基礎 | 方法 | F1 | Acc | Prec | Recall | Sens | Spec |\n")
    parts.append("|------|------|------|------|-----|-----|------|--------|------|------|\n")
    for model_id, entry in trained_models.items():
        res, cfg = entry['results'], entry['config']
        stage_num = entry.get('stage', 1)
        parts.append(
            f"| {model_id} | Stage{stage_num} | {cfg['type']} | {cfg['method'].upper()} | "
            f"{res['eval_f1']:.4f} | {res['eval_accuracy']:.4f} | "
            f"{res['eval_precision']:.4f} | {res['eval_recall']:.4f} | "
            f"{res['eval_sensitivity']:.4f} | {res['eval_specificity']:.4f} |\n"
        )
    # Section 2: each distinct base model's untuned performance (deduplicated).
    parts.append("\n## 基準模型表現(未微調)\n\n")
    parts.append("| 模型 | F1 | Acc | Prec | Recall | Sens | Spec |\n")
    parts.append("|------|-----|-----|------|--------|------|------|\n")
    reported = set()
    for entry in trained_models.values():
        base, cfg = entry['baseline'], entry['config']
        key = f"{cfg['type']}-baseline"
        if key in reported:
            continue
        reported.add(key)
        parts.append(
            f"| {key} | {base['f1']:.4f} | {base['accuracy']:.4f} | "
            f"{base['precision']:.4f} | {base['recall']:.4f} | "
            f"{base['sensitivity']:.4f} | {base['specificity']:.4f} |\n"
        )
    # Section 3: top model per metric and its relative gain over baseline.
    parts.append("\n## 🏆 最佳模型\n\n")
    for metric in ('f1', 'accuracy', 'precision', 'recall', 'sensitivity', 'specificity'):
        best_id, best_entry = max(
            trained_models.items(), key=lambda item: item[1]['results'][f'eval_{metric}']
        )
        base_val = best_entry['baseline'][metric]
        tuned_val = best_entry['results'][f'eval_{metric}']
        gain = calculate_improvement(base_val, tuned_val)
        parts.append(f"**{metric.upper()}**: {best_id} ({tuned_val:.4f}, 改善 {format_improve(gain)})\n\n")
    return "".join(parts)
def refresh_model_list():
    """Return a Dropdown update listing the currently trained model ids."""
    current_ids = list(trained_models.keys())
    return gr.Dropdown(choices=current_ids)
def refresh_model_checkboxes():
    """Return a CheckboxGroup update listing the currently trained model ids."""
    current_ids = list(trained_models.keys())
    return gr.CheckboxGroup(choices=current_ids)
def clear_gpu_memory():
    """Manually drop cached baseline models and release GPU memory.

    Returns a status string with current CUDA memory figures (or a CPU-mode /
    failure message).
    """
    global baseline_model_cache, baseline_performance_cache
    try:
        # Clearing the caches makes the cached baseline models collectable
        # before the cleanup pass runs.
        baseline_model_cache.clear()
        baseline_performance_cache.clear()
        thorough_memory_cleanup()
        if not torch.cuda.is_available():
            return "✅ 記憶體清理完成(CPU 模式)"
        gib = 1024**3
        used = torch.cuda.memory_allocated(0) / gib
        held = torch.cuda.memory_reserved(0) / gib
        peak = torch.cuda.max_memory_allocated(0) / gib
        return f"""✅ GPU 記憶體清理完成!
當前狀態:
已分配: {used:.2f} GB
已保留: {held:.2f} GB
峰值使用: {peak:.2f} GB"""
    except Exception as e:
        return f"❌ 清理失敗: {str(e)}"
def update_method_params(method):
    """Toggle visibility of the per-method hyperparameter panels.

    Args:
        method: The selected fine-tuning method key (e.g. "lora", "prefix").

    Returns:
        A mapping of parameter-group components to visibility updates.
    """
    show_lora = method in ("lora", "adalora")
    show_prefix = method in ("prefix", "prompt")
    return {
        lora_params: gr.update(visible=show_lora),
        adalora_params: gr.update(visible=(method == "adalora")),
        adapter_params: gr.update(visible=(method == "adapter")),
        prefix_params: gr.update(visible=show_prefix),
    }
# Gradio UI: four workflow tabs (stage-1 training, stage-2 fine-tuning,
# unseen-data testing, single-text prediction) plus a comparison tab.
with gr.Blocks(title="完整版 Fine-tuning 平台", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🤖 完整版 BERT & Llama Fine-tuning 平台 v3")
    gr.Markdown("### 支持 6 種微調方法 + 二次微調 + 全新資料測試")
    gr.Markdown("#### ✨ LoRA | AdaLoRA | Adapter | Prefix Tuning | Prompt Tuning | BitFit")
    with gr.Tab("🥇 第一階段訓練"):
        gr.Markdown("## 步驟 1: 選擇基礎模型")
        base_model = gr.Dropdown(
            choices=["BERT-base", "Llama-3.2-1B"],
            value="BERT-base",
            label="基礎模型"
        )
        gr.Markdown("### 🧹 記憶體管理")
        with gr.Row():
            clear_mem_btn = gr.Button("🧹 清理 GPU 記憶體", variant="secondary")
            mem_status = gr.Textbox(label="記憶體狀態", lines=4, interactive=False, scale=2)
        gr.Markdown("## 步驟 2: 選擇微調方法")
        method = gr.Radio(
            choices=["lora", "adalora", "adapter", "prefix", "prompt", "bitfit"],
            value="lora",
            label="微調方法"
        )
        gr.Markdown("## 步驟 3: 上傳資料")
        csv_file = gr.File(label="CSV 檔案 (需包含 Text/text 和 label/nbcd 欄位)", file_types=[".csv"])
        gr.Markdown("## 步驟 4: 設定訓練參數")
        with gr.Row():
            num_epochs = gr.Number(value=8, label="訓練輪數", minimum=1, maximum=100, precision=0)
            batch_size = gr.Number(value=16, label="批次大小", minimum=1, maximum=128, precision=0)
            learning_rate = gr.Number(value=2e-5, label="學習率", minimum=0, maximum=1)
        with gr.Row():
            weight_decay = gr.Number(value=0.01, label="權重衰減", minimum=0, maximum=1)
            dropout = gr.Number(value=0.3, label="Dropout", minimum=0, maximum=1)
        gr.Markdown("### 🔧 方法參數")
        # Method-specific hyperparameter panels; update_method_params toggles
        # their visibility when the radio selection changes.
        with gr.Group(visible=True) as lora_params:
            gr.Markdown("#### LoRA 參數")
            with gr.Row():
                lora_r = gr.Number(value=32, label="Rank (r)", minimum=1, maximum=256, precision=0)
                lora_alpha = gr.Number(value=64, label="Alpha", minimum=1, maximum=512, precision=0)
                lora_dropout = gr.Number(value=0.1, label="Dropout", minimum=0, maximum=1)
        with gr.Group(visible=False) as adalora_params:
            gr.Markdown("#### AdaLoRA 參數")
            with gr.Row():
                adalora_init_r = gr.Number(value=12, label="初始 Rank", minimum=1, maximum=64, precision=0)
                adalora_tinit = gr.Number(value=200, label="Tinit", minimum=0, maximum=1000, precision=0)
            with gr.Row():
                adalora_tfinal = gr.Number(value=1000, label="Tfinal", minimum=0, maximum=5000, precision=0)
                adalora_deltaT = gr.Number(value=10, label="DeltaT", minimum=1, maximum=100, precision=0)
        with gr.Group(visible=False) as adapter_params:
            gr.Markdown("#### Adapter 參數")
            adapter_len = gr.Number(value=10, label="Adapter Length", minimum=1, maximum=50, precision=0,
                                    info="Adapter tokens 數量")
        with gr.Group(visible=False) as prefix_params:
            gr.Markdown("#### Prefix/Prompt 參數")
            prefix_len = gr.Number(value=20, label="Virtual Tokens", minimum=1, maximum=100, precision=0,
                                   info="虛擬 token 數量")
        method.change(
            update_method_params,
            inputs=[method],
            outputs=[lora_params, adalora_params, adapter_params, prefix_params]
        )
        best_metric = gr.Dropdown(
            choices=["f1", "accuracy", "precision", "recall", "sensitivity", "specificity"],
            value="f1",
            label="最佳模型選擇指標"
        )
        train_btn = gr.Button("🚀 開始訓練", variant="primary", size="lg")
        gr.Markdown("## 📊 訓練結果")
        data_info = gr.Textbox(label="📋 資料資訊", lines=10)
        with gr.Row():
            baseline_result = gr.Textbox(label="🔬 Baseline", lines=14)
            finetuned_result = gr.Textbox(label="✅ 微調模型", lines=14)
            comparison_result = gr.Textbox(label="📊 比較", lines=14)
        clear_mem_btn.click(clear_gpu_memory, outputs=[mem_status])
        # train_model is defined earlier in this file (outside this view).
        train_btn.click(
            train_model,
            inputs=[csv_file, base_model, method, num_epochs, batch_size, learning_rate,
                    weight_decay, dropout, lora_r, lora_alpha, lora_dropout,
                    adalora_init_r, adalora_tinit, adalora_tfinal, adalora_deltaT,
                    adapter_len, prefix_len, best_metric],
            outputs=[data_info, baseline_result, finetuned_result, comparison_result]
        )
    with gr.Tab("🥈 第二階段訓練"):
        gr.Markdown("## 二次微調:基於已訓練模型繼續訓練")
        gr.Markdown("### 選擇第一階段模型,上傳新資料,進行二次微調")
        with gr.Row():
            first_model_select = gr.Dropdown(label="選擇第一階段模型", choices=list(trained_models.keys()))
            refresh_stage1 = gr.Button("🔄 刷新模型列表")
        stage2_csv = gr.File(label="上傳新的訓練資料 CSV", file_types=[".csv"])
        gr.Markdown("### 二次微調參數")
        with gr.Row():
            stage2_epochs = gr.Number(value=3, label="訓練輪數", minimum=1, maximum=20, precision=0,
                                      info="建議較少輪數")
            stage2_batch = gr.Number(value=16, label="批次大小", minimum=1, maximum=128, precision=0)
            stage2_lr = gr.Number(value=1e-5, label="學習率", minimum=0, maximum=1,
                                  info="自動減半,建議更小")
        stage2_metric = gr.Dropdown(
            choices=["f1", "accuracy", "precision", "recall", "sensitivity", "specificity"],
            value="f1",
            label="評估指標"
        )
        stage2_train_btn = gr.Button("🔄 開始二次微調", variant="primary", size="lg")
        gr.Markdown("## 📊 二次微調結果")
        stage2_info = gr.Textbox(label="📋 訓練資訊", lines=8)
        with gr.Row():
            stage1_result = gr.Textbox(label="🥇 第一階段", lines=12)
            stage2_result = gr.Textbox(label="🥈 第二階段", lines=12)
        refresh_stage1.click(refresh_model_list, outputs=[first_model_select])
        # second_stage_train is defined earlier in this file (outside this view).
        stage2_train_btn.click(
            second_stage_train,
            inputs=[first_model_select, stage2_csv, stage2_epochs, stage2_batch, stage2_lr, stage2_metric],
            outputs=[stage2_info, stage1_result, stage2_result]
        )
    with gr.Tab("🆕 全新資料測試"):
        gr.Markdown("## 在全新資料上測試所有模型")
        gr.Markdown("### 上傳模型未見過的測試資料,比較 Baseline、Stage1、Stage2 的表現")
        test_csv = gr.File(label="上傳測試資料 CSV", file_types=[".csv"])
        with gr.Row():
            test_models = gr.CheckboxGroup(label="選擇要測試的模型", choices=list(trained_models.keys()))
            refresh_test = gr.Button("🔄 刷新")
        test_btn = gr.Button("🧪 開始測試", variant="primary", size="lg")
        test_output = gr.Markdown(label="測試結果")
        refresh_test.click(refresh_model_checkboxes, outputs=[test_models])
        # evaluate_on_new_data is defined earlier in this file (outside this view).
        test_btn.click(
            evaluate_on_new_data,
            inputs=[test_csv, test_models],
            outputs=[test_output]
        )
    with gr.Tab("🔮 預測"):
        gr.Markdown("## 使用訓練好的模型預測")
        with gr.Row():
            model_drop = gr.Dropdown(label="選擇模型", choices=list(trained_models.keys()))
            refresh = gr.Button("🔄 刷新")
        text_input = gr.Textbox(label="輸入病例描述", lines=4,
                                placeholder="Patient diagnosed with...")
        predict_btn = gr.Button("預測", variant="primary", size="lg")
        pred_output = gr.Textbox(label="預測結果", lines=20)
        refresh.click(refresh_model_list, outputs=[model_drop])
        predict_btn.click(predict, inputs=[model_drop, text_input], outputs=[pred_output])
        gr.Examples(
            examples=[
                ["Patient with stage II breast cancer, good response to treatment."],
                ["Advanced metastatic cancer, multiple organ involvement."]
            ],
            inputs=text_input
        )
    with gr.Tab("📊 比較"):
        gr.Markdown("## 比較所有模型")
        compare_btn = gr.Button("比較", variant="primary", size="lg")
        compare_output = gr.Markdown()
        compare_btn.click(compare, outputs=[compare_output])

# Script entry point: serve the UI on all interfaces (Hugging Face Spaces default port).
if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        max_threads=4
    )