# LangGraph_agent / main.py
# Last commit (pratikmurali, e43e83e): Add debug log support for Chainlit and simplify on_message
from dotenv import load_dotenv
import os
from langchain_core.messages import HumanMessage
from langchain_core.runnables.config import RunnableConfig
from config import load_config
from tools import create_tools
from graph import create_agent_graph
import chainlit as cl
import re
# Initialize resources at module level for Chainlit.
# These statements run when the module is imported; the resulting objects are
# module globals shared by the handlers defined below.
load_dotenv()  # called before load_config() — presumably so config can read .env values; confirm
config = load_config()
tools, model = create_tools(config)
# `graph` is the object on_chat_start stores in each chat session.
graph = create_agent_graph(tools, model)
# Candidate URLs. Whether a candidate is "bare" (not already inside markup) is
# decided in _linkify_bare_url, so that a guarded URL is consumed as a whole
# and a second URL nested in its query string is not linkified on its own.
_URL_RE = re.compile(r'https?://[^\s\'"]+')
# A URL directly preceded by one of these is already part of a markdown link
# or a quoted value and must be left untouched.
_URL_GUARD_CHARS = '[("'
# Punctuation that belongs to the surrounding sentence, not to the URL.
_URL_TRAILING_PUNCT = '.,;:!?*'


def _content_to_text(content):
    """Flatten message content into a single string.

    Strings are returned unchanged. For a list/tuple, string items and the
    ``"text"`` value of dict items are concatenated; other items are skipped.
    Anything else is converted with ``str()``.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, (list, tuple)):
        parts = []
        for part in content:
            if isinstance(part, str):
                parts.append(part)
            elif isinstance(part, dict) and isinstance(part.get("text"), str):
                parts.append(part["text"])
        return "".join(parts)
    return str(content)


def _linkify_bare_url(match):
    """Return the markdown replacement for one URL match (re.sub callback)."""
    url = match.group(0)
    start = match.start()
    if start > 0 and match.string[start - 1] in _URL_GUARD_CHARS:
        # Already inside "[...](url)" or a quoted value: keep as-is.
        return url

    # Peel sentence punctuation and unbalanced ")" off the end so they stay
    # outside the link target; balanced parentheses (e.g. ".../Foo_(bar)")
    # are kept as part of the URL.
    end = len(url)
    while end > 0:
        last = url[end - 1]
        if last in _URL_TRAILING_PUNCT:
            end -= 1
        elif last == ')' and url.count(')', 0, end) > url.count('(', 0, end):
            end -= 1
        else:
            break
    return f'[Link]({url[:end]}){url[end:]}'


def clean_content(content):
    """Clean and format content for better readability.

    Steps, in order: turn literal ``\\n`` sequences into newlines, collapse
    doubled backslashes, wrap bare URLs as ``[Link](url)`` (keeping trailing
    punctuation outside the link), strip backslashes that escape markdown
    link brackets, and collapse runs of three or more newlines to two.

    Args:
        content: The content to clean. Usually a string; a list of strings
            and/or dicts carrying a ``"text"`` entry is flattened first.

    Returns:
        Cleaned content string ("" for empty or None input).
    """
    if not content:
        return ""
    content = _content_to_text(content)
    # Replace escaped newlines with actual newlines
    content = content.replace('\\n', '\n')
    # Remove excessive backslashes
    content = content.replace('\\\\', '\\')
    # Format bare URLs as markdown links
    content = _URL_RE.sub(_linkify_bare_url, content)
    # Un-escape markdown links whose brackets were backslash-escaped
    content = re.sub(r'\\*\[([^\]]+)\]\\*\(([^)]+)\)', r'[\1](\2)', content)
    # Clean up repeated newlines
    content = re.sub(r'\n{3,}', '\n\n', content)
    return content
@cl.on_chat_start
async def on_chat_start():
    """Initialise per-session state and greet the user when a chat opens."""
    # Expose the shared module-level graph through the session store.
    cl.user_session.set("graph", graph)

    # Greet the user.
    greeting_text = "👋 Hello! I'm a research assistant powered by LangGraph. I can search the web using Tavily and query arXiv papers. How can I help you today?"
    await cl.Message(content=greeting_text).send()
@cl.on_message
async def on_message(message: cl.Message):
    """Process an incoming user message with the LangGraph agent.

    A temporary "Thinking..." message is shown while the graph runs and is
    removed afterwards, whether the run succeeded or raised. The content of
    the last message in the graph result is passed through ``clean_content``
    and sent to the user. Any exception is printed and reported to the user
    as an error message instead of propagating.

    Args:
        message: The Chainlit message sent by the user.
    """
    try:
        # Get the graph stored in the session by on_chat_start
        agent_graph = cl.user_session.get("graph")

        # Create input with the user's message
        inputs = {"messages": [HumanMessage(content=message.content)]}

        # Set up callback handler for streaming.
        # NOTE(review): stream_final_answer=True is requested here, yet the
        # final message is always sent explicitly below — confirm this cannot
        # show the answer twice.
        cb = cl.LangchainCallbackHandler(stream_final_answer=True)
        runnable_config = RunnableConfig(callbacks=[cb])

        # Send thinking indicator
        thinking = cl.Message(content="🤔 Thinking...")
        await thinking.send()
        try:
            # invoke (not stream) so the graph has finished before reading the result
            result = await agent_graph.ainvoke(inputs, config=runnable_config)
        finally:
            # Always clear the indicator, so a failed run does not leave
            # "Thinking..." on screen next to the error message.
            await thinking.remove()

        # Send the final response explicitly
        if "messages" in result and result["messages"]:
            final_message = result["messages"][-1]
            if hasattr(final_message, "content") and final_message.content:
                content = clean_content(final_message.content)
                await cl.Message(content=content).send()
    except Exception as e:
        # Top-level boundary for the handler: report instead of crashing the chat.
        error_msg = f"An error occurred: {e}"
        print(f"ERROR: {error_msg}")
        await cl.Message(content=f"❌ {error_msg}").send()
# Only used when running the script directly, not with Chainlit.
# Nothing is started in that case; the user is only told how to launch the app.
if __name__ == "__main__":
    print("Please run this app with Chainlit: chainlit run main.py")