File size: 3,621 Bytes
ea6f215
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3635bbe
ea6f215
 
3635bbe
 
 
 
 
 
ea6f215
 
 
 
3635bbe
 
 
 
ea6f215
 
3635bbe
ea6f215
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
"""
Data ingestion and processing modules for Rossmann Store Sales.
"""

import os
from abc import ABC, abstractmethod
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from src.core import setup_logger

logger = setup_logger(__name__)

# --- INGESTION ---

class DataIngestor(ABC):
    @abstractmethod
    def ingest(self, file_path: str) -> pd.DataFrame:
        pass

class RossmannDataIngestor(DataIngestor):
    def ingest(self, file_path: str) -> pd.DataFrame:
        from src.config import global_config
        logger.info(f"Ingesting Rossmann sales data from {file_path}")
        df = pd.read_csv(file_path, low_memory=False)
        
        # Use config for store path, fallback to sibling 'store.csv'
        store_path = global_config.data.store_path
        if not store_path:
             data_dir = os.path.dirname(file_path)
             store_path = os.path.join(data_dir, "store.csv")

        if os.path.exists(store_path):
            logger.info(f"Merging with store metadata from {store_path}")
            store_df = pd.read_csv(store_path)
            # Ensure Date is datetime for merging logic if needed, though usually merge is on Store
            if 'Date' in df.columns:
                df['Date'] = pd.to_datetime(df['Date'])
            
            df = pd.merge(df, store_df, on='Store', how='left')
        else:
            logger.warning(f"Store metadata not found at {store_path}. Proceeding with sales data only.")
        return df

class DataIngestorFactory:
    @staticmethod
    def get_data_ingestor(dataset_name: str) -> DataIngestor:
        if "rossmann" in dataset_name.lower():
            return RossmannDataIngestor()
        raise ValueError(f"No ingestor available for dataset: {dataset_name}")

# --- PROCESSING / CLEANING ---

class MissingValueHandlingStrategy(ABC):
    @abstractmethod
    def handle(self, df: pd.DataFrame) -> pd.DataFrame:
        pass

class FillMissingValuesStrategy(MissingValueHandlingStrategy):
    def __init__(self, method: str = "mean", fill_value: any = None):
        self.method = method
        self.fill_value = fill_value

    def handle(self, df: pd.DataFrame) -> pd.DataFrame:
        df_cleaned = df.copy()
        if self.method == "mean":
            numeric_columns = df_cleaned.select_dtypes(include="number").columns
            df_cleaned[numeric_columns] = df_cleaned[numeric_columns].fillna(df[numeric_columns].mean())
        elif self.method == "constant":
            df_cleaned = df_cleaned.fillna(self.fill_value)
        return df_cleaned

# --- OUTLIER DETECTION ---

class OutlierDetectionStrategy(ABC):
    @abstractmethod
    def detect_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
        pass

class IQROutlierDetection(OutlierDetectionStrategy):
    def detect_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
        Q1 = df.quantile(0.25)
        Q3 = df.quantile(0.75)
        IQR = Q3 - Q1
        return (df < (Q1 - 1.5 * IQR)) | (df > (Q3 + 1.5 * IQR))

# --- SPLITTING ---

class DataSplittingStrategy(ABC):
    @abstractmethod
    def split_data(self, df: pd.DataFrame, target_column: str):
        pass

class SimpleTrainTestSplitStrategy(DataSplittingStrategy):
    def __init__(self, test_size: float = 0.2, random_state: int = 42):
        self.test_size = test_size
        self.random_state = random_state

    def split_data(self, df: pd.DataFrame, target_column: str):
        X = df.drop(columns=[target_column])
        y = df[target_column]
        return train_test_split(X, y, test_size=self.test_size, random_state=self.random_state)