# MiroFish/backend/app/api/graph.py
# Codex Deploy
# Defer heavy imports for Space startup
# dc2650a
"""
图谱相关API路由
采用项目上下文机制,服务端持久化状态
"""
import os
import traceback
import threading
from flask import request, jsonify
from . import graph_bp
from ..config import Config
from ..utils.file_parser import FileParser
from ..utils.logger import get_logger
from ..models.task import TaskManager, TaskStatus
from ..models.project import ProjectManager, ProjectStatus
# Module-level logger used by the route handlers in this file.
logger = get_logger('mirofish.api')
class _LazyObjectProxy:
def __init__(self, loader):
self._loader = loader
def _target(self):
return self._loader()
def __call__(self, *args, **kwargs):
return self._target()(*args, **kwargs)
def __getattr__(self, name):
return getattr(self._target(), name)
def _load_ontology_generator():
    """Import the ontology generator module on demand and return its class."""
    from ..services.ontology_generator import OntologyGenerator as generator_cls

    return generator_cls
def _load_graph_builder_service():
    """Import the graph builder module on demand and return its service class."""
    from ..services.graph_builder import GraphBuilderService as builder_cls

    return builder_cls
def _load_text_processor():
    """Import the text processor module on demand and return its class."""
    from ..services.text_processor import TextProcessor as processor_cls

    return processor_cls
# Module-level names bound to lazy proxies: each loader (and the import inside
# it) only runs when the proxy is called or one of its attributes is read.
OntologyGenerator = _LazyObjectProxy(_load_ontology_generator)
GraphBuilderService = _LazyObjectProxy(_load_graph_builder_service)
TextProcessor = _LazyObjectProxy(_load_text_processor)
def allowed_file(filename: str) -> bool:
    """Report whether ``filename`` carries an allowed extension.

    The extension is lower-cased and stripped of its leading dot before being
    looked up in ``Config.ALLOWED_EXTENSIONS``. Empty names and names without
    any dot are rejected without consulting the configuration.
    """
    if filename and '.' in filename:
        _, suffix = os.path.splitext(filename)
        normalized = suffix.lower().lstrip('.')
        return normalized in Config.ALLOWED_EXTENSIONS
    return False
# ============== Project management endpoints ==============
@graph_bp.route('/project/<project_id>', methods=['GET'])
def get_project(project_id: str):
    """
    Return the details of one project.

    Responds with the project's ``to_dict()`` payload, or with a 404 error
    payload when ``ProjectManager`` returns no project for ``project_id``.
    """
    project = ProjectManager.get_project(project_id)

    if project:
        return jsonify({
            "success": True,
            "data": project.to_dict()
        })

    not_found = {
        "success": False,
        "error": f"项目不存在: {project_id}"
    }
    return jsonify(not_found), 404
@graph_bp.route('/project/list', methods=['GET'])
def list_projects():
    """
    List projects.

    Query parameters:
        limit: integer passed to ``ProjectManager.list_projects`` (default 50)
    """
    max_items = request.args.get('limit', 50, type=int)
    found = ProjectManager.list_projects(limit=max_items)
    serialized = [item.to_dict() for item in found]

    return jsonify({
        "success": True,
        "data": serialized,
        "count": len(found)
    })
@graph_bp.route('/project/<project_id>', methods=['DELETE'])
def delete_project(project_id: str):
    """
    Delete a project.

    Responds with a confirmation message when ``ProjectManager.delete_project``
    reports success, otherwise with a 404 error payload.
    """
    if ProjectManager.delete_project(project_id):
        return jsonify({
            "success": True,
            "message": f"项目已删除: {project_id}"
        })

    failure = {
        "success": False,
        "error": f"项目不存在或删除失败: {project_id}"
    }
    return jsonify(failure), 404
@graph_bp.route('/project/<project_id>/reset', methods=['POST'])
def reset_project(project_id: str):
    """
    Reset a project's state so its graph can be rebuilt.

    The status goes back to ``ONTOLOGY_GENERATED`` when the project has an
    ontology and to ``CREATED`` otherwise; the graph id, build task id and
    error are cleared and the project is saved. Responds with 404 when the
    project does not exist.
    """
    project = ProjectManager.get_project(project_id)

    if not project:
        missing = {
            "success": False,
            "error": f"项目不存在: {project_id}"
        }
        return jsonify(missing), 404

    # Roll back to the latest state that does not involve a graph.
    project.status = (
        ProjectStatus.ONTOLOGY_GENERATED
        if project.ontology
        else ProjectStatus.CREATED
    )

    # Drop everything recorded by a previous build attempt.
    for field_name in ("graph_id", "graph_build_task_id", "error"):
        setattr(project, field_name, None)

    ProjectManager.save_project(project)

    return jsonify({
        "success": True,
        "message": f"项目已重置: {project_id}",
        "data": project.to_dict()
    })
# ============== Endpoint 1: upload files and generate the ontology ==============
@graph_bp.route('/ontology/generate', methods=['POST'])
def generate_ontology():
    """
    Endpoint 1: upload documents and generate an ontology definition.

    Request: multipart/form-data

    Form fields:
        files: uploaded files, one or more; files whose name is rejected by
            ``allowed_file`` are skipped
        simulation_requirement: description of the simulation requirement
            (required)
        project_name: project name (optional, defaults to 'Unnamed Project')
        additional_context: extra notes passed to the generator (optional)

    Returns:
        {
            "success": true,
            "data": {
                "project_id": "proj_xxxx",
                "project_name": "...",
                "ontology": {
                    "entity_types": [...],
                    "edge_types": [...]
                },
                "analysis_summary": "...",
                "files": [...],
                "total_text_length": 12345
            }
        }

        400 with an error message when the requirement is missing, no file is
        uploaded, or no uploaded file could be processed (the freshly created
        project is deleted in that last case). 500 with the error text and
        traceback when any exception is raised.
    """
    try:
        logger.info("=== 开始生成本体定义 ===")

        # Read form parameters
        form = request.form
        simulation_requirement = form.get('simulation_requirement', '')
        project_name = form.get('project_name', 'Unnamed Project')
        additional_context = form.get('additional_context', '')

        logger.debug(f"项目名称: {project_name}")
        logger.debug(f"模拟需求: {simulation_requirement[:100]}...")

        if not simulation_requirement:
            return jsonify({
                "success": False,
                "error": "请提供模拟需求描述 (simulation_requirement)"
            }), 400

        # Uploaded files
        uploaded_files = request.files.getlist('files')
        if not uploaded_files or all(not f.filename for f in uploaded_files):
            return jsonify({
                "success": False,
                "error": "请至少上传一个文档文件"
            }), 400

        # Create the project
        project = ProjectManager.create_project(name=project_name)
        project.simulation_requirement = simulation_requirement
        logger.info(f"创建项目: {project.project_id}")

        # Save each accepted file and extract its text
        document_texts = []
        text_sections = []
        for file in uploaded_files:
            if not (file and file.filename and allowed_file(file.filename)):
                continue

            # Store the file under the project
            file_info = ProjectManager.save_file_to_project(
                project.project_id,
                file,
                file.filename
            )
            project.files.append({
                "filename": file_info["original_filename"],
                "size": file_info["size"]
            })

            # Extract and preprocess the text
            text = FileParser.extract_text(file_info["path"])
            text = TextProcessor.preprocess_text(text)
            document_texts.append(text)
            text_sections.append(
                f"\n\n=== {file_info['original_filename']} ===\n{text}"
            )

        if not document_texts:
            # Nothing usable was uploaded: do not keep an empty project around.
            ProjectManager.delete_project(project.project_id)
            return jsonify({
                "success": False,
                "error": "没有成功处理任何文档,请检查文件格式"
            }), 400

        # Persist the combined text (joined once instead of repeated +=)
        all_text = "".join(text_sections)
        project.total_text_length = len(all_text)
        ProjectManager.save_extracted_text(project.project_id, all_text)
        logger.info(f"文本提取完成,共 {len(all_text)} 字符")

        # Generate the ontology
        logger.info("调用 LLM 生成本体定义...")
        generator = OntologyGenerator()
        ontology = generator.generate(
            document_texts=document_texts,
            simulation_requirement=simulation_requirement,
            additional_context=additional_context if additional_context else None
        )

        # Store the ontology on the project
        entity_types = ontology.get("entity_types", [])
        edge_types = ontology.get("edge_types", [])
        logger.info(
            f"本体生成完成: {len(entity_types)} 个实体类型, {len(edge_types)} 个关系类型"
        )

        project.ontology = {
            "entity_types": entity_types,
            "edge_types": edge_types
        }
        project.analysis_summary = ontology.get("analysis_summary", "")
        project.status = ProjectStatus.ONTOLOGY_GENERATED
        ProjectManager.save_project(project)
        logger.info(f"=== 本体生成完成 === 项目ID: {project.project_id}")

        return jsonify({
            "success": True,
            "data": {
                "project_id": project.project_id,
                "project_name": project.name,
                "ontology": project.ontology,
                "analysis_summary": project.analysis_summary,
                "files": project.files,
                "total_text_length": project.total_text_length
            }
        })

    except Exception as e:
        # Log server-side as well; previously the failure was only visible to
        # the client that happened to receive this response.
        logger.error(f"本体生成失败: {str(e)}")
        logger.debug(traceback.format_exc())
        # NOTE(review): the traceback is returned to the caller, which exposes
        # internals. Kept because clients may rely on the field - confirm
        # whether it can be dropped outside of debugging.
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
# ============== Endpoint 2: build the graph ==============
@graph_bp.route('/build', methods=['POST'])
def build_graph():
    """
    Endpoint 2: start building the graph for an existing project.

    Request (JSON):
        {
            "project_id": "proj_xxxx",  // required, from endpoint 1
            "graph_name": "...",        // optional; defaults to the project
                                        // name, then 'MiroFish Graph'
            "chunk_size": ...,          // optional; defaults to the project's
                                        // value, then Config.DEFAULT_CHUNK_SIZE
            "chunk_overlap": ...,       // optional; defaults to the project's
                                        // value, then Config.DEFAULT_CHUNK_OVERLAP
            "force": false              // optional; restart even if a build is
                                        // running, failed or completed
        }

    A missing, unparsable or non-object JSON body is treated as an empty
    object, so it is answered with the regular "project_id required" 400.

    Returns:
        {
            "success": true,
            "data": {
                "project_id": "proj_xxxx",
                "task_id": "task_xxxx",
                "message": "..."
            }
        }

        The build itself runs in a daemon thread; progress is reported through
        the task created here. 400/404 for invalid requests, 500 for a missing
        ZEP_API_KEY or any exception raised while handling the request.
    """
    try:
        logger.info("=== 开始构建图谱 ===")

        # Configuration check
        errors = []
        if not Config.ZEP_API_KEY:
            errors.append("ZEP_API_KEY未配置")
        if errors:
            logger.error(f"配置错误: {errors}")
            return jsonify({
                "success": False,
                "error": "配置错误: " + "; ".join(errors)
            }), 500

        # Parse the request. silent=True makes a bad or missing JSON body
        # yield None instead of raising, which the broad handler below would
        # have turned into a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        project_id = data.get('project_id')
        logger.debug(f"请求参数: project_id={project_id}")

        if not project_id:
            return jsonify({
                "success": False,
                "error": "请提供 project_id"
            }), 400

        # Load the project
        project = ProjectManager.get_project(project_id)
        if not project:
            return jsonify({
                "success": False,
                "error": f"项目不存在: {project_id}"
            }), 404

        # Check the project status
        force = data.get('force', False)  # force a rebuild

        if project.status == ProjectStatus.CREATED:
            return jsonify({
                "success": False,
                "error": "项目尚未生成本体,请先调用 /ontology/generate"
            }), 400

        if project.status == ProjectStatus.GRAPH_BUILDING and not force:
            return jsonify({
                "success": False,
                "error": "图谱正在构建中,请勿重复提交。如需强制重建,请添加 force: true",
                "task_id": project.graph_build_task_id
            }), 400

        # On a forced rebuild, roll the project back first
        if force and project.status in [ProjectStatus.GRAPH_BUILDING, ProjectStatus.FAILED, ProjectStatus.GRAPH_COMPLETED]:
            project.status = ProjectStatus.ONTOLOGY_GENERATED
            project.graph_id = None
            project.graph_build_task_id = None
            project.error = None

        # Resolve build settings
        graph_name = data.get('graph_name', project.name or 'MiroFish Graph')
        chunk_size = data.get('chunk_size', project.chunk_size or Config.DEFAULT_CHUNK_SIZE)
        chunk_overlap = data.get('chunk_overlap', project.chunk_overlap or Config.DEFAULT_CHUNK_OVERLAP)

        # Remember the settings on the project
        project.chunk_size = chunk_size
        project.chunk_overlap = chunk_overlap

        # Load the extracted text
        text = ProjectManager.get_extracted_text(project_id)
        if not text:
            return jsonify({
                "success": False,
                "error": "未找到提取的文本内容"
            }), 400

        # Load the ontology
        ontology = project.ontology
        if not ontology:
            return jsonify({
                "success": False,
                "error": "未找到本体定义"
            }), 400

        # Create the asynchronous task
        task_manager = TaskManager()
        task_id = task_manager.create_task(f"构建图谱: {graph_name}")
        logger.info(f"创建图谱构建任务: task_id={task_id}, project_id={project_id}")

        # Mark the project as building
        project.status = ProjectStatus.GRAPH_BUILDING
        project.graph_build_task_id = task_id
        ProjectManager.save_project(project)

        def build_task():
            """Run the whole build in the background and report via the task."""
            build_logger = get_logger('mirofish.build')

            def make_progress_callback(start, span):
                """Map a 0..1 ratio onto the ``start``..``start + span`` percent range."""
                def callback(msg, progress_ratio):
                    task_manager.update_task(
                        task_id,
                        message=msg,
                        progress=start + int(progress_ratio * span)
                    )
                return callback

            try:
                build_logger.info(f"[{task_id}] 开始构建图谱...")
                task_manager.update_task(
                    task_id,
                    status=TaskStatus.PROCESSING,
                    message="初始化图谱构建服务..."
                )

                # Create the graph builder service
                builder = GraphBuilderService(api_key=Config.ZEP_API_KEY)

                # Split the text into chunks
                task_manager.update_task(
                    task_id,
                    message="文本分块中...",
                    progress=5
                )
                chunks = TextProcessor.split_text(
                    text,
                    chunk_size=chunk_size,
                    overlap=chunk_overlap
                )
                total_chunks = len(chunks)

                # Create the graph
                task_manager.update_task(
                    task_id,
                    message="创建Zep图谱...",
                    progress=10
                )
                graph_id = builder.create_graph(name=graph_name)

                # Record the graph id on the project
                project.graph_id = graph_id
                ProjectManager.save_project(project)

                # Apply the ontology
                task_manager.update_task(
                    task_id,
                    message="设置本体定义...",
                    progress=15
                )
                builder.set_ontology(graph_id, ontology)

                # Add the text; the callback receives (msg, progress_ratio)
                task_manager.update_task(
                    task_id,
                    message=f"开始添加 {total_chunks} 个文本块...",
                    progress=15
                )
                episode_uuids = builder.add_text_batches(
                    graph_id,
                    chunks,
                    batch_size=3,
                    progress_callback=make_progress_callback(15, 40)  # 15% - 55%
                )

                # Wait for the submitted episodes to be processed
                task_manager.update_task(
                    task_id,
                    message="等待Zep处理数据...",
                    progress=55
                )
                builder._wait_for_episodes(
                    episode_uuids,
                    make_progress_callback(55, 35)  # 55% - 90%
                )

                # Fetch the graph data
                task_manager.update_task(
                    task_id,
                    message="获取图谱数据...",
                    progress=95
                )
                graph_data = builder.get_graph_data(graph_id)

                # Mark the project as completed
                project.status = ProjectStatus.GRAPH_COMPLETED
                ProjectManager.save_project(project)

                node_count = graph_data.get("node_count", 0)
                edge_count = graph_data.get("edge_count", 0)
                build_logger.info(f"[{task_id}] 图谱构建完成: graph_id={graph_id}, 节点={node_count}, 边={edge_count}")

                # Done
                task_manager.update_task(
                    task_id,
                    status=TaskStatus.COMPLETED,
                    message="图谱构建完成",
                    progress=100,
                    result={
                        "project_id": project_id,
                        "graph_id": graph_id,
                        "node_count": node_count,
                        "edge_count": edge_count,
                        "chunk_count": total_chunks
                    }
                )

            except Exception as e:
                # Record the failure on both the project and the task
                build_logger.error(f"[{task_id}] 图谱构建失败: {str(e)}")
                build_logger.debug(traceback.format_exc())

                project.status = ProjectStatus.FAILED
                project.error = str(e)
                ProjectManager.save_project(project)

                task_manager.update_task(
                    task_id,
                    status=TaskStatus.FAILED,
                    message=f"构建失败: {str(e)}",
                    error=traceback.format_exc()
                )

        # Start the background thread
        thread = threading.Thread(target=build_task, daemon=True)
        thread.start()

        return jsonify({
            "success": True,
            "data": {
                "project_id": project_id,
                "task_id": task_id,
                "message": "图谱构建任务已启动,请通过 /task/{task_id} 查询进度"
            }
        })

    except Exception as e:
        # Log server-side too, matching what the background task does.
        logger.error(f"启动图谱构建失败: {str(e)}")
        logger.debug(traceback.format_exc())
        # NOTE(review): the traceback is returned to the caller, which exposes
        # internals. Kept because clients may rely on the field - confirm
        # whether it can be dropped outside of debugging.
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
# ============== Task query endpoints ==============
@graph_bp.route('/task/<task_id>', methods=['GET'])
def get_task(task_id: str):
    """
    Return the state of one task.

    Responds with the task's ``to_dict()`` payload, or with a 404 error
    payload when ``TaskManager`` returns no task for ``task_id``.
    """
    manager = TaskManager()
    task = manager.get_task(task_id)

    if task:
        return jsonify({
            "success": True,
            "data": task.to_dict()
        })

    not_found = {
        "success": False,
        "error": f"任务不存在: {task_id}"
    }
    return jsonify(not_found), 404
@graph_bp.route('/tasks', methods=['GET'])
def list_tasks():
    """
    List every task returned by ``TaskManager.list_tasks``.
    """
    manager = TaskManager()
    known_tasks = manager.list_tasks()
    serialized = [entry.to_dict() for entry in known_tasks]

    return jsonify({
        "success": True,
        "data": serialized,
        "count": len(known_tasks)
    })
# ============== Graph data endpoints ==============
@graph_bp.route('/data/<graph_id>', methods=['GET'])
def get_graph_data(graph_id: str):
    """
    Return the data of a graph as provided by ``GraphBuilderService.get_graph_data``.

    Responds with 500 when ZEP_API_KEY is not configured or when fetching the
    data raises; in the latter case the error is also written to the log.
    """
    try:
        if not Config.ZEP_API_KEY:
            return jsonify({
                "success": False,
                "error": "ZEP_API_KEY未配置"
            }), 500

        builder = GraphBuilderService(api_key=Config.ZEP_API_KEY)
        graph_data = builder.get_graph_data(graph_id)

        return jsonify({
            "success": True,
            "data": graph_data
        })

    except Exception as e:
        # Log server-side too; previously the failure only reached the client.
        logger.error(f"获取图谱数据失败: graph_id={graph_id}, error={str(e)}")
        logger.debug(traceback.format_exc())
        # NOTE(review): the traceback is returned to the caller, which exposes
        # internals. Kept because clients may rely on the field - confirm.
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
@graph_bp.route('/delete/<graph_id>', methods=['DELETE'])
def delete_graph(graph_id: str):
    """
    Delete a Zep graph through ``GraphBuilderService.delete_graph``.

    Responds with 500 when ZEP_API_KEY is not configured or when the deletion
    raises; in the latter case the error is also written to the log.
    """
    try:
        if not Config.ZEP_API_KEY:
            return jsonify({
                "success": False,
                "error": "ZEP_API_KEY未配置"
            }), 500

        builder = GraphBuilderService(api_key=Config.ZEP_API_KEY)
        builder.delete_graph(graph_id)

        return jsonify({
            "success": True,
            "message": f"图谱已删除: {graph_id}"
        })

    except Exception as e:
        # Log server-side too; previously the failure only reached the client.
        logger.error(f"删除图谱失败: graph_id={graph_id}, error={str(e)}")
        logger.debug(traceback.format_exc())
        # NOTE(review): the traceback is returned to the caller, which exposes
        # internals. Kept because clients may rely on the field - confirm.
        return jsonify({
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500