"""Seeded task metadata and deterministic scenario builders for DataOpsEnv.""" from __future__ import annotations import random import re import textwrap from dataclasses import dataclass from datetime import date, timedelta from typing import Any, Iterable TASK_IDS = [ "task_1_easy_anomaly", "task_2_medium_syntax", "task_3_hard_e2e", ] @dataclass(frozen=True) class SQLPolicy: allowed_commands: frozenset[str] required_table: str @dataclass(frozen=True) class TaskMetadata: task_id: str name: str difficulty: str short_description: str benchmark_focus: str allowed_actions: tuple[str, ...] @dataclass(frozen=True) class Task1Scenario: description: str all_rows: tuple[dict[str, Any], ...] expected_rows: tuple[dict[str, Any], ...] corrupted_row_ids: tuple[int, ...] @dataclass(frozen=True) class Task2Scenario: description: str visible_batch: tuple[dict[str, Any], ...] visible_expected: tuple[dict[str, Any], ...] hidden_cases: tuple[tuple[dict[str, Any], ...], ...] hidden_expected: tuple[tuple[dict[str, Any], ...], ...] broken_script: str @dataclass(frozen=True) class Task3Scenario: description: str target_date: str recipient: str subject: str report_title: str all_rows: tuple[dict[str, Any], ...] expected_rows: tuple[dict[str, Any], ...] broken_script: str @dataclass(frozen=True) class TaskScenarioBundle: task_id: str seed: int description: str task_1: Task1Scenario | None = None task_2: Task2Scenario | None = None task_3: Task3Scenario | None = None TASK_METADATA = { "task_1_easy_anomaly": TaskMetadata( task_id="task_1_easy_anomaly", name="Delete Corrupted Transaction Rows", difficulty="easy", short_description=( "Inspect a transaction table and remove only the seeded rows with NULL amounts while preserving legitimate non-null edge values." ), benchmark_focus="Careful data cleanup without collateral damage.", allowed_actions=("ExecuteSQL",), ), "task_2_medium_syntax": TaskMetadata( task_id="task_2_medium_syntax", name="Repair Seeded Pipeline Script", difficulty="medium", short_description=( "Repair a seeded ETL normalization script and verify it on visible and hidden seeded batches." ), benchmark_focus="Code reading, precise repair, and generalization beyond the demo batch.", allowed_actions=("ReadFile", "WriteFile", "RunScript"), ), "task_3_hard_e2e": TaskMetadata( task_id="task_3_hard_e2e", name="Resolve Revenue Reporting Incident", difficulty="hard", short_description=( "Extract a seeded reporting slice, repair the formatter, and send the exact generated report." ), benchmark_focus="End-to-end data extraction, file repair, and communication with provenance.", allowed_actions=("ExecuteSQL", "ReadFile", "WriteFile", "RunScript", "SendEmail"), ), } TASK_DESCRIPTIONS = { task_id: metadata.short_description for task_id, metadata in TASK_METADATA.items() } TASK_ALLOWED_WRITE_FILES = { "task_1_easy_anomaly": frozenset(), "task_2_medium_syntax": frozenset({"broken_pipeline.py"}), "task_3_hard_e2e": frozenset({"format_report.py", "report_data.json"}), } TASK_ALLOWED_RUN_FILES = { "task_1_easy_anomaly": frozenset(), "task_2_medium_syntax": frozenset({"broken_pipeline.py"}), "task_3_hard_e2e": frozenset({"format_report.py"}), } TASK_EMAIL_ENABLED = frozenset({"task_3_hard_e2e"}) TASK_ALLOWED_READ_FILES = { "task_1_easy_anomaly": frozenset(), "task_2_medium_syntax": frozenset({"broken_pipeline.py"}), "task_3_hard_e2e": frozenset({"format_report.py", "report_data.json"}), } TASK_SQL_POLICIES = { "task_1_easy_anomaly": SQLPolicy( allowed_commands=frozenset({"SELECT", "DELETE"}), required_table="transactions", ), "task_3_hard_e2e": SQLPolicy( allowed_commands=frozenset({"SELECT", "WITH"}), required_table="daily_reports", ), } _REPORT_RECORD_RE = re.compile( r"Department:\s*(?P[^\n]+)\n" r"\s*Revenue:\s*\$(?P-?\d+(?:\.\d+)?)\n" r"\s*Expenses:\s*\$(?P-?\d+(?:\.\d+)?)\n" r"\s*Net:\s*\$(?P-?\d+(?:\.\d+)?)", re.MULTILINE, ) _REPORT_TOTAL_RE = re.compile(r"Total Revenue:\s*\$(?P-?\d+(?:\.\d+)?)") _TASK_1_VALID_STATUSES = ("success", "settled", "approved", "completed") _TASK_1_CORRUPTED_STATUSES = ("pending", "retrying", "failed", "queued") _TASK_2_READY_STATUS = "ready" _TASK_2_NON_READY_STATUSES = ("queued", "hold", "failed") _TASK_2_REGIONS = ("us-east", "eu-west", "ap-south", "sa-east") _TASK_3_RECIPIENTS = ( "bhavik@example.com", "marta@example.com", "ops-lead@example.com", "finance-review@example.com", ) _TASK_3_DEPARTMENTS = ( "Engineering", "Sales", "Marketing", "Operations", "Support", "Finance", ) def task_manifest_entries() -> list[dict[str, Any]]: return [ { "id": metadata.task_id, "name": metadata.name, "difficulty": metadata.difficulty, "description": metadata.short_description, "benchmark_focus": metadata.benchmark_focus, "allowed_actions": list(metadata.allowed_actions), } for metadata in TASK_METADATA.values() ] def build_task_scenario(task_id: str, seed: int | None = None) -> TaskScenarioBundle: resolved_seed = 0 if seed is None else int(seed) if task_id == "task_1_easy_anomaly": task = _build_task_1_scenario(resolved_seed) return TaskScenarioBundle( task_id=task_id, seed=resolved_seed, description=task.description, task_1=task, ) if task_id == "task_2_medium_syntax": task = _build_task_2_scenario(resolved_seed) return TaskScenarioBundle( task_id=task_id, seed=resolved_seed, description=task.description, task_2=task, ) if task_id == "task_3_hard_e2e": task = _build_task_3_scenario(resolved_seed) return TaskScenarioBundle( task_id=task_id, seed=resolved_seed, description=task.description, task_3=task, ) raise KeyError(f"Unknown task_id: {task_id}") def normalize_task_3_rows( rows: Iterable[dict[str, Any]], *, require_headcount: bool = False ) -> list[dict[str, Any]]: """Normalise extracted rows for deterministic comparison.""" normalised: list[dict[str, Any]] = [] for row in rows: try: hc_raw = row.get("headcount") if hc_raw is None or hc_raw == "": if require_headcount: return [] headcount: int | None = None else: headcount = int(hc_raw) normalised.append( { "department": str(row["department"]), "revenue": round(float(row["revenue"]), 2), "expenses": round(float(row["expenses"]), 2), "headcount": headcount, } ) except (KeyError, TypeError, ValueError): return [] return sorted(normalised, key=lambda item: item["department"]) def normalize_task_2_output_rows(rows: Any) -> list[dict[str, Any]]: """Normalise Task 2 ETL output rows while preserving list order for sort checks.""" if not isinstance(rows, list): return [] normalised: list[dict[str, Any]] = [] for row in rows: if not isinstance(row, dict): return [] try: order_id = str(row["order_id"]) region = str(row["region"]) amount_usd = round(float(row["amount_usd"]), 2) priority_band = str(row["priority_band"]) except (KeyError, TypeError, ValueError): return [] if priority_band not in {"high", "normal"}: return [] normalised.append( { "order_id": order_id, "region": region, "amount_usd": amount_usd, "priority_band": priority_band, } ) return normalised def build_task_2_expected( batch: Iterable[dict[str, Any]] ) -> list[dict[str, Any]]: processed: list[dict[str, Any]] = [] for record in batch: try: status = str(record["status"]) amount_cents = int(record["amount_cents"]) priority = int(record["priority"]) amount_usd = round(amount_cents / 100.0, 2) if status != _TASK_2_READY_STATUS or amount_cents <= 0: continue processed.append( { "order_id": str(record["order_id"]), "region": str(record["region"]), "amount_usd": amount_usd, "priority_band": "high" if priority >= 8 or amount_usd >= 500.0 else "normal", } ) except (KeyError, TypeError, ValueError): return [] processed.sort(key=lambda item: (-item["amount_usd"], item["order_id"])) return processed def task_3_data_matches_expected( rows: list[dict[str, Any]], expected_rows: Iterable[dict[str, Any]], *, require_headcount: bool, ) -> bool: expected = normalize_task_3_rows(expected_rows, require_headcount=require_headcount) return rows == expected def task_3_headcount_fully_matches( rows: list[dict[str, Any]], expected_rows: Iterable[dict[str, Any]] ) -> bool: expected = normalize_task_3_rows(expected_rows, require_headcount=True) return rows == expected def build_task_3_report(rows: Iterable[dict[str, Any]], target_date: str) -> str: report_rows = normalize_task_3_rows(rows, require_headcount=True) lines = [f"=== Daily Revenue Report ({target_date}) ===", ""] total_revenue = 0.0 for row in report_rows: revenue = float(row["revenue"]) expenses = float(row["expenses"]) net = revenue - expenses lines.append(f"Department: {row['department']}") lines.append(f" Revenue: ${revenue:.2f}") lines.append(f" Expenses: ${expenses:.2f}") lines.append(f" Net: ${net:.2f}") lines.append("") total_revenue += revenue lines.append(f"Total Revenue: ${total_revenue:.2f}") lines.append("=== End of Report ===") return "\n".join(lines) def extract_task_3_report_block(text: str, target_date: str) -> str | None: raw = text.replace("\r\n", "\n") start_marker = f"=== Daily Revenue Report ({target_date}) ===" start = raw.find(start_marker) end_marker = "=== End of Report ===" end = raw.find(end_marker) if start == -1 or end == -1 or end < start: return None return raw[start : end + len(end_marker)].strip() def parse_task_3_report(text: str, target_date: str) -> dict[str, Any] | None: block = extract_task_3_report_block(text, target_date) if block is None: return None records: list[dict[str, Any]] = [] for match in _REPORT_RECORD_RE.finditer(block): revenue = round(float(match.group("revenue")), 2) expenses = round(float(match.group("expenses")), 2) net = round(float(match.group("net")), 2) records.append( { "department": match.group("department").strip(), "revenue": revenue, "expenses": expenses, "headcount": None, "net": net, } ) total_match = _REPORT_TOTAL_RE.search(block) if not total_match: return None return { "records": sorted(records, key=lambda item: item["department"]), "total_revenue": round(float(total_match.group("total")), 2), } def report_matches_expected( text: str, expected_rows: Iterable[dict[str, Any]], target_date: str ) -> bool: parsed = parse_task_3_report(text, target_date) if parsed is None: return False expected = normalize_task_3_rows(expected_rows, require_headcount=True) expected_records = [ { "department": row["department"], "revenue": row["revenue"], "expenses": row["expenses"], "headcount": None, "net": round(float(row["revenue"]) - float(row["expenses"]), 2), } for row in expected ] expected_total = round(sum(float(row["revenue"]) for row in expected), 2) return ( parsed["records"] == expected_records and parsed["total_revenue"] == expected_total ) def task_3_semantic_match_fraction_rows( rows: list[dict[str, Any]], expected_rows: Iterable[dict[str, Any]] ) -> float: if not rows: return 0.0 expected = normalize_task_3_rows(expected_rows, require_headcount=False) exp_by_dept = {row["department"]: row for row in expected} matched = 0 for row in rows: department = row.get("department") if department not in exp_by_dept: continue expected_row = exp_by_dept[department] if ( row.get("revenue") == expected_row["revenue"] and row.get("expenses") == expected_row["expenses"] ): matched += 1 return matched / len(expected) if expected else 0.0 def task_3_semantic_match_fraction_parsed( parsed: dict[str, Any] | None, expected_rows: Iterable[dict[str, Any]] ) -> float: if not parsed or not parsed.get("records"): return 0.0 expected = normalize_task_3_rows(expected_rows, require_headcount=False) exp_by_dept = {row["department"]: row for row in expected} matched = 0 for record in parsed["records"]: department = record.get("department") if department not in exp_by_dept: continue expected_row = exp_by_dept[department] if ( record.get("revenue") == expected_row["revenue"] and record.get("expenses") == expected_row["expenses"] ): matched += 1 return matched / len(expected) if expected else 0.0 def task_3_semantic_match_fraction_text( text: str, expected_rows: Iterable[dict[str, Any]], target_date: str ) -> float: return task_3_semantic_match_fraction_parsed( parse_task_3_report(text, target_date), expected_rows ) def _build_task_1_scenario(seed: int) -> Task1Scenario: rng = random.Random(f"task-1:{seed}") valid_count = 3 + rng.randrange(3) corrupted_count = 2 + rng.randrange(2) combined_rows: list[dict[str, Any]] = [] valid_templates = [] for index in range(valid_count): valid_templates.append( { "kind": "valid", "user_id": 1000 + seed * 10 + index, "amount": round(rng.uniform(75.0, 975.0), 2), "status": rng.choice(_TASK_1_VALID_STATUSES), } ) if valid_templates: valid_templates[0]["amount"] = 0.0 valid_templates[0]["status"] = "settled" if len(valid_templates) > 1: valid_templates[1]["amount"] = -round(float(valid_templates[1]["amount"]) / 10.0, 2) valid_templates[1]["status"] = "approved" corrupted_templates = [] for index in range(corrupted_count): corrupted_templates.append( { "kind": "corrupted", "user_id": 2000 + seed * 10 + index, "amount": None, "status": rng.choice(_TASK_1_CORRUPTED_STATUSES), } ) templates = valid_templates + corrupted_templates rng.shuffle(templates) expected_rows: list[dict[str, Any]] = [] corrupted_row_ids: list[int] = [] for row_id, template in enumerate(templates, start=1): row = { "id": row_id, "user_id": int(template["user_id"]), "amount": template["amount"], "status": str(template["status"]), } combined_rows.append(row) if template["kind"] == "valid": expected_rows.append(row) else: corrupted_row_ids.append(row_id) description = ( "Find and delete all corrupted records (rows with NULL amounts) from the " f"'transactions' table. This seeded episode contains {corrupted_count} corrupted " f"rows mixed with {valid_count} valid rows. Only NULL amounts are corrupted; " "legitimate zero-value reconciliations and negative refund adjustments may also " "appear and must be preserved exactly." ) return Task1Scenario( description=description, all_rows=tuple(combined_rows), expected_rows=tuple(expected_rows), corrupted_row_ids=tuple(sorted(corrupted_row_ids)), ) def _build_task_2_scenario(seed: int) -> Task2Scenario: rng = random.Random(f"task-2:{seed}") visible_batch = _sample_task_2_batch(rng, batch_index=0) hidden_cases = tuple( _sample_task_2_batch(rng, batch_index=index + 1) for index in range(6) ) visible_expected = tuple(build_task_2_expected(visible_batch)) hidden_expected = tuple( tuple(build_task_2_expected(batch)) for batch in hidden_cases ) description = ( "The script 'broken_pipeline.py' prepares downstream billing candidates from " "seeded order records. Repair it so it keeps only ready records with positive " "amounts, converts cents to USD, flags high priority when priority >= 8 or " "amount_usd >= 500.00, and returns rows sorted by amount_usd descending then " "order_id ascending. The grader checks the visible demo batch and additional " "unseen seeded batches." ) return Task2Scenario( description=description, visible_batch=visible_batch, visible_expected=visible_expected, hidden_cases=hidden_cases, hidden_expected=hidden_expected, broken_script=_render_broken_pipeline_script(visible_batch), ) def _build_task_3_scenario(seed: int) -> Task3Scenario: rng = random.Random(f"task-3:{seed}") base_date = date(2025, 3, 25) + timedelta(days=rng.randrange(0, 7)) target_date = base_date.isoformat() recipient = rng.choice(_TASK_3_RECIPIENTS) subject = f"Daily Revenue Report - {target_date}" report_title = f"Daily Revenue Report ({target_date})" selected_departments = sorted(rng.sample(_TASK_3_DEPARTMENTS, k=4)) expected_rows: list[dict[str, Any]] = [] warehouse_rows: list[dict[str, Any]] = [] row_id = 1 for offset in (-2, -1, 0, 1): report_date = (base_date + timedelta(days=offset)).isoformat() for department in selected_departments: if offset == 0: revenue = round(rng.uniform(12_000.0, 95_000.0), 2) expenses = round(rng.uniform(8_000.0, revenue + 18_000.0), 2) headcount = rng.randint(8, 48) seeded_row = { "department": department, "revenue": revenue, "expenses": expenses, "headcount": headcount, } expected_rows.append(seeded_row) else: revenue = round(rng.uniform(9_000.0, 90_000.0), 2) expenses = round(rng.uniform(7_000.0, revenue + 14_000.0), 2) headcount = rng.randint(8, 48) warehouse_rows.append( { "id": row_id, "report_date": report_date, "department": department, "revenue": revenue, "expenses": expenses, "headcount": headcount, } ) row_id += 1 description = ( f"Extract the daily report for date '{target_date}' from the 'daily_reports' table, " "repair the broken 'format_report.py' script, save the exact extracted rows to " f"'report_data.json', run the script with that file, and send the generated report " f"to '{recipient}' with subject '{subject}'. The grader expects the exact seeded slice, " "including headcount." ) return Task3Scenario( description=description, target_date=target_date, recipient=recipient, subject=subject, report_title=report_title, all_rows=tuple(warehouse_rows), expected_rows=tuple( normalize_task_3_rows(expected_rows, require_headcount=True) ), broken_script=_render_broken_format_report_script(target_date), ) def _sample_task_2_batch( rng: random.Random, *, batch_index: int ) -> tuple[dict[str, Any], ...]: def make_record( suffix: str, *, status: str, amount_cents: int, priority: int, ) -> dict[str, Any]: return { "order_id": f"ORD-{batch_index:02d}-{suffix}", "status": status, "amount_cents": amount_cents, "priority": priority, "region": rng.choice(_TASK_2_REGIONS), } records = [ make_record( "normal", status=_TASK_2_READY_STATUS, amount_cents=rng.randrange(12_125, 28_975, 25), priority=rng.randint(2, 6), ), make_record( "priority", status=_TASK_2_READY_STATUS, amount_cents=rng.randrange(13_175, 32_775, 25), priority=rng.randint(8, 10), ), make_record( "amount", status=_TASK_2_READY_STATUS, amount_cents=rng.randrange(50_025, 88_975, 25), priority=rng.randint(2, 6), ), make_record( "queued", status=rng.choice(_TASK_2_NON_READY_STATUSES[:2]), amount_cents=rng.randrange(18_125, 42_975, 25), priority=rng.randint(4, 9), ), make_record( "drop", status=_TASK_2_READY_STATUS, amount_cents=-rng.randrange(125, 2_975, 25), priority=rng.randint(8, 10), ), ] if batch_index % 2 == 0: records.append( make_record( "hold", status=rng.choice(_TASK_2_NON_READY_STATUSES), amount_cents=rng.randrange(24_125, 48_975, 25), priority=rng.randint(1, 7), ) ) rng.shuffle(records) return tuple(records) def _render_broken_pipeline_script( visible_batch: tuple[dict[str, Any], ...] ) -> str: return textwrap.dedent( f'''\ import json def process_data_stream(payloads): """ Normalize downstream billing candidates. Keep only records whose status is "ready" and whose amount_cents is positive. Convert amount_cents to amount_usd rounded to 2 decimals. Mark priority_band as "high" when priority >= 8 or amount_usd >= 500.00. Return rows sorted by amount_usd descending, then order_id ascending. """ processed_records = [] for payload in payloads: if payload["status"] == "failed" or payload["amount_cents"] <= 0: continue amount_usd = round(payload["amount_cents"] // 100, 2) priority_band = ( "high" if payload["priority"] >= 8 and amount_usd >= 500.0 else "normal" ) processed_records.append( {{ "order_id": payload["order_id"], "region": payload["region"], "amount_usd": amount_usd, "priority_band": priority_band, }} ) processed_records.sort(key=lambda item: (item["amount_usd"], item["order_id"])) return processed_records if __name__ == "__main__": mock_batch = {list(visible_batch)!r} print(json.dumps(process_data_stream(mock_batch), indent=2, sort_keys=True)) ''' ).lstrip() def _render_broken_format_report_script(target_date: str) -> str: title = f"=== Daily Revenue Report ({target_date}) ===" return textwrap.dedent( f'''\ import json import sys def format_report(input_path): """Reads extracted data from JSON and produces a formatted stakeholder report.""" with open(input_path, encoding="utf-8") as f: records = json.load(f) lines = ["{title}", ""] total_revenue = 0 for rec in records: dept = rec["department"] rev = int(rec["revenue"]) # BUG 1: int() truncates decimal precision exp = rec["expenses"] net = rev - exp lines.append(f"Department: {{dept}}") lines.append(f" Revenue: ${{rev}}") lines.append(f" Expenses: ${{exp:.2f}}") lines.append(f" Net: ${{net:.2f}}") lines.append("") total_revenue += rev lines.append(f"Total Revenue: ${{total_revenue}}") lines.append("=== End of Report ===") output = "\\n".join(lines) print(output) return output if __name__ == "__main__": if len(sys.argv) < 2: print("Usage: python format_report.py ", file=sys.stderr) sys.exit(1) format_report(sys.argv[0]) # BUG 2: should be sys.argv[1] ''' ).lstrip()