# ============================================
# CLASS 2: DATA LOADER
# ============================================
from datetime import datetime
import hashlib
import json
import logging
import traceback
from typing import Dict, List, Optional

import numpy as np
import pandas as pd

from config.config import Config, DataType

logger = logging.getLogger(__name__)


class DataLoader:
    """Class for loading and initial data processing"""

    def __init__(self, config: Config):
        """
        Initialise data loader

        Parameters:
        -----------
        config : Config
            Experiment configuration
        """
        self.config = config
        self.data = None
        self.metadata = {}
        self.data_hash = None
        self.loading_time = None
        self.data_types = {}
        self.original_shape = None

    def load_from_csv(
        self,
        data_path: Optional[str] = None,
        parse_dates: List[str] = None,
        date_format: str = None,
        dtype: Dict = None,
        **kwargs
    ) -> pd.DataFrame:
        """
        Load data from CSV file

        Parameters:
        -----------
        data_path : str, optional
            Path to CSV file. If None, uses path from configuration.
        parse_dates : List[str], optional
            List of columns to parse as dates
        date_format : str, optional
            Date format
        dtype : Dict, optional
            Data types for columns
        **kwargs : dict
            Additional parameters for pd.read_csv

        Returns:
        --------
        pd.DataFrame
            Loaded data
        """
        logger.info("=" * 80)
        logger.info("LOADING DATA FROM CSV")
        logger.info("=" * 80)

        start_time = datetime.now()

        try:
            path = data_path or self.config.data_path

            if parse_dates is None:
                parse_dates = ['date']

            # Load data
            self.data = pd.read_csv(
                path,
                parse_dates=parse_dates,
                dayfirst=False,
                dtype=dtype,
                **kwargs
            )

            # Convert dates if needed
            for date_col in parse_dates:
                if date_col in self.data.columns:
                    if date_format:
                        self.data[date_col] = pd.to_datetime(
                            self.data[date_col],
                            format=date_format,
                            errors='coerce'
                        )
                    else:
                        self.data[date_col] = pd.to_datetime(
                            self.data[date_col],
                            errors='coerce'
                        )

            # Save original shape
            self.original_shape = self.data.shape

            # Filter by years, then sort by date and set it as index
            # (guarded so a CSV without a 'date' column does not crash here)
            if 'date' in self.data.columns:
                mask = (self.data['date'].dt.year >= self.config.start_year) & \
                       (self.data['date'].dt.year <= self.config.end_year)
                self.data = self.data.loc[mask].copy()

                self.data = self.data.sort_values('date').reset_index(drop=True)
                self.data.set_index('date', inplace=True)

            # Calculate data hash
            self.data_hash = self._calculate_data_hash()

            # Analyse data types
            self._analyse_data_types()

            # Save metadata
            self._save_metadata()

            # Loading time
            self.loading_time = (datetime.now() - start_time).total_seconds()

            logger.info(f"✓ Loaded {len(self.data)} records, {len(self.data.columns)} columns")
            logger.info(f"  Period: {self.data.index.min()} - {self.data.index.max()}")
            logger.info(f"  Data types: {self.data_types}")
            logger.info(f"  Target variable: {self.config.target_column}")
            logger.info(f"  Loading time: {self.loading_time:.2f} sec")

            return self.data

        except Exception as e:
            logger.error(f"✗ Error loading data: {e}")
            logger.error(traceback.format_exc())
            raise
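    # A minimal sketch of the CSV layout load_from_csv expects, assuming the
    # default parse_dates=['date'] and the column names used by the synthetic
    # generator below (the real dataset defines the actual schema):
    #
    #   date,raskhodvoda,tavg,tmin,tmax,urovenvoda
    #   2000-01-01,103.2,9.8,4.1,15.0,201.3
    #   2000-01-02,98.7,10.4,5.2,14.8,202.0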
    def create_synthetic_data(
        self,
        n_days: int = 365 * 21,
        trend_strength: float = 0.01,
        seasonal_amplitude: List[float] = None,
        noise_std: float = 10,
        include_exogenous: bool = True,
        random_state: int = 42
    ) -> pd.DataFrame:
        """
        Create synthetic data for testing

        Parameters:
        -----------
        n_days : int
            Number of days to generate
        trend_strength : float
            Trend strength
        seasonal_amplitude : List[float], optional
            Seasonal component amplitudes
        noise_std : float
            Noise standard deviation
        include_exogenous : bool
            Whether to include exogenous variables
        random_state : int
            Seed for reproducibility

        Returns:
        --------
        pd.DataFrame
            Synthetic data
        """
        logger.info("=" * 80)
        logger.info("CREATING SYNTHETIC DATA")
        logger.info("=" * 80)

        if seasonal_amplitude is None:
            seasonal_amplitude = [50, 30, 20]

        np.random.seed(random_state)

        # Generate dates
        dates = pd.date_range(
            start=f'{self.config.start_year}-01-01',
            periods=n_days,
            freq='D'
        )

        t = np.arange(n_days)

        # Base components
        trend = trend_strength * t

        # Seasonal components
        seasonal = 0
        periods = [365, 30, 7]  # yearly, monthly, weekly seasonality

        for i, (period, amplitude) in enumerate(zip(periods, seasonal_amplitude)):
            seasonal += amplitude * np.sin(2 * np.pi * t / period)
            if i < len(seasonal_amplitude) - 1:
                seasonal += 0.5 * amplitude * np.cos(4 * np.pi * t / period)

        # Cyclical component (business cycles)
        cycle = 20 * np.sin(2 * np.pi * t / (365 * 5))  # 5-year cycle

        # Noise
        noise = np.random.normal(0, noise_std, n_days)

        # Generate target variable
        raskhodvoda = 100 + trend + seasonal + cycle + noise

        # Create DataFrame
        self.data = pd.DataFrame(
            index=dates,
            data={'raskhodvoda': raskhodvoda}
        )

        # Generate exogenous variables
        if include_exogenous:
            # Temperature with seasonality
            tavg = 10 + 8 * np.sin(2 * np.pi * t / 365) + np.random.normal(0, 3, n_days)
            tmin = tavg - 5 + np.random.normal(0, 2, n_days)
            tmax = tavg + 5 + np.random.normal(0, 2, n_days)

            # Water level with trend and seasonality
            urovenvoda = 200 + 0.5 * t + 20 * np.sin(2 * np.pi * t / 365) + np.random.normal(0, 5, n_days)

            # Add to DataFrame
            self.data['tavg'] = tavg
            self.data['tmin'] = tmin
            self.data['tmax'] = tmax
            self.data['urovenvoda'] = urovenvoda

            # Add noisy lags
            for lag in [1, 7, 30]:
                self.data[f'tavg_lag_{lag}'] = self.data['tavg'].shift(lag) + np.random.normal(0, 1, n_days)

        # Add missing values and outliers for testing
        if n_days > 100:
            # Missing values (5% of data); only applicable when the
            # exogenous block above has added the 'tavg' column
            if 'tavg' in self.data.columns:
                mask_missing = np.random.random(n_days) < 0.05
                self.data.loc[mask_missing, 'tavg'] = np.nan

            # Outliers (1% of data)
            mask_outliers = np.random.random(n_days) < 0.01
            self.data.loc[mask_outliers, 'raskhodvoda'] *= 2

        # Save metadata
        self.metadata.update({
            'is_synthetic': True,
            'synthetic_params': {
                'n_days': n_days,
                'trend_strength': trend_strength,
                'seasonal_amplitude': seasonal_amplitude,
                'noise_std': noise_std,
                'include_exogenous': include_exogenous,
                'random_state': random_state
            }
        })

        logger.info(f"✓ Created {len(self.data)} synthetic records")
        logger.info(f"  Columns: {list(self.data.columns)}")

        return self.data

    def _calculate_data_hash(self) -> str:
        """Calculate data hash for tracking changes"""
        if self.data is None:
            return None

        # Hash the first 1000 rows as a cheap change-detection fingerprint
        sample = self.data.head(1000).to_string().encode()
        return hashlib.md5(sample).hexdigest()

    def _analyse_data_types(self) -> None:
        """Analyse data types in DataFrame"""
        if self.data is None:
            return

        for col in self.data.columns:
            dtype = str(self.data[col].dtype)

            if 'datetime' in dtype:
                self.data_types[col] = DataType.TEMPORAL.value
            elif 'int' in dtype or 'float' in dtype:
                self.data_types[col] = DataType.NUMERIC.value
            elif 'object' in dtype or 'category' in dtype:
                # Treat as categorical if fewer than 10% unique values
                unique_ratio = self.data[col].nunique() / len(self.data)
                if unique_ratio < 0.1:
                    self.data_types[col] = DataType.CATEGORICAL.value
                else:
                    self.data_types[col] = DataType.TEXT.value
            else:
                self.data_types[col] = 'unknown'
    def _save_metadata(self) -> None:
        """Save data metadata"""
        if self.data is None:
            return

        # Basic metadata
        self.metadata.update({
            'original_shape': list(self.original_shape) if self.original_shape else [],
            'current_shape': list(self.data.shape),
            'columns': list(self.data.columns),
            'data_types': self.data_types,
            'date_range': {
                'min': self.data.index.min().strftime('%Y-%m-%d') if pd.notnull(self.data.index.min()) else None,
                'max': self.data.index.max().strftime('%Y-%m-%d') if pd.notnull(self.data.index.max()) else None
            },
            'data_hash': self.data_hash,
            'loading_time': self.loading_time
        })

        # Statistics for numeric columns
        numeric_cols = self.data.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 0:
            stats = self.data[numeric_cols].describe().to_dict()

            # Add additional statistics
            for col in numeric_cols:
                stats[col]['skewness'] = float(self.data[col].skew())
                stats[col]['kurtosis'] = float(self.data[col].kurtosis())
                stats[col]['cv'] = float(self.data[col].std() / self.data[col].mean()) if self.data[col].mean() != 0 else np.nan

            self.metadata['numeric_statistics'] = stats

        # Missing values information
        missing_info = {
            'total_missing': int(self.data.isnull().sum().sum()),
            'missing_by_column': self.data.isnull().sum().to_dict(),
            'missing_percentage': (self.data.isnull().sum() / len(self.data) * 100).to_dict(),
            'rows_with_missing': int(self.data.isnull().any(axis=1).sum()),
            'columns_with_missing': self.data.columns[self.data.isnull().any()].tolist()
        }
        self.metadata['missing_info'] = missing_info

    def get_data_info(self) -> Dict:
        """Get information about data"""
        if self.data is None:
            return {}

        info = {
            'shape': list(self.data.shape),
            'columns': list(self.data.columns),
            'data_types': self.data_types,
            'date_range': {
                'min': self.data.index.min().strftime('%Y-%m-%d') if pd.notnull(self.data.index.min()) else None,
                'max': self.data.index.max().strftime('%Y-%m-%d') if pd.notnull(self.data.index.max()) else None
            },
            'target_column': self.config.target_column,
            'numeric_columns': self.data.select_dtypes(include=[np.number]).columns.tolist(),
            'categorical_columns': [col for col, dtype in self.data_types.items()
                                    if dtype == DataType.CATEGORICAL.value],
            'missing_info': self.metadata.get('missing_info', {})
        }

        return info

    def save_raw_data_info(self) -> None:
        """Save raw data information"""
        if self.data is None:
            return

        info_path = f'{self.config.results_dir}/reports/raw_data_info.json'

        # Custom JSON encoder for handling numpy types
        class NumpyEncoder(json.JSONEncoder):
            def default(self, obj):
                if isinstance(obj, np.integer):
                    return int(obj)
                elif isinstance(obj, np.floating):
                    return None if np.isnan(obj) else float(obj)
                elif isinstance(obj, np.bool_):
                    return bool(obj)
                elif isinstance(obj, np.ndarray):
                    return obj.tolist()
                elif isinstance(obj, pd.Timestamp):
                    return obj.strftime('%Y-%m-%d %H:%M:%S')
                elif isinstance(obj, pd.Period):
                    return str(obj)
                return super().default(obj)

        with open(info_path, 'w', encoding='utf-8') as f:
            json.dump(self.metadata, f, indent=4, ensure_ascii=False, cls=NumpyEncoder)

        logger.info(f"✓ Raw data information saved: {info_path}")
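    # Rough shape of the JSON that save_raw_data_info writes (abridged; the
    # keys mirror _save_metadata above, the values here are illustrative):
    #
    #   {
    #       "original_shape": [7670, 6],
    #       "current_shape": [7670, 6],
    #       "columns": ["raskhodvoda", "tavg", ...],
    #       "data_types": {"raskhodvoda": <DataType.NUMERIC.value>, ...},
    #       "date_range": {"min": "2000-01-01", "max": "2020-12-31"},
    #       "data_hash": "d41d8cd9...",
    #       "numeric_statistics": {...},
    #       "missing_info": {"total_missing": 383, ...}
    #   }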
    def resample_data(
        self,
        freq: str = None,
        method: str = 'mean'
    ) -> pd.DataFrame:
        """
        Resample time series data

        Parameters:
        -----------
        freq : str, optional
            New frequency (e.g., 'D', 'W', 'M')
        method : str
            Aggregation method: 'mean', 'sum', 'last', 'first',
            'min', 'max', 'median'

        Returns:
        --------
        pd.DataFrame
            Resampled data
        """
        if self.data is None:
            logger.warning("Data not loaded")
            return None

        freq = freq or self.config.freq

        # Check if index is datetime
        if not isinstance(self.data.index, pd.DatetimeIndex):
            logger.error("Data index is not DatetimeIndex")
            return self.data

        # Pandas resamplers accept these method names directly as strings,
        # which avoids passing raw numpy functions to .agg() (deprecated)
        agg_methods = {'mean', 'sum', 'last', 'first', 'min', 'max', 'median'}

        if method not in agg_methods:
            logger.warning(f"Method {method} not supported, using mean")
            method = 'mean'

        # Resampling
        try:
            resampled_data = self.data.resample(freq).agg(method)

            logger.info(f"Data resampled to frequency {freq}, method {method}")
            logger.info(f"Size before: {len(self.data)}, after: {len(resampled_data)}")

            self.data = resampled_data
            return self.data

        except Exception as e:
            logger.error(f"Error during resampling: {e}")
            return self.data

    def detect_frequency(self) -> str:
        """
        Automatically detect data frequency

        Returns:
        --------
        str
            Detected data frequency
        """
        if self.data is None or len(self.data) < 2:
            return 'unknown'

        if not isinstance(self.data.index, pd.DatetimeIndex):
            return 'irregular'

        # Calculate differences between timestamps
        diffs = pd.Series(self.data.index).diff().dropna()

        if len(diffs) == 0:
            return 'unknown'

        # Most frequent difference
        modes = diffs.mode()
        mode_diff = modes.iloc[0] if not modes.empty else diffs.iloc[0]

        # Determine frequency; boundaries are inclusive so that, e.g.,
        # a spacing of exactly one day is classified as daily, and the
        # monthly/quarterly thresholds cover 31- and 92-day spacings
        if mode_diff <= pd.Timedelta('1 hour'):
            return 'H'   # Hourly
        elif mode_diff <= pd.Timedelta('1 day'):
            return 'D'   # Daily
        elif mode_diff <= pd.Timedelta('7 days'):
            return 'W'   # Weekly
        elif mode_diff <= pd.Timedelta('31 days'):
            return 'M'   # Monthly
        elif mode_diff <= pd.Timedelta('92 days'):
            return 'Q'   # Quarterly
        else:
            return 'Y'   # Yearly
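
# ============================================
# Minimal usage sketch. Assumptions: Config() is constructible with defaults
# and exposes start_year, end_year, freq, results_dir and target_column as
# referenced above; adjust to the real config.config API before running.
# ============================================
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    loader = DataLoader(Config())

    # Generate two years of synthetic daily data instead of reading a CSV
    df = loader.create_synthetic_data(n_days=365 * 2, include_exogenous=True)

    print(loader.detect_frequency())   # 'D' for daily spacing
    print(df.shape, list(df.columns))

    # Downsample to weekly means
    weekly = loader.resample_data(freq='W', method='mean')
    print(weekly.head())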