Spaces:
Paused
Paused
| import asyncio | |
| import base64 | |
| import hashlib | |
| import io | |
| import random | |
| import re | |
| import reprlib | |
| import uuid | |
| from dataclasses import dataclass | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Any, AsyncGenerator | |
| import orjson | |
| from fastapi import APIRouter, Depends, HTTPException, Request, status | |
| from fastapi.responses import StreamingResponse | |
| from gemini_webapi import ModelOutput | |
| from gemini_webapi.client import ChatSession | |
| from gemini_webapi.constants import Model | |
| from gemini_webapi.types.image import GeneratedImage, Image | |
| from loguru import logger | |
| from ..models import ( | |
| ChatCompletionRequest, | |
| ContentItem, | |
| ConversationInStore, | |
| Message, | |
| ModelData, | |
| ModelListResponse, | |
| ResponseCreateRequest, | |
| ResponseCreateResponse, | |
| ResponseImageGenerationCall, | |
| ResponseImageTool, | |
| ResponseInputContent, | |
| ResponseInputItem, | |
| ResponseOutputContent, | |
| ResponseOutputMessage, | |
| ResponseToolCall, | |
| ResponseToolChoice, | |
| ResponseUsage, | |
| Tool, | |
| ToolChoiceFunction, | |
| ToolChoiceFunctionDetail, | |
| ToolFunctionDefinition, | |
| AnthropicMessageRequest, | |
| ) | |
| from ..services import GeminiClientPool, GeminiClientWrapper, LMDBConversationStore | |
| from ..utils import g_config | |
| from ..utils.helper import ( | |
| TOOL_HINT_LINE_END, | |
| TOOL_HINT_LINE_START, | |
| TOOL_HINT_STRIPPED, | |
| TOOL_WRAP_HINT, | |
| estimate_tokens, | |
| extract_image_dimensions, | |
| extract_tool_calls, | |
| retry_with_backoff, | |
| strip_system_hints, | |
| text_from_message, | |
| ) | |
| from .middleware import get_image_store_dir, get_image_token, get_temp_dir, verify_api_key | |
# Leave ~10% headroom below the configured hard limit so prompt scaffolding
# (tool instructions, system hints) does not push a request over the cap.
MAX_CHARS_PER_REQUEST = int(g_config.gemini.max_chars_per_request * 0.9)
# How long (minutes) stored conversation metadata stays eligible for session reuse.
METADATA_TTL_MINUTES = 15
router = APIRouter()
@dataclass
class StructuredOutputRequirement:
    """Represents a structured response request from the client.

    Holds the schema metadata extracted from an OpenAI-style
    ``response_format`` payload plus the system-prompt instruction derived
    from it. Decorated with ``@dataclass`` because callers construct it with
    keyword arguments (see ``_build_structured_requirement``); without the
    decorator the annotated names are bare class attributes and the
    constructor call raises ``TypeError``.
    """

    schema_name: str            # `json_schema.name`, defaults to "response"
    schema: dict[str, Any]      # the JSON schema object itself
    instruction: str            # prompt text instructing JSON-only output
    raw_format: dict[str, Any]  # original response_format payload, verbatim
| # --- Helper Functions --- | |
async def _image_to_base64(
    image: Image, temp_dir: Path
) -> tuple[str, int | None, int | None, str, str]:
    """Persist an image provided by gemini_webapi and return base64 plus dimensions, filename, and hash."""
    # Generated images get a full-size download attempt first, falling back
    # to the default size on failure; plain images save directly.
    if not isinstance(image, GeneratedImage):
        stored = await image.save(path=str(temp_dir))
    else:
        try:
            stored = await image.save(path=str(temp_dir), full_size=True)
        except Exception as e:
            logger.warning(
                f"Failed to download full-size GeneratedImage, retrying with default size: {e}"
            )
            stored = await image.save(path=str(temp_dir), full_size=False)
    if not stored:
        raise ValueError("Failed to save generated image")
    # Rename to a random, non-guessable filename while keeping the suffix.
    src = Path(stored)
    anon_name = f"img_{uuid.uuid4().hex}{src.suffix}"
    dest = temp_dir / anon_name
    src.rename(dest)
    raw = dest.read_bytes()
    width, height = extract_image_dimensions(raw)
    digest = hashlib.sha256(raw).hexdigest()
    return base64.b64encode(raw).decode("ascii"), width, height, anon_name, digest
def _calculate_usage(
    messages: list[Message],
    assistant_text: str | None,
    tool_calls: list[Any] | None,
) -> tuple[int, int, int]:
    """Calculate prompt, completion and total tokens consistently."""
    prompt_tokens = sum(estimate_tokens(text_from_message(m)) for m in messages)
    # Tool-call arguments count toward completion tokens as well.
    arg_fragments: list[str] = []
    for call in tool_calls or []:
        if hasattr(call, "function"):
            arg_fragments.append(call.function.arguments or "")
        elif isinstance(call, dict):
            arg_fragments.append(call.get("function", {}).get("arguments", ""))
    tool_args_text = "".join(arg_fragments)
    basis = assistant_text or ""
    if tool_args_text:
        basis = f"{basis}\n{tool_args_text}" if basis else tool_args_text
    completion_tokens = estimate_tokens(basis)
    return prompt_tokens, completion_tokens, prompt_tokens + completion_tokens
def _create_responses_standard_payload(
    response_id: str,
    created_time: int,
    model_name: str,
    detected_tool_calls: list[Any] | None,
    image_call_items: list[ResponseImageGenerationCall],
    response_contents: list[ResponseOutputContent],
    usage: ResponseUsage,
    request: ResponseCreateRequest,
    normalized_input: Any,
) -> ResponseCreateResponse:
    """Unified factory for building ResponseCreateResponse objects."""
    # Tool calls may arrive as objects or plain dicts; normalize both shapes.
    tool_call_items: list[ResponseToolCall] = []
    for call in detected_tool_calls or []:
        call_id = call.id if hasattr(call, "id") else call["id"]
        call_fn = call.function if hasattr(call, "function") else call["function"]
        tool_call_items.append(
            ResponseToolCall(id=call_id, status="completed", function=call_fn)
        )
    assistant_message = ResponseOutputMessage(
        id=f"msg_{uuid.uuid4().hex}",
        type="message",
        role="assistant",
        content=response_contents,
    )
    return ResponseCreateResponse(
        id=response_id,
        created_at=created_time,
        model=model_name,
        output=[assistant_message, *tool_call_items, *image_call_items],
        status="completed",
        usage=usage,
        input=normalized_input or None,
        metadata=request.metadata or None,
        tools=request.tools,
        tool_choice=request.tool_choice,
    )
| def _create_chat_completion_standard_payload( | |
| completion_id: str, | |
| created_time: int, | |
| model_name: str, | |
| visible_output: str | None, | |
| tool_calls_payload: list[dict] | None, | |
| finish_reason: str, | |
| usage: dict, | |
| ) -> dict: | |
| """Unified factory for building Chat Completion response dictionaries.""" | |
| return { | |
| "id": completion_id, | |
| "object": "chat.completion", | |
| "created": created_time, | |
| "model": model_name, | |
| "choices": [ | |
| { | |
| "index": 0, | |
| "message": { | |
| "role": "assistant", | |
| "content": visible_output or None, | |
| "tool_calls": tool_calls_payload or None, | |
| }, | |
| "finish_reason": finish_reason, | |
| } | |
| ], | |
| "usage": usage, | |
| } | |
def _process_llm_output(
    raw_output_with_think: str,
    raw_output_clean: str,
    structured_requirement: StructuredOutputRequirement | None,
) -> tuple[str, str, list[Any]]:
    """
    Common post-processing logic for Gemini output.

    Args:
        raw_output_with_think: Model output still containing <think> sections.
        raw_output_clean: Same output with thoughts already removed; used as
            the basis for what gets persisted to history.
        structured_requirement: When set, the output is expected to be JSON
            conforming to the requested schema and is canonicalized.

    Returns: (visible_text, storage_output, tool_calls)
    """
    visible_with_think, tool_calls = extract_tool_calls(raw_output_with_think)
    if tool_calls:
        logger.debug(f"Detected {len(tool_calls)} tool call(s) in model output.")
    visible_output = visible_with_think.strip()
    storage_output, _ = extract_tool_calls(raw_output_clean)
    storage_output = storage_output.strip()
    if structured_requirement:
        cleaned_for_json = LMDBConversationStore.remove_think_tags(visible_output)
        if cleaned_for_json:
            try:
                # Round-trip through orjson to emit a canonical, compact JSON
                # document for both the client-visible and stored copies.
                structured_payload = orjson.loads(cleaned_for_json)
                canonical_output = orjson.dumps(structured_payload).decode("utf-8")
                visible_output = canonical_output
                storage_output = canonical_output
                logger.debug(
                    f"Structured response fulfilled (schema={structured_requirement.schema_name})."
                )
            except orjson.JSONDecodeError:
                # Best effort: fall back to the uncanonicalized text.
                logger.warning(
                    f"Failed to decode JSON for structured response (schema={structured_requirement.schema_name})."
                )
    else:
        # Reuse GeminiClientWrapper.extract_output for its standard cleaning
        # (citations, garbage links, HTML entities) by wrapping the text in a
        # dummy ModelOutput. include_thoughts=False is safe here because
        # extract_output only *adds* <think> blocks from `response.thoughts`;
        # it never strips <think> tags already embedded in the text, so the
        # thoughts present in `visible_output` survive untouched.
        try:
            visible_output = GeminiClientWrapper.extract_output(
                ModelOutput(metadata=[], candidates=[{"rcid": "dummy", "text": visible_output}]),
                include_thoughts=False,
            )
            # Clean the stored copy the same way so persisted history matches
            # what the client was shown.
            storage_output = GeminiClientWrapper.extract_output(
                ModelOutput(metadata=[], candidates=[{"rcid": "dummy", "text": storage_output}]),
                include_thoughts=False,
            )
        except Exception as e:
            logger.warning(f"Failed to clean output with GeminiClientWrapper: {e}")
    return visible_output, storage_output, tool_calls
def _persist_conversation(
    db: LMDBConversationStore,
    model_name: str,
    client_id: str,
    metadata: list[str | None],
    messages: list[Message],
    storage_output: str | None,
    tool_calls: list[Any] | None,
) -> str | None:
    """Unified logic to save conversation history to LMDB.

    Returns the store key on success, or None when persistence fails
    (persistence is best-effort and never raises to the caller).
    """
    try:
        assistant_turn = Message(
            role="assistant",
            content=storage_output or None,
            tool_calls=tool_calls or None,
        )
        sanitized = db.sanitize_assistant_messages([*messages, assistant_turn])
        record = ConversationInStore(
            model=model_name,
            client_id=client_id,
            metadata=metadata,
            messages=sanitized,
        )
        key = db.store(record)
        logger.debug(f"Conversation saved to LMDB with key: {key[:12]}")
        return key
    except Exception as e:
        logger.warning(f"Failed to save {len(messages) + 1} messages to LMDB: {e}")
        return None
def _build_structured_requirement(
    response_format: dict[str, Any] | None,
) -> StructuredOutputRequirement | None:
    """Translate OpenAI-style response_format into internal instructions.

    Returns None (with a warning) for anything other than a well-formed
    `{"type": "json_schema", "json_schema": {"schema": {...}}}` payload.
    """
    if not response_format or not isinstance(response_format, dict):
        return None
    if response_format.get("type") != "json_schema":
        logger.warning(
            f"Unsupported response_format type requested: {reprlib.repr(response_format)}"
        )
        return None
    json_schema = response_format.get("json_schema")
    if not isinstance(json_schema, dict):
        logger.warning(
            f"Invalid json_schema payload in response_format: {reprlib.repr(response_format)}"
        )
        return None
    schema = json_schema.get("schema")
    if not isinstance(schema, dict):
        logger.warning(
            f"Missing `schema` object in response_format payload: {reprlib.repr(response_format)}"
        )
        return None
    schema_name = json_schema.get("name") or "response"
    # Sorted keys keep the rendered schema stable across requests.
    pretty_schema = orjson.dumps(schema, option=orjson.OPT_SORT_KEYS).decode("utf-8")
    parts = [
        "You must respond with a single valid JSON document that conforms to the schema shown below.",
        "Do not include explanations, comments, or any text before or after the JSON.",
        f'Schema name: "{schema_name}"',
        "JSON Schema:",
        pretty_schema,
    ]
    if not json_schema.get("strict", True):
        parts.insert(
            1,
            "The schema allows unspecified fields, but include only what is necessary to satisfy the user's request.",
        )
    return StructuredOutputRequirement(
        schema_name=schema_name,
        schema=schema,
        instruction="\n\n".join(parts),
        raw_format=response_format,
    )
def _build_tool_prompt(
    tools: list[Tool],
    tool_choice: str | ToolChoiceFunction | None,
) -> str:
    """Generate a system prompt chunk describing available tools."""
    if not tools:
        return ""
    lines: list[str] = [
        "You can invoke the following developer tools. Call a tool only when it is required and follow the JSON schema exactly when providing arguments."
    ]
    # One description + argument schema per tool.
    for tool in tools:
        fn = tool.function
        desc = fn.description or "No description provided."
        lines.append(f"Tool `{fn.name}`: {desc}")
        if fn.parameters:
            lines.append("Arguments JSON schema:")
            lines.append(
                orjson.dumps(fn.parameters, option=orjson.OPT_SORT_KEYS).decode("utf-8")
            )
        else:
            lines.append("Arguments JSON schema: {}")
    # Translate the caller's tool_choice into an explicit directive.
    if tool_choice == "none":
        lines.append(
            "For this request you must not call any tool. Provide the best possible natural language answer."
        )
    elif tool_choice == "required":
        lines.append(
            "You must call at least one tool before responding to the user. Do not provide a final user-facing answer until a tool call has been issued."
        )
    elif isinstance(tool_choice, ToolChoiceFunction):
        lines.append(
            f"You are required to call the tool named `{tool_choice.function.name}`. Do not call any other tool."
        )
    # Wire-format contract for emitting tool calls.
    lines.extend(
        [
            "When you decide to call a tool you MUST respond with nothing except a single [function_calls] block exactly like the template below.",
            "Do not add text before or after the block.",
            "[function_calls]",
            '[call:tool_name]{"argument": "value"}[/call]',
            "[/function_calls]",
            "Use double quotes for JSON keys and values. CRITICAL: The content inside [call:...]...[/call] MUST be a raw JSON object. Do not wrap it in ```json blocks or add any conversational text inside the tag.",
            "To call multiple tools, list each [call:tool_name]...[/call] entry sequentially within a single [function_calls] block.",
            "If no tool call is needed, provide a normal response and DO NOT use the [function_calls] tag.",
        ]
    )
    return "\n".join(lines)
def _build_image_generation_instruction(
    tools: list[ResponseImageTool] | None,
    tool_choice: ResponseToolChoice | None,
) -> str | None:
    """Construct explicit guidance so Gemini emits images when requested.

    Returns None when image generation is neither offered nor forced.
    """
    forced = tool_choice is not None and tool_choice.type == "image_generation"
    primary = tools[0] if tools else None
    if primary is None and not forced:
        return None
    parts: list[str] = [
        "Image generation is enabled. When the user requests an image, you must return an actual generated image, not a text description.",
        "For new image requests, generate at least one new image matching the description.",
        "If the user provides an image and asks for edits or variations, return a newly generated image with the requested changes.",
        "Avoid all text replies unless a short caption is explicitly requested. Do not explain, apologize, or describe image creation steps.",
        "Never send placeholder text like 'Here is your image' or any other response without an actual image attachment.",
    ]
    # Honor the first tool's preferences when present.
    if primary and primary.model:
        parts.append(
            f"Where styles differ, favor the `{primary.model}` image model when rendering the scene."
        )
    if primary and primary.output_format:
        parts.append(
            f"Encode the image using the `{primary.output_format}` format whenever possible."
        )
    if forced:
        parts.append(
            "Image generation was explicitly requested. You must return at least one generated image. Any response without an image will be treated as a failure."
        )
    return "\n\n".join(parts)
def _append_tool_hint_to_last_user_message(messages: list[Message]) -> None:
    """Ensure the last user message carries the tool wrap hint.

    Mutates `messages` in place; only the most recent user message with
    non-None content is touched, and only if the hint is not already there.
    """
    for msg in reversed(messages):
        if msg.role != "user" or msg.content is None:
            continue
        content = msg.content
        if isinstance(content, str):
            if TOOL_HINT_STRIPPED not in content:
                msg.content = f"{content}\n{TOOL_WRAP_HINT}"
            return
        if isinstance(content, list):
            # Amend the last text part; if it already carries the hint, stop.
            for part in reversed(content):
                if getattr(part, "type", None) != "text":
                    continue
                existing = part.text or ""
                if TOOL_HINT_STRIPPED not in existing:
                    part.text = f"{existing}\n{TOOL_WRAP_HINT}"
                return
            # No text part at all: add a fresh one carrying the hint.
            content.append(ContentItem(type="text", text=TOOL_WRAP_HINT.strip()))
            return
def _prepare_messages_for_model(
    source_messages: list[Message],
    tools: list[Tool] | None,
    tool_choice: str | ToolChoiceFunction | None,
    extra_instructions: list[str] | None = None,
    inject_system_defaults: bool = True,
) -> list[Message]:
    """Return a copy of messages enriched with tool instructions when needed."""
    prepared = [msg.model_copy(deep=True) for msg in source_messages]
    # Backfill names on 'tool' messages from earlier assistant tool calls.
    id_to_name = {
        tc.id: tc.function.name
        for msg in prepared
        if msg.role == "assistant" and msg.tool_calls
        for tc in msg.tool_calls
    }
    for msg in prepared:
        if msg.role == "tool" and not msg.name and msg.tool_call_id:
            msg.name = id_to_name.get(msg.tool_call_id)
    instructions: list[str] = []
    if inject_system_defaults:
        if tools:
            tool_prompt = _build_tool_prompt(tools, tool_choice)
            if tool_prompt:
                instructions.append(tool_prompt)
        if extra_instructions:
            instructions.extend(instr for instr in extra_instructions if instr)
            logger.debug(
                f"Applied {len(extra_instructions)} extra instructions for tool/structured output."
            )
    wants_hint = bool(tools) and tool_choice != "none"
    if not instructions:
        if wants_hint:
            _append_tool_hint_to_last_user_message(prepared)
        return prepared
    combined = "\n\n".join(instructions)
    # Merge into an existing leading system message, or prepend a new one.
    if prepared and prepared[0].role == "system" and isinstance(prepared[0].content, str):
        existing = prepared[0].content or ""
        if combined not in existing:
            separator = "\n\n" if existing else ""
            prepared[0].content = f"{existing}{separator}{combined}"
    else:
        prepared.insert(0, Message(role="system", content=combined))
    if wants_hint:
        _append_tool_hint_to_last_user_message(prepared)
    return prepared
def _response_items_to_messages(
    items: str | list[ResponseInputItem],
) -> tuple[list[Message], str | list[ResponseInputItem]]:
    """Convert Responses API input items into internal Message objects and normalized input.

    Returns a pair: the internal Message list forwarded to the model, and a
    normalized echo of the caller's input (the string unchanged, or a list of
    ResponseInputItem with every content part coerced to typed objects).
    """
    messages: list[Message] = []
    if isinstance(items, str):
        # Bare-string shorthand: treat the whole payload as one user message.
        messages.append(Message(role="user", content=items))
        logger.debug("Normalized Responses input: single string message.")
        return messages, items
    normalized_input: list[ResponseInputItem] = []
    for item in items:
        role = item.role
        content = item.content
        normalized_contents: list[ResponseInputContent] = []
        if isinstance(content, str):
            normalized_contents.append(ResponseInputContent(type="input_text", text=content))
            messages.append(Message(role=role, content=content))
        else:
            # Structured content: translate each Responses part into the
            # internal ContentItem flavor used by the chat pipeline.
            converted: list[ContentItem] = []
            for part in content:
                if part.type == "input_text":
                    text_value = part.text or ""
                    normalized_contents.append(
                        ResponseInputContent(type="input_text", text=text_value)
                    )
                    # Empty text parts are kept in the normalized echo but
                    # not forwarded to the model.
                    if text_value:
                        converted.append(ContentItem(type="text", text=text_value))
                elif part.type == "input_image":
                    image_url = part.image_url
                    # Image parts without a URL are dropped entirely.
                    if image_url:
                        normalized_contents.append(
                            ResponseInputContent(
                                type="input_image",
                                image_url=image_url,
                                detail=part.detail if part.detail else "auto",
                            )
                        )
                        converted.append(
                            ContentItem(
                                type="image_url",
                                image_url={
                                    "url": image_url,
                                    "detail": part.detail if part.detail else "auto",
                                },
                            )
                        )
                elif part.type == "input_file":
                    # File parts are echoed verbatim; internally they become a
                    # `file` ContentItem carrying data/filename/url as present.
                    if part.file_url or part.file_data:
                        normalized_contents.append(part)
                        file_info = {}
                        if part.file_data:
                            file_info["file_data"] = part.file_data
                            file_info["filename"] = part.filename
                        if part.file_url:
                            file_info["url"] = part.file_url
                        converted.append(ContentItem(type="file", file=file_info))
            messages.append(Message(role=role, content=converted or None))
        normalized_input.append(
            ResponseInputItem(type="message", role=item.role, content=normalized_contents or [])
        )
    logger.debug(f"Normalized Responses input: {len(normalized_input)} message items.")
    return messages, normalized_input
def _instructions_to_messages(
    instructions: str | list[ResponseInputItem] | None,
) -> list[Message]:
    """Normalize instructions payload into Message objects."""
    if not instructions:
        return []
    if isinstance(instructions, str):
        return [Message(role="system", content=instructions)]
    result: list[Message] = []
    for item in instructions:
        # Only plain message items participate; anything else is skipped.
        if item.type and item.type != "message":
            continue
        if isinstance(item.content, str):
            result.append(Message(role=item.role, content=item.content))
            continue
        parts: list[ContentItem] = []
        for part in item.content:
            if part.type == "input_text":
                if part.text:
                    parts.append(ContentItem(type="text", text=part.text))
            elif part.type == "input_image":
                if part.image_url:
                    parts.append(
                        ContentItem(
                            type="image_url",
                            image_url={
                                "url": part.image_url,
                                "detail": part.detail if part.detail else "auto",
                            },
                        )
                    )
            elif part.type == "input_file":
                file_info = {}
                if part.file_data:
                    file_info["file_data"] = part.file_data
                    file_info["filename"] = part.filename
                if part.file_url:
                    file_info["url"] = part.file_url
                if file_info:
                    parts.append(ContentItem(type="file", file=file_info))
        result.append(Message(role=item.role, content=parts or None))
    return result
def _get_model_by_name(name: str) -> Model:
    """Retrieve a Model instance by name.

    Custom (configured) models take precedence; under the 'overwrite'
    strategy, names outside the custom set are rejected instead of falling
    back to the built-in Model registry.
    """
    overrides = {m.model_name: m for m in g_config.gemini.models if m.model_name}
    override = overrides.get(name)
    if override is not None:
        return Model.from_dict(override.model_dump())
    if g_config.gemini.model_strategy == "overwrite":
        raise ValueError(f"Model '{name}' not found in custom models (strategy='overwrite').")
    return Model.from_name(name)
def _get_available_models() -> list[ModelData]:
    """Return a list of available models based on configuration strategy.

    Custom models are always listed; with strategy 'append', built-in models
    not shadowed by a custom entry are added after them.
    """
    now = int(datetime.now(tz=timezone.utc).timestamp())
    custom_models = [m for m in g_config.gemini.models if m.model_name]
    listing = [
        ModelData(id=m.model_name, created=now, owned_by="custom")
        for m in custom_models
    ]
    if g_config.gemini.model_strategy == "append":
        taken = {m.model_name for m in custom_models}
        for model in Model:
            candidate = model.model_name
            # Skip placeholder entries and anything shadowed by a custom model.
            if not candidate or candidate == "unspecified" or candidate in taken:
                continue
            listing.append(ModelData(id=candidate, created=now, owned_by="gemini-web"))
    return listing
async def _find_reusable_session(
    db: LMDBConversationStore,
    pool: GeminiClientPool,
    model: Model,
    messages: list[Message],
) -> tuple[ChatSession | None, GeminiClientWrapper | None, list[Message]]:
    """Find an existing chat session matching the longest suitable history prefix.

    Walks prefixes of `messages` from longest to shortest; the first stored
    conversation that matches and is fresher than METADATA_TTL_MINUTES is
    resumed on its original client. Returns (session, client, remaining
    messages to send), or (None, None, messages) when nothing is reusable.
    """
    if len(messages) < 2:
        return None, None, messages
    search_end = len(messages)
    while search_end >= 2:
        search_history = messages[:search_end]
        # Note: 'tool' role is excluded from session matching because tool messages
        # are new inputs that should trigger generation, not part of existing history.
        # When a tool result is sent, it needs to be processed as a new message.
        if search_history[-1].role in {"assistant", "system"}:
            try:
                if conv := db.find(model.model_name, search_history):
                    # NOTE(review): naive datetime.now() is compared against
                    # stored timestamps — assumes the store also records naive
                    # local time; confirm against LMDBConversationStore.
                    now = datetime.now()
                    updated_at = conv.updated_at or conv.created_at or now
                    age_minutes = (now - updated_at).total_seconds() / 60
                    if age_minutes <= METADATA_TTL_MINUTES:
                        client = await pool.acquire(conv.client_id)
                        session = client.start_chat(metadata=conv.metadata, model=model)
                        remain = messages[search_end:]
                        logger.debug(
                            f"Match found at prefix length {search_end}/{len(messages)}. Client: {conv.client_id}"
                        )
                        return session, client, remain
                    else:
                        logger.debug(
                            f"Matched conversation at length {search_end} is too old ({age_minutes:.1f}m), skipping reuse."
                        )
                else:
                    # Previously a silent no-op despite a comment promising a
                    # log; record the miss so prefix-search behavior is visible.
                    logger.debug(f"No stored conversation for prefix length {search_end}.")
            except Exception as e:
                logger.warning(
                    f"Error checking LMDB for reusable session at length {search_end}: {e}"
                )
                break
        search_end -= 1
    logger.debug(f"No reusable session found for {len(messages)} messages.")
    return None, None, messages
async def _send_with_split(
    session: ChatSession,
    text: str,
    files: list[Path | str | io.BytesIO] | None = None,
    stream: bool = False,
) -> AsyncGenerator[ModelOutput, None] | ModelOutput:
    """
    Send text to Gemini, converting it to a file attachment if too long.

    Retries transient streaming failures with a short jittered delay.
    Returns an async generator when `stream` is True, otherwise a ModelOutput.
    """

    async def _stream_with_retry(
        content: str, file_list: list | None
    ) -> AsyncGenerator[ModelOutput, None]:
        """Manual retry logic for streaming."""
        max_retries = 1  # Kept low so worst-case total latency stays bounded.
        for attempt in range(max_retries + 1):
            # BUGFIX: initialize before the try block. If send_message_stream
            # itself raises, the except handler below reads `has_yielded`;
            # previously it was first assigned inside the try, so that path
            # raised UnboundLocalError instead of retrying.
            has_yielded = False
            try:
                gen = session.send_message_stream(content, files=file_list)
                async for chunk in gen:
                    yield chunk
                    has_yielded = True
                return
            except Exception as e:
                if has_yielded:
                    # Data already reached the caller; retrying would duplicate it.
                    logger.error(f"Stream interrupted after yielding data: {e}")
                    raise e
                error_str = str(e).lower()
                # Auth/quota failures are not transient — fail fast.
                if any(code in error_str for code in ["429", "403", "401", "quota"]):
                    raise e
                if attempt < max_retries:
                    # A mid-stream interruption gets a longer cool-down so the
                    # upstream service has time to recover.
                    if "interrupted" in error_str or "truncated" in error_str:
                        delay = 5.0 + random.uniform(1.0, 3.0)
                        logger.warning(f"Stream interrupted. Cooling down for {delay:.2f}s before retry...")
                    else:
                        delay = 2.0 + random.uniform(0.1, 0.5)
                        logger.warning(f"Stream failed to start (attempt {attempt+1}/{max_retries}). Retrying in {delay:.2f}s. Error: {e}")
                    await asyncio.sleep(delay)
                else:
                    raise e

    async def _send_with_retry(
        content: str, file_list: list | None, is_stream: bool
    ) -> AsyncGenerator[ModelOutput, None] | ModelOutput:
        """Dispatch to streaming or blocking send; log interruptions with context."""
        try:
            if is_stream:
                return _stream_with_retry(content, file_list)
            return await session.send_message(content, files=file_list)
        except Exception as e:
            # Attach session context to interruption errors before re-raising.
            error_msg = str(e)
            if "Stream interrupted" in error_msg or "truncated" in error_msg:
                logger.warning(f"Gemini stream interrupted (Session: {session.sid}): {e}")
            raise e

    if len(text) <= MAX_CHARS_PER_REQUEST:
        try:
            return await _send_with_retry(text, files, stream)
        except Exception as e:
            logger.exception(f"Error sending message to Gemini after retries: {e}")
            raise
    logger.info(
        f"Message length ({len(text)}) exceeds limit ({MAX_CHARS_PER_REQUEST}). Converting text to file attachment."
    )
    # Oversized prompt: ship the text as `message.txt` plus a short pointer prompt.
    file_obj = io.BytesIO(text.encode("utf-8"))
    file_obj.name = "message.txt"
    try:
        final_files = list(files) if files else []
        final_files.append(file_obj)
        instruction = (
            "The user's input exceeds the character limit and is provided in the attached file `message.txt`.\n\n"
            "**System Instruction:**\n"
            "1. Read the content of `message.txt`.\n"
            "2. Treat that content as the **primary** user prompt for this turn.\n"
            "3. Execute the instructions or answer the questions found *inside* that file immediately.\n"
        )
        return await _send_with_retry(instruction, final_files, stream)
    except Exception as e:
        logger.exception(f"Error sending large text as file to Gemini after retries: {e}")
        raise
class StreamingOutputFilter:
    """
    State Machine filter to suppress technical markers, tool calls, and system hints.
    Handles fragmentation where markers are split across multiple chunks.
    Returns structured events: {'type': 'text'|'tool_start'|'tool_delta'|'tool_end', 'content': ...}
    """

    # Internal technical artifacts to suppress in real-time
    GARBAGE_URL_RE = re.compile(
        r"https?://(?:[a-zA-Z0-9-]+\.)*(?:googleusercontent\.com|gstatic\.com|google\.com)/(?:image_collection|recs|image_retrieval)/[^\s]+"
    )
    # Citation markers (e.g., [1] or 【1†source】)
    CITATION_RE = re.compile(r"【\d+†source】|\[\d+\]")
    # Match patterns for JSON-like content
    JSON_START_RE = re.compile(r"\s*\{")
    # Match whitespace for faster stripping
    LEADING_WS_RE = re.compile(r"^\s+")
    # Pre-compiled patterns for streaming performance
    TRAILING_FENCE_RE = re.compile(r'\n?```$')
    URL_DELIMITER_RE = re.compile(r"[\s\]\)]")
    BRACKET_NUM_RE = re.compile(r"\[\d+\]")

    def __init__(self):
        # Unprocessed tail of the stream; markers may span chunk boundaries,
        # so incomplete prefixes are held here between process() calls.
        self.buffer = ""
        # Parser state; starts in plain-text passthrough mode.
        self.state = "NORMAL"
        self.current_role = ""
        # Sentinel strings for the tool-call wire format emitted by the model.
        self.TOOL_START = "[function_calls]"
        self.TOOL_END_PREFIX = "[/function_"
        self.CALL_START = "[call:"
        self.CALL_TITLE_END = "]"
        self.CALL_END = "[/call]"
        # ChatML-style role tags that must never reach the client.
        self.TAG_START = "<|im_start|>"
        self.TAG_END = "<|im_end|>"
        # System hint delimiters (empty HINT_START disables hint suppression).
        self.HINT_START = f"\n{TOOL_HINT_LINE_START}" if TOOL_HINT_LINE_START else ""
        self.HINT_END = TOOL_HINT_LINE_END
        # Prefixes that, seen at the buffer tail, may be the start of a marker
        # split across chunks — output is held back until disambiguated.
        self.WATCH_PREFIXES = [
            self.TOOL_START,
            self.TAG_START,
            self.TAG_END,
            self.TOOL_END_PREFIX,
            "http://googleusercontent.com/",
            "https://googleusercontent.com/",
            "http://www.googleusercontent.com/",
            "https://www.googleusercontent.com/",
            "http://",
            "https://",
            "[",  # For citations
            "【",  # For source citations
        ]
        if self.HINT_START:
            self.WATCH_PREFIXES.append(self.HINT_START)
        # Per-stream tool-call bookkeeping: running index, current call id,
        # and whether argument bytes have started flowing for that call.
        self.tool_call_index = 0
        self.current_call_id = None
        self.args_started = False
| def process(self, chunk: str) -> list[dict[str, Any]]: | |
| self.buffer += chunk | |
| events = [] | |
| while self.buffer: | |
| if self.state == "NORMAL": | |
| low_buf = self.buffer.lower() | |
| tool_idx = low_buf.find(self.TOOL_START.lower()) | |
| tag_idx = low_buf.find(self.TAG_START.lower()) | |
| end_idx = low_buf.find(self.TAG_END.lower()) | |
| hint_idx = low_buf.find(self.HINT_START.lower()) if self.HINT_START else -1 | |
| url_idx = -1 | |
| http_idx = low_buf.find("http://") | |
| https_idx = low_buf.find("https://") | |
| if http_idx != -1 and https_idx != -1: | |
| url_idx = min(http_idx, https_idx) | |
| elif http_idx != -1: | |
| url_idx = http_idx | |
| elif https_idx != -1: | |
| url_idx = https_idx | |
| # Check for brackets (potential citations or tool calls) | |
| bracket_idx = low_buf.find("[") | |
| source_idx = low_buf.find("【") | |
| indices = [(i, t) for i, t in [ | |
| (tool_idx, "TOOL"), (tag_idx, "TAG"), (end_idx, "END"), | |
| (hint_idx, "HINT"), (url_idx, "URL_START"), | |
| (bracket_idx, "BRACKET"), (source_idx, "SOURCE") | |
| ] if i != -1] | |
| if not indices: | |
| keep_len = 0 | |
| for p in self.WATCH_PREFIXES: | |
| p_low = p.lower() | |
| for i in range(len(p) - 1, 0, -1): | |
| if low_buf.endswith(p_low[:i]): | |
| keep_len = max(keep_len, i) | |
| break | |
| yield_len = len(self.buffer) - keep_len | |
| if yield_len > 0: | |
| content = self.buffer[:yield_len] | |
| content = self.GARBAGE_URL_RE.sub("", content) | |
| content = self.CITATION_RE.sub("", content) | |
| if content: | |
| events.append({"type": "text", "content": content}) | |
| self.buffer = self.buffer[yield_len:] | |
| break | |
| indices.sort() | |
| m_idx, m_type = indices[0] | |
| if m_idx > 0: | |
| content = self.buffer[:m_idx] | |
| content = self.GARBAGE_URL_RE.sub("", content) | |
| content = self.CITATION_RE.sub("", content) | |
| if content: | |
| events.append({"type": "text", "content": content}) | |
| self.buffer = self.buffer[m_idx:] | |
| if m_type == "TOOL": | |
| self.state = "IN_TOOL_BLOCK" | |
| self.buffer = self.buffer[len(self.TOOL_START):] | |
| elif m_type == "TAG": | |
| self.state = "IN_TAG" | |
| self.buffer = self.buffer[len(self.TAG_START):] | |
| elif m_type == "END": | |
| self.buffer = self.buffer[len(self.TAG_END):] | |
| elif m_type == "HINT": | |
| self.state = "IN_HINT" | |
| self.buffer = self.buffer[len(self.HINT_START):] | |
| elif m_type == "URL_START": | |
| self.state = "IN_POTENTIAL_URL" | |
| elif m_type == "BRACKET": | |
| self.state = "IN_BRACKETED" | |
| elif m_type == "SOURCE": | |
| self.state = "IN_POTENTIAL_SOURCE" | |
| elif self.state == "IN_POTENTIAL_URL": | |
| match = self.URL_DELIMITER_RE.search(self.buffer) | |
| if match: | |
| end_idx = match.start() | |
| url_candidate = self.buffer[:end_idx] | |
| delimiter = self.buffer[end_idx] | |
| if self.GARBAGE_URL_RE.match(url_candidate): | |
| self.buffer = self.buffer[end_idx:] | |
| self.state = "NORMAL" | |
| continue | |
| remaining = self.buffer[end_idx:] | |
| if len(remaining) >= 2: | |
| if remaining.startswith("]("): | |
| events.append({"type": "text", "content": f"[{url_candidate}]("}) | |
| self.buffer = remaining[2:] | |
| self.state = "NORMAL" | |
| continue | |
| else: | |
| events.append({"type": "text", "content": url_candidate}) | |
| self.buffer = self.buffer[end_idx:] | |
| self.state = "NORMAL" | |
| continue | |
| else: | |
| break | |
| else: | |
| if len(self.buffer) > 2000: | |
| events.append({"type": "text", "content": self.buffer}) | |
| self.buffer = "" | |
| self.state = "NORMAL" | |
| break | |
| elif self.state == "IN_BRACKETED": | |
| # Handles citations [1] or tool calls [call:...] | |
| end_idx = self.buffer.find("]") | |
| if end_idx != -1: | |
| full_match = self.buffer[:end_idx + 1] | |
| if self.BRACKET_NUM_RE.match(full_match): | |
| # It's a citation, skip it | |
| self.buffer = self.buffer[end_idx + 1:] | |
| self.state = "NORMAL" | |
| else: | |
| # Pass back to NORMAL (first character) to be handled as text or other markers | |
| events.append({"type": "text", "content": "["}) | |
| self.buffer = self.buffer[1:] | |
| self.state = "NORMAL" | |
| else: | |
| if len(self.buffer) > 100: | |
| events.append({"type": "text", "content": "["}) | |
| self.buffer = self.buffer[1:] | |
| self.state = "NORMAL" | |
| break | |
| elif self.state == "IN_POTENTIAL_SOURCE": | |
| end_idx = self.buffer.find("】") | |
| if end_idx != -1: | |
| full_match = self.buffer[:end_idx + 1] | |
| if "source" in full_match.lower(): | |
| # Discard citation | |
| self.buffer = self.buffer[end_idx + 1:] | |
| self.state = "NORMAL" | |
| else: | |
| events.append({"type": "text", "content": "【"}) | |
| self.buffer = self.buffer[1:] | |
| self.state = "NORMAL" | |
| else: | |
| if len(self.buffer) > 100: | |
| events.append({"type": "text", "content": "【"}) | |
| self.buffer = self.buffer[1:] | |
| self.state = "NORMAL" | |
| break | |
| elif self.state == "IN_HINT": | |
| low_buf = self.buffer.lower() | |
| end_idx = low_buf.find(self.HINT_END.lower()) | |
| if end_idx != -1: | |
| self.buffer = self.buffer[end_idx + len(self.HINT_END):] | |
| self.state = "NORMAL" | |
| else: | |
| keep_len = len(self.HINT_END) - 1 | |
| if len(self.buffer) > keep_len: | |
| self.buffer = self.buffer[-keep_len:] | |
| break | |
| elif self.state == "IN_TOOL_BLOCK": | |
| low_buf = self.buffer.lower() | |
| call_idx = low_buf.find(self.CALL_START.lower()) | |
| block_end_idx = low_buf.find(self.TOOL_END_PREFIX.lower()) | |
| m_indices = [(i, t) for i, t in [(call_idx, "CALL"), (block_end_idx, "BLOCK_END")] if i != -1] | |
| if not m_indices: | |
| pass | |
| else: | |
| m_indices.sort() | |
| idx, m_type = m_indices[0] | |
| self.buffer = self.buffer[idx:] | |
| if m_type == "CALL": | |
| self.state = "IN_CALL_TITLE" | |
| self.buffer = self.buffer[len(self.CALL_START):] | |
| continue | |
| elif m_type == "BLOCK_END": | |
| self.state = "NORMAL" | |
| bracket_idx = self.buffer.find("]", len(self.TOOL_END_PREFIX)) | |
| if bracket_idx != -1 and bracket_idx < 50: | |
| self.buffer = self.buffer[bracket_idx + 1:] | |
| else: | |
| self.buffer = self.buffer[len(self.TOOL_END_PREFIX):] | |
| continue | |
| max_marker_len = max(len(self.CALL_START), len(self.TOOL_END_PREFIX)) | |
| if len(self.buffer) > max_marker_len: | |
| self.buffer = self.buffer[-(max_marker_len - 1):] if max_marker_len > 1 else "" | |
| break | |
| elif self.state == "IN_CALL_TITLE": | |
| low_buf = self.buffer.lower() | |
| title_end_idx = low_buf.find(self.CALL_TITLE_END.lower()) | |
| if title_end_idx != -1: | |
| tool_name = self.buffer[:title_end_idx].strip() | |
| self.current_call_id = f"call_{uuid.uuid4().hex[:24]}" | |
| events.append({ | |
| "type": "tool_start", | |
| "index": self.tool_call_index, | |
| "id": self.current_call_id, | |
| "name": tool_name | |
| }) | |
| self.buffer = self.buffer[title_end_idx + len(self.CALL_TITLE_END):] | |
| self.state = "IN_CALL_ARGS" | |
| self.args_started = False | |
| else: | |
| break | |
| elif self.state == "IN_CALL_ARGS": | |
| low_buf = self.buffer.lower() | |
| call_end_idx = low_buf.find(self.CALL_END.lower()) | |
| block_end_idx = low_buf.find(self.TOOL_END_PREFIX.lower()) | |
| stop_idx = -1 | |
| m_type = "" | |
| if call_end_idx != -1 and block_end_idx != -1: | |
| if call_end_idx < block_end_idx: | |
| stop_idx = call_end_idx | |
| m_type = "CALL_END" | |
| else: | |
| stop_idx = block_end_idx | |
| m_type = "BLOCK_END" | |
| elif call_end_idx != -1: | |
| stop_idx = call_end_idx | |
| m_type = "CALL_END" | |
| elif block_end_idx != -1: | |
| stop_idx = block_end_idx | |
| m_type = "BLOCK_END" | |
| if stop_idx != -1: | |
| args_part = self.buffer[:stop_idx] | |
| if not self.args_started: | |
| stripped_args = args_part.lstrip() | |
| # More robust handling: accept any non-empty content | |
| # Even if it doesn't start with '{', we still emit it | |
| # The JSON parsing will handle errors downstream | |
| if stripped_args: | |
| self.args_started = True | |
| # If doesn't start with '{', try to fix common issues | |
| if not stripped_args.startswith("{"): | |
| # Check if it might be a valid JSON that's missing the opening brace | |
| # or if it's just plain text arguments | |
| args_part = "{" + args_part.lstrip() | |
| if args_part: | |
| events.append({"type": "tool_delta", "index": self.tool_call_index, "content": args_part}) | |
| events.append({"type": "tool_end", "index": self.tool_call_index}) | |
| self.tool_call_index += 1 | |
| self.current_call_id = None | |
| self.args_started = False | |
| if m_type == "CALL_END": | |
| self.buffer = self.buffer[stop_idx + len(self.CALL_END):] | |
| self.state = "IN_TOOL_BLOCK" | |
| else: | |
| self.buffer = self.buffer[stop_idx:] | |
| self.state = "IN_TOOL_BLOCK" | |
| else: | |
| if not self.args_started: | |
| stripped = self.buffer.lstrip() | |
| if stripped: | |
| if stripped.startswith("{"): | |
| self.args_started = True | |
| else: | |
| # Be more tolerant: accept arguments even without opening brace | |
| # Common issue: LLM outputs `key: value` instead of `{"key": "value"}` | |
| self.args_started = True | |
| if self.args_started: | |
| keep_len = max(len(self.CALL_END), len(self.TOOL_END_PREFIX)) - 1 | |
| if len(self.buffer) > keep_len: | |
| chunk_to_send = self.buffer[:-keep_len] | |
| if chunk_to_send: | |
| events.append({"type": "tool_delta", "index": self.tool_call_index, "content": chunk_to_send}) | |
| self.buffer = self.buffer[-keep_len:] | |
| else: | |
| # Buffer has content but we haven't found JSON start yet | |
| # Wait a bit more for potential JSON start | |
| if len(self.buffer) > 100: | |
| # Timeout waiting for JSON, emit what we have | |
| events.append({"type": "tool_end", "index": self.tool_call_index}) | |
| self.tool_call_index += 1 | |
| self.current_call_id = None | |
| self.args_started = False | |
| self.state = "IN_TOOL_BLOCK" | |
| break | |
| elif self.state == "IN_TAG": | |
| nl_idx = self.buffer.find("\n") | |
| if nl_idx != -1: | |
| self.current_role = self.buffer[:nl_idx].strip().lower() | |
| self.buffer = self.buffer[nl_idx + 1:] | |
| self.state = "IN_BLOCK" | |
| else: | |
| break | |
| elif self.state == "IN_BLOCK": | |
| end_idx = self.buffer.find(self.TAG_END) | |
| if end_idx != -1: | |
| content = self.buffer[:end_idx] | |
| if self.current_role != "tool": | |
| events.append({"type": "text", "content": content}) | |
| self.buffer = self.buffer[end_idx + len(self.TAG_END):] | |
| self.state = "NORMAL" | |
| self.current_role = "" | |
| else: | |
| keep_len = len(self.TAG_END) - 1 | |
| if self.current_role != "tool": | |
| if len(self.buffer) > keep_len: | |
| events.append({"type": "text", "content": self.buffer[:-keep_len]}) | |
| self.buffer = self.buffer[-keep_len:] | |
| break | |
| else: | |
| if len(self.buffer) > keep_len: | |
| self.buffer = self.buffer[-keep_len:] | |
| break | |
| return events | |
| def flush(self) -> list[dict[str, Any]]: | |
| events = [] | |
| if self.buffer: | |
| if self.state == "IN_CALL_ARGS": | |
| events.append({"type": "tool_delta", "index": self.tool_call_index, "content": self.buffer}) | |
| events.append({"type": "tool_end", "index": self.tool_call_index}) | |
| self.tool_call_index += 1 | |
| elif self.state in ("NORMAL", "IN_POTENTIAL_URL", "IN_BRACKETED", "IN_POTENTIAL_SOURCE"): | |
| content = self.GARBAGE_URL_RE.sub("", self.buffer) | |
| content = self.CITATION_RE.sub("", content) | |
| if content: | |
| events.append({"type": "text", "content": content}) | |
| elif self.state == "IN_BLOCK" and self.current_role != "tool": | |
| events.append({"type": "text", "content": self.buffer}) | |
| self.buffer = "" | |
| self.state = "NORMAL" | |
| self.current_call_id = None | |
| self.args_started = False | |
| return events | |
| # --- Response Builders & Streaming --- | |
def _create_real_streaming_response(
    generator: AsyncGenerator[ModelOutput, None],
    completion_id: str,
    created_time: int,
    model_name: str,
    messages: list[Message],
    db: LMDBConversationStore,
    model: Model,
    client_wrapper: GeminiClientWrapper,
    session: ChatSession,
    base_url: str,
    structured_requirement: StructuredOutputRequirement | None = None,
) -> StreamingResponse:
    """
    Create a real-time OpenAI-compatible (``chat.completion.chunk``) SSE stream.

    Each ``ModelOutput`` from ``generator`` is a *cumulative snapshot* of the
    text/thoughts so far; deltas are computed by slicing off what was already
    emitted. Text deltas pass through a ``StreamingOutputFilter`` which strips
    technical markers and surfaces tool calls as incremental ``tool_calls``
    deltas. After the stream ends, the final authoritative chunk is reconciled,
    generated images are deduplicated and appended as markdown, usage is
    estimated, and the conversation is persisted.

    Args:
        generator: Async stream of cumulative model output snapshots.
        completion_id: ``id`` field echoed in every SSE chunk.
        created_time: ``created`` timestamp echoed in every SSE chunk.
        model_name: Model name echoed in every SSE chunk.
        messages: Prepared request messages (used for token estimation and persistence).
        db: Conversation store written to once the stream completes.
        model: Resolved Gemini model whose name is persisted.
        client_wrapper: Owning client wrapper; its id is persisted.
        session: Gemini chat session; its metadata is persisted.
        base_url: Base URL of this server (presumably used to build image links
            — the image-URL literal below looks truncated; confirm against VCS).
        structured_requirement: Optional structured-output contract applied when
            post-processing the final text.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    async def generate_stream() -> AsyncGenerator[str, None]:
        # Cumulative thought/text already emitted; deltas are sliced off these.
        full_thoughts, full_text = "", ""
        has_started = False
        # Tracks whether we are inside the <think> section so it can be closed.
        last_chunk_was_thought = False
        all_outputs: list[ModelOutput] = []
        suppressor = StreamingOutputFilter()
        try:
            async for chunk in generator:
                all_outputs.append(chunk)
                if not has_started:
                    # First frame announces the assistant role, per the OpenAI
                    # streaming chunk protocol.
                    data = {
                        "id": completion_id,
                        "object": "chat.completion.chunk",
                        "created": created_time,
                        "model": model_name,
                        "choices": [
                            {"index": 0, "delta": {"role": "assistant"}, "finish_reason": None}
                        ],
                    }
                    yield f"data: {orjson.dumps(data).decode('utf-8')}\n\n"
                    has_started = True
                # Robust thought delta calculation
                new_thoughts = chunk.thoughts or ""
                if len(new_thoughts) > len(full_thoughts):
                    t_delta = new_thoughts[len(full_thoughts) :]
                    if not last_chunk_was_thought and not full_thoughts:
                        # First thought content: open the <think> section.
                        yield f"data: {orjson.dumps({'id': completion_id, 'object': 'chat.completion.chunk', 'created': created_time, 'model': model_name, 'choices': [{'index': 0, 'delta': {'content': '<think>'}, 'finish_reason': None}]}).decode('utf-8')}\n\n"
                    data = {
                        "id": completion_id,
                        "object": "chat.completion.chunk",
                        "created": created_time,
                        "model": model_name,
                        "choices": [
                            {"index": 0, "delta": {"content": t_delta}, "finish_reason": None}
                        ],
                    }
                    yield f"data: {orjson.dumps(data).decode('utf-8')}\n\n"
                    full_thoughts = new_thoughts
                    last_chunk_was_thought = True
                # Robust text delta calculation
                new_text = chunk.text or ""
                # Strip transient triple-backtick fence which is often added by the web-scraping backend
                # during streaming snapshots. We will restore the real ending after the loop.
                display_text = StreamingOutputFilter.TRAILING_FENCE_RE.sub('', new_text)
                if len(display_text) > len(full_text):
                    text_delta = display_text[len(full_text) :]
                    if last_chunk_was_thought:
                        # Transitioning from thoughts to answer: close <think>.
                        yield f"data: {orjson.dumps({'id': completion_id, 'object': 'chat.completion.chunk', 'created': created_time, 'model': model_name, 'choices': [{'index': 0, 'delta': {'content': '</think>\n'}, 'finish_reason': None}]}).decode('utf-8')}\n\n"
                        last_chunk_was_thought = False
                    # Translate filter events into OpenAI delta payloads.
                    filter_events = suppressor.process(text_delta)
                    for event in filter_events:
                        delta = {}
                        if event['type'] == 'text':
                            delta = {"content": event['content']}
                        elif event['type'] == 'tool_start':
                            delta = {"tool_calls": [{"index": event['index'], "id": event['id'], "type": "function", "function": {"name": event['name'], "arguments": ""}}]}
                        elif event['type'] == 'tool_delta':
                            delta = {"tool_calls": [{"index": event['index'], "function": {"arguments": event['content']}}]}
                        # 'tool_end' has no wire representation here; the final
                        # chunk's finish_reason conveys completion.
                        if delta:
                            data = {
                                "id": completion_id,
                                "object": "chat.completion.chunk",
                                "created": created_time,
                                "model": model_name,
                                "choices": [
                                    {
                                        "index": 0,
                                        "delta": delta,
                                        "finish_reason": None,
                                    }
                                ],
                            }
                            yield f"data: {orjson.dumps(data).decode('utf-8')}\n\n"
                    full_text = display_text
        except Exception as e:
            logger.exception(f"Error during OpenAI streaming: {e}")
            yield f"data: {orjson.dumps({'error': {'message': 'Streaming error occurred.', 'type': 'server_error', 'param': None, 'code': None}}).decode('utf-8')}\n\n"
            return
        # --- Reconciliation with the final authoritative snapshot ---
        if all_outputs:
            final_chunk = all_outputs[-1]
            final_text = final_chunk.text or ""
            if len(final_text) > len(full_text):
                # Yield any real trailing content (like the actual closing fence)
                final_delta = final_text[len(full_text) :]
                for event in suppressor.process(final_delta):
                    if event['type'] == 'text':
                        yield f"data: {orjson.dumps({'id': completion_id, 'object': 'chat.completion.chunk', 'created': created_time, 'model': model_name, 'choices': [{'index': 0, 'delta': {'content': event['content']}, 'finish_reason': None}]}).decode('utf-8')}\n\n"
                full_text = final_text
            if final_chunk.thoughts:
                full_thoughts = final_chunk.thoughts
        if last_chunk_was_thought:
            # Stream ended while still inside the <think> section: close it.
            yield f"data: {orjson.dumps({'id': completion_id, 'object': 'chat.completion.chunk', 'created': created_time, 'model': model_name, 'choices': [{'index': 0, 'delta': {'content': '</think>\n'}, 'finish_reason': None}]}).decode('utf-8')}\n\n"
        # Drain whatever the filter still buffers (fragmented markers, args).
        for event in suppressor.flush():
            delta = {}
            if event['type'] == 'text':
                delta = {"content": event['content']}
            elif event['type'] == 'tool_start':
                delta = {"tool_calls": [{"index": event['index'], "id": event['id'], "type": "function", "function": {"name": event['name'], "arguments": ""}}]}
            elif event['type'] == 'tool_delta':
                delta = {"tool_calls": [{"index": event['index'], "function": {"arguments": event['content']}}]}
            if delta:
                data = {
                    "id": completion_id,
                    "object": "chat.completion.chunk",
                    "created": created_time,
                    "model": model_name,
                    "choices": [
                        {"index": 0, "delta": delta, "finish_reason": None}
                    ],
                }
                yield f"data: {orjson.dumps(data).decode('utf-8')}\n\n"
        # Re-assemble the raw output (thoughts wrapped in <think>) for
        # post-processing and storage.
        raw_output_with_think = f"<think>{full_thoughts}</think>\n" if full_thoughts else ""
        raw_output_with_think += full_text
        assistant_text, storage_output, tool_calls = _process_llm_output(
            raw_output_with_think, full_text, structured_requirement
        )
        # Collect images across snapshots, first deduplicating by URL...
        images = []
        seen_urls = set()
        for out in all_outputs:
            if out.images:
                for img in out.images:
                    # Use the image URL as a stable identifier across chunks
                    if img.url not in seen_urls:
                        images.append(img)
                        seen_urls.add(img.url)
        # ...then by content hash after download/decoding.
        image_markdown = ""
        seen_hashes = set()
        for image in images:
            try:
                image_store = get_image_store_dir()
                _, _, _, filename, file_hash = await _image_to_base64(image, image_store)
                if file_hash in seen_hashes:
                    # Duplicate content, delete the file and skip
                    (image_store / filename).unlink(missing_ok=True)
                    continue
                seen_hashes.add(file_hash)
                # NOTE(review): this literal looks truncated — presumably a
                # markdown image link built from base_url/filename. Confirm
                # against version control.
                img_url = (
                    f"})"
                )
                image_markdown += f"\n\n{img_url}"
            except Exception as exc:
                logger.warning(f"Failed to process image in OpenAI stream: {exc}")
        if image_markdown:
            assistant_text += image_markdown
            storage_output += image_markdown
            yield f"data: {orjson.dumps({'id': completion_id, 'object': 'chat.completion.chunk', 'created': created_time, 'model': model_name, 'choices': [{'index': 0, 'delta': {'content': image_markdown}, 'finish_reason': None}]}).decode('utf-8')}\n\n"
        # Final chunk: usage, accumulated tool calls, and finish_reason.
        p_tok, c_tok, t_tok = _calculate_usage(messages, assistant_text, tool_calls)
        usage = {"prompt_tokens": p_tok, "completion_tokens": c_tok, "total_tokens": t_tok}
        final_delta = {}
        if tool_calls:
            # Add index field to each tool_call for OpenAI compatibility
            final_delta["tool_calls"] = [
                {**call.model_dump(mode="json"), "index": i}
                for i, call in enumerate(tool_calls)
            ]
        data = {
            "id": completion_id,
            "object": "chat.completion.chunk",
            "created": created_time,
            "model": model_name,
            "choices": [
                {"index": 0, "delta": final_delta, "finish_reason": "tool_calls" if tool_calls else "stop"}
            ],
            "usage": usage,
        }
        # Persist before emitting the final frame so a reconnecting client can
        # resume from the stored conversation.
        _persist_conversation(
            db,
            model.model_name,
            client_wrapper.id,
            session.metadata,
            messages,  # This should be the prepared messages
            storage_output,
            tool_calls,
        )
        yield f"data: {orjson.dumps(data).decode('utf-8')}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(generate_stream(), media_type="text/event-stream")
def _create_responses_real_streaming_response(
    generator: AsyncGenerator[ModelOutput, None],
    response_id: str,
    created_time: int,
    model_name: str,
    messages: list[Message],
    db: LMDBConversationStore,
    model: Model,
    client_wrapper: GeminiClientWrapper,
    session: ChatSession,
    request: ResponseCreateRequest,
    image_store: Path,
    base_url: str,
    structured_requirement: StructuredOutputRequirement | None = None,
) -> StreamingResponse:
    """
    Create a real-time SSE stream for the OpenAI Responses API.

    Mirrors ``_create_real_streaming_response`` but emits Responses-style
    events (``response.created``, ``response.output_item.added``,
    ``response.output_text.delta``, ..., ``response.completed``). Text deltas
    are filtered through ``StreamingOutputFilter``; tool calls and generated
    images are emitted as discrete output items *after* the text stream, not
    interleaved. Final accumulated text and thoughts are synchronized with the
    last authoritative snapshot before completion.

    Args:
        generator: Async stream of cumulative model output snapshots.
        response_id: ``id`` echoed in every event.
        created_time: ``created_at`` timestamp echoed in every event.
        model_name: Model name echoed in every event.
        messages: Prepared request messages (usage estimation + persistence).
        db: Conversation store written to once the stream completes.
        model: Resolved Gemini model whose name is persisted.
        client_wrapper: Owning client wrapper; its id is persisted.
        session: Gemini chat session; its metadata is persisted.
        request: Original Responses API request (metadata/tools/tool_choice
            are echoed back; tool_choice may force image generation).
        image_store: Directory where decoded images are written.
        base_url: Base URL of this server (presumably used in the image-link
            literals below, which look truncated — confirm against VCS).
        structured_requirement: Optional structured-output contract for
            post-processing the final text.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    # Fields common to every event frame.
    base_event = {
        "id": response_id,
        "object": "response",
        "created_at": created_time,
        "model": model_name,
    }
    async def generate_stream() -> AsyncGenerator[str, None]:
        # Announce the response and the first (message) output item.
        yield f"data: {orjson.dumps({**base_event, 'type': 'response.created', 'response': {'id': response_id, 'object': 'response', 'created_at': created_time, 'model': model_name, 'status': 'in_progress', 'metadata': request.metadata, 'input': None, 'tools': request.tools, 'tool_choice': request.tool_choice}}).decode('utf-8')}\n\n"
        message_id = f"msg_{uuid.uuid4().hex}"
        yield f"data: {orjson.dumps({**base_event, 'type': 'response.output_item.added', 'output_index': 0, 'item': {'id': message_id, 'type': 'message', 'role': 'assistant', 'content': []}}).decode('utf-8')}\n\n"
        # Cumulative thought/text already emitted; deltas are sliced off these.
        full_thoughts, full_text = "", ""
        last_chunk_was_thought = False
        all_outputs: list[ModelOutput] = []
        suppressor = StreamingOutputFilter()
        try:
            async for chunk in generator:
                all_outputs.append(chunk)
                # Robust thought delta calculation
                new_thoughts = chunk.thoughts or ""
                if len(new_thoughts) > len(full_thoughts):
                    t_delta = new_thoughts[len(full_thoughts) :]
                    if not last_chunk_was_thought and not full_thoughts:
                        # First thought content: open the <think> section.
                        yield f"data: {orjson.dumps({**base_event, 'type': 'response.output_text.delta', 'output_index': 0, 'delta': '<think>'}).decode('utf-8')}\n\n"
                    yield f"data: {orjson.dumps({**base_event, 'type': 'response.output_text.delta', 'output_index': 0, 'delta': t_delta}).decode('utf-8')}\n\n"
                    full_thoughts = new_thoughts
                    last_chunk_was_thought = True
                # Robust text delta calculation
                new_text = chunk.text or ""
                # Strip transient triple-backtick fence which is often added by the web-scraping backend
                # during streaming snapshots. We will restore the real ending after the loop.
                display_text = StreamingOutputFilter.TRAILING_FENCE_RE.sub('', new_text)
                if len(display_text) > len(full_text):
                    text_delta = display_text[len(full_text) :]
                    if last_chunk_was_thought:
                        # Transitioning from thoughts to answer: close <think>.
                        yield f"data: {orjson.dumps({**base_event, 'type': 'response.output_text.delta', 'output_index': 0, 'delta': '</think>\n'}).decode('utf-8')}\n\n"
                        last_chunk_was_thought = False
                    filter_events = suppressor.process(text_delta)
                    for event in filter_events:
                        if event['type'] == 'text':
                            yield f"data: {orjson.dumps({**base_event, 'type': 'response.output_text.delta', 'output_index': 0, 'delta': event['content']}).decode('utf-8')}\n\n"
                        # Note: This API currently handles tool calls as discrete items at the end.
                        # We ignore internal tool call events in the text stream for now.
                    full_text = display_text
        except Exception as e:
            logger.exception(f"Error during Responses API streaming: {e}")
            yield f"data: {orjson.dumps({**base_event, 'type': 'error', 'error': {'message': 'Streaming error.'}}).decode('utf-8')}\n\n"
            return
        # --- Reconciliation with the final authoritative snapshot ---
        if all_outputs:
            final_chunk = all_outputs[-1]
            final_text = final_chunk.text or ""
            if len(final_text) > len(full_text):
                # Yield any real trailing content (like the actual closing fence)
                final_delta = final_text[len(full_text) :]
                for event in suppressor.process(final_delta):
                    if event['type'] == 'text':
                        yield f"data: {orjson.dumps({**base_event, 'type': 'response.output_text.delta', 'output_index': 0, 'delta': event['content']}).decode('utf-8')}\n\n"
                full_text = final_text
            if final_chunk.thoughts:
                full_thoughts = final_chunk.thoughts
        if last_chunk_was_thought:
            # Stream ended while still inside the <think> section: close it.
            yield f"data: {orjson.dumps({**base_event, 'type': 'response.output_text.delta', 'output_index': 0, 'delta': '</think>\n'}).decode('utf-8')}\n\n"
        # Drain whatever the filter still buffers.
        for event in suppressor.flush():
            if event['type'] == 'text':
                yield f"data: {orjson.dumps({**base_event, 'type': 'response.output_text.delta', 'output_index': 0, 'delta': event['content']}).decode('utf-8')}\n\n"
        yield f"data: {orjson.dumps({**base_event, 'type': 'response.output_text.done', 'output_index': 0}).decode('utf-8')}\n\n"
        # Re-assemble the raw output (thoughts wrapped in <think>) for
        # post-processing and storage.
        raw_output_with_think = f"<think>{full_thoughts}</think>\n" if full_thoughts else ""
        raw_output_with_think += full_text
        assistant_text, storage_output, detected_tool_calls = _process_llm_output(
            raw_output_with_think, full_text, structured_requirement
        )
        # Collect images across snapshots, deduplicating by URL first.
        images = []
        seen_urls = set()
        for out in all_outputs:
            if out.images:
                for img in out.images:
                    if img.url not in seen_urls:
                        images.append(img)
                        seen_urls.add(img.url)
        # Check if image generation was forced via tool_choice (same logic as non-streaming)
        image_generation_forced = (
            request.tool_choice is not None
            and isinstance(request.tool_choice, ResponseToolChoice)
            and request.tool_choice.type == "image_generation"
        )
        if image_generation_forced and not images and not assistant_text:
            logger.warning("Image generation was forced via tool_choice but no images or text were returned in stream.")
            yield f"data: {orjson.dumps({**base_event, 'type': 'error', 'error': {'message': 'No images returned from forced image generation request.'}}).decode('utf-8')}\n\n"
            return
        # Build output contents / image-generation call items, deduplicating
        # by decoded content hash.
        response_contents, image_call_items = [], []
        seen_hashes = set()
        for image in images:
            try:
                image_base64, width, height, filename, file_hash = await _image_to_base64(
                    image, image_store
                )
                if file_hash in seen_hashes:
                    (image_store / filename).unlink(missing_ok=True)
                    continue
                seen_hashes.add(file_hash)
                # GeneratedImage outputs are stored as PNG; fetched ones as JPEG.
                img_format = "png" if isinstance(image, GeneratedImage) else "jpeg"
                # NOTE(review): this literal looks truncated — presumably a
                # markdown image link built from base_url/filename. Confirm
                # against version control.
                image_url = (
                    f"})"
                )
                image_call_items.append(
                    ResponseImageGenerationCall(
                        id=filename.rsplit(".", 1)[0],
                        result=image_base64,
                        output_format=img_format,
                        size=f"{width}x{height}" if width and height else None,
                    )
                )
                response_contents.append(ResponseOutputContent(type="output_text", text=image_url))
            except Exception as exc:
                logger.warning(f"Failed to process image in stream: {exc}")
        if assistant_text:
            response_contents.append(ResponseOutputContent(type="output_text", text=assistant_text))
        if not response_contents:
            # The Responses schema requires at least one content entry.
            response_contents.append(ResponseOutputContent(type="output_text", text=""))
        # Aggregate images for storage
        image_markdown = ""
        for img_call in image_call_items:
            fname = f"{img_call.id}.{img_call.output_format}"
            # NOTE(review): truncated literal, same as above — confirm against VCS.
            img_url = f"})"
            image_markdown += f"\n\n{img_url}"
        if image_markdown:
            storage_output += image_markdown
        yield f"data: {orjson.dumps({**base_event, 'type': 'response.output_item.done', 'output_index': 0, 'item': {'id': message_id, 'type': 'message', 'role': 'assistant', 'content': [c.model_dump(mode='json') for c in response_contents]}}).decode('utf-8')}\n\n"
        # Emit tool calls and image-generation calls as discrete output items.
        current_idx = 1
        for call in detected_tool_calls:
            tc_item = ResponseToolCall(id=call.id, status="completed", function=call.function)
            yield f"data: {orjson.dumps({**base_event, 'type': 'response.output_item.added', 'output_index': current_idx, 'item': tc_item.model_dump(mode='json')}).decode('utf-8')}\n\n"
            yield f"data: {orjson.dumps({**base_event, 'type': 'response.output_item.done', 'output_index': current_idx, 'item': tc_item.model_dump(mode='json')}).decode('utf-8')}\n\n"
            current_idx += 1
        for img_call in image_call_items:
            yield f"data: {orjson.dumps({**base_event, 'type': 'response.output_item.added', 'output_index': current_idx, 'item': img_call.model_dump(mode='json')}).decode('utf-8')}\n\n"
            yield f"data: {orjson.dumps({**base_event, 'type': 'response.output_item.done', 'output_index': current_idx, 'item': img_call.model_dump(mode='json')}).decode('utf-8')}\n\n"
            current_idx += 1
        # Final payload with usage, then persist and complete.
        p_tok, c_tok, t_tok = _calculate_usage(messages, assistant_text, detected_tool_calls)
        usage = ResponseUsage(input_tokens=p_tok, output_tokens=c_tok, total_tokens=t_tok)
        payload = _create_responses_standard_payload(
            response_id,
            created_time,
            model_name,
            detected_tool_calls,
            image_call_items,
            response_contents,
            usage,
            request,
            None,
        )
        _persist_conversation(
            db,
            model.model_name,
            client_wrapper.id,
            session.metadata,
            messages,
            storage_output,
            detected_tool_calls,
        )
        yield f"data: {orjson.dumps({**base_event, 'type': 'response.completed', 'response': payload.model_dump(mode='json')}).decode('utf-8')}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(generate_stream(), media_type="text/event-stream")
| # --- Core Session Logic --- | |
async def _execute_core_chat_session(
    request_model: str,
    messages: list[Message],
    tmp_dir: Path,
    tools: list[Tool] | None = None,
    tool_choice: str | ToolChoiceFunction | None = None,
    stream: bool = False,
    extra_instructions: list[str] | None = None,
    response_format: dict[str, Any] | None = None,
):
    """
    Common logic for executing a chat session with Gemini.

    Forces the model to 'gemini-3.0-pro' regardless of ``request_model``
    (the parameter is kept only for interface compatibility).

    Returns:
        Tuple of (resp_or_stream, completion_id, created_time, session,
        client, db, model_obj, prepared_messages, structured_req).

    Raises:
        HTTPException: 400 on bad input or an unavailable forced model,
            503 when no client can be acquired, 502 on upstream errors.
    """
    # FORCE MODEL OVERRIDE
    forced_model_name = "gemini-3.0-pro"
    pool, db = GeminiClientPool(), LMDBConversationStore()
    try:
        model = _get_model_by_name(forced_model_name)
    except ValueError as exc:
        # The forced model is mandatory; if it is missing from the config the
        # server is misconfigured, so fail fast instead of silently
        # substituting another model.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Required model '{forced_model_name}' not available: {str(exc)}") from exc
    if not messages:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Messages required.")
    structured_requirement = _build_structured_requirement(response_format)
    # Merge caller-supplied extra instructions with the structured-output
    # instruction so both end up in the prepared history.
    final_extra_instr = list(extra_instructions) if extra_instructions else []
    if structured_requirement:
        final_extra_instr.append(structured_requirement.instruction)
    # Preparing the full history here ensures server-injected system
    # instructions are part of what gets matched against stored sessions.
    msgs = _prepare_messages_for_model(
        messages,
        tools,
        tool_choice,
        final_extra_instr or None,
    )
    session, client, remain = await _find_reusable_session(db, pool, model, msgs)
    # Check if we should actually reuse the session
    if session and not remain:
        # Special case: if the last message is a tool result, we should process it
        # even if remain is empty (this happens when tool message is at the end).
        # NOTE(review): this uses the raw `messages`, not prepared `msgs`; the
        # single message is re-prepared below, which appears intentional.
        if messages[-1].role == "tool":
            # Create a remain list with just the tool message
            remain = [messages[-1]]
        elif messages[-1].role == "assistant":
            # If the last message is assistant and there are no new messages,
            # do not reuse this session; fall through to creating a fresh one.
            logger.debug("Session matched but no new messages after assistant turn - not reusing session.")
            session = None
            client = None
            remain = msgs
        else:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="No new messages.")
    if session:
        # For reused sessions, only the remaining (new) messages are sent.
        input_msgs = _prepare_messages_for_model(
            remain,
            tools,
            tool_choice,
            final_extra_instr or None,
            True,  # Ensure tool hints/instructions are present in the turn
        )
        if len(input_msgs) == 1:
            # A single message is sent untagged (no role markers needed).
            m_input, files = await GeminiClientWrapper.process_message(
                input_msgs[0], tmp_dir, tagged=False
            )
        else:
            m_input, files = await GeminiClientWrapper.process_conversation(input_msgs, tmp_dir)
        logger.debug(
            f"Reused session {reprlib.repr(session.metadata)} - sending {len(input_msgs)} prepared messages."
        )
    else:
        # No reusable session: acquire a client from the pool and replay the
        # whole prepared conversation into a fresh chat.
        try:
            client = await pool.acquire()
            session = client.start_chat(model=model)
            m_input, files = await GeminiClientWrapper.process_conversation(msgs, tmp_dir)
        except Exception as e:
            logger.exception("Error in preparing conversation")
            raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=str(e))
    completion_id = f"chatcmpl-{uuid.uuid4()}"
    created_time = int(datetime.now(tz=timezone.utc).timestamp())
    try:
        assert session and client
        logger.debug(
            f"Client ID: {client.id}, Input length: {len(m_input)}, files count: {len(files)}"
        )
        # _send_with_split handles chunking inputs beyond MAX_CHARS_PER_REQUEST.
        resp_or_stream = await _send_with_split(
            session, m_input, files=files, stream=stream
        )
    except Exception as e:
        logger.exception("Gemini API error")
        raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(e))
    return resp_or_stream, completion_id, created_time, session, client, db, model, msgs, structured_requirement
async def _create_anthropic_streaming_response(
    generator: AsyncGenerator[ModelOutput, None],
    completion_id: str,
    created_time: int,
    model_name: str,
    messages: list[Message],
):
    """
    Stream Gemini output as Anthropic SSE events, including structured
    tool call streaming.

    Fixes over the previous revision:
    - ``message_start`` reports the estimated ``input_tokens`` instead of 0.
    - ``stop_reason`` is ``"tool_use"`` when any tool_use block was emitted,
      matching the non-streaming Anthropic endpoint.
    - Filter events of tool type produced on the final text delta are now
      emitted instead of being silently dropped.
    """
    # Prompt tokens are known up-front; Anthropic clients read them from
    # message_start (output_tokens arrive later in message_delta).
    p_tok = sum(estimate_tokens(text_from_message(msg)) for msg in messages)

    def sse(event: str, payload: dict) -> str:
        """Format one Anthropic SSE frame: 'event: <name>\\ndata: <json>\\n\\n'."""
        return f"event: {event}\ndata: {orjson.dumps(payload).decode('utf-8')}\n\n"

    async def generate_stream():
        # Mutable streaming state shared by the emit helpers below.
        state = {"index": 0, "in_text": False, "tool_used": False}

        def close_text_block():
            # Close the currently-open text content block, if any.
            if state["in_text"]:
                yield sse("content_block_stop", {"type": "content_block_stop", "index": state["index"]})
                state["index"] += 1
                state["in_text"] = False

        def emit_filter_event(event):
            # Translate one StreamingOutputFilter event into SSE frames.
            if event["type"] == "text":
                if not state["in_text"]:
                    # Start a new text block if not currently in one.
                    yield sse("content_block_start", {"type": "content_block_start", "index": state["index"], "content_block": {"type": "text", "text": ""}})
                    state["in_text"] = True
                yield sse("content_block_delta", {"type": "content_block_delta", "index": state["index"], "delta": {"type": "text_delta", "text": event["content"]}})
            elif event["type"] == "tool_start":
                yield from close_text_block()
                state["tool_used"] = True
                # Anthropic tool_use start.
                yield sse("content_block_start", {"type": "content_block_start", "index": state["index"], "content_block": {"type": "tool_use", "id": event["id"], "name": event["name"], "input": {}}})
            elif event["type"] == "tool_delta":
                # Anthropic tool_use delta (input_json_delta).
                yield sse("content_block_delta", {"type": "content_block_delta", "index": state["index"], "delta": {"type": "input_json_delta", "partial_json": event["content"]}})
            elif event["type"] == "tool_end":
                yield sse("content_block_stop", {"type": "content_block_stop", "index": state["index"]})
                state["index"] += 1
                # We stay out of a text block until the next text event.

        # 1. Message start.
        yield sse("message_start", {
            "type": "message_start",
            "message": {
                "id": completion_id,
                "type": "message",
                "role": "assistant",
                "content": [],
                "model": model_name,
                "stop_reason": None,
                "stop_sequence": None,
                "usage": {"input_tokens": p_tok, "output_tokens": 0},
            },
        })

        full_text = ""
        suppressor = StreamingOutputFilter()
        last_chunk: ModelOutput | None = None
        try:
            # 2. Incremental deltas. Gemini chunks carry the cumulative text,
            # so the new portion is everything past len(full_text).
            async for chunk in generator:
                last_chunk = chunk
                if not chunk.text:
                    continue
                display_text = StreamingOutputFilter.TRAILING_FENCE_RE.sub('', chunk.text)
                if len(display_text) > len(full_text):
                    delta_text = display_text[len(full_text):]
                    full_text = display_text
                    for event in suppressor.process(delta_text):
                        for frame in emit_filter_event(event):
                            yield frame

            # 3. Final delta after the stream ends (raw text, no fence strip).
            final_text = last_chunk.text or "" if last_chunk else ""
            final_thoughts = last_chunk.thoughts or "" if last_chunk else ""
            if len(final_text) > len(full_text):
                for event in suppressor.process(final_text[len(full_text):]):
                    for frame in emit_filter_event(event):
                        yield frame
                full_text = final_text

            # 4. Final verification of tool calls. Built-in tools (such as
            # image retrieval) often only appear in the complete output and
            # are not caught by the regex-based streaming filter.
            raw_output_with_think = f"<think>{final_thoughts}</think>\n" if final_thoughts else ""
            raw_output_with_think += full_text
            _, _, tool_calls = _process_llm_output(raw_output_with_think, full_text, None)
            for tcall in tool_calls:
                for frame in close_text_block():
                    yield frame
                state["tool_used"] = True
                yield sse("content_block_start", {"type": "content_block_start", "index": state["index"], "content_block": {"type": "tool_use", "id": tcall.id, "name": tcall.function.name, "input": {}}})
                yield sse("content_block_delta", {"type": "content_block_delta", "index": state["index"], "delta": {"type": "input_json_delta", "partial_json": tcall.function.arguments}})
                yield sse("content_block_stop", {"type": "content_block_stop", "index": state["index"]})
                state["index"] += 1

            # 5. Drain whatever the filter still buffers.
            for event in suppressor.flush():
                for frame in emit_filter_event(event):
                    yield frame
        except Exception as e:
            logger.exception(f"Error during Anthropic streaming: {e}")
            yield sse("error", {"type": "error", "error": {"type": "api_error", "message": str(e)}})
            return

        for frame in close_text_block():
            yield frame

        # 6. Message delta: stop reason mirrors the non-streaming endpoint —
        # "tool_use" when any tool_use block was emitted, else "end_turn".
        c_tok = estimate_tokens(full_text)
        yield sse("message_delta", {
            "type": "message_delta",
            "delta": {"stop_reason": "tool_use" if state["tool_used"] else "end_turn", "stop_sequence": None},
            "usage": {"output_tokens": c_tok},
        })
        # 7. Message stop.
        yield sse("message_stop", {"type": "message_stop"})

    return StreamingResponse(generate_stream(), media_type="text/event-stream")
| # --- Main Router Endpoints --- | |
async def list_models(api_key: str = Depends(verify_api_key)):
    """Return the catalog of models this proxy exposes (OpenAI /models shape)."""
    return ModelListResponse(data=_get_available_models())
async def create_chat_completion(
    request: ChatCompletionRequest,
    raw_request: Request,
    api_key: str = Depends(verify_api_key),
    tmp_dir: Path = Depends(get_temp_dir),
    image_store: Path = Depends(get_image_store_dir),
):
    """
    OpenAI-compatible chat completions endpoint.

    Delegates session setup and the Gemini call to
    ``_execute_core_chat_session`` (which forces gemini-3.0-pro), then either
    streams the result or builds a standard completion payload, persisting
    the conversation for future session reuse.
    """
    base_url = str(raw_request.base_url)
    resp_or_stream, completion_id, created_time, session, client, db, model, msgs, struct_req = await _execute_core_chat_session(
        request_model=request.model,
        messages=request.messages,
        tmp_dir=tmp_dir,
        tools=request.tools,
        tool_choice=request.tool_choice,
        stream=request.stream,
        response_format=request.response_format,
    )
    if request.stream:
        return _create_real_streaming_response(
            resp_or_stream,
            completion_id,
            created_time,
            "gemini-3.0-pro",  # report the model actually used, not the requested one
            msgs,
            db,
            model,
            client,
            session,
            base_url,
            struct_req,
        )
    try:
        # Two extractions: one with <think> content (for tool-call detection),
        # one clean (for the visible assistant text).
        raw_with_t = GeminiClientWrapper.extract_output(resp_or_stream, include_thoughts=True)
        raw_clean = GeminiClientWrapper.extract_output(resp_or_stream, include_thoughts=False)
    except Exception as exc:
        logger.exception("Gemini output parsing failed.")
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY, detail="Malformed response."
        ) from exc
    visible_output, storage_output, tool_calls = _process_llm_output(
        raw_with_t, raw_clean, struct_req
    )
    # Process images for the OpenAI non-streaming flow.
    images = resp_or_stream.images or []
    # Log response details for debugging
    logger.debug(f"Chat response: text_len={len(visible_output)}, images={len(images)}, tool_calls={len(tool_calls)}")
    logger.debug(f"Raw response text (first 500 chars): {raw_with_t[:500] if raw_with_t else 'EMPTY'}")
    # Check if response is completely empty
    if not visible_output and not images and not tool_calls:
        logger.warning("Gemini returned an empty response for chat completion (no text, images, or tool calls)")
        # Log more details about the response object
        logger.debug(f"Response object type: {type(resp_or_stream)}")
        logger.debug(f"Response has candidates: {hasattr(resp_or_stream, 'candidates')}")
        if hasattr(resp_or_stream, 'candidates'):
            logger.debug(f"Candidates: {resp_or_stream.candidates}")
        if hasattr(resp_or_stream, 'text'):
            logger.debug(f"Response text attribute: {repr(resp_or_stream.text)[:200] if resp_or_stream.text else 'None'}")
    image_markdown = ""
    seen_hashes = set()
    for image in images:
        try:
            _, _, _, filename, file_hash = await _image_to_base64(image, image_store)
            # De-duplicate images by content hash; remove the redundant file.
            if file_hash in seen_hashes:
                (image_store / filename).unlink(missing_ok=True)
                continue
            seen_hashes.add(file_hash)
            # NOTE(review): this f-string looks truncated — the markdown image
            # link prefix (presumably "![...](<url>" built from base_url,
            # filename and the image token) appears to be missing; verify
            # against VCS history before relying on this output.
            img_url = (
                f"})"
            )
            image_markdown += f"\n\n{img_url}"
        except Exception as exc:
            logger.warning(f"Failed to process image in OpenAI response: {exc}")
    if image_markdown:
        visible_output += image_markdown
        storage_output += image_markdown
    tool_calls_payload = [call.model_dump(mode="json") for call in tool_calls]
    if tool_calls_payload:
        logger.debug(f"Detected tool calls: {reprlib.repr(tool_calls_payload)}")
    p_tok, c_tok, t_tok = _calculate_usage(msgs, visible_output, tool_calls)
    usage = {"prompt_tokens": p_tok, "completion_tokens": c_tok, "total_tokens": t_tok}
    payload = _create_chat_completion_standard_payload(
        completion_id,
        created_time,
        "gemini-3.0-pro",
        visible_output,
        tool_calls_payload,
        "tool_calls" if tool_calls else "stop",
        usage,
    )
    # Persist so subsequent requests with the same prefix can reuse the session.
    _persist_conversation(
        db,
        model.model_name,
        client.id,
        session.metadata,
        msgs,
        storage_output,
        tool_calls,
    )
    return payload
def _anthropic_system_to_messages(system) -> list[Message]:
    """Convert an Anthropic `system` field (str or list of text blocks) into
    internal system Message objects; returns [] when there is nothing usable."""
    if not system:
        return []
    if isinstance(system, str):
        system_content = system
    elif isinstance(system, list):
        # Anthropic system list: [{"type": "text", "text": "..."}]
        system_content = ""
        for item in system:
            if item.get("type") == "text":
                system_content += item.get("text", "") + "\n"
    else:
        return []
    return [Message(role="system", content=system_content)] if system_content else []


def _anthropic_convert_content(original_content: list) -> list[ContentItem]:
    """Convert an Anthropic content array into internal ContentItem objects.

    Unknown or unsupported part types are skipped; the caller keeps the
    original content when nothing was convertible.
    """
    new_content: list[ContentItem] = []
    for part in original_content:
        if not isinstance(part, dict):
            continue
        p_type = part.get("type")
        if p_type == "text":
            new_content.append(ContentItem(type="text", text=part.get("text")))
        elif p_type == "image" and "source" in part:
            # Anthropic base64 image -> internal image_url (data URI).
            source = part["source"]
            if source.get("type") == "base64":
                data = source.get("data")
                media_type = source.get("media_type")
                new_content.append(ContentItem(
                    type="image_url",
                    image_url={"url": f"data:{media_type};base64,{data}"}
                ))
    return new_content


def _anthropic_convert_tools(tools) -> list[Tool] | None:
    """Convert Anthropic tools to OpenAI-style Tool objects.

    Anthropic: {"name", "description", "input_schema"};
    OpenAI:    {"type": "function", "function": {"name", "description", "parameters"}}.
    Returns None when no tools were supplied.
    """
    if not tools:
        return None
    converted: list[Tool] = []
    for tool in tools:
        if not isinstance(tool, dict):
            continue
        tool_name = tool.get("name")
        # Anthropic uses "input_schema"; accept "parameters" as a fallback.
        tool_params = tool.get("input_schema") or tool.get("parameters", {})
        if tool_name:
            converted.append(Tool(
                type="function",
                function=ToolFunctionDefinition(
                    name=tool_name,
                    description=tool.get("description", ""),
                    parameters=tool_params
                )
            ))
    return converted


def _anthropic_convert_tool_choice(tool_choice):
    """Map an Anthropic tool_choice to the internal representation.

    Dict forms: {"type": "tool", "name": ...} -> ToolChoiceFunction;
    {"type": "auto"|"any"} -> "auto"; {"type": "none"} -> "none".
    String forms: "none" -> "none"; anything else -> "auto".
    Returns None when absent or unrecognized.
    """
    if not tool_choice:
        return None
    if isinstance(tool_choice, dict):
        choice_type = tool_choice.get("type")
        if choice_type == "tool":
            tool_name = tool_choice.get("name")
            if tool_name:
                return ToolChoiceFunction(
                    type="function",
                    function=ToolChoiceFunctionDetail(name=tool_name)
                )
            return None
        if choice_type in ("auto", "any"):
            return "auto"  # "any" maps to "auto" for now
        if choice_type == "none":
            return "none"
        return None
    if isinstance(tool_choice, str):
        return "none" if tool_choice == "none" else "auto"
    return None


async def create_anthropic_message(
    request: AnthropicMessageRequest,
    raw_request: Request,
    api_key: str = Depends(verify_api_key),
    tmp_dir: Path = Depends(get_temp_dir),
):
    """
    Anthropic-compatible messages endpoint.

    Adapts the Anthropic request shape (separate system field, content
    arrays, input_schema tools) to the internal OpenAI-style types, runs the
    shared Gemini session logic, and renders an Anthropic-shaped response
    (content blocks, tool_use blocks, stop_reason, usage).
    """
    # Adapter: Anthropic keeps 'system' separate from 'messages'.
    converted_messages: list[Message] = []
    converted_messages.extend(_anthropic_system_to_messages(request.system))
    for msg in request.messages:
        if isinstance(msg.content, list):
            new_content = _anthropic_convert_content(msg.content)
            if new_content:
                # Replace in place only when something was convertible;
                # otherwise keep the original content as a fallback.
                msg.content = new_content
        converted_messages.append(msg)
    converted_tools = _anthropic_convert_tools(request.tools)
    converted_tool_choice = _anthropic_convert_tool_choice(request.tool_choice)
    resp_or_stream, completion_id, created_time, session, client, db, model, msgs, struct_req = await _execute_core_chat_session(
        request_model=request.model,
        messages=converted_messages,
        tmp_dir=tmp_dir,
        tools=converted_tools,
        tool_choice=converted_tool_choice,
        stream=request.stream,
        response_format=None,  # Anthropic doesn't use response_format this way
    )
    if request.stream:
        return await _create_anthropic_streaming_response(
            resp_or_stream,
            completion_id,
            created_time,
            "gemini-3.0-pro",
            msgs,
        )
    # Non-streaming response
    try:
        raw_clean = GeminiClientWrapper.extract_output(resp_or_stream, include_thoughts=False)
        # Anthropic has no native "thoughts" field; return clean text only.
    except Exception as exc:
        raise HTTPException(status_code=502, detail="Malformed response") from exc
    # Re-extract with thoughts so tool calls embedded there are detected.
    raw_with_think = GeminiClientWrapper.extract_output(resp_or_stream, include_thoughts=True)
    assistant_visible, assistant_storage, tool_calls = _process_llm_output(raw_with_think, raw_clean, None)
    # Build Anthropic-style content blocks.
    content_blocks = []
    if assistant_visible:
        content_blocks.append({"type": "text", "text": assistant_visible})
    for tcall in tool_calls:
        # Safely parse tool arguments; pass unparseable JSON through raw.
        tool_input = {}
        if tcall.function.arguments:
            try:
                tool_input = orjson.loads(tcall.function.arguments)
            except orjson.JSONDecodeError:
                tool_input = {"raw": tcall.function.arguments}
        content_blocks.append({
            "type": "tool_use",
            "id": tcall.id,
            "name": tcall.function.name,
            "input": tool_input
        })
    p_tok, c_tok, t_tok = _calculate_usage(converted_messages, assistant_visible, tool_calls)
    # Stop reason depends on whether tool calls were made.
    stop_reason = "tool_use" if tool_calls else "end_turn"
    # Persist conversation for future session reuse.
    _persist_conversation(
        db,
        model.model_name,
        client.id,
        session.metadata,
        msgs,
        assistant_storage,
        tool_calls,
    )
    return {
        "id": completion_id,
        "type": "message",
        "role": "assistant",
        "content": content_blocks,
        "model": "gemini-3.0-pro",
        "stop_reason": stop_reason,
        "stop_sequence": None,
        "usage": {
            "input_tokens": p_tok,
            "output_tokens": c_tok
        }
    }
async def create_response(
    request: ResponseCreateRequest,
    raw_request: Request,
    api_key: str = Depends(verify_api_key),
    tmp_dir: Path = Depends(get_temp_dir),
    image_store: Path = Depends(get_image_store_dir),
):
    """
    OpenAI Responses-API-compatible endpoint.

    Converts Response input items to internal messages, separates function
    tools from image-generation tools, forces the model to gemini-3.0-pro,
    runs the Gemini call (with session reuse), and renders either a
    streaming response or a standard Responses payload.
    """
    base_url = str(raw_request.base_url)
    base_messages, norm_input = _response_items_to_messages(request.input)
    struct_req = _build_structured_requirement(request.response_format)
    extra_instr = [struct_req.instruction] if struct_req else []
    # Split tools: function tools go to the model prompt, image_generation
    # tools become an instruction built below.
    standard_tools, image_tools = [], []
    if request.tools:
        for t in request.tools:
            if isinstance(t, Tool):
                standard_tools.append(t)
            elif isinstance(t, ResponseImageTool):
                image_tools.append(t)
            elif isinstance(t, dict):
                if t.get("type") == "function":
                    standard_tools.append(Tool.model_validate(t))
                elif t.get("type") == "image_generation":
                    image_tools.append(ResponseImageTool.model_validate(t))
    img_instr = _build_image_generation_instruction(
        image_tools,
        request.tool_choice if isinstance(request.tool_choice, ResponseToolChoice) else None,
    )
    if img_instr:
        extra_instr.append(img_instr)
    # Top-level `instructions` become preface messages ahead of the input.
    preface = _instructions_to_messages(request.instructions)
    conv_messages = [*preface, *base_messages] if preface else base_messages
    model_tool_choice = (
        request.tool_choice if isinstance(request.tool_choice, (str, ToolChoiceFunction)) else None
    )
    # This endpoint does not share _execute_core_chat_session (the request
    # shape differs), so the model override is applied locally here too.
    request.model = "gemini-3.0-pro"  # Force locally
    messages = _prepare_messages_for_model(
        conv_messages,
        standard_tools or None,
        model_tool_choice,
        extra_instr or None,
    )
    pool, db = GeminiClientPool(), LMDBConversationStore()
    try:
        model = _get_model_by_name(request.model)
    except ValueError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
    session, client, remain = await _find_reusable_session(db, pool, model, messages)
    if session:
        # Special case: if the last message is a tool result, process it even if remain is empty
        if not remain and messages and messages[-1].role == "tool":
            remain = [messages[-1]]
        msgs = _prepare_messages_for_model(
            remain,
            standard_tools or None,
            model_tool_choice,
            None,
            False,
        )
        if not msgs:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="No new messages.")
        # A single message is sent untagged; multiple use the tagged format.
        m_input, files = (
            await GeminiClientWrapper.process_message(msgs[0], tmp_dir, tagged=False)
            if len(msgs) == 1
            else await GeminiClientWrapper.process_conversation(msgs, tmp_dir)
        )
        logger.debug(
            f"Reused session {reprlib.repr(session.metadata)} - sending {len(msgs)} prepared messages."
        )
    else:
        # No reusable session: acquire a pooled client and replay the history.
        try:
            client = await pool.acquire()
            session = client.start_chat(model=model)
            m_input, files = await GeminiClientWrapper.process_conversation(messages, tmp_dir)
        except Exception as e:
            logger.exception("Error in preparing conversation")
            raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=str(e))
    response_id = f"resp_{uuid.uuid4().hex}"
    created_time = int(datetime.now(tz=timezone.utc).timestamp())
    try:
        assert session and client
        logger.debug(
            f"Client ID: {client.id}, Input length: {len(m_input)}, files count: {len(files)}"
        )
        resp_or_stream = await _send_with_split(
            session, m_input, files=files, stream=request.stream
        )
    except Exception as e:
        logger.exception("Gemini API error")
        raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(e))
    if request.stream:
        return _create_responses_real_streaming_response(
            resp_or_stream,
            response_id,
            created_time,
            request.model,
            messages,
            db,
            model,
            client,
            session,
            request,
            image_store,
            base_url,
            struct_req,
        )
    try:
        # With thoughts for tool-call detection; clean for visible text.
        raw_t = GeminiClientWrapper.extract_output(resp_or_stream, include_thoughts=True)
        raw_c = GeminiClientWrapper.extract_output(resp_or_stream, include_thoughts=False)
    except Exception as exc:
        logger.exception("Gemini parsing failed")
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY, detail="Malformed response."
        ) from exc
    assistant_text, storage_output, tool_calls = _process_llm_output(raw_t, raw_c, struct_req)
    images = resp_or_stream.images or []
    # Log response details for debugging
    logger.debug(f"Response: text_len={len(assistant_text)}, images={len(images)}, tool_calls={len(tool_calls)}")
    # Check if response is completely empty (no text, no images, no tool calls)
    if not assistant_text and not images and not tool_calls:
        logger.warning("Gemini returned an empty response (no text, images, or tool calls)")
        # Check if there's an error in the response
        if hasattr(resp_or_stream, 'candidates') and resp_or_stream.candidates:
            logger.debug(f"Candidates: {resp_or_stream.candidates}")
    # Only enforce the image requirement when tool_choice explicitly requests
    # image generation; tools: [{"type": "image_generation"}] merely declares
    # availability, not a mandate.
    image_generation_forced = (
        request.tool_choice is not None
        and isinstance(request.tool_choice, ResponseToolChoice)
        and request.tool_choice.type == "image_generation"
    )
    if image_generation_forced and not images and not assistant_text:
        logger.warning("Image generation was forced via tool_choice but no images or text were returned.")
        raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail="No images returned from forced image generation request.")
    contents, img_calls = [], []
    seen_hashes = set()
    for img in images:
        try:
            b64, w, h, fname, fhash = await _image_to_base64(img, image_store)
            # De-duplicate images by content hash; remove the redundant file.
            if fhash in seen_hashes:
                (image_store / fname).unlink(missing_ok=True)
                continue
            seen_hashes.add(fhash)
            # NOTE(review): the f-string below looks truncated — the markdown
            # image link prefix (presumably built from base_url/fname/token)
            # appears to be missing; verify against VCS history.
            contents.append(
                ResponseOutputContent(
                    type="output_text",
                    text=f"})",
                )
            )
            img_calls.append(
                ResponseImageGenerationCall(
                    id=fname.rsplit(".", 1)[0],
                    result=b64,
                    output_format="png" if isinstance(img, GeneratedImage) else "jpeg",
                    size=f"{w}x{h}" if w and h else None,
                )
            )
        except Exception as e:
            logger.warning(f"Image error: {e}")
    if assistant_text:
        contents.append(ResponseOutputContent(type="output_text", text=assistant_text))
    if not contents:
        # The Responses schema requires at least one content entry.
        contents.append(ResponseOutputContent(type="output_text", text=""))
    # Aggregate image links into the stored transcript for session reuse.
    # NOTE(review): same apparent f-string truncation as above.
    image_markdown = ""
    for img_call in img_calls:
        fname = f"{img_call.id}.{img_call.output_format}"
        img_url = f"})"
        image_markdown += f"\n\n{img_url}"
    if image_markdown:
        storage_output += image_markdown
    p_tok, c_tok, t_tok = _calculate_usage(messages, assistant_text, tool_calls)
    usage = ResponseUsage(input_tokens=p_tok, output_tokens=c_tok, total_tokens=t_tok)
    payload = _create_responses_standard_payload(
        response_id,
        created_time,
        request.model,
        tool_calls,
        img_calls,
        contents,
        usage,
        request,
        norm_input,
    )
    _persist_conversation(
        db, model.model_name, client.id, session.metadata, messages, storage_output, tool_calls
    )
    return payload