Neon-tech commited on
Commit
e80aacb
Β·
verified Β·
1 Parent(s): c5bb1b8

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +149 -42
app.py CHANGED
@@ -2,20 +2,48 @@ import os
2
  import json
3
  import re
4
  import gc
 
5
  from pathlib import Path
6
  import pyarrow.parquet as pq
7
  import pyarrow as pa
8
 
9
  # ── Config ───────────────────────────────────────────────────────────────────
10
- RAW_DIR = "/data/raw"
11
- STATE_FILE = "/data/state.json"
 
 
12
  ROWS_PER_CHUNK = 50_000
13
 
14
- def friendly_name(path):
15
- m = re.search(r"CC-MAIN-(\d{4}-\d+)/\d+_(\d+)\.parquet", str(path))
16
  if m:
17
- return f"cc{m.group(1)}_{int(m.group(2)):06d}.parquet"
18
- return Path(path).name
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19
 
20
  def split_file(raw_path):
21
  name = raw_path.name
@@ -24,26 +52,34 @@ def split_file(raw_path):
24
  try:
25
  pf = pq.ParquetFile(raw_path)
26
  except Exception as e:
27
- print(f" βœ— Corrupt, skipping: {name} β€” {e}")
28
- return []
 
29
 
30
  chunk_paths = []
31
  chunk_idx = 0
32
  current = []
33
 
34
- for batch in pf.iter_batches(batch_size=10_000, columns=["text"]):
35
- current.append(batch)
36
- if sum(len(b) for b in current) >= ROWS_PER_CHUNK:
37
- chunk_name = name.replace(".parquet", f"_chunk{chunk_idx:03d}.parquet")
38
- chunk_path = Path(RAW_DIR) / chunk_name
39
- table = pa.Table.from_batches(current)
40
- pq.write_table(table, chunk_path)
41
- print(f" βœ“ {chunk_name} ({len(table):,} rows)")
42
- chunk_paths.append(chunk_name)
43
- chunk_idx += 1
44
- current = []
45
- del table
46
- gc.collect()
 
 
 
 
 
 
 
47
 
48
  if current:
49
  chunk_name = name.replace(".parquet", f"_chunk{chunk_idx:03d}.parquet")
@@ -58,34 +94,105 @@ def split_file(raw_path):
58
  return chunk_paths
59
 
60
  # ── Main ──────────────────────────────────────────────────────────────────────
61
- raw_files = [f for f in Path(RAW_DIR).glob("*.parquet") if "_chunk" not in f.name]
62
  print(f"Found {len(raw_files)} unsplit files in /data/raw")
63
 
64
- state = {"shards": {}, "queue": []}
 
 
 
 
 
 
 
65
 
66
- for raw_path in sorted(raw_files):
67
- chunk_names = split_file(raw_path)
68
- if not chunk_names:
 
 
 
69
  continue
 
 
 
 
 
70
 
71
- # delete the big original
72
- raw_path.unlink()
73
- print(f" πŸ—‘ Deleted original: {raw_path.name}")
74
-
75
- for chunk_name in chunk_names:
76
- state["shards"][chunk_name] = {
77
- "status": "pending",
78
- "hf_path": str(raw_path.name),
79
- "worker": None,
80
- "claimed_at": None,
81
- "error": None,
82
- "retries": 0,
83
- }
84
-
85
  tmp = STATE_FILE + ".tmp"
86
  with open(tmp, "w") as f:
87
  json.dump(state, f, indent=2)
88
  os.replace(tmp, STATE_FILE)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
89
 
90
- total = len(state["shards"])
91
- print(f"\nβœ“ Done β€” {total} chunks registered in state.json")
 
2
  import json
3
  import re
4
  import gc
5
+ import requests
6
  from pathlib import Path
7
  import pyarrow.parquet as pq
8
  import pyarrow as pa
9
 
10
  # ── Config ───────────────────────────────────────────────────────────────────
11
+ HF_TOKEN = os.environ.get("HF_TOKEN")
12
+ DATASET_REPO = "HuggingFaceFW/fineweb-edu"
13
+ RAW_DIR = "/data/raw"
14
+ STATE_FILE = "/data/state.json"
15
  ROWS_PER_CHUNK = 50_000
16
 
17
+ def hf_path_from_name(name):
18
+ m = re.match(r"cc(\d{4})-(\d+)_(\d+)\.parquet", name)
19
  if m:
20
+ year_week = f"{m.group(1)}-{m.group(2)}"
21
+ idx = int(m.group(3))
22
+ return f"data/CC-MAIN-{year_week}/{idx//1000:03d}_{idx:05d}.parquet"
23
+ return None
24
+
25
+ def download_file(hf_path, dest_path):
26
+ url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/{hf_path}"
27
+ tmp_path = dest_path.with_suffix(".tmp")
28
+ print(f" Downloading: {hf_path}")
29
+ try:
30
+ resp = requests.get(
31
+ url,
32
+ headers={"Authorization": f"Bearer {HF_TOKEN}"},
33
+ timeout=300,
34
+ stream=True,
35
+ )
36
+ resp.raise_for_status()
37
+ with open(tmp_path, "wb") as f:
38
+ for chunk in resp.iter_content(chunk_size=8 * 1024 * 1024):
39
+ f.write(chunk)
40
+ tmp_path.rename(dest_path)
41
+ print(f" βœ“ Downloaded: {dest_path.name}")
42
+ return True
43
+ except Exception as e:
44
+ print(f" βœ— Download failed: {e}")
45
+ tmp_path.unlink(missing_ok=True)
46
+ return False
47
 
48
  def split_file(raw_path):
49
  name = raw_path.name
 
52
  try:
53
  pf = pq.ParquetFile(raw_path)
54
  except Exception as e:
55
+ print(f" βœ— Corrupt (open): {name} β€” {e}")
56
+ raw_path.unlink(missing_ok=True)
57
+ return None
58
 
59
  chunk_paths = []
60
  chunk_idx = 0
61
  current = []
62
 
63
+ try:
64
+ for batch in pf.iter_batches(batch_size=10_000, columns=["text"]):
65
+ current.append(batch)
66
+ if sum(len(b) for b in current) >= ROWS_PER_CHUNK:
67
+ chunk_name = name.replace(".parquet", f"_chunk{chunk_idx:03d}.parquet")
68
+ chunk_path = Path(RAW_DIR) / chunk_name
69
+ table = pa.Table.from_batches(current)
70
+ pq.write_table(table, chunk_path)
71
+ print(f" βœ“ {chunk_name} ({len(table):,} rows)")
72
+ chunk_paths.append(chunk_name)
73
+ chunk_idx += 1
74
+ current = []
75
+ del table
76
+ gc.collect()
77
+ except Exception as e:
78
+ print(f" βœ— Corrupt (read): {name} β€” {e}")
79
+ for c in chunk_paths:
80
+ Path(RAW_DIR, c).unlink(missing_ok=True)
81
+ raw_path.unlink(missing_ok=True)
82
+ return None
83
 
84
  if current:
85
  chunk_name = name.replace(".parquet", f"_chunk{chunk_idx:03d}.parquet")
 
94
  return chunk_paths
95
 
96
  # ── Main ──────────────────────────────────────────────────────────────────────
97
+ raw_files = [f for f in sorted(Path(RAW_DIR).glob("*.parquet")) if "_chunk" not in f.name]
98
  print(f"Found {len(raw_files)} unsplit files in /data/raw")
99
 
100
+ # ── Load or create state ──────────────────────────────────────────────────────
101
+ if os.path.exists(STATE_FILE):
102
+ with open(STATE_FILE) as f:
103
+ state = json.load(f)
104
+ print(f"Loaded existing state β€” {len(state['shards'])} shards, {len(state.get('queue', []))} queued")
105
+ else:
106
+ state = {"shards": {}, "queue": []}
107
+ print("Fresh state")
108
 
109
+ # ── Step 1: write all unsplit files to queue and delete them ──────────────────
110
+ print("\n── Step 1: queuing and deleting unsplit files ──")
111
+ for raw_path in raw_files:
112
+ hf_path = hf_path_from_name(raw_path.name)
113
+ if not hf_path:
114
+ print(f" βœ— Could not derive hf_path for {raw_path.name}, skipping")
115
  continue
116
+ if hf_path not in state.get("queue", []):
117
+ state.setdefault("queue", []).append(hf_path)
118
+ print(f" β†Ί Queued: {hf_path}")
119
+ raw_path.unlink(missing_ok=True)
120
+ print(f" πŸ—‘ Deleted: {raw_path.name}")
121
 
122
+ # save state with queue populated
 
 
 
 
 
 
 
 
 
 
 
 
 
123
  tmp = STATE_FILE + ".tmp"
124
  with open(tmp, "w") as f:
125
  json.dump(state, f, indent=2)
126
  os.replace(tmp, STATE_FILE)
127
+ print(f"\nβœ“ State saved β€” {len(state['queue'])} in queue")
128
+
129
+ # ── Step 2: download, split, register ────────────────────────────────────────
130
+ print("\n── Step 2: downloading and splitting ──")
131
+ for hf_path in list(state["queue"]):
132
+ # derive name from hf_path
133
+ m = re.search(r"CC-MAIN-(\d{4}-\d+)/\d+_(\d+)\.parquet", hf_path)
134
+ if not m:
135
+ print(f" βœ— Could not parse hf_path: {hf_path}, skipping")
136
+ continue
137
+
138
+ name = f"cc{m.group(1)}_{int(m.group(2)):06d}.parquet"
139
+ raw_path = Path(RAW_DIR) / name
140
+
141
+ # check if already split from a previous run
142
+ existing_chunks = sorted(Path(RAW_DIR).glob(f"{name.replace('.parquet', '')}_chunk*.parquet"))
143
+ if existing_chunks:
144
+ print(f" βœ“ Already split: {name}")
145
+ state["queue"].remove(hf_path)
146
+ for chunk in existing_chunks:
147
+ if chunk.name not in state["shards"]:
148
+ state["shards"][chunk.name] = {
149
+ "status": "pending",
150
+ "hf_path": hf_path,
151
+ "worker": None,
152
+ "claimed_at": None,
153
+ "error": None,
154
+ "retries": 0,
155
+ }
156
+ save_tmp = STATE_FILE + ".tmp"
157
+ with open(save_tmp, "w") as f:
158
+ json.dump(state, f, indent=2)
159
+ os.replace(save_tmp, STATE_FILE)
160
+ continue
161
+
162
+ # download
163
+ success = download_file(hf_path, raw_path)
164
+ if not success:
165
+ print(f" βœ— Download failed, leaving in queue: {hf_path}")
166
+ continue
167
+
168
+ # split
169
+ chunk_names = split_file(raw_path)
170
+ if chunk_names is None:
171
+ print(f" βœ— Still corrupt after download, leaving in queue: {hf_path}")
172
+ continue
173
+
174
+ # delete original
175
+ raw_path.unlink(missing_ok=True)
176
+ print(f" πŸ—‘ Deleted original: {name}")
177
+
178
+ # register chunks and pop from queue
179
+ state["queue"].remove(hf_path)
180
+ for chunk_name in chunk_names:
181
+ if chunk_name not in state["shards"]:
182
+ state["shards"][chunk_name] = {
183
+ "status": "pending",
184
+ "hf_path": hf_path,
185
+ "worker": None,
186
+ "claimed_at": None,
187
+ "error": None,
188
+ "retries": 0,
189
+ }
190
+
191
+ # save after every file so restarts are safe
192
+ save_tmp = STATE_FILE + ".tmp"
193
+ with open(save_tmp, "w") as f:
194
+ json.dump(state, f, indent=2)
195
+ os.replace(save_tmp, STATE_FILE)
196
+ print(f" βœ“ Registered {len(chunk_names)} chunks for {name}")
197
 
198
+ print(f"\nβœ“ All done β€” {len(state['shards'])} shards in state.json")