"""
GLADIUS v4.0 — Bytecode Tokenizer

A structural tokenizer for bytecode. Not BPE. Not byte-level.
Direct opcode-to-token mapping with argument quantization.

Design:
    Token 0-255:   Reserved for byte-level (machine code curriculum, future)
    Token 256-258: PAD, BOS, EOS
    Token 259-511: Opcodes (253 slots; the built-in list defines 267 opcodes,
                   which is more than this range can hold)
    Token 512-639: Numeric arguments (128 quantized buckets)
    Token 640-671: Register/local indices (0-31)
    Token 672-687: Comparison operators (16 slots)
    Token 688-703: Task markers (NEXT, STACK, EXEC, FILL, etc.)
    Token 704-767: Stack names, difficulty markers, structural tokens
    Token 768+:    Overflow / future expansion

Total vocab: ~768 active tokens (fits easily in 32K embedding space)

Encoding is O(1) per token — table lookup, no iteration, no probability.
Decoding is deterministic — token → exact opcode string. Numeric arguments
are quantized, so they decode to a representative value, not the original.
"""

import math
import re
from pathlib import Path
from typing import List, Optional, Dict, Tuple


class BytecodeTokenizer:
    """Structural tokenizer for Python/WASM/EVM bytecode.

    Every opcode, comparison operator, task marker and structural symbol is
    mapped to a fixed token ID through a lookup table; numeric arguments are
    quantized into 128 buckets.

    Token ID layout (see the class constants below):
        0-255    reserved for raw bytes (never produced by this class)
        256-258  PAD, BOS, EOS
        259-511  opcodes (253 slots)
        512-639  quantized numeric arguments
        640-671  register/local indices (decoded, but never emitted by the
                 encoder)
        672-687  comparison operators
        688-703  task markers
        704-767  structural tokens
        768+     opcodes that do not fit in the 253 primary slots

    Quantization is lossy: outside the exact range -32..31 the sign of a
    number is not preserved and values are grouped into coarse buckets.
    """

    # --- Token ID layout -------------------------------------------------
    BYTE_START = 0
    PAD = 256
    BOS = 257
    EOS = 258
    OPCODE_START = 259
    NUMERIC_START = 512
    REGISTER_START = 640
    COMPARE_START = 672
    TASK_START = 688
    STRUCT_START = 704
    # First ID used for opcodes that overflow the OPCODE_START..NUMERIC_START
    # range. Without it, overflowing opcodes would share IDs with numeric
    # buckets.
    OVERFLOW_START = 768

    COMPARE_OPS = ['<', '>', '==', '!=', '<=', '>=',
                   'lt_s', 'lt_u', 'gt_s', 'gt_u', 'le_s', 'le_u',
                   'ge_s', 'ge_u', 'eq', 'ne']

    TASK_MARKERS = ['NEXT', 'STACK', 'EXEC', 'FILL', 'TRACE', 'OUT', 'QED',
                    'D1', 'D2', 'D3', 'D4', 'D5',
                    'python', 'wasm', 'evm']

    STRUCT_TOKENS = ['(', ')', ',', '|', ':', '=',
                     'o=', 'a=',
                     'computed',
                     'SEPARATOR']

    # One opcode with an optional parenthesised argument list, terminated by
    # whitespace or end of string. Compiled once instead of on every call.
    _OP_PATTERN = re.compile(r'(\S+?)(?:\(([^)]*)\))?(?:\s|$)')

    def __init__(self, vocab_file: Optional[str] = None):
        """Build the lookup tables.

        Args:
            vocab_file: Optional path to a text file with one opcode per
                line. If it is given and exists, its opcodes are placed
                first and the built-in opcodes are appended (duplicates
                dropped, first occurrence wins). If it is None or the path
                does not exist, only the built-in vocabulary is used.
        """
        self._opcode_to_id: Dict[str, int] = {}
        self._id_to_opcode: Dict[int, str] = {}
        self._compare_to_id: Dict[str, int] = {}
        self._task_to_id: Dict[str, int] = {}
        self._struct_to_id: Dict[str, int] = {}

        opcodes = self._get_builtin_opcodes()
        if vocab_file and Path(vocab_file).exists():
            with open(vocab_file, encoding='utf-8') as f:
                file_ops = [line.strip() for line in f if line.strip()]
            # dict.fromkeys de-duplicates while keeping insertion order.
            opcodes = list(dict.fromkeys(file_ops + opcodes))

        # Highest ID handed out so far; starts at the slot just past the
        # structural tokens so vocab_size keeps its historical value when no
        # opcode overflows.
        highest_id = self.STRUCT_START + len(self.STRUCT_TOKENS)

        # Opcodes fill 259..511 first. Anything beyond those 253 slots goes
        # to the overflow range so it cannot alias a numeric/register/
        # compare/task/struct token.
        capacity = self.NUMERIC_START - self.OPCODE_START
        for i, op in enumerate(opcodes):
            if i < capacity:
                tid = self.OPCODE_START + i
            else:
                tid = self.OVERFLOW_START + (i - capacity)
            self._opcode_to_id[op] = tid
            self._id_to_opcode[tid] = op
            highest_id = max(highest_id, tid)

        for i, cmp in enumerate(self.COMPARE_OPS):
            tid = self.COMPARE_START + i
            self._compare_to_id[cmp] = tid
            self._id_to_opcode[tid] = f"CMP:{cmp}"

        for i, task in enumerate(self.TASK_MARKERS):
            tid = self.TASK_START + i
            self._task_to_id[task] = tid
            self._id_to_opcode[tid] = f"TASK:{task}"

        for i, st in enumerate(self.STRUCT_TOKENS):
            tid = self.STRUCT_START + i
            self._struct_to_id[st] = tid
            self._id_to_opcode[tid] = f"STRUCT:{st}"

        # Every assigned ID is strictly below vocab_size.
        self.vocab_size = highest_id + 1
        self._num_opcodes = len(opcodes)

    def _get_builtin_opcodes(self) -> List[str]:
        """Return the built-in opcode vocabulary covering Python/WASM/EVM.

        The order is significant: an opcode's position determines its ID.
        """
        return [
            # Python bytecode
            'LOAD_CONST', 'LOAD_FAST', 'LOAD_GLOBAL', 'LOAD_NAME',
            'LOAD_ATTR', 'LOAD_DEREF', 'LOAD_CLOSURE',
            'STORE_FAST', 'STORE_GLOBAL', 'STORE_NAME', 'STORE_ATTR',
            'STORE_DEREF', 'STORE_SUBSCR',
            'BINARY_ADD', 'BINARY_SUBTRACT', 'BINARY_MULTIPLY',
            'BINARY_TRUE_DIVIDE', 'BINARY_FLOOR_DIVIDE', 'BINARY_MODULO',
            'BINARY_POWER', 'BINARY_AND', 'BINARY_OR', 'BINARY_XOR',
            'BINARY_LSHIFT', 'BINARY_RSHIFT', 'BINARY_SUBSCR',
            'UNARY_POSITIVE', 'UNARY_NEGATIVE', 'UNARY_NOT', 'UNARY_INVERT',
            'COMPARE_OP',
            'JUMP_ABSOLUTE', 'JUMP_FORWARD',
            'POP_JUMP_IF_TRUE', 'POP_JUMP_IF_FALSE',
            'JUMP_IF_TRUE_OR_POP', 'JUMP_IF_FALSE_OR_POP',
            'CALL_FUNCTION', 'CALL_FUNCTION_KW', 'CALL_METHOD',
            'BUILD_TUPLE', 'BUILD_LIST', 'BUILD_SET', 'BUILD_MAP',
            'BUILD_CONST_KEY_MAP', 'BUILD_STRING', 'BUILD_SLICE',
            'LIST_APPEND', 'SET_ADD', 'MAP_ADD',
            'POP_TOP', 'ROT_TWO', 'ROT_THREE', 'ROT_FOUR',
            'DUP_TOP', 'DUP_TOP_TWO',
            'RETURN_VALUE', 'GET_ITER', 'FOR_ITER', 'GET_YIELD_FROM_ITER',
            'NOP', 'MAKE_FUNCTION', 'SETUP_LOOP', 'POP_BLOCK',
            'SETUP_EXCEPT', 'SETUP_FINALLY', 'RAISE_VARARGS',
            'IMPORT_NAME', 'IMPORT_FROM', 'UNPACK_SEQUENCE',
            # WebAssembly
            'i32.const', 'i64.const', 'f32.const', 'f64.const',
            'i32.add', 'i32.sub', 'i32.mul', 'i32.div_s', 'i32.div_u',
            'i32.rem_s', 'i32.rem_u', 'i32.and', 'i32.or', 'i32.xor',
            'i32.shl', 'i32.shr_s', 'i32.shr_u', 'i32.rotl', 'i32.rotr',
            'i32.clz', 'i32.ctz', 'i32.popcnt',
            'i64.add', 'i64.sub', 'i64.mul', 'i64.div_s',
            'i64.and', 'i64.or', 'i64.xor',
            'f32.add', 'f32.sub', 'f32.mul', 'f32.div',
            'f32.sqrt', 'f32.min', 'f32.max', 'f32.abs', 'f32.neg',
            'f64.add', 'f64.sub', 'f64.mul', 'f64.div',
            'f64.sqrt', 'f64.min', 'f64.max', 'f64.abs', 'f64.neg',
            'i32.eqz', 'i32.eq', 'i32.ne', 'i32.lt_s', 'i32.lt_u',
            'i32.gt_s', 'i32.gt_u', 'i32.le_s', 'i32.le_u',
            'i32.ge_s', 'i32.ge_u',
            'i64.eqz', 'i64.eq', 'i64.ne',
            'f32.eq', 'f32.ne', 'f32.lt', 'f32.gt',
            'f64.eq', 'f64.ne', 'f64.lt', 'f64.gt',
            'i32.load', 'i64.load', 'f32.load', 'f64.load',
            'i32.store', 'i64.store', 'f32.store', 'f64.store',
            'i32.load8_s', 'i32.load8_u', 'i32.load16_s', 'i32.load16_u',
            'memory.size', 'memory.grow',
            'block', 'loop', 'if', 'else', 'end',
            'br', 'br_if', 'br_table', 'return',
            'call', 'call_indirect',
            'local.get', 'local.set', 'local.tee',
            'global.get', 'global.set',
            'i32.wrap_i64', 'i64.extend_i32_s', 'i64.extend_i32_u',
            'f32.convert_i32_s', 'f64.convert_i32_s',
            'i32.trunc_f32_s', 'i32.trunc_f64_s',
            'f32.demote_f64', 'f64.promote_f32',
            'i32.reinterpret_f32', 'f32.reinterpret_i32',
            'drop', 'select', 'nop', 'unreachable',
            # EVM
            'PUSH1', 'PUSH2', 'PUSH32', 'POP',
            'DUP1', 'DUP2', 'DUP3', 'DUP4',
            'SWAP1', 'SWAP2', 'SWAP3', 'SWAP4',
            'ADD', 'MUL', 'SUB', 'DIV', 'SDIV', 'MOD', 'SMOD',
            'ADDMOD', 'MULMOD', 'EXP', 'SIGNEXTEND',
            'LT', 'GT', 'SLT', 'SGT', 'EQ', 'ISZERO',
            'AND', 'OR', 'XOR', 'NOT', 'BYTE', 'SHL', 'SHR', 'SAR',
            'SHA3',
            'MLOAD', 'MSTORE', 'MSTORE8', 'MSIZE',
            'SLOAD', 'SSTORE',
            'JUMP', 'JUMPI', 'JUMPDEST', 'STOP', 'RETURN', 'REVERT',
            'ADDRESS', 'BALANCE', 'ORIGIN', 'CALLER', 'CALLVALUE',
            'CALLDATALOAD', 'CALLDATASIZE', 'CALLDATACOPY',
            'CODESIZE', 'CODECOPY', 'GASPRICE', 'RETURNDATASIZE',
            'RETURNDATACOPY', 'BLOCKHASH', 'COINBASE', 'TIMESTAMP',
            'NUMBER', 'DIFFICULTY', 'GASLIMIT', 'CHAINID', 'SELFBALANCE',
            'GAS',
            'LOG0', 'LOG1', 'LOG2', 'LOG3', 'LOG4',
            'CALL', 'DELEGATECALL', 'STATICCALL', 'CREATE', 'CREATE2',
            'SELFDESTRUCT',
        ]

    def _quantize_number(self, val: float) -> int:
        """Quantize a number to a bucket index (0-127).

        Floats are truncated toward zero before bucketing. Layout:
            0-63     exact integers -32..31
            64-91    magnitudes 32..255 (negatives: 33..256), steps of 8
            92-107   magnitudes 256..4095, steps of 256
            108-127  magnitudes >= 4096, one bucket per power of two,
                     saturating at bucket 127

        Only the first range keeps the sign; larger negative and positive
        values of similar magnitude share a bucket.

        Args:
            val: The int or float to quantize.

        Returns:
            The bucket index. Infinite values map to the top bucket (127).

        Raises:
            ValueError: If ``val`` is NaN.
        """
        if isinstance(val, float):
            if math.isnan(val):
                raise ValueError("cannot quantize NaN")
            if math.isinf(val):
                # int(inf) would raise OverflowError; saturate instead.
                return 127

        v = int(val)

        if -32 <= v <= 31:
            return v + 32

        if 32 <= v <= 255:
            return 64 + min(27, (v - 32) // 8)
        if -256 <= v < -32:
            return 64 + min(27, (-v - 33) // 8)

        mag = abs(v)
        if mag <= 4095:
            return 92 + min(15, (mag - 256) // 256)

        # floor(log2(mag / 4096)) computed exactly on the integer; avoids
        # float rounding and overflow for very large values.
        return 108 + min(19, mag.bit_length() - 13)

    def _dequantize_number(self, bucket: int) -> int:
        """Map a bucket index back to a representative value.

        Returns the exact value for buckets 0-63 and the (positive) lower
        bound of the bucket otherwise; 0 for an out-of-range bucket.
        """
        if 0 <= bucket <= 63:
            return bucket - 32
        if 64 <= bucket <= 91:
            return 32 + (bucket - 64) * 8
        if 92 <= bucket <= 107:
            return 256 + (bucket - 92) * 256
        if 108 <= bucket <= 127:
            return 4096 * (2 ** (bucket - 108))
        return 0

    def encode_line(self, line: str) -> List[int]:
        """Encode a complete bytecode line into token IDs.

        Input format: ``D{n}|{stack}|{opcode_sequence}|{task}``, where
        ``task`` is ``MARKER`` or ``MARKER:content``.

        Args:
            line: One corpus line. Fields beyond the fourth are ignored.

        Returns:
            ``[BOS, ...token_ids..., EOS]``. A line with fewer than four
            ``|``-separated fields yields just ``[BOS, EOS]``. Unknown
            difficulty/stack/task markers are skipped silently.
        """
        tokens = [self.BOS]

        parts = line.split('|')
        if len(parts) < 4:
            return tokens + [self.EOS]

        separator = self._struct_to_id.get('|', self.STRUCT_START + 3)

        # Difficulty marker (D1..D5).
        diff = parts[0].strip()
        if diff in self._task_to_id:
            tokens.append(self._task_to_id[diff])

        # Stack name (python / wasm / evm).
        stack = parts[1].strip()
        if stack in self._task_to_id:
            tokens.append(self._task_to_id[stack])

        tokens.append(separator)
        tokens.extend(self._encode_ops(parts[2].strip()))
        tokens.append(separator)

        # Task: marker, optionally followed by ':' and marker-specific content.
        task_str = parts[3].strip()
        task_parts = task_str.split(':', 1)
        task_marker = task_parts[0]
        if task_marker in self._task_to_id:
            tokens.append(self._task_to_id[task_marker])

        if len(task_parts) > 1:
            task_content = task_parts[1].strip()

            if task_marker in ('NEXT', 'FILL'):
                # Content is an opcode sequence.
                tokens.extend(self._encode_ops(task_content))
            elif task_marker == 'STACK':
                # Content is an integer; non-integer content is dropped.
                try:
                    val = int(task_content)
                except ValueError:
                    pass
                else:
                    tokens.append(
                        self.NUMERIC_START + self._quantize_number(val))
            elif task_marker == 'EXEC':
                # Content is encoded only if it is a known structural token.
                if task_content in self._struct_to_id:
                    tokens.append(self._struct_to_id[task_content])

        tokens.append(self.EOS)
        return tokens

    def _encode_ops(self, ops_str: str) -> List[int]:
        """Encode a space-separated sequence of opcodes with arguments.

        Each item looks like ``OPCODE`` or ``OPCODE(arg1, arg2)``. Opcodes
        that are not in the vocabulary are skipped together with their
        arguments.
        """
        tokens: List[int] = []

        for match in self._OP_PATTERN.finditer(ops_str):
            opcode = match.group(1)
            args = match.group(2)

            if opcode not in self._opcode_to_id:
                continue
            tokens.append(self._opcode_to_id[opcode])

            if args:
                for arg in args.split(','):
                    tokens.extend(self._encode_arg(arg.strip()))

        return tokens

    def _encode_arg(self, arg: str) -> List[int]:
        """Encode a single argument to token(s).

        Tried in order: comparison operator, ``key=value`` pair, ``0x`` hex
        literal, decimal number. Anything else (including NaN) produces no
        tokens.

        NOTE(review): every decimal integer is encoded as a numeric bucket,
        so tokens in the REGISTER_START range are never emitted here;
        telling a register index from a constant would need opcode context.
        """
        if arg in self._compare_to_id:
            return [self._compare_to_id[arg]]

        if '=' in arg and not arg.startswith('0x'):
            tokens: List[int] = []
            key, val = arg.split('=', 1)
            key_tok = key + '='
            if key_tok in self._struct_to_id:
                tokens.append(self._struct_to_id[key_tok])
            try:
                v = int(val)
            except ValueError:
                # Non-integer value: keep the key token (if any) only.
                pass
            else:
                tokens.append(self.NUMERIC_START + self._quantize_number(v))
            return tokens

        if arg.startswith('0x'):
            try:
                v = int(arg, 16)
            except ValueError:
                pass
            else:
                return [self.NUMERIC_START + self._quantize_number(v)]

        try:
            # float() also accepts every decimal integer string.
            return [self.NUMERIC_START + self._quantize_number(float(arg))]
        except ValueError:
            # Not a number, or NaN.
            return []

    def decode(self, token_ids: List[int]) -> str:
        """Decode token IDs back to a space-separated string (approximate).

        PAD is dropped; BOS/EOS become ``<BOS>``/``<EOS>``; numeric buckets
        become ``#<value>`` using the bucket's representative value;
        register tokens become ``r<index>``; unknown IDs become ``?<id>``.
        """
        parts = []
        for tid in token_ids:
            if tid == self.BOS:
                parts.append('<BOS>')
            elif tid == self.EOS:
                parts.append('<EOS>')
            elif tid == self.PAD:
                continue
            elif tid in self._id_to_opcode:
                parts.append(self._id_to_opcode[tid])
            elif self.NUMERIC_START <= tid < self.REGISTER_START:
                bucket = tid - self.NUMERIC_START
                val = self._dequantize_number(bucket)
                parts.append(f"#{val}")
            elif self.REGISTER_START <= tid < self.COMPARE_START:
                reg = tid - self.REGISTER_START
                parts.append(f"r{reg}")
            else:
                parts.append(f"?{tid}")
        return ' '.join(parts)

    def stats(self) -> Dict[str, int]:
        """Return vocabulary size figures for reporting."""
        return {
            'vocab_size': self.vocab_size,
            'num_opcodes': self._num_opcodes,
            'num_comparisons': len(self._compare_to_id),
            'num_task_markers': len(self._task_to_id),
            'num_struct_tokens': len(self._struct_to_id),
            'numeric_buckets': 128,
            'register_slots': 32,
        }


def main():
    """Smoke-test the tokenizer.

    Prints the vocabulary statistics, encodes and decodes three sample
    lines, and — if ``../corpus/bytecode.txt`` exists relative to this
    file's directory — encodes every non-blank corpus line and prints
    aggregate counts.
    """
    tok = BytecodeTokenizer()
    print("BytecodeTokenizer initialized:")
    for k, v in tok.stats().items():
        print(f" {k}: {v}")

    test_lines = [
        "D1|python|LOAD_CONST(42) LOAD_CONST(10) BINARY_ADD RETURN_VALUE|NEXT:RETURN_VALUE",
        "D2|wasm|local.get(1) i32.const(54) i32.lt_u if i32.const(60)|STACK:3",
        "D3|evm|PUSH1(0x00) CALLDATALOAD PUSH2(0x3a8a) EQ PUSH1(0x29) JUMPI|NEXT:STOP",
    ]

    print("\nEncoding tests:")
    for line in test_lines:
        ids = tok.encode_line(line)
        decoded = tok.decode(ids)
        print(f"\n Input: {line[:80]}...")
        print(f" Tokens: {ids}")
        print(f" Length: {len(ids)}")
        print(f" Decoded: {decoded[:80]}...")

    corpus_path = Path(__file__).parent.parent / 'corpus' / 'bytecode.txt'
    if corpus_path.exists():
        print(f"\nCorpus encoding test ({corpus_path}):")
        total = 0
        total_tokens = 0
        total_chars = 0
        empty = 0
        with open(corpus_path, encoding='utf-8') as f:
            for raw_line in f:
                # Count raw characters (blank lines and newlines included)
                # in the same pass instead of re-opening the file.
                total_chars += len(raw_line)
                line = raw_line.strip()
                if not line:
                    continue
                ids = tok.encode_line(line)
                total += 1
                total_tokens += len(ids)
                if len(ids) <= 2:
                    empty += 1
        print(f" Lines: {total}")
        print(f" Total tokens: {total_tokens}")
        print(f" Avg tokens/line: {total_tokens / max(1, total):.1f}")
        print(f" Empty (BOS+EOS only): {empty}")
        # max(1, ...) guards against an existing but empty corpus file.
        print(f" Compression vs raw chars: {total_tokens / max(1, total_chars):.2f}x")


# Run the smoke test only when this file is executed as a script.
if __name__ == '__main__':
    main()