#!/usr/bin/env python3 # -*- coding: utf-8 -*- import os import sys import json import getpass from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Any from dataclasses import dataclass, field import uuid # 导入自定义模块 from auth.user_manager import UnifiedUserManager from auth.session_handler import SessionHandler from agents.agent_registry import AgentRegistry from discussion.discussion_engine import ClinicalDiscussionEngine from storage.discussion_storage import DiscussionStorage from storage.user_data import UserDataManager from utils.config import ClinicalConfig from utils.logger import setup_logger from auth import User project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) class ClinicalCLI: """临床多智能体讨论系统命令行界面""" def __init__(self, config_path: str = "config.json"): # 初始化配置 self.config = ClinicalConfig(config_path) self.cli_interface = CLIInterface(self) # 初始化日志 self.logger = setup_logger("clinical_cli") # 初始化核心组件 try: self.user_manager = UnifiedUserManager() self.session_handler = SessionHandler() self.agent_registry = AgentRegistry() self.discussion_storage = DiscussionStorage() self.user_data_manager = UserDataManager() except Exception as e: self.logger.error(f"组件初始化失败: {e}") # 提供更详细的错误信息 if "api_base" in str(e): self.logger.error("请检查配置文件中模型API设置是否正确") raise # 当前会话状态 self.current_session = None self.current_user = None self.current_discussion = None # 讨论配置 self.discussion_config = { "rounds": self.config.discussion.default_rounds, "user_participation": False, # 默认不参与讨论 "auto_save": True, "export_format": "json" } def get_model_args(self): """获取模型参数 - 修复这个方法""" # 创建一个简单的参数对象 class Args: def __init__(self, config): self.model = config.model.engine self.llm_name = config.model.model_name self.url = config.model.api_base # 使用配置中的API地址 self.temp = config.model.temperature self.discussion_rounds = getattr(config.discussion, 'default_rounds', 3) return Args(self.config) def clear_screen(self): """清屏""" os.system('cls' if os.name == 'nt' else 'clear') def print_header(self, title: str): """打印标题""" self.clear_screen() print("=" * 60) print(f"临床MDT智能模拟助手 - {title}") print("=" * 60) print() def wait_for_enter(self, message: str = "按回车键继续..."): """等待用户按回车""" input(f"\n{message}") def authenticate_user(self) -> bool: """用户认证流程""" while True: self.print_header("用户认证") print("1. 用户登录") print("2. 用户注册") print("3. 退出系统") print() choice = self.cli_interface.get_user_input("请选择操作: ", required=False) if choice == "1": return self.user_login() elif choice == "2": self.user_register() elif choice == "3" or choice.lower() == "exit": print("感谢使用临床多智能体讨论系统!") sys.exit(0) else: print("无效选择,请重新输入。") self.wait_for_enter() def user_login(self) -> bool: """用户登录""" self.print_header("用户登录") username = self.cli_interface.get_user_input("用户名: ") password = self.cli_interface.get_user_input("密码: ", password=False) try: user_info = self.user_manager.authenticate(username, password) if user_info: # user_info 是 User 对象,不是字典 self.current_user = { 'user_id': user_info.user_id, 'username': user_info.username, 'full_name': getattr(user_info, 'full_name', ''), 'department': getattr(user_info, 'department', ''), 'role': getattr(user_info, 'role', 'user') } self.current_session = self.session_handler.create_session(user_info.user_id) print(f"\n登录成功!欢迎回来,{user_info.username}!") self.logger.info(f"用户 {username} 登录成功") self.wait_for_enter() return True else: print("\n登录失败:用户名或密码错误。") self.wait_for_enter() return False except Exception as e: print(f"\n登录失败:{e}") self.wait_for_enter() return False def user_register(self): """用户注册""" self.print_header("用户注册") username = self.cli_interface.get_user_input("请输入用户名: ") # 检查用户名是否已存在 if self.user_manager.user_exists(username): print("该用户名已存在,请选择其他用户名。") self.wait_for_enter() return password = self.cli_interface.get_user_input("请输入密码: ", password=False) confirm_password = self.cli_interface.get_user_input("请确认密码: ", password=False) if password != confirm_password: print("密码不一致,请重新输入。") self.wait_for_enter() return # 可选信息 full_name = self.cli_interface.get_user_input("请输入真实姓名(可选): ", required=False) department = self.cli_interface.get_user_input("请输入所在科室(可选): ", required=False) try: # 直接使用UnifiedUserManager创建用户 success, result = self.user_manager.create_user( username=username, password=password, full_name=full_name, department=department ) if success: print(f"\n注册成功!欢迎使用临床多智能体讨论系统,{username}!") self.logger.info(f"新用户注册: {username}") else: print(f"\n注册失败:{result}") self.wait_for_enter() except Exception as e: print(f"\n注册失败:{e}") self.wait_for_enter() def show_main_menu(self): """显示主菜单""" while True: self.print_header("主菜单") print(f"当前用户: {self.current_user['username']}") if self.current_session: print(f"会话ID: {self.current_session[:8]}...") print() print("1. 开始新的讨论") print("2. 查看历史讨论") print("3. 管理智能体") print("4. 系统设置") print("5. 用户信息") print("6. 退出系统") print() choice = self.cli_interface.get_user_input("请选择操作: ", required=False) if choice == "1": self.start_new_discussion() elif choice == "2": self.view_discussion_history() elif choice == "3": self.manage_agents() elif choice == "4": self.system_settings() elif choice == "5": self.user_information() elif choice == "5": self.user_information() elif choice == "6": if self.cli_interface.confirm_action("确定要退出系统吗?"): print("感谢使用临床多智能体讨论系统!") break else: print("无效选择,请重新输入。") self.wait_for_enter() def start_new_discussion(self): """开始新的讨论""" self.print_header("开始新的讨论") # 步骤1: 选择智能体 available_agents = self.agent_registry.get_available_agents(self.current_session) agent_names = list(available_agents.keys()) if not agent_names: print("当前没有可用的智能体,请先添加智能体。") self.wait_for_enter() return print("步骤1: 选择参与讨论的智能体") print("提示:可以输入多个编号,用空格或逗号分隔,如:1 3 5 或 1,3,5") selected_agents = self.cli_interface.select_from_list( agent_names, "请选择要参与讨论的智能体(可多选):", allow_multiple=True ) if not selected_agents: print("未选择任何智能体,取消讨论。") self.wait_for_enter() return print(f"已选择 {len(selected_agents)} 个智能体: {', '.join(selected_agents)}") # 步骤2: 输入病历信息 self.print_header("输入病历信息") print("步骤2: 输入病历信息") print("请输入患者的病历信息,包括主诉、现病史、既往史、体格检查、辅助检查等。") print("(输入完成后请双击回车结束输入)") print() medical_record = self.cli_interface.get_multiline_input("病历信息:") if not medical_record.strip(): print("病历信息不能为空。") self.wait_for_enter() return # 步骤3: 输入讨论问题 self.print_header("输入讨论问题") print("步骤3: 输入讨论问题") question = self.cli_interface.get_user_input("请输入需要讨论的临床问题: ") if not question.strip(): print("讨论问题不能为空。") self.wait_for_enter() return # 步骤4: 配置讨论参数 self.configure_discussion_settings() # 步骤5: 确认并开始讨论 self.print_header("确认讨论信息") print("请确认以下讨论信息:") print(f"参与智能体: {', '.join(selected_agents)}") print(f"讨论问题: {question}") print(f"讨论轮数: {self.discussion_config['rounds']}") print(f"用户参与: {'是' if self.discussion_config['user_participation'] else '否'}") print() if not self.cli_interface.confirm_action("确认开始讨论吗?"): print("讨论已取消。") self.wait_for_enter() return # 开始讨论 try: self.run_discussion(selected_agents, medical_record, question) except Exception as e: print(f"讨论过程中出现错误: {e}") self.logger.error(f"讨论错误: {e}") self.wait_for_enter() def configure_discussion_settings(self): """配置讨论设置""" self.print_header("讨论设置") print("当前设置:") print(f"1. 讨论轮数: {self.discussion_config['rounds']}") print(f"2. 用户参与讨论: {'是' if self.discussion_config['user_participation'] else '否'}") print(f"3. 自动保存: {'是' if self.discussion_config['auto_save'] else '否'}") print(f"4. 导出格式: {self.discussion_config['export_format']}") print() change_settings = self.cli_interface.confirm_action("是否修改讨论设置?") if change_settings: # 修改讨论轮数 try: rounds_input = self.cli_interface.get_user_input( f"讨论轮数 (当前: {self.discussion_config['rounds']}, 范围: 1-5): ", required=False ) if rounds_input: new_rounds = int(rounds_input) if 1 <= new_rounds <= 5: self.discussion_config['rounds'] = new_rounds print(f"讨论轮数已设置为: {new_rounds}") except ValueError: print("讨论轮数必须在1-5之间,将使用默认值。") # 用户参与设置 - 明确提示 print("\n用户参与设置:") print("如果选择'是',讨论过程中会提示您介入;") print("如果选择'否',系统将自动完成整个讨论过程。") participation = self.cli_interface.confirm_action("是否允许用户参与讨论?") self.discussion_config['user_participation'] = participation print(f"用户参与已设置为: {'是' if participation else '否'}") # 自动保存 auto_save = self.cli_interface.confirm_action("是否自动保存讨论记录?") self.discussion_config['auto_save'] = auto_save # 导出格式 formats = ["json", "docx", "txt"] print("可选导出格式: " + ", ".join(formats)) export_format = self.cli_interface.get_user_input(f"导出格式 ({self.discussion_config['export_format']}): ", required=False) if export_format and export_format.lower() in formats: self.discussion_config['export_format'] = export_format.lower() def show_discussion_result(self, discussion_result: Dict): """显示讨论结果 - 修复未定义字段问题""" print("\n" + "=" * 80) print("临床多智能体讨论汇总报告") print("=" * 80) # === 修复:安全地获取字段,提供默认值 === # 检查结果状态 status = discussion_result.get("status", "completed") if status == "interrupted": print("讨论被用户中断") metadata = discussion_result.get("metadata", {}) rounds_completed = metadata.get("rounds_completed", 0) print(f"已完成轮次: {rounds_completed}") return elif status == "error": error_msg = discussion_result.get("error", "未知错误") print(f"讨论过程中出现错误: {error_msg}") return # === 修复:安全获取metadata === metadata = discussion_result.get('metadata', {}) print(f"讨论ID: {metadata.get('discussion_id', '未知')}") print(f"参与智能体: {', '.join(metadata.get('agents_used', []))}") print(f"讨论轮数: {metadata.get('rounds', 0)}") print(f"生成时间: {metadata.get('timestamp', '未知')}") print("-" * 80) # === 修复:安全获取clinical_summary === clinical_summary = discussion_result.get('clinical_summary', {}) if clinical_summary: print("\n临床总结:") print("-" * 40) # 主要诊断 - 支持多种可能的字段名 primary_diagnosis = clinical_summary.get('primary_diagnosis', clinical_summary.get('diagnosis', clinical_summary.get('final_decision', '未知'))) print(f"主要诊断: {primary_diagnosis}") # 治疗方案 - 支持多种数据结构 treatment_plan = clinical_summary.get('treatment_plan', {}) if treatment_plan: print("\n治疗方案:") if isinstance(treatment_plan, dict): for category, plans in treatment_plan.items(): if plans: print(f" {category}:") if isinstance(plans, list): for plan in plans: print(f" • {plan}") else: print(f" • {plans}") elif isinstance(treatment_plan, list): for plan in treatment_plan: print(f" • {plan}") elif isinstance(treatment_plan, str): print(f" {treatment_plan}") # === 修复:安全获取质量评估 === quality_assessment = discussion_result.get('evaluation_metrics', {}) if quality_assessment: print("\n质量评估:") for metric, score in quality_assessment.items(): if isinstance(score, (int, float)): print(f" {metric}: {score}/100") # === 修复:安全获取讨论统计 === discussion_process = discussion_result.get('discussion_process', {}) discussion_log = discussion_process.get('discussion_log', []) if discussion_log: total_contributions = 0 for round_data in discussion_log: contributions = round_data.get('contributions', []) total_contributions += len(contributions) print(f"\n讨论统计: 共{len(discussion_log)}轮,{total_contributions}次发言") print("\n" + "=" * 80) def run_discussion(self, agent_names: List[str], medical_record: str, question: str): """运行讨论 - 修复导出方法调用""" self.print_header("讨论进行中") print("正在初始化讨论环境...") # 创建讨论引擎 discussion_engine = ClinicalDiscussionEngine( args=self.get_model_args(), user_session={ 'session_id': self.current_session, 'user_id': self.current_user['user_id'] }, interface=self.cli_interface ) if hasattr(discussion_engine, 'set_discussion_config'): discussion_engine.set_discussion_config(self.discussion_config) discussion_engine.discussion_config = self.discussion_config discussion_engine.max_rounds = self.discussion_config['rounds'] discussion_engine.initialize_discussion(medical_record, question, agent_names) print("讨论开始...") print(f"病历信息: {medical_record}") print(f"讨论问题: {question}") print(f"参与智能体: {', '.join(agent_names)}") print(f"计划讨论轮数: {self.discussion_config['rounds']}") print("-" * 60) # 执行讨论 self.logger.info(f"开始讨论,参与智能体: {agent_names}") discussion_result = discussion_engine.start_discussion() # 调试:打印结果结构 self.logger.info(f"讨论结果类型: {type(discussion_result)}") if isinstance(discussion_result, dict): self.logger.info(f"讨论结果键: {list(discussion_result.keys())}") # 保存讨论结果 if self.discussion_config['auto_save'] and isinstance(discussion_result, dict): try: # 构建完整的数据结构 save_data = { "agents": agent_names, "rounds": self.discussion_config['rounds'], "medical_record": medical_record, "question": question, "log": discussion_result.get("discussion_process", {}).get("discussion_log", []), "summary": discussion_result.get("clinical_summary", {}), "interventions": discussion_result.get("discussion_process", {}).get("user_interventions", []), "metrics": discussion_result.get("evaluation_metrics", {}), "metadata": discussion_result.get("metadata", { "discussion_id": str(uuid.uuid4())[:8], "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"), "agents_used": agent_names, "rounds": self.discussion_config['rounds'] }), "medical_context": discussion_result.get("medical_context", { "medical_record": medical_record, "question": question }), "discussion_process": discussion_result.get("discussion_process", { "discussion_log": discussion_result.get("discussion_process", {}).get("discussion_log", []), "user_interventions": discussion_result.get("discussion_process", {}).get("user_interventions", []) }) } discussion_id = self.discussion_storage.save_discussion( self.current_user['user_id'], save_data ) discussion_result['discussion_id'] = discussion_id self.logger.info(f"讨论记录已保存,ID: {discussion_id}") except Exception as e: self.logger.error(f"保存讨论记录失败: {e}") discussion_result['save_error'] = str(e) # 显示讨论结果 self.show_discussion_result(discussion_result) # === 修复:调用正确的导出方法 === self.handle_discussion_export(discussion_result) self.current_discussion = discussion_result self.wait_for_enter("讨论结束,按回车键返回主菜单...") def handle_discussion_export(self, discussion_result: Dict): """处理讨论导出 - 新增方法,替代export_discussion_result""" if not self.cli_interface.confirm_action("是否导出讨论结果?"): return # 使用现有的导出逻辑 export_formats = [ {"name": "JSON格式", "value": "json"}, {"name": "Word文档", "value": "docx"}, {"name": "HTML标准版", "value": "html"}, {"name": "HTML简洁版", "value": "simple_html"}, {"name": "文本文件", "value": "txt"} ] format_names = [fmt["name"] for fmt in export_formats] selected_formats = self.cli_interface.select_from_list( format_names, "请选择导出格式(可多选):", allow_multiple=True ) if not selected_formats: print("取消导出。") return # 获取导出路径 default_filename = f"clinical_discussion_{datetime.now().strftime('%Y%m%d_%H%M%S')}" export_base_path = self.cli_interface.get_user_input( f"导出路径(默认: ./exports/{default_filename}): ", required=False ) if not export_base_path: export_base_path = f"./exports/{default_filename}" # 确保导出目录存在 os.makedirs(os.path.dirname(export_base_path), exist_ok=True) success_exports = [] failed_exports = [] try: for format_name in selected_formats: format_value = next(fmt["value"] for fmt in export_formats if fmt["name"] == format_name) try: export_file = self.discussion_storage.export_discussion( discussion_result, format_value, export_base_path + f".{format_value}" ) success_exports.append((format_name, export_file)) print(f"✅ 成功导出为 {format_name}: {export_file}") except Exception as e: failed_exports.append((format_name, str(e))) print(f"❌ 导出 {format_name} 失败: {e}") # 显示导出结果汇总 print("\n" + "="*50) print("导出结果汇总:") print("="*50) if success_exports: print("✅ 成功导出的格式:") for format_name, filepath in success_exports: print(f" - {format_name}: {filepath}") if failed_exports: print("❌ 导出失败的格式:") for format_name, error in failed_exports: print(f" - {format_name}: {error}") self.wait_for_enter() except Exception as e: print(f"❌ 导出过程中发生未知错误: {e}") self.wait_for_enter() def prompt_for_intervention_with_timeout(self, prompt: str, timeout: int = 5) -> Optional[str]: """ 带超时的用户介入提示 """ import threading from queue import Queue # 先显示提示信息 print(f"\n{prompt}", end='', flush=True) result_queue = Queue() def input_with_timeout(): try: user_input = input() # 直接获取输入 result_queue.put(user_input) except: result_queue.put(None) # 创建输入线程 input_thread = threading.Thread(target=input_with_timeout) input_thread.daemon = True input_thread.start() # 等待输入或超时 input_thread.join(timeout) if not result_queue.empty(): return result_queue.get() else: print(f"\n⏰ {timeout}秒超时,继续自动讨论...") return None def get_structured_intervention_prompt(self) -> Dict[str, Any]: """获取结构化的介入提示选项""" print((" - 1 向特定智能体提问 (q); 2. 向所有智能体提问 (a); 3. 补充病例信息 (i); 4. 跳过当前轮次 (s); 5. 终止讨论 (x); 6. 继续自动讨论 (回车) ")) # 直接显示提示,不等待 print("请选择介入方式: ", end='', flush=True) try: choice = input().strip().lower() except: choice = "" intervention_map = { '1': 'question_to_agent', 'q': 'question_to_agent', '2': 'broadcast_question', 'a': 'broadcast_question', '3': 'add_information', 'i': 'add_information', '4': 'skip_round', 's': 'skip_round', '5': 'interrupt', 'x': 'interrupt' } intervention_type = intervention_map.get(choice) if intervention_type: return self._get_intervention_details(intervention_type) return None def _get_intervention_details(self, intervention_type: str) -> Dict[str, Any]: """获取介入详细信息 - 立即提示输入""" try: if intervention_type == 'question_to_agent': print("请选择目标智能体: ", end='', flush=True) # 显示可用智能体列表 available_agents = list(self.agents.keys()) for i, agent in enumerate(available_agents, 1): print(f"{i}. {agent}") print("请输入智能体编号或名称: ", end='', flush=True) agent_input = input().strip() # 尝试解析为编号或名称 if agent_input.isdigit() and 1 <= int(agent_input) <= len(available_agents): target_agent = available_agents[int(agent_input) - 1] else: target_agent = agent_input print("请输入问题: ", end='', flush=True) question = input().strip() return { 'type': 'question_to_agent', 'target_agent': target_agent, 'question': question } elif intervention_type == 'broadcast_question': print("请输入要向所有智能体提问的问题: ", end='', flush=True) question = input().strip() return { 'type': 'broadcast_question', 'question': question } elif intervention_type == 'add_information': print("请输入要补充的病例信息: ", end='', flush=True) information = input().strip() return { 'type': 'add_information', 'information': information } elif intervention_type == 'skip_round': return {'type': 'skip_round'} elif intervention_type == 'interrupt': return {'type': 'interrupt'} except Exception as e: self.logger.error(f"获取介入详情失败: {e}") return None def view_discussion_history(self): """查看历史讨论""" self.print_header("历史讨论记录") try: discussions = self.discussion_storage.get_user_discussions(self.current_user['user_id']) if not discussions: print("暂无讨论记录。") self.wait_for_enter() return # 格式化讨论列表显示 discussion_list = [] for disc in discussions: discussion_list.append({ 'id': disc['metadata']['discussion_id'], 'date': disc['metadata']['timestamp'], 'question': disc['medical_context']['question'][:50] + '...' if len(disc['medical_context']['question']) > 50 else disc['medical_context']['question'], 'agents': ', '.join(disc['metadata']['agents_used'][:3]) + ('...' if len(disc['metadata']['agents_used']) > 3 else '') }) while True: # 显示讨论列表 print("历史讨论记录:") print("-" * 80) print(f"{'序号':<4} {'日期':<12} {'问题':<30} {'参与智能体':<30}") print("-" * 80) for i, disc in enumerate(discussion_list, 1): print(f"{i:<4} {disc['date']:<12} {disc['question']:<30} {disc['agents']:<30}") print("-" * 80) print("\n操作选项:") print("1. 查看详细记录") print("2. 导出讨论记录") print("3. 删除讨论记录") print("4. 返回主菜单") print() choice = self.cli_interface.get_user_input("请选择操作: ", required=False) if choice == "1": self.view_discussion_details(discussions) elif choice == "2": self.export_discussion_batch(discussions) elif choice == "3": self.delete_discussions(discussions) elif choice == "4": break else: print("无效选择,请重新输入。") self.wait_for_enter() except Exception as e: print(f"获取历史记录失败: {e}") self.wait_for_enter() def view_discussion_details(self, discussions: List[Dict]): """查看讨论详情""" discussion_ids = [disc['discussion_id'] for disc in discussions] selected_id = self.cli_interface.select_from_list( discussion_ids, "请选择要查看的讨论记录:", allow_multiple=False ) if selected_id: discussion_id = selected_id[0] discussion = next(disc for disc in discussions if disc['discussion_id'] == discussion_id) self.show_discussion_result(discussion) self.wait_for_enter() def export_discussion_batch(self, discussions: List[Dict]): """批量导出讨论记录""" discussion_ids = [disc['discussion_id'] for disc in discussions] selected_ids = self.cli_interface.select_from_list( discussion_ids, "请选择要导出的讨论记录(可多选):", allow_multiple=True ) if not selected_ids: return selected_discussions = [disc for disc in discussions if disc['discussion_id'] in selected_ids] export_formats = ["json", "docx", "html", "simple_html", "txt"] format_choice = self.cli_interface.select_from_list( export_formats, "请选择导出格式:", allow_multiple=False ) if format_choice: export_format = format_choice[0] export_path = self.cli_interface.get_user_input("导出路径: ", required=False) or "./exports/batch_export" try: export_files = self.discussion_storage.export_discussions_batch( selected_discussions, export_format, export_path ) print(f"成功导出 {len(export_files)} 个文件。") for file in export_files: print(f" - {file}") self.wait_for_enter() except Exception as e: print(f"批量导出失败: {e}") self.wait_for_enter() def delete_discussions(self, discussions: List[Dict]): """删除讨论记录""" discussion_ids = [disc['discussion_id'] for disc in discussions] selected_ids = self.cli_interface.select_from_list( discussion_ids, "请选择要删除的讨论记录(可多选):", allow_multiple=True ) if not selected_ids: return if self.cli_interface.confirm_action(f"确定要删除选中的 {len(selected_ids)} 条讨论记录吗?此操作不可恢复。"): try: deleted_count = self.user_data_manager.delete_discussions( self.current_user['user_id'], selected_ids ) print(f"成功删除 {deleted_count} 条讨论记录。") self.wait_for_enter() except Exception as e: print(f"删除失败: {e}") self.wait_for_enter() def manage_agents(self): """管理智能体 - 增强版本""" self.print_header("智能体管理") while True: available_agents = self.agent_registry.get_available_agents(self.current_session) builtin_agents = self.agent_registry.get_builtin_agents() custom_agents = self.agent_registry.get_custom_agents(self.current_session) print("当前可用智能体:") print("-" * 60) # 显示内置智能体(按分类分组) print("📚 内置智能体:") categorized_agents = self.agent_registry.get_agents_by_category(self.current_session) for category, agents in categorized_agents.items(): if any(agent.get('is_builtin', False) for agent in agents): print(f" 📂 {category}:") for agent in agents: if agent.get('is_builtin', False): print(f" • {agent['name']} - {agent['description']}") # 显示自定义智能体 print("\n🎯 自定义智能体:") if custom_agents: for agent_name, agent_info in custom_agents.items(): print(f" • {agent_name} - {agent_info.get('description', '自定义智能体')}") else: print(" 暂无自定义智能体") print("-" * 60) print("\n🛠️ 操作选项:") print("1. 添加自定义智能体") print("2. 编辑自定义智能体") print("3. 删除自定义智能体") print("4. 搜索智能体") print("5. 查看智能体详情") print("6. 返回主菜单") print() choice = self.cli_interface.get_user_input("请选择操作: ", required=False) if choice == "1": self.add_custom_agent() elif choice == "2": self.edit_custom_agent(custom_agents) elif choice == "3": self.delete_custom_agent(custom_agents) elif choice == "4": self.search_agents(available_agents) elif choice == "5": self.view_agent_details(available_agents) elif choice == "6": break else: print("无效选择,请重新输入。") self.wait_for_enter() def add_custom_agent(self): """添加自定义智能体 - 增强版本""" self.print_header("添加自定义智能体") print("🎯 创建新的自定义智能体") print("=" * 50) agent_name = self.cli_interface.get_user_input("智能体名称: ") # 检查名称是否已存在 available_agents = self.agent_registry.get_available_agents(self.current_session) if agent_name in available_agents: print("❌ 该名称已存在,请使用其他名称。") self.wait_for_enter() return print("\n📝📝 请输入智能体的专业描述:") description = self.cli_interface.get_user_input("智能体描述: ") print("\n请输入智能体的专业提示词:") print("(这将决定智能体的专业领域和行为方式)") print("提示:可以描述智能体的专业背景、分析风格、回答特点等") agent_prompt = self.cli_interface.get_multiline_input("智能体提示词:") if not agent_prompt.strip(): print("❌ 提示词不能为空。") self.wait_for_enter() return # 选择分类 categories = ["内科", "外科", "医技科", "药学", "辅助科室", "自定义"] print("\n📂 请选择分类:") for i, category in enumerate(categories, 1): print(f" {i}. {category}") category_choice = self.cli_interface.get_user_input(f"选择分类 (1-{len(categories)}): ", required=False) try: category_index = int(category_choice) - 1 category = categories[category_index] if 0 <= category_index < len(categories) else "自定义" except: category = "自定义" try: success = self.agent_registry.create_custom_agent( self.current_session, agent_name, agent_prompt, description=description, category=category ) if success: print(f"✅ 成功添加自定义智能体: {agent_name}") print(f"📂 分类: {category}") print(f"📝 描述: {description}") self.logger.info(f"用户 {self.current_user['username']} 添加自定义智能体: {agent_name}") else: print("❌ 添加智能体失败") self.wait_for_enter() except Exception as e: print(f"❌ 添加智能体失败: {e}") self.wait_for_enter() def search_agents(self, available_agents: Dict): """搜索智能体""" self.print_header("搜索智能体") query = self.cli_interface.get_user_input("请输入搜索关键词: ", required=False) if not query: return results = self.agent_registry.search_agents(query, self.current_session) if results: print(f"🔍 找到 {len(results)} 个相关智能体:") print("-" * 60) for i, result in enumerate(results, 1): type_icon = "📚" if result.get('is_builtin') else "🎯" print(f"{i}. {type_icon} {result['name']}") print(f" 分类: {result.get('category', '未知')}") print(f" 描述: {result.get('description', '')}") print() else: print("❌ 未找到相关智能体") self.wait_for_enter() def delete_custom_agent(self, custom_agents: Dict): """删除自定义智能体""" if not custom_agents: print("当前没有自定义智能体可删除。") self.wait_for_enter() return agent_names = list(custom_agents.keys()) selected_agents = self.cli_interface.select_from_list( agent_names, "请选择要删除的自定义智能体(可多选):", allow_multiple=True ) if not selected_agents: return if self.cli_interface.confirm_action(f"确定要删除选中的 {len(selected_agents)} 个自定义智能体吗?"): try: for agent_name in selected_agents: self.agent_registry.delete_custom_agent( self.current_session, agent_name ) print(f"已删除智能体: {agent_name}") self.logger.info(f"用户 {self.current_user['username']} 删除自定义智能体: {selected_agents}") self.wait_for_enter() except Exception as e: print(f"删除智能体失败: {e}") self.wait_for_enter() def view_agent_details(self, available_agents: Dict): """查看智能体详情""" agent_names = list(available_agents.keys()) selected_agent = self.cli_interface.select_from_list( agent_names, "请选择要查看的智能体:", allow_multiple=False ) if not selected_agent: return agent_name = selected_agent[0] agent_info = available_agents[agent_name] self.print_header(f"智能体详情 - {agent_name}") print(f"类型: {'内置' if agent_name in self.agent_registry.get_builtin_agents() else '自定义'}") print(f"专业领域: {agent_info.get('specialty', '未指定')}") print("\n提示词:") print("-" * 60) print(agent_info.get('prompt', '无提示词信息')) print("-" * 60) self.wait_for_enter() def system_settings(self): """系统设置""" self.print_header("系统设置") print("当前系统设置:") print(f"1. 默认讨论轮数: {self.config.DEFAULT_ROUNDS}") print(f"2. 最大自定义智能体数: {self.config.MAX_CUSTOM_AGENTS}") print(f"3. 会话超时时间: {self.config.SESSION_TIMEOUT} 秒") print(f"4. 默认导出格式: {self.config.DEFAULT_EXPORT_FORMAT}") print() if self.current_user.get('role') == 'admin': print("管理员选项:") print("5. 修改系统设置") print("6. 查看系统日志") print("7. 管理所有用户") print() print("8. 返回主菜单") print() choice = self.cli_interface.get_user_input("请选择操作: ", required=False) if choice == "1" and self.current_user.get('role') == 'admin': self.change_default_rounds() elif choice == "2" and self.current_user.get('role') == 'admin': self.change_max_custom_agents() elif choice == "3" and self.current_user.get('role') == 'admin': self.change_session_timeout() elif choice == "4" and self.current_user.get('role') == 'admin': self.change_default_export_format() elif choice == "5" and self.current_user.get('role') == 'admin': print("当前系统设置:") print(f"1. 模型引擎: {self.config.model.engine}") print(f"2. API端点: {self.config.model.api_base}") print(f"3. 模型名称: {self.config.model.model_name}") print(f"4. 温度参数: {self.config.model.temperature}") print("\n模型配置管理:") print("5. 重新加载模型配置") print("6. 测试模型连接") print("7. 编辑模型配置文件") choice = self.cli_interface.get_user_input("请选择操作: ", required=False) if choice == "5" and self.current_user.get('role') == 'admin': self.reload_model_config() elif choice == "6": self.test_model_connection() elif choice == "7" and self.current_user.get('role') == 'admin': self.edit_model_config() elif choice == "6" and self.current_user.get('role') == 'admin': self.view_system_logs() elif choice == "7" and self.current_user.get('role') == 'admin': self.manage_all_users() elif choice == "8": # 返回主菜单 pass else: print("无权限或无效选择。") self.wait_for_enter() def change_default_rounds(self): """修改默认讨论轮数""" new_rounds = self.get_user_input(f"新的默认讨论轮数 ({self.config.DEFAULT_ROUNDS}): ") try: new_rounds = int(new_rounds) if 1 <= new_rounds <= 10: self.config.DEFAULT_ROUNDS = new_rounds self.config.save() print("默认讨论轮数已更新。") else: print("讨论轮数必须在1-10之间。") except ValueError: print("请输入有效的数字。") self.wait_for_enter() def reload_model_config(self): """重新加载模型配置""" try: self.config = ClinicalConfig.reload_config() print("✅ 模型配置已重新加载") except Exception as e: print(f"❌ 重新加载配置失败: {e}") def test_model_connection(self): """测试模型连接""" print("测试模型连接...") # 实现测试逻辑 pass def edit_model_config(self): """编辑模型配置文件""" config_file = "model_config.json" if not os.path.exists(config_file): # 创建默认配置文件 self.create_default_model_config() print(f"编辑配置文件: {config_file}") # 可以调用系统编辑器或提供编辑界面 pass def change_max_custom_agents(self): """修改最大自定义智能体数""" new_max = self.get_user_input(f"新的最大自定义智能体数 ({self.config.MAX_CUSTOM_AGENTS}): ") try: new_max = int(new_max) if new_max >= 1: self.config.MAX_CUSTOM_AGENTS = new_max self.config.save() print("最大自定义智能体数已更新。") else: print("数值必须大于0。") except ValueError: print("请输入有效的数字。") self.wait_for_enter() def change_session_timeout(self): """修改会话超时时间""" new_timeout = self.get_user_input(f"新的会话超时时间(秒) ({self.config.SESSION_TIMEOUT}): ") try: new_timeout = int(new_timeout) if new_timeout >= 60: # 至少1分钟 self.config.SESSION_TIMEOUT = new_timeout self.config.save() print("会话超时时间已更新。") else: print("超时时间必须至少60秒。") except ValueError: print("请输入有效的数字。") self.wait_for_enter() def change_llm_api(self): """修改LLM API地址""" new_api = self.get_user_input(f"新的LLM API地址 ({self.config.LLM_API_BASE}): ") if new_api: self.config.LLM_API_BASE = new_api self.config.save() print("LLM API地址已更新。") self.wait_for_enter() def change_default_export_format(self): """修改默认导出格式""" formats = ["json", "docx", "txt", "pdf"] print("可选格式: " + ", ".join(formats)) new_format = self.get_user_input(f"新的默认导出格式 ({self.config.DEFAULT_EXPORT_FORMAT}): ") if new_format and new_format.lower() in formats: self.config.DEFAULT_EXPORT_FORMAT = new_format.lower() self.config.save() print("默认导出格式已更新。") self.wait_for_enter() def view_system_logs(self): """查看系统日志""" self.print_header("系统日志") log_files = self.get_log_files() if not log_files: print("暂无日志文件。") self.wait_for_enter() return selected_file = self.cli_interface.select_from_list( log_files, "请选择要查看的日志文件:", allow_multiple=False ) if selected_file: try: with open(selected_file[0], 'r', encoding='utf-8') as f: content = f.read() print(f"日志文件: {selected_file[0]}") print("-" * 80) print(content) print("-" * 80) self.wait_for_enter() except Exception as e: print(f"读取日志文件失败: {e}") self.wait_for_enter() def get_log_files(self): """获取日志文件列表""" log_dir = "logs" if not os.path.exists(log_dir): return [] log_files = [] for file in os.listdir(log_dir): if file.endswith(".log"): log_files.append(os.path.join(log_dir, file)) return sorted(log_files, reverse=True) # 最新的在前 def manage_all_users(self): """管理所有用户""" self.print_header("用户管理") try: all_users = self.user_manager.get_all_users() print("所有用户列表:") print("-" * 80) print(f"{'用户名':<15} {'真实姓名':<15} {'科室':<15} {'角色':<10} {'最后登录':<20}") print("-" * 80) for user in all_users: print(f"{user['username']:<15} {user.get('full_name', ''):<15} " f"{user.get('department', ''):<15} {user.get('role', 'user'):<10} " f"{user.get('last_login', '从未登录'):<20}") print("-" * 80) print("\n操作选项:") print("1. 查看用户详情") print("2. 修改用户角色") print("3. 重置用户密码") print("4. 删除用户") print("5. 返回系统设置") print() choice = self.cli_interface.get_user_input("请选择操作: ", required=False) if choice == "1": self.view_user_details(all_users) elif choice == "2": self.change_user_role(all_users) elif choice == "3": self.reset_user_password(all_users) elif choice == "4": self.delete_user(all_users) elif choice == "5": return else: print("无效选择,请重新输入。") self.wait_for_enter() except Exception as e: print(f"获取用户列表失败: {e}") self.wait_for_enter() def view_user_details(self, users: List[Dict]): """查看用户详情""" usernames = [user['username'] for user in users] selected_user = self.cli_interface.select_from_list( usernames, "请选择要查看的用户:", allow_multiple=False ) if not selected_user: return username = selected_user[0] user_info = next(user for user in users if user['username'] == username) self.print_header(f"用户详情 - {username}") print(f"用户名: {user_info['username']}") print(f"真实姓名: {user_info.get('full_name', '未设置')}") print(f"科室: {user_info.get('department', '未设置')}") print(f"角色: {user_info.get('role', 'user')}") print(f"创建时间: {user_info.get('created_at', '未知')}") print(f"最后登录: {user_info.get('last_login', '从未登录')}") print(f"讨论记录数: {user_info.get('discussion_count', 0)}") self.wait_for_enter() def change_user_role(self, users: List[Dict]): """修改用户角色""" usernames = [user['username'] for user in users] selected_user = self.cli_interface.select_from_list( usernames, "请选择要修改角色的用户:", allow_multiple=False ) if not selected_user: return username = selected_user[0] current_role = next(user['role'] for user in users if user['username'] == username) roles = ["user", "admin"] new_role = self.cli_interface.select_from_list( roles, f"请选择新角色 (当前: {current_role}):", allow_multiple=False ) if new_role and new_role[0] != current_role: try: self.user_manager.change_user_role(username, new_role[0]) print(f"用户 {username} 的角色已从 {current_role} 改为 {new_role[0]}。") self.logger.info(f"管理员 {self.current_user['username']} 修改用户 {username} 角色为 {new_role[0]}") except Exception as e: print(f"修改角色失败: {e}") self.wait_for_enter() def reset_user_password(self, users: List[Dict]): """重置用户密码""" usernames = [user['username'] for user in users] selected_user = self.cli_interface.select_from_list( usernames, "请选择要重置密码的用户:", allow_multiple=False ) if not selected_user: return username = selected_user[0] if self.cli_interface.confirm_action(f"确定要重置用户 {username} 的密码吗?"): new_password = self.get_user_input("请输入新密码: ", password=True) confirm_password = self.get_user_input("请确认新密码: ", password=True) if new_password != confirm_password: print("密码不一致,重置取消。") self.wait_for_enter() return try: self.user_manager.reset_user_password(username, new_password) print(f"用户 {username} 的密码已重置。") self.logger.info(f"管理员 {self.current_user['username']} 重置用户 {username} 的密码") except Exception as e: print(f"重置密码失败: {e}") self.wait_for_enter() def delete_user(self, users: List[Dict]): """删除用户""" usernames = [user['username'] for user in users if user['username'] != self.current_user['username']] if not usernames: print("没有其他用户可删除。") self.wait_for_enter() return selected_user = self.cli_interface.select_from_list( usernames, "请选择要删除的用户:", allow_multiple=False ) if not selected_user: return username = selected_user[0] if self.cli_interface.confirm_action(f"确定要删除用户 {username} 吗?此操作不可恢复。"): try: self.user_manager.delete_user(username) print(f"用户 {username} 已删除。") self.logger.info(f"管理员 {self.current_user['username']} 删除用户 {username}") except Exception as e: print(f"删除用户失败: {e}") self.wait_for_enter() def user_information(self): """用户信息""" self.print_header("用户信息") print(f"用户名: {self.current_user['username']}") print(f"真实姓名: {self.current_user.get('full_name', '未设置')}") print(f"科室: {self.current_user.get('department', '未设置')}") print(f"角色: {self.current_user.get('role', 'user')}") print(f"注册时间: {self.current_user.get('created_at', '未知')}") print(f"最后登录: {self.current_user.get('last_login', '未知')}") # 获取用户的讨论统计 try: discussion_count = self.user_data_manager.get_user_discussion_count(self.current_user['user_id']) print(f"讨论记录数: {discussion_count}") except: print("讨论记录数: 无法获取") print("\n操作选项:") print("1. 修改个人信息") print("2. 修改密码") print("3. 返回主菜单") print() choice = self.cli_interface.get_user_input("请选择操作: ", required=False) if choice == "1": self.update_profile() elif choice == "2": self.change_password() elif choice == "3": return else: print("无效选择,请重新输入。") self.wait_for_enter() def update_profile(self): """修改个人信息""" self.print_header("修改个人信息") current_full_name = self.current_user.get('full_name', '') current_department = self.current_user.get('department', '') new_full_name = self.get_user_input(f"真实姓名 ({current_full_name}): ", required=False) new_department = self.get_user_input(f"科室 ({current_department}): ", required=False) if not new_full_name and not new_department: print("未修改任何信息。") self.wait_for_enter() return try: updates = {} if new_full_name: updates['full_name'] = new_full_name if new_department: updates['department'] = new_department self.user_manager.update_user_profile(self.current_user['user_id'], updates) # 更新当前会话中的用户信息 if new_full_name: self.current_user['full_name'] = new_full_name if new_department: self.current_user['department'] = new_department print("个人信息已更新。") self.logger.info(f"用户 {self.current_user['username']} 更新个人信息") except Exception as e: print(f"更新个人信息失败: {e}") self.wait_for_enter() def change_password(self): """修改密码""" self.print_header("修改密码") current_password = self.get_user_input("当前密码: ", password=True) # 验证当前密码 if not self.user_manager.verify_password(self.current_user['user_id'], current_password): print("当前密码不正确。") self.wait_for_enter() return new_password = self.get_user_input("新密码: ", password=True) confirm_password = self.get_user_input("确认新密码: ", password=True) if new_password != confirm_password: print("新密码不一致,修改取消。") self.wait_for_enter() return try: self.user_manager.change_user_password(self.current_user['user_id'], new_password) print("密码已修改。") self.logger.info(f"用户 {self.current_user['username']} 修改密码") except Exception as e: print(f"修改密码失败: {e}") self.wait_for_enter() def run(self): """运行CLI主循环""" try: # 显示欢迎信息 self.clear_screen() print("=" * 60) print(" 欢迎使用临床多智能体讨论系统") print("=" * 60) print() self.wait_for_enter("按回车键开始...") # 用户认证 if not self.authenticate_user(): return # 主菜单循环 self.show_main_menu() except KeyboardInterrupt: print("\n\n感谢使用临床多智能体讨论系统!") except Exception as e: print(f"系统错误: {e}") self.logger.error(f"系统错误: {e}") self.wait_for_enter() class CLIInterface: """CLI接口适配器 - 统一输入处理""" def __init__(self, cli_instance): self.cli = cli_instance def get_user_input(self, prompt: str = "", required: bool = True, password: bool = False) -> str: """获取用户输入""" while True: try: if password: user_input = getpass.getpass(prompt) else: user_input = input(prompt).strip() if required and not user_input: print("请输入:") continue return user_input except KeyboardInterrupt: print("\n\n操作已取消。") sys.exit(0) except Exception as e: print(f"输入错误: {e}") continue def get_multiline_input(self, prompt: str) -> str: """获取多行输入""" print(prompt) print("(输入空行结束输入)") lines = [] while True: try: line = input() if line.strip() == "": break lines.append(line) except EOFError: break except KeyboardInterrupt: return "" return "\n".join(lines) def confirm_action(self, prompt: str) -> bool: """确认操作""" response = self.get_user_input(f"{prompt} (y/n): ", required=False) return response and response.lower() in ['y', 'yes', '是', '确认'] def select_from_list(self, items: List, prompt: str, allow_multiple: bool = False) -> List: """从列表中选择项目""" if not items: print("暂无选项可用。") return [] print(f"\n{prompt}") print("-" * 40) for i, item in enumerate(items, 1): if isinstance(item, dict): display_text = item.get('name', str(item)) else: display_text = str(item) print(f"{i}. {display_text}", end=" ") if i % 6 == 0: print() print() if allow_multiple: print(f"{len(items) + 1}. 全选") print(f"{len(items) + 2}. 取消选择") print("\n提示:可以输入多个编号,用空格、逗号或中文逗号分隔") else: print(f"{len(items) + 1}. 返回上级") print("-" * 40) while True: try: choice = self.get_user_input("请选择编号: ", required=False) if not choice: return [] if allow_multiple: if choice.lower() == 'all' or choice == str(len(items) + 1): return items if choice.lower() in ['cancel', '取消'] or choice == str(len(items) + 2): return [] # 支持多种分隔符 import re choice = re.sub(r'[,\s]+', ',', choice) choice = choice.strip(',') if ',' in choice: indices = [] for part in choice.split(','): part = part.strip() if part.isdigit(): indices.append(int(part)) elif '-' in part: range_parts = part.split('-') if len(range_parts) == 2 and range_parts[0].isdigit() and range_parts[1].isdigit(): start, end = int(range_parts[0]), int(range_parts[1]) indices.extend(range(start, end + 1)) else: indices = [int(choice)] if choice.isdigit() else [] valid_indices = [i for i in indices if 1 <= i <= len(items)] if valid_indices: selected_items = [] for i in valid_indices: if i <= len(items): selected_items.append(items[i-1]) return selected_items else: print("无效的选择,请重新输入。") else: index = int(choice) if 1 <= index <= len(items): return [items[index-1]] elif index == len(items) + 1: return [] else: print("无效的选择,请重新输入。") except ValueError: print("请输入有效的数字。") except KeyboardInterrupt: return [] def has_user_input(self, timeout: float = 0) -> bool: """检查是否有用户输入 - 简化实现""" # 在实际应用中可以实现真正的非阻塞检查 return False def should_prompt_for_intervention(self) -> bool: """是否应该提示用户介入""" return False def main(): """主函数""" try: cli = ClinicalCLI() cli.run() except Exception as e: print(f"程序启动失败: {e}") import traceback traceback.print_exc() if __name__ == "__main__": main()