agent / backend /backend_app /consumer.py
Toadied's picture
2312
8b383ad verified
Raw
History Blame Contribute Delete
2.39 kB
import json
import time
import logging
from redis_config import get_redis, QUEUE_RAG_QDRANT, QUEUE_RAG_NEO4J, QUEUE_MEMORY
# 你自己的 RAGTool
from helloAgents.tools.builtin.rag_tool import RAGTool
from helloAgents.tools.builtin.memory_tool import MemoryTool
logger = logging.getLogger(__name__)
# ====================== 【全局单例:只创建一次】 ======================
# 全局只初始化1次,所有任务共用
_RAG_TOOL = None
_MEMORY_TOOL = None
def get_rag_tool():
"""获取RAG工具单例"""
global _RAG_TOOL
if _RAG_TOOL is None:
_RAG_TOOL = RAGTool()
return _RAG_TOOL
def get_memory_tool():
"""获取记忆工具单例"""
global _MEMORY_TOOL
if _MEMORY_TOOL is None:
_MEMORY_TOOL = MemoryTool()
return _MEMORY_TOOL
# ======================================================================
def run_consumer(queue_name):
# 每个进程独立创建 Redis 连接(关键!多进程必须这样)
redis = get_redis()
logger.info("🚀 消费者启动,监听队列:%s", queue_name)
while True:
try:
# 从队列左侧取任务(非阻塞)
msg = redis.lpop(queue_name)
if not msg:
time.sleep(0.2)
continue
# JSON 解析任务
try:
task = json.loads(msg)
except json.JSONDecodeError:
logger.error("❌ 任务格式不是合法 JSON")
continue
# 执行文档处理
run(queue_name, task)
logger.info("✅ 处理完成")
except Exception as e:
logger.exception("❌ 消费任务时发生异常")
time.sleep(1)
def run(queue_name, task):
logger.info(f"📥 处理 {queue_name} 相关任务")
# ====================== 【使用单例,不重复创建】 ======================
if queue_name == QUEUE_RAG_QDRANT:
rag_tool = get_rag_tool() # 拿单例,不新建
rag_tool.run(task)
elif queue_name == QUEUE_RAG_NEO4J:
rag_tool = get_rag_tool() # 拿单例,不新建
rag_tool.run(task)
elif queue_name == QUEUE_MEMORY:
memory_tool = get_memory_tool() # 拿单例,不新建
memory_tool.run(task)