cr-agent / agents.py
Sibi Krishnamoorthy
fix some prompts
a06b39a
import os
from typing import Annotated, Literal, TypedDict
from langchain_core.messages import HumanMessage, SystemMessage, BaseMessage, AIMessage, ToolMessage
from langchain_core.prompts import ChatPromptTemplate
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from tools import get_current_weather, get_weather_forecast, duckduckgo_search, read_document_with_docling
# LLM Configuration with Fallback
def get_llm(temperature=0):
"""Get LLM with fallback support for OpenAI, Google GenAI, and Ollama."""
openai_key = os.getenv("OPENAI_API_KEY")
google_key = os.getenv("GOOGLE_API_KEY")
ollama_base_url = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
ollama_model = os.getenv("OLLAMA_MODEL", "llama3.2:3b-instruct-q6_K")
# Check for placeholder strings
is_openai_valid = openai_key and "your_openai_api_key" not in openai_key and len(openai_key) > 20
is_google_valid = google_key and "your_google_genai_api_key" not in google_key and len(google_key) > 20
# Try OpenAI first if valid
# if is_openai_valid:
# try:
# from langchain_openai import ChatOpenAI
# return ChatOpenAI(
# temperature=temperature,
# model=os.getenv("OPENAI_MODEL", "gpt-3.5-turbo"),
# base_url=os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
# )
# except Exception as e:
# print(f"OpenAI initialization failed: {e}")
# Fallback to Google GenAI if valid
if is_google_valid:
try:
from langchain_google_genai import ChatGoogleGenerativeAI
return ChatGoogleGenerativeAI(model="gemma-3-12b", temperature=temperature)
except Exception as e:
print(f"Google GenAI initialization failed: {e}")
# Fallback to Ollama if configured
if ollama_base_url:
try:
from langchain_ollama import ChatOllama
print(f"Using Ollama fallback: {ollama_model} at {ollama_base_url}")
return ChatOllama(model=ollama_model, base_url=ollama_base_url, temperature=temperature)
except Exception as e:
print(f"Ollama initialization failed: {e}")
# If all invalid or fail, but keys exist, try anyway (last resort)
if openai_key:
from langchain_openai import ChatOpenAI
return ChatOpenAI(temperature=temperature,model=os.getenv("OPENAI_MODEL","gpt-4o"),base_url=os.getenv("OPENAI_BASE_URL","https://api.openai.com/v1"))
if google_key:
from langchain_google_genai import ChatGoogleGenerativeAI
return ChatGoogleGenerativeAI(model="gemma-3-12b", temperature=temperature)
raise ValueError("No valid LLM configured. Set OPENAI_API_KEY, GOOGLE_API_KEY, or OLLAMA_BASE_URL in .env")
from database import engine, get_session
from models import Meeting
from sqlmodel import select, Session
# --- SQL Tool for Agent 4 ---
# We implement this manually or use LangChain's SQLDatabase,
# but since we use SQLModel/DuckDB, we can write a specific tool/chain.
from langchain_community.utilities import SQLDatabase
from langchain.chains import create_sql_query_chain
from langchain_community.tools.sql_database.tool import QuerySQLDataBaseTool
# Setup SQL Database for LangChain (Moved inside query_db_node to avoid top-level inspection issues)
from datetime import datetime, timedelta
def query_db_node(state):
"""Agent 4: NL to SQL."""
# Initialize SQLDatabase lazily
db = SQLDatabase(engine)
messages = state["messages"]
last_user_message = messages[-1].content
# We'll use a simple chain here with SQLite-specific guidance
llm = get_llm(temperature=0)
# Create a custom prompt that emphasizes SQLite syntax
from langchain_core.prompts import PromptTemplate
# Get current date for SQL queries (to avoid timezone issues with SQLite's 'now')
from datetime import datetime
current_date = datetime.now().strftime('%Y-%m-%d')
tomorrow_date = (datetime.now() + timedelta(days=1)).strftime('%Y-%m-%d')
sqlite_prompt = PromptTemplate.from_template(
"""You are an AI assistant that generates SQL queries.
CONTEXT:
- Today's date is: {current_date}
- Tomorrow's date is: {tomorrow_date}
TABLE INFO:
Table 'meeting': id, title, description, start_time, end_time, participants
SCHEMA:
{table_info}
RULES:
1. Use SQLite syntax.
2. To filter by date, use: date(start_time) = 'YYYY-MM-DD'
3. Return ONLY the SQL query. No markdown. No explanation.
Question: {input}
SQL Query:"""
)
try:
# Get table info
table_info = db.get_table_info()
# Generate query with SQLite-specific prompt
prompt_input = {
"input": last_user_message,
"table_info": table_info,
"current_date": current_date,
"tomorrow_date": tomorrow_date
}
formatted_prompt = sqlite_prompt.format(**prompt_input)
print(f"🔍 SQL Prompt sent to LLM:\n{formatted_prompt}")
response = llm.invoke([HumanMessage(content=formatted_prompt)])
# Extract SQL from response
sql_query = response.content.strip()
print(f"🔍 Generated SQL: {sql_query}")
# Clean up the query
if "SQLQuery:" in sql_query:
sql_query = sql_query.split("SQLQuery:")[-1].strip()
# Remove markdown code blocks if present
sql_query = sql_query.replace("```sql", "").replace("```", "").strip()
# Attempt to extract SQL if there's still conversational text
import re
# Look for SELECT ... FROM ... (case insensitive, multiline)
sql_match = re.search(r'(SELECT\s+.*)', sql_query, re.IGNORECASE | re.DOTALL)
if sql_match:
sql_query = sql_match.group(1)
sql_query = sql_query.rstrip(';').strip()
print(f"🔍 Executing SQL: {sql_query}")
# Execute the cleaned query
try:
result = db.run(sql_query)
except Exception as e:
return {"messages": [AIMessage(content=f"❌ SQL Execution Error:\nQuery: `{sql_query}`\nError: {e}")]}
# Format results into natural language
if result and result != "[]":
# Parse the result string (it's typically a string representation of a list of tuples)
import ast
try:
parsed_result = ast.literal_eval(result)
if isinstance(parsed_result, list) and len(parsed_result) > 0:
# Format based on what's being queried
if "meeting" in last_user_message.lower():
formatted_results = []
for row in parsed_result:
# Handle both tuple and dict results (SQLDatabase can return both depending on config)
if isinstance(row, dict):
title = row.get("title", "Meeting")
description = row.get("description", "")
location = row.get("location", "")
start_time = row.get("start_time", "")
end_time = row.get("end_time", "")
participants = row.get("participants", "")
elif len(row) >= 7:
# id, title, description, location, start_time, end_time, participants
meeting_id, title, description, location, start_time, end_time, participants = row[:7]
elif len(row) >= 6:
# id, title, description, start_time, end_time, participants (old schema)
meeting_id, title, description, start_time, end_time, participants = row[:6]
location = ""
else:
# Fallback for partial selects
title = row[0] if len(row) > 0 else "Meeting"
start_time = row[1] if len(row) > 1 else ""
description = ""
location = ""
end_time = ""
participants = ""
# Format datetime to human-readable format
try:
from datetime import datetime as dt
# Handle various formats
start_str = str(start_time).replace('.000000', '')
if ' ' in start_str:
start_dt = dt.strptime(start_str, "%Y-%m-%d %H:%M:%S")
else:
start_dt = dt.fromisoformat(start_str)
end_str = str(end_time).replace('.000000', '')
if ' ' in end_str:
end_dt = dt.strptime(end_str, "%Y-%m-%d %H:%M:%S")
else:
end_dt = dt.fromisoformat(end_str)
# Format as "Jan 3, 2026 at 2:00 PM"
start_formatted = start_dt.strftime("%b %d, %Y at %I:%M %p")
end_formatted = end_dt.strftime("%I:%M %p")
time_display = f"{start_formatted} to {end_formatted}"
except Exception as e:
# Fallback if parsing fails
time_display = f"{start_time} to {end_time}"
# Format location display
location_display = f"\n Location: {location}" if location else ""
formatted_results.append(
f"📅 **{title}**"
f"\n\n{time_display}{location_display}"
f"\n\n{description}"
f"\n\nParticipants: {participants}"
)
response_text = f"Found {len(parsed_result)} meeting(s):\n\n" + "\n\n".join(formatted_results)
else:
# Generic formatting for other queries
response_text = f"Found {len(parsed_result)} result(s):\n\n"
for row in parsed_result:
response_text += f"• {', '.join(str(item) for item in row)}\n"
else:
response_text = f"Query executed successfully.\n\nResult: {result}"
except (ValueError, SyntaxError):
# If parsing fails, use LLM to format the result
format_prompt = f"""Format this SQL query result into natural language:
Query: {sql_query}
Raw Result: {result}
Provide a clear, human-readable response."""
format_response = llm.invoke([SystemMessage(content=format_prompt)])
response_text = format_response.content
else:
response_text = f"No results found.\n(Debug: Executed `{sql_query}`)"
except Exception as e:
response_text = f"Error querying database: {e}"
return {"messages": [AIMessage(content=response_text)]}
# We need a `schedule_meeting` tool for Agent 3.
from langchain_core.tools import tool
@tool
def schedule_meeting(title: str, start_time_str: str, end_time_str: str, participants: str = "", city: str = "") -> str:
"""
Schedule a meeting in the database after checking weather conditions.
Only schedules if weather is good (Clear, Clouds, Fair conditions).
Args:
title: Meeting title
start_time_str: Start time in ISO format (YYYY-MM-DDTHH:MM:SS)
end_time_str: End time in ISO format (YYYY-MM-DDTHH:MM:SS)
participants: Comma-separated list of participants
city: City to check weather for (required for weather-conditional scheduling)
Returns:
Success or failure message with reasoning
"""
from datetime import datetime
import requests
try:
start_time = datetime.fromisoformat(start_time_str)
end_time = datetime.fromisoformat(end_time_str)
except ValueError:
return "Invalid date format. Use ISO format (YYYY-MM-DDTHH:MM:SS)."
# Check weather if city is provided
if city:
api_key = os.getenv("OPENWEATHERMAP_API_KEY")
if api_key:
try:
url = f"http://api.openweathermap.org/data/2.5/forecast?q={city}&appid={api_key}&units=metric"
response = requests.get(url, timeout=10)
if response.status_code == 200:
data = response.json()
# Check forecast for the meeting time
weather_condition = "unknown"
# Look for forecast closest to meeting time
if 'list' in data and len(data['list']) > 0:
# Get main weather condition from first available forecast
weather_condition = data['list'][0]['weather'][0]['main']
# Evaluate if weather is good
bad_conditions = ['Rain', 'Drizzle', 'Thunderstorm', 'Snow', 'Mist', 'Fog']
good_conditions = ['Clear', 'Clouds']
if weather_condition in bad_conditions:
return f"❌ Meeting NOT scheduled. Weather condition '{weather_condition}' is unfavorable in {city}. Recommendation: Reschedule to a day with better weather."
elif weather_condition not in good_conditions:
return f"⚠️ Meeting NOT scheduled. Weather condition '{weather_condition}' is uncertain in {city}. Recommendation: Check forecast again closer to meeting time."
except Exception as e:
return f"Weather check failed: {e}. Meeting not scheduled for safety."
# Check for schedule conflicts
with Session(engine) as session:
statement = select(Meeting).where(
(Meeting.start_time < end_time) & (Meeting.end_time > start_time)
)
conflicts = session.exec(statement).all()
if conflicts:
conflict_details = ", ".join([f"'{m.title}' ({m.start_time} - {m.end_time})" for m in conflicts])
return f"❌ Meeting conflict detected with: {conflict_details}. Please choose a different time slot."
# Schedule the meeting
meeting = Meeting(
title=title,
start_time=start_time,
end_time=end_time,
participants=participants,
description=f"Weather-checked meeting in {city}" if city else None
)
session.add(meeting)
session.commit()
weather_note = f" (Weather in {city} is favorable)" if city else ""
return f"✅ Meeting '{title}' scheduled successfully from {start_time} to {end_time}{weather_note}."
# --- State ---
class AgentState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
file_path: str | None # For Agent 2
# --- Router ---
def router(state) -> Literal["weather_agent", "doc_agent", "meeting_agent", "sql_agent", "__end__"]:
messages = state["messages"]
last_message = messages[-1]
# Simple keyword based or specific routing LLM.
# For robust agentic behavior, we should use a router chain.
# But to follow the "Agent" boxes in the diagram, let's explicitely route.
# We can use an LLM to classify.
llm = get_llm(temperature=0)
system = """You are a router. Classify the user query into ONE of these agents:
1. 'weather_agent': ONLY for standalone weather questions (no meeting scheduling).
Examples: "What's the weather?", "Will it rain tomorrow?"
2. 'meeting_agent': For scheduling/creating NEW meetings OR cancelling/deleting meetings.
Examples: "Schedule a meeting", "Book a team meeting", "Cancel all meetings", "Unschedule tomorrow's meetings"
3. 'sql_agent': For querying EXISTING meetings (show, list, find).
Examples: "Show all meetings", "What meetings do I have tomorrow?", "List scheduled meetings"
4. 'doc_agent': For document analysis or general knowledge.
Examples: "What's in this PDF?", "Explain the policy", "What are AI trends?"
CRITICAL: "Schedule", "book", "cancel", "unschedule", "delete" → meeting_agent, NOT sql_agent!
Return ONLY ONE agent name."""
# We can use structured output or just string.
response = llm.invoke([SystemMessage(content=system), last_message])
decision = response.content.strip().lower()
# Priority routing (order matters!)
if "meeting" in decision and ("schedule" in last_message.content.lower() or "book" in last_message.content.lower() or "create" in last_message.content.lower()):
return "meeting_agent"
if "meeting_agent" in decision:
return "meeting_agent"
if "weather_agent" in decision:
return "weather_agent"
if "sql_agent" in decision:
return "sql_agent"
if "doc_agent" in decision:
return "doc_agent"
# Keyword fallback
query_lower = last_message.content.lower()
if any(word in query_lower for word in ["schedule", "book", "arrange", "set up", "cancel", "unschedule", "delete", "remove"]) and "meeting" in query_lower:
return "meeting_agent"
if any(word in query_lower for word in ["show", "list", "display", "find", "get"]) and "meeting" in query_lower:
return "sql_agent"
if "weather" in query_lower and "meeting" not in query_lower:
return "weather_agent"
# Default fallback
return "doc_agent"
# --- Agent Nodes ---
def weather_agent_node(state):
llm = get_llm(temperature=0)
tools = [get_current_weather, get_weather_forecast]
llm_with_tools = llm.bind_tools(tools)
response = llm_with_tools.invoke(state["messages"])
return {"messages": [response]}
def doc_agent_node(state):
"""Document + Web Intelligence Agent with FORCED RAG execution."""
llm = get_llm(temperature=0.1)
file_path = state.get("file_path")
# If file uploaded, FORCE tool execution instead of asking model
if file_path:
import os
from tools import ingest_document_to_vector_store, search_vector_store, duckduckgo_search
doc_id = os.path.basename(file_path).replace('.', '_')
user_query = state["messages"][-1].content
# STEP 1: Force ingest (deterministic)
print(f"🔴 FORCING ingest_document_to_vector_store('{file_path}', '{doc_id}', is_temporary=True)")
try:
ingest_result = ingest_document_to_vector_store.invoke({
"file_path": file_path,
"document_id": doc_id,
"is_temporary": True
})
print(f"✅ Ingest result: {ingest_result}")
except Exception as e:
print(f"❌ Ingest failed: {e}")
ingest_result = f"Error: {e}"
# STEP 2: Force search (deterministic)
print(f"🔴 FORCING search_vector_store('{user_query}', '{doc_id}', search_type='temporary')")
try:
search_results = search_vector_store.invoke({
"query": user_query,
"document_id": doc_id,
"top_k": 3,
"search_type": "temporary"
})
print(f"✅ Search results: {search_results[:200]}...")
# Parse similarity score from results
import re
scores = re.findall(r'Similarity: ([\d\.]+)', search_results)
max_score = float(scores[0]) if scores else 0.0
print(f"📊 Best similarity score: {max_score}")
except Exception as e:
print(f"❌ Search failed: {e}")
search_results = f"Error: {e}"
max_score = 0.0
# STEP 3: Decide if we need web search (< 0.7 threshold)
web_results = ""
if max_score < 0.7:
print(f"⚠️ Low confidence ({max_score} < 0.7), calling web search")
try:
web_results = duckduckgo_search.invoke({"query": user_query})
print(f"🌐 Web search results: {web_results[:200]}...")
except Exception as e:
print(f"❌ Web search failed: {e}")
web_results = f"Web search error: {e}"
# STEP 4: Ask LLM to synthesize answer from results
synthesis_prompt = f"""You are answering based on the following information:
DOCUMENT SEARCH RESULTS (Similarity: {max_score:.2f}):
{search_results}
{f'WEB SEARCH RESULTS (fallback):{chr(10)}{web_results}' if web_results else ''}
USER QUESTION: {user_query}
Provide a clear, accurate answer based on the information above."""
response = llm.invoke([HumanMessage(content=synthesis_prompt)])
print(f"📤 LLM Response content: {response.content[:200]}...")
return {"messages": [response]}
# No file uploaded - search persistent documents first, then web
else:
from tools import search_vector_store, duckduckgo_search
user_query = state["messages"][-1].content
# Try searching all persistent documents first (empty string searches all)
print(f"🔍 No file uploaded, searching persistent documents for: {user_query}")
try:
search_results = search_vector_store.invoke({
"query": user_query,
"document_id": "",
"top_k": 3,
"search_type": "persistent"
})
print(f"📋 Raw search results:\n{search_results}")
# Parse similarity score
import re
scores = re.findall(r'Similarity: ([\d\.]+)', search_results)
max_score = float(scores[0]) if scores else 0.0
print(f"📊 Best persistent doc score: {max_score}")
# If good match in persistent docs, use it
if max_score >= 0.5: # Lower threshold for persistent docs
print(f"✅ Found relevant info in persistent documents (score: {max_score})")
synthesis_prompt = f"""Answer based on company documents:
COMPANY DOCUMENTS:
{search_results}
USER QUESTION: {user_query}
Provide a clear answer based on the company documents above."""
response = llm.invoke([HumanMessage(content=synthesis_prompt)])
print(f"📤 LLM Response content: {response.content[:200]}...")
return {"messages": [response]}
except Exception as e:
print(f"⚠️ Persistent doc search failed: {e}")
# Fallback to web search if no good persistent doc match
print(f"🌐 Using web search for: {user_query}")
try:
web_results = duckduckgo_search.invoke({"query": user_query})
synthesis_prompt = f"""Answer the question using this web search information:
WEB SEARCH RESULTS:
{web_results}
USER QUESTION: {user_query}
Provide a clear answer."""
response = llm.invoke([HumanMessage(content=synthesis_prompt)])
print(f"📤 LLM Response content: {response.content[:200]}...")
return {"messages": [response]}
except Exception as e:
print(f"⚠️ Web search exception: {e}")
response = llm.invoke(state["messages"])
print(f"📤 LLM Response content: {response.content[:200]}...")
return {"messages": [response]}
def meeting_agent_node_implementation(state):
"""Meeting Scheduling and Cancellation Agent with FORCED weather check."""
llm = get_llm(temperature=0.1)
user_query = state["messages"][-1].content
from tools import get_weather_forecast, schedule_meeting, cancel_meetings
from datetime import datetime, timedelta
# Check if this is a cancellation request
query_lower = user_query.lower()
if any(word in query_lower for word in ["cancel", "unschedule", "delete", "remove"]) and ("meeting" in query_lower or "meetings" in query_lower):
# Parse cancellation request
date_filter = "all"
if "tomorrow" in query_lower:
date_filter = "tomorrow"
elif "today" in query_lower:
date_filter = "today"
print(f"🗑️ FORCING cancel_meetings(date_filter='{date_filter}')")
try:
cancel_result = cancel_meetings.invoke({"date_filter": date_filter, "meeting_ids": ""})
print(f"✅ Cancel result: {cancel_result}")
return {"messages": [AIMessage(content=cancel_result)]}
except Exception as e:
print(f"❌ Cancellation failed: {e}")
return {"messages": [AIMessage(content=f"❌ Failed to cancel meetings: {e}")]}
# Parse meeting request using LLM
parse_prompt = f"""You are a JSON extraction assistant. Extract meeting information from the user's request.
User Request: "{user_query}"
Extract these fields and return ONLY a valid JSON object (no code, no explanation):
- title: meeting title as a string
- date: "tomorrow", "today", or "YYYY-MM-DD"
- time: time in 24-hour format like "14:00"
- city: city name (default: "Chennai")
- location: specific venue
- participants: comma-separated participant names
- duration_hours: meeting duration as a number (default: 1)
Rules:
1. If this is just a greeting ("hi", "hello"), return: {{}}
2. If this is NOT a meeting request, return: {{}}
3. If critical details (date/time) are missing, return: {{}}
4. Return ONLY the JSON object. No Python code. No markdown.
Valid example: {{"title": "Team Meeting", "date": "tomorrow", "time": "14:00", "city": "Chennai", "location": "Conference Room A", "participants": "John, Sarah", "duration_hours": 1}}
JSON:"""
parse_response = llm.invoke([HumanMessage(content=parse_prompt)])
print(f"📋 Parsed meeting request: {parse_response.content}")
# Extract JSON from response
import json
import re
json_match = re.search(r'\{[\s\S]*?\}', parse_response.content)
if json_match:
try:
meeting_data = json.loads(json_match.group())
# Handle empty JSON (Greeting or unclear request)
if not meeting_data:
print("⚠️ Empty JSON received, treating as greeting/general chat")
greeting_prompt = f"The user said: '{user_query}'. This was routed to the meeting agent but contains no meeting details. Please respond appropriately (e.g. return a greeting or ask for meeting details)."
greeting_response = llm.invoke([HumanMessage(content=greeting_prompt)])
return {"messages": [greeting_response]}
# Convert date to actual datetime
if "tomorrow" in meeting_data.get("date", "").lower():
meeting_date = (datetime.now() + timedelta(days=1)).strftime("%Y-%m-%d")
days_ahead = 1
elif "today" in meeting_data.get("date", "").lower():
meeting_date = datetime.now().strftime("%Y-%m-%d")
days_ahead = 0
else:
meeting_date = meeting_data.get("date", datetime.now().strftime("%Y-%m-%d"))
days_ahead = (datetime.strptime(meeting_date, "%Y-%m-%d") - datetime.now()).days
# Convert time to 24hr format
time_str = meeting_data.get("time", "14:00")
if "pm" in time_str.lower() and "12" not in time_str:
hour = int(re.findall(r'\d+', time_str)[0]) + 12
time_24hr = f"{hour:02d}:00"
else:
time_24hr = re.sub(r'[^\d:]', '', time_str)
if len(time_24hr) <= 2:
time_24hr = f"{time_24hr}:00"
start_time = f"{meeting_date} {time_24hr}:00"
end_time = f"{meeting_date} {int(time_24hr.split(':')[0]) + meeting_data.get('duration_hours', 1):02d}:{time_24hr.split(':')[1]}:00"
city = meeting_data.get("city", "Chennai")
location = meeting_data.get("location", city)
# STEP 1: Force weather check
print(f"🌤️ FORCING get_weather_forecast('{city}', {days_ahead})")
try:
weather_data = get_weather_forecast.invoke({"city": city})
# Extract weather description from forecast data
if isinstance(weather_data, dict) and 'list' in weather_data:
# Get first forecast entry (next 3 hours)
first_forecast = weather_data['list'][0] if weather_data['list'] else {}
weather_desc = first_forecast.get('weather', [{}])[0].get('description', 'unknown')
temp = first_forecast.get('main', {}).get('temp', 'N/A')
weather_result = f"{weather_desc}, {temp}°C"
else:
weather_result = str(weather_data)[:200]
print(f"✅ Weather: {weather_result}")
# Evaluate weather
bad_conditions = ["rain", "drizzle", "thunderstorm", "snow", "mist", "fog"]
is_bad_weather = any(cond in weather_result.lower() for cond in bad_conditions)
weather_emoji = "❌" if is_bad_weather else "✅"
except Exception as e:
print(f"❌ Weather check failed: {e}")
weather_result = "Unknown"
weather_emoji = "⚠️"
is_bad_weather = False
# STEP 2: Schedule meeting (even if bad weather, just warn)
print(f"📅 FORCING schedule_meeting('{meeting_data.get('title')}', {start_time}, {end_time})")
try:
schedule_result = schedule_meeting.invoke({
"title": meeting_data.get("title", "Meeting"),
"description": f"Weather: {weather_result[:100]}",
"start_time": start_time,
"end_time": end_time,
"participants": meeting_data.get("participants", ""),
"location": location
})
print(f"✅ Schedule result: {schedule_result}")
# Build response
response_text = f"{weather_emoji} Meeting scheduled!\n\n"
response_text += f"Title: {meeting_data.get('title')}\n\n"
response_text += f"Time: {start_time} to {end_time}\n\n"
response_text += f"Location: {location}\n\n"
response_text += f"Participants: {meeting_data.get('participants')}\n\n"
response_text += f"Weather: {weather_result[:200]}\n\n"
if is_bad_weather:
response_text += "⚠️ Warning: Weather conditions may not be ideal for this meeting."
return {"messages": [AIMessage(content=response_text)]}
return {"messages": [AIMessage(content=response_text)]}
except Exception as e:
print(f"❌ Scheduling failed: {e}")
return {"messages": [AIMessage(content=f"❌ Failed to schedule: {e}")]}
except Exception as e:
print(f"❌ Parsing failed: {e}")
return {"messages": [AIMessage(content=f"Could not parse meeting request: {e}. Please provide title, date, time, and participants.")]}
# Fallback if parsing fails
return {"messages": [AIMessage(content="Could not understand meeting request. Please specify: title, date/time, and participants.")]}
# --- Graph Construction ---
workflow = StateGraph(AgentState)
# Nodes
workflow.add_node("weather_agent", weather_agent_node)
workflow.add_node("doc_agent", doc_agent_node)
workflow.add_node("meeting_agent", meeting_agent_node_implementation)
workflow.add_node("sql_agent", query_db_node)
# Tool Node (Shared or separate? For simplicity, we can use a generic prebuilt ToolNode
# but each agent has different tools. So we need to handle tool calls.
# The nodes above (except sql) return an AIMessage which MIGHT have tool_calls.
# We need to execute those tools.
from langgraph.prebuilt import ToolNode
# Import cancel_meetings tool
from tools import cancel_meetings
# Define tool nodes for each agent to ensure they only access their allowed tools
weather_tools_node = ToolNode([get_current_weather, get_weather_forecast])
doc_tools_node = ToolNode([read_document_with_docling, duckduckgo_search])
meeting_tools_node = ToolNode([get_weather_forecast, schedule_meeting, cancel_meetings])
workflow.add_node("weather_tools", weather_tools_node)
workflow.add_node("doc_tools", doc_tools_node)
workflow.add_node("meeting_tools", meeting_tools_node)
# Conditional Edges for tools
def should_continue(state):
last_message = state["messages"][-1]
if last_message.tool_calls:
return "tools"
return END
# Creating the flow
# Router -> Agent -> (if tool) -> ToolNode -> Agent ...
# To simplify, we'll let the Router pick the start node.
workflow.add_conditional_edges(START, router, {
"weather_agent": "weather_agent",
"doc_agent": "doc_agent",
"meeting_agent": "meeting_agent",
"sql_agent": "sql_agent"
})
# Weather Flow
workflow.add_conditional_edges("weather_agent", should_continue, {"tools": "weather_tools", END: END})
workflow.add_edge("weather_tools", "weather_agent")
# Doc Flow
workflow.add_conditional_edges("doc_agent", should_continue, {"tools": "doc_tools", END: END})
workflow.add_edge("doc_tools", "doc_agent")
# Meeting Flow
workflow.add_conditional_edges("meeting_agent", should_continue, {"tools": "meeting_tools", END: END})
workflow.add_edge("meeting_tools", "meeting_agent")
# SQL Flow (No tools, just runs)
workflow.add_edge("sql_agent", END)
app = workflow.compile()