File size: 17,153 Bytes
d7fb055
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
# Supervisor Agent for coordinating the multi-agent system

import copy
import json
import logging
from datetime import datetime
from typing import List, Dict, Any, Optional

from .base_agent import BaseAgent

class SupervisorAgent(BaseAgent):
    """Agent responsible for coordinating all other agents in the system.

    Translates a free-form research query into a structured multi-agent
    execution plan, and revises that plan as execution status and user
    feedback arrive. The `_extract_*` helpers are simplified keyword-based
    parsers standing in for more robust structured-output parsing.
    """
    
    def __init__(self, model=None, temperature=None):
        """Initialize the Supervisor Agent.
        
        Args:
            model: Optional model override
            temperature: Optional temperature override; defaults to 0.3
                for more consistent planning output.
        """
        system_prompt = """
        You are a Supervisor Agent in an AI Co-Scientist system, responsible for coordinating 
        the entire research workflow across multiple specialized agents. You have expertise across 
        multiple scientific disciplines at a PhD level and exceptional project management skills.
        
        Your role is to:
        1. Parse and understand research goals from human researchers
        2. Break down complex research problems into manageable components
        3. Determine which agents to deploy for each task and in what sequence
        4. Allocate computational resources efficiently across the agent system
        5. Track progress, identify bottlenecks, and adjust the workflow as needed
        6. Ensure the final output meets the researcher's expectations
        
        You will direct the following specialized agents:
        - Generation Agent: Creates initial hypotheses based on research goals
        - Reflection Agent: Critically reviews hypotheses for scientific soundness
        - Ranking Agent: Compares and evaluates hypotheses against defined criteria
        - Evolution Agent: Refines promising hypotheses based on feedback
        - Proximity Agent: Ensures hypotheses remain relevant to research goals
        - Meta-Review Agent: Synthesizes findings into coherent research reports
        
        Your decisions should optimize for both thoroughness and efficiency in the scientific process.
        You are the conductor of this research orchestra - ensure all agents work in harmony toward 
        generating high-quality scientific output.
        """
        
        super().__init__(
            name="Supervisor",
            system_prompt=system_prompt,
            model=model,
            temperature=temperature if temperature is not None else 0.3  # Lower temperature for consistent planning
        )
        
        self.logger = logging.getLogger("agent.supervisor")
        self.task_history = []  # Track completed tasks across update_plan calls
    
    def process(self, research_query: str, session_config: Optional[Dict] = None) -> Dict[str, Any]:
        """Process a research query by creating and executing a research plan.
        
        Args:
            research_query: The research query or goal from the human researcher
            session_config: Optional configuration for the session
            
        Returns:
            A dictionary containing the research plan, execution status, and results
        """
        self.logger.info(f"Processing research query: {research_query}")
        
        # Analyze the research query
        query_analysis = self._analyze_research_query(research_query)
        
        # Create a research plan
        research_plan = self._create_research_plan(research_query, query_analysis, session_config)
        
        # Return the plan for execution by the main system
        return {
            'research_query': research_query,
            'query_analysis': query_analysis,
            'research_plan': research_plan,
            'status': 'ready_for_execution'
        }
    
    def update_plan(self, current_plan: Dict, execution_status: Dict, user_feedback: Optional[str] = None) -> Dict:
        """Update the research plan based on execution status and user feedback.
        
        Args:
            current_plan: The current research plan (not mutated; a revised
                copy is returned)
            execution_status: Status of executed steps
            user_feedback: Optional feedback from the human researcher
            
        Returns:
            Updated research plan
        """
        self.logger.info("Updating research plan based on execution status and feedback")
        
        # Record the completed steps
        self.task_history.extend(execution_status.get('completed_steps', []))
        
        # Create a prompt to update the plan
        prompt = f"""
        CURRENT RESEARCH PLAN:
        {json.dumps(current_plan, indent=2)}
        
        EXECUTION STATUS:
        Completed steps: {len(execution_status.get('completed_steps', []))}
        Current step: {execution_status.get('current_step', 'unknown')}
        Issues encountered: {execution_status.get('issues', 'none')}
        """
        
        if user_feedback:
            prompt += f"""
            
            USER FEEDBACK:
            {user_feedback}
            """
        
        prompt += """
        
        Based on this information, please update the research plan. You may:
        1. Continue with the existing plan if it's progressing well
        2. Modify upcoming steps to address issues or incorporate feedback
        3. Add new steps if additional tasks are required
        4. Remove planned steps that are no longer necessary
        
        Provide your updated plan in a structured format with clear reasoning for any changes.
        """
        
        response = self.get_response(prompt)
        
        # Extract the updated plan (simplified implementation)
        updated_plan = self._extract_updated_plan(response, current_plan)
        
        # Mark the plan as updated
        updated_plan['last_updated'] = self._get_timestamp()
        updated_plan['update_reason'] = 'execution_status_and_feedback' if user_feedback else 'execution_status'
        
        return updated_plan
    
    def _analyze_research_query(self, research_query: str) -> Dict[str, Any]:
        """Analyze the research query to determine its characteristics.
        
        Args:
            research_query: The research query from the human researcher
            
        Returns:
            A dictionary with analysis of the query
        """
        self.logger.info("Analyzing research query")
        
        prompt = f"""
        RESEARCH QUERY: {research_query}
        
        Please analyze this research query to help develop an effective research plan.
        Specifically:
        
        1. Identify the primary scientific domain(s) this query relates to (e.g., biology, physics, etc.)
        2. Determine the type of research goal (e.g., explanation, prediction, design, discovery)
        3. Assess the scope and complexity of the query (narrow/focused vs. broad/complex)
        4. Identify key concepts, variables, or entities that will be central to this research
        5. Note any constraints or special requirements mentioned in the query
        6. Suggest what background knowledge might be most relevant to address this query
        
        Provide your analysis in a structured format that can guide the research planning process.
        """
        
        analysis_response = self.get_response(prompt)
        
        # Process the response into a structured analysis (simplified implementation)
        analysis = self._extract_query_analysis(analysis_response)
        
        return analysis
    
    def _create_research_plan(self, research_query: str, query_analysis: Dict[str, Any], 
                              session_config: Optional[Dict] = None) -> Dict[str, Any]:
        """Create a detailed research plan based on the query and its analysis.
        
        Args:
            research_query: The original research query
            query_analysis: Analysis of the research query
            session_config: Optional configuration for the session
            
        Returns:
            A dictionary containing the research plan
        """
        self.logger.info("Creating research plan")
        
        # Extract configuration parameters or use defaults
        config = session_config or {}
        hypothesis_count = config.get('hypothesis_count', 3)
        iteration_limit = config.get('iteration_limit', 2)
        output_format = config.get('output_format', 'scientific_report')
        
        # Default agent sequence for a typical research workflow
        default_sequence = [
            {"agent": "Generation", "task": "generate_initial_hypotheses", "params": {"count": hypothesis_count}},
            {"agent": "Reflection", "task": "review_hypotheses", "params": {}},
            {"agent": "Proximity", "task": "evaluate_relevance", "params": {"threshold": 6.0}},
            {"agent": "Ranking", "task": "rank_hypotheses", "params": {}},
            {"agent": "Evolution", "task": "improve_hypotheses", "params": {"iterations": iteration_limit}},
            {"agent": "Ranking", "task": "rank_final_hypotheses", "params": {}},
            {"agent": "MetaReview", "task": "create_research_report", "params": {"format": output_format}}
        ]
        
        # Create a plan tailored to this specific query
        prompt = f"""
        RESEARCH QUERY: {research_query}
        
        QUERY ANALYSIS:
        {json.dumps(query_analysis, indent=2)}
        
        SESSION CONFIGURATION:
        {json.dumps(config, indent=2)}
        
        Based on this research query and analysis, please create a detailed research plan utilizing our 
        multi-agent system. The default sequence of agent tasks is:
        
        {json.dumps(default_sequence, indent=2)}
        
        You may modify this sequence if warranted by the specific research query. Consider:
        - Adding additional steps for complex queries
        - Removing unnecessary steps for simpler queries
        - Adjusting parameters for each agent based on the query characteristics
        - Adding iteration loops if appropriate for this type of research
        
        Provide a justified research plan that will achieve the researcher's goals efficiently.
        """
        
        plan_response = self.get_response(prompt)
        
        # Extract the research plan (simplified implementation)
        research_plan = self._extract_research_plan(plan_response, default_sequence)
        
        # Add metadata to the plan
        research_plan['research_query'] = research_query
        research_plan['created_at'] = self._get_timestamp()
        research_plan['configuration'] = config
        
        return research_plan
    
    def _extract_query_analysis(self, analysis_text: str) -> Dict[str, Any]:
        """Extract structured analysis from the text response.
        
        This is a placeholder implementation. In a real system, this would use more 
        sophisticated parsing to extract structured data reliably.
        
        Args:
            analysis_text: The raw analysis text
            
        Returns:
            A structured analysis dictionary
        """
        # Simple parsing implementation - would be more sophisticated in a real system
        analysis = {
            'domains': [],
            'research_type': '',
            'scope': '',
            'key_concepts': [],
            'constraints': [],
            'relevant_background': []
        }
        
        # Lowercase once; every keyword probe below reuses it
        text = analysis_text.lower()
        
        # Extract domains
        if 'domain' in text:
            domain_section = self._extract_section(analysis_text, 'domain', 200)
            domains = [d.strip() for d in domain_section.split(',')]
            analysis['domains'] = [d for d in domains if d]
        
        # Extract research type: first matching keyword wins
        research_types = ['explanation', 'prediction', 'design', 'discovery', 'exploration', 'validation']
        for r_type in research_types:
            if r_type in text:
                analysis['research_type'] = r_type
                break
        
        # Extract scope: map several synonyms onto narrow/broad
        scope_terms = {'narrow': 'narrow', 'focused': 'narrow', 'specific': 'narrow', 
                      'broad': 'broad', 'complex': 'broad', 'wide': 'broad'}
        for term, scope in scope_terms.items():
            if term in text:
                analysis['scope'] = scope
                break
        
        # Extract key concepts (drop fragments of <= 2 chars)
        if 'concept' in text or 'key' in text:
            concept_section = self._extract_section(analysis_text, 'concept', 300)
            concepts = [c.strip() for c in concept_section.split(',')]
            analysis['key_concepts'] = [c for c in concepts if c and len(c) > 2]
        
        return analysis
    
    def _extract_section(self, text: str, keyword: str, max_chars: int = 200) -> str:
        """Extract a section of text containing a keyword.
        
        Args:
            text: The full text
            keyword: Keyword to search for (case-insensitive)
            max_chars: Maximum characters to extract after the keyword
            
        Returns:
            The extracted section, or "" if the keyword is absent
        """
        keyword_index = text.lower().find(keyword.lower())
        if keyword_index == -1:
            return ""
        
        # Include a little leading context before the keyword
        start = max(0, keyword_index - 20)
        end = min(len(text), keyword_index + max_chars)
        
        section = text[start:end]
        
        # Trim to the last sentence boundary if one falls in the latter half
        if '.' in section:
            last_period = section.rindex('.')
            if last_period > len(section) // 2:
                section = section[:last_period + 1]
        
        return section
    
    def _extract_research_plan(self, plan_text: str, default_sequence: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Extract a structured research plan from the text response.
        
        This is a placeholder implementation. In a real system, this would use more 
        sophisticated parsing to extract structured data reliably.
        
        Args:
            plan_text: The raw plan text
            default_sequence: The default sequence to fall back on (not mutated)
            
        Returns:
            A structured research plan dictionary
        """
        # For simplicity, we'll just use the default sequence with any detected modifications
        # In a real implementation, this would parse the plan_text to extract a custom sequence
        
        # Deep-copy the sequence: the parameter tweaks below mutate nested
        # 'params' dicts, and a shallow list.copy() would write through to
        # the caller's default_sequence.
        sequence = copy.deepcopy(default_sequence)
        text = plan_text.lower()
        
        # Check if any steps should be removed
        if 'skip proximity evaluation' in text or 'omit proximity' in text:
            sequence = [step for step in sequence if step['agent'] != 'Proximity']
        
        # Check if iterations should be adjusted for Evolution (never below 1)
        for step in sequence:
            if step['agent'] == 'Evolution':
                if 'additional iterations' in text or 'more iterations' in text:
                    step['params']['iterations'] += 1
                elif 'fewer iterations' in text or 'reduce iterations' in text:
                    step['params']['iterations'] = max(1, step['params']['iterations'] - 1)
        
        return {
            'steps': sequence,
            'reasoning': plan_text[:500],  # Store part of the reasoning
            'status': 'planned'
        }
    
    def _extract_updated_plan(self, update_text: str, current_plan: Dict[str, Any]) -> Dict[str, Any]:
        """Extract an updated research plan from the text response.
        
        This is a placeholder implementation. In a real system, this would use more 
        sophisticated parsing to extract structured data reliably.
        
        Args:
            update_text: The raw update text
            current_plan: The current plan to update (not mutated)
            
        Returns:
            The updated research plan dictionary
        """
        # For simplicity, we'll just look for simple update indicators
        # In a real implementation, this would parse the update_text to extract specific changes
        
        # Deep-copy so appending/popping steps below cannot mutate the
        # caller's current_plan in place (a shallow .copy() shares the
        # nested 'steps' list).
        updated_plan = copy.deepcopy(current_plan)
        updated_plan['update_notes'] = update_text[:500]  # Store part of the update reasoning
        
        text = update_text.lower()
        
        # Check for step additions or removals (simplified implementation)
        if 'add step' in text or 'additional step' in text:
            updated_plan.setdefault('steps', []).append(
                {"agent": "MetaReview", "task": "create_executive_summary", "params": {}})
        
        if 'remove step' in text or 'skip step' in text:
            # For simplicity, just remove the last step that hasn't been executed yet
            executed_count = len(current_plan.get('executed_steps', []))
            steps = updated_plan.get('steps', [])
            if executed_count < len(steps):
                steps.pop()
        
        return updated_plan
    
    def _get_timestamp(self) -> str:
        """Get a formatted timestamp.
        
        Returns:
            Local time formatted as "YYYY-MM-DD HH:MM:SS"
        """
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")