# `driftcall/env.py` — DriftCallEnv integration class

**Design doc.** No code is written against this spec until ≥2 fresh critics return `NOTHING_FURTHER`.
**Implements DESIGN.md §4 (OpenEnv Interface) in full.**
**Sections per CLAUDE.md §3.1:** Purpose · Interface · Behavior spec · Data structures · Error modes · Dependencies · Edge cases · Examples · Open questions.

---

## 1. Purpose

`DriftCallEnv` is the **one public surface** that the OpenEnv runner, the training loop, the FastAPI shim in `app.py`, and the Gradio demo all call. It is the sole integration point that composes every other module into an RL-compliant episode:

- **`task_generator.generate`** — samples a seeded `GoalSpec` at `reset()` (task_generator.md §2 generate()).
- **`drift_injector.build_schedule` / `apply_drift`** — pre-computes the episode's drift timetable and fires each event at the start of its scheduled turn.
- **`vendors/*`** (airline, cab, restaurant, hotel, payment) — dispatches `TOOL_CALL` actions and evolves per-vendor frozen `VendorState` records (vendors.md §2 dispatch()).
- **`models`** — supplies every frozen dataclass crossing the boundary (`DriftCallAction`, `DriftCallObservation`, `DriftCallState`, `ToolResult`, `DriftEvent`, `GoalSpec`, `Episode`).
- **`rewards.compute_rewards`** — called **exactly once** at termination to produce the final `Rewards` scalar consumed by GRPO.
- **`audio/*`** — *optional* boundary-only pipeline (ASR on user utterance entry, TTS on `SPEAK` replies); disabled during training, enabled behind a config flag on the deployed env Space (DESIGN.md §9.4).

The env is the **judge** (DESIGN.md §0, §7). It holds the authoritative `DriftCallState`, enforces the budget, triggers drifts deterministically, dispatches tools, detects termination, and computes rewards server-side — no LLM judge anywhere. The agent observes through `DriftCallObservation` and acts through `DriftCallAction`; everything else is env-internal.

This module has **zero domain logic of its own**. Every vendor rule, every drift mutation, every reward clause is owned by the downstream module. `env.py` is plumbing: validate → drift → dispatch → record → terminate → observe.

Cross-references: DESIGN.md §3.1 (high-level diagram), §4.1–4.5 (dataclasses, reset, step, termination, budget), §6.2 (drift trigger logic), §7 (reward invariants), §9.4 (audio training/deploy split).

---

## 2. Interface

```python
from __future__ import annotations

from typing import Any, Literal, Protocol

from driftcall.models import (
    DriftCallAction,
    DriftCallObservation,
    DriftCallState,
    Episode,
    GoalSpec,
    ToolResult,
    DriftEvent,
    ActionType,
)
from driftcall.rewards import Rewards


class DriftCallEnv:
    """
    OpenEnv-compliant RL environment for DriftCall.

    One instance == one episode lifecycle controller. Reuse across episodes
    via reset(); do not share a single instance across concurrent episodes
    (not thread-safe; see §3.9 / Error mode E7).
    """

    # -- construction --------------------------------------------------------
    def __init__(self, config: dict[str, Any] | None = None) -> None: ...

    # -- OpenEnv primitives --------------------------------------------------
    def reset(self, seed: int | None = None) -> DriftCallObservation: ...

    def step(
        self,
        action: DriftCallAction,
        *,
        force_drift_pattern: str | None = None,
    ) -> DriftCallObservation: ...

    def state(self) -> DriftCallState: ...

    def close(self) -> None: ...

    # -- terminal-only accessors (see §3.6) ---------------------------------
    def episode(self) -> Episode: ...

    def rewards(self) -> Rewards: ...

    def done(self) -> bool: ...
```

### 2.1 `__init__(self, config)`

**Config contract** (frozen-copy stored on the instance as `self._config: EnvConfig`, a frozen dataclass — §4.1):

| Key | Type | Default | Meaning |
|---|---|---|---|
| `curriculum_stage` | `Literal[1, 2, 3]` | `1` | Drift frequency (DESIGN.md §4.5, §6.2). |
| `language_weights` | `dict[LanguageCode, float]` | `{"en": 0.4, "hinglish": 0.4, "hi": 0.1, "ta": 0.05, "kn": 0.05}` | Non-negative; must sum to `1.0 ± 1e-6`. Passed to `task_generator.generate`. |
| `audio_boundary_enabled` | `bool` | `False` | If True, ASR/TTS runs at the env boundary (see §3.7). **MUST be False during GRPO training** (DESIGN.md §9.4). |
| `max_turns_override` | `int \| None` | `None` | Only for tests. If None, derived from stage via §3.2 Table A. |
| `scheduler` | `DriftScheduler` | `drift_injector.build_schedule` | Injection seam for scripted schedules in tests (drift_injector.md §2). |
| `tts_engine` | `TTSEngine \| None` | `None` | Required iff `audio_boundary_enabled`; otherwise must be None. |
| `asr_engine` | `ASREngine \| None` | `None` | Same as above. |

**Postconditions:** `__init__` performs **no I/O, no model load, no network call**. All validation is pure (raises `InvalidConfigError` on malformed config). The env is **not ready for `step()`** — callers MUST call `reset()` first.

### 2.2 `reset(seed)`

```python
def reset(self, seed: int | None = None) -> DriftCallObservation:
    """
    Begin a new episode.

    Signature takes `seed` ONLY (DESIGN.md §4.2 — locked 2026-04-24). Config
    (curriculum_stage, language_weights, audio_boundary_enabled, engines) is
    bound at `__init__` and is immutable for the env instance's lifetime.
    Callers that need a different stage or language mix MUST construct a new
    `DriftCallEnv`.

    If seed is None, one is drawn from os.urandom(8) and stored. Same seed ⇒
    byte-identical GoalSpec, drift_schedule, and initial vendor_states
    (DESIGN.md §4.2).

    Steps (in this order):
        1. episode_id = uuid4() (purely for audit/log correlation; NOT seeded).
        2. goal = task_generator.generate(seed, stage, language_weights).
        3. vendor_states = {d: vendors[d].initial_state(seed, goal) for d in DOMAINS}.
        4. schema_versions = {d: "v1" for d in DOMAINS}.
        5. drift_schedule = self._config.scheduler(stage, seed, goal).
        6. Build fresh DriftCallState (turn=0, max_turns per §3.2, done=False,
           actions=(), drift_fired=()).
        7. Build DriftCallObservation for turn 0 via _build_observation (§3.4).
        7b. If `self._config.audio_boundary_enabled` is True, invoke `tts_engine.synthesize(goal.seed_utterance, goal.language)` and emit audio on the side channel per §3.7. Canonical `last_transcript` remains `goal.seed_utterance` — audio is a boundary artifact, not the source of truth.
        8. Return observation; store state on self._state.

    Raises: InvalidLanguageWeightError, InvalidStageError from task_generator
            (propagated; see §5 E1). On any failure the env remains unready
            (self._state stays None).
    """
```
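Here is a minimal sketch of the seed rule above. It assumes the 8 bytes from `os.urandom(8)` are read as an unsigned big-endian integer; the docstring fixes the byte count, not the byte order. `resolve_seed` and `seeded_draws` are hypothetical helpers, and `random.Random` stands in for whatever seeded generator `task_generator` and the scheduler actually use. The only point is that one stored integer reproduces every seeded draw.

```python
from __future__ import annotations

import os
import random


def resolve_seed(seed: int | None) -> int:
    """Return `seed` unchanged, or draw a fresh one from os.urandom(8)."""
    if seed is not None:
        return seed
    # 8 random bytes -> int in [0, 2**64); the env would store this as self._seed for replay.
    return int.from_bytes(os.urandom(8), "big")


def seeded_draws(seed: int, n: int = 3) -> list[float]:
    """Stand-in for any seeded component: same seed => identical sequence."""
    rng = random.Random(seed)
    return [rng.random() for _ in range(n)]
```

`episode_id` comes from `uuid4()` and is not seeded. Two resets with the same seed therefore share every generated artifact but get different audit ids.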

### 2.3 `step(action)`

```python
def step(
    self,
    action: DriftCallAction,
    *,
    force_drift_pattern: str | None = None,
) -> DriftCallObservation:
    """
    Advance one turn. Full pipeline (DESIGN.md §4.3):

        0. Preconditions: self._state is not None; self._state.done is False.
        1a. Validate action via `_validate_action` (pure — no state mutation). On
            validation failure, raise `InvalidActionError` immediately. No turn
            counter increment. No action stored. No termination marker yet.
        1b. If the caller catches the `InvalidActionError` but continues to invoke
            step() with another malformed action, repeated failures are tracked by
            an outer anti-hack handler in the caller. The env itself treats each
            malformed step() as a pure exception-raising operation with zero side
            effects.
        2. turn_current = self._state.turn + 1.
        3. Fire drifts for turn_current (see §3.3 firing spec):
           - If `force_drift_pattern is not None`, inject that single pattern AT
             THIS TURN, overriding the pre-computed `drift_schedule`. Any
             schedule-driven drifts that would have fired on turn_current are
             SUPPRESSED for this step — judge intent wins over RNG. The forced
             pattern must be a valid `DRIFT_PATTERN_IDS` literal; unknown ids
             raise `InvalidActionError` before any state mutation.
           - If `force_drift_pattern is None`, fire all drifts from
             `drift_schedule` whose turn == turn_current, folding `apply_drift`
             in (turn asc, pattern_id asc) order (drift_injector.md §3.1). Each
             application returns a new frozen state; the fold produces the
             post-drift state.
        4. For every vendor whose state just mutated (i.e., whose domain appeared
           in at least one pending drift), call vendors[d].emit_side_channel_if_pending
           and stash any (notice, new_vendor_state) for attachment to the first
           subsequent ToolResult on that domain (§3.3).
        5. Dispatch the action:
              TOOL_CALL    → vendors[d].dispatch(...) → (ToolResult, new_vendor_state)
              SPEAK/CLARIFY→ no state change; pure log (§3.4)
              PROBE_SCHEMA → vendors[d].describe_schema(current_schema_version)
                             packaged as a ToolResult with status="ok"
              SUBMIT       → terminate (see §3.5); compute rewards
              ABORT        → terminate with R1=0 (see §3.5)
        6. Append action (and any resulting ToolResult) to self._state via
           dataclasses.replace — state is rebuilt frozen each step; no in-place
           mutation (§3.8).
        7. Budget check: if new turn >= max_turns and not already terminal,
           terminate with terminated_by="TIMEOUT", R1 forced to 0.
        8. If terminal, build Episode (§4.3) and call rewards.compute_rewards
           EXACTLY ONCE. Cache the Rewards on self._rewards. Set state.done=True.
        9. Build and return the new observation.
    """
```

### 2.4 `state()`

```python
def state(self) -> DriftCallState:
    """
    Return the current frozen state. Required by OpenEnv validator.
    Returns the frozen `self._state` reference directly (no deepcopy needed — dataclass is `frozen=True` and every field is a frozen object, tuple, or primitive; external mutation is impossible by construction).
    Raises EnvNotReadyError if called before reset().
    """
```

### 2.5 `close()`

```python
def close(self) -> None:
    """
    Release any resources (audio engines if loaded, nothing else). Idempotent.
    After close(), calls to reset/step raise EnvClosedError.
    """
```

### 2.6 `episode()` / `rewards()` / `done()` (terminal-only)

```python
def episode(self) -> Episode:
    """Return the finalized Episode. Raises EpisodeNotTerminalError if not done."""

def rewards(self) -> Rewards:
    """Return the cached Rewards. Raises EpisodeNotTerminalError if not done."""

def done(self) -> bool:
    """True iff the current episode has terminated. False before reset()."""
```

`episode()` and `rewards()` are memoized — they return the cached objects, never recompute.
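The accessor contract fits in a few lines: E2 before `reset()`, E6 before termination, and the identical object on every call afterwards. This is a sketch, not the env. `TerminalAccessors`, `_terminate`, and the one-field `Rewards` are hypothetical stand-ins, and the exception classes mirror the §5 names locally so the block runs on its own. The sketch also assumes that "write-once" means once per episode, so it clears the cache on `reset()`.

```python
from __future__ import annotations

from dataclasses import dataclass


class DriftCallEnvError(Exception):
    """Local mirror of the §5 base class."""


class EnvNotReadyError(DriftCallEnvError):
    """E2: accessor called before reset()."""


class EpisodeNotTerminalError(DriftCallEnvError):
    """E6: terminal-only accessor called before termination."""


@dataclass(frozen=True)
class Rewards:  # hypothetical stand-in for driftcall.rewards.Rewards
    total: float


class TerminalAccessors:
    def __init__(self) -> None:
        self._started = False
        self._rewards: Rewards | None = None

    def reset(self) -> None:
        # A new episode starts with no cached result.
        self._started = True
        self._rewards = None

    def _terminate(self, rewards: Rewards) -> None:
        # Write-once per episode: compute_rewards runs exactly once.
        if self._rewards is not None:
            raise RuntimeError("rewards already cached for this episode")
        self._rewards = rewards

    def done(self) -> bool:
        return self._rewards is not None  # False before reset(); never raises

    def rewards(self) -> Rewards:
        if not self._started:
            raise EnvNotReadyError("call reset() first")
        if self._rewards is None:
            raise EpisodeNotTerminalError("episode has not terminated")
        return self._rewards  # the cached object itself, never recomputed
```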

---

## 3. Behavior spec

### 3.1 Action validation (`_validate_action`)

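For every `DriftCallAction`, enforce the per-`ActionType` required-field matrix. Any violation raises `InvalidActionError` before any state mutation: no turn increment and no action stored (see §2.3 step 1a and E4). Validation failures are agent failures, not env bugs. A single failure does not terminate the episode, but ≥3 consecutive failures can lead the caller's outer anti-hack handler to terminate it with `terminated_by="ANTI_HACK"` (§3.5).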

| `action_type` | Required | Forbidden | Extra constraints |
|---|---|---|---|
| `TOOL_CALL` | `tool_name`, `tool_args` (may be `{}`) | `message`, `confidence` | `tool_name` ∈ `self._available_tools()`; `tool_args` JSON-serializable dict. |
| `SPEAK` | `message` (len ∈ [1, 2000]) | `tool_name`, `tool_args`, `confidence` | Valid UTF-8; no NUL bytes. |
| `CLARIFY` | `message` (len ∈ [1, 2000]) | `tool_name`, `tool_args`, `confidence` | Same as SPEAK. |
| `PROBE_SCHEMA` | `tool_name` (domain, e.g. `"airline"`) | `tool_args`, `message`, `confidence` | Domain must exist in `self._state.vendor_states`. |
| `SUBMIT` | `confidence` ∈ `[0.0, 1.0]` | `tool_name`, `tool_args` | `message` optional (final user-facing reply); `rationale` optional ≤ 200 chars. |
| `ABORT` | — | all of `tool_name`, `tool_args`, `confidence` | `message` optional. |

`rationale`, if present, is always bounded to 200 chars (matches models.md §3.5). Longer ⇒ `InvalidActionError`.
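The matrix can be transcribed into a pure checker. In this sketch, `Action` is a hypothetical reduced stand-in for `DriftCallAction` and `validate_action` is an illustrative name. Unknown tool names raise `InvalidActionError` here, as the table says; §5 lists the same condition as E9 `UnknownToolError`, so the real exception class may differ. The checker mutates nothing and returns `None` on success.

```python
from __future__ import annotations

import json
from dataclasses import dataclass
from typing import Any


class InvalidActionError(Exception):
    """Local mirror of E4."""


@dataclass(frozen=True)
class Action:  # hypothetical reduced stand-in for DriftCallAction
    action_type: str
    tool_name: str | None = None
    tool_args: dict[str, Any] | None = None
    message: str | None = None
    confidence: float | None = None
    rationale: str | None = None


_TEXT_ONLY = frozenset({"tool_name", "tool_args", "confidence"})
# action_type -> (required fields, forbidden fields), transcribed from the §3.1 table.
_MATRIX: dict[str, tuple[frozenset[str], frozenset[str]]] = {
    "TOOL_CALL": (frozenset({"tool_name", "tool_args"}), frozenset({"message", "confidence"})),
    "SPEAK": (frozenset({"message"}), _TEXT_ONLY),
    "CLARIFY": (frozenset({"message"}), _TEXT_ONLY),
    "PROBE_SCHEMA": (frozenset({"tool_name"}), frozenset({"tool_args", "message", "confidence"})),
    "SUBMIT": (frozenset({"confidence"}), frozenset({"tool_name", "tool_args"})),
    "ABORT": (frozenset(), _TEXT_ONLY),
}


def validate_action(a: Action, tools: frozenset[str], domains: frozenset[str]) -> None:
    """Pure: raises InvalidActionError on the first violation and mutates nothing."""
    if a.action_type not in _MATRIX:
        raise InvalidActionError(f"unknown action_type {a.action_type!r}")
    required, forbidden = _MATRIX[a.action_type]
    for name in sorted(required):
        if getattr(a, name) is None:
            raise InvalidActionError(f"{a.action_type} requires {name}")
    for name in sorted(forbidden):
        if getattr(a, name) is not None:
            raise InvalidActionError(f"{a.action_type} forbids {name}")
    if a.action_type == "TOOL_CALL":
        if a.tool_name not in tools:
            raise InvalidActionError(f"unknown tool {a.tool_name!r}")
        if not isinstance(a.tool_args, dict):
            raise InvalidActionError("tool_args must be a dict")
        try:
            json.dumps(a.tool_args)
        except (TypeError, ValueError) as exc:
            raise InvalidActionError("tool_args must be JSON-serializable") from exc
    if a.action_type in ("SPEAK", "CLARIFY"):
        msg = a.message
        if not 1 <= len(msg) <= 2000 or "\x00" in msg:
            raise InvalidActionError("message must be 1..2000 chars with no NUL bytes")
        try:
            msg.encode("utf-8")  # strings with lone surrogates fail here
        except UnicodeEncodeError as exc:
            raise InvalidActionError("message is not valid UTF-8") from exc
    if a.action_type == "PROBE_SCHEMA" and a.tool_name not in domains:
        raise InvalidActionError(f"unknown domain {a.tool_name!r}")
    if a.action_type == "SUBMIT":
        c = a.confidence
        if not isinstance(c, float) or not 0.0 <= c <= 1.0:  # NaN fails the range test
            raise InvalidActionError("confidence must be a float in [0.0, 1.0]")
    if a.rationale is not None and len(a.rationale) > 200:
        raise InvalidActionError("rationale exceeds 200 chars")
```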

### 3.2 Budget & max_turns

Derived in `reset()` from `curriculum_stage` (Table A, DESIGN.md §4.5):

| Stage | `max_turns` |
|---|---|
| 1 | 8 |
| 2 | 12 |
| 3 | 16 |

Override only via `config.max_turns_override` (tests only).

`DriftCallObservation.budget_remaining = max_turns - turn`. When `turn >= max_turns` after a non-terminal action, termination fires (§3.5).
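Table A and the `budget_remaining` formula reduce to a lookup plus a subtraction. The helper names below are hypothetical; the numbers come from the table.

```python
from __future__ import annotations

# Table A (DESIGN.md §4.5): curriculum stage -> max_turns.
MAX_TURNS_BY_STAGE: dict[int, int] = {1: 8, 2: 12, 3: 16}


def derive_max_turns(stage: int, override: int | None = None) -> int:
    """max_turns for one episode; `override` is the tests-only escape hatch."""
    return override if override is not None else MAX_TURNS_BY_STAGE[stage]


def budget_remaining(max_turns: int, turn: int) -> int:
    return max_turns - turn


def hits_timeout(turn_after_step: int, max_turns: int) -> bool:
    """Evaluated only after a non-terminal action; SUBMIT/ABORT terminate first."""
    return turn_after_step >= max_turns
```

Here is a worked example for stage 1. At turn 0, `reset()` reports `budget_remaining == 8`. The eighth committed non-terminal action brings `turn` to 8, so `hits_timeout(8, 8)` is true and the episode ends with `TIMEOUT`.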

### 3.3 Drift firing order and side channels

Within a single `step()`, drift application occurs **before** the agent's action is dispatched (drift_injector.md §3.1). This has three consequences the env MUST enforce:

1. **Deterministic fold order.** If two drifts are scheduled for the same turn (possible in Stage 3 only), they apply in `(turn asc, pattern_id asc)` order. This is **env-enforced** — drift_injector does not sort.
2. **`force_drift_pattern` override (manual drift firing).** When `step()` is called with `force_drift_pattern=<pattern_id>`:
   - The forced pattern is resolved against `drift_injector.DRIFT_PATTERN_IDS`; unknown ids raise `InvalidActionError` BEFORE any state mutation.
   - Exactly ONE `DriftEvent` is synthesized for `turn_current` (not read from `drift_schedule`) and applied via `apply_drift`.
   - Any schedule-driven drifts that would have fired on `turn_current` are SUPPRESSED for this step and do NOT carry over to a later turn — judge intent overrides RNG (see deploy_demo_space.md §3.8, §7.3).
   - The synthesized event is appended to `state.drift_fired` with `from_version/to_version` taken from the pattern's catalogue entry; `drift_log` on the returned observation reflects it exactly like any scheduled drift. The source (manual vs scheduled) is NOT encoded in `DriftEvent`; the demo's trace panel decorates the row with `actor="drift", action_or_event="manual:<pattern_id>"` out-of-band (deploy_demo_space.md §3.8).
3. **Side-channel notices** (drift_injector §3.4 `side_channel_notice_append`): the vendor stashes the notice on `VendorState.side_channel_notice`. At the **top of the next `step()`** (not the same one that fired the drift), the env calls `emit_side_channel_if_pending` per mutated domain and, **when the agent's action dispatches a tool on that domain**, attaches the returned notice to `ToolResult.response["_notice"]`. One-shot: the same notice never appears on two ToolResults. If no tool on that domain is called, the notice remains pending until one is (or episode ends — it is then recorded on the final Episode.vendor_states_final and the reward pipeline may read it via R2 detection hints).
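Both firing modes can be sketched as one pure function. `DriftEvent`, `State`, and the `apply_drift` callable are reduced, hypothetical stand-ins. The real `apply_drift` also mutates vendor state and schema versions, and this sketch assumes the env (not `apply_drift`) appends to `drift_fired`. The pattern ids in the test are made up.

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from typing import Callable


class InvalidActionError(Exception):
    """Local mirror of E4."""


@dataclass(frozen=True)
class DriftEvent:  # hypothetical reduced shape (the real event also carries versions)
    turn: int
    pattern_id: str


@dataclass(frozen=True)
class State:  # hypothetical reduced DriftCallState
    drift_fired: tuple[DriftEvent, ...] = ()


ApplyDrift = Callable[[State, DriftEvent], State]


def fire_drifts(
    state: State,
    schedule: tuple[DriftEvent, ...],
    turn_current: int,
    apply_drift: ApplyDrift,
    known_patterns: frozenset[str],
    force_drift_pattern: str | None = None,
) -> State:
    """Return the post-drift state for `turn_current` without mutating `state`."""
    if force_drift_pattern is not None:
        # Resolve before touching state: an unknown id leaves everything unchanged.
        if force_drift_pattern not in known_patterns:
            raise InvalidActionError(f"unknown drift pattern {force_drift_pattern!r}")
        # Judge intent wins: scheduled drifts for this turn are dropped, not deferred.
        due = [DriftEvent(turn_current, force_drift_pattern)]
    else:
        # The env, not drift_injector, imposes the (turn asc, pattern_id asc) order.
        due = sorted(
            (e for e in schedule if e.turn == turn_current),
            key=lambda e: (e.turn, e.pattern_id),
        )
    for event in due:  # left fold; each application yields a new frozen state
        state = apply_drift(state, event)
        state = replace(state, drift_fired=state.drift_fired + (event,))
    return state
```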

### 3.4 Observation builder (`_build_observation`)

A pure helper that projects `DriftCallState` → `DriftCallObservation` for the agent. Rules:

1. `turn`, `goal`, `budget_remaining = max_turns - turn` come straight from state.
2. `tool_results` = **full** episode history, not just the latest (DESIGN.md §4.1). Immutable tuple.
3. `drift_log` = `state.drift_fired` as-is. **The env does NOT hide fired drifts from the agent**; unfired drifts stay invisible because `drift_schedule` is NEVER exposed (that would leak future information).
4. `available_tools` = union of every vendor's `TOOLS` constant whose domain is relevant to the goal, PLUS `"payment.charge"` and `"payment.refund"` (payment is cross-cutting). The set is **fixed for the episode** (DESIGN.md §4.1). Drift may change field shapes but never adds/removes a tool name.
5. `last_transcript` / `last_lang` / `last_confidence`:
   - **Turn 0** (reset): = `(goal.seed_utterance, goal.language, 1.0)` — the user's opening utterance, deterministic.
   - **After a `SPEAK` agent action**: no new user utterance exists; the fields carry forward from the previous turn (they describe the **user's** latest message, not the agent's).
   - **After a `CLARIFY` agent action**: the env's sim-caller replies (§3.7). With the audio pipeline disabled, the fields take the scripted responder's text reply. With the audio pipeline enabled, `last_transcript/last_lang/last_confidence` are updated from the ASR output.
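Rule 4's tool set can be computed once at `reset()`. Only `payment.charge` and `payment.refund` come from this spec. The other tool names and the `VENDOR_TOOLS` mapping are invented placeholders for the per-vendor `TOOLS` constants defined in vendors.md. Returning a sorted tuple is a choice made here for determinism; the spec does not require it.

```python
from __future__ import annotations

# Hypothetical stand-ins for each vendor module's TOOLS constant.
VENDOR_TOOLS: dict[str, frozenset[str]] = {
    "airline": frozenset({"airline.search", "airline.book"}),
    "cab": frozenset({"cab.quote", "cab.book"}),
    "hotel": frozenset({"hotel.search", "hotel.book"}),
    "restaurant": frozenset({"restaurant.search", "restaurant.reserve"}),
}

# Payment is cross-cutting: always exposed, whatever the goal.
PAYMENT_TOOLS = frozenset({"payment.charge", "payment.refund"})


def available_tools(goal_domains: frozenset[str]) -> tuple[str, ...]:
    """Union of TOOLS for goal-relevant domains plus the payment tools.

    Computed once per episode: drift may reshape fields but never adds or removes names.
    """
    tools = set(PAYMENT_TOOLS)
    for domain in goal_domains:
        tools |= VENDOR_TOOLS[domain]
    return tuple(sorted(tools))
```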

### 3.5 Termination

Termination sets `state.done = True` and invokes `rewards.compute_rewards` exactly once. `terminated_by` is one of:

| Value | Trigger |
|---|---|
| `"SUBMIT"` | Agent issued `SUBMIT`. Reward reflects full R1…R5. |
| `"ABORT"` | Agent issued `ABORT`. R1 forced to 0; R2…R5 still computed for diagnostics. |
| `"TIMEOUT"` | `turn >= max_turns` after a non-terminal action. R1 forced to 0. |
| `"ANTI_HACK"` | Triggered by ≥3 consecutive validation failures (tracked by an outer anti-hack handler in the caller, not the env itself) OR by state-corruption attempts detected by R5 (DESIGN.md §4.4, §7.1 R5). A single `InvalidActionError` raise from `_validate_action` is pure — it does NOT terminate the episode, does NOT record the action, and does NOT increment the turn counter. State-corruption-path ANTI_HACK termination records no ToolResult. |

Termination is **atomic within a `step()`**: either the step commits a terminal transition (and rewards are computed) or it raises before any state change. There is no partial termination.
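The env-side part of the table is a three-way check. It is ordered so that an explicit SUBMIT or ABORT on the final turn keeps its own label rather than becoming TIMEOUT, matching the "not already terminal" condition in §2.3 step 7. `terminal_reason` is a hypothetical helper name.

```python
from __future__ import annotations

from typing import Literal

TerminatedBy = Literal["SUBMIT", "ABORT", "TIMEOUT", "ANTI_HACK"]


def terminal_reason(action_type: str, turn_after_step: int, max_turns: int) -> TerminatedBy | None:
    """Termination decided by the env for one committed step.

    ANTI_HACK is not decided here: it comes from the caller's anti-hack handler
    or from R5's state-corruption detection.
    """
    if action_type == "SUBMIT":
        return "SUBMIT"
    if action_type == "ABORT":
        return "ABORT"
    # The budget check runs only after a non-terminal action.
    if turn_after_step >= max_turns:
        return "TIMEOUT"
    return None
```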

### 3.6 Terminal-only accessors

`episode()` / `rewards()` are valid only after `done() == True`. `Episode` (§4.3) is built by snapshotting the final state and flattening it into the shape rewards.md §2.4 expects. Reward computation is **memoized** — two calls to `rewards()` return the same object (not just equal).

### 3.7 Optional audio boundary

`audio_boundary_enabled == False` (default, training path) means the env is **purely textual**: `DriftCallObservation.last_transcript` carries the goal's `seed_utterance` on turn 0 and a deterministic simulated user reply whenever the agent `CLARIFY`s (drawn from a scripted responder keyed on (seed, turn); see audio.md §3.1 non-training path).

`audio_boundary_enabled == True` (deployed env Space path) means:

1. On `reset()`, the env uses `tts_engine.synthesize(goal.seed_utterance, goal.language)` to produce the opening user audio for display/demo, but the **canonical `last_transcript` is still the `GoalSpec.seed_utterance`** — TTS is a sim-caller convenience, not a new source of truth.
2. On `CLARIFY`, the env's sim-caller composes a text reply, runs `tts_engine.synthesize`, then `asr_engine.transcribe`, and uses the ASR `TranscriptResult.text/language/confidence` to populate the next observation's `last_*` fields. This intentionally round-trips through audio to mirror production noise (DESIGN.md §9.4).
3. On `SPEAK`, the env optionally synthesizes TTS for the agent's reply — this is emitted on a side channel (e.g., FastAPI SSE) but **does not** enter the state.

The audio pipeline is a **post-observation wrapper**. The env NEVER feeds TTS bytes back into the RL loop; reward computation is 100% textual (DESIGN.md §7). Audio disabled ⇒ no `import driftcall.audio` anywhere in the training call graph (audio.md §6.3 import firewall).

### 3.8 Immutability & state evolution

Every `DriftCallState` transition goes through `dataclasses.replace`. The env holds **one** `self._state` reference; it is reassigned to a new frozen object on every transition. No vendor state, no drift log, and no action tuple is ever mutated in place. The `tests/test_env.py` property test asserts `id(prev_state.actions) != id(next_state.actions)` whenever `len(next_state.actions) > len(prev_state.actions)`.
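The transition pattern is easy to see on a two-field stand-in. `MiniState` and `commit` are hypothetical; the real state has many more fields, but each transition follows the same shape.

```python
from __future__ import annotations

import dataclasses


@dataclasses.dataclass(frozen=True)
class MiniState:  # hypothetical two-field stand-in for DriftCallState
    turn: int = 0
    actions: tuple[str, ...] = ()


def commit(prev: MiniState, action: str) -> MiniState:
    """One transition: a new frozen object; `prev` is left untouched."""
    return dataclasses.replace(prev, turn=prev.turn + 1, actions=prev.actions + (action,))
```

Assigning to a field of the result (for example `nxt.turn = 99`) raises `dataclasses.FrozenInstanceError`. This is why `state()` can hand out the reference directly.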

The env's own instance state fields:
- `self._config: EnvConfig` (frozen, set at `__init__`)
- `self._state: DriftCallState | None` (reassigned each step)
- `self._rewards: Rewards | None` (set once at termination)
- `self._episode: Episode | None` (set once at termination)
- `self._closed: bool`
- `self._seed: int | None`
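- `self._episode_id: str | None`
- `self._side_channel_pending: dict[str, str]` (pending notices keyed by domain; §4.2)
- `self._step_in_progress: bool` (reentrancy flag for E7; §5)

No other mutable attributes.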

### 3.9 Concurrency

A `DriftCallEnv` instance is **single-session**. The FastAPI shim in `app.py` creates one instance per session (pooled by episode_id). Sharing one instance across threads is undefined behavior (no locks, state reassignment is not atomic). This matches DESIGN.md §11.1 (env Space spawns one env per request).
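The E7 `_step_in_progress` flag is a cheap detector, not a lock, which keeps it consistent with the no-locks rule above. It reliably catches same-thread reentrancy, such as a callback that calls `step()` again. Across threads the check-then-set is not atomic, so detection there is best effort, and single-session use remains the real guarantee. `GuardedStepper` and its `on_dispatch` hook are hypothetical.

```python
from __future__ import annotations

from typing import Callable


class ConcurrentStepError(Exception):
    """Local mirror of E7."""


class GuardedStepper:
    """Hypothetical host showing only the reentrancy guard around step()."""

    def __init__(self, on_dispatch: Callable[[], None] | None = None) -> None:
        self._step_in_progress = False
        self._on_dispatch = on_dispatch  # stands in for vendor dispatch
        self.turn = 0

    def step(self) -> int:
        if self._step_in_progress:
            raise ConcurrentStepError("step() re-entered while a step is in progress")
        self._step_in_progress = True
        try:
            if self._on_dispatch is not None:
                self._on_dispatch()
            self.turn += 1  # committed only if dispatch returned normally
            return self.turn
        finally:
            self._step_in_progress = False  # cleared on success and on error
```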

---

## 4. Data structures

### 4.1 `EnvConfig` (frozen, env-internal)

```python
@dataclass(frozen=True)
class EnvConfig:
    curriculum_stage: Literal[1, 2, 3]
    language_weights: dict[LanguageCode, float]      # key-copied to dict in __init__
    audio_boundary_enabled: bool
    max_turns_override: int | None
    scheduler: DriftScheduler                         # drift_injector.DriftScheduler
    tts_engine: TTSEngine | None                      # audio.TTSEngine
    asr_engine: ASREngine | None                      # audio.ASREngine
```

Construction is an explicit `EnvConfig.from_mapping(config_dict) -> EnvConfig` classmethod that raises `InvalidConfigError` on any key not in the table above, on type mismatches, or on mutually-exclusive fields (e.g., `audio_boundary_enabled=True` with `tts_engine=None`). The dict fed into `__init__` is **never stored directly** — it is validated and frozen.

### 4.2 Internal session state

The env holds exactly one live `DriftCallState` reference (`self._state`). Every field inside is frozen or a tuple of frozen objects. `self._side_channel_pending: dict[str, str]` caches pending notices keyed by domain (pure dict, module-private, reassigned on each emit/consume) — see §3.3.
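The one-shot notice rule can be sketched with two pure helpers over the `_side_channel_pending` map. The helper names and the sample notice text are hypothetical. Only the `response["_notice"]` key and the keyed-by-domain shape come from §3.3 and this section.

```python
from __future__ import annotations

from typing import Any


def stash_notice(pending: dict[str, str], domain: str, notice: str) -> dict[str, str]:
    """Record a pending notice; returns a new dict (reassigned, never mutated)."""
    return {**pending, domain: notice}


def attach_pending_notice(
    pending: dict[str, str], domain: str, response: dict[str, Any]
) -> tuple[dict[str, str], dict[str, Any]]:
    """One-shot delivery: if `domain` has a pending notice, move it into
    response["_notice"] and drop it from the pending map."""
    if domain not in pending:
        return pending, response
    remaining = {d: n for d, n in pending.items() if d != domain}
    return remaining, {**response, "_notice": pending[domain]}
```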

There is **no rolling log buffer** separate from state; `state.actions`, `state.drift_fired`, and the per-turn `tool_results` tuple in the observation are the only record.

### 4.3 `Episode` construction (terminal-only)

At termination the env synthesizes the `Episode` object (rewards.md §2.4) by:

```python
import dataclasses

from driftcall.models import Episode

# Runs once, at termination. `terminated_by` is one of
# "SUBMIT" | "ABORT" | "TIMEOUT" | "ANTI_HACK", and
# `observation_tool_results_at_termination` is the full ToolResult tuple.
episode = Episode(
    episode_id=self._episode_id,
    goal=self._state.goal,
    actions=self._state.actions,
    tool_results=observation_tool_results_at_termination,
    drift_log=self._state.drift_fired,
    vendor_states_final={
        d: dataclasses.asdict(vs) for d, vs in self._state.vendor_states.items()
    },
    schema_versions_final=dict(self._state.schema_versions),
    max_turns=self._state.max_turns,
    turns_used=len(self._state.actions),
    terminated_by=terminated_by,
    stage=self._config.curriculum_stage,
)
```

`vendor_states_final` is a **snapshot dict** (not the frozen `VendorState` dataclass), produced by `dataclasses.asdict` — this matches rewards.md §2.4's `dict[str, dict[str, Any]]`.

### 4.4 Frozen invariants (checked by property tests)

- `DriftCallEnv` is deliberately **not** a dataclass (it has no `__dataclass_params__`); its instance attributes (§3.8) are the only mutation surface.
- Every `self._state` transition produces a new object: `prev is not next`.
- `self._rewards` and `self._episode` are write-once.

---

## 5. Error modes

Every error is a typed exception from `driftcall.env.errors`. All extend `DriftCallEnvError` (which extends `Exception`). No bare `raise` anywhere.

| # | Exception | Raised from | Condition |
|---|---|---|---|
| E1 | `InvalidConfigError` | `__init__` / `EnvConfig.from_mapping` | Unknown key, wrong type, weights don't sum to 1, `audio_boundary_enabled=True` with missing TTS/ASR engine. |
| E2 | `EnvNotReadyError` | `step`, `state`, `episode`, `rewards` | Called before `reset()`. |
| E3 | `EnvClosedError` | `reset`, `step` | Called after `close()`. |
| E4 | `InvalidActionError` | `step._validate_action` | Any §3.1 validation failure. **Pure exception** — raised BEFORE any state mutation: no turn increment, no action stored, no termination marker set. The env remains in its pre-step state and is still valid for a subsequent `step()` call. Repeated malformed actions (≥3 consecutive) are the concern of an outer anti-hack handler in the caller, which may then terminate the episode with `terminated_by="ANTI_HACK"` via a separate mechanism (see §3.5). |
| E5 | `EpisodeAlreadyTerminalError` | `step` | `self._state.done == True` when `step` called. |
| E6 | `EpisodeNotTerminalError` | `episode`, `rewards` | Called before termination. |
| E7 | `ConcurrentStepError` | `step` | Reentrant call detected via a per-instance `_step_in_progress` flag — single-threaded usage is a hard invariant (§3.9). |
| E8 | `UnknownDomainError` | `step` (PROBE_SCHEMA), `_build_observation` | Agent named a vendor domain not present in state. |
| E9 | `UnknownToolError` | `step` (TOOL_CALL) | `tool_name` not in `self._available_tools()`. Treated as anti-hack (same path as E4). |
| E10 | `DriftInjectionError` | `step` (during fold) | Propagated from `drift_injector.apply_drift`; not caught — indicates an internal invariant violation, not an agent error. |
| E11 | `RewardComputationError` | `step` (termination) | Propagated from `rewards.compute_rewards` if Episode is malformed; this is a bug in env, never silently swallowed. |
| E12 | `AudioPipelineError` | `step` / `reset` | Only when `audio_boundary_enabled=True` and the engine raises. Surfaced as-is; episode does NOT terminate unless audio fails on `reset()` (then `E1`-style reset failure). |

**Never caught, never wrapped:** any exception not in the table escapes as-is — silent failure is a design-doc violation (CLAUDE.md §0, DESIGN.md §7).
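Here is a minimal sketch of the hierarchy in the table, assuming all twelve classes live in one errors module as the first sentence of this section states. The table does not settle where E10 and E11 originate: they may be defined here, or raised by `drift_injector`/`rewards` and re-exported. Treat their placement as an assumption.

```python
class DriftCallEnvError(Exception):
    """Root of every typed env error; `except DriftCallEnvError` catches them all."""


class InvalidConfigError(DriftCallEnvError):
    """E1: malformed config at __init__ / EnvConfig.from_mapping."""


class EnvNotReadyError(DriftCallEnvError):
    """E2: step/state/episode/rewards before reset()."""


class EnvClosedError(DriftCallEnvError):
    """E3: reset/step after close()."""


class InvalidActionError(DriftCallEnvError):
    """E4: §3.1 validation failure; raised before any state mutation."""


class EpisodeAlreadyTerminalError(DriftCallEnvError):
    """E5: step() after termination."""


class EpisodeNotTerminalError(DriftCallEnvError):
    """E6: episode()/rewards() before termination."""


class ConcurrentStepError(DriftCallEnvError):
    """E7: reentrant step() detected."""


class UnknownDomainError(DriftCallEnvError):
    """E8: vendor domain not present in state."""


class UnknownToolError(DriftCallEnvError):
    """E9: tool_name not among the episode's available tools."""


class DriftInjectionError(DriftCallEnvError):
    """E10: internal invariant violation surfaced by apply_drift."""


class RewardComputationError(DriftCallEnvError):
    """E11: malformed Episode handed to compute_rewards (an env bug)."""


class AudioPipelineError(DriftCallEnvError):
    """E12: TTS/ASR engine failure with the audio boundary enabled."""
```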

---

## 6. Dependencies

### 6.1 Upstream (imports from)

- `driftcall.models` — all frozen dataclasses and `ActionType`.
- `driftcall.task_generator` — `generate(seed, stage, language_weights)` (task_generator.md §2 generate).
- `driftcall.drift_injector` — `build_schedule`, `apply_drift`, `DriftScheduler` protocol, list of drift exceptions.
- `driftcall.vendors.{airline,cab,restaurant,hotel,payment}` — the 5 vendor modules, each exposing `dispatch`, `initial_state`, `apply_schema_mutation`, `describe_schema`, `emit_side_channel_if_pending`, `TOOLS` (vendors.md §2.1 six-method interface).
- `driftcall.rewards` — `compute_rewards(episode) -> Rewards`. **Imported at module top; NOT lazy.**
- `driftcall.audio.tts_kokoro`, `driftcall.audio.asr_whisper` — **imported lazily**, guarded by `if self._config.audio_boundary_enabled:` to preserve the training-path import firewall (audio.md §6.3).
- `dataclasses`, `uuid`, `os` (for `os.urandom`), `typing` — stdlib.
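The lazy-import rule can be sketched with `importlib.import_module`, which only runs when called. The module paths are the two named above. `load_audio_modules` and `audio_imported` are hypothetical helpers; the latter is the kind of check a firewall test could run after a training-style rollout.

```python
from __future__ import annotations

import importlib
import sys
from types import ModuleType

AUDIO_MODULES = ("driftcall.audio.tts_kokoro", "driftcall.audio.asr_whisper")


def load_audio_modules(audio_boundary_enabled: bool) -> tuple[ModuleType, ...]:
    """Import the audio backends only when the boundary is enabled."""
    if not audio_boundary_enabled:
        return ()  # training path: nothing under driftcall.audio is touched
    return tuple(importlib.import_module(name) for name in AUDIO_MODULES)


def audio_imported() -> bool:
    """True if any driftcall.audio module has been imported in this process."""
    return any(
        name == "driftcall.audio" or name.startswith("driftcall.audio.")
        for name in list(sys.modules)
    )
```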

### 6.2 Downstream (consumed by)

- `driftcall/app.py` — FastAPI OpenEnv server; owns the env pool (one instance per session).
- `training/train_grpo.py` — via `openenv.client.EnvClient` pointed at a local uvicorn process.
- `training/eval_baseline.py`, `training/eval_final.py` — same path.
- `demo/app_gradio.py` — for interactive demos.
- `tests/test_env.py`, `tests/test_e2e.py` — direct instantiation.

### 6.3 Prohibited dependencies

- **No LLM / HTTP calls.** The env composes deterministic, in-process modules only.
- **No direct disk I/O** beyond what `task_generator.load_templates` and `drift_injector.list_patterns` already own.
- **No `threading`, no `asyncio`** — the env is synchronous; concurrency is the caller's concern.
- **No mutable global state.** All config is per-instance.

---

## 7. Edge cases

### 7.1 Action issued before `reset()`

```python
env = DriftCallEnv(config)
env.step(some_action)   # raises E2 EnvNotReadyError
```

Same for `state()`, `episode()`, `rewards()`. `done()` returns `False` (not an error — "no episode" implies "not done").

### 7.2 Double `SUBMIT` (agent emits SUBMIT twice)

First `SUBMIT` terminates normally. Second `step` raises `E5 EpisodeAlreadyTerminalError` — the env does NOT silently ignore the second action. Callers MUST check `done()` between steps. This is intentional: in training the rollout loop always stops at `done=True`; anything else is a bug upstream.

### 7.3 `ABORT` mid-turn (after some tool calls)

Legal. `ABORT` always terminates with `terminated_by="ABORT"` and `R1=0.0` (forced by rewards.md §3.2). R2…R5 are still computed over the partial episode, so the training signal reflects what the agent *did* do before giving up. `confidence` is not accepted on ABORT: models.md §3.5 reserves it for SUBMIT, and the §3.1 table lists it as forbidden on ABORT, so an ABORT carrying a `confidence` raises `InvalidActionError`.

### 7.4 Drift scheduled on a turn that is also terminal (TIMEOUT race)

Pre-condition: `drift_schedule` placement rules (drift_injector §3.2) exclude the last 3 turns in both stage 2 and stage 3, so this should not arise from `build_schedule`. A scripted test scheduler, however, could inject one.

**Resolution:** drift fires FIRST (step 3 in §2.3), then the agent's action is dispatched, then the budget check runs. Suppose a drift is scheduled on `turn == max_turns - 1` and the agent issues a `TOOL_CALL` on that turn. The tool sees the new schema and the ToolResult is recorded, and only then does the budget check fire TIMEOUT. The drift is still recorded in `drift_fired` and in the Episode.

If a scripted scheduler places a drift on `turn > max_turns`, `build_schedule`'s own guard (drift_injector.md §5) raises `DriftScheduleError` at `reset()`. The env surfaces this as an `E1`-class reset failure.
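The per-step ordering can be sketched as follows. `step_order_sketch` and `StepTrace` are hypothetical. The budget condition `turn >= max_turns` is taken from the TIMEOUT description in §8.3. Letting SUBMIT/ABORT on the final turn take precedence over the budget check is also an assumption.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class StepTrace:
    """Hypothetical record of what happened inside one step()."""
    turn: int
    events: list[str] = field(default_factory=list)
    terminated_by: Optional[str] = None


def step_order_sketch(turn: int, max_turns: int, drift_turns: set[int], action: str) -> StepTrace:
    trace = StepTrace(turn=turn)
    # Drift fires first, so this turn's action already sees the new schema.
    if turn in drift_turns:
        trace.events.append("drift_fired")
    # Then the agent's action is dispatched and its ToolResult recorded.
    trace.events.append(f"dispatch:{action}")
    # Finally, terminal actions end the episode; otherwise the budget check may fire TIMEOUT.
    if action in ("SUBMIT", "ABORT"):
        trace.terminated_by = action
    elif turn >= max_turns:
        trace.terminated_by = "TIMEOUT"
    return trace
```

The drift is recorded even on the turn where TIMEOUT fires. In the race case, the trace therefore contains both the drift and the dispatch before termination.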

### 7.5 Audio pipeline disabled but config passes `tts_engine`

`EnvConfig.from_mapping` raises `E1 InvalidConfigError` ("tts_engine must be None when audio_boundary_enabled is False"). Fail loud at construction; never silently drop.
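Here is a minimal sketch of the construction-time check. `EnvConfigSketch` is a hypothetical subset of `EnvConfig` that carries only the audio fields. The text above names only `tts_engine`, so applying the same rule to `asr_engine` is an assumption by symmetry.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional


class InvalidConfigError(ValueError):
    """E1: configuration rejected at construction time."""


@dataclass(frozen=True)
class EnvConfigSketch:
    """Hypothetical subset of EnvConfig covering only the audio-boundary fields."""
    curriculum_stage: int = 1
    audio_boundary_enabled: bool = False
    tts_engine: Optional[Any] = None
    asr_engine: Optional[Any] = None

    @classmethod
    def from_mapping(cls, raw: Mapping[str, Any]) -> "EnvConfigSketch":
        cfg = cls(**raw)
        # Fail loud: an engine supplied while audio is off is a config bug, not a no-op.
        if not cfg.audio_boundary_enabled:
            for name in ("tts_engine", "asr_engine"):  # asr_engine: assumed by symmetry
                if getattr(cfg, name) is not None:
                    raise InvalidConfigError(
                        f"{name} must be None when audio_boundary_enabled is False"
                    )
        return cfg
```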

### 7.6 Same tool called twice in a row with identical args

Legal. Vendors are pure; same `(vendor_state, args, seed)` ⇒ identical `ToolResult`. Both results appear in the history tuple. The env does not de-duplicate — R4 (format compliance, rewards.md §3.5) may penalize for wasted turns, but that is the reward layer's call, not the env's.

### 7.7 `PROBE_SCHEMA` on a domain with no prior interaction

Legal. `describe_schema(vendor_state, current_version)` is defined for any `(vendor, v1|v2|v3)` pair. PROBE_SCHEMA produces a `ToolResult` with `status="ok"`, `tool_name="probe:<domain>"`, `response=describe_schema(...)`, `schema_version=current`, `latency_ms=0` (probes are instantaneous). It counts against the budget like any other action (DESIGN.md §4.3).
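The probe result can be sketched as a plain constructor. `ToolResultSketch` mirrors only the fields listed above. `describe_schema` is passed in as a callable, since nothing here specifies its signature beyond `(vendor_state, current_version)`. Both choices are assumptions.

```python
from dataclasses import dataclass
from typing import Any, Callable, Mapping


@dataclass(frozen=True)
class ToolResultSketch:
    """Hypothetical mirror of the ToolResult fields named in §7.7."""
    status: str
    tool_name: str
    response: Mapping[str, Any]
    schema_version: str
    latency_ms: int


def probe_schema_result(
    domain: str,
    vendor_state: Mapping[str, Any],
    current_version: str,
    describe_schema: Callable[[Mapping[str, Any], str], Mapping[str, Any]],
) -> ToolResultSketch:
    """Build the PROBE_SCHEMA ToolResult; works even with no prior interaction."""
    return ToolResultSketch(
        status="ok",
        tool_name=f"probe:{domain}",
        response=describe_schema(vendor_state, current_version),
        schema_version=current_version,
        latency_ms=0,  # probes are instantaneous; the caller still charges one budget unit
    )
```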

### 7.8 `SUBMIT` with out-of-range `confidence`

Caught by `_validate_action`: `confidence` must be `float` in `[0.0, 1.0]`. Outside ⇒ `E4 InvalidActionError` → anti-hack termination.
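Here is a sketch of the confidence check. It reads "must be `float`" strictly, so the int `1` is rejected; that reading is an assumption. `validate_submit_confidence` is a hypothetical slice of `_validate_action`. NaN is rejected because it fails every comparison in the range check.

```python
from typing import Any


class InvalidActionError(ValueError):
    """E4: malformed action; the env treats it as an anti-hack termination."""


def validate_submit_confidence(confidence: Any) -> float:
    """Hypothetical slice of _validate_action covering SUBMIT.confidence only."""
    # Strict type check: ints (and bools, which subclass int) are not floats.
    if not isinstance(confidence, float):
        raise InvalidActionError(
            f"confidence must be a float, got {type(confidence).__name__}"
        )
    # NaN compares False against both bounds, so it is rejected here too.
    if not (0.0 <= confidence <= 1.0):
        raise InvalidActionError(f"confidence {confidence!r} outside [0.0, 1.0]")
    return confidence
```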

### 7.9 Seed collision across concurrent episodes

Not an env concern — each `DriftCallEnv` instance is a single episode and holds its own `self._seed`. Two instances created with the same seed produce identical trajectories by design (reproducibility). Callers that want distinct episodes must supply distinct seeds.

### 7.10 Vendor `dispatch` returns a `ToolResult` for a drift-renamed field

Vendors' `dispatch` already handles this — the vendor reads `schema_versions[domain]` and returns the schema's current field names. The env does not post-process `ToolResult.response`; it is passed verbatim to the observation so the agent sees the actual post-drift shape. R2's detection hints (drift_injector `DriftPattern.detection_hints`) are the contract for the reward layer.
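The renaming happens on the vendor side, which can be sketched with a per-version field map (an assumption about how vendors store their schemas). `AIRLINE_FIELD_NAMES` and `render_search_row` are hypothetical. The `price` to `total_fare_inr` rename mirrors the `airline.price_rename` example in §8.2. The env would forward the resulting dict unchanged.

```python
from typing import Any, Mapping

# Hypothetical field maps: canonical field name -> name exposed by each schema version.
AIRLINE_FIELD_NAMES: dict[str, dict[str, str]] = {
    "v1": {"flight_id": "flight_id", "price": "price"},
    "v2": {"flight_id": "flight_id", "price": "total_fare_inr"},
}


def render_search_row(
    schema_versions: Mapping[str, str], row: Mapping[str, Any]
) -> dict[str, Any]:
    """Shape a canonical row using the airline domain's *current* schema version."""
    names = AIRLINE_FIELD_NAMES[schema_versions["airline"]]
    return {names[key]: value for key, value in row.items()}
```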

### 7.11 Closed env still queried

After `close()`, `reset` and `step` raise `E3 EnvClosedError`. `done()` returns whatever it returned before close (sticky). `state()`, `episode()`, `rewards()` return the cached terminal objects if termination happened before close — callers can still audit a finished episode post-close.
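Here is a sketch of the close semantics described above. `ClosableEnvSketch` is hypothetical. It models only three things: the sticky `_closed` flag, the `E3` check on `reset`/`step`, and the cached terminal episode that stays readable after `close()`.

```python
from typing import Optional


class EnvClosedError(RuntimeError):
    """E3: reset() or step() called after close()."""


class ClosableEnvSketch:
    """Hypothetical model of §7.11: close() blocks new work but keeps terminal artifacts."""

    def __init__(self) -> None:
        self._closed = False
        self._done = False
        self._episode: Optional[dict] = None

    def reset(self, seed: int) -> None:
        if self._closed:
            raise EnvClosedError("env is closed")
        self._done, self._episode = False, None

    def step(self, action_type: str) -> None:
        if self._closed:
            raise EnvClosedError("env is closed")
        if action_type == "SUBMIT":
            self._done = True
            self._episode = {"terminated_by": "SUBMIT"}

    def close(self) -> None:
        # Sets the sticky flag only; cached terminal objects stay available for auditing.
        self._closed = True

    def done(self) -> bool:
        return self._done  # unchanged by close()

    def episode(self) -> Optional[dict]:
        # None if the episode never terminated (a case §7.11 does not pin down).
        return self._episode
```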

---

## 8. Examples

### 8.1 Stage-1 happy path (no drift, SUBMIT with full credit)

Stage 1: English airline booking, no drift, agent completes in 5 turns.

```python
from driftcall.env import DriftCallEnv
from driftcall.models import DriftCallAction, ActionType

env = DriftCallEnv({"curriculum_stage": 1})           # audio disabled (default)
obs = env.reset(seed=42)
# obs.turn == 0
# obs.goal.domain == "airline"            (example; seed-dependent)
# obs.goal.language == "en"               (seed+weights dependent)
# obs.drift_log == ()                     (stage 1)
# obs.budget_remaining == 8               (stage 1)
# obs.last_transcript == obs.goal.seed_utterance

obs = env.step(DriftCallAction(
    action_type=ActionType.TOOL_CALL,
    tool_name="airline.search",
    tool_args={"from_": "HYD", "to": "BLR", "date": "2026-04-25"},
))
# obs.turn == 1
# len(obs.tool_results) == 1, tool_results[0].status == "ok"
# obs.drift_log == ()                     (still none)

# ... 3 more tool calls to book + confirm ...

obs = env.step(DriftCallAction(
    action_type=ActionType.SUBMIT,
    confidence=0.9,
    message="Booking confirmed, PNR X3K9Z, total ₹7200.",
))
# env.done() == True
# env.rewards().r1 == 1.0          (goal satisfied)
# env.rewards().r2 == 0.5          (stage 1 → R2 neutral)
# env.rewards().reward ∈ [0.85, 1.0]   (quality minus tiny Brier)
# env.episode().terminated_by == "SUBMIT"
# env.episode().turns_used == 5
```

### 8.2 Stage-2 drift detected and adapted

Stage 2 airline with `price_rename` drift at turn 3. Agent detects the drift on turn 4 via a `SPEAK` that mentions the rename, then adapts.

```python
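from driftcall.env import DriftCallEnv
from driftcall.models import DriftCallAction, ActionType

env = DriftCallEnv({"curriculum_stage": 2})
obs = env.reset(seed=7)
# obs.goal.domain == "airline"
# env.state().drift_schedule has length 1; e.g. turn=3, pattern_id="airline.price_rename"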

# Turn 1: search (pre-drift) — returns schema v1 with "price" field
obs = env.step(DriftCallAction(action_type=ActionType.TOOL_CALL,
                               tool_name="airline.search",
                               tool_args={"from_": "HYD", "to": "BLR", "date": "2026-04-25"}))

# Turn 2: agent picks a flight (still v1); step call elided for brevity
# Turn 3: DRIFT FIRES at start of step — schema moves to v2
#         agent's turn-3 action sees the new schema already.
obs = env.step(DriftCallAction(action_type=ActionType.TOOL_CALL,
                               tool_name="airline.search",
                               tool_args={"from_": "HYD", "to": "BLR", "date": "2026-04-25"}))
# obs.drift_log == (DriftEvent(turn=3, drift_type="schema", domain="airline",
#                              pattern_id="airline.price_rename", from_version="v1",
#                              to_version="v2", description="price renamed to total_fare_inr"),)
# obs.tool_results[-1].response uses "total_fare_inr" not "price"

# Turn 4: agent announces the rename (R2 detection window = [3, 5], mention hits)
obs = env.step(DriftCallAction(action_type=ActionType.SPEAK,
                               message="Note: the price field was renamed to total_fare_inr."))

# Turn 5: adapted tool_args — book with new field awareness
obs = env.step(DriftCallAction(action_type=ActionType.TOOL_CALL,
                               tool_name="airline.book",
                               tool_args={"flight_id": "6E-2345", "payment_token": "tok_v1"}))

# Turn 6: submit
obs = env.step(DriftCallAction(action_type=ActionType.SUBMIT, confidence=0.8,
                               message="Booked with updated pricing."))
# env.rewards().r1 == 1.0
# env.rewards().r2 == 1.0          (drift detected within 2-turn window, mention in SPEAK)
# env.rewards().reward ≈ 0.90
```

### 8.3 Stage-3 compound drift, TIMEOUT

Stage 3, two drifts: airline policy at turn 3, payment auth at turn 9. Agent handles the first but runs out of budget on the second.

```python
from driftcall.env import DriftCallEnv

env = DriftCallEnv({"curriculum_stage": 3})
obs = env.reset(seed=2026)
# env.state().drift_schedule has length 2
# obs.budget_remaining == 16

# ... turns 1-8 consumed detecting and adapting to first drift ...
# turn 9: payment auth drift fires; old tok_v1 returns 401 on payment.charge
# turns 10-15: agent probing and retrying, accumulating 6 actions
# turn 16: budget exhausted → TIMEOUT

# Final step (turn 16): agent attempted another payment.charge, TIMEOUT fires
# after action dispatch because turn == max_turns.

assert env.done()
assert env.episode().terminated_by == "TIMEOUT"
assert env.rewards().r1 == 0.0            # TIMEOUT forces R1=0
assert env.rewards().r2 in (0.5, 1.0)     # one of two drifts may still have been detected
assert env.rewards().reward < 0.3         # quality dominated by r3, r4; brier small
```

### 8.4 Audio-boundary-enabled deployed Space

```python
from driftcall.env import DriftCallEnv
from driftcall.models import DriftCallAction, ActionType
from driftcall.audio.tts_kokoro import get_tts_engine
from driftcall.audio.asr_whisper import get_asr_engine

env = DriftCallEnv({
    "curriculum_stage": 1,
    "audio_boundary_enabled": True,
    "tts_engine": get_tts_engine(),
    "asr_engine": get_asr_engine(),
})
obs = env.reset(seed=11)
# obs.last_transcript is still the textual seed_utterance (canonical source);
# the TTS audio is emitted on a side channel for UI rendering.

# Agent asks a clarifying question
obs = env.step(DriftCallAction(action_type=ActionType.CLARIFY,
                               message="When do you want to depart, morning or evening?"))
# env synthesizes a sim-caller reply ("shaam ko, 7 baje"), TTS→ASR round-trips,
# and obs.last_transcript/last_lang/last_confidence now reflect the ASR result.
# last_confidence may be < 1.0 due to Whisper ASR uncertainty.
```

---

## 9. Open questions

1. **Sim-caller responder for `CLARIFY` in text-only mode.** This doc specifies "deterministic scripted responder keyed on (seed, turn)" but the responder is not yet specified anywhere. Where does it live? Options: (a) an internal helper in `env.py`, (b) a new `driftcall.simcaller` module, (c) a field on the template in `task_generator`. *Proposal:* keep it in `env.py` as a private helper; it is env-internal plumbing and does not need a module boundary. To be resolved at critic gate.

2. **`PROBE_SCHEMA` tool_name vs domain.** DESIGN.md §4.1 shows `tool_name` for `PROBE_SCHEMA`, but semantically the action names a *domain*, not a tool. This doc uses the convention "PROBE_SCHEMA's `tool_name` holds the domain string (e.g., `'airline'`)". Alternative: introduce an explicit `domain` field on `DriftCallAction`. Pick one and reflect it in `models.md`. *Proposal:* keep `tool_name` carrying the domain and add a clarifying note to models.md. That is cheaper than a schema change.

3. **Side-channel notice attachment ordering.** If two drifts on the same domain both stash notices (stage 3 edge case), does the agent see both concatenated, or only the first, on the next matching tool call? This doc says "one-shot per call, carry-over until consumed"; need a concrete concatenation rule if both are pending. *Proposal:* concat with `\n---\n` separator, emit all pending notices at once, clear together. Resolve with drift_injector critic.

4. **`env.episode().vendor_states_final` field-stability across refactors.** Rewards consumes `vendor_states_final` as a raw `dict[str, dict[str, Any]]`. If a vendor later adds/removes a VendorState field, the Episode's shape changes. Do we need a versioned snapshot schema? *Proposal:* No; rewards reads only documented fields and uses `.get(..., default)` everywhere. Confirm with rewards critic.

5. **Deterministic episode_id.** Currently `uuid4()` — *non-deterministic*. This is intentional for audit uniqueness but means two `reset(seed=42)` calls produce different `episode_id`s. Is that acceptable to DESIGN.md §4.2's "Deterministic for reproducibility"? *Proposal:* Yes — the *trajectory* is deterministic, the audit ID is not. Same stance as rewards.md (episode_id is opaque). Flag for gate-D cross-doc review.

6. **Thread-safety guard strength.** `E7 ConcurrentStepError` requires a per-instance `_step_in_progress` bool. This catches same-thread re-entrancy but not two threads racing. The check-then-set on a plain bool is not atomic, because the interpreter can switch threads between the check and the set, even under the GIL. Do we need a `threading.Lock`? *Proposal:* No. §3.9 declares a single-threaded contract; the bool is diagnostic, not an invariant. Note this in a code comment; do not add a lock (it adds an import and slows `step`).

7. **Audio engine lifecycle on `close()`.** TTS/ASR engines are singletons (audio.md §2.1 `get_tts_engine` / §2.2 `get_asr_engine`). If multiple envs share them, `close()` must NOT free the engine. *Proposal:* `close()` releases only per-env state (drops `self._state`, `self._rewards`, `self._episode`) and sets `self._closed=True`. Audio engines are process-global and outlive the env. Confirm at critic gate.
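For open question 3, here is a minimal sketch of the proposed rule: concatenate with `\n---\n`, emit all pending notices at once, and clear them together. `PendingNotices` is a hypothetical helper. Keying the queue by domain and emitting in stash order are assumptions.

```python
from typing import Optional

NOTICE_SEPARATOR = "\n---\n"


class PendingNotices:
    """Hypothetical per-domain queue of drift side-channel notices."""

    def __init__(self) -> None:
        self._pending: dict[str, list[str]] = {}

    def stash(self, domain: str, notice: str) -> None:
        # Notices carry over until the next matching tool call consumes them.
        self._pending.setdefault(domain, []).append(notice)

    def consume(self, domain: str) -> Optional[str]:
        # Emit every pending notice for the domain at once, in stash order, then clear together.
        notices = self._pending.pop(domain, [])
        return NOTICE_SEPARATOR.join(notices) if notices else None
```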
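For open question 5, here is a sketch of the proposed stance. The trajectory is derived only from a seed-local RNG. The audit ID comes from `uuid.uuid4()`, which CPython builds from `os.urandom`. `EpisodeStub` and `reset_stub` are hypothetical, and the integer "trajectory" stands in for real turns.

```python
import random
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class EpisodeStub:
    """Hypothetical: just enough of an Episode to show the two identity notions."""
    episode_id: str
    trajectory: tuple[int, ...]


def reset_stub(seed: int, n_turns: int = 5) -> EpisodeStub:
    # All episode randomness flows from a seed-local RNG, so the trajectory is reproducible.
    rng = random.Random(seed)
    trajectory = tuple(rng.randrange(100) for _ in range(n_turns))
    # The audit ID deliberately bypasses the seed, so it differs on every reset.
    return EpisodeStub(episode_id=str(uuid.uuid4()), trajectory=trajectory)
```

Two `reset_stub(42)` calls yield equal trajectories but distinct `episode_id`s, which matches the proposal's split between deterministic trajectory and opaque audit ID.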
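For open question 6, here is a sketch of the diagnostic flag without a lock. `StepGuardSketch` and its injected `dispatch` callable are hypothetical. The `try`/`finally` keeps a failed dispatch from leaving the flag stuck at `True`.

```python
from typing import Callable


class ConcurrentStepError(RuntimeError):
    """E7: step() re-entered while a previous step() on the same instance was running."""


class StepGuardSketch:
    """Hypothetical: diagnostic re-entrancy flag, deliberately without a threading.Lock."""

    def __init__(self, dispatch: Callable[["StepGuardSketch"], int]) -> None:
        self._step_in_progress = False
        self._dispatch = dispatch

    def step(self) -> int:
        # Check-then-set on a plain bool is not atomic across threads; this only
        # catches same-thread re-entrancy (e.g. a callback that calls step() again).
        if self._step_in_progress:
            raise ConcurrentStepError("step() re-entered on the same env instance")
        self._step_in_progress = True
        try:
            return self._dispatch(self)
        finally:
            # Always clear the flag, even if dispatch raised.
            self._step_in_progress = False
```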