File size: 4,215 Bytes
962759f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""
SSE (Server-Sent Events) parser for streaming responses
"""

import json
from typing import Dict, Any, Generator, Optional, Type
import requests


class SSEParser:
    """Server-Sent Events parser for streaming responses"""

    def __init__(self, response: requests.Response, debug_mode: bool = False):
        """Initialize SSE parser

        Args:
            response: requests.Response object with stream=True
            debug_mode: Enable debug logging
        """
        self.response = response
        self.debug_mode = debug_mode
        self.buffer = ""
        self.line_count = 0

    def debug_log(self, format_str: str, *args) -> None:
        """Log debug message if debug mode is enabled"""
        if self.debug_mode:
            if args:
                print(f"[SSE_PARSER] {format_str % args}")
            else:
                print(f"[SSE_PARSER] {format_str}")

    def iter_events(self) -> Generator[Dict[str, Any], None, None]:
        """Iterate over SSE events

        Yields:
            dict: Parsed SSE event data
        """
        self.debug_log("开始解析 SSE 流")

        for line in self.response.iter_lines():
            self.line_count += 1

            # Skip empty lines
            if not line:
                continue

            # Decode bytes
            if isinstance(line, bytes):
                try:
                    line = line.decode("utf-8")
                except UnicodeDecodeError:
                    self.debug_log(f"第{self.line_count}行解码失败,跳过")
                    continue

            # Skip comment lines
            if line.startswith(":"):
                continue

            # Parse field-value pairs
            if ":" in line:
                field, value = line.split(":", 1)
                field = field.strip()
                value = value.lstrip()

                if field == "data":
                    self.debug_log(f"收到数据 (第{self.line_count}行): {value}")

                    # Try to parse JSON
                    try:
                        data = json.loads(value)
                        yield {"type": "data", "data": data, "raw": value}
                    except json.JSONDecodeError:
                        yield {"type": "data", "data": value, "raw": value, "is_json": False}

                elif field == "event":
                    yield {"type": "event", "event": value}

                elif field == "id":
                    yield {"type": "id", "id": value}

                elif field == "retry":
                    try:
                        retry = int(value)
                        yield {"type": "retry", "retry": retry}
                    except ValueError:
                        self.debug_log(f"无效的 retry 值: {value}")

    def iter_data_only(self) -> Generator[Dict[str, Any], None, None]:
        """Iterate only over data events"""
        for event in self.iter_events():
            if event["type"] == "data":
                yield event

    def iter_json_data(self, model_class: Optional[Type] = None) -> Generator[Dict[str, Any], None, None]:
        """Iterate only over JSON data events with optional validation

        Args:
            model_class: Optional Pydantic model class for validation

        Yields:
            dict: JSON data events
        """
        for event in self.iter_events():
            if event["type"] == "data" and event.get("is_json", True):
                try:
                    if model_class:
                        data = model_class.model_validate_json(event["raw"])
                        yield {"type": "data", "data": data, "raw": event["raw"]}
                    else:
                        yield event
                except Exception as e:
                    self.debug_log(f"数据验证失败: {e}")
                    continue

    def close(self) -> None:
        """Close the response connection"""
        if hasattr(self.response, "close"):
            self.response.close()

    def __enter__(self):
        """Context manager entry"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit"""
        self.close()