deepllm / app /deepclaude /deepclaude.py
mistpe's picture
Upload 13 files
a2cbf8a verified
"""DeepClaude 服务,用于协调 DeepSeek 和 Claude API 的调用"""
import json
import time
import asyncio
from typing import AsyncGenerator
from app.utils.logger import logger
from app.clients import DeepSeekClient, ClaudeClient
class DeepClaude:
"""处理 DeepSeek 和 Claude API 的流式输出衔接"""
def __init__(self, deepseek_api_key: str, claude_api_key: str,
deepseek_api_url: str = "https://api.deepseek.com/v1/chat/completions",
claude_api_url: str = "https://api.anthropic.com/v1/messages",
claude_provider: str = "anthropic",
is_origin_reasoning: bool = True):
"""初始化 API 客户端
Args:
deepseek_api_key: DeepSeek API密钥
claude_api_key: Claude API密钥
"""
self.deepseek_client = DeepSeekClient(deepseek_api_key, deepseek_api_url)
self.claude_client = ClaudeClient(claude_api_key, claude_api_url, claude_provider)
self.is_origin_reasoning = is_origin_reasoning
async def chat_completions_with_stream(self, messages: list,
deepseek_model: str = "deepseek-reasoner",
claude_model: str = "claude-3-5-sonnet-20241022") -> AsyncGenerator[bytes, None]:
"""处理完整的流式输出过程
Args:
messages: 初始消息列表
deepseek_model: DeepSeek 模型名称
claude_model: Claude 模型名称
Yields:
字节流数据,格式如下:
{
"id": "chatcmpl-xxx",
"object": "chat.completion.chunk",
"created": timestamp,
"model": model_name,
"choices": [{
"index": 0,
"delta": {
"role": "assistant",
"reasoning_content": reasoning_content,
"content": content
}
}]
}
"""
# 生成唯一的会话ID和时间戳
chat_id = f"chatcmpl-{hex(int(time.time() * 1000))[2:]}"
created_time = int(time.time())
# 创建队列,用于收集输出数据
output_queue = asyncio.Queue()
# 队列,用于传递 DeepSeek 推理内容给 Claude
claude_queue = asyncio.Queue()
# 用于存储 DeepSeek 的推理累积内容
reasoning_content = []
async def process_deepseek():
logger.info(f"开始处理 DeepSeek 流,使用模型:{deepseek_model}")
try:
async for content_type, content in self.deepseek_client.stream_chat(messages, deepseek_model, self.is_origin_reasoning):
if content_type == "reasoning":
reasoning_content.append(content)
response = {
"id": chat_id,
"object": "chat.completion.chunk",
"created": created_time,
"model": deepseek_model,
"choices": [{
"index": 0,
"delta": {
"role": "assistant",
"reasoning_content": content,
"content": ""
}
}]
}
await output_queue.put(f"data: {json.dumps(response)}\n\n".encode('utf-8'))
elif content_type == "content":
# 当收到 content 类型时,将完整的推理内容发送到 claude_queue,并结束 DeepSeek 流处理
logger.info(f"DeepSeek 推理完成,收集到的推理内容长度:{len(''.join(reasoning_content))}")
await claude_queue.put("".join(reasoning_content))
break
except Exception as e:
logger.error(f"处理 DeepSeek 流时发生错误: {e}")
await claude_queue.put("")
# 用 None 标记 DeepSeek 任务结束
logger.info("DeepSeek 任务处理完成,标记结束")
await output_queue.put(None)
async def process_claude():
try:
logger.info("等待获取 DeepSeek 的推理内容...")
reasoning = await claude_queue.get()
logger.debug(f"获取到推理内容,内容长度:{len(reasoning) if reasoning else 0}")
if not reasoning:
logger.warning("未能获取到有效的推理内容,将使用默认提示继续")
reasoning = "获取推理内容失败"
# 构造 Claude 的输入消息
claude_messages = messages.copy()
claude_messages.append({
"role": "assistant",
"content": f"Here's my reasoning process:\n{reasoning}\n\nBased on this reasoning, I will now provide my response:"
})
# 处理可能 messages 内存在 role = system 的情况,如果有,则去掉当前这一条的消息对象
claude_messages = [message for message in claude_messages if message.get("role", "") != "system"]
async for content_type, content in self.claude_client.stream_chat(claude_messages, claude_model):
if content_type == "answer":
response = {
"id": chat_id,
"object": "chat.completion.chunk",
"created": created_time,
"model": claude_model,
"choices": [{
"index": 0,
"delta": {
"role": "assistant",
"content": content
}
}]
}
await output_queue.put(f"data: {json.dumps(response)}\n\n".encode('utf-8'))
except Exception as e:
logger.error(f"处理 Claude 流时发生错误: {e}")
# 用 None 标记 Claude 任务结束
await output_queue.put(None)
# 创建并发任务
deepseek_task = asyncio.create_task(process_deepseek())
claude_task = asyncio.create_task(process_claude())
# 等待两个任务完成,通过计数判断
finished_tasks = 0
while finished_tasks < 2:
item = await output_queue.get()
if item is None:
finished_tasks += 1
else:
yield item
# 发送结束标记
yield b'data: [DONE]\n\n'