# NOTE: removed non-code residue from a HuggingFace Spaces page paste
# ("Spaces: / Sleeping / Sleeping") that made this file invalid Python.
"""
Sentiment analysis module.

Supports a fine-tuned PaddleNLP model, with a rule-based engine as fallback.
"""
# Import PaddleNLP first to avoid an import conflict with `datasets`.
try:
    import paddlenlp
except ImportError:
    pass

from pathlib import Path
from typing import Dict, List, Optional

import numpy as np

from config import EVALUATION_CONFIG
class SentimentAnalyzer:
    """Sentiment analyzer.

    Prefers a fine-tuned PaddleNLP model; falls back to the official
    pretrained Taskflow model, and finally to a rule-based engine.
    """

    def __init__(self, model_path: Optional[str] = None):
        """
        Initialize the analyzer.

        Args:
            model_path: Path to a fine-tuned model — a local directory or a
                HuggingFace Hub model id (e.g. ``KarenYYH/sentiment-hr``).
        """
        self.model_path = model_path
        self.use_model = False
        self.classifier = None
        # Try the fine-tuned model first when model_path is either an
        # existing local directory or looks like a Hub id (contains "/");
        # anything else falls back to the official pretrained model.
        if model_path and (Path(model_path).exists() or "/" in model_path):
            self._load_finetuned_model(model_path)
        else:
            self._load_official_model()
        # Emotion label mapping (identity map, used to normalize Taskflow
        # output labels in _predict_with_official).
        self.emotion_map = {
            "positive": "positive",
            "neutral": "neutral",
            "negative": "negative",
        }
| def _load_finetuned_model(self, model_path: str): | |
| """加载微调后的模型""" | |
| print(f"正在加载微调模型: {model_path}...") | |
| # 对于 HuggingFace Hub 模型,先下载到本地临时目录 | |
| # 避免 aistudio_sdk 的兼容性问题 | |
| if "/" in model_path: | |
| local_path = self._download_from_hf_hub(model_path) | |
| if local_path: | |
| self._load_paddlenlp_model(local_path) | |
| return | |
| # 如果下载失败,继续尝试其他方式 | |
| # 尝试直接加载(本地路径或直接从 Hub) | |
| self._load_paddlenlp_model(model_path) | |
| def _download_from_hf_hub(self, model_id: str) -> Optional[str]: | |
| """使用 huggingface_hub 下载 PaddleNLP 模型到本地临时目录""" | |
| import tempfile | |
| try: | |
| from huggingface_hub import snapshot_download | |
| print(f"从 HuggingFace Hub 下载模型: {model_id}") | |
| # 创建临时目录 | |
| temp_dir = tempfile.mkdtemp(prefix="hf_model_") | |
| # 下载所有文件 | |
| snapshot_download( | |
| repo_id=model_id, | |
| local_dir=temp_dir, | |
| local_dir_use_symlinks=False | |
| ) | |
| print(f"模型已下载到: {temp_dir}") | |
| return temp_dir | |
| except ImportError: | |
| print("huggingface_hub 未安装,尝试直接加载...") | |
| return None | |
| except Exception as e: | |
| print(f"HuggingFace Hub 下载失败: {e}") | |
| return None | |
| def _load_paddlenlp_model(self, model_path: str): | |
| """使用 PaddleNLP 加载模型""" | |
| # 先尝试禁用 aistudio_sdk,然后再导入 PaddleNLP | |
| import os | |
| os.environ['HUB_DISABLE_DOWNLOAD'] = '1' | |
| try: | |
| import paddle | |
| from paddle.nn import Layer | |
| from paddle.nn import Linear, Dropout | |
| print(f"使用 PaddleNLP 加载模型: {model_path}") | |
| # 检查必需文件 | |
| model_state_path = Path(model_path) / "model_state.pdparams" | |
| config_path = Path(model_path) / "config.json" | |
| vocab_path = Path(model_path) / "vocab.txt" | |
| tokenizer_config_path = Path(model_path) / "tokenizer_config.json" | |
| if model_state_path.exists() and config_path.exists() and vocab_path.exists(): | |
| print(f"从本地文件加载模型: {model_state_path}") | |
| # 读取配置 | |
| import json | |
| with open(config_path, 'r') as f: | |
| config = json.load(f) | |
| # 手动创建 tokenizer(完全避免 PaddleNLP 的自动初始化) | |
| # 读取词汇表 | |
| with open(vocab_path, 'r', encoding='utf-8') as f: | |
| vocab_list = f.readlines() | |
| # 创建词汇表字典 | |
| vocab = {line.strip(): idx for idx, line in enumerate(vocab_list)} | |
| # 读取 tokenizer 配置 | |
| tokenizer_config = {} | |
| if tokenizer_config_path.exists(): | |
| with open(tokenizer_config_path, 'r') as f: | |
| tokenizer_config = json.load(f) | |
| # 延迟导入 - 只在真正需要时才导入 | |
| # 使用 exec 来避免模块级导入 | |
| import sys | |
| import types | |
| # 创建一个假的模块来拦截 aistudio_sdk 调用 | |
| class FakeAistudioHub: | |
| def download(*args, **kwargs): | |
| return None | |
| # 注入假模块 | |
| sys.modules['aistudio_sdk'] = types.ModuleType('aistudio_sdk', ()) | |
| sys.modules['aistudio_sdk.hub'] = types.ModuleType('aistudio_sdk.hub', ()) | |
| sys.modules['aistudio_sdk.hub'].download = FakeAistudioHub.download | |
| # 现在可以安全地导入 BertTokenizer | |
| from paddlenlp.transformers import BertTokenizer as BT | |
| self.tokenizer = BT( | |
| vocab, | |
| do_lower_case=tokenizer_config.get('do_lower_case', False), | |
| do_basic_tokenize=tokenizer_config.get('do_basic_tokenize', True), | |
| never_split=tokenizer_config.get('never_split', None) | |
| ) | |
| # 直接加载状态字典 | |
| state_dict = paddle.load(str(model_state_path)) | |
| # 创建一个自定义的 BERT 分类模型容器 | |
| class BertClassificationModel(Layer): | |
| def __init__(self, config_dict, num_labels=3): | |
| super().__init__() | |
| # 延迟导入 BertModel | |
| from paddlenlp.transformers import BertConfig, BertModel | |
| bert_config = BertConfig( | |
| vocab_size=config_dict.get('vocab_size', 21128), | |
| hidden_size=config_dict.get('hidden_size', 768), | |
| num_hidden_layers=config_dict.get('num_hidden_layers', 12), | |
| num_attention_heads=config_dict.get('num_attention_heads', 12), | |
| intermediate_size=config_dict.get('intermediate_size', 3072), | |
| hidden_act=config_dict.get('hidden_act', 'gelu'), | |
| hidden_dropout_prob=config_dict.get('hidden_dropout_prob', 0.1), | |
| attention_probs_dropout_prob=config_dict.get('attention_probs_dropout_prob', 0.1), | |
| max_position_embeddings=config_dict.get('max_position_embeddings', 512), | |
| type_vocab_size=config_dict.get('type_vocab_size', 2), | |
| initializer_range=config_dict.get('initializer_range', 0.02), | |
| pad_token_id=config_dict.get('pad_token_id', 0), | |
| ) | |
| # 直接创建 BertModel 实例 | |
| self.bert = BertModel(bert_config) | |
| self.dropout = Dropout(p=config_dict.get('hidden_dropout_prob', 0.1)) | |
| self.classifier = Linear( | |
| config_dict.get('hidden_size', 768), | |
| num_labels | |
| ) | |
| def forward(self, input_ids, token_type_ids=None, position_ids=None, attention_mask=None): | |
| outputs = self.bert( | |
| input_ids, | |
| token_type_ids=token_type_ids, | |
| position_ids=position_ids, | |
| attention_mask=attention_mask | |
| ) | |
| pooled_output = outputs[1] | |
| pooled_output = self.dropout(pooled_output) | |
| logits = self.classifier(pooled_output) | |
| return logits | |
| # 创建模型实例 | |
| self.model = BertClassificationModel(config, num_labels=3) | |
| # 加载权重 | |
| self.model.set_state_dict(state_dict) | |
| self.model.eval() | |
| else: | |
| # 尝试标准加载方式 | |
| from paddlenlp.transformers import AutoTokenizer, AutoModelForSequenceClassification | |
| self.tokenizer = AutoTokenizer.from_pretrained(model_path) | |
| self.model = AutoModelForSequenceClassification.from_pretrained(model_path) | |
| self.model.eval() | |
| self.model_type = "paddlenlp" | |
| # 加载标签映射 | |
| self.id2label = self._load_label_mapping(model_path) | |
| self.use_model = True | |
| self.is_finetuned = True | |
| print(f"微调模型加载完成 (使用 PaddleNLP, 标签映射: {self.id2label})") | |
| except Exception as e: | |
| print(f"警告: PaddleNLP 模型加载失败: {e}") | |
| print("尝试加载官方预训练模型...") | |
| self._load_official_model() | |
| def _load_label_mapping(self, model_path: str) -> dict: | |
| """从模型配置加载标签映射""" | |
| import json | |
| # 首先尝试从 training_info.json 加载 | |
| if "/" not in model_path: # 本地路径 | |
| config_path = Path(model_path) / 'training_info.json' | |
| else: | |
| # HuggingFace Hub 模型,尝试从 config 加载 | |
| try: | |
| from transformers import AutoConfig | |
| config = AutoConfig.from_pretrained(model_path) | |
| if hasattr(config, 'id2label'): | |
| # 将 {0: 'LABEL_0', 1: 'LABEL_1', ...} 映射到情绪标签 | |
| # 根据训练时使用的标签顺序 | |
| label_values = list(config.id2label.values()) | |
| if len(label_values) == 3: | |
| # 假设顺序是 positive, neutral, negative | |
| return {0: 'positive', 1: 'neutral', 2: 'negative'} | |
| else: | |
| return {0: 'positive', 1: 'neutral', 2: 'negative'} | |
| except: | |
| pass | |
| # 对于 HuggingFace Hub 模型,无法直接读取 training_info.json | |
| # 使用默认映射 | |
| return {0: 'positive', 1: 'neutral', 2: 'negative'} | |
| if config_path.exists(): | |
| with open(config_path, 'r', encoding='utf-8') as f: | |
| config = json.load(f) | |
| return config.get('label_map', {0: 'positive', 1: 'neutral', 2: 'negative'}) | |
| return {0: 'positive', 1: 'neutral', 2: 'negative'} | |
| def _load_official_model(self): | |
| """加载官方预训练模型""" | |
| print("正在加载PaddleNLP官方情感分类模型...") | |
| try: | |
| from paddlenlp import Taskflow | |
| # 尝试使用正确的任务名称 | |
| try: | |
| self.classifier = Taskflow("sentiment_analysis") | |
| except AssertionError: | |
| # 如果sentiment_analysis不可用,尝试其他名称 | |
| self.classifier = Taskflow("emotion") | |
| self.use_model = True | |
| self.is_finetuned = False | |
| print("PaddleNLP官方模型加载完成") | |
| except Exception as e: | |
| print(f"警告: PaddleNLP加载失败: {e}") | |
| print("使用基于规则的情绪分析(备用方案)") | |
| self.use_model = False | |
| def analyze_turn( | |
| self, | |
| utterance: str, | |
| context: Optional[Dict] = None | |
| ) -> Dict: | |
| """ | |
| 分析单轮对话的情感 | |
| Args: | |
| utterance: 对话语句 | |
| context: 上下文 | |
| Returns: | |
| 情感分析结果 | |
| """ | |
| if self.use_model: | |
| return self._analyze_with_model(utterance) | |
| else: | |
| return self._analyze_with_rules(utterance) | |
| def _analyze_with_model(self, utterance: str) -> Dict: | |
| """使用PaddleNLP模型分析""" | |
| try: | |
| # 判断是微调模型还是官方模型 | |
| if getattr(self, 'is_finetuned', False): | |
| return self._predict_with_finetuned(utterance) | |
| else: | |
| return self._predict_with_official(utterance) | |
| except Exception as e: | |
| print(f"PaddleNLP分析失败: {e},使用备用方案") | |
| return self._analyze_with_rules(utterance) | |
| def _predict_with_finetuned(self, utterance: str) -> Dict: | |
| """使用微调模型预测""" | |
| import paddle | |
| # 分词 | |
| encoded = self.tokenizer( | |
| utterance, | |
| max_length=128, | |
| padding='max_length', | |
| truncation=True, | |
| return_tensors='pd' | |
| ) | |
| # 预测 | |
| with paddle.no_grad(): | |
| logits = self.model( | |
| encoded['input_ids'], | |
| token_type_ids=encoded['token_type_ids'] | |
| ) | |
| # 计算概率 | |
| probs = paddle.nn.functional.softmax(logits, axis=1).numpy()[0] | |
| pred_id = int(probs.argmax()) | |
| return { | |
| "emotion": self.id2label[pred_id], | |
| "confidence": float(probs[pred_id]), | |
| "probabilities": {self.id2label[i]: float(prob) for i, prob in enumerate(probs)}, | |
| "method": "finetuned_paddlenlp" | |
| } | |
| def _predict_with_official(self, utterance: str) -> Dict: | |
| """使用官方模型预测""" | |
| # PaddleNLP Taskflow返回结果 | |
| result = self.classifier(utterance) | |
| if isinstance(result, list) and len(result) > 0: | |
| top_result = result[0] | |
| # 提取标签和置信度 | |
| label = top_result.get('label', 'neutral') | |
| confidence = top_result.get('score', 0.5) | |
| # 映射标签 | |
| mapped_label = self.emotion_map.get(label, "neutral") | |
| return { | |
| "emotion": mapped_label, | |
| "confidence": float(confidence), | |
| "raw_label": label, | |
| "method": "official_paddlenlp" | |
| } | |
| # 如果官方模型返回格式不符合预期,降级到规则引擎 | |
| return self._analyze_with_rules(utterance) | |
| def _analyze_with_rules(self, utterance: str) -> Dict: | |
| """使用基于规则的分析(备用方案)""" | |
| # 积极词汇 | |
| positive_words = [ | |
| "好", "满意", "喜欢", "谢谢", "感谢", "可以", "行", | |
| "yes", "ok", "好的", "没问题", "支持" | |
| ] | |
| # 消极词汇 | |
| negative_words = [ | |
| "不", "没", "不行", "不好", "讨厌", "烦", "生气", "愤怒", | |
| "不满", "投诉", "举报", "错误", "失败", "no", "not" | |
| ] | |
| # 检测积极词 | |
| positive_count = sum(1 for word in positive_words if word in utterance.lower()) | |
| # 检测消极词 | |
| negative_count = sum(1 for word in negative_words if word in utterance.lower()) | |
| # 判断情绪 | |
| if positive_count > negative_count: | |
| label = "positive" | |
| confidence = min(0.7, 0.5 + positive_count * 0.1) | |
| elif negative_count > positive_count: | |
| label = "negative" | |
| confidence = min(0.7, 0.5 + negative_count * 0.1) | |
| else: | |
| label = "neutral" | |
| confidence = 0.5 | |
| return { | |
| "emotion": label, | |
| "confidence": float(confidence), | |
| "method": "rule_based", | |
| "positive_words": positive_count, | |
| "negative_words": negative_count | |
| } | |
| def analyze_dialogue( | |
| self, | |
| dialogue: List[Dict], | |
| speaker_filter: Optional[str] = "Employee" | |
| ) -> Dict: | |
| """ | |
| 分析整段对话的情绪时间线 | |
| Args: | |
| dialogue: 对话列表 | |
| speaker_filter: 只分析特定说话人(默认Employee) | |
| Returns: | |
| 情绪分析结果 | |
| """ | |
| emotions = [] | |
| for turn in dialogue: | |
| # 过滤说话人 | |
| if speaker_filter and turn.get("speaker") != speaker_filter: | |
| continue | |
| if "utterance" in turn: | |
| result = self.analyze_turn( | |
| turn["utterance"], | |
| {"turn_id": turn.get("turn_id")} | |
| ) | |
| result["turn_id"] = turn.get("turn_id") | |
| emotions.append(result) | |
| if not emotions: | |
| return self._get_empty_result() | |
| # 计算统计 | |
| total = len(emotions) | |
| positive_count = sum(1 for e in emotions if e["emotion"] == "positive") | |
| neutral_count = sum(1 for e in emotions if e["emotion"] == "neutral") | |
| negative_count = sum(1 for e in emotions if e["emotion"] == "negative") | |
| # 计算趋势 | |
| trend = self._calculate_trend(emotions) | |
| # 计算得分 (0-100) | |
| # 积极=100分,中性=50分,消极=0分 | |
| score = ( | |
| positive_count * 100 + | |
| neutral_count * 50 | |
| ) / total if total > 0 else 0.0 | |
| # 判断整体情绪 | |
| if positive_count / total >= 0.6: | |
| overall = "positive" | |
| elif negative_count / total >= 0.3: | |
| overall = "negative" | |
| else: | |
| overall = "neutral" | |
| return { | |
| "score": round(score, 2), | |
| "overall": overall, | |
| "positive_ratio": round(positive_count / total, 4), | |
| "neutral_ratio": round(neutral_count / total, 4), | |
| "negative_ratio": round(negative_count / total, 4), | |
| "positive_count": positive_count, | |
| "neutral_count": neutral_count, | |
| "negative_count": negative_count, | |
| "total_turns": total, | |
| "timeline": emotions, | |
| "trend": trend | |
| } | |
| def _calculate_trend(self, emotions: List[Dict]) -> str: | |
| """计算情绪趋势""" | |
| if len(emotions) < 2: | |
| return "stable" | |
| # 情绪到数值的映射 | |
| emotion_score = {"positive": 1, "neutral": 0, "negative": -1} | |
| mid = len(emotions) // 2 | |
| # 前半段平均 | |
| first_half_avg = sum( | |
| emotion_score.get(e["emotion"], 0) for e in emotions[:mid] | |
| ) / mid | |
| # 后半段平均 | |
| second_half_avg = sum( | |
| emotion_score.get(e["emotion"], 0) for e in emotions[mid:] | |
| ) / (len(emotions) - mid) | |
| diff = second_half_avg - first_half_avg | |
| if diff > 0.3: | |
| return "improving" | |
| elif diff < -0.3: | |
| return "declining" | |
| else: | |
| return "stable" | |
| def _get_empty_result(self) -> Dict: | |
| """返回空结果""" | |
| return { | |
| "score": 0.0, | |
| "overall": "neutral", | |
| "positive_ratio": 0.0, | |
| "neutral_ratio": 0.0, | |
| "negative_ratio": 0.0, | |
| "positive_count": 0, | |
| "neutral_count": 0, | |
| "negative_count": 0, | |
| "total_turns": 0, | |
| "timeline": [], | |
| "trend": "stable" | |
| } | |
# Manual smoke test
if __name__ == "__main__":
    analyzer = SentimentAnalyzer()
    # Single-turn analysis
    test_cases = [
        "好的,谢谢你的帮助",
        "我很不满意这个回答",
        "请问还有什么可以帮到您?"
    ]
    print("单轮情绪分析测试:")
    for test in test_cases:
        result = analyzer.analyze_turn(test)
        print(f"\n测试: {test}")
        # FIX: analyze_turn returns the label under the "emotion" key;
        # there is no "label" key, so the old code raised KeyError here.
        print(f" 情绪: {result['emotion']}")
        print(f" 置信度: {result['confidence']:.2f}")
        print(f" 方法: {result['method']}")
    # Dialogue-level analysis
    test_dialogue = [
        {"turn_id": 1, "speaker": "Employee", "utterance": "你好"},
        {"turn_id": 2, "speaker": "Employee", "utterance": "谢谢,非常满意"},
        {"turn_id": 3, "speaker": "Employee", "utterance": "我有点不满"},
    ]
    print("\n\n对话情绪分析测试:")
    result = analyzer.analyze_dialogue(test_dialogue)
    print(f"整体情绪: {result['overall']}")
    print(f"积极比例: {result['positive_ratio']:.2%}")
    print(f"中性比例: {result['neutral_ratio']:.2%}")
    print(f"消极比例: {result['negative_ratio']:.2%}")
    print(f"趋势: {result['trend']}")