import os
import shutil
import sys
from pathlib import Path

from mlpipeline.entity import DataIngestionConfig, DataIngestionArtifact
from mlpipeline.logging.logger import get_logger
from mlpipeline.exception import DataIngestionException

logger = get_logger(__name__)


class DataIngestion:
    """Download a Kaggle competition archive and stage its raw CSV files.

    The Kaggle client is created lazily, so constructing this class does not
    import the ``kaggle`` package or require credentials.
    """

    # Competition downloaded when the caller does not name one.
    DEFAULT_COMPETITION = "playground-series-s6e2"

    def __init__(self, config: DataIngestionConfig):
        """Store the ingestion config; ``root_dir`` and ``unzip_dir`` are read from it."""
        self.config = config
        self.kaggle_api = None

    def _authenticate_kaggle(self):
        """Create and authenticate the Kaggle client on first use.

        Subsequent calls are no-ops once a client has been authenticated.

        Raises:
            DataIngestionException: If importing, constructing or
                authenticating the Kaggle client fails.
        """
        if self.kaggle_api is not None:
            return

        try:
            # Imported here so the dependency is only needed when downloading.
            from kaggle.api.kaggle_api_extended import KaggleApi

            api = KaggleApi()
            api.authenticate()
        except Exception as e:
            raise DataIngestionException(
                f"Failed to authenticate with Kaggle API: {e}", sys
            ) from e

        # Publish the client only after authentication succeeded; otherwise a
        # failed attempt would leave a non-None, unauthenticated client behind
        # and every later call would skip authentication.
        self.kaggle_api = api
        logger.info("Kaggle API authenticated successfully")

    def download_data(
        self, competition_name: str = DEFAULT_COMPETITION
    ) -> DataIngestionArtifact:
        """Download the competition archive, extract it and copy the raw CSVs.

        The archive ``<competition_name>.zip`` is downloaded into
        ``config.root_dir``, unpacked into ``config.unzip_dir`` and deleted.
        ``train.csv`` and ``test.csv`` are then copied to
        ``root_dir/train_raw.csv`` and ``root_dir/test_raw.csv``.

        Args:
            competition_name: Kaggle competition slug to download.

        Returns:
            An artifact whose ``data_file_path`` points at ``train_raw.csv``.
            The test copy is written to disk but not referenced by the artifact.

        Raises:
            DataIngestionException: On any failure, including missing
                ``train.csv``/``test.csv`` after extraction.
        """
        try:
            logger.info("Starting data ingestion")

            # Authenticate only when downloading
            self._authenticate_kaggle()

            # Accept both str and Path values from the config.
            root_dir = Path(self.config.root_dir)
            unzip_dir = Path(self.config.unzip_dir)
            os.makedirs(root_dir, exist_ok=True)

            logger.info(f"Downloading dataset from Kaggle competition: {competition_name}")
            # The configured value is passed through unchanged to the client.
            self.kaggle_api.competition_download_files(
                competition_name,
                path=self.config.root_dir,
            )

            zip_file = root_dir / f"{competition_name}.zip"
            if zip_file.exists():
                logger.info(f"Extracting {zip_file}")
                shutil.unpack_archive(zip_file, unzip_dir)
                zip_file.unlink()
            else:
                # Best effort: files from an earlier run may still be usable,
                # but make the missing archive visible in the logs.
                logger.warning(f"Archive not found after download: {zip_file}")

            train_file = unzip_dir / "train.csv"
            test_file = unzip_dir / "test.csv"
            if not (train_file.exists() and test_file.exists()):
                raise FileNotFoundError("Train or test file not found after extraction")

            train_raw = root_dir / "train_raw.csv"
            test_raw = root_dir / "test_raw.csv"
            shutil.copy(train_file, train_raw)
            shutil.copy(test_file, test_raw)
            logger.info(f"Data saved: {train_raw}, {test_raw}")

            return DataIngestionArtifact(
                data_file_path=train_raw,
                is_ingested=True,
                message="Data ingestion completed successfully",
            )
        except DataIngestionException:
            # Already the domain error (e.g. from authentication); do not wrap twice.
            raise
        except Exception as e:
            raise DataIngestionException(str(e), sys) from e