| import os |
| import csv |
| import gzip |
| import json |
| import time |
| import requests |
|
|
# Gzipped CSV sources to download and convert, processed in list order.
URLS = [
    "https://codeberg.org/astronexus/athyg/media/branch/main/data/subsets/athyg_33_hyg_ids.csv.gz",
]

# Output directory plus the two bookkeeping files kept inside it.
DATA_DIR = "data"
STATUS_FILE, INDEX_FILE = (
    os.path.join(DATA_DIR, name) for name in ("status.json", "index.json")
)

# Maximum number of CSV rows written to a single chunk file.
CHUNK_SIZE = 5000

# The output directory must exist before any status or chunk file is written.
os.makedirs(DATA_DIR, exist_ok=True)
|
|
def set_status(stage, detail="", progress=None, error=None):
    """Overwrite the status file with the given pipeline state.

    The arguments are stored as one JSON object under the keys
    "stage", "detail", "progress" and "error". Any previous content of
    STATUS_FILE is replaced.
    """
    payload = {
        "stage": stage,
        "detail": detail,
        "progress": progress,
        "error": error,
    }
    with open(STATUS_FILE, "w") as status_file:
        json.dump(payload, status_file)
|
|
def download_with_retry(url, path, retries=6):
    """Download *url* to *path*, retrying with exponential backoff.

    The response body is streamed into ``path + ".part"`` and moved onto
    *path* only after the whole body has been written, so *path* never
    holds a truncated download. Progress and failures are reported through
    set_status().

    Args:
        url: Address fetched with an HTTP GET.
        path: Destination file path.
        retries: Maximum number of attempts.

    Returns:
        True once the file has been downloaded, False if every attempt
        failed (the status file then carries the last error message).
    """
    tmp_path = path + ".part"
    last_error = None

    for attempt in range(retries):
        try:
            set_status("downloading", url, progress=None)

            # The context manager releases the streamed connection even
            # when the transfer is interrupted half-way.
            with requests.get(url, stream=True, timeout=60) as r:
                r.raise_for_status()

                with open(tmp_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)

            os.replace(tmp_path, path)
            return True

        except Exception as e:
            # Deliberately broad: any failure (network, HTTP status, disk)
            # is recorded in the status file and the download is retried.
            last_error = str(e)
            set_status(
                "retrying",
                url,
                error=f"attempt {attempt+1}: {last_error}"
            )

            # Do not leave a truncated partial file behind.
            try:
                os.remove(tmp_path)
            except OSError:
                pass

            # Back off before the next attempt; there is nothing to wait
            # for once the final attempt has failed.
            if attempt < retries - 1:
                time.sleep(min(2 ** attempt, 30))

    set_status(
        "failed_file",
        url,
        error=f"giving up after {retries} retries: {last_error}"
    )

    return False
|
|
def open_chunk(chunk_id):
    """Create the output file for chunk *chunk_id* and begin its JSON array.

    The file is placed in DATA_DIR and named ``chunk_<chunk_id>.json``. Only
    the opening bracket is written here; the open text handle is returned
    so the caller can append elements and close the array later.
    """
    filename = f"chunk_{chunk_id}.json"
    handle = open(os.path.join(DATA_DIR, filename), "w", encoding="utf-8")
    handle.write("[")
    return handle
|
|
|
|
def close_chunk(f):
    """Terminate the JSON array in chunk file *f* and close the handle.

    Args:
        f: Open, writable text file object holding an unterminated array.

    Raises:
        Whatever ``f.write`` raises; the handle is closed regardless, so a
        failed write does not leak the file descriptor.
    """
    try:
        f.write("]")
    finally:
        f.close()
|
|
|
|
# --- Pipeline: download every source, split its rows into JSON chunks ---
#
# Rows from all sources are appended to one running sequence of chunk files
# (chunk_0.json, chunk_1.json, ...), each holding at most CHUNK_SIZE rows.
# The list of chunk file names is written to INDEX_FILE at the end.

chunk_id = 0
row_count = 0       # rows written to the currently open chunk
first = True        # True until the open chunk has received its first row
chunk_files = []
failed_urls = []    # sources that could not be downloaded

set_status("starting")

out = open_chunk(chunk_id)

try:
    for i, url in enumerate(URLS):
        gz_path = os.path.join(DATA_DIR, f"file_{i}.csv.gz")

        ok = download_with_retry(url, gz_path)

        if not ok:
            # Keep going with the remaining sources, but remember the
            # failure so the final status can report it.
            failed_urls.append(url)
            continue

        set_status("processing", url)

        with gzip.open(gz_path, "rt", encoding="utf-8", newline="") as f:
            reader = csv.DictReader(f)

            for row in reader:
                # Rotate to a new chunk file once the current one is full.
                if row_count >= CHUNK_SIZE:
                    close_chunk(out)
                    chunk_files.append(f"chunk_{chunk_id}.json")

                    chunk_id += 1
                    out = open_chunk(chunk_id)
                    row_count = 0
                    first = True

                # Array elements are comma-separated; no comma before the
                # first element of a chunk.
                if not first:
                    out.write(",")

                json.dump(row, out, separators=(",", ":"))
                first = False
                row_count += 1
finally:
    # Close the array and the handle on success and on error alike, so a
    # failure mid-run neither leaks the file nor leaves malformed JSON.
    # The handle is already closed if rotation failed between closing one
    # chunk and opening the next.
    if not out.closed:
        close_chunk(out)

chunk_files.append(f"chunk_{chunk_id}.json")

with open(INDEX_FILE, "w", encoding="utf-8") as f:
    json.dump({"chunks": chunk_files}, f)

# Surface skipped sources instead of letting "done" hide earlier failures.
if failed_urls:
    set_status(
        "done",
        "complete",
        progress=100,
        error="failed to download: " + ", ".join(failed_urls),
    )
else:
    set_status("done", "complete", progress=100)