driftcall/env.py: DriftCallEnv integration class
Design doc. No code is written against this spec until ≥2 fresh critics return NOTHING_FURTHER.
Implements DESIGN.md §4 (OpenEnv Interface) in full.
Sections per CLAUDE.md §3.1: Purpose · Interface · Behavior spec · Data structures · Error modes · Dependencies · Edge cases · Examples · Open questions.
1. Purpose
DriftCallEnv is the one public surface that the OpenEnv runner, the training loop, the FastAPI shim in app.py, and the Gradio demo all call. It is the sole integration point that composes every other module into an RL-compliant episode:
- task_generator.generate: samples a seeded GoalSpec at reset() (task_generator.md §2 generate()).
- drift_injector.build_schedule / apply_drift: pre-computes the episode's drift timetable and fires each event at the start of its scheduled turn.
- vendors/* (airline, cab, restaurant, hotel, payment): dispatches TOOL_CALL actions and evolves per-vendor frozen VendorState records (vendors.md §2 dispatch()).
- models: supplies every frozen dataclass crossing the boundary (DriftCallAction, DriftCallObservation, DriftCallState, ToolResult, DriftEvent, GoalSpec, Episode).
- rewards.compute_rewards: called exactly once at termination to produce the final Rewards scalar consumed by GRPO.
- audio/*: optional boundary-only pipeline (ASR on user utterance entry, TTS on SPEAK replies); disabled during training, enabled behind a config flag on the deployed env Space (DESIGN.md §9.4).
The env is the judge (DESIGN.md §0, §7). It holds the authoritative DriftCallState, enforces the budget, triggers drifts deterministically, dispatches tools, detects termination, and computes rewards server-side, with no LLM judge anywhere. The agent observes through DriftCallObservation and acts through DriftCallAction; everything else is env-internal.
This module has zero domain logic of its own. Every vendor rule, every drift mutation, every reward clause is owned by the downstream module. env.py is plumbing: validate → drift → dispatch → record → terminate → observe.
Cross-references: DESIGN.md §3.1 (high-level diagram), §4.1–4.5 (dataclasses, reset, step, termination, budget), §6.2 (drift trigger logic), §7 (reward invariants), §9.4 (audio training/deploy split).
2. Interface
from __future__ import annotations

from typing import Any, Literal, Protocol

from driftcall.models import (
    DriftCallAction,
    DriftCallObservation,
    DriftCallState,
    Episode,
    GoalSpec,
    ToolResult,
    DriftEvent,
    ActionType,
)
from driftcall.rewards import Rewards


class DriftCallEnv:
    """
    OpenEnv-compliant RL environment for DriftCall.

    One instance == one episode lifecycle controller. Reuse across episodes
    via reset(); do not share a single instance across concurrent episodes
    (not thread-safe; see §3.9 / Error mode E7).
    """

    # -- construction --------------------------------------------------------
    def __init__(self, config: dict[str, Any] | None = None) -> None: ...

    # -- OpenEnv primitives --------------------------------------------------
    def reset(self, seed: int | None = None) -> DriftCallObservation: ...

    def step(
        self,
        action: DriftCallAction,
        *,
        force_drift_pattern: str | None = None,
    ) -> DriftCallObservation: ...

    def state(self) -> DriftCallState: ...

    def close(self) -> None: ...

    # -- terminal-only accessors (see §3.6) ----------------------------------
    def episode(self) -> Episode: ...

    def rewards(self) -> Rewards: ...

    def done(self) -> bool: ...
2.1 __init__(self, config)
Config contract (frozen copy stored on the instance as self._config: EnvConfig, a frozen dataclass; see §4.1):
| Key | Type | Default | Meaning |
|---|---|---|---|
| curriculum_stage | Literal[1, 2, 3] | 1 | Drift frequency (DESIGN.md §4.5, §6.2). |
| language_weights | dict[LanguageCode, float] | {"en": 0.4, "hinglish": 0.4, "hi": 0.1, "ta": 0.05, "kn": 0.05} | Non-negative; must sum to 1.0 ± 1e-6. Passed to task_generator.generate. |
| audio_boundary_enabled | bool | False | If True, ASR/TTS runs at the env boundary (see §3.7). MUST be False during GRPO training (DESIGN.md §9.4). |
| max_turns_override | int \| None | None | Only for tests. If None, derived from stage via §3.2 Table A. |
| scheduler | DriftScheduler | drift_injector.build_schedule | Injection seam for scripted schedules in tests (drift_injector.md §2). |
| tts_engine | TTSEngine \| None | None | Required iff audio_boundary_enabled; otherwise must be None. |
| asr_engine | ASREngine \| None | None | Same as above. |
Postconditions: __init__ performs no I/O, no model load, no network call. All validation is pure (raises InvalidConfigError on malformed config). The env is not ready for step(); callers MUST call reset() first.
2.2 reset(seed)
def reset(self, seed: int | None = None) -> DriftCallObservation:
    """
    Begin a new episode.

    Signature takes `seed` ONLY (DESIGN.md §4.2, locked 2026-04-24). Config
    (curriculum_stage, language_weights, audio_boundary_enabled, engines) is
    bound at `__init__` and is immutable for the env instance's lifetime.
    Callers that need a different stage or language mix MUST construct a new
    `DriftCallEnv`.

    If seed is None, one is drawn from os.urandom(8) and stored. Same seed →
    byte-identical GoalSpec, drift_schedule, and initial vendor_states
    (DESIGN.md §4.2).

    Steps (in this order):
      1. episode_id = uuid4() (purely for audit/log correlation; NOT seeded).
      2. goal = task_generator.generate(seed, stage, language_weights).
      3. vendor_states = {d: vendors[d].initial_state(seed, goal) for d in DOMAINS}.
      4. schema_versions = {d: "v1" for d in DOMAINS}.
      5. drift_schedule = self._config.scheduler(stage, seed, goal).
      6. Build fresh DriftCallState (turn=0, max_turns per §3.2, done=False,
         actions=(), drift_fired=()).
      7. Build DriftCallObservation for turn 0 via _build_observation (§3.4).
      7b. If `self._config.audio_boundary_enabled` is True, invoke
          `tts_engine.synthesize(goal.seed_utterance, goal.language)` and emit
          audio on the side channel per §3.7. Canonical `last_transcript`
          remains `goal.seed_utterance`: audio is a boundary artifact, not the
          source of truth.
      8. Return observation; store state on self._state.

    Raises: InvalidLanguageWeightError, InvalidStageError from task_generator
    (propagated; see §5 E1). On any failure the env remains unready
    (self._state stays None).
    """
2.3 step(action)
def step(
    self,
    action: DriftCallAction,
    *,
    force_drift_pattern: str | None = None,
) -> DriftCallObservation:
    """
    Advance one turn. Full pipeline (DESIGN.md §4.3):

    0. Preconditions: self._state is not None; self._state.done is False.
    1a. Validate action via `_validate_action` (pure, no state mutation). On
        validation failure, raise `InvalidActionError` immediately. No turn
        counter increment. No action stored. No termination marker yet.
    1b. If the caller catches the `InvalidActionError` but continues to invoke
        step() with another malformed action, repeated failures are tracked by
        an outer anti-hack handler in the caller. The env itself treats each
        malformed step() as a pure exception-raising operation with zero side
        effects.
    2. turn_current = self._state.turn + 1.
    3. Fire drifts for turn_current (see §3.3 firing spec):
       - If `force_drift_pattern is not None`, inject that single pattern AT
         THIS TURN, overriding the pre-computed `drift_schedule`. Any
         schedule-driven drifts that would have fired on turn_current are
         SUPPRESSED for this step: judge intent wins over RNG. The forced
         pattern must be a valid `DRIFT_PATTERN_IDS` literal; unknown ids
         raise `InvalidActionError` before any state mutation.
       - If `force_drift_pattern is None`, fire all drifts from
         `drift_schedule` whose turn == turn_current, folding `apply_drift`
         in (turn asc, pattern_id asc) order (drift_injector.md §3.1). Each
         application returns a new frozen state; the fold produces the
         post-drift state.
    4. For every vendor whose state just mutated (i.e., whose domain appeared
       in at least one pending drift), call vendors[d].emit_side_channel_if_pending
       and stash any (notice, new_vendor_state) for attachment to the first
       subsequent ToolResult on that domain (§3.3).
    5. Dispatch the action:
         TOOL_CALL     → vendors[d].dispatch(...) → (ToolResult, new_vendor_state)
         SPEAK/CLARIFY → no state change; pure log (§3.4)
         PROBE_SCHEMA  → vendors[d].describe_schema(current_schema_version)
                         packaged as a ToolResult with status="ok"
         SUBMIT        → terminate (see §3.5); compute rewards
         ABORT         → terminate with R1=0 (see §3.5)
    6. Append action (and any resulting ToolResult) to self._state via
       dataclasses.replace. State is rebuilt frozen each step; no in-place
       mutation (§3.8).
    7. Budget check: if new turn >= max_turns and not already terminal,
       terminate with terminated_by="TIMEOUT", R1 forced to 0.
    8. If terminal, build Episode (§4.3) and call rewards.compute_rewards
       EXACTLY ONCE. Cache the Rewards on self._rewards. Set state.done=True.
    9. Build and return the new observation.
    """
2.4 state()
def state(self) -> DriftCallState:
    """
    Return the current frozen state. Required by OpenEnv validator.

    Returns the frozen `self._state` reference directly (no deepcopy needed:
    the dataclass is `frozen=True` and every field is a frozen object, tuple,
    or primitive; external mutation is impossible by construction).

    Raises EnvNotReadyError if called before reset().
    """
2.5 close()
def close(self) -> None:
    """
    Release any resources (audio engines if loaded, nothing else). Idempotent.

    After close(), calls to reset/step raise EnvClosedError.
    """
2.6 episode() / rewards() / done() (terminal-only)
def episode(self) -> Episode:
    """Return the finalized Episode. Raises EpisodeNotTerminalError if not done."""

def rewards(self) -> Rewards:
    """Return the cached Rewards. Raises EpisodeNotTerminalError if not done."""

def done(self) -> bool:
    """True iff the current episode has terminated. False before reset()."""

episode() and rewards() are memoized: they return the cached objects and never recompute.
3. Behavior spec
3.1 Action validation (_validate_action)
For every DriftCallAction, enforce the per-ActionType required-field matrix. Any violation raises InvalidActionError. The raise itself is pure (§2.3 step 1a, §5 E4): it changes no state and does not terminate the episode. Only ≥3 consecutive failures, counted by the caller's anti-hack handler, lead to the ANTI_HACK termination path (§3.5). Validation failures are agent failures, not env bugs.
| action_type | Required | Forbidden | Extra constraints |
|---|---|---|---|
| TOOL_CALL | tool_name, tool_args (may be {}) | message, confidence | tool_name ∈ self._available_tools(); tool_args is a JSON-serializable dict. |
| SPEAK | message (len ∈ [1, 2000]) | tool_name, tool_args, confidence | Valid UTF-8; no NUL bytes. |
| CLARIFY | message (len ∈ [1, 2000]) | tool_name, tool_args, confidence | Same as SPEAK. |
| PROBE_SCHEMA | tool_name (domain, e.g. "airline") | tool_args, message, confidence | Domain must exist in self._state.vendor_states. |
| SUBMIT | confidence ∈ [0.0, 1.0] | tool_name, tool_args | message optional (final user-facing reply); rationale optional, ≤ 200 chars. |
| ABORT | (none) | all of tool_name, tool_args, confidence | message optional. |
rationale, if present, is always bounded to 200 chars (matches models.md §3.5). Longer → InvalidActionError.
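The matrix above lends itself to a table-driven check. The following is a minimal sketch, not the spec'd implementation: `Action` is a hypothetical stand-in for DriftCallAction, the field names are assumed from the table, and the state-dependent checks (tool availability, known domain, JSON-serializability) are deliberately left out.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


class InvalidActionError(Exception):
    """Raised on any violation of the required/forbidden matrix."""


@dataclass(frozen=True)
class Action:  # hypothetical stand-in for DriftCallAction
    action_type: str
    tool_name: str | None = None
    tool_args: dict[str, Any] | None = None
    message: str | None = None
    confidence: float | None = None
    rationale: str | None = None


# action_type -> (required fields, forbidden fields), transcribed from the table.
MATRIX = {
    "TOOL_CALL": (("tool_name", "tool_args"), ("message", "confidence")),
    "SPEAK": (("message",), ("tool_name", "tool_args", "confidence")),
    "CLARIFY": (("message",), ("tool_name", "tool_args", "confidence")),
    "PROBE_SCHEMA": (("tool_name",), ("tool_args", "message", "confidence")),
    "SUBMIT": (("confidence",), ("tool_name", "tool_args")),
    "ABORT": ((), ("tool_name", "tool_args", "confidence")),
}


def validate_action(action: Action) -> None:
    """Pure check: raises InvalidActionError, never touches any state."""
    if action.action_type not in MATRIX:
        raise InvalidActionError(f"unknown action_type {action.action_type!r}")
    required, forbidden = MATRIX[action.action_type]
    for name in required:
        if getattr(action, name) is None:
            raise InvalidActionError(f"{action.action_type} requires {name}")
    for name in forbidden:
        if getattr(action, name) is not None:
            raise InvalidActionError(f"{action.action_type} forbids {name}")
    if action.action_type in ("SPEAK", "CLARIFY"):
        msg = action.message
        if not 1 <= len(msg) <= 2000 or "\x00" in msg:
            raise InvalidActionError("message must be 1..2000 chars, no NUL")
        try:
            msg.encode("utf-8")  # rejects lone surrogates
        except UnicodeEncodeError as exc:
            raise InvalidActionError("message is not valid UTF-8") from exc
    if action.action_type == "SUBMIT" and not 0.0 <= action.confidence <= 1.0:
        raise InvalidActionError("confidence must be in [0.0, 1.0]")
    if action.rationale is not None and len(action.rationale) > 200:
        raise InvalidActionError("rationale exceeds 200 chars")
```

Keeping the matrix as data means a new action type only needs a new row, and the function stays free of side effects, which is what the purity requirement in §2.3 step 1a asks for.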
3.2 Budget & max_turns
Derived in reset() from curriculum_stage (Table A, DESIGN.md §4.5):
| Stage | max_turns |
|---|---|
| 1 | 8 |
| 2 | 12 |
| 3 | 16 |
Override only via config.max_turns_override (tests only).
DriftCallObservation.budget_remaining = max_turns - turn. When turn >= max_turns after a non-terminal action, termination fires (§3.5).
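As a worked instance of Table A and the budget rule, here is a small sketch. The helper names (`resolve_max_turns`, `budget_remaining`, `is_timeout`) are hypothetical and chosen only for illustration.

```python
from __future__ import annotations

# Table A: curriculum stage -> max_turns.
MAX_TURNS_BY_STAGE = {1: 8, 2: 12, 3: 16}


def resolve_max_turns(stage: int, override: int | None = None) -> int:
    """Use the test-only override when given, else look up Table A."""
    return override if override is not None else MAX_TURNS_BY_STAGE[stage]


def budget_remaining(max_turns: int, turn: int) -> int:
    return max_turns - turn


def is_timeout(max_turns: int, turn: int) -> bool:
    """Budget check applied after a non-terminal action."""
    return turn >= max_turns
```

For a stage-2 episode this gives a budget of 12 at turn 0, 1 remaining after turn 11, and a TIMEOUT once turn 12 is reached.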
3.3 Drift firing order and side channels
Within a single step(), drift application occurs before the agent's action is dispatched (drift_injector.md §3.1). This has three consequences the env MUST enforce:
- Deterministic fold order. If two drifts are scheduled for the same turn (possible in Stage 3 only), they apply in (turn asc, pattern_id asc) order. This is env-enforced; drift_injector does not sort.
- force_drift_pattern override (manual drift firing). When step() is called with force_drift_pattern=<pattern_id>:
  - The forced pattern is resolved against drift_injector.DRIFT_PATTERN_IDS; unknown ids raise InvalidActionError BEFORE any state mutation.
  - Exactly ONE DriftEvent is synthesized for turn_current (not read from drift_schedule) and applied via apply_drift.
  - Any schedule-driven drifts that would have fired on turn_current are SUPPRESSED for this step and do NOT carry over to a later turn: judge intent overrides RNG (see deploy_demo_space.md §3.8, §7.3).
  - The synthesized event is appended to state.drift_fired with from_version/to_version taken from the pattern's catalogue entry; drift_log on the returned observation reflects it exactly like any scheduled drift. The source (manual vs scheduled) is NOT encoded in DriftEvent; the demo's trace panel decorates the row with actor="drift", action_or_event="manual:<pattern_id>" out-of-band (deploy_demo_space.md §3.8).
- Side-channel notices (drift_injector §3.4 side_channel_notice_append): the vendor stashes the notice on VendorState.side_channel_notice. At the top of the next step() (not the same one that fired the drift), the env calls emit_side_channel_if_pending per mutated domain and, when the agent's action dispatches a tool on that domain, attaches the returned notice to ToolResult.response["_notice"]. One-shot: the same notice never appears on two ToolResults. If no tool on that domain is called, the notice remains pending until one is (or the episode ends, in which case it is recorded on the final Episode.vendor_states_final and the reward pipeline may read it via R2 detection hints).
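The fold order and the force_drift_pattern override can be sketched as follows. This is an illustration under stated assumptions: `Drift` and `State` are hypothetical stand-ins for DriftEvent and DriftCallState, the stub `apply_drift` only records the event (the real one also mutates vendor state), and every pattern id except airline.price_rename is made up.

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from functools import reduce


@dataclass(frozen=True)
class Drift:  # hypothetical stand-in for DriftEvent
    turn: int
    pattern_id: str


@dataclass(frozen=True)
class State:  # hypothetical stand-in for DriftCallState
    drift_fired: tuple[Drift, ...] = ()


def apply_drift(state: State, drift: Drift) -> State:
    """Stub: returns a new frozen state with the event appended."""
    return replace(state, drift_fired=state.drift_fired + (drift,))


def fire_drifts(
    state: State,
    schedule: tuple[Drift, ...],
    turn_current: int,
    force_drift_pattern: str | None = None,
) -> State:
    if force_drift_pattern is not None:
        # Forced pattern: exactly one synthesized event; scheduled ones are suppressed.
        pending = [Drift(turn_current, force_drift_pattern)]
    else:
        # The env sorts, because drift_injector does not.
        pending = sorted(
            (d for d in schedule if d.turn == turn_current),
            key=lambda d: (d.turn, d.pattern_id),
        )
    return reduce(apply_drift, pending, state)
```

Using `reduce` over a sorted list keeps each intermediate state frozen and makes the result independent of the order in which the schedule happened to be built.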
3.4 Observation builder (_build_observation)
A pure helper that projects DriftCallState → DriftCallObservation for the agent. Rules:
- turn, goal, budget_remaining = max_turns - turn come straight from state.
- tool_results = full episode history, not just the latest (DESIGN.md §4.1). Immutable tuple.
- drift_log = state.drift_fired as-is. The env does NOT hide drifts that have already fired from the agent; drift_schedule (the unfired ones) is NEVER exposed, since that would leak future information.
- available_tools = union of every vendor's TOOLS constant whose domain is relevant to the goal, PLUS "payment.charge" and "payment.refund" (payment is cross-cutting). The set is fixed for the episode (DESIGN.md §4.1). Drift may change field shapes but never adds/removes a tool name.
- last_transcript / last_lang / last_confidence:
  - Turn 0 (reset): = (goal.seed_utterance, goal.language, 1.0), the user's opening utterance, deterministic.
  - After a SPEAK/CLARIFY agent action: no new user utterance exists; the fields carry forward from the previous turn (they describe the user's latest message, not the agent's).
  - When audio pipeline is enabled (§3.7): after the env's sim-caller replies to a CLARIFY, last_transcript/last_lang/last_confidence are updated from the ASR output.
3.5 Termination
Termination sets state.done = True and invokes rewards.compute_rewards exactly once. terminated_by is one of:
| Value | Trigger |
|---|---|
| "SUBMIT" | Agent issued SUBMIT. Reward reflects full R1…R5. |
| "ABORT" | Agent issued ABORT. R1 forced to 0; R2…R5 still computed for diagnostics. |
| "TIMEOUT" | turn >= max_turns after a non-terminal action. R1 forced to 0. |
| "ANTI_HACK" | Triggered by ≥3 consecutive validation failures (tracked by an outer anti-hack handler in the caller, not the env itself) OR by state-corruption attempts detected by R5 (DESIGN.md §4.4, §7.1 R5). A single InvalidActionError raise from _validate_action is pure: it does NOT terminate the episode, does NOT record the action, and does NOT increment the turn counter. State-corruption-path ANTI_HACK termination records no ToolResult. |
Termination is atomic within a step(): either the step commits a terminal transition (and rewards are computed) or it raises before any state change. There is no partial termination.
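The caller-side anti-hack handler is not specified in this doc, so the following is only one possible shape for it. `AntiHackCounter` and `run_actions` are hypothetical names, and how the caller then marks the episode as terminated_by="ANTI_HACK" is left open here, as it is in the text.

```python
from __future__ import annotations

from typing import Callable, Iterable


class InvalidActionError(Exception):
    """Stand-in for the env's validation error."""


class AntiHackCounter:
    """Caller-side tracker of consecutive validation failures."""

    def __init__(self, limit: int = 3) -> None:
        self.limit = limit
        self.consecutive = 0

    def record_failure(self) -> bool:
        """Count one failure; return True once the limit is reached."""
        self.consecutive += 1
        return self.consecutive >= self.limit

    def record_success(self) -> None:
        """Any valid step resets the streak."""
        self.consecutive = 0


def run_actions(
    step: Callable[[object], object],
    actions: Iterable[object],
    counter: AntiHackCounter,
) -> str | None:
    """Feed actions to step(); return "ANTI_HACK" when the streak hits the limit."""
    for action in actions:
        try:
            step(action)
        except InvalidActionError:
            if counter.record_failure():
                return "ANTI_HACK"
        else:
            counter.record_success()
    return None
```

Because each failed step() leaves the env untouched, the counter is the only place where a streak of bad actions is visible, which is why it has to live outside the env.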
3.6 Terminal-only accessors
episode() / rewards() are valid only after done() == True. Episode (§4.3) is built by snapshotting the final state and flattening it into the shape rewards.md §2.4 expects. Reward computation is memoized: two calls to rewards() return the same object (not just equal).
3.7 Optional audio boundary
audio_boundary_enabled == False (default, training path) means the env is purely textual: DriftCallObservation.last_transcript carries the goal's seed_utterance on turn 0 and a deterministic simulated user reply whenever the agent CLARIFYs (drawn from a scripted responder keyed on (seed, turn); see audio.md §3.1 non-training path).
audio_boundary_enabled == True (deployed env Space path) means:
- On reset(), the env uses tts_engine.synthesize(goal.seed_utterance, goal.language) to produce the opening user audio for display/demo, but the canonical last_transcript is still the GoalSpec.seed_utterance. TTS is a sim-caller convenience, not a new source of truth.
- On CLARIFY, the env's sim-caller composes a text reply, runs tts_engine.synthesize, then asr_engine.transcribe, and uses the ASR TranscriptResult.text/language/confidence to populate the next observation's last_* fields. This intentionally round-trips through audio to mirror production noise (DESIGN.md §9.4).
- On SPEAK, the env optionally synthesizes TTS for the agent's reply. This is emitted on a side channel (e.g., FastAPI SSE) but does not enter the state.
The audio pipeline is a post-observation wrapper. The env NEVER feeds TTS bytes back into the RL loop; reward computation is 100% textual (DESIGN.md §7). Audio disabled → no import driftcall.audio anywhere in the training call graph (audio.md §6.3 import firewall).
3.8 Immutability & state evolution
Every DriftCallState transition goes through dataclasses.replace. The env holds one self._state reference; it is reassigned to a new frozen object on every transition. No vendor state, no drift log, no action tuple is ever mutated in place. The tests/test_env.py property test asserts id(prev_state.actions) != id(next_state.actions) whenever len(next.actions) > len(prev.actions).
The env's own instance state fields:
- self._config: EnvConfig (frozen, set at __init__)
- self._state: DriftCallState | None (reassigned each step)
- self._rewards: Rewards | None (set once at termination)
- self._episode: Episode | None (set once at termination)
- self._closed: bool
- self._seed: int | None
- self._episode_id: str | None
No other mutable attributes, apart from the pending side-channel cache self._side_channel_pending (§4.2) and the _step_in_progress reentrancy flag (§5 E7).
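The replace-and-reassign pattern can be illustrated with a toy state. `State` here is a hypothetical two-field stand-in for DriftCallState, just enough to show the identity property the test suite is said to assert.

```python
from __future__ import annotations

from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class State:  # hypothetical stand-in for DriftCallState
    turn: int = 0
    actions: tuple[str, ...] = ()


prev_state = State()
# A transition builds a new frozen object; the old one is left untouched.
next_state = replace(
    prev_state,
    turn=prev_state.turn + 1,
    actions=prev_state.actions + ("TOOL_CALL",),
)

try:
    prev_state.turn = 5  # in-place mutation is rejected by the frozen dataclass
    mutated_in_place = True
except FrozenInstanceError:
    mutated_in_place = False
```

Tuple concatenation always allocates a new tuple when it grows, so the old and new `actions` fields are distinct objects and a rollout can safely keep references to earlier states.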
3.9 Concurrency
A DriftCallEnv instance is single-session. The FastAPI shim in app.py creates one instance per session (pooled by episode_id). Sharing one instance across threads is undefined behavior (no locks, state reassignment is not atomic). This matches DESIGN.md §11.1 (env Space spawns one env per request).
4. Data structures
4.1 EnvConfig (frozen, env-internal)
from dataclasses import dataclass
from typing import Literal


@dataclass(frozen=True)
class EnvConfig:
    curriculum_stage: Literal[1, 2, 3]
    language_weights: dict[LanguageCode, float]  # key-copied to dict in __init__
    audio_boundary_enabled: bool
    max_turns_override: int | None
    scheduler: DriftScheduler                    # drift_injector.DriftScheduler
    tts_engine: TTSEngine | None                 # audio.TTSEngine
    asr_engine: ASREngine | None                 # audio.ASREngine

Construction is an explicit EnvConfig.from_mapping(config_dict) -> EnvConfig classmethod that raises InvalidConfigError on any key not in the table above, on type mismatches, or on inconsistent field combinations (e.g., audio_boundary_enabled=True with tts_engine=None). The dict fed into __init__ is never stored directly; it is validated and frozen.
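A reduced sketch of what such a from_mapping classmethod could look like is shown below. It is an assumption-laden illustration: `EnvConfigSketch` covers only five of the seven keys (max_turns_override and scheduler are omitted), engines are typed as `Any`, and type-mismatch checks are not implemented.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Mapping


class InvalidConfigError(Exception):
    """Raised on any malformed config."""


DEFAULT_WEIGHTS = {"en": 0.4, "hinglish": 0.4, "hi": 0.1, "ta": 0.05, "kn": 0.05}
ALLOWED_KEYS = {
    "curriculum_stage", "language_weights", "audio_boundary_enabled",
    "tts_engine", "asr_engine",
}


@dataclass(frozen=True)
class EnvConfigSketch:  # hypothetical, reduced version of EnvConfig
    curriculum_stage: int = 1
    language_weights: Mapping[str, float] = field(
        default_factory=lambda: dict(DEFAULT_WEIGHTS)
    )
    audio_boundary_enabled: bool = False
    tts_engine: Any = None
    asr_engine: Any = None

    @classmethod
    def from_mapping(cls, config: Mapping[str, Any] | None) -> EnvConfigSketch:
        raw = dict(config or {})
        unknown = set(raw) - ALLOWED_KEYS
        if unknown:
            raise InvalidConfigError(f"unknown keys: {sorted(unknown)}")
        if "language_weights" in raw:
            raw["language_weights"] = dict(raw["language_weights"])  # defensive copy
        cfg = cls(**raw)
        if cfg.curriculum_stage not in (1, 2, 3):
            raise InvalidConfigError("curriculum_stage must be 1, 2, or 3")
        weights = cfg.language_weights.values()
        if any(w < 0 for w in weights) or abs(sum(weights) - 1.0) > 1e-6:
            raise InvalidConfigError("weights must be >= 0 and sum to 1.0")
        both_set = cfg.tts_engine is not None and cfg.asr_engine is not None
        both_none = cfg.tts_engine is None and cfg.asr_engine is None
        if cfg.audio_boundary_enabled and not both_set:
            raise InvalidConfigError("audio enabled requires TTS and ASR engines")
        if not cfg.audio_boundary_enabled and not both_none:
            raise InvalidConfigError("engines must be None when audio is disabled")
        return cfg
```

Copying the caller's dict before freezing means later edits to the original mapping cannot leak into the env, which matches the rule that the dict passed to __init__ is never stored directly.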
4.2 Internal session state
The env holds exactly one live DriftCallState reference (self._state). Every field inside is frozen or a tuple of frozen objects. self._side_channel_pending: dict[str, str] caches pending notices keyed by domain (pure dict, module-private, reassigned on each emit/consume); see §3.3.
There is no rolling log buffer separate from state; state.actions, state.drift_fired, and the per-turn tool_results tuple in the observation are the only record.
4.3 Episode construction (terminal-only)
At termination the env synthesizes the Episode object (rewards.md §2.4) by:
Episode(
    episode_id=self._episode_id,
    goal=self._state.goal,
    actions=self._state.actions,
    tool_results=observation_tool_results_at_termination,  # full tuple
    drift_log=self._state.drift_fired,
    vendor_states_final={
        d: dataclasses.asdict(vs)
        for d, vs in self._state.vendor_states.items()
    },
    schema_versions_final=dict(self._state.schema_versions),
    max_turns=self._state.max_turns,
    turns_used=len(self._state.actions),
    terminated_by=terminated_by,  # one of "SUBMIT" | "ABORT" | "TIMEOUT" | "ANTI_HACK"
    stage=self._config.curriculum_stage,
)
vendor_states_final is a snapshot dict (not the frozen VendorState dataclass), produced by dataclasses.asdict; this matches rewards.md §2.4's dict[str, dict[str, Any]].
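To show what the dataclasses.asdict snapshot produces, here is a small example. `VendorStateSketch` and its two fields are hypothetical; the real VendorState shape is defined in vendors.md.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class VendorStateSketch:  # hypothetical stand-in for a frozen VendorState
    schema_version: str
    bookings: tuple[str, ...] = ()


vendor_states = {"airline": VendorStateSketch("v2", ("PNR1",))}

# Same comprehension as in the Episode construction above.
vendor_states_final = {d: asdict(vs) for d, vs in vendor_states.items()}
```

`asdict` recurses into nested dataclasses and rebuilds containers, so the snapshot is a plain dict of dicts that no longer shares the frozen dataclass instances with the live state.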
4.4 Frozen invariants (checked by property tests)
- DriftCallEnv has no __dataclass_params__: it is not a dataclass, and instance attrs are the only mutation surface.
- Every self._state transition produces a new object: prev is not next.
- self._rewards and self._episode are write-once.
5. Error modes
Every error is a typed exception from driftcall.env.errors. All extend DriftCallEnvError (which extends Exception). No bare raise anywhere.
| # | Exception | Raised from | Condition |
|---|---|---|---|
| E1 | InvalidConfigError | __init__ / EnvConfig.from_mapping | Unknown key, wrong type, weights don't sum to 1, audio_boundary_enabled=True with missing TTS/ASR engine. |
| E2 | EnvNotReadyError | step, state, episode, rewards | Called before reset(). |
| E3 | EnvClosedError | reset, step | Called after close(). |
| E4 | InvalidActionError | step._validate_action | Any §3.1 validation failure. Pure exception, raised BEFORE any state mutation: no turn increment, no action stored, no termination marker set. The env remains in its pre-step state and is still valid for a subsequent step() call. Repeated malformed actions (≥3 consecutive) are the concern of an outer anti-hack handler in the caller, which may then terminate the episode with terminated_by="ANTI_HACK" via a separate mechanism (see §3.5). |
| E5 | EpisodeAlreadyTerminalError | step | self._state.done == True when step called. |
| E6 | EpisodeNotTerminalError | episode, rewards | Called before termination. |
| E7 | ConcurrentStepError | step | Reentrant call detected via a per-instance _step_in_progress flag; single-threaded usage is a hard invariant (§3.9). |
| E8 | UnknownDomainError | step (PROBE_SCHEMA), _build_observation | Agent named a vendor domain not present in state. |
| E9 | UnknownToolError | step (TOOL_CALL) | tool_name not in self._available_tools(). Treated as anti-hack (same path as E4). |
| E10 | DriftInjectionError | step (during fold) | Propagated from drift_injector.apply_drift; not caught. Indicates an internal invariant violation, not an agent error. |
| E11 | RewardComputationError | step (termination) | Propagated from rewards.compute_rewards if Episode is malformed; this is a bug in env, never silently swallowed. |
| E12 | AudioPipelineError | step / reset | Only when audio_boundary_enabled=True and the engine raises. Surfaced as-is; episode does NOT terminate unless audio fails on reset() (then E1-style reset failure). |
Never caught, never wrapped: any exception not in the table escapes as-is. Silent failure is a design-doc violation (CLAUDE.md §0, DESIGN.md §7).
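E7's reentrancy flag could be implemented along these lines. This is a sketch only: `GuardedEnv` is a hypothetical wrapper, and its `step` takes a callable instead of an action so that a reentrant call can be simulated without the rest of the env.

```python
from __future__ import annotations

from typing import Callable, TypeVar

T = TypeVar("T")


class ConcurrentStepError(Exception):
    """Raised when step() is entered while another step() is running."""


class GuardedEnv:  # hypothetical; only the guard logic of E7 is shown
    def __init__(self) -> None:
        self._step_in_progress = False

    def step(self, body: Callable[[], T]) -> T:
        if self._step_in_progress:
            raise ConcurrentStepError("step() is not reentrant")
        self._step_in_progress = True
        try:
            return body()
        finally:
            # Always clear the flag, even if the body raised.
            self._step_in_progress = False
```

A plain boolean is enough to catch reentrancy on one thread; it is not a lock and does not make cross-thread sharing safe, which §3.9 declares undefined behavior anyway.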
6. Dependencies
6.1 Upstream (imports from)
- driftcall.models: all frozen dataclasses and ActionType.
- driftcall.task_generator: generate(seed, stage, language_weights) (task_generator.md §2 generate).
- driftcall.drift_injector: build_schedule, apply_drift, DriftScheduler protocol, list of drift exceptions.
- driftcall.vendors.{airline,cab,restaurant,hotel,payment}: the 5 vendor modules, each exposing dispatch, initial_state, apply_schema_mutation, describe_schema, emit_side_channel_if_pending, TOOLS (vendors.md §2.1 six-method interface).
- driftcall.rewards: compute_rewards(episode) -> Rewards. Imported at module top; NOT lazy.
- driftcall.audio.tts_kokoro, driftcall.audio.asr_whisper: imported lazily, guarded by if self._config.audio_boundary_enabled: to preserve the training-path import firewall (audio.md §6.3).
- dataclasses, uuid, os (for os.urandom), typing: stdlib.
6.2 Downstream (consumed by)
- driftcall/app.py: FastAPI OpenEnv server; owns the env pool (one instance per session).
- training/train_grpo.py: via openenv.client.EnvClient pointed at a local uvicorn process.
- training/eval_baseline.py, training/eval_final.py: same path.
- demo/app_gradio.py: for interactive demos.
- tests/test_env.py, tests/test_e2e.py: direct instantiation.
6.3 Prohibited dependencies
- No LLM / HTTP calls. The env composes deterministic, in-process modules only.
- No direct disk I/O beyond what task_generator.load_templates and drift_injector.list_patterns already own.
- No threading, no asyncio: the env is synchronous; concurrency is the caller's concern.
- No mutable global state. All config is per-instance.
7. Edge cases
7.1 Action issued before reset()
env = DriftCallEnv(config)
env.step(some_action) # raises E2 EnvNotReadyError
Same for state(), episode(), rewards(). done() returns False (not an error: "no episode" implies "not done").
7.2 Double SUBMIT (agent emits SUBMIT twice)
First SUBMIT terminates normally. Second step raises E5 EpisodeAlreadyTerminalError; the env does NOT silently ignore the second action. Callers MUST check done() between steps. This is intentional: in training the rollout loop always stops at done=True; anything else is a bug upstream.
7.3 ABORT mid-turn (after some tool calls)
Legal. ABORT always terminates with terminated_by="ABORT", R1=0.0 (forced by rewards.md §3.2). R2…R5 are still computed over the partial episode, so the training signal reflects what the agent did do before giving up. confidence is not accepted on ABORT: models.md §3.5 specifies confidence is for SUBMIT only, and the §3.1 table forbids it on ABORT.
7.4 Drift scheduled on a turn that is also terminal (TIMEOUT race)
Pre-condition: drift_schedule placement rules (drift_injector §3.2) exclude the last 3 turns for stage 2 and the last 3 for stage 3, so this shouldn't arise from build_schedule. But a scripted test scheduler could inject one.
Resolution: drift fires FIRST (step 3 in §2.3), then the agent's action is dispatched, then the budget check runs. So for a drift on turn == max_turns (the last turn that can run) with the agent issuing TOOL_CALL that turn: the tool sees the new schema, the ToolResult is recorded, then the budget check fires TIMEOUT because turn >= max_turns. The drift is recorded in drift_fired and the Episode.
If a scripted scheduler places a drift on turn > max_turns, build_schedule's own guard (drift_injector.md §5) raises DriftScheduleError at reset(). The env surfaces this as an E1-class reset failure.
7.5 Audio pipeline disabled but config passes tts_engine
EnvConfig.from_mapping raises E1 InvalidConfigError ("tts_engine must be None when audio_boundary_enabled is False"). Fail loud at construction; never silently drop.
7.6 Same tool called twice in a row with identical args
Legal. Vendors are pure; same (vendor_state, args, seed) → identical ToolResult. Both results appear in the history tuple. The env does not de-duplicate. R4 (format compliance, rewards.md §3.5) may penalize for wasted turns, but that is the reward layer's call, not the env's.
7.7 PROBE_SCHEMA on a domain with no prior interaction
Legal. describe_schema(vendor_state, current_version) is defined for any (vendor, v1|v2|v3) pair. PROBE_SCHEMA produces a ToolResult with status="ok", tool_name="probe:<domain>", response=describe_schema(...), schema_version=current, latency_ms=0 (probes are instantaneous). It counts against the budget like any other action (DESIGN.md §4.3).
7.8 SUBMIT with out-of-range confidence
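Caught by _validate_action: confidence must be a float in [0.0, 1.0]. Outside that range → E4 InvalidActionError. The raise is pure and leaves the episode running; it counts toward the caller's anti-hack streak, and only ≥3 consecutive failures trigger ANTI_HACK termination (§3.5).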
7.9 Seed collision across concurrent episodes
Not an env concern: each DriftCallEnv instance is a single episode and holds its own self._seed. Two instances created with the same seed produce identical trajectories by design (reproducibility). Callers that want distinct episodes must supply distinct seeds.
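The seed rule from §2.2 and the reproducibility claim here can be sketched together. Both helpers are hypothetical: `resolve_seed` assumes the 8 random bytes are read as a big-endian unsigned integer (the doc does not fix a byte order), and `sample_episode` merely stands in for the seeded generators by drawing from one seeded RNG.

```python
from __future__ import annotations

import os
import random


def resolve_seed(seed: int | None) -> int:
    """Return the caller's seed, or draw a fresh 64-bit one."""
    if seed is not None:
        return seed
    # Assumption: big-endian unsigned interpretation of os.urandom(8).
    return int.from_bytes(os.urandom(8), "big")


def sample_episode(seed: int) -> tuple[int, float]:
    """Stand-in for the seeded generators: every draw comes from one seeded RNG."""
    rng = random.Random(seed)
    return rng.randrange(3), rng.random()


first = sample_episode(resolve_seed(42))
second = sample_episode(resolve_seed(42))
fresh_seed = resolve_seed(None)
```

Two calls with seed 42 yield equal samples, while `fresh_seed` differs from run to run; storing it on the instance is what lets a seedless episode be replayed later.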
7.10 Vendor dispatch returns a ToolResult for a drift-renamed field
Vendors' dispatch already handles this: the vendor reads schema_versions[domain] and returns the schema's current field names. The env does not post-process ToolResult.response; it is passed verbatim to the observation so the agent sees the actual post-drift shape. R2's detection hints (drift_injector DriftPattern.detection_hints) are the contract for the reward layer.
7.11 Closed env still queried
After close(), reset and step raise E3 EnvClosedError. done() returns whatever it returned before close (sticky). state(), episode(), rewards() return the cached terminal objects if termination happened before close, so callers can still audit a finished episode post-close.
8. Examples
8.1 Stage-1 happy path (no drift, SUBMIT with full credit)
Stage 1: English airline booking, no drift, agent completes in 5 turns.
from driftcall.env import DriftCallEnv
from driftcall.models import DriftCallAction, ActionType

env = DriftCallEnv({"curriculum_stage": 1})  # audio disabled (default)
obs = env.reset(seed=42)
# obs.turn == 0
# obs.goal.domain == "airline"        (example; seed-dependent)
# obs.goal.language == "en"           (seed+weights dependent)
# obs.drift_log == ()                 (stage 1)
# obs.budget_remaining == 8           (stage 1)
# obs.last_transcript == obs.goal.seed_utterance

obs = env.step(DriftCallAction(
    action_type=ActionType.TOOL_CALL,
    tool_name="airline.search",
    tool_args={"from_": "HYD", "to": "BLR", "date": "2026-04-25"},
))
# obs.turn == 1
# len(obs.tool_results) == 1, tool_results[0].status == "ok"
# obs.drift_log == ()                 (still none)

# ... 3 more tool calls to book + confirm ...

obs = env.step(DriftCallAction(
    action_type=ActionType.SUBMIT,
    confidence=0.9,
    message="Booking confirmed, PNR X3K9Z, total ₹7200.",
))
# env.done() == True
# env.rewards().r1 == 1.0             (goal satisfied)
# env.rewards().r2 == 0.5             (stage 1: R2 neutral)
# env.rewards().reward ∈ [0.85, 1.0]  (quality minus tiny Brier)
# env.episode().terminated_by == "SUBMIT"
# env.episode().turns_used == 5
8.2 Stage-2 drift detected and adapted
Stage 2 airline with price_rename drift at turn 3. Agent detects the drift on turn 4 via a SPEAK that mentions the rename, then adapts.
env = DriftCallEnv({"curriculum_stage": 2})
obs = env.reset(seed=7)
# obs.goal.domain == "airline"
# state.drift_schedule has length 1; e.g. turn=3, pattern_id="airline.price_rename"

# Turn 1: search (pre-drift), returns schema v1 with "price" field
obs = env.step(DriftCallAction(
    action_type=ActionType.TOOL_CALL,
    tool_name="airline.search",
    tool_args={"from_": "HYD", "to": "BLR", "date": "2026-04-25"},
))

# Turn 2: agent picks a flight (still v1); the step() call is omitted here

# Turn 3: DRIFT FIRES at start of step, schema moves to v2;
#         the agent's turn-3 action sees the new schema already.
obs = env.step(DriftCallAction(
    action_type=ActionType.TOOL_CALL,
    tool_name="airline.search",
    tool_args={"from_": "HYD", "to": "BLR", "date": "2026-04-25"},
))
# obs.drift_log == (DriftEvent(turn=3, drift_type="schema", domain="airline",
#                   pattern_id="airline.price_rename", from_version="v1",
#                   to_version="v2", description="price renamed to total_fare_inr"),)
# obs.tool_results[-1].response uses "total_fare_inr" not "price"

# Turn 4: agent announces the rename (R2 detection window = [3, 5], mention hits)
obs = env.step(DriftCallAction(
    action_type=ActionType.SPEAK,
    message="Note: the price field was renamed to total_fare_inr.",
))

# Turn 5: adapted tool_args, book with new field awareness
obs = env.step(DriftCallAction(
    action_type=ActionType.TOOL_CALL,
    tool_name="airline.book",
    tool_args={"flight_id": "6E-2345", "payment_token": "tok_v1"},
))

# Turn 6: submit
obs = env.step(DriftCallAction(
    action_type=ActionType.SUBMIT,
    confidence=0.8,
    message="Booked with updated pricing.",
))
# env.rewards().r1 == 1.0
# env.rewards().r2 == 1.0  (drift detected within 2-turn window, mention in SPEAK)
# env.rewards().reward ≈ 0.90
8.3 Stage-3 compound drift, TIMEOUT
Stage 3, two drifts: airline policy at turn 3, payment auth at turn 9. Agent handles the first but runs out of budget on the second.
env = DriftCallEnv({"curriculum_stage": 3})
obs = env.reset(seed=2026)
# state.drift_schedule has length 2
# obs.budget_remaining == 16
# ... turns 1-8 consumed detecting and adapting to first drift ...
# turn 9: payment auth drift fires; old token_v1 returns 401 on payment.charge
# turns 10-15: agent probing and retrying, accumulating 6 actions
# turn 16: budget exhausted → TIMEOUT
# Final step (turn 16): agent attempted another payment.charge, TIMEOUT fires
# after action dispatch because turn == max_turns.
assert env.done()
assert env.episode().terminated_by == "TIMEOUT"
assert env.rewards().r1 == 0.0 # TIMEOUT forces R1=0
assert env.rewards().r2 in (0.5, 1.0) # one of two drifts may still have been detected
assert env.rewards().reward < 0.3 # quality dominated by r3, r4; brier small
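The ordering described in the comments above (the turn-16 action is dispatched first, then TIMEOUT fires because turn == max_turns) can be sketched as follows. BudgetSketch is a hypothetical stand-in, not the real DriftCallEnv, and the refusal to step after termination is an assumption of this sketch.

```python
class BudgetSketch:
    """Hypothetical turn accounting only; the real env does much more per step."""

    def __init__(self, max_turns=16):
        self.max_turns = max_turns
        self.turn = 0
        self.terminated_by = None

    @property
    def budget_remaining(self):
        return self.max_turns - self.turn

    def step(self, dispatch):
        # Assumption: stepping a terminated episode is an error.
        if self.terminated_by is not None:
            raise RuntimeError("episode already terminated")
        self.turn += 1
        result = dispatch()  # the action still runs on the final turn
        if self.turn == self.max_turns:
            self.terminated_by = "TIMEOUT"  # checked after dispatch
        return result


budget = BudgetSketch(max_turns=16)
calls = []
for _ in range(15):
    budget.step(lambda: calls.append("payment.charge"))
done_before_last = budget.terminated_by is not None

# Turn 16: the call is dispatched, then the budget check ends the episode.
budget.step(lambda: calls.append("payment.charge"))
```

In this sketch the sixteenth action is recorded before the episode ends, which matches the "after action dispatch" wording in the example.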
8.4 Audio-boundary-enabled deployed Space
from driftcall.audio.tts_kokoro import get_tts_engine
from driftcall.audio.asr_whisper import get_asr_engine
env = DriftCallEnv({
    "curriculum_stage": 1,
    "audio_boundary_enabled": True,
    "tts_engine": get_tts_engine(),
    "asr_engine": get_asr_engine(),
})
obs = env.reset(seed=11)
# obs.last_transcript is still the textual seed_utterance (canonical source);
# the TTS audio is emitted on a side channel for UI rendering.
# Agent asks a clarifying question
obs = env.step(DriftCallAction(action_type=ActionType.CLARIFY,
                               message="When do you want to depart, morning or evening?"))
# env synthesizes a sim-caller reply ("shaam ko, 7 baje"), TTS→ASR round-trips,
# and obs.last_transcript/last_lang/last_confidence now reflect the ASR result.
# last_confidence may be < 1.0 due to whisper uncertainty.
9. Open questions
Sim-caller responder for CLARIFY in text-only mode. This doc specifies "deterministic scripted responder keyed on (seed, turn)" but the responder is not yet specified anywhere. Where does it live? Options: (a) an internal helper in env.py, (b) a new driftcall.simcaller module, (c) a field on the template in task_generator. Proposal: keep it in env.py as a private helper; it is env-internal plumbing and does not need a module boundary. To be resolved at critic gate.
PROBE_SCHEMA tool_name vs domain. DESIGN.md §4.1 shows tool_name for PROBE_SCHEMA, but semantically the action names a domain, not a tool. This doc uses the convention "PROBE_SCHEMA's tool_name holds the domain string (e.g., 'airline')". Alternative: introduce an explicit domain field on DriftCallAction. Pick one and reflect in models.md. Proposal: keep tool_name carrying the domain, add a models.md clarifying note. Cheaper than a schema change.
Side-channel notice attachment ordering. If two drifts on the same domain both stash notices (stage 3 edge case), does the agent see both concatenated, or only the first, on the next matching tool call? This doc says "one-shot per call, carry-over until consumed"; need a concrete concatenation rule if both are pending. Proposal: concat with \n---\n separator, emit all pending notices at once, clear together. Resolve with drift_injector critic.
env.episode().vendor_states_final field-stability across refactors. Rewards consumes vendor_states_final as a raw dict[str, dict[str, Any]]. If a vendor later adds/removes a VendorState field, the Episode's shape changes. Do we need a versioned snapshot schema? Proposal: No; rewards reads only documented fields and uses .get(..., default) everywhere. Confirm with rewards critic.
Deterministic episode_id. Currently uuid4(), which is non-deterministic. This is intentional for audit uniqueness but means two reset(seed=42) calls produce different episode_ids. Is that acceptable to DESIGN.md §4.2's "Deterministic for reproducibility"? Proposal: Yes; the trajectory is deterministic, the audit ID is not. Same stance as rewards.md (episode_id is opaque). Flag for gate-D cross-doc review.
Thread-safety guard strength. E7 ConcurrentStepError requires a per-instance _step_in_progress bool. This catches re-entrancy but not two threads colliding outside the GIL (same bool is not atomic). Do we need threading.Lock? Proposal: No; §3.9 declares single-threaded contract; the bool is diagnostic, not an invariant. Note in code comment; do not add a lock (adds import, slows step).
Audio engine lifecycle on close(). TTS/ASR engines are singletons (audio.md §2.1 get_tts_engine / §2.2 get_asr_engine). If multiple envs share them, close() must NOT free the engine. Proposal: close() releases only per-env state (drops self._state, self._rewards, self._episode) and sets self._closed=True. Audio engines are process-global and outlive the env. Confirm at critic gate.
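The last two proposals (a diagnostic _step_in_progress bool and a close() that drops only per-env state) might look roughly like the sketch below. EnvLifecycleSketch is a hypothetical stand-in, not the real DriftCallEnv; the callable-action trick exists only to provoke re-entrancy, and raising on step() after close() is an assumption the proposals above do not settle.

```python
class ConcurrentStepError(RuntimeError):
    """Stand-in for the E7 error named in this doc."""


class EnvLifecycleSketch:
    """Hypothetical lifecycle skeleton; attribute names follow the proposals above."""

    def __init__(self, tts_engine=None):
        self._tts_engine = tts_engine  # shared and process-global per the proposal
        self._state = {"turn": 0}
        self._rewards = None
        self._episode = None
        self._step_in_progress = False
        self._closed = False

    def step(self, action):
        if self._closed:
            # Assumption: stepping a closed env is an error.
            raise RuntimeError("env is closed")
        if self._step_in_progress:
            # Diagnostic re-entrancy check only; a plain bool is not a lock.
            raise ConcurrentStepError("step() re-entered")
        self._step_in_progress = True
        try:
            self._state["turn"] += 1
            # Illustration only: a callable action lets a test re-enter step().
            return action(self) if callable(action) else action
        finally:
            self._step_in_progress = False

    def close(self):
        # Release per-env state only; the shared engine is deliberately not freed.
        self._state = None
        self._rewards = None
        self._episode = None
        self._closed = True


engine = object()  # placeholder for a shared TTS engine
env = EnvLifecycleSketch(tts_engine=engine)
try:
    env.step(lambda e: e.step("inner"))
    reentry_raised = False
except ConcurrentStepError:
    reentry_raised = True

after_guard = env.step("ok")  # the finally block reset the flag
env.close()
```

The bool catches same-thread re-entrancy as the proposal intends; it makes no claim about two threads racing, which is why the doc treats it as diagnostic rather than as an invariant.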