File size: 14,950 Bytes
e6021a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
"""
Internal Pine Script Backtester.

Translates the *intent* of Pine Script strategies into Python
and executes them against real historical data via yfinance.
This is NOT a full Pine Script parser — it maps the generated
template strategies to the existing BacktestEngine.

Produces TradingView-style performance metrics:
  - Net Profit, Gross Profit, Gross Loss
  - Max Drawdown, Profit Factor
  - Sharpe Ratio, Sortino Ratio
  - Win Rate, Average Win/Loss
  - Equity Curve, Monthly Returns
"""

from __future__ import annotations

import logging
import re
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)


class PineScriptBacktester:
    """
    Simulates Pine Script strategies using Python.

    For template-generated strategies, maps the strategy logic
    directly to pandas operations for accurate backtesting.
    """

    async def backtest(
        self,
        code: str,
        ticker: str = "SPY",
        period: str = "3y",
        initial_capital: float = 100_000,
        commission_pct: float = 0.1,
    ) -> Dict[str, Any]:
        """
        Run a backtest on Pine Script code.
        """
        from app.services.data_ingestion.yahoo import yahoo_adapter

        # Fetch data
        df = await yahoo_adapter.get_price_dataframe(ticker, period=period)
        if df.empty or len(df) < 50:
            raise ValueError(f"Insufficient data for {ticker}")

        # Detect strategy type from code and run appropriate backtest
        strategy_type = self._detect_strategy_type(code)
        params = self._extract_parameters(code)

        signals = self._generate_signals(df, strategy_type, params)
        results = self._simulate_trades(df, signals, initial_capital, commission_pct)

        return {
            "ticker": ticker,
            "period": period,
            "strategy_type": strategy_type,
            "initial_capital": initial_capital,
            **results,
        }

    def _detect_strategy_type(self, code: str) -> str:
        """Determine strategy type from Pine Script code keywords."""
        code_lower = code.lower()

        if "ta.supertrend" in code_lower:
            return "supertrend"
        if "ta.macd" in code_lower:
            return "macd"
        if "ta.rsi" in code_lower and "ta.stoch" in code_lower:
            return "stochastic_rsi"
        if "ta.rsi" in code_lower:
            return "rsi"
        if "ta.bb" in code_lower or "bollinger" in code_lower or "ta.stdev" in code_lower:
            return "bollinger"
        if "ta.vwap" in code_lower or "vwap" in code_lower:
            return "vwap"
        if "ichimoku" in code_lower or "donchian" in code_lower:
            return "ichimoku"
        if "z_score" in code_lower or "z-score" in code_lower:
            return "zscore"
        if "ema" in code_lower and code_lower.count("ta.ema") >= 3:
            return "ema_ribbon"
        if "crossover" in code_lower and "sma" in code_lower:
            return "sma_crossover"
        if "ta.atr" in code_lower and "trail" in code_lower:
            return "atr_trailing"
        if "request.security" in code_lower:
            return "multi_timeframe"

        return "sma_crossover"  # default

    def _extract_parameters(self, code: str) -> Dict[str, Any]:
        """Extract input parameters from Pine Script code."""
        params: Dict[str, Any] = {}

        # Match input.int(value, ...) and input.float(value, ...)
        int_matches = re.findall(r'(\w+)\s*=\s*input\.int\((\d+)', code)
        float_matches = re.findall(r'(\w+)\s*=\s*input\.float\(([\d.]+)', code)

        for name, val in int_matches:
            params[name] = int(val)
        for name, val in float_matches:
            params[name] = float(val)

        return params

    def _generate_signals(
        self,
        df: pd.DataFrame,
        strategy_type: str,
        params: Dict[str, Any],
    ) -> pd.Series:
        """
        Generate trading signals: +1 = buy, -1 = sell, 0 = hold.
        Maps each strategy type to Python logic.
        """
        close = df["Close"]
        high = df["High"]
        low = df["Low"]
        volume = df["Volume"] if "Volume" in df.columns else pd.Series(0, index=df.index)
        signals = pd.Series(0, index=df.index)

        if strategy_type == "sma_crossover":
            fast = params.get("fast_length", 20)
            slow = params.get("slow_length", 50)
            sma_fast = close.rolling(fast).mean()
            sma_slow = close.rolling(slow).mean()
            signals[sma_fast > sma_slow] = 1
            signals[sma_fast <= sma_slow] = -1

        elif strategy_type == "rsi":
            length = params.get("rsi_length", 14)
            oversold = params.get("oversold_level", 30)
            overbought = params.get("overbought_level", 70)
            delta = close.diff()
            gain = delta.where(delta > 0, 0).rolling(length).mean()
            loss = (-delta.where(delta < 0, 0)).rolling(length).mean()
            rs = gain / (loss + 1e-10)
            rsi = 100 - (100 / (1 + rs))
            signals[rsi < oversold] = 1
            signals[rsi > overbought] = -1

        elif strategy_type == "macd":
            fast = params.get("fast_len", 12)
            slow = params.get("slow_len", 26)
            sig = params.get("sig_len", 9)
            ema_fast = close.ewm(span=fast).mean()
            ema_slow = close.ewm(span=slow).mean()
            macd_line = ema_fast - ema_slow
            signal_line = macd_line.ewm(span=sig).mean()
            hist = macd_line - signal_line
            signals[(macd_line > signal_line) & (hist > 0)] = 1
            signals[macd_line < signal_line] = -1

        elif strategy_type == "bollinger":
            length = params.get("length", 20)
            mult = params.get("mult", 2.0)
            basis = close.rolling(length).mean()
            std = close.rolling(length).std()
            upper = basis + mult * std
            signals[close > upper] = 1
            signals[close < basis] = -1

        elif strategy_type == "supertrend":
            atr_len = params.get("atr_len", 10)
            factor = params.get("factor", 3.0)
            tr = pd.concat([
                high - low,
                abs(high - close.shift(1)),
                abs(low - close.shift(1))
            ], axis=1).max(axis=1)
            atr = tr.rolling(atr_len).mean()
            upper_band = (high + low) / 2 + factor * atr
            lower_band = (high + low) / 2 - factor * atr

            supertrend = pd.Series(0.0, index=df.index)
            direction = pd.Series(1, index=df.index)
            for i in range(1, len(df)):
                if close.iloc[i] > upper_band.iloc[i-1]:
                    direction.iloc[i] = 1
                elif close.iloc[i] < lower_band.iloc[i-1]:
                    direction.iloc[i] = -1
                else:
                    direction.iloc[i] = direction.iloc[i-1]
            signals[direction == 1] = 1
            signals[direction == -1] = -1

        elif strategy_type == "ema_ribbon":
            emas = [8, 13, 21, 34, 55]
            ema_vals = [close.ewm(span=e).mean() for e in emas]
            bullish = all(ema_vals[i].iloc[-1] > ema_vals[i+1].iloc[-1] for i in range(len(ema_vals)-1))
            for i in range(len(df)):
                if all(ema_vals[j].iloc[i] > ema_vals[j+1].iloc[i] for j in range(len(ema_vals)-1) if i < len(ema_vals[j])):
                    signals.iloc[i] = 1
                elif all(ema_vals[j].iloc[i] < ema_vals[j+1].iloc[i] for j in range(len(ema_vals)-1) if i < len(ema_vals[j])):
                    signals.iloc[i] = -1

        elif strategy_type == "zscore":
            lookback = params.get("lookback", 50)
            entry_z = params.get("entry_z", 2.0)
            mean = close.rolling(lookback).mean()
            std = close.rolling(lookback).std()
            z = (close - mean) / (std + 1e-10)
            signals[z < -entry_z] = 1
            signals[z > entry_z] = -1

        else:
            # Default: SMA crossover
            sma20 = close.rolling(20).mean()
            sma50 = close.rolling(50).mean()
            signals[sma20 > sma50] = 1
            signals[sma20 <= sma50] = -1

        return signals

    def _simulate_trades(
        self,
        df: pd.DataFrame,
        signals: pd.Series,
        initial_capital: float,
        commission_pct: float,
    ) -> Dict[str, Any]:
        """
        Simulate trading based on signals and compute all metrics.
        """
        close = df["Close"].values
        sig = signals.values
        n = len(close)

        # Track positions and equity
        position = 0  # 0 or 1
        entry_price = 0.0
        capital = initial_capital
        equity_curve = [initial_capital]
        trades: List[Dict[str, Any]] = []
        daily_returns: List[float] = [0.0]

        for i in range(1, n):
            if sig[i] == 1 and position == 0:
                # Buy
                shares = capital / close[i]
                commission = capital * (commission_pct / 100)
                capital -= commission
                entry_price = close[i]
                position = 1
                trades.append({
                    "type": "ENTRY",
                    "date": str(df.index[i].date()) if hasattr(df.index[i], 'date') else str(df.index[i]),
                    "price": round(close[i], 2),
                    "shares": round(shares, 4),
                })

            elif sig[i] == -1 and position == 1:
                # Sell
                ret = (close[i] - entry_price) / entry_price
                commission = capital * (1 + ret) * (commission_pct / 100)
                capital = capital * (1 + ret) - commission
                position = 0
                trades.append({
                    "type": "EXIT",
                    "date": str(df.index[i].date()) if hasattr(df.index[i], 'date') else str(df.index[i]),
                    "price": round(close[i], 2),
                    "pnl_pct": round(ret * 100, 2),
                    "pnl_abs": round(capital - equity_curve[-1], 2),
                })

            # Update equity
            if position == 1:
                current_equity = capital * (close[i] / entry_price) if entry_price > 0 else capital
            else:
                current_equity = capital

            daily_ret = (current_equity - equity_curve[-1]) / equity_curve[-1] if equity_curve[-1] > 0 else 0
            daily_returns.append(daily_ret)
            equity_curve.append(current_equity)

        # Close open position
        if position == 1:
            ret = (close[-1] - entry_price) / entry_price
            capital = capital * (1 + ret)

        # Compute metrics
        eq = np.array(equity_curve)
        rets = np.array(daily_returns)
        peak = np.maximum.accumulate(eq)
        drawdown = (eq - peak) / (peak + 1e-10)

        # Trade stats
        pnl_list = [t.get("pnl_pct", 0) for t in trades if t["type"] == "EXIT"]
        wins = [p for p in pnl_list if p > 0]
        losses = [p for p in pnl_list if p < 0]

        total_trades = len(pnl_list)
        win_rate = len(wins) / total_trades if total_trades > 0 else 0
        avg_win = np.mean(wins) if wins else 0
        avg_loss = np.mean(losses) if losses else 0

        net_profit = eq[-1] - initial_capital
        net_profit_pct = (eq[-1] / initial_capital - 1) * 100
        gross_profit = sum(p for p in pnl_list if p > 0)
        gross_loss = abs(sum(p for p in pnl_list if p < 0))
        profit_factor = gross_profit / (gross_loss + 1e-10) if gross_loss > 0 else float("inf")
        max_drawdown = float(np.min(drawdown)) * 100

        # Annualized metrics
        trading_days = len(rets) - 1
        ann_factor = 252 / max(trading_days, 1)
        ann_return = ((eq[-1] / initial_capital) ** ann_factor - 1) * 100 if trading_days > 0 else 0

        daily_std = np.std(rets[1:]) if len(rets) > 1 else 0
        sharpe = (np.mean(rets[1:]) / (daily_std + 1e-10)) * np.sqrt(252) if daily_std > 0 else 0

        downside = rets[rets < 0]
        downside_std = np.std(downside) if len(downside) > 0 else 0
        sortino = (np.mean(rets[1:]) / (downside_std + 1e-10)) * np.sqrt(252) if downside_std > 0 else 0

        # Monthly returns
        dates = df.index
        monthly_returns = {}
        month_start_eq = equity_curve[0]
        for i in range(1, len(equity_curve)):
            if i < len(dates):
                month_key = dates[i].strftime("%Y-%m") if hasattr(dates[i], 'strftime') else str(dates[i])[:7]
                if i > 0 and (i == len(equity_curve) - 1 or
                    (i < len(dates) and hasattr(dates[i], 'month') and
                     (dates[i].month != dates[i-1].month if i > 0 and hasattr(dates[i-1], 'month') else False))):
                    prev_key = dates[i-1].strftime("%Y-%m") if hasattr(dates[i-1], 'strftime') else str(dates[i-1])[:7]
                    monthly_returns[prev_key] = round((equity_curve[i] / month_start_eq - 1) * 100, 2) if month_start_eq > 0 else 0
                    month_start_eq = equity_curve[i]

        # Downsample equity curve for response size
        eq_len = len(equity_curve)
        step = max(1, eq_len // 250)
        sampled_equity = [
            {"index": i, "equity": round(equity_curve[i], 2)}
            for i in range(0, eq_len, step)
        ]

        return {
            "net_profit": round(net_profit, 2),
            "net_profit_pct": round(net_profit_pct, 2),
            "gross_profit_pct": round(gross_profit, 2),
            "gross_loss_pct": round(gross_loss, 2),
            "profit_factor": round(profit_factor, 4),
            "max_drawdown_pct": round(max_drawdown, 2),
            "sharpe_ratio": round(sharpe, 4),
            "sortino_ratio": round(sortino, 4),
            "annualized_return_pct": round(ann_return, 2),
            "total_trades": total_trades,
            "win_rate": round(win_rate, 4),
            "avg_win_pct": round(avg_win, 2),
            "avg_loss_pct": round(avg_loss, 2),
            "max_consecutive_wins": _max_consecutive(pnl_list, positive=True),
            "max_consecutive_losses": _max_consecutive(pnl_list, positive=False),
            "equity_curve": sampled_equity,
            "trades": trades[-50:],  # last 50 trades
            "monthly_returns": monthly_returns,
            "final_equity": round(eq[-1], 2),
            "trading_days": trading_days,
        }


def _max_consecutive(values: list, positive: bool = True) -> int:
    """Count max consecutive wins or losses."""
    max_count = 0
    current = 0
    for v in values:
        if (positive and v > 0) or (not positive and v < 0):
            current += 1
            max_count = max(max_count, current)
        else:
            current = 0
    return max_count


# Module singleton
pine_backtester = PineScriptBacktester()