Neon-tech commited on
Commit
fa43c6e
Β·
verified Β·
1 Parent(s): f640e06

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +6 -5
app.py CHANGED
@@ -33,7 +33,6 @@ def serve():
33
  conn.close()
34
 
35
  # ── Friendly name ─────────────────────────────────────────────────────────────
36
- # data/CC-MAIN-2025-05/000_00042.parquet β†’ cc2025-05_000042.parquet
37
  def friendly_name(hf_path):
38
  m = re.search(r"CC-MAIN-(\d{4}-\d+)/\d+_(\d+)\.parquet", hf_path)
39
  if m:
@@ -62,7 +61,7 @@ def save_state(state):
62
  json.dump(state, f, indent=2)
63
  os.replace(tmp, STATE_FILE)
64
 
65
- # ── Discover β€” builds queue only, does NOT touch state["shards"] ──────────────
66
  def discover_queue(state):
67
  print("Discovering shards from HF...")
68
  files = api.list_repo_files(DATASET_REPO, repo_type="dataset")
@@ -90,7 +89,7 @@ def reclaim_stale(state):
90
  if reclaimed:
91
  save_state(state)
92
 
93
- # ── Download loop β€” pops queue, downloads, THEN adds to state as pending ──────
94
  def download_loop(state):
95
  base_url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/"
96
 
@@ -123,6 +122,7 @@ def download_loop(state):
123
  hf_path = state["queue"][0]
124
  name = friendly_name(hf_path)
125
  raw_path = Path(RAW_DIR) / name
 
126
  url = base_url + hf_path
127
 
128
  print(f" Downloading: {hf_path} β†’ {name}")
@@ -134,15 +134,16 @@ def download_loop(state):
134
  stream=True,
135
  )
136
  resp.raise_for_status()
137
- with open(raw_path, "wb") as f:
138
  for chunk in resp.iter_content(chunk_size=8 * 1024 * 1024):
139
  f.write(chunk)
 
140
  except Exception as e:
141
  print(f" βœ— Failed: {e} β€” retrying in 30s")
 
142
  time.sleep(30)
143
  continue
144
 
145
- # Only now: pop from queue, add to state as pending
146
  state["queue"].pop(0)
147
  state["shards"][name] = {
148
  "status": "pending",
 
33
  conn.close()
34
 
35
  # ── Friendly name ─────────────────────────────────────────────────────────────
 
36
  def friendly_name(hf_path):
37
  m = re.search(r"CC-MAIN-(\d{4}-\d+)/\d+_(\d+)\.parquet", hf_path)
38
  if m:
 
61
  json.dump(state, f, indent=2)
62
  os.replace(tmp, STATE_FILE)
63
 
64
+ # ── Discover ──────────────────────────────────────────────────────────────────
65
  def discover_queue(state):
66
  print("Discovering shards from HF...")
67
  files = api.list_repo_files(DATASET_REPO, repo_type="dataset")
 
89
  if reclaimed:
90
  save_state(state)
91
 
92
+ # ── Download loop ─────────────────────────────────────────────────────────────
93
  def download_loop(state):
94
  base_url = f"https://huggingface.co/datasets/{DATASET_REPO}/resolve/main/"
95
 
 
122
  hf_path = state["queue"][0]
123
  name = friendly_name(hf_path)
124
  raw_path = Path(RAW_DIR) / name
125
+ tmp_path = Path(RAW_DIR) / f"{name}.tmp"
126
  url = base_url + hf_path
127
 
128
  print(f" Downloading: {hf_path} β†’ {name}")
 
134
  stream=True,
135
  )
136
  resp.raise_for_status()
137
+ with open(tmp_path, "wb") as f:
138
  for chunk in resp.iter_content(chunk_size=8 * 1024 * 1024):
139
  f.write(chunk)
140
+ tmp_path.rename(raw_path) # ← atomic, only visible when complete
141
  except Exception as e:
142
  print(f" βœ— Failed: {e} β€” retrying in 30s")
143
+ tmp_path.unlink(missing_ok=True) # ← clean up partial
144
  time.sleep(30)
145
  continue
146
 
 
147
  state["queue"].pop(0)
148
  state["shards"][name] = {
149
  "status": "pending",