NoahsKI / planning_engine.py
noah33565's picture
Upload 221 files
8d3de43 verified
"""
PLANNING & GOAL MANAGEMENT ENGINE
Handles goal hierarchy, planning, task decomposition, resource allocation
"""
import json
from datetime import datetime
from typing import Dict, List, Optional
from collections import defaultdict, deque
import logging
logger = logging.getLogger(__name__)
class PlanningEngine:
    """Hierarchical planning with goal management and subgoal decomposition.

    All state lives in memory on the instance. Goals are registered with
    ``set_goal``, split into placeholder subgoals with ``decompose_goal``,
    turned into a linear plan with ``create_plan`` and then "executed" by
    ``execute_plan``, which only records every step as completed.
    """

    def __init__(self):
        # goal_id -> goal record (description, priority, status, progress, ...)
        self.goals = {}
        # goal ids in registration order, each id appearing at most once
        self.active_goals = deque()
        # goal_id -> ordered list of subgoal ids produced by decompose_goal
        self.goal_hierarchy = defaultdict(list)
        # plan_id -> plan dict produced by create_plan
        self.plans = {}
        # plan_id -> resource allocation dict produced by allocate_resources
        self.resources = {}
        # not used by any method of this class
        self.task_queue = deque()
        # subgoal_id -> subgoal record
        self.subgoal_registry = {}

    def set_goal(self, goal_id: str, goal_desc: str, priority: float = 0.5) -> Dict:
        """Register (or re-register) a high-level goal.

        Re-registering an existing ``goal_id`` replaces its record but does
        not enqueue the id in ``active_goals`` a second time.

        Args:
            goal_id: Unique identifier of the goal.
            goal_desc: Human-readable description.
            priority: Priority stored on the goal record.

        Returns:
            A confirmation dict with ``status``, ``goal_id`` and ``priority``.
        """
        self.goals[goal_id] = {
            'description': goal_desc,
            'priority': priority,
            'created_at': datetime.now().isoformat(),
            'status': 'active',
            'progress': 0.0,
        }
        # Avoid duplicate queue entries when the same goal is set again.
        if goal_id not in self.active_goals:
            self.active_goals.append(goal_id)
        return {'status': 'Goal set', 'goal_id': goal_id, 'priority': priority}

    def decompose_goal(self, goal_id: str, subgoal_count: int = 3) -> List[Dict]:
        """Decompose a goal into ``subgoal_count`` placeholder subgoals.

        Calling this again for the same goal replaces the previous
        decomposition instead of appending duplicate subgoal ids, so plans
        and progress tracking always see each subgoal exactly once.

        Args:
            goal_id: Goal to decompose; it is not required to be registered.
            subgoal_count: Number of subgoals to generate (default 3). A value
                of zero or less yields an empty decomposition.

        Returns:
            The newly created subgoal records, in descending priority order.
        """
        # Discard any earlier decomposition of this goal (ids and records).
        for stale_id in self.goal_hierarchy.pop(goal_id, []):
            self.subgoal_registry.pop(stale_id, None)

        subgoals = []
        for i in range(subgoal_count):
            subgoal_id = f"{goal_id}_sub_{i}"
            subgoal = {
                'subgoal_id': subgoal_id,
                'description': f'Subgoal {i+1} for {goal_id}',
                'parent_goal': goal_id,
                # Earlier subgoals get higher priority: 1.0 down to 1/count.
                'priority': (subgoal_count - i) / subgoal_count,
                'status': 'pending',
            }
            self.subgoal_registry[subgoal_id] = subgoal
            self.goal_hierarchy[goal_id].append(subgoal_id)
            subgoals.append(subgoal)
        return subgoals

    def create_plan(self, goal_id: str, constraints: Optional[Dict] = None) -> Dict:
        """Create and store an execution plan for a goal.

        One step is generated per subgoal currently registered for the goal;
        a goal that has not been decomposed yields a plan with no steps.

        Args:
            goal_id: Goal the plan is for.
            constraints: Optional constraints stored verbatim on the plan.

        Returns:
            The plan dict, which is also stored in ``self.plans`` under its
            ``plan_id``.
        """
        plan = {
            'plan_id': f'plan_{goal_id}_{datetime.now().timestamp()}',
            'goal_id': goal_id,
            'steps': [],
            'estimated_duration': 100,
            'resource_requirements': {},
            'constraints': constraints or {},
            'created_at': datetime.now().isoformat(),
        }
        # .get() keeps the defaultdict from growing an entry for unknown goals.
        subgoals = self.goal_hierarchy.get(goal_id, [])
        for i, subgoal_id in enumerate(subgoals):
            plan['steps'].append({
                'step_number': i + 1,
                'action': f'Execute {subgoal_id}',
                'duration_estimate': 30,
                # Each step depends on the previous step, whose number is i.
                'dependencies': [i] if i > 0 else [],
            })
        self.plans[plan['plan_id']] = plan
        return plan

    def allocate_resources(self, plan_id: str) -> Dict:
        """Record a fixed resource allocation for a stored plan.

        Args:
            plan_id: Identifier returned by ``create_plan``.

        Returns:
            The allocation dict, or ``{'error': 'Plan not found'}`` when the
            plan id is unknown.
        """
        if plan_id not in self.plans:
            return {'error': 'Plan not found'}
        allocation = {
            'plan_id': plan_id,
            'cpu_cores': 2,
            'memory_gb': 4,
            'execution_timeout': 3600,
            'priority': 'normal',
            'allocation_time': datetime.now().isoformat(),
        }
        self.resources[plan_id] = allocation
        return allocation

    def execute_plan(self, plan_id: str) -> Dict:
        """Produce an execution report for a stored plan.

        No work is actually performed: every step is reported as completed
        with its estimated duration.

        Args:
            plan_id: Identifier returned by ``create_plan``.

        Returns:
            The execution report, or ``{'error': 'Plan not found'}`` when the
            plan id is unknown.
        """
        if plan_id not in self.plans:
            return {'error': 'Plan not found'}
        plan = self.plans[plan_id]
        steps = plan['steps']
        return {
            'plan_id': plan_id,
            'steps_executed': len(steps),
            'steps_successful': len(steps),
            'steps_failed': 0,
            'total_duration': plan['estimated_duration'],
            'status': 'success',
            'execution_log': [
                {
                    'step': step['step_number'],
                    'action': step['action'],
                    'status': 'completed',
                    'duration': step['duration_estimate'],
                }
                for step in steps
            ],
        }

    def track_progress(self, goal_id: str) -> Dict:
        """Compute and store the fraction of completed subgoals for a goal.

        Args:
            goal_id: Identifier of a registered goal.

        Returns:
            A progress summary, or ``{'error': 'Goal not found'}`` when the
            goal is not registered. A goal without subgoals reports 0%.
        """
        if goal_id not in self.goals:
            return {'error': 'Goal not found'}
        subgoals = self.goal_hierarchy.get(goal_id, [])
        completed = sum(
            1
            for sg_id in subgoals
            if self.subgoal_registry.get(sg_id, {}).get('status') == 'completed'
        )
        # Guard against division by zero for goals that were never decomposed.
        progress = completed / len(subgoals) if subgoals else 0.0
        self.goals[goal_id]['progress'] = progress
        return {
            'goal_id': goal_id,
            'progress_percentage': progress * 100,
            'subgoals_completed': completed,
            'subgoals_total': len(subgoals),
            'status': self.goals[goal_id]['status'],
        }
class ResourceScheduler:
    """Keep an in-memory queue of scheduled tasks and optimization reports."""

    def __init__(self):
        # Tasks in the order they were scheduled.
        self.scheduled_tasks = deque()
        # Per-resource usage counters; unknown keys read as 0.0.
        self.resource_usage = defaultdict(float)
        # Only the 100 most recent optimization reports are retained.
        self.optimization_history = deque(maxlen=100)

    def schedule_task(self, task_id: str, priority: float, duration: float) -> Dict:
        """Append a task record to the queue and return that same record.

        Args:
            task_id: Identifier of the task.
            priority: Priority stored on the record.
            duration: Estimated duration stored on the record.

        Returns:
            The task record, stamped with the current time and the status
            ``'scheduled'``.
        """
        entry = dict(
            task_id=task_id,
            priority=priority,
            estimated_duration=duration,
            scheduled_at=datetime.now().isoformat(),
            status='scheduled',
        )
        self.scheduled_tasks.append(entry)
        return entry

    def optimize_resources(self) -> Dict:
        """Record and return a fixed optimization report.

        Returns:
            The report dict; the same object is appended to
            ``optimization_history``.
        """
        applied = ['Load balancing', 'Priority scheduling', 'Resource batching']
        report = {
            'cpu_utilization': 0.75,
            'memory_utilization': 0.60,
            'optimizations_applied': applied,
            'performance_improvement': 0.15,
        }
        self.optimization_history.append(report)
        return report
# ═══════════════════════════════════════════════════════════════════════════════
class ConflictResolver:
    """Report on and resolve conflicts between goals (placeholder logic)."""

    def __init__(self):
        # Bounded log; no method of this class writes to it.
        self.conflict_log = deque(maxlen=100)
        # Strategy registry; no method of this class reads or writes it.
        self.resolution_strategies = {}

    def detect_conflict(self, goal1: str, goal2: str) -> Dict:
        """Build a conflict report for a pair of goals.

        The report always states that no conflict was detected.

        Args:
            goal1: First goal identifier.
            goal2: Second goal identifier.

        Returns:
            A dict echoing both goals with ``conflict_detected`` False,
            ``conflict_type`` None and ``severity`` 0.0.
        """
        report = {'goal1': goal1, 'goal2': goal2}
        report.update(conflict_detected=False, conflict_type=None, severity=0.0)
        return report

    def resolve_conflict(self, goals: List[str]) -> Dict:
        """Build a resolution report for a group of goals.

        Args:
            goals: Goal identifiers to resolve.

        Returns:
            A dict echoing ``goals``, naming the ``priority_weighted_sum``
            strategy, marked resolved, with a satisfaction level of 0.8 for
            every goal.
        """
        satisfaction = dict.fromkeys(goals, 0.8)
        return {
            'goals': goals,
            'resolution_strategy': 'priority_weighted_sum',
            'resolved': True,
            'satisfaction_levels': satisfaction,
        }
# ═══════════════════════════════════════════════════════════════════════════════
def get_planning_engine() -> PlanningEngine:
    """Return the module-wide PlanningEngine, creating it on first use."""
    global _planning_engine
    try:
        return _planning_engine
    except NameError:
        # First call: the module-level name does not exist yet.
        _planning_engine = PlanningEngine()
        return _planning_engine
def get_resource_scheduler() -> ResourceScheduler:
    """Return the module-wide ResourceScheduler, creating it on first use."""
    global _resource_scheduler
    try:
        return _resource_scheduler
    except NameError:
        # First call: the module-level name does not exist yet.
        _resource_scheduler = ResourceScheduler()
        return _resource_scheduler