"""OpenEnv server-side environment for operational data cleaning tasks.""" from __future__ import annotations import copy import random from uuid import uuid4 from openenv.core.env_server.interfaces import Environment from openenv.core.env_server.types import EnvironmentMetadata from cleanops_env.graders import build_table_summary, count_duplicate_groups, grade_tables from cleanops_env.models import ( ActionCostEntry, DataCleaningAction, DataCleaningObservation, DataCleaningState, DownstreamHealth, DryRunFinding, DryRunReport, OperationDetail, OperationSummary, PendingReview, ReviewResolution, ReviewTarget, RiskCard, RewardBreakdown, RowChange, TableView, ) from cleanops_env.tasks import ( ReviewCaseSpec, TaskSpec, apply_operation_to_tables, clone_tables, first_table_name, get_task_spec, list_task_ids, normalize_whitespace, sorted_rows, ) ACTION_COSTS: dict[str, float] = { "inspect_table": 0.005, "inspect_operation": 0.005, "apply_operation:safe": 0.01, "apply_operation:review": 0.015, "apply_operation:destructive": 0.03, "request_review": 0.025, "run_sync_dry_run": 0.02, "submit": 0.005, } ACTION_COST_DESCRIPTIONS: dict[str, str] = { "inspect_table": "Low-cost inspection to understand current records.", "inspect_operation": "Low-cost preview to inspect an operation before applying it.", "apply_operation:safe": "Safe automated cleanup with low operational risk.", "apply_operation:review": "Review-sensitive cleanup that should be used more deliberately.", "apply_operation:destructive": "Destructive cleanup with higher business risk if applied incorrectly.", "request_review": "Consumes limited human-review budget to resolve ambiguity safely.", "run_sync_dry_run": "Runs a deterministic downstream system simulation before submit.", "submit": "Low-cost finalization step after cleanup is complete.", } class CleanOpsEnvironment(Environment[DataCleaningAction, DataCleaningObservation, DataCleaningState]): """A realistic data-cleaning workflow environment with deterministic graders.""" SUPPORTS_CONCURRENT_SESSIONS = True def __init__(self) -> None: super().__init__() self._task_order = list_task_ids() self._task_spec = get_task_spec(self._task_order[0]) self._grade = grade_tables(self._task_spec, self._task_spec.dirty_tables) self._focus_table_name = first_table_name(self._task_spec) self._focus_operation_detail: OperationDetail | None = None self._done = False self._initial_issue_count = max(1, len(self._grade.validation_issues)) initial_tables = clone_tables(self._task_spec.dirty_tables) initial_downstream_health = self._compute_downstream_health(self._task_spec, initial_tables, self._grade.validation_issues) self._state = DataCleaningState( episode_id=str(uuid4()), step_count=0, task_id=self._task_spec.task_id, task_title=self._task_spec.title, difficulty=self._task_spec.difficulty, requested_seed=None, max_steps=self._task_spec.max_steps, review_budget_total=self._task_spec.review_budget, review_budget_remaining=self._task_spec.review_budget, submitted=False, current_score=self._grade.score, best_score=self._grade.score, outstanding_issue_count=len(self._grade.validation_issues), downstream_health=initial_downstream_health, last_dry_run=None, tables=initial_tables, applied_operation_ids=[], inspected_tables=[self._focus_table_name], inspected_operations=[], requested_review_ids=[], pending_reviews=[], resolved_reviews=[], dry_run_targets=[], recent_history=[], ) def reset( self, seed: int | None = None, episode_id: str | None = None, task_id: str | None = None, **kwargs: object, ) -> DataCleaningObservation: del kwargs selected_task_id = task_id or self._task_order[0] self._task_spec = get_task_spec(selected_task_id) normalized_seed = seed if seed is None else max(0, int(seed)) self._focus_table_name = self._choose_initial_focus_table(self._task_spec, normalized_seed) self._focus_operation_detail = None self._done = False self._grade = grade_tables(self._task_spec, self._task_spec.dirty_tables) self._initial_issue_count = max(1, len(self._grade.validation_issues)) initial_tables = clone_tables(self._task_spec.dirty_tables) initial_downstream_health = self._compute_downstream_health(self._task_spec, initial_tables, self._grade.validation_issues) self._state = DataCleaningState( episode_id=episode_id or str(uuid4()), step_count=0, task_id=self._task_spec.task_id, task_title=self._task_spec.title, difficulty=self._task_spec.difficulty, requested_seed=normalized_seed, max_steps=self._task_spec.max_steps, review_budget_total=self._task_spec.review_budget, review_budget_remaining=self._task_spec.review_budget, submitted=False, current_score=self._grade.score, best_score=self._grade.score, outstanding_issue_count=len(self._grade.validation_issues), downstream_health=initial_downstream_health, last_dry_run=None, tables=initial_tables, applied_operation_ids=[], inspected_tables=[self._focus_table_name], inspected_operations=[], requested_review_ids=[], pending_reviews=[], resolved_reviews=[], dry_run_targets=[], recent_history=[f"reset -> loaded task {self._task_spec.task_id} ({self._task_spec.difficulty}) seed={normalized_seed}"], ) return self._build_observation( reward_breakdown=RewardBreakdown(total=0.0), reward=0.0, done=False, last_action_status=f"Environment reset to task {self._task_spec.task_id}.", last_action_error=None, ) def step( self, action: DataCleaningAction, timeout_s: float | None = None, **kwargs: object, ) -> DataCleaningObservation: del timeout_s, kwargs if self._done: penalty = RewardBreakdown(invalid_action_penalty=-0.25, total=-0.25) return self._build_observation( reward_breakdown=penalty, reward=penalty.total, done=True, last_action_status="Episode already finished. Call reset() to start a new task.", last_action_error="Episode already finished. Call reset() to start a new task.", ) self._state.step_count += 1 previous_score = self._state.current_score previous_issue_count = self._state.outstanding_issue_count previous_downstream_score = self._state.downstream_health.overall_health_score invalid_action_penalty = 0.0 noop_penalty = 0.0 insight_bonus = 0.0 review_bonus = 0.0 review_cost_penalty = 0.0 action_cost_penalty = 0.0 submit_bonus = 0.0 status_message = "" action_error: str | None = None released_reviews = self._release_ready_reviews() if released_reviews: review_bonus = round(0.04 * len(released_reviews), 4) if action.action_type == "inspect_table": table_name = normalize_whitespace(action.table_name or "") if table_name not in self._state.tables: invalid_action_penalty = -0.25 status_message = f"Unknown table '{table_name}'." action_error = status_message else: self._focus_table_name = table_name if table_name not in self._state.inspected_tables: self._state.inspected_tables.append(table_name) insight_bonus = 0.01 status_message = f"Inspected table '{table_name}'." else: noop_penalty = -0.02 status_message = f"Table '{table_name}' was already inspected." elif action.action_type == "inspect_operation": operation_id = normalize_whitespace(action.operation_id or "") if operation_id not in self._task_spec.operations: invalid_action_penalty = -0.25 status_message = f"Unknown operation '{operation_id}'." action_error = status_message else: self._focus_operation_detail = self._build_operation_detail(self._task_spec, operation_id, self._state.tables, None) if operation_id not in self._state.inspected_operations: self._state.inspected_operations.append(operation_id) insight_bonus = 0.01 status_message = f"Inspected operation '{operation_id}'." else: noop_penalty = -0.02 status_message = f"Operation '{operation_id}' was already inspected." elif action.action_type == "apply_operation": operation_id = normalize_whitespace(action.operation_id or "") if operation_id not in self._task_spec.operations: invalid_action_penalty = -0.25 status_message = f"Unknown operation '{operation_id}'." action_error = status_message elif operation_id in self._state.applied_operation_ids: noop_penalty = -0.12 self._focus_operation_detail = self._build_operation_detail(self._task_spec, operation_id, self._state.tables, self._state.tables) status_message = f"Operation '{operation_id}' was already applied." else: before_tables = clone_tables(self._state.tables) after_tables = apply_operation_to_tables(self._task_spec, before_tables, operation_id) self._focus_operation_detail = self._build_operation_detail(self._task_spec, operation_id, before_tables, after_tables) if after_tables == before_tables: noop_penalty = -0.08 status_message = f"Operation '{operation_id}' produced no table changes." else: self._state.tables = clone_tables(after_tables) self._state.applied_operation_ids.append(operation_id) affected_tables = ", ".join(self._task_spec.operations[operation_id].tables_affected) if self._task_spec.operations[operation_id].tables_affected: self._focus_table_name = self._task_spec.operations[operation_id].tables_affected[0] status_message = f"Applied '{operation_id}' to {affected_tables or 'current tables'}." elif action.action_type == "request_review": entity_type = normalize_whitespace(action.entity_type or "").lower() entity_id = normalize_whitespace(action.entity_id or "") reason_code = normalize_whitespace(action.reason_code or "") review_case = self._find_review_case(entity_type, entity_id, reason_code) if not entity_type or not entity_id or not reason_code: invalid_action_penalty = -0.25 status_message = "request_review requires entity_type, entity_id, and reason_code." action_error = status_message elif review_case is None: invalid_action_penalty = -0.2 status_message = f"No deterministic review case exists for {entity_type}:{entity_id} ({reason_code})." action_error = status_message elif review_case.review_id in self._state.requested_review_ids: noop_penalty = -0.05 status_message = f"Review '{review_case.review_id}' was already requested." elif self._state.review_budget_remaining <= 0: invalid_action_penalty = -0.18 status_message = "No review budget remaining for this episode." action_error = status_message else: self._state.review_budget_remaining -= 1 self._state.requested_review_ids.append(review_case.review_id) self._state.pending_reviews.append( PendingReview( review_id=review_case.review_id, entity_type=review_case.entity_type, entity_id=review_case.entity_id, reason_code=review_case.reason_code, title=review_case.title, requested_at_step=self._state.step_count, ready_at_step=self._state.step_count + 1, ) ) review_cost_penalty = -0.02 status_message = ( f"Queued review '{review_case.review_id}' for {review_case.entity_type} {review_case.entity_id}; " "response will be available on the next step." ) elif action.action_type == "run_sync_dry_run": target_system = action.target_system if target_system is None: invalid_action_penalty = -0.2 status_message = "run_sync_dry_run requires target_system." action_error = status_message elif target_system not in self._task_spec.sync_targets: invalid_action_penalty = -0.2 status_message = f"Task '{self._task_spec.task_id}' does not support dry-run target '{target_system}'." action_error = status_message else: self._state.last_dry_run = self._build_dry_run_report(target_system) if target_system not in self._state.dry_run_targets: self._state.dry_run_targets.append(target_system) insight_bonus = max(insight_bonus, 0.01) else: noop_penalty = min(noop_penalty, -0.01) status_message = self._state.last_dry_run.summary elif action.action_type == "submit": self._state.submitted = True self._done = True status_message = "Submitted cleaned tables for grading." action_cost_penalty = -self._estimate_action_cost(action) self._grade = grade_tables(self._task_spec, self._state.tables) self._state.current_score = self._grade.score self._state.best_score = max(self._state.best_score, self._grade.score) self._state.outstanding_issue_count = len(self._grade.validation_issues) self._state.downstream_health = self._compute_downstream_health(self._task_spec, self._state.tables, self._grade.validation_issues) quality_delta = round(self._state.current_score - previous_score, 4) issue_delta = round((previous_issue_count - self._state.outstanding_issue_count) / self._initial_issue_count, 4) downstream_health_delta = round(self._state.downstream_health.overall_health_score - previous_downstream_score, 4) efficiency_penalty = -0.01 if action.action_type == "submit": submission_health = round(0.65 * self._state.current_score + 0.35 * self._state.downstream_health.overall_health_score, 4) submit_bonus = round(0.4 * submission_health, 4) if submission_health >= 0.82 else round(-0.2 * (1.0 - submission_health), 4) if self._state.step_count >= self._state.max_steps and not self._done: self._done = True self._state.submitted = False status_message = f"{status_message} Step budget exhausted; episode truncated.".strip() if released_reviews: release_note = ", ".join(review.review_id for review in released_reviews) status_message = f"{status_message} Review response available: {release_note}.".strip() reward_total = round( 1.0 * quality_delta + 0.35 * issue_delta + 0.55 * downstream_health_delta + insight_bonus + review_bonus + efficiency_penalty + invalid_action_penalty + noop_penalty + review_cost_penalty + action_cost_penalty + submit_bonus, 4, ) reward_breakdown = RewardBreakdown( quality_delta=quality_delta, issue_delta=issue_delta, downstream_health_delta=downstream_health_delta, insight_bonus=insight_bonus, review_bonus=review_bonus, efficiency_penalty=efficiency_penalty, invalid_action_penalty=invalid_action_penalty, noop_penalty=noop_penalty, review_cost_penalty=review_cost_penalty, action_cost_penalty=action_cost_penalty, submit_bonus=submit_bonus, total=reward_total, ) action_descriptor = action.action_type if action.operation_id: action_descriptor += f"[{action.operation_id}]" if action.table_name: action_descriptor += f"[{action.table_name}]" if action.entity_id: action_descriptor += f"[{action.entity_id}]" if action.target_system: action_descriptor += f"[{action.target_system}]" self._state.recent_history.append(f"step {self._state.step_count}: {action_descriptor} -> score={self._state.current_score:.4f}") self._state.recent_history = self._state.recent_history[-10:] return self._build_observation( reward_breakdown=reward_breakdown, reward=reward_total, done=self._done, last_action_status=status_message or "Action processed.", last_action_error=action_error, ) @property def state(self) -> DataCleaningState: return self._state def get_metadata(self) -> EnvironmentMetadata: return EnvironmentMetadata( name="CleanOpsEnvironment", description="A realistic OpenEnv benchmark where an agent cleans operational customer, order, subscription, and payment tables using a curated data-cleaning toolkit.", version="0.1.0", author="OpenEnv CleanOps", ) def _build_observation( self, *, reward_breakdown: RewardBreakdown, reward: float, done: bool, last_action_status: str, last_action_error: str | None, ) -> DataCleaningObservation: summaries = [build_table_summary(self._task_spec, table_name, self._state.tables) for table_name in self._task_spec.dirty_tables] focus_table = self._build_table_view(self._task_spec, self._focus_table_name) available_operations = [ OperationSummary( operation_id=operation.operation_id, title=operation.title, category=operation.category, risk=operation.risk, tables_affected=list(operation.tables_affected), description=operation.description, already_applied=operation.operation_id in self._state.applied_operation_ids, ) for operation in sorted(self._task_spec.operations.values(), key=lambda op: op.operation_id) ] available_review_targets = [ ReviewTarget( review_id=review_case.review_id, entity_type=review_case.entity_type, entity_id=review_case.entity_id, reason_code=review_case.reason_code, title=review_case.title, detail=review_case.detail, recommended_operation_ids=list(review_case.recommended_operation_ids), ) for review_case in sorted(self._task_spec.review_cases.values(), key=lambda case: case.review_id) ] return DataCleaningObservation( task_id=self._task_spec.task_id, task_title=self._task_spec.title, difficulty=self._task_spec.difficulty, requested_seed=self._state.requested_seed, objective=self._task_spec.objective, dataset_context=self._task_spec.dataset_context, quality_score=self._state.current_score, best_score=self._state.best_score, remaining_steps=max(0, self._state.max_steps - self._state.step_count), review_budget_remaining=self._state.review_budget_remaining, supported_sync_targets=list(self._task_spec.sync_targets), downstream_health=self._state.downstream_health, risk_cards=self._build_risk_cards(), last_dry_run=self._state.last_dry_run, action_costs=self._build_action_cost_entries(), table_summaries=summaries, focus_table=focus_table, available_operations=available_operations, available_review_targets=available_review_targets, pending_reviews=list(self._state.pending_reviews), resolved_reviews=list(self._state.resolved_reviews), focus_operation=self._focus_operation_detail, validation_issues=self._grade.validation_issues, issue_cards=list(self._task_spec.issue_cards), recent_history=list(self._state.recent_history), grader=self._grade.breakdown, reward_breakdown=reward_breakdown, last_action_status=last_action_status, last_action_error=last_action_error, reward=reward, done=done, metadata={ "episode_id": self._state.episode_id, "requested_seed": self._state.requested_seed, "applied_operation_ids": list(self._state.applied_operation_ids), "review_budget_remaining": self._state.review_budget_remaining, "requested_review_ids": list(self._state.requested_review_ids), "dry_run_targets": list(self._state.dry_run_targets), "submitted": self._state.submitted, }, ) def _build_table_view(self, task_spec: TaskSpec, table_name: str) -> TableView: primary_key = task_spec.primary_keys[table_name] rows = self._preview_rows(task_spec, table_name, self._state.tables.get(table_name, [])) columns = sorted({column_name for row in rows for column_name in row}) return TableView(name=table_name, primary_key=primary_key, columns=columns, rows=rows) def _choose_initial_focus_table(self, task_spec: TaskSpec, seed: int | None) -> str: table_names = sorted(task_spec.dirty_tables) if not table_names: return first_table_name(task_spec) if seed is None: return table_names[0] return table_names[seed % len(table_names)] def _preview_rows( self, task_spec: TaskSpec, table_name: str, rows: list[dict[str, str]], ) -> list[dict[str, str]]: primary_key = task_spec.primary_keys[table_name] ordered_rows = sorted_rows(rows, primary_key) seed = self._state.requested_seed if seed is None or len(ordered_rows) <= 1: return ordered_rows shuffled_rows = copy.deepcopy(ordered_rows) random.Random(seed + sum(ord(char) for char in table_name)).shuffle(shuffled_rows) return shuffled_rows def _find_review_case(self, entity_type: str, entity_id: str, reason_code: str) -> ReviewCaseSpec | None: for review_case in self._task_spec.review_cases.values(): if ( review_case.entity_type == entity_type and review_case.entity_id == entity_id and review_case.reason_code == reason_code ): return review_case return None def _release_ready_reviews(self) -> list[ReviewResolution]: if not self._state.pending_reviews: return [] still_pending: list[PendingReview] = [] released: list[ReviewResolution] = [] for pending_review in self._state.pending_reviews: if pending_review.ready_at_step > self._state.step_count: still_pending.append(pending_review) continue review_case = self._task_spec.review_cases[pending_review.review_id] released_review = ReviewResolution( review_id=review_case.review_id, entity_type=review_case.entity_type, entity_id=review_case.entity_id, reason_code=review_case.reason_code, title=review_case.title, resolution=review_case.resolution, response_summary=review_case.response_summary, evidence_summary=review_case.evidence_summary, recommended_operation_ids=list(review_case.recommended_operation_ids), ) self._state.resolved_reviews.append(released_review) released.append(released_review) self._state.pending_reviews = still_pending return released def _estimate_action_cost(self, action: DataCleaningAction) -> float: if action.action_type == "apply_operation": operation = self._task_spec.operations.get(normalize_whitespace(action.operation_id or "")) if operation is None: return ACTION_COSTS["apply_operation:safe"] if operation.risk == "review": return ACTION_COSTS["apply_operation:review"] if operation.risk == "destructive": return ACTION_COSTS["apply_operation:destructive"] return ACTION_COSTS["apply_operation:safe"] return ACTION_COSTS.get(action.action_type, 0.01) def _build_action_cost_entries(self) -> list[ActionCostEntry]: return [ ActionCostEntry(action_key=action_key, estimated_cost=estimated_cost, description=ACTION_COST_DESCRIPTIONS[action_key]) for action_key, estimated_cost in ACTION_COSTS.items() ] @staticmethod def _open_metric(value: float) -> float: return round(min(0.99, max(0.01, value)), 4) def _compute_downstream_health( self, task_spec: TaskSpec, tables: dict[str, list[dict[str, str]]], validation_issues: list, ) -> DownstreamHealth: customers = tables.get("customers", []) orders = tables.get("orders", []) subscriptions = tables.get("subscriptions", []) payments = tables.get("payments", []) crm_rows = max(1, len(customers) + len(subscriptions)) billing_rows = max(1, len(orders) + len(subscriptions) + len(payments)) payment_rows = max(1, len(orders) + len(payments)) crm_issue_weight = sum(max(1, len(issue.row_ids)) for issue in validation_issues if issue.table_name in {"customers", "subscriptions"}) billing_issue_weight = sum( max(1, len(issue.row_ids)) for issue in validation_issues if issue.table_name in {"orders", "payments", "subscriptions"} and (issue.code.startswith("foreign_key:") or issue.code.startswith("required:") or issue.code.startswith("unique:")) ) payment_issue_weight = sum( max(1, len(issue.row_ids)) for issue in validation_issues if issue.table_name in {"orders", "payments"} ) customer_duplicate_groups = count_duplicate_groups(task_spec, "customers", customers) if "customers" in task_spec.duplicate_identity_columns else 0 customer_rows = max(1, len(customers)) payment_duplicate_groups = count_duplicate_groups(task_spec, "payments", payments) if "payments" in task_spec.duplicate_identity_columns else 0 crm_sync_success_rate = self._open_metric(1.0 - (crm_issue_weight / max(2, crm_rows * 2))) if not orders and not payments: billing_link_integrity = 0.99 revenue_reporting_risk = 0.01 else: billing_link_integrity = self._open_metric(1.0 - (billing_issue_weight / max(2, billing_rows * 2))) revenue_reporting_risk = self._open_metric(min(0.99, (payment_issue_weight / max(2, payment_rows * 2)) + (payment_duplicate_groups / max(1, payment_rows)))) duplicate_contact_risk = self._open_metric(min(0.99, (customer_duplicate_groups / customer_rows) + 0.06 * sum(1 for issue in validation_issues if issue.code.startswith("unique:customers")))) overall_health_score = self._open_metric( ( crm_sync_success_rate + billing_link_integrity + (1.0 - duplicate_contact_risk) + (1.0 - revenue_reporting_risk) ) / 4.0 ) return DownstreamHealth( crm_sync_success_rate=crm_sync_success_rate, billing_link_integrity=billing_link_integrity, duplicate_contact_risk=duplicate_contact_risk, revenue_reporting_risk=revenue_reporting_risk, overall_health_score=overall_health_score, ) def _build_risk_cards(self) -> list[RiskCard]: health = self._state.downstream_health cards = [ RiskCard( title="CRM import risk", detail="Customer and subscription issues can block CRM migration syncs.", severity="high" if health.crm_sync_success_rate < 0.8 else "medium" if health.crm_sync_success_rate < 0.92 else "low", metric_name="crm_sync_success_rate", current_value=health.crm_sync_success_rate, recommended_action_ids=[op_id for op_id in self._recommended_operation_ids_for_tables({"customers", "subscriptions"})], ), RiskCard( title="Billing linkage risk", detail="Broken foreign keys or missing IDs can mislink orders, subscriptions, and payments.", severity="high" if health.billing_link_integrity < 0.8 else "medium" if health.billing_link_integrity < 0.92 else "low", metric_name="billing_link_integrity", current_value=health.billing_link_integrity, recommended_action_ids=[op_id for op_id in self._recommended_operation_ids_for_tables({"orders", "subscriptions", "payments"})], ), RiskCard( title="Duplicate contact risk", detail="Remaining duplicate customer identities can create bad merges downstream.", severity="high" if health.duplicate_contact_risk > 0.3 else "medium" if health.duplicate_contact_risk > 0.12 else "low", metric_name="duplicate_contact_risk", current_value=health.duplicate_contact_risk, recommended_action_ids=[op_id for op_id in self._recommended_operation_ids_for_keyword("merge")], ), RiskCard( title="Revenue reporting risk", detail="Duplicate or mislinked payment and order facts can distort downstream reporting.", severity="high" if health.revenue_reporting_risk > 0.3 else "medium" if health.revenue_reporting_risk > 0.12 else "low", metric_name="revenue_reporting_risk", current_value=health.revenue_reporting_risk, recommended_action_ids=[op_id for op_id in self._recommended_operation_ids_for_tables({"orders", "payments"})], ), ] return cards def _recommended_operation_ids_for_tables(self, table_names: set[str]) -> list[str]: return [ operation.operation_id for operation in sorted(self._task_spec.operations.values(), key=lambda op: op.operation_id) if set(operation.tables_affected) & table_names ][:4] def _recommended_operation_ids_for_keyword(self, keyword: str) -> list[str]: lowered = keyword.lower() return [ operation.operation_id for operation in sorted(self._task_spec.operations.values(), key=lambda op: op.operation_id) if lowered in operation.operation_id.lower() or lowered in operation.title.lower() ][:4] def _build_dry_run_report(self, target_system: str) -> DryRunReport: findings: list[DryRunFinding] = [] for issue in self._grade.validation_issues: if target_system == "crm" and issue.table_name not in {"customers", "subscriptions"}: continue if target_system == "billing" and issue.table_name not in {"orders", "subscriptions", "payments"}: continue findings.append( DryRunFinding( code=issue.code, severity=issue.severity, table_name=issue.table_name, row_ids=list(issue.row_ids), message=issue.message, ) ) health = self._state.downstream_health success_rate = health.crm_sync_success_rate if target_system == "crm" else health.billing_link_integrity if target_system == "crm" and health.duplicate_contact_risk > 0.12: findings.append( DryRunFinding( code="risk:duplicate_contacts", severity="medium" if health.duplicate_contact_risk <= 0.3 else "high", table_name="customers", message="CRM dry run predicts duplicate-contact collisions after import.", ) ) if target_system == "billing" and health.revenue_reporting_risk > 0.12: findings.append( DryRunFinding( code="risk:revenue_reporting", severity="medium" if health.revenue_reporting_risk <= 0.3 else "high", table_name="payments" if "payments" in self._state.tables else "orders", message="Billing dry run predicts mislinked or duplicated revenue facts.", ) ) summary = ( f"Dry run for {target_system.upper()} found {len(findings)} blocking or risky findings; " f"estimated success rate is {success_rate:.2f}." ) return DryRunReport( target_system=target_system, success_rate=success_rate, finding_count=len(findings), findings=findings, summary=summary, generated_at_step=self._state.step_count, ) def _build_operation_detail( self, task_spec: TaskSpec, operation_id: str, before_tables: dict[str, list[dict[str, str]]], after_tables: dict[str, list[dict[str, str]]] | None, ) -> OperationDetail: operation = task_spec.operations[operation_id] simulated_after = after_tables if simulated_after is None: simulated_after = apply_operation_to_tables(task_spec, before_tables, operation_id) preview: list[RowChange] = [] for table_name in operation.tables_affected: primary_key = task_spec.primary_keys[table_name] before_rows = {normalize_whitespace(row.get(primary_key, "")): dict(row) for row in before_tables.get(table_name, [])} after_rows = {normalize_whitespace(row.get(primary_key, "")): dict(row) for row in simulated_after.get(table_name, [])} changed_keys = sorted(set(before_rows) | set(after_rows)) for row_key in changed_keys: if before_rows.get(row_key) == after_rows.get(row_key): continue preview.append(RowChange(primary_key_value=row_key, before=before_rows.get(row_key), after=after_rows.get(row_key))) if len(preview) >= 12: break if len(preview) >= 12: break return OperationDetail( operation_id=operation.operation_id, title=operation.title, category=operation.category, risk=operation.risk, tables_affected=list(operation.tables_affected), description=operation.description, already_applied=operation.operation_id in self._state.applied_operation_ids, why_it_matters=operation.why_it_matters, change_preview=preview, )