xiaoyukkkk commited on
Commit
cf6e54c
·
verified ·
1 Parent(s): fa1062a

Upload streaming_parser.py

Browse files
Files changed (1) hide show
  1. util/streaming_parser.py +257 -0
util/streaming_parser.py CHANGED
@@ -0,0 +1,257 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ from typing import Iterator, Dict, Any, Iterable, AsyncIterator
3
+
4
+ def parse_json_array_stream(line_iterator: Iterable[str]) -> Iterator[Dict[str, Any]]:
5
+ """
6
+ 解析一个由文本行组成的、格式化的(pretty-printed)JSON数组流。
7
+
8
+ 这个函数是一个生成器,它会为在流中发现的每个第一层级的JSON对象
9
+ 产出(yield)一个完整的Python字典。它的设计目标是高内存效率,
10
+ 因为它会逐行处理流,而不是一次性加载所有内容。
11
+
12
+ Args:
13
+ line_iterator: 一个产生响应行的迭代器。例如,`requests.Response.iter_lines()`
14
+ 解码后的结果。
15
+
16
+ Yields:
17
+ 一个从流中解析出的JSON对象的字典。
18
+
19
+ Raises:
20
+ ValueError: 如果流看起来不像是以JSON数组开始,或者其格式错误
21
+ 导致无法按对象进行解析。
22
+ """
23
+ # 状态变量
24
+ buffer = []
25
+ brace_level = 0
26
+ in_array = False
27
+
28
+ # 1. 寻找数组的起始符 '[',并忽略之前的所有行
29
+ for line in line_iterator:
30
+ stripped_line = line.strip()
31
+ if not stripped_line:
32
+ continue
33
+
34
+ if stripped_line.startswith('['):
35
+ in_array = True
36
+ # 去掉起始的 '[' 字符,剩下的部分继续处理
37
+ line = stripped_line[1:]
38
+ # 将剩余部分和后续的迭代器重新组合成一个新的迭代器
39
+ line_iterator = iter(list([line]) + list(line_iterator))
40
+ break
41
+
42
+ if not in_array:
43
+ raise ValueError("数据流不是以一个JSON数组 ( '[' ) 开始。")
44
+
45
+ # 2. 遍历流,逐个字符地构建和解析对象
46
+ in_string = False # 是否在字符串内部
47
+ escape_next = False # 下一个字符是否被转义
48
+
49
+ for line in line_iterator:
50
+ for char in line:
51
+ # 处理转义字符
52
+ if escape_next:
53
+ if brace_level > 0:
54
+ buffer.append(char)
55
+ escape_next = False
56
+ continue
57
+
58
+ # 检查是否是转义符
59
+ if char == '\\':
60
+ if brace_level > 0:
61
+ buffer.append(char)
62
+ escape_next = True
63
+ continue
64
+
65
+ # 检查字符串边界(只在对象内部时才处理)
66
+ if char == '"' and brace_level > 0:
67
+ in_string = not in_string
68
+ buffer.append(char)
69
+ continue
70
+
71
+ # 只有在非字符串内部时,才处理括号
72
+ if not in_string:
73
+ # 当遇到 '{' 时,增加嵌套层级
74
+ if char == '{':
75
+ # 如果是第一层级的对象,清空缓冲区,准备接收新对象
76
+ if brace_level == 0:
77
+ buffer = []
78
+ brace_level += 1
79
+
80
+ # 只有在对象内部时 (brace_level > 0),才将字符加入缓冲区
81
+ if brace_level > 0:
82
+ buffer.append(char)
83
+
84
+ # 当遇到 '}' 时,减少嵌套层级
85
+ if char == '}':
86
+ brace_level -= 1
87
+ # 当层级回到0时,说明一个第一层级的对象已经完整
88
+ if brace_level == 0 and buffer:
89
+ obj_str = "".join(buffer)
90
+ try:
91
+ # 解析这个完整的对象字符串并产出结果
92
+ # 使用 strict=False 允许控制字符
93
+ yield json.loads(obj_str, strict=False)
94
+ except json.JSONDecodeError as e:
95
+ # 如果解析失败,抛出带上下文的异常
96
+ raise ValueError(f"解析JSON对象失败: {e}\n内容: {obj_str}") from e
97
+ finally:
98
+ # 重置缓冲区,为下一个对象做准备
99
+ buffer = []
100
+ in_string = False # 重置字符串状态
101
+ else:
102
+ # 在字符串内部,直接添加字符
103
+ if brace_level > 0:
104
+ buffer.append(char)
105
+
106
+ # 3. 检查流结束后,是否还有未闭合的对象
107
+ if brace_level != 0:
108
+ print(f"警告: JSON流意外结束,括号层级为 {brace_level},可能数据不完整。")
109
+
110
+ async def parse_json_array_stream_async(line_iterator: AsyncIterator[str]) -> AsyncIterator[Dict[str, Any]]:
111
+ """
112
+ 异步版本:解析一个由文本行组成的、格式化的(pretty-printed)JSON数组流。
113
+
114
+ 这个函数是一个异步生成器,它会为在流中发现的每个第一层级的JSON对象
115
+ 产出(yield)一个完整的Python字典。它的设计目标是高内存效率,
116
+ 因为它会逐行处理流,而不是一次性加载所有内容。
117
+
118
+ Args:
119
+ line_iterator: 一个产生响应行的异步迭代器���例如,`httpx.Response.aiter_lines()`
120
+
121
+ Yields:
122
+ 一个从流中解析出的JSON对象的字典。
123
+
124
+ Raises:
125
+ ValueError: 如果流看起来不像是以JSON数组开始,或者其格式错误
126
+ 导致无法按对象进行解析。
127
+ """
128
+ # 状态变量
129
+ buffer = []
130
+ brace_level = 0
131
+ in_array = False
132
+
133
+ # 1. 寻找数组的起始符 '[',并忽略之前的所有行
134
+ in_string = False
135
+ escape_next = False
136
+
137
+ async for line in line_iterator:
138
+ stripped_line = line.strip()
139
+ if not stripped_line:
140
+ continue
141
+
142
+ if stripped_line.startswith('['):
143
+ in_array = True
144
+ # 去掉起始的 '[' 字符,剩下的部分继续处理
145
+ line = stripped_line[1:]
146
+ # 处理剩余部分(使用相同的字符串状态跟踪逻辑)
147
+ for char in line:
148
+ if escape_next:
149
+ if brace_level > 0:
150
+ buffer.append(char)
151
+ escape_next = False
152
+ continue
153
+
154
+ if char == '\\':
155
+ if brace_level > 0:
156
+ buffer.append(char)
157
+ escape_next = True
158
+ continue
159
+
160
+ if char == '"' and brace_level > 0:
161
+ in_string = not in_string
162
+ buffer.append(char)
163
+ continue
164
+
165
+ if not in_string:
166
+ if char == '{':
167
+ if brace_level == 0:
168
+ buffer = []
169
+ brace_level += 1
170
+
171
+ if brace_level > 0:
172
+ buffer.append(char)
173
+
174
+ if char == '}':
175
+ brace_level -= 1
176
+ if brace_level == 0 and buffer:
177
+ obj_str = "".join(buffer)
178
+ try:
179
+ yield json.loads(obj_str, strict=False)
180
+ except json.JSONDecodeError as e:
181
+ raise ValueError(f"解析JSON对象失败: {e}\n内容: {obj_str}") from e
182
+ finally:
183
+ buffer = []
184
+ in_string = False
185
+ else:
186
+ if brace_level > 0:
187
+ buffer.append(char)
188
+ break
189
+
190
+ if not in_array:
191
+ raise ValueError("数据流不是以一个JSON数组 ( '[' ) 开始。")
192
+
193
+ # 2. 遍历流,逐个字符地构建和解析对象
194
+ in_string = False # 是否在字符串内部
195
+ escape_next = False # 下一个字符是否被转义
196
+
197
+ async for line in line_iterator:
198
+ for char in line:
199
+ # 处理转义字符
200
+ if escape_next:
201
+ if brace_level > 0:
202
+ buffer.append(char)
203
+ escape_next = False
204
+ continue
205
+
206
+ # 检查是否是转义符
207
+ if char == '\\':
208
+ if brace_level > 0:
209
+ buffer.append(char)
210
+ escape_next = True
211
+ continue
212
+
213
+ # 检查字符串边界(只在对象内部时才处理)
214
+ if char == '"' and brace_level > 0:
215
+ in_string = not in_string
216
+ buffer.append(char)
217
+ continue
218
+
219
+ # 只有在非字符串内部时,才处理括号
220
+ if not in_string:
221
+ # 当遇到 '{' 时,增加嵌套层级
222
+ if char == '{':
223
+ # 如果是第一层级的对象,清空缓冲区,准备接收新对象
224
+ if brace_level == 0:
225
+ buffer = []
226
+ brace_level += 1
227
+
228
+ # 只有在对象内部时 (brace_level > 0),才将字符加入缓冲区
229
+ if brace_level > 0:
230
+ buffer.append(char)
231
+
232
+ # 当遇到 '}' 时,减少嵌套层级
233
+ if char == '}':
234
+ brace_level -= 1
235
+ # 当层级回到0时,说明一个第一层级的对象已经完整
236
+ if brace_level == 0 and buffer:
237
+ obj_str = "".join(buffer)
238
+ try:
239
+ # 解析这个完整的对象字符串并产出结果
240
+ # 使用 strict=False 允许控制字符
241
+ yield json.loads(obj_str, strict=False)
242
+ except json.JSONDecodeError as e:
243
+ # 如果解析失败,抛出带上下文的异常
244
+ raise ValueError(f"解析JSON对象失败: {e}\n内容: {obj_str}") from e
245
+ finally:
246
+ # 重置缓冲区,为下一个对象做准备
247
+ buffer = []
248
+ in_string = False # 重置字符串状态
249
+ else:
250
+ # 在字符串内部,直接添加字符
251
+ if brace_level > 0:
252
+ buffer.append(char)
253
+
254
+ # 3. 检查流结束后,是否还有未闭合的对象
255
+ if brace_level != 0:
256
+ print(f"警告: JSON流意外结束,括号层级为 {brace_level},可能数据不完整。")
257
+