# File: subapi/gemini_webapi/utils/parsing.py
# Hosting-page residue kept for reference (not part of the module):
# "habulaj's picture", "Upload 49 files", "e816bb2 verified".
import difflib
import re
import reprlib
from typing import Any
import orjson as json
from .logger import logger
# Frame header of the length-prefixed protocol: a run of decimal digits
# (captured as group 1) immediately followed by a newline.
_LENGTH_MARKER_PATTERN = re.compile(r"(\d+)\n")
# One or more backslashes followed by a Markdown marker (` * _ ~) plus whatever
# follows it. Because `.` does not cross newlines and `$` anchors at the end of
# the string, only an escape sitting on the last line can match.
_FLICKER_ESC_RE = re.compile(r"\\+[`*_~].*$")
def get_clean_text(s: str) -> str:
    """
    Strip streaming artifacts from a piece of Gemini text.

    Two clean-ups are applied, in this order:

    1. A trailing newline followed by three backticks (a code fence that was
       emitted at the very end of the text) is dropped.
    2. Everything from a backslash-escaped Markdown marker (one of
       backtick, `*`, `_`, `~`) to the end of the last line is removed, as
       matched by `_FLICKER_ESC_RE`.

    Parameters
    ----------
    s: `str`
        The raw text to clean. Any falsy value yields an empty string.

    Returns
    -------
    `str`
        The cleaned text.
    """
    if not s:
        return ""

    # `removesuffix` is a no-op when the text does not end with the fence.
    without_fence = s.removesuffix("\n```")
    return _FLICKER_ESC_RE.sub("", without_fence)
def get_delta_by_fp_len(
new_raw: str, last_sent_clean: str, is_final: bool
) -> tuple[str, str]:
"""
Calculate text delta by aligning stable content and matching volatile symbols.
Handles temporary flicker at ends and permanent escaping drift during code block transitions.
Uses SequenceMatcher to robustly handle middle-string modifications.
"""
new_c = get_clean_text(new_raw) if not is_final else new_raw
if new_c.startswith(last_sent_clean):
return new_c[len(last_sent_clean) :], new_c
# Find the matching suffix to handle differences gracefully
search_len = min(3000, max(1000, len(last_sent_clean)))
search_len = min(search_len, len(last_sent_clean), len(new_c))
if search_len == 0:
return new_c, new_c
tail_last = last_sent_clean[-search_len:]
tail_new = new_c[-search_len:]
sm = difflib.SequenceMatcher(None, tail_last, tail_new)
blocks = [b for b in sm.get_matching_blocks() if b.size > 0]
if blocks:
last_match = blocks[-1]
match_end = last_match.b + last_match.size
return tail_new[match_end:], new_c
# Fallback to full string if tail didn't match at all
sm = difflib.SequenceMatcher(None, last_sent_clean, new_c)
blocks = [b for b in sm.get_matching_blocks() if b.size > 0]
if blocks:
last_match = blocks[-1]
match_end = last_match.b + last_match.size
return new_c[match_end:], new_c
return new_c, new_c
def _get_char_count_for_utf16_units(
    s: str, start_idx: int, utf16_units: int
) -> tuple[int, int]:
    """
    Translate a budget of UTF-16 code units into a number of Python characters.

    Characters are consumed from `s` starting at `start_idx`. A code point
    above U+FFFF counts as two units, every other one as a single unit.
    Consumption stops when the budget is used up, when the next character
    would exceed the budget, or when the string ends.

    Parameters
    ----------
    s: `str`
        The string to walk through.
    start_idx: `int`
        Index of the first character to consider.
    utf16_units: `int`
        Number of UTF-16 code units to cover.

    Returns
    -------
    `tuple[int, int]`
        A tuple containing:
        - The number of Python characters consumed.
        - The number of UTF-16 units those characters actually occupy.
    """
    consumed_chars = 0
    consumed_units = 0

    for idx in range(start_idx, len(s)):
        if consumed_units >= utf16_units:
            break
        width = 2 if ord(s[idx]) > 0xFFFF else 1
        # Never split a two-unit character across the budget boundary.
        if consumed_units + width > utf16_units:
            break
        consumed_units += width
        consumed_chars += 1

    return consumed_chars, consumed_units
def get_nested_value(
data: Any, path: list[int | str], default: Any = None, verbose: bool = False
) -> Any:
"""
Safely navigate through a nested structure (list or dict) using a sequence of keys/indices.
Parameters
----------
data: `Any`
The nested structure to traverse.
path: `list[int | str]`
A list of indices or keys representing the path.
default: `Any`
Value to return if the path is invalid.
verbose: `bool`
If True, log debug information when the path cannot be fully traversed.
"""
current = data
for i, key in enumerate(path):
found = False
if isinstance(key, int):
if isinstance(current, list) and -len(current) <= key < len(current):
current = current[key]
found = True
elif isinstance(key, str):
if isinstance(current, dict) and key in current:
current = current[key]
found = True
if not found:
if verbose:
logger.debug(
f"Safe navigation: path {path} ended at index {i} (key '{key}'), "
f"returning default. Context: {reprlib.repr(current)}"
)
return default
return current if current is not None else default
def parse_response_by_frame(content: str) -> tuple[list[Any], str]:
    """
    Pull every complete JSON frame out of a streaming buffer.

    The buffer is expected to follow Google's length-prefixed framing
    protocol: each frame is a decimal length marker, a newline, and then the
    JSON payload. The length is measured in UTF-16 code units (JavaScript
    `String.length`) and the counted span starts right after the digits, so
    it includes the newline after the number and the newline after the JSON.

    Parsing stops at the first position that does not hold a complete frame;
    that part of the buffer is handed back so the caller can append more data
    and call again. A complete frame whose payload is not valid JSON is
    logged at debug level and skipped.

    Parameters
    ----------
    content: `str`
        The accumulated string buffer containing raw streaming data from the API.

    Returns
    -------
    `tuple[list[Any], str]`
        A tuple containing:
        - The parsed JSON values; a payload that is itself a list contributes
          its elements individually.
        - The unparsed remainder of the buffer (incomplete frames).
    """
    buffer_len = len(content)
    frames: list[Any] = []
    consumed_pos = 0

    while consumed_pos < buffer_len:
        # Whitespace between frames carries no information; step over it.
        if content[consumed_pos].isspace():
            consumed_pos += 1
            continue

        marker = _LENGTH_MARKER_PATTERN.match(content, pos=consumed_pos)
        if marker is None:
            break

        length = int(marker.group(1))
        # The counted span begins immediately after the digits (group 1).
        start_content = marker.end(1)
        char_count, units_found = _get_char_count_for_utf16_units(
            content, start_content, length
        )
        if units_found < length:
            logger.debug(
                f"Incomplete frame at position {consumed_pos}: expected {length} UTF-16 units, "
                f"but received {units_found}. Waiting for additional data..."
            )
            break

        # The frame is complete: mark it as consumed before looking inside.
        consumed_pos = start_content + char_count
        chunk = content[start_content:consumed_pos].strip()
        if not chunk:
            continue

        try:
            payload = json.loads(chunk)
        except json.JSONDecodeError:
            logger.debug(
                f"Failed to parse chunk at pos {start_content} with length {length}. "
                f"Frame content: {reprlib.repr(chunk)}"
            )
        else:
            if isinstance(payload, list):
                frames.extend(payload)
            else:
                frames.append(payload)

    return frames, content[consumed_pos:]
def extract_json_from_response(text: str) -> list:
    """
    Turn a raw Google API response body into a list of JSON values.

    A leading `)]}'` guard prefix is removed first. Three strategies are then
    tried in order, and the first one that yields something wins:

    1. Length-prefixed frames, via `parse_response_by_frame`.
    2. The whole body as a single JSON document (wrapped in a list unless it
       already is one).
    3. Line-by-line JSON: lists are flattened into the result, dicts are
       appended, and lines that fail to parse or hold other values are ignored.

    Parameters
    ----------
    text: `str`
        The raw response body.

    Returns
    -------
    `list`
        The extracted JSON values.

    Raises
    ------
    `TypeError`
        If `text` is not a string.
    `ValueError`
        If none of the strategies produces any JSON.
    """
    if not isinstance(text, str):
        raise TypeError(
            f"Input text is expected to be a string, got {type(text).__name__} instead."
        )

    content = text.removeprefix(")]}'").lstrip()

    # Strategy 1: the framing protocol is the most structured format.
    frames, _ = parse_response_by_frame(content)
    if frames:
        return frames

    # Strategy 2: the body as one JSON document.
    content_stripped = content.strip()
    try:
        document = json.loads(content_stripped)
    except json.JSONDecodeError:
        pass
    else:
        return document if isinstance(document, list) else [document]

    # Strategy 3: newline-delimited JSON.
    collected = []
    for raw_line in content_stripped.splitlines():
        candidate = raw_line.strip()
        if not candidate:
            continue
        try:
            item = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(item, list):
            collected.extend(item)
        elif isinstance(item, dict):
            collected.append(item)

    if not collected:
        raise ValueError("Could not find a valid JSON object or array in the response.")
    return collected