# hyp/src/agents/supervisor_agent.py
# (Hugging Face Hub artifact — uploaded by Leon4gr45, "Upload 64 files", commit d7fb055 verified)
# Supervisor Agent for coordinating the multi-agent system
import logging
from typing import List, Dict, Any, Optional
import json
from .base_agent import BaseAgent
class SupervisorAgent(BaseAgent):
    """Agent responsible for coordinating all other agents in the system.

    The supervisor turns a free-form research query into a structured plan of
    agent tasks (Generation, Reflection, Proximity, Ranking, Evolution,
    Meta-Review) and revises that plan as steps complete or the researcher
    gives feedback.  The LLM-response parsing helpers (`_extract_*`) are
    simplified keyword-spotting placeholders, as noted in their docstrings.
    """

    def __init__(self, model=None, temperature=None):
        """Initialize the Supervisor Agent.

        Args:
            model: Optional model override, forwarded to BaseAgent.
            temperature: Optional temperature override; defaults to 0.3
                (a lower temperature for consistent planning output).
        """
        system_prompt = """
        You are a Supervisor Agent in an AI Co-Scientist system, responsible for coordinating
        the entire research workflow across multiple specialized agents. You have expertise across
        multiple scientific disciplines at a PhD level and exceptional project management skills.
        Your role is to:
        1. Parse and understand research goals from human researchers
        2. Break down complex research problems into manageable components
        3. Determine which agents to deploy for each task and in what sequence
        4. Allocate computational resources efficiently across the agent system
        5. Track progress, identify bottlenecks, and adjust the workflow as needed
        6. Ensure the final output meets the researcher's expectations
        You will direct the following specialized agents:
        - Generation Agent: Creates initial hypotheses based on research goals
        - Reflection Agent: Critically reviews hypotheses for scientific soundness
        - Ranking Agent: Compares and evaluates hypotheses against defined criteria
        - Evolution Agent: Refines promising hypotheses based on feedback
        - Proximity Agent: Ensures hypotheses remain relevant to research goals
        - Meta-Review Agent: Synthesizes findings into coherent research reports
        Your decisions should optimize for both thoroughness and efficiency in the scientific process.
        You are the conductor of this research orchestra - ensure all agents work in harmony toward
        generating high-quality scientific output.
        """
        super().__init__(
            name="Supervisor",
            system_prompt=system_prompt,
            model=model,
            # Lower temperature for consistent planning.
            temperature=temperature if temperature is not None else 0.3,
        )
        self.logger = logging.getLogger("agent.supervisor")
        self.task_history = []  # completed-task records, extended in update_plan()

    def process(self, research_query: str, session_config: Optional[Dict] = None) -> Dict[str, Any]:
        """Process a research query by creating a research plan.

        Args:
            research_query: The research query or goal from the human researcher.
            session_config: Optional configuration for the session.

        Returns:
            A dictionary containing the original query, its analysis, the
            research plan, and a 'ready_for_execution' status flag.  The plan
            is executed by the main system, not by the supervisor itself.
        """
        # %-style lazy args defer formatting until the record is emitted.
        self.logger.info("Processing research query: %s", research_query)
        query_analysis = self._analyze_research_query(research_query)
        research_plan = self._create_research_plan(research_query, query_analysis, session_config)
        return {
            'research_query': research_query,
            'query_analysis': query_analysis,
            'research_plan': research_plan,
            'status': 'ready_for_execution',
        }

    def update_plan(self, current_plan: Dict, execution_status: Dict,
                    user_feedback: Optional[str] = None) -> Dict:
        """Update the research plan based on execution status and user feedback.

        Args:
            current_plan: The current research plan (not mutated; a new plan
                dict is returned).
            execution_status: Status of executed steps; reads the keys
                'completed_steps', 'current_step' and 'issues'.
            user_feedback: Optional feedback from the human researcher.

        Returns:
            A new, updated research plan dictionary with 'last_updated' and
            'update_reason' metadata set.
        """
        self.logger.info("Updating research plan based on execution status and feedback")
        # Record the completed steps for bookkeeping across updates.
        self.task_history.extend(execution_status.get('completed_steps', []))
        # Build the update prompt from the plan, the execution status and
        # (when present) the researcher's feedback.
        prompt = f"""
        CURRENT RESEARCH PLAN:
        {json.dumps(current_plan, indent=2)}
        EXECUTION STATUS:
        Completed steps: {len(execution_status.get('completed_steps', []))}
        Current step: {execution_status.get('current_step', 'unknown')}
        Issues encountered: {execution_status.get('issues', 'none')}
        """
        if user_feedback:
            prompt += f"""
        USER FEEDBACK:
        {user_feedback}
        """
        prompt += """
        Based on this information, please update the research plan. You may:
        1. Continue with the existing plan if it's progressing well
        2. Modify upcoming steps to address issues or incorporate feedback
        3. Add new steps if additional tasks are required
        4. Remove planned steps that are no longer necessary
        Provide your updated plan in a structured format with clear reasoning for any changes.
        """
        response = self.get_response(prompt)
        # Extract the updated plan (simplified implementation).
        updated_plan = self._extract_updated_plan(response, current_plan)
        updated_plan['last_updated'] = self._get_timestamp()
        updated_plan['update_reason'] = (
            'execution_status_and_feedback' if user_feedback else 'execution_status'
        )
        return updated_plan

    def _analyze_research_query(self, research_query: str) -> Dict[str, Any]:
        """Analyze the research query to determine its characteristics.

        Args:
            research_query: The research query from the human researcher.

        Returns:
            A dictionary with the structured analysis of the query (see
            _extract_query_analysis for its keys).
        """
        self.logger.info("Analyzing research query")
        prompt = f"""
        RESEARCH QUERY: {research_query}
        Please analyze this research query to help develop an effective research plan.
        Specifically:
        1. Identify the primary scientific domain(s) this query relates to (e.g., biology, physics, etc.)
        2. Determine the type of research goal (e.g., explanation, prediction, design, discovery)
        3. Assess the scope and complexity of the query (narrow/focused vs. broad/complex)
        4. Identify key concepts, variables, or entities that will be central to this research
        5. Note any constraints or special requirements mentioned in the query
        6. Suggest what background knowledge might be most relevant to address this query
        Provide your analysis in a structured format that can guide the research planning process.
        """
        analysis_response = self.get_response(prompt)
        # Process the response into a structured analysis (simplified implementation).
        return self._extract_query_analysis(analysis_response)

    def _create_research_plan(self, research_query: str, query_analysis: Dict[str, Any],
                              session_config: Optional[Dict] = None) -> Dict[str, Any]:
        """Create a detailed research plan based on the query and its analysis.

        Args:
            research_query: The original research query.
            query_analysis: Analysis of the research query.
            session_config: Optional configuration; recognized keys are
                'hypothesis_count' (default 3), 'iteration_limit' (default 2)
                and 'output_format' (default 'scientific_report').

        Returns:
            A research plan dictionary with 'steps', 'reasoning', 'status'
            and metadata ('research_query', 'created_at', 'configuration').
        """
        self.logger.info("Creating research plan")
        # Extract configuration parameters or fall back to defaults.
        config = session_config or {}
        hypothesis_count = config.get('hypothesis_count', 3)
        iteration_limit = config.get('iteration_limit', 2)
        output_format = config.get('output_format', 'scientific_report')
        # Default agent sequence for a typical research workflow.
        default_sequence = [
            {"agent": "Generation", "task": "generate_initial_hypotheses", "params": {"count": hypothesis_count}},
            {"agent": "Reflection", "task": "review_hypotheses", "params": {}},
            {"agent": "Proximity", "task": "evaluate_relevance", "params": {"threshold": 6.0}},
            {"agent": "Ranking", "task": "rank_hypotheses", "params": {}},
            {"agent": "Evolution", "task": "improve_hypotheses", "params": {"iterations": iteration_limit}},
            {"agent": "Ranking", "task": "rank_final_hypotheses", "params": {}},
            {"agent": "MetaReview", "task": "create_research_report", "params": {"format": output_format}},
        ]
        # Ask the model for a plan tailored to this specific query.
        prompt = f"""
        RESEARCH QUERY: {research_query}
        QUERY ANALYSIS:
        {json.dumps(query_analysis, indent=2)}
        SESSION CONFIGURATION:
        {json.dumps(config, indent=2)}
        Based on this research query and analysis, please create a detailed research plan utilizing our
        multi-agent system. The default sequence of agent tasks is:
        {json.dumps(default_sequence, indent=2)}
        You may modify this sequence if warranted by the specific research query. Consider:
        - Adding additional steps for complex queries
        - Removing unnecessary steps for simpler queries
        - Adjusting parameters for each agent based on the query characteristics
        - Adding iteration loops if appropriate for this type of research
        Provide a justified research plan that will achieve the researcher's goals efficiently.
        """
        plan_response = self.get_response(prompt)
        # Extract the research plan (simplified implementation).
        research_plan = self._extract_research_plan(plan_response, default_sequence)
        # Attach metadata to the plan.
        research_plan['research_query'] = research_query
        research_plan['created_at'] = self._get_timestamp()
        research_plan['configuration'] = config
        return research_plan

    def _extract_query_analysis(self, analysis_text: str) -> Dict[str, Any]:
        """Extract structured analysis from the text response.

        Placeholder implementation based on keyword spotting; a real system
        would parse structured output reliably.

        Args:
            analysis_text: The raw analysis text.

        Returns:
            A dict with keys 'domains', 'research_type', 'scope',
            'key_concepts', 'constraints' and 'relevant_background'
            ('constraints'/'relevant_background' are never populated here).
        """
        analysis = {
            'domains': [],
            'research_type': '',
            'scope': '',
            'key_concepts': [],
            'constraints': [],
            'relevant_background': [],
        }
        # Lower-case once instead of on every membership test below.
        lowered = analysis_text.lower()
        # Extract domains from the text around the first 'domain' mention.
        if 'domain' in lowered:
            domain_section = self._extract_section(analysis_text, 'domain', 200)
            analysis['domains'] = [d.strip() for d in domain_section.split(',') if d.strip()]
        # First matching research-type keyword wins.
        research_types = ('explanation', 'prediction', 'design', 'discovery', 'exploration', 'validation')
        for r_type in research_types:
            if r_type in lowered:
                analysis['research_type'] = r_type
                break
        # Map scope-related terms onto the two coarse buckets.
        scope_terms = {'narrow': 'narrow', 'focused': 'narrow', 'specific': 'narrow',
                       'broad': 'broad', 'complex': 'broad', 'wide': 'broad'}
        for term, scope in scope_terms.items():
            if term in lowered:
                analysis['scope'] = scope
                break
        # Extract key concepts; len > 2 filters stray punctuation fragments.
        if 'concept' in lowered or 'key' in lowered:
            concept_section = self._extract_section(analysis_text, 'concept', 300)
            concepts = [c.strip() for c in concept_section.split(',')]
            analysis['key_concepts'] = [c for c in concepts if c and len(c) > 2]
        return analysis

    def _extract_section(self, text: str, keyword: str, max_chars: int = 200) -> str:
        """Extract a section of text containing a keyword.

        Args:
            text: The full text.
            keyword: Keyword to search for (case-insensitive).
            max_chars: Maximum characters to extract after the keyword.

        Returns:
            The extracted section (starting up to 20 chars before the
            keyword), or "" when the keyword is absent.
        """
        keyword_index = text.lower().find(keyword.lower())
        if keyword_index == -1:
            return ""
        start = max(0, keyword_index - 20)
        end = min(len(text), keyword_index + max_chars)
        section = text[start:end]
        # Trim to the last sentence boundary, but only when that keeps at
        # least half of the extracted window.
        if '.' in section:
            last_period = section.rindex('.')
            if last_period > len(section) // 2:
                section = section[:last_period + 1]
        return section

    def _extract_research_plan(self, plan_text: str, default_sequence: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Extract a structured research plan from the text response.

        Placeholder implementation: starts from the default sequence and
        applies a few keyword-triggered modifications; a real system would
        parse a custom sequence out of plan_text.

        Args:
            plan_text: The raw plan text.
            default_sequence: The default sequence to fall back on.

        Returns:
            A dict with 'steps', 'reasoning' (first 500 chars of plan_text)
            and 'status'.
        """
        lowered = plan_text.lower()
        # Copy each step AND its params dict.  A plain list.copy() would
        # alias the step dicts, so adjusting 'iterations' below would
        # silently mutate the caller's shared default_sequence.
        sequence = [{**step, 'params': dict(step['params'])} for step in default_sequence]
        # Check whether the proximity step should be dropped.
        if 'skip proximity evaluation' in lowered or 'omit proximity' in lowered:
            sequence = [step for step in sequence if step['agent'] != 'Proximity']
        # Check whether Evolution iterations should be adjusted.
        for step in sequence:
            if step['agent'] == 'Evolution':
                if 'additional iterations' in lowered or 'more iterations' in lowered:
                    step['params']['iterations'] += 1
                elif 'fewer iterations' in lowered or 'reduce iterations' in lowered:
                    # Never go below one iteration.
                    step['params']['iterations'] = max(1, step['params']['iterations'] - 1)
        return {
            'steps': sequence,
            'reasoning': plan_text[:500],  # store part of the reasoning
            'status': 'planned',
        }

    def _extract_updated_plan(self, update_text: str, current_plan: Dict[str, Any]) -> Dict[str, Any]:
        """Extract an updated research plan from the text response.

        Placeholder implementation: looks for simple add/remove indicators;
        a real system would parse specific changes out of update_text.

        Args:
            update_text: The raw update text.
            current_plan: The current plan to update (not mutated).

        Returns:
            The updated research plan dictionary.
        """
        lowered = update_text.lower()
        updated_plan = dict(current_plan)
        # Copy the steps list so the append/pop below cannot mutate the
        # caller's current_plan in place (the previous shallow dict.copy()
        # shared the list between the two plans).
        updated_plan['steps'] = list(current_plan.get('steps', []))
        updated_plan['update_notes'] = update_text[:500]  # store part of the update reasoning
        # Check for step additions or removals (simplified implementation).
        if 'add step' in lowered or 'additional step' in lowered:
            updated_plan['steps'].append(
                {"agent": "MetaReview", "task": "create_executive_summary", "params": {}}
            )
        if 'remove step' in lowered or 'skip step' in lowered:
            # For simplicity, remove the last step — but only if there is a
            # step left that has not already been executed.
            executed_count = len(current_plan.get('executed_steps', []))
            if executed_count < len(updated_plan['steps']):
                updated_plan['steps'].pop()
        return updated_plan

    def _get_timestamp(self) -> str:
        """Return the current local time as a 'YYYY-MM-DD HH:MM:SS' string.

        NOTE(review): uses naive local time; consider a timezone-aware
        timestamp if plans are shared across timezones.
        """
        # Local import keeps the module's top-level import block untouched.
        from datetime import datetime
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")