# freud-zero-mvp / counselor.py
# Feng Chike
# Freud Zero MVP: 心理咨询AI系统(清洁部署)
# 408f650
import os
import time
import threading
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from prompts import COUNSELOR_SYSTEM_PROMPT, STRATEGIC_GUIDANCE_TEMPLATE
from evaluator import DisclosureEvaluator
from strategic_advisor import StrategicAdvisor
from session_logger import SessionLogger
from strategy_visualizer import StrategyVisualizer
class PsychodynamicCounselor:
    """Conversation driver that pairs a front-line chat model with a strategic advisor.

    Each call to :meth:`respond` scores the client's message with the
    disclosure evaluator, runs the strategic advisor on the conversation so
    far, optionally injects the resulting guidance as an extra system message,
    calls the chat model, and logs the turn.
    """

    # Total attempts for the front-line model call before the error propagates.
    MAX_LLM_ATTEMPTS = 3
    # Pause between failed attempts, in seconds.
    RETRY_DELAY_SECONDS = 1

    def __init__(self):
        """Create the chat model client, the collaborators, and empty session state."""
        self.llm = ChatOpenAI(
            model="qwen-plus",
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
            api_key=os.getenv("DASHSCOPE_API_KEY"),
            temperature=0.7,
        )
        self.evaluator = DisclosureEvaluator()
        self.advisor = StrategicAdvisor()
        self.logger = SessionLogger()
        self.visualizer = StrategyVisualizer()
        self.history = [SystemMessage(content=COUNSELOR_SYSTEM_PROMPT)]
        self.turn_number = 0
        self.current_guidance = None
        self._last_disclosure_score = 1  # current disclosure level, kept for relative scoring
        self._last_dimensions = {}  # most recent disclosure dimensions A-E
        self._last_reasoning = ""  # most recent evaluation rationale
        self._disclosure_history = []  # trajectory of disclosure scores
        self._last_trace_stats = None  # statistics of the most recent reasoning run
        self._pending_trace = None  # strategic trace finished by the background thread
        self._bg_thread = None
        self._lock = threading.Lock()

    @staticmethod
    def _build_guidance(best, guidance):
        """Build the guidance dict stored in ``current_guidance``.

        Args:
            best: Mapping describing the selected path; its ``"seed"`` entry is
                used only when ``guidance`` has no ``"direction"``.
            guidance: Mapping that may carry ``"direction"``, ``"principles"``
                and ``"evidence"``.

        Returns:
            A dict with exactly the keys ``direction``, ``principles`` and
            ``evidence``.

        Raises:
            KeyError: If neither ``guidance["direction"]`` nor ``best["seed"]``
                exists.
        """
        # Resolve the fallback lazily: ``best["seed"]`` must not be required
        # when the advisor already supplied an explicit direction.
        if "direction" in guidance:
            direction = guidance["direction"]
        else:
            direction = best["seed"]
        return {
            "direction": direction,
            "principles": guidance.get("principles", []),
            "evidence": guidance.get("evidence", ""),
        }

    def _inject_guidance(self):
        """No-op hook for the background thread.

        ``current_guidance`` is already updated by the time this is called, so
        ``history`` needs no modification; the guidance is inserted dynamically
        when :meth:`respond` calls the model.
        """
        return None

    def _run_strategic_reasoning(self, history_snapshot, current_disclosure):
        """Background-thread entry: run the strategic reasoning and publish the result.

        Args:
            history_snapshot: Copy of the conversation history to reason over.
            current_disclosure: Disclosure score of the latest client message.

        On success ``current_guidance`` (when a positively scored path exists)
        and ``_pending_trace`` are updated under ``_lock`` and a visual report
        is rendered. Any exception is reported and swallowed so that a failed
        run only skips this round.
        """
        # Capture the turn up front so the log line and the report refer to the
        # turn that triggered the run, even if respond() advances meanwhile.
        turn = self.turn_number
        print(f"[战略推理] 第{turn}轮触发,当前揭露={current_disclosure},开始后台推理...")
        try:
            best, guidance, strategic_trace = self.advisor.run(history_snapshot, current_disclosure)
            with self._lock:
                if best and best.get("score", 0) > 0:
                    self.current_guidance = self._build_guidance(best, guidance)
                    print(f"[战略推理] 完成! 选中: {best['id']}.{best['branch']} score={best['score']} delta={best['delta']}")
                    print(f" 方向: {guidance.get('direction', '?')}")
                    for principle in guidance.get("principles", []):
                        print(f" 原则: {principle}")
                else:
                    print("[战略推理] 完成,但未产生有效方向建议")
                self._pending_trace = strategic_trace
            # Render the visual report after releasing the lock so that
            # respond() is not blocked while the report is produced.
            self.visualizer.render(strategic_trace, turn)
        except Exception as e:
            # Deliberate best-effort: a failed run must not kill the thread's caller.
            print(f"[战略推理] 后台推理失败,跳过本轮: {e}")

    def _compose_messages(self):
        """Return the message list for the front-line model call.

        When guidance is available it is rendered from
        ``STRATEGIC_GUIDANCE_TEMPLATE`` and inserted as a system message
        immediately before the newest history entry; otherwise the history is
        returned unchanged.
        """
        guidance = self.current_guidance
        if not guidance:
            print(f"[前台] 第{self.turn_number}轮 | 无督导指令")
            return self.history

        principles_text = "\n".join(f"- {p}" for p in guidance.get("principles", []))
        guidance_text = (
            STRATEGIC_GUIDANCE_TEMPLATE
            .replace("{direction}", guidance["direction"])
            .replace("{principles}", principles_text)
            .replace("{evidence}", guidance.get("evidence", ""))
        )
        messages = self.history[:-1] + [SystemMessage(content=guidance_text)] + [self.history[-1]]
        print(f"[前台] 第{self.turn_number}轮 | 督导方向已注入: {guidance['direction'][:40]}")
        return messages

    def _invoke_with_retry(self, messages):
        """Call the chat model, retrying failed attempts before re-raising.

        Makes up to ``MAX_LLM_ATTEMPTS`` attempts and sleeps
        ``RETRY_DELAY_SECONDS`` between them; the exception from the final
        attempt propagates to the caller.
        """
        for attempt in range(1, self.MAX_LLM_ATTEMPTS + 1):
            try:
                return self.llm.invoke(messages)
            except Exception as e:
                if attempt == self.MAX_LLM_ATTEMPTS:
                    raise
                print(f"[前台] 调用失败({e}), 重试中...")
                time.sleep(self.RETRY_DELAY_SECONDS)

    def respond(self, user_message):
        """Process one client message and return the counselor's reply text.

        Args:
            user_message: The client's latest utterance.

        Returns:
            The ``content`` of the model's response.

        Raises:
            KeyError: If the evaluator result has no ``"score"`` entry.
            Exception: Whatever the chat model raises on its final attempt.
        """
        self.turn_number += 1
        self.history.append(HumanMessage(content=user_message))

        # Drain any trace left by a background run. It is superseded below by
        # this turn's synchronous trace, or dropped if that run fails.
        logged_trace = None
        with self._lock:
            if self._pending_trace is not None:
                logged_trace = self._pending_trace
                self._pending_trace = None

        # Score the disclosure depth of the client's current message.
        disclosure_result = self.evaluator.evaluate_disclosure(user_message)
        score = disclosure_result["score"]
        dimensions = disclosure_result.get("dimensions", {})
        reasoning = disclosure_result.get("reasoning", "")
        self._last_disclosure_score = score
        self._last_dimensions = dimensions
        self._last_reasoning = reasoning
        self._disclosure_history.append(score)

        # Synchronous strategic reasoning: reason first, then reply with the guidance.
        print(f"[战略推理] 第{self.turn_number}轮,当前揭露={self._last_disclosure_score},同步推理中...")
        try:
            best, guidance, strategic_trace = self.advisor.run(
                list(self.history), self._last_disclosure_score
            )
            if best and best.get("score", 0) > 0:
                self.current_guidance = self._build_guidance(best, guidance)
                print(f"[战略推理] 完成! {best['id']}.{best['branch']} score={best['score']} delta={best['delta']}")
            # NOTE(review): when no positively scored path is found, the
            # guidance from an earlier turn stays active - confirm intended.
            logged_trace = strategic_trace
            self._last_trace_stats = {
                "total_paths": len(strategic_trace.get("candidates", [])),
                "deep_paths": len(strategic_trace.get("deep_paths", [])),
                "seeds": list(strategic_trace.get("seeds", {}).keys()),
                "selected": strategic_trace.get("selected", ""),
                "timing": strategic_trace.get("timing", {}),
                "best_score": best.get("score", 0) if best else 0,
                "best_delta": best.get("delta", 0) if best else 0,
                "predicted_disclosure": guidance.get("disclosure_level", best.get("score", "?")) if best else "?",
            }
            self.visualizer.render(strategic_trace, self.turn_number)
        except Exception as e:
            # Deliberate best-effort: the reply is still produced without a new trace.
            print(f"[战略推理] 推理失败,跳过: {e}")
            logged_trace = None

        # Front-line reply, with retries in case the provider rejects the call.
        response = self._invoke_with_retry(self._compose_messages())
        self.history.append(AIMessage(content=response.content))

        # Log with the defaulted dimensions/reasoning so a sparse evaluator
        # result cannot raise after the reply has already been generated.
        self.logger.log_turn(
            self.turn_number,
            user_message,
            response.content,
            score,
            dimensions,
            reasoning,
            mcts_trace=logged_trace,
        )
        return response.content

    def get_session_filepath(self):
        """Return the file path reported by the session logger."""
        return self.logger.get_filepath()