vikash-nuvai
feat: complete tiffin packing OpenEnv environment with 3 tasks, VLM, grader, and inference
bbc1784 | # Copyright (c) 2026 CtrlAltWin Team | |
| """ | |
| Tiffin Packing Simulation Engine β Pure Logic + PyBullet Physics. | |
| This module implements the core packing simulation. It operates in two modes: | |
| 1. **Logic mode** (default): Pure Python state tracking β fast, lightweight, | |
| guaranteed to run on 2 vCPU / 8 GB RAM. Used for all OpenEnv interactions. | |
| 2. **Physics mode** (optional): PyBullet simulation with real URDF models | |
| (Kuka arm, table, containers, food cubes/spheres). Used for rendering | |
| and visual validation. | |
| The LLM agent issues high-level commands (pick, place, pour, identify), | |
| and this engine validates and executes them. | |
| """ | |
| from __future__ import annotations | |
| import math | |
| import random | |
| from dataclasses import dataclass, field | |
| from typing import Any, Dict, List, Optional, Tuple | |
| # --------------------------------------------------------------------------- | |
| # Data structures | |
| # --------------------------------------------------------------------------- | |
| class FoodItem: | |
| """Represents a food item on the table.""" | |
| id: int | |
| name: str | |
| food_type: str # "solid" | "liquid" | "semi-solid" | |
| volume_ml: float | |
| temperature: str # "hot" | "cold" | "room" | |
| fragility: float # 0.0 (sturdy) to 1.0 (very fragile) | |
| preferred_container: str # "sealed" | "flat" | "deep" | |
| color: str = "unknown" | |
| special_notes: str = "" | |
| status: str = "on_table" # "on_table" | "held" | "packed" | "dropped" | |
| identified: bool = False | |
| position: Tuple[float, float, float] = (0.0, 0.0, 0.0) | |
| def to_dict(self, hide_unidentified: bool = True) -> Dict[str, Any]: | |
| """Convert to observation dict. Hides properties if not yet identified.""" | |
| base = {"id": self.id, "status": self.status} | |
| if self.identified or not hide_unidentified: | |
| base.update( | |
| { | |
| "name": self.name, | |
| "food_type": self.food_type, | |
| "volume_ml": self.volume_ml, | |
| "temperature": self.temperature, | |
| "fragility": self.fragility, | |
| "preferred_container": self.preferred_container, | |
| "color": self.color, | |
| } | |
| ) | |
| else: | |
| base["name"] = f"Unknown food item #{self.id}" | |
| base["food_type"] = "unknown" | |
| base["hint"] = "Use 'identify' command to classify this item" | |
| return base | |
| class Container: | |
| """Represents a tiffin container.""" | |
| id: int | |
| name: str | |
| container_type: str # "sealed_round" | "flat_open" | "deep_box" | "small_sealed" | |
| capacity_ml: float | |
| filled_ml: float = 0.0 | |
| contents: List[str] = field(default_factory=list) | |
| content_types: List[str] = field(default_factory=list) # food types inside | |
| content_temperatures: List[str] = field(default_factory=list) | |
| content_fragilites: List[float] = field(default_factory=list) | |
| position: Tuple[float, float, float] = (0.0, 0.0, 0.0) | |
| def remaining_ml(self) -> float: | |
| return max(0, self.capacity_ml - self.filled_ml) | |
| def fill_percentage(self) -> float: | |
| return (self.filled_ml / self.capacity_ml) * 100 if self.capacity_ml > 0 else 0 | |
| def accepts_liquid(self) -> bool: | |
| """Sealed containers can hold liquids.""" | |
| return "sealed" in self.container_type | |
| def is_flat(self) -> bool: | |
| return "flat" in self.container_type | |
| def to_dict(self) -> Dict[str, Any]: | |
| return { | |
| "id": self.id, | |
| "name": self.name, | |
| "type": self.container_type, | |
| "capacity_ml": self.capacity_ml, | |
| "filled_ml": round(self.filled_ml, 1), | |
| "remaining_ml": round(self.remaining_ml, 1), | |
| "fill_percentage": round(self.fill_percentage, 1), | |
| "contents": self.contents, | |
| "accepts_liquid": self.accepts_liquid, | |
| } | |
| # --------------------------------------------------------------------------- | |
| # Compatibility rules | |
| # --------------------------------------------------------------------------- | |
| CONTAINER_TYPE_COMPATIBILITY = { | |
| # food_type -> set of compatible container_types | |
| "liquid": {"sealed_round", "small_sealed"}, | |
| "semi-solid": {"sealed_round", "small_sealed", "deep_box"}, | |
| "solid": {"sealed_round", "flat_open", "deep_box", "small_sealed"}, | |
| } | |
| def is_type_compatible(food_type: str, container_type: str) -> bool: | |
| """Check if a food type is compatible with a container type.""" | |
| compatible = CONTAINER_TYPE_COMPATIBILITY.get(food_type, set()) | |
| return container_type in compatible | |
| # --------------------------------------------------------------------------- | |
| # Main simulation engine | |
| # --------------------------------------------------------------------------- | |
| class PackingSimulation: | |
| """ | |
| Pure-logic tiffin packing simulation. | |
| Models a robotic arm, food items, and containers as data structures. | |
| Validates all actions against physical constraints (volume, type | |
| compatibility, temperature zones, fragility ordering). | |
| """ | |
| def __init__(self): | |
| self.arm_state: str = "idle" # "idle" | "holding" | |
| self.held_item: Optional[FoodItem] = None | |
| self.food_items: List[FoodItem] = [] | |
| self.containers: List[Container] = [] | |
| self.packing_log: List[Dict[str, Any]] = [] | |
| self._step_count: int = 0 | |
| def reset( | |
| self, | |
| food_items: List[FoodItem], | |
| containers: List[Container], | |
| seed: Optional[int] = None, | |
| ): | |
| """Initialize simulation with food items and containers.""" | |
| if seed is not None: | |
| random.seed(seed) | |
| self.arm_state = "idle" | |
| self.held_item = None | |
| self.food_items = food_items | |
| self.containers = containers | |
| self.packing_log = [] | |
| self._step_count = 0 | |
| # Randomize positions on table | |
| for i, item in enumerate(self.food_items): | |
| angle = (2 * math.pi * i) / max(len(self.food_items), 1) | |
| item.position = ( | |
| 0.3 * math.cos(angle), | |
| 0.3 * math.sin(angle), | |
| 0.65, # table height | |
| ) | |
| for i, container in enumerate(self.containers): | |
| container.position = ( | |
| -0.4 + 0.25 * i, | |
| 0.5, | |
| 0.65, | |
| ) | |
| # ------------------------------------------------------------------- | |
| # Actions | |
| # ------------------------------------------------------------------- | |
| def observe(self) -> Tuple[bool, str, float]: | |
| """Get detailed scene description. Returns (success, feedback, reward).""" | |
| desc = self.get_scene_description() | |
| return True, desc, 0.05 # small reward for observing | |
| def identify(self, item_id: int) -> Tuple[bool, str, float, Optional[Dict]]: | |
| """ | |
| Classify a food item using VLM. | |
| Returns (success, feedback, reward, vlm_result_or_None). | |
| """ | |
| item = self._find_item(item_id) | |
| if item is None: | |
| return False, f"No food item with ID {item_id} found.", -0.1, None | |
| if item.status == "packed": | |
| return ( | |
| False, | |
| f"Item #{item_id} ({item.name}) is already packed.", | |
| -0.05, | |
| None, | |
| ) | |
| if item.identified: | |
| # Re-identifying is allowed but gives no reward | |
| vlm_result = { | |
| "name": item.name, | |
| "type": item.food_type, | |
| "fragility": item.fragility, | |
| "preferred_container": item.preferred_container, | |
| "volume_ml": item.volume_ml, | |
| "temperature": item.temperature, | |
| "color": item.color, | |
| "special_notes": item.special_notes, | |
| } | |
| return ( | |
| True, | |
| f"Item #{item_id} already identified as '{item.name}'. {item.special_notes}", | |
| 0.0, | |
| vlm_result, | |
| ) | |
| # First-time identification | |
| item.identified = True | |
| vlm_result = { | |
| "name": item.name, | |
| "type": item.food_type, | |
| "fragility": item.fragility, | |
| "preferred_container": item.preferred_container, | |
| "volume_ml": item.volume_ml, | |
| "temperature": item.temperature, | |
| "color": item.color, | |
| "special_notes": item.special_notes, | |
| } | |
| return ( | |
| True, | |
| f"VLM identified item #{item_id}: '{item.name}' β " | |
| f"type={item.food_type}, volume={item.volume_ml}ml, " | |
| f"temperature={item.temperature}, fragility={item.fragility:.1f}, " | |
| f"preferred container={item.preferred_container}. " | |
| f"Note: {item.special_notes}", | |
| 0.1, # reward for gathering information | |
| vlm_result, | |
| ) | |
| def pick(self, item_id: int) -> Tuple[bool, str, float]: | |
| """Pick up a food item. Returns (success, feedback, reward).""" | |
| if self.arm_state == "holding": | |
| return ( | |
| False, | |
| f"Arm is already holding '{self.held_item.name}'. " | |
| f"Place or pour it first before picking another item.", | |
| -0.1, | |
| ) | |
| item = self._find_item(item_id) | |
| if item is None: | |
| return False, f"No food item with ID {item_id} found.", -0.1 | |
| if item.status != "on_table": | |
| return ( | |
| False, | |
| f"Item #{item_id} ({item.name}) cannot be picked β status is '{item.status}'.", | |
| -0.1, | |
| ) | |
| # Success β pick up the item | |
| item.status = "held" | |
| self.held_item = item | |
| self.arm_state = "holding" | |
| return ( | |
| True, | |
| f"Successfully picked up item #{item_id} " | |
| f"({'identified as ' + item.name if item.identified else 'unidentified'}).", | |
| 0.3, | |
| ) | |
| def place(self, container_id: int) -> Tuple[bool, str, float]: | |
| """Place held item into container. Returns (success, feedback, reward).""" | |
| if self.arm_state != "holding" or self.held_item is None: | |
| return ( | |
| False, | |
| "Arm is not holding any item. Use 'pick' first.", | |
| -0.1, | |
| ) | |
| container = self._find_container(container_id) | |
| if container is None: | |
| return False, f"No container with ID {container_id} found.", -0.1 | |
| item = self.held_item | |
| reward = 0.0 | |
| feedback_parts = [] | |
| # --- Check type compatibility --- | |
| type_ok = is_type_compatible(item.food_type, container.container_type) | |
| if not type_ok: | |
| reward -= 1.5 | |
| feedback_parts.append( | |
| f"WARNING: {item.food_type} food in {container.container_type} " | |
| f"container is incompatible! (e.g. liquid will spill from open container)" | |
| ) | |
| # --- Check volume overflow --- | |
| if item.volume_ml > container.remaining_ml: | |
| overflow = item.volume_ml - container.remaining_ml | |
| reward -= 1.0 | |
| feedback_parts.append( | |
| f"WARNING: Overflow! Item needs {item.volume_ml}ml but container " | |
| f"only has {container.remaining_ml:.0f}ml remaining. " | |
| f"Overflow of {overflow:.0f}ml!" | |
| ) | |
| # --- Check temperature mixing --- | |
| if container.content_temperatures: | |
| existing_temps = set(container.content_temperatures) | |
| if item.temperature == "hot" and "cold" in existing_temps: | |
| reward -= 0.5 | |
| feedback_parts.append( | |
| "WARNING: Placing hot food with cold items! " | |
| "Temperature contamination will occur." | |
| ) | |
| elif item.temperature == "cold" and "hot" in existing_temps: | |
| reward -= 0.5 | |
| feedback_parts.append( | |
| "WARNING: Placing cold food with hot items! " | |
| "Temperature contamination will occur." | |
| ) | |
| # --- Check fragility --- | |
| if container.content_fragilites and item.fragility < 0.5: | |
| # Placing heavy/sturdy item β check if fragile items are under | |
| max_existing_fragility = max(container.content_fragilites) | |
| if max_existing_fragility > 0.6: | |
| reward -= 0.3 | |
| feedback_parts.append( | |
| f"WARNING: Placing sturdy item on top of fragile item " | |
| f"(fragility {max_existing_fragility:.1f}) β may crush it!" | |
| ) | |
| # --- Positive rewards --- | |
| if type_ok: | |
| reward += 1.5 # correct container type | |
| if container.container_type == item.preferred_container or ( | |
| item.preferred_container in container.container_type | |
| ): | |
| reward += 0.5 # preferred container bonus | |
| feedback_parts.append("Great choice β matches preferred container type!") | |
| if item.volume_ml <= container.remaining_ml: | |
| # Good volume fit | |
| utilization = item.volume_ml / container.capacity_ml | |
| reward += 0.3 * utilization # reward proportional to space usage | |
| # --- Execute placement --- | |
| container.filled_ml += item.volume_ml | |
| container.contents.append(item.name) | |
| container.content_types.append(item.food_type) | |
| container.content_temperatures.append(item.temperature) | |
| container.content_fragilites.append(item.fragility) | |
| item.status = "packed" | |
| self.held_item = None | |
| self.arm_state = "idle" | |
| # Log the placement | |
| self.packing_log.append( | |
| { | |
| "food_name": item.name, | |
| "food_id": item.id, | |
| "food_type": item.food_type, | |
| "food_volume": item.volume_ml, | |
| "food_temperature": item.temperature, | |
| "food_fragility": item.fragility, | |
| "food_preferred_container": item.preferred_container, | |
| "container_id": container.id, | |
| "container_type": container.container_type, | |
| "container_name": container.name, | |
| "type_compatible": type_ok, | |
| "overflow": item.volume_ml > container.remaining_ml + item.volume_ml, | |
| } | |
| ) | |
| if feedback_parts: | |
| feedback = f"Placed '{item.name}' in '{container.name}'. " + " ".join( | |
| feedback_parts | |
| ) | |
| else: | |
| feedback = ( | |
| f"Placed '{item.name}' in '{container.name}'. " | |
| f"Container now {container.fill_percentage:.0f}% full." | |
| ) | |
| return True, feedback, round(reward, 2) | |
| def pour(self, container_id: int) -> Tuple[bool, str, float]: | |
| """Pour liquid from held item into container. Returns (success, feedback, reward).""" | |
| if self.arm_state != "holding" or self.held_item is None: | |
| return False, "Arm is not holding any item. Use 'pick' first.", -0.1 | |
| item = self.held_item | |
| # Only liquids or semi-solids can be poured | |
| if item.food_type not in ("liquid", "semi-solid"): | |
| return ( | |
| False, | |
| f"Cannot pour '{item.name}' β it is '{item.food_type}', not a pourable item. " | |
| f"Use 'place' instead.", | |
| -0.1, | |
| ) | |
| # Pour is functionally same as place but gives extra reward for liquids | |
| success, feedback, reward = self.place(container_id) | |
| if success: | |
| reward += 0.2 # bonus for correctly using pour for liquids | |
| feedback = feedback.replace("Placed", "Poured") | |
| return success, feedback, reward | |
| # ------------------------------------------------------------------- | |
| # Scene description | |
| # ------------------------------------------------------------------- | |
| def get_scene_description(self) -> str: | |
| """Generate natural language description of the current scene.""" | |
| lines = [] | |
| lines.append("=" * 60) | |
| lines.append("TIFFIN PACKING SCENE") | |
| lines.append("=" * 60) | |
| # Arm state | |
| lines.append("") | |
| lines.append("π€ ROBOTIC ARM STATUS:") | |
| if self.arm_state == "holding" and self.held_item: | |
| item = self.held_item | |
| if item.identified: | |
| lines.append( | |
| f" Currently holding: {item.name} " | |
| f"(type={item.food_type}, volume={item.volume_ml}ml)" | |
| ) | |
| else: | |
| lines.append(f" Currently holding: Unknown food item #{item.id}") | |
| else: | |
| lines.append(" Arm is idle β ready to pick up an item") | |
| # Food items on table | |
| on_table = [i for i in self.food_items if i.status == "on_table"] | |
| packed = [i for i in self.food_items if i.status == "packed"] | |
| lines.append("") | |
| lines.append(f"π FOOD ITEMS ON TABLE ({len(on_table)} remaining, {len(packed)} packed):") | |
| for item in self.food_items: | |
| status_icon = {"on_table": "β¬", "held": "π€", "packed": "β ", "dropped": "β"}.get( | |
| item.status, "?" | |
| ) | |
| if item.identified: | |
| lines.append( | |
| f" {status_icon} [{item.id}] {item.name} β " | |
| f"type={item.food_type}, volume={item.volume_ml}ml, " | |
| f"temp={item.temperature}, fragility={item.fragility:.1f}, " | |
| f"preferred={item.preferred_container}" | |
| ) | |
| else: | |
| lines.append( | |
| f" {status_icon} [{item.id}] Unknown food item " | |
| f"(use 'identify' to classify)" | |
| ) | |
| # Containers | |
| lines.append("") | |
| lines.append("π± TIFFIN CONTAINERS:") | |
| for c in self.containers: | |
| bar_len = 20 | |
| filled_bars = int((c.fill_percentage / 100) * bar_len) | |
| bar = "β" * filled_bars + "β" * (bar_len - filled_bars) | |
| lines.append( | |
| f" [{c.id}] {c.name} ({c.container_type}) β " | |
| f"[{bar}] {c.fill_percentage:.0f}% " | |
| f"({c.filled_ml:.0f}/{c.capacity_ml:.0f}ml)" | |
| ) | |
| if c.contents: | |
| lines.append(f" Contains: {', '.join(c.contents)}") | |
| liquid_note = "β Can hold liquids" if c.accepts_liquid else "β οΈ Open β no liquids" | |
| lines.append(f" {liquid_note}") | |
| lines.append("") | |
| lines.append("=" * 60) | |
| return "\n".join(lines) | |
| def get_available_commands(self) -> List[str]: | |
| """Return list of valid commands given current state.""" | |
| commands = ["observe"] | |
| unpacked = [i for i in self.food_items if i.status == "on_table"] | |
| unidentified = [i for i in self.food_items if not i.identified and i.status != "packed"] | |
| if unidentified: | |
| commands.append("identify") | |
| if self.arm_state == "idle" and unpacked: | |
| commands.append("pick") | |
| if self.arm_state == "holding" and self.held_item: | |
| commands.append("place") | |
| if self.held_item.food_type in ("liquid", "semi-solid"): | |
| commands.append("pour") | |
| return commands | |
| # ------------------------------------------------------------------- | |
| # Helpers | |
| # ------------------------------------------------------------------- | |
| def _find_item(self, item_id: int) -> Optional[FoodItem]: | |
| for item in self.food_items: | |
| if item.id == item_id: | |
| return item | |
| return None | |
| def _find_container(self, container_id: int) -> Optional[Container]: | |
| for c in self.containers: | |
| if c.id == container_id: | |
| return c | |
| return None | |
| def all_packed(self) -> bool: | |
| return all(i.status == "packed" for i in self.food_items) | |
| def unpacked_count(self) -> int: | |
| return sum(1 for i in self.food_items if i.status != "packed") | |