Spaces:
Running
Running
| """ | |
| Internal Pine Script Backtester. | |
| Translates the *intent* of Pine Script strategies into Python | |
| and executes them against real historical data via yfinance. | |
| This is NOT a full Pine Script parser — it maps the generated | |
| template strategies to the existing BacktestEngine. | |
| Produces TradingView-style performance metrics: | |
| - Net Profit, Gross Profit, Gross Loss | |
| - Max Drawdown, Profit Factor | |
| - Sharpe Ratio, Sortino Ratio | |
| - Win Rate, Average Win/Loss | |
| - Equity Curve, Monthly Returns | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import re | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import numpy as np | |
| import pandas as pd | |
| logger = logging.getLogger(__name__) | |
| class PineScriptBacktester: | |
| """ | |
| Simulates Pine Script strategies using Python. | |
| For template-generated strategies, maps the strategy logic | |
| directly to pandas operations for accurate backtesting. | |
| """ | |
| async def backtest( | |
| self, | |
| code: str, | |
| ticker: str = "SPY", | |
| period: str = "3y", | |
| initial_capital: float = 100_000, | |
| commission_pct: float = 0.1, | |
| ) -> Dict[str, Any]: | |
| """ | |
| Run a backtest on Pine Script code. | |
| """ | |
| from app.services.data_ingestion.yahoo import yahoo_adapter | |
| # Fetch data | |
| df = await yahoo_adapter.get_price_dataframe(ticker, period=period) | |
| if df.empty or len(df) < 50: | |
| raise ValueError(f"Insufficient data for {ticker}") | |
| # Detect strategy type from code and run appropriate backtest | |
| strategy_type = self._detect_strategy_type(code) | |
| params = self._extract_parameters(code) | |
| signals = self._generate_signals(df, strategy_type, params) | |
| results = self._simulate_trades(df, signals, initial_capital, commission_pct) | |
| return { | |
| "ticker": ticker, | |
| "period": period, | |
| "strategy_type": strategy_type, | |
| "initial_capital": initial_capital, | |
| **results, | |
| } | |
| def _detect_strategy_type(self, code: str) -> str: | |
| """Determine strategy type from Pine Script code keywords.""" | |
| code_lower = code.lower() | |
| if "ta.supertrend" in code_lower: | |
| return "supertrend" | |
| if "ta.macd" in code_lower: | |
| return "macd" | |
| if "ta.rsi" in code_lower and "ta.stoch" in code_lower: | |
| return "stochastic_rsi" | |
| if "ta.rsi" in code_lower: | |
| return "rsi" | |
| if "ta.bb" in code_lower or "bollinger" in code_lower or "ta.stdev" in code_lower: | |
| return "bollinger" | |
| if "ta.vwap" in code_lower or "vwap" in code_lower: | |
| return "vwap" | |
| if "ichimoku" in code_lower or "donchian" in code_lower: | |
| return "ichimoku" | |
| if "z_score" in code_lower or "z-score" in code_lower: | |
| return "zscore" | |
| if "ema" in code_lower and code_lower.count("ta.ema") >= 3: | |
| return "ema_ribbon" | |
| if "crossover" in code_lower and "sma" in code_lower: | |
| return "sma_crossover" | |
| if "ta.atr" in code_lower and "trail" in code_lower: | |
| return "atr_trailing" | |
| if "request.security" in code_lower: | |
| return "multi_timeframe" | |
| return "sma_crossover" # default | |
| def _extract_parameters(self, code: str) -> Dict[str, Any]: | |
| """Extract input parameters from Pine Script code.""" | |
| params: Dict[str, Any] = {} | |
| # Match input.int(value, ...) and input.float(value, ...) | |
| int_matches = re.findall(r'(\w+)\s*=\s*input\.int\((\d+)', code) | |
| float_matches = re.findall(r'(\w+)\s*=\s*input\.float\(([\d.]+)', code) | |
| for name, val in int_matches: | |
| params[name] = int(val) | |
| for name, val in float_matches: | |
| params[name] = float(val) | |
| return params | |
| def _generate_signals( | |
| self, | |
| df: pd.DataFrame, | |
| strategy_type: str, | |
| params: Dict[str, Any], | |
| ) -> pd.Series: | |
| """ | |
| Generate trading signals: +1 = buy, -1 = sell, 0 = hold. | |
| Maps each strategy type to Python logic. | |
| """ | |
| close = df["Close"] | |
| high = df["High"] | |
| low = df["Low"] | |
| volume = df["Volume"] if "Volume" in df.columns else pd.Series(0, index=df.index) | |
| signals = pd.Series(0, index=df.index) | |
| if strategy_type == "sma_crossover": | |
| fast = params.get("fast_length", 20) | |
| slow = params.get("slow_length", 50) | |
| sma_fast = close.rolling(fast).mean() | |
| sma_slow = close.rolling(slow).mean() | |
| signals[sma_fast > sma_slow] = 1 | |
| signals[sma_fast <= sma_slow] = -1 | |
| elif strategy_type == "rsi": | |
| length = params.get("rsi_length", 14) | |
| oversold = params.get("oversold_level", 30) | |
| overbought = params.get("overbought_level", 70) | |
| delta = close.diff() | |
| gain = delta.where(delta > 0, 0).rolling(length).mean() | |
| loss = (-delta.where(delta < 0, 0)).rolling(length).mean() | |
| rs = gain / (loss + 1e-10) | |
| rsi = 100 - (100 / (1 + rs)) | |
| signals[rsi < oversold] = 1 | |
| signals[rsi > overbought] = -1 | |
| elif strategy_type == "macd": | |
| fast = params.get("fast_len", 12) | |
| slow = params.get("slow_len", 26) | |
| sig = params.get("sig_len", 9) | |
| ema_fast = close.ewm(span=fast).mean() | |
| ema_slow = close.ewm(span=slow).mean() | |
| macd_line = ema_fast - ema_slow | |
| signal_line = macd_line.ewm(span=sig).mean() | |
| hist = macd_line - signal_line | |
| signals[(macd_line > signal_line) & (hist > 0)] = 1 | |
| signals[macd_line < signal_line] = -1 | |
| elif strategy_type == "bollinger": | |
| length = params.get("length", 20) | |
| mult = params.get("mult", 2.0) | |
| basis = close.rolling(length).mean() | |
| std = close.rolling(length).std() | |
| upper = basis + mult * std | |
| signals[close > upper] = 1 | |
| signals[close < basis] = -1 | |
| elif strategy_type == "supertrend": | |
| atr_len = params.get("atr_len", 10) | |
| factor = params.get("factor", 3.0) | |
| tr = pd.concat([ | |
| high - low, | |
| abs(high - close.shift(1)), | |
| abs(low - close.shift(1)) | |
| ], axis=1).max(axis=1) | |
| atr = tr.rolling(atr_len).mean() | |
| upper_band = (high + low) / 2 + factor * atr | |
| lower_band = (high + low) / 2 - factor * atr | |
| supertrend = pd.Series(0.0, index=df.index) | |
| direction = pd.Series(1, index=df.index) | |
| for i in range(1, len(df)): | |
| if close.iloc[i] > upper_band.iloc[i-1]: | |
| direction.iloc[i] = 1 | |
| elif close.iloc[i] < lower_band.iloc[i-1]: | |
| direction.iloc[i] = -1 | |
| else: | |
| direction.iloc[i] = direction.iloc[i-1] | |
| signals[direction == 1] = 1 | |
| signals[direction == -1] = -1 | |
| elif strategy_type == "ema_ribbon": | |
| emas = [8, 13, 21, 34, 55] | |
| ema_vals = [close.ewm(span=e).mean() for e in emas] | |
| bullish = all(ema_vals[i].iloc[-1] > ema_vals[i+1].iloc[-1] for i in range(len(ema_vals)-1)) | |
| for i in range(len(df)): | |
| if all(ema_vals[j].iloc[i] > ema_vals[j+1].iloc[i] for j in range(len(ema_vals)-1) if i < len(ema_vals[j])): | |
| signals.iloc[i] = 1 | |
| elif all(ema_vals[j].iloc[i] < ema_vals[j+1].iloc[i] for j in range(len(ema_vals)-1) if i < len(ema_vals[j])): | |
| signals.iloc[i] = -1 | |
| elif strategy_type == "zscore": | |
| lookback = params.get("lookback", 50) | |
| entry_z = params.get("entry_z", 2.0) | |
| mean = close.rolling(lookback).mean() | |
| std = close.rolling(lookback).std() | |
| z = (close - mean) / (std + 1e-10) | |
| signals[z < -entry_z] = 1 | |
| signals[z > entry_z] = -1 | |
| else: | |
| # Default: SMA crossover | |
| sma20 = close.rolling(20).mean() | |
| sma50 = close.rolling(50).mean() | |
| signals[sma20 > sma50] = 1 | |
| signals[sma20 <= sma50] = -1 | |
| return signals | |
| def _simulate_trades( | |
| self, | |
| df: pd.DataFrame, | |
| signals: pd.Series, | |
| initial_capital: float, | |
| commission_pct: float, | |
| ) -> Dict[str, Any]: | |
| """ | |
| Simulate trading based on signals and compute all metrics. | |
| """ | |
| close = df["Close"].values | |
| sig = signals.values | |
| n = len(close) | |
| # Track positions and equity | |
| position = 0 # 0 or 1 | |
| entry_price = 0.0 | |
| capital = initial_capital | |
| equity_curve = [initial_capital] | |
| trades: List[Dict[str, Any]] = [] | |
| daily_returns: List[float] = [0.0] | |
| for i in range(1, n): | |
| if sig[i] == 1 and position == 0: | |
| # Buy | |
| shares = capital / close[i] | |
| commission = capital * (commission_pct / 100) | |
| capital -= commission | |
| entry_price = close[i] | |
| position = 1 | |
| trades.append({ | |
| "type": "ENTRY", | |
| "date": str(df.index[i].date()) if hasattr(df.index[i], 'date') else str(df.index[i]), | |
| "price": round(close[i], 2), | |
| "shares": round(shares, 4), | |
| }) | |
| elif sig[i] == -1 and position == 1: | |
| # Sell | |
| ret = (close[i] - entry_price) / entry_price | |
| commission = capital * (1 + ret) * (commission_pct / 100) | |
| capital = capital * (1 + ret) - commission | |
| position = 0 | |
| trades.append({ | |
| "type": "EXIT", | |
| "date": str(df.index[i].date()) if hasattr(df.index[i], 'date') else str(df.index[i]), | |
| "price": round(close[i], 2), | |
| "pnl_pct": round(ret * 100, 2), | |
| "pnl_abs": round(capital - equity_curve[-1], 2), | |
| }) | |
| # Update equity | |
| if position == 1: | |
| current_equity = capital * (close[i] / entry_price) if entry_price > 0 else capital | |
| else: | |
| current_equity = capital | |
| daily_ret = (current_equity - equity_curve[-1]) / equity_curve[-1] if equity_curve[-1] > 0 else 0 | |
| daily_returns.append(daily_ret) | |
| equity_curve.append(current_equity) | |
| # Close open position | |
| if position == 1: | |
| ret = (close[-1] - entry_price) / entry_price | |
| capital = capital * (1 + ret) | |
| # Compute metrics | |
| eq = np.array(equity_curve) | |
| rets = np.array(daily_returns) | |
| peak = np.maximum.accumulate(eq) | |
| drawdown = (eq - peak) / (peak + 1e-10) | |
| # Trade stats | |
| pnl_list = [t.get("pnl_pct", 0) for t in trades if t["type"] == "EXIT"] | |
| wins = [p for p in pnl_list if p > 0] | |
| losses = [p for p in pnl_list if p < 0] | |
| total_trades = len(pnl_list) | |
| win_rate = len(wins) / total_trades if total_trades > 0 else 0 | |
| avg_win = np.mean(wins) if wins else 0 | |
| avg_loss = np.mean(losses) if losses else 0 | |
| net_profit = eq[-1] - initial_capital | |
| net_profit_pct = (eq[-1] / initial_capital - 1) * 100 | |
| gross_profit = sum(p for p in pnl_list if p > 0) | |
| gross_loss = abs(sum(p for p in pnl_list if p < 0)) | |
| profit_factor = gross_profit / (gross_loss + 1e-10) if gross_loss > 0 else float("inf") | |
| max_drawdown = float(np.min(drawdown)) * 100 | |
| # Annualized metrics | |
| trading_days = len(rets) - 1 | |
| ann_factor = 252 / max(trading_days, 1) | |
| ann_return = ((eq[-1] / initial_capital) ** ann_factor - 1) * 100 if trading_days > 0 else 0 | |
| daily_std = np.std(rets[1:]) if len(rets) > 1 else 0 | |
| sharpe = (np.mean(rets[1:]) / (daily_std + 1e-10)) * np.sqrt(252) if daily_std > 0 else 0 | |
| downside = rets[rets < 0] | |
| downside_std = np.std(downside) if len(downside) > 0 else 0 | |
| sortino = (np.mean(rets[1:]) / (downside_std + 1e-10)) * np.sqrt(252) if downside_std > 0 else 0 | |
| # Monthly returns | |
| dates = df.index | |
| monthly_returns = {} | |
| month_start_eq = equity_curve[0] | |
| for i in range(1, len(equity_curve)): | |
| if i < len(dates): | |
| month_key = dates[i].strftime("%Y-%m") if hasattr(dates[i], 'strftime') else str(dates[i])[:7] | |
| if i > 0 and (i == len(equity_curve) - 1 or | |
| (i < len(dates) and hasattr(dates[i], 'month') and | |
| (dates[i].month != dates[i-1].month if i > 0 and hasattr(dates[i-1], 'month') else False))): | |
| prev_key = dates[i-1].strftime("%Y-%m") if hasattr(dates[i-1], 'strftime') else str(dates[i-1])[:7] | |
| monthly_returns[prev_key] = round((equity_curve[i] / month_start_eq - 1) * 100, 2) if month_start_eq > 0 else 0 | |
| month_start_eq = equity_curve[i] | |
| # Downsample equity curve for response size | |
| eq_len = len(equity_curve) | |
| step = max(1, eq_len // 250) | |
| sampled_equity = [ | |
| {"index": i, "equity": round(equity_curve[i], 2)} | |
| for i in range(0, eq_len, step) | |
| ] | |
| return { | |
| "net_profit": round(net_profit, 2), | |
| "net_profit_pct": round(net_profit_pct, 2), | |
| "gross_profit_pct": round(gross_profit, 2), | |
| "gross_loss_pct": round(gross_loss, 2), | |
| "profit_factor": round(profit_factor, 4), | |
| "max_drawdown_pct": round(max_drawdown, 2), | |
| "sharpe_ratio": round(sharpe, 4), | |
| "sortino_ratio": round(sortino, 4), | |
| "annualized_return_pct": round(ann_return, 2), | |
| "total_trades": total_trades, | |
| "win_rate": round(win_rate, 4), | |
| "avg_win_pct": round(avg_win, 2), | |
| "avg_loss_pct": round(avg_loss, 2), | |
| "max_consecutive_wins": _max_consecutive(pnl_list, positive=True), | |
| "max_consecutive_losses": _max_consecutive(pnl_list, positive=False), | |
| "equity_curve": sampled_equity, | |
| "trades": trades[-50:], # last 50 trades | |
| "monthly_returns": monthly_returns, | |
| "final_equity": round(eq[-1], 2), | |
| "trading_days": trading_days, | |
| } | |
| def _max_consecutive(values: list, positive: bool = True) -> int: | |
| """Count max consecutive wins or losses.""" | |
| max_count = 0 | |
| current = 0 | |
| for v in values: | |
| if (positive and v > 0) or (not positive and v < 0): | |
| current += 1 | |
| max_count = max(max_count, current) | |
| else: | |
| current = 0 | |
| return max_count | |
| # Module singleton | |
| pine_backtester = PineScriptBacktester() | |