"""LangGraph chatbot with short-term (checkpoint) and long-term (store) memory.

The compiled graph runs three nodes in sequence:

    START -> memory_analyzer -> memory_retrieval -> chat_node -> END

``memory_analyzer`` asks the LLM whether the latest message contains something
worth remembering and writes it to the store; ``memory_retrieval`` reads stored
memories back and injects them into the conversation; ``chat_node`` runs the
tool-calling agent.  ``get_last_ai_message`` and ``extract_tool_logs`` are
presentation helpers that operate on a finished message list.
"""

import json
import logging
from typing import Annotated, TypedDict

from dotenv import load_dotenv
from langchain.agents import create_agent
from langchain_core.messages import (
    AIMessage,
    BaseMessage,
    HumanMessage,
    SystemMessage,
    ToolMessage,
)
from langchain_groq import ChatGroq
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
from langgraph.store.memory import InMemoryStore

from tool import (
    calculator,
    format_json,
    generate_sql,
    get_weather,
    list_folders,
    python_exec,
    read_file,
    save_user_preference,
    scrape_website,
    system_info,
    time_date,
    tool_tavily,
    wikipedia_search,
)

load_dotenv()

logger = logging.getLogger(__name__)

tools = [
    tool_tavily,
    time_date,
    calculator,
    python_exec,
    get_weather,
    wikipedia_search,
    scrape_website,
    read_file,
    format_json,
    generate_sql,
    system_info,
    save_user_preference,
    list_folders,
]

# -----------------------------
# Memory Systems
# -----------------------------
# Short-term memory: per-thread conversation checkpoints.
checkpointer = InMemorySaver()
# Long-term memory: key/value items namespaced by (user_id, category).
store = InMemoryStore()

# Categories shared by the analyzer (write side) and the retrieval node (read
# side).  Keeping one definition guarantees nothing is stored under a category
# that retrieval never searches.
MEMORY_CATEGORIES = ("preferences", "profile", "interests", "project")

# -----------------------------
# LLM
# -----------------------------
llm = ChatGroq(
    model="qwen/qwen3-32b",
    temperature=0,
    reasoning_format="parsed",
)

# The agent depends only on the LLM and the tool list, so it is built once
# here instead of on every chat_node invocation.
_agent = create_agent(llm, tools=tools)


# -----------------------------
# Presentation helpers
# -----------------------------
def get_last_ai_message(messages) -> str:
    """Return the content of the most recent non-empty AI message.

    Args:
        messages: Conversation messages, oldest first.

    Returns:
        The content of the last ``AIMessage`` whose content is truthy
        (stringified when it is not already a ``str``), or ``"No response"``
        when there is no such message.
    """
    for msg in reversed(messages):
        if isinstance(msg, AIMessage) and msg.content:
            return msg.content if isinstance(msg.content, str) else str(msg.content)
    return "No response"


def extract_tool_logs(messages, current_user_message) -> str:
    """Render the tool calls and tool outputs of the latest turn as Markdown.

    The latest turn is everything from the most recent ``HumanMessage`` whose
    content equals ``current_user_message`` up to the end of ``messages``.  If
    no such message exists, the whole list is used.

    Args:
        messages: Conversation messages, oldest first.
        current_user_message: Text of the user message that started the turn.

    Returns:
        Markdown with one section per tool call and per tool output (outputs
        truncated to 1000 characters), or a placeholder line when the turn
        used no tools.
    """
    latest_turn = []
    found_user_message = False

    # Walk backwards until we reach the message that started the current turn.
    for msg in reversed(messages):
        latest_turn.append(msg)
        if isinstance(msg, HumanMessage) and msg.content == current_user_message:
            found_user_message = True
            break

    latest_turn = latest_turn[::-1] if found_user_message else messages

    logs = []
    for msg in latest_turn:
        if isinstance(msg, AIMessage) and getattr(msg, "tool_calls", None):
            # The AI asked for one or more tools to be called.
            for call in msg.tool_calls:
                tool_name = call.get("name", "unknown_tool")
                tool_args = call.get("args", {})
                logs.append(
                    f"### 🔧 Calling `{tool_name}`\n"
                    f"```json\n{json.dumps(tool_args, indent=2)}\n```"
                )
        elif isinstance(msg, ToolMessage):
            # `name` exists on ToolMessage but defaults to None, so a getattr
            # default would never apply; fall back with `or` instead.
            tool_name = getattr(msg, "name", None) or "unknown_tool"
            tool_output = msg.content if isinstance(msg.content, str) else str(msg.content)
            logs.append(
                f"### ✅ Output from `{tool_name}`\n"
                f"```\n{tool_output[:1000]}\n```"
            )

    return "\n\n".join(logs) if logs else "_No tools used in this response._"


# -----------------------------
# State
# -----------------------------
class ChatState(TypedDict):
    """Graph state: the message history, merged via the add_messages reducer."""

    messages: Annotated[list[BaseMessage], add_messages]


# -----------------------------
# Memory Analyzer Node
# -----------------------------
_MEMORY_PROMPT = """\
Analyze the message and decide if it contains useful long-term memory.

Possible memory categories:
- preferences
- profile
- interests
- project

Return JSON format:
{{
  "store_memory": true/false,
  "category": "preferences | profile | interests | project",
  "key": "memory_key",
  "value": {{}}
}}

Message:
{message}
"""


def _parse_json_object(text):
    """Return the JSON object embedded in ``text``, or None if there is none.

    Models frequently wrap JSON in code fences or surround it with prose, so
    the candidate is taken from the first ``{`` to the last ``}``.
    """
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        parsed = json.loads(text[start:end + 1])
    except json.JSONDecodeError:
        return None
    return parsed if isinstance(parsed, dict) else None


def memory_analyzer(state: ChatState, config, store):
    """Ask the LLM whether the latest message should be stored as memory.

    When the model answers with ``store_memory`` set and a valid category and
    key, the value is written to ``store`` under ``(user_id, category)``.
    Storing is best-effort: malformed model output or a failing store write is
    logged and never interrupts the conversation.

    Args:
        state: Current graph state; the last message is analyzed.
        config: Run config; ``config["configurable"]["user_id"]`` is required.
        store: Long-term store exposing ``put(namespace, key, value)``.

    Returns:
        An empty dict; this node never changes the graph state.

    Raises:
        KeyError: If ``user_id`` is missing from the config.
    """
    messages = state["messages"]
    if not messages:
        return {}

    user_id = config["configurable"]["user_id"]
    prompt = _MEMORY_PROMPT.format(message=messages[-1].content)
    result = llm.invoke([HumanMessage(content=prompt)])
    reply = result.content if isinstance(result.content, str) else str(result.content)

    decision = _parse_json_object(reply)
    if decision is None:
        logger.warning(
            "memory_analyzer: model reply was not a JSON object: %r", reply[:200]
        )
        return {}
    if not decision.get("store_memory"):
        return {}

    category = decision.get("category")
    key = decision.get("key")
    value = decision.get("value")
    if category not in MEMORY_CATEGORIES or not isinstance(key, str) or not key:
        logger.warning(
            "memory_analyzer: ignoring memory with invalid category/key: %r", decision
        )
        return {}
    if not isinstance(value, dict):
        # The store holds dict values; wrap scalars/lists the model may emit.
        value = {"value": value}

    try:
        store.put((user_id, category), key, value)
    except Exception:
        # Best-effort boundary: losing one memory must not break the chat turn.
        logger.exception("memory_analyzer: failed to store memory %r", key)
    else:
        logger.info("Stored memory: %s", decision)

    return {}


# -----------------------------
# Memory Retrieval Node
# -----------------------------
def memory_retrieval(state: ChatState, config, store):
    """Inject stored memories for the current user into the conversation.

    Searches every category in ``MEMORY_CATEGORIES`` (up to 3 hits each) using
    the latest message content as the query.

    Args:
        state: Current graph state; the last message is used as the query.
        config: Run config; ``config["configurable"]["user_id"]`` is required.
        store: Long-term store exposing ``search(namespace, query=, limit=)``.

    Returns:
        ``{"messages": [HumanMessage]}`` containing only the new memory
        message when at least one memory was found, otherwise an empty dict.
        The incoming state is never mutated; the ``add_messages`` reducer
        appends the returned message to the history.

    Raises:
        KeyError: If ``user_id`` is missing from the config.
    """
    messages = state["messages"]
    if not messages:
        return {}

    user_id = config["configurable"]["user_id"]
    query = messages[-1].content

    memories = []
    for category in MEMORY_CATEGORIES:
        memories.extend(store.search((user_id, category), query=query, limit=3))

    if not memories:
        return {}

    memory_text = "\n".join(f"{m.namespace[1]}: {m.value}" for m in memories)
    return {
        "messages": [
            HumanMessage(content=f"Relevant user memories:\n{memory_text}")
        ]
    }


# -----------------------------
# Chat Node
# -----------------------------
def chat_node(state: ChatState):
    """Run the tool-calling agent on the conversation so far.

    Args:
        state: Current graph state.

    Returns:
        ``{"messages": [...]}`` with only the messages the agent added (its
        result echoes the input messages first, so those are sliced off).
    """
    messages = state["messages"]
    response = _agent.invoke({"messages": messages})
    return {"messages": response["messages"][len(messages):]}


# -----------------------------
# Graph
# -----------------------------
graph = StateGraph(ChatState)

graph.add_node("memory_analyzer", memory_analyzer)
graph.add_node("memory_retrieval", memory_retrieval)
graph.add_node("chat_node", chat_node)

graph.add_edge(START, "memory_analyzer")
graph.add_edge("memory_analyzer", "memory_retrieval")
graph.add_edge("memory_retrieval", "chat_node")
graph.add_edge("chat_node", END)

chatbot = graph.compile(
    checkpointer=checkpointer,
    store=store,
)

# -----------------------------
# Chat Loop (example usage, intentionally disabled)
# -----------------------------
# config = {
#     "configurable": {
#         "thread_id": "thread_1",
#         "user_id": "user_123"
#     }
# }
# messages = []
# while True:
#     user_input = input("\nUser: ")
#     if user_input.lower() in ["quit", "exit"]:
#         break
#     messages.append(HumanMessage(content=user_input))
#     result = chatbot.invoke(
#         {"messages": messages},
#         config
#     )
#     response = result["messages"][-1].content
#     print("\nAssistant:", response)
#     messages = result["messages"]