# Source: Hugging Face Hub, uploaded by TushP
# Commit message: "Upload folder using huggingface_hub"
# Commit: bb9baa9 (verified)
"""
Agent Execution Module
Executes analysis plans step-by-step with progress tracking and error handling.
The executor:
- Runs each step in the plan sequentially
- Tracks progress for each step
- Handles errors gracefully
- Logs execution details
- Stores results
UNIVERSAL DESIGN:
- Works with any plan generated by the planner
- Adapts to different restaurant types
- Provides real-time progress updates
"""
from typing import List, Dict, Any, Optional, Callable
from datetime import datetime
import time
class AgentExecutor:
    """
    Execute analysis plans step-by-step.

    Features:
    - Sequential step execution
    - Progress tracking (step counter and percentage)
    - Error handling: a failing step is recorded and the run continues
    - Result storage keyed by step number
    - Real-time status updates through an optional callback

    Example:
        executor = AgentExecutor()

        # Execute a plan
        results = executor.execute_plan(
            plan=plan,
            progress_callback=lambda status: print(status)
        )

        # Check execution status
        if executor.execution_successful:
            print("All steps completed!")
    """

    # Class-level default so subclasses that override __init__ without
    # calling super() still have a usable delay.
    simulated_delay: float = 0.1

    # Canned payloads returned by the placeholder step implementation.
    # Any action not listed here gets a generic "simulated" payload.
    _SIMULATED_RESULTS: Dict[str, Dict[str, Any]] = {
        'scrape_reviews': {'status': 'simulated', 'reviews_count': 500},
        'discover_menu_items': {'status': 'simulated', 'items_found': 52},
        'discover_aspects': {'status': 'simulated', 'aspects_found': 7},
        'analyze_sentiment': {'status': 'simulated', 'overall_sentiment': 0.73},
    }

    def __init__(self, *, simulated_delay: float = 0.1):
        """
        Initialize the executor.

        Args:
            simulated_delay: Seconds the placeholder step implementation
                sleeps to imitate work. Values <= 0 disable the sleep.
        """
        self.simulated_delay = simulated_delay
        self.execution_log: List[str] = []
        self.step_results: Dict[int, Any] = {}
        self.execution_successful: bool = False
        self.current_step: int = 0
        self.total_steps: int = 0
        self.start_time: Optional[float] = None
        self.end_time: Optional[float] = None

    def execute_plan(
        self,
        plan: List[Dict[str, Any]],
        progress_callback: Optional[Callable[[str], None]] = None,
        context: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Execute an analysis plan step-by-step.

        Each step is run in order. A step that raises is recorded as
        failed and execution continues with the remaining steps.

        Args:
            plan: List of step dicts. Each step is expected to carry a
                'step' number and an 'action' name. A missing 'step'
                falls back to the step's 1-based position in the plan;
                a step without an 'action' is recorded as failed.
            progress_callback: Optional callable receiving a status
                string at start, before each step, on each failure and
                at the end.
            context: Optional context data passed through to each step.

        Returns:
            Dictionary with execution results:
            - success: True when no step failed
            - results: Per-step records keyed by step number
            - execution_time: Total wall-clock seconds
            - logs: Execution log entries
            - failed_steps: Step numbers whose status is 'failed'
        """
        # D2-005: Initialize progress tracking
        self.total_steps = len(plan)
        self.current_step = 0
        self.start_time = time.time()
        # Drop the previous run's end time so a summary requested while
        # this run is in progress never mixes old and new timestamps.
        self.end_time = None
        self.execution_successful = False
        self.step_results = {}
        self.execution_log = []

        self._log(f"Starting execution of {self.total_steps}-step plan")
        if progress_callback:
            progress_callback(f"Initializing executor (0/{self.total_steps} steps)")

        # D2-004: Execute each step sequentially
        for position, step in enumerate(plan, start=1):
            # Read the labels defensively: a malformed step must be
            # reported as a failed step, not abort the whole run.
            is_mapping = isinstance(step, dict)
            step_number = step.get('step', position) if is_mapping else position
            action = step.get('action', 'unknown') if is_mapping else 'unknown'
            self.current_step = step_number

            try:
                # D2-005: Progress is based on the position in the plan,
                # so it stays within 0-100% whatever the step numbers are.
                progress_pct = int((position / self.total_steps) * 100)
                self._log(
                    f"Step {step_number}/{self.total_steps} ({progress_pct}%): {action}"
                )
                if progress_callback:
                    progress_callback(
                        f"Step {step_number}/{self.total_steps}: {action}"
                    )

                # D2-004: Execute the step
                result = self._execute_step(step, context)

                self.step_results[step_number] = {
                    'action': action,
                    'result': result,
                    'status': 'success',
                    'timestamp': datetime.now().isoformat()
                }
                self._log(f"✅ Step {step_number} completed: {action}")
            except Exception as e:
                # D2-006: Record the failure and keep going
                # (graceful degradation).
                self._log(f"❌ Step {step_number} failed: {action}")
                self._log(f"   Error: {e}")
                self.step_results[step_number] = {
                    'action': action,
                    'result': None,
                    'status': 'failed',
                    'error': str(e),
                    'timestamp': datetime.now().isoformat()
                }
                if progress_callback:
                    progress_callback(f"⚠️  Step {step_number} failed: {e}")
                self._log("⚠️  Continuing with remaining steps...")

        # Execution complete
        self.end_time = time.time()
        execution_time = self.end_time - self.start_time

        failed_steps = [
            step_num for step_num, record in self.step_results.items()
            if record['status'] == 'failed'
        ]
        self.execution_successful = not failed_steps

        if self.execution_successful:
            self._log(f"✅ Execution completed successfully in {execution_time:.2f}s")
            final_status = f"✅ All {self.total_steps} steps completed!"
        else:
            self._log(
                f"⚠️  Execution completed with {len(failed_steps)} failed steps "
                f"in {execution_time:.2f}s"
            )
            final_status = (
                f"⚠️  {self.total_steps - len(failed_steps)}/{self.total_steps} "
                f"steps completed"
            )
        if progress_callback:
            progress_callback(final_status)

        return {
            'success': self.execution_successful,
            'results': self.step_results,
            'execution_time': execution_time,
            'logs': self.execution_log,
            'failed_steps': failed_steps
        }

    def _execute_step(
        self,
        step: Dict[str, Any],
        context: Optional[Dict[str, Any]]
    ) -> Any:
        """
        Execute a single step.

        This is a placeholder: it sleeps for `simulated_delay` seconds
        and returns a canned payload chosen by the step's action name.
        `context` is accepted for the future real implementations and
        is currently unused.

        Args:
            step: Step dictionary; must contain an 'action' key
            context: Execution context (currently unused)

        Returns:
            A new dict describing the simulated result.

        Raises:
            ValueError: If `step` is not a dict or has no 'action' key.
        """
        if not isinstance(step, dict) or 'action' not in step:
            raise ValueError("Step must be a dict with an 'action' key")
        action = step['action']

        # Simulate work; skipped when the delay is zero or negative.
        if self.simulated_delay > 0:
            time.sleep(self.simulated_delay)

        canned = (
            self._SIMULATED_RESULTS.get(action)
            if isinstance(action, str) else None
        )
        if canned is None:
            return {'status': 'simulated', 'action': action}
        # Return a copy so callers cannot mutate the shared table.
        return dict(canned)

    def _log(self, message: str) -> None:
        """
        Record a timestamped log entry and echo it to stdout.

        Args:
            message: Log message
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        log_entry = f"[{timestamp}] {message}"
        self.execution_log.append(log_entry)
        print(f"⚙️  {log_entry}")

    def get_execution_summary(self) -> Dict[str, Any]:
        """
        Get a summary of the most recent execution.

        Returns:
            {'status': 'not_started'} if execute_plan has never been
            called; otherwise a dict with total/completed/successful/
            failed step counts, the formatted execution time (0.00s
            while a run has not finished) and the overall success flag.
        """
        if self.start_time is None:
            return {'status': 'not_started'}

        if self.end_time is not None:
            execution_time = self.end_time - self.start_time
        else:
            execution_time = 0

        statuses = [record['status'] for record in self.step_results.values()]
        return {
            'total_steps': self.total_steps,
            'completed_steps': len(statuses),
            'successful_steps': statuses.count('success'),
            'failed_steps': statuses.count('failed'),
            'execution_time': f"{execution_time:.2f}s",
            'success': self.execution_successful
        }
# D2-007: Test execution with sample plan
if __name__ == "__main__":
    # Manual smoke test: run the executor against a hand-written plan
    # shaped like the planner's output and print what happened.
    separator = "=" * 70

    print(separator)
    print("D2-007: Testing Agent Executor with Sample Plan")
    print(separator + "\n")

    # Sample plan (similar to what the planner generates). The last
    # action has no dedicated placeholder result, so it exercises the
    # executor's generic fallback.
    sample_plan = [
        {
            'step': 1,
            'action': 'scrape_reviews',
            'params': {'url': 'https://opentable.ca/r/test-restaurant'},
            'reason': 'Need review data'
        },
        {
            'step': 2,
            'action': 'discover_menu_items',
            'params': {'reviews': 'scraped_data'},
            'reason': 'Discover menu items dynamically'
        },
        {
            'step': 3,
            'action': 'discover_aspects',
            'params': {'reviews': 'scraped_data'},
            'reason': 'Discover relevant aspects'
        },
        {
            'step': 4,
            'action': 'analyze_sentiment',
            'params': {'reviews': 'scraped_data'},
            'reason': 'Calculate sentiment scores'
        },
        {
            'step': 5,
            'action': 'generate_insights_chef',
            'params': {'analysis': 'results'},
            'reason': 'Create chef summary'
        }
    ]

    executor = AgentExecutor()

    def show_progress(status):
        """Echo each progress update from the executor to stdout."""
        print(f"📊 {status}")

    # Execute the plan
    print("Starting execution...\n")
    results = executor.execute_plan(
        plan=sample_plan,
        progress_callback=show_progress
    )

    # Display results
    print("\n" + separator)
    print("EXECUTION RESULTS")
    print(separator)
    print(f"\nSuccess: {results['success']}")
    print(f"Execution time: {results['execution_time']:.2f}s")
    print(f"Steps completed: {len(results['results'])}/{len(sample_plan)}")
    if results['failed_steps']:
        print(f"Failed steps: {results['failed_steps']}")

    print("\nStep Results:")
    for step_num, result in results['results'].items():
        status_icon = "✅" if result['status'] == 'success' else "❌"
        print(f"  {status_icon} Step {step_num}: {result['action']} - {result['status']}")

    # Display the executor's own summary of the run
    print("\n" + separator)
    print("EXECUTION SUMMARY")
    print(separator)
    summary = executor.get_execution_summary()
    for key, value in summary.items():
        print(f"  {key}: {value}")

    print("\n" + separator)
    print("🎉 Executor test complete!")
    print(separator)