yuccaaa committed on
Commit
b776e9e
·
verified ·
1 Parent(s): f6ec1d8

Upload deal_data/qiege.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. deal_data/qiege.py +74 -0
deal_data/qiege.py ADDED
@@ -0,0 +1,74 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
import json
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed

from tqdm import tqdm
from transformers import AutoTokenizer

# Local path of the model whose tokenizer defines token boundaries/counts.
local_model_path = "/nas/shared/kilab/hf-hub/Qwen3-32B"

# NOTE: the original script also loaded a tokenizer here in the parent
# process ("for estimating total tokens, optional"), but that object was
# never used anywhere — each worker loads its own copy via init_tokenizer().
# The redundant (and slow) parent-process load has been removed.
def init_tokenizer():
    """Process-pool initializer: load a tokenizer into this worker process.

    Runs once per worker (tokenizer objects are not shared across
    processes); process_line then reuses the process-local
    ``tokenizer_worker`` for every line handled by this worker.
    """
    global tokenizer_worker
    tokenizer_worker = AutoTokenizer.from_pretrained(
        local_model_path,
        trust_remote_code=True,
    )
def process_line(line, max_tokens=4096):
    """Turn one JSONL record into one or more token-bounded records.

    Parses *line* as JSON; when it carries a ``"content"`` field, the text
    is tokenized with the worker-local tokenizer. Content longer than
    *max_tokens* tokens is split into consecutive chunks of at most
    *max_tokens* tokens, each decoded back to text.

    Args:
        line: One raw line from the input JSONL file.
        max_tokens: Maximum tokens per output record. Defaults to 4096,
            matching the original hard-coded limit (backward compatible).

    Returns:
        A list of JSON strings of the form ``{"content": ...}`` ready to
        be written out, or ``None`` when the line is not valid JSON,
        lacks a ``"content"`` field, or the tokenizer raises.
    """
    global tokenizer_worker
    try:
        data = json.loads(line.strip())
        if 'content' not in data:
            return None
        output_text = data["content"]
        tokens = tokenizer_worker.encode(output_text)
        if len(tokens) <= max_tokens:
            # Short enough: pass the original text through unchanged
            # (avoids a lossy encode→decode round trip).
            return [json.dumps({"content": output_text}, ensure_ascii=False)]
        # Slice the token list directly instead of growing a chunk
        # token-by-token: produces the exact same fixed-size chunks with
        # far less per-token Python overhead.
        return [
            json.dumps(
                {"content": tokenizer_worker.decode(tokens[i:i + max_tokens])},
                ensure_ascii=False,
            )
            for i in range(0, len(tokens), max_tokens)
        ]
    except Exception:
        # Deliberate best-effort: malformed lines are dropped and the
        # caller writes nothing for them.
        # NOTE(review): consider logging here — silent drops hide data loss.
        return None
# Input/output paths (hard-coded NAS JSONL locations).
input_file = '/nas/shared/kilab/wangyujia/pretrain_data/cot/clean/merge_cot.jsonl'
output_file = '/nas/shared/kilab/wangyujia/pretrain_data/cot/clean/merge_cot_new.jsonl'
if __name__ == '__main__':
    try:
        # Read the whole input up front so every line can be submitted to
        # the pool and tracked with a single progress bar.
        with open(input_file, 'r', encoding='utf-8') as src:
            all_lines = src.readlines()
        total_lines = len(all_lines)

        # One worker per CPU core; init_tokenizer gives each its tokenizer.
        pool = ProcessPoolExecutor(
            max_workers=multiprocessing.cpu_count(),
            initializer=init_tokenizer,
        )
        with pool as executor:
            pending = [executor.submit(process_line, ln) for ln in all_lines]

            with open(output_file, 'w', encoding='utf-8') as sink, \
                    tqdm(total=total_lines, desc="处理进度") as bar:
                # NOTE: as_completed yields in completion order, so output
                # line order is not guaranteed to match the input order.
                for done in as_completed(pending):
                    chunked = done.result()
                    if chunked:
                        sink.writelines(chunk + '\n' for chunk in chunked)
                    bar.update(1)

        print(f"\n✅ 处理完成!共处理 {total_lines} 行,输出保存至 {output_file}")

    except FileNotFoundError:
        print(f"❌ 文件 {input_file} 未找到。")
    except Exception as e:
        print(f"❌ 发生错误: {e}")
        import traceback
        traceback.print_exc()