# ============================================
# CLASS 2: DATA LOADER
# ============================================
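"""Data loading utilities: read time series from CSV or generate synthetic
data, track metadata (shape, dtypes, hash, missing values, numeric
statistics), and provide resampling and frequency-detection helpers."""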
from datetime import datetime
import hashlib
import json
import logging
import traceback
from typing import Dict, List, Optional

import numpy as np
import pandas as pd

from config.config import Config, DataType

logger = logging.getLogger(__name__)


class DataLoader:
    """Class for loading and initial data processing"""

    def __init__(self, config: Config):
        """
        Initialise data loader

        Parameters:
        -----------
        config : Config
            Experiment configuration
        """
        self.config = config
        self.data = None
        self.metadata = {}
        self.data_hash = None
        self.loading_time = None
        self.data_types = {}
        self.original_shape = None

    def load_from_csv(
        self,
        data_path: Optional[str] = None,
        parse_dates: Optional[List[str]] = None,
        date_format: Optional[str] = None,
        dtype: Optional[Dict] = None,
        **kwargs
    ) -> pd.DataFrame:
        """
        Load data from a CSV file

        Parameters:
        -----------
        data_path : str, optional
            Path to the CSV file. If None, uses the path from the configuration.
        parse_dates : List[str], optional
            List of columns to parse as dates (defaults to ['date'])
        date_format : str, optional
            Explicit date format; if None, pandas infers it
        dtype : Dict, optional
            Data types for columns
        **kwargs : dict
            Additional parameters for pd.read_csv

        Returns:
        --------
        pd.DataFrame
            Loaded data
        """
| logger.info("="*80) | |
| logger.info("LOADING DATA FROM CSV") | |
| logger.info("="*80) | |
| start_time = datetime.now() | |
| try: | |
| path = data_path or self.config.data_path | |
| if parse_dates is None: | |
| parse_dates = ['date'] | |
| # Load data | |
| self.data = pd.read_csv( | |
| path, | |
| parse_dates=parse_dates, | |
| dayfirst=False, | |
| dtype=dtype, | |
| **kwargs | |
| ) | |
| # Convert dates if needed | |
| for date_col in parse_dates: | |
| if date_col in self.data.columns: | |
| if date_format: | |
| self.data[date_col] = pd.to_datetime( | |
| self.data[date_col], | |
| format=date_format, | |
| errors='coerce' | |
| ) | |
| else: | |
| self.data[date_col] = pd.to_datetime( | |
| self.data[date_col], | |
| errors='coerce' | |
| ) | |
| # Save original shape | |
| self.original_shape = self.data.shape | |
| # Filter by years | |
| if 'date' in self.data.columns: | |
| mask = (self.data['date'].dt.year >= self.config.start_year) & \ | |
| (self.data['date'].dt.year <= self.config.end_year) | |
| self.data = self.data.loc[mask].copy() | |
| # Sort by date | |
| if 'date' in self.data.columns: | |
| self.data = self.data.sort_values('date').reset_index(drop=True) | |
| # Set date as index | |
| self.data.set_index('date', inplace=True) | |
| # Calculate data hash | |
| self.data_hash = self._calculate_data_hash() | |
| # Analyse data types | |
| self._analyse_data_types() | |
| # Save metadata | |
| self._save_metadata() | |
| # Loading time | |
| self.loading_time = (datetime.now() - start_time).total_seconds() | |
| logger.info(f"✓ Loaded {len(self.data)} records, {len(self.data.columns)} columns") | |
| logger.info(f" Period: {self.data.index.min()} - {self.data.index.max()}") | |
| logger.info(f" Data types: {self.data_types}") | |
| logger.info(f" Target variable: {self.config.target_column}") | |
| logger.info(f" Loading time: {self.loading_time:.2f} sec") | |
| return self.data | |
| except Exception as e: | |
| logger.error(f"✗ Error loading data: {e}") | |
| logger.error(traceback.format_exc()) | |
| raise | |

    def create_synthetic_data(
        self,
        n_days: int = 365 * 21,
        trend_strength: float = 0.01,
        seasonal_amplitude: Optional[List[float]] = None,
        noise_std: float = 10,
        include_exogenous: bool = True,
        random_state: int = 42
    ) -> pd.DataFrame:
        """
        Create synthetic data for testing

        Parameters:
        -----------
        n_days : int
            Number of days to generate
        trend_strength : float
            Slope of the linear trend per day
        seasonal_amplitude : List[float], optional
            Amplitudes of the yearly, monthly and weekly seasonal components
        noise_std : float
            Noise standard deviation
        include_exogenous : bool
            Whether to include exogenous variables
        random_state : int
            Seed for reproducibility

        Returns:
        --------
        pd.DataFrame
            Synthetic data
        """
| logger.info("="*80) | |
| logger.info("CREATING SYNTHETIC DATA") | |
| logger.info("="*80) | |
| if seasonal_amplitude is None: | |
| seasonal_amplitude = [50, 30, 20] | |
| np.random.seed(random_state) | |
| # Generate dates | |
| dates = pd.date_range( | |
| start=f'{self.config.start_year}-01-01', | |
| periods=n_days, | |
| freq='D' | |
| ) | |
| t = np.arange(n_days) | |
| # Base components | |
| trend = trend_strength * t | |
| # Seasonal components | |
| seasonal = 0 | |
| periods = [365, 30, 7] # yearly, monthly, weekly seasonality | |
| for i, (period, amplitude) in enumerate(zip(periods, seasonal_amplitude)): | |
| seasonal += amplitude * np.sin(2 * np.pi * t / period) | |
| if i < len(seasonal_amplitude) - 1: | |
| seasonal += 0.5 * amplitude * np.cos(4 * np.pi * t / period) | |
| # Cyclical component (business cycles) | |
| cycle = 20 * np.sin(2 * np.pi * t / (365*5)) # 5-year cycle | |
| # Noise | |
| noise = np.random.normal(0, noise_std, n_days) | |
| # Generate target variable | |
| raskhodvoda = 100 + trend + seasonal + cycle + noise | |
| # Create DataFrame | |
| self.data = pd.DataFrame( | |
| index=dates, | |
| data={'raskhodvoda': raskhodvoda} | |
| ) | |
| # Generate exogenous variables | |
| if include_exogenous: | |
| # Temperature with seasonality | |
| tavg = 10 + 8 * np.sin(2 * np.pi * t / 365) + np.random.normal(0, 3, n_days) | |
| tmin = tavg - 5 + np.random.normal(0, 2, n_days) | |
| tmax = tavg + 5 + np.random.normal(0, 2, n_days) | |
| # Water level with trend and seasonality | |
| urovenvoda = 200 + 0.5 * t + 20 * np.sin(2 * np.pi * t / 365) + np.random.normal(0, 5, n_days) | |
| # Add to DataFrame | |
| self.data['tavg'] = tavg | |
| self.data['tmin'] = tmin | |
| self.data['tmax'] = tmax | |
| self.data['urovenvoda'] = urovenvoda | |
| # Add noisy lags | |
| for lag in [1, 7, 30]: | |
| self.data[f'tavg_lag_{lag}'] = self.data['tavg'].shift(lag) + np.random.normal(0, 1, n_days) | |
| # Add missing values and outliers for testing | |
| if n_days > 100: | |
| # Missing values (5% of data) | |
| mask_missing = np.random.random(n_days) < 0.05 | |
| self.data.loc[mask_missing, 'tavg'] = np.nan | |
| # Outliers (1% of data) | |
| mask_outliers = np.random.random(n_days) < 0.01 | |
| self.data.loc[mask_outliers, 'raskhodvoda'] *= 2 | |
| # Save metadata | |
| self.metadata.update({ | |
| 'is_synthetic': True, | |
| 'synthetic_params': { | |
| 'n_days': n_days, | |
| 'trend_strength': trend_strength, | |
| 'seasonal_amplitude': seasonal_amplitude, | |
| 'noise_std': noise_std, | |
| 'include_exogenous': include_exogenous, | |
| 'random_state': random_state | |
| } | |
| }) | |
| logger.info(f"✓ Created {len(self.data)} synthetic records") | |
| logger.info(f" Columns: {list(self.data.columns)}") | |
| return self.data | |

    def _calculate_data_hash(self) -> Optional[str]:
        """Calculate data hash for tracking changes"""
        if self.data is None:
            return None
        # Hash a text snapshot of the first 1000 rows
        sample = self.data.head(1000).to_string().encode()
        return hashlib.md5(sample).hexdigest()

    def _analyse_data_types(self) -> None:
        """Analyse data types in the DataFrame"""
        if self.data is None:
            return
        for col in self.data.columns:
            dtype = str(self.data[col].dtype)
            if 'datetime' in dtype:
                self.data_types[col] = DataType.TEMPORAL.value
            elif 'int' in dtype or 'float' in dtype:
                self.data_types[col] = DataType.NUMERIC.value
            elif 'object' in dtype or 'category' in dtype:
                # Treat low-cardinality object columns as categorical
                unique_ratio = self.data[col].nunique() / len(self.data)
                if unique_ratio < 0.1:  # Less than 10% unique values
                    self.data_types[col] = DataType.CATEGORICAL.value
                else:
                    self.data_types[col] = DataType.TEXT.value
            else:
                self.data_types[col] = 'unknown'

    def _save_metadata(self) -> None:
        """Save data metadata"""
        if self.data is None:
            return
        has_datetime_index = isinstance(self.data.index, pd.DatetimeIndex)

        # Basic metadata
        self.metadata.update({
            'original_shape': list(self.original_shape) if self.original_shape else [],
            'current_shape': list(self.data.shape),
            'columns': list(self.data.columns),
            'data_types': self.data_types,
            'date_range': {
                'min': self.data.index.min().strftime('%Y-%m-%d') if has_datetime_index and pd.notnull(self.data.index.min()) else None,
                'max': self.data.index.max().strftime('%Y-%m-%d') if has_datetime_index and pd.notnull(self.data.index.max()) else None
            },
            'data_hash': self.data_hash,
            'loading_time': self.loading_time
        })

        # Statistics for numeric columns
        numeric_cols = self.data.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 0:
            stats = self.data[numeric_cols].describe().to_dict()
            # Add additional statistics
            for col in numeric_cols:
                stats[col]['skewness'] = float(self.data[col].skew())
                stats[col]['kurtosis'] = float(self.data[col].kurtosis())
                stats[col]['cv'] = float(self.data[col].std() / self.data[col].mean()) if self.data[col].mean() != 0 else np.nan
            self.metadata['numeric_statistics'] = stats

        # Missing values information
        missing_info = {
            'total_missing': int(self.data.isnull().sum().sum()),
            'missing_by_column': self.data.isnull().sum().to_dict(),
            'missing_percentage': (self.data.isnull().sum() / len(self.data) * 100).to_dict(),
            'rows_with_missing': int(self.data.isnull().any(axis=1).sum()),
            'columns_with_missing': self.data.columns[self.data.isnull().any()].tolist()
        }
        self.metadata['missing_info'] = missing_info

    def get_data_info(self) -> Dict:
        """Get information about the data"""
        if self.data is None:
            return {}
        has_datetime_index = isinstance(self.data.index, pd.DatetimeIndex)
        info = {
            'shape': list(self.data.shape),
            'columns': list(self.data.columns),
            'data_types': self.data_types,
            'date_range': {
                'min': self.data.index.min().strftime('%Y-%m-%d') if has_datetime_index and pd.notnull(self.data.index.min()) else None,
                'max': self.data.index.max().strftime('%Y-%m-%d') if has_datetime_index and pd.notnull(self.data.index.max()) else None
            },
            'target_column': self.config.target_column,
            'numeric_columns': self.data.select_dtypes(include=[np.number]).columns.tolist(),
            'categorical_columns': [col for col, dtype in self.data_types.items()
                                    if dtype == DataType.CATEGORICAL.value],
            'missing_info': self.metadata.get('missing_info', {})
        }
        return info

    def save_raw_data_info(self) -> None:
        """Save raw data information"""
        if self.data is None:
            return
        info_path = f'{self.config.results_dir}/reports/raw_data_info.json'

        # Custom JSON encoder for numpy and pandas types
        class NumpyEncoder(json.JSONEncoder):
            def default(self, obj):
                if isinstance(obj, np.integer):
                    return int(obj)
                elif isinstance(obj, np.floating):
                    return None if np.isnan(obj) else float(obj)
                elif isinstance(obj, np.bool_):
                    return bool(obj)
                elif isinstance(obj, np.ndarray):
                    return obj.tolist()
                elif isinstance(obj, pd.Timestamp):
                    return obj.strftime('%Y-%m-%d %H:%M:%S')
                elif isinstance(obj, pd.Period):
                    return str(obj)
                return super().default(obj)

        with open(info_path, 'w', encoding='utf-8') as f:
            json.dump(self.metadata, f, indent=4, ensure_ascii=False, cls=NumpyEncoder)
        logger.info(f"✓ Raw data information saved: {info_path}")

    def resample_data(
        self,
        freq: Optional[str] = None,
        method: str = 'mean'
    ) -> Optional[pd.DataFrame]:
        """
        Resample time series data

        Parameters:
        -----------
        freq : str, optional
            New frequency (e.g., 'D', 'W', 'M'). If None, uses the frequency
            from the configuration.
        method : str
            Aggregation method: 'mean', 'sum', 'last', 'first', 'min', 'max', 'median'

        Returns:
        --------
        pd.DataFrame
            Resampled data
        """
        if self.data is None:
            logger.warning("Data not loaded")
            return None
        freq = freq or self.config.freq

        # Check that the index is datetime
        if not isinstance(self.data.index, pd.DatetimeIndex):
            logger.error("Data index is not a DatetimeIndex")
            return self.data

        # Supported aggregation methods (pandas string aggregations)
        valid_methods = {'mean', 'sum', 'last', 'first', 'min', 'max', 'median'}
        if method not in valid_methods:
            logger.warning(f"Method {method} not supported, using mean")
            method = 'mean'

        # Resampling
        try:
            resampled_data = self.data.resample(freq).agg(method)
            logger.info(f"Data resampled to frequency {freq}, method {method}")
            logger.info(f"Size before: {len(self.data)}, after: {len(resampled_data)}")
            self.data = resampled_data
            return self.data
        except Exception as e:
            logger.error(f"Error during resampling: {e}")
            return self.data

    def detect_frequency(self) -> str:
        """
        Automatically detect data frequency

        Returns:
        --------
        str
            Detected data frequency
        """
        if self.data is None or len(self.data) < 2:
            return 'unknown'
        if not isinstance(self.data.index, pd.DatetimeIndex):
            return 'irregular'

        # Calculate differences between consecutive timestamps
        diffs = pd.Series(self.data.index).diff().dropna()
        if len(diffs) == 0:
            return 'unknown'

        # Most frequent difference
        mode_diff = diffs.mode().iloc[0] if not diffs.mode().empty else diffs.iloc[0]

        # Bucket the modal step into a frequency (heuristic thresholds; the upper
        # bounds for 'M' and 'Q' allow for 31-day months and 92-day quarters)
        if mode_diff <= pd.Timedelta('1 hour'):
            return 'H'  # Hourly or finer
        elif mode_diff <= pd.Timedelta('1 day'):
            return 'D'  # Daily
        elif mode_diff <= pd.Timedelta('7 days'):
            return 'W'  # Weekly
        elif mode_diff <= pd.Timedelta('31 days'):
            return 'M'  # Monthly
        elif mode_diff <= pd.Timedelta('92 days'):
            return 'Q'  # Quarterly
        else:
            return 'Y'  # Yearly
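

# Minimal usage sketch (not part of the original module). It assumes Config()
# is constructible with defaults and exposes the attributes referenced above
# (start_year, end_year, target_column, freq, results_dir, data_path).
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    loader = DataLoader(Config())  # hypothetical default-constructed Config
    loader.create_synthetic_data(n_days=365 * 3)   # 3 years of daily data
    print(loader.detect_frequency())               # expected: 'D'
    loader.resample_data(freq='W', method='mean')  # aggregate to weekly means
    print(loader.get_data_info()['shape'])
    # For real data instead of synthetic:
    #     loader.load_from_csv(parse_dates=['date'])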