import numpy as np import pandas as pd import statsmodels.api as sm from collections import OrderedDict import datetime from alphacast import Alphacast import gradio as gr import plotly.graph_objects as go import matplotlib.pyplot as plt import seaborn as sns import os key = os.getenv('key_alphacast') alphacast = Alphacast(key) dataset = alphacast.datasets.dataset(5664) df = dataset.download_data(format = "pandas", startDate=None, endDate=None, filterVariables = [], filterEntities = {}) data = df[["country","Date","Real Consumption at constant 2017 national prices (In mil. 2017US$)", "Average annual hours worked by persons engaged", "Capital Stock at constant 2017 national prices (In mil. 2017US$)", "Population (In millions)"]] data = data.rename(columns={ "Real Consumption at constant 2017 national prices (In mil. 2017US$)": "C", "Average annual hours worked by persons engaged": "L", "Capital Stock at constant 2017 national prices (In mil. 2017US$)": "I", "Population (In millions)": "N" }) paises = ["Argentina", "Brazil", "Chile", "Uruguay", "Colombia"] subdatasets = {} rbc_data = {} for pais in paises: df_pais = data[data['country'] == pais] df_pais = df_pais.rename(columns = {"Date":"DATE"}) df_pais.set_index('DATE', inplace=True) df_pais = df_pais[df_pais.index >= '1990-01-01'] N = df_pais['N'] C = df_pais['C'] / N I = df_pais['I'] / N L = df_pais['L'] Y = C + I y = np.log(Y).diff()[1:] c = np.log(C).diff()[1:] n = np.log(L).diff()[1:] rbc_data[pais] = pd.concat((y, n, c), axis=1) rbc_data[pais].columns = ['output', 'labor', 'consumption'] data_2 = df[["country","Date","Real GDP at constant 2017 national prices (In mil. 2017US$)","Real Consumption at constant 2017 national prices (In mil. 2017US$)", "Average annual hours worked by persons engaged", "Capital Stock at constant 2017 national prices (In mil. 2017US$)", "Population (In millions)"]] data_2= data_2.rename(columns={"Real GDP at constant 2017 national prices (In mil. 2017US$)":"Y", "Real Consumption at constant 2017 national prices (In mil. 2017US$)": "C", "Average annual hours worked by persons engaged": "L", "Capital Stock at constant 2017 national prices (In mil. 2017US$)": "I", "Population (In millions)": "N" }) subdatasets_2 = {} rbc_data_2 = {} for pais in paises: df_pais = data_2[data_2['country'] == pais] df_pais = df_pais.rename(columns = {"Date":"DATE"}) df_pais.set_index('DATE', inplace=True) df_pais = df_pais[df_pais.index >= '1990-01-01'] N = df_pais['N'] Y = df_pais["Y"] / N C = df_pais['C'] / N I = df_pais['I'] / N L = df_pais['L'] y = np.log(Y).diff()[1:] c = np.log(C).diff()[1:] n = np.log(L).diff()[1:] rbc_data_2[pais] = pd.concat((y, n, c), axis=1) rbc_data_2[pais].columns = ['output', 'labor', 'consumption'] def generar_grafico_pais(pais): df_pais = rbc_data[pais] fig = go.Figure() fig.add_trace(go.Scatter(x=df_pais.index, y=df_pais['output'], mode='lines+markers', name='Output (y)', marker=dict(symbol='circle'))) fig.add_trace(go.Scatter(x=df_pais.index, y=df_pais['labor'], mode='lines+markers', name='Labor (n)', marker=dict(symbol='x'))) fig.add_trace(go.Scatter(x=df_pais.index, y=df_pais['consumption'], mode='lines+markers', name='Consumption (c)', marker=dict(symbol='square'))) fig.update_layout(title=f'Producción, Trabajo, y Consumo para {pais} (1991 - 2019)', xaxis_title='Fecha', yaxis_title='Log Differences', legend_title='Variables', template='plotly_dark') fig.show() class SimpleRBC(sm.tsa.statespace.MLEModel): parameters = OrderedDict([ ('discount_rate', 0.95), ('disutility_labor', 3.), ('depreciation_rate', 0.025), ('capital_share', 0.36), ('technology_shock_persistence', 0.85), ('technology_shock_var', 0.04**2) ]) def __init__(self, endog, calibrated=None): super(SimpleRBC, self).__init__( endog, k_states=2, k_posdef=1, initialization='stationary') self.k_predetermined = 1 parameters = list(self.parameters.keys()) calibrated = calibrated or {} self.calibrated = OrderedDict([ (param, calibrated[param]) for param in parameters if param in calibrated ]) self.idx_calibrated = np.array([ param in self.calibrated for param in parameters]) self.idx_estimated = ~self.idx_calibrated self.k_params = len(self.parameters) self.k_calibrated = len(self.calibrated) self.k_estimated = self.k_params - self.k_calibrated self.idx_cap_share = parameters.index('capital_share') self.idx_tech_pers = parameters.index('technology_shock_persistence') self.idx_tech_var = parameters.index('technology_shock_var') self['selection', 1, 0] = 1 @property def start_params(self): structural_params = np.array(list(self.parameters.values()))[self.idx_estimated] measurement_variances = [0.1] * 3 return np.r_[structural_params, measurement_variances] @property def param_names(self): structural_params = np.array(list(self.parameters.keys()))[self.idx_estimated] measurement_variances = ['%s.var' % name for name in self.endog_names] return structural_params.tolist() + measurement_variances def log_linearize(self, params): (discount_rate, disutility_labor, depreciation_rate, capital_share, technology_shock_persistence, technology_shock_var) = params tmp = (1. / discount_rate - (1. - depreciation_rate)) theta = (capital_share / tmp)**(1. / (1. - capital_share)) gamma = 1. - depreciation_rate * theta**(1. - capital_share) zeta = capital_share * discount_rate * theta**(capital_share - 1) A = np.eye(2) B11 = 1 + depreciation_rate * (gamma / (1 - gamma)) B12 = (-depreciation_rate * (1 - capital_share + gamma * capital_share) / (capital_share * (1 - gamma))) B21 = 0 B22 = capital_share / (zeta + capital_share*(1 - zeta)) B = np.array([[B11, B12], [B21, B22]]) C1 = depreciation_rate / (capital_share * (1 - gamma)) C2 = (zeta * technology_shock_persistence / (zeta + capital_share*(1 - zeta))) C = np.array([[C1], [C2]]) return A, B, C def solve(self, params): capital_share = params[self.idx_cap_share] technology_shock_persistence = params[self.idx_tech_pers] A, B, C = self.log_linearize(params) eigvals, right_eigvecs = np.linalg.eig(np.transpose(B)) left_eigvecs = np.transpose(right_eigvecs) idx = np.argsort(eigvals) eigvals = np.diag(eigvals[idx]) left_eigvecs = left_eigvecs[idx, :] k_nonpredetermined = self.k_states - self.k_predetermined k_stable = len(np.where(eigvals.diagonal() < 1)[0]) k_unstable = self.k_states - k_stable if not k_stable == self.k_predetermined: raise RuntimeError('Blanchard-Kahn condition not met.' ' Unique solution does not exist.') k = self.k_predetermined p1 = np.s_[:k] p2 = np.s_[k:] p11 = np.s_[:k, :k] p12 = np.s_[:k, k:] p21 = np.s_[k:, :k] p22 = np.s_[k:, k:] decoupled_C = np.dot(left_eigvecs, C) tmp = np.linalg.inv(left_eigvecs[p22]) policy_state = - np.dot(tmp, left_eigvecs[p21]).squeeze() policy_shock = -( np.dot(tmp, 1. / eigvals[p22]).dot( np.linalg.inv( np.eye(k_nonpredetermined) - technology_shock_persistence / eigvals[p22] ) ).dot(decoupled_C[p2]) ).squeeze() transition_state = np.squeeze(B[p11] + np.dot(B[p12], policy_state)) transition_shock = np.squeeze(np.dot(B[p12], policy_shock) + C[p1]) tmp = (1 - capital_share) / capital_share tmp1 = 1. / capital_share design = np.array([[1 - tmp * policy_state, tmp1 - tmp * policy_shock], [1 - tmp1 * policy_state, tmp1 * (1-policy_shock)], [policy_state, policy_shock]]) transition = ( np.array([[transition_state, transition_shock], [0, technology_shock_persistence]])) return design, transition def transform_discount_rate(self, param, untransform=False): epsilon = 1e-4 if not untransform: return np.abs(1 / (1 + np.exp(param)) - epsilon) else: return np.log((1 - param + epsilon) / (param + epsilon)) def transform_disutility_labor(self, param, untransform=False): return param**2 if not untransform else param**0.5 def transform_depreciation_rate(self, param, untransform=False): return param**2 if not untransform else param**0.5 def transform_capital_share(self, param, untransform=False): epsilon = 1e-4 if not untransform: return np.abs(1 / (1 + np.exp(param)) - epsilon) else: return np.log((1 - param + epsilon) / (param + epsilon)) def transform_technology_shock_persistence(self, param, untransform=False): if not untransform: return param / (1 + np.abs(param)) else: return param / (1 - param) def transform_technology_shock_var(self, unconstrained, untransform=False): return unconstrained**2 if not untransform else unconstrained**0.5 def transform_params(self, unconstrained): constrained = np.zeros(unconstrained.shape, unconstrained.dtype) i = 0 for param in self.parameters.keys(): if param not in self.calibrated: method = getattr(self, 'transform_%s' % param) constrained[i] = method(unconstrained[i]) i += 1 constrained[self.k_estimated:] = unconstrained[self.k_estimated:]**2 return constrained def untransform_params(self, constrained): unconstrained = np.zeros(constrained.shape, constrained.dtype) i = 0 for param in self.parameters.keys(): if param not in self.calibrated: method = getattr(self, 'transform_%s' % param) unconstrained[i] = method(constrained[i], untransform=True) i += 1 unconstrained[self.k_estimated:] = constrained[self.k_estimated:]**0.5 return unconstrained def update(self, params, **kwargs): params = super(SimpleRBC, self).update(params, **kwargs) structural_params = np.zeros(self.k_params, dtype=params.dtype) structural_params[self.idx_calibrated] = list(self.calibrated.values()) structural_params[self.idx_estimated] = params[:self.k_estimated] measurement_variances = params[self.k_estimated:] design, transition = self.solve(structural_params) self['design'] = design self['obs_cov', 0, 0] = measurement_variances[0] self['obs_cov', 1, 1] = measurement_variances[1] self['obs_cov', 2, 2] = measurement_variances[2] self['transition'] = transition self['state_cov', 0, 0] = structural_params[self.idx_tech_var] calibrated = { 'discount_rate': 0.95, 'disutility_labor': 3.0, 'capital_share': 0.36, 'depreciation_rate': 0.025, 'technology_shock_persistence': 0.85, 'technology_shock_var': 0.012**2 } def plot_irfs_plotly(irfs): fig = go.Figure() fig.add_trace(go.Scatter(x=list(range(len(irfs['output']))), y=irfs['output'], mode='lines+markers', name='Output')) fig.add_trace(go.Scatter(x=list(range(len(irfs['labor']))), y=irfs['labor'], mode='lines+markers', name='Labor')) fig.add_trace(go.Scatter(x=list(range(len(irfs['consumption']))), y=irfs['consumption'], mode='lines+markers', name='Consumption')) fig.update_layout( title="Función Impulso Respuesta", xaxis_title="Años después del shcok", yaxis_title="Impulso respuesta (%)", legend_title="Variables", template = "plotly_dark" ) return fig def plot_states_plotly(res, rbc_data): fig = go.Figure() capital = res.smoothed_state[0, :] shock = res.smoothed_state[1, :] fig.add_trace(go.Scatter(x=rbc_data.index, y=capital, mode='lines', name='Capital')) fig.add_trace(go.Scatter(x=rbc_data.index, y=shock, mode='lines', name='Technology process')) fig.update_layout( title="State Variables over Time", xaxis_title="Time", yaxis_title="Value", legend_title="Variables", template = "plotly_dark" ) return fig def plot_rbc_model(pais, persistence, shock_variance, discount_rate, disutility_labor, capital_share, depreciation_rate): calibrated = { 'discount_rate': discount_rate, 'disutility_labor': disutility_labor, 'capital_share': capital_share, 'depreciation_rate': depreciation_rate, 'technology_shock_persistence': persistence, 'technology_shock_var': shock_variance ** 2 } calibrated_mod = SimpleRBC(rbc_data[pais], calibrated=calibrated) calibrated_res = calibrated_mod.fit(method='nm', maxiter=1000, disp=0) calibrated_irfs_pos = calibrated_res.impulse_responses(40, orthogonalized=True) * 100 calibrated_irfs_neg = -calibrated_irfs_pos fig_pos = plot_irfs_plotly(pd.DataFrame(calibrated_irfs_pos, columns=['output', 'labor', 'consumption'])) fig_neg = plot_irfs_plotly(pd.DataFrame(calibrated_irfs_neg, columns=['output', 'labor', 'consumption'])) summary_df = calibrated_res.summary().tables[1].data summary_df = pd.DataFrame(summary_df[1:], columns=summary_df[0]) estimated_coefficients = summary_df['coef'].astype(float) estimated_var_output = estimated_coefficients[summary_df.index[0]] estimated_var_labor = estimated_coefficients[summary_df.index[1]] estimated_var_consumption = estimated_coefficients[summary_df.index[2]] var_output = np.var(rbc_data_2[pais]["output"]) var_consumption = np.var(rbc_data_2[pais]["consumption"]) var_labor = np.var(rbc_data_2[pais]["labor"]) var_data = pd.DataFrame({ 'Variable': ['Output Real', 'Consumption', 'Labor'], 'Varianza Real': [var_output, var_consumption, var_labor], 'Varianza Estimada': [estimated_var_output, estimated_var_consumption, estimated_var_labor], 'Diferencia': [abs(var_output - estimated_var_output), abs(var_consumption - estimated_var_consumption), abs(var_labor - estimated_var_labor)] }) return fig_pos, fig_neg, summary_df, var_data with gr.Blocks() as demo: with gr.Row(): gr.Markdown("### Real Business Cycle (RBC) Model Dashboard") with gr.Tab("Ecuaciones del Modelo"): gr.Markdown(r""" ### Ecuaciones del Modelo RBC - **FOC estática**: $$ \psi c_t = (1 - \alpha) z_t \left( \frac{k_t}{n_t} \right)^{\alpha} $$ - **Ecuación de Euler**: $$ \frac{1}{c_t} = \beta E_t \left\{ \frac{1}{c_{t+1}} \left[ \alpha z_{t+1} \left( \frac{k_{t+1}}{n_{t+1}} \right)^{\alpha-1} + (1 - \delta) \right] \right\} $$ - **Función de producción**: $$ y_t = z_t k_t^{\alpha} n_t^{1 - \alpha} $$ - **Restricción de recursos agregados**: $$ y_t = c_t + i_t $$ - **Acumulación de capital**: $$ k_{t+1} = (1 - \delta)k_t + i_t $$ - **Comercio entre trabajo y ocio**: $$ 1 = l_t + n_t $$ - **Transición del choque tecnológico**: $$ \log z_t = \rho \log z_{t-1} + \varepsilon_t $$ """) #with gr.Tab("Gráfico del País"): # pais_plot = gr.Dropdown(label="Clasificación Fondo", choices=["Argentina", "Brazil", "Chile", "Uruguay", "Colombia"]) # plot_output = gr.Plot() # pais_plot.change(generar_grafico_pais, pais_plot, plot_output) with gr.Tab("Calibración del Modelo"): pais_selec = gr.Dropdown(choices=["Argentina", "Brazil", "Chile", "Uruguay", "Colombia"], label="Selecciona un país") persistence = gr.Slider(label="Persistencia del choque tecnológico", minimum=0.5, maximum=1.0, value=0.85) shock_variance = gr.Slider(label="Desviación estándar del choque tecnológico", minimum=0.01, maximum=0.05, value=0.012) discount_rate = gr.Number(label="Tasa de descuento (β)", value=0.95) disutility_labor = gr.Number(label="Desutilidad del trabajo", value=3.0) capital_share = gr.Number(label="Participación del capital (α)", value=0.36) depreciation_rate = gr.Number(label="Tasa de depreciación", value=0.025) btn = gr.Button("Actualizar Modelo") output_pos = gr.Plot(label="Respuesta ante un Choque Tecnológico Positivo") output_neg = gr.Plot(label="Respuesta ante un Choque Tecnológico Negativo") with gr.Tab("Estadísticas del Modelo"): output_stats = gr.DataFrame(label="Output estadístico del modelo", type="pandas") output_var = gr.DataFrame(label="Varianzas reales", type = "pandas") btn.click(fn=plot_rbc_model, inputs=[pais_selec,persistence, shock_variance, discount_rate, disutility_labor, capital_share, depreciation_rate], outputs=[output_pos, output_neg, output_stats,output_var]) demo.launch()