| import json | |
| from typing_extensions import TypedDict | |
| from models import extractor, router | |
| from langgraph.graph import StateGraph, START,END | |
class PlanExecute(TypedDict):
    """State schema shared by every node of the routing/extraction graph."""

    # Mapping indexed by the keys held in ``list_keys``; the selected entry
    # is JSON-serialised and handed to the extractor.
    ui_data: dict
    # The query forwarded to both the router and the extractor.
    query: str
    # Earlier messages, forwarded to the models as "previous_messages".
    previous_msgs: list
    # Keys still waiting to be processed; filled from the router response
    # and consumed from the front, one per extraction step.
    list_keys: list
    # Non-empty extractor outputs collected so far.
    information: list
async def route_step(state: PlanExecute):
    """Invoke the router and store its route as the keys to process.

    Args:
        state: Current graph state; ``query`` and ``previous_msgs`` are read.

    Returns:
        A partial state update setting ``list_keys`` to the ``route``
        attribute of the router response.
    """
    router_input = {
        "query": state["query"],
        "previous_messages": state["previous_msgs"],
    }
    routing = await router.ainvoke(router_input)
    return {"list_keys": routing.route}
async def extract_step(state: PlanExecute):
    """Run the extractor on the first pending key and report the new state.

    The first entry of ``list_keys`` selects an item of ``ui_data``, which is
    JSON-serialised and passed to the extractor together with the query and
    the previous messages. A non-empty ``information`` result is collected.

    Unlike an in-place ``pop``/``append`` on the incoming state, the progress
    is returned as an explicit partial state update built from copies, so it
    does not depend on the state values being aliased to the graph's stored
    objects, and the list originally produced by the router is not modified.

    Args:
        state: Current graph state. ``list_keys`` must be non-empty
            (``should_end`` routes to END otherwise).

    Returns:
        A partial state update with the remaining ``list_keys`` and the
        accumulated ``information`` list.

    Raises:
        IndexError: If ``list_keys`` is empty.
        KeyError: If the selected key is not present in ``ui_data``.
    """
    pending = state["list_keys"]
    # Slicing yields a new list, leaving the incoming list untouched.
    key, remaining = pending[0], pending[1:]

    response = await extractor.ainvoke(
        {
            "query": state["query"],
            "data": json.dumps(state["ui_data"][key]),
            "previous_messages": state["previous_msgs"],
        }
    )

    # Tolerate a state in which "information" was never seeded by the caller.
    collected = list(state.get("information") or [])
    if response.information != "":
        collected.append(response.information)
    # Same debug output as before: the information gathered so far.
    print(collected)

    return {"list_keys": remaining, "information": collected}
def should_end(state: PlanExecute):
    """Choose the next hop: keep extracting while keys remain, else finish.

    Args:
        state: Current graph state; only ``list_keys`` is read.

    Returns:
        ``"extract_step"`` when ``list_keys`` still has entries, ``END`` when
        it is empty.
    """
    pending_keys = state["list_keys"]
    if len(pending_keys) != 0:
        return "extract_step"
    return END
# Assemble the graph over the PlanExecute state schema.
workflow = StateGraph(PlanExecute)

# Nodes: one routing step, one extraction step.
workflow.add_node("route_step", route_step)
workflow.add_node("extract_step", extract_step)

# Execution always begins with routing.
workflow.add_edge(START, "route_step")

# After either node, should_end decides between finishing and running
# (another) extraction step.
workflow.add_conditional_edges(
    "route_step",
    should_end,
    [END, "extract_step"],
)
workflow.add_conditional_edges(
    "extract_step",
    should_end,
    [END, "extract_step"],
)

# Compiled graph exposed to the rest of the application.
app_graph = workflow.compile()