from uuid import uuid4

from openenv.core.env_server.interfaces import Environment

# Support both package-relative and flat execution layouts.
try:
    from ..models import AppAction, AppObservation, AppState
except ImportError:
    from models import AppAction, AppObservation, AppState
try:
    from ..utils import *
except ImportError:
    from utils import *


class AppEnvironment(Environment):
    """Episodic grid environment for object segmentation / placement tasks.

    Each episode starts from a freshly generated grid (via ``initGrid`` /
    ``initWeightedGrid`` from ``utils``).  An :class:`AppAction` may request
    segmentation, placement, adjustment, and/or object finding in a single
    step; rewards are accumulated on the state and echoed back in every
    :class:`AppObservation`.  The episode ends when every present object has
    been found and placed, or when ``MAX_STEPS`` steps have elapsed.
    """

    # Each session holds its own state object, so sessions don't interfere.
    SUPPORTS_CONCURRENT_SESSIONS: bool = True
    # Hard step budget per episode.
    MAX_STEPS: int = 20

    def __init__(self):
        self._state = self._new_state()
        self._reset_count = 0

    def _coerce_state(self) -> AppState:
        """Return ``self._state`` as an :class:`AppState`, repairing it if needed.

        Accepts a dict (e.g. after deserialization) and rehydrates it; any
        other unexpected type is replaced with a brand-new state.
        """
        if isinstance(self._state, AppState):
            return self._state
        if isinstance(self._state, dict):
            self._state = AppState(**self._state)
            return self._state
        # Unknown/corrupt state: start over rather than crash.
        self._state = self._new_state()
        return self._state

    def _new_state(self) -> AppState:
        """Create a fresh episode state with a newly generated grid."""
        grid, placed = initGrid()
        # Assumes a rectangular 3-D nested-list grid — TODO confirm against initGrid.
        grid_shape = (len(grid), len(grid[0]), len(grid[0][0]))
        return AppState(
            episode_id=str(uuid4()),
            step_count=0,
            currentGrid=grid,
            weightedGrid=initWeightedGrid(grid_shape),
            objectsLeft=list(placed.keys()),
            objectsFound=[],
            reward=0.0,
            isDone=False,
            ObjectsPresent=placed,
            ObjectsPlaced={},
            rewardFeedback=[],
            rewardList=[],
            numberPlaced=0,
            rewardListSegment=[],
            rewardFeedbackSegment=[],
            rewardListPlace=[],
            rewardFeedbackPlace=[],
            rewardListAdjust=[],
            rewardFeedbackAdjust=[],
        )

    @staticmethod
    def _make_observation(state: AppState) -> AppObservation:
        """Snapshot *state* into an :class:`AppObservation`.

        Single source of truth for the observation fields (previously this
        17-field construction was duplicated at four call sites).
        """
        return AppObservation(
            currentGrid=state.currentGrid,
            positions=state.ObjectsPresent,
            objectsLeft=state.objectsLeft,
            objectsFound=state.objectsFound,
            reward=state.reward,
            isDone=state.isDone,
            rewardFeedback=state.rewardFeedback,
            rewardList=state.rewardList,
            numberPlaced=state.numberPlaced,
            ObjectsPlaced=state.ObjectsPlaced,
            rewardListSegment=state.rewardListSegment,
            rewardFeedbackSegment=state.rewardFeedbackSegment,
            rewardListPlace=state.rewardListPlace,
            rewardFeedbackPlace=state.rewardFeedbackPlace,
            rewardListAdjust=state.rewardListAdjust,
            rewardFeedbackAdjust=state.rewardFeedbackAdjust,
        )

    def reset(self) -> AppObservation:
        """Start a new episode and return its initial observation."""
        self._state = self._new_state()
        # Previously initialized but never incremented — fixed so the counter
        # actually tracks resets.
        self._reset_count += 1
        return self._make_observation(self._state)

    def step(self, action: AppAction) -> AppObservation:
        """Apply *action* to the current episode and return the observation.

        An already-finished episode is returned unchanged.  ``None`` actions
        incur a flat penalty; otherwise each requested sub-action
        (segmentation, placement, adjustment, find) contributes to the step
        reward via the ``utils`` helpers.
        """
        state = self._coerce_state()
        if state.isDone:
            # Terminal state: echo the final observation without mutation.
            return self._make_observation(state)

        if isinstance(action, dict):
            action = AppAction(**action)

        state.step_count += 1
        reward = 0.0

        if action is None:
            reward -= 10.0
            appendRewardFeedback(
                state,
                "",
                "No action is of invalid schema or format. Penalty applied.",
                reward,
            )
            # NOTE(review): the penalty is passed to appendRewardFeedback but
            # never folded into state.reward here (no `state.reward += reward`
            # before this early return) — presumably the helper records it;
            # verify against utils.appendRewardFeedback.
            return self._make_observation(state)

        # action is provably non-None from here on; the original's repeated
        # `and action is not None` guards (tested *after* dereferencing the
        # attribute) were dead code and have been removed.
        if action.isSegmentation:
            reward += 10.0
            appendRewardFeedback(state, "segment", "Segmentation successful.", reward)

        if action.placement:
            placement_reward, placement_failed = place(
                action.isSegmentation, action.placement, state
            )
            reward += placement_reward
            if placement_failed:
                appendRewardFeedback(state, "place", "Failed to place object.", reward)
            else:
                appendRewardFeedback(
                    state, "place", "Object placed successfully.", reward
                )

        if action.adjust:
            reward += adjustment(action.isSegmentation, action.adjust, state)
            appendRewardFeedback(
                state, "adjust", "Object adjusted successfully.", reward
            )

        if action.findObjects:
            reward += findobject(action.isSegmentation, action.findObjects, state)
            appendRewardFeedback(state, "segment", "Object found successfully.", reward)

        # Episode termination: everything found and placed, or step budget hit.
        if (
            len(state.objectsLeft) == 0
            and len(state.ObjectsPresent) == state.numberPlaced
        ):
            state.isDone = True
            reward += 10.0
            appendRewardFeedback(
                state, "segment", "All objects found. Episode completed!", reward
            )
        elif state.step_count >= self.MAX_STEPS:
            state.isDone = True
            appendRewardFeedback(
                state,
                "",
                f"Maximum step limit of {self.MAX_STEPS} reached. Episode ended.",
                reward,
            )

        state.reward += reward
        return self._make_observation(state)

    @property
    def state(self) -> AppState:
        """Current episode state, coerced to :class:`AppState`."""
        return self._coerce_state()