"""
Contains a simple experimentation pipeline for the QA system.

Benefits:
- Completely **plug-and-play**: the users can easily replace models and configs
  without needing to change pipeline or other code.
- Automates experiment tracking/versioning: facilitates experimental iteration.
- Offers data splitting routines promoting model generalization & objective
  perf measuring:
    a) initial training set gets split into: 'train' Vs 'val' subsets which DO
       NOT share common articles, such that the 'val' set simulates actual
       held-out article performance;
    b) initial 'dev' set can remain untouched until the very end for objective
       perf measuring
"""

import pandas as pd
from typing import Tuple, Dict, Optional
from pathlib import Path
import sys
from io import StringIO

from src.utils.constants import (
    EXPERIMENTS_DIR,
    DEV_DATA_PATH,
    TRAIN_DATA_PATH,
    Col,
    DEBUG_SEED,
)
from src.etl.squad_v2_loader import load_squad_v2_df, df_to_examples_map
from src.models.base_qa_model import QAModel
from src.etl.types import QAExample
from src.config.model_configs import BaseModelConfig, BertQAConfig
from src.evaluation.evaluator import Evaluator
from src.utils.experiment_snapshot import ExperimentSnapshot

# Default fraction of training ARTICLES held out as the validation split.
DEFAULT_VAL_SET_FRACTION = 0.1


class Tee:
    """
    Duplicates writes to multiple destinations so that experiment tracking can
    include notebook output alongside the console.

    Based on:
    https://stackoverflow.com/questions/616645/how-to-duplicate-sys-stdout-to-a-log-file
    """

    def __init__(self, *files):
        self.files = files

    def write(self, obj):
        # Write to every stream and flush immediately so interleaved output
        # (e.g. training progress) appears in real time on all destinations.
        for f in self.files:
            f.write(obj)
            f.flush()

    def flush(self):
        # Flushes all of the streams (ensures text appears immediately)
        for f in self.files:
            f.flush()


def run_qa_experiment(
    experiment_name: str,
    model: QAModel,
    debug_limit: int | None = None,
    val_fraction: float = DEFAULT_VAL_SET_FRACTION,
) -> Tuple[ExperimentSnapshot, Path, Optional[pd.DataFrame], Optional[pd.DataFrame]]:
    """
    Basic pipeline for running a QA system experiment.

    To facilitate debugging:
    1. The function can limit the #training examples processed
    2. The sampled ETLed input DF is also provided as part of the function return

    Note that debug_limit is only applied to the training instances; i.e.,
    dev set is not capped.

    :param experiment_name: label used when persisting the experiment snapshot
    :param model: any QAModel implementation (plug-and-play)
    :param debug_limit: optional cap on the number of training ARTICLES used
    :param val_fraction: fraction of training articles held out for validation
    :return: (snapshot, run directory, train DF, dev DF); DFs returned for debugging
    """
    # TODO - use proper logging for all of this
    # Capture output to StringIO while printing to console
    log_capture = StringIO()
    original_stdout = sys.stdout
    sys.stdout = Tee(sys.stdout, log_capture)

    try:
        if debug_limit is not None:
            print(f"{debug_limit} articles will be considered from training in total.")
        else:
            print("All articles from training set are considered.")

        assert TRAIN_DATA_PATH.exists(), "Unspecified train data location."
        # Note that df_val can be returned for debugging: ignored for now
        (train_examples, val_examples), (df_train, _) = _load_examples(
            path=TRAIN_DATA_PATH, debug_limit=debug_limit, split_fraction=val_fraction
        )

        assert DEV_DATA_PATH.exists(), "Unspecified dev data location."
        # do NOT split dev set -> split_fraction is explicitly set to None
        (dev_examples, _), (df_dev, _) = _load_examples(
            path=DEV_DATA_PATH, debug_limit=None, split_fraction=None
        )

        # Sanity checking for non-empty data splits
        assert len(train_examples) > 0, "train_examples is empty."
        assert len(dev_examples) > 0, "dev_examples is empty."

        if val_examples is not None:
            model.train(train_examples, val_examples=val_examples)
        else:
            model.train(train_examples)

        predictions = model.predict(dev_examples)
        metrics = Evaluator().evaluate(predictions=predictions, examples=dev_examples)

        # Save experiment
        config = getattr(model, "config", None)
        assert isinstance(config, BaseModelConfig), "Incompatible Config type."
        snapshot = ExperimentSnapshot(
            experiment_name=experiment_name,
            config=config,
            predictions=predictions,
            metrics=metrics,
            model=model,
        )

        print("\n" + "=" * 70)
        print("FINAL DEV SET RESULTS")
        print("=" * 70)
        print(f"Exact Match (EM): {snapshot.metrics.exact_score:.2f}%")
        print(f"F1 Score: {snapshot.metrics.f1_score:.2f}%")
        print(f"Total dev examples: {snapshot.metrics.total_num_instances}")
        print("=" * 70)

        run_dir = snapshot.save(experiments_root=EXPERIMENTS_DIR)
        # Persist everything that was printed during the run next to the snapshot.
        (run_dir / "training_log.txt").write_text(
            log_capture.getvalue(), encoding="utf-8"
        )
        return snapshot, run_dir, df_train, df_dev
    finally:
        # Restore stdout after running the experiment
        sys.stdout = original_stdout


def create_experiment_name(
    model_name_short: str, config: BertQAConfig, num_articles: int | None = None
) -> str:
    """
    Build a tracking-friendly experiment name from the model short-name and the
    number of training articles used ('ALL' when uncapped).
    """
    assert (
        model_name_short in config.backbone_name
    ), "Inconsistent model name used for experiment tracking Vs actual model name."
    experiment_name = (
        f"{model_name_short}_{num_articles}_articles"
        if num_articles is not None
        else f"{model_name_short}_ALL_articles"
    )
    return experiment_name


def _load_examples(
    path: Path, debug_limit: int | None, split_fraction: float | None
) -> Tuple[
    Tuple[Dict[str, QAExample], Dict[str, QAExample] | None],
    Tuple[pd.DataFrame, pd.DataFrame | None],
]:
    """
    Returns both a dict with QAExample objects and the associated DF for debugging.

    Both the debug_limit and the split_fraction are operating on the ARTICLE
    level Vs individual example/question level.
    - debug_limit: caps the #articles returned for debugging/easier experimentation
    - split_fraction: enables train/val splitting based on initial training data
    """
    df = load_squad_v2_df(path)

    if debug_limit is not None:
        all_titles = df[Col.TITLE.value].unique()
        assert (
            1 <= debug_limit <= len(all_titles)
        ), f"debug_limit={debug_limit} exceeds {len(all_titles)} available articles"
        # Sample whole articles (not rows) so debug runs keep complete contexts.
        sampled_titles = pd.Series(all_titles).sample(
            n=debug_limit, random_state=DEBUG_SEED
        )
        # set() gives O(1) membership checks inside isin (same matching semantics).
        df = df[df[Col.TITLE.value].isin(set(sampled_titles))].copy()

    if split_fraction is not None:
        df_train, df_val = split_by_title(df, split_fraction)
        train_examples = df_to_examples_map(df_train)
        val_examples = df_to_examples_map(df_val)
        return (train_examples, val_examples), (df_train, df_val)
    else:
        examples = df_to_examples_map(df)
        return (examples, None), (df, None)


def split_by_title(
    df: pd.DataFrame, val_fraction: float
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Split input DF by article title ensuring no title overlap: this is critical
    for model generalization to new contexts Vs simply memorizing passages and
    responding to new questions about them (e.g., when splitting the initial
    training set into 'train' and 'val' subsets).
    """
    assert 0 < val_fraction < 1, "val set fraction should be between (0, 1)."

    unique_titles = df[Col.TITLE.value].drop_duplicates()
    # Deterministic shuffle so the split is reproducible across runs.
    shuffled_titles = unique_titles.sample(frac=1.0, random_state=DEBUG_SEED)

    num_unique_titles = len(shuffled_titles)
    # Guarantee at least one validation article even for tiny debug subsets.
    n_val = max(1, int(num_unique_titles * val_fraction))

    val_titles = set(shuffled_titles.iloc[:n_val])
    train_titles = set(shuffled_titles.iloc[n_val:])

    df_val = df[df[Col.TITLE.value].isin(val_titles)].copy()
    df_train = df[df[Col.TITLE.value].isin(train_titles)].copy()

    print(
        f"Initial split | num-train-examples: {df_train.shape[0]}; num-val-examples: {df_val.shape[0]}"
    )
    return df_train, df_val