FictionAgent / main.py
gdwind's picture
Upload folder using huggingface_hub
a226682 verified
import os
import sys
from tqdm import tqdm
from core.text_processor import TextProcessor
from core.character_extractor import CharacterExtractor
from core.character_analyzer import CharacterAnalyzer
from core.character_agent import CharacterAgent
from utils.text_utils import TextUtils
from utils.cache_manager import CacheManager
from config import Config
def print_banner():
"""打印欢迎横幅"""
banner = """
╔══════════════════════════════════════════════════════════════════╗
║ ║
║ 🎭 小说角色 Agent 系统 (大规模文本版) ║
║ ║
║ 基于 AI 的角色性格分析与对话系统 ║
║ ║
╚══════════════════════════════════════════════════════════════════╝
"""
print(banner)
def load_novel(file_path: str) -> str:
"""加载小说文本"""
try:
# 尝试不同的编码
encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1', 'utf-16']
for encoding in encodings:
try:
with open(file_path, 'r', encoding=encoding) as f:
text = f.read()
print(f"✓ 成功使用 {encoding} 编码加载文件")
return text
except UnicodeDecodeError:
continue
print("✗ 所有编码尝试失败")
return ""
except FileNotFoundError:
print(f"✗ 文件不存在: {file_path}")
return ""
except Exception as e:
print(f"✗ 加载小说失败: {e}")
return ""
def display_statistics(stats: dict):
"""显示文本统计信息"""
print("\n" + "="*70)
print("📊 文本统计信息")
print("="*70)
print(f"总字符数: {stats['total_length']:,}")
print(f"Token 数量: {stats['total_tokens']:,}")
print(f"段落数: {stats['paragraphs']:,}")
print(f"句子数: {stats['sentences']:,}")
lang_map = {'zh': '中文', 'en': '英文', 'mixed': '中英混合', 'unknown': '未知'}
print(f"检测语言: {lang_map.get(stats['language'], stats['language'])}")
# 估算阅读时间
text_utils = TextUtils()
reading_time = text_utils.estimate_reading_time(" " * stats['total_length'])
print(f"预计阅读: 约 {reading_time} 分钟")
print("="*70)
def select_character_interactive(characters: list) -> dict:
"""交互式选择角色"""
print("\n" + "="*70)
print("📋 检测到的主要角色")
print("="*70)
print(f"{'序号':<6}{'角色名':<25}{'出现次数':<12}{'分布章节':<12}")
print("-"*70)
for i, char in enumerate(characters[:15], 1):
name = char['name']
count = char['info']['count']
chunks = len(char['info']['chunks'])
print(f"{i:<6}{name:<25}{count:<12}{chunks:<12}")
print("="*70)
while True:
try:
choice = input(f"\n请选择角色编号 (1-{min(15, len(characters))}): ").strip()
if choice.isdigit():
idx = int(choice) - 1
if 0 <= idx < len(characters):
return characters[idx]
print("❌ 无效选择,请重试")
except KeyboardInterrupt:
print("\n\n👋 程序已退出")
sys.exit(0)
except:
print("❌ 输入错误,请重试")
def interactive_chat(agent: CharacterAgent):
"""交互式对话界面"""
print("\n" + "="*70)
print(agent.get_character_info())
print("\n💬 对话开始!")
print("-"*70)
print("💡 提示:")
print(" • 输入 'quit' 或 'exit' - 退出对话")
print(" • 输入 'reset' - 重置对话历史")
print(" • 输入 'save' - 保存对话")
print(" • 输入 'info' - 查看角色信息")
print(" • 输入 'help' - 显示帮助")
print("="*70 + "\n")
char_name = agent.character_profile['name']
while True:
try:
# 用户输入
user_input = input("🧑 你: ").strip()
if not user_input:
continue
# 处理命令
if user_input.lower() in ['quit', 'exit', '退出', 'q']:
print(f"\n👋 {char_name}: 再见,很高兴和你聊天!")
# 询问是否保存对话
if len(agent.conversation_history) > 0:
save = input("\n是否保存对话记录?(y/n): ").strip().lower()
if save in ['y', 'yes', '是']:
filename = f"conversation_{char_name}_{len(agent.conversation_history)}.json"
agent.save_conversation(filename)
break
if user_input.lower() in ['reset', '重置']:
agent.reset_conversation()
continue
if user_input.lower() in ['save', '保存']:
filename = f"conversation_{char_name}_{len(agent.conversation_history)}.json"
agent.save_conversation(filename)
continue
if user_input.lower() in ['info', '信息']:
print(agent.get_character_info())
continue
if user_input.lower() in ['help', '帮助']:
print("\n可用命令:")
print(" quit/exit - 退出对话")
print(" reset - 重置对话历史")
print(" save - 保存对话")
print(" info - 查看角色信息")
print(" help - 显示此帮助\n")
continue
# 正常对话
print(f"\n{'⏳ ' + char_name + ' 正在思考...':<70}", end='\r')
response = agent.chat(user_input)
print(" " * 70, end='\r') # 清除"思考中"
print(f"🎭 {char_name}: {response}\n")
except KeyboardInterrupt:
print(f"\n\n👋 {char_name}: 再见!")
break
except Exception as e:
print(f"\n❌ 错误: {e}\n")
def check_environment():
"""检查运行环境"""
issues = []
# 检查 API Key
if not Config.OPENAI_API_KEY or Config.OPENAI_API_KEY == "":
issues.append("未设置 OPENAI_API_KEY")
# 检查缓存目录
if not os.path.exists(Config.CACHE_DIR):
try:
os.makedirs(Config.CACHE_DIR)
except:
issues.append(f"无法创建缓存目录: {Config.CACHE_DIR}")
# 检查必要的包
try:
import openai
import chromadb
import tiktoken
except ImportError as e:
issues.append(f"缺少必要的包: {e}")
if issues:
print("\n⚠️ 环境检查发现问题:")
for issue in issues:
print(f" • {issue}")
print("\n请检查配置文件 .env 和依赖安装\n")
return False
return True
def main():
"""主函数 - 完整流程"""
# 打印横幅
print_banner()
# 检查环境
if not check_environment():
return
# 显示缓存信息
cache = CacheManager()
cache_info = cache.get_cache_info()
if cache_info['count'] > 0:
print(f"📦 缓存: {cache_info['count']} 个文件, {cache_info['size_mb']} MB")
# 1. 加载小说
print("\n" + "="*70)
print("📖 步骤 1/5: 加载小说")
print("="*70)
default_path = "sample_novels/harry_potter_sample.txt"
novel_path = input(f"\n请输入小说文件路径 (默认: {default_path})\n> ").strip()
if not novel_path:
novel_path = default_path
if not os.path.exists(novel_path):
print(f"❌ 文件不存在: {novel_path}")
# 尝试在 sample_novels 目录下查找
alt_path = os.path.join("sample_novels", os.path.basename(novel_path))
if os.path.exists(alt_path):
print(f"✓ 找到文件: {alt_path}")
novel_path = alt_path
else:
print("程序退出")
return
print(f"\n正在加载: {novel_path}")
novel_text = load_novel(novel_path)
if not novel_text:
print("❌ 无法加载小说,程序退出")
return
# 显示统计信息
processor = TextProcessor()
stats = processor.get_statistics(novel_text)
display_statistics(stats)
# 检查文本长度
if stats['total_length'] < 1000:
print("⚠️ 警告: 文本过短 (< 1000字符),可能影响分析效果")
proceed = input("是否继续?(y/n): ").strip().lower()
if proceed not in ['y', 'yes', '是']:
return
# 2. 文本分块
print("\n" + "="*70)
print("📄 步骤 2/5: 文本分块处理")
print("="*70)
chunks = processor.chunk_text(novel_text)
print(f"✓ 文本已分为 {len(chunks)} 个块")
print(f" 平均每块: {stats['total_length'] // len(chunks)} 字符")
# 3. 提取角色
print("\n" + "="*70)
print("👥 步骤 3/5: 提取主要角色")
print("="*70)
extractor = CharacterExtractor()
characters = extractor.extract_main_characters(
chunks,
text_sample=novel_text[:3000],
language=stats['language']
)
if not characters:
print("❌ 未能提取到角色,程序退出")
return
# 4. 选择角色
print("\n" + "="*70)
print("🎯 步骤 4/5: 选择要对话的角色")
print("="*70)
selected = select_character_interactive(characters)
character_name = selected['name']
character_info = selected['info']
print(f"\n✓ 已选择: {character_name}")
print(f" 出现次数: {character_info['count']}")
print(f" 分布章节: {len(character_info['chunks'])}")
# 5. 分析角色
print(f"\n" + "="*70)
print(f"🧠 步骤 5/5: 分析角色性格")
print("="*70)
print(f"正在深度分析 {character_name} 的性格特征...")
print("这可能需要几分钟,请耐心等待...\n")
analyzer = CharacterAnalyzer()
# 选择代表性文本块
representative_chunks = analyzer.select_representative_chunks(
chunks,
character_info['chunks']
)
print(f"✓ 选取了 {len(representative_chunks)} 个代表性片段进行分析")
# 执行分析
character_profile = analyzer.analyze_character_batch(
character_name,
representative_chunks
)
# 增强配置
character_profile = analyzer.enhance_profile_with_examples(
character_profile,
chunks,
character_info['chunks']
)
print(f"✓ 角色分析完成!")
# 6. 创建对话代理
print("\n" + "="*70)
print("🤖 创建对话代理")
print("="*70)
use_memory = input("\n是否启用记忆系统?(y/n, 默认: y): ").strip().lower()
if use_memory in ['', 'y', 'yes', '是']:
print("正在初始化记忆系统...")
agent = CharacterAgent(
character_profile,
chunks=chunks,
character_chunks=character_info['chunks']
)
print("✓ Agent 创建成功,记忆系统已初始化")
else:
agent = CharacterAgent(character_profile)
print("✓ Agent 创建成功(未启用记忆系统)")
# 7. 开始对话
interactive_chat(agent)
# 结束
print("\n" + "="*70)
print("感谢使用小说角色 Agent 系统!")
print("="*70)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n\n👋 程序已被用户中断")
except Exception as e:
print(f"\n❌ 程序错误: {e}")
import traceback
traceback.print_exc()