Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import uuid | |
| from collections import defaultdict | |
| from pathlib import Path | |
| from typing import Any | |
| from textworld.generator import GameMaker, GameOptions, compile_game | |
| from textworld.generator.data import KnowledgeBase | |
| from .base import ARTIFACTS_ROOT, DMCompileError, parser_safe_text | |
| from .check import validate_and_normalize | |
| from .graph import ( | |
| door_room_mapping, | |
| hidden_readable_ids, | |
| npc_trade_mapping, | |
| produced_item_ids, | |
| readable_clue_mapping, | |
| recipe_mapping, | |
| use_effect_mapping, | |
| ) | |
| from .logic import build_grammar_dir, build_logic_dir, solver_policy, submit_command_text, write_artifacts | |
| from .quest import parse_quest_action, simulate_walkthrough, topological_linearize | |
| from .schema import CompiledWorld, WorldDefinition | |
| class WorldCompiler: | |
| def __init__(self, artifacts_root: Path | None = None) -> None: | |
| self.artifacts_root = artifacts_root or ARTIFACTS_ROOT | |
| def compile(self, world_input: WorldDefinition | dict[str, Any], episode_id: str | None = None) -> CompiledWorld: | |
| world = validate_and_normalize(world_input) | |
| episode_id = episode_id or uuid.uuid4().hex[:12] | |
| artifacts_dir = self.artifacts_root / episode_id | |
| artifacts_dir.mkdir(parents=True, exist_ok=True) | |
| parsed_steps = [parse_quest_action(step.action) for step in topological_linearize(world.quest_chain)] | |
| entity_names = self._assign_command_names(world) | |
| options = GameOptions() | |
| options.kb = KnowledgeBase.load( | |
| logic_path=str(build_logic_dir(artifacts_dir, world)), | |
| grammar_path=str(build_grammar_dir(artifacts_dir)), | |
| ) | |
| options.path = str(artifacts_dir / "game.z8") | |
| options.force_recompile = True | |
| maker = GameMaker(options=options) | |
| rooms, entities = self._build_entities(maker, world, entity_names) | |
| maker.set_player(rooms[world.meta.start_node_id]) | |
| self._compile_edges(maker, world, rooms, entities) | |
| self._compile_clue_sources(maker, world, entities) | |
| self._compile_fixtures(maker, world, entities) | |
| self._compile_npcs(maker, world, entities) | |
| self._compile_recipes(maker, world, entities) | |
| guardian = entities[world.meta.win_condition.target_npc_id] | |
| answer = maker.new(type="answer", name="final answer token") | |
| maker.nowhere.append(answer) | |
| entities["__answer__"] = answer | |
| maker.add_fact("guardian", guardian) | |
| maker.add_fact("correct", answer, guardian) | |
| walkthrough_commands = simulate_walkthrough(world, parsed_steps, entity_names) | |
| game = maker.build() | |
| game.objective = ( | |
| f"Explore {world.meta.title}, manipulate the dungeon's tools, gather every clue, " | |
| f"speak to {entities[world.meta.win_condition.target_npc_id].name}, and submit the answer." | |
| ) | |
| game.metadata.update( | |
| {"episode_id": episode_id, "dm_title": world.meta.title, "start_node_id": world.meta.start_node_id} | |
| ) | |
| compile_game(game, options) | |
| write_artifacts(artifacts_dir, world, walkthrough_commands) | |
| policy = solver_policy(str(options.path)) | |
| if not policy: | |
| policy = list(walkthrough_commands) | |
| return self._compiled_world( | |
| episode_id, | |
| artifacts_dir, | |
| Path(options.path), | |
| world, | |
| entity_names, | |
| walkthrough_commands, | |
| policy, | |
| ) | |
| def _build_entities( | |
| self, | |
| maker: GameMaker, | |
| world: WorldDefinition, | |
| entity_names: dict[str, str], | |
| ) -> tuple[dict[str, Any], dict[str, Any]]: | |
| rooms = { | |
| node.id: maker.new(type="r", name=entity_names[node.id], desc=node.description) | |
| for node in world.nodes | |
| if node.type in {"location", "junction"} | |
| } | |
| entities: dict[str, Any] = {} | |
| hidden_readables = hidden_readable_ids(world) | |
| recipe_outputs = {recipe.output_item_id for recipe in world.recipes} | |
| produced_items = produced_item_ids(world) | |
| for node in world.nodes: | |
| if node.type in {"location", "junction"}: | |
| continue | |
| entity = self._make_node_entity(maker, node, entity_names[node.id]) | |
| entities[node.id] = entity | |
| if node.type == "door": | |
| maker.nowhere.append(entity) | |
| elif node.type == "readable" and node.id in hidden_readables: | |
| maker.nowhere.append(entity) | |
| maker.add_fact("hidden_readable", entity) | |
| else: | |
| rooms[node.parent_id].add(entity) | |
| for item in world.items: | |
| item_type = "k" if item.subtype == "key" else "o" | |
| entity = maker.new(type=item_type, name=entity_names[item.id], desc=item.description) | |
| entities[item.id] = entity | |
| if item.id in produced_items: | |
| maker.nowhere.append(entity) | |
| if item.id in recipe_outputs: | |
| maker.add_fact("fresh", entity) | |
| else: | |
| maker.add_fact("stored_item", entity) | |
| continue | |
| holder = item.start_node_id | |
| if holder is None: | |
| raise DMCompileError(f"Placed item '{item.id}' is missing start_node_id.") | |
| if holder in rooms: | |
| rooms[holder].add(entity) | |
| else: | |
| entities[holder].add(entity) | |
| return rooms, entities | |
| def _make_node_entity(maker: GameMaker, node: object, name: str) -> Any: | |
| if node.type == "container": | |
| entity = maker.new(type="c", name=name, desc=node.description) | |
| entity.add_property("open" if node.open else "locked" if node.locked else "closed") | |
| return entity | |
| if node.type == "door": | |
| entity = maker.new(type="d", name=name, desc=node.description) | |
| entity.add_property("open" if node.open else "locked" if node.locked else "closed") | |
| return entity | |
| if node.type == "readable": | |
| return maker.new(type="readable", name=name, desc=node.description) | |
| if node.type == "fixture": | |
| return maker.new(type="fixture", name=name, desc=node.description) | |
| if node.type == "npc": | |
| return maker.new(type="npc", name=name, desc=node.description) | |
| raise DMCompileError(f"Unsupported node type '{node.type}'.") | |
| def _compile_clue_sources( | |
| self, | |
| maker: GameMaker, | |
| world: WorldDefinition, | |
| entities: dict[str, Any], | |
| ) -> None: | |
| hidden_readables = hidden_readable_ids(world) | |
| for node in world.nodes: | |
| if node.type != "readable": | |
| continue | |
| readable = entities[node.id] | |
| if node.requires_item_id: | |
| maker.add_fact("read_requires", readable, entities[node.requires_item_id]) | |
| maker.add_fact("read_consumes_use" if node.consumes_item else "read_keeps_use", readable) | |
| else: | |
| maker.add_fact("free_read", readable) | |
| if node.id in hidden_readables: | |
| continue | |
| def _compile_fixtures(self, maker: GameMaker, world: WorldDefinition, entities: dict[str, Any]) -> None: | |
| for node in world.nodes: | |
| if node.type != "fixture": | |
| continue | |
| fixture = entities[node.id] | |
| maker.add_fact("fixture_requires", fixture, entities[node.requires_item_id]) | |
| maker.add_fact("sealed", fixture) | |
| maker.add_fact("fixture_consumes_use" if node.consumes_item else "fixture_keeps_use", fixture) | |
| if node.reveals_item_id: | |
| maker.add_fact("reveals_item", fixture, entities[node.reveals_item_id]) | |
| if node.reveals_readable_id: | |
| maker.add_fact("reveals_readable", fixture, entities[node.reveals_readable_id]) | |
| def _compile_npcs( | |
| self, | |
| maker: GameMaker, | |
| world: WorldDefinition, | |
| entities: dict[str, Any], | |
| ) -> None: | |
| guardian_id = world.meta.win_condition.target_npc_id | |
| for node in world.nodes: | |
| if node.type != "npc": | |
| continue | |
| npc = entities[node.id] | |
| if node.id == guardian_id: | |
| continue | |
| maker.add_fact("trade_pending", npc) | |
| maker.add_fact("trade_requires", npc, entities[node.requires_item_id]) | |
| if node.gives_item_id: | |
| maker.add_fact("trade_gives_item", npc, entities[node.gives_item_id]) | |
| if node.gives_clue_id: | |
| maker.add_fact("trade_gives_clue", npc) | |
| def _compile_recipes(self, maker: GameMaker, world: WorldDefinition, entities: dict[str, Any]) -> None: | |
| for recipe in world.recipes: | |
| a_id, b_id = recipe.input_item_ids | |
| output = entities[recipe.output_item_id] | |
| maker.add_fact("combines_with", entities[a_id], entities[b_id], output) | |
| maker.add_fact("combines_with", entities[b_id], entities[a_id], output) | |
| def _compile_edges( | |
| maker: GameMaker, | |
| world: WorldDefinition, | |
| rooms: dict[str, Any], | |
| entities: dict[str, Any], | |
| ) -> None: | |
| pair_groups: dict[frozenset[str], list[Any]] = defaultdict(list) | |
| for edge in world.edges: | |
| pair_groups.setdefault(frozenset({edge.from_node_id, edge.to_node_id}), []).append(edge) | |
| for edges in pair_groups.values(): | |
| forward, backward = sorted(edges, key=lambda edge: edge.id) | |
| for edge in (forward, backward): | |
| maker.add_fact(f"{edge.direction}_of", rooms[edge.to_node_id], rooms[edge.from_node_id]) | |
| if forward.door_node_id: | |
| door = entities[forward.door_node_id] | |
| room_a = rooms[forward.from_node_id] | |
| room_b = rooms[forward.to_node_id] | |
| maker.add_fact("link", room_a, door, room_b) | |
| maker.add_fact("link", room_b, door, room_a) | |
| if forward.required_item_id: | |
| maker.add_fact("match", entities[forward.required_item_id], door) | |
| door_is_open = door.has_property("open") | |
| if door_is_open: | |
| maker.add_fact("free", room_a, room_b) | |
| maker.add_fact("free", room_b, room_a) | |
| else: | |
| maker.add_fact("free", rooms[forward.from_node_id], rooms[forward.to_node_id]) | |
| maker.add_fact("free", rooms[forward.to_node_id], rooms[forward.from_node_id]) | |
| def _compiled_world( | |
| self, | |
| episode_id: str, | |
| artifacts_dir: Path, | |
| game_file: Path, | |
| world: WorldDefinition, | |
| entity_names: dict[str, str], | |
| walkthrough_commands: list[str], | |
| policy: list[str], | |
| ) -> CompiledWorld: | |
| node_by_id = {node.id: node for node in world.nodes} | |
| return CompiledWorld( | |
| episode_id=episode_id, | |
| world=world, | |
| artifacts_dir=artifacts_dir, | |
| game_file=game_file, | |
| walkthrough_commands=walkthrough_commands, | |
| solver_policy=policy, | |
| correct_answer_normalized=submit_command_text(world).replace("submit ", "", 1), | |
| correct_submit_command=submit_command_text(world), | |
| guardian_id=world.meta.win_condition.target_npc_id, | |
| guardian_room_id=node_by_id[world.meta.win_condition.target_npc_id].parent_id, | |
| room_name_to_id={ | |
| entity_names[node.id]: node.id for node in world.nodes if node.type in {"location", "junction"} | |
| }, | |
| node_command_names={node.id: entity_names[node.id] for node in world.nodes}, | |
| item_command_names={item.id: entity_names[item.id] for item in world.items}, | |
| item_start_locations={item.id: item.start_node_id for item in world.items}, | |
| clue_text_by_id={clue.id: clue.text for clue in world.clues}, | |
| readable_clue_by_id=readable_clue_mapping(world), | |
| npc_trade_map=npc_trade_mapping(world), | |
| recipe_map=recipe_mapping(world), | |
| use_effects=use_effect_mapping(world), | |
| produced_item_ids=produced_item_ids(world), | |
| room_edges_by_target={(edge.from_node_id, edge.to_node_id): edge for edge in world.edges}, | |
| room_edges_by_direction={(edge.from_node_id, edge.direction): edge for edge in world.edges}, | |
| door_rooms=door_room_mapping(world), | |
| ) | |
| def _assign_command_names(world: WorldDefinition) -> dict[str, str]: | |
| names = {node.id: parser_safe_text(node.label) for node in world.nodes} | |
| names.update({item.id: parser_safe_text(item.label) for item in world.items}) | |
| return names | |