| from typing import Any, Dict, List, Tuple |
| import json |
| from src.converter.utils import extract_content_and_reasoning |
| from log import log |
| from src.converter.openai2gemini import _convert_usage_metadata |
|
|
def safe_get_nested(obj: Any, *keys: str, default: Any = None) -> Any:
    """Follow a chain of keys through nested dictionaries.

    Args:
        obj: Starting object; must be a dict at every level that still has
            keys left to resolve.
        *keys: Keys to follow, outermost first.
        default: Value returned when the path cannot be followed.

    Returns:
        The value at the end of the key path, or ``default`` when a level is
        not a dict or a lookup yields ``default`` (a missing key, or a stored
        value that is the very same object as ``default``).
    """
    current = obj
    for name in keys:
        if isinstance(current, dict):
            current = current.get(name, default)
            if current is not default:
                continue
        # Either this level is not a dict or the lookup fell back to default.
        return default
    return current
|
|
def parse_response_for_fake_stream(response_data: Dict[str, Any]) -> tuple:
    """Extract content, reasoning, finish reason and images from a full response.

    Used by the fake-stream path, which receives one complete response and
    re-emits it in pieces.

    Args:
        response_data: Parsed Gemini API response. It may be wrapped in a
            top-level ``"response"`` field, which is unwrapped when the
            payload has no ``"candidates"`` key of its own.

    Returns:
        A ``(content, reasoning_content, finish_reason, images)`` tuple.
        ``("", "", "STOP", [])`` is returned when there are no candidates.
    """
    if "response" in response_data and "candidates" not in response_data:
        log.debug("[FAKE_STREAM] Unwrapping response field")
        response_data = response_data["response"]

    # ``or []`` also covers an explicit JSON null, which .get() with a
    # default would hand back as None and break len() below.
    candidates = response_data.get("candidates") or []
    log.debug(f"[FAKE_STREAM] Found {len(candidates)} candidates")
    if not candidates:
        return "", "", "STOP", []

    # Only the first candidate is used.
    candidate = candidates[0]
    finish_reason = candidate.get("finishReason", "STOP")
    # safe_get_nested returns a stored None as-is, so normalise it to a list.
    parts = safe_get_nested(candidate, "content", "parts", default=[]) or []
    log.debug(f"[FAKE_STREAM] Extracted {len(parts)} parts: {json.dumps(parts, ensure_ascii=False)}")
    content, reasoning_content, images = extract_content_and_reasoning(parts)
    log.debug(f"[FAKE_STREAM] Content length: {len(content)}, Reasoning length: {len(reasoning_content)}, Images count: {len(images)}")

    return content, reasoning_content, finish_reason, images
|
|
def extract_fake_stream_content(response: Any) -> Tuple[str, str, Dict[str, int]]:
    """Extract content, reasoning and usage from a non-streaming response.

    The body is taken from ``response.body`` or ``response.content`` when the
    object has one of those attributes, otherwise from ``response`` itself.
    Bytes are decoded; anything else is converted with ``str``.

    Args:
        response: Response object (or raw value) carrying a JSON body.

    Returns:
        A ``(content, reasoning_content, usage)`` tuple. When the body is not
        a JSON object, the raw body text is returned as ``content`` with an
        empty ``reasoning_content`` and ``usage`` set to ``None``.
    """
    if hasattr(response, "body"):
        raw_body = response.body
    elif hasattr(response, "content"):
        raw_body = response.content
    else:
        raw_body = response
    # Decoding here also covers bytes passed in directly, which str() would
    # otherwise turn into a "b'...'" repr.
    body_str = raw_body.decode() if isinstance(raw_body, bytes) else str(raw_body)

    try:
        response_data = json.loads(body_str)

        # Scalars, lists and null are valid JSON but have no response
        # structure to read; treat them like a plain-text body.
        if not isinstance(response_data, dict):
            return body_str, "", None

        # Unwrap a nested "response" object when one is present.
        wrapped = response_data.get("response")
        gemini_response = wrapped if isinstance(wrapped, dict) else response_data

        content = ""
        reasoning_content = ""
        candidates = gemini_response.get("candidates")
        choices = gemini_response.get("choices")
        if candidates:
            # Gemini-shaped payload: read the first candidate's parts.
            candidate = candidates[0]
            if "content" in candidate and "parts" in candidate["content"]:
                parts = candidate["content"]["parts"]
                # Images are extracted as well but are not part of this
                # function's return value.
                content, reasoning_content, _images = extract_content_and_reasoning(parts)
        elif choices:
            # OpenAI-shaped payload: read the first choice's message content.
            content = choices[0].get("message", {}).get("content", "")

        # Only reasoning came back: substitute a placeholder message.
        if not content and reasoning_content:
            log.warning("Fake stream response contains only thinking content")
            content = "[模型正在思考中,请稍后再试或重新提问]"

        # Nothing usable at all: substitute the empty-response placeholder.
        if not content:
            log.warning(f"No content found in response: {gemini_response}")
            content = "[响应为空,请重新尝试]"

        usage = _convert_usage_metadata(gemini_response.get("usageMetadata"))

        return content, reasoning_content, usage

    except json.JSONDecodeError:
        # Not JSON: hand the raw text back as the content.
        return body_str, "", None
|
|
| def _build_candidate(parts: List[Dict[str, Any]], finish_reason: str = "STOP") -> Dict[str, Any]: |
| """构建标准候选响应结构 |
| |
| Args: |
| parts: parts 列表 |
| finish_reason: 结束原因 |
| |
| Returns: |
| 候选响应字典 |
| """ |
| return { |
| "candidates": [{ |
| "content": {"parts": parts, "role": "model"}, |
| "finishReason": finish_reason, |
| "index": 0, |
| }] |
| } |
|
|
def create_openai_heartbeat_chunk() -> Dict[str, Any]:
    """Build an OpenAI-format heartbeat chunk for fake streaming.

    Returns:
        A dict with a single choice whose delta has role ``"assistant"``,
        empty content, and a ``None`` finish reason.
    """
    empty_delta = {"role": "assistant", "content": ""}
    choice = {"index": 0, "delta": empty_delta, "finish_reason": None}
    return {"choices": [choice]}
|
|
def _gemini_inline_data_parts(images: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Convert base64 ``data:`` image URLs into Gemini ``inlineData`` parts."""
    inline_parts = []
    for img in images:
        if img.get("type") != "image_url":
            continue
        url = img.get("image_url", {}).get("url", "")
        if not url.startswith("data:"):
            continue
        # Expected shape: "data:<mime>;base64,<payload>"; anything else is skipped.
        pieces = url.split(";base64,")
        if len(pieces) != 2:
            continue
        mime_prefix, base64_data = pieces
        inline_parts.append({
            "inlineData": {
                "mimeType": mime_prefix.replace("data:", ""),
                "data": base64_data
            }
        })
    return inline_parts


def build_gemini_fake_stream_chunks(content: str, reasoning_content: str, finish_reason: str, images: List[Dict[str, Any]] = None, chunk_size: int = 50) -> List[Dict[str, Any]]:
    """Split a complete response into Gemini-format fake-stream chunks.

    Content chunks are emitted first, followed by reasoning chunks (marked
    with ``"thought": True``). Only the very last chunk carries
    ``finish_reason``; all others carry ``None``. Images given as base64
    ``data:`` URLs are attached to the first content chunk as ``inlineData``
    parts.

    Args:
        content: Main text content.
        reasoning_content: Reasoning text; may be empty.
        finish_reason: Finish reason placed on the final chunk.
        images: Optional list of image dicts of the form
            ``{"type": "image_url", "image_url": {"url": ...}}``.
        chunk_size: Number of characters per chunk (default 50). Values
            below 1 are treated as 1.

    Returns:
        List of response chunks. When ``content`` is empty, a single chunk
        holding a placeholder message is returned.
    """
    if images is None:
        images = []
    # range() raises on a zero step and yields nothing for a negative one,
    # which would otherwise crash or return a stream without any chunks.
    chunk_size = max(1, chunk_size)

    log.debug(f"[build_gemini_fake_stream_chunks] Input - content: {repr(content)}, reasoning: {repr(reasoning_content)}, finish_reason: {finish_reason}, images count: {len(images)}")
    chunks = []

    if not content:
        # NOTE(review): images are not forwarded on this path; confirm that
        # dropping them for text-less responses is intended.
        default_text = "[模型正在思考中,请稍后再试或重新提问]" if reasoning_content else "[响应为空,请重新尝试]"
        return [_build_candidate([{"text": default_text}], finish_reason)]

    content_length = len(content)
    for offset in range(0, content_length, chunk_size):
        chunk_text = content[offset:offset + chunk_size]
        # The finish reason moves to the reasoning chunks when there are any.
        is_last_chunk = (offset + chunk_size >= content_length) and not reasoning_content
        chunk_finish_reason = finish_reason if is_last_chunk else None

        # Images ride along with the first content chunk only.
        parts = _gemini_inline_data_parts(images) if offset == 0 else []
        parts.append({"text": chunk_text})
        chunk_data = _build_candidate(parts, chunk_finish_reason)
        log.debug(f"[build_gemini_fake_stream_chunks] Generated chunk: {chunk_data}")
        chunks.append(chunk_data)

    if reasoning_content:
        reasoning_length = len(reasoning_content)
        for offset in range(0, reasoning_length, chunk_size):
            chunk_text = reasoning_content[offset:offset + chunk_size]
            is_last_chunk = offset + chunk_size >= reasoning_length
            chunk_finish_reason = finish_reason if is_last_chunk else None
            chunks.append(_build_candidate([{"text": chunk_text, "thought": True}], chunk_finish_reason))

    log.debug(f"[build_gemini_fake_stream_chunks] Total chunks generated: {len(chunks)}")
    return chunks
|
|
|
|
def create_gemini_heartbeat_chunk() -> Dict[str, Any]:
    """Build a Gemini-format heartbeat chunk.

    Returns:
        A single-candidate chunk with one empty text part and a ``None``
        finish reason.
    """
    # Pass None straight to the builder instead of overwriting the default.
    return _build_candidate([{"text": ""}], None)
|
|
|
|
def _openai_stream_chunk(response_id: str, created: int, model: str, delta: Dict[str, Any], finish_reason: Any) -> Dict[str, Any]:
    """Wrap a delta in the ``chat.completion.chunk`` envelope."""
    return {
        "id": response_id,
        "object": "chat.completion.chunk",
        "created": created,
        "model": model,
        "choices": [{
            "index": 0,
            "delta": delta,
            "finish_reason": finish_reason,
        }]
    }


def build_openai_fake_stream_chunks(content: str, reasoning_content: str, finish_reason: str, model: str, images: List[Dict[str, Any]] = None, chunk_size: int = 50) -> List[Dict[str, Any]]:
    """Split a complete response into OpenAI-format fake-stream chunks.

    Content chunks are emitted first, followed by ``reasoning_content``
    chunks. All chunks share one generated id and creation timestamp. Only
    the very last chunk carries a finish reason. When images are supplied,
    the first content chunk's delta content is a list made of the images
    followed by a text item; every other delta content is a plain string.

    Args:
        content: Main text content.
        reasoning_content: Reasoning text; may be empty.
        finish_reason: Upstream finish reason (e.g. ``"STOP"``,
            ``"MAX_TOKENS"``). ``"MAX_TOKENS"`` maps to ``"length"``,
            ``"SAFETY"`` / ``"RECITATION"`` to ``"content_filter"``, and
            everything else to ``"stop"``.
        model: Model name echoed in every chunk.
        images: Optional list of image items prepended to the first chunk.
        chunk_size: Number of characters per chunk (default 50). Values
            below 1 are treated as 1.

    Returns:
        List of OpenAI-format chunks. When ``content`` is empty, a single
        chunk holding a placeholder message is returned.
    """
    import time
    import uuid

    if images is None:
        images = []
    # range() raises on a zero step and yields nothing for a negative one.
    chunk_size = max(1, chunk_size)

    log.debug(f"[build_openai_fake_stream_chunks] Input - content: {repr(content)}, reasoning: {repr(reasoning_content)}, finish_reason: {finish_reason}, images count: {len(images)}")
    chunks = []
    response_id = f"chatcmpl-{uuid.uuid4().hex[:24]}"
    created = int(time.time())

    # Unrecognised reasons fall back to "stop" so the final chunk always
    # carries a terminal finish reason, as the Anthropic builder in this
    # module already does with "end_turn".
    if finish_reason == "MAX_TOKENS":
        openai_finish_reason = "length"
    elif finish_reason in ("SAFETY", "RECITATION"):
        openai_finish_reason = "content_filter"
    else:
        openai_finish_reason = "stop"

    if not content:
        # NOTE(review): images are not forwarded on this path; confirm that
        # dropping them for text-less responses is intended.
        default_text = "[模型正在思考中,请稍后再试或重新提问]" if reasoning_content else "[响应为空,请重新尝试]"
        return [_openai_stream_chunk(response_id, created, model, {"content": default_text}, openai_finish_reason)]

    content_length = len(content)
    for offset in range(0, content_length, chunk_size):
        chunk_text = content[offset:offset + chunk_size]
        # The finish reason moves to the reasoning chunks when there are any.
        is_last_chunk = (offset + chunk_size >= content_length) and not reasoning_content
        chunk_finish = openai_finish_reason if is_last_chunk else None

        if offset == 0 and images:
            # Images ride along with the first content chunk only.
            delta_content = {"content": images + [{"type": "text", "text": chunk_text}]}
        else:
            delta_content = {"content": chunk_text}

        chunk_data = _openai_stream_chunk(response_id, created, model, delta_content, chunk_finish)
        log.debug(f"[build_openai_fake_stream_chunks] Generated chunk: {chunk_data}")
        chunks.append(chunk_data)

    if reasoning_content:
        reasoning_length = len(reasoning_content)
        for offset in range(0, reasoning_length, chunk_size):
            chunk_text = reasoning_content[offset:offset + chunk_size]
            is_last_chunk = offset + chunk_size >= reasoning_length
            chunk_finish = openai_finish_reason if is_last_chunk else None
            chunks.append(_openai_stream_chunk(response_id, created, model, {"reasoning_content": chunk_text}, chunk_finish))

    log.debug(f"[build_openai_fake_stream_chunks] Total chunks generated: {len(chunks)}")
    return chunks
|
|
|
|
def create_anthropic_heartbeat_chunk() -> Dict[str, Any]:
    """Build an Anthropic-format heartbeat chunk for fake streaming.

    Returns:
        A new dict containing only ``"type": "ping"``.
    """
    heartbeat: Dict[str, Any] = dict(type="ping")
    return heartbeat
|
|
|
|
def _anthropic_block_events(index: int, content_block: Dict[str, Any], deltas: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return the start, delta and stop events for one content block."""
    events = [{
        "type": "content_block_start",
        "index": index,
        "content_block": content_block
    }]
    for delta in deltas:
        events.append({
            "type": "content_block_delta",
            "index": index,
            "delta": delta
        })
    events.append({
        "type": "content_block_stop",
        "index": index
    })
    return events


def build_anthropic_fake_stream_chunks(content: str, reasoning_content: str, finish_reason: str, model: str, images: List[Dict[str, Any]] = None, chunk_size: int = 50) -> List[Dict[str, Any]]:
    """Split a complete response into Anthropic-format fake-stream events.

    The event sequence is: ``message_start``, an optional thinking block,
    one image block per usable base64 ``data:`` URL image, the text block,
    ``message_delta`` and ``message_stop``. Each block is a
    ``content_block_start`` / ``content_block_delta``* /
    ``content_block_stop`` group with its own index.

    Args:
        content: Main text content.
        reasoning_content: Reasoning (thinking) text; may be empty.
        finish_reason: Upstream finish reason (e.g. ``"STOP"``,
            ``"MAX_TOKENS"``). ``"MAX_TOKENS"`` maps to ``"max_tokens"``;
            everything else maps to ``"end_turn"``.
        model: Model name echoed in ``message_start``.
        images: Optional list of image dicts of the form
            ``{"type": "image_url", "image_url": {"url": ...}}``.
        chunk_size: Number of characters per delta (default 50). Values
            below 1 are treated as 1.

    Returns:
        List of Anthropic SSE event dicts. When ``content`` is empty, the
        stream holds a single text block with a placeholder message.
    """
    import uuid

    if images is None:
        images = []
    # range() raises on a zero step and yields nothing for a negative one.
    chunk_size = max(1, chunk_size)

    log.debug(f"[build_anthropic_fake_stream_chunks] Input - content: {repr(content)}, reasoning: {repr(reasoning_content)}, finish_reason: {finish_reason}, images count: {len(images)}")
    message_id = f"msg_{uuid.uuid4().hex}"

    anthropic_stop_reason = "max_tokens" if finish_reason == "MAX_TOKENS" else "end_turn"

    chunks = [{
        "type": "message_start",
        "message": {
            "id": message_id,
            "type": "message",
            "role": "assistant",
            "model": model,
            "content": [],
            "stop_reason": None,
            "stop_sequence": None,
            "usage": {"input_tokens": 0, "output_tokens": 0}
        }
    }]

    if not content:
        # NOTE(review): reasoning and images are not forwarded on this path;
        # confirm that dropping them for text-less responses is intended.
        default_text = "[模型正在思考中,请稍后再试或重新提问]" if reasoning_content else "[响应为空,请重新尝试]"
        chunks.extend(_anthropic_block_events(
            0,
            {"type": "text", "text": ""},
            [{"type": "text_delta", "text": default_text}],
        ))
        chunks.append({
            "type": "message_delta",
            "delta": {"stop_reason": anthropic_stop_reason, "stop_sequence": None},
            "usage": {"output_tokens": 0}
        })
        chunks.append({
            "type": "message_stop"
        })
        return chunks

    block_index = 0

    # Thinking block comes first when reasoning is present.
    if reasoning_content:
        thinking_deltas = [
            {"type": "thinking_delta", "thinking": reasoning_content[offset:offset + chunk_size]}
            for offset in range(0, len(reasoning_content), chunk_size)
        ]
        chunks.extend(_anthropic_block_events(
            block_index,
            {"type": "thinking", "thinking": ""},
            thinking_deltas,
        ))
        block_index += 1

    # One delta-less image block per base64 data URL; other images are skipped.
    for img in images:
        if img.get("type") != "image_url":
            continue
        url = img.get("image_url", {}).get("url", "")
        if not url.startswith("data:"):
            continue
        # Expected shape: "data:<mime>;base64,<payload>".
        pieces = url.split(";base64,")
        if len(pieces) != 2:
            continue
        mime_prefix, base64_data = pieces
        image_block = {
            "type": "image",
            "source": {
                "type": "base64",
                "media_type": mime_prefix.replace("data:", ""),
                "data": base64_data
            }
        }
        chunks.extend(_anthropic_block_events(block_index, image_block, []))
        block_index += 1

    # Text block.
    text_deltas = [
        {"type": "text_delta", "text": content[offset:offset + chunk_size]}
        for offset in range(0, len(content), chunk_size)
    ]
    chunks.extend(_anthropic_block_events(
        block_index,
        {"type": "text", "text": ""},
        text_deltas,
    ))

    # output_tokens is a character count of the emitted text, not a real
    # tokenizer count; ``or ""`` keeps a None reasoning value from breaking len().
    chunks.append({
        "type": "message_delta",
        "delta": {"stop_reason": anthropic_stop_reason, "stop_sequence": None},
        "usage": {"output_tokens": len(content) + len(reasoning_content or "")}
    })

    chunks.append({
        "type": "message_stop"
    })

    log.debug(f"[build_anthropic_fake_stream_chunks] Total chunks generated: {len(chunks)}")
    return chunks