"""ReAct Agent implementation."""
import re
from typing import List, Dict, Any, Optional
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from tools.word_counter import WordCounter
from tools.keyword_extractor import KeywordExtractor
from tools.sentiment_analyzer import SentimentAnalyzer
from agent.prompts import get_system_prompt, get_user_prompt
class ReActAgent:
"""ReAct Agent for text analysis."""
def __init__(self, api_key: str, model: str = "gpt-4-turbo-preview", max_iterations: int = 10):
"""Initialize the ReAct agent.
Args:
api_key: OpenAI API key
model: Model to use
max_iterations: Maximum number of reasoning iterations
"""
# Initialize LangChain ChatOpenAI with LangSmith tracing
self.llm = ChatOpenAI(
model=model,
openai_api_key=api_key,
max_tokens=2048,
temperature=0
)
self.model = model
self.max_iterations = max_iterations
# Initialize tools
self.tools = {
"word_counter": WordCounter(),
"keyword_extractor": KeywordExtractor(),
"sentiment_analyzer": SentimentAnalyzer()
}
# Generate tools description
self.tools_description = self._generate_tools_description()
def _generate_tools_description(self) -> str:
"""Generate description of all available tools.
Returns:
Formatted string describing all tools
"""
descriptions = []
for tool_name, tool in self.tools.items():
descriptions.append(f"- {tool_name}: {tool.description}")
return "\n".join(descriptions)
def _parse_action(self, text: str) -> Optional[tuple[str, str]]:
"""Parse action and action input from agent response.
Args:
text: Agent's response text
Returns:
Tuple of (action_name, action_input) or None if not found
"""
# Look for Action: and Action Input: patterns
action_match = re.search(r'Action:\s*([^\n]+)', text, re.IGNORECASE)
# Updated regex: match everything after "Action Input:" until end or next section
# This handles cases where text is cut off by stop parameter
input_match = re.search(r'Action Input:\s*(.+?)(?:\n(?:Thought|Action|Final Answer|Observation)|$)', text, re.IGNORECASE | re.DOTALL)
if action_match and input_match:
action = action_match.group(1).strip()
action_input = input_match.group(1).strip()
# Clean up the action name
action = action.lower().replace(' ', '_')
return action, action_input
return None
def _check_final_answer(self, text: str) -> Optional[str]:
"""Check if the response contains a final answer.
Args:
text: Agent's response text
Returns:
Final answer if found, None otherwise
"""
match = re.search(r'Final Answer:\s*(.+)', text, re.IGNORECASE | re.DOTALL)
if match:
return match.group(1).strip()
return None
def _execute_tool(self, tool_name: str, tool_input: str) -> Dict[str, Any]:
"""Execute a tool and return the result.
Args:
tool_name: Name of the tool to execute
tool_input: Input to the tool
Returns:
Tool execution result
"""
if tool_name not in self.tools:
return {"error": f"Unknown tool: {tool_name}"}
try:
tool = self.tools[tool_name]
result = tool.run(tool_input)
return result
except Exception as e:
return {"error": f"Tool execution failed: {str(e)}"}
def run(self, question: str, text: str) -> Dict[str, Any]:
"""Run the ReAct agent.
Args:
question: User's question
text: Text to analyze
Returns:
Dictionary containing the final answer and execution trace
"""
# Build initial prompt
system_prompt = get_system_prompt(self.tools_description)
user_prompt = get_user_prompt(question, text)
# Initialize conversation
conversation_history = []
trace = []
# Agent loop
for iteration in range(self.max_iterations):
# Call LLM
try:
# Build messages using LangChain format
messages = [SystemMessage(content=system_prompt)]
if not conversation_history:
messages.append(HumanMessage(content=user_prompt))
else:
messages.append(HumanMessage(content=user_prompt))
# Convert conversation history to LangChain messages
for msg in conversation_history:
if msg["role"] == "assistant":
messages.append(AIMessage(content=msg["content"]))
else:
messages.append(HumanMessage(content=msg["content"]))
# Call LLM with stop sequences
response = self.llm.invoke(
messages,
stop=["Observation:", "\nObservation"]
)
agent_response = response.content
trace.append({
"iteration": iteration + 1,
"type": "thought",
"content": agent_response
})
# Check for final answer
final_answer = self._check_final_answer(agent_response)
if final_answer:
return {
"answer": final_answer,
"trace": trace,
"iterations": iteration + 1,
"success": True
}
# Parse and execute action
action_result = self._parse_action(agent_response)
if action_result:
action_name, action_input = action_result
# Execute tool
observation = self._execute_tool(action_name, action_input)
trace.append({
"iteration": iteration + 1,
"type": "action",
"action": action_name,
"input": action_input,
"output": observation
})
# Add to conversation history
conversation_history.append({
"role": "assistant",
"content": agent_response
})
conversation_history.append({
"role": "user",
"content": f"Observation: {observation}"
})
else:
# No action found and no final answer
if agent_response.strip(): # Only add if response is not empty
conversation_history.append({
"role": "assistant",
"content": agent_response
})
conversation_history.append({
"role": "user",
"content": "Please continue with your analysis. Remember to use the format: Thought, Action, Action Input, or provide a Final Answer."
})
else:
# Empty response, likely cut by stop parameter - skip this iteration
continue
except Exception as e:
return {
"answer": f"Error: {str(e)}",
"trace": trace,
"iterations": iteration + 1,
"success": False,
"error": str(e)
}
# Max iterations reached
return {
"answer": "Maximum iterations reached without finding a final answer.",
"trace": trace,
"iterations": self.max_iterations,
"success": False,
"error": "Max iterations exceeded"
}