| | import json |
| | from typing import Iterator, Dict, Any, Iterable, AsyncIterator |
| | from itertools import chain |
| |
|
def parse_json_array_stream(line_iterator: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield each top-level JSON object found in a line-delivered JSON array.

    The text is scanned one character at a time. Characters are collected
    from an opening ``{`` (seen outside any object) up to its matching ``}``,
    and each completed span is decoded with ``json.loads``. Braces inside
    string literals are not counted, and a character preceded by a backslash
    is never interpreted. Anything outside a ``{...}`` span (commas, the
    closing ``]``, whitespace, an unmatched ``}``) is ignored. Only one
    object is buffered at a time, so the whole array is never held in memory.

    Args:
        line_iterator: Any iterable of decoded text lines, for example the
            decoded output of ``requests.Response.iter_lines()``. It is
            traversed exactly once, so lists and one-shot iterators behave
            the same way.

    Yields:
        The value decoded from each completed ``{...}`` span.

    Raises:
        ValueError: If no line starting with ``[`` is found, or if a
            completed ``{...}`` span is not valid JSON.
    """
    buffer = []          # characters of the object currently being collected
    brace_level = 0      # nesting depth of '{' outside string literals
    in_array = False     # becomes True once the opening '[' has been seen
    in_string = False    # inside a string literal (only tracked inside objects)
    escape_next = False  # previous character was a backslash

    for line in line_iterator:
        if not in_array:
            # NOTE(review): the original indentation was lost, so it is unclear
            # whether lines preceding the opening '[' were skipped or rejected.
            # They are skipped here; an error is raised only if '[' never shows up.
            stripped_line = line.strip()
            if not stripped_line.startswith('['):
                continue
            in_array = True
            # The rest of the header line may already hold part of an object.
            line = stripped_line[1:]

        for char in line:
            if escape_next:
                # Escaped character: keep it verbatim and never interpret it.
                if brace_level > 0:
                    buffer.append(char)
                escape_next = False
                continue

            if char == '\\':
                if brace_level > 0:
                    buffer.append(char)
                escape_next = True
                continue

            if in_string:
                # in_string can only be set inside an object, so always buffer.
                buffer.append(char)
                if char == '"':
                    in_string = False
                continue

            if char == '"':
                # Quotes between objects are deliberately not tracked.
                if brace_level > 0:
                    in_string = True
                    buffer.append(char)
                continue

            if char == '{':
                if brace_level == 0:
                    buffer = []
                brace_level += 1
                buffer.append(char)
                continue

            if brace_level == 0:
                # Between objects. This also drops an unmatched '}', which would
                # otherwise push the depth negative and hide all later objects.
                continue

            buffer.append(char)
            if char == '}':
                brace_level -= 1
                if brace_level == 0:
                    obj_str = "".join(buffer)
                    buffer = []
                    try:
                        # strict=False tolerates raw control characters in strings.
                        parsed = json.loads(obj_str, strict=False)
                    except json.JSONDecodeError as e:
                        raise ValueError(f"解析JSON对象失败: {e}\n内容: {obj_str}") from e
                    # Yield outside the try so errors thrown in by the consumer
                    # are not mistaken for JSON decoding failures.
                    yield parsed

    if not in_array:
        raise ValueError("数据流不是以一个JSON数组 ( '[' ) 开始。")

    if brace_level != 0:
        # Best effort: objects already yielded stay valid; only report truncation.
        print(f"警告: JSON流意外结束,括号层级为 {brace_level},可能数据不完整。")
| |
|
async def parse_json_array_stream_async(line_iterator: AsyncIterator[str]) -> AsyncIterator[Dict[str, Any]]:
    """Asynchronously yield each top-level JSON object of a line-delivered array.

    The text is scanned one character at a time. Characters are collected
    from an opening ``{`` (seen outside any object) up to its matching ``}``,
    and each completed span is decoded with ``json.loads``. Braces inside
    string literals are not counted, and a character preceded by a backslash
    is never interpreted. Anything outside a ``{...}`` span (commas, the
    closing ``]``, whitespace, an unmatched ``}``) is ignored. Only one
    object is buffered at a time, so the whole array is never held in memory.

    Args:
        line_iterator: An asynchronous iterator of decoded text lines, for
            example ``httpx.Response.aiter_lines()``.

    Yields:
        The value decoded from each completed ``{...}`` span.

    Raises:
        ValueError: If no line starting with ``[`` is found, or if a
            completed ``{...}`` span is not valid JSON.
    """
    buffer = []          # characters of the object currently being collected
    brace_level = 0      # nesting depth of '{' outside string literals
    in_array = False     # becomes True once the opening '[' has been seen
    in_string = False    # inside a string literal (only tracked inside objects)
    escape_next = False  # previous character was a backslash

    # One loop handles both the header line and every following line, so the
    # character state machine exists only once.
    async for line in line_iterator:
        if not in_array:
            # NOTE(review): the original indentation was lost, so it is unclear
            # whether lines preceding the opening '[' were skipped or rejected.
            # They are skipped here; an error is raised only if '[' never shows up.
            stripped_line = line.strip()
            if not stripped_line.startswith('['):
                continue
            in_array = True
            # The rest of the header line may already hold part of an object.
            line = stripped_line[1:]

        for char in line:
            if escape_next:
                # Escaped character: keep it verbatim and never interpret it.
                if brace_level > 0:
                    buffer.append(char)
                escape_next = False
                continue

            if char == '\\':
                if brace_level > 0:
                    buffer.append(char)
                escape_next = True
                continue

            if in_string:
                # in_string can only be set inside an object, so always buffer.
                buffer.append(char)
                if char == '"':
                    in_string = False
                continue

            if char == '"':
                # Quotes between objects are deliberately not tracked.
                if brace_level > 0:
                    in_string = True
                    buffer.append(char)
                continue

            if char == '{':
                if brace_level == 0:
                    buffer = []
                brace_level += 1
                buffer.append(char)
                continue

            if brace_level == 0:
                # Between objects. This also drops an unmatched '}', which would
                # otherwise push the depth negative and hide all later objects.
                continue

            buffer.append(char)
            if char == '}':
                brace_level -= 1
                if brace_level == 0:
                    obj_str = "".join(buffer)
                    buffer = []
                    try:
                        # strict=False tolerates raw control characters in strings.
                        parsed = json.loads(obj_str, strict=False)
                    except json.JSONDecodeError as e:
                        raise ValueError(f"解析JSON对象失败: {e}\n内容: {obj_str}") from e
                    # Yield outside the try so errors thrown in by the consumer
                    # are not mistaken for JSON decoding failures.
                    yield parsed

    if not in_array:
        raise ValueError("数据流不是以一个JSON数组 ( '[' ) 开始。")

    if brace_level != 0:
        # Best effort: objects already yielded stay valid; only report truncation.
        print(f"警告: JSON流意外结束,括号层级为 {brace_level},可能数据不完整。")
| |
|
| |
|