dataops-env / server /task_specs.py
visheshrathi's picture
Upload folder using huggingface_hub
f89b1ac verified
"""Seeded task metadata and deterministic scenario builders for DataOpsEnv."""
from __future__ import annotations
import random
import re
import textwrap
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Any, Iterable
# Canonical task identifiers, listed easy -> medium -> hard. The per-task
# tables below (metadata, file allow-lists, SQL policies) are keyed by these
# strings.
TASK_IDS = [
    "task_1_easy_anomaly",
    "task_2_medium_syntax",
    "task_3_hard_e2e",
]
@dataclass(frozen=True)
class SQLPolicy:
    """Per-task restrictions on agent-issued SQL (see ``TASK_SQL_POLICIES``).

    Nothing in this file enforces these fields.
    NOTE(review): confirm where and how they are checked.
    """

    # SQL command keywords the agent may use, e.g. {"SELECT", "DELETE"};
    # presumably matched against the statement's leading keyword -- confirm.
    allowed_commands: frozenset[str]
    # Table the task operates on (e.g. "transactions"); presumably statements
    # must reference it -- confirm against the enforcing code.
    required_table: str
@dataclass(frozen=True)
class TaskMetadata:
    """Static, human-facing description of one task (see ``TASK_METADATA``)."""

    # Task identifier, e.g. "task_1_easy_anomaly"; also its TASK_METADATA key.
    task_id: str
    # Display name published by task_manifest_entries().
    name: str
    # Difficulty label ("easy", "medium" or "hard" in the current table).
    difficulty: str
    # One-paragraph summary; also exposed via TASK_DESCRIPTIONS.
    short_description: str
    # The capability the task is meant to exercise.
    benchmark_focus: str
    # Action type names the agent may use, e.g. ("ExecuteSQL",).
    allowed_actions: tuple[str, ...]
@dataclass(frozen=True)
class Task1Scenario:
    """Seeded data for Task 1 (delete rows whose amount is NULL)."""

    # Instructions shown to the agent for this seed.
    description: str
    # Every seeded transactions row (id, user_id, amount, status), valid and
    # corrupted, in id order.
    all_rows: tuple[dict[str, Any], ...]
    # The valid rows that must remain after cleanup.
    expected_rows: tuple[dict[str, Any], ...]
    # Sorted ids of the rows seeded with amount None (NULL).
    corrupted_row_ids: tuple[int, ...]
@dataclass(frozen=True)
class Task2Scenario:
    """Seeded data for Task 2 (repair the ETL normalisation script)."""

    # Instructions shown to the agent for this seed.
    description: str
    # Demo batch embedded in the broken script's __main__ block.
    visible_batch: tuple[dict[str, Any], ...]
    # Reference output for visible_batch.
    visible_expected: tuple[dict[str, Any], ...]
    # Additional batches used for grading but not shown to the agent.
    hidden_cases: tuple[tuple[dict[str, Any], ...], ...]
    # Reference output per hidden batch, index-aligned with hidden_cases.
    hidden_expected: tuple[tuple[dict[str, Any], ...], ...]
    # Source text of the deliberately broken broken_pipeline.py.
    broken_script: str
@dataclass(frozen=True)
class Task3Scenario:
    """Seeded data for Task 3 (extract, repair formatter, email report)."""

    # Instructions shown to the agent for this seed.
    description: str
    # ISO date (YYYY-MM-DD) of the report slice to extract.
    target_date: str
    # Email address the report must be sent to.
    recipient: str
    # Required email subject.
    subject: str
    # "Daily Revenue Report (<target_date>)".
    report_title: str
    # Every seeded daily_reports row (all dates), including id and report_date.
    all_rows: tuple[dict[str, Any], ...]
    # Normalised target-date rows (department, revenue, expenses, headcount),
    # sorted by department.
    expected_rows: tuple[dict[str, Any], ...]
    # Source text of the deliberately broken format_report.py.
    broken_script: str
@dataclass(frozen=True)
class TaskScenarioBundle:
    """A built scenario together with the task id and seed that produced it.

    ``build_task_scenario`` fills only the ``task_N`` field matching
    ``task_id``; the other two stay ``None``.
    """

    # One of TASK_IDS.
    task_id: str
    # Seed actually used (build_task_scenario resolves None to 0).
    seed: int
    # Copy of the selected scenario's description.
    description: str
    # Populated by build_task_scenario for "task_1_easy_anomaly".
    task_1: Task1Scenario | None = None
    # Populated by build_task_scenario for "task_2_medium_syntax".
    task_2: Task2Scenario | None = None
    # Populated by build_task_scenario for "task_3_hard_e2e".
    task_3: Task3Scenario | None = None
# Static per-task metadata keyed by task id. task_manifest_entries() publishes
# these fields; allowed_actions names the action types available per task.
TASK_METADATA = {
    "task_1_easy_anomaly": TaskMetadata(
        task_id="task_1_easy_anomaly",
        name="Delete Corrupted Transaction Rows",
        difficulty="easy",
        short_description=(
            "Inspect a transaction table and remove only the seeded rows with NULL amounts while preserving legitimate non-null edge values."
        ),
        benchmark_focus="Careful data cleanup without collateral damage.",
        allowed_actions=("ExecuteSQL",),
    ),
    "task_2_medium_syntax": TaskMetadata(
        task_id="task_2_medium_syntax",
        name="Repair Seeded Pipeline Script",
        difficulty="medium",
        short_description=(
            "Repair a seeded ETL normalization script and verify it on visible and hidden seeded batches."
        ),
        benchmark_focus="Code reading, precise repair, and generalization beyond the demo batch.",
        allowed_actions=("ReadFile", "WriteFile", "RunScript"),
    ),
    "task_3_hard_e2e": TaskMetadata(
        task_id="task_3_hard_e2e",
        name="Resolve Revenue Reporting Incident",
        difficulty="hard",
        short_description=(
            "Extract a seeded reporting slice, repair the formatter, and send the exact generated report."
        ),
        benchmark_focus="End-to-end data extraction, file repair, and communication with provenance.",
        allowed_actions=("ExecuteSQL", "ReadFile", "WriteFile", "RunScript", "SendEmail"),
    ),
}
# Task id -> short description, derived from TASK_METADATA.
TASK_DESCRIPTIONS = {
    task_id: metadata.short_description for task_id, metadata in TASK_METADATA.items()
}
# Per-task file allow-lists for the WriteFile, RunScript and ReadFile actions.
# Each maps a task id to the file names the agent may target; an empty
# frozenset means the task exposes no files for that action.
# NOTE(review): these tables are only declared here; the enforcing code is
# not in this file -- confirm how unlisted task ids are treated.
TASK_ALLOWED_WRITE_FILES = {
    "task_1_easy_anomaly": frozenset(),
    "task_2_medium_syntax": frozenset({"broken_pipeline.py"}),
    # Task 3 writes both the repaired formatter and its JSON input.
    "task_3_hard_e2e": frozenset({"format_report.py", "report_data.json"}),
}
TASK_ALLOWED_RUN_FILES = {
    "task_1_easy_anomaly": frozenset(),
    "task_2_medium_syntax": frozenset({"broken_pipeline.py"}),
    "task_3_hard_e2e": frozenset({"format_report.py"}),
}
# Tasks in which email sending is presumably allowed (only Task 3 lists the
# "SendEmail" action in TASK_METADATA).
TASK_EMAIL_ENABLED = frozenset({"task_3_hard_e2e"})
TASK_ALLOWED_READ_FILES = {
    "task_1_easy_anomaly": frozenset(),
    "task_2_medium_syntax": frozenset({"broken_pipeline.py"}),
    "task_3_hard_e2e": frozenset({"format_report.py", "report_data.json"}),
}
# SQL restrictions for the tasks that expose ExecuteSQL. Task 2 has no entry
# because its allowed_actions do not include ExecuteSQL.
TASK_SQL_POLICIES = {
    "task_1_easy_anomaly": SQLPolicy(
        allowed_commands=frozenset({"SELECT", "DELETE"}),
        required_table="transactions",
    ),
    # Task 3 allows SELECT and WITH (CTE) commands only.
    "task_3_hard_e2e": SQLPolicy(
        allowed_commands=frozenset({"SELECT", "WITH"}),
        required_table="daily_reports",
    ),
}
# Matches one department section of a report as rendered by
# build_task_3_report: a "Department: <name>" line followed by Revenue,
# Expenses and Net lines carrying "$"-prefixed amounts (optional leading "-",
# optional decimals). Whitespace before each label is tolerated.
# re.MULTILINE has no effect here because the pattern uses no ^/$ anchors.
_REPORT_RECORD_RE = re.compile(
    r"Department:\s*(?P<department>[^\n]+)\n"
    r"\s*Revenue:\s*\$(?P<revenue>-?\d+(?:\.\d+)?)\n"
    r"\s*Expenses:\s*\$(?P<expenses>-?\d+(?:\.\d+)?)\n"
    r"\s*Net:\s*\$(?P<net>-?\d+(?:\.\d+)?)",
    re.MULTILINE,
)
# Matches the "Total Revenue: $<amount>" footer line.
_REPORT_TOTAL_RE = re.compile(r"Total Revenue:\s*\$(?P<total>-?\d+(?:\.\d+)?)")
# Task 1 status vocabularies: valid rows draw from the first tuple, rows
# seeded with NULL amounts from the second.
_TASK_1_VALID_STATUSES = ("success", "settled", "approved", "completed")
_TASK_1_CORRUPTED_STATUSES = ("pending", "retrying", "failed", "queued")
# Task 2: the reference ETL keeps only "ready" records; the non-ready
# statuses are used for records it must drop.
_TASK_2_READY_STATUS = "ready"
_TASK_2_NON_READY_STATUSES = ("queued", "hold", "failed")
_TASK_2_REGIONS = ("us-east", "eu-west", "ap-south", "sa-east")
# Task 3: recipient pool (one chosen per seed) and department pool (four
# sampled per seed).
_TASK_3_RECIPIENTS = (
    "bhavik@example.com",
    "marta@example.com",
    "ops-lead@example.com",
    "finance-review@example.com",
)
_TASK_3_DEPARTMENTS = (
    "Engineering",
    "Sales",
    "Marketing",
    "Operations",
    "Support",
    "Finance",
)
def task_manifest_entries() -> list[dict[str, Any]]:
    """Return one JSON-friendly manifest entry per task.

    Entries follow TASK_METADATA's insertion order. ``short_description`` is
    published under the ``description`` key, and ``allowed_actions`` is
    converted from a tuple to a list.
    """

    def to_entry(meta: TaskMetadata) -> dict[str, Any]:
        # Key order matches the published manifest layout.
        return {
            "id": meta.task_id,
            "name": meta.name,
            "difficulty": meta.difficulty,
            "description": meta.short_description,
            "benchmark_focus": meta.benchmark_focus,
            "allowed_actions": list(meta.allowed_actions),
        }

    return [to_entry(meta) for meta in TASK_METADATA.values()]
def build_task_scenario(task_id: str, seed: int | None = None) -> TaskScenarioBundle:
    """Build the deterministic scenario bundle for *task_id* and *seed*.

    Args:
        task_id: one of TASK_IDS.
        seed: scenario seed; ``None`` means 0. Other values go through
            ``int()`` before the task id is checked.

    Returns:
        A TaskScenarioBundle with only the ``task_N`` field for *task_id*
        populated.

    Raises:
        KeyError: if *task_id* is not a known task.
    """
    resolved_seed = 0 if seed is None else int(seed)
    # (task id, bundle field name, scenario builder)
    dispatch = (
        ("task_1_easy_anomaly", "task_1", _build_task_1_scenario),
        ("task_2_medium_syntax", "task_2", _build_task_2_scenario),
        ("task_3_hard_e2e", "task_3", _build_task_3_scenario),
    )
    for known_id, field_name, builder in dispatch:
        if task_id == known_id:
            scenario = builder(resolved_seed)
            return TaskScenarioBundle(
                task_id=task_id,
                seed=resolved_seed,
                description=scenario.description,
                **{field_name: scenario},
            )
    raise KeyError(f"Unknown task_id: {task_id}")
def normalize_task_3_rows(
    rows: Iterable[dict[str, Any]], *, require_headcount: bool = False
) -> list[dict[str, Any]]:
    """Normalise extracted rows for deterministic comparison.

    Each row is reduced to ``department`` (str), ``revenue`` and ``expenses``
    (floats rounded to 2 decimals) and ``headcount`` (int, or ``None`` when
    missing or empty). The result is sorted by department.

    Inputs may come from agent-written data (presumably the extracted
    ``report_data.json``), so any malformed input yields ``[]`` instead of
    raising. Malformed input includes:

    * a non-iterable *rows*;
    * an element without ``.get`` (e.g. a list or str);
    * a missing key or an unconvertible value;
    * a numeric overflow such as ``int(float("inf"))`` or ``float(10**400)``.

    Args:
        rows: iterable of row mappings.
        require_headcount: when True, any row lacking a headcount makes the
            whole result ``[]``.

    Returns:
        The normalised rows sorted by department, or ``[]`` on any error.
    """
    normalised: list[dict[str, Any]] = []
    try:
        for row in rows:
            hc_raw = row.get("headcount")
            if hc_raw is None or hc_raw == "":
                if require_headcount:
                    return []
                headcount: int | None = None
            else:
                headcount = int(hc_raw)
            normalised.append(
                {
                    "department": str(row["department"]),
                    "revenue": round(float(row["revenue"]), 2),
                    "expenses": round(float(row["expenses"]), 2),
                    "headcount": headcount,
                }
            )
    except (AttributeError, KeyError, OverflowError, TypeError, ValueError):
        # AttributeError: element is not a mapping. OverflowError: infinite or
        # huge numbers. TypeError also covers a non-iterable ``rows``.
        return []
    return sorted(normalised, key=lambda item: item["department"])
def normalize_task_2_output_rows(rows: Any) -> list[dict[str, Any]]:
    """Normalise Task 2 ETL output rows while preserving list order for sort checks.

    Each row is coerced to ``order_id`` (str), ``region`` (str),
    ``amount_usd`` (float rounded to 2 decimals) and ``priority_band``
    (must be "high" or "normal"). The rows are script output the agent
    controls, so the whole result is ``[]`` if any of these holds:

    * *rows* is not a list;
    * a row is not a dict;
    * a row lacks a key;
    * a row has an amount that ``float()`` rejects or overflows on (e.g. a
      huge JSON integer);
    * a row has an unexpected band.

    Returns:
        The normalised rows in their original order, or ``[]``.
    """
    if not isinstance(rows, list):
        return []
    normalised: list[dict[str, Any]] = []
    for row in rows:
        if not isinstance(row, dict):
            return []
        try:
            order_id = str(row["order_id"])
            region = str(row["region"])
            amount_usd = round(float(row["amount_usd"]), 2)
            priority_band = str(row["priority_band"])
        except (KeyError, OverflowError, TypeError, ValueError):
            # OverflowError: float() of an int too large for a double.
            return []
        if priority_band not in {"high", "normal"}:
            return []
        normalised.append(
            {
                "order_id": order_id,
                "region": region,
                "amount_usd": amount_usd,
                "priority_band": priority_band,
            }
        )
    return normalised
def build_task_2_expected(
    batch: Iterable[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Compute the reference ETL output for one Task 2 order batch.

    Processing steps:

    * keep records whose ``status`` is the ready status and whose
      ``amount_cents`` is positive;
    * convert cents to ``amount_usd`` rounded to 2 decimals;
    * label ``priority_band`` "high" when ``priority >= 8`` or
      ``amount_usd >= 500.0``, otherwise "normal";
    * sort by amount descending, then ``order_id`` ascending.

    Returns:
        The expected rows, or ``[]`` if any record is malformed. ``status``,
        ``amount_cents`` and ``priority`` are validated on every record;
        ``order_id`` and ``region`` are only read from kept records.
    """
    kept: list[dict[str, Any]] = []
    for raw in batch:
        try:
            status = str(raw["status"])
            cents = int(raw["amount_cents"])
            prio = int(raw["priority"])
            usd = round(cents / 100.0, 2)
            if status == _TASK_2_READY_STATUS and cents > 0:
                is_high = prio >= 8 or usd >= 500.0
                kept.append(
                    {
                        "order_id": str(raw["order_id"]),
                        "region": str(raw["region"]),
                        "amount_usd": usd,
                        "priority_band": "high" if is_high else "normal",
                    }
                )
        except (KeyError, TypeError, ValueError):
            return []
    return sorted(kept, key=lambda entry: (-entry["amount_usd"], entry["order_id"]))
def task_3_data_matches_expected(
    rows: list[dict[str, Any]],
    expected_rows: Iterable[dict[str, Any]],
    *,
    require_headcount: bool,
) -> bool:
    """Return True when *rows* equals the normalised form of *expected_rows*.

    *rows* is compared as-is with ``==``, so it presumably must already be
    normalised with ``normalize_task_3_rows`` (sorted by department, values
    rounded). With ``require_headcount=True``, any expected row lacking a
    headcount normalises the expected side to ``[]``.
    """
    reference = normalize_task_3_rows(
        expected_rows, require_headcount=require_headcount
    )
    is_match = rows == reference
    return is_match
def task_3_headcount_fully_matches(
    rows: list[dict[str, Any]], expected_rows: Iterable[dict[str, Any]]
) -> bool:
    """Strict comparison: *rows* must equal the expected rows, headcount included.

    *expected_rows* is normalised with ``require_headcount=True`` before the
    plain ``==`` comparison against *rows*.
    """
    return rows == normalize_task_3_rows(expected_rows, require_headcount=True)
def build_task_3_report(rows: Iterable[dict[str, Any]], target_date: str) -> str:
    """Render the canonical Task 3 report text for *rows* on *target_date*.

    Rows are normalised with ``require_headcount=True`` (sorted by
    department). If any row lacks a headcount or is malformed, the report
    has no department sections and a total of $0.00. Each section lists
    Revenue, Expenses and Net (revenue - expenses) with two decimals; the
    footer carries the running total of revenue.
    """
    body: list[str] = [f"=== Daily Revenue Report ({target_date}) ===", ""]
    running_total = 0.0
    for entry in normalize_task_3_rows(rows, require_headcount=True):
        rev = float(entry["revenue"])
        spend = float(entry["expenses"])
        body.extend(
            (
                f"Department: {entry['department']}",
                f" Revenue: ${rev:.2f}",
                f" Expenses: ${spend:.2f}",
                f" Net: ${rev - spend:.2f}",
                "",
            )
        )
        # Plain left-to-right accumulation (not sum()) keeps float results stable.
        running_total += rev
    body += [f"Total Revenue: ${running_total:.2f}", "=== End of Report ==="]
    return "\n".join(body)
def extract_task_3_report_block(text: str, target_date: str) -> str | None:
    """Return the report block for *target_date* embedded in free-form *text*.

    The block runs from the first "=== Daily Revenue Report (<date>) ==="
    header to the first "=== End of Report ===" footer that follows it,
    inclusive. CRLF line endings are normalised to LF first.

    Returns:
        The block text, or None if either marker is missing.
    """
    raw = text.replace("\r\n", "\n")
    start_marker = f"=== Daily Revenue Report ({target_date}) ==="
    end_marker = "=== End of Report ==="
    start = raw.find(start_marker)
    if start == -1:
        return None
    # Search for the footer only after the header. Otherwise an earlier footer
    # (e.g. from a quoted previous report) would hide a valid block.
    end = raw.find(end_marker, start + len(start_marker))
    if end == -1:
        return None
    return raw[start : end + len(end_marker)].strip()
def parse_task_3_report(text: str, target_date: str) -> dict[str, Any] | None:
    """Parse the *target_date* report embedded in *text* into structured data.

    Returns:
        ``{"records": [...], "total_revenue": float}``, where each record has
        department, revenue, expenses, net (rounded to 2 decimals) and
        ``headcount`` set to None, because the report text carries no
        headcount. Records are sorted by department. Returns None if the
        report block or its "Total Revenue" line is missing.
    """
    block = extract_task_3_report_block(text, target_date)
    if block is None:
        return None
    total_match = _REPORT_TOTAL_RE.search(block)
    if total_match is None:
        return None
    # The regex only captures numeric text, so these float() calls cannot fail.
    records = sorted(
        (
            {
                "department": hit.group("department").strip(),
                "revenue": round(float(hit.group("revenue")), 2),
                "expenses": round(float(hit.group("expenses")), 2),
                "headcount": None,
                "net": round(float(hit.group("net")), 2),
            }
            for hit in _REPORT_RECORD_RE.finditer(block)
        ),
        key=lambda record: record["department"],
    )
    return {
        "records": records,
        "total_revenue": round(float(total_match.group("total")), 2),
    }
def report_matches_expected(
    text: str, expected_rows: Iterable[dict[str, Any]], target_date: str
) -> bool:
    """Return True when *text* contains a report exactly matching *expected_rows*.

    The *target_date* report block is parsed. Each department's revenue,
    expenses and net (revenue - expenses, rounded to 2 decimals) must equal
    the expected rows normalised with ``require_headcount=True``, and the
    parsed total revenue must equal the rounded sum of expected revenues.
    """
    parsed = parse_task_3_report(text, target_date)
    if parsed is None:
        return False
    reference = normalize_task_3_rows(expected_rows, require_headcount=True)
    wanted_records = [
        {
            "department": ref["department"],
            "revenue": ref["revenue"],
            "expenses": ref["expenses"],
            "headcount": None,
            "net": round(float(ref["revenue"]) - float(ref["expenses"]), 2),
        }
        for ref in reference
    ]
    wanted_total = round(sum(float(ref["revenue"]) for ref in reference), 2)
    if parsed["records"] != wanted_records:
        return False
    return parsed["total_revenue"] == wanted_total
def task_3_semantic_match_fraction_rows(
    rows: list[dict[str, Any]], expected_rows: Iterable[dict[str, Any]]
) -> float:
    """Return the fraction of expected departments correctly present in *rows*.

    A department counts as matched when some row for it has revenue and
    expenses equal to the normalised expected values. Headcount is ignored.
    Each department is credited at most once, so duplicated rows cannot push
    the score above 1.0.

    Args:
        rows: candidate rows, presumably already normalised
            (``normalize_task_3_rows``).
        expected_rows: reference rows.

    Returns:
        A value in [0.0, 1.0]. It is 0.0 for empty *rows* or no expected rows.
    """
    if not rows:
        return 0.0
    expected = normalize_task_3_rows(expected_rows, require_headcount=False)
    if not expected:
        return 0.0
    exp_by_dept = {row["department"]: row for row in expected}
    matched_departments: set[Any] = set()
    for row in rows:
        department = row.get("department")
        expected_row = exp_by_dept.get(department)
        if expected_row is None:
            continue
        if (
            row.get("revenue") == expected_row["revenue"]
            and row.get("expenses") == expected_row["expenses"]
        ):
            matched_departments.add(department)
    return len(matched_departments) / len(expected)
def task_3_semantic_match_fraction_parsed(
    parsed: dict[str, Any] | None, expected_rows: Iterable[dict[str, Any]]
) -> float:
    """Return the fraction of expected departments correctly reported in *parsed*.

    *parsed* is the structure produced by ``parse_task_3_report``. A
    department counts as matched when a record for it has revenue and
    expenses equal to the normalised expected values. Each department is
    credited at most once, so a report that repeats a correct section cannot
    score above 1.0.

    Returns:
        A value in [0.0, 1.0]. It is 0.0 for a missing or empty report, or
        when there are no expected rows.
    """
    if not parsed or not parsed.get("records"):
        return 0.0
    expected = normalize_task_3_rows(expected_rows, require_headcount=False)
    if not expected:
        return 0.0
    exp_by_dept = {row["department"]: row for row in expected}
    matched_departments: set[Any] = set()
    for record in parsed["records"]:
        department = record.get("department")
        expected_row = exp_by_dept.get(department)
        if expected_row is None:
            continue
        if (
            record.get("revenue") == expected_row["revenue"]
            and record.get("expenses") == expected_row["expenses"]
        ):
            matched_departments.add(department)
    return len(matched_departments) / len(expected)
def task_3_semantic_match_fraction_text(
    text: str, expected_rows: Iterable[dict[str, Any]], target_date: str
) -> float:
    """Score free-form report *text* against *expected_rows* for *target_date*.

    The text is parsed with ``parse_task_3_report`` and scored with
    ``task_3_semantic_match_fraction_parsed``. Text without a parseable
    report scores 0.0.
    """
    parsed_report = parse_task_3_report(text, target_date)
    return task_3_semantic_match_fraction_parsed(parsed_report, expected_rows)
def _build_task_1_scenario(seed: int) -> Task1Scenario:
    """Build the deterministic Task 1 (corrupted transactions) scenario.

    Generates 3-5 valid rows and 2-3 corrupted rows from an RNG seeded with
    ``"task-1:<seed>"``, shuffles them, and assigns sequential ids starting
    at 1. Corrupted rows have ``amount=None``. One valid row is forced to a
    0.0 "settled" amount and another to a negative "approved" amount, so
    that only NULL amounts qualify for deletion.

    NOTE: the sequence of ``rng`` calls defines the scenario for each seed;
    reordering any draw changes every downstream value.

    Args:
        seed: integer seed; also offsets the generated ``user_id`` values.

    Returns:
        A Task1Scenario with all rows, the rows expected to survive cleanup,
        and the sorted ids of the corrupted rows.
    """
    rng = random.Random(f"task-1:{seed}")
    # randrange(3) -> 0..2, so 3..5 valid rows; randrange(2) -> 2..3 corrupted.
    valid_count = 3 + rng.randrange(3)
    corrupted_count = 2 + rng.randrange(2)
    combined_rows: list[dict[str, Any]] = []
    valid_templates = []
    for index in range(valid_count):
        valid_templates.append(
            {
                "kind": "valid",
                "user_id": 1000 + seed * 10 + index,
                "amount": round(rng.uniform(75.0, 975.0), 2),
                "status": rng.choice(_TASK_1_VALID_STATUSES),
            }
        )
    # Plant legitimate edge values that must be preserved: a zero-value
    # reconciliation and a negative refund adjustment.
    if valid_templates:
        valid_templates[0]["amount"] = 0.0
        valid_templates[0]["status"] = "settled"
    if len(valid_templates) > 1:
        valid_templates[1]["amount"] = -round(float(valid_templates[1]["amount"]) / 10.0, 2)
        valid_templates[1]["status"] = "approved"
    corrupted_templates = []
    for index in range(corrupted_count):
        corrupted_templates.append(
            {
                "kind": "corrupted",
                "user_id": 2000 + seed * 10 + index,
                "amount": None,
                "status": rng.choice(_TASK_1_CORRUPTED_STATUSES),
            }
        )
    templates = valid_templates + corrupted_templates
    rng.shuffle(templates)
    expected_rows: list[dict[str, Any]] = []
    corrupted_row_ids: list[int] = []
    for row_id, template in enumerate(templates, start=1):
        row = {
            "id": row_id,
            "user_id": int(template["user_id"]),
            "amount": template["amount"],
            "status": str(template["status"]),
        }
        combined_rows.append(row)
        # NOTE(review): the same dict object is stored in both combined_rows
        # and expected_rows, so mutating a row through one collection also
        # changes the other.
        if template["kind"] == "valid":
            expected_rows.append(row)
        else:
            corrupted_row_ids.append(row_id)
    description = (
        "Find and delete all corrupted records (rows with NULL amounts) from the "
        f"'transactions' table. This seeded episode contains {corrupted_count} corrupted "
        f"rows mixed with {valid_count} valid rows. Only NULL amounts are corrupted; "
        "legitimate zero-value reconciliations and negative refund adjustments may also "
        "appear and must be preserved exactly."
    )
    return Task1Scenario(
        description=description,
        all_rows=tuple(combined_rows),
        expected_rows=tuple(expected_rows),
        corrupted_row_ids=tuple(sorted(corrupted_row_ids)),
    )
def _build_task_2_scenario(seed: int) -> Task2Scenario:
    """Build the deterministic Task 2 (ETL repair) scenario for *seed*.

    Seven batches are drawn from a single RNG seeded with ``"task-2:<seed>"``:
    * batch 0 is the visible demo batch embedded in the broken script;
    * batches 1-6 are hidden grading cases.

    Reference outputs come from ``build_task_2_expected``.
    """
    rng = random.Random(f"task-2:{seed}")
    # All batches share one RNG stream, so they must be sampled in ascending
    # batch_index order to reproduce the same scenario for a given seed.
    batches = [_sample_task_2_batch(rng, batch_index=i) for i in range(7)]
    visible_batch, *hidden = batches
    hidden_cases = tuple(hidden)
    return Task2Scenario(
        description=(
            "The script 'broken_pipeline.py' prepares downstream billing candidates from "
            "seeded order records. Repair it so it keeps only ready records with positive "
            "amounts, converts cents to USD, flags high priority when priority >= 8 or "
            "amount_usd >= 500.00, and returns rows sorted by amount_usd descending then "
            "order_id ascending. The grader checks the visible demo batch and additional "
            "unseen seeded batches."
        ),
        visible_batch=visible_batch,
        visible_expected=tuple(build_task_2_expected(visible_batch)),
        hidden_cases=hidden_cases,
        hidden_expected=tuple(
            tuple(build_task_2_expected(case)) for case in hidden_cases
        ),
        broken_script=_render_broken_pipeline_script(visible_batch),
    )
def _build_task_3_scenario(seed: int) -> Task3Scenario:
    """Build the deterministic Task 3 (revenue report incident) scenario.

    Seeding steps, using an RNG seeded with ``"task-3:<seed>"``:

    * pick a target date between 2025-03-25 and 2025-03-31, a recipient,
      and four departments;
    * seed ``daily_reports`` rows for two days before the target, the
      target day, and the day after (16 rows, ids starting at 1).

    Only the target-day rows become ``expected_rows``. Rows for the other
    dates are present in ``all_rows`` but not expected.

    NOTE: the sequence of ``rng`` calls defines the scenario for each seed.
    The target-day and off-day branches draw with different bounds, so they
    must not be merged or reordered.
    """
    rng = random.Random(f"task-3:{seed}")
    # randrange(0, 7) -> 0..6 days after 2025-03-25.
    base_date = date(2025, 3, 25) + timedelta(days=rng.randrange(0, 7))
    target_date = base_date.isoformat()
    recipient = rng.choice(_TASK_3_RECIPIENTS)
    subject = f"Daily Revenue Report - {target_date}"
    report_title = f"Daily Revenue Report ({target_date})"
    # Four distinct departments, sorted alphabetically.
    selected_departments = sorted(rng.sample(_TASK_3_DEPARTMENTS, k=4))
    expected_rows: list[dict[str, Any]] = []
    warehouse_rows: list[dict[str, Any]] = []
    row_id = 1
    for offset in (-2, -1, 0, 1):
        report_date = (base_date + timedelta(days=offset)).isoformat()
        for department in selected_departments:
            if offset == 0:
                # Target day. The expenses upper bound exceeds revenue, so Net
                # can be negative.
                revenue = round(rng.uniform(12_000.0, 95_000.0), 2)
                expenses = round(rng.uniform(8_000.0, revenue + 18_000.0), 2)
                headcount = rng.randint(8, 48)
                seeded_row = {
                    "department": department,
                    "revenue": revenue,
                    "expenses": expenses,
                    "headcount": headcount,
                }
                expected_rows.append(seeded_row)
            else:
                revenue = round(rng.uniform(9_000.0, 90_000.0), 2)
                expenses = round(rng.uniform(7_000.0, revenue + 14_000.0), 2)
                headcount = rng.randint(8, 48)
            # Every date, including the target, lands in the warehouse table.
            warehouse_rows.append(
                {
                    "id": row_id,
                    "report_date": report_date,
                    "department": department,
                    "revenue": revenue,
                    "expenses": expenses,
                    "headcount": headcount,
                }
            )
            row_id += 1
    description = (
        f"Extract the daily report for date '{target_date}' from the 'daily_reports' table, "
        "repair the broken 'format_report.py' script, save the exact extracted rows to "
        f"'report_data.json', run the script with that file, and send the generated report "
        f"to '{recipient}' with subject '{subject}'. The grader expects the exact seeded slice, "
        "including headcount."
    )
    return Task3Scenario(
        description=description,
        target_date=target_date,
        recipient=recipient,
        subject=subject,
        report_title=report_title,
        all_rows=tuple(warehouse_rows),
        expected_rows=tuple(
            normalize_task_3_rows(expected_rows, require_headcount=True)
        ),
        broken_script=_render_broken_format_report_script(target_date),
    )
def _sample_task_2_batch(
    rng: random.Random, *, batch_index: int
) -> tuple[dict[str, Any], ...]:
    """Sample one seeded Task 2 order batch from *rng*.

    Every batch holds one record per behaviour the ETL must handle (the
    name is the order-id suffix):

    * "normal": ready, normal priority, amount under $500;
    * "priority": ready, high only because priority >= 8;
    * "amount": ready, high only because amount_usd >= 500;
    * "queued": a non-ready record;
    * "drop": ready but with a negative amount.

    Even ``batch_index`` values add a sixth, non-ready "hold" record.
    Order ids are ``ORD-<batch_index:02d>-<suffix>``, and the records are
    shuffled.

    NOTE: ``rng`` is drawn in argument order for each record, then for
    ``region`` inside ``make_record``; reordering any call changes the batch
    for a given seed.
    """

    def make_record(
        suffix: str,
        *,
        status: str,
        amount_cents: int,
        priority: int,
    ) -> dict[str, Any]:
        # region is drawn here, after the caller's argument draws.
        return {
            "order_id": f"ORD-{batch_index:02d}-{suffix}",
            "status": status,
            "amount_cents": amount_cents,
            "priority": priority,
            "region": rng.choice(_TASK_2_REGIONS),
        }

    records = [
        make_record(
            "normal",
            status=_TASK_2_READY_STATUS,
            amount_cents=rng.randrange(12_125, 28_975, 25),
            priority=rng.randint(2, 6),
        ),
        make_record(
            "priority",
            status=_TASK_2_READY_STATUS,
            amount_cents=rng.randrange(13_175, 32_775, 25),
            priority=rng.randint(8, 10),
        ),
        make_record(
            "amount",
            status=_TASK_2_READY_STATUS,
            amount_cents=rng.randrange(50_025, 88_975, 25),
            priority=rng.randint(2, 6),
        ),
        # [:2] excludes "failed", so this record survives the broken script's
        # `status == "failed"` filter and exposes that bug.
        make_record(
            "queued",
            status=rng.choice(_TASK_2_NON_READY_STATUSES[:2]),
            amount_cents=rng.randrange(18_125, 42_975, 25),
            priority=rng.randint(4, 9),
        ),
        make_record(
            "drop",
            status=_TASK_2_READY_STATUS,
            amount_cents=-rng.randrange(125, 2_975, 25),
            priority=rng.randint(8, 10),
        ),
    ]
    if batch_index % 2 == 0:
        records.append(
            make_record(
                "hold",
                status=rng.choice(_TASK_2_NON_READY_STATUSES),
                amount_cents=rng.randrange(24_125, 48_975, 25),
                priority=rng.randint(1, 7),
            )
        )
    rng.shuffle(records)
    return tuple(records)
def _render_broken_pipeline_script(
    visible_batch: tuple[dict[str, Any], ...]
) -> str:
    """Return the source of the deliberately broken ``broken_pipeline.py``.

    The generated script's docstring states the correct contract. Its code
    intentionally violates that contract:

    * it filters only ``status == "failed"`` instead of requiring "ready";
    * it uses integer ``//`` division for the USD conversion;
    * it combines the high-priority conditions with ``and`` instead of ``or``;
    * it sorts ascending instead of by amount descending.

    *visible_batch* is embedded as ``mock_batch`` via its ``repr``.
    """
    # The template is runtime output shown to the agent. Its whitespace and
    # its bugs are intentional, so do not edit or "fix" it here.
    return textwrap.dedent(
        f'''\
        import json
        def process_data_stream(payloads):
            """
            Normalize downstream billing candidates.
            Keep only records whose status is "ready" and whose amount_cents is positive.
            Convert amount_cents to amount_usd rounded to 2 decimals.
            Mark priority_band as "high" when priority >= 8 or amount_usd >= 500.00.
            Return rows sorted by amount_usd descending, then order_id ascending.
            """
            processed_records = []
            for payload in payloads:
                if payload["status"] == "failed" or payload["amount_cents"] <= 0:
                    continue
                amount_usd = round(payload["amount_cents"] // 100, 2)
                priority_band = (
                    "high"
                    if payload["priority"] >= 8 and amount_usd >= 500.0
                    else "normal"
                )
                processed_records.append(
                    {{
                        "order_id": payload["order_id"],
                        "region": payload["region"],
                        "amount_usd": amount_usd,
                        "priority_band": priority_band,
                    }}
                )
            processed_records.sort(key=lambda item: (item["amount_usd"], item["order_id"]))
            return processed_records
        if __name__ == "__main__":
            mock_batch = {list(visible_batch)!r}
            print(json.dumps(process_data_stream(mock_batch), indent=2, sort_keys=True))
        '''
    ).lstrip()
def _render_broken_format_report_script(target_date: str) -> str:
    """Return the source of the deliberately broken ``format_report.py``.

    The generated script reads a JSON list of records and prints a report
    headed "=== Daily Revenue Report (<target_date>) ===". It carries the
    marked defects:

    * BUG 1: ``int()`` truncates revenue, which is also printed without
      ``:.2f``;
    * BUG 2: it reads ``sys.argv[0]`` instead of ``sys.argv[1]``.
    """
    title = f"=== Daily Revenue Report ({target_date}) ==="
    # The template is runtime output shown to the agent. Its whitespace and
    # its bugs are intentional, so do not edit or "fix" it here. Doubled
    # braces and "\\n" produce single braces and "\n" in the generated code.
    return textwrap.dedent(
        f'''\
        import json
        import sys
        def format_report(input_path):
            """Reads extracted data from JSON and produces a formatted stakeholder report."""
            with open(input_path, encoding="utf-8") as f:
                records = json.load(f)
            lines = ["{title}", ""]
            total_revenue = 0
            for rec in records:
                dept = rec["department"]
                rev = int(rec["revenue"]) # BUG 1: int() truncates decimal precision
                exp = rec["expenses"]
                net = rev - exp
                lines.append(f"Department: {{dept}}")
                lines.append(f" Revenue: ${{rev}}")
                lines.append(f" Expenses: ${{exp:.2f}}")
                lines.append(f" Net: ${{net:.2f}}")
                lines.append("")
                total_revenue += rev
            lines.append(f"Total Revenue: ${{total_revenue}}")
            lines.append("=== End of Report ===")
            output = "\\n".join(lines)
            print(output)
            return output
        if __name__ == "__main__":
            if len(sys.argv) < 2:
                print("Usage: python format_report.py <input.json>", file=sys.stderr)
                sys.exit(1)
            format_report(sys.argv[0]) # BUG 2: should be sys.argv[1]
        '''
    ).lstrip()