"""Gradio app: Zillow home-value history for a ZIP code with an LSTM forecast."""

from functools import lru_cache

import gradio as gr
import numpy as np
import pandas as pd
import plotly.express as px
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.models import Sequential

# Zillow Home Value Index (ZHVI) per ZIP code, one column per month-end date.
ZHVI_CSV_URL = 'https://files.zillowstatic.com/research/public_csvs/zhvi/Zip_zhvi_uc_sfrcondo_tier_0.33_0.67_sm_sa_month.csv'
# First date column of the CSV that is used as price history.
FIRST_DATE_COLUMN = '2000-01-31'
# Number of consecutive months fed to the LSTM to predict the following month.
LOOKBACK_MONTHS = 60
# Window sizes (in months) of the moving averages drawn on the chart.
MOVING_AVERAGE_WINDOWS = (3, 6, 12, 24)

lstm_explanation = """
## Understanding LSTM in This App

**What is LSTM?**
LSTM (Long Short-Term Memory) is a type of neural network designed for time-series data, like housing prices. It excels at capturing patterns in sequential data, making it ideal for predicting future values based on historical trends.

**How is it used here?**
- The LSTM model uses housing price data since January 2000 for the selected ZIP code.
- It takes a 60-month lookback window (5 years) of historical prices to predict the next month's price.
- The model learns trends, such as seasonal changes or long-term growth.
- 'LSTM Fit on Training Data' shows how well the model learned the patterns in the historical data it was trained on.
- 'LSTM Predictions on Hold-out Data' shows the model's predictions for a recent period of actual prices that it wasn't trained on, to evaluate its forecasting ability.
- 'LSTM Future Predictions' shows the model's predictions for months *beyond* the available historical data.
"""


def _message_figure(message):
    """Return an empty line chart whose title carries a user-facing message."""
    return px.line(title=message)


@lru_cache(maxsize=1)
def _load_zhvi_data():
    """Download and parse the Zillow ZHVI CSV once per process.

    The file is large, so the parsed frame is cached instead of being fetched
    on every request. The cache lives until the process restarts; callers must
    treat the returned frame as read-only. Failures are not cached, so a later
    call retries the download.
    """
    return pd.read_csv(ZHVI_CSV_URL)


def _price_history(df_full, zip_val):
    """Return a Date/Price frame for ``zip_val``, or ``None`` if the ZIP is absent.

    Rows with missing prices are dropped and the index is renumbered.
    """
    zip_rows = df_full[df_full['RegionName'] == zip_val]
    if zip_rows.empty:
        return None
    # Date columns run left to right; transposing turns them into rows.
    history = zip_rows.loc[:, FIRST_DATE_COLUMN:].T.reset_index()
    history.columns = ['Date', 'Price']
    history['Date'] = pd.to_datetime(history['Date'])
    return history.dropna(subset=['Price']).reset_index(drop=True)


def _lookback_windows(scaled_prices, first_target, end_target):
    """Build LSTM inputs for the targets ``first_target`` .. ``end_target - 1``.

    Each window holds the ``LOOKBACK_MONTHS`` scaled prices that immediately
    precede its target index. The result has shape
    ``(end_target - first_target, LOOKBACK_MONTHS, 1)``.
    """
    windows = [
        scaled_prices[target - LOOKBACK_MONTHS:target, 0]
        for target in range(first_target, end_target)
    ]
    return np.array(windows).reshape(-1, LOOKBACK_MONTHS, 1)


def _build_model():
    """Create and compile the two-layer LSTM regression network."""
    model = Sequential()
    model.add(LSTM(units=50, return_sequences=True, input_shape=(LOOKBACK_MONTHS, 1)))
    model.add(LSTM(units=50, return_sequences=False))  # Can experiment with more layers or units
    model.add(Dense(units=25))
    model.add(Dense(units=1))
    model.compile(optimizer='adam', loss='mean_squared_error')
    return model


def _forecast_future(model, scaled_prices, steps):
    """Roll the model forward ``steps`` months past the end of the data.

    Each prediction is appended to the input window (and the oldest value
    dropped) so that later steps are conditioned on earlier predictions.
    Returns the scaled predictions as a ``(steps, 1)`` array.
    """
    window = scaled_prices[-LOOKBACK_MONTHS:].reshape(1, LOOKBACK_MONTHS, 1)
    forecasts = []
    for _ in range(steps):
        next_scaled = model.predict(window, verbose=0)
        forecasts.append(next_scaled[0, 0])
        window = np.concatenate([window[:, 1:, :], next_scaled.reshape(1, 1, 1)], axis=1)
    return np.array(forecasts).reshape(-1, 1)


def plot_real_estate(zip_code_str, future_months_to_predict_on_holdout=12, future_months_to_project=6):
    """Plot price history, moving averages and LSTM predictions for a ZIP code.

    Args:
        zip_code_str: ZIP code as entered by the user; must parse as an integer.
        future_months_to_predict_on_holdout: Number of most recent months
            withheld from training and used to evaluate the model (>= 1).
        future_months_to_project: Number of months to forecast beyond the
            last available data point (>= 1).

    Returns:
        A Plotly figure. On any input or data problem the figure is empty and
        its title describes the problem.
    """
    try:
        zip_val = int(zip_code_str)
    except (TypeError, ValueError):
        return _message_figure(f"Invalid ZIP Code: '{zip_code_str}'. Please enter a numeric ZIP code.")

    # UI sliders may deliver floats; range() and slicing below need ints.
    try:
        holdout_months = int(future_months_to_predict_on_holdout)
        projection_months = int(future_months_to_project)
    except (TypeError, ValueError):
        return _message_figure('Hold-out and projection horizons must be whole numbers of months.')
    if holdout_months < 1 or projection_months < 1:
        # A zero horizon would produce empty model inputs and make
        # ``iloc[-0:]`` select every row instead of none.
        return _message_figure('Hold-out and projection horizons must each be at least 1 month.')

    try:
        df_full = _load_zhvi_data()
    except (OSError, pd.errors.EmptyDataError, pd.errors.ParserError) as err:
        return _message_figure(f'Could not load Zillow data: {err}')

    df_processed = _price_history(df_full, zip_val)
    if df_processed is None:
        return _message_figure(f'No data found for Zip Code {zip_val}')

    required_months = LOOKBACK_MONTHS + holdout_months
    if len(df_processed) < required_months:
        return _message_figure(
            f'Not enough historical data for Zip Code {zip_val} '
            f'(need at least {required_months} months of data).'
        )

    for window in MOVING_AVERAGE_WINDOWS:
        df_processed[f'{window}-Month MA'] = df_processed['Price'].rolling(window).mean()

    # --- Prepare data for LSTM ---
    prices = df_processed['Price'].values.reshape(-1, 1)
    train_size = len(prices) - holdout_months
    if train_size <= LOOKBACK_MONTHS:
        # Exactly LOOKBACK_MONTHS training points yield no (window, target) pair.
        return _message_figure(f'Not enough data to form training sequences for Zip Code {zip_val}.')

    # Fit the scaler ONLY on the training portion so that the hold-out months
    # do not leak into the scaling, then transform the entire series.
    scaler = MinMaxScaler(feature_range=(0, 1))
    scaler.fit(prices[:train_size])
    scaled_prices = scaler.transform(prices)

    x_train = _lookback_windows(scaled_prices, LOOKBACK_MONTHS, train_size)
    y_train = scaled_prices[LOOKBACK_MONTHS:train_size, 0]

    # Train the model - **RECOMMENDATION: Increase epochs and adjust patience**
    model = _build_model()
    model.fit(
        x_train,
        y_train,
        batch_size=1,
        epochs=1,  # For better results, try epochs=50 or more
        callbacks=[EarlyStopping(monitor='loss', patience=2)],
        verbose=0,  # verbose=0 to suppress log spam in Gradio
    )

    # --- Predictions ---
    # 1. Fit on the training data; dates align with the y_train targets.
    train_fit = scaler.inverse_transform(model.predict(x_train, verbose=0)).flatten()
    train_fit_dates = df_processed['Date'].iloc[LOOKBACK_MONTHS:train_size].reset_index(drop=True)

    # 2. One-step-ahead predictions for the hold-out months. Each window uses
    #    the actual (scaled) prices that precede its target month.
    x_holdout = _lookback_windows(scaled_prices, train_size, len(prices))
    holdout_fit = scaler.inverse_transform(model.predict(x_holdout, verbose=0)).flatten()
    holdout_dates = df_processed['Date'].iloc[-holdout_months:].reset_index(drop=True)

    # 3. Recursive forecast beyond the available data.
    future_prices = scaler.inverse_transform(
        _forecast_future(model, scaled_prices, projection_months)
    ).flatten()
    last_actual_date = df_processed['Date'].iloc[-1]
    # An offset object instead of the alias 'M', which newer pandas deprecates.
    # The first generated date is the last actual month, so it is dropped.
    future_dates = pd.date_range(
        start=last_actual_date,
        periods=projection_months + 1,
        freq=pd.offsets.MonthEnd(),
    )[1:]
    future_predictions_df = pd.DataFrame({'Date': future_dates, 'Predicted Price': future_prices})

    # --- Plotting ---
    fig = px.line(
        df_processed,
        x='Date',
        y='Price',
        title=f'Housing Prices & LSTM Analysis for Zip Code {zip_val}',
    )
    fig.data[0].showlegend = True
    fig.data[0].name = 'Actual Price'

    for window in MOVING_AVERAGE_WINDOWS:
        fig.add_scatter(
            x=df_processed['Date'],
            y=df_processed[f'{window}-Month MA'],
            mode='lines',
            name=f'{window}-Month MA',
        )

    fig.add_scatter(
        x=train_fit_dates,
        y=train_fit,
        mode='lines',
        line=dict(dash='dash'),
        name='LSTM Fit on Training Data',
    )
    fig.add_scatter(
        x=holdout_dates,
        y=holdout_fit,
        mode='lines',
        line=dict(color='red'),
        name='LSTM Predictions on Hold-out Data',
    )
    fig.add_scatter(
        x=future_predictions_df['Date'],
        y=future_predictions_df['Predicted Price'],
        mode='lines',
        line=dict(color='green'),
        name='LSTM Future Predictions',
    )

    fig.update_layout(legend_title_text='Legend')
    return fig


# --- Gradio Interface ---
iface = gr.Interface(
    fn=plot_real_estate,
    inputs=[
        gr.Textbox(label="Enter ZIP Code (e.g., 90210)"),
        gr.Slider(label="Months for Hold-out Prediction", minimum=6, maximum=36, value=12, step=1),
        gr.Slider(label="Months to Predict into the Future", minimum=3, maximum=24, value=6, step=1),
    ],
    outputs=gr.Plot(),
    title="Real Estate Price Analysis with LSTM Prediction",
    description=lstm_explanation,
    allow_flagging='never',
)

if __name__ == '__main__':
    iface.launch(share=False, debug=True)