|
|
"""
|
|
|
SSE (Server-Sent Events) parser for streaming responses
|
|
|
"""
|
|
|
|
|
|
import json
|
|
|
from typing import Dict, Any, Generator, Optional, Type
|
|
|
import requests
|
|
|
|
|
|
|
|
|
class SSEParser:
|
|
|
"""Server-Sent Events parser for streaming responses"""
|
|
|
|
|
|
def __init__(self, response: requests.Response, debug_mode: bool = False):
|
|
|
"""Initialize SSE parser
|
|
|
|
|
|
Args:
|
|
|
response: requests.Response object with stream=True
|
|
|
debug_mode: Enable debug logging
|
|
|
"""
|
|
|
self.response = response
|
|
|
self.debug_mode = debug_mode
|
|
|
self.buffer = ""
|
|
|
self.line_count = 0
|
|
|
|
|
|
def debug_log(self, format_str: str, *args) -> None:
|
|
|
"""Log debug message if debug mode is enabled"""
|
|
|
if self.debug_mode:
|
|
|
if args:
|
|
|
print(f"[SSE_PARSER] {format_str % args}")
|
|
|
else:
|
|
|
print(f"[SSE_PARSER] {format_str}")
|
|
|
|
|
|
def iter_events(self) -> Generator[Dict[str, Any], None, None]:
|
|
|
"""Iterate over SSE events
|
|
|
|
|
|
Yields:
|
|
|
dict: Parsed SSE event data
|
|
|
"""
|
|
|
self.debug_log("开始解析 SSE 流")
|
|
|
|
|
|
for line in self.response.iter_lines():
|
|
|
self.line_count += 1
|
|
|
|
|
|
|
|
|
if not line:
|
|
|
continue
|
|
|
|
|
|
|
|
|
if isinstance(line, bytes):
|
|
|
try:
|
|
|
line = line.decode("utf-8")
|
|
|
except UnicodeDecodeError:
|
|
|
self.debug_log(f"第{self.line_count}行解码失败,跳过")
|
|
|
continue
|
|
|
|
|
|
|
|
|
if line.startswith(":"):
|
|
|
continue
|
|
|
|
|
|
|
|
|
if ":" in line:
|
|
|
field, value = line.split(":", 1)
|
|
|
field = field.strip()
|
|
|
value = value.lstrip()
|
|
|
|
|
|
if field == "data":
|
|
|
self.debug_log(f"收到数据 (第{self.line_count}行): {value}")
|
|
|
|
|
|
|
|
|
try:
|
|
|
data = json.loads(value)
|
|
|
yield {"type": "data", "data": data, "raw": value}
|
|
|
except json.JSONDecodeError:
|
|
|
yield {"type": "data", "data": value, "raw": value, "is_json": False}
|
|
|
|
|
|
elif field == "event":
|
|
|
yield {"type": "event", "event": value}
|
|
|
|
|
|
elif field == "id":
|
|
|
yield {"type": "id", "id": value}
|
|
|
|
|
|
elif field == "retry":
|
|
|
try:
|
|
|
retry = int(value)
|
|
|
yield {"type": "retry", "retry": retry}
|
|
|
except ValueError:
|
|
|
self.debug_log(f"无效的 retry 值: {value}")
|
|
|
|
|
|
def iter_data_only(self) -> Generator[Dict[str, Any], None, None]:
|
|
|
"""Iterate only over data events"""
|
|
|
for event in self.iter_events():
|
|
|
if event["type"] == "data":
|
|
|
yield event
|
|
|
|
|
|
def iter_json_data(self, model_class: Optional[Type] = None) -> Generator[Dict[str, Any], None, None]:
|
|
|
"""Iterate only over JSON data events with optional validation
|
|
|
|
|
|
Args:
|
|
|
model_class: Optional Pydantic model class for validation
|
|
|
|
|
|
Yields:
|
|
|
dict: JSON data events
|
|
|
"""
|
|
|
for event in self.iter_events():
|
|
|
if event["type"] == "data" and event.get("is_json", True):
|
|
|
try:
|
|
|
if model_class:
|
|
|
data = model_class.model_validate_json(event["raw"])
|
|
|
yield {"type": "data", "data": data, "raw": event["raw"]}
|
|
|
else:
|
|
|
yield event
|
|
|
except Exception as e:
|
|
|
self.debug_log(f"数据验证失败: {e}")
|
|
|
continue
|
|
|
|
|
|
def close(self) -> None:
|
|
|
"""Close the response connection"""
|
|
|
if hasattr(self.response, "close"):
|
|
|
self.response.close()
|
|
|
|
|
|
def __enter__(self):
|
|
|
"""Context manager entry"""
|
|
|
return self
|
|
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
|
|
|
"""Context manager exit"""
|
|
|
self.close()
|
|
|
|