|
|
import json |
|
|
from typing import Iterator, Dict, Any, Iterable, AsyncIterator |
|
|
from itertools import chain |
|
|
|
|
|
def parse_json_array_stream(line_iterator: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Incrementally parse a pretty-printed JSON array delivered as text lines.

    This is a generator: each time a complete first-level JSON object of the
    array has been read, it is decoded and yielded as a Python dict. Only the
    text of the object currently being assembled is buffered, so the whole
    stream is never held in memory at once.

    Array elements that are not objects (numbers, strings, nested arrays
    sitting directly in the outer array) are skipped.

    Args:
        line_iterator: Any iterable producing the decoded text lines of the
            response, e.g. the decoded output of
            ``requests.Response.iter_lines()``. It is traversed exactly once,
            so both one-shot iterators and re-iterable containers such as
            lists are handled correctly.

    Yields:
        One dict per first-level JSON object found in the stream.

    Raises:
        ValueError: If the first non-blank line does not begin with ``[``
            (or the stream has no non-blank line at all), or if the text of a
            brace-balanced object cannot be decoded as JSON. Because this is
            a generator, the error surfaces on iteration, not at call time.

    Note:
        If the stream ends while an object is still open, a warning is
        printed to stdout and the incomplete object is discarded.
    """
    pending = []          # characters of the object currently being assembled
    depth = 0             # nesting depth of '{' ... '}' outside of strings
    in_string = False     # True while the scanner is inside a JSON string
    escape_next = False   # True when the previous character was a backslash

    def split_objects(text):
        """Feed one chunk of text to the scanner; yield each completed object's text."""
        nonlocal depth, in_string, escape_next
        for char in text:
            if escape_next:
                # The character after a backslash is taken literally, so an
                # escaped quote must not toggle the string state.
                if depth > 0:
                    pending.append(char)
                escape_next = False
                continue

            if char == '\\':
                if depth > 0:
                    pending.append(char)
                escape_next = True
                continue

            if char == '"':
                # Track strings at every depth, including depth 0, so braces
                # inside a top-level string element are not mistaken for
                # object boundaries.
                in_string = not in_string
                if depth > 0:
                    pending.append(char)
                continue

            if in_string:
                if depth > 0:
                    pending.append(char)
                continue

            if char == '{':
                if depth == 0:
                    pending.clear()
                depth += 1

            if depth > 0:
                pending.append(char)

            # A '}' seen outside any object is ignored rather than driving
            # the depth negative, which would desynchronise later objects.
            if char == '}' and depth > 0:
                depth -= 1
                if depth == 0:
                    obj_str = "".join(pending)
                    pending.clear()
                    yield obj_str

    in_array = False
    # A single pass over the input: re-iterating `line_iterator` after the
    # header has been found would restart list-like inputs from the beginning
    # and emit the objects of the opening line twice.
    for line in line_iterator:
        if in_array:
            text = line
        else:
            stripped_line = line.strip()
            if not stripped_line:
                continue
            if not stripped_line.startswith('['):
                break
            in_array = True
            # Drop the opening bracket; the rest of the line may already
            # contain the start of the first object.
            text = stripped_line[1:]

        for obj_str in split_objects(text):
            try:
                # strict=False tolerates raw control characters in strings.
                obj = json.loads(obj_str, strict=False)
            except json.JSONDecodeError as e:
                raise ValueError(f"解析JSON对象失败: {e}\n内容: {obj_str}") from e
            yield obj

    if not in_array:
        raise ValueError("数据流不是以一个JSON数组 ( '[' ) 开始。")

    if depth != 0:
        print(f"警告: JSON流意外结束,括号层级为 {depth},可能数据不完整。")
|
|
|
|
|
async def parse_json_array_stream_async(line_iterator: AsyncIterator[str]) -> AsyncIterator[Dict[str, Any]]:
    """Asynchronously parse a pretty-printed JSON array delivered as text lines.

    This is an async generator: each time a complete first-level JSON object
    of the array has been read, it is decoded and yielded as a Python dict.
    Only the text of the object currently being assembled is buffered, so the
    whole stream is never held in memory at once.

    Array elements that are not objects (numbers, strings, nested arrays
    sitting directly in the outer array) are skipped.

    Args:
        line_iterator: An async iterator producing the text lines of the
            response, e.g. ``httpx.Response.aiter_lines()``. It is traversed
            in a single ``async for`` loop.

    Yields:
        One dict per first-level JSON object found in the stream.

    Raises:
        ValueError: If the first non-blank line does not begin with ``[``
            (or the stream has no non-blank line at all), or if the text of a
            brace-balanced object cannot be decoded as JSON. Because this is
            an async generator, the error surfaces on iteration, not at call
            time.

    Note:
        If the stream ends while an object is still open, a warning is
        printed to stdout and the incomplete object is discarded.
    """
    pending = []          # characters of the object currently being assembled
    depth = 0             # nesting depth of '{' ... '}' outside of strings
    in_string = False     # True while the scanner is inside a JSON string
    escape_next = False   # True when the previous character was a backslash

    def split_objects(text):
        """Feed one chunk of text to the scanner; yield each completed object's text."""
        nonlocal depth, in_string, escape_next
        for char in text:
            if escape_next:
                # The character after a backslash is taken literally, so an
                # escaped quote must not toggle the string state.
                if depth > 0:
                    pending.append(char)
                escape_next = False
                continue

            if char == '\\':
                if depth > 0:
                    pending.append(char)
                escape_next = True
                continue

            if char == '"':
                # Track strings at every depth, including depth 0, so braces
                # inside a top-level string element are not mistaken for
                # object boundaries.
                in_string = not in_string
                if depth > 0:
                    pending.append(char)
                continue

            if in_string:
                if depth > 0:
                    pending.append(char)
                continue

            if char == '{':
                if depth == 0:
                    pending.clear()
                depth += 1

            if depth > 0:
                pending.append(char)

            # A '}' seen outside any object is ignored rather than driving
            # the depth negative, which would desynchronise later objects.
            if char == '}' and depth > 0:
                depth -= 1
                if depth == 0:
                    obj_str = "".join(pending)
                    pending.clear()
                    yield obj_str

    in_array = False
    # One loop handles both the header search and the body, so the scanner
    # logic exists once and the source is never iterated a second time.
    async for line in line_iterator:
        if in_array:
            text = line
        else:
            stripped_line = line.strip()
            if not stripped_line:
                continue
            if not stripped_line.startswith('['):
                break
            in_array = True
            # Drop the opening bracket; the rest of the line may already
            # contain the start of the first object.
            text = stripped_line[1:]

        for obj_str in split_objects(text):
            try:
                # strict=False tolerates raw control characters in strings.
                obj = json.loads(obj_str, strict=False)
            except json.JSONDecodeError as e:
                raise ValueError(f"解析JSON对象失败: {e}\n内容: {obj_str}") from e
            yield obj

    if not in_array:
        raise ValueError("数据流不是以一个JSON数组 ( '[' ) 开始。")

    if depth != 0:
        print(f"警告: JSON流意外结束,括号层级为 {depth},可能数据不完整。")
|
|
|
|
|
|