# Provenance: Hugging Face Space file "app.py", revision 3fb14e8 (uploader: Lilli98).
# app.py
"""
Beer Game — Full Streamlit app for Hugging Face Spaces
- Classic parameters (transport delay 2 weeks, order delay 1 week)
- Human = Distributor (must Submit Order, then press Next Week)
- LLM agents (Retailer, Wholesaler, Factory) using OpenAI gpt-4o-mini
- Info sharing toggle + configurable demand history length
- Per-participant sessions (participant_id via URL query param or input)
- Detailed logging (orders, shipments, inventory, backlog, timestamps, raw LLM outputs)
- Automatic upload of per-participant CSV logs to Hugging Face Datasets Hub
"""
import json
import os
import random
import re
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path

import openai
import pandas as pd
import streamlit as st
from huggingface_hub import HfApi, upload_file
# ---------------------------
# CONFIGURABLE PARAMETERS
# ---------------------------
# Classic Beer Game choices: choose 24 or 36 depending on experiment design
DEFAULT_WEEKS = 36
TRANSPORT_DELAY = 2 # shipments take 2 weeks to arrive
ORDER_DELAY = 1 # orders incur 1-week processing delay (modeled via pipeline)
INITIAL_INVENTORY = 12
INITIAL_BACKLOG = 0
# OpenAI model to use for agents
OPENAI_MODEL = "gpt-4o-mini"
# Local folder to hold temporary log files before upload
LOCAL_LOG_DIR = Path("logs")
LOCAL_LOG_DIR.mkdir(exist_ok=True)
# ---------------------------
# Helper functions
# ---------------------------
def now_iso():
return datetime.utcnow().isoformat(timespec="milliseconds") + "Z"
def fmt(o):
try:
return json.dumps(o, ensure_ascii=False)
except Exception:
return str(o)
# ---------------------------
# Hugging Face upload helper
# ---------------------------
HF_TOKEN = os.getenv("HF_TOKEN")
HF_REPO_ID = os.getenv("HF_REPO_ID") # e.g., "XinyuLi/beer-game-logs"
hf_api = HfApi()
def upload_log_to_hf(local_path: Path, participant_id: str):
"""
Upload a local CSV file to HF dataset repo under path logs/<participant_id>/...
Requires HF_TOKEN and HF_REPO_ID set as environment variables (Space secrets).
"""
if not HF_TOKEN or not HF_REPO_ID:
st.info("HF_TOKEN or HF_REPO_ID not configured; skipping upload to HF Hub.")
return None
dest_path_in_repo = f"logs/{participant_id}/{local_path.name}"
try:
upload_file(
path_or_fileobj=str(local_path),
path_in_repo=dest_path_in_repo,
repo_id=HF_REPO_ID,
repo_type="dataset",
token=HF_TOKEN
)
st.success(f"Uploaded logs to Hugging Face: {HF_REPO_ID}/{dest_path_in_repo}")
return f"https://huggingface.co/datasets/{HF_REPO_ID}/resolve/main/{dest_path_in_repo}"
except Exception as e:
st.error(f"Failed to upload logs to HF Hub: {e}")
return None
# ---------------------------
# OpenAI helper
# ---------------------------
openai.api_key = os.getenv("OPENAI_API_KEY")
def call_llm_for_order(role: str, local_state: dict, info_sharing_visible: bool, demand_history: list, max_tokens=40, temperature=0.7):
"""
Call OpenAI to decide an integer order for `role`.
Returns (order_int, raw_text)
"""
# Compose a careful prompt giving only local info unless info_sharing_visible is True
visible_history = demand_history if info_sharing_visible else []
prompt = (
f"You are the {role} in a 4-player Beer Game (Retailer -> Wholesaler -> Distributor -> Factory).\n"
f"Current week: {local_state['week']}\n"
f"Local state for {role}:\n"
f"- Inventory: {local_state['inventory'][role]}\n"
f"- Backlog: {local_state['backlog'][role]}\n"
f"- Incoming shipment next week (front of pipeline): {local_state['pipeline'][role][0] if local_state['pipeline'][role] else 0}\n"
f"- Incoming order this week: {local_state['incoming_orders'].get(role, 0)}\n"
)
if visible_history:
prompt += f"- Customer demand history (visible to you): {visible_history}\n"
prompt += (
"\nDecide a non-negative integer order quantity to place to your upstream supplier this week.\n"
"Reply with a single integer only. You may optionally append a short one-sentence reason after a dash."
)
try:
resp = openai.ChatCompletion.create(
model=OPENAI_MODEL,
messages=[
{"role": "system", "content": "You are an automated Beer Game agent who decides weekly orders."},
{"role": "user", "content": prompt}
],
max_tokens=max_tokens,
temperature=temperature,
n=1
)
raw = resp.choices[0].message.get("content", "").strip()
except Exception as e:
raw = f"OPENAI_ERROR: {str(e)}"
# fallback later
# Extract first integer from model output
m = re.search(r"(-?\d+)", raw or "")
order = None
if m:
try:
order = int(m.group(1))
if order < 0:
order = 0
except:
order = None
# fallback heuristic if parsing failed or error
if order is None:
# simple policy: target inventory = INITIAL_INVENTORY + incoming_order
incoming = local_state['incoming_orders'].get(role, 0) or 0
target = INITIAL_INVENTORY + incoming
order = max(0, target - (local_state['inventory'].get(role, 0) or 0))
raw = (raw + " | PARSE_FALLBACK").strip()
return int(order), raw
# ---------------------------
# Game mechanics
# ---------------------------
def make_classic_demand(weeks: int):
"""
Typical demand: first 4 weeks stable (4), then shock (8) for many weeks, then maybe fluctuations.
We'll implement: weeks 0-3 => 4; weeks 4..(weeks-1) => 8
You can adjust as needed.
"""
demand = []
for t in range(weeks):
if t < 4:
demand.append(4)
else:
demand.append(8)
return demand
def init_game(weeks=DEFAULT_WEEKS):
"""
Return a dict representing full game state for a single participant/session.
"""
roles = ["retailer", "wholesaler", "distributor", "factory"]
state = {
"participant_id": None,
"week": 1,
"weeks_total": weeks,
"roles": roles,
"inventory": {r: INITIAL_INVENTORY for r in roles},
"backlog": {r: INITIAL_BACKLOG for r in roles},
# pipeline: each role has a queue representing shipments that will arrive next weeks;
# we keep length = TRANSPORT_DELAY, front is arriving next week.
"pipeline": {r: [0] * TRANSPORT_DELAY for r in roles},
"incoming_orders": {r: 0 for r in roles}, # orders received this week from downstream
"orders_history": {r: [] for r in roles},
"shipments_history": {r: [] for r in roles},
"logs": [],
"info_sharing": False,
"info_history_weeks": 0,
"customer_demand": make_classic_demand(weeks),
}
return state
def step_game(state: dict, distributor_order: int):
"""
Apply one week's dynamics.
Order of events (typical simplification):
1. Customer demand hits retailer this week.
2. Deliveries that are at pipeline[front] arrive to each role this week.
3. Roles fulfill incoming orders from downstream (if backlog arises).
4. Human (distributor) order is recorded; LLMs decide orders for their roles.
5. Place orders into upstream's pipeline so they will arrive after TRANSPORT_DELAY.
6. Log everything.
"""
week = state["week"]
roles = state["roles"]
# 1) Customer demand for this week to retailer
demand = state["customer_demand"][week - 1] # week is 1-indexed
state["incoming_orders"]["retailer"] = demand
# 2) Shipments arrive (front of pipeline)
arriving = {}
for r in roles:
# Pop front arrival if exists
arr = 0
if len(state["pipeline"][r]) > 0:
arr = state["pipeline"][r].pop(0)
state["inventory"][r] += arr
arriving[r] = arr
# 3) Fulfill incoming orders from downstream (downstream -> this role)
# For each role, the incoming_order is whatever downstream ordered last turn.
# For first week, incoming_orders maybe zero for non-retailer; that's fine.
shipments_out = {}
for r in roles:
incoming = state["incoming_orders"].get(r, 0) or 0
inv = state["inventory"].get(r, 0) or 0
shipped = min(inv, incoming)
state["inventory"][r] -= shipped
# any unfilled becomes backlog
unfilled = incoming - shipped
if unfilled > 0:
state["backlog"][r] += unfilled
shipments_out[r] = shipped
state["shipments_history"][r].append(shipped)
# 4) Record human distributor order (this week's order placed by distributor)
# distributor_order is the order placed to wholesaler by the distributor this week
# Save to orders_history for distributor
state["orders_history"]["distributor"].append(int(distributor_order))
# Also set downstream->upstream linking: the upstream (wholesaler) will see distributor_order as incoming next period
state["incoming_orders"]["wholesaler"] = int(distributor_order)
# 5) LLM decisions for AI roles (retailer, wholesaler, factory)
demand_history_visible = []
if state["info_sharing"] and state["info_history_weeks"] > 0:
start_idx = max(0, (week - 1) - state["info_history_weeks"])
demand_history_visible = state["customer_demand"][start_idx: (week - 1)]
llm_outputs = {}
for role in ["retailer", "wholesaler", "factory"]:
order_val, raw = call_llm_for_order(role, state_snapshot_for_prompt(state), state["info_sharing"], demand_history_visible)
order_val = max(0, int(order_val))
state["orders_history"][role].append(order_val)
llm_outputs[role] = {"order": order_val, "raw": raw}
# set incoming_orders for upstream relation: upstream will see this order next period
# e.g., if retailer orders X, upstream (distributor) incoming_orders will be X
if role == "retailer":
state["incoming_orders"]["distributor"] = order_val
elif role == "wholesaler":
state["incoming_orders"]["factory"] = order_val
# factory's upstream is the supplier/external: we don't model beyond factory
# 6) Place orders into pipelines: these are shipments that will be sent upstream now and arrive after TRANSPORT_DELAY
# In the simple Beer Game, the shipped amounts are based on inventories; but orders placed lead to upstream shipments in future after they process.
# We'll model that orders placed this week translate into future shipments arriving after TRANSPORT_DELAY at the ordering party.
for role in roles:
# Determine the order placed by this role this week:
if role == "distributor":
placed_order = int(distributor_order)
else:
# role in orders_history last appended
placed_order = state["orders_history"][role][-1] if state["orders_history"][role] else 0
# For the downstream partner (the entity that will receive the shipment), we append to that partner's pipeline tail
# Example: distributor placed order to wholesaler -> wholesaler will receive shipment after TRANSPORT_DELAY
# Map role -> downstream partner (who receives shipments from role)
# shipments flow downstream: factory -> wholesaler -> distributor -> retailer
downstream_map = {
"factory": "wholesaler",
"wholesaler": "distributor",
"distributor": "retailer",
"retailer": None
}
downstream = downstream_map.get(role)
if downstream:
# append zeros if pipeline too short to ensure correct index, then append placed_order at tail
# We want the placed_order to be delivered to downstream after TRANSPORT_DELAY weeks (so push at tail)
state["pipeline"][downstream].append(placed_order)
# 7) Log the week's summary
log_entry = {
"timestamp": now_iso(),
"week": week,
"demand": demand,
"arriving": arriving,
"shipments_out": shipments_out,
"orders_submitted": {
"distributor": int(distributor_order),
"retailer": state["orders_history"]["retailer"][-1] if state["orders_history"]["retailer"] else None,
"wholesaler": state["orders_history"]["wholesaler"][-1] if state["orders_history"]["wholesaler"] else None,
"factory": state["orders_history"]["factory"][-1] if state["orders_history"]["factory"] else None,
},
"inventory": dict(state["inventory"]),
"backlog": dict(state["backlog"]),
"info_sharing": state["info_sharing"],
"info_history_weeks": state["info_history_weeks"],
"llm_raw": {k: v["raw"] for k, v in llm_outputs.items()}
}
state["logs"].append(log_entry)
# 8) Advance week
state["week"] += 1
return state
def state_snapshot_for_prompt(state):
"""
Prepare a compact snapshot of state for LLM prompt (avoid sending huge objects).
We'll include week, inventory and backlog for each role and incoming_orders for this week.
"""
snap = {
"week": state["week"],
"inventory": state["inventory"].copy(),
"backlog": state["backlog"].copy(),
"incoming_orders": state["incoming_orders"].copy(),
# pipeline front (arriving next week)
"incoming_shipments_next_week": {r: (state["pipeline"][r][0] if state["pipeline"][r] else 0) for r in state["roles"]}
}
return snap
# ---------------------------
# Persistence: local + HF upload
# ---------------------------
def save_logs_local(state, participant_id):
df = pd.json_normalize(state["logs"])
fname = LOCAL_LOG_DIR / f"logs_{participant_id}_{int(time.time())}.csv"
df.to_csv(fname, index=False)
return fname
def save_and_upload(state, participant_id):
local_path = save_logs_local(state, participant_id)
url = upload_log_to_hf(local_path, participant_id)
return local_path, url
# ---------------------------
# Streamlit UI & session management
# ---------------------------
st.set_page_config(page_title="Beer Game — Distributor (Human) + LLM Agents", layout="wide")
st.title("🍺 Beer Game — Human Distributor vs LLM agents")
# Participant id: prefer query param or user input
qp = st.query_params
pid_from_q = qp.get("participant_id", [None])[0] if qp else None
pid_input = st.text_input("Participant ID (leave blank to auto-generate or use ?participant_id=ID in URL)", value=pid_from_q or "")
if pid_input:
participant_id = pid_input.strip()
else:
if "auto_pid" not in st.session_state:
st.session_state["auto_pid"] = str(uuid.uuid4())[:8]
participant_id = st.session_state["auto_pid"]
st.sidebar.markdown(f"**Participant ID:** `{participant_id}`")
# Multi-session container in st.session_state
if "sessions" not in st.session_state:
st.session_state["sessions"] = {}
if participant_id not in st.session_state["sessions"]:
st.session_state["sessions"][participant_id] = init_game(DEFAULT_WEEKS)
st.session_state["sessions"][participant_id]["participant_id"] = participant_id
state = st.session_state["sessions"][participant_id]
# Sidebar controls: info sharing, demand history slider, quick config
st.sidebar.header("Experiment controls")
state["info_sharing"] = st.sidebar.checkbox("Enable Information Sharing (show customer demand to all roles)", value=state.get("info_sharing", False))
state["info_history_weeks"] = st.sidebar.slider("How many past weeks of demand to share (0 = none)", 0, 8, value=state.get("info_history_weeks", 0))
st.sidebar.markdown("---")
st.sidebar.write("Model for LLM agents:")
st.sidebar.write(OPENAI_MODEL)
st.sidebar.markdown("---")
st.sidebar.write("HF upload settings:")
st.sidebar.write(f"- HF_REPO_ID: {HF_REPO_ID or 'NOT SET'}")
st.sidebar.write(f"- HF_TOKEN: {'SET' if HF_TOKEN else 'NOT SET'}")
# Main UI: show week, metrics, panels
col_main, col_sidebar = st.columns([3, 1])
with col_main:
st.header(f"Week {state['week']} / {state['weeks_total']}")
# show demand for this week (if info sharing or for distributor only?)
demand_display = state["customer_demand"][state["week"] - 1] if state["week"] - 1 < len(state["customer_demand"]) else None
st.subheader(f"Customer demand (retailer receives this week): {demand_display}")
# show role panels in a grid
roles = state["roles"]
panels = st.columns(len(roles))
for i, role in enumerate(roles):
with panels[i]:
st.markdown(f"### {role.title()}")
st.metric("Inventory", state["inventory"][role])
st.metric("Backlog", state["backlog"][role])
incoming = state["incoming_orders"].get(role, 0)
st.write(f"Incoming order (this week): **{incoming}**")
next_shipment = state["pipeline"][role][0] if state["pipeline"][role] else 0
st.write(f"Incoming shipment next week: **{next_shipment}**")
st.markdown("---")
# Distributor input box + submit button
with st.form(key=f"order_form_{participant_id}", clear_on_submit=False):
st.write("### Your (Distributor) decision this week")
default_val = state["incoming_orders"].get("distributor", 4) or 4
distributor_order = st.number_input("Order to place to upstream (Wholesaler):", min_value=0, step=1, value=default_val)
submitted = st.form_submit_button("Submit Order (locks your decision)")
if submitted:
# store pending order in session until Next Week pressed
st.session_state.setdefault("pending_orders", {})
st.session_state["pending_orders"][participant_id] = int(distributor_order)
st.success(f"Order submitted: {distributor_order}. Now click 'Next Week' to process the week.")
st.markdown("---")
# Next Week button: only enabled if pending order exists
pending = st.session_state.get("pending_orders", {}).get(participant_id, None)
if pending is None:
st.info("Please submit your order first to enable Next Week processing.")
else:
if st.button("Next Week — process week and invoke LLM agents"):
# step game with pending order
try:
state = step_game(state, pending)
# save state back
st.session_state["sessions"][participant_id] = state
# auto-save logs to HF after each week (can change to only end of game)
local_path = save_logs_local_and_return(state, participant_id) if 'save_logs_local_and_return' in globals() else None
# default: immediate upload
local_file = save_logs_local(state, participant_id)
uploaded_url = None
if HF_TOKEN and HF_REPO_ID:
uploaded_url = upload_log_to_hf(local_file, participant_id)
# remove pending order
del st.session_state["pending_orders"][participant_id]
st.success(f"Week processed. Advanced to week {state['week']}.")
if uploaded_url:
st.info(f"Logs uploaded to HF: {uploaded_url}")
except Exception as e:
st.error(f"Error during Next Week processing: {e}")
st.markdown("### Recent logs")
if state["logs"]:
# show last 6 logs in a readable table
df = pd.json_normalize(state["logs"][-6:])
st.dataframe(df, use_container_width=True)
else:
st.write("No logs yet. Submit your first order and press Next Week.")
with col_sidebar:
st.subheader("Information Sharing (preview)")
st.write("Toggle on to share real customer demand (current + recent weeks) with all LLM agents.")
st.write(f"Sharing {state['info_history_weeks']} weeks of history (0 = only current week).")
if state["info_sharing"]:
# display recent demand history according to slider
h = state["info_history_weeks"]
start = max(0, (state["week"] - 1) - h)
hist = state["customer_demand"][start: state["week"]]
st.write("Demand visible to agents:", hist)
st.markdown("---")
st.subheader("Admin / Debug")
if st.button("Test LLM connection"):
if not openai.api_key:
st.error("OpenAI API key is missing. Set OPENAI_API_KEY in Space Secrets.")
else:
# quick test prompt
try:
test_prompt = "You are a helpful agent. Reply with '42'."
resp = openai.ChatCompletion.create(
model=OPENAI_MODEL,
messages=[{"role":"user","content":test_prompt}],
max_tokens=10
)
st.write("LLM raw:", resp.choices[0].message.get("content"))
except Exception as e:
st.error(f"LLM test failed: {e}")
st.markdown("---")
if st.button("Save logs now (manual)"):
if not state["logs"]:
st.info("No logs to save yet.")
else:
local_file = save_logs_local(state, participant_id)
if HF_TOKEN and HF_REPO_ID:
url = upload_log_to_hf(local_file, participant_id)
if url:
st.success("Logs uploaded.")
else:
st.success(f"Saved local file: {local_file}")
# ---------------------------
# Utility save functions (placed after UI to avoid NameError in some deployments)
# ---------------------------
def save_logs_local(state: dict, participant_id: str):
"""
Save logs to local logs directory and return Path.
"""
df = pd.json_normalize(state["logs"])
fname = LOCAL_LOG_DIR / f"logs_{participant_id}_{int(time.time())}.csv"
df.to_csv(fname, index=False)
return fname
# alias used earlier if present
def save_logs_local_and_return(state: dict, participant_id: str):
return save_logs_local(state, participant_id)
# ---------------------------
# End-of-game auto actions
# ---------------------------
# If game has finished for this participant, offer final download / upload
if state["week"] > state["weeks_total"]:
st.success("Game completed for this participant.")
# prepare final CSV
final_csv = save_logs_local(state, participant_id)
with open(final_csv, "rb") as f:
st.download_button("Download final logs CSV", data=f, file_name=final_csv.name, mime="text/csv")
if HF_TOKEN and HF_REPO_ID:
url = upload_log_to_hf(final_csv, participant_id)
if url:
st.write(f"Final logs uploaded to HF Hub: {url}")