File size: 9,014 Bytes
c86611e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eb6d865
c86611e
 
 
eb6d865
 
 
 
 
c86611e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
"""
Data processing and feature engineering utilities.
"""
import pandas as pd
import numpy as np
from scipy import stats


def clean_timeseries(df: pd.DataFrame, value_col: str = None) -> pd.DataFrame:
    """
    Clean a time series DataFrame:
    - Ensure datetime index (via a "Date"/"date" column or by parsing the index)
    - Sort by index
    - Remove duplicate index entries (keeping the first occurrence)
    - Fill missing numeric values via interpolation plus ffill/bfill

    Parameters
    ----------
    df : pd.DataFrame
        Input frame; dates may live in the index or in a "Date"/"date" column.
    value_col : str, optional
        Currently unused; kept for backward compatibility with existing callers.

    Returns
    -------
    pd.DataFrame
        A cleaned copy of the input (the original frame is not modified).
    """
    df = df.copy()

    # Ensure datetime index
    if not isinstance(df.index, pd.DatetimeIndex):
        if "Date" in df.columns:
            df["Date"] = pd.to_datetime(df["Date"])
            df = df.set_index("Date")
        elif "date" in df.columns:
            df["date"] = pd.to_datetime(df["date"])
            df = df.set_index("date")
        else:
            # Best effort: try to parse the existing index as dates.
            try:
                df.index = pd.to_datetime(df.index)
            except Exception:
                pass  # keep the original index; interpolation falls back below

    # Sort chronologically (or by whatever index we ended up with)
    df = df.sort_index()

    # Remove duplicate indices, keeping the first occurrence
    df = df[~df.index.duplicated(keep="first")]

    # Interpolate missing numeric values. "time" interpolation requires a
    # DatetimeIndex and raises ValueError otherwise, so fall back to
    # "linear" when date parsing above did not succeed (bug fix: the old
    # code always used "time" and crashed on non-datetime indices).
    method = "time" if isinstance(df.index, pd.DatetimeIndex) else "linear"
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    for col in numeric_cols:
        if df[col].isna().sum() > 0:
            df[col] = df[col].interpolate(method=method, limit_direction="both")
            # Catch any edge values interpolation could not reach
            df[col] = df[col].ffill().bfill()

    return df


def compute_returns(df: pd.DataFrame, col: str = "Close", periods: int = 1) -> pd.Series:
    """Return the percent change of *col* over *periods* steps, scaled to percent."""
    pct = df[col].pct_change(periods)
    return pct * 100


def compute_rolling_stats(df: pd.DataFrame, col: str, windows: list = None) -> pd.DataFrame:
    """
    Attach rolling mean/std columns (``MA_w``, ``STD_w``) for each window *w*.

    Windows longer than the data are skipped. Defaults to [7, 14, 30, 90].
    Returns a new frame containing *col* plus the rolling-statistic columns.
    """
    windows = [7, 14, 30, 90] if windows is None else windows

    out = df[[col]].copy()
    n_rows = len(df)
    for win in (w for w in windows if n_rows >= w):
        rolled = df[col].rolling(window=win)
        out[f"MA_{win}"] = rolled.mean()
        out[f"STD_{win}"] = rolled.std()

    return out


def compute_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """
    Add common technical-analysis columns to an OHLCV frame.

    Expects columns Open, High, Low, Close (Volume optional). Any indicator
    whose lookback exceeds the available history is skipped. Returns a copy
    of *df* with the indicator columns appended.
    """
    out = df.copy()
    close = df["Close"]
    n = len(df)

    # Simple and exponential moving averages over standard lookbacks
    for span in (7, 20, 50, 200):
        if n >= span:
            out[f"SMA_{span}"] = close.rolling(window=span).mean()
            out[f"EMA_{span}"] = close.ewm(span=span, adjust=False).mean()

    # RSI: 14-period, simple-rolling-average variant
    if n >= 14:
        delta = close.diff()
        avg_gain = delta.where(delta > 0, 0).rolling(window=14).mean()
        avg_loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        strength = avg_gain / avg_loss
        out["RSI"] = 100 - 100 / (1 + strength)

    # MACD: 12/26 EMA spread with a 9-period signal line
    if n >= 26:
        fast = close.ewm(span=12, adjust=False).mean()
        slow = close.ewm(span=26, adjust=False).mean()
        macd = fast - slow
        signal = macd.ewm(span=9, adjust=False).mean()
        out["MACD"] = macd
        out["MACD_Signal"] = signal
        out["MACD_Hist"] = macd - signal

    # Bollinger Bands: 20-period mean +/- 2 standard deviations
    if n >= 20:
        mid = close.rolling(window=20).mean()
        width = close.rolling(window=20).std() * 2
        out["BB_Upper"] = mid + width
        out["BB_Lower"] = mid - width
        out["BB_Middle"] = mid

    # ATR: 14-period rolling mean of the true range
    if n >= 14:
        prev_close = close.shift()
        ranges = pd.concat(
            [
                df["High"] - df["Low"],
                (df["High"] - prev_close).abs(),
                (df["Low"] - prev_close).abs(),
            ],
            axis=1,
        )
        out["ATR"] = ranges.max(axis=1).rolling(window=14).mean()

    # Percentage daily returns
    out["Returns"] = close.pct_change() * 100

    # 20-period volume moving average, when volume is available
    if "Volume" in df.columns and n >= 20:
        out["Volume_MA20"] = df["Volume"].rolling(window=20).mean()

    return out


def decompose_timeseries(series: pd.Series, period: int = None) -> dict:
    """
    Split a series into trend, seasonal, and residual parts (additive model).

    When *period* is omitted it is guessed from the series length:
    365 for >= 730 points, 30 for >= 60, 7 for >= 14, else a third of the
    length (minimum 2). Returns a dict with keys "observed", "trend",
    "seasonal", and "residual".
    """
    from statsmodels.tsa.seasonal import seasonal_decompose

    if period is None:
        length = len(series)
        if length >= 730:
            period = 365
        elif length >= 60:
            period = 30
        elif length >= 14:
            period = 7
        else:
            period = max(2, length // 3)

    # seasonal_decompose cannot handle missing values
    series = series.dropna()

    # Need at least two full cycles; shrink the period when data is short
    if len(series) < 2 * period:
        period = max(2, len(series) // 3)

    decomposition = seasonal_decompose(series, model="additive", period=period)

    return {
        "observed": decomposition.observed,
        "trend": decomposition.trend,
        "seasonal": decomposition.seasonal,
        "residual": decomposition.resid,
    }


def detect_anomalies(series: pd.Series, method: str = "zscore", threshold: float = 3.0) -> pd.Series:
    """
    Flag anomalous points in a series.

    method="zscore": points whose |z-score| exceeds *threshold* (NaNs are
    never flagged). method="iqr": points outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR].
    Any other method yields an all-False result.
    Returns a boolean Series aligned with *series* (True = anomaly).
    """
    if method == "zscore":
        valid = series.dropna()
        flags = pd.Series(False, index=series.index)
        flags[valid.index] = np.abs(stats.zscore(valid)) > threshold
        return flags
    if method == "iqr":
        q1 = series.quantile(0.25)
        q3 = series.quantile(0.75)
        spread = q3 - q1
        lower = q1 - 1.5 * spread
        upper = q3 + 1.5 * spread
        return (series < lower) | (series > upper)
    return pd.Series(False, index=series.index)


def compute_stationarity_test(series: pd.Series) -> dict:
    """
    Run the Augmented Dickey-Fuller test and summarize the result.

    Returns a dict with the test statistic, p-value, lag/observation counts,
    critical values, and an ``is_stationary`` flag (p < 0.05), or a dict with
    an "error" entry when fewer than 20 non-null points are available.
    """
    from statsmodels.tsa.stattools import adfuller

    clean = series.dropna()
    if len(clean) < 20:
        return {"error": "Not enough data points for stationarity test"}

    stat, p_value, lags, n_obs, critical, *_ = adfuller(clean, autolag="AIC")

    return {
        "test_statistic": round(stat, 4),
        "p_value": round(p_value, 6),
        "lags_used": lags,
        "observations": n_obs,
        "critical_values": {key: round(val, 4) for key, val in critical.items()},
        "is_stationary": p_value < 0.05,
    }


def prepare_forecast_data(df: pd.DataFrame, target_col: str, train_ratio: float = 0.8) -> tuple:
    """
    Chronologically split *df* into (train, test) by row position.

    The first ``int(len(df) * train_ratio)`` rows become the training set;
    the remainder becomes the test set.
    """
    cutoff = int(len(df) * train_ratio)
    return df.iloc[:cutoff], df.iloc[cutoff:]

def inject_stochastic_volatility(forecast: pd.Series, actual: pd.Series, fitted: pd.Series = None, seed: int = None) -> pd.Series:
    """
    Overlay realistic, history-derived noise on a smooth forecast line
    to simulate a plausible path based on past patterns.

    Parameters:
        forecast: smooth model forecast to perturb.
        actual: historical observations used to size the noise.
        fitted: optional in-sample fitted values; when they overlap *actual*
            by at least 10 points, model residuals drive the noise scale.
        seed: optional RNG seed for reproducible output (new, backward
            compatible; default None keeps each call randomized).

    Returns:
        ``forecast`` plus a tapered AR(1) noise path, or ``forecast``
        unchanged when no usable volatility estimate exists.
    """
    # --- Estimate a noise scale from residuals or first differences ---
    if fitted is None or len(fitted) < 10:
        # Fallback: volatility of the differenced actuals
        std_resid = actual.diff().dropna().std()
    else:
        common_idx = fitted.index.intersection(actual.index)
        if len(common_idx) < 10:
            std_resid = actual.diff().dropna().std()
        else:
            residuals = actual[common_idx] - fitted[common_idx]
            std_resid = np.nanstd(residuals)

    if np.isnan(std_resid) or std_resid == 0:
        std_resid = actual.diff().dropna().std()

    # Recent (last-30-point) volatility keeps the noise proportional to
    # current market conditions
    recent_actual = actual.tail(30)
    std_recent = recent_actual.std() if len(recent_actual) > 5 else std_resid
    if np.isnan(std_recent) or std_recent == 0:
        std_recent = std_resid

    # Ensure the baseline volatility is noticeable (at least 15% of the
    # recent standard deviation)
    base_volatility = max(std_resid, std_recent * 0.15)
    if np.isnan(base_volatility) or base_volatility == 0:
        return forecast  # nothing meaningful to inject

    n_steps = len(forecast)
    noise = np.zeros(n_steps)
    # Amplify the scale slightly so the fluctuation is visually apparent
    noise_scale = base_volatility * 1.2

    # Bug fix: the old code called np.random.seed(np.random.randint(0, 10000)),
    # which clobbered the process-wide RNG state and limited every run to
    # 10,000 possible noise streams. A local Generator avoids both problems;
    # seed=None preserves the previous "random every call" behavior.
    rng = np.random.default_rng(seed)

    # Mean-reverting AR(1) walk: 0.85 momentum makes the noise drift like a
    # real market swing before returning to the forecast line
    for i in range(1, n_steps):
        noise[i] = 0.85 * noise[i - 1] + rng.normal(0, noise_scale * 0.5)

    # Taper the first few steps so the transition from actual history to
    # forecast is smooth
    taper = np.linspace(0, 1, min(n_steps, 10))
    noise[:len(taper)] *= taper

    return forecast + noise