| from flask import Blueprint, render_template, request, jsonify, redirect, url_for, flash |
| import pandas as pd |
| import numpy as np |
| import plotly.express as px |
| import plotly.utils |
| import json |
|
|
# Blueprint that groups the sales-forecast routes under the /forecast/sales
# URL prefix.
sales_prediction_bp = Blueprint('sales_prediction', __name__, url_prefix='/forecast/sales')


# Module-level holder for the dataset the routes below operate on. It starts
# as None and is rebound by the upload route once a CSV has been read.
# NOTE(review): this is a single process-wide variable, so every request sees
# the same data -- confirm that single-user use is intended.
_current_df = None
|
|
def get_summary_stats(df):
    """Summarise the shape, column kinds and missing-value counts of *df*.

    Args:
        df: The pandas DataFrame to describe.

    Returns:
        A dict with the keys ``total_rows``, ``total_columns``, ``columns``,
        ``numeric_columns``, ``categorical_columns`` and ``missing_values``
        (a mapping of column label to its count of null cells).
    """
    # Columns are split purely by dtype: anything that is not a NumPy numeric
    # dtype is reported under "categorical_columns".
    numeric_part = df.select_dtypes(include=[np.number])
    other_part = df.select_dtypes(exclude=[np.number])
    null_counts = df.isnull().sum()

    stats = {}
    stats['total_rows'] = len(df)
    stats['total_columns'] = len(df.columns)
    stats['columns'] = list(df.columns)
    stats['numeric_columns'] = list(numeric_part.columns)
    stats['categorical_columns'] = list(other_part.columns)
    stats['missing_values'] = null_counts.to_dict()
    return stats
|
|
@sales_prediction_bp.route('/', methods=['GET'])
def show_sales_prediction():
    """Render the sales prediction page with its title and no dataset context."""
    page_title = "Sales Prediction Engine"
    return render_template('sales_prediction.html', title=page_title)
|
|
@sales_prediction_bp.route('/upload', methods=['POST'])
def upload_file():
    """Read the uploaded CSV into the module-level dataset and show a preview.

    Expects a multipart form field named ``sales_file``. When the field is
    missing or has an empty filename, or when reading/summarising the file
    raises, a message is flashed and the client is redirected back to the
    sales prediction page. On success the page is rendered with the first
    rows, the column names and the summary statistics of the new dataset.
    """
    global _current_df

    def _flash_and_return(message):
        # Shared exit for every failure path: flash, then go back to the form.
        flash(message)
        return redirect(url_for('sales_prediction.show_sales_prediction'))

    uploaded = request.files.get('sales_file')
    if uploaded is None or uploaded.filename == '':
        return _flash_and_return('No file selected')

    try:
        _current_df = pd.read_csv(uploaded)
        frame = _current_df
        template_context = {
            'title': "Sales Prediction Engine",
            'preview_data': frame.head().to_dict('records'),
            'summary_stats': get_summary_stats(frame),
            'columns': frame.columns.tolist(),
        }
        return render_template('sales_prediction.html', **template_context)
    except Exception as e:
        return _flash_and_return(f'Error processing file: {str(e)}')
|
|
@sales_prediction_bp.route('/plot', methods=['POST'])
def create_plot():
    """Build a Plotly figure for one column of the uploaded dataset.

    Form fields:
        column: One of the allow-listed numeric columns (``sales_units``,
            ``sales_revenue``, ``price``) or categorical columns (``sku``,
            ``category``, ``region``, ``channel``, ``brand``).
        plot_type: ``histogram`` or ``box`` for numeric columns, ``bar`` for
            categorical columns.

    Returns:
        A JSON response. On success: ``success=True`` and ``plot``, the
        figure serialised to a JSON string with ``PlotlyJSONEncoder``. On any
        failure: ``success=False`` and an ``error`` message.

    The shared dataset is only read here: numeric coercion happens on a local
    copy, so plotting never turns unparseable cells of the uploaded data into
    NaN for later requests.
    """
    NUMERIC = {'sales_units', 'sales_revenue', 'price'}
    CATEGORICAL = {'sku', 'category', 'region', 'channel', 'brand'}
    BAR_MEASURE = 'sales_units'  # quantity summed per category in bar plots

    try:
        source = _current_df
        if source is None:
            return jsonify(success=False,
                           error="Please upload the CSV first.")

        column = request.form.get('column')
        plot_type = request.form.get('plot_type')

        if column not in NUMERIC | CATEGORICAL:
            return jsonify(success=False,
                           error=f"Column '{column}' not supported.")

        # An allow-listed name can still be absent from this particular upload.
        if column not in source.columns:
            return jsonify(success=False,
                           error=f"Column '{column}' not found in the uploaded data.")

        if column in NUMERIC and plot_type in ('histogram', 'box'):
            # Coerce on a detached Series; values that are not numbers become
            # NaN and are dropped from the plot only.
            values = pd.to_numeric(source[column], errors='coerce').dropna()
            plot_df = values.to_frame(name=column)
            if plot_type == 'histogram':
                fig = px.histogram(plot_df, x=column, nbins=40,
                                   title=f'Distribution of {column}')
            else:
                fig = px.box(plot_df, y=column,
                             title=f'Box-plot of {column}')

        elif column in CATEGORICAL and plot_type == 'bar':
            if BAR_MEASURE not in source.columns:
                return jsonify(success=False,
                               error=f"Bar plots need a '{BAR_MEASURE}' column in the uploaded data.")

            plot_df = source[[column, BAR_MEASURE]].dropna(subset=[column]).copy()
            # Make sure the measure is summed arithmetically even when the CSV
            # column was read as text.
            plot_df[BAR_MEASURE] = pd.to_numeric(plot_df[BAR_MEASURE], errors='coerce')

            # Keep the 25 categories with the highest totals.
            agg_df = (plot_df.groupby(column, as_index=False)[BAR_MEASURE]
                      .sum()
                      .sort_values(BAR_MEASURE, ascending=False)
                      .head(25))

            fig = px.bar(agg_df, x=column, y=BAR_MEASURE,
                         title=f'{column} – total units sold',
                         labels={BAR_MEASURE: 'Units sold'})
            fig.update_layout(xaxis_categoryorder='total descending')

        else:
            return jsonify(success=False,
                           error=f"'{plot_type}' not allowed for '{column}'")

        plot_json = json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)
        return jsonify(success=True, plot=plot_json)

    except Exception as e:
        # Route boundary: report any unexpected failure in the JSON envelope.
        return jsonify(success=False, error=str(e))
| |
def _json_safe_records(df):
    """Return *df* as a list of row dicts with missing cells as ``None``.

    NaN would otherwise be serialised as the bare token ``NaN``, which strict
    JSON parsers (including browsers) reject; ``None`` becomes ``null``.
    """
    return df.astype(object).where(df.notna(), None).to_dict('records')


@sales_prediction_bp.route('/fix_nulls', methods=['POST'])
def fix_nulls():
    """Remove or fill the missing values of one column of the dataset.

    Form fields:
        column: Name of the column to repair.
        method: ``drop`` (remove rows where the column is null), or ``mean``,
            ``median`` or ``mode`` (fill nulls with that statistic).

    Returns:
        A JSON response. On success: ``success=True``, a ``message`` with the
        number of nulls fixed, refreshed ``summary_stats``, ``preview_data``
        (first rows, missing cells as null) and ``columns``. On failure:
        ``success=False`` and an ``error`` message; the dataset is left
        untouched in that case.
    """
    global _current_df
    try:
        if _current_df is None:
            return jsonify(success=False, error="No data available. Please upload a CSV file first.")

        column = request.form.get('column')
        method = request.form.get('method')

        if column not in _current_df.columns:
            return jsonify(success=False, error=f"Column '{column}' not found")

        series = _current_df[column]
        nulls_before = int(series.isnull().sum())
        if nulls_before == 0:
            return jsonify(success=False, error=f"No nulls to fix in '{column}'")

        if method == 'drop':
            updated = _current_df.dropna(subset=[column])

        elif method in ('mean', 'median', 'mode'):
            # Mean and median are only defined for numeric data; the mode
            # works for any dtype.
            if method != 'mode' and not pd.api.types.is_numeric_dtype(series):
                return jsonify(
                    success=False,
                    error=f"'{method}' needs a numeric column; '{column}' is not numeric.")

            if method == 'mean':
                fill_value = series.mean()
            elif method == 'median':
                fill_value = series.median()
            else:
                modes = series.mode()
                fill_value = modes.iloc[0] if not modes.empty else None

            # A column with no non-null values has no usable statistic.
            if fill_value is None or pd.isna(fill_value):
                return jsonify(
                    success=False,
                    error=f"Cannot compute the {method} of '{column}': it has no non-null values.")

            # Build the result on a copy and publish it in one step, so a
            # failure never leaves the shared dataset half-modified.
            updated = _current_df.copy()
            updated[column] = series.fillna(fill_value)

        else:
            return jsonify(success=False, error="Invalid method")

        _current_df = updated

        nulls_after = int(updated[column].isnull().sum())
        return jsonify(
            success=True,
            message=f"Fixed {nulls_before - nulls_after} nulls in '{column}' using {method}.",
            summary_stats=get_summary_stats(updated),
            preview_data=_json_safe_records(updated.head()),
            columns=list(updated.columns)
        )
    except Exception as e:
        # Route boundary: report any unexpected failure in the JSON envelope.
        return jsonify(success=False, error=str(e))
| |
def _finite_or_none(value):
    """Return *value* as a float, or ``None`` when it is NaN or infinite."""
    number = float(value)
    return number if np.isfinite(number) else None


def _forecast_metrics(y_true, y_pred):
    """Compute MAPE (percent), RMSE and R2 of *y_pred* against *y_true*.

    MAPE is averaged over the points whose actual value is non-zero, and R2
    needs non-constant actuals; a metric that is undefined is returned as
    ``None`` instead of inf/NaN.
    """
    actual = np.asarray(y_true, dtype=float)
    predicted = np.asarray(y_pred, dtype=float)
    residuals = actual - predicted

    nonzero = actual != 0
    if nonzero.any():
        mape = np.mean(np.abs(residuals[nonzero] / actual[nonzero])) * 100
    else:
        mape = np.nan

    rmse = np.sqrt(np.mean(residuals ** 2))

    total_ss = np.sum((actual - np.mean(actual)) ** 2)
    r2 = 1 - np.sum(residuals ** 2) / total_ss if total_ss > 0 else np.nan

    return {
        "MAPE": _finite_or_none(mape),
        "RMSE": _finite_or_none(rmse),
        "R2": _finite_or_none(r2),
    }


@sales_prediction_bp.route('/run_forecast', methods=['POST'])
def run_forecast():
    """Back-test a forecasting model on the last ``horizon`` dates of the data.

    Form fields:
        model: ``ARIMA``, ``Prophet``, ``Random Forest``, ``XGBoost`` or
            ``LSTM``. The last three currently share one baseline that
            predicts the training mean.
        date_col: Column holding the dates.
        target_col: Column holding the quantity to forecast.
        horizon: Number of trailing dates to hold out and predict
            (default 30, must be at least 1).

    The target is summed per date, the last ``horizon`` dates are held out,
    the model is fitted on the rest and scored against the held-out values.

    Returns:
        A JSON response. On success: ``success=True`` with ``forecast``,
        ``conf_int`` ([lower, upper] pairs), ``trend``, ``seasonality``,
        ``metrics`` (MAPE, RMSE, R2), ``dates`` (held-out dates as
        YYYY-MM-DD) and ``model``. Non-finite numbers are sent as null. On
        failure: ``success=False`` and an ``error`` message.
    """
    baseline_models = ("Random Forest", "XGBoost", "LSTM")
    supported_models = ("ARIMA", "Prophet") + baseline_models

    try:
        source = _current_df
        if source is None:
            return jsonify(success=False, error="No data available. Please upload a CSV file first.")

        model = request.form.get('model')
        date_col = request.form.get('date_col')
        target_col = request.form.get('target_col')

        try:
            horizon = int(request.form.get('horizon', 30))
        except (TypeError, ValueError):
            return jsonify(success=False, error="Forecast horizon must be a whole number.")
        # A horizon of 0 would make iloc[:-horizon] an empty training set.
        if horizon < 1:
            return jsonify(success=False, error="Forecast horizon must be at least 1.")

        if model not in supported_models:
            return jsonify(success=False, error=f"Unsupported model '{model}'.")

        if date_col not in source.columns or target_col not in source.columns:
            return jsonify(success=False, error="Invalid column selection.")
        if date_col == target_col:
            return jsonify(success=False, error="Date and target columns must be different.")

        df = source[[date_col, target_col]].copy()
        # Non-numeric target cells become NaN and are dropped with the other
        # incomplete rows below, so the per-date sum is always arithmetic.
        df[target_col] = pd.to_numeric(df[target_col], errors='coerce')
        try:
            df[date_col] = pd.to_datetime(df[date_col], dayfirst=True, errors='coerce')
        except Exception:
            # Best-effort fallback kept from the original: retry without the
            # day-first hint if the first parse raises.
            df[date_col] = pd.to_datetime(df[date_col], errors='coerce')
        df = df.dropna()

        # groupby sorts by its key, so the result is in chronological order.
        df = df.groupby(date_col)[target_col].sum().reset_index()

        if len(df) < horizon + 5:
            return jsonify(success=False, error="Not enough data for the selected forecast horizon.")

        train = df.iloc[:-horizon]
        test = df.iloc[-horizon:]

        if model == "ARIMA":
            try:
                from statsmodels.tsa.arima.model import ARIMA
                ts = train.set_index(date_col)[target_col]
                fitted = ARIMA(ts, order=(1, 1, 1)).fit()
                pred = fitted.get_forecast(steps=horizon)
                forecast = pred.predicted_mean.values.tolist()
                conf_int = pred.conf_int().values.tolist()
                # No decomposition is computed for ARIMA: the trend is the
                # flat training mean and the seasonality is zero.
                trend = [ts.mean()] * horizon
                seasonality = [0] * horizon
            except Exception as e:
                return jsonify(success=False, error=f"ARIMA error: {e}")

        elif model == "Prophet":
            try:
                from prophet import Prophet
                prophet_df = train.rename(columns={date_col: "ds", target_col: "y"})
                m = Prophet()
                m.fit(prophet_df)
                future = m.make_future_dataframe(periods=horizon)
                # Only the last `horizon` rows are the out-of-sample periods.
                tail = m.predict(future).tail(horizon)
                forecast = tail['yhat'].values.tolist()
                conf_int = [list(pair) for pair in
                            zip(tail['yhat_lower'].values, tail['yhat_upper'].values)]
                trend = tail['trend'].values.tolist()
                seasonality = (tail['seasonal'].values.tolist()
                               if 'seasonal' in tail else [0] * horizon)
            except Exception as e:
                return jsonify(success=False, error=f"Prophet error: {e}")

        else:
            # Baseline shared by the remaining models: predict the training
            # mean with a fixed +/-5 band.
            train_mean = train[target_col].mean()
            forecast = [train_mean] * horizon
            conf_int = [[train_mean - 5, train_mean + 5] for _ in range(horizon)]
            trend = [train_mean] * horizon
            seasonality = [0] * horizon

        metrics = _forecast_metrics(test[target_col].values, forecast)
        forecast_dates = test[date_col].dt.strftime('%Y-%m-%d').tolist()

        return jsonify(
            success=True,
            forecast=[_finite_or_none(v) for v in forecast],
            conf_int=[[_finite_or_none(lo), _finite_or_none(hi)] for lo, hi in conf_int],
            trend=[_finite_or_none(v) for v in trend],
            seasonality=[_finite_or_none(v) for v in seasonality],
            metrics=metrics,
            dates=forecast_dates,
            model=model
        )
    except Exception as e:
        # Route boundary: report any unexpected failure in the JSON envelope.
        return jsonify(success=False, error=str(e))