# Wilson826's picture
# Upload app.py
# 6737219 verified
import pandas as pd
import requests
import math
import os
import tempfile
import gradio as gr
import traceback
from openpyxl import load_workbook
from datetime import datetime
# ================== Configuration ==================
# Login credentials for the Gradio app, read from the environment.
APP_USERNAME = os.environ.get("APP_USERNAME")
APP_PASSWORD = os.environ.get("APP_PASSWORD")
# Fail fast at import time when either credential is missing or empty.
if not APP_USERNAME or not APP_PASSWORD:
    raise ValueError("APP_USERNAME / APP_PASSWORD not set")

# ID of the Notion database queried for MAWB records.
DATABASE_ID = "2caee2cb9a5d8069a8b7c54113f266c1"
# Notion API token; also required at import time.
NOTION_TOKEN = os.environ.get("NOTION_TOKEN")
if not NOTION_TOKEN:
    raise ValueError("NOTION_TOKEN not setup yet")

# Headers sent with every Notion API request.
HEADERS = {
    "Authorization": f"Bearer {NOTION_TOKEN}",
    "Content-Type": "application/json",
    "Notion-Version": "2022-06-28"
}

# Workbook loaded as the starting point of every generated statement.
TEMPLATE_PATH = "template.xlsx"
# ================== Fee Configuration ==================
# Rate cards, keyed by customer -> service level -> rate card name.
# Each rate card holds:
#   enabled            - get_rate_cards only returns cards whose flag is truthy
#   hts                - parameters passed to calculate_hts_fee
#   wh_rate            - multiplied by CWT to get the WH handling fee
#   ces_transfer_rate  - multiplied by CWT to get the CES transfer fee
#                        (charged only when a CES inspection fee is present)
RATES = {
    "SHEIN": {
        "T01": {
            "$125/entry_cap 10 entries/MAWB (2025/9)": {
                "enabled": False,  # False = disabled, True = enabled
                "hts": {
                    "max_per_entry": 999,
                    "entry_fee": 125,
                    "cap_entry_count": 10,
                    "cap_fee": 1250,
                },
                "wh_rate": 0.18,
                "ces_transfer_rate": 0.23,
            },
            "$100/entry_no cap (2026/2)": {
                "enabled": True,  # False = disabled, True = enabled
                "hts": {
                    "max_per_entry": 999,
                    "entry_fee": 100,
                    "cap_entry_count": None,  # None means this card has no cap
                    "cap_fee": None,
                },
                "wh_rate": 0.18,
                "ces_transfer_rate": 0.23,
            },
        }
        # Other service levels can be added here later, e.g. "T02": {...}
    }
    # Other customers can be added here later
}
# ================== Notion helper functions ==================
def _get_rich_text(p, name):
arr = p.get(name, {}).get("rich_text", [])
return arr[0]["text"]["content"] if arr else ""
def _get_title(p, name):
arr = p.get(name, {}).get("title", [])
return arr[0]["text"]["content"] if arr else ""
def _get_number(p, name):
return p.get(name, {}).get("number")
def _get_checkbox(p, name):
return p.get(name, {}).get("checkbox", False)
# ================== Notion data retrieval ==================
def _page_to_record(page):
    """Flatten one Notion page object into the record dict used for billing."""
    p = page.get("properties", {})
    return {
        "MAWB": _get_title(p, "MAWB"),
        "Customer": _get_rich_text(p, "Customer"),
        "Service Level": _get_rich_text(p, "Service Level"),
        "POD": _get_rich_text(p, "POD"),
        "Parcels": _get_number(p, "Parcels") or 0,
        # CWT is deliberately left as None when unset so that the statement
        # generator can detect and report the missing value.
        "CWT": _get_number(p, "CWT"),
        "HTS Count": _get_number(p, "HTS Count") or 0,
        "ISC Fee": _get_number(p, "ISC Fee") or 0,
        "BreakDown Fee": _get_number(p, "BreakDown Fee") or 0,
        "Bypass Fee": _get_number(p, "Bypass Fee") or 0,
        "CES Inspection Fee": _get_number(p, "CES Inspection Fee") or 0,
        "Delivery Fee": _get_number(p, "Delivery Fee") or 0,
        "ETA": _get_rich_text(p, "ETA"),
        "AP": _get_checkbox(p, "AP"),
        "AR": _get_checkbox(p, "AR")
    }


def query_notion_database(logs):
    """Fetch every page of the Notion database as flat record dicts.

    Pages are requested 100 at a time and followed through ``next_cursor``
    until Notion reports that no more pages exist.

    Args:
        logs: List of log lines; progress and error messages are appended.

    Returns:
        A list with one record dict per Notion page.  On any error the
        traceback is appended to ``logs`` and an empty list is returned.
    """
    logs.append("Starting to query the Notion database...")
    url = f"https://api.notion.com/v1/databases/{DATABASE_ID}/query"
    body = {"page_size": 100}
    results = []
    try:
        while True:
            r = requests.post(url, headers=HEADERS, json=body, timeout=30)
            r.raise_for_status()
            data = r.json()
            results.extend(_page_to_record(page) for page in data.get("results", []))

            if not data.get("has_more", False):
                break
            next_cursor = data.get("next_cursor")
            if not next_cursor:
                # Without a cursor the next request would fetch the first
                # page again and the loop would never terminate.
                logs.append("Warning: Notion reported more pages but returned no cursor; stopping pagination")
                break
            body["start_cursor"] = next_cursor

        logs.append(f"The query of the Notion database has been completed,There are a total of {len(results)} records")
        return results
    except Exception:
        # Top-level boundary for the UI: report the failure in the log box
        # instead of crashing the request.
        logs.append("Notion query error")
        logs.append(traceback.format_exc())
        return []
# ================== Fee calculation ==================
def calculate_hts_fee(hts_count, hts_rate):
    """Compute the HTS processing fee and the number of customs entries.

    HTS codes are packed into entries of at most ``max_per_entry`` codes.
    A single entry costs ``entry_fee``.  With several entries, every full
    entry costs ``entry_fee`` and the last entry costs its HTS code count,
    limited to ``entry_fee``.  When the rate defines a cap and the entry
    count exceeds ``cap_entry_count``, the flat ``cap_fee`` is charged.

    Args:
        hts_count: Number of HTS codes on the MAWB.
        hts_rate: Mapping with ``max_per_entry``, ``entry_fee``,
            ``cap_entry_count`` and ``cap_fee`` (the last two may be
            ``None`` for an uncapped rate).

    Returns:
        A ``(fee, entry_count)`` tuple; ``(0, 0)`` when there are no HTS
        codes.
    """
    # No HTS codes means no entry to bill.  The general formula below only
    # produces 0 here by coincidence (when max_per_entry >= entry_fee) and
    # turns negative otherwise, so handle the case explicitly.
    if hts_count <= 0:
        return 0, 0

    max_per_entry = hts_rate["max_per_entry"]
    entry_fee = hts_rate["entry_fee"]
    cap_entry_count = hts_rate["cap_entry_count"]

    entry_count = math.ceil(hts_count / max_per_entry)

    # With a cap, exceeding the entry limit charges the capped fee directly.
    if cap_entry_count is not None and entry_count > cap_entry_count:
        return hts_rate["cap_fee"], entry_count

    if entry_count == 1:
        return entry_fee, entry_count

    full_entries = entry_count - 1
    last_entry_hts = hts_count - full_entries * max_per_entry
    fee = full_entries * entry_fee + min(last_entry_hts, entry_fee)
    return fee, entry_count
def calculate_wh_fee(cwt, rate):
    """Return the warehouse handling fee: ``cwt * rate`` rounded to 2 decimals."""
    fee = cwt * rate
    return round(fee, 2)
def calculate_total_fee(hts, wh, isc, bd, bypass, ces_i, ces_t, lm):
    """Return the sum of all fee components, rounded to 2 decimals."""
    total = hts
    # Accumulate strictly left to right so float rounding matches a plain
    # chained addition of the arguments.
    for component in (wh, isc, bd, bypass, ces_i, ces_t, lm):
        total = total + component
    return round(total, 2)
def is_missing_cwt(val):
    """Report whether a CWT value is unusable, i.e. ``None`` or equal to zero."""
    if val is None:
        return True
    return val == 0
# ================== Excel generation ==================
def _statement_row_values(record, card, logs):
    """Return the 22 column values of one statement row, in column order.

    A warning is appended to ``logs`` when the record's ETA cannot be parsed;
    the pickup column is then left empty.
    """
    hts_fee, entry_cnt = calculate_hts_fee(record["HTS Count"], card["hts"])
    wh_fee = calculate_wh_fee(record["CWT"], card["wh_rate"])

    # The CES transfer fee is charged only when an inspection fee exists.
    ces_inspection_fee = record["CES Inspection Fee"]
    ces_transfer_fee = (
        round(record["CWT"] * card["ces_transfer_rate"], 2)
        if ces_inspection_fee
        else 0
    )
    lm_delivery_fee = record["Delivery Fee"] or 0

    total_fee = calculate_total_fee(
        hts_fee, wh_fee, record["ISC Fee"], record["BreakDown Fee"],
        record["Bypass Fee"], ces_inspection_fee, ces_transfer_fee,
        lm_delivery_fee,
    )

    # Pickup time is the day after the ETA.
    try:
        eta = pd.to_datetime(record["ETA"])
        pickup = (eta + pd.Timedelta(days=1)).strftime("%Y/%m/%d")
    except (ValueError, TypeError) as e:
        logs.append(f"Warning: MAWB {record['MAWB']} ETA parse failed: {e}")
        pickup = ""

    return [
        record["Service Level"],  # 1 Clearance Mode
        record["POD"],            # 2 POD
        record["MAWB"],           # 3 MAWB
        "GOFO",                   # 4 LM Carrier
        record["Parcels"],        # 5 Parcels
        0,                        # 6 Exam Hold
        record["CWT"],            # 7 CWT (KG)
        entry_cnt,                # 8 7501-Form Entry submissions
        record["HTS Count"],      # 9 HTS Code Count
        pickup,                   # 10 Pickup Time
        hts_fee,                  # 11 HTS Processing Fee
        wh_fee,                   # 12 WH Handling Fee
        record["ISC Fee"],        # 13 Import Service Fee
        record["BreakDown Fee"],  # 14 Loose cargo BreakDown Fee
        record["Bypass Fee"],     # 15 Bypass Fee
        0,                        # 16 Exam Sorting Fee
        ces_inspection_fee,       # 17 CES Inspection Fee
        ces_transfer_fee,         # 18 CES Transfer Fee
        0,                        # 19 Material Fee
        0,                        # 20 Storage Fee
        lm_delivery_fee,          # 21 LM Delivery Fee
        total_fee,                # 22 Total
    ]


def generate_excel_statement(notion_data, logs, customer, service_level, rate_card):
    """Write the billing statement workbook for one customer and service level.

    Records qualify when their Customer and Service Level match and AP is
    checked while AR is not.  Each qualifying record becomes one row of the
    template's active sheet, starting at row 11.

    Args:
        notion_data: Record dicts as returned by ``query_notion_database``.
        logs: List of log lines; progress and error messages are appended.
        customer: Customer name to bill.
        service_level: Service level to bill.
        rate_card: Name of the rate card in ``RATES`` to price with.

    Returns:
        Path of the saved workbook in the system temp directory, or ``None``
        when no record qualifies, a qualifying record has no CWT, the rate
        card is not configured, or an error occurs (its traceback is logged).
    """
    logs.append(f"Filter out the MAWBs of {customer} / AP = Done / AR = Pending / {service_level} / Rate Card: {rate_card}")

    rows = []
    for record in notion_data:
        if record["Customer"] != customer:
            continue
        if record["AP"] is not True or record["AR"] is not False:
            continue
        if record["Service Level"] != service_level:
            continue
        rows.append(record)

    if not rows:
        logs.append("No MAWB that meets the requirements exists.")
        return None
    logs.append(f"eligible MAWB:{len(rows)}")

    # ===== Strict CWT validation: refuse to bill while any CWT is missing =====
    missing_cwt_mawbs = []
    for record in rows:
        if is_missing_cwt(record["CWT"]):
            missing_cwt_mawbs.append(record["MAWB"])
    if missing_cwt_mawbs:
        logs.append("The following MAWBs are missing CWT. Please try again after updating the CWT via Hugging face.")
        logs.extend(f" - {mawb}" for mawb in missing_cwt_mawbs)
        return None

    try:
        card = RATES.get(customer, {}).get(service_level, {}).get(rate_card)
        if not card:
            logs.append(f"Rate config missing for customer: {customer}, service level: {service_level}, rate card: {rate_card}")
            return None

        workbook = load_workbook(TEMPLATE_PATH)
        sheet = workbook.active

        first_data_row = 11
        for row_idx, record in enumerate(rows, start=first_data_row):
            values = _statement_row_values(record, card, logs)
            for col_idx, value in enumerate(values, start=1):
                sheet.cell(row=row_idx, column=col_idx, value=value)
            logs.append(f"Generate MAWB {record['MAWB']}")

        # The file is named after the current month and year.
        now = datetime.now()
        filename = f"{customer} Invoice {now.strftime('%B_%Y')}.xlsx"
        file_path = os.path.join(tempfile.gettempdir(), filename)
        workbook.save(file_path)

        logs.append(f"Excel statement generation completed:{filename}")
        return file_path
    except Exception:
        # Top-level boundary for the UI: surface the failure in the log box.
        logs.append("Excel statement generation error")
        logs.append(traceback.format_exc())
        return None
# ================== Gradio UI ==================
def get_service_levels(customer):
    """Return the service levels configured for *customer*.

    An unknown customer yields an empty list.
    """
    levels = RATES.get(customer, {})
    return [*levels]
def get_rate_cards(customer, service_level):
    """Return the selectable rate card names for a customer and service level.

    Only cards whose ``enabled`` flag is truthy are returned; a card without
    the flag counts as enabled.
    """
    cards = RATES.get(customer, {}).get(service_level, {})
    return [
        card_name
        for card_name in cards
        if cards[card_name].get("enabled", True)
    ]
def run_statement(customer, service_level, rate_card):
    """Query Notion and build the statement for the selected options.

    Returns:
        A ``(file_path, log_text)`` tuple.  ``file_path`` is ``None`` when
        the query returned nothing or the statement could not be generated.
    """
    logs = []
    file_path = None

    notion_data = query_notion_database(logs)
    if notion_data:
        file_path = generate_excel_statement(
            notion_data, logs, customer, service_level, rate_card
        )

    return file_path, "\n".join(logs)
with gr.Blocks() as demo:
    gr.Markdown("## Apex-Ecomm Billing Statement Generator")

    customer_pick = gr.Dropdown(
        label="Customer",
        choices=list(RATES.keys()),
        value="SHEIN",
    )
    service_level_pick = gr.Dropdown(
        label="Service Level",
        choices=get_service_levels("SHEIN"),
        value="T01",
    )

    # Rate cards offered for the default customer / service level selection.
    initial_rate_cards = get_rate_cards("SHEIN", "T01")
    rate_card_pick = gr.Dropdown(
        label="Rate Card",
        choices=initial_rate_cards,
        value=initial_rate_cards[0] if initial_rate_cards else None,
    )

    run_btn = gr.Button("Generate Statement")
    download = gr.File(label="Download Statement")
    log_box = gr.Textbox(label="Logs", lines=20)

    def update_service_level(customer):
        """Refresh the Service Level and Rate Card options after a Customer change."""
        available_levels = get_service_levels(customer)
        default_level = available_levels[0] if available_levels else None
        if default_level:
            card_names = get_rate_cards(customer, default_level)
        else:
            card_names = []
        default_card = card_names[0] if card_names else None
        return (
            gr.Dropdown(choices=available_levels, value=default_level),
            gr.Dropdown(choices=card_names, value=default_card),
        )

    def update_rate_card(customer, service_level):
        """Refresh the Rate Card options after a Service Level change."""
        card_names = get_rate_cards(customer, service_level)
        default_card = card_names[0] if card_names else None
        return gr.Dropdown(choices=card_names, value=default_card)

    customer_pick.change(
        update_service_level,
        inputs=[customer_pick],
        outputs=[service_level_pick, rate_card_pick],
    )
    service_level_pick.change(
        update_rate_card,
        inputs=[customer_pick, service_level_pick],
        outputs=[rate_card_pick],
    )
    run_btn.click(
        run_statement,
        inputs=[customer_pick, service_level_pick, rate_card_pick],
        outputs=[download, log_box],
    )
# Start the Gradio app behind a login that uses the credentials read from
# the environment at the top of the file.
demo.launch(
    auth=(APP_USERNAME, APP_PASSWORD),
    auth_message="Please login to continue"
)