Buckets:
bbkdevops/unicosys-hypergraph-bucket / tinymind-native-8b-remote-handoff /bundle /data /code_forge.py
| """ | |
Code Forge — builds in-depth code training data.

Covers 8 levels:
    L1: Python Basics (functions, loops, conditions)
    L2: Data Structures (list, dict, set, deque)
    L3: String Manipulation
    L4: Sorting & Searching Algorithms
    L5: Dynamic Programming
    L6: Graph Algorithms (BFS/DFS)
    L7: Mathematical Algorithms
    L8: Competitive Programming (hard)

Each problem has:
    - question: problem statement (English + Thai)
    - thinking: step-by-step reasoning (CoT)
    - answer: code solution with type hints
    - test_cases: unit tests used for verification
    - ground_truth: expected outputs (JSON)
    - complexity: O(?) notation
| """ | |
| from __future__ import annotations | |
| import json | |
| import textwrap | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
# Generated datasets are written to a "filtered" directory next to this module.
OUTPUT_DIR = Path(__file__).parent.joinpath("filtered")
OUTPUT_DIR.mkdir(exist_ok=True)  # runs at import time; no error if it already exists
CODE_OUTPUT = OUTPUT_DIR.joinpath("code_grpo.jsonl")
@dataclass
class CodeProblem:
    """One coding exercise with its reference solution and metadata.

    Instances are created with keyword arguments, so the class must be a
    dataclass: without the decorator the annotations below would not
    produce an ``__init__`` and every ``CodeProblem(...)`` call would fail.
    """

    question: str  # problem statement
    thinking: str  # step-by-step reasoning (chain of thought)
    answer: str  # complete Python solution
    test_cases: str  # pytest-style assertions
    ground_truth: str  # JSON expected outputs for simple verification
    level: int  # difficulty level of the problem list it belongs to
    category: str
    time_complexity: str
    space_complexity: str
    lang: str = "en"
    source: str = "code_forge"
    topic: str = "programming"
    context: str = ""
| def _dedent(code: str) -> str: | |
| return textwrap.dedent(code).strip() | |
| # ─── Level 1: Python Basics ─────────────────────────────────────────────────── | |
# Level 1 problem set. The embedded solutions are stored with their real
# indentation so that each `answer` string is runnable Python after _dedent().
L1_PROBLEMS: list[CodeProblem] = [
    CodeProblem(
        question="Write `def fizzbuzz(n: int) -> list[str]` that returns FizzBuzz from 1 to n: 'Fizz' (÷3), 'Buzz' (÷5), 'FizzBuzz' (÷15), else the number as string.",
        thinking=_dedent("""
            Iterate 1..n inclusive.
            Check divisibility: test 15 first (LCM of 3,5), then 3, then 5, else str(i).
            Return as list.
        """),
        answer=_dedent("""
            def fizzbuzz(n: int) -> list[str]:
                result = []
                for i in range(1, n + 1):
                    if i % 15 == 0:
                        result.append("FizzBuzz")
                    elif i % 3 == 0:
                        result.append("Fizz")
                    elif i % 5 == 0:
                        result.append("Buzz")
                    else:
                        result.append(str(i))
                return result
        """),
        test_cases=_dedent("""
            assert fizzbuzz(5) == ['1', '2', 'Fizz', '4', 'Buzz']
            assert fizzbuzz(15)[-1] == 'FizzBuzz'
            assert fizzbuzz(1) == ['1']
        """),
        ground_truth='["1","2","Fizz","4","Buzz"]',
        level=1, category="basics", time_complexity="O(n)", space_complexity="O(n)",
    ),
    CodeProblem(
        question="Write `def two_sum(nums: list[int], target: int) -> list[int]` that returns indices of two numbers that add to target. Assume exactly one solution.",
        thinking=_dedent("""
            Naive O(n²): try all pairs. But O(n) with a hash map.
            For each num at index i, check if (target - num) already in seen.
            Store {value: index} as we go.
        """),
        answer=_dedent("""
            def two_sum(nums: list[int], target: int) -> list[int]:
                seen: dict[int, int] = {}
                for i, num in enumerate(nums):
                    complement = target - num
                    if complement in seen:
                        return [seen[complement], i]
                    seen[num] = i
                return []
        """),
        test_cases=_dedent("""
            assert two_sum([2, 7, 11, 15], 9) == [0, 1]
            assert two_sum([3, 2, 4], 6) == [1, 2]
            assert two_sum([3, 3], 6) == [0, 1]
        """),
        ground_truth="[0,1]",
        level=1, category="basics", time_complexity="O(n)", space_complexity="O(n)",
    ),
    CodeProblem(
        question="Write `def flatten_once(lst: list) -> list` that flattens a list one level deep.",
        thinking="Use list comprehension: for each sublist, iterate its elements.",
        answer=_dedent("""
            def flatten_once(lst: list) -> list:
                return [item for sublist in lst for item in sublist]
        """),
        test_cases=_dedent("""
            assert flatten_once([[1,2],[3,4],[5]]) == [1,2,3,4,5]
            assert flatten_once([[1],[2,3],[4,5,6]]) == [1,2,3,4,5,6]
            assert flatten_once([]) == []
        """),
        ground_truth="[1,2,3,4,5]",
        level=1, category="basics", time_complexity="O(n)", space_complexity="O(n)",
    ),
    CodeProblem(
        question="Write `def count_vowels(s: str) -> int` that counts vowels (a,e,i,o,u) case-insensitively.",
        thinking="Convert to lower, count chars in vowel set.",
        answer=_dedent("""
            def count_vowels(s: str) -> int:
                return sum(1 for c in s.lower() if c in 'aeiou')
        """),
        test_cases=_dedent("""
            assert count_vowels("Hello World") == 3
            assert count_vowels("AEIOU") == 5
            assert count_vowels("xyz") == 0
        """),
        ground_truth="3",
        level=1, category="basics", time_complexity="O(n)", space_complexity="O(1)",
    ),
    CodeProblem(
        question="Write `def rotate_list(lst: list, k: int) -> list` that rotates a list right by k positions.",
        thinking="k can be > len. Use k % len to normalize. Slice: lst[-k:] + lst[:-k].",
        answer=_dedent("""
            def rotate_list(lst: list, k: int) -> list:
                if not lst:
                    return lst
                k = k % len(lst)
                return lst[-k:] + lst[:-k] if k else lst[:]
        """),
        test_cases=_dedent("""
            assert rotate_list([1,2,3,4,5], 2) == [4,5,1,2,3]
            assert rotate_list([1,2,3], 0) == [1,2,3]
            assert rotate_list([1,2,3], 4) == [3,1,2]
        """),
        ground_truth="[4,5,1,2,3]",
        level=1, category="basics", time_complexity="O(n)", space_complexity="O(n)",
    ),
]
| # ─── Level 2: Data Structures ───────────────────────────────────────────────── | |
# Level 2 problem set. Solutions keep their real indentation so each `answer`
# is runnable; the LRU question now names the class the answer/tests define.
L2_PROBLEMS: list[CodeProblem] = [
    CodeProblem(
        question="Implement a class `LRUCache` with `__init__(capacity: int)`, `get(key)` and `put(key, value)`, both O(1). Use OrderedDict.",
        thinking=_dedent("""
            OrderedDict maintains insertion order. On get/put, move key to end (most recent).
            On put when at capacity, pop first item (least recently used).
            get returns -1 if key absent.
        """),
        answer=_dedent("""
            from collections import OrderedDict

            class LRUCache:
                def __init__(self, capacity: int):
                    self.cap = capacity
                    self.cache: OrderedDict[int, int] = OrderedDict()

                def get(self, key: int) -> int:
                    if key not in self.cache:
                        return -1
                    self.cache.move_to_end(key)
                    return self.cache[key]

                def put(self, key: int, value: int) -> None:
                    if key in self.cache:
                        self.cache.move_to_end(key)
                    self.cache[key] = value
                    if len(self.cache) > self.cap:
                        self.cache.popitem(last=False)
        """),
        test_cases=_dedent("""
            cache = LRUCache(2)
            cache.put(1, 1); cache.put(2, 2)
            assert cache.get(1) == 1
            cache.put(3, 3)
            assert cache.get(2) == -1
            cache.put(4, 4)
            assert cache.get(1) == -1
            assert cache.get(3) == 3
            assert cache.get(4) == 4
        """),
        ground_truth="-1",
        level=2, category="data_structures", time_complexity="O(1)", space_complexity="O(capacity)",
    ),
    CodeProblem(
        question="Write `def valid_parentheses(s: str) -> bool` that returns True if brackets are balanced. Handle '(', ')', '{', '}', '[', ']'.",
        thinking=_dedent("""
            Use a stack. For each char:
            - Opening bracket → push
            - Closing bracket → check stack top is matching opening
            Empty stack at end = balanced.
        """),
        answer=_dedent("""
            def valid_parentheses(s: str) -> bool:
                stack: list[str] = []
                matching = {')': '(', '}': '{', ']': '['}
                for ch in s:
                    if ch in '({[':
                        stack.append(ch)
                    elif ch in matching:
                        if not stack or stack[-1] != matching[ch]:
                            return False
                        stack.pop()
                return len(stack) == 0
        """),
        test_cases=_dedent("""
            assert valid_parentheses("()[]{}") == True
            assert valid_parentheses("([{}])") == True
            assert valid_parentheses("(]") == False
            assert valid_parentheses("{[}]") == False
            assert valid_parentheses("") == True
        """),
        ground_truth="true",
        level=2, category="data_structures", time_complexity="O(n)", space_complexity="O(n)",
    ),
    CodeProblem(
        question="Write `def top_k_frequent(nums: list[int], k: int) -> list[int]` that returns k most frequent elements.",
        thinking=_dedent("""
            Count frequencies with Counter.
            Use heap or just sorted.
            sorted(count.items(), key=lambda x: -x[1])[:k] gives top-k.
            For O(n log k) use heapq.nlargest.
        """),
        answer=_dedent("""
            from collections import Counter
            import heapq

            def top_k_frequent(nums: list[int], k: int) -> list[int]:
                count = Counter(nums)
                return [x for x, _ in heapq.nlargest(k, count.items(), key=lambda p: p[1])]
        """),
        test_cases=_dedent("""
            assert set(top_k_frequent([1,1,1,2,2,3], 2)) == {1, 2}
            assert top_k_frequent([1], 1) == [1]
        """),
        ground_truth="[1,2]",
        level=2, category="data_structures", time_complexity="O(n log k)", space_complexity="O(n)",
    ),
]
| # ─── Level 3: String Manipulation ───────────────────────────────────────────── | |
# Level 3 problem set. Solutions keep their real indentation so that each
# `answer` string is runnable Python after _dedent().
L3_PROBLEMS: list[CodeProblem] = [
    CodeProblem(
        question="Write `def longest_palindromic_substring(s: str) -> str` using Expand-Around-Center O(n²).",
        thinking=_dedent("""
            For each center (and between chars), expand outward while chars match.
            Track max palindrome seen. Handle both odd and even length palindromes.
        """),
        answer=_dedent("""
            def longest_palindromic_substring(s: str) -> str:
                def expand(l: int, r: int) -> str:
                    while l >= 0 and r < len(s) and s[l] == s[r]:
                        l -= 1; r += 1
                    return s[l+1:r]
                best = ""
                for i in range(len(s)):
                    odd = expand(i, i)
                    even = expand(i, i+1)
                    if len(odd) > len(best): best = odd
                    if len(even) > len(best): best = even
                return best
        """),
        test_cases=_dedent("""
            assert longest_palindromic_substring("babad") in ("bab", "aba")
            assert longest_palindromic_substring("cbbd") == "bb"
            assert longest_palindromic_substring("a") == "a"
            assert longest_palindromic_substring("racecar") == "racecar"
        """),
        ground_truth='"racecar"',
        level=3, category="strings", time_complexity="O(n²)", space_complexity="O(1)",
    ),
    CodeProblem(
        question="Write `def group_anagrams(words: list[str]) -> list[list[str]]` that groups anagrams together.",
        thinking="Sort each word as key, group by key using defaultdict(list).",
        answer=_dedent("""
            from collections import defaultdict

            def group_anagrams(words: list[str]) -> list[list[str]]:
                groups: dict[str, list[str]] = defaultdict(list)
                for w in words:
                    key = ''.join(sorted(w))
                    groups[key].append(w)
                return list(groups.values())
        """),
        test_cases=_dedent("""
            result = group_anagrams(["eat","tea","tan","ate","nat","bat"])
            assert sorted(sorted(g) for g in result) == [['ate','eat','tea'],['bat'],['nat','tan']]
        """),
        ground_truth='[["ate","eat","tea"],["bat"],["nat","tan"]]',
        level=3, category="strings", time_complexity="O(n·k·log k)", space_complexity="O(n·k)",
    ),
]
| # ─── Level 4: Sorting & Searching ───────────────────────────────────────────── | |
# Level 4 problem set. Solutions keep their real indentation so that each
# `answer` string is runnable Python after _dedent().
L4_PROBLEMS: list[CodeProblem] = [
    CodeProblem(
        question="Implement `def merge_sort(arr: list[int]) -> list[int]` from scratch.",
        thinking=_dedent("""
            Divide: split array in half recursively until size 1.
            Conquer: merge two sorted halves.
            Merge: two-pointer comparison, append remainder.
        """),
        answer=_dedent("""
            def merge_sort(arr: list[int]) -> list[int]:
                if len(arr) <= 1:
                    return arr
                mid = len(arr) // 2
                left = merge_sort(arr[:mid])
                right = merge_sort(arr[mid:])
                return _merge(left, right)

            def _merge(a: list[int], b: list[int]) -> list[int]:
                result: list[int] = []
                i = j = 0
                while i < len(a) and j < len(b):
                    if a[i] <= b[j]:
                        result.append(a[i]); i += 1
                    else:
                        result.append(b[j]); j += 1
                return result + a[i:] + b[j:]
        """),
        test_cases=_dedent("""
            assert merge_sort([5,2,8,1,9,3]) == [1,2,3,5,8,9]
            assert merge_sort([1]) == [1]
            assert merge_sort([]) == []
            assert merge_sort([3,1,4,1,5,9,2,6]) == sorted([3,1,4,1,5,9,2,6])
        """),
        ground_truth="[1,2,3,5,8,9]",
        level=4, category="sorting", time_complexity="O(n log n)", space_complexity="O(n)",
    ),
    CodeProblem(
        question="Write `def binary_search(arr: list[int], target: int) -> int` that returns the index or -1 if not found. Array is sorted.",
        thinking=_dedent("""
            Maintain lo, hi pointers. Mid = (lo+hi)//2.
            If arr[mid]==target → found. If arr[mid]<target → search right. Else left.
        """),
        answer=_dedent("""
            def binary_search(arr: list[int], target: int) -> int:
                lo, hi = 0, len(arr) - 1
                while lo <= hi:
                    mid = (lo + hi) // 2
                    if arr[mid] == target:
                        return mid
                    elif arr[mid] < target:
                        lo = mid + 1
                    else:
                        hi = mid - 1
                return -1
        """),
        test_cases=_dedent("""
            assert binary_search([1,3,5,7,9,11], 7) == 3
            assert binary_search([1,3,5,7,9,11], 6) == -1
            assert binary_search([], 1) == -1
            assert binary_search([5], 5) == 0
        """),
        ground_truth="3",
        level=4, category="searching", time_complexity="O(log n)", space_complexity="O(1)",
    ),
]
| # ─── Level 5: Dynamic Programming ───────────────────────────────────────────── | |
# Level 5 problem set. Solutions keep their real indentation so that each
# `answer` string is runnable Python after _dedent().
L5_PROBLEMS: list[CodeProblem] = [
    CodeProblem(
        question="Write `def longest_common_subsequence(s1: str, s2: str) -> int` using DP bottom-up.",
        thinking=_dedent("""
            dp[i][j] = LCS of s1[:i] and s2[:j].
            If s1[i-1]==s2[j-1]: dp[i][j] = dp[i-1][j-1] + 1
            Else: dp[i][j] = max(dp[i-1][j], dp[i][j-1])
            Time O(mn), Space O(mn) — can reduce to O(n).
        """),
        answer=_dedent("""
            def longest_common_subsequence(s1: str, s2: str) -> int:
                m, n = len(s1), len(s2)
                dp = [[0] * (n + 1) for _ in range(m + 1)]
                for i in range(1, m + 1):
                    for j in range(1, n + 1):
                        if s1[i-1] == s2[j-1]:
                            dp[i][j] = dp[i-1][j-1] + 1
                        else:
                            dp[i][j] = max(dp[i-1][j], dp[i][j-1])
                return dp[m][n]
        """),
        test_cases=_dedent("""
            assert longest_common_subsequence("abcde", "ace") == 3
            assert longest_common_subsequence("abc", "abc") == 3
            assert longest_common_subsequence("abc", "def") == 0
            assert longest_common_subsequence("AGGTAB", "GXTXAYB") == 4
        """),
        ground_truth="3",
        level=5, category="dynamic_programming", time_complexity="O(mn)", space_complexity="O(mn)",
    ),
    CodeProblem(
        question="Write `def coin_change(coins: list[int], amount: int) -> int` — minimum number of coins to make amount, or -1 if impossible.",
        thinking=_dedent("""
            Classic unbounded knapsack DP.
            dp[i] = min coins to make amount i.
            dp[0] = 0, dp[i] = min(dp[i-c]+1) for c in coins if i>=c.
            Initialize dp[1..amount] = infinity.
        """),
        answer=_dedent("""
            def coin_change(coins: list[int], amount: int) -> int:
                dp = [float('inf')] * (amount + 1)
                dp[0] = 0
                for i in range(1, amount + 1):
                    for c in coins:
                        if c <= i:
                            dp[i] = min(dp[i], dp[i - c] + 1)
                return int(dp[amount]) if dp[amount] != float('inf') else -1
        """),
        test_cases=_dedent("""
            assert coin_change([1,5,6,9], 11) == 2
            assert coin_change([1,2,5], 11) == 3
            assert coin_change([2], 3) == -1
            assert coin_change([1], 0) == 0
        """),
        ground_truth="3",
        level=5, category="dynamic_programming", time_complexity="O(amount × len(coins))", space_complexity="O(amount)",
    ),
    CodeProblem(
        question="Write `def max_subarray(nums: list[int]) -> int` — maximum subarray sum (Kadane's Algorithm).",
        thinking=_dedent("""
            Kadane's: track current_sum and max_sum.
            current_sum = max(num, current_sum + num) — restart if adding is worse.
            max_sum = max(max_sum, current_sum).
        """),
        answer=_dedent("""
            def max_subarray(nums: list[int]) -> int:
                if not nums:
                    return 0
                current = best = nums[0]
                for num in nums[1:]:
                    current = max(num, current + num)
                    best = max(best, current)
                return best
        """),
        test_cases=_dedent("""
            assert max_subarray([-2,1,-3,4,-1,2,1,-5,4]) == 6
            assert max_subarray([1]) == 1
            assert max_subarray([-1,-2,-3]) == -1
            assert max_subarray([5,4,-1,7,8]) == 23
        """),
        ground_truth="6",
        level=5, category="dynamic_programming", time_complexity="O(n)", space_complexity="O(1)",
    ),
    CodeProblem(
        question="Write `def knapsack_01(weights: list[int], values: list[int], capacity: int) -> int` — 0/1 Knapsack max value.",
        thinking=_dedent("""
            dp[i][w] = max value using first i items with capacity w.
            If weight[i-1] > w: dp[i][w] = dp[i-1][w]
            Else: dp[i][w] = max(dp[i-1][w], dp[i-1][w-weight[i-1]] + value[i-1])
        """),
        answer=_dedent("""
            def knapsack_01(weights: list[int], values: list[int], capacity: int) -> int:
                n = len(weights)
                dp = [[0] * (capacity + 1) for _ in range(n + 1)]
                for i in range(1, n + 1):
                    for w in range(capacity + 1):
                        dp[i][w] = dp[i-1][w]
                        if weights[i-1] <= w:
                            dp[i][w] = max(dp[i][w],
                                           dp[i-1][w - weights[i-1]] + values[i-1])
                return dp[n][capacity]
        """),
        test_cases=_dedent("""
            assert knapsack_01([1,3,4,5],[1,4,5,7], 7) == 9
            assert knapsack_01([2,3,4,5],[3,4,5,6], 8) == 10
            assert knapsack_01([1],[10], 5) == 10
        """),
        ground_truth="9",
        level=5, category="dynamic_programming", time_complexity="O(n × capacity)", space_complexity="O(n × capacity)",
    ),
]
| # ─── Level 6: Graph Algorithms ──────────────────────────────────────────────── | |
# Level 6 problem set. Solutions keep their real indentation so that each
# `answer` string is runnable Python after _dedent().
L6_PROBLEMS: list[CodeProblem] = [
    CodeProblem(
        question="Write `def bfs_shortest_path(graph: dict[int,list[int]], start: int, end: int) -> list[int]` — returns shortest path or [].",
        thinking=_dedent("""
            BFS explores nodes level by level → guarantees shortest path in unweighted graph.
            Use deque for O(1) popleft. Track parent to reconstruct path.
            visited set prevents cycles.
        """),
        answer=_dedent("""
            from collections import deque

            def bfs_shortest_path(graph: dict[int, list[int]], start: int, end: int) -> list[int]:
                if start == end:
                    return [start]
                visited = {start}
                parent: dict[int, int] = {}
                queue = deque([start])
                while queue:
                    node = queue.popleft()
                    for neighbor in graph.get(node, []):
                        if neighbor not in visited:
                            visited.add(neighbor)
                            parent[neighbor] = node
                            if neighbor == end:
                                path = []
                                cur = end
                                while cur != start:
                                    path.append(cur)
                                    cur = parent[cur]
                                path.append(start)
                                return path[::-1]
                            queue.append(neighbor)
                return []
        """),
        test_cases=_dedent("""
            g = {1:[2,3], 2:[4], 3:[4,5], 4:[6], 5:[6], 6:[]}
            path = bfs_shortest_path(g, 1, 6)
            assert path[0] == 1 and path[-1] == 6
            assert len(path) == 4 # shortest: 1→2→4→6 or 1→3→4→6
            assert bfs_shortest_path(g, 1, 99) == []
        """),
        ground_truth="[1,2,4,6]",
        level=6, category="graphs", time_complexity="O(V+E)", space_complexity="O(V)",
    ),
    CodeProblem(
        question="Write `def num_islands(grid: list[list[str]]) -> int` — count connected components of '1's. Use DFS.",
        thinking=_dedent("""
            Iterate all cells. If '1', increment count and DFS to mark all connected '1's as '0'.
            DFS explores 4 directions. No extra visited needed — modify grid in place.
        """),
        answer=_dedent("""
            def num_islands(grid: list[list[str]]) -> int:
                if not grid:
                    return 0
                rows, cols = len(grid), len(grid[0])
                def dfs(r: int, c: int) -> None:
                    if r < 0 or r >= rows or c < 0 or c >= cols or grid[r][c] != '1':
                        return
                    grid[r][c] = '0'
                    dfs(r+1, c); dfs(r-1, c); dfs(r, c+1); dfs(r, c-1)
                count = 0
                for r in range(rows):
                    for c in range(cols):
                        if grid[r][c] == '1':
                            dfs(r, c)
                            count += 1
                return count
        """),
        test_cases=_dedent("""
            g1 = [["1","1","0"],["0","1","0"],["0","0","1"]]
            assert num_islands([row[:] for row in g1]) == 2
            g2 = [["1","1","1"],["1","1","1"],["1","1","1"]]
            assert num_islands([row[:] for row in g2]) == 1
        """),
        ground_truth="2",
        level=6, category="graphs", time_complexity="O(V+E)", space_complexity="O(V)",
    ),
    CodeProblem(
        question="Write `def topological_sort(n: int, edges: list[tuple[int,int]]) -> list[int]` — return topo order or [] if cycle. Kahn's algorithm.",
        thinking=_dedent("""
            Kahn's (BFS): compute in-degree for each node.
            Start with all nodes with in-degree 0 in queue.
            Pop node → add to result → decrement neighbors' in-degree → push if 0.
            If result length < n → cycle detected.
        """),
        answer=_dedent("""
            from collections import deque, defaultdict

            def topological_sort(n: int, edges: list[tuple[int,int]]) -> list[int]:
                adj: dict[int, list[int]] = defaultdict(list)
                in_degree = [0] * n
                for u, v in edges:
                    adj[u].append(v)
                    in_degree[v] += 1
                queue = deque(i for i in range(n) if in_degree[i] == 0)
                result: list[int] = []
                while queue:
                    node = queue.popleft()
                    result.append(node)
                    for nb in adj[node]:
                        in_degree[nb] -= 1
                        if in_degree[nb] == 0:
                            queue.append(nb)
                return result if len(result) == n else []
        """),
        test_cases=_dedent("""
            assert topological_sort(4, [(0,1),(0,2),(1,3),(2,3)]) in [[0,1,2,3],[0,2,1,3]]
            assert topological_sort(3, [(0,1),(1,2),(2,0)]) == [] # cycle
            assert topological_sort(1, []) == [0]
        """),
        ground_truth="[0,1,2,3]",
        level=6, category="graphs", time_complexity="O(V+E)", space_complexity="O(V+E)",
    ),
]
| # ─── Level 7: Mathematical Algorithms ──────────────────────────────────────── | |
# Level 7 problem set. Solutions keep their real indentation so each `answer`
# is runnable. fast_power starts from `1 % mod` so that mod == 1 yields 0
# (matching the built-in pow(base, exp, mod)) instead of 1 when exp == 0.
L7_PROBLEMS: list[CodeProblem] = [
    CodeProblem(
        question="Write `def sieve_of_eratosthenes(n: int) -> list[int]` — return all primes ≤ n.",
        thinking=_dedent("""
            Create boolean array size n+1, init True.
            Mark 0,1 as not prime. For p from 2 to √n:
            if prime[p]: mark p², p²+p, p²+2p, ... as not prime.
            Collect indices still True.
        """),
        answer=_dedent("""
            def sieve_of_eratosthenes(n: int) -> list[int]:
                if n < 2:
                    return []
                is_prime = [True] * (n + 1)
                is_prime[0] = is_prime[1] = False
                p = 2
                while p * p <= n:
                    if is_prime[p]:
                        for multiple in range(p * p, n + 1, p):
                            is_prime[multiple] = False
                    p += 1
                return [i for i in range(2, n + 1) if is_prime[i]]
        """),
        test_cases=_dedent("""
            assert sieve_of_eratosthenes(10) == [2,3,5,7]
            assert sieve_of_eratosthenes(1) == []
            assert sieve_of_eratosthenes(30) == [2,3,5,7,11,13,17,19,23,29]
        """),
        ground_truth="[2,3,5,7]",
        level=7, category="math_algorithms", time_complexity="O(n log log n)", space_complexity="O(n)",
    ),
    CodeProblem(
        question="Write `def fast_power(base: int, exp: int, mod: int) -> int` — compute (base^exp) % mod using binary exponentiation.",
        thinking=_dedent("""
            Binary exponentiation: if exp is odd, multiply base once, then square base and halve exp.
            Result = (result * base) % mod when bit is set.
            O(log exp) multiplications.
        """),
        answer=_dedent("""
            def fast_power(base: int, exp: int, mod: int) -> int:
                result = 1 % mod
                base %= mod
                while exp > 0:
                    if exp & 1:
                        result = result * base % mod
                    base = base * base % mod
                    exp >>= 1
                return result
        """),
        test_cases=_dedent("""
            assert fast_power(2, 10, 1000) == 24
            assert fast_power(3, 0, 7) == 1
            assert fast_power(2, 100, 1_000_000_007) == pow(2, 100, 1_000_000_007)
            assert fast_power(5, 0, 1) == 0
        """),
        ground_truth="24",
        level=7, category="math_algorithms", time_complexity="O(log exp)", space_complexity="O(1)",
    ),
]
| # ─── Level 8: Competitive Programming ──────────────────────────────────────── | |
# Level 8 problem set. Solutions keep their real indentation so that each
# `answer` string is runnable Python after _dedent().
L8_PROBLEMS: list[CodeProblem] = [
    CodeProblem(
        question="Write `def sliding_window_max(nums: list[int], k: int) -> list[int]` — max in each window of size k. O(n) using deque.",
        thinking=_dedent("""
            Use monotonic deque (decreasing). Store indices.
            For each index i:
            1. Remove from front if out of window (index <= i-k)
            2. Remove from back while nums[back] <= nums[i] (smaller elements useless)
            3. Append i
            4. If i >= k-1, front of deque is max of current window.
        """),
        answer=_dedent("""
            from collections import deque

            def sliding_window_max(nums: list[int], k: int) -> list[int]:
                result: list[int] = []
                dq: deque[int] = deque() # indices, decreasing values
                for i, num in enumerate(nums):
                    while dq and dq[0] <= i - k:
                        dq.popleft()
                    while dq and nums[dq[-1]] <= num:
                        dq.pop()
                    dq.append(i)
                    if i >= k - 1:
                        result.append(nums[dq[0]])
                return result
        """),
        test_cases=_dedent("""
            assert sliding_window_max([1,3,-1,-3,5,3,6,7], 3) == [3,3,5,5,6,7]
            assert sliding_window_max([1], 1) == [1]
            assert sliding_window_max([9,8,7,6,5], 2) == [9,8,7,6]
        """),
        ground_truth="[3,3,5,5,6,7]",
        level=8, category="competitive", time_complexity="O(n)", space_complexity="O(k)",
    ),
    CodeProblem(
        question="Write `def trapping_rain_water(height: list[int]) -> int` — total water trapped. Two-pointer O(n) O(1).",
        thinking=_dedent("""
            Two pointers l=0, r=n-1. Track max_left, max_right.
            Move the pointer with smaller height:
            - If height[l] < height[r]:
            water += max(0, max_left - height[l])
            max_left = max(max_left, height[l])
            l++
            - Else mirror for right side.
            Water = bounded by the shorter side.
        """),
        answer=_dedent("""
            def trapping_rain_water(height: list[int]) -> int:
                l, r = 0, len(height) - 1
                max_l = max_r = water = 0
                while l < r:
                    if height[l] < height[r]:
                        if height[l] >= max_l:
                            max_l = height[l]
                        else:
                            water += max_l - height[l]
                        l += 1
                    else:
                        if height[r] >= max_r:
                            max_r = height[r]
                        else:
                            water += max_r - height[r]
                        r -= 1
                return water
        """),
        test_cases=_dedent("""
            assert trapping_rain_water([0,1,0,2,1,0,1,3,2,1,2,1]) == 6
            assert trapping_rain_water([4,2,0,3,2,5]) == 9
            assert trapping_rain_water([1,1]) == 0
            assert trapping_rain_water([3,0,2,0,4]) == 7
        """),
        ground_truth="6",
        level=8, category="competitive", time_complexity="O(n)", space_complexity="O(1)",
    ),
    CodeProblem(
        question="Write `def word_break(s: str, word_dict: list[str]) -> bool` — can s be segmented using words in dict? DP O(n²).",
        thinking=_dedent("""
            dp[i] = True if s[:i] can be segmented.
            dp[0] = True (empty string).
            For i in 1..n: dp[i] = any(dp[j] and s[j:i] in word_set for j in 0..i)
            Use set for O(1) word lookup.
        """),
        answer=_dedent("""
            def word_break(s: str, word_dict: list[str]) -> bool:
                word_set = set(word_dict)
                n = len(s)
                dp = [False] * (n + 1)
                dp[0] = True
                for i in range(1, n + 1):
                    for j in range(i):
                        if dp[j] and s[j:i] in word_set:
                            dp[i] = True
                            break
                return dp[n]
        """),
        test_cases=_dedent("""
            assert word_break("leetcode", ["leet","code"]) == True
            assert word_break("applepenapple", ["apple","pen"]) == True
            assert word_break("catsandog", ["cats","dog","sand","and","cat"]) == False
        """),
        ground_truth="true",
        level=8, category="competitive", time_complexity="O(n²)", space_complexity="O(n)",
    ),
]
| # ─── Wrap into training format ──────────────────────────────────────────────── | |
| def _to_training_record(prob: CodeProblem) -> dict: | |
| """แปลง CodeProblem → training record สำหรับ GRPO / SFT""" | |
| system = ( | |
| "You are an expert Python programmer. Think step-by-step in <think>...</think>, " | |
| "then provide the complete, correct Python solution in <answer>...</answer>.\n" | |
| "Include type hints and follow PEP 8." | |
| ) | |
| full_answer = ( | |
| f"<think>\n{prob.thinking}\n" | |
| f"Time: {prob.time_complexity}, Space: {prob.space_complexity}\n</think>\n" | |
| f"<answer>\n{prob.answer}\n</answer>" | |
| ) | |
| return { | |
| "question": prob.question, | |
| "thinking": prob.thinking, | |
| "answer": full_answer, | |
| "test_cases": prob.test_cases, | |
| "ground_truth": prob.ground_truth, | |
| "level": prob.level, | |
| "category": prob.category, | |
| "time_complexity": prob.time_complexity, | |
| "lang": prob.lang, | |
| "source": prob.source, | |
| "topic": prob.topic, | |
| "context": f"<system>{system}</system>", | |
| } | |
def generate_code_dataset(output_path: Path = CODE_OUTPUT) -> int:
    """Write every problem from all eight levels to a JSONL file.

    One JSON object is written per line, then a per-category summary is
    printed to stdout.

    Args:
        output_path: Destination file; it is overwritten. Its parent
            directory is created if it does not exist yet.

    Returns:
        The number of records written.
    """
    all_problems = [
        *L1_PROBLEMS, *L2_PROBLEMS, *L3_PROBLEMS, *L4_PROBLEMS,
        *L5_PROBLEMS, *L6_PROBLEMS, *L7_PROBLEMS, *L8_PROBLEMS,
    ]
    output_path = Path(output_path)
    # A caller-supplied path may point into a directory that does not exist yet.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w", encoding="utf-8") as f:
        for prob in all_problems:
            record = _to_training_record(prob)
            f.write(json.dumps(record, ensure_ascii=False) + "\n")
    total = len(all_problems)
    cats: dict[str, int] = {}
    for p in all_problems:
        cats[p.category] = cats.get(p.category, 0) + 1
    print(f"Code dataset: {total} problems")
    for cat, cnt in sorted(cats.items()):
        print(f" {cat}: {cnt}")
    print(f"→ {output_path}")
    return total
# Script entry point: write the dataset to the default CODE_OUTPUT location.
if __name__ == "__main__":
    generate_code_dataset()
Xet Storage Details
- Size:
- 36.2 kB
- Xet hash:
- 02048edfb18dd39753070139f1616eb592d1d6743d3e63569f00cf1ea600aefe
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.