| import gradio as gr |
| import pandas as pd |
| import numpy as np |
| import yfinance as yf |
| import plotly.graph_objects as go |
| import tempfile |
| from datetime import timedelta |
|
|
| |
_OHLC_COLUMNS = ('Open', 'High', 'Low', 'Close')


def _flatten_price_frame(raw):
    """Return ``raw`` as a Date/Open/High/Low/Close frame, or None if OHLC is missing.

    ``raw`` is a single-ticker frame whose index holds the bar timestamps.
    """
    flat = raw.dropna(how='all').reset_index()
    if 'Date' not in flat.columns:
        # reset_index() puts the former index first; fall back to it when the
        # index was not literally named 'Date'.
        flat = flat.rename(columns={flat.columns[0]: 'Date'})
    if not set(_OHLC_COLUMNS).issubset(flat.columns):
        return None
    return flat[['Date', *_OHLC_COLUMNS]].copy()


def load_price_data(tickers_text, uploaded_csv, period="1y", interval="1d"):
    """Load price history from an uploaded CSV or, failing that, from yfinance.

    Args:
        tickers_text: Comma-separated ticker symbols; ignored when a CSV is
            supplied. ``None`` or blank text yields an empty result.
        uploaded_csv: Anything ``pandas.read_csv`` accepts (path or file-like),
            or ``None`` to download instead. The CSV needs a date column
            (matched case-insensitively) plus either Open/High/Low/Close
            columns or one numeric price column per instrument.
        period: Look-back period forwarded to ``yf.download``.
        interval: Bar interval forwarded to ``yf.download``.

    Returns:
        dict mapping a name to a DataFrame with columns
        Date, Open, High, Low, Close. An OHLC CSV is returned under the key
        ``"UPLOADED"``; a price-per-column CSV yields one entry per column with
        Open/High/Low copied from the price. Empty dict when nothing usable
        was found.

    Raises:
        KeyError: If the CSV has no date column.
    """
    if uploaded_csv is not None:
        df = pd.read_csv(uploaded_csv)
        df.columns = [str(c).strip() for c in df.columns]
        if 'Date' not in df.columns:
            # Accept 'date', 'DATE', ... rather than only the lowercase form.
            for name in df.columns:
                if name.lower() == 'date':
                    df = df.rename(columns={name: 'Date'})
                    break
        df['Date'] = pd.to_datetime(df['Date'])
        df = df.sort_values('Date')

        if set(_OHLC_COLUMNS).issubset(df.columns):
            ohlc = df[['Date', *_OHLC_COLUMNS]].copy()
            return {"UPLOADED": ohlc.reset_index(drop=True)}

        out = {}
        for col in df.columns:
            if col.lower() == 'date':
                continue
            prices = pd.to_numeric(df[col], errors='coerce')
            ser = pd.DataFrame({'Date': df['Date'], 'Close': prices})
            # Drop rows with no price. Filtering on "all columns missing"
            # never fires because Date is always present.
            ser = ser.dropna(subset=['Close'])
            if ser.empty:
                # Entirely non-numeric or blank column: not a price series.
                continue
            for field in ('Open', 'High', 'Low'):
                ser[field] = ser['Close']
            out[col] = ser[['Date', *_OHLC_COLUMNS]].reset_index(drop=True)
        return out

    tickers = [t.strip().upper() for t in (tickers_text or "").split(",") if t.strip()]
    if not tickers:
        return {}

    data = yf.download(tickers, period=period, interval=interval,
                       auto_adjust=False, progress=False, threads=True)
    result = {}
    if data is None or data.empty:
        return result

    if isinstance(data.columns, pd.MultiIndex):
        # The ticker symbols may sit on either column level depending on how
        # the download was grouped, so search every level instead of only
        # level 0.
        for t in tickers:
            for level in range(data.columns.nlevels):
                if t in data.columns.get_level_values(level):
                    frame = _flatten_price_frame(data.xs(t, axis=1, level=level))
                    if frame is not None and not frame.empty:
                        result[t] = frame
                    break
    else:
        frame = _flatten_price_frame(data)
        if frame is not None and not frame.empty:
            result[tickers[0]] = frame
    return result
|
|
def sma(series, window):
    """Return the simple moving average of ``series`` over ``window`` rows.

    ``min_periods=1`` means the first rows average whatever history exists so
    far instead of producing NaN.
    """
    rolling_view = series.rolling(window=window, min_periods=1)
    return rolling_view.mean()
|
|
def generate_signals(df, short_w, long_w):
    """Add SMA columns and crossover signals to a copy of ``df``.

    Args:
        df: Frame with a ``Close`` column.
        short_w: Window of the fast moving average.
        long_w: Window of the slow moving average.

    Returns:
        A re-indexed copy of ``df`` with ``sma_short``, ``sma_long`` and
        ``signal`` columns. ``signal`` is 1 on the row where the fast average
        moves above the slow one, -1 where it moves below, and 0 otherwise.
    """
    frame = df.copy().reset_index(drop=True)
    frame['sma_short'] = sma(frame['Close'], short_w)
    frame['sma_long'] = sma(frame['Close'], long_w)

    fast, slow = frame['sma_short'], frame['sma_long']
    prev_fast, prev_slow = fast.shift(1), slow.shift(1)

    # A cross needs the relation to flip between the previous row and this one.
    crossed_up = (fast > slow) & (prev_fast <= prev_slow)
    crossed_down = (fast < slow) & (prev_fast >= prev_slow)

    frame['signal'] = 0
    frame.loc[crossed_up, 'signal'] = 1
    frame.loc[crossed_down, 'signal'] = -1
    return frame
|
|
def _positive_pct_or_none(pct):
    """Return ``pct`` as a positive float, or None when the level is disabled."""
    if pct is None:
        return None
    pct = float(pct)
    if not np.isfinite(pct) or pct <= 0:
        return None
    return pct


def _intrabar_exit(stop, take, bar_open, bar_high, bar_low):
    """Return the stop/target fill price for one bar, or None if neither is hit."""
    # A bar that opens beyond a level can only fill at the open, not at the
    # (already passed) level price.
    if stop is not None and bar_open <= stop:
        return bar_open
    if take is not None and bar_open >= take:
        return bar_open
    # Intrabar order is unknown; the target is checked before the stop.
    if take is not None and bar_high >= take:
        return take
    if stop is not None and bar_low <= stop:
        return stop
    return None


def _trade_record(entry, exit_date, exit_price):
    """Build the per-trade dict returned by run_backtest_on_df."""
    return {
        'entry_date': entry['date'],
        'entry_price': entry['price'],
        'exit_date': exit_date,
        'exit_price': exit_price,
        'return_pct': (exit_price / entry['price']) - 1.0,
        'holding_days': (exit_date - entry['date']).days,
    }


def run_backtest_on_df(df, stop_loss_pct=None, take_profit_pct=None, initial_capital=10000):
    """Run a long-only, one-position-at-a-time backtest over ``df``.

    Rules:
      * ``signal == 1`` while flat buys at the next bar's Open (or at this
        bar's Close when it is the last bar).
      * While long, each bar is checked for the take-profit, then the
        stop-loss, then ``signal == -1`` (which sells at the next bar's Open,
        or this bar's Close on the last bar).
      * A position still open after the last bar is closed at the last Close.

    Args:
        df: Frame with Date, Open, High, Low, Close and, optionally, ``signal``
            (missing means no trades). Rows are processed in the given order.
        stop_loss_pct: Stop distance below the entry price in percent.
            ``None`` or any non-positive value disables the stop.
        take_profit_pct: Target distance above the entry price in percent.
            ``None`` or any non-positive value disables the target.
        initial_capital: Starting value of the equity curve.

    Returns:
        ``(trades, strat_rets, equity)``: a list of trade dicts (entry_date,
        entry_price, exit_date, exit_price, return_pct, holding_days), the
        per-bar strategy return Series indexed by Date, and the equity Series.
        Returns are built from the actual fill prices, so the equity curve
        compounds to the product of the trade returns.
    """
    df = df.copy().reset_index(drop=True)
    df['Date'] = pd.to_datetime(df['Date'])
    n_bars = len(df)

    stop_pct = _positive_pct_or_none(stop_loss_pct)
    take_pct = _positive_pct_or_none(take_profit_pct)

    dates = df['Date'].tolist()
    opens = df['Open'].to_numpy(dtype=float)
    highs = df['High'].to_numpy(dtype=float)
    lows = df['Low'].to_numpy(dtype=float)
    closes = df['Close'].to_numpy(dtype=float)
    signals = df['signal'].to_numpy() if 'signal' in df.columns else np.zeros(n_bars)

    trades = []
    spans = []  # (entry_row, exit_row) for each trade, parallel to `trades`
    entry = None

    for i in range(n_bars):
        has_next = i + 1 < n_bars

        if entry is None:
            if signals[i] == 1:
                fill_i = i + 1 if has_next else i
                price = float(opens[fill_i]) if has_next else float(closes[i])
                entry = {
                    'index': fill_i,
                    'date': dates[fill_i],
                    'price': price,
                    'stop': None if stop_pct is None else price * (1 - stop_pct / 100.0),
                    'take': None if take_pct is None else price * (1 + take_pct / 100.0),
                }
            continue

        exit_i = i
        exit_price = _intrabar_exit(entry['stop'], entry['take'],
                                    opens[i], highs[i], lows[i])
        if exit_price is None and signals[i] == -1:
            if has_next:
                exit_i = i + 1
                exit_price = opens[exit_i]
            else:
                exit_price = closes[i]

        if exit_price is not None:
            trades.append(_trade_record(entry, dates[exit_i], float(exit_price)))
            spans.append((entry['index'], exit_i))
            entry = None

    if entry is not None:
        # Mark the still-open position to the final close.
        trades.append(_trade_record(entry, dates[-1], float(closes[-1])))
        spans.append((entry['index'], n_bars - 1))

    # Per-bar growth factors from the real fills: entry price -> close on the
    # entry bar, close -> close while holding, previous close -> exit price on
    # the exit bar. Bars with no position stay at 1.0.
    growth = np.ones(n_bars)
    for (start_i, end_i), trade in zip(spans, trades):
        prev = trade['entry_price']
        for j in range(start_i, end_i):
            if np.isfinite(closes[j]):
                growth[j] *= closes[j] / prev
                prev = closes[j]
        growth[end_i] *= trade['exit_price'] / prev

    strat_rets = pd.Series(growth - 1.0, index=pd.Index(df['Date'])).sort_index()
    strat_rets = strat_rets.fillna(0)

    equity = (1 + strat_rets).cumprod() * initial_capital

    return trades, strat_rets, equity
|
|
def _periods_per_year_from_index(index, default=252):
    """Guess the annualisation factor from the median spacing of a DatetimeIndex."""
    if not isinstance(index, pd.DatetimeIndex) or len(index) < 2:
        return default
    step = pd.Series(index).diff().median()
    if pd.isna(step):
        return default
    days = step / pd.Timedelta(days=1)
    if days >= 28:
        return 12
    if days >= 5:
        return 52
    return default


def compute_performance_metrics(strat_rets, initial_capital=10000, rf_pct=0.0, periods_per_year=None):
    """Summarise a series of per-period strategy returns.

    Args:
        strat_rets: pandas Series of simple per-period returns.
        initial_capital: Starting portfolio value.
        rf_pct: Annual risk-free rate in percent, used for the Sharpe ratio.
        periods_per_year: Annualisation factor. When ``None`` it is inferred
            from the index spacing (252 for daily or non-datetime indexes,
            52 for weekly, 12 for monthly).

    Returns:
        dict with ``ending_value``, ``total_return_pct``, ``CAGR``,
        ``annual_volatility``, ``sharpe`` and ``max_drawdown_pct`` (all as
        fractions, not percentages, apart from ``ending_value``), or an empty
        dict when ``strat_rets`` is empty. Sharpe is computed as
        (CAGR - risk-free) / annualised volatility and is 0.0 when the
        volatility is zero.
    """
    total_periods = len(strat_rets)
    if total_periods == 0:
        return {}
    if periods_per_year is None:
        periods_per_year = _periods_per_year_from_index(strat_rets.index)

    cum = (1 + strat_rets).cumprod()
    growth = float(cum.iloc[-1])
    ending_value = growth * initial_capital
    total_return = growth - 1.0

    years = total_periods / periods_per_year
    if growth > 0:
        cagr = growth ** (1 / years) - 1
    else:
        # Capital wiped out: a fractional power of a non-positive base is
        # undefined, so report a total loss.
        cagr = -1.0

    ann_vol = float(strat_rets.std(ddof=0)) * np.sqrt(periods_per_year)
    # A flat return series has no risk to scale by; dividing by a tiny epsilon
    # would report a meaningless Sharpe in the millions.
    sharpe = (cagr - rf_pct / 100.0) / ann_vol if ann_vol > 0 else 0.0

    peak = cum.cummax()
    drawdown = (cum - peak) / peak
    max_dd = drawdown.min()
    return {
        'ending_value': float(ending_value),
        'total_return_pct': float(total_return),
        'CAGR': float(cagr),
        'annual_volatility': float(ann_vol),
        'sharpe': float(sharpe),
        'max_drawdown_pct': float(max_dd)
    }
|
|
def trades_to_df(trades):
    """Convert a list of trade dicts into a typed DataFrame.

    An empty list yields an empty frame that still carries the six trade
    columns. Otherwise both date columns are parsed to datetimes and
    ``return_pct`` is cast to float.
    """
    if not trades:
        empty_columns = ['entry_date', 'entry_price', 'exit_date',
                         'exit_price', 'return_pct', 'holding_days']
        return pd.DataFrame(columns=empty_columns)

    table = pd.DataFrame(trades)
    for date_col in ('entry_date', 'exit_date'):
        table[date_col] = pd.to_datetime(table[date_col])
    table['return_pct'] = table['return_pct'].astype(float)
    return table
|
|
def save_trades_csv(df_trades):
    """Write ``df_trades`` to a new temporary ``trades_*.csv`` file and return its path.

    The file is created with ``delete=False`` so it outlives this call;
    removing it is left to the caller.
    """
    # Only the unique name is needed. Closing the handle before pandas writes
    # avoids leaking a descriptor and lets the path be reopened on platforms
    # that forbid a second open of a still-open temp file.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv", prefix="trades_") as tmp:
        path = tmp.name
    df_trades.to_csv(path, index=False)
    return path
|
|
| |
def _pct_or_disabled(value):
    """Map a UI percent to a float, treating None and non-positive values as 'off'."""
    if value is None:
        return None
    value = float(value)
    return value if value > 0 else None


def run_backtester(tickers_text, uploaded_csv, period, interval, short_w, long_w, stop_loss_pct, take_profit_pct, initial_capital, rf_pct):
    """Gradio callback: load prices, run the SMA-crossover backtest, format results.

    Only the first instrument returned by ``load_price_data`` is backtested.

    Returns:
        A 5-tuple ``(summary_text, equity_figure, trades_html, trades_csv_path,
        ticker)``. On any input or data problem the first element is an error
        message and the remaining four are ``None``.
    """
    failure = (None, None, None, None)

    # Validate the cheap inputs first so a bad form never triggers a download.
    try:
        short_n, long_n = int(short_w), int(long_w)
    except (TypeError, ValueError, OverflowError):
        return ("Short and long SMA windows must be numbers.", *failure)
    if short_n < 1 or long_n < 1:
        return ("SMA windows must be at least 1.", *failure)
    # Compare after integer conversion: 20.2 and 20.9 would both become 20.
    if short_n >= long_n:
        return ("Short window must be smaller than long window.", *failure)

    try:
        capital = float(initial_capital)
    except (TypeError, ValueError):
        capital = float('nan')
    if not capital > 0:
        return ("Initial capital must be a positive number.", *failure)
    risk_free = float(rf_pct) if rf_pct is not None else 0.0

    try:
        price_dict = load_price_data(tickers_text, uploaded_csv, period=period, interval=interval)
    except Exception as e:
        return (f"Data load error: {e}", *failure)

    if not price_dict:
        return ("No price data returned. Provide valid tickers or a proper CSV.", *failure)

    ticker = next(iter(price_dict))
    df = price_dict[ticker].copy().reset_index(drop=True)
    df = generate_signals(df, short_n, long_n)

    # The form advertises "0 to disable", so zero must reach the engine as None.
    trades, strat_rets, equity = run_backtest_on_df(
        df,
        stop_loss_pct=_pct_or_disabled(stop_loss_pct),
        take_profit_pct=_pct_or_disabled(take_profit_pct),
        initial_capital=capital)

    perf = compute_performance_metrics(strat_rets, initial_capital=capital, rf_pct=risk_free)
    trades_df = trades_to_df(trades)
    trades_csv = save_trades_csv(trades_df) if not trades_df.empty else None

    fig = None
    if equity is not None and not equity.empty:
        fig = go.Figure()
        # go.Line is a deprecated alias; a Scatter trace in 'lines' mode is
        # the supported way to draw a line chart.
        fig.add_trace(go.Scatter(x=equity.index, y=equity.values, mode='lines', name='Equity Curve'))
        fig.update_layout(title=f'Equity Curve ({ticker})', xaxis_title='Date', yaxis_title='Portfolio Value')

    perf_text = ("Performance summary:\n"
                 f"Ending value: {perf.get('ending_value',0):.2f}\n"
                 f"Total return: {perf.get('total_return_pct',0)*100:.2f}%\n"
                 f"CAGR: {perf.get('CAGR',0)*100:.2f}%\n"
                 f"Annual vol: {perf.get('annual_volatility',0)*100:.2f}%\n"
                 f"Sharpe: {perf.get('sharpe',0):.3f}\n"
                 f"Max Drawdown: {perf.get('max_drawdown_pct',0)*100:.2f}%\n"
                 f"Number of trades: {len(trades)}")

    if trades_df.empty:
        trades_html = "<i>No trades executed</i>"
    else:
        trades_html = trades_df.round(6).to_html(classes="table table-striped", index=False)

    return perf_text, fig, trades_html, trades_csv, ticker
|
|
| |
# UI definition: inputs on the left, results on the right. The button wires
# every input to run_backtester and spreads its 5-tuple over the outputs.
with gr.Blocks(title="Event-Driven Backtester") as demo:
    gr.Markdown("## 🔁 Event-Driven Backtester (SMA crossover)\nUpload OHLC CSV or provide tickers. Strategy: short SMA crossover long SMA. Options: stop-loss / take-profit.")
    with gr.Row():
        with gr.Column(scale=2):
            tickers = gr.Textbox(label="Tickers (comma-separated)", value="AAPL")
            uploaded = gr.File(label="Or upload CSV (Date + OHLC or Date + Close columns)", file_types=[".csv"])
            period = gr.Dropdown(choices=["3mo","6mo","1y","2y","5y"], value="1y", label="Fetch period (if using tickers)")
            interval = gr.Dropdown(choices=["1d","1wk"], value="1d", label="Fetch interval")
            # precision=0 keeps the window inputs whole numbers.
            short_w = gr.Number(value=20, label="Short SMA window (days)", precision=0)
            long_w = gr.Number(value=50, label="Long SMA window (days)", precision=0)
            stop_loss_pct = gr.Number(value=5.0, label="Stop-loss percent (0 to disable)", precision=2)
            take_profit_pct = gr.Number(value=10.0, label="Take-profit percent (0 to disable)", precision=2)
            initial_capital = gr.Number(value=10000, label="Initial capital (USD)")
            rf_pct = gr.Number(value=0.0, label="Risk-free rate (%) for Sharpe")
            run_btn = gr.Button("Run Backtest")
        with gr.Column(scale=3):
            perf_out = gr.Textbox(label="Performance summary", interactive=False)
            eq_plot = gr.Plot()
            trades_html = gr.HTML()
            trades_file = gr.File(label="Download trades CSV")
            # Receives the fifth return value of run_backtester. Declared here
            # with a label rather than created inline in the click() call,
            # which left an anonymous box outside the results column.
            ticker_out = gr.Textbox(label="Ticker used", interactive=False)

    run_btn.click(fn=run_backtester,
                  inputs=[tickers, uploaded, period, interval, short_w, long_w, stop_loss_pct, take_profit_pct, initial_capital, rf_pct],
                  outputs=[perf_out, eq_plot, trades_html, trades_file, ticker_out])


if __name__ == "__main__":
    demo.launch()
|
|