# Repository: pg040410 / app.py
# Uploaded by XRachel ("Upload app.py"), commit 0adae5e (verified)
import os
from pathlib import Path
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st
# Page-level settings: browser title, icon, wide layout, and a sidebar that
# starts expanded.
st.set_page_config(
    page_title="FRESHIE",
    page_icon="馃惐",
    layout="wide",
    initial_sidebar_state="expanded",
)
def inject_css():
    """Inject the app's custom stylesheet into the page.

    The rules cover the app background, the sidebar panel, the hero header
    (``.hero-wrap``, ``.main-title``, ``.logo-badge``, ``.brand-*``,
    ``.hero-art``, ``.search-pill``), the ``.note-box`` callout, and the
    built-in metric, dataframe and tab-list containers. The stylesheet is
    emitted through ``st.markdown`` with ``unsafe_allow_html=True`` so the
    ``<style>`` tag is passed through as raw HTML.
    """
    stylesheet = """
<style>
.stApp {
background:
radial-gradient(circle at top right, rgba(57,198,233,0.10), transparent 28%),
radial-gradient(circle at top left, rgba(142,219,99,0.12), transparent 24%),
linear-gradient(180deg, #FFFDF7 0%, #F3FBF6 100%);
}
section[data-testid="stSidebar"] > div {
background: rgba(255,255,255,0.98);
box-shadow: 8px 0 24px rgba(0,0,0,0.06);
border-right: 1px solid rgba(23,50,77,0.06);
}
.hero-wrap {
background:
linear-gradient(135deg, rgba(255,255,255,0.92), rgba(255,253,247,0.96)),
linear-gradient(90deg, rgba(51,196,110,0.06), rgba(57,198,233,0.08));
border: 1px solid rgba(23,50,77,0.06);
border-radius: 24px;
padding: 18px 20px;
box-shadow: 0 14px 30px rgba(0,0,0,0.05);
margin-bottom: 14px;
}
.main-title {
display:flex;
align-items:center;
justify-content:space-between;
gap:18px;
margin-bottom:8px;
}
.title-left {
display:flex;
align-items:center;
gap:14px;
}
.logo-badge {
font-size: 2.2rem;
line-height: 1;
background: linear-gradient(135deg, #FFA54B 0%, #39C6E9 100%);
border-radius: 22px;
padding: 12px 16px;
box-shadow: 0 10px 22px rgba(57,198,233,0.18);
}
.brand-title {
font-size: 2.9rem;
font-weight: 900;
color: #17324D;
letter-spacing: -0.03em;
}
.brand-sub {
color: #5E6B78;
margin-top: -2px;
margin-bottom: 8px;
font-size: 1rem;
}
.hero-art {
font-size: 2.6rem;
opacity: 0.92;
white-space: nowrap;
}
.search-pill {
background: #FFFFFF;
border: 1px solid rgba(23,50,77,0.06);
border-radius: 999px;
padding: 10px 14px;
color: #5E6B78;
box-shadow: 0 6px 18px rgba(0,0,0,0.04);
margin-top: 10px;
}
.note-box {
background: linear-gradient(135deg, #FFF7ED 0%, #FFFDF7 100%);
border-left: 4px solid #FB923C;
padding: 10px 12px;
border-radius: 12px;
margin: 8px 0 14px 0;
color: #7C4A03;
box-shadow: 0 8px 18px rgba(0,0,0,0.03);
}
div[data-testid="stMetric"] {
background: rgba(255,255,255,0.96);
border: 1px solid rgba(31,45,61,0.06);
border-radius: 18px;
padding: 12px 14px;
box-shadow: 0 8px 18px rgba(0,0,0,0.04);
}
[data-testid="stDataFrame"] {
background: #FFFFFF;
border-radius: 16px;
padding: 6px;
box-shadow: 0 6px 16px rgba(0,0,0,0.03);
}
[data-baseweb="tab-list"] {
gap: 8px;
}
</style>
"""
    st.markdown(stylesheet, unsafe_allow_html=True)
def find_data_path() -> Path | None:
candidates = [
Path("/app/perishable_goods_management.csv"),
Path("perishable_goods_management.csv"),
Path("/mnt/data/perishable_goods_management.csv"),
]
for p in candidates:
if p.exists():
return p
return None
@st.cache_data(show_spinner=False)
def load_data() -> pd.DataFrame:
    """Load the perishable-goods CSV and derive the columns the pages use.

    The result is wrapped in ``st.cache_data``. Processing steps:

    * locate the file with ``find_data_path`` and read it with pandas;
    * rename the first of ``transaction_date`` / ``date`` / ``Date`` to
      ``transaction_date``, parse it, and drop rows whose date cannot be
      parsed;
    * fill in optional columns (``discount_pct``, ``category``, ``region``,
      ``store_id``, ``product_name``, ``selling_price``, ``base_price``,
      ``waste_pct``, ``units_wasted``) with fallbacks;
    * normalise ``discount_pct`` and ``waste_pct`` to fractions in [0, 1];
    * add ``leftover_units``, ``stockout_flag``, ``lost_sales_units``,
      ``sell_through_pct``, ``month``, ``is_weekend`` and ``expiry_bucket``.

    Returns:
        The cleaned and enriched frame.

    Raises:
        FileNotFoundError: if no candidate CSV path exists.
        ValueError: if no date column is present or a required numeric
            column is missing.
    """
    path = find_data_path()
    if path is None:
        raise FileNotFoundError("perishable_goods_management.csv not found in /app or current folder.")
    df = pd.read_csv(path)

    # Standardize the date field under a single name.
    date_col = next((c for c in ("transaction_date", "date", "Date") if c in df.columns), None)
    if date_col is None:
        raise ValueError("No transaction date column found.")
    if date_col != "transaction_date":
        df = df.rename(columns={date_col: "transaction_date"})
    df["transaction_date"] = pd.to_datetime(df["transaction_date"], errors="coerce")
    df = df.dropna(subset=["transaction_date"]).copy()

    # Expected columns with fallbacks.
    if "discount_pct" not in df.columns:
        if "discount_percentage" in df.columns:
            df["discount_pct"] = df["discount_percentage"]
        else:
            df["discount_pct"] = 0.0
    for col in ["units_sold", "initial_quantity", "daily_demand", "profit", "days_until_expiry"]:
        if col not in df.columns:
            raise ValueError(f"Missing required column: {col}")
    if "category" not in df.columns:
        df["category"] = "Unknown"
    if "region" not in df.columns:
        df["region"] = "Unknown"
    if "store_id" not in df.columns:
        df["store_id"] = "STORE_001"
    if "product_name" not in df.columns:
        df["product_name"] = df["category"].astype(str)
    if "selling_price" not in df.columns and "price" in df.columns:
        df["selling_price"] = df["price"]

    # Normalise the discount to a fraction BEFORE it is used to back out a
    # base price. Doing this afterwards would clip percent-style values
    # (e.g. 18) to 0.99 and inflate the derived base price a hundredfold,
    # and missing discounts would yield NaN base prices.
    df["discount_pct"] = df["discount_pct"].fillna(0)
    if df["discount_pct"].max() > 1:
        df["discount_pct"] = df["discount_pct"] / 100
    df["discount_pct"] = df["discount_pct"].clip(0, 1)

    if "base_price" not in df.columns and "selling_price" in df.columns:
        # Cap at 99% so a full discount does not divide by zero.
        df["base_price"] = df["selling_price"] / (1 - df["discount_pct"].clip(upper=0.99))
    if "base_price" not in df.columns:
        df["base_price"] = 1.0
    if "selling_price" not in df.columns:
        df["selling_price"] = 1.0

    if "waste_pct" not in df.columns:
        denom = df["initial_quantity"].replace(0, np.nan)
        if "units_wasted" in df.columns:
            df["waste_pct"] = (df["units_wasted"] / denom).fillna(0)
        else:
            df["waste_pct"] = 0.0
    if df["waste_pct"].max() > 1:
        df["waste_pct"] = df["waste_pct"] / 100
    df["waste_pct"] = df["waste_pct"].clip(0, 1)
    if "units_wasted" not in df.columns:
        df["units_wasted"] = (df["waste_pct"] * df["initial_quantity"]).round()

    # Derived operational measures.
    df["leftover_units"] = (df["initial_quantity"] - df["units_sold"]).clip(lower=0)
    df["stockout_flag"] = (df["daily_demand"] > df["initial_quantity"]).astype(int)
    df["lost_sales_units"] = (df["daily_demand"] - df["units_sold"]).clip(lower=0)
    df["sell_through_pct"] = (df["units_sold"] / df["initial_quantity"].replace(0, np.nan)).fillna(0).clip(0, 1)
    df["month"] = df["transaction_date"].dt.to_period("M").astype(str)
    df["is_weekend"] = (df["transaction_date"].dt.dayofweek >= 5).astype(int)
    # Values outside the bins (or missing) come out of pd.cut as NaN and
    # are turned into the string "nan" by the cast below.
    df["expiry_bucket"] = pd.cut(
        df["days_until_expiry"],
        bins=[-1, 1, 3, 7, 30, 10000],
        labels=["<=1d", "2-3d", "4-7d", "8-30d", ">30d"],
    ).astype(str)
    return df
def apply_filters(df: pd.DataFrame) -> pd.DataFrame:
    """Render the sidebar filter widgets and return the matching rows.

    Widgets, in order: region multiselect, store multiselect (narrowed to
    the chosen regions when any are picked), category selectbox, day-type
    selectbox, and an optional days-until-expiry range slider. An empty
    multiselect or an "All" choice leaves that dimension unfiltered.

    Args:
        df: Frame with ``region``, ``store_id``, ``category``,
            ``days_until_expiry`` and ``is_weekend`` columns.

    Returns:
        A new frame holding only the rows that pass every active filter.
    """
    sidebar = st.sidebar
    sidebar.header("Filters")

    region_choices = sorted(df["region"].dropna().unique())
    chosen_regions = sidebar.multiselect("Region", region_choices)

    if chosen_regions:
        store_source = df.loc[df["region"].isin(chosen_regions), "store_id"]
    else:
        store_source = df["store_id"]
    chosen_stores = sidebar.multiselect("Store", sorted(store_source.dropna().unique()))

    chosen_category = sidebar.selectbox("Category", ["All", *sorted(df["category"].dropna().unique())])
    day_type = sidebar.selectbox("Day type", ["All", "Weekday", "Weekend"])

    use_expiry = sidebar.checkbox("Limit to inventory below 60 days until expiry", value=False)
    expiry_range = sidebar.slider("Days until expiry", 0, 60, (0, 60)) if use_expiry else (0, 60)

    # Accumulate every active condition into one row mask.
    keep = np.ones(len(df), dtype=bool)
    if chosen_regions:
        keep &= df["region"].isin(chosen_regions).to_numpy()
    if chosen_stores:
        keep &= df["store_id"].isin(chosen_stores).to_numpy()
    if chosen_category != "All":
        keep &= (df["category"] == chosen_category).to_numpy()
    if use_expiry:
        low, high = expiry_range[0], expiry_range[1]
        keep &= ((df["days_until_expiry"] >= low) & (df["days_until_expiry"] <= high)).to_numpy()
    if day_type == "Weekday":
        keep &= (df["is_weekend"] == 0).to_numpy()
    elif day_type == "Weekend":
        keep &= (df["is_weekend"] == 1).to_numpy()
    return df[keep].copy()
def forecast_filtered_demand(
    scope_df: pd.DataFrame,
    label: str = "Filtered selection",
    *,
    horizon: int = 14,
    lookback: int = 56,
    history: int = 60,
    min_points: int = 14,
) -> pd.DataFrame:
    """Build a weekday-seasonal demand forecast for the given rows.

    Demand is first averaged per ``transaction_date``. Each future day is
    forecast as the mean of the same weekday within the last ``lookback``
    dated points; a weekday absent from that window falls back to the mean
    of the last ``min_points`` points.

    Args:
        scope_df: Frame with ``transaction_date`` (datetime) and
            ``daily_demand`` columns. It is not modified.
        label: Text stored in the ``scope`` column of every output row.
        horizon: Number of future days to forecast.
        lookback: Number of most recent dated points used for the weekday
            averages.
        history: Number of most recent actual points included in the output.
        min_points: Minimum number of distinct dates required; also the
            window of the fallback mean.

    Returns:
        A frame with columns ``transaction_date``, ``daily_demand``,
        ``series`` ("Actual" or "Forecast") and ``scope``; actual rows come
        first. An empty frame is returned when fewer than ``min_points``
        dates are available.
    """
    daily = (
        scope_df.groupby("transaction_date")["daily_demand"]
        .mean()
        .reset_index()
        .sort_values("transaction_date")
    )
    if len(daily) < min_points:
        return pd.DataFrame()

    recent = daily.tail(lookback)
    weekday_profile = (
        recent.groupby(recent["transaction_date"].dt.dayofweek)["daily_demand"].mean().to_dict()
    )
    fallback = daily["daily_demand"].tail(min_points).mean()

    first_future_day = daily["transaction_date"].max() + pd.Timedelta(days=1)
    future_dates = pd.date_range(first_future_day, periods=horizon, freq="D")
    future = pd.DataFrame({
        "transaction_date": future_dates,
        "daily_demand": [weekday_profile.get(day.dayofweek, fallback) for day in future_dates],
        "series": "Forecast",
        "scope": label,
    })

    actual = daily.tail(history).copy()
    actual["series"] = "Actual"
    actual["scope"] = label
    return pd.concat([actual, future], ignore_index=True)
def _top_group(frame: pd.DataFrame, by: str, metric: str):
    """Return the ``by`` label whose mean ``metric`` is the highest."""
    return frame.groupby(by)[metric].mean().sort_values(ascending=False).index[0]


def _monthly_performance(df: pd.DataFrame) -> pd.DataFrame:
    """Aggregate the rows per month and score each month's revenue-profit gap."""
    # Line revenue is computed once, vectorised, instead of re-indexing the
    # outer frame inside a per-group lambda.
    priced = df.assign(_line_revenue=df["selling_price"] * df["units_sold"])
    monthly = (
        priced.groupby("month")
        .agg(
            revenue=("_line_revenue", "sum"),
            profit=("profit", "sum"),
            units_sold=("units_sold", "sum"),
            units_wasted=("units_wasted", "sum"),
            waste_rate=("waste_pct", "mean"),
            stockout_rate=("stockout_flag", "mean"),
        )
        .reset_index()
        .sort_values("month")
    )
    monthly["gap"] = (monthly["revenue"] - monthly["profit"]).abs()
    gap_std = monthly["gap"].std(ddof=0)
    # Zero or NaN spread: divide by 1 so the z-score stays finite.
    scale = gap_std if gap_std > 0 else 1.0
    monthly["gap_z"] = (monthly["gap"] - monthly["gap"].mean()) / scale
    return monthly


def executive_overview(df: pd.DataFrame):
    """Render the manager's executive overview tab.

    Shows four headline metrics (revenue, profit, units sold, units
    wasted), two dual-axis monthly trend charts, a per-month diagnosis of
    revenue-profit gaps, and a short list of current operating signals.

    Months diagnosed are, in order of preference: a fixed list of months
    that are present in the data, else months whose gap z-score exceeds 1,
    else the (up to four) months with the largest gap.

    Args:
        df: Filtered frame produced by ``load_data`` / ``apply_filters``;
            expected to be non-empty.
    """
    st.subheader("FRESHIE 路 Executive Overview")
    revenue = float((df["selling_price"] * df["units_sold"]).sum())
    profit = float(df["profit"].sum())
    units_sold = float(df["units_sold"].sum())
    units_wasted = float(df["units_wasted"].sum())
    c1, c2, c3, c4 = st.columns(4)
    c1.metric("Revenue", f"EUR {revenue:,.0f}")
    c2.metric("Profit", f"EUR {profit:,.0f}")
    c3.metric("Units sold", f"{units_sold:,.0f}")
    c4.metric("Units wasted", f"{units_wasted:,.0f}")
    st.markdown(
        '<div class="note-box"><b>Executive view:</b> monthly performance plus a short diagnosis of unusual revenue-profit gaps.</div>',
        unsafe_allow_html=True,
    )

    monthly = _monthly_performance(df)

    left, right = st.columns([1.3, 1])
    with left:
        fig = go.Figure()
        fig.add_trace(go.Scatter(x=monthly["month"], y=monthly["revenue"], name="Revenue", mode="lines+markers"))
        fig.add_trace(go.Scatter(x=monthly["month"], y=monthly["profit"], name="Profit", mode="lines+markers", yaxis="y2"))
        fig.update_layout(
            title="Monthly revenue and profit trend",
            yaxis=dict(title="Revenue"),
            yaxis2=dict(title="Profit", overlaying="y", side="right"),
            margin=dict(l=10, r=10, t=40, b=10),
        )
        st.plotly_chart(fig, use_container_width=True)

        fig2 = go.Figure()
        fig2.add_trace(go.Bar(x=monthly["month"], y=monthly["units_sold"], name="Units sold"))
        fig2.add_trace(go.Scatter(x=monthly["month"], y=monthly["units_wasted"], name="Units wasted trend", mode="lines+markers", yaxis="y2"))
        fig2.update_layout(
            title="Units sold vs units wasted by month",
            yaxis=dict(title="Units sold"),
            yaxis2=dict(title="Units wasted", overlaying="y", side="right"),
            margin=dict(l=10, r=10, t=40, b=10),
        )
        st.plotly_chart(fig2, use_container_width=True)

    with right:
        st.markdown("### Executive diagnosis")
        month_labels = monthly["month"].astype(str)
        available = set(month_labels)
        preferred_months = [m for m in ["2023-01", "2023-02", "2023-03", "2024-01", "2024-10"] if m in available]
        flagged = monthly[month_labels.isin(preferred_months)].copy()
        if flagged.empty:
            flagged = monthly[monthly["gap_z"] > 1].copy()
        if flagged.empty:
            flagged = monthly.nlargest(min(4, len(monthly)), "gap").copy()

        # Baselines over the whole filtered frame do not change per month.
        base_waste = df["waste_pct"].mean()
        base_stockout = df["stockout_flag"].mean()
        base_discount = df["discount_pct"].mean()
        base_profit = df["profit"].mean()

        for _, row in flagged.iterrows():
            mo = row["month"]
            sub = df[df["month"] == mo]
            if sub.empty:
                continue
            top_waste_category = _top_group(sub, "category", "waste_pct")
            top_stockout_region = _top_group(sub, "region", "stockout_flag")
            top_discount_category = _top_group(sub, "category", "discount_pct")
            reasons = []
            if sub["waste_pct"].mean() > base_waste:
                reasons.append("waste rose above the filtered baseline")
            if sub["stockout_flag"].mean() > base_stockout:
                reasons.append("stockout pressure increased")
            if sub["discount_pct"].mean() > base_discount:
                reasons.append("markdown intensity was heavier")
            if sub["profit"].mean() < base_profit:
                reasons.append("unit profitability weakened")
            reason_text = ", ".join(reasons) if reasons else "a broad mix of category-region volatility"
            st.markdown(
                f"- **{mo}**: the revenue-profit gap widened. Likely drivers were **{top_waste_category}** on the waste side, "
                f"**{top_stockout_region}** on the service-level side, and markdown pressure in **{top_discount_category}**. "
                f"Overall explanation: {reason_text}."
            )

        st.markdown("### Current operating signal")
        worst_region = _top_group(df, "region", "waste_pct")
        best_category = _top_group(df, "category", "profit")
        risky_category = _top_group(df, "category", "waste_pct")
        stockout_category = _top_group(df, "category", "stockout_flag")
        st.markdown(f"- Highest waste pressure currently sits in **{worst_region}**.")
        st.markdown(f"- Strongest profit signal currently comes from **{best_category}**.")
        st.markdown(f"- Highest waste exposure category is **{risky_category}**.")
        st.markdown(f"- Highest stockout exposure category is **{stockout_category}**.")
def category_intelligence(df: pd.DataFrame):
    """Render the category intelligence tab for the filtered rows.

    Layout: four headline metrics; a demand/stock/remaining bar chart and
    a stockout-vs-waste bubble chart next to per-category recommendations;
    then a region-by-category profit heatmap and a weekday/weekend demand
    chart next to a store-profit bar chart and the demand forecast.

    Args:
        df: Filtered frame carrying the derived columns from ``load_data``.
    """
    st.subheader("FRESHIE 路 Category Intelligence")
    st.markdown(
        '<div class="note-box"><b>Category view:</b> chart-led category, region, store, and forecast insights for the current filters.</div>',
        unsafe_allow_html=True,
    )

    # Named-aggregation specs, one per summary table.
    category_spec = dict(
        avg_demand=("daily_demand", "mean"),
        avg_stock=("initial_quantity", "mean"),
        avg_remaining=("leftover_units", "mean"),
        waste_pct=("waste_pct", "mean"),
        stockout_rate=("stockout_flag", "mean"),
        avg_profit=("profit", "mean"),
        sell_through=("sell_through_pct", "mean"),
        lost_sales=("lost_sales_units", "mean"),
    )
    region_spec = dict(
        avg_demand=("daily_demand", "mean"),
        avg_profit=("profit", "mean"),
        waste_pct=("waste_pct", "mean"),
        stockout_rate=("stockout_flag", "mean"),
    )
    store_spec = dict(
        avg_demand=("daily_demand", "mean"),
        avg_stock=("initial_quantity", "mean"),
        waste_pct=("waste_pct", "mean"),
        stockout_rate=("stockout_flag", "mean"),
        avg_profit=("profit", "mean"),
    )

    cat_summary = df.groupby("category").agg(**category_spec).reset_index()
    region_cat = df.groupby(["region", "category"]).agg(**region_spec).reset_index()
    store_cat = df.groupby(["store_id", "category"]).agg(**store_spec).reset_index()
    with_week_part = df.assign(week_part=np.where(df["is_weekend"] == 1, "Weekend", "Weekday"))
    week_summary = with_week_part.groupby(["category", "week_part"]).agg(**store_spec).reset_index()

    headline = st.columns(4)
    headline[0].metric("Avg demand", f"{df['daily_demand'].mean():.1f}")
    headline[1].metric("Waste rate", f"{df['waste_pct'].mean():.1%}")
    headline[2].metric("Stockout rate", f"{df['stockout_flag'].mean():.1%}")
    headline[3].metric("Avg profit", f"EUR {df['profit'].mean():.2f}")

    by_demand = cat_summary.sort_values("avg_demand", ascending=False)

    top_left, top_right = st.columns([1.25, 1])
    with top_left:
        demand_chart = px.bar(
            by_demand,
            x="category",
            y=["avg_demand", "avg_stock", "avg_remaining"],
            barmode="group",
            title="Category demand, stock, and remaining inventory",
        )
        st.plotly_chart(demand_chart, use_container_width=True)

        tradeoff_chart = px.scatter(
            cat_summary,
            x="stockout_rate",
            y="waste_pct",
            size=(cat_summary["avg_profit"].clip(lower=0) + 1),
            color="category",
            hover_data=["avg_demand", "avg_stock", "avg_remaining", "sell_through", "lost_sales"],
            title="Category trade-off: stockout vs waste",
        )
        st.plotly_chart(tradeoff_chart, use_container_width=True)

    with top_right:
        st.markdown("### Core indicators and recommendations")
        # Cross-category averages that each category is compared against.
        mean_stockout = cat_summary["stockout_rate"].mean()
        mean_waste = cat_summary["waste_pct"].mean()
        mean_profit = cat_summary["avg_profit"].mean()
        for entry in by_demand.itertuples(index=False):
            checks = (
                (entry.stockout_rate > mean_stockout, "increase replenishment"),
                (entry.waste_pct > mean_waste, "start markdown earlier"),
                (entry.avg_profit < mean_profit, "review mix and margin"),
            )
            advice = [text for triggered, text in checks if triggered]
            if not advice:
                advice = ["maintain current playbook"]
            st.markdown(
                f"- **{entry.category}**: demand {entry.avg_demand:.1f}, stock {entry.avg_stock:.1f}, "
                f"waste {entry.waste_pct:.1%}, stockout {entry.stockout_rate:.1%}, profit EUR {entry.avg_profit:.2f}. "
                f"**Recommendation:** " + "; ".join(advice) + "."
            )

    lower_left, lower_right = st.columns([1.25, 1])
    with lower_left:
        heatmap = px.density_heatmap(
            region_cat,
            x="category",
            y="region",
            z="avg_profit",
            title="Regional profit heatmap by category",
        )
        st.plotly_chart(heatmap, use_container_width=True)

        week_chart = px.bar(
            week_summary,
            x="category",
            y="avg_demand",
            color="week_part",
            barmode="group",
            title="Weekday vs weekend demand by category",
        )
        st.plotly_chart(week_chart, use_container_width=True)

    with lower_right:
        store_chart = px.bar(
            store_cat.sort_values("avg_profit", ascending=False).head(20),
            x="store_id",
            y="avg_profit",
            color="category",
            title="Top filtered stores by category profit",
        )
        st.plotly_chart(store_chart, use_container_width=True)

        # Name the forecast after the store(s) in scope, falling back to
        # the region names when no store id is present.
        store_count = df["store_id"].nunique()
        if store_count == 1:
            scope_label = df["store_id"].iloc[0]
        elif store_count > 1:
            scope_label = f"{store_count} stores"
        elif df["region"].nunique() >= 1:
            scope_label = " / ".join(sorted(df["region"].dropna().unique().tolist()))
        else:
            scope_label = "Filtered selection"

        forecast_df = forecast_filtered_demand(df, scope_label)
        if not forecast_df.empty:
            forecast_chart = px.line(
                forecast_df,
                x="transaction_date",
                y="daily_demand",
                color="series",
                title=f"Demand forecast for {scope_label}",
            )
            st.plotly_chart(forecast_chart, use_container_width=True)
            st.caption("Forecast method: weekday seasonal average from the most recent 56 days, with a 14-day mean fallback.")
def _donor_label(donor) -> str:
    """Describe one candidate donor store row for the transfer table."""
    tier = "same-region" if donor["priority_rank"] == 0 else "cross-region"
    return (
        f"{donor['store_id']} ({tier}, expiry {donor['avg_days_until_expiry']:.1f}d, "
        f"surplus {int(donor['surplus_qty'])})"
    )


def inventory_page(df: pd.DataFrame):
    """Render the inventory and replenishment tab.

    Sections: recommended order reduction and waste-vs-lost-sales charts,
    an action shortlist and expiry-pressure chart, a rule-based what-if
    simulator, and a store-to-store transfer suggestion table.

    The recommended order quantity is 1.15 x daily demand minus leftover
    units (floored at zero), reduced by 25% for rows with at most 7 days
    until expiry. Transfer donors are ranked by same-region first, then
    longer remaining shelf life, then larger surplus.

    Args:
        df: Filtered frame carrying the derived columns from ``load_data``.
    """
    st.subheader("FRESHIE 路 Inventory & Replenishment")
    st.markdown('<div class="note-box"><b>Audience:</b> supply chain. Use this page for reorder, transfer, expiry control, and stock-balancing decisions.</div>', unsafe_allow_html=True)

    work = df.copy()
    work["recommended_order_qty"] = (1.15 * work["daily_demand"] - work["leftover_units"]).clip(lower=0).round()
    work.loc[work["days_until_expiry"] <= 7, "recommended_order_qty"] *= 0.75
    work["recommended_order_qty"] = work["recommended_order_qty"].round()

    left, right = st.columns([1.2, 1])
    with left:
        cat = work.groupby("category")[["initial_quantity", "recommended_order_qty", "waste_pct", "profit", "lost_sales_units"]].mean().reset_index()
        cat["order_reduction_pct"] = 1 - cat["recommended_order_qty"] / cat["initial_quantity"].replace(0, np.nan)
        cat["order_reduction_pct"] = cat["order_reduction_pct"].fillna(0)
        fig = px.bar(
            cat.sort_values("order_reduction_pct", ascending=False),
            x="order_reduction_pct",
            y="category",
            orientation="h",
            title="Recommended order reduction by category",
        )
        st.plotly_chart(fig, use_container_width=True)
        fig2 = px.scatter(
            cat,
            x="waste_pct",
            y="lost_sales_units",
            size=(cat["profit"].clip(lower=0) + 1),
            color="category",
            title="Waste vs lost sales by category",
        )
        st.plotly_chart(fig2, use_container_width=True)

    with right:
        st.markdown("### Action shortlist")
        shortlist = work.sort_values(["waste_pct", "lost_sales_units"], ascending=[False, False])[[
            "store_id", "category", "product_name", "daily_demand", "leftover_units", "days_until_expiry", "waste_pct", "recommended_order_qty"
        ]].head(15)
        st.dataframe(shortlist, use_container_width=True, hide_index=True)

        st.markdown("### Expiry pressure")
        expiry = work.groupby("expiry_bucket")[["units_wasted", "leftover_units", "units_sold"]].sum().reset_index()
        fig3 = px.bar(
            expiry,
            x="expiry_bucket",
            y=["units_wasted", "leftover_units"],
            barmode="group",
            title="Wasted and leftover units by expiry bucket",
        )
        st.plotly_chart(fig3, use_container_width=True)

    st.markdown("### What-if Simulator")
    st.caption("Simulator logic: deterministic business-rule heuristic. Waste and profit changes are estimated with fixed uplift/reduction coefficients, not machine learning.")
    col1, col2, col3 = st.columns(3)
    selected_category = col1.selectbox("Category for simulation", sorted(df["category"].unique()), key="inv_sim_cat")
    order_cut = col2.slider("Reduce order quantity by %", 0, 40, 10, key="inv_sim_cut")
    markdown_shift = col3.slider("Advance markdown trigger by days", 0, 5, 2, key="inv_sim_mark")

    sim = df[df["category"] == selected_category]
    current_waste = sim["waste_pct"].mean()
    current_profit = sim["profit"].mean()
    current_stockout = sim["stockout_flag"].mean()
    waste_reduction = 0.35 * (order_cut / 100) + 0.015 * markdown_shift
    stockout_rise = 0.12 * (order_cut / 100)
    sim_waste = max(current_waste * (1 - waste_reduction), 0)
    sim_profit = current_profit * (1 + 0.08 * (order_cut / 100) + 0.03 * markdown_shift)
    sim_stockout = min(current_stockout * (1 + stockout_rise), 1)

    s1, s2, s3 = st.columns(3)
    # Lower waste and lower stockout are the good direction, so their delta
    # colours are inverted (a decrease renders green, an increase red).
    s1.metric("Simulated waste", f"{sim_waste:.1%}", delta=f"{(sim_waste - current_waste):+.1%}", delta_color="inverse")
    s2.metric("Simulated avg profit", f"EUR {sim_profit:.2f}", delta=f"EUR {(sim_profit - current_profit):.2f}")
    s3.metric("Simulated stockout", f"{sim_stockout:.1%}", delta=f"{(sim_stockout - current_stockout):+.1%}", delta_color="inverse")

    st.markdown("### Transfer suggestions")
    receiver = work.groupby(["store_id", "region", "category"]).agg(
        remaining_inventory=("leftover_units", "sum"),
        demand=("daily_demand", "sum"),
        unmet_demand=("lost_sales_units", "sum"),
        avg_days_until_expiry=("days_until_expiry", "mean"),
    ).reset_index()
    donors = receiver.copy()
    donors["surplus_qty"] = donors["remaining_inventory"] - donors["demand"]

    transfer_rows = []
    need_df = receiver[receiver["unmet_demand"] > 0]
    # NOTE(review): a donor's surplus is not reduced after it is suggested,
    # so the same surplus can appear in several rows.
    for _, r in need_df.iterrows():
        pool = donors[
            (donors["category"] == r["category"]) &
            (donors["store_id"] != r["store_id"]) &
            (donors["surplus_qty"] > 0)
        ].copy()
        if pool.empty:
            best_route = "No feasible donor"
            same_region_options = "No same-region donor"
            cross_region_options = "No cross-region donor"
            transfer_qty = 0
        else:
            # 0 = donor in the receiver's region, 1 = any other region.
            pool["priority_rank"] = (pool["region"] != r["region"]).astype(int)
            pool = pool.sort_values(["priority_rank", "avg_days_until_expiry", "surplus_qty"], ascending=[True, False, False])
            same_region = pool[pool["priority_rank"] == 0].head(3)
            cross_region = pool[pool["priority_rank"] == 1].head(3)
            best = pool.iloc[0]
            transfer_qty = int(min(r["unmet_demand"], max(best["surplus_qty"], 0)))
            best_route = _donor_label(best)
            same_region_options = "; ".join(_donor_label(d) for _, d in same_region.iterrows()) if not same_region.empty else "No same-region donor"
            cross_region_options = "; ".join(_donor_label(d) for _, d in cross_region.iterrows()) if not cross_region.empty else "No cross-region donor"
        transfer_rows.append({
            "store_id": r["store_id"],
            "region": r["region"],
            "category": r["category"],
            "remaining_inventory": int(r["remaining_inventory"]),
            "demand": int(r["demand"]),
            "unmet_demand": int(r["unmet_demand"]),
            "recommended_transfer_qty": transfer_qty,
            "best_route": best_route,
            "same_region_options": same_region_options,
            "cross_region_options": cross_region_options,
        })

    transfer_df = pd.DataFrame(transfer_rows)
    st.caption("Transfer logic: same-region donors are prioritized first. Because the source data has no distance field, donor ranking uses region priority, remaining shelf life, and surplus quantity.")
    if transfer_df.empty:
        st.success("No filtered store currently shows unmet demand that needs transfer support.")
    else:
        st.dataframe(transfer_df.sort_values(["unmet_demand", "recommended_transfer_qty"], ascending=[False, False]), use_container_width=True, hide_index=True)
def promotion_page(df: pd.DataFrame):
    """Render the promotion designer tab.

    The left column holds the campaign controls, three estimated-impact
    metrics and a one-line campaign brief. The right column shows
    discount/waste and sell-through charts for the matching rows, followed
    by suggested promotion copy.

    Demand lift is 0.08 plus discount/200, plus 0.06 when bundling is on.
    All estimates are 0 when no rows match the selection.

    Args:
        df: Filtered frame carrying the derived columns from ``load_data``.
    """
    st.subheader("FRESHIE 路 Promotion Designer")
    st.markdown('<div class="note-box"><b>Audience:</b> marketing. Use this page to test markdown depth, bundle logic, and campaign copy.</div>', unsafe_allow_html=True)
    st.caption("Promotion designer logic: business-rule simulator. Demand lift is estimated from a base uplift + discount effect + optional bundle effect.")

    controls_col, charts_col = st.columns([1, 1.2])
    with controls_col:
        promo_category = st.selectbox("Promotion category", sorted(df["category"].unique()), key="promo_cat")
        expiry_target = st.selectbox("Target expiry bucket", sorted(df["expiry_bucket"].dropna().unique()), key="promo_exp")
        discount = st.slider("Discount %", 0, 50, 18, key="promo_disc")
        bundle = st.checkbox("Bundle with complementary items", value=True, key="promo_bundle")
        weekend_only = st.checkbox("Weekend campaign only", value=False, key="promo_weekend")

        in_scope = (df["category"] == promo_category) & (df["expiry_bucket"].astype(str) == str(expiry_target))
        sub = df[in_scope].copy()
        if weekend_only:
            sub = sub[sub["is_weekend"] == 1]

        demand_lift = 0.08 + discount / 200
        if bundle:
            demand_lift += 0.06

        if len(sub):
            est_sales_uplift = sub["units_sold"].mean() * demand_lift
            est_waste_drop = sub["waste_pct"].mean() * min(0.35, demand_lift)
            est_profit = sub["profit"].mean() * (1 + demand_lift - discount / 150)
        else:
            est_sales_uplift = 0
            est_waste_drop = 0
            est_profit = 0

        st.metric("Estimated sales uplift", f"{est_sales_uplift:.2f} units")
        st.metric("Estimated waste reduction", f"{est_waste_drop:.1%}")
        st.metric("Estimated avg profit", f"EUR {est_profit:.2f}")

        st.markdown("### Campaign brief")
        if bundle and weekend_only:
            campaign_type = "weekend bundle campaign"
        elif bundle:
            campaign_type = "bundle campaign"
        else:
            campaign_type = "markdown campaign"
        st.success(f"Run a {discount}% {campaign_type} for {promo_category} items in {expiry_target}.")

    with charts_col:
        if len(sub):
            promo_base = sub.groupby("expiry_bucket")[["discount_pct", "waste_pct", "sell_through_pct"]].mean().reset_index()
        else:
            promo_base = pd.DataFrame(columns=["expiry_bucket","discount_pct","waste_pct","sell_through_pct"])

        if promo_base.empty:
            st.info("No records match the current promotion settings.")
        else:
            discount_chart = px.bar(
                promo_base,
                x="expiry_bucket",
                y=["discount_pct", "waste_pct"],
                barmode="group",
                title=f"Discount vs waste for {promo_category}",
            )
            st.plotly_chart(discount_chart, use_container_width=True)
            sell_through_chart = px.line(
                promo_base,
                x="expiry_bucket",
                y="sell_through_pct",
                markers=True,
                title=f"Sell-through for {promo_category}",
            )
            st.plotly_chart(sell_through_chart, use_container_width=True)

        st.markdown("### Recommended promotion copy")
        copy_parts = [
            f"Fresh pick alert: enjoy {discount}% off selected {promo_category.lower()} items",
            " this weekend" if weekend_only else "",
            " with smart bundle savings" if bundle else " while they are still at peak freshness",
            ". Highlight freshness, value, and limited-time availability.",
        ]
        st.info("".join(copy_parts))
def consumer_deals(df: pd.DataFrame):
    """Render the consumer deal finder for one store.

    The shopper picks a store, a price range, an optional category and a
    days-until-expiry window. Matching rows are ranked by a deal score
    (60% discount, 20% sell-through, 20% inverse waste) with savings as
    the tie-breaker; the top 20 are tabulated and the top 6 shown as cards.
    When nothing matches, an informational message is shown instead.

    Args:
        df: Filtered frame carrying the derived columns from ``load_data``.
    """
    st.subheader("FRESHIE 路 Deal Finder")
    stores = sorted(df["store_id"].dropna().unique())
    if not stores:
        st.warning("No stores available in the current filter.")
        return
    chosen_store = st.selectbox("Choose store first", stores)
    store_df = df[df["store_id"] == chosen_store].copy()

    c1, c2, c3 = st.columns(3)
    budget_range = c1.slider("Budget range (EUR)", 1, 60, (1, 20))
    preferred_category = c2.selectbox("Preferred category", ["All"] + sorted(store_df["category"].unique()))
    expiry_range = c3.slider("Days until expiry", 1, 14, (1, 5))

    deals = store_df[
        (store_df["days_until_expiry"] >= expiry_range[0]) &
        (store_df["days_until_expiry"] <= expiry_range[1]) &
        (store_df["selling_price"] >= budget_range[0]) &
        (store_df["selling_price"] <= budget_range[1])
    ].copy()
    if preferred_category != "All":
        deals = deals[deals["category"] == preferred_category]
    deals["savings"] = (deals["base_price"] - deals["selling_price"]).clip(lower=0)
    deals["deal_score"] = deals["discount_pct"] * 0.6 + deals["sell_through_pct"] * 0.2 + (1 - deals["waste_pct"]) * 0.2
    deals = deals.sort_values(["deal_score", "savings"], ascending=False)

    st.markdown('<div class="note-box"><b>Store marketing view:</b> all deal recommendations below are scoped to the selected store so the shopper sees store-specific promotions and products.</div>', unsafe_allow_html=True)

    # Nothing to tabulate or card: say so instead of rendering an empty
    # table and an empty "best deals" section.
    if deals.empty:
        st.info("No deals match the current filters for this store.")
        return

    icon_map = {
        "Bakery": "馃", "Beverages": "馃", "Dairy": "馃", "Deli": "馃Ш",
        "Meat": "馃ォ", "Pharmaceuticals": "馃拪", "Produce": "馃ガ", "Ready_to_Eat": "馃嵄", "Seafood": "馃悷"
    }
    default_icon = "馃摝"

    show = deals[["product_name", "category", "days_until_expiry", "base_price", "selling_price", "discount_pct", "savings"]].head(20).copy()
    # Vectorised "<icon> <name>" label; avoids a row-wise apply.
    icons = show["category"].astype(str).map(icon_map).fillna(default_icon)
    show["item"] = icons + " " + show["product_name"].astype(str)
    st.dataframe(show[["item", "category", "days_until_expiry", "base_price", "selling_price", "discount_pct", "savings"]], use_container_width=True, hide_index=True)

    st.markdown("### Best current deals")
    cols = st.columns(3)
    for i, (_, row) in enumerate(deals.head(6).iterrows()):
        icon = icon_map.get(str(row["category"]), default_icon)
        with cols[i % 3]:
            st.markdown(f"**{icon} {row['product_name']}**")
            st.write(f"{row['category']} 路 expires in {int(row['days_until_expiry'])} day(s)")
            st.write(f"Now EUR {row['selling_price']:.2f} | Save EUR {row['savings']:.2f}")
            st.caption(f"Discount: {row['discount_pct']:.0%} 路 Store: {chosen_store}")
def consumer_bundles(df: pd.DataFrame):
    """Render the consumer bundle builder for one store.

    Candidate items fall inside the chosen expiry window, cost no more
    than the upper budget bound, and belong to the theme's categories
    ("Budget saver" accepts every category present). Items are ranked by
    score (50% discount, 20% inverse waste, 30% sell-through), cheapest
    first on ties, then added greedily while the running total stays
    within the upper budget bound. At most five items are picked, and
    themes other than "Budget saver" take at most one item per category.

    Args:
        df: Filtered frame carrying the derived columns from ``load_data``.
    """
    st.subheader("FRESHIE 路 Bundle Builder")
    stores = sorted(df["store_id"].dropna().unique())
    if not stores:
        st.warning("No stores available in the current filter.")
        return
    chosen_store = st.selectbox("Choose store", stores, key="bundle_store")
    store_df = df[df["store_id"] == chosen_store].copy()

    budget_col, theme_col, expiry_col = st.columns(3)
    budget_range = budget_col.slider("Bundle budget range (EUR)", 8, 80, (8, 25))
    theme = theme_col.selectbox("Bundle theme", ["Quick dinner", "Healthy protein", "Family breakfast", "Budget saver"])
    expiry_range = expiry_col.slider("Use items expiring within days", 1, 10, (1, 5), key="bundle_exp")

    budget_cap = budget_range[1]
    eligible = (
        (store_df["days_until_expiry"] >= expiry_range[0])
        & (store_df["days_until_expiry"] <= expiry_range[1])
        & (store_df["selling_price"] <= budget_cap)
    )
    work = store_df[eligible].copy()

    theme_map = {
        "Quick dinner": ["Ready_to_Eat", "Produce", "Bakery", "Dairy"],
        "Healthy protein": ["Meat", "Seafood", "Dairy", "Produce"],
        "Family breakfast": ["Bakery", "Dairy", "Beverages", "Produce"],
        "Budget saver": list(work["category"].dropna().unique()),
    }
    allowed_categories = theme_map.get(theme, [])
    work = work[work["category"].isin(allowed_categories)].copy()
    work["score"] = work["discount_pct"] * 0.5 + (1 - work["waste_pct"]) * 0.2 + work["sell_through_pct"] * 0.3
    work = work.sort_values(["score", "selling_price"], ascending=[False, True])

    # Greedy pick in rank order.
    one_per_category = theme != "Budget saver"
    picked = []
    total = 0.0
    used_categories = set()
    for _, item in work.iterrows():
        fits = total + item["selling_price"] <= budget_cap
        repeat = one_per_category and item["category"] in used_categories
        if fits and not repeat:
            picked.append(item)
            total += item["selling_price"]
            used_categories.add(item["category"])
            if len(picked) >= 5:
                break

    if not picked:
        st.warning("No bundle fits the current conditions.")
        return

    bundle = pd.DataFrame(picked)
    saved = (bundle["base_price"] - bundle["selling_price"]).clip(lower=0).sum()
    total_col, saved_col, count_col = st.columns(3)
    total_col.metric("Bundle total", f"EUR {total:.2f}")
    saved_col.metric("You save", f"EUR {saved:.2f}")
    count_col.metric("Items", str(len(bundle)))
    st.dataframe(bundle[["product_name", "category", "selling_price", "base_price", "discount_pct", "days_until_expiry"]], use_container_width=True, hide_index=True)
def consumer_personal(df: pd.DataFrame):
    """Render personalised promotions for one store.

    The shopper picks a store, a favourite category, a price range and a
    days-until-expiry window. Matching rows are ranked by score (55%
    discount, 20% sell-through, 25% inverse waste) and the top 12 are
    shown as cards in three columns. When nothing matches, an
    informational message is shown instead of an empty page.

    Args:
        df: Filtered frame carrying the derived columns from ``load_data``.
    """
    st.subheader("FRESHIE 路 Personalized Promotions")
    stores = sorted(df["store_id"].dropna().unique())
    if not stores:
        st.warning("No stores available in the current filter.")
        return
    chosen_store = st.selectbox("Choose store", stores, key="personal_store")
    store_df = df[df["store_id"] == chosen_store].copy()

    favorite = st.selectbox("Favorite category", sorted(store_df["category"].unique()), key="cp_fav")
    price_range = st.slider("Price range (EUR)", 1, 30, (1, 10), key="cp_cap")
    expiry_range = st.slider("Days until expiry", 1, 14, (1, 7), key="cp_days")

    recs = store_df[
        (store_df["category"] == favorite) &
        (store_df["selling_price"] >= price_range[0]) &
        (store_df["selling_price"] <= price_range[1]) &
        (store_df["days_until_expiry"] >= expiry_range[0]) &
        (store_df["days_until_expiry"] <= expiry_range[1])
    ].copy()
    recs["score"] = recs["discount_pct"] * 0.55 + recs["sell_through_pct"] * 0.20 + (1 - recs["waste_pct"]) * 0.25
    recs = recs.sort_values("score", ascending=False).head(12)

    if recs.empty:
        st.info("No promotions match the current preferences for this store.")
        return

    cols = st.columns(3)
    for i, (_, row) in enumerate(recs.iterrows()):
        with cols[i % 3]:
            st.markdown(f"**{row['product_name']}**")
            # Category and store id were previously printed with nothing
            # between them (e.g. "DairySTORE_001").
            st.write(f"{row['category']} | Store: {chosen_store}")
            st.write(f"Now EUR {row['selling_price']:.2f}")
            st.write(f"Expires in {int(row['days_until_expiry'])} day(s)")
            st.caption(f"Discount: {row['discount_pct']:.0%}")
def manager_manual():
    """Render the static user manual for the four manager tabs."""
    manual_text = """
**Executive Overview**
- Track revenue, profit, units sold, and units wasted by month.
- Use the diagnosis panel to understand abnormal revenue-profit gaps.
**Category Intelligence**
- Compare categories on demand, stock, waste, stockout, and profit.
- Read the category recommendations and forecast for the current selection.
**Inventory & Replenishment**
- Review reorder guidance by category.
- Use the what-if simulator to see the trade-off between waste, profit, and stockout.
**Promotion Designer**
- Test markdown depth, expiry targeting, and bundle logic.
- Use the campaign brief and promotion copy as a starting point for activation.
"""
    st.subheader("FRESHIE 路 User Manual (Manager)")
    st.markdown(manual_text)
def consumer_manual():
    """Render the static user manual for the three consumer tabs."""
    manual_text = """
**Deal Finder**
- Browse the strongest discounted products under your preferred budget and expiry window.
**Bundle Builder**
- Build a themed basket while staying under budget.
**Personalized Promotions**
- Focus on your favorite category and a comfortable price cap.
"""
    st.subheader("FRESHIE 路 User Manual (Consumer)")
    st.markdown(manual_text)
def main():
    """Assemble the page: styles, hero header, data, filters and role tabs.

    Execution stops early (via ``st.stop``) when the data cannot be loaded
    or when the sidebar filters leave no rows. Otherwise the chosen role
    decides which set of tabs is rendered, and an "About this app"
    expander closes the page.
    """
    inject_css()

    hero_html = """
<div class="hero-wrap">
<div class="main-title">
<div class="title-left">
<div class="logo-badge">馃惐馃悷</div>
<div>
<div class="brand-title">FRESHIE</div>
<div class="brand-sub">A warm, friendly fresh-food assistant for stores, managers, and everyday shoppers.</div>
</div>
</div>
<div class="hero-art">馃ガ 馃崜 馃悷 馃崬</div>
</div>
<div class="search-pill">馃攷 Fresh, reliable, low-waste decisions across fish, produce, dairy, bakery, and more</div>
</div>
"""
    st.markdown(hero_html, unsafe_allow_html=True)

    # Any loading failure is surfaced to the user as an error banner.
    try:
        df = load_data()
    except Exception as e:
        st.error(str(e))
        st.stop()

    filtered = apply_filters(df)
    if filtered.empty:
        st.warning("No data left after filtering.")
        st.stop()

    role = st.radio("Choose your mode", ["Manager", "Consumer"], horizontal=True)

    # Each entry pairs a tab title with the callable that fills that tab.
    if role == "Manager":
        pages = (
            ("Executive Overview", lambda: executive_overview(filtered)),
            ("Category Intelligence", lambda: category_intelligence(filtered)),
            ("Inventory & Replenishment", lambda: inventory_page(filtered)),
            ("Promotion Designer", lambda: promotion_page(filtered)),
            ("User Manual", manager_manual),
        )
    else:
        pages = (
            ("Deal Finder", lambda: consumer_deals(filtered)),
            ("Bundle Builder", lambda: consumer_bundles(filtered)),
            ("Personalized Promotions", lambda: consumer_personal(filtered)),
            ("User Manual", consumer_manual),
        )

    tabs = st.tabs([title for title, _ in pages])
    for tab, (_, render) in zip(tabs, pages):
        with tab:
            render()

    about_text = """
- Stable FRESHIE UI baseline.
- Built to avoid the white-screen issues caused by heavier UI logic.
- Next step: reintroduce richer branding and additional consumer features incrementally.
"""
    with st.expander("About this app"):
        st.markdown(about_text)
# Build the page only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()