Spaces:
Sleeping
Sleeping
| """ReAct Agent implementation.""" | |
| import re | |
| from typing import List, Dict, Any, Optional | |
| from langchain_openai import ChatOpenAI | |
| from langchain_core.messages import SystemMessage, HumanMessage, AIMessage | |
| from tools.word_counter import WordCounter | |
| from tools.keyword_extractor import KeywordExtractor | |
| from tools.sentiment_analyzer import SentimentAnalyzer | |
| from agent.prompts import get_system_prompt, get_user_prompt | |
| class ReActAgent: | |
| """ReAct Agent for text analysis.""" | |
| def __init__(self, api_key: str, model: str = "gpt-4-turbo-preview", max_iterations: int = 10): | |
| """Initialize the ReAct agent. | |
| Args: | |
| api_key: OpenAI API key | |
| model: Model to use | |
| max_iterations: Maximum number of reasoning iterations | |
| """ | |
| # Initialize LangChain ChatOpenAI with LangSmith tracing | |
| self.llm = ChatOpenAI( | |
| model=model, | |
| openai_api_key=api_key, | |
| max_tokens=2048, | |
| temperature=0 | |
| ) | |
| self.model = model | |
| self.max_iterations = max_iterations | |
| # Initialize tools | |
| self.tools = { | |
| "word_counter": WordCounter(), | |
| "keyword_extractor": KeywordExtractor(), | |
| "sentiment_analyzer": SentimentAnalyzer() | |
| } | |
| # Generate tools description | |
| self.tools_description = self._generate_tools_description() | |
| def _generate_tools_description(self) -> str: | |
| """Generate description of all available tools. | |
| Returns: | |
| Formatted string describing all tools | |
| """ | |
| descriptions = [] | |
| for tool_name, tool in self.tools.items(): | |
| descriptions.append(f"- {tool_name}: {tool.description}") | |
| return "\n".join(descriptions) | |
| def _parse_action(self, text: str) -> Optional[tuple[str, str]]: | |
| """Parse action and action input from agent response. | |
| Args: | |
| text: Agent's response text | |
| Returns: | |
| Tuple of (action_name, action_input) or None if not found | |
| """ | |
| # Look for Action: and Action Input: patterns | |
| action_match = re.search(r'Action:\s*([^\n]+)', text, re.IGNORECASE) | |
| # Updated regex: match everything after "Action Input:" until end or next section | |
| # This handles cases where text is cut off by stop parameter | |
| input_match = re.search(r'Action Input:\s*(.+?)(?:\n(?:Thought|Action|Final Answer|Observation)|$)', text, re.IGNORECASE | re.DOTALL) | |
| if action_match and input_match: | |
| action = action_match.group(1).strip() | |
| action_input = input_match.group(1).strip() | |
| # Clean up the action name | |
| action = action.lower().replace(' ', '_') | |
| return action, action_input | |
| return None | |
| def _check_final_answer(self, text: str) -> Optional[str]: | |
| """Check if the response contains a final answer. | |
| Args: | |
| text: Agent's response text | |
| Returns: | |
| Final answer if found, None otherwise | |
| """ | |
| match = re.search(r'Final Answer:\s*(.+)', text, re.IGNORECASE | re.DOTALL) | |
| if match: | |
| return match.group(1).strip() | |
| return None | |
| def _execute_tool(self, tool_name: str, tool_input: str) -> Dict[str, Any]: | |
| """Execute a tool and return the result. | |
| Args: | |
| tool_name: Name of the tool to execute | |
| tool_input: Input to the tool | |
| Returns: | |
| Tool execution result | |
| """ | |
| if tool_name not in self.tools: | |
| return {"error": f"Unknown tool: {tool_name}"} | |
| try: | |
| tool = self.tools[tool_name] | |
| result = tool.run(tool_input) | |
| return result | |
| except Exception as e: | |
| return {"error": f"Tool execution failed: {str(e)}"} | |
| def run(self, question: str, text: str) -> Dict[str, Any]: | |
| """Run the ReAct agent. | |
| Args: | |
| question: User's question | |
| text: Text to analyze | |
| Returns: | |
| Dictionary containing the final answer and execution trace | |
| """ | |
| # Build initial prompt | |
| system_prompt = get_system_prompt(self.tools_description) | |
| user_prompt = get_user_prompt(question, text) | |
| # Initialize conversation | |
| conversation_history = [] | |
| trace = [] | |
| # Agent loop | |
| for iteration in range(self.max_iterations): | |
| # Call LLM | |
| try: | |
| # Build messages using LangChain format | |
| messages = [SystemMessage(content=system_prompt)] | |
| if not conversation_history: | |
| messages.append(HumanMessage(content=user_prompt)) | |
| else: | |
| messages.append(HumanMessage(content=user_prompt)) | |
| # Convert conversation history to LangChain messages | |
| for msg in conversation_history: | |
| if msg["role"] == "assistant": | |
| messages.append(AIMessage(content=msg["content"])) | |
| else: | |
| messages.append(HumanMessage(content=msg["content"])) | |
| # Call LLM with stop sequences | |
| response = self.llm.invoke( | |
| messages, | |
| stop=["Observation:", "\nObservation"] | |
| ) | |
| agent_response = response.content | |
| trace.append({ | |
| "iteration": iteration + 1, | |
| "type": "thought", | |
| "content": agent_response | |
| }) | |
| # Check for final answer | |
| final_answer = self._check_final_answer(agent_response) | |
| if final_answer: | |
| return { | |
| "answer": final_answer, | |
| "trace": trace, | |
| "iterations": iteration + 1, | |
| "success": True | |
| } | |
| # Parse and execute action | |
| action_result = self._parse_action(agent_response) | |
| if action_result: | |
| action_name, action_input = action_result | |
| # Execute tool | |
| observation = self._execute_tool(action_name, action_input) | |
| trace.append({ | |
| "iteration": iteration + 1, | |
| "type": "action", | |
| "action": action_name, | |
| "input": action_input, | |
| "output": observation | |
| }) | |
| # Add to conversation history | |
| conversation_history.append({ | |
| "role": "assistant", | |
| "content": agent_response | |
| }) | |
| conversation_history.append({ | |
| "role": "user", | |
| "content": f"Observation: {observation}" | |
| }) | |
| else: | |
| # No action found and no final answer | |
| if agent_response.strip(): # Only add if response is not empty | |
| conversation_history.append({ | |
| "role": "assistant", | |
| "content": agent_response | |
| }) | |
| conversation_history.append({ | |
| "role": "user", | |
| "content": "Please continue with your analysis. Remember to use the format: Thought, Action, Action Input, or provide a Final Answer." | |
| }) | |
| else: | |
| # Empty response, likely cut by stop parameter - skip this iteration | |
| continue | |
| except Exception as e: | |
| return { | |
| "answer": f"Error: {str(e)}", | |
| "trace": trace, | |
| "iterations": iteration + 1, | |
| "success": False, | |
| "error": str(e) | |
| } | |
| # Max iterations reached | |
| return { | |
| "answer": "Maximum iterations reached without finding a final answer.", | |
| "trace": trace, | |
| "iterations": self.max_iterations, | |
| "success": False, | |
| "error": "Max iterations exceeded" | |
| } | |