from __future__ import annotations from dataclasses import dataclass from pathlib import Path from typing import Any from .source_parsers import load_source_fragment as _load_src TASKS_DIR = Path(__file__).resolve().parents[1] / "tasks" LEAN_DIR = Path(__file__).resolve().parents[1] / "lean" TASK_SOURCE_DIRECTORIES = { "rbac_auth": "rbac", "pricing_engine": "pricing", "payment_saga": "saga", "path_canonicalizer": "path", "expression_eval": "expr", "lru_cache": "lru", "shortest_path": "shortest", "interval_scheduler": "interval", } # Map (task_id, spec_name) → (source_filename, source_function_name). # Only needed for tasks whose source is C/C++; Python/JS tasks use inline # code already. The source_function_name is the name as it appears in # the C/C++ file (snake_case or short method name). _C_SOURCE_MAP: dict[tuple[str, str], tuple[str, str]] = { # expression_eval — C ("expression_eval", "evalBinOp"): ("source.c", "eval_bin_op"), ("expression_eval", "evalExpr"): ("source.c", "eval_expr"), # path_canonicalizer — C ("path_canonicalizer", "normalizePath"): ("source.c", "normalize_path"), ("path_canonicalizer", "joinPaths"): ("source.c", "join_paths"), ("path_canonicalizer", "pathDepth"): ("source.c", "path_depth"), # lru_cache — C++ (inline class methods) ("lru_cache", "lruEvict"): ("source.cpp", "evict"), ("lru_cache", "lruPut"): ("source.cpp", "put"), ("lru_cache", "lruGet"): ("source.cpp", "get"), # shortest_path — C++ ("shortest_path", "dijkstra"): ("source.cpp", "dijkstra"), ("shortest_path", "shortestDist"): ("source.cpp", "shortest_dist"), # interval_scheduler — C++ ("interval_scheduler", "mergeIntervals"): ("source.cpp", "merge_intervals"), ("interval_scheduler", "maxSchedule"): ("source.cpp", "max_schedule"), } def _src(task_id: str, spec_name: str) -> str: """Return the real source fragment for a C/C++ function via tree-sitter.""" entry = _C_SOURCE_MAP.get((task_id, spec_name)) if entry is None: return f"// No source mapping for {task_id}::{spec_name}" filename, fn_name = entry subdir = TASK_SOURCE_DIRECTORIES[task_id] source_path = TASKS_DIR / subdir / filename return _load_src(source_path, fn_name) @dataclass(frozen=True) class FunctionSpec: name: str description: str lean_fragment: str source_fragment: str depends_on: list[str] is_proof_required: bool = False rust_dispatch: str | None = None @dataclass(frozen=True) class SampleCase: args: tuple[Any, ...] def _case(*args: Any) -> SampleCase: return SampleCase(args=tuple(args)) @dataclass(frozen=True) class Task: task_id: str display_name: str difficulty: str source_language: str target_language: str lean_spec_module: str lean_spec_docs: str max_steps: int functions: list[FunctionSpec] weight_functional: float weight_property: float weight_proof: float @property def function_names(self) -> list[str]: return [function_spec.name for function_spec in self.functions] @property def source_files(self) -> list[str]: return self.function_names @property def dependency_graph(self) -> dict[str, list[str]]: return { function_spec.name: list(function_spec.depends_on) for function_spec in self.functions } @property def topo_order(self) -> list[str]: function_order = {name: index for index, name in enumerate(self.function_names)} dependents: dict[str, set[str]] = {name: set() for name in self.function_names} in_degree = {name: 0 for name in self.function_names} for function_name, dependencies in self.dependency_graph.items(): in_degree[function_name] = len(dependencies) for dependency in dependencies: dependents.setdefault(dependency, set()).add(function_name) ready = [name for name, degree in in_degree.items() if degree == 0] ready.sort(key=lambda function_name: function_order[function_name]) ordered: list[str] = [] while ready: current = ready.pop(0) ordered.append(current) for dependent in dependents.get(current, set()): in_degree[dependent] -= 1 if ( in_degree[dependent] == 0 and dependent not in ordered and dependent not in ready ): ready.append(dependent) ready.sort(key=lambda function_name: function_order[function_name]) if len(ordered) != len(self.function_names): return self.function_names return ordered def lean_spec_full(self) -> str: spec_file = LEAN_DIR / f"{self.lean_spec_module}.lean" if spec_file.exists(): return spec_file.read_text() return f"-- Missing Lean spec: {spec_file}\n" def get_function(self, name: str) -> FunctionSpec | None: for function_spec in self.functions: if function_spec.name == name: return function_spec return None @property def sample_inputs(self) -> dict[str, list[SampleCase]]: return TASK_SAMPLE_INPUTS.get(self.task_id, {}) def _load_source(task_id: str, filename: str) -> str: source_directory = TASK_SOURCE_DIRECTORIES.get(task_id, task_id) source_path = TASKS_DIR / source_directory / filename if source_path.exists(): return source_path.read_text() return f"# Missing source file: {source_path}\n" _TASK_MANIFEST: list[dict] = [ { "task_id": "rbac_auth", "display_name": "RBAC Authorization Engine", "difficulty": "easy", "source_language": "python", "target_language": "typescript", "lean_spec_module": "AuthSpec", "lean_spec_docs": "Migrate a Python RBAC engine to TypeScript. Preserve role lookup, direct permission checks, and bounded inheritance traversal.", "max_steps": 15, "weight_functional": 1.0, "weight_property": 0.0, "weight_proof": 0.0, "functions": [ { "name": "findRole", "description": "Look up a role by name from the role map. Return None if not found.", "lean_fragment": """def findRole (roles : RoleMap) (name : String) : Option Role := roles.find? (fun role => role.name == name)""", "source_fragment": """def find_role(roles: list[dict], name: str) -> dict | None: return next((role for role in roles if role[\"name\"] == name), None)""", "depends_on": [], }, { "name": "hasDirectPermission", "description": "Check if a role has a specific resource/action permission directly, without inheritance traversal.", "lean_fragment": """def hasDirectPermission (role : Role) (resource action : String) : Bool := role.permissions.any (fun permission => permission.resource == resource && permission.action == action)""", "source_fragment": """def has_direct_permission(role: dict, resource: str, action: str) -> bool: return any( permission[\"resource\"] == resource and permission[\"action\"] == action for permission in role.get(\"permissions\", []) )""", "depends_on": [], }, { "name": "canAccess", "description": "Check permission with role inheritance up to depth 5.", "lean_fragment": """def canAccess (roles : RoleMap) (roleName resource action : String) (depth : Nat := 5) : Bool := match depth, findRole roles roleName with | 0, _ => false | _, none => false | n + 1, some role => hasDirectPermission role resource action || role.inherits.any (fun parentName => canAccess roles parentName resource action n)""", "source_fragment": """def can_access(roles, role_name, resource, action, depth=5): if depth == 0: return False role = find_role(roles, role_name) if role is None: return False if has_direct_permission(role, resource, action): return True return any( can_access(roles, parent_name, resource, action, depth - 1) for parent_name in role.get(\"inherits\", []) )""", "depends_on": ["findRole", "hasDirectPermission"], }, ], }, { "task_id": "pricing_engine", "display_name": "E-Commerce Pricing Engine", "difficulty": "medium", "source_language": "javascript", "target_language": "python", "lean_spec_module": "PricingSpec", "lean_spec_docs": "Migrate a JavaScript pricing engine to Python. Use integer cents only. Preserve subtotal, discounts, and region tax handling.", "max_steps": 25, "weight_functional": 0.7, "weight_property": 0.3, "weight_proof": 0.0, "functions": [ { "name": "subtotal", "description": "Sum of unit price times quantity for all line items.", "lean_fragment": """def subtotal (order : Order) : Int := order.items.foldl (fun total item => total + item.unitPrice * item.quantity) 0""", "source_fragment": """function subtotal(order) { return order.items.reduce( (total, item) => total + item.unitPrice * item.quantity, 0, ); }""", "depends_on": [], }, { "name": "taxRateBps", "description": "Return the tax rate in basis points for a region.", "lean_fragment": """def taxRateBps : String → Nat | \"US-CA\" => 875 | \"US-TX\" => 625 | \"US-NY\" => 800 | \"UK\" => 2000 | _ => 0""", "source_fragment": """const TAX_RATES_BPS = { \"US-CA\": 875, \"US-TX\": 625, \"US-NY\": 800, UK: 2000, }; function taxRateBps(regionId) { return TAX_RATES_BPS[regionId] ?? 0; }""", "depends_on": [], }, { "name": "couponDiscount", "description": "Coupon discount capped at 50 percent of subtotal.", "lean_fragment": """def couponDiscount (order : Order) : Int := let sub := subtotal order let rawDiscount := order.coupons.foldl (fun total coupon => total + (sub * coupon.discountPercent / 100)) 0 min rawDiscount (sub / 2)""", "source_fragment": """function couponDiscount(order) { const sub = subtotal(order); const rawDiscount = order.coupons.reduce( (total, coupon) => total + Math.floor((sub * coupon.discountPercent) / 100), 0, ); return Math.min(rawDiscount, Math.floor(sub / 2)); }""", "depends_on": ["subtotal"], }, { "name": "loyaltyDiscount", "description": "Loyalty points discount capped at 10 percent of subtotal.", "lean_fragment": """def loyaltyDiscount (order : Order) : Int := let sub := subtotal order min (order.loyaltyPoints : Int) (sub / 10)""", "source_fragment": """function loyaltyDiscount(order) { const sub = subtotal(order); return Math.min(order.loyaltyPoints, Math.floor(sub / 10)); }""", "depends_on": ["subtotal"], }, { "name": "finalPrice", "description": "Final price after discounts and tax.", "lean_fragment": """def finalPrice (order : Order) : Int := subtotal order - totalDiscount order + tax order""", "source_fragment": """function finalPrice(order) { const sub = subtotal(order); const discount = couponDiscount(order) + loyaltyDiscount(order); const afterDiscount = sub - discount; const taxAmount = Math.floor((afterDiscount * taxRateBps(order.regionId)) / 10000); return afterDiscount + taxAmount; }""", "depends_on": ["subtotal", "couponDiscount", "loyaltyDiscount", "taxRateBps"], }, ], }, { "task_id": "payment_saga", "display_name": "Payment Saga State Machine", "difficulty": "hard", "source_language": "javascript", "target_language": "python", "lean_spec_module": "SagaSpec", "lean_spec_docs": "Migrate an Express payment saga to Python/FastAPI and prove the no_double_charge theorem for the happy path.", "max_steps": 40, "weight_functional": 0.5, "weight_property": 0.2, "weight_proof": 0.3, "functions": [ { "name": "transition", "description": "Pure state transition function for the payment saga.", "lean_fragment": """def transition (state : SagaState) (event : SagaEvent) : SagaState := match state, event with | .Idle, .Reserve => .Reserved | .Reserved, .Authorize => .Authorized | .Authorized, .Capture => .Captured | .Captured, .Settle => .Settled | .Reserved, .CompensateReserve => .Compensated | .Authorized, .CompensateAuthorize => .Compensating | .Compensating, .CompensateReserve => .Compensated | .Captured, .CompensateCapture => .Compensating | _, .Fail => .Failed | current, _ => current""", "source_fragment": """function transition(state, event) { const key = `${state}:${event}`; const transitions = { \"Idle:Reserve\": \"Reserved\", \"Reserved:Authorize\": \"Authorized\", \"Authorized:Capture\": \"Captured\", \"Captured:Settle\": \"Settled\", \"Reserved:CompensateReserve\": \"Compensated\", \"Authorized:CompensateAuthorize\": \"Compensating\", \"Compensating:CompensateReserve\": \"Compensated\", \"Captured:CompensateCapture\": \"Compensating\", }; if (event === \"Fail\") { return \"Failed\"; } return transitions[key] ?? state; }""", "depends_on": [], }, { "name": "runSaga", "description": "Fold a sequence of events into the final saga state.", "lean_fragment": """def runSaga (events : List SagaEvent) : SagaState := events.foldl transition .Idle""", "source_fragment": """function runSaga(events) { return events.reduce(transition, \"Idle\"); }""", "depends_on": ["transition"], }, { "name": "isCharged", "description": "Return true when the saga is in a charged state.", "lean_fragment": """def isCharged (state : SagaState) : Bool := match state with | .Captured | .Settled => true | _ => false""", "source_fragment": """function isCharged(state) { return state === \"Captured\" || state === \"Settled\"; }""", "depends_on": [], }, { "name": "no_double_charge_proof", "description": "Formal proof that the happy path reaches Settled and is charged exactly once.", "lean_fragment": """theorem no_double_charge : runSaga happyPath = .Settled ∧ isCharged (runSaga happyPath) = true := by constructor · exact happy_path_settles · rw [happy_path_settles] simp [isCharged]""", "source_fragment": "# Formal proof task; no runtime source equivalent.", "depends_on": ["transition", "runSaga", "isCharged"], "is_proof_required": True, }, ], }, { "task_id": "path_canonicalizer", "display_name": "Path Canonicalizer", "difficulty": "medium", "source_language": "c", "target_language": "rust", "lean_spec_module": "PathSpec", "lean_spec_docs": "Migrate a C path normalization library to Rust. Preserve segment-level normalization, join semantics, and the idempotency invariant.", "max_steps": 25, "weight_functional": 0.5, "weight_property": 0.3, "weight_proof": 0.2, "functions": [ { "name": "normalizePath", "description": "Remove '.' segments and resolve '..' clamped at root. Return path as list of segments.", "lean_fragment": """def normalizeAux (acc : PathSpec.Path) : PathSpec.Path → PathSpec.Path | [] => acc.reverse | seg :: rest => if seg == "." then normalizeAux acc rest else if seg == ".." then match acc with | [] => normalizeAux [] rest | _ :: acc' => normalizeAux acc' rest else normalizeAux (seg :: acc) rest def normalizePath (p : PathSpec.Path) : PathSpec.Path := normalizeAux [] p""", "source_fragment": _src("path_canonicalizer", "normalizePath"), "depends_on": [], "rust_dispatch": """let path: Vec = serde_json::from_value(args[0].clone()).unwrap(); serde_json::to_value(normalize_path(path)).unwrap()""", }, { "name": "joinPaths", "description": "Join two segment lists: normalize the concatenation of base and rel.", "lean_fragment": """def joinPaths (base rel : PathSpec.Path) : PathSpec.Path := normalizePath (base ++ rel)""", "source_fragment": _src("path_canonicalizer", "joinPaths"), "depends_on": ["normalizePath"], "rust_dispatch": """let base: Vec = serde_json::from_value(args[0].clone()).unwrap(); let rel: Vec = serde_json::from_value(args[1].clone()).unwrap(); serde_json::to_value(join_paths(base, rel)).unwrap()""", }, { "name": "pathDepth", "description": "Return the number of segments in a (normalized) path.", "lean_fragment": """def pathDepth (p : PathSpec.Path) : Nat := p.length""", "source_fragment": _src("path_canonicalizer", "pathDepth"), "depends_on": [], "rust_dispatch": """let path: Vec = serde_json::from_value(args[0].clone()).unwrap(); serde_json::to_value(path_depth(path)).unwrap()""", }, { "name": "canonicalizeProof", "description": "Prove that normalize is idempotent: normalize(normalize(p)) = normalize(p).", "lean_fragment": """-- Proof task: no runtime equivalent. -- Agents must submit a Lean proof that normalizePath is idempotent.""", "source_fragment": "-- Formal proof task; no C source equivalent.", "depends_on": ["normalizePath"], "is_proof_required": True, }, ], }, { "task_id": "expression_eval", "display_name": "Expression Evaluator", "difficulty": "medium", "source_language": "c", "target_language": "rust", "lean_spec_module": "ExprSpec", "lean_spec_docs": "Migrate a C recursive-descent expression evaluator to Rust. Return Err/None on division by zero; preserve operator precedence.", "max_steps": 25, "weight_functional": 0.5, "weight_property": 0.3, "weight_proof": 0.2, "functions": [ { "name": "evalBinOp", "description": "Evaluate a binary operation on two integers. Return None for division by zero.", "lean_fragment": """def evalBinOp (op : ExprSpec.Op) (a b : Int) : Option Int := match op with | .Add => some (a + b) | .Sub => some (a - b) | .Mul => some (a * b) | .Div => if b = 0 then none else some (a / b)""", "source_fragment": _src("expression_eval", "evalBinOp"), "depends_on": [], "rust_dispatch": """let op_str: String = serde_json::from_value(args[0].clone()).unwrap(); let a: i64 = serde_json::from_value(args[1].clone()).unwrap(); let b: i64 = serde_json::from_value(args[2].clone()).unwrap(); let op = match op_str.as_str() { "Add" => Op::Add, "Sub" => Op::Sub, "Mul" => Op::Mul, "Div" => Op::Div, _ => panic!("unknown op: {}", op_str) }; serde_json::to_value(eval_bin_op(op, a, b)).unwrap()""", }, { "name": "evalExpr", "description": "Recursively evaluate an expression tree. Propagate None on division by zero.", "lean_fragment": """def evalExpr : ExprSpec.Expr → Option Int | .Lit n => some n | .BinOp op l r => match evalExpr l, evalExpr r with | some a, some b => evalBinOp op a b | _, _ => none""", "source_fragment": _src("expression_eval", "evalExpr"), "depends_on": ["evalBinOp"], "rust_dispatch": """fn parse_expr(value: &serde_json::Value) -> Expr { match value.get(\"tag\").and_then(|tag| tag.as_str()).unwrap() { \"Lit\" => Expr::Lit(value.get(\"n\").and_then(|n| n.as_i64()).unwrap()), \"BinOp\" => { let op = match value.get(\"op\").and_then(|op| op.as_str()).unwrap() { \"Add\" => Op::Add, \"Sub\" => Op::Sub, \"Mul\" => Op::Mul, \"Div\" => Op::Div, other => panic!(\"unknown op: {}\", other), }; let left = Box::new(parse_expr(value.get(\"l\").unwrap())); let right = Box::new(parse_expr(value.get(\"r\").unwrap())); Expr::BinOp(op, left, right) } other => panic!(\"unknown expr tag: {}\", other), } } let expr = parse_expr(&args[0]); serde_json::to_value(eval_expr(&expr)).unwrap()""", }, { "name": "divisionProof", "description": "Prove that evalBinOp Div _ 0 = none.", "lean_fragment": """-- Proof task: no runtime equivalent. -- Agents must prove: ∀ a, evalBinOp .Div a 0 = none""", "source_fragment": "-- Formal proof task; no C source equivalent.", "depends_on": ["evalBinOp"], "is_proof_required": True, }, ], }, { "task_id": "lru_cache", "display_name": "LRU Cache", "difficulty": "hard", "source_language": "cpp", "target_language": "rust", "lean_spec_module": "LruSpec", "lean_spec_docs": "Migrate a C++ LRU cache (std::list + unordered_map) to Rust Vec<(u64,u64)>. Front = MRU. Prove the capacity bound.", "max_steps": 40, "weight_functional": 0.5, "weight_property": 0.3, "weight_proof": 0.2, "functions": [ { "name": "lruEvict", "description": "Trim cache list to at most cap entries (keep front = MRU).", "lean_fragment": """def lruEvict (cache : LruSpec.CacheState) (cap : Nat) : LruSpec.CacheState := cache.take cap""", "source_fragment": _src("lru_cache", "lruEvict"), "depends_on": [], "rust_dispatch": """let cache: Vec<(u64,u64)> = serde_json::from_value(args[0].clone()).unwrap(); let cap: usize = args[1].as_u64().unwrap() as usize; serde_json::to_value(lru_evict(cache, cap)).unwrap()""", }, { "name": "lruPut", "description": "Insert/update key-val in cache (front=MRU). Evict LRU if over capacity.", "lean_fragment": """def lruPut (cache : LruSpec.CacheState) (cap : Nat) (key val : Nat) : LruSpec.CacheState := ((key, val) :: cache.filter (fun p => p.1 ≠ key)).take cap""", "source_fragment": _src("lru_cache", "lruPut"), "depends_on": ["lruEvict"], "rust_dispatch": """let cache: Vec<(u64,u64)> = serde_json::from_value(args[0].clone()).unwrap(); let cap: usize = args[1].as_u64().unwrap() as usize; let key: u64 = args[2].as_u64().unwrap(); let val: u64 = args[3].as_u64().unwrap(); serde_json::to_value(lru_put(cache, cap, key, val)).unwrap()""", }, { "name": "lruGet", "description": "Look up key in cache. On hit, move to front (MRU). Return (value, updated_cache).", "lean_fragment": """def lruGet (cache : LruSpec.CacheState) (key : Nat) : Option Nat × LruSpec.CacheState := match cache.find? (fun p => p.1 == key) with | none => (none, cache) | some (_, v) => let newCache := (key, v) :: cache.filter (fun p => p.1 ≠ key) (some v, newCache)""", "source_fragment": _src("lru_cache", "lruGet"), "depends_on": [], "rust_dispatch": """let cache: Vec<(u64,u64)> = serde_json::from_value(args[0].clone()).unwrap(); let key: u64 = args[1].as_u64().unwrap(); serde_json::to_value(lru_get(cache, key)).unwrap()""", }, { "name": "lruPutProof", "description": "Prove that lruPut never exceeds capacity: (lruPut cache cap k v).length ≤ cap.", "lean_fragment": """-- Proof task: no runtime equivalent. -- Agents must prove: ∀ cache cap k v, (lruPut cache cap k v).length ≤ cap""", "source_fragment": "-- Formal proof task; no C++ source equivalent.", "depends_on": ["lruPut"], "is_proof_required": True, }, ], }, { "task_id": "shortest_path", "display_name": "Shortest Path (Dijkstra)", "difficulty": "hard", "source_language": "cpp", "target_language": "rust", "lean_spec_module": "ShortestPathSpec", "lean_spec_docs": "Migrate a C++ Dijkstra implementation to Rust. Handle unreachable nodes as None. Prove self-distance is 0.", "max_steps": 40, "weight_functional": 0.5, "weight_property": 0.3, "weight_proof": 0.2, "functions": [ { "name": "dijkstra", "description": "Run Dijkstra/Bellman-Ford from src on a weighted graph with n nodes. Return list of shortest distances (None = unreachable).", "lean_fragment": """def updateDist (dists : List (Option Nat)) (i d : Nat) : List (Option Nat) := List.zipWith (fun j curr => if j == i then match curr with | none => some d | some existing => some (min existing d) else curr) (List.range dists.length) dists def relaxEdge (dists : List (Option Nat)) (fr to w : Nat) : List (Option Nat) := match ShortestPathSpec.listGet dists fr with | some (some df) => updateDist dists to (df + w) | _ => dists def relaxPass (dists : List (Option Nat)) (edges : ShortestPathSpec.Graph) : List (Option Nat) := edges.foldl (fun d e => relaxEdge d e.1 e.2.1 e.2.2) dists def dijkstra (graph : ShortestPathSpec.Graph) (n src : Nat) : List (Option Nat) := let init := (List.range n).map (fun i => if i == src then some 0 else none) (List.range (n - 1)).foldl (fun d _ => relaxPass d graph) init""", "source_fragment": _src("shortest_path", "dijkstra"), "depends_on": [], "rust_dispatch": """let edges: Vec<(u64,u64,u64)> = serde_json::from_value(args[0].clone()).unwrap(); let n: usize = args[1].as_u64().unwrap() as usize; let src: usize = args[2].as_u64().unwrap() as usize; serde_json::to_value(dijkstra(edges, n, src)).unwrap()""", }, { "name": "shortestDist", "description": "Return shortest distance from src to dst, or None if unreachable.", "lean_fragment": """def shortestDist (graph : ShortestPathSpec.Graph) (n src dst : Nat) : Option Nat := match ShortestPathSpec.listGet (dijkstra graph n src) dst with | some (some d) => some d | _ => none""", "source_fragment": _src("shortest_path", "shortestDist"), "depends_on": ["dijkstra"], "rust_dispatch": """let edges: Vec<(u64,u64,u64)> = serde_json::from_value(args[0].clone()).unwrap(); let n: usize = args[1].as_u64().unwrap() as usize; let src: usize = args[2].as_u64().unwrap() as usize; let dst: usize = args[3].as_u64().unwrap() as usize; serde_json::to_value(shortest_dist(edges, n, src, dst)).unwrap()""", }, { "name": "relaxProof", "description": "Prove that shortestDist graph n src src = some 0 when src < n.", "lean_fragment": """-- Proof task: no runtime equivalent. -- Agents must prove: ∀ graph n src, src < n → shortestDist graph n src src = some 0""", "source_fragment": "-- Formal proof task; no C++ source equivalent.", "depends_on": ["shortestDist"], "is_proof_required": True, }, ], }, { "task_id": "interval_scheduler", "display_name": "Interval Scheduler", "difficulty": "hard", "source_language": "cpp", "target_language": "rust", "lean_spec_module": "IntervalSpec", "lean_spec_docs": "Migrate a C++ interval scheduler to Rust. Merge overlapping intervals and implement greedy activity selection. Prove merge produces sorted non-overlapping output.", "max_steps": 40, "weight_functional": 0.5, "weight_property": 0.3, "weight_proof": 0.2, "functions": [ { "name": "mergeIntervals", "description": "Sort intervals by start and merge all overlapping/touching ones.", "lean_fragment": """def insertByStart (iv : IntervalSpec.Interval) : List IntervalSpec.Interval → List IntervalSpec.Interval | [] => [iv] | x :: rest => if iv.1 < x.1 then iv :: x :: rest else x :: insertByStart iv rest def sortByStart (ivs : List IntervalSpec.Interval) : List IntervalSpec.Interval := ivs.foldl (fun sorted iv => insertByStart iv sorted) [] def dropLastInterval : List IntervalSpec.Interval → List IntervalSpec.Interval | [] => [] | [_] => [] | x :: rest => x :: dropLastInterval rest def mergeIntervals (ivs : List IntervalSpec.Interval) : List IntervalSpec.Interval := (sortByStart ivs).foldl (fun acc iv => match acc.getLast? with | none => [iv] | some (ls, le) => if iv.1 ≤ le then dropLastInterval acc ++ [(ls, max le iv.2)] else acc ++ [iv]) []""", "source_fragment": _src("interval_scheduler", "mergeIntervals"), "depends_on": [], "rust_dispatch": """let intervals: Vec<(i64,i64)> = serde_json::from_value(args[0].clone()).unwrap(); serde_json::to_value(merge_intervals(intervals)).unwrap()""", }, { "name": "maxSchedule", "description": "Greedy activity selection: sort by end time, pick non-overlapping intervals maximizing count.", "lean_fragment": """def insertByEnd (iv : IntervalSpec.Interval) : List IntervalSpec.Interval → List IntervalSpec.Interval | [] => [iv] | x :: rest => if iv.2 < x.2 then iv :: x :: rest else x :: insertByEnd iv rest def sortByEnd (ivs : List IntervalSpec.Interval) : List IntervalSpec.Interval := ivs.foldl (fun sorted iv => insertByEnd iv sorted) [] def maxSchedule (ivs : List IntervalSpec.Interval) : List IntervalSpec.Interval := (sortByEnd ivs).foldl (fun acc iv => match acc.getLast? with | none => [iv] | some (_, le) => if le ≤ iv.1 then acc ++ [iv] else acc) []""", "source_fragment": _src("interval_scheduler", "maxSchedule"), "depends_on": [], "rust_dispatch": """let intervals: Vec<(i64,i64)> = serde_json::from_value(args[0].clone()).unwrap(); serde_json::to_value(max_schedule(intervals)).unwrap()""", }, { "name": "mergeProof", "description": "Prove that mergeIntervals output is sorted and non-overlapping.", "lean_fragment": """-- Proof task: no runtime equivalent. -- Agents must prove: ∀ ivs, isSorted (mergeIntervals ivs) ∧ noOverlap (mergeIntervals ivs)""", "source_fragment": "-- Formal proof task; no C++ source equivalent.", "depends_on": ["mergeIntervals"], "is_proof_required": True, }, ], }, ] def _task_from_manifest(entry: dict) -> Task: specs = [FunctionSpec(**f) for f in entry["functions"]] return Task( task_id=entry["task_id"], display_name=entry["display_name"], difficulty=entry["difficulty"], source_language=entry["source_language"], target_language=entry["target_language"], lean_spec_module=entry["lean_spec_module"], lean_spec_docs=entry["lean_spec_docs"], max_steps=entry["max_steps"], functions=specs, weight_functional=entry["weight_functional"], weight_property=entry["weight_property"], weight_proof=entry["weight_proof"], ) def _build_tasks() -> dict[str, Task]: return {e["task_id"]: _task_from_manifest(e) for e in _TASK_MANIFEST} _TASKS = _build_tasks() def _permission(resource: str, action: str) -> dict[str, str]: return {"resource": resource, "action": action} def _role( name: str, permissions: list[dict[str, str]] | None = None, inherits: list[str] | None = None, ) -> dict[str, object]: return { "name": name, "permissions": permissions or [], "inherits": inherits or [], } def _item(sku: str, quantity: int, unit_price: int) -> dict[str, object]: return {"sku": sku, "quantity": quantity, "unitPrice": unit_price} def _coupon(code: str, discount_percent: int) -> dict[str, object]: return {"code": code, "discountPercent": discount_percent} def _order( items: list[dict[str, object]], coupons: list[dict[str, object]], region_id: str, loyalty_points: int, ) -> dict[str, object]: return { "items": items, "coupons": coupons, "regionId": region_id, "loyaltyPoints": loyalty_points, } RBAC_VIEWER = _role("viewer", [_permission("reports", "read")]) RBAC_EDITOR = _role( "editor", [_permission("reports", "write")], ["viewer"], ) RBAC_ADMIN = _role( "admin", [_permission("billing", "write")], ["editor"], ) RBAC_AUDITOR = _role( "auditor", [_permission("reports", "read"), _permission("billing", "read")], ["viewer"], ) RBAC_MANAGER = _role( "manager", [_permission("reports", "approve")], ["editor", "auditor"], ) RBAC_GUEST = _role("guest") RBAC_CONTRACTOR = _role( "contractor", [_permission("reports", "read")], ) RBAC_CYCLE_A = _role("cycle-a", [], ["cycle-b"]) RBAC_CYCLE_B = _role("cycle-b", [_permission("billing", "write")], ["cycle-a"]) RBAC_FULL_ROLES = [ RBAC_VIEWER, RBAC_EDITOR, RBAC_ADMIN, RBAC_AUDITOR, RBAC_MANAGER, RBAC_GUEST, RBAC_CONTRACTOR, ] RBAC_DUPLICATE_ROLES = [ RBAC_VIEWER, _role( "viewer", [_permission("reports", "read"), _permission("reports", "archive")], ), RBAC_ADMIN, ] RBAC_CYCLIC_ROLES = [RBAC_CYCLE_A, RBAC_CYCLE_B] PRICING_ORDERS = [ _order([], [], "US-CA", 0), _order([_item("sku-1", 1, 1000)], [], "US-CA", 0), _order( [_item("sku-1", 2, 500), _item("sku-2", 1, 300)], [_coupon("TEN", 10)], "US-TX", 50, ), _order( [_item("sku-1", 4, 250)], [_coupon("TWENTYFIVE", 25), _coupon("THIRTY", 30)], "UK", 20, ), _order([_item("sku-1", 10, 100)], [], "US-NY", 200), _order( [_item("sku-1", 3, 333), _item("sku-2", 2, 111)], [_coupon("HALF", 50)], "US-CA", 0, ), _order([_item("sku-1", 0, 999), _item("sku-2", 1, 250)], [], "EU", 0), _order( [_item("sku-1", 3, 333)], [_coupon("FIFTEEN", 15)], "US-TX", 5, ), _order( [_item("sku-1", 1, 1000)], [_coupon("FIVE", 5), _coupon("FIVE2", 5), _coupon("FIVE3", 5)], "US-CA", 0, ), _order( [_item("sku-1", 5, 400), _item("sku-2", 2, 125)], [_coupon("EIGHT", 8)], "UK", 60, ), ] SAGA_TRANSITION_CASES = [ _case("Idle", "Reserve"), _case("Reserved", "Authorize"), _case("Authorized", "Capture"), _case("Captured", "Settle"), _case("Reserved", "CompensateReserve"), _case("Authorized", "CompensateAuthorize"), _case("Compensating", "CompensateReserve"), _case("Captured", "CompensateCapture"), _case("Settled", "Fail"), _case("Idle", "Authorize"), ] SAGA_RUN_CASES = [ _case([]), _case(["Reserve"]), _case(["Reserve", "Authorize"]), _case(["Reserve", "Authorize", "Capture"]), _case(["Reserve", "Authorize", "Capture", "Settle"]), _case(["Reserve", "Fail"]), _case(["Fail"]), _case(["Reserve", "CompensateReserve"]), _case(["Reserve", "Authorize", "CompensateAuthorize", "CompensateReserve"]), _case( ["Reserve", "Authorize", "Capture", "CompensateCapture", "CompensateReserve"] ), ] SAGA_CHARGED_CASES = [ _case("Idle"), _case("Reserved"), _case("Authorized"), _case("Captured"), _case("Settled"), _case("Compensating"), _case("Compensated"), _case("Failed"), _case("Captured"), _case("Settled"), ] TASK_SAMPLE_INPUTS: dict[str, dict[str, list[SampleCase]]] = { "rbac_auth": { "findRole": [ _case(RBAC_FULL_ROLES, "viewer"), _case(RBAC_FULL_ROLES, "admin"), _case(RBAC_FULL_ROLES, "auditor"), _case(RBAC_FULL_ROLES, "manager"), _case(RBAC_FULL_ROLES, "guest"), _case(RBAC_FULL_ROLES, "missing"), _case([RBAC_VIEWER, RBAC_EDITOR, RBAC_ADMIN], "auditor"), _case([RBAC_ADMIN, RBAC_VIEWER], "viewer"), _case([], "viewer"), _case(RBAC_DUPLICATE_ROLES, "viewer"), ], "hasDirectPermission": [ _case(RBAC_VIEWER, "reports", "read"), _case(RBAC_VIEWER, "reports", "write"), _case(RBAC_EDITOR, "reports", "write"), _case(RBAC_EDITOR, "reports", "read"), _case(RBAC_ADMIN, "billing", "write"), _case(RBAC_ADMIN, "reports", "read"), _case(RBAC_AUDITOR, "billing", "read"), _case(RBAC_GUEST, "reports", "read"), _case(RBAC_MANAGER, "reports", "approve"), _case(RBAC_MANAGER, "billing", "write"), ], "canAccess": [ _case(RBAC_FULL_ROLES, "admin", "reports", "read", 5), _case(RBAC_FULL_ROLES, "admin", "billing", "write", 5), _case(RBAC_FULL_ROLES, "viewer", "billing", "write", 5), _case(RBAC_FULL_ROLES, "editor", "reports", "read", 5), _case(RBAC_FULL_ROLES, "editor", "reports", "write", 5), _case(RBAC_FULL_ROLES, "guest", "reports", "read", 5), _case(RBAC_FULL_ROLES, "auditor", "billing", "read", 5), _case(RBAC_FULL_ROLES, "manager", "reports", "read", 5), _case(RBAC_FULL_ROLES, "admin", "reports", "read", 0), _case(RBAC_CYCLIC_ROLES, "cycle-a", "billing", "write", 5), ], }, "pricing_engine": { "subtotal": [_case(order) for order in PRICING_ORDERS], "taxRateBps": [ _case("US-CA"), _case("US-TX"), _case("US-NY"), _case("UK"), _case("EU"), _case(""), _case("us-ca"), _case("CA"), _case("US-CA"), _case("US-SW"), ], "couponDiscount": [_case(order) for order in PRICING_ORDERS], "loyaltyDiscount": [_case(order) for order in PRICING_ORDERS], "finalPrice": [_case(order) for order in PRICING_ORDERS], }, "payment_saga": { "transition": [_case(*case.args) for case in SAGA_TRANSITION_CASES], "runSaga": [_case(*case.args) for case in SAGA_RUN_CASES], "isCharged": [_case(*case.args) for case in SAGA_CHARGED_CASES], "no_double_charge_proof": [], }, "path_canonicalizer": { "normalizePath": [ _case(["usr", "local", "..", "bin"]), _case(["usr", ".", "local"]), _case(["..", "usr"]), _case([]), _case(["."]), _case(["a", "b", "c", "..", "..", "d"]), _case(["..", ".."]), _case(["a", "..", ".."]), _case(["a", "b"]), _case(["a", ".", "b", "..", "c"]), ], "joinPaths": [ _case(["usr", "local"], ["bin"]), _case(["usr", "local"], [".."]), _case([], ["a", "b"]), _case(["a", "b"], []), _case(["a", "b", "c"], ["..", "..", "d"]), _case(["usr", "local"], [".", "bin"]), _case(["a"], ["b", "c"]), _case(["a", "b"], ["c", "..", "d"]), _case(["a", "b", "c"], ["..", "..", ".."]), _case(["a"], ["."]), ], "pathDepth": [ _case(["usr", "local", "bin"]), _case([]), _case(["a"]), _case(["a", "b", "c", "d", "e"]), _case(["x"]), _case(["a", "b"]), _case(["home", "user", "docs"]), _case(["etc", "nginx", "conf.d"]), _case(["var", "log"]), _case(["usr", "share", "man", "man1"]), ], "canonicalizeProof": [], }, "expression_eval": { "evalBinOp": [ _case("Add", 3, 4), _case("Sub", 10, 3), _case("Mul", 3, 4), _case("Div", 10, 2), _case("Div", 10, 0), _case("Add", -5, 5), _case("Sub", 0, 0), _case("Mul", -2, 3), _case("Div", -10, 2), _case("Mul", 0, 999), ], "evalExpr": [ _case({"tag": "Lit", "n": 42}), _case({"tag": "BinOp", "op": "Add", "l": {"tag": "Lit", "n": 1}, "r": {"tag": "Lit", "n": 2}}), _case({"tag": "BinOp", "op": "Div", "l": {"tag": "Lit", "n": 5}, "r": {"tag": "Lit", "n": 0}}), _case({"tag": "BinOp", "op": "Mul", "l": {"tag": "Lit", "n": 3}, "r": {"tag": "Lit", "n": 4}}), _case({"tag": "BinOp", "op": "Sub", "l": {"tag": "BinOp", "op": "Add", "l": {"tag": "Lit", "n": 10}, "r": {"tag": "Lit", "n": 5}}, "r": {"tag": "Lit", "n": 3}}), _case({"tag": "BinOp", "op": "Div", "l": {"tag": "BinOp", "op": "Mul", "l": {"tag": "Lit", "n": 6}, "r": {"tag": "Lit", "n": 7}}, "r": {"tag": "Lit", "n": 6}}), _case({"tag": "Lit", "n": 0}), _case({"tag": "BinOp", "op": "Add", "l": {"tag": "Lit", "n": -5}, "r": {"tag": "Lit", "n": 5}}), _case({"tag": "BinOp", "op": "Div", "l": {"tag": "Lit", "n": 0}, "r": {"tag": "Lit", "n": 0}}), _case({"tag": "BinOp", "op": "Sub", "l": {"tag": "Lit", "n": 100}, "r": {"tag": "BinOp", "op": "Mul", "l": {"tag": "Lit", "n": 5}, "r": {"tag": "Lit", "n": 20}}}), ], "divisionProof": [], }, "lru_cache": { "lruEvict": [ _case([(1, 10), (2, 20), (3, 30)], 2), _case([(1, 10)], 0), _case([], 5), _case([(1, 1), (2, 2), (3, 3), (4, 4)], 3), _case([(5, 50)], 1), _case([(1, 10), (2, 20)], 5), _case([(1, 10), (2, 20), (3, 30)], 0), _case([(1, 1)], 1), _case([(1, 1), (2, 2)], 2), _case([(1, 1), (2, 2), (3, 3)], 1), ], "lruPut": [ _case([(1, 10), (2, 20)], 2, 1, 99), _case([(1, 10), (2, 20)], 2, 3, 30), _case([], 2, 1, 10), _case([(1, 10)], 1, 2, 20), _case([(2, 20), (1, 10)], 3, 3, 30), _case([(1, 10), (2, 20), (3, 30)], 2, 4, 40), _case([(1, 10)], 2, 1, 99), _case([(1, 10), (2, 20)], 0, 3, 30), _case([(3, 30), (1, 10), (2, 20)], 3, 2, 99), _case([(1, 10)], 3, 2, 20), ], "lruGet": [ _case([(1, 10), (2, 20), (3, 30)], 2), _case([(1, 10), (2, 20)], 3), _case([], 1), _case([(5, 50)], 5), _case([(1, 10), (2, 20), (3, 30)], 1), _case([(1, 10), (2, 20), (3, 30)], 3), _case([(1, 10)], 1), _case([(1, 10), (2, 20)], 1), _case([(2, 20), (1, 10)], 1), _case([(1, 1), (2, 2), (3, 3)], 2), ], "lruPutProof": [], }, "shortest_path": { "dijkstra": [ _case([(0, 1, 5), (1, 2, 3)], 3, 0), _case([], 3, 0), _case([(0, 1, 1), (0, 2, 4), (1, 2, 2)], 3, 0), _case([(0, 1, 10), (1, 2, 10)], 3, 1), _case([(0, 1, 7), (1, 2, 3), (0, 2, 15)], 3, 0), _case([(0, 1, 1)], 3, 2), _case([(0, 1, 5), (1, 0, 3)], 2, 0), _case([(0, 1, 1), (1, 2, 1), (2, 3, 1)], 4, 0), _case([(0, 1, 2), (0, 2, 5), (1, 3, 3), (2, 3, 1)], 4, 0), _case([], 1, 0), ], "shortestDist": [ _case([(0, 1, 5), (1, 2, 3)], 3, 0, 2), _case([(0, 1, 5)], 3, 0, 2), _case([], 3, 0, 0), _case([(0, 1, 1), (0, 2, 4), (1, 2, 2)], 3, 0, 2), _case([(0, 1, 7), (1, 2, 3), (0, 2, 15)], 3, 0, 2), _case([(0, 1, 1)], 2, 1, 0), _case([(0, 1, 1), (1, 2, 1), (2, 3, 1)], 4, 0, 3), _case([(0, 1, 2), (0, 2, 5), (1, 3, 3), (2, 3, 1)], 4, 0, 3), _case([(0, 1, 5), (1, 2, 3)], 3, 0, 0), _case([(0, 1, 5), (1, 2, 3)], 3, 0, 1), ], "relaxProof": [], }, "interval_scheduler": { "mergeIntervals": [ _case([(1, 3), (2, 5), (7, 9)]), _case([(1, 4), (4, 6)]), _case([(1, 2), (3, 4), (5, 6)]), _case([]), _case([(1, 10)]), _case([(5, 6), (1, 3), (2, 4)]), _case([(1, 5), (2, 3)]), _case([(1, 2), (2, 3), (3, 4)]), _case([(1, 100), (2, 3), (50, 60)]), _case([(3, 4), (1, 2)]), ], "maxSchedule": [ _case([(1, 3), (2, 4), (3, 5)]), _case([(1, 2), (3, 4), (5, 6)]), _case([]), _case([(1, 10)]), _case([(1, 4), (2, 3), (3, 5)]), _case([(1, 2), (2, 3), (3, 4)]), _case([(1, 5), (2, 3), (4, 6)]), _case([(6, 8), (1, 4), (2, 5), (5, 7)]), _case([(1, 3), (1, 4), (1, 5)]), _case([(1, 2), (1, 3), (2, 4), (3, 5)]), ], "mergeProof": [], }, } def get_task(task_id: str) -> Task: try: return _TASKS[task_id] except KeyError as error: available_tasks = ", ".join(sorted(_TASKS)) raise ValueError( f"Unknown task: {task_id}. Available tasks: {available_tasks}" ) from error def list_tasks() -> list[dict[str, object]]: return [ { "task_id": task.task_id, "display_name": task.display_name, "difficulty": task.difficulty, "max_steps": task.max_steps, "num_functions": len(task.functions), } for task in _TASKS.values() ]