File size: 6,022 Bytes
6172a47 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 | """Think tag parser for extracting reasoning content from responses."""
from collections.abc import Iterator
from dataclasses import dataclass
from enum import Enum
class ContentType(Enum):
"""Type of content chunk."""
TEXT = "text"
THINKING = "thinking"
@dataclass
class ContentChunk:
"""A chunk of parsed content."""
type: ContentType
content: str
class ThinkTagParser:
"""
Streaming parser for <think>...</think> tags.
Handles partial tags at chunk boundaries by buffering.
"""
OPEN_TAG = "<think>"
CLOSE_TAG = "</think>"
OPEN_TAG_LEN = 7
CLOSE_TAG_LEN = 8
def __init__(self):
self._buffer: str = ""
self._in_think_tag: bool = False
@property
def in_think_mode(self) -> bool:
"""Whether currently inside a think tag."""
return self._in_think_tag
def feed(self, content: str) -> Iterator[ContentChunk]:
"""
Feed content and yield parsed chunks.
Handles partial tags by buffering content near potential tag boundaries.
Uses an iterative loop instead of mutual recursion to avoid stack overflow
on inputs with many consecutive think tags.
"""
self._buffer += content
while self._buffer:
prev_len = len(self._buffer)
if not self._in_think_tag:
chunk = self._parse_outside_think()
else:
chunk = self._parse_inside_think()
if chunk:
yield chunk
elif len(self._buffer) == prev_len:
# No progress: waiting for more data
break
def _parse_outside_think(self) -> ContentChunk | None:
"""Parse content outside think tags."""
think_start = self._buffer.find(self.OPEN_TAG)
orphan_close = self._buffer.find(self.CLOSE_TAG)
# Handle orphan </think> - strip it (Step Fun AI sends reasoning via
# reasoning_content but may leak closing tags in content)
if orphan_close != -1 and (think_start == -1 or orphan_close < think_start):
pre_orphan = self._buffer[:orphan_close]
self._buffer = self._buffer[orphan_close + self.CLOSE_TAG_LEN :]
if pre_orphan:
return ContentChunk(ContentType.TEXT, pre_orphan)
# Buffer shrunk; the feed() loop will continue parsing
return None
if think_start == -1:
# No tag found - check for partial tag at end
# We buffer any trailing '<' and subsequent characters that could be part of <think> or </think>
last_bracket = self._buffer.rfind("<")
if last_bracket != -1:
potential_tag = self._buffer[last_bracket:]
tag_len = len(potential_tag)
# Check if could be partial <think> or </think>
if (
tag_len < self.OPEN_TAG_LEN
and self.OPEN_TAG.startswith(potential_tag)
) or (
tag_len < self.CLOSE_TAG_LEN
and self.CLOSE_TAG.startswith(potential_tag)
):
emit = self._buffer[:last_bracket]
self._buffer = self._buffer[last_bracket:]
if emit:
return ContentChunk(ContentType.TEXT, emit)
return None
# No partial tag found or it's irrelevant
emit = self._buffer
self._buffer = ""
if emit:
return ContentChunk(ContentType.TEXT, emit)
return None
else:
# Found <think> tag
pre_think = self._buffer[:think_start]
self._buffer = self._buffer[think_start + self.OPEN_TAG_LEN :]
self._in_think_tag = True
if pre_think:
return ContentChunk(ContentType.TEXT, pre_think)
# Buffer shrunk (consumed <think>); the feed() loop will continue
# parsing inside the think tag on the next iteration
return None
def _parse_inside_think(self) -> ContentChunk | None:
"""Parse content inside think tags."""
think_end = self._buffer.find(self.CLOSE_TAG)
if think_end == -1:
# No closing tag - check for partial at end
last_bracket = self._buffer.rfind("<")
if (
last_bracket != -1
and len(self._buffer) - last_bracket < self.CLOSE_TAG_LEN
):
# Check if the partial string could be the start of </think>
potential_tag = self._buffer[last_bracket:]
if self.CLOSE_TAG.startswith(potential_tag):
emit = self._buffer[:last_bracket]
self._buffer = self._buffer[last_bracket:]
if emit:
return ContentChunk(ContentType.THINKING, emit)
return None
emit = self._buffer
self._buffer = ""
if emit:
return ContentChunk(ContentType.THINKING, emit)
return None
else:
# Found </think> tag
thinking_content = self._buffer[:think_end]
self._buffer = self._buffer[think_end + self.CLOSE_TAG_LEN :]
self._in_think_tag = False
if thinking_content:
return ContentChunk(ContentType.THINKING, thinking_content)
# Buffer shrunk (consumed </think>); the feed() loop will continue
# parsing outside the think tag on the next iteration
return None
def flush(self) -> ContentChunk | None:
"""Flush any remaining buffered content."""
if self._buffer:
chunk_type = (
ContentType.THINKING if self._in_think_tag else ContentType.TEXT
)
content = self._buffer
self._buffer = ""
return ContentChunk(chunk_type, content)
return None
|