z2api / app /utils /sse_parser.py
zhaoxiaozhao07's picture
feat(core): 重构配置模块并添加新功能
962759f
"""
SSE (Server-Sent Events) parser for streaming responses
"""
import json
from typing import Dict, Any, Generator, Optional, Type
import requests
class SSEParser:
"""Server-Sent Events parser for streaming responses"""
def __init__(self, response: requests.Response, debug_mode: bool = False):
"""Initialize SSE parser
Args:
response: requests.Response object with stream=True
debug_mode: Enable debug logging
"""
self.response = response
self.debug_mode = debug_mode
self.buffer = ""
self.line_count = 0
def debug_log(self, format_str: str, *args) -> None:
"""Log debug message if debug mode is enabled"""
if self.debug_mode:
if args:
print(f"[SSE_PARSER] {format_str % args}")
else:
print(f"[SSE_PARSER] {format_str}")
def iter_events(self) -> Generator[Dict[str, Any], None, None]:
"""Iterate over SSE events
Yields:
dict: Parsed SSE event data
"""
self.debug_log("开始解析 SSE 流")
for line in self.response.iter_lines():
self.line_count += 1
# Skip empty lines
if not line:
continue
# Decode bytes
if isinstance(line, bytes):
try:
line = line.decode("utf-8")
except UnicodeDecodeError:
self.debug_log(f"第{self.line_count}行解码失败,跳过")
continue
# Skip comment lines
if line.startswith(":"):
continue
# Parse field-value pairs
if ":" in line:
field, value = line.split(":", 1)
field = field.strip()
value = value.lstrip()
if field == "data":
self.debug_log(f"收到数据 (第{self.line_count}行): {value}")
# Try to parse JSON
try:
data = json.loads(value)
yield {"type": "data", "data": data, "raw": value}
except json.JSONDecodeError:
yield {"type": "data", "data": value, "raw": value, "is_json": False}
elif field == "event":
yield {"type": "event", "event": value}
elif field == "id":
yield {"type": "id", "id": value}
elif field == "retry":
try:
retry = int(value)
yield {"type": "retry", "retry": retry}
except ValueError:
self.debug_log(f"无效的 retry 值: {value}")
def iter_data_only(self) -> Generator[Dict[str, Any], None, None]:
"""Iterate only over data events"""
for event in self.iter_events():
if event["type"] == "data":
yield event
def iter_json_data(self, model_class: Optional[Type] = None) -> Generator[Dict[str, Any], None, None]:
"""Iterate only over JSON data events with optional validation
Args:
model_class: Optional Pydantic model class for validation
Yields:
dict: JSON data events
"""
for event in self.iter_events():
if event["type"] == "data" and event.get("is_json", True):
try:
if model_class:
data = model_class.model_validate_json(event["raw"])
yield {"type": "data", "data": data, "raw": event["raw"]}
else:
yield event
except Exception as e:
self.debug_log(f"数据验证失败: {e}")
continue
def close(self) -> None:
"""Close the response connection"""
if hasattr(self.response, "close"):
self.response.close()
def __enter__(self):
"""Context manager entry"""
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Context manager exit"""
self.close()