Final_Assignment_Template / src /flexible_agent.py
rqueraud's picture
50% mark
2099ec7
import os
import shutil
from typing import TypedDict, Optional, List, Annotated
from datetime import datetime
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage
from langgraph.graph import START, END, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from langfuse.langchain import CallbackHandler
try:
# Try relative imports first (when used as package)
from .tools import (
wikipedia_search, youtube_search, decode_text,
download_and_process_file, web_search, evaluate_computation
)
except ImportError:
# Fall back to absolute imports (when run directly)
from tools import (
wikipedia_search, youtube_search, decode_text,
download_and_process_file, web_search, evaluate_computation
)
from langchain_google_genai import ChatGoogleGenerativeAI
# --- Agent State following LangGraph pattern ---
class AgentState(TypedDict):
# Messages for LLM interactions (includes question and all conversation)
messages: Annotated[List[BaseMessage], add_messages]
# Task ID for file downloads
task_id: Optional[str]
# File classification results
requires_file: Optional[bool]
# Final answer
final_answer: Optional[str]
# --- Flexible Tool-Based Agent ---
class FlexibleAgent:
def __init__(self):
# Initialize Gemini chat model for LangChain integration
self.chat = ChatGoogleGenerativeAI(
model="gemini-2.5-flash",
temperature=0.0,
max_tokens=None
)
# Define available tools
self.tools = [
wikipedia_search,
youtube_search,
decode_text,
web_search,
download_and_process_file,
evaluate_computation
]
# Bind tools to the LLM
self.chat_with_tools = self.chat.bind_tools(self.tools)
# Initialize Langfuse CallbackHandler for tracing
try:
self.langfuse_handler = CallbackHandler()
print("βœ… Langfuse CallbackHandler initialized successfully")
except Exception as e:
print(f"⚠️ Warning: Could not initialize Langfuse CallbackHandler: {e}")
print(" Continuing without Langfuse tracing...")
self.langfuse_handler = None
# Create questions directory for logging
self.questions_dir = "questions"
# Clear previous question files
if os.path.exists(self.questions_dir):
shutil.rmtree(self.questions_dir)
os.makedirs(self.questions_dir, exist_ok=True)
self.question_counter = 0
# Build the graph following LangGraph pattern
self._build_graph()
print("FlexibleAgent initialized with Gemini LLM and LangGraph workflow.")
def log_full_conversation(self, question: str, final_state: dict, answer: str):
"""Log the complete conversation including all tool calls and LLM interactions"""
self.question_counter += 1
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"question_{self.question_counter:03d}_{timestamp}.txt"
filepath = os.path.join(self.questions_dir, filename)
with open(filepath, 'w', encoding='utf-8') as f:
f.write(f"Question #{self.question_counter}\n")
f.write(f"Timestamp: {datetime.now().isoformat()}\n")
f.write(f"Question: {question}\n")
f.write("="*60 + "\n")
f.write("FULL CONVERSATION TRACE:\n")
f.write("="*60 + "\n\n")
# Log all messages in the conversation
messages = final_state.get("messages", [])
for i, message in enumerate(messages):
f.write(f"--- Message {i+1}: {type(message).__name__} ---\n")
f.write(f"Content: {message.content}\n")
# If it's an AI message with tool calls, log the tool calls
if hasattr(message, 'tool_calls') and message.tool_calls:
f.write(f"Tool Calls: {len(message.tool_calls)}\n")
for j, tool_call in enumerate(message.tool_calls):
# Handle both dict and object formats
if hasattr(tool_call, 'name'):
f.write(f" Tool {j+1}: {tool_call.name}\n")
f.write(f" Arguments: {tool_call.args}\n")
f.write(f" ID: {tool_call.id}\n")
elif isinstance(tool_call, dict):
f.write(f" Tool {j+1}: {tool_call.get('name', 'unknown')}\n")
f.write(f" Arguments: {tool_call.get('args', {})}\n")
f.write(f" ID: {tool_call.get('id', 'unknown')}\n")
else:
f.write(f" Tool {j+1}: {str(tool_call)}\n")
# If it's a tool message, show which tool it came from
if hasattr(message, 'tool_call_id'):
f.write(f"Tool Call ID: {message.tool_call_id}\n")
f.write("\n")
f.write("="*60 + "\n")
f.write(f"FINAL ANSWER: {answer}\n")
f.write("="*60 + "\n")
print(f"Logged full conversation to: {filename}")
def classify_file_requirement(self, state: AgentState):
"""Check if question mentions an attached file"""
messages = state["messages"]
# Get the original question from first message
if messages and isinstance(messages[0], HumanMessage):
question = messages[0].content.lower()
# Simple keyword check for file attachments
file_keywords = ["attached", "attachment", "see the file", "in the file",
"i've attached", "attached as", "attached file"]
requires_file = any(keyword in question for keyword in file_keywords)
return {"requires_file": requires_file}
return {"requires_file": False}
def download_file_content(self, state: AgentState):
"""Download and add file content to messages"""
task_id = state.get("task_id")
if not task_id:
# Add error message
return {
"messages": [HumanMessage(content="Error: No task_id provided for file download")]
}
try:
# Use the download tool directly
file_result = download_and_process_file.invoke({"task_id": task_id})
# Add file content as a system message
return {
"messages": [HumanMessage(content=f"File content:\n{file_result}")]
}
except Exception as e:
return {
"messages": [HumanMessage(content=f"Error downloading file: {str(e)}")]
}
def call_model(self, state: AgentState):
"""Call the model with tools - it will decide what to do"""
messages = state["messages"]
response = self.chat_with_tools.invoke(messages)
return {"messages": [response]}
def analyze_tool_results(self, state: AgentState):
"""Analyze if tool results are sufficient to answer the question"""
analysis_prompt = """Based on the tool results above, think through the following:
1. Do you have enough information to answer the original question?
2. Are the tool results relevant and helpful?
3. Do you need to use another tool to get more information?
If you consider that you don't need to use another tool, then try to answer the question based on what infos you have, the best you can.
Think about the fact that the answer may formulated using synonyms or similar words to the ones used in the question.
Even if you are not able to youtube video, the result may be in the description of the video.
Provide your reasoning and conclude with either:
- "READY_TO_ANSWER" if you have sufficient information
- "NEED_MORE_TOOLS" if you need additional tool calls
Format your response as:
REASONING: [your analysis here]
CONCLUSION: [READY_TO_ANSWER or NEED_MORE_TOOLS]"""
messages = state["messages"] + [HumanMessage(content=analysis_prompt)]
response = self.chat.invoke(messages)
# Add the analysis to messages
return {"messages": [response]}
def route_after_analysis(self, state: AgentState) -> str:
"""Route based on whether we can answer or need more tools"""
messages = state["messages"]
# Get the last message (should be the analysis)
if messages:
last_message = messages[-1]
if isinstance(last_message, AIMessage):
content = last_message.content.upper()
# Check if ready to answer
if "READY_TO_ANSWER" in content:
return "extract_answer"
elif "NEED_MORE_TOOLS" in content:
return "call_model"
# Default: try to answer
return "extract_answer"
def extract_final_answer(self, state: AgentState):
"""Extract ONLY the final answer from the conversation"""
# Create a dedicated extraction prompt that looks at the entire conversation
extraction_prompt = """Based on all the information gathered above, provide ONLY the final answer to the original question.
Rules:
- Return ONLY the answer with NO explanations, sentences, or extra words
- If the answer is a number, write it in digits only
- No punctuation unless it's part of the answer
- No phrases like "The answer is" or "Based on..."
Examples:
- Question: "What is the capital of France?" β†’ Answer: Paris
- Question: "How much is 2+2?" β†’ Answer: 4
- Question: "What is the opposite of left?" β†’ Answer: right
Final answer only:"""
try:
# Use the full conversation context for extraction
messages = state["messages"] + [HumanMessage(content=extraction_prompt)]
response = self.chat.invoke(messages)
answer = response.content.strip()
# Return dict to update state (LangGraph requirement)
return {"final_answer": answer}
except Exception as e:
print(f"Answer extraction error: {e}")
# Fallback: get the last AI message content
messages = state["messages"]
for msg in reversed(messages):
if isinstance(msg, AIMessage) and not getattr(msg, 'tool_calls', None):
return {"final_answer": msg.content.strip()}
return {"final_answer": "No answer found"}
def route_after_classification(self, state: AgentState) -> str:
"""Route based on file requirement"""
if state.get("requires_file"):
return "download_file"
else:
return "call_model"
def _build_graph(self):
"""Build LangGraph workflow with reasoning/analysis step"""
graph = StateGraph(AgentState)
# Add nodes
graph.add_node("classify_file", self.classify_file_requirement)
graph.add_node("download_file", self.download_file_content)
graph.add_node("call_model", self.call_model)
graph.add_node("tools", ToolNode(self.tools))
graph.add_node("analyze_results", self.analyze_tool_results)
graph.add_node("extract_answer", self.extract_final_answer)
# Define the flow
graph.add_edge(START, "classify_file")
# After classification, either download file or go to model
graph.add_conditional_edges(
"classify_file",
self.route_after_classification,
{
"download_file": "download_file",
"call_model": "call_model"
}
)
# After downloading file, call model
graph.add_edge("download_file", "call_model")
# After model call, check if tools were called
graph.add_conditional_edges(
"call_model",
tools_condition, # Built-in function that checks for tool calls
{
"tools": "tools", # If tools called, execute them
END: "extract_answer", # No tools, go straight to answer
}
)
# After tools execute, analyze the results
graph.add_edge("tools", "analyze_results")
# After analysis, decide next step
graph.add_conditional_edges(
"analyze_results",
self.route_after_analysis,
{
"extract_answer": "extract_answer", # Ready to answer
"call_model": "call_model", # Need more tools
}
)
# After extracting answer, we're done
graph.add_edge("extract_answer", END)
# Compile the graph
self.compiled_graph = graph.compile()
def __call__(self, question: str, task_id: Optional[str] = None) -> str:
"""Process question using LangGraph workflow"""
print(f"Processing: {question[:50]}...")
# Create initial state with just the question as a message
initial_state = {
"messages": [HumanMessage(content=question)],
"task_id": task_id,
"requires_file": None,
"final_answer": None
}
try:
# Run the graph with Langfuse tracing
config = {"recursion_limit": 25}
# Add Langfuse callback handler if available
if self.langfuse_handler:
config["callbacks"] = [self.langfuse_handler]
print("πŸ”Œ Running with Langfuse tracing enabled")
result = self.compiled_graph.invoke(initial_state, config=config)
# Extract the final answer from the state
answer = result.get("final_answer", "No answer found")
print(f"Answer: {answer[:50]}...")
# Log the complete conversation for review
self.log_full_conversation(question, result, answer)
return answer
except Exception as e:
print(f"Error: {e}")
error_answer = "Error occurred."
# Create a minimal state for error logging
error_state = {
"question": question,
"messages": [
HumanMessage(content=question),
AIMessage(content=f"Error: {str(e)}")
]
}
self.log_full_conversation(question, error_state, error_answer)
return error_answer
if __name__ == "__main__":
print("Testing FlexibleAgent with a simple question...")
try:
# Create an instance of the agent
agent = FlexibleAgent()
# Test with a simple math question
test_question = "How much is 2+2?"
print(f"\nQuestion: {test_question}")
# Get the answer
answer = agent(test_question)
print(f"Answer: {answer}")
# Check if the answer is correct
if answer == "4":
print("βœ… Test passed! The agent correctly answered the math question.")
else:
print("❌ Test failed. Expected the answer to be '4'.")
answer = agent("What is the surname of the equine veterinarian mentioned in 1.E Exercises from the chemistry materials licensed by Marisa Alviar-Agnew & Henry Agnew under the CK-12 license in LibreText's Introductory Chemistry materials as compiled 08/21/2023?")
print(f"Answer: {answer}")
if answer == "Louvrier":
print("βœ… Test passed! The agent correctly answered the question.")
else:
print("❌ Test failed. Expected the answer to contain 'Louvrier'.")
answer = agent("In the video https://www.youtube.com/watch?v=L1vXCYZAYYM, what is the highest number of bird species to be on camera simultaneously?")
print(f"Answer: {answer}")
if answer == "3":
print("βœ… Test passed! The agent correctly answered the question.")
else:
print("❌ Test failed. Expected the answer to be '3'.")
except Exception as e:
import traceback
print(f"❌ Test failed with error: {e}")
print("Full traceback:")
traceback.print_exc()