Wilson826 commited on
Commit
3c13182
·
verified ·
1 Parent(s): 42f0703

Upload app.py

Browse files
Files changed (1) hide show
  1. app.py +93 -63
app.py CHANGED
@@ -5,6 +5,8 @@ import time
5
  import os
6
  import tempfile
7
  import re
 
 
8
  from collections import OrderedDict
9
  from datetime import datetime
10
  from zoneinfo import ZoneInfo
@@ -36,6 +38,7 @@ NOTION_API_BASE = "https://api.notion.com/v1"
36
  NOTION_PAGE_SIZE = 100
37
  REQUEST_TIMEOUT = 30
38
  API_RATE_LIMIT_DELAY = 0.3
 
39
 
40
  HEADERS = {
41
  "Authorization": f"Bearer {NOTION_TOKEN}",
@@ -217,13 +220,14 @@ def extract_page_properties(page):
217
  return result
218
 
219
 
220
- def query_all_pages(extract_func):
221
  """
222
- 通用 Notion 分页查询
223
 
224
  参数:
225
  extract_func: 从每个 page 提取数据的函数,返回 (key, value) 元组
226
  如果 key 为 None,则跳过该记录
 
227
  返回:
228
  dict: 提取的数据字典
229
  """
@@ -235,6 +239,8 @@ def query_all_pages(extract_func):
235
  body = {"page_size": NOTION_PAGE_SIZE}
236
  if start_cursor:
237
  body["start_cursor"] = start_cursor
 
 
238
 
239
  r = notion_post(f"databases/{DATABASE_ID}/query", body)
240
  data = r.json()
@@ -847,15 +853,12 @@ def update_cwt(file, logger=None):
847
  existing_pages = query_all_pages(extract_mawb_with_id_ar_ap)
848
  logger.log(f"Total {len(existing_pages)} MAWBs in Notion")
849
 
850
- # -------- 更新数据 --------
851
- updated_count = 0
852
  skipped = []
853
  skipped_cwt_ar_ap = []
854
- total = len(mawb_list)
855
-
856
- for idx, (mawb, weight, ata) in enumerate(zip(mawb_list, weight_list, ata_list), 1):
857
- logger.log(f"Progress: {idx}/{total} | Processing MAWB: {mawb}")
858
 
 
859
  if mawb not in existing_pages:
860
  logger.log(f"MAWB {mawb} not found in Notion, skipped")
861
  skipped.append(mawb)
@@ -867,6 +870,7 @@ def update_cwt(file, logger=None):
867
  ap_done = page_info["ap_checked"]
868
 
869
  properties = {}
 
870
 
871
  # CWT:如果 AR 或 AP 已完成则跳过
872
  if ar_done or ap_done:
@@ -875,38 +879,49 @@ def update_cwt(file, logger=None):
875
  reasons.append("AR")
876
  if ap_done:
877
  reasons.append("AP")
878
- logger.log(f"CWT skipped ({' & '.join(reasons)} already checked)")
879
  skipped_cwt_ar_ap.append(f"{mawb} - {' & '.join(reasons)} already checked")
880
  else:
881
  properties["CWT"] = {"number": weight}
 
882
 
883
  # ATA 仅在 Excel 中有值时才写入(不受 AR/AP 影响)
884
  ata_str = None
885
  if pd.notna(ata):
886
  ata_str = ata.strftime("%Y/%m/%d %H:%M")
887
- properties["ATA"] = {
888
- "rich_text": [{"text": {"content": ata_str}}]
889
- }
890
 
891
- # 如果没有任何属性需要更新,跳过 API 调用
892
  if not properties:
893
- logger.log(f"MAWB {mawb} nothing to update, skipped")
894
  continue
895
 
896
- r = notion_patch(page_id, properties)
897
 
898
- if r.status_code == 200:
899
- parts = []
900
- if "CWT" in properties:
901
- parts.append(f"CWT={weight:.2f}")
902
- if "ATA" in properties:
903
- parts.append(f"ATA={ata_str}")
904
- logger.log(f"MAWB {mawb} updated | {' | '.join(parts)}")
905
- updated_count += 1
906
- else:
907
- logger.log(f"MAWB {mawb} update failed: {r.text}")
908
 
909
- time.sleep(API_RATE_LIMIT_DELAY)
 
 
 
 
 
 
 
 
 
 
 
 
910
 
911
  logger.log("\n================ Process Completed ================")
912
  logger.log(f"Updated MAWB: {updated_count}")
@@ -1068,62 +1083,77 @@ def update_whout_date(file, logger=None):
1068
  df = df.dropna(subset=[mawb_col])
1069
  logger.log(f"Excel read completed, total {len(df)} records")
1070
 
1071
- # 查询 Notion
1072
- logger.log("Querying Notion database...")
1073
- existing_pages = query_all_pages(extract_mawb_with_id_whout)
1074
- logger.log(f"Total {len(existing_pages)} MAWBs in Notion")
 
1075
 
1076
- updated = 0
 
 
 
 
 
1077
  skipped_no_date = []
1078
  skipped_already = []
1079
  not_found = []
1080
- total = len(df)
1081
 
1082
- for current, (_, row) in enumerate(df.iterrows(), 1):
1083
  mawb = str(row[mawb_col]).strip()
1084
- logger.log(f"Progress: {current}/{total} | Processing MAWB: {mawb}")
1085
 
1086
  if not mawb or mawb.lower() == 'nan':
1087
  logger.log("Empty MAWB, skipped")
1088
  continue
1089
 
1090
- if mawb not in existing_pages:
1091
- logger.log("Not found in Notion")
1092
- not_found.append(mawb)
1093
- continue
 
 
 
1094
 
1095
- page_info = existing_pages[mawb]
 
 
 
 
 
 
 
1096
 
1097
- # 已有日期则跳过,不做覆盖
1098
- if page_info["has_whout"]:
1099
- logger.log("WH-OUT Date already exists in Notion, skipped")
1100
  skipped_already.append(mawb)
1101
- continue
 
 
1102
 
1103
- # 日期清洗
1104
- raw_date = row.get(whout_col)
1105
- if pd.isna(raw_date):
1106
- logger.log("WH-OUT Date is empty in Excel, skipped")
1107
- skipped_no_date.append(mawb)
1108
- continue
1109
 
1110
- try:
1111
- date_value = pd.to_datetime(raw_date).strftime("%Y-%m-%d")
1112
- except Exception:
1113
- logger.log(f"Date parse failed: '{raw_date}', skipped")
1114
- skipped_no_date.append(f"{mawb} - parse failed: {raw_date}")
1115
- continue
1116
 
1117
- page_id = page_info["id"]
1118
  r = notion_patch(page_id, {"WH-OUT Date": {"date": {"start": date_value}}})
 
1119
 
1120
- if r.status_code == 200:
1121
- logger.log(f"WH-OUT Date updated to {date_value}")
1122
- updated += 1
1123
- else:
1124
- logger.log(f"Update failed: {r.text}")
1125
-
1126
- time.sleep(API_RATE_LIMIT_DELAY)
 
 
 
 
 
 
1127
 
1128
  logger.log("\n================ Process Completed ================")
1129
  logger.log(f"Updated: {updated}")
 
5
  import os
6
  import tempfile
7
  import re
8
+ import threading
9
+ from concurrent.futures import ThreadPoolExecutor, as_completed
10
  from collections import OrderedDict
11
  from datetime import datetime
12
  from zoneinfo import ZoneInfo
 
38
  NOTION_PAGE_SIZE = 100
39
  REQUEST_TIMEOUT = 30
40
  API_RATE_LIMIT_DELAY = 0.3
41
+ MAX_CONCURRENT_WORKERS = 3
42
 
43
  HEADERS = {
44
  "Authorization": f"Bearer {NOTION_TOKEN}",
 
220
  return result
221
 
222
 
223
+ def query_all_pages(extract_func, filter_body=None):
224
  """
225
+ 通用 Notion 分页查询,支持可选服务端过滤
226
 
227
  参数:
228
  extract_func: 从每个 page 提取数据的函数,返回 (key, value) 元组
229
  如果 key 为 None,则跳过该记录
230
+ filter_body: 可选,Notion filter 字典,传入后由服务端过滤,减少传输量
231
  返回:
232
  dict: 提取的数据字典
233
  """
 
239
  body = {"page_size": NOTION_PAGE_SIZE}
240
  if start_cursor:
241
  body["start_cursor"] = start_cursor
242
+ if filter_body:
243
+ body["filter"] = filter_body
244
 
245
  r = notion_post(f"databases/{DATABASE_ID}/query", body)
246
  data = r.json()
 
853
  existing_pages = query_all_pages(extract_mawb_with_id_ar_ap)
854
  logger.log(f"Total {len(existing_pages)} MAWBs in Notion")
855
 
856
+ # -------- Phase 1:预处理,构建更新任务列表 --------
 
857
  skipped = []
858
  skipped_cwt_ar_ap = []
859
+ tasks = [] # (mawb, page_id, properties, log_suffix)
 
 
 
860
 
861
+ for mawb, weight, ata in zip(mawb_list, weight_list, ata_list):
862
  if mawb not in existing_pages:
863
  logger.log(f"MAWB {mawb} not found in Notion, skipped")
864
  skipped.append(mawb)
 
870
  ap_done = page_info["ap_checked"]
871
 
872
  properties = {}
873
+ log_parts = []
874
 
875
  # CWT:如果 AR 或 AP 已完成则跳过
876
  if ar_done or ap_done:
 
879
  reasons.append("AR")
880
  if ap_done:
881
  reasons.append("AP")
882
+ logger.log(f"MAWB {mawb}: CWT skipped ({' & '.join(reasons)} already checked)")
883
  skipped_cwt_ar_ap.append(f"{mawb} - {' & '.join(reasons)} already checked")
884
  else:
885
  properties["CWT"] = {"number": weight}
886
+ log_parts.append(f"CWT={weight:.2f}")
887
 
888
  # ATA 仅在 Excel 中有值时才写入(不受 AR/AP 影响)
889
  ata_str = None
890
  if pd.notna(ata):
891
  ata_str = ata.strftime("%Y/%m/%d %H:%M")
892
+ properties["ATA"] = {"rich_text": [{"text": {"content": ata_str}}]}
893
+ log_parts.append(f"ATA={ata_str}")
 
894
 
895
+ # 没有任何属性需要更新,跳过
896
  if not properties:
897
+ logger.log(f"MAWB {mawb}: nothing to update, skipped")
898
  continue
899
 
900
+ tasks.append((mawb, page_id, properties, " | ".join(log_parts)))
901
 
902
+ logger.log(f"\nPrepared {len(tasks)} update tasks, starting concurrent PATCH (workers={MAX_CONCURRENT_WORKERS})...")
903
+
904
+ # -------- Phase 2:并发 PATCH --------
905
+ updated_count = 0
906
+ lock = threading.Lock()
907
+
908
+ def _patch_task(mawb, page_id, properties, log_suffix):
909
+ r = notion_patch(page_id, properties)
910
+ return mawb, r.status_code, r.text, log_suffix
 
911
 
912
+ with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_WORKERS) as executor:
913
+ futures = {
914
+ executor.submit(_patch_task, mawb, page_id, props, log_sfx): mawb
915
+ for mawb, page_id, props, log_sfx in tasks
916
+ }
917
+ for future in as_completed(futures):
918
+ mawb, status, text, log_sfx = future.result()
919
+ with lock:
920
+ if status == 200:
921
+ logger.log(f"MAWB {mawb} updated | {log_sfx}")
922
+ updated_count += 1
923
+ else:
924
+ logger.log(f"MAWB {mawb} update failed: {text}")
925
 
926
  logger.log("\n================ Process Completed ================")
927
  logger.log(f"Updated MAWB: {updated_count}")
 
1083
  df = df.dropna(subset=[mawb_col])
1084
  logger.log(f"Excel read completed, total {len(df)} records")
1085
 
1086
+ # -------- 方案二:服务端过滤,只拉取 WH-OUT Date 为空的记录 --------
1087
+ logger.log("Querying Notion (filter: WH-OUT Date is empty)...")
1088
+ whout_empty_filter = {"property": "WH-OUT Date", "date": {"is_empty": True}}
1089
+ pages_to_update = query_all_pages(extract_mawb_with_id_whout, filter_body=whout_empty_filter)
1090
+ logger.log(f"Records without WH-OUT Date: {len(pages_to_update)}")
1091
 
1092
+ # 第二次轻量查询:获取全部 MAWB,用于区分「未找到」与「已有日期」
1093
+ logger.log("Querying all MAWBs for existence check...")
1094
+ all_mawbs = set(query_all_pages(extract_mawb_only).keys())
1095
+ logger.log(f"Total MAWBs in Notion: {len(all_mawbs)}")
1096
+
1097
+ # -------- Phase 1:预处理,构建更新任务列表 --------
1098
  skipped_no_date = []
1099
  skipped_already = []
1100
  not_found = []
1101
+ tasks = [] # (mawb, page_id, date_value)
1102
 
1103
+ for _, row in df.iterrows():
1104
  mawb = str(row[mawb_col]).strip()
 
1105
 
1106
  if not mawb or mawb.lower() == 'nan':
1107
  logger.log("Empty MAWB, skipped")
1108
  continue
1109
 
1110
+ if mawb in pages_to_update:
1111
+ # 日期清洗
1112
+ raw_date = row.get(whout_col)
1113
+ if pd.isna(raw_date):
1114
+ logger.log(f"MAWB {mawb}: WH-OUT Date is empty in Excel, skipped")
1115
+ skipped_no_date.append(mawb)
1116
+ continue
1117
 
1118
+ try:
1119
+ date_value = pd.to_datetime(raw_date).strftime("%Y-%m-%d")
1120
+ except Exception:
1121
+ logger.log(f"MAWB {mawb}: Date parse failed '{raw_date}', skipped")
1122
+ skipped_no_date.append(f"{mawb} - parse failed: {raw_date}")
1123
+ continue
1124
+
1125
+ tasks.append((mawb, pages_to_update[mawb]["id"], date_value))
1126
 
1127
+ elif mawb in all_mawbs:
1128
+ logger.log(f"MAWB {mawb}: WH-OUT Date already exists in Notion, skipped")
 
1129
  skipped_already.append(mawb)
1130
+ else:
1131
+ logger.log(f"MAWB {mawb}: Not found in Notion")
1132
+ not_found.append(mawb)
1133
 
1134
+ logger.log(f"\nPrepared {len(tasks)} update tasks, starting concurrent PATCH (workers={MAX_CONCURRENT_WORKERS})...")
 
 
 
 
 
1135
 
1136
+ # -------- Phase 2:并发 PATCH --------
1137
+ updated = 0
1138
+ lock = threading.Lock()
 
 
 
1139
 
1140
+ def _patch_task(mawb, page_id, date_value):
1141
  r = notion_patch(page_id, {"WH-OUT Date": {"date": {"start": date_value}}})
1142
+ return mawb, r.status_code, r.text, date_value
1143
 
1144
+ with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_WORKERS) as executor:
1145
+ futures = {
1146
+ executor.submit(_patch_task, mawb, page_id, date_val): mawb
1147
+ for mawb, page_id, date_val in tasks
1148
+ }
1149
+ for future in as_completed(futures):
1150
+ mawb, status, text, date_val = future.result()
1151
+ with lock:
1152
+ if status == 200:
1153
+ logger.log(f"MAWB {mawb}: WH-OUT Date updated to {date_val}")
1154
+ updated += 1
1155
+ else:
1156
+ logger.log(f"MAWB {mawb}: Update failed: {text}")
1157
 
1158
  logger.log("\n================ Process Completed ================")
1159
  logger.log(f"Updated: {updated}")