WorldDisasterLM-8B / scripts /collect_data.py
drdeveloper88's picture
Upload WorldDisasterLM-8B source code: FastAPI backend, training pipeline, 11-language support
495526b
Raw
History Blame Contribute Delete
4.56 kB
"""
Data collection orchestrator.
Runs all online collectors and saves the combined raw dataset plus
the instruction-following JSONL used for training.
Usage
-----
python scripts/collect_data.py # all sources, default limits
python scripts/collect_data.py --sources reliefweb usgs gdacs
python scripts/collect_data.py --max-per-source 2000
"""
from __future__ import annotations
import argparse
import json
import logging
import time
from pathlib import Path
logger = logging.getLogger(__name__)
DEFAULT_LIMITS: dict[str, int] = {
"reliefweb": 5000,
"usgs": 20000,
"gdacs": 2000,
"noaa": 5000,
"openfema": 20000,
"who": 1000,
}
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Collect online disaster data")
parser.add_argument(
"--sources",
nargs="+",
default=list(DEFAULT_LIMITS.keys()),
choices=list(DEFAULT_LIMITS.keys()),
help="Data sources to collect from",
)
parser.add_argument("--max-per-source", type=int, default=None, help="Override max records per source")
parser.add_argument("--raw-dir", default="data/raw", help="Directory for raw records")
parser.add_argument("--processed-dir", default="data/processed", help="Directory for processed JSONL")
return parser.parse_args()
def save_records(records: list, path: Path) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as handle:
for record in records:
handle.write(json.dumps(record.__dict__, ensure_ascii=False) + "\n")
logger.info("Saved %d records to %s", len(records), path)
def collect_source(source: str, max_records: int) -> list:
if source == "reliefweb":
from worlddisasterlm.data.collectors.reliefweb import collect_reliefweb
return collect_reliefweb(max_records=max_records)
if source == "usgs":
from worlddisasterlm.data.collectors.usgs import collect_usgs
return collect_usgs(max_records=max_records)
if source == "gdacs":
from worlddisasterlm.data.collectors.gdacs import collect_gdacs
return collect_gdacs(max_records=max_records)
if source == "noaa":
from worlddisasterlm.data.collectors.noaa import collect_noaa
return collect_noaa(max_records=max_records)
if source == "openfema":
from worlddisasterlm.data.collectors.openfema import collect_openfema
return collect_openfema(max_records=max_records)
if source == "who":
from worlddisasterlm.data.collectors.who_rss import collect_who
return collect_who(max_records=max_records)
raise ValueError(f"Unknown source: {source}")
def main() -> None:
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
args = parse_args()
from worlddisasterlm.data.etl import DisasterETL
from worlddisasterlm.data.qa_generator import generate_qa_pairs
from worlddisasterlm.data.scenario_builder import build_all_scenarios
from worlddisasterlm.data.processors import save_instruction_dataset
raw_dir = Path(args.raw_dir)
processed_dir = Path(args.processed_dir)
processed_dir.mkdir(parents=True, exist_ok=True)
all_records = []
for source in args.sources:
limit = args.max_per_source or DEFAULT_LIMITS.get(source, 5000)
logger.info("Collecting from %s (max=%d) …", source, limit)
try:
records = collect_source(source, limit)
save_records(records, raw_dir / f"{source}.jsonl")
all_records.extend(records)
except Exception as exc:
logger.error("Failed to collect from %s: %s", source, exc)
time.sleep(1) # polite delay between sources
etl = DisasterETL()
all_records = etl.deduplicate(all_records)
all_records = etl.normalize(all_records)
logger.info("Total normalized records after dedup: %d", len(all_records))
# Generate instruction QA pairs (8x amplification)
logger.info("Generating instruction QA pairs …")
qa_samples = generate_qa_pairs(all_records)
# Add compound + multilingual scenarios
extra_samples = build_all_scenarios()
qa_samples.extend(extra_samples)
logger.info("Total instruction samples: %d", len(qa_samples))
output_path = processed_dir / "instruction_dataset.jsonl"
save_instruction_dataset(qa_samples, str(output_path))
logger.info("Instruction dataset saved: %s", output_path)
if __name__ == "__main__":
main()