#!/usr/bin/env python3
"""run_dataset.py โ€” Main entry point for processing a dataset through GrandLine.

Usage:
    # Process local parquet files
    python scripts/run_dataset.py --config configs/datasets/fineweb_edu.yaml

    # Download and process specific HF parquet shards (selective!)
    python scripts/run_dataset.py --config configs/datasets/fineweb_edu.yaml \
        --hf-files "data/CC-MAIN-2024-10/000_00000.parquet" \
                   "data/CC-MAIN-2024-10/000_00001.parquet"

    # Download first N shards from a specific subdirectory
    python scripts/run_dataset.py --config configs/datasets/fineweb_edu.yaml \
        --hf-subdir "data/CC-MAIN-2024-10" --max-shards 5

    # With executor tuning
    python scripts/run_dataset.py --config configs/datasets/fineweb_edu.yaml \
        --executor configs/executors/kaggle.yaml \
        --hf-subdir "data/CC-MAIN-2024-10" --max-shards 3

Workflow:
    1. Load dataset config (+ global + executor configs)
    2. Resolve input shards (local paths OR selective HF download)
    3. Build the appropriate pipeline from trust level and dataset type
    4. Process each shard independently (resumable)
    5. Write packed parquet artifacts + manifests
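
Config sketch (the keys below are the ones this script reads; the values are
illustrative assumptions, not a canonical config):

    name: fineweb_edu
    pipeline_type: curated_web   # curated_web | code | math | papers | synthetic
    trust_level: 0
    source:
      repo: HuggingFaceFW/fineweb-edu   # HF repo used by --hf-files / --hf-subdir / --list-shards
      paths: []                         # local parquet shards, used when no --hf-* flags are given
    executor:
      num_workers: 4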
"""

from __future__ import annotations

import sys
from pathlib import Path

import click

# Add src to path for development mode
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))

from grandline.dedup_store import DedupStore
from grandline.io import download_hf_parquet_files, list_hf_parquet_shards
from grandline.pipelines.code import build_code_pipeline
from grandline.pipelines.curated_web import build_curated_web_pipeline
from grandline.pipelines.math import build_math_pipeline
from grandline.pipelines.papers import build_papers_pipeline
from grandline.pipelines.synthetic import build_synthetic_pipeline
from grandline.runtime import Runtime, RuntimeConfig
from grandline.util.config import load_dataset_config, validate_dataset_config
from grandline.util.logging import get_logger
from grandline.util.paths import ProjectPaths

logger = get_logger("run_dataset")

# Pipeline type → builder function
PIPELINE_BUILDERS = {
    "curated_web": build_curated_web_pipeline,
    "code": build_code_pipeline,
    "math": build_math_pipeline,
    "papers": build_papers_pipeline,
    "synthetic": build_synthetic_pipeline,
}


@click.command()
@click.option(
    "--config", "config_path", required=True, help="Path to dataset config YAML."
)
@click.option(
    "--global-config",
    default="configs/global.yaml",
    help="Path to global config YAML.",
)
@click.option("--executor", default=None, help="Path to executor config YAML.")
@click.option("--base-dir", default=".", help="Project base directory.")
@click.option("--dry-run", is_flag=True, help="Only show what would be done.")
@click.option("--no-resume", is_flag=True, help="Reprocess all shards from scratch.")
@click.option(
    "--hf-files",
    multiple=True,
    help="Specific parquet file paths within the HF repo to download and process.",
)
@click.option(
    "--hf-subdir",
    default=None,
    help="HF repo subdirectory to discover shards from (e.g. 'data/CC-MAIN-2024-10').",
)
@click.option(
    "--max-shards",
    default=None,
    type=int,
    help="Maximum number of shards to download from --hf-subdir.",
)
@click.option(
    "--list-shards",
    is_flag=True,
    help="List available shards in the HF repo and exit (no processing).",
)
@click.argument("overrides", nargs=-1)
def main(
    config_path: str,
    global_config: str,
    executor: str | None,
    base_dir: str,
    dry_run: bool,
    no_resume: bool,
    hf_files: tuple[str, ...],
    hf_subdir: str | None,
    max_shards: int | None,
    list_shards: bool,
    overrides: tuple[str, ...],
) -> None:
    """Process a dataset through the GrandLine pipeline."""
    # Setup paths
    paths = ProjectPaths(base_dir)
    paths.ensure_all()

    # Load config
    global_path = Path(global_config)
    global_cfg = str(global_path) if global_path.exists() else None

    config = load_dataset_config(
        config_path=config_path,
        global_config_path=global_cfg,
        executor_config_path=executor,
        overrides=list(overrides),
    )

    # Validate
    errors = validate_dataset_config(config)
    if errors:
        for err in errors:
            logger.error(f"Config error: {err}")
        sys.exit(1)

    dataset_name = config["name"]
    pipeline_type = config.get("pipeline_type", "curated_web")
    source_cfg = config.get("source", {})
    repo_id = source_cfg.get("repo")

    # Handle --list-shards: just show what's available and exit
    if list_shards:
        if not repo_id:
            logger.error("Cannot list shards: no source.repo in config")
            sys.exit(1)
        subdir = hf_subdir or "data"
        shards = list_hf_parquet_shards(repo_id, subdir=subdir)
        print(f"\nAvailable shards in {repo_id}/{subdir}:")
        print(f"{'โ”€' * 60}")
        for s in shards:
            if s.get("is_dir"):
                print(f"  ๐Ÿ“ {s['path']}/")
            else:
                size_gb = s["size_bytes"] / 1e9
                print(f"  ๐Ÿ“„ {s['path']} ({size_gb:.2f} GB)")
        print(f"\nTotal: {len(shards)} entries")
        return

    logger.info(f"Dataset: {dataset_name}")
    logger.info(f"Pipeline type: {pipeline_type}")
    logger.info(f"Trust level: {config.get('trust_level', 0)}")

    # Resolve input paths
    input_paths: list[str] = []

    if hf_files or hf_subdir:
        # Download specific HF parquet files
        if not repo_id:
            logger.error("Cannot download HF files: no source.repo in config")
            sys.exit(1)

        if hf_files:
            logger.info(f"Downloading {len(hf_files)} specific shard(s) from {repo_id}...")
            local_paths = download_hf_parquet_files(
                repo_id=repo_id,
                file_patterns=list(hf_files),
            )
        else:
            logger.info(
                f"Discovering shards in {repo_id}/{hf_subdir}"
                f"{f' (max {max_shards})' if max_shards else ''}..."
            )
            local_paths = download_hf_parquet_files(
                repo_id=repo_id,
                subdir=hf_subdir,
                max_files=max_shards,
            )

        input_paths = local_paths
        logger.info(f"Downloaded {len(input_paths)} shard(s)")
    else:
        # Use local paths from config
        input_paths = source_cfg.get("paths", [])
        if isinstance(input_paths, str):
            input_paths = [input_paths]

    if not input_paths:
        logger.error(
            "No input paths. Use --hf-files, --hf-subdir, or set source.paths in config."
        )
        sys.exit(1)

    # Build pipeline
    if pipeline_type not in PIPELINE_BUILDERS:
        logger.error(
            f"Unknown pipeline type: {pipeline_type}. "
            f"Available: {list(PIPELINE_BUILDERS.keys())}"
        )
        sys.exit(1)

    # Initialize dedup store
    dedup_store = DedupStore(paths.dedup_db_path)

    try:
        builder = PIPELINE_BUILDERS[pipeline_type]
        pipeline = builder(config, dedup_store)

        logger.info(f"Pipeline: {pipeline}")
        logger.info(f"Pipeline fingerprint: {pipeline.fingerprint[:16]}...")

        # Build runtime config
        rt_config = RuntimeConfig(
            output_dir=str(paths.dataset_output_dir(dataset_name)),
            state_dir=str(paths.state_dir / dataset_name),
            num_workers=config.get("executor", {}).get("num_workers", 1),
            resume=not no_resume,
            dry_run=dry_run,
        )

        # Run
        runtime = Runtime(config=rt_config, pipeline=pipeline)
        manifests = runtime.run(input_paths)

        # Summary
        if manifests:
            total_seqs = sum(m.num_output_sequences for m in manifests)
            total_time = sum(m.processing_time_seconds for m in manifests)
            logger.info(
                f"Done: {len(manifests)} shards, "
                f"{total_seqs} sequences, "
                f"{total_time:.1f}s total"
            )
        elif not dry_run:
            logger.info("No shards to process (all completed or none found).")

    finally:
        dedup_store.close()


if __name__ == "__main__":
    main()