Spaces:
Running
Running
| """ | |
| Procedural Task Generator for the Skill Invocation Environment. | |
| Generates unlimited unique tasks at runtime using seeded randomization, | |
| preventing LLM memorization of fixed task content. Each template produces | |
| a task dict compatible with TASK_BANK format, plus any generated skills. | |
| Templates: | |
| 1. Auth Protocol — randomized API name, hash algo, signing format, header format | |
| 2. Binary Format — randomized format name, magic bytes, endianness, header fields | |
| """ | |
| import hashlib | |
| import hmac | |
| import base64 | |
| import random | |
| import struct | |
| import binascii | |
| from typing import Callable | |
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
| def _strip_markdown_fences(code: str) -> str: | |
| """Remove markdown code fences if present.""" | |
| import re | |
| code = code.strip() | |
| match = re.search(r'```(?:python)?\s*\n(.*?)```', code, re.DOTALL) | |
| if match: | |
| return match.group(1) | |
| if code.startswith("```"): | |
| lines = code.split("\n") | |
| lines = [l for l in lines if not l.strip().startswith("```")] | |
| return "\n".join(lines) | |
| return code | |
# Modules pre-loaded into the agent-code namespace before execution.
_SAFE_IMPORTS = "import hmac, hashlib, base64, struct, json, re, binascii, math"


def _exec_verifier(func_name: str, test_cases: list[dict]) -> Callable[[str], bool]:
    """Build a verifier closure that execs agent code and runs *test_cases*.

    Each test case is a dict with optional "args"/"kwargs" and a required
    "check" predicate applied to the call result. Any failure — syntax
    error, missing function, raised exception, or failed check — yields
    False; the verifier never raises.
    """
    def verify(answer: str) -> bool:
        try:
            source = _strip_markdown_fences(answer)
            env: dict = {}
            exec(_SAFE_IMPORTS, env)
            exec(source, env)
            target = env.get(func_name)
            if target is None:
                return False
            for case in test_cases:
                out = target(*case.get("args", []), **case.get("kwargs", {}))
                if not case["check"](out):
                    return False
        except Exception:
            # Deliberately broad: arbitrary agent code may fail in any way.
            return False
        return True
    return verify
| # --------------------------------------------------------------------------- | |
| # Distractor skill pool for procedural tasks | |
| # --------------------------------------------------------------------------- | |
# Pool of plausible-but-irrelevant skills mixed into every generated task so
# the agent must discriminate the single relevant skill from distractors.
# Each entry mirrors the generated-skill shape: id, name, short_description,
# full_content (markdown body the agent sees on invocation).
_DISTRACTOR_SKILLS = [
    {
        "id": "skill_proc_dist_001",
        "name": "Rate Limiting Strategies",
        "short_description": "Common rate limiting algorithms: token bucket, sliding window, leaky bucket.",
        "full_content": (
            "# Rate Limiting Strategies\n\n"
            "## Token Bucket\nMaintain a bucket of tokens that refills at rate R. "
            "Each request consumes one token. Reject when empty.\n\n"
            "## Sliding Window\nTrack request timestamps in a window of W seconds. "
            "Reject when count exceeds threshold T.\n\n"
            "## Leaky Bucket\nQueue requests and process at constant rate. "
            "Reject when queue is full."
        ),
    },
    {
        "id": "skill_proc_dist_002",
        "name": "Webhook Configuration",
        "short_description": "How to set up and manage webhook endpoints for event notifications.",
        "full_content": (
            "# Webhook Configuration\n\n"
            "Register an endpoint URL via POST /webhooks with event types. "
            "Verify signatures using HMAC-SHA256 of the payload with your webhook secret. "
            "Respond with 200 within 5s or the webhook will be retried 3 times with exponential backoff."
        ),
    },
    {
        "id": "skill_proc_dist_003",
        "name": "Data Compression Algorithms",
        "short_description": "Overview of LZ4, Zstd, and DEFLATE compression for binary data.",
        "full_content": (
            "# Data Compression\n\n"
            "## LZ4\nFast compression, moderate ratio. Use for real-time streaming.\n\n"
            "## Zstd\nHigh ratio with configurable levels (1-22). Good for storage.\n\n"
            "## DEFLATE\nWidely compatible (gzip/zip). Use for interchange formats."
        ),
    },
    {
        "id": "skill_proc_dist_004",
        "name": "Service Mesh Routing",
        "short_description": "Traffic splitting, circuit breaking, and retry policies for microservices.",
        "full_content": (
            "# Service Mesh Routing\n\n"
            "Configure traffic splitting with weight-based routing. "
            "Set circuit breakers with failure thresholds and recovery windows. "
            "Retry policies: max 3 retries with exponential backoff, only on 5xx errors."
        ),
    },
    {
        "id": "skill_proc_dist_005",
        "name": "OAuth2 Token Exchange",
        "short_description": "OAuth2 authorization code flow, token refresh, and scope management.",
        "full_content": (
            "# OAuth2 Token Exchange\n\n"
            "1. Redirect user to /authorize with client_id and scope.\n"
            "2. Exchange authorization code for access token via POST /token.\n"
            "3. Refresh expired tokens using refresh_token grant type.\n"
            "4. Validate scopes on each API call."
        ),
    },
    {
        "id": "skill_proc_dist_006",
        "name": "Database Connection Pooling",
        "short_description": "Connection pool sizing, timeout strategies, and health check configuration.",
        "full_content": (
            "# Database Connection Pooling\n\n"
            "Set pool size to 2x CPU cores. Use 30s idle timeout.\n"
            "Enable health checks with SELECT 1 every 10s.\n"
            "Use connection validation on borrow, not on return."
        ),
    },
    {
        "id": "skill_proc_dist_007",
        "name": "Message Queue Patterns",
        "short_description": "Pub/sub, fan-out, and dead letter queue patterns for async messaging.",
        "full_content": (
            "# Message Queue Patterns\n\n"
            "## Pub/Sub\nPublish to topic, multiple subscribers receive copies.\n"
            "## Fan-Out\nSingle message routed to N queues for parallel processing.\n"
            "## Dead Letter\nFailed messages after max retries sent to DLQ for inspection."
        ),
    },
    {
        "id": "skill_proc_dist_008",
        "name": "TLS Certificate Management",
        "short_description": "Certificate rotation, chain validation, and pinning strategies.",
        "full_content": (
            "# TLS Certificate Management\n\n"
            "Rotate certificates 30 days before expiry. Validate full chain including "
            "intermediates. Use certificate pinning for mobile clients. "
            "Store private keys in HSM or KMS, never on disk."
        ),
    },
]
| # --------------------------------------------------------------------------- | |
| # Template 1: Auth Protocol | |
| # --------------------------------------------------------------------------- | |
# Fictional API names used to randomize the surface details of auth tasks.
_API_NAMES = [
    "Zephyr", "Nebula", "Quantum", "Prism", "Helix",
    "Vortex", "Apex", "Nimbus", "Zenith", "Orion",
    "Pulse", "Flux", "Stratos", "Cipher", "Forge",
    "Atlas", "Beacon", "Crest", "Drift", "Echo",
]
# (hashlib attribute name, display name, constructor) triples.
# NOTE(review): MD5 is cryptographically broken; presumably acceptable here
# only because these are synthetic puzzle tasks, not real auth — confirm.
_HASH_ALGOS = [
    ("sha256", "SHA-256", hashlib.sha256),
    ("sha384", "SHA-384", hashlib.sha384),
    ("sha512", "SHA-512", hashlib.sha512),
    ("md5", "MD5", hashlib.md5),
]
_SIGNING_FORMATS = [
    # (format_name, format_template, builder)
    # builder accepts api_key/timestamp (and optionally method) as kwargs and
    # returns the concrete signing string.
    (
        "key:timestamp",
        "{api_key}:{timestamp}",
        lambda api_key, timestamp, **_: f"{api_key}:{timestamp}",
    ),
    (
        "timestamp.key",
        "{timestamp}.{api_key}",
        lambda api_key, timestamp, **_: f"{timestamp}.{api_key}",
    ),
    (
        "key|timestamp|method",
        "{api_key}|{timestamp}|{method}",
        lambda api_key, timestamp, method="GET", **_: f"{api_key}|{timestamp}|{method}",
    ),
    (
        "method:key:timestamp",
        "{method}:{api_key}:{timestamp}",
        lambda api_key, timestamp, method="GET", **_: f"{method}:{api_key}:{timestamp}",
    ),
]
_HEADER_FORMATS = [
    # (header_name, prefix, format_builder)
    # header_name may contain "{api}" which is substituted with the API name;
    # format_builder produces the final header value from prefix/key/sig/ts.
    (
        "X-{api}-Auth",
        "{prefix}",
        lambda prefix, api_key, sig, timestamp, **_: f"{prefix} {api_key}:{sig}:{timestamp}",
    ),
    (
        "Authorization",
        "Bearer",
        lambda prefix, api_key, sig, timestamp, **_: f"Bearer {sig}",
    ),
    (
        "X-{api}-Signature",
        "{prefix}",
        lambda prefix, api_key, sig, timestamp, **_: f"{prefix} sig={sig},key={api_key},ts={timestamp}",
    ),
    (
        "X-{api}-Token",
        "{prefix}",
        lambda prefix, api_key, sig, timestamp, **_: f"{prefix} {timestamp}:{sig}",
    ),
]
def _gen_auth_protocol(rng: random.Random, seed: int) -> dict:
    """Generate an auth protocol task with randomized parameters.

    Draws API name, hash algorithm, signing format, and header format from
    the module-level pools. The sequence of rng draws is fixed, so a given
    rng state and seed always produce the identical task.

    Args:
        rng: Seeded RNG used for all random choices.
        seed: Episode seed; embedded in ids and test fixtures for uniqueness.

    Returns:
        {"task": <TASK_BANK-style dict>, "skills": {skill_id: skill_dict}}
        where skills contains the one relevant skill plus every distractor.
    """
    api_name = rng.choice(_API_NAMES)
    api_version = f"{rng.randint(1, 9)}.{rng.randint(0, 9)}"
    api_prefix = api_name[:3].upper()
    hash_id, hash_name, hash_func = rng.choice(_HASH_ALGOS)
    signing_fmt_name, signing_fmt_template, signing_builder = rng.choice(_SIGNING_FORMATS)
    # Does this format use a method parameter?
    uses_method = "method" in signing_fmt_template
    header_template_name, header_prefix_template, header_builder = rng.choice(_HEADER_FORMATS)
    header_name = header_template_name.replace("{api}", api_name)
    header_prefix = header_prefix_template.replace("{prefix}", api_prefix)
    # The function name never varies; only the signature depends on whether
    # the chosen signing format includes the HTTP method.
    func_name = "generate_auth_header"
    if uses_method:
        func_sig = "api_key: str, timestamp: int, method: str = 'GET'"
    else:
        func_sig = "api_key: str, timestamp: int"
    # Human-readable description of the signing string layout.
    signing_desc = signing_fmt_template.replace("{api_key}", "API_KEY").replace(
        "{timestamp}", "TIMESTAMP"
    ).replace("{method}", "METHOD")

    def compute_expected(api_key, timestamp, method="GET"):
        # Reference implementation: HMAC over the signing string, keyed by the
        # API key, base64-encoded, wrapped in the chosen header format.
        signing_string = signing_builder(
            api_key=api_key, timestamp=timestamp, method=method
        )
        digest = hmac.new(
            api_key.encode(), signing_string.encode(), hash_func
        ).digest()
        sig = base64.b64encode(digest).decode()
        return {
            header_name: header_builder(
                prefix=header_prefix, api_key=api_key,
                sig=sig, timestamp=timestamp, method=method,
            )
        }

    # Build test cases
    test_keys = [
        (f"test_key_{seed}", 1700000000 + seed),
        (f"another_key_{seed}", 1700000001 + seed),
        (f"k{seed}", seed),
    ]
    if uses_method:
        test_keys_with_method = [
            (k, t, rng.choice(["GET", "POST", "PUT", "DELETE"]))
            for k, t in test_keys
        ]
    else:
        test_keys_with_method = [(k, t, "GET") for k, t in test_keys]
    # One test case per key; the method argument is only passed through to the
    # agent's function when the signing format actually uses it.
    test_cases = []
    for api_key, timestamp, method in test_keys_with_method:
        expected = compute_expected(api_key, timestamp, method)
        args = [api_key, timestamp, method] if uses_method else [api_key, timestamp]
        test_cases.append({
            "args": args,
            # Bind expected via default arg to avoid late-binding closure bug.
            "check": lambda result, exp=expected: (
                isinstance(result, dict) and result == exp
            ),
        })
    verifier = _exec_verifier(func_name, test_cases)
    # Shared phrasing for the signing-string components, used by both the
    # skill content and the task description.
    if uses_method:
        components_desc = "the API key, timestamp, and HTTP method"
    else:
        components_desc = "the API key and timestamp"
    # Skill content (the procedural knowledge the agent needs)
    signing_step = (
        f"2. Build signing string: `{signing_desc}` "
        f"(concatenate {components_desc})"
    )
    skill_content = (
        f"# {api_name} API v{api_version} Authentication\n\n"
        f"## Auth Header Generation\n\n"
        f"To authenticate requests to the {api_name} API:\n\n"
        f"1. Obtain your API key from the dashboard\n"
        f"{signing_step}\n"
        f"3. Compute HMAC-{hash_name} of the signing string, "
        f"using the API key as the HMAC key\n"
        f"4. Base64-encode the raw digest bytes\n"
        f"5. Set header `{header_name}` to: "
        f"`{header_builder(prefix=header_prefix, api_key='<KEY>', sig='<SIG>', timestamp='<TS>', method='<METHOD>')}`\n\n"
        f"## Example\n\n"
        f"```python\n"
        f"import hmac, hashlib, base64\n\n"
        f"def {func_name}({func_sig}):\n"
        f"    signing_string = f\"{signing_fmt_template}\"\n"
        f"    digest = hmac.new(api_key.encode(), signing_string.encode(), hashlib.{hash_id}).digest()\n"
        f"    sig = base64.b64encode(digest).decode()\n"
        f"    return {{'{header_name}': ..."
        f"}}\n"
        f"```\n\n"
        f"**Important**: Use `hmac.new()` (not `hashlib` directly) with `hashlib.{hash_id}` as the digest algorithm.\n"
        f"The HMAC key is always the API key encoded as UTF-8.\n"
    )
    # Task description
    task_desc = (
        f"Write a Python function `{func_name}({func_sig})` that generates "
        f"the authentication header for the {api_name} API v{api_version}.\n\n"
        f"The function should:\n"
        f"1. Build the signing string by combining {components_desc} "
        f"in the format: `{signing_desc}`\n"
        f"2. Compute the HMAC-{hash_name} digest using the API key as the HMAC key\n"
        f"3. Base64-encode the raw digest\n"
        f"4. Return a dict with a single key `{header_name}` containing the formatted header value\n\n"
        f"You will need to invoke the relevant skill to learn the exact header format and signing protocol."
    )
    # Skill entry
    skill_id = f"skill_proc_auth_{seed}"
    skill = {
        "id": skill_id,
        "name": f"{api_name} API Authentication",
        "short_description": (
            f"Authentication protocol for the {api_name} API v{api_version}. "
            f"Covers signing, header format, and HMAC computation."
        ),
        "full_content": skill_content,
    }
    # Pick 4-6 distractor skills to reference from the task; the whole pool is
    # still registered in the returned skills dict so lookups always resolve.
    n_distractors = rng.randint(4, min(6, len(_DISTRACTOR_SKILLS)))
    distractor_ids = [d["id"] for d in rng.sample(_DISTRACTOR_SKILLS, n_distractors)]
    task = {
        "id": f"task_proc_auth_{seed}",
        "description": task_desc,
        "difficulty": "easy",
        "relevant_skills": [skill_id],
        "distractor_skills": distractor_ids,
        "verifier": verifier,
        "source": "procedural",
        "template": "auth_protocol",
    }
    # Generated skills dict (relevant + distractors)
    generated_skills = {skill_id: skill}
    for d in _DISTRACTOR_SKILLS:
        generated_skills[d["id"]] = d
    return {"task": task, "skills": generated_skills}
| # --------------------------------------------------------------------------- | |
| # Template 2: Binary Format | |
| # --------------------------------------------------------------------------- | |
# Fictional binary format names used to randomize task surface details.
_FORMAT_NAMES = [
    "NovaBin", "HexPack", "DataForge", "ByteStream", "PacketX",
    "BinFrame", "CrystalPack", "FluxBinary", "QuantumPack", "NexusBin",
    "VectorPack", "PulseBin", "ArchivX", "StreamPack", "CoreBin",
    "SignalPack", "MatrixBin", "GridPack", "TensorBin", "WavePack",
]
# (raw 4-byte magic, ASCII representation) pairs for the header prefix.
_MAGIC_BYTES_OPTIONS = [
    (b"NOVB", "NOVB"), (b"HXPK", "HXPK"), (b"DFGE", "DFGE"),
    (b"BYST", "BYST"), (b"PKTX", "PKTX"), (b"BNFR", "BNFR"),
    (b"CRPK", "CRPK"), (b"FLXB", "FLXB"), (b"QPAK", "QPAK"),
    (b"NXBN", "NXBN"),
]
_FLAG_SETS = [
    # (flag_names, bit_positions) — parallel lists: flag i lives at bit i.
    (["compressed", "encrypted", "checksummed"], [0, 1, 2]),
    (["compressed", "signed", "indexed"], [0, 1, 2]),
    (["encrypted", "compressed", "verified"], [0, 1, 2]),
    (["indexed", "compressed", "encrypted", "signed"], [0, 1, 2, 3]),
]
def _gen_binary_format(rng: random.Random, seed: int) -> dict:
    """Generate a binary format parsing task with randomized parameters.

    Args:
        rng: Seeded RNG; the fixed sequence of draws below makes generation
            deterministic for a given rng state.
        seed: Episode seed, embedded in task/skill ids and test values.

    Returns:
        {"task": <TASK_BANK-style dict>, "skills": {skill_id: skill_dict}}.
    """
    format_name = rng.choice(_FORMAT_NAMES)
    magic_bytes, magic_str = rng.choice(_MAGIC_BYTES_OPTIONS)
    endian = rng.choice(["big", "little"])
    endian_char = ">" if endian == "big" else "<"
    # Version format: major.minor packed as 16-bit
    version_major = rng.randint(1, 5)
    version_minor = rng.randint(0, 9)
    version_packed = (version_major << 8) | version_minor
    # Flag configuration
    flag_names, flag_bits = rng.choice(_FLAG_SETS)
    # Choose header fields order (always: magic, version, record_count, flags, crc32)
    func_name = "parse_header"

    # Build test headers
    def build_header(record_count: int, flag_values: dict) -> bytes:
        # Serialize a header exactly as the skill document specifies:
        # magic(4) + version(2) + record_count(4) + flags(2) + crc32(4).
        buf = bytearray()
        buf += magic_bytes
        buf += struct.pack(f"{endian_char}H", version_packed)
        buf += struct.pack(f"{endian_char}I", record_count)
        flag_int = 0
        for fname, fbit in zip(flag_names, flag_bits):
            if flag_values.get(fname, False):
                flag_int |= (1 << fbit)
        buf += struct.pack(f"{endian_char}H", flag_int)
        # CRC32 of everything so far
        crc = binascii.crc32(bytes(buf)) & 0xFFFFFFFF
        buf += struct.pack(f"{endian_char}I", crc)
        return bytes(buf)

    def expected_parse(record_count: int, flag_values: dict) -> dict:
        # Ground-truth dict the agent's parser must reproduce. The "version"
        # key is included but deliberately not asserted by the checks below.
        result = {
            "version": version_packed,
            "record_count": record_count,
        }
        for fname in flag_names:
            result[fname] = flag_values.get(fname, False)
        return result

    # Test case 1: some flags set
    flags1 = {}
    for fname in flag_names:
        flags1[fname] = rng.choice([True, False])
    # Ensure at least one flag is True
    flags1[flag_names[0]] = True
    header1 = build_header(42 + seed % 100, flags1)
    expected1 = expected_parse(42 + seed % 100, flags1)
    # Test case 2: no flags
    flags2 = {fname: False for fname in flag_names}
    header2 = build_header(1, flags2)
    expected2 = expected_parse(1, flags2)
    # Test case 3: all flags set
    flags3 = {fname: True for fname in flag_names}
    header3 = build_header(1000 + seed, flags3)
    expected3 = expected_parse(1000 + seed, flags3)
    test_cases = [
        {
            "args": [header1],
            # exp bound as default arg to avoid late-binding closure bug.
            "check": lambda r, exp=expected1: (
                isinstance(r, dict)
                and r.get("record_count") == exp["record_count"]
                and all(r.get(fn) == exp[fn] for fn in exp if fn not in ("version",))
            ),
        },
        {
            "args": [header2],
            "check": lambda r, exp=expected2: (
                isinstance(r, dict)
                and r.get("record_count") == exp["record_count"]
                and all(r.get(fn) is False for fn in flag_names)
            ),
        },
        {
            "args": [header3],
            "check": lambda r, exp=expected3: (
                isinstance(r, dict)
                and r.get("record_count") == exp["record_count"]
                and all(r.get(fn) is True for fn in flag_names)
            ),
        },
    ]
    verifier = _exec_verifier(func_name, test_cases)
    # Flag description for skill content
    flag_desc_lines = []
    for fname, fbit in zip(flag_names, flag_bits):
        flag_desc_lines.append(f" - Bit {fbit}: `{fname}`")
    flag_desc = "\n".join(flag_desc_lines)
    header_size = 4 + 2 + 4 + 2 + 4  # magic + version + record_count + flags + crc32
    skill_content = (
        f"# {format_name} Binary Format Specification\n\n"
        f"## Header Layout ({header_size} bytes)\n\n"
        f"| Offset | Size | Field | Description |\n"
        f"|--------|------|-------|-------------|\n"
        f"| 0 | 4 | Magic | `{magic_str}` (ASCII) |\n"
        f"| 4 | 2 | Version | {endian}-endian uint16, packed as (major << 8) | minor |\n"
        f"| 6 | 4 | Record Count | {endian}-endian uint32 |\n"
        f"| 10 | 2 | Flags | {endian}-endian uint16, bitfield |\n"
        f"| 12 | 4 | CRC32 | {endian}-endian uint32, CRC32 of bytes 0-11 |\n\n"
        f"## Flags Bitfield\n\n"
        f"{flag_desc}\n\n"
        f"## Byte Order\n\n"
        f"All multi-byte fields are **{endian}-endian** "
        f"(struct format: `'{endian_char}'`).\n\n"
        f"## Validation\n\n"
        f"1. Check magic bytes match `{magic_str}`\n"
        f"2. Compute CRC32 of bytes 0..11 and compare with stored CRC32 at offset 12\n"
        f"3. If CRC mismatch, raise ValueError\n\n"
        f"## Parsing Example\n\n"
        f"```python\n"
        f"import struct, binascii\n\n"
        f"def {func_name}(data: bytes) -> dict:\n"
        f"    magic = data[0:4]\n"
        f"    assert magic == b'{magic_str}'\n"
        f"    version = struct.unpack('{endian_char}H', data[4:6])[0]\n"
        f"    record_count = struct.unpack('{endian_char}I', data[6:10])[0]\n"
        f"    flags = struct.unpack('{endian_char}H', data[10:12])[0]\n"
        f"    crc_stored = struct.unpack('{endian_char}I', data[12:16])[0]\n"
        f"    crc_computed = binascii.crc32(data[0:12]) & 0xFFFFFFFF\n"
        f"    if crc_stored != crc_computed:\n"
        f"        raise ValueError('CRC mismatch')\n"
        f"    return {{\n"
        f"        'version': version,\n"
        f"        'record_count': record_count,\n"
        f"        ... # extract flags from bitfield\n"
        f"    }}\n"
        f"```\n"
    )
    task_desc = (
        f"Write a Python function `{func_name}(data: bytes) -> dict` that parses "
        f"a {format_name} binary file header.\n\n"
        f"The function should:\n"
        f"1. Validate the 4-byte magic number\n"
        f"2. Parse the version (uint16), record count (uint32), and flags (uint16)\n"
        f"3. Validate the CRC32 checksum\n"
        f"4. Return a dict with keys: `version`, `record_count`, "
        + ", ".join(f"`{fn}`" for fn in flag_names) + " (booleans from bitfield)\n\n"
        f"The exact byte layout, endianness, and flag bit positions are specified "
        f"in the {format_name} format skill. You must invoke it to get the details."
    )
    skill_id = f"skill_proc_bin_{seed}"
    skill = {
        "id": skill_id,
        "name": f"{format_name} Format Specification",
        "short_description": (
            f"Binary header format for {format_name} files. "
            f"Defines magic bytes, field layout, flags, and CRC32 validation."
        ),
        "full_content": skill_content,
    }
    # Reference 4-6 distractors from the task; register the whole pool so
    # every id resolves in the returned skills dict.
    n_distractors = rng.randint(4, min(6, len(_DISTRACTOR_SKILLS)))
    distractor_ids = [d["id"] for d in rng.sample(_DISTRACTOR_SKILLS, n_distractors)]
    task = {
        "id": f"task_proc_bin_{seed}",
        "description": task_desc,
        "difficulty": "easy",
        "relevant_skills": [skill_id],
        "distractor_skills": distractor_ids,
        "verifier": verifier,
        "source": "procedural",
        "template": "binary_format",
    }
    generated_skills = {skill_id: skill}
    for d in _DISTRACTOR_SKILLS:
        generated_skills[d["id"]] = d
    return {"task": task, "skills": generated_skills}
| # --------------------------------------------------------------------------- | |
| # TaskGenerator | |
| # --------------------------------------------------------------------------- | |
# Registry mapping template name -> generator function (rng, seed) -> dict.
_TEMPLATES = {
    "auth_protocol": _gen_auth_protocol,
    "binary_format": _gen_binary_format,
}
| class TaskGenerator: | |
| """ | |
| Procedural task generator with seeded randomization. | |
| Usage: | |
| gen = TaskGenerator(seed=42) | |
| result = gen.generate() # returns {"task": ..., "skills": ...} | |
| result = gen.generate(template="auth_protocol") # specific template | |
| """ | |
| def __init__(self, seed: int = 0): | |
| self._base_seed = seed | |
| self._counter = 0 | |
| def generate(self, template: str | None = None) -> dict: | |
| """ | |
| Generate a task. Returns {"task": dict, "skills": dict}. | |
| Args: | |
| template: Optional template name. If None, picks randomly. | |
| """ | |
| episode_seed = self._base_seed * 10000 + self._counter | |
| self._counter += 1 | |
| rng = random.Random(episode_seed) | |
| if template is None: | |
| template = rng.choice(list(_TEMPLATES.keys())) | |
| if template not in _TEMPLATES: | |
| raise ValueError(f"Unknown template: {template}. Available: {list(_TEMPLATES.keys())}") | |
| return _TEMPLATES[template](rng, episode_seed) | |
| def generate_with_seed(self, seed: int, template: str | None = None) -> dict: | |
| """ | |
| Generate a task with an explicit seed (deterministic). | |
| Args: | |
| seed: Exact seed to use for this generation. | |
| template: Optional template name. | |
| """ | |
| rng = random.Random(seed) | |
| if template is None: | |
| template = rng.choice(list(_TEMPLATES.keys())) | |
| if template not in _TEMPLATES: | |
| raise ValueError(f"Unknown template: {template}. Available: {list(_TEMPLATES.keys())}") | |
| return _TEMPLATES[template](rng, seed) | |
| def available_templates(self) -> list[str]: | |
| return list(_TEMPLATES.keys()) | |