vikash-nuvai
feat: complete tiffin packing OpenEnv environment with 3 tasks, VLM, grader, and inference
bbc1784
# Copyright (c) 2026 CtrlAltWin Team
"""
Tiffin Packing Simulation Engine — Pure Logic + PyBullet Physics.
This module implements the core packing simulation. It operates in two modes:
1. **Logic mode** (default): Pure Python state tracking — fast, lightweight,
guaranteed to run on 2 vCPU / 8 GB RAM. Used for all OpenEnv interactions.
2. **Physics mode** (optional): PyBullet simulation with real URDF models
(Kuka arm, table, containers, food cubes/spheres). Used for rendering
and visual validation.
The LLM agent issues high-level commands (pick, place, pour, identify),
and this engine validates and executes them.
"""
from __future__ import annotations
import math
import random
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclass
class FoodItem:
    """A single food item that the arm can identify, pick up, and pack."""

    id: int
    name: str
    food_type: str  # one of "solid", "liquid", "semi-solid"
    volume_ml: float
    temperature: str  # one of "hot", "cold", "room"
    fragility: float  # ranges from 0.0 (sturdy) to 1.0 (very fragile)
    preferred_container: str  # one of "sealed", "flat", "deep"
    color: str = "unknown"
    special_notes: str = ""
    status: str = "on_table"  # one of "on_table", "held", "packed", "dropped"
    identified: bool = False
    position: Tuple[float, float, float] = (0.0, 0.0, 0.0)

    def to_dict(self, hide_unidentified: bool = True) -> Dict[str, Any]:
        """Build the observation dict for this item.

        Args:
            hide_unidentified: When true (the default), an item that has not
                been identified yet is reported with placeholder fields and a
                hint instead of its real properties.

        Returns:
            A dict that always carries ``id`` and ``status``; the remaining
            keys depend on whether the item's properties are revealed.
        """
        concealed = hide_unidentified and not self.identified
        if concealed:
            return {
                "id": self.id,
                "status": self.status,
                "name": f"Unknown food item #{self.id}",
                "food_type": "unknown",
                "hint": "Use 'identify' command to classify this item",
            }

        return {
            "id": self.id,
            "status": self.status,
            "name": self.name,
            "food_type": self.food_type,
            "volume_ml": self.volume_ml,
            "temperature": self.temperature,
            "fragility": self.fragility,
            "preferred_container": self.preferred_container,
            "color": self.color,
        }
@dataclass
class Container:
    """A tiffin container together with a running record of what it holds.

    The ``content_*`` lists are parallel to ``contents``: entry *i* of each
    describes the *i*-th item placed in the container.
    """

    id: int
    name: str
    container_type: str  # "sealed_round" | "flat_open" | "deep_box" | "small_sealed"
    capacity_ml: float
    filled_ml: float = 0.0  # may exceed capacity_ml if an item overflowed
    contents: List[str] = field(default_factory=list)  # names of items inside
    content_types: List[str] = field(default_factory=list)  # food types inside
    content_temperatures: List[str] = field(default_factory=list)
    # NOTE: the spelling "fragilites" is kept because this is a public
    # attribute name that other code reads and writes.
    content_fragilites: List[float] = field(default_factory=list)
    position: Tuple[float, float, float] = (0.0, 0.0, 0.0)

    @property
    def remaining_ml(self) -> float:
        """Free volume in ml, clamped at 0.0 when the container is over-filled."""
        return max(0.0, self.capacity_ml - self.filled_ml)

    @property
    def fill_percentage(self) -> float:
        """Fill level as a percentage of capacity (can exceed 100 on overflow).

        Returns 0.0 for a container whose capacity is not positive, which
        avoids a division by zero.
        """
        if self.capacity_ml > 0:
            return (self.filled_ml / self.capacity_ml) * 100
        return 0.0

    @property
    def accepts_liquid(self) -> bool:
        """Sealed containers can hold liquids."""
        return "sealed" in self.container_type

    @property
    def is_flat(self) -> bool:
        """True when the container type contains "flat"."""
        return "flat" in self.container_type

    def to_dict(self) -> Dict[str, Any]:
        """Return an observation snapshot of this container.

        Numeric fill values are rounded to one decimal place. ``contents`` is
        a copy, so the snapshot does not change when more items are packed
        later and callers cannot mutate the container through it.
        """
        return {
            "id": self.id,
            "name": self.name,
            "type": self.container_type,
            "capacity_ml": self.capacity_ml,
            "filled_ml": round(self.filled_ml, 1),
            "remaining_ml": round(self.remaining_ml, 1),
            "fill_percentage": round(self.fill_percentage, 1),
            "contents": list(self.contents),
            "accepts_liquid": self.accepts_liquid,
        }
# ---------------------------------------------------------------------------
# Compatibility rules
# ---------------------------------------------------------------------------
CONTAINER_TYPE_COMPATIBILITY = {
# food_type -> set of compatible container_types
"liquid": {"sealed_round", "small_sealed"},
"semi-solid": {"sealed_round", "small_sealed", "deep_box"},
"solid": {"sealed_round", "flat_open", "deep_box", "small_sealed"},
}
def is_type_compatible(food_type: str, container_type: str) -> bool:
"""Check if a food type is compatible with a container type."""
compatible = CONTAINER_TYPE_COMPATIBILITY.get(food_type, set())
return container_type in compatible
# ---------------------------------------------------------------------------
# Main simulation engine
# ---------------------------------------------------------------------------
class PackingSimulation:
"""
Pure-logic tiffin packing simulation.
Models a robotic arm, food items, and containers as data structures.
Validates all actions against physical constraints (volume, type
compatibility, temperature zones, fragility ordering).
"""
def __init__(self):
self.arm_state: str = "idle" # "idle" | "holding"
self.held_item: Optional[FoodItem] = None
self.food_items: List[FoodItem] = []
self.containers: List[Container] = []
self.packing_log: List[Dict[str, Any]] = []
self._step_count: int = 0
def reset(
self,
food_items: List[FoodItem],
containers: List[Container],
seed: Optional[int] = None,
):
"""Initialize simulation with food items and containers."""
if seed is not None:
random.seed(seed)
self.arm_state = "idle"
self.held_item = None
self.food_items = food_items
self.containers = containers
self.packing_log = []
self._step_count = 0
# Randomize positions on table
for i, item in enumerate(self.food_items):
angle = (2 * math.pi * i) / max(len(self.food_items), 1)
item.position = (
0.3 * math.cos(angle),
0.3 * math.sin(angle),
0.65, # table height
)
for i, container in enumerate(self.containers):
container.position = (
-0.4 + 0.25 * i,
0.5,
0.65,
)
# -------------------------------------------------------------------
# Actions
# -------------------------------------------------------------------
def observe(self) -> Tuple[bool, str, float]:
"""Get detailed scene description. Returns (success, feedback, reward)."""
desc = self.get_scene_description()
return True, desc, 0.05 # small reward for observing
def identify(self, item_id: int) -> Tuple[bool, str, float, Optional[Dict]]:
"""
Classify a food item using VLM.
Returns (success, feedback, reward, vlm_result_or_None).
"""
item = self._find_item(item_id)
if item is None:
return False, f"No food item with ID {item_id} found.", -0.1, None
if item.status == "packed":
return (
False,
f"Item #{item_id} ({item.name}) is already packed.",
-0.05,
None,
)
if item.identified:
# Re-identifying is allowed but gives no reward
vlm_result = {
"name": item.name,
"type": item.food_type,
"fragility": item.fragility,
"preferred_container": item.preferred_container,
"volume_ml": item.volume_ml,
"temperature": item.temperature,
"color": item.color,
"special_notes": item.special_notes,
}
return (
True,
f"Item #{item_id} already identified as '{item.name}'. {item.special_notes}",
0.0,
vlm_result,
)
# First-time identification
item.identified = True
vlm_result = {
"name": item.name,
"type": item.food_type,
"fragility": item.fragility,
"preferred_container": item.preferred_container,
"volume_ml": item.volume_ml,
"temperature": item.temperature,
"color": item.color,
"special_notes": item.special_notes,
}
return (
True,
f"VLM identified item #{item_id}: '{item.name}' β€” "
f"type={item.food_type}, volume={item.volume_ml}ml, "
f"temperature={item.temperature}, fragility={item.fragility:.1f}, "
f"preferred container={item.preferred_container}. "
f"Note: {item.special_notes}",
0.1, # reward for gathering information
vlm_result,
)
def pick(self, item_id: int) -> Tuple[bool, str, float]:
"""Pick up a food item. Returns (success, feedback, reward)."""
if self.arm_state == "holding":
return (
False,
f"Arm is already holding '{self.held_item.name}'. "
f"Place or pour it first before picking another item.",
-0.1,
)
item = self._find_item(item_id)
if item is None:
return False, f"No food item with ID {item_id} found.", -0.1
if item.status != "on_table":
return (
False,
f"Item #{item_id} ({item.name}) cannot be picked β€” status is '{item.status}'.",
-0.1,
)
# Success β€” pick up the item
item.status = "held"
self.held_item = item
self.arm_state = "holding"
return (
True,
f"Successfully picked up item #{item_id} "
f"({'identified as ' + item.name if item.identified else 'unidentified'}).",
0.3,
)
def place(self, container_id: int) -> Tuple[bool, str, float]:
"""Place held item into container. Returns (success, feedback, reward)."""
if self.arm_state != "holding" or self.held_item is None:
return (
False,
"Arm is not holding any item. Use 'pick' first.",
-0.1,
)
container = self._find_container(container_id)
if container is None:
return False, f"No container with ID {container_id} found.", -0.1
item = self.held_item
reward = 0.0
feedback_parts = []
# --- Check type compatibility ---
type_ok = is_type_compatible(item.food_type, container.container_type)
if not type_ok:
reward -= 1.5
feedback_parts.append(
f"WARNING: {item.food_type} food in {container.container_type} "
f"container is incompatible! (e.g. liquid will spill from open container)"
)
# --- Check volume overflow ---
if item.volume_ml > container.remaining_ml:
overflow = item.volume_ml - container.remaining_ml
reward -= 1.0
feedback_parts.append(
f"WARNING: Overflow! Item needs {item.volume_ml}ml but container "
f"only has {container.remaining_ml:.0f}ml remaining. "
f"Overflow of {overflow:.0f}ml!"
)
# --- Check temperature mixing ---
if container.content_temperatures:
existing_temps = set(container.content_temperatures)
if item.temperature == "hot" and "cold" in existing_temps:
reward -= 0.5
feedback_parts.append(
"WARNING: Placing hot food with cold items! "
"Temperature contamination will occur."
)
elif item.temperature == "cold" and "hot" in existing_temps:
reward -= 0.5
feedback_parts.append(
"WARNING: Placing cold food with hot items! "
"Temperature contamination will occur."
)
# --- Check fragility ---
if container.content_fragilites and item.fragility < 0.5:
# Placing heavy/sturdy item β€” check if fragile items are under
max_existing_fragility = max(container.content_fragilites)
if max_existing_fragility > 0.6:
reward -= 0.3
feedback_parts.append(
f"WARNING: Placing sturdy item on top of fragile item "
f"(fragility {max_existing_fragility:.1f}) β€” may crush it!"
)
# --- Positive rewards ---
if type_ok:
reward += 1.5 # correct container type
if container.container_type == item.preferred_container or (
item.preferred_container in container.container_type
):
reward += 0.5 # preferred container bonus
feedback_parts.append("Great choice β€” matches preferred container type!")
if item.volume_ml <= container.remaining_ml:
# Good volume fit
utilization = item.volume_ml / container.capacity_ml
reward += 0.3 * utilization # reward proportional to space usage
# --- Execute placement ---
container.filled_ml += item.volume_ml
container.contents.append(item.name)
container.content_types.append(item.food_type)
container.content_temperatures.append(item.temperature)
container.content_fragilites.append(item.fragility)
item.status = "packed"
self.held_item = None
self.arm_state = "idle"
# Log the placement
self.packing_log.append(
{
"food_name": item.name,
"food_id": item.id,
"food_type": item.food_type,
"food_volume": item.volume_ml,
"food_temperature": item.temperature,
"food_fragility": item.fragility,
"food_preferred_container": item.preferred_container,
"container_id": container.id,
"container_type": container.container_type,
"container_name": container.name,
"type_compatible": type_ok,
"overflow": item.volume_ml > container.remaining_ml + item.volume_ml,
}
)
if feedback_parts:
feedback = f"Placed '{item.name}' in '{container.name}'. " + " ".join(
feedback_parts
)
else:
feedback = (
f"Placed '{item.name}' in '{container.name}'. "
f"Container now {container.fill_percentage:.0f}% full."
)
return True, feedback, round(reward, 2)
def pour(self, container_id: int) -> Tuple[bool, str, float]:
"""Pour liquid from held item into container. Returns (success, feedback, reward)."""
if self.arm_state != "holding" or self.held_item is None:
return False, "Arm is not holding any item. Use 'pick' first.", -0.1
item = self.held_item
# Only liquids or semi-solids can be poured
if item.food_type not in ("liquid", "semi-solid"):
return (
False,
f"Cannot pour '{item.name}' β€” it is '{item.food_type}', not a pourable item. "
f"Use 'place' instead.",
-0.1,
)
# Pour is functionally same as place but gives extra reward for liquids
success, feedback, reward = self.place(container_id)
if success:
reward += 0.2 # bonus for correctly using pour for liquids
feedback = feedback.replace("Placed", "Poured")
return success, feedback, reward
# -------------------------------------------------------------------
# Scene description
# -------------------------------------------------------------------
def get_scene_description(self) -> str:
"""Generate natural language description of the current scene."""
lines = []
lines.append("=" * 60)
lines.append("TIFFIN PACKING SCENE")
lines.append("=" * 60)
# Arm state
lines.append("")
lines.append("πŸ€– ROBOTIC ARM STATUS:")
if self.arm_state == "holding" and self.held_item:
item = self.held_item
if item.identified:
lines.append(
f" Currently holding: {item.name} "
f"(type={item.food_type}, volume={item.volume_ml}ml)"
)
else:
lines.append(f" Currently holding: Unknown food item #{item.id}")
else:
lines.append(" Arm is idle β€” ready to pick up an item")
# Food items on table
on_table = [i for i in self.food_items if i.status == "on_table"]
packed = [i for i in self.food_items if i.status == "packed"]
lines.append("")
lines.append(f"πŸ› FOOD ITEMS ON TABLE ({len(on_table)} remaining, {len(packed)} packed):")
for item in self.food_items:
status_icon = {"on_table": "⬜", "held": "🀏", "packed": "βœ…", "dropped": "❌"}.get(
item.status, "?"
)
if item.identified:
lines.append(
f" {status_icon} [{item.id}] {item.name} β€” "
f"type={item.food_type}, volume={item.volume_ml}ml, "
f"temp={item.temperature}, fragility={item.fragility:.1f}, "
f"preferred={item.preferred_container}"
)
else:
lines.append(
f" {status_icon} [{item.id}] Unknown food item "
f"(use 'identify' to classify)"
)
# Containers
lines.append("")
lines.append("🍱 TIFFIN CONTAINERS:")
for c in self.containers:
bar_len = 20
filled_bars = int((c.fill_percentage / 100) * bar_len)
bar = "β–ˆ" * filled_bars + "β–‘" * (bar_len - filled_bars)
lines.append(
f" [{c.id}] {c.name} ({c.container_type}) β€” "
f"[{bar}] {c.fill_percentage:.0f}% "
f"({c.filled_ml:.0f}/{c.capacity_ml:.0f}ml)"
)
if c.contents:
lines.append(f" Contains: {', '.join(c.contents)}")
liquid_note = "βœ… Can hold liquids" if c.accepts_liquid else "⚠️ Open β€” no liquids"
lines.append(f" {liquid_note}")
lines.append("")
lines.append("=" * 60)
return "\n".join(lines)
def get_available_commands(self) -> List[str]:
"""Return list of valid commands given current state."""
commands = ["observe"]
unpacked = [i for i in self.food_items if i.status == "on_table"]
unidentified = [i for i in self.food_items if not i.identified and i.status != "packed"]
if unidentified:
commands.append("identify")
if self.arm_state == "idle" and unpacked:
commands.append("pick")
if self.arm_state == "holding" and self.held_item:
commands.append("place")
if self.held_item.food_type in ("liquid", "semi-solid"):
commands.append("pour")
return commands
# -------------------------------------------------------------------
# Helpers
# -------------------------------------------------------------------
def _find_item(self, item_id: int) -> Optional[FoodItem]:
for item in self.food_items:
if item.id == item_id:
return item
return None
def _find_container(self, container_id: int) -> Optional[Container]:
for c in self.containers:
if c.id == container_id:
return c
return None
@property
def all_packed(self) -> bool:
return all(i.status == "packed" for i in self.food_items)
@property
def unpacked_count(self) -> int:
return sum(1 for i in self.food_items if i.status != "packed")