File size: 7,624 Bytes
8c8ea52
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
"""
Domain randomization for the tabletop planning environment.

Randomizes everything that can vary in a real tabletop scene:
  - number of objects
  - which object is the target
  - which bin is the target
  - how many blockers, and what they block
  - object positions (within reachable workspace)
  - task instruction (generated from the sampled scene)
  - distractor objects (present but irrelevant to the task)
  - constraint type (fragile first, heavy last, etc.)

The model must generalize across all of this — not memorize one layout.
"""
import random
import string
from dataclasses import dataclass, field
from typing import Optional


OBJECT_NAMES   = ["red_block", "blue_block", "green_block", "yellow_block", "purple_block"]
OBJECT_COLORS  = {"red_block": "red", "blue_block": "blue", "green_block": "green",
                  "yellow_block": "yellow", "purple_block": "purple"}
BINS           = ["A", "B"]
CONSTRAINTS    = ["fragile_first", "heavy_last", "urgent_first", None, None, None]  # None = no constraint


@dataclass
class ScenarioConfig:
    # Objects actually present in the scene
    objects: list[str] = field(default_factory=list)

    # Which objects are targets (must be placed in a bin)
    targets: dict[str, str] = field(default_factory=dict)  # obj_name -> bin

    # Blocking relationships: blocker -> blocked
    blockers: dict[str, str] = field(default_factory=dict)

    # Distractors: present but not part of the task
    distractors: list[str] = field(default_factory=list)

    # Active constraint
    constraint: Optional[str] = None

    # Generated instruction string
    instruction: str = ""

    # Object positions on the table (x, y) — workspace is roughly ±0.25
    positions: dict[str, tuple] = field(default_factory=dict)
    # Hidden traits, revealed via scan or proximity.
    hidden_traits: dict[str, str] = field(default_factory=dict)
    # Optional deadlines in steps for selected target objects.
    deadlines: dict[str, int] = field(default_factory=dict)


def randomize_scenario(
    n_objects: Optional[int] = None,
    n_targets: Optional[int] = None,
    n_blockers: Optional[int] = None,
    force_blocked: bool = False,
    scenario_pack: str = "default",
) -> ScenarioConfig:
    """
    Generate a fully randomized scenario.

    n_objects:  total objects on table (default: random 2-5)
    n_targets:  how many must be placed in bins (default: random 1-2)
    n_blockers: how many blocking relationships (default: random 0-2)
    force_blocked: always have at least one blocker (good for training recovery)
    """
    # Sample object count
    pack = SCENARIO_PACKS.get(scenario_pack, OBJECT_NAMES)
    total = n_objects or random.randint(2, 5)
    total = min(total, len(pack))

    # Pick which objects appear
    present = random.sample(pack, total)

    # Pick targets (subset of present objects)
    max_targets = min(n_targets or random.randint(1, 2), len(present))
    targets_list = random.sample(present, max_targets)
    target_bins = {obj: random.choice(BINS) for obj in targets_list}

    # Distractors = present but not targets
    distractors = [o for o in present if o not in target_bins]

    # Build blocking relationships
    n_block = n_blockers if n_blockers is not None else random.randint(0, min(2, len(distractors)))
    if force_blocked:
        n_block = max(1, n_block)

    blockers = {}
    # A blocker must be a non-target (distractor) blocking a target
    available_blockers = list(distractors)
    available_targets  = list(targets_list)
    random.shuffle(available_blockers)
    random.shuffle(available_targets)
    for i in range(min(n_block, len(available_blockers), len(available_targets))):
        blockers[available_blockers[i]] = available_targets[i]

    # Positions: place targets first, then put blockers in front of them
    positions = {}
    x_slots = [-0.15, 0.0, 0.15, -0.08, 0.08]
    random.shuffle(x_slots)
    slot_idx = 0
    for obj in present:
        if obj in blockers.values():
            # target that gets blocked — place it further back
            positions[obj] = (x_slots[slot_idx % len(x_slots)], -0.05)
        else:
            positions[obj] = (x_slots[slot_idx % len(x_slots)], 0.05)
        slot_idx += 1

    # Blocker slightly in front of what it blocks
    for blocker, blocked in blockers.items():
        tx, ty = positions[blocked]
        positions[blocker] = (tx + random.uniform(-0.03, 0.03), ty + 0.08)

    # Constraint
    constraint = random.choice(CONSTRAINTS)
    hidden_traits = {}
    for obj in targets_list:
        # Keep trait labels simple and interpretable for LLM reasoning.
        hidden_traits[obj] = random.choice(["fragile", "heavy", "standard"])

    deadlines = {}
    if targets_list and random.random() < 0.6:
        urgent_obj = random.choice(targets_list)
        deadlines[urgent_obj] = random.randint(5, 10)

    # Generate instruction
    instruction = _build_instruction(target_bins, constraint, hidden_traits, deadlines)

    return ScenarioConfig(
        objects=present,
        targets=target_bins,
        blockers=blockers,
        distractors=distractors,
        constraint=constraint,
        instruction=instruction,
        positions=positions,
        hidden_traits=hidden_traits,
        deadlines=deadlines,
    )


def _build_instruction(target_bins: dict[str, str], constraint: Optional[str],
                       hidden_traits: dict[str, str], deadlines: dict[str, int]) -> str:
    parts = []
    for obj, bin_ in target_bins.items():
        display = OBJECT_COLORS.get(obj, obj.replace("_block", ""))
        # Use bare display name for non-block objects (professional packs)
        label = f"the {display} block" if obj.endswith("_block") else f"the {display}"
        parts.append(f"{label} in bin {bin_}")

    if len(parts) == 1:
        base = f"Place {parts[0]}."
    else:
        base = "Place " + ", then ".join(parts) + "."

    if constraint == "fragile_first":
        base += " Handle fragile items first."
    elif constraint == "heavy_last":
        base += " Move heavy items last."
    elif constraint == "urgent_first":
        base += " Prioritize urgent items first."

    if deadlines:
        for obj, step in deadlines.items():
            display = OBJECT_COLORS.get(obj, obj.replace("_block", ""))
            label = f"the {display} block" if obj.endswith("_block") else f"the {display}"
            base += f" Place {label} by step {step}."

    if hidden_traits:
        base += " Some object traits are hidden until you inspect the scene."

    return base
SCENARIO_PACKS = {
    "default":   OBJECT_NAMES,
    # Professional task skins — same mechanics, domain-appropriate names
    "warehouse": ["fragile_package", "heavy_pallet", "urgent_parcel", "standard_box", "hazmat_drum"],
    "pharmacy":  ["morphine_vial", "saline_bag", "insulin_pen", "blood_sample", "contrast_agent"],
    "lab":       ["reagent_alpha", "catalyst_beta", "sample_gamma", "solvent_delta", "enzyme_epsilon"],
}

# Color/display name for each object in each pack
OBJECT_COLORS.update({
    "fragile_package": "fragile package", "heavy_pallet": "heavy pallet",
    "urgent_parcel": "urgent parcel", "standard_box": "standard box",
    "hazmat_drum": "hazmat drum",
    "morphine_vial": "morphine vial", "saline_bag": "saline bag",
    "insulin_pen": "insulin pen", "blood_sample": "blood sample",
    "contrast_agent": "contrast agent",
    "reagent_alpha": "reagent-α", "catalyst_beta": "catalyst-β",
    "sample_gamma": "sample-γ", "solvent_delta": "solvent-δ",
    "enzyme_epsilon": "enzyme-ε",
})