Spaces:
Build error
Build error
File size: 3,621 Bytes
ea6f215 3635bbe ea6f215 3635bbe ea6f215 3635bbe ea6f215 3635bbe ea6f215 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
"""
Data ingestion and processing modules for Rossmann Store Sales.
"""
import os
from abc import ABC, abstractmethod
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from src.core import setup_logger
# Module-level logger, created by the project's setup_logger helper and keyed
# by this module's name. NOTE(review): handlers/levels are whatever
# setup_logger configures; that helper is not visible from this file.
logger = setup_logger(__name__)
# --- INGESTION ---
class DataIngestor(ABC):
    """Interface for objects that load a dataset file into a DataFrame."""

    @abstractmethod
    def ingest(self, file_path: str) -> pd.DataFrame:
        """Load the data found at ``file_path``.

        Subclasses must override this method. The base implementation has no
        behaviour of its own and returns ``None`` if called directly.

        Args:
            file_path: Location of the file to read.

        Returns:
            The loaded data (in concrete implementations).
        """
class RossmannDataIngestor(DataIngestor):
    """Load Rossmann sales records and attach store metadata when available."""

    def ingest(self, file_path: str) -> pd.DataFrame:
        """Read the sales CSV and left-join the store metadata onto it.

        The store file location comes from ``global_config.data.store_path``.
        When that setting is empty, ``store.csv`` in the same directory as
        ``file_path`` is used instead.

        Args:
            file_path: Path to the sales CSV file.

        Returns:
            The sales rows, with ``Date`` converted to datetime when that
            column exists. If the store file is found, its columns are joined
            on ``Store`` (left join); otherwise a warning is logged and the
            sales rows are returned on their own.
        """
        # Kept as a call-time import, as in the original code.
        # NOTE(review): presumably this avoids a circular import - confirm.
        from src.config import global_config

        logger.info(f"Ingesting Rossmann sales data from {file_path}")
        df = pd.read_csv(file_path, low_memory=False)

        # Parse dates up front so the dtype of 'Date' in the result does not
        # depend on whether the store metadata file happens to exist.
        if 'Date' in df.columns:
            df['Date'] = pd.to_datetime(df['Date'])

        # Use config for store path, fallback to sibling 'store.csv'
        store_path = global_config.data.store_path
        if not store_path:
            store_path = os.path.join(os.path.dirname(file_path), "store.csv")

        if not os.path.exists(store_path):
            logger.warning(f"Store metadata not found at {store_path}. Proceeding with sales data only.")
            return df

        logger.info(f"Merging with store metadata from {store_path}")
        store_df = pd.read_csv(store_path)
        return pd.merge(df, store_df, on='Store', how='left')
class DataIngestorFactory:
    """Selects a ``DataIngestor`` implementation from a dataset name."""

    @staticmethod
    def get_data_ingestor(dataset_name: str) -> DataIngestor:
        """Return the ingestor that handles ``dataset_name``.

        The name is matched case-insensitively; any name containing
        "rossmann" selects ``RossmannDataIngestor``.

        Args:
            dataset_name: Name identifying the dataset to ingest.

        Returns:
            A new ingestor instance for the dataset.

        Raises:
            ValueError: If no ingestor matches ``dataset_name``.
        """
        lowered_name = dataset_name.lower()
        if "rossmann" not in lowered_name:
            raise ValueError(f"No ingestor available for dataset: {dataset_name}")
        return RossmannDataIngestor()
# --- PROCESSING / CLEANING ---
class MissingValueHandlingStrategy(ABC):
    """Interface for strategies that deal with missing values in a frame."""

    @abstractmethod
    def handle(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply the strategy to ``df``.

        Subclasses must override this method. The base implementation has no
        behaviour of its own and returns ``None`` if called directly.

        Args:
            df: Data that may contain missing values.

        Returns:
            The processed data (in concrete implementations).
        """
class FillMissingValuesStrategy(MissingValueHandlingStrategy):
    """Fill missing values with column means or with one constant value."""

    def __init__(self, method: str = "mean", fill_value: object = None):
        """Store the fill configuration.

        Args:
            method: ``"mean"`` fills numeric columns with their column mean;
                ``"constant"`` fills every missing cell with ``fill_value``.
            fill_value: Replacement used by the ``"constant"`` method.
        """
        self.method = method
        self.fill_value = fill_value

    def handle(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with missing values filled.

        The input frame is never modified.

        Args:
            df: Data that may contain missing values.

        Returns:
            A filled copy of ``df``. For an unrecognised ``method`` the copy
            is returned unchanged and a warning is logged.

        Raises:
            ValueError: If ``method`` is ``"constant"`` but no ``fill_value``
                was supplied.
        """
        df_cleaned = df.copy()
        if self.method == "mean":
            numeric_columns = df_cleaned.select_dtypes(include="number").columns
            # Nothing to fill (and nothing to assign) without numeric columns.
            if not numeric_columns.empty:
                column_means = df_cleaned[numeric_columns].mean()
                df_cleaned[numeric_columns] = df_cleaned[numeric_columns].fillna(column_means)
        elif self.method == "constant":
            # Fail with a clear message instead of deferring to pandas'
            # generic error for a missing fill value.
            if self.fill_value is None:
                raise ValueError("fill_value must be provided when method='constant'")
            df_cleaned = df_cleaned.fillna(self.fill_value)
        else:
            # Preserve the original no-op behaviour, but make the
            # misconfiguration visible instead of silently ignoring it.
            logger.warning(f"Unknown missing value method '{self.method}'. Data returned unchanged.")
        return df_cleaned
# --- OUTLIER DETECTION ---
class OutlierDetectionStrategy(ABC):
    """Interface for strategies that flag outlying values in a frame."""

    @abstractmethod
    def detect_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
        """Identify outliers in ``df``.

        Subclasses must override this method. The base implementation has no
        behaviour of its own and returns ``None`` if called directly.

        Args:
            df: Data to inspect.

        Returns:
            The detection result (in concrete implementations).
        """
class IQROutlierDetection(OutlierDetectionStrategy):
    """Flags values that fall outside the 1.5 x IQR fences of their column."""

    def detect_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a boolean mask marking the outlying cells of ``df``.

        For each column the 25% and 75% quantiles are computed; a value is
        flagged when it lies more than 1.5 interquartile ranges below the
        lower quantile or above the upper quantile.

        Args:
            df: Data to inspect.

        Returns:
            The element-wise result of the two fence comparisons combined
            with OR; ``True`` marks an outlier.
        """
        lower_quartile = df.quantile(0.25)
        upper_quartile = df.quantile(0.75)
        spread = upper_quartile - lower_quartile

        lower_fence = lower_quartile - 1.5 * spread
        upper_fence = upper_quartile + 1.5 * spread

        too_low = df < lower_fence
        too_high = df > upper_fence
        return too_low | too_high
# --- SPLITTING ---
class DataSplittingStrategy(ABC):
    """Interface for strategies that split a frame into train/test parts."""

    @abstractmethod
    def split_data(self, df: pd.DataFrame, target_column: str):
        """Split ``df`` using ``target_column`` as the prediction target.

        Subclasses must override this method. The base implementation has no
        behaviour of its own and returns ``None`` if called directly.

        Args:
            df: Data to split.
            target_column: Name of the column holding the target values.
        """
class SimpleTrainTestSplitStrategy(DataSplittingStrategy):
    """Train/test split delegated to scikit-learn's ``train_test_split``."""

    def __init__(self, test_size: float = 0.2, random_state: int = 42):
        """Store the split configuration.

        Args:
            test_size: Forwarded to ``train_test_split`` as ``test_size``.
            random_state: Forwarded to ``train_test_split`` as
                ``random_state``.
        """
        self.test_size = test_size
        self.random_state = random_state

    def split_data(self, df: pd.DataFrame, target_column: str):
        """Separate features from the target and split both.

        Args:
            df: Data containing the feature columns and the target column.
            target_column: Name of the column to use as the target.

        Returns:
            Whatever ``train_test_split`` returns for the feature frame
            (``df`` without ``target_column``) and the target column.
        """
        features = df.drop(columns=[target_column])
        target = df[target_column]
        return train_test_split(
            features,
            target,
            test_size=self.test_size,
            random_state=self.random_state,
        )
|