| """DeepClaude 服务,用于协调 DeepSeek 和 Claude API 的调用""" |
| import json |
| import time |
| import tiktoken |
| import asyncio |
| from typing import AsyncGenerator |
| from app.utils.logger import logger |
| from app.clients import DeepSeekClient, ClaudeClient |
|
|
|
|
class DeepClaude:
    """Bridge DeepSeek reasoning output into Claude completions.

    DeepSeek is queried first and its chain-of-thought ("reasoning") stream
    is collected; that reasoning is then injected into the last user message
    and forwarded to Claude, which produces the final answer. Output is
    emitted in OpenAI chat-completion(-chunk) format.
    """

    def __init__(self, deepseek_api_key: str, claude_api_key: str,
                 deepseek_api_url: str = "https://api.deepseek.com/v1/chat/completions",
                 claude_api_url: str = "https://api.anthropic.com/v1/messages",
                 claude_provider: str = "anthropic",
                 is_origin_reasoning: bool = True):
        """Initialize both API clients.

        Args:
            deepseek_api_key: DeepSeek API key.
            claude_api_key: Claude API key.
            deepseek_api_url: DeepSeek chat-completions endpoint URL.
            claude_api_url: Claude messages endpoint URL.
            claude_provider: Claude provider name (e.g. "anthropic").
            is_origin_reasoning: Passed through to the DeepSeek client;
                presumably selects native reasoning output — confirm against
                DeepSeekClient.stream_chat.
        """
        self.deepseek_client = DeepSeekClient(deepseek_api_key, deepseek_api_url)
        self.claude_client = ClaudeClient(claude_api_key, claude_api_url, claude_provider)
        self.is_origin_reasoning = is_origin_reasoning

    @staticmethod
    def _build_claude_messages(messages: list, reasoning: str) -> list:
        """Build the message list sent to Claude.

        Appends the DeepSeek reasoning to the last user message and removes
        system messages. Unlike the previous inline versions, this copies
        each message dict so the caller's original ``messages`` are never
        mutated, and it tolerates an empty message list.

        Args:
            messages: Original OpenAI-style message list.
            reasoning: Reasoning text collected from DeepSeek.

        Returns:
            list: A new message list safe to pass to the Claude client.
        """
        combined_content = f"""
        Here's my another model's reasoning process:\n{reasoning}\n\n
        Based on this reasoning, provide your response directly to me:"""

        # Copy each dict (shallow-copying only the list would alias the
        # caller's message dicts and leak the injected prompt back to them).
        claude_messages = [dict(message) for message in messages]
        if claude_messages and claude_messages[-1].get("role", "") == "user":
            original_content = claude_messages[-1]["content"]
            claude_messages[-1]["content"] = (
                f"Here's my original input:\n{original_content}\n\n{combined_content}"
            )

        # The Claude messages API does not accept inline system messages.
        return [message for message in claude_messages if message.get("role", "") != "system"]

    async def chat_completions_with_stream(
        self,
        messages: list,
        model_arg: tuple[float, float, float, float],
        deepseek_model: str = "deepseek-reasoner",
        claude_model: str = "claude-3-5-sonnet-20241022"
    ) -> AsyncGenerator[bytes, None]:
        """Run the full streaming pipeline: DeepSeek reasoning, then Claude.

        Args:
            messages: Initial message list.
            model_arg: Model parameters forwarded to the Claude client.
            deepseek_model: DeepSeek model name.
            claude_model: Claude model name.

        Yields:
            SSE byte chunks in OpenAI chat-completion-chunk format:
            {
                "id": "chatcmpl-xxx",
                "object": "chat.completion.chunk",
                "created": timestamp,
                "model": model_name,
                "choices": [{
                    "index": 0,
                    "delta": {
                        "role": "assistant",
                        "reasoning_content": reasoning_content,
                        "content": content
                    }
                }]
            }
        """
        chat_id = f"chatcmpl-{hex(int(time.time() * 1000))[2:]}"
        created_time = int(time.time())

        # Chunks ready to be yielded to the caller; each producer task
        # enqueues None as its end-of-stream marker.
        output_queue = asyncio.Queue()
        # Single-item hand-off of the collected reasoning text from the
        # DeepSeek task to the Claude task.
        claude_queue = asyncio.Queue()

        # Reasoning fragments accumulated from the DeepSeek stream.
        reasoning_content = []

        async def process_deepseek():
            """Stream DeepSeek reasoning chunks; hand the joined reasoning to Claude."""
            logger.info(f"开始处理 DeepSeek 流,使用模型:{deepseek_model}, 提供商: {self.deepseek_client.provider}")
            try:
                async for content_type, content in self.deepseek_client.stream_chat(messages, deepseek_model, self.is_origin_reasoning):
                    if content_type == "reasoning":
                        reasoning_content.append(content)
                        response = {
                            "id": chat_id,
                            "object": "chat.completion.chunk",
                            "created": created_time,
                            "model": deepseek_model,
                            "choices": [{
                                "index": 0,
                                "delta": {
                                    "role": "assistant",
                                    "reasoning_content": content,
                                    "content": ""
                                }
                            }]
                        }
                        await output_queue.put(f"data: {json.dumps(response)}\n\n".encode('utf-8'))
                    elif content_type == "content":
                        # First answer token means the reasoning phase is over:
                        # hand the full reasoning over and stop reading DeepSeek.
                        logger.info(f"DeepSeek 推理完成,收集到的推理内容长度:{len(''.join(reasoning_content))}")
                        await claude_queue.put("".join(reasoning_content))
                        break
            except Exception as e:
                logger.error(f"处理 DeepSeek 流时发生错误: {e}")
                # Unblock the Claude task even on failure.
                await claude_queue.put("")
            logger.info("DeepSeek 任务处理完成,标记结束")
            await output_queue.put(None)

        async def process_claude():
            """Wait for DeepSeek reasoning, then stream Claude's final answer."""
            try:
                logger.info("等待获取 DeepSeek 的推理内容...")
                reasoning = await claude_queue.get()
                logger.debug(f"获取到推理内容,内容长度:{len(reasoning) if reasoning else 0}")
                if not reasoning:
                    logger.warning("未能获取到有效的推理内容,将使用默认提示继续")
                    reasoning = "获取推理内容失败"

                claude_messages = self._build_claude_messages(messages, reasoning)

                logger.info(f"开始处理 Claude 流,使用模型: {claude_model}, 提供商: {self.claude_client.provider}")

                async for content_type, content in self.claude_client.stream_chat(
                    messages=claude_messages,
                    model_arg=model_arg,
                    model=claude_model,
                ):
                    if content_type == "answer":
                        response = {
                            "id": chat_id,
                            "object": "chat.completion.chunk",
                            "created": created_time,
                            "model": claude_model,
                            "choices": [{
                                "index": 0,
                                "delta": {
                                    "role": "assistant",
                                    "content": content
                                }
                            }]
                        }
                        await output_queue.put(f"data: {json.dumps(response)}\n\n".encode('utf-8'))
            except Exception as e:
                logger.error(f"处理 Claude 流时发生错误: {e}")
            logger.info("Claude 任务处理完成,标记结束")
            await output_queue.put(None)

        # Keep task references so they are not garbage-collected mid-flight.
        deepseek_task = asyncio.create_task(process_deepseek())
        claude_task = asyncio.create_task(process_claude())

        # Drain the shared queue until both producers have signalled completion.
        finished_tasks = 0
        while finished_tasks < 2:
            item = await output_queue.get()
            if item is None:
                finished_tasks += 1
            else:
                yield item

        yield b'data: [DONE]\n\n'

    async def chat_completions_without_stream(
        self,
        messages: list,
        model_arg: tuple[float, float, float, float],
        deepseek_model: str = "deepseek-reasoner",
        claude_model: str = "claude-3-5-sonnet-20241022"
    ) -> dict:
        """Run the pipeline without streaming and return one full response.

        Args:
            messages: Initial message list.
            model_arg: Model parameters forwarded to the Claude client.
            deepseek_model: DeepSeek model name.
            claude_model: Claude model name.

        Returns:
            dict: Complete response in OpenAI chat-completion format,
            including the DeepSeek reasoning as ``reasoning_content`` and
            approximate token usage.

        Raises:
            Exception: Re-raised from the Claude client on failure.
        """
        chat_id = f"chatcmpl-{hex(int(time.time() * 1000))[2:]}"
        created_time = int(time.time())
        reasoning_content = []

        # Collect the full reasoning stream; the first "content" chunk marks
        # the end of the reasoning phase, which is all we need here.
        try:
            async for content_type, content in self.deepseek_client.stream_chat(messages, deepseek_model, self.is_origin_reasoning):
                if content_type == "reasoning":
                    reasoning_content.append(content)
                elif content_type == "content":
                    break
        except Exception as e:
            logger.error(f"获取 DeepSeek 推理内容时发生错误: {e}")
            reasoning_content = ["获取推理内容失败"]

        reasoning = "".join(reasoning_content)
        claude_messages = self._build_claude_messages(messages, reasoning)

        # Token accounting uses the gpt-4o tokenizer as an approximation —
        # it is not Claude's tokenizer, so counts are indicative only.
        token_content = "\n".join([message.get("content", "") for message in claude_messages])
        encoding = tiktoken.encoding_for_model("gpt-4o")
        input_tokens = encoding.encode(token_content)
        logger.debug(f"输入 Tokens: {len(input_tokens)}")

        logger.debug("claude messages: " + str(claude_messages))

        try:
            answer = ""
            async for content_type, content in self.claude_client.stream_chat(
                messages=claude_messages,
                model_arg=model_arg,
                model=claude_model,
                stream=False
            ):
                if content_type == "answer":
                    answer += content
            output_tokens = encoding.encode(answer)
            logger.debug(f"输出 Tokens: {len(output_tokens)}")

            return {
                "id": chat_id,
                "object": "chat.completion",
                "created": created_time,
                "model": claude_model,
                "choices": [{
                    "index": 0,
                    "message": {
                        "role": "assistant",
                        "content": answer,
                        "reasoning_content": reasoning
                    },
                    "finish_reason": "stop"
                }],
                "usage": {
                    "prompt_tokens": len(input_tokens),
                    "completion_tokens": len(output_tokens),
                    # Sum lengths directly instead of building a throwaway
                    # concatenated list.
                    "total_tokens": len(input_tokens) + len(output_tokens)
                }
            }
        except Exception as e:
            logger.error(f"获取 Claude 响应时发生错误: {e}")
            raise