# WorldDisasterLM-8B / dataset_builder.py
# Hosting-page header captured with this file (not part of the program):
#   drdeveloper88 - "Upload WorldDisasterLM-8B source code: FastAPI backend,
#   training pipeline, 11-language support" (commit 495526b, 3.41 kB)
"""dataset_builder.py — standalone entry-point.
Collects data from all configured online sources and writes the final
instruction-following JSONL dataset ready for training.
For full control over which sources and limits to use, prefer:
python scripts/collect_data.py --sources reliefweb usgs gdacs --max-per-source 5000
"""
from __future__ import annotations
import logging
from pathlib import Path
# Logging is configured at import time: INFO level, with each line rendered as
# "timestamp | level | message".
logging.basicConfig(
    format="%(asctime)s | %(levelname)s | %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Per-source record caps. main() iterates this mapping in insertion order and
# passes each value as ``max_records`` to the collector for that source.
DEFAULT_LIMITS: dict[str, int] = {
    "reliefweb": 5000,
    "usgs": 20000,
    "gdacs": 2000,
    "noaa": 5000,
    "openfema": 20000,
    "who": 1000,
}
# Maps each source key used in DEFAULT_LIMITS to the (module, function) that
# collects it. Dotted names are stored instead of imported callables so that
# every collector module is imported lazily, inside the per-source ``try``:
# a collector whose import fails only disables that one source.
_COLLECTORS: dict[str, tuple[str, str]] = {
    "reliefweb": ("worlddisasterlm.data.collectors.reliefweb", "collect_reliefweb"),
    "usgs": ("worlddisasterlm.data.collectors.usgs", "collect_usgs"),
    "gdacs": ("worlddisasterlm.data.collectors.gdacs", "collect_gdacs"),
    "noaa": ("worlddisasterlm.data.collectors.noaa", "collect_noaa"),
    "openfema": ("worlddisasterlm.data.collectors.openfema", "collect_openfema"),
    "who": ("worlddisasterlm.data.collectors.who_rss", "collect_who"),
}


def main() -> None:
    """Build the instruction dataset and write it to disk.

    Steps:
      1. For every source in ``DEFAULT_LIMITS``, import its collector and call
         it with ``max_records`` set to the configured limit. Any exception
         raised while importing or running a collector is logged as a warning
         and the remaining sources are still processed.
      2. If no records were collected at all, fall back to
         ``DisasterETL.collect_records()`` (the offline stub data).
      3. Deduplicate, then normalize, the records.
      4. Generate QA pairs from the records and append the built scenarios.
      5. Save the samples to ``data/processed/instruction_dataset.jsonl``
         (relative to the current working directory).
    """
    import importlib

    from worlddisasterlm.data.etl import DisasterETL
    from worlddisasterlm.data.qa_generator import generate_qa_pairs
    from worlddisasterlm.data.scenario_builder import build_all_scenarios
    from worlddisasterlm.data.processors import save_instruction_dataset

    # Try live collection; fall back to stub if network is unavailable
    all_records: list = []
    for source, limit in DEFAULT_LIMITS.items():
        target = _COLLECTORS.get(source)
        if target is None:
            # A limit was configured for a source nobody knows how to collect;
            # say so instead of silently reporting it as "collected".
            logger.warning("No collector registered for source %s. Skipping.", source)
            continue
        module_name, func_name = target
        try:
            collect = getattr(importlib.import_module(module_name), func_name)
            all_records.extend(collect(max_records=limit))
        except Exception as exc:
            # Deliberately broad: collection is best-effort per source.
            logger.warning("Source %s failed (%s). Continuing with remaining sources.", source, exc)
        else:
            logger.info("%-12s collected %d total records so far", source, len(all_records))

    etl = DisasterETL()
    if not all_records:
        logger.warning("No online records collected. Using stub data for offline testing.")
        all_records = etl.collect_records()
    # Both the live and the stub path go through the same pipeline:
    # deduplicate first, then normalize.
    all_records = etl.normalize(etl.deduplicate(all_records))
    logger.info("Total normalized records: %d", len(all_records))

    qa_samples = generate_qa_pairs(all_records)
    qa_samples.extend(build_all_scenarios())
    logger.info("Total instruction samples: %d", len(qa_samples))

    output_path = Path("data/processed/instruction_dataset.jsonl")
    # save_instruction_dataset is not visible here; creating the directory
    # first is harmless if it already does so and avoids a failure on a fresh
    # checkout if it does not.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    save_instruction_dataset(qa_samples, str(output_path))
    logger.info("Dataset saved: %s", output_path)
# Run the dataset build only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()