"""Download gzip-compressed CSV files and re-emit their rows as JSON chunks.

Each URL in ``URLS`` is downloaded into ``DATA_DIR``, its rows are read with
``csv.DictReader`` and written to ``chunk_<n>.json`` files holding at most
``CHUNK_SIZE`` rows each.  The list of chunk files is written to
``INDEX_FILE`` and progress is reported through ``STATUS_FILE``.
"""

import os
import csv
import gzip
import json
import time

import requests

URLS = [
    # Disabled sources, kept for reference:
    # "https://codeberg.org/astronexus/athyg/media/branch/main/data/athyg_v33-1.csv.gz",
    # "https://codeberg.org/astronexus/athyg/media/branch/main/data/athyg_v33-2.csv.gz",
    "https://codeberg.org/astronexus/athyg/media/branch/main/data/subsets/athyg_33_hyg_ids.csv.gz",
]

DATA_DIR = "data"
STATUS_FILE = os.path.join(DATA_DIR, "status.json")
INDEX_FILE = os.path.join(DATA_DIR, "index.json")
CHUNK_SIZE = 5000

# The helpers below write into DATA_DIR, so make sure it exists up front.
os.makedirs(DATA_DIR, exist_ok=True)


def set_status(stage, detail="", progress=None, error=None):
    """Write the current pipeline state to ``STATUS_FILE`` as a JSON object.

    Args:
        stage: Short name of the current stage (e.g. ``"downloading"``).
        detail: Free-form detail; the callers in this file pass the URL.
        progress: Optional progress value, stored as given.
        error: Optional error description, stored as given.
    """
    payload = {
        "stage": stage,
        "detail": detail,
        "progress": progress,
        "error": error,
    }
    # Write to a sibling temp file and swap it in, so a concurrent reader
    # never sees a truncated or half-written JSON document.
    tmp_path = STATUS_FILE + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(payload, f)
    os.replace(tmp_path, STATUS_FILE)


def download_with_retry(url, path, retries=6):
    """Download ``url`` to ``path``, retrying with capped exponential backoff.

    The body is streamed into ``path + ".part"`` and moved into place only
    after the download completed, so ``path`` never holds a partial file.

    Args:
        url: Address to fetch.
        path: Destination file path.
        retries: Maximum number of attempts.

    Returns:
        ``True`` on success, ``False`` once every attempt has failed.  Failures
        are reported through ``set_status`` rather than raised.
    """
    tmp_path = path + ".part"
    last_error = None

    for attempt in range(retries):
        try:
            set_status("downloading", url, progress=None)
            # The context manager releases the connection even when the
            # transfer fails part-way through.
            with requests.get(url, stream=True, timeout=60) as r:
                r.raise_for_status()
                with open(tmp_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            os.replace(tmp_path, path)  # atomic move
            return True
        except Exception as e:
            # Deliberately broad: a bad download must never crash the
            # pipeline; the file is skipped by the caller instead.
            last_error = str(e)
            set_status(
                "retrying",
                url,
                error=f"attempt {attempt+1}: {last_error}",
            )
            # No point in waiting after the final attempt.
            if attempt < retries - 1:
                time.sleep(min(2 ** attempt, 30))  # cap backoff

    # Best-effort cleanup of the partial download.
    try:
        os.remove(tmp_path)
    except OSError:
        pass

    set_status(
        "failed_file",
        url,
        error=f"giving up after {retries} retries: {last_error}",
    )
    return False


def open_chunk(chunk_id):
    """Create ``chunk_<chunk_id>.json`` in ``DATA_DIR`` and start a JSON array.

    Returns:
        The open text file; the caller must finish it with ``close_chunk``.
    """
    path = os.path.join(DATA_DIR, f"chunk_{chunk_id}.json")
    f = open(path, "w", encoding="utf-8")
    f.write("[")
    return f


def close_chunk(f):
    """Terminate the JSON array started by ``open_chunk`` and close the file."""
    f.write("]")
    f.close()


def main():
    """Run the pipeline: download every URL, write the chunks and the index.

    URLs that cannot be downloaded are skipped.  The final status keeps the
    stage ``"done"`` and lists any skipped URLs in its ``error`` field.  Any
    other failure is recorded with stage ``"failed"`` and re-raised.
    """
    set_status("starting")

    chunk_id = 0
    row_count = 0  # rows written to the chunk that is currently open
    chunk_files = []
    failed_urls = []

    out = open_chunk(chunk_id)
    try:
        for i, url in enumerate(URLS):
            gz_path = os.path.join(DATA_DIR, f"file_{i}.csv.gz")

            if not download_with_retry(url, gz_path):
                failed_urls.append(url)
                continue  # skip this dataset instead of crashing

            set_status("processing", url)

            with gzip.open(gz_path, "rt", encoding="utf-8", newline="") as f:
                for row in csv.DictReader(f):
                    if row_count >= CHUNK_SIZE:
                        # Current chunk is full: finish it and start the next.
                        close_chunk(out)
                        chunk_files.append(f"chunk_{chunk_id}.json")
                        chunk_id += 1
                        out = open_chunk(chunk_id)
                        row_count = 0

                    if row_count:
                        out.write(",")
                    json.dump(row, out, separators=(",", ":"))
                    row_count += 1
    except Exception as e:
        # Record the failure so the status does not stay at "processing".
        set_status("failed", "pipeline aborted", error=str(e))
        raise
    finally:
        # Always terminate the open chunk so it stays valid JSON.
        if not out.closed:
            close_chunk(out)

    chunk_files.append(f"chunk_{chunk_id}.json")

    with open(INDEX_FILE, "w", encoding="utf-8") as f:
        json.dump({"chunks": chunk_files}, f)

    error = None
    if failed_urls:
        error = (
            f"{len(failed_urls)} of {len(URLS)} downloads failed: "
            + ", ".join(failed_urls)
        )
    set_status("done", "complete", progress=100, error=error)


if __name__ == "__main__":
    main()