File size: 14,188 Bytes
f6d8768
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
from __future__ import annotations

import argparse
from pathlib import Path
from typing import Any

import numpy as np
import torch
from torch.nn.utils import clip_grad_norm_
from tqdm import trange

from minidreamer.config import ensure_run_dirs, load_config, merge_dicts, save_config
from minidreamer.data.collect_random import collect_bootstrap_dataset
from minidreamer.data.replay_buffer import ReplayBuffer
from minidreamer.evaluation import evaluate_random_policy, evaluate_world_model
from minidreamer.envs.make_env import make_env_from_config
from minidreamer.models.world_model import WorldModel
from minidreamer.planning.cem import DiscreteCEMPlanner
from minidreamer.planning.evaluate_planner import evaluate_planner
from minidreamer.serialization import save_world_model_checkpoint
from minidreamer.utils.common import get_device, seed_everything, write_json, write_jsonl


def train_world_model_updates(
    model: WorldModel,
    replay: ReplayBuffer,
    optimizer: torch.optim.Optimizer,
    config: dict[str, Any],
    num_updates: int,
    device: torch.device,
) -> list[dict[str, float]]:
    """Run ``num_updates`` gradient steps on the world model using replay samples.

    Each step samples a batch of training-split sequences from ``replay``, moves it
    to ``device``, computes the model's losses, backpropagates ``losses["loss"]``,
    clips the global gradient norm (``config["training"]["grad_clip_norm"]``,
    default 100.0) and applies ``optimizer``.

    Args:
        model: World model to train; switched to ``train()`` mode when any update runs.
        replay: Buffer providing ``sample_sequences(split="train")`` batches.
        optimizer: Optimizer over ``model``'s parameters.
        config: Full run config; passed to ``model.compute_losses`` and read for
            the clipping threshold.
        num_updates: Number of gradient steps; ``<= 0`` is a no-op.
        device: Device the sampled batches are moved to.

    Returns:
        One dict per update with the scalar values of the logged loss terms
        (``loss``, ``reward_loss``, ``done_loss``, ``kl_loss``, ``recon_loss``).
        Empty when ``num_updates <= 0`` (the model is then left untouched).
    """
    if num_updates <= 0:
        return []
    # Loop-invariant: read the clipping threshold once rather than every update.
    grad_clip_norm = float(config["training"].get("grad_clip_norm", 100.0))
    # Loss terms recorded per update, in log-row order.
    logged_keys = ("loss", "reward_loss", "done_loss", "kl_loss", "recon_loss")
    model.train()
    logs: list[dict[str, float]] = []
    progress = trange(num_updates, desc="world-model-updates", leave=False)
    for _ in progress:
        batch = ReplayBuffer.batch_to_torch(replay.sample_sequences(split="train"), device=device)
        losses = model.compute_losses(batch, config)
        optimizer.zero_grad(set_to_none=True)
        losses["loss"].backward()
        clip_grad_norm_(model.parameters(), grad_clip_norm)
        optimizer.step()
        # Uniformly detach and copy each scalar to a Python float.
        log_row = {key: float(losses[key].detach().item()) for key in logged_keys}
        logs.append(log_row)
        progress.set_postfix({key: f"{value:.3f}" for key, value in log_row.items()})
    return logs


def optimizer_to_device(optimizer: torch.optim.Optimizer, device: torch.device) -> None:
    """Move every tensor held in ``optimizer``'s per-parameter state onto ``device``.

    The state dicts are updated in place; non-tensor entries are left as-is.
    """
    for param_state in optimizer.state.values():
        tensor_keys = [name for name, entry in param_state.items() if torch.is_tensor(entry)]
        for name in tensor_keys:
            param_state[name] = param_state[name].to(device)


def load_training_state(
    checkpoint_path: str | Path,
    config: dict[str, Any],
    action_dim: int,
    device: torch.device,
) -> tuple[dict[str, Any], WorldModel, torch.optim.Optimizer, dict[str, Any]]:
    """Rebuild the config, world model and optimizer from a training checkpoint.

    The checkpoint's stored config is merged with ``config`` via
    ``merge_dicts(payload["config"], config)``. The model is rebuilt from the merged
    config and the saved weights are loaded. An Adam optimizer is then created and,
    if the checkpoint has ``optimizer_state``, that state is restored. After
    restoring, the learning rate from the merged config is re-applied, so the
    optimizer agrees with the config that will be saved for this run.

    Args:
        checkpoint_path: Path to a checkpoint with ``config``, ``model_state`` and
            optionally ``optimizer_state`` / ``metadata`` entries.
        config: Config for the current invocation, merged over the stored one.
        action_dim: Number of discrete actions the model is built for.
        device: Device the payload is mapped to and the model/optimizer live on.

    Returns:
        ``(resolved_config, model, optimizer, metadata)``; ``metadata`` is ``{}``
        when the checkpoint has none.
    """
    # NOTE(security): weights_only=False unpickles arbitrary Python objects; only
    # resume from checkpoints you produced or otherwise trust.
    payload = torch.load(checkpoint_path, map_location=device, weights_only=False)
    resolved_config = merge_dicts(payload["config"], config)
    model = WorldModel.from_config(resolved_config, action_dim=action_dim).to(device)
    model.load_state_dict(payload["model_state"])
    lr = float(resolved_config["training"]["lr"])
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    optimizer_state = payload.get("optimizer_state")
    if optimizer_state is not None:
        optimizer.load_state_dict(optimizer_state)
        # load_state_dict restores the checkpoint's param_groups, lr included; without
        # this, a changed lr in the resolved config would be silently ignored.
        for group in optimizer.param_groups:
            group["lr"] = lr
        optimizer_to_device(optimizer, device)
    metadata = payload.get("metadata") or {}
    return resolved_config, model, optimizer, metadata


def find_existing_run_artifacts(base_dir: str | Path) -> list[Path]:
    """Return paths showing that ``base_dir`` already holds output from a previous run.

    The well-known artifact files are checked first: run summary, train/eval metric
    logs, latest checkpoint and replay metadata. If any exist, exactly those are
    returned, in that fixed order. Otherwise, for each of ``checkpoints/``,
    ``metrics/`` and ``replay/`` that exists and is non-empty, the first entry
    yielded by ``iterdir()`` is reported. A missing ``base_dir`` yields ``[]``.
    """
    root = Path(base_dir)
    if not root.exists():
        return []

    known_files = (
        root / "metrics" / "run_summary.json",
        root / "metrics" / "train_metrics.jsonl",
        root / "metrics" / "eval_metrics.jsonl",
        root / "checkpoints" / "world_model_latest.pt",
        root / "replay" / "metadata.json",
    )
    hits = list(filter(Path.exists, known_files))
    if hits:
        return hits

    # Fall back to "any file at all" in the standard run subdirectories.
    fallback: list[Path] = []
    for name in ("checkpoints", "metrics", "replay"):
        folder = root / name
        if not folder.exists():
            continue
        first_entry = next(folder.iterdir(), None)
        if first_entry is not None:
            fallback.append(first_entry)
    return fallback


def collect_planner_steps(
    env,
    replay: ReplayBuffer,
    model: WorldModel,
    planner: DiscreteCEMPlanner,
    num_steps: int,
    random_action_fraction: float,
    rng: np.random.Generator,
) -> dict[str, int]:
    """Roll out whole episodes with the planner and append each one to ``replay``.

    New episodes are started while fewer than ``num_steps`` environment steps have
    been taken. The budget is checked only between episodes, so the last episode
    always runs until ``terminated`` or ``truncated``, and the returned step count
    may exceed ``num_steps``.

    At every step, if ``rng.random() < random_action_fraction`` a random action is
    drawn from ``env.action_space``. Otherwise ``planner.plan`` acts on the model's
    current posterior state. That state is refreshed from each new observation via
    ``model.posterior_step(..., sample=False)`` under ``torch.no_grad()``.

    Returns:
        Counters ``env_steps``, ``episodes`` and ``success_episodes``. An episode
        counts as a success when its final step was ``terminated`` and its summed
        reward is positive.
    """
    counters = {"env_steps": 0, "episodes": 0, "success_episodes": 0}
    # dtype applied when each per-episode list is handed to the replay buffer;
    # the key order here is also the keyword order passed to add_episode.
    field_dtypes = {
        "obs": np.float32,
        "actions": np.int64,
        "rewards": np.float32,
        "terminated": np.float32,
        "truncated": np.float32,
        "done": np.float32,
    }
    model.eval()
    while counters["env_steps"] < num_steps:
        current_obs, _ = env.reset()
        episode: dict[str, list] = {field: [] for field in field_dtypes}
        episode["obs"].append(current_obs)

        with torch.no_grad():
            belief = model.posterior_step(model.initial_state(1), None, current_obs, sample=False)
            while True:
                take_random = rng.random() < random_action_fraction
                if take_random:
                    chosen = int(env.action_space.sample())
                else:
                    chosen = planner.plan(belief).action
                current_obs, reward, terminated, truncated, _ = env.step(chosen)
                finished = terminated or truncated
                episode["actions"].append(chosen)
                episode["rewards"].append(float(reward))
                episode["terminated"].append(float(terminated))
                episode["truncated"].append(float(truncated))
                episode["done"].append(float(finished))
                episode["obs"].append(current_obs)
                counters["env_steps"] += 1
                if finished:
                    break
                # Only advance the belief when the episode continues.
                belief = model.posterior_step(belief, chosen, current_obs, sample=False)

        replay.add_episode(
            **{field: np.asarray(values, dtype=field_dtypes[field]) for field, values in episode.items()}
        )
        counters["episodes"] += 1
        counters["success_episodes"] += int(bool(terminated and np.sum(episode["rewards"]) > 0.0))
    return counters


def run_training(
    config: dict[str, Any],
    output_dir: str | Path,
    replay_dir: str | Path | None = None,
    resume_checkpoint: str | Path | None = None,
    allow_overwrite_existing_output: bool = False,
) -> dict[str, Any]:
    """Train the world model, interleaving planner-driven data collection.

    Steps:
      1. Refuse to reuse a directory with prior run artifacts unless resuming or
         ``allow_overwrite_existing_output`` is set.
      2. Load the replay buffer from ``replay_dir`` if it exists; otherwise collect
         a bootstrap dataset into the run's ``replay`` directory.
      3. Build a fresh model/optimizer, or restore them (plus the merged config)
         from ``resume_checkpoint``.
      4. Run initial gradient updates. On resume, run catch-up updates instead, for
         env steps collected after the checkpoint was written.
      5. Alternate planner collection and gradient updates until either the largest
         ``comparison.env_steps`` budget or ``training.train_steps`` is reached.
         Evaluation and checkpointing happen every ``eval_every_env_steps``.
      6. Write the latest checkpoint, metric files and a run summary.

    Args:
        config: Run configuration; replaced by the merged config when resuming.
        output_dir: Run directory that receives checkpoints, metrics and replay.
        replay_dir: Optional existing replay directory to load instead of
            collecting a bootstrap dataset.
        resume_checkpoint: Optional checkpoint to continue training from.
        allow_overwrite_existing_output: Permit writing into a directory that
            already holds run artifacts when not resuming.

    Returns:
        Summary dict with ``replay`` (buffer summary), ``updates_done`` (cumulative,
        including updates from a resumed checkpoint) and ``device``.

    Raises:
        FileExistsError: ``output_dir`` already holds run artifacts and neither
            resuming nor overwriting was requested.
    """
    seed = config.get("project", {}).get("seed", 0)
    seed_everything(seed)
    existing_artifacts = find_existing_run_artifacts(output_dir)
    if existing_artifacts and resume_checkpoint is None and not allow_overwrite_existing_output:
        preview = ", ".join(str(path) for path in existing_artifacts[:3])
        raise FileExistsError(
            f"Refusing to overwrite existing run directory '{output_dir}'. "
            f"Found existing artifacts: {preview}. "
            "Choose a new --output-dir, resume with --resume-checkpoint, "
            "or pass --allow-overwrite-existing-output to overwrite intentionally."
        )
    run_dirs = ensure_run_dirs(output_dir)
    device = get_device(config.get("training", {}).get("device"))

    # Throwaway env, used only to read the discrete action count.
    env = make_env_from_config(config, seed=seed)
    action_dim = env.action_space.n
    env.close()

    if replay_dir is not None and Path(replay_dir).exists():
        replay = ReplayBuffer.load(replay_dir)
        collection_summary = {"replay_loaded": replay.summary()}
    else:
        replay, collection_summary = collect_bootstrap_dataset(config, output_dir=run_dirs["replay"], seed=seed)

    resume_metadata: dict[str, Any] = {}
    if resume_checkpoint is not None:
        config, model, optimizer, resume_metadata = load_training_state(
            checkpoint_path=resume_checkpoint,
            config=config,
            action_dim=action_dim,
            device=device,
        )
    else:
        model = WorldModel.from_config(config, action_dim=action_dim).to(device)
        optimizer = torch.optim.Adam(model.parameters(), lr=float(config["training"]["lr"]))

    save_config(config, run_dirs["base"] / "resolved_config.yaml")
    # NOTE(review): on resume these hold only this session's rows, and the metric
    # files written below replace the previous session's files.
    training_logs: list[dict[str, float]] = []
    evaluation_logs: list[dict[str, Any]] = []

    collection_cfg = config["collection"]
    train_collect_ratio = float(collection_cfg.get("train_collect_ratio", 1.0))
    total_updates_budget = int(config["training"]["train_steps"])
    # Shared by the resume catch-up and the main loop. Clamped to >= 1 so that each
    # iteration of the main loop collects data; a value of 0 would otherwise spin forever.
    collect_steps_per_iteration = max(1, int(collection_cfg.get("collect_steps_per_iteration", 1)))

    if resume_checkpoint is not None:
        updates_done = int(resume_metadata.get("updates_done", 0))
        checkpoint_env_steps = int(resume_metadata.get("env_steps", 0))
        if replay.env_steps > checkpoint_env_steps and updates_done < total_updates_budget:
            # Replay grew past the checkpoint (e.g. a crash after replay.save):
            # replay the updates those missed iterations would have performed.
            per_iteration_updates = int(
                collection_cfg.get(
                    "gradient_updates_per_iteration",
                    round(collect_steps_per_iteration * train_collect_ratio),
                )
            )
            missed_iterations = max(0, round((replay.env_steps - checkpoint_env_steps) / collect_steps_per_iteration))
            catch_up_updates = min(total_updates_budget - updates_done, per_iteration_updates * missed_iterations)
            catch_up_logs = train_world_model_updates(model, replay, optimizer, config, catch_up_updates, device)
            training_logs.extend(catch_up_logs)
            updates_done += len(catch_up_logs)
    else:
        initial_updates = min(total_updates_budget, max(1, int(round(replay.env_steps * train_collect_ratio))))
        training_logs.extend(train_world_model_updates(model, replay, optimizer, config, initial_updates, device))
        updates_done = len(training_logs)

    comparison_budgets = config.get("comparison", {}).get("env_steps", [replay.env_steps])
    # An empty budget list would make max() raise; fall back to the current replay size.
    target_env_steps = int(max(comparison_budgets, default=replay.env_steps))
    rng = np.random.default_rng(seed)
    eval_every_steps = int(config["evaluation"].get("eval_every_env_steps", target_env_steps))
    random_action_fraction = float(collection_cfg.get("random_action_fraction_after_planner", 0.0))
    next_eval_step = replay.env_steps

    env = make_env_from_config(config, seed=seed)
    try:
        planner = DiscreteCEMPlanner.from_config(model, env.action_space.n, config)
        while replay.env_steps < target_env_steps and updates_done < total_updates_budget:
            collect_steps = min(collect_steps_per_iteration, target_env_steps - replay.env_steps)
            collection_row = collect_planner_steps(
                env,
                replay,
                model,
                planner,
                num_steps=collect_steps,
                random_action_fraction=random_action_fraction,
                rng=rng,
            )
            updates = int(
                collection_cfg.get(
                    "gradient_updates_per_iteration",
                    round(collection_row["env_steps"] * train_collect_ratio),
                )
            )
            updates = min(updates, total_updates_budget - updates_done)
            new_logs = train_world_model_updates(model, replay, optimizer, config, updates, device)
            training_logs.extend(new_logs)
            # Accumulate instead of recounting len(training_logs): on resume,
            # training_logs holds only this session's updates, whereas updates_done
            # started from the checkpoint's count.
            updates_done += len(new_logs)
            replay.save(run_dirs["replay"])

            if replay.env_steps >= next_eval_step:
                world_model_metrics = evaluate_world_model(config, model, replay, split="val", max_episodes=10)
                eval_episodes = min(10, config["evaluation"]["episodes"])
                planner_metrics = evaluate_planner(config, model, episodes=eval_episodes, seed=seed)
                random_metrics = evaluate_random_policy(config, episodes=eval_episodes, seed=seed)
                eval_row = {
                    "env_steps": replay.env_steps,
                    "updates_done": updates_done,
                    **{f"world_model/{key}": value for key, value in world_model_metrics.items()},
                    **{f"planner/{key}": value for key, value in planner_metrics.items()},
                    **{f"random/{key}": value for key, value in random_metrics.items()},
                }
                evaluation_logs.append(eval_row)
                next_eval_step += eval_every_steps
                save_world_model_checkpoint(
                    run_dirs["checkpoints"] / f"world_model_env_steps_{replay.env_steps}.pt",
                    model,
                    config,
                    optimizer=optimizer,
                    metadata={"env_steps": replay.env_steps, "updates_done": updates_done},
                )
    finally:
        # Release the collection env even if collection/training/evaluation raises.
        env.close()

    save_world_model_checkpoint(
        run_dirs["checkpoints"] / "world_model_latest.pt",
        model,
        config,
        optimizer=optimizer,
        metadata={"env_steps": replay.env_steps, "updates_done": updates_done},
    )
    write_json(run_dirs["metrics"] / "collection_summary.json", collection_summary)
    write_jsonl(run_dirs["metrics"] / "train_metrics.jsonl", training_logs)
    write_jsonl(run_dirs["metrics"] / "eval_metrics.jsonl", evaluation_logs)
    summary = {
        "replay": replay.summary(),
        "updates_done": updates_done,
        "device": str(device),
    }
    write_json(run_dirs["metrics"] / "run_summary.json", summary)
    return summary


def build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the world-model training entry point.

    Accepted options:
      * ``--config`` and ``--output-dir``: required, parsed as ``Path``.
      * ``--replay-dir`` and ``--resume-checkpoint``: optional ``Path``s,
        ``None`` when omitted.
      * ``--allow-overwrite-existing-output``: boolean flag, ``False`` by default.
    """
    cli = argparse.ArgumentParser(description="Train the MiniDreamer world model.")
    for required_flag in ("--config", "--output-dir"):
        cli.add_argument(required_flag, type=Path, required=True)
    optional_paths = (
        ("--replay-dir", "Optional existing replay directory."),
        ("--resume-checkpoint", "Optional checkpoint to resume from."),
    )
    for flag, help_text in optional_paths:
        cli.add_argument(flag, type=Path, default=None, help=help_text)
    cli.add_argument(
        "--allow-overwrite-existing-output",
        action="store_true",
        help="Allow overwriting an existing run directory when not resuming.",
    )
    return cli


def main() -> None:
    """CLI entry point: parse arguments, load the config, train, and print the summary."""
    args = build_arg_parser().parse_args()
    summary = run_training(
        load_config(args.config),
        args.output_dir,
        replay_dir=args.replay_dir,
        resume_checkpoint=args.resume_checkpoint,
        allow_overwrite_existing_output=args.allow_overwrite_existing_output,
    )
    print(summary)


if __name__ == "__main__":
    main()