yashmarathe commited on
Commit
f380b90
Β·
0 Parent(s):

feat: Data Cleaning RL Environment for OpenEnv Hackathon

Browse files

Complete OpenEnv-compliant RL environment where an agent cleans tabular
datasets (iris/adult/credit-g from OpenML) using structured commands.

- datasets.py: load and cache OpenML datasets with sampling for large sets
- noise_injector.py: deterministic seeded noise (missing, type errors,
duplicates, outliers, schema violations) across 3 difficulty tiers
- models.py: typed Pydantic models for actions, observations, episode state
- server/environment.py: episode management, all 7 action types, rewards
- grader.py: RandomForest-based bracketed normalization grader
- server/app.py: FastAPI with /reset /step /state /tasks /grader /baseline /health
- baseline.py: heuristic agent with HTTP and internal execution modes
- client.py: async httpx client wrapper
- openenv.yaml, Dockerfile, requirements.txt, pyproject.toml, README.md

.gitignore ADDED
@@ -0,0 +1,40 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Python
2
+ __pycache__/
3
+ *.py[cod]
4
+ *.pyo
5
+ *.pyd
6
+ .Python
7
+ *.egg
8
+ *.egg-info/
9
+ dist/
10
+ build/
11
+ .eggs/
12
+ .venv/
13
+ venv/
14
+ env/
15
+ .env
16
+
17
+ # scikit-learn / OpenML cache
18
+ ~/scikit_learn_data/
19
+ scikit_learn_data/
20
+
21
+ # Pytest
22
+ .pytest_cache/
23
+ .coverage
24
+ htmlcov/
25
+
26
+ # Ruff
27
+ .ruff_cache/
28
+
29
+ # IDEs
30
+ .vscode/
31
+ .idea/
32
+ *.swp
33
+ *.swo
34
+
35
+ # OS
36
+ .DS_Store
37
+ Thumbs.db
38
+
39
+ # Docker
40
+ *.log
data_cleaning_env/.dockerignore ADDED
@@ -0,0 +1,11 @@
 
 
 
 
 
 
 
 
 
 
 
 
1
+ __pycache__/
2
+ *.py[cod]
3
+ *.egg-info/
4
+ .eggs/
5
+ dist/
6
+ build/
7
+ .env
8
+ outputs/
9
+ *.log
10
+ .pytest_cache/
11
+ .coverage
data_cleaning_env/README.md ADDED
@@ -0,0 +1,212 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Data Cleaning OpenEnv
2
+
3
+ An [OpenEnv](https://github.com/meta-pytorch/OpenEnv) RL environment where an AI agent cleans tabular datasets using structured commands. The agent is rewarded for improving data quality β€” measured by column-level accuracy each step and by downstream ML model accuracy at episode end.
4
+
5
+ Built for the [Meta PyTorch OpenEnv Hackathon x SST](https://www.scaler.com/school-of-technology) Round 1.
6
+
7
+ ---
8
+
9
+ ## Environment Description
10
+ The agent receives a dirty tabular dataset and must apply cleaning operations to restore it toward the original ground-truth data. Each episode ends either when the agent issues a `done` action or the step limit is reached. The episode is then scored by training a RandomForest classifier on the cleaned data and measuring accuracy improvement over the dirty baseline.
11
+
12
+ **Datasets** (from [OpenML](https://openml.org)):
13
+
14
+ | Task | Dataset | OpenML ID | Noise Types | Max Steps |
15
+ |----|--------|---------|-----------------------|-----------|
16
+ | easy | iris | 61 | Missing values (15% of numeric cols) | 20 |
17
+ | medium | adult | 1590 | Missing values (20%), type errors, duplicate rows (3%) | 40 |
18
+ | hard | credit-g | 31 | Missing values (25%), type errors, duplicates (5%), outliers, schema violations | 60 |
19
+
20
+ Noise injection is **deterministic** (seeded at 42), ensuring reproducibility.
21
+
22
+ ---
23
+
24
+ ## Action Space
25
+
26
+ Actions are typed JSON objects. The `action_type` field is always required.
27
+
28
+ | Action | Required Fields | Description |
29
+ |---------------------|-----------------|---------------------|
30
+ | `fill_missing` | `column`, `strategy` | Fill NaN values. Strategy: `mean\|median\|mode\|constant` |
31
+ | `drop_duplicates` | β€” | Remove all duplicate rows |
32
+ | `fix_type` | `column`, `dtype` | Coerce column dtype. dtype: `int\|float\|str` |
33
+ | `normalize` | `column` | Z-score normalize a numeric column |
34
+ | `drop_outliers` | `column`, `method` | Remove outliers. method: `iqr\|zscore` |
35
+ | `fix_schema_violation` | `column`, `constraint` | Fix constraint. constraint: `non_negative\|clamp_range` |
36
+ | `done` | β€” | Signal episode completion |
37
+ **Example action JSON:**
38
+ ```json
39
+ {
40
+ "action_type": "fill_missing",
41
+ "column": "age",
42
+ "strategy": "median"
43
+ }
44
+ ```
45
+
46
+ ---
47
+
48
+ ## Observation Space
49
+
50
+ Each `step()` and `reset()` call returns an observation with the following fields:
51
+
52
+ | Field | Type | Description |
53
+ |----------------|----------------|-----------------|
54
+ | `task` | `str` | Task tier: `easy`, `medium`, or `hard` |
55
+ | `step` | `int` | Current step number (0-indexed) |
56
+ | `max_steps` | `int` | Maximum steps in this episode |
57
+ | `columns` | `List[str]` | All column names |
58
+ | `column_issues` | `Dict[str, ColumnIssues]` | Per-column data quality issues |
59
+ | `column_stats` | `Dict[str, ColumnStats]` | Per-column statistics (mean, std, null_count, …) |
60
+ | `reward` | `float` | Per-step reward from the last action |
61
+ | `done` | `bool` | True if the episode has ended |
62
+
63
+ **`ColumnIssues` fields:**
64
+ - `missing_count` β€” number of NaN values
65
+ - `missing_pct` β€” fraction of NaN values [0, 1]
66
+ - `type_errors` β€” values that cannot be parsed as the expected dtype
67
+ - `outlier_count` β€” values outside 1.5Γ—IQR
68
+ - `has_duplicates` β€” whether any duplicate rows exist in the dataset
69
+
70
+ ---
71
+
72
+ ## Reward Function
73
+
74
+ - **Per-step reward:** Column-level accuracy delta relative to clean ground truth.
75
+ `reward = clip(new_accuracy - prev_accuracy, -0.1, +0.1)`
76
+ Invalid actions receive `-0.05`.
77
+
78
+ - **Episode grader score:** Downstream RandomForest accuracy improvement, normalized:
79
+ `score = clip((agent_acc - dirty_acc) / (oracle_acc - dirty_acc), 0.0, 1.0)`
80
+ - `0.0` = no improvement over the dirty baseline
81
+ - `1.0` = restored to oracle (original) quality
82
+
83
+ ---
84
+
85
+ ## API Endpoints
86
+
87
+ | Method | Path | Description |
88
+ |------|-------------|-------------|
89
+ | POST | `/reset` | Start a new episode. Body: `{"task": "easy\|medium\|hard"}` |
90
+ | POST | `/step` | Apply an action. Body: `{"episode_id": "...", "action": {…}}`|
91
+ | GET | `/state` | Get episode metadata. Query: `?episode_id=…` |
92
+ | GET | `/tasks` | List tasks and the full action schema |
93
+ | POST | `/grader` | Grade a completed episode. Body: `{"episode_id": "..."}` |
94
+ | POST | `/baseline` | Run the heuristic baseline agent across all 3 tasks |
95
+ | GET | `/health` | Liveness check |
96
+
97
+ Interactive API docs available at `/docs` when the server is running.
98
+
99
+ ---
100
+
101
+ ## Setup
102
+
103
+ ### Local (Python)
104
+
105
+ ```bash
106
+ cd data_cleaning_env
107
+
108
+ # Install dependencies
109
+ pip install -r server/requirements.txt
110
+
111
+ # Run the server
112
+ uvicorn server.app:app --host 0.0.0.0 --port 8000
113
+
114
+ # In another terminal β€” run the baseline agent
115
+ python baseline.py --url http://localhost:8000
116
+ ```
117
+
118
+ ### Docker
119
+
120
+ ```bash
121
+ # Build from data_cleaning_env/
122
+ docker build -f server/Dockerfile -t data-cleaning-env .
123
+
124
+ # Run
125
+ docker run -p 8000:8000 data-cleaning-env
126
+
127
+ # Test
128
+ curl http://localhost:8000/health
129
+ curl -X POST http://localhost:8000/reset -H "Content-Type: application/json" -d '{"task":"easy"}'
130
+ ```
131
+
132
+ ---
133
+
134
+ ## Baseline Scores
135
+
136
+ Run the heuristic baseline agent (fill missing β†’ fix types β†’ drop duplicates β†’ done):
137
+
138
+ ```
139
+ Task easy: ~0.70–0.85
140
+ Task medium: ~0.40–0.60
141
+ Task hard: ~0.20–0.40
142
+ ```
143
+
144
+ Actual scores depend on the dataset split and RandomForest training run. They are deterministic given the same seed.
145
+
146
+ To reproduce:
147
+ ```bash
148
+ python baseline.py --url http://localhost:8000
149
+ ```
150
+
151
+ Or via the API:
152
+ ```bash
153
+ curl -X POST http://localhost:8000/baseline
154
+ ```
155
+
156
+ ---
157
+
158
+ ## Quick Example
159
+
160
+ ```python
161
+ import requests
162
+
163
+ BASE = "http://localhost:8000"
164
+
165
+ # Start a medium-difficulty episode
166
+ resp = requests.post(f"{BASE}/reset", json={"task": "medium"}).json()
167
+ episode_id = resp["state"]["episode_id"]
168
+ obs = resp["observation"]
169
+
170
+ print(f"Columns: {obs['columns']}")
171
+ print(f"Issues: {obs['column_issues']}")
172
+
173
+ # Fill missing values in the first column
174
+ first_col = obs["columns"][0]
175
+ resp = requests.post(f"{BASE}/step", json={
176
+ "episode_id": episode_id,
177
+ "action": {"action_type": "fill_missing", "column": first_col, "strategy": "median"}
178
+ }).json()
179
+ print(f"Reward: {resp['reward']}")
180
+
181
+ # Grade the episode
182
+ resp = requests.post(f"{BASE}/grader", json={"episode_id": episode_id}).json()
183
+ print(f"Score: {resp['score']}")
184
+ ```
185
+
186
+ ---
187
+ ## Project Structure
188
+
189
+ ```
190
+ data_cleaning_env/
191
+ β”œβ”€β”€ __init__.py # Package exports
192
+ β”œβ”€β”€ models.py # Pydantic: CleaningAction, Observation, EpisodeState
193
+ β”œβ”€β”€ datasets.py # OpenML dataset loading and caching
194
+ β”œβ”€β”€ noise_injector.py # Deterministic noise injection (3 task levels)
195
+ β”œβ”€β”€ grader.py # Episode grader: sklearn RandomForest + bracketed score
196
+ β”œβ”€β”€ baseline.py # Heuristic baseline agent
197
+ β”œβ”€β”€ client.py # Async HTTP client wrapper
198
+ β”œβ”€β”€ openenv.yaml # OpenEnv manifest
199
+ β”œβ”€β”€ pyproject.toml # Package metadata
200
+ └── server/
201
+ β”œβ”€β”€ __init__.py
202
+ β”œβ”€β”€ environment.py # Core env logic (episodes, actions, rewards, observations)
203
+ β”œβ”€β”€ app.py # FastAPI application
204
+ β”œβ”€β”€ requirements.txt # Docker dependencies
205
+ └── Dockerfile # Container image
206
+ ```
207
+
208
+ ---
209
+
210
+ ## License
211
+
212
+ MIT
data_cleaning_env/__init__.py ADDED
@@ -0,0 +1,12 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Data Cleaning RL Environment for OpenEnv.
3
+
4
+ An RL environment where agents clean tabular datasets using structured commands,
5
+ graded by downstream ML model accuracy improvement.
6
+ """
7
+
8
+ from models import CleaningAction, Observation, EpisodeState
9
+ from client import DataCleaningEnvClient
10
+
11
+ __all__ = ["CleaningAction", "Observation", "EpisodeState", "DataCleaningEnvClient"]
12
+ __version__ = "1.0.0"
data_cleaning_env/baseline.py ADDED
@@ -0,0 +1,253 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Heuristic baseline agent for the Data Cleaning RL Environment.
3
+
4
+ Strategy (per task):
5
+ 1. Fill missing values with median for all columns that have missing data
6
+ 2. Fix type errors (coerce to float) for all columns with type errors
7
+ 3. Drop duplicate rows once
8
+ 4. Drop outliers (IQR) for columns with many outliers (medium/hard tasks)
9
+ 5. Fix schema violations (non_negative) for first 2 columns (hard task only)
10
+ 6. Signal done
11
+
12
+ Run standalone:
13
+ python baseline.py [--url http://localhost:8000]
14
+
15
+ Or import and call from app.py:
16
+ from baseline import run_baseline_internal
17
+ scores = run_baseline_internal(env)
18
+ """
19
+
20
+ from __future__ import annotations
21
+
22
+ import argparse
23
+ import json
24
+ import sys
25
+ from typing import TYPE_CHECKING
26
+
27
+ import requests
28
+
29
+ if TYPE_CHECKING:
30
+ from server.environment import DataCleaningEnvironment
31
+
32
+
33
+ BASE_URL_DEFAULT = "http://localhost:8000"
34
+
35
+
36
+ def run_single_episode_http(task: str, base_url: str) -> float:
37
+ """Run one baseline episode via the HTTP API. Returns the grader score."""
38
+ # Reset
39
+ resp = requests.post(f"{base_url}/reset", json={"task": task}, timeout=30)
40
+ resp.raise_for_status()
41
+ data = resp.json()
42
+ episode_id = data["state"]["episode_id"]
43
+ obs = data["observation"]
44
+ issues = obs["column_issues"]
45
+ task_max_steps = obs["max_steps"]
46
+ steps_used = 0
47
+
48
+ def post_step(action_payload: dict) -> dict:
49
+ nonlocal steps_used
50
+ r = requests.post(
51
+ f"{base_url}/step",
52
+ json={"episode_id": episode_id, "action": action_payload},
53
+ timeout=30,
54
+ )
55
+ r.raise_for_status()
56
+ steps_used += 1
57
+ return r.json()
58
+
59
+ # 1. Fill missing values
60
+ for col, col_issues in issues.items():
61
+ if steps_used >= task_max_steps - 3:
62
+ break
63
+ if col_issues["missing_count"] > 0:
64
+ post_step(
65
+ {
66
+ "action_type": "fill_missing",
67
+ "column": col,
68
+ "strategy": "median",
69
+ }
70
+ )
71
+
72
+ # 2. Fix type errors
73
+ for col, col_issues in issues.items():
74
+ if steps_used >= task_max_steps - 3:
75
+ break
76
+ if col_issues.get("type_errors", 0) > 0:
77
+ post_step(
78
+ {
79
+ "action_type": "fix_type",
80
+ "column": col,
81
+ "dtype": "float",
82
+ }
83
+ )
84
+
85
+ # 3. Drop duplicates
86
+ if steps_used < task_max_steps - 2 and any(
87
+ c.get("has_duplicates", False) for c in issues.values()
88
+ ):
89
+ post_step({"action_type": "drop_duplicates"})
90
+
91
+ # 4. Drop outliers (medium and hard tasks)
92
+ if task in ("medium", "hard"):
93
+ for col, col_issues in issues.items():
94
+ if steps_used >= task_max_steps - 1:
95
+ break
96
+ if col_issues.get("outlier_count", 0) > 3:
97
+ post_step(
98
+ {
99
+ "action_type": "drop_outliers",
100
+ "column": col,
101
+ "method": "iqr",
102
+ }
103
+ )
104
+
105
+ # 5. Fix schema violations (hard task only)
106
+ if task == "hard":
107
+ for col in list(issues.keys())[:2]:
108
+ if steps_used >= task_max_steps - 1:
109
+ break
110
+ post_step(
111
+ {
112
+ "action_type": "fix_schema_violation",
113
+ "column": col,
114
+ "constraint": "non_negative",
115
+ }
116
+ )
117
+
118
+ # 6. Done
119
+ post_step({"action_type": "done"})
120
+
121
+ # Grade
122
+ resp = requests.post(
123
+ f"{base_url}/grader", json={"episode_id": episode_id}, timeout=60
124
+ )
125
+ resp.raise_for_status()
126
+ return float(resp.json()["score"])
127
+
128
+
129
+ def run_baseline_http(base_url: str = BASE_URL_DEFAULT) -> dict[str, float]:
130
+ """Run baseline across all tasks via HTTP. Returns {task: score}."""
131
+ scores: dict[str, float] = {}
132
+ for task in ["easy", "medium", "hard"]:
133
+ score = run_single_episode_http(task, base_url)
134
+ scores[task] = round(score, 4)
135
+ print(f" Task {task}: {scores[task]:.4f}")
136
+ return scores
137
+
138
+
139
+ def run_baseline_internal(env: "DataCleaningEnvironment") -> dict[str, float]:
140
+ """
141
+ Run baseline directly against the environment object (no HTTP).
142
+ Used by the /baseline endpoint to avoid HTTP round-trips.
143
+ """
144
+ from models import (
145
+ ActionType,
146
+ CleaningAction,
147
+ DType,
148
+ FillStrategy,
149
+ OutlierMethod,
150
+ SchemaConstraint,
151
+ )
152
+ from grader import grade_episode
153
+
154
+ scores: dict[str, float] = {}
155
+
156
+ for task in ["easy", "medium", "hard"]:
157
+ obs, state = env.reset(task=task)
158
+ episode_id = state.episode_id
159
+ issues = obs.column_issues
160
+ max_steps = obs.max_steps
161
+ steps_used = 0
162
+
163
+ def do_step(action: CleaningAction) -> None:
164
+ nonlocal steps_used, obs
165
+ if steps_used >= max_steps - 1:
166
+ return
167
+ obs_new, _reward, _done = env.step(episode_id, action)
168
+ obs = obs_new
169
+ steps_used += 1
170
+
171
+ # 1. Fill missing
172
+ for col, col_issues in issues.items():
173
+ if steps_used >= max_steps - 3:
174
+ break
175
+ if col_issues.missing_count > 0:
176
+ do_step(
177
+ CleaningAction(
178
+ action_type=ActionType.fill_missing,
179
+ column=col,
180
+ strategy=FillStrategy.median,
181
+ )
182
+ )
183
+
184
+ # 2. Fix type errors
185
+ for col, col_issues in issues.items():
186
+ if steps_used >= max_steps - 3:
187
+ break
188
+ if col_issues.type_errors > 0:
189
+ do_step(
190
+ CleaningAction(
191
+ action_type=ActionType.fix_type,
192
+ column=col,
193
+ dtype=DType.float,
194
+ )
195
+ )
196
+
197
+ # 3. Drop duplicates
198
+ if steps_used < max_steps - 2 and any(
199
+ c.has_duplicates for c in issues.values()
200
+ ):
201
+ do_step(CleaningAction(action_type=ActionType.drop_duplicates))
202
+
203
+ # 4. Drop outliers (medium/hard)
204
+ if task in ("medium", "hard"):
205
+ for col, col_issues in issues.items():
206
+ if steps_used >= max_steps - 1:
207
+ break
208
+ if col_issues.outlier_count > 3:
209
+ do_step(
210
+ CleaningAction(
211
+ action_type=ActionType.drop_outliers,
212
+ column=col,
213
+ method=OutlierMethod.iqr,
214
+ )
215
+ )
216
+
217
+ # 5. Fix schema violations (hard only)
218
+ if task == "hard":
219
+ for col in list(issues.keys())[:2]:
220
+ if steps_used >= max_steps - 1:
221
+ break
222
+ do_step(
223
+ CleaningAction(
224
+ action_type=ActionType.fix_schema_violation,
225
+ column=col,
226
+ constraint=SchemaConstraint.non_negative,
227
+ )
228
+ )
229
+
230
+ # 6. Done
231
+ env.step(episode_id, CleaningAction(action_type=ActionType.done))
232
+
233
+ # Grade
234
+ ep = env.episodes[episode_id]
235
+ score = grade_episode(ep["current_df"], task, ep["target_col"])
236
+ scores[task] = round(score, 4)
237
+ print(f" Task {task}: {scores[task]:.4f}")
238
+
239
+ return scores
240
+
241
+
242
+ if __name__ == "__main__":
243
+ parser = argparse.ArgumentParser(description="Run baseline heuristic agent")
244
+ parser.add_argument(
245
+ "--url",
246
+ default=BASE_URL_DEFAULT,
247
+ help=f"Base URL of the running server (default: {BASE_URL_DEFAULT})",
248
+ )
249
+ args = parser.parse_args()
250
+
251
+ print(f"Running baseline agent against {args.url} ...")
252
+ scores = run_baseline_http(args.url)
253
+ print(f"\nBaseline scores:\n{json.dumps(scores, indent=2)}")
data_cleaning_env/client.py ADDED
@@ -0,0 +1,99 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Python client for the Data Cleaning RL Environment.
3
+
4
+ Provides a lightweight async wrapper for local testing and integration
5
+ with RL training frameworks.
6
+
7
+ Usage (async):
8
+ import asyncio
9
+ from client import DataCleaningEnvClient
10
+ from models import CleaningAction, ActionType, FillStrategy
11
+
12
+ async def main():
13
+ client = DataCleaningEnvClient(base_url="http://localhost:8000")
14
+ result = await client.reset(task="easy")
15
+ episode_id = result["state"]["episode_id"]
16
+ action = CleaningAction(
17
+ action_type=ActionType.fill_missing,
18
+ column="sepallength",
19
+ strategy=FillStrategy.median,
20
+ )
21
+ result = await client.step(episode_id, action)
22
+ print(result)
23
+
24
+ asyncio.run(main())
25
+ """
26
+
27
+ from __future__ import annotations
28
+
29
+ from typing import Any
30
+
31
+ try:
32
+ import httpx
33
+
34
+ _HAS_HTTPX = True
35
+ except ImportError:
36
+ _HAS_HTTPX = False
37
+
38
+ from models import CleaningAction
39
+
40
+
41
+ class DataCleaningEnvClient:
42
+ """Async HTTP client for the Data Cleaning OpenEnv server."""
43
+
44
+ def __init__(self, base_url: str = "http://localhost:8000") -> None:
45
+ self.base_url = base_url.rstrip("/")
46
+
47
+ async def reset(self, task: str = "easy") -> dict[str, Any]:
48
+ """Start a new episode. Returns {observation, state}."""
49
+ return await self._post("/reset", {"task": task})
50
+
51
+ async def step(self, episode_id: str, action: CleaningAction) -> dict[str, Any]:
52
+ """Apply a cleaning action. Returns {observation, reward, done, info}."""
53
+ return await self._post(
54
+ "/step",
55
+ {
56
+ "episode_id": episode_id,
57
+ "action": action.model_dump(),
58
+ },
59
+ )
60
+
61
+ async def get_state(self, episode_id: str) -> dict[str, Any]:
62
+ """Get episode metadata."""
63
+ return await self._get(f"/state?episode_id={episode_id}")
64
+
65
+ async def grade(self, episode_id: str) -> dict[str, Any]:
66
+ """Grade the current episode. Returns {episode_id, task, score}."""
67
+ return await self._post("/grader", {"episode_id": episode_id})
68
+
69
+ async def get_tasks(self) -> dict[str, Any]:
70
+ """Get available tasks and action schema."""
71
+ return await self._get("/tasks")
72
+
73
+ async def baseline(self) -> dict[str, Any]:
74
+ """Trigger the baseline agent and return scores."""
75
+ return await self._post("/baseline", {})
76
+
77
+ async def health(self) -> dict[str, Any]:
78
+ """Liveness check."""
79
+ return await self._get("/health")
80
+
81
+ async def _post(self, path: str, payload: dict) -> dict[str, Any]:
82
+ if not _HAS_HTTPX:
83
+ raise ImportError(
84
+ "httpx is required for async HTTP. Install it: pip install httpx"
85
+ )
86
+ async with httpx.AsyncClient(base_url=self.base_url, timeout=60) as client:
87
+ resp = await client.post(path, json=payload)
88
+ resp.raise_for_status()
89
+ return resp.json()
90
+
91
+ async def _get(self, path: str) -> dict[str, Any]:
92
+ if not _HAS_HTTPX:
93
+ raise ImportError(
94
+ "httpx is required for async HTTP. Install it: pip install httpx"
95
+ )
96
+ async with httpx.AsyncClient(base_url=self.base_url, timeout=60) as client:
97
+ resp = await client.get(path)
98
+ resp.raise_for_status()
99
+ return resp.json()
data_cleaning_env/datasets.py ADDED
@@ -0,0 +1,90 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Dataset loading and caching for the Data Cleaning RL Environment.
3
+
4
+ Uses sklearn.datasets.fetch_openml to load public OpenML datasets.
5
+ Results are cached in memory after the first load to avoid repeated downloads.
6
+ """
7
+
8
+ from __future__ import annotations
9
+
10
+ import pandas as pd
11
+ from sklearn.datasets import fetch_openml
12
+
13
+ TASK_CONFIGS: dict[str, dict] = {
14
+ "easy": {
15
+ "data_id": 61, # iris β€” 150 rows, 4 numeric features, 3-class
16
+ "name": "iris",
17
+ "target": "class",
18
+ "sample_size": None, # use full dataset
19
+ },
20
+ "medium": {
21
+ "data_id": 1590, # adult v2 β€” 48,842 rows, 15 features, binary class
22
+ "name": "adult",
23
+ "target": "class",
24
+ "sample_size": 2000, # sample for grading speed
25
+ },
26
+ "hard": {
27
+ "data_id": 31, # credit-g β€” 1,000 rows, 20 features, binary class
28
+ "name": "credit-g",
29
+ "target": "class",
30
+ "sample_size": None, # use full dataset
31
+ },
32
+ }
33
+
34
+ # In-memory cache: task -> (clean_df, target_col)
35
+ _DATASET_CACHE: dict[str, tuple[pd.DataFrame, str]] = {}
36
+
37
+
38
+ def load_clean_dataset(task: str) -> tuple[pd.DataFrame, str]:
39
+ """
40
+ Load a clean OpenML dataset for the given task.
41
+
42
+ Returns:
43
+ (clean_df, target_col) β€” DataFrame with original (clean) data and the
44
+ name of the target column.
45
+
46
+ Raises:
47
+ ValueError: if task is not one of "easy", "medium", "hard".
48
+ """
49
+ if task not in TASK_CONFIGS:
50
+ raise ValueError(f"Unknown task '{task}'. Must be one of: {list(TASK_CONFIGS)}")
51
+
52
+ if task in _DATASET_CACHE:
53
+ df, target = _DATASET_CACHE[task]
54
+ return df.copy(), target
55
+
56
+ cfg = TASK_CONFIGS[task]
57
+ dataset = fetch_openml(
58
+ data_id=cfg["data_id"],
59
+ as_frame=True,
60
+ cache=True,
61
+ parser="auto",
62
+ )
63
+ df: pd.DataFrame = dataset.frame.copy()
64
+
65
+ # Rename target column to "class" for consistency if needed
66
+ target_col: str = cfg["target"]
67
+ if dataset.target_names and dataset.target_names[0] != target_col:
68
+ actual_target = dataset.target_names[0]
69
+ if actual_target in df.columns:
70
+ df = df.rename(columns={actual_target: target_col})
71
+
72
+ # Ensure target column exists; if it's in dataset.target but not frame, add it
73
+ if target_col not in df.columns and dataset.target is not None:
74
+ df[target_col] = dataset.target.values
75
+
76
+ # Sample for large datasets to keep grading fast
77
+ if cfg["sample_size"] is not None and len(df) > cfg["sample_size"]:
78
+ df = df.sample(n=cfg["sample_size"], random_state=42).reset_index(drop=True)
79
+
80
+ # Reset index for clean indexing
81
+ df = df.reset_index(drop=True)
82
+
83
+ _DATASET_CACHE[task] = (df.copy(), target_col)
84
+ return df.copy(), target_col
85
+
86
+
87
+ def preload_all() -> None:
88
+ """Preload all datasets into cache. Call at server startup."""
89
+ for task in TASK_CONFIGS:
90
+ load_clean_dataset(task)
data_cleaning_env/grader.py ADDED
@@ -0,0 +1,161 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Grader for the Data Cleaning RL Environment.
3
+
4
+ Trains a simple sklearn RandomForest on the agent cleaned dataset and scores
5
+ improvement using a bracketed normalization formula:
6
+
7
+ score = clip((agent_acc - dirty_acc) / (oracle_acc - dirty_acc), 0.0, 1.0)
8
+
9
+ Oracle and dirty baselines are precomputed once at server startup.
10
+ """
11
+
12
+ from __future__ import annotations
13
+
14
+ import numpy as np
15
+ import pandas as pd
16
+ from sklearn.compose import ColumnTransformer
17
+ from sklearn.ensemble import RandomForestClassifier
18
+ from sklearn.impute import SimpleImputer
19
+ from sklearn.model_selection import train_test_split
20
+ from sklearn.pipeline import Pipeline
21
+ from sklearn.preprocessing import LabelEncoder, StandardScaler
22
+
23
+ # Precomputed at startup, keyed by task name
24
+ _ORACLE_SCORES: dict[str, float] = {}
25
+ _DIRTY_SCORES: dict[str, float] = {}
26
+
27
+
28
+ def train_and_score(df: pd.DataFrame, target_col: str) -> float:
29
+ """
30
+ Train a RandomForest classifier on df and return test accuracy.
31
+ Returns 0.0 on any failure to avoid crashing the grader endpoint.
32
+ """
33
+ try:
34
+ if target_col not in df.columns:
35
+ return 0.0
36
+
37
+ df = df.copy()
38
+ X = df.drop(columns=[target_col])
39
+ y = df[target_col].astype(str)
40
+
41
+ # Encode target labels
42
+ le = LabelEncoder()
43
+ y_enc = le.fit_transform(y.fillna("__missing__"))
44
+
45
+ # Need at least 2 classes and enough samples
46
+ if len(np.unique(y_enc)) < 2 or len(df) < 10:
47
+ return 0.0
48
+
49
+ # Build column transformer
50
+ num_cols = X.select_dtypes(include="number").columns.tolist()
51
+ cat_cols = X.select_dtypes(exclude="number").columns.tolist()
52
+
53
+ transformers: list = []
54
+ if num_cols:
55
+ transformers.append(
56
+ (
57
+ "num",
58
+ Pipeline(
59
+ [
60
+ ("impute", SimpleImputer(strategy="median")),
61
+ ("scale", StandardScaler()),
62
+ ]
63
+ ),
64
+ num_cols,
65
+ )
66
+ )
67
+ if cat_cols:
68
+ transformers.append(
69
+ (
70
+ "cat",
71
+ SimpleImputer(strategy="most_frequent"),
72
+ cat_cols,
73
+ )
74
+ )
75
+
76
+ if not transformers:
77
+ return 0.0
78
+
79
+ preprocessor = ColumnTransformer(transformers, remainder="drop")
80
+ clf = Pipeline(
81
+ [
82
+ ("pre", preprocessor),
83
+ (
84
+ "clf",
85
+ RandomForestClassifier(
86
+ n_estimators=50,
87
+ random_state=42,
88
+ n_jobs=-1,
89
+ max_depth=10,
90
+ ),
91
+ ),
92
+ ]
93
+ )
94
+
95
+ # Stratified split
96
+ try:
97
+ X_train, X_test, y_train, y_test = train_test_split(
98
+ X,
99
+ y_enc,
100
+ test_size=0.2,
101
+ random_state=42,
102
+ stratify=y_enc,
103
+ )
104
+ except ValueError:
105
+ X_train, X_test, y_train, y_test = train_test_split(
106
+ X,
107
+ y_enc,
108
+ test_size=0.2,
109
+ random_state=42,
110
+ )
111
+
112
+ clf.fit(X_train, y_train)
113
+ return float(clf.score(X_test, y_test))
114
+
115
+ except Exception:
116
+ return 0.0
117
+
118
+
119
+ def compute_oracle_and_dirty_baselines(
120
+ task: str,
121
+ clean_df: pd.DataFrame,
122
+ dirty_df: pd.DataFrame,
123
+ target_col: str,
124
+ ) -> None:
125
+ """
126
+ Precompute oracle and dirty baseline scores for a task.
127
+ Call once at server startup for each task.
128
+ """
129
+ _ORACLE_SCORES[task] = train_and_score(clean_df, target_col)
130
+ _DIRTY_SCORES[task] = train_and_score(dirty_df, target_col)
131
+
132
+
133
+ def grade_episode(
134
+ episode_cleaned_df: pd.DataFrame,
135
+ task: str,
136
+ target_col: str,
137
+ ) -> float:
138
+ """
139
+ Grade a completed episode.
140
+ Returns a score in [0.0, 1.0].
141
+ """
142
+ agent_acc = train_and_score(episode_cleaned_df, target_col)
143
+ oracle = _ORACLE_SCORES.get(task, 1.0)
144
+ dirty = _DIRTY_SCORES.get(task, 0.0)
145
+
146
+ if oracle <= dirty:
147
+ return 1.0 if agent_acc >= oracle else 0.0
148
+
149
+ score = (agent_acc - dirty) / (oracle - dirty)
150
+ return float(np.clip(score, 0.0, 1.0))
151
+
152
+
153
+ def get_baselines() -> dict[str, dict[str, float]]:
154
+ """Return the precomputed oracle and dirty scores for all tasks."""
155
+ return {
156
+ task: {
157
+ "oracle_accuracy": _ORACLE_SCORES.get(task),
158
+ "dirty_accuracy": _DIRTY_SCORES.get(task),
159
+ }
160
+ for task in ["easy", "medium", "hard"]
161
+ }
data_cleaning_env/models.py ADDED
@@ -0,0 +1,157 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Pydantic models for the Data Cleaning RL Environment.
3
+
4
+ Defines the typed action, observation, and state structures used by the
5
+ OpenEnv step()/reset()/state() API.
6
+ """
7
+
8
+ from __future__ import annotations
9
+
10
+ from enum import Enum
11
+ from typing import Any, Dict, List, Optional
12
+
13
+ from pydantic import BaseModel, Field
14
+
15
+
16
+ # ----------------------
17
+ # Action types
18
+ # -----------------------
19
+
20
+
21
+ class ActionType(str, Enum):
22
+ fill_missing = "fill_missing"
23
+ drop_duplicates = "drop_duplicates"
24
+ fix_type = "fix_type"
25
+ normalize = "normalize"
26
+ drop_outliers = "drop_outliers"
27
+ fix_schema_violation = "fix_schema_violation"
28
+ done = "done"
29
+
30
+
31
+ class FillStrategy(str, Enum):
32
+ mean = "mean"
33
+ median = "median"
34
+ mode = "mode"
35
+ constant = "constant"
36
+
37
+
38
+ class DType(str, Enum):
39
+ int = "int"
40
+ float = "float"
41
+ str = "str"
42
+
43
+
44
+ class OutlierMethod(str, Enum):
45
+ iqr = "iqr"
46
+ zscore = "zscore"
47
+
48
+
49
+ class SchemaConstraint(str, Enum):
50
+ non_negative = "non_negative"
51
+ clamp_range = "clamp_range"
52
+
53
+
54
+ class CleaningAction(BaseModel):
55
+ """A single cleaning action issued by the agent."""
56
+
57
+ action_type: ActionType = Field(
58
+ ...,
59
+ description="Type of cleaning action to perform.",
60
+ )
61
+ column: Optional[str] = Field(
62
+ None,
63
+ description="Target column name. Required for all column-level actions.",
64
+ )
65
+ strategy: Optional[FillStrategy] = Field(
66
+ None,
67
+ description="Fill strategy for fill_missing action.",
68
+ )
69
+ dtype: Optional[DType] = Field(
70
+ None,
71
+ description="Target dtype for fix_type action.",
72
+ )
73
+ method: Optional[OutlierMethod] = Field(
74
+ None,
75
+ description="Outlier detection method for drop_outliers action.",
76
+ )
77
+ constraint: Optional[SchemaConstraint] = Field(
78
+ None,
79
+ description="Constraint type for fix_schema_violation action.",
80
+ )
81
+ constant_value: Optional[Any] = Field(
82
+ None,
83
+ description="Constant fill value for fill_missing with strategy=constant.",
84
+ )
85
+
86
+
87
+ # ---------------
88
+ # Observation
89
+ # -------------------
90
+
91
+
92
+ class ColumnIssues(BaseModel):
93
+ """Per-column data quality issues detected in the current state."""
94
+
95
+ missing_count: int = Field(..., description="Number of missing (NaN) values.")
96
+ missing_pct: float = Field(..., description="Fraction of missing values [0, 1].")
97
+ type_errors: int = Field(
98
+ ...,
99
+ description="Number of cells that cannot be parsed as the expected dtype.",
100
+ )
101
+ outlier_count: int = Field(
102
+ ...,
103
+ description="Number of outliers detected via IQR rule.",
104
+ )
105
+ has_duplicates: bool = Field(
106
+ ...,
107
+ description="True if the dataset currently contains duplicate rows.",
108
+ )
109
+
110
+
111
+ class ColumnStats(BaseModel):
112
+ """Compact statistical summary for a column."""
113
+
114
+ mean: Optional[float] = None
115
+ std: Optional[float] = None
116
+ null_count: int = 0
117
+ unique_count: int = 0
118
+
119
+
120
+ class Observation(BaseModel):
121
+ """Observation returned by reset() and step()."""
122
+
123
+ task: str = Field(..., description="Task tier: 'easy', 'medium', or 'hard'.")
124
+ step: int = Field(..., description="Current step number (0-indexed).")
125
+ max_steps: int = Field(..., description="Maximum steps allowed in this episode.")
126
+ columns: List[str] = Field(..., description="Column names in the dataset.")
127
+ column_issues: Dict[str, ColumnIssues] = Field(
128
+ ...,
129
+ description="Data quality issues per column.",
130
+ )
131
+ column_stats: Dict[str, ColumnStats] = Field(
132
+ ...,
133
+ description="Compact statistics per column.",
134
+ )
135
+ reward: float = Field(
136
+ ...,
137
+ description="Per-step reward from the most recent action.",
138
+ )
139
+ done: bool = Field(..., description="True if the episode has ended.")
140
+
141
+
142
+ # ----------------
143
+ # Episode state
144
+ # --------------------
145
+
146
+
147
+ class EpisodeState(BaseModel):
148
+ """Metadata about the current episode, returned by state()."""
149
+
150
+ episode_id: str = Field(..., description="Unique episode identifier (UUID).")
151
+ task: str = Field(..., description="Task tier: 'easy', 'medium', or 'hard'.")
152
+ step: int = Field(..., description="Current step number.")
153
+ max_steps: int = Field(..., description="Maximum steps allowed.")
154
+ score: Optional[float] = Field(
155
+ None,
156
+ description="Final grader score (0.0–1.0). Only set after /grader is called.",
157
+ )
data_cleaning_env/noise_injector.py ADDED
@@ -0,0 +1,115 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Deterministic noise injection for the Data Cleaning RL Environment.
3
+
4
+ Each task tier injects a different combination and severity of noise into a
5
+ clean dataset. The same seed always produces the same dirty dataset, ensuring
6
+ reproducibility for judges and baseline evaluation.
7
+ """
8
+
9
+ from __future__ import annotations
10
+
11
+ import numpy as np
12
+ import pandas as pd
13
+
14
+
15
+ def inject_noise(df: pd.DataFrame, task: str, seed: int = 42) -> pd.DataFrame:
16
+ """
17
+ Inject noise into a clean DataFrame according to the task difficulty.
18
+ Args:
19
+ df: Clean source DataFrame (will not be modified in place).
20
+ task: One of "easy", "medium", "hard".
21
+ seed: RNG seed for reproducibility.
22
+
23
+ Returns:
24
+ A new dirty DataFrame.
25
+ """
26
+ if task == "easy":
27
+ return _inject_easy(df.copy(), seed)
28
+ elif task == "medium":
29
+ return _inject_medium(df.copy(), seed)
30
+ elif task == "hard":
31
+ return _inject_hard(df.copy(), seed)
32
+ else:
33
+ raise ValueError(f"Unknown task '{task}'. Must be one of: easy, medium, hard")
34
+
35
+
36
+ def _inject_easy(dirty: pd.DataFrame, seed: int) -> pd.DataFrame:
37
+ """Easy: 15 percent missing values in numeric columns only."""
38
+ rng = np.random.default_rng(seed)
39
+ numeric_cols = dirty.select_dtypes(include="number").columns.tolist()
40
+
41
+ for col in numeric_cols:
42
+ mask = rng.random(len(dirty)) < 0.15
43
+ dirty.loc[mask, col] = np.nan
44
+
45
+ return dirty.reset_index(drop=True)
46
+
47
+
48
+ def _inject_medium(dirty: pd.DataFrame, seed: int) -> pd.DataFrame:
49
+ """Medium: 20 percent missing + type errors in 2 numeric cols + 3 percent duplicate rows."""
50
+ rng = np.random.default_rng(seed)
51
+
52
+ # 1. Missing values in all columns
53
+ for col in dirty.columns:
54
+ mask = rng.random(len(dirty)) < 0.20
55
+ dirty.loc[mask, col] = np.nan
56
+
57
+ # 2. Type corruption: convert some non-null numeric cells to string
58
+ numeric_cols = dirty.select_dtypes(include="number").columns.tolist()
59
+ for col in numeric_cols[:2]:
60
+ mask = (rng.random(len(dirty)) < 0.05) & dirty[col].notna()
61
+ dirty[col] = dirty[col].astype(object)
62
+ dirty.loc[mask, col] = dirty.loc[mask, col].apply(
63
+ lambda x: f"err_{x}" if pd.notna(x) else x
64
+ )
65
+
66
+ # 3. Duplicate rows
67
+ n_dups = max(1, int(len(dirty) * 0.03))
68
+ dup_indices = rng.choice(len(dirty), size=n_dups, replace=True)
69
+ dup_rows = dirty.iloc[dup_indices]
70
+ dirty = pd.concat([dirty, dup_rows], ignore_index=True)
71
+
72
+ return dirty.reset_index(drop=True)
73
+
74
+
75
+ def _inject_hard(dirty: pd.DataFrame, seed: int) -> pd.DataFrame:
76
+ """Hard: 25 percent missing + type errors + outliers + duplicates + schema violations."""
77
+ rng = np.random.default_rng(seed)
78
+ numeric_cols = dirty.select_dtypes(include="number").columns.tolist()
79
+
80
+ # 1. Missing values in all columns
81
+ for col in dirty.columns:
82
+ mask = rng.random(len(dirty)) < 0.25
83
+ dirty.loc[mask, col] = np.nan
84
+
85
+ # 2. Type corruption in 3 numeric columns
86
+ for col in numeric_cols[:3]:
87
+ mask = (rng.random(len(dirty)) < 0.07) & dirty[col].notna()
88
+ dirty[col] = dirty[col].astype(object)
89
+ dirty.loc[mask, col] = dirty.loc[mask, col].apply(
90
+ lambda x: f"err_{x}" if pd.notna(x) else x
91
+ )
92
+
93
+ # 3. Outliers: set 5 percent of numeric values to 10x their column max
94
+ for col in numeric_cols:
95
+ numeric_series = pd.to_numeric(dirty[col], errors="coerce")
96
+ col_max = numeric_series.max()
97
+ if pd.notna(col_max) and col_max != 0:
98
+ mask = (rng.random(len(dirty)) < 0.05) & dirty[col].notna()
99
+ dirty.loc[mask, col] = col_max * 10
100
+
101
+ # 4. Duplicate rows
102
+ n_dups = max(1, int(len(dirty) * 0.05))
103
+ dup_indices = rng.choice(len(dirty), size=n_dups, replace=True)
104
+ dup_rows = dirty.iloc[dup_indices]
105
+ dirty = pd.concat([dirty, dup_rows], ignore_index=True)
106
+
107
+ # 5. Schema violations: negative values in first 2 strictly-positive columns
108
+ pos_cols = numeric_cols[:2]
109
+ for col in pos_cols:
110
+ numeric_series = pd.to_numeric(dirty[col], errors="coerce")
111
+ mask = (rng.random(len(dirty)) < 0.05) & dirty[col].notna()
112
+ positive_vals = numeric_series[mask].abs()
113
+ dirty.loc[mask, col] = -positive_vals
114
+
115
+ return dirty.reset_index(drop=True)
data_cleaning_env/openenv.yaml ADDED
@@ -0,0 +1,35 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ name: data-cleaning-env
2
+ version: "1.0.0"
3
+ description: >
4
+ RL environment for tabular data cleaning. An AI agent receives a dirty
5
+ OpenML dataset and issues structured cleaning commands (fill_missing,
6
+ fix_type, drop_duplicates, drop_outliers, fix_schema_violation, normalize).
7
+ Graded by downstream RandomForest accuracy improvement over a dirty baseline.
8
+ author: Yash Marathe
9
+ tags:
10
+ - data-engineering
11
+ - tabular
12
+ - real-world
13
+ - openml
14
+ - data-quality
15
+
16
+ tasks:
17
+ - id: easy
18
+ description: "Fix missing values in the Iris dataset (15% of numeric values missing)"
19
+ - id: medium
20
+ description: "Fix missing values, type errors, and duplicates in Adult Income dataset (2k sample)"
21
+ - id: hard
22
+ description: "Fix missing values, type errors, duplicates, outliers, and schema violations in Credit-G dataset"
23
+
24
+ server:
25
+ port: 8000
26
+ health_endpoint: /health
27
+
28
+ endpoints:
29
+ reset: POST /reset
30
+ step: POST /step
31
+ state: GET /state
32
+ tasks: GET /tasks
33
+ grader: POST /grader
34
+ baseline: POST /baseline
35
+ health: GET /health
data_cleaning_env/pyproject.toml ADDED
@@ -0,0 +1,28 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ [build-system]
2
+ requires = ["hatchling"]
3
+ build-backend = "hatchling.build"
4
+
5
+ [project]
6
+ name = "data-cleaning-env"
7
+ version = "1.0.0"
8
+ description = "OpenEnv RL environment for tabular data cleaning. Agent issues structured commands to clean dirty datasets from OpenML, graded by downstream ML accuracy."
9
+ readme = "README.md"
10
+ requires-python = ">=3.11"
11
+ dependencies = [
12
+ "fastapi>=0.115",
13
+ "uvicorn[standard]>=0.30",
14
+ "pydantic>=2.0",
15
+ "scikit-learn>=1.4",
16
+ "pandas>=2.0",
17
+ "numpy>=1.26",
18
+ "requests>=2.31",
19
+ ]
20
+
21
+ [project.optional-dependencies]
22
+ dev = [
23
+ "pytest>=8.0",
24
+ "httpx>=0.27",
25
+ ]
26
+
27
+ [tool.hatch.build.targets.wheel]
28
+ packages = ["data_cleaning_env"]
data_cleaning_env/server/Dockerfile ADDED
@@ -0,0 +1,23 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ FROM python:3.11-slim
2
+
3
+ WORKDIR /app
4
+
5
+ # Install dependencies first (layer caching)
6
+ COPY server/requirements.txt .
7
+ RUN pip install --no-cache-dir -r requirements.txt
8
+
9
+ # Copy source
10
+ COPY . .
11
+
12
+ # Pre-download and cache OpenML datasets at build time to avoid cold-start latency.
13
+ # This also validates that all 3 datasets are reachable.
14
+ RUN python -c "\
15
+ import sys; sys.path.insert(0, '/app'); \
16
+ from datasets import preload_all; \
17
+ preload_all(); \
18
+ print('Datasets cached successfully.')"
19
+
20
+ EXPOSE 8000
21
+
22
+ # Run from /app so that all module imports resolve against data_cleaning_env/
23
+ CMD ["uvicorn", "server.app:app", "--host", "0.0.0.0", "--port", "8000"]
data_cleaning_env/server/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ # server package
data_cleaning_env/server/app.py ADDED
@@ -0,0 +1,282 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ FastAPI server for the Data Cleaning RL Environment.
3
+
4
+ Implements the full OpenEnv standard API (reset/step/state) plus the
5
+ hackathon-required endpoints (/tasks, /grader, /baseline, /health).
6
+ """
7
+
8
+ from __future__ import annotations
9
+
10
+ import sys
11
+ import os
12
+
13
+ _HERE = os.path.dirname(os.path.abspath(__file__))
14
+ _ROOT = os.path.dirname(_HERE)
15
+ if _ROOT not in sys.path:
16
+ sys.path.insert(0, _ROOT)
17
+
18
+ from contextlib import asynccontextmanager
19
+ from typing import Optional
20
+
21
+ from fastapi import FastAPI, HTTPException
22
+ from pydantic import BaseModel
23
+
24
+ from datasets import load_clean_dataset, preload_all
25
+ from grader import compute_oracle_and_dirty_baselines, grade_episode
26
+ from models import CleaningAction
27
+ from noise_injector import inject_noise
28
+ from server.environment import DataCleaningEnvironment
29
+
30
+
31
+ # ---------------
32
+ # Startup / lifespan
33
+ # --------------
34
+
35
+
36
+ @asynccontextmanager
37
+ async def lifespan(app: FastAPI):
38
+ """
39
+ Preload datasets and compute oracle/dirty baselines at startup.
40
+ Avoids cold-start latency on the first API call.
41
+ """
42
+ preload_all()
43
+ for task in ["easy", "medium", "hard"]:
44
+ clean_df, target_col = load_clean_dataset(task)
45
+ dirty_df = inject_noise(clean_df, task)
46
+ compute_oracle_and_dirty_baselines(task, clean_df, dirty_df, target_col)
47
+ yield
48
+
49
+
50
+ app = FastAPI(
51
+ title="Data Cleaning OpenEnv",
52
+ description=(
53
+ "An OpenEnv RL environment where agents clean tabular datasets "
54
+ "using structured commands. Graded by downstream ML accuracy improvement."
55
+ ),
56
+ version="1.0.0",
57
+ lifespan=lifespan,
58
+ )
59
+
60
+ env = DataCleaningEnvironment()
61
+
62
+ # ------------
63
+ # Request/response models
64
+ # -------------
65
+
66
+
67
+ class ResetRequest(BaseModel):
68
+ task: Optional[str] = "easy"
69
+
70
+
71
+ class StepRequest(BaseModel):
72
+ episode_id: str
73
+ action: CleaningAction
74
+
75
+
76
+ class GraderRequest(BaseModel):
77
+ episode_id: str
78
+
79
+
80
+ # --------------
81
+ # Standard OpenEnv endpoints
82
+ # ----------------------
83
+
84
+
85
+ @app.post("/reset", summary="Start a new episode")
86
+ async def reset(req: ResetRequest):
87
+ """
88
+ Initialize a new episode with the given task tier.
89
+
90
+ Body: {"task": "easy" | "medium" | "hard"}
91
+
92
+ Returns the initial observation and episode state.
93
+ """
94
+ try:
95
+ obs, state = env.reset(task=req.task or "easy")
96
+ except ValueError as e:
97
+ raise HTTPException(status_code=400, detail=str(e))
98
+ return {"observation": obs.model_dump(), "state": state.model_dump()}
99
+
100
+
101
+ @app.post("/step", summary="Apply a cleaning action")
102
+ async def step(req: StepRequest):
103
+ """
104
+ Apply one cleaning action to the active episode.
105
+
106
+ Returns the updated observation, step reward, and done flag.
107
+ """
108
+ try:
109
+ obs, reward, done = env.step(req.episode_id, req.action)
110
+ except KeyError:
111
+ raise HTTPException(
112
+ status_code=404,
113
+ detail=f"Episode '{req.episode_id}' not found.",
114
+ )
115
+ except ValueError as e:
116
+ raise HTTPException(status_code=400, detail=str(e))
117
+ return {
118
+ "observation": obs.model_dump(),
119
+ "reward": reward,
120
+ "done": done,
121
+ "info": {},
122
+ }
123
+
124
+
125
+ @app.get("/state", summary="Get episode metadata")
126
+ async def state(episode_id: str):
127
+ """Return metadata about an active episode."""
128
+ ep = env.episodes.get(episode_id)
129
+ if not ep:
130
+ raise HTTPException(
131
+ status_code=404,
132
+ detail=f"Episode '{episode_id}' not found.",
133
+ )
134
+ return {
135
+ "episode_id": episode_id,
136
+ "task": ep["task"],
137
+ "step": ep["step"],
138
+ "max_steps": ep["max_steps"],
139
+ "done": ep["done"],
140
+ }
141
+
142
+
143
+ # -------------------------
144
+ # Hackathon-required endpoints
145
+ # -------------------------
146
+
147
+
148
+ @app.get("/tasks", summary="List tasks and action schema")
149
+ async def tasks():
150
+ """
151
+ Return the list of available tasks and the full action schema.
152
+
153
+ Required by the hackathon pre-submission checklist.
154
+ """
155
+ return {
156
+ "tasks": [
157
+ {
158
+ "id": "easy",
159
+ "description": (
160
+ "Fix missing values in the Iris dataset. "
161
+ "15% of numeric values are missing."
162
+ ),
163
+ "dataset": "iris (OpenML ID 61)",
164
+ "max_steps": 20,
165
+ "noise_types": ["missing_values"],
166
+ },
167
+ {
168
+ "id": "medium",
169
+ "description": (
170
+ "Fix missing values, type errors, and duplicate rows "
171
+ "in the Adult Income dataset (2,000-row sample)."
172
+ ),
173
+ "dataset": "adult (OpenML ID 1590, 2k sample)",
174
+ "max_steps": 40,
175
+ "noise_types": ["missing_values", "type_errors", "duplicates"],
176
+ },
177
+ {
178
+ "id": "hard",
179
+ "description": (
180
+ "Fix missing values, type errors, duplicates, outliers, "
181
+ "and schema violations in the Credit-G dataset."
182
+ ),
183
+ "dataset": "credit-g (OpenML ID 31)",
184
+ "max_steps": 60,
185
+ "noise_types": [
186
+ "missing_values",
187
+ "type_errors",
188
+ "duplicates",
189
+ "outliers",
190
+ "schema_violations",
191
+ ],
192
+ },
193
+ ],
194
+ "action_schema": {
195
+ "action_type": {
196
+ "type": "string",
197
+ "required": True,
198
+ "values": [
199
+ "fill_missing",
200
+ "drop_duplicates",
201
+ "fix_type",
202
+ "normalize",
203
+ "drop_outliers",
204
+ "fix_schema_violation",
205
+ "done",
206
+ ],
207
+ },
208
+ "column": {
209
+ "type": "string",
210
+ "required": "for column-level actions",
211
+ "description": "Target column name from the dataset.",
212
+ },
213
+ "strategy": {
214
+ "type": "string",
215
+ "required": "for fill_missing",
216
+ "values": ["mean", "median", "mode", "constant"],
217
+ },
218
+ "dtype": {
219
+ "type": "string",
220
+ "required": "for fix_type",
221
+ "values": ["int", "float", "str"],
222
+ },
223
+ "method": {
224
+ "type": "string",
225
+ "required": "for drop_outliers",
226
+ "values": ["iqr", "zscore"],
227
+ },
228
+ "constraint": {
229
+ "type": "string",
230
+ "required": "for fix_schema_violation",
231
+ "values": ["non_negative", "clamp_range"],
232
+ },
233
+ "constant_value": {
234
+ "type": "any",
235
+ "required": "for fill_missing with strategy=constant",
236
+ "description": "The constant value to fill missing cells with.",
237
+ },
238
+ },
239
+ }
240
+
241
+
242
+ @app.post("/grader", summary="Grade a completed episode")
243
+ async def grader(req: GraderRequest):
244
+ """
245
+ Compute the grader score for a completed (or ongoing) episode.
246
+
247
+ Score is in [0.0, 1.0]:
248
+ - 0.0 = no improvement over the dirty baseline
249
+ - 1.0 = dataset restored to oracle (original) quality
250
+ """
251
+ ep = env.episodes.get(req.episode_id)
252
+ if not ep:
253
+ raise HTTPException(
254
+ status_code=404,
255
+ detail=f"Episode '{req.episode_id}' not found.",
256
+ )
257
+ score = grade_episode(ep["current_df"], ep["task"], ep["target_col"])
258
+ return {
259
+ "episode_id": req.episode_id,
260
+ "task": ep["task"],
261
+ "score": score,
262
+ }
263
+
264
+
265
+ @app.post("/baseline", summary="Run the baseline heuristic agent")
266
+ async def baseline():
267
+ """
268
+ Run the built-in heuristic baseline agent through all 3 tasks and return scores.
269
+
270
+ The baseline uses a simple rule-based strategy: fill missing (median),
271
+ fix types, drop duplicates, drop outliers, then done.
272
+ """
273
+ from baseline import run_baseline_internal
274
+
275
+ scores = run_baseline_internal(env)
276
+ return {"baseline_scores": scores}
277
+
278
+
279
+ @app.get("/health", summary="Health check")
280
+ async def health():
281
+ """Liveness check -- returns 200 when the server is running."""
282
+ return {"status": "ok"}
data_cleaning_env/server/environment.py ADDED
@@ -0,0 +1,371 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Core environment logic for the Data Cleaning RL Environment.
3
+
4
+ Manages episodes, applies cleaning actions, computes per-step rewards,
5
+ and assembles observations.
6
+ """
7
+
8
+ from __future__ import annotations
9
+
10
+ import sys
11
+ import os
12
+
13
+ _HERE = os.path.dirname(os.path.abspath(__file__))
14
+ _ROOT = os.path.dirname(_HERE)
15
+ if _ROOT not in sys.path:
16
+ sys.path.insert(0, _ROOT)
17
+
18
+ import uuid
19
+ from typing import Any
20
+
21
+ import numpy as np
22
+ import pandas as pd
23
+
24
+ from datasets import load_clean_dataset
25
+ from noise_injector import inject_noise
26
+ from models import (
27
+ ActionType,
28
+ CleaningAction,
29
+ ColumnIssues,
30
+ ColumnStats,
31
+ EpisodeState,
32
+ Observation,
33
+ )
34
+
35
+ MAX_STEPS: dict[str, int] = {
36
+ "easy": 20,
37
+ "medium": 40,
38
+ "hard": 60,
39
+ }
40
+
41
+ REWARD_CLIP = 0.1
42
+
43
+
44
+ class DataCleaningEnvironment:
45
+ """
46
+ Manages multiple concurrent episodes.
47
+
48
+ Episodes are stored in-memory. Each reset() creates a fresh episode.
49
+ """
50
+
51
+ def __init__(self) -> None:
52
+ self.episodes: dict[str, dict[str, Any]] = {}
53
+
54
+ # --------------------
55
+ # Public API
56
+ # ----------------------
57
+
58
+ def reset(self, task: str = "easy") -> tuple[Observation, EpisodeState]:
59
+ """Initialize a new episode and return the initial observation."""
60
+ if task not in MAX_STEPS:
61
+ raise ValueError(
62
+ f"Unknown task '{task}'. Must be one of: {list(MAX_STEPS)}"
63
+ )
64
+
65
+ episode_id = str(uuid.uuid4())
66
+ clean_df, target_col = load_clean_dataset(task)
67
+ dirty_df = inject_noise(clean_df, task)
68
+
69
+ initial_accuracy = self._column_accuracy(dirty_df, clean_df)
70
+
71
+ self.episodes[episode_id] = {
72
+ "task": task,
73
+ "clean_df": clean_df,
74
+ "current_df": dirty_df.copy(),
75
+ "target_col": target_col,
76
+ "step": 0,
77
+ "max_steps": MAX_STEPS[task],
78
+ "done": False,
79
+ "prev_column_accuracy": initial_accuracy,
80
+ }
81
+
82
+ obs = self._make_observation(episode_id, reward=0.0)
83
+ state = EpisodeState(
84
+ episode_id=episode_id,
85
+ task=task,
86
+ step=0,
87
+ max_steps=MAX_STEPS[task],
88
+ )
89
+ return obs, state
90
+
91
+ def step(
92
+ self, episode_id: str, action: CleaningAction
93
+ ) -> tuple[Observation, float, bool]:
94
+ """
95
+ Apply an action to the current episode state.
96
+
97
+ Returns:
98
+ (observation, reward, done)
99
+
100
+ Raises:
101
+ KeyError: if episode_id is unknown.
102
+ ValueError: if the episode is already done.
103
+ """
104
+ ep = self.episodes[episode_id] # raises KeyError if unknown
105
+
106
+ if ep["done"]:
107
+ raise ValueError(
108
+ "Episode already done. Call /reset to start a new episode."
109
+ )
110
+
111
+ reward = 0.0
112
+ df = ep["current_df"]
113
+
114
+ if action.action_type == ActionType.done:
115
+ ep["done"] = True
116
+ reward = 0.0
117
+ else:
118
+ try:
119
+ df = self._apply_action(df, action, ep)
120
+ ep["current_df"] = df
121
+
122
+ new_accuracy = self._column_accuracy(df, ep["clean_df"])
123
+ delta = new_accuracy - ep["prev_column_accuracy"]
124
+ reward = float(np.clip(delta, -REWARD_CLIP, REWARD_CLIP))
125
+ ep["prev_column_accuracy"] = new_accuracy
126
+
127
+ except Exception:
128
+ # Penalize invalid/no-op actions
129
+ reward = -0.05
130
+
131
+ ep["step"] += 1
132
+ if ep["step"] >= ep["max_steps"]:
133
+ ep["done"] = True
134
+ reward = 0.0
135
+
136
+ obs = self._make_observation(episode_id, reward=reward)
137
+ return obs, reward, ep["done"]
138
+
139
+ # --------------
140
+ # Action application
141
+ # ---------------------
142
+
143
+ def _apply_action(
144
+ self, df: pd.DataFrame, action: CleaningAction, ep: dict
145
+ ) -> pd.DataFrame:
146
+ """Apply a cleaning action and return the modified DataFrame."""
147
+ df = df.copy()
148
+ col = action.column
149
+
150
+ if action.action_type == ActionType.fill_missing:
151
+ df = self._fill_missing(df, col, action)
152
+
153
+ elif action.action_type == ActionType.drop_duplicates:
154
+ df = df.drop_duplicates().reset_index(drop=True)
155
+
156
+ elif action.action_type == ActionType.fix_type:
157
+ df = self._fix_type(df, col, action)
158
+
159
+ elif action.action_type == ActionType.normalize:
160
+ df = self._normalize(df, col)
161
+
162
+ elif action.action_type == ActionType.drop_outliers:
163
+ df = self._drop_outliers(df, col, action)
164
+
165
+ elif action.action_type == ActionType.fix_schema_violation:
166
+ df = self._fix_schema_violation(df, col, action, ep)
167
+
168
+ else:
169
+ raise ValueError(f"Unhandled action type: {action.action_type}")
170
+
171
+ return df
172
+
173
+ def _fill_missing(
174
+ self, df: pd.DataFrame, col: str, action: CleaningAction
175
+ ) -> pd.DataFrame:
176
+ if col not in df.columns:
177
+ raise ValueError(f"Column '{col}' not found.")
178
+
179
+ numeric = pd.to_numeric(df[col], errors="coerce")
180
+ strategy = action.strategy.value if action.strategy else "median"
181
+
182
+ if strategy == "mean":
183
+ fill_value = numeric.mean()
184
+ elif strategy == "median":
185
+ fill_value = numeric.median()
186
+ elif strategy == "mode":
187
+ mode_vals = numeric.mode()
188
+ fill_value = mode_vals.iloc[0] if not mode_vals.empty else 0
189
+ elif strategy == "constant":
190
+ fill_value = action.constant_value
191
+ else:
192
+ fill_value = numeric.median()
193
+
194
+ df[col] = numeric.fillna(fill_value)
195
+ return df
196
+
197
+ def _fix_type(
198
+ self, df: pd.DataFrame, col: str, action: CleaningAction
199
+ ) -> pd.DataFrame:
200
+ if col not in df.columns:
201
+ raise ValueError(f"Column '{col}' not found.")
202
+
203
+ dtype = action.dtype.value if action.dtype else "float"
204
+
205
+ if dtype in ("int", "float"):
206
+ coerced = pd.to_numeric(df[col], errors="coerce")
207
+ if dtype == "int":
208
+ df[col] = coerced.astype("Int64")
209
+ else:
210
+ df[col] = coerced.astype("float64")
211
+ else:
212
+ df[col] = df[col].astype(str)
213
+
214
+ return df
215
+
216
+ def _normalize(self, df: pd.DataFrame, col: str) -> pd.DataFrame:
217
+ if col not in df.columns:
218
+ raise ValueError(f"Column '{col}' not found.")
219
+ numeric = pd.to_numeric(df[col], errors="coerce")
220
+ mean = numeric.mean()
221
+ std = numeric.std()
222
+
223
+ if pd.isna(mean) or std == 0 or pd.isna(std):
224
+ return df
225
+
226
+ df[col] = (numeric - mean) / std
227
+ return df
228
+
229
+ def _drop_outliers(
230
+ self, df: pd.DataFrame, col: str, action: CleaningAction
231
+ ) -> pd.DataFrame:
232
+ if col not in df.columns:
233
+ raise ValueError(f"Column '{col}' not found.")
234
+
235
+ numeric = pd.to_numeric(df[col], errors="coerce")
236
+ method = action.method.value if action.method else "iqr"
237
+
238
+ if method == "iqr":
239
+ q1, q3 = numeric.quantile(0.25), numeric.quantile(0.75)
240
+ iqr = q3 - q1
241
+ if iqr == 0:
242
+ return df
243
+ mask = numeric.between(q1 - 1.5 * iqr, q3 + 1.5 * iqr) | numeric.isna()
244
+ else: # zscore
245
+ mean, std = numeric.mean(), numeric.std()
246
+ if std == 0 or pd.isna(std):
247
+ return df
248
+ z = (numeric - mean) / std
249
+ mask = z.abs() < 3
250
+
251
+ return df[mask].reset_index(drop=True)
252
+
253
+ def _fix_schema_violation(
254
+ self, df: pd.DataFrame, col: str, action: CleaningAction, ep: dict
255
+ ) -> pd.DataFrame:
256
+ if col not in df.columns:
257
+ raise ValueError(f"Column '{col}' not found.")
258
+
259
+ numeric = pd.to_numeric(df[col], errors="coerce")
260
+ constraint = action.constraint.value if action.constraint else "non_negative"
261
+
262
+ if constraint == "non_negative":
263
+ df[col] = numeric.clip(lower=0)
264
+ elif constraint == "clamp_range":
265
+ clean_col = pd.to_numeric(ep["clean_df"][col], errors="coerce")
266
+ lo, hi = clean_col.quantile(0.05), clean_col.quantile(0.95)
267
+ df[col] = numeric.clip(lo, hi)
268
+
269
+ return df
270
+
271
+ # ------------------
272
+ # Reward computation
273
+ # ---------------------------
274
+
275
+ def _column_accuracy(
276
+ self, current_df: pd.DataFrame, clean_df: pd.DataFrame
277
+ ) -> float:
278
+ """
279
+ Mean fraction of values matching clean ground truth, averaged across columns.
280
+ """
281
+ scores: list[float] = []
282
+ common_cols = [c for c in clean_df.columns if c in current_df.columns]
283
+ n_rows = min(len(current_df), len(clean_df))
284
+
285
+ for col in common_cols:
286
+ cur = current_df[col].iloc[:n_rows].reset_index(drop=True)
287
+ cln = clean_df[col].iloc[:n_rows].reset_index(drop=True)
288
+
289
+ cur_num = pd.to_numeric(cur, errors="coerce")
290
+ cln_num = pd.to_numeric(cln, errors="coerce")
291
+
292
+ if cln_num.notna().mean() > 0.9:
293
+ both_valid = cur_num.notna() & cln_num.notna()
294
+ if both_valid.sum() == 0:
295
+ scores.append(0.0)
296
+ continue
297
+ match = (cur_num - cln_num).abs() < 1e-4
298
+ scores.append(float(match.mean()))
299
+ else:
300
+ scores.append(float((cur.astype(str) == cln.astype(str)).mean()))
301
+
302
+ return float(np.mean(scores)) if scores else 0.0
303
+
304
+ # ----------------------
305
+ # Observation assembly
306
+ # ---------------
307
+
308
+ def _make_observation(self, episode_id: str, reward: float) -> Observation:
309
+ ep = self.episodes[episode_id]
310
+ df = ep["current_df"]
311
+ clean_df = ep["clean_df"]
312
+
313
+ column_issues: dict[str, ColumnIssues] = {}
314
+ column_stats: dict[str, ColumnStats] = {}
315
+ n_dups = int(df.duplicated().sum())
316
+
317
+ for col in clean_df.columns:
318
+ if col not in df.columns:
319
+ continue
320
+
321
+ col_data = df[col]
322
+ numeric = pd.to_numeric(col_data, errors="coerce")
323
+ clean_numeric = pd.to_numeric(clean_df[col], errors="coerce")
324
+
325
+ # Type errors: cells non-null in series but unparseable as numeric
326
+ type_errs = 0
327
+ if clean_numeric.notna().mean() > 0.9:
328
+ raw_nulls = col_data.isna().sum()
329
+ numeric_nulls = numeric.isna().sum()
330
+ type_errs = max(0, int(numeric_nulls - raw_nulls))
331
+
332
+ # Outlier count via IQR
333
+ outlier_count = 0
334
+ if numeric.notna().sum() > 4:
335
+ q1, q3 = numeric.quantile(0.25), numeric.quantile(0.75)
336
+ iqr = q3 - q1
337
+ if iqr > 0:
338
+ outlier_count = int(
339
+ ((numeric < q1 - 1.5 * iqr) | (numeric > q3 + 1.5 * iqr)).sum()
340
+ )
341
+
342
+ column_issues[col] = ColumnIssues(
343
+ missing_count=int(col_data.isna().sum()),
344
+ missing_pct=round(float(col_data.isna().mean()), 4),
345
+ type_errors=type_errs,
346
+ outlier_count=outlier_count,
347
+ has_duplicates=n_dups > 0,
348
+ )
349
+ column_stats[col] = ColumnStats(
350
+ mean=(
351
+ round(float(numeric.mean()), 4) if numeric.notna().any() else None
352
+ ),
353
+ std=(
354
+ round(float(numeric.std()), 4)
355
+ if numeric.notna().sum() > 1
356
+ else None
357
+ ),
358
+ null_count=int(col_data.isna().sum()),
359
+ unique_count=int(col_data.nunique()),
360
+ )
361
+
362
+ return Observation(
363
+ task=ep["task"],
364
+ step=ep["step"],
365
+ max_steps=ep["max_steps"],
366
+ columns=list(clean_df.columns),
367
+ column_issues=column_issues,
368
+ column_stats=column_stats,
369
+ reward=reward,
370
+ done=ep["done"],
371
+ )
data_cleaning_env/server/requirements.txt ADDED
@@ -0,0 +1,8 @@
 
 
 
 
 
 
 
 
 
1
+ fastapi>=0.115
2
+ uvicorn[standard]>=0.30
3
+ pydantic>=2.0
4
+ scikit-learn>=1.4
5
+ pandas>=2.0
6
+ numpy>=1.26
7
+ requests>=2.31
8
+ httpx>=0.27
docs/brainstorms/2026-03-27-data-cleaning-env-requirements.md ADDED
@@ -0,0 +1,143 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ ---
2
+ date: 2026-03-27
3
+ topic: data-cleaning-openenv
4
+ ---
5
+
6
+ # Data Cleaning RL Environment (OpenEnv Hackathon Round 1)
7
+
8
+ ## Problem Frame
9
+
10
+ AI agents have no standardized way to practice and be evaluated on real-world data cleaning tasks.
11
+ This environment gives an RL agent a dirty tabular dataset and lets it iteratively apply structured
12
+ cleaning actions, rewarded by how much it improves data quality β€” measured both incrementally
13
+ (column-level accuracy per step) and holistically (downstream ML model accuracy at episode end).
14
+
15
+ Target users: RL researchers and LLM training practitioners who want to train/evaluate agents on
16
+ realistic, verifiable data engineering tasks.
17
+
18
+ Competition constraint: must comply fully with the OpenEnv spec and be deployed to Hugging Face Spaces by 7 Apr 11:59 PM.
19
+
20
+ ---
21
+
22
+ ## Requirements
23
+
24
+ - **R1.** The environment simulates a tabular data cleaning task: the agent receives a dirty dataset
25
+ as its observation and applies typed cleaning commands until the episode ends or a max-step limit
26
+ is reached.
27
+
28
+ - **R2.** The action space consists of structured, typed commands:
29
+ - `fill_missing(column, strategy)` β€” strategy in {mean, median, mode, constant}
30
+ - `drop_duplicates()`
31
+ - `fix_type(column, dtype)` β€” dtype in {int, float, str, datetime}
32
+ - `normalize(column)` β€” z-score normalization
33
+ - `drop_outliers(column, method)` β€” method in {iqr, zscore}
34
+ - `fix_schema_violation(column, constraint)` β€” e.g., clamp to valid range
35
+ - `done()` β€” signal episode completion
36
+
37
+ - **R3.** The observation at each step contains:
38
+ - Current dirty dataset (serialized as JSON records or column stats summary)
39
+ - List of detected issues per column (missing count, type errors, duplicate count, outlier count)
40
+ - Current step number and remaining steps
41
+
42
+ - **R4.** Three task tiers, each using a different OpenML dataset with injected noise:
43
+ - **Task 1 (Easy):** Missing values only. OpenML dataset: `iris` or `wine`. Score: 0.0–1.0.
44
+ - **Task 2 (Medium):** Missing values + type errors + duplicate rows. OpenML dataset: `adult` (income). Score: 0.0–1.0.
45
+ - **Task 3 (Hard):** All of the above + outliers + schema violations (value range constraints). OpenML dataset: `credit-g` or `diabetes`. Score: 0.0–1.0.
46
+
47
+ - **R5.** Reward function:
48
+ - **Per-step reward:** column-level accuracy delta against ground-truth clean dataset. Computed as
49
+ mean across columns of (correct values / total values). Range: [-0.1, +0.1] per step to
50
+ discourage harmful actions.
51
+ - **Episode reward (grader score):** train a simple sklearn classifier/regressor on the cleaned
52
+ dataset; score = accuracy on a held-out test split. Normalized to [0.0, 1.0] relative to
53
+ baseline dirty-data accuracy and oracle clean-data accuracy.
54
+
55
+ - **R6.** Ground truth: each dirty dataset has a corresponding clean version (the original OpenML
56
+ dataset before noise injection). The noise injection script is deterministic (seeded RNG) and
57
+ reproducible.
58
+
59
+ - **R7.** The environment server exposes the full OpenEnv spec:
60
+ - `POST /reset` β€” initialize episode, return initial observation
61
+ - `POST /step` β€” apply action, return (observation, reward, done, info)
62
+ - `GET /state` β€” return episode metadata (step count, episode id, task name)
63
+ - `GET /tasks` β€” return list of tasks and action schema
64
+ - `POST /grader` β€” run grader on completed episode, return score
65
+ - `POST /baseline` β€” run baseline inference script, return scores for all 3 tasks
66
+
67
+ - **R8.** A baseline inference script (`baseline.py`) uses a simple heuristic agent (e.g., always
68
+ fill missing with median, drop duplicates, then done) as the reproducible baseline. Must complete
69
+ without error and produce deterministic scores.
70
+
71
+ - **R9.** Deployment: working Dockerfile, deployed to Hugging Face Spaces, `openenv.yaml` manifest.
72
+
73
+ - **R10.** README covers environment description, action/observation schema, setup instructions,
74
+ baseline scores.
75
+
76
+ ---
77
+
78
+ ## Success Criteria
79
+
80
+ - HF Space returns HTTP 200 and responds to `reset()` (automated ping passes)
81
+ - `openenv.yaml` validates against the OpenEnv spec validator
82
+ - Docker image builds without error
83
+ - Baseline script completes and produces scores for all 3 tasks
84
+ - All 3 graders return scores strictly in [0.0, 1.0]
85
+ - Grader scores are meaningfully differentiated across task tiers (harder = lower baseline score)
86
+ - Episode reward correlates with actual data quality improvement (sanity test)
87
+
88
+ ---
89
+
90
+ ## Scope Boundaries
91
+
92
+ - Not building a general-purpose data cleaning pipeline or production tool β€” this is a training environment only
93
+ - No natural-language action space; actions are typed/structured only
94
+ - No multi-table / relational joins in v1 (single DataFrame per episode)
95
+ - No real external database connections; datasets are loaded from OpenML at environment startup and cached
96
+ - No UI or visualization β€” headless server only
97
+ - Downstream ML model is simple (sklearn LogisticRegression or RandomForest, not a deep model) to keep grading fast
98
+
99
+ ---
100
+
101
+ ## Key Decisions
102
+
103
+ - **OpenML datasets:** Widely known, programmatically accessible via `openml` Python package, no API key needed for public datasets. Clean ground truth = original dataset; dirty = noise-injected copy.
104
+ - **Noise injection is deterministic (seeded):** Ensures reproducibility for judges running the baseline script.
105
+ - **Hybrid reward (per-step + episode):** Gives the agent dense learning signal during traing while reporting a realistic business metric (ML accuracy) as the final grader score.
106
+ - **Structured action space:** Keeps grading deterministic and makes the `/tasks` endpoint action schema self-documenting with no ambiguity.
107
+ - **Simple sklearn baseline model:** Balances realism of the metric with fast grading latency (< 2s per episode grading call).
108
+
109
+ ---
110
+
111
+ ## Dependencies / Assumptions
112
+
113
+ - OpenML Python package (`openml`) is pip-installable and works inside Docker without external auth for public datasets
114
+ - Chosen OpenML datasets (iris, adult, credit-g / diabetes) are stable and publicly available
115
+ - sklearn is available for baseline model training in the grading step
116
+ - OpenEnv CLI (`openenv`) is available for scaffolding and HF Space deployment
117
+ - Hackathon submission window: 28 Mar – 7 Apr 2026
118
+
119
+ ---
120
+
121
+ ## Outstanding Questions
122
+
123
+ ### Resolve Before Planning
124
+ - None β€” all product decisions are resolved.
125
+
126
+ ### Deferred to Planning
127
+ - [Affects R5][Needs research] Exact normalization formula for episode reward: how to compute the
128
+ oracle clean-data accuracy upper bound (run on unmodified OpenML data) and dirty-data lower bound
129
+ (run on fully noise-injected data) to bracket the [0.0, 1.0] range correctly.
130
+ - [Affects R7][Technical] Confirm `/grader` endpoint contract: does it accept episode history or
131
+ just the final cleaned dataset? Check OpenEnv spec RFC for grader interface.
132
+ - [Affects R9][Technical] Confirm HF Spaces Dockerfile constraints (CPU-only, memory limits) to
133
+ ensure sklearn training fits within free-tier limits.
134
+ - [Affects R4][Needs research] Verify OpenML dataset IDs and that noise injection levels produce
135
+ meaningfully different difficulty (e.g., 10% missing for easy, 30% + type errors for medium).
136
+
137
+ ---
138
+
139
+ ## Next Steps
140
+
141
+ All blocking questions resolved.
142
+
143
+ β†’ `/ce:plan` for structured implementation planning
docs/plans/2026-03-27-001-feat-data-cleaning-openenv-environment-plan.md ADDED
@@ -0,0 +1,955 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ ---
2
+ title: "feat: Data Cleaning RL Environment (OpenEnv Hackathon Round 1)"
3
+ type: feat
4
+ status: active
5
+ date: 2026-03-27
6
+ origin: docs/brainstorms/2026-03-27-data-cleaning-env-requirements.md
7
+ ---
8
+
9
+ # feat: Data Cleaning RL Environment (OpenEnv Hackathon Round 1)
10
+
11
+ ## Overview
12
+
13
+ Build a fully compliant OpenEnv RL environment where an AI agent cleans tabular datasets by
14
+ issuing structured commands. The environment uses OpenML public datasets with synthetically
15
+ injected noise across three difficulty tiers. The grader rewards the agent based on downstream
16
+ ML model accuracy improvement. Deployed to Hugging Face Spaces with Docker, passing all
17
+ pre-submission validation checks.
18
+
19
+ **Deadline:** 7 Apr 2026, 11:59 PM
20
+ **Submission window opens:** 28 Mar 2026
21
+
22
+ ---
23
+
24
+ ## Problem Statement / Motivation
25
+
26
+ There is no standardized RL environment for tabular data cleaning, despite it being one of the
27
+ most time-consuming real-world tasks for data engineers. This environment gives agents a
28
+ reproducible, objectively scorable task with dense intermediate rewards and a realistic final
29
+ metric (downstream ML accuracy). It is both hackathon-viable and genuinely novel as a
30
+ community environment.
31
+
32
+ (see origin: `docs/brainstorms/2026-03-27-data-cleaning-env-requirements.md`)
33
+
34
+ ---
35
+
36
+ ## Architecture
37
+
38
+ ```
39
+ data_cleaning_env/
40
+ β”œβ”€β”€ __init__.py # Export Action, Observation, DataCleaningEnv
41
+ β”œβ”€β”€ models.py # Pydantic: CleaningAction, Observation, EpisodeState
42
+ β”œβ”€β”€ client.py # DataCleaningEnv(EnvClient) β€” async + sync API
43
+ β”œβ”€β”€ noise_injector.py # Deterministic noise injection (seeded RNG)
44
+ β”œβ”€β”€ grader.py # Episode grader β€” trains sklearn model, returns 0.0–1.0
45
+ β”œβ”€β”€ datasets.py # Load & cache OpenML datasets at startup
46
+ β”œβ”€β”€ baseline.py # Heuristic baseline agent script
47
+ β”œβ”€β”€ openenv.yaml # OpenEnv manifest
48
+ β”œβ”€β”€ README.md # Environment docs
49
+ β”œβ”€β”€ pyproject.toml # Package metadata + dependencies
50
+ └── server/
51
+ β”œβ”€β”€ environment.py # DataCleaningEnvironment(Environment) β€” core logic
52
+ β”œβ”€β”€ app.py # FastAPI app: /reset /step /state /tasks /grader /baseline
53
+ β”œβ”€β”€ requirements.txt # Docker dependencies
54
+ └── Dockerfile # Container image
55
+ ```
56
+
57
+ ---
58
+
59
+ ## Proposed Solution
60
+
61
+ ### Phase 1: Environment Scaffold & Data Layer
62
+
63
+ **Scaffold the OpenEnv project structure** using `openenv init data_cleaning_env`, then build
64
+ the data layer.
65
+
66
+ #### 1.1 Dataset Loading (`datasets.py`)
67
+
68
+ Use `sklearn.datasets.fetch_openml` (no extra package needed β€” sklearn is already a dependency):
69
+
70
+ ```python
71
+ # datasets.py
72
+ from sklearn.datasets import fetch_openml
73
+ import pandas as pd
74
+
75
+ TASK_CONFIGS = {
76
+ "easy": {"data_id": 61, "name": "iris", "target": "class"},
77
+ "medium": {"data_id": 1590, "name": "adult", "target": "class"},
78
+ "hard": {"data_id": 31, "name": "credit-g", "target": "class"},
79
+ }
80
+
81
+ def load_clean_dataset(task: str) -> tuple[pd.DataFrame, str]:
82
+ """Returns (clean_df, target_col). Cached after first load."""
83
+ cfg = TASK_CONFIGS[task]
84
+ dataset = fetch_openml(data_id=cfg["data_id"], as_frame=True, cache=True)
85
+ df = dataset.frame.copy()
86
+ return df, cfg["target"]
87
+ ```
88
+
89
+ **Dataset IDs confirmed:**
90
+ - iris β†’ OpenML ID 61 (150 rows, 4 numeric features, 3-class)
91
+ - adult β†’ OpenML ID 1590 (48,842 rows, 15 features, binary class) β€” use a 2,000-row sample for speed
92
+ - credit-g β†’ OpenML ID 31 (1,000 rows, 20 features, binary class)
93
+
94
+ #### 1.2 Noise Injector (`noise_injector.py`)
95
+
96
+ Deterministic noise injection with seeded RNG ensures judges reproduce the same dirty dataset:
97
+
98
+ ```python
99
+ # noise_injector.py
100
+ import numpy as np
101
+ import pandas as pd
102
+
103
+ def inject_noise(df: pd.DataFrame, task: str, seed: int = 42) -> pd.DataFrame:
104
+ rng = np.random.default_rng(seed)
105
+ dirty = df.copy()
106
+
107
+ if task == "easy":
108
+ # 15% missing values in numeric columns only
109
+ for col in dirty.select_dtypes(include="number").columns:
110
+ mask = rng.random(len(dirty)) < 0.15
111
+ dirty.loc[mask, col] = np.nan
112
+
113
+ elif task == "medium":
114
+ # 20% missing values (all columns)
115
+ for col in dirty.columns:
116
+ mask = rng.random(len(dirty)) < 0.20
117
+ dirty.loc[mask, col] = np.nan
118
+ # 5% type corruption: convert numeric to string "err_<value>"
119
+ for col in dirty.select_dtypes(include="number").columns[:2]:
120
+ mask = rng.random(len(dirty)) < 0.05
121
+ dirty.loc[mask, col] = dirty.loc[mask, col].apply(
122
+ lambda x: f"err_{x}" if pd.notna(x) else x
123
+ )
124
+ # Inject 3% duplicate rows
125
+ n_dups = int(len(dirty) * 0.03)
126
+ dup_rows = dirty.sample(n=n_dups, random_state=seed)
127
+ dirty = pd.concat([dirty, dup_rows], ignore_index=True)
128
+
129
+ elif task == "hard":
130
+ # All medium noise plus:
131
+ # 25% missing values
132
+ for col in dirty.columns:
133
+ mask = rng.random(len(dirty)) < 0.25
134
+ dirty.loc[mask, col] = np.nan
135
+ # Type errors in 3 columns
136
+ for col in dirty.select_dtypes(include="number").columns[:3]:
137
+ mask = rng.random(len(dirty)) < 0.07
138
+ dirty.loc[mask, col] = dirty.loc[mask, col].apply(
139
+ lambda x: f"err_{x}" if pd.notna(x) else x
140
+ )
141
+ # Outliers: set 5% of numeric values to 10x their max
142
+ for col in dirty.select_dtypes(include="number").columns:
143
+ col_max = dirty[col].max()
144
+ if pd.notna(col_max):
145
+ mask = rng.random(len(dirty)) < 0.05
146
+ dirty.loc[mask, col] = col_max * 10
147
+ # Duplicate rows 5%
148
+ n_dups = int(len(dirty) * 0.05)
149
+ dup_rows = dirty.sample(n=n_dups, random_state=seed)
150
+ dirty = pd.concat([dirty, dup_rows], ignore_index=True)
151
+ # Schema violation: negative values in strictly positive columns
152
+ pos_cols = dirty.select_dtypes(include="number").columns[:2]
153
+ for col in pos_cols:
154
+ mask = rng.random(len(dirty)) < 0.05
155
+ dirty.loc[mask, col] = dirty.loc[mask, col].abs() * -1
156
+
157
+ return dirty
158
+ ```
159
+
160
+ ---
161
+
162
+ ### Phase 2: Models & Action Space (`models.py`)
163
+
164
+ Use Pydantic (already required by OpenEnv FastAPI stack):
165
+
166
+ ```python
167
+ # models.py
168
+ from pydantic import BaseModel
169
+ from typing import Literal, Optional, List, Dict, Any
170
+ from enum import Enum
171
+
172
+ class FillStrategy(str, Enum):
173
+ mean = "mean"
174
+ median = "median"
175
+ mode = "mode"
176
+ constant = "constant"
177
+
178
+ class OutlierMethod(str, Enum):
179
+ iqr = "iqr"
180
+ zscore = "zscore"
181
+
182
+ class ActionType(str, Enum):
183
+ fill_missing = "fill_missing"
184
+ drop_duplicates = "drop_duplicates"
185
+ fix_type = "fix_type"
186
+ normalize = "normalize"
187
+ drop_outliers = "drop_outliers"
188
+ fix_schema_violation = "fix_schema_violation"
189
+ done = "done"
190
+
191
+ class CleaningAction(BaseModel):
192
+ action_type: ActionType
193
+ column: Optional[str] = None # required for column-level actions
194
+ strategy: Optional[FillStrategy] = None # for fill_missing
195
+ dtype: Optional[str] = None # for fix_type: "int", "float", "str"
196
+ method: Optional[OutlierMethod] = None # for drop_outliers
197
+ constraint: Optional[str] = None # for fix_schema_violation: e.g. "non_negative"
198
+ constant_value: Optional[Any] = None # for fill_missing with strategy=constant
199
+
200
+ class ColumnIssues(BaseModel):
201
+ missing_count: int
202
+ missing_pct: float
203
+ type_errors: int
204
+ outlier_count: int
205
+ has_duplicates: bool # dataset-level, repeated per column for convenience
206
+
207
+ class Observation(BaseModel):
208
+ task: str
209
+ step: int
210
+ max_steps: int
211
+ columns: List[str]
212
+ column_issues: Dict[str, ColumnIssues]
213
+ # Compact dataset representation: column stats (not full data β€” avoids huge payloads)
214
+ column_stats: Dict[str, Dict[str, Any]] # {"col": {"mean": .., "null_count": ..}}
215
+ reward: float # per-step reward from last action
216
+ done: bool
217
+
218
+ class EpisodeState(BaseModel):
219
+ episode_id: str
220
+ task: str
221
+ step: int
222
+ max_steps: int
223
+ score: Optional[float] = None # only set after grader runs
224
+ ```
225
+
226
+ **Action schema for `/tasks` endpoint:**
227
+
228
+ ```json
229
+ {
230
+ "tasks": ["easy", "medium", "hard"],
231
+ "action_schema": {
232
+ "action_type": "string (required): fill_missing | drop_duplicates | fix_type | normalize | drop_outliers | fix_schema_violation | done",
233
+ "column": "string (optional): target column name",
234
+ "strategy": "string (optional for fill_missing): mean | median | mode | constant",
235
+ "dtype": "string (optional for fix_type): int | float | str",
236
+ "method": "string (optional for drop_outliers): iqr | zscore",
237
+ "constraint": "string (optional for fix_schema_violation): non_negative | clamp_range",
238
+ "constant_value": "any (optional for fill_missing with strategy=constant)"
239
+ }
240
+ ```
241
+
242
+ ---
243
+
244
+ ### Phase 3: Core Environment Logic (`server/environment.py`)
245
+
246
+ ```python
247
+ # server/environment.py
248
+ import uuid
249
+ import pandas as pd
250
+ import numpy as np
251
+ from datasets import load_clean_dataset
252
+ from noise_injector import inject_noise
253
+ from models import CleaningAction, Observation, EpisodeState, ActionType
254
+
255
+ MAX_STEPS = {"easy": 20, "medium": 40, "hard": 60}
256
+
257
+ class DataCleaningEnvironment:
258
+ def __init__(self):
259
+ self.episodes: dict[str, dict] = {} # episode_id -> state dict
260
+
261
+ def reset(self, task: str = "easy") -> tuple[Observation, EpisodeState]:
262
+ episode_id = str(uuid.uuid4())
263
+ clean_df, target_col = load_clean_dataset(task)
264
+ dirty_df = inject_noise(clean_df, task)
265
+
266
+ self.episodes[episode_id] = {
267
+ "task": task,
268
+ "clean_df": clean_df,
269
+ "current_df": dirty_df.copy(),
270
+ "target_col": target_col,
271
+ "step": 0,
272
+ "max_steps": MAX_STEPS[task],
273
+ "done": False,
274
+ "prev_column_accuracy": self._column_accuracy(dirty_df, clean_df),
275
+ }
276
+
277
+ obs = self._make_observation(episode_id, reward=0.0)
278
+ state = EpisodeState(
279
+ episode_id=episode_id,
280
+ task=task,
281
+ step=0,
282
+ max_steps=MAX_STEPS[task],
283
+ )
284
+ return obs, state
285
+
286
+ def step(self, episode_id: str, action: CleaningAction) -> tuple[Observation, float, bool]:
287
+ ep = self.episodes[episode_id]
288
+ if ep["done"]:
289
+ raise ValueError("Episode already done. Call reset().")
290
+
291
+ df = ep["current_df"]
292
+ reward = 0.0
293
+
294
+ try:
295
+ df = self._apply_action(df, action, ep)
296
+ ep["current_df"] = df
297
+ except Exception as e:
298
+ reward = -0.05 # penalize invalid actions
299
+
300
+ ep["step"] += 1
301
+
302
+ if action.action_type == ActionType.done or ep["step"] >= ep["max_steps"]:
303
+ ep["done"] = True
304
+ reward = 0.0 # terminal step reward is 0; grader score is the final signal
305
+ else:
306
+ new_accuracy = self._column_accuracy(df, ep["clean_df"])
307
+ reward = float(np.clip(new_accuracy - ep["prev_column_accuracy"], -0.1, 0.1))
308
+ ep["prev_column_accuracy"] = new_accuracy
309
+
310
+ obs = self._make_observation(episode_id, reward=reward)
311
+ return obs, reward, ep["done"]
312
+
313
+ def _apply_action(self, df: pd.DataFrame, action: CleaningAction, ep: dict) -> pd.DataFrame:
314
+ col = action.column
315
+
316
+ if action.action_type == ActionType.fill_missing:
317
+ if col not in df.columns:
318
+ raise ValueError(f"Column {col} not found")
319
+ # Coerce column to numeric first if possible, then fill
320
+ df[col] = pd.to_numeric(df[col], errors="coerce")
321
+ strategy = action.strategy.value
322
+ if strategy == "mean":
323
+ df[col].fillna(df[col].mean(), inplace=True)
324
+ elif strategy == "median":
325
+ df[col].fillna(df[col].median(), inplace=True)
326
+ elif strategy == "mode":
327
+ df[col].fillna(df[col].mode().iloc[0], inplace=True)
328
+ elif strategy == "constant":
329
+ df[col].fillna(action.constant_value, inplace=True)
330
+
331
+ elif action.action_type == ActionType.drop_duplicates:
332
+ df = df.drop_duplicates().reset_index(drop=True)
333
+
334
+ elif action.action_type == ActionType.fix_type:
335
+ if col not in df.columns:
336
+ raise ValueError(f"Column {col} not found")
337
+ dtype_map = {"int": "Int64", "float": "float64", "str": "str"}
338
+ df[col] = pd.to_numeric(df[col], errors="coerce").astype(dtype_map.get(action.dtype, "str"))
339
+
340
+ elif action.action_type == ActionType.normalize:
341
+ if col not in df.columns:
342
+ raise ValueError(f"Column {col} not found")
343
+ df[col] = pd.to_numeric(df[col], errors="coerce")
344
+ mean, std = df[col].mean(), df[col].std()
345
+ if std > 0:
346
+ df[col] = (df[col] - mean) / std
347
+
348
+ elif action.action_type == ActionType.drop_outliers:
349
+ if col not in df.columns:
350
+ raise ValueError(f"Column {col} not found")
351
+ numeric_col = pd.to_numeric(df[col], errors="coerce")
352
+ if action.method.value == "iqr":
353
+ q1, q3 = numeric_col.quantile(0.25), numeric_col.quantile(0.75)
354
+ iqr = q3 - q1
355
+ mask = numeric_col.between(q1 - 1.5 * iqr, q3 + 1.5 * iqr)
356
+ else: # zscore
357
+ z = (numeric_col - numeric_col.mean()) / numeric_col.std()
358
+ mask = z.abs() < 3
359
+ df = df[mask].reset_index(drop=True)
360
+
361
+ elif action.action_type == ActionType.fix_schema_violation:
362
+ if col not in df.columns:
363
+ raise ValueError(f"Column {col} not found")
364
+ numeric_col = pd.to_numeric(df[col], errors="coerce")
365
+ if action.constraint == "non_negative":
366
+ df[col] = numeric_col.clip(lower=0)
367
+ elif action.constraint == "clamp_range":
368
+ # Clamp to [p5, p95] of the clean dataset for that column
369
+ clean_col = pd.to_numeric(ep["clean_df"][col], errors="coerce")
370
+ df[col] = numeric_col.clip(clean_col.quantile(0.05), clean_col.quantile(0.95))
371
+
372
+ return df
373
+
374
+ def _column_accuracy(self, current_df: pd.DataFrame, clean_df: pd.DataFrame) -> float:
375
+ """Mean fraction of values matching clean ground truth, per column."""
376
+ scores = []
377
+ # Align by index and columns
378
+ common_cols = [c for c in clean_df.columns if c in current_df.columns]
379
+ n_rows = min(len(current_df), len(clean_df))
380
+ for col in common_cols:
381
+ cur = current_df[col].iloc[:n_rows].reset_index(drop=True)
382
+ cln = clean_df[col].iloc[:n_rows].reset_index(drop=True)
383
+ try:
384
+ cur_num = pd.to_numeric(cur, errors="coerce")
385
+ cln_num = pd.to_numeric(cln, errors="coerce")
386
+ match = (cur_num - cln_num).abs() < 1e-6
387
+ scores.append(match.mean())
388
+ except Exception:
389
+ scores.append((cur == cln).mean())
390
+ return float(np.mean(scores)) if scores else 0.0
391
+
392
+ def _make_observation(self, episode_id: str, reward: float) -> Observation:
393
+ ep = self.episodes[episode_id]
394
+ df = ep["current_df"]
395
+ clean_df = ep["clean_df"]
396
+
397
+ column_issues = {}
398
+ column_stats = {}
399
+ n_dups = df.duplicated().sum()
400
+
401
+ for col in clean_df.columns:
402
+ if col not in df.columns:
403
+ continue
404
+ col_data = df[col]
405
+ numeric = pd.to_numeric(col_data, errors="coerce")
406
+ # count type errors: values that can't parse as numeric when clean column is numeric
407
+ clean_numeric = pd.to_numeric(clean_df[col], errors="coerce")
408
+ type_errs = 0
409
+ if clean_numeric.notna().mean() > 0.9: # column is mostly numeric
410
+ type_errs = int(numeric.isna().sum() - col_data.isna().sum())
411
+ type_errs = max(0, type_errs)
412
+
413
+ # outlier count via IQR
414
+ outlier_count = 0
415
+ if numeric.notna().sum() > 4:
416
+ q1, q3 = numeric.quantile(0.25), numeric.quantile(0.75)
417
+ iqr = q3 - q1
418
+ if iqr > 0:
419
+ outlier_count = int(((numeric < q1 - 1.5*iqr) | (numeric > q3 + 1.5*iqr)).sum())
420
+
421
+ column_issues[col] = {
422
+ "missing_count": int(col_data.isna().sum()),
423
+ "missing_pct": round(float(col_data.isna().mean()), 3),
424
+ "type_errors": type_errs,
425
+ "outlier_count": outlier_count,
426
+ "has_duplicates": bool(n_dups > 0),
427
+ }
428
+ column_stats[col] = {
429
+ "mean": round(float(numeric.mean()), 4) if numeric.notna().any() else None,
430
+ "std": round(float(numeric.std()), 4) if numeric.notna().any() else None,
431
+ "null_count": int(col_data.isna().sum()),
432
+ "unique_count": int(col_data.nunique()),
433
+ }
434
+
435
+ return Observation(
436
+ task=ep["task"],
437
+ step=ep["step"],
438
+ max_steps=ep["max_steps"],
439
+ columns=list(clean_df.columns),
440
+ column_issues=column_issues,
441
+ column_stats=column_stats,
442
+ reward=reward,
443
+ done=ep["done"],
444
+ )
445
+ ```
446
+
447
+ ---
448
+
449
+ ### Phase 4: Grader (`grader.py`)
450
+
451
+ The grader trains a simple sklearn model on the cleaned dataset and scores using the bracketed
452
+ normalization formula: `(agent_acc - dirty_acc) / (oracle_acc - dirty_acc)`.
453
+
454
+ The oracle and dirty baselines are precomputed at server startup (deterministic, cached).
455
+
456
+ ```python
457
+ # grader.py
458
+ import numpy as np
459
+ import pandas as pd
460
+ from sklearn.ensemble import RandomForestClassifier
461
+ from sklearn.model_selection import train_test_split
462
+ from sklearn.preprocessing import LabelEncoder
463
+ from sklearn.pipeline import Pipeline
464
+ from sklearn.impute import SimpleImputer
465
+ from sklearn.preprocessing import StandardScaler
466
+ from sklearn.compose import ColumnTransformer
467
+ from sklearn.pipeline import Pipeline as SKPipeline
468
+ # Precomputed at startup, keyed by task
469
+ _ORACLE_SCORES: dict[str, float] = {}
470
+ _DIRTY_SCORES: dict[str, float] = {}
471
+
472
+ def train_and_score(df: pd.DataFrame, target_col: str) -> float:
473
+ """Train RandomForest on df, return test accuracy. Returns 0.0 on failure."""
474
+ try:
475
+ X = df.drop(columns=[target_col])
476
+ y = df[target_col].astype(str)
477
+
478
+ # Encode target
479
+ le = LabelEncoder()
480
+ y_enc = le.fit_transform(y.fillna("missing"))
481
+
482
+ # Numeric pipeline: impute + scale
483
+ num_cols = X.select_dtypes(include="number").columns.tolist()
484
+ cat_cols = X.select_dtypes(exclude="number").columns.tolist()
485
+
486
+ transformers = []
487
+ if num_cols:
488
+ transformers.append(("num", Pipeline([
489
+ ("impute", SimpleImputer(strategy="median")),
490
+ ("scale", StandardScaler()),
491
+ ]), num_cols))
492
+ if cat_cols:
493
+ transformers.append(("cat", SimpleImputer(strategy="most_frequent"), cat_cols))
494
+
495
+ preprocessor = ColumnTransformer(transformers, remainder="drop")
496
+ clf = SKPipeline([
497
+ ("pre", preprocessor),
498
+ ("clf", RandomForestClassifier(n_estimators=50, random_state=42, n_jobs=-1)),
499
+ ])
500
+
501
+ X_train, X_test, y_train, y_test = train_test_split(
502
+ X, y_enc, test_size=0.2, random_state=42, stratify=y_enc
503
+ )
504
+ clf.fit(X_train, y_train)
505
+ return float(clf.score(X_test, y_test))
506
+ except Exception:
507
+ return 0.0
508
+
509
+ def compute_oracle_and_dirty_baselines(task: str, clean_df: pd.DataFrame,
510
+ dirty_df: pd.DataFrame, target_col: str):
511
+ """Call once at startup."""
512
+ _ORACLE_SCORES[task] = train_and_score(clean_df, target_col)
513
+ _DIRTY_SCORES[task] = train_and_score(dirty_df, target_col)
514
+
515
+ def grade_episode(episode_cleaned_df: pd.DataFrame, task: str, target_col: str) -> float:
516
+ """Returns score in [0.0, 1.0]."""
517
+ agent_acc = train_and_score(episode_cleaned_df, target_col)
518
+ oracle = _ORACLE_SCORES.get(task, 1.0)
519
+ dirty = _DIRTY_SCORES.get(task, 0.0)
520
+ if oracle <= dirty:
521
+ return float(agent_acc >= oracle)
522
+ score = (agent_acc - dirty) / (oracle - dirty)
523
+ return float(np.clip(score, 0.0, 1.0))
524
+ ```
525
+
526
+ ---
527
+
528
+ ### Phase 5: FastAPI Server (`server/app.py`)
529
+
530
+ Implements all required endpoints: OpenEnv standard + hackathon-specific.
531
+
532
+ ```python
533
+ # server/app.py
534
+ from fastapi import FastAPI, HTTPException
535
+ from pydantic import BaseModel
536
+ from typing import Optional
537
+ from server.environment import DataCleaningEnvironment
538
+ from grader import grade_episode
539
+ from baseline import run_baseline
540
+ from models import CleaningAction
541
+
542
+ app = FastAPI(title="Data Cleaning OpenEnv", version="1.0.0")
543
+ env = DataCleaningEnvironment()
544
+
545
+ class ResetRequest(BaseModel):
546
+ task: Optional[str] = "easy"
547
+
548
+ class StepRequest(BaseModel):
549
+ episode_id: str
550
+ action: CleaningAction
551
+
552
+ class GraderRequest(BaseModel):
553
+ episode_id: str
554
+
555
+ # --- Standard OpenEnv endpoints ---
556
+
557
+ @app.post("/reset")
558
+ async def reset(req: ResetRequest):
559
+ obs, state = env.reset(task=req.task)
560
+ return {"observation": obs.model_dump(), "state": state.model_dump()}
561
+
562
+ @app.post("/step")
563
+ async def step(req: StepRequest):
564
+ try:
565
+ obs, reward, done = env.step(req.episode_id, req.action)
566
+ return {"observation": obs.model_dump(), "reward": reward, "done": done, "info": {}}
567
+ except KeyError:
568
+ raise HTTPException(status_code=404, detail="Episode not found")
569
+ except ValueError as e:
570
+ raise HTTPException(status_code=400, detail=str(e))
571
+
572
+ @app.get("/state")
573
+ async def state(episode_id: str):
574
+ ep = env.episodes.get(episode_id)
575
+ if not ep:
576
+ raise HTTPException(status_code=404, detail="Episode not found")
577
+ return {"episode_id": episode_id, "task": ep["task"],
578
+ "step": ep["step"], "max_steps": ep["max_steps"], "done": ep["done"]}
579
+
580
+ # --- Hackathon-required endpoints ---
581
+
582
+ @app.get("/tasks")
583
+ async def tasks():
584
+ return {
585
+ "tasks": [
586
+ {
587
+ "id": "easy",
588
+ "description": "Fix missing values in the Iris dataset (15% missing, numeric only).",
589
+ "max_steps": 20,
590
+ },
591
+ {
592
+ "id": "medium",
593
+ "description": "Fix missing values, type errors, and duplicates in the Adult Income dataset.",
594
+ "max_steps": 40,
595
+ },
596
+ {
597
+ "id": "hard",
598
+ "description": "Fix missing values, type errors, duplicates, outliers, and schema violations in the Credit-G dataset.",
599
+ "max_steps": 60,
600
+ },
601
+ ],
602
+ "action_schema": {
603
+ "action_type": "string (required): fill_missing | drop_duplicates | fix_type | normalize | drop_outliers | fix_schema_violation | done",
604
+ "column": "string (optional): target column name",
605
+ "strategy": "string (for fill_missing): mean | median | mode | constant",
606
+ "dtype": "string (for fix_type): int | float | str",
607
+ "method": "string (for drop_outliers): iqr | zscore",
608
+ "constraint": "string (for fix_schema_violation): non_negative | clamp_range",
609
+ "constant_value": "any (for fill_missing with strategy=constant)",
610
+ },
611
+ }
612
+
613
+ @app.post("/grader")
614
+ async def grader(req: GraderRequest):
615
+ ep = env.episodes.get(req.episode_id)
616
+ if not ep:
617
+ raise HTTPException(status_code=404, detail="Episode not found")
618
+ score = grade_episode(ep["current_df"], ep["task"], ep["target_col"])
619
+ return {"episode_id": req.episode_id, "task": ep["task"], "score": score}
620
+
621
+ @app.post("/baseline")
622
+ async def baseline():
623
+ scores = run_baseline()
624
+ return {"baseline_scores": scores}
625
+
626
+ @app.get("/health")
627
+ async def health():
628
+ return {"status": "ok"}
629
+ ```
630
+
631
+ ---
632
+
633
+ ### Phase 6: Baseline Inference Script (`baseline.py`)
634
+
635
+ Heuristic agent: fill all missing with median, fix types (coerce to numeric), drop duplicates, drop outliers via IQR, then call `done`.
636
+
637
+ ```python
638
+ # baseline.py
639
+ """
640
+ Heuristic baseline agent for the Data Cleaning environment.
641
+ Run: python baseline.py
642
+ """
643
+ import requests
644
+ import json
645
+
646
+ BASE_URL = "http://localhost:8000"
647
+
648
+ def run_single_episode(task: str) -> float:
649
+ # Reset
650
+ resp = requests.post(f"{BASE_URL}/reset", json={"task": task})
651
+ data = resp.json()
652
+ episode_id = data["state"]["episode_id"]
653
+ obs = data["observation"]
654
+
655
+ # Strategy: fill_missing β†’ fix_type β†’ drop_duplicates β†’ drop_outliers β†’ done
656
+ columns = obs["columns"]
657
+ issues = obs["column_issues"]
658
+
659
+ # 1. Fill missing values (median) for all columns with missing
660
+ for col, col_issues in issues.items():
661
+ if col_issues["missing_count"] > 0:
662
+ requests.post(f"{BASE_URL}/step", json={
663
+ "episode_id": episode_id,
664
+ "action": {"action_type": "fill_missing", "column": col, "strategy": "median"}
665
+ })
666
+
667
+ # 2. Fix type errors
668
+ for col, col_issues in issues.items():
669
+ if col_issues["type_errors"] > 0:
670
+ requests.post(f"{BASE_URL}/step", json={
671
+ "episode_id": episode_id,
672
+ "action": {"action_type": "fix_type", "column": col, "dtype": "float"}
673
+ })
674
+
675
+ # 3. Drop duplicates (once)
676
+ requests.post(f"{BASE_URL}/step", json={
677
+ "episode_id": episode_id,
678
+ "action": {"action_type": "drop_duplicates"}
679
+ })
680
+
681
+ # 4. Drop outliers for columns with many outliers
682
+ for col, col_issues in issues.items():
683
+ if col_issues["outlier_count"] > 5:
684
+ requests.post(f"{BASE_URL}/step", json={
685
+ "episode_id": episode_id,
686
+ "action": {"action_type": "drop_outliers", "column": col, "method": "iqr"}
687
+ })
688
+
689
+ # 5. Done
690
+ requests.post(f"{BASE_URL}/step", json={
691
+ "episode_id": episode_id,
692
+ "action": {"action_type": "done"}
693
+ })
694
+
695
+ # Grade
696
+ resp = requests.post(f"{BASE_URL}/grader", json={"episode_id": episode_id})
697
+ return resp.json()["score"]
698
+
699
+ def run_baseline() -> dict:
700
+ results = {}
701
+ for task in ["easy", "medium", "hard"]:
702
+ score = run_single_episode(task)
703
+ results[task] = round(score, 4)
704
+ print(f" Task {task}: {results[task]:.4f}")
705
+ return results
706
+
707
+ if __name__ == "__main__":
708
+ print("Running baseline agent...")
709
+ scores = run_baseline()
710
+ print(f"\nBaseline scores: {json.dumps(scores, indent=2)}")
711
+ ```
712
+
713
+ ---
714
+
715
+ ### Phase 7: OpenEnv Configuration Files
716
+
717
+ #### `openenv.yaml`
718
+
719
+ ```yaml
720
+ name: data-cleaning-env
721
+ version: "1.0.0"
722
+ description: "RL environment for tabular data cleaning. Agent issues structured commands to clean dirty datasets from OpenML. Graded by downstream ML model accuracy."
723
+ author: "Yash Marathe"
724
+ tags:
725
+ - data-engineering
726
+ - tabular
727
+ - real-world
728
+ - openml
729
+ tasks:
730
+ - id: easy
731
+ description: "Fix missing values in Iris dataset"
732
+ - id: medium
733
+ description: "Fix missing values, type errors, duplicates in Adult Income dataset"
734
+ - id: hard
735
+ description: "Fix all noise types in Credit-G dataset including outliers and schema violations"
736
+ server:
737
+ port: 8000
738
+ health_endpoint: /health
739
+ ```
740
+
741
+ #### `server/Dockerfile`
742
+
743
+ ```dockerfile
744
+ FROM python:3.11-slim
745
+
746
+ WORKDIR /app
747
+
748
+ COPY server/requirements.txt .
749
+ RUN pip install --no-cache-dir -r requirements.txt
750
+
751
+ COPY . .
752
+
753
+ # Pre-download datasets at build time (avoids cold-start latency)
754
+ RUN python -c "from datasets import load_clean_dataset; \
755
+ [load_clean_dataset(t) for t in ['easy', 'medium', 'hard']]"
756
+
757
+ EXPOSE 8000
758
+
759
+ CMD ["uvicorn", "server.app:app", "--host", "0.0.0.0", "--port", "8000"]
760
+ ```
761
+
762
+ #### `server/requirements.txt`
763
+
764
+ ```
765
+ fastapi>=0.115
766
+ uvicorn[standard]>=0.30
767
+ pydantic>=2.0
768
+ scikit-learn>=1.4
769
+ pandas>=2.0
770
+ numpy>=1.26
771
+ openenv-core>=0.1
772
+ ```
773
+ ---
774
+
775
+ ### Phase 8: Client (`client.py`)
776
+
777
+ ```python
778
+ # client.py β€” standard OpenEnv EnvClient wrapper
779
+ from openenv import EnvClient
780
+ from models import CleaningAction, Observation
781
+
782
+ class DataCleaningEnv(EnvClient):
783
+ async def reset(self, task: str = "easy") -> dict:
784
+ return await self._post("/reset", {"task": task})
785
+
786
+ async def step(self, episode_id: str, action: CleaningAction) -> dict:
787
+ return await self._post("/step", {
788
+ "episode_id": episode_id,
789
+ "action": action.model_dump()
790
+ })
791
+
792
+ async def get_state(self, episode_id: str) -> dict:
793
+ return await self._get(f"/state?episode_id={episode_id}")
794
+ ```
795
+
796
+ ---
797
+
798
+ ## Technical Considerations
799
+
800
+ ### Normalization Formula for Episode Score (Deferred to Planning β€” resolved here)
801
+
802
+ ```
803
+ score = clip((agent_acc - dirty_acc) / (oracle_acc - dirty_acc), 0.0, 1.0)
804
+ ```
805
+
806
+ - `oracle_acc` = RandomForest accuracy on original unmodified OpenML dataset
807
+ - `dirty_acc` = RandomForest accuracy on fully noise-injected dataset
808
+ - Precomputed once at startup using `datasets.py` + `noise_injector.py`
809
+ - Edge case: if `oracle_acc == dirty_acc` (noise had no effect), return 1.0 if agent matches oracle else 0.0
810
+
811
+ ### HF Spaces Constraints (Deferred β€” resolved here)
812
+
813
+ - Free HF Spaces: 2 CPU cores, 16GB RAM β€” sufficient for RandomForest on these small datasets
814
+ - adult dataset: use 2,000-row sample to keep grading under 2s
815
+ - Set `n_estimators=50` and `n_jobs=-1` for speed
816
+ - Datasets pre-downloaded at Docker build time via `RUN python -c "..."` step
817
+
818
+ ### Episode Memory
819
+
820
+ - Episodes stored in-memory in server process (`self.episodes` dict)
821
+ - HF Spaces restarts the container daily β€” acceptable for a hackathon env
822
+ - No persistence needed; each `reset()` creates a fresh episode
823
+
824
+ ### Concurrent Safety
825
+
826
+ - FastAPI is async; `environment.py` uses a plain dict which is safe for single-process uvicorn
827
+ - For multi-worker deployments: switch to process-safe storage (not needed for hackathon)
828
+
829
+ ---
830
+
831
+ ## System-Wide Impact
832
+
833
+ ### Interaction Graph
834
+
835
+ 1. Agent calls `POST /reset` β†’ `DataCleaningEnvironment.reset()` β†’ `load_clean_dataset()` β†’ `inject_noise()` β†’ `_make_observation()` β†’ returns `Observation` + `EpisodeState`
836
+ 2. Agent calls `POST /step` β†’ `env.step()` β†’ `_apply_action()` β†’ `_column_accuracy()` (per-step reward) β†’ `_make_observation()` β†’ returns updated `Observation`
837
+ 3. Agent calls `POST /grader` β†’ `grade_episode()` β†’ `train_and_score(cleaned_df)` β†’ bracketed normalization β†’ float score
838
+ 4. Agent calls `POST /baseline` β†’ `run_baseline()` β†’ runs heuristic agent through all 3 tasks β†’ returns dict of scores
839
+
840
+ ### Error Propagation
841
+
842
+ - Invalid `episode_id` β†’ 404 HTTPException (does not crash server)
843
+ - Invalid action (column not found, bad dtype) β†’ `-0.05` reward penalty, observation still returned; action silently no-ops
844
+ - `train_and_score` failure β†’ returns `0.0` (grader never crashes)
845
+ - Dataset download failure at startup β†’ server fails to start; this is caught at Docker build time by the pre-download step
846
+
847
+ ### State Lifecycle
848
+ - `reset()` creates a new episode entry. Episode lives in `self.episodes[episode_id]` until process restart.
849
+ - Calling `step()` after `done=True` raises `ValueError` (surfaced as 400)
850
+ - No partial state corruption risk: `_apply_action` returns a new DataFrame copy on success; original is only replaced on success
851
+
852
+ ---
853
+
854
+ ## Acceptance Criteria
855
+
856
+ ### Functional Requirements
857
+
858
+ - [ ] R1: `POST /reset?task=easy|medium|hard` returns `Observation` with `done=False`
859
+ - [ ] R2: All 7 action types are implemented and affect the DataFrame correctly
860
+ - [ ] R3: Observation includes column issues, column stats, step, reward, done
861
+ - [ ] R4: Three tasks use iris (easy), adult-2k-sample (medium), credit-g (hard) with the defined noise levels
862
+ - [ ] R5: Per-step reward is in `[-0.1, +0.1]`; episode grader score is in `[0.0, 1.0]`
863
+ - [ ] R6: Noise injection with same seed produces identical dirty dataset on every run
864
+ - [ ] R7: All required endpoints respond correctly: `/reset`, `/step`, `/state`, `/tasks`, `/grader`, `/baseline`, `/health`
865
+ - [ ] R8: `baseline.py` runs end-to-end without error and prints scores for all 3 tasks
866
+ - [ ] R9: `docker build` succeeds; container starts and `/health` returns 200
867
+ - [ ] R10: README documents action schema, observation format, setup, and baseline scores
868
+
869
+ ### Quality Gates
870
+
871
+ - [ ] Grader scores differ meaningfully across tasks: easy > medium > hard for the heuristic baseline
872
+ - [ ] Grader score for a perfect clean (oracle) = 1.0
873
+ - [ ] Grader score for an untouched dirty dataset β‰ˆ 0.0
874
+ - [ ] Episode completes in < 5s for easy/medium tasks on a single CPU core
875
+ - [ ] HF Space passes automated ping (HTTP 200 + valid `/reset` response)
876
+ - [ ] `openenv.yaml` validates with `openenv validate`
877
+
878
+ ---
879
+
880
+ ## Success Metrics
881
+
882
+ (see origin: `docs/brainstorms/2026-03-27-data-cleaning-env-requirements.md#success-criteria`)
883
+
884
+ - HF Space automated ping passes
885
+ - OpenEnv spec validator passes
886
+ - Docker build succeeds
887
+ - Baseline script completes with deterministic scores
888
+ - All 3 graders return scores in [0.0, 1.0]
889
+ - Harder tasks produce lower baseline scores (validates difficulty progression)
890
+
891
+ ---
892
+
893
+ ## Dependencies & Risks
894
+
895
+ | Risk | Likelihood | Mitigation |
896
+ |------|-----------|------|
897
+ | OpenML API down at Docker build time | Low | Add retry logic; cache datasets in repo as CSV fallback |
898
+ | adult dataset too large β†’ grader > 2s | Medium | Use 2,000-row sample; set `n_estimators=50` |
899
+ | HF Spaces memory limit exceeded | Low | RandomForest on <2k rows uses <200MB RAM |
900
+ | OpenEnv spec changes before deadline | Low | Pin `openenv-core` version in requirements.txt |
901
+ | Noise levels don't produce meaningful difficulty gap | Medium | Verify with a quick manual test run before submission |
902
+
903
+ ---
904
+
905
+ ## Implementation Order
906
+
907
+ ```
908
+ Day 1 (Mar 27-28): Scaffold + Data Layer
909
+ β”œβ”€β”€ openenv init data_cleaning_env
910
+ β”œβ”€β”€ datasets.py (fetch_openml, cache)
911
+ β”œβ”€β”€ noise_injector.py (3 task levels, seeded)
912
+ └── Smoke test: verify 3 datasets load and noise is visible
913
+
914
+ Day 2 (Mar 29-30): Core Environment + Models
915
+ β”œβ”€β”€ models.py (Pydantic models)
916
+ β”œβ”€β”€ server/environment.py (_apply_action, _column_accuracy, _make_observation)
917
+ └── Unit test: step through all 7 action types, verify reward sign
918
+
919
+ Day 3 (Mar 31 - Apr 1): Grader + API
920
+ β”œβ”€β”€ grader.py (train_and_score, oracle/dirty precompute, grade_episode)
921
+ β”œβ”€β”€ server/app.py (all endpoints)
922
+ └── Integration test: full episode resetβ†’step*Nβ†’grader for each task
923
+
924
+ Day 4 (Apr 2-3): Baseline + Packaging
925
+ β”œβ”€β”€ baseline.py (heuristic agent, run_baseline)
926
+ β”œβ”€β”€ client.py (EnvClient wrapper)
927
+ β”œβ”€β”€ openenv.yaml, Dockerfile, requirements.txt
928
+ └── docker build && docker run test
929
+
930
+ Day 5 (Apr 4-5): Deploy + README + Validation
931
+ β”œβ”€β”€ openenv push (HF Spaces deploy)
932
+ β”œβ”€β”€ README.md (action schema, observation format, setup, baseline scores)
933
+ β”œβ”€β”€ openenv validate
934
+ └── Submit via hackathon portal
935
+ ```
936
+
937
+ ---
938
+
939
+ ## Sources & References
940
+
941
+ ### Origin
942
+
943
+ - **Origin document:** [`docs/brainstorms/2026-03-27-data-cleaning-env-requirements.md`](../brainstorms/2026-03-27-data-cleaning-env-requirements.md)
944
+ Key decisions carried forward: structured action space, OpenML datasets (iris/adult/credit-g),
945
+ hybrid reward (per-step column accuracy + episode ML accuracy), simple sklearn RandomForest grader.
946
+
947
+ ### External References
948
+
949
+ - OpenEnv framework: https://github.com/meta-pytorch/OpenEnv
950
+ - OpenEnv docs: https://meta-pytorch.org/OpenEnv/
951
+ - OpenML dataset IDs: iris=61, adult=1590, credit-g=31
952
+ - `sklearn.datasets.fetch_openml`: https://scikit-learn.org/stable/modules/generated/sklearn.datasets.fetch_openml.html
953
+ - OpenEnv env structure: `openenv init` scaffold (see README: `envs/README.md`)
954
+ - HF Spaces deployment: `openenv push` CLI command
955
+ - Hackathon submission deadline: 7 Apr 2026, 11:59 PM