| """ |
| ReAct 流式解析器:字符级 MarkerDetector + StateMachine |
| |
| 将 LLM 的 ReAct 格式文本实时转换为 OpenAI SSE 流式事件: |
| |
| Thought: xxx → delta.content = "<think>xxx</think>" (流式) |
| Action: name → 缓存工具名 |
| Action Input: {} → delta.tool_calls[0].function.arguments (流式) |
| Final Answer: xxx → delta.content = "xxx" (流式) |
| Observation: xxx → delta.content = "xxx" (流式) |
| 无标记文本 → delta.content = "xxx" (直通) |
| |
| 核心设计: |
| MarkerDetector:默认零延迟直通,仅在遇到 Marker 首字母时暂存等待确认。 |
| StateMachine:IDLE / IN_THOUGHT / IN_ACTION / IN_ACTION_INPUT / |
| IN_OBSERVATION / IN_FINAL |
| """ |
|
|
| import json |
| import uuid |
| from enum import Enum, auto |
|
|
| |
|
|
| |
| _MARKERS: tuple[str, ...] = ( |
| "Thought:", |
| "Action Input:", |
| "Action:", |
| "Observation:", |
| "Final Answer:", |
| "最终答案:", |
| ) |
|
|
| _MARKER_FIRST_CHARS: frozenset[str] = frozenset(m[0] for m in _MARKERS) |
|
|
|
|
| |
|
|
|
|
| class _State(Enum): |
| IDLE = auto() |
| IN_THOUGHT = auto() |
| IN_ACTION = auto() |
| IN_ACTION_INPUT = auto() |
| IN_OBSERVATION = auto() |
| IN_FINAL = auto() |
|
|
|
|
| |
|
|
|
|
class ReactStreamParser:
    """
    Character-level ReAct stream parser that converts an LLM's ReAct-formatted
    output into OpenAI-compatible SSE chunks.

    Usage::

        parser = ReactStreamParser(chat_id, model, created, has_tools=True)
        async for chunk in llm_stream:
            # Note: do not apply strip_session_id_suffix to the chunk;
            # otherwise the client never receives the session ID and cannot
            # reuse the session on the next turn.
            for sse in parser.feed(chunk):
                yield sse
        for sse in parser.finish():
            yield sse

    Each ``Action:`` / ``Action Input:`` pair becomes its own entry in
    ``delta.tool_calls`` (index 0, 1, ...), so a response with several
    actions yields several tool calls instead of one call whose arguments
    are concatenated.
    """

    def __init__(
        self,
        chat_id: str,
        model: str,
        created: int,
        *,
        has_tools: bool = True,
    ) -> None:
        """
        Args:
            chat_id: value of the ``id`` field of every emitted chunk.
            model: value of the ``model`` field of every emitted chunk.
            created: value of the ``created`` field of every emitted chunk.
            has_tools: when False, marker detection is disabled and every
                character is dispatched as plain content.
        """
        self._chat_id = chat_id
        self._model = model
        self._created = created
        self._has_tools = has_tools

        # Marker detector state.
        self._suspect_buf = ""         # chars that may still turn into a marker
        self._skip_leading_ws = False  # drop spaces/tabs right after a marker

        # State machine.
        self._state = _State.IDLE
        self._action_name_buf = ""     # tool name collected after "Action:"
        self._tool_call_id = ""
        self._tool_call_index = 0      # tool_calls index of the current call
        self._call_open = False        # current action already sent its start chunk

        # Output bookkeeping.
        self._emitted_msg_start = False
        self._think_open = False
        self._think_closed = False
        self._tool_call_started = False  # at least one tool call was emitted

    # ------------------------------------------------------------ public API

    def feed(self, chunk: str) -> list[str]:
        """Process one text chunk; return the SSE strings (``data: ...\\n\\n``) to send."""
        events: list[str] = []
        for char in chunk:
            events.extend(self._on_char(char))
        return events

    def finish(self) -> list[str]:
        """Call when the LLM stream ends: flush the pending buffer and emit the closing SSEs."""
        events: list[str] = []
        if self._suspect_buf:
            # No more input can complete a marker, so the buffer is plain text.
            buf, self._suspect_buf = self._suspect_buf, ""
            events.extend(self._dispatch(buf))
        events.extend(self._emit_end())
        return events

    # ------------------------------------------------------ marker detector

    def _on_char(self, char: str) -> list[str]:
        """Run a single character through the marker detector."""
        # Swallow spaces/tabs that directly follow a marker ("Action: x").
        if self._skip_leading_ws:
            if char in (" ", "\t"):
                return []
            self._skip_leading_ws = False

        # Without tools there is no ReAct structure to detect.
        if not self._has_tools:
            return self._dispatch(char)

        # Zero-latency path: only a marker's first character starts buffering.
        if not self._suspect_buf:
            if char in _MARKER_FIRST_CHARS:
                self._suspect_buf = char
                return []
            return self._dispatch(char)

        self._suspect_buf += char

        matched = self._exact_match()
        if matched:
            self._suspect_buf = ""
            return self._on_marker(matched)

        # Still a valid marker prefix: keep waiting.
        if self._is_prefix():
            return []

        return self._flush_suspect()

    def _flush_suspect(self) -> list[str]:
        """Release a buffered candidate that turned out not to be a marker.

        The failed candidate may still contain the start of a real marker after
        its first character (e.g. "最最终答案:" or "AThought:"). Only the text
        before the next marker-start character is dispatched; the remainder is
        re-scanned through the detector so that marker is not lost.
        """
        buf, self._suspect_buf = self._suspect_buf, ""
        cut = next(
            (i for i in range(1, len(buf)) if buf[i] in _MARKER_FIRST_CHARS),
            len(buf),
        )
        events = self._dispatch(buf[:cut])
        # The re-scanned tail is strictly shorter than buf, so this terminates.
        for ch in buf[cut:]:
            events.extend(self._on_char(ch))
        return events

    def _exact_match(self) -> str | None:
        """Return the marker equal to the buffered text, if any."""
        for m in _MARKERS:
            if self._suspect_buf == m:
                return m
        return None

    def _is_prefix(self) -> bool:
        """True while the buffered text can still grow into some marker."""
        return any(m.startswith(self._suspect_buf) for m in _MARKERS)

    # --------------------------------------------------------- state machine

    def _on_marker(self, marker: str) -> list[str]:
        """Switch state for a confirmed marker and emit the transition events."""
        events = self._exit_state()

        if marker == "Thought:":
            self._state = _State.IN_THOUGHT
            events.extend(self._enter_thought())

        elif marker == "Action:":
            # Every new action gets its own tool call.
            self._state = _State.IN_ACTION
            self._action_name_buf = ""
            self._call_open = False

        elif marker == "Action Input:":
            # The action name may not have been terminated by a newline yet.
            if not self._call_open:
                events.extend(self._start_function_call())
            self._state = _State.IN_ACTION_INPUT

        elif marker == "Observation:":
            self._state = _State.IN_OBSERVATION

        elif marker in ("Final Answer:", "最终答案:"):
            self._state = _State.IN_FINAL
            events.extend(self._enter_final())

        self._skip_leading_ws = True
        return events

    def _exit_state(self) -> list[str]:
        """Cleanup when leaving the current state (closes an open think block)."""
        events: list[str] = []
        if self._state == _State.IN_THOUGHT:
            if self._think_open and not self._think_closed:
                self._think_closed = True
                events.extend(self._make_content("</think>"))
        return events

    def _enter_thought(self) -> list[str]:
        """Open a ``<think>`` block, emitting the message start first if needed."""
        events: list[str] = []
        if not self._emitted_msg_start:
            events.extend(self._emit_msg_start())
        self._think_open = True
        self._think_closed = False
        events.extend(self._make_content("<think>"))
        return events

    def _enter_final(self) -> list[str]:
        """Make sure the message start was emitted before the final answer."""
        events: list[str] = []
        if not self._emitted_msg_start:
            events.extend(self._emit_msg_start())
        return events

    def _start_function_call(self) -> list[str]:
        """Open a new tool call for the collected action name."""
        if self._tool_call_started:
            # Additional actions take the next tool_calls index.
            self._tool_call_index += 1
        name = self._action_name_buf.strip()
        self._tool_call_id = f"call_{uuid.uuid4().hex[:8]}"
        self._tool_call_started = True
        self._call_open = True
        events: list[str] = []
        if not self._emitted_msg_start:
            events.extend(self._emit_msg_start())
        events.extend(self._make_tool_call_start(name))
        return events

    # -------------------------------------------------------------- routing

    def _dispatch(self, text: str) -> list[str]:
        """Route text to the output action of the current state."""
        s = self._state
        events: list[str] = []

        if s == _State.IDLE:
            if not self._emitted_msg_start:
                events.extend(self._emit_msg_start())
            events.extend(self._make_content(text))

        elif s == _State.IN_THOUGHT:
            if not self._think_open:
                events.extend(self._enter_thought())
            events.extend(self._make_content(text))

        elif s == _State.IN_ACTION:
            # Collect the tool name; a newline ends it and opens the call.
            for ch in text:
                if ch == "\n":
                    if self._action_name_buf.strip() and not self._call_open:
                        events.extend(self._start_function_call())
                else:
                    self._action_name_buf += ch

        elif s == _State.IN_ACTION_INPUT:
            if self._call_open:
                events.extend(self._make_tool_args(text))

        elif s == _State.IN_OBSERVATION:
            if not self._emitted_msg_start:
                events.extend(self._emit_msg_start())
            events.extend(self._make_content(text))

        elif s == _State.IN_FINAL:
            events.extend(self._make_content(text))

        return events

    # ---------------------------------------------------------- termination

    def _emit_end(self) -> list[str]:
        """Close an open think block, send the finish chunk and ``[DONE]``."""
        events: list[str] = []

        if self._think_open and not self._think_closed:
            self._think_closed = True
            events.extend(self._make_content("</think>"))

        if self._tool_call_started:
            events.extend(self._make_tool_calls_finish())
        else:
            # An empty response still needs a role frame before "stop".
            if not self._emitted_msg_start:
                events.extend(self._emit_msg_start())
            events.extend(self._make_stop())

        events.append("data: [DONE]\n\n")
        return events

    # ----------------------------------------------------------- SSE builders

    def _chunk(self, delta: dict, finish_reason: str | None = None) -> str:
        """Serialize one single-choice ``chat.completion.chunk`` as an SSE line."""
        return self._sse(
            {
                "id": self._chat_id,
                "object": "chat.completion.chunk",
                "created": self._created,
                "model": self._model,
                "choices": [
                    {
                        "index": 0,
                        "delta": delta,
                        "logprobs": None,
                        "finish_reason": finish_reason,
                    }
                ],
            }
        )

    def _emit_msg_start(self) -> list[str]:
        """Send the first frame: role "assistant" with empty content."""
        self._emitted_msg_start = True
        return [self._chunk({"role": "assistant", "content": ""})]

    def _make_content(self, text: str) -> list[str]:
        """Send a plain content delta."""
        return [self._chunk({"content": text})]

    def _make_tool_call_start(self, name: str) -> list[str]:
        """Send the tool-call start frame: id, type, name and empty arguments."""
        return [
            self._chunk(
                {
                    "tool_calls": [
                        {
                            "index": self._tool_call_index,
                            "id": self._tool_call_id,
                            "type": "function",
                            "function": {"name": name, "arguments": ""},
                        }
                    ]
                }
            )
        ]

    def _make_tool_args(self, delta: str) -> list[str]:
        """Send an incremental piece of the current tool call's arguments."""
        return [
            self._chunk(
                {
                    "tool_calls": [
                        {
                            "index": self._tool_call_index,
                            "function": {"arguments": delta},
                        }
                    ]
                }
            )
        ]

    def _make_tool_calls_finish(self) -> list[str]:
        """Send the final frame with finish_reason "tool_calls"."""
        return [self._chunk({}, "tool_calls")]

    def _make_stop(self) -> list[str]:
        """Send the final frame with finish_reason "stop"."""
        return [self._chunk({}, "stop")]

    @staticmethod
    def _sse(obj: dict) -> str:
        """Format one JSON payload as an SSE ``data:`` event."""
        return f"data: {json.dumps(obj, ensure_ascii=False)}\n\n"
|
|