File size: 7,055 Bytes
72c0672 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 | import json
import base64
import argparse
import os
import re
import sys
# --- 从你提供的源代码中复制的关键函数 ---
def vread(buf: bytes, i: int):
shift = val = 0
while True:
if i >= len(buf):
raise IndexError("Buffer exhausted during vread")
b = buf[i]
i += 1
val |= (b & 0x7F) << shift
if b < 0x80:
return val, i
shift += 7
def decompress_windows_starts_lens(b64_stream: str) -> tuple[list[int], list[int]]:
try:
buf = base64.b64decode(b64_stream)
i = 0
cursor= 0
starts, lens = [], []
while i < len(buf):
gap, i = vread(buf, i)
size, i = vread(buf, i)
start = cursor + gap
length = size
starts.append(start)
lens.append(length)
cursor = start + length
return starts, lens
except (base64.binascii.Error, IndexError) as e:
print(f" [解码窗口时出错: {e}]")
return [], []
def packed_bytes_to_pseudo(b: bytes) -> list[int]:
# 来自你的源代码的 9-bit 解包器
out, acc, bits = [], 0, 0
for byte in b:
acc |= byte << bits
bits += 8
while bits >= 9:
out.append(acc & 0x1FF)
acc >>= 9
bits -= 9
# 注意:你的原始代码没有处理尾部不足9bit的情况,这里我们保持一致
return out
# --- 参数解析和 Key 构建 ---
def parse_parameters_from_path(path_name: str) -> dict:
params = {}
base_name = os.path.basename(os.path.normpath(path_name))
parts = base_name.split('_')
for part in parts:
if '-' in part:
key, value = part.split('-', 1)
params[key.lower()] = value.lower()
else:
match = re.match(r'([a-zA-Z]+)(\d+)', part)
if match:
key, value = match.groups()
params[key.lower()] = value
return params
def construct_compression_key(params: dict) -> str:
ow = params.get('ow', '20')
escape_fb = 'True' if params.get('escapefb', 'false') == 'true' else 'False'
iterative = 'True' if params.get('iterative', 'false') == 'true' else 'False'
force_padding = 'True' if params.get('forcepadding', 'false') == 'true' else 'False'
key = f"m1_ac_ow{ow}_escapefb-{escape_fb}_iterative-{iterative}_forcepadding-{force_padding}"
return key
# --- 主调试函数 ---
def debug_line(args):
print(f"--- 开始调试: 文件夹 '{args.input_dir}', 行号 {args.line_number} ---")
# 1. 找到对应的文件和行
target_line = args.line_number
current_line_count = 0
line_content = None
jsonl_files = sorted([os.path.join(r, f) for r, _, fs in os.walk(args.input_dir) for f in fs if f.endswith('.jsonl')])
if not jsonl_files:
print(f"❌ 错误: 文件夹中没有 .jsonl 文件。")
return
for file_path in jsonl_files:
with open(file_path, 'r', errors='ignore') as f:
for line in f:
current_line_count += 1
if current_line_count == target_line:
line_content = line
print(f"✅ 找到了第 {target_line} 行,位于文件: {file_path}")
break
if line_content:
break
if not line_content:
print(f"❌ 错误: 未能找到第 {target_line} 行 (总共扫描了 {current_line_count} 行)。")
return
# 2. 解析和 Key 构建
params = parse_parameters_from_path(args.input_dir)
compression_key = construct_compression_key(params)
print(f"\n[步骤 A] 构建的压缩 Key: '{compression_key}'")
try:
data = json.loads(line_content)
print(" -> JSON 加载成功。")
if compression_key not in data:
print(f" -> ❌ 错误: 构建的 Key 不在 JSON 对象中!")
print(f" JSON 中的可用 Keys: {list(data.keys())}")
return
print(" -> ✅ Key 匹配成功!")
except json.JSONDecodeError as e:
print(f"❌ 错误: JSON 解码失败: {e}")
return
# 3. 解码 windows_starts_lens_b64
print("\n[步骤 B] 解码 'windows_starts_lens_b64'")
b64_windows = data.get('windows_starts_lens_b64', '')
print(f" -> 输入的 Base64 (前64字节): '{b64_windows[:64]}...'")
starts, lens = decompress_windows_starts_lens(b64_windows)
print(f" -> 解码结果: 共有 {len(starts)} 个窗口。")
if starts:
print(f" -> 前 5 个窗口 (start, length): {list(zip(starts, lens))[:5]}")
# 4. 解码压缩数据
print(f"\n[步骤 C] 解码压缩数据字段 '{compression_key}'")
b64_compressed = data.get(compression_key, '')
print(f" -> 输入的 Base64 (前64字节): '{b64_compressed[:64]}...'")
try:
decoded_bytes = base64.b64decode(b64_compressed)
print(f" -> Base64 解码后的字节长度: {len(decoded_bytes)}")
mixed_pseudo_bytes = packed_bytes_to_pseudo(decoded_bytes)
print(f" -> `packed_bytes_to_pseudo` 输出的总元素数量: {len(mixed_pseudo_bytes)}")
print(f" -> 前 20 个元素: {mixed_pseudo_bytes[:20]}")
pseudo_tokens = [t for t in mixed_pseudo_bytes if t >= 256]
print(f" -> 过滤后 (>= 256) 的压缩 Token 数量: {len(pseudo_tokens)}")
print(f" -> 前 20 个压缩 Token: {pseudo_tokens[:20]}")
except Exception as e:
print(f" -> ❌ 在此步骤中发生错误: {e}")
return
# 5. 最终诊断
print("\n" + "="*20 + " 最终诊断 " + "="*20)
print(f"窗口数量 (来自 windows_starts_lens_b64): {len(starts)}")
print(f"压缩 Token 数量 (来自 {compression_key}): {len(pseudo_tokens)}")
if len(starts) == len(pseudo_tokens):
print("\n🟢 结论: 长度匹配!之前的脚本可能存在其他问题。")
else:
print("\n🔴 结论: 长度不匹配!这是导致100%失败的根本原因。")
print(" 这表明数据生成逻辑与我们的解析逻辑之间存在根本性的不一致。")
print(" 可能的原因:")
print(" 1. `windows_starts_lens_b64` 可能不包含所有压缩块的信息(例如,跳过了某些小块)。")
print(" 2. 最终的压缩流中可能包含了一些不对应于 `windows` 的特殊符号。")
print(" 3. `packed_bytes_to_pseudo` 的行为可能比我们想象的更复杂。")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="调试单行压缩数据,以找出长度不匹配的根本原因。",
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument("input_dir", type=str, help="包含 .jsonl 数据文件的输入文件夹路径。")
parser.add_argument("--line_number", type=int, required=True, help="要检查的具体行号 (从1开始)。")
args = parser.parse_args()
debug_line(args)
|