# File size: 12,377 Bytes | a226682
import os
import sys
from tqdm import tqdm
from core.text_processor import TextProcessor
from core.character_extractor import CharacterExtractor
from core.character_analyzer import CharacterAnalyzer
from core.character_agent import CharacterAgent
from utils.text_utils import TextUtils
from utils.cache_manager import CacheManager
from config import Config
def print_banner():
    """Print the boxed welcome banner to standard output."""
    rows = (
        "╔══════════════════════════════════════════════════════════════════╗",
        "║ ║",
        "║ 🎭 小说角色 Agent 系统 (大规模文本版) ║",
        "║ ║",
        "║ 基于 AI 的角色性格分析与对话系统 ║",
        "║ ║",
        "╚══════════════════════════════════════════════════════════════════╝",
    )
    # The banner text starts and ends with a newline so the box is framed
    # by blank lines when printed.
    print("\n" + "\n".join(rows) + "\n")
def load_novel(file_path: str) -> str:
    """Load the novel text from *file_path*, guessing its encoding.

    The first bytes of the file are checked for a byte-order mark (BOM).
    When one is present, the matching Unicode codec is tried first so the
    BOM is consumed and the text is decoded correctly.  Otherwise the
    candidates are tried in order: UTF-8, GBK, GB2312 and finally Latin-1.

    Args:
        file_path: Path of the text file to read.

    Returns:
        The decoded text, or an empty string if the file is missing or
        cannot be read.  A status line is printed in every case.
    """
    import codecs

    try:
        with open(file_path, 'rb') as probe:
            head = probe.read(4)

        # Latin-1 maps every byte to a character and therefore never fails;
        # it must stay last, and any codec listed after it would be
        # unreachable.  UTF-16/UTF-32 are only attempted when a BOM proves
        # the file really uses them.
        encodings = ['utf-8', 'gbk', 'gb2312', 'latin-1']
        if head.startswith(codecs.BOM_UTF8):
            # 'utf-8-sig' strips the BOM instead of leaving U+FEFF in the text.
            encodings.insert(0, 'utf-8-sig')
        elif head.startswith((codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE)):
            # Checked before UTF-16: the UTF-32-LE BOM begins with the
            # UTF-16-LE BOM bytes.
            encodings.insert(0, 'utf-32')
        elif head.startswith((codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE)):
            encodings.insert(0, 'utf-16')

        for encoding in encodings:
            try:
                with open(file_path, 'r', encoding=encoding) as f:
                    text = f.read()
            except UnicodeDecodeError:
                continue
            print(f"✓ 成功使用 {encoding} 编码加载文件")
            return text

        # Defensive only: the Latin-1 fallback above always decodes.
        print("✗ 所有编码尝试失败")
        return ""
    except FileNotFoundError:
        print(f"✗ 文件不存在: {file_path}")
        return ""
    except Exception as e:
        # Best-effort loader: report any other I/O problem and signal
        # failure with an empty string instead of raising.
        print(f"✗ 加载小说失败: {e}")
        return ""
def display_statistics(stats: dict):
    """Print a formatted summary of the text statistics.

    Args:
        stats: Mapping that must provide the keys ``total_length``,
            ``total_tokens``, ``paragraphs``, ``sentences`` and
            ``language``.  Unknown language codes are printed verbatim.
    """
    rule = "="*70
    language_labels = {
        'zh': '中文',
        'en': '英文',
        'mixed': '中英混合',
        'unknown': '未知',
    }

    print("\n" + rule)
    print("📊 文本统计信息")
    print(rule)
    print(f"总字符数: {stats['total_length']:,}")
    print(f"Token 数量: {stats['total_tokens']:,}")
    print(f"段落数: {stats['paragraphs']:,}")
    print(f"句子数: {stats['sentences']:,}")

    lang_code = stats['language']
    print(f"检测语言: {language_labels.get(lang_code, lang_code)}")

    # Estimated reading time.
    # NOTE(review): the estimator receives a placeholder of spaces with the
    # same length as the text, not the text itself — confirm that
    # TextUtils.estimate_reading_time depends only on the length.
    placeholder = " " * stats['total_length']
    minutes = TextUtils().estimate_reading_time(placeholder)
    print(f"预计阅读: 约 {minutes} 分钟")
    print(rule)
def select_character_interactive(characters: list) -> dict:
    """Show the detected characters and let the user pick one by number.

    At most the first 15 entries are listed, and only a listed entry can be
    selected, so the accepted range always matches the range in the prompt.

    Args:
        characters: Non-empty list of dicts, each with a ``name`` and an
            ``info`` dict holding ``count`` and ``chunks``.

    Returns:
        The chosen entry from *characters*.

    Raises:
        ValueError: If *characters* is empty (no choice could ever be valid).
        SystemExit: If the user presses Ctrl-C or standard input is closed.
    """
    if not characters:
        raise ValueError("characters must not be empty")

    max_shown = 15
    shown = characters[:max_shown]

    print("\n" + "="*70)
    print("📋 检测到的主要角色")
    print("="*70)
    print(f"{'序号':<6}{'角色名':<25}{'出现次数':<12}{'分布章节':<12}")
    print("-"*70)
    for i, char in enumerate(shown, 1):
        info = char['info']
        print(f"{i:<6}{char['name']:<25}{info['count']:<12}{len(info['chunks']):<12}")
    print("="*70)

    while True:
        try:
            choice = input(f"\n请选择角色编号 (1-{len(shown)}): ").strip()
        except (KeyboardInterrupt, EOFError):
            # EOF means no further input can arrive; retrying would spin
            # forever, so it is treated like an explicit interrupt.
            print("\n\n👋 程序已退出")
            sys.exit(0)

        # isdecimal() only accepts characters that int() can parse.
        if choice.isdecimal():
            idx = int(choice) - 1
            if 0 <= idx < len(shown):
                return shown[idx]
        print("❌ 无效选择,请重试")
def interactive_chat(agent: CharacterAgent):
    """Run the interactive chat loop with the given character agent.

    Prints the character info and a command overview, then reads user input
    until the user quits.  Recognised commands (English or Chinese) are
    quit/exit, reset, save, info and help; any other non-empty input is sent
    to ``agent.chat`` and the reply is printed.

    The loop ends on a quit command, on Ctrl-C, or when standard input is
    closed.  Any other exception raised while handling a turn is reported
    and the loop continues.

    Args:
        agent: The character agent to talk to.
    """
    print("\n" + "="*70)
    print(agent.get_character_info())
    print("\n💬 对话开始!")
    print("-"*70)
    print("💡 提示:")
    print(" • 输入 'quit' 或 'exit' - 退出对话")
    print(" • 输入 'reset' - 重置对话历史")
    print(" • 输入 'save' - 保存对话")
    print(" • 输入 'info' - 查看角色信息")
    print(" • 输入 'help' - 显示帮助")
    print("="*70 + "\n")

    char_name = agent.character_profile['name']

    def conversation_filename() -> str:
        # The current history length is part of the name, so it is computed
        # at save time rather than once up front.
        return f"conversation_{char_name}_{len(agent.conversation_history)}.json"

    while True:
        try:
            user_input = input("🧑 你: ").strip()
            if not user_input:
                continue

            command = user_input.lower()

            if command in ['quit', 'exit', '退出', 'q']:
                print(f"\n👋 {char_name}: 再见,很高兴和你聊天!")
                # Offer to save only when there is something to save.
                if len(agent.conversation_history) > 0:
                    save = input("\n是否保存对话记录?(y/n): ").strip().lower()
                    if save in ['y', 'yes', '是']:
                        agent.save_conversation(conversation_filename())
                break

            if command in ['reset', '重置']:
                agent.reset_conversation()
                continue

            if command in ['save', '保存']:
                agent.save_conversation(conversation_filename())
                continue

            if command in ['info', '信息']:
                print(agent.get_character_info())
                continue

            if command in ['help', '帮助']:
                print("\n可用命令:")
                print(" quit/exit - 退出对话")
                print(" reset - 重置对话历史")
                print(" save - 保存对话")
                print(" info - 查看角色信息")
                print(" help - 显示此帮助\n")
                continue

            # Regular chat turn: show a "thinking" line, then overwrite it
            # (carriage return + blanks) before printing the reply.
            print(f"\n{'⏳ ' + char_name + ' 正在思考...':<70}", end='\r')
            response = agent.chat(user_input)
            print(" " * 70, end='\r')
            print(f"🎭 {char_name}: {response}\n")

        except (KeyboardInterrupt, EOFError):
            # EOFError must end the loop: once stdin is closed every further
            # input() call fails again, and the generic handler below would
            # otherwise report the error forever.
            print(f"\n\n👋 {char_name}: 再见!")
            break
        except Exception as e:
            # Keep the session alive after a failed turn.
            print(f"\n❌ 错误: {e}\n")
def check_environment():
    """Check that the runtime environment is ready.

    Verifies that ``Config.OPENAI_API_KEY`` is set, that the cache directory
    ``Config.CACHE_DIR`` exists or can be created, and that the packages
    ``openai``, ``chromadb`` and ``tiktoken`` can be imported.  Every problem
    found is printed, including each missing package.

    Returns:
        True if no problem was found, otherwise False.
    """
    import importlib

    issues = []

    # API key: an unset or empty value are both falsy.
    if not Config.OPENAI_API_KEY:
        issues.append("未设置 OPENAI_API_KEY")

    # Cache directory: exist_ok avoids the check-then-create race, and only
    # filesystem errors are treated as "cannot create".
    try:
        os.makedirs(Config.CACHE_DIR, exist_ok=True)
    except OSError:
        issues.append(f"无法创建缓存目录: {Config.CACHE_DIR}")

    # Required packages: each is checked separately so that all missing
    # packages are reported in one run.
    for package in ("openai", "chromadb", "tiktoken"):
        try:
            importlib.import_module(package)
        except ImportError as e:
            issues.append(f"缺少必要的包: {e}")

    if issues:
        print("\n⚠️ 环境检查发现问题:")
        for issue in issues:
            print(f" • {issue}")
        print("\n请检查配置文件 .env 和依赖安装\n")
        return False
    return True
def _print_section(title: str) -> None:
    """Print *title* between two 70-column rules, preceded by a blank line."""
    print("\n" + "="*70)
    print(title)
    print("="*70)


def main():
    """Run the complete interactive workflow.

    Steps: print the banner, check the environment, show cache information,
    load the novel, split it into chunks, extract the main characters, let
    the user choose one, analyse that character, build the chat agent
    (optionally with the memory system) and start the chat loop.

    The function returns early, after printing a message, when the
    environment check fails, the novel cannot be found or loaded, the user
    declines to continue with a very short text, chunking yields nothing,
    or no character is extracted.
    """
    print_banner()

    if not check_environment():
        return

    # Cache information
    cache = CacheManager()
    cache_info = cache.get_cache_info()
    if cache_info['count'] > 0:
        print(f"📦 缓存: {cache_info['count']} 个文件, {cache_info['size_mb']} MB")

    # 1. Load the novel
    _print_section("📖 步骤 1/5: 加载小说")
    default_path = "sample_novels/harry_potter_sample.txt"
    novel_path = input(f"\n请输入小说文件路径 (默认: {default_path})\n> ").strip()
    if not novel_path:
        novel_path = default_path

    if not os.path.exists(novel_path):
        print(f"❌ 文件不存在: {novel_path}")
        # Fall back to a file of the same name inside sample_novels/.
        alt_path = os.path.join("sample_novels", os.path.basename(novel_path))
        if os.path.exists(alt_path):
            print(f"✓ 找到文件: {alt_path}")
            novel_path = alt_path
        else:
            print("程序退出")
            return

    print(f"\n正在加载: {novel_path}")
    novel_text = load_novel(novel_path)
    if not novel_text:
        print("❌ 无法加载小说,程序退出")
        return

    # Text statistics
    processor = TextProcessor()
    stats = processor.get_statistics(novel_text)
    display_statistics(stats)

    # Warn about very short texts and let the user decide.
    if stats['total_length'] < 1000:
        print("⚠️ 警告: 文本过短 (< 1000字符),可能影响分析效果")
        proceed = input("是否继续?(y/n): ").strip().lower()
        if proceed not in ['y', 'yes', '是']:
            return

    # 2. Chunk the text
    _print_section("📄 步骤 2/5: 文本分块处理")
    chunks = processor.chunk_text(novel_text)
    if not chunks:
        # Without this guard the average below would divide by zero
        # (e.g. for a text the chunker reduces to nothing).
        print("❌ 文本分块结果为空,程序退出")
        return
    print(f"✓ 文本已分为 {len(chunks)} 个块")
    print(f" 平均每块: {stats['total_length'] // len(chunks)} 字符")

    # 3. Extract characters
    _print_section("👥 步骤 3/5: 提取主要角色")
    extractor = CharacterExtractor()
    characters = extractor.extract_main_characters(
        chunks,
        text_sample=novel_text[:3000],
        language=stats['language']
    )
    if not characters:
        print("❌ 未能提取到角色,程序退出")
        return

    # 4. Choose a character
    _print_section("🎯 步骤 4/5: 选择要对话的角色")
    selected = select_character_interactive(characters)
    character_name = selected['name']
    character_info = selected['info']
    print(f"\n✓ 已选择: {character_name}")
    print(f" 出现次数: {character_info['count']}")
    print(f" 分布章节: {len(character_info['chunks'])}")

    # 5. Analyse the character
    _print_section("🧠 步骤 5/5: 分析角色性格")
    print(f"正在深度分析 {character_name} 的性格特征...")
    print("这可能需要几分钟,请耐心等待...\n")
    analyzer = CharacterAnalyzer()

    # Pick representative chunks for the analysis.
    representative_chunks = analyzer.select_representative_chunks(
        chunks,
        character_info['chunks']
    )
    print(f"✓ 选取了 {len(representative_chunks)} 个代表性片段进行分析")

    # Run the analysis.
    character_profile = analyzer.analyze_character_batch(
        character_name,
        representative_chunks
    )

    # Enhance the profile with examples.
    character_profile = analyzer.enhance_profile_with_examples(
        character_profile,
        chunks,
        character_info['chunks']
    )
    print("✓ 角色分析完成!")

    # 6. Create the chat agent
    _print_section("🤖 创建对话代理")
    use_memory = input("\n是否启用记忆系统?(y/n, 默认: y): ").strip().lower()
    # An empty answer selects the default (memory enabled).
    if use_memory in ['', 'y', 'yes', '是']:
        print("正在初始化记忆系统...")
        agent = CharacterAgent(
            character_profile,
            chunks=chunks,
            character_chunks=character_info['chunks']
        )
        print("✓ Agent 创建成功,记忆系统已初始化")
    else:
        agent = CharacterAgent(character_profile)
        print("✓ Agent 创建成功(未启用记忆系统)")

    # 7. Start chatting
    interactive_chat(agent)

    # Done
    _print_section("感谢使用小说角色 Agent 系统!")
if __name__ == "__main__":
    # Script entry point: run the workflow, exit quietly on Ctrl-C, and
    # print a message plus the full traceback for any other error.
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n👋 程序已被用户中断")
    except Exception as e:
        print(f"\n❌ 程序错误: {e}")
        import traceback
        traceback.print_exc()