| |
| |
| |
| |
| |
|
|
| """Utility tool that extracts DWARF information from a WebAssembly file. |
| |
| This script takes the output produced by the LLVM tools and encodes it as |
| a wasm source map. Additionally, it can collect original sources, change |
| file prefixes, and strip debug sections from a wasm file. |
| """ |
|
|
| import argparse |
| import json |
| import logging |
| import os |
| import re |
| import sys |
| from math import floor, log |
|
|
| __scriptdir__ = os.path.dirname(os.path.abspath(__file__)) |
| __rootdir__ = os.path.dirname(__scriptdir__) |
| sys.path.insert(0, __rootdir__) |
|
|
| from tools import shared, utils |
| from tools.system_libs import DETERMINISTIC_PREFIX |
|
|
# Path to the llvm-cxxfilt executable; demangle_names() pipes mangled C++
# symbol names through it.
LLVM_CXXFILT = shared.llvm_tool_path('llvm-cxxfilt')


# Normalized Emscripten root directory. Prefixes.resolve() substitutes it for
# DETERMINISTIC_PREFIX when the deterministic prefix is not being preserved.
EMSCRIPTEN_PREFIX = utils.normalize_path(utils.path_from_root())


logger = logging.getLogger('wasm-sourcemap')


# When True, read_dwarf_info() also asks llvm-dwarfdump for subprogram and
# inlined-subroutine tags and extracts function ranges, which
# build_sourcemap() turns into the source map's 'names' field.
generate_scopes = False
|
|
|
|
def parse_args(args):
    """Parse the command line of wasm-sourcemap.py.

    Args:
        args: list of argument strings (without the program name).

    Returns:
        The argparse namespace holding ``wasm``, ``output``, ``prefix``,
        ``sources``, ``load_prefix``, ``w``, ``strip``, ``source_map_url``,
        ``dwarfdump``, ``dwarfdump_output`` and ``basepath``.
    """
    # (flags, keyword arguments) in the order they should appear in --help.
    argument_specs = (
        (('wasm',), {'help': 'wasm file'}),
        (('-o', '--output'), {'help': 'output source map'}),
        (('-p', '--prefix'), {
            'nargs': '*',
            'help': 'replace source debug filename prefix for source map',
            'default': [],
        }),
        (('-s', '--sources'), {
            'action': 'store_true',
            'help': 'read and embed source files from file system into source map',
        }),
        (('-l', '--load-prefix'), {
            'nargs': '*',
            'help': 'replace source debug filename prefix for reading sources from file system (see also --sources)',
            'default': [],
        }),
        (('-w',), {'nargs': '?', 'help': 'set output wasm file'}),
        (('-x', '--strip'), {
            'action': 'store_true',
            'help': 'removes debug and linking sections',
        }),
        (('-u', '--source-map-url'), {
            'nargs': '?',
            'help': 'specifies sourceMappingURL section content',
        }),
        (('--dwarfdump',), {'help': "path to llvm-dwarfdump executable"}),
        # Hidden option: read a pre-generated dwarfdump dump instead of
        # running the tool.
        (('--dwarfdump-output',), {'nargs': '?', 'help': argparse.SUPPRESS}),
        (('--basepath',), {
            'help': 'base path for source files, which will be relative to this',
        }),
    )

    parser = argparse.ArgumentParser(prog='wasm-sourcemap.py', description=__doc__)
    for flags, keywords in argument_specs:
        parser.add_argument(*flags, **keywords)
    return parser.parse_args(args)
|
|
|
|
class Prefixes:
    """Rewrite source file names according to user-supplied prefix rules.

    Each rule is either ``PREFIX=REPLACEMENT`` or a bare ``PREFIX`` (replaced
    by the empty string). Rules are tried in order and the first one whose
    prefix matches the start of the name wins.
    """

    def __init__(self, args, base_path=None, preserve_deterministic_prefix=True):
        """Build the rule list.

        Args:
            args: iterable of rule strings (``PREFIX=REPLACEMENT`` or ``PREFIX``).
            base_path: optional directory; names not matched by any rule are
                made relative to it. ``None`` disables that step.
            preserve_deterministic_prefix: when False, a leading
                DETERMINISTIC_PREFIX is swapped for EMSCRIPTEN_PREFIX before
                the rules are applied.
        """
        prefixes = []
        for p in args:
            # Split on the first '=' only, so a replacement may itself contain
            # '=' (for example a URL with a query string). A rule without '='
            # yields an empty replacement.
            prefix, _, replacement = p.partition('=')
            prefixes.append({'prefix': utils.normalize_path(prefix), 'replacement': replacement})
        self.base_path = utils.normalize_path(base_path) if base_path is not None else None
        self.preserve_deterministic_prefix = preserve_deterministic_prefix
        self.prefixes = prefixes
        # Memoizes resolve() results, keyed by the unresolved name.
        self.cache = {}

    def resolve(self, name):
        """Return the rewritten form of *name* (cached per instance)."""
        if name in self.cache:
            return self.cache[name]

        source = name
        if not self.preserve_deterministic_prefix and name.startswith(DETERMINISTIC_PREFIX):
            source = EMSCRIPTEN_PREFIX + name.removeprefix(DETERMINISTIC_PREFIX)

        provided = False
        for p in self.prefixes:
            if source.startswith(p['prefix']):
                source = p['replacement'] + source.removeprefix(p['prefix'])
                provided = True
                break

        # A matching rule takes precedence. Otherwise, when a base path is
        # set, emit a path relative to it. Names that still carry the
        # deterministic prefix are never relativized and are left untouched.
        if not (source.startswith(DETERMINISTIC_PREFIX) or provided or self.base_path is None):
            try:
                source = os.path.relpath(source, self.base_path)
            except ValueError:
                # os.path.relpath raises ValueError when no relative path
                # exists (e.g. different drives on Windows).
                source = os.path.abspath(source)
            source = utils.normalize_path(source)

        self.cache[name] = source
        return source
|
|
|
|
| |
| |
| |
class SourceMapPrefixes:
    """Pair of file-name resolvers used while building a source map.

    ``sources`` resolves the names written to the source map's ``sources``
    list (and honours *base_path*); ``load`` resolves the paths used to read
    source text from disk and does not preserve the deterministic prefix.
    """

    def __init__(self, sources, load, base_path):
        self.load = Prefixes(load, base_path=None, preserve_deterministic_prefix=False)
        self.sources = Prefixes(sources, base_path=base_path, preserve_deterministic_prefix=True)
|
|
|
|
def encode_vlq(n):
    """Encode the signed integer *n* as a Base64 VLQ string.

    The sign is stored in the lowest bit, then the value is emitted in 5-bit
    groups, least significant first. Every group except the last has the
    continuation bit (32) set.
    """
    alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
    # Magnitude shifted left by one, with the sign flag in bit 0.
    remaining = (abs(n) << 1) | (1 if n < 0 else 0)
    digits = []
    while remaining > 31:
        digits.append(alphabet[32 + (remaining & 31)])
        remaining >>= 5
    digits.append(alphabet[remaining])
    return "".join(digits)
|
|
|
|
def read_var_uint(wasm, pos):
    """Decode an unsigned LEB128 integer from *wasm* starting at *pos*.

    Returns:
        A ``(value, next_pos)`` tuple, where ``next_pos`` is the index of the
        first byte after the encoded integer.
    """
    value = 0
    shift = 0
    while True:
        byte = ord(wasm[pos:pos + 1])
        pos += 1
        if byte < 128:
            # High bit clear: this is the final, most significant group.
            return value + (byte << shift), pos
        value |= (byte - 128) << shift
        shift += 7
|
|
|
|
def strip_debug_sections(wasm):
    """Return a copy of the wasm binary without debug-related custom sections.

    Custom sections (id 0) named ``linking`` or ``sourceMappingURL``, or whose
    name starts with ``.debug_`` or ``reloc..debug_``, are dropped. Every
    other section is copied through unchanged.

    Args:
        wasm: the module contents as bytes.

    Returns:
        The filtered module as bytes.
    """
    logger.debug('Strip debug sections')
    # The first 8 bytes are the module header (magic + version).
    pos = 8
    kept = [wasm[:pos]]

    while pos < len(wasm):
        section_start = pos
        section_id, size_pos = read_var_uint(wasm, pos)
        section_size, section_body = read_var_uint(wasm, size_pos)
        pos = section_body + section_size
        if section_id == 0:
            name_len, name_pos = read_var_uint(wasm, section_body)
            # Decode the name: str() on bytes would yield "b'...'" and never
            # match any of the names below.
            name = wasm[name_pos:name_pos + name_len].decode('utf-8', errors='replace')
            if name in {'linking', 'sourceMappingURL'} or name.startswith(('reloc..debug_', '.debug_')):
                continue
        kept.append(wasm[section_start:pos])

    # Join once instead of repeatedly concatenating immutable bytes objects.
    return b''.join(kept)
|
|
|
|
def encode_uint_var(n):
    """Encode the non-negative integer *n* as unsigned LEB128 bytes."""
    encoded = bytearray()
    while True:
        if n <= 127:
            # Final group: continuation bit clear.
            encoded.append(n)
            return bytes(encoded)
        encoded.append(128 | (n & 127))
        n >>= 7
|
|
|
|
def append_source_mapping(wasm, url):
    """Append a ``sourceMappingURL`` custom section to the wasm binary.

    Args:
        wasm: the module contents as bytes.
        url: the source map URL (str) to store in the section.

    Returns:
        New bytes consisting of *wasm* followed by the custom section.
    """
    logger.debug('Append sourceMappingURL section')
    section_name = "sourceMappingURL".encode()
    # Encode first: the length prefix must count encoded bytes, not
    # characters, or a non-ASCII URL produces a malformed section.
    url_bytes = url.encode()
    section_content = encode_uint_var(len(section_name)) + section_name + encode_uint_var(len(url_bytes)) + url_bytes
    # Section id 0 (custom), then the payload size, then the payload.
    return wasm + encode_uint_var(0) + encode_uint_var(len(section_content)) + section_content
|
|
|
|
def get_code_section_offset(wasm):
    """Return the file offset of the code section's body.

    Walks the section headers after the 8-byte module header and returns the
    offset just past the size field of the first section with id 10.
    Returns None when no such section is found.
    """
    logger.debug('Read sections index')
    cursor = 8
    total = len(wasm)

    while cursor < total:
        section_id, size_at = read_var_uint(wasm, cursor)
        section_size, body_start = read_var_uint(wasm, size_at)
        if section_id == 10:
            return body_start
        cursor = body_start + section_size
    return None
|
|
|
|
def remove_dead_entries(entries):
    """Drop line-table blocks that belong to dead (removed) functions, in place.

    *entries* is a list of dicts with at least ``'address'`` (int) and
    ``'eos'`` (bool, marks the last entry of a sequence). A block is the run
    of entries up to and including an ``eos`` entry. As a heuristic, a block
    is treated as dead when its first address is too close to 0: smaller than
    1 plus the number of bytes needed to LEB128-encode the block's size.
    Entries after the last ``eos`` entry are always kept.

    The list is modified in place; nothing is returned.
    """
    live = []
    block_start = 0
    for index, entry in enumerate(entries):
        if not entry['eos']:
            continue
        fn_start = entries[block_start]['address']
        span = entry['address'] - fn_start + 1
        # Byte length of the LEB128 encoding of `span` (7 payload bits per
        # byte). Integer arithmetic avoids float rounding at exact powers of
        # 128, and a non-positive span (zero-length sequence) is treated as a
        # one-byte size instead of raising a math domain error.
        fn_size_length = (max(span, 1).bit_length() + 6) // 7
        # NOTE(review): the extra 1 presumably accounts for one byte that
        # precedes the first function body in the code section - confirm.
        min_live_offset = 1 + fn_size_length
        if fn_start >= min_live_offset:
            live.extend(entries[block_start:index + 1])
        block_start = index + 1

    # Keep any trailing entries that are not terminated by an eos entry.
    live.extend(entries[block_start:])
    entries[:] = live
|
|
|
|
| |
| |
def decode_octal_encoded_utf8(str):
    r"""Decode ``\2xx`` / ``\3xx`` octal escapes in *str* back into UTF-8 text.

    Bytes 128-255 that appear as three-digit octal escapes (``\200`` to
    ``\377``) are turned back into raw bytes, and the result is decoded as
    UTF-8. A backslash that is itself escaped (``\\``) does not start an
    octal escape, and both backslashes are kept in the output.

    A truncated or malformed escape (for example a lone trailing backslash)
    is kept as literal text rather than raising.

    Raises:
        UnicodeDecodeError: if the reconstructed bytes are not valid UTF-8.
        ValueError: if *str* contains a character above U+00FF.
    """
    out = bytearray()
    length = len(str)
    i = 0
    in_escape = False
    while i < length:
        ch = str[i]
        if not in_escape and ch == '\\':
            digits = str[i + 1:i + 4]
            # Only a complete escape with a leading 2 or 3 and three valid
            # octal digits is decoded.
            if len(digits) == 3 and digits[0] in '23' and all(d in '01234567' for d in digits):
                out.append(int(digits, 8))
                i += 4
                continue
        out.append(ord(ch))
        # A backslash escapes the next character, unless it was itself escaped.
        in_escape = False if in_escape else (ch == '\\')
        i += 1
    return out.decode('utf-8')
|
|
|
|
def extract_comp_dir_map(text):
    """Map each compile unit's DW_AT_stmt_list offset to its DW_AT_comp_dir.

    Args:
        text: the debug-info portion of the llvm-dwarfdump output.

    Returns:
        A dict keyed by the stmt_list offset string (e.g. ``'0x00000000'``).
        The value is the decoded compilation directory, or ``''`` when the
        unit has no DW_AT_comp_dir. Units without DW_AT_stmt_list are skipped.
    """
    compile_unit_pattern = re.compile(r"0x[0-9a-f]*: DW_TAG_compile_unit")
    stmt_list_pattern = re.compile(r"DW_AT_stmt_list\s+\((0x[0-9a-f]*)\)")
    comp_dir_pattern = re.compile(r"DW_AT_comp_dir\s+\(\"([^\"]+)\"\)")

    units = list(compile_unit_pattern.finditer(text))
    # A unit's attributes span from the end of its tag to the start of the
    # next compile unit (or the end of the text for the last one).
    unit_ends = [following.start() for following in units[1:]] + [len(text)]

    result = {}
    for unit, unit_end in zip(units, unit_ends):
        unit_start = unit.end()
        stmt_list = stmt_list_pattern.search(text, unit_start, unit_end)
        if stmt_list is None:
            continue
        comp_dir = comp_dir_pattern.search(text, unit_start, unit_end)
        if comp_dir is None:
            result[stmt_list.group(1)] = ''
        else:
            result[stmt_list.group(1)] = decode_octal_encoded_utf8(comp_dir.group(1))
    return result
|
|
|
|
def demangle_names(names):
    """Demangle the C++ names in *names* with a single llvm-cxxfilt call.

    Only names starting with ``_Z`` are considered. All of them are sent to
    llvm-cxxfilt at once, one per line.

    Returns:
        A dict mapping each mangled name to its demangled form. An empty dict
        is returned when there is nothing to demangle, when llvm-cxxfilt is
        missing or fails, or when its output has an unexpected line count.
    """
    # De-duplicate and sort, so the input order is deterministic.
    mangled = sorted({name for name in names if name.startswith('_Z')})
    if not mangled:
        return {}
    if not os.path.exists(LLVM_CXXFILT):
        logger.warning('llvm-cxxfilt does not exist')
        return {}

    proc = shared.check_call(
        [LLVM_CXXFILT],
        input='\n'.join(mangled),
        stdout=shared.PIPE,
        stderr=shared.PIPE,
        text=True,
    )
    if proc.returncode != 0:
        logger.warning('llvm-cxxfilt failed: %s' % proc.stderr)
        return {}

    demangled = proc.stdout.splitlines()
    if len(demangled) != len(mangled):
        logger.warning('llvm-cxxfilt output length mismatch')
        return {}

    # Output lines correspond one-to-one, in order, to the input names.
    return {before: after for before, after in zip(mangled, demangled, strict=True)}
|
|
|
|
class FuncRange:
    """A named function together with the PC range [low_pc, high_pc) it covers."""

    def __init__(self, name, low_pc, high_pc):
        # name: function name; low_pc / high_pc: bounds taken from
        # DW_AT_low_pc / DW_AT_high_pc.
        self.name, self.low_pc, self.high_pc = name, low_pc, high_pc
|
|
|
|
| |
| |
| |
def extract_func_ranges(text):
    """Extract function PC ranges from llvm-dwarfdump debug-info output.

    Every DW_TAG_subprogram and DW_TAG_inlined_subroutine tag is scanned up
    to the next tag line (a line of the form ``0x<hex>:``) for:

      - DW_AT_low_pc / DW_AT_high_pc, e.g. ``DW_AT_low_pc (0x00000005)``
      - a name: for subprograms the first of DW_AT_linkage_name, DW_AT_name
        or the quoted name inside DW_AT_specification; for inlined
        subroutines the quoted name inside DW_AT_abstract_origin.

    Tags missing a name or either PC bound are ignored. Mangled names are
    demangled via demangle_names().

    Returns:
        A list of FuncRange sorted by ascending low_pc and, for equal low_pc,
        by descending high_pc, so an enclosing range comes before the ranges
        nested inside it.
    """
    # Start of the next tag; bounds the attribute list of the current tag.
    next_tag_pattern = re.compile(r'\n0x[0-9a-f]+:')
    func_pattern = re.compile(r'DW_TAG_(?:subprogram|inlined_subroutine)')

    low_pc_pattern = re.compile(r'DW_AT_low_pc\s+\(0x([0-9a-f]+)\)')
    high_pc_pattern = re.compile(r'DW_AT_high_pc\s+\(0x([0-9a-f]+)\)')
    abstract_origin_pattern = re.compile(r'DW_AT_abstract_origin\s+\(0x[0-9a-f]+\s+"([^"]+)"\)')
    linkage_name_pattern = re.compile(r'DW_AT_linkage_name\s+\("([^"]+)"\)')
    name_pattern = re.compile(r'DW_AT_name\s+\("([^"]+)"\)')
    specification_pattern = re.compile(r'DW_AT_specification\s+\(0x[0-9a-f]+\s+"([^"]+)"\)')

    def attribute(pattern, start, end):
        # First captured value of `pattern` within text[start:end], or None.
        found = pattern.search(text, start, end)
        return found.group(1) if found else None

    def get_name_from_tag(start, end):
        # Prefer the linkage name, then the plain name, then the name quoted
        # in the DW_AT_specification annotation.
        for pattern in (linkage_name_pattern, name_pattern, specification_pattern):
            value = attribute(pattern, start, end)
            if value is not None:
                return value
        return None

    func_ranges = []
    for tag in func_pattern.finditer(text):
        body_start = tag.end()
        following = next_tag_pattern.search(text, body_start)
        body_end = following.start() if following else len(text)

        low_hex = attribute(low_pc_pattern, body_start, body_end)
        high_hex = attribute(high_pc_pattern, body_start, body_end)

        if tag.group(0) == 'DW_TAG_subprogram':
            name = get_name_from_tag(body_start, body_end)
        else:
            name = attribute(abstract_origin_pattern, body_start, body_end)

        if name and low_hex is not None and high_hex is not None:
            func_ranges.append(FuncRange(name, int(low_hex, 16), int(high_hex, 16)))

    # Demangle every collected name in one batch.
    demangled_map = demangle_names([item.name for item in func_ranges])
    for item in func_ranges:
        item.name = demangled_map.get(item.name, item.name)

    return sorted(func_ranges, key=lambda item: (item.low_pc, -item.high_pc))
|
|
|
|
def read_dwarf_info(wasm, options):
    """Collect line-table entries (and optionally function ranges) for *wasm*.

    The llvm-dwarfdump text is read from ``options.dwarfdump_output`` if
    given. Otherwise it is produced by running ``options.dwarfdump`` on the
    file with ``-debug-info -debug-line``.

    Args:
        wasm: path of the wasm file (used only when running llvm-dwarfdump).
        options: parsed command-line options.

    Returns:
        ``(entries, func_ranges)``. ``entries`` is a list of dicts with keys
        ``address``, ``line``, ``column``, ``file`` and ``eos``, sorted by
        address with dead-function blocks removed. ``func_ranges`` is the
        result of extract_func_ranges() when ``generate_scopes`` is set,
        otherwise an empty list.
    """
    if options.dwarfdump_output:
        output = utils.read_file(options.dwarfdump_output)
    elif options.dwarfdump:
        logger.debug('Reading DWARF information from %s', wasm)
        if not os.path.exists(options.dwarfdump):
            utils.exit_with_error('llvm-dwarfdump not found: ' + options.dwarfdump)
        dwarfdump_cmd = [options.dwarfdump, '-debug-info', '-debug-line', wasm]
        if generate_scopes:
            # Only three tags are needed: compile units for source locations,
            # and subprograms / inlined subroutines for function ranges.
            dwarfdump_cmd += ['-t', 'DW_TAG_compile_unit', '-t', 'DW_TAG_subprogram',
                              '-t', 'DW_TAG_inlined_subroutine']
        else:
            # Without scopes only the top-level compile unit tags are needed.
            dwarfdump_cmd += ['--recurse-depth=0']
        proc = shared.check_call(dwarfdump_cmd, stdout=shared.PIPE)
        output = proc.stdout
    else:
        utils.exit_with_error('Please specify either --dwarfdump or --dwarfdump-output')

    debug_line_pattern = re.compile(r"debug_line\[(0x[0-9a-f]*)\]")
    include_dir_pattern = re.compile(r"include_directories\[\s*(\d+)\] = \"([^\"]*)")
    file_pattern = re.compile(r"file_names\[\s*(\d+)\]:\s+name: \"([^\"]*)\"\s+dir_index: (\d+)")
    line_pattern = re.compile(r"\n0x([0-9a-f]+)\s+(\d+)\s+(\d+)\s+(\d+)(.*?end_sequence)?")

    entries = []
    iterator = debug_line_pattern.finditer(output)
    current_match = next(iterator, None)
    # Everything before the first debug_line table is the debug-info dump.
    debug_info_end = current_match.start() if current_match else len(output)

    debug_info = output[:debug_info_end]
    map_stmt_list_to_comp_dir = extract_comp_dir_map(debug_info)

    while current_match:
        next_match = next(iterator, None)

        stmt_list = current_match.group(1)
        start = current_match.end()
        end = next_match.start() if next_match else len(output)

        comp_dir = map_stmt_list_to_comp_dir.get(stmt_list, '')

        # Each debug_line[...] table is parsed for three kinds of records:
        #
        #   include_directories[  1] = "/path/to/src"
        #   file_names[  1]:
        #              name: "file.c"
        #         dir_index: 1
        #
        #   Address            Line   Column File   ...  Flags
        #   0x0000000000000006     22      0      1   ...  is_stmt
        #   0x0000000000000010     23      3      1   ...  end_sequence
        #
        # Directory index 0 defaults to the compilation directory, and
        # listed directories are joined onto it.
        include_directories = {'0': comp_dir}
        for dir_match in include_dir_pattern.finditer(output, start, end):
            include_directories[dir_match.group(1)] = os.path.join(comp_dir, decode_octal_encoded_utf8(dir_match.group(2)))

        files = {}
        for file_match in file_pattern.finditer(output, start, end):
            directory = include_directories[file_match.group(3)]
            file_path = os.path.join(directory, decode_octal_encoded_utf8(file_match.group(2)))
            files[file_match.group(1)] = file_path

        for row in line_pattern.finditer(output, start, end):
            entry = {
                'address': int(row.group(1), 16),
                'line': int(row.group(2)),
                'column': int(row.group(3)),
                'file': files[row.group(4)],
                'eos': row.group(5) is not None,
            }
            if not entry['eos']:
                entries.append(entry)
                continue
            # Pull the end-of-sequence address back by one so it falls inside
            # the sequence it terminates (presumably onto the function's
            # final opcode - confirm).
            entry['address'] -= 1
            # Guard against an empty list: an end_sequence row may be the
            # very first row encountered.
            if entries and entries[-1]['address'] == entry['address']:
                # The previous entry already sits at this address; reuse it.
                entries[-1]['eos'] = True
            else:
                entries.append(entry)

        current_match = next_match

    remove_dead_entries(entries)

    # Hand the entries back ordered by address.
    entries = sorted(entries, key=lambda entry: entry['address'])

    if generate_scopes:
        func_ranges = extract_func_ranges(debug_info)
    else:
        func_ranges = []
    return entries, func_ranges
|
|
|
|
def build_sourcemap(entries, func_ranges, code_section_offset, options):
    """Build the source map (version 3) dictionary.

    Args:
        entries: address-sorted line entries (dicts with ``address``,
            ``line``, ``column`` and ``file``).
        func_ranges: FuncRange list sorted by (low_pc, -high_pc). Their PCs
            are shifted by *code_section_offset* in place.
        code_section_offset: value added to every address and PC.
        options: parsed command-line options (``basepath``, ``sources``,
            ``prefix`` and ``load_prefix`` are read).

    Returns:
        A dict with the keys ``version``, ``sources``, ``sourcesContent``,
        ``names`` and ``mappings``.
    """
    prefixes = SourceMapPrefixes(options.prefix, options.load_prefix, options.basepath)
    collect_sources = options.sources

    # Entry addresses get the code section offset added below, so the
    # function PC ranges are shifted by the same amount.
    for func_range in func_ranges:
        func_range.low_pc += code_section_offset
        func_range.high_pc += code_section_offset

    # The same name can occur in several ranges, so 'names' is a sorted list
    # of unique names and ranges are mapped to indices in it.
    names = sorted({item.name for item in func_ranges})
    name_to_id = {name: index for index, name in enumerate(names)}

    sources = []
    sources_content = []
    sources_map = {}
    mappings = []

    # Previous values for the delta encoding. Line and column start at 1.
    last_address = 0
    last_source_id = 0
    last_line = 1
    last_column = 1
    last_func_id = 0

    active_funcs = []
    next_func_range_id = 0

    def get_function_id(address):
        # Return the 'names' index of the innermost function covering
        # `address`, or None. Addresses must be queried in ascending order:
        # ranges that have started are pushed onto the active list, ranges
        # that have ended are dropped, and the last survivor is the innermost.
        nonlocal active_funcs
        nonlocal next_func_range_id

        while next_func_range_id < len(func_ranges):
            candidate = func_ranges[next_func_range_id]
            if candidate.low_pc > address:
                break
            active_funcs.append((candidate.high_pc, next_func_range_id))
            next_func_range_id += 1
        active_funcs = [f for f in active_funcs if f[0] > address]

        if not active_funcs:
            return None
        innermost = func_ranges[active_funcs[-1][1]]
        return name_to_id[innermost.name]

    for entry in entries:
        line = entry['line']
        # Entries with line 0 are not emitted.
        if line == 0:
            continue
        # A column of 0 is replaced with 1.
        column = entry['column'] or 1

        address = entry['address'] + code_section_offset
        file_name = utils.normalize_path(entry['file'])
        source_name = prefixes.sources.resolve(file_name)

        source_id = sources_map.get(source_name)
        if source_id is None:
            # First time this source is seen: register it.
            source_id = len(sources)
            sources_map[source_name] = source_id
            sources.append(source_name)
            if collect_sources:
                load_name = prefixes.load.resolve(file_name)
                try:
                    source_content = utils.read_file(load_name)
                except OSError:
                    print('Failed to read source: %s' % load_name)
                    source_content = None
                sources_content.append(source_content)
        func_id = get_function_id(address)

        deltas = [
            address - last_address,
            source_id - last_source_id,
            line - last_line,
            column - last_column,
        ]
        last_address, last_source_id, last_line, last_column = address, source_id, line, column
        if func_id is not None:
            # The optional fifth field is the name index delta.
            deltas.append(func_id - last_func_id)
            last_func_id = func_id
        mappings.append(''.join(encode_vlq(delta) for delta in deltas))

    return {
        'version': 3,
        'sources': sources,
        'sourcesContent': sources_content,
        'names': names,
        'mappings': ','.join(mappings),
    }
|
|
|
|
def main(args):
    """Entry point: write the source map and, optionally, a rewritten wasm.

    Steps: parse *args*, read the wasm file, gather DWARF line info, write
    the JSON source map to ``options.output``. Then, if requested, strip the
    debug sections and/or append a sourceMappingURL section, and write the
    resulting binary to the path given with ``-w``.

    Returns:
        0 on completion.
    """
    options = parse_args(args)

    wasm_path = options.wasm
    with open(wasm_path, 'rb') as wasm_file:
        wasm = wasm_file.read()

    entries, func_ranges = read_dwarf_info(wasm_path, options)
    code_section_offset = get_code_section_offset(wasm)

    logger.debug('Saving to %s' % options.output)
    source_map = build_sourcemap(entries, func_ranges, code_section_offset, options)
    with open(options.output, 'w', encoding='utf-8') as map_file:
        # Compact separators keep the file small; non-ASCII stays unescaped.
        json.dump(source_map, map_file, separators=(',', ':'), ensure_ascii=False)

    if options.strip:
        wasm = strip_debug_sections(wasm)

    if options.source_map_url:
        wasm = append_source_mapping(wasm, options.source_map_url)

    if options.w:
        logger.debug('Saving wasm to %s' % options.w)
        with open(options.w, 'wb') as wasm_out:
            wasm_out.write(wasm)

    logger.debug('Done')
    return 0
|
|
|
|
if __name__ == '__main__':
    # Verbose logging when the EMCC_DEBUG environment variable is set to a
    # non-empty value.
    log_level = logging.DEBUG if os.environ.get('EMCC_DEBUG') else logging.INFO
    logging.basicConfig(level=log_level)
    sys.exit(main(sys.argv[1:]))
|
|