# Run the server:  python -m uvicorn main:app --reload
# GET  http://127.0.0.1:8000/predict-sales/
# POST http://localhost:8000/forecast-product/
#      body: { "product_id": "P0002" }
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from pydantic import BaseModel
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from sklearn.preprocessing import MinMaxScaler
from statsmodels.tsa.arima.model import ARIMA
import matplotlib.pyplot as plt
import uvicorn
import warnings
# Silence every warning category for the whole process (no category or module
# filter is given), which also hides deprecation notices from the libraries
# imported above.
warnings.filterwarnings("ignore")
# === LSTM Model ===
class LSTMResidual(nn.Module):
    """Two-layer LSTM that maps a window of scalar values to one scalar.

    The attribute names (``lstm``, ``dropout``, ``fc``) determine the
    state-dict keys, so they must stay in sync with any saved checkpoint.
    """

    def __init__(self):
        super().__init__()
        # One input feature per time step, 64 hidden units, two stacked layers;
        # inputs are laid out as (batch, time, feature).
        self.lstm = nn.LSTM(
            input_size=1,
            hidden_size=64,
            num_layers=2,
            batch_first=True,
        )
        self.dropout = nn.Dropout(p=0.2)
        self.fc = nn.Linear(in_features=64, out_features=1)

    def forward(self, x):
        """Return one prediction per batch row from the final time step."""
        sequence_out, _state = self.lstm(x)
        # Only the output at the last time step feeds the linear head.
        last_step = sequence_out[:, -1, :]
        return self.fc(self.dropout(last_step))
# === Load Assets ===
# Inventory history: normalise the header names and product IDs (strip stray
# whitespace) and parse the day-month-year dates once at import time.
df = pd.read_csv("Name_Updated_Retail_Inventory.csv")
df.columns = df.columns.str.strip()
df['Product ID'] = df['Product ID'].astype(str).str.strip()
df['Date'] = pd.to_datetime(df['Date'], format="%d-%m-%Y")

# SECURITY: weights_only=False unpickles arbitrary Python objects, so only load
# a checkpoint file you trust. It is needed here because the checkpoint also
# stores the 'scaler' object, not just tensors.
# map_location="cpu" lets a checkpoint that was saved on a GPU load on a
# CPU-only host; inference below only ever builds CPU tensors.
checkpoint = torch.load(
    "hybrid_inventory_model.pt",
    map_location="cpu",
    weights_only=False,
)
model = LSTMResidual()
model.load_state_dict(checkpoint['lstm_state_dict'])
model.eval()  # inference mode: disables dropout
scaler = checkpoint['scaler']
# === FastAPI App ===
# ASGI application object; the endpoints below register themselves on it.
# NOTE(review): the leading characters of the title look like a mis-decoded
# emoji - confirm the file is saved and read as UTF-8.
app = FastAPI(title="π¦ Inventory Forecast API")
# === Request Schema ===
# Request body for POST /forecast-product/.
class ProductRequest(BaseModel):
    # Product identifier; the endpoint strips surrounding whitespace and
    # matches it against the 'Product ID' column of the inventory data.
    product_id: str
# === 1. Single Product Forecast Endpoint ===
@app.post("/forecast-product/")
def forecast_product(request: ProductRequest):
    """Forecast the next 30 days of inventory for a single product.

    An ARIMA(2, 1, 1) model is fitted to the product's daily mean
    'Inventory Level'. The module-level LSTM then rolls the scaled ARIMA
    residuals forward 30 steps, and the un-scaled residual forecast is added
    to the ARIMA forecast. A plot of the last 60 days of history plus the
    forecast is written to "forecast.png".

    Args:
        request: Body carrying the ``product_id`` to forecast; surrounding
            whitespace is stripped before lookup.

    Returns:
        A dict with the product ID and name, the min/max of the 30-day
        forecast, the estimated sales (summed day-over-day drop of the
        forecast inventory) and the URL of the plot endpoint.

    Raises:
        HTTPException: 404 if the product ID has no rows, 400 if it has fewer
            than 50 distinct dates, 500 if any forecasting step fails.
    """
    # NOTE(review): the leading "β" in the error messages below looks like a
    # mis-decoded emoji; the strings are left untouched here.
    product_id = request.product_id.strip()

    # Filter once; an empty frame means the ID is unknown.
    product_data = df[df['Product ID'] == product_id].sort_values('Date')
    if product_data.empty:
        raise HTTPException(status_code=404, detail=f"β Invalid Product ID '{product_id}'.")

    if product_data['Date'].nunique() < 50:
        raise HTTPException(status_code=400, detail=f"β Not enough unique days of data for Product ID '{product_id}'.")

    try:
        # Daily series: average duplicate dates, then forward-fill missing days.
        # Series.ffill() replaces the deprecated fillna(method='ffill').
        ts = (
            product_data.groupby('Date')['Inventory Level']
            .mean()
            .asfreq('D')
            .ffill()
        )

        if ts.empty or len(ts) < 10:
            raise ValueError("β Inventory time series too short for ARIMA.")

        horizon = 30
        arima_result = ARIMA(ts, order=(2, 1, 1)).fit()
        arima_forecast = arima_result.forecast(steps=horizon)

        residuals = arima_result.resid.dropna()
        if len(residuals) < 10:
            raise ValueError("β Not enough residuals for LSTM.")

        residuals_scaled = scaler.transform(residuals.values.reshape(-1, 1))

        # LSTM forecast: feed each prediction back in as the newest element of
        # a sliding window of the last `seq_len` scaled residuals.
        seq_len = 7
        next_30 = []
        last_input = residuals_scaled[-seq_len:].copy()
        for _ in range(horizon):
            input_tensor = torch.tensor(last_input.reshape(1, seq_len, 1), dtype=torch.float32)
            with torch.no_grad():
                pred = model(input_tensor).item()
            next_30.append(pred)
            last_input = np.append(last_input[1:], [[pred]], axis=0)

        predicted_residuals_unscaled = scaler.inverse_transform(
            np.array(next_30).reshape(-1, 1)
        ).flatten()
        final_forecast_30 = arima_forecast.values + predicted_residuals_unscaled

        # A positive value means the forecast inventory falls over the window.
        estimated_sales = (final_forecast_30[:-1] - final_forecast_30[1:]).sum()
        name = product_data['Product Name'].iloc[0]

        # Save the plot. The figure is closed in `finally` so a failure while
        # drawing or saving does not leak it.
        # NOTE: every request overwrites the same "forecast.png", so concurrent
        # requests can race on that file.
        fig, ax = plt.subplots(figsize=(12, 5))
        try:
            ax.plot(ts.iloc[-60:], label="Historical Inventory", color='skyblue')
            future_dates = pd.date_range(start=ts.index[-1] + pd.Timedelta(days=1), periods=horizon)
            ax.plot(future_dates, final_forecast_30, label="Forecast", color='orange')
            ax.set_title(f"Inventory Forecast for {name}")
            ax.set_xlabel("Date")
            ax.set_ylabel("Inventory Level")
            ax.legend()
            fig.tight_layout()
            fig.savefig("forecast.png")
        finally:
            plt.close(fig)

        # Convert numpy scalars to plain floats for the JSON response.
        return {
            "Product ID": product_id,
            "Product Name": name,
            "Inventory Range (Next 30 Days)": {
                "Min": float(round(final_forecast_30.min(), 2)),
                "Max": float(round(final_forecast_30.max(), 2))
            },
            "Estimated Sales (Next 30 Days)": float(round(estimated_sales, 2)),
            "Download Forecast Plot": "http://localhost:8000/forecast-plot/"
        }

    except Exception as e:
        # Endpoint boundary: report any failure as a 500 and keep the cause.
        raise HTTPException(status_code=500, detail=f"β Forecast failed: {str(e)}") from e
# === 2. All Products Summary Endpoint ===
@app.get("/predict-sales/")
def predict_sales():
    """Rank every product by its predicted 30-day sales.

    For each product with at least 50 days in its daily inventory series, an
    ARIMA(2, 1, 1) forecast is combined with an LSTM forecast of the ARIMA
    residuals. The summed day-over-day drop of that 30-day forecast is
    reported as predicted sales. Products whose forecast raises are skipped
    with a printed message.

    Returns:
        A dict with ``top_5``, ``bottom_5`` and ``all_predictions``, each a
        list of records sorted by predicted sales, descending. All three are
        empty lists when no product could be forecast.
    """
    predicted_sales_summary = []
    horizon = 30
    seq_len = 7

    # One grouping pass instead of re-filtering the whole frame per product;
    # sort=False keeps products in order of first appearance.
    for product_id, product_rows in df.groupby('Product ID', sort=False):
        product_data = product_rows.sort_values('Date')
        # Series.ffill() replaces the deprecated fillna(method='ffill').
        ts = (
            product_data.groupby('Date')['Inventory Level']
            .mean()
            .asfreq('D')
            .ffill()
        )

        if len(ts) < 50:
            continue

        try:
            arima_result = ARIMA(ts, order=(2, 1, 1)).fit()
            arima_forecast = arima_result.forecast(steps=horizon)

            residuals = arima_result.resid.dropna()
            residuals_scaled = scaler.transform(residuals.values.reshape(-1, 1))

            # LSTM forecast: slide a window over the scaled residuals, feeding
            # each prediction back in as the newest element.
            next_30 = []
            last_input = residuals_scaled[-seq_len:].copy()
            for _ in range(horizon):
                input_tensor = torch.tensor(last_input.reshape(1, seq_len, 1), dtype=torch.float32)
                with torch.no_grad():
                    pred = model(input_tensor).item()
                next_30.append(pred)
                last_input = np.append(last_input[1:], [[pred]], axis=0)

            predicted_residuals_unscaled = scaler.inverse_transform(
                np.array(next_30).reshape(-1, 1)
            ).flatten()
            final_forecast_30 = arima_forecast.values.flatten() + predicted_residuals_unscaled

            estimated_sales = final_forecast_30[:-1] - final_forecast_30[1:]
            total_sales = estimated_sales.sum()
            name = product_data['Product Name'].iloc[0]

            predicted_sales_summary.append({
                'Product ID': product_id,
                'Product Name': name,
                # Plain float so the value serialises without numpy types.
                'Predicted Total Sales (Units)': float(round(total_sales, 2))
            })

        except Exception as e:
            # Best effort: one failing product must not abort the whole report.
            print(f"β Skipping {product_id} due to error: {e}")
            continue

    # An empty DataFrame has no sort column, so sort_values would raise
    # KeyError; report an empty result instead.
    if not predicted_sales_summary:
        return {"top_5": [], "bottom_5": [], "all_predictions": []}

    result_df = pd.DataFrame(predicted_sales_summary).sort_values(
        by='Predicted Total Sales (Units)', ascending=False
    )

    return {
        "top_5": result_df.head(5).to_dict(orient="records"),
        "bottom_5": result_df.tail(5).to_dict(orient="records"),
        "all_predictions": result_df.to_dict(orient="records")
    }
# === 3. Serve Plot Image ===
@app.get("/forecast-plot/")
def get_forecast_plot():
    """Serve the most recently saved forecast plot as a PNG download.

    Returns:
        A FileResponse for "forecast.png" in the working directory.

    Raises:
        HTTPException: 404 if no plot file exists yet (it is written by the
            /forecast-product/ endpoint).
    """
    import os  # local import so this block does not depend on the file's import list

    plot_path = "forecast.png"
    # Without this check a missing file surfaces as an opaque 500 error.
    if not os.path.isfile(plot_path):
        raise HTTPException(
            status_code=404,
            detail="Forecast plot not found. Call /forecast-product/ first.",
        )
    return FileResponse(plot_path, media_type="image/png", filename="forecast.png")
# === Server Runner ===
if __name__ == "__main__":
    # Start the server when the file is run directly: listen on all
    # interfaces, port 8000, with auto-reload enabled.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
    )
|