Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import requests | |
| import math | |
| import os | |
| import tempfile | |
| import gradio as gr | |
| import traceback | |
| from openpyxl import load_workbook | |
| from datetime import datetime | |
| # ================== 配置 ================== | |
| APP_USERNAME = os.environ.get("APP_USERNAME") | |
| APP_PASSWORD = os.environ.get("APP_PASSWORD") | |
| if not APP_USERNAME or not APP_PASSWORD: | |
| raise ValueError("APP_USERNAME / APP_PASSWORD not set") | |
| DATABASE_ID = "2caee2cb9a5d8069a8b7c54113f266c1" | |
| NOTION_TOKEN = os.environ.get("NOTION_TOKEN") | |
| if not NOTION_TOKEN: | |
| raise ValueError("NOTION_TOKEN not setup yet") | |
| HEADERS = { | |
| "Authorization": f"Bearer {NOTION_TOKEN}", | |
| "Content-Type": "application/json", | |
| "Notion-Version": "2022-06-28" | |
| } | |
| TEMPLATE_PATH = "template.xlsx" | |
| # ================== Fee Configuration ================== | |
| RATES = { | |
| "SHEIN": { | |
| "T01": { | |
| "$125/entry_cap 10 entries/MAWB (2025/9)": { | |
| "enabled": False, # False为停用,True为启用 | |
| "hts": { | |
| "max_per_entry": 999, | |
| "entry_fee": 125, | |
| "cap_entry_count": 10, | |
| "cap_fee": 1250, | |
| }, | |
| "wh_rate": 0.18, | |
| "ces_transfer_rate": 0.23, | |
| }, | |
| "$100/entry_no cap (2026/2)": { | |
| "enabled": True,# False为停用,True为启用 | |
| "hts": { | |
| "max_per_entry": 999, | |
| "entry_fee": 100, | |
| "cap_entry_count": None, | |
| "cap_fee": None, | |
| }, | |
| "wh_rate": 0.18, | |
| "ces_transfer_rate": 0.23, | |
| }, | |
| } | |
| # 未来可添加其他 Service Level,如 "T02": {...} | |
| } | |
| # 未来可添加其他客户 | |
| } | |
| # ================== Notion 辅助函数 ================== | |
| def _get_rich_text(p, name): | |
| arr = p.get(name, {}).get("rich_text", []) | |
| return arr[0]["text"]["content"] if arr else "" | |
| def _get_title(p, name): | |
| arr = p.get(name, {}).get("title", []) | |
| return arr[0]["text"]["content"] if arr else "" | |
| def _get_number(p, name): | |
| return p.get(name, {}).get("number") | |
| def _get_checkbox(p, name): | |
| return p.get(name, {}).get("checkbox", False) | |
| # ================== Notion 数据获取 ================== | |
| def query_notion_database(logs): | |
| logs.append("Starting to query the Notion database...") | |
| results = [] | |
| try: | |
| has_more = True | |
| start_cursor = None | |
| while has_more: | |
| body = {"page_size": 100} | |
| if start_cursor: | |
| body["start_cursor"] = start_cursor | |
| r = requests.post( | |
| f"https://api.notion.com/v1/databases/{DATABASE_ID}/query", | |
| headers=HEADERS, | |
| json=body, | |
| timeout=30 | |
| ) | |
| r.raise_for_status() | |
| data = r.json() | |
| for page in data.get("results", []): | |
| p = page.get("properties", {}) | |
| results.append({ | |
| "MAWB": _get_title(p, "MAWB"), | |
| "Customer": _get_rich_text(p, "Customer"), | |
| "Service Level": _get_rich_text(p, "Service Level"), | |
| "POD": _get_rich_text(p, "POD"), | |
| "Parcels": _get_number(p, "Parcels") or 0, | |
| "CWT": _get_number(p, "CWT"), | |
| "HTS Count": _get_number(p, "HTS Count") or 0, | |
| "ISC Fee": _get_number(p, "ISC Fee") or 0, | |
| "BreakDown Fee": _get_number(p, "BreakDown Fee") or 0, | |
| "Bypass Fee": _get_number(p, "Bypass Fee") or 0, | |
| "CES Inspection Fee": _get_number(p, "CES Inspection Fee") or 0, | |
| "Delivery Fee": _get_number(p, "Delivery Fee") or 0, | |
| "ETA": _get_rich_text(p, "ETA"), | |
| "AP": _get_checkbox(p, "AP"), | |
| "AR": _get_checkbox(p, "AR") | |
| }) | |
| has_more = data.get("has_more", False) | |
| start_cursor = data.get("next_cursor") | |
| logs.append(f"The query of the Notion database has been completed,There are a total of {len(results)} records") | |
| return results | |
| except Exception: | |
| logs.append("Notion query error") | |
| logs.append(traceback.format_exc()) | |
| return [] | |
| # ================== 费用计算 ================== | |
| def calculate_hts_fee(hts_count, hts_rate): | |
| entry_count = math.ceil(hts_count / hts_rate["max_per_entry"]) | |
| # 有 cap 时,超出上限直接返回封顶费用 | |
| if hts_rate["cap_entry_count"] is not None and entry_count > hts_rate["cap_entry_count"]: | |
| return hts_rate["cap_fee"], entry_count | |
| if entry_count == 1: | |
| return hts_rate["entry_fee"], entry_count | |
| full_entries = entry_count - 1 | |
| last_entry_hts = hts_count - full_entries * hts_rate["max_per_entry"] | |
| fee = full_entries * hts_rate["entry_fee"] + min(last_entry_hts, hts_rate["entry_fee"]) | |
| return fee, entry_count | |
| def calculate_wh_fee(cwt, rate): | |
| return round(cwt * rate, 2) | |
| def calculate_total_fee(hts, wh, isc, bd, bypass, ces_i, ces_t, lm): | |
| return round(hts + wh + isc + bd + bypass + ces_i + ces_t + lm, 2) | |
| def is_missing_cwt(val): | |
| return val is None or val == 0 | |
| # ================== 生成 Excel ================== | |
| def generate_excel_statement(notion_data, logs, customer, service_level, rate_card): | |
| logs.append(f"Filter out the MAWBs of {customer} / AP = Done / AR = Pending / {service_level} / Rate Card: {rate_card}") | |
| rows = [ | |
| d for d in notion_data | |
| if d["Customer"] == customer | |
| and d["AP"] is True | |
| and d["AR"] is False | |
| and d["Service Level"] == service_level | |
| ] | |
| if not rows: | |
| logs.append("No MAWB that meets the requirements exists.") | |
| return None | |
| logs.append(f"eligible MAWB:{len(rows)}") | |
| # ===== CWT 强校验 ===== | |
| missing_cwt_mawbs = [r["MAWB"] for r in rows if is_missing_cwt(r["CWT"])] | |
| if missing_cwt_mawbs: | |
| logs.append("The following MAWBs are missing CWT. Please try again after updating the CWT via Hugging face.") | |
| for m in missing_cwt_mawbs: | |
| logs.append(f" - {m}") | |
| return None | |
| try: | |
| customer_rates = RATES.get(customer, {}).get(service_level, {}).get(rate_card) | |
| if not customer_rates: | |
| logs.append(f"Rate config missing for customer: {customer}, service level: {service_level}, rate card: {rate_card}") | |
| return None | |
| wb = load_workbook(TEMPLATE_PATH) | |
| ws = wb.active | |
| start_row = 11 | |
| current_row = start_row | |
| for r in rows: | |
| hts_fee, entry_cnt = calculate_hts_fee(r["HTS Count"], customer_rates["hts"]) | |
| wh_fee = calculate_wh_fee(r["CWT"], customer_rates["wh_rate"]) | |
| ces_inspection_fee = r["CES Inspection Fee"] | |
| if not ces_inspection_fee: | |
| ces_transfer_fee = 0 | |
| else: | |
| ces_transfer_fee = round(r["CWT"] * customer_rates["ces_transfer_rate"], 2) | |
| lm_delivery_fee = r["Delivery Fee"] or 0 | |
| total_fee = calculate_total_fee( | |
| hts_fee, | |
| wh_fee, | |
| r["ISC Fee"], | |
| r["BreakDown Fee"], | |
| r["Bypass Fee"], | |
| ces_inspection_fee, | |
| ces_transfer_fee, | |
| lm_delivery_fee | |
| ) | |
| try: | |
| pickup = (pd.to_datetime(r["ETA"]) + pd.Timedelta(days=1)).strftime("%Y/%m/%d") | |
| except (ValueError, TypeError) as e: | |
| logs.append(f"Warning: MAWB {r['MAWB']} ETA parse failed: {e}") | |
| pickup = "" | |
| data = [ | |
| r["Service Level"], # 1 Clearance Mode | |
| r["POD"], # 2 POD | |
| r["MAWB"], # 3 MAWB | |
| "GOFO", # 4 LM Carrier | |
| r["Parcels"], # 5 Parcels | |
| 0, # 6 Exam Hold | |
| r["CWT"], # 7 CWT (KG) | |
| entry_cnt, # 8 7501-Form Entry submissions | |
| r["HTS Count"], # 9 HTS Code Count | |
| pickup, # 10 Pickup Time | |
| hts_fee, # 11 HTS Processing Fee | |
| wh_fee, # 12 WH Handling Fee | |
| r["ISC Fee"], # 13 Import Service Fee | |
| r["BreakDown Fee"], # 14 Loose cargo BreakDown Fee | |
| r["Bypass Fee"], # 15 Bypass Fee | |
| 0, # 16 Exam Sorting Fee | |
| ces_inspection_fee, # 17 CES Inspection Fee | |
| ces_transfer_fee, # 18 CES Transfer Fee | |
| 0, # 19 Material Fee | |
| 0, # 20 Storage Fee | |
| lm_delivery_fee, # 21 LM Delivery Fee | |
| total_fee # 22 Total | |
| ] | |
| for col, val in enumerate(data, start=1): | |
| ws.cell(row=current_row, column=col, value=val) | |
| logs.append(f"Generate MAWB {r['MAWB']}") | |
| current_row += 1 | |
| now = datetime.now() | |
| filename = f"{customer} Invoice {now.strftime('%B')}_{now.strftime('%Y')}.xlsx" | |
| file_path = os.path.join(tempfile.gettempdir(), filename) | |
| wb.save(file_path) | |
| logs.append(f"Excel statement generation completed:{filename}") | |
| return file_path | |
| except Exception: | |
| logs.append("Excel statement generation error") | |
| logs.append(traceback.format_exc()) | |
| return None | |
| # ================== Gradio UI ================== | |
| def get_service_levels(customer): | |
| """根据客户获取可用的 Service Level 列表""" | |
| return list(RATES.get(customer, {}).keys()) | |
| def get_rate_cards(customer, service_level): | |
| """根据客户和 Service Level 获取可用的 Rate Card 列表(仅返回 enabled=True 的)""" | |
| rate_cards = RATES.get(customer, {}).get(service_level, {}) | |
| return [name for name, config in rate_cards.items() if config.get("enabled", True)] | |
| def run_statement(customer, service_level, rate_card): | |
| logs = [] | |
| notion_data = query_notion_database(logs) | |
| if not notion_data: | |
| return None, "\n".join(logs) | |
| file_path = generate_excel_statement(notion_data, logs, customer, service_level, rate_card) | |
| return file_path, "\n".join(logs) | |
| with gr.Blocks() as demo: | |
| gr.Markdown("## Apex-Ecomm Billing Statement Generator") | |
| customer_pick = gr.Dropdown( | |
| choices=list(RATES.keys()), | |
| value="SHEIN", | |
| label="Customer", | |
| ) | |
| service_level_pick = gr.Dropdown( | |
| choices=get_service_levels("SHEIN"), | |
| value="T01", | |
| label="Service Level", | |
| ) | |
| rate_card_pick = gr.Dropdown( | |
| choices=get_rate_cards("SHEIN", "T01"), | |
| value=get_rate_cards("SHEIN", "T01")[0] if get_rate_cards("SHEIN", "T01") else None, | |
| label="Rate Card", | |
| ) | |
| run_btn = gr.Button("Generate Statement") | |
| download = gr.File(label="Download Statement") | |
| log_box = gr.Textbox(label="Logs", lines=20) | |
| # 当 Customer 改变时,动态更新 Service Level 和 Rate Card 选项 | |
| def update_service_level(customer): | |
| levels = get_service_levels(customer) | |
| first_level = levels[0] if levels else None | |
| cards = get_rate_cards(customer, first_level) if first_level else [] | |
| return ( | |
| gr.Dropdown(choices=levels, value=first_level), | |
| gr.Dropdown(choices=cards, value=cards[0] if cards else None), | |
| ) | |
| # 当 Service Level 改变时,动态更新 Rate Card 选项 | |
| def update_rate_card(customer, service_level): | |
| cards = get_rate_cards(customer, service_level) | |
| return gr.Dropdown(choices=cards, value=cards[0] if cards else None) | |
| customer_pick.change(update_service_level, inputs=[customer_pick], outputs=[service_level_pick, rate_card_pick]) | |
| service_level_pick.change(update_rate_card, inputs=[customer_pick, service_level_pick], outputs=[rate_card_pick]) | |
| run_btn.click(run_statement, inputs=[customer_pick, service_level_pick, rate_card_pick], outputs=[download, log_box]) | |
| demo.launch( | |
| auth=(APP_USERNAME, APP_PASSWORD), | |
| auth_message="Please login to continue" | |
| ) | |