| """Output parser. |
| |
| Turns the model's raw text into (narrative, choices, list-of-deltas), then applies |
| the deltas to the GameState with full validation. This module is where "the model |
| proposes, Python disposes" actually happens. |
| |
| It is intentionally forgiving about formatting (small models drift) but strict about |
| *effects*: an unparseable number is ignored, an unknown key is ignored, and every |
| applied change is clamped by GameState. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import re |
| from dataclasses import dataclass |
|
|
| from .game_state import GameState, NPC, Enemy |
|
|
|
|
| @dataclass |
| class TurnResult: |
| narrative: str |
| choices: list[str] |
| applied: list[str] |
| raw: str |
|
|
|
|
| _BLOCK = lambda tag, text: _extract_block(tag, text) |
|
|
|
|
| def _extract_block(tag: str, text: str) -> str: |
| """Grab the content between <tag> ... </tag>. Tolerates a missing closing tag |
| by reading to the next block or end of string.""" |
| |
| m = re.search(rf"<{tag}>(.*?)</{tag}>", text, re.DOTALL | re.IGNORECASE) |
| if m: |
| return m.group(1).strip() |
| |
| m = re.search(rf"<{tag}>(.*?)(?=<\w+>|\Z)", text, re.DOTALL | re.IGNORECASE) |
| if m: |
| return m.group(1).strip() |
| return "" |
|
|
|
|
| def parse(raw: str) -> tuple[str, list[str], list[str]]: |
| """Return (narrative, choices, raw_state_lines) from model text.""" |
| narrative = _extract_block("narrative", raw) |
| state_block = _extract_block("state", raw) |
| choices_block = _extract_block("choices", raw) |
|
|
| |
| if not narrative and not state_block and not choices_block: |
| narrative = raw.strip() |
|
|
| choices: list[str] = [] |
| for line in choices_block.splitlines(): |
| line = line.strip() |
| if not line: |
| continue |
| |
| line = re.sub(r"^\s*(?:\d+[.)]|[-*•])\s*", "", line) |
| if line: |
| choices.append(line) |
|
|
| state_lines = [ln.strip() for ln in state_block.splitlines() if ln.strip()] |
| return narrative, choices, state_lines |
|
|
|
|
| def _parse_int(token: str) -> int | None: |
| m = re.search(r"[-+]?\d+", token) |
| return int(m.group()) if m else None |
|
|
|
|
| def apply_state_changes(state: GameState, state_lines: list[str]) -> list[str]: |
| """Validate and apply each proposed change. Returns a human-readable changelog.""" |
| applied: list[str] = [] |
|
|
| for line in state_lines: |
| if ":" not in line and not line.upper().startswith(("ENEMY_DEFEATED", "GAME_OVER")): |
| continue |
|
|
| key, _, value = line.partition(":") |
| key = key.strip().upper() |
| value = value.strip() |
|
|
| if key == "HP": |
| n = _parse_int(value) |
| if n is None: |
| continue |
| if n < 0: |
| state.damage(-n) |
| applied.append(f"HP {n}") |
| else: |
| state.heal(n) |
| applied.append(f"HP +{n}") |
|
|
| elif key == "GOLD": |
| n = _parse_int(value) |
| if n is not None: |
| state.add_gold(n) |
| applied.append(f"Gold {n:+d}") |
|
|
| elif key == "XP": |
| n = _parse_int(value) |
| if n is not None and n > 0: |
| before = state.level |
| state.add_xp(n) |
| applied.append(f"XP +{n}") |
| if state.level > before: |
| applied.append(f"Level up → {state.level}") |
|
|
| elif key == "ITEM_ADD": |
| if value and state.add_item(value): |
| applied.append(f"+ {value}") |
|
|
| elif key == "ITEM_REMOVE": |
| if value and state.remove_item(value): |
| applied.append(f"- {value}") |
|
|
| elif key == "LOCATION": |
| if value: |
| state.location = value |
| applied.append(f"Moved to {value}") |
|
|
| elif key == "QUEST": |
| if value: |
| state.quest = value |
| applied.append("Quest updated") |
|
|
| elif key == "NPC": |
| npc = _parse_npc(value) |
| if npc: |
| state.upsert_npc(npc) |
| applied.append(f"Met {npc.name}") |
|
|
| elif key == "ENEMY": |
| enemy = _parse_enemy(value) |
| if enemy: |
| state.start_combat(enemy) |
| applied.append(f"Combat: {enemy.name}") |
|
|
| elif key == "ENEMY_HP": |
| n = _parse_int(value) |
| if n is not None and state.enemy: |
| state.enemy.hp = max(0, state.enemy.hp + n) |
| applied.append(f"{state.enemy.name} HP {n:+d}") |
| if not state.enemy.alive: |
| applied.append(f"{state.enemy.name} defeated") |
| state.end_combat() |
|
|
| elif key.startswith("ENEMY_DEFEATED"): |
| if state.enemy: |
| applied.append(f"{state.enemy.name} defeated") |
| state.end_combat() |
|
|
| elif key.startswith("GAME_OVER"): |
| state.game_over = True |
| applied.append("GAME OVER") |
|
|
| return applied |
|
|
|
|
| def _parse_npc(value: str) -> NPC | None: |
| |
| parts = [p.strip() for p in value.split("|")] |
| if not parts or not parts[0]: |
| return None |
| name = parts[0] |
| role = parts[1] if len(parts) > 1 else "" |
| disp = parts[2] if len(parts) > 2 else "neutral" |
| note = parts[3] if len(parts) > 3 else "" |
| return NPC(name=name, role=role, disposition=disp, note=note) |
|
|
|
|
| def _parse_enemy(value: str) -> Enemy | None: |
| |
| parts = [p.strip() for p in value.split("|")] |
| if not parts or not parts[0]: |
| return None |
| name = parts[0] |
| hp, atk = 10, 3 |
| for p in parts[1:]: |
| m = re.search(r"(hp|atk|attack)\s*=\s*(\d+)", p, re.IGNORECASE) |
| if m: |
| if m.group(1).lower() == "hp": |
| hp = int(m.group(2)) |
| else: |
| atk = int(m.group(2)) |
| hp = max(1, min(hp, 200)) |
| atk = max(0, min(atk, 50)) |
| return Enemy(name=name, hp=hp, max_hp=hp, attack=atk) |
|
|
|
|
| def run_turn(state: GameState, raw: str) -> TurnResult: |
| """Full pipeline: parse model text, apply changes, advance the turn counter.""" |
| narrative, choices, state_lines = parse(raw) |
| applied = apply_state_changes(state, state_lines) |
| state.turn += 1 |
| if applied: |
| state.log.append(f"Turn {state.turn}: " + "; ".join(applied)) |
| return TurnResult(narrative=narrative, choices=choices, applied=applied, raw=raw) |
|
|