""" PaddleNLP情绪分析微调训练脚本 针对HR对话场景优化情感分类模型 """ import os import json import paddle import pandas as pd from pathlib import Path from typing import List, Dict, Tuple from paddle.io import Dataset from paddlenlp.transformers import AutoModelForSequenceClassification, AutoTokenizer from paddlenlp.datasets import load_dataset from paddle.optimizer import AdamW from paddlenlp.trainer import PdArgumentParser from paddle.metric import Accuracy class HRSentimentDataset(Dataset): """HR情绪分析数据集""" def __init__(self, data_path: str, tokenizer: AutoTokenizer, max_length: int = 128): """ Args: data_path: JSON数据文件路径 tokenizer: 分词器 max_length: 最大序列长度 """ self.tokenizer = tokenizer self.max_length = max_length # 加载数据 with open(data_path, 'r', encoding='utf-8') as f: raw_data = json.load(f) # 解析数据 self.texts = [] self.labels = [] self.label_map = {'positive': 0, 'neutral': 1, 'negative': 2} for item in raw_data: self.texts.append(item['text']) self.labels.append(self.label_map[item['label']]) print(f"已加载 {len(self.texts)} 条样本") def __getitem__(self, idx: int) -> Dict: text = str(self.texts[idx]) label = self.labels[idx] # 分词 encoded = self.tokenizer( text, max_length=self.max_length, padding='max_length', truncation=True, return_tensors='pd' ) result = { 'input_ids': encoded['input_ids'].squeeze(0), 'token_type_ids': encoded['token_type_ids'].squeeze(0), 'labels': paddle.to_tensor([label], dtype='int64') } # 只在存在attention_mask时才添加 if 'attention_mask' in encoded and encoded['attention_mask'] is not None: result['attention_mask'] = encoded['attention_mask'].squeeze(0) return result def __len__(self) -> int: return len(self.texts) class HRSentimentTrainer: """HR情绪分析训练器""" def __init__( self, model_name: str = "utterless/electra-small-zh", train_data_path: str = None, val_data_path: str = None, output_dir: str = "./models/sentiment-hr", max_length: int = 128, batch_size: int = 32, num_epochs: int = 3, learning_rate: float = 2e-5, warmup_steps: int = 100 ): """ Args: model_name: 预训练模型名称 (支持中文模型) train_data_path: 训练数据路径 val_data_path: 验证数据路径 output_dir: 输出目录 max_length: 最大序列长度 batch_size: 批次大小 num_epochs: 训练轮数 learning_rate: 学习率 warmup_steps: 预热步数 """ self.model_name = model_name self.train_data_path = train_data_path self.val_data_path = val_data_path self.output_dir = Path(output_dir) self.max_length = max_length self.batch_size = batch_size self.num_epochs = num_epochs self.learning_rate = learning_rate self.warmup_steps = warmup_steps # 创建输出目录 self.output_dir.mkdir(parents=True, exist_ok=True) # 加载tokenizer print(f"加载分词器: {model_name}") self.tokenizer = AutoTokenizer.from_pretrained(model_name) # 加载模型 (3分类: positive, neutral, negative) print(f"加载预训练模型: {model_name}") self.model = AutoModelForSequenceClassification.from_pretrained( model_name, num_classes=3 ) # 标签映射 self.label_map = {0: 'positive', 1: 'neutral', 2: 'negative'} self.id2label = {0: 'positive', 1: 'neutral', 2: 'negative'} print(f"输出目录: {self.output_dir}") def prepare_data(self) -> Tuple[HRSentimentDataset, HRSentimentDataset]: """准备训练和验证数据""" print("\n准备数据...") train_dataset = HRSentimentDataset( self.train_data_path, self.tokenizer, self.max_length ) val_dataset = None if self.val_data_path and os.path.exists(self.val_data_path): val_dataset = HRSentimentDataset( self.val_data_path, self.tokenizer, self.max_length ) return train_dataset, val_dataset def train(self): """训练模型""" print("\n开始训练...") # 准备数据 train_dataset, val_dataset = self.prepare_data() # 创建DataLoader train_loader = paddle.io.DataLoader( train_dataset, batch_size=self.batch_size, shuffle=True, num_workers=0 ) # 优化器 num_training_steps = len(train_loader) * self.num_epochs optimizer = AdamW( parameters=self.model.parameters(), learning_rate=self.learning_rate ) # 学习率调度 lr_scheduler = paddle.optimizer.lr.LinearWarmup( learning_rate=self.learning_rate, warmup_steps=self.warmup_steps, start_lr=5e-7, end_lr=self.learning_rate ) # 损失函数 criterion = paddle.nn.loss.CrossEntropyLoss() # 训练循环 global_step = 0 best_val_loss = float('inf') for epoch in range(self.num_epochs): print(f"\n=== Epoch {epoch + 1}/{self.num_epochs} ===") self.model.train() total_loss = 0 metric = Accuracy() for batch_idx, batch in enumerate(train_loader): # 前向传播 input_ids = batch['input_ids'] token_type_ids = batch['token_type_ids'] labels = batch['labels'].squeeze(1) logits = self.model(input_ids, token_type_ids=token_type_ids) loss = criterion(logits, labels) # 反向传播 loss.backward() optimizer.step() lr_scheduler.step() optimizer.clear_grad() # 统计 total_loss += float(loss.numpy()) correct = metric.compute(logits, labels) metric.update(correct) global_step += 1 # 打印进度 if global_step % 50 == 0: avg_loss = total_loss / (batch_idx + 1) acc = metric.accumulate() print(f"Step {global_step} - Loss: {avg_loss:.4f} - Acc: {acc:.4f}") # Epoch统计 avg_train_loss = total_loss / len(train_loader) train_acc = metric.accumulate() print(f"Epoch {epoch + 1} - Avg Loss: {avg_train_loss:.4f} - Train Acc: {train_acc:.4f}") # 验证 if val_dataset: val_loss, val_acc = self.evaluate(val_dataset) print(f"Val Loss: {val_loss:.4f} - Val Acc: {val_acc:.4f}") # 保存最佳模型 if val_loss < best_val_loss: best_val_loss = val_loss self.save_model(epoch, val_loss, val_acc) print(f"保存最佳模型 (Val Loss: {val_loss:.4f})") else: # 保存每个epoch的模型 self.save_model(epoch, avg_train_loss, train_acc) print("\n训练完成!") print(f"模型已保存至: {self.output_dir}") def evaluate(self, val_dataset: HRSentimentDataset) -> Tuple[float, float]: """评估模型""" self.model.eval() val_loader = paddle.io.DataLoader( val_dataset, batch_size=self.batch_size, shuffle=False ) criterion = paddle.nn.loss.CrossEntropyLoss() metric = Accuracy() total_loss = 0 with paddle.no_grad(): for batch in val_loader: input_ids = batch['input_ids'] token_type_ids = batch['token_type_ids'] labels = batch['labels'].squeeze(1) logits = self.model(input_ids, token_type_ids=token_type_ids) loss = criterion(logits, labels) total_loss += float(loss.numpy()) correct = metric.compute(logits, labels) metric.update(correct) avg_loss = total_loss / len(val_loader) acc = metric.accumulate() return avg_loss, acc def save_model(self, epoch: int, val_loss: float, val_acc: float): """保存模型""" save_path = self.output_dir / f"checkpoint-epoch-{epoch + 1}" save_path.mkdir(exist_ok=True) self.model.save_pretrained(str(save_path)) self.tokenizer.save_pretrained(str(save_path)) # 保存训练信息 info = { 'epoch': epoch + 1, 'val_loss': float(val_loss), 'val_acc': float(val_acc), 'model_name': self.model_name, 'label_map': self.id2label } with open(save_path / 'training_info.json', 'w', encoding='utf-8') as f: json.dump(info, f, ensure_ascii=False, indent=2) def predict(self, texts: List[str]) -> List[Dict]: """预测文本情绪""" self.model.eval() results = [] for text in texts: encoded = self.tokenizer( text, max_length=self.max_length, padding='max_length', truncation=True, return_tensors='pd' ) with paddle.no_grad(): logits = self.model( encoded['input_ids'], token_type_ids=encoded['token_type_ids'] ) probs = paddle.nn.functional.softmax(logits, axis=1).numpy()[0] pred_id = int(probs.argmax()) results.append({ 'text': text, 'label': self.id2label[pred_id], 'confidence': float(probs[pred_id]), 'probabilities': { self.id2label[i]: float(prob) for i, prob in enumerate(probs) } }) return results def main(): """主函数""" import argparse parser = argparse.ArgumentParser(description="PaddleNLP情绪分析微调") parser.add_argument("--model_name", type=str, default="utterless/electra-small-zh", help="预训练模型名称") parser.add_argument("--train_data", type=str, required=True, help="训练数据路径 (JSON格式)") parser.add_argument("--val_data", type=str, default=None, help="验证数据路径 (可选)") parser.add_argument("--output_dir", type=str, default="./models/sentiment-hr", help="输出目录") parser.add_argument("--max_length", type=int, default=128, help="最大序列长度") parser.add_argument("--batch_size", type=int, default=32, help="批次大小") parser.add_argument("--num_epochs", type=int, default=3, help="训练轮数") parser.add_argument("--learning_rate", type=float, default=2e-5, help="学习率") parser.add_argument("--warmup_steps", type=int, default=100, help="预热步数") args = parser.parse_args() # 创建训练器 trainer = HRSentimentTrainer( model_name=args.model_name, train_data_path=args.train_data, val_data_path=args.val_data, output_dir=args.output_dir, max_length=args.max_length, batch_size=args.batch_size, num_epochs=args.num_epochs, learning_rate=args.learning_rate, warmup_steps=args.warmup_steps ) # 训练 trainer.train() if __name__ == "__main__": main()