Neon-tech commited on
Commit
af01b43
·
verified ·
1 Parent(s): e80aacb

Delete app.py

Browse files
Files changed (1) hide show
  1. app.py +0 -198
app.py DELETED
@@ -1,198 +0,0 @@
1
- import os
2
- import json
3
- import re
4
- import gc
5
- import requests
6
- from pathlib import Path
7
- import pyarrow.parquet as pq
8
- import pyarrow as pa
9
-
10
- # ── Config ───────────────────────────────────────────────────────────────────
11
- HF_TOKEN = os.environ.get("HF_TOKEN")
12
- DATASET_REPO = "HuggingFaceFW/fineweb-edu"
13
- RAW_DIR = "/data/raw"
14
- STATE_FILE = "/data/state.json"
15
- ROWS_PER_CHUNK = 50_000
16
-
17
- def hf_path_from_name(name):
18
- m = re.match(r"cc(\d{4})-(\d+)_(\d+)\.parquet", name)
19
- if m:
20
- year_week = f"{m.group(1)}-{m.group(2)}"
21
- idx = int(m.group(3))
22
- return f"data/CC-MAIN-{year_week}/{idx//1000:03d}_{idx:05d}.parquet"
23
- return None
24
-
25
- def download_file(hf_path, dest_path):
26
- url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/{hf_path}"
27
- tmp_path = dest_path.with_suffix(".tmp")
28
- print(f" Downloading: {hf_path}")
29
- try:
30
- resp = requests.get(
31
- url,
32
- headers={"Authorization": f"Bearer {HF_TOKEN}"},
33
- timeout=300,
34
- stream=True,
35
- )
36
- resp.raise_for_status()
37
- with open(tmp_path, "wb") as f:
38
- for chunk in resp.iter_content(chunk_size=8 * 1024 * 1024):
39
- f.write(chunk)
40
- tmp_path.rename(dest_path)
41
- print(f" ✓ Downloaded: {dest_path.name}")
42
- return True
43
- except Exception as e:
44
- print(f" ✗ Download failed: {e}")
45
- tmp_path.unlink(missing_ok=True)
46
- return False
47
-
48
- def split_file(raw_path):
49
- name = raw_path.name
50
- print(f" Splitting: {name}")
51
-
52
- try:
53
- pf = pq.ParquetFile(raw_path)
54
- except Exception as e:
55
- print(f" ✗ Corrupt (open): {name} — {e}")
56
- raw_path.unlink(missing_ok=True)
57
- return None
58
-
59
- chunk_paths = []
60
- chunk_idx = 0
61
- current = []
62
-
63
- try:
64
- for batch in pf.iter_batches(batch_size=10_000, columns=["text"]):
65
- current.append(batch)
66
- if sum(len(b) for b in current) >= ROWS_PER_CHUNK:
67
- chunk_name = name.replace(".parquet", f"_chunk{chunk_idx:03d}.parquet")
68
- chunk_path = Path(RAW_DIR) / chunk_name
69
- table = pa.Table.from_batches(current)
70
- pq.write_table(table, chunk_path)
71
- print(f" ✓ {chunk_name} ({len(table):,} rows)")
72
- chunk_paths.append(chunk_name)
73
- chunk_idx += 1
74
- current = []
75
- del table
76
- gc.collect()
77
- except Exception as e:
78
- print(f" ✗ Corrupt (read): {name} — {e}")
79
- for c in chunk_paths:
80
- Path(RAW_DIR, c).unlink(missing_ok=True)
81
- raw_path.unlink(missing_ok=True)
82
- return None
83
-
84
- if current:
85
- chunk_name = name.replace(".parquet", f"_chunk{chunk_idx:03d}.parquet")
86
- chunk_path = Path(RAW_DIR) / chunk_name
87
- table = pa.Table.from_batches(current)
88
- pq.write_table(table, chunk_path)
89
- print(f" ✓ {chunk_name} ({len(table):,} rows)")
90
- chunk_paths.append(chunk_name)
91
- del table
92
- gc.collect()
93
-
94
- return chunk_paths
95
-
96
- # ── Main ──────────────────────────────────────────────────────────────────────
97
- raw_files = [f for f in sorted(Path(RAW_DIR).glob("*.parquet")) if "_chunk" not in f.name]
98
- print(f"Found {len(raw_files)} unsplit files in /data/raw")
99
-
100
- # ── Load or create state ──────────────────────────────────────────────────────
101
- if os.path.exists(STATE_FILE):
102
- with open(STATE_FILE) as f:
103
- state = json.load(f)
104
- print(f"Loaded existing state — {len(state['shards'])} shards, {len(state.get('queue', []))} queued")
105
- else:
106
- state = {"shards": {}, "queue": []}
107
- print("Fresh state")
108
-
109
- # ── Step 1: write all unsplit files to queue and delete them ──────────────────
110
- print("\n── Step 1: queuing and deleting unsplit files ──")
111
- for raw_path in raw_files:
112
- hf_path = hf_path_from_name(raw_path.name)
113
- if not hf_path:
114
- print(f" ✗ Could not derive hf_path for {raw_path.name}, skipping")
115
- continue
116
- if hf_path not in state.get("queue", []):
117
- state.setdefault("queue", []).append(hf_path)
118
- print(f" ↺ Queued: {hf_path}")
119
- raw_path.unlink(missing_ok=True)
120
- print(f" 🗑 Deleted: {raw_path.name}")
121
-
122
- # save state with queue populated
123
- tmp = STATE_FILE + ".tmp"
124
- with open(tmp, "w") as f:
125
- json.dump(state, f, indent=2)
126
- os.replace(tmp, STATE_FILE)
127
- print(f"\n✓ State saved — {len(state['queue'])} in queue")
128
-
129
- # ── Step 2: download, split, register ───────────���────────────────────────────
130
- print("\n── Step 2: downloading and splitting ──")
131
- for hf_path in list(state["queue"]):
132
- # derive name from hf_path
133
- m = re.search(r"CC-MAIN-(\d{4}-\d+)/\d+_(\d+)\.parquet", hf_path)
134
- if not m:
135
- print(f" ✗ Could not parse hf_path: {hf_path}, skipping")
136
- continue
137
-
138
- name = f"cc{m.group(1)}_{int(m.group(2)):06d}.parquet"
139
- raw_path = Path(RAW_DIR) / name
140
-
141
- # check if already split from a previous run
142
- existing_chunks = sorted(Path(RAW_DIR).glob(f"{name.replace('.parquet', '')}_chunk*.parquet"))
143
- if existing_chunks:
144
- print(f" ✓ Already split: {name}")
145
- state["queue"].remove(hf_path)
146
- for chunk in existing_chunks:
147
- if chunk.name not in state["shards"]:
148
- state["shards"][chunk.name] = {
149
- "status": "pending",
150
- "hf_path": hf_path,
151
- "worker": None,
152
- "claimed_at": None,
153
- "error": None,
154
- "retries": 0,
155
- }
156
- save_tmp = STATE_FILE + ".tmp"
157
- with open(save_tmp, "w") as f:
158
- json.dump(state, f, indent=2)
159
- os.replace(save_tmp, STATE_FILE)
160
- continue
161
-
162
- # download
163
- success = download_file(hf_path, raw_path)
164
- if not success:
165
- print(f" ✗ Download failed, leaving in queue: {hf_path}")
166
- continue
167
-
168
- # split
169
- chunk_names = split_file(raw_path)
170
- if chunk_names is None:
171
- print(f" ✗ Still corrupt after download, leaving in queue: {hf_path}")
172
- continue
173
-
174
- # delete original
175
- raw_path.unlink(missing_ok=True)
176
- print(f" 🗑 Deleted original: {name}")
177
-
178
- # register chunks and pop from queue
179
- state["queue"].remove(hf_path)
180
- for chunk_name in chunk_names:
181
- if chunk_name not in state["shards"]:
182
- state["shards"][chunk_name] = {
183
- "status": "pending",
184
- "hf_path": hf_path,
185
- "worker": None,
186
- "claimed_at": None,
187
- "error": None,
188
- "retries": 0,
189
- }
190
-
191
- # save after every file so restarts are safe
192
- save_tmp = STATE_FILE + ".tmp"
193
- with open(save_tmp, "w") as f:
194
- json.dump(state, f, indent=2)
195
- os.replace(save_tmp, STATE_FILE)
196
- print(f" ✓ Registered {len(chunk_names)} chunks for {name}")
197
-
198
- print(f"\n✓ All done — {len(state['shards'])} shards in state.json")