File size: 2,327 Bytes
89ca667
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import os
import logging
import duckdb
from pathlib import Path

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def store_in_duckdb(data_dir: str, db_path: str):
    """
    Reads the raw CSV files and stores them into DuckDB as Parquet-backed tables.
    This simulates a Snowflake ingestion process.
    """
    logging.info(f"Ingesting raw data into DuckDB at {db_path}...")
    train_csv = os.path.join(data_dir, 'train.csv')
    
    if not os.path.exists(train_csv):
        logging.error(f"File {train_csv} not found.")
        return
        
    con = duckdb.connect(db_path)
    
    # Create table from CSV
    logging.info("Ingesting main sales file train.csv into table raw_sales...")
    con.execute(f"CREATE TABLE IF NOT EXISTS raw_sales AS SELECT * FROM read_csv_auto('{train_csv}')")
    
    # Load auxiliary tables if they exist
    aux_files = {
        'stores': 'stores.csv',
        'oil': 'oil.csv',
        'holidays_events': 'holidays_events.csv',
        'transactions': 'transactions.csv'
    }
    
    for table_name, file_name in aux_files.items():
        file_path = os.path.join(data_dir, file_name)
        if os.path.exists(file_path):
            logging.info(f"Ingesting auxiliary file {file_name} into table {table_name}...")
            con.execute(f"CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM read_csv_auto('{file_path}')")
            
    logging.info("All available data successfully ingested into DuckDB.")
    con.close()

if __name__ == "__main__":
    # Define paths
    project_dir = Path(__file__).resolve().parents[2]
    raw_data_dir = os.path.join(project_dir, "data", "raw")
    db_path = os.path.join(project_dir, "data", "warehouse.duckdb")
    
    os.makedirs(raw_data_dir, exist_ok=True)
    
    train_path = os.path.join(raw_data_dir, 'train.csv')
    if not os.path.exists(train_path):
        raise FileNotFoundError(f"Kaggle data not found at {train_path}. Please ensure your Kaggle CSV files are placed in the data/raw directory.")
    else:
        logging.info("Found Kaggle data in data/raw. Proceeding to DuckDB ingestion.")
        
    # Ingest to simulated Data Warehouse (DuckDB)
    store_in_duckdb(raw_data_dir, db_path)
    logging.info("Data ingestion phase complete.")