| from uuid import uuid4 |
|
|
| from openenv.core.env_server.interfaces import Environment |
|
|
| try: |
| from ..models import AppAction, AppObservation, AppState |
| except ImportError: |
| from models import AppAction, AppObservation, AppState |
|
|
| try: |
| from ..utils import * |
| except ImportError: |
| from utils import * |
|
|
|
|
class AppEnvironment(Environment):
    """Grid-based object segmentation/placement environment.

    Each episode starts with a freshly generated grid (``initGrid``) and a set
    of objects to find and place. ``step`` applies an :class:`AppAction` —
    which may request segmentation, placement, adjustment, and/or object
    finding — accumulates rewards, and ends the episode either when all
    objects are found and placed or when ``MAX_STEPS`` is reached.
    """

    # Each session operates on its own instance, so concurrent sessions are safe.
    SUPPORTS_CONCURRENT_SESSIONS: bool = True
    # Hard cap on steps per episode before it is forcibly terminated.
    MAX_STEPS: int = 20

    def __init__(self) -> None:
        """Initialize with a brand-new episode state."""
        self._state = self._new_state()
        # NOTE(review): set here but never incremented anywhere visible in
        # this file — confirm whether reset() was meant to bump it.
        self._reset_count = 0

    def _coerce_state(self) -> AppState:
        """Return ``self._state`` as an :class:`AppState`.

        A dict (e.g. a deserialized state assigned externally) is converted
        in place; any other unexpected type is discarded and replaced with a
        fresh episode state.
        """
        if isinstance(self._state, AppState):
            return self._state

        if isinstance(self._state, dict):
            self._state = AppState(**self._state)
            return self._state

        # Unrecognized state object: start a new episode rather than crash.
        self._state = self._new_state()
        return self._state

    def _new_state(self) -> AppState:
        """Build a fresh :class:`AppState` with a new grid and episode id."""
        grid, placed = initGrid()
        # Dimensions of the (3-level nested list) grid, used to size the
        # weighted grid to match.
        grid_shape = (len(grid), len(grid[0]), len(grid[0][0]))

        return AppState(
            episode_id=str(uuid4()),
            step_count=0,
            currentGrid=grid,
            weightedGrid=initWeightedGrid(grid_shape),
            objectsLeft=list(placed.keys()),
            objectsFound=[],
            reward=0.0,
            isDone=False,
            ObjectsPresent=placed,
            ObjectsPlaced={},
            rewardFeedback=[],
            rewardList=[],
            numberPlaced=0,
            rewardListSegment=[],
            rewardFeedbackSegment=[],
            rewardListPlace=[],
            rewardFeedbackPlace=[],
            rewardListAdjust=[],
            rewardFeedbackAdjust=[],
        )

    @staticmethod
    def _make_observation(state: AppState) -> AppObservation:
        """Project *state* into the :class:`AppObservation` sent to clients.

        Centralizes the field mapping that was previously duplicated verbatim
        in ``reset()`` and in every return path of ``step()``.
        """
        return AppObservation(
            currentGrid=state.currentGrid,
            positions=state.ObjectsPresent,
            objectsLeft=state.objectsLeft,
            objectsFound=state.objectsFound,
            reward=state.reward,
            isDone=state.isDone,
            rewardFeedback=state.rewardFeedback,
            rewardList=state.rewardList,
            numberPlaced=state.numberPlaced,
            ObjectsPlaced=state.ObjectsPlaced,
            rewardListSegment=state.rewardListSegment,
            rewardFeedbackSegment=state.rewardFeedbackSegment,
            rewardListPlace=state.rewardListPlace,
            rewardFeedbackPlace=state.rewardFeedbackPlace,
            rewardListAdjust=state.rewardListAdjust,
            rewardFeedbackAdjust=state.rewardFeedbackAdjust,
        )

    def reset(self) -> AppObservation:
        """Start a new episode and return its initial observation."""
        self._state = self._new_state()
        return self._make_observation(self._state)

    def step(self, action: AppAction) -> AppObservation:
        """Advance the episode by one action.

        Args:
            action: The :class:`AppAction` to apply, or a dict form of one
                (coerced via ``AppAction(**action)``). ``None`` is treated as
                an invalid action and penalized.

        Returns:
            The post-step :class:`AppObservation`. If the episode is already
            done, the current state is returned unchanged.
        """
        state = self._coerce_state()

        # A finished episode is inert: return the terminal observation as-is.
        if state.isDone:
            return self._make_observation(state)

        # Accept a serialized action (e.g. straight from a JSON payload).
        if isinstance(action, dict):
            action = AppAction(**action)

        state.step_count += 1
        reward = 0.0

        if action is None:
            reward -= 10.0
            # NOTE(review): this penalty is recorded in the feedback log but
            # never folded into state.reward before returning — confirm
            # whether appendRewardFeedback accumulates it; otherwise the
            # -10.0 never reaches the observation's reward field.
            appendRewardFeedback(
                state,
                "",
                "No action is of invalid schema or format. Penalty applied.",
                reward,
            )
            return self._make_observation(state)

        # From here on `action` is guaranteed non-None (handled above), so the
        # original trailing `and action is not None` guards were dead code —
        # and evaluated too late to prevent an AttributeError anyway. Removed.
        if action.isSegmentation:
            reward += 10.0
            appendRewardFeedback(state, "segment", "Segmentation successful.", reward)

        if action.placement:
            placement_reward, placement_failed = place(
                action.isSegmentation, action.placement, state
            )
            reward += placement_reward
            if placement_failed:
                appendRewardFeedback(state, "place", "Failed to place object.", reward)
            else:
                appendRewardFeedback(
                    state, "place", "Object placed successfully.", reward
                )

        if action.adjust:
            reward += adjustment(action.isSegmentation, action.adjust, state)
            appendRewardFeedback(
                state, "adjust", "Object adjusted successfully.", reward
            )

        if action.findObjects:
            reward += findobject(action.isSegmentation, action.findObjects, state)
            appendRewardFeedback(state, "segment", "Object found successfully.", reward)

        # Terminate when every object has been both found and placed, with a
        # completion bonus; otherwise end (without bonus) on the step budget.
        if (
            len(state.objectsLeft) == 0
            and len(state.ObjectsPresent) == state.numberPlaced
        ):
            state.isDone = True
            reward += 10.0
            appendRewardFeedback(
                state, "segment", "All objects found. Episode completed!", reward
            )
        elif state.step_count >= self.MAX_STEPS:
            state.isDone = True
            appendRewardFeedback(
                state,
                "",
                f"Maximum step limit of {self.MAX_STEPS} reached. Episode ended.",
                reward,
            )

        state.reward += reward

        return self._make_observation(state)

    @property
    def state(self) -> AppState:
        """The current episode state, coerced to :class:`AppState` if needed."""
        return self._coerce_state()
|
|