File size: 1,994 Bytes
bb21b5d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
"""Ingest task: read the raw Kaggle CSV, validate schema, write parquet.

Data source: Kaggle Playground Series S5E6 — F1 Pit-Stop Prediction.
To pull from scratch:
    kaggle competitions download -c playground-series-s5e6 -p data/ && unzip data/*.zip -d data/

This script assumes the CSV is already at `data/train.csv` (Member A's pull).
"""

from __future__ import annotations

import logging

import pandas as pd

from src import config

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


def run() -> str:
    """Convert the raw training CSV to parquet and return the parquet path.

    Steps, in order: call ``config.ensure_dirs()``, confirm the raw CSV
    exists, load it with pandas, validate it with ``_validate_schema``, and
    write it to ``config.RAW_PARQUET`` without the index.

    Returns:
        The parquet output path as a string.

    Raises:
        FileNotFoundError: If ``config.RAW_TRAIN_CSV`` does not exist.
        ValueError: Propagated from ``_validate_schema`` when the frame does
            not match the expected schema.
    """
    config.ensure_dirs()

    raw_csv = config.RAW_TRAIN_CSV
    if not raw_csv.exists():
        # Fail early with instructions on how to obtain the data.
        hint = (
            f"Raw CSV not found at {raw_csv}. "
            "Pull from Kaggle: `kaggle competitions download -c playground-series-s5e6 -p data/` "
            "and unzip into data/, or restore via `dvc checkout`."
        )
        raise FileNotFoundError(hint)

    logger.info("Reading raw CSV from %s", raw_csv)
    frame = pd.read_csv(raw_csv)
    # Validate before writing so a bad CSV never produces a parquet file.
    _validate_schema(frame)

    parquet_path = config.RAW_PARQUET
    frame.to_parquet(parquet_path, index=False)
    logger.info("Wrote %s rows to %s", len(frame), parquet_path)
    return str(parquet_path)


def _validate_schema(df: pd.DataFrame) -> None:
    """Check that the raw frame has exactly the expected columns and years.

    Args:
        df: Frame loaded from the raw CSV.

    Raises:
        ValueError: If a column listed in ``config.RAW_COLUMNS`` is missing
            from ``df``, if ``df`` has a column not listed there, or if the
            distinct ``Year`` values differ from the union of
            ``config.TRAIN_YEARS`` and ``config.TEST_YEARS``.
    """
    missing = [c for c in config.RAW_COLUMNS if c not in df.columns]
    if missing:
        raise ValueError(f"Raw CSV missing expected columns: {missing}")
    extra = [c for c in df.columns if c not in config.RAW_COLUMNS]
    if extra:
        raise ValueError(f"Raw CSV has unexpected columns: {extra}")
    years = sorted(df["Year"].unique().tolist())
    # Normalise the expected years the same way as the observed ones (sorted,
    # de-duplicated) so the check does not depend on how the config tuples
    # happen to be ordered or on train years preceding test years.
    expected_years = sorted(set(config.TRAIN_YEARS) | set(config.TEST_YEARS))
    if years != expected_years:
        raise ValueError(f"Unexpected Year values: {years}, expected {expected_years}")


if __name__ == "__main__":
    # Script entry point: run the ingest with INFO logging, then read the
    # parquet back and print a short summary as a sanity check.
    logging.basicConfig(
        format="%(asctime)s %(name)s %(message)s",
        level=logging.INFO,
    )
    out_path = run()
    written = pd.read_parquet(out_path)
    n_rows, n_cols = written.shape
    print(f"\nIngest complete: {n_rows:,} rows, {n_cols} cols -> {out_path}")
    print(written.head())