# Hugging Face file-page residue (not part of the program):
# Lilli98's picture
# Update app.py
# e3cdb59 verified
# raw
# history blame
# 21.3 kB
# app.py
# @title 啤酒游戏最终整合版 (Streamlit 交互应用 + Hugging Face 日志上传)
# -----------------------------------------------------------------------------
# 1. 导入必要的库
# -----------------------------------------------------------------------------
import streamlit as st
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from collections import deque
import time
import openai
import re
import random
import uuid
import os
from pathlib import Path
from datetime import datetime
from huggingface_hub import HfApi, upload_file
# -----------------------------------------------------------------------------
# 2. 配置游戏核心参数和API密钥
# -----------------------------------------------------------------------------
# --- Game parameters ---
# Number of weeks to simulate; the game stops once the week counter exceeds this.
WEEKS = 24
# Starting on-hand inventory and backlog given to every echelon.
INITIAL_INVENTORY = 12
INITIAL_BACKLOG = 0
# Length of each echelon's order pipeline (orders travelling upstream).
ORDER_PASSING_DELAY = 1
# Length of the incoming-shipment pipeline for the Retailer and the Wholesaler.
SHIPPING_DELAY = 2
# Length of the factory production pipeline.
FACTORY_LEAD_TIME = 1
# Length of the Distributor's incoming-shipment pipeline (goods coming from the Factory).
FACTORY_SHIPPING_DELAY = 1
# Weekly cost charged per unit of inventory and per unit of backlog.
HOLDING_COST = 0.5
BACKLOG_COST = 1.0
# --- Model and logging configuration ---
OPENAI_MODEL = "gpt-4o-mini"
LOCAL_LOG_DIR = Path("logs")
LOCAL_LOG_DIR.mkdir(exist_ok=True) # Ensure the log directory exists
# --- API & secrets configuration (read from Streamlit Secrets) ---
# NOTE(review): the st.error/st.info calls below run before st.set_page_config
# further down; some Streamlit versions require set_page_config to be the first
# Streamlit command - confirm against the deployed version.
try:
    # OpenAI: the key is looked up with [] so a missing entry raises and is
    # reported by the handler below.
    client = openai.OpenAI(api_key=st.secrets["OPENAI_API_KEY"])
    # Hugging Face: both values are optional and read with .get().
    HF_TOKEN = st.secrets.get("HF_TOKEN")
    HF_REPO_ID = st.secrets.get("HF_REPO_ID")  # e.g., "YourUser/beer-game-logs"
    # Only build the Hub client when a token is configured.
    hf_api = HfApi() if HF_TOKEN else None
except Exception as exc:
    st.error(f"启动时读取Secrets出错: {exc}")
    st.info("请确保在Streamlit的Secrets中设置了 OPENAI_API_KEY。可选设置 HF_TOKEN 和 HF_REPO_ID 用于上传日志。")
    # Any failure disables both integrations; the rest of the app checks for None.
    client, HF_TOKEN, HF_REPO_ID, hf_api = None, None, None, None
# -----------------------------------------------------------------------------
# 3. 游戏核心逻辑函数 (大部分源自代码1, 并为Streamlit适配)
# -----------------------------------------------------------------------------
def get_customer_demand(week: int) -> int:
    """Return the end-customer demand for ``week``.

    Demand is 4 units per week up to and including week 4, and 8 units per
    week from week 5 onwards.
    """
    if week > 4:
        return 8
    return 4
def init_game_state(llm_personality: str, info_sharing: str):
    """Create a brand-new game and store it in ``st.session_state.game_state``.

    A role is drawn at random for the human player and a short participant id
    is generated for the session. Every echelon starts with the configured
    initial inventory/backlog and zero-filled order and shipment pipelines.

    Args:
        llm_personality: AI behaviour mode stored in the game state.
        info_sharing: Information-sharing mode stored in the game state.
    """
    roles = ["Retailer", "Wholesaler", "Distributor", "Factory"]
    human_role = random.choice(roles)
    # First 8 characters of a UUID4 serve as the id of this game session.
    participant_id = str(uuid.uuid4())[:8]

    # Incoming-shipment pipeline length per role; roles not listed here use
    # SHIPPING_DELAY. The Factory receives no shipments, hence length 0.
    shipping_delay_by_role = {"Distributor": FACTORY_SHIPPING_DELAY, "Factory": 0}

    # Neighbours along the chain: upstream is the next role in the list,
    # downstream the previous one; the ends of the chain get None.
    upstream_roles = roles[1:] + [None]
    downstream_roles = [None] + roles[:-1]

    echelons = {}
    for name, upstream, downstream in zip(roles, upstream_roles, downstream_roles):
        shipping_weeks = shipping_delay_by_role.get(name, SHIPPING_DELAY)
        echelons[name] = {
            'name': name,
            'upstream_name': upstream,
            'downstream_name': downstream,
            'inventory': INITIAL_INVENTORY,
            'backlog': INITIAL_BACKLOG,
            'order_pipeline': deque([0] * ORDER_PASSING_DELAY, maxlen=ORDER_PASSING_DELAY),
            'incoming_shipments': deque([0] * shipping_weeks, maxlen=shipping_weeks),
            'incoming_order': 0,
            'order_placed': 0,
            'shipment_sent': 0,
            'weekly_cost': 0,
            'total_cost': 0,
        }

    st.session_state.game_state = {
        'game_running': True,
        'participant_id': participant_id,
        'week': 1,
        'human_role': human_role,
        'llm_personality': llm_personality,
        'info_sharing': info_sharing,
        'logs': [],  # one detailed entry per completed week
        'echelons': echelons,
        'factory_production_pipeline': deque([0] * FACTORY_LEAD_TIME, maxlen=FACTORY_LEAD_TIME),
    }
    st.info(f"新游戏开始!AI模式: **{llm_personality} / {info_sharing}**。您的角色: **{human_role}**。本次游戏ID: `{participant_id}`")
def get_llm_order_decision(prompt: str, echelon_name: str, current_week: int, personality: str) -> "tuple[int, str]":
    """Ask the OpenAI model for an order quantity.

    Args:
        prompt: User prompt describing the echelon's situation.
        echelon_name: Name used in spinner, warning and error messages.
        current_week: Current game week (not used by this function).
        personality: ``'perfect_rational'`` selects temperature 0.1; any other
            value selects 0.7.

    Returns:
        ``(order, raw_text)``. ``order`` is the first run of digits found in
        the model's reply, or the default 8 when no client is configured, the
        API call fails, or the reply contains no digits. ``raw_text`` is the
        stripped reply, ``"NO_API_KEY_DEFAULT"`` or ``"API_ERROR: ..."``.
    """
    if not client:
        st.warning("API Key未设置,LLM将使用默认值8。")
        return 8, "NO_API_KEY_DEFAULT"

    temp = 0.1 if personality == 'perfect_rational' else 0.7
    with st.spinner(f"正在为 {echelon_name} 获取AI决策..."):
        # Keep the try body limited to the network call so that only real API
        # failures are reported as such.
        try:
            response = client.chat.completions.create(
                model=OPENAI_MODEL,
                messages=[
                    {"role": "system", "content": "You are a supply chain manager playing the Beer Game. Your response must be only an integer number representing your order quantity and nothing else. For example: 8"},
                    {"role": "user", "content": prompt}
                ],
                temperature=temp,
                max_tokens=10
            )
            # The message content is optional in the API response; treat a
            # missing value as an empty reply instead of failing on .strip().
            raw_text = (response.choices[0].message.content or "").strip()
        except Exception as e:
            st.error(f"API调用失败 for {echelon_name}。错误: {e}。将使用默认值 8。")
            return 8, f"API_ERROR: {e}"

        # The first run of digits in the reply is taken as the order quantity.
        match = re.search(r'\d+', raw_text)
        if match:
            return int(match.group(0)), raw_text
        st.warning(f"LLM for {echelon_name} 未返回有效数字,将使用默认值 8。原始返回: '{raw_text}'")
        return 8, raw_text
def get_llm_prompt(echelon_state: dict, week: int, llm_personality: str, info_sharing: str, all_echelons_state: dict) -> str:
    """Build the prompt text for one echelon's ordering decision.

    Four personality / information combinations are supported:
    ``perfect_rational`` or ``human_like`` crossed with ``full`` or ``local``.
    The two rational variants pre-compute an order quantity and ask the model
    to confirm it; the two human-like variants describe the situation and ask
    for a judgement. Any other combination falls through and returns ``None``.

    Args:
        echelon_state: State dict of the echelon that is ordering.
        week: Current game week.
        llm_personality: ``'perfect_rational'`` or ``'human_like'``.
        info_sharing: ``'full'`` or ``'local'``.
        all_echelons_state: Mapping of echelon name to state dict; only read in
            the human-like / full-information scenario.
    """
    base_info = f"""
Your Current Status at the **{echelon_state['name']}** for **Week {week}**:
- On-hand inventory: {echelon_state['inventory']} units.
- Backlog (unfilled orders): {echelon_state['backlog']} units.
- Incoming order this week (from your customer): {echelon_state['incoming_order']} units.
- Shipments on the way to you: {list(echelon_state['incoming_shipments'])}
- Orders you have placed being processed by your supplier: {list(echelon_state['order_pipeline'])}
"""
    mode = (llm_personality, info_sharing)

    # Scenario 1: perfectly rational x full information (order-up-to policy).
    if mode == ('perfect_rational', 'full'):
        stable_demand = 8
        total_lead_time = ORDER_PASSING_DELAY + SHIPPING_DELAY
        safety_stock = 4
        target_inventory_level = (stable_demand * total_lead_time) + safety_stock
        inventory_position = (
            echelon_state['inventory']
            - echelon_state['backlog']
            + sum(echelon_state['incoming_shipments'])
            + sum(echelon_state['order_pipeline'])
        )
        optimal_order = max(0, int(target_inventory_level - inventory_position))
        return f"**You are a perfectly rational supply chain AI with full system visibility.**\nYour only goal is to maintain stability and minimize costs based on mathematical optimization.\n**System Analysis:**\n* **Known Stable End-Customer Demand:** {stable_demand} units/week.\n* **Your Current Total Inventory Position:** {inventory_position} units.\n* **Optimal Target Inventory Level:** {target_inventory_level} units.\n* **Mathematically Optimal Order:** The optimal order is **{optimal_order} units**.\n**Your Task:** Confirm this optimal quantity. Respond with a single integer."

    # Scenario 2: perfectly rational x local information (anchoring & adjustment).
    if mode == ('perfect_rational', 'local'):
        safety_stock = 4
        anchor_demand = echelon_state['incoming_order']
        inventory_correction = safety_stock - (echelon_state['inventory'] - echelon_state['backlog'])
        supply_line = sum(echelon_state['incoming_shipments']) + sum(echelon_state['order_pipeline'])
        calculated_order = anchor_demand + inventory_correction - supply_line
        rational_local_order = max(0, int(calculated_order))
        return f"**You are a perfectly rational supply chain AI with ONLY LOCAL information.**\nYou must use a logical heuristic to make a stable decision. A proven method is \"Anchoring and Adjustment\".\n\n{base_info}\n\n**Rational Calculation (Anchoring & Adjustment):**\n1. **Anchor on Demand:** Your best guess for future demand is your last incoming order: **{anchor_demand} units**.\n2. **Adjust for Inventory:** You want to hold a safety stock of {safety_stock} units. Your current stock is {echelon_state['inventory'] - echelon_state['backlog']}. You need to order an extra **{inventory_correction} units** to correct this.\n3. **Account for Supply Line:** You already have **{supply_line} units** in transit or being processed. These should be subtracted from your new order.\n\n**Final Calculation:**\n* Order = (Anchor Demand) + (Inventory Adjustment) - (Supply Line)\n* Order = {anchor_demand} + {inventory_correction} - {supply_line} = **{rational_local_order} units**.\n\n**Your Task:** Confirm this locally rational quantity. Respond with a single integer."

    # Scenario 3: human-like x full information.
    if mode == ('human_like', 'full'):
        full_info_str = f"\n**Full Supply Chain Information:**\n- End-Customer Demand this week: {get_customer_demand(week)} units.\n"
        # One line per *other* echelon with its inventory and backlog.
        full_info_str += "".join(
            f"- {name}: Inventory={e_state['inventory']}, Backlog={e_state['backlog']}\n"
            for name, e_state in all_echelons_state.items()
            if name != echelon_state['name']
        )
        return f"**You are a supply chain manager with full visibility across the entire system.**\nYou can see everyone's inventory and the real customer demand. Your goal is to use this information to make a smart, coordinated decision. However, you are still human and might get anxious about your own stock levels.\n{base_info}\n{full_info_str}\n**Your Task:** Look at the full picture, especially the stable end-customer demand. Try to avoid causing the bullwhip effect. However, also consider your own inventory pressure. What quantity should you order this week? Respond with a single integer."

    # Scenario 4: human-like x local information.
    if mode == ('human_like', 'local'):
        return f"**You are a reactive supply chain manager for the {echelon_state['name']}.** You have a limited view and tend to over-correct based on fear.\n\n**Your Mindset: **Your top priority is try to not have a backlog.\n\n{base_info}\n\n**Your Task:** You just saw your own inventory and a new order coming. Your gut instinct is to panic and order enough to ensure you are never caught with a backlog again.\n\n**React emotionally.** What is your knee-jerk order quantity? Respond with a single integer."

    # Unknown combination: no prompt is produced.
    return None
def step_game(human_final_order: int):
    """Advance the game by one week and append a detailed log entry.

    The week is processed in a fixed order: production and shipments arrive,
    orders are received, demand is served, shipments are dispatched, every
    echelon places a new order (the human's comes from ``human_final_order``,
    the others from the LLM), costs are booked, the week is logged and the
    week counter is advanced. The game is marked as finished once the counter
    exceeds ``WEEKS``.

    Args:
        human_final_order: Order quantity chosen by the human player for the
            current week; negative values are clamped to 0.
    """
    # Local import: the module only imports the ``datetime`` class itself.
    from datetime import timezone

    state = st.session_state.game_state
    week = state['week']
    echelons = state['echelons']
    human_role = state['human_role']
    llm_personality = state['llm_personality']
    info_sharing = state['info_sharing']
    echelon_order = ["Retailer", "Wholesaler", "Distributor", "Factory"]
    llm_raw_responses = {}

    # 1. Finished factory production enters the Factory's inventory.
    production_pipeline = state['factory_production_pipeline']
    if production_pipeline:
        echelons["Factory"]['inventory'] += production_pipeline.popleft()

    # 2. Every other echelon receives the shipment at the head of its pipeline.
    for name in ["Retailer", "Wholesaler", "Distributor"]:
        arrivals = echelons[name]['incoming_shipments']
        if arrivals:
            echelons[name]['inventory'] += arrivals.popleft()

    # 3. Orders are received: the Retailer sees customer demand, everyone else
    #    takes the oldest order from the downstream neighbour's order pipeline.
    #    If that pipeline is empty the previous incoming_order is left as is.
    for name in echelon_order:
        if name == "Retailer":
            echelons[name]['incoming_order'] = get_customer_demand(week)
            continue
        downstream = echelons[name]['downstream_name']
        if downstream and echelons[downstream]['order_pipeline']:
            echelons[name]['incoming_order'] = echelons[downstream]['order_pipeline'].popleft()

    # 4. Serve this week's order plus any backlog, limited by inventory.
    for name in echelon_order:
        e = echelons[name]
        demand = e['incoming_order'] + e['backlog']
        e['shipment_sent'] = min(e['inventory'], demand)
        e['inventory'] -= e['shipment_sent']
        e['backlog'] = demand - e['shipment_sent']

    # 5. Put what was shipped into the downstream neighbour's shipment pipeline.
    for sender in ["Factory", "Distributor", "Wholesaler"]:
        receiver = echelons[sender]['downstream_name']
        if receiver:
            echelons[receiver]['incoming_shipments'].append(echelons[sender]['shipment_sent'])

    # 6. Every echelon places an order (human input or LLM decision).
    for name in echelon_order:
        e = echelons[name]
        if name == human_role:
            order_amount, raw_resp = human_final_order, "HUMAN_INPUT"
            st.sidebar.write(f"✔️ 你 ({name}) 的最终订单: {order_amount}")
        else:
            prompt = get_llm_prompt(e, week, llm_personality, info_sharing, echelons)
            order_amount, raw_resp = get_llm_order_decision(prompt, name, week, llm_personality)
            st.sidebar.write(f"🤖 AI ({name}) 的订单: {order_amount}")
        llm_raw_responses[name] = raw_resp
        e['order_placed'] = max(0, order_amount)
        # The Factory's "order" is a production request handled in step 7,
        # so it is not queued in an order pipeline.
        if name != "Factory":
            e['order_pipeline'].append(e['order_placed'])

    # 7. The Factory schedules production of the quantity it just "ordered".
    state['factory_production_pipeline'].append(echelons["Factory"]['order_placed'])

    # 8. Book holding and backlog costs for the week.
    for name in echelon_order:
        e = echelons[name]
        e['weekly_cost'] = (e['inventory'] * HOLDING_COST) + (e['backlog'] * BACKLOG_COST)
        e['total_cost'] += e['weekly_cost']

    # 9. Record a flat log entry for the week.
    # datetime.utcnow() is deprecated; take an aware UTC time and drop the
    # tzinfo so the serialised form stays "<naive ISO timestamp>Z" as before.
    utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
    log_entry = {
        'timestamp': utc_now.isoformat() + "Z",
        'week': week,
        'participant_id': state['participant_id'],
        'human_role': human_role,
        'llm_personality': llm_personality,
        'info_sharing': info_sharing,
        'customer_demand': get_customer_demand(week),
    }
    for name in echelon_order:
        e = echelons[name]
        log_entry[f'{name}.inventory'] = e['inventory']
        log_entry[f'{name}.backlog'] = e['backlog']
        log_entry[f'{name}.incoming_order'] = e['incoming_order']
        log_entry[f'{name}.order_placed'] = e['order_placed']
        log_entry[f'{name}.shipment_sent'] = e['shipment_sent']
        log_entry[f'{name}.weekly_cost'] = e['weekly_cost']
        log_entry[f'{name}.total_cost'] = e['total_cost']
        log_entry[f'{name}.llm_raw_response'] = llm_raw_responses.get(name, "")
    state['logs'].append(log_entry)

    # 10. Advance the week counter and stop the game after the last week.
    state['week'] += 1
    if state['week'] > WEEKS:
        state['game_running'] = False
def plot_results(df: pd.DataFrame, title: str):
    """Draw the end-of-game charts and return the matplotlib figure.

    The figure has three stacked panels: inventory per echelon over time,
    orders per echelon together with customer demand, and the final
    cumulative cost per echelon.

    Args:
        df: One row per week with ``week`` and ``<Echelon>.inventory``,
            ``<Echelon>.order_placed`` and ``<Echelon>.total_cost`` columns.
        title: Figure title.
    """
    fig, axes = plt.subplots(3, 1, figsize=(12, 16))
    fig.suptitle(title, fontsize=16)
    echelons = ['Retailer', 'Wholesaler', 'Distributor', 'Factory']

    # Reshape the wide per-week log into one record per (week, echelon).
    plot_df = pd.DataFrame([
        {
            'week': row['week'],
            'echelon': e,
            'inventory': row[f'{e}.inventory'],
            'order_placed': row[f'{e}.order_placed'],
            'total_cost': row[f'{e}.total_cost'],
        }
        for _, row in df.iterrows()
        for e in echelons
    ])

    # Panel 1: inventory levels.
    inventory_pivot = plot_df.pivot(index='week', columns='echelon', values='inventory')
    inventory_pivot = inventory_pivot.reindex(columns=echelons)
    inventory_pivot.plot(ax=axes[0], kind='line', marker='o', markersize=4)
    axes[0].set_title('Inventory Levels')
    axes[0].grid(True, linestyle='--')

    # Panel 2: order quantities against end-customer demand.
    order_pivot = plot_df.pivot(index='week', columns='echelon', values='order_placed')
    order_pivot = order_pivot.reindex(columns=echelons)
    order_pivot.plot(ax=axes[1], style='--')
    axes[1].plot(
        range(1, WEEKS + 1),
        [get_customer_demand(w) for w in range(1, WEEKS + 1)],
        label='Customer Demand', color='black', lw=2.5,
    )
    axes[1].set_title('Order Quantities (Bullwhip Effect)')
    axes[1].grid(True, linestyle='--')
    axes[1].legend()

    # Panel 3: the largest cumulative cost seen for each echelon.
    total_costs = plot_df.groupby('echelon')['total_cost'].max().reindex(echelons)
    total_costs.plot(kind='bar', ax=axes[2], rot=0)
    axes[2].set_title('Total Cumulative Cost')

    # Leave room at the top for the figure title.
    plt.tight_layout(rect=[0, 0, 1, 0.96])
    return fig
def save_logs_and_upload(state: dict):
    """Save the game log as CSV, offer it for download and upload it to the Hub.

    This function is called from the end-of-game screen, which Streamlit
    re-executes on every user interaction. To avoid writing a new CSV and
    pushing another copy to Hugging Face on each rerun, the path of the saved
    file and the URL of a successful upload are remembered in ``state`` under
    ``'saved_log_path'`` and ``'hf_upload_url'`` and reused afterwards. A
    failed upload is not remembered, so it is attempted again on the next run.

    Args:
        state: The game state dict; reads ``'logs'`` and ``'participant_id'``
            and stores the two cache keys described above.
    """
    if not state.get('logs'):
        st.warning("没有可保存的日志。")
        return

    # Reuse the file written by an earlier run of this screen if it still exists.
    saved_path = state.get('saved_log_path')
    fname = Path(saved_path) if saved_path else None
    if fname is None or not fname.exists():
        participant_id = state['participant_id']
        df = pd.json_normalize(state['logs'])
        fname = LOCAL_LOG_DIR / f"log_{participant_id}_{int(time.time())}.csv"
        df.to_csv(fname, index=False)
        state['saved_log_path'] = str(fname)
        # A freshly written file has not been uploaded yet.
        state.pop('hf_upload_url', None)
    st.success(f"日志已成功保存到本地: `{fname}`")

    # Offer the CSV for download.
    with open(fname, "rb") as f:
        st.download_button("📥 下载日志CSV文件", data=f, file_name=fname.name, mime="text/csv")

    # Upload to Hugging Face only when fully configured.
    if not (HF_TOKEN and HF_REPO_ID and hf_api):
        st.info("未配置Hugging Face的 HF_TOKEN 或 HF_REPO_ID, 将跳过上传。")
        return

    url = state.get('hf_upload_url')
    if url is None:
        with st.spinner("正在上传日志到 Hugging Face Hub..."):
            try:
                dest_path = f"logs/{fname.name}"
                url = hf_api.upload_file(
                    path_or_fileobj=str(fname),
                    path_in_repo=dest_path,
                    repo_id=HF_REPO_ID,
                    repo_type="dataset",
                    token=HF_TOKEN
                )
            except Exception as e:
                st.error(f"上传到 Hugging Face 失败: {e}")
                return
        # Remember the successful upload so reruns do not upload again.
        url = str(url)
        state['hf_upload_url'] = url
    st.success(f"✅ 日志已成功上传到 Hugging Face! [查看文件]({url})")
# -----------------------------------------------------------------------------
# 4. Streamlit UI 界面
# -----------------------------------------------------------------------------
# Set the page title and use the wide layout.
st.set_page_config(page_title="啤酒游戏-人机协作版", layout="wide")
# Title and a one-line introduction; these run unconditionally, before any of
# the screen-specific branches below.
st.title("🍺 啤酒游戏:人机协作挑战")
st.markdown("你将扮演供应链中的一个角色,与另外三个由大语言模型(LLM)驱动的AI代理合作。")
# --- Game setup and initialisation ---
# Shown when no game exists yet, and also once a game has stopped running.
if 'game_state' not in st.session_state or not st.session_state.game_state.get('game_running', False):
    st.header("🎮 开始新游戏")
    col1, col2 = st.columns(2)
    with col1:
        llm_personality = st.selectbox("AI '性格'", ('human_like', 'perfect_rational'), format_func=lambda x: x.replace('_', ' ').title())
    with col2:
        info_sharing = st.selectbox("信息共享", ('local', 'full'), format_func=lambda x: x.title())
    if st.button("🚀 开始游戏", type="primary"):
        init_game_state(llm_personality, info_sharing)
        st.rerun()
# --- Main game screen ---
elif 'game_state' in st.session_state and st.session_state.game_state.get('game_running'):
    state = st.session_state.game_state
    week, human_role, echelons = state['week'], state['human_role'], state['echelons']
    st.header(f"第 {week} 周 / 共 {WEEKS} 周")
    st.subheader(f"你的角色: **{human_role}** | AI模式: **{state['llm_personality'].replace('_', ' ')}** | 信息: **{state['info_sharing']}**")

    # One status column per echelon.
    cols = st.columns(4)
    for i, name in enumerate(["Retailer", "Wholesaler", "Distributor", "Factory"]):
        with cols[i]:
            e = echelons[name]
            title_icon = "👤" if name == human_role else "🤖"
            st.markdown(f"### {title_icon} {name} {'(你)' if name == human_role else '(AI)'}")
            st.metric("库存", e['inventory'])
            st.metric("缺货/积压", e['backlog'])
            st.write(f"本周收到订单: **{e['incoming_order']}**")
            # Head of the shipment pipeline, or 0 when the pipeline is empty.
            next_arrival = e['incoming_shipments'][0] if e['incoming_shipments'] else 0
            st.write(f"下周到货: **{next_arrival}**")
    st.markdown("---")

    st.header("你的决策")
    # Streamlit re-runs this script on every interaction, including the run
    # triggered by submitting the form below. Asking the model again on each
    # run costs an extra API call per week and, because the reply can differ
    # between calls, can change the default of the number input at submit time
    # so that the player's typed value is lost. The suggestion is therefore
    # requested once per week and kept in the game state.
    cached_suggestion = state.get('ai_suggestion')
    if cached_suggestion is None or cached_suggestion.get('week') != week:
        human_echelon_state = echelons[human_role]
        prompt_sugg = get_llm_prompt(human_echelon_state, week, state['llm_personality'], state['info_sharing'], echelons)
        suggested_order, _ = get_llm_order_decision(prompt_sugg, f"{human_role} (Suggestion)", week, state['llm_personality'])
        cached_suggestion = {'week': week, 'value': suggested_order}
        state['ai_suggestion'] = cached_suggestion
    ai_suggestion = cached_suggestion['value']
    st.info(f"💡 AI建议你 ({human_role}) 本周向上游订购 **{ai_suggestion}** 单位。")

    with st.form(key="order_form"):
        final_order = st.number_input("请输入你的最终订单数量:", min_value=0, step=1, value=ai_suggestion)
        if st.form_submit_button(label="✅ 提交订单并进入下一周"):
            step_game(int(final_order))
            st.rerun()

    st.sidebar.header("游戏信息")
    st.sidebar.markdown(f"**游戏ID**: `{state['participant_id']}`")
    st.sidebar.markdown(f"**当前周**: {week-1} (已完成)")
    if st.sidebar.button("🔄 重置游戏"):
        del st.session_state.game_state
        st.rerun()
# --- Game-over screen ---
# Shown once a game exists, is no longer running and has passed the last week.
if 'game_state' in st.session_state and not st.session_state.game_state.get('game_running', False) and st.session_state.game_state['week'] > WEEKS:
    st.header("🎉 游戏结束!")
    state = st.session_state.game_state
    logs_df = pd.json_normalize(state['logs'])
    title = f"Beer Game (Human: {state['human_role']})\n(AI: {state['llm_personality'].replace('_', ' ').title()} | Info: {state['info_sharing'].title()})"
    fig = plot_results(logs_df, title)
    st.pyplot(fig)
    # This screen is rebuilt on every rerun; close the figure once it has been
    # rendered so pyplot does not keep one more open figure per rerun.
    plt.close(fig)
    # Save the log locally and upload it.
    save_logs_and_upload(state)
    if st.button("✨ 开始一局新游戏"):
        del st.session_state.game_state
        st.rerun()