cleanops-openenv / tests /test_environment.py
harsharajkumar273's picture
Merge V2 review and dry-run mechanics
7c2c5f2
from __future__ import annotations
from cleanops_env.graders import grade_tables
from cleanops_env.local_env import LocalCleanOpsEnv
from cleanops_env.models import DataCleaningAction
from cleanops_env.tasks import TASK_CATALOG, clone_tables
def test_reset_step_state_api() -> None:
    """Exercise ``reset``, ``step`` and ``state`` on the easy customer task.

    Checks the fields of the observation returned by ``reset`` with seed 7,
    then performs a single ``inspect_table`` step and checks the returned
    tuple, the ``info`` payload, ``env.state()`` and the focus table.
    """
    task = "customer_contacts_easy"
    sandbox = LocalCleanOpsEnv()

    # --- reset -----------------------------------------------------------
    initial = sandbox.reset(task_id=task, seed=7)
    assert initial.task_id == task
    assert initial.requested_seed == 7
    assert initial.review_budget_remaining == 1
    assert initial.supported_sync_targets == ["crm"]
    assert len(initial.available_review_targets) == 1
    assert 0.0 < initial.downstream_health.overall_health_score < 1.0
    assert initial.done is False
    assert initial.quality_score < 1.0

    # --- step ------------------------------------------------------------
    inspect_customers = DataCleaningAction(
        action_type="inspect_table",
        table_name="customers",
        reasoning="Inspect the main table first.",
    )
    after_inspect, step_reward, finished, step_info = sandbox.step(inspect_customers)
    assert isinstance(step_reward, float)
    assert finished is False

    # --- state -----------------------------------------------------------
    assert step_info["state"]["task_id"] == task
    assert sandbox.state().task_id == task
    assert after_inspect.focus_table is not None
    assert after_inspect.focus_table.name == "customers"
def test_oracle_solution_scores_one_for_all_tasks() -> None:
    """Replay every task's listed solution and expect a perfect final score.

    For each entry in ``TASK_CATALOG`` the environment is reset with seed 7,
    every id in ``task_spec.solution_operation_ids`` is applied in order, and
    the episode is then submitted. The submit step must end the episode with
    ``quality_score`` and ``grader.final_score`` both equal to ``1.0``.

    All assertions carry the task id (and operation id where relevant),
    because they run inside a loop over the whole catalog and a bare failure
    would not say which task broke.
    """
    env = LocalCleanOpsEnv()
    for task_id, task_spec in TASK_CATALOG.items():
        observation = env.reset(task_id=task_id, seed=7)

        for operation_id in task_spec.solution_operation_ids:
            observation, _, done, _ = env.step(
                DataCleaningAction(
                    action_type="apply_operation",
                    operation_id=operation_id,
                    reasoning=f"Apply {operation_id}",
                )
            )
            # Applying an operation must never terminate the episode by itself.
            assert done is False, (
                f"{task_id}: episode ended early after applying {operation_id}"
            )

        observation, _, done, _ = env.step(
            DataCleaningAction(action_type="submit", reasoning="Submit cleaned tables.")
        )
        assert done is True, f"{task_id}: submit did not end the episode"
        assert observation.quality_score == 1.0, (
            f"{task_id}: quality_score was {observation.quality_score!r}"
        )
        assert observation.grader.final_score == 1.0, (
            f"{task_id}: grader.final_score was {observation.grader.final_score!r}"
        )
def test_decoy_operation_lowers_easy_task_quality() -> None:
    """Grade the gold tables before and after ``easy_drop_inactive_customers``.

    The untouched copy of the gold tables must grade at exactly ``1.0``; the
    copy passed through the operation's ``transform`` must grade strictly lower.
    """
    spec = TASK_CATALOG["customer_contacts_easy"]
    pristine = clone_tables(spec.gold_tables)

    # Transform a second clone so ``pristine`` is a separate object from the input.
    decoy = spec.operations["easy_drop_inactive_customers"]
    corrupted = decoy.transform(clone_tables(pristine))

    pristine_grade = grade_tables(spec, pristine)
    corrupted_grade = grade_tables(spec, corrupted)

    assert pristine_grade.score == 1.0
    assert corrupted_grade.score < pristine_grade.score
def test_seed_changes_visible_preview_rows() -> None:
    """Reset the easy task with seeds 2 and 7 and compare the previewed rows.

    For each seed the ``customer_id`` values of the first four focus-table
    rows are recorded. Each observation must echo its requested seed, and the
    two id lists must differ.
    """
    sandbox = LocalCleanOpsEnv()
    snapshots = {}
    previews = {}

    # Same environment instance is reset twice, seed 2 first and then seed 7.
    for seed in (2, 7):
        snapshot = sandbox.reset(task_id="customer_contacts_easy", seed=seed)
        snapshots[seed] = snapshot
        previews[seed] = [record["customer_id"] for record in snapshot.focus_table.rows[:4]]

    for seed, snapshot in snapshots.items():
        assert snapshot.requested_seed == seed
    assert previews[2] != previews[7]
def test_request_review_queues_and_releases_deterministic_response() -> None:
    """Request a review on the hard task and see it resolve on the next step.

    Step 1 (``request_review`` for customer ``CU101``): negative reward,
    review budget drops from 2 to 1, one pending review, none resolved.
    Step 2 (``inspect_table``): positive reward, the pending review moves to
    resolved and recommends ``hard_merge_customers_by_email``.
    """
    sandbox = LocalCleanOpsEnv()

    opening = sandbox.reset(task_id="crm_migration_hard", seed=7)
    assert opening.review_budget_remaining == 2
    assert len(opening.pending_reviews) == 0
    assert len(opening.resolved_reviews) == 0

    # --- step 1: file the review request ---------------------------------
    escalation = DataCleaningAction(
        action_type="request_review",
        entity_type="customer",
        entity_id="CU101",
        reason_code="possible_duplicate",
        reasoning="Escalate the ambiguous Ana Lopez duplicate before merging.",
    )
    queued, queue_reward, queue_done, queue_info = sandbox.step(escalation)
    assert queue_done is False
    assert queue_reward < 0.0
    assert queued.review_budget_remaining == 1
    assert len(queued.pending_reviews) == 1
    assert len(queued.resolved_reviews) == 0
    assert "response will be available on the next step" in queued.last_action_status
    assert queue_info["state"]["requested_review_ids"] == ["hard_customer_merge_review"]

    # --- step 2: any follow-up action releases the response --------------
    follow_up = DataCleaningAction(
        action_type="inspect_table",
        table_name="customers",
        reasoning="Read the customer table again after the review response arrives.",
    )
    released, release_reward, release_done, _ = sandbox.step(follow_up)
    assert release_done is False
    assert release_reward > 0.0
    assert len(released.pending_reviews) == 0
    assert len(released.resolved_reviews) == 1

    answer = released.resolved_reviews[0]
    assert answer.review_id == "hard_customer_merge_review"
    assert "hard_merge_customers_by_email" in answer.recommended_operation_ids
    assert "Review response available" in released.last_action_status
def test_run_sync_dry_run_surfaces_downstream_findings() -> None:
    """Run a billing dry run on the hard task and inspect the report.

    The step must not end the episode, must attach a ``last_dry_run`` for
    ``billing`` with at least one finding, and its ``success_rate`` must equal
    ``downstream_health.billing_link_integrity``. The overall health score
    must be the same as it was right after ``reset``.
    """
    sandbox = LocalCleanOpsEnv()
    opening = sandbox.reset(task_id="crm_migration_hard", seed=7)
    baseline_health = opening.downstream_health.overall_health_score

    probe = DataCleaningAction(
        action_type="run_sync_dry_run",
        target_system="billing",
        reasoning="Check whether the current migration state would break downstream billing.",
    )
    after_probe, _, finished, step_info = sandbox.step(probe)

    assert finished is False
    assert after_probe.last_dry_run is not None
    assert after_probe.last_dry_run.target_system == "billing"
    assert after_probe.last_dry_run.finding_count > 0
    assert (
        after_probe.last_dry_run.success_rate
        == after_probe.downstream_health.billing_link_integrity
    )
    assert "billing" in step_info["state"]["dry_run_targets"]
    # Health score after the dry run is compared against the post-reset value.
    assert after_probe.downstream_health.overall_health_score == baseline_health
def test_duplicate_review_request_is_penalized() -> None:
    """Send the same review request twice on the easy task.

    The second request (same entity and reason code, different ``reasoning``
    text) must yield a negative reward without ending the episode. Afterwards
    the review budget is 0, nothing is pending, one review is resolved, and
    the status text contains ``"already requested"``.
    """

    def review_request(note: str) -> DataCleaningAction:
        # Both requests target customer C005; only the reasoning text varies.
        return DataCleaningAction(
            action_type="request_review",
            entity_type="customer",
            entity_id="C005",
            reason_code="possible_duplicate",
            reasoning=note,
        )

    sandbox = LocalCleanOpsEnv()
    sandbox.reset(task_id="customer_contacts_easy", seed=7)

    # First request: its result is deliberately ignored.
    sandbox.step(review_request("Ask for confirmation once."))

    repeated, penalty, finished, _ = sandbox.step(
        review_request("Repeat the same review request.")
    )

    assert finished is False
    assert penalty < 0.0
    assert repeated.review_budget_remaining == 0
    assert len(repeated.pending_reviews) == 0
    assert len(repeated.resolved_reviews) == 1
    assert "already requested" in repeated.last_action_status