"""Tool-using question-answering agent built on LangGraph and Gemini.

The module defines :class:`FlexibleAgent`, a callable agent that routes a
question through a LangGraph workflow: optional file download, iterative
tool-calling with the LLM, a reasoning/analysis step, and final answer
extraction.  Every full conversation is logged to a per-question text file
under ``questions/`` for offline review.
"""

import os
import shutil
from datetime import datetime
from typing import Annotated, List, Optional, TypedDict

from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from langfuse.langchain import CallbackHandler
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition

try:
    # Try relative imports first (when used as a package).
    from .tools import (
        decode_text,
        download_and_process_file,
        evaluate_computation,
        web_search,
        wikipedia_search,
        youtube_search,
    )
except ImportError:
    # Fall back to absolute imports (when run directly as a script).
    from tools import (
        decode_text,
        download_and_process_file,
        evaluate_computation,
        web_search,
        wikipedia_search,
        youtube_search,
    )

from langchain_google_genai import ChatGoogleGenerativeAI


# --- Agent State following LangGraph pattern ---
class AgentState(TypedDict):
    """State threaded through the LangGraph workflow."""

    # Messages for LLM interactions (includes question and all conversation).
    # `add_messages` makes node return values append rather than replace.
    messages: Annotated[List[BaseMessage], add_messages]
    # Task ID used by the file-download tool.
    task_id: Optional[str]
    # Result of the file-attachment classification step.
    requires_file: Optional[bool]
    # The extracted final answer.
    final_answer: Optional[str]


# --- Flexible Tool-Based Agent ---
class FlexibleAgent:
    """LangGraph agent that answers questions with Gemini plus tools.

    Instances are callable: ``agent(question, task_id)`` runs the full
    workflow and returns the extracted answer string.
    """

    def __init__(self) -> None:
        # Initialize Gemini chat model for LangChain integration.
        self.chat = ChatGoogleGenerativeAI(
            model="gemini-2.5-flash",
            temperature=0.0,
            max_tokens=None,
        )

        # Tools the LLM may call during the workflow.
        self.tools = [
            wikipedia_search,
            youtube_search,
            decode_text,
            web_search,
            download_and_process_file,
            evaluate_computation,
        ]

        # Bind tools to the LLM so it can emit tool calls.
        self.chat_with_tools = self.chat.bind_tools(self.tools)

        # Initialize Langfuse CallbackHandler for tracing; tracing is
        # optional, so failures are downgraded to a warning.
        try:
            self.langfuse_handler = CallbackHandler()
            print("✅ Langfuse CallbackHandler initialized successfully")
        except Exception as e:
            print(f"⚠️ Warning: Could not initialize Langfuse CallbackHandler: {e}")
            print("   Continuing without Langfuse tracing...")
            self.langfuse_handler = None

        # Create questions directory for logging; previous runs' logs are
        # cleared deliberately so each session starts fresh.
        self.questions_dir = "questions"
        if os.path.exists(self.questions_dir):
            shutil.rmtree(self.questions_dir)
        os.makedirs(self.questions_dir, exist_ok=True)
        self.question_counter = 0

        # Build the graph following the LangGraph pattern.
        self._build_graph()
        print("FlexibleAgent initialized with Gemini LLM and LangGraph workflow.")

    def log_full_conversation(self, question: str, final_state: dict, answer: str) -> None:
        """Log the complete conversation including all tool calls and LLM interactions.

        Writes a numbered, timestamped text file under ``self.questions_dir``
        containing every message in ``final_state["messages"]`` plus the
        final answer.
        """
        self.question_counter += 1
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"question_{self.question_counter:03d}_{timestamp}.txt"
        filepath = os.path.join(self.questions_dir, filename)

        with open(filepath, "w", encoding="utf-8") as f:
            f.write(f"Question #{self.question_counter}\n")
            f.write(f"Timestamp: {datetime.now().isoformat()}\n")
            f.write(f"Question: {question}\n")
            f.write("=" * 60 + "\n")
            f.write("FULL CONVERSATION TRACE:\n")
            f.write("=" * 60 + "\n\n")

            # Log all messages in the conversation.
            messages = final_state.get("messages", [])
            for i, message in enumerate(messages):
                f.write(f"--- Message {i+1}: {type(message).__name__} ---\n")
                f.write(f"Content: {message.content}\n")

                # If it's an AI message with tool calls, log the tool calls.
                if hasattr(message, "tool_calls") and message.tool_calls:
                    f.write(f"Tool Calls: {len(message.tool_calls)}\n")
                    for j, tool_call in enumerate(message.tool_calls):
                        # Handle both dict and object formats.
                        if hasattr(tool_call, "name"):
                            f.write(f"  Tool {j+1}: {tool_call.name}\n")
                            f.write(f"  Arguments: {tool_call.args}\n")
                            f.write(f"  ID: {tool_call.id}\n")
                        elif isinstance(tool_call, dict):
                            f.write(f"  Tool {j+1}: {tool_call.get('name', 'unknown')}\n")
                            f.write(f"  Arguments: {tool_call.get('args', {})}\n")
                            f.write(f"  ID: {tool_call.get('id', 'unknown')}\n")
                        else:
                            f.write(f"  Tool {j+1}: {str(tool_call)}\n")

                # If it's a tool message, show which tool call it answers.
                if hasattr(message, "tool_call_id"):
                    f.write(f"Tool Call ID: {message.tool_call_id}\n")
                f.write("\n")

            f.write("=" * 60 + "\n")
            f.write(f"FINAL ANSWER: {answer}\n")
            f.write("=" * 60 + "\n")

        # BUG FIX: previously printed a literal placeholder instead of the
        # actual log file path.
        print(f"Logged full conversation to: {filepath}")

    def classify_file_requirement(self, state: AgentState):
        """Check if the question mentions an attached file.

        Returns a partial state update with ``requires_file`` set from a
        simple keyword scan of the first (human) message.
        """
        messages = state["messages"]

        # Get the original question from the first message.
        if messages and isinstance(messages[0], HumanMessage):
            question = messages[0].content.lower()

            # Simple keyword check for file attachments.
            file_keywords = [
                "attached",
                "attachment",
                "see the file",
                "in the file",
                "i've attached",
                "attached as",
                "attached file",
            ]
            requires_file = any(keyword in question for keyword in file_keywords)
            return {"requires_file": requires_file}

        return {"requires_file": False}

    def download_file_content(self, state: AgentState):
        """Download the task's file and append its content to the messages.

        Errors (missing task_id or download failure) are surfaced to the
        LLM as messages rather than raised, so the graph keeps running.
        """
        task_id = state.get("task_id")
        if not task_id:
            # Add an error message instead of failing the graph run.
            return {
                "messages": [
                    HumanMessage(content="Error: No task_id provided for file download")
                ]
            }

        try:
            # Use the download tool directly (outside the ToolNode).
            file_result = download_and_process_file.invoke({"task_id": task_id})
            # Add the file content as a message for the LLM to read.
            return {
                "messages": [HumanMessage(content=f"File content:\n{file_result}")]
            }
        except Exception as e:
            return {
                "messages": [HumanMessage(content=f"Error downloading file: {str(e)}")]
            }

    def call_model(self, state: AgentState):
        """Call the tool-bound model; it decides whether to emit tool calls."""
        messages = state["messages"]
        response = self.chat_with_tools.invoke(messages)
        return {"messages": [response]}

    def analyze_tool_results(self, state: AgentState):
        """Ask the LLM whether tool results suffice to answer the question.

        The response ends with READY_TO_ANSWER or NEED_MORE_TOOLS, which
        :meth:`route_after_analysis` uses to route the graph.
        """
        # NOTE: minor wording fixes applied to the prompt ("may be
        # formulated", "able to watch the youtube video").
        analysis_prompt = """Based on the tool results above, think through the following:
1. Do you have enough information to answer the original question?
2. Are the tool results relevant and helpful?
3. Do you need to use another tool to get more information?

If you consider that you don't need to use another tool, then try to answer the question based on what infos you have, the best you can.
Think about the fact that the answer may be formulated using synonyms or similar words to the ones used in the question.
Even if you are not able to watch the youtube video, the result may be in the description of the video.

Provide your reasoning and conclude with either:
- "READY_TO_ANSWER" if you have sufficient information
- "NEED_MORE_TOOLS" if you need additional tool calls

Format your response as:
REASONING: [your analysis here]
CONCLUSION: [READY_TO_ANSWER or NEED_MORE_TOOLS]"""

        messages = state["messages"] + [HumanMessage(content=analysis_prompt)]
        # Use the plain model here: the analysis step must not call tools.
        response = self.chat.invoke(messages)

        # Add the analysis to the conversation.
        return {"messages": [response]}

    def route_after_analysis(self, state: AgentState) -> str:
        """Route to answer extraction or another model/tool round.

        Inspects the last (analysis) message for the READY_TO_ANSWER /
        NEED_MORE_TOOLS marker; defaults to extracting an answer.
        """
        messages = state["messages"]

        # The last message should be the analysis response.
        if messages:
            last_message = messages[-1]
            if isinstance(last_message, AIMessage):
                content = last_message.content.upper()
                if "READY_TO_ANSWER" in content:
                    return "extract_answer"
                elif "NEED_MORE_TOOLS" in content:
                    return "call_model"

        # Default: try to answer with what we have.
        return "extract_answer"

    def extract_final_answer(self, state: AgentState):
        """Extract ONLY the final answer from the conversation.

        Prompts the LLM for a bare answer; on failure, falls back to the
        last AI message without tool calls.
        """
        extraction_prompt = """Based on all the information gathered above, provide ONLY the final answer to the original question.

Rules:
- Return ONLY the answer with NO explanations, sentences, or extra words
- If the answer is a number, write it in digits only
- No punctuation unless it's part of the answer
- No phrases like "The answer is" or "Based on..."

Examples:
- Question: "What is the capital of France?" → Answer: Paris
- Question: "How much is 2+2?" → Answer: 4
- Question: "What is the opposite of left?" → Answer: right

Final answer only:"""

        try:
            # Use the full conversation context for extraction.
            messages = state["messages"] + [HumanMessage(content=extraction_prompt)]
            response = self.chat.invoke(messages)
            answer = response.content.strip()
            # Return a dict to update state (LangGraph requirement).
            return {"final_answer": answer}
        except Exception as e:
            print(f"Answer extraction error: {e}")
            # Fallback: the most recent AI message that isn't a tool call.
            messages = state["messages"]
            for msg in reversed(messages):
                if isinstance(msg, AIMessage) and not getattr(msg, "tool_calls", None):
                    return {"final_answer": msg.content.strip()}
            return {"final_answer": "No answer found"}

    def route_after_classification(self, state: AgentState) -> str:
        """Route to the file-download node or straight to the model."""
        if state.get("requires_file"):
            return "download_file"
        else:
            return "call_model"

    def _build_graph(self) -> None:
        """Build the LangGraph workflow with a reasoning/analysis step.

        Flow: classify_file -> (download_file ->) call_model -> tools ->
        analyze_results -> (call_model | extract_answer) -> END.
        """
        graph = StateGraph(AgentState)

        # Nodes.
        graph.add_node("classify_file", self.classify_file_requirement)
        graph.add_node("download_file", self.download_file_content)
        graph.add_node("call_model", self.call_model)
        graph.add_node("tools", ToolNode(self.tools))
        graph.add_node("analyze_results", self.analyze_tool_results)
        graph.add_node("extract_answer", self.extract_final_answer)

        # Entry point.
        graph.add_edge(START, "classify_file")

        # After classification, either download the file or go to the model.
        graph.add_conditional_edges(
            "classify_file",
            self.route_after_classification,
            {
                "download_file": "download_file",
                "call_model": "call_model",
            },
        )

        # After downloading the file, call the model.
        graph.add_edge("download_file", "call_model")

        # After the model call, check if tools were requested.
        graph.add_conditional_edges(
            "call_model",
            tools_condition,  # Built-in check for pending tool calls.
            {
                "tools": "tools",          # Tool calls pending: execute them.
                END: "extract_answer",     # No tools: go straight to answer.
            },
        )

        # After tools execute, analyze the results.
        graph.add_edge("tools", "analyze_results")

        # After analysis, decide the next step.
        graph.add_conditional_edges(
            "analyze_results",
            self.route_after_analysis,
            {
                "extract_answer": "extract_answer",  # Ready to answer.
                "call_model": "call_model",          # Need more tools.
            },
        )

        # After extracting the answer, we're done.
        graph.add_edge("extract_answer", END)

        # Compile the graph.
        self.compiled_graph = graph.compile()

    def __call__(self, question: str, task_id: Optional[str] = None) -> str:
        """Process a question through the LangGraph workflow.

        Args:
            question: The natural-language question to answer.
            task_id: Optional task identifier used for file downloads.

        Returns:
            The extracted answer string, or "Error occurred." on failure.
        """
        print(f"Processing: {question[:50]}...")

        # Initial state: just the question as a human message.
        initial_state = {
            "messages": [HumanMessage(content=question)],
            "task_id": task_id,
            "requires_file": None,
            "final_answer": None,
        }

        try:
            # Run the graph, optionally with Langfuse tracing.
            config = {"recursion_limit": 25}
            if self.langfuse_handler:
                config["callbacks"] = [self.langfuse_handler]
                print("🔌 Running with Langfuse tracing enabled")

            result = self.compiled_graph.invoke(initial_state, config=config)

            # Extract the final answer from the state.
            answer = result.get("final_answer", "No answer found")
            print(f"Answer: {answer[:50]}...")

            # Log the complete conversation for review.
            self.log_full_conversation(question, result, answer)
            return answer
        except Exception as e:
            print(f"Error: {e}")
            error_answer = "Error occurred."
            # Minimal state for error logging (only "messages" is read).
            error_state = {
                "messages": [
                    HumanMessage(content=question),
                    AIMessage(content=f"Error: {str(e)}"),
                ]
            }
            self.log_full_conversation(question, error_state, error_answer)
            return error_answer


if __name__ == "__main__":
    print("Testing FlexibleAgent with a simple question...")
    try:
        # Create an instance of the agent.
        agent = FlexibleAgent()

        # Test with a simple math question.
        test_question = "How much is 2+2?"
        print(f"\nQuestion: {test_question}")
        answer = agent(test_question)
        print(f"Answer: {answer}")
        if answer == "4":
            print("✅ Test passed! The agent correctly answered the math question.")
        else:
            print("❌ Test failed. Expected the answer to be '4'.")

        # Test a retrieval-heavy question.
        answer = agent(
            "What is the surname of the equine veterinarian mentioned in 1.E Exercises from the chemistry materials licensed by Marisa Alviar-Agnew & Henry Agnew under the CK-12 license in LibreText's Introductory Chemistry materials as compiled 08/21/2023?"
        )
        print(f"Answer: {answer}")
        if answer == "Louvrier":
            print("✅ Test passed! The agent correctly answered the question.")
        else:
            print("❌ Test failed. Expected the answer to contain 'Louvrier'.")

        # Test a YouTube-based question.
        answer = agent(
            "In the video https://www.youtube.com/watch?v=L1vXCYZAYYM, what is the highest number of bird species to be on camera simultaneously?"
        )
        print(f"Answer: {answer}")
        if answer == "3":
            print("✅ Test passed! The agent correctly answered the question.")
        else:
            print("❌ Test failed. Expected the answer to be '3'.")
    except Exception as e:
        import traceback

        print(f"❌ Test failed with error: {e}")
        print("Full traceback:")
        traceback.print_exc()