gazet / dataset /modal_app.py
srmsoumya's picture
fix: reduce templates to country, region & county for mvp
3ba9557
"""Modal app for distributed dataset generation."""
import modal
app = modal.App("gazet-dataset")
VOLUME_MOUNT = "/data"
INTERMEDIATE_MOUNT = "/intermediate"
volume = modal.Volume.from_name("gazet-data", create_if_missing=True)
intermediate_volume = modal.Volume.from_name(
"gazet-intermediate", create_if_missing=True
)
image = (
modal.Image.debian_slim(python_version="3.12")
.pip_install(
"duckdb>=1.4.4",
"dspy>=3.1.3",
"fastapi>=0.100",
"pandas>=2.2",
"pydantic>=2.0",
"pyarrow>=17.0.0",
"pyyaml>=6.0",
)
.env(
{
"GAZET_DATA_DIR": VOLUME_MOUNT,
"PYTHONPATH": "/root",
# Spatial self-joins are much more stable with conservative
# DuckDB settings inside Modal containers.
"GAZET_DUCKDB_THREADS": "1",
"GAZET_DUCKDB_MEMORY_LIMIT": "20GB",
}
)
.add_local_dir("src/gazet", "/root/gazet")
.add_local_dir("dataset", "/root/dataset")
)
@app.function(
image=image,
volumes={VOLUME_MOUNT: volume, INTERMEDIATE_MOUNT: intermediate_volume},
timeout=300,
cpu=2,
memory=4096,
)
def build_inventory_remote():
"""Build entity inventory from parquet files on the volume."""
from pathlib import Path
from dataset.scripts.build_inventory import build_inventory_to_dir
result = build_inventory_to_dir(Path(INTERMEDIATE_MOUNT))
intermediate_volume.commit()
return result
@app.function(
image=image,
volumes={VOLUME_MOUNT: volume, INTERMEDIATE_MOUNT: intermediate_volume},
timeout=7200,
cpu=2,
memory=65536,
)
def build_relation_remote(relation_type: str, countries: list, limit: int):
"""Compute one relation type and save to intermediate volume."""
from pathlib import Path
from dataset.scripts.build_relations import compute_single_relation
count = compute_single_relation(
relation_type=relation_type,
countries=countries,
limit=limit,
output_dir=Path(INTERMEDIATE_MOUNT),
)
intermediate_volume.commit()
return {"relation_type": relation_type, "count": count}
@app.function(
image=image,
volumes={VOLUME_MOUNT: volume, INTERMEDIATE_MOUNT: intermediate_volume},
timeout=7200,
cpu=2,
memory=4096,
)
def generate_batch_remote(work_items: list) -> list:
"""Process a batch of work items on a Modal container."""
from dataset.scripts.generate_samples import generate_batch_core
results = generate_batch_core(
work_items=work_items,
intermediate_dir=INTERMEDIATE_MOUNT,
)
print(f"Batch complete: {sum(1 for r in results if r['sample'])} success / "
f"{sum(1 for r in results if not r['sample'])} failed out of {len(work_items)}")
return results
@app.local_entrypoint()
def run_pipeline(
config_path: str = "dataset/config.yaml",
num_containers: int = 0,
skip_inventory: bool = False,
skip_relations: bool = False,
fresh: bool = False,
):
"""Run the full distributed pipeline."""
import yaml
from pathlib import Path
config = yaml.safe_load(Path(config_path).read_text())
countries = config["countries"]
sample_targets = config["sample_targets"]
modal_cfg = config.get("modal", {})
n_containers = num_containers or modal_cfg.get("num_containers", 50)
retry_multiplier = config["generation"]["retry_multiplier"]
print(f"Countries: {countries}")
print(f"Sample targets: {sample_targets}")
print(f"Containers: {n_containers}")
if not skip_inventory:
print("Building inventory...")
result = build_inventory_remote.remote()
print(f" Inventory: {result}")
if not skip_relations:
print("Building relations...")
from dataset.scripts.cli import calculate_relation_limits
relation_needs = calculate_relation_limits(config)
# Global containment-style relations are the most expensive and don't
# need extremely large anchor tables to support sample generation.
if countries == ["all"]:
for rel_type, cap in {
"containment": 12000,
"coastal_containment": 8000,
"landlocked_containment": 8000,
}.items():
if rel_type in relation_needs:
relation_needs[rel_type] = min(relation_needs[rel_type], cap)
# Spatial relation builds are the most crash-prone part of the Modal
# pipeline. Run them sequentially with conservative DuckDB settings
# rather than fanning out several large native spatial joins at once.
# common_neighbor still runs after adjacency because it depends on the
# adjacency parquet being committed first.
ordered_relations = [
rel_type
for rel_type in (
"adjacency",
"containment",
"intersection",
"cross_source",
"coastal_containment",
"landlocked_containment",
"common_neighbor",
)
if rel_type in relation_needs
]
for rel_type in ordered_relations:
limit = max(relation_needs[rel_type], 500)
print(f" building {rel_type} (limit={limit})...")
result = build_relation_remote.remote(rel_type, countries, limit)
print(f" {rel_type}: {result['count']} pairs")
print(f"Generating samples across {n_containers} containers...")
import json
from dataset.scripts.generate_samples import prepare_work_items
output_dir = Path("dataset/output")
output_dir.mkdir(exist_ok=True, parents=True)
output_file = output_dir / "dataset_raw.jsonl"
existing_samples = []
sample_counter = 1
if not fresh and output_file.exists():
with open(output_file) as f:
for line in f:
if line.strip():
existing_samples.append(json.loads(line))
if existing_samples:
max_id = max(
int(s["id"].split("_")[1])
for s in existing_samples
if s["id"].startswith("sample_")
)
sample_counter = max_id + 1
print(f" Appending to {len(existing_samples)} existing samples")
work_items = prepare_work_items(
target_counts=sample_targets,
retry_multiplier=retry_multiplier,
start_counter=sample_counter,
intermediate_dir_str="",
)
total_work = len(work_items)
print(f" Total work items: {total_work}")
batch_size = max(1, (total_work + n_containers - 1) // n_containers)
batches = [
work_items[i : i + batch_size]
for i in range(0, total_work, batch_size)
]
print(f" Batches: {len(batches)} x ~{batch_size} items")
new_sample_count = 0
failed_batches = 0
family_progress = {}
write_mode = "w" if fresh else "a"
fout = open(output_file, write_mode)
try:
for batch_results in generate_batch_remote.map(
batches, return_exceptions=True
):
if isinstance(batch_results, Exception):
failed_batches += 1
print(f" Batch failed: {batch_results}")
continue
batch_samples = []
for r in batch_results:
fam = r["family"]
if fam not in family_progress:
family_progress[fam] = {"success": 0, "failed": 0}
if r["sample"]:
batch_samples.append(r["sample"])
family_progress[fam]["success"] += 1
else:
family_progress[fam]["failed"] += 1
for sample in batch_samples:
fout.write(json.dumps(sample) + "\n")
fout.flush()
new_sample_count += len(batch_samples)
done = sum(p["success"] + p["failed"] for p in family_progress.values())
print(f" Progress: {done}/{total_work} items | {new_sample_count} saved | {failed_batches} batch errors")
except Exception as e:
print(f" Map interrupted: {e}")
finally:
fout.close()
print(f"\nResults by family:")
for fam in sorted(family_progress.keys()):
s = family_progress[fam]["success"]
f = family_progress[fam]["failed"]
total = s + f
rate = (s / total * 100) if total > 0 else 0
target = sample_targets.get(fam, 0)
print(
f" {fam:20s}: {s:4d} success / {f:4d} failed "
f"({rate:5.1f}%, target: {target})"
)
total_samples = len(existing_samples) + new_sample_count
status = "COMPLETE" if failed_batches == 0 else "PARTIAL"
print(f"\nGeneration {status}: {new_sample_count} new, {total_samples} total")
if failed_batches:
print(f" Failed batches: {failed_batches}/{len(batches)}")
print(f" Output: {output_file}")
@app.local_entrypoint()
def upload_data(data_dir: str = "data"):
"""Upload only normalized parquet datasets to the Modal volume.
The runtime config prefers these normalized copies, so there is no need to
upload the original raw source trees to Modal.
"""
from pathlib import Path
data_path = Path(data_dir)
if not data_path.exists():
print(f"Error: {data_path} does not exist")
return
upload_specs = [
(
data_path / "overture_normalized" / "divisions_area",
"/overture_normalized/divisions_area",
True,
),
(
data_path / "natural_earth_normalized" / "ne_geography.parquet",
"/natural_earth_normalized/ne_geography.parquet",
False,
),
]
missing = []
files_to_upload: list[tuple[Path, str]] = []
total_size = 0
for local_path, remote_path, is_dir in upload_specs:
if not local_path.exists():
missing.append(local_path)
continue
if is_dir:
parquet_files = sorted(local_path.glob("*.parquet"))
if not parquet_files:
missing.append(local_path)
continue
for parquet_file in parquet_files:
rel_name = parquet_file.name
dest = f"{remote_path}/{rel_name}"
files_to_upload.append((parquet_file, dest))
total_size += parquet_file.stat().st_size
else:
files_to_upload.append((local_path, remote_path))
total_size += local_path.stat().st_size
if missing:
print("Error: missing normalized dataset paths:")
for path in missing:
print(f" {path}")
print("Run dataset/scripts/normalize_geodata.py first.")
return
print("Uploading normalized datasets to Modal volume 'gazet-data'...")
for local_path, remote_path in files_to_upload:
print(f" {local_path.relative_to(data_path)} -> {remote_path} "
f"({local_path.stat().st_size / (1024 * 1024):.1f} MB)")
print(f" {len(files_to_upload)} files, {total_size / (1024 * 1024):.1f} MB")
vol = modal.Volume.from_name("gazet-data", create_if_missing=True)
with vol.batch_upload() as batch:
for local_path, remote_path in files_to_upload:
batch.put_file(str(local_path), remote_path)
print("Upload complete")