|
|
"""DeepClaude 服务,用于协调 DeepSeek 和 Claude API 的调用""" |
|
|
import json |
|
|
import time |
|
|
import asyncio |
|
|
from typing import AsyncGenerator |
|
|
from app.utils.logger import logger |
|
|
from app.clients import DeepSeekClient, ClaudeClient |
|
|
|
|
|
|
|
|
class DeepClaude:
    """Chain a DeepSeek reasoning stream into a Claude answer stream.

    DeepSeek's reasoning chunks are forwarded to the caller as they arrive;
    once the reasoning is complete it is handed to Claude, whose answer
    chunks are then forwarded on the same output stream.
    """

    def __init__(self, deepseek_api_key: str, claude_api_key: str,
                 deepseek_api_url: str = "https://api.deepseek.com/v1/chat/completions",
                 claude_api_url: str = "https://api.anthropic.com/v1/messages",
                 claude_provider: str = "anthropic",
                 is_origin_reasoning: bool = True):
        """Create the two API clients.

        Args:
            deepseek_api_key: DeepSeek API key.
            claude_api_key: Claude API key.
            deepseek_api_url: Endpoint passed to ``DeepSeekClient``.
            claude_api_url: Endpoint passed to ``ClaudeClient``.
            claude_provider: Provider identifier passed to ``ClaudeClient``.
            is_origin_reasoning: Stored on the instance and forwarded
                unchanged to ``DeepSeekClient.stream_chat`` on each request.
        """
        self.deepseek_client = DeepSeekClient(deepseek_api_key, deepseek_api_url)
        self.claude_client = ClaudeClient(claude_api_key, claude_api_url, claude_provider)
        self.is_origin_reasoning = is_origin_reasoning

    @staticmethod
    def _sse_chunk(chat_id: str, created: int, model: str, delta: dict) -> bytes:
        """Encode one ``chat.completion.chunk`` payload as a ``data:`` frame."""
        payload = {
            "id": chat_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [{
                "index": 0,
                "delta": delta,
            }],
        }
        return f"data: {json.dumps(payload)}\n\n".encode('utf-8')

    async def chat_completions_with_stream(self, messages: list,
                                           deepseek_model: str = "deepseek-reasoner",
                                           claude_model: str = "claude-3-5-sonnet-20241022") -> AsyncGenerator[bytes, None]:
        """Stream DeepSeek reasoning followed by Claude's answer.

        Two producer tasks write encoded chunks to a shared queue and each
        finishes by enqueueing ``None``; this generator drains the queue until
        both sentinels have been seen, then emits the ``[DONE]`` frame.

        Args:
            messages: Initial message list (a list of dicts with a ``role`` key).
            deepseek_model: DeepSeek model name.
            claude_model: Claude model name.

        Yields:
            UTF-8 encoded frames of the form ``data: <json>\\n\\n`` where the
            JSON object looks like::

                {
                    "id": "chatcmpl-xxx",
                    "object": "chat.completion.chunk",
                    "created": timestamp,
                    "model": model_name,
                    "choices": [{
                        "index": 0,
                        "delta": {
                            "role": "assistant",
                            "reasoning_content": reasoning_content,
                            "content": content
                        }
                    }]
                }

            Claude chunks carry no ``reasoning_content`` key. The final frame
            is always ``data: [DONE]\\n\\n``.
        """
        chat_id = f"chatcmpl-{hex(int(time.time() * 1000))[2:]}"
        created_time = int(time.time())

        # Encoded chunks for the caller; ``None`` marks one producer as done.
        output_queue = asyncio.Queue()
        # Single hand-off of the collected reasoning text to the Claude task.
        claude_queue = asyncio.Queue()

        reasoning_content = []

        async def process_deepseek():
            logger.info(f"开始处理 DeepSeek 流,使用模型:{deepseek_model}")
            try:
                async for content_type, content in self.deepseek_client.stream_chat(
                        messages, deepseek_model, self.is_origin_reasoning):
                    if content_type == "reasoning":
                        reasoning_content.append(content)
                        await output_queue.put(self._sse_chunk(
                            chat_id, created_time, deepseek_model,
                            {
                                "role": "assistant",
                                "reasoning_content": content,
                                "content": "",
                            },
                        ))
                    elif content_type == "content":
                        # The first answer chunk means reasoning is over.
                        break
            except Exception as e:
                logger.error(f"处理 DeepSeek 流时发生错误: {e}")
                # An empty hand-off makes the Claude task use its fallback text.
                await claude_queue.put("")
            else:
                # Runs both after ``break`` and when the stream simply ends
                # without a "content" chunk; without the latter the Claude
                # task would wait on ``claude_queue`` forever.
                collected = "".join(reasoning_content)
                logger.info(f"DeepSeek 推理完成,收集到的推理内容长度:{len(collected)}")
                await claude_queue.put(collected)

            logger.info("DeepSeek 任务处理完成,标记结束")
            await output_queue.put(None)

        async def process_claude():
            try:
                logger.info("等待获取 DeepSeek 的推理内容...")
                reasoning = await claude_queue.get()
                logger.debug(f"获取到推理内容,内容长度:{len(reasoning) if reasoning else 0}")
                if not reasoning:
                    logger.warning("未能获取到有效的推理内容,将使用默认提示继续")
                    reasoning = "获取推理内容失败"

                # Build a new list (the caller's list is not mutated), drop
                # system messages, and append the reasoning as an assistant turn.
                claude_messages = [
                    message for message in messages
                    if message.get("role", "") != "system"
                ]
                claude_messages.append({
                    "role": "assistant",
                    "content": f"Here's my reasoning process:\n{reasoning}\n\nBased on this reasoning, I will now provide my response:"
                })

                async for content_type, content in self.claude_client.stream_chat(claude_messages, claude_model):
                    if content_type == "answer":
                        await output_queue.put(self._sse_chunk(
                            chat_id, created_time, claude_model,
                            {
                                "role": "assistant",
                                "content": content,
                            },
                        ))
            except Exception as e:
                logger.error(f"处理 Claude 流时发生错误: {e}")

            await output_queue.put(None)

        producers = (
            asyncio.create_task(process_deepseek()),
            asyncio.create_task(process_claude()),
        )

        try:
            finished_tasks = 0
            while finished_tasks < len(producers):
                item = await output_queue.get()
                if item is None:
                    finished_tasks += 1
                else:
                    yield item

            yield b'data: [DONE]\n\n'
        finally:
            # If the caller stops iterating early, stop the producers instead
            # of leaving them running in the background. Cancelling a task
            # that has already finished is a no-op.
            for task in producers:
                task.cancel()
            await asyncio.gather(*producers, return_exceptions=True)