|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from fastapi import FastAPI, HTTPException |
|
|
from fastapi.responses import FileResponse |
|
|
from pydantic import BaseModel |
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import torch |
|
|
import torch.nn as nn |
|
|
from sklearn.preprocessing import MinMaxScaler |
|
|
from statsmodels.tsa.arima.model import ARIMA |
|
|
import matplotlib.pyplot as plt |
|
|
import uvicorn |
|
|
import warnings |
|
|
warnings.filterwarnings("ignore") |
|
|
|
|
|
|
|
|
class LSTMResidual(nn.Module):
    """LSTM regressor that maps a window of values to a single next-step value.

    The defaults reproduce the original fixed architecture (1 input feature,
    64 hidden units, 2 stacked layers, dropout 0.2), so ``LSTMResidual()``
    builds exactly the same network as before and stays compatible with
    state dicts saved for it.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the hidden state of each LSTM layer.
        num_layers: Number of stacked LSTM layers.
        dropout: Drop probability applied to the last time step's hidden
            state before the linear head (only active in training mode).
    """

    def __init__(
        self,
        input_size: int = 1,
        hidden_size: int = 64,
        num_layers: int = 2,
        dropout: float = 0.2,
    ):
        super().__init__()
        # The attribute names below determine the state-dict keys
        # ("lstm.*", "fc.*"), so they must stay as they are for saved
        # weights to load.
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, batch_first=True)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return a ``(batch, 1)`` prediction for a ``(batch, seq_len, input_size)`` input."""
        out, _ = self.lstm(x)
        # Only the hidden state of the final time step feeds the head.
        last_step = out[:, -1, :]
        return self.fc(self.dropout(last_step))
|
|
|
|
|
|
|
|
# ---- One-time bootstrap, executed at import ----

# Inventory history. Column names and product IDs are stripped of surrounding
# whitespace so that request IDs can be compared with plain equality.
df = pd.read_csv("Name_Updated_Retail_Inventory.csv")
df.columns = df.columns.str.strip()
df['Product ID'] = df['Product ID'].astype(str).str.strip()
df['Date'] = pd.to_datetime(df['Date'], format="%d-%m-%Y")

# SECURITY NOTE: weights_only=False unpickles arbitrary Python objects. It is
# required here because the checkpoint also stores the 'scaler' object, so
# only ever load a checkpoint file from a trusted source.
# map_location="cpu": the endpoints build their input tensors on the CPU, and
# this also lets a checkpoint saved on a GPU machine load on a CPU-only host.
checkpoint = torch.load(
    "hybrid_inventory_model.pt",
    map_location="cpu",
    weights_only=False,
)
model = LSTMResidual()
model.load_state_dict(checkpoint['lstm_state_dict'])
model.eval()  # inference mode: dropout disabled
# Presumably the fitted MinMaxScaler used on the residuals at training time;
# confirm against the training script.
scaler = checkpoint['scaler']

app = FastAPI(title="π¦ Inventory Forecast API")
|
|
|
|
|
|
|
|
class ProductRequest(BaseModel):
    """Request body accepted by the single-product forecast endpoint."""

    # Identifier of the product to forecast.
    product_id: str
|
|
|
|
|
|
|
|
@app.post("/forecast-product/")
def forecast_product(request: ProductRequest):
    """Forecast the next 30 days of inventory for one product.

    An ARIMA(2, 1, 1) model is fitted to the product's daily mean inventory.
    The LSTM then rolls the scaled ARIMA residuals forward 30 steps, and the
    unscaled residual predictions are added to the ARIMA forecast. A chart of
    the last 60 historical days plus the forecast is written to
    ``forecast.png`` (a single shared file, overwritten on every call).

    Args:
        request: Body carrying the ``product_id`` to forecast.

    Returns:
        A dict with the product ID and name, the min/max of the 30-day
        forecast, the estimated sales (the summed day-over-day decrease of
        the forecast, i.e. first forecast value minus last), and the URL of
        the plot endpoint.

    Raises:
        HTTPException: 404 if the product ID is unknown, 400 if the product
            has fewer than 50 distinct dates, 500 if forecasting fails.
    """
    product_id = request.product_id.strip()
    if product_id not in df['Product ID'].unique():
        raise HTTPException(status_code=404, detail=f"β Invalid Product ID '{product_id}'.")

    product_data = df[df['Product ID'] == product_id].sort_values('Date')

    if product_data['Date'].nunique() < 50:
        raise HTTPException(status_code=400, detail=f"β Not enough unique days of data for Product ID '{product_id}'.")

    horizon = 30  # number of days to forecast
    seq_len = 7   # length of the residual window fed to the LSTM

    try:
        # Daily mean inventory with calendar gaps forward-filled.
        # Series.ffill() replaces the deprecated fillna(method='ffill').
        ts = product_data.groupby('Date')['Inventory Level'].mean().asfreq('D').ffill()
        if ts.empty or len(ts) < 10:
            raise ValueError("β Inventory time series too short for ARIMA.")

        arima_result = ARIMA(ts, order=(2, 1, 1)).fit()
        arima_forecast = arima_result.forecast(steps=horizon)

        residuals = arima_result.resid.dropna()
        if len(residuals) < 10:
            raise ValueError("β Not enough residuals for LSTM.")

        residuals_scaled = scaler.transform(residuals.values.reshape(-1, 1))

        # Autoregressive rollout: each prediction is appended to the window
        # and the oldest value dropped, so the window length stays seq_len.
        next_steps = []
        window = residuals_scaled[-seq_len:].copy()
        with torch.no_grad():
            for _ in range(horizon):
                input_tensor = torch.tensor(window.reshape(1, seq_len, 1), dtype=torch.float32)
                pred = model(input_tensor).item()
                next_steps.append(pred)
                window = np.append(window[1:], [[pred]], axis=0)

        predicted_residuals_unscaled = scaler.inverse_transform(
            np.array(next_steps).reshape(-1, 1)
        ).flatten()
        final_forecast_30 = arima_forecast.values + predicted_residuals_unscaled
        estimated_sales = (final_forecast_30[:-1] - final_forecast_30[1:]).sum()

        name = product_data['Product Name'].iloc[0]

        # Use an explicit figure/axes pair instead of pyplot's implicit
        # "current figure", and close it in `finally` so a failure while
        # plotting or saving does not leak the figure.
        fig, ax = plt.subplots(figsize=(12, 5))
        try:
            ax.plot(ts.iloc[-60:], label="Historical Inventory", color='skyblue')
            future_dates = pd.date_range(start=ts.index[-1] + pd.Timedelta(days=1), periods=horizon)
            ax.plot(future_dates, final_forecast_30, label="Forecast", color='orange')
            ax.set_title(f"Inventory Forecast for {name}")
            ax.set_xlabel("Date")
            ax.set_ylabel("Inventory Level")
            ax.legend()
            fig.tight_layout()
            fig.savefig("forecast.png")
        finally:
            plt.close(fig)

        return {
            "Product ID": product_id,
            "Product Name": name,
            "Inventory Range (Next 30 Days)": {
                # float(...) yields plain Python floats for JSON encoding.
                "Min": round(float(final_forecast_30.min()), 2),
                "Max": round(float(final_forecast_30.max()), 2),
            },
            "Estimated Sales (Next 30 Days)": round(float(estimated_sales), 2),
            "Download Forecast Plot": "http://localhost:8000/forecast-plot/",
        }

    except Exception as e:
        # Endpoint boundary: report any modelling/plotting failure as a 500
        # while keeping the original exception as the cause for server logs.
        raise HTTPException(status_code=500, detail=f"β Forecast failed: {str(e)}") from e
|
|
|
|
|
|
|
|
@app.get("/predict-sales/")
def predict_sales():
    """Rank products by their predicted sales over the next 30 days.

    For every product whose daily series (after forward-filling calendar
    gaps) has at least 50 points, an ARIMA(2, 1, 1) forecast is combined with
    the LSTM's 30-step residual rollout. Predicted total sales is the summed
    day-over-day decrease of that combined forecast. Products whose forecast
    raises are skipped and the error is printed.

    Returns:
        A dict with ``top_5``, ``bottom_5`` and ``all_predictions``: lists of
        records sorted by predicted total sales, highest first. All three are
        empty lists when no product could be forecast.
    """
    horizon = 30  # number of days to forecast
    seq_len = 7   # length of the residual window fed to the LSTM
    predicted_sales_summary = []

    # A single groupby pass replaces re-scanning the whole frame with a
    # boolean mask per product; sort=False keeps first-appearance order.
    for product_id, rows in df.groupby('Product ID', sort=False):
        product_data = rows.sort_values('Date')
        # Series.ffill() replaces the deprecated fillna(method='ffill').
        ts = product_data.groupby('Date')['Inventory Level'].mean().asfreq('D').ffill()
        if len(ts) < 50:
            continue

        try:
            arima_result = ARIMA(ts, order=(2, 1, 1)).fit()
            arima_forecast = arima_result.forecast(steps=horizon)
            residuals = arima_result.resid.dropna()
            residuals_scaled = scaler.transform(residuals.values.reshape(-1, 1))

            # Autoregressive rollout: each prediction is appended to the
            # window and the oldest value dropped.
            next_steps = []
            window = residuals_scaled[-seq_len:].copy()
            with torch.no_grad():
                for _ in range(horizon):
                    input_tensor = torch.tensor(window.reshape(1, seq_len, 1), dtype=torch.float32)
                    pred = model(input_tensor).item()
                    next_steps.append(pred)
                    window = np.append(window[1:], [[pred]], axis=0)

            predicted_residuals_unscaled = scaler.inverse_transform(
                np.array(next_steps).reshape(-1, 1)
            ).flatten()
            final_forecast_30 = arima_forecast.values.flatten() + predicted_residuals_unscaled
            total_sales = (final_forecast_30[:-1] - final_forecast_30[1:]).sum()

            name = product_data['Product Name'].iloc[0]

            predicted_sales_summary.append({
                'Product ID': product_id,
                'Product Name': name,
                'Predicted Total Sales (Units)': round(float(total_sales), 2),
            })

        except Exception as e:
            # Deliberate best-effort: one failing product must not abort the
            # whole ranking.
            print(f"β Skipping {product_id} due to error: {e}")
            continue

    # Without this guard an empty summary builds a column-less DataFrame and
    # sort_values(by=...) raises KeyError.
    if not predicted_sales_summary:
        return {"top_5": [], "bottom_5": [], "all_predictions": []}

    result_df = pd.DataFrame(predicted_sales_summary).sort_values(
        by='Predicted Total Sales (Units)', ascending=False
    )

    return {
        "top_5": result_df.head(5).to_dict(orient="records"),
        "bottom_5": result_df.tail(5).to_dict(orient="records"),
        "all_predictions": result_df.to_dict(orient="records"),
    }
|
|
|
|
|
|
|
|
@app.get("/forecast-plot/")
def get_forecast_plot():
    """Return ``forecast.png`` from the working directory as a PNG download.

    Returns:
        A ``FileResponse`` for ``forecast.png`` with media type ``image/png``.

    Raises:
        HTTPException: 404 if ``forecast.png`` does not exist.
    """
    import os  # local import: only this handler needs it

    plot_path = "forecast.png"
    # Check up front so a missing file is reported as a clear 404 instead of
    # failing later while the response is being sent.
    if not os.path.isfile(plot_path):
        raise HTTPException(
            status_code=404,
            detail="Forecast plot not found. Generate one via /forecast-product/ first.",
        )
    return FileResponse(plot_path, media_type="image/png", filename="forecast.png")
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the "app" object of module "main" on all
    # interfaces, port 8000, with auto-reload enabled.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
    )
|
|
|