|
|
import os |
|
|
import sys |
|
|
from tqdm import tqdm |
|
|
from core.text_processor import TextProcessor |
|
|
from core.character_extractor import CharacterExtractor |
|
|
from core.character_analyzer import CharacterAnalyzer |
|
|
from core.character_agent import CharacterAgent |
|
|
from utils.text_utils import TextUtils |
|
|
from utils.cache_manager import CacheManager |
|
|
from config import Config |
|
|
|
|
|
def print_banner():
    """Write the boxed welcome banner of the CLI to standard output."""
    # The banner is a single multi-line literal; print() adds the final newline.
    print("""
╔══════════════════════════════════════════════════════════════════╗
║ ║
║ 🎭 小说角色 Agent 系统 (大规模文本版) ║
║ ║
║ 基于 AI 的角色性格分析与对话系统 ║
║ ║
╚══════════════════════════════════════════════════════════════════╝
""")
|
|
|
|
|
def load_novel(file_path: str) -> str:
    """Load the text of a novel, trying several encodings in turn.

    The file is decoded with the first candidate encoding that does not raise
    ``UnicodeDecodeError``. Candidates are ordered from strictest to most
    permissive:

    * ``utf-16`` — only attempted when the file starts with a UTF-16 byte
      order mark, because without a BOM almost any even-length byte string
      decodes "successfully" into garbage;
    * ``utf-8-sig`` — UTF-8, transparently dropping an optional BOM;
    * ``gbk`` / ``gb18030`` — the common Chinese legacy encodings;
    * ``latin-1`` — maps every byte to a character and therefore never
      fails, so it must stay last as the catch-all fallback.

    Args:
        file_path: Path of the text file to read.

    Returns:
        The decoded text, or an empty string when the file does not exist,
        cannot be read, or no candidate encoding could decode it. A message
        describing the outcome is printed in every case.
    """
    encodings = ['utf-8-sig', 'gbk', 'gb18030', 'latin-1']

    try:
        # Sniff the first two bytes for a UTF-16 BOM (little or big endian).
        with open(file_path, 'rb') as raw:
            head = raw.read(2)
        if head in (b'\xff\xfe', b'\xfe\xff'):
            encodings.insert(0, 'utf-16')

        for encoding in encodings:
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    text = f.read()
            except UnicodeDecodeError:
                # Wrong guess: move on to the next candidate.
                continue
            print(f"✓ 成功使用 {encoding} 编码加载文件")
            return text

        # Defensive only: latin-1 accepts any input, so this is not expected.
        print("✗ 所有编码尝试失败")
        return ""

    except FileNotFoundError:
        print(f"✗ 文件不存在: {file_path}")
        return ""
    except Exception as e:
        # Best-effort loader: report any other I/O problem and signal
        # failure through the empty return value instead of raising.
        print(f"✗ 加载小说失败: {e}")
        return ""
|
|
|
|
|
def display_statistics(stats: dict):
    """Print a formatted summary of text statistics.

    Args:
        stats: Mapping that provides the keys ``total_length``,
            ``total_tokens``, ``paragraphs``, ``sentences`` (numbers, shown
            with thousands separators) and ``language`` (a language code).
    """
    rule = "=" * 70
    print("\n" + rule)
    print("📊 文本统计信息")
    print(rule)

    # The four numeric counters share one output format.
    counters = (
        ("总字符数", 'total_length'),
        ("Token 数量", 'total_tokens'),
        ("段落数", 'paragraphs'),
        ("句子数", 'sentences'),
    )
    for label, key in counters:
        print(f"{label}: {stats[key]:,}")

    # Known language codes get a readable label; unknown codes print as-is.
    language_labels = {'zh': '中文', 'en': '英文', 'mixed': '中英混合', 'unknown': '未知'}
    print(f"检测语言: {language_labels.get(stats['language'], stats['language'])}")

    # Only the length is available here, so a blank placeholder string of
    # the same length is handed to the reading-time estimator.
    placeholder = " " * stats['total_length']
    estimate = TextUtils().estimate_reading_time(placeholder)
    print(f"预计阅读: 约 {estimate} 分钟")
    print(rule)
|
|
|
|
|
def select_character_interactive(characters: list) -> dict:
    """Show the detected characters and let the user pick one by number.

    At most the first 15 entries are listed, and only the listed entries can
    be chosen, so the accepted range always matches the prompt.

    Args:
        characters: Non-empty list of dicts, each with a ``name`` and an
            ``info`` dict holding ``count`` and ``chunks``.

    Returns:
        The dict of the chosen character.

    Raises:
        ValueError: If ``characters`` is empty (no valid choice could ever
            be entered).
        SystemExit: With code 0 when the user aborts via Ctrl-C or when
            standard input is closed.
    """
    if not characters:
        raise ValueError("characters must not be empty")

    shown = characters[:15]
    rule = "=" * 70

    print("\n" + rule)
    print("📋 检测到的主要角色")
    print(rule)
    print(f"{'序号':<6}{'角色名':<25}{'出现次数':<12}{'分布章节':<12}")
    print("-"*70)

    for i, char in enumerate(shown, 1):
        name = char['name']
        count = char['info']['count']
        chunks = len(char['info']['chunks'])
        print(f"{i:<6}{name:<25}{count:<12}{chunks:<12}")

    print(rule)

    while True:
        try:
            choice = input(f"\n请选择角色编号 (1-{len(shown)}): ").strip()
        except (KeyboardInterrupt, EOFError):
            # EOF means no further input can ever arrive; retrying would
            # spin forever, so it is treated like an explicit interrupt.
            print("\n\n👋 程序已退出")
            sys.exit(0)

        if choice.isdigit():
            try:
                idx = int(choice) - 1
            except ValueError:
                # isdigit() accepts characters such as '²' that int() rejects.
                print("❌ 输入错误,请重试")
                continue
            if 0 <= idx < len(shown):
                return shown[idx]

        print("❌ 无效选择,请重试")
|
|
|
|
|
def interactive_chat(agent: CharacterAgent):
    """Run the interactive chat loop between the user and a character agent.

    Besides free-form messages, which are forwarded to ``agent.chat``, the
    loop understands these commands (case-insensitive):

    * ``quit`` / ``exit`` / ``退出`` / ``q`` — leave the chat, offering to
      save the conversation when the history is not empty;
    * ``reset`` / ``重置`` — call ``agent.reset_conversation``;
    * ``save`` / ``保存`` — save the conversation to a JSON file;
    * ``info`` / ``信息`` — print ``agent.get_character_info()``;
    * ``help`` / ``帮助`` — print the command list.

    The loop ends on a quit command, on Ctrl-C, or when standard input is
    closed. Any other exception is reported and the loop continues.

    Args:
        agent: The character agent to talk to.
    """
    print("\n" + "="*70)
    print(agent.get_character_info())
    print("\n💬 对话开始!")
    print("-"*70)
    print("💡 提示:")
    print(" • 输入 'quit' 或 'exit' - 退出对话")
    print(" • 输入 'reset' - 重置对话历史")
    print(" • 输入 'save' - 保存对话")
    print(" • 输入 'info' - 查看角色信息")
    print(" • 输入 'help' - 显示帮助")
    print("="*70 + "\n")

    char_name = agent.character_profile['name']

    def save_conversation():
        # The file name embeds the character name and the history length.
        filename = f"conversation_{char_name}_{len(agent.conversation_history)}.json"
        agent.save_conversation(filename)

    while True:
        try:
            user_input = input("🧑 你: ").strip()

            if not user_input:
                continue

            command = user_input.lower()

            if command in ['quit', 'exit', '退出', 'q']:
                print(f"\n👋 {char_name}: 再见,很高兴和你聊天!")

                # Only offer to save when there is something to save.
                if len(agent.conversation_history) > 0:
                    save = input("\n是否保存对话记录?(y/n): ").strip().lower()
                    if save in ['y', 'yes', '是']:
                        save_conversation()
                break

            if command in ['reset', '重置']:
                agent.reset_conversation()
                continue

            if command in ['save', '保存']:
                save_conversation()
                continue

            if command in ['info', '信息']:
                print(agent.get_character_info())
                continue

            if command in ['help', '帮助']:
                print("\n可用命令:")
                print(" quit/exit - 退出对话")
                print(" reset - 重置对话历史")
                print(" save - 保存对话")
                print(" info - 查看角色信息")
                print(" help - 显示此帮助\n")
                continue

            # Show a transient "thinking" line, then overwrite it with
            # blanks (carriage return) before printing the reply.
            print(f"\n{'⏳ ' + char_name + ' 正在思考...':<70}", end='\r')
            response = agent.chat(user_input)
            print(" " * 70, end='\r')
            print(f"🎭 {char_name}: {response}\n")

        except (KeyboardInterrupt, EOFError):
            # EOFError must end the loop: once stdin is closed every further
            # input() call fails again, which would otherwise spin forever.
            print(f"\n\n👋 {char_name}: 再见!")
            break
        except Exception as e:
            # Keep the session alive on errors from the agent.
            print(f"\n❌ 错误: {e}\n")
|
|
|
|
|
def check_environment():
    """Check that the runtime environment is ready.

    Three things are verified:

    * ``Config.OPENAI_API_KEY`` is set to a non-empty value;
    * the directory ``Config.CACHE_DIR`` exists or can be created;
    * the packages ``openai``, ``chromadb`` and ``tiktoken`` can be
      imported (each one is checked on its own, so every missing package is
      reported rather than only the first).

    Returns:
        ``True`` when no problem was found; otherwise ``False`` after
        printing the list of problems.
    """
    import importlib

    issues = []

    if not Config.OPENAI_API_KEY:
        issues.append("未设置 OPENAI_API_KEY")

    # exist_ok avoids a race between an existence check and the creation;
    # OSError also covers the path existing as something other than a
    # directory.
    try:
        os.makedirs(Config.CACHE_DIR, exist_ok=True)
    except OSError:
        issues.append(f"无法创建缓存目录: {Config.CACHE_DIR}")

    for package in ('openai', 'chromadb', 'tiktoken'):
        try:
            importlib.import_module(package)
        except ImportError as e:
            issues.append(f"缺少必要的包: {e}")

    if issues:
        print("\n⚠️ 环境检查发现问题:")
        for issue in issues:
            print(f" • {issue}")
        print("\n请检查配置文件 .env 和依赖安装\n")
        return False

    return True
|
|
|
|
|
def _print_section(title):
    """Print *title* framed by two 70-column rules, preceded by a blank line."""
    rule = "=" * 70
    print("\n" + rule)
    print(title)
    print(rule)


def main():
    """Run the complete interactive pipeline.

    Steps, in order: print the banner, check the environment, report cache
    usage, load the novel (step 1), split it into chunks (step 2), extract
    the main characters (step 3), let the user pick one (step 4), analyse
    that character (step 5), create the chat agent and start the
    interactive chat.

    The function returns early, after printing a message, whenever a step
    cannot proceed: failed environment check, missing or unreadable file,
    the user declining to continue with a very short text, no chunks, or no
    characters found.
    """
    print_banner()

    if not check_environment():
        return

    # Report existing cache usage, if any.
    cache = CacheManager()
    cache_info = cache.get_cache_info()
    if cache_info['count'] > 0:
        print(f"📦 缓存: {cache_info['count']} 个文件, {cache_info['size_mb']} MB")

    # ---- Step 1: load the novel -------------------------------------------
    _print_section("📖 步骤 1/5: 加载小说")

    default_path = "sample_novels/harry_potter_sample.txt"
    novel_path = input(f"\n请输入小说文件路径 (默认: {default_path})\n> ").strip()

    if not novel_path:
        novel_path = default_path

    if not os.path.exists(novel_path):
        print(f"❌ 文件不存在: {novel_path}")

        # Fall back to a file of the same name inside sample_novels/.
        alt_path = os.path.join("sample_novels", os.path.basename(novel_path))
        if os.path.exists(alt_path):
            print(f"✓ 找到文件: {alt_path}")
            novel_path = alt_path
        else:
            print("程序退出")
            return

    print(f"\n正在加载: {novel_path}")
    novel_text = load_novel(novel_path)

    if not novel_text:
        print("❌ 无法加载小说,程序退出")
        return

    processor = TextProcessor()
    stats = processor.get_statistics(novel_text)
    display_statistics(stats)

    # Very short texts are allowed, but only after explicit confirmation.
    if stats['total_length'] < 1000:
        print("⚠️ 警告: 文本过短 (< 1000字符),可能影响分析效果")
        proceed = input("是否继续?(y/n): ").strip().lower()
        if proceed not in ['y', 'yes', '是']:
            return

    # ---- Step 2: chunk the text -------------------------------------------
    _print_section("📄 步骤 2/5: 文本分块处理")

    chunks = processor.chunk_text(novel_text)

    # Without chunks nothing can be analysed, and the average below would
    # divide by zero.
    if not chunks:
        print("❌ 文本分块失败,未得到任何文本块,程序退出")
        return

    print(f"✓ 文本已分为 {len(chunks)} 个块")
    print(f" 平均每块: {stats['total_length'] // len(chunks)} 字符")

    # ---- Step 3: extract the main characters ------------------------------
    _print_section("👥 步骤 3/5: 提取主要角色")

    extractor = CharacterExtractor()
    characters = extractor.extract_main_characters(
        chunks,
        text_sample=novel_text[:3000],
        language=stats['language']
    )

    if not characters:
        print("❌ 未能提取到角色,程序退出")
        return

    # ---- Step 4: choose a character ---------------------------------------
    _print_section("🎯 步骤 4/5: 选择要对话的角色")

    selected = select_character_interactive(characters)
    character_name = selected['name']
    character_info = selected['info']

    print(f"\n✓ 已选择: {character_name}")
    print(f" 出现次数: {character_info['count']}")
    print(f" 分布章节: {len(character_info['chunks'])}")

    # ---- Step 5: analyse the character ------------------------------------
    _print_section("🧠 步骤 5/5: 分析角色性格")
    print(f"正在深度分析 {character_name} 的性格特征...")
    print("这可能需要几分钟,请耐心等待...\n")

    analyzer = CharacterAnalyzer()

    representative_chunks = analyzer.select_representative_chunks(
        chunks,
        character_info['chunks']
    )

    print(f"✓ 选取了 {len(representative_chunks)} 个代表性片段进行分析")

    character_profile = analyzer.analyze_character_batch(
        character_name,
        representative_chunks
    )

    character_profile = analyzer.enhance_profile_with_examples(
        character_profile,
        chunks,
        character_info['chunks']
    )

    print("✓ 角色分析完成!")

    # ---- Create the agent and chat ----------------------------------------
    _print_section("🤖 创建对话代理")

    use_memory = input("\n是否启用记忆系统?(y/n, 默认: y): ").strip().lower()

    # An empty answer selects the default (memory enabled).
    if use_memory in ['', 'y', 'yes', '是']:
        print("正在初始化记忆系统...")
        agent = CharacterAgent(
            character_profile,
            chunks=chunks,
            character_chunks=character_info['chunks']
        )
        print("✓ Agent 创建成功,记忆系统已初始化")
    else:
        agent = CharacterAgent(character_profile)
        print("✓ Agent 创建成功(未启用记忆系统)")

    interactive_chat(agent)

    _print_section("感谢使用小说角色 Agent 系统!")
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the pipeline, turn Ctrl-C into a short notice
    # and report any other failure together with its traceback.
    import traceback

    try:
        main()
    except KeyboardInterrupt:
        print("\n\n👋 程序已被用户中断")
    except Exception as exc:
        print(f"\n❌ 程序错误: {exc}")
        traceback.print_exc()