""" Data collection orchestrator. Runs all online collectors and saves the combined raw dataset plus the instruction-following JSONL used for training. Usage ----- python scripts/collect_data.py # all sources, default limits python scripts/collect_data.py --sources reliefweb usgs gdacs python scripts/collect_data.py --max-per-source 2000 """ from __future__ import annotations import argparse import json import logging import time from pathlib import Path logger = logging.getLogger(__name__) DEFAULT_LIMITS: dict[str, int] = { "reliefweb": 5000, "usgs": 20000, "gdacs": 2000, "noaa": 5000, "openfema": 20000, "who": 1000, } def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Collect online disaster data") parser.add_argument( "--sources", nargs="+", default=list(DEFAULT_LIMITS.keys()), choices=list(DEFAULT_LIMITS.keys()), help="Data sources to collect from", ) parser.add_argument("--max-per-source", type=int, default=None, help="Override max records per source") parser.add_argument("--raw-dir", default="data/raw", help="Directory for raw records") parser.add_argument("--processed-dir", default="data/processed", help="Directory for processed JSONL") return parser.parse_args() def save_records(records: list, path: Path) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8") as handle: for record in records: handle.write(json.dumps(record.__dict__, ensure_ascii=False) + "\n") logger.info("Saved %d records to %s", len(records), path) def collect_source(source: str, max_records: int) -> list: if source == "reliefweb": from worlddisasterlm.data.collectors.reliefweb import collect_reliefweb return collect_reliefweb(max_records=max_records) if source == "usgs": from worlddisasterlm.data.collectors.usgs import collect_usgs return collect_usgs(max_records=max_records) if source == "gdacs": from worlddisasterlm.data.collectors.gdacs import collect_gdacs return collect_gdacs(max_records=max_records) if source == "noaa": from worlddisasterlm.data.collectors.noaa import collect_noaa return collect_noaa(max_records=max_records) if source == "openfema": from worlddisasterlm.data.collectors.openfema import collect_openfema return collect_openfema(max_records=max_records) if source == "who": from worlddisasterlm.data.collectors.who_rss import collect_who return collect_who(max_records=max_records) raise ValueError(f"Unknown source: {source}") def main() -> None: logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s") args = parse_args() from worlddisasterlm.data.etl import DisasterETL from worlddisasterlm.data.qa_generator import generate_qa_pairs from worlddisasterlm.data.scenario_builder import build_all_scenarios from worlddisasterlm.data.processors import save_instruction_dataset raw_dir = Path(args.raw_dir) processed_dir = Path(args.processed_dir) processed_dir.mkdir(parents=True, exist_ok=True) all_records = [] for source in args.sources: limit = args.max_per_source or DEFAULT_LIMITS.get(source, 5000) logger.info("Collecting from %s (max=%d) …", source, limit) try: records = collect_source(source, limit) save_records(records, raw_dir / f"{source}.jsonl") all_records.extend(records) except Exception as exc: logger.error("Failed to collect from %s: %s", source, exc) time.sleep(1) # polite delay between sources etl = DisasterETL() all_records = etl.deduplicate(all_records) all_records = etl.normalize(all_records) logger.info("Total normalized records after dedup: %d", len(all_records)) # Generate instruction QA pairs (8x amplification) logger.info("Generating instruction QA pairs …") qa_samples = generate_qa_pairs(all_records) # Add compound + multilingual scenarios extra_samples = build_all_scenarios() qa_samples.extend(extra_samples) logger.info("Total instruction samples: %d", len(qa_samples)) output_path = processed_dir / "instruction_dataset.jsonl" save_instruction_dataset(qa_samples, str(output_path)) logger.info("Instruction dataset saved: %s", output_path) if __name__ == "__main__": main()