""" Internal Pine Script Backtester. Translates the *intent* of Pine Script strategies into Python and executes them against real historical data via yfinance. This is NOT a full Pine Script parser — it maps the generated template strategies to the existing BacktestEngine. Produces TradingView-style performance metrics: - Net Profit, Gross Profit, Gross Loss - Max Drawdown, Profit Factor - Sharpe Ratio, Sortino Ratio - Win Rate, Average Win/Loss - Equity Curve, Monthly Returns """ from __future__ import annotations import logging import re from typing import Any, Dict, List, Optional, Tuple import numpy as np import pandas as pd logger = logging.getLogger(__name__) class PineScriptBacktester: """ Simulates Pine Script strategies using Python. For template-generated strategies, maps the strategy logic directly to pandas operations for accurate backtesting. """ async def backtest( self, code: str, ticker: str = "SPY", period: str = "3y", initial_capital: float = 100_000, commission_pct: float = 0.1, ) -> Dict[str, Any]: """ Run a backtest on Pine Script code. """ from app.services.data_ingestion.yahoo import yahoo_adapter # Fetch data df = await yahoo_adapter.get_price_dataframe(ticker, period=period) if df.empty or len(df) < 50: raise ValueError(f"Insufficient data for {ticker}") # Detect strategy type from code and run appropriate backtest strategy_type = self._detect_strategy_type(code) params = self._extract_parameters(code) signals = self._generate_signals(df, strategy_type, params) results = self._simulate_trades(df, signals, initial_capital, commission_pct) return { "ticker": ticker, "period": period, "strategy_type": strategy_type, "initial_capital": initial_capital, **results, } def _detect_strategy_type(self, code: str) -> str: """Determine strategy type from Pine Script code keywords.""" code_lower = code.lower() if "ta.supertrend" in code_lower: return "supertrend" if "ta.macd" in code_lower: return "macd" if "ta.rsi" in code_lower and "ta.stoch" in code_lower: return "stochastic_rsi" if "ta.rsi" in code_lower: return "rsi" if "ta.bb" in code_lower or "bollinger" in code_lower or "ta.stdev" in code_lower: return "bollinger" if "ta.vwap" in code_lower or "vwap" in code_lower: return "vwap" if "ichimoku" in code_lower or "donchian" in code_lower: return "ichimoku" if "z_score" in code_lower or "z-score" in code_lower: return "zscore" if "ema" in code_lower and code_lower.count("ta.ema") >= 3: return "ema_ribbon" if "crossover" in code_lower and "sma" in code_lower: return "sma_crossover" if "ta.atr" in code_lower and "trail" in code_lower: return "atr_trailing" if "request.security" in code_lower: return "multi_timeframe" return "sma_crossover" # default def _extract_parameters(self, code: str) -> Dict[str, Any]: """Extract input parameters from Pine Script code.""" params: Dict[str, Any] = {} # Match input.int(value, ...) and input.float(value, ...) int_matches = re.findall(r'(\w+)\s*=\s*input\.int\((\d+)', code) float_matches = re.findall(r'(\w+)\s*=\s*input\.float\(([\d.]+)', code) for name, val in int_matches: params[name] = int(val) for name, val in float_matches: params[name] = float(val) return params def _generate_signals( self, df: pd.DataFrame, strategy_type: str, params: Dict[str, Any], ) -> pd.Series: """ Generate trading signals: +1 = buy, -1 = sell, 0 = hold. Maps each strategy type to Python logic. """ close = df["Close"] high = df["High"] low = df["Low"] volume = df["Volume"] if "Volume" in df.columns else pd.Series(0, index=df.index) signals = pd.Series(0, index=df.index) if strategy_type == "sma_crossover": fast = params.get("fast_length", 20) slow = params.get("slow_length", 50) sma_fast = close.rolling(fast).mean() sma_slow = close.rolling(slow).mean() signals[sma_fast > sma_slow] = 1 signals[sma_fast <= sma_slow] = -1 elif strategy_type == "rsi": length = params.get("rsi_length", 14) oversold = params.get("oversold_level", 30) overbought = params.get("overbought_level", 70) delta = close.diff() gain = delta.where(delta > 0, 0).rolling(length).mean() loss = (-delta.where(delta < 0, 0)).rolling(length).mean() rs = gain / (loss + 1e-10) rsi = 100 - (100 / (1 + rs)) signals[rsi < oversold] = 1 signals[rsi > overbought] = -1 elif strategy_type == "macd": fast = params.get("fast_len", 12) slow = params.get("slow_len", 26) sig = params.get("sig_len", 9) ema_fast = close.ewm(span=fast).mean() ema_slow = close.ewm(span=slow).mean() macd_line = ema_fast - ema_slow signal_line = macd_line.ewm(span=sig).mean() hist = macd_line - signal_line signals[(macd_line > signal_line) & (hist > 0)] = 1 signals[macd_line < signal_line] = -1 elif strategy_type == "bollinger": length = params.get("length", 20) mult = params.get("mult", 2.0) basis = close.rolling(length).mean() std = close.rolling(length).std() upper = basis + mult * std signals[close > upper] = 1 signals[close < basis] = -1 elif strategy_type == "supertrend": atr_len = params.get("atr_len", 10) factor = params.get("factor", 3.0) tr = pd.concat([ high - low, abs(high - close.shift(1)), abs(low - close.shift(1)) ], axis=1).max(axis=1) atr = tr.rolling(atr_len).mean() upper_band = (high + low) / 2 + factor * atr lower_band = (high + low) / 2 - factor * atr supertrend = pd.Series(0.0, index=df.index) direction = pd.Series(1, index=df.index) for i in range(1, len(df)): if close.iloc[i] > upper_band.iloc[i-1]: direction.iloc[i] = 1 elif close.iloc[i] < lower_band.iloc[i-1]: direction.iloc[i] = -1 else: direction.iloc[i] = direction.iloc[i-1] signals[direction == 1] = 1 signals[direction == -1] = -1 elif strategy_type == "ema_ribbon": emas = [8, 13, 21, 34, 55] ema_vals = [close.ewm(span=e).mean() for e in emas] bullish = all(ema_vals[i].iloc[-1] > ema_vals[i+1].iloc[-1] for i in range(len(ema_vals)-1)) for i in range(len(df)): if all(ema_vals[j].iloc[i] > ema_vals[j+1].iloc[i] for j in range(len(ema_vals)-1) if i < len(ema_vals[j])): signals.iloc[i] = 1 elif all(ema_vals[j].iloc[i] < ema_vals[j+1].iloc[i] for j in range(len(ema_vals)-1) if i < len(ema_vals[j])): signals.iloc[i] = -1 elif strategy_type == "zscore": lookback = params.get("lookback", 50) entry_z = params.get("entry_z", 2.0) mean = close.rolling(lookback).mean() std = close.rolling(lookback).std() z = (close - mean) / (std + 1e-10) signals[z < -entry_z] = 1 signals[z > entry_z] = -1 else: # Default: SMA crossover sma20 = close.rolling(20).mean() sma50 = close.rolling(50).mean() signals[sma20 > sma50] = 1 signals[sma20 <= sma50] = -1 return signals def _simulate_trades( self, df: pd.DataFrame, signals: pd.Series, initial_capital: float, commission_pct: float, ) -> Dict[str, Any]: """ Simulate trading based on signals and compute all metrics. """ close = df["Close"].values sig = signals.values n = len(close) # Track positions and equity position = 0 # 0 or 1 entry_price = 0.0 capital = initial_capital equity_curve = [initial_capital] trades: List[Dict[str, Any]] = [] daily_returns: List[float] = [0.0] for i in range(1, n): if sig[i] == 1 and position == 0: # Buy shares = capital / close[i] commission = capital * (commission_pct / 100) capital -= commission entry_price = close[i] position = 1 trades.append({ "type": "ENTRY", "date": str(df.index[i].date()) if hasattr(df.index[i], 'date') else str(df.index[i]), "price": round(close[i], 2), "shares": round(shares, 4), }) elif sig[i] == -1 and position == 1: # Sell ret = (close[i] - entry_price) / entry_price commission = capital * (1 + ret) * (commission_pct / 100) capital = capital * (1 + ret) - commission position = 0 trades.append({ "type": "EXIT", "date": str(df.index[i].date()) if hasattr(df.index[i], 'date') else str(df.index[i]), "price": round(close[i], 2), "pnl_pct": round(ret * 100, 2), "pnl_abs": round(capital - equity_curve[-1], 2), }) # Update equity if position == 1: current_equity = capital * (close[i] / entry_price) if entry_price > 0 else capital else: current_equity = capital daily_ret = (current_equity - equity_curve[-1]) / equity_curve[-1] if equity_curve[-1] > 0 else 0 daily_returns.append(daily_ret) equity_curve.append(current_equity) # Close open position if position == 1: ret = (close[-1] - entry_price) / entry_price capital = capital * (1 + ret) # Compute metrics eq = np.array(equity_curve) rets = np.array(daily_returns) peak = np.maximum.accumulate(eq) drawdown = (eq - peak) / (peak + 1e-10) # Trade stats pnl_list = [t.get("pnl_pct", 0) for t in trades if t["type"] == "EXIT"] wins = [p for p in pnl_list if p > 0] losses = [p for p in pnl_list if p < 0] total_trades = len(pnl_list) win_rate = len(wins) / total_trades if total_trades > 0 else 0 avg_win = np.mean(wins) if wins else 0 avg_loss = np.mean(losses) if losses else 0 net_profit = eq[-1] - initial_capital net_profit_pct = (eq[-1] / initial_capital - 1) * 100 gross_profit = sum(p for p in pnl_list if p > 0) gross_loss = abs(sum(p for p in pnl_list if p < 0)) profit_factor = gross_profit / (gross_loss + 1e-10) if gross_loss > 0 else float("inf") max_drawdown = float(np.min(drawdown)) * 100 # Annualized metrics trading_days = len(rets) - 1 ann_factor = 252 / max(trading_days, 1) ann_return = ((eq[-1] / initial_capital) ** ann_factor - 1) * 100 if trading_days > 0 else 0 daily_std = np.std(rets[1:]) if len(rets) > 1 else 0 sharpe = (np.mean(rets[1:]) / (daily_std + 1e-10)) * np.sqrt(252) if daily_std > 0 else 0 downside = rets[rets < 0] downside_std = np.std(downside) if len(downside) > 0 else 0 sortino = (np.mean(rets[1:]) / (downside_std + 1e-10)) * np.sqrt(252) if downside_std > 0 else 0 # Monthly returns dates = df.index monthly_returns = {} month_start_eq = equity_curve[0] for i in range(1, len(equity_curve)): if i < len(dates): month_key = dates[i].strftime("%Y-%m") if hasattr(dates[i], 'strftime') else str(dates[i])[:7] if i > 0 and (i == len(equity_curve) - 1 or (i < len(dates) and hasattr(dates[i], 'month') and (dates[i].month != dates[i-1].month if i > 0 and hasattr(dates[i-1], 'month') else False))): prev_key = dates[i-1].strftime("%Y-%m") if hasattr(dates[i-1], 'strftime') else str(dates[i-1])[:7] monthly_returns[prev_key] = round((equity_curve[i] / month_start_eq - 1) * 100, 2) if month_start_eq > 0 else 0 month_start_eq = equity_curve[i] # Downsample equity curve for response size eq_len = len(equity_curve) step = max(1, eq_len // 250) sampled_equity = [ {"index": i, "equity": round(equity_curve[i], 2)} for i in range(0, eq_len, step) ] return { "net_profit": round(net_profit, 2), "net_profit_pct": round(net_profit_pct, 2), "gross_profit_pct": round(gross_profit, 2), "gross_loss_pct": round(gross_loss, 2), "profit_factor": round(profit_factor, 4), "max_drawdown_pct": round(max_drawdown, 2), "sharpe_ratio": round(sharpe, 4), "sortino_ratio": round(sortino, 4), "annualized_return_pct": round(ann_return, 2), "total_trades": total_trades, "win_rate": round(win_rate, 4), "avg_win_pct": round(avg_win, 2), "avg_loss_pct": round(avg_loss, 2), "max_consecutive_wins": _max_consecutive(pnl_list, positive=True), "max_consecutive_losses": _max_consecutive(pnl_list, positive=False), "equity_curve": sampled_equity, "trades": trades[-50:], # last 50 trades "monthly_returns": monthly_returns, "final_equity": round(eq[-1], 2), "trading_days": trading_days, } def _max_consecutive(values: list, positive: bool = True) -> int: """Count max consecutive wins or losses.""" max_count = 0 current = 0 for v in values: if (positive and v > 0) or (not positive and v < 0): current += 1 max_count = max(max_count, current) else: current = 0 return max_count # Module singleton pine_backtester = PineScriptBacktester()