from langchain_openai import ChatOpenAI from langchain.agents import initialize_agent, AgentType from clearml import Logger, Task from langchain.tools import Tool from langchain.memory import ConversationBufferMemory, SimpleMemory from langchain_community.callbacks import ClearMLCallbackHandler from langchain_core.callbacks import StdOutCallbackHandler from langchain_community.utilities import SerpAPIWrapper import agent.router_agent as router_agent import agent.product_review_agent as product_review_agent import agent.generic_agent as generic_agent import agent.composer_agent as composer_agent import logging # Set httpx (HTTP request) logging to WARNING or ERROR level # This will hide the HTTP request logs while keeping agent thoughts visible logging.getLogger("httpx").setLevel(logging.WARNING) # added on 23-Nob # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Global variables llm = None chat_memory = None query_memory = None agent = None clearml_callback = None def initialize_planning_agent(llm_instance, chat_memory_instance, query_memory_instance, clearml_instance): global llm, chat_memory, query_memory, agent, clearml_callback llm = llm_instance chat_memory = chat_memory_instance query_memory = query_memory_instance clearml_callback = clearml_instance SERPAPI_API_KEY = "619f2302253fbe56448bcf82565caf2a3263d845944682533f10b09a0d1650e6" # Initialize agents router_agent.initialize_router_agent(llm, chat_memory) product_review_agent.initialize_product_review_agent(llm, chat_memory) generic_agent.initialize_generic_agent(llm, chat_memory) # composer_agent.initialize_composer_agent(llm, memory) #ltool = Tool("serpapi", llm=llm, callbacks=clearml_callback) #ltool2 = load_tools("llm-math", llm=llm, callbacks=clearml_callback) tools = [ Tool( name="route_query", func=route_query, description="Determine query type. Returns either 'product_review' or 'generic'" ), Tool( name="get_product_info", func=get_product_info, description="Use this to get product-related data such as features, prices, availability, or reviews" ), Tool( name="handle_generic_query", func=handle_generic_query, description="Use this to get response to user queries which are generic and where the retrieval of product details are not required" ), Tool( name="compose_response", func=compose_response, description="Use this to only format the response. After this step, return the formatted response to main.py" ) ] system_prompt = """You are an efficient AI planning agent. Follow these rules strictly: CRITICAL INSTRUCTION: For simple queries listed below, skip the route_query and directly go to handle_generic_query. SIMPLE QUERIES (NEVER use tools): 1. Greetings: "hi", "hello", "hey", "good morning", "good evening", "good afternoon" 2. Farewells: "bye", "goodbye", "see you", "take care" 3. Thank you messages: "thanks", "thank you", "thanks a lot", "appreciate it" 4. Simple confirmations: "okay", "yes", "no", "sure", "alright" 5. Basic courtesy: "how are you?", "how are you doing?", "what's up?", "what are you doing?" 6. Simple acknowledgments: "got it", "understood", "I see" FOR ALL OTHER QUERIES: 1. Use route_query to determine if query is product_review or generic 2. If route_query returns 'generic', use handle_generic_query and STOP 3. If route_query returns 'product_review', use get_product_info and STOP EXAMPLES: User: "Hi" Thought: This is a Simple greeting, I will use handle_generic_query to get appropriate response Action: handle_generic_query Observation: "Hi! How can I help you today?" Thought: I have got the final answer. I will use compose_responses to format the response. Action: compose_responses Final Answer: "Hi! How can I help you today?" User: "I got my package delivered yesterday. It was delivered very late. I want to file a complaint." Thought: This is a generic query that does not require product details. I will use handle_generic_query to get appropriate response. Action: handle_generic_query Action Input: User query: I got my package delivered yesterday. It was delivered very late. I want to file a complaint. Observation: {'intermediate_steps': [], 'output': "I'm sorry to hear about the delay in your package delivery. I understand your frustration and I'm here to assist you with filing a complaint. To better assist you, could you please provide me with the tracking number of your package? Once I have that information, I will look into the matter and ensure that your feedback is addressed appropriately.", 'action': 'Final Answer', 'action_input': "I'm sorry to hear about the delay in your package delivery. I understand your frustration and I'm here to assist you with filing a complaint. To better assist you, could you please provide me with the tracking number of your package? Once I have that information, I will look into the matter and ensure that your feedback is addressed appropriately."} Thought:I have got the final answer. I will use compose_responses to format the response. Action: compose_responses Final Answer: I'm sorry to hear about the delay in your package delivery. I understand your frustration and I'm here to assist you with filing a complaint. To better assist you, could you please provide me with the tracking number of your package? Once I have that information, I will look into the matter and ensure that your feedback is addressed appropriately. Remember: For simple queries listed above, respond immediately with Final Answer WITHOUT using tools. """ agent = initialize_agent( tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, callbacks=clearml_callback, verbose=True, memory=chat_memory, system_message=system_prompt, early_stopping_method="generate", max_iterations=2 ) logger.info("Planning agent initialized successfully") def summarize_chat_history(): """Summarize the chat history to retain context without overwhelming memory.""" try: if chat_memory: # Retrieve the chat history as a list of messages chat_history = chat_memory.buffer # Assuming this is a list of messages logger.info(f"Chat history type: {type(chat_history)}") if chat_history: # Extract text from AIMessage objects text_messages = [msg.content for msg in chat_history if hasattr(msg, 'content')] logger.info(f"Extracted messages: {text_messages}") # Get the last 5 messages for the summary summary = "\n".join(text_messages[-5:]) # Adjust as needed logger.info(f"Generated summary: {summary}") # Clear the old history chat_memory.clear() # Clear the old history # If the memory allows appending, do that chat_memory.buffer.append(summary) # Append the summary # Or if there's a method to set the buffer, do that: # chat_memory.set_memory([summary]) # If such a method exists except Exception as e: logger.error(f"Error summarizing chat history: {str(e)}") def route_query(query): # Summarize chat history before routing summarize_chat_history() # Get original query from memory if needed original_query = query_memory.memories.get('original_query', query) return router_agent.classify_query(original_query) def get_product_info(query): # Summarize chat history before retrieving product info summarize_chat_history() # Get original query from memory if needed original_query = query_memory.memories.get('original_query', query) response = product_review_agent.process(original_query) return { "intermediate_steps": [], "output": response, "action": "Final Answer", "action_input": response } def handle_generic_query(query): # Summarize chat history before handling generic queries summarize_chat_history() # Get original query from memory if needed original_query = query_memory.memories.get('original_query', query) response = generic_agent.process(original_query) return { "intermediate_steps": [], "output": response, "action": "Final Answer", "action_input": response } def compose_response(response): return composer_agent.compose_response(response) def execute(query): try: # Store original query query_memory.memories['original_query'] = query response = agent.run( f"Process this user query: {query}") # clearml_callback.flush_tracker(langchain_asset=agent,name="Planning agent", finish=True) #clearml_callback.flush_tracker(langchain_asset=llm,name="Planning agent") return response except Exception as e: logger.error(f"Error in planning agent: {str(e)}") return f"Error in planning agent: {str(e)}" def clear_context(): if chat_memory: chat_memory.clear() if query_memory: query_memory.memories.clear() product_review_agent.clear_context() generic_agent.clear_context()