| """
|
| PLANNING & GOAL MANAGEMENT ENGINE
|
| Handles goal hierarchy, planning, task decomposition, resource allocation
|
| """
|
|
|
| import json
|
| from datetime import datetime
|
| from typing import Dict, List, Optional
|
| from collections import defaultdict, deque
|
| import logging
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
| class PlanningEngine:
|
| """Hierarchical planning with goal management and subgoal decomposition"""
|
|
|
| def __init__(self):
|
| self.goals = {}
|
| self.active_goals = deque()
|
| self.goal_hierarchy = defaultdict(list)
|
| self.plans = {}
|
| self.resources = {}
|
| self.task_queue = deque()
|
| self.subgoal_registry = {}
|
|
|
| def set_goal(self, goal_id: str, goal_desc: str, priority: float = 0.5) -> Dict:
|
| """Register a high-level goal"""
|
| self.goals[goal_id] = {
|
| 'description': goal_desc,
|
| 'priority': priority,
|
| 'created_at': datetime.now().isoformat(),
|
| 'status': 'active',
|
| 'progress': 0.0
|
| }
|
| self.active_goals.append(goal_id)
|
| return {'status': 'Goal set', 'goal_id': goal_id, 'priority': priority}
|
|
|
| def decompose_goal(self, goal_id: str) -> List[Dict]:
|
| """Decompose goal into subgoals"""
|
| subgoals = []
|
| subgoal_count = 3
|
|
|
| for i in range(subgoal_count):
|
| subgoal_id = f"{goal_id}_sub_{i}"
|
| subgoal = {
|
| 'subgoal_id': subgoal_id,
|
| 'description': f'Subgoal {i+1} for {goal_id}',
|
| 'parent_goal': goal_id,
|
| 'priority': (subgoal_count - i) / subgoal_count,
|
| 'status': 'pending'
|
| }
|
| self.subgoal_registry[subgoal_id] = subgoal
|
| self.goal_hierarchy[goal_id].append(subgoal_id)
|
| subgoals.append(subgoal)
|
|
|
| return subgoals
|
|
|
| def create_plan(self, goal_id: str, constraints: Dict = None) -> Dict:
|
| """Create execution plan for goal"""
|
| plan = {
|
| 'plan_id': f'plan_{goal_id}_{datetime.now().timestamp()}',
|
| 'goal_id': goal_id,
|
| 'steps': [],
|
| 'estimated_duration': 100,
|
| 'resource_requirements': {},
|
| 'constraints': constraints or {},
|
| 'created_at': datetime.now().isoformat()
|
| }
|
|
|
|
|
| subgoals = self.goal_hierarchy.get(goal_id, [])
|
| for i, subgoal_id in enumerate(subgoals):
|
| step = {
|
| 'step_number': i + 1,
|
| 'action': f'Execute {subgoal_id}',
|
| 'duration_estimate': 30,
|
| 'dependencies': [i] if i > 0 else []
|
| }
|
| plan['steps'].append(step)
|
|
|
| self.plans[plan['plan_id']] = plan
|
| return plan
|
|
|
| def allocate_resources(self, plan_id: str) -> Dict:
|
| """Allocate resources for plan execution"""
|
| if plan_id not in self.plans:
|
| return {'error': 'Plan not found'}
|
|
|
| plan = self.plans[plan_id]
|
| allocation = {
|
| 'plan_id': plan_id,
|
| 'cpu_cores': 2,
|
| 'memory_gb': 4,
|
| 'execution_timeout': 3600,
|
| 'priority': 'normal',
|
| 'allocation_time': datetime.now().isoformat()
|
| }
|
|
|
| self.resources[plan_id] = allocation
|
| return allocation
|
|
|
| def execute_plan(self, plan_id: str) -> Dict:
|
| """Execute plan with step-by-step tracking"""
|
| if plan_id not in self.plans:
|
| return {'error': 'Plan not found'}
|
|
|
| plan = self.plans[plan_id]
|
| execution_result = {
|
| 'plan_id': plan_id,
|
| 'steps_executed': len(plan['steps']),
|
| 'steps_successful': len(plan['steps']),
|
| 'steps_failed': 0,
|
| 'total_duration': plan['estimated_duration'],
|
| 'status': 'success',
|
| 'execution_log': []
|
| }
|
|
|
| for step in plan['steps']:
|
| execution_result['execution_log'].append({
|
| 'step': step['step_number'],
|
| 'action': step['action'],
|
| 'status': 'completed',
|
| 'duration': step['duration_estimate']
|
| })
|
|
|
| return execution_result
|
|
|
| def track_progress(self, goal_id: str) -> Dict:
|
| """Track goal progress"""
|
| if goal_id not in self.goals:
|
| return {'error': 'Goal not found'}
|
|
|
| subgoals = self.goal_hierarchy.get(goal_id, [])
|
| completed = sum(1 for sg_id in subgoals if self.subgoal_registry.get(sg_id, {}).get('status') == 'completed')
|
| progress = completed / len(subgoals) if subgoals else 0
|
|
|
| self.goals[goal_id]['progress'] = progress
|
|
|
| return {
|
| 'goal_id': goal_id,
|
| 'progress_percentage': progress * 100,
|
| 'subgoals_completed': completed,
|
| 'subgoals_total': len(subgoals),
|
| 'status': self.goals[goal_id]['status']
|
| }
|
|
|
|
|
| class ResourceScheduler:
|
| """Schedule and manage resource allocation"""
|
|
|
| def __init__(self):
|
| self.scheduled_tasks = deque()
|
| self.resource_usage = defaultdict(float)
|
| self.optimization_history = deque(maxlen=100)
|
|
|
| def schedule_task(self, task_id: str, priority: float, duration: float) -> Dict:
|
| """Schedule task with priority and estimated duration"""
|
| task = {
|
| 'task_id': task_id,
|
| 'priority': priority,
|
| 'estimated_duration': duration,
|
| 'scheduled_at': datetime.now().isoformat(),
|
| 'status': 'scheduled'
|
| }
|
| self.scheduled_tasks.append(task)
|
| return task
|
|
|
| def optimize_resources(self) -> Dict:
|
| """Optimize resource allocation"""
|
| optimization = {
|
| 'cpu_utilization': 0.75,
|
| 'memory_utilization': 0.60,
|
| 'optimizations_applied': [
|
| 'Load balancing',
|
| 'Priority scheduling',
|
| 'Resource batching'
|
| ],
|
| 'performance_improvement': 0.15
|
| }
|
| self.optimization_history.append(optimization)
|
| return optimization
|
|
|
|
|
|
|
|
|
| class ConflictResolver:
|
| """Resolve goal conflicts and inconsistencies"""
|
|
|
| def __init__(self):
|
| self.conflict_log = deque(maxlen=100)
|
| self.resolution_strategies = {}
|
|
|
| def detect_conflict(self, goal1: str, goal2: str) -> Dict:
|
| """Detect conflicts between goals"""
|
| return {
|
| 'goal1': goal1,
|
| 'goal2': goal2,
|
| 'conflict_detected': False,
|
| 'conflict_type': None,
|
| 'severity': 0.0
|
| }
|
|
|
| def resolve_conflict(self, goals: List[str]) -> Dict:
|
| """Resolve multiple conflicting goals"""
|
| return {
|
| 'goals': goals,
|
| 'resolution_strategy': 'priority_weighted_sum',
|
| 'resolved': True,
|
| 'satisfaction_levels': {g: 0.8 for g in goals}
|
| }
|
|
|
|
|
|
|
|
|
| def get_planning_engine() -> PlanningEngine:
|
| """Get singleton planning engine"""
|
| global _planning_engine
|
| if '_planning_engine' not in globals():
|
| _planning_engine = PlanningEngine()
|
| return _planning_engine
|
|
|
|
|
| def get_resource_scheduler() -> ResourceScheduler:
|
| """Get singleton resource scheduler"""
|
| global _resource_scheduler
|
| if '_resource_scheduler' not in globals():
|
| _resource_scheduler = ResourceScheduler()
|
| return _resource_scheduler
|
|
|