File size: 6,675 Bytes
ac326a6 b7e4141 7c2c5f2 ac326a6 b7e4141 7c2c5f2 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 | from __future__ import annotations
from cleanops_env.graders import grade_tables
from cleanops_env.local_env import LocalCleanOpsEnv
from cleanops_env.models import DataCleaningAction
from cleanops_env.tasks import TASK_CATALOG, clone_tables
def test_reset_step_state_api() -> None:
env = LocalCleanOpsEnv()
observation = env.reset(task_id="customer_contacts_easy", seed=7)
assert observation.task_id == "customer_contacts_easy"
assert observation.requested_seed == 7
assert observation.review_budget_remaining == 1
assert observation.supported_sync_targets == ["crm"]
assert len(observation.available_review_targets) == 1
assert 0.0 < observation.downstream_health.overall_health_score < 1.0
assert observation.done is False
assert observation.quality_score < 1.0
observation, reward, done, info = env.step(
DataCleaningAction(action_type="inspect_table", table_name="customers", reasoning="Inspect the main table first.")
)
assert isinstance(reward, float)
assert done is False
assert info["state"]["task_id"] == "customer_contacts_easy"
assert env.state().task_id == "customer_contacts_easy"
assert observation.focus_table is not None
assert observation.focus_table.name == "customers"
def test_oracle_solution_scores_one_for_all_tasks() -> None:
env = LocalCleanOpsEnv()
for task_id, task_spec in TASK_CATALOG.items():
observation = env.reset(task_id=task_id, seed=7)
for operation_id in task_spec.solution_operation_ids:
observation, _, done, _ = env.step(DataCleaningAction(action_type="apply_operation", operation_id=operation_id, reasoning=f"Apply {operation_id}"))
assert done is False
observation, _, done, _ = env.step(DataCleaningAction(action_type="submit", reasoning="Submit cleaned tables."))
assert done is True
assert observation.quality_score == 1.0
assert observation.grader.final_score == 1.0
def test_decoy_operation_lowers_easy_task_quality() -> None:
task_spec = TASK_CATALOG["customer_contacts_easy"]
clean_tables = clone_tables(task_spec.gold_tables)
damaged_tables = task_spec.operations["easy_drop_inactive_customers"].transform(clone_tables(clean_tables))
clean_grade = grade_tables(task_spec, clean_tables)
damaged_grade = grade_tables(task_spec, damaged_tables)
assert clean_grade.score == 1.0
assert damaged_grade.score < clean_grade.score
def test_seed_changes_visible_preview_rows() -> None:
env = LocalCleanOpsEnv()
observation_seed_2 = env.reset(task_id="customer_contacts_easy", seed=2)
preview_seed_2 = [row["customer_id"] for row in observation_seed_2.focus_table.rows[:4]]
observation_seed_7 = env.reset(task_id="customer_contacts_easy", seed=7)
preview_seed_7 = [row["customer_id"] for row in observation_seed_7.focus_table.rows[:4]]
assert observation_seed_2.requested_seed == 2
assert observation_seed_7.requested_seed == 7
assert preview_seed_2 != preview_seed_7
def test_request_review_queues_and_releases_deterministic_response() -> None:
env = LocalCleanOpsEnv()
observation = env.reset(task_id="crm_migration_hard", seed=7)
assert observation.review_budget_remaining == 2
assert len(observation.pending_reviews) == 0
assert len(observation.resolved_reviews) == 0
observation, reward, done, info = env.step(
DataCleaningAction(
action_type="request_review",
entity_type="customer",
entity_id="CU101",
reason_code="possible_duplicate",
reasoning="Escalate the ambiguous Ana Lopez duplicate before merging.",
)
)
assert done is False
assert reward < 0.0
assert observation.review_budget_remaining == 1
assert len(observation.pending_reviews) == 1
assert len(observation.resolved_reviews) == 0
assert "response will be available on the next step" in observation.last_action_status
assert info["state"]["requested_review_ids"] == ["hard_customer_merge_review"]
observation, reward, done, _ = env.step(
DataCleaningAction(
action_type="inspect_table",
table_name="customers",
reasoning="Read the customer table again after the review response arrives.",
)
)
assert done is False
assert reward > 0.0
assert len(observation.pending_reviews) == 0
assert len(observation.resolved_reviews) == 1
resolved_review = observation.resolved_reviews[0]
assert resolved_review.review_id == "hard_customer_merge_review"
assert "hard_merge_customers_by_email" in resolved_review.recommended_operation_ids
assert "Review response available" in observation.last_action_status
def test_run_sync_dry_run_surfaces_downstream_findings() -> None:
env = LocalCleanOpsEnv()
observation = env.reset(task_id="crm_migration_hard", seed=7)
starting_health = observation.downstream_health.overall_health_score
observation, reward, done, info = env.step(
DataCleaningAction(
action_type="run_sync_dry_run",
target_system="billing",
reasoning="Check whether the current migration state would break downstream billing.",
)
)
assert done is False
assert observation.last_dry_run is not None
assert observation.last_dry_run.target_system == "billing"
assert observation.last_dry_run.finding_count > 0
assert observation.last_dry_run.success_rate == observation.downstream_health.billing_link_integrity
assert "billing" in info["state"]["dry_run_targets"]
assert observation.downstream_health.overall_health_score == starting_health
def test_duplicate_review_request_is_penalized() -> None:
env = LocalCleanOpsEnv()
env.reset(task_id="customer_contacts_easy", seed=7)
env.step(
DataCleaningAction(
action_type="request_review",
entity_type="customer",
entity_id="C005",
reason_code="possible_duplicate",
reasoning="Ask for confirmation once.",
)
)
observation, reward, done, _ = env.step(
DataCleaningAction(
action_type="request_review",
entity_type="customer",
entity_id="C005",
reason_code="possible_duplicate",
reasoning="Repeat the same review request.",
)
)
assert done is False
assert reward < 0.0
assert observation.review_budget_remaining == 0
assert len(observation.pending_reviews) == 0
assert len(observation.resolved_reviews) == 1
assert "already requested" in observation.last_action_status
|