# hr-eval-api-v2 / models/sentiment.py
# Author: KarenYYH
# Fix: Monkey-patch aistudio_sdk.hub.download before importing PaddleNLP
# Commit: 26bc5b6
"""
情绪分析模块
支持微调后的PaddleNLP模型和规则引擎备用方案
"""
# 先导入 PaddleNLP 以避免与 datasets 的导入冲突
try:
import paddlenlp
except ImportError:
pass
from typing import Dict, List, Optional
from pathlib import Path
import numpy as np
from config import EVALUATION_CONFIG
class SentimentAnalyzer:
    """Sentiment analyzer.

    Loading priority: a fine-tuned PaddleNLP model (local directory or
    HuggingFace Hub id), then the official PaddleNLP Taskflow model, and
    finally a rule-based keyword engine as the last-resort fallback.
    """

    def __init__(self, model_path: Optional[str] = None):
        """Initialize the analyzer.

        Args:
            model_path: Fine-tuned model location — a local directory or a
                HuggingFace Hub model id (e.g. ``KarenYYH/sentiment-hr``).
                ``None`` loads the official pretrained model.
        """
        self.model_path = model_path
        self.use_model = False
        self.classifier = None
        # Prefer the fine-tuned model; model_path may be a local path or a
        # HuggingFace Hub model id.
        if model_path:
            if Path(model_path).exists():
                # Local directory with model files.
                self._load_finetuned_model(model_path)
            elif "/" in model_path:
                # Names containing "/" are treated as Hub model ids.
                self._load_finetuned_model(model_path)
            else:
                self._load_official_model()
        else:
            self._load_official_model()
        # Raw model label -> canonical emotion name.
        self.emotion_map = {
            "positive": "positive",
            "neutral": "neutral",
            "negative": "negative"
        }

    def _load_finetuned_model(self, model_path: str):
        """Load a fine-tuned model from a local path or the HF Hub."""
        print(f"正在加载微调模型: {model_path}...")
        # For Hub models, download to a local temp dir first to sidestep
        # aistudio_sdk compatibility problems inside PaddleNLP.
        if "/" in model_path:
            local_path = self._download_from_hf_hub(model_path)
            if local_path:
                self._load_paddlenlp_model(local_path)
                return
            # Download failed — fall through and try loading directly.
        # Direct load (local path, or straight from the Hub).
        self._load_paddlenlp_model(model_path)

    def _download_from_hf_hub(self, model_id: str) -> Optional[str]:
        """Download a model snapshot from the HF Hub into a temp directory.

        Returns:
            The local directory path on success, ``None`` on any failure
            (missing ``huggingface_hub`` or a download error).
        """
        import tempfile
        try:
            from huggingface_hub import snapshot_download
            print(f"从 HuggingFace Hub 下载模型: {model_id}")
            temp_dir = tempfile.mkdtemp(prefix="hf_model_")
            # Fetch all repo files; real copies, not symlinks.
            snapshot_download(
                repo_id=model_id,
                local_dir=temp_dir,
                local_dir_use_symlinks=False
            )
            print(f"模型已下载到: {temp_dir}")
            return temp_dir
        except ImportError:
            print("huggingface_hub 未安装,尝试直接加载...")
            return None
        except Exception as e:
            print(f"HuggingFace Hub 下载失败: {e}")
            return None

    def _load_paddlenlp_model(self, model_path: str):
        """Load the model with PaddleNLP while bypassing aistudio_sdk."""
        # Disable hub downloads before any PaddleNLP import runs.
        import os
        os.environ['HUB_DISABLE_DOWNLOAD'] = '1'
        try:
            import paddle
            from paddle.nn import Layer
            from paddle.nn import Linear, Dropout

            print(f"使用 PaddleNLP 加载模型: {model_path}")
            # Files required for the manual (no-auto-init) loading path.
            model_state_path = Path(model_path) / "model_state.pdparams"
            config_path = Path(model_path) / "config.json"
            vocab_path = Path(model_path) / "vocab.txt"
            tokenizer_config_path = Path(model_path) / "tokenizer_config.json"

            if model_state_path.exists() and config_path.exists() and vocab_path.exists():
                print(f"从本地文件加载模型: {model_state_path}")
                import json
                with open(config_path, 'r') as f:
                    config = json.load(f)
                # Build the tokenizer by hand to avoid PaddleNLP's automatic
                # initialization (which goes through aistudio_sdk).
                with open(vocab_path, 'r', encoding='utf-8') as f:
                    vocab_list = f.readlines()
                # token -> id, in file order.
                vocab = {line.strip(): idx for idx, line in enumerate(vocab_list)}
                tokenizer_config = {}
                if tokenizer_config_path.exists():
                    with open(tokenizer_config_path, 'r') as f:
                        tokenizer_config = json.load(f)

                import sys
                import types

                # Fake module that neutralizes aistudio_sdk download calls.
                class FakeAistudioHub:
                    @staticmethod
                    def download(*args, **kwargs):
                        return None

                # Inject the fakes before importing anything from paddlenlp.
                sys.modules['aistudio_sdk'] = types.ModuleType('aistudio_sdk', ())
                sys.modules['aistudio_sdk.hub'] = types.ModuleType('aistudio_sdk.hub', ())
                sys.modules['aistudio_sdk.hub'].download = FakeAistudioHub.download

                # Now it is safe to import BertTokenizer.
                from paddlenlp.transformers import BertTokenizer as BT
                self.tokenizer = BT(
                    vocab,
                    do_lower_case=tokenizer_config.get('do_lower_case', False),
                    do_basic_tokenize=tokenizer_config.get('do_basic_tokenize', True),
                    never_split=tokenizer_config.get('never_split', None)
                )
                # Load the raw parameter dict directly.
                state_dict = paddle.load(str(model_state_path))

                # Minimal BERT + linear-head classification container.
                class BertClassificationModel(Layer):
                    def __init__(self, config_dict, num_labels=3):
                        super().__init__()
                        # Deferred import — after the aistudio_sdk fakes.
                        from paddlenlp.transformers import BertConfig, BertModel
                        bert_config = BertConfig(
                            vocab_size=config_dict.get('vocab_size', 21128),
                            hidden_size=config_dict.get('hidden_size', 768),
                            num_hidden_layers=config_dict.get('num_hidden_layers', 12),
                            num_attention_heads=config_dict.get('num_attention_heads', 12),
                            intermediate_size=config_dict.get('intermediate_size', 3072),
                            hidden_act=config_dict.get('hidden_act', 'gelu'),
                            hidden_dropout_prob=config_dict.get('hidden_dropout_prob', 0.1),
                            attention_probs_dropout_prob=config_dict.get('attention_probs_dropout_prob', 0.1),
                            max_position_embeddings=config_dict.get('max_position_embeddings', 512),
                            type_vocab_size=config_dict.get('type_vocab_size', 2),
                            initializer_range=config_dict.get('initializer_range', 0.02),
                            pad_token_id=config_dict.get('pad_token_id', 0),
                        )
                        self.bert = BertModel(bert_config)
                        self.dropout = Dropout(p=config_dict.get('hidden_dropout_prob', 0.1))
                        self.classifier = Linear(
                            config_dict.get('hidden_size', 768),
                            num_labels
                        )

                    def forward(self, input_ids, token_type_ids=None, position_ids=None, attention_mask=None):
                        outputs = self.bert(
                            input_ids,
                            token_type_ids=token_type_ids,
                            position_ids=position_ids,
                            attention_mask=attention_mask
                        )
                        # outputs[1] is the pooled [CLS] representation.
                        pooled_output = outputs[1]
                        pooled_output = self.dropout(pooled_output)
                        logits = self.classifier(pooled_output)
                        return logits

                self.model = BertClassificationModel(config, num_labels=3)
                self.model.set_state_dict(state_dict)
                self.model.eval()
            else:
                # Standard auto-loading path.
                from paddlenlp.transformers import AutoTokenizer, AutoModelForSequenceClassification
                self.tokenizer = AutoTokenizer.from_pretrained(model_path)
                self.model = AutoModelForSequenceClassification.from_pretrained(model_path)
                self.model.eval()

            self.model_type = "paddlenlp"
            self.id2label = self._load_label_mapping(model_path)
            self.use_model = True
            self.is_finetuned = True
            print(f"微调模型加载完成 (使用 PaddleNLP, 标签映射: {self.id2label})")
        except Exception as e:
            print(f"警告: PaddleNLP 模型加载失败: {e}")
            print("尝试加载官方预训练模型...")
            self._load_official_model()

    def _load_label_mapping(self, model_path: str) -> dict:
        """Return the id -> emotion-label mapping for the loaded model.

        Prefers the ``label_map`` recorded in ``training_info.json`` next to
        the model files; otherwise falls back to the label order used during
        training.
        """
        import json
        # Label order used at training time.
        default_map = {0: 'positive', 1: 'neutral', 2: 'negative'}
        # BUG FIX: distinguish local dirs by existence rather than by the
        # presence of "/" — local paths (and the temp dir produced by a Hub
        # download) routinely contain "/" and were wrongly treated as Hub
        # ids, discarding the saved label map.
        if Path(model_path).exists():
            config_path = Path(model_path) / 'training_info.json'
            if config_path.exists():
                with open(config_path, 'r', encoding='utf-8') as f:
                    info = json.load(f)
                label_map = info.get('label_map', default_map)
                # BUG FIX: json.load yields string keys ("0", "1", ...) but
                # prediction indexes this mapping with int class ids.
                return {int(k): v for k, v in label_map.items()}
            return dict(default_map)
        # Hub model ids: training_info.json cannot be read remotely, and the
        # HF config's id2label holds only generic LABEL_n names, so use the
        # training label order.
        return dict(default_map)

    def _load_official_model(self):
        """Load the official pretrained PaddleNLP Taskflow model."""
        print("正在加载PaddleNLP官方情感分类模型...")
        try:
            from paddlenlp import Taskflow
            try:
                self.classifier = Taskflow("sentiment_analysis")
            except AssertionError:
                # Some PaddleNLP versions expose the task under "emotion".
                self.classifier = Taskflow("emotion")
            self.use_model = True
            self.is_finetuned = False
            print("PaddleNLP官方模型加载完成")
        except Exception as e:
            print(f"警告: PaddleNLP加载失败: {e}")
            print("使用基于规则的情绪分析(备用方案)")
            self.use_model = False

    def analyze_turn(
        self,
        utterance: str,
        context: Optional[Dict] = None
    ) -> Dict:
        """Analyze the sentiment of a single dialogue turn.

        Args:
            utterance: The turn's text.
            context: Optional per-turn context (currently unused by the
                model paths).

        Returns:
            A result dict with at least ``emotion``, ``confidence`` and
            ``method`` keys.
        """
        if self.use_model:
            return self._analyze_with_model(utterance)
        else:
            return self._analyze_with_rules(utterance)

    def _analyze_with_model(self, utterance: str) -> Dict:
        """Analyze with PaddleNLP, degrading to rules on any failure."""
        try:
            # Fine-tuned and official models use different predict paths.
            if getattr(self, 'is_finetuned', False):
                return self._predict_with_finetuned(utterance)
            else:
                return self._predict_with_official(utterance)
        except Exception as e:
            print(f"PaddleNLP分析失败: {e},使用备用方案")
            return self._analyze_with_rules(utterance)

    def _predict_with_finetuned(self, utterance: str) -> Dict:
        """Predict with the fine-tuned model (softmax over 3 classes)."""
        import paddle
        encoded = self.tokenizer(
            utterance,
            max_length=128,
            padding='max_length',
            truncation=True,
            return_tensors='pd'
        )
        with paddle.no_grad():
            logits = self.model(
                encoded['input_ids'],
                token_type_ids=encoded['token_type_ids']
            )
        probs = paddle.nn.functional.softmax(logits, axis=1).numpy()[0]
        pred_id = int(probs.argmax())
        return {
            "emotion": self.id2label[pred_id],
            "confidence": float(probs[pred_id]),
            "probabilities": {self.id2label[i]: float(prob) for i, prob in enumerate(probs)},
            "method": "finetuned_paddlenlp"
        }

    def _predict_with_official(self, utterance: str) -> Dict:
        """Predict with the official Taskflow model."""
        result = self.classifier(utterance)
        if isinstance(result, list) and len(result) > 0:
            top_result = result[0]
            label = top_result.get('label', 'neutral')
            confidence = top_result.get('score', 0.5)
            mapped_label = self.emotion_map.get(label, "neutral")
            return {
                "emotion": mapped_label,
                "confidence": float(confidence),
                "raw_label": label,
                "method": "official_paddlenlp"
            }
        # Unexpected Taskflow output shape — degrade to the rule engine.
        return self._analyze_with_rules(utterance)

    def _analyze_with_rules(self, utterance: str) -> Dict:
        """Keyword-based analysis (last-resort fallback)."""
        positive_words = [
            "好", "满意", "喜欢", "谢谢", "感谢", "可以", "行",
            "yes", "ok", "好的", "没问题", "支持"
        ]
        negative_words = [
            "不", "没", "不行", "不好", "讨厌", "烦", "生气", "愤怒",
            "不满", "投诉", "举报", "错误", "失败", "no", "not"
        ]
        # Lowercase once; substring matching on the normalized text.
        text = utterance.lower()
        positive_count = sum(1 for word in positive_words if word in text)
        negative_count = sum(1 for word in negative_words if word in text)
        # Majority of keyword hits wins; ties are neutral.
        if positive_count > negative_count:
            label = "positive"
            confidence = min(0.7, 0.5 + positive_count * 0.1)
        elif negative_count > positive_count:
            label = "negative"
            confidence = min(0.7, 0.5 + negative_count * 0.1)
        else:
            label = "neutral"
            confidence = 0.5
        return {
            "emotion": label,
            "confidence": float(confidence),
            "method": "rule_based",
            "positive_words": positive_count,
            "negative_words": negative_count
        }

    def analyze_dialogue(
        self,
        dialogue: List[Dict],
        speaker_filter: Optional[str] = "Employee"
    ) -> Dict:
        """Analyze the emotion timeline of a whole dialogue.

        Args:
            dialogue: List of turn dicts (``speaker``, ``utterance``,
                ``turn_id``).
            speaker_filter: Only analyze turns by this speaker (default
                ``"Employee"``; pass ``None`` to analyze everyone).

        Returns:
            Aggregate statistics, a per-turn ``timeline`` and a ``trend``.
        """
        emotions = []
        for turn in dialogue:
            if speaker_filter and turn.get("speaker") != speaker_filter:
                continue
            if "utterance" in turn:
                result = self.analyze_turn(
                    turn["utterance"],
                    {"turn_id": turn.get("turn_id")}
                )
                result["turn_id"] = turn.get("turn_id")
                emotions.append(result)
        if not emotions:
            return self._get_empty_result()
        total = len(emotions)
        positive_count = sum(1 for e in emotions if e["emotion"] == "positive")
        neutral_count = sum(1 for e in emotions if e["emotion"] == "neutral")
        negative_count = sum(1 for e in emotions if e["emotion"] == "negative")
        trend = self._calculate_trend(emotions)
        # Score on a 0-100 scale: positive=100, neutral=50, negative=0.
        score = (
            positive_count * 100 +
            neutral_count * 50
        ) / total if total > 0 else 0.0
        # Overall verdict: mostly positive, notably negative, else neutral.
        if positive_count / total >= 0.6:
            overall = "positive"
        elif negative_count / total >= 0.3:
            overall = "negative"
        else:
            overall = "neutral"
        return {
            "score": round(score, 2),
            "overall": overall,
            "positive_ratio": round(positive_count / total, 4),
            "neutral_ratio": round(neutral_count / total, 4),
            "negative_ratio": round(negative_count / total, 4),
            "positive_count": positive_count,
            "neutral_count": neutral_count,
            "negative_count": negative_count,
            "total_turns": total,
            "timeline": emotions,
            "trend": trend
        }

    def _calculate_trend(self, emotions: List[Dict]) -> str:
        """Compare first/second half averages to classify the trend."""
        if len(emotions) < 2:
            return "stable"
        # Emotion -> numeric score for averaging.
        emotion_score = {"positive": 1, "neutral": 0, "negative": -1}
        mid = len(emotions) // 2
        first_half_avg = sum(
            emotion_score.get(e["emotion"], 0) for e in emotions[:mid]
        ) / mid
        second_half_avg = sum(
            emotion_score.get(e["emotion"], 0) for e in emotions[mid:]
        ) / (len(emotions) - mid)
        diff = second_half_avg - first_half_avg
        # 0.3 is the dead zone around "no change".
        if diff > 0.3:
            return "improving"
        elif diff < -0.3:
            return "declining"
        else:
            return "stable"

    def _get_empty_result(self) -> Dict:
        """Neutral zero-filled result for dialogues with no analyzable turns."""
        return {
            "score": 0.0,
            "overall": "neutral",
            "positive_ratio": 0.0,
            "neutral_ratio": 0.0,
            "negative_ratio": 0.0,
            "positive_count": 0,
            "neutral_count": 0,
            "negative_count": 0,
            "total_turns": 0,
            "timeline": [],
            "trend": "stable"
        }
# Manual smoke test
if __name__ == "__main__":
    analyzer = SentimentAnalyzer()

    # Single-turn analysis
    test_cases = [
        "好的,谢谢你的帮助",
        "我很不满意这个回答",
        "请问还有什么可以帮到您?"
    ]
    print("单轮情绪分析测试:")
    for test in test_cases:
        result = analyzer.analyze_turn(test)
        print(f"\n测试: {test}")
        # BUG FIX: analyze_turn returns the emotion under the "emotion"
        # key — result['label'] raised KeyError.
        print(f" 情绪: {result['emotion']}")
        print(f" 置信度: {result['confidence']:.2f}")
        print(f" 方法: {result['method']}")

    # Whole-dialogue analysis
    test_dialogue = [
        {"turn_id": 1, "speaker": "Employee", "utterance": "你好"},
        {"turn_id": 2, "speaker": "Employee", "utterance": "谢谢,非常满意"},
        {"turn_id": 3, "speaker": "Employee", "utterance": "我有点不满"},
    ]
    print("\n\n对话情绪分析测试:")
    result = analyzer.analyze_dialogue(test_dialogue)
    print(f"整体情绪: {result['overall']}")
    print(f"积极比例: {result['positive_ratio']:.2%}")
    print(f"中性比例: {result['neutral_ratio']:.2%}")
    print(f"消极比例: {result['negative_ratio']:.2%}")
    print(f"趋势: {result['trend']}")