import os
import csv
import gzip
import json
import time
import requests
# Gzipped CSV sources, fetched in list order.  The first two entries are kept
# for reference but disabled; only the file under data/subsets/ is active.
URLS = [
    # "https://codeberg.org/astronexus/athyg/media/branch/main/data/athyg_v33-1.csv.gz",
    # "https://codeberg.org/astronexus/athyg/media/branch/main/data/athyg_v33-2.csv.gz",
    "https://codeberg.org/astronexus/athyg/media/branch/main/data/subsets/athyg_33_hyg_ids.csv.gz",
]

# Number of CSV rows after which the pipeline starts a new chunk file.
CHUNK_SIZE = 5000

# Every artefact (downloads, chunks, status and index files) lives here.
DATA_DIR = "data"
STATUS_FILE = os.path.join(DATA_DIR, "status.json")
INDEX_FILE = os.path.join(DATA_DIR, "index.json")

# Create the output directory at import time so later writes cannot fail on
# a missing parent directory.
os.makedirs(DATA_DIR, exist_ok=True)
def set_status(stage, detail="", progress=None, error=None):
    """Overwrite STATUS_FILE with a JSON snapshot of the pipeline state.

    Args:
        stage: Name of the current pipeline stage (e.g. "downloading").
        detail: Free-form detail for the stage; callers pass the URL being
            handled or a short message.
        progress: Optional progress value, stored as given.
        error: Optional error description, stored as given.

    The snapshot is written to a sibling temporary file and then moved over
    STATUS_FILE with os.replace, so anything reading the status file while
    the pipeline runs sees either the previous or the new snapshot, never an
    empty or half-written one.
    """
    snapshot = {
        "stage": stage,
        "detail": detail,
        "progress": progress,
        "error": error,
    }
    tmp_path = STATUS_FILE + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(snapshot, f)
    os.replace(tmp_path, STATUS_FILE)  # atomic swap on the same filesystem
def download_with_retry(url, path, retries=6):
    """Stream *url* to *path*, retrying with capped exponential backoff.

    The body is written to ``path + ".part"`` and moved into place only after
    the whole response has been received, so *path* never holds a partial
    download.

    Args:
        url: Address to fetch with an HTTP GET.
        path: Destination file path.
        retries: Maximum number of attempts.

    Returns:
        True if the file was downloaded and moved to *path*; False if every
        attempt failed.  Download errors are reported through set_status
        instead of being raised, so one bad URL does not stop the pipeline.
    """
    tmp_path = path + ".part"
    last_error = None

    for attempt in range(retries):
        try:
            set_status("downloading", url, progress=None)
            # The context manager releases the connection even when the
            # stream is abandoned half-way through.
            with requests.get(url, stream=True, timeout=60) as r:
                r.raise_for_status()
                with open(tmp_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            os.replace(tmp_path, path)  # atomic move
            return True
        except Exception as e:  # deliberate best-effort: any failure => retry
            last_error = str(e)
            set_status(
                "retrying",
                url,
                error=f"attempt {attempt+1}: {last_error}"
            )
            # Only wait when another attempt will actually follow.
            if attempt < retries - 1:
                time.sleep(min(2 ** attempt, 30))  # cap backoff

    # Do not leave a stale partial download behind.
    try:
        os.remove(tmp_path)
    except OSError:
        pass

    # Report the failure instead of raising: the pipeline must keep going.
    set_status(
        "failed_file",
        url,
        error=f"giving up after {retries} retries: {last_error}"
    )
    return False
def open_chunk(chunk_id):
    """Create ``chunk_<chunk_id>.json`` in DATA_DIR and start a JSON array.

    Returns the open text-mode file handle, positioned just after the opening
    ``[``.  The caller is responsible for writing the elements and for
    finishing the array.
    """
    chunk_name = f"chunk_{chunk_id}.json"
    handle = open(os.path.join(DATA_DIR, chunk_name), "w", encoding="utf-8")
    handle.write("[")
    return handle
def close_chunk(f):
    """Write the closing ``]`` of the JSON array to *f*, then close *f*."""
    array_terminator = "]"
    f.write(array_terminator)
    f.close()
# --- Pipeline driver -------------------------------------------------------
# Downloads every dataset in URLS, streams its CSV rows into JSON chunk files
# holding at most CHUNK_SIZE rows each, then writes an index of the chunks.

chunk_id = 0        # number of the chunk currently being written
row_count = 0       # rows already written to the current chunk
chunk_files = []    # names of completed chunks, in order
failed_urls = []    # datasets skipped because the download failed

set_status("starting")
out = open_chunk(chunk_id)
try:
    for i, url in enumerate(URLS):
        gz_path = os.path.join(DATA_DIR, f"file_{i}.csv.gz")
        if not download_with_retry(url, gz_path):
            # Skip this dataset instead of crashing, but remember it so the
            # final status can report it.
            failed_urls.append(url)
            continue

        set_status("processing", url)
        with gzip.open(gz_path, "rt", encoding="utf-8", newline="") as f:
            for row in csv.DictReader(f):
                # Rotate lazily, right before writing the row that would
                # overflow the chunk, so no empty trailing chunk is created.
                if row_count >= CHUNK_SIZE:
                    close_chunk(out)
                    chunk_files.append(f"chunk_{chunk_id}.json")
                    chunk_id += 1
                    out = open_chunk(chunk_id)
                    row_count = 0

                # Every element except the first one in a chunk is preceded
                # by a comma.
                if row_count:
                    out.write(",")
                json.dump(row, out, separators=(",", ":"))
                row_count += 1
finally:
    # Always terminate and close the current chunk, even if processing
    # raised, so the file on disk is a complete JSON array.  The guard covers
    # a failure while opening the next chunk, when `out` is already closed.
    if not out.closed:
        close_chunk(out)

chunk_files.append(f"chunk_{chunk_id}.json")

with open(INDEX_FILE, "w", encoding="utf-8") as f:
    json.dump({"chunks": chunk_files}, f)

# Keep the "done" stage, but do not let it hide datasets that were skipped.
failure_note = None
if failed_urls:
    failure_note = "download failed for: " + ", ".join(failed_urls)
set_status("done", "complete", progress=100, error=failure_note)