| import os |
| from functools import lru_cache |
|
|
| import numpy as np |
| import pandas as pd |
| import plotly.express as px |
| import plotly.graph_objects as go |
| import streamlit as st |
| from sklearn.cluster import KMeans |
| from sklearn.ensemble import RandomForestClassifier |
| from sklearn.model_selection import train_test_split |
| from sklearn.preprocessing import StandardScaler |
|
|
# Browser-tab title/icon plus a wide layout with the sidebar opened on load.
st.set_page_config(
    layout="wide",
    initial_sidebar_state="expanded",
    page_title="FreshWise - Perishable Retail Optimization",
    page_icon="🥗",
)
|
|
# Paths probed, in this order, for the dataset CSV. The first entry comes from
# the DATA_PATH environment variable and is an empty string when it is unset.
_DATA_FILE = "perishable_goods_management.csv"
DATA_CANDIDATES = [os.environ.get("DATA_PATH", "")]
DATA_CANDIDATES += [
    f"{prefix}{_DATA_FILE}" for prefix in ("", "/app/", "/data/", "/mnt/data/")
]
|
|
# Hex colour associated with each product category name.
CATEGORY_COLORS = dict(
    Produce="#2E8B57",
    Dairy="#1E90FF",
    Meat="#B22222",
    Seafood="#20B2AA",
    Bakery="#D2691E",
    Ready_to_Eat="#8A2BE2",
)
|
|
|
|
def find_data_path() -> str:
    """Return the first entry of ``DATA_CANDIDATES`` that is an existing file.

    Empty entries (such as an unset ``DATA_PATH``) are skipped. Directories
    are skipped as well, so a ``DATA_PATH`` that points at a folder falls
    through to the remaining candidates instead of being handed to the CSV
    reader.

    Raises:
        FileNotFoundError: If no candidate is an existing file.
    """
    for path in DATA_CANDIDATES:
        if path and os.path.isfile(path):
            return path
    raise FileNotFoundError(
        "perishable_goods_management.csv not found. Put it next to app.py or set DATA_PATH."
    )
|
|
|
|
@st.cache_data(show_spinner=False)
def load_data() -> pd.DataFrame:
    """Read the dataset CSV and append the derived columns used by the app.

    Added columns: ``sell_through_pct``, ``stock_demand_ratio``,
    ``gross_margin``, ``leftover_units``, ``value_score``, ``expiry_bucket``
    and ``high_waste_flag``. The two date columns are parsed to datetimes.

    Returns:
        The loaded frame with the extra columns attached.
    """
    df = pd.read_csv(find_data_path())

    # Unparseable dates are coerced to NaT instead of raising.
    for date_col in ("transaction_date", "expiration_date"):
        df[date_col] = pd.to_datetime(df[date_col], errors="coerce")

    stocked = df["initial_quantity"]
    sold = df["units_sold"]
    demand = df["daily_demand"]

    # Ratios fall back to 0 / NaN when the denominator is not positive.
    df["sell_through_pct"] = np.where(stocked > 0, sold / stocked, 0)
    df["stock_demand_ratio"] = np.where(demand > 0, stocked / demand, np.nan)
    df["gross_margin"] = df["selling_price"] - df["cost_price"]
    df["leftover_units"] = (stocked - sold).clip(lower=0)

    # Weighted blend: low waste, positive margin, closeness to expiry
    # (capped at 14 days) and discount (capped at 0.5).
    waste_term = (1 - df["waste_pct"].clip(0, 1)) * 0.35
    margin_term = df["profit_margin_pct"].clip(lower=0) / 100 * 0.25
    urgency_term = (1 - df["days_until_expiry"].clip(upper=14) / 14) * 0.15
    discount_term = df["discount_pct"].clip(0, 0.5) * 0.25
    df["value_score"] = waste_term + margin_term + urgency_term + discount_term

    df["expiry_bucket"] = pd.cut(
        df["days_until_expiry"],
        bins=[-1, 1, 3, 7, 30, 10_000],
        labels=["<=1d", "2-3d", "4-7d", "8-30d", ">30d"],
    )

    # Rows at or above the 75th percentile of waste are flagged with 1.
    waste_cutoff = df["waste_pct"].quantile(0.75)
    df["high_waste_flag"] = (df["waste_pct"] >= waste_cutoff).astype(int)
    return df
|
|
|
|
@st.cache_data(show_spinner=False)
def fit_segments(df: pd.DataFrame) -> pd.DataFrame:
    """Assign K-means cluster labels to a sample of rows.

    Rows with missing or infinite values in any segmentation feature are
    dropped, at most 20000 of the remaining rows are sampled (seed 42), the
    features are standardised, and K-means with up to four clusters is fitted.

    Args:
        df: Frame containing the six segmentation feature columns.

    Returns:
        The sampled feature rows with an added integer ``cluster`` column.
        When no usable row remains, an empty frame with that column.
    """
    segment_features = [
        "daily_demand",
        "initial_quantity",
        "waste_pct",
        "shelf_life_days",
        "stock_demand_ratio",
        "sell_through_pct",
    ]
    max_rows = 20000
    max_clusters = 4

    work = df[segment_features].replace([np.inf, -np.inf], np.nan).dropna().copy()
    if work.empty:
        # Nothing to scale or cluster; keep the output schema stable.
        work["cluster"] = np.array([], dtype=int)
        return work

    work = work.sample(min(len(work), max_rows), random_state=42)
    X = StandardScaler().fit_transform(work)
    # KMeans refuses more clusters than samples, so shrink the count for
    # very small inputs instead of raising.
    km = KMeans(n_clusters=min(max_clusters, len(work)), random_state=42, n_init=10)
    work["cluster"] = km.fit_predict(X)
    return work
|
|
|
|
@st.cache_resource(show_spinner=False)
def fit_risk_model(df: pd.DataFrame):
    """Fit a random forest that predicts ``high_waste_flag``.

    Args:
        df: Frame containing the feature columns listed below and the
            ``high_waste_flag`` target.

    Returns:
        A ``(model, importances)`` tuple: the fitted classifier and a Series
        of feature importances indexed by feature name, sorted descending.
    """
    features = [
        "daily_demand",
        "initial_quantity",
        "shelf_life_days",
        "days_until_expiry",
        "temp_deviation",
        "temp_abuse_events",
        "handling_score",
        "packaging_score",
        "spoilage_risk",
        "discount_pct",
        "markdown_applied",
        "is_weekend",
        "supplier_score",
    ]
    X = df[features]
    y = df["high_waste_flag"]
    try:
        # Hold out a stratified 20%; only the training part is used here.
        X_train, _, y_train, _ = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )
    except ValueError:
        # The split is impossible when the (filtered) frame is tiny or one
        # class has too few rows to stratify; train on every row instead.
        X_train, y_train = X, y
    model = RandomForestClassifier(
        n_estimators=120, random_state=42, n_jobs=-1, max_depth=10
    )
    model.fit(X_train, y_train)
    importances = pd.Series(model.feature_importances_, index=features).sort_values(ascending=False)
    return model, importances
|
|
|
|
@lru_cache(maxsize=1)
def cluster_name_map():
    """Return the display name for each cluster id 0-3.

    The result is memoised, so every call hands back the same dict object.
    """
    names = (
        "Stable performers",
        "Overstocked slow movers",
        "Short-life high risk",
        "High demand fast movers",
    )
    return dict(enumerate(names))
|
|
|
|
def apply_filters(df: pd.DataFrame):
    """Draw the sidebar filter widgets and return the matching rows of ``df``.

    Region, store and category selections only restrict the data when at
    least one value is picked. The expiry range always applies (inclusive on
    both ends). The day-type choice restricts on ``is_weekend`` unless "All".
    """
    sidebar = st.sidebar
    sidebar.header("Filters")

    def choices(column):
        # Sorted distinct non-null values offered in a multiselect.
        return sorted(df[column].dropna().unique())

    regions = sidebar.multiselect("Region", choices("region"), default=[])
    # Only the first 200 store ids are offered.
    stores = sidebar.multiselect("Store", choices("store_id")[:200], default=[])
    categories = sidebar.multiselect("Category", choices("category"), default=[])
    max_days = int(df["days_until_expiry"].max())
    low, high = sidebar.slider("Days until expiry", 0, max_days, (0, 30))
    weekend_choice = sidebar.selectbox("Day type", ["All", "Weekday", "Weekend"])

    # Build one boolean mask and apply it once at the end.
    keep = df["days_until_expiry"].between(low, high)
    selections = (("region", regions), ("store_id", stores), ("category", categories))
    for column, picked in selections:
        if picked:
            keep &= df[column].isin(picked)

    weekend_flag = {"Weekday": 0, "Weekend": 1}.get(weekend_choice)
    if weekend_flag is not None:
        keep &= df["is_weekend"] == weekend_flag
    return df[keep]
|
|
|
|
def metric_row(df: pd.DataFrame):
    """Show five KPI tiles in one row, each the mean of one column of ``df``."""
    # (tile label, source column, format applied to the column mean)
    tiles = (
        ("Waste %", "waste_pct", "{:.1%}"),
        ("Profit", "profit", "€{:.2f}"),
        ("Sell-through", "sell_through_pct", "{:.1%}"),
        ("Units wasted", "units_wasted", "{:.1f}"),
        ("Markdown rate", "markdown_applied", "{:.1%}"),
    )
    for slot, (label, column, fmt) in zip(st.columns(5), tiles):
        slot.metric(label, fmt.format(df[column].mean()))
|
|
|
|
def manager_dashboard(df: pd.DataFrame):
    """Render the executive dashboard for the filtered data.

    Shows the KPI row, a monthly waste/profit trend, the categories with the
    highest mean waste, the 15 stores with the highest mean waste, and mean
    waste/profit/discount per expiry bucket.
    """
    st.subheader("Manager Mode")
    metric_row(df)

    a, b = st.columns([1.2, 1])
    with a:
        # Dates that failed to parse are NaT; left in, they would be grouped
        # under a literal "NaT" month and plotted as an extra point.
        dated = df.dropna(subset=["transaction_date"])
        month = dated["transaction_date"].dt.to_period("M").astype(str)
        trend = dated.groupby(month)[["waste_pct", "profit"]].mean().reset_index()
        fig = go.Figure()
        fig.add_trace(go.Scatter(x=trend["transaction_date"], y=trend["waste_pct"], name="Waste %", mode="lines+markers"))
        # Profit is drawn against a second y-axis on the right.
        fig.add_trace(go.Scatter(x=trend["transaction_date"], y=trend["profit"], name="Profit", mode="lines+markers", yaxis="y2"))
        fig.update_layout(
            title="Monthly Waste and Profit Trend",
            yaxis=dict(title="Waste %"),
            yaxis2=dict(title="Profit", overlaying="y", side="right"),
            legend=dict(orientation="h"),
            margin=dict(l=10, r=10, t=40, b=10),
        )
        st.plotly_chart(fig, use_container_width=True)
    with b:
        top_risk = (
            df.groupby("category")[["waste_pct", "profit", "stock_demand_ratio"]]
            .mean()
            .sort_values("waste_pct", ascending=False)
            .head(8)
            .reset_index()
        )
        fig = px.bar(top_risk, x="waste_pct", y="category", orientation="h", title="High Waste Categories")
        st.plotly_chart(fig, use_container_width=True)

    c1, c2 = st.columns(2)
    with c1:
        store_risk = (
            df.groupby("store_id")[["waste_pct", "profit", "temp_deviation"]]
            .mean()
            .sort_values(["waste_pct", "temp_deviation"], ascending=[False, False])
            .head(15)
            .reset_index()
        )
        st.dataframe(store_risk, use_container_width=True, hide_index=True)
    with c2:
        # observed=False is spelled out so every bucket keeps its slot on the
        # axis and the result does not depend on the pandas default.
        expiry = (
            df.groupby("expiry_bucket", observed=False)[["waste_pct", "profit", "discount_pct"]]
            .mean()
            .reset_index()
        )
        fig = px.line(expiry, x="expiry_bucket", y=["waste_pct", "profit", "discount_pct"], markers=True, title="Expiry Stage Performance")
        st.plotly_chart(fig, use_container_width=True)
|
|
|
|
def manager_inventory(df: pd.DataFrame):
    """Render order recommendations and the what-if simulator.

    The recommended order quantity is ``1.2 * daily_demand *
    (1 + demand_variability) - leftover_units``, scaled by 0.7 for items
    with a shelf life of at most 7 days and by 0.8 for items in the top
    quartile of spoilage risk, then floored at zero and rounded.
    """
    st.subheader("Inventory & Replenishment")

    overstock = df.copy()
    overstock["recommended_order_qty"] = (
        1.2 * overstock["daily_demand"] * (1 + overstock["demand_variability"])
        - overstock["leftover_units"]
    )
    overstock.loc[overstock["shelf_life_days"] <= 7, "recommended_order_qty"] *= 0.7
    high_spoilage = overstock["spoilage_risk"] >= overstock["spoilage_risk"].quantile(0.75)
    overstock.loc[high_spoilage, "recommended_order_qty"] *= 0.8
    overstock["recommended_order_qty"] = overstock["recommended_order_qty"].clip(lower=0).round()

    c1, c2 = st.columns([1.3, 1])
    with c1:
        category_summary = (
            overstock.groupby("category")[["initial_quantity", "recommended_order_qty", "waste_pct", "profit"]]
            .mean()
            .reset_index()
        )
        # A zero average stock would make the ratio +/-inf; use NaN so the
        # category is simply left without a bar.
        avg_stock = category_summary["initial_quantity"].replace(0, np.nan)
        category_summary["order_reduction_pct"] = 1 - category_summary["recommended_order_qty"] / avg_stock
        fig = px.bar(
            category_summary.sort_values("order_reduction_pct", ascending=False),
            x="order_reduction_pct",
            y="category",
            orientation="h",
            title="Recommended Order Reduction by Category",
        )
        st.plotly_chart(fig, use_container_width=True)
    with c2:
        st.markdown("**Action shortlist**")
        shortlist = overstock.sort_values(["waste_pct", "stock_demand_ratio"], ascending=[False, False])[[
            "store_id", "product_name", "category", "initial_quantity", "daily_demand",
            "days_until_expiry", "waste_pct", "recommended_order_qty"
        ]].head(20)
        st.dataframe(shortlist, use_container_width=True, hide_index=True)

    st.markdown("### What-if Simulator")
    col1, col2, col3 = st.columns(3)
    # dropna(): sorted() cannot compare a NaN category with string names.
    selected_category = col1.selectbox("Category for simulation", sorted(df["category"].dropna().unique()))
    order_cut = col2.slider("Reduce order quantity by %", 0, 40, 10)
    markdown_shift = col3.slider("Advance markdown trigger by days", 0, 5, 2)

    sim = df[df["category"] == selected_category]
    current_waste = sim["waste_pct"].mean()
    current_profit = sim["profit"].mean()

    # Heuristic response of waste and profit to the two levers.
    waste_reduction = 0.35 * (order_cut / 100) + 0.015 * markdown_shift
    sim_waste = max(current_waste * (1 - waste_reduction), 0)
    sim_profit = current_profit * (1 + 0.08 * (order_cut / 100) + 0.03 * markdown_shift)
    waste_delta = sim_waste - current_waste
    profit_delta = sim_profit - current_profit

    s1, s2, s3 = st.columns(3)
    s1.metric("Current waste", f"{current_waste:.1%}")
    # Lower waste is the good direction, so invert the delta colouring.
    s2.metric("Simulated waste", f"{sim_waste:.1%}", delta=f"{waste_delta:.1%}", delta_color="inverse")
    # st.metric only treats a string delta as negative when it starts with
    # "-", so the sign has to come before the currency symbol.
    profit_sign = "-" if profit_delta < 0 else ""
    s3.metric("Simulated avg profit", f"€{sim_profit:.2f}", delta=f"{profit_sign}€{abs(profit_delta):.2f}")
|
|
|
|
def manager_promotions(df: pd.DataFrame):
    """Render the promotion designer.

    The left column holds the campaign controls and the estimated effect on
    sales, waste and profit for the matching rows; the right column shows
    current discount and waste per expiry bucket plus generated promotion
    copy. The estimates are zero when no row matches the selection.
    """
    st.subheader("Promotion Designer")
    left, right = st.columns([1, 1.2])
    with left:
        # dropna(): sorted() cannot compare a NaN category with string names.
        promo_category = st.selectbox("Promotion category", sorted(df["category"].dropna().unique()), key="promo_cat")
        expiry_target = st.selectbox("Target expiry bucket", ["<=1d", "2-3d", "4-7d", "8-30d", ">30d"])
        discount = st.slider("Discount %", 0, 50, 18)
        bundle = st.checkbox("Bundle with complementary items", value=True)
        weekend_only = st.checkbox("Weekend campaign only", value=False)

        sub = df[(df["category"] == promo_category) & (df["expiry_bucket"].astype(str) == expiry_target)]
        if weekend_only:
            sub = sub[sub["is_weekend"] == 1]

        # Heuristic lift: a base 8% plus half the discount rate, and another
        # 6% when the offer is bundled.
        demand_lift = 0.08 + discount / 200
        if bundle:
            demand_lift += 0.06

        if sub.empty:
            est_sales_uplift = est_waste_drop = est_profit = 0
        else:
            est_sales_uplift = sub["units_sold"].mean() * demand_lift
            est_waste_drop = sub["waste_pct"].mean() * min(0.35, demand_lift)
            est_profit = sub["profit"].mean() * (1 + demand_lift - discount / 150)

        st.metric("Estimated sales uplift", f"{est_sales_uplift:.2f} units")
        st.metric("Estimated waste reduction", f"{est_waste_drop:.1%}")
        st.metric("Estimated avg profit", f"€{est_profit:.2f}")

    with right:
        # observed=False is spelled out so every bucket keeps its slot on the
        # axis and the result does not depend on the pandas default.
        promo_base = (
            df.groupby(["expiry_bucket"], observed=False)[["discount_pct", "waste_pct", "profit"]]
            .mean()
            .reset_index()
        )
        fig = px.bar(promo_base, x="expiry_bucket", y=["discount_pct", "waste_pct"], barmode="group", title="Current Discount vs Waste by Expiry")
        st.plotly_chart(fig, use_container_width=True)

        st.markdown("**Recommended promotion copy**")
        st.info(
            f"Run a {discount}% {promo_category} campaign for {expiry_target} items"
            + (" on weekends" if weekend_only else "")
            + (" with bundle offers" if bundle else " as single-item markdown")
            + ". Position the offer at high-traffic display zones and highlight value + freshness."
        )
|
|
|
|
def manager_risk(df: pd.DataFrame):
    """Render the risk view: model drivers, temperature heatmap, store alerts.

    The alert table lists the 15 stores with the highest ``alert_score``, a
    weighted sum of each store's mean temperature deviation, temperature
    abuse events and waste.
    """
    st.subheader("Risk & Store Operations")
    importances = fit_risk_model(df)[1]

    drivers_col, heat_col = st.columns([1.1, 1])
    with drivers_col:
        # Ascending order so the largest importance ends up as the top bar.
        top_drivers = importances.head(10).sort_values()
        drivers_fig = px.bar(top_drivers, orientation="h", title="Top Drivers of High Waste Risk")
        st.plotly_chart(drivers_fig, use_container_width=True)
    with heat_col:
        mean_deviation = df.groupby(["region", "category"])["temp_deviation"].mean().reset_index()
        heat_fig = px.density_heatmap(
            mean_deviation,
            x="category",
            y="region",
            z="temp_deviation",
            title="Temperature Deviation Heatmap",
        )
        st.plotly_chart(heat_fig, use_container_width=True)

    alert_columns = ["temp_deviation", "temp_abuse_events", "waste_pct", "profit"]
    store_means = df.groupby("store_id")[alert_columns].mean()
    store_means["alert_score"] = (
        0.35 * store_means["temp_deviation"]
        + 0.25 * store_means["temp_abuse_events"]
        + 0.4 * store_means["waste_pct"] * 10
    )
    alerts = store_means.sort_values("alert_score", ascending=False).head(15).reset_index()

    st.markdown("### Automated store alerts")
    st.dataframe(alerts, use_container_width=True, hide_index=True)
|
|
|
|
def consumer_deals(df: pd.DataFrame):
    """Render the deal finder.

    Items expiring within the chosen number of days (optionally limited to
    one category) are ranked by a deal score built from discount, value
    score and positive profit margin. The top 25 are listed, the top 500
    are plotted, and up to 10 items priced within the budget are shown as
    individual picks.
    """
    st.subheader("Consumer Mode")
    c1, c2, c3 = st.columns(3)
    max_budget = c1.slider("Budget (€)", 5, 60, 20)
    # dropna(): sorted() cannot compare a NaN category with string names.
    preferred_category = c2.selectbox("Preferred category", ["All"] + sorted(df["category"].dropna().unique()))
    max_expiry = c3.slider("Maximum days until expiry", 1, 14, 5)

    deals = df[df["days_until_expiry"] <= max_expiry]
    if preferred_category != "All":
        deals = deals[deals["category"] == preferred_category]
    deals = deals.assign(
        savings=lambda x: x["base_price"] - x["selling_price"],
        deal_score=lambda x: x["discount_pct"] * 0.5 + x["value_score"] * 0.35 + (x["profit_margin_pct"].clip(lower=0) / 100) * 0.15,
    ).sort_values(["deal_score", "savings"], ascending=False)

    display = deals[[
        "product_name", "category", "store_id", "days_until_expiry",
        "base_price", "selling_price", "discount_pct", "savings"
    ]].head(25)
    st.dataframe(display, use_container_width=True, hide_index=True)

    fig = px.scatter(
        deals.head(500), x="selling_price", y="discount_pct", color="category",
        hover_data=["product_name", "store_id", "days_until_expiry"],
        title="Discounted Items Map"
    )
    st.plotly_chart(fig, use_container_width=True)

    affordable = deals[deals["selling_price"] <= max_budget].head(10)
    if not affordable.empty:
        st.markdown("### Best picks for your budget")
        for _, row in affordable.iterrows():
            # Whole days in both messages, so a float column does not show
            # "3 day(s)" in one line and "3.0 days" in the next.
            days_left = int(row["days_until_expiry"])
            st.success(
                f"Now €{row['selling_price']:.2f} (save €{row['base_price'] - row['selling_price']:.2f}) · expires in {days_left} day(s)"
            )
            st.markdown(
                f"🛒 **{row['product_name']}**\n"
                f"📦 Category: {row['category']}\n"
                f"🏪 Store: {row['store_id']}\n"
                f"💸 Discount: {row['discount_pct']*100:.0f}%\n"
                f"⏳ Expiry: {days_left} days"
            )
|
|
|
|
def build_bundle(df: pd.DataFrame, budget: float, people: int, theme: str):
    """Greedily pick a themed bundle of soon-to-expire items within a budget.

    Only items expiring within 7 days are considered. Candidates are ranked
    by ``value_score + discount_pct`` (ties broken by lower price) and taken
    in that order while they fit the remaining budget. Every theme except
    "Budget saver" takes at most one item per category. An unknown theme
    falls back to all categories.

    Args:
        df: Item rows with the price, score, category and expiry columns.
        budget: Maximum total selling price of the bundle.
        people: Party size; the bundle targets ``people + 1`` items, clamped
            to the range 3-6.
        theme: Bundle theme name.

    Returns:
        ``(bundle, total, saved)``: the chosen rows (including the ranking
        ``score`` column), their total selling price, and the total saving
        versus base price. ``(empty DataFrame, 0.0, 0.0)`` if nothing fits.
    """
    work = df[df["days_until_expiry"] <= 7].copy()
    work["score"] = work["value_score"] + work["discount_pct"]

    all_categories = list(work["category"].unique())
    theme_map = {
        "Quick dinner": ["Ready_to_Eat", "Produce", "Bakery", "Dairy"],
        "Healthy protein": ["Meat", "Seafood", "Dairy", "Produce"],
        "Family breakfast": ["Bakery", "Dairy", "Beverages", "Produce"],
        "Budget saver": all_categories,
    }
    cats = theme_map.get(theme, all_categories)
    work = work[work["category"].isin(cats)].sort_values(
        ["score", "selling_price"], ascending=[False, True]
    )
    if work.empty:
        return pd.DataFrame(), 0.0, 0.0

    one_per_category = theme != "Budget saver"
    target_items = min(max(people + 1, 3), 6)
    available_categories = work["category"].nunique(dropna=False)
    cheapest = work["selling_price"].min()

    picked = []  # positional indices into ``work``, in pick order
    used_categories = set()
    remaining = budget
    for pos, (price, category) in enumerate(zip(work["selling_price"], work["category"])):
        if remaining < cheapest:
            # Not even the cheapest candidate fits any more.
            break
        if not price <= remaining:  # also skips NaN prices
            continue
        if one_per_category and category in used_categories:
            continue
        picked.append(pos)
        remaining -= price
        used_categories.add(category)
        if len(picked) >= target_items:
            break
        if one_per_category and len(used_categories) >= available_categories:
            # Every category already contributed its one item.
            break

    if not picked:
        return pd.DataFrame(), 0.0, 0.0
    # Slicing the frame (rather than rebuilding it from row Series) keeps the
    # original column dtypes.
    bundle = work.iloc[picked].copy()
    total = bundle["selling_price"].sum()
    saved = (bundle["base_price"] - bundle["selling_price"]).sum()
    return bundle, total, saved
|
|
|
|
def consumer_bundles(df: pd.DataFrame):
    """Render the bundle builder: controls, bundle totals and the item table.

    Shows a warning and stops early when ``build_bundle`` returns no items.
    """
    st.subheader("Bundle Builder")
    budget_col, people_col, theme_col = st.columns(3)
    budget = budget_col.slider("Bundle budget (€)", 8, 80, 25)
    people = people_col.slider("People", 1, 6, 2)
    theme = theme_col.selectbox(
        "Bundle theme",
        ["Quick dinner", "Healthy protein", "Family breakfast", "Budget saver"],
    )

    bundle, total, saved = build_bundle(df, budget, people, theme)
    if bundle.empty:
        st.warning("No bundle found for the current filters.")
        return

    summary = (
        ("Bundle total", f"€{total:.2f}"),
        ("You save", f"€{saved:.2f}"),
        ("Items", f"{len(bundle)}"),
    )
    for slot, (label, value) in zip(st.columns(3), summary):
        slot.metric(label, value)

    shown_columns = [
        "product_name", "category", "store_id", "selling_price",
        "base_price", "discount_pct", "days_until_expiry",
    ]
    st.dataframe(bundle[shown_columns], use_container_width=True, hide_index=True)

    st.info(
        "Suggested marketing use: turn these bundles into one-click promotions for end customers or pre-designed campaign packs for store managers."
    )
|
|
|
|
def consumer_personal(df: pd.DataFrame):
    """Render up to 12 recommendation cards for a favourite category.

    Items of the chosen category priced at or below the cap are ranked by
    ``0.55 * discount_pct + 0.45 * value_score`` and laid out in three
    columns. An info message is shown when nothing matches.
    """
    st.subheader("Personalized Promotions")
    # dropna(): sorted() cannot compare a NaN category with string names.
    favorite = st.selectbox("Favorite category", sorted(df["category"].dropna().unique()))
    price_cap = st.slider("Max item price (€)", 1, 30, 10)
    not_too_close = st.checkbox("Hide items expiring within 1 day", value=False)

    recs = df[(df["category"] == favorite) & (df["selling_price"] <= price_cap)]
    if not_too_close:
        recs = recs[recs["days_until_expiry"] > 1]
    recs = (
        recs.assign(score=lambda x: x["discount_pct"] * 0.55 + x["value_score"] * 0.45)
        .sort_values("score", ascending=False)
        .head(12)
    )

    if recs.empty:
        # Say so explicitly instead of leaving the tab blank.
        st.info("No items match the current selection.")
        return

    cols = st.columns(3)
    for i, (_, row) in enumerate(recs.iterrows()):
        with cols[i % 3]:
            st.markdown(f"### {row['product_name']}")
            st.write(f"{row['category']} · {row['store_id']}")
            st.write(f"Now **€{row['selling_price']:.2f}** | Save **€{(row['base_price'] - row['selling_price']):.2f}**")
            st.write(f"Expires in {int(row['days_until_expiry'])} day(s)")
            st.button("Add to shortlist", key=f"short_{i}")
|
|
|
|
def main():
    """Entry point: load the data, apply sidebar filters, render the chosen mode.

    Execution stops with an on-page message if loading fails or the filters
    leave no rows.
    """
    st.title("🥗 FreshWise")
    st.caption("Perishable retail optimization for managers and consumers")

    try:
        df = load_data()
    except Exception as e:
        st.error(str(e))
        st.stop()

    filtered = apply_filters(df)
    if filtered.empty:
        st.warning("No data left after filtering.")
        st.stop()

    role = st.radio("Choose your mode", ["Manager", "Consumer"], horizontal=True)

    # (tab label, render function) pairs for each mode, in display order.
    manager_pages = (
        ("Executive Dashboard", manager_dashboard),
        ("Inventory & Replenishment", manager_inventory),
        ("Promotion Designer", manager_promotions),
        ("Risk Monitor", manager_risk),
    )
    consumer_pages = (
        ("Deal Finder", consumer_deals),
        ("Bundle Builder", consumer_bundles),
        ("Personalized Promotions", consumer_personal),
    )
    pages = manager_pages if role == "Manager" else consumer_pages

    tabs = st.tabs([label for label, _ in pages])
    for tab, (_, render) in zip(tabs, pages):
        with tab:
            render(filtered)

    with st.expander("About this app"):
        st.markdown(
            """
            - **Manager mode** turns data into inventory, markdown, and operational decisions.
            - **Consumer mode** surfaces discounted products, smart bundles, and personalized promotions.
            - Built for deployment on Hugging Face Docker Spaces with Streamlit.
            """
        )


if __name__ == "__main__":
    main()
|
|