File size: 8,084 Bytes
461f64f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
"""
Contains a simple experimentation pipeline for the QA system.

Benefits:
    - Completely **plug-and-play**: the users can easily replace models and configs without
    needing to change pipeline or other code.
    - Automates experiment tracking/versioning: facilitates experimental iteration.
    - Offers data-splitting routines that promote model generalization and objective performance measurement:
        a) the initial training set is split into 'train' and 'val' subsets which DO NOT share
        common articles, so that the 'val' set simulates performance on actual held-out articles;
        b) the initial 'dev' set can remain untouched until the very end for objective performance measurement
"""

import pandas as pd
from typing import Tuple, Dict, Optional
from pathlib import Path
import sys
from io import StringIO
from src.utils.constants import (
    EXPERIMENTS_DIR,
    DEV_DATA_PATH,
    TRAIN_DATA_PATH,
    Col,
    DEBUG_SEED,
)
from src.etl.squad_v2_loader import load_squad_v2_df, df_to_examples_map
from src.models.base_qa_model import QAModel
from src.etl.types import QAExample
from src.config.model_configs import BaseModelConfig, BertQAConfig
from src.evaluation.evaluator import Evaluator
from src.utils.experiment_snapshot import ExperimentSnapshot

DEFAULT_VAL_SET_FRACTION = 0.1


class Tee:
    """
    Fan-out file-like object that forwards every write to several streams.

    Installing an instance as ``sys.stdout`` lets experiment tracking capture
    notebook/console output while still echoing it to the terminal.
    Based on: https://stackoverflow.com/questions/616645/how-to-duplicate-sys-stdout-to-a-log-file
    """

    def __init__(self, *files):
        # Target streams; writes are forwarded in this order.
        self.files = files

    def write(self, obj):
        """Write `obj` to every registered stream, flushing each immediately."""
        for stream in self.files:
            stream.write(obj)
            stream.flush()

    def flush(self):
        """Flush every registered stream so buffered text appears right away."""
        for stream in self.files:
            stream.flush()


def run_qa_experiment(
    experiment_name: str,
    model: QAModel,
    debug_limit: Optional[int] = None,
    val_fraction: float = DEFAULT_VAL_SET_FRACTION,
) -> Tuple[ExperimentSnapshot, Path, Optional[pd.DataFrame], Optional[pd.DataFrame]]:
    """
    Basic pipeline for running a QA system experiment.

    Pipeline steps: load + split the training data, load the dev data, train
    the model, predict on dev, evaluate, print a results summary, then persist
    an ExperimentSnapshot (config, predictions, metrics, model) together with
    the captured console output as ``training_log.txt``.

    Args:
        experiment_name: identifier used when saving the experiment snapshot.
        model: a QAModel implementation; must expose ``train``/``predict`` and
            carry a ``config`` attribute that is a BaseModelConfig subtype.
        debug_limit: optional cap on the number of training ARTICLES
            (title-level, not question-level); the dev set is never capped.
        val_fraction: fraction of training articles held out as the 'val' split.

    Returns:
        (snapshot, run_dir, df_train, df_dev): the saved snapshot, the
        directory it was written to, and the ETLed train/dev DataFrames.

    Raises:
        AssertionError: if the data paths are missing, a split is empty, or
            the model's config is not a BaseModelConfig.

    To facilitate debugging:
        1. The function can limit the #training examples processed
        2. The sampled ETLed input DF is also provided as part of the function return

    Note that debug_limit is only applied to the training instances; i.e., dev set is not capped.
    """
    # TODO - use proper logging for all of this
    # Capture output to StringIO while printing to console
    log_capture = StringIO()
    original_stdout = sys.stdout
    # Every print() below goes both to the terminal and to log_capture.
    sys.stdout = Tee(sys.stdout, log_capture)

    try:
        if debug_limit is not None:
            print(f"{debug_limit} articles will be considered from training in total.")
        else:
            print("All articles from training set are considered.")
        assert TRAIN_DATA_PATH.exists(), "Unspecified train data location."
        # Note that df_val can be returned for debugging: ignored for now
        (train_examples, val_examples), (df_train, _) = _load_examples(
            path=TRAIN_DATA_PATH, debug_limit=debug_limit, split_fraction=val_fraction
        )

        assert DEV_DATA_PATH.exists(), "Unspecified dev data location."
        # do NOT split dev set -> split_fraction is explicitly set to None
        (dev_examples, _), (df_dev, _) = _load_examples(
            path=DEV_DATA_PATH, debug_limit=None, split_fraction=None
        )

        # Sanity checking for non-empty data splits
        assert len(train_examples) > 0, "train_examples is empty."
        assert len(dev_examples) > 0, "dev_examples is empty."

        # val_examples is None when no split was requested; only pass it through
        # when it exists so models without val support keep working.
        if val_examples is not None:
            model.train(train_examples, val_examples=val_examples)
        else:
            model.train(train_examples)
        predictions = model.predict(dev_examples)
        metrics = Evaluator().evaluate(predictions=predictions, examples=dev_examples)

        # Save experiment
        config = getattr(model, "config", None)
        assert isinstance(config, BaseModelConfig), "Incompatible Config type."
        snapshot = ExperimentSnapshot(
            experiment_name=experiment_name,
            config=config,
            predictions=predictions,
            metrics=metrics,
            model=model,
        )

        print("\n" + "=" * 70)
        print("FINAL DEV SET RESULTS")
        print("=" * 70)
        print(f"Exact Match (EM): {snapshot.metrics.exact_score:.2f}%")
        print(f"F1 Score: {snapshot.metrics.f1_score:.2f}%")
        print(f"Total dev examples: {snapshot.metrics.total_num_instances}")
        print("=" * 70)

        run_dir = snapshot.save(experiments_root=EXPERIMENTS_DIR)
        # Persist everything printed during the run alongside the snapshot.
        (run_dir / "training_log.txt").write_text(
            log_capture.getvalue(), encoding="utf-8"
        )
        return snapshot, run_dir, df_train, df_dev
    finally:
        # Restore stdout after running the experiment
        sys.stdout = original_stdout


def create_experiment_name(
    model_name_short: str, config: BertQAConfig, num_articles: Optional[int] = None
) -> str:
    """
    Compose a descriptive experiment identifier from the short model name and
    the number of training articles used ('ALL' when no article cap applies).
    """
    assert (
        model_name_short in config.backbone_name
    ), "Inconsistent model name used for experiment tracking Vs actual model name."
    suffix = f"{num_articles}_articles" if num_articles is not None else "ALL_articles"
    return f"{model_name_short}_{suffix}"


def _load_examples(
    path: Path, debug_limit: int | None, split_fraction: float | None
) -> Tuple[
    Tuple[Dict[str, QAExample], Dict[str, QAExample] | None],
    Tuple[pd.DataFrame, pd.DataFrame | None],
]:
    """
    Load a SQuAD v2 file and return QAExample maps plus the backing DF(s).

    Returns both a dict with QAExample objects and the associated DF for debugging.
    Both the debug_limit and the split_fraction are operating on the ARTICLE level Vs
    individual example/question level.
        - debug_limit: caps the #articles returned for debugging/easier experimentation
        - split_fraction: enables train/val splitting based on initial training data

    Returns:
        ((train_examples, val_examples_or_None), (df_train, df_val_or_None));
        the second elements are None when split_fraction is None.

    Raises:
        AssertionError: if debug_limit is outside [1, #available articles].
    """
    df = load_squad_v2_df(path)

    if debug_limit is not None:
        all_titles = df[Col.TITLE.value].unique()
        assert (
            1 <= debug_limit <= len(all_titles)
        ), f"debug_limit={debug_limit} exceeds {len(all_titles)} available articles"

        # Sample whole ARTICLES (titles), not rows, so every sampled article
        # keeps all of its questions; seeded for reproducible debug runs.
        sampled_titles = pd.Series(all_titles).sample(
            n=debug_limit, random_state=DEBUG_SEED
        )
        df = df[df[Col.TITLE.value].isin(sampled_titles)].copy()

    # No split requested (e.g. the dev set): return a single examples map.
    if split_fraction is None:
        return (df_to_examples_map(df), None), (df, None)

    # Title-disjoint train/val split (see split_by_title for rationale).
    df_train, df_val = split_by_title(df, split_fraction)
    train_examples = df_to_examples_map(df_train)
    val_examples = df_to_examples_map(df_val)
    return (train_examples, val_examples), (df_train, df_val)


def split_by_title(
    df: pd.DataFrame, val_fraction: float
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Split input DF by article title ensuring no title overlap:
    this is critical for model generalization to new contexts Vs
    simply memorizing passages and responding to new questions about them
    (e.g., when splitting the initial training set into 'train' and 'val' subsets).

    Args:
        df: ETLed SQuAD-style DataFrame containing the title column.
        val_fraction: fraction of UNIQUE titles routed to the val split,
            strictly inside (0, 1); at least one title always goes to val.

    Returns:
        (df_train, df_val) with disjoint title sets.

    Raises:
        ValueError: if val_fraction is out of range, or df has fewer than two
            unique titles (one split would otherwise come back empty).
    """
    # raise instead of assert: input validation must survive `python -O`.
    if not 0 < val_fraction < 1:
        raise ValueError("val set fraction should be between (0, 1).")
    unique_titles = df[Col.TITLE.value].drop_duplicates()
    # Fixed seed => the split is reproducible across runs.
    shuffled_titles = unique_titles.sample(frac=1.0, random_state=DEBUG_SEED)
    num_unique_titles = len(shuffled_titles)
    if num_unique_titles < 2:
        raise ValueError(
            f"Need at least 2 unique titles to split; got {num_unique_titles}."
        )

    # max(1, ...) guarantees a non-empty val split even for tiny datasets.
    n_val = max(1, int(num_unique_titles * val_fraction))
    val_titles = set(shuffled_titles.iloc[:n_val])
    train_titles = set(shuffled_titles.iloc[n_val:])

    df_val = df[df[Col.TITLE.value].isin(val_titles)].copy()
    df_train = df[df[Col.TITLE.value].isin(train_titles)].copy()
    # TODO: switch to proper logging once the pipeline adopts it.
    print(
        f"Initial split | num-train-examples: {df_train.shape[0]}; num-val-examples: {df_val.shape[0]}"
    )
    return df_train, df_val