Spaces:
Sleeping
Sleeping
| """ | |
| Agent Execution Module | |
| Executes analysis plans step-by-step with progress tracking and error handling. | |
| The executor: | |
| - Runs each step in the plan sequentially | |
| - Tracks progress for each step | |
| - Handles errors gracefully | |
| - Logs execution details | |
| - Stores results | |
| UNIVERSAL DESIGN: | |
| - Works with any plan generated by the planner | |
| - Adapts to different restaurant types | |
| - Provides real-time progress updates | |
| """ | |
| from typing import List, Dict, Any, Optional, Callable | |
| from datetime import datetime | |
| import time | |
| class AgentExecutor: | |
| """ | |
| Executes analysis plans step-by-step. | |
| Features: | |
| - Sequential step execution | |
| - Progress tracking (% complete, time remaining) | |
| - Error handling and recovery | |
| - Result storage | |
| - Real-time status updates | |
| Example: | |
| executor = AgentExecutor() | |
| # Execute a plan | |
| results = executor.execute_plan( | |
| plan=plan, | |
| progress_callback=lambda status: print(status) | |
| ) | |
| # Check execution status | |
| if executor.execution_successful: | |
| print("All steps completed!") | |
| """ | |
| def __init__(self): | |
| """Initialize the executor.""" | |
| self.execution_log: List[str] = [] | |
| self.step_results: Dict[int, Any] = {} | |
| self.execution_successful: bool = False | |
| self.current_step: int = 0 | |
| self.total_steps: int = 0 | |
| self.start_time: Optional[float] = None | |
| self.end_time: Optional[float] = None | |
| def execute_plan( | |
| self, | |
| plan: List[Dict[str, Any]], | |
| progress_callback: Optional[Callable[[str], None]] = None, | |
| context: Optional[Dict[str, Any]] = None | |
| ) -> Dict[str, Any]: | |
| """ | |
| Execute an analysis plan step-by-step. | |
| D2-003: Method skeleton | |
| D2-004: Step-by-step execution logic | |
| D2-005: Progress tracking | |
| D2-006: Error handling | |
| Args: | |
| plan: List of steps to execute | |
| progress_callback: Optional callback for progress updates | |
| context: Optional context data (URLs, data, etc.) | |
| Returns: | |
| Dictionary with execution results: | |
| - success: Boolean indicating if execution completed | |
| - results: Results from each step | |
| - execution_time: Total time taken | |
| - logs: Execution log entries | |
| Example: | |
| def show_progress(status): | |
| print(f"Progress: {status}") | |
| results = executor.execute_plan( | |
| plan=my_plan, | |
| progress_callback=show_progress | |
| ) | |
| """ | |
| # D2-005: Initialize progress tracking | |
| self.total_steps = len(plan) | |
| self.current_step = 0 | |
| self.start_time = time.time() | |
| self.execution_successful = False | |
| self.step_results = {} | |
| self.execution_log = [] | |
| self._log(f"Starting execution of {self.total_steps}-step plan") | |
| if progress_callback: | |
| progress_callback(f"Initializing executor (0/{self.total_steps} steps)") | |
| # D2-004: Execute each step sequentially | |
| for step in plan: | |
| self.current_step = step['step'] | |
| try: | |
| # D2-005: Update progress | |
| progress_pct = int((self.current_step / self.total_steps) * 100) | |
| self._log(f"Step {self.current_step}/{self.total_steps} ({progress_pct}%): {step['action']}") | |
| if progress_callback: | |
| progress_callback( | |
| f"Step {self.current_step}/{self.total_steps}: {step['action']}" | |
| ) | |
| # D2-004: Execute the step | |
| result = self._execute_step(step, context) | |
| # Store result | |
| self.step_results[self.current_step] = { | |
| 'action': step['action'], | |
| 'result': result, | |
| 'status': 'success', | |
| 'timestamp': datetime.now().isoformat() | |
| } | |
| self._log(f"β Step {self.current_step} completed: {step['action']}") | |
| except Exception as e: | |
| # D2-006: Error handling | |
| self._log(f"β Step {self.current_step} failed: {step['action']}") | |
| self._log(f" Error: {str(e)}") | |
| # Store error | |
| self.step_results[self.current_step] = { | |
| 'action': step['action'], | |
| 'result': None, | |
| 'status': 'failed', | |
| 'error': str(e), | |
| 'timestamp': datetime.now().isoformat() | |
| } | |
| if progress_callback: | |
| progress_callback(f"β οΈ Step {self.current_step} failed: {str(e)}") | |
| # Decide whether to continue or stop | |
| # For now, we'll log and continue (graceful degradation) | |
| self._log(f"β οΈ Continuing with remaining steps...") | |
| # Execution complete | |
| self.end_time = time.time() | |
| execution_time = self.end_time - self.start_time | |
| # Check if all steps succeeded | |
| failed_steps = [ | |
| step_num for step_num, result in self.step_results.items() | |
| if result['status'] == 'failed' | |
| ] | |
| self.execution_successful = len(failed_steps) == 0 | |
| if self.execution_successful: | |
| self._log(f"β Execution completed successfully in {execution_time:.2f}s") | |
| else: | |
| self._log(f"β οΈ Execution completed with {len(failed_steps)} failed steps in {execution_time:.2f}s") | |
| if progress_callback: | |
| if self.execution_successful: | |
| progress_callback(f"β All {self.total_steps} steps completed!") | |
| else: | |
| progress_callback(f"β οΈ {self.total_steps - len(failed_steps)}/{self.total_steps} steps completed") | |
| # Return results | |
| return { | |
| 'success': self.execution_successful, | |
| 'results': self.step_results, | |
| 'execution_time': execution_time, | |
| 'logs': self.execution_log, | |
| 'failed_steps': failed_steps | |
| } | |
| def _execute_step( | |
| self, | |
| step: Dict[str, Any], | |
| context: Optional[Dict[str, Any]] | |
| ) -> Any: | |
| """ | |
| Execute a single step. | |
| D2-004: Core step execution logic | |
| Args: | |
| step: Step dictionary with action, params, reason | |
| context: Execution context | |
| Returns: | |
| Result from executing the step | |
| Note: | |
| This is a placeholder. In future days, we'll implement | |
| actual logic for each action type (scrape, analyze, etc.) | |
| """ | |
| action = step['action'] | |
| params = step.get('params', {}) | |
| # For now, simulate execution with a small delay | |
| # In future days, we'll add real implementations for each action | |
| time.sleep(0.1) # Simulate work | |
| # Placeholder results based on action type | |
| if action == 'scrape_reviews': | |
| return {'status': 'simulated', 'reviews_count': 500} | |
| elif action == 'discover_menu_items': | |
| return {'status': 'simulated', 'items_found': 52} | |
| elif action == 'discover_aspects': | |
| return {'status': 'simulated', 'aspects_found': 7} | |
| elif action == 'analyze_sentiment': | |
| return {'status': 'simulated', 'overall_sentiment': 0.73} | |
| else: | |
| return {'status': 'simulated', 'action': action} | |
| def _log(self, message: str) -> None: | |
| """ | |
| Log execution progress. | |
| Args: | |
| message: Log message | |
| """ | |
| timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| log_entry = f"[{timestamp}] {message}" | |
| self.execution_log.append(log_entry) | |
| print(f"βοΈ {log_entry}") | |
| def get_execution_summary(self) -> Dict[str, Any]: | |
| """ | |
| Get a summary of the execution. | |
| Returns: | |
| Dictionary with summary info | |
| """ | |
| if not self.start_time: | |
| return {'status': 'not_started'} | |
| execution_time = (self.end_time - self.start_time) if self.end_time else 0 | |
| return { | |
| 'total_steps': self.total_steps, | |
| 'completed_steps': len(self.step_results), | |
| 'successful_steps': sum(1 for r in self.step_results.values() if r['status'] == 'success'), | |
| 'failed_steps': sum(1 for r in self.step_results.values() if r['status'] == 'failed'), | |
| 'execution_time': f"{execution_time:.2f}s", | |
| 'success': self.execution_successful | |
| } | |
| # D2-007: Test execution with sample plan | |
| if __name__ == "__main__": | |
| print("=" * 70) | |
| print("D2-007: Testing Agent Executor with Sample Plan") | |
| print("=" * 70 + "\n") | |
| # Create a sample plan (similar to what planner generates) | |
| sample_plan = [ | |
| { | |
| 'step': 1, | |
| 'action': 'scrape_reviews', | |
| 'params': {'url': 'https://opentable.ca/r/test-restaurant'}, | |
| 'reason': 'Need review data' | |
| }, | |
| { | |
| 'step': 2, | |
| 'action': 'discover_menu_items', | |
| 'params': {'reviews': 'scraped_data'}, | |
| 'reason': 'Discover menu items dynamically' | |
| }, | |
| { | |
| 'step': 3, | |
| 'action': 'discover_aspects', | |
| 'params': {'reviews': 'scraped_data'}, | |
| 'reason': 'Discover relevant aspects' | |
| }, | |
| { | |
| 'step': 4, | |
| 'action': 'analyze_sentiment', | |
| 'params': {'reviews': 'scraped_data'}, | |
| 'reason': 'Calculate sentiment scores' | |
| }, | |
| { | |
| 'step': 5, | |
| 'action': 'generate_insights_chef', | |
| 'params': {'analysis': 'results'}, | |
| 'reason': 'Create chef summary' | |
| } | |
| ] | |
| # Create executor | |
| executor = AgentExecutor() | |
| # Define progress callback | |
| def show_progress(status): | |
| print(f"π {status}") | |
| # Execute the plan | |
| print("Starting execution...\n") | |
| results = executor.execute_plan( | |
| plan=sample_plan, | |
| progress_callback=show_progress | |
| ) | |
| # Display results | |
| print("\n" + "=" * 70) | |
| print("EXECUTION RESULTS") | |
| print("=" * 70) | |
| print(f"\nSuccess: {results['success']}") | |
| print(f"Execution time: {results['execution_time']:.2f}s") | |
| print(f"Steps completed: {len(results['results'])}/{len(sample_plan)}") | |
| if results['failed_steps']: | |
| print(f"Failed steps: {results['failed_steps']}") | |
| print("\nStep Results:") | |
| for step_num, result in results['results'].items(): | |
| status_icon = "β " if result['status'] == 'success' else "β" | |
| print(f" {status_icon} Step {step_num}: {result['action']} - {result['status']}") | |
| # Get summary | |
| print("\n" + "=" * 70) | |
| print("EXECUTION SUMMARY") | |
| print("=" * 70) | |
| summary = executor.get_execution_summary() | |
| for key, value in summary.items(): | |
| print(f" {key}: {value}") | |
| print("\n" + "=" * 70) | |
| print("π Executor test complete!") | |
| print("=" * 70) |