pg032011 / app.py
XRachel's picture
Upload app.py
b65a7ca verified
import os
from functools import lru_cache
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st
from sklearn.cluster import KMeans
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
st.set_page_config(
page_title="FreshWise - Perishable Retail Optimization",
page_icon="🥗",
layout="wide",
initial_sidebar_state="expanded",
)
DATA_CANDIDATES = [
os.environ.get("DATA_PATH", ""),
"perishable_goods_management.csv",
"/app/perishable_goods_management.csv",
"/data/perishable_goods_management.csv",
"/mnt/data/perishable_goods_management.csv",
]
CATEGORY_COLORS = {
"Produce": "#2E8B57",
"Dairy": "#1E90FF",
"Meat": "#B22222",
"Seafood": "#20B2AA",
"Bakery": "#D2691E",
"Ready_to_Eat": "#8A2BE2",
}
def find_data_path() -> str:
for path in DATA_CANDIDATES:
if path and os.path.exists(path):
return path
raise FileNotFoundError(
"perishable_goods_management.csv not found. Put it next to app.py or set DATA_PATH."
)
@st.cache_data(show_spinner=False)
def load_data() -> pd.DataFrame:
path = find_data_path()
df = pd.read_csv(path)
df["transaction_date"] = pd.to_datetime(df["transaction_date"], errors="coerce")
df["expiration_date"] = pd.to_datetime(df["expiration_date"], errors="coerce")
df["sell_through_pct"] = np.where(
df["initial_quantity"] > 0, df["units_sold"] / df["initial_quantity"], 0
)
df["stock_demand_ratio"] = np.where(
df["daily_demand"] > 0, df["initial_quantity"] / df["daily_demand"], np.nan
)
df["gross_margin"] = df["selling_price"] - df["cost_price"]
df["leftover_units"] = (df["initial_quantity"] - df["units_sold"]).clip(lower=0)
df["value_score"] = (
(1 - df["waste_pct"].clip(0, 1)) * 0.35
+ df["profit_margin_pct"].clip(lower=0) / 100 * 0.25
+ (1 - df["days_until_expiry"].clip(upper=14) / 14) * 0.15
+ df["discount_pct"].clip(0, 0.5) * 0.25
)
df["expiry_bucket"] = pd.cut(
df["days_until_expiry"],
bins=[-1, 1, 3, 7, 30, 10_000],
labels=["<=1d", "2-3d", "4-7d", "8-30d", ">30d"],
)
df["high_waste_flag"] = (df["waste_pct"] >= df["waste_pct"].quantile(0.75)).astype(int)
return df
@st.cache_data(show_spinner=False)
def fit_segments(df: pd.DataFrame) -> pd.DataFrame:
work = df[[
"daily_demand",
"initial_quantity",
"waste_pct",
"shelf_life_days",
"stock_demand_ratio",
"sell_through_pct",
]].replace([np.inf, -np.inf], np.nan).dropna().copy()
sample_size = min(len(work), 20000)
work = work.sample(sample_size, random_state=42)
scaler = StandardScaler()
X = scaler.fit_transform(work)
km = KMeans(n_clusters=4, random_state=42, n_init=10)
work["cluster"] = km.fit_predict(X)
return work
@st.cache_resource(show_spinner=False)
def fit_risk_model(df: pd.DataFrame):
features = [
"daily_demand",
"initial_quantity",
"shelf_life_days",
"days_until_expiry",
"temp_deviation",
"temp_abuse_events",
"handling_score",
"packaging_score",
"spoilage_risk",
"discount_pct",
"markdown_applied",
"is_weekend",
"supplier_score",
]
X = df[features]
y = df["high_waste_flag"]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
model = RandomForestClassifier(
n_estimators=120, random_state=42, n_jobs=-1, max_depth=10
)
model.fit(X_train, y_train)
importances = pd.Series(model.feature_importances_, index=features).sort_values(ascending=False)
return model, importances
@lru_cache(maxsize=1)
def cluster_name_map():
return {
0: "Stable performers",
1: "Overstocked slow movers",
2: "Short-life high risk",
3: "High demand fast movers",
}
def apply_filters(df: pd.DataFrame):
st.sidebar.header("Filters")
regions = st.sidebar.multiselect("Region", sorted(df["region"].dropna().unique()), default=[])
stores = st.sidebar.multiselect("Store", sorted(df["store_id"].dropna().unique())[:200], default=[])
categories = st.sidebar.multiselect("Category", sorted(df["category"].dropna().unique()), default=[])
expiry_range = st.sidebar.slider("Days until expiry", 0, int(df["days_until_expiry"].max()), (0, 30))
weekend_choice = st.sidebar.selectbox("Day type", ["All", "Weekday", "Weekend"])
filtered = df.copy()
if regions:
filtered = filtered[filtered["region"].isin(regions)]
if stores:
filtered = filtered[filtered["store_id"].isin(stores)]
if categories:
filtered = filtered[filtered["category"].isin(categories)]
filtered = filtered[
(filtered["days_until_expiry"] >= expiry_range[0])
& (filtered["days_until_expiry"] <= expiry_range[1])
]
if weekend_choice == "Weekday":
filtered = filtered[filtered["is_weekend"] == 0]
elif weekend_choice == "Weekend":
filtered = filtered[filtered["is_weekend"] == 1]
return filtered
def metric_row(df: pd.DataFrame):
c1, c2, c3, c4, c5 = st.columns(5)
c1.metric("Waste %", f"{df['waste_pct'].mean():.1%}")
c2.metric("Profit", f"€{df['profit'].mean():.2f}")
c3.metric("Sell-through", f"{df['sell_through_pct'].mean():.1%}")
c4.metric("Units wasted", f"{df['units_wasted'].mean():.1f}")
c5.metric("Markdown rate", f"{df['markdown_applied'].mean():.1%}")
def manager_dashboard(df: pd.DataFrame):
st.subheader("Manager Mode")
metric_row(df)
a, b = st.columns([1.2, 1])
with a:
trend = df.groupby(df["transaction_date"].dt.to_period("M").astype(str))[["waste_pct", "profit"]].mean().reset_index()
fig = go.Figure()
fig.add_trace(go.Scatter(x=trend["transaction_date"], y=trend["waste_pct"], name="Waste %", mode="lines+markers"))
fig.add_trace(go.Scatter(x=trend["transaction_date"], y=trend["profit"], name="Profit", mode="lines+markers", yaxis="y2"))
fig.update_layout(
title="Monthly Waste and Profit Trend",
yaxis=dict(title="Waste %"),
yaxis2=dict(title="Profit", overlaying="y", side="right"),
legend=dict(orientation="h"),
margin=dict(l=10, r=10, t=40, b=10),
)
st.plotly_chart(fig, use_container_width=True)
with b:
top_risk = (
df.groupby("category")[["waste_pct", "profit", "stock_demand_ratio"]]
.mean()
.sort_values("waste_pct", ascending=False)
.head(8)
.reset_index()
)
fig = px.bar(top_risk, x="waste_pct", y="category", orientation="h", title="High Waste Categories")
st.plotly_chart(fig, use_container_width=True)
c1, c2 = st.columns(2)
with c1:
store_risk = (
df.groupby("store_id")[["waste_pct", "profit", "temp_deviation"]]
.mean()
.sort_values(["waste_pct", "temp_deviation"], ascending=[False, False])
.head(15)
.reset_index()
)
st.dataframe(store_risk, use_container_width=True, hide_index=True)
with c2:
expiry = df.groupby("expiry_bucket")[["waste_pct", "profit", "discount_pct"]].mean().reset_index()
fig = px.line(expiry, x="expiry_bucket", y=["waste_pct", "profit", "discount_pct"], markers=True, title="Expiry Stage Performance")
st.plotly_chart(fig, use_container_width=True)
def manager_inventory(df: pd.DataFrame):
st.subheader("Inventory & Replenishment")
overstock = df.copy()
overstock["recommended_order_qty"] = (
1.2 * overstock["daily_demand"] * (1 + overstock["demand_variability"])
- overstock["leftover_units"]
)
overstock.loc[overstock["shelf_life_days"] <= 7, "recommended_order_qty"] *= 0.7
overstock.loc[overstock["spoilage_risk"] >= overstock["spoilage_risk"].quantile(0.75), "recommended_order_qty"] *= 0.8
overstock["recommended_order_qty"] = overstock["recommended_order_qty"].clip(lower=0).round()
c1, c2 = st.columns([1.3, 1])
with c1:
category_summary = overstock.groupby("category")[["initial_quantity", "recommended_order_qty", "waste_pct", "profit"]].mean().reset_index()
category_summary["order_reduction_pct"] = 1 - category_summary["recommended_order_qty"] / category_summary["initial_quantity"]
fig = px.bar(
category_summary.sort_values("order_reduction_pct", ascending=False),
x="order_reduction_pct",
y="category",
orientation="h",
title="Recommended Order Reduction by Category",
)
st.plotly_chart(fig, use_container_width=True)
with c2:
st.markdown("**Action shortlist**")
shortlist = overstock.sort_values(["waste_pct", "stock_demand_ratio"], ascending=[False, False])[[
"store_id", "product_name", "category", "initial_quantity", "daily_demand",
"days_until_expiry", "waste_pct", "recommended_order_qty"
]].head(20)
st.dataframe(shortlist, use_container_width=True, hide_index=True)
st.markdown("### What-if Simulator")
col1, col2, col3 = st.columns(3)
selected_category = col1.selectbox("Category for simulation", sorted(df["category"].unique()))
order_cut = col2.slider("Reduce order quantity by %", 0, 40, 10)
markdown_shift = col3.slider("Advance markdown trigger by days", 0, 5, 2)
sim = df[df["category"] == selected_category].copy()
current_waste = sim["waste_pct"].mean()
current_profit = sim["profit"].mean()
waste_reduction = 0.35 * (order_cut / 100) + 0.015 * markdown_shift
sim_waste = max(current_waste * (1 - waste_reduction), 0)
sim_profit = current_profit * (1 + 0.08 * (order_cut / 100) + 0.03 * markdown_shift)
s1, s2, s3 = st.columns(3)
s1.metric("Current waste", f"{current_waste:.1%}")
s2.metric("Simulated waste", f"{sim_waste:.1%}", delta=f"-{(current_waste-sim_waste):.1%}")
s3.metric("Simulated avg profit", f"€{sim_profit:.2f}", delta=f"€{(sim_profit-current_profit):.2f}")
def manager_promotions(df: pd.DataFrame):
st.subheader("Promotion Designer")
left, right = st.columns([1, 1.2])
with left:
promo_category = st.selectbox("Promotion category", sorted(df["category"].unique()), key="promo_cat")
expiry_target = st.selectbox("Target expiry bucket", ["<=1d", "2-3d", "4-7d", "8-30d", ">30d"])
discount = st.slider("Discount %", 0, 50, 18)
bundle = st.checkbox("Bundle with complementary items", value=True)
weekend_only = st.checkbox("Weekend campaign only", value=False)
sub = df[(df["category"] == promo_category) & (df["expiry_bucket"].astype(str) == expiry_target)].copy()
if weekend_only:
sub = sub[sub["is_weekend"] == 1]
demand_lift = 0.08 + discount / 200
if bundle:
demand_lift += 0.06
est_sales_uplift = sub["units_sold"].mean() * demand_lift if len(sub) else 0
est_waste_drop = sub["waste_pct"].mean() * min(0.35, demand_lift) if len(sub) else 0
est_profit = sub["profit"].mean() * (1 + demand_lift - discount / 150) if len(sub) else 0
st.metric("Estimated sales uplift", f"{est_sales_uplift:.2f} units")
st.metric("Estimated waste reduction", f"{est_waste_drop:.1%}")
st.metric("Estimated avg profit", f"€{est_profit:.2f}")
with right:
promo_base = df.groupby(["expiry_bucket"])[["discount_pct", "waste_pct", "profit"]].mean().reset_index()
fig = px.bar(promo_base, x="expiry_bucket", y=["discount_pct", "waste_pct"], barmode="group", title="Current Discount vs Waste by Expiry")
st.plotly_chart(fig, use_container_width=True)
st.markdown("**Recommended promotion copy**")
st.info(
f"Run a {discount}% {promo_category} campaign for {expiry_target} items"
+ (" on weekends" if weekend_only else "")
+ (" with bundle offers" if bundle else " as single-item markdown")
+ ". Position the offer at high-traffic display zones and highlight value + freshness."
)
def manager_risk(df: pd.DataFrame):
st.subheader("Risk & Store Operations")
_, importances = fit_risk_model(df)
c1, c2 = st.columns([1.1, 1])
with c1:
fig = px.bar(importances.head(10).sort_values(), orientation="h", title="Top Drivers of High Waste Risk")
st.plotly_chart(fig, use_container_width=True)
with c2:
heat = df.groupby(["region", "category"])["temp_deviation"].mean().reset_index()
fig = px.density_heatmap(heat, x="category", y="region", z="temp_deviation", title="Temperature Deviation Heatmap")
st.plotly_chart(fig, use_container_width=True)
alerts = (
df.groupby("store_id")[["temp_deviation", "temp_abuse_events", "waste_pct", "profit"]]
.mean()
.assign(alert_score=lambda x: 0.35 * x["temp_deviation"] + 0.25 * x["temp_abuse_events"] + 0.4 * x["waste_pct"] * 10)
.sort_values("alert_score", ascending=False)
.head(15)
.reset_index()
)
st.markdown("### Automated store alerts")
st.dataframe(alerts, use_container_width=True, hide_index=True)
def consumer_deals(df: pd.DataFrame):
st.subheader("Consumer Mode")
c1, c2, c3 = st.columns(3)
max_budget = c1.slider("Budget (€)", 5, 60, 20)
preferred_category = c2.selectbox("Preferred category", ["All"] + sorted(df["category"].unique()))
max_expiry = c3.slider("Maximum days until expiry", 1, 14, 5)
deals = df[df["days_until_expiry"] <= max_expiry].copy()
if preferred_category != "All":
deals = deals[deals["category"] == preferred_category]
deals = deals.assign(
savings=lambda x: x["base_price"] - x["selling_price"],
deal_score=lambda x: x["discount_pct"] * 0.5 + x["value_score"] * 0.35 + (x["profit_margin_pct"].clip(lower=0) / 100) * 0.15,
).sort_values(["deal_score", "savings"], ascending=False)
display = deals[[
"product_name", "category", "store_id", "days_until_expiry",
"base_price", "selling_price", "discount_pct", "savings"
]].head(25)
st.dataframe(display, use_container_width=True, hide_index=True)
fig = px.scatter(
deals.head(500), x="selling_price", y="discount_pct", color="category",
hover_data=["product_name", "store_id", "days_until_expiry"],
title="Discounted Items Map"
)
st.plotly_chart(fig, use_container_width=True)
affordable = deals[deals["selling_price"] <= max_budget].head(10)
if not affordable.empty:
st.markdown("### Best picks for your budget")
for _, row in affordable.iterrows():
st.success(
f"Now €{row['selling_price']:.2f} (save €{row['base_price'] - row['selling_price']:.2f}) · expires in {int(row['days_until_expiry'])} day(s)"
)
st.markdown(
f"""
🛒 **{row['product_name']}**
📦 Category: {row['category']}
🏪 Store: {row['store_id']}
💸 Discount: {row['discount_pct']*100:.0f}%
⏳ Expiry: {row['days_until_expiry']} days
"""
)
def build_bundle(df: pd.DataFrame, budget: float, people: int, theme: str):
work = df.copy()
work = work[work["days_until_expiry"] <= 7].copy()
work["score"] = work["value_score"] + work["discount_pct"]
theme_map = {
"Quick dinner": ["Ready_to_Eat", "Produce", "Bakery", "Dairy"],
"Healthy protein": ["Meat", "Seafood", "Dairy", "Produce"],
"Family breakfast": ["Bakery", "Dairy", "Beverages", "Produce"],
"Budget saver": list(work["category"].unique()),
}
cats = theme_map.get(theme, list(work["category"].unique()))
work = work[work["category"].isin(cats)].sort_values(["score", "selling_price"], ascending=[False, True])
chosen = []
remaining = budget
target_items = min(max(people + 1, 3), 6)
used_categories = set()
for _, row in work.iterrows():
if row["selling_price"] <= remaining:
if theme != "Budget saver" and row["category"] in used_categories:
continue
chosen.append(row)
remaining -= row["selling_price"]
used_categories.add(row["category"])
if len(chosen) >= target_items:
break
if not chosen:
return pd.DataFrame(), 0.0, 0.0
bundle = pd.DataFrame(chosen)
total = bundle["selling_price"].sum()
saved = (bundle["base_price"] - bundle["selling_price"]).sum()
return bundle, total, saved
def consumer_bundles(df: pd.DataFrame):
st.subheader("Bundle Builder")
c1, c2, c3 = st.columns(3)
budget = c1.slider("Bundle budget (€)", 8, 80, 25)
people = c2.slider("People", 1, 6, 2)
theme = c3.selectbox("Bundle theme", ["Quick dinner", "Healthy protein", "Family breakfast", "Budget saver"])
bundle, total, saved = build_bundle(df, budget, people, theme)
if bundle.empty:
st.warning("No bundle found for the current filters.")
return
k1, k2, k3 = st.columns(3)
k1.metric("Bundle total", f"€{total:.2f}")
k2.metric("You save", f"€{saved:.2f}")
k3.metric("Items", f"{len(bundle)}")
st.dataframe(bundle[[
"product_name", "category", "store_id", "selling_price", "base_price", "discount_pct", "days_until_expiry"
]], use_container_width=True, hide_index=True)
st.info(
"Suggested marketing use: turn these bundles into one-click promotions for end customers or pre-designed campaign packs for store managers."
)
def consumer_personal(df: pd.DataFrame):
st.subheader("Personalized Promotions")
favorite = st.selectbox("Favorite category", sorted(df["category"].unique()))
price_cap = st.slider("Max item price (€)", 1, 30, 10)
not_too_close = st.checkbox("Hide items expiring within 1 day", value=False)
recs = df[df["category"] == favorite].copy()
recs = recs[recs["selling_price"] <= price_cap]
if not_too_close:
recs = recs[recs["days_until_expiry"] > 1]
recs = recs.assign(score=lambda x: x["discount_pct"] * 0.55 + x["value_score"] * 0.45).sort_values("score", ascending=False).head(12)
cols = st.columns(3)
for i, (_, row) in enumerate(recs.iterrows()):
with cols[i % 3]:
st.markdown(f"### {row['product_name']}")
st.write(f"{row['category']} · {row['store_id']}")
st.write(f"Now **€{row['selling_price']:.2f}** | Save **€{(row['base_price'] - row['selling_price']):.2f}**")
st.write(f"Expires in {int(row['days_until_expiry'])} day(s)")
st.button("Add to shortlist", key=f"short_{i}")
def main():
st.title("🥗 FreshWise")
st.caption("Perishable retail optimization for managers and consumers")
try:
df = load_data()
except Exception as e:
st.error(str(e))
st.stop()
filtered = apply_filters(df)
if filtered.empty:
st.warning("No data left after filtering.")
st.stop()
role = st.radio("Choose your mode", ["Manager", "Consumer"], horizontal=True)
if role == "Manager":
tabs = st.tabs([
"Executive Dashboard",
"Inventory & Replenishment",
"Promotion Designer",
"Risk Monitor",
])
with tabs[0]:
manager_dashboard(filtered)
with tabs[1]:
manager_inventory(filtered)
with tabs[2]:
manager_promotions(filtered)
with tabs[3]:
manager_risk(filtered)
else:
tabs = st.tabs([
"Deal Finder",
"Bundle Builder",
"Personalized Promotions",
])
with tabs[0]:
consumer_deals(filtered)
with tabs[1]:
consumer_bundles(filtered)
with tabs[2]:
consumer_personal(filtered)
with st.expander("About this app"):
st.markdown(
"""
- **Manager mode** turns data into inventory, markdown, and operational decisions.
- **Consumer mode** surfaces discounted products, smart bundles, and personalized promotions.
- Built for deployment on Hugging Face Docker Spaces with Streamlit.
"""
)
if __name__ == "__main__":
main()