Neon-tech commited on
Commit
c5bb1b8
Β·
verified Β·
1 Parent(s): 991345e

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +91 -0
app.py ADDED
@@ -0,0 +1,91 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import re
4
+ import gc
5
+ from pathlib import Path
6
+ import pyarrow.parquet as pq
7
+ import pyarrow as pa
8
+
9
+ # ── Config ───────────────────────────────────────────────────────────────────
10
+ RAW_DIR = "/data/raw"
11
+ STATE_FILE = "/data/state.json"
12
+ ROWS_PER_CHUNK = 50_000
13
+
14
+ def friendly_name(path):
15
+ m = re.search(r"CC-MAIN-(\d{4}-\d+)/\d+_(\d+)\.parquet", str(path))
16
+ if m:
17
+ return f"cc{m.group(1)}_{int(m.group(2)):06d}.parquet"
18
+ return Path(path).name
19
+
20
+ def split_file(raw_path):
21
+ name = raw_path.name
22
+ print(f" Splitting: {name}")
23
+
24
+ try:
25
+ pf = pq.ParquetFile(raw_path)
26
+ except Exception as e:
27
+ print(f" βœ— Corrupt, skipping: {name} β€” {e}")
28
+ return []
29
+
30
+ chunk_paths = []
31
+ chunk_idx = 0
32
+ current = []
33
+
34
+ for batch in pf.iter_batches(batch_size=10_000, columns=["text"]):
35
+ current.append(batch)
36
+ if sum(len(b) for b in current) >= ROWS_PER_CHUNK:
37
+ chunk_name = name.replace(".parquet", f"_chunk{chunk_idx:03d}.parquet")
38
+ chunk_path = Path(RAW_DIR) / chunk_name
39
+ table = pa.Table.from_batches(current)
40
+ pq.write_table(table, chunk_path)
41
+ print(f" βœ“ {chunk_name} ({len(table):,} rows)")
42
+ chunk_paths.append(chunk_name)
43
+ chunk_idx += 1
44
+ current = []
45
+ del table
46
+ gc.collect()
47
+
48
+ if current:
49
+ chunk_name = name.replace(".parquet", f"_chunk{chunk_idx:03d}.parquet")
50
+ chunk_path = Path(RAW_DIR) / chunk_name
51
+ table = pa.Table.from_batches(current)
52
+ pq.write_table(table, chunk_path)
53
+ print(f" βœ“ {chunk_name} ({len(table):,} rows)")
54
+ chunk_paths.append(chunk_name)
55
+ del table
56
+ gc.collect()
57
+
58
+ return chunk_paths
59
+
60
+ # ── Main ──────────────────────────────────────────────────────────────────────
61
+ raw_files = [f for f in Path(RAW_DIR).glob("*.parquet") if "_chunk" not in f.name]
62
+ print(f"Found {len(raw_files)} unsplit files in /data/raw")
63
+
64
+ state = {"shards": {}, "queue": []}
65
+
66
+ for raw_path in sorted(raw_files):
67
+ chunk_names = split_file(raw_path)
68
+ if not chunk_names:
69
+ continue
70
+
71
+ # delete the big original
72
+ raw_path.unlink()
73
+ print(f" πŸ—‘ Deleted original: {raw_path.name}")
74
+
75
+ for chunk_name in chunk_names:
76
+ state["shards"][chunk_name] = {
77
+ "status": "pending",
78
+ "hf_path": str(raw_path.name),
79
+ "worker": None,
80
+ "claimed_at": None,
81
+ "error": None,
82
+ "retries": 0,
83
+ }
84
+
85
+ tmp = STATE_FILE + ".tmp"
86
+ with open(tmp, "w") as f:
87
+ json.dump(state, f, indent=2)
88
+ os.replace(tmp, STATE_FILE)
89
+
90
+ total = len(state["shards"])
91
+ print(f"\nβœ“ Done β€” {total} chunks registered in state.json")