import json import numpy as np from graph import graph_obj from gen.utils.game_util import get_objects_with_name_and_prop from env.reward import get_action class BaseTask(object): ''' base class for tasks ''' def __init__(self, traj, env, args, reward_type='sparse', max_episode_length=2000): # settings self.traj = traj self.env = env self.args = args self.task_type = self.traj['task_type'] self.max_episode_length = max_episode_length self.reward_type = reward_type self.step_num = 0 self.num_subgoals = self.get_num_subgoals(self.traj['plan']['high_pddl']) self.goal_finished = False # internal states self.goal_idx = 0 self.finished = -1 # load navigation graph self.gt_graph = None self.load_nav_graph() # reward config self.reward_config = None self.reward_config = 'models/config/rewards.json' if self.reward_config != None: self.load_reward_config(args.reward_config) self.strict = 'strict' in reward_type # prev state self.prev_state = self.env.last_event def load_reward_config(self, config_file): ''' load json file with reward values ''' with open(config_file, 'r') as rc: reward_config = json.load(rc) self.reward_config = reward_config def load_nav_graph(self): ''' build navigation grid graph ''' floor_plan = self.traj['scene']['floor_plan'] scene_num = self.traj['scene']['scene_num'] self.gt_graph = graph_obj.Graph(use_gt=True, construct_graph=True, scene_id=scene_num) def get_num_subgoals(self, high_pddl): ''' number of subgoals in high-level pddl plan ''' last_action = high_pddl[-1] # issue97: https://github.com/askforalfred/alfred/issues/97 if last_action['planner_action']['action'] == 'End': return len(high_pddl) - 1 # ignore NoOp/End action else: return len(high_pddl) def goal_satisfied(self, state): ''' check if the overall task goal was satisfied. ''' raise NotImplementedError def transition_reward(self, state): ''' immediate reward given the current state ''' reward = 0 # goal completed if self.goal_finished: done = True return reward, done # get subgoal and action expert_plan = self.traj['plan']['high_pddl'] action_type = expert_plan[self.goal_idx]['planner_action']['action'] # subgoal reward if "dense" in self.reward_type: action = get_action(action_type, self.gt_graph, self.env, self.reward_config, self.strict) sg_reward, sg_done = action.get_reward(state, self.prev_state, expert_plan, self.goal_idx) reward += sg_reward if sg_done: self.finished += 1 if self.goal_idx + 1 < self.num_subgoals: self.goal_idx += 1 # end task reward goal_finished = self.goal_satisfied(state) if goal_finished: reward += self.reward_config['Generic']['goal_reward'] self.goal_finished = True # success reward if "success" in self.reward_type and self.env.last_event.metadata['lastActionSuccess']: reward += self.reward_config['Generic']['success'] # failure reward if "failure" in self.reward_type and not self.env.last_event.metadata['lastActionSuccess']: reward += self.reward_config['Generic']['failure'] # step penalty if self.step_num > len(self.traj['plan']['low_actions']): reward += self.reward_config['Generic']['step_penalty'] # save event self.prev_state = self.env.last_event # step and check if max_episode_length reached self.step_num += 1 done = self.goal_idx >= self.num_subgoals or self.step_num >= self.max_episode_length return reward, done def reset(self): ''' Reset internal states ''' self.goal_idx = 0 self.finished = -1 self.step_num = 0 self.goal_finished = False def get_subgoal_idx(self): return self.finished def get_target(self, var): ''' returns the object type of a task param ''' return self.traj['pddl_params'][var] if self.traj['pddl_params'][var] is not None else None def get_targets(self): ''' returns a dictionary of all targets for the task ''' targets = { 'object': self.get_target('object_target'), 'parent': self.get_target('parent_target'), 'toggle': self.get_target('toggle_target'), 'mrecep': self.get_target('mrecep_target'), } # slice exception if 'object_sliced' in self.traj['pddl_params'] and self.traj['pddl_params']['object_sliced']: targets['object'] += 'Sliced' # Change, e.g., "Apple" -> "AppleSliced" as pickup target. return targets class PickAndPlaceSimpleTask(BaseTask): ''' pick_and_place task ''' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def goal_satisfied(self, state): # check if any object of 'object' class is inside any receptacle of 'parent' class pcs = self.goal_conditions_met(state) return pcs[0] == pcs[1] def goal_conditions_met(self, state): ts = 1 s = 0 targets = self.get_targets() receptacles = get_objects_with_name_and_prop(targets['parent'], 'receptacle', state.metadata) pickupables = get_objects_with_name_and_prop(targets['object'], 'pickupable', state.metadata) # check if object needs to be sliced if 'Sliced' in targets['object']: ts += 1 if len([p for p in pickupables if 'Sliced' in p['objectId']]) >= 1: s += 1 if np.any([np.any([p['objectId'] in r['receptacleObjectIds'] for r in receptacles if r['receptacleObjectIds'] is not None]) for p in pickupables]): s += 1 return s, ts def reset(self): super().reset() class PickTwoObjAndPlaceTask(BaseTask): ''' pick_two_obj_and_place task ''' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def goal_satisfied(self, state): # check if two objects of 'object' class are in any receptacle of 'parent' class pcs = self.goal_conditions_met(state) return pcs[0] == pcs[1] def goal_conditions_met(self, state): ts = 2 s = 0 targets = self.get_targets() receptacles = get_objects_with_name_and_prop(targets['parent'], 'receptacle', state.metadata) pickupables = get_objects_with_name_and_prop(targets['object'], 'pickupable', state.metadata) # check if object needs to be sliced if 'Sliced' in targets['object']: ts += 2 s += min(len([p for p in pickupables if 'Sliced' in p['objectId']]), 2) # placing each object counts as a goal_condition s += min(np.max([sum([1 if r['receptacleObjectIds'] is not None and p['objectId'] in r['receptacleObjectIds'] else 0 for p in pickupables]) for r in receptacles]), 2) return s, ts def reset(self): super().reset() class LookAtObjInLightTask(BaseTask): ''' look_at_obj_in_light task ''' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def goal_satisfied(self, state): # check if any object of 'object' class is being held in front of 'toggle' object that is turned on pcs = self.goal_conditions_met(state) return pcs[0] == pcs[1] def goal_conditions_met(self, state): ts = 2 s = 0 targets = self.get_targets() toggleables = get_objects_with_name_and_prop(targets['toggle'], 'toggleable', state.metadata) pickupables = get_objects_with_name_and_prop(targets['object'], 'pickupable', state.metadata) inventory_objects = state.metadata['inventoryObjects'] # check if object needs to be sliced if 'Sliced' in targets['object']: ts += 1 if len([p for p in pickupables if 'Sliced' in p['objectId']]) >= 1: s += 1 # check if the right object is in hand if len(inventory_objects) > 0 and inventory_objects[0]['objectId'] in [p['objectId'] for p in pickupables]: s += 1 # check if the lamp is visible and turned on if np.any([t['isToggled'] and t['visible'] for t in toggleables]): s += 1 return s, ts def reset(self): super().reset() class PickHeatThenPlaceInRecepTask(BaseTask): ''' pick_heat_then_place_in_recep task ''' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def goal_satisfied(self, state): # check if any object of 'object' class inside receptacle of 'parent' class is hot pcs = self.goal_conditions_met(state) return pcs[0] == pcs[1] def goal_conditions_met(self, state): ts = 3 s = 0 targets = self.get_targets() receptacles = get_objects_with_name_and_prop(targets['parent'], 'receptacle', state.metadata) pickupables = get_objects_with_name_and_prop(targets['object'], 'pickupable', state.metadata) # check if object needs to be sliced if 'Sliced' in targets['object']: ts += 1 if len([p for p in pickupables if 'Sliced' in p['objectId']]) >= 1: s += 1 objs_in_place = [p['objectId'] for p in pickupables for r in receptacles if r['receptacleObjectIds'] is not None and p['objectId'] in r['receptacleObjectIds']] objs_heated = [p['objectId'] for p in pickupables if p['objectId'] in self.env.heated_objects] # check if object is in the receptacle if len(objs_in_place) > 0: s += 1 # check if some object was heated if len(objs_heated) > 0: s += 1 # check if the object is both in the receptacle and hot if np.any([obj_id in objs_heated for obj_id in objs_in_place]): s += 1 return s, ts def reset(self): super().reset() class PickCoolThenPlaceInRecepTask(BaseTask): ''' pick_cool_then_place_in_recep task ''' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def goal_satisfied(self, state): # check if any object of 'object' class inside receptacle of 'parent' class is cold pcs = self.goal_conditions_met(state) return pcs[0] == pcs[1] def goal_conditions_met(self, state): ts = 3 s = 0 targets = self.get_targets() receptacles = get_objects_with_name_and_prop(targets['parent'], 'receptacle', state.metadata) pickupables = get_objects_with_name_and_prop(targets['object'], 'pickupable', state.metadata) if 'Sliced' in targets['object']: ts += 1 if len([p for p in pickupables if 'Sliced' in p['objectId']]) >= 1: s += 1 objs_in_place = [p['objectId'] for p in pickupables for r in receptacles if r['receptacleObjectIds'] is not None and p['objectId'] in r['receptacleObjectIds']] objs_cooled = [p['objectId'] for p in pickupables if p['objectId'] in self.env.cooled_objects] # check if object is in the receptacle if len(objs_in_place) > 0: s += 1 # check if some object was cooled if len(objs_cooled) > 0: s += 1 # check if the object is both in the receptacle and cold if np.any([obj_id in objs_cooled for obj_id in objs_in_place]): s += 1 return s, ts def reset(self): super().reset() class PickCleanThenPlaceInRecepTask(BaseTask): ''' pick_clean_then_place_in_recep task ''' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def goal_satisfied(self, state): # check if any object of 'object' class inside receptacle of 'parent' class is clean pcs = self.goal_conditions_met(state) return pcs[0] == pcs[1] def goal_conditions_met(self, state): ts = 3 s = 0 targets = self.get_targets() receptacles = get_objects_with_name_and_prop(targets['parent'], 'receptacle', state.metadata) pickupables = get_objects_with_name_and_prop(targets['object'], 'pickupable', state.metadata) if 'Sliced' in targets['object']: ts += 1 if len([p for p in pickupables if 'Sliced' in p['objectId']]) >= 1: s += 1 objs_in_place = [p['objectId'] for p in pickupables for r in receptacles if r['receptacleObjectIds'] is not None and p['objectId'] in r['receptacleObjectIds']] objs_cleaned = [p['objectId'] for p in pickupables if p['objectId'] in self.env.cleaned_objects] # check if object is in the receptacle if len(objs_in_place) > 0: s += 1 # check if some object was cleaned if len(objs_cleaned) > 0: s += 1 # check if the object is both in the receptacle and clean if np.any([obj_id in objs_cleaned for obj_id in objs_in_place]): s += 1 return s, ts def reset(self): super().reset() class PickAndPlaceWithMovableRecepTask(BaseTask): ''' pick_and_place_with_movable_recep task ''' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def goal_satisfied(self, state): # check if any object of 'object' class is inside any movable receptacle of 'mrecep' class at receptacle of 'parent' class pcs = self.goal_conditions_met(state) return pcs[0] == pcs[1] def goal_conditions_met(self, state): ts = 3 s = 0 targets = self.get_targets() receptacles = get_objects_with_name_and_prop(targets['parent'], 'receptacle', state.metadata) pickupables = get_objects_with_name_and_prop(targets['object'], 'pickupable', state.metadata) movables = get_objects_with_name_and_prop(targets['mrecep'], 'pickupable', state.metadata) # check if object needs to be sliced if 'Sliced' in targets['object']: ts += 1 if len([p for p in pickupables if 'Sliced' in p['objectId']]) >= 1: s += 1 pickup_in_place = [p for p in pickupables for m in movables if 'receptacleObjectIds' in p and m['receptacleObjectIds'] is not None and p['objectId'] in m['receptacleObjectIds']] movable_in_place = [m for m in movables for r in receptacles if 'receptacleObjectIds' in r and r['receptacleObjectIds'] is not None and m['objectId'] in r['receptacleObjectIds']] # check if the object is in the final receptacle if len(pickup_in_place) > 0: s += 1 # check if the movable receptacle is in the final receptacle if len(movable_in_place) > 0: s += 1 # check if both the object and movable receptacle stack is in the final receptacle if np.any([np.any([p['objectId'] in m['receptacleObjectIds'] for p in pickupables]) and np.any([r['objectId'] in m['parentReceptacles'] for r in receptacles]) for m in movables if m['parentReceptacles'] is not None and m['receptacleObjectIds'] is not None]): s += 1 return s, ts def reset(self): super().reset() def get_task(task_type, traj, env, args, reward_type='sparse', max_episode_length=2000): task_class_str = task_type.replace('_', ' ').title().replace(' ', '') + "Task" if task_class_str in globals(): task = globals()[task_class_str] return task(traj, env, args, reward_type=reward_type, max_episode_length=max_episode_length) else: raise Exception("Invalid task_type %s" % task_class_str)