harsharajkumar273 commited on
Commit
7c2c5f2
·
1 Parent(s): 3493624

Merge V2 review and dry-run mechanics

Browse files
README.md CHANGED
@@ -13,33 +13,52 @@ tags:
13
 
14
  # CleanOps OpenEnv
15
 
16
- CleanOps is a real-world OpenEnv benchmark where an agent cleans operational
17
- tabular data from CRM, order, subscription, and payment pipelines. The agent
18
- must inspect tables, choose remediation operations, avoid destructive shortcuts,
19
- and submit a cleaned dataset scored by deterministic graders.
20
-
21
- This is intentionally not a game or toy task. It models the kind of operational
22
- data cleanup that sales ops, RevOps, support ops, and data platform teams perform
23
- before loading systems-of-record or analytics warehouses.
24
-
25
- ## Why This Environment Is Useful
26
-
27
- - Realistic domain: tabular data standardization, missing-value repair,
28
- deduplication, and referential integrity fixes.
29
- - Deterministic programmatic graders: every task returns a reproducible
30
- `0.0-1.0` score with interpretable components.
31
- - Dense reward shaping: reward is driven by score deltas, issue-count reduction,
32
- inspection bonuses, step costs, no-op penalties, and submission bonuses.
33
- - Curriculum-ready tasks: one easy, one medium, and one hard task with increasing
34
- schema complexity and cross-table dependencies.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35
 
36
  ## Task Suite
37
 
38
  | Task ID | Difficulty | Description |
39
  |---|---|---|
40
- | `customer_contacts_easy` | Easy | Clean a CRM contacts export by normalizing names/emails/phones/states, filling one missing state, and merging duplicate customers without dropping inactive accounts. |
41
- | `orders_reconciliation_medium` | Medium | Clean an e-commerce order extract by standardizing dates, currency, amounts, statuses, and shipping states while deduplicating repeated exports and preserving cancelled orders. |
42
- | `crm_migration_hard` | Hard | Repair a 3-table CRM migration extract by normalizing customer/subscription/payment fields, merging duplicate customer IDs, fixing foreign keys from email joins, and removing duplicate payment facts. |
43
 
44
  ## API
45
 
@@ -65,11 +84,10 @@ state = env.state()
65
  ### OpenEnv Server API
66
 
67
  ```bash
68
- cd /Users/harsharajkumar/Downloads/research_paper_simplifier-main/meta
69
  PYTHONPATH="$PWD" python -m server.app --host 0.0.0.0 --port 8000
70
  ```
71
 
72
- Then use the typed WebSocket client:
73
 
74
  ```python
75
  from cleanops_env import CleanOpsEnvClient, DataCleaningAction
@@ -92,30 +110,26 @@ with CleanOpsEnvClient(base_url="http://127.0.0.1:8000") as env:
92
 
93
  | Field | Type | Meaning |
94
  |---|---|---|
95
- | `action_type` | `"inspect_table" \| "inspect_operation" \| "apply_operation" \| "submit"` | Selects the action family. |
96
  | `table_name` | `str \| null` | Table to inspect when `action_type="inspect_table"`. |
97
  | `operation_id` | `str \| null` | Cleaning operation to inspect/apply. |
 
 
98
  | `reasoning` | `str` | Optional trace text used by baseline scripts. |
99
- | `metadata` | `dict` | OpenEnv metadata channel. |
100
 
101
  ## Observation Space
102
 
103
- `DataCleaningObservation` extends OpenEnv's typed `Observation` model and includes:
104
 
105
  | Field | Meaning |
106
  |---|---|
107
- | `task_id`, `task_title`, `difficulty`, `objective`, `dataset_context` | Task metadata and objective. |
108
  | `quality_score`, `best_score`, `grader` | Deterministic score and score decomposition. |
109
- | `remaining_steps`, `done`, `reward`, `reward_breakdown` | Episode and reward state. |
110
- | `table_summaries` | Compact per-table statistics and previews. |
111
- | `focus_table` | Full rows for the currently inspected table. |
112
- | `available_operations` | Typed catalog of cleaning actions and risk labels. |
113
- | `focus_operation` | Predicted row-level before/after diff for an inspected operation. |
114
  | `validation_issues`, `issue_cards` | Current rule failures and remediation hints. |
115
- | `recent_history`, `last_action_status`, `last_action_error`, `metadata` | Interaction trace and episode metadata. |
116
-
117
- `DataCleaningState` returns the current mutable tables, applied operations,
118
- inspection history, step count, and score state.
119
 
120
  ## Reward Function
121
 
@@ -123,26 +137,42 @@ Each step computes:
123
 
124
  ```text
125
  reward =
126
- 1.25 * score_delta
127
  + 0.35 * issue_count_delta
 
128
  + inspection_bonus
 
129
  + step_penalty
130
  + invalid_action_penalty
131
  + no_op_penalty
 
 
132
  + submit_bonus
133
  ```
134
 
135
- This gives partial progress credit throughout the trajectory and penalizes
136
- repeat/no-op actions, invalid operations, and low-quality premature submission.
 
 
 
 
 
 
 
 
 
 
 
 
137
 
138
  ## Grading
139
 
140
- Each task uses a deterministic grader that outputs a final score in `[0.0, 1.0]`
141
  from three components:
142
 
143
- - `cell_match_score`: exact canonicalized cell match against gold cleaned tables.
144
- - `key_recall_score`: entity/row identity quality after dedupe and row retention.
145
- - `validation_score`: fraction of unresolved data-quality checks eliminated.
146
 
147
  Final score:
148
 
@@ -153,7 +183,8 @@ Final score:
153
  ## Setup
154
 
155
  ```bash
156
- cd /Users/harsharajkumar/Downloads/research_paper_simplifier-main/meta
 
157
  python -m venv .venv
158
  source .venv/bin/activate
159
  pip install -e ".[dev]"
@@ -162,7 +193,6 @@ pip install -e ".[dev]"
162
  ## Validate
163
 
164
  ```bash
165
- cd /Users/harsharajkumar/Downloads/research_paper_simplifier-main/meta
166
  openenv validate --verbose
167
  pytest -q
168
  ```
@@ -188,92 +218,50 @@ Environment variables:
188
  | `LOCAL_IMAGE_NAME` | Optional local Docker image name used with `CleanOpsEnvClient.from_docker_image()`. |
189
  | `TASK_NAME` | Task to run, or `all` for all tasks. Defaults to `all`. |
190
 
191
- Example:
192
-
193
- ```bash
194
- cd /Users/harsharajkumar/Downloads/research_paper_simplifier-main/meta
195
- export API_BASE_URL="https://router.huggingface.co/v1"
196
- export MODEL_NAME="Qwen/Qwen2.5-72B-Instruct"
197
- export HF_TOKEN="..."
198
- PYTHONPATH="$PWD" python inference.py
199
- ```
200
-
201
  ## Baselines
202
 
203
  ### Deterministic Oracle Smoke Baseline
204
 
205
  ```bash
206
- cd /Users/harsharajkumar/Downloads/research_paper_simplifier-main/meta
207
  PYTHONPATH="$PWD" python scripts/run_oracle_smoke.py
208
  ```
209
 
210
- Expected scores measured locally:
211
 
212
  | Task ID | Score | Steps | Total Reward |
213
  |---|---:|---:|---:|
214
- | `customer_contacts_easy` | 1.0000 | 7 | 1.1430 |
215
- | `orders_reconciliation_medium` | 1.0000 | 6 | 1.0222 |
216
- | `crm_migration_hard` | 1.0000 | 8 | 1.0827 |
217
- | Mean | 1.0000 | - | - |
218
 
219
  ### OpenAI Baseline Agent
220
 
221
  ```bash
222
- cd /Users/harsharajkumar/Downloads/research_paper_simplifier-main/meta
223
  export OPENAI_API_KEY="..."
224
  export OPENAI_MODEL="gpt-4.1-mini"
225
  export OPENAI_SEED=7
226
  PYTHONPATH="$PWD" python scripts/run_openai_baseline.py --output openai_baseline.json
227
  ```
228
 
229
- The OpenAI runner uses temperature `0`, fixed seed values, and the typed
230
- `DataCleaningAction` schema to produce reproducible rollouts.
231
-
232
  ## Docker
233
 
234
  ```bash
235
- cd /Users/harsharajkumar/Downloads/research_paper_simplifier-main/meta
236
  docker build -t cleanops-env:latest .
237
  docker run --rm -p 8000:8000 cleanops-env:latest
238
  curl http://127.0.0.1:8000/health
239
  ```
240
 
241
- ## Hugging Face Spaces Deployment
242
-
243
- 1. Create a new Docker Space.
244
- 2. Upload this directory as the Space repo contents.
245
- 3. Keep the README metadata frontmatter and `Dockerfile` at repo root.
246
- 4. Ensure the Space has the `openenv` tag.
247
- 5. If needed, push with the OpenEnv CLI:
248
-
249
- ```bash
250
- cd /Users/harsharajkumar/Downloads/research_paper_simplifier-main/meta
251
- openenv push
252
- ```
253
-
254
  ## Project Structure
255
 
256
  ```text
257
- meta/
258
  ├── cleanops_env/
259
- │ ├── client.py
260
- │ ├── environment.py
261
- │ ├── graders.py
262
- │ ├── local_env.py
263
- │ ├── models.py
264
- │ └── tasks.py
265
  ├── scripts/
266
- │ ├── run_openai_baseline.py
267
- │ └── run_oracle_smoke.py
268
  ├── server/
269
- │ ├── app.py
270
- │ └── Dockerfile
271
  ├── tests/
272
- │ └── test_environment.py
273
  ├── Dockerfile
274
  ├── inference.py
275
  ├── openenv.yaml
276
- ├── pyproject.toml
277
- ├── uv.lock
278
  └── README.md
279
  ```
 
13
 
14
  # CleanOps OpenEnv
15
 
16
+ CleanOps is a real-world OpenEnv benchmark for evaluating AI agents on
17
+ operational data-cleaning workflows. Instead of solving a toy problem, the
18
+ agent has to inspect messy business tables, choose remediation operations,
19
+ escalate ambiguous records for human review, run downstream dry-run syncs, and
20
+ submit a cleaned dataset scored by deterministic graders.
21
+
22
+ The benchmark models the kind of cleanup work that sales ops, RevOps, support
23
+ ops, and data platform teams perform before loading data into CRMs, billing
24
+ systems, and analytics warehouses.
25
+
26
+ ## Live Links
27
+
28
+ - Hugging Face Space: [harsharajkumar273/cleanops-openenv](https://huggingface.co/spaces/harsharajkumar273/cleanops-openenv)
29
+ - Live App: [harsharajkumar273-cleanops-openenv.hf.space](https://harsharajkumar273-cleanops-openenv.hf.space/)
30
+ - GitHub Repository: [harsharajkumar/cleanops-openenv](https://github.com/harsharajkumar/cleanops-openenv)
31
+
32
+ ## Highlights
33
+
34
+ - Real-world benchmark: evaluates agents on CRM, order, subscription, and
35
+ payment cleanup rather than games.
36
+ - Full OpenEnv implementation: typed `Action`, `Observation`, and `State`
37
+ models plus `reset()`, `step()`, and `state()`.
38
+ - Human-in-the-loop realism: agents can request deterministic review responses
39
+ for ambiguous records.
40
+ - Downstream simulation: agents can run CRM or billing dry runs before submit.
41
+ - Cost-aware reward shaping: the environment rewards useful progress while
42
+ penalizing wasted review budget, repeated actions, and risky shortcuts.
43
+
44
+ ## What The Agent Does
45
+
46
+ On each episode, the agent:
47
+
48
+ 1. inspects noisy business tables and validation issues
49
+ 2. chooses from a typed catalog of cleaning operations
50
+ 3. requests review for ambiguous merges or broken references
51
+ 4. runs deterministic downstream dry runs against CRM or billing systems
52
+ 5. applies targeted fixes while avoiding destructive shortcuts
53
+ 6. submits the cleaned dataset for deterministic scoring
54
 
55
  ## Task Suite
56
 
57
  | Task ID | Difficulty | Description |
58
  |---|---|---|
59
+ | `customer_contacts_easy` | Easy | Clean a CRM contacts export by normalizing names/emails/phones/states, handling one reviewable duplicate, and preparing the table for CRM import. |
60
+ | `orders_reconciliation_medium` | Medium | Clean an e-commerce order extract by standardizing dates, currency, amounts, statuses, and shipping states while preserving returned orders and checking downstream billing readiness. |
61
+ | `crm_migration_hard` | Hard | Repair a 3-table CRM migration extract with duplicate customers, broken foreign keys, ambiguous payment/customer linkages, review escalation, and CRM/billing dry-run checks. |
62
 
63
  ## API
64
 
 
84
  ### OpenEnv Server API
85
 
86
  ```bash
 
87
  PYTHONPATH="$PWD" python -m server.app --host 0.0.0.0 --port 8000
88
  ```
89
 
90
+ Then use the typed client:
91
 
92
  ```python
93
  from cleanops_env import CleanOpsEnvClient, DataCleaningAction
 
110
 
111
  | Field | Type | Meaning |
112
  |---|---|---|
113
+ | `action_type` | `"inspect_table" \| "inspect_operation" \| "apply_operation" \| "request_review" \| "run_sync_dry_run" \| "submit"` | Selects the action family. |
114
  | `table_name` | `str \| null` | Table to inspect when `action_type="inspect_table"`. |
115
  | `operation_id` | `str \| null` | Cleaning operation to inspect/apply. |
116
+ | `entity_type`, `entity_id`, `reason_code` | `str \| null` | Structured review request fields for ambiguous entities. |
117
+ | `target_system` | `"crm" \| "billing" \| null` | Downstream system to test with a dry run. |
118
  | `reasoning` | `str` | Optional trace text used by baseline scripts. |
 
119
 
120
  ## Observation Space
121
 
122
+ `DataCleaningObservation` includes:
123
 
124
  | Field | Meaning |
125
  |---|---|
 
126
  | `quality_score`, `best_score`, `grader` | Deterministic score and score decomposition. |
127
+ | `review_budget_remaining`, `available_review_targets`, `pending_reviews`, `resolved_reviews` | Human-review queue state. |
128
+ | `supported_sync_targets`, `downstream_health`, `risk_cards`, `last_dry_run` | Downstream business-system simulation state. |
129
+ | `action_costs` | Estimated cost profile for the action families available in this benchmark. |
130
+ | `table_summaries`, `focus_table`, `available_operations`, `focus_operation` | Structured data/task context for the agent. |
 
131
  | `validation_issues`, `issue_cards` | Current rule failures and remediation hints. |
132
+ | `recent_history`, `last_action_status`, `last_action_error` | Interaction trace and outcome details. |
 
 
 
133
 
134
  ## Reward Function
135
 
 
137
 
138
  ```text
139
  reward =
140
+ 1.00 * score_delta
141
  + 0.35 * issue_count_delta
142
+ + 0.55 * downstream_health_delta
143
  + inspection_bonus
144
+ + review_bonus
145
  + step_penalty
146
  + invalid_action_penalty
147
  + no_op_penalty
148
+ + review_cost_penalty
149
+ + action_cost_penalty
150
  + submit_bonus
151
  ```
152
 
153
+ This gives partial progress credit throughout the trajectory while penalizing
154
+ invalid actions, repeated work, wasted review budget, and low-quality
155
+ submission.
156
+
157
+ ## System Design
158
+
159
+ - `cleanops_env/tasks.py`: task definitions, gold tables, operation catalog,
160
+ review cases, and sync-target support.
161
+ - `cleanops_env/graders.py`: deterministic table-quality grading and validation
162
+ checks.
163
+ - `cleanops_env/environment.py`: episode state, reward shaping, review queues,
164
+ dry-run simulation, and typed `step()` / `reset()` / `state()`.
165
+ - `server/app.py`: FastAPI/OpenEnv server plus the Hugging Face demo UI.
166
+ - `inference.py`: submission-ready baseline runner with structured logs.
167
 
168
  ## Grading
169
 
170
+ Each task uses a deterministic grader that outputs a final score in `(0.0, 1.0)`
171
  from three components:
172
 
173
+ - `cell_match_score`
174
+ - `key_recall_score`
175
+ - `validation_score`
176
 
177
  Final score:
178
 
 
183
  ## Setup
184
 
185
  ```bash
186
+ git clone https://github.com/harsharajkumar/cleanops-openenv.git
187
+ cd cleanops-openenv
188
  python -m venv .venv
189
  source .venv/bin/activate
190
  pip install -e ".[dev]"
 
193
  ## Validate
194
 
195
  ```bash
 
196
  openenv validate --verbose
197
  pytest -q
198
  ```
 
218
  | `LOCAL_IMAGE_NAME` | Optional local Docker image name used with `CleanOpsEnvClient.from_docker_image()`. |
219
  | `TASK_NAME` | Task to run, or `all` for all tasks. Defaults to `all`. |
220
 
 
 
 
 
 
 
 
 
 
 
221
  ## Baselines
222
 
223
  ### Deterministic Oracle Smoke Baseline
224
 
225
  ```bash
 
226
  PYTHONPATH="$PWD" python scripts/run_oracle_smoke.py
227
  ```
228
 
229
+ Expected local scores:
230
 
231
  | Task ID | Score | Steps | Total Reward |
232
  |---|---:|---:|---:|
233
+ | `customer_contacts_easy` | 0.9900 | 7 | 1.1280 |
234
+ | `orders_reconciliation_medium` | 0.9900 | 6 | 1.0325 |
235
+ | `crm_migration_hard` | 0.9900 | 8 | 1.2568 |
236
+ | Mean | 0.9900 | - | - |
237
 
238
  ### OpenAI Baseline Agent
239
 
240
  ```bash
 
241
  export OPENAI_API_KEY="..."
242
  export OPENAI_MODEL="gpt-4.1-mini"
243
  export OPENAI_SEED=7
244
  PYTHONPATH="$PWD" python scripts/run_openai_baseline.py --output openai_baseline.json
245
  ```
246
 
 
 
 
247
  ## Docker
248
 
249
  ```bash
 
250
  docker build -t cleanops-env:latest .
251
  docker run --rm -p 8000:8000 cleanops-env:latest
252
  curl http://127.0.0.1:8000/health
253
  ```
254
 
 
 
 
 
 
 
 
 
 
 
 
 
 
255
  ## Project Structure
256
 
257
  ```text
258
+ cleanops-openenv/
259
  ├── cleanops_env/
 
 
 
 
 
 
260
  ├── scripts/
 
 
261
  ├── server/
 
 
262
  ├── tests/
 
263
  ├── Dockerfile
264
  ├── inference.py
265
  ├── openenv.yaml
 
 
266
  └── README.md
267
  ```
cleanops_env/__init__.py CHANGED
@@ -4,19 +4,32 @@ from cleanops_env.client import CleanOpsEnvClient
4
  from cleanops_env.environment import CleanOpsEnvironment
5
  from cleanops_env.local_env import LocalCleanOpsEnv
6
  from cleanops_env.models import (
 
7
  DataCleaningAction,
8
  DataCleaningObservation,
9
  DataCleaningState,
 
 
 
 
10
  RewardBreakdown,
 
 
11
  )
12
 
13
  __all__ = [
14
  "CleanOpsEnvClient",
15
  "CleanOpsEnvironment",
 
16
  "DataCleaningAction",
17
  "DataCleaningObservation",
18
  "DataCleaningState",
 
 
 
19
  "LocalCleanOpsEnv",
 
20
  "RewardBreakdown",
 
 
21
  ]
22
-
 
4
  from cleanops_env.environment import CleanOpsEnvironment
5
  from cleanops_env.local_env import LocalCleanOpsEnv
6
  from cleanops_env.models import (
7
+ ActionCostEntry,
8
  DataCleaningAction,
9
  DataCleaningObservation,
10
  DataCleaningState,
11
+ DownstreamHealth,
12
+ DryRunFinding,
13
+ DryRunReport,
14
+ PendingReview,
15
  RewardBreakdown,
16
+ ReviewResolution,
17
+ ReviewTarget,
18
  )
19
 
20
  __all__ = [
21
  "CleanOpsEnvClient",
22
  "CleanOpsEnvironment",
23
+ "ActionCostEntry",
24
  "DataCleaningAction",
25
  "DataCleaningObservation",
26
  "DataCleaningState",
27
+ "DownstreamHealth",
28
+ "DryRunFinding",
29
+ "DryRunReport",
30
  "LocalCleanOpsEnv",
31
+ "PendingReview",
32
  "RewardBreakdown",
33
+ "ReviewResolution",
34
+ "ReviewTarget",
35
  ]
 
cleanops_env/environment.py CHANGED
@@ -9,18 +9,27 @@ from uuid import uuid4
9
  from openenv.core.env_server.interfaces import Environment
10
  from openenv.core.env_server.types import EnvironmentMetadata
11
 
12
- from cleanops_env.graders import build_table_summary, grade_tables
13
  from cleanops_env.models import (
 
14
  DataCleaningAction,
15
  DataCleaningObservation,
16
  DataCleaningState,
 
 
 
17
  OperationDetail,
18
  OperationSummary,
 
 
 
 
19
  RewardBreakdown,
20
  RowChange,
21
  TableView,
22
  )
23
  from cleanops_env.tasks import (
 
24
  TaskSpec,
25
  apply_operation_to_tables,
26
  clone_tables,
@@ -31,6 +40,28 @@ from cleanops_env.tasks import (
31
  sorted_rows,
32
  )
33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
34
 
35
  class CleanOpsEnvironment(Environment[DataCleaningAction, DataCleaningObservation, DataCleaningState]):
36
  """A realistic data-cleaning workflow environment with deterministic graders."""
@@ -46,6 +77,8 @@ class CleanOpsEnvironment(Environment[DataCleaningAction, DataCleaningObservatio
46
  self._focus_operation_detail: OperationDetail | None = None
47
  self._done = False
48
  self._initial_issue_count = max(1, len(self._grade.validation_issues))
 
 
49
  self._state = DataCleaningState(
50
  episode_id=str(uuid4()),
51
  step_count=0,
@@ -54,14 +87,22 @@ class CleanOpsEnvironment(Environment[DataCleaningAction, DataCleaningObservatio
54
  difficulty=self._task_spec.difficulty,
55
  requested_seed=None,
56
  max_steps=self._task_spec.max_steps,
 
 
57
  submitted=False,
58
  current_score=self._grade.score,
59
  best_score=self._grade.score,
60
  outstanding_issue_count=len(self._grade.validation_issues),
61
- tables=clone_tables(self._task_spec.dirty_tables),
 
 
62
  applied_operation_ids=[],
63
  inspected_tables=[self._focus_table_name],
64
  inspected_operations=[],
 
 
 
 
65
  recent_history=[],
66
  )
67
 
@@ -81,6 +122,8 @@ class CleanOpsEnvironment(Environment[DataCleaningAction, DataCleaningObservatio
81
  self._done = False
82
  self._grade = grade_tables(self._task_spec, self._task_spec.dirty_tables)
83
  self._initial_issue_count = max(1, len(self._grade.validation_issues))
 
 
84
  self._state = DataCleaningState(
85
  episode_id=episode_id or str(uuid4()),
86
  step_count=0,
@@ -89,14 +132,22 @@ class CleanOpsEnvironment(Environment[DataCleaningAction, DataCleaningObservatio
89
  difficulty=self._task_spec.difficulty,
90
  requested_seed=normalized_seed,
91
  max_steps=self._task_spec.max_steps,
 
 
92
  submitted=False,
93
  current_score=self._grade.score,
94
  best_score=self._grade.score,
95
  outstanding_issue_count=len(self._grade.validation_issues),
96
- tables=clone_tables(self._task_spec.dirty_tables),
 
 
97
  applied_operation_ids=[],
98
  inspected_tables=[self._focus_table_name],
99
  inspected_operations=[],
 
 
 
 
100
  recent_history=[f"reset -> loaded task {self._task_spec.task_id} ({self._task_spec.difficulty}) seed={normalized_seed}"],
101
  )
102
  return self._build_observation(
@@ -127,13 +178,20 @@ class CleanOpsEnvironment(Environment[DataCleaningAction, DataCleaningObservatio
127
  self._state.step_count += 1
128
  previous_score = self._state.current_score
129
  previous_issue_count = self._state.outstanding_issue_count
 
130
 
131
  invalid_action_penalty = 0.0
132
  noop_penalty = 0.0
133
  insight_bonus = 0.0
 
 
 
134
  submit_bonus = 0.0
135
  status_message = ""
136
  action_error: str | None = None
 
 
 
137
 
138
  if action.action_type == "inspect_table":
139
  table_name = normalize_whitespace(action.table_name or "")
@@ -189,36 +247,119 @@ class CleanOpsEnvironment(Environment[DataCleaningAction, DataCleaningObservatio
189
  if self._task_spec.operations[operation_id].tables_affected:
190
  self._focus_table_name = self._task_spec.operations[operation_id].tables_affected[0]
191
  status_message = f"Applied '{operation_id}' to {affected_tables or 'current tables'}."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
192
  elif action.action_type == "submit":
193
  self._state.submitted = True
194
  self._done = True
195
  status_message = "Submitted cleaned tables for grading."
196
 
 
 
197
  self._grade = grade_tables(self._task_spec, self._state.tables)
198
  self._state.current_score = self._grade.score
199
  self._state.best_score = max(self._state.best_score, self._grade.score)
200
  self._state.outstanding_issue_count = len(self._grade.validation_issues)
 
201
 
202
  quality_delta = round(self._state.current_score - previous_score, 4)
203
  issue_delta = round((previous_issue_count - self._state.outstanding_issue_count) / self._initial_issue_count, 4)
 
204
  efficiency_penalty = -0.01
205
 
206
  if action.action_type == "submit":
207
- submit_bonus = round(0.4 * self._state.current_score, 4) if self._state.current_score >= 0.8 else round(-0.2 * (1.0 - self._state.current_score), 4)
 
208
 
209
  if self._state.step_count >= self._state.max_steps and not self._done:
210
  self._done = True
211
  self._state.submitted = False
212
  status_message = f"{status_message} Step budget exhausted; episode truncated.".strip()
213
 
214
- reward_total = round(1.25 * quality_delta + 0.35 * issue_delta + insight_bonus + efficiency_penalty + invalid_action_penalty + noop_penalty + submit_bonus, 4)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
215
  reward_breakdown = RewardBreakdown(
216
  quality_delta=quality_delta,
217
  issue_delta=issue_delta,
 
218
  insight_bonus=insight_bonus,
 
219
  efficiency_penalty=efficiency_penalty,
220
  invalid_action_penalty=invalid_action_penalty,
221
  noop_penalty=noop_penalty,
 
 
222
  submit_bonus=submit_bonus,
223
  total=reward_total,
224
  )
@@ -228,6 +369,10 @@ class CleanOpsEnvironment(Environment[DataCleaningAction, DataCleaningObservatio
228
  action_descriptor += f"[{action.operation_id}]"
229
  if action.table_name:
230
  action_descriptor += f"[{action.table_name}]"
 
 
 
 
231
  self._state.recent_history.append(f"step {self._state.step_count}: {action_descriptor} -> score={self._state.current_score:.4f}")
232
  self._state.recent_history = self._state.recent_history[-10:]
233
 
@@ -274,6 +419,18 @@ class CleanOpsEnvironment(Environment[DataCleaningAction, DataCleaningObservatio
274
  )
275
  for operation in sorted(self._task_spec.operations.values(), key=lambda op: op.operation_id)
276
  ]
 
 
 
 
 
 
 
 
 
 
 
 
277
  return DataCleaningObservation(
278
  task_id=self._task_spec.task_id,
279
  task_title=self._task_spec.title,
@@ -284,9 +441,18 @@ class CleanOpsEnvironment(Environment[DataCleaningAction, DataCleaningObservatio
284
  quality_score=self._state.current_score,
285
  best_score=self._state.best_score,
286
  remaining_steps=max(0, self._state.max_steps - self._state.step_count),
 
 
 
 
 
 
287
  table_summaries=summaries,
288
  focus_table=focus_table,
289
  available_operations=available_operations,
 
 
 
290
  focus_operation=self._focus_operation_detail,
291
  validation_issues=self._grade.validation_issues,
292
  issue_cards=list(self._task_spec.issue_cards),
@@ -301,6 +467,9 @@ class CleanOpsEnvironment(Environment[DataCleaningAction, DataCleaningObservatio
301
  "episode_id": self._state.episode_id,
302
  "requested_seed": self._state.requested_seed,
303
  "applied_operation_ids": list(self._state.applied_operation_ids),
 
 
 
304
  "submitted": self._state.submitted,
305
  },
306
  )
@@ -334,6 +503,229 @@ class CleanOpsEnvironment(Environment[DataCleaningAction, DataCleaningObservatio
334
  random.Random(seed + sum(ord(char) for char in table_name)).shuffle(shuffled_rows)
335
  return shuffled_rows
336
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
337
  def _build_operation_detail(
338
  self,
339
  task_spec: TaskSpec,
 
9
  from openenv.core.env_server.interfaces import Environment
10
  from openenv.core.env_server.types import EnvironmentMetadata
11
 
12
+ from cleanops_env.graders import build_table_summary, count_duplicate_groups, grade_tables
13
  from cleanops_env.models import (
14
+ ActionCostEntry,
15
  DataCleaningAction,
16
  DataCleaningObservation,
17
  DataCleaningState,
18
+ DownstreamHealth,
19
+ DryRunFinding,
20
+ DryRunReport,
21
  OperationDetail,
22
  OperationSummary,
23
+ PendingReview,
24
+ ReviewResolution,
25
+ ReviewTarget,
26
+ RiskCard,
27
  RewardBreakdown,
28
  RowChange,
29
  TableView,
30
  )
31
  from cleanops_env.tasks import (
32
+ ReviewCaseSpec,
33
  TaskSpec,
34
  apply_operation_to_tables,
35
  clone_tables,
 
40
  sorted_rows,
41
  )
42
 
43
+ ACTION_COSTS: dict[str, float] = {
44
+ "inspect_table": 0.005,
45
+ "inspect_operation": 0.005,
46
+ "apply_operation:safe": 0.01,
47
+ "apply_operation:review": 0.015,
48
+ "apply_operation:destructive": 0.03,
49
+ "request_review": 0.025,
50
+ "run_sync_dry_run": 0.02,
51
+ "submit": 0.005,
52
+ }
53
+
54
+ ACTION_COST_DESCRIPTIONS: dict[str, str] = {
55
+ "inspect_table": "Low-cost inspection to understand current records.",
56
+ "inspect_operation": "Low-cost preview to inspect an operation before applying it.",
57
+ "apply_operation:safe": "Safe automated cleanup with low operational risk.",
58
+ "apply_operation:review": "Review-sensitive cleanup that should be used more deliberately.",
59
+ "apply_operation:destructive": "Destructive cleanup with higher business risk if applied incorrectly.",
60
+ "request_review": "Consumes limited human-review budget to resolve ambiguity safely.",
61
+ "run_sync_dry_run": "Runs a deterministic downstream system simulation before submit.",
62
+ "submit": "Low-cost finalization step after cleanup is complete.",
63
+ }
64
+
65
 
66
  class CleanOpsEnvironment(Environment[DataCleaningAction, DataCleaningObservation, DataCleaningState]):
67
  """A realistic data-cleaning workflow environment with deterministic graders."""
 
77
  self._focus_operation_detail: OperationDetail | None = None
78
  self._done = False
79
  self._initial_issue_count = max(1, len(self._grade.validation_issues))
80
+ initial_tables = clone_tables(self._task_spec.dirty_tables)
81
+ initial_downstream_health = self._compute_downstream_health(self._task_spec, initial_tables, self._grade.validation_issues)
82
  self._state = DataCleaningState(
83
  episode_id=str(uuid4()),
84
  step_count=0,
 
87
  difficulty=self._task_spec.difficulty,
88
  requested_seed=None,
89
  max_steps=self._task_spec.max_steps,
90
+ review_budget_total=self._task_spec.review_budget,
91
+ review_budget_remaining=self._task_spec.review_budget,
92
  submitted=False,
93
  current_score=self._grade.score,
94
  best_score=self._grade.score,
95
  outstanding_issue_count=len(self._grade.validation_issues),
96
+ downstream_health=initial_downstream_health,
97
+ last_dry_run=None,
98
+ tables=initial_tables,
99
  applied_operation_ids=[],
100
  inspected_tables=[self._focus_table_name],
101
  inspected_operations=[],
102
+ requested_review_ids=[],
103
+ pending_reviews=[],
104
+ resolved_reviews=[],
105
+ dry_run_targets=[],
106
  recent_history=[],
107
  )
108
 
 
122
  self._done = False
123
  self._grade = grade_tables(self._task_spec, self._task_spec.dirty_tables)
124
  self._initial_issue_count = max(1, len(self._grade.validation_issues))
125
+ initial_tables = clone_tables(self._task_spec.dirty_tables)
126
+ initial_downstream_health = self._compute_downstream_health(self._task_spec, initial_tables, self._grade.validation_issues)
127
  self._state = DataCleaningState(
128
  episode_id=episode_id or str(uuid4()),
129
  step_count=0,
 
132
  difficulty=self._task_spec.difficulty,
133
  requested_seed=normalized_seed,
134
  max_steps=self._task_spec.max_steps,
135
+ review_budget_total=self._task_spec.review_budget,
136
+ review_budget_remaining=self._task_spec.review_budget,
137
  submitted=False,
138
  current_score=self._grade.score,
139
  best_score=self._grade.score,
140
  outstanding_issue_count=len(self._grade.validation_issues),
141
+ downstream_health=initial_downstream_health,
142
+ last_dry_run=None,
143
+ tables=initial_tables,
144
  applied_operation_ids=[],
145
  inspected_tables=[self._focus_table_name],
146
  inspected_operations=[],
147
+ requested_review_ids=[],
148
+ pending_reviews=[],
149
+ resolved_reviews=[],
150
+ dry_run_targets=[],
151
  recent_history=[f"reset -> loaded task {self._task_spec.task_id} ({self._task_spec.difficulty}) seed={normalized_seed}"],
152
  )
153
  return self._build_observation(
 
178
  self._state.step_count += 1
179
  previous_score = self._state.current_score
180
  previous_issue_count = self._state.outstanding_issue_count
181
+ previous_downstream_score = self._state.downstream_health.overall_health_score
182
 
183
  invalid_action_penalty = 0.0
184
  noop_penalty = 0.0
185
  insight_bonus = 0.0
186
+ review_bonus = 0.0
187
+ review_cost_penalty = 0.0
188
+ action_cost_penalty = 0.0
189
  submit_bonus = 0.0
190
  status_message = ""
191
  action_error: str | None = None
192
+ released_reviews = self._release_ready_reviews()
193
+ if released_reviews:
194
+ review_bonus = round(0.04 * len(released_reviews), 4)
195
 
196
  if action.action_type == "inspect_table":
197
  table_name = normalize_whitespace(action.table_name or "")
 
247
  if self._task_spec.operations[operation_id].tables_affected:
248
  self._focus_table_name = self._task_spec.operations[operation_id].tables_affected[0]
249
  status_message = f"Applied '{operation_id}' to {affected_tables or 'current tables'}."
250
+ elif action.action_type == "request_review":
251
+ entity_type = normalize_whitespace(action.entity_type or "").lower()
252
+ entity_id = normalize_whitespace(action.entity_id or "")
253
+ reason_code = normalize_whitespace(action.reason_code or "")
254
+ review_case = self._find_review_case(entity_type, entity_id, reason_code)
255
+ if not entity_type or not entity_id or not reason_code:
256
+ invalid_action_penalty = -0.25
257
+ status_message = "request_review requires entity_type, entity_id, and reason_code."
258
+ action_error = status_message
259
+ elif review_case is None:
260
+ invalid_action_penalty = -0.2
261
+ status_message = f"No deterministic review case exists for {entity_type}:{entity_id} ({reason_code})."
262
+ action_error = status_message
263
+ elif review_case.review_id in self._state.requested_review_ids:
264
+ noop_penalty = -0.05
265
+ status_message = f"Review '{review_case.review_id}' was already requested."
266
+ elif self._state.review_budget_remaining <= 0:
267
+ invalid_action_penalty = -0.18
268
+ status_message = "No review budget remaining for this episode."
269
+ action_error = status_message
270
+ else:
271
+ self._state.review_budget_remaining -= 1
272
+ self._state.requested_review_ids.append(review_case.review_id)
273
+ self._state.pending_reviews.append(
274
+ PendingReview(
275
+ review_id=review_case.review_id,
276
+ entity_type=review_case.entity_type,
277
+ entity_id=review_case.entity_id,
278
+ reason_code=review_case.reason_code,
279
+ title=review_case.title,
280
+ requested_at_step=self._state.step_count,
281
+ ready_at_step=self._state.step_count + 1,
282
+ )
283
+ )
284
+ review_cost_penalty = -0.02
285
+ status_message = (
286
+ f"Queued review '{review_case.review_id}' for {review_case.entity_type} {review_case.entity_id}; "
287
+ "response will be available on the next step."
288
+ )
289
+ elif action.action_type == "run_sync_dry_run":
290
+ target_system = action.target_system
291
+ if target_system is None:
292
+ invalid_action_penalty = -0.2
293
+ status_message = "run_sync_dry_run requires target_system."
294
+ action_error = status_message
295
+ elif target_system not in self._task_spec.sync_targets:
296
+ invalid_action_penalty = -0.2
297
+ status_message = f"Task '{self._task_spec.task_id}' does not support dry-run target '{target_system}'."
298
+ action_error = status_message
299
+ else:
300
+ self._state.last_dry_run = self._build_dry_run_report(target_system)
301
+ if target_system not in self._state.dry_run_targets:
302
+ self._state.dry_run_targets.append(target_system)
303
+ insight_bonus = max(insight_bonus, 0.01)
304
+ else:
305
+ noop_penalty = min(noop_penalty, -0.01)
306
+ status_message = self._state.last_dry_run.summary
307
  elif action.action_type == "submit":
308
  self._state.submitted = True
309
  self._done = True
310
  status_message = "Submitted cleaned tables for grading."
311
 
312
+ action_cost_penalty = -self._estimate_action_cost(action)
313
+
314
  self._grade = grade_tables(self._task_spec, self._state.tables)
315
  self._state.current_score = self._grade.score
316
  self._state.best_score = max(self._state.best_score, self._grade.score)
317
  self._state.outstanding_issue_count = len(self._grade.validation_issues)
318
+ self._state.downstream_health = self._compute_downstream_health(self._task_spec, self._state.tables, self._grade.validation_issues)
319
 
320
  quality_delta = round(self._state.current_score - previous_score, 4)
321
  issue_delta = round((previous_issue_count - self._state.outstanding_issue_count) / self._initial_issue_count, 4)
322
+ downstream_health_delta = round(self._state.downstream_health.overall_health_score - previous_downstream_score, 4)
323
  efficiency_penalty = -0.01
324
 
325
  if action.action_type == "submit":
326
+ submission_health = round(0.65 * self._state.current_score + 0.35 * self._state.downstream_health.overall_health_score, 4)
327
+ submit_bonus = round(0.4 * submission_health, 4) if submission_health >= 0.82 else round(-0.2 * (1.0 - submission_health), 4)
328
 
329
  if self._state.step_count >= self._state.max_steps and not self._done:
330
  self._done = True
331
  self._state.submitted = False
332
  status_message = f"{status_message} Step budget exhausted; episode truncated.".strip()
333
 
334
+ if released_reviews:
335
+ release_note = ", ".join(review.review_id for review in released_reviews)
336
+ status_message = f"{status_message} Review response available: {release_note}.".strip()
337
+
338
+ reward_total = round(
339
+ 1.0 * quality_delta
340
+ + 0.35 * issue_delta
341
+ + 0.55 * downstream_health_delta
342
+ + insight_bonus
343
+ + review_bonus
344
+ + efficiency_penalty
345
+ + invalid_action_penalty
346
+ + noop_penalty
347
+ + review_cost_penalty
348
+ + action_cost_penalty
349
+ + submit_bonus,
350
+ 4,
351
+ )
352
  reward_breakdown = RewardBreakdown(
353
  quality_delta=quality_delta,
354
  issue_delta=issue_delta,
355
+ downstream_health_delta=downstream_health_delta,
356
  insight_bonus=insight_bonus,
357
+ review_bonus=review_bonus,
358
  efficiency_penalty=efficiency_penalty,
359
  invalid_action_penalty=invalid_action_penalty,
360
  noop_penalty=noop_penalty,
361
+ review_cost_penalty=review_cost_penalty,
362
+ action_cost_penalty=action_cost_penalty,
363
  submit_bonus=submit_bonus,
364
  total=reward_total,
365
  )
 
369
  action_descriptor += f"[{action.operation_id}]"
370
  if action.table_name:
371
  action_descriptor += f"[{action.table_name}]"
372
+ if action.entity_id:
373
+ action_descriptor += f"[{action.entity_id}]"
374
+ if action.target_system:
375
+ action_descriptor += f"[{action.target_system}]"
376
  self._state.recent_history.append(f"step {self._state.step_count}: {action_descriptor} -> score={self._state.current_score:.4f}")
377
  self._state.recent_history = self._state.recent_history[-10:]
378
 
 
419
  )
420
  for operation in sorted(self._task_spec.operations.values(), key=lambda op: op.operation_id)
421
  ]
422
+ available_review_targets = [
423
+ ReviewTarget(
424
+ review_id=review_case.review_id,
425
+ entity_type=review_case.entity_type,
426
+ entity_id=review_case.entity_id,
427
+ reason_code=review_case.reason_code,
428
+ title=review_case.title,
429
+ detail=review_case.detail,
430
+ recommended_operation_ids=list(review_case.recommended_operation_ids),
431
+ )
432
+ for review_case in sorted(self._task_spec.review_cases.values(), key=lambda case: case.review_id)
433
+ ]
434
  return DataCleaningObservation(
435
  task_id=self._task_spec.task_id,
436
  task_title=self._task_spec.title,
 
441
  quality_score=self._state.current_score,
442
  best_score=self._state.best_score,
443
  remaining_steps=max(0, self._state.max_steps - self._state.step_count),
444
+ review_budget_remaining=self._state.review_budget_remaining,
445
+ supported_sync_targets=list(self._task_spec.sync_targets),
446
+ downstream_health=self._state.downstream_health,
447
+ risk_cards=self._build_risk_cards(),
448
+ last_dry_run=self._state.last_dry_run,
449
+ action_costs=self._build_action_cost_entries(),
450
  table_summaries=summaries,
451
  focus_table=focus_table,
452
  available_operations=available_operations,
453
+ available_review_targets=available_review_targets,
454
+ pending_reviews=list(self._state.pending_reviews),
455
+ resolved_reviews=list(self._state.resolved_reviews),
456
  focus_operation=self._focus_operation_detail,
457
  validation_issues=self._grade.validation_issues,
458
  issue_cards=list(self._task_spec.issue_cards),
 
467
  "episode_id": self._state.episode_id,
468
  "requested_seed": self._state.requested_seed,
469
  "applied_operation_ids": list(self._state.applied_operation_ids),
470
+ "review_budget_remaining": self._state.review_budget_remaining,
471
+ "requested_review_ids": list(self._state.requested_review_ids),
472
+ "dry_run_targets": list(self._state.dry_run_targets),
473
  "submitted": self._state.submitted,
474
  },
475
  )
 
503
  random.Random(seed + sum(ord(char) for char in table_name)).shuffle(shuffled_rows)
504
  return shuffled_rows
505
 
506
+ def _find_review_case(self, entity_type: str, entity_id: str, reason_code: str) -> ReviewCaseSpec | None:
507
+ for review_case in self._task_spec.review_cases.values():
508
+ if (
509
+ review_case.entity_type == entity_type
510
+ and review_case.entity_id == entity_id
511
+ and review_case.reason_code == reason_code
512
+ ):
513
+ return review_case
514
+ return None
515
+
516
+ def _release_ready_reviews(self) -> list[ReviewResolution]:
517
+ if not self._state.pending_reviews:
518
+ return []
519
+
520
+ still_pending: list[PendingReview] = []
521
+ released: list[ReviewResolution] = []
522
+ for pending_review in self._state.pending_reviews:
523
+ if pending_review.ready_at_step > self._state.step_count:
524
+ still_pending.append(pending_review)
525
+ continue
526
+ review_case = self._task_spec.review_cases[pending_review.review_id]
527
+ released_review = ReviewResolution(
528
+ review_id=review_case.review_id,
529
+ entity_type=review_case.entity_type,
530
+ entity_id=review_case.entity_id,
531
+ reason_code=review_case.reason_code,
532
+ title=review_case.title,
533
+ resolution=review_case.resolution,
534
+ response_summary=review_case.response_summary,
535
+ evidence_summary=review_case.evidence_summary,
536
+ recommended_operation_ids=list(review_case.recommended_operation_ids),
537
+ )
538
+ self._state.resolved_reviews.append(released_review)
539
+ released.append(released_review)
540
+ self._state.pending_reviews = still_pending
541
+ return released
542
+
543
+ def _estimate_action_cost(self, action: DataCleaningAction) -> float:
544
+ if action.action_type == "apply_operation":
545
+ operation = self._task_spec.operations.get(normalize_whitespace(action.operation_id or ""))
546
+ if operation is None:
547
+ return ACTION_COSTS["apply_operation:safe"]
548
+ if operation.risk == "review":
549
+ return ACTION_COSTS["apply_operation:review"]
550
+ if operation.risk == "destructive":
551
+ return ACTION_COSTS["apply_operation:destructive"]
552
+ return ACTION_COSTS["apply_operation:safe"]
553
+ return ACTION_COSTS.get(action.action_type, 0.01)
554
+
555
+ def _build_action_cost_entries(self) -> list[ActionCostEntry]:
556
+ return [
557
+ ActionCostEntry(action_key=action_key, estimated_cost=estimated_cost, description=ACTION_COST_DESCRIPTIONS[action_key])
558
+ for action_key, estimated_cost in ACTION_COSTS.items()
559
+ ]
560
+
561
+ @staticmethod
562
+ def _open_metric(value: float) -> float:
563
+ return round(min(0.99, max(0.01, value)), 4)
564
+
565
+ def _compute_downstream_health(
566
+ self,
567
+ task_spec: TaskSpec,
568
+ tables: dict[str, list[dict[str, str]]],
569
+ validation_issues: list,
570
+ ) -> DownstreamHealth:
571
+ customers = tables.get("customers", [])
572
+ orders = tables.get("orders", [])
573
+ subscriptions = tables.get("subscriptions", [])
574
+ payments = tables.get("payments", [])
575
+
576
+ crm_rows = max(1, len(customers) + len(subscriptions))
577
+ billing_rows = max(1, len(orders) + len(subscriptions) + len(payments))
578
+ payment_rows = max(1, len(orders) + len(payments))
579
+
580
+ crm_issue_weight = sum(max(1, len(issue.row_ids)) for issue in validation_issues if issue.table_name in {"customers", "subscriptions"})
581
+ billing_issue_weight = sum(
582
+ max(1, len(issue.row_ids))
583
+ for issue in validation_issues
584
+ if issue.table_name in {"orders", "payments", "subscriptions"}
585
+ and (issue.code.startswith("foreign_key:") or issue.code.startswith("required:") or issue.code.startswith("unique:"))
586
+ )
587
+ payment_issue_weight = sum(
588
+ max(1, len(issue.row_ids))
589
+ for issue in validation_issues
590
+ if issue.table_name in {"orders", "payments"}
591
+ )
592
+
593
+ customer_duplicate_groups = count_duplicate_groups(task_spec, "customers", customers) if "customers" in task_spec.duplicate_identity_columns else 0
594
+ customer_rows = max(1, len(customers))
595
+ payment_duplicate_groups = count_duplicate_groups(task_spec, "payments", payments) if "payments" in task_spec.duplicate_identity_columns else 0
596
+
597
+ crm_sync_success_rate = self._open_metric(1.0 - (crm_issue_weight / max(2, crm_rows * 2)))
598
+ if not orders and not payments:
599
+ billing_link_integrity = 0.99
600
+ revenue_reporting_risk = 0.01
601
+ else:
602
+ billing_link_integrity = self._open_metric(1.0 - (billing_issue_weight / max(2, billing_rows * 2)))
603
+ revenue_reporting_risk = self._open_metric(min(0.99, (payment_issue_weight / max(2, payment_rows * 2)) + (payment_duplicate_groups / max(1, payment_rows))))
604
+
605
+ duplicate_contact_risk = self._open_metric(min(0.99, (customer_duplicate_groups / customer_rows) + 0.06 * sum(1 for issue in validation_issues if issue.code.startswith("unique:customers"))))
606
+ overall_health_score = self._open_metric(
607
+ (
608
+ crm_sync_success_rate
609
+ + billing_link_integrity
610
+ + (1.0 - duplicate_contact_risk)
611
+ + (1.0 - revenue_reporting_risk)
612
+ )
613
+ / 4.0
614
+ )
615
+
616
+ return DownstreamHealth(
617
+ crm_sync_success_rate=crm_sync_success_rate,
618
+ billing_link_integrity=billing_link_integrity,
619
+ duplicate_contact_risk=duplicate_contact_risk,
620
+ revenue_reporting_risk=revenue_reporting_risk,
621
+ overall_health_score=overall_health_score,
622
+ )
623
+
624
+ def _build_risk_cards(self) -> list[RiskCard]:
625
+ health = self._state.downstream_health
626
+ cards = [
627
+ RiskCard(
628
+ title="CRM import risk",
629
+ detail="Customer and subscription issues can block CRM migration syncs.",
630
+ severity="high" if health.crm_sync_success_rate < 0.8 else "medium" if health.crm_sync_success_rate < 0.92 else "low",
631
+ metric_name="crm_sync_success_rate",
632
+ current_value=health.crm_sync_success_rate,
633
+ recommended_action_ids=[op_id for op_id in self._recommended_operation_ids_for_tables({"customers", "subscriptions"})],
634
+ ),
635
+ RiskCard(
636
+ title="Billing linkage risk",
637
+ detail="Broken foreign keys or missing IDs can mislink orders, subscriptions, and payments.",
638
+ severity="high" if health.billing_link_integrity < 0.8 else "medium" if health.billing_link_integrity < 0.92 else "low",
639
+ metric_name="billing_link_integrity",
640
+ current_value=health.billing_link_integrity,
641
+ recommended_action_ids=[op_id for op_id in self._recommended_operation_ids_for_tables({"orders", "subscriptions", "payments"})],
642
+ ),
643
+ RiskCard(
644
+ title="Duplicate contact risk",
645
+ detail="Remaining duplicate customer identities can create bad merges downstream.",
646
+ severity="high" if health.duplicate_contact_risk > 0.3 else "medium" if health.duplicate_contact_risk > 0.12 else "low",
647
+ metric_name="duplicate_contact_risk",
648
+ current_value=health.duplicate_contact_risk,
649
+ recommended_action_ids=[op_id for op_id in self._recommended_operation_ids_for_keyword("merge")],
650
+ ),
651
+ RiskCard(
652
+ title="Revenue reporting risk",
653
+ detail="Duplicate or mislinked payment and order facts can distort downstream reporting.",
654
+ severity="high" if health.revenue_reporting_risk > 0.3 else "medium" if health.revenue_reporting_risk > 0.12 else "low",
655
+ metric_name="revenue_reporting_risk",
656
+ current_value=health.revenue_reporting_risk,
657
+ recommended_action_ids=[op_id for op_id in self._recommended_operation_ids_for_tables({"orders", "payments"})],
658
+ ),
659
+ ]
660
+ return cards
661
+
662
+ def _recommended_operation_ids_for_tables(self, table_names: set[str]) -> list[str]:
663
+ return [
664
+ operation.operation_id
665
+ for operation in sorted(self._task_spec.operations.values(), key=lambda op: op.operation_id)
666
+ if set(operation.tables_affected) & table_names
667
+ ][:4]
668
+
669
+ def _recommended_operation_ids_for_keyword(self, keyword: str) -> list[str]:
670
+ lowered = keyword.lower()
671
+ return [
672
+ operation.operation_id
673
+ for operation in sorted(self._task_spec.operations.values(), key=lambda op: op.operation_id)
674
+ if lowered in operation.operation_id.lower() or lowered in operation.title.lower()
675
+ ][:4]
676
+
677
+ def _build_dry_run_report(self, target_system: str) -> DryRunReport:
678
+ findings: list[DryRunFinding] = []
679
+ for issue in self._grade.validation_issues:
680
+ if target_system == "crm" and issue.table_name not in {"customers", "subscriptions"}:
681
+ continue
682
+ if target_system == "billing" and issue.table_name not in {"orders", "subscriptions", "payments"}:
683
+ continue
684
+ findings.append(
685
+ DryRunFinding(
686
+ code=issue.code,
687
+ severity=issue.severity,
688
+ table_name=issue.table_name,
689
+ row_ids=list(issue.row_ids),
690
+ message=issue.message,
691
+ )
692
+ )
693
+
694
+ health = self._state.downstream_health
695
+ success_rate = health.crm_sync_success_rate if target_system == "crm" else health.billing_link_integrity
696
+
697
+ if target_system == "crm" and health.duplicate_contact_risk > 0.12:
698
+ findings.append(
699
+ DryRunFinding(
700
+ code="risk:duplicate_contacts",
701
+ severity="medium" if health.duplicate_contact_risk <= 0.3 else "high",
702
+ table_name="customers",
703
+ message="CRM dry run predicts duplicate-contact collisions after import.",
704
+ )
705
+ )
706
+ if target_system == "billing" and health.revenue_reporting_risk > 0.12:
707
+ findings.append(
708
+ DryRunFinding(
709
+ code="risk:revenue_reporting",
710
+ severity="medium" if health.revenue_reporting_risk <= 0.3 else "high",
711
+ table_name="payments" if "payments" in self._state.tables else "orders",
712
+ message="Billing dry run predicts mislinked or duplicated revenue facts.",
713
+ )
714
+ )
715
+
716
+ summary = (
717
+ f"Dry run for {target_system.upper()} found {len(findings)} blocking or risky findings; "
718
+ f"estimated success rate is {success_rate:.2f}."
719
+ )
720
+ return DryRunReport(
721
+ target_system=target_system,
722
+ success_rate=success_rate,
723
+ finding_count=len(findings),
724
+ findings=findings,
725
+ summary=summary,
726
+ generated_at_step=self._state.step_count,
727
+ )
728
+
729
  def _build_operation_detail(
730
  self,
731
  task_spec: TaskSpec,
cleanops_env/models.py CHANGED
@@ -14,10 +14,14 @@ class RewardBreakdown(BaseModel):
14
 
15
  quality_delta: float = Field(default=0.0, description="Change in overall grader score after the action.")
16
  issue_delta: float = Field(default=0.0, description="Normalized change in outstanding validation issues.")
 
17
  insight_bonus: float = Field(default=0.0, description="Small positive reward for inspecting new assets.")
18
  efficiency_penalty: float = Field(default=0.0, description="Per-step penalty to discourage long episodes.")
19
  invalid_action_penalty: float = Field(default=0.0, description="Penalty for malformed or unsupported actions.")
20
  noop_penalty: float = Field(default=0.0, description="Penalty for no-op or repeated actions.")
 
 
 
21
  submit_bonus: float = Field(default=0.0, description="End-of-episode bonus based on final score.")
22
  total: float = Field(default=0.0, description="Final scalar reward returned.")
23
 
@@ -42,6 +46,94 @@ class IssueCard(BaseModel):
42
  recommended_operation_ids: list[str] = Field(default_factory=list, description="Operations likely to address the issue.")
43
 
44
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45
  class TableSummary(BaseModel):
46
  """Compact summary of a table."""
47
 
@@ -102,9 +194,13 @@ class GradeBreakdown(BaseModel):
102
  class DataCleaningAction(Action):
103
  """Action model for the environment."""
104
 
105
- action_type: Literal["inspect_table", "inspect_operation", "apply_operation", "submit"] = Field(..., description="Type of action to perform.")
106
  table_name: str | None = Field(default=None, description="Table to inspect when action_type=inspect_table.")
107
  operation_id: str | None = Field(default=None, description="Operation to inspect or apply when action_type is inspect_operation or apply_operation.")
 
 
 
 
108
  reasoning: str = Field(default="", description="Optional natural-language reasoning for debugging baselines.")
109
 
110
 
@@ -120,9 +216,18 @@ class DataCleaningObservation(Observation):
120
  quality_score: float = Field(default=0.0, description="Current deterministic grader score.")
121
  best_score: float = Field(default=0.0, description="Best score seen in the current episode.")
122
  remaining_steps: int = Field(default=0, description="How many actions remain before truncation.")
 
 
 
 
 
 
123
  table_summaries: list[TableSummary] = Field(default_factory=list, description="Compact summaries of all tables.")
124
  focus_table: TableView | None = Field(default=None, description="Detailed contents for the currently inspected table.")
125
  available_operations: list[OperationSummary] = Field(default_factory=list, description="Available cleaning actions.")
 
 
 
126
  focus_operation: OperationDetail | None = Field(default=None, description="Detailed preview for the currently inspected operation.")
127
  validation_issues: list[ValidationIssue] = Field(default_factory=list, description="Current unresolved validation issues.")
128
  issue_cards: list[IssueCard] = Field(default_factory=list, description="Aggregated issue cards with suggested next actions.")
@@ -141,12 +246,20 @@ class DataCleaningState(State):
141
  difficulty: Literal["easy", "medium", "hard"] = Field(..., description="Current task difficulty.")
142
  requested_seed: int | None = Field(default=None, description="Seed used when resetting the current episode.")
143
  max_steps: int = Field(..., description="Task step budget.")
 
 
144
  submitted: bool = Field(default=False, description="Whether submit was called.")
145
  current_score: float = Field(default=0.0, description="Current deterministic grader score.")
146
  best_score: float = Field(default=0.0, description="Best score achieved this episode.")
147
  outstanding_issue_count: int = Field(default=0, description="Number of unresolved validation issues.")
 
 
148
  tables: dict[str, list[dict[str, str]]] = Field(default_factory=dict, description="Current mutable table contents.")
149
  applied_operation_ids: list[str] = Field(default_factory=list, description="Operations already applied.")
150
  inspected_tables: list[str] = Field(default_factory=list, description="Tables inspected so far.")
151
  inspected_operations: list[str] = Field(default_factory=list, description="Operations inspected so far.")
 
 
 
 
152
  recent_history: list[str] = Field(default_factory=list, description="Recent action log.")
 
14
 
15
  quality_delta: float = Field(default=0.0, description="Change in overall grader score after the action.")
16
  issue_delta: float = Field(default=0.0, description="Normalized change in outstanding validation issues.")
17
+ downstream_health_delta: float = Field(default=0.0, description="Change in downstream operational health after the action.")
18
  insight_bonus: float = Field(default=0.0, description="Small positive reward for inspecting new assets.")
19
  efficiency_penalty: float = Field(default=0.0, description="Per-step penalty to discourage long episodes.")
20
  invalid_action_penalty: float = Field(default=0.0, description="Penalty for malformed or unsupported actions.")
21
  noop_penalty: float = Field(default=0.0, description="Penalty for no-op or repeated actions.")
22
+ review_bonus: float = Field(default=0.0, description="Positive reward when a queued review response becomes available.")
23
+ review_cost_penalty: float = Field(default=0.0, description="Small cost for consuming limited human-review budget.")
24
+ action_cost_penalty: float = Field(default=0.0, description="Cost-aware penalty attached to the chosen action.")
25
  submit_bonus: float = Field(default=0.0, description="End-of-episode bonus based on final score.")
26
  total: float = Field(default=0.0, description="Final scalar reward returned.")
27
 
 
46
  recommended_operation_ids: list[str] = Field(default_factory=list, description="Operations likely to address the issue.")
47
 
48
 
49
+ class ReviewTarget(BaseModel):
50
+ """A reviewable entity that can be escalated to a human reviewer."""
51
+
52
+ review_id: str = Field(..., description="Stable review case identifier.")
53
+ entity_type: str = Field(..., description="Type of entity under review.")
54
+ entity_id: str = Field(..., description="Primary identifier for the reviewed entity.")
55
+ reason_code: str = Field(..., description="Why the review would be requested.")
56
+ title: str = Field(..., description="Short human-readable review title.")
57
+ detail: str = Field(..., description="Why this review matters.")
58
+ recommended_operation_ids: list[str] = Field(default_factory=list, description="Operations likely to be safe once review resolves.")
59
+
60
+
61
+ class PendingReview(BaseModel):
62
+ """A queued review request awaiting a deterministic response."""
63
+
64
+ review_id: str = Field(..., description="Stable review case identifier.")
65
+ entity_type: str = Field(..., description="Type of entity under review.")
66
+ entity_id: str = Field(..., description="Primary identifier for the reviewed entity.")
67
+ reason_code: str = Field(..., description="Why the review was requested.")
68
+ title: str = Field(..., description="Short human-readable review title.")
69
+ requested_at_step: int = Field(..., description="Step index when the review was requested.")
70
+ ready_at_step: int = Field(..., description="First step on which the deterministic response becomes available.")
71
+
72
+
73
+ class ReviewResolution(BaseModel):
74
+ """A resolved human-review response surfaced back to the agent."""
75
+
76
+ review_id: str = Field(..., description="Stable review case identifier.")
77
+ entity_type: str = Field(..., description="Type of entity under review.")
78
+ entity_id: str = Field(..., description="Primary identifier for the reviewed entity.")
79
+ reason_code: str = Field(..., description="Why the review was requested.")
80
+ title: str = Field(..., description="Short human-readable review title.")
81
+ resolution: str = Field(..., description="Deterministic review outcome label.")
82
+ response_summary: str = Field(..., description="What the reviewer concluded.")
83
+ evidence_summary: str = Field(..., description="Short explanation for the decision.")
84
+ recommended_operation_ids: list[str] = Field(default_factory=list, description="Operations that become safer after the review response.")
85
+
86
+
87
+ class DryRunFinding(BaseModel):
88
+ """A deterministic downstream issue surfaced by a dry-run sync."""
89
+
90
+ code: str = Field(..., description="Stable machine-readable issue code.")
91
+ severity: Literal["low", "medium", "high"] = Field(..., description="Issue severity.")
92
+ table_name: str | None = Field(default=None, description="Table implicated by the dry-run finding.")
93
+ row_ids: list[str] = Field(default_factory=list, description="Primary-key values implicated by the finding.")
94
+ message: str = Field(..., description="Human-readable dry-run explanation.")
95
+
96
+
97
+ class DryRunReport(BaseModel):
98
+ """A dry-run simulation result for a downstream business system."""
99
+
100
+ target_system: Literal["crm", "billing"] = Field(..., description="Which downstream system was tested.")
101
+ success_rate: float = Field(default=0.0, description="Deterministic estimate of how many records would import successfully.")
102
+ finding_count: int = Field(default=0, description="How many concrete blockers or risks were found.")
103
+ findings: list[DryRunFinding] = Field(default_factory=list, description="Structured findings from the simulated sync.")
104
+ summary: str = Field(default="", description="Short narrative summary of the dry-run result.")
105
+ generated_at_step: int = Field(default=0, description="Step on which the report was generated.")
106
+
107
+
108
+ class DownstreamHealth(BaseModel):
109
+ """Operational health estimates for downstream systems."""
110
+
111
+ crm_sync_success_rate: float = Field(default=0.0, description="Estimated CRM import success rate.")
112
+ billing_link_integrity: float = Field(default=0.0, description="Estimated correctness of billing/customer linkages.")
113
+ duplicate_contact_risk: float = Field(default=0.0, description="Estimated risk that duplicate contacts still remain.")
114
+ revenue_reporting_risk: float = Field(default=0.0, description="Estimated risk of duplicate or mislinked revenue facts.")
115
+ overall_health_score: float = Field(default=0.0, description="Composite downstream health score used for reward shaping.")
116
+
117
+
118
+ class RiskCard(BaseModel):
119
+ """A compact operational risk summary derived from downstream health."""
120
+
121
+ title: str = Field(..., description="Short risk title.")
122
+ detail: str = Field(..., description="Why this risk matters operationally.")
123
+ severity: Literal["low", "medium", "high"] = Field(..., description="Severity for UI and agent prioritization.")
124
+ metric_name: str = Field(..., description="Downstream metric represented by this card.")
125
+ current_value: float = Field(default=0.0, description="Current metric or risk value in [0, 1].")
126
+ recommended_action_ids: list[str] = Field(default_factory=list, description="Operations likely to improve this risk.")
127
+
128
+
129
+ class ActionCostEntry(BaseModel):
130
+ """Estimated operational cost of taking an action."""
131
+
132
+ action_key: str = Field(..., description="Stable action or risk key.")
133
+ estimated_cost: float = Field(default=0.0, description="Relative action cost used in reward shaping.")
134
+ description: str = Field(default="", description="Why this action costs reviewer or system capacity.")
135
+
136
+
137
  class TableSummary(BaseModel):
138
  """Compact summary of a table."""
139
 
 
194
  class DataCleaningAction(Action):
195
  """Action model for the environment."""
196
 
197
+ action_type: Literal["inspect_table", "inspect_operation", "apply_operation", "request_review", "run_sync_dry_run", "submit"] = Field(..., description="Type of action to perform.")
198
  table_name: str | None = Field(default=None, description="Table to inspect when action_type=inspect_table.")
199
  operation_id: str | None = Field(default=None, description="Operation to inspect or apply when action_type is inspect_operation or apply_operation.")
200
+ entity_type: str | None = Field(default=None, description="Entity type to review when action_type=request_review.")
201
+ entity_id: str | None = Field(default=None, description="Entity identifier to review when action_type=request_review.")
202
+ target_system: Literal["crm", "billing"] | None = Field(default=None, description="Downstream system to simulate when action_type=run_sync_dry_run.")
203
+ reason_code: str | None = Field(default=None, description="Reason for escalating a review request.")
204
  reasoning: str = Field(default="", description="Optional natural-language reasoning for debugging baselines.")
205
 
206
 
 
216
  quality_score: float = Field(default=0.0, description="Current deterministic grader score.")
217
  best_score: float = Field(default=0.0, description="Best score seen in the current episode.")
218
  remaining_steps: int = Field(default=0, description="How many actions remain before truncation.")
219
+ review_budget_remaining: int = Field(default=0, description="How many human-review requests remain in the current episode.")
220
+ supported_sync_targets: list[str] = Field(default_factory=list, description="Downstream systems that can be tested with run_sync_dry_run.")
221
+ downstream_health: DownstreamHealth = Field(default_factory=DownstreamHealth, description="Current operational health estimates for downstream systems.")
222
+ risk_cards: list[RiskCard] = Field(default_factory=list, description="Operational risk summaries derived from downstream health.")
223
+ last_dry_run: DryRunReport | None = Field(default=None, description="Most recent downstream dry-run result, if any.")
224
+ action_costs: list[ActionCostEntry] = Field(default_factory=list, description="Estimated cost of each action family.")
225
  table_summaries: list[TableSummary] = Field(default_factory=list, description="Compact summaries of all tables.")
226
  focus_table: TableView | None = Field(default=None, description="Detailed contents for the currently inspected table.")
227
  available_operations: list[OperationSummary] = Field(default_factory=list, description="Available cleaning actions.")
228
+ available_review_targets: list[ReviewTarget] = Field(default_factory=list, description="Entities that can be escalated for deterministic review.")
229
+ pending_reviews: list[PendingReview] = Field(default_factory=list, description="Review requests that have been queued but not yet resolved.")
230
+ resolved_reviews: list[ReviewResolution] = Field(default_factory=list, description="Resolved review responses available to the agent.")
231
  focus_operation: OperationDetail | None = Field(default=None, description="Detailed preview for the currently inspected operation.")
232
  validation_issues: list[ValidationIssue] = Field(default_factory=list, description="Current unresolved validation issues.")
233
  issue_cards: list[IssueCard] = Field(default_factory=list, description="Aggregated issue cards with suggested next actions.")
 
246
  difficulty: Literal["easy", "medium", "hard"] = Field(..., description="Current task difficulty.")
247
  requested_seed: int | None = Field(default=None, description="Seed used when resetting the current episode.")
248
  max_steps: int = Field(..., description="Task step budget.")
249
+ review_budget_total: int = Field(default=0, description="Total number of review requests available in this task.")
250
+ review_budget_remaining: int = Field(default=0, description="Remaining number of review requests available in this task.")
251
  submitted: bool = Field(default=False, description="Whether submit was called.")
252
  current_score: float = Field(default=0.0, description="Current deterministic grader score.")
253
  best_score: float = Field(default=0.0, description="Best score achieved this episode.")
254
  outstanding_issue_count: int = Field(default=0, description="Number of unresolved validation issues.")
255
+ downstream_health: DownstreamHealth = Field(default_factory=DownstreamHealth, description="Current downstream operational health.")
256
+ last_dry_run: DryRunReport | None = Field(default=None, description="Most recent downstream dry-run result.")
257
  tables: dict[str, list[dict[str, str]]] = Field(default_factory=dict, description="Current mutable table contents.")
258
  applied_operation_ids: list[str] = Field(default_factory=list, description="Operations already applied.")
259
  inspected_tables: list[str] = Field(default_factory=list, description="Tables inspected so far.")
260
  inspected_operations: list[str] = Field(default_factory=list, description="Operations inspected so far.")
261
+ requested_review_ids: list[str] = Field(default_factory=list, description="Review cases already requested in this episode.")
262
+ pending_reviews: list[PendingReview] = Field(default_factory=list, description="Queued review requests awaiting deterministic responses.")
263
+ resolved_reviews: list[ReviewResolution] = Field(default_factory=list, description="Resolved review responses available to the agent.")
264
+ dry_run_targets: list[str] = Field(default_factory=list, description="Downstream targets that have already been dry-run in this episode.")
265
  recent_history: list[str] = Field(default_factory=list, description="Recent action log.")
cleanops_env/tasks.py CHANGED
@@ -98,6 +98,20 @@ class OperationSpec:
98
  transform: TransformFn
99
 
100
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
101
  @dataclass(frozen=True)
102
  class TaskSpec:
103
  task_id: str
@@ -106,6 +120,8 @@ class TaskSpec:
106
  objective: str
107
  dataset_context: str
108
  max_steps: int
 
 
109
  primary_keys: dict[str, str]
110
  duplicate_identity_columns: dict[str, tuple[str, ...]]
111
  dirty_tables: Tables
@@ -114,6 +130,7 @@ class TaskSpec:
114
  operations: dict[str, OperationSpec]
115
  solution_operation_ids: tuple[str, ...]
116
  issue_cards: tuple[IssueCard, ...]
 
117
 
118
 
119
  def clone_tables(tables: Tables) -> Tables:
@@ -353,6 +370,8 @@ def _task_from_solution(
353
  objective: str,
354
  dataset_context: str,
355
  max_steps: int,
 
 
356
  primary_keys: dict[str, str],
357
  duplicate_identity_columns: dict[str, tuple[str, ...]],
358
  dirty_tables: Tables,
@@ -360,6 +379,7 @@ def _task_from_solution(
360
  operations: dict[str, OperationSpec],
361
  solution_operation_ids: tuple[str, ...],
362
  issue_cards: tuple[IssueCard, ...],
 
363
  ) -> TaskSpec:
364
  gold_tables = clone_tables(dirty_tables)
365
  for operation_id in solution_operation_ids:
@@ -371,6 +391,8 @@ def _task_from_solution(
371
  objective=objective,
372
  dataset_context=dataset_context,
373
  max_steps=max_steps,
 
 
374
  primary_keys=primary_keys,
375
  duplicate_identity_columns=duplicate_identity_columns,
376
  dirty_tables=dirty_tables,
@@ -379,6 +401,7 @@ def _task_from_solution(
379
  operations=operations,
380
  solution_operation_ids=solution_operation_ids,
381
  issue_cards=issue_cards,
 
382
  )
383
 
384
 
@@ -418,6 +441,20 @@ def _build_easy_task() -> TaskSpec:
418
  IssueCard(title="A missing state value blocks validation", detail="One customer record has city information but no state code.", issue_codes=["required:customers.state"], recommended_operation_ids=["easy_fill_state_from_city"]),
419
  IssueCard(title="Duplicate customer identities exist", detail="Two rows refer to the same customer once emails are normalized.", issue_codes=["unique:customers.email"], recommended_operation_ids=["easy_merge_customers_by_email"]),
420
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
421
  return _task_from_solution(
422
  task_id="customer_contacts_easy",
423
  title="Customer Contacts Standardization",
@@ -425,6 +462,8 @@ def _build_easy_task() -> TaskSpec:
425
  objective="Prepare a customer-contact export for CRM import by standardizing contact fields, filling one missing state, and merging duplicate customer rows without deleting valid inactive accounts.",
426
  dataset_context="This table simulates a weekly B2B CRM export that sales ops cleans before loading into a customer system.",
427
  max_steps=10,
 
 
428
  primary_keys={"customers": "customer_id"},
429
  duplicate_identity_columns={"customers": ("email",)},
430
  dirty_tables=dirty_tables,
@@ -432,6 +471,7 @@ def _build_easy_task() -> TaskSpec:
432
  operations=operations,
433
  solution_operation_ids=("easy_normalize_names", "easy_normalize_emails", "easy_normalize_phones", "easy_normalize_states", "easy_fill_state_from_city", "easy_merge_customers_by_email"),
434
  issue_cards=issue_cards,
 
435
  )
436
 
437
 
@@ -477,6 +517,20 @@ def _build_medium_task() -> TaskSpec:
477
  IssueCard(title="Shipping state labels are not canonical", detail="Downstream warehouse tools require two-letter state abbreviations.", issue_codes=["enum:orders.shipping_state"], recommended_operation_ids=["med_normalize_shipping_states"]),
478
  IssueCard(title="A duplicated order row exists", detail="One record is a second export copy of another order.", issue_codes=["unique:orders.order_id"], recommended_operation_ids=["med_dedupe_orders"]),
479
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
480
  return _task_from_solution(
481
  task_id="orders_reconciliation_medium",
482
  title="E-commerce Order Reconciliation",
@@ -484,6 +538,8 @@ def _build_medium_task() -> TaskSpec:
484
  objective="Clean a transactional orders export by normalizing dates, money, statuses, and shipping states while deduplicating repeated order exports without deleting legitimate cancelled orders.",
485
  dataset_context="This table simulates a daily order extract from an e-commerce platform that revenue ops must reconcile before BI ingestion.",
486
  max_steps=12,
 
 
487
  primary_keys={"orders": "order_id"},
488
  duplicate_identity_columns={"orders": ("order_id",)},
489
  dirty_tables=dirty_tables,
@@ -491,6 +547,7 @@ def _build_medium_task() -> TaskSpec:
491
  operations=operations,
492
  solution_operation_ids=("med_normalize_dates", "med_normalize_currency_amounts", "med_normalize_order_statuses", "med_normalize_shipping_states", "med_dedupe_orders"),
493
  issue_cards=issue_cards,
 
494
  )
495
 
496
 
@@ -571,6 +628,32 @@ def _build_hard_task() -> TaskSpec:
571
  IssueCard(title="Subscription and payment facts use inconsistent formats", detail="Plans, statuses, dates, amounts, and currency values need canonicalization before loading.", issue_codes=["enum:subscriptions.plan_code", "enum:subscriptions.status", "pattern:subscriptions.renewal_date", "pattern:payments.amount", "enum:payments.payment_status", "pattern:payments.paid_at"], recommended_operation_ids=["hard_normalize_subscriptions", "hard_normalize_payments"]),
572
  IssueCard(title="Duplicate payment facts are present", detail="Two payment rows represent the same invoice settlement and one should be removed.", issue_codes=["unique:payments.customer_email+subscription_id+amount+paid_at"], recommended_operation_ids=["hard_remove_duplicate_payments"]),
573
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
574
  return _task_from_solution(
575
  task_id="crm_migration_hard",
576
  title="CRM Migration Referential Cleanup",
@@ -578,6 +661,8 @@ def _build_hard_task() -> TaskSpec:
578
  objective="Repair a three-table CRM migration extract by standardizing customer, subscription, and payment data; merging duplicate customers; fixing foreign keys from email joins; and removing duplicate payment facts without dropping legitimate orphan-like child rows.",
579
  dataset_context="This dataset simulates a SaaS CRM and billing migration where a team must clean customer master data and child ledger references before import.",
580
  max_steps=18,
 
 
581
  primary_keys={"customers": "customer_id", "subscriptions": "subscription_id", "payments": "payment_id"},
582
  duplicate_identity_columns={"customers": ("email",), "subscriptions": ("subscription_id",), "payments": ("customer_email", "subscription_id", "amount", "paid_at")},
583
  dirty_tables=dirty_tables,
@@ -585,6 +670,7 @@ def _build_hard_task() -> TaskSpec:
585
  operations=operations,
586
  solution_operation_ids=("hard_normalize_customer_fields", "hard_merge_customers_by_email", "hard_normalize_subscriptions", "hard_repair_subscription_customer_refs", "hard_normalize_payments", "hard_repair_payment_customer_refs", "hard_remove_duplicate_payments"),
587
  issue_cards=issue_cards,
 
588
  )
589
 
590
 
 
98
  transform: TransformFn
99
 
100
 
101
+ @dataclass(frozen=True)
102
+ class ReviewCaseSpec:
103
+ review_id: str
104
+ entity_type: str
105
+ entity_id: str
106
+ reason_code: str
107
+ title: str
108
+ detail: str
109
+ resolution: str
110
+ response_summary: str
111
+ evidence_summary: str
112
+ recommended_operation_ids: tuple[str, ...] = ()
113
+
114
+
115
  @dataclass(frozen=True)
116
  class TaskSpec:
117
  task_id: str
 
120
  objective: str
121
  dataset_context: str
122
  max_steps: int
123
+ review_budget: int
124
+ sync_targets: tuple[str, ...]
125
  primary_keys: dict[str, str]
126
  duplicate_identity_columns: dict[str, tuple[str, ...]]
127
  dirty_tables: Tables
 
130
  operations: dict[str, OperationSpec]
131
  solution_operation_ids: tuple[str, ...]
132
  issue_cards: tuple[IssueCard, ...]
133
+ review_cases: dict[str, ReviewCaseSpec]
134
 
135
 
136
  def clone_tables(tables: Tables) -> Tables:
 
370
  objective: str,
371
  dataset_context: str,
372
  max_steps: int,
373
+ review_budget: int,
374
+ sync_targets: tuple[str, ...],
375
  primary_keys: dict[str, str],
376
  duplicate_identity_columns: dict[str, tuple[str, ...]],
377
  dirty_tables: Tables,
 
379
  operations: dict[str, OperationSpec],
380
  solution_operation_ids: tuple[str, ...],
381
  issue_cards: tuple[IssueCard, ...],
382
+ review_cases: dict[str, ReviewCaseSpec],
383
  ) -> TaskSpec:
384
  gold_tables = clone_tables(dirty_tables)
385
  for operation_id in solution_operation_ids:
 
391
  objective=objective,
392
  dataset_context=dataset_context,
393
  max_steps=max_steps,
394
+ review_budget=review_budget,
395
+ sync_targets=sync_targets,
396
  primary_keys=primary_keys,
397
  duplicate_identity_columns=duplicate_identity_columns,
398
  dirty_tables=dirty_tables,
 
401
  operations=operations,
402
  solution_operation_ids=solution_operation_ids,
403
  issue_cards=issue_cards,
404
+ review_cases=review_cases,
405
  )
406
 
407
 
 
441
  IssueCard(title="A missing state value blocks validation", detail="One customer record has city information but no state code.", issue_codes=["required:customers.state"], recommended_operation_ids=["easy_fill_state_from_city"]),
442
  IssueCard(title="Duplicate customer identities exist", detail="Two rows refer to the same customer once emails are normalized.", issue_codes=["unique:customers.email"], recommended_operation_ids=["easy_merge_customers_by_email"]),
443
  )
444
+ review_cases = {
445
+ "easy_customer_duplicate_review": ReviewCaseSpec(
446
+ review_id="easy_customer_duplicate_review",
447
+ entity_type="customer",
448
+ entity_id="C005",
449
+ reason_code="possible_duplicate",
450
+ title="Confirm duplicate customer merge",
451
+ detail="Alice Johnson appears twice with status conflicts after email normalization.",
452
+ resolution="merge_confirmed_keep_c001",
453
+ response_summary="Merge C005 into C001. Keep the active account record and preserve inactive customers elsewhere in the file.",
454
+ evidence_summary="Normalized emails match and both rows describe the same Nashville customer; C001 is the canonical CRM ID.",
455
+ recommended_operation_ids=("easy_merge_customers_by_email",),
456
+ )
457
+ }
458
  return _task_from_solution(
459
  task_id="customer_contacts_easy",
460
  title="Customer Contacts Standardization",
 
462
  objective="Prepare a customer-contact export for CRM import by standardizing contact fields, filling one missing state, and merging duplicate customer rows without deleting valid inactive accounts.",
463
  dataset_context="This table simulates a weekly B2B CRM export that sales ops cleans before loading into a customer system.",
464
  max_steps=10,
465
+ review_budget=1,
466
+ sync_targets=("crm",),
467
  primary_keys={"customers": "customer_id"},
468
  duplicate_identity_columns={"customers": ("email",)},
469
  dirty_tables=dirty_tables,
 
471
  operations=operations,
472
  solution_operation_ids=("easy_normalize_names", "easy_normalize_emails", "easy_normalize_phones", "easy_normalize_states", "easy_fill_state_from_city", "easy_merge_customers_by_email"),
473
  issue_cards=issue_cards,
474
+ review_cases=review_cases,
475
  )
476
 
477
 
 
517
  IssueCard(title="Shipping state labels are not canonical", detail="Downstream warehouse tools require two-letter state abbreviations.", issue_codes=["enum:orders.shipping_state"], recommended_operation_ids=["med_normalize_shipping_states"]),
518
  IssueCard(title="A duplicated order row exists", detail="One record is a second export copy of another order.", issue_codes=["unique:orders.order_id"], recommended_operation_ids=["med_dedupe_orders"]),
519
  )
520
+ review_cases = {
521
+ "med_returned_order_review": ReviewCaseSpec(
522
+ review_id="med_returned_order_review",
523
+ entity_type="order",
524
+ entity_id="O1005",
525
+ reason_code="preserve_operational_record",
526
+ title="Confirm whether returned order should be retained",
527
+ detail="Returned orders often look removable during cleanup, but finance may still require them.",
528
+ resolution="retain_returned_order",
529
+ response_summary="Keep O1005 in the dataset. Normalize it, but do not delete returned or cancelled orders for this reconciliation task.",
530
+ evidence_summary="Returned orders are part of audit trails and downstream refund reporting; the row is legitimate, not noise.",
531
+ recommended_operation_ids=("med_normalize_dates", "med_normalize_currency_amounts", "med_normalize_order_statuses"),
532
+ )
533
+ }
534
  return _task_from_solution(
535
  task_id="orders_reconciliation_medium",
536
  title="E-commerce Order Reconciliation",
 
538
  objective="Clean a transactional orders export by normalizing dates, money, statuses, and shipping states while deduplicating repeated order exports without deleting legitimate cancelled orders.",
539
  dataset_context="This table simulates a daily order extract from an e-commerce platform that revenue ops must reconcile before BI ingestion.",
540
  max_steps=12,
541
+ review_budget=1,
542
+ sync_targets=("crm", "billing"),
543
  primary_keys={"orders": "order_id"},
544
  duplicate_identity_columns={"orders": ("order_id",)},
545
  dirty_tables=dirty_tables,
 
547
  operations=operations,
548
  solution_operation_ids=("med_normalize_dates", "med_normalize_currency_amounts", "med_normalize_order_statuses", "med_normalize_shipping_states", "med_dedupe_orders"),
549
  issue_cards=issue_cards,
550
+ review_cases=review_cases,
551
  )
552
 
553
 
 
628
  IssueCard(title="Subscription and payment facts use inconsistent formats", detail="Plans, statuses, dates, amounts, and currency values need canonicalization before loading.", issue_codes=["enum:subscriptions.plan_code", "enum:subscriptions.status", "pattern:subscriptions.renewal_date", "pattern:payments.amount", "enum:payments.payment_status", "pattern:payments.paid_at"], recommended_operation_ids=["hard_normalize_subscriptions", "hard_normalize_payments"]),
629
  IssueCard(title="Duplicate payment facts are present", detail="Two payment rows represent the same invoice settlement and one should be removed.", issue_codes=["unique:payments.customer_email+subscription_id+amount+paid_at"], recommended_operation_ids=["hard_remove_duplicate_payments"]),
630
  )
631
+ review_cases = {
632
+ "hard_customer_merge_review": ReviewCaseSpec(
633
+ review_id="hard_customer_merge_review",
634
+ entity_type="customer",
635
+ entity_id="CU101",
636
+ reason_code="possible_duplicate",
637
+ title="Confirm duplicate customer merge",
638
+ detail="CU100 and CU101 normalize to the same email, but child tables disagree on which customer ID is canonical.",
639
+ resolution="merge_cu101_into_cu100",
640
+ response_summary="Treat CU100 as the canonical CRM customer and merge CU101 into it before repairing child foreign keys.",
641
+ evidence_summary="Customer master history shows CU100 was created first and both Ana Lopez rows share the same normalized email.",
642
+ recommended_operation_ids=("hard_merge_customers_by_email", "hard_repair_subscription_customer_refs", "hard_repair_payment_customer_refs"),
643
+ ),
644
+ "hard_payment_orphan_review": ReviewCaseSpec(
645
+ review_id="hard_payment_orphan_review",
646
+ entity_type="payment",
647
+ entity_id="P501",
648
+ reason_code="blank_customer_id",
649
+ title="Confirm how to repair blank payment customer_id",
650
+ detail="Payment P501 has a blank customer_id but a valid customer email that may identify the correct customer dimension row.",
651
+ resolution="repair_from_customer_email",
652
+ response_summary="Repair P501 by matching its normalized customer_email to the customer master; do not delete the row.",
653
+ evidence_summary="The billing export preserved ben.carter@example.com, so the customer foreign key can be restored deterministically.",
654
+ recommended_operation_ids=("hard_normalize_payments", "hard_repair_payment_customer_refs"),
655
+ ),
656
+ }
657
  return _task_from_solution(
658
  task_id="crm_migration_hard",
659
  title="CRM Migration Referential Cleanup",
 
661
  objective="Repair a three-table CRM migration extract by standardizing customer, subscription, and payment data; merging duplicate customers; fixing foreign keys from email joins; and removing duplicate payment facts without dropping legitimate orphan-like child rows.",
662
  dataset_context="This dataset simulates a SaaS CRM and billing migration where a team must clean customer master data and child ledger references before import.",
663
  max_steps=18,
664
+ review_budget=2,
665
+ sync_targets=("crm", "billing"),
666
  primary_keys={"customers": "customer_id", "subscriptions": "subscription_id", "payments": "payment_id"},
667
  duplicate_identity_columns={"customers": ("email",), "subscriptions": ("subscription_id",), "payments": ("customer_email", "subscription_id", "amount", "paid_at")},
668
  dirty_tables=dirty_tables,
 
670
  operations=operations,
671
  solution_operation_ids=("hard_normalize_customer_fields", "hard_merge_customers_by_email", "hard_normalize_subscriptions", "hard_repair_subscription_customer_refs", "hard_normalize_payments", "hard_repair_payment_customer_refs", "hard_remove_duplicate_payments"),
672
  issue_cards=issue_cards,
673
+ review_cases=review_cases,
674
  )
675
 
676
 
inference.py CHANGED
@@ -33,12 +33,18 @@ SYSTEM_PROMPT = textwrap.dedent(
33
  You are a data-cleaning operations agent working in the CleanOps OpenEnv benchmark.
34
  Choose exactly one JSON action per turn using this schema:
35
  {
36
- "action_type": "inspect_table" | "inspect_operation" | "apply_operation" | "submit",
37
  "table_name": string | null,
38
  "operation_id": string | null,
 
 
 
 
39
  "reasoning": string
40
  }
41
  Prefer safe/review operations that directly resolve current validation issues.
 
 
42
  Avoid destructive operations unless the task objective explicitly asks for deletions.
43
  Submit once quality_score is high and remaining validation issues are gone.
44
  Return only a single JSON object.
@@ -68,6 +74,15 @@ def build_observation_prompt(observation: DataCleaningObservation) -> str:
68
  "objective": observation.objective,
69
  "quality_score": observation.quality_score,
70
  "remaining_steps": observation.remaining_steps,
 
 
 
 
 
 
 
 
 
71
  "table_summaries": [summary.model_dump() for summary in observation.table_summaries],
72
  "focus_table": observation.focus_table.model_dump() if observation.focus_table else None,
73
  "focus_operation": observation.focus_operation.model_dump() if observation.focus_operation else None,
@@ -121,6 +136,10 @@ def action_to_string(action: DataCleaningAction) -> str:
121
  return f"inspect_operation({action.operation_id})"
122
  if action.action_type == "apply_operation":
123
  return f"apply_operation({action.operation_id})"
 
 
 
 
124
  return "submit()"
125
 
126
 
 
33
  You are a data-cleaning operations agent working in the CleanOps OpenEnv benchmark.
34
  Choose exactly one JSON action per turn using this schema:
35
  {
36
+ "action_type": "inspect_table" | "inspect_operation" | "apply_operation" | "request_review" | "run_sync_dry_run" | "submit",
37
  "table_name": string | null,
38
  "operation_id": string | null,
39
+ "entity_type": string | null,
40
+ "entity_id": string | null,
41
+ "target_system": "crm" | "billing" | null,
42
+ "reason_code": string | null,
43
  "reasoning": string
44
  }
45
  Prefer safe/review operations that directly resolve current validation issues.
46
+ Use request_review when the environment flags an ambiguous merge or repair decision.
47
+ Use run_sync_dry_run before submit on medium and hard tasks when downstream risk still looks material.
48
  Avoid destructive operations unless the task objective explicitly asks for deletions.
49
  Submit once quality_score is high and remaining validation issues are gone.
50
  Return only a single JSON object.
 
74
  "objective": observation.objective,
75
  "quality_score": observation.quality_score,
76
  "remaining_steps": observation.remaining_steps,
77
+ "review_budget_remaining": observation.review_budget_remaining,
78
+ "supported_sync_targets": observation.supported_sync_targets,
79
+ "downstream_health": observation.downstream_health.model_dump(),
80
+ "risk_cards": [risk_card.model_dump() for risk_card in observation.risk_cards],
81
+ "available_review_targets": [target.model_dump() for target in observation.available_review_targets],
82
+ "pending_reviews": [review.model_dump() for review in observation.pending_reviews],
83
+ "resolved_reviews": [review.model_dump() for review in observation.resolved_reviews],
84
+ "last_dry_run": observation.last_dry_run.model_dump() if observation.last_dry_run else None,
85
+ "action_costs": [entry.model_dump() for entry in observation.action_costs],
86
  "table_summaries": [summary.model_dump() for summary in observation.table_summaries],
87
  "focus_table": observation.focus_table.model_dump() if observation.focus_table else None,
88
  "focus_operation": observation.focus_operation.model_dump() if observation.focus_operation else None,
 
136
  return f"inspect_operation({action.operation_id})"
137
  if action.action_type == "apply_operation":
138
  return f"apply_operation({action.operation_id})"
139
+ if action.action_type == "request_review":
140
+ return f"request_review({action.entity_type},{action.entity_id},{action.reason_code})"
141
+ if action.action_type == "run_sync_dry_run":
142
+ return f"run_sync_dry_run({action.target_system})"
143
  return "submit()"
144
 
145
 
scripts/run_openai_baseline.py CHANGED
@@ -23,13 +23,19 @@ SYSTEM_PROMPT = """You are a careful data-cleaning operations agent.
23
  Your job is to improve the current task score by choosing one JSON action at a time.
24
  Use only this JSON schema:
25
  {
26
- "action_type": "inspect_table" | "inspect_operation" | "apply_operation" | "submit",
27
  "table_name": string | null,
28
  "operation_id": string | null,
 
 
 
 
29
  "reasoning": string
30
  }
31
  Rules:
32
  - Prefer safe/review operations that directly address unresolved validation issues.
 
 
33
  - Avoid destructive operations unless the objective explicitly asks for row deletion.
34
  - Call submit only when the data looks clean or there is 1 step left.
35
  - Return a single JSON object and no extra text."""
@@ -44,6 +50,15 @@ def compact_observation(observation: DataCleaningObservation) -> dict[str, Any]:
44
  "dataset_context": observation.dataset_context,
45
  "quality_score": observation.quality_score,
46
  "remaining_steps": observation.remaining_steps,
 
 
 
 
 
 
 
 
 
47
  "last_action_status": observation.last_action_status,
48
  "recent_history": observation.recent_history[-5:],
49
  "table_summaries": [summary.model_dump() for summary in observation.table_summaries],
@@ -148,4 +163,3 @@ def main() -> None:
148
 
149
  if __name__ == "__main__":
150
  main()
151
-
 
23
  Your job is to improve the current task score by choosing one JSON action at a time.
24
  Use only this JSON schema:
25
  {
26
+ "action_type": "inspect_table" | "inspect_operation" | "apply_operation" | "request_review" | "run_sync_dry_run" | "submit",
27
  "table_name": string | null,
28
  "operation_id": string | null,
29
+ "entity_type": string | null,
30
+ "entity_id": string | null,
31
+ "target_system": "crm" | "billing" | null,
32
+ "reason_code": string | null,
33
  "reasoning": string
34
  }
35
  Rules:
36
  - Prefer safe/review operations that directly address unresolved validation issues.
37
+ - Use request_review when an ambiguous merge or foreign-key repair needs confirmation.
38
+ - Use run_sync_dry_run before submit when downstream health is still weak.
39
  - Avoid destructive operations unless the objective explicitly asks for row deletion.
40
  - Call submit only when the data looks clean or there is 1 step left.
41
  - Return a single JSON object and no extra text."""
 
50
  "dataset_context": observation.dataset_context,
51
  "quality_score": observation.quality_score,
52
  "remaining_steps": observation.remaining_steps,
53
+ "review_budget_remaining": observation.review_budget_remaining,
54
+ "supported_sync_targets": observation.supported_sync_targets,
55
+ "downstream_health": observation.downstream_health.model_dump(),
56
+ "risk_cards": [risk_card.model_dump() for risk_card in observation.risk_cards],
57
+ "available_review_targets": [target.model_dump() for target in observation.available_review_targets],
58
+ "pending_reviews": [review.model_dump() for review in observation.pending_reviews],
59
+ "resolved_reviews": [review.model_dump() for review in observation.resolved_reviews],
60
+ "last_dry_run": observation.last_dry_run.model_dump() if observation.last_dry_run else None,
61
+ "action_costs": [entry.model_dump() for entry in observation.action_costs],
62
  "last_action_status": observation.last_action_status,
63
  "recent_history": observation.recent_history[-5:],
64
  "table_summaries": [summary.model_dump() for summary in observation.table_summaries],
 
163
 
164
  if __name__ == "__main__":
165
  main()
 
tests/test_environment.py CHANGED
@@ -11,6 +11,10 @@ def test_reset_step_state_api() -> None:
11
  observation = env.reset(task_id="customer_contacts_easy", seed=7)
12
  assert observation.task_id == "customer_contacts_easy"
13
  assert observation.requested_seed == 7
 
 
 
 
14
  assert observation.done is False
15
  assert observation.quality_score < 1.0
16
 
@@ -59,3 +63,94 @@ def test_seed_changes_visible_preview_rows() -> None:
59
  assert observation_seed_2.requested_seed == 2
60
  assert observation_seed_7.requested_seed == 7
61
  assert preview_seed_2 != preview_seed_7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
11
  observation = env.reset(task_id="customer_contacts_easy", seed=7)
12
  assert observation.task_id == "customer_contacts_easy"
13
  assert observation.requested_seed == 7
14
+ assert observation.review_budget_remaining == 1
15
+ assert observation.supported_sync_targets == ["crm"]
16
+ assert len(observation.available_review_targets) == 1
17
+ assert 0.0 < observation.downstream_health.overall_health_score < 1.0
18
  assert observation.done is False
19
  assert observation.quality_score < 1.0
20
 
 
63
  assert observation_seed_2.requested_seed == 2
64
  assert observation_seed_7.requested_seed == 7
65
  assert preview_seed_2 != preview_seed_7
66
+
67
+
68
+ def test_request_review_queues_and_releases_deterministic_response() -> None:
69
+ env = LocalCleanOpsEnv()
70
+ observation = env.reset(task_id="crm_migration_hard", seed=7)
71
+ assert observation.review_budget_remaining == 2
72
+ assert len(observation.pending_reviews) == 0
73
+ assert len(observation.resolved_reviews) == 0
74
+
75
+ observation, reward, done, info = env.step(
76
+ DataCleaningAction(
77
+ action_type="request_review",
78
+ entity_type="customer",
79
+ entity_id="CU101",
80
+ reason_code="possible_duplicate",
81
+ reasoning="Escalate the ambiguous Ana Lopez duplicate before merging.",
82
+ )
83
+ )
84
+ assert done is False
85
+ assert reward < 0.0
86
+ assert observation.review_budget_remaining == 1
87
+ assert len(observation.pending_reviews) == 1
88
+ assert len(observation.resolved_reviews) == 0
89
+ assert "response will be available on the next step" in observation.last_action_status
90
+ assert info["state"]["requested_review_ids"] == ["hard_customer_merge_review"]
91
+
92
+ observation, reward, done, _ = env.step(
93
+ DataCleaningAction(
94
+ action_type="inspect_table",
95
+ table_name="customers",
96
+ reasoning="Read the customer table again after the review response arrives.",
97
+ )
98
+ )
99
+ assert done is False
100
+ assert reward > 0.0
101
+ assert len(observation.pending_reviews) == 0
102
+ assert len(observation.resolved_reviews) == 1
103
+ resolved_review = observation.resolved_reviews[0]
104
+ assert resolved_review.review_id == "hard_customer_merge_review"
105
+ assert "hard_merge_customers_by_email" in resolved_review.recommended_operation_ids
106
+ assert "Review response available" in observation.last_action_status
107
+
108
+
109
+ def test_run_sync_dry_run_surfaces_downstream_findings() -> None:
110
+ env = LocalCleanOpsEnv()
111
+ observation = env.reset(task_id="crm_migration_hard", seed=7)
112
+ starting_health = observation.downstream_health.overall_health_score
113
+
114
+ observation, reward, done, info = env.step(
115
+ DataCleaningAction(
116
+ action_type="run_sync_dry_run",
117
+ target_system="billing",
118
+ reasoning="Check whether the current migration state would break downstream billing.",
119
+ )
120
+ )
121
+ assert done is False
122
+ assert observation.last_dry_run is not None
123
+ assert observation.last_dry_run.target_system == "billing"
124
+ assert observation.last_dry_run.finding_count > 0
125
+ assert observation.last_dry_run.success_rate == observation.downstream_health.billing_link_integrity
126
+ assert "billing" in info["state"]["dry_run_targets"]
127
+ assert observation.downstream_health.overall_health_score == starting_health
128
+
129
+
130
+ def test_duplicate_review_request_is_penalized() -> None:
131
+ env = LocalCleanOpsEnv()
132
+ env.reset(task_id="customer_contacts_easy", seed=7)
133
+ env.step(
134
+ DataCleaningAction(
135
+ action_type="request_review",
136
+ entity_type="customer",
137
+ entity_id="C005",
138
+ reason_code="possible_duplicate",
139
+ reasoning="Ask for confirmation once.",
140
+ )
141
+ )
142
+ observation, reward, done, _ = env.step(
143
+ DataCleaningAction(
144
+ action_type="request_review",
145
+ entity_type="customer",
146
+ entity_id="C005",
147
+ reason_code="possible_duplicate",
148
+ reasoning="Repeat the same review request.",
149
+ )
150
+ )
151
+ assert done is False
152
+ assert reward < 0.0
153
+ assert observation.review_budget_remaining == 0
154
+ assert len(observation.pending_reviews) == 0
155
+ assert len(observation.resolved_reviews) == 1
156
+ assert "already requested" in observation.last_action_status