griffingoodwin04 committed
Commit 295ffa5 · Parent(s): 8ca55bc

Add parquet_to_npy utility for converting local HuggingFace parquet files to .npy; update pipeline configuration and README

README.md CHANGED
@@ -58,7 +58,8 @@ FOXES
 │ ├── download_sdo.py                 # Download SDO/AIA EUV images from JSOC
 │ ├── sxr_downloader.py               # Download GOES SXR flux data
 │ ├── hugging_face_data_download.py   # Download pre-processed data from HuggingFace Hub
-│ └── hf_download_config.yaml         # Config for HuggingFace downloader
+│ ├── parquet_to_npy.py               # Convert locally-downloaded HF parquet files to .npy
+│ └── hf_download_config.yaml         # Config for HuggingFace downloader and parquet_to_npy
 ├── forecasting                       # Model training and inference
 │ ├── data_loaders
 │ │ ├── SDOAIA_dataloader.py          # PyTorch Lightning DataModule for AIA+SXR
@@ -115,6 +116,7 @@ FOXES uses a single orchestrator script (`run_pipeline.py`) and a top-level conf
 | # | Step | Description |
 |---|------|--------------------------------------------------------------------------------|
 | 0 | `hf_download` | Download pre-processed, pre-split data from HuggingFace *(replaces steps 1–6)* |
+| 0b | `parquet_to_npy` | Convert already-downloaded HF parquet files to `.npy` *(skips network download)* |
 | 1 | `download_aia` | Download SDO/AIA EUV images from JSOC |
 | 2 | `download_sxr` | Download GOES SXR flux data |
 | 3 | `combine_sxr` | Combine raw GOES `.nc` files into per-satellite CSVs |
@@ -138,6 +140,9 @@ python run_pipeline.py --config pipeline_config.yaml --steps all
 # Quick-start: download pre-processed data from HuggingFace, then train
 python run_pipeline.py --config pipeline_config.yaml --steps hf_download,train,inference,evaluate
 
+# Already have parquet files locally? Convert them to .npy, then train
+python run_pipeline.py --config pipeline_config.yaml --steps parquet_to_npy,train,inference,evaluate
+
 # Run specific steps
 python run_pipeline.py --config pipeline_config.yaml --steps train,inference,evaluate
 
@@ -187,6 +192,32 @@ Run the downloader standalone:
 python download/hugging_face_data_download.py --config download/hf_download_config.yaml
 ```
 
+### Converting Local Parquet Files to .npy
+
+If you've already downloaded the HuggingFace parquet files (e.g., via `huggingface-cli` or the HF web UI), use `parquet_to_npy.py` to convert them directly — no network connection needed. The output is identical to what `hf_download` produces.
+
+```bash
+# All splits at once — parquet_root should contain train/, validation/, test/ subdirs
+python download/parquet_to_npy.py \
+    --parquet_root /path/to/parquet \
+    --config download/hf_download_config.yaml
+
+# Single split
+python download/parquet_to_npy.py \
+    --parquet_dir /path/to/parquet/train \
+    --split train \
+    --aia_dir /Volumes/T9/AIA_hg_processed \
+    --sxr_dir /Volumes/T9/SXR_hg_processed
+```
+
+To use it as a pipeline step, configure it in `pipeline_config.yaml`:
+
+```yaml
+parquet_to_npy:
+  config: "download/hf_download_config.yaml"   # provides aia_dir, sxr_dir, num_workers
+  parquet_root: "/path/to/your/parquet"        # dir with train/, validation/, test/ subdirs
+```
+
 ### Configuration
 
 Edit `pipeline_config.yaml` to set data paths, date ranges, and hyperparameters. Each step has its own section, and an `overrides` block lets you override values from the step's base config without editing it directly.
download/parquet_to_npy.py ADDED
@@ -0,0 +1,182 @@
+"""
+Convert Local HuggingFace Parquet Files to .npy
+================================================
+Same output layout as hugging_face_data_download.py, but reads from
+parquet files you've already downloaded instead of streaming from HF.
+
+Expected parquet columns: filename, aia_stack, sxr_value
+
+Usage:
+    # Convert one split at a time (parquet files flat in a directory)
+    python download/parquet_to_npy.py \\
+        --parquet_dir /path/to/parquet/train \\
+        --split train \\
+        --config download/hf_download_config.yaml
+
+    # Or specify output dirs directly
+    python download/parquet_to_npy.py \\
+        --parquet_dir /path/to/parquet/validation \\
+        --split validation \\
+        --aia_dir /Volumes/T9/AIA_hg_processed \\
+        --sxr_dir /Volumes/T9/SXR_hg_processed
+
+    # Auto-discover split subdirs (train/, validation/, test/) under a root
+    python download/parquet_to_npy.py \\
+        --parquet_root /path/to/parquet \\
+        --config download/hf_download_config.yaml
+"""
+
+import argparse
+import os
+import sys
+import time
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from pathlib import Path
+
+import numpy as np
+import pyarrow.parquet as pq
+import yaml
+
+
+# HF split names that differ from the local directory names
+HF_TO_LOCAL = {"validation": "val"}
+
+
+def load_config(path: str) -> dict:
+    with open(path) as f:
+        return yaml.safe_load(f)
+
+
+def _write_arrays(filename: str, aia_arr: np.ndarray, sxr_arr: np.ndarray,
+                  aia_split_dir: str, sxr_split_dir: str) -> bool:
+    """Save arrays to disk. Returns True if written, False if already exists."""
+    aia_path = os.path.join(aia_split_dir, filename)
+    sxr_path = os.path.join(sxr_split_dir, filename)
+
+    if os.path.exists(aia_path) and os.path.exists(sxr_path):
+        return False
+
+    np.save(aia_path, aia_arr)
+    np.save(sxr_path, sxr_arr)
+    return True
+
+
+def convert_split(parquet_dir: str, hf_split: str, aia_base: str, sxr_base: str,
+                  num_workers: int = 8, print_every: int = 500):
+    local_split = HF_TO_LOCAL.get(hf_split, hf_split)
+    aia_split_dir = os.path.join(aia_base, local_split)
+    sxr_split_dir = os.path.join(sxr_base, local_split)
+    os.makedirs(aia_split_dir, exist_ok=True)
+    os.makedirs(sxr_split_dir, exist_ok=True)
+
+    parquet_files = sorted(Path(parquet_dir).glob("*.parquet"))
+    if not parquet_files:
+        print(f"No parquet files found in {parquet_dir}", file=sys.stderr)
+        return
+
+    print(f"\n{'='*50}")
+    print(f"Converting split: {hf_split} -> local dir: {local_split}")
+    print(f"{'='*50}")
+    print(f"  Parquet dir: {parquet_dir} ({len(parquet_files)} files)")
+    print(f"  AIA -> {aia_split_dir}")
+    print(f"  SXR -> {sxr_split_dir}")
+
+    saved = skipped = submitted = 0
+    start = time.time()
+
+    with ThreadPoolExecutor(max_workers=num_workers) as pool:
+        futures = {}
+
+        for pq_file in parquet_files:
+            table = pq.read_table(pq_file, columns=["filename", "aia_stack", "sxr_value"])
+
+            for i in range(len(table)):
+                row = table.slice(i, 1)
+                filename = row["filename"][0].as_py()
+                aia_arr = np.array(row["aia_stack"][0].as_py(), dtype=np.float32)
+                sxr_arr = np.array(row["sxr_value"][0].as_py(), dtype=np.float32)
+
+                fut = pool.submit(_write_arrays, filename, aia_arr, sxr_arr,
+                                  aia_split_dir, sxr_split_dir)
+                futures[fut] = submitted
+                submitted += 1
+
+                if submitted % print_every == 0:
+                    # Drain finished futures so the dict doesn't grow unbounded
+                    done = [f for f in futures if f.done()]
+                    for f in done:
+                        if f.result():
+                            saved += 1
+                        else:
+                            skipped += 1
+                        del futures[f]
+
+                    elapsed = time.time() - start
+                    rate = submitted / elapsed if elapsed > 0 else 0
+                    print(
+                        f"[{hf_split}] submitted={submitted} | saved={saved} skipped={skipped} | "
+                        f"{rate:.1f} rows/sec",
+                        flush=True,
+                    )
+
+        # Account for whatever is still in flight
+        for fut in as_completed(futures):
+            if fut.result():
+                saved += 1
+            else:
+                skipped += 1
+
+    elapsed = time.time() - start
+    print(f"[{hf_split}] Done — {saved} saved, {skipped} skipped | {elapsed/60:.1f} min")
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="Convert locally-downloaded HF parquet files to .npy arrays"
+    )
+    parser.add_argument("--config", type=str, default=None,
+                        help="Path to hf_download_config.yaml (provides aia_dir, sxr_dir, num_workers)")
+    parser.add_argument("--aia_dir", type=str, default=None,
+                        help="Output base dir for AIA .npy files (overrides config)")
+    parser.add_argument("--sxr_dir", type=str, default=None,
+                        help="Output base dir for SXR .npy files (overrides config)")
+    parser.add_argument("--parquet_dir", type=str, default=None,
+                        help="Directory containing parquet files for a single split")
+    parser.add_argument("--split", type=str, default=None,
+                        help="Split name for --parquet_dir (train, validation, test)")
+    parser.add_argument("--parquet_root", type=str, default=None,
+                        help="Root dir with split subdirs (train/, validation/, test/)")
+    parser.add_argument("--splits", type=str, default="train,validation,test",
+                        help="Comma-separated splits to process when using --parquet_root")
+    parser.add_argument("--num_workers", type=int, default=None,
+                        help="Parallel write threads (default: from config or 8)")
+    parser.add_argument("--print_every", type=int, default=500,
+                        help="Log progress every N rows")
+    args = parser.parse_args()
+
+    cfg = load_config(args.config) if args.config else {}
+
+    aia_dir = args.aia_dir or cfg.get("aia_dir")
+    sxr_dir = args.sxr_dir or cfg.get("sxr_dir")
+    num_workers = args.num_workers or cfg.get("num_workers", 8)
+
+    if not aia_dir or not sxr_dir:
+        parser.error("Provide --aia_dir and --sxr_dir, or --config with those keys set.")
+
+    if args.parquet_root:
+        splits = [s.strip() for s in args.splits.split(",")]
+        for split in splits:
+            split_dir = os.path.join(args.parquet_root, split)
+            if not os.path.isdir(split_dir):
+                print(f"[warn] Split dir not found, skipping: {split_dir}")
+                continue
+            convert_split(split_dir, split, aia_dir, sxr_dir, num_workers, args.print_every)
+    elif args.parquet_dir:
+        if not args.split:
+            parser.error("--split is required when using --parquet_dir")
+        convert_split(args.parquet_dir, args.split, aia_dir, sxr_dir, num_workers, args.print_every)
+    else:
+        parser.error("Provide either --parquet_dir + --split, or --parquet_root")
+
+    print("\nDone.")
+
+
+if __name__ == "__main__":
+    main()
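Two behaviors of the writer above are worth noting: HF's `validation` split is written to a local `val/` directory (via `HF_TO_LOCAL`), and a row is skipped only when both of its output files already exist, which makes re-runs idempotent. A standalone sketch of that skip logic, using temporary directories and toy arrays:

```python
import os
import tempfile

import numpy as np

def write_pair(filename, aia_arr, sxr_arr, aia_dir, sxr_dir):
    # Same rule as _write_arrays: skip only when BOTH outputs already exist.
    aia_path = os.path.join(aia_dir, filename)
    sxr_path = os.path.join(sxr_dir, filename)
    if os.path.exists(aia_path) and os.path.exists(sxr_path):
        return False
    np.save(aia_path, aia_arr)
    np.save(sxr_path, sxr_arr)
    return True

root = tempfile.mkdtemp()
# HF's "validation" split lands in a local "val/" directory (HF_TO_LOCAL).
aia_dir = os.path.join(root, "AIA_hg_processed", "val")
sxr_dir = os.path.join(root, "SXR_hg_processed", "val")
os.makedirs(aia_dir)
os.makedirs(sxr_dir)

aia = np.zeros((2, 4, 4), dtype=np.float32)
sxr = np.array([1e-6], dtype=np.float32)
first = write_pair("t0.npy", aia, sxr, aia_dir, sxr_dir)   # writes both files
second = write_pair("t0.npy", aia, sxr, aia_dir, sxr_dir)  # skipped: already on disk
```

Because filenames in the dataset already end in `.npy`, `np.save` does not append a second extension, so the existence check on re-run sees the exact paths it wrote.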
pipeline_config.yaml CHANGED
@@ -24,6 +24,24 @@ checkpoint: "/Users/griffingoodwin/Downloads/FOXES_Model_Checkpoint.ckpt"
 hf_download:
   config: "download/hf_download_config.yaml"
 
+# -----------------------------------------------------------------------------
+# Local parquet → .npy (step: parquet_to_npy)
+# Use this if you've already downloaded HF parquet files and want to skip the
+# network step. Point parquet_root at a directory with split subdirs
+# (train/, validation/, test/) or use parquet_dir + split for a single split.
+# aia_dir / sxr_dir default to the values in hf_download_config.yaml if
+# --config is also provided; override here to use different paths.
+# -----------------------------------------------------------------------------
+parquet_to_npy:
+  config: "download/hf_download_config.yaml"   # provides aia_dir, sxr_dir, num_workers
+  parquet_root: ""                             # root dir containing train/, validation/, test/ subdirs
+  # parquet_dir: ""                            # alternative: single split dir (also set split: below)
+  # split: "train"
+  # aia_dir: "${base_dir}/AIA_hg_processed"    # override config if needed
+  # sxr_dir: "${base_dir}/SXR_hg_processed"
+  # splits: "train,validation,test"            # which subdirs to process
+  # num_workers: 8
+
 # -----------------------------------------------------------------------------
 # Shared date range (used by download_aia and download_sxr)
 # -----------------------------------------------------------------------------
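The comment block above describes a three-level precedence: an explicit CLI flag wins, then the value from `hf_download_config.yaml`, then a built-in default. A minimal sketch of that rule (config values here are hypothetical; note the script itself uses `or` rather than an explicit `None` check, so empty strings also fall through to the config):

```python
def resolve(cli_value, cfg, key, default=None):
    # CLI flag wins; otherwise fall back to the YAML config, then the default.
    return cli_value if cli_value is not None else cfg.get(key, default)

# Hypothetical parsed config for illustration
cfg = {"aia_dir": "/data/AIA_hg_processed", "num_workers": 4}

aia = resolve(None, cfg, "aia_dir")           # no flag -> taken from config
workers = resolve(16, cfg, "num_workers", 8)  # flag overrides config
sxr = resolve(None, cfg, "sxr_dir")           # absent everywhere -> None
```

When a required value like `sxr_dir` resolves to `None`, the script aborts with `parser.error` rather than guessing a path.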
requirements.txt CHANGED
@@ -36,4 +36,6 @@ imageio-ffmpeg
 # Utilities
 tqdm
 wandb
-PyYAML
+PyYAML
+huggingface_hub
+datasets
run_pipeline.py CHANGED
@@ -106,6 +106,7 @@ def write_merged_config(base_path: str, overrides: dict, out_name: str) -> Path:
 
 STEP_ORDER = [
     "hf_download",
+    "parquet_to_npy",
     "download_aia",
     "download_sxr",
     "combine_sxr",
@@ -125,6 +126,10 @@ STEP_INFO = {
         "description": "Download processed+split AIA/SXR data from HuggingFace Hub (replaces download→preprocess→split)",
         "script": ROOT / "download" / "hugging_face_data_download.py",
     },
+    "parquet_to_npy": {
+        "description": "Convert locally-downloaded HF parquet files to .npy (skips network download)",
+        "script": ROOT / "download" / "parquet_to_npy.py",
+    },
     "download_aia": {
         "description": "Download SDO/AIA EUV images from JSOC",
         "script": ROOT / "download" / "download_sdo.py",
@@ -200,6 +205,31 @@ def build_commands(step: str, cfg: dict, force: bool) -> list[list[str]] | None:
         config_path = hf.get("config", "download/hf_download_config.yaml")
         return [[sys.executable, str(STEP_INFO[step]["script"]), "--config", config_path]]
 
+    if step == "parquet_to_npy":
+        p2n = cfg.get("parquet_to_npy", {})
+        cmd = [sys.executable, str(STEP_INFO[step]["script"])]
+        if p2n.get("config"):
+            cmd += ["--config", p2n["config"]]
+        if p2n.get("parquet_root"):
+            cmd += ["--parquet_root", p2n["parquet_root"]]
+        elif p2n.get("parquet_dir"):
+            if not p2n.get("split"):
+                log.error("pipeline_config.yaml parquet_to_npy.split is required when parquet_dir is set")
+                return None
+            cmd += ["--parquet_dir", p2n["parquet_dir"], "--split", p2n["split"]]
+        else:
+            log.error("pipeline_config.yaml parquet_to_npy requires parquet_root or parquet_dir")
+            return None
+        if p2n.get("aia_dir"):
+            cmd += ["--aia_dir", p2n["aia_dir"]]
+        if p2n.get("sxr_dir"):
+            cmd += ["--sxr_dir", p2n["sxr_dir"]]
+        if p2n.get("splits"):
+            cmd += ["--splits", p2n["splits"]]
+        if p2n.get("num_workers"):
+            cmd += ["--num_workers", str(p2n["num_workers"])]
+        return [cmd]
+
     if step == "download_aia":
         if not require(["download_dir", "email"], "aia") or not require(["start_date"]):
             return None
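The new `build_commands` branch turns the `parquet_to_npy:` YAML section into an argv list for the converter script. A standalone sketch of that flag assembly, with `"python"` standing in for `sys.executable` and logging replaced by a bare `None` return:

```python
def build_parquet_cmd(p2n, script="download/parquet_to_npy.py", python="python"):
    # Mirrors the parquet_to_npy branch: config first, then the parquet
    # source (root or dir+split), then optional overrides.
    cmd = [python, script]
    if p2n.get("config"):
        cmd += ["--config", p2n["config"]]
    if p2n.get("parquet_root"):
        cmd += ["--parquet_root", p2n["parquet_root"]]
    elif p2n.get("parquet_dir"):
        if not p2n.get("split"):
            return None  # split is required alongside parquet_dir
        cmd += ["--parquet_dir", p2n["parquet_dir"], "--split", p2n["split"]]
    else:
        return None  # need parquet_root or parquet_dir
    if p2n.get("num_workers"):
        cmd += ["--num_workers", str(p2n["num_workers"])]
    return cmd

cmd = build_parquet_cmd({"config": "download/hf_download_config.yaml",
                         "parquet_root": "/data/parquet", "num_workers": 8})
```

Returning `None` (rather than raising) matches how `run_pipeline.py` signals a misconfigured step so the orchestrator can report it and stop.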