# svjack's picture
# Update app.py
# 6b8d05a verified
#!/usr/bin/env python3
"""
对话历史查看器 - 独立Web应用
用于展示导出的对话历史JSON文件
"""
import os
import json
import glob
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from fastapi import Request
# Configuration: every directory is derived from the folder containing this file.
BASE_DIR = Path(__file__).parent
FILES_DIR = BASE_DIR / "data" / "files" # Directory of multimedia files
HIST_DIR = BASE_DIR / "hist" # Directory of conversation-history files
STATIC_DIR = BASE_DIR / "static"
TEMPLATES_DIR = BASE_DIR / "templates"
# Make sure the directories exist (runs at import time; missing parents are created).
for dir_path in [FILES_DIR, HIST_DIR, STATIC_DIR, TEMPLATES_DIR]:
    dir_path.mkdir(parents=True, exist_ok=True)
# Create the FastAPI application
app = FastAPI(
    title="对话历史查看器",
    description="查看导出的对话历史",
    version="1.0.0"
)
# Mount static files under /static
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
# Template engine
templates = Jinja2Templates(directory=TEMPLATES_DIR)
# Custom Jinja2 filter: pretty-print JSON while keeping non-ASCII text readable
def json_format(value, indent=2):
    """Format a value as indented JSON text without escaping non-ASCII.

    Args:
        value: Object to serialize. A string is first parsed as JSON so an
            embedded JSON document gets re-indented; a string that is not
            valid JSON is emitted as a plain JSON string literal.
        indent: Indentation passed through to ``json.dumps``.

    Returns:
        The JSON text, with characters such as Chinese left unescaped.

    Raises:
        TypeError: If ``value`` contains an object ``json.dumps`` cannot
            serialize.
    """
    if isinstance(value, str):
        try:
            value = json.loads(value)
        except (ValueError, RecursionError):
            # Not a parseable JSON document (JSONDecodeError is a ValueError);
            # keep the raw string. Only parse failures are swallowed, so
            # KeyboardInterrupt/SystemExit still propagate.
            pass
    return json.dumps(value, indent=indent, ensure_ascii=False)
# Register the custom filter so templates can use it as `json_format`
templates.env.filters['json_format'] = json_format
# Utility functions
def get_conversation_files() -> List[Path]:
    """Return every ``*.json`` file in ``HIST_DIR``, most recently modified first."""
    history_files = list(HIST_DIR.glob("*.json"))
    history_files.sort(key=lambda entry: entry.stat().st_mtime, reverse=True)
    return history_files
def parse_conversation_file(filepath: Path) -> Optional[Dict[str, Any]]:
    """Summarize one exported conversation file for the listing views.

    Args:
        filepath: Path of the JSON export to read.

    Returns:
        A dict with ``filename``, ``filepath``, ``metadata``, ``stats``
        (total and per-role message counts), ``preview`` (first 100
        characters of the first non-empty user message, followed by ``...``
        when truncated), ``export_time`` and ``size_kb``. ``None`` when the
        file cannot be read, is not valid JSON, or does not have the expected
        structure; the error is printed.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)
        # Basic information. An explicit `"metadata": null` is treated like a
        # missing section instead of discarding the whole file.
        metadata = data.get("metadata") or {}
        conversation = data.get("conversation", [])
        # Count messages per role without building throwaway lists.
        role_counts = {
            role: sum(1 for m in conversation if m.get("role") == role)
            for role in ("user", "ai", "tool")
        }
        # Use the first non-empty user message as the preview.
        preview = ""
        for msg in conversation:
            content = msg.get("content")
            if msg.get("role") == "user" and content:
                # Structured content (e.g. a list of parts) is rendered as
                # JSON text so the preview is always a string.
                if isinstance(content, str):
                    text = content
                else:
                    text = json.dumps(content, ensure_ascii=False)
                preview = text[:100]
                if len(text) > 100:
                    preview += "..."
                break
        return {
            "filename": filepath.name,
            "filepath": str(filepath),
            "metadata": metadata,
            "stats": {
                "total_messages": len(conversation),
                "user_messages": role_counts["user"],
                "ai_messages": role_counts["ai"],
                "tool_messages": role_counts["tool"],
            },
            "preview": preview,
            "export_time": metadata.get("export_time", ""),
            "size_kb": round(filepath.stat().st_size / 1024, 2)
        }
    except Exception as e:
        # Deliberately broad: one unreadable or malformed export must not
        # break the whole listing, so it is reported and skipped.
        print(f"Error parsing {filepath}: {e}")
        return None
# Display attributes per message kind: (css_class, bubble_class, icon, label).
# The container class is always "<css_class>-container".
_MESSAGE_STYLES = {
    "user": ("user-message", "user-bubble", "👤", "用户"),
    "ai-step": ("ai-step", "ai-step-bubble", "⚙️", "工具调用"),
    "ai": ("ai-message", "ai-bubble", "🤖", "AI助手"),
    "tool": ("tool-message", "tool-bubble", "🔧", "工具响应"),
    "system": ("system-message", "system-bubble", "⚙️", "系统"),
}

# Icons for the attachment types that are displayed under their own type;
# every other type is shown as a generic file.
_MEDIA_ICONS = {
    "image": "🖼️",
    "video": "🎬",
    "audio": "🎵",
    "file": "📎",
}


def _message_style(role, msg_type):
    """Return the CSS classes, icon and label for a message's role and type."""
    if role == "ai" and msg_type == "step":
        key = "ai-step"
    elif role in ("user", "ai", "tool"):
        key = role
    else:
        # Any other (or missing) role is rendered as a system message.
        key = "system"
    css_class, bubble_class, icon, label = _MESSAGE_STYLES[key]
    return {
        "css_class": css_class,
        "container_class": f"{css_class}-container",
        "bubble_class": bubble_class,
        "icon": icon,
        "label": label,
    }


def _format_timestamp(created_at):
    """Render an ISO-8601 timestamp as ``YYYY-MM-DD HH:MM:SS``.

    An empty value yields ``""``; a value that cannot be parsed is returned
    unchanged.
    """
    if not created_at:
        return ""
    try:
        # A trailing "Z" is rewritten as an explicit UTC offset before parsing.
        dt = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except (ValueError, AttributeError, TypeError):
        # Unparseable or non-string timestamp: show it as stored.
        return created_at


def _extract_file_id(elem_url):
    """Return the last path segment of a URL or plain path, or ``""`` if empty.

    Accepts e.g. ``http://localhost:9004/files/filename.mp4`` or a bare
    file name.
    """
    if not elem_url:
        return ""
    if elem_url.startswith(('http://', 'https://')):
        import urllib.parse
        # Only the URL path is used, so query string and fragment are dropped.
        return urllib.parse.urlparse(elem_url).path.split('/')[-1]
    return elem_url.split('/')[-1]


def _build_media_info(elem):
    """Build the display record for one attachment element."""
    elem_type = elem.get("type", "")
    elem_url = elem.get("url", "")
    file_id = _extract_file_id(elem_url)
    is_known = isinstance(elem_type, str) and elem_type in _MEDIA_ICONS
    return {
        "type": elem_type,
        "url": elem_url,
        "file_id": file_id,
        "name": elem.get("name", ""),
        "size": elem.get("size", "small"),
        "icon": _MEDIA_ICONS[elem_type] if is_known else "📄",
        "display_type": elem_type if is_known else "file",
        # Attachments are served by this app's own /files/ route.
        "local_url": f"/files/{file_id}" if file_id else "",
    }


def get_conversation_detail(filename: str) -> Optional[Dict[str, Any]]:
    """Load one conversation export and annotate its messages for display.

    Args:
        filename: Name of the export inside ``HIST_DIR``. It is joined to
            ``HIST_DIR`` without validation, so callers must reject unsafe
            names first.

    Returns:
        The parsed JSON document with ``conversation`` replaced by copies of
        the messages carrying extra keys: ``css_class``, ``container_class``,
        ``bubble_class``, ``icon``, ``label``, ``time_display``,
        ``has_media`` and, when the message has elements, ``media_elements``.
        ``None`` when the file does not exist or cannot be loaded or
        processed; the error is printed.
    """
    filepath = HIST_DIR / filename
    if not filepath.exists():
        return None
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)
        processed_messages = []
        for msg in data.get("conversation", []):
            # Work on a copy so the parsed message itself is left untouched.
            processed_msg = msg.copy()
            processed_msg.update(
                _message_style(msg.get("role", ""), msg.get("type", "message"))
            )
            processed_msg["time_display"] = _format_timestamp(msg.get("created_at", ""))
            # Attachments
            elements = msg.get("elements", [])
            if elements:
                processed_msg["has_media"] = True
                processed_msg["media_elements"] = [
                    _build_media_info(elem) for elem in elements
                ]
            else:
                processed_msg["has_media"] = False
            processed_messages.append(processed_msg)
        data["conversation"] = processed_messages
        return data
    except Exception as e:
        # Deliberately broad: any malformed export is reported and treated
        # as unavailable rather than crashing the request.
        print(f"Error loading conversation detail {filename}: {e}")
        return None
# Route definitions
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Home page: render the list of conversations that could be parsed."""
    summaries = [
        summary
        for summary in map(parse_conversation_file, get_conversation_files())
        if summary
    ]
    context = {
        "request": request,
        "conversations": summaries,
        "total_conversations": len(summaries),
    }
    return templates.TemplateResponse("index.html", context)
@app.get("/conversation/{filename}", response_class=HTMLResponse)
async def view_conversation(request: Request, filename: str):
    """Render the detail page for one conversation.

    Args:
        request: Incoming request, passed to the template.
        filename: Bare name of the export inside ``HIST_DIR``.

    Raises:
        HTTPException: 400 when ``filename`` is not a bare file name; 404
            when the conversation does not exist or cannot be loaded.
    """
    # Path-traversal guard. Besides ".." and "/", reject anything pathlib
    # does not treat as a single component: on Windows a drive-qualified or
    # backslash-separated name would otherwise replace HIST_DIR when joined.
    if ".." in filename or "/" in filename or Path(filename).name != filename:
        raise HTTPException(status_code=400, detail="Invalid filename")
    conversation_data = get_conversation_detail(filename)
    if not conversation_data:
        raise HTTPException(status_code=404, detail="Conversation not found")
    return templates.TemplateResponse(
        "conversation.html",
        {
            "request": request,
            "conversation": conversation_data,
            "filename": filename
        }
    )
# Content types served for known file extensions; any other extension is
# sent as application/octet-stream.
_MEDIA_TYPES = {
    '.jpg': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.png': 'image/png',
    '.gif': 'image/gif',
    '.mp4': 'video/mp4',
    '.webm': 'video/webm',
    '.mp3': 'audio/mpeg',
    '.wav': 'audio/wav',
    '.ogg': 'audio/ogg',
    '.pdf': 'application/pdf',
    '.txt': 'text/plain',
    '.json': 'application/json',
}


@app.get("/files/{file_id}")
async def get_file(file_id: str):
    """Serve one multimedia file from ``FILES_DIR``.

    Args:
        file_id: Bare file name inside ``FILES_DIR``.

    Returns:
        A ``FileResponse`` whose content type is chosen from the (lower-cased)
        file extension.

    Raises:
        HTTPException: 400 when ``file_id`` is not a bare file name; 404
            when no regular file with that name exists.
    """
    # Path-traversal guard. Besides ".." and "/", reject anything pathlib
    # does not treat as a single component: on Windows a drive-qualified or
    # backslash-separated name would otherwise replace FILES_DIR when joined.
    if ".." in file_id or "/" in file_id or Path(file_id).name != file_id:
        raise HTTPException(status_code=400, detail="Invalid file ID")
    filepath = FILES_DIR / file_id
    # is_file() rather than exists(): a directory cannot be served and must
    # yield 404 instead of failing while the response is being sent.
    if not filepath.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    media_type = _MEDIA_TYPES.get(filepath.suffix.lower(), 'application/octet-stream')
    return FileResponse(
        filepath,
        media_type=media_type,
        filename=file_id
    )
# API endpoints
@app.get("/api/conversations")
async def api_conversations():
    """API: list the summaries of all conversations that could be parsed."""
    summaries = [
        summary
        for summary in map(parse_conversation_file, get_conversation_files())
        if summary
    ]
    payload = {
        "success": True,
        "data": summaries,
        "count": len(summaries),
    }
    return JSONResponse(payload)
@app.get("/api/conversation/{filename}")
async def api_conversation_detail(filename: str):
    """API: return one conversation, annotated for display, as JSON.

    Args:
        filename: Bare name of the export inside ``HIST_DIR``.

    Raises:
        HTTPException: 400 when ``filename`` is not a bare file name; 404
            when the conversation does not exist or cannot be loaded.
    """
    # Path-traversal guard. Besides ".." and "/", reject anything pathlib
    # does not treat as a single component: on Windows a drive-qualified or
    # backslash-separated name would otherwise replace HIST_DIR when joined.
    if ".." in filename or "/" in filename or Path(filename).name != filename:
        raise HTTPException(status_code=400, detail="Invalid filename")
    conversation_data = get_conversation_detail(filename)
    if not conversation_data:
        raise HTTPException(status_code=404, detail="Conversation not found")
    return JSONResponse({
        "success": True,
        "data": conversation_data
    })
@app.get("/health")
async def health_check():
    """Health check: report a fixed status plus the current local time (ISO format)."""
    now_iso = datetime.now().isoformat()
    return dict(status="healthy", timestamp=now_iso)
if __name__ == "__main__":
    import argparse

    import uvicorn

    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="对话历史查看器")
    parser.add_argument("--port", type=int, default=9051, help="服务端口号 (默认: 9051)")
    parser.add_argument("--host", type=str, default="0.0.0.0", help="服务主机地址 (默认: 0.0.0.0)")
    # Auto-reload stays on by default (the previous hard-coded behavior);
    # this flag lets non-development runs switch it off.
    parser.add_argument(
        "--no-reload",
        dest="reload",
        action="store_false",
        help="禁用代码自动重载 (默认: 启用)",
    )
    args = parser.parse_args()
    # NOTE: the port is used exactly as given. An earlier "port must be
    # greater than 9050" adjustment had been disabled and is not enforced.
    print("=" * 60)
    print("对话历史查看器")
    print("=" * 60)
    print(f"文件目录: {FILES_DIR}")
    print(f"历史目录: {HIST_DIR}")
    print(f"对话文件数: {len(get_conversation_files())}")
    print("=" * 60)
    print(f"启动服务: http://localhost:{args.port}")
    print(f" http://{args.host}:{args.port}")
    print("=" * 60)
    # The import string (rather than the app object) is what allows uvicorn
    # to reload the application; it requires this file to be named app.py.
    uvicorn.run(
        "app:app",
        host=args.host,
        port=args.port,
        reload=args.reload
    )