import html import re from pathlib import Path from typing import Any, cast import orjson from gemini_webapi import GeminiClient, ModelOutput from loguru import logger from ..models import Message from ..utils import g_config from ..utils.helper import ( add_tag, save_file_to_tempfile, save_url_to_tempfile, ) HTML_ESCAPE_RE = re.compile(r"&(?:lt|gt|amp|quot|apos|#[0-9]+|#x[0-9a-fA-F]+);") ESC_SYMBOLS_RE = re.compile(r"\\(?=[\\\[\]{}()<>`*_#~+.:!&^$|-])") CODE_FENCE_RE = re.compile(r"(```.*?```|`[^`\n]+?`)", re.DOTALL) FILE_PATH_PATTERN = re.compile( r"^(?=.*[./\\]|.*:\d+|^(?:Dockerfile|Makefile|Jenkinsfile|Procfile|Rakefile|Gemfile|Vagrantfile|Caddyfile|Justfile|LICENSE|README|CONTRIBUTING|CODEOWNERS|AUTHORS|NOTICE|CHANGELOG)$)([a-zA-Z0-9_./\\-]+(?::\d+)?)$", re.IGNORECASE, ) GOOGLE_SEARCH_LINK_PATTERN = re.compile( r"`?\[`?(.+?)`?`?]\((https://www\.google\.com/search\?q=)([^)]*)\)`?" ) GARBAGE_URL_RE = re.compile( r"https?://(?:[a-zA-Z0-9-]+\.)*(?:googleusercontent\.com|gstatic\.com|google\.com)/(?:image_collection|recs|image_retrieval)/[^\s]+" ) _UNSET = object() def clean_citations(text: str) -> str: """ Remove Web source citations like [1] or 【1†source】 from text. Protects code blocks and list items (e.g. [1] at start of line). """ if not text: return text parts = [] last_index = 0 # 1. Split by code blocks to avoid modifying code for match in CODE_FENCE_RE.finditer(text): # Process text chunk before the code block text_chunk = text[last_index : match.start()] cleaned_chunk = _clean_text_chunk(text_chunk) parts.append(cleaned_chunk) # Keep the code block as is parts.append(match.group(0)) last_index = match.end() # Process the remaining text text_chunk = text[last_index:] cleaned_chunk = _clean_text_chunk(text_chunk) parts.append(cleaned_chunk) return "".join(parts) # Pre-compiled citation pattern for performance CITATION_PATTERN = re.compile(r"(? str: # A. Always remove source-style citations text = SOURCE_CITATION_PATTERN.sub("", text) # B. Remove internal technical artifacts and links # These are raw internal URLs or IDs that Gemini Web sometimes leaks. # We target common technical subdomains or path patterns (e.g. image_collection, internal IDs) text = GARBAGE_URL_RE.sub("", text) # C. Remove standard [n] citations # Use careful regex to avoid matching list items (start of line) or URLs # Negative lookbehind to ensure we aren't part of a URL or Markdown link path def repl(m): try: start = m.start() # If match is at start of string, preserve it (likely list item) if start == 0: return m.group(0) # If immediately preceded by newline or space at start of line, preserve it preceding_text = text[:start] if not preceding_text.strip() or preceding_text.endswith("\n") or preceding_text.endswith("\n ") or preceding_text.endswith("\n\t"): return m.group(0) # Otherwise remove it return "" except Exception as e: logger.error(f"Error in _clean_text_chunk.repl: {e}") return m.group(0) return CITATION_PATTERN.sub(repl, text) def _resolve(value: Any, fallback: Any): return fallback if value is _UNSET else value class GeminiClientWrapper(GeminiClient): """Gemini client with helper methods.""" def __init__(self, client_id: str, **kwargs): super().__init__(**kwargs) self.id = client_id async def init( self, timeout: float = cast(float, _UNSET), auto_close: bool = False, close_delay: float = 300, auto_refresh: bool = cast(bool, _UNSET), refresh_interval: float = cast(float, _UNSET), verbose: bool = cast(bool, _UNSET), watchdog_timeout: float = cast(float, _UNSET), **kwargs: Any, ) -> None: """ Inject default configuration values. """ config = g_config.gemini timeout = cast(float, _resolve(timeout, config.timeout)) auto_refresh = cast(bool, _resolve(auto_refresh, config.auto_refresh)) refresh_interval = cast(float, _resolve(refresh_interval, config.refresh_interval)) verbose = cast(bool, _resolve(verbose, config.verbose)) call_kwargs: dict[str, Any] = { "timeout": timeout, "auto_close": auto_close, "close_delay": close_delay, "auto_refresh": auto_refresh, "refresh_interval": refresh_interval, "verbose": verbose, "watchdog_timeout": 120.0 if watchdog_timeout is _UNSET else watchdog_timeout, # 增加看门狗超时 } if kwargs: call_kwargs.update(kwargs) try: await super().init(**call_kwargs) except Exception: logger.exception(f"Failed to initialize GeminiClient {self.id}") raise def running(self) -> bool: return self._running @staticmethod async def process_message( message: Message, tempdir: Path | None = None, tagged: bool = True ) -> tuple[str, list[Path | str]]: """ Process a single Message object into a format suitable for the Gemini API. Extracts text fragments, handles images and files, and appends tool call blocks if present. """ files: list[Path | str] = [] text_fragments: list[str] = [] if isinstance(message.content, str): if message.content or message.role == "tool": text_fragments.append(message.content or "{}") elif isinstance(message.content, list): for item in message.content: if item.type == "text": if item.text or message.role == "tool": text_fragments.append(item.text or "{}") elif item.type == "image_url": if not item.image_url: raise ValueError("Image URL cannot be empty") if url := item.image_url.get("url", None): files.append(await save_url_to_tempfile(url, tempdir)) else: raise ValueError("Image URL must contain 'url' key") elif item.type == "file": if not item.file: raise ValueError("File cannot be empty") if file_data := item.file.get("file_data", None): filename = item.file.get("filename", "") files.append(await save_file_to_tempfile(file_data, filename, tempdir)) elif url := item.file.get("url", None): files.append(await save_url_to_tempfile(url, tempdir)) else: raise ValueError("File must contain 'file_data' or 'url' key") elif message.content is None and message.role == "tool": text_fragments.append("{}") elif message.content is not None: raise ValueError("Unsupported message content type.") if message.role == "tool": tool_name = message.name or "unknown" combined_content = "\n".join(text_fragments).strip() or "{}" text_fragments = [f"[response:{tool_name}]{combined_content}[/response]"] if message.tool_calls: tool_blocks: list[str] = [] for call in message.tool_calls: args_text = call.function.arguments.strip() try: parsed_args = orjson.loads(args_text) args_text = orjson.dumps(parsed_args, option=orjson.OPT_SORT_KEYS).decode( "utf-8" ) except orjson.JSONDecodeError: pass tool_blocks.append(f"[call:{call.function.name}]{args_text}[/call]") if tool_blocks: tool_section = "[function_calls]\n" + "".join(tool_blocks) + "\n[/function_calls]" text_fragments.append(tool_section) model_input = "\n".join(fragment for fragment in text_fragments if fragment is not None) if model_input or message.role == "tool": if tagged: model_input = add_tag(message.role, model_input) return model_input, files @staticmethod async def process_conversation( messages: list[Message], tempdir: Path | None = None ) -> tuple[str, list[Path | str]]: need_tag = any(m.role != "user" for m in messages) conversation: list[str] = [] files: list[Path | str] = [] for msg in messages: input_part, files_part = await GeminiClientWrapper.process_message( msg, tempdir, tagged=need_tag ) conversation.append(input_part) files.extend(files_part) if need_tag: conversation.append(add_tag("assistant", "", unclose=True)) return "\n".join(conversation), files @staticmethod def extract_output(response: ModelOutput, include_thoughts: bool = True) -> str: text = "" # Safely get text from response, handling empty candidates list try: response_text = response.text or "" except (IndexError, AttributeError): response_text = "" if include_thoughts: try: thoughts = response.thoughts except (IndexError, AttributeError): thoughts = None if thoughts: text += f"{thoughts}\n" text += response_text def _unescape_html(text_content: str) -> str: parts: list[str] = [] last_index = 0 for match in CODE_FENCE_RE.finditer(text_content): non_code = text_content[last_index : match.start()] if non_code: parts.append(HTML_ESCAPE_RE.sub(lambda m: html.unescape(m.group(0)), non_code)) parts.append(match.group(0)) last_index = match.end() tail = text_content[last_index:] if tail: parts.append(HTML_ESCAPE_RE.sub(lambda m: html.unescape(m.group(0)), tail)) return "".join(parts) def _unescape_symbols(text_content: str) -> str: parts: list[str] = [] last_index = 0 for match in CODE_FENCE_RE.finditer(text_content): non_code = text_content[last_index : match.start()] if non_code: parts.append(ESC_SYMBOLS_RE.sub("", non_code)) parts.append(match.group(0)) last_index = match.end() tail = text_content[last_index:] if tail: parts.append(ESC_SYMBOLS_RE.sub("", tail)) return "".join(parts) text = _unescape_html(text) text = _unescape_symbols(text) text = clean_citations(text) # Repair mangled links like [https://...](https://...) or [text (https://...) # Case 1: URL followed by ](URL) text = re.sub(r"(https?://[^\s\]]+)\]\((https?://[^\s)]+)\)", r"[\1](\2)", text) # Case 2: [text (URL) -> [text](URL) text = re.sub(r"\[([^\[\]\n]+)\s\((https?://[^\s)]+)\)", r"[\1](\2)", text) def extract_file_path_from_display_text(text_content: str) -> str | None: try: match = re.match(FILE_PATH_PATTERN, text_content) if match: return match.group(1) except (IndexError, AttributeError) as e: logger.debug(f"Failed to extract file path from '{text_content[:50]}...': {e}") return None def replacer(match: re.Match) -> str: try: display_text = str(match.group(1)).strip() google_search_prefix = match.group(2) query_part = match.group(3) file_path = extract_file_path_from_display_text(display_text) if file_path: # If it's a file path, transform it into a self-referencing Markdown link return f"[`{file_path}`]({file_path})" else: # Otherwise, reconstruct the original Google search link with the display_text original_google_search_url = f"{google_search_prefix}{query_part}" return f"[`{display_text}`]({original_google_search_url})" except (IndexError, AttributeError) as e: # Capture group access failed - return original match safely try: return match.group(0) except (IndexError, AttributeError): logger.error(f"Critical error in replacer: cannot access any match groups: {e}") return "" return re.sub(GOOGLE_SEARCH_LINK_PATTERN, replacer, text)