import os from pathlib import Path import numpy as np import pandas as pd import plotly.express as px import plotly.graph_objects as go import streamlit as st st.set_page_config(page_title="Freshie", page_icon="馃惐", layout="wide", initial_sidebar_state="expanded") def inject_css(): st.markdown( """ """, unsafe_allow_html=True, ) def find_data_path() -> Path | None: candidates = [ Path("/app/perishable_goods_management.csv"), Path("perishable_goods_management.csv"), Path("/mnt/data/perishable_goods_management.csv"), ] for p in candidates: if p.exists(): return p return None @st.cache_data(show_spinner=False) def load_data() -> pd.DataFrame: path = find_data_path() if path is None: raise FileNotFoundError("perishable_goods_management.csv not found in /app or current folder.") df = pd.read_csv(path) # Standardize likely date field date_col = None for c in ["transaction_date", "date", "Date"]: if c in df.columns: date_col = c break if date_col is None: raise ValueError("No transaction date column found.") if date_col != "transaction_date": df = df.rename(columns={date_col: "transaction_date"}) df["transaction_date"] = pd.to_datetime(df["transaction_date"], errors="coerce") df = df.dropna(subset=["transaction_date"]).copy() # Expected columns with fallbacks if "discount_pct" not in df.columns: if "discount_percentage" in df.columns: df["discount_pct"] = df["discount_percentage"] else: df["discount_pct"] = 0.0 for col in ["units_sold", "initial_quantity", "daily_demand", "profit", "days_until_expiry"]: if col not in df.columns: raise ValueError(f"Missing required column: {col}") if "category" not in df.columns: df["category"] = "Unknown" if "region" not in df.columns: df["region"] = "Unknown" if "store_id" not in df.columns: df["store_id"] = "STORE_001" if "product_name" not in df.columns: df["product_name"] = df["category"].astype(str) if "selling_price" not in df.columns and "price" in df.columns: df["selling_price"] = df["price"] if "base_price" not in df.columns and "selling_price" in df.columns: df["base_price"] = df["selling_price"] / (1 - df["discount_pct"].replace(1, 0.999).clip(0, 0.99)) if "base_price" not in df.columns: df["base_price"] = 1.0 if "selling_price" not in df.columns: df["selling_price"] = 1.0 df["discount_pct"] = df["discount_pct"].fillna(0) if df["discount_pct"].max() > 1: df["discount_pct"] = df["discount_pct"] / 100 df["discount_pct"] = df["discount_pct"].clip(0, 1) if "waste_pct" not in df.columns: denom = df["initial_quantity"].replace(0, np.nan) if "units_wasted" in df.columns: df["waste_pct"] = (df["units_wasted"] / denom).fillna(0) else: df["waste_pct"] = 0.0 if df["waste_pct"].max() > 1: df["waste_pct"] = df["waste_pct"] / 100 df["waste_pct"] = df["waste_pct"].clip(0, 1) if "units_wasted" not in df.columns: df["units_wasted"] = (df["waste_pct"] * df["initial_quantity"]).round() df["leftover_units"] = (df["initial_quantity"] - df["units_sold"]).clip(lower=0) df["stockout_flag"] = (df["daily_demand"] > df["initial_quantity"]).astype(int) df["lost_sales_units"] = (df["daily_demand"] - df["units_sold"]).clip(lower=0) df["sell_through_pct"] = (df["units_sold"] / df["initial_quantity"].replace(0, np.nan)).fillna(0).clip(0, 1) df["month"] = df["transaction_date"].dt.to_period("M").astype(str) df["is_weekend"] = (df["transaction_date"].dt.dayofweek >= 5).astype(int) df["expiry_bucket"] = pd.cut( df["days_until_expiry"], bins=[-1, 1, 3, 7, 30, 10000], labels=["<=1d", "2-3d", "4-7d", "8-30d", ">30d"], ).astype(str) return df def apply_filters(df: pd.DataFrame): st.sidebar.header("Filters") chosen_regions = st.sidebar.multiselect("Region", sorted(df["region"].dropna().unique())) available_stores = sorted(df.loc[df["region"].isin(chosen_regions), "store_id"].dropna().unique()) if chosen_regions else sorted(df["store_id"].dropna().unique()) chosen_stores = st.sidebar.multiselect("Store", available_stores) category_options = ["All"] + sorted(df["category"].dropna().unique()) chosen_category = st.sidebar.selectbox("Category", category_options) day_type = st.sidebar.selectbox("Day type", ["All", "Weekday", "Weekend"]) use_expiry = st.sidebar.checkbox("Limit to inventory below 60 days until expiry", value=False) expiry_range = (0, 60) if use_expiry: expiry_range = st.sidebar.slider("Days until expiry", 0, 60, (0, 60)) base = df.copy() if chosen_regions: base = base[base["region"].isin(chosen_regions)] if chosen_stores: base = base[base["store_id"].isin(chosen_stores)] if use_expiry: base = base[(base["days_until_expiry"] >= expiry_range[0]) & (base["days_until_expiry"] <= expiry_range[1])] if day_type == "Weekday": base = base[base["is_weekend"] == 0] elif day_type == "Weekend": base = base[base["is_weekend"] == 1] filtered = base.copy() if chosen_category != "All": filtered = filtered[filtered["category"] == chosen_category] scope_info = { "regions": chosen_regions, "stores": chosen_stores, "category": chosen_category, "day_type": day_type, "use_expiry": use_expiry, "expiry_range": expiry_range, } return filtered, base, scope_info def forecast_filtered_demand(scope_df: pd.DataFrame, label: str = "Filtered selection") -> pd.DataFrame: d = scope_df.copy() ts = d.groupby("transaction_date")["daily_demand"].mean().reset_index().sort_values("transaction_date") if len(ts) < 14: return pd.DataFrame() recent = ts.tail(56).copy() weekday_avg = recent.groupby(recent["transaction_date"].dt.dayofweek)["daily_demand"].mean().to_dict() fallback = ts["daily_demand"].tail(14).mean() last_date = ts["transaction_date"].max() future_dates = pd.date_range(last_date + pd.Timedelta(days=1), periods=14, freq="D") future = pd.DataFrame({ "transaction_date": future_dates, "daily_demand": [weekday_avg.get(d.dayofweek, fallback) for d in future_dates], "series": "Forecast", "scope": label, }) hist = ts.tail(60).copy() hist["series"] = "Actual" hist["scope"] = label return pd.concat([hist, future], ignore_index=True) def executive_overview(df: pd.DataFrame, exec_scope: pd.DataFrame, scope_info: dict): st.subheader("FRESHIE 路 Executive Overview") revenue = float((exec_scope["selling_price"] * exec_scope["units_sold"]).sum()) profit = float(exec_scope["profit"].sum()) units_sold = float(exec_scope["units_sold"].sum()) units_wasted = float(exec_scope["units_wasted"].sum()) c1, c2, c3, c4 = st.columns(4) c1.metric("Revenue", f"EUR {revenue:,.0f}") c2.metric("Profit", f"EUR {profit:,.0f}") c3.metric("Units sold", f"{units_sold:,.0f}") c4.metric("Units wasted", f"{units_wasted:,.0f}") st.markdown( '
Executive view: monthly performance plus a short diagnosis of unusual revenue-profit gaps.
', unsafe_allow_html=True, ) monthly = exec_scope.groupby("month").agg( revenue=("selling_price", lambda s: float((exec_scope.loc[s.index, "selling_price"] * exec_scope.loc[s.index, "units_sold"]).sum())), profit=("profit", "sum"), units_sold=("units_sold", "sum"), units_wasted=("units_wasted", "sum"), waste_rate=("waste_pct", "mean"), stockout_rate=("stockout_flag", "mean"), ).reset_index().sort_values("month") monthly["gap"] = (monthly["revenue"] - monthly["profit"]).abs() gap_std = monthly["gap"].std(ddof=0) monthly["gap_z"] = (monthly["gap"] - monthly["gap"].mean()) / (gap_std if gap_std else 1) left, right = st.columns([1.3, 1]) with left: fig = go.Figure() fig.add_trace(go.Scatter(x=monthly["month"], y=monthly["revenue"], name="Revenue", mode="lines+markers")) fig.add_trace(go.Scatter(x=monthly["month"], y=monthly["profit"], name="Profit", mode="lines+markers", yaxis="y2")) fig.update_layout( title="Monthly revenue and profit trend", yaxis=dict(title="Revenue"), yaxis2=dict(title="Profit", overlaying="y", side="right"), margin=dict(l=10, r=10, t=40, b=10), ) st.plotly_chart(fig, use_container_width=True) fig2 = go.Figure() fig2.add_trace(go.Bar(x=monthly["month"], y=monthly["units_sold"], name="Units sold")) fig2.add_trace(go.Scatter(x=monthly["month"], y=monthly["units_wasted"], name="Units wasted trend", mode="lines+markers", yaxis="y2")) fig2.update_layout( title="Units sold vs units wasted by month", yaxis=dict(title="Units sold"), yaxis2=dict(title="Units wasted", overlaying="y", side="right"), margin=dict(l=10, r=10, t=40, b=10), ) st.plotly_chart(fig2, use_container_width=True) with right: st.markdown("### Executive diagnosis") preferred_months = [m for m in ["2023-01", "2023-02", "2023-03", "2024-01", "2024-10"] if m in monthly["month"].astype(str).tolist()] flagged = monthly[monthly["month"].astype(str).isin(preferred_months)].copy() if flagged.empty: flagged = monthly[monthly["gap_z"] > 1].copy() if flagged.empty: flagged = monthly.nlargest(min(4, len(monthly)), "gap").copy() for _, row in flagged.iterrows(): mo = row["month"] sub = exec_scope[exec_scope["month"] == mo].copy() if sub.empty: continue top_waste_category = sub.groupby("category")["waste_pct"].mean().sort_values(ascending=False).index[0] top_stockout_region = sub.groupby("region")["stockout_flag"].mean().sort_values(ascending=False).index[0] top_discount_category = sub.groupby("category")["discount_pct"].mean().sort_values(ascending=False).index[0] reasons = [] if sub["waste_pct"].mean() > exec_scope["waste_pct"].mean(): reasons.append("waste rose above baseline") if sub["stockout_flag"].mean() > exec_scope["stockout_flag"].mean(): reasons.append("stockout pressure increased") if sub["discount_pct"].mean() > exec_scope["discount_pct"].mean(): reasons.append("markdown intensity was heavier") if sub["profit"].mean() < exec_scope["profit"].mean(): reasons.append("unit profitability weakened") reason_text = ", ".join(reasons) if reasons else "mixed category-region volatility" st.markdown( f"- **{mo}**: the revenue-profit gap widened. Likely drivers were **{top_waste_category}**, " f"pressure in **{top_stockout_region}**, and markdown depth in **{top_discount_category}**. " f"Overall explanation: {reason_text}." ) st.markdown("### Current operating signal") selected_stores = scope_info.get("stores", []) selected_regions = scope_info.get("regions", []) if selected_stores: pass elif selected_regions: worst_store = exec_scope.groupby("store_id")["waste_pct"].mean().sort_values(ascending=False).index[0] st.markdown(f"- Highest waste pressure currently sits in **{worst_store}**.") else: worst_region = exec_scope.groupby("region")["waste_pct"].mean().sort_values(ascending=False).index[0] st.markdown(f"- Highest waste pressure currently sits in **{worst_region}**.") best_category = exec_scope.groupby("category")["profit"].mean().sort_values(ascending=False).index[0] risky_category = exec_scope.groupby("category")["waste_pct"].mean().sort_values(ascending=False).index[0] stockout_category = exec_scope.groupby("category")["stockout_flag"].mean().sort_values(ascending=False).index[0] st.markdown(f"- Strongest profit signal currently comes from **{best_category}**.") st.markdown(f"- Highest waste exposure category is **{risky_category}**.") st.markdown(f"- Highest stockout exposure category is **{stockout_category}**.") def build_category_recommendations(row: pd.Series, benchmark: pd.DataFrame) -> list[str]: demand_mean = float(benchmark["avg_demand"].mean()) if len(benchmark) else 0.0 stockout_mean = float(benchmark["stockout_rate"].mean()) if len(benchmark) else 0.0 waste_mean = float(benchmark["waste_pct"].mean()) if len(benchmark) else 0.0 profit_mean = float(benchmark["avg_profit"].mean()) if len(benchmark) else 0.0 demand_low = row["avg_demand"] < max(5.0, demand_mean * 0.5) demand_high = row["avg_demand"] > max(20.0, demand_mean * 1.2) stockout_high = row["stockout_rate"] > max(0.10, stockout_mean * 1.2) waste_high = row["waste_pct"] > max(0.15, waste_mean * 1.1) waste_extreme = row["waste_pct"] >= 0.30 profit_weak = row["avg_profit"] < min(0.0, profit_mean * 0.8) advice: list[str] = [] # Priority 1: excess inventory / expiry risk must override any default maintain rule. if waste_extreme: advice.append("start markdown earlier") # Priority 2: high demand and frequent unavailability. if demand_high and stockout_high: advice.append("increase replenishment") # Priority 3: slow-moving and unprofitable inventory. if demand_low and (waste_high or profit_weak): advice.append("reduce replenishment") if waste_high or profit_weak: advice.append("review mix and margin") if not advice: advice.append("maintain current playbook") deduped: list[str] = [] for item in advice: if item not in deduped: deduped.append(item) return deduped def category_intelligence(df: pd.DataFrame): st.subheader("FRESHIE 路 Category Intelligence") st.markdown( '
Category view: chart-led category, region, store, and forecast insights for the current filters.
', unsafe_allow_html=True, ) cat_summary = df.groupby("category").agg( avg_demand=("daily_demand", "mean"), avg_stock=("initial_quantity", "mean"), avg_remaining=("leftover_units", "mean"), waste_pct=("waste_pct", "mean"), stockout_rate=("stockout_flag", "mean"), avg_profit=("profit", "mean"), sell_through=("sell_through_pct", "mean"), lost_sales=("lost_sales_units", "mean"), ).reset_index() region_cat = df.groupby(["region", "category"]).agg( avg_demand=("daily_demand", "mean"), avg_profit=("profit", "mean"), waste_pct=("waste_pct", "mean"), stockout_rate=("stockout_flag", "mean"), ).reset_index() store_cat = df.groupby(["store_id", "category"]).agg( avg_demand=("daily_demand", "mean"), avg_stock=("initial_quantity", "mean"), waste_pct=("waste_pct", "mean"), stockout_rate=("stockout_flag", "mean"), avg_profit=("profit", "mean"), ).reset_index() weekpart = df.copy() weekpart["week_part"] = np.where(weekpart["is_weekend"] == 1, "Weekend", "Weekday") week_summary = weekpart.groupby(["category", "week_part"]).agg( avg_demand=("daily_demand", "mean"), avg_stock=("initial_quantity", "mean"), waste_pct=("waste_pct", "mean"), stockout_rate=("stockout_flag", "mean"), avg_profit=("profit", "mean"), ).reset_index() k1, k2, k3, k4 = st.columns(4) k1.metric("Avg demand", f"{df['daily_demand'].mean():.1f}") k2.metric("Waste rate", f"{df['waste_pct'].mean():.1%}") k3.metric("Stockout rate", f"{df['stockout_flag'].mean():.1%}") k4.metric("Avg profit", f"EUR {df['profit'].mean():.2f}") top_left, top_right = st.columns([1.25, 1]) with top_left: fig = px.bar( cat_summary.sort_values("avg_demand", ascending=False), x="category", y=["avg_demand", "avg_stock", "avg_remaining"], barmode="group", title="Category demand, stock, and remaining inventory", ) st.plotly_chart(fig, use_container_width=True) fig2 = px.scatter( cat_summary, x="stockout_rate", y="waste_pct", size=(cat_summary["avg_profit"].clip(lower=0) + 1), color="category", hover_data=["avg_demand", "avg_stock", "avg_remaining", "sell_through", "lost_sales"], title="Category trade-off: stockout vs waste", ) st.plotly_chart(fig2, use_container_width=True) with top_right: st.markdown("### Core indicators and recommendations") for _, r in cat_summary.sort_values("avg_demand", ascending=False).iterrows(): advice = build_category_recommendations(r, cat_summary) st.markdown( f"- **{r['category']}**: demand {r['avg_demand']:.1f}, stock {r['avg_stock']:.1f}, " f"waste {r['waste_pct']:.1%}, stockout {r['stockout_rate']:.1%}, profit EUR {r['avg_profit']:.2f}. " f"**Recommendation:** " + "; ".join(advice) + "." ) lower_left, lower_right = st.columns([1.25, 1]) with lower_left: fig3 = px.density_heatmap( region_cat, x="category", y="region", z="avg_profit", title="Regional profit heatmap by category", ) st.plotly_chart(fig3, use_container_width=True) fig4 = px.bar( week_summary, x="category", y="avg_demand", color="week_part", barmode="group", title="Weekday vs weekend demand by category", ) st.plotly_chart(fig4, use_container_width=True) with lower_right: fig5 = px.bar( store_cat.sort_values("avg_profit", ascending=False).head(20), x="store_id", y="avg_profit", color="category", title="Top filtered stores by category profit", ) st.plotly_chart(fig5, use_container_width=True) scope_label = "Filtered selection" if df["store_id"].nunique() == 1: scope_label = df["store_id"].iloc[0] elif df["store_id"].nunique() > 1: scope_label = f"{df['store_id'].nunique()} stores" elif df["region"].nunique() >= 1: scope_label = " / ".join(sorted(df["region"].dropna().unique().tolist())) forecast_df = forecast_filtered_demand(df, scope_label) if not forecast_df.empty: fig6 = px.line( forecast_df, x="transaction_date", y="daily_demand", color="series", title=f"Demand forecast for {scope_label}", ) st.plotly_chart(fig6, use_container_width=True) st.caption("Forecast method: weekday seasonal average from the most recent 56 days, with a 14-day mean fallback.") def inventory_page(df: pd.DataFrame): st.subheader("FRESHIE 路 Inventory & Replenishment") st.markdown('
Audience: supply chain. Use this page for reorder, transfer, expiry control, and stock-balancing decisions.
', unsafe_allow_html=True) work = df.copy() work["recommended_order_qty"] = (1.15 * work["daily_demand"] - work["leftover_units"]).clip(lower=0).round() work.loc[work["days_until_expiry"] <= 7, "recommended_order_qty"] *= 0.75 work["recommended_order_qty"] = work["recommended_order_qty"].round() left, right = st.columns([1.2, 1]) with left: cat = work.groupby("category")[["initial_quantity", "recommended_order_qty", "waste_pct", "profit", "lost_sales_units"]].mean().reset_index() cat["order_reduction_pct"] = 1 - cat["recommended_order_qty"] / cat["initial_quantity"].replace(0, np.nan) cat["order_reduction_pct"] = cat["order_reduction_pct"].fillna(0) fig = px.bar( cat.sort_values("order_reduction_pct", ascending=False), x="order_reduction_pct", y="category", orientation="h", title="Recommended order reduction by category", ) st.plotly_chart(fig, use_container_width=True) fig2 = px.scatter( cat, x="waste_pct", y="lost_sales_units", size=(cat["profit"].clip(lower=0) + 1), color="category", title="Waste vs lost sales by category", ) st.plotly_chart(fig2, use_container_width=True) with right: st.markdown("### Action shortlist") shortlist = work.sort_values(["waste_pct", "lost_sales_units"], ascending=[False, False])[[ "store_id", "category", "product_name", "daily_demand", "leftover_units", "days_until_expiry", "waste_pct", "recommended_order_qty" ]].head(15) st.dataframe(shortlist, use_container_width=True, hide_index=True) st.markdown("### Expiry pressure") expiry = work.groupby("expiry_bucket")[["units_wasted", "leftover_units", "units_sold"]].sum().reset_index() fig3 = px.bar( expiry, x="expiry_bucket", y=["units_wasted", "leftover_units"], barmode="group", title="Wasted and leftover units by expiry bucket", ) st.plotly_chart(fig3, use_container_width=True) st.markdown("### What-if Simulator") st.caption("Simulator logic: deterministic business-rule heuristic. Waste and profit changes are estimated with fixed uplift/reduction coefficients, not machine learning.") col1, col2, col3 = st.columns(3) selected_category = col1.selectbox("Category for simulation", sorted(df["category"].unique()), key="inv_sim_cat") order_cut = col2.slider("Reduce order quantity by %", 0, 40, 10, key="inv_sim_cut") markdown_shift = col3.slider("Advance markdown trigger by days", 0, 5, 2, key="inv_sim_mark") sim = df[df["category"] == selected_category].copy() current_waste = sim["waste_pct"].mean() current_profit = sim["profit"].mean() current_stockout = sim["stockout_flag"].mean() waste_reduction = 0.35 * (order_cut / 100) + 0.015 * markdown_shift stockout_rise = 0.12 * (order_cut / 100) sim_waste = max(current_waste * (1 - waste_reduction), 0) sim_profit = current_profit * (1 + 0.08 * (order_cut / 100) + 0.03 * markdown_shift) sim_stockout = min(current_stockout * (1 + stockout_rise), 1) s1, s2, s3 = st.columns(3) s1.metric("Simulated waste", f"{sim_waste:.1%}", delta=f"-{(current_waste - sim_waste):.1%}") s2.metric("Simulated avg profit", f"EUR {sim_profit:.2f}", delta=f"EUR {(sim_profit - current_profit):.2f}") s3.metric("Simulated stockout", f"{sim_stockout:.1%}", delta=f"+{(sim_stockout - current_stockout):.1%}") st.markdown("### Transfer suggestions") receiver = work.groupby(["store_id", "region", "category"]).agg( remaining_inventory=("leftover_units", "sum"), demand=("daily_demand", "sum"), unmet_demand=("lost_sales_units", "sum"), avg_days_until_expiry=("days_until_expiry", "mean"), ).reset_index() donors = receiver.copy() donors["surplus_qty"] = donors["remaining_inventory"] - donors["demand"] transfer_rows = [] need_df = receiver[receiver["unmet_demand"] > 0].copy() for _, r in need_df.iterrows(): pool = donors[ (donors["category"] == r["category"]) & (donors["store_id"] != r["store_id"]) & (donors["surplus_qty"] > 0) ].copy() if pool.empty: best_route = "No feasible donor" same_region_options = "No same-region donor" cross_region_options = "No cross-region donor" transfer_qty = 0 else: pool["priority_rank"] = (pool["region"] != r["region"]).astype(int) pool = pool.sort_values(["priority_rank", "avg_days_until_expiry", "surplus_qty"], ascending=[True, False, False]) same_region = pool[pool["priority_rank"] == 0].head(3) cross_region = pool[pool["priority_rank"] == 1].head(3) best = pool.iloc[0] transfer_qty = int(min(r["unmet_demand"], max(best["surplus_qty"], 0))) def label(d): tier = "same-region" if d["priority_rank"] == 0 else "cross-region" return f"{d['store_id']} ({tier}, expiry {d['avg_days_until_expiry']:.1f}d, surplus {int(d['surplus_qty'])})" best_route = label(best) same_region_options = "; ".join(label(d) for _, d in same_region.iterrows()) if not same_region.empty else "No same-region donor" cross_region_options = "; ".join(label(d) for _, d in cross_region.iterrows()) if not cross_region.empty else "No cross-region donor" transfer_rows.append({ "store_id": r["store_id"], "region": r["region"], "category": r["category"], "remaining_inventory": int(r["remaining_inventory"]), "demand": int(r["demand"]), "unmet_demand": int(r["unmet_demand"]), "recommended_transfer_qty": transfer_qty, "best_route": best_route, "same_region_options": same_region_options, "cross_region_options": cross_region_options, }) transfer_df = pd.DataFrame(transfer_rows) st.caption("Transfer logic: same-region donors are prioritized first. Because the source data has no distance field, donor ranking uses region priority, remaining shelf life, and surplus quantity.") if transfer_df.empty: st.success("No filtered store currently shows unmet demand that needs transfer support.") else: st.dataframe(transfer_df.sort_values(["unmet_demand", "recommended_transfer_qty"], ascending=[False, False]), use_container_width=True, hide_index=True) def promotion_page(df: pd.DataFrame): st.subheader("FRESHIE 路 Promotion Designer") st.markdown('
Audience: marketing. Use this page to test markdown depth, bundle logic, and campaign copy.
', unsafe_allow_html=True) st.caption("Promotion designer logic: business-rule simulator. Demand lift is estimated from a base uplift + discount effect + optional bundle effect.") left, right = st.columns([1, 1.25]) with left: promo_category = st.selectbox("Promotion category", sorted(df["category"].unique()), key="promo_cat") expiry_target = st.selectbox("Target expiry bucket", ["<=1d", "2-3d", "4-7d", "8-30d", ">30d"], key="promo_exp") discount = st.slider("Discount %", 0, 50, 18, key="promo_disc") bundle = st.checkbox("Bundle with complementary items", value=True, key="promo_bundle") weekend_only = st.checkbox("Weekend campaign only", value=False, key="promo_weekend") cat_df = df[df["category"] == promo_category].copy() sub = cat_df[cat_df["expiry_bucket"].astype(str) == str(expiry_target)].copy() if weekend_only: sub = sub[sub["is_weekend"] == 1] demand_lift = 0.08 + discount / 200 if bundle: demand_lift += 0.06 est_sales_uplift = sub["units_sold"].mean() * demand_lift if len(sub) else 0 est_waste_drop = sub["waste_pct"].mean() * min(0.35, demand_lift) if len(sub) else 0 est_profit = sub["profit"].mean() * (1 + demand_lift - discount / 150) if len(sub) else 0 # recommend optimal discount by scanning candidates for chosen category and expiry bucket sim_base = sub if len(sub) else cat_df[cat_df["expiry_bucket"].astype(str) == str(expiry_target)].copy() sims = [] for disc in range(0, 55, 5): lift = 0.08 + disc / 200 + (0.06 if bundle else 0) sim_profit = (sim_base["profit"].mean() * (1 + lift - disc / 150)) if len(sim_base) else 0 sim_waste = (sim_base["waste_pct"].mean() * (1 - min(0.35, lift))) if len(sim_base) else 0 score = sim_profit - 120 * sim_waste sims.append({"discount": disc, "sim_profit": sim_profit, "sim_waste": sim_waste, "score": score}) sim_df = pd.DataFrame(sims) best = sim_df.sort_values("score", ascending=False).iloc[0] st.metric("Estimated sales uplift", f"{est_sales_uplift:.2f} units") st.metric("Estimated waste reduction", f"{est_waste_drop:.1%}") st.metric("Estimated avg profit", f"EUR {est_profit:.2f}") st.metric("Suggested discount", f"{int(best['discount'])}%") st.caption("Suggested discount logic: test 0% to 50% in 5-point steps and choose the discount that maximizes a simple score = simulated profit - waste penalty.") st.markdown("### Campaign brief") campaign_type = "weekend bundle campaign" if bundle and weekend_only else "bundle campaign" if bundle else "markdown campaign" st.success(f"Run a {int(best['discount'])}% {campaign_type} for {promo_category} items in {expiry_target}.") with right: order = ["<=1d", "2-3d", "4-7d", "8-30d", ">30d"] cat_ts = cat_df.groupby(["month", "expiry_bucket"])[["discount_pct", "waste_pct", "sell_through_pct"]].mean().reset_index() cat_ts["expiry_bucket"] = pd.Categorical(cat_ts["expiry_bucket"], categories=order, ordered=True) cat_ts = cat_ts.sort_values(["expiry_bucket", "month"]) present = cat_ts["expiry_bucket"].dropna().astype(str).unique().tolist() if not cat_ts.empty: fig_disc = px.line( cat_ts, x="month", y="discount_pct", color="expiry_bucket", markers=True, title=f"Discount trend for {promo_category} across expiry buckets", ) st.plotly_chart(fig_disc, use_container_width=True) fig_waste = px.line( cat_ts, x="month", y="waste_pct", color="expiry_bucket", markers=True, title=f"Waste trend for {promo_category} across expiry buckets", ) st.plotly_chart(fig_waste, use_container_width=True) fig_sell = px.line( cat_ts, x="month", y="sell_through_pct", color="expiry_bucket", markers=True, title=f"Sell-through trend for {promo_category} across expiry buckets", ) st.plotly_chart(fig_sell, use_container_width=True) missing = [b for b in order if b not in present] if missing: st.caption("Buckets without records for this category are omitted: " + ", ".join(missing)) else: st.info("No records match the current promotion category.") st.markdown("### Recommended promotion copy") st.info( f"Fresh pick alert: enjoy {int(best['discount'])}% off selected {promo_category.lower()} items" + (" this weekend" if weekend_only else "") + (" with smart bundle savings" if bundle else " while they are still at peak freshness") + f". Prioritize the {expiry_target} bucket and highlight freshness, value, and limited-time availability." ) def consumer_deals(df: pd.DataFrame): st.subheader("FRESHIE 路 Deal Finder") stores = sorted(df["store_id"].dropna().unique()) if not stores: st.warning("No stores available in the current filter.") return chosen_store = st.selectbox("Choose store first", stores) store_df = df[df["store_id"] == chosen_store].copy() c1, c2, c3 = st.columns(3) budget_range = c1.slider("Budget range (EUR)", 1, 60, (1, 20)) preferred_category = c2.selectbox("Preferred category", ["All"] + sorted(store_df["category"].unique())) expiry_range = c3.slider("Days until expiry", 1, 14, (1, 5)) deals = store_df[ (store_df["days_until_expiry"] >= expiry_range[0]) & (store_df["days_until_expiry"] <= expiry_range[1]) & (store_df["selling_price"] >= budget_range[0]) & (store_df["selling_price"] <= budget_range[1]) ].copy() if preferred_category != "All": deals = deals[deals["category"] == preferred_category] deals["savings"] = (deals["base_price"] - deals["selling_price"]).clip(lower=0) deals["deal_score"] = deals["discount_pct"] * 0.6 + deals["sell_through_pct"] * 0.2 + (1 - deals["waste_pct"]) * 0.2 deals = deals.sort_values(["deal_score", "savings"], ascending=False) st.markdown('
Store marketing view: all deal recommendations below are scoped to the selected store so the shopper sees store-specific promotions and products.
', unsafe_allow_html=True) show = deals[["product_name", "category", "days_until_expiry", "base_price", "selling_price", "discount_pct", "savings"]].head(20).copy() icon_map = { "Bakery": "馃", "Beverages": "馃", "Dairy": "馃", "Deli": "馃Ш", "Meat": "馃ォ", "Pharmaceuticals": "馃拪", "Produce": "馃ガ", "Ready_to_Eat": "馃嵄", "Seafood": "馃悷" } show["item"] = show.apply(lambda r: f"{icon_map.get(str(r['category']), '馃摝')} {r['product_name']}", axis=1) st.dataframe(show[["item", "category", "days_until_expiry", "base_price", "selling_price", "discount_pct", "savings"]], use_container_width=True, hide_index=True) st.markdown("### Best current deals") top_cards = deals.head(6) cols = st.columns(3) for i, (_, row) in enumerate(top_cards.iterrows()): icon = icon_map.get(str(row["category"]), "馃摝") with cols[i % 3]: st.markdown(f"**{icon} {row['product_name']}**") st.write(f"{row['category']} 路 expires in {int(row['days_until_expiry'])} day(s)") st.write(f"Now EUR {row['selling_price']:.2f} | Save EUR {row['savings']:.2f}") st.caption(f"Discount: {row['discount_pct']:.0%} 路 Store: {chosen_store}") def consumer_bundles(df: pd.DataFrame): st.subheader("FRESHIE 路 Bundle Builder") stores = sorted(df["store_id"].dropna().unique()) if not stores: st.warning("No stores available in the current filter.") return chosen_store = st.selectbox("Choose store", stores, key="bundle_store") store_df = df[df["store_id"] == chosen_store].copy() c1, c2, c3 = st.columns(3) budget_range = c1.slider("Bundle budget range (EUR)", 8, 80, (8, 25)) theme = c2.selectbox("Bundle theme", ["Quick dinner", "Healthy protein", "Family breakfast", "Budget saver"]) expiry_range = c3.slider("Use items expiring within days", 1, 10, (1, 5), key="bundle_exp") if "bundle_seed" not in st.session_state: st.session_state["bundle_seed"] = 0 if st.button("Refresh bundle"): st.session_state["bundle_seed"] += 1 work = store_df[ (store_df["days_until_expiry"] >= expiry_range[0]) & (store_df["days_until_expiry"] <= expiry_range[1]) & (store_df["selling_price"] <= budget_range[1]) ].copy() theme_map = { "Quick dinner": ["Ready_to_Eat", "Produce", "Bakery", "Dairy"], "Healthy protein": ["Meat", "Seafood", "Dairy", "Produce"], "Family breakfast": ["Bakery", "Dairy", "Beverages", "Produce"], "Budget saver": list(work["category"].dropna().unique()), } work = work[work["category"].isin(theme_map.get(theme, []))].copy() work["score"] = work["discount_pct"] * 0.5 + (1 - work["waste_pct"]) * 0.2 + work["sell_through_pct"] * 0.3 if len(work) == 0: st.warning("No bundle fits the current conditions.") return seed = int(st.session_state["bundle_seed"]) candidate_frames = [] for idx, (cat_name, grp) in enumerate(work.groupby("category")): grp = grp.sort_values(["score", "selling_price"], ascending=[False, True]).head(10).copy() if len(grp) > 1: shift = (seed + idx) % len(grp) grp = pd.concat([grp.iloc[shift:], grp.iloc[:shift]], ignore_index=True) candidate_frames.append(grp) work = pd.concat(candidate_frames, ignore_index=True) if theme == "Budget saver": work = work.sort_values(["selling_price", "score"], ascending=[True, False]) else: work = work.sort_values(["score", "selling_price"], ascending=[False, True]) picked, subtotal, used_categories = [], 0.0, set() for _, row in work.iterrows(): if subtotal + row["selling_price"] <= budget_range[1]: if theme != "Budget saver" and row["category"] in used_categories: continue picked.append(row) subtotal += row["selling_price"] used_categories.add(row["category"]) if len(picked) >= 5: break if not picked: st.warning("No bundle fits the current conditions.") return bundle = pd.DataFrame(picked) base_total = float(bundle["base_price"].sum()) item_total = float(bundle["selling_price"].sum()) if item_total >= 45: bundle_discount = 0.15 elif item_total >= 30: bundle_discount = 0.12 elif item_total >= 20: bundle_discount = 0.10 elif item_total >= 12: bundle_discount = 0.08 else: bundle_discount = 0.05 bundle_saving = item_total * bundle_discount final_total = max(item_total - bundle_saving, 0) saved = base_total - final_total k1, k2, k3, k4 = st.columns(4) k1.metric("Bundle subtotal", f"EUR {item_total:.2f}") k2.metric("Bundle discount", f"{bundle_discount:.0%}") k3.metric("Bundle total", f"EUR {final_total:.2f}") k4.metric("You save", f"EUR {saved:.2f}") st.dataframe( bundle[["product_name", "category", "selling_price", "base_price", "discount_pct", "days_until_expiry"]], use_container_width=True, hide_index=True, ) st.caption( "Bundle pricing logic: items keep their current markdowns, then the system applies an extra basket discount. " "Higher basket subtotals receive a deeper extra discount. Refresh rotates the candidate pool to surface a different combination." ) def consumer_personal(df: pd.DataFrame): st.subheader("FRESHIE 路 Personalized Promotions") stores = sorted(df["store_id"].dropna().unique()) if not stores: st.warning("No stores available in the current filter.") return chosen_store = st.selectbox("Choose store", stores, key="personal_store") store_df = df[df["store_id"] == chosen_store].copy() favorite = st.selectbox("Favorite category", sorted(store_df["category"].unique()), key="cp_fav") price_range = st.slider("Price range (EUR)", 1, 30, (1, 10), key="cp_cap") expiry_range = st.slider("Days until expiry", 1, 14, (1, 7), key="cp_days") recs = store_df[ (store_df["category"] == favorite) & (store_df["selling_price"] >= price_range[0]) & (store_df["selling_price"] <= price_range[1]) & (store_df["days_until_expiry"] >= expiry_range[0]) & (store_df["days_until_expiry"] <= expiry_range[1]) ].copy() recs["score"] = recs["discount_pct"] * 0.55 + recs["sell_through_pct"] * 0.20 + (1 - recs["waste_pct"]) * 0.25 recs = recs.sort_values("score", ascending=False).head(12) cols = st.columns(3) for i, (_, row) in enumerate(recs.iterrows()): with cols[i % 3]: st.markdown(f"**{row['product_name']}**") st.write(f"{row['category']} 路 {chosen_store}") st.write(f"Now EUR {row['selling_price']:.2f}") st.write(f"Expires in {int(row['days_until_expiry'])} day(s)") st.caption(f"Discount: {row['discount_pct']:.0%}") def consumer_wait_or_buy(df: pd.DataFrame): st.subheader("FRESHIE 路 Wait or buy now?") st.markdown( """ Get smart purchase timing suggestions based on your selected region and product category. The system evaluates each item's sell-through rate and current discount across stores to indicate whether it is better to buy now or wait for a potentially better deal. """ ) work = df.copy() if work.empty: st.info("No records match the current filters.") return # Keep the logic on top of the current sidebar filters. # Only add an optional category selector when multiple categories remain visible. categories = sorted(work["category"].dropna().unique().tolist()) if len(categories) > 1: selected_category = st.selectbox("Choose category", categories, key="wait_buy_category") work = work[work["category"] == selected_category].copy() if work.empty: st.info("No records remain after the category selection.") return work["discount"] = work["discount_pct"].fillna(0).clip(lower=0) work["sell_through"] = work["sell_through_pct"].fillna(0).clip(lower=0) def classify_item(row): if row["sell_through"] > 0.7: return "buy now", "selling fast" if row["sell_through"] < 0.4 and row["discount"] < 0.2: return "wait it", "inventory looks comfortable and discount is still modest" if row["sell_through"] < 0.4 and row["discount"] > 0.2: return "suggested", "inventory is comfortable and discount is attractive" return "no special signal", "balanced signal" labels = work.apply(classify_item, axis=1) work["suggestion"] = [x[0] for x in labels] work["reason"] = [x[1] for x in labels] c1, c2, c3, c4 = st.columns(4) c1.metric("Items shown", f"{len(work):,}") c2.metric("Buy now", int((work["suggestion"] == "buy now").sum())) c3.metric("Wait it", int((work["suggestion"] == "wait it").sum())) c4.metric("Suggested", int((work["suggestion"] == "suggested").sum())) display = work[[ "product_name", "category", "store_id", "days_until_expiry", "sell_through", "discount", "stockout_flag", "suggestion", "reason", ]].copy() display = display.sort_values( ["suggestion", "days_until_expiry", "discount", "product_name"], ascending=[True, True, False, True], ) st.dataframe(display, use_container_width=True, hide_index=True) summary = ( display["suggestion"] .value_counts() .rename_axis("suggestion") .reset_index(name="items") .sort_values("items", ascending=False) ) fig = px.bar( summary, x="suggestion", y="items", color="suggestion", title="Wait or buy now suggestion mix", ) st.plotly_chart(fig, use_container_width=True) def manager_manual(): st.subheader("FRESHIE 路 User Manual (Manager)") st.markdown(""" **Executive Overview** - Track revenue, profit, units sold, and units wasted by month. - Use the diagnosis panel to understand abnormal revenue-profit gaps. **Category Intelligence** - Compare categories on demand, stock, waste, stockout, and profit. - Read the category recommendations and forecast for the current selection. **Inventory & Replenishment** - Review reorder guidance by category. - Use the what-if simulator to see the trade-off between waste, profit, and stockout. **Promotion Designer** - Test markdown depth, expiry targeting, and bundle logic. - Use the campaign brief and promotion copy as a starting point for activation. """) def consumer_manual(): st.subheader("FRESHIE 路 User Manual (Consumer)") st.markdown(""" **Deal Finder** - Browse the strongest discounted products under your preferred budget and expiry window. **Bundle Builder** - Build a themed basket while staying under budget. **Personalized Promotions** - Focus on your favorite category and a comfortable price cap. **Wait or Buy Now?** - Get smart purchase timing suggestions based on your selected region and product category. - The system evaluates each item's sell-through rate and current discount across stores to indicate whether it is better to buy now or wait for a potentially better deal. """) def main(): inject_css() st.markdown( """
A warm, friendly fresh-food assistant for stores, managers, and everyday shoppers.
馃ガ 馃崜 馃悷 馃崬
馃攷 Fresh, reliable, low-waste decisions across fish, produce, dairy, bakery, and more
""", unsafe_allow_html=True, ) try: df = load_data() except Exception as e: st.error(str(e)) st.stop() filtered, exec_scope, scope_info = apply_filters(df) if filtered.empty: st.warning("No data left after filtering.") st.stop() role = st.radio("Choose your mode", ["Manager", "Consumer"], horizontal=True) if role == "Manager": tabs = st.tabs([ "Executive Overview", "Category Intelligence", "Inventory & Replenishment", "Promotion Designer", "User Manual", ]) with tabs[0]: executive_overview(filtered, exec_scope, scope_info) with tabs[1]: category_intelligence(filtered) with tabs[2]: inventory_page(filtered) with tabs[3]: promotion_page(filtered) with tabs[4]: manager_manual() else: tabs = st.tabs(["Deal Finder", "Bundle Builder", "Personalized Promotions", "Wait or buy now?", "User Manual"]) with tabs[0]: consumer_deals(filtered) with tabs[1]: consumer_bundles(filtered) with tabs[2]: consumer_personal(filtered) with tabs[3]: consumer_wait_or_buy(filtered) with tabs[4]: consumer_manual() with st.expander("About this app"): st.markdown(""" - Stable FRESHIE UI baseline. - Built to avoid the white-screen issues caused by heavier UI logic. - Next step: reintroduce richer branding and additional consumer features incrementally. """) if __name__ == "__main__": main()