# Page header captured with the source -- Spaces status: Sleeping
import os
import pickle
from itertools import product

import gradio as gr
import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pymongo import MongoClient
from sklearn.preprocessing import PolynomialFeatures
from statsmodels.stats.diagnostic import acorr_ljungbox
# --- MongoDB Setup ---
# SECURITY: the connection string embeds a username and password, so it must be
# supplied through the environment (e.g. a deployment secret) and never be
# committed to source control. Any credential that was previously committed
# here should be considered leaked and rotated.
uri = os.environ.get("MONGODB_URI")
if not uri:
    # Fail fast with an actionable message instead of a confusing driver error.
    raise RuntimeError(
        "MONGODB_URI is not set; provide the MongoDB connection string "
        "through the environment before starting the app."
    )
client = MongoClient(uri)
db = client["gemrate"]
market_data = db["alt_market_data"]  # per-transaction market data
cards = db["gemrate_pokemon_cards"]  # card metadata keyed by SPECID
# --- Load Models and Encoder ---
# All three artifacts are read once, at import time, from the working directory.
with open("card_encoder.pkl", "rb") as f:
    # NOTE: unpickling executes code from the file; only ship trusted artifacts.
    card_encoder = pickle.load(f)
confidence_model = joblib.load("gbm_card_confidence_model.joblib")
gradient_boosting_model = joblib.load("gbm_card_model.joblib")
# --- Helper Functions ---
def calculate_moving_averages(df):
    """Add 3/7/30-day trailing mean prices per (certnumber, grade, grader).

    Args:
        df: Frame with at least the columns ``certnumber``, ``grade``,
            ``grader``, ``ds`` (anything ``pd.to_datetime`` accepts) and ``y``
            (castable to float). The input is NOT modified.

    Returns:
        A new frame, ordered by group and then by ``ds``, with ``ds`` as a
        regular datetime column, ``y`` as float, and three extra columns
        ``ma_3d``, ``ma_7d`` and ``ma_30d`` holding time-window rolling means
        of ``y`` computed inside each group. Rows whose group key contains a
        missing value are dropped (default ``groupby`` behaviour).
    """
    group_keys = ['certnumber', 'grade', 'grader']
    windows = {'ma_3d': '3D', 'ma_7d': '7D', 'ma_30d': '30D'}

    # Work on a copy: the previous in-place sort/set_index changed the caller's
    # frame, so a second call on the same frame failed on the missing 'ds'.
    work = df.copy()
    work['ds'] = pd.to_datetime(work['ds'])
    work['y'] = work['y'].astype(float)
    work = work.sort_values(by=group_keys + ['ds']).set_index('ds')

    # Iterate the groups explicitly instead of using groupby.apply: apply's
    # handling of the grouping columns is deprecated in recent pandas and the
    # grouping columns are needed downstream.
    pieces = []
    for _, group in work.groupby(group_keys):
        group = group.sort_index()  # time-based rolling needs a monotonic index
        for column, window in windows.items():
            group[column] = group['y'].rolling(window).mean()
        pieces.append(group)

    if pieces:
        result = pd.concat(pieces)
    else:
        # No usable group: return an empty frame that still has the ma columns.
        empty_columns = {column: pd.Series(dtype=float) for column in windows}
        result = work.iloc[0:0].assign(**empty_columns)
    return result.reset_index()
def calculate_reliability(df):
    """Score a price series as one minus its mean Ljung-Box p-value.

    Args:
        df: Frame with a ``y`` column; only its row count and ``y`` are used.

    Returns:
        ``0.001`` when the frame has 10 rows or fewer. Otherwise
        ``1 - mean(p-values)`` of the Ljung-Box test at lags 5 and 10, plus
        lag 30 when there are more than 30 rows.
    """
    n_obs = len(df)
    if n_obs <= 10:
        # Too few observations to run the test.
        return 0.001

    lags = [5, 10, 30] if n_obs > 30 else [5, 10]
    ljung_box = acorr_ljungbox(df['y'], lags=lags, return_df=True)
    return 1 - np.mean(ljung_box['lb_pvalue'])
def fetch_spec_data(specid):
    """Load every recorded transaction of one card spec into a flat DataFrame.

    Args:
        specid: Spec identifier; converted with ``float()`` before querying.

    Returns:
        A frame with one row per document returned from ``market_data`` and
        the columns ``certnumber``, ``ds``, ``y``, ``grade``, ``grader``,
        ``card_year``, ``details``, ``set_name`` and ``name``. An empty frame
        (no columns) is returned when ``specid`` is not numeric, when no card
        document matches, or when there are no transactions.
    """
    try:
        float_id = float(specid)
    except (TypeError, ValueError):
        # e.g. the UI number box was cleared (None) or holds non-numeric text;
        # treat it like an unknown card instead of crashing the callback.
        return pd.DataFrame()

    card = cards.find_one(
        {'SPECID': float_id},
        {'_id': 0, 'YEAR': 1, 'DETAILS': 1, 'SET_NAME': 1, 'NAME': 1, 'CERTNUMBER': 1}
    )
    if not card:
        return pd.DataFrame()

    tx_cursor = market_data.find(
        {'spec_id': float_id},
        {'_id': 0, 'market_transaction': 1}
    )

    rows = []
    for entry in tx_cursor:
        # `or {}` also covers keys that are present but explicitly null.
        tx = entry.get('market_transaction') or {}
        attr = tx.get('attributes') or {}
        rows.append({
            'certnumber': card.get('CERTNUMBER'),
            'ds': tx.get('date'),
            'y': tx.get('price'),
            'grade': attr.get('gradeNumber'),
            'grader': attr.get('gradingCompany'),
            'card_year': card.get('YEAR'),
            'details': card.get('DETAILS'),
            'set_name': card.get('SET_NAME'),
            'name': card.get('NAME'),
        })
    return pd.DataFrame(rows)
def transform_data(df):
    """Build model features from transaction rows.

    Side effects (kept because ``predict`` in this file reads ``day_since``
    from the frame it passed in): the INPUT frame gains ``day_since``,
    ``year``, ``month`` and ``day_of_week`` columns and loses ``ds``.

    Args:
        df: Frame with ``ds``, ``grader`` and ``grade`` columns (other columns
            are passed through).

    Returns:
        A new frame with the same index as ``df`` where ``grader`` is one-hot
        encoded (``grader_<value>`` columns), ``grade`` is numeric
        (unparseable values become NaN) and ``grade^2`` / ``grade^3`` hold
        its powers.
    """
    df['ds'] = pd.to_datetime(df['ds'])
    today = pd.Timestamp.today().normalize()
    df['day_since'] = (today - df['ds']).dt.days
    df['year'] = df['ds'].dt.year
    df['month'] = df['ds'].dt.month
    df['day_of_week'] = df['ds'].dt.dayofweek
    df.drop('ds', axis=1, inplace=True)

    features = pd.get_dummies(df, columns=['grader'])
    features['grade'] = pd.to_numeric(features['grade'], errors='coerce')

    # Degree-3 polynomial terms of the single `grade` feature, computed
    # directly on the frame so they stay aligned with its index. The previous
    # version concatenated a freshly indexed frame along axis=1, which
    # misaligned rows whenever the input index was not 0..n-1 and produced
    # extra all-NaN rows. Direct powers also let NaN grades propagate instead
    # of raising.
    grade = features['grade'].astype(float)
    features['grade^2'] = grade ** 2
    features['grade^3'] = grade ** 3
    return features
class PokemonCardPredictor:
    """Price and risk predictions for one card spec, backing the Gradio UI.

    State kept between UI callbacks:

    * ``latest_prices_df`` -- raw rows (including ``ds`` / ``y``) of the last
      grader/grade selection handled by :meth:`predict`; read by
      :meth:`plot_time_series`.
    * ``full_df`` -- prediction table over every grader/grade combination
      found for the spec, rebuilt by :meth:`predict_all`.

    NOTE(review): a single module-level instance serves every callback, so
    this state is presumably shared between concurrent sessions -- confirm
    whether per-session state is required.
    """

    def __init__(self):
        # Columns passed to the confidence model, in this order.
        self.confidence_features = [
            'grade', 'ma_3d', 'ma_7d', 'ma_30d',
            'count_3d', 'count_7d', 'count_30d',
            'reliability', 'day_since'
        ]
        self.latest_prices_df = pd.DataFrame()
        self.full_df = pd.DataFrame()

    @staticmethod
    def _format_prediction(pred):
        """Render ``predict``'s first return value as the UI status text.

        ``predict`` returns a number on success and a message string on
        failure; the string must not go through the numeric format spec.
        """
        if isinstance(pred, str):
            return pred
        return f"Predicted Price: ${pred:,.2f}"

    @staticmethod
    def _select_rows(cert_df, grader, grade):
        """Return the rows of ``cert_df`` for one grader/grade pair.

        Exact equality is tried first. Only when that matches nothing, grades
        are compared numerically, so a typed "10" still finds rows stored as
        "10.0" or 10.0.
        """
        same_grader = cert_df['grader'] == grader
        exact = same_grader & (cert_df['grade'] == grade)
        if exact.any():
            return cert_df[exact]
        try:
            wanted = float(grade)
        except (TypeError, ValueError):
            return cert_df[exact]
        numeric_grade = pd.to_numeric(cert_df['grade'], errors='coerce')
        return cert_df[same_grader & (numeric_grade == wanted)]

    def plot_time_series(self, range_option):
        """Plot the average price over time of the last predicted selection.

        Args:
            range_option: ``"Past Year"`` (monthly averages), ``"Past Month"``
                (daily averages); any other value plots all data by month.

        Returns:
            A matplotlib figure. It is blank when no selection is cached and
            shows a "No data" note when the range contains no prices.
        """
        if self.latest_prices_df.empty:
            fig = plt.figure()
            # Drop pyplot's global reference so figures do not pile up in a
            # long-running server; the returned Figure can still be rendered.
            plt.close(fig)
            return fig

        df = self.latest_prices_df.copy()
        df['ds'] = pd.to_datetime(df['ds'])
        df['y'] = pd.to_numeric(df['y'], errors='coerce')
        df = df.dropna(subset=['y'])

        # Filter by the selected time range and pick the aggregation period.
        now = pd.Timestamp.today()
        if range_option == "Past Year":
            df = df[df['ds'] >= now - pd.DateOffset(years=1)].copy()
            period, group_label = 'M', "Month"
        elif range_option == "Past Month":
            df = df[df['ds'] >= now - pd.DateOffset(months=1)].copy()
            period, group_label = 'D', "Day"
        else:  # "All Data"
            period, group_label = 'M', "Month"

        if df.empty:
            fig, ax = plt.subplots()
            ax.text(0.5, 0.5, 'No data for selected range.', ha='center', va='center')
            ax.axis('off')
            plt.close(fig)
            return fig

        # Aggregate to one average price per period.
        df['time_group'] = df['ds'].dt.to_period(period).dt.to_timestamp()
        grouped_avg = df.groupby('time_group')['y'].mean().reset_index()

        fig, ax = plt.subplots(figsize=(8, 4))
        ax.plot(grouped_avg['time_group'], grouped_avg['y'], marker='o')
        ax.set_title(f"Average Price by {group_label} ({range_option})")
        ax.set_xlabel(group_label)
        ax.set_ylabel("Avg Price ($)")
        ax.grid(True)
        ax.tick_params(axis='x', rotation=45)
        # Lay out this figure explicitly rather than pyplot's "current" one.
        fig.tight_layout()
        plt.close(fig)
        return fig

    def predict_all(self, specid, grader, grade):
        """Predict every known grader/grade pair, then the selected pair.

        Args:
            specid: Spec identifier forwarded to ``fetch_spec_data``.
            grader: Grader chosen in the UI.
            grade: Grade chosen in the UI.

        Returns:
            ``(status_text, table)``. ``status_text`` is the formatted price
            for the selected pair, or the failure message when that pair
            cannot be predicted; ``table`` is ``full_df`` rounded to two
            decimals.
        """
        self.full_df = pd.DataFrame()  # Reset
        raw_df = fetch_spec_data(specid)
        if raw_df.empty:
            self.latest_prices_df = pd.DataFrame()  # Reset
            return "Card info not found.", pd.DataFrame()

        known_grades = raw_df['grade'].unique()
        known_graders = raw_df['grader'].unique()
        # Collect the per-pair tables and concatenate once instead of growing
        # a frame inside the loop.
        tables = []
        for k_grader, k_grade in product(known_graders, known_grades):
            _, pred_df = self.predict(raw_df, k_grader, k_grade)
            if not pred_df.empty:
                tables.append(pred_df)
        if tables:
            self.full_df = pd.concat(tables)

        # Predict the selected pair last so latest_prices_df (used by the
        # plot) reflects the user's selection.
        pred, _ = self.predict(raw_df, grader, grade)
        return self._format_prediction(pred), self.full_df.round(2)

    def predict(self, cert_df, grader, grade):
        """Predict price and risk from the latest trade of one grader/grade.

        Args:
            cert_df: Transaction frame as produced by ``fetch_spec_data``.
            grader: Grading company to select.
            grade: Grade to select.

        Returns:
            ``(prediction, display_df)`` on success, where ``prediction`` is
            the model output for the first latest row and ``display_df`` has
            one row per cert number (its highest-priced latest trade). On
            failure, ``(message, empty DataFrame)``.

        Side effects:
            Replaces ``self.latest_prices_df`` with the selected rows (or an
            empty frame when nothing matches).
        """
        rows = self._select_rows(cert_df, grader, grade)
        if rows.empty:
            self.latest_prices_df = pd.DataFrame()
            return "No transactions for this grader and grade.", pd.DataFrame()

        self.latest_prices_df = rows.copy()  # Save full version with ds/y for plotting

        df = calculate_moving_averages(rows.copy())
        # NOTE(review): this re-fits the loaded encoder on every call instead
        # of only applying it -- confirm that is intended.
        df['certnumber_encoded'] = card_encoder.fit_transform(df['certnumber'], df['y'])
        for window in ('3d', '7d', '30d'):
            df[f'count_{window}'] = df.groupby('certnumber')[f'ma_{window}'].transform('count')

        latest_df = df[df['ds'] == df['ds'].max()].copy()
        if latest_df.empty:
            return "No recent transaction to use.", pd.DataFrame()

        reliability = calculate_reliability(df)

        # transform_data mutates its argument, so hand it a copy and read the
        # derived columns from its return value instead.
        features = transform_data(latest_df.copy()).fillna(0)
        features = features[features['grade'] != 0].copy()
        if features.empty:
            # Checked before any model call: previously this guard came too
            # late and a length mismatch was raised first.
            return 'no data', pd.DataFrame()
        # Keep the raw rows aligned with the feature rows that survived.
        latest_df = latest_df.loc[features.index]

        feature_names = list(gradient_boosting_model.feature_names_in_)
        missing = [name for name in feature_names if name not in features.columns]
        if missing:
            # Features the model expects but this selection lacks are zero.
            features = features.assign(**dict.fromkeys(missing, 0))

        confidence_df = features.copy()
        confidence_df['reliability'] = reliability
        confidence_df = confidence_df[self.confidence_features].fillna(0)
        risk_score = confidence_model.predict(confidence_df)

        prediction = gradient_boosting_model.predict(features[feature_names])

        display_df = pd.DataFrame({
            'certnumber': latest_df['certnumber'],
            'Grader': latest_df['grader'].values,
            'Grade': latest_df['grade'].values,
            'Name': latest_df['name'].values,
            'Set Name': latest_df['set_name'].values,
            'Predicted Price': prediction,
            'Risk': risk_score,
            'Most Recent Price': latest_df['y'].values,
            'Days Since': features['day_since'].astype(int).values,
        })
        # Filter out duplicate data so that only the highest priced recent trade is displayed
        idx = display_df.groupby('certnumber')['Most Recent Price'].idxmax()
        display_df = display_df.loc[idx].reset_index(drop=True)
        display_df = display_df.drop('certnumber', axis=1)
        return prediction[0], display_df
# --- Gradio UI ---
# One shared predictor instance serves every callback below.
predictor = PokemonCardPredictor()

with gr.Blocks() as demo:
    gr.Markdown("## 🎴 Pokémon Card Price Predictor")
    # NOTE(review): the source's indentation was lost; placing the three
    # inputs inside the Row is a reconstruction -- confirm the layout.
    with gr.Row():
        specid_input = gr.Number(label="Spec ID", value=482897)
        grader_input = gr.Dropdown(["PSA", "BGS", "CGC"], value="PSA", label="Grader")
        grade_input = gr.Textbox(label="Grade (e.g., 10.0)", value="10.0")
    range_selector = gr.Radio(
        choices=["Past Month", "Past Year", "All Data"],
        value="Past Year",
        label="Select Time Range for Plot"
    )
    predict_btn = gr.Button("Predict Price")
    output_text = gr.Textbox(label="Prediction")
    output_table = gr.Dataframe(label="Prediction Details")
    output_plot = gr.Plot(label="Price Over Time")

    # Predict first, then draw the plot from the rows cached by the predictor.
    predict_btn.click(
        fn=predictor.predict_all,
        inputs=[specid_input, grader_input, grade_input],
        outputs=[output_text, output_table]
    ).then(
        fn=predictor.plot_time_series,
        inputs=[range_selector],
        outputs=output_plot
    )
    # Redraw the plot as soon as the time range changes; previously the
    # selector had no effect until "Predict Price" was clicked again.
    range_selector.change(
        fn=predictor.plot_time_series,
        inputs=[range_selector],
        outputs=output_plot
    )

demo.launch()