Neon-tech committed on
Commit
f4e35af
·
verified ·
1 Parent(s): fa43c6e

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +60 -10
app.py CHANGED
@@ -5,6 +5,9 @@ import socket
5
  import threading
6
  import re
7
  import requests
 
 
 
8
  from pathlib import Path
9
  from huggingface_hub import HfApi
10
 
@@ -16,6 +19,7 @@ STATE_FILE = "/data/state.json"
16
  WORKER_TIMEOUT = 600
17
  MAX_BUFFERED = 4
18
  CC_PREFIX = "data/CC-MAIN-2025"
 
19
 
20
  os.makedirs(RAW_DIR, exist_ok=True)
21
  api = HfApi(token=HF_TOKEN)
@@ -89,6 +93,39 @@ def reclaim_stale(state):
89
  if reclaimed:
90
  save_state(state)
91
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
92
  # ── Download loop ─────────────────────────────────────────────────────────────
93
  def download_loop(state):
94
  base_url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/"
@@ -137,22 +174,35 @@ def download_loop(state):
137
  with open(tmp_path, "wb") as f:
138
  for chunk in resp.iter_content(chunk_size=8 * 1024 * 1024):
139
  f.write(chunk)
140
- tmp_path.rename(raw_path) # ← atomic, only visible when complete
141
  except Exception as e:
142
- print(f" βœ— Failed: {e} β€” retrying in 30s")
143
- tmp_path.unlink(missing_ok=True) # ← clean up partial
144
  time.sleep(30)
145
  continue
146
 
 
 
 
 
 
 
 
 
 
 
 
147
  state["queue"].pop(0)
148
- state["shards"][name] = {
149
- "status": "pending",
150
- "hf_path": hf_path,
151
- "worker": None,
152
- "claimed_at": None,
153
- }
 
 
154
  save_state(state)
155
- print(f" βœ“ Ready: {name}")
156
  time.sleep(5)
157
 
158
  # ── Monitor ───────────────────────────────────────────────────────────────────
 
5
  import threading
6
  import re
7
  import requests
8
+ import pyarrow.parquet as pq
9
+ import pyarrow as pa
10
+ import gc
11
  from pathlib import Path
12
  from huggingface_hub import HfApi
13
 
 
19
  WORKER_TIMEOUT = 600
20
  MAX_BUFFERED = 4
21
  CC_PREFIX = "data/CC-MAIN-2025"
22
+ ROWS_PER_CHUNK = 50_000
23
 
24
  os.makedirs(RAW_DIR, exist_ok=True)
25
  api = HfApi(token=HF_TOKEN)
 
93
  if reclaimed:
94
  save_state(state)
95
 
96
+ # ── Split parquet into chunks ─────────────────────────────────────────────────
97
def _chunk_file_name(name, chunk_idx):
    """Return the chunk file name derived from shard file name *name*."""
    suffix = ".parquet"
    # Strip only a trailing ".parquet". A blanket str.replace() would leave a
    # name without the suffix unchanged (so every chunk would get the same
    # file name and overwrite the previous one) and would mangle a name that
    # contains ".parquet" more than once.
    stem = name[:-len(suffix)] if name.endswith(suffix) else name
    return f"{stem}_chunk{chunk_idx:03d}{suffix}"


def _write_chunk(batches, chunk_name):
    """Write the buffered record batches to RAW_DIR/chunk_name as one table."""
    table = pa.Table.from_batches(batches)
    pq.write_table(table, Path(RAW_DIR) / chunk_name)
    print(f" βœ“ {chunk_name} ({len(table):,} rows)")
    # Drop the table before the next chunk is built to keep peak memory down.
    del table
    gc.collect()


def split_parquet(src_path, name):
    """Split a parquet file into smaller chunk files written to RAW_DIR.

    Only the "text" column is read. The source is streamed in batches of
    10,000 rows; a chunk is written as soon as the buffered batches hold at
    least ROWS_PER_CHUNK rows, and any remaining rows form a final, smaller
    chunk.

    Args:
        src_path: Path of the parquet file to read.
        name: File name of the source shard; chunk file names are derived
            from it as "<stem>_chunkNNN.parquet".

    Returns:
        List of chunk file names (relative to RAW_DIR) in write order. The
        list is empty when the source yields no batches.

    Raises:
        Exception: Any error raised while reading or writing is re-raised
            after the chunk files produced by this call have been removed.
    """
    pf = pq.ParquetFile(src_path)
    chunk_names = []
    pending = []
    pending_rows = 0  # running total, avoids re-summing the buffer each batch

    try:
        for batch in pf.iter_batches(batch_size=10_000, columns=["text"]):
            pending.append(batch)
            pending_rows += len(batch)
            if pending_rows >= ROWS_PER_CHUNK:
                # Record the name before writing so that a failed, partially
                # written file is also covered by the cleanup below.
                chunk_names.append(_chunk_file_name(name, len(chunk_names)))
                _write_chunk(pending, chunk_names[-1])
                pending = []
                pending_rows = 0

        if pending:
            chunk_names.append(_chunk_file_name(name, len(chunk_names)))
            _write_chunk(pending, chunk_names[-1])
    except Exception:
        # Do not leave orphaned chunk files behind when the split fails.
        for written in chunk_names:
            (Path(RAW_DIR) / written).unlink(missing_ok=True)
        raise

    return chunk_names
128
+
129
  # ── Download loop ─────────────────────────────────────────────────────────────
130
  def download_loop(state):
131
  base_url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/"
 
174
  with open(tmp_path, "wb") as f:
175
  for chunk in resp.iter_content(chunk_size=8 * 1024 * 1024):
176
  f.write(chunk)
177
+ tmp_path.rename(raw_path)
178
  except Exception as e:
179
+ print(f" βœ— Download failed: {e} β€” retrying in 30s")
180
+ tmp_path.unlink(missing_ok=True)
181
  time.sleep(30)
182
  continue
183
 
184
+ print(f" Splitting: {name}")
185
+ try:
186
+ chunk_names = split_parquet(raw_path, name)
187
+ except Exception as e:
188
+ print(f" βœ— Split failed: {e} β€” retrying in 30s")
189
+ raw_path.unlink(missing_ok=True)
190
+ time.sleep(30)
191
+ continue
192
+
193
+ raw_path.unlink(missing_ok=True)
194
+
195
  state["queue"].pop(0)
196
+ for chunk_name in chunk_names:
197
+ state["shards"][chunk_name] = {
198
+ "status": "pending",
199
+ "hf_path": hf_path,
200
+ "worker": None,
201
+ "claimed_at": None,
202
+ "error": None,
203
+ }
204
  save_state(state)
205
+ print(f" βœ“ {len(chunk_names)} chunks ready from {name}")
206
  time.sleep(5)
207
 
208
  # ── Monitor ───────────────────────────────────────────────────────────────────