vikash-nuvai
fix: make TiffinPackingEnvironment a singleton to support stateless HTTP endpoints in openenv
bfdceab | # Copyright (c) 2026 CtrlAltWin Team | |
| """ | |
| Tiffin Packing Environment — OpenEnv Server Implementation. | |
| Wraps the packing simulation into the OpenEnv Environment base class, | |
| exposing step(), reset(), and state() for LLM agent interaction. | |
| """ | |
| from __future__ import annotations | |
| from typing import Any, Optional | |
| from uuid import uuid4 | |
try:
    from openenv.core.env_server import Environment
except ImportError:
    # openenv is not installed (e.g. local testing): define a minimal
    # stand-in so this module can still be imported and subclassed.
    class Environment:
        """Placeholder base class used only when openenv is unavailable."""

        def __init__(self, **kwargs):
            """Accept and ignore any keyword arguments."""

        def reset(self, **kwargs):
            """Subclasses must override; the placeholder has no behaviour."""
            raise NotImplementedError

        def step(self, action, **kwargs):
            """Subclasses must override; the placeholder has no behaviour."""
            raise NotImplementedError

        def state(self):
            """Subclasses must override; the placeholder has no behaviour."""
            raise NotImplementedError
| from tiffin_packer.models import TiffinAction, TiffinObservation, TiffinState | |
| from tiffin_packer.simulation.engine import PackingSimulation | |
| from tiffin_packer.vlm.classifier import FoodClassifier | |
| from tiffin_packer.tasks import get_task_config, list_tasks | |
| from tiffin_packer.grader import grade, grade_detailed | |
class TiffinPackingEnvironment(Environment):
    """
    OpenEnv-compliant tiffin packing environment.

    An LLM agent controls a robotic arm to identify food items using VLM
    and pack them into the correct tiffin containers under real-world
    constraints (type compatibility, volume, temperature, fragility).

    Supports 3 tasks: easy, medium, hard.

    The class is a process-wide singleton: every construction returns the
    same object, so episode state survives across separately constructed
    handles. No locking is performed around the shared state.
    """

    # The single shared instance, created lazily by __new__.
    _instance = None

    # Commands that need a target_id, mapped to the kind of ID they expect
    # (used verbatim in the error message when target_id is missing).
    _TARGET_KINDS = {
        "identify": "food item ID",
        "pick": "food item ID",
        "place": "container ID",
        "pour": "container ID",
    }

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # Lets __init__ tell the first construction from later ones.
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        """Build the simulation, classifier and empty state exactly once."""
        # __init__ runs on every TiffinPackingEnvironment() call; skip the
        # setup after the first so an in-progress episode is not wiped.
        if getattr(self, "_initialized", False):
            return
        super().__init__()
        self.sim = PackingSimulation()
        self.vlm = FoodClassifier()
        self._state = TiffinState()
        self._identified_items: set = set()
        self._task_config = None  # populated by reset()
        self._initialized = True

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs: Any,
    ) -> TiffinObservation:
        """
        Reset the environment for a new episode.

        Args:
            seed: Optional random seed for reproducibility.
            episode_id: Optional custom episode ID; a random UUID4 string
                is used when omitted or empty.
            **kwargs: May include 'task_id' (easy/medium/hard). Defaults
                to 'easy' when absent or None.

        Returns:
            Initial TiffinObservation with scene description.
        """
        task_id = kwargs.get("task_id")
        if task_id is None:
            # An explicit None (e.g. a null field in a request payload)
            # is treated the same as an omitted task_id.
            task_id = "easy"

        # Load task configuration
        self._task_config = get_task_config(task_id, seed=seed)

        # Reset simulation
        self.sim.reset(
            food_items=self._task_config.food_items,
            containers=self._task_config.containers,
            seed=seed,
        )

        # Reset state
        self._state = TiffinState(
            episode_id=episode_id or str(uuid4()),
            step_count=0,
            task_id=task_id,
            items_packed=0,
            total_items=len(self._task_config.food_items),
            items_identified=0,
            packing_log=[],
            constraints_violated=[],
        )
        self._identified_items = set()

        # Build initial observation
        return self._build_observation(
            reward=0.0,
            done=False,
            feedback=(
                f"Episode started! Task: {task_id.upper()}\n\n"
                f"{self._task_config.description}\n\n"
                f"You have {self._task_config.max_steps} steps to pack "
                f"{len(self._task_config.food_items)} food items into "
                f"{len(self._task_config.containers)} containers.\n\n"
                f"Start by using 'observe' to see the scene, then 'identify' "
                f"each food item before packing."
            ),
        )

    def step(
        self,
        action: TiffinAction,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> TiffinObservation:
        """
        Execute one step in the environment.

        The step reward is the reward reported by the simulation for the
        command (or -0.1 for an unknown command or a missing target_id),
        minus a 0.02 per-step time penalty. On the terminating step the
        final grade is added to the reward as a bonus.

        Args:
            action: TiffinAction with command and optional target_id.
            timeout_s: Optional timeout (unused).

        Returns:
            TiffinObservation with updated scene state. When the episode
            ends, its metadata carries 'final_score' and 'grade_breakdown'.

        Raises:
            RuntimeError: If called before reset() has loaded a task.
        """
        if self._task_config is None:
            # Without a task there is no max_steps or grading config; fail
            # up front instead of with an AttributeError after the step
            # counter and simulation have already been touched.
            raise RuntimeError(
                "step() called before reset(); call reset() to start an episode."
            )

        self._state.step_count += 1
        vlm_result = None
        command = action.command.lower().strip()
        target_id = action.target_id

        # --- Dispatch command ---
        if command == "observe":
            _, feedback, reward = self.sim.observe()
        elif command in self._TARGET_KINDS:
            if target_id is None:
                feedback = (
                    f"Error: '{command}' requires a target_id "
                    f"({self._TARGET_KINDS[command]})."
                )
                reward = -0.1
            elif command == "identify":
                success, feedback, reward, vlm_result = self.sim.identify(target_id)
                # Only count identifications that produced a named result.
                if success and vlm_result and vlm_result.get("name"):
                    self._identified_items.add(target_id)
                    self._state.items_identified = len(self._identified_items)
            elif command == "pick":
                _, feedback, reward = self.sim.pick(target_id)
            else:
                # 'place' and 'pour' both deposit into a container and
                # share the same bookkeeping on success.
                deposit = self.sim.place if command == "place" else self.sim.pour
                success, feedback, reward = deposit(target_id)
                if success:
                    self._sync_packing_progress()
        else:
            feedback = (
                f"Unknown command: '{command}'. "
                f"Available commands: {self.sim.get_available_commands()}"
            )
            reward = -0.1

        # --- Time penalty ---
        reward -= 0.02

        # --- Check termination ---
        done = (
            self.sim.all_packed
            or self._state.step_count >= self._task_config.max_steps
        )

        # --- Final grading ---
        final_score = None
        grade_breakdown = None
        if done:
            grade_breakdown = grade_detailed(
                self._state.packing_log, self._task_config
            )
            final_score = grade_breakdown["final_score"]
            reward += final_score  # bonus = final grade
            if self.sim.all_packed:
                feedback += f"\n\n🎉 All items packed! Final score: {final_score:.4f}"
            else:
                feedback += (
                    f"\n\n⏰ Time's up! {self.sim.unpacked_count} items remaining. "
                    f"Final score: {final_score:.4f}"
                )

        return self._build_observation(
            reward=reward,
            done=done,
            feedback=feedback,
            vlm_result=vlm_result,
            final_score=final_score,
            grade_breakdown=grade_breakdown,
        )

    # NOTE(review): upstream OpenEnv appears to declare `state` as a
    # property on the base class; this is kept as a plain method to avoid
    # changing the interface — confirm which form the server expects.
    def state(self) -> TiffinState:
        """Return the current episode state."""
        return self._state

    # -------------------------------------------------------------------
    # Helpers
    # -------------------------------------------------------------------

    def _sync_packing_progress(self) -> None:
        """Copy the packed-item count and packing log from the simulation."""
        self._state.items_packed = sum(
            1 for item in self.sim.food_items if item.status == "packed"
        )
        # Store a copy so the state does not alias the simulation's list.
        self._state.packing_log = list(self.sim.packing_log)

    def _build_observation(
        self,
        reward: float = 0.0,
        done: bool = False,
        feedback: str = "",
        vlm_result: Optional[dict] = None,
        final_score: Optional[float] = None,
        grade_breakdown: Optional[dict] = None,
    ) -> TiffinObservation:
        """
        Build a TiffinObservation from current state.

        Args:
            reward: Step reward; rounded to 4 decimal places.
            done: Whether the episode has ended.
            feedback: Text reported as the observation's step_feedback.
            vlm_result: Result of an 'identify' command, if any.
            final_score: Final grade; added to metadata when not None.
            grade_breakdown: Detailed grade; added to metadata when not None.

        Returns:
            TiffinObservation describing the scene, items, containers,
            held item and available commands.
        """
        metadata = {}
        if final_score is not None:
            metadata["final_score"] = final_score
        if grade_breakdown is not None:
            metadata["grade_breakdown"] = grade_breakdown

        held = self.sim.held_item
        return TiffinObservation(
            done=done,
            reward=round(reward, 4),
            metadata=metadata,
            scene_description=self.sim.get_scene_description(),
            food_items=[
                item.to_dict(hide_unidentified=True)
                for item in self.sim.food_items
            ],
            containers=[c.to_dict() for c in self.sim.containers],
            held_item=held.to_dict(hide_unidentified=False) if held else None,
            vlm_result=vlm_result,
            available_commands=self.sim.get_available_commands(),
            step_feedback=feedback,
        )