| import os |
| import logging |
| import duckdb |
| from pathlib import Path |
|
|
| |
# Configure the root logger at import time: INFO and above, with each record
# rendered as "<timestamp> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
|
|
def _ingest_csv(con, table_name: str, csv_path: str) -> None:
    """Create ``table_name`` from ``csv_path`` unless the table already exists."""
    # The path is embedded in the statement as a SQL string literal, so any
    # single quote in it must be doubled or it would end the literal early.
    quoted_path = csv_path.replace("'", "''")
    con.execute(
        f"CREATE TABLE IF NOT EXISTS {table_name} AS "
        f"SELECT * FROM read_csv_auto('{quoted_path}')"
    )


def store_in_duckdb(data_dir: str, db_path: str) -> None:
    """Load the raw CSV files found in ``data_dir`` into a DuckDB database.

    ``train.csv`` is required and is loaded into the ``raw_sales`` table.
    ``stores.csv``, ``oil.csv``, ``holidays_events.csv`` and
    ``transactions.csv`` are optional: each one that exists is loaded into a
    table named after the file (without the extension); missing ones are
    skipped silently. Tables are created with ``CREATE TABLE IF NOT EXISTS``,
    so a table that already exists in the database is left untouched.

    This simulates a Snowflake ingestion process.

    Args:
        data_dir: Directory containing the raw CSV files.
        db_path: Path of the DuckDB database file to connect to.

    Returns:
        None. If ``train.csv`` is missing, an error is logged and the function
        returns without opening the database.
    """
    logging.info(f"Ingesting raw data into DuckDB at {db_path}...")
    train_csv = os.path.join(data_dir, 'train.csv')

    if not os.path.exists(train_csv):
        logging.error(f"File {train_csv} not found.")
        return

    aux_files = {
        'stores': 'stores.csv',
        'oil': 'oil.csv',
        'holidays_events': 'holidays_events.csv',
        'transactions': 'transactions.csv',
    }

    con = duckdb.connect(db_path)
    try:
        logging.info("Ingesting main sales file train.csv into table raw_sales...")
        _ingest_csv(con, 'raw_sales', train_csv)

        for table_name, file_name in aux_files.items():
            file_path = os.path.join(data_dir, file_name)
            if not os.path.exists(file_path):
                continue
            logging.info(f"Ingesting auxiliary file {file_name} into table {table_name}...")
            _ingest_csv(con, table_name, file_path)

        logging.info("All available data successfully ingested into DuckDB.")
    finally:
        # Always release the database handle, even when an ingest statement
        # fails, so the database file is not left held open by this process.
        con.close()
|
|
if __name__ == "__main__":
    # The project root is two directories above the folder holding this file.
    project_dir = Path(__file__).resolve().parents[2]
    data_root = os.path.join(project_dir, "data")
    raw_data_dir = os.path.join(data_root, "raw")
    db_path = os.path.join(data_root, "warehouse.duckdb")

    # Make sure the raw-data folder exists before looking for the CSVs in it.
    os.makedirs(raw_data_dir, exist_ok=True)

    # train.csv is mandatory: stop with a clear error when it is absent.
    train_path = os.path.join(raw_data_dir, 'train.csv')
    if not os.path.exists(train_path):
        raise FileNotFoundError(
            f"Kaggle data not found at {train_path}. "
            "Please ensure your Kaggle CSV files are placed in the data/raw directory."
        )
    logging.info("Found Kaggle data in data/raw. Proceeding to DuckDB ingestion.")

    store_in_duckdb(raw_data_dir, db_path)
    logging.info("Data ingestion phase complete.")
|
|