""" 智能判断分析层 分析员工问题,生成判断报告和回复指令 """ import json import re from typing import Dict, List, Optional, Tuple from models.correctness import CorrectnessEvaluator from models.compliance import ComplianceChecker from models.sentiment import SentimentAnalyzer from config import MODEL_CONFIG import numpy as np class IntelligenceAnalyzer: """ 智能分析器 - 第一层 分析员工问题,生成判断报告和回复指令 """ # HR场景定义 HR_SCENARIOS = { "training_application": { "name": "培训申请", "description": "员工申请参加培训课程", "required_info": ["training_type", "participant_count", "budget", "duration"], "workflow": [ "确认培训类型", "收集参与人数", "询问预算范围", "确认培训时长", "了解特殊要求" ], "policy_notes": "培训需符合年度培训计划,预算需在部门预算范围内" }, "leave_application": { "name": "请假申请", "description": "员工申请各类假期", "required_info": ["leave_type", "start_date", "end_date", "reason"], "workflow": [ "确认请假类型", "确认请假时间", "询问请假原因", "提醒交接工作" ], "policy_notes": "年假需提前3天申请,病假需提供证明" }, "salary_inquiry": { "name": "薪资咨询", "description": "员工咨询薪资相关问题", "required_info": [], "workflow": [ "了解具体咨询内容", "解释相关政策", "提供计算方式" ], "policy_notes": "薪资属于隐私,只能查询个人薪资信息" }, "complaint": { "name": "投诉/不满", "description": "员工表达不满或投诉", "required_info": ["issue_description", "affected_parties"], "workflow": [ "表达歉意和理解", "了解具体情况", "记录问题", "承诺处理时限" ], "policy_notes": "需要耐心倾听,记录详细信息,及时反馈" }, "resignation": { "name": "离职申请", "description": "员工提出离职", "required_info": ["last_working_day", "reason"], "workflow": [ "确认离职意向", "了解离职原因", "说明离职流程", "安排工作交接" ], "policy_notes": "正式员工需提前30天通知,试用期需提前3天" }, "resignation_inquiry": { "name": "离职咨询", "description": "员工咨询离职相关政策", "required_info": [], "workflow": [ "理解咨询内容", "解释离职政策", "提供相关信息" ], "policy_notes": "离职补偿、离职流程等政策咨询" }, "policy_inquiry": { "name": "政策咨询", "description": "员工咨询公司政策或劳动法规", "required_info": ["policy_topic"], "workflow": [ "理解咨询内容", "提供相关政策", "解释具体条款" ], "policy_notes": "确保信息准确,不确定时需查阅后回复" }, # 新增场景 "reimbursement": { "name": "报销申请", "description": "员工申请费用报销", "required_info": ["expense_type", "amount", "description"], "workflow": [ "确认报销类型", 
"核实报销金额", "了解费用详情", "说明报销流程" ], "policy_notes": "报销需在发生费用后30日内申请,需提供发票" }, "business_trip": { "name": "出差申请", "description": "员工申请出差", "required_info": ["destination", "duration", "purpose"], "workflow": [ "确认出差地点", "确认出差时间", "了解出差目的", "说明审批流程" ], "policy_notes": "出差需提前申请,部门经理审批" }, "overtime": { "name": "加班申请", "description": "员工申请加班", "required_info": ["overtime_date", "duration", "reason"], "workflow": [ "确认加班日期", "确认加班时长", "了解加班原因", "说明审批流程" ], "policy_notes": "加班需提前申请,加班费按公司规定计算" }, "promotion": { "name": "晋升咨询", "description": "员工咨询晋升相关问题", "required_info": [], "workflow": [ "了解咨询内容", "解释晋升政策", "提供发展建议" ], "policy_notes": "晋升每年评审一次,需满足任职年限和绩效要求" }, "transfer": { "name": "转岗申请", "description": "员工申请内部转岗", "required_info": ["target_position", "reason"], "workflow": [ "确认目标岗位", "了解转岗原因", "说明转岗流程", "确认双方部门意见" ], "policy_notes": "转岗需原部门和目标部门双方同意" }, "benefits": { "name": "福利咨询", "description": "员工咨询福利待遇", "required_info": ["benefit_type"], "workflow": [ "确认咨询内容", "解释福利政策", "提供申请方式" ], "policy_notes": "福利包括社保、公积金、商业保险等" }, "contract_renewal": { "name": "合同续签", "description": "员工合同到期续签", "required_info": [], "workflow": [ "确认合同到期时间", "了解续签意向", "说明续签流程", "确认续签条件" ], "policy_notes": "合同到期前30天需确认续签意向" }, "performance_review": { "name": "绩效考核", "description": "员工咨询绩效考核", "required_info": [], "workflow": [ "了解咨询内容", "解释考核标准", "提供考核时间安排" ], "policy_notes": "绩效考核每季度进行一次" }, "serious_complaint": { "name": "严重投诉", "description": "员工反映严重问题(欠薪、违法用工等)", "required_info": ["issue_details", "affected_period"], "workflow": [ "认真倾听员工诉求", "表达理解和关心", "承诺反馈给公司", "说明内部处理流程", "承诺跟进处理" ], "policy_notes": "此类问题需高度重视,及时向公司反馈并推动解决,维护员工关系" }, "general_inquiry": { "name": "一般咨询", "description": "其他一般性问题", "required_info": [], "workflow": [ "理解问题", "提供信息或引导" ], "policy_notes": "友好解答,无法解答时转交相关负责人" } } # 中文数字映射 CHINESE_NUMBERS = { "一": 1, "二": 2, "三": 3, "四": 4, "五": 5, "六": 6, "七": 7, "八": 8, "九": 9, "十": 10, "两": 2, "俩": 2, "仨": 3 } # 程度词映射 INTENSITY_MODIFIERS = { # 高程度 "非常": 0.9, "特别": 0.9, 
"极其": 0.95, "十分": 0.85, "超级": 0.9, "太": 0.8, "真是": 0.8, # 中等程度 "比较": 0.6, "还算": 0.55, "挺": 0.6, # 低程度 "有点": 0.3, "稍微": 0.25, "略": 0.2, "有些": 0.35, "不算": 0.4 } # 否定词 NEGATION_WORDS = ["不", "没", "无", "非", "未", "别"] # 信息类型映射 (支持中文数字) INFO_TYPE_PATTERNS = { "training_type": [r"培训", r"课程", r"学习"], # 支持多种数字格式 "participant_count": [ r"(\d+)人", # 3人 r"(三|两|四|五|六|七|八|九|十)个人", # 三个人 r"参加.*?(\d+|[三两四五六七八九十])", # 参加3/三 r"人数.*?(\d+|[三两四五六七八九十])" ], "budget": [ r"预算[::]?\s*(\d+[元块万千k]?)", # 预算:10000元/块/万/k r"费用[::]?\s*(\d+[元块万千k]?)", r"(\d+[元块万千k])\s*(预算|费用)?", # 10000元预算/10000块 r"([一二三四五六七八九十百千万]+)[元块万千k]?", # 中文数字+单位:一万块 r"(\d+)[元块万千k]", # 阿拉伯数字+单位:10000元 r"(\d+)万", r"(\d+)k" # 简写:10000万、10000k ], "duration": [ r"(\d+)天", r"(\d+)小时", r"([一二三四五六七八九十]+)天", r"时长", r"多长时间" ], "leave_type": [r"年假", r"病假", r"事假", r"调休", r"婚假", r"产假", r"陪产假"], "start_date": [r"从.*开始", r"(\d+)月(\d+)日", r"明天", r"后天"], "end_date": [r"到.*结束", r"至", r"(\d+)月(\d+)日"], "reason": [r"因为", r"由于", r"原因"], "issue_description": [r"不满", r"问题", r"投诉"], "last_working_day": [r"最后一天", r"(\d+)号.*离职"], "policy_topic": [r"社保", r"公积金", r"加班", r"福利"], # 新增 "expense_type": [r"交通", r"住宿", r"餐饮", r"招待"], "amount": [r"(\d+)元", r"([一二三四五六七八九十百千万]+)元", r"(\d+)块", r"([一二三四五六七八九十百千万]+)块", r"(\d+)万"], "destination": [r"去.*?(\w{2,})", r"到.*?(\w{2,})"], "overtime_date": [r"(\d+)月(\d+)日", r"明天", r"本周"], "target_position": [r"申请.*?(\w{2,}岗)", r"转.*?(\w{2,})"] } def __init__(self): """初始化分析器""" self.correctness_evaluator = CorrectnessEvaluator() self.compliance_checker = ComplianceChecker() # 传递微调模型路径(如果有) sentiment_model_path = MODEL_CONFIG.get("sentiment_model_path") self.sentiment_analyzer = SentimentAnalyzer(model_path=sentiment_model_path) # 导入上下文管理器 from services.conversation_context import get_conversation_manager self.context_manager = get_conversation_manager() self.current_context = None # 初始化意图模型属性 self.intent_model = None self.intent_tokenizer = None self.intent_labels = None def analyze( self, employee_input: str, 
def _handle_followup_response(
    self,
    employee_input: str,
    conversation_history: Optional[List[Dict]],
    followup_info: Dict
) -> Dict:
    """Handle an employee message that answers a previous HR question.

    Records the message in the current context, optionally rebuilds the
    scenario state from the last assistant message found in
    ``conversation_history``, extracts newly supplied fields, recomputes the
    missing fields and builds the reply instruction for the next turn.

    Args:
        employee_input: The employee's latest message.
        conversation_history: Prior turns as dicts read through their
            ``role`` / ``content`` keys; may be None or empty.
        followup_info: Follow-up detection result; it is only passed through
            into the report under ``followup_info``.

    Returns:
        A dict with ``analysis_report``, ``reply_instruction`` and
        ``context_update`` keys. When the last assistant message mentions an
        annual-leave keyword and the input contains a 19xx/20xx year, the
        result of ``_generate_annual_leave_response`` is returned instead.
    """
    # Record the current turn.
    self.current_context.add_to_history({
        "role": "user",
        "content": employee_input
    })

    # Restore the context state from the conversation history.
    if conversation_history and len(conversation_history) >= 2:
        # Rebuild the context from the most recent assistant message.
        last_assistant_msg = None
        for msg in reversed(conversation_history):
            if msg.get("role") == "assistant":
                last_assistant_msg = msg.get("content", "")
                break

        if last_assistant_msg:
            # Special case: HR asked something related to annual leave and the
            # employee answered with a year. The keyword list also covers
            # annual-leave questions raised in policy-inquiry conversations.
            annual_leave_keywords = ["年假", "请假天数", "入职日期", "入职时间", "入职年份", "工龄"]
            has_annual_leave_context = any(kw in last_assistant_msg for kw in annual_leave_keywords)
            has_year_input = re.search(r'(19|20)\d{2}年?', employee_input)

            # NOTE(review): debug output goes to stdout and includes the first
            # 100 characters of the assistant message.
            print(f"[DEBUG] last_assistant_msg: {last_assistant_msg[:100]}...")
            print(f"[DEBUG] has_annual_leave_context: {has_annual_leave_context}")
            print(f"[DEBUG] has_year_input: {has_year_input is not None}")

            if has_annual_leave_context and has_year_input:
                # Annual-leave days + join year: answer directly and stop here.
                print(f"[DEBUG] 触发年假计算特殊处理")
                return self._generate_annual_leave_response(employee_input, last_assistant_msg)

            # Infer the scenario from the HR reply and initialise the context.
            scenario_id = self._infer_scenario_from_response(last_assistant_msg)
            scenario_def = self.HR_SCENARIOS.get(scenario_id, {})

            # Initialise the scenario state.
            self.current_context.current_scenario = scenario_id
            self.current_context.scenario_confidence = 0.8
            self.current_context.total_steps = len(scenario_def.get("workflow", []))

            # Infer collected / missing information from the HR reply.
            self._restore_info_state_from_response(last_assistant_msg, scenario_def)

    # Resolve the scenario; fall back to the general inquiry when none is set.
    scenario_id = self.current_context.current_scenario or "general_inquiry"
    scenario_def = self.HR_SCENARIOS.get(scenario_id, {})
    required_info = scenario_def.get("required_info", [])

    # Extract newly supplied information.
    extracted_info = self._extract_information(employee_input, {"scenario_id": scenario_id, "required_info": required_info})

    # Merge into the collected info; values already collected are kept.
    new_collected = extracted_info.get("extracted_data", {})
    for key, value in new_collected.items():
        if key not in self.current_context.collected_info:
            self.current_context.collected_info[key] = value

    # Recompute which required fields are still missing.
    updated_missing = [field for field in required_info if field not in self.current_context.collected_info]
    self.current_context.missing_info = updated_missing

    # Push the updated state into the context.
    context_summary = self.current_context.update_from_analysis(
        {
            "scenario": {"scenario_id": scenario_id},
            "information_extraction": extracted_info,
            "missing_information": {
                "missing_fields": updated_missing
            },
            "conversation_stage": {
                "stage": "in_progress" if updated_missing else "complete",
                "current_step": self.current_context.current_step + 1,
                "total_steps": self.current_context.total_steps
            }
        },
        {"role": "user", "content": employee_input}
    )

    # Ask the context for the next action.
    next_action = self.current_context.get_next_action_suggestion()

    # Emotion analysis.
    emotion = self._analyze_emotion(employee_input)

    # Risk assessment.
    risk_assessment = self._assess_risk(employee_input)

    # If another question follows, record it before building the reply
    # instruction.
    if next_action.get("action") == "ask_next_question":
        suggested_question = next_action.get("suggested_question", "")
        self.current_context.record_hr_interaction(
            hr_response=suggested_question,
            extracted_question=suggested_question
        )

    # Build the reply instruction.
    reply_instruction = self._generate_reply_instruction_from_context(
        next_action, emotion, risk_assessment
    )

    return {
        "analysis_report": {
            "intent": {
                "primary_intent": "supply_info",
                "confidence": 0.9,
                "intent_scores": {"supply_info": 0.9, "apply": 0.1}
            },
            "scenario": {
                "scenario_id": self.current_context.current_scenario,
                "scenario_name": self._get_scenario_name(self.current_context.current_scenario),
                "confidence": self.current_context.scenario_confidence
            },
            "extracted_info": extracted_info,
            "missing_info": self.current_context.missing_info,
            "conversation_stage": {
                "stage": self.current_context.conversation_stage,
                "current_step": self.current_context.current_step,
                "total_steps": self.current_context.total_steps,
                "completion_rate": context_summary["completion_rate"]
            },
            "emotion": emotion,
            "risk_assessment": risk_assessment,
            "is_followup": True,
            "followup_info": followup_info
        },
        "reply_instruction": reply_instruction,
        "context_update": context_summary
    }
传递原始问题用于知识库检索 ) # 更新上下文(为多轮对话做准备) context_summary = None if self.current_context: # 如果有缺失信息,说明需要追问,记录HR的问题 hr_question = None if missing_info and reply_instruction.get("suggested_templates"): hr_question = reply_instruction["suggested_templates"][0] # 先记录HR的问题(在update_from_analysis之前) self.current_context.record_hr_interaction( hr_response=hr_question, extracted_question=hr_question ) # 更新上下文状态 context_summary = self.current_context.update_from_analysis( { "scenario": scenario, "information_extraction": extracted_info, "missing_information": {"missing_fields": missing_info}, "conversation_stage": self._determine_conversation_stage( extracted_info, missing_info, scenario ) }, {"role": "user", "content": employee_input} ) return { "analysis_report": { "intent": intent, "scenario": scenario, "extracted_info": extracted_info, "missing_info": missing_info, "emotion": emotion, "risk_assessment": risk_assessment, "conversation_stage": self._determine_conversation_stage( extracted_info, missing_info, scenario ) }, "reply_instruction": reply_instruction, "context_update": context_summary } def _detect_intent(self, text: str) -> Dict: """ 检测意图 Returns: { "primary_intent": "apply/inquire/complain/other", "confidence": 0.95, "intent_details": {...} } """ text_lower = text.lower() # 意图关键词 intent_patterns = { "apply": ["申请", "想", "要", "需要", "希望", "我想"], "inquire": ["怎么", "如何", "什么", "是否", "能不能", "可以", "多少", "?", "?"], "complain": ["不满", "投诉", "生气", "不满意", "问题", "不公", "抗议", "欠薪", "拖欠", "不发工资", "克扣", "违法", "仲裁", "起诉", "诉讼", "告", "维权", "劳动监察", "举报"], "report": ["汇报", "报告", "通知"] } # 计算匹配分数 intent_scores = {} for intent, keywords in intent_patterns.items(): score = sum(1 for kw in keywords if kw in text) intent_scores[intent] = score # 确定主要意图 if not intent_scores or max(intent_scores.values()) == 0: primary_intent = "other" confidence = 0.3 else: primary_intent = max(intent_scores, key=intent_scores.get) max_score = intent_scores[primary_intent] confidence = min(0.5 + max_score * 
0.15, 0.95) return { "primary_intent": primary_intent, "confidence": confidence, "intent_scores": intent_scores } def _identify_scenario(self, text: str, intent: Dict) -> Dict: """ 识别HR场景 (优先使用BERT模型) Returns: { "scenario_id": "training_application", "scenario_name": "培训申请", "confidence": 0.9 } """ # 尝试使用模型预测 if self.intent_model and self.intent_tokenizer and self.intent_labels: try: inputs = self.intent_tokenizer( text, return_tensors="pt", truncation=True, padding=True, max_length=64 ) with torch.no_grad(): outputs = self.intent_model(**inputs) probs = torch.nn.functional.softmax(outputs.logits, dim=-1) confidence, predicted_idx = torch.max(probs, dim=-1) confidence_score = confidence.item() predicted_label = str(predicted_idx.item()) # id2label keys are strings in json usually # 转换 label ID to scenario ID scenario_id = self.intent_labels.get(predicted_label) if scenario_id and confidence_score >= INTENT_MODEL_CONFIG["confidence_threshold"]: return { "scenario_id": scenario_id, "scenario_name": self._get_scenario_name(scenario_id), "confidence": confidence_score, "source": "model" } except Exception as e: logger.error(f"Model prediction failed: {e}") # 降级到规则匹配 text_lower = text.lower() # 先判断是否是咨询类问题(优先级高) # 咨询类问题通常包含"多少"、"怎么"、"如何"、"什么"、"哪些"等疑问词 inquiry_indicators = ["多少", "怎么", "如何", "什么", "哪些", "是否", "有没有", "几", "吗", "呢", "?", "?"] is_inquiry = any(ind in text_lower for ind in inquiry_indicators) # 场景关键词匹配 scenario_keywords = { "training_application": ["培训", "课程", "学习", "进修"], "leave_application": ["请假", "休假", "病假", "事假", "调休"], # 移除"年假"避免与咨询混淆 "salary_inquiry": ["薪资", "工资", "薪水", "奖金", "加班费"], "complaint": ["不满", "投诉", "生气", "不满意"], "resignation_inquiry": ["补偿金", "补偿", "怎么计算", "如何计算", "流程", "政策"], "resignation": ["离职", "辞职", "不走"], "policy_inquiry": ["政策", "规定", "制度", "社保", "公积金", "年假", "加班", "福利", "请假"], # 添加"年假" # 新增场景关键词 "reimbursement": ["报销", "费用", "发票"], "business_trip": ["出差", "去外地", "外地"], "overtime": ["加班", "OT", "晚走"], "promotion": ["晋升", "升职", 
"升职加薪"], "transfer": ["转岗", "调岗", "换部门"], "benefits": ["福利", "保险", "补贴"], "contract_renewal": ["合同", "续签", "到期"], "performance_review": ["绩效", "考核", "考评"], # 严重投诉场景关键词(优先级高) "serious_complaint": ["欠薪", "拖欠工资", "不发工资", "克扣工资", "违法", "侵权", "逼迫", "威胁", "骚扰", "歧视", "仲裁", "起诉", "诉讼", "告", "维权", "劳动监察", "举报", "不发了", "再不", "有的没的"] } # 计算场景匹配分数 scenario_scores = {} for scenario_id, keywords in scenario_keywords.items(): score = sum(1 for kw in keywords if kw in text_lower) if score > 0: scenario_scores[scenario_id] = score # 确定场景 - 优先级处理 if not scenario_scores: scenario_id = "general_inquiry" confidence = 0.5 else: # 检查申请类意图词(如"想申请"、"要请假"等) application_indicators = ["想", "要", "申请", "打算", "准备", "希望"] has_application_intent = any(ind in text_lower for ind in application_indicators) # 如果有申请意图,优先匹配申请类场景 if has_application_intent: # 排除咨询类场景,优先匹配申请类 application_scenarios = { k: v for k, v in scenario_scores.items() if k in ["training_application", "leave_application", "reimbursement", "business_trip", "overtime", "resignation", "transfer"] } if application_scenarios: scenario_id = max(application_scenarios, key=application_scenarios.get) else: scenario_id = max(scenario_scores, key=scenario_scores.get) # 如果是咨询类问题且没有申请意图,优先匹配咨询类场景 elif is_inquiry: inquiry_scenarios = { k: v for k, v in scenario_scores.items() if k in ["policy_inquiry", "salary_inquiry", "resignation_inquiry", "benefits", "promotion", "contract_renewal"] } if inquiry_scenarios: scenario_id = max(inquiry_scenarios, key=inquiry_scenarios.get) else: scenario_id = max(scenario_scores, key=scenario_scores.get) else: scenario_id = max(scenario_scores, key=scenario_scores.get) max_score = scenario_scores[scenario_id] confidence = min(0.6 + max_score * 0.1, 0.95) scenario_info = self.HR_SCENARIOS.get(scenario_id, self.HR_SCENARIOS["general_inquiry"]) return { "scenario_id": scenario_id, "scenario_name": scenario_info["name"], "description": scenario_info["description"], "confidence": confidence, "required_info": 
def _analyze_emotion(self, text: str) -> Dict:
    """Classify the emotion of *text* with keyword, negation and degree rules.

    The text is scanned for positive, negative and threat vocabulary. A
    single negation word outside that vocabulary flips the polarity; two or
    more leave it unchanged. Threat vocabulary always yields a negative
    result. The first matching degree word from ``INTENSITY_MODIFIERS``
    scales the intensity and exclamation marks raise it.

    Args:
        text: The employee's message.

    Returns:
        A dict with ``emotion`` (``neutral`` / ``positive`` / ``negative``),
        ``intensity`` (0.3-1.0, rounded to two decimals), ``has_negation``,
        ``detected_modifier``, ``positive_indicators`` and
        ``negative_indicators``.
    """
    positive_words = ["满意", "感谢", "期待", "开心", "高兴", "好", "喜欢", "不错"]
    negative_words = ["不满", "生气", "投诉", "失望", "糟糕", "差", "难过", "烦恼",
                      "欠薪", "拖欠", "克扣", "不发工资", "违法", "侵权", "逼迫",
                      "威胁", "骚扰", "歧视", "不公", "抗议", "仲裁", "起诉", "诉讼",
                      "告", "维权", "有的没的", "废话", "不发了", "再不"]
    # Threat vocabulary keeps the result negative even next to a negation.
    threat_words = ["仲裁", "起诉", "诉讼", "告", "维权", "劳动监察", "举报"]
    # Neutral everyday words that merely contain the single-character
    # keywords "告" / "差"; without this, "请告诉我" or "申请出差" would be
    # scored as a threat / a negative emotion.
    neutral_compounds = ("告诉", "告知", "报告", "公告", "通告", "转告", "告别",
                         "广告", "出差", "差旅", "差不多")

    def _mask(source: str, words) -> str:
        # Longest words first so a short keyword cannot split a longer one.
        for word in sorted(words, key=len, reverse=True):
            source = source.replace(word, " ")
        return source

    scan_text = _mask(text.lower(), neutral_compounds)

    negative_count = sum(1 for word in negative_words if word in scan_text)
    threat_count = sum(1 for word in threat_words if word in scan_text)

    # Positive words are counted after the negative vocabulary is removed so
    # that "不满意" is one negative hit and not also a hit for "满意".
    without_negative = _mask(scan_text, negative_words + threat_words)
    positive_count = sum(1 for word in positive_words if word in without_negative)

    # Negations are counted by occurrence and only outside the emotion
    # vocabulary: the "不" of "不满" / "不错" is part of the word, not a
    # negation of it.
    residual = _mask(without_negative, positive_words)
    negation_count = sum(residual.count(neg) for neg in self.NEGATION_WORDS)
    has_negation = negation_count > 0

    # Degree word: the first entry of INTENSITY_MODIFIERS found in the text.
    intensity_modifier = 1.0
    detected_modifier = None
    for modifier, value in self.INTENSITY_MODIFIERS.items():
        if modifier in text:
            intensity_modifier = value
            detected_modifier = modifier
            break

    base_positive = positive_count
    base_negative = negative_count

    if threat_count > 0:
        # Threats are always negative; no negation flip is applied.
        emotion = "negative"
        base_intensity = min(0.7 + threat_count * 0.1, 1.0)
    else:
        # Exactly one negation flips the polarity ("不开心" -> negative,
        # "不是不满意" -> positive); a double negation keeps it.
        if negation_count == 1:
            base_positive, base_negative = base_negative, base_positive

        if base_negative > base_positive:
            emotion = "negative"
            base_intensity = min(0.5 + base_negative * 0.15, 1.0)
        elif base_positive > base_negative:
            emotion = "positive"
            base_intensity = min(0.5 + base_positive * 0.15, 1.0)
        else:
            emotion = "neutral"
            base_intensity = 0.3

    # Apply the degree word, never dropping below the neutral floor.
    intensity = max(0.3, min(1.0, base_intensity * intensity_modifier))

    # Exclamation marks (half- or full-width) raise the intensity.
    if "!" in text or "!" in text:
        intensity = min(1.0, intensity + 0.15)
    if "!!" in text or "!!" in text:
        intensity = min(1.0, intensity + 0.25)

    return {
        "emotion": emotion,
        "intensity": round(intensity, 2),
        "has_negation": has_negation,
        "detected_modifier": detected_modifier,
        "positive_indicators": positive_count,
        "negative_indicators": negative_count
    }
def _get_question_for_info(self, info_type: str, scenario_id: str = "") -> str:
    """Return the standard question used to ask for one missing field.

    Args:
        info_type: A field name from a scenario's ``required_info`` list.
        scenario_id: Optional scenario the field belongs to. Some fields
            (``reason``, ``duration``) are shared by several scenarios; when
            a scenario-specific wording exists it is preferred. Omitting it
            keeps the original, scenario-independent wording.

    Returns:
        The question text, or a generic request for more information when
        the field is unknown.
    """
    questions = {
        "training_type": "请问您想申请什么类型的培训?",
        "participant_count": "请问有多少人参加培训?",
        "budget": "请问培训预算大约是多少?",
        "duration": "请问培训计划进行多长时间?",
        "leave_type": "请问您想请什么类型的假期?",
        "start_date": "请问您打算从哪天开始请假?",
        "end_date": "请问您计划哪天回来上班?",
        "reason": "请问请假的原因是什么?",
        "issue_description": "请问能详细描述一下遇到的问题吗?",
        "last_working_day": "请问您计划的最后工作日是哪天?",
        # Fields required by the remaining scenarios in HR_SCENARIOS; without
        # these the employee only ever saw the generic fallback question.
        "affected_parties": "请问这件事涉及哪些人员或部门?",
        "policy_topic": "请问您想了解哪方面的政策?",
        "expense_type": "请问您要报销的费用类型是什么?",
        "amount": "请问报销金额是多少?",
        "description": "请问能简单说明一下这笔费用的用途吗?",
        "destination": "请问您计划去哪里出差?",
        "purpose": "请问这次出差的目的是什么?",
        "overtime_date": "请问您计划哪天加班?",
        "target_position": "请问您想转到哪个岗位?",
        "benefit_type": "请问您想了解哪一类福利?",
        "issue_details": "请问能详细说明一下具体情况吗?",
        "affected_period": "请问这个问题涉及的时间段是什么时候?"
    }

    # Wording for fields whose default question names the wrong scenario.
    scenario_questions = {
        ("resignation", "reason"): "请问您离职的原因是什么?",
        ("overtime", "reason"): "请问加班的原因是什么?",
        ("transfer", "reason"): "请问您申请转岗的原因是什么?",
        ("business_trip", "duration"): "请问计划出差多长时间?",
        ("overtime", "duration"): "请问计划加班多长时间?"
    }

    specific = scenario_questions.get((scenario_id, info_type))
    if specific is not None:
        return specific
    return questions.get(info_type, "请问能提供更多相关信息吗?")
def _determine_base_strategy(
    self,
    intent: Dict,
    emotion: Dict,
    risk_assessment: Dict
) -> Dict:
    """Pick the base reply strategy.

    Rules are checked in priority order: high risk, negative emotion,
    complaint intent, application intent, then the standard fallback.

    Args:
        intent: Intent result; ``primary_intent`` is read.
        emotion: Emotion result; ``emotion`` is read unless risk is high.
        risk_assessment: Risk result; ``risk_level`` is read.

    Returns:
        A dict with ``type``, ``priority`` and ``description``.
    """
    intent_name = intent["primary_intent"]

    if risk_assessment["risk_level"] == "high":
        chosen = ("empathetic escalation", "high", "高风险场景,需要展现同理心并考虑升级处理")
    elif emotion["emotion"] == "negative":
        chosen = ("empathetic resolution", "medium-high", "员工情绪消极,优先安抚情绪再解决问题")
    elif intent_name == "complain":
        chosen = ("acknowledgment and investigation", "high", "投诉类问题,需要确认理解并调查")
    elif intent_name == "apply":
        chosen = ("information collection", "normal", "申请类问题,需要收集必要信息")
    else:
        chosen = ("standard assistance", "normal", "标准咨询流程")

    kind, priority, description = chosen
    return {"type": kind, "priority": priority, "description": description}
def _plan_next_steps(
    self,
    scenario: Dict,
    missing_info: List,
    risk_assessment: Dict
) -> List[str]:
    """Plan the follow-up steps for the HR agent.

    Args:
        scenario: Scenario result (not read here).
        missing_info: Required fields that are still missing.
        risk_assessment: Risk result; ``risk_level`` is read.

    Returns:
        Escalation steps first when the risk is high, then either the
        "keep collecting" step or the two completion steps.
    """
    is_high_risk = risk_assessment["risk_level"] == "high"
    steps = ["评估是否需要升级处理", "考虑通知主管"] if is_high_risk else []

    if missing_info:
        steps.append("继续收集缺失信息")
    else:
        steps.extend(["确认信息完整性", "执行相应的业务流程"])

    return steps
def _generate_reply_instruction_from_context(
    self,
    next_action: Dict,
    emotion: Dict,
    risk_assessment: Dict
) -> Dict:
    """Build the reply instruction for a follow-up turn.

    Args:
        next_action: Suggestion from the conversation context. ``action``
            selects the branch; ``suggested_response``,
            ``suggested_question`` and ``missing_fields`` are read when
            present.
        emotion: Emotion result for the current message.
        risk_assessment: Risk result for the current message.

    Returns:
        A reply instruction with ``strategy_type``, ``suggested_templates``,
        ``tone_requirement``, ``must_include``, ``must_avoid``,
        ``next_steps`` and ``special_notes`` in every branch. The fallback
        branch additionally keeps the ``type`` / ``priority`` /
        ``description`` keys it returned before.
    """
    action = next_action.get("action", "continue")
    # Carries the risk hint, as in the instruction built for new topics.
    special_notes = self._get_special_notes(risk_assessment, {})

    if action == "confirm_complete":
        # All required information has been collected.
        return {
            "strategy_type": "completion",
            "suggested_templates": [next_action.get("suggested_response", "好的,您的信息已确认。")],
            "tone_requirement": {
                "style": "friendly professional",
                "keywords": ["确认", "完成"],
                "avoid": ["催促"]
            },
            "must_include": [],
            "must_avoid": self._get_must_avoid(),
            "next_steps": ["提交处理", "生成确认单"],
            "special_notes": special_notes
        }

    if action == "ask_next_question":
        # Keep asking for the next missing field.
        question = next_action.get("suggested_question", "")
        return {
            "strategy_type": "information_collection",
            "suggested_templates": [question],
            "tone_requirement": {
                "style": "friendly professional",
                "keywords": ["请问", "询问"],
                "avoid": ["催促", "质疑"]
            },
            "must_include": [],
            "must_avoid": self._get_must_avoid(),
            "next_steps": next_action.get("missing_fields", []),
            "special_notes": special_notes
        }

    # Fallback: derive the strategy from emotion and risk. The strategy
    # descriptor alone is not a reply instruction, so wrap it in the same
    # shape as the branches above; its own keys are kept for callers that
    # already read them.
    base_strategy = self._determine_base_strategy(
        {"primary_intent": "continue", "confidence": 0.8},
        emotion,
        risk_assessment
    )
    candidate_templates = (
        next_action.get("suggested_response"),
        next_action.get("suggested_question"),
    )
    return {
        **base_strategy,
        "strategy_type": base_strategy["type"],
        "suggested_templates": [text for text in candidate_templates if text],
        "tone_requirement": self._get_tone_requirement(emotion, risk_assessment),
        "must_include": [],
        "must_avoid": self._get_must_avoid(),
        "next_steps": [],
        "special_notes": special_notes
    }
def _generate_annual_leave_response(self, year_answer: str, hr_question: str) -> Dict:
    """Answer an annual-leave question from the employee's join year.

    Args:
        year_answer: The employee's reply, expected to contain a four-digit
            year such as "2020年".
        hr_question: The HR question that preceded the reply (not read).

    Returns:
        A full result with ``analysis_report``, ``reply_instruction`` and
        ``context_update``. When a 19xx/20xx year that is not in the future
        is found, the report is marked complete and the reply states the
        entitlement; otherwise the reply asks for the join year again.
        ``completion_rate`` is a 0-1 fraction and ``missing_info`` is a list
        in both cases.
    """
    from datetime import datetime

    current_year = datetime.now().year
    year_match = re.search(r'(19|20)\d{2}', year_answer)
    join_year = int(year_match.group()) if year_match else None
    # A year in the future cannot be a join year; treating it as unrecognised
    # avoids replying with a negative length of service.
    if join_year is not None and join_year > current_year:
        join_year = None

    scenario_info = self.HR_SCENARIOS.get("leave_application", self.HR_SCENARIOS["general_inquiry"])

    def _scenario(confidence: float) -> Dict:
        return {
            "scenario_id": "leave_application",
            "scenario_name": scenario_info["name"],
            "description": scenario_info["description"],
            "confidence": confidence,
            "required_info": scenario_info["required_info"],
            "workflow": scenario_info["workflow"],
            "policy_notes": scenario_info["policy_notes"]
        }

    neutral_emotion = {
        "emotion": "neutral",
        "intensity": 0.3,
        "has_negation": False,
        "detected_modifier": None,
        "positive_indicators": 0,
        "negative_indicators": 0
    }
    low_risk = {"risk_level": "low", "risk_factors": [], "recommended_action": "正常处理"}

    if join_year is not None:
        years_of_service = current_year - join_year

        # Entitlement by length of service (generic rule; adjust to the
        # company policy as needed).
        if years_of_service >= 20:
            annual_days = 15
        elif years_of_service >= 10:
            annual_days = 10
        elif years_of_service >= 5:
            annual_days = 7
        else:
            annual_days = 5

        answer = f"感谢您提供的信息!根据您{join_year}年入职公司,截至{current_year}年,您的司龄为{years_of_service}年。根据公司年假政策,您今年可享受的年假天数为{annual_days}天。"
        collected = {"join_year": join_year, "years_of_service": years_of_service}

        return {
            "analysis_report": {
                "intent": {
                    "primary_intent": "supply_info",
                    "confidence": 0.95,
                    "intent_scores": {"supply_info": 1, "apply": 0, "inquire": 0, "complain": 0}
                },
                "scenario": _scenario(0.9),
                "extracted_info": {
                    "extracted_data": dict(collected),
                    "extracted_fields": ["join_year", "years_of_service"],
                    "extraction_confidence": 0.95
                },
                "missing_info": [],
                "conversation_stage": {
                    "stage": "complete",
                    "current_step": 2,
                    "total_steps": 2,
                    # Fraction, matching _determine_conversation_stage.
                    "completion_rate": 1.0,
                    "next_action": None
                },
                "emotion": neutral_emotion,
                "risk_assessment": low_risk,
                "is_followup": True
            },
            "reply_instruction": {
                "strategy_type": "completion",
                "suggested_templates": [answer],
                "tone_requirement": {
                    "style": "friendly professional",
                    "keywords": ["感谢", "年假天数"],
                    "avoid": []
                },
                "must_include": [],
                "must_avoid": [],
                "next_steps": ["确认年假天数", "说明请假流程"],
                "special_notes": []
            },
            "context_update": {
                "completion_rate": 1.0,
                "collected_info": dict(collected),
                "missing_info": []
            }
        }

    # No usable year: ask the employee to confirm the join year.
    return {
        "analysis_report": {
            "intent": {
                "primary_intent": "supply_info",
                "confidence": 0.3,
                "intent_scores": {"supply_info": 0, "apply": 0, "inquire": 0, "complain": 0}
            },
            "scenario": _scenario(0.5),
            "extracted_info": {
                "extracted_data": {},
                "extracted_fields": [],
                "extraction_confidence": 0.3
            },
            # A list of field names, like every other analysis path.
            "missing_info": ["join_year"],
            "conversation_stage": {
                "stage": "in_progress",
                "current_step": 1,
                "total_steps": 2,
                "completion_rate": 0.0,
                "next_action": "请问您是哪一年加入公司的呢?"
            },
            "emotion": neutral_emotion,
            "risk_assessment": low_risk,
            "is_followup": True
        },
        "reply_instruction": {
            "strategy_type": "inquire",
            "suggested_templates": ["抱歉,我没有识别到您说的年份。请问您是哪一年加入公司的呢?请提供具体的年份,比如2020年。"],
            "tone_requirement": {
                "style": "friendly professional",
                "keywords": ["抱歉", "年份"],
                "avoid": []
            },
            "must_include": [],
            "must_avoid": [],
            "next_steps": ["确认入职年份"],
            "special_notes": []
        },
        "context_update": {
            "completion_rate": 0.0,
            "collected_info": {},
            "missing_info": ["join_year"]
        }
    }
scenario_def: 场景定义 """ required_info = scenario_def.get("required_info", []) # 检查HR询问了哪些问题,这些就是缺失信息 missing = [] # 常见问题的关键词映射 field_keywords = { "issue_details": ["什么事", "具体情况", "详情", "描述"], "affected_period": ["什么时候", "时间", "期间", "月份"], "training_type": ["什么培训", "哪种", "培训内容"], "participant_count": ["多少人", "人数", "几个人"], "budget": ["预算", "多少钱", "费用"], "duration": ["多久", "多长时间", "几天"], "start_date": ["什么时候开始", "开始时间", "哪天"], "location": ["在哪里", "地点", "哪里"], "target_position": ["什么岗位", "哪个部门", "转岗"], "reason": ["为什么", "原因", "什么原因"], "join_year": ["哪一年", "哪年", "哪年加入", "哪一年入职"] } # 检查HR回复中包含哪些问题的关键词 for field, keywords in field_keywords.items(): if field in required_info and any(kw in hr_response for kw in keywords): missing.append(field) # 更新缺失信息列表 self.current_context.missing_info = missing self.current_context.conversation_stage = "in_progress" if missing else "complete" self.current_context.current_step = 1 # 确保有 total_steps if self.current_context.total_steps == 0: self.current_context.total_steps = len(scenario_def.get("workflow", [])) def _extract_keywords(self, question: str) -> List[str]: """提取问题中的关键词""" keywords = [] # 离职相关 if any(w in question for w in ['离职', '辞职', '补偿', '赔偿', '辞退']): keywords.append('离职') keywords.append('补偿') keywords.append('辞职') # 加班相关 if any(w in question for w in ['加班', '加班费', 'OT']): keywords.append('加班') # 请假相关 if any(w in question for w in ['请假', '年假', '事假', '病假']): keywords.append('请假') # 薪资相关 if any(w in question for w in ['工资', '薪资', '薪水', '奖金']): keywords.append('工资') # 社保相关 if any(w in question for w in ['社保', '公积金', '保险']): keywords.append('社保') return list(set(keywords)) # 单例 _analyzer_instance = None def get_analyzer() -> IntelligenceAnalyzer: """获取分析器单例""" global _analyzer_instance if _analyzer_instance is None: print("正在初始化智能分析器...") _analyzer_instance = IntelligenceAnalyzer() print("✓ 智能分析器初始化完成") return _analyzer_instance