Spaces:
Paused
Paused
| """ | |
| Llama NBCD Fine-tuning Script with Baseline Comparison | |
| 比較未微調 vs 微調模型的效果 | |
| """ | |
| import pandas as pd | |
| import torch | |
| from datasets import Dataset, DatasetDict | |
| from transformers import ( | |
| AutoTokenizer, | |
| AutoModelForSequenceClassification, | |
| TrainingArguments, | |
| Trainer, | |
| DataCollatorWithPadding | |
| ) | |
| from peft import LoraConfig, get_peft_model, TaskType | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.metrics import accuracy_score, precision_recall_fscore_support | |
| from sklearn.utils import resample | |
| import numpy as np | |
| import json | |
| from datetime import datetime | |
| import os | |
| from huggingface_hub import login | |
# ==================== Hugging Face token login ====================
# The token is taken from the HF_TOKEN environment variable. A missing token
# or a failed login is only reported; the script keeps running either way.
print("🔐 檢查 Hugging Face Token...")
hf_token = os.environ.get("HF_TOKEN")
if hf_token is None:
    print("⚠️ 未找到 HF_TOKEN,可能無法下載 Llama 模型")
else:
    try:
        login(token=hf_token)
    except Exception as e:
        # Best effort: report the failure and carry on.
        print(f"⚠️ Token 登入失敗: {e}")
    else:
        print("✅ 已使用 HF Token 登入")
# ==================== Configuration ====================
MODEL_NAME = "meta-llama/Llama-3.2-1B"
TRAINING_DATA_PATH = "./training_data.csv"
OUTPUT_DIR = "./trained_model"
MAX_LENGTH = 512

# Training hyper-parameters.
TRAIN_CONFIG = dict(
    num_epochs=3,
    batch_size=4,
    learning_rate=1e-4,
    lora_r=8,
    lora_alpha=16,
)

# Data balancing settings.
BALANCE_CONFIG = dict(
    target_samples_per_class=700,
    use_class_weights=True,
)

# Start-up banner summarising the configuration above.
_banner_rule = "=" * 70
_banner_lines = [
    "\n" + _banner_rule,
    "🦙 Llama NBCD Fine-tuning with Baseline Comparison",
    " (未微調 vs 微調模型比較)",
    _banner_rule,
    "\n📋 配置:",
    f" 模型: {MODEL_NAME}",
    f" 訓練數據: {TRAINING_DATA_PATH}",
    f" 輸出目錄: {OUTPUT_DIR}",
    f" Epochs: {TRAIN_CONFIG['num_epochs']}",
    f" Batch Size: {TRAIN_CONFIG['batch_size']}",
    f" Learning Rate: {TRAIN_CONFIG['learning_rate']}",
    f" 目標樣本數: {BALANCE_CONFIG['target_samples_per_class']} 筆/類別",
    _banner_rule + "\n",
]
for _banner_line in _banner_lines:
    print(_banner_line)
# ==================== 1. Load data ====================
# Reads the training CSV into `df`. The rest of the script relies on a 'Text'
# column (model input) and an 'nbcd' column (0/1 label), so both are checked
# here; any failure is reported and the script exits with status 1.
print("📂 載入訓練數據...")
try:
    df = pd.read_csv(TRAINING_DATA_PATH)
    missing_columns = [col for col in ('Text', 'nbcd') if col not in df.columns]
    if missing_columns:
        raise KeyError(f"missing required column(s): {missing_columns}")
    print(f"✅ 成功載入 {len(df)} 筆數據")
    print(f" 欄位: {list(df.columns)}")
    print(f" 原始 Class 0: {(df['nbcd']==0).sum()} 筆")
    print(f" 原始 Class 1: {(df['nbcd']==1).sum()} 筆")
except Exception as e:
    print(f"❌ 無法載入數據: {e}")
    print(f" 請確認 {TRAINING_DATA_PATH} 存在且格式正確")
    # `exit()` is injected by the `site` module for interactive use and is not
    # guaranteed to exist; raising SystemExit works everywhere.
    raise SystemExit(1)
# ==================== 2. Split into train / test sets ====================
# The split happens BEFORE any resampling. Oversampling with replacement
# duplicates rows, so resampling first would let copies of the same example
# land in both splits and inflate every test metric. The test split therefore
# keeps the original class distribution and contains no resampled rows.
print("\n✂️ 分割訓練集和測試集...")
df_labeled = df[df['nbcd'].isin([0, 1])]  # only rows carrying one of the two labels
train_raw_df, test_df = train_test_split(
    df_labeled,
    test_size=0.2,
    stratify=df_labeled['nbcd'],
    random_state=42
)

# ==================== 3. Balance the training split only ====================
print("\n⚖️ 執行資料平衡...")
df_class_0 = train_raw_df[train_raw_df['nbcd'] == 0]
df_class_1 = train_raw_df[train_raw_df['nbcd'] == 1]
target_n = BALANCE_CONFIG['target_samples_per_class']

# Undersample Class 0 (without replacement) when it exceeds the target.
if len(df_class_0) > target_n:
    df_class_0_balanced = resample(df_class_0, n_samples=target_n, random_state=42, replace=False)
    print(f"✅ Class 0 欠採樣: {len(df_class_0)} → {len(df_class_0_balanced)} 筆")
else:
    df_class_0_balanced = df_class_0
    print(f"⚠️ Class 0 樣本數不足,保持 {len(df_class_0)} 筆")

# Oversample Class 1 (with replacement) when it is below the target.
if len(df_class_1) < target_n:
    df_class_1_balanced = resample(df_class_1, n_samples=target_n, random_state=42, replace=True)
    print(f"✅ Class 1 過採樣: {len(df_class_1)} → {len(df_class_1_balanced)} 筆")
else:
    df_class_1_balanced = df_class_1
    print(f"⚠️ Class 1 樣本數充足,保持 {len(df_class_1)} 筆")

# Shuffle so the two classes are interleaved.
df_balanced = pd.concat([df_class_0_balanced, df_class_1_balanced])
df_balanced = df_balanced.sample(frac=1, random_state=42).reset_index(drop=True)
train_df = df_balanced

print(f"\n📊 平衡後數據:")
print(f" 總樣本數: {len(df_balanced)} 筆")
print(f" Class 0: {(df_balanced['nbcd']==0).sum()} 筆")
print(f" Class 1: {(df_balanced['nbcd']==1).sum()} 筆")

# ==================== 4. Class weights ====================
# Inverse-frequency weights, total / (num_classes * class_count), computed on
# the balanced training data.
if BALANCE_CONFIG['use_class_weights']:
    print("\n⚖️ 計算類別權重...")
    class_counts = df_balanced['nbcd'].value_counts().sort_index()
    total = len(df_balanced)
    num_classes = 2
    class_weight_0 = total / (num_classes * class_counts[0])
    class_weight_1 = total / (num_classes * class_counts[1])
    class_weights = torch.tensor([class_weight_0, class_weight_1], dtype=torch.float32)
    print(f"✅ 類別權重計算完成:")
    print(f" Class 0 權重: {class_weight_0:.4f}")
    print(f" Class 1 權重: {class_weight_1:.4f}")
else:
    class_weights = None
    print("\n⚠️ 未使用類別權重")

print(f"✅ 訓練集: {len(train_df)} 筆 (Class 0: {(train_df['nbcd']==0).sum()}, Class 1: {(train_df['nbcd']==1).sum()})")
print(f"✅ 測試集: {len(test_df)} 筆 (Class 0: {(test_df['nbcd']==0).sum()}, Class 1: {(test_df['nbcd']==1).sum()})")

# Resetting the index keeps the pandas index from being carried into the
# datasets as an extra column.
dataset = DatasetDict({
    'train': Dataset.from_pandas(train_df[['Text', 'nbcd']].reset_index(drop=True)),
    'test': Dataset.from_pandas(test_df[['Text', 'nbcd']].reset_index(drop=True))
})
# ==================== 5. Device detection ====================
# Chooses "cuda" when a GPU is visible, otherwise "cpu", and moves the class
# weights (if any) onto the GPU.
has_gpu = torch.cuda.is_available()
device = "cuda" if has_gpu else "cpu"
print(f"\n🖥️ 使用設備: {device}")
if has_gpu:
    print(f"✅ GPU 可用: {torch.cuda.get_device_name(0)}")
    if class_weights is not None:
        class_weights = class_weights.to(device)
else:
    print("⚠️ 警告: 使用 CPU 訓練會非常慢!")
# ==================== 6. Tokenizer ====================
print("\n🤖 載入 Llama 模型和 Tokenizer...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
if tokenizer.pad_token is None:
    # No padding token defined: reuse the end-of-sequence token.
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.pad_token_id = tokenizer.eos_token_id


def _load_sequence_classifier():
    """Load MODEL_NAME with a 2-label sequence-classification head.

    On CUDA the weights are loaded as float16 with device_map="auto";
    on CPU they are loaded as float32 with no device map. The model's
    pad_token_id is set to the tokenizer's.
    """
    # NOTE(review): the weights are float16 on GPU while the Trainer arguments
    # elsewhere in this script enable bf16 mixed precision -- confirm this
    # dtype combination is intended for the model that gets trained.
    on_gpu = device == "cuda"
    classifier = AutoModelForSequenceClassification.from_pretrained(
        MODEL_NAME,
        num_labels=2,
        torch_dtype=torch.float16 if on_gpu else torch.float32,
        device_map="auto" if on_gpu else None,
    )
    classifier.config.pad_token_id = tokenizer.pad_token_id
    return classifier


# ==================== 7. Untouched baseline model (for comparison) ====================
print("\n📦 載入未微調的基礎模型 (Baseline)...")
baseline_model = _load_sequence_classifier()
print("✅ Baseline 模型載入完成")

# ==================== 8. Model that will be fine-tuned ====================
print("\n🔧 載入用於微調的模型...")
base_model = _load_sequence_classifier()
print("✅ 基礎模型載入完成")
# ==================== 9. LoRA configuration ====================
# Wraps `base_model` with LoRA adapters on the q_proj / v_proj modules and
# reports how many parameters remain trainable.
print("\n🔧 配置 LoRA...")
lora_config = LoraConfig(
    r=TRAIN_CONFIG["lora_r"],
    lora_alpha=TRAIN_CONFIG["lora_alpha"],
    lora_dropout=0.1,
    bias="none",
    target_modules=["q_proj", "v_proj"],
    task_type=TaskType.SEQ_CLS,
)
model = get_peft_model(base_model, lora_config)

# Count total and trainable parameters in a single pass.
trainable_params = 0
total_params = 0
for _param in model.parameters():
    _count = _param.numel()
    total_params += _count
    if _param.requires_grad:
        trainable_params += _count

print(f"✅ LoRA 配置完成")
print(f" 可訓練參數: {trainable_params:,} ({trainable_params/total_params*100:.2f}%)")
# ==================== 10. Preprocess data ====================
print("\n🔄 預處理數據...")


def preprocess_function(examples):
    """Tokenize a batch of examples.

    Args:
        examples: batch mapping passed in by `Dataset.map(batched=True)`;
            only its 'Text' entry is read.

    Returns:
        The tokenizer output for the batch, truncated to MAX_LENGTH tokens.

    No padding is applied here. The Trainers in this script use
    DataCollatorWithPadding, which pads each batch to its own longest
    sequence; padding every row to MAX_LENGTH up front would only waste
    memory and compute.
    """
    return tokenizer(
        examples['Text'],
        truncation=True,
        max_length=MAX_LENGTH,
    )


# Drop every column except the label so the collator only sees tokenizer
# fields plus 'labels'.
_columns_to_drop = [name for name in dataset['train'].column_names if name != 'nbcd']
tokenized_dataset = dataset.map(preprocess_function, batched=True, remove_columns=_columns_to_drop)
tokenized_dataset = tokenized_dataset.rename_column("nbcd", "labels")
print("✅ 數據預處理完成")
# ==================== 11. Evaluation metrics ====================
def compute_metrics(eval_pred):
    """Compute accuracy, precision, recall and F1 for one evaluation pass.

    Args:
        eval_pred: pair of (scores, labels); the predicted class is the
            argmax of the scores along axis 1.

    Returns:
        dict with keys 'accuracy', 'precision', 'recall' and 'f1'.
        Precision/recall/F1 use average='binary' with zero_division=0.
    """
    scores, labels = eval_pred
    predicted = np.argmax(scores, axis=1)
    precision, recall, f1, _support = precision_recall_fscore_support(
        labels, predicted, average='binary', zero_division=0
    )
    return dict(
        accuracy=accuracy_score(labels, predicted),
        precision=precision,
        recall=recall,
        f1=f1,
    )
# ==================== 12. Evaluate the baseline (not fine-tuned) model ====================
print("\n" + "="*70)
print("📊 評估未微調的 Baseline 模型...")
print("="*70)

# Request bf16 only when the GPU actually supports it; TrainingArguments
# rejects bf16=True on hardware without bf16 support.
baseline_use_bf16 = device == "cuda" and torch.cuda.is_bf16_supported()

baseline_trainer = Trainer(
    model=baseline_model,
    args=TrainingArguments(
        output_dir="./temp_baseline",
        per_device_eval_batch_size=TRAIN_CONFIG["batch_size"],
        bf16=baseline_use_bf16,
        report_to="none"
    ),
    tokenizer=tokenizer,
    data_collator=DataCollatorWithPadding(tokenizer=tokenizer),
    compute_metrics=compute_metrics
)

# This Trainer is used for evaluation only; it is never trained.
baseline_train_results = baseline_trainer.evaluate(eval_dataset=tokenized_dataset['train'])
baseline_test_results = baseline_trainer.evaluate(eval_dataset=tokenized_dataset['test'])

for _heading, _results in (
    ("\n🔍 Baseline 模型 - 訓練集結果:", baseline_train_results),
    ("\n🔍 Baseline 模型 - 測試集結果:", baseline_test_results),
):
    print(_heading)
    for _label, _key in (
        ("Accuracy", "eval_accuracy"),
        ("Precision", "eval_precision"),
        ("Recall", "eval_recall"),
        ("F1 Score", "eval_f1"),
    ):
        print(f" {_label}: {_results[_key]:.4f}")
# ==================== 13. Custom Trainer ====================
if BALANCE_CONFIG['use_class_weights']:
    class WeightedTrainer(Trainer):
        """Trainer whose loss is a class-weighted cross entropy.

        Args:
            class_weights: optional 1-D tensor with one weight per class;
                None gives an unweighted cross entropy.
            *args, **kwargs: forwarded unchanged to `Trainer`.
        """

        def __init__(self, *args, class_weights=None, **kwargs):
            super().__init__(*args, **kwargs)
            self.class_weights = class_weights

        def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
            """Return the weighted loss, or (loss, outputs) if return_outputs is set."""
            labels = inputs.pop("labels")
            outputs = model(**inputs)
            logits = outputs.logits

            # The logits may be half precision and, with device_map="auto",
            # may live on a different device than the stored weights. Compute
            # the loss in float32 on the logits' device so dtype and device
            # always agree.
            weight = self.class_weights
            if weight is not None:
                weight = weight.to(device=logits.device, dtype=torch.float32)
            loss_fct = torch.nn.CrossEntropyLoss(weight=weight)
            loss = loss_fct(
                logits.float().view(-1, logits.size(-1)),
                labels.view(-1).to(logits.device),
            )
            return (loss, outputs) if return_outputs else loss

    TrainerClass = WeightedTrainer
else:
    TrainerClass = Trainer
# ==================== 14. Training configuration ====================
print("\n" + "="*70)
print("⚙️ 配置微調訓練器...")
print("="*70)

# Request bf16 only when the GPU actually supports it; TrainingArguments
# rejects bf16=True on hardware without bf16 support.
use_bf16 = device == "cuda" and torch.cuda.is_bf16_supported()

training_args = TrainingArguments(
    output_dir=OUTPUT_DIR,
    num_train_epochs=TRAIN_CONFIG["num_epochs"],
    per_device_train_batch_size=TRAIN_CONFIG["batch_size"],
    per_device_eval_batch_size=TRAIN_CONFIG["batch_size"],
    learning_rate=TRAIN_CONFIG["learning_rate"],
    weight_decay=0.01,
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    logging_dir=f"{OUTPUT_DIR}/logs",
    logging_steps=10,
    bf16=use_bf16,
    gradient_accumulation_steps=2,
    warmup_steps=50,
    report_to="none",
    seed=42
)

# Arguments shared by both Trainer variants. `class_weights` is added only
# when weighting is enabled, because the plain Trainer does not accept it.
trainer_kwargs = dict(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset['train'],
    eval_dataset=tokenized_dataset['test'],
    tokenizer=tokenizer,
    data_collator=DataCollatorWithPadding(tokenizer=tokenizer),
    compute_metrics=compute_metrics,
)
if BALANCE_CONFIG['use_class_weights']:
    trainer_kwargs["class_weights"] = class_weights
trainer = TrainerClass(**trainer_kwargs)
# ==================== 15. Run training ====================
print("\n" + "="*70)
print("🚀 開始微調訓練...")
print("="*70 + "\n")
start_time = datetime.now()
try:
    train_result = trainer.train()
    end_time = datetime.now()
    # Wall-clock training time in minutes; also stored in the results JSON.
    duration = (end_time - start_time).total_seconds() / 60
    print("\n" + "="*70)
    print(f"✅ 訓練完成!")
    print(f" 耗時: {duration:.1f} 分鐘")
    print("="*70)
except Exception as e:
    print(f"\n❌ 訓練失敗: {e}")
    import traceback
    traceback.print_exc()
    # `exit()` is injected by the `site` module for interactive use and is not
    # guaranteed to exist; raising SystemExit works everywhere.
    raise SystemExit(1)
# ==================== 16. Evaluate the fine-tuned model ====================
print("\n" + "="*70)
print("📊 評估微調後的模型...")
print("="*70)

finetuned_train_results = trainer.evaluate(eval_dataset=tokenized_dataset['train'])
finetuned_test_results = trainer.evaluate(eval_dataset=tokenized_dataset['test'])

# Print the same four metrics for the train split, then the test split.
for _heading, _results in (
    ("\n🔍 微調模型 - 訓練集結果:", finetuned_train_results),
    ("\n🔍 微調模型 - 測試集結果:", finetuned_test_results),
):
    print(_heading)
    for _label, _key in (
        ("Accuracy", "eval_accuracy"),
        ("Precision", "eval_precision"),
        ("Recall", "eval_recall"),
        ("F1 Score", "eval_f1"),
    ):
        print(f" {_label}: {_results[_key]:.4f}")
# ==================== 17. Compare results ====================
# Prints one row per metric: baseline value, fine-tuned value, absolute and
# relative change on the test set, and a status marker.
print("\n" + "="*70)
print("📈 Baseline vs Fine-tuned 比較 (測試集)")
print("="*70)
metrics = ['accuracy', 'precision', 'recall', 'f1']
print(f"\n{'指標':<12} {'Baseline':<12} {'Fine-tuned':<12} {'改善':<12} {'狀態'}")
print("-" * 70)
for metric in metrics:
    result_key = f'eval_{metric}'
    before = baseline_test_results[result_key]
    after = finetuned_test_results[result_key]
    delta = after - before

    # Relative change is reported as 0 when the baseline value is not positive.
    if before > 0:
        delta_pct = delta / before * 100
    else:
        delta_pct = 0

    if delta > 0:
        status = "✅ 提升"
    elif delta < 0:
        status = "⚠️ 下降"
    else:
        status = "➖ 持平"

    row_left = f"{metric.capitalize():<12} {before:<12.4f} {after:<12.4f} "
    row_right = f"{delta:+.4f} ({delta_pct:+.1f}%) {status}"
    print(row_left + row_right)
print("="*70)
# ==================== 18. Inference comparison ====================
print("\n" + "="*70)
print("🧪 測試推論比較 (5個樣本)")
print("="*70)


def predict_with_model(model_obj, text):
    """Classify a single text with `model_obj`.

    Args:
        model_obj: sequence-classification model to run.
        text: the input string; it is truncated to MAX_LENGTH tokens.

    Returns:
        (predicted_class, confidence): the argmax class index and its
        softmax probability.
    """
    inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=MAX_LENGTH)
    # Follow the model's own device instead of the global `device` flag.
    inputs = {k: v.to(model_obj.device) for k, v in inputs.items()}

    # Run in eval mode so dropout is off, then restore the previous mode.
    was_training = model_obj.training
    model_obj.eval()
    try:
        with torch.no_grad():
            outputs = model_obj(**inputs)
    finally:
        model_obj.train(was_training)

    # Softmax in float32 so half-precision logits do not coarsen the score.
    probs = torch.nn.functional.softmax(outputs.logits.float(), dim=-1)
    predicted_class = torch.argmax(probs, dim=-1).item()
    confidence = probs[0][predicted_class].item()
    return predicted_class, confidence
# Run both models on the first few test rows and tally how often each is right.
test_samples = test_df.head(5)
# head(5) returns fewer rows on a tiny test set, so every denominator below
# uses the real count instead of a hard-coded 5.
n_samples = len(test_samples)
baseline_correct = 0
finetuned_correct = 0
baseline_class1_correct = 0
finetuned_class1_correct = 0
class1_total = 0

for idx, (_, row) in enumerate(test_samples.iterrows(), 1):
    true_label = row['nbcd']
    text = row['Text']

    # Baseline prediction
    baseline_pred, baseline_conf = predict_with_model(baseline_model, text)
    baseline_match = "✅" if baseline_pred == true_label else "❌"
    if baseline_pred == true_label:
        baseline_correct += 1

    # Fine-tuned prediction
    finetuned_pred, finetuned_conf = predict_with_model(model, text)
    finetuned_match = "✅" if finetuned_pred == true_label else "❌"
    if finetuned_pred == true_label:
        finetuned_correct += 1

    # Class 1 tallies
    if true_label == 1:
        class1_total += 1
        if baseline_pred == 1:
            baseline_class1_correct += 1
        if finetuned_pred == 1:
            finetuned_class1_correct += 1

    print(f"\n樣本 {idx} (實際標籤: {true_label}):")
    print(f" 文本: {text[:100]}...")
    print(f" {baseline_match} Baseline: 預測={baseline_pred} 信心度={baseline_conf:.3f}")
    print(f" {finetuned_match} Fine-tuned: 預測={finetuned_pred} 信心度={finetuned_conf:.3f}")

# Guard against an empty sample so the percentages never divide by zero.
baseline_pct = baseline_correct / n_samples * 100 if n_samples else 0.0
finetuned_pct = finetuned_correct / n_samples * 100 if n_samples else 0.0

print("\n" + "="*70)
print(f"📊 {n_samples}個樣本預測準確率:")
print(f" Baseline: {baseline_correct}/{n_samples} = {baseline_pct:.1f}%")
print(f" Fine-tuned: {finetuned_correct}/{n_samples} = {finetuned_pct:.1f}%")
if class1_total > 0:
    print(f"\n Class 1 識別率 (共 {class1_total} 個):")
    print(f" Baseline: {baseline_class1_correct}/{class1_total}")
    print(f" Fine-tuned: {finetuned_class1_correct}/{class1_total}")
print("="*70)
# ==================== 19. Save model and results ====================
print("\n💾 保存模型和結果...")
trainer.save_model()
tokenizer.save_pretrained(OUTPUT_DIR)

_METRIC_NAMES = ("accuracy", "precision", "recall", "f1")


def _metrics_as_floats(results):
    """Pick the four eval_* metrics out of `results` as plain floats."""
    return {name: float(results[f"eval_{name}"]) for name in _METRIC_NAMES}


# Everything written to the JSON report: configuration, training time,
# per-split metrics for both models, and the test-set deltas.
comparison_results = {
    "model": MODEL_NAME,
    "config": TRAIN_CONFIG,
    "balance_config": BALANCE_CONFIG,
    "train_time_minutes": duration,
    "baseline_results": {
        "train": _metrics_as_floats(baseline_train_results),
        "test": _metrics_as_floats(baseline_test_results),
    },
    "finetuned_results": {
        "train": _metrics_as_floats(finetuned_train_results),
        "test": _metrics_as_floats(finetuned_test_results),
    },
    "improvements": {
        name: float(
            finetuned_test_results[f"eval_{name}"] - baseline_test_results[f"eval_{name}"]
        )
        for name in _METRIC_NAMES
    },
    "timestamp": datetime.now().isoformat(),
    "device": device,
}

_results_path = f"{OUTPUT_DIR}/comparison_results.json"
with open(_results_path, "w", encoding='utf-8') as f:
    json.dump(comparison_results, f, indent=2, ensure_ascii=False)
print(f"✅ 結果已保存到: {OUTPUT_DIR}/comparison_results.json")
# ==================== 20. Summary ====================
# Final recap: where the outputs were written and the headline test-set deltas.
_rule = "=" * 70
_gains = comparison_results['improvements']
_summary_lines = [
    "\n" + _rule,
    "🎉 訓練和比較流程全部完成!",
    _rule,
    "\n📦 輸出內容:",
    f" 微調模型: {OUTPUT_DIR}/",
    f" 比較結果: {OUTPUT_DIR}/comparison_results.json",
    f" 訓練日誌: {OUTPUT_DIR}/logs/",
    "\n💡 關鍵發現:",
    f" 測試集 F1 Score 提升: {_gains['f1']:+.4f}",
    f" 測試集 Recall 提升: {_gains['recall']:+.4f}",
    f" 測試集 Accuracy 提升: {_gains['accuracy']:+.4f}",
    _rule + "\n",
]
for _line in _summary_lines:
    print(_line)