|
|
""" |
|
|
Contains a simple experimentation pipeline for the QA system. |
|
|
|
|
|
Benefits: |
|
|
- Completely **plug-and-play**: the users can easily replace models and configs without |
|
|
needing to change pipeline or other code. |
|
|
- Automates experiment tracking/versioning: facilitates experimental iteration. |
|
|
- Offers data splitting routines promoting model generalization & objective perf measuring: |
|
|
a) the initial training set is split into 'train' vs. 'val' subsets which DO NOT share
|
|
common articles, such that the 'val' set simulates actual held-out article performance; |
|
|
b) the initial 'dev' set remains untouched until the very end, for objective performance measurement
|
|
""" |
|
|
|
|
|
import pandas as pd |
|
|
from typing import Tuple, Dict, Optional |
|
|
from pathlib import Path |
|
|
import sys |
|
|
from io import StringIO |
|
|
from src.utils.constants import ( |
|
|
EXPERIMENTS_DIR, |
|
|
DEV_DATA_PATH, |
|
|
TRAIN_DATA_PATH, |
|
|
Col, |
|
|
DEBUG_SEED, |
|
|
) |
|
|
from src.etl.squad_v2_loader import load_squad_v2_df, df_to_examples_map |
|
|
from src.models.base_qa_model import QAModel |
|
|
from src.etl.types import QAExample |
|
|
from src.config.model_configs import BaseModelConfig, BertQAConfig |
|
|
from src.evaluation.evaluator import Evaluator |
|
|
from src.utils.experiment_snapshot import ExperimentSnapshot |
|
|
|
|
|
DEFAULT_VAL_SET_FRACTION = 0.1 |
|
|
|
|
|
|
|
|
class Tee:
    """
    Minimal file-like object that fans each write out to several streams.

    Adapted from:
    https://stackoverflow.com/questions/616645/how-to-duplicate-sys-stdout-to-a-log-file
    Used so that experiment tracking can also capture notebook/console output.
    """

    def __init__(self, *files):
        # Streams are kept in the order given; every one receives each write.
        self.files = files

    def write(self, obj):
        """Write ``obj`` to all streams, flushing each so output appears live."""
        for stream in self.files:
            stream.write(obj)
            stream.flush()

    def flush(self):
        """Flush every underlying stream."""
        for stream in self.files:
            stream.flush()
|
|
|
|
|
|
|
|
def run_qa_experiment(
    experiment_name: str,
    model: QAModel,
    debug_limit: Optional[int] = None,
    val_fraction: float = DEFAULT_VAL_SET_FRACTION,
) -> Tuple[ExperimentSnapshot, Path, Optional[pd.DataFrame], Optional[pd.DataFrame]]:
    """
    Basic pipeline for running a QA system experiment.

    Loads the training data (optionally capped at ``debug_limit`` articles),
    carves out an article-disjoint validation subset, trains ``model``,
    predicts on the uncapped dev set, evaluates, and persists an
    ExperimentSnapshot plus the captured console output under EXPERIMENTS_DIR.

    To facilitate debugging:
    1. The function can limit the #training examples processed
    2. The sampled ETLed input DF is also provided as part of the function return

    Note that debug_limit is only applied to the training instances; i.e., dev set is not capped.

    :param experiment_name: human-readable identifier used for the saved run.
    :param model: QA model exposing ``train``/``predict``; expected to carry a
        ``config`` attribute of type BaseModelConfig (asserted below).
    :param debug_limit: optional cap on the number of TRAINING articles loaded.
    :param val_fraction: fraction of training articles held out for validation.
    :return: (snapshot, run directory, sampled train DF, dev DF).
    """

    # Mirror everything printed to stdout into an in-memory buffer so the
    # run's console output can be persisted next to the snapshot.
    log_capture = StringIO()
    original_stdout = sys.stdout
    sys.stdout = Tee(sys.stdout, log_capture)

    try:
        if debug_limit is not None:
            print(f"{debug_limit} articles will be considered from training in total.")
        else:
            print("All articles from training set are considered.")
        # Fail fast if the expected training dataset file is missing.
        assert TRAIN_DATA_PATH.exists(), "Unspecified train data location."

        (train_examples, val_examples), (df_train, _) = _load_examples(
            path=TRAIN_DATA_PATH, debug_limit=debug_limit, split_fraction=val_fraction
        )

        assert DEV_DATA_PATH.exists(), "Unspecified dev data location."

        # Dev set is loaded whole: no debug cap and no train/val split.
        (dev_examples, _), (df_dev, _) = _load_examples(
            path=DEV_DATA_PATH, debug_limit=None, split_fraction=None
        )

        assert len(train_examples) > 0, "train_examples is empty."
        assert len(dev_examples) > 0, "dev_examples is empty."

        # Forward the validation split only when one was produced, since the
        # model is called positionally otherwise.
        if val_examples is not None:
            model.train(train_examples, val_examples=val_examples)
        else:
            model.train(train_examples)
        predictions = model.predict(dev_examples)
        metrics = Evaluator().evaluate(predictions=predictions, examples=dev_examples)

        # Snapshotting requires a BaseModelConfig-compatible config on the model.
        config = getattr(model, "config", None)
        assert isinstance(config, BaseModelConfig), "Incompatible Config type."
        snapshot = ExperimentSnapshot(
            experiment_name=experiment_name,
            config=config,
            predictions=predictions,
            metrics=metrics,
            model=model,
        )

        print("\n" + "=" * 70)
        print("FINAL DEV SET RESULTS")
        print("=" * 70)
        print(f"Exact Match (EM): {snapshot.metrics.exact_score:.2f}%")
        print(f"F1 Score: {snapshot.metrics.f1_score:.2f}%")
        print(f"Total dev examples: {snapshot.metrics.total_num_instances}")
        print("=" * 70)

        # Persist the snapshot, then write the mirrored console output (which
        # includes all the prints above) into the same run directory.
        run_dir = snapshot.save(experiments_root=EXPERIMENTS_DIR)
        (run_dir / "training_log.txt").write_text(
            log_capture.getvalue(), encoding="utf-8"
        )
        return snapshot, run_dir, df_train, df_dev
    finally:
        # Always restore the real stdout, even if training/evaluation failed.
        sys.stdout = original_stdout
|
|
|
|
|
|
|
|
def create_experiment_name(
    model_name_short: str, config: BertQAConfig, num_articles: Optional[int] = None
) -> str:
    """
    Build a standardized experiment identifier from the model name and article count.

    The short model name must occur inside ``config.backbone_name`` so that the
    tracked experiment name cannot drift from the model actually being trained.
    """
    assert (
        model_name_short in config.backbone_name
    ), "Inconsistent model name used for experiment tracking Vs actual model name."
    # Encode the training-set size in the name; "ALL" marks an uncapped run.
    suffix = "ALL_articles" if num_articles is None else f"{num_articles}_articles"
    return f"{model_name_short}_{suffix}"
|
|
|
|
|
|
|
|
def _load_examples(
    path: Path, debug_limit: int | None, split_fraction: float | None
) -> Tuple[
    Tuple[Dict[str, QAExample], Dict[str, QAExample] | None],
    Tuple[pd.DataFrame, pd.DataFrame | None],
]:
    """
    Load a SQuAD v2 file and return the QAExample map(s) plus raw DataFrame(s).

    Both ``debug_limit`` and ``split_fraction`` operate at the ARTICLE level,
    never at the individual example/question level:
    - debug_limit: caps the number of articles kept, for debugging/faster iteration
    - split_fraction: when given, carves an article-disjoint val subset out of the data

    The DataFrame(s) are returned alongside the example maps to ease debugging.
    """
    df = load_squad_v2_df(path)

    if debug_limit is not None:
        # Down-sample whole articles (titles) reproducibly, not individual rows.
        all_titles = df[Col.TITLE.value].unique()
        assert (
            1 <= debug_limit <= len(all_titles)
        ), f"debug_limit={debug_limit} exceeds {len(all_titles)} available articles"
        sampled_titles = pd.Series(all_titles).sample(
            n=debug_limit, random_state=DEBUG_SEED
        )
        df = df[df[Col.TITLE.value].isin(sampled_titles)].copy()

    if split_fraction is None:
        # No validation split requested: everything stays in a single set.
        return (df_to_examples_map(df), None), (df, None)

    df_train, df_val = split_by_title(df, split_fraction)
    return (
        (df_to_examples_map(df_train), df_to_examples_map(df_val)),
        (df_train, df_val),
    )
|
|
|
|
|
|
|
|
def split_by_title(
    df: pd.DataFrame, val_fraction: float
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Partition *df* into train/val DataFrames with zero article-title overlap.

    Keeping whole articles on one side of the split forces the model to
    generalize to unseen contexts instead of answering new questions about
    passages it memorized during training (important when carving the 'val'
    subset out of the initial training set).
    """
    assert 0 < val_fraction < 1, "val set fraction should be between (0, 1)."
    title_col = Col.TITLE.value
    # Deterministic shuffle of the distinct titles, then slice off the val share.
    titles = df[title_col].drop_duplicates().sample(frac=1.0, random_state=DEBUG_SEED)
    n_val = max(1, int(len(titles) * val_fraction))  # guarantee >= 1 val article
    val_titles = set(titles.iloc[:n_val])
    train_titles = set(titles.iloc[n_val:])

    df_val = df[df[title_col].isin(val_titles)].copy()
    df_train = df[df[title_col].isin(train_titles)].copy()
    print(
        f"Initial split | num-train-examples: {df_train.shape[0]}; num-val-examples: {df_val.shape[0]}"
    )
    return df_train, df_val
|
|
|