mulitagent / app.py
dpaul93's picture
Upload app.py
45d9925 verified
"""
Multi-Agent Travel Planning System
A LangGraph-based travel assistant with specialized agents for flights, hotels, and itineraries.
"""
import os
import json
from typing import TypedDict, Annotated, List, Optional, Union
import operator
from dotenv import load_dotenv
import gradio as gr
import uuid
# Load environment variables
load_dotenv()
# Core imports
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage, ToolMessage
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
# LangGraph imports
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import InMemorySaver
# Tool imports
from langchain_tavily import TavilySearch
from langchain_core.tools import tool
import serpapi
class TravelPlannerState(TypedDict):
"""State schema for travel multiagent system"""
messages: Annotated[List[BaseMessage], operator.add]
next_agent: Optional[str]
user_query: Optional[str]
class TravelPlannerApp:
"""Main travel planner application class"""
def __init__(self):
# Check for required environment variables
required_vars = ['GOOGLE_API_KEY', 'TAVILY_API_KEY', 'SERPAPI_API_KEY']
missing_vars = [var for var in required_vars if not os.environ.get(var)]
if missing_vars:
raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")
self.llm = self._setup_llm()
self.tools = self._setup_tools()
self.agents = self._setup_agents()
self.router = self._create_router()
self.workflow = self._build_workflow()
def _setup_llm(self):
"""Initialize the LLM"""
return ChatGoogleGenerativeAI(
model="gemini-2.0-flash-exp",
temperature=0.2,
google_api_key=os.environ.get("GOOGLE_API_KEY")
)
def _setup_tools(self):
"""Setup external tools"""
# Tavily search tool
tavily_tool = TavilySearch(max_results=2)
# Define SERP API tools using @tool decorator
@tool
def search_flights(departure_airport: str, arrival_airport: str,
outbound_date: str, return_date: str = None,
adults: int = 1, children: int = 0) -> str:
"""Search for flights using Google Flights engine via SERP API"""
return self._search_flights(departure_airport, arrival_airport,
outbound_date, return_date, adults, children)
@tool
def search_hotels(location: str, check_in_date: str, check_out_date: str,
adults: int = 1, children: int = 0, rooms: int = 1,
hotel_class: str = None, sort_by: int = 8) -> str:
"""Search for hotels using Google Hotels engine via SERP API"""
return self._search_hotels(location, check_in_date, check_out_date,
adults, children, rooms, hotel_class, sort_by)
return {
"tavily": tavily_tool,
"search_flights": search_flights,
"search_hotels": search_hotels
}
def _search_flights(self, departure_airport: str, arrival_airport: str,
outbound_date: str, return_date: str = None,
adults: int = 1, children: int = 0) -> str:
"""Search for flights using Google Flights engine via SERP API"""
try:
params = {
'api_key': os.environ.get('SERPAPI_API_KEY'),
'engine': 'google_flights',
'hl': 'en',
'gl': 'us',
'departure_id': departure_airport,
'arrival_id': arrival_airport,
'outbound_date': outbound_date,
'currency': 'USD',
'adults': adults,
'children': children,
}
# Set trip type based on return_date
if return_date:
params['return_date'] = return_date
params['type'] = '1' # Round trip
else:
params['type'] = '2' # One way
print(f"πŸ” Searching flights with params: {params}")
# Add timeout to prevent hanging
import time
start_time = time.time()
search = serpapi.search(params)
elapsed = time.time() - start_time
print(f"⏱️ Search completed in {elapsed:.2f} seconds")
if not search.data:
return "No search results returned from SERP API"
# Try different result keys depending on trip type
possible_keys = ['best_flights', 'other_flights', 'flights']
results = None
for key in possible_keys:
if key in search.data and search.data[key]:
results = search.data[key]
break
if not results:
available_keys = list(search.data.keys())
return f"No flights found. Available data keys: {available_keys}"
return json.dumps(results, indent=2)
except Exception as e:
error_msg = f"Flight search failed: {str(e)}"
print(f"❌ {error_msg}")
return error_msg
def _search_hotels(self, location: str, check_in_date: str, check_out_date: str,
adults: int = 1, children: int = 0, rooms: int = 1,
hotel_class: str = None, sort_by: int = 8) -> str:
"""Search for hotels using Google Hotels engine via SERP API"""
try:
adults = int(float(adults)) if adults else 1
children = int(float(children)) if children else 0
rooms = int(float(rooms)) if rooms else 1
sort_by = int(float(sort_by)) if sort_by else 8
params = {
'api_key': os.environ.get('SERPAPI_API_KEY'),
'engine': 'google_hotels',
'hl': 'en',
'gl': 'us',
'q': location,
'check_in_date': check_in_date,
'check_out_date': check_out_date,
'currency': 'USD',
'adults': adults,
'children': children,
'rooms': rooms,
'sort_by': sort_by
}
if hotel_class:
params['hotel_class'] = hotel_class
print(f"πŸ” Searching hotels with params: {params}")
# Add timeout to prevent hanging
import time
start_time = time.time()
search = serpapi.search(params)
elapsed = time.time() - start_time
print(f"⏱️ Search completed in {elapsed:.2f} seconds")
if not search.data:
return "No search results returned from SERP API"
properties = search.data.get('properties', [])
if not properties:
available_keys = list(search.data.keys())
return f"No hotels found in results. Available data keys: {available_keys}"
# Return formatted results
results = []
for hotel in properties[:5]: # Top 5 results
hotel_info = {
'name': hotel.get('name', 'Unknown'),
'price': hotel.get('rate_per_night', 'Price not available'),
'rating': hotel.get('overall_rating', 'No rating'),
'description': hotel.get('description', 'No description'),
'amenities': hotel.get('amenities', [])
}
results.append(hotel_info)
return json.dumps(results, indent=2)
except Exception as e:
error_msg = f"Hotel search failed: {str(e)}"
print(f"❌ {error_msg}")
return error_msg
def _setup_agents(self):
"""Setup all specialized agents"""
# Itinerary Agent
itinerary_prompt = ChatPromptTemplate.from_messages([
("system", """You are an expert travel itinerary planner. ONLY respond to travel planning and itinerary-related questions.
IMPORTANT RULES:
- If asked about non-travel topics (weather, math, general questions), politely decline and redirect to travel planning
- Always provide complete, well-formatted itineraries with specific details
- Include timing, locations, transportation, and practical tips
Use the ReAct approach:
1. THOUGHT: Analyze what travel information is needed
2. ACTION: Search for current information about destinations, attractions, prices, hours
3. OBSERVATION: Process the search results
4. Provide a comprehensive, formatted response
Available tools:
- tavily_search_results_json: Search for current travel information
Format your itineraries with:
- Clear day-by-day breakdown
- Specific times and locations
- Transportation between locations
- Estimated costs when possible
- Practical tips and recommendations"""),
MessagesPlaceholder(variable_name="messages"),
])
# Flight Agent
flight_prompt = ChatPromptTemplate.from_messages([
("system", """You are a flight booking expert. ONLY respond to flight-related queries.
IMPORTANT RULES:
- If asked about non-flight topics, politely decline and redirect to flight booking
- Always use the search_flights tool to find current flight information
- For one-way flights: only provide departure_airport, arrival_airport, and outbound_date
- For round-trip flights: include return_date parameter
- CRITICAL: When parsing dates, pay attention to the year mentioned by the user
- If no year is specified, assume the current year (2025)
- Format dates as YYYY-MM-DD (e.g., 2025-07-15 for July 15, 2025)
Available tools:
- search_flights: Search for comprehensive flight data
Parameters for search_flights:
- departure_airport: 3-letter airport code (e.g., "DEL", "JFK")
- arrival_airport: 3-letter airport code (e.g., "LHR", "LAX", "DXB")
- outbound_date: Date in YYYY-MM-DD format (IMPORTANT: Use correct year!)
- return_date: Optional, only for round-trip flights
- adults: Number of adult passengers (default: 1)
- children: Number of child passengers (default: 0)
Examples:
- "15 Jul 2025" β†’ "2025-07-15"
- "July 15, 2025" β†’ "2025-07-15"
- "15th July 2025" β†’ "2025-07-15"
- "15 Jul" (no year specified) β†’ "2025-07-15"
Process:
1. ALWAYS search for flights first using the tool
2. Analyze the results to find flights matching user preferences
3. Present organized results with clear recommendations
Airport code mapping:
- Delhi: DEL
- London Heathrow: LHR
- London Gatwick: LGW
- Dubai: DXB
- New York JFK: JFK
- New York LaGuardia: LGA
- New York Newark: EWR
- etc."""),
MessagesPlaceholder(variable_name="messages"),
])
# Hotel Agent
hotel_prompt = ChatPromptTemplate.from_messages([
("system", """You are a hotel booking expert. ONLY respond to hotel and accommodation-related queries.
IMPORTANT RULES:
- If asked about non-hotel topics, politely decline and redirect to hotel booking
- Always use the search_hotels tool to find current hotel information
- Provide detailed hotel options with prices, ratings, amenities, and location details
- Include practical booking advice and tips
- You CAN search and analyze results for different criteria like star ratings, price ranges, amenities
Available tools:
- search_hotels: Search for hotels using Google Hotels engine
When searching hotels:
- If check-out date is not provided in the initial request, assume a 1-night stay (add 1 day to check-in date)
- Always proceed with the search even if some details are missing
- Format dates as YYYY-MM-DD
For hotel searches, you need:
- Location/destination
- Check-in date (YYYY-MM-DD format)
- Check-out date (YYYY-MM-DD format)
- Number of guests (adults, children)
- Number of rooms
- Hotel preferences (star rating, amenities, etc.)
Present results with:
- Hotel name and star rating
- Price per night and total cost
- Key amenities and features
- Location and nearby attractions
- Booking recommendations
If user provides a follow-up response after asking for clarification, immediately proceed with the hotel search using all available information."""),
MessagesPlaceholder(variable_name="messages"),
])
# Bind tools to agents
itinerary_agent = itinerary_prompt | self.llm.bind_tools([self.tools["tavily"]])
flight_agent = flight_prompt | self.llm.bind_tools([self.tools["search_flights"]])
hotel_agent = hotel_prompt | self.llm.bind_tools([self.tools["search_hotels"]])
return {
"itinerary": itinerary_agent,
"flight": flight_agent,
"hotel": hotel_agent
}
def _create_router(self):
"""Create routing logic for agent selection"""
router_prompt = ChatPromptTemplate.from_messages([
("system", """You are a routing expert for a travel planning system.
Analyze the user's query and decide which specialist agent should handle it:
- FLIGHT: Flight bookings, airlines, air travel, flight search, tickets, airports, departures, arrivals, airline prices
- HOTEL: Hotels, accommodations, stays, rooms, hotel bookings, lodging, resorts, hotel search, hotel prices
- ITINERARY: Travel itineraries, trip planning, destinations, activities, attractions, sightseeing, travel advice, weather, culture, food, general travel questions
Respond with ONLY one word: FLIGHT, HOTEL, or ITINERARY
Examples:
"Book me a flight to Paris" β†’ FLIGHT
"Find hotels in Tokyo" β†’ HOTEL
"Plan my 5-day trip to Italy" β†’ ITINERARY
"Search flights from NYC to London" β†’ FLIGHT
"Where should I stay in Bali?" β†’ HOTEL
"What are the best attractions in Rome?" β†’ ITINERARY
"I need airline tickets" β†’ FLIGHT
"Show me hotel options" β†’ HOTEL
"Create an itinerary for Japan" β†’ ITINERARY"""),
("user", "Query: {query}")
])
router_chain = router_prompt | self.llm | StrOutputParser()
def route_query(state):
"""Router function - decides which agent to call next"""
user_message = state["messages"][-1].content
try:
decision = router_chain.invoke({"query": user_message}).strip().upper()
agent_mapping = {
"FLIGHT": "flight_agent",
"HOTEL": "hotel_agent",
"ITINERARY": "itinerary_agent"
}
next_agent = agent_mapping.get(decision, "itinerary_agent")
return next_agent
except Exception:
return "itinerary_agent"
return route_query
def _ensure_valid_content(self, content):
"""Ensure content is valid and not empty for Gemini API"""
if not content:
return "No results available"
# Convert to string if not already
content_str = str(content)
# Check if empty or whitespace only
if not content_str or not content_str.strip():
return "No results available"
# Ensure minimum length
if len(content_str.strip()) < 3:
return f"Limited results: {content_str.strip()}"
return content_str
def _itinerary_agent_node(self, state: TravelPlannerState):
"""Itinerary planning agent node"""
messages = state["messages"]
response = self.agents["itinerary"].invoke({"messages": messages})
if hasattr(response, 'tool_calls') and response.tool_calls:
tool_messages = []
for tool_call in response.tool_calls:
if tool_call['name'] == 'tavily_search_results_json':
try:
print(f"πŸ” Tavily search query: {tool_call['args'].get('query', 'No query')}")
# Use the direct search method instead of invoke
search_query = tool_call['args'].get('query', '')
if search_query:
tool_result = self.tools["tavily"].search(search_query, max_results=2)
else:
tool_result = "No search query provided"
print(f"πŸ“‹ Tavily raw result: {type(tool_result)} - {str(tool_result)[:200]}...")
# Handle different response types
if isinstance(tool_result, list):
if len(tool_result) == 0:
tool_result = "No search results found"
else:
tool_result = json.dumps(tool_result, indent=2)
elif isinstance(tool_result, dict):
tool_result = json.dumps(tool_result, indent=2)
# Ensure valid content for Gemini API
tool_result = self._ensure_valid_content(tool_result)
print(f"βœ… Processed tool result length: {len(tool_result)}")
except Exception as e:
print(f"❌ Tavily search error: {e}")
tool_result = f"Search failed: {str(e)}"
tool_messages.append(ToolMessage(
content=tool_result,
tool_call_id=tool_call['id']
))
if tool_messages:
all_messages = messages + [response] + tool_messages
try:
final_response = self.agents["itinerary"].invoke({"messages": all_messages})
return {"messages": [response] + tool_messages + [final_response]}
except Exception as e:
print(f"❌ Error in final response: {e}")
# Return a fallback response
fallback_response = self.agents["itinerary"].invoke({"messages": messages})
return {"messages": [fallback_response]}
return {"messages": [response]}
def _flight_agent_node(self, state: TravelPlannerState):
"""Flight booking agent node"""
messages = state["messages"]
try:
response = self.agents["flight"].invoke({"messages": messages})
if hasattr(response, 'tool_calls') and response.tool_calls:
tool_messages = []
for tool_call in response.tool_calls:
if tool_call['name'] == 'search_flights':
try:
print(f"✈️ Flight search with args: {tool_call['args']}")
tool_result = self.tools["search_flights"].invoke(tool_call['args'])
# Ensure valid content for Gemini API
tool_result = self._ensure_valid_content(tool_result)
print(f"βœ… Flight search completed, result length: {len(tool_result)}")
except Exception as e:
print(f"❌ Flight search error: {e}")
tool_result = f"Flight search failed: {str(e)}"
tool_messages.append(ToolMessage(
content=tool_result,
tool_call_id=tool_call['id']
))
if tool_messages:
all_messages = messages + [response] + tool_messages
try:
final_response = self.agents["flight"].invoke({"messages": all_messages})
return {"messages": [response] + tool_messages + [final_response]}
except Exception as e:
print(f"❌ Error in flight final response: {e}")
# Return a fallback response
fallback_response = self.agents["flight"].invoke({"messages": messages})
return {"messages": [fallback_response]}
return {"messages": [response]}
except Exception as e:
print(f"❌ Error in flight agent node: {e}")
# Create a fallback response
from langchain_core.messages import AIMessage
fallback_msg = AIMessage(content=f"I apologize, but I encountered an error while processing your flight request. Please try again with your flight search query.")
return {"messages": [fallback_msg]}
def _hotel_agent_node(self, state: TravelPlannerState):
"""Hotel booking agent node"""
messages = state["messages"]
try:
response = self.agents["hotel"].invoke({"messages": messages})
if hasattr(response, 'tool_calls') and response.tool_calls:
tool_messages = []
for tool_call in response.tool_calls:
if tool_call['name'] == 'search_hotels':
try:
print(f"🏨 Hotel search with args: {tool_call['args']}")
tool_result = self.tools["search_hotels"].invoke(tool_call['args'])
# Ensure valid content for Gemini API
tool_result = self._ensure_valid_content(tool_result)
print(f"βœ… Hotel search completed, result length: {len(tool_result)}")
except Exception as e:
print(f"❌ Hotel search error: {e}")
tool_result = f"Hotel search failed: {str(e)}"
tool_messages.append(ToolMessage(
content=tool_result,
tool_call_id=tool_call['id']
))
if tool_messages:
all_messages = messages + [response] + tool_messages
try:
final_response = self.agents["hotel"].invoke({"messages": all_messages})
return {"messages": [response] + tool_messages + [final_response]}
except Exception as e:
print(f"❌ Error in hotel final response: {e}")
# Return a fallback response
fallback_response = self.agents["hotel"].invoke({"messages": messages})
return {"messages": [fallback_response]}
return {"messages": [response]}
except Exception as e:
print(f"❌ Error in hotel agent node: {e}")
# Create a fallback response
from langchain_core.messages import AIMessage
fallback_msg = AIMessage(content=f"I apologize, but I encountered an error while processing your hotel request. Please try again with your hotel search query.")
return {"messages": [fallback_msg]}
def _router_node(self, state: TravelPlannerState):
"""Router node - determines which agent should handle the query"""
user_message = state["messages"][-1].content
next_agent = self.router(state)
return {
"next_agent": next_agent,
"user_query": user_message
}
def _route_to_agent(self, state: TravelPlannerState):
"""Conditional edge function - routes to appropriate agent"""
next_agent = state.get("next_agent")
if next_agent == "flight_agent":
return "flight_agent"
elif next_agent == "hotel_agent":
return "hotel_agent"
elif next_agent == "itinerary_agent":
return "itinerary_agent"
else:
return "itinerary_agent"
def _build_workflow(self):
"""Build the complete LangGraph workflow"""
workflow = StateGraph(TravelPlannerState)
# Add nodes
workflow.add_node("router", self._router_node)
workflow.add_node("flight_agent", self._flight_agent_node)
workflow.add_node("hotel_agent", self._hotel_agent_node)
workflow.add_node("itinerary_agent", self._itinerary_agent_node)
# Set entry point
workflow.set_entry_point("router")
# Add conditional edges
workflow.add_conditional_edges(
"router",
self._route_to_agent,
{
"flight_agent": "flight_agent",
"hotel_agent": "hotel_agent",
"itinerary_agent": "itinerary_agent"
}
)
# Add edges to END
workflow.add_edge("flight_agent", END)
workflow.add_edge("hotel_agent", END)
workflow.add_edge("itinerary_agent", END)
# Compile with memory
checkpointer = InMemorySaver()
return workflow.compile(checkpointer=checkpointer)
def chat(self, message: str, thread_id: str = "default"):
"""Process a single message and return response"""
try:
config = {"configurable": {"thread_id": thread_id}}
result = self.workflow.invoke(
{"messages": [HumanMessage(content=message)]},
config
)
# Ensure we have a valid response
if not result.get("messages") or len(result["messages"]) == 0:
return "I apologize, but I didn't receive a proper response. Please try your request again."
last_message = result["messages"][-1]
# Check if the last message has content
if hasattr(last_message, 'content') and last_message.content:
return last_message.content
else:
return "I apologize, but I didn't generate a proper response. Please try your request again."
except Exception as e:
print(f"❌ Error in chat method: {e}")
return f"I encountered an error while processing your request: {str(e)}. Please try again."
def chat_stream(self, message: str, thread_id: str = "default"):
"""Stream response for a message"""
config = {"configurable": {"thread_id": thread_id}}
for chunk in self.workflow.stream(
{"messages": [HumanMessage(content=message)]},
config
):
yield chunk
# For LangGraph Cloud deployment
app = TravelPlannerApp()
# Gradio Interface Functions
def create_gradio_interface():
"""Create and configure the Gradio interface"""
def chat_function(message, history, session_id):
"""Handle chat messages with session memory"""
try:
# Use session_id as thread_id for maintaining conversation context
response = app.chat(message, thread_id=session_id)
return response
except Exception as e:
return f"❌ Error: {str(e)}"
def reset_conversation():
"""Reset conversation by returning new session ID"""
return str(uuid.uuid4())
# Create the Gradio interface
with gr.Blocks(
title="🧳 Multi-Agent Travel Planner",
theme=gr.themes.Soft(),
css="""
.gradio-container {
max-width: 900px !important;
}
.chat-message {
font-size: 14px !important;
}
"""
) as demo:
gr.Markdown("""
# 🧳 Multi-Agent Travel Planning System
**Your AI-powered travel assistant with specialized agents for:**
- ✈️ **Flight Search & Booking** - Find and compare flights
- 🏨 **Hotel Search & Booking** - Discover accommodations
- πŸ—ΊοΈ **Itinerary Planning** - Create detailed travel plans
Just type your travel question and let our agents help you plan your perfect trip!
""")
# Session state for maintaining conversation context
session_id = gr.State(value=str(uuid.uuid4()))
# Chat interface
chatbot = gr.Chatbot(
label="Travel Assistant",
height=500,
show_label=True,
container=True,
bubble_full_width=False
)
with gr.Row():
msg = gr.Textbox(
placeholder="Ask me about flights, hotels, or travel planning...",
label="Your Message",
scale=4,
container=False
)
send_btn = gr.Button("Send", scale=1, variant="primary")
with gr.Row():
clear_btn = gr.Button("Clear Chat", scale=1)
gr.Markdown("**Examples:** *Find flights from NYC to London*, *Hotels in Tokyo for 3 nights*, *Plan a 5-day trip to Italy*")
# Event handlers
def respond(message, history, session_id):
if not message.strip():
return history, ""
# Add user message to history
history.append([message, None])
# Get bot response
bot_response = chat_function(message, history, session_id)
# Add bot response to history
history[-1][1] = bot_response
return history, ""
def clear_chat():
return [], str(uuid.uuid4())
# Wire up the events
msg.submit(
respond,
inputs=[msg, chatbot, session_id],
outputs=[chatbot, msg]
)
send_btn.click(
respond,
inputs=[msg, chatbot, session_id],
outputs=[chatbot, msg]
)
clear_btn.click(
clear_chat,
outputs=[chatbot, session_id]
)
# Example buttons
gr.Examples(
examples=[
"Give me flight from delhi to dubai for 15 Aug 2025",
"any good 5 start hotel in dubai for my stay there from 15 Aug to 17 Aug 2025",
"Plan a 2 day itinerary for my dubai trip",
"Hey, I'm a foodie anything to try there"
],
inputs=msg,
label="Example Queries"
)
gr.Markdown("""
---
πŸ’‘ **Tips:**
- Be specific with dates, locations, and preferences
- The system remembers your conversation context
- Each agent specializes in their domain for better results
""")
return demo
def main():
"""Main function to launch the Gradio interface"""
print("πŸš€ Starting Multi-Agent Travel Planning System...")
try:
# Create and launch the Gradio interface
demo = create_gradio_interface()
# Launch the interface
demo.launch(
share=False, # Set to True if you want to create a public link
#server_name="127.0.0.1", # Use localhost instead of 0.0.0.0
# server_port=7860,
# show_error=True,
# quiet=False,
# inbrowser=True # Automatically open browser
)
except Exception as e:
print(f"❌ Error launching interface: {str(e)}")
print("Please check your environment variables and dependencies.")
if __name__ == "__main__":
main()