|
|
|
|
|
import os |
|
|
import joblib |
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
from datetime import datetime |
|
|
from flask import Flask, request, jsonify |
|
|
import shutil |
|
|
|
|
|
|
|
|
# Directory containing this file. ``__file__`` is not defined in every
# execution context, so fall back to the current working directory.
try:
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
except NameError:
    BASE_DIR = os.getcwd()

# Name passed to Flask and reported by the root endpoint.
APP_NAME = "SuperKart_Sales_Forecast_API"

# Location of the serialized pipeline; the MODEL_PATH environment variable,
# when set, takes precedence over the default next to this file.
DEFAULT_MODEL_PATH = os.path.join(BASE_DIR, "superkart_rf_best_pipeline.joblib")
MODEL_PATH = os.environ.get("MODEL_PATH", DEFAULT_MODEL_PATH)

# Reference year for deriving Store_Age from Store_Establishment_Year;
# overridable through the CURRENT_YEAR environment variable.
CURRENT_YEAR = int(os.environ.get("CURRENT_YEAR", datetime.now().year))
|
|
|
|
|
|
|
|
def ensure_model_present():
    """Copy the model into the default location when it is missing there.

    Does nothing when MODEL_PATH was overridden (differs from
    DEFAULT_MODEL_PATH) or when the file already exists. Otherwise the first
    existing candidate path is copied to MODEL_PATH.

    Raises:
        FileNotFoundError: if none of the candidate paths exists.
    """
    # Guard: only the default location is ever auto-populated.
    if MODEL_PATH != DEFAULT_MODEL_PATH or os.path.exists(MODEL_PATH):
        return

    filename = "superkart_rf_best_pipeline.joblib"
    candidates = [
        os.path.join(folder, filename)
        for folder in ("/content/backend_files", "/content")
    ]

    # First candidate that exists, checked in list order.
    found = next((path for path in candidates if os.path.exists(path)), None)
    if found is None:
        raise FileNotFoundError(
            f"Model file not found. Checked: {candidates}. "
            "Upload the model or set env var MODEL_PATH to the correct file."
        )

    os.makedirs(os.path.dirname(MODEL_PATH), exist_ok=True)
    shutil.copy(found, MODEL_PATH)
    print(f"[INFO] Copied model from {found} to {MODEL_PATH}")
|
|
|
|
|
# Raw input columns read from a request, in the order they are reported by
# the root endpoint. Fields absent from a request are filled with None.
RAW_FIELDS = [
    # Product attributes
    "Product_Id",
    "Product_Weight",
    "Product_Sugar_Content",
    "Product_Allocated_Area",
    "Product_Type",
    "Product_MRP",
    # Store attributes
    "Store_Id",
    "Store_Establishment_Year",
    "Store_Age",
    "Store_Size",
    "Store_Location_City_Type",
    "Store_Type",
]
|
|
|
|
|
def map_product_category(pid):
    """Map a product ID to a coarse category using its first two characters.

    The ID is converted to a string and the prefix is compared
    case-insensitively: "FD" -> "Food", "NC" -> "Non-Consumable",
    "DR" -> "Drinks"; anything else -> "Other".
    """
    categories = {
        "FD": "Food",
        "NC": "Non-Consumable",
        "DR": "Drinks",
    }
    code = str(pid)[:2].upper()
    return categories.get(code, "Other")
|
|
|
|
|
def clean_sugar(x):
    """Normalize a sugar-content label to a canonical spelling.

    Matching is case-insensitive and substring based, checked in this order:
    contains "low" -> "Low Sugar", contains "no" -> "No Sugar",
    contains "reg" -> "Regular". Any other non-empty text is returned
    title-cased; an empty string is returned unchanged.

    Missing values (None / NaN) are returned as NaN rather than being
    stringified: ``str(None)`` is "None", which contains "no" and would
    otherwise be reported as "No Sugar".
    """
    if pd.api.types.is_scalar(x) and pd.isna(x):
        return np.nan

    s = str(x).strip().lower()
    if "low" in s:
        return "Low Sugar"
    if "no" in s:
        return "No Sugar"
    # "reg" also covers "regular".
    if "reg" in s:
        return "Regular"
    return s.title() if s else s
|
|
|
|
|
def bin_allocated_area(x):
    """Bucket an allocated-area value into a size label.

    The input is coerced to a number; values that cannot be parsed (or are
    missing) yield NaN. Otherwise: < 0.02 -> "Very Small", < 0.05 -> "Small",
    < 0.10 -> "Medium", else "Large".
    """
    area = pd.to_numeric(x, errors="coerce")
    if pd.isna(area):
        return np.nan

    # (exclusive upper bound, label) pairs in ascending order.
    thresholds = (
        (0.02, "Very Small"),
        (0.05, "Small"),
        (0.10, "Medium"),
    )
    for upper, label in thresholds:
        if area < upper:
            return label
    return "Large"
|
|
|
|
|
def bin_mrp(x):
    """Bucket an MRP value into a price tier.

    The input is coerced to a number; unparseable or missing values yield
    NaN. Otherwise: < 100 -> "Low", < 150 -> "Medium", < 200 -> "High",
    else "Premium".
    """
    price = pd.to_numeric(x, errors="coerce")
    if pd.isna(price):
        return np.nan

    # Checked from the top tier downwards; each bound is inclusive.
    if price >= 200:
        return "Premium"
    if price >= 150:
        return "High"
    if price >= 100:
        return "Medium"
    return "Low"
|
|
|
|
|
def engineer_features(df_raw: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df_raw`` with the derived feature columns added.

    The input frame is not modified. Columns produced or rewritten:

    - ``Product_Category``: from ``Product_Id`` via ``map_product_category``.
    - ``Product_Sugar_Content``: normalized in place via ``clean_sugar``.
    - ``Allocated_Area_Bins``: from ``Product_Allocated_Area``.
    - ``Store_Age``: ``CURRENT_YEAR - Store_Establishment_Year`` (floored at
      0) for rows where no age was supplied.
    - ``MRP_Bins``: from ``Product_MRP``.
    - ``Unit_Value``: ``Product_MRP / Product_Weight`` (zero weight -> NaN).
    - ``Store_Product_Interaction``: ``Store_Type__Product_Type``.
    - ``MRPBin_StoreType``: ``MRP_Bins__Store_Type``.

    Every derived column is always present in the result; when its source
    columns are absent it is filled with NaN.
    """
    df = df_raw.copy()

    if "Product_Id" in df.columns:
        df["Product_Category"] = df["Product_Id"].map(map_product_category)
    else:
        df["Product_Category"] = np.nan

    if "Product_Sugar_Content" in df.columns:
        df["Product_Sugar_Content"] = df["Product_Sugar_Content"].apply(clean_sugar)

    if "Product_Allocated_Area" in df.columns:
        df["Allocated_Area_Bins"] = df["Product_Allocated_Area"].apply(bin_allocated_area)
    else:
        df["Allocated_Area_Bins"] = np.nan

    # Age derived from the establishment year, when that column is available.
    if "Store_Establishment_Year" in df.columns:
        established = pd.to_numeric(df["Store_Establishment_Year"], errors="coerce")
        derived_age = (CURRENT_YEAR - established).clip(lower=0)
    else:
        derived_age = None

    if "Store_Age" not in df.columns or df["Store_Age"].isna().all():
        df["Store_Age"] = np.nan if derived_age is None else derived_age
    elif derived_age is not None:
        # Keep supplied ages; fill only the rows that lack one, so a batch
        # with partially missing Store_Age still gets a value per row.
        df["Store_Age"] = df["Store_Age"].fillna(derived_age)

    if "Product_MRP" in df.columns:
        df["MRP_Bins"] = df["Product_MRP"].apply(bin_mrp)
    else:
        df["MRP_Bins"] = np.nan

    if "Product_MRP" in df.columns and "Product_Weight" in df.columns:
        mrp = pd.to_numeric(df["Product_MRP"], errors="coerce")
        # Zero weight becomes NaN so the division cannot produce +/-inf.
        wgt = pd.to_numeric(df["Product_Weight"], errors="coerce").replace(0, np.nan)
        df["Unit_Value"] = mrp / wgt
    else:
        df["Unit_Value"] = np.nan

    if "Store_Type" in df.columns and "Product_Type" in df.columns:
        df["Store_Product_Interaction"] = (
            df["Store_Type"].astype(str) + "__" + df["Product_Type"].astype(str)
        )
    else:
        df["Store_Product_Interaction"] = np.nan

    if "MRP_Bins" in df.columns and "Store_Type" in df.columns:
        df["MRPBin_StoreType"] = (
            df["MRP_Bins"].astype(str) + "__" + df["Store_Type"].astype(str)
        )
    else:
        # Same fallback as the other derived columns, so the output schema
        # does not depend on which inputs were supplied.
        df["MRPBin_StoreType"] = np.nan

    return df
|
|
|
|
|
app = Flask(APP_NAME)

# Best-effort staging of the model file. Any failure (including an
# unexpected NameError) is reported instead of being silently ignored; the
# load step below decides whether the service can actually predict.
try:
    ensure_model_present()
except Exception as e:
    print(f"[WARN] {e}")

# Load the pipeline once at import time. On failure ``model`` is None and the
# forecast endpoints answer with "Model not loaded".
# NOTE: joblib.load unpickles the file, which can execute arbitrary code --
# MODEL_PATH must only ever point at a trusted artifact.
try:
    model = joblib.load(MODEL_PATH)
    print(f"[INFO] Loaded model from {MODEL_PATH}")
except Exception as e:
    print(f"[ERROR] Failed to load model: {e}")
    model = None
|
|
|
|
|
@app.get("/")
def root():
    """Return service metadata: name, status, usage hint and raw field list."""
    info = {
        "service": APP_NAME,
        "status": "ok",
        "message": "POST to /v1/forecast/single (JSON) or /v1/forecast/batch (CSV as 'file')",
        "raw_fields": RAW_FIELDS,
    }
    return jsonify(info)
|
|
|
|
|
@app.post("/v1/forecast/single")
def predict_single():
    """Predict sales for one product/store record supplied as a JSON object.

    Fields listed in RAW_FIELDS are read from the body; absent fields are
    passed on as None. ``Product_Id`` and ``Store_Id`` are dropped before the
    engineered row is handed to the model.

    Returns:
        200 with ``Predicted_Product_Store_Sales_Total`` (rounded to 2
        decimals) and ``input_used`` (the engineered row);
        400 if the body is JSON but not an object, or if feature engineering
        or prediction raises;
        500 if the model failed to load at startup.
    """
    if model is None:
        return jsonify({"error": "Model not loaded"}), 500

    payload = request.get_json(silent=True) or {}
    # A JSON array/string/number has no .get(); reject it explicitly rather
    # than letting an AttributeError escape as an unhandled server error.
    if not isinstance(payload, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    row = {col: payload.get(col) for col in RAW_FIELDS}
    df_raw = pd.DataFrame([row])

    try:
        df_feat = engineer_features(df_raw)
        df_feat = df_feat.drop(columns=["Product_Id", "Store_Id"], errors="ignore")

        yhat = float(model.predict(df_feat)[0])

        # NaN is not a valid JSON token; report missing values as null.
        input_used = {
            key: (None if pd.api.types.is_scalar(value) and pd.isna(value) else value)
            for key, value in df_feat.iloc[0].to_dict().items()
        }
        return jsonify({
            "Predicted_Product_Store_Sales_Total": round(yhat, 2),
            "input_used": input_used,
        })
    except Exception as e:
        return jsonify({"error": f"Inference failed: {e}"}), 400
|
|
|
|
|
@app.post("/v1/forecast/batch")
def predict_batch():
    """Predict sales for every row of a CSV uploaded as form field ``file``.

    Columns from RAW_FIELDS that the CSV lacks are added and filled with
    None. ``Product_Id`` and ``Store_Id`` are dropped before prediction but
    are kept in the response.

    Returns:
        200 with a JSON list of records: the input columns plus
        ``Predicted_Product_Store_Sales_Total``;
        400 if no file was uploaded, or if parsing, feature engineering or
        prediction raises;
        500 if the model failed to load at startup.
    """
    if model is None:
        return jsonify({"error": "Model not loaded"}), 500

    file = request.files.get("file")
    if file is None:
        return jsonify({"error": "Please POST a CSV file under form field 'file'"}), 400

    try:
        df_raw = pd.read_csv(file)
        for col in RAW_FIELDS:
            if col not in df_raw.columns:
                df_raw[col] = None

        df_feat = engineer_features(df_raw)
        df_feat = df_feat.drop(columns=["Product_Id", "Store_Id"], errors="ignore")

        preds = model.predict(df_feat)

        out = df_raw.copy()
        out["Predicted_Product_Store_Sales_Total"] = preds
        # Empty CSV cells are NaN, which is not a valid JSON token; cast to
        # object so they can be replaced with None (serialized as null).
        out = out.astype(object).where(out.notna(), None)
        return jsonify(out.to_dict(orient="records"))
    except Exception as e:
        return jsonify({"error": f"Inference failed: {e}"}), 400
|
|
|
|
|
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from the PORT environment
    # variable and defaults to 7860.
    app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))
|
|
|