squad2-qa / src /pipeline /qa_runner.py
Kimis Perros
Initial deployment
461f64f
"""
Contains a simple experimentation pipeline for the QA system.
Benefits:
- Completely **plug-and-play**: the users can easily replace models and configs without
needing to change pipeline or other code.
- Automates experiment tracking/versioning: facilitates experimental iteration.
- Offers data splitting routines promoting model generalization & objective perf measuring:
a) initial training set gets split into: 'train' Vs 'val' subsets which DO NOT share
common articles, such that the 'val' set simulates actual held-out article performance;
b) initial 'dev' set can remain untouched until the very end for objective perf measuring
"""
import pandas as pd
from typing import Tuple, Dict, Optional
from pathlib import Path
import sys
from io import StringIO
from src.utils.constants import (
EXPERIMENTS_DIR,
DEV_DATA_PATH,
TRAIN_DATA_PATH,
Col,
DEBUG_SEED,
)
from src.etl.squad_v2_loader import load_squad_v2_df, df_to_examples_map
from src.models.base_qa_model import QAModel
from src.etl.types import QAExample
from src.config.model_configs import BaseModelConfig, BertQAConfig
from src.evaluation.evaluator import Evaluator
from src.utils.experiment_snapshot import ExperimentSnapshot
DEFAULT_VAL_SET_FRACTION = 0.1
class Tee:
"""
Based on: https://stackoverflow.com/questions/616645/how-to-duplicate-sys-stdout-to-a-log-file
Duplicates output to multiple destinations such that experiment tracking can include notebook output.
"""
def __init__(self, *files):
self.files = files
def write(self, obj):
# Writes to all of the streams
for f in self.files:
f.write(obj)
f.flush()
def flush(self):
# Flushes all of the streams (ensures text appears immediately)
for f in self.files:
f.flush()
def run_qa_experiment(
experiment_name: str,
model: QAModel,
debug_limit: Optional[int] = None,
val_fraction: float = DEFAULT_VAL_SET_FRACTION,
) -> Tuple[ExperimentSnapshot, Path, Optional[pd.DataFrame], Optional[pd.DataFrame]]:
"""
Basic pipeline for running a QA system experiment.
To facilitate debugging:
1. The function can limit the #training examples processed
2. The sampled ETLed input DF is also provided as part of the function return
Note that debug_limit is only applied to the training instances; i.e., dev set is not capped.
"""
# TODO - use proper logging for all of this
# Capture output to StringIO while printing to console
log_capture = StringIO()
original_stdout = sys.stdout
sys.stdout = Tee(sys.stdout, log_capture)
try:
if debug_limit is not None:
print(f"{debug_limit} articles will be considered from training in total.")
else:
print("All articles from training set are considered.")
assert TRAIN_DATA_PATH.exists(), "Unspecified train data location."
# Note that df_val can be returned for debugging: ignored for now
(train_examples, val_examples), (df_train, _) = _load_examples(
path=TRAIN_DATA_PATH, debug_limit=debug_limit, split_fraction=val_fraction
)
assert DEV_DATA_PATH.exists(), "Unspecified dev data location."
# do NOT split dev set -> split_fraction is explicitly set to None
(dev_examples, _), (df_dev, _) = _load_examples(
path=DEV_DATA_PATH, debug_limit=None, split_fraction=None
)
# Sanity checking for non-empty data splits
assert len(train_examples) > 0, "train_examples is empty."
assert len(dev_examples) > 0, "dev_examples is empty."
if val_examples is not None:
model.train(train_examples, val_examples=val_examples)
else:
model.train(train_examples)
predictions = model.predict(dev_examples)
metrics = Evaluator().evaluate(predictions=predictions, examples=dev_examples)
# Save experiment
config = getattr(model, "config", None)
assert isinstance(config, BaseModelConfig), "Incompatible Config type."
snapshot = ExperimentSnapshot(
experiment_name=experiment_name,
config=config,
predictions=predictions,
metrics=metrics,
model=model,
)
print("\n" + "=" * 70)
print("FINAL DEV SET RESULTS")
print("=" * 70)
print(f"Exact Match (EM): {snapshot.metrics.exact_score:.2f}%")
print(f"F1 Score: {snapshot.metrics.f1_score:.2f}%")
print(f"Total dev examples: {snapshot.metrics.total_num_instances}")
print("=" * 70)
run_dir = snapshot.save(experiments_root=EXPERIMENTS_DIR)
(run_dir / "training_log.txt").write_text(
log_capture.getvalue(), encoding="utf-8"
)
return snapshot, run_dir, df_train, df_dev
finally:
# Restore stdout after running the experiment
sys.stdout = original_stdout
def create_experiment_name(
model_name_short: str, config: BertQAConfig, num_articles: Optional[int] = None
) -> str:
assert (
model_name_short in config.backbone_name
), "Inconsistent model name used for experiment tracking Vs actual model name."
experiment_name = (
f"{model_name_short}_{num_articles}_articles"
if num_articles is not None
else f"{model_name_short}_ALL_articles"
)
return experiment_name
def _load_examples(
path: Path, debug_limit: int | None, split_fraction: float | None
) -> Tuple[
Tuple[Dict[str, QAExample], Dict[str, QAExample] | None],
Tuple[pd.DataFrame, pd.DataFrame | None],
]:
"""
Returns both a dict with QAExample objects and the associated DF for debugging.
Both the debug_limit and the split_fraction are operating on the ARTICLE level Vs
individual example/question level.
- debug_limit: caps the #articles returned for debugging/easier experimentation
- split_fraction: enables train/val splitting based on initial training data
"""
df = load_squad_v2_df(path)
if debug_limit is not None:
all_titles = df[Col.TITLE.value].unique()
assert (
1 <= debug_limit <= len(all_titles)
), f"debug_limit={debug_limit} exceeds {len(all_titles)} available articles"
# df = df.sample(n=debug_limit, random_state=DEBUG_SEED).copy()
sampled_titles = pd.Series(all_titles).sample(
n=debug_limit, random_state=DEBUG_SEED
)
df = df[df[Col.TITLE.value].isin(sampled_titles)].copy()
if split_fraction is not None:
df_train, df_val = split_by_title(df, split_fraction)
train_examples = df_to_examples_map(df_train)
val_examples = df_to_examples_map(df_val)
return (train_examples, val_examples), (df_train, df_val)
else:
examples = df_to_examples_map(df)
return (examples, None), (df, None)
def split_by_title(
df: pd.DataFrame, val_fraction: float
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Split input DF by article title ensuring no title overlap:
this is critical for model generalization to new contexts Vs
simply memorizing passages and responding to new questions about them
(e.g., when splitting the initial training set into 'train' and 'val' subsets).
"""
assert 0 < val_fraction < 1, "val set fraction should be between (0, 1)."
unique_titles = df[Col.TITLE.value].drop_duplicates()
shuffled_titles = unique_titles.sample(frac=1.0, random_state=DEBUG_SEED)
num_unique_titles = len(shuffled_titles)
n_val = max(1, int(num_unique_titles * val_fraction))
val_titles = set(shuffled_titles.iloc[:n_val])
train_titles = set(shuffled_titles.iloc[n_val:])
df_val = df[df[Col.TITLE.value].isin(val_titles)].copy()
df_train = df[df[Col.TITLE.value].isin(train_titles)].copy()
print(
f"Initial split | num-train-examples: {df_train.shape[0]}; num-val-examples: {df_val.shape[0]}"
)
return df_train, df_val