| import constants |
| from agents.agent_base import AgentBase |
| from game_states.planned_game_state import PlannedGameState |
| from utils import game_util |
|
|
|
|
| class PlanAgent(AgentBase): |
| def __init__(self, thread_id=0, game_state=None, controller_agent=None): |
| super(PlanAgent, self).__init__(thread_id, game_state) |
| assert(isinstance(game_state, PlannedGameState)) |
| self.controller_agent = controller_agent |
| self.planned = False |
|
|
| def reset(self): |
| self.planned = False |
|
|
| def execute_plan(self): |
| step_count = 0 |
| self.planned = True |
| self.controller_agent.planning = True |
| if constants.OPEN_LOOP: |
| plan = self.game_state.get_current_plan(force_update=True) |
|
|
| if plan[0]['action'] == 'End': |
| raise ValueError('Empty plan is successful, no work to do') |
|
|
| elif len(plan) == 0 or len(plan) > constants.PLANNER_MAX_STEPS: |
| print ("Planning failed. Possibly because the goal was already satisfied") |
| raise ValueError("Symbolic Planning Failed") |
|
|
| for idx, plan_action in enumerate(plan): |
| self.save_plan(plan, idx) |
|
|
| if plan_action['action'] == 'GotoLocation': |
| plan_action = self.game_state.get_teleport_action(plan_action) |
| elif plan_action['action'] == 'End': |
| break |
| self.controller_agent.step(plan_action, executing_plan=True) |
| step_count += 1 |
| if self.controller_agent.current_frame_count >= constants.MAX_EPISODE_LENGTH: |
| break |
| else: |
| past_plans = [] |
| plan = self.game_state.get_current_plan(force_update=True) |
|
|
| if plan[0]['action'] == 'End': |
| raise ValueError('Empty plan is successful, no work to do') |
| elif len(plan) == 0 or len(plan) > constants.PLANNER_MAX_STEPS: |
| print("Symbolic Planning Failed") |
| raise ValueError("Symbolic Planning Failed") |
|
|
| plan_action = plan[0] |
| if constants.USE_DETERMINISTIC_CONTROLLER: |
| |
| rotations = 0 |
| while rotations < 4 and (plan_action is None or plan_action['action'] == 'End'): |
| action = {'action': 'RotateLeft'} |
| self.controller_agent.step(action, executing_plan=True) |
| rotations += 1 |
| plan = self.game_state.get_current_plan(force_update=True) |
| plan_action = plan[0] |
|
|
| while not(plan_action is None or plan_action['action'] == 'End'): |
| self.controller_agent.step(self.game_state.get_plan_action(plan_action), executing_plan=True) |
|
|
| |
| self.save_plan(plan, 0) |
|
|
| step_count += 1 |
| past_plans.append(plan) |
| if len(past_plans) > 5: |
| past_plans = past_plans[-5:] |
| plan = self.game_state.get_current_plan(force_update=True) |
| if plan[0]['action'] == 'End': |
| break |
| if (step_count >= constants.MAX_PLANNER_STEP_COUNT or |
| self.controller_agent.current_frame_count >= constants.MAX_EPISODE_LENGTH): |
| |
| break |
| if len(plan) > 1 and any([plan == past_plan for past_plan in past_plans]): |
| plan_action = plan[0] |
| self.controller_agent.step(self.game_state.get_plan_action(plan_action), executing_plan=True) |
| step_count += 1 |
| plan = plan[1:] |
| plan_action = plan[0] |
|
|
| self.controller_agent.planning = False |
|
|
| def save_plan(self, plan, idx=0): |
| plan_action = plan[idx] |
| constants.data_dict['plan']['high_pddl'].append({"high_idx": len(constants.data_dict['plan']['high_pddl']), |
| "planner_action": plan_action, |
| "discrete_action": game_util.get_discrete_hl_action(plan, idx)}) |
| constants.data_dict['template']['high_descs'].append(game_util.get_templated_action_str(plan, idx)) |
|
|