| """OpenEnv server-side environment for operational data cleaning tasks.""" |
|
|
| from __future__ import annotations |
|
|
| import copy |
| import random |
| from uuid import uuid4 |
|
|
| from openenv.core.env_server.interfaces import Environment |
| from openenv.core.env_server.types import EnvironmentMetadata |
|
|
| from cleanops_env.graders import build_table_summary, count_duplicate_groups, grade_tables |
| from cleanops_env.models import ( |
| ActionCostEntry, |
| DataCleaningAction, |
| DataCleaningObservation, |
| DataCleaningState, |
| DownstreamHealth, |
| DryRunFinding, |
| DryRunReport, |
| OperationDetail, |
| OperationSummary, |
| PendingReview, |
| ReviewResolution, |
| ReviewTarget, |
| RiskCard, |
| RewardBreakdown, |
| RowChange, |
| TableView, |
| ) |
| from cleanops_env.tasks import ( |
| ReviewCaseSpec, |
| TaskSpec, |
| apply_operation_to_tables, |
| clone_tables, |
| first_table_name, |
| get_task_spec, |
| list_task_ids, |
| normalize_whitespace, |
| sorted_rows, |
| ) |
|
|
# One row per billable action key: (key, estimated cost, human-readable description).
# Both public dictionaries below are derived from this table, in row order.
_ACTION_COST_TABLE: tuple[tuple[str, float, str], ...] = (
    ("inspect_table", 0.005, "Low-cost inspection to understand current records."),
    ("inspect_operation", 0.005, "Low-cost preview to inspect an operation before applying it."),
    ("apply_operation:safe", 0.01, "Safe automated cleanup with low operational risk."),
    ("apply_operation:review", 0.015, "Review-sensitive cleanup that should be used more deliberately."),
    ("apply_operation:destructive", 0.03, "Destructive cleanup with higher business risk if applied incorrectly."),
    ("request_review", 0.025, "Consumes limited human-review budget to resolve ambiguity safely."),
    ("run_sync_dry_run", 0.02, "Runs a deterministic downstream system simulation before submit."),
    ("submit", 0.005, "Low-cost finalization step after cleanup is complete."),
)

# Estimated cost per action key; apply_operation is split by the operation's risk level.
ACTION_COSTS: dict[str, float] = {key: cost for key, cost, _ in _ACTION_COST_TABLE}

# Description shown alongside each entry of ACTION_COSTS (same keys, same order).
ACTION_COST_DESCRIPTIONS: dict[str, str] = {key: text for key, _, text in _ACTION_COST_TABLE}
|
|
|
|
class CleanOpsEnvironment(Environment[DataCleaningAction, DataCleaningObservation, DataCleaningState]):
    """A realistic data-cleaning workflow environment with deterministic graders.

    The environment holds one task at a time. ``reset`` loads a task and builds a
    fresh ``DataCleaningState``; ``step`` applies one agent action, re-grades the
    tables, and returns an observation together with a per-component reward
    breakdown.
    """

    SUPPORTS_CONCURRENT_SESSIONS = True

    def __init__(self) -> None:
        """Load the first registered task so the environment is usable before ``reset``."""
        super().__init__()
        self._task_order = list_task_ids()
        self._task_spec = get_task_spec(self._task_order[0])
        self._focus_table_name = first_table_name(self._task_spec)
        self._state = self._start_episode(
            episode_id=str(uuid4()),
            requested_seed=None,
            recent_history=[],
        )

    def reset(
        self,
        seed: int | None = None,
        episode_id: str | None = None,
        task_id: str | None = None,
        **kwargs: object,
    ) -> DataCleaningObservation:
        """Start a new episode and return its first observation.

        Args:
            seed: Optional seed. Negative values are clamped to 0. The seed picks
                the initially focused table and drives the row-preview shuffle.
            episode_id: Optional identifier; a random UUID is generated when falsy.
            task_id: Task to load; the first registered task is used when falsy.
            **kwargs: Ignored.

        Returns:
            The initial observation with zero reward and ``done=False``.
        """
        del kwargs
        self._task_spec = get_task_spec(task_id or self._task_order[0])
        normalized_seed = seed if seed is None else max(0, int(seed))
        self._focus_table_name = self._choose_initial_focus_table(self._task_spec, normalized_seed)
        self._state = self._start_episode(
            episode_id=episode_id or str(uuid4()),
            requested_seed=normalized_seed,
            recent_history=[f"reset -> loaded task {self._task_spec.task_id} ({self._task_spec.difficulty}) seed={normalized_seed}"],
        )
        return self._build_observation(
            reward_breakdown=RewardBreakdown(total=0.0),
            reward=0.0,
            done=False,
            last_action_status=f"Environment reset to task {self._task_spec.task_id}.",
            last_action_error=None,
        )

    def _start_episode(
        self,
        *,
        episode_id: str,
        requested_seed: int | None,
        recent_history: list[str],
    ) -> DataCleaningState:
        """Reset per-episode bookkeeping and build the initial state.

        Shared by ``__init__`` and ``reset`` so both paths construct the state the
        same way. Requires ``self._task_spec`` and ``self._focus_table_name`` to be
        set already; it does not read ``self._state``.
        """
        self._focus_operation_detail: OperationDetail | None = None
        self._done = False
        self._grade = grade_tables(self._task_spec, self._task_spec.dirty_tables)
        issues = self._grade.validation_issues
        # Floor of 1 keeps the issue-delta division in step() well-defined.
        self._initial_issue_count = max(1, len(issues))
        initial_tables = clone_tables(self._task_spec.dirty_tables)
        return DataCleaningState(
            episode_id=episode_id,
            step_count=0,
            task_id=self._task_spec.task_id,
            task_title=self._task_spec.title,
            difficulty=self._task_spec.difficulty,
            requested_seed=requested_seed,
            max_steps=self._task_spec.max_steps,
            review_budget_total=self._task_spec.review_budget,
            review_budget_remaining=self._task_spec.review_budget,
            submitted=False,
            current_score=self._grade.score,
            best_score=self._grade.score,
            outstanding_issue_count=len(issues),
            downstream_health=self._compute_downstream_health(self._task_spec, initial_tables, issues),
            last_dry_run=None,
            tables=initial_tables,
            applied_operation_ids=[],
            inspected_tables=[self._focus_table_name],
            inspected_operations=[],
            requested_review_ids=[],
            pending_reviews=[],
            resolved_reviews=[],
            dry_run_targets=[],
            recent_history=recent_history,
        )

    def step(
        self,
        action: DataCleaningAction,
        timeout_s: float | None = None,
        **kwargs: object,
    ) -> DataCleaningObservation:
        """Apply one action, re-grade the tables, and return the observation.

        Args:
            action: The agent action; ``action.action_type`` selects the branch.
            timeout_s: Ignored.
            **kwargs: Ignored.

        Returns:
            An observation whose ``reward`` is the rounded sum of the components
            reported in its reward breakdown. Stepping a finished episode returns
            a -0.25 penalty without changing state.
        """
        del timeout_s, kwargs
        if self._done:
            penalty = RewardBreakdown(invalid_action_penalty=-0.25, total=-0.25)
            return self._build_observation(
                reward_breakdown=penalty,
                reward=penalty.total,
                done=True,
                last_action_status="Episode already finished. Call reset() to start a new task.",
                last_action_error="Episode already finished. Call reset() to start a new task.",
            )

        self._state.step_count += 1
        # Snapshot the metrics so deltas can be computed after the action runs.
        previous_score = self._state.current_score
        previous_issue_count = self._state.outstanding_issue_count
        previous_downstream_score = self._state.downstream_health.overall_health_score

        invalid_action_penalty = 0.0
        noop_penalty = 0.0
        insight_bonus = 0.0
        review_bonus = 0.0
        review_cost_penalty = 0.0
        action_cost_penalty = 0.0
        submit_bonus = 0.0
        status_message = ""
        action_error: str | None = None

        # Reviews queued on an earlier step become available before the action runs.
        released_reviews = self._release_ready_reviews()
        if released_reviews:
            review_bonus = round(0.04 * len(released_reviews), 4)

        if action.action_type == "inspect_table":
            table_name = normalize_whitespace(action.table_name or "")
            if table_name not in self._state.tables:
                invalid_action_penalty = -0.25
                status_message = f"Unknown table '{table_name}'."
                action_error = status_message
            else:
                self._focus_table_name = table_name
                if table_name not in self._state.inspected_tables:
                    self._state.inspected_tables.append(table_name)
                    insight_bonus = 0.01
                    status_message = f"Inspected table '{table_name}'."
                else:
                    noop_penalty = -0.02
                    status_message = f"Table '{table_name}' was already inspected."
        elif action.action_type == "inspect_operation":
            operation_id = normalize_whitespace(action.operation_id or "")
            if operation_id not in self._task_spec.operations:
                invalid_action_penalty = -0.25
                status_message = f"Unknown operation '{operation_id}'."
                action_error = status_message
            else:
                self._focus_operation_detail = self._build_operation_detail(self._task_spec, operation_id, self._state.tables, None)
                if operation_id not in self._state.inspected_operations:
                    self._state.inspected_operations.append(operation_id)
                    insight_bonus = 0.01
                    status_message = f"Inspected operation '{operation_id}'."
                else:
                    noop_penalty = -0.02
                    status_message = f"Operation '{operation_id}' was already inspected."
        elif action.action_type == "apply_operation":
            operation_id = normalize_whitespace(action.operation_id or "")
            if operation_id not in self._task_spec.operations:
                invalid_action_penalty = -0.25
                status_message = f"Unknown operation '{operation_id}'."
                action_error = status_message
            elif operation_id in self._state.applied_operation_ids:
                noop_penalty = -0.12
                # Same tables on both sides: the detail is shown with an empty change preview.
                self._focus_operation_detail = self._build_operation_detail(self._task_spec, operation_id, self._state.tables, self._state.tables)
                status_message = f"Operation '{operation_id}' was already applied."
            else:
                before_tables = clone_tables(self._state.tables)
                after_tables = apply_operation_to_tables(self._task_spec, before_tables, operation_id)
                self._focus_operation_detail = self._build_operation_detail(self._task_spec, operation_id, before_tables, after_tables)
                if after_tables == before_tables:
                    # Not recorded as applied, so the operation can be retried later.
                    noop_penalty = -0.08
                    status_message = f"Operation '{operation_id}' produced no table changes."
                else:
                    operation = self._task_spec.operations[operation_id]
                    self._state.tables = clone_tables(after_tables)
                    self._state.applied_operation_ids.append(operation_id)
                    affected_tables = ", ".join(operation.tables_affected)
                    if operation.tables_affected:
                        self._focus_table_name = operation.tables_affected[0]
                    status_message = f"Applied '{operation_id}' to {affected_tables or 'current tables'}."
        elif action.action_type == "request_review":
            entity_type = normalize_whitespace(action.entity_type or "").lower()
            entity_id = normalize_whitespace(action.entity_id or "")
            reason_code = normalize_whitespace(action.reason_code or "")
            review_case = self._find_review_case(entity_type, entity_id, reason_code)
            if not entity_type or not entity_id or not reason_code:
                invalid_action_penalty = -0.25
                status_message = "request_review requires entity_type, entity_id, and reason_code."
                action_error = status_message
            elif review_case is None:
                invalid_action_penalty = -0.2
                status_message = f"No deterministic review case exists for {entity_type}:{entity_id} ({reason_code})."
                action_error = status_message
            elif review_case.review_id in self._state.requested_review_ids:
                noop_penalty = -0.05
                status_message = f"Review '{review_case.review_id}' was already requested."
            elif self._state.review_budget_remaining <= 0:
                invalid_action_penalty = -0.18
                status_message = "No review budget remaining for this episode."
                action_error = status_message
            else:
                self._state.review_budget_remaining -= 1
                self._state.requested_review_ids.append(review_case.review_id)
                self._state.pending_reviews.append(
                    PendingReview(
                        review_id=review_case.review_id,
                        entity_type=review_case.entity_type,
                        entity_id=review_case.entity_id,
                        reason_code=review_case.reason_code,
                        title=review_case.title,
                        requested_at_step=self._state.step_count,
                        ready_at_step=self._state.step_count + 1,
                    )
                )
                review_cost_penalty = -0.02
                status_message = (
                    f"Queued review '{review_case.review_id}' for {review_case.entity_type} {review_case.entity_id}; "
                    "response will be available on the next step."
                )
        elif action.action_type == "run_sync_dry_run":
            target_system = action.target_system
            if target_system is None:
                invalid_action_penalty = -0.2
                status_message = "run_sync_dry_run requires target_system."
                action_error = status_message
            elif target_system not in self._task_spec.sync_targets:
                invalid_action_penalty = -0.2
                status_message = f"Task '{self._task_spec.task_id}' does not support dry-run target '{target_system}'."
                action_error = status_message
            else:
                self._state.last_dry_run = self._build_dry_run_report(target_system)
                if target_system not in self._state.dry_run_targets:
                    self._state.dry_run_targets.append(target_system)
                    insight_bonus = max(insight_bonus, 0.01)
                else:
                    noop_penalty = min(noop_penalty, -0.01)
                status_message = self._state.last_dry_run.summary
        elif action.action_type == "submit":
            self._state.submitted = True
            self._done = True
            status_message = "Submitted cleaned tables for grading."
        else:
            # Previously an unrecognised action type fell through every branch and
            # was reported as "Action processed." with no error; treat it like the
            # other invalid inputs instead.
            invalid_action_penalty = -0.25
            status_message = f"Unsupported action type '{action.action_type}'."
            action_error = status_message

        action_cost_penalty = -self._estimate_action_cost(action)

        # Re-grade after every action so score, issue count, and health stay in sync.
        self._grade = grade_tables(self._task_spec, self._state.tables)
        self._state.current_score = self._grade.score
        self._state.best_score = max(self._state.best_score, self._grade.score)
        self._state.outstanding_issue_count = len(self._grade.validation_issues)
        self._state.downstream_health = self._compute_downstream_health(self._task_spec, self._state.tables, self._grade.validation_issues)

        quality_delta = round(self._state.current_score - previous_score, 4)
        issue_delta = round((previous_issue_count - self._state.outstanding_issue_count) / self._initial_issue_count, 4)
        downstream_health_delta = round(self._state.downstream_health.overall_health_score - previous_downstream_score, 4)
        efficiency_penalty = -0.01  # flat per-step charge

        if action.action_type == "submit":
            submission_health = round(0.65 * self._state.current_score + 0.35 * self._state.downstream_health.overall_health_score, 4)
            # Blended health of at least 0.82 earns a bonus; anything lower is penalised.
            submit_bonus = round(0.4 * submission_health, 4) if submission_health >= 0.82 else round(-0.2 * (1.0 - submission_health), 4)

        if self._state.step_count >= self._state.max_steps and not self._done:
            self._done = True
            self._state.submitted = False
            status_message = f"{status_message} Step budget exhausted; episode truncated.".strip()

        if released_reviews:
            release_note = ", ".join(review.review_id for review in released_reviews)
            status_message = f"{status_message} Review response available: {release_note}.".strip()

        reward_total = round(
            1.0 * quality_delta
            + 0.35 * issue_delta
            + 0.55 * downstream_health_delta
            + insight_bonus
            + review_bonus
            + efficiency_penalty
            + invalid_action_penalty
            + noop_penalty
            + review_cost_penalty
            + action_cost_penalty
            + submit_bonus,
            4,
        )
        reward_breakdown = RewardBreakdown(
            quality_delta=quality_delta,
            issue_delta=issue_delta,
            downstream_health_delta=downstream_health_delta,
            insight_bonus=insight_bonus,
            review_bonus=review_bonus,
            efficiency_penalty=efficiency_penalty,
            invalid_action_penalty=invalid_action_penalty,
            noop_penalty=noop_penalty,
            review_cost_penalty=review_cost_penalty,
            action_cost_penalty=action_cost_penalty,
            submit_bonus=submit_bonus,
            total=reward_total,
        )

        action_descriptor = action.action_type
        if action.operation_id:
            action_descriptor += f"[{action.operation_id}]"
        if action.table_name:
            action_descriptor += f"[{action.table_name}]"
        if action.entity_id:
            action_descriptor += f"[{action.entity_id}]"
        if action.target_system:
            action_descriptor += f"[{action.target_system}]"
        self._state.recent_history.append(f"step {self._state.step_count}: {action_descriptor} -> score={self._state.current_score:.4f}")
        # Only the ten most recent history entries are kept.
        self._state.recent_history = self._state.recent_history[-10:]

        return self._build_observation(
            reward_breakdown=reward_breakdown,
            reward=reward_total,
            done=self._done,
            last_action_status=status_message or "Action processed.",
            last_action_error=action_error,
        )

    @property
    def state(self) -> DataCleaningState:
        """Return the live episode state object (not a copy)."""
        return self._state

    def get_metadata(self) -> EnvironmentMetadata:
        """Return static descriptive metadata for this environment."""
        return EnvironmentMetadata(
            name="CleanOpsEnvironment",
            description="A realistic OpenEnv benchmark where an agent cleans operational customer, order, subscription, and payment tables using a curated data-cleaning toolkit.",
            version="0.1.0",
            author="OpenEnv CleanOps",
        )

    def _build_observation(
        self,
        *,
        reward_breakdown: RewardBreakdown,
        reward: float,
        done: bool,
        last_action_status: str,
        last_action_error: str | None,
    ) -> DataCleaningObservation:
        """Assemble the observation from the current task spec, state, and grade."""
        summaries = [build_table_summary(self._task_spec, table_name, self._state.tables) for table_name in self._task_spec.dirty_tables]
        focus_table = self._build_table_view(self._task_spec, self._focus_table_name)
        # Operations and review targets are listed in id order.
        available_operations = [
            OperationSummary(
                operation_id=operation.operation_id,
                title=operation.title,
                category=operation.category,
                risk=operation.risk,
                tables_affected=list(operation.tables_affected),
                description=operation.description,
                already_applied=operation.operation_id in self._state.applied_operation_ids,
            )
            for operation in sorted(self._task_spec.operations.values(), key=lambda op: op.operation_id)
        ]
        available_review_targets = [
            ReviewTarget(
                review_id=review_case.review_id,
                entity_type=review_case.entity_type,
                entity_id=review_case.entity_id,
                reason_code=review_case.reason_code,
                title=review_case.title,
                detail=review_case.detail,
                recommended_operation_ids=list(review_case.recommended_operation_ids),
            )
            for review_case in sorted(self._task_spec.review_cases.values(), key=lambda case: case.review_id)
        ]
        return DataCleaningObservation(
            task_id=self._task_spec.task_id,
            task_title=self._task_spec.title,
            difficulty=self._task_spec.difficulty,
            requested_seed=self._state.requested_seed,
            objective=self._task_spec.objective,
            dataset_context=self._task_spec.dataset_context,
            quality_score=self._state.current_score,
            best_score=self._state.best_score,
            remaining_steps=max(0, self._state.max_steps - self._state.step_count),
            review_budget_remaining=self._state.review_budget_remaining,
            supported_sync_targets=list(self._task_spec.sync_targets),
            downstream_health=self._state.downstream_health,
            risk_cards=self._build_risk_cards(),
            last_dry_run=self._state.last_dry_run,
            action_costs=self._build_action_cost_entries(),
            table_summaries=summaries,
            focus_table=focus_table,
            available_operations=available_operations,
            available_review_targets=available_review_targets,
            pending_reviews=list(self._state.pending_reviews),
            resolved_reviews=list(self._state.resolved_reviews),
            focus_operation=self._focus_operation_detail,
            validation_issues=self._grade.validation_issues,
            issue_cards=list(self._task_spec.issue_cards),
            recent_history=list(self._state.recent_history),
            grader=self._grade.breakdown,
            reward_breakdown=reward_breakdown,
            last_action_status=last_action_status,
            last_action_error=last_action_error,
            reward=reward,
            done=done,
            metadata={
                "episode_id": self._state.episode_id,
                "requested_seed": self._state.requested_seed,
                "applied_operation_ids": list(self._state.applied_operation_ids),
                "review_budget_remaining": self._state.review_budget_remaining,
                "requested_review_ids": list(self._state.requested_review_ids),
                "dry_run_targets": list(self._state.dry_run_targets),
                "submitted": self._state.submitted,
            },
        )

    def _build_table_view(self, task_spec: TaskSpec, table_name: str) -> TableView:
        """Build the focus-table view; columns are the sorted union of keys in its rows."""
        primary_key = task_spec.primary_keys[table_name]
        rows = self._preview_rows(task_spec, table_name, self._state.tables.get(table_name, []))
        columns = sorted({column_name for row in rows for column_name in row})
        return TableView(name=table_name, primary_key=primary_key, columns=columns, rows=rows)

    def _choose_initial_focus_table(self, task_spec: TaskSpec, seed: int | None) -> str:
        """Pick the starting focus table: first by name without a seed, else ``seed`` modulo the table count."""
        table_names = sorted(task_spec.dirty_tables)
        if not table_names:
            return first_table_name(task_spec)
        if seed is None:
            return table_names[0]
        return table_names[seed % len(table_names)]

    def _preview_rows(
        self,
        task_spec: TaskSpec,
        table_name: str,
        rows: list[dict[str, str]],
    ) -> list[dict[str, str]]:
        """Return rows ordered via ``sorted_rows``, deterministically shuffled when a seed is set.

        The shuffle seed combines the episode seed with the character codes of the
        table name, so each table gets its own repeatable order. The shuffled list
        is a deep copy of the ordered rows.
        """
        primary_key = task_spec.primary_keys[table_name]
        ordered_rows = sorted_rows(rows, primary_key)
        seed = self._state.requested_seed
        if seed is None or len(ordered_rows) <= 1:
            return ordered_rows
        shuffled_rows = copy.deepcopy(ordered_rows)
        random.Random(seed + sum(ord(char) for char in table_name)).shuffle(shuffled_rows)
        return shuffled_rows

    def _find_review_case(self, entity_type: str, entity_id: str, reason_code: str) -> ReviewCaseSpec | None:
        """Return the first review case matching all three fields exactly, or ``None``."""
        for review_case in self._task_spec.review_cases.values():
            if (
                review_case.entity_type == entity_type
                and review_case.entity_id == entity_id
                and review_case.reason_code == reason_code
            ):
                return review_case
        return None

    def _release_ready_reviews(self) -> list[ReviewResolution]:
        """Move pending reviews whose ``ready_at_step`` has been reached into the resolved list.

        Returns:
            The resolutions released by this call (empty when nothing was ready).
        """
        if not self._state.pending_reviews:
            return []

        still_pending: list[PendingReview] = []
        released: list[ReviewResolution] = []
        for pending_review in self._state.pending_reviews:
            if pending_review.ready_at_step > self._state.step_count:
                still_pending.append(pending_review)
                continue
            review_case = self._task_spec.review_cases[pending_review.review_id]
            released_review = ReviewResolution(
                review_id=review_case.review_id,
                entity_type=review_case.entity_type,
                entity_id=review_case.entity_id,
                reason_code=review_case.reason_code,
                title=review_case.title,
                resolution=review_case.resolution,
                response_summary=review_case.response_summary,
                evidence_summary=review_case.evidence_summary,
                recommended_operation_ids=list(review_case.recommended_operation_ids),
            )
            self._state.resolved_reviews.append(released_review)
            released.append(released_review)
        self._state.pending_reviews = still_pending
        return released

    def _estimate_action_cost(self, action: DataCleaningAction) -> float:
        """Return the positive cost of ``action``.

        ``apply_operation`` is priced by the operation's risk; an unknown operation
        or any risk other than "review"/"destructive" uses the safe price. Action
        types missing from ``ACTION_COSTS`` cost 0.01.
        """
        if action.action_type == "apply_operation":
            operation = self._task_spec.operations.get(normalize_whitespace(action.operation_id or ""))
            if operation is None:
                return ACTION_COSTS["apply_operation:safe"]
            if operation.risk == "review":
                return ACTION_COSTS["apply_operation:review"]
            if operation.risk == "destructive":
                return ACTION_COSTS["apply_operation:destructive"]
            return ACTION_COSTS["apply_operation:safe"]
        return ACTION_COSTS.get(action.action_type, 0.01)

    def _build_action_cost_entries(self) -> list[ActionCostEntry]:
        """Return one cost entry per key in ``ACTION_COSTS``, paired with its description."""
        return [
            ActionCostEntry(action_key=action_key, estimated_cost=estimated_cost, description=ACTION_COST_DESCRIPTIONS[action_key])
            for action_key, estimated_cost in ACTION_COSTS.items()
        ]

    @staticmethod
    def _open_metric(value: float) -> float:
        """Clamp ``value`` into [0.01, 0.99] and round to four decimals."""
        return round(min(0.99, max(0.01, value)), 4)

    def _compute_downstream_health(
        self,
        task_spec: TaskSpec,
        tables: dict[str, list[dict[str, str]]],
        validation_issues: list,
    ) -> DownstreamHealth:
        """Derive downstream health metrics from table sizes and validation issues.

        Each issue is weighted by the number of rows it names (at least 1). Every
        metric is passed through ``_open_metric``, except the fixed 0.99 / 0.01
        values used when there are no order or payment rows.
        """
        customers = tables.get("customers", [])
        orders = tables.get("orders", [])
        subscriptions = tables.get("subscriptions", [])
        payments = tables.get("payments", [])

        # Row counts are floored at 1 so the ratios below never divide by zero.
        crm_rows = max(1, len(customers) + len(subscriptions))
        billing_rows = max(1, len(orders) + len(subscriptions) + len(payments))
        payment_rows = max(1, len(orders) + len(payments))
        customer_rows = max(1, len(customers))

        crm_issue_weight = sum(
            max(1, len(issue.row_ids))
            for issue in validation_issues
            if issue.table_name in {"customers", "subscriptions"}
        )
        billing_issue_weight = sum(
            max(1, len(issue.row_ids))
            for issue in validation_issues
            if issue.table_name in {"orders", "payments", "subscriptions"}
            and issue.code.startswith(("foreign_key:", "required:", "unique:"))
        )
        payment_issue_weight = sum(
            max(1, len(issue.row_ids))
            for issue in validation_issues
            if issue.table_name in {"orders", "payments"}
        )

        customer_duplicate_groups = count_duplicate_groups(task_spec, "customers", customers) if "customers" in task_spec.duplicate_identity_columns else 0
        payment_duplicate_groups = count_duplicate_groups(task_spec, "payments", payments) if "payments" in task_spec.duplicate_identity_columns else 0

        crm_sync_success_rate = self._open_metric(1.0 - (crm_issue_weight / max(2, crm_rows * 2)))
        if not orders and not payments:
            # No order or payment rows: report the best values the metrics can take.
            billing_link_integrity = 0.99
            revenue_reporting_risk = 0.01
        else:
            billing_link_integrity = self._open_metric(1.0 - (billing_issue_weight / max(2, billing_rows * 2)))
            revenue_reporting_risk = self._open_metric(
                (payment_issue_weight / max(2, payment_rows * 2)) + (payment_duplicate_groups / max(1, payment_rows))
            )

        unique_customer_issue_count = sum(1 for issue in validation_issues if issue.code.startswith("unique:customers"))
        duplicate_contact_risk = self._open_metric((customer_duplicate_groups / customer_rows) + 0.06 * unique_customer_issue_count)

        # Risks are inverted so that all four terms read "higher is better".
        overall_health_score = self._open_metric(
            (
                crm_sync_success_rate
                + billing_link_integrity
                + (1.0 - duplicate_contact_risk)
                + (1.0 - revenue_reporting_risk)
            )
            / 4.0
        )

        return DownstreamHealth(
            crm_sync_success_rate=crm_sync_success_rate,
            billing_link_integrity=billing_link_integrity,
            duplicate_contact_risk=duplicate_contact_risk,
            revenue_reporting_risk=revenue_reporting_risk,
            overall_health_score=overall_health_score,
        )

    def _build_risk_cards(self) -> list[RiskCard]:
        """Build the four risk cards from the current downstream health metrics."""
        health = self._state.downstream_health

        def severity_when_low(value: float) -> str:
            # For success/integrity metrics, lower values are worse.
            return "high" if value < 0.8 else "medium" if value < 0.92 else "low"

        def severity_when_high(value: float) -> str:
            # For risk metrics, higher values are worse.
            return "high" if value > 0.3 else "medium" if value > 0.12 else "low"

        return [
            RiskCard(
                title="CRM import risk",
                detail="Customer and subscription issues can block CRM migration syncs.",
                severity=severity_when_low(health.crm_sync_success_rate),
                metric_name="crm_sync_success_rate",
                current_value=health.crm_sync_success_rate,
                recommended_action_ids=self._recommended_operation_ids_for_tables({"customers", "subscriptions"}),
            ),
            RiskCard(
                title="Billing linkage risk",
                detail="Broken foreign keys or missing IDs can mislink orders, subscriptions, and payments.",
                severity=severity_when_low(health.billing_link_integrity),
                metric_name="billing_link_integrity",
                current_value=health.billing_link_integrity,
                recommended_action_ids=self._recommended_operation_ids_for_tables({"orders", "subscriptions", "payments"}),
            ),
            RiskCard(
                title="Duplicate contact risk",
                detail="Remaining duplicate customer identities can create bad merges downstream.",
                severity=severity_when_high(health.duplicate_contact_risk),
                metric_name="duplicate_contact_risk",
                current_value=health.duplicate_contact_risk,
                recommended_action_ids=self._recommended_operation_ids_for_keyword("merge"),
            ),
            RiskCard(
                title="Revenue reporting risk",
                detail="Duplicate or mislinked payment and order facts can distort downstream reporting.",
                severity=severity_when_high(health.revenue_reporting_risk),
                metric_name="revenue_reporting_risk",
                current_value=health.revenue_reporting_risk,
                recommended_action_ids=self._recommended_operation_ids_for_tables({"orders", "payments"}),
            ),
        ]

    def _recommended_operation_ids_for_tables(self, table_names: set[str]) -> list[str]:
        """Return up to four operation ids (in id order) that affect any of ``table_names``."""
        return [
            operation.operation_id
            for operation in sorted(self._task_spec.operations.values(), key=lambda op: op.operation_id)
            if set(operation.tables_affected) & table_names
        ][:4]

    def _recommended_operation_ids_for_keyword(self, keyword: str) -> list[str]:
        """Return up to four operation ids (in id order) whose id or title contains ``keyword``, ignoring case."""
        lowered = keyword.lower()
        return [
            operation.operation_id
            for operation in sorted(self._task_spec.operations.values(), key=lambda op: op.operation_id)
            if lowered in operation.operation_id.lower() or lowered in operation.title.lower()
        ][:4]

    def _build_dry_run_report(self, target_system: str) -> DryRunReport:
        """Build a dry-run report for ``target_system`` from the current grade and health.

        Validation issues are filtered by table for the "crm" and "billing" targets;
        any other target keeps every issue. A risk finding is appended when the
        matching risk metric exceeds 0.12.
        """
        findings: list[DryRunFinding] = []
        for issue in self._grade.validation_issues:
            if target_system == "crm" and issue.table_name not in {"customers", "subscriptions"}:
                continue
            if target_system == "billing" and issue.table_name not in {"orders", "subscriptions", "payments"}:
                continue
            findings.append(
                DryRunFinding(
                    code=issue.code,
                    severity=issue.severity,
                    table_name=issue.table_name,
                    row_ids=list(issue.row_ids),
                    message=issue.message,
                )
            )

        health = self._state.downstream_health
        # Any target other than "crm" reports the billing integrity metric.
        success_rate = health.crm_sync_success_rate if target_system == "crm" else health.billing_link_integrity

        if target_system == "crm" and health.duplicate_contact_risk > 0.12:
            findings.append(
                DryRunFinding(
                    code="risk:duplicate_contacts",
                    severity="medium" if health.duplicate_contact_risk <= 0.3 else "high",
                    table_name="customers",
                    message="CRM dry run predicts duplicate-contact collisions after import.",
                )
            )
        if target_system == "billing" and health.revenue_reporting_risk > 0.12:
            findings.append(
                DryRunFinding(
                    code="risk:revenue_reporting",
                    severity="medium" if health.revenue_reporting_risk <= 0.3 else "high",
                    table_name="payments" if "payments" in self._state.tables else "orders",
                    message="Billing dry run predicts mislinked or duplicated revenue facts.",
                )
            )

        summary = (
            f"Dry run for {target_system.upper()} found {len(findings)} blocking or risky findings; "
            f"estimated success rate is {success_rate:.2f}."
        )
        return DryRunReport(
            target_system=target_system,
            success_rate=success_rate,
            finding_count=len(findings),
            findings=findings,
            summary=summary,
            generated_at_step=self._state.step_count,
        )

    def _build_operation_detail(
        self,
        task_spec: TaskSpec,
        operation_id: str,
        before_tables: dict[str, list[dict[str, str]]],
        after_tables: dict[str, list[dict[str, str]]] | None,
    ) -> OperationDetail:
        """Describe an operation together with a preview of the rows it changes.

        Args:
            task_spec: Task that defines the operation and the primary keys.
            operation_id: Key into ``task_spec.operations``.
            before_tables: Tables before the operation.
            after_tables: Tables after the operation; when ``None`` the result is
                simulated by calling ``apply_operation_to_tables`` on ``before_tables``.

        Returns:
            The operation detail with at most 12 row changes in its preview.
        """
        operation = task_spec.operations[operation_id]
        simulated_after = after_tables
        if simulated_after is None:
            simulated_after = apply_operation_to_tables(task_spec, before_tables, operation_id)

        preview_limit = 12
        preview: list[RowChange] = []
        for table_name in operation.tables_affected:
            primary_key = task_spec.primary_keys[table_name]
            # Rows are indexed by normalized primary key; rows sharing a key collapse to the last one.
            before_rows = {normalize_whitespace(row.get(primary_key, "")): dict(row) for row in before_tables.get(table_name, [])}
            after_rows = {normalize_whitespace(row.get(primary_key, "")): dict(row) for row in simulated_after.get(table_name, [])}
            for row_key in sorted(set(before_rows) | set(after_rows)):
                if before_rows.get(row_key) == after_rows.get(row_key):
                    continue
                preview.append(RowChange(primary_key_value=row_key, before=before_rows.get(row_key), after=after_rows.get(row_key)))
                if len(preview) >= preview_limit:
                    break
            if len(preview) >= preview_limit:
                break

        return OperationDetail(
            operation_id=operation.operation_id,
            title=operation.title,
            category=operation.category,
            risk=operation.risk,
            tables_affected=list(operation.tables_affected),
            description=operation.description,
            already_applied=operation.operation_id in self._state.applied_operation_ids,
            why_it_matters=operation.why_it_matters,
            change_preview=preview,
        )
|
|