Neon-tech commited on
Commit
f640e06
Β·
verified Β·
1 Parent(s): ab1dbd4

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +79 -80
app.py CHANGED
@@ -3,23 +3,21 @@ import json
3
  import time
4
  import socket
5
  import threading
6
- import io
7
  import requests
8
  from pathlib import Path
9
- from huggingface_hub import HfApi, list_repo_tree
10
 
11
  # ── Config ───────────────────────────────────────────────────────────────────
12
- HF_TOKEN = os.environ.get("HF_TOKEN")
13
- DATASET_REPO = "HuggingFaceFW/fineweb-edu"
14
- RAW_DIR = "/data/raw"
15
- STATE_FILE = "/data/state.json"
16
- WORKER_TIMEOUT = 600 # 10 min β€” reclaim stale claimed shards
17
-
18
- # CC-MAIN-2025 prefix filter
19
- CC_PREFIX = "data/CC-MAIN-2025"
20
 
21
  os.makedirs(RAW_DIR, exist_ok=True)
22
-
23
  api = HfApi(token=HF_TOKEN)
24
 
25
  # ── Keep-alive ────────────────────────────────────────────────────────────────
@@ -34,18 +32,27 @@ def serve():
34
  conn.send(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK")
35
  conn.close()
36
 
 
 
 
 
 
 
 
 
37
  # ── State ─────────────────────────────────────────────────────────────────────
38
  def load_state():
39
  if os.path.exists(STATE_FILE):
40
  with open(STATE_FILE) as f:
41
  state = json.load(f)
42
- total = len(state["shards"])
43
- done = sum(1 for s in state["shards"].values() if s["status"] == "done")
44
- claimed = sum(1 for s in state["shards"].values() if s["status"] == "claimed")
45
- pending = sum(1 for s in state["shards"].values() if s["status"] == "pending")
46
- print(f"Resuming β€” {done} done / {claimed} claimed / {pending} pending / {total} total")
 
47
  else:
48
- state = {"shards": {}}
49
  print("Starting fresh")
50
  return state
51
 
@@ -55,88 +62,70 @@ def save_state(state):
55
  json.dump(state, f, indent=2)
56
  os.replace(tmp, STATE_FILE)
57
 
58
- # ── Discover all CC-MAIN-2025 parquet files ───────────────────────────────────
59
- def discover_shards(state):
60
  print("Discovering shards from HF...")
61
  files = api.list_repo_files(DATASET_REPO, repo_type="dataset")
 
62
  new_count = 0
63
  for f in files:
64
- if f.startswith(CC_PREFIX) and f.endswith(".parquet"):
65
- if f not in state["shards"]:
66
- state["shards"][f] = {
67
- "status": "pending",
68
- "worker": None,
69
- "claimed_at": None,
70
- }
71
- new_count += 1
72
- print(f"βœ“ {new_count} new shards discovered | {len(state['shards'])} total")
73
  save_state(state)
74
 
75
  # ── Reclaim timed-out shards ──────────────────────────────────────────────────
76
  def reclaim_stale(state):
77
  now = time.time()
78
  reclaimed = 0
79
- for shard, info in state["shards"].items():
80
  if info["status"] == "claimed" and info["claimed_at"]:
81
  if now - info["claimed_at"] > WORKER_TIMEOUT:
82
- print(f" ⚠ Reclaiming stale shard: {shard} (worker: {info['worker']})")
83
  info["status"] = "pending"
84
  info["worker"] = None
85
  info["claimed_at"] = None
86
  reclaimed += 1
87
  if reclaimed:
88
  save_state(state)
89
- return reclaimed
90
 
91
- # ── Download pending shards to /data/raw ─────────────────────────────────────
92
  def download_loop(state):
93
  base_url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/"
94
 
95
  while True:
96
- # Reclaim stale first
97
- reclaim_stale(state)
98
-
99
- # Reload state to pick up worker updates
100
- if os.path.exists(STATE_FILE):
101
  with open(STATE_FILE) as f:
102
- state["shards"] = json.load(f)["shards"]
 
 
 
 
103
 
104
- # Count how many raw files already sitting in /data/raw (not yet claimed)
105
- raw_files = list(Path(RAW_DIR).glob("*.parquet"))
106
- pending_raw = len(raw_files)
107
 
108
- # Keep at most 4 shards pre-downloaded to avoid filling disk
109
- if pending_raw >= 4:
110
- print(f" Buffer full ({pending_raw} shards waiting) β€” sleeping...")
111
- time.sleep(60)
112
  continue
113
 
114
- # Find next pending shard to download
115
- to_download = None
116
- for shard, info in state["shards"].items():
117
- if info["status"] == "pending":
118
- raw_name = shard.replace("/", "__")
119
- raw_path = Path(RAW_DIR) / raw_name
120
- if not raw_path.exists():
121
- to_download = shard
122
- break
123
-
124
- if not to_download:
125
- done = sum(1 for s in state["shards"].values() if s["status"] == "done")
126
  total = len(state["shards"])
127
- if done == total:
128
  print("βœ“ All shards complete!")
129
  break
130
- print(" Nothing to download right now β€” sleeping...")
131
  time.sleep(60)
132
  continue
133
 
134
- # Download it
135
- url = base_url + to_download
136
- raw_name = to_download.replace("/", "__")
137
- raw_path = Path(RAW_DIR) / raw_name
138
 
139
- print(f" Downloading: {to_download}")
140
  try:
141
  resp = requests.get(
142
  url,
@@ -148,37 +137,47 @@ def download_loop(state):
148
  with open(raw_path, "wb") as f:
149
  for chunk in resp.iter_content(chunk_size=8 * 1024 * 1024):
150
  f.write(chunk)
151
- print(f" βœ“ Downloaded: {raw_name}")
152
  except Exception as e:
153
- print(f" βœ— Download failed: {e}")
154
  time.sleep(30)
155
  continue
156
 
 
 
 
 
 
 
 
 
 
 
157
  time.sleep(5)
158
 
159
- # ── Monitor loop β€” prints progress ───────────────────────────────────────────
160
- def monitor_loop(state):
161
  while True:
162
  time.sleep(120)
163
- if os.path.exists(STATE_FILE):
164
  with open(STATE_FILE) as f:
165
- s = json.load(f)["shards"]
166
- done = sum(1 for v in s.values() if v["status"] == "done")
167
- claimed = sum(1 for v in s.values() if v["status"] == "claimed")
168
- pending = sum(1 for v in s.values() if v["status"] == "pending")
169
- total = len(s)
 
 
170
  pct = (done / total * 100) if total else 0
171
- print(f"[MONITOR] {done}/{total} done ({pct:.1f}%) | {claimed} active | {pending} pending")
 
 
172
 
173
  # ── Entry point ───────────────────────────────────────────────────────────────
174
  if __name__ == "__main__":
175
  threading.Thread(target=serve, daemon=True).start()
176
-
177
  state = load_state()
178
- discover_shards(state)
179
-
180
- threading.Thread(target=monitor_loop, args=(state,), daemon=True).start()
181
  threading.Thread(target=download_loop, args=(state,), daemon=True).start()
182
-
183
  while True:
184
  time.sleep(60)
 
3
  import time
4
  import socket
5
  import threading
6
+ import re
7
  import requests
8
  from pathlib import Path
9
+ from huggingface_hub import HfApi
10
 
11
  # ── Config ───────────────────────────────────────────────────────────────────
12
+ HF_TOKEN = os.environ.get("HF_TOKEN")
13
+ DATASET_REPO = "HuggingFaceFW/fineweb-edu"
14
+ RAW_DIR = "/data/raw"
15
+ STATE_FILE = "/data/state.json"
16
+ WORKER_TIMEOUT = 600
17
+ MAX_BUFFERED = 4
18
+ CC_PREFIX = "data/CC-MAIN-2025"
 
19
 
20
  os.makedirs(RAW_DIR, exist_ok=True)
 
21
  api = HfApi(token=HF_TOKEN)
22
 
23
  # ── Keep-alive ────────────────────────────────────────────────────────────────
 
32
  conn.send(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK")
33
  conn.close()
34
 
35
+ # ── Friendly name ─────────────────────────────────────────────────────────────
36
+ # data/CC-MAIN-2025-05/000_00042.parquet β†’ cc2025-05_000042.parquet
37
+ def friendly_name(hf_path):
38
+ m = re.search(r"CC-MAIN-(\d{4}-\d+)/\d+_(\d+)\.parquet", hf_path)
39
+ if m:
40
+ return f"cc{m.group(1)}_{int(m.group(2)):06d}.parquet"
41
+ return hf_path.replace("/", "__")
42
+
43
  # ── State ─────────────────────────────────────────────────────────────────────
44
  def load_state():
45
  if os.path.exists(STATE_FILE):
46
  with open(STATE_FILE) as f:
47
  state = json.load(f)
48
+ shards = state["shards"]
49
+ queue = state.get("queue", [])
50
+ done = sum(1 for v in shards.values() if v["status"] == "done")
51
+ claimed = sum(1 for v in shards.values() if v["status"] == "claimed")
52
+ pending = sum(1 for v in shards.values() if v["status"] == "pending")
53
+ print(f"Resuming β€” {done} done / {claimed} claimed / {pending} buffered / {len(queue)} queued")
54
  else:
55
+ state = {"shards": {}, "queue": []}
56
  print("Starting fresh")
57
  return state
58
 
 
62
  json.dump(state, f, indent=2)
63
  os.replace(tmp, STATE_FILE)
64
 
65
+ # ── Discover β€” builds queue only, does NOT touch state["shards"] ──────────────
66
+ def discover_queue(state):
67
  print("Discovering shards from HF...")
68
  files = api.list_repo_files(DATASET_REPO, repo_type="dataset")
69
+ known = {v["hf_path"] for v in state["shards"].values()} | set(state.get("queue", []))
70
  new_count = 0
71
  for f in files:
72
+ if f.startswith(CC_PREFIX) and f.endswith(".parquet") and f not in known:
73
+ state["queue"].append(f)
74
+ new_count += 1
75
+ print(f"βœ“ {new_count} queued | {len(state['queue'])} in queue | {len(state['shards'])} in state")
 
 
 
 
 
76
  save_state(state)
77
 
78
  # ── Reclaim timed-out shards ──────────────────────────────────────────────────
79
  def reclaim_stale(state):
80
  now = time.time()
81
  reclaimed = 0
82
+ for name, info in state["shards"].items():
83
  if info["status"] == "claimed" and info["claimed_at"]:
84
  if now - info["claimed_at"] > WORKER_TIMEOUT:
85
+ print(f" ⚠ Reclaiming: {name} (worker: {info['worker']})")
86
  info["status"] = "pending"
87
  info["worker"] = None
88
  info["claimed_at"] = None
89
  reclaimed += 1
90
  if reclaimed:
91
  save_state(state)
 
92
 
93
+ # ── Download loop β€” pops queue, downloads, THEN adds to state as pending ──────
94
  def download_loop(state):
95
  base_url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/"
96
 
97
  while True:
98
+ try:
 
 
 
 
99
  with open(STATE_FILE) as f:
100
+ fresh = json.load(f)
101
+ state["shards"] = fresh["shards"]
102
+ state["queue"] = fresh.get("queue", [])
103
+ except Exception:
104
+ pass
105
 
106
+ reclaim_stale(state)
 
 
107
 
108
+ buffered = sum(1 for v in state["shards"].values() if v["status"] == "pending")
109
+ if buffered >= MAX_BUFFERED:
110
+ time.sleep(30)
 
111
  continue
112
 
113
+ if not state["queue"]:
114
+ done = sum(1 for v in state["shards"].values() if v["status"] == "done")
 
 
 
 
 
 
 
 
 
 
115
  total = len(state["shards"])
116
+ if done == total and total > 0:
117
  print("βœ“ All shards complete!")
118
  break
119
+ print(" Queue empty β€” sleeping...")
120
  time.sleep(60)
121
  continue
122
 
123
+ hf_path = state["queue"][0]
124
+ name = friendly_name(hf_path)
125
+ raw_path = Path(RAW_DIR) / name
126
+ url = base_url + hf_path
127
 
128
+ print(f" Downloading: {hf_path} β†’ {name}")
129
  try:
130
  resp = requests.get(
131
  url,
 
137
  with open(raw_path, "wb") as f:
138
  for chunk in resp.iter_content(chunk_size=8 * 1024 * 1024):
139
  f.write(chunk)
 
140
  except Exception as e:
141
+ print(f" βœ— Failed: {e} β€” retrying in 30s")
142
  time.sleep(30)
143
  continue
144
 
145
+ # Only now: pop from queue, add to state as pending
146
+ state["queue"].pop(0)
147
+ state["shards"][name] = {
148
+ "status": "pending",
149
+ "hf_path": hf_path,
150
+ "worker": None,
151
+ "claimed_at": None,
152
+ }
153
+ save_state(state)
154
+ print(f" βœ“ Ready: {name}")
155
  time.sleep(5)
156
 
157
+ # ── Monitor ───────────────────────────────────────────────────────────────────
158
+ def monitor_loop():
159
  while True:
160
  time.sleep(120)
161
+ try:
162
  with open(STATE_FILE) as f:
163
+ s = json.load(f)
164
+ shards = s["shards"]
165
+ queue = s.get("queue", [])
166
+ done = sum(1 for v in shards.values() if v["status"] == "done")
167
+ claimed = sum(1 for v in shards.values() if v["status"] == "claimed")
168
+ pending = sum(1 for v in shards.values() if v["status"] == "pending")
169
+ total = len(shards) + len(queue)
170
  pct = (done / total * 100) if total else 0
171
+ print(f"[MONITOR] {done}/{total} ({pct:.1f}%) | {claimed} active | {pending} buffered | {len(queue)} queued")
172
+ except Exception:
173
+ pass
174
 
175
  # ── Entry point ───────────────────────────────────────────────────────────────
176
  if __name__ == "__main__":
177
  threading.Thread(target=serve, daemon=True).start()
 
178
  state = load_state()
179
+ discover_queue(state)
180
+ threading.Thread(target=monitor_loop, daemon=True).start()
 
181
  threading.Thread(target=download_loop, args=(state,), daemon=True).start()
 
182
  while True:
183
  time.sleep(60)