""" Agent Execution Module Executes analysis plans step-by-step with progress tracking and error handling. The executor: - Runs each step in the plan sequentially - Tracks progress for each step - Handles errors gracefully - Logs execution details - Stores results UNIVERSAL DESIGN: - Works with any plan generated by the planner - Adapts to different restaurant types - Provides real-time progress updates """ from typing import List, Dict, Any, Optional, Callable from datetime import datetime import time class AgentExecutor: """ Executes analysis plans step-by-step. Features: - Sequential step execution - Progress tracking (% complete, time remaining) - Error handling and recovery - Result storage - Real-time status updates Example: executor = AgentExecutor() # Execute a plan results = executor.execute_plan( plan=plan, progress_callback=lambda status: print(status) ) # Check execution status if executor.execution_successful: print("All steps completed!") """ def __init__(self): """Initialize the executor.""" self.execution_log: List[str] = [] self.step_results: Dict[int, Any] = {} self.execution_successful: bool = False self.current_step: int = 0 self.total_steps: int = 0 self.start_time: Optional[float] = None self.end_time: Optional[float] = None def execute_plan( self, plan: List[Dict[str, Any]], progress_callback: Optional[Callable[[str], None]] = None, context: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Execute an analysis plan step-by-step. D2-003: Method skeleton D2-004: Step-by-step execution logic D2-005: Progress tracking D2-006: Error handling Args: plan: List of steps to execute progress_callback: Optional callback for progress updates context: Optional context data (URLs, data, etc.) Returns: Dictionary with execution results: - success: Boolean indicating if execution completed - results: Results from each step - execution_time: Total time taken - logs: Execution log entries Example: def show_progress(status): print(f"Progress: {status}") results = executor.execute_plan( plan=my_plan, progress_callback=show_progress ) """ # D2-005: Initialize progress tracking self.total_steps = len(plan) self.current_step = 0 self.start_time = time.time() self.execution_successful = False self.step_results = {} self.execution_log = [] self._log(f"Starting execution of {self.total_steps}-step plan") if progress_callback: progress_callback(f"Initializing executor (0/{self.total_steps} steps)") # D2-004: Execute each step sequentially for step in plan: self.current_step = step['step'] try: # D2-005: Update progress progress_pct = int((self.current_step / self.total_steps) * 100) self._log(f"Step {self.current_step}/{self.total_steps} ({progress_pct}%): {step['action']}") if progress_callback: progress_callback( f"Step {self.current_step}/{self.total_steps}: {step['action']}" ) # D2-004: Execute the step result = self._execute_step(step, context) # Store result self.step_results[self.current_step] = { 'action': step['action'], 'result': result, 'status': 'success', 'timestamp': datetime.now().isoformat() } self._log(f"✅ Step {self.current_step} completed: {step['action']}") except Exception as e: # D2-006: Error handling self._log(f"❌ Step {self.current_step} failed: {step['action']}") self._log(f" Error: {str(e)}") # Store error self.step_results[self.current_step] = { 'action': step['action'], 'result': None, 'status': 'failed', 'error': str(e), 'timestamp': datetime.now().isoformat() } if progress_callback: progress_callback(f"⚠️ Step {self.current_step} failed: {str(e)}") # Decide whether to continue or stop # For now, we'll log and continue (graceful degradation) self._log(f"⚠️ Continuing with remaining steps...") # Execution complete self.end_time = time.time() execution_time = self.end_time - self.start_time # Check if all steps succeeded failed_steps = [ step_num for step_num, result in self.step_results.items() if result['status'] == 'failed' ] self.execution_successful = len(failed_steps) == 0 if self.execution_successful: self._log(f"✅ Execution completed successfully in {execution_time:.2f}s") else: self._log(f"⚠️ Execution completed with {len(failed_steps)} failed steps in {execution_time:.2f}s") if progress_callback: if self.execution_successful: progress_callback(f"✅ All {self.total_steps} steps completed!") else: progress_callback(f"⚠️ {self.total_steps - len(failed_steps)}/{self.total_steps} steps completed") # Return results return { 'success': self.execution_successful, 'results': self.step_results, 'execution_time': execution_time, 'logs': self.execution_log, 'failed_steps': failed_steps } def _execute_step( self, step: Dict[str, Any], context: Optional[Dict[str, Any]] ) -> Any: """ Execute a single step. D2-004: Core step execution logic Args: step: Step dictionary with action, params, reason context: Execution context Returns: Result from executing the step Note: This is a placeholder. In future days, we'll implement actual logic for each action type (scrape, analyze, etc.) """ action = step['action'] params = step.get('params', {}) # For now, simulate execution with a small delay # In future days, we'll add real implementations for each action time.sleep(0.1) # Simulate work # Placeholder results based on action type if action == 'scrape_reviews': return {'status': 'simulated', 'reviews_count': 500} elif action == 'discover_menu_items': return {'status': 'simulated', 'items_found': 52} elif action == 'discover_aspects': return {'status': 'simulated', 'aspects_found': 7} elif action == 'analyze_sentiment': return {'status': 'simulated', 'overall_sentiment': 0.73} else: return {'status': 'simulated', 'action': action} def _log(self, message: str) -> None: """ Log execution progress. Args: message: Log message """ timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") log_entry = f"[{timestamp}] {message}" self.execution_log.append(log_entry) print(f"⚙️ {log_entry}") def get_execution_summary(self) -> Dict[str, Any]: """ Get a summary of the execution. Returns: Dictionary with summary info """ if not self.start_time: return {'status': 'not_started'} execution_time = (self.end_time - self.start_time) if self.end_time else 0 return { 'total_steps': self.total_steps, 'completed_steps': len(self.step_results), 'successful_steps': sum(1 for r in self.step_results.values() if r['status'] == 'success'), 'failed_steps': sum(1 for r in self.step_results.values() if r['status'] == 'failed'), 'execution_time': f"{execution_time:.2f}s", 'success': self.execution_successful } # D2-007: Test execution with sample plan if __name__ == "__main__": print("=" * 70) print("D2-007: Testing Agent Executor with Sample Plan") print("=" * 70 + "\n") # Create a sample plan (similar to what planner generates) sample_plan = [ { 'step': 1, 'action': 'scrape_reviews', 'params': {'url': 'https://opentable.ca/r/test-restaurant'}, 'reason': 'Need review data' }, { 'step': 2, 'action': 'discover_menu_items', 'params': {'reviews': 'scraped_data'}, 'reason': 'Discover menu items dynamically' }, { 'step': 3, 'action': 'discover_aspects', 'params': {'reviews': 'scraped_data'}, 'reason': 'Discover relevant aspects' }, { 'step': 4, 'action': 'analyze_sentiment', 'params': {'reviews': 'scraped_data'}, 'reason': 'Calculate sentiment scores' }, { 'step': 5, 'action': 'generate_insights_chef', 'params': {'analysis': 'results'}, 'reason': 'Create chef summary' } ] # Create executor executor = AgentExecutor() # Define progress callback def show_progress(status): print(f"📊 {status}") # Execute the plan print("Starting execution...\n") results = executor.execute_plan( plan=sample_plan, progress_callback=show_progress ) # Display results print("\n" + "=" * 70) print("EXECUTION RESULTS") print("=" * 70) print(f"\nSuccess: {results['success']}") print(f"Execution time: {results['execution_time']:.2f}s") print(f"Steps completed: {len(results['results'])}/{len(sample_plan)}") if results['failed_steps']: print(f"Failed steps: {results['failed_steps']}") print("\nStep Results:") for step_num, result in results['results'].items(): status_icon = "✅" if result['status'] == 'success' else "❌" print(f" {status_icon} Step {step_num}: {result['action']} - {result['status']}") # Get summary print("\n" + "=" * 70) print("EXECUTION SUMMARY") print("=" * 70) summary = executor.get_execution_summary() for key, value in summary.items(): print(f" {key}: {value}") print("\n" + "=" * 70) print("🎉 Executor test complete!") print("=" * 70)