Spaces:
Running
Running
File size: 4,019 Bytes
3973360 ef0145e 3973360 ef0145e 3973360 ef0145e 6b7a285 ef0145e 6b7a285 ef0145e 3973360 6b7a285 ef0145e 6b7a285 ef0145e 6b7a285 ef0145e 6b7a285 ef0145e 3973360 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
import json
from fastapi import BackgroundTasks
from src.langgraph.multi_agent.planner.planner_flow import planner_app
from src.utils.helper import parse_itinerary
from src.utils.logger import logger
async def message_generator(input_graph, config, background: BackgroundTasks):
    """Stream the planner workflow as JSON-encoded event strings.

    Runs ``planner_app.astream`` with the ``"messages"`` and ``"values"``
    stream modes and yields one ``json.dumps`` string per event:

    * ``{"type": "message", "content": <text so far>}`` each time a
      streamed message carries non-empty content. ``content`` is the
      *accumulated* text of every chunk seen so far, not just the delta.
    * ``{"type": "final", "content": <parsed itinerary>}`` once the stream
      ends, built by passing the last state's ``"final_answer"`` to
      ``parse_itinerary``.
    * ``{"type": "error", "content": <description>}`` when any step fails;
      the generator stops after emitting it.

    Args:
        input_graph: Forwarded to ``planner_app.astream`` as ``input``.
        config: Forwarded to ``planner_app.astream`` as ``config``.
        background: Accepted for interface compatibility; not used here.

    Yields:
        str: A JSON document (non-ASCII characters kept as-is) describing
        one event. No delimiter is appended between events.
    """
    try:
        last_output_state = None
        accumulated = ""
        try:
            async for event in planner_app.astream(
                input=input_graph,
                config=config,
                stream_mode=["messages", "values"],
            ):
                try:
                    # With a list of stream modes every event is a
                    # (mode, payload) pair.
                    event_type, event_message = event
                    if event_type == "messages":
                        # Payload is a pair; only the message part is used.
                        message, _ = event_message
                        if message.content:
                            accumulated += message.content
                            # Emit the full text so far so a consumer can
                            # simply replace what it is displaying.
                            yield json.dumps(
                                {"type": "message", "content": accumulated},
                                ensure_ascii=False,
                            )
                    elif event_type == "values":
                        # Remember the most recent state snapshot; the last
                        # one is used for the final answer below.
                        last_output_state = event_message
                except Exception as e:
                    logger.error(f"Error processing stream event: {str(e)}")
                    yield _error_event("Error processing planner response ", e)
                    return

            if last_output_state is None:
                raise ValueError("No output state received from planner workflow")
            if "final_answer" not in last_output_state:
                raise ValueError("No final answer in planner output")

            try:
                parsed_itinerary = parse_itinerary(last_output_state["final_answer"])
                yield json.dumps(
                    {"type": "final", "content": parsed_itinerary},
                    ensure_ascii=False,
                )
            except Exception as e:
                logger.error(f"Error parsing itinerary: {str(e)}")
                yield _error_event("Error parsing the generated itinerary: ", e)
                return
        except Exception as e:
            # Reached when the stream itself fails or when one of the
            # ValueError checks above fires.
            logger.error(f"Error in planner workflow stream: {str(e)}")
            yield _error_event("Error processing planner stream: ", e)
            return
    except Exception as e:
        # Last-resort guard, e.g. if reporting the error above itself failed.
        logger.error(f"Unexpected error in planner: {str(e)}")
        yield _error_event("An unexpected error occurred in the planner: ", e)
        return


def _error_event(prefix, exc):
    """Return a JSON ``error`` event whose content is ``prefix + str(exc)``."""
    return json.dumps(
        {"type": "error", "content": prefix + str(exc)},
        ensure_ascii=False,
    )
from pydantic import BaseModel, Field
from datetime import datetime
class Activity(BaseModel):
    """Activity model"""

    # One scheduled item: a text description plus a start and end datetime.
    # NOTE: pydantic exposes the class docstring and each Field description
    # in the generated JSON schema, so treat those strings as runtime text.

    # Required (the leading `...` means "no default"). Free-text summary;
    # per its description it may also mention a location.
    description: str = Field(
        ..., description="Short description of the activity can have location"
    )
    # Required start and end datetimes. No validator in this class checks
    # that end_time is not earlier than start_time.
    start_time: datetime = Field(..., description="Start time of the activity")
    end_time: datetime = Field(..., description="End time of the activity")
class Output(BaseModel):
    """Output model"""

    # Container pairing a list of Activity items with a free-text note.
    # NOTE: pydantic exposes the class docstring and each Field description
    # in the generated JSON schema, so treat those strings as runtime text.

    # Required list of Activity entries; an empty list is accepted because
    # no length constraint is declared.
    activities: list[Activity] = Field(..., description="List of activities")
    # Required free-text note addressed to the user.
    note: str = Field(..., description="Note for the user")
|