| from __future__ import annotations |
|
|
| from cleanops_env.graders import grade_tables |
| from cleanops_env.local_env import LocalCleanOpsEnv |
| from cleanops_env.models import DataCleaningAction |
| from cleanops_env.tasks import TASK_CATALOG, clone_tables |
|
|
|
|
| def test_reset_step_state_api() -> None: |
| env = LocalCleanOpsEnv() |
| observation = env.reset(task_id="customer_contacts_easy", seed=7) |
| assert observation.task_id == "customer_contacts_easy" |
| assert observation.requested_seed == 7 |
| assert observation.review_budget_remaining == 1 |
| assert observation.supported_sync_targets == ["crm"] |
| assert len(observation.available_review_targets) == 1 |
| assert 0.0 < observation.downstream_health.overall_health_score < 1.0 |
| assert observation.done is False |
| assert observation.quality_score < 1.0 |
|
|
| observation, reward, done, info = env.step( |
| DataCleaningAction(action_type="inspect_table", table_name="customers", reasoning="Inspect the main table first.") |
| ) |
| assert isinstance(reward, float) |
| assert done is False |
| assert info["state"]["task_id"] == "customer_contacts_easy" |
| assert env.state().task_id == "customer_contacts_easy" |
| assert observation.focus_table is not None |
| assert observation.focus_table.name == "customers" |
|
|
|
|
| def test_oracle_solution_scores_one_for_all_tasks() -> None: |
| env = LocalCleanOpsEnv() |
| for task_id, task_spec in TASK_CATALOG.items(): |
| observation = env.reset(task_id=task_id, seed=7) |
| for operation_id in task_spec.solution_operation_ids: |
| observation, _, done, _ = env.step(DataCleaningAction(action_type="apply_operation", operation_id=operation_id, reasoning=f"Apply {operation_id}")) |
| assert done is False |
| observation, _, done, _ = env.step(DataCleaningAction(action_type="submit", reasoning="Submit cleaned tables.")) |
| assert done is True |
| assert observation.quality_score == 1.0 |
| assert observation.grader.final_score == 1.0 |
|
|
|
|
| def test_decoy_operation_lowers_easy_task_quality() -> None: |
| task_spec = TASK_CATALOG["customer_contacts_easy"] |
| clean_tables = clone_tables(task_spec.gold_tables) |
| damaged_tables = task_spec.operations["easy_drop_inactive_customers"].transform(clone_tables(clean_tables)) |
| clean_grade = grade_tables(task_spec, clean_tables) |
| damaged_grade = grade_tables(task_spec, damaged_tables) |
| assert clean_grade.score == 1.0 |
| assert damaged_grade.score < clean_grade.score |
|
|
|
|
| def test_seed_changes_visible_preview_rows() -> None: |
| env = LocalCleanOpsEnv() |
| observation_seed_2 = env.reset(task_id="customer_contacts_easy", seed=2) |
| preview_seed_2 = [row["customer_id"] for row in observation_seed_2.focus_table.rows[:4]] |
|
|
| observation_seed_7 = env.reset(task_id="customer_contacts_easy", seed=7) |
| preview_seed_7 = [row["customer_id"] for row in observation_seed_7.focus_table.rows[:4]] |
|
|
| assert observation_seed_2.requested_seed == 2 |
| assert observation_seed_7.requested_seed == 7 |
| assert preview_seed_2 != preview_seed_7 |
|
|
|
|
| def test_request_review_queues_and_releases_deterministic_response() -> None: |
| env = LocalCleanOpsEnv() |
| observation = env.reset(task_id="crm_migration_hard", seed=7) |
| assert observation.review_budget_remaining == 2 |
| assert len(observation.pending_reviews) == 0 |
| assert len(observation.resolved_reviews) == 0 |
|
|
| observation, reward, done, info = env.step( |
| DataCleaningAction( |
| action_type="request_review", |
| entity_type="customer", |
| entity_id="CU101", |
| reason_code="possible_duplicate", |
| reasoning="Escalate the ambiguous Ana Lopez duplicate before merging.", |
| ) |
| ) |
| assert done is False |
| assert reward < 0.0 |
| assert observation.review_budget_remaining == 1 |
| assert len(observation.pending_reviews) == 1 |
| assert len(observation.resolved_reviews) == 0 |
| assert "response will be available on the next step" in observation.last_action_status |
| assert info["state"]["requested_review_ids"] == ["hard_customer_merge_review"] |
|
|
| observation, reward, done, _ = env.step( |
| DataCleaningAction( |
| action_type="inspect_table", |
| table_name="customers", |
| reasoning="Read the customer table again after the review response arrives.", |
| ) |
| ) |
| assert done is False |
| assert reward > 0.0 |
| assert len(observation.pending_reviews) == 0 |
| assert len(observation.resolved_reviews) == 1 |
| resolved_review = observation.resolved_reviews[0] |
| assert resolved_review.review_id == "hard_customer_merge_review" |
| assert "hard_merge_customers_by_email" in resolved_review.recommended_operation_ids |
| assert "Review response available" in observation.last_action_status |
|
|
|
|
| def test_run_sync_dry_run_surfaces_downstream_findings() -> None: |
| env = LocalCleanOpsEnv() |
| observation = env.reset(task_id="crm_migration_hard", seed=7) |
| starting_health = observation.downstream_health.overall_health_score |
|
|
| observation, reward, done, info = env.step( |
| DataCleaningAction( |
| action_type="run_sync_dry_run", |
| target_system="billing", |
| reasoning="Check whether the current migration state would break downstream billing.", |
| ) |
| ) |
| assert done is False |
| assert observation.last_dry_run is not None |
| assert observation.last_dry_run.target_system == "billing" |
| assert observation.last_dry_run.finding_count > 0 |
| assert observation.last_dry_run.success_rate == observation.downstream_health.billing_link_integrity |
| assert "billing" in info["state"]["dry_run_targets"] |
| assert observation.downstream_health.overall_health_score == starting_health |
|
|
|
|
| def test_duplicate_review_request_is_penalized() -> None: |
| env = LocalCleanOpsEnv() |
| env.reset(task_id="customer_contacts_easy", seed=7) |
| env.step( |
| DataCleaningAction( |
| action_type="request_review", |
| entity_type="customer", |
| entity_id="C005", |
| reason_code="possible_duplicate", |
| reasoning="Ask for confirmation once.", |
| ) |
| ) |
| observation, reward, done, _ = env.step( |
| DataCleaningAction( |
| action_type="request_review", |
| entity_type="customer", |
| entity_id="C005", |
| reason_code="possible_duplicate", |
| reasoning="Repeat the same review request.", |
| ) |
| ) |
| assert done is False |
| assert reward < 0.0 |
| assert observation.review_budget_remaining == 0 |
| assert len(observation.pending_reviews) == 0 |
| assert len(observation.resolved_reviews) == 1 |
| assert "already requested" in observation.last_action_status |
|
|