File size: 6,373 Bytes
f992c25
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
"""LangGraph StateGraph assembly + streaming entry point."""
from __future__ import annotations

import logging
from typing import Any, AsyncIterator

from langgraph.graph import END, StateGraph

from app.agent.nodes.executor import executor_node
from app.agent.nodes.reflector import reflector_node
from app.agent.nodes.skill_router import skill_router_node
from app.agent.nodes.synthesizer import synthesize
from app.agent.state import AgentState, EV_DONE, EV_ERROR
from app.config import get_settings
from app.skills.registry import REGISTRY

logger = logging.getLogger(__name__)


def _build_graph() -> Any:
    g = StateGraph(AgentState)
    g.add_node("skill_router", skill_router_node)
    g.add_node("executor", executor_node)
    g.add_node("reflector", reflector_node)
    g.add_node("synthesizer", lambda s: s)  # placeholder; streaming handled outside

    g.set_entry_point("skill_router")

    # After router: if final_answer was set, go to synthesizer; else go to executor
    def _after_router(state: AgentState) -> str:
        if state.get("error"):
            return "synthesizer"
        if state.get("final_answer"):
            return "synthesizer"
        return "executor"

    g.add_conditional_edges("skill_router", _after_router, {
        "executor": "executor",
        "synthesizer": "synthesizer",
    })

    # After executor: always go to reflector
    g.add_edge("executor", "reflector")

    # After reflector: if more rounds needed AND round limit not reached, loop back to router
    def _after_reflector(state: AgentState) -> str:
        verdict = state.get("reflection_verdict", "sufficient")
        rounds = state.get("rounds_used", 0)
        max_rounds = get_settings().agent_max_reflect_rounds
        if verdict == "need_more" and rounds < max_rounds:
            return "skill_router"
        return "synthesizer"

    g.add_conditional_edges("reflector", _after_reflector, {
        "skill_router": "skill_router",
        "synthesizer": "synthesizer",
    })

    g.add_edge("synthesizer", END)

    return g.compile()


_GRAPH = None


def get_graph() -> Any:
    global _GRAPH
    if _GRAPH is None:
        _GRAPH = _build_graph()
    return _GRAPH


# ---------- Public streaming entry point ----------


async def run_agent_stream(
    user_query: str,
    history: list[dict[str, Any]],
    session_id: str,
) -> AsyncIterator[dict[str, Any]]:
    """Stream agent events for a single user turn."""
    from app.agent.state import (
        EV_MESSAGE_FINAL,
        EV_REFLECTION,
        EV_THINK,
        EV_TOOL_CALL,
        EV_TOOL_RESULT,
    )
    from app.utils.trace import generate_trace_id

    if not REGISTRY.list_specs():
        yield {"event": EV_ERROR, "data": {"message": "No skills registered"}}
        yield {"event": EV_DONE, "data": {}}
        return

    trace_id = generate_trace_id()
    init_state: AgentState = {
        "user_query": user_query,
        "session_id": session_id,
        "history": history,
        "tool_calls": [],
        "rounds_used": 0,
        "reflection_verdict": "need_more",
        "trace_id": trace_id,
    }
    yield {"event": EV_THINK, "data": {"step": "entry", "text": f"开始处理:{user_query}", "trace_id": trace_id}}

    graph = get_graph()
    final_state: AgentState = dict(init_state)

    try:
        async for event in graph.astream(init_state):
            # event is dict {node_name: node_output}
            for node_name, node_out in event.items():
                if not isinstance(node_out, dict):
                    continue
                final_state.update(node_out)
                # Stream per-node events
                if node_name == "skill_router":
                    tc = (node_out.get("tool_calls") or [])
                    if tc and tc[-1].get("result") is None:
                        last = tc[-1]
                        yield {
                            "event": EV_TOOL_CALL,
                            "data": {
                                "name": last["name"],
                                "args": last.get("args", {}),
                                "trace_id": last.get("trace_id", ""),
                            },
                        }
                    if node_out.get("final_answer"):
                        yield {
                            "event": EV_THINK,
                            "data": {"step": "router_final", "text": "直接生成最终答案"},
                        }
                elif node_name == "executor":
                    tc = (node_out.get("tool_calls") or [])
                    if tc:
                        last = tc[-1]
                        yield {
                            "event": EV_TOOL_RESULT,
                            "data": {
                                "name": last["name"],
                                "ok": last.get("ok", False),
                                "duration_ms": last.get("duration_ms", 0),
                                "trace_id": last.get("trace_id", ""),
                                "result": last.get("result"),
                                "error": last.get("error"),
                            },
                        }
                elif node_name == "reflector":
                    yield {
                        "event": EV_REFLECTION,
                        "data": {
                            "verdict": node_out.get("reflection_verdict", "sufficient"),
                            "reason": node_out.get("reflection", ""),
                        },
                    }
    except Exception as exc:  # noqa: BLE001
        logger.exception("agent graph execution failed")
        yield {"event": EV_ERROR, "data": {"message": f"Agent 执行失败: {exc}", "trace_id": trace_id}}

    # If the router produced a final answer directly (no synthesizer streaming)
    if final_state.get("final_answer") and not any(
        True for _ in []
    ):  # placeholder check
        yield {
            "event": EV_MESSAGE_FINAL,
            "data": {
                "content": final_state["final_answer"],
                "tool_calls": final_state.get("tool_calls", []),
            },
        }
    else:
        # Stream synthesizer output
        async for ev in synthesize(final_state):
            yield ev

    yield {"event": EV_DONE, "data": {"trace_id": trace_id}}