| import difflib
|
| import re
|
| import reprlib
|
| from typing import Any
|
|
|
| import orjson as json
|
|
|
| from .logger import logger
|
|
|
| _LENGTH_MARKER_PATTERN = re.compile(r"(\d+)\n")
|
| _FLICKER_ESC_RE = re.compile(r"\\+[`*_~].*$")
|
|
|
|
|
def get_clean_text(s: str) -> str:
    """
    Normalize streamed Gemini text for diffing.

    Two transient rendering artifacts are removed:
    - a dangling code-fence terminator ("\\n```") left by an unfinished
      code block, and
    - a trailing backslash-escaped Markdown marker (backtick, *, _ or ~)
      together with the rest of that final line.
    """
    if not s:
        return ""
    # Suffix strip only applies when the fence is actually at the end.
    trimmed = s.removesuffix("\n```")
    # Drop a backslash run + Markdown marker flickering at the tail.
    return re.sub(r"\\+[`*_~].*$", "", trimmed)
|
|
|
|
|
| def get_delta_by_fp_len(
|
| new_raw: str, last_sent_clean: str, is_final: bool
|
| ) -> tuple[str, str]:
|
| """
|
| Calculate text delta by aligning stable content and matching volatile symbols.
|
| Handles temporary flicker at ends and permanent escaping drift during code block transitions.
|
| Uses SequenceMatcher to robustly handle middle-string modifications.
|
| """
|
|
|
| new_c = get_clean_text(new_raw) if not is_final else new_raw
|
|
|
| if new_c.startswith(last_sent_clean):
|
| return new_c[len(last_sent_clean) :], new_c
|
|
|
|
|
| search_len = min(3000, max(1000, len(last_sent_clean)))
|
| search_len = min(search_len, len(last_sent_clean), len(new_c))
|
|
|
| if search_len == 0:
|
| return new_c, new_c
|
|
|
| tail_last = last_sent_clean[-search_len:]
|
| tail_new = new_c[-search_len:]
|
|
|
| sm = difflib.SequenceMatcher(None, tail_last, tail_new)
|
| blocks = [b for b in sm.get_matching_blocks() if b.size > 0]
|
|
|
| if blocks:
|
| last_match = blocks[-1]
|
| match_end = last_match.b + last_match.size
|
| return tail_new[match_end:], new_c
|
|
|
|
|
| sm = difflib.SequenceMatcher(None, last_sent_clean, new_c)
|
| blocks = [b for b in sm.get_matching_blocks() if b.size > 0]
|
|
|
| if blocks:
|
| last_match = blocks[-1]
|
| match_end = last_match.b + last_match.size
|
| return new_c[match_end:], new_c
|
|
|
| return new_c, new_c
|
|
|
|
|
| def _get_char_count_for_utf16_units(
|
| s: str, start_idx: int, utf16_units: int
|
| ) -> tuple[int, int]:
|
| """
|
| Calculate the number of Python characters (code points) and actual UTF-16
|
| units found.
|
| """
|
|
|
| count = 0
|
| units = 0
|
| limit = len(s)
|
|
|
| while units < utf16_units and (start_idx + count) < limit:
|
| char = s[start_idx + count]
|
| u = 2 if ord(char) > 0xFFFF else 1
|
| if units + u > utf16_units:
|
| break
|
| units += u
|
| count += 1
|
|
|
| return count, units
|
|
|
|
|
| def get_nested_value(
|
| data: Any, path: list[int | str], default: Any = None, verbose: bool = False
|
| ) -> Any:
|
| """
|
| Safely navigate through a nested structure (list or dict) using a sequence of keys/indices.
|
|
|
| Parameters
|
| ----------
|
| data: `Any`
|
| The nested structure to traverse.
|
| path: `list[int | str]`
|
| A list of indices or keys representing the path.
|
| default: `Any`
|
| Value to return if the path is invalid.
|
| verbose: `bool`
|
| If True, log debug information when the path cannot be fully traversed.
|
| """
|
|
|
| current = data
|
|
|
| for i, key in enumerate(path):
|
| found = False
|
| if isinstance(key, int):
|
| if isinstance(current, list) and -len(current) <= key < len(current):
|
| current = current[key]
|
| found = True
|
| elif isinstance(key, str):
|
| if isinstance(current, dict) and key in current:
|
| current = current[key]
|
| found = True
|
|
|
| if not found:
|
| if verbose:
|
| logger.debug(
|
| f"Safe navigation: path {path} ended at index {i} (key '{key}'), "
|
| f"returning default. Context: {reprlib.repr(current)}"
|
| )
|
| return default
|
|
|
| return current if current is not None else default
|
|
|
|
|
def parse_response_by_frame(content: str) -> tuple[list[Any], str]:
    """
    Incrementally decode Google's length-prefixed streaming frames.

    The buffer is a concatenation of frames shaped `[length]\n[json_payload]\n`,
    where `length` counts UTF-16 code units and covers both the newline after
    the number and the newline after the JSON. Complete frames are decoded and
    collected; a trailing partial frame is left in the buffer for the next call.

    Parameters
    ----------
    content: `str`
        The accumulated string buffer containing raw streaming data from the API.

    Returns
    -------
    `tuple[list[Any], str]`
        A tuple containing:
        - A list of parsed JSON objects (envelopes) extracted from the buffer.
        - The remaining unparsed part of the buffer (incomplete frames).
    """
    frames: list[Any] = []
    pos = 0
    size = len(content)

    while pos < size:
        # Skip inter-frame whitespace.
        while pos < size and content[pos].isspace():
            pos += 1
        if pos >= size:
            break

        marker = _LENGTH_MARKER_PATTERN.match(content, pos=pos)
        if marker is None:
            break

        expected = int(marker.group(1))

        # The payload starts right after the digits, at the newline that
        # the length value counts as part of the frame body.
        body_start = marker.end(1)
        span, got = _get_char_count_for_utf16_units(content, body_start, expected)

        if got < expected:
            logger.debug(
                f"Incomplete frame at position {pos}: expected {expected} UTF-16 units, "
                f"but received {got}. Waiting for additional data..."
            )
            break

        body_end = body_start + span
        payload = content[body_start:body_end].strip()
        pos = body_end

        if not payload:
            continue

        try:
            envelope = json.loads(payload)
        except json.JSONDecodeError:
            # Malformed frame: log and keep consuming subsequent frames.
            logger.debug(
                f"Failed to parse chunk at pos {body_start} with length {expected}. "
                f"Frame content: {reprlib.repr(payload)}"
            )
        else:
            if isinstance(envelope, list):
                frames.extend(envelope)
            else:
                frames.append(envelope)

    return frames, content[pos:]
|
|
|
|
|
def extract_json_from_response(text: str) -> list:
    """
    Extract and normalize JSON payloads from a raw Google API response.

    Tries three strategies in order: the length-prefixed framing protocol,
    a single JSON document, then newline-delimited JSON. The result is
    always a list of envelopes.

    Raises
    ------
    TypeError
        If *text* is not a string.
    ValueError
        If no strategy recovers any JSON.
    """
    if not isinstance(text, str):
        raise TypeError(
            f"Input text is expected to be a string, got {type(text).__name__} instead."
        )

    # Drop the anti-XSSI prefix (if present) and leading whitespace.
    content = text.removeprefix(")]}'").lstrip()

    # Strategy 1: length-prefixed frames.
    envelopes, _ = parse_response_by_frame(content)
    if envelopes:
        return envelopes

    # Strategy 2: the whole body is one JSON document.
    body = content.strip()
    try:
        document = json.loads(body)
    except json.JSONDecodeError:
        pass
    else:
        return document if isinstance(document, list) else [document]

    # Strategy 3: newline-delimited JSON, skipping unparseable lines.
    found: list = []
    for raw_line in body.splitlines():
        candidate = raw_line.strip()
        if not candidate:
            continue

        try:
            item = json.loads(candidate)
        except json.JSONDecodeError:
            continue

        if isinstance(item, list):
            found.extend(item)
        elif isinstance(item, dict):
            found.append(item)

    if found:
        return found

    raise ValueError("Could not find a valid JSON object or array in the response.")
|
|
|