Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import gradio as gr | |
| from datetime import datetime | |
| import yfinance as yf | |
# Factor used to scale daily mean returns and daily covariances to annual figures.
TRADING_DAYS = 252
def fetch_prices(tickers, index_symbol, start, end):
    """Download adjusted closing prices for the assets and an optional benchmark.

    Args:
        tickers: List of asset symbols passed to ``yf.download``.
        index_symbol: Benchmark symbol; a falsy value skips the benchmark.
        start: Start date forwarded to ``yf.download``.
        end: End date forwarded to ``yf.download``.

    Returns:
        A ``(prices, idx)`` tuple. ``prices`` is a DataFrame with one column
        per asset that has at least one price, restricted to the dates on
        which every kept asset (and the benchmark, when it has data) is
        priced. ``idx`` is the benchmark Series renamed to ``"INDEX"`` on the
        same dates, or ``None`` when no benchmark data is available.

    Raises:
        gr.Error: If the download has no ``"Close"`` data.
    """
    # De-duplicate while keeping order so a repeated symbol (or a benchmark
    # that is also listed as an asset) does not produce duplicated columns.
    unique_tickers = list(dict.fromkeys(tickers))
    symbols = list(dict.fromkeys(unique_tickers + ([index_symbol] if index_symbol else [])))

    data = yf.download(symbols, start=start, end=end, auto_adjust=True, progress=False)
    if data is None or "Close" not in data:
        raise gr.Error("Não foi possível obter dados de fechamento. Verifique os códigos e o período.")

    close = data["Close"]
    if isinstance(close, pd.Series):
        # A flat (single-symbol) result yields a Series; normalize to a frame.
        close = close.to_frame(name=symbols[0])

    assets = [t for t in unique_tickers if t in close.columns]
    # Drop columns that are entirely NaN first: otherwise the row-wise dropna
    # below would discard every date because of a single symbol without data.
    prices = close[assets].dropna(axis=1, how="all").dropna(axis=0, how="all")
    # Keep only dates on which every remaining asset has a price.
    prices = prices.dropna()

    idx = None
    if index_symbol and index_symbol in close.columns:
        idx = close[index_symbol].dropna().rename("INDEX")
        if idx.empty:
            # A benchmark without data must not wipe out the asset history.
            idx = None

    if idx is not None:
        common = prices.index.intersection(idx.index)
        prices = prices.loc[common]
        idx = idx.loc[common]
    return prices, idx
def simulate_portfolios(asset_returns, risk_free_rate, n_portfolios, seed):
    """Sample random portfolios and compute annualized return, risk and Sharpe.

    Weights are drawn from a Dirichlet distribution with all concentration
    parameters equal to one, using a generator seeded with ``seed``.

    Args:
        asset_returns: DataFrame of daily returns, one column per asset.
        risk_free_rate: Annual risk-free rate subtracted in the Sharpe ratio.
        n_portfolios: Number of weight vectors to draw.
        seed: Seed for ``np.random.default_rng``.

    Returns:
        ``(df, weights_df, i_max, mu_annual, cov_annual)`` where ``df`` holds
        the columns ``ret_annual``, ``vol_annual`` and ``sharpe`` per
        portfolio, ``weights_df`` holds the weights (columns follow
        ``asset_returns``), ``i_max`` is the row position of the highest
        non-NaN Sharpe ratio, and the last two items are the annualized mean
        vector and covariance matrix.
    """
    # Annualize daily statistics by the trading-day factor.
    annual_mean = asset_returns.mean().values * TRADING_DAYS
    annual_cov = asset_returns.cov().values * TRADING_DAYS

    generator = np.random.default_rng(seed)
    asset_count = asset_returns.shape[1]
    # Shape: (n_portfolios, asset_count).
    weights = generator.dirichlet(np.ones(asset_count), size=n_portfolios)

    expected = weights @ annual_mean
    # Row-wise quadratic form w' Σ w, then square root for the volatility.
    risk = np.sqrt(np.einsum('ij,jk,ik->i', weights, annual_cov, weights))
    # Zero volatility would divide by zero; mark those ratios as NaN instead.
    usable_risk = np.where(risk == 0, np.nan, risk)
    sharpe = (expected - risk_free_rate) / usable_risk

    stats = pd.DataFrame({"ret_annual": expected, "vol_annual": risk, "sharpe": sharpe})
    weights_frame = pd.DataFrame(weights, columns=asset_returns.columns)
    best = int(np.nanargmax(sharpe))
    return stats, weights_frame, best, annual_mean, annual_cov
def frontier_approx(portfolios, bins=60):
    """Approximate the efficient frontier from simulated portfolios.

    The volatility range is cut by ``bins`` evenly spaced edges (``bins - 1``
    intervals) and, for each non-empty interval, the portfolio with the
    highest return is kept. Every interval is closed on the left and open on
    the right except the last one, which is closed on both sides so that the
    highest-volatility portfolio is not excluded.

    Args:
        portfolios: DataFrame with ``vol_annual`` and ``ret_annual`` columns.
        bins: Number of edges placed between the minimum and maximum
            volatility.

    Returns:
        ``(xv, yv)``: arrays with the volatility and return of the selected
        portfolios, ordered by increasing volatility interval. Both are empty
        when there is no finite data point. When all volatilities are equal,
        or fewer than two edges are requested, the single highest-return
        portfolio is returned.
    """
    vol = np.asarray(portfolios["vol_annual"].values, dtype=float)
    ret = np.asarray(portfolios["ret_annual"].values, dtype=float)

    # Ignore non-finite points so they cannot win the per-interval argmax.
    finite = np.isfinite(vol) & np.isfinite(ret)
    vol, ret = vol[finite], ret[finite]
    if vol.size == 0:
        return np.array([]), np.array([])

    lowest, highest = vol.min(), vol.max()
    if lowest == highest or bins < 2:
        # Degenerate range: no interval can be formed, keep the best point.
        best = int(np.argmax(ret))
        return np.array([vol[best]]), np.array([ret[best]])

    edges = np.linspace(lowest, highest, bins)
    last_interval = len(edges) - 2
    xv, yv = [], []
    for i, (left, right) in enumerate(zip(edges[:-1], edges[1:])):
        if i == last_interval:
            inside = (vol >= left) & (vol <= right)
        else:
            inside = (vol >= left) & (vol < right)
        if not inside.any():
            continue
        best = np.argmax(ret[inside])
        xv.append(vol[inside][best])
        yv.append(ret[inside][best])
    return np.array(xv), np.array(yv)
def make_price_fig(prices):
    """Plot the price history of every asset on a single set of axes.

    Args:
        prices: DataFrame of prices, one column per asset.

    Returns:
        The Matplotlib figure containing the plot.
    """
    fig, ax = plt.subplots()
    prices.plot(ax=ax)
    # Labels are set after plotting so they override anything pandas assigns.
    ax.set_title("Preços ajustados — Ativos")
    ax.set_xlabel("Data")
    ax.set_ylabel("Preço")
    return fig
def make_rr_fig(portfolios, xv, yv, i_max):
    """Draw the risk/return cloud, the approximate frontier and the best portfolio.

    Args:
        portfolios: DataFrame with ``vol_annual`` and ``ret_annual`` columns.
        xv: Volatilities of the frontier points (may be empty).
        yv: Returns of the frontier points, aligned with ``xv``.
        i_max: Row position in ``portfolios`` of the portfolio to highlight.

    Returns:
        The Matplotlib figure containing the plot.
    """
    fig, axes = plt.subplots()

    # Cloud of all simulated portfolios.
    axes.scatter(portfolios["vol_annual"], portfolios["ret_annual"], s=4, alpha=0.35)

    # Frontier line is only drawn when there are points for it.
    if len(xv) > 0:
        axes.plot(xv, yv, linewidth=2, marker="o")

    # Highlight the selected portfolio with a large "X" marker.
    highlighted = portfolios.iloc[i_max]
    axes.scatter([highlighted["vol_annual"]], [highlighted["ret_annual"]], marker="X", s=200)

    axes.set_title("Risco × Retorno (com fronteira aproximada)")
    axes.set_xlabel("Risco (desvio padrão anual)")
    axes.set_ylabel("Retorno esperado anual")
    return fig
def make_wealth_fig(asset_returns, weights, initial_capital):
    """Plot the wealth path of a fixed-weight portfolio.

    The daily portfolio return is the weighted sum of the asset returns and
    wealth is compounded as ``initial_capital * cumprod(1 + r)``.

    Args:
        asset_returns: DataFrame of daily asset returns.
        weights: Array of portfolio weights aligned with the columns.
        initial_capital: Starting wealth.

    Returns:
        ``(fig, port_daily, final_wealth)``: the figure, the array of daily
        portfolio returns, and the last value of the wealth series.
    """
    daily = asset_returns.values @ weights
    growth = (1.0 + daily).cumprod() * initial_capital
    wealth = pd.Series(growth, index=asset_returns.index, name="Patrimônio")

    fig, ax = plt.subplots()
    wealth.plot(ax=ax)
    ax.set_title("Evolução do patrimônio — Carteira ótima (Sharpe)")
    ax.set_xlabel("Data")
    ax.set_ylabel("Patrimônio (R$)")
    return fig, daily, wealth.iloc[-1]
def _var_rows(sample, levels, capital):
    """Build ``[level, VaR fraction, VaR money]`` rows from quantiles of ``sample``."""
    rows = []
    for level in levels:
        # VaR is the loss at the lower tail, hence the sign flip.
        loss = -np.percentile(sample, (1 - level) * 100)
        rows.append([level, loss, loss * capital])
    return rows


def var_tables(port_daily, initial_capital, seed=None, n_sims=100000):
    """Compute one-day Value-at-Risk tables at 95% and 99% confidence.

    Two estimates are produced: a historical one from the empirical
    quantiles of ``port_daily`` and a simulated one from the quantiles of
    normal draws whose mean and standard deviation are estimated from
    ``port_daily``.

    Args:
        port_daily: One-dimensional array-like of daily portfolio returns.
        initial_capital: Capital used to convert fractional VaR into money.
        seed: Seed for the normal draws. With the default ``None`` the
            generator is seeded from fresh entropy and results vary per call.
        n_sims: Number of normal draws.

    Returns:
        ``(var_df, mc_df)``: DataFrames with the columns
        ``Confiança``/``VaR_%_1d``/``VaR_R$_1d`` and
        ``Confiança``/``VaR_%_1d_MC``/``VaR_R$_1d_MC`` respectively.

    Raises:
        ValueError: If ``port_daily`` is empty.
    """
    returns = np.asarray(port_daily, dtype=float)
    if returns.size == 0:
        raise ValueError("port_daily must contain at least one return")

    alpha_levels = [0.95, 0.99]
    var_df = pd.DataFrame(
        _var_rows(returns, alpha_levels, initial_capital),
        columns=["Confiança", "VaR_%_1d", "VaR_R$_1d"],
    )

    mu_hat = np.mean(returns)
    sigma_hat = np.std(returns, ddof=1) if returns.size > 1 else 0.0
    if sigma_hat > 0:
        sim = np.random.default_rng(seed).normal(mu_hat, sigma_hat, size=n_sims)
    else:
        # No dispersion to simulate: the distribution collapses to the mean.
        sim = np.array([mu_hat])
    mc_df = pd.DataFrame(
        _var_rows(sim, alpha_levels, initial_capital),
        columns=["Confiança", "VaR_%_1d_MC", "VaR_R$_1d_MC"],
    )
    return var_df, mc_df


def run_app(tickers_str, index_symbol, start, end, risk_free_rate, n_portfolios, initial_capital, seed):
    """Run the full simulation pipeline and build every output of the app.

    Args:
        tickers_str: Comma-separated asset symbols.
        index_symbol: Benchmark symbol (may be empty).
        start: Start date passed to ``fetch_prices``.
        end: End date passed to ``fetch_prices``.
        risk_free_rate: Annual risk-free rate for the Sharpe ratio.
        n_portfolios: Number of random portfolios to simulate.
        initial_capital: Starting capital for the wealth path and VaR.
        seed: Seed for the portfolio simulation and the simulated VaR.

    Returns:
        ``(info_md, price_fig, rr_fig, wealth_fig, opt_table, metrics,
        var_df, mc_df)``.

    Raises:
        gr.Error: If fewer than two distinct tickers are given, fewer than
            two assets have data, or the price history is too short to
            compute returns.
    """
    # Split, trim and de-duplicate (order preserved) the requested symbols.
    tickers = list(dict.fromkeys(t.strip() for t in tickers_str.split(",") if t.strip()))
    if len(tickers) < 2:
        raise gr.Error("Informe ao menos dois tickers separados por vírgula.")
    index_symbol = (index_symbol or "").strip()

    prices, _ = fetch_prices(tickers, index_symbol, start, end)
    used = list(prices.columns)
    if len(used) < 2:
        raise gr.Error("Dados insuficientes para os tickers informados (após limpeza). Ajuste os códigos/período.")

    # Simple returns: they are combined across assets with portfolio weights
    # and compounded as (1 + r) downstream, neither of which holds for
    # log returns.
    asset_returns = (prices / prices.shift(1) - 1.0).dropna()
    if len(asset_returns) < 2:
        # A covariance matrix needs at least two observations.
        raise gr.Error("Dados insuficientes para calcular retornos no período informado. Ajuste os códigos/período.")

    price_fig = make_price_fig(prices)

    portfolios, weights_df, i_max, _, _ = simulate_portfolios(asset_returns, risk_free_rate, n_portfolios, seed)
    xv, yv = frontier_approx(portfolios, bins=60)
    rr_fig = make_rr_fig(portfolios, xv, yv, i_max)

    opt_weights = weights_df.iloc[i_max].copy()
    opt_table = pd.DataFrame({"Peso": opt_weights.values}, index=weights_df.columns)
    opt_table.loc["TOTAL"] = opt_table["Peso"].sum()

    opt = portfolios.iloc[i_max]
    metrics = pd.DataFrame({
        "Valor": [opt["ret_annual"], opt["vol_annual"], opt["sharpe"], risk_free_rate]
    }, index=["Retorno anual", "Risco anual", "Sharpe", "Taxa livre de risco (a.a.)"])

    wealth_fig, port_daily, final_wealth = make_wealth_fig(asset_returns, opt_weights.values, initial_capital)
    # Reuse the user's seed so the simulated VaR is reproducible as well.
    var_df, mc_df = var_tables(port_daily, initial_capital, seed=seed)

    # Two trailing spaces before each newline force a Markdown line break.
    info_md = (
        f"**Tickers usados:** {', '.join(used)}  \n"
        f"**Período efetivo:** {asset_returns.index.min().date()} → {asset_returns.index.max().date()}  \n"
        f"**Portfólios simulados:** {n_portfolios:,}  \n"
        f"**Valor final estimado (carteira ótima):** R$ {final_wealth:,.2f}"
    )
    return info_md, price_fig, rr_fig, wealth_fig, opt_table, metrics, var_df, mc_df
# Default values pre-filled in the input form below.
DEFAULT_TICKERS = "BBAS3.SA, BBSE3.SA, BSLI4.SA, TELB4.SA, CEBR6.SA"
DEFAULT_INDEX = "^BVSP"
DEFAULT_START = "2019-01-01"

# Gradio interface: inputs, a run button, and the outputs filled by run_app.
# NOTE(review): the nesting of components inside each gr.Row was reconstructed
# from context (the indentation was not available) — confirm the layout.
with gr.Blocks(title="Simulador de Carteiras — Brasília (B3)") as demo:
    gr.Markdown("# Simulador de Carteiras — Empresas com sede em Brasília (B3)")
    gr.Markdown(
        "Monte **carteiras por Monte Carlo**, visualize o **trade-off risco × retorno**, "
        "a **fronteira eficiente (aprox.)**, a **carteira de maior Sharpe** e a **evolução do patrimônio**. "
        "Use os tickers padrão (empresas com sede em Brasília) ou informe os seus."
    )
    # --- Inputs ---
    with gr.Row():
        tickers_in = gr.Textbox(label="Tickers (separados por vírgula)",
                                value=DEFAULT_TICKERS)
        index_in = gr.Textbox(label="Índice (benchmark)", value=DEFAULT_INDEX)
    with gr.Row():
        start_in = gr.Textbox(label="Data inicial (YYYY-MM-DD)", value=DEFAULT_START)
        end_in = gr.Textbox(label="Data final (YYYY-MM-DD ou vazio para hoje)", value="")
    with gr.Row():
        rf_in = gr.Number(label="Taxa livre de risco (a.a.)", value=0.0)
        n_in = gr.Slider(5000, 100000, value=50000, step=1000, label="Número de portfólios (Monte Carlo)")
    with gr.Row():
        capital_in = gr.Number(label="Capital inicial (R$)", value=35000.0)
        seed_in = gr.Number(label="Semente aleatória", value=42)
    run_btn = gr.Button("Executar Simulação", variant="primary")
    # --- Outputs (same order as the tuple returned by run_app) ---
    info_out = gr.Markdown()
    price_plot = gr.Plot()
    rr_plot = gr.Plot()
    wealth_plot = gr.Plot()
    with gr.Row():
        weights_out = gr.Dataframe(label="Pesos — Carteira de maior Sharpe", interactive=False)
        metrics_out = gr.Dataframe(label="Métricas — Carteira de maior Sharpe", interactive=False)
    with gr.Row():
        var_out = gr.Dataframe(label="VaR histórico (1 dia)", interactive=False)
        mcvar_out = gr.Dataframe(label="VaR por simulação (1 dia, normal)", interactive=False)

    def _run(tickers, index_symbol, start, end, rf, nport, capital, seed):
        # Adapter between the widgets and run_app: a missing or blank end date
        # becomes None, and the numeric inputs are cast to float/int.
        end = None if (end is None or str(end).strip() == "") else str(end).strip()
        return run_app(tickers, index_symbol, str(start).strip(), end, float(rf), int(nport), float(capital), int(seed))

    # Wire the button: inputs are passed positionally to _run, and its return
    # values are assigned to the outputs in the listed order.
    run_btn.click(
        _run,
        inputs=[tickers_in, index_in, start_in, end_in, rf_in, n_in, capital_in, seed_in],
        outputs=[info_out, price_plot, rr_plot, wealth_plot, weights_out, metrics_out, var_out, mcvar_out]
    )

# Start the app only when the file is executed as a script.
if __name__ == "__main__":
    demo.launch()