# File: quanthedge/backend/app/services/pinescript/pine_backtester.py
# Uploaded by jashdoshi77 - commit e6021a3: "whole lotta changes"
"""
Internal Pine Script Backtester.
Translates the *intent* of Pine Script strategies into Python
and executes them against real historical data via yfinance.
This is NOT a full Pine Script parser — it maps the generated
template strategies to the existing BacktestEngine.
Produces TradingView-style performance metrics:
- Net Profit, Gross Profit, Gross Loss
- Max Drawdown, Profit Factor
- Sharpe Ratio, Sortino Ratio
- Win Rate, Average Win/Loss
- Equity Curve, Monthly Returns
"""
from __future__ import annotations
import logging
import re
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class PineScriptBacktester:
    """
    Simulates Pine Script strategies using Python.

    For template-generated strategies, maps the strategy logic
    directly to pandas operations for accurate backtesting.

    The Pine Script source is never executed. A strategy family is detected
    from keywords in the code, ``input.int`` / ``input.float`` defaults are
    extracted with regular expressions, and an equivalent long-only signal
    series is simulated bar by bar on closing prices.
    """

    # EMA spans used by the "ema_ribbon" strategy, ordered fastest to slowest.
    _RIBBON_SPANS = (8, 13, 21, 34, 55)

    # `name = input.int(<default>` / `name = input.float(<default>`; the default
    # may be signed, may follow whitespace, and may be passed as `defval=`.
    _INT_INPUT_RE = re.compile(
        r'(\w+)\s*=\s*input\.int\(\s*(?:defval\s*=\s*)?(-?\d+)'
    )
    _FLOAT_INPUT_RE = re.compile(
        r'(\w+)\s*=\s*input\.float\(\s*(?:defval\s*=\s*)?'
        r'(-?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?)'
    )

    async def backtest(
        self,
        code: str,
        ticker: str = "SPY",
        period: str = "3y",
        initial_capital: float = 100_000,
        commission_pct: float = 0.1,
    ) -> Dict[str, Any]:
        """
        Run a backtest on Pine Script code.

        Args:
            code: Pine Script source; only keywords and input defaults are read.
            ticker: Symbol passed to the Yahoo adapter.
            period: History length passed to the Yahoo adapter.
            initial_capital: Starting equity; must be positive.
            commission_pct: Commission per fill, in percent of traded value.

        Returns:
            A dict with ``ticker``, ``period``, ``strategy_type`` and
            ``initial_capital`` plus every metric from ``_simulate_trades``.

        Raises:
            ValueError: If ``initial_capital`` is not positive, or fewer than
                50 bars of price data are available for ``ticker``.
        """
        # Imported inside the method, as in the original code.
        # NOTE(review): presumably to avoid an import cycle - confirm.
        from app.services.data_ingestion.yahoo import yahoo_adapter

        # Every percentage metric divides by the starting equity.
        if initial_capital <= 0:
            raise ValueError("initial_capital must be positive")

        # Fetch data
        df = await yahoo_adapter.get_price_dataframe(ticker, period=period)
        if df is None or df.empty or len(df) < 50:
            raise ValueError(f"Insufficient data for {ticker}")

        # Detect strategy type from code and run appropriate backtest
        strategy_type = self._detect_strategy_type(code)
        params = self._extract_parameters(code)
        logger.debug(
            "Backtesting %s as %s with params %s", ticker, strategy_type, params
        )
        signals = self._generate_signals(df, strategy_type, params)
        results = self._simulate_trades(df, signals, initial_capital, commission_pct)

        return {
            "ticker": ticker,
            "period": period,
            "strategy_type": strategy_type,
            "initial_capital": initial_capital,
            **results,
        }

    def _detect_strategy_type(self, code: str) -> str:
        """
        Determine strategy type from Pine Script code keywords.

        Checks run in a fixed priority order on the lower-cased source and the
        first match wins. Several returned labels ("stochastic_rsi", "vwap",
        "ichimoku", "atr_trailing", "multi_timeframe") have no dedicated
        branch in ``_generate_signals`` and are simulated with its default
        SMA 20/50 crossover.
        """
        text = code.lower()
        if "ta.supertrend" in text:
            return "supertrend"
        if "ta.macd" in text:
            return "macd"
        if "ta.rsi" in text and "ta.stoch" in text:
            return "stochastic_rsi"
        if "ta.rsi" in text:
            return "rsi"
        if "ta.bb" in text or "bollinger" in text or "ta.stdev" in text:
            return "bollinger"
        # "vwap" also covers "ta.vwap".
        if "vwap" in text:
            return "vwap"
        if "ichimoku" in text or "donchian" in text:
            return "ichimoku"
        if "z_score" in text or "z-score" in text:
            return "zscore"
        # Three or more ta.ema calls are treated as a ribbon.
        if text.count("ta.ema") >= 3:
            return "ema_ribbon"
        if "crossover" in text and "sma" in text:
            return "sma_crossover"
        if "ta.atr" in text and "trail" in text:
            return "atr_trailing"
        if "request.security" in text:
            return "multi_timeframe"
        return "sma_crossover"  # default

    def _extract_parameters(self, code: str) -> Dict[str, Any]:
        """
        Extract input parameters from Pine Script code.

        Recognises ``name = input.int(<default>, ...)`` and
        ``name = input.float(<default>, ...)``. The default may be negative,
        preceded by whitespace, or given as ``defval=<default>``.

        Returns:
            Mapping of variable name to its default (``int`` or ``float``).
            A float declaration overrides an int declaration of the same name.
        """
        params: Dict[str, Any] = {}
        for name, literal in self._INT_INPUT_RE.findall(code):
            params[name] = int(literal)
        for name, literal in self._FLOAT_INPUT_RE.findall(code):
            params[name] = float(literal)
        return params

    def _generate_signals(
        self,
        df: pd.DataFrame,
        strategy_type: str,
        params: Dict[str, Any],
    ) -> pd.Series:
        """
        Generate trading signals: +1 = buy, -1 = sell, 0 = hold.
        Maps each strategy type to Python logic.

        Args:
            df: Price frame with a ``Close`` column; ``High`` and ``Low`` are
                only required for the "supertrend" strategy.
            strategy_type: Label from ``_detect_strategy_type``; unknown labels
                use a fixed SMA 20/50 crossover.
            params: Input defaults from ``_extract_parameters``; missing keys
                fall back to per-strategy defaults.

        Returns:
            Integer series aligned with ``df.index``. Bars where an indicator
            is still NaN (warm-up) stay 0, because NaN comparisons are False.
        """
        close = df["Close"]
        signals = pd.Series(0, index=df.index)

        if strategy_type == "sma_crossover":
            fast = params.get("fast_length", 20)
            slow = params.get("slow_length", 50)
            sma_fast = close.rolling(fast).mean()
            sma_slow = close.rolling(slow).mean()
            signals[sma_fast > sma_slow] = 1
            signals[sma_fast <= sma_slow] = -1

        elif strategy_type == "rsi":
            length = params.get("rsi_length", 14)
            oversold = params.get("oversold_level", 30)
            overbought = params.get("overbought_level", 70)
            delta = close.diff()
            # Simple rolling means of gains/losses (not Wilder smoothing).
            gain = delta.where(delta > 0, 0).rolling(length).mean()
            loss = (-delta.where(delta < 0, 0)).rolling(length).mean()
            rs = gain / (loss + 1e-10)
            rsi = 100 - (100 / (1 + rs))
            signals[rsi < oversold] = 1
            signals[rsi > overbought] = -1

        elif strategy_type == "macd":
            fast = params.get("fast_len", 12)
            slow = params.get("slow_len", 26)
            sig = params.get("sig_len", 9)
            ema_fast = close.ewm(span=fast).mean()
            ema_slow = close.ewm(span=slow).mean()
            macd_line = ema_fast - ema_slow
            signal_line = macd_line.ewm(span=sig).mean()
            hist = macd_line - signal_line
            signals[(macd_line > signal_line) & (hist > 0)] = 1
            signals[macd_line < signal_line] = -1

        elif strategy_type == "bollinger":
            length = params.get("length", 20)
            mult = params.get("mult", 2.0)
            basis = close.rolling(length).mean()
            std = close.rolling(length).std()
            upper = basis + mult * std
            # Breakout above the upper band buys; falling below the basis sells.
            signals[close > upper] = 1
            signals[close < basis] = -1

        elif strategy_type == "supertrend":
            high = df["High"]
            low = df["Low"]
            atr_len = params.get("atr_len", 10)
            factor = params.get("factor", 3.0)
            prev_close = close.shift(1)
            tr = pd.concat(
                [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
                axis=1,
            ).max(axis=1)
            atr = tr.rolling(atr_len).mean()
            mid = (high + low) / 2
            upper_band = (mid + factor * atr).to_numpy()
            lower_band = (mid - factor * atr).to_numpy()
            close_v = close.to_numpy()

            # Direction starts long and only flips when the close pierces the
            # previous bar's band. NOTE(review): while ATR is still NaN both
            # comparisons are False, so warm-up bars report +1 rather than 0;
            # kept as in the original behaviour.
            direction = np.ones(len(df), dtype=np.int64)
            for i in range(1, len(df)):
                if close_v[i] > upper_band[i - 1]:
                    direction[i] = 1
                elif close_v[i] < lower_band[i - 1]:
                    direction[i] = -1
                else:
                    direction[i] = direction[i - 1]
            signals[direction == 1] = 1
            signals[direction == -1] = -1

        elif strategy_type == "ema_ribbon":
            ribbon = [
                close.ewm(span=span).mean().to_numpy()
                for span in self._RIBBON_SPANS
            ]
            # Bullish when every faster EMA is above the next slower one,
            # bearish when every faster EMA is below it.
            bullish = np.ones(len(df), dtype=bool)
            bearish = np.ones(len(df), dtype=bool)
            for faster, slower in zip(ribbon, ribbon[1:]):
                bullish &= faster > slower
                bearish &= faster < slower
            signals[bullish] = 1
            signals[bearish] = -1

        elif strategy_type == "zscore":
            lookback = params.get("lookback", 50)
            entry_z = params.get("entry_z", 2.0)
            mean = close.rolling(lookback).mean()
            std = close.rolling(lookback).std()
            z = (close - mean) / (std + 1e-10)
            # Mean reversion: buy when stretched low, sell when stretched high.
            signals[z < -entry_z] = 1
            signals[z > entry_z] = -1

        else:
            # Default: SMA crossover with fixed 20/50 windows (params ignored).
            sma20 = close.rolling(20).mean()
            sma50 = close.rolling(50).mean()
            signals[sma20 > sma50] = 1
            signals[sma20 <= sma50] = -1

        return signals

    @staticmethod
    def _format_date(stamp: Any) -> str:
        """Render an index label as a date string when it exposes ``.date()``."""
        return str(stamp.date()) if hasattr(stamp, "date") else str(stamp)

    @staticmethod
    def _monthly_returns(dates: Any, equity_curve: List[float]) -> Dict[str, float]:
        """
        Compute percent return per calendar month from the equity curve.

        Each month is closed on its own last bar and measured against the
        previous month's closing equity (the starting equity for the first
        month). Keys are ``"YYYY-MM"``; labels without ``strftime`` are keyed
        by the first seven characters of their string form.
        """
        count = min(len(dates), len(equity_curve))
        keys = [
            d.strftime("%Y-%m") if hasattr(d, "strftime") else str(d)[:7]
            for d in dates[:count]
        ]
        monthly: Dict[str, float] = {}
        baseline = equity_curve[0]
        for i in range(1, count):
            is_month_end = i == count - 1 or keys[i + 1] != keys[i]
            if not is_month_end:
                continue
            if baseline > 0:
                monthly[keys[i]] = round(
                    float((equity_curve[i] / baseline - 1) * 100), 2
                )
            else:
                monthly[keys[i]] = 0
            baseline = equity_curve[i]
        return monthly

    def _simulate_trades(
        self,
        df: pd.DataFrame,
        signals: pd.Series,
        initial_capital: float,
        commission_pct: float,
    ) -> Dict[str, Any]:
        """
        Simulate trading based on signals and compute all metrics.

        Long-only and fully invested: a +1 signal while flat buys at that
        bar's close, a -1 signal while long sells at that bar's close.
        Commission is charged on the traded value of each fill.

        Args:
            df: Price frame with a ``Close`` column.
            signals: Series of +1 / -1 / 0 positionally aligned with ``df``.
            initial_capital: Starting equity.
            commission_pct: Commission per fill, in percent.

        Returns:
            Dict of performance metrics, a down-sampled equity curve, the last
            50 trade records and per-month returns. ``pnl_pct`` and the
            statistics derived from it are gross of commission; ``pnl_abs``
            and the equity curve are net of commission.
        """
        close = df["Close"].to_numpy()
        sig = signals.to_numpy()
        dates = df.index
        n = len(close)
        fee_rate = commission_pct / 100

        # Track positions and equity
        in_position = False
        entry_price = 0.0
        entry_capital = 0.0  # capital before the entry commission was paid
        capital = initial_capital
        equity_curve = [initial_capital]
        trades: List[Dict[str, Any]] = []
        daily_returns: List[float] = [0.0]

        for i in range(1, n):
            price = close[i]
            if sig[i] == 1 and not in_position:
                # Buy
                entry_capital = capital
                shares = capital / price
                capital -= capital * fee_rate
                entry_price = price
                in_position = True
                trades.append({
                    "type": "ENTRY",
                    "date": self._format_date(dates[i]),
                    "price": round(float(price), 2),
                    "shares": round(float(shares), 4),
                })
            elif sig[i] == -1 and in_position:
                # Sell
                ret = (price - entry_price) / entry_price
                proceeds = capital * (1 + ret)
                capital = proceeds - proceeds * fee_rate
                in_position = False
                trades.append({
                    "type": "EXIT",
                    "date": self._format_date(dates[i]),
                    "price": round(float(price), 2),
                    "pnl_pct": round(float(ret * 100), 2),
                    # Whole-trade P&L net of both commissions, not the
                    # one-bar change in equity.
                    "pnl_abs": round(float(capital - entry_capital), 2),
                })

            # Update equity (mark an open position to the current close)
            if in_position and entry_price > 0:
                current_equity = capital * (price / entry_price)
            else:
                current_equity = capital
            previous_equity = equity_curve[-1]
            if previous_equity > 0:
                daily_ret = (current_equity - previous_equity) / previous_equity
            else:
                daily_ret = 0
            daily_returns.append(daily_ret)
            equity_curve.append(current_equity)

        # A position still open on the last bar is not force-closed: it is
        # already marked to market in the final equity point and produces no
        # EXIT record, so it is absent from the trade statistics.

        # Compute metrics
        eq = np.array(equity_curve, dtype=float)
        rets = np.array(daily_returns, dtype=float)
        peak = np.maximum.accumulate(eq)
        drawdown = (eq - peak) / (peak + 1e-10)

        # Trade stats
        pnl_list = [t.get("pnl_pct", 0) for t in trades if t["type"] == "EXIT"]
        wins = [p for p in pnl_list if p > 0]
        losses = [p for p in pnl_list if p < 0]
        total_trades = len(pnl_list)
        win_rate = len(wins) / total_trades if total_trades > 0 else 0
        avg_win = np.mean(wins) if wins else 0
        avg_loss = np.mean(losses) if losses else 0
        net_profit = eq[-1] - initial_capital
        net_profit_pct = (eq[-1] / initial_capital - 1) * 100
        gross_profit = sum(wins)
        gross_loss = abs(sum(losses))
        # NOTE(review): stays infinite when there are no losing trades; inf is
        # not valid strict JSON, so callers serialising this should handle it.
        profit_factor = (
            gross_profit / (gross_loss + 1e-10) if gross_loss > 0 else float("inf")
        )
        max_drawdown = float(np.min(drawdown)) * 100

        # Annualized metrics (the first return is a 0.0 placeholder for bar 0)
        period_rets = rets[1:]
        trading_days = len(rets) - 1
        ann_factor = 252 / max(trading_days, 1)
        if trading_days > 0:
            ann_return = ((eq[-1] / initial_capital) ** ann_factor - 1) * 100
        else:
            ann_return = 0
        daily_std = np.std(period_rets) if period_rets.size else 0
        if daily_std > 0:
            sharpe = (np.mean(period_rets) / (daily_std + 1e-10)) * np.sqrt(252)
        else:
            sharpe = 0
        # Downside deviation: root-mean-square of returns below 0, taken over
        # all periods (non-negative returns count as 0).
        if period_rets.size:
            downside_dev = np.sqrt(np.mean(np.minimum(period_rets, 0) ** 2))
        else:
            downside_dev = 0
        if downside_dev > 0:
            sortino = (np.mean(period_rets) / (downside_dev + 1e-10)) * np.sqrt(252)
        else:
            sortino = 0

        # Monthly returns
        monthly_returns = self._monthly_returns(dates, equity_curve)

        # Downsample equity curve for response size, always keeping the
        # final point so the curve ends at final_equity.
        eq_len = len(equity_curve)
        step = max(1, eq_len // 250)
        sample_idx = list(range(0, eq_len, step))
        if sample_idx[-1] != eq_len - 1:
            sample_idx.append(eq_len - 1)
        sampled_equity = [
            {"index": i, "equity": round(float(equity_curve[i]), 2)}
            for i in sample_idx
        ]

        return {
            "net_profit": round(float(net_profit), 2),
            "net_profit_pct": round(float(net_profit_pct), 2),
            "gross_profit_pct": round(float(gross_profit), 2),
            "gross_loss_pct": round(float(gross_loss), 2),
            "profit_factor": round(float(profit_factor), 4),
            "max_drawdown_pct": round(max_drawdown, 2),
            "sharpe_ratio": round(float(sharpe), 4),
            "sortino_ratio": round(float(sortino), 4),
            "annualized_return_pct": round(float(ann_return), 2),
            "total_trades": total_trades,
            "win_rate": round(win_rate, 4),
            "avg_win_pct": round(float(avg_win), 2),
            "avg_loss_pct": round(float(avg_loss), 2),
            "max_consecutive_wins": _max_consecutive(pnl_list, positive=True),
            "max_consecutive_losses": _max_consecutive(pnl_list, positive=False),
            "equity_curve": sampled_equity,
            "trades": trades[-50:],  # last 50 trades
            "monthly_returns": monthly_returns,
            "final_equity": round(float(eq[-1]), 2),
            "trading_days": trading_days,
        }
def _max_consecutive(values: list, positive: bool = True) -> int:
"""Count max consecutive wins or losses."""
max_count = 0
current = 0
for v in values:
if (positive and v > 0) or (not positive and v < 0):
current += 1
max_count = max(max_count, current)
else:
current = 0
return max_count
# Module singleton: one shared PineScriptBacktester instance, created at import time.
pine_backtester = PineScriptBacktester()