lean-migrate / env /tasks.py
Hrushi's picture
Upload folder using huggingface_hub
bf9c466 verified
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from .source_parsers import load_source_fragment as _load_src
# Both data directories sit next to this package, one level above this file's
# own directory.
_PROJECT_ROOT = Path(__file__).resolve().parents[1]
TASKS_DIR = _PROJECT_ROOT / "tasks"
LEAN_DIR = _PROJECT_ROOT / "lean"

# task_id -> name of the sub-directory of TASKS_DIR holding that task's sources.
TASK_SOURCE_DIRECTORIES = {
    "rbac_auth": "rbac",
    "pricing_engine": "pricing",
    "payment_saga": "saga",
    "path_canonicalizer": "path",
    "expression_eval": "expr",
    "lru_cache": "lru",
    "shortest_path": "shortest",
    "interval_scheduler": "interval",
}
# Map (task_id, spec_name) → (source_filename, source_function_name).
# Only needed for tasks whose source is C/C++; the Python/JS tasks embed their
# source fragments inline in the manifest instead.
# spec_name is the FunctionSpec name used in the manifest (camelCase);
# source_function_name is the name as it appears in the C/C++ file
# (snake_case or short method name). `_src` looks entries up in this table.
_C_SOURCE_MAP: dict[tuple[str, str], tuple[str, str]] = {
    # expression_eval — C
    ("expression_eval", "evalBinOp"): ("source.c", "eval_bin_op"),
    ("expression_eval", "evalExpr"): ("source.c", "eval_expr"),
    # path_canonicalizer — C
    ("path_canonicalizer", "normalizePath"): ("source.c", "normalize_path"),
    ("path_canonicalizer", "joinPaths"): ("source.c", "join_paths"),
    ("path_canonicalizer", "pathDepth"): ("source.c", "path_depth"),
    # lru_cache — C++ (inline class methods)
    ("lru_cache", "lruEvict"): ("source.cpp", "evict"),
    ("lru_cache", "lruPut"): ("source.cpp", "put"),
    ("lru_cache", "lruGet"): ("source.cpp", "get"),
    # shortest_path — C++
    ("shortest_path", "dijkstra"): ("source.cpp", "dijkstra"),
    ("shortest_path", "shortestDist"): ("source.cpp", "shortest_dist"),
    # interval_scheduler — C++
    ("interval_scheduler", "mergeIntervals"): ("source.cpp", "merge_intervals"),
    ("interval_scheduler", "maxSchedule"): ("source.cpp", "max_schedule"),
}
def _src(task_id: str, spec_name: str) -> str:
    """Fetch the C/C++ source text backing one manifest function.

    The (task_id, spec_name) pair is resolved through ``_C_SOURCE_MAP`` and the
    extraction itself is delegated to ``load_source_fragment`` (tree-sitter
    based, per the original docstring). Unmapped pairs yield a ``//``
    placeholder comment instead of raising.
    """
    lookup_key = (task_id, spec_name)
    if lookup_key not in _C_SOURCE_MAP:
        return f"// No source mapping for {task_id}::{spec_name}"

    source_file, c_function = _C_SOURCE_MAP[lookup_key]
    # A mapped task without a directory entry raises KeyError here.
    location = TASKS_DIR / TASK_SOURCE_DIRECTORIES[task_id] / source_file
    return _load_src(location, c_function)
@dataclass(frozen=True)
class FunctionSpec:
    """One function (or proof obligation) belonging to a migration task.

    Instances are built from the "functions" dicts of the task manifest via
    ``FunctionSpec(**entry)``, so the field names double as the manifest keys.
    """

    name: str  # identifier matched by Task.get_function and used in depends_on
    description: str  # human-readable summary of the expected behaviour
    lean_fragment: str  # Lean text for this function (definition or proof statement)
    source_fragment: str  # text of the original-language implementation
    depends_on: list[str]  # names of other specs in the same task this one relies on
    is_proof_required: bool = False  # manifest sets this to True on proof entries
    rust_dispatch: str | None = None  # Rust snippet supplied by Rust-target entries; None otherwise
@dataclass(frozen=True)
class SampleCase:
    """Immutable bundle of positional arguments for one sample invocation."""

    args: tuple[Any, ...]


def _case(*args: Any) -> SampleCase:
    """Wrap the given positional arguments in a ``SampleCase``."""
    # ``args`` already arrives as a tuple, so it can be stored as-is.
    return SampleCase(args)
@dataclass(frozen=True)
class Task:
    """A migration task: metadata, scoring weights and the functions to port.

    ``functions`` keeps the declaration order from the manifest; that order is
    also the tie-breaker used by :attr:`topo_order`.
    """

    task_id: str
    display_name: str
    difficulty: str
    source_language: str
    target_language: str
    lean_spec_module: str  # stem of the ``.lean`` file looked up under LEAN_DIR
    lean_spec_docs: str
    max_steps: int
    functions: list[FunctionSpec]
    weight_functional: float
    weight_property: float
    weight_proof: float

    @property
    def function_names(self) -> list[str]:
        """Names of all functions, in declaration order."""
        return [function_spec.name for function_spec in self.functions]

    @property
    def source_files(self) -> list[str]:
        """Currently identical to :attr:`function_names`."""
        return self.function_names

    @property
    def dependency_graph(self) -> dict[str, list[str]]:
        """Map each function name to a fresh copy of its ``depends_on`` list."""
        return {
            function_spec.name: list(function_spec.depends_on)
            for function_spec in self.functions
        }

    @property
    def topo_order(self) -> list[str]:
        """Function names ordered so that dependencies come first.

        Among functions that are ready at the same time, the one declared
        earliest wins. If no complete ordering exists (a cycle, a dependency
        on an unknown name, or duplicate function names) the plain
        declaration order is returned instead.
        """
        names = self.function_names
        position = {name: index for index, name in enumerate(names)}

        # De-duplicate each dependency list: a dependency listed twice must
        # count once, otherwise its in-degree could never drop to zero and a
        # perfectly valid graph would be reported as unsortable.
        unique_dependencies = {
            name: set(dependencies)
            for name, dependencies in self.dependency_graph.items()
        }

        dependents: dict[str, list[str]] = {name: [] for name in names}
        in_degree: dict[str, int] = {}
        for name, dependencies in unique_dependencies.items():
            in_degree[name] = len(dependencies)
            for dependency in dependencies:
                dependents.setdefault(dependency, []).append(name)

        # Track ready functions by declaration index so the earliest one can
        # be picked with a simple min().
        ready = {position[name] for name, degree in in_degree.items() if degree == 0}
        ordered: list[str] = []
        while ready:
            index = min(ready)
            ready.remove(index)
            current = names[index]
            ordered.append(current)
            for dependent in dependents.get(current, ()):
                in_degree[dependent] -= 1
                if in_degree[dependent] == 0:
                    ready.add(position[dependent])

        if len(ordered) != len(names):
            # Not every function could be scheduled; fall back to declaration order.
            return names
        return ordered

    def lean_spec_full(self) -> str:
        """Return the text of this task's Lean spec file.

        When the file does not exist a Lean comment naming the missing path
        is returned instead of raising.
        """
        spec_file = LEAN_DIR / f"{self.lean_spec_module}.lean"
        if spec_file.exists():
            # Lean sources contain non-ASCII symbols; do not rely on the
            # locale's default encoding.
            return spec_file.read_text(encoding="utf-8")
        return f"-- Missing Lean spec: {spec_file}\n"

    def get_function(self, name: str) -> FunctionSpec | None:
        """Return the first function spec called ``name``, or ``None``."""
        for function_spec in self.functions:
            if function_spec.name == name:
                return function_spec
        return None

    @property
    def sample_inputs(self) -> dict[str, list[SampleCase]]:
        """Sample cases registered for this task in ``TASK_SAMPLE_INPUTS`` (empty dict if none)."""
        return TASK_SAMPLE_INPUTS.get(self.task_id, {})
def _load_source(task_id: str, filename: str) -> str:
    """Read a whole source file belonging to a task.

    The task's directory is looked up in ``TASK_SOURCE_DIRECTORIES``; unknown
    task ids fall back to a directory named after the task id itself.

    Returns the file's text, or a ``#`` placeholder comment naming the path
    when the file does not exist.
    """
    source_directory = TASK_SOURCE_DIRECTORIES.get(task_id, task_id)
    source_path = TASKS_DIR / source_directory / filename
    if source_path.exists():
        # Decode explicitly as UTF-8 so the result does not depend on the
        # platform's locale encoding.
        return source_path.read_text(encoding="utf-8")
    return f"# Missing source file: {source_path}\n"
# Declarative description of every migration task.
# Each entry supplies the scalar fields read by `_task_from_manifest` (task
# metadata plus the three scoring weights) and a "functions" list whose dicts
# are expanded into `FunctionSpec(**f)`. Inside a function dict the keys
# "is_proof_required" and "rust_dispatch" are optional and fall back to the
# FunctionSpec defaults when omitted.
# NOTE: the `_src(...)` calls below execute while this literal is evaluated,
# i.e. at module import time.
_TASK_MANIFEST: list[dict] = [
# --- rbac_auth: python -> typescript ---
{
"task_id": "rbac_auth",
"display_name": "RBAC Authorization Engine",
"difficulty": "easy",
"source_language": "python",
"target_language": "typescript",
"lean_spec_module": "AuthSpec",
"lean_spec_docs": "Migrate a Python RBAC engine to TypeScript. Preserve role lookup, direct permission checks, and bounded inheritance traversal.",
"max_steps": 15,
"weight_functional": 1.0,
"weight_property": 0.0,
"weight_proof": 0.0,
"functions": [
{
"name": "findRole",
"description": "Look up a role by name from the role map. Return None if not found.",
"lean_fragment": """def findRole (roles : RoleMap) (name : String) : Option Role :=
roles.find? (fun role => role.name == name)""",
"source_fragment": """def find_role(roles: list[dict], name: str) -> dict | None:
return next((role for role in roles if role[\"name\"] == name), None)""",
"depends_on": [],
},
{
"name": "hasDirectPermission",
"description": "Check if a role has a specific resource/action permission directly, without inheritance traversal.",
"lean_fragment": """def hasDirectPermission (role : Role) (resource action : String) : Bool :=
role.permissions.any (fun permission => permission.resource == resource && permission.action == action)""",
"source_fragment": """def has_direct_permission(role: dict, resource: str, action: str) -> bool:
return any(
permission[\"resource\"] == resource and permission[\"action\"] == action
for permission in role.get(\"permissions\", [])
)""",
"depends_on": [],
},
{
"name": "canAccess",
"description": "Check permission with role inheritance up to depth 5.",
"lean_fragment": """def canAccess (roles : RoleMap) (roleName resource action : String) (depth : Nat := 5) : Bool :=
match depth, findRole roles roleName with
| 0, _ => false
| _, none => false
| n + 1, some role =>
hasDirectPermission role resource action ||
role.inherits.any (fun parentName =>
canAccess roles parentName resource action n)""",
"source_fragment": """def can_access(roles, role_name, resource, action, depth=5):
if depth == 0:
return False
role = find_role(roles, role_name)
if role is None:
return False
if has_direct_permission(role, resource, action):
return True
return any(
can_access(roles, parent_name, resource, action, depth - 1)
for parent_name in role.get(\"inherits\", [])
)""",
"depends_on": ["findRole", "hasDirectPermission"],
},
],
},
# --- pricing_engine: javascript -> python ---
{
"task_id": "pricing_engine",
"display_name": "E-Commerce Pricing Engine",
"difficulty": "medium",
"source_language": "javascript",
"target_language": "python",
"lean_spec_module": "PricingSpec",
"lean_spec_docs": "Migrate a JavaScript pricing engine to Python. Use integer cents only. Preserve subtotal, discounts, and region tax handling.",
"max_steps": 25,
"weight_functional": 0.7,
"weight_property": 0.3,
"weight_proof": 0.0,
"functions": [
{
"name": "subtotal",
"description": "Sum of unit price times quantity for all line items.",
"lean_fragment": """def subtotal (order : Order) : Int :=
order.items.foldl (fun total item =>
total + item.unitPrice * item.quantity) 0""",
"source_fragment": """function subtotal(order) {
return order.items.reduce(
(total, item) => total + item.unitPrice * item.quantity,
0,
);
}""",
"depends_on": [],
},
{
"name": "taxRateBps",
"description": "Return the tax rate in basis points for a region.",
"lean_fragment": """def taxRateBps : String → Nat
| \"US-CA\" => 875
| \"US-TX\" => 625
| \"US-NY\" => 800
| \"UK\" => 2000
| _ => 0""",
"source_fragment": """const TAX_RATES_BPS = {
\"US-CA\": 875,
\"US-TX\": 625,
\"US-NY\": 800,
UK: 2000,
};
function taxRateBps(regionId) {
return TAX_RATES_BPS[regionId] ?? 0;
}""",
"depends_on": [],
},
{
"name": "couponDiscount",
"description": "Coupon discount capped at 50 percent of subtotal.",
"lean_fragment": """def couponDiscount (order : Order) : Int :=
let sub := subtotal order
let rawDiscount := order.coupons.foldl (fun total coupon =>
total + (sub * coupon.discountPercent / 100)) 0
min rawDiscount (sub / 2)""",
"source_fragment": """function couponDiscount(order) {
const sub = subtotal(order);
const rawDiscount = order.coupons.reduce(
(total, coupon) => total + Math.floor((sub * coupon.discountPercent) / 100),
0,
);
return Math.min(rawDiscount, Math.floor(sub / 2));
}""",
"depends_on": ["subtotal"],
},
{
"name": "loyaltyDiscount",
"description": "Loyalty points discount capped at 10 percent of subtotal.",
"lean_fragment": """def loyaltyDiscount (order : Order) : Int :=
let sub := subtotal order
min (order.loyaltyPoints : Int) (sub / 10)""",
"source_fragment": """function loyaltyDiscount(order) {
const sub = subtotal(order);
return Math.min(order.loyaltyPoints, Math.floor(sub / 10));
}""",
"depends_on": ["subtotal"],
},
{
"name": "finalPrice",
"description": "Final price after discounts and tax.",
"lean_fragment": """def finalPrice (order : Order) : Int :=
subtotal order - totalDiscount order + tax order""",
"source_fragment": """function finalPrice(order) {
const sub = subtotal(order);
const discount = couponDiscount(order) + loyaltyDiscount(order);
const afterDiscount = sub - discount;
const taxAmount = Math.floor((afterDiscount * taxRateBps(order.regionId)) / 10000);
return afterDiscount + taxAmount;
}""",
"depends_on": ["subtotal", "couponDiscount", "loyaltyDiscount", "taxRateBps"],
},
],
},
# --- payment_saga: javascript -> python, includes a proof entry ---
{
"task_id": "payment_saga",
"display_name": "Payment Saga State Machine",
"difficulty": "hard",
"source_language": "javascript",
"target_language": "python",
"lean_spec_module": "SagaSpec",
"lean_spec_docs": "Migrate an Express payment saga to Python/FastAPI and prove the no_double_charge theorem for the happy path.",
"max_steps": 40,
"weight_functional": 0.5,
"weight_property": 0.2,
"weight_proof": 0.3,
"functions": [
{
"name": "transition",
"description": "Pure state transition function for the payment saga.",
"lean_fragment": """def transition (state : SagaState) (event : SagaEvent) : SagaState :=
match state, event with
| .Idle, .Reserve => .Reserved
| .Reserved, .Authorize => .Authorized
| .Authorized, .Capture => .Captured
| .Captured, .Settle => .Settled
| .Reserved, .CompensateReserve => .Compensated
| .Authorized, .CompensateAuthorize => .Compensating
| .Compensating, .CompensateReserve => .Compensated
| .Captured, .CompensateCapture => .Compensating
| _, .Fail => .Failed
| current, _ => current""",
"source_fragment": """function transition(state, event) {
const key = `${state}:${event}`;
const transitions = {
\"Idle:Reserve\": \"Reserved\",
\"Reserved:Authorize\": \"Authorized\",
\"Authorized:Capture\": \"Captured\",
\"Captured:Settle\": \"Settled\",
\"Reserved:CompensateReserve\": \"Compensated\",
\"Authorized:CompensateAuthorize\": \"Compensating\",
\"Compensating:CompensateReserve\": \"Compensated\",
\"Captured:CompensateCapture\": \"Compensating\",
};
if (event === \"Fail\") {
return \"Failed\";
}
return transitions[key] ?? state;
}""",
"depends_on": [],
},
{
"name": "runSaga",
"description": "Fold a sequence of events into the final saga state.",
"lean_fragment": """def runSaga (events : List SagaEvent) : SagaState :=
events.foldl transition .Idle""",
"source_fragment": """function runSaga(events) {
return events.reduce(transition, \"Idle\");
}""",
"depends_on": ["transition"],
},
{
"name": "isCharged",
"description": "Return true when the saga is in a charged state.",
"lean_fragment": """def isCharged (state : SagaState) : Bool :=
match state with
| .Captured | .Settled => true
| _ => false""",
"source_fragment": """function isCharged(state) {
return state === \"Captured\" || state === \"Settled\";
}""",
"depends_on": [],
},
{
"name": "no_double_charge_proof",
"description": "Formal proof that the happy path reaches Settled and is charged exactly once.",
"lean_fragment": """theorem no_double_charge :
runSaga happyPath = .Settled ∧ isCharged (runSaga happyPath) = true := by
constructor
· exact happy_path_settles
· rw [happy_path_settles]
simp [isCharged]""",
"source_fragment": "# Formal proof task; no runtime source equivalent.",
"depends_on": ["transition", "runSaga", "isCharged"],
"is_proof_required": True,
},
],
},
# --- path_canonicalizer: c -> rust, source fragments loaded through _src ---
{
"task_id": "path_canonicalizer",
"display_name": "Path Canonicalizer",
"difficulty": "medium",
"source_language": "c",
"target_language": "rust",
"lean_spec_module": "PathSpec",
"lean_spec_docs": "Migrate a C path normalization library to Rust. Preserve segment-level normalization, join semantics, and the idempotency invariant.",
"max_steps": 25,
"weight_functional": 0.5,
"weight_property": 0.3,
"weight_proof": 0.2,
"functions": [
{
"name": "normalizePath",
"description": "Remove '.' segments and resolve '..' clamped at root. Return path as list of segments.",
"lean_fragment": """def normalizeAux (acc : PathSpec.Path) : PathSpec.Path → PathSpec.Path
| [] => acc.reverse
| seg :: rest =>
if seg == "." then normalizeAux acc rest
else if seg == ".." then
match acc with
| [] => normalizeAux [] rest
| _ :: acc' => normalizeAux acc' rest
else normalizeAux (seg :: acc) rest
def normalizePath (p : PathSpec.Path) : PathSpec.Path := normalizeAux [] p""",
"source_fragment": _src("path_canonicalizer", "normalizePath"),
"depends_on": [],
"rust_dispatch": """let path: Vec<String> = serde_json::from_value(args[0].clone()).unwrap();
serde_json::to_value(normalize_path(path)).unwrap()""",
},
{
"name": "joinPaths",
"description": "Join two segment lists: normalize the concatenation of base and rel.",
"lean_fragment": """def joinPaths (base rel : PathSpec.Path) : PathSpec.Path :=
normalizePath (base ++ rel)""",
"source_fragment": _src("path_canonicalizer", "joinPaths"),
"depends_on": ["normalizePath"],
"rust_dispatch": """let base: Vec<String> = serde_json::from_value(args[0].clone()).unwrap();
let rel: Vec<String> = serde_json::from_value(args[1].clone()).unwrap();
serde_json::to_value(join_paths(base, rel)).unwrap()""",
},
{
"name": "pathDepth",
"description": "Return the number of segments in a (normalized) path.",
"lean_fragment": """def pathDepth (p : PathSpec.Path) : Nat := p.length""",
"source_fragment": _src("path_canonicalizer", "pathDepth"),
"depends_on": [],
"rust_dispatch": """let path: Vec<String> = serde_json::from_value(args[0].clone()).unwrap();
serde_json::to_value(path_depth(path)).unwrap()""",
},
{
"name": "canonicalizeProof",
"description": "Prove that normalize is idempotent: normalize(normalize(p)) = normalize(p).",
"lean_fragment": """-- Proof task: no runtime equivalent.
-- Agents must submit a Lean proof that normalizePath is idempotent.""",
"source_fragment": "-- Formal proof task; no C source equivalent.",
"depends_on": ["normalizePath"],
"is_proof_required": True,
},
],
},
# --- expression_eval: c -> rust, source fragments loaded through _src ---
{
"task_id": "expression_eval",
"display_name": "Expression Evaluator",
"difficulty": "medium",
"source_language": "c",
"target_language": "rust",
"lean_spec_module": "ExprSpec",
"lean_spec_docs": "Migrate a C recursive-descent expression evaluator to Rust. Return Err/None on division by zero; preserve operator precedence.",
"max_steps": 25,
"weight_functional": 0.5,
"weight_property": 0.3,
"weight_proof": 0.2,
"functions": [
{
"name": "evalBinOp",
"description": "Evaluate a binary operation on two integers. Return None for division by zero.",
"lean_fragment": """def evalBinOp (op : ExprSpec.Op) (a b : Int) : Option Int :=
match op with
| .Add => some (a + b)
| .Sub => some (a - b)
| .Mul => some (a * b)
| .Div => if b = 0 then none else some (a / b)""",
"source_fragment": _src("expression_eval", "evalBinOp"),
"depends_on": [],
"rust_dispatch": """let op_str: String = serde_json::from_value(args[0].clone()).unwrap();
let a: i64 = serde_json::from_value(args[1].clone()).unwrap();
let b: i64 = serde_json::from_value(args[2].clone()).unwrap();
let op = match op_str.as_str() { "Add" => Op::Add, "Sub" => Op::Sub, "Mul" => Op::Mul, "Div" => Op::Div, _ => panic!("unknown op: {}", op_str) };
serde_json::to_value(eval_bin_op(op, a, b)).unwrap()""",
},
{
"name": "evalExpr",
"description": "Recursively evaluate an expression tree. Propagate None on division by zero.",
"lean_fragment": """def evalExpr : ExprSpec.Expr → Option Int
| .Lit n => some n
| .BinOp op l r =>
match evalExpr l, evalExpr r with
| some a, some b => evalBinOp op a b
| _, _ => none""",
"source_fragment": _src("expression_eval", "evalExpr"),
"depends_on": ["evalBinOp"],
"rust_dispatch": """fn parse_expr(value: &serde_json::Value) -> Expr {
match value.get(\"tag\").and_then(|tag| tag.as_str()).unwrap() {
\"Lit\" => Expr::Lit(value.get(\"n\").and_then(|n| n.as_i64()).unwrap()),
\"BinOp\" => {
let op = match value.get(\"op\").and_then(|op| op.as_str()).unwrap() {
\"Add\" => Op::Add,
\"Sub\" => Op::Sub,
\"Mul\" => Op::Mul,
\"Div\" => Op::Div,
other => panic!(\"unknown op: {}\", other),
};
let left = Box::new(parse_expr(value.get(\"l\").unwrap()));
let right = Box::new(parse_expr(value.get(\"r\").unwrap()));
Expr::BinOp(op, left, right)
}
other => panic!(\"unknown expr tag: {}\", other),
}
}
let expr = parse_expr(&args[0]);
serde_json::to_value(eval_expr(&expr)).unwrap()""",
},
{
"name": "divisionProof",
"description": "Prove that evalBinOp Div _ 0 = none.",
"lean_fragment": """-- Proof task: no runtime equivalent.
-- Agents must prove: ∀ a, evalBinOp .Div a 0 = none""",
"source_fragment": "-- Formal proof task; no C source equivalent.",
"depends_on": ["evalBinOp"],
"is_proof_required": True,
},
],
},
# --- lru_cache: cpp -> rust, source fragments loaded through _src ---
{
"task_id": "lru_cache",
"display_name": "LRU Cache",
"difficulty": "hard",
"source_language": "cpp",
"target_language": "rust",
"lean_spec_module": "LruSpec",
"lean_spec_docs": "Migrate a C++ LRU cache (std::list + unordered_map) to Rust Vec<(u64,u64)>. Front = MRU. Prove the capacity bound.",
"max_steps": 40,
"weight_functional": 0.5,
"weight_property": 0.3,
"weight_proof": 0.2,
"functions": [
{
"name": "lruEvict",
"description": "Trim cache list to at most cap entries (keep front = MRU).",
"lean_fragment": """def lruEvict (cache : LruSpec.CacheState) (cap : Nat) : LruSpec.CacheState :=
cache.take cap""",
"source_fragment": _src("lru_cache", "lruEvict"),
"depends_on": [],
"rust_dispatch": """let cache: Vec<(u64,u64)> = serde_json::from_value(args[0].clone()).unwrap();
let cap: usize = args[1].as_u64().unwrap() as usize;
serde_json::to_value(lru_evict(cache, cap)).unwrap()""",
},
{
"name": "lruPut",
"description": "Insert/update key-val in cache (front=MRU). Evict LRU if over capacity.",
"lean_fragment": """def lruPut (cache : LruSpec.CacheState) (cap : Nat) (key val : Nat) : LruSpec.CacheState :=
((key, val) :: cache.filter (fun p => p.1 ≠ key)).take cap""",
"source_fragment": _src("lru_cache", "lruPut"),
"depends_on": ["lruEvict"],
"rust_dispatch": """let cache: Vec<(u64,u64)> = serde_json::from_value(args[0].clone()).unwrap();
let cap: usize = args[1].as_u64().unwrap() as usize;
let key: u64 = args[2].as_u64().unwrap();
let val: u64 = args[3].as_u64().unwrap();
serde_json::to_value(lru_put(cache, cap, key, val)).unwrap()""",
},
{
"name": "lruGet",
"description": "Look up key in cache. On hit, move to front (MRU). Return (value, updated_cache).",
"lean_fragment": """def lruGet (cache : LruSpec.CacheState) (key : Nat) : Option Nat × LruSpec.CacheState :=
match cache.find? (fun p => p.1 == key) with
| none => (none, cache)
| some (_, v) =>
let newCache := (key, v) :: cache.filter (fun p => p.1 ≠ key)
(some v, newCache)""",
"source_fragment": _src("lru_cache", "lruGet"),
"depends_on": [],
"rust_dispatch": """let cache: Vec<(u64,u64)> = serde_json::from_value(args[0].clone()).unwrap();
let key: u64 = args[1].as_u64().unwrap();
serde_json::to_value(lru_get(cache, key)).unwrap()""",
},
{
"name": "lruPutProof",
"description": "Prove that lruPut never exceeds capacity: (lruPut cache cap k v).length ≤ cap.",
"lean_fragment": """-- Proof task: no runtime equivalent.
-- Agents must prove: ∀ cache cap k v, (lruPut cache cap k v).length ≤ cap""",
"source_fragment": "-- Formal proof task; no C++ source equivalent.",
"depends_on": ["lruPut"],
"is_proof_required": True,
},
],
},
# --- shortest_path: cpp -> rust, source fragments loaded through _src ---
{
"task_id": "shortest_path",
"display_name": "Shortest Path (Dijkstra)",
"difficulty": "hard",
"source_language": "cpp",
"target_language": "rust",
"lean_spec_module": "ShortestPathSpec",
"lean_spec_docs": "Migrate a C++ Dijkstra implementation to Rust. Handle unreachable nodes as None. Prove self-distance is 0.",
"max_steps": 40,
"weight_functional": 0.5,
"weight_property": 0.3,
"weight_proof": 0.2,
"functions": [
{
"name": "dijkstra",
"description": "Run Dijkstra/Bellman-Ford from src on a weighted graph with n nodes. Return list of shortest distances (None = unreachable).",
"lean_fragment": """def updateDist (dists : List (Option Nat)) (i d : Nat) : List (Option Nat) :=
List.zipWith (fun j curr =>
if j == i then
match curr with
| none => some d
| some existing => some (min existing d)
else curr)
(List.range dists.length)
dists
def relaxEdge (dists : List (Option Nat)) (fr to w : Nat) : List (Option Nat) :=
match ShortestPathSpec.listGet dists fr with
| some (some df) => updateDist dists to (df + w)
| _ => dists
def relaxPass (dists : List (Option Nat)) (edges : ShortestPathSpec.Graph) : List (Option Nat) :=
edges.foldl (fun d e => relaxEdge d e.1 e.2.1 e.2.2) dists
def dijkstra (graph : ShortestPathSpec.Graph) (n src : Nat) : List (Option Nat) :=
let init := (List.range n).map (fun i => if i == src then some 0 else none)
(List.range (n - 1)).foldl (fun d _ => relaxPass d graph) init""",
"source_fragment": _src("shortest_path", "dijkstra"),
"depends_on": [],
"rust_dispatch": """let edges: Vec<(u64,u64,u64)> = serde_json::from_value(args[0].clone()).unwrap();
let n: usize = args[1].as_u64().unwrap() as usize;
let src: usize = args[2].as_u64().unwrap() as usize;
serde_json::to_value(dijkstra(edges, n, src)).unwrap()""",
},
{
"name": "shortestDist",
"description": "Return shortest distance from src to dst, or None if unreachable.",
"lean_fragment": """def shortestDist (graph : ShortestPathSpec.Graph) (n src dst : Nat) : Option Nat :=
match ShortestPathSpec.listGet (dijkstra graph n src) dst with
| some (some d) => some d
| _ => none""",
"source_fragment": _src("shortest_path", "shortestDist"),
"depends_on": ["dijkstra"],
"rust_dispatch": """let edges: Vec<(u64,u64,u64)> = serde_json::from_value(args[0].clone()).unwrap();
let n: usize = args[1].as_u64().unwrap() as usize;
let src: usize = args[2].as_u64().unwrap() as usize;
let dst: usize = args[3].as_u64().unwrap() as usize;
serde_json::to_value(shortest_dist(edges, n, src, dst)).unwrap()""",
},
{
"name": "relaxProof",
"description": "Prove that shortestDist graph n src src = some 0 when src < n.",
"lean_fragment": """-- Proof task: no runtime equivalent.
-- Agents must prove: ∀ graph n src, src < n → shortestDist graph n src src = some 0""",
"source_fragment": "-- Formal proof task; no C++ source equivalent.",
"depends_on": ["shortestDist"],
"is_proof_required": True,
},
],
},
# --- interval_scheduler: cpp -> rust, source fragments loaded through _src ---
{
"task_id": "interval_scheduler",
"display_name": "Interval Scheduler",
"difficulty": "hard",
"source_language": "cpp",
"target_language": "rust",
"lean_spec_module": "IntervalSpec",
"lean_spec_docs": "Migrate a C++ interval scheduler to Rust. Merge overlapping intervals and implement greedy activity selection. Prove merge produces sorted non-overlapping output.",
"max_steps": 40,
"weight_functional": 0.5,
"weight_property": 0.3,
"weight_proof": 0.2,
"functions": [
{
"name": "mergeIntervals",
"description": "Sort intervals by start and merge all overlapping/touching ones.",
"lean_fragment": """def insertByStart (iv : IntervalSpec.Interval) : List IntervalSpec.Interval → List IntervalSpec.Interval
| [] => [iv]
| x :: rest =>
if iv.1 < x.1 then iv :: x :: rest
else x :: insertByStart iv rest
def sortByStart (ivs : List IntervalSpec.Interval) : List IntervalSpec.Interval :=
ivs.foldl (fun sorted iv => insertByStart iv sorted) []
def dropLastInterval : List IntervalSpec.Interval → List IntervalSpec.Interval
| [] => []
| [_] => []
| x :: rest => x :: dropLastInterval rest
def mergeIntervals (ivs : List IntervalSpec.Interval) : List IntervalSpec.Interval :=
(sortByStart ivs).foldl (fun acc iv =>
match acc.getLast? with
| none => [iv]
| some (ls, le) =>
if iv.1 ≤ le then dropLastInterval acc ++ [(ls, max le iv.2)]
else acc ++ [iv]) []""",
"source_fragment": _src("interval_scheduler", "mergeIntervals"),
"depends_on": [],
"rust_dispatch": """let intervals: Vec<(i64,i64)> = serde_json::from_value(args[0].clone()).unwrap();
serde_json::to_value(merge_intervals(intervals)).unwrap()""",
},
{
"name": "maxSchedule",
"description": "Greedy activity selection: sort by end time, pick non-overlapping intervals maximizing count.",
"lean_fragment": """def insertByEnd (iv : IntervalSpec.Interval) : List IntervalSpec.Interval → List IntervalSpec.Interval
| [] => [iv]
| x :: rest =>
if iv.2 < x.2 then iv :: x :: rest
else x :: insertByEnd iv rest
def sortByEnd (ivs : List IntervalSpec.Interval) : List IntervalSpec.Interval :=
ivs.foldl (fun sorted iv => insertByEnd iv sorted) []
def maxSchedule (ivs : List IntervalSpec.Interval) : List IntervalSpec.Interval :=
(sortByEnd ivs).foldl (fun acc iv =>
match acc.getLast? with
| none => [iv]
| some (_, le) =>
if le ≤ iv.1 then acc ++ [iv]
else acc) []""",
"source_fragment": _src("interval_scheduler", "maxSchedule"),
"depends_on": [],
"rust_dispatch": """let intervals: Vec<(i64,i64)> = serde_json::from_value(args[0].clone()).unwrap();
serde_json::to_value(max_schedule(intervals)).unwrap()""",
},
{
"name": "mergeProof",
"description": "Prove that mergeIntervals output is sorted and non-overlapping.",
"lean_fragment": """-- Proof task: no runtime equivalent.
-- Agents must prove: ∀ ivs, isSorted (mergeIntervals ivs) ∧ noOverlap (mergeIntervals ivs)""",
"source_fragment": "-- Formal proof task; no C++ source equivalent.",
"depends_on": ["mergeIntervals"],
"is_proof_required": True,
},
],
},
]
def _task_from_manifest(entry: dict) -> Task:
    """Turn one manifest dictionary into a ``Task``.

    Each dict under ``entry["functions"]`` is expanded into a ``FunctionSpec``;
    the remaining fields are copied across by name. A missing key raises
    ``KeyError``.
    """
    function_specs = [FunctionSpec(**raw_spec) for raw_spec in entry["functions"]]
    # Listed in the same order the fields are looked up, "functions" aside.
    copied_fields = (
        "task_id",
        "display_name",
        "difficulty",
        "source_language",
        "target_language",
        "lean_spec_module",
        "lean_spec_docs",
        "max_steps",
        "weight_functional",
        "weight_property",
        "weight_proof",
    )
    task_kwargs = {field_name: entry[field_name] for field_name in copied_fields}
    return Task(functions=function_specs, **task_kwargs)
def _build_tasks() -> dict[str, Task]:
    """Index every manifest entry by its ``task_id``.

    Later entries replace earlier ones that share a task id.
    """
    registry: dict[str, Task] = {}
    for manifest_entry in _TASK_MANIFEST:
        task_id = manifest_entry["task_id"]
        registry[task_id] = _task_from_manifest(manifest_entry)
    return registry


# Built once, at import time.
_TASKS = _build_tasks()
def _permission(resource: str, action: str) -> dict[str, str]:
    """Build a permission record pairing a resource with an action."""
    return dict(resource=resource, action=action)
def _role(
    name: str,
    permissions: list[dict[str, str]] | None = None,
    inherits: list[str] | None = None,
) -> dict[str, object]:
    """Build a role record.

    ``permissions`` and ``inherits`` are stored as given when non-empty; when
    omitted (or empty) each is replaced by a fresh empty list.
    """
    role: dict[str, object] = {"name": name}
    role["permissions"] = permissions if permissions else []
    role["inherits"] = inherits if inherits else []
    return role
def _item(sku: str, quantity: int, unit_price: int) -> dict[str, object]:
    """Build an order line item; the price is stored under the key ``unitPrice``."""
    return dict(sku=sku, quantity=quantity, unitPrice=unit_price)
def _coupon(code: str, discount_percent: int) -> dict[str, object]:
    """Build a coupon record; the percentage is stored under ``discountPercent``."""
    return dict(code=code, discountPercent=discount_percent)
def _order(
items: list[dict[str, object]],
coupons: list[dict[str, object]],
region_id: str,
loyalty_points: int,
) -> dict[str, object]:
return {
"items": items,
"coupons": coupons,
"regionId": region_id,
"loyaltyPoints": loyalty_points,
}
# Role fixtures shared by the rbac_auth sample cases.
# Inheritance chain: admin -> editor -> viewer.
RBAC_VIEWER = _role("viewer", permissions=[_permission("reports", "read")])
RBAC_EDITOR = _role(
    "editor",
    permissions=[_permission("reports", "write")],
    inherits=["viewer"],
)
RBAC_ADMIN = _role(
    "admin",
    permissions=[_permission("billing", "write")],
    inherits=["editor"],
)
RBAC_AUDITOR = _role(
    "auditor",
    permissions=[
        _permission("reports", "read"),
        _permission("billing", "read"),
    ],
    inherits=["viewer"],
)
# Manager inherits from two roles at once.
RBAC_MANAGER = _role(
    "manager",
    permissions=[_permission("reports", "approve")],
    inherits=["editor", "auditor"],
)
# Guest has neither permissions nor parents.
RBAC_GUEST = _role("guest")
RBAC_CONTRACTOR = _role(
    "contractor",
    permissions=[_permission("reports", "read")],
)
# Two roles that inherit from each other.
RBAC_CYCLE_A = _role("cycle-a", permissions=[], inherits=["cycle-b"])
RBAC_CYCLE_B = _role(
    "cycle-b",
    permissions=[_permission("billing", "write")],
    inherits=["cycle-a"],
)

RBAC_FULL_ROLES = [
    RBAC_VIEWER,
    RBAC_EDITOR,
    RBAC_ADMIN,
    RBAC_AUDITOR,
    RBAC_MANAGER,
    RBAC_GUEST,
    RBAC_CONTRACTOR,
]
# Contains two distinct roles that are both named "viewer".
RBAC_DUPLICATE_ROLES = [
    RBAC_VIEWER,
    _role(
        "viewer",
        permissions=[
            _permission("reports", "read"),
            _permission("reports", "archive"),
        ],
    ),
    RBAC_ADMIN,
]
RBAC_CYCLIC_ROLES = [RBAC_CYCLE_A, RBAC_CYCLE_B]
# Ten order fixtures reused by the order-based pricing_engine samples.
PRICING_ORDERS = [
    # No items and no coupons.
    _order(items=[], coupons=[], region_id="US-CA", loyalty_points=0),
    # Single item, nothing else.
    _order(
        items=[_item("sku-1", 1, 1000)],
        coupons=[],
        region_id="US-CA",
        loyalty_points=0,
    ),
    # Two items, one coupon, some loyalty points.
    _order(
        items=[_item("sku-1", 2, 500), _item("sku-2", 1, 300)],
        coupons=[_coupon("TEN", 10)],
        region_id="US-TX",
        loyalty_points=50,
    ),
    # Two coupons on the same order.
    _order(
        items=[_item("sku-1", 4, 250)],
        coupons=[_coupon("TWENTYFIVE", 25), _coupon("THIRTY", 30)],
        region_id="UK",
        loyalty_points=20,
    ),
    # Loyalty points only.
    _order(
        items=[_item("sku-1", 10, 100)],
        coupons=[],
        region_id="US-NY",
        loyalty_points=200,
    ),
    # 50 percent coupon.
    _order(
        items=[_item("sku-1", 3, 333), _item("sku-2", 2, 111)],
        coupons=[_coupon("HALF", 50)],
        region_id="US-CA",
        loyalty_points=0,
    ),
    # First line item has quantity 0.
    _order(
        items=[_item("sku-1", 0, 999), _item("sku-2", 1, 250)],
        coupons=[],
        region_id="EU",
        loyalty_points=0,
    ),
    _order(
        items=[_item("sku-1", 3, 333)],
        coupons=[_coupon("FIFTEEN", 15)],
        region_id="US-TX",
        loyalty_points=5,
    ),
    # Three coupons of 5 percent each.
    _order(
        items=[_item("sku-1", 1, 1000)],
        coupons=[_coupon("FIVE", 5), _coupon("FIVE2", 5), _coupon("FIVE3", 5)],
        region_id="US-CA",
        loyalty_points=0,
    ),
    _order(
        items=[_item("sku-1", 5, 400), _item("sku-2", 2, 125)],
        coupons=[_coupon("EIGHT", 8)],
        region_id="UK",
        loyalty_points=60,
    ),
]
# Two-string argument pairs used for the payment_saga "transition" samples
# (presumably (state, event) -- confirm against the saga spec).
SAGA_TRANSITION_CASES = [
    _case(state, event)
    for state, event in (
        ("Idle", "Reserve"),
        ("Reserved", "Authorize"),
        ("Authorized", "Capture"),
        ("Captured", "Settle"),
        ("Reserved", "CompensateReserve"),
        ("Authorized", "CompensateAuthorize"),
        ("Compensating", "CompensateReserve"),
        ("Captured", "CompensateCapture"),
        ("Settled", "Fail"),
        ("Idle", "Authorize"),
    )
]
# Single-argument cases for the payment_saga "runSaga" samples; each argument
# is a list of strings, starting with the empty list.
SAGA_RUN_CASES = [
    _case(steps)
    for steps in (
        [],
        ["Reserve"],
        ["Reserve", "Authorize"],
        ["Reserve", "Authorize", "Capture"],
        ["Reserve", "Authorize", "Capture", "Settle"],
        ["Reserve", "Fail"],
        ["Fail"],
        ["Reserve", "CompensateReserve"],
        ["Reserve", "Authorize", "CompensateAuthorize", "CompensateReserve"],
        ["Reserve", "Authorize", "Capture", "CompensateCapture", "CompensateReserve"],
    )
]
# Single-string cases for the payment_saga "isCharged" samples.
# NOTE(review): "Captured" and "Settled" each appear twice, so the list holds
# ten cases but only eight distinct values.
SAGA_CHARGED_CASES = [
    _case(state)
    for state in (
        "Idle",
        "Reserved",
        "Authorized",
        "Captured",
        "Settled",
        "Compensating",
        "Compensated",
        "Failed",
        "Captured",
        "Settled",
    )
]
# Sample inputs for every task, keyed first by task_id and then by function
# name. Each function that has samples lists exactly ten cases; the
# *Proof / *_proof entries map to an empty list.
TASK_SAMPLE_INPUTS: dict[str, dict[str, list[SampleCase]]] = {
"rbac_auth": {
# Role list plus a role name to look up.
"findRole": [
_case(RBAC_FULL_ROLES, "viewer"),
_case(RBAC_FULL_ROLES, "admin"),
_case(RBAC_FULL_ROLES, "auditor"),
_case(RBAC_FULL_ROLES, "manager"),
_case(RBAC_FULL_ROLES, "guest"),
# Name that is not present in the list.
_case(RBAC_FULL_ROLES, "missing"),
_case([RBAC_VIEWER, RBAC_EDITOR, RBAC_ADMIN], "auditor"),
_case([RBAC_ADMIN, RBAC_VIEWER], "viewer"),
# Empty role list.
_case([], "viewer"),
# List containing two roles named "viewer".
_case(RBAC_DUPLICATE_ROLES, "viewer"),
],
# A single role plus a (resource, action) pair.
"hasDirectPermission": [
_case(RBAC_VIEWER, "reports", "read"),
_case(RBAC_VIEWER, "reports", "write"),
_case(RBAC_EDITOR, "reports", "write"),
_case(RBAC_EDITOR, "reports", "read"),
_case(RBAC_ADMIN, "billing", "write"),
_case(RBAC_ADMIN, "reports", "read"),
_case(RBAC_AUDITOR, "billing", "read"),
_case(RBAC_GUEST, "reports", "read"),
_case(RBAC_MANAGER, "reports", "approve"),
_case(RBAC_MANAGER, "billing", "write"),
],
# Role list, role name, resource, action and a trailing integer
# (presumably a depth/fuel bound -- confirm against the Lean spec).
"canAccess": [
_case(RBAC_FULL_ROLES, "admin", "reports", "read", 5),
_case(RBAC_FULL_ROLES, "admin", "billing", "write", 5),
_case(RBAC_FULL_ROLES, "viewer", "billing", "write", 5),
_case(RBAC_FULL_ROLES, "editor", "reports", "read", 5),
_case(RBAC_FULL_ROLES, "editor", "reports", "write", 5),
_case(RBAC_FULL_ROLES, "guest", "reports", "read", 5),
_case(RBAC_FULL_ROLES, "auditor", "billing", "read", 5),
_case(RBAC_FULL_ROLES, "manager", "reports", "read", 5),
# Same query as the first case but with the trailing integer set to 0.
_case(RBAC_FULL_ROLES, "admin", "reports", "read", 0),
# Roles that inherit from each other.
_case(RBAC_CYCLIC_ROLES, "cycle-a", "billing", "write", 5),
],
},
"pricing_engine": {
# The order-based functions all reuse the ten PRICING_ORDERS fixtures.
"subtotal": [_case(order) for order in PRICING_ORDERS],
"taxRateBps": [
_case("US-CA"),
_case("US-TX"),
_case("US-NY"),
_case("UK"),
_case("EU"),
# Empty string, lowercase and truncated variants of a region id.
_case(""),
_case("us-ca"),
_case("CA"),
# NOTE(review): "US-CA" repeats the first case in this list.
_case("US-CA"),
_case("US-SW"),
],
"couponDiscount": [_case(order) for order in PRICING_ORDERS],
"loyaltyDiscount": [_case(order) for order in PRICING_ORDERS],
"finalPrice": [_case(order) for order in PRICING_ORDERS],
},
"payment_saga": {
# Each entry is re-created by passing the fixture's args back through _case.
"transition": [_case(*case.args) for case in SAGA_TRANSITION_CASES],
"runSaga": [_case(*case.args) for case in SAGA_RUN_CASES],
"isCharged": [_case(*case.args) for case in SAGA_CHARGED_CASES],
"no_double_charge_proof": [],
},
"path_canonicalizer": {
# Paths are given as lists of segments; "." and ".." appear as segments.
"normalizePath": [
_case(["usr", "local", "..", "bin"]),
_case(["usr", ".", "local"]),
# Leading ".." segment.
_case(["..", "usr"]),
_case([]),
_case(["."]),
_case(["a", "b", "c", "..", "..", "d"]),
_case(["..", ".."]),
# More ".." segments than preceding named segments.
_case(["a", "..", ".."]),
_case(["a", "b"]),
_case(["a", ".", "b", "..", "c"]),
],
# Two segment lists per case.
"joinPaths": [
_case(["usr", "local"], ["bin"]),
_case(["usr", "local"], [".."]),
_case([], ["a", "b"]),
_case(["a", "b"], []),
_case(["a", "b", "c"], ["..", "..", "d"]),
_case(["usr", "local"], [".", "bin"]),
_case(["a"], ["b", "c"]),
_case(["a", "b"], ["c", "..", "d"]),
_case(["a", "b", "c"], ["..", "..", ".."]),
_case(["a"], ["."]),
],
# Segment lists of length 0 through 5, none containing "." or "..".
"pathDepth": [
_case(["usr", "local", "bin"]),
_case([]),
_case(["a"]),
_case(["a", "b", "c", "d", "e"]),
_case(["x"]),
_case(["a", "b"]),
_case(["home", "user", "docs"]),
_case(["etc", "nginx", "conf.d"]),
_case(["var", "log"]),
_case(["usr", "share", "man", "man1"]),
],
"canonicalizeProof": [],
},
"expression_eval": {
# Operator name followed by two integers.
"evalBinOp": [
_case("Add", 3, 4),
_case("Sub", 10, 3),
_case("Mul", 3, 4),
_case("Div", 10, 2),
# "Div" with 0 as the last integer.
_case("Div", 10, 0),
_case("Add", -5, 5),
_case("Sub", 0, 0),
_case("Mul", -2, 3),
_case("Div", -10, 2),
_case("Mul", 0, 999),
],
# Expression trees as tagged dicts: {"tag": "Lit", "n": ...} leaves and
# {"tag": "BinOp", "op": ..., "l": ..., "r": ...} nodes.
"evalExpr": [
_case({"tag": "Lit", "n": 42}),
_case({"tag": "BinOp", "op": "Add", "l": {"tag": "Lit", "n": 1}, "r": {"tag": "Lit", "n": 2}}),
# "Div" whose right operand is the literal 0.
_case({"tag": "BinOp", "op": "Div", "l": {"tag": "Lit", "n": 5}, "r": {"tag": "Lit", "n": 0}}),
_case({"tag": "BinOp", "op": "Mul", "l": {"tag": "Lit", "n": 3}, "r": {"tag": "Lit", "n": 4}}),
# Nested BinOp on the left side.
_case({"tag": "BinOp", "op": "Sub", "l": {"tag": "BinOp", "op": "Add", "l": {"tag": "Lit", "n": 10}, "r": {"tag": "Lit", "n": 5}}, "r": {"tag": "Lit", "n": 3}}),
_case({"tag": "BinOp", "op": "Div", "l": {"tag": "BinOp", "op": "Mul", "l": {"tag": "Lit", "n": 6}, "r": {"tag": "Lit", "n": 7}}, "r": {"tag": "Lit", "n": 6}}),
_case({"tag": "Lit", "n": 0}),
_case({"tag": "BinOp", "op": "Add", "l": {"tag": "Lit", "n": -5}, "r": {"tag": "Lit", "n": 5}}),
# "Div" with the literal 0 on both sides.
_case({"tag": "BinOp", "op": "Div", "l": {"tag": "Lit", "n": 0}, "r": {"tag": "Lit", "n": 0}}),
# Nested BinOp on the right side.
_case({"tag": "BinOp", "op": "Sub", "l": {"tag": "Lit", "n": 100}, "r": {"tag": "BinOp", "op": "Mul", "l": {"tag": "Lit", "n": 5}, "r": {"tag": "Lit", "n": 20}}}),
],
"divisionProof": [],
},
"lru_cache": {
# A list of integer pairs plus one integer
# (presumably cache entries and a capacity -- confirm against the spec).
"lruEvict": [
_case([(1, 10), (2, 20), (3, 30)], 2),
_case([(1, 10)], 0),
_case([], 5),
_case([(1, 1), (2, 2), (3, 3), (4, 4)], 3),
_case([(5, 50)], 1),
_case([(1, 10), (2, 20)], 5),
_case([(1, 10), (2, 20), (3, 30)], 0),
_case([(1, 1)], 1),
_case([(1, 1), (2, 2)], 2),
_case([(1, 1), (2, 2), (3, 3)], 1),
],
# A list of integer pairs plus three integers
# (presumably entries, capacity, key, value -- confirm against the spec).
"lruPut": [
_case([(1, 10), (2, 20)], 2, 1, 99),
_case([(1, 10), (2, 20)], 2, 3, 30),
_case([], 2, 1, 10),
_case([(1, 10)], 1, 2, 20),
_case([(2, 20), (1, 10)], 3, 3, 30),
_case([(1, 10), (2, 20), (3, 30)], 2, 4, 40),
_case([(1, 10)], 2, 1, 99),
_case([(1, 10), (2, 20)], 0, 3, 30),
_case([(3, 30), (1, 10), (2, 20)], 3, 2, 99),
_case([(1, 10)], 3, 2, 20),
],
# A list of integer pairs plus one integer
# (presumably entries and a lookup key -- confirm against the spec).
"lruGet": [
_case([(1, 10), (2, 20), (3, 30)], 2),
_case([(1, 10), (2, 20)], 3),
_case([], 1),
_case([(5, 50)], 5),
_case([(1, 10), (2, 20), (3, 30)], 1),
_case([(1, 10), (2, 20), (3, 30)], 3),
_case([(1, 10)], 1),
_case([(1, 10), (2, 20)], 1),
_case([(2, 20), (1, 10)], 1),
_case([(1, 1), (2, 2), (3, 3)], 2),
],
"lruPutProof": [],
},
"shortest_path": {
# A list of integer triples plus two integers
# (presumably (from, to, weight) edges, node count, source -- confirm).
"dijkstra": [
_case([(0, 1, 5), (1, 2, 3)], 3, 0),
_case([], 3, 0),
_case([(0, 1, 1), (0, 2, 4), (1, 2, 2)], 3, 0),
_case([(0, 1, 10), (1, 2, 10)], 3, 1),
_case([(0, 1, 7), (1, 2, 3), (0, 2, 15)], 3, 0),
_case([(0, 1, 1)], 3, 2),
_case([(0, 1, 5), (1, 0, 3)], 2, 0),
_case([(0, 1, 1), (1, 2, 1), (2, 3, 1)], 4, 0),
_case([(0, 1, 2), (0, 2, 5), (1, 3, 3), (2, 3, 1)], 4, 0),
_case([], 1, 0),
],
# Same shape as "dijkstra" with one extra trailing integer
# (presumably the destination node -- confirm).
"shortestDist": [
_case([(0, 1, 5), (1, 2, 3)], 3, 0, 2),
_case([(0, 1, 5)], 3, 0, 2),
_case([], 3, 0, 0),
_case([(0, 1, 1), (0, 2, 4), (1, 2, 2)], 3, 0, 2),
_case([(0, 1, 7), (1, 2, 3), (0, 2, 15)], 3, 0, 2),
_case([(0, 1, 1)], 2, 1, 0),
_case([(0, 1, 1), (1, 2, 1), (2, 3, 1)], 4, 0, 3),
_case([(0, 1, 2), (0, 2, 5), (1, 3, 3), (2, 3, 1)], 4, 0, 3),
_case([(0, 1, 5), (1, 2, 3)], 3, 0, 0),
_case([(0, 1, 5), (1, 2, 3)], 3, 0, 1),
],
"relaxProof": [],
},
"interval_scheduler": {
# Each case is one list of integer pairs (intervals).
"mergeIntervals": [
_case([(1, 3), (2, 5), (7, 9)]),
# Second interval starts exactly where the first one ends.
_case([(1, 4), (4, 6)]),
_case([(1, 2), (3, 4), (5, 6)]),
_case([]),
_case([(1, 10)]),
# Input not ordered by start value.
_case([(5, 6), (1, 3), (2, 4)]),
_case([(1, 5), (2, 3)]),
_case([(1, 2), (2, 3), (3, 4)]),
_case([(1, 100), (2, 3), (50, 60)]),
_case([(3, 4), (1, 2)]),
],
"maxSchedule": [
_case([(1, 3), (2, 4), (3, 5)]),
_case([(1, 2), (3, 4), (5, 6)]),
_case([]),
_case([(1, 10)]),
_case([(1, 4), (2, 3), (3, 5)]),
_case([(1, 2), (2, 3), (3, 4)]),
_case([(1, 5), (2, 3), (4, 6)]),
# Input not ordered by start value.
_case([(6, 8), (1, 4), (2, 5), (5, 7)]),
# All intervals share the same start value.
_case([(1, 3), (1, 4), (1, 5)]),
_case([(1, 2), (1, 3), (2, 4), (3, 5)]),
],
"mergeProof": [],
},
}
def get_task(task_id: str) -> Task:
    """Look up a registered task by its id.

    Raises:
        ValueError: if ``task_id`` is not registered. The message lists the
            known ids in sorted order and the original ``KeyError`` is
            attached as the cause.
    """
    try:
        task = _TASKS[task_id]
    except KeyError as lookup_error:
        known_ids = ", ".join(sorted(_TASKS))
        message = f"Unknown task: {task_id}. Available tasks: {known_ids}"
        raise ValueError(message) from lookup_error
    return task
def list_tasks() -> list[dict[str, object]]:
    """Return one summary dict per registered task, in registry order.

    Each summary carries ``task_id``, ``display_name``, ``difficulty``,
    ``max_steps`` and ``num_functions`` (the length of ``task.functions``).
    """
    summaries: list[dict[str, object]] = []
    for task in _TASKS.values():
        summary = dict(
            task_id=task.task_id,
            display_name=task.display_name,
            difficulty=task.difficulty,
            max_steps=task.max_steps,
            num_functions=len(task.functions),
        )
        summaries.append(summary)
    return summaries