"""Portfolio simulator: data loading, Monte Carlo sampling, efficient frontier and plots."""

import io
import json
import time
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import gradio as gr
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
# SciPy is used for the efficient frontier
from scipy.optimize import minimize

try:
    import yfinance as yf
    YF_AVAILABLE = True
except Exception:
    # Deliberately broad: the app must still start (CSV-only mode) when
    # yfinance is missing or fails while importing.
    YF_AVAILABLE = False

# Number of return periods per year for each supported reference frequency:
# 'D' daily (~252), 'W' weekly (~52), 'M' monthly (~12).
_PERIODS_PER_YEAR = {"D": 252, "W": 52, "M": 12}


# =========================
# Data utilities
# =========================
def _ensure_datetime_index(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy-safe version of *df* indexed by date and sorted by index.

    If a ``"Date"`` column exists it is parsed and becomes the index.
    Otherwise an existing ``DatetimeIndex`` is only sorted, and a non-numeric
    index is converted on a best-effort basis.  A numeric index (for example
    the default ``RangeIndex``) is left untouched, because converting integers
    would produce meaningless epoch timestamps.  The caller's frame is never
    modified.
    """
    if "Date" in df.columns:
        parsed = df.assign(Date=pd.to_datetime(df["Date"]))
        return parsed.set_index("Date").sort_index()

    if isinstance(df.index, pd.DatetimeIndex):
        return df.sort_index()

    if pd.api.types.is_numeric_dtype(df.index):
        return df

    try:
        converted = pd.to_datetime(df.index)
    except (ValueError, TypeError, OverflowError):
        # Best effort only: keep the original index when it is not date-like.
        return df
    result = df.copy()
    result.index = converted
    return result.sort_index()


def _pct_returns(prices: pd.DataFrame, log: bool = False) -> pd.DataFrame:
    """Compute per-period returns from a price table.

    Args:
        prices: one column per asset, one row per period.
        log: when true, use log returns ``ln(p_t / p_{t-1})``; otherwise
            simple percentage changes.

    Returns:
        The returns frame with rows that are entirely NaN removed.
    """
    if log:
        rets = np.log(prices / prices.shift(1))
    else:
        rets = prices.pct_change()
    return rets.dropna(how="all")


def _summary_stats(rets: pd.DataFrame, freq: str = "D") -> pd.DataFrame:
    """Annualised mean, volatility and mean/vol ratio per asset.

    Args:
        rets: per-period returns, one column per asset.
        freq: 'D' daily (~252), 'W' weekly (~52), 'M' monthly (~12).

    Returns:
        DataFrame with columns ``mu_ann``, ``vol_ann`` and ``sharpe``
        (no risk-free rate is subtracted), sorted by ``sharpe`` descending.

    Raises:
        KeyError: if *freq* is not one of the supported codes.
    """
    ann = _PERIODS_PER_YEAR[freq]
    mu = rets.mean() * ann
    cov = rets.cov() * ann
    vol = np.sqrt(np.diag(cov))
    sharpe = mu / vol
    stats = pd.DataFrame({"mu_ann": mu, "vol_ann": vol, "sharpe": sharpe})
    return stats.sort_values("sharpe", ascending=False)


def _fetch_yf(tickers: List[str], start: str, end: str, interval: str) -> pd.DataFrame:
    """Download adjusted close prices from Yahoo Finance.

    Returns:
        DataFrame of close prices with upper-cased column labels and
        all-NaN rows removed.

    Raises:
        RuntimeError: if yfinance could not be imported.
    """
    if not YF_AVAILABLE:
        raise RuntimeError("yfinance não está disponível neste Space. Use CSV de preços.")
    data = yf.download(
        tickers,
        start=start,
        end=end,
        interval=interval,
        auto_adjust=True,
        progress=False,
    )["Close"]
    if isinstance(data, pd.Series):
        data = data.to_frame()
    data = data.dropna(how="all")
    # str() guards against non-string column labels.
    data.columns = [str(c).upper() for c in data.columns]
    return data


# =========================
# Monte Carlo and optimisation
# =========================
def simulate_portfolios(
    rets: pd.DataFrame,
    n_sims: int = 10_000,
    rf: float = 0.0,
    shorting: bool = False,
    w_min: float = 0.0,
    w_max: float = 1.0,
    seed: Optional[int] = 42,
    freq: str = "D",
):
    """Sample random portfolios and compute their annualised risk/return.

    Args:
        rets: per-period returns, one column per asset.
        n_sims: number of random draws (coerced to ``int``).
        rf: annual risk-free rate used in the Sharpe ratio.
        shorting: when true, weights may be negative and are bounded to
            [-1, 1] (the same bounds ``efficient_frontier`` uses); when
            false, weights are bounded to [w_min, w_max].
        w_min: lower weight bound for the long-only case.
        w_max: upper weight bound for the long-only case.
        seed: RNG seed; ``None`` for non-deterministic draws.  Coerced to
            ``int`` because UI number widgets may deliver floats, which
            ``numpy.random.default_rng`` rejects.
        freq: annualisation code, one of 'D', 'W', 'M'.

    Returns:
        ``(df, names)`` where ``df`` has columns ``ret_ann``, ``vol_ann``,
        ``sharpe`` and ``weights`` (dict asset -> weight), sorted by Sharpe
        descending, and ``names`` is the list of asset names.  Draws whose
        weights cannot be normalised to sum to 1 within the bounds, or whose
        Sharpe ratio is undefined, are discarded, so ``df`` may hold fewer
        than ``n_sims`` rows.
    """
    n_sims = int(n_sims)
    rng = np.random.default_rng(None if seed is None else int(seed))
    ann = _PERIODS_PER_YEAR[freq]
    mu = rets.mean().values * ann
    cov = rets.cov().values * ann
    names = list(rets.columns)
    n = len(names)
    lo, hi = (-1.0, 1.0) if shorting else (w_min, w_max)
    tol = 1e-9

    # One row per simulated portfolio; drawing the whole matrix at once
    # replaces a Python-level loop over n_sims.
    if shorting:
        raw = rng.normal(size=(n_sims, n))
        raw = raw / np.abs(raw).sum(axis=1, keepdims=True)
    else:
        raw = rng.random(size=(n_sims, n))
        raw = raw / raw.sum(axis=1, keepdims=True)

    clipped = np.clip(raw, lo, hi)
    totals = clipped.sum(axis=1)
    usable = np.abs(totals) > 1e-12  # cannot rescale a zero-sum draw to 1
    weights = clipped[usable] / totals[usable, None]

    # Rescaling after the clip can push weights back outside the bounds;
    # keep only the draws that still honour them.
    within = ((weights >= lo - tol) & (weights <= hi + tol)).all(axis=1)
    weights = weights[within]

    port_mu = weights @ mu
    port_var = np.einsum("ij,jk,ik->i", weights, cov, weights)
    # Tiny negative variances are floating-point noise.
    port_vol = np.sqrt(np.clip(port_var, 0.0, None))
    port_sh = np.full(port_vol.shape, np.nan)
    np.divide(port_mu - rf, port_vol, out=port_sh, where=port_vol > 0)

    df = pd.DataFrame(
        {
            "ret_ann": port_mu,
            "vol_ann": port_vol,
            "sharpe": port_sh,
            "weights": [dict(zip(names, w)) for w in weights],
        }
    )
    df = df.dropna(subset=["sharpe"])
    return df.sort_values("sharpe", ascending=False), names


def _min_variance(mu, cov, bounds):
    """Solve for the fully-invested minimum-variance portfolio (SLSQP).

    Returns the raw ``scipy.optimize.OptimizeResult``; weights are in ``.x``.
    """
    n = len(mu)
    cons = ({'type': 'eq', 'fun': lambda w: np.sum(w) - 1},)
    x0 = np.repeat(1 / n, n)  # start from equal weights
    return minimize(
        lambda w: w @ cov @ w,
        x0=x0,
        method="SLSQP",
        bounds=bounds,
        constraints=cons,
    )


def _max_sharpe(mu, cov, rf, bounds):
    """Solve for the fully-invested maximum-Sharpe portfolio (SLSQP).

    Returns the raw ``scipy.optimize.OptimizeResult``; weights are in ``.x``.
    """
    n = len(mu)
    cons = ({'type': 'eq', 'fun': lambda w: np.sum(w) - 1},)
    x0 = np.repeat(1 / n, n)  # start from equal weights

    def neg_sh(w):
        # Minimising the negative Sharpe ratio maximises the Sharpe ratio.
        ret = w @ mu
        vol = np.sqrt(w @ cov @ w)
        return -(ret - rf) / vol

    return minimize(neg_sh, x0=x0, method="SLSQP", bounds=bounds, constraints=cons)


def efficient_frontier(
    rets: pd.DataFrame,
    rf: float = 0.0,
    steps: int = 50,
    shorting: bool = False,
    w_min: float = 0.0,
    w_max: float = 1.0,
    freq="D",
):
    """Trace the mean-variance efficient frontier.

    For a grid of target returns, minimise portfolio variance subject to
    full investment and the target return; targets the solver cannot meet
    are skipped.

    Args:
        rets: per-period returns, one column per asset.
        rf: annual risk-free rate (used for the max-Sharpe portfolio).
        steps: number of target returns on the grid (coerced to ``int``).
        shorting: when true, weights are bounded to [-1, 1]; otherwise to
            [w_min, w_max].
        w_min: lower weight bound for the long-only case.
        w_max: upper weight bound for the long-only case.
        freq: annualisation code, one of 'D', 'W', 'M'.

    Returns:
        ``(ef_df, extras)`` where ``ef_df`` has columns ``ret_ann``,
        ``vol_ann`` and ``weights``, and ``extras`` holds the optimiser
        results ``min_variance`` and ``max_sharpe`` plus ``names``, ``mu``
        and ``cov``.
    """
    ann = _PERIODS_PER_YEAR[freq]
    mu = rets.mean().values * ann
    cov = rets.cov().values * ann
    names = list(rets.columns)
    n = len(names)
    bounds = [(-1.0, 1.0) if shorting else (w_min, w_max)] * n

    # Minimum-variance and maximum-Sharpe reference portfolios.
    res_minv = _min_variance(mu, cov, bounds)
    res_msr = _max_sharpe(mu, cov, rf, bounds)

    # Target returns run from the minimum-variance return up to the highest
    # reachable return.  Long-only portfolios can never exceed the best
    # single asset; with shorting the range is extended by 20 %, using the
    # absolute value so the extension is upward even for a negative maximum.
    ret_min = float(res_minv.x @ mu)
    top = float(np.max(mu))
    ret_max = top + 0.2 * abs(top) if shorting else top
    ret_max = max(ret_max, ret_min)
    # np.unique collapses the grid when ret_min == ret_max.
    target_rets = np.unique(np.linspace(ret_min, ret_max, int(steps)))

    ef_points = []
    x0 = np.repeat(1 / n, n)
    for tr in target_rets:
        cons = (
            {'type': 'eq', 'fun': lambda w: np.sum(w) - 1},
            # tr=tr binds the current target (avoids late-binding closures).
            {'type': 'eq', 'fun': lambda w, tr=tr: w @ mu - tr},
        )
        res = minimize(
            lambda w: w @ cov @ w,
            x0=x0,
            method="SLSQP",
            bounds=bounds,
            constraints=cons,
        )
        if res.success:
            w = res.x
            ret = w @ mu
            vol = np.sqrt(w @ cov @ w)
            ef_points.append((ret, vol, dict(zip(names, w))))

    ef_df = pd.DataFrame(ef_points, columns=["ret_ann", "vol_ann", "weights"])
    extras = dict(min_variance=res_minv, max_sharpe=res_msr, names=names, mu=mu, cov=cov)
    return ef_df, extras


# =========================
# Visualisations
# =========================
def plot_montecarlo(df_mc: pd.DataFrame, highlight_best=True):
    """Scatter the simulated portfolios (volatility vs return, colour = Sharpe).

    When *highlight_best* is true the first row of *df_mc* is marked with a
    star, so *df_mc* is expected to be sorted by Sharpe descending (as
    ``simulate_portfolios`` returns it).
    """
    fig = px.scatter(
        df_mc,
        x="vol_ann",
        y="ret_ann",
        color="sharpe",
        color_continuous_scale="Turbo",
        hover_data=["sharpe"],
        labels={"vol_ann":"Volatilidade (anual)", "ret_ann":"Retorno (anual)", "sharpe":"Sharpe"},
    )
    if highlight_best and not df_mc.empty:
        best = df_mc.iloc[0]
        fig.add_trace(go.Scatter(
            x=[best["vol_ann"]],
            y=[best["ret_ann"]],
            mode="markers+text",
            text=["Máx. Sharpe"],
            textposition="top center",
            marker=dict(size=14, symbol="star"),
            name="Máx. Sharpe",
        ))
    fig.update_layout(template="plotly_white", height=520)
    return fig


def plot_frontier(df_mc: pd.DataFrame, ef_df: pd.DataFrame, extras: dict, rf: float):
    """Plot simulations, the efficient frontier, optimal points and the CML.

    Args:
        df_mc: Monte Carlo result (may be empty or ``None``).
        ef_df: frontier points (may be empty or ``None``).
        extras: dict as returned by ``efficient_frontier``; reads the keys
            ``max_sharpe``, ``min_variance``, ``mu`` and ``cov``.
        rf: annual risk-free rate, the intercept of the Capital Market Line.
    """
    has_frontier = ef_df is not None and not ef_df.empty

    def _risk_return(res):
        # (return, volatility) of an optimiser result, or None when absent.
        if res is None or not hasattr(res, "x"):
            return None
        w = res.x
        mu = extras["mu"]
        cov = extras["cov"]
        return float(w @ mu), float(np.sqrt(w @ cov @ w))

    fig = go.Figure()
    if df_mc is not None and not df_mc.empty:
        fig.add_trace(go.Scatter(
            x=df_mc["vol_ann"],
            y=df_mc["ret_ann"],
            mode="markers",
            name="Simulações",
            marker=dict(size=6, color=df_mc["sharpe"], colorscale="Turbo", showscale=True),
            text=[f"Sharpe={s:.2f}" for s in df_mc["sharpe"]],
        ))
    if has_frontier:
        fig.add_trace(go.Scatter(
            x=ef_df["vol_ann"],
            y=ef_df["ret_ann"],
            mode="lines+markers",
            name="Fronteira Eficiente",
            line=dict(width=3),
        ))

    # Maximum-Sharpe point
    msr_point = _risk_return(extras.get("max_sharpe"))
    if msr_point is not None:
        ret, vol = msr_point
        fig.add_trace(go.Scatter(
            x=[vol],
            y=[ret],
            mode="markers+text",
            text=["Máx. Sharpe"],
            textposition="bottom center",
            marker=dict(size=14, symbol="star", color="black"),
            name="Máx. Sharpe (ótimo)",
        ))

    # Minimum-variance point
    minv_point = _risk_return(extras.get("min_variance"))
    if minv_point is not None:
        ret, vol = minv_point
        fig.add_trace(go.Scatter(
            x=[vol],
            y=[ret],
            mode="markers+text",
            text=["Mín. Variância"],
            textposition="top center",
            marker=dict(size=12, symbol="diamond", color="green"),
            name="Mín. Variância",
        ))

    # Capital Market Line through (0, rf) and the maximum-Sharpe point
    if msr_point is not None:
        msr_ret, msr_vol = msr_point
        vols = [msr_vol]
        if has_frontier:
            # Only consult the frontier when it has points; max() of an
            # empty column is NaN and would blank the whole line.
            vols.append(float(ef_df["vol_ann"].max()))
        x_line = np.linspace(0, max(vols) * 1.2, 50)
        slope = (msr_ret - rf) / msr_vol if msr_vol > 0 else 0
        y_line = rf + slope * x_line
        fig.add_trace(go.Scatter(
            x=x_line,
            y=y_line,
            mode="lines",
            name="CML (rf)",
            line=dict(dash="dot"),
        ))

    fig.update_layout(
        template="plotly_white",
        height=560,
        xaxis_title="Volatilidade (anual)",
        yaxis_title="Retorno (anual)",
    )
    return fig


# =========================
# App pipeline
# =========================
# Module-level state shared by every UI callback.
STATE = {
    "prices": None,
    "returns": None,
    "freq": "D",
    "last_sim": None,
    "ef": None,
    "ef_extras": None,
}

HELP_TEXT = """ """


def load_csv(file: gr.File) -> str:
    """Load a price CSV into ``STATE["prices"]`` and return a status message.

    Resets ``STATE["returns"]`` so stale returns are never paired with new
    prices.  Read and parse failures are reported in the returned message
    instead of being raised.
    """
    if file is None:
        return "Envie um CSV primeiro."
    # The upload may arrive as an object exposing ``.name`` or as a plain path.
    path = getattr(file, "name", file)
    try:
        df = pd.read_csv(path)
        df = _ensure_datetime_index(df)
    except (OSError, ValueError) as e:
        return f"Erro ao ler CSV: {e}"
    df = df.dropna(how="all").sort_index()
    STATE["prices"] = df
    STATE["returns"] = None
    return f"CSV carregado com shape {df.shape}. Colunas: {list(df.columns)[:8]}{'...' if df.shape[1]>8 else ''}"


def fetch_data(tickers: str, start: str, end: str, interval: str) -> str:
    """Download prices for comma-separated *tickers* and store them in STATE.

    Returns a status message; download errors and empty results are reported
    in the message instead of being raised.
    """
    if not tickers:
        return "Informe tickers separados por vírgula."
    symbols = [t.strip() for t in tickers.split(",") if t.strip()]
    try:
        df = _fetch_yf(symbols, start, end, interval)
    except Exception as e:
        # UI boundary: surface any download failure to the user.
        return f"Erro ao baixar: {e}"
    if df.empty:
        # An empty index has no min/max date to report.
        return "Nenhum dado retornado para os tickers/período informados."
    STATE["prices"] = df
    STATE["returns"] = None
    return f"Baixado com sucesso: shape {df.shape}. Período: {df.index.min().date()} – {df.index.max().date()}."
def prepare(freq: str, logret: bool):
    """Compute returns from the loaded prices and store them in STATE.

    Args:
        freq: reference frequency used for annualisation ('D', 'W' or 'M').
        logret: when true, use log returns instead of simple returns.

    Returns:
        ``(message, stats_table, price_tail)``.  Both tables carry their
        index (asset name / date) as a regular first column.
    """
    if STATE["prices"] is None:
        return "Carregue ou baixe dados na aba Dados.", None, None
    prices = STATE["prices"].copy()
    rets = _pct_returns(prices, log=logret).dropna(how="all")
    # Reference frequency chosen for annualisation.
    STATE["returns"] = rets
    STATE["freq"] = freq

    # NOTE(review): assumes gr.Dataframe does not display the pandas index;
    # confirm against the Gradio docs. Moving the index into a column keeps
    # the asset names and dates visible.
    stats_df = _summary_stats(rets, freq=freq).rename_axis("asset").reset_index()
    head = prices.tail(5).copy()
    head.index = head.index.astype(str)
    head = head.rename_axis("Date").reset_index()
    return f"Retornos prontos. {rets.shape[0]} períodos x {rets.shape[1]} ativos.", stats_df, head


def run_mc(n_sims, rf, shorting, wmin, wmax, seed):
    """Run the Monte Carlo simulation on the prepared returns.

    Returns ``(message, figure)`` and stores the result in
    ``STATE["last_sim"]``.
    """
    if STATE["returns"] is None:
        return "Prepare os dados primeiro (aba Dados).", None
    # Numeric UI widgets may deliver floats; the simulation count and the
    # RNG seed must be integers.
    df_mc, names = simulate_portfolios(
        STATE["returns"],
        n_sims=int(n_sims),
        rf=rf,
        shorting=shorting,
        w_min=wmin,
        w_max=wmax,
        seed=None if seed is None else int(seed),
        freq=STATE["freq"],
    )
    STATE["last_sim"] = df_mc
    fig = plot_montecarlo(df_mc)
    return f"{len(df_mc)} carteiras simuladas.", fig


def run_frontier(rf, steps, shorting, wmin, wmax):
    """Compute and plot the efficient frontier for the prepared returns.

    Returns ``(message, figure)`` and stores the frontier and its extras in
    STATE.  The last Monte Carlo result, if any, is drawn underneath.
    """
    if STATE["returns"] is None:
        return "Prepare os dados primeiro.", None
    ef_df, extras = efficient_frontier(
        STATE["returns"],
        rf=rf,
        steps=int(steps),  # the grid size must be an integer
        shorting=shorting,
        w_min=wmin,
        w_max=wmax,
        freq=STATE["freq"],
    )
    STATE["ef"] = ef_df
    STATE["ef_extras"] = extras
    sims = STATE["last_sim"] if STATE["last_sim"] is not None else pd.DataFrame()
    fig = plot_frontier(sims, ef_df, extras, rf)
    return f"Fronteira com {len(ef_df)} pontos.", fig


def download_csv(kind: str):
    """Serialise one of the stored tables to CSV.

    Args:
        kind: ``"prices"``, ``"returns"``, ``"mc"`` or ``"ef"``.

    Returns:
        ``(csv_bytes, file_name)``, or ``(None, None)`` when *kind* is
        unknown or the table has not been produced yet.
    """
    if kind == "prices" and STATE["prices"] is not None:
        return STATE["prices"].to_csv().encode(), "prices.csv"
    if kind == "returns" and STATE["returns"] is not None:
        return STATE["returns"].to_csv().encode(), "returns.csv"
    if kind == "mc" and STATE["last_sim"] is not None:
        return STATE["last_sim"].to_csv(index=False).encode(), "montecarlo.csv"
    if kind == "ef" and STATE["ef"] is not None:
        return STATE["ef"].to_csv(index=False).encode(), "efficient_frontier.csv"
    return None, None


def backtest(weights_json: str):
    """Backtest a fixed-weight portfolio on the prepared returns.

    Args:
        weights_json: JSON object mapping asset name to weight.  Assets that
            are not listed get weight 0; weights are rescaled to sum to 1.

    Returns:
        ``(message, figure, metrics)``; figure and metrics are ``None`` when
        the input is rejected.
    """
    if STATE["returns"] is None:
        return "Prepare os dados primeiro.", None, None
    try:
        w_map = json.loads(weights_json)
    except (TypeError, ValueError) as e:
        # json.JSONDecodeError is a ValueError; TypeError covers None input.
        return f"JSON inválido: {e}", None, None
    if not isinstance(w_map, dict):
        return 'JSON inválido: esperado um objeto {"ATIVO": peso}.', None, None

    # Align the weights with the column order of the returns table.
    cols = list(STATE["returns"].columns)
    try:
        w = np.array([w_map.get(c, 0.0) for c in cols], dtype=float)
    except (TypeError, ValueError) as e:
        return f"JSON inválido: {e}", None, None
    total = np.sum(w)
    if not np.isfinite(total) or np.isclose(total, 0.0):
        return "Pesos somam 0. Informe ao menos um ativo > 0.", None, None
    w = w / total

    # Value curves (base = 1)
    port_rets = (STATE["returns"] @ w).fillna(0.0)
    port_curve = (1 + port_rets).cumprod()
    # Simple benchmark: cross-sectional mean return of all assets per period.
    bench = (1 + STATE["returns"].mean(axis=1)).cumprod()
    df_bt = pd.DataFrame({
        "Curve": port_curve,
        "Benchmark_eq_mean": bench,
    })

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=df_bt.index, y=df_bt["Curve"], name="Carteira", mode="lines"))
    fig.add_trace(go.Scatter(x=df_bt.index, y=df_bt["Benchmark_eq_mean"], name="Benchmark (média eq.)", mode="lines"))
    fig.update_layout(template="plotly_white", height=500, title="Backtest – Curva de Valor (base=1)")

    # Annualised metrics (no risk-free rate is subtracted in the ratio).
    ann = {"D": 252, "W": 52, "M": 12}[STATE["freq"]]
    mu = port_rets.mean() * ann
    vol = port_rets.std(ddof=1) * np.sqrt(ann)
    sharpe = mu / vol if vol > 0 else np.nan
    metrics = pd.DataFrame({"ret_ann":[mu], "vol_ann":[vol], "sharpe":[sharpe]})
    return "Backtest concluído.", fig, metrics


with gr.Blocks(title="Simulador de Carteiras – ANOVA Project") as demo:
    gr.Markdown("# Simulador de Carteiras\n" + HELP_TEXT)
    with gr.Tabs():
        with gr.Tab("Dados"):
            gr.Markdown("### Carregar CSV **ou** Baixar do Yahoo Finance")
            with gr.Row():
                csv_in = gr.File(label="CSV de preços (Date + colunas de ativos)")
                btn_csv = gr.Button("Carregar CSV")
            out_csv = gr.Textbox(label="Status")
            btn_csv.click(load_csv, inputs=csv_in, outputs=out_csv)

            gr.Markdown("**OU**")
            with gr.Row():
                tickers = gr.Textbox(value="AAPL, MSFT, SPY", label="Tickers (separados por vírgula)")
                start = gr.Textbox(value="2018-01-01", label="Início (AAAA-MM-DD)")
                end = gr.Textbox(value=str(datetime.today().date()), label="Fim (AAAA-MM-DD)")
                interval = gr.Dropdown(["1d","1wk","1mo"], value="1d", label="Intervalo")
            btn_yf = gr.Button("Baixar do Yahoo Finance")
            out_yf = gr.Textbox(label="Status")
            btn_yf.click(fetch_data, inputs=[tickers, start, end, interval], outputs=out_yf)

            gr.Markdown("### Preparar dados")
            with gr.Row():
                freq = gr.Radio(choices=["D","W","M"], value="D", label="Frequência de referência")
                logret = gr.Checkbox(value=False, label="Usar retornos log?")
            btn_prep = gr.Button("Preparar Dados")
            msg_prep = gr.Textbox(label="Status")
            tbl_stats = gr.Dataframe(label="Estatísticas (anualizadas)", wrap=True)
            tail_prices = gr.Dataframe(label="Últimas linhas dos preços")
            btn_prep.click(prepare, inputs=[freq, logret], outputs=[msg_prep, tbl_stats, tail_prices])

        with gr.Tab("Simulações"):
            gr.Markdown("### Monte Carlo de carteiras")
            with gr.Row():
                n_sims = gr.Slider(1000, 50000, value=10000, step=1000, label="Nº de simulações")
                rf = gr.Number(value=0.02, label="Taxa livre de risco (a.a.)")
            with gr.Row():
                shorting = gr.Checkbox(value=False, label="Permitir short?")
                wmin = gr.Slider(0.0, 1.0, value=0.0, step=0.01, label="Peso mínimo")
                wmax = gr.Slider(0.0, 1.0, value=1.0, step=0.01, label="Peso máximo")
                seed = gr.Number(value=42, label="Seed")
            btn_mc = gr.Button("Rodar Monte Carlo")
            out_mc_msg = gr.Textbox(label="Status")
            fig_mc = gr.Plot(label="Resultados (retorno x vol; cor = Sharpe)")
            btn_mc.click(run_mc, inputs=[n_sims, rf, shorting, wmin, wmax, seed], outputs=[out_mc_msg, fig_mc])

        with gr.Tab("Fronteira Eficiente"):
            gr.Markdown("### Cálculo da fronteira eficiente")
            with gr.Row():
                rf2 = gr.Number(value=0.02, label="Taxa livre de risco (a.a.)")
                steps = gr.Slider(10, 150, value=50, step=5, label="Pontos na fronteira")
                shorting2 = gr.Checkbox(value=False, label="Permitir short?")
                wmin2 = gr.Slider(0.0, 1.0, value=0.0, step=0.01, label="Peso min")
                wmax2 = gr.Slider(0.0, 1.0, value=1.0, step=0.01, label="Peso máx")
            btn_ef = gr.Button("Traçar fronteira")
            out_ef_msg = gr.Textbox(label="Status")
            fig_ef = gr.Plot(label="Fronteira Eficiente + Pontos Ótimos")
            btn_ef.click(run_frontier, inputs=[rf2, steps, shorting2, wmin2, wmax2], outputs=[out_ef_msg, fig_ef])

        with gr.Tab("Backtest / Download"):
            gr.Markdown("### Backtest de uma carteira específica")
            sample_json = gr.Code(value='{"AAPL": 0.4, "MSFT": 0.4, "SPY": 0.2}', label="Pesos (JSON)")
            btn_bt = gr.Button("Rodar Backtest")
            bt_status = gr.Textbox(label="Status")
            bt_plot = gr.Plot(label="Curva de Valor (base 1)")
            bt_metrics = gr.Dataframe(label="Métricas (anualizadas)")
            btn_bt.click(backtest, inputs=sample_json, outputs=[bt_status, bt_plot, bt_metrics])

            gr.Markdown("### Baixar dados")
            kind = gr.Dropdown(
                choices=[("Preços","prices"),("Retornos","returns"),("Monte Carlo","mc"),("Fronteira","ef")],
                value="prices",
                label="Escolha",
            )
            btn_dl = gr.Button("Gerar CSV")
            file_out = gr.File(label="Clique para baixar")

            def _dl(kind):
                """Write the selected table to a temporary CSV and return its path.

                A File output component is given a file path here, so the
                CSV bytes are written to disk first.  Returns ``None`` when
                there is nothing to download.
                """
                import os
                import tempfile

                blob, name = download_csv(kind)
                if blob is None:
                    return None
                # A fresh directory keeps the friendly file name without
                # clashing with earlier downloads.
                out_dir = tempfile.mkdtemp(prefix="carteiras_")
                path = os.path.join(out_dir, name)
                with open(path, "wb") as fh:
                    fh.write(blob)
                return path

            btn_dl.click(_dl, inputs=kind, outputs=file_out)

    gr.Markdown("")

if __name__ == "__main__":
    demo.launch()