"""
Data processing and feature engineering utilities.
"""
import pandas as pd
import numpy as np
from scipy import stats
def clean_timeseries(df: pd.DataFrame, value_col: str = None) -> pd.DataFrame:
    """
    Clean a time series DataFrame:
    - Ensure a datetime index (via a "Date"/"date" column, or by parsing the index)
    - Sort by index
    - Remove duplicate index entries (keeping the first occurrence)
    - Interpolate missing numeric values, then forward/back-fill any remainder

    Parameters
    ----------
    df : pd.DataFrame
        Input frame; it is not modified (a cleaned copy is returned).
    value_col : str, optional
        Unused; kept for backward compatibility with existing callers.

    Returns
    -------
    pd.DataFrame
        The cleaned copy.
    """
    df = df.copy()
    # Ensure datetime index
    if not isinstance(df.index, pd.DatetimeIndex):
        if "Date" in df.columns:
            df["Date"] = pd.to_datetime(df["Date"])
            df = df.set_index("Date")
        elif "date" in df.columns:
            df["date"] = pd.to_datetime(df["date"])
            df = df.set_index("date")
        else:
            # Best effort: try to parse the existing index as dates and
            # keep it unchanged if parsing fails.
            try:
                df.index = pd.to_datetime(df.index)
            except Exception:
                pass
    # Sort chronologically (or by raw index order if parsing failed above)
    df = df.sort_index()
    # Remove duplicate indices
    df = df[~df.index.duplicated(keep="first")]
    # BUG FIX: interpolate(method="time") raises ValueError on a
    # non-DatetimeIndex, which happened whenever index parsing above failed.
    # Fall back to positional (linear) interpolation in that case.
    interp_method = "time" if isinstance(df.index, pd.DatetimeIndex) else "linear"
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    for col in numeric_cols:
        if df[col].isna().sum() > 0:
            df[col] = df[col].interpolate(method=interp_method, limit_direction="both")
            # Interpolation may leave leading/trailing gaps; fill them too.
            df[col] = df[col].ffill().bfill()
    return df
def compute_returns(df: pd.DataFrame, col: str = "Close", periods: int = 1) -> pd.Series:
    """Return the percentage change of ``col`` over ``periods`` rows, in percent."""
    fractional_change = df[col].pct_change(periods)
    return fractional_change * 100
def compute_rolling_stats(df: pd.DataFrame, col: str, windows: list = None) -> pd.DataFrame:
    """
    Return a frame holding ``col`` plus rolling mean (``MA_<w>``) and
    standard deviation (``STD_<w>``) columns for each window in ``windows``.

    Windows longer than the data are skipped. Defaults to [7, 14, 30, 90].
    """
    if windows is None:
        windows = [7, 14, 30, 90]
    out = df[[col]].copy()
    n_rows = len(df)
    for window in windows:
        if n_rows < window:
            continue
        rolling = df[col].rolling(window=window)
        out[f"MA_{window}"] = rolling.mean()
        out[f"STD_{window}"] = rolling.std()
    return out
def compute_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add common technical-analysis columns to a price DataFrame.

    Expects OHLCV columns: Open, High, Low, Close (Volume optional).
    Any indicator whose lookback exceeds the number of rows is skipped.
    """
    out = df.copy()
    close = df["Close"]
    n = len(df)

    # Simple and exponential moving averages over standard lookbacks.
    for lookback in (7, 20, 50, 200):
        if n >= lookback:
            out[f"SMA_{lookback}"] = close.rolling(window=lookback).mean()
            out[f"EMA_{lookback}"] = close.ewm(span=lookback, adjust=False).mean()

    # RSI: 14-period average gain vs. average loss (SMA variant).
    if n >= 14:
        delta = close.diff()
        avg_gain = delta.where(delta > 0, 0).rolling(window=14).mean()
        avg_loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = avg_gain / avg_loss
        out["RSI"] = 100 - (100 / (1 + rs))

    # MACD: fast EMA minus slow EMA, with a 9-period signal line.
    if n >= 26:
        fast = close.ewm(span=12, adjust=False).mean()
        slow = close.ewm(span=26, adjust=False).mean()
        out["MACD"] = fast - slow
        out["MACD_Signal"] = out["MACD"].ewm(span=9, adjust=False).mean()
        out["MACD_Hist"] = out["MACD"] - out["MACD_Signal"]

    # Bollinger Bands: 20-period SMA +/- 2 standard deviations.
    if n >= 20:
        middle = close.rolling(window=20).mean()
        deviation = close.rolling(window=20).std()
        out["BB_Upper"] = middle + (deviation * 2)
        out["BB_Lower"] = middle - (deviation * 2)
        out["BB_Middle"] = middle

    # ATR: 14-period mean of the true range.
    if n >= 14:
        prev_close = close.shift()
        candidates = pd.concat(
            [
                df["High"] - df["Low"],
                (df["High"] - prev_close).abs(),
                (df["Low"] - prev_close).abs(),
            ],
            axis=1,
        )
        out["ATR"] = candidates.max(axis=1).rolling(window=14).mean()

    # Daily percentage returns.
    out["Returns"] = close.pct_change() * 100

    # 20-period volume moving average, when volume data exists.
    if "Volume" in df.columns and n >= 20:
        out["Volume_MA20"] = df["Volume"].rolling(window=20).mean()
    return out
def decompose_timeseries(series: pd.Series, period: int = None) -> dict:
    """
    Split a series into trend / seasonal / residual parts (additive model).

    If ``period`` is omitted it is guessed from the series length
    (yearly for >= 730 points, monthly for >= 60, weekly for >= 14),
    and shrunk when the non-NaN series is too short for two full cycles.
    """
    from statsmodels.tsa.seasonal import seasonal_decompose

    if period is None:
        # Guess a seasonal period from how much history we have.
        n = len(series)
        if n >= 730:
            period = 365
        elif n >= 60:
            period = 30
        elif n >= 14:
            period = 7
        else:
            period = max(2, n // 3)

    # seasonal_decompose requires complete data and two full periods.
    series = series.dropna()
    if len(series) < 2 * period:
        period = max(2, len(series) // 3)

    decomposition = seasonal_decompose(series, model="additive", period=period)
    return {
        "observed": decomposition.observed,
        "trend": decomposition.trend,
        "seasonal": decomposition.seasonal,
        "residual": decomposition.resid,
    }
def detect_anomalies(series: pd.Series, method: str = "zscore", threshold: float = 3.0) -> pd.Series:
    """
    Flag anomalous points in a series.

    method="zscore": points whose |z-score| (over the non-NaN values)
    exceeds ``threshold``.
    method="iqr": points outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR].
    Any other method yields an all-False mask (best-effort fallback).

    Returns a boolean Series aligned with ``series`` (True = anomaly).
    """
    if method == "zscore":
        observed = series.dropna()
        flags = pd.Series(False, index=series.index)
        flags[observed.index] = np.abs(stats.zscore(observed)) > threshold
        return flags
    if method == "iqr":
        q1 = series.quantile(0.25)
        q3 = series.quantile(0.75)
        fence = 1.5 * (q3 - q1)
        return (series < (q1 - fence)) | (series > (q3 + fence))
    return pd.Series(False, index=series.index)
def compute_stationarity_test(series: pd.Series) -> dict:
    """
    Run the Augmented Dickey-Fuller unit-root test on a series.

    Returns a dict with the test statistic, p-value, lags/observation
    counts, rounded critical values, and an ``is_stationary`` flag
    (p < 0.05). If fewer than 20 non-NaN points are available, returns
    an ``error`` entry instead.
    """
    from statsmodels.tsa.stattools import adfuller

    clean = series.dropna()
    if len(clean) < 20:
        return {"error": "Not enough data points for stationarity test"}

    stat, p_value, lags, n_obs, critical, *_ = adfuller(clean, autolag="AIC")
    return {
        "test_statistic": round(stat, 4),
        "p_value": round(p_value, 6),
        "lags_used": lags,
        "observations": n_obs,
        "critical_values": {name: round(level, 4) for name, level in critical.items()},
        "is_stationary": p_value < 0.05,
    }
def prepare_forecast_data(df: pd.DataFrame, target_col: str, train_ratio: float = 0.8) -> tuple:
    """
    Chronologically split ``df`` into (train, test) by row position.

    ``target_col`` is not used in the split; it is kept so callers can
    record which column they intend to forecast without changing the
    call signature.
    """
    cutoff = int(len(df) * train_ratio)
    return df.iloc[:cutoff], df.iloc[cutoff:]
def inject_stochastic_volatility(forecast: pd.Series, actual: pd.Series, fitted: pd.Series = None) -> pd.Series:
    """
    Inject historical-volatility-scaled AR(1) noise into a smooth prediction
    line to simulate a plausible price path based on past fluctuation.

    NOTE(review): this deliberately fabricates random movement on top of the
    model's forecast for visual effect. The returned series is a *simulated
    trajectory*, not the model's expected path — downstream consumers and
    end users should be told as much.

    Parameters
    ----------
    forecast : pd.Series
        Smooth model forecast to perturb.
    actual : pd.Series
        Historical observations; used to estimate the noise scale.
    fitted : pd.Series, optional
        In-sample fitted values. When supplied and overlapping ``actual`` on
        at least 10 index points, model residuals drive the noise scale;
        otherwise first differences of ``actual`` are used.

    Returns
    -------
    pd.Series
        ``forecast`` plus generated noise. Non-deterministic: the RNG is
        reseeded from global NumPy state on every call. Returned unchanged
        when no usable volatility estimate exists.
    """
    if fitted is None or len(fitted) < 10:
        # Fallback: estimate volatility from day-to-day changes of the actuals
        diffs = actual.diff().dropna()
        std_resid = diffs.std()
    else:
        # Calculate volatility from model residuals on the index overlap
        common_idx = fitted.index.intersection(actual.index)
        if len(common_idx) < 10:
            diffs = actual.diff().dropna()
            std_resid = diffs.std()
        else:
            residuals = actual[common_idx] - fitted[common_idx]
            std_resid = np.nanstd(residuals)
            # Degenerate residuals (all NaN or constant) fall back to diffs
            if np.isnan(std_resid) or std_resid == 0:
                std_resid = actual.diff().dropna().std()
    # Calculate recent market volatility (last 30 points) so the noise stays
    # proportional to current conditions
    recent_actual = actual.tail(30)
    std_recent = recent_actual.std() if len(recent_actual) > 5 else std_resid
    if np.isnan(std_recent) or std_recent == 0:
        std_recent = std_resid
    # Ensure baseline volatility is noticeable (at least 15% of recent standard deviation)
    base_volatility = max(std_resid, std_recent * 0.15)
    if np.isnan(base_volatility) or base_volatility == 0:
        # No measurable volatility at all: return the forecast untouched
        return forecast
    # Generate a mean-reverting random walk (AR(1) with high momentum) so the
    # noise drifts like a market swing but decays back toward the forecast line
    n_steps = len(forecast)
    noise = np.zeros(n_steps)
    # Amplify the scale slightly so the fluctuation is visually noticeable
    noise_scale = base_volatility * 1.2
    np.random.seed(np.random.randint(0, 10000))  # Re-seed from global RNG state; output differs every call
    # 0.85 momentum: each step keeps most of the previous deviation
    for i in range(1, n_steps):
        noise[i] = 0.85 * noise[i-1] + np.random.normal(0, noise_scale * 0.5)
    # Taper the first up-to-10 points linearly from 0 to 1 so the transition
    # from observed history into the noisy forecast starts at zero noise
    taper = np.linspace(0, 1, min(n_steps, 10))
    noise[:len(taper)] *= taper
    return forecast + noise
|