"""
Batch Experiment Runner
=======================
Run every enabled model on every available dataset. By default,
combinations that already have a saved result are skipped.
Usage:
python -m runners.run_batch \
--datasets config/datasets.yaml \
--models config/models.yaml
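Re-run selected models only, ignoring cached results:
python -m runners.run_batch \
    --model-filter sap-rpt1-hf xgboost \
    --no-skip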
Author: UW MSIM Team
Date: April 2026
"""
import argparse
import json
import logging
import os
import sys
import time
from pathlib import Path
from typing import List, Optional

import yaml
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from runners.run_experiment import run_single_experiment
logger = logging.getLogger(__name__)
def get_dataset_list(datasets_config: dict, dataset_dir: Optional[str] = None) -> List[str]:
"""
Get list of available dataset names from the download directory.
Parameters
----------
    datasets_config : dict
        Datasets YAML configuration (currently unused; names are
        discovered from the CSV files in ``dataset_dir``)
    dataset_dir : str, optional
        Directory containing downloaded datasets; defaults to the
        repository-level ``datasets/`` directory
Returns
-------
datasets : list of str
List of dataset names
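    Examples
    --------
    A minimal sketch; the names below are hypothetical, and the call is
    skipped because it scans ``dataset_dir`` for ``<name>_X.csv`` /
    ``<name>_y.csv`` pairs on disk:

    >>> get_dataset_list({}, dataset_dir='datasets')  # doctest: +SKIP
    ['adult', 'wine']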
"""
datasets = []
if dataset_dir is None:
dataset_dir = str(Path(__file__).parent.parent.parent / 'datasets')
if os.path.isdir(dataset_dir):
# Find all *_X.csv files and extract dataset names
for f in sorted(os.listdir(dataset_dir)):
if f.endswith('_X.csv'):
name = f[:-6] # Remove '_X.csv'
# Verify y file also exists
y_file = os.path.join(dataset_dir, f"{name}_y.csv")
if os.path.exists(y_file):
datasets.append(name)
logger.info(f"Found {len(datasets)} datasets in {dataset_dir}")
else:
logger.warning(f"Dataset directory not found: {dataset_dir}")
return datasets
def get_model_list(models_config: dict) -> List[str]:
"""
Get list of enabled model names from configuration.
Parameters
----------
models_config : dict
Models YAML configuration
Returns
-------
models : list of str
List of enabled model names
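    Examples
    --------
    Entries without an ``enabled`` key default to enabled:

    >>> cfg = {'models': [{'name': 'xgboost'},
    ...                   {'name': 'sap-rpt1-hf', 'enabled': False}]}
    >>> get_model_list(cfg)
    ['xgboost']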
"""
models = []
for model_entry in models_config.get('models', []):
if model_entry.get('enabled', True):
models.append(model_entry['name'])
return models
def run_batch_experiments(
datasets: List[str],
models: List[str],
experiment_config: dict,
output_dir: str = '../results/raw',
skip_existing: bool = True
) -> dict:
"""
Run experiments for all dataset × model combinations.
Parameters
----------
datasets : list of str
Dataset names
models : list of str
Model names
experiment_config : dict
Experiment configuration (n_folds, random_state, etc.)
output_dir : str
Where to save results
skip_existing : bool
If True, skip experiments that already have result files
Returns
-------
    summary : dict
        Batch run summary with success/failure counts, per-experiment
        statuses and errors, and total elapsed time in hours
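    Examples
    --------
    A minimal sketch (skipped: it runs real experiments and writes
    result files; the dataset name is hypothetical):

    >>> summary = run_batch_experiments(
    ...     datasets=['adult'],
    ...     models=['xgboost'],
    ...     experiment_config={'n_folds': 10, 'random_state': 42},
    ... )  # doctest: +SKIP
    >>> summary['completed'] + summary['skipped'] + summary['failed']  # doctest: +SKIP
    1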
"""
total_experiments = len(datasets) * len(models)
logger.info(f"\n{'='*60}")
logger.info(f"BATCH RUN: {len(datasets)} datasets × {len(models)} models = {total_experiments} experiments")
logger.info(f"{'='*60}\n")
summary = {
'total': total_experiments,
'completed': 0,
'skipped': 0,
'failed': 0,
'results': [],
'errors': []
}
    # Ensure the output directory exists before any experiment writes to it
    os.makedirs(output_dir, exist_ok=True)
    batch_start_time = time.time()
for i, dataset_name in enumerate(datasets):
for j, model_name in enumerate(models):
experiment_num = i * len(models) + j + 1
output_file = os.path.join(output_dir, f"{dataset_name}_{model_name}.json")
# Skip existing results
if skip_existing and os.path.exists(output_file):
logger.info(
f"[{experiment_num}/{total_experiments}] "
f"SKIP {model_name} on {dataset_name} (result exists)"
)
summary['skipped'] += 1
continue
logger.info(
f"\n[{experiment_num}/{total_experiments}] "
f"Running {model_name} on {dataset_name}..."
)
try:
                run_single_experiment(
dataset_name=dataset_name,
model_name=model_name,
config=experiment_config,
output_dir=output_dir
)
summary['completed'] += 1
summary['results'].append({
'dataset': dataset_name,
'model': model_name,
'status': 'success'
})
except Exception as e:
logger.error(f"FAILED: {model_name} on {dataset_name}: {e}")
summary['failed'] += 1
summary['errors'].append({
'dataset': dataset_name,
'model': model_name,
'error': str(e)
})
batch_elapsed = time.time() - batch_start_time
# Print summary
logger.info(f"\n{'='*60}")
logger.info(f"BATCH RUN COMPLETE")
logger.info(f"{'='*60}")
logger.info(f" Total experiments: {summary['total']}")
logger.info(f" Completed: {summary['completed']}")
logger.info(f" Skipped: {summary['skipped']}")
logger.info(f" Failed: {summary['failed']}")
logger.info(f" Total time: {batch_elapsed / 3600:.2f} hours")
logger.info(f"{'='*60}\n")
# Save batch summary
os.makedirs(output_dir, exist_ok=True)
summary_file = os.path.join(output_dir, '_batch_summary.json')
summary['elapsed_hours'] = batch_elapsed / 3600
with open(summary_file, 'w') as f:
json.dump(summary, f, indent=2)
logger.info(f"Batch summary saved to {summary_file}")
return summary
def main():
"""Entry point for batch runner."""
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Parse arguments
parser = argparse.ArgumentParser(description='Run batch benchmarking experiments')
parser.add_argument('--datasets', default='config/datasets.yaml',
help='Datasets config file')
parser.add_argument('--models', default='config/models.yaml',
help='Models config file')
parser.add_argument('--config', default='config/experiments.yaml',
help='Experiment config file')
parser.add_argument('--output-dir', default='../results/raw',
help='Output directory')
parser.add_argument('--dataset-dir', default=None,
help='Directory containing downloaded datasets')
parser.add_argument('--no-skip', action='store_true',
help='Re-run experiments even if results exist')
parser.add_argument('--model-filter', nargs='*', default=None,
help='Only run specific models (e.g., --model-filter sap-rpt1-hf xgboost)')
parser.add_argument('--dataset-filter', nargs='*', default=None,
help='Only run specific datasets')
args = parser.parse_args()
# Load configs
if os.path.exists(args.datasets):
with open(args.datasets) as f:
datasets_config = yaml.safe_load(f)
else:
datasets_config = {}
if os.path.exists(args.models):
with open(args.models) as f:
models_config = yaml.safe_load(f)
else:
models_config = {}
if os.path.exists(args.config):
with open(args.config) as f:
experiment_config = yaml.safe_load(f)
else:
experiment_config = {
'n_folds': 10,
'random_state': 42,
'cost_per_hour': 0.90,
'gpu_type': 'H200'
}
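    # For reference, an experiments.yaml mirroring these fallbacks would
    # look like this (assumed layout, inferred from the defaults above):
    #
    #   n_folds: 10
    #   random_state: 42
    #   cost_per_hour: 0.90
    #   gpu_type: H200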
# Get dataset and model lists
dataset_list = args.dataset_filter or get_dataset_list(datasets_config, args.dataset_dir)
model_list = args.model_filter or get_model_list(models_config)
if not dataset_list:
print("[ERROR] No datasets found in the datasets directory.")
sys.exit(1)
if not model_list:
print("[ERROR] No models enabled in config. Check config/models.yaml")
sys.exit(1)
print(f"\n[INFO] Datasets ({len(dataset_list)}): {dataset_list[:5]}{'...' if len(dataset_list) > 5 else ''}")
print(f"[INFO] Models ({len(model_list)}): {model_list}")
# Add dataset_dir to config for run_experiment to use
    experiment_config['dataset_dir'] = args.dataset_dir or str(
        Path(__file__).parent.parent.parent / 'datasets'
    )
# Run batch
summary = run_batch_experiments(
datasets=dataset_list,
models=model_list,
experiment_config=experiment_config,
output_dir=args.output_dir,
skip_existing=not args.no_skip
)
print(f"\n[SUCCESS] Batch complete! {summary['completed']} succeeded, {summary['failed']} failed")
if __name__ == "__main__":
main()