# AEDI_Tarefa2 / app.py
# Uploaded by gilsonab — commit 9e23296 ("Upload 3 files", verified)
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import gradio as gr
from datetime import datetime
import yfinance as yf
# Number of trading sessions per year; daily mean returns and covariances are
# multiplied by this factor to annualize them.
TRADING_DAYS = 252
def fetch_prices(tickers, index_symbol, start, end):
    """Download closing prices for the assets and, optionally, a benchmark index.

    Args:
        tickers: List of asset symbols to download.
        index_symbol: Benchmark symbol; a falsy value disables the benchmark.
        start: First date of the requested window (passed to ``yf.download``).
        end: Last date of the requested window, or ``None``.

    Returns:
        ``(prices, idx)`` where ``prices`` is a DataFrame holding only rows in
        which every available asset has a price and ``idx`` is the benchmark
        Series renamed to ``"INDEX"`` (or ``None`` when no benchmark data was
        returned). When a benchmark exists, both objects are restricted to the
        dates they have in common.

    Raises:
        gr.Error: If the downloaded frame has no ``"Close"`` data.
    """
    symbols = list(tickers)
    if index_symbol:
        symbols.append(index_symbol)

    raw = yf.download(symbols, start=start, end=end, auto_adjust=True, progress=False)
    if "Close" not in raw:
        raise gr.Error("Não foi possível obter dados de fechamento. Verifique os códigos e o período.")

    closes = raw["Close"]
    # Keep only the requested assets that actually came back from the download.
    available = [symbol for symbol in tickers if symbol in closes.columns]
    asset_prices = closes[available].dropna(how="all")

    has_benchmark = bool(index_symbol) and index_symbol in closes.columns
    if not has_benchmark:
        return asset_prices.dropna(), None

    benchmark = closes[index_symbol].dropna().rename("INDEX")
    # Align assets and benchmark on the dates where both are fully populated.
    shared_dates = asset_prices.dropna().index.intersection(benchmark.index)
    return asset_prices.loc[shared_dates].dropna(), benchmark.loc[shared_dates]
def simulate_portfolios(asset_returns, risk_free_rate, n_portfolios, seed):
    """Sample random long-only portfolios and score each by its Sharpe ratio.

    Args:
        asset_returns: DataFrame of daily returns, one column per asset.
        risk_free_rate: Annual risk-free rate subtracted in the Sharpe ratio.
        n_portfolios: Number of random weight vectors to draw.
        seed: Seed for ``np.random.default_rng``.

    Returns:
        A tuple ``(stats, weights_df, best, mu_annual, cov_annual)``:
        ``stats`` has columns ``ret_annual``, ``vol_annual`` and ``sharpe``;
        ``weights_df`` holds the weights (one row per portfolio);
        ``best`` is the row position of the highest non-NaN Sharpe ratio;
        ``mu_annual`` / ``cov_annual`` are the annualized mean vector and
        covariance matrix.
    """
    generator = np.random.default_rng(seed)
    n_assets = asset_returns.shape[1]

    # Annualize the daily sample moments.
    mu_annual = asset_returns.mean().values * TRADING_DAYS
    cov_annual = asset_returns.cov().values * TRADING_DAYS

    # Dirichlet(1, ..., 1) draws are non-negative and sum to one.
    weights = generator.dirichlet(np.ones(n_assets), size=n_portfolios)

    expected = weights @ mu_annual
    # Row-wise quadratic form w' * Sigma * w for every portfolio at once.
    risk = np.sqrt(np.einsum('ij,jk,ik->i', weights, cov_annual, weights))
    # A zero-risk portfolio yields NaN instead of a division by zero.
    safe_risk = np.where(risk == 0, np.nan, risk)
    sharpe = (expected - risk_free_rate) / safe_risk

    stats = pd.DataFrame({"ret_annual": expected, "vol_annual": risk, "sharpe": sharpe})
    weights_df = pd.DataFrame(weights, columns=asset_returns.columns)
    best = int(np.nanargmax(sharpe))
    return stats, weights_df, best, mu_annual, cov_annual
def frontier_approx(portfolios, bins=60):
    """Approximate the efficient frontier as the best return per volatility bin.

    The volatility range is cut by ``bins`` equally spaced edges (that is,
    ``bins - 1`` intervals). In every non-empty interval the portfolio with
    the highest return is kept.

    Args:
        portfolios: DataFrame with ``vol_annual`` and ``ret_annual`` columns.
        bins: Number of edges used to partition the volatility range.

    Returns:
        ``(xv, yv)``: arrays with the volatility and return of the selected
        portfolios, ordered by increasing volatility. Both are empty when
        ``portfolios`` has no rows.
    """
    vols = portfolios["vol_annual"].to_numpy()
    rets = portfolios["ret_annual"].to_numpy()
    if vols.size == 0:
        return np.array([]), np.array([])

    edges = np.linspace(np.nanmin(vols), np.nanmax(vols), bins)
    last_bin = len(edges) - 2
    frontier_vols, frontier_rets = [], []
    for k, (low, high) in enumerate(zip(edges[:-1], edges[1:])):
        # Intervals are [low, high), except the last one, which is closed on
        # the right. Otherwise the maximum-volatility portfolio would fall in
        # no interval at all (and nothing would be selected when every
        # portfolio has the same volatility).
        below_high = (vols <= high) if k == last_bin else (vols < high)
        in_bin = (vols >= low) & below_high
        if not in_bin.any():
            continue
        best = np.argmax(rets[in_bin])
        frontier_vols.append(vols[in_bin][best])
        frontier_rets.append(rets[in_bin][best])
    return np.array(frontier_vols), np.array(frontier_rets)
def make_price_fig(prices):
    """Plot the price history of every asset on a single chart.

    Args:
        prices: DataFrame of prices, one column per asset.

    Returns:
        The matplotlib ``Figure`` containing the chart.
    """
    # Function-scope import keeps this block self-contained.
    from matplotlib.figure import Figure

    # A standalone Figure is not registered with pyplot, so it is garbage
    # collected once the caller drops it; figures from plt.figure() stay alive
    # until explicitly closed. It also avoids pyplot's process-wide
    # "current axes", which is unsafe to share between concurrent requests.
    fig = Figure()
    ax = fig.subplots()
    prices.plot(ax=ax)
    ax.set_title("Preços ajustados — Ativos")
    ax.set_xlabel("Data")
    ax.set_ylabel("Preço")
    return fig
def make_rr_fig(portfolios, xv, yv, i_max):
    """Draw the risk/return cloud, the approximate frontier and the best portfolio.

    Args:
        portfolios: DataFrame with ``vol_annual`` and ``ret_annual`` columns.
        xv: Volatilities of the frontier points (may be empty).
        yv: Returns of the frontier points (same length as ``xv``).
        i_max: Row position in ``portfolios`` of the portfolio to highlight.

    Returns:
        The matplotlib ``Figure`` containing the chart.
    """
    # Function-scope import keeps this block self-contained.
    from matplotlib.figure import Figure

    # A standalone Figure is not registered with pyplot, so it is garbage
    # collected once the caller drops it; figures from plt.figure() stay alive
    # until explicitly closed. It also avoids pyplot's process-wide
    # "current axes", which is unsafe to share between concurrent requests.
    fig = Figure()
    ax = fig.subplots()

    ax.scatter(portfolios["vol_annual"], portfolios["ret_annual"], s=4, alpha=0.35)
    if len(xv) > 0:
        ax.plot(xv, yv, linewidth=2, marker="o")

    # Highlight the selected portfolio with a large "X" marker.
    best = portfolios.iloc[i_max]
    ax.scatter([best["vol_annual"]], [best["ret_annual"]], marker="X", s=200)

    ax.set_title("Risco × Retorno (com fronteira aproximada)")
    ax.set_xlabel("Risco (desvio padrão anual)")
    ax.set_ylabel("Retorno esperado anual")
    return fig
def make_wealth_fig(asset_returns, weights, initial_capital):
    """Compound the portfolio's daily returns into a wealth curve and plot it.

    Args:
        asset_returns: DataFrame of daily returns, one column per asset.
        weights: 1-D array of portfolio weights, ordered like the columns of
            ``asset_returns``.
        initial_capital: Starting capital that the curve is scaled by.

    Returns:
        ``(fig, port_daily, final_wealth)``: the matplotlib ``Figure``, the
        array of daily portfolio returns and the last value of the curve.
    """
    # Function-scope import keeps this block self-contained.
    from matplotlib.figure import Figure

    port_daily = asset_returns.values @ weights
    # Compounding as prod(1 + r) treats the inputs as simple (arithmetic)
    # returns.
    wealth = (1.0 + port_daily).cumprod() * initial_capital
    ser = pd.Series(wealth, index=asset_returns.index, name="Patrimônio")

    # A standalone Figure is not registered with pyplot, so it is garbage
    # collected once the caller drops it; figures from plt.figure() stay alive
    # until explicitly closed. It also avoids pyplot's process-wide
    # "current axes", which is unsafe to share between concurrent requests.
    fig = Figure()
    ax = fig.subplots()
    ser.plot(ax=ax)
    ax.set_title("Evolução do patrimônio — Carteira ótima (Sharpe)")
    ax.set_xlabel("Data")
    ax.set_ylabel("Patrimônio (R$)")
    return fig, port_daily, ser.iloc[-1]
def _var_rows(sample, alpha_levels, initial_capital):
    """Return ``[confidence, VaR fraction, VaR money]`` rows for ``sample``."""
    rows = []
    for level in alpha_levels:
        # VaR is the loss at the lower (1 - level) quantile, reported as a
        # positive number.
        loss_fraction = -np.percentile(sample, (1 - level) * 100)
        rows.append([level, loss_fraction, loss_fraction * initial_capital])
    return rows


def var_tables(port_daily, initial_capital, alpha_levels=(0.95, 0.99), n_sims=100000, seed=None):
    """Compute one-day Value at Risk historically and by normal simulation.

    Args:
        port_daily: 1-D sequence of daily portfolio returns.
        initial_capital: Capital used to convert VaR fractions to money.
        alpha_levels: Confidence levels to report.
        n_sims: Number of normal draws used by the simulated VaR.
        seed: Optional seed for the simulation; ``None`` draws fresh entropy,
            an integer makes the simulated table reproducible.

    Returns:
        ``(var_df, mc_df)``: the historical VaR table (columns ``Confiança``,
        ``VaR_%_1d``, ``VaR_R$_1d``) and the simulated VaR table (columns
        ``Confiança``, ``VaR_%_1d_MC``, ``VaR_R$_1d_MC``).

    Raises:
        ValueError: If ``port_daily`` is empty.
    """
    returns = np.asarray(port_daily, dtype=float)
    if returns.size == 0:
        raise ValueError("port_daily must contain at least one return")

    var_df = pd.DataFrame(
        _var_rows(returns, alpha_levels, initial_capital),
        columns=["Confiança", "VaR_%_1d", "VaR_R$_1d"],
    )

    mu_hat = returns.mean()
    sigma_hat = returns.std(ddof=1) if returns.size > 1 else 0.0
    if sigma_hat > 0:
        # A local generator keeps the draw independent of NumPy's global
        # random state and lets callers pin it with ``seed``.
        sim = np.random.default_rng(seed).normal(mu_hat, sigma_hat, size=n_sims)
    else:
        # No dispersion: the simulated distribution collapses to the mean.
        sim = np.array([mu_hat])

    mc_df = pd.DataFrame(
        _var_rows(sim, alpha_levels, initial_capital),
        columns=["Confiança", "VaR_%_1d_MC", "VaR_R$_1d_MC"],
    )
    return var_df, mc_df
def run_app(tickers_str, index_symbol, start, end, risk_free_rate, n_portfolios, initial_capital, seed):
    """Run the whole pipeline: download, simulate, pick the best Sharpe, report.

    Args:
        tickers_str: Comma-separated asset symbols.
        index_symbol: Benchmark symbol forwarded to ``fetch_prices``.
        start: Start date forwarded to ``fetch_prices``.
        end: End date forwarded to ``fetch_prices`` (``None`` allowed).
        risk_free_rate: Annual risk-free rate used in the Sharpe ratio.
        n_portfolios: Number of random portfolios to simulate.
        initial_capital: Starting capital for the wealth curve and VaR tables.
        seed: Seed for the portfolio simulation.

    Returns:
        ``(info_md, price_fig, rr_fig, wealth_fig, opt_table, metrics,
        var_df, mc_df)``: a Markdown summary, three figures and four tables.

    Raises:
        gr.Error: If fewer than two distinct tickers are given, or too little
            price data remains after cleaning.
    """
    # dict.fromkeys drops repeated symbols while keeping the typed order; a
    # repeated ticker would otherwise become a duplicated price column.
    tickers = list(dict.fromkeys(t.strip() for t in tickers_str.split(",") if t.strip()))
    if len(tickers) < 2:
        raise gr.Error("Informe ao menos dois tickers separados por vírgula.")

    prices, _benchmark = fetch_prices(tickers, index_symbol, start, end)
    used = list(prices.columns)
    if len(used) < 2:
        raise gr.Error("Dados insuficientes para os tickers informados (após limpeza). Ajuste os códigos/período.")

    # Simple returns are used because they aggregate linearly across assets
    # (portfolio return = weighted sum) and compound as prod(1 + r), which is
    # how the wealth curve and VaR below consume them.
    asset_returns = (prices / prices.shift(1) - 1.0).dropna()
    # A covariance matrix needs at least two observations; without them the
    # simulation would fail with an opaque error.
    if len(asset_returns) < 2:
        raise gr.Error("Dados insuficientes para os tickers informados (após limpeza). Ajuste os códigos/período.")

    price_fig = make_price_fig(prices)

    portfolios, weights_df, i_max, mu_ann, cov_ann = simulate_portfolios(
        asset_returns, risk_free_rate, n_portfolios, seed
    )
    xv, yv = frontier_approx(portfolios, bins=60)
    rr_fig = make_rr_fig(portfolios, xv, yv, i_max)

    opt_weights = weights_df.iloc[i_max].copy()
    opt = portfolios.iloc[i_max]

    # Row labels are stored in regular columns: gr.Dataframe renders the
    # DataFrame's columns and does not display its index.
    opt_table = pd.DataFrame({
        "Ativo": [str(name) for name in weights_df.columns] + ["TOTAL"],
        "Peso": list(opt_weights.values) + [opt_weights.sum()],
    })
    metrics = pd.DataFrame({
        "Métrica": ["Retorno anual", "Risco anual", "Sharpe", "Taxa livre de risco (a.a.)"],
        "Valor": [opt["ret_annual"], opt["vol_annual"], opt["sharpe"], risk_free_rate],
    })

    wealth_fig, port_daily, final_wealth = make_wealth_fig(asset_returns, opt_weights.values, initial_capital)
    var_df, mc_df = var_tables(port_daily, initial_capital)

    first_day = asset_returns.index.min().date()
    last_day = asset_returns.index.max().date()
    # Two trailing spaces before "\n" are Markdown's hard line break, so each
    # item is rendered on its own line.
    info_md = (
        f"**Tickers usados:** {', '.join(used)}  \n"
        f"**Período efetivo:** {first_day} → {last_day}  \n"
        f"**Portfólios simulados:** {n_portfolios:,}  \n"
        f"**Valor final estimado (carteira ótima):** R$ {final_wealth:,.2f}"
    )
    return info_md, price_fig, rr_fig, wealth_fig, opt_table, metrics, var_df, mc_df
# Values pre-filled in the form: the asset tickers (described in the UI text
# as companies headquartered in Brasília), the benchmark index symbol and the
# start date of the price history.
DEFAULT_TICKERS = "BBAS3.SA, BBSE3.SA, BSLI4.SA, TELB4.SA, CEBR6.SA"
DEFAULT_INDEX = "^BVSP"
DEFAULT_START = "2019-01-01"
# Gradio UI: input form, output widgets and the click handler wiring.
with gr.Blocks(title="Simulador de Carteiras — Brasília (B3)") as demo:
    gr.Markdown("# Simulador de Carteiras — Empresas com sede em Brasília (B3)")
    gr.Markdown(
        "Monte **carteiras por Monte Carlo**, visualize o **trade-off risco × retorno**, "
        "a **fronteira eficiente (aprox.)**, a **carteira de maior Sharpe** e a **evolução do patrimônio**. "
        "Use os tickers padrão (empresas com sede em Brasília) ou informe os seus."
    )

    # --- Inputs ---------------------------------------------------------
    with gr.Row():
        tickers_in = gr.Textbox(label="Tickers (separados por vírgula)",
                                value=DEFAULT_TICKERS)
        index_in = gr.Textbox(label="Índice (benchmark)", value=DEFAULT_INDEX)
    with gr.Row():
        start_in = gr.Textbox(label="Data inicial (YYYY-MM-DD)", value=DEFAULT_START)
        end_in = gr.Textbox(label="Data final (YYYY-MM-DD ou vazio para hoje)", value="")
    with gr.Row():
        rf_in = gr.Number(label="Taxa livre de risco (a.a.)", value=0.0)
        n_in = gr.Slider(5000, 100000, value=50000, step=1000, label="Número de portfólios (Monte Carlo)")
    with gr.Row():
        capital_in = gr.Number(label="Capital inicial (R$)", value=35000.0)
        seed_in = gr.Number(label="Semente aleatória", value=42)
    run_btn = gr.Button("Executar Simulação", variant="primary")

    # --- Outputs --------------------------------------------------------
    info_out = gr.Markdown()
    price_plot = gr.Plot()
    rr_plot = gr.Plot()
    wealth_plot = gr.Plot()
    with gr.Row():
        weights_out = gr.Dataframe(label="Pesos — Carteira de maior Sharpe", interactive=False)
        metrics_out = gr.Dataframe(label="Métricas — Carteira de maior Sharpe", interactive=False)
    with gr.Row():
        var_out = gr.Dataframe(label="VaR histórico (1 dia)", interactive=False)
        mcvar_out = gr.Dataframe(label="VaR por simulação (1 dia, normal)", interactive=False)

    def _run(tickers, index_symbol, start, end, rf, nport, capital, seed):
        """Validate and normalize the raw widget values, then call ``run_app``.

        Raises:
            gr.Error: If a numeric field is empty, the seed is negative or the
                start date is blank.
        """
        # Guard against None (e.g. a cleared numeric field) so the user gets a
        # readable message instead of a TypeError from float()/int().
        numeric_fields = {
            "Taxa livre de risco (a.a.)": rf,
            "Número de portfólios (Monte Carlo)": nport,
            "Capital inicial (R$)": capital,
            "Semente aleatória": seed,
        }
        missing = [label for label, value in numeric_fields.items() if value is None]
        if missing:
            raise gr.Error("Preencha os campos numéricos: " + ", ".join(missing))
        # The seed ends up in np.random.default_rng, which rejects negatives.
        if int(seed) < 0:
            raise gr.Error("A semente aleatória deve ser um inteiro não negativo.")

        start = "" if start is None else str(start).strip()
        if not start:
            raise gr.Error("Informe a data inicial no formato YYYY-MM-DD.")
        # A blank end date means "no upper bound".
        end = None if (end is None or str(end).strip() == "") else str(end).strip()
        # Surrounding whitespace would stop the symbol from matching the
        # downloaded column names.
        index_symbol = "" if index_symbol is None else str(index_symbol).strip()
        tickers = "" if tickers is None else str(tickers)

        return run_app(tickers, index_symbol, start, end, float(rf), int(nport), float(capital), int(seed))

    run_btn.click(
        _run,
        inputs=[tickers_in, index_in, start_in, end_in, rf_in, n_in, capital_in, seed_in],
        outputs=[info_out, price_plot, rr_plot, wealth_plot, weights_out, metrics_out, var_out, mcvar_out]
    )
# Launch the Gradio app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()