File size: 18,363 Bytes
ac326a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7c2c5f2
ac326a6
 
 
 
7c2c5f2
 
 
ac326a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7c2c5f2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ac326a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7c2c5f2
ac326a6
 
7c2c5f2
 
 
 
ac326a6
 
 
 
 
 
 
 
 
b7e4141
ac326a6
 
 
 
 
7c2c5f2
 
 
 
 
 
ac326a6
 
 
7c2c5f2
 
 
ac326a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b7e4141
ac326a6
7c2c5f2
 
ac326a6
 
 
 
7c2c5f2
 
ac326a6
 
 
 
7c2c5f2
 
 
 
ac326a6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
"""Typed models for the CleanOps environment."""

from __future__ import annotations

from typing import Literal

from pydantic import BaseModel, Field

from openenv.core.env_server.types import Action, Observation, State


class RewardBreakdown(BaseModel):
    """Itemized components behind a single scalar reward.

    Every component defaults to ``0.0``. This model only carries the values;
    the rule that combines them into ``total`` is not defined in this class.
    """

    quality_delta: float = Field(
        default=0.0,
        description="Change in overall grader score after the action.",
    )
    issue_delta: float = Field(
        default=0.0,
        description="Normalized change in outstanding validation issues.",
    )
    downstream_health_delta: float = Field(
        default=0.0,
        description="Change in downstream operational health after the action.",
    )
    insight_bonus: float = Field(
        default=0.0,
        description="Small positive reward for inspecting new assets.",
    )
    efficiency_penalty: float = Field(
        default=0.0,
        description="Per-step penalty to discourage long episodes.",
    )
    invalid_action_penalty: float = Field(
        default=0.0,
        description="Penalty for malformed or unsupported actions.",
    )
    noop_penalty: float = Field(
        default=0.0,
        description="Penalty for no-op or repeated actions.",
    )
    review_bonus: float = Field(
        default=0.0,
        description="Positive reward when a queued review response becomes available.",
    )
    review_cost_penalty: float = Field(
        default=0.0,
        description="Small cost for consuming limited human-review budget.",
    )
    action_cost_penalty: float = Field(
        default=0.0,
        description="Cost-aware penalty attached to the chosen action.",
    )
    submit_bonus: float = Field(
        default=0.0,
        description="End-of-episode bonus based on final score.",
    )
    # The scalar that is actually handed back as the reward.
    total: float = Field(
        default=0.0,
        description="Final scalar reward returned.",
    )


class ValidationIssue(BaseModel):
    """One specific validation problem that the agent is expected to fix.

    ``code``, ``severity``, ``table_name`` and ``message`` are required;
    the column and the affected row ids are optional.
    """

    code: str = Field(
        ...,
        description="Stable machine-readable issue code.",
    )
    severity: Literal["low", "medium", "high"] = Field(
        ...,
        description="Issue severity.",
    )
    table_name: str = Field(
        ...,
        description="Table containing the issue.",
    )
    # Left as None when the issue is not tied to a single column.
    column_name: str | None = Field(
        default=None,
        description="Column containing the issue, if applicable.",
    )
    row_ids: list[str] = Field(
        default_factory=list,
        description="Affected primary-key values.",
    )
    message: str = Field(
        ...,
        description="Human-readable issue summary.",
    )


class IssueCard(BaseModel):
    """Grouped issue summary together with operations likely to remediate it.

    ``title`` and ``detail`` are required; both id lists default to empty.
    """

    title: str = Field(
        ...,
        description="Short issue title.",
    )
    detail: str = Field(
        ...,
        description="Why the issue matters in this task.",
    )
    issue_codes: list[str] = Field(
        default_factory=list,
        description="Validation codes represented by this card.",
    )
    recommended_operation_ids: list[str] = Field(
        default_factory=list,
        description="Operations likely to address the issue.",
    )


class ReviewTarget(BaseModel):
    """An entity that may be escalated to a human reviewer.

    All descriptive fields are required; only the list of recommended
    operations has a default (empty).
    """

    review_id: str = Field(
        ...,
        description="Stable review case identifier.",
    )
    entity_type: str = Field(
        ...,
        description="Type of entity under review.",
    )
    entity_id: str = Field(
        ...,
        description="Primary identifier for the reviewed entity.",
    )
    reason_code: str = Field(
        ...,
        description="Why the review would be requested.",
    )
    title: str = Field(
        ...,
        description="Short human-readable review title.",
    )
    detail: str = Field(
        ...,
        description="Why this review matters.",
    )
    recommended_operation_ids: list[str] = Field(
        default_factory=list,
        description="Operations likely to be safe once review resolves.",
    )


class PendingReview(BaseModel):
    """A review request that is queued and still waiting for its response.

    Every field is required, including the two step indices that record when
    the review was requested and when its response becomes available.
    """

    review_id: str = Field(
        ...,
        description="Stable review case identifier.",
    )
    entity_type: str = Field(
        ...,
        description="Type of entity under review.",
    )
    entity_id: str = Field(
        ...,
        description="Primary identifier for the reviewed entity.",
    )
    reason_code: str = Field(
        ...,
        description="Why the review was requested.",
    )
    title: str = Field(
        ...,
        description="Short human-readable review title.",
    )
    requested_at_step: int = Field(
        ...,
        description="Step index when the review was requested.",
    )
    ready_at_step: int = Field(
        ...,
        description="First step on which the deterministic response becomes available.",
    )


class ReviewResolution(BaseModel):
    """The outcome of a human review, as reported back to the agent.

    Identification fields, the outcome label and both summaries are required;
    the list of recommended operations defaults to empty.
    """

    review_id: str = Field(
        ...,
        description="Stable review case identifier.",
    )
    entity_type: str = Field(
        ...,
        description="Type of entity under review.",
    )
    entity_id: str = Field(
        ...,
        description="Primary identifier for the reviewed entity.",
    )
    reason_code: str = Field(
        ...,
        description="Why the review was requested.",
    )
    title: str = Field(
        ...,
        description="Short human-readable review title.",
    )
    resolution: str = Field(
        ...,
        description="Deterministic review outcome label.",
    )
    response_summary: str = Field(
        ...,
        description="What the reviewer concluded.",
    )
    evidence_summary: str = Field(
        ...,
        description="Short explanation for the decision.",
    )
    recommended_operation_ids: list[str] = Field(
        default_factory=list,
        description="Operations that become safer after the review response.",
    )


class DryRunFinding(BaseModel):
    """A single downstream issue reported by a dry-run sync.

    ``code``, ``severity`` and ``message`` are required; the table and the
    implicated row ids are optional.
    """

    code: str = Field(
        ...,
        description="Stable machine-readable issue code.",
    )
    severity: Literal["low", "medium", "high"] = Field(
        ...,
        description="Issue severity.",
    )
    # Unlike ValidationIssue.table_name, this may be None.
    table_name: str | None = Field(
        default=None,
        description="Table implicated by the dry-run finding.",
    )
    row_ids: list[str] = Field(
        default_factory=list,
        description="Primary-key values implicated by the finding.",
    )
    message: str = Field(
        ...,
        description="Human-readable dry-run explanation.",
    )


class DryRunReport(BaseModel):
    """Result of simulating a sync against one downstream business system.

    Only ``target_system`` is required; every other field has a zero/empty
    default.
    """

    target_system: Literal["crm", "billing"] = Field(
        ...,
        description="Which downstream system was tested.",
    )
    success_rate: float = Field(
        default=0.0,
        description="Deterministic estimate of how many records would import successfully.",
    )
    # Stored independently of `findings`; this model does not enforce that
    # the count equals len(findings).
    finding_count: int = Field(
        default=0,
        description="How many concrete blockers or risks were found.",
    )
    findings: list[DryRunFinding] = Field(
        default_factory=list,
        description="Structured findings from the simulated sync.",
    )
    summary: str = Field(
        default="",
        description="Short narrative summary of the dry-run result.",
    )
    generated_at_step: int = Field(
        default=0,
        description="Step on which the report was generated.",
    )


class DownstreamHealth(BaseModel):
    """Estimated operational health of the downstream systems.

    All five metrics are floats defaulting to ``0.0``, so the model can be
    constructed with no arguments.
    """

    crm_sync_success_rate: float = Field(
        default=0.0,
        description="Estimated CRM import success rate.",
    )
    billing_link_integrity: float = Field(
        default=0.0,
        description="Estimated correctness of billing/customer linkages.",
    )
    duplicate_contact_risk: float = Field(
        default=0.0,
        description="Estimated risk that duplicate contacts still remain.",
    )
    revenue_reporting_risk: float = Field(
        default=0.0,
        description="Estimated risk of duplicate or mislinked revenue facts.",
    )
    overall_health_score: float = Field(
        default=0.0,
        description="Composite downstream health score used for reward shaping.",
    )


class RiskCard(BaseModel):
    """Short operational risk summary derived from downstream health.

    ``title``, ``detail``, ``severity`` and ``metric_name`` are required.
    """

    title: str = Field(
        ...,
        description="Short risk title.",
    )
    detail: str = Field(
        ...,
        description="Why this risk matters operationally.",
    )
    severity: Literal["low", "medium", "high"] = Field(
        ...,
        description="Severity for UI and agent prioritization.",
    )
    metric_name: str = Field(
        ...,
        description="Downstream metric represented by this card.",
    )
    # The [0, 1] range is stated in the description only; no bounds are
    # enforced by this field.
    current_value: float = Field(
        default=0.0,
        description="Current metric or risk value in [0, 1].",
    )
    recommended_action_ids: list[str] = Field(
        default_factory=list,
        description="Operations likely to improve this risk.",
    )


class ActionCostEntry(BaseModel):
    """Estimated operational cost associated with one action key.

    Only ``action_key`` is required.
    """

    action_key: str = Field(
        ...,
        description="Stable action or risk key.",
    )
    estimated_cost: float = Field(
        default=0.0,
        description="Relative action cost used in reward shaping.",
    )
    description: str = Field(
        default="",
        description="Why this action costs reviewer or system capacity.",
    )


class TableSummary(BaseModel):
    """Condensed overview of a single table.

    ``name``, ``primary_key`` and ``row_count`` are required; the remaining
    statistics and the row preview default to zero/empty.
    """

    name: str = Field(
        ...,
        description="Table name.",
    )
    primary_key: str = Field(
        ...,
        description="Primary key column.",
    )
    row_count: int = Field(
        ...,
        description="Number of rows in the current table.",
    )
    columns: list[str] = Field(
        default_factory=list,
        description="Column names.",
    )
    missing_cells: int = Field(
        default=0,
        description="Count of blank required or optional cells.",
    )
    duplicate_groups: int = Field(
        default=0,
        description="Count of duplicate identity groups.",
    )
    # Each preview row maps a string key to a string cell value.
    preview_rows: list[dict[str, str]] = Field(
        default_factory=list,
        description="Small row preview for quick inspection.",
    )


class TableView(BaseModel):
    """Complete contents of the one table currently in focus.

    ``name`` and ``primary_key`` are required; ``columns`` and ``rows``
    default to empty lists.
    """

    name: str = Field(
        ...,
        description="Table name.",
    )
    primary_key: str = Field(
        ...,
        description="Primary key column.",
    )
    columns: list[str] = Field(
        default_factory=list,
        description="Column names.",
    )
    # Each row maps a string key to a string cell value.
    rows: list[dict[str, str]] = Field(
        default_factory=list,
        description="Current table rows.",
    )


class RowChange(BaseModel):
    """Before/after snapshot of one row affected by an operation.

    Either side may be ``None``. Presumably a missing ``before`` means an
    inserted row and a missing ``after`` a deleted row, but that convention
    is not defined in this class.
    """

    primary_key_value: str = Field(
        ...,
        description="Changed row identifier.",
    )
    before: dict[str, str] | None = Field(
        default=None,
        description="Row values before applying an operation.",
    )
    after: dict[str, str] | None = Field(
        default=None,
        description="Row values after applying an operation.",
    )


class OperationSummary(BaseModel):
    """Summary of one cleaning operation that the agent may select.

    Identification, category, risk and description are required;
    ``tables_affected`` defaults to empty and ``already_applied`` to False.
    """

    operation_id: str = Field(
        ...,
        description="Stable operation identifier.",
    )
    title: str = Field(
        ...,
        description="Short action title.",
    )
    category: str = Field(
        ...,
        description="Operation category.",
    )
    risk: Literal["safe", "review", "destructive"] = Field(
        ...,
        description="Risk level for the operation.",
    )
    tables_affected: list[str] = Field(
        default_factory=list,
        description="Tables changed by the operation.",
    )
    description: str = Field(
        ...,
        description="What the operation does.",
    )
    already_applied: bool = Field(
        default=False,
        description="Whether this operation has already been applied.",
    )


class OperationDetail(OperationSummary):
    """An ``OperationSummary`` extended with additional context.

    Adds a business-oriented rationale and a preview of the row changes;
    both new fields are optional.
    """

    why_it_matters: str = Field(
        default="",
        description="Business-oriented explanation of the operation.",
    )
    change_preview: list[RowChange] = Field(
        default_factory=list,
        description="Predicted row changes if the operation were applied now.",
    )


class GradeBreakdown(BaseModel):
    """Component scores produced by the deterministic grader.

    All scores default to ``0.0``. The weights used to combine them into
    ``final_score`` are not defined in this class.
    """

    cell_match_score: float = Field(
        default=0.0,
        description="Fraction of gold cells matched.",
    )
    key_recall_score: float = Field(
        default=0.0,
        description="Row identity and deduplication quality.",
    )
    validation_score: float = Field(
        default=0.0,
        description="How well the current tables satisfy constraints.",
    )
    final_score: float = Field(
        default=0.0,
        description="Weighted final task score.",
    )


class DataCleaningAction(Action):
    """An action the agent submits to the environment.

    Only ``action_type`` is required. The remaining fields are parameters
    whose relevance depends on the action type, as stated in each field's
    description. This model does not itself check that the parameters
    matching the chosen ``action_type`` are present.
    """

    action_type: Literal[
        "inspect_table",
        "inspect_operation",
        "apply_operation",
        "request_review",
        "run_sync_dry_run",
        "submit",
    ] = Field(
        ...,
        description="Type of action to perform.",
    )
    table_name: str | None = Field(
        default=None,
        description="Table to inspect when action_type=inspect_table.",
    )
    operation_id: str | None = Field(
        default=None,
        description="Operation to inspect or apply when action_type is inspect_operation or apply_operation.",
    )
    entity_type: str | None = Field(
        default=None,
        description="Entity type to review when action_type=request_review.",
    )
    entity_id: str | None = Field(
        default=None,
        description="Entity identifier to review when action_type=request_review.",
    )
    target_system: Literal["crm", "billing"] | None = Field(
        default=None,
        description="Downstream system to simulate when action_type=run_sync_dry_run.",
    )
    reason_code: str | None = Field(
        default=None,
        description="Reason for escalating a review request.",
    )
    reasoning: str = Field(
        default="",
        description="Optional natural-language reasoning for debugging baselines.",
    )


class DataCleaningObservation(Observation):
    """What the environment returns to the agent after each interaction.

    Task identity (``task_id``, ``task_title``, ``difficulty``, ``objective``,
    ``dataset_context``) is required; every other field has a default.
    Fields inherited from ``Observation`` are declared in the base class and
    are not repeated here.
    """

    # --- Task identity and context ---
    task_id: str = Field(
        ...,
        description="Current task identifier.",
    )
    task_title: str = Field(
        ...,
        description="Human-readable task title.",
    )
    difficulty: Literal["easy", "medium", "hard"] = Field(
        ...,
        description="Task difficulty.",
    )
    requested_seed: int | None = Field(
        default=None,
        description="Seed used when resetting the current episode.",
    )
    objective: str = Field(
        ...,
        description="Concrete task objective.",
    )
    dataset_context: str = Field(
        ...,
        description="Why this dataset exists in the real world.",
    )

    # --- Scores and budgets ---
    quality_score: float = Field(
        default=0.0,
        description="Current deterministic grader score.",
    )
    best_score: float = Field(
        default=0.0,
        description="Best score seen in the current episode.",
    )
    remaining_steps: int = Field(
        default=0,
        description="How many actions remain before truncation.",
    )
    review_budget_remaining: int = Field(
        default=0,
        description="How many human-review requests remain in the current episode.",
    )

    # --- Downstream systems ---
    supported_sync_targets: list[str] = Field(
        default_factory=list,
        description="Downstream systems that can be tested with run_sync_dry_run.",
    )
    downstream_health: DownstreamHealth = Field(
        default_factory=DownstreamHealth,
        description="Current operational health estimates for downstream systems.",
    )
    risk_cards: list[RiskCard] = Field(
        default_factory=list,
        description="Operational risk summaries derived from downstream health.",
    )
    last_dry_run: DryRunReport | None = Field(
        default=None,
        description="Most recent downstream dry-run result, if any.",
    )
    action_costs: list[ActionCostEntry] = Field(
        default_factory=list,
        description="Estimated cost of each action family.",
    )

    # --- Tables, operations and reviews ---
    table_summaries: list[TableSummary] = Field(
        default_factory=list,
        description="Compact summaries of all tables.",
    )
    focus_table: TableView | None = Field(
        default=None,
        description="Detailed contents for the currently inspected table.",
    )
    available_operations: list[OperationSummary] = Field(
        default_factory=list,
        description="Available cleaning actions.",
    )
    available_review_targets: list[ReviewTarget] = Field(
        default_factory=list,
        description="Entities that can be escalated for deterministic review.",
    )
    pending_reviews: list[PendingReview] = Field(
        default_factory=list,
        description="Review requests that have been queued but not yet resolved.",
    )
    resolved_reviews: list[ReviewResolution] = Field(
        default_factory=list,
        description="Resolved review responses available to the agent.",
    )
    focus_operation: OperationDetail | None = Field(
        default=None,
        description="Detailed preview for the currently inspected operation.",
    )

    # --- Issues, history and feedback on the last action ---
    validation_issues: list[ValidationIssue] = Field(
        default_factory=list,
        description="Current unresolved validation issues.",
    )
    issue_cards: list[IssueCard] = Field(
        default_factory=list,
        description="Aggregated issue cards with suggested next actions.",
    )
    recent_history: list[str] = Field(
        default_factory=list,
        description="Recent action log.",
    )
    grader: GradeBreakdown = Field(
        default_factory=GradeBreakdown,
        description="Deterministic score components.",
    )
    reward_breakdown: RewardBreakdown = Field(
        default_factory=RewardBreakdown,
        description="How the last reward was computed.",
    )
    last_action_status: str = Field(
        default="",
        description="Outcome message for the most recent action.",
    )
    last_action_error: str | None = Field(
        default=None,
        description="Raw error string for the last action, or null when no error occurred.",
    )


class DataCleaningState(State):
    """Complete server-side state of the episode in progress.

    ``task_id``, ``task_title``, ``difficulty`` and ``max_steps`` are
    required; every other field has a default. Fields inherited from
    ``State`` are declared in the base class and are not repeated here.
    """

    # --- Task identity ---
    task_id: str = Field(
        ...,
        description="Current task identifier.",
    )
    task_title: str = Field(
        ...,
        description="Current task title.",
    )
    difficulty: Literal["easy", "medium", "hard"] = Field(
        ...,
        description="Current task difficulty.",
    )
    requested_seed: int | None = Field(
        default=None,
        description="Seed used when resetting the current episode.",
    )

    # --- Budgets and progress ---
    max_steps: int = Field(
        ...,
        description="Task step budget.",
    )
    review_budget_total: int = Field(
        default=0,
        description="Total number of review requests available in this task.",
    )
    review_budget_remaining: int = Field(
        default=0,
        description="Remaining number of review requests available in this task.",
    )
    submitted: bool = Field(
        default=False,
        description="Whether submit was called.",
    )
    current_score: float = Field(
        default=0.0,
        description="Current deterministic grader score.",
    )
    best_score: float = Field(
        default=0.0,
        description="Best score achieved this episode.",
    )
    outstanding_issue_count: int = Field(
        default=0,
        description="Number of unresolved validation issues.",
    )

    # --- Downstream systems ---
    downstream_health: DownstreamHealth = Field(
        default_factory=DownstreamHealth,
        description="Current downstream operational health.",
    )
    last_dry_run: DryRunReport | None = Field(
        default=None,
        description="Most recent downstream dry-run result.",
    )

    # --- Data and bookkeeping of what the agent has done so far ---
    # Maps a string key to a list of rows; each row maps string keys to
    # string cell values.
    tables: dict[str, list[dict[str, str]]] = Field(
        default_factory=dict,
        description="Current mutable table contents.",
    )
    applied_operation_ids: list[str] = Field(
        default_factory=list,
        description="Operations already applied.",
    )
    inspected_tables: list[str] = Field(
        default_factory=list,
        description="Tables inspected so far.",
    )
    inspected_operations: list[str] = Field(
        default_factory=list,
        description="Operations inspected so far.",
    )
    requested_review_ids: list[str] = Field(
        default_factory=list,
        description="Review cases already requested in this episode.",
    )
    pending_reviews: list[PendingReview] = Field(
        default_factory=list,
        description="Queued review requests awaiting deterministic responses.",
    )
    resolved_reviews: list[ReviewResolution] = Field(
        default_factory=list,
        description="Resolved review responses available to the agent.",
    )
    dry_run_targets: list[str] = Field(
        default_factory=list,
        description="Downstream targets that have already been dry-run in this episode.",
    )
    recent_history: list[str] = Field(
        default_factory=list,
        description="Recent action log.",
    )