# cross-session-continuity-env — server/task_generator.py
# (uploaded by Aswini-Kumar, commit f83d2aa, verified)
"""
server/task_generator.py
Generates coding tasks with:
1. Name randomization per episode seed — prevents session 2 reconstructing from
pretrained knowledge without reading the handoff note.
2. Injected hidden adversarial tests — prevents visible-test overfitting.
3. Handoff-critical calibration — tasks are designed so session 1 cannot
fully finish within the step limit.
"""
import copy
import json
import os
import random
import re
from dataclasses import dataclass, field
from typing import Dict, List, Optional
# ---------------------------------------------------------------------------
# Canonical → variant name bank (expanded per task template)
# ---------------------------------------------------------------------------
# Maps each canonical identifier to interchangeable variant names; one variant
# per canonical is picked per episode (see TaskGenerator._randomize_names).
NAME_BANK: Dict[str, List[str]] = {
    # Data structures
    "merge_intervals": ["combine_ranges", "fuse_spans", "join_segments"],
    "Stack": ["Accumulator", "PushPop", "LifoStore"],
    "push": ["enqueue_item", "add_entry", "store_val"],
    "pop": ["dequeue_item", "remove_entry", "fetch_val"],
    # Rate limiting
    "RateLimiter": ["ThrottleGuard", "RequestBucket", "AccessGate"],
    "is_allowed": ["check_permit", "can_proceed", "gate_request"],
    # LRU Cache
    "LRUCache": ["BoundedCache", "EvictStore", "MruVault"],
    "get": ["fetch", "retrieve", "lookup"],
    "put": ["store", "insert", "upsert"],
    # Data processing
    "process_data": ["transform_records", "handle_payload", "digest_input"],
    "normalize": ["standardize", "rescale", "calibrate"],
    # Graph
    "TopologicalSort": ["DependencyOrder", "DAGResolver", "LayerSorter"],
    "add_edge": ["link_nodes", "connect_dep", "wire_pair"],
    # Retry
    "RetryExecutor": ["FaultTolerant", "BackoffRunner", "ResilienceWrap"],
    "execute": ["run_with_retry", "attempt_call", "safe_invoke"],
}
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class Task:
    """A fully materialized coding task handed to the environment for one episode."""
    task_id: str  # template key, e.g. "easy_stack"
    difficulty: str  # "easy" | "medium" | "hard"
    description: str  # agent-facing prompt text
    starter_code: Dict[str, str]  # filename → source
    test_code: str  # visible pytest suite
    hidden_test_code: str  # adversarial hidden suite
    files: Dict[str, str] = field(default_factory=dict)  # runtime state
# ---------------------------------------------------------------------------
# Task templates (inline — no external files required for dev/demo)
# ---------------------------------------------------------------------------
TASK_TEMPLATES = {
# -----------------------------------------------------------------------
# EASY tasks
# -----------------------------------------------------------------------
"easy_merge_intervals": {
"difficulty": "easy",
"description": (
"Implement merge_intervals(intervals: list[list[int]]) -> list[list[int]] "
"in solution.py. The function receives a list of [start, end] intervals "
"and must return a merged list with no overlaps. "
"Session 1 should implement the sort + sweep logic and pass visible tests. "
"Session 2 must handle edge cases (empty input, single interval, touching "
"but non-overlapping intervals)."
),
"starter_code": {
"solution.py": (
"def merge_intervals(intervals):\n"
" # TODO: implement\n"
" pass\n"
)
},
"test_code": (
"from solution import merge_intervals\n\n"
"def test_basic():\n"
" assert merge_intervals([[1,3],[2,6],[8,10],[15,18]]) == [[1,6],[8,10],[15,18]]\n\n"
"def test_overlapping():\n"
" assert merge_intervals([[1,4],[4,5]]) == [[1,5]]\n\n"
"def test_no_overlap():\n"
" assert merge_intervals([[1,2],[3,4]]) == [[1,2],[3,4]]\n"
),
"hidden_test_code": (
"from solution import merge_intervals\n\n"
"def test_empty():\n"
" assert merge_intervals([]) == []\n\n"
"def test_single():\n"
" assert merge_intervals([[5,5]]) == [[5,5]]\n"
),
},
"easy_stack": {
"difficulty": "easy",
"description": (
"Implement a Stack class in solution.py with push(val), pop() -> val, "
"peek() -> val, is_empty() -> bool, and size() -> int. "
"pop() and peek() on empty stack should raise IndexError. "
"Session 1: implement and pass visible tests. "
"Session 2: add __repr__ and make the class iterable."
),
"starter_code": {
"solution.py": (
"class Stack:\n"
" def __init__(self):\n"
" # TODO\n"
" pass\n"
)
},
"test_code": (
"from solution import Stack\n\n"
"def test_push_pop():\n"
" s = Stack()\n"
" s.push(1); s.push(2)\n"
" assert s.pop() == 2\n\n"
"def test_empty_pop():\n"
" import pytest\n"
" s = Stack()\n"
" with pytest.raises(IndexError):\n"
" s.pop()\n\n"
"def test_size():\n"
" s = Stack()\n"
" s.push(10); s.push(20)\n"
" assert s.size() == 2\n"
),
"hidden_test_code": (
"from solution import Stack\n\n"
"def test_repr():\n"
" s = Stack()\n"
" s.push(1)\n"
" assert '1' in repr(s)\n\n"
"def test_iterable():\n"
" s = Stack()\n"
" for v in [3,2,1]:\n"
" s.push(v)\n"
" assert list(s) == [1, 2, 3]\n"
),
},
"easy_running_median": {
"difficulty": "easy",
"description": (
"Implement RunningMedian in solution.py. It must support add(num) and "
"get_median() -> float. Uses two heaps internally. "
"Session 1: implement heap-based median, pass visible tests. "
"Session 2: add reset() and from_list(nums) classmethod."
),
"starter_code": {
"solution.py": (
"class RunningMedian:\n"
" def __init__(self):\n"
" # TODO: two-heap approach\n"
" pass\n"
)
},
"test_code": (
"from solution import RunningMedian\n\n"
"def test_basic():\n"
" rm = RunningMedian()\n"
" rm.add(1); rm.add(2); rm.add(3)\n"
" assert rm.get_median() == 2.0\n\n"
"def test_even():\n"
" rm = RunningMedian()\n"
" rm.add(1); rm.add(2)\n"
" assert rm.get_median() == 1.5\n"
),
"hidden_test_code": (
"from solution import RunningMedian\n\n"
"def test_reset():\n"
" rm = RunningMedian()\n"
" rm.add(5)\n"
" rm.reset()\n"
" rm.add(1)\n"
" assert rm.get_median() == 1.0\n\n"
"def test_from_list():\n"
" rm = RunningMedian.from_list([3, 1, 2])\n"
" assert rm.get_median() == 2.0\n"
),
},
# -----------------------------------------------------------------------
# MEDIUM tasks
# -----------------------------------------------------------------------
"medium_rate_limiter": {
"difficulty": "medium",
"description": (
"Implement a token-bucket RateLimiter in solution.py. "
"Constructor: RateLimiter(rate: int, capacity: int). "
"is_allowed(n_tokens=1) -> bool: returns True if n_tokens can be consumed, "
"refilling at 'rate' tokens per second. "
"Use time.monotonic() for timestamps. "
"Session 1: core token-bucket logic + visible tests. "
"Session 2: add burst_remaining() -> int and thread-safety via threading.Lock."
),
"starter_code": {
"solution.py": (
"import time\n\n"
"class RateLimiter:\n"
" def __init__(self, rate: int, capacity: int):\n"
" # TODO\n"
" pass\n"
)
},
"test_code": (
"import time\n"
"from solution import RateLimiter\n\n"
"def test_basic_allow():\n"
" rl = RateLimiter(10, 10)\n"
" assert rl.is_allowed() is True\n\n"
"def test_exhaustion():\n"
" rl = RateLimiter(1, 3)\n"
" assert rl.is_allowed(3) is True\n"
" assert rl.is_allowed() is False\n\n"
"def test_refill():\n"
" rl = RateLimiter(10, 10)\n"
" rl.is_allowed(10)\n"
" time.sleep(0.2)\n"
" assert rl.is_allowed(2) is True\n\n"
"def test_over_capacity():\n"
" rl = RateLimiter(5, 5)\n"
" assert rl.is_allowed(6) is False\n\n"
"def test_zero_tokens():\n"
" rl = RateLimiter(5, 5)\n"
" assert rl.is_allowed(0) is True\n"
),
"hidden_test_code": (
"import threading, time\n"
"from solution import RateLimiter\n\n"
"def test_burst_remaining():\n"
" rl = RateLimiter(10, 10)\n"
" rl.is_allowed(4)\n"
" assert rl.burst_remaining() == 6\n\n"
"def test_thread_safe():\n"
" rl = RateLimiter(100, 100)\n"
" results = []\n"
" def task():\n"
" results.append(rl.is_allowed(10))\n"
" threads = [threading.Thread(target=task) for _ in range(10)]\n"
" for t in threads: t.start()\n"
" for t in threads: t.join()\n"
" assert results.count(True) == 10\n"
),
},
"medium_lru_cache": {
"difficulty": "medium",
"description": (
"Implement LRUCache(capacity: int) in solution.py. "
"get(key) -> int: return value or -1 if not present. "
"put(key, value): insert, evicting LRU entry if at capacity. "
"Both O(1) using dict + doubly-linked list. "
"Session 1: core get/put + visible tests. "
"Session 2: add keys() -> list (in MRU→LRU order) and "
"clear() method."
),
"starter_code": {
"solution.py": (
"class LRUCache:\n"
" def __init__(self, capacity: int):\n"
" # TODO: doubly-linked list + dict\n"
" pass\n"
)
},
"test_code": (
"from solution import LRUCache\n\n"
"def test_basic():\n"
" c = LRUCache(2)\n"
" c.put(1, 1); c.put(2, 2)\n"
" assert c.get(1) == 1\n"
" c.put(3, 3)\n"
" assert c.get(2) == -1\n\n"
"def test_overwrite():\n"
" c = LRUCache(2)\n"
" c.put(1, 10); c.put(1, 20)\n"
" assert c.get(1) == 20\n\n"
"def test_capacity_one():\n"
" c = LRUCache(1)\n"
" c.put(1, 1); c.put(2, 2)\n"
" assert c.get(1) == -1\n"
" assert c.get(2) == 2\n\n"
"def test_miss():\n"
" c = LRUCache(3)\n"
" assert c.get(99) == -1\n\n"
"def test_no_eviction_under_cap():\n"
" c = LRUCache(5)\n"
" for i in range(5):\n"
" c.put(i, i*10)\n"
" for i in range(5):\n"
" assert c.get(i) == i*10\n"
),
"hidden_test_code": (
"from solution import LRUCache\n\n"
"def test_keys_order():\n"
" c = LRUCache(3)\n"
" c.put(1,1); c.put(2,2); c.put(3,3)\n"
" c.get(1)\n"
" assert c.keys()[0] == 1\n\n"
"def test_clear():\n"
" c = LRUCache(3)\n"
" c.put(1,1); c.put(2,2)\n"
" c.clear()\n"
" assert c.get(1) == -1\n"
),
},
# -----------------------------------------------------------------------
# HARD tasks
# -----------------------------------------------------------------------
"hard_topological_sort": {
"difficulty": "hard",
"description": (
"Implement a TopologicalSort class in solution.py for a DAG. "
"add_edge(u, v): add directed edge u→v. "
"sort() -> list: return topological order (raise CycleError if cycle). "
"has_path(src, dst) -> bool: BFS/DFS reachability. "
"Also implement parallel_layers() -> list[list]: return nodes grouped "
"by execution layer (Kahn's algorithm variant). "
"Session 1: add_edge + sort + CycleError + visible tests. "
"Session 2: has_path + parallel_layers + hidden tests."
),
"starter_code": {
"solution.py": (
"from collections import defaultdict, deque\n\n"
"class CycleError(Exception):\n"
" pass\n\n"
"class TopologicalSort:\n"
" def __init__(self):\n"
" self.graph = defaultdict(list)\n"
" self.nodes = set()\n\n"
" def add_edge(self, u, v):\n"
" # TODO\n"
" pass\n\n"
" def sort(self):\n"
" # TODO: Kahn's algorithm\n"
" pass\n"
)
},
"test_code": (
"import pytest\n"
"from solution import TopologicalSort, CycleError\n\n"
"def test_linear():\n"
" ts = TopologicalSort()\n"
" ts.add_edge('a','b'); ts.add_edge('b','c')\n"
" order = ts.sort()\n"
" assert order.index('a') < order.index('b') < order.index('c')\n\n"
"def test_cycle():\n"
" ts = TopologicalSort()\n"
" ts.add_edge('a','b'); ts.add_edge('b','a')\n"
" with pytest.raises(CycleError):\n"
" ts.sort()\n\n"
"def test_diamond():\n"
" ts = TopologicalSort()\n"
" ts.add_edge('a','b'); ts.add_edge('a','c')\n"
" ts.add_edge('b','d'); ts.add_edge('c','d')\n"
" order = ts.sort()\n"
" assert order[0] == 'a' and order[-1] == 'd'\n\n"
"def test_isolated_node():\n"
" ts = TopologicalSort()\n"
" ts.add_edge('a','b')\n"
" order = ts.sort()\n"
" assert set(order) == {'a','b'}\n\n"
"def test_empty():\n"
" ts = TopologicalSort()\n"
" assert ts.sort() == []\n\n"
"def test_single_node():\n"
" ts = TopologicalSort()\n"
" ts.add_edge('x','x')\n"
" with pytest.raises(CycleError):\n"
" ts.sort()\n\n"
"def test_large_dag():\n"
" ts = TopologicalSort()\n"
" for i in range(9):\n"
" ts.add_edge(str(i), str(i+1))\n"
" order = ts.sort()\n"
" assert order == [str(i) for i in range(10)]\n\n"
"def test_multi_root():\n"
" ts = TopologicalSort()\n"
" ts.add_edge('a','c'); ts.add_edge('b','c')\n"
" order = ts.sort()\n"
" assert order.index('a') < order.index('c')\n"
" assert order.index('b') < order.index('c')\n"
),
"hidden_test_code": (
"from solution import TopologicalSort\n\n"
"def test_has_path_true():\n"
" ts = TopologicalSort()\n"
" ts.add_edge('a','b'); ts.add_edge('b','c')\n"
" assert ts.has_path('a','c') is True\n\n"
"def test_has_path_false():\n"
" ts = TopologicalSort()\n"
" ts.add_edge('a','b')\n"
" assert ts.has_path('b','a') is False\n\n"
"def test_parallel_layers():\n"
" ts = TopologicalSort()\n"
" ts.add_edge('a','c'); ts.add_edge('b','c')\n"
" layers = ts.parallel_layers()\n"
" assert set(layers[0]) == {'a','b'}\n"
" assert layers[-1] == ['c']\n"
),
},
}
# Holdout tasks (simplified for eval only).
# NOTE: these entries alias TASK_TEMPLATES objects (placeholders, not distinct
# tasks); this is safe only because the samplers deepcopy before mutating.
HOLDOUT_TEMPLATES = {
    "holdout_two_sum": TASK_TEMPLATES["easy_merge_intervals"],  # placeholder
    "holdout_word_count": TASK_TEMPLATES["easy_stack"],  # placeholder
    "holdout_retry_exec": TASK_TEMPLATES["medium_rate_limiter"],  # placeholder
}
# ---------------------------------------------------------------------------
# TaskGenerator
# ---------------------------------------------------------------------------
class TaskGenerator:
    """
    Samples tasks from the template bank, applies name randomization per seed,
    and injects hidden adversarial tests.

    Name randomization remaps canonical identifiers (see NAME_BANK) to
    episode-specific variants so session 2 cannot reconstruct the solution
    from pretrained knowledge without reading the handoff note.
    """

    def __init__(self, difficulty: str = "medium"):
        """Build a generator restricted to one difficulty tier.

        Raises:
            ValueError: if difficulty is not "easy", "medium" or "hard".
        """
        # Raise rather than assert: asserts are stripped under `python -O`.
        if difficulty not in {"easy", "medium", "hard"}:
            raise ValueError(f"Invalid difficulty: {difficulty}")
        self.difficulty = difficulty
        # Training bank is filtered to this generator's difficulty tier.
        self._bank = {
            k: v for k, v in TASK_TEMPLATES.items()
            if v["difficulty"] == difficulty
        }
        self._holdout = HOLDOUT_TEMPLATES

    def sample(
        self,
        task_id: Optional[str] = None,
        seed: Optional[int] = None,
    ) -> Task:
        """Sample a name-randomized training task.

        Args:
            task_id: explicit template key (any difficulty accepted); an
                unknown id silently falls back to random sampling from this
                generator's difficulty-filtered bank.
            seed: optional seed making both the task choice and the name
                randomization reproducible.
        """
        # Private RNG: seeding must not perturb the process-wide `random`
        # state that other modules may depend on (random.seed is global).
        rng = random.Random(seed)
        if task_id and task_id in TASK_TEMPLATES:
            chosen_id = task_id
            template = copy.deepcopy(TASK_TEMPLATES[task_id])
        else:
            chosen_id = rng.choice(list(self._bank.keys()))
            template = copy.deepcopy(self._bank[chosen_id])
        task = self._build_task(chosen_id, template)
        return self._randomize_names(task, rng)

    def sample_holdout(self, task_id: Optional[str] = None) -> Task:
        """Sample from holdout set (never used in training).

        Holdout tasks keep canonical names — no randomization is applied.
        """
        if task_id and task_id in self._holdout:
            chosen_id = task_id
        else:
            chosen_id = random.choice(list(self._holdout.keys()))
        template = copy.deepcopy(self._holdout[chosen_id])
        return self._build_task(chosen_id, template)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _build_task(task_id: str, template: dict) -> Task:
        """Materialize a Task from an (already deep-copied) template dict."""
        return Task(
            task_id=task_id,
            difficulty=template["difficulty"],
            description=template["description"],
            starter_code=dict(template["starter_code"]),
            test_code=template["test_code"],
            hidden_test_code=template["hidden_test_code"],
            files=dict(template["starter_code"]),  # runtime mutable copy
        )

    @staticmethod
    def _randomize_names(task: Task, rng=None) -> Task:
        """
        Randomly remap canonical names to episode-specific variants, in place.

        This prevents Session 2 from reconstructing the solution purely from
        pretrained knowledge without reading the handoff note.

        Args:
            rng: optional random.Random for reproducible choices; defaults to
                the module-level `random` (backward compatible).
        """
        rng = rng if rng is not None else random
        mapping = {canon: rng.choice(variants) for canon, variants in NAME_BANK.items()}

        def apply(text: str) -> str:
            # Whole-word substitution only. The previous plain str.replace
            # mangled substrings: canonical "put" rewrote "input" in task
            # descriptions, and "get" rewrote "get_median" in the
            # running-median task. `\b` does not match next to "_" (a word
            # character), so identifiers like get_median stay intact.
            for canon, variant in mapping.items():
                text = re.sub(rf"\b{re.escape(canon)}\b", variant, text)
            return text

        task.description = apply(task.description)
        task.files = {k: apply(v) for k, v in task.files.items()}
        task.starter_code = {k: apply(v) for k, v in task.starter_code.items()}
        task.test_code = apply(task.test_code)
        # Hidden tests use canonical names — not randomized (consistent eval)
        return task