# app.py - uploaded by zainulabedin949 ("Update app.py", commit 41aff0a, verified)
import gradio as gr
import pandas as pd
import numpy as np
import yfinance as yf
import plotly.graph_objects as go
import tempfile
from datetime import timedelta
# ---------- Helpers ----------
def _frame_for_ticker(data, ticker):
    """Extract a flat Date/Open/High/Low/Close frame for *ticker*, or None.

    Handles both flat columns and two-level columns in either orientation
    (ticker-first or price-field-first), since the layout returned by
    ``yf.download`` depends on its ``group_by`` setting and library version.
    """
    columns = data.columns
    if isinstance(columns, pd.MultiIndex):
        for level in range(columns.nlevels):
            if ticker in columns.get_level_values(level):
                frame = data.xs(ticker, axis=1, level=level)
                break
        else:
            return None
    else:
        frame = data

    frame = frame.dropna(how='all').reset_index()
    if 'Date' not in frame.columns:
        # After reset_index the former index is the first column, whatever its name.
        frame = frame.rename(columns={frame.columns[0]: 'Date'})
    if not {'Open', 'High', 'Low', 'Close'}.issubset(frame.columns):
        return None
    return frame[['Date', 'Open', 'High', 'Low', 'Close']].copy()


def load_price_data(tickers_text, uploaded_csv, period="1y", interval="1d"):
    """Load price history from an uploaded CSV or, failing that, from yfinance.

    Args:
        tickers_text: Comma-separated ticker symbols; used only when
            ``uploaded_csv`` is None. ``None`` or blank yields ``{}``.
        uploaded_csv: Anything ``pd.read_csv`` accepts (path or file-like),
            or None to fetch from yfinance instead.
        period: History length passed to ``yf.download``.
        interval: Bar size passed to ``yf.download``.

    Returns:
        dict mapping a name to a DataFrame with columns Date, Open, High,
        Low, Close. A CSV that already has OHLC columns is returned under
        the key ``"UPLOADED"``; otherwise every non-date column is treated
        as one price series and Open/High/Low are copied from Close.

    Raises:
        KeyError: if the CSV has no date column (matched case-insensitively).
    """
    ohlc = ['Open', 'High', 'Low', 'Close']

    if uploaded_csv is not None:
        df = pd.read_csv(uploaded_csv)
        df.columns = [str(c).strip() for c in df.columns]
        if 'Date' not in df.columns:
            # Accept any capitalisation of the date header (date, DATE, ...).
            date_col = next((c for c in df.columns if c.lower() == 'date'), None)
            if date_col is None:
                raise KeyError("CSV must contain a 'Date' column")
            df = df.rename(columns={date_col: 'Date'})
        df['Date'] = pd.to_datetime(df['Date'])
        df = df.sort_values('Date')

        if set(ohlc).issubset(df.columns):
            # Single series that already carries OHLC.
            return {"UPLOADED": df[['Date'] + ohlc].reset_index(drop=True).copy()}

        # Wide layout: one price column per ticker.
        out = {}
        for col in df.columns:
            if col.lower() == 'date':
                continue
            # Drop rows where this ticker has no price; 'Date' is always set,
            # so how='all' would never remove anything.
            ser = df[['Date', col]].dropna(subset=[col]).copy()
            ser.columns = ['Date', 'Close']
            for field in ('Open', 'High', 'Low'):
                ser[field] = ser['Close']
            out[col] = ser[['Date'] + ohlc].reset_index(drop=True)
        return out

    # No CSV: fetch from yfinance.
    raw = (tickers_text or "").split(",")
    # De-duplicate while keeping the user's order.
    tickers = list(dict.fromkeys(t.strip().upper() for t in raw if t.strip()))
    if not tickers:
        return {}

    # group_by='ticker' requests ticker-first columns; _frame_for_ticker still
    # copes with the other orientation in case the library ignores it.
    data = yf.download(tickers, period=period, interval=interval, auto_adjust=False,
                       progress=False, threads=True, group_by='ticker')
    if data is None or data.empty:
        return {}

    result = {}
    if isinstance(data.columns, pd.MultiIndex):
        for t in tickers:
            frame = _frame_for_ticker(data, t)
            if frame is not None:
                result[t] = frame
    else:
        # Flat columns: a single ticker was returned.
        frame = _frame_for_ticker(data, tickers[0])
        if frame is not None:
            result[tickers[0]] = frame
    return result
def sma(series, window):
    """Return the simple moving average of *series* over *window* rows.

    ``min_periods=1`` means the first rows average over however many
    values are available instead of producing NaN.
    """
    rolling_view = series.rolling(window=window, min_periods=1)
    return rolling_view.mean()
def generate_signals(df, short_w, long_w):
    """Return a copy of *df* with SMA columns and a crossover ``signal`` column.

    Adds ``sma_short`` and ``sma_long`` (moving averages of ``Close``) and
    ``signal``: 1 on the row where the short average moves above the long
    one, -1 on the row where it moves below, 0 otherwise.
    """
    out = df.copy().reset_index(drop=True)

    fast = sma(out['Close'], short_w)
    slow = sma(out['Close'], long_w)
    out['sma_short'] = fast
    out['sma_long'] = slow

    prev_fast = fast.shift(1)
    prev_slow = slow.shift(1)
    # A cross needs the previous row on the other side (or touching); the
    # first row has no previous value, so it can never signal.
    crossed_up = (fast > slow) & (prev_fast <= prev_slow)
    crossed_down = (fast < slow) & (prev_fast >= prev_slow)

    out['signal'] = 0
    out.loc[crossed_up, 'signal'] = 1
    out.loc[crossed_down, 'signal'] = -1
    return out
def _make_trade(entry_date, entry_price, exit_date, exit_price):
    """Build one trade record in the key order the trades table expects."""
    return {
        'entry_date': entry_date,
        'entry_price': entry_price,
        'exit_date': exit_date,
        'exit_price': exit_price,
        'return_pct': (exit_price / entry_price) - 1.0,
        'holding_days': (exit_date - entry_date).days,
    }


def run_backtest_on_df(df, stop_loss_pct=None, take_profit_pct=None, initial_capital=10000):
    """Run a long-only backtest over a frame that carries a ``signal`` column.

    Rules:
      * ``signal == 1`` while flat enters at the next row's Open (or at the
        current Close when there is no next row).
      * While long, each row is checked in this order: take-profit against
        High, stop-loss against Low, then ``signal == -1`` (exit at the next
        row's Open, or the current Close on the last row).
      * A position still open at the end is closed at the last Close.

    NOTE: when both the take-profit and the stop-loss lie inside one bar the
    take-profit wins, and a bar that gaps through a level is filled at the
    level itself rather than at the Open.

    Args:
        df: Frame with Date, Open, High, Low, Close and (optionally) signal.
            Rows are processed in chronological order.
        stop_loss_pct: Stop distance below the entry price, in percent.
            ``None`` or a value <= 0 disables the stop.
        take_profit_pct: Target distance above the entry price, in percent.
            ``None`` or a value <= 0 disables the target.
        initial_capital: Starting value of the equity curve.

    Returns:
        ``(trades, strat_rets, equity)``: a list of trade dicts, the per-row
        strategy return Series indexed by Date, and the equity curve Series.
        Returns are derived from the actual fill prices, so compounding
        ``strat_rets`` reproduces the compounded trade returns.
    """
    df = df.copy()
    df['Date'] = pd.to_datetime(df['Date'])
    df = df.sort_values('Date', kind='stable').reset_index(drop=True)

    n_rows = len(df)
    # Pull columns out once; per-row df.loc[i] lookups are slow.
    dates = df['Date'].tolist()
    opens = df['Open'].astype(float).to_numpy()
    highs = df['High'].astype(float).to_numpy()
    lows = df['Low'].astype(float).to_numpy()
    closes = df['Close'].astype(float).to_numpy()
    signals = df['signal'].to_numpy() if 'signal' in df.columns else np.zeros(n_rows)

    # A non-positive percentage would put the level at (or beyond) the entry
    # price and close every trade on its first bar, so treat it as disabled.
    stop_frac = stop_loss_pct / 100.0 if stop_loss_pct is not None and stop_loss_pct > 0 else None
    take_frac = take_profit_pct / 100.0 if take_profit_pct is not None and take_profit_pct > 0 else None

    trades = []
    spans = []       # (entry_row, exit_row) for each trade, parallel to `trades`
    open_pos = None  # (entry_row, entry_price, stop_level, take_level) while long

    for i in range(n_rows):
        has_next = i + 1 < n_rows

        if open_pos is None:
            if signals[i] == 1:
                entry_row = i + 1 if has_next else i
                entry_price = float(opens[entry_row]) if has_next else float(closes[i])
                stop_level = entry_price * (1 - stop_frac) if stop_frac is not None else None
                take_level = entry_price * (1 + take_frac) if take_frac is not None else None
                open_pos = (entry_row, entry_price, stop_level, take_level)
            continue

        entry_row, entry_price, stop_level, take_level = open_pos
        exit_row = i
        if take_level is not None and highs[i] >= take_level:
            exit_price = take_level
        elif stop_level is not None and lows[i] <= stop_level:
            exit_price = stop_level
        elif signals[i] == -1:
            if has_next:
                exit_row = i + 1
                exit_price = float(opens[exit_row])
            else:
                exit_price = float(closes[i])
        else:
            continue

        trades.append(_make_trade(dates[entry_row], entry_price, dates[exit_row], exit_price))
        spans.append((entry_row, exit_row))
        open_pos = None

    if open_pos is not None:
        # Still long at the end of the data: mark out at the final Close.
        entry_row, entry_price, _, _ = open_pos
        last_row = n_rows - 1
        trades.append(_make_trade(dates[entry_row], entry_price,
                                  dates[last_row], float(closes[last_row])))
        spans.append((entry_row, last_row))

    # Per-row strategy returns from the real price path of each trade:
    # entry fill -> closes of the held rows -> exit fill.
    period_rets = np.zeros(n_rows)
    for (start, end), trade in zip(spans, trades):
        path = np.concatenate(([trade['entry_price']], closes[start:end], [trade['exit_price']]))
        legs = path[1:] / path[:-1] - 1.0
        # Compound rather than assign so two trades touching the same row
        # (exit and re-entry on the final bar) both count.
        period_rets[start:end + 1] = (1.0 + period_rets[start:end + 1]) * (1.0 + legs) - 1.0

    strat_rets = pd.Series(period_rets, index=pd.Index(df['Date'], name='Date')).fillna(0)
    equity = (1 + strat_rets).cumprod() * initial_capital
    return trades, strat_rets, equity
def compute_performance_metrics(strat_rets, initial_capital=10000, rf_pct=0.0, periods_per_year=252):
    """Summarise a series of per-period strategy returns.

    Args:
        strat_rets: Series of simple returns, one per bar.
        initial_capital: Starting portfolio value.
        rf_pct: Annual risk-free rate in percent, subtracted from CAGR in
            the Sharpe numerator.
        periods_per_year: Bars per year used for annualising (252 for daily
            bars, 52 for weekly bars).

    Returns:
        dict with ``ending_value``, ``total_return_pct``, ``CAGR``,
        ``annual_volatility``, ``sharpe`` and ``max_drawdown_pct`` (all
        floats; the ``*_pct`` values are fractions, not percentages), or an
        empty dict when *strat_rets* is empty. ``sharpe`` is
        (CAGR - risk-free) / annualised volatility, reported as 0.0 when
        the volatility is zero.

    Raises:
        ValueError: if *periods_per_year* is not positive.
    """
    if periods_per_year <= 0:
        raise ValueError("periods_per_year must be positive")

    total_periods = len(strat_rets)
    if total_periods == 0:
        return {}

    growth = (1 + strat_rets).cumprod()
    growth_total = float(growth.iloc[-1])
    ending_value = growth_total * initial_capital
    total_return = growth_total - 1.0

    years = total_periods / periods_per_year
    if growth_total <= 0:
        # Capital wiped out; a fractional power of a non-positive base is undefined.
        cagr = -1.0
    else:
        cagr = growth_total ** (1 / years) - 1

    ann_vol = float(strat_rets.std(ddof=0)) * np.sqrt(periods_per_year)
    if np.isfinite(ann_vol) and ann_vol > 1e-12:
        sharpe = (cagr - rf_pct / 100.0) / ann_vol
    else:
        # Flat returns: the ratio is undefined, so report 0 instead of
        # dividing by a tiny epsilon and producing a meaningless huge number.
        sharpe = 0.0

    # Max drawdown: largest fractional drop from a running peak.
    drawdown = growth / growth.cummax() - 1.0
    max_dd = drawdown.min()

    return {
        'ending_value': float(ending_value),
        'total_return_pct': float(total_return),
        'CAGR': float(cagr),
        'annual_volatility': float(ann_vol),
        'sharpe': float(sharpe),
        'max_drawdown_pct': float(max_dd)
    }
def trades_to_df(trades):
    """Convert a list of trade dicts into a typed DataFrame.

    An empty list yields an empty frame that still has the standard trade
    columns. Otherwise the date columns are parsed to datetimes and
    ``return_pct`` is cast to float.
    """
    if not trades:
        empty_columns = ['entry_date', 'entry_price', 'exit_date', 'exit_price', 'return_pct', 'holding_days']
        return pd.DataFrame(columns=empty_columns)

    frame = pd.DataFrame(trades)
    for date_col in ('entry_date', 'exit_date'):
        frame[date_col] = pd.to_datetime(frame[date_col])
    return frame.astype({'return_pct': float})
def save_trades_csv(df_trades):
    """Write *df_trades* to a new temporary CSV file and return its path.

    The file is named ``trades_*.csv`` and is not deleted automatically;
    the caller (or the OS temp cleaner) owns its lifetime.
    """
    # Reserve a unique name, then close the handle before pandas writes to
    # it: leaving it open leaks a file descriptor and blocks re-opening the
    # file by name on Windows.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv", prefix="trades_") as tmp:
        path = tmp.name
    df_trades.to_csv(path, index=False)
    return path
# ---------- Main Gradio function ----------
def _positive_or_none(value):
    """Return *value* as a float when it is > 0, else None (blank or <= 0)."""
    if value is None:
        return None
    number = float(value)
    return number if number > 0 else None


def run_backtester(tickers_text, uploaded_csv, period, interval, short_w, long_w, stop_loss_pct, take_profit_pct, initial_capital, rf_pct):
    """Gradio callback: load data, run the SMA-crossover backtest, format results.

    Inputs are validated before any data is fetched. Stop-loss and
    take-profit values that are blank or <= 0 are treated as disabled,
    matching the "0 to disable" wording on the form.

    Returns:
        ``(summary_text, equity_figure, trades_html, trades_csv_path, ticker)``.
        On failure the first element is an error message and the rest are None.
    """
    failure = (None, None, None, None)

    if short_w is None or long_w is None:
        return ("Short and long SMA windows are required.",) + failure
    short_w, long_w = int(short_w), int(long_w)
    if short_w < 1:
        return ("SMA windows must be at least 1.",) + failure
    if short_w >= long_w:
        return ("Short window must be smaller than long window.",) + failure

    capital = _positive_or_none(initial_capital)
    if capital is None:
        return ("Initial capital must be a positive number.",) + failure
    risk_free = float(rf_pct) if rf_pct is not None else 0.0

    try:
        price_dict = load_price_data(tickers_text, uploaded_csv, period=period, interval=interval)
    except Exception as e:
        return (f"Data load error: {e}",) + failure
    if not price_dict:
        return ("No price data returned. Provide valid tickers or a proper CSV.",) + failure

    # Several series may be available; for simplicity only the first is used.
    ticker = next(iter(price_dict))
    df = price_dict[ticker].copy().reset_index(drop=True)

    df = generate_signals(df, short_w, long_w)
    trades, strat_rets, equity = run_backtest_on_df(
        df,
        stop_loss_pct=_positive_or_none(stop_loss_pct),
        take_profit_pct=_positive_or_none(take_profit_pct),
        initial_capital=capital,
    )
    perf = compute_performance_metrics(strat_rets, initial_capital=capital, rf_pct=risk_free)
    trades_df = trades_to_df(trades)
    trades_csv = save_trades_csv(trades_df) if not trades_df.empty else None

    # Equity plot
    if equity is not None and not equity.empty:
        fig = go.Figure()
        # go.Scatter with mode='lines' replaces the deprecated go.Line.
        fig.add_trace(go.Scatter(x=equity.index, y=equity.values, mode='lines', name='Equity Curve'))
        fig.update_layout(title=f'Equity Curve ({ticker})', xaxis_title='Date', yaxis_title='Portfolio Value')
    else:
        fig = None

    perf_text = ("Performance summary:\n"
                 f"Ending value: {perf.get('ending_value',0):.2f}\n"
                 f"Total return: {perf.get('total_return_pct',0)*100:.2f}%\n"
                 f"CAGR: {perf.get('CAGR',0)*100:.2f}%\n"
                 f"Annual vol: {perf.get('annual_volatility',0)*100:.2f}%\n"
                 f"Sharpe: {perf.get('sharpe',0):.3f}\n"
                 f"Max Drawdown: {perf.get('max_drawdown_pct',0)*100:.2f}%\n"
                 f"Number of trades: {len(trades)}")

    # Trades table as HTML
    if trades_df.empty:
        trades_html = "<i>No trades executed</i>"
    else:
        trades_html = trades_df.round(6).to_html(classes="table table-striped", index=False)
    return perf_text, fig, trades_html, trades_csv, ticker
# ---------- Gradio UI ----------
# Page layout: inputs in the left column, results in the right column.
with gr.Blocks(title="Event-Driven Backtester") as demo:
    gr.Markdown("## 🔁 Event-Driven Backtester (SMA crossover)\nUpload OHLC CSV or provide tickers. Strategy: short SMA crossover long SMA. Options: stop-loss / take-profit.")
    with gr.Row():
        with gr.Column(scale=2):
            tickers = gr.Textbox(label="Tickers (comma-separated)", value="AAPL")
            uploaded = gr.File(label="Or upload CSV (Date + OHLC or Date + Close columns)", file_types=[".csv"])
            period = gr.Dropdown(choices=["3mo","6mo","1y","2y","5y"], value="1y", label="Fetch period (if using tickers)")
            interval = gr.Dropdown(choices=["1d","1wk"], value="1d", label="Fetch interval")
            # precision=0 makes the window sizes arrive as whole numbers.
            short_w = gr.Number(value=20, label="Short SMA window (days)", precision=0)
            long_w = gr.Number(value=50, label="Long SMA window (days)", precision=0)
            stop_loss_pct = gr.Number(value=5.0, label="Stop-loss percent (0 to disable)", precision=2)
            take_profit_pct = gr.Number(value=10.0, label="Take-profit percent (0 to disable)", precision=2)
            initial_capital = gr.Number(value=10000, label="Initial capital (USD)")
            rf_pct = gr.Number(value=0.0, label="Risk-free rate (%) for Sharpe")
            run_btn = gr.Button("Run Backtest")
        with gr.Column(scale=3):
            perf_out = gr.Textbox(label="Performance summary", interactive=False)
            eq_plot = gr.Plot()
            trades_html = gr.HTML()
            trades_file = gr.File(label="Download trades CSV")
            # Named, read-only target for the fifth return value instead of
            # an anonymous Textbox created inline in the click() call.
            ticker_out = gr.Textbox(label="Ticker used", interactive=False)
    # The outputs list must match the 5-tuple returned by run_backtester.
    run_btn.click(fn=run_backtester,
                  inputs=[tickers, uploaded, period, interval, short_w, long_w, stop_loss_pct, take_profit_pct, initial_capital, rf_pct],
                  outputs=[perf_out, eq_plot, trades_html, trades_file, ticker_out])

if __name__ == "__main__":
    demo.launch()