"""NEMOCITY petition queue — one city hall, one petition at a time.

A single global asyncio worker serializes every job:

    wish:  pop -> moderate -> planner.grant(text, summary, act, emit)
           [act(building_req) runs the engine: synonyms -> sanitize ->
            placement.place -> world.apply -> world_delta per event]
           -> infill rule -> persist trace -> wish_granted
    fix:   pop -> traffic snapshot (stats + candidates) -> planner.fix(
           stats, candidates, act_fix, emit) -> ONE apply_fix event ->
           persist -> wish_granted

Admission control in `submit`/`submit_fix`: 1 pending job per client, queue
cap 50. Hourly rate limits + fix cooldowns live in the server, not here.
`submit_fix` raises TrafficSmooth (-> 409 upstream) when the static
assignment says there is nothing to fix.

The emit passed to the planner is wrapped: any event missing a wish_id gets
this job's id injected — so the planner can emit {type:"plan", plan:{...}}
right after its JSON parses without knowing the wish_id.

Crash-safe by construction: an exception while granting one job emits a
readable generic wish_rejected and the worker moves on. SSE emit failures
are swallowed; persist failures are logged but do not undo a grant.

Wire names keep godseed's wish_* verbatim; user-facing copy says petition.
Stdlib only; this is the only async module in the engine.
"""

from __future__ import annotations

import asyncio
import contextlib
import inspect
import logging
import re
import time
import uuid
from dataclasses import dataclass
from typing import Any, Optional

from . import constants as C
from . import placement, tools, traffic
from .city import CityState

log = logging.getLogger("nemocity.engine.queue")

QUEUE_MAX = 50
MAX_BUILDINGS_PER_WISH = 3   # act() invocations the planner gets per petition
MAX_CALLS_PER_WISH = 12      # engine backstop on applied events
TEXT_PREVIEW_LEN = 60
_SUBMIT_TEXT_CAP = 500  # keep raw text bounded; moderation still sees >160 and denies

# Browser-invoked granting (ZeroGPU): GPU work must run inside the PETITIONER'S
# gradio request (their HF auth headers carry the quota — a server-side
# background spaces.GPU call fails by design). The worker emits your_turn and
# waits for the browser to fetch its token (authenticated) then invoke().
INVOCATION_WINDOW_S = 25.0
GRANT_DEADLINE_S = 150.0  # one-shot 9B runs well under this; bounds a hung grant

QUEUE_FULL_REASON = "city hall is swamped; come back in a few minutes"
ALREADY_PENDING_REASON = "one petition per visitor at a time; yours is still on the desk"
WANDERED_REASON = "the petitioner left the counter; city hall moved on"
INTERRUPTED_REASON = "the review was interrupted; city hall moved on"
NOT_YOUR_TURN_REASON = "this petition is not at the counter"
RITE_BEGUN_REASON = "this petition is already being processed"
CRASH_REASON = "city hall misfiled this petition; please submit it again"
BUSY_HEAVENS_REASON = "the planning office is busy; offer your petition again in a moment"
HANDS_FULL_OBSERVATION = "rejected: this petition's permit limit is reached"
FALLBACK_EPITAPH = "the city granted a home"
TRAFFIC_SMOOTH_REASON = "Traffic is flowing smoothly."
FIX_JOB_TEXT = "City Engineer: traffic review"
INFILL_NOTE = "New families are moving in."

_WISH_ID_RE = re.compile(r"^w_(\d+)$")


class QueueError(Exception):
    """Admission failure; `.reason` is readable and safe to show users."""

    def __init__(self, reason: str) -> None:
        super().__init__(reason)
        self.reason = reason


class QueueFull(QueueError):
    pass


class AlreadyPending(QueueError):
    pass


class TrafficSmooth(QueueError):
    """Nothing to fix — the server matches this by type name -> 409."""


@dataclass
class _Item:
    wish_id: str
    text: str
    client_id: str
    submitted_at: float
    kind: str = "wish"  # "wish" | "fix"


def _trace_get(trace: Any, key: str) -> Any:
    """Read `key` from a trace that may be a dict or an attribute-style object."""
    if isinstance(trace, dict):
        return trace.get(key)
    return getattr(trace, key, None)


def _trace_set(trace: Any, key: str, value: Any) -> None:
    """Best-effort write of `key` on a dict or attribute-style trace; never raises."""
    try:
        if isinstance(trace, dict):
            trace[key] = value
        else:
            setattr(trace, key, value)
    except Exception:  # frozen dataclass etc. — best-effort
        pass


def _epitaph_from_reading(reading: Any) -> Optional[str]:
    """Salvage the blurb from the model's last sentence so the ledger stays
    varied when the model omits a clean one."""
    text = str(reading or "").strip()
    if not text:
        return None
    parts = [p.strip() for p in re.split(r"(?<=[.!?])\s+", text) if p.strip()]
    return parts[-1][:120] if parts else None


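def _num(value: Any) -> Optional[float]:
    """Coerce a model-supplied value to float; return None when it is not numeric.

    Bools are rejected explicitly because `bool` is a subclass of `int`.
    """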
    if isinstance(value, bool):
        return None
    if isinstance(value, (int, float)):
        return float(value)
    try:
        return float(str(value).strip())
    except (ValueError, TypeError):
        return None


class QueueWorker:
    """Single serialized worker; construct once, `await start()` at boot.

    Injected collaborators (duck-typed, no engine->mind/server imports):
      world      engine.world.World (features/apply/summary/record_epitaph/epoch)
      moderator  engine.moderation.Moderator (async check, sync check_content)
      planner    object with `async grant(text, summary, act, emit)` and
                 (optionally) `async fix(stats, candidates, act_fix, emit)`
      emit       async callable broadcasting one SSE event dict
      persist    callable (sync or async) receiving the WishTrace; optional
    """

    def __init__(
        self,
        world: Any,
        moderator: Any,
        planner: Any,
        emit: Any,
        persist: Any = None,
        *,
        max_queue: int = QUEUE_MAX,
        max_calls_per_wish: int = MAX_CALLS_PER_WISH,
        next_wish_index: Optional[int] = None,
        browser_invoked: bool = False,
        invocation_window: float = INVOCATION_WINDOW_S,
        grant_deadline: float = GRANT_DEADLINE_S,
    ) -> None:
        self._world = world
        self._moderator = moderator
        self._planner = planner
        self._emit = emit
        self._persist = persist
        self._max_queue = max_queue
        self._max_calls = max_calls_per_wish
        self._queue: asyncio.Queue[_Item] = asyncio.Queue()
        self._pending: dict[str, str] = {}  # client_id -> wish_id
        self._current: Optional[_Item] = None
        self._task: Optional[asyncio.Task] = None
        self._browser_invoked = browser_invoked
        self._invocation_window = invocation_window
        self._grant_deadline = grant_deadline
        self._awaiting: Optional[dict[str, Any]] = None
        self._wish_counter = (
            next_wish_index if next_wish_index is not None else self._derive_wish_counter(world)
        )

    @staticmethod
    def _derive_wish_counter(world: Any) -> int:
        """Continue numbering after the highest persisted w_NNNNNN."""
        best = 0
        for feature in getattr(world, "features", ()) or ():
            match = _WISH_ID_RE.match(getattr(feature, "wish_id", "") or "")
            if match:
                best = max(best, int(match.group(1)) + 1)
        return best or 1  # first-ever petition is w_000001

    def _city(self) -> CityState:
        return CityState.from_events(getattr(self._world, "features", ()) or ())

    # --------------------------------------------------------------- admission

    def _admit(self, client_id: str) -> None:
        if client_id in self._pending:
            raise AlreadyPending(ALREADY_PENDING_REASON)
        if self._queue.qsize() >= self._max_queue:
            raise QueueFull(QUEUE_FULL_REASON)

    def _enqueue(self, text: str, client_id: str, kind: str) -> tuple[str, int]:
        wish_id = f"w_{self._wish_counter:06d}"
        self._wish_counter += 1
        item = _Item(
            wish_id=wish_id, text=text, client_id=client_id,
            submitted_at=time.time(), kind=kind,
        )
        self._pending[client_id] = wish_id
        self._queue.put_nowait(item)
        return wish_id, self._queue.qsize() + (1 if self._current else 0)

    async def submit(self, text: Any, client_id: Any) -> tuple[str, int]:
        """Enqueue a petition. Returns (wish_id, position). Position counts the
        petition itself plus everyone ahead of it (including the one being
        granted), so an idle empty queue yields position 1.

        Raises AlreadyPending / QueueFull with readable `.reason`.
        """
        text = ("" if text is None else str(text)).strip()[:_SUBMIT_TEXT_CAP]
        client_id = str(client_id)
        self._admit(client_id)
        wish_id, position = self._enqueue(text, client_id, "wish")
        await self._emit_queue()
        return wish_id, position

    async def submit_fix(self, client_id: Any) -> tuple[str, int]:
        """Enqueue a City Engineer job. The static-assignment gate runs HERE:
        when the worst demand ratio is below the gate there is nothing to fix
        and TrafficSmooth is raised (the server turns it into a 409). Rate
        limits and cooldowns live in the server."""
        client_id = str(client_id)
        self._admit(client_id)
        assignment = traffic.assign(self._city(), time.time())
        if assignment.max_ratio < C.FIX_GATE_RATIO:
            raise TrafficSmooth(TRAFFIC_SMOOTH_REASON)
        wish_id, position = self._enqueue(FIX_JOB_TEXT, client_id, "fix")
        await self._emit_queue()
        return wish_id, position

    # --------------------------------------------------------------- lifecycle

    async def start(self) -> None:
        if self._task is None or self._task.done():
            self._task = asyncio.get_running_loop().create_task(
                self._run(), name="nemocity-queue-worker"
            )

    async def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._task
            self._task = None

    @property
    def is_running(self) -> bool:
        return self._task is not None and not self._task.done()

    @property
    def queue_length(self) -> int:
        return self._queue.qsize()

    @property
    def current(self) -> Optional[dict]:
        if self._current is None:
            return None
        return {
            "wish_id": self._current.wish_id,
            "text_preview": self._current.text[:TEXT_PREVIEW_LEN],
        }

    def snapshot(self) -> dict:
        """The server's QueueLike view: waiting count + the job being granted."""
        return {"length": self._queue.qsize(), "current": self.current}

    # --------------------------------------------------------------- worker

    async def _run(self) -> None:
        while True:
            item = await self._queue.get()
            self._current = item
            try:
                await self._process(item)
            except asyncio.CancelledError:
                raise
            except Exception:
                # one petition must NEVER kill the worker
                log.exception("wish %s crashed; city hall moves on", item.wish_id)
                await self._safe_emit(
                    {"type": "wish_rejected", "wish_id": item.wish_id, "reason": CRASH_REASON}
                )
            finally:
                self._current = None
                self._pending.pop(item.client_id, None)
                self._queue.task_done()
                await self._emit_queue()

    async def _process(self, item: _Item) -> None:
        if not self._browser_invoked:
            await self.grant_item(item)
            return

        # Browser-invoked mode: announce the turn, then wait for the petitioner's
        # browser to call invoke() (a gradio API request carrying their ZeroGPU
        # quota). No invocation within the window -> city hall moves on.
        token = uuid.uuid4().hex
        loop = asyncio.get_running_loop()
        started: asyncio.Future[bool] = loop.create_future()
        done: asyncio.Future[bool] = loop.create_future()
        self._awaiting = {
            "wish_id": item.wish_id,
            "token": token,
            "client_id": item.client_id,
            "item": item,
            "started": started,
            "done": done,
            "begun": False,
        }
        try:
            # SECURITY: the token is NOT broadcast. your_turn carries only the
            # wish_id over the public SSE; the owner's browser fetches the secret
            # token from GET /api/turn (cookie-authenticated, identity-bound).
            turn_event = {
                "type": "your_turn",
                "wish_id": item.wish_id,
                "window_s": self._invocation_window,
            }
            # Announce in thirds: re-emits cover SSE reconnects and slow tabs.
            third = max(self._invocation_window / 3.0, 0.01)
            invoked = False
            for _ in range(3):
                await self._safe_emit(turn_event)
                try:
                    await asyncio.wait_for(asyncio.shield(started), third)
                    invoked = True
                    break
                except asyncio.TimeoutError:
                    continue
            if not invoked:
                await self._safe_emit(
                    {"type": "wish_rejected", "wish_id": item.wish_id, "reason": WANDERED_REASON}
                )
                return
            try:
                await asyncio.wait_for(asyncio.shield(done), self._grant_deadline)
            except asyncio.TimeoutError:
                # The invoked grant is hung past any sane petition length; the
                # queue must not stall behind it.
                log.error("grant for %s exceeded deadline; moving on", item.wish_id)
                await self._safe_emit(
                    {"type": "wish_rejected", "wish_id": item.wish_id, "reason": INTERRUPTED_REASON}
                )
        finally:
            self._awaiting = None

    def turn_token(self, wish_id: Any, client_id: Any) -> Optional[str]:
        """Return the secret invocation token for the job at the counter — but
        ONLY to its owner (identity-bound). Returns None when it isn't this
        caller's turn."""
        awaiting = self._awaiting
        if (
            awaiting is None
            or awaiting["wish_id"] != str(wish_id)
            or awaiting["client_id"] != str(client_id)
        ):
            return None
        return awaiting["token"]

    async def invoke(self, wish_id: Any, token: Any) -> dict:
        """Run the grant for the job at the counter. Called from the petitioner's
        gradio API request so the GPU work runs in THEIR quota context."""
        awaiting = self._awaiting
        if (
            awaiting is None
            or awaiting["wish_id"] != str(wish_id)
            or awaiting["token"] != str(token)
        ):
            raise QueueError(NOT_YOUR_TURN_REASON)
        if awaiting["begun"]:
            raise QueueError(RITE_BEGUN_REASON)
        awaiting["begun"] = True
        if not awaiting["started"].done():
            awaiting["started"].set_result(True)
        try:
            await self.grant_item(awaiting["item"])
            return {"ok": True, "wish_id": str(wish_id)}
        except Exception:
            log.exception("invoked grant for %s crashed", wish_id)
            await self._safe_emit(
                {"type": "wish_rejected", "wish_id": str(wish_id), "reason": CRASH_REASON}
            )
            return {"ok": False, "wish_id": str(wish_id), "reason": CRASH_REASON}
        finally:
            if not awaiting["done"].done():
                awaiting["done"].set_result(True)

    # ----------------------------------------------------------------- grants

    async def grant_item(self, item: _Item) -> None:
        """The whole rite for one job; runs in the worker task (server mode) or
        inside the petitioner's gradio request (browser-invoked mode)."""
        if item.kind == "fix":
            await self._grant_fix(item)
        else:
            await self._grant_wish(item)

    def _wrap_emit(self, item: _Item):
        """Planner-facing emit: inject this job's wish_id into any event that
        lacks one (the plan event, thought tokens)."""

        async def emit(event: dict) -> None:
            if isinstance(event, dict) and "wish_id" not in event:
                event = {**event, "wish_id": item.wish_id}
            await self._safe_emit(event)

        return emit

    async def _apply_events(
        self, item: _Item, city: CityState, events: list[dict], index: int
    ) -> tuple[int, int, str]:
        """world.apply each event; mirror into the live CityState; emit
        tool_call + world_delta. Returns (next_index, landed, first_obs)."""
        landed = 0
        first_obs = ""
        for call in events:
            feature, observation = self._world.apply(item.wish_id, index, call)
            index += 1
            if not first_obs:
                first_obs = observation
            if feature is None:
                log.warning("engine-composed event rejected: %s", observation)
                continue
            landed += 1
            city.apply(feature)
            await self._safe_emit(
                {
                    "type": "tool_call",
                    "wish_id": item.wish_id,
                    "call_index": index - 1,
                    "tool": feature.tool,
                    "args": dict(feature.args),
                }
            )
            await self._safe_emit({"type": "world_delta", "feature": feature.to_dict()})
        return index, landed, first_obs

    async def _grant_wish(self, item: _Item) -> None:
        await self._safe_emit({"type": "wish_started", "wish_id": item.wish_id, "text": item.text})
        await self._emit_queue()

        verdict = await self._moderator.check(item.text)
        if not verdict.allowed:
            await self._safe_emit(
                {
                    "type": "wish_rejected",
                    "wish_id": item.wish_id,
                    "reason": verdict.poetic_reason or "city hall declines this petition",
                }
            )
            return

        world_summary = self._world.summary()
        city = self._city()
        state = {"index": 0, "acts": 0, "landed": 0, "jobs_anchor": ""}

        async def act(building_req: Any) -> str:
            """ONE building entry {kind,name,near,floors,hue} -> engine places
            it (plus an auto-routed connector) and the events land."""
            if not isinstance(building_req, dict):
                return "rejected: malformed building request"
            if state["acts"] >= MAX_BUILDINGS_PER_WISH or state["index"] >= self._max_calls:
                return HANDS_FULL_OBSERVATION
            state["acts"] += 1
            kind = tools.resolve_kind(building_req.get("kind"))
            name = tools.sanitize_name(
                building_req.get("name"),
                default=tools.default_name(kind, item.text, item.wish_id),
            )
            events = placement.place(
                city,
                kind=kind,
                name=name,
                floors=_num(building_req.get("floors")),
                hue=_num(building_req.get("hue")),
                near=str(building_req.get("near") or ""),
                wish_id=item.wish_id,
                call_index=state["index"],
            )
            state["index"], landed, first_obs = await self._apply_events(
                item, city, events, state["index"]
            )
            state["landed"] += landed
            if landed and C.BUILDINGS[kind]["jobs"] > 0:
                state["jobs_anchor"] = name
            return first_obs or "rejected: nothing placed"

        trace = await self._planner.grant(item.text, world_summary, act, self._wrap_emit(item))

        # If the petition built NOTHING and the planner never actually planned —
        # every turn an error (GPU/model failure) — reject honestly and DON'T
        # persist an empty trace that pollutes the shared ledger.
        turns = _trace_get(trace, "turns") or []
        all_errored = bool(turns) and all(
            str((t or {}).get("observation") or "").startswith("error:") for t in turns
        )
        if state["landed"] == 0 and (not turns or all_errored):
            await self._safe_emit(
                {"type": "wish_rejected", "wish_id": item.wish_id, "reason": BUSY_HEAVENS_REASON}
            )
            return

        if state["landed"]:
            await self._apply_infill(item, city, state)

        # SECURITY: the reading + epitaph are model-composed and persist to the
        # public dataset + ledger. Re-moderate; redact on denial so no unvetted
        # prose survives in the archive.
        reading = _trace_get(trace, "reading")
        if reading and not self._moderator.check_content(reading).allowed:
            _trace_set(trace, "reading", "city hall read this petition in silence.")
        epitaph = _trace_get(trace, "epitaph")
        if not epitaph or not self._moderator.check_content(epitaph).allowed:
            epitaph = _epitaph_from_reading(_trace_get(trace, "reading")) or FALLBACK_EPITAPH
            _trace_set(trace, "epitaph", epitaph)
        self._enrich_trace(trace, item)
        await self._persist_trace(trace, item)

        self._world.record_epitaph(epitaph)
        await self._safe_emit(
            {
                "type": "wish_granted",
                "wish_id": item.wish_id,
                "epitaph": str(epitaph),
                "epoch": self._world.epoch,
            }
        )

    async def _apply_infill(self, item: _Item, city: CityState, state: dict) -> None:
        """Demand-driven infill: jobs outpacing homes pulls ONE bonus home in
        near the new jobs (max 1 per petition)."""
        jobs, housing = city.jobs, city.housing_capacity
        if jobs <= housing * C.INFILL_RATIO or state["index"] + 2 > self._max_calls:
            return
        kind = "apartments" if jobs - housing >= C.INFILL_APARTMENTS_DEFICIT else "house"
        events = placement.place(
            city,
            kind=kind,
            name=tools.default_name(kind, item.text, item.wish_id + ":infill"),
            floors=None,
            hue=None,
            near=state["jobs_anchor"],
            wish_id=item.wish_id,
            call_index=state["index"],
        )
        events.append({"tool": "note", "args": {"text": INFILL_NOTE, "kind": "infill"}})
        state["index"], landed, _ = await self._apply_events(
            item, city, events, state["index"]
        )
        state["landed"] += landed

    async def _grant_fix(self, item: _Item) -> None:
        await self._safe_emit({"type": "wish_started", "wish_id": item.wish_id, "text": item.text})
        await self._emit_queue()

        city = self._city()
        now_s = time.time()
        stats, candidates = traffic.snapshot(city, now_s)
        if stats["max_ratio"] < C.FIX_GATE_RATIO or not candidates:
            await self._safe_emit(
                {
                    "type": "wish_rejected",
                    "wish_id": item.wish_id,
                    "reason": TRAFFIC_SMOOTH_REASON,
                }
            )
            return

        by_id = {c["id"]: c for c in candidates}
        best = candidates[0]  # sorted best-predicted-first
        state = {"index": 0, "landed": 0, "applied": ""}

        def canned_diagnosis(cand: dict) -> str:
            return (
                f"{stats['worst']} carries {stats['max_ratio']}x capacity at rush — "
                f"{cand['desc']} cuts the index "
                f"{stats['traffic_index']}->{cand['predicted_index']}."
            )

        async def act_fix(choice: Any) -> str:
            """The planner's pick {choice, diagnosis, blurb} -> ONE apply_fix
            event. Invalid/missing choice falls back to the best candidate."""
            if state["landed"]:
                return "rejected: the fix is already applied"
            choice = choice if isinstance(choice, dict) else {}
            cand = by_id.get(str(choice.get("choice") or "").strip().upper()) or best
            # model-composed diagnosis goes on the shared world: wordlist-gate it
            raw = str(choice.get("diagnosis") or "").strip()
            diagnosis = raw[:200] if raw and self._moderator.check_content(raw).allowed else ""
            if not diagnosis:
                diagnosis = canned_diagnosis(cand)
            call = {
                "tool": "apply_fix",
                "args": {
                    "action": cand["action"],
                    "cells": cand["cells"],
                    "klass": cand["klass"],
                    "name": cand["name"],
                    "diagnosis": diagnosis,
                    "metrics_before": {
                        "traffic_index": stats["traffic_index"],
                        "worst": stats["worst"] or "",
                    },
                    "metrics_predicted": {"traffic_index": cand["predicted_index"]},
                },
            }
            state["index"], landed, obs = await self._apply_events(
                item, city, [call], state["index"]
            )
            state["landed"] += landed
            if landed:
                state["applied"] = cand["id"]
            return obs

        public = [
            {k: c[k] for k in ("id", "action", "desc", "predicted_index")}
            for c in candidates
        ]
        trace: Any = None
        planner_fix = getattr(self._planner, "fix", None)
        if planner_fix is not None:
            try:
                trace = await planner_fix(stats, public, act_fix, self._wrap_emit(item))
            except Exception:
                log.exception("planner.fix crashed for %s; engine applies best", item.wish_id)
        if state["landed"] == 0:
            # parse failure / no planner: the engine applies the best predicted
            # candidate with the canned diagnosis — a fix ALWAYS lands.
            await act_fix({"choice": best["id"]})

        epitaph = _trace_get(trace, "epitaph")
        if not epitaph or not self._moderator.check_content(epitaph).allowed:
            applied = by_id.get(state["applied"]) or best
            epitaph = f"The City Engineer opened {applied['name']}."
        if not isinstance(trace, dict):
            trace = {"reading": "", "turns": []}
        trace["epitaph"] = str(epitaph)
        trace.setdefault("kind", "fix")
        trace.setdefault("stats", stats)
        self._enrich_trace(trace, item)
        await self._persist_trace(trace, item)

        self._world.record_epitaph(epitaph)
        await self._safe_emit(
            {
                "type": "wish_granted",
                "wish_id": item.wish_id,
                "epitaph": str(epitaph),
                "epoch": self._world.epoch,
            }
        )

    # --------------------------------------------------------------- helpers

    async def _persist_trace(self, trace: Any, item: _Item) -> None:
        if self._persist is None or trace is None:
            return
        try:
            result = self._persist(trace)
            if inspect.isawaitable(result):
                await result
        except asyncio.CancelledError:
            raise
        except Exception:
            log.exception("persist failed for %s (grant stands)", item.wish_id)

    def _enrich_trace(self, trace: Any, item: _Item) -> None:
        """Fill trace fields the planner cannot know (wish_id, moderation, ...)."""
        fields = {
            "wish_id": item.wish_id,
            "text": item.text,
            "submitted_at": item.submitted_at,
        }
        try:
            if isinstance(trace, dict):
                for key, value in fields.items():
                    trace.setdefault(key, value)
                trace.setdefault("moderation", {"allowed": True, "category": None})
            elif trace is not None:
                for key, value in fields.items():
                    if not getattr(trace, key, None):
                        setattr(trace, key, value)
                if getattr(trace, "moderation", None) is None:
                    setattr(trace, "moderation", {"allowed": True, "category": None})
        except Exception:  # frozen dataclass etc. — enrichment is best-effort
            log.debug("trace enrichment skipped for %s", item.wish_id, exc_info=True)

    async def _emit_queue(self) -> None:
        await self._safe_emit(
            {"type": "queue", "length": self._queue.qsize(), "current": self.current}
        )

    async def _safe_emit(self, event: dict) -> None:
        """Broadcast one SSE event; emit failures must never break a grant."""
        try:
            await self._emit(event)
        except asyncio.CancelledError:
            raise
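        except Exception:
            # The planner-facing emit forwards whatever it is given, so `event`
            # may not be a dict; read its type defensively so this handler
            # cannot raise and break the grant.
            kind = event.get("type") if isinstance(event, dict) else type(event).__name__
            log.warning("emit failed for event type %s", kind, exc_info=True)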