File size: 5,475 Bytes
5669b22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import json
from typing import List, Dict, Any, Optional, Tuple
from loguru import logger


class StreamJSONDetector:
    """Detector for real-time JSON detection in streaming text.

    Feed text chunks via :meth:`process_chunk`; the detector buffers the
    stream, tracks candidate ``{`` start positions, and returns each complete
    top-level JSON object exactly once, as soon as it becomes parseable.
    """

    def __init__(self):
        self.buffer = ""  # Store text that has not been fully processed
        self.potential_jsons = []  # Store possible JSON starting positions
        self.completed_jsons = []  # Store completed JSON objects
        self.processed_ranges = []  # Store processed intervals [start, end]

    def process_chunk(self, chunk: str) -> List[Dict[str, Any]]:
        """Process a single text chunk, return a list of complete JSON objects found in this chunk.

        Args:
            chunk (str): Received text chunk

        Returns:
            List[Dict[str, Any]]: List of complete JSON objects parsed from the current chunk
        """
        # Add new chunk to buffer
        old_length = len(self.buffer)
        self.buffer += chunk

        # Register any new '{' positions introduced by this chunk
        self._find_potential_starts(old_length)

        # Try to parse potential JSON objects
        return self._try_parse_jsons()

    def _find_potential_starts(self, start_from: int) -> None:
        """Find new potential JSON starting positions in the buffer.

        Args:
            start_from (int): Position to start searching from
        """
        for i in range(start_from, len(self.buffer)):
            if self.buffer[i] == "{" and not self._is_in_processed_range(i):
                self.potential_jsons.append(i)

    def _is_in_processed_range(self, pos: int) -> bool:
        """Check if a specified position is within a processed range.

        Args:
            pos (int): Position to check

        Returns:
            bool: True if position is within a processed range, otherwise False
        """
        for start, end in self.processed_ranges:
            if start <= pos <= end:
                return True
        return False

    def _try_parse_jsons(self) -> List[Dict[str, Any]]:
        """Try to parse JSON objects from the current buffer.

        Returns:
            List[Dict[str, Any]]: List of newly parsed JSON objects
        """
        new_jsons = []
        remaining_potential = []

        # Sort by starting position, process outermost JSON first; inner
        # candidates then fall inside the outer processed range and are dropped.
        self.potential_jsons.sort()

        for start_idx in self.potential_jsons:
            # Skip if this position is already within a processed range
            if self._is_in_processed_range(start_idx):
                continue

            result, end_idx = self._extract_json(start_idx)
            if result is not None:
                new_jsons.append(result)
                # Mark this range as processed
                self.processed_ranges.append((start_idx, end_idx))
                self.completed_jsons.append(result)
            elif end_idx == -1:
                # Structure still incomplete -- keep waiting for more chunks
                remaining_potential.append(start_idx)
            # else: braces balanced but the text is not valid JSON. The span is
            # fixed, so it can never become valid -- drop the candidate instead
            # of re-warning on every subsequent chunk.

        self.potential_jsons = remaining_potential
        return new_jsons

    def _extract_json(self, start_idx: int) -> Tuple[Optional[Dict[str, Any]], int]:
        """Try to extract a complete JSON object from the given position.

        Braces are counted with string-literal awareness: '{' and '}' that
        appear inside a JSON string (including after escaped quotes) do not
        affect the nesting depth.

        Args:
            start_idx (int): Potential starting position of JSON (a '{')

        Returns:
            Tuple[Optional[Dict[str, Any]], int]:
                (obj, end_idx)  if a complete, valid JSON object ends at end_idx;
                (None, end_idx) if braces balance at end_idx but the text is
                                not valid JSON (caller should drop candidate);
                (None, -1)      if the structure is still incomplete.
        """
        depth = 0
        in_string = False
        escaped = False

        for i in range(start_idx, len(self.buffer)):
            ch = self.buffer[i]
            if in_string:
                if escaped:
                    escaped = False  # this char is escaped; consume it
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    in_string = False
            elif ch == '"':
                in_string = True
            elif ch == "{":
                depth += 1
            elif ch == "}":
                depth -= 1
                if depth == 0:
                    # Complete structure found
                    json_str = self.buffer[start_idx : i + 1]
                    try:
                        return json.loads(json_str), i
                    except json.JSONDecodeError:
                        logger.warning(
                            f"JSON structure found but parsing failed: {json_str[:50]}..."
                        )
                        return None, i

        return None, -1

    def get_all_jsons(self) -> List[Dict[str, Any]]:
        """Get all JSON objects parsed so far.

        Returns:
            List[Dict[str, Any]]: List of all parsed JSON objects
        """
        return self.completed_jsons

    def reset(self) -> None:
        """Reset detector state, prepare to process a new stream."""
        self.buffer = ""
        self.potential_jsons = []
        self.completed_jsons = []
        self.processed_ranges = []


# Usage example
if __name__ == "__main__":
    # Simulated stream: text arrives piece by piece, JSON split across chunks
    test_chunks = [
        "This is some plain text ",
        "Here comes JSON: {",
        '"name": "test", "values": [1, 2, ',
        '3]} This is text after JSON {"another": "json", ',
        '"nested": {"key": "value"}}',
    ]

    detector = StreamJSONDetector()

    for chunk_no, piece in enumerate(test_chunks, start=1):
        logger.info(f"Processing chunk {chunk_no}: {piece}")
        found = detector.process_chunk(piece)
        if found:
            logger.info(f"Complete JSON found in this chunk: {found}")

    logger.info(f"All detected JSONs: {detector.get_all_jsons()}")