Spaces:
Sleeping
Sleeping
| """ | |
| Domain randomization for the tabletop planning environment. | |
| Randomizes everything that can vary in a real tabletop scene: | |
| - number of objects | |
| - which object is the target | |
| - which bin is the target | |
| - how many blockers, and what they block | |
| - object positions (within reachable workspace) | |
| - task instruction (generated from the sampled scene) | |
| - distractor objects (present but irrelevant to the task) | |
| - constraint type (fragile first, heavy last, etc.) | |
| The model must generalize across all of this — not memorize one layout. | |
| """ | |
| import random | |
| import string | |
| from dataclasses import dataclass, field | |
| from typing import Optional | |
| OBJECT_NAMES = ["red_block", "blue_block", "green_block", "yellow_block", "purple_block"] | |
| OBJECT_COLORS = {"red_block": "red", "blue_block": "blue", "green_block": "green", | |
| "yellow_block": "yellow", "purple_block": "purple"} | |
| BINS = ["A", "B"] | |
| CONSTRAINTS = ["fragile_first", "heavy_last", "urgent_first", None, None, None] # None = no constraint | |
| class ScenarioConfig: | |
| # Objects actually present in the scene | |
| objects: list[str] = field(default_factory=list) | |
| # Which objects are targets (must be placed in a bin) | |
| targets: dict[str, str] = field(default_factory=dict) # obj_name -> bin | |
| # Blocking relationships: blocker -> blocked | |
| blockers: dict[str, str] = field(default_factory=dict) | |
| # Distractors: present but not part of the task | |
| distractors: list[str] = field(default_factory=list) | |
| # Active constraint | |
| constraint: Optional[str] = None | |
| # Generated instruction string | |
| instruction: str = "" | |
| # Object positions on the table (x, y) — workspace is roughly ±0.25 | |
| positions: dict[str, tuple] = field(default_factory=dict) | |
| # Hidden traits, revealed via scan or proximity. | |
| hidden_traits: dict[str, str] = field(default_factory=dict) | |
| # Optional deadlines in steps for selected target objects. | |
| deadlines: dict[str, int] = field(default_factory=dict) | |
| def randomize_scenario( | |
| n_objects: Optional[int] = None, | |
| n_targets: Optional[int] = None, | |
| n_blockers: Optional[int] = None, | |
| force_blocked: bool = False, | |
| scenario_pack: str = "default", | |
| ) -> ScenarioConfig: | |
| """ | |
| Generate a fully randomized scenario. | |
| n_objects: total objects on table (default: random 2-5) | |
| n_targets: how many must be placed in bins (default: random 1-2) | |
| n_blockers: how many blocking relationships (default: random 0-2) | |
| force_blocked: always have at least one blocker (good for training recovery) | |
| """ | |
| # Sample object count | |
| pack = SCENARIO_PACKS.get(scenario_pack, OBJECT_NAMES) | |
| total = n_objects or random.randint(2, 5) | |
| total = min(total, len(pack)) | |
| # Pick which objects appear | |
| present = random.sample(pack, total) | |
| # Pick targets (subset of present objects) | |
| max_targets = min(n_targets or random.randint(1, 2), len(present)) | |
| targets_list = random.sample(present, max_targets) | |
| target_bins = {obj: random.choice(BINS) for obj in targets_list} | |
| # Distractors = present but not targets | |
| distractors = [o for o in present if o not in target_bins] | |
| # Build blocking relationships | |
| n_block = n_blockers if n_blockers is not None else random.randint(0, min(2, len(distractors))) | |
| if force_blocked: | |
| n_block = max(1, n_block) | |
| blockers = {} | |
| # A blocker must be a non-target (distractor) blocking a target | |
| available_blockers = list(distractors) | |
| available_targets = list(targets_list) | |
| random.shuffle(available_blockers) | |
| random.shuffle(available_targets) | |
| for i in range(min(n_block, len(available_blockers), len(available_targets))): | |
| blockers[available_blockers[i]] = available_targets[i] | |
| # Positions: place targets first, then put blockers in front of them | |
| positions = {} | |
| x_slots = [-0.15, 0.0, 0.15, -0.08, 0.08] | |
| random.shuffle(x_slots) | |
| slot_idx = 0 | |
| for obj in present: | |
| if obj in blockers.values(): | |
| # target that gets blocked — place it further back | |
| positions[obj] = (x_slots[slot_idx % len(x_slots)], -0.05) | |
| else: | |
| positions[obj] = (x_slots[slot_idx % len(x_slots)], 0.05) | |
| slot_idx += 1 | |
| # Blocker slightly in front of what it blocks | |
| for blocker, blocked in blockers.items(): | |
| tx, ty = positions[blocked] | |
| positions[blocker] = (tx + random.uniform(-0.03, 0.03), ty + 0.08) | |
| # Constraint | |
| constraint = random.choice(CONSTRAINTS) | |
| hidden_traits = {} | |
| for obj in targets_list: | |
| # Keep trait labels simple and interpretable for LLM reasoning. | |
| hidden_traits[obj] = random.choice(["fragile", "heavy", "standard"]) | |
| deadlines = {} | |
| if targets_list and random.random() < 0.6: | |
| urgent_obj = random.choice(targets_list) | |
| deadlines[urgent_obj] = random.randint(5, 10) | |
| # Generate instruction | |
| instruction = _build_instruction(target_bins, constraint, hidden_traits, deadlines) | |
| return ScenarioConfig( | |
| objects=present, | |
| targets=target_bins, | |
| blockers=blockers, | |
| distractors=distractors, | |
| constraint=constraint, | |
| instruction=instruction, | |
| positions=positions, | |
| hidden_traits=hidden_traits, | |
| deadlines=deadlines, | |
| ) | |
| def _build_instruction(target_bins: dict[str, str], constraint: Optional[str], | |
| hidden_traits: dict[str, str], deadlines: dict[str, int]) -> str: | |
| parts = [] | |
| for obj, bin_ in target_bins.items(): | |
| display = OBJECT_COLORS.get(obj, obj.replace("_block", "")) | |
| # Use bare display name for non-block objects (professional packs) | |
| label = f"the {display} block" if obj.endswith("_block") else f"the {display}" | |
| parts.append(f"{label} in bin {bin_}") | |
| if len(parts) == 1: | |
| base = f"Place {parts[0]}." | |
| else: | |
| base = "Place " + ", then ".join(parts) + "." | |
| if constraint == "fragile_first": | |
| base += " Handle fragile items first." | |
| elif constraint == "heavy_last": | |
| base += " Move heavy items last." | |
| elif constraint == "urgent_first": | |
| base += " Prioritize urgent items first." | |
| if deadlines: | |
| for obj, step in deadlines.items(): | |
| display = OBJECT_COLORS.get(obj, obj.replace("_block", "")) | |
| label = f"the {display} block" if obj.endswith("_block") else f"the {display}" | |
| base += f" Place {label} by step {step}." | |
| if hidden_traits: | |
| base += " Some object traits are hidden until you inspect the scene." | |
| return base | |
| SCENARIO_PACKS = { | |
| "default": OBJECT_NAMES, | |
| # Professional task skins — same mechanics, domain-appropriate names | |
| "warehouse": ["fragile_package", "heavy_pallet", "urgent_parcel", "standard_box", "hazmat_drum"], | |
| "pharmacy": ["morphine_vial", "saline_bag", "insulin_pen", "blood_sample", "contrast_agent"], | |
| "lab": ["reagent_alpha", "catalyst_beta", "sample_gamma", "solvent_delta", "enzyme_epsilon"], | |
| } | |
| # Color/display name for each object in each pack | |
| OBJECT_COLORS.update({ | |
| "fragile_package": "fragile package", "heavy_pallet": "heavy pallet", | |
| "urgent_parcel": "urgent parcel", "standard_box": "standard box", | |
| "hazmat_drum": "hazmat drum", | |
| "morphine_vial": "morphine vial", "saline_bag": "saline bag", | |
| "insulin_pen": "insulin pen", "blood_sample": "blood sample", | |
| "contrast_agent": "contrast agent", | |
| "reagent_alpha": "reagent-α", "catalyst_beta": "catalyst-β", | |
| "sample_gamma": "sample-γ", "solvent_delta": "solvent-δ", | |
| "enzyme_epsilon": "enzyme-ε", | |
| }) | |