File size: 4,561 Bytes
495526b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""
Data collection orchestrator.

Runs all online collectors and saves the combined raw dataset plus
the instruction-following JSONL used for training.

Usage
-----
  python scripts/collect_data.py                        # all sources, default limits
  python scripts/collect_data.py --sources reliefweb usgs gdacs
  python scripts/collect_data.py --max-per-source 2000
"""

from __future__ import annotations

import argparse
import json
import logging
import time
from pathlib import Path

logger = logging.getLogger(__name__)

# Per-source cap on the number of records requested from each collector.
# The key order is also the default ``--sources`` order, and the keys are the
# only values ``--sources`` accepts (see ``parse_args``).
DEFAULT_LIMITS: dict[str, int] = dict(
    reliefweb=5_000,
    usgs=20_000,
    gdacs=2_000,
    noaa=5_000,
    openfema=20_000,
    who=1_000,
)


def _positive_int(value: str) -> int:
    """Parse a command-line value as a strictly positive integer.

    Used as the ``type=`` converter for ``--max-per-source`` so that ``0`` or
    negative caps are rejected at parse time. Otherwise ``0`` would be treated
    as "unset" and negatives would be passed straight to the collectors.

    Raises:
        argparse.ArgumentTypeError: if *value* is not an integer or is <= 0.
    """
    try:
        number = int(value)
    except ValueError:
        # Same wording argparse uses for a plain ``type=int`` failure.
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if number <= 0:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {number}")
    return number


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Build the command-line parser and parse the arguments.

    Args:
        argv: Arguments to parse. ``None`` (the default) parses ``sys.argv[1:]``,
            exactly like :meth:`argparse.ArgumentParser.parse_args`.

    Returns:
        Namespace with:
        - ``sources``: list of keys from ``DEFAULT_LIMITS``.
        - ``max_per_source``: a positive int, or ``None`` when not given.
        - ``raw_dir`` and ``processed_dir``: directory path strings.
    """
    parser = argparse.ArgumentParser(description="Collect online disaster data")
    parser.add_argument(
        "--sources",
        nargs="+",
        # Separate list objects, so the returned default can never alias ``choices``.
        default=list(DEFAULT_LIMITS),
        choices=list(DEFAULT_LIMITS),
        help="Data sources to collect from",
    )
    parser.add_argument(
        "--max-per-source",
        type=_positive_int,
        default=None,
        help="Override max records per source",
    )
    parser.add_argument("--raw-dir", default="data/raw", help="Directory for raw records")
    parser.add_argument("--processed-dir", default="data/processed", help="Directory for processed JSONL")
    return parser.parse_args(argv)


def save_records(records: list, path: Path) -> None:
    """Write *records* to *path* as JSON Lines, one JSON object per line.

    Each record is first turned into a dict:
    - mappings (e.g. plain dicts, which have no ``__dict__``) are copied with ``dict()``;
    - objects with a ``__dict__`` use it unchanged, as before;
    - dataclasses without a ``__dict__`` (``slots=True``) go through
      :func:`dataclasses.asdict`.

    Every line is serialized before the file is opened. A record that cannot
    be serialized therefore raises without truncating an existing file at *path*.

    Args:
        records: Collected record objects.
        path: Destination ``.jsonl`` file. Missing parent directories are created.

    Raises:
        TypeError: if a record cannot be converted to a dict, or one of its
            values is not JSON-serializable.
    """
    import dataclasses
    from collections.abc import Mapping

    def _as_dict(record: object) -> dict:
        # Convert one record to a JSON-ready dict (see the docstring for the order).
        if isinstance(record, Mapping):
            return dict(record)
        if hasattr(record, "__dict__"):
            return record.__dict__
        if dataclasses.is_dataclass(record) and not isinstance(record, type):
            return dataclasses.asdict(record)
        raise TypeError(f"Cannot serialize record of type {type(record).__name__}")

    lines = [json.dumps(_as_dict(record), ensure_ascii=False) + "\n" for record in records]
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        handle.writelines(lines)
    logger.info("Saved %d records to %s", len(records), path)


def collect_source(source: str, max_records: int) -> list:
    """Fetch up to *max_records* records from the collector named *source*.

    Only the collector module for the requested source is imported, and only
    at call time. The chosen collector is then called with
    ``max_records=max_records``.

    Raises:
        ValueError: if *source* is not one of the known collector names.
    """
    if source == "reliefweb":
        from worlddisasterlm.data.collectors.reliefweb import (
            collect_reliefweb as collector,
        )
    elif source == "usgs":
        from worlddisasterlm.data.collectors.usgs import (
            collect_usgs as collector,
        )
    elif source == "gdacs":
        from worlddisasterlm.data.collectors.gdacs import (
            collect_gdacs as collector,
        )
    elif source == "noaa":
        from worlddisasterlm.data.collectors.noaa import (
            collect_noaa as collector,
        )
    elif source == "openfema":
        from worlddisasterlm.data.collectors.openfema import (
            collect_openfema as collector,
        )
    elif source == "who":
        from worlddisasterlm.data.collectors.who_rss import (
            collect_who as collector,
        )
    else:
        raise ValueError(f"Unknown source: {source}")
    return collector(max_records=max_records)


def _collect_all(sources: list[str], max_per_source: int | None, raw_dir: Path) -> list:
    """Run each requested collector, save its raw JSONL, and return all records.

    Behavior:
    - Repeated source names are collected only once, and the first-seen order is kept.
    - A failure while collecting or saving one source is logged with its
      traceback, and that source is skipped so the rest still run.
    - Consecutive sources are separated by a one-second pause.

    Args:
        sources: Source keys to collect from.
        max_per_source: Explicit per-source cap, or ``None`` to use
            ``DEFAULT_LIMITS`` (falling back to 5000).
        raw_dir: Directory that receives one ``<source>.jsonl`` per source.

    Returns:
        Records from every source whose collection and raw save both succeeded.
    """
    all_records: list = []
    failed: list[str] = []
    for index, source in enumerate(dict.fromkeys(sources)):
        if index:
            time.sleep(1)  # polite delay between sources (not after the last one)
        # ``is not None`` rather than ``or``, so an explicit value is never taken for "unset".
        limit = max_per_source if max_per_source is not None else DEFAULT_LIMITS.get(source, 5000)
        logger.info("Collecting from %s (max=%d) …", source, limit)
        try:
            records = collect_source(source, limit)
            save_records(records, raw_dir / f"{source}.jsonl")
        except Exception as exc:
            # Best-effort per source: keep the traceback, then move on.
            logger.exception("Failed to collect from %s: %s", source, exc)
            failed.append(source)
            continue
        all_records.extend(records)
    if failed:
        logger.warning("Sources that failed: %s", ", ".join(failed))
    return all_records


def main() -> None:
    """Collect all requested sources, then build and save the instruction dataset.

    Pipeline:
    1. Collect each source and write per-source raw JSONL under ``--raw-dir``.
    2. Deduplicate and normalize with ``DisasterETL``.
    3. Generate QA pairs and append the output of ``build_all_scenarios()``.
    4. Write ``instruction_dataset.jsonl`` under ``--processed-dir``.

    Raises:
        SystemExit: with status 1 when no source yielded any records. In that
            case an existing instruction dataset is left untouched rather than
            replaced by scenario-only samples.
    """
    logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
    args = parse_args()

    from worlddisasterlm.data.etl import DisasterETL
    from worlddisasterlm.data.qa_generator import generate_qa_pairs
    from worlddisasterlm.data.scenario_builder import build_all_scenarios
    from worlddisasterlm.data.processors import save_instruction_dataset

    raw_dir = Path(args.raw_dir)
    processed_dir = Path(args.processed_dir)
    processed_dir.mkdir(parents=True, exist_ok=True)
    output_path = processed_dir / "instruction_dataset.jsonl"

    all_records = _collect_all(args.sources, args.max_per_source, raw_dir)
    if not all_records:
        # Every source failed or came back empty. Writing now would overwrite
        # a previously good dataset with scenario-only samples.
        logger.error("No records collected from any source; not writing %s", output_path)
        raise SystemExit(1)

    etl = DisasterETL()
    all_records = etl.deduplicate(all_records)
    all_records = etl.normalize(all_records)
    logger.info("Total normalized records after dedup: %d", len(all_records))

    # Generate instruction QA pairs (original note: ~8x amplification; set by qa_generator).
    # ``list()`` so the ``extend`` and ``len`` below work for any iterable return type.
    logger.info("Generating instruction QA pairs …")
    qa_samples = list(generate_qa_pairs(all_records))

    # Add compound + multilingual scenarios
    qa_samples.extend(build_all_scenarios())

    logger.info("Total instruction samples: %d", len(qa_samples))

    save_instruction_dataset(qa_samples, str(output_path))
    logger.info("Instruction dataset saved: %s", output_path)


# Run the collection pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    main()