from matplotlib.pylab import randint from numpy import ones, zeros, random from sklearn.metrics import mean_squared_error as MSE random.seed(123) OBJECTS = { "book": {"dims": [4, 4, 2], "stack": True}, "penstand": {"dims": [2, 2, 4], "stack": True}, "bottle": {"dims": [2, 2, 6], "stack": False}, "pen": {"dims": [1, 1, 4], "stack": False}, "pencil": {"dims": [1, 1, 6], "stack": False}, "eraser": {"dims": [2, 1, 1], "stack": False}, "powerbank": {"dims": [4, 2, 1], "stack": False}, "mobile": {"dims": [4, 2, 1], "stack": False}, "laptop": {"dims": [6, 4, 1], "stack": True}, "monitor": {"dims": [6, 4, 2], "stack": False}, "keyboard": {"dims": [6, 2, 1], "stack": False}, "mouse": {"dims": [4, 2, 1], "stack": False}, "headphones": {"dims": [4, 4, 2], "stack": False}, "charger": {"dims": [2, 2, 1], "stack": False}, "notebook": {"dims": [4, 4, 1], "stack": True}, "folder": {"dims": [4, 4, 1], "stack": True}, "backpack": {"dims": [6, 4, 2], "stack": False}, "pouch": {"dims": [4, 4, 2], "stack": False}, } OBJECT_NAMES = [ "book", "penstand", "bottle", "pen", "pencil", "eraser", "powerbank", "mobile", "laptop", "monitor", "keyboard", "mouse", "headphones", "charger", "notebook", "folder", "backpack", "pouch", ] ACTION_CONFIG = { "RIGHT": [1, 0, 0], "LEFT": [-1, 0, 0], "UP": [0, 1, 0], "DOWN": [0, -1, 0], "FORWARD": [0, 0, 1], "BACKWARD": [0, 0, -1], "ROTATE": [0, 0, 0], } def appendRewardFeedback(state, choice, feedback, reward): state.rewardList.append(reward) state.rewardFeedback.append(feedback) if choice == "segment": state.rewardFeedbackSegment.append(feedback) state.rewardListSegment.append(reward) elif choice == "place": state.rewardFeedbackPlace.append(feedback) state.rewardListPlace.append(reward) elif choice == "adjust": state.rewardFeedbackAdjust.append(feedback) state.rewardListAdjust.append(reward) def initDimentions(obj): dims = obj.get("dims") if dims is None: return [] return ones(dims, dtype=int).tolist() def initGrid(): sizeX, sizeY, sizeZ = randint(8, 12), randint(8, 12), randint(8, 12) grid = zeros((sizeX, sizeY, sizeZ), dtype=int).tolist() numObjs = randint(3, len(OBJECT_NAMES) + 1) chosenNames = random.choice(OBJECT_NAMES, size=numObjs, replace=False) placed = {} for name in chosenNames: obj = OBJECTS.get(name) dimX, dimY, dimZ = obj["dims"] if dimX > sizeX or dimY > sizeY or dimZ > sizeZ: continue isPlaced = False tryPlaced = 0 while not isPlaced and tryPlaced < 100: posX = randint(0, sizeX - dimX + 1) posY = randint(0, sizeY - dimY + 1) posZ = 0 canPlace = True for i in range(dimX): for j in range(dimY): for k in range(dimZ): if ( grid[posX + i][posY + j][posZ + k] != 0 and obj["stack"] == False ): canPlace = False break else: canPlace = True if not canPlace: break if not canPlace: break if canPlace: for i in range(dimX): for j in range(dimY): for k in range(dimZ): if ( obj["stack"] and grid[posX + i][posY + j][posZ + k] > 0 and posZ + k + 1 < sizeZ ): grid[posX + i][posY + j][posZ + k + 1] += 1 else: grid[posX + i][posY + j][posZ + k] += 1 placed[name] = (posX, posY, posZ, obj["stack"]) isPlaced = True return (grid, placed) def initWeightedGrid(shape=None): if shape is None: shape = (randint(8, 12), randint(8, 12), randint(8, 12)) grid = random.uniform(0, 1, shape) x_mid = grid.shape[0] // 2 x_span = grid.shape[0] // 4 y_front = grid.shape[1] // 3 grid[x_mid - x_span : x_mid + x_span, :y_front, :] *= 0.2 return grid def _get_weight_value(weight, x, y, z): if not weight or not weight[0] or not weight[0][0]: return 0.0 if ( x < 0 or y < 0 or z < 0 or x >= len(weight) or y >= len(weight[0]) or z >= len(weight[0][0]) ): return 0.0 return weight[x][y][z] def place(segment, objects, state): dims = state.currentGrid weight = state.weightedGrid objsPresent = state.ObjectsPresent reward = 0.0 totalObjs = len(objects) reward_per_obj_placed = 45.0 / totalObjs placement_failed = False if segment: appendRewardFeedback( state, "place", "Placing objects with segmentation is not allowed.", -60.0 ) return (-60.0, True) for obj_name, pos in objects.items(): obj = OBJECTS.get(obj_name) if obj is None: appendRewardFeedback( state, "place", f"Object '{obj_name}' is not recognized.", -reward_per_obj_placed, ) reward -= reward_per_obj_placed continue objGrid = initDimentions(obj) for i in range(len(objGrid)): for j in range(len(objGrid[0])): for k in range(len(objGrid[0][0])): if ( pos[0] + i >= len(dims) or pos[1] + j >= len(dims[0]) or pos[2] + k >= len(dims[0][0]) ): reward -= reward_per_obj_placed appendRewardFeedback( state, "place", f"Object '{obj_name}' placement is out of bounds.", -reward_per_obj_placed, ) placement_failed = True break if dims[pos[0] + i][pos[1] + j][pos[2] + k] > 0 and pos[3] == False: reward -= reward_per_obj_placed appendRewardFeedback( state, "place", f"Object '{obj_name}' placement overlaps with another object and stacking is not allowed.", -reward_per_obj_placed, ) placement_failed = True break elif ( dims[pos[0] + i][pos[1] + j][pos[2] + k] > 0 and pos[3] == True ): if pos[2] + k + 1 < len(dims[0][0]): dims[pos[0] + i][pos[1] + j][pos[2] + k + 1] += 1 bonus = ( _get_weight_value( weight, pos[0] + i, pos[1] + j, pos[2] + k + 1, ) * reward_per_obj_placed ) reward += bonus appendRewardFeedback( state, "place", f"Object '{obj_name}' placed with stacking. Bonus: {bonus:.2f}", bonus, ) else: reward -= reward_per_obj_placed appendRewardFeedback( state, "place", f"Object '{obj_name}' placement failed. No space for stacking.", -reward_per_obj_placed, ) placement_failed = True break else: dims[pos[0] + i][pos[1] + j][pos[2] + k] = 1 bonus = reward_per_obj_placed * _get_weight_value( weight, pos[0] + i, pos[1] + j, pos[2] + k ) reward += bonus appendRewardFeedback( state, "place", f"Object '{obj_name}' placed successfully. Bonus: {bonus:.2f}", bonus, ) if placement_failed: break if placement_failed: break if not placement_failed: state.ObjectsPlaced[obj_name] = pos state.numberPlaced += 1 try: if objsPresent[obj_name] == state.ObjectsPlaced[obj_name]: reward -= 45.0 / totalObjs appendRewardFeedback( state, "place", f"Object '{obj_name}' is being placed in the same location", -reward_per_obj_placed, ) except KeyError: reward -= reward_per_obj_placed appendRewardFeedback( state, "place", f"Object '{obj_name}' is present in the environment, but is placed in same location as originally found.", -reward_per_obj_placed, ) continue return (reward, placement_failed) def findobject(segment, objects, state): if not segment or segment is None: appendRewardFeedback( state, "segment", "Finding objects without segmentation is not allowed.", -60.0, ) return -60.0 if set(state.objectsFound) == set(state.ObjectsPresent.keys()): appendRewardFeedback( state, "segment", "No point in finding more objects as all are already found. Make the isSegement attribute false and execute the place method.", -60.0, ) return -60.0 reward = 0.0 glMetric = 45.0 / len(state.ObjectsPresent) objs = [] for obj_found, pos_found in objects.items(): pos_real = state.ObjectsPresent.get(obj_found) if pos_real is None: reward -= glMetric appendRewardFeedback( state, "segment", f"Object '{obj_found}' not found in the environment.", -glMetric, ) continue if pos_found == pos_real: reward += glMetric appendRewardFeedback( state, "segment", f"Object '{obj_found}' found with correct position and stacking.", glMetric, ) objs.append(obj_found) else: mse = MSE(pos_real[:3], pos_found[:3]) reward -= mse appendRewardFeedback( state, "segment", f"Object '{obj_found}' found with incorrect position. MSE: {mse:.2f}", -mse, ) if pos_found[3] != pos_real[3]: reward -= glMetric / 4.0 appendRewardFeedback( state, "segment", f"Object '{obj_found}' found with incorrect stacking. Penalty: {glMetric / 4.0}", -glMetric / 4.0, ) else: reward += glMetric / 4.0 appendRewardFeedback( state, "segment", f"Object '{obj_found}' found with correct stacking. Bonus: {glMetric / 4.0}", glMetric / 4.0, ) for obj in objs: if obj in state.objectsLeft: state.objectsLeft.remove(obj) if obj not in state.objectsFound: state.objectsFound.append(obj) return reward def _remove_object(state, obj_name): reward = 0 try: pos = state.ObjectsPlaced.pop(obj_name) except KeyError: reward -= 45.0 / len(state.ObjectsPresent) appendRewardFeedback( state, "adjust", f"Object '{obj_name}' is not placed in the environment.", -reward, ) return reward state.numberPlaced -= 1 dims = state.currentGrid obj = OBJECTS.get(obj_name) objGrid = initDimentions(obj) for i in range(len(objGrid)): for j in range(len(objGrid[0])): for k in range(len(objGrid[0][0])): if dims[pos[0] + i][pos[1] + j][pos[2] + k] > 0: dims[pos[0] + i][pos[1] + j][pos[2] + k] -= 1 def _adjustment_helper(state, name, pos, change, direction): _remove_object(state, name) if direction == "ROTATE": newPos = (pos[1], pos[0], pos[2], pos[3]) else: newPos = (pos[0] + change[0], pos[1] + change[1], pos[2] + change[2], pos[3]) reward, isNotPlaced = place(False, {name: newPos}, state) if isNotPlaced: dummyReward = place(False, {name: pos}, state)[0] appendRewardFeedback( state, "adjust", f"Failed to adjust object '{name}' in direction {direction}. Reverting to original position.", -dummyReward, ) return -dummyReward appendRewardFeedback( state, "adjust", f"Object '{name}' moved {direction} successfully.", reward, ) return reward def adjustment(segment, action, state): objsPlaced = state.ObjectsPlaced if segment: appendRewardFeedback( state, "adjust", "Placing objects with segmentation is not allowed.", -60.0 ) return -60.0 try: initPos = objsPlaced[action[0]] name = action[0] except KeyError: reward_per_obj_placed = 45.0 / len(state.ObjectsPresent) appendRewardFeedback( state, "adjust", f"Object '{action[0]}' is not placed in the environment, so it cannot be adjusted.", -reward_per_obj_placed, ) return -reward_per_obj_placed if action[1] in ACTION_CONFIG: reward = _adjustment_helper( state, name, initPos, ACTION_CONFIG.get(action[1]), action[1] ) return reward else: reward_per_obj_placed = 45.0 / len(state.ObjectsPresent) appendRewardFeedback( state, "adjust", f"Invalid adjustment direction '{action[1]}'. Valid directions are RIGHT, LEFT, UP, DOWN, FORWARD, BACKWARD, ROTATE.", -reward_per_obj_placed, ) return -reward_per_obj_placed