# TimeFlowPro/data_loader/data_loader.py
# ============================================
# CLASS 2: DATA LOADER
# ============================================
from datetime import datetime
import hashlib
import json
import logging
import os
import traceback
from typing import Dict, List, Optional

import numpy as np
import pandas as pd

from config.config import Config, DataType

logger = logging.getLogger(__name__)

class DataLoader:
"""Class for loading and initial data processing"""
def __init__(self, config: Config):
"""
Initialise data loader
Parameters:
-----------
config : Config
Experiment configuration
"""
self.config = config
self.data = None
self.metadata = {}
self.data_hash = None
self.loading_time = None
self.data_types = {}
self.original_shape = None
def load_from_csv(
self,
data_path: Optional[str] = None,
        parse_dates: Optional[List[str]] = None,
        date_format: Optional[str] = None,
        dtype: Optional[Dict] = None,
**kwargs
) -> pd.DataFrame:
"""
Load data from CSV file
Parameters:
-----------
data_path : str, optional
Path to CSV file. If None, uses path from configuration.
parse_dates : List[str], optional
List of columns to parse as dates
date_format : str, optional
Date format
dtype : Dict, optional
Data types for columns
**kwargs : dict
Additional parameters for pd.read_csv
Returns:
--------
pd.DataFrame
Loaded data
"""
logger.info("="*80)
logger.info("LOADING DATA FROM CSV")
logger.info("="*80)
start_time = datetime.now()
try:
path = data_path or self.config.data_path
if parse_dates is None:
parse_dates = ['date']
# Load data
self.data = pd.read_csv(
path,
parse_dates=parse_dates,
dayfirst=False,
dtype=dtype,
**kwargs
)
# Convert dates if needed
for date_col in parse_dates:
if date_col in self.data.columns:
if date_format:
self.data[date_col] = pd.to_datetime(
self.data[date_col],
format=date_format,
errors='coerce'
)
else:
self.data[date_col] = pd.to_datetime(
self.data[date_col],
errors='coerce'
)
# Save original shape
self.original_shape = self.data.shape
            # Filter by configured years, then sort and index by date
            if 'date' in self.data.columns:
                mask = (self.data['date'].dt.year >= self.config.start_year) & \
                       (self.data['date'].dt.year <= self.config.end_year)
                self.data = self.data.loc[mask].copy()
                self.data = self.data.sort_values('date').reset_index(drop=True)
                self.data.set_index('date', inplace=True)
# Calculate data hash
self.data_hash = self._calculate_data_hash()
# Analyse data types
self._analyse_data_types()
# Save metadata
self._save_metadata()
# Loading time
self.loading_time = (datetime.now() - start_time).total_seconds()
logger.info(f"✓ Loaded {len(self.data)} records, {len(self.data.columns)} columns")
logger.info(f" Period: {self.data.index.min()} - {self.data.index.max()}")
logger.info(f" Data types: {self.data_types}")
logger.info(f" Target variable: {self.config.target_column}")
logger.info(f" Loading time: {self.loading_time:.2f} sec")
return self.data
except Exception as e:
logger.error(f"✗ Error loading data: {e}")
logger.error(traceback.format_exc())
raise
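
    # Usage sketch (hypothetical file layout): assumes the CSV has a 'date'
    # column and that config provides data_path, start_year and end_year.
    #     loader = DataLoader(config)
    #     df = loader.load_from_csv(date_format='%Y-%m-%d')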
def create_synthetic_data(
self,
n_days: int = 365*21,
trend_strength: float = 0.01,
        seasonal_amplitude: Optional[List[float]] = None,
noise_std: float = 10,
include_exogenous: bool = True,
random_state: int = 42
) -> pd.DataFrame:
"""
Create synthetic data for testing
Parameters:
-----------
n_days : int
Number of days to generate
trend_strength : float
Trend strength
seasonal_amplitude : List[float], optional
Seasonal component amplitudes
noise_std : float
Noise standard deviation
include_exogenous : bool
Whether to include exogenous variables
random_state : int
Seed for reproducibility
Returns:
--------
pd.DataFrame
Synthetic data
"""
logger.info("="*80)
logger.info("CREATING SYNTHETIC DATA")
logger.info("="*80)
if seasonal_amplitude is None:
seasonal_amplitude = [50, 30, 20]
np.random.seed(random_state)
# Generate dates
dates = pd.date_range(
start=f'{self.config.start_year}-01-01',
periods=n_days,
freq='D'
)
t = np.arange(n_days)
# Base components
trend = trend_strength * t
# Seasonal components
seasonal = 0
periods = [365, 30, 7] # yearly, monthly, weekly seasonality
for i, (period, amplitude) in enumerate(zip(periods, seasonal_amplitude)):
seasonal += amplitude * np.sin(2 * np.pi * t / period)
if i < len(seasonal_amplitude) - 1:
seasonal += 0.5 * amplitude * np.cos(4 * np.pi * t / period)
# Cyclical component (business cycles)
cycle = 20 * np.sin(2 * np.pi * t / (365*5)) # 5-year cycle
# Noise
noise = np.random.normal(0, noise_std, n_days)
# Generate target variable
raskhodvoda = 100 + trend + seasonal + cycle + noise
# Create DataFrame
self.data = pd.DataFrame(
index=dates,
data={'raskhodvoda': raskhodvoda}
)
# Generate exogenous variables
if include_exogenous:
# Temperature with seasonality
tavg = 10 + 8 * np.sin(2 * np.pi * t / 365) + np.random.normal(0, 3, n_days)
tmin = tavg - 5 + np.random.normal(0, 2, n_days)
tmax = tavg + 5 + np.random.normal(0, 2, n_days)
# Water level with trend and seasonality
urovenvoda = 200 + 0.5 * t + 20 * np.sin(2 * np.pi * t / 365) + np.random.normal(0, 5, n_days)
# Add to DataFrame
self.data['tavg'] = tavg
self.data['tmin'] = tmin
self.data['tmax'] = tmax
self.data['urovenvoda'] = urovenvoda
# Add noisy lags
for lag in [1, 7, 30]:
self.data[f'tavg_lag_{lag}'] = self.data['tavg'].shift(lag) + np.random.normal(0, 1, n_days)
# Add missing values and outliers for testing
if n_days > 100:
# Missing values (5% of data)
mask_missing = np.random.random(n_days) < 0.05
self.data.loc[mask_missing, 'tavg'] = np.nan
# Outliers (1% of data)
mask_outliers = np.random.random(n_days) < 0.01
self.data.loc[mask_outliers, 'raskhodvoda'] *= 2
# Save metadata
self.metadata.update({
'is_synthetic': True,
'synthetic_params': {
'n_days': n_days,
'trend_strength': trend_strength,
'seasonal_amplitude': seasonal_amplitude,
'noise_std': noise_std,
'include_exogenous': include_exogenous,
'random_state': random_state
}
})
logger.info(f"✓ Created {len(self.data)} synthetic records")
logger.info(f" Columns: {list(self.data.columns)}")
return self.data
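
    # Usage sketch: two years of reproducible daily data with weaker noise.
    #     df = loader.create_synthetic_data(n_days=365 * 2, noise_std=5,
    #                                       random_state=0)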
    def _calculate_data_hash(self) -> Optional[str]:
"""Calculate data hash for tracking changes"""
if self.data is None:
return None
# Use hash of first 1000 rows and metadata
sample = self.data.head(1000).to_string().encode()
return hashlib.md5(sample).hexdigest()
def _analyse_data_types(self) -> None:
"""Analyse data types in DataFrame"""
if self.data is None:
return
for col in self.data.columns:
dtype = str(self.data[col].dtype)
if 'datetime' in dtype:
self.data_types[col] = DataType.TEMPORAL.value
elif 'int' in dtype or 'float' in dtype:
self.data_types[col] = DataType.NUMERIC.value
elif 'object' in dtype or 'category' in dtype:
# Check if categorical
unique_ratio = self.data[col].nunique() / len(self.data)
if unique_ratio < 0.1: # Less than 10% unique values
self.data_types[col] = DataType.CATEGORICAL.value
else:
self.data_types[col] = DataType.TEXT.value
else:
self.data_types[col] = 'unknown'
def _save_metadata(self) -> None:
"""Save data metadata"""
if self.data is None:
return
# Basic metadata
self.metadata.update({
'original_shape': list(self.original_shape) if self.original_shape else [],
'current_shape': list(self.data.shape),
'columns': list(self.data.columns),
'data_types': self.data_types,
'date_range': {
'min': self.data.index.min().strftime('%Y-%m-%d') if pd.notnull(self.data.index.min()) else None,
'max': self.data.index.max().strftime('%Y-%m-%d') if pd.notnull(self.data.index.max()) else None
},
'data_hash': self.data_hash,
'loading_time': self.loading_time
})
# Statistics for numeric columns
numeric_cols = self.data.select_dtypes(include=[np.number]).columns
if len(numeric_cols) > 0:
stats = self.data[numeric_cols].describe().to_dict()
# Add additional statistics
for col in numeric_cols:
stats[col]['skewness'] = float(self.data[col].skew())
stats[col]['kurtosis'] = float(self.data[col].kurtosis())
stats[col]['cv'] = float(self.data[col].std() / self.data[col].mean()) if self.data[col].mean() != 0 else np.nan
self.metadata['numeric_statistics'] = stats
# Missing values information
missing_info = {
'total_missing': int(self.data.isnull().sum().sum()),
'missing_by_column': self.data.isnull().sum().to_dict(),
'missing_percentage': (self.data.isnull().sum() / len(self.data) * 100).to_dict(),
'rows_with_missing': int(self.data.isnull().any(axis=1).sum()),
'columns_with_missing': self.data.columns[self.data.isnull().any()].tolist()
}
self.metadata['missing_info'] = missing_info
def get_data_info(self) -> Dict:
"""Get information about data"""
if self.data is None:
return {}
info = {
'shape': list(self.data.shape),
'columns': list(self.data.columns),
'data_types': self.data_types,
'date_range': {
'min': self.data.index.min().strftime('%Y-%m-%d') if pd.notnull(self.data.index.min()) else None,
'max': self.data.index.max().strftime('%Y-%m-%d') if pd.notnull(self.data.index.max()) else None
},
'target_column': self.config.target_column,
'numeric_columns': self.data.select_dtypes(include=[np.number]).columns.tolist(),
'categorical_columns': [col for col, dtype in self.data_types.items()
if dtype == DataType.CATEGORICAL.value],
'missing_info': self.metadata.get('missing_info', {})
}
return info
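
    # Usage sketch: inspect the loaded data without mutating it.
    #     info = loader.get_data_info()
    #     print(info['shape'], info['missing_info'].get('total_missing'))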
def save_raw_data_info(self) -> None:
"""Save raw data information"""
if self.data is None:
return
        info_path = f'{self.config.results_dir}/reports/raw_data_info.json'
        os.makedirs(os.path.dirname(info_path), exist_ok=True)
# Custom JSON encoder for handling numpy types
class NumpyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, (np.integer, np.floating)):
if np.isnan(obj):
return None
return float(obj)
elif isinstance(obj, np.bool_):
return bool(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
elif isinstance(obj, pd.Timestamp):
return obj.strftime('%Y-%m-%d %H:%M:%S')
elif isinstance(obj, pd.Period):
return str(obj)
return super().default(obj)
with open(info_path, 'w', encoding='utf-8') as f:
json.dump(self.metadata, f, indent=4, ensure_ascii=False, cls=NumpyEncoder)
logger.info(f"✓ Raw data information saved: {info_path}")
def resample_data(
self,
        freq: Optional[str] = None,
        method: str = 'mean'
    ) -> Optional[pd.DataFrame]:
"""
Resample time series data
Parameters:
-----------
freq : str, optional
New frequency (e.g., 'D', 'W', 'M')
method : str
Aggregation method: 'mean', 'sum', 'last', 'first'
Returns:
--------
pd.DataFrame
Resampled data
"""
if self.data is None:
logger.warning("Data not loaded")
return None
freq = freq or self.config.freq
# Check if index is datetime
if not isinstance(self.data.index, pd.DatetimeIndex):
logger.error("Data index is not DatetimeIndex")
return self.data
        # Aggregation methods supported by pandas resample as string names
        supported_methods = {'mean', 'sum', 'last', 'first', 'min', 'max', 'median'}
        if method not in supported_methods:
            logger.warning(f"Method {method} not supported, using mean")
            method = 'mean'
        # Resampling
        try:
            resampled_data = self.data.resample(freq).agg(method)
logger.info(f"Data resampled to frequency {freq}, method {method}")
logger.info(f"Size before: {len(self.data)}, after: {len(resampled_data)}")
self.data = resampled_data
return self.data
except Exception as e:
logger.error(f"Error during resampling: {e}")
return self.data
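
    # Usage sketch: aggregate daily observations to weekly means.
    #     weekly = loader.resample_data(freq='W', method='mean')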
def detect_frequency(self) -> str:
"""
Automatically detect data frequency
Returns:
--------
str
Detected data frequency
"""
if self.data is None or len(self.data) < 2:
return 'unknown'
if not isinstance(self.data.index, pd.DatetimeIndex):
return 'irregular'
# Calculate differences between timestamps
diffs = pd.Series(self.data.index).diff().dropna()
if len(diffs) == 0:
return 'unknown'
# Most frequent difference
mode_diff = diffs.mode().iloc[0] if not diffs.mode().empty else diffs.iloc[0]
        # Determine frequency (boundaries are inclusive, so exact hourly/daily/
        # weekly/monthly spacings map to the expected code)
        if mode_diff <= pd.Timedelta('1 hour'):
            return 'H'  # Hourly or finer
        elif mode_diff <= pd.Timedelta('1 day'):
            return 'D'  # Daily
        elif mode_diff <= pd.Timedelta('7 days'):
            return 'W'  # Weekly
        elif mode_diff <= pd.Timedelta('31 days'):
            return 'M'  # Monthly
        elif mode_diff <= pd.Timedelta('92 days'):
            return 'Q'  # Quarterly
        else:
            return 'Y'  # Yearly
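

# --- Usage sketch (not part of the original module) -------------------------
# A minimal smoke test assuming Config() is default-constructible and exposes
# the attributes used above (start_year, end_year, target_column, ...).
# Adjust to the real Config signature if it differs.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    loader = DataLoader(Config())  # assumption: default-constructible Config
    loader.create_synthetic_data(n_days=365 * 2)
    print("Detected frequency:", loader.detect_frequency())  # 'D' for daily data
    print("Shape:", loader.get_data_info()["shape"])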