"""ReAct Agent implementation.""" import re from typing import List, Dict, Any, Optional from langchain_openai import ChatOpenAI from langchain_core.messages import SystemMessage, HumanMessage, AIMessage from tools.word_counter import WordCounter from tools.keyword_extractor import KeywordExtractor from tools.sentiment_analyzer import SentimentAnalyzer from agent.prompts import get_system_prompt, get_user_prompt class ReActAgent: """ReAct Agent for text analysis.""" def __init__(self, api_key: str, model: str = "gpt-4-turbo-preview", max_iterations: int = 10): """Initialize the ReAct agent. Args: api_key: OpenAI API key model: Model to use max_iterations: Maximum number of reasoning iterations """ # Initialize LangChain ChatOpenAI with LangSmith tracing self.llm = ChatOpenAI( model=model, openai_api_key=api_key, max_tokens=2048, temperature=0 ) self.model = model self.max_iterations = max_iterations # Initialize tools self.tools = { "word_counter": WordCounter(), "keyword_extractor": KeywordExtractor(), "sentiment_analyzer": SentimentAnalyzer() } # Generate tools description self.tools_description = self._generate_tools_description() def _generate_tools_description(self) -> str: """Generate description of all available tools. Returns: Formatted string describing all tools """ descriptions = [] for tool_name, tool in self.tools.items(): descriptions.append(f"- {tool_name}: {tool.description}") return "\n".join(descriptions) def _parse_action(self, text: str) -> Optional[tuple[str, str]]: """Parse action and action input from agent response. Args: text: Agent's response text Returns: Tuple of (action_name, action_input) or None if not found """ # Look for Action: and Action Input: patterns action_match = re.search(r'Action:\s*([^\n]+)', text, re.IGNORECASE) # Updated regex: match everything after "Action Input:" until end or next section # This handles cases where text is cut off by stop parameter input_match = re.search(r'Action Input:\s*(.+?)(?:\n(?:Thought|Action|Final Answer|Observation)|$)', text, re.IGNORECASE | re.DOTALL) if action_match and input_match: action = action_match.group(1).strip() action_input = input_match.group(1).strip() # Clean up the action name action = action.lower().replace(' ', '_') return action, action_input return None def _check_final_answer(self, text: str) -> Optional[str]: """Check if the response contains a final answer. Args: text: Agent's response text Returns: Final answer if found, None otherwise """ match = re.search(r'Final Answer:\s*(.+)', text, re.IGNORECASE | re.DOTALL) if match: return match.group(1).strip() return None def _execute_tool(self, tool_name: str, tool_input: str) -> Dict[str, Any]: """Execute a tool and return the result. Args: tool_name: Name of the tool to execute tool_input: Input to the tool Returns: Tool execution result """ if tool_name not in self.tools: return {"error": f"Unknown tool: {tool_name}"} try: tool = self.tools[tool_name] result = tool.run(tool_input) return result except Exception as e: return {"error": f"Tool execution failed: {str(e)}"} def run(self, question: str, text: str) -> Dict[str, Any]: """Run the ReAct agent. Args: question: User's question text: Text to analyze Returns: Dictionary containing the final answer and execution trace """ # Build initial prompt system_prompt = get_system_prompt(self.tools_description) user_prompt = get_user_prompt(question, text) # Initialize conversation conversation_history = [] trace = [] # Agent loop for iteration in range(self.max_iterations): # Call LLM try: # Build messages using LangChain format messages = [SystemMessage(content=system_prompt)] if not conversation_history: messages.append(HumanMessage(content=user_prompt)) else: messages.append(HumanMessage(content=user_prompt)) # Convert conversation history to LangChain messages for msg in conversation_history: if msg["role"] == "assistant": messages.append(AIMessage(content=msg["content"])) else: messages.append(HumanMessage(content=msg["content"])) # Call LLM with stop sequences response = self.llm.invoke( messages, stop=["Observation:", "\nObservation"] ) agent_response = response.content trace.append({ "iteration": iteration + 1, "type": "thought", "content": agent_response }) # Check for final answer final_answer = self._check_final_answer(agent_response) if final_answer: return { "answer": final_answer, "trace": trace, "iterations": iteration + 1, "success": True } # Parse and execute action action_result = self._parse_action(agent_response) if action_result: action_name, action_input = action_result # Execute tool observation = self._execute_tool(action_name, action_input) trace.append({ "iteration": iteration + 1, "type": "action", "action": action_name, "input": action_input, "output": observation }) # Add to conversation history conversation_history.append({ "role": "assistant", "content": agent_response }) conversation_history.append({ "role": "user", "content": f"Observation: {observation}" }) else: # No action found and no final answer if agent_response.strip(): # Only add if response is not empty conversation_history.append({ "role": "assistant", "content": agent_response }) conversation_history.append({ "role": "user", "content": "Please continue with your analysis. Remember to use the format: Thought, Action, Action Input, or provide a Final Answer." }) else: # Empty response, likely cut by stop parameter - skip this iteration continue except Exception as e: return { "answer": f"Error: {str(e)}", "trace": trace, "iterations": iteration + 1, "success": False, "error": str(e) } # Max iterations reached return { "answer": "Maximum iterations reached without finding a final answer.", "trace": trace, "iterations": self.max_iterations, "success": False, "error": "Max iterations exceeded" }