{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "f1b45232",
   "metadata": {},
   "source": [
    "# Build Subset — PHASE 2 (Google Colab)\n",
    "\n",
    "Chạy **sau** `build_subset_local.ipynb`. Input = `subset_bundle.zip` đã upload lên Drive.\n",
    "\n",
    "**Pipeline:**\n",
    "1. Mount Drive + install deps\n",
    "2. Config + credentials\n",
    "3. Giải nén bundle + load manifest\n",
    "4. Định nghĩa `dl()` + log checkpoint\n",
    "5. Test 10 ảnh đo tốc độ\n",
    "6. Tải ~50k ảnh từ PhysioNet (resume-safe)\n",
    "7. Kiểm tra ảnh còn thiếu\n",
    "8. Copy reports + vqa vào package, sanity check\n",
    "9. Push Hugging Face dạng tar shard (`hieu3636/cxr-vlm-data/MIMIC-CXR_processed`)\n",
    "\n",
    "**Cấu trúc kết quả:** `files/pXX/pSUBJ/sSTUDY/<dicom_id>.jpg` + `sSTUDY.txt`, giữ tên gốc."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c00c34d2",
   "metadata": {},
   "source": [
    "## 0. Setup + Config"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ea6636b7",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Colab-only bootstrap: mount Google Drive (all outputs live there) and install deps.\n",
    "import sys\n",
    "\n",
    "IN_COLAB = \"google.colab\" in sys.modules\n",
    "if IN_COLAB:\n",
    "    from google.colab import drive\n",
    "    drive.mount(\"/content/drive\")\n",
    "    %pip install -q huggingface_hub tqdm\n",
    "print(\"IN_COLAB =\", IN_COLAB)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c31ae355",
   "metadata": {},
   "outputs": [],
   "source": [
    "# All notebook imports live here; huggingface_hub is imported lazily in the upload cell.\n",
    "import getpass\n",
    "import json\n",
    "import os\n",
    "import shutil\n",
    "import subprocess\n",
    "import tarfile\n",
    "import threading\n",
    "import time\n",
    "import zipfile\n",
    "from collections import Counter\n",
    "from concurrent.futures import ThreadPoolExecutor, as_completed\n",
    "from pathlib import Path\n",
    "\n",
    "from tqdm.auto import tqdm\n",
    "\n",
    "DRIVE = Path(\"/content/drive/MyDrive\")\n",
    "BUNDLE_ZIP = DRIVE / \"subset_bundle.zip\"\n",
    "BUNDLE_DIR = Path(\"/content/subset_bundle\")\n",
    "OUT = DRIVE / \"MIMIC-CXR_processed\"\n",
    "\n",
    "HF_REPO_ID = \"hieu3636/cxr-vlm-data\"\n",
    "HF_REPO_TYPE = \"dataset\"\n",
    "HF_PATH_IN_REPO = \"MIMIC-CXR_processed\"\n",
    "\n",
    "SPLITS = (\"train\", \"val\", \"test\")\n",
    "VQA_OUT = {\"train\": \"vqa.json\", \"val\": \"vqa_val.json\", \"test\": \"vqa_test.json\"}\n",
    "\n",
    "# ── CREDENTIALS ──────────────────────────────────────────────────────────────\n",
    "# Option 1 (RECOMMENDED): Colab Secrets (key icon in the left sidebar).\n",
    "#   Create 3 secrets: PHYSIONET_USER, PHYSIONET_PASS, HF_TOKEN (enable Notebook access).\n",
    "# Option 2: type them into _HARDCODE_* (never commit a notebook that contains a password).\n",
    "# Option 3: leave everything empty -> you are prompted at run time.\n",
    "_HARDCODE_USER = \"\"\n",
    "_HARDCODE_PASS = \"\"\n",
    "_HARDCODE_HFTOK = \"\"\n",
    "\n",
    "\n",
    "def _get(name, hard):\n",
    "    \"\"\"Resolve a credential: hard-coded value -> Colab secret -> env var (None if none is set).\"\"\"\n",
    "    if hard:\n",
    "        return hard\n",
    "    try:\n",
    "        from google.colab import userdata\n",
    "        value = userdata.get(name)\n",
    "        if value:\n",
    "            return value\n",
    "    except Exception:\n",
    "        # Best effort: not in Colab, secret not defined, or Notebook access not granted.\n",
    "        pass\n",
    "    return os.environ.get(name)\n",
    "\n",
    "\n",
    "PHYSIONET_USER = _get(\"PHYSIONET_USER\", _HARDCODE_USER) or input(\"PhysioNet username: \")\n",
    "PHYSIONET_PASS = _get(\"PHYSIONET_PASS\", _HARDCODE_PASS) or getpass.getpass(\"PhysioNet password: \")\n",
    "HF_TOKEN = _get(\"HF_TOKEN\", _HARDCODE_HFTOK) or getpass.getpass(\"HF write token: \")\n",
    "\n",
    "OUT.mkdir(parents=True, exist_ok=True)\n",
    "print(\"Credentials OK | bundle zip exists:\", BUNDLE_ZIP.exists())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "1d7009eb",
   "metadata": {},
   "source": [
    "## 1. Giải nén bundle + load manifest"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e249a91d",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Extract the Phase-1 bundle and load the per-split manifests.\n",
    "if not BUNDLE_ZIP.exists():\n",
    "    raise FileNotFoundError(f\"Không thấy {BUNDLE_ZIP} — upload subset_bundle.zip lên Drive trước.\")\n",
    "BUNDLE_DIR.mkdir(parents=True, exist_ok=True)\n",
    "with zipfile.ZipFile(BUNDLE_ZIP) as z:\n",
    "    z.extractall(BUNDLE_DIR)\n",
    "\n",
    "\n",
    "def load_json(path):\n",
    "    \"\"\"Load a UTF-8 JSON file and close the handle (json.load(open(...)) leaks it).\"\"\"\n",
    "    with open(path, encoding=\"utf-8\") as f:\n",
    "        return json.load(f)\n",
    "\n",
    "\n",
    "manifests = {sp: load_json(BUNDLE_DIR / f\"manifest_{sp}.json\") for sp in SPLITS}\n",
    "for sp, split_rows in manifests.items():\n",
    "    print(f\"{sp}: {len(split_rows):,} studies\")\n",
    "all_rows = [row for sp in SPLITS for row in manifests[sp]]\n",
    "print(\"TOTAL ảnh cần tải:\", len(all_rows))"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6ee18052",
   "metadata": {},
   "source": [
    "## 2. Định nghĩa dl() + log checkpoint\n",
    "\n",
    "PhysioNet từ chối `requests` basic-auth nhưng OK với `wget` → dùng wget per-file, 12 luồng.\n",
    "Log per-file ghi vào `/content` (SSD, ~µs); cứ 500 ảnh checkpoint sang Drive.\n",
    "Khi chạy cell này, log local được gộp với log trên Drive, để session mới không ghi đè log Drive bằng một log ngắn hơn."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "de367660",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Per-file download helpers. The append-only log lives on local disk (fast) and is\n",
    "# mirrored to Drive every CHECKPOINT_EVERY files so progress survives a runtime reset.\n",
    "DL_LOG_LOCAL = Path(\"/content/downloaded.txt\")\n",
    "DL_LOG_DRIVE = OUT / \"downloaded.txt\"\n",
    "CHECKPOINT_EVERY = 500\n",
    "N_WORKERS = 12          # parallel wget processes\n",
    "MIN_JPG_BYTES = 10_000  # smaller files are treated as failed downloads\n",
    "\n",
    "_log_lk = threading.Lock()  # serialises every write to the local log\n",
    "_log_cnt = 0\n",
    "\n",
    "\n",
    "def read_log(path):\n",
    "    \"\"\"Return the set of non-blank relpaths listed in log file `path` (empty set if it is missing).\"\"\"\n",
    "    if not path.exists():\n",
    "        return set()\n",
    "    with open(path, encoding=\"utf-8\") as f:\n",
    "        return {ln.strip() for ln in f if ln.strip()}\n",
    "\n",
    "\n",
    "def scan_images_on_drive():\n",
    "    \"\"\"Walk OUT/files once; return the '/'-separated relpaths (relative to OUT) of every .jpg.\"\"\"\n",
    "    found = set()\n",
    "    froot = OUT / \"files\"\n",
    "    if froot.exists():\n",
    "        for dp, _, fns in os.walk(froot):\n",
    "            for fn in fns:\n",
    "                if fn.endswith(\".jpg\"):\n",
    "                    found.add(os.path.relpath(os.path.join(dp, fn), OUT).replace(\"\\\\\", \"/\"))\n",
    "    return found\n",
    "\n",
    "\n",
    "def mark_done(relpath):\n",
    "    \"\"\"Append `relpath` to the local log; every CHECKPOINT_EVERY calls, mirror the log to Drive.\"\"\"\n",
    "    global _log_cnt\n",
    "    with _log_lk:\n",
    "        with open(DL_LOG_LOCAL, \"a\", encoding=\"utf-8\") as f:\n",
    "            f.write(relpath + \"\\n\")\n",
    "        _log_cnt += 1\n",
    "        if _log_cnt % CHECKPOINT_EVERY == 0:\n",
    "            try:\n",
    "                shutil.copy(DL_LOG_LOCAL, DL_LOG_DRIVE)\n",
    "            except Exception as e:\n",
    "                # A flaky Drive mount must not kill the worker; the next checkpoint retries.\n",
    "                print(\" [warn] copy log -> Drive:\", e)\n",
    "\n",
    "\n",
    "def flush_log_to_drive():\n",
    "    \"\"\"Copy the local log to Drive now (no-op if nothing has been logged yet).\"\"\"\n",
    "    with _log_lk:\n",
    "        if DL_LOG_LOCAL.exists():\n",
    "            shutil.copy(DL_LOG_LOCAL, DL_LOG_DRIVE)\n",
    "\n",
    "\n",
    "def load_done_set():\n",
    "    \"\"\"Return (done_set, source) and make the local log a superset of the Drive log.\n",
    "\n",
    "    Merging both logs guarantees a fresh session never checkpoints a shorter local log\n",
    "    over the Drive log. If neither log has entries, OUT/files is scanned once instead.\n",
    "    \"\"\"\n",
    "    local, drive = read_log(DL_LOG_LOCAL), read_log(DL_LOG_DRIVE)\n",
    "    if local or drive:\n",
    "        done, source = local | drive, \"log\"\n",
    "    else:\n",
    "        done, source = scan_images_on_drive(), \"os.walk\"\n",
    "    if not done <= local:  # local log is missing entries -> rewrite it (deduplicated)\n",
    "        with _log_lk:\n",
    "            with open(DL_LOG_LOCAL, \"w\", encoding=\"utf-8\") as f:\n",
    "                f.writelines(rp + \"\\n\" for rp in sorted(done))\n",
    "            shutil.copy(DL_LOG_LOCAL, DL_LOG_DRIVE)\n",
    "    return done, source\n",
    "\n",
    "\n",
    "def dl(row):\n",
    "    \"\"\"Download one manifest row's JPG to OUT/<image_relpath> with wget.\n",
    "\n",
    "    Returns \"skip\" (already on Drive), \"ok\", or \"fail(rc=N)\". The file is written to a\n",
    "    .part sibling and renamed only on success, so a partial download never looks complete.\n",
    "    \"\"\"\n",
    "    rp = row[\"image_relpath\"]\n",
    "    out = OUT / rp\n",
    "    if out.exists() and out.stat().st_size > MIN_JPG_BYTES:\n",
    "        mark_done(rp)\n",
    "        return \"skip\"\n",
    "    out.parent.mkdir(parents=True, exist_ok=True)\n",
    "    tmp = out.with_suffix(\".part\")\n",
    "    # NOTE: the password is visible in wget's argv (e.g. via `ps`) while it runs; acceptable on\n",
    "    # a single-user Colab VM, but prefer a chmod-600 wgetrc on shared machines.\n",
    "    cmd = [\"wget\", \"-q\", \"-T\", \"60\", \"-t\", \"3\", \"-O\", str(tmp),\n",
    "           \"--user\", PHYSIONET_USER, \"--password\", PHYSIONET_PASS, row[\"jpg_url\"]]\n",
    "    rc = subprocess.run(cmd).returncode\n",
    "    if rc == 0 and tmp.exists() and tmp.stat().st_size > MIN_JPG_BYTES:\n",
    "        tmp.replace(out)\n",
    "        mark_done(rp)\n",
    "        return \"ok\"\n",
    "    tmp.unlink(missing_ok=True)\n",
    "    return f\"fail(rc={rc})\"\n",
    "\n",
    "\n",
    "# Seed the local log from Drive right away, so the speed test below appends to the full history.\n",
    "if read_log(DL_LOG_DRIVE):\n",
    "    load_done_set()\n",
    "print(f\"dl() sẵn sàng. Log local={DL_LOG_LOCAL}, checkpoint -> Drive mỗi {CHECKPOINT_EVERY} ảnh.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "541894bc",
   "metadata": {},
   "source": [
    "### TEST 10 ảnh đo tốc độ trước khi tải 50k"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8645fa27",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Speed test: force-redownload the first N_TEST_IMG images on ONE thread and extrapolate an ETA.\n",
    "N_TEST_IMG = 10\n",
    "sample = all_rows[:N_TEST_IMG]\n",
    "print(f\"Test tải {len(sample)} ảnh (đo tốc độ)...\")\n",
    "\n",
    "t0 = time.time()\n",
    "total_bytes = 0\n",
    "ok = 0\n",
    "for row in sample:\n",
    "    out = OUT / row[\"image_relpath\"]\n",
    "    out.unlink(missing_ok=True)  # force a real download so the timing is meaningful\n",
    "    st = dl(row)\n",
    "    if out.exists() and out.stat().st_size > MIN_JPG_BYTES:\n",
    "        total_bytes += out.stat().st_size\n",
    "        ok += 1\n",
    "    print(f\" {row['study_name']:>10s} -> {st}\")\n",
    "\n",
    "dt = time.time() - t0\n",
    "mb = total_bytes / 1e6\n",
    "speed_kbs = (total_bytes / 1024) / dt if dt > 0 else 0\n",
    "print(f\"\\n{ok}/{len(sample)} OK | {mb:.1f} MB / {dt:.1f}s = {speed_kbs:,.0f} KB/s \"\n",
    "      f\"({speed_kbs/1024:.2f} MB/s) [1 luồng]\")\n",
    "if ok:\n",
    "    avg = mb / ok\n",
    "    # Single-thread hours = images x seconds-per-image (avoids mixing MB=1e6 with KiB/MiB).\n",
    "    h = len(all_rows) * (dt / ok) / 3600\n",
    "    # The 1.6x-3x factors are presumably a pessimism margin for imperfect thread scaling.\n",
    "    print(f\"Ảnh ~{avg:.2f} MB | tổng ~{len(all_rows)*avg/1000:.0f} GB | \"\n",
    "          f\"ETA {N_WORKERS} luồng: ~{h/N_WORKERS*1.6:.1f}-{h/N_WORKERS*3:.1f}h\")\n",
    "print(\"\\n\" + (\"OK — chạy cell TẢI 50k.\" if ok == len(sample)\n",
    "               else \"FAIL — kiểm tra user/pass, ĐỪNG chạy tải 50k.\"))"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f6855c0e",
   "metadata": {},
   "source": [
    "### Tải toàn bộ 50k (resume-safe, log local → Drive checkpoint)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "51b13f2f",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Resume: merged local+Drive log (instant) -> one-off os.walk of Drive when no log exists yet.\n",
    "done_set, src = load_done_set()\n",
    "print(f\"[{src}] {len(done_set):,} ảnh đã tải.\")\n",
    "\n",
    "todo = [r for r in all_rows if r[\"image_relpath\"] not in done_set]\n",
    "n_done = len(all_rows) - len(todo)\n",
    "print(f\"Đã có : {n_done:,} / {len(all_rows):,} ({n_done / max(len(all_rows), 1) * 100:.1f}%)\")\n",
    "print(f\"Cần tải: {len(todo):,}\")\n",
    "\n",
    "if not todo:\n",
    "    print(\"\\nĐã tải đủ.\")\n",
    "    flush_log_to_drive()\n",
    "else:\n",
    "    res = Counter({\"skip\": n_done})\n",
    "    ex = ThreadPoolExecutor(max_workers=N_WORKERS)\n",
    "    try:\n",
    "        futs = [ex.submit(dl, r) for r in todo]\n",
    "        for f in tqdm(as_completed(futs), total=len(todo), desc=\"downloading\"):\n",
    "            try:\n",
    "                status = f.result()\n",
    "            except Exception as e:\n",
    "                # One worker hitting e.g. a Drive I/O error must not abort the whole run.\n",
    "                status = f\"fail({type(e).__name__})\"\n",
    "            res[status.split(\"(\")[0]] += 1\n",
    "    finally:\n",
    "        # cancel_futures: on interrupt, drop the queued downloads instead of waiting for all of them.\n",
    "        ex.shutdown(wait=True, cancel_futures=True)\n",
    "        flush_log_to_drive()\n",
    "    print(dict(res))\n",
    "    if res[\"fail\"]:\n",
    "        print(\"Còn fail -> chạy lại cell này (chỉ tải phần thiếu).\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c53ce10a",
   "metadata": {},
   "source": [
    "### Kiểm tra ảnh còn thiếu"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bed9ee2e",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Read the log instead of stat-ing 50k files through the Drive FUSE mount.\n",
    "done_set, _ = load_done_set()\n",
    "\n",
    "# Group by the manifest each row came from (does not depend on a per-row \"split\" field).\n",
    "miss = {sp: [row[\"study_name\"] for row in manifests[sp] if row[\"image_relpath\"] not in done_set]\n",
    "        for sp in SPLITS}\n",
    "print(\"ảnh còn thiếu:\", {k: len(v) for k, v in miss.items()},\n",
    "      \"| tổng:\", sum(len(v) for v in miss.values()))\n",
    "for sp, names in miss.items():\n",
    "    if names:\n",
    "        print(f\" [{sp}] thiếu {len(names)}, vd: {names[:5]}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "23f67812",
   "metadata": {},
   "source": [
    "## 3. Copy reports + vqa từ bundle vào package"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8482caf4",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Reports keep the bundle layout: bundle/reports/files/... -> OUT/files/...\n",
    "shutil.copytree(BUNDLE_DIR / \"reports\", OUT, dirs_exist_ok=True)\n",
    "\n",
    "for sp in SPLITS:\n",
    "    shutil.copy(BUNDLE_DIR / \"vqa\" / VQA_OUT[sp], OUT / VQA_OUT[sp])\n",
    "    shutil.copy(BUNDLE_DIR / f\"manifest_{sp}.json\", OUT / f\"manifest_{sp}.json\")\n",
    "    src_csv = BUNDLE_DIR / f\"manifest_{sp}.csv\"\n",
    "    if src_csv.exists():  # the CSV copy of the manifest is optional\n",
    "        shutil.copy(src_csv, OUT / f\"manifest_{sp}.csv\")\n",
    "    nv = len(load_json(OUT / VQA_OUT[sp]))\n",
    "    print(f\"{sp}: vqa={nv:,} manifest copied\")\n",
    "\n",
    "n_rep = sum(1 for _ in (OUT / \"files\").rglob(\"s*.txt\"))\n",
    "print(f\"reports trong package: {n_rep:,}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9332c248",
   "metadata": {},
   "source": [
    "### Sanity check (1 lần os.walk)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "93a3ef0e",
   "metadata": {},
   "outputs": [],
   "source": [
    "# One walk of OUT/files collects both images (.jpg) and reports (.txt) for the per-split check.\n",
    "print(\"Quét cây thư mục 1 lần...\")\n",
    "imgs_on_disk, reps_on_disk = set(), set()\n",
    "froot = OUT / \"files\"\n",
    "if froot.exists():\n",
    "    for dp, _, fns in os.walk(froot):\n",
    "        for fn in fns:\n",
    "            rel = os.path.relpath(os.path.join(dp, fn), OUT).replace(\"\\\\\", \"/\")\n",
    "            if fn.endswith(\".jpg\"):\n",
    "                imgs_on_disk.add(rel)\n",
    "            elif fn.endswith(\".txt\"):\n",
    "                reps_on_disk.add(rel)\n",
    "\n",
    "for sp in SPLITS:\n",
    "    rows = manifests[sp]\n",
    "    ni = sum(1 for x in rows if x[\"image_relpath\"] in imgs_on_disk)\n",
    "    nr = sum(1 for x in rows if x[\"report_relpath\"] in reps_on_disk)\n",
    "    print(f\"{sp:5s} manifest={len(rows):,} images_ok={ni:,} reports_ok={nr:,}\")\n",
    "print(\"\\nPackage:\", OUT)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "bb0581d4",
   "metadata": {},
   "source": [
    "## 4. Upload Hugging Face — TAR SHARDS\n",
    "\n",
    "Thay vì upload 50k file lẻ (chậm vì Drive FUSE + 50k HTTP request), **gom thành ~100 shard `.tar`** (~750 MB / shard, 500 study mỗi shard). Đọc tuần tự nhanh, upload ít file lớn → nhanh hơn **5–10 lần**.\n",
    "\n",
    "**Cấu trúc trên HF** (`hieu3636/cxr-vlm-data/MIMIC-CXR_processed/`):\n",
    "```\n",
    "shards/train-00000.tar ... train-000NN.tar ← mỗi tar chứa <study_name>.jpg + <study_name>.txt\n",
    "shards/val-00000.tar ... val-000NN.tar\n",
    "shards/test-00000.tar ... test-000NN.tar\n",
    "manifest_train.json/.csv, vqa.json ... ← file nhỏ, upload riêng\n",
    "```\n",
    "\n",
    "Shard nào còn thiếu ảnh/report sẽ **không** được upload (trừ khi `ALLOW_INCOMPLETE_SHARDS = True`), vì shard đã có trên HF sẽ không bao giờ được build lại. Tải bù ảnh thiếu rồi chạy lại cell.\n",
    "\n",
    "Khi train: tải shard về rồi extract → được các file phẳng `<study_name>.jpg` + `<study_name>.txt` (không có cây thư mục). Hoặc dùng `webdataset` để stream thẳng từ tar trong PyTorch."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "844bd22d",
   "metadata": {},
   "outputs": [],
   "source": [
    "RUN_HF = False                   # <- set True when ready\n",
    "SHARD_SIZE = 500                 # studies per tar shard (~750 MB if images are ~1.5 MB)\n",
    "SHARDS_DIR = \"shards\"            # sub-folder in the repo holding the tars\n",
    "ALLOW_INCOMPLETE_SHARDS = False  # upload shards with missing files? (a shard on HF is never rebuilt)\n",
    "STAGE = Path(\"/content/_shards\")  # local staging dir: holds at most one tar at a time\n",
    "STAGE.mkdir(parents=True, exist_ok=True)\n",
    "\n",
    "\n",
    "def _shard_members(shard_rows):\n",
    "    \"\"\"Return ([(src_path, arcname), ...] for files present on Drive, number of missing files).\"\"\"\n",
    "    members, missing = [], 0\n",
    "    for r in shard_rows:\n",
    "        for src, ext in ((OUT / r[\"image_relpath\"], \"jpg\"), (OUT / r[\"report_relpath\"], \"txt\")):\n",
    "            if src.exists():\n",
    "                members.append((src, f\"{r['study_name']}.{ext}\"))\n",
    "            else:\n",
    "                missing += 1\n",
    "    return members, missing\n",
    "\n",
    "\n",
    "def upload_shard(api, existing, sp, sidx, shard_rows, retries=3):\n",
    "    \"\"\"Build one uncompressed tar shard and upload it to HF.\n",
    "\n",
    "    Returns \"skip\" (already in `existing`), \"ok\", \"incomplete: ...\" (files missing and\n",
    "    ALLOW_INCOMPLETE_SHARDS is False) or \"fail: <last error>\" after `retries` attempts.\n",
    "    The staged tar is always deleted, so /content never holds more than one shard.\n",
    "    \"\"\"\n",
    "    tar_name = f\"{sp}-{sidx:05d}.tar\"\n",
    "    tar_rp = f\"{HF_PATH_IN_REPO}/{SHARDS_DIR}/{tar_name}\"\n",
    "    if tar_rp in existing:\n",
    "        return \"skip\"\n",
    "    members, missing = _shard_members(shard_rows)\n",
    "    if missing and not ALLOW_INCOMPLETE_SHARDS:\n",
    "        return f\"incomplete: thiếu {missing} file\"\n",
    "    tar_path = STAGE / tar_name\n",
    "    try:\n",
    "        with tarfile.open(tar_path, \"w\") as tf:\n",
    "            for src, arcname in members:\n",
    "                tf.add(src, arcname=arcname)\n",
    "        last = \"\"\n",
    "        for attempt in range(retries):\n",
    "            try:\n",
    "                api.upload_file(path_or_fileobj=str(tar_path),\n",
    "                                path_in_repo=tar_rp,\n",
    "                                repo_id=HF_REPO_ID, repo_type=HF_REPO_TYPE,\n",
    "                                commit_message=f\"shard {sp}-{sidx:05d}\")\n",
    "                return \"ok\"\n",
    "            except Exception as e:\n",
    "                last = str(e)[:200]\n",
    "                if attempt + 1 < retries:  # no pointless sleep after the final attempt\n",
    "                    time.sleep(5 * (attempt + 1))\n",
    "        return f\"fail: {last}\"\n",
    "    finally:\n",
    "        tar_path.unlink(missing_ok=True)\n",
    "\n",
    "\n",
    "if RUN_HF:\n",
    "    from huggingface_hub import HfApi, CommitOperationAdd\n",
    "\n",
    "    api = HfApi(token=HF_TOKEN)\n",
    "    api.create_repo(HF_REPO_ID, repo_type=HF_REPO_TYPE, exist_ok=True)\n",
    "\n",
    "    print(\"Đọc danh sách file đã có trên HF (resume)...\")\n",
    "    try:\n",
    "        existing = set(api.list_repo_files(repo_id=HF_REPO_ID, repo_type=HF_REPO_TYPE))\n",
    "    except Exception as e:\n",
    "        # Fall back to \"nothing uploaded yet\" rather than aborting the run.\n",
    "        print(\" [warn] list_repo_files:\", e)\n",
    "        existing = set()\n",
    "    print(f\" HF đã có : {len(existing):,} file\")\n",
    "\n",
    "    # ── (a) Small files first: manifests + vqa, batched into one commit ───────\n",
    "    small = []\n",
    "    for sp in SPLITS:\n",
    "        for p in (OUT / f\"manifest_{sp}.json\", OUT / f\"manifest_{sp}.csv\", OUT / VQA_OUT[sp]):\n",
    "            rp = f\"{HF_PATH_IN_REPO}/{p.name}\"\n",
    "            if p.exists() and rp not in existing:\n",
    "                small.append((p, rp))\n",
    "    if small:\n",
    "        ops = [CommitOperationAdd(path_in_repo=rp, path_or_fileobj=str(p)) for p, rp in small]\n",
    "        api.create_commit(repo_id=HF_REPO_ID, repo_type=HF_REPO_TYPE,\n",
    "                          operations=ops, commit_message=\"manifest + vqa\")\n",
    "        print(f\" uploaded {len(small)} small files\")\n",
    "    else:\n",
    "        print(\" small files đã có sẵn\")\n",
    "\n",
    "    # ── (b) Build + upload shards one by one ──────────────────────────────────\n",
    "    results = Counter()\n",
    "    for sp in SPLITS:\n",
    "        rows = manifests[sp]\n",
    "        n_shards = (len(rows) + SHARD_SIZE - 1) // SHARD_SIZE  # ceil division\n",
    "        print(f\"\\n[{sp}] {len(rows):,} studies -> {n_shards} shards\")\n",
    "        pbar = tqdm(range(n_shards), desc=f\"{sp:5s}\", unit=\"shard\")\n",
    "        for sidx in pbar:\n",
    "            shard_rows = rows[sidx * SHARD_SIZE:(sidx + 1) * SHARD_SIZE]\n",
    "            status = upload_shard(api, existing, sp, sidx, shard_rows)\n",
    "            results[status.split(\":\")[0]] += 1\n",
    "            if status not in (\"ok\", \"skip\"):\n",
    "                pbar.write(f\" {sp}-{sidx:05d} FAIL: {status}\")\n",
    "    print(dict(results))\n",
    "    print(\"\\ndone →\",\n",
    "          f\"https://huggingface.co/{HF_REPO_TYPE}s/{HF_REPO_ID}/tree/main/{HF_PATH_IN_REPO}\")\n",
    "else:\n",
    "    print(\"RUN_HF=False — bật True để build+push tar shards.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "23ccb986",
   "metadata": {},
   "source": [
    "## 5. (Tùy chọn) Zip backup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "5595d4ab",
   "metadata": {},
   "outputs": [],
   "source": [
    "RUN_ZIP = False\n",
    "if RUN_ZIP:\n",
    "    # Build the archive on local disk, then copy the single file to Drive.\n",
    "    # NOTE(review): the full package may not fit on /content — check free space first.\n",
    "    shutil.make_archive(\"/content/MIMIC-CXR_processed\", \"zip\", OUT)\n",
    "    shutil.copy(\"/content/MIMIC-CXR_processed.zip\", DRIVE / \"MIMIC-CXR_processed.zip\")\n",
    "    print(\"zipped -> Drive/MIMIC-CXR_processed.zip\")\n",
    "else:\n",
    "    print(\"RUN_ZIP=False\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4c7df7e7",
   "metadata": {},
   "source": [
    "## 6. Summary"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e7ab8417",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Reuse the sanity-check scan if it ran in this kernel; otherwise walk Drive once.\n",
    "try:\n",
    "    _imgs = imgs_on_disk\n",
    "except NameError:\n",
    "    _imgs = scan_images_on_drive()\n",
    "print(\"=\" * 54)\n",
    "print(\" PHASE 2 (COLAB) DONE\")\n",
    "print(\"=\" * 54)\n",
    "for sp in SPLITS:\n",
    "    rows = manifests[sp]\n",
    "    ni = sum(1 for x in rows if x[\"image_relpath\"] in _imgs)\n",
    "    print(f\" {sp:5s} images={ni:,}/{len(rows):,}\")\n",
    "print(f\" package: {OUT}\")\n",
    "print(\"=\" * 54)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "name": "python",
   "version": "3.10.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}