Spaces:
Sleeping
Sleeping
| """Seeded task metadata and deterministic scenario builders for DataOpsEnv.""" | |
| from __future__ import annotations | |
| import random | |
| import re | |
| import textwrap | |
| from dataclasses import dataclass | |
| from datetime import date, timedelta | |
| from typing import Any, Iterable | |
# Identifiers of the tasks defined in this module; each one is also a key of
# TASK_METADATA and a branch of build_task_scenario().
TASK_IDS = [
    "task_1_easy_anomaly",
    "task_2_medium_syntax",
    "task_3_hard_e2e",
]
@dataclass(frozen=True)
class SQLPolicy:
    """SQL restrictions attached to a task.

    Declared as a dataclass so it can be constructed with keyword arguments,
    which is how ``TASK_SQL_POLICIES`` builds its entries. Frozen because the
    policy is a module-level constant and both fields are immutable values.
    """

    # Leading SQL keywords a task may use (e.g. "SELECT", "DELETE").
    allowed_commands: frozenset[str]
    # Table name associated with the task; how it is enforced is decided by
    # the consumer of this policy (not visible in this module).
    required_table: str
@dataclass(frozen=True)
class TaskMetadata:
    """Descriptive information about one benchmark task.

    Declared as a dataclass so it can be constructed with keyword arguments,
    which is how ``TASK_METADATA`` builds its entries. Frozen because the
    instances are shared module-level constants.
    """

    # Identifier of the task (matches its key in TASK_METADATA).
    task_id: str
    # Human-readable title.
    name: str
    # Difficulty label such as "easy", "medium" or "hard".
    difficulty: str
    # One-sentence summary of what the agent has to do.
    short_description: str
    # What capability the task is meant to exercise.
    benchmark_focus: str
    # Names of the action types available in this task.
    allowed_actions: tuple[str, ...]
@dataclass(frozen=True)
class Task1Scenario:
    """Seeded data for the "delete corrupted rows" task.

    Declared as a dataclass so the scenario builder can construct it with
    keyword arguments.
    """

    # Task prompt shown for this seeded episode.
    description: str
    # Every generated row, valid and corrupted, in table order.
    all_rows: tuple[dict[str, Any], ...]
    # The rows that should remain after the cleanup (the valid ones).
    expected_rows: tuple[dict[str, Any], ...]
    # Sorted ids of the rows whose amount is None.
    corrupted_row_ids: tuple[int, ...]
@dataclass(frozen=True)
class Task2Scenario:
    """Seeded data for the "repair the pipeline script" task.

    Declared as a dataclass so the scenario builder can construct it with
    keyword arguments.
    """

    # Task prompt shown for this seeded episode.
    description: str
    # Input batch embedded in the broken script as its demo data.
    visible_batch: tuple[dict[str, Any], ...]
    # Correct output for the visible batch.
    visible_expected: tuple[dict[str, Any], ...]
    # Additional input batches that are not embedded in the script.
    hidden_cases: tuple[tuple[dict[str, Any], ...], ...]
    # Correct output for each hidden batch, in the same order.
    hidden_expected: tuple[tuple[dict[str, Any], ...], ...]
    # Source text of the intentionally defective script.
    broken_script: str
@dataclass(frozen=True)
class Task3Scenario:
    """Seeded data for the end-to-end revenue-report task.

    Declared as a dataclass so the scenario builder can construct it with
    keyword arguments.
    """

    # Task prompt shown for this seeded episode.
    description: str
    # ISO date (YYYY-MM-DD) whose rows must be extracted.
    target_date: str
    # Address the report has to be sent to.
    recipient: str
    # Required e-mail subject line.
    subject: str
    # Report heading text, "Daily Revenue Report (<target_date>)".
    report_title: str
    # Every generated warehouse row, across all seeded dates.
    all_rows: tuple[dict[str, Any], ...]
    # Normalised rows belonging to target_date.
    expected_rows: tuple[dict[str, Any], ...]
    # Source text of the intentionally defective formatter script.
    broken_script: str
@dataclass(frozen=True)
class TaskScenarioBundle:
    """Result of building a scenario: common fields plus one task-specific payload.

    Declared as a dataclass so ``build_task_scenario`` can construct it with
    keyword arguments and rely on the ``None`` defaults for the payload slots
    it does not fill.
    """

    # Identifier of the task the bundle was built for.
    task_id: str
    # Seed actually used to generate the scenario.
    seed: int
    # Task prompt, copied from the task-specific scenario.
    description: str
    # Exactly one of the following is populated by build_task_scenario().
    task_1: Task1Scenario | None = None
    task_2: Task2Scenario | None = None
    task_3: Task3Scenario | None = None
# Descriptive metadata for every task, keyed by task id.
TASK_METADATA = {
    "task_1_easy_anomaly": TaskMetadata(
        task_id="task_1_easy_anomaly",
        name="Delete Corrupted Transaction Rows",
        difficulty="easy",
        short_description=(
            "Inspect a transaction table and remove only the seeded rows with NULL amounts while preserving legitimate non-null edge values."
        ),
        benchmark_focus="Careful data cleanup without collateral damage.",
        allowed_actions=("ExecuteSQL",),
    ),
    "task_2_medium_syntax": TaskMetadata(
        task_id="task_2_medium_syntax",
        name="Repair Seeded Pipeline Script",
        difficulty="medium",
        short_description=(
            "Repair a seeded ETL normalization script and verify it on visible and hidden seeded batches."
        ),
        benchmark_focus="Code reading, precise repair, and generalization beyond the demo batch.",
        allowed_actions=("ReadFile", "WriteFile", "RunScript"),
    ),
    "task_3_hard_e2e": TaskMetadata(
        task_id="task_3_hard_e2e",
        name="Resolve Revenue Reporting Incident",
        difficulty="hard",
        short_description=(
            "Extract a seeded reporting slice, repair the formatter, and send the exact generated report."
        ),
        benchmark_focus="End-to-end data extraction, file repair, and communication with provenance.",
        allowed_actions=("ExecuteSQL", "ReadFile", "WriteFile", "RunScript", "SendEmail"),
    ),
}
# Task id -> short description, derived from TASK_METADATA.
TASK_DESCRIPTIONS = {
    task_id: metadata.short_description for task_id, metadata in TASK_METADATA.items()
}
# Per-task sets of file names for the write action. Task 1 has no file access.
# NOTE(review): enforcement happens outside this module - confirm with the environment code.
TASK_ALLOWED_WRITE_FILES = {
    "task_1_easy_anomaly": frozenset(),
    "task_2_medium_syntax": frozenset({"broken_pipeline.py"}),
    "task_3_hard_e2e": frozenset({"format_report.py", "report_data.json"}),
}
# Per-task sets of file names for the run action; note that task 3 lists only
# the formatter script here, not the JSON data file.
TASK_ALLOWED_RUN_FILES = {
    "task_1_easy_anomaly": frozenset(),
    "task_2_medium_syntax": frozenset({"broken_pipeline.py"}),
    "task_3_hard_e2e": frozenset({"format_report.py"}),
}
# Task ids for which e-mail is enabled.
TASK_EMAIL_ENABLED = frozenset({"task_3_hard_e2e"})
# Per-task sets of file names for the read action.
TASK_ALLOWED_READ_FILES = {
    "task_1_easy_anomaly": frozenset(),
    "task_2_medium_syntax": frozenset({"broken_pipeline.py"}),
    "task_3_hard_e2e": frozenset({"format_report.py", "report_data.json"}),
}
# SQL policy per task. Task 2 has no entry (its allowed actions contain no SQL action).
TASK_SQL_POLICIES = {
    "task_1_easy_anomaly": SQLPolicy(
        allowed_commands=frozenset({"SELECT", "DELETE"}),
        required_table="transactions",
    ),
    "task_3_hard_e2e": SQLPolicy(
        allowed_commands=frozenset({"SELECT", "WITH"}),
        required_table="daily_reports",
    ),
}
# Matches one department section of a report: a "Department:" line followed by
# "Revenue:", "Expenses:" and "Net:" lines, each carrying a "$"-prefixed,
# optionally negative decimal amount. Leading whitespace before the three
# amount labels is optional.
_REPORT_RECORD_RE = re.compile(
    r"Department:\s*(?P<department>[^\n]+)\n"
    r"\s*Revenue:\s*\$(?P<revenue>-?\d+(?:\.\d+)?)\n"
    r"\s*Expenses:\s*\$(?P<expenses>-?\d+(?:\.\d+)?)\n"
    r"\s*Net:\s*\$(?P<net>-?\d+(?:\.\d+)?)",
    re.MULTILINE,
)
# Matches the "Total Revenue: $<amount>" summary line.
_REPORT_TOTAL_RE = re.compile(r"Total Revenue:\s*\$(?P<total>-?\d+(?:\.\d+)?)")
# Status labels sampled for valid and corrupted Task 1 rows respectively.
_TASK_1_VALID_STATUSES = ("success", "settled", "approved", "completed")
_TASK_1_CORRUPTED_STATUSES = ("pending", "retrying", "failed", "queued")
# The only Task 2 status that the expected output keeps.
_TASK_2_READY_STATUS = "ready"
# Task 2 statuses sampled for records that must be filtered out.
_TASK_2_NON_READY_STATUSES = ("queued", "hold", "failed")
# Region labels sampled for Task 2 records.
_TASK_2_REGIONS = ("us-east", "eu-west", "ap-south", "sa-east")
# Candidate e-mail recipients for Task 3; one is chosen per seed.
_TASK_3_RECIPIENTS = (
    "bhavik@example.com",
    "marta@example.com",
    "ops-lead@example.com",
    "finance-review@example.com",
)
# Department pool for Task 3; four are sampled per seed.
_TASK_3_DEPARTMENTS = (
    "Engineering",
    "Sales",
    "Marketing",
    "Operations",
    "Support",
    "Finance",
)
def task_manifest_entries() -> list[dict[str, Any]]:
    """Return one manifest dictionary per ``TASK_METADATA`` entry, in mapping order.

    Each dictionary exposes the task's id, name, difficulty, description
    (the short description), benchmark focus and allowed actions; the
    actions tuple is copied into a fresh list.
    """
    manifest: list[dict[str, Any]] = []
    for meta in TASK_METADATA.values():
        entry: dict[str, Any] = {}
        entry["id"] = meta.task_id
        entry["name"] = meta.name
        entry["difficulty"] = meta.difficulty
        entry["description"] = meta.short_description
        entry["benchmark_focus"] = meta.benchmark_focus
        entry["allowed_actions"] = list(meta.allowed_actions)
        manifest.append(entry)
    return manifest
def build_task_scenario(task_id: str, seed: int | None = None) -> TaskScenarioBundle:
    """Build the seeded scenario bundle for ``task_id``.

    A missing ``seed`` is treated as ``0``; any other value is coerced with
    ``int``. The scenario produced by the matching private builder is stored
    in the bundle slot belonging to that task, and its description is copied
    to the bundle.

    Raises:
        KeyError: If ``task_id`` is not one of the three known task ids.
    """
    resolved_seed = int(seed) if seed is not None else 0
    # (task id, scenario builder, TaskScenarioBundle field that receives the result)
    routes = (
        ("task_1_easy_anomaly", _build_task_1_scenario, "task_1"),
        ("task_2_medium_syntax", _build_task_2_scenario, "task_2"),
        ("task_3_hard_e2e", _build_task_3_scenario, "task_3"),
    )
    for known_id, builder, slot in routes:
        if task_id == known_id:
            scenario = builder(resolved_seed)
            payload = {slot: scenario}
            return TaskScenarioBundle(
                task_id=task_id,
                seed=resolved_seed,
                description=scenario.description,
                **payload,
            )
    raise KeyError(f"Unknown task_id: {task_id}")
def normalize_task_3_rows(
    rows: Iterable[dict[str, Any]], *, require_headcount: bool = False
) -> list[dict[str, Any]]:
    """Normalise extracted rows for deterministic comparison.

    Each row is reduced to ``department`` (str), ``revenue`` and ``expenses``
    (floats rounded to 2 decimals) and ``headcount`` (int, or ``None`` when
    the value is missing or an empty string). The result is sorted by
    department.

    Args:
        rows: Row mappings to normalise. Input that is not iterable, or that
            contains anything other than mappings with a ``get`` method, is
            treated as malformed.
        require_headcount: When true, a row without a headcount makes the
            whole input invalid.

    Returns:
        The normalised rows, or an empty list as soon as anything is
        malformed (missing key, unconvertible value, non-mapping row,
        non-iterable input, or a missing headcount when it is required).
    """
    try:
        row_iter = iter(rows)
    except TypeError:
        # e.g. None or a bare number loaded from a JSON file.
        return []

    normalised: list[dict[str, Any]] = []
    for row in row_iter:
        try:
            hc_raw = row.get("headcount")
            if hc_raw is None or hc_raw == "":
                if require_headcount:
                    return []
                headcount: int | None = None
            else:
                headcount = int(hc_raw)
            normalised.append(
                {
                    "department": str(row["department"]),
                    "revenue": round(float(row["revenue"]), 2),
                    "expenses": round(float(row["expenses"]), 2),
                    "headcount": headcount,
                }
            )
        except (AttributeError, KeyError, TypeError, ValueError):
            # AttributeError covers rows that are not mappings (strings,
            # lists, numbers), which have no ``get``.
            return []
    return sorted(normalised, key=lambda item: item["department"])
def normalize_task_2_output_rows(rows: Any) -> list[dict[str, Any]]:
    """Validate and canonicalise Task 2 output rows, keeping their order.

    Every row is reduced to ``order_id``, ``region`` and ``priority_band``
    as strings plus ``amount_usd`` as a float rounded to 2 decimals. The
    input order is preserved so that callers can check sorting.

    Returns:
        The canonical rows, or an empty list when ``rows`` is not a list,
        any row is not a dict, a field is missing or unconvertible, or a
        priority band is neither "high" nor "normal".
    """
    if not isinstance(rows, list):
        return []
    accepted_bands = {"high", "normal"}
    cleaned: list[dict[str, Any]] = []
    for entry in rows:
        if not isinstance(entry, dict):
            return []
        try:
            candidate = {
                "order_id": str(entry["order_id"]),
                "region": str(entry["region"]),
                "amount_usd": round(float(entry["amount_usd"]), 2),
                "priority_band": str(entry["priority_band"]),
            }
        except (KeyError, TypeError, ValueError):
            return []
        if candidate["priority_band"] not in accepted_bands:
            return []
        cleaned.append(candidate)
    return cleaned
def build_task_2_expected(
    batch: Iterable[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Compute the reference output of the Task 2 pipeline for ``batch``.

    Records are kept only when their status equals the ready status and
    their amount in cents is positive. Kept records carry the amount in USD
    (cents / 100, rounded to 2 decimals) and a ``priority_band`` of "high"
    when priority >= 8 or amount_usd >= 500.0, otherwise "normal". The
    result is sorted by amount descending, then order id ascending.

    Returns an empty list if any record (kept or not) has a missing or
    unconvertible status, amount or priority, or if a kept record lacks
    ``order_id`` or ``region``.
    """
    kept: list[dict[str, Any]] = []
    for entry in batch:
        try:
            state = str(entry["status"])
            cents = int(entry["amount_cents"])
            rank = int(entry["priority"])
            dollars = round(cents / 100.0, 2)
            if state == _TASK_2_READY_STATUS and cents > 0:
                order_id = str(entry["order_id"])
                region = str(entry["region"])
                band = "high" if (rank >= 8 or dollars >= 500.0) else "normal"
                kept.append(
                    {
                        "order_id": order_id,
                        "region": region,
                        "amount_usd": dollars,
                        "priority_band": band,
                    }
                )
        except (KeyError, TypeError, ValueError):
            return []
    return sorted(kept, key=lambda item: (-item["amount_usd"], item["order_id"]))
def task_3_data_matches_expected(
    rows: list[dict[str, Any]],
    expected_rows: Iterable[dict[str, Any]],
    *,
    require_headcount: bool,
) -> bool:
    """Tell whether ``rows`` equals the normalised form of ``expected_rows``.

    Only ``expected_rows`` is normalised here; ``rows`` is compared as given,
    so callers are expected to pass already-normalised rows.
    """
    return rows == normalize_task_3_rows(
        expected_rows, require_headcount=require_headcount
    )
def task_3_headcount_fully_matches(
    rows: list[dict[str, Any]], expected_rows: Iterable[dict[str, Any]]
) -> bool:
    """Tell whether ``rows`` equals ``expected_rows`` normalised with headcount required.

    ``rows`` is compared as given; only ``expected_rows`` is normalised.
    """
    reference = normalize_task_3_rows(expected_rows, require_headcount=True)
    matches = rows == reference
    return matches
def build_task_3_report(rows: Iterable[dict[str, Any]], target_date: str) -> str:
    """Render the reference revenue report for ``rows``.

    Rows are normalised with headcount required (so invalid input yields a
    report without department sections). The report has a dated header, one
    section per department with revenue, expenses and net (revenue minus
    expenses) formatted to 2 decimals, a total-revenue line and a closing
    marker, joined with newlines.
    """
    out = [f"=== Daily Revenue Report ({target_date}) ===", ""]
    running_total = 0.0
    for entry in normalize_task_3_rows(rows, require_headcount=True):
        rev = float(entry["revenue"])
        exp = float(entry["expenses"])
        out.extend(
            (
                f"Department: {entry['department']}",
                f" Revenue: ${rev:.2f}",
                f" Expenses: ${exp:.2f}",
                f" Net: ${rev - exp:.2f}",
                "",
            )
        )
        # Accumulate one value at a time to keep the exact float result.
        running_total += rev
    out += [f"Total Revenue: ${running_total:.2f}", "=== End of Report ==="]
    return "\n".join(out)
| def extract_task_3_report_block(text: str, target_date: str) -> str | None: | |
| raw = text.replace("\r\n", "\n") | |
| start_marker = f"=== Daily Revenue Report ({target_date}) ===" | |
| start = raw.find(start_marker) | |
| end_marker = "=== End of Report ===" | |
| end = raw.find(end_marker) | |
| if start == -1 or end == -1 or end < start: | |
| return None | |
| return raw[start : end + len(end_marker)].strip() | |
def parse_task_3_report(text: str, target_date: str) -> dict[str, Any] | None:
    """Parse the report for ``target_date`` found in ``text``.

    Returns ``None`` when no report block can be extracted or the block has
    no total-revenue line. Otherwise returns a dict with ``records`` (one
    entry per department section, sorted by department, with amounts rounded
    to 2 decimals and ``headcount`` always ``None`` because the report does
    not carry it) and ``total_revenue``.
    """
    block = extract_task_3_report_block(text, target_date)
    if block is None:
        return None
    total_hit = _REPORT_TOTAL_RE.search(block)
    if total_hit is None:
        return None

    def amount(hit: re.Match[str], field: str) -> float:
        # All captured amounts are plain decimals, so float() cannot fail.
        return round(float(hit.group(field)), 2)

    parsed_records = [
        {
            "department": hit.group("department").strip(),
            "revenue": amount(hit, "revenue"),
            "expenses": amount(hit, "expenses"),
            "headcount": None,
            "net": amount(hit, "net"),
        }
        for hit in _REPORT_RECORD_RE.finditer(block)
    ]
    parsed_records.sort(key=lambda item: item["department"])
    return {
        "records": parsed_records,
        "total_revenue": round(float(total_hit.group("total")), 2),
    }
def report_matches_expected(
    text: str, expected_rows: Iterable[dict[str, Any]], target_date: str
) -> bool:
    """Tell whether the report in ``text`` exactly reflects ``expected_rows``.

    The parsed department records must equal the expected rows (normalised
    with headcount required, then extended with the rounded net and a
    ``None`` headcount), and the parsed total must equal the rounded sum of
    expected revenues. An unparseable report never matches.
    """
    parsed = parse_task_3_report(text, target_date)
    if parsed is None:
        return False
    reference = normalize_task_3_rows(expected_rows, require_headcount=True)
    wanted_records = []
    for entry in reference:
        net = round(float(entry["revenue"]) - float(entry["expenses"]), 2)
        wanted_records.append(
            {
                "department": entry["department"],
                "revenue": entry["revenue"],
                "expenses": entry["expenses"],
                "headcount": None,
                "net": net,
            }
        )
    wanted_total = round(sum(float(entry["revenue"]) for entry in reference), 2)
    records_ok = parsed["records"] == wanted_records
    return records_ok and parsed["total_revenue"] == wanted_total
def task_3_semantic_match_fraction_rows(
    rows: list[dict[str, Any]], expected_rows: Iterable[dict[str, Any]]
) -> float:
    """Return the fraction of expected departments that ``rows`` gets right.

    A department counts as matched when some row carries its name with the
    expected revenue and expenses (headcount is ignored). Each department is
    counted at most once, so repeated rows cannot push the result above 1.0.
    Rows that are not dicts, or whose department is not a string, are
    skipped.

    Returns:
        ``matched / len(expected)``; ``0.0`` when ``rows`` is empty or the
        expected rows normalise to nothing.
    """
    if not rows:
        return 0.0
    expected = normalize_task_3_rows(expected_rows, require_headcount=False)
    if not expected:
        return 0.0
    exp_by_dept = {row["department"]: row for row in expected}
    matched_departments: set[str] = set()
    for row in rows:
        if not isinstance(row, dict):
            continue
        department = row.get("department")
        # Expected keys are always strings, so anything else cannot match
        # (and an unhashable value would break the lookup).
        if not isinstance(department, str) or department not in exp_by_dept:
            continue
        expected_row = exp_by_dept[department]
        if (
            row.get("revenue") == expected_row["revenue"]
            and row.get("expenses") == expected_row["expenses"]
        ):
            matched_departments.add(department)
    return len(matched_departments) / len(expected)
def task_3_semantic_match_fraction_parsed(
    parsed: dict[str, Any] | None, expected_rows: Iterable[dict[str, Any]]
) -> float:
    """Return the fraction of expected departments present in a parsed report.

    ``parsed`` is a dict with a ``records`` list, or ``None``. A department
    counts as matched when some record carries its name with the expected
    revenue and expenses. Each department is counted at most once, so a
    report that repeats a section cannot score above 1.0. Records that are
    not dicts, or whose department is not a string, are skipped.

    Returns:
        ``matched / len(expected)``; ``0.0`` when there is nothing parsed,
        no records, or the expected rows normalise to nothing.
    """
    if not parsed or not parsed.get("records"):
        return 0.0
    expected = normalize_task_3_rows(expected_rows, require_headcount=False)
    if not expected:
        return 0.0
    exp_by_dept = {row["department"]: row for row in expected}
    matched_departments: set[str] = set()
    for record in parsed["records"]:
        if not isinstance(record, dict):
            continue
        department = record.get("department")
        # Expected keys are always strings, so anything else cannot match.
        if not isinstance(department, str) or department not in exp_by_dept:
            continue
        expected_row = exp_by_dept[department]
        if (
            record.get("revenue") == expected_row["revenue"]
            and record.get("expenses") == expected_row["expenses"]
        ):
            matched_departments.add(department)
    return len(matched_departments) / len(expected)
def task_3_semantic_match_fraction_text(
    text: str, expected_rows: Iterable[dict[str, Any]], target_date: str
) -> float:
    """Parse the report in ``text`` and score it against ``expected_rows``.

    Combines ``parse_task_3_report`` with
    ``task_3_semantic_match_fraction_parsed``.
    """
    parsed_report = parse_task_3_report(text, target_date)
    return task_3_semantic_match_fraction_parsed(parsed_report, expected_rows)
def _build_task_1_scenario(seed: int) -> Task1Scenario:
    """Generate the Task 1 transaction table for ``seed``.

    Produces 3-5 valid rows and 2-3 corrupted rows (amount ``None``), then
    shuffles them and assigns ids starting at 1. The first valid row is
    forced to a zero amount and the second to a small negative amount so the
    table contains legitimate edge values that must survive the cleanup.
    All randomness comes from a ``random.Random`` seeded with
    ``"task-1:<seed>"``, so the result is reproducible per seed.
    """
    rng = random.Random(f"task-1:{seed}")
    valid_count = 3 + rng.randrange(3)
    corrupted_count = 2 + rng.randrange(2)

    # Within each literal the amount is drawn before the status.
    valid_pool = [
        {
            "kind": "valid",
            "user_id": 1000 + seed * 10 + slot,
            "amount": round(rng.uniform(75.0, 975.0), 2),
            "status": rng.choice(_TASK_1_VALID_STATUSES),
        }
        for slot in range(valid_count)
    ]
    if valid_pool:
        # Zero-value reconciliation.
        valid_pool[0].update(amount=0.0, status="settled")
    if len(valid_pool) > 1:
        # Negative refund adjustment: a tenth of the sampled amount.
        refund = -round(float(valid_pool[1]["amount"]) / 10.0, 2)
        valid_pool[1].update(amount=refund, status="approved")

    corrupted_pool = [
        {
            "kind": "corrupted",
            "user_id": 2000 + seed * 10 + slot,
            "amount": None,
            "status": rng.choice(_TASK_1_CORRUPTED_STATUSES),
        }
        for slot in range(corrupted_count)
    ]

    shuffled = valid_pool + corrupted_pool
    rng.shuffle(shuffled)

    table: list[dict[str, Any]] = []
    survivors: list[dict[str, Any]] = []
    bad_ids: list[int] = []
    for position, template in enumerate(shuffled, start=1):
        record = {
            "id": position,
            "user_id": int(template["user_id"]),
            "amount": template["amount"],
            "status": str(template["status"]),
        }
        table.append(record)
        if template["kind"] == "valid":
            survivors.append(record)
        else:
            bad_ids.append(position)

    description = (
        "Find and delete all corrupted records (rows with NULL amounts) from the "
        f"'transactions' table. This seeded episode contains {corrupted_count} corrupted "
        f"rows mixed with {valid_count} valid rows. Only NULL amounts are corrupted; "
        "legitimate zero-value reconciliations and negative refund adjustments may also "
        "appear and must be preserved exactly."
    )
    return Task1Scenario(
        description=description,
        all_rows=tuple(table),
        expected_rows=tuple(survivors),
        corrupted_row_ids=tuple(sorted(bad_ids)),
    )
def _build_task_2_scenario(seed: int) -> Task2Scenario:
    """Generate the Task 2 batches, expected outputs and broken script for ``seed``.

    One visible batch (index 0) and six hidden batches (indices 1-6) are
    sampled, in that order, from a ``random.Random`` seeded with
    ``"task-2:<seed>"``. The broken script embeds only the visible batch.
    """
    rng = random.Random(f"task-2:{seed}")
    demo_batch = _sample_task_2_batch(rng, batch_index=0)
    unseen_batches = tuple(
        [_sample_task_2_batch(rng, batch_index=number) for number in range(1, 7)]
    )
    demo_expected = tuple(build_task_2_expected(demo_batch))
    unseen_expected = tuple(
        [tuple(build_task_2_expected(unseen)) for unseen in unseen_batches]
    )
    script_source = _render_broken_pipeline_script(demo_batch)
    description = (
        "The script 'broken_pipeline.py' prepares downstream billing candidates from "
        "seeded order records. Repair it so it keeps only ready records with positive "
        "amounts, converts cents to USD, flags high priority when priority >= 8 or "
        "amount_usd >= 500.00, and returns rows sorted by amount_usd descending then "
        "order_id ascending. The grader checks the visible demo batch and additional "
        "unseen seeded batches."
    )
    return Task2Scenario(
        description=description,
        visible_batch=demo_batch,
        visible_expected=demo_expected,
        hidden_cases=unseen_batches,
        hidden_expected=unseen_expected,
        broken_script=script_source,
    )
def _build_task_3_scenario(seed: int) -> Task3Scenario:
    """Generate the Task 3 warehouse table, expected slice and broken script.

    The target date is 2025-03-25 plus 0-6 days. Four departments are
    sampled and given one row for each of four consecutive dates (two days
    before the target through one day after). Rows on the target date use
    slightly different value ranges and form the expected slice. All
    randomness comes from a ``random.Random`` seeded with ``"task-3:<seed>"``.
    """
    rng = random.Random(f"task-3:{seed}")
    base_date = date(2025, 3, 25) + timedelta(days=rng.randrange(0, 7))
    target_date = base_date.isoformat()
    recipient = rng.choice(_TASK_3_RECIPIENTS)
    subject = f"Daily Revenue Report - {target_date}"
    report_title = f"Daily Revenue Report ({target_date})"
    departments = sorted(rng.sample(_TASK_3_DEPARTMENTS, k=4))

    target_slice: list[dict[str, Any]] = []
    warehouse: list[dict[str, Any]] = []
    for day_offset in (-2, -1, 0, 1):
        on_target = day_offset == 0
        # (revenue low, revenue high, expenses low, expenses headroom over revenue)
        if on_target:
            rev_lo, rev_hi, exp_lo, exp_slack = 12_000.0, 95_000.0, 8_000.0, 18_000.0
        else:
            rev_lo, rev_hi, exp_lo, exp_slack = 9_000.0, 90_000.0, 7_000.0, 14_000.0
        report_date = (base_date + timedelta(days=day_offset)).isoformat()
        for department in departments:
            # Draw order (revenue, expenses, headcount) must not change.
            revenue = round(rng.uniform(rev_lo, rev_hi), 2)
            expenses = round(rng.uniform(exp_lo, revenue + exp_slack), 2)
            headcount = rng.randint(8, 48)
            if on_target:
                target_slice.append(
                    {
                        "department": department,
                        "revenue": revenue,
                        "expenses": expenses,
                        "headcount": headcount,
                    }
                )
            warehouse.append(
                {
                    "id": len(warehouse) + 1,
                    "report_date": report_date,
                    "department": department,
                    "revenue": revenue,
                    "expenses": expenses,
                    "headcount": headcount,
                }
            )

    description = (
        f"Extract the daily report for date '{target_date}' from the 'daily_reports' table, "
        "repair the broken 'format_report.py' script, save the exact extracted rows to "
        f"'report_data.json', run the script with that file, and send the generated report "
        f"to '{recipient}' with subject '{subject}'. The grader expects the exact seeded slice, "
        "including headcount."
    )
    normalised_slice = normalize_task_3_rows(target_slice, require_headcount=True)
    return Task3Scenario(
        description=description,
        target_date=target_date,
        recipient=recipient,
        subject=subject,
        report_title=report_title,
        all_rows=tuple(warehouse),
        expected_rows=tuple(normalised_slice),
        broken_script=_render_broken_format_report_script(target_date),
    )
def _sample_task_2_batch(
    rng: random.Random, *, batch_index: int
) -> tuple[dict[str, Any], ...]:
    """Sample one shuffled batch of Task 2 order records from ``rng``.

    Every batch contains a plain ready record, a ready record with high
    priority, a ready record with a large amount, a non-ready record and a
    ready record with a negative amount. Batches with an even index get one
    more non-ready record. Order ids have the form
    ``ORD-<batch_index:02d>-<suffix>``.
    """

    def _order(tag: str, *, status: str, amount_cents: int, priority: int) -> dict[str, Any]:
        # The region is drawn last, after the caller's arguments were sampled.
        record: dict[str, Any] = {
            "order_id": f"ORD-{batch_index:02d}-{tag}",
            "status": status,
            "amount_cents": amount_cents,
            "priority": priority,
        }
        record["region"] = rng.choice(_TASK_2_REGIONS)
        return record

    batch: list[dict[str, Any]] = []
    # Kept, normal band.
    batch.append(
        _order(
            "normal",
            status=_TASK_2_READY_STATUS,
            amount_cents=rng.randrange(12_125, 28_975, 25),
            priority=rng.randint(2, 6),
        )
    )
    # Kept, high band through priority.
    batch.append(
        _order(
            "priority",
            status=_TASK_2_READY_STATUS,
            amount_cents=rng.randrange(13_175, 32_775, 25),
            priority=rng.randint(8, 10),
        )
    )
    # Kept, high band through amount (at least 500.25 USD).
    batch.append(
        _order(
            "amount",
            status=_TASK_2_READY_STATUS,
            amount_cents=rng.randrange(50_025, 88_975, 25),
            priority=rng.randint(2, 6),
        )
    )
    # Dropped: status is one of the first two non-ready statuses.
    batch.append(
        _order(
            "queued",
            status=rng.choice(_TASK_2_NON_READY_STATUSES[:2]),
            amount_cents=rng.randrange(18_125, 42_975, 25),
            priority=rng.randint(4, 9),
        )
    )
    # Dropped: ready but with a negative amount.
    batch.append(
        _order(
            "drop",
            status=_TASK_2_READY_STATUS,
            amount_cents=-rng.randrange(125, 2_975, 25),
            priority=rng.randint(8, 10),
        )
    )
    if not batch_index % 2:
        # Dropped: any non-ready status.
        batch.append(
            _order(
                "hold",
                status=rng.choice(_TASK_2_NON_READY_STATUSES),
                amount_cents=rng.randrange(24_125, 48_975, 25),
                priority=rng.randint(1, 7),
            )
        )
    rng.shuffle(batch)
    return tuple(batch)
def _render_broken_pipeline_script(
    visible_batch: tuple[dict[str, Any], ...]
) -> str:
    """Return the source of the intentionally defective Task 2 script.

    The visible batch is embedded (via ``repr``) as the script's demo input.
    Compared with the contract stated in the generated script's own
    docstring, the template deliberately:

    * drops only records whose status is "failed" instead of keeping only
      "ready" records;
    * converts cents with integer division (``// 100``), losing the cents;
    * requires both the priority and the amount condition (``and``) for the
      "high" band instead of either one;
    * sorts by amount ascending rather than descending.

    These defects are the task and must stay as they are.
    """
    return textwrap.dedent(
        f'''\
        import json
        def process_data_stream(payloads):
            """
            Normalize downstream billing candidates.
            Keep only records whose status is "ready" and whose amount_cents is positive.
            Convert amount_cents to amount_usd rounded to 2 decimals.
            Mark priority_band as "high" when priority >= 8 or amount_usd >= 500.00.
            Return rows sorted by amount_usd descending, then order_id ascending.
            """
            processed_records = []
            for payload in payloads:
                if payload["status"] == "failed" or payload["amount_cents"] <= 0:
                    continue
                amount_usd = round(payload["amount_cents"] // 100, 2)
                priority_band = (
                    "high"
                    if payload["priority"] >= 8 and amount_usd >= 500.0
                    else "normal"
                )
                processed_records.append(
                    {{
                        "order_id": payload["order_id"],
                        "region": payload["region"],
                        "amount_usd": amount_usd,
                        "priority_band": priority_band,
                    }}
                )
            processed_records.sort(key=lambda item: (item["amount_usd"], item["order_id"]))
            return processed_records
        if __name__ == "__main__":
            mock_batch = {list(visible_batch)!r}
            print(json.dumps(process_data_stream(mock_batch), indent=2, sort_keys=True))
        '''
    ).lstrip()
def _render_broken_format_report_script(target_date: str) -> str:
    """Return the source of the intentionally defective Task 3 formatter script.

    The report header for ``target_date`` is baked into the generated
    source. The template carries two labelled defects that are the task and
    must stay as they are: revenue is truncated with ``int()`` (so the
    revenue and total lines also lose their two-decimal formatting), and the
    entry point passes ``sys.argv[0]`` instead of ``sys.argv[1]``.
    """
    title = f"=== Daily Revenue Report ({target_date}) ==="
    return textwrap.dedent(
        f'''\
        import json
        import sys
        def format_report(input_path):
            """Reads extracted data from JSON and produces a formatted stakeholder report."""
            with open(input_path, encoding="utf-8") as f:
                records = json.load(f)
            lines = ["{title}", ""]
            total_revenue = 0
            for rec in records:
                dept = rec["department"]
                rev = int(rec["revenue"]) # BUG 1: int() truncates decimal precision
                exp = rec["expenses"]
                net = rev - exp
                lines.append(f"Department: {{dept}}")
                lines.append(f" Revenue: ${{rev}}")
                lines.append(f" Expenses: ${{exp:.2f}}")
                lines.append(f" Net: ${{net:.2f}}")
                lines.append("")
                total_revenue += rev
            lines.append(f"Total Revenue: ${{total_revenue}}")
            lines.append("=== End of Report ===")
            output = "\\n".join(lines)
            print(output)
            return output
        if __name__ == "__main__":
            if len(sys.argv) < 2:
                print("Usage: python format_report.py <input.json>", file=sys.stderr)
                sys.exit(1)
            format_report(sys.argv[0]) # BUG 2: should be sys.argv[1]
        '''
    ).lstrip()