import os import sys from tqdm import tqdm from core.text_processor import TextProcessor from core.character_extractor import CharacterExtractor from core.character_analyzer import CharacterAnalyzer from core.character_agent import CharacterAgent from utils.text_utils import TextUtils from utils.cache_manager import CacheManager from config import Config def print_banner(): """打印欢迎横幅""" banner = """ ╔══════════════════════════════════════════════════════════════════╗ ║ ║ ║ 🎭 小说角色 Agent 系统 (大规模文本版) ║ ║ ║ ║ 基于 AI 的角色性格分析与对话系统 ║ ║ ║ ╚══════════════════════════════════════════════════════════════════╝ """ print(banner) def load_novel(file_path: str) -> str: """加载小说文本""" try: # 尝试不同的编码 encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1', 'utf-16'] for encoding in encodings: try: with open(file_path, 'r', encoding=encoding) as f: text = f.read() print(f"✓ 成功使用 {encoding} 编码加载文件") return text except UnicodeDecodeError: continue print("✗ 所有编码尝试失败") return "" except FileNotFoundError: print(f"✗ 文件不存在: {file_path}") return "" except Exception as e: print(f"✗ 加载小说失败: {e}") return "" def display_statistics(stats: dict): """显示文本统计信息""" print("\n" + "="*70) print("📊 文本统计信息") print("="*70) print(f"总字符数: {stats['total_length']:,}") print(f"Token 数量: {stats['total_tokens']:,}") print(f"段落数: {stats['paragraphs']:,}") print(f"句子数: {stats['sentences']:,}") lang_map = {'zh': '中文', 'en': '英文', 'mixed': '中英混合', 'unknown': '未知'} print(f"检测语言: {lang_map.get(stats['language'], stats['language'])}") # 估算阅读时间 text_utils = TextUtils() reading_time = text_utils.estimate_reading_time(" " * stats['total_length']) print(f"预计阅读: 约 {reading_time} 分钟") print("="*70) def select_character_interactive(characters: list) -> dict: """交互式选择角色""" print("\n" + "="*70) print("📋 检测到的主要角色") print("="*70) print(f"{'序号':<6}{'角色名':<25}{'出现次数':<12}{'分布章节':<12}") print("-"*70) for i, char in enumerate(characters[:15], 1): name = char['name'] count = char['info']['count'] chunks = len(char['info']['chunks']) print(f"{i:<6}{name:<25}{count:<12}{chunks:<12}") print("="*70) while True: try: choice = input(f"\n请选择角色编号 (1-{min(15, len(characters))}): ").strip() if choice.isdigit(): idx = int(choice) - 1 if 0 <= idx < len(characters): return characters[idx] print("❌ 无效选择,请重试") except KeyboardInterrupt: print("\n\n👋 程序已退出") sys.exit(0) except: print("❌ 输入错误,请重试") def interactive_chat(agent: CharacterAgent): """交互式对话界面""" print("\n" + "="*70) print(agent.get_character_info()) print("\n💬 对话开始!") print("-"*70) print("💡 提示:") print(" • 输入 'quit' 或 'exit' - 退出对话") print(" • 输入 'reset' - 重置对话历史") print(" • 输入 'save' - 保存对话") print(" • 输入 'info' - 查看角色信息") print(" • 输入 'help' - 显示帮助") print("="*70 + "\n") char_name = agent.character_profile['name'] while True: try: # 用户输入 user_input = input("🧑 你: ").strip() if not user_input: continue # 处理命令 if user_input.lower() in ['quit', 'exit', '退出', 'q']: print(f"\n👋 {char_name}: 再见,很高兴和你聊天!") # 询问是否保存对话 if len(agent.conversation_history) > 0: save = input("\n是否保存对话记录?(y/n): ").strip().lower() if save in ['y', 'yes', '是']: filename = f"conversation_{char_name}_{len(agent.conversation_history)}.json" agent.save_conversation(filename) break if user_input.lower() in ['reset', '重置']: agent.reset_conversation() continue if user_input.lower() in ['save', '保存']: filename = f"conversation_{char_name}_{len(agent.conversation_history)}.json" agent.save_conversation(filename) continue if user_input.lower() in ['info', '信息']: print(agent.get_character_info()) continue if user_input.lower() in ['help', '帮助']: print("\n可用命令:") print(" quit/exit - 退出对话") print(" reset - 重置对话历史") print(" save - 保存对话") print(" info - 查看角色信息") print(" help - 显示此帮助\n") continue # 正常对话 print(f"\n{'⏳ ' + char_name + ' 正在思考...':<70}", end='\r') response = agent.chat(user_input) print(" " * 70, end='\r') # 清除"思考中" print(f"🎭 {char_name}: {response}\n") except KeyboardInterrupt: print(f"\n\n👋 {char_name}: 再见!") break except Exception as e: print(f"\n❌ 错误: {e}\n") def check_environment(): """检查运行环境""" issues = [] # 检查 API Key if not Config.OPENAI_API_KEY or Config.OPENAI_API_KEY == "": issues.append("未设置 OPENAI_API_KEY") # 检查缓存目录 if not os.path.exists(Config.CACHE_DIR): try: os.makedirs(Config.CACHE_DIR) except: issues.append(f"无法创建缓存目录: {Config.CACHE_DIR}") # 检查必要的包 try: import openai import chromadb import tiktoken except ImportError as e: issues.append(f"缺少必要的包: {e}") if issues: print("\n⚠️ 环境检查发现问题:") for issue in issues: print(f" • {issue}") print("\n请检查配置文件 .env 和依赖安装\n") return False return True def main(): """主函数 - 完整流程""" # 打印横幅 print_banner() # 检查环境 if not check_environment(): return # 显示缓存信息 cache = CacheManager() cache_info = cache.get_cache_info() if cache_info['count'] > 0: print(f"📦 缓存: {cache_info['count']} 个文件, {cache_info['size_mb']} MB") # 1. 加载小说 print("\n" + "="*70) print("📖 步骤 1/5: 加载小说") print("="*70) default_path = "sample_novels/harry_potter_sample.txt" novel_path = input(f"\n请输入小说文件路径 (默认: {default_path})\n> ").strip() if not novel_path: novel_path = default_path if not os.path.exists(novel_path): print(f"❌ 文件不存在: {novel_path}") # 尝试在 sample_novels 目录下查找 alt_path = os.path.join("sample_novels", os.path.basename(novel_path)) if os.path.exists(alt_path): print(f"✓ 找到文件: {alt_path}") novel_path = alt_path else: print("程序退出") return print(f"\n正在加载: {novel_path}") novel_text = load_novel(novel_path) if not novel_text: print("❌ 无法加载小说,程序退出") return # 显示统计信息 processor = TextProcessor() stats = processor.get_statistics(novel_text) display_statistics(stats) # 检查文本长度 if stats['total_length'] < 1000: print("⚠️ 警告: 文本过短 (< 1000字符),可能影响分析效果") proceed = input("是否继续?(y/n): ").strip().lower() if proceed not in ['y', 'yes', '是']: return # 2. 文本分块 print("\n" + "="*70) print("📄 步骤 2/5: 文本分块处理") print("="*70) chunks = processor.chunk_text(novel_text) print(f"✓ 文本已分为 {len(chunks)} 个块") print(f" 平均每块: {stats['total_length'] // len(chunks)} 字符") # 3. 提取角色 print("\n" + "="*70) print("👥 步骤 3/5: 提取主要角色") print("="*70) extractor = CharacterExtractor() characters = extractor.extract_main_characters( chunks, text_sample=novel_text[:3000], language=stats['language'] ) if not characters: print("❌ 未能提取到角色,程序退出") return # 4. 选择角色 print("\n" + "="*70) print("🎯 步骤 4/5: 选择要对话的角色") print("="*70) selected = select_character_interactive(characters) character_name = selected['name'] character_info = selected['info'] print(f"\n✓ 已选择: {character_name}") print(f" 出现次数: {character_info['count']}") print(f" 分布章节: {len(character_info['chunks'])}") # 5. 分析角色 print(f"\n" + "="*70) print(f"🧠 步骤 5/5: 分析角色性格") print("="*70) print(f"正在深度分析 {character_name} 的性格特征...") print("这可能需要几分钟,请耐心等待...\n") analyzer = CharacterAnalyzer() # 选择代表性文本块 representative_chunks = analyzer.select_representative_chunks( chunks, character_info['chunks'] ) print(f"✓ 选取了 {len(representative_chunks)} 个代表性片段进行分析") # 执行分析 character_profile = analyzer.analyze_character_batch( character_name, representative_chunks ) # 增强配置 character_profile = analyzer.enhance_profile_with_examples( character_profile, chunks, character_info['chunks'] ) print(f"✓ 角色分析完成!") # 6. 创建对话代理 print("\n" + "="*70) print("🤖 创建对话代理") print("="*70) use_memory = input("\n是否启用记忆系统?(y/n, 默认: y): ").strip().lower() if use_memory in ['', 'y', 'yes', '是']: print("正在初始化记忆系统...") agent = CharacterAgent( character_profile, chunks=chunks, character_chunks=character_info['chunks'] ) print("✓ Agent 创建成功,记忆系统已初始化") else: agent = CharacterAgent(character_profile) print("✓ Agent 创建成功(未启用记忆系统)") # 7. 开始对话 interactive_chat(agent) # 结束 print("\n" + "="*70) print("感谢使用小说角色 Agent 系统!") print("="*70) if __name__ == "__main__": try: main() except KeyboardInterrupt: print("\n\n👋 程序已被用户中断") except Exception as e: print(f"\n❌ 程序错误: {e}") import traceback traceback.print_exc()