"""Modal app for distributed dataset generation.""" import modal app = modal.App("gazet-dataset") VOLUME_MOUNT = "/data" INTERMEDIATE_MOUNT = "/intermediate" volume = modal.Volume.from_name("gazet-data", create_if_missing=True) intermediate_volume = modal.Volume.from_name( "gazet-intermediate", create_if_missing=True ) image = ( modal.Image.debian_slim(python_version="3.12") .pip_install( "duckdb>=1.4.4", "dspy>=3.1.3", "fastapi>=0.100", "pandas>=2.2", "pydantic>=2.0", "pyarrow>=17.0.0", "pyyaml>=6.0", ) .env( { "GAZET_DATA_DIR": VOLUME_MOUNT, "PYTHONPATH": "/root", # Spatial self-joins are much more stable with conservative # DuckDB settings inside Modal containers. "GAZET_DUCKDB_THREADS": "1", "GAZET_DUCKDB_MEMORY_LIMIT": "20GB", } ) .add_local_dir("src/gazet", "/root/gazet") .add_local_dir("dataset", "/root/dataset") ) @app.function( image=image, volumes={VOLUME_MOUNT: volume, INTERMEDIATE_MOUNT: intermediate_volume}, timeout=300, cpu=2, memory=4096, ) def build_inventory_remote(): """Build entity inventory from parquet files on the volume.""" from pathlib import Path from dataset.scripts.build_inventory import build_inventory_to_dir result = build_inventory_to_dir(Path(INTERMEDIATE_MOUNT)) intermediate_volume.commit() return result @app.function( image=image, volumes={VOLUME_MOUNT: volume, INTERMEDIATE_MOUNT: intermediate_volume}, timeout=7200, cpu=2, memory=65536, ) def build_relation_remote(relation_type: str, countries: list, limit: int): """Compute one relation type and save to intermediate volume.""" from pathlib import Path from dataset.scripts.build_relations import compute_single_relation count = compute_single_relation( relation_type=relation_type, countries=countries, limit=limit, output_dir=Path(INTERMEDIATE_MOUNT), ) intermediate_volume.commit() return {"relation_type": relation_type, "count": count} @app.function( image=image, volumes={VOLUME_MOUNT: volume, INTERMEDIATE_MOUNT: intermediate_volume}, timeout=7200, cpu=2, memory=4096, ) def generate_batch_remote(work_items: list) -> list: """Process a batch of work items on a Modal container.""" from dataset.scripts.generate_samples import generate_batch_core results = generate_batch_core( work_items=work_items, intermediate_dir=INTERMEDIATE_MOUNT, ) print(f"Batch complete: {sum(1 for r in results if r['sample'])} success / " f"{sum(1 for r in results if not r['sample'])} failed out of {len(work_items)}") return results @app.local_entrypoint() def run_pipeline( config_path: str = "dataset/config.yaml", num_containers: int = 0, skip_inventory: bool = False, skip_relations: bool = False, fresh: bool = False, ): """Run the full distributed pipeline.""" import yaml from pathlib import Path config = yaml.safe_load(Path(config_path).read_text()) countries = config["countries"] sample_targets = config["sample_targets"] modal_cfg = config.get("modal", {}) n_containers = num_containers or modal_cfg.get("num_containers", 50) retry_multiplier = config["generation"]["retry_multiplier"] print(f"Countries: {countries}") print(f"Sample targets: {sample_targets}") print(f"Containers: {n_containers}") if not skip_inventory: print("Building inventory...") result = build_inventory_remote.remote() print(f" Inventory: {result}") if not skip_relations: print("Building relations...") from dataset.scripts.cli import calculate_relation_limits relation_needs = calculate_relation_limits(config) # Global containment-style relations are the most expensive and don't # need extremely large anchor tables to support sample generation. if countries == ["all"]: for rel_type, cap in { "containment": 12000, "coastal_containment": 8000, "landlocked_containment": 8000, }.items(): if rel_type in relation_needs: relation_needs[rel_type] = min(relation_needs[rel_type], cap) # Spatial relation builds are the most crash-prone part of the Modal # pipeline. Run them sequentially with conservative DuckDB settings # rather than fanning out several large native spatial joins at once. # common_neighbor still runs after adjacency because it depends on the # adjacency parquet being committed first. ordered_relations = [ rel_type for rel_type in ( "adjacency", "containment", "intersection", "cross_source", "coastal_containment", "landlocked_containment", "common_neighbor", ) if rel_type in relation_needs ] for rel_type in ordered_relations: limit = max(relation_needs[rel_type], 500) print(f" building {rel_type} (limit={limit})...") result = build_relation_remote.remote(rel_type, countries, limit) print(f" {rel_type}: {result['count']} pairs") print(f"Generating samples across {n_containers} containers...") import json from dataset.scripts.generate_samples import prepare_work_items output_dir = Path("dataset/output") output_dir.mkdir(exist_ok=True, parents=True) output_file = output_dir / "dataset_raw.jsonl" existing_samples = [] sample_counter = 1 if not fresh and output_file.exists(): with open(output_file) as f: for line in f: if line.strip(): existing_samples.append(json.loads(line)) if existing_samples: max_id = max( int(s["id"].split("_")[1]) for s in existing_samples if s["id"].startswith("sample_") ) sample_counter = max_id + 1 print(f" Appending to {len(existing_samples)} existing samples") work_items = prepare_work_items( target_counts=sample_targets, retry_multiplier=retry_multiplier, start_counter=sample_counter, intermediate_dir_str="", ) total_work = len(work_items) print(f" Total work items: {total_work}") batch_size = max(1, (total_work + n_containers - 1) // n_containers) batches = [ work_items[i : i + batch_size] for i in range(0, total_work, batch_size) ] print(f" Batches: {len(batches)} x ~{batch_size} items") new_sample_count = 0 failed_batches = 0 family_progress = {} write_mode = "w" if fresh else "a" fout = open(output_file, write_mode) try: for batch_results in generate_batch_remote.map( batches, return_exceptions=True ): if isinstance(batch_results, Exception): failed_batches += 1 print(f" Batch failed: {batch_results}") continue batch_samples = [] for r in batch_results: fam = r["family"] if fam not in family_progress: family_progress[fam] = {"success": 0, "failed": 0} if r["sample"]: batch_samples.append(r["sample"]) family_progress[fam]["success"] += 1 else: family_progress[fam]["failed"] += 1 for sample in batch_samples: fout.write(json.dumps(sample) + "\n") fout.flush() new_sample_count += len(batch_samples) done = sum(p["success"] + p["failed"] for p in family_progress.values()) print(f" Progress: {done}/{total_work} items | {new_sample_count} saved | {failed_batches} batch errors") except Exception as e: print(f" Map interrupted: {e}") finally: fout.close() print(f"\nResults by family:") for fam in sorted(family_progress.keys()): s = family_progress[fam]["success"] f = family_progress[fam]["failed"] total = s + f rate = (s / total * 100) if total > 0 else 0 target = sample_targets.get(fam, 0) print( f" {fam:20s}: {s:4d} success / {f:4d} failed " f"({rate:5.1f}%, target: {target})" ) total_samples = len(existing_samples) + new_sample_count status = "COMPLETE" if failed_batches == 0 else "PARTIAL" print(f"\nGeneration {status}: {new_sample_count} new, {total_samples} total") if failed_batches: print(f" Failed batches: {failed_batches}/{len(batches)}") print(f" Output: {output_file}") @app.local_entrypoint() def upload_data(data_dir: str = "data"): """Upload only normalized parquet datasets to the Modal volume. The runtime config prefers these normalized copies, so there is no need to upload the original raw source trees to Modal. """ from pathlib import Path data_path = Path(data_dir) if not data_path.exists(): print(f"Error: {data_path} does not exist") return upload_specs = [ ( data_path / "overture_normalized" / "divisions_area", "/overture_normalized/divisions_area", True, ), ( data_path / "natural_earth_normalized" / "ne_geography.parquet", "/natural_earth_normalized/ne_geography.parquet", False, ), ] missing = [] files_to_upload: list[tuple[Path, str]] = [] total_size = 0 for local_path, remote_path, is_dir in upload_specs: if not local_path.exists(): missing.append(local_path) continue if is_dir: parquet_files = sorted(local_path.glob("*.parquet")) if not parquet_files: missing.append(local_path) continue for parquet_file in parquet_files: rel_name = parquet_file.name dest = f"{remote_path}/{rel_name}" files_to_upload.append((parquet_file, dest)) total_size += parquet_file.stat().st_size else: files_to_upload.append((local_path, remote_path)) total_size += local_path.stat().st_size if missing: print("Error: missing normalized dataset paths:") for path in missing: print(f" {path}") print("Run dataset/scripts/normalize_geodata.py first.") return print("Uploading normalized datasets to Modal volume 'gazet-data'...") for local_path, remote_path in files_to_upload: print(f" {local_path.relative_to(data_path)} -> {remote_path} " f"({local_path.stat().st_size / (1024 * 1024):.1f} MB)") print(f" {len(files_to_upload)} files, {total_size / (1024 * 1024):.1f} MB") vol = modal.Volume.from_name("gazet-data", create_if_missing=True) with vol.batch_upload() as batch: for local_path, remote_path in files_to_upload: batch.put_file(str(local_path), remote_path) print("Upload complete")