# athyg / build_data.py
# Last commit: 1fa428d "Update build_data.py" by 1TSnakers (verified)
import os
import csv
import gzip
import json
import time
import requests
# Output directory plus the bookkeeping files written into it.
DATA_DIR = "data"
STATUS_FILE = os.path.join(DATA_DIR, "status.json")
INDEX_FILE = os.path.join(DATA_DIR, "index.json")

# Maximum number of CSV rows stored in each chunk_<n>.json file.
CHUNK_SIZE = 5_000

# Gzipped CSV archives to fetch. The two athyg_v33-* archives are disabled;
# only the hyg_ids subset is downloaded.
URLS = [
    # "https://codeberg.org/astronexus/athyg/media/branch/main/data/athyg_v33-1.csv.gz",
    # "https://codeberg.org/astronexus/athyg/media/branch/main/data/athyg_v33-2.csv.gz",
    "https://codeberg.org/astronexus/athyg/media/branch/main/data/subsets/athyg_33_hyg_ids.csv.gz",
]

# Created eagerly at import time: every later write assumes DATA_DIR exists.
os.makedirs(DATA_DIR, exist_ok=True)
def set_status(stage, detail="", progress=None, error=None, *, path=None):
    """Record the pipeline's current state as a small JSON document.

    Args:
        stage: Short state label (this script uses e.g. "starting",
            "downloading", "retrying", "failed_file", "processing", "done").
        detail: Free-form context, typically the URL being handled.
        progress: Percentage complete, or None when unknown.
        error: Error message, or None.
        path: Destination file; defaults to STATUS_FILE.

    The document is first written to ``path + ".tmp"`` and then moved onto
    *path* with os.replace. A concurrent reader therefore sees either the
    previous status or the new one, never a truncated file (this is atomic on
    POSIX filesystems).
    """
    if path is None:
        path = STATUS_FILE
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump({
            "stage": stage,
            "detail": detail,
            "progress": progress,
            "error": error,
        }, f)
    # Plain open(path, "w") would truncate the live file before rewriting it.
    os.replace(tmp_path, path)
def download_with_retry(url, path, retries=6):
    """Download *url* to *path*, retrying failures with capped exponential backoff.

    The response body is streamed into ``path + ".part"``. That file is only
    renamed onto *path* after the transfer completes, so *path* never holds a
    partial download.

    Args:
        url: HTTP(S) URL to fetch.
        path: Final destination file.
        retries: Total number of attempts (not additional retries).

    Returns:
        True on success. False once every attempt has failed; failures are
        reported through set_status instead of raised, so one bad file does
        not abort the whole build.
    """
    tmp_path = path + ".part"
    last_error = None
    for attempt in range(retries):
        try:
            set_status("downloading", url, progress=None)
            # With stream=True the connection stays checked out until the body
            # is fully consumed or the response is closed; the ``with`` closes
            # it even if the transfer dies midway.
            with requests.get(url, stream=True, timeout=60) as r:
                r.raise_for_status()
                with open(tmp_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            os.replace(tmp_path, path)  # atomic move
            return True
        except Exception as e:  # deliberate best effort: never crash the pipeline
            last_error = str(e)
            set_status(
                "retrying",
                url,
                error=f"attempt {attempt+1}: {last_error}"
            )
            # Back off only if another attempt follows; sleeping after the
            # final failure would just delay giving up by up to 30 s.
            if attempt + 1 < retries:
                time.sleep(min(2 ** attempt, 30))  # cap backoff

    # Remove whatever partial file the last failed attempt left behind.
    try:
        os.remove(tmp_path)
    except OSError:
        pass  # nothing was written, or it is already gone

    set_status(
        "failed_file",
        url,
        error=f"giving up after {retries} retries: {last_error}"
    )
    return False
def open_chunk(chunk_id):
    """Start a new chunk file and return its open text handle.

    Creates (or truncates) ``chunk_<chunk_id>.json`` inside DATA_DIR and
    writes the opening "[" of a JSON array. The caller appends
    comma-separated elements and finishes the file with close_chunk.
    """
    chunk_name = f"chunk_{chunk_id}.json"
    handle = open(os.path.join(DATA_DIR, chunk_name), mode="w", encoding="utf-8")
    handle.write("[")
    return handle
def close_chunk(f):
    """Terminate the JSON array in chunk handle *f* and close it.

    The handle is closed even if writing the closing "]" fails (e.g. disk
    full). This keeps the file descriptor from leaking; the write error
    still propagates.
    """
    try:
        f.write("]")
    finally:
        f.close()
# Streaming state: each CSV row is written straight to the currently open
# chunk file, and a new chunk is started every CHUNK_SIZE rows.
chunk_id = 0       # index of the chunk currently being written
row_count = 0      # rows already written to the current chunk
chunk_files = []   # finished chunk file names, in order, for index.json
failed_urls = []   # datasets skipped because their download never succeeded

set_status("starting")
out = open_chunk(chunk_id)
try:
    for i, url in enumerate(URLS):
        gz_path = os.path.join(DATA_DIR, f"file_{i}.csv.gz")
        if not download_with_retry(url, gz_path):
            failed_urls.append(url)  # remembered so the final status reports it
            continue  # skip this dataset instead of crashing

        set_status("processing", url)
        with gzip.open(gz_path, "rt", encoding="utf-8", newline="") as f:
            for row in csv.DictReader(f):
                if row_count >= CHUNK_SIZE:
                    # Current chunk is full: finish it and start the next one.
                    close_chunk(out)
                    chunk_files.append(f"chunk_{chunk_id}.json")
                    chunk_id += 1
                    out = open_chunk(chunk_id)
                    row_count = 0
                if row_count:
                    out.write(",")  # separator before every element but the first
                json.dump(row, out, separators=(",", ":"))
                row_count += 1

    close_chunk(out)
    chunk_files.append(f"chunk_{chunk_id}.json")

    with open(INDEX_FILE, "w", encoding="utf-8") as f:
        json.dump({"chunks": chunk_files}, f)
except Exception as exc:
    # Release the chunk handle and replace the stale "processing" status with
    # a visible error, then let the failure propagate.
    if not out.closed:
        out.close()
    set_status("error", type(exc).__name__, error=str(exc))
    raise

# Failed downloads were skipped above. Report them here; otherwise "done"
# would silently overwrite the last "failed_file" status.
if failed_urls:
    failure_summary = f"{len(failed_urls)} download(s) failed: " + ", ".join(failed_urls)
else:
    failure_summary = None
set_status("done", "complete", progress=100, error=failure_summary)