| | import base64 |
| | import json |
| | import random |
| | import string |
| | import time |
| | import uuid |
| | from abc import ABC, abstractmethod |
| | from typing import Any, Dict, List, Optional |
| |
|
| | from app.config.config import settings |
| | from app.log.logger import get_openai_logger |
| | from app.utils.helpers import is_image_upload_configured |
| | from app.utils.uploader import ImageUploaderFactory |
| |
|
| | logger = get_openai_logger() |
| |
|
| |
|
class ResponseHandler(ABC):
    """Abstract base class for response handlers.

    Concrete subclasses convert a raw Gemini API response into the output
    format they serve (native Gemini or OpenAI-compatible).
    """

    @abstractmethod
    def handle_response(
        self, response: Dict[str, Any], model: str, stream: bool = False
    ) -> Dict[str, Any]:
        """Transform *response* for *model*; *stream* selects chunk vs. full handling."""
        pass
| |
|
| |
|
class GeminiResponseHandler(ResponseHandler):
    """Response handler that keeps payloads in the native Gemini format."""

    def __init__(self):
        # Streaming "thinking" bookkeeping; kept for parity with the
        # OpenAI handler even though it is not read here.
        self.thinking_first = True
        self.thinking_status = False

    def handle_response(
        self,
        response: Dict[str, Any],
        model: str,
        stream: bool = False,
        usage_metadata: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Post-process a Gemini response, dispatching on the stream flag."""
        handler = (
            _handle_gemini_stream_response
            if stream
            else _handle_gemini_normal_response
        )
        return handler(response, model, stream)
| |
|
| |
|
| | def _handle_openai_stream_response( |
| | response: Dict[str, Any], |
| | model: str, |
| | finish_reason: str, |
| | usage_metadata: Optional[Dict[str, Any]], |
| | ) -> Dict[str, Any]: |
| | choices = [] |
| | candidates = response.get("candidates", []) |
| |
|
| | for candidate in candidates: |
| | index = candidate.get("index", 0) |
| | text, reasoning_content, tool_calls, _ = _extract_result( |
| | {"candidates": [candidate]}, model, stream=True, gemini_format=False |
| | ) |
| |
|
| | if not text and not tool_calls and not reasoning_content: |
| | delta = {} |
| | else: |
| | delta = { |
| | "content": text, |
| | "reasoning_content": reasoning_content, |
| | "role": "assistant", |
| | } |
| | if tool_calls: |
| | delta["tool_calls"] = tool_calls |
| |
|
| | choice = {"index": index, "delta": delta, "finish_reason": finish_reason} |
| | choices.append(choice) |
| |
|
| | template_chunk = { |
| | "id": f"chatcmpl-{uuid.uuid4()}", |
| | "object": "chat.completion.chunk", |
| | "created": int(time.time()), |
| | "model": model, |
| | "choices": choices, |
| | } |
| | if usage_metadata: |
| | template_chunk["usage"] = { |
| | "prompt_tokens": usage_metadata.get("promptTokenCount", 0), |
| | "completion_tokens": usage_metadata.get("candidatesTokenCount", 0), |
| | "total_tokens": usage_metadata.get("totalTokenCount", 0), |
| | } |
| | return template_chunk |
| |
|
| |
|
| | def _handle_openai_normal_response( |
| | response: Dict[str, Any], |
| | model: str, |
| | finish_reason: str, |
| | usage_metadata: Optional[Dict[str, Any]], |
| | ) -> Dict[str, Any]: |
| | choices = [] |
| | candidates = response.get("candidates", []) |
| |
|
| | for i, candidate in enumerate(candidates): |
| | text, reasoning_content, tool_calls, _ = _extract_result( |
| | {"candidates": [candidate]}, model, stream=False, gemini_format=False |
| | ) |
| | choice = { |
| | "index": i, |
| | "message": { |
| | "role": "assistant", |
| | "content": text, |
| | "reasoning_content": reasoning_content, |
| | "tool_calls": tool_calls, |
| | }, |
| | "finish_reason": finish_reason, |
| | } |
| | choices.append(choice) |
| |
|
| | return { |
| | "id": f"chatcmpl-{uuid.uuid4()}", |
| | "object": "chat.completion", |
| | "created": int(time.time()), |
| | "model": model, |
| | "choices": choices, |
| | "usage": { |
| | "prompt_tokens": usage_metadata.get("promptTokenCount", 0), |
| | "completion_tokens": usage_metadata.get("candidatesTokenCount", 0), |
| | "total_tokens": usage_metadata.get("totalTokenCount", 0), |
| | }, |
| | } |
| |
|
| |
|
class OpenAIResponseHandler(ResponseHandler):
    """Response handler producing OpenAI-compatible chat payloads.

    Converts Gemini responses (and generated-image markdown) into OpenAI
    ``chat.completion`` / ``chat.completion.chunk`` dicts.
    """

    def __init__(self, config):
        self.config = config
        # Streaming "thinking" bookkeeping, mirrored from GeminiResponseHandler.
        self.thinking_first = True
        self.thinking_status = False

    def handle_response(
        self,
        response: Dict[str, Any],
        model: str,
        stream: bool = False,
        # Was annotated `str` despite the None default; Optional[str] is correct
        # and backward-compatible.
        finish_reason: Optional[str] = None,
        usage_metadata: Optional[Dict[str, Any]] = None,
    ) -> Optional[Dict[str, Any]]:
        """Convert a Gemini response into OpenAI format (stream or full)."""
        if stream:
            return _handle_openai_stream_response(
                response, model, finish_reason, usage_metadata
            )
        return _handle_openai_normal_response(
            response, model, finish_reason, usage_metadata
        )

    def handle_image_chat_response(
        self, image_str: str, model: str, stream=False, finish_reason="stop"
    ):
        """Wrap already-rendered image markdown in an OpenAI chat payload."""
        if stream:
            return _handle_openai_stream_image_response(image_str, model, finish_reason)
        return _handle_openai_normal_image_response(image_str, model, finish_reason)
| |
|
| |
|
| | def _handle_openai_stream_image_response( |
| | image_str: str, model: str, finish_reason: str |
| | ) -> Dict[str, Any]: |
| | return { |
| | "id": f"chatcmpl-{uuid.uuid4()}", |
| | "object": "chat.completion.chunk", |
| | "created": int(time.time()), |
| | "model": model, |
| | "choices": [ |
| | { |
| | "index": 0, |
| | "delta": {"content": image_str} if image_str else {}, |
| | "finish_reason": finish_reason, |
| | } |
| | ], |
| | } |
| |
|
| |
|
| | def _handle_openai_normal_image_response( |
| | image_str: str, model: str, finish_reason: str |
| | ) -> Dict[str, Any]: |
| | return { |
| | "id": f"chatcmpl-{uuid.uuid4()}", |
| | "object": "chat.completion", |
| | "created": int(time.time()), |
| | "model": model, |
| | "choices": [ |
| | { |
| | "index": 0, |
| | "message": {"role": "assistant", "content": image_str}, |
| | "finish_reason": finish_reason, |
| | } |
| | ], |
| | "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}, |
| | } |
| |
|
| |
|
def _extract_result(
    response: Dict[str, Any],
    model: str,
    stream: bool = False,
    gemini_format: bool = False,
) -> tuple[str, Optional[str], List[Dict[str, Any]], Optional[bool]]:
    """Extract text, reasoning, tool calls and thought flag from a Gemini response.

    Args:
        response: Raw Gemini payload with a ``candidates`` list.
        model: Model name (used for logging and search-link decoration).
        stream: When True, only the first part of the first candidate is read;
            otherwise all parts of the first candidate are concatenated.
        gemini_format: When True, keep Gemini-native shapes (raw functionCall
            parts, no text/reasoning split).

    Returns:
        ``(text, reasoning_content, tool_calls, thought)``.
    """
    text, reasoning_content, tool_calls, thought = "", "", [], None

    if stream:
        if response.get("candidates"):
            candidate = response["candidates"][0]
            content = candidate.get("content", {})
            parts = content.get("parts", [])
            if not parts:
                logger.warning("No parts found in stream response")
                return "", None, [], None

            # Stream mode inspects only parts[0]; later parts are scanned
            # only for tool calls below.
            if "text" in parts[0]:
                text = parts[0].get("text")
                if "thought" in parts[0]:
                    # For OpenAI-format consumers, surface the thinking text
                    # as reasoning_content when enabled in settings.
                    if not gemini_format and settings.SHOW_THINKING_PROCESS:
                        reasoning_content = text
                        text = ""
                    thought = parts[0].get("thought")
            elif "executableCode" in parts[0]:
                text = _format_code_block(parts[0]["executableCode"])
            elif "codeExecution" in parts[0]:
                text = _format_code_block(parts[0]["codeExecution"])
            elif "executableCodeResult" in parts[0]:
                text = _format_execution_result(parts[0]["executableCodeResult"])
            elif "codeExecutionResult" in parts[0]:
                text = _format_execution_result(parts[0]["codeExecutionResult"])
            elif "inlineData" in parts[0]:
                text = _extract_image_data(parts[0])
            else:
                text = ""
            text = _add_search_link_text(model, candidate, text)
            tool_calls = _extract_tool_calls(parts, gemini_format)
    else:
        if response.get("candidates"):
            candidate = response["candidates"][0]
            text, reasoning_content = "", ""

            content = candidate.get("content", {})

            if content and isinstance(content, dict):
                parts = content.get("parts", [])

                if parts:
                    # Non-stream mode: concatenate every part, splitting
                    # thinking text from visible text when configured.
                    for part in parts:
                        if "text" in part:
                            if "thought" in part and settings.SHOW_THINKING_PROCESS:
                                reasoning_content += part["text"]
                            else:
                                text += part["text"]
                            # Remember the first part's thought flag only.
                            if "thought" in part and thought is None:
                                thought = part.get("thought")
                        elif "inlineData" in part:
                            text += _extract_image_data(part)
                else:
                    logger.warning(f"No parts found in content for model: {model}")
            else:
                logger.error(f"Invalid content structure for model: {model}")

            text = _add_search_link_text(model, candidate, text)

            parts = candidate.get("content", {}).get("parts", [])
            tool_calls = _extract_tool_calls(parts, gemini_format)
        else:
            logger.warning(f"No candidates found in response for model: {model}")
            text = "暂无返回"

    return text, reasoning_content, tool_calls, thought
| |
|
| |
|
| | def _has_inline_image_part(response: Dict[str, Any]) -> bool: |
| | try: |
| | for c in response.get("candidates", []): |
| | for p in c.get("content", {}).get("parts", []): |
| | if isinstance(p, dict) and ("inlineData" in p): |
| | return True |
| | except Exception: |
| | return False |
| | return False |
| |
|
| |
|
def _extract_image_data(part: dict) -> str:
    """Upload an inline base64 image part and return markdown text for it.

    Builds an uploader for the configured provider, decodes the base64
    payload from ``part["inlineData"]`` and uploads it under a date-based
    random filename. When no upload provider is configured, returns early
    without uploading.
    """
    image_uploader = None
    if settings.UPLOAD_PROVIDER == "smms":
        image_uploader = ImageUploaderFactory.create(
            provider=settings.UPLOAD_PROVIDER, api_key=settings.SMMS_SECRET_TOKEN
        )
    elif settings.UPLOAD_PROVIDER == "picgo":
        image_uploader = ImageUploaderFactory.create(
            provider=settings.UPLOAD_PROVIDER,
            api_key=settings.PICGO_API_KEY,
            api_url=settings.PICGO_API_URL
        )
    elif settings.UPLOAD_PROVIDER == "cloudflare_imgbed":
        image_uploader = ImageUploaderFactory.create(
            provider=settings.UPLOAD_PROVIDER,
            base_url=settings.CLOUDFLARE_IMGBED_URL,
            auth_code=settings.CLOUDFLARE_IMGBED_AUTH_CODE,
            upload_folder=settings.CLOUDFLARE_IMGBED_UPLOAD_FOLDER,
        )
    # Date-partitioned random filename, e.g. "2024/01/31/1a2b3c4d.png".
    current_date = time.strftime("%Y/%m/%d")
    filename = f"{current_date}/{uuid.uuid4().hex[:8]}.png"
    base64_data = part["inlineData"]["data"]
    mime_type = part["inlineData"]["mimeType"]

    # NOTE(review): the f-strings below contain only newlines, yet mime_type,
    # base64_data and upload_response.url go unused — the markdown image/link
    # text appears to have been lost; confirm against the upstream source.
    if not is_image_upload_configured(settings):
        return f"\n\n\n\n"
    bytes_data = base64.b64decode(base64_data)
    # NOTE(review): image_uploader may still be None here if UPLOAD_PROVIDER
    # is set but matches none of the branches above — verify upstream invariant.
    upload_response = image_uploader.upload(bytes_data, filename)
    if upload_response.success:
        text = f"\n\n\n\n"
    else:
        text = f"\n\n\n\n"
    return text
| |
|
| |
|
| | def _extract_tool_calls( |
| | parts: List[Dict[str, Any]], gemini_format: bool |
| | ) -> List[Dict[str, Any]]: |
| | """提取工具调用信息""" |
| | if not parts or not isinstance(parts, list): |
| | return [] |
| |
|
| | letters = string.ascii_lowercase + string.digits |
| | tool_calls = list() |
| |
|
| | for i in range(len(parts)): |
| | part = parts[i] |
| | if not part or not isinstance(part, dict): |
| | continue |
| |
|
| | item = part.get("functionCall", {}) |
| | if not item or not isinstance(item, dict): |
| | continue |
| |
|
| | if gemini_format: |
| | tool_calls.append(part) |
| | else: |
| | id = f"call_{''.join(random.sample(letters, 32))}" |
| | name = item.get("name", "") |
| | arguments = json.dumps(item.get("args", None) or {}) |
| |
|
| | tool_calls.append( |
| | { |
| | "index": i, |
| | "id": id, |
| | "type": "function", |
| | "function": {"name": name, "arguments": arguments}, |
| | } |
| | ) |
| |
|
| | return tool_calls |
| |
|
| |
|
def _handle_gemini_stream_response(
    response: Dict[str, Any], model: str, stream: bool
) -> Dict[str, Any]:
    """Rewrite the first candidate's content of a Gemini stream chunk in place."""
    # Without a configured uploader, raw inline-image chunks pass through untouched.
    if not is_image_upload_configured(settings) and _has_inline_image_part(response):
        return response

    text, _reasoning, calls, thought = _extract_result(
        response, model, stream=stream, gemini_format=True
    )
    if calls:
        new_parts = calls
    else:
        text_part: Dict[str, Any] = {"text": text}
        if thought is not None:
            text_part["thought"] = thought
        new_parts = [text_part]
    response["candidates"][0]["content"] = {"parts": new_parts, "role": "model"}
    return response
| |
|
| |
|
def _handle_gemini_normal_response(
    response: Dict[str, Any], model: str, stream: bool
) -> Dict[str, Any]:
    """Rewrite the first candidate's content of a full Gemini response in place."""
    # Without a configured uploader, raw inline-image responses pass through untouched.
    if not is_image_upload_configured(settings) and _has_inline_image_part(response):
        return response

    text, reasoning_content, tool_calls, thought = _extract_result(
        response, model, stream=stream, gemini_format=True
    )
    if tool_calls:
        new_parts = tool_calls
    else:
        new_parts = []
        # Thinking text is emitted as a separate part flagged with "thought".
        if thought is not None:
            new_parts.append({"text": reasoning_content, "thought": thought})
        new_parts.append({"text": text})
    response["candidates"][0]["content"] = {"parts": new_parts, "role": "model"}
    return response
| |
|
| |
|
| | def _format_code_block(code_data: dict) -> str: |
| | """格式化代码块输出""" |
| | language = code_data.get("language", "").lower() |
| | code = code_data.get("code", "").strip() |
| | return f"""\n\n---\n\n【代码执行】\n```{language}\n{code}\n```\n""" |
| |
|
| |
|
def _add_search_link_text(model: str, candidate: dict, text: str) -> str:
    """Append a citation-sources section to *text* for ``-search`` models.

    Applies only when search links are enabled in settings, the model name
    ends with ``-search``, and the candidate carries grounding chunks;
    otherwise *text* is returned unchanged.
    """
    applicable = (
        settings.SHOW_SEARCH_LINK
        and model.endswith("-search")
        and "groundingMetadata" in candidate
        and "groundingChunks" in candidate["groundingMetadata"]
    )
    if not applicable:
        return text

    result = text + "\n\n---\n\n" + "**【引用来源】**\n\n"
    for chunk in candidate["groundingMetadata"]["groundingChunks"]:
        if "web" in chunk:
            result += _create_search_link(chunk["web"])
    return result
| |
|
| |
|
| | def _create_search_link(grounding_chunk: dict) -> str: |
| | return f'\n- [{grounding_chunk["title"]}]({grounding_chunk["uri"]})' |
| |
|
| |
|
| | def _format_execution_result(result_data: dict) -> str: |
| | """格式化执行结果输出""" |
| | outcome = result_data.get("outcome", "") |
| | output = result_data.get("output", "").strip() |
| | return f"""\n【执行结果】\n> outcome: {outcome}\n\n【输出结果】\n```plaintext\n{output}\n```\n\n---\n\n""" |
| |
|