|
|
|
|
|
|
|
|
import copy
import json
import logging
from datetime import datetime
from typing import List, Dict, Any, Optional

from .base_agent import BaseAgent
|
|
|
|
|
class SupervisorAgent(BaseAgent):
    """Agent responsible for coordinating all other agents in the system.

    The supervisor analyzes an incoming research query, builds a step-by-step
    plan over the specialized agents (Generation, Reflection, Proximity,
    Ranking, Evolution, MetaReview), and revises that plan as execution
    progresses or as user feedback arrives.
    """

    def __init__(self, model=None, temperature=None):
        """Initialize the Supervisor Agent.

        Args:
            model: Optional model override, passed through to BaseAgent.
            temperature: Optional temperature override. Defaults to 0.3 for
                relatively deterministic planning output.
        """
        system_prompt = """
        You are a Supervisor Agent in an AI Co-Scientist system, responsible for coordinating
        the entire research workflow across multiple specialized agents. You have expertise across
        multiple scientific disciplines at a PhD level and exceptional project management skills.

        Your role is to:
        1. Parse and understand research goals from human researchers
        2. Break down complex research problems into manageable components
        3. Determine which agents to deploy for each task and in what sequence
        4. Allocate computational resources efficiently across the agent system
        5. Track progress, identify bottlenecks, and adjust the workflow as needed
        6. Ensure the final output meets the researcher's expectations

        You will direct the following specialized agents:
        - Generation Agent: Creates initial hypotheses based on research goals
        - Reflection Agent: Critically reviews hypotheses for scientific soundness
        - Ranking Agent: Compares and evaluates hypotheses against defined criteria
        - Evolution Agent: Refines promising hypotheses based on feedback
        - Proximity Agent: Ensures hypotheses remain relevant to research goals
        - Meta-Review Agent: Synthesizes findings into coherent research reports

        Your decisions should optimize for both thoroughness and efficiency in the scientific process.
        You are the conductor of this research orchestra - ensure all agents work in harmony toward
        generating high-quality scientific output.
        """

        super().__init__(
            name="Supervisor",
            system_prompt=system_prompt,
            model=model,
            temperature=temperature if temperature is not None else 0.3
        )

        self.logger = logging.getLogger("agent.supervisor")
        # Completed workflow steps, accumulated across plan updates.
        self.task_history: List[Dict[str, Any]] = []

    def process(self, research_query: str, session_config: Optional[Dict] = None) -> Dict[str, Any]:
        """Process a research query by creating and executing a research plan.

        Args:
            research_query: The research query or goal from the human researcher
            session_config: Optional configuration for the session

        Returns:
            A dictionary containing the research plan, execution status, and results
        """
        # Lazy %-style args: the message is only formatted if the record is emitted.
        self.logger.info("Processing research query: %s", research_query)

        query_analysis = self._analyze_research_query(research_query)
        research_plan = self._create_research_plan(research_query, query_analysis, session_config)

        return {
            'research_query': research_query,
            'query_analysis': query_analysis,
            'research_plan': research_plan,
            'status': 'ready_for_execution'
        }

    def update_plan(self, current_plan: Dict, execution_status: Dict, user_feedback: Optional[str] = None) -> Dict:
        """Update the research plan based on execution status and user feedback.

        Args:
            current_plan: The current research plan
            execution_status: Status of executed steps
            user_feedback: Optional feedback from the human researcher

        Returns:
            Updated research plan
        """
        self.logger.info("Updating research plan based on execution status and feedback")

        # Keep a running history of everything that has finished so far.
        self.task_history.extend(execution_status.get('completed_steps', []))

        prompt = f"""
        CURRENT RESEARCH PLAN:
        {json.dumps(current_plan, indent=2)}

        EXECUTION STATUS:
        Completed steps: {len(execution_status.get('completed_steps', []))}
        Current step: {execution_status.get('current_step', 'unknown')}
        Issues encountered: {execution_status.get('issues', 'none')}
        """

        if user_feedback:
            prompt += f"""

        USER FEEDBACK:
        {user_feedback}
        """

        prompt += """

        Based on this information, please update the research plan. You may:
        1. Continue with the existing plan if it's progressing well
        2. Modify upcoming steps to address issues or incorporate feedback
        3. Add new steps if additional tasks are required
        4. Remove planned steps that are no longer necessary

        Provide your updated plan in a structured format with clear reasoning for any changes.
        """

        response = self.get_response(prompt)

        updated_plan = self._extract_updated_plan(response, current_plan)

        # Stamp the update so downstream consumers can see when and why it changed.
        updated_plan['last_updated'] = self._get_timestamp()
        updated_plan['update_reason'] = 'execution_status_and_feedback' if user_feedback else 'execution_status'

        return updated_plan

    def _analyze_research_query(self, research_query: str) -> Dict[str, Any]:
        """Analyze the research query to determine its characteristics.

        Args:
            research_query: The research query from the human researcher

        Returns:
            A dictionary with analysis of the query
        """
        self.logger.info("Analyzing research query")

        prompt = f"""
        RESEARCH QUERY: {research_query}

        Please analyze this research query to help develop an effective research plan.
        Specifically:

        1. Identify the primary scientific domain(s) this query relates to (e.g., biology, physics, etc.)
        2. Determine the type of research goal (e.g., explanation, prediction, design, discovery)
        3. Assess the scope and complexity of the query (narrow/focused vs. broad/complex)
        4. Identify key concepts, variables, or entities that will be central to this research
        5. Note any constraints or special requirements mentioned in the query
        6. Suggest what background knowledge might be most relevant to address this query

        Provide your analysis in a structured format that can guide the research planning process.
        """

        analysis_response = self.get_response(prompt)

        return self._extract_query_analysis(analysis_response)

    def _create_research_plan(self, research_query: str, query_analysis: Dict[str, Any],
                              session_config: Optional[Dict] = None) -> Dict[str, Any]:
        """Create a detailed research plan based on the query and its analysis.

        Args:
            research_query: The original research query
            query_analysis: Analysis of the research query
            session_config: Optional configuration for the session

        Returns:
            A dictionary containing the research plan
        """
        self.logger.info("Creating research plan")

        config = session_config or {}
        hypothesis_count = config.get('hypothesis_count', 3)
        iteration_limit = config.get('iteration_limit', 2)
        output_format = config.get('output_format', 'scientific_report')

        # Baseline agent pipeline; the LLM may prune or tune it in its response.
        default_sequence = [
            {"agent": "Generation", "task": "generate_initial_hypotheses", "params": {"count": hypothesis_count}},
            {"agent": "Reflection", "task": "review_hypotheses", "params": {}},
            {"agent": "Proximity", "task": "evaluate_relevance", "params": {"threshold": 6.0}},
            {"agent": "Ranking", "task": "rank_hypotheses", "params": {}},
            {"agent": "Evolution", "task": "improve_hypotheses", "params": {"iterations": iteration_limit}},
            {"agent": "Ranking", "task": "rank_final_hypotheses", "params": {}},
            {"agent": "MetaReview", "task": "create_research_report", "params": {"format": output_format}}
        ]

        prompt = f"""
        RESEARCH QUERY: {research_query}

        QUERY ANALYSIS:
        {json.dumps(query_analysis, indent=2)}

        SESSION CONFIGURATION:
        {json.dumps(config, indent=2)}

        Based on this research query and analysis, please create a detailed research plan utilizing our
        multi-agent system. The default sequence of agent tasks is:

        {json.dumps(default_sequence, indent=2)}

        You may modify this sequence if warranted by the specific research query. Consider:
        - Adding additional steps for complex queries
        - Removing unnecessary steps for simpler queries
        - Adjusting parameters for each agent based on the query characteristics
        - Adding iteration loops if appropriate for this type of research

        Provide a justified research plan that will achieve the researcher's goals efficiently.
        """

        plan_response = self.get_response(prompt)

        research_plan = self._extract_research_plan(plan_response, default_sequence)

        research_plan['research_query'] = research_query
        research_plan['created_at'] = self._get_timestamp()
        research_plan['configuration'] = config

        return research_plan

    def _extract_query_analysis(self, analysis_text: str) -> Dict[str, Any]:
        """Extract structured analysis from the text response.

        This is a placeholder implementation. In a real system, this would use more
        sophisticated parsing to extract structured data reliably.

        Args:
            analysis_text: The raw analysis text

        Returns:
            A structured analysis dictionary
        """
        analysis: Dict[str, Any] = {
            'domains': [],
            'research_type': '',
            'scope': '',
            'key_concepts': [],
            'constraints': [],
            'relevant_background': []
        }

        # Lowercase once; every heuristic below is case-insensitive.
        lowered = analysis_text.lower()

        # Domains: comma-split the text surrounding the first "domain" mention.
        if 'domain' in lowered:
            domain_section = self._extract_section(analysis_text, 'domain', 200)
            domains = [d.strip() for d in domain_section.split(',')]
            analysis['domains'] = [d for d in domains if d]

        # Research type: the first known keyword appearing anywhere wins.
        research_types = ['explanation', 'prediction', 'design', 'discovery', 'exploration', 'validation']
        for r_type in research_types:
            if r_type in lowered:
                analysis['research_type'] = r_type
                break

        # Scope: map descriptive terms onto the narrow/broad dichotomy.
        scope_terms = {'narrow': 'narrow', 'focused': 'narrow', 'specific': 'narrow',
                       'broad': 'broad', 'complex': 'broad', 'wide': 'broad'}
        for term, scope in scope_terms.items():
            if term in lowered:
                analysis['scope'] = scope
                break

        # Key concepts: comma-split the text around the "concept" mention.
        # NOTE: if only "key" matched, _extract_section('concept') returns ""
        # and key_concepts stays empty — preserved from the original heuristic.
        if 'concept' in lowered or 'key' in lowered:
            concept_section = self._extract_section(analysis_text, 'concept', 300)
            concepts = [c.strip() for c in concept_section.split(',')]
            analysis['key_concepts'] = [c for c in concepts if c and len(c) > 2]

        return analysis

    def _extract_section(self, text: str, keyword: str, max_chars: int = 200) -> str:
        """Extract a section of text containing a keyword.

        Args:
            text: The full text
            keyword: Keyword to search for (case-insensitive)
            max_chars: Maximum characters to extract after the keyword

        Returns:
            The extracted section, or "" if the keyword is absent
        """
        keyword_index = text.lower().find(keyword.lower())
        if keyword_index == -1:
            return ""

        # Include a little leading context before the keyword itself.
        start = max(0, keyword_index - 20)
        end = min(len(text), keyword_index + max_chars)

        section = text[start:end]

        # Trim to the last full sentence, but only if doing so keeps at least
        # half of the extracted window.
        if '.' in section:
            last_period = section.rindex('.')
            if last_period > len(section) // 2:
                section = section[:last_period + 1]

        return section

    def _extract_research_plan(self, plan_text: str, default_sequence: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Extract a structured research plan from the text response.

        This is a placeholder implementation. In a real system, this would use more
        sophisticated parsing to extract structured data reliably.

        Args:
            plan_text: The raw plan text
            default_sequence: The default sequence to fall back on

        Returns:
            A structured research plan dictionary
        """
        lowered = plan_text.lower()

        # Deep-copy: a shallow .copy() would share the step dicts, so the
        # iteration tweaks below would mutate the caller's default_sequence.
        sequence = copy.deepcopy(default_sequence)

        if 'skip proximity evaluation' in lowered or 'omit proximity' in lowered:
            sequence = [step for step in sequence if step['agent'] != 'Proximity']

        # Adjust Evolution iterations up or down based on the LLM's wording.
        for step in sequence:
            if step['agent'] == 'Evolution':
                if 'additional iterations' in lowered or 'more iterations' in lowered:
                    step['params']['iterations'] += 1
                elif 'fewer iterations' in lowered or 'reduce iterations' in lowered:
                    step['params']['iterations'] = max(1, step['params']['iterations'] - 1)

        return {
            'steps': sequence,
            'reasoning': plan_text[:500],  # truncated rationale kept for auditing
            'status': 'planned'
        }

    def _extract_updated_plan(self, update_text: str, current_plan: Dict[str, Any]) -> Dict[str, Any]:
        """Extract an updated research plan from the text response.

        This is a placeholder implementation. In a real system, this would use more
        sophisticated parsing to extract structured data reliably.

        Args:
            update_text: The raw update text
            current_plan: The current plan to update

        Returns:
            The updated research plan dictionary
        """
        # Deep-copy: a shallow .copy() would alias current_plan['steps'], so the
        # append/pop below would silently corrupt the caller's current plan.
        updated_plan = copy.deepcopy(current_plan)
        updated_plan['update_notes'] = update_text[:500]

        lowered = update_text.lower()

        if 'add step' in lowered or 'additional step' in lowered:
            updated_plan['steps'].append({"agent": "MetaReview", "task": "create_executive_summary", "params": {}})

        if 'remove step' in lowered or 'skip step' in lowered:
            # Only drop the final step if it has not already been executed.
            executed_count = len(current_plan.get('executed_steps', []))
            if executed_count < len(updated_plan['steps']):
                updated_plan['steps'].pop(-1)

        return updated_plan

    def _get_timestamp(self) -> str:
        """Get a formatted timestamp.

        Returns:
            The current local time formatted as "YYYY-MM-DD HH:MM:SS"
        """
        # datetime is imported at module level (see file imports).
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|