"""LangGraph StateGraph assembly + streaming entry point.""" from __future__ import annotations import logging from typing import Any, AsyncIterator from langgraph.graph import END, StateGraph from app.agent.nodes.executor import executor_node from app.agent.nodes.reflector import reflector_node from app.agent.nodes.skill_router import skill_router_node from app.agent.nodes.synthesizer import synthesize from app.agent.state import AgentState, EV_DONE, EV_ERROR from app.config import get_settings from app.skills.registry import REGISTRY logger = logging.getLogger(__name__) def _build_graph() -> Any: g = StateGraph(AgentState) g.add_node("skill_router", skill_router_node) g.add_node("executor", executor_node) g.add_node("reflector", reflector_node) g.add_node("synthesizer", lambda s: s) # placeholder; streaming handled outside g.set_entry_point("skill_router") # After router: if final_answer was set, go to synthesizer; else go to executor def _after_router(state: AgentState) -> str: if state.get("error"): return "synthesizer" if state.get("final_answer"): return "synthesizer" return "executor" g.add_conditional_edges("skill_router", _after_router, { "executor": "executor", "synthesizer": "synthesizer", }) # After executor: always go to reflector g.add_edge("executor", "reflector") # After reflector: if more rounds needed AND round limit not reached, loop back to router def _after_reflector(state: AgentState) -> str: verdict = state.get("reflection_verdict", "sufficient") rounds = state.get("rounds_used", 0) max_rounds = get_settings().agent_max_reflect_rounds if verdict == "need_more" and rounds < max_rounds: return "skill_router" return "synthesizer" g.add_conditional_edges("reflector", _after_reflector, { "skill_router": "skill_router", "synthesizer": "synthesizer", }) g.add_edge("synthesizer", END) return g.compile() _GRAPH = None def get_graph() -> Any: global _GRAPH if _GRAPH is None: _GRAPH = _build_graph() return _GRAPH # ---------- Public streaming entry point ---------- async def run_agent_stream( user_query: str, history: list[dict[str, Any]], session_id: str, ) -> AsyncIterator[dict[str, Any]]: """Stream agent events for a single user turn.""" from app.agent.state import ( EV_MESSAGE_FINAL, EV_REFLECTION, EV_THINK, EV_TOOL_CALL, EV_TOOL_RESULT, ) from app.utils.trace import generate_trace_id if not REGISTRY.list_specs(): yield {"event": EV_ERROR, "data": {"message": "No skills registered"}} yield {"event": EV_DONE, "data": {}} return trace_id = generate_trace_id() init_state: AgentState = { "user_query": user_query, "session_id": session_id, "history": history, "tool_calls": [], "rounds_used": 0, "reflection_verdict": "need_more", "trace_id": trace_id, } yield {"event": EV_THINK, "data": {"step": "entry", "text": f"开始处理:{user_query}", "trace_id": trace_id}} graph = get_graph() final_state: AgentState = dict(init_state) try: async for event in graph.astream(init_state): # event is dict {node_name: node_output} for node_name, node_out in event.items(): if not isinstance(node_out, dict): continue final_state.update(node_out) # Stream per-node events if node_name == "skill_router": tc = (node_out.get("tool_calls") or []) if tc and tc[-1].get("result") is None: last = tc[-1] yield { "event": EV_TOOL_CALL, "data": { "name": last["name"], "args": last.get("args", {}), "trace_id": last.get("trace_id", ""), }, } if node_out.get("final_answer"): yield { "event": EV_THINK, "data": {"step": "router_final", "text": "直接生成最终答案"}, } elif node_name == "executor": tc = (node_out.get("tool_calls") or []) if tc: last = tc[-1] yield { "event": EV_TOOL_RESULT, "data": { "name": last["name"], "ok": last.get("ok", False), "duration_ms": last.get("duration_ms", 0), "trace_id": last.get("trace_id", ""), "result": last.get("result"), "error": last.get("error"), }, } elif node_name == "reflector": yield { "event": EV_REFLECTION, "data": { "verdict": node_out.get("reflection_verdict", "sufficient"), "reason": node_out.get("reflection", ""), }, } except Exception as exc: # noqa: BLE001 logger.exception("agent graph execution failed") yield {"event": EV_ERROR, "data": {"message": f"Agent 执行失败: {exc}", "trace_id": trace_id}} # If the router produced a final answer directly (no synthesizer streaming) if final_state.get("final_answer") and not any( True for _ in [] ): # placeholder check yield { "event": EV_MESSAGE_FINAL, "data": { "content": final_state["final_answer"], "tool_calls": final_state.get("tool_calls", []), }, } else: # Stream synthesizer output async for ev in synthesize(final_state): yield ev yield {"event": EV_DONE, "data": {"trace_id": trace_id}}