|
|
|
|
|
import json
import os
import re
import time
from typing import Any, Dict, List, Optional

import requests

from .base_agent import BaseAgent, AgentConfig, AgentResponse, TaskInput
|
|
|
|
|
|
|
|
|
|
|
class InformationSeekerAgent(BaseAgent):
    """
    Information Seeker Agent that follows the ReAct pattern (Reasoning + Acting).

    This agent takes decomposed sub-questions or tasks from parent agents,
    thinks interleaved (reasoning -> action -> reasoning -> action),
    uses MCP tools to gather information, and returns structured results.
    """

    # Chat template forwarded verbatim to the model endpoint. It wraps every
    # message in the model's special role delimiters ([unused9]/[unused10]);
    # do not reformat this string -- it is sent over the wire as-is.
    _CHAT_TEMPLATE = "{% for message in messages %}{% if loop.first and messages[0]['role'] != 'system' %}{{ '<s>[unused9]系统:[unused10]' }}{% endif %}{% if message['role'] == 'system' %}{{'<s>[unused9]系统:' + message['content'] + '[unused10]'}}{% endif %}{% if message['role'] == 'assistant' %}{{'[unused9]助手:' + message['content'] + '[unused10]'}}{% endif %}{% if message['role'] == 'tool' %}{{'[unused9]工具:' + message['content'] + '[unused10]'}}{% endif %}{% if message['role'] == 'function' %}{{'[unused9]方法:' + message['content'] + '[unused10]'}}{% endif %}{% if message['role'] == 'user' %}{{'[unused9]用户:' + message['content'] + '[unused10]'}}{% endif %}{% endfor %}{% if add_generation_prompt %}{{ '[unused9]助手:' }}{% endif %}"

    def __init__(self, config: Optional[AgentConfig] = None, shared_mcp_client=None):
        """
        Initialize the agent, defaulting / correcting the agent name.

        Args:
            config: Optional agent configuration; a default one is created when
                omitted, and a generic "base_agent" name is overridden.
            shared_mcp_client: Optional MCP client shared with a parent agent so
                no additional sessions are opened.
        """
        if config is None:
            config = AgentConfig(agent_name="InformationSeekerAgent")
        elif config.agent_name == "base_agent":
            # A generic config was passed through; claim it for this agent.
            config.agent_name = "InformationSeekerAgent"

        super().__init__(config, shared_mcp_client)

    def _build_system_prompt(self) -> str:
        """Build the system prompt for the ReAct agent, embedding tool schemas."""
        tool_schemas_str = json.dumps(self.tool_schemas, ensure_ascii=False)
        # NOTE: the template body below is runtime prompt text; keep it verbatim.
        system_prompt_template = """You are an Information Seeker Agent that follows the ReAct pattern (Reasoning + Acting).

Your role is to:
1. Take decomposed sub-questions or tasks from parent agents
2. Think step-by-step through reasoning
3. Use available tools to gather information when needed
4. Continue reasoning based on tool results
5. Repeat this process until you have sufficient information
6. Call info_seeker_objective_task_done to provide a structured summary

### Optimized Workflow:
Follow this optimized workflow for information gathering:

1. INITIAL RESEARCH:
- Use `batch_web_search` to find relevant URLs for your queries. When calling the search statement, consider the language of the user's question. For example, for a Chinese question, generate a part of the search statement in Chinese.
- Analyze the search results (titles, snippets, URLs) to identify promising sources

2. CONTENT EXTRACTION:
- For important URLs, use `url_crawler` to:
a) Extract full content from the webpage
b) Save the content to a file in the workspace
- Store results with meaningful file paths (e.g., \"research/ai_trends_2024.txt\")

3. CONTENT ANALYSIS:
- Use `document_qa` to ask specific questions about the saved files:
a) Formulate focused questions to extract key insights
b) Use answers to deepen your understanding
- You can ask multiple questions about the same file

4. FILE MANAGEMENT:
- Use `file_write` to save important findings or summaries
- For reviewing saved content:
a) Prefer `document_qa` to ask specific questions about the content
b) Use `file_read` ONLY for small files (<1000 tokens) when you need the entire content
c) Avoid reading large files directly as it may exceed context limits

5. TASK COMPLETION:
- When ready to report, call `info_seeker_objective_task_done` with:
a) Comprehensive markdown summary of your process and findings
b) List of key files created with descriptions

### Usage of Systematic Tool:
- `think` is a systematic tool. After receiving the response from the complex tool or before invoking any other tools, you must **first invoke the `think` tool**: to deeply reflect on the results of previous tool invocations (if any), and to thoroughly consider and plan the user's task. The `think` tool does not acquire new information; it only saves your thoughts into memory.
- `reflect` is a systematic tool. When encountering a failure in tool execution, it is necessary to invoke the reflect tool to conduct a review and revise the task plan. It does not acquire new information; it only saves your thoughts into memory.

Always provide clear reasoning for your actions and synthesize information effectively.

Below, within the <tools></tools> tags, are the descriptions of each tool and the required fields for invocation:
<tools>
$tool_schemas
</tools>
For each function call, return a JSON object placed within the [unused11][unused12] tags, which includes the function name and the corresponding function arguments:
[unused11][{\"name\": <function name>, \"arguments\": <args json object>}][unused12]
"""
        return system_prompt_template.replace("$tool_schemas", tool_schemas_str)

    @staticmethod
    def _build_initial_message_from_task_input(task_input: TaskInput) -> str:
        """Build the initial user message from a TaskInput."""
        message = task_input.format_for_prompt()
        message += (
            "\nPlease analyze this task and start your ReAct process:\n"
            "1. Reason about what information you need to gather\n"
            "2. Use appropriate tools to get that information\n"
            "3. Continue reasoning and acting until you have sufficient information\n"
            "4. Call task_done when ready to provide your complete findings\n\n"
            "Begin with your initial reasoning about the task."
        )
        return message

    def _request_model_with_retry(
        self,
        url: str,
        headers: Dict[str, str],
        model_config: Dict[str, Any],
        messages: List[Dict[str, str]],
    ) -> Dict[str, Any]:
        """
        POST the conversation to the model endpoint and return the parsed JSON.

        Transient failures (network errors, non-JSON bodies) are retried with a
        3-second pause; after the retry budget is exhausted the last failure is
        re-raised as ValueError.

        Args:
            url: Model endpoint URL.
            headers: HTTP headers (content type + auth token).
            model_config: Custom LLM config dict; "timeout" is honored here.
            messages: Full conversation history in chat-message format.

        Returns:
            The decoded JSON response from the model service.

        Raises:
            ValueError: When every retry attempt fails.
        """
        retry_num = 1
        max_retry_num = 10
        while retry_num < max_retry_num:
            try:
                http_response = requests.post(
                    url=url,
                    headers=headers,
                    json={
                        "model": self.config.model,
                        "chat_template": self._CHAT_TEMPLATE,
                        "messages": messages,
                        "temperature": self.config.temperature,
                        "spaces_between_special_tokens": False,
                        "max_tokens": self.config.max_tokens,
                    },
                    timeout=model_config.get("timeout", 180),
                )
                payload = http_response.json()
                self.logger.debug("API response received")
                return payload
            except Exception as e:
                # Back off briefly before the next attempt; surface the last
                # error once the budget is spent.
                time.sleep(3)
                retry_num += 1
                if retry_num == max_retry_num:
                    raise ValueError(str(e))
        raise ValueError("Model request failed")  # defensive; loop always returns or raises

    @staticmethod
    def _extract_tool_calls(content: str) -> List[Dict[str, Any]]:
        """
        Parse tool calls from raw model output.

        The model emits tool calls as a JSON array placed between the
        [unused11] and [unused12] marker tokens. Only the first marker pair is
        parsed. Returns [] for empty content, missing markers, or malformed
        JSON inside the markers.
        """
        if not content:
            return []
        marker_payloads = re.findall(r"\[unused11\]([\s\S]*?)\[unused12\]", content)
        if not marker_payloads:
            return []
        try:
            return json.loads(marker_payloads[0].strip())
        except (ValueError, TypeError):
            # Malformed JSON between the markers: treat as "no tool calls"
            # rather than aborting the whole iteration.
            return []

    def execute_task(self, task_input: TaskInput) -> AgentResponse:
        """
        Execute a task using the ReAct pattern (Reasoning + Acting).

        Drives an iterative loop: query the model, log its reasoning, execute
        any requested tool calls, feed results back, and stop when the model
        calls `info_seeker_objective_task_done` or the iteration budget runs out.

        Args:
            task_input: TaskInput object with standardized task information.

        Returns:
            AgentResponse with results and process trace.
        """
        start_time = time.time()
        iteration = 0  # defined up front so every error path can report it

        try:
            self.logger.info(f"Starting information seeker task: {task_input.task_content}")

            self.reset_trace()

            # " /no_think" is a runtime control suffix expected by the model.
            conversation_history = [
                {"role": "system", "content": self._build_system_prompt()},
                {"role": "user", "content": self._build_initial_message_from_task_input(task_input) + " /no_think"},
            ]

            task_completed = False

            # Resolve endpoint configuration; explicit config wins over env vars.
            from config.config import get_config
            config = get_config()
            model_config = config.get_custom_llm_config()

            pangu_url = model_config.get('url') or os.getenv('MODEL_REQUEST_URL', '')
            model_token = model_config.get('token') or os.getenv('MODEL_REQUEST_TOKEN', '')
            headers = {'Content-Type': 'application/json', 'csb-token': model_token}

            while iteration < self.config.max_iterations and not task_completed:
                iteration += 1
                self.logger.info(f"Planning iteration {iteration}")

                try:
                    response = self._request_model_with_retry(
                        pangu_url, headers, model_config, conversation_history
                    )
                    assistant_message = response["choices"][0]["message"]

                    # Reasoning text sits between [unused16]/[unused17] markers.
                    # If extraction blows up, ask the model to regenerate and
                    # restart the iteration without recording this turn.
                    try:
                        if assistant_message["content"]:
                            reasoning_content = assistant_message["content"].split("[unused16]")[-1].split("[unused17]")[0]
                            if len(reasoning_content) > 0:
                                self.log_reasoning(iteration, reasoning_content)
                    except Exception as e:
                        self.logger.warning(f"Tool call parsing error: {e}")
                        followup_prompt = f"There is a problem with the format of model generation: {e}. Please try again."
                        conversation_history.append({"role": "user", "content": followup_prompt + " /no_think"})
                        continue

                    conversation_history.append({
                        "role": "assistant",
                        "content": assistant_message["content"]
                    })

                    tool_calls = self._extract_tool_calls(assistant_message["content"])

                    for tool_call in tool_calls:
                        arguments = tool_call["arguments"]

                        if tool_call["name"] in ["info_seeker_objective_task_done"]:
                            # The task-done call carries the final report in its
                            # arguments, so they double as the action result.
                            task_completed = True
                            self.log_action(iteration, tool_call["name"], arguments, arguments)
                            break
                        if tool_call["name"] in ["think", "reflect"]:
                            # Systematic tools only persist thoughts; no execution.
                            tool_result = {"tool_results": "You can proceed to invoke other tools if needed."}
                        else:
                            tool_result = self.execute_tool_call(tool_call)

                        self.log_action(iteration, tool_call["name"], arguments, tool_result)

                        conversation_history.append({
                            "role": "tool",
                            "content": json.dumps(tool_result, ensure_ascii=False, indent=2) + " /no_think"
                        })

                    if len(tool_calls) == 0:
                        # No tool call in this turn: nudge the model to keep going.
                        followup_prompt = (
                            "Continue your planning process. Use available tools to assign tasks to agents, "
                            "search for information, or coordinate work. When you have a complete answer, "
                            "call info_seeker_objective_task_done. /no_think"
                        )
                        conversation_history.append({"role": "user", "content": followup_prompt})
                    if iteration == self.config.max_iterations - 3:
                        # Near the iteration budget: force the model to wrap up.
                        followup_prompt = "Due to length and number of rounds restrictions, you must now call the `info_seeker_objective_task_done` tool to report the completion of your task. /no_think"
                        conversation_history.append({"role": "user", "content": followup_prompt})

                except Exception as e:
                    error_msg = f"Error in planning iteration {iteration}: {e}"
                    self.log_error(iteration, error_msg)
                    break

            execution_time = time.time() - start_time

            if task_completed:
                # Recover the final report from the most recent task-done action.
                task_done_result = None
                for step in reversed(self.reasoning_trace):
                    if step.get("type") == "action" and step.get("tool") == "info_seeker_objective_task_done":
                        task_done_result = step.get("result")
                        break

                return self.create_response(
                    success=True,
                    result=task_done_result,
                    iterations=iteration,
                    execution_time=execution_time
                )
            return self.create_response(
                success=False,
                error=f"Task not completed within {self.config.max_iterations} iterations",
                iterations=iteration,
                execution_time=execution_time
            )

        except Exception as e:
            execution_time = time.time() - start_time
            self.logger.error(f"Error in execute_task: {e}")
            return self.create_response(
                success=False,
                error=str(e),
                iterations=iteration,
                execution_time=execution_time
            )

    def _build_agent_specific_tool_schemas(self) -> List[Dict[str, Any]]:
        """
        Build tool schemas for InformationSeekerAgent using proper MCP architecture.

        Schemas come from the MCP server via the client (through the base
        class), and the agent's built-in control tools (think, reflect,
        info_seeker_objective_task_done) are appended afterwards.
        """
        schemas = super()._build_agent_specific_tool_schemas()

        # NOTE(review): think/reflect use "parameters" while the task-done
        # schema uses "inputSchema" -- looks inconsistent, but consumers may
        # rely on it; confirm before unifying.
        builtin_assignment_schemas = [
            {
                "type": "function",
                "function": {
                    "name": "think",
                    "description": "Use the tool to think about something. It will not obtain new information or make any changes to the repository, but just log the thought. Use it when complex reasoning or brainstorming is needed.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "thought": {
                                "type": "string",
                                "description": "Your thoughts."
                            }
                        },
                        "required": ["thought"]
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "reflect",
                    "description": "When multiple attempts yield no progress, use this tool to reflect on previous reasoning and planning, considering possible overlooked clues and exploring more possibilities. It will not obtain new information or make any changes to the repository.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "reflect": {
                                "type": "string",
                                "description": "The specific content of your reflection"
                            }
                        },
                        "required": ["reflect"]
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "info_seeker_objective_task_done",
                    "description": "Structured reporting of task completion details including summary, decisions, outputs, and status",
                    "inputSchema": {
                        "type": "object",
                        "properties": {
                            "task_summary": {
                                "type": "string",
                                "description": "Comprehensive markdown covering what the agent was asked to do, steps taken, tools used, key findings, files created, challenges, and final deliverables.",
                                "format": "markdown"
                            },
                            "task_name": {
                                "type": "string",
                                "description": "The name of the task currently assigned to the agent, usually with underscores (e.g., 'web_research_ai_trends')"
                            },
                            "key_files": {
                                "type": "array",
                                "items": {
                                    "type": "object",
                                    "properties": {
                                        "file_path": {
                                            "type": "string",
                                            "description": "Relative path to created/modified file"
                                        },
                                        "desc": {
                                            "type": "string",
                                            "description": "File contents and creation purpose"
                                        },
                                        "is_final_output_file": {
                                            "type": "boolean",
                                            "description": "Whether file is primary deliverable"
                                        }
                                    },
                                    "required": ["file_path", "desc", "is_final_output_file"]
                                },
                                "description": "List of key files generated or modified during the task, with their details."
                            },
                            "completion_status": {
                                "type": "string",
                                "enum": ["completed", "partial", "failed"],
                                "description": "Final task status"
                            }
                        },
                        "required": ["task_summary", "task_name", "key_files", "completion_status"]
                    }
                }
            },
        ]

        schemas.extend(builtin_assignment_schemas)

        return schemas
|
|
|
|
|
|
|
|
|
|
|
def create_objective_information_seeker(
    model: Any = None,
    max_iterations: Any = None,
    shared_mcp_client=None,
    **kwargs
) -> InformationSeekerAgent:
    """
    Build an InformationSeekerAgent wired up with server-managed sessions.

    Args:
        model: LLM model identifier the agent should use.
        max_iterations: Upper bound on ReAct iterations.
        shared_mcp_client: MCP client shared from a parent agent, so no
            additional sessions are opened.
        **kwargs: Extra options forwarded to the agent configuration.

    Returns:
        A configured InformationSeekerAgent instance with appropriate tools.
    """
    # Imported lazily to avoid a circular import at module load time.
    from ..agents.base_agent import create_agent_config

    agent_config = create_agent_config(
        agent_name="InformationSeekerAgent",
        model=model,
        max_iterations=max_iterations,
        **kwargs
    )

    return InformationSeekerAgent(config=agent_config, shared_mcp_client=shared_mcp_client)
|
|
|