# Chatbot / memory_chatbot.py
# Uploaded by yashAI007 -- commit fbf7ee2: "add folder list tool"
import json
from typing import TypedDict, Annotated
from dotenv import load_dotenv
from langchain_core.messages import BaseMessage, HumanMessage,ToolMessage,AIMessage,SystemMessage
from langchain_groq import ChatGroq
from langgraph.graph import START, END, StateGraph
from langgraph.graph.message import add_messages
# Short-term memory
from langgraph.checkpoint.memory import InMemorySaver
# Long-term memory
from langgraph.store.memory import InMemoryStore
# Agent
from langchain.agents import create_agent
from tool import tool_tavily,time_date,calculator,python_exec,get_weather,wikipedia_search,scrape_website,read_file,format_json,generate_sql,system_info,save_user_preference,list_folders
# Load environment variables from a .env file before any client is built.
load_dotenv()

# Tools handed to the agent, one per line so additions diff cleanly.
tools = [
    tool_tavily,
    time_date,
    calculator,
    python_exec,
    get_weather,
    wikipedia_search,
    scrape_website,
    read_file,
    format_json,
    generate_sql,
    system_info,
    save_user_preference,
    list_folders,
]

# -----------------------------
# Memory Systems
# -----------------------------
# Short-term memory: conversation checkpoints.
checkpointer = InMemorySaver()
# Long-term memory: per-user facts written and read by the memory nodes.
store = InMemoryStore()

# -----------------------------
# LLM
# -----------------------------
llm = ChatGroq(
    model="qwen/qwen3-32b",
    temperature=0,
    reasoning_format="parsed",
)
# -----------------------------
# Message helpers & State
# -----------------------------
def get_last_ai_message(messages):
    """Return the content of the newest AI message that has any content.

    Args:
        messages: Conversation history, oldest first.

    Returns:
        The content as a string (non-string content is passed through
        ``str``), or ``"No response"`` when no AI message with content exists.
    """
    for candidate in reversed(messages):
        if not isinstance(candidate, AIMessage):
            continue
        body = candidate.content
        if not body:
            # AI messages without content are skipped.
            continue
        if isinstance(body, str):
            return body
        return str(body)
    return "No response"
def extract_tool_logs(messages, current_user_message):
    """Render the tool activity of the latest turn as Markdown.

    The latest turn starts at the most recent ``HumanMessage`` whose content
    equals ``current_user_message`` and runs to the end of ``messages``. When
    no such message is found, the whole history is scanned instead.

    Args:
        messages: Conversation history, oldest first.
        current_user_message: Text of the user message that opened the turn.

    Returns:
        One Markdown section per tool call and per tool output, separated by
        blank lines, or ``"_No tools used in this response._"`` if there were
        none.
    """
    # Find where the current turn begins by scanning from the newest message.
    turn_start = 0
    for index in range(len(messages) - 1, -1, -1):
        candidate = messages[index]
        if (
            isinstance(candidate, HumanMessage)
            and candidate.content == current_user_message
        ):
            turn_start = index
            break

    logs = []
    for msg in messages[turn_start:]:
        # AI asked to call one or more tools.
        if isinstance(msg, AIMessage) and getattr(msg, "tool_calls", None):
            for call in msg.tool_calls:
                # `or` (not a .get default) so an explicit None also falls back.
                tool_name = call.get("name") or "unknown_tool"
                tool_args = call.get("args") or {}
                # default=str keeps this display-only dump from raising on
                # values json cannot serialise natively.
                rendered_args = json.dumps(tool_args, indent=2, default=str)
                logs.append(
                    f"### 🔧 Calling `{tool_name}`\n"
                    f"```json\n{rendered_args}\n```"
                )
        # A tool returned its output.
        elif isinstance(msg, ToolMessage):
            # The attribute exists but may be None, so a getattr default alone
            # would never produce the fallback label.
            tool_name = getattr(msg, "name", None) or "unknown_tool"
            content = msg.content
            tool_output = content if isinstance(content, str) else str(content)
            # Cap very long outputs at 1000 characters.
            logs.append(
                f"### ✅ Output from `{tool_name}`\n"
                f"```\n{tool_output[:1000]}\n```"
            )

    return "\n\n".join(logs) if logs else "_No tools used in this response._"
class ChatState(TypedDict):
    """State passed between the nodes of the chatbot graph."""

    # Conversation history. Node updates to this key are combined through
    # the add_messages reducer named in the annotation.
    messages: Annotated[list[BaseMessage], add_messages]
# -----------------------------
# Memory Analyzer Node
# -----------------------------
# Categories a memory may be filed under. memory_retrieval searches these same
# four namespaces, so anything stored elsewhere could never be read back.
_MEMORY_CATEGORIES = ("preferences", "profile", "interests", "project")


def _extract_json_object(text):
    """Parse the JSON object embedded in an LLM reply, ignoring fences or prose around it."""
    if not isinstance(text, str):
        raise ValueError("reply content is not plain text")
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end < start:
        raise ValueError("reply contains no JSON object")
    # json.JSONDecodeError is a ValueError subclass, so callers catch one type.
    return json.loads(text[start:end + 1])


def memory_analyzer(state: ChatState, config, store):
    """Ask the LLM whether the latest message is worth remembering and store it.

    The node is best-effort: an unusable LLM reply or a failing store write is
    reported on stdout and otherwise ignored, so the conversation continues.

    Args:
        state: Graph state; only the last entry of ``state["messages"]`` is read.
        config: Run configuration; ``config["configurable"]["user_id"]`` selects
            the namespace the memory is written to.
        store: Long-term store exposing ``put(namespace, key, value)``.

    Returns:
        Always ``{}`` -- this node never changes the graph state.

    Raises:
        KeyError: If ``config`` carries no ``configurable.user_id``.
    """
    user_id = config["configurable"]["user_id"]

    messages = state["messages"]
    if not messages:
        # Nothing to analyze; avoids an IndexError and a pointless LLM call.
        return {}
    last_message = messages[-1].content

    # The category list now names "project" too, matching the JSON schema below.
    prompt = f"""
Analyze the message and decide if it contains useful long-term memory.
Possible memory categories:
- preferences
- profile
- interests
- project
Return JSON format:
{{
"store_memory": true/false,
"category": "preferences | profile | interests | project",
"key": "memory_key",
"value": {{}}
}}
Message:
{last_message}
"""
    result = llm.invoke([HumanMessage(content=prompt)])

    try:
        data = _extract_json_object(result.content)
    except ValueError as exc:
        print("Memory analyzer: unusable LLM reply:", exc)
        return {}

    if not data.get("store_memory"):
        return {}

    category = data.get("category")
    key = data.get("key")
    value = data.get("value")
    if (
        category not in _MEMORY_CATEGORIES
        or not isinstance(key, str)
        or not key
        or value is None
    ):
        print("Memory analyzer: skipped malformed memory:", data)
        return {}
    if not isinstance(value, dict):
        # The store API documents dict values; wrap bare scalars and lists.
        value = {"value": value}

    try:
        store.put((user_id, category), key, value)
    except Exception as exc:
        # Deliberately broad: a failed memory write must not abort the turn,
        # but it is reported instead of being swallowed.
        print("Memory analyzer: failed to store memory:", exc)
        return {}

    print("Stored memory:", data)
    return {}
# -----------------------------
# Memory Retrieval Node
# -----------------------------
def memory_retrieval(state: ChatState, config, store):
    """Look up stored memories for the user and add them to the conversation.

    Each of the four memory categories is searched under the
    ``(user_id, category)`` namespace, using the content of the latest message
    as the query and keeping at most three hits per category.

    Args:
        state: Graph state; only the last entry of ``state["messages"]`` is read.
        config: Run configuration; ``config["configurable"]["user_id"]`` selects
            the namespaces to search.
        store: Long-term store exposing ``search(namespace, query=..., limit=...)``.

    Returns:
        ``{"messages": [HumanMessage]}`` holding one "Relevant user memories"
        message when anything was found, otherwise ``{}``.

    Raises:
        KeyError: If ``config`` carries no ``configurable.user_id``.
    """
    user_id = config["configurable"]["user_id"]

    messages = state["messages"]
    if not messages:
        # No message to build a query from.
        return {}
    query = messages[-1].content

    categories = ("preferences", "profile", "interests", "project")
    all_memories = []
    for category in categories:
        all_memories.extend(
            store.search((user_id, category), query=query, limit=3)
        )

    if not all_memories:
        return {}

    memory_text = "\n".join(
        f"{m.namespace[1]}: {m.value}" for m in all_memories
    )
    # Return only the new message and leave the incoming state untouched:
    # the `messages` channel has a reducer that merges updates into the history.
    return {
        "messages": [
            HumanMessage(content=f"Relevant user memories:\n{memory_text}")
        ]
    }
# def memory_retrieval(state: ChatState, config, store):
# user_id = config["configurable"]["user_id"]
# query = state["messages"][-1].content
# categories = [
# "preferences",
# "profile",
# "interests",
# "project"
# ]
# all_memories = []
# for category in categories:
# namespace = (user_id, category)
# results = store.search(namespace, query=query, limit=3)
# all_memories.extend(results)
# memory_text = "\n".join([f"{m.namespace[1]}: {m.value}" for m in all_memories])
# # FIX: Only return the new message(s) added in this step, not the whole state.
# if memory_text:
# return {
# "messages": [
# HumanMessage(content=f"Relevant user memories:\n{memory_text}")
# ]
# }
# return {}
# -----------------------------
# Chat Node
# -----------------------------
def chat_node(state: ChatState):
    """Run the tool-using agent on the conversation and return what it added.

    Args:
        state: Graph state; ``state["messages"]`` is passed to the agent as-is.

    Returns:
        ``{"messages": [...]}`` with only the messages the agent produced.
    """
    runner = create_agent(llm, tools=tools)
    history = state["messages"]
    outcome = runner.invoke({"messages": history})

    # The agent's result starts with the messages it was given; slice that
    # prefix off so only the additions are returned.
    sent_count = len(history)
    return {"messages": outcome["messages"][sent_count:]}
# -----------------------------
# Graph
# -----------------------------
# Linear pipeline: analyze memory -> retrieve memory -> chat.
graph = StateGraph(ChatState)

# Register the nodes in pipeline order.
_NODES = (
    ("memory_analyzer", memory_analyzer),
    ("memory_retrieval", memory_retrieval),
    ("chat_node", chat_node),
)
for _node_name, _node_fn in _NODES:
    graph.add_node(_node_name, _node_fn)

# Connect each stage to the next, from START through to END.
_PIPELINE = (START, "memory_analyzer", "memory_retrieval", "chat_node", END)
for _src, _dst in zip(_PIPELINE, _PIPELINE[1:]):
    graph.add_edge(_src, _dst)

# Compile with both memory systems attached.
chatbot = graph.compile(checkpointer=checkpointer, store=store)
# -----------------------------
# Chat Loop
# -----------------------------
# config = {
# "configurable": {
# "thread_id": "thread_1",
# "user_id": "user_123"
# }
# }
# messages = []
# while True:
# user_input = input("\nUser: ")
# if user_input.lower() in ["quit", "exit"]:
# break
# messages.append(HumanMessage(content=user_input))
# result = chatbot.invoke(
# {"messages": messages},
# config
# )
# response = result["messages"][-1].content
# print("\nAssistant:", response)
# messages = result["messages"]