Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| process_report.py | |
| 保持你原有功能(读取 Excel -> 聚合 -> 导出 Excel/HTML -> 通过 Resend 发送)的前提下, | |
| 仅进行了最小必要增强: | |
| 1) 增强导出的 Excel 样式,解决:部分边框不全、Q 列以后列无边框、去掉最后一行(若为汇总/空行)的问题; | |
| 2) 在 Resend 发送失败并怀疑为 Gmail "550-5.7.1 unsolicited mail" 时,自动尝试用更“简洁/友好”的邮件内容重试一次; | |
| 3) 可选 SMTP 回退:当 Resend 失败且你提供 SMTP_* 环境变量时,会尝试通过 SMTP 重发(仅作回退); | |
| 其他数据处理逻辑(表头识别、分组、聚合、进度判定)保持原样,未做不必要修改. | |
| """ | |
| import os | |
| import sys | |
| import glob | |
| import json | |
| import base64 | |
| from io import BytesIO | |
| from typing import Optional, Tuple, List | |
| from datetime import datetime, date | |
| import pandas as pd | |
| import requests | |
| import smtplib | |
| from email.message import EmailMessage | |
| # ====== 目录 & 环境 ====== | |
| INPUT_DIR = os.environ.get("INPUT_DIR", "/data/uploads") | |
| OUTPUT_DIR = os.environ.get("OUTPUT_DIR", "/data/outputs") | |
| os.makedirs(INPUT_DIR, exist_ok=True) | |
| os.makedirs(OUTPUT_DIR, exist_ok=True) | |
| RESEND_API_KEY = os.environ.get("RESEND_API_KEY") | |
| FROM_EMAIL = os.environ.get("FROM_EMAIL") | |
| FROM_NAME = os.environ.get("FROM_NAME") # 可选,显示名 | |
| TO_EMAIL = os.environ.get("TO_EMAIL") | |
| TIMEZONE = os.environ.get("TZ", "Asia/Shanghai") | |
| # SMTP 回退(可选) | |
| SMTP_HOST = os.environ.get("SMTP_HOST") | |
| SMTP_PORT = int(os.environ.get("SMTP_PORT", 587) or 587) | |
| SMTP_USER = os.environ.get("SMTP_USER") | |
| SMTP_PASS = os.environ.get("SMTP_PASS") | |
| # ====== 业务相关字段 ====== | |
| ALIASES = { | |
| "请购日期": ["请购日期", "请购日", "申请日期"], | |
| "请购单号": ["请购单号", "请购单编号", "申请单号"], | |
| "物料编码": ["物料编码", "物料号", "物料代码"], | |
| "物料名称": ["物料名称", "品名", "名称"], | |
| "纱支密度": ["纱支密度", "纱支/密度", "纱支 密度"], | |
| "门幅(CM)": ["门幅(CM)", "门幅(CM)", "门幅cm", "门幅"], | |
| "颜色": ["颜色", "色号/颜色", "色号"], | |
| "主单位": ["主单位", "单位"], | |
| "主数量": ["主数量", "数量", "请购数量"], | |
| "需求日期": ["需求日期", "需求日", "交期", "要求到货日期"], | |
| "供应商": ["供应商", "供货商", "供应商名称"], | |
| "到货日期": ["到货日期", "实到日期", "收货日期"], | |
| "到货主数量": ["到货主数量", "到货数量", "实到数量"], | |
| "入库日期": ["入库日期", "入库日"], | |
| "入库主数量": ["入库主数量", "入库数量"], | |
| "计划到货日期": ["计划到货日期", "预计到货日期", "承诺到货日期", "计划到货日"], | |
| } | |
| EMAIL_COLS_DEFAULT = [ | |
| "请购日期","请购单号","物料编码","物料名称","纱支密度","门幅(CM)","颜色","主单位", | |
| "主数量","需求日期","供应商","到货日期","到货主数量","入库日期","入库主数量","目前进度" | |
| ] | |
| # ====== 工具函数(读取 / 归一化 / 聚合) ====== | |
| def _today() -> date: | |
| return datetime.now().date() | |
| def _normalize_columns(df: pd.DataFrame) -> pd.DataFrame: | |
| mapped = {} | |
| for std_name, variants in ALIASES.items(): | |
| for v in df.columns: | |
| v_clean = str(v).strip() | |
| if v_clean in variants: | |
| mapped[v] = std_name | |
| break | |
| df = df.rename(columns=mapped) | |
| return df | |
| def _find_header_row(path: str, must_have: List[str] = None, try_rows: int = 10) -> int: | |
| must_have = must_have or ["物料编码", "主数量"] | |
| for r in range(try_rows): | |
| try: | |
| df_try = pd.read_excel(path, header=r, nrows=1) | |
| except Exception: | |
| continue | |
| cols = [str(c).strip() for c in df_try.columns] | |
| if all(any(m in c for c in cols) or m in cols for m in must_have): | |
| return r | |
| return 0 | |
| def read_system_export(path: str) -> pd.DataFrame: | |
| header_row = _find_header_row(path) | |
| try: | |
| df = pd.read_excel(path, header=header_row) | |
| except Exception: | |
| df = pd.read_excel(path, header=0) | |
| df = df.dropna(axis=1, how="all") | |
| df = _normalize_columns(df) | |
| if "物料名称" in df.columns: | |
| df["物料名称"] = df["物料名称"].astype(str).fillna('') | |
| contains_e = df["物料名称"].str.contains("鹅", na=False) | |
| contains_ya = df["物料名称"].str.contains("鸭", na=False) | |
| contains_huazhu = df["物料名称"].str.contains("华住", na=False) | |
| contains_huazhu_special = df["物料名称"].str.contains("华住专用", na=False) | |
| contains_yuanliao_special = df["物料名称"].str.contains("分原料绒", na=False) | |
| to_remove = (contains_e | contains_ya | contains_huazhu | contains_yuanliao_special) & ~contains_huazhu_special | |
| df = df[~to_remove] | |
| for c in ["请购日期","需求日期","到货日期","入库日期","计划到货日期"]: | |
| if c in df.columns: | |
| df[c] = pd.to_datetime(df[c], errors="coerce") | |
| for c in ["主数量","到货主数量","入库主数量"]: | |
| if c in df.columns: | |
| df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0) | |
| df = df.dropna(how="all") | |
| # ====== 新增代码:开始 ====== | |
| # 增加清洗:如果一行连“请购单号”或“物料编码”这种关键ID都没有,则删除 | |
| key_cols_to_check = [c for c in ["请购单号", "物料编码"] if c in df.columns] | |
| if key_cols_to_check: | |
| df = df.dropna(subset=key_cols_to_check, how='all') | |
| # ====== 新增代码:结束 ====== | |
| return df | |
| def _first_nonnull(series: pd.Series): | |
| return series.dropna().iloc[0] if not series.dropna().empty else None | |
| def aggregate_for_email(df: pd.DataFrame) -> pd.DataFrame: | |
| group_keys = [k for k in [ | |
| "请购单号","物料编码","物料名称","纱支密度","门幅(CM)","颜色","主单位","供应商" | |
| ] if k in df.columns] | |
| if not group_keys: | |
| raise RuntimeError("找不到用于分组的关键字段(如 请购单号/物料编码 等),请检查导入的表头。") | |
| agg_map = { | |
| "主数量": _first_nonnull, | |
| "请购日期": _first_nonnull, | |
| "需求日期": _first_nonnull, | |
| "到货日期": "max", | |
| "入库日期": "max", | |
| "计划到货日期": "max", | |
| "到货主数量": "sum", | |
| "入库主数量": "sum" | |
| } | |
| final_agg_map = {k: v for k, v in agg_map.items() if k in df.columns} | |
| grouped = df.groupby(group_keys, dropna=False).agg(final_agg_map).reset_index() | |
| grouped["目前进度"] = grouped.apply(_calc_progress_row, axis=1) | |
| # ====== 修改点在这里(新的 get_sort_key 排序逻辑) ====== | |
| def get_sort_key(status_text): | |
| status_text = str(status_text) | |
| # 1. 最紧急:逾期 + 未到货 | |
| if "逾期" in status_text and "未到货" in status_text: | |
| return 1 | |
| # 2. 次紧急:逾期 + 部分到货 | |
| if "逾期" in status_text and "部分到货" in status_text: | |
| return 2 | |
| # 3. 一般紧急:即将到货 | |
| if "未来7天" in status_text: | |
| return 3 | |
| # 4. 处理中:部分到货(未逾期) | |
| if "部分到货" in status_text: # 此时已排除逾期 | |
| return 4 | |
| # 5. 处理中:未到货(未到期) | |
| if "未到货(未到期)" in status_text: | |
| return 5 | |
| # 6. 已完成 | |
| if "完全到货" in status_text: | |
| return 6 | |
| # 7. 其他(如 "处理中" 或其他未匹配状态) | |
| return 7 | |
| # ====== 修改结束 ====== | |
| grouped['sort_key'] = grouped['目前进度'].apply(get_sort_key) | |
| grouped = grouped.sort_values(by='sort_key').drop(columns=['sort_key']) | |
| if "计划到货日期" in grouped.columns: | |
| grouped = grouped.drop(columns=["计划到货日期"]) | |
| final_cols = [col for col in EMAIL_COLS_DEFAULT if col in grouped.columns] | |
| final = grouped[final_cols] | |
| date_cols_to_format = ["请购日期", "需求日期", "到货日期", "入库日期"] | |
| for col in date_cols_to_format: | |
| if col in final.columns: | |
| final[col] = pd.to_datetime(final[col], errors='coerce').dt.strftime('%Y-%m-%d') | |
| return final | |
| def get_sort_key(status_text): | |
| if "逾期" in status_text and "未到货" in status_text: | |
| return 1 | |
| if "逾期" in status_text and "部分到货" in status_text: | |
| return 2 | |
| if "未来7天" in status_text: | |
| return 3 | |
| if "完全到货" in status_text: | |
| return 4 | |
| return 5 | |
| grouped['sort_key'] = grouped['目前进度'].apply(get_sort_key) | |
| grouped = grouped.sort_values(by='sort_key').drop(columns=['sort_key']) | |
| if "计划到货日期" in grouped.columns: | |
| grouped = grouped.drop(columns=["计划到货日期"]) | |
| final_cols = [col for col in EMAIL_COLS_DEFAULT if col in grouped.columns] | |
| final = grouped[final_cols] | |
| date_cols_to_format = ["请购日期", "需求日期", "到货日期", "入库日期"] | |
| for col in date_cols_to_format: | |
| if col in final.columns: | |
| final[col] = pd.to_datetime(final[col], errors='coerce').dt.strftime('%Y-%m-%d') | |
| return final | |
| def _calc_progress_row(row: pd.Series) -> str: | |
| today = pd.Timestamp(_today()) | |
| main_qty = float(row.get("主数量", 0) or 0) | |
| arr_qty = float(row.get("到货主数量", 0) or 0) | |
| demand_date = row.get("需求日期", pd.NaT) | |
| arrival_date = row.get("到货日期", pd.NaT) | |
| plan_arrival = row.get("计划到货日期", pd.NaT) | |
| if main_qty > 0 and arr_qty >= main_qty * 0.97: | |
| return "完全到货" | |
| parts: List[str] = [] | |
| shortage = max(0.0, main_qty - arr_qty) | |
| if arr_qty > 0: | |
| parts.append(f"部分到货 缺货{shortage:g}米") | |
| elif pd.isna(arrival_date) or arr_qty == 0: | |
| parts.append("未到货") | |
| if (pd.isna(arrival_date) or arr_qty < main_qty) and pd.notna(demand_date): | |
| overdue_days = (today - demand_date).days | |
| if overdue_days > 0: | |
| parts.append(f"已逾期{overdue_days}天") | |
| if pd.notna(plan_arrival): | |
| try: | |
| days_ahead = (plan_arrival.date() - today.date()).days | |
| if 0 <= days_ahead <= 7: | |
| parts.append(f"未来7天到货(计划{str(plan_arrival.date())})") | |
| except Exception: | |
| pass | |
| if not parts: | |
| if pd.notna(demand_date) and today <= pd.Timestamp(demand_date.date()): | |
| return "未到货(未到期)" | |
| return "处理中" | |
| return ";".join(parts) | |
| def _find_latest_input(input_dir: str) -> Optional[str]: | |
| files = [] | |
| for pat in ("*.xlsx", "*.xls"): | |
| files.extend(glob.glob(os.path.join(input_dir, pat))) | |
| if not files: | |
| return None | |
| files.sort(key=os.path.getmtime, reverse=True) | |
| return files[0] | |
| # ====== 导出 Excel:增强样式,解决边框/最后一行问题(仅替换导出逻辑) ====== | |
| def _df_to_styled_excel_bytes(df: pd.DataFrame) -> bytes: | |
| """ | |
| 增强导出: | |
| - 如果最后一行是全空或包含合计/总计/nan文字,删除它(用户不需要最后一行) | |
| - 另外:如果最后一行“看起来像汇总行”(大部分为数值),也会删除 | |
| - 设置统一边框,仅覆盖到实际数据的最后一列(不延伸到 Q 列之后) | |
| - 标红逾期行 | |
| - 自动列宽,但不破坏原数据 | |
| """ | |
| # ---------- 更稳健的最后一行判断与删除 ---------- | |
| if len(df) > 0: | |
| last = df.tail(1) | |
| # 1) 如果整行全为空 -> 删除 | |
| if last.isna().all(axis=1).iloc[0]: | |
| df = df.iloc[:-1, :] | |
| else: | |
| # 将最后一行所有单元格值转换为字符串并清洗(去 BOM / 零宽等) | |
| def _clean_val(x): | |
| try: | |
| s = str(x) | |
| except Exception: | |
| s = "" | |
| s = s.replace('\ufeff', '').replace('\u200b', '').strip().lower() | |
| return s | |
| last_strs = last.applymap(_clean_val).iloc[0].tolist() | |
| joined = " ".join([s for s in last_strs if s]) | |
| # 2) 如果包含明确关键字 -> 删除 | |
| keywords = ["合计", "总计", "小计", "汇总", "nan", "total", "sum"] | |
| if any(k in joined for k in keywords): | |
| df = df.iloc[:-1, :] | |
| else: | |
| # 3) 判断“数值比例”:如果最后一行中多数单元格看起来是数值(>=60%),则认定为汇总行并删除 | |
| num_cols = len(last_strs) | |
| numeric_count = 0 | |
| for v in last_strs: | |
| if v == "": | |
| continue | |
| # 尝试解析为 float(去掉千分分隔符逗号) | |
| v2 = v.replace(',', '') | |
| try: | |
| float(v2) | |
| numeric_count += 1 | |
| continue | |
| except Exception: | |
| pass | |
| # 允许带单位的数字(例如 "301575.6米")——提取前面数字 | |
| import re | |
| m = re.match(r'^[\+\-]?\d+(\.\d+)?', v2) | |
| if m: | |
| numeric_count += 1 | |
| # 规则阈值:数值比例 >= 0.6 或者数值个数 >= max(2, 40%*cols) 时视作汇总 | |
| if num_cols > 0: | |
| ratio = numeric_count / num_cols | |
| else: | |
| ratio = 0.0 | |
| if (numeric_count >= max(2, int(num_cols * 0.4))) or (ratio >= 0.6): | |
| df = df.iloc[:-1, :] | |
| # ---------- 原有的导出处理(后续代码保持不变) ---------- | |
| # 下面从你原来的代码继续(不要删改),例如: | |
| # bio = BytesIO() | |
| # writer = pd.ExcelWriter(bio, engine='xlsxwriter') | |
| # ... | |
| bio = BytesIO() | |
| writer = pd.ExcelWriter(bio, engine='xlsxwriter') | |
| sheet_name = '采购执行表' | |
| df.fillna('').to_excel(writer, sheet_name=sheet_name, index=False) | |
| workbook = writer.book | |
| worksheet = writer.sheets[sheet_name] | |
| # 样式 | |
| header_format = workbook.add_format({ | |
| 'bold': True, 'font_name': 'Arial', 'font_size': 10, | |
| 'border': 1, 'align': 'center', 'valign': 'vcenter', 'bg_color': '#f6f6f6' | |
| }) | |
| border_fmt = workbook.add_format({ | |
| 'font_name': 'Arial', 'font_size': 10, 'border': 1 | |
| }) | |
| overdue_fmt = workbook.add_format({ | |
| 'font_name': 'Arial', 'font_size': 10, 'border': 1, 'font_color': 'red' | |
| }) | |
| # 写表头 | |
| for col_num, value in enumerate(df.columns.values): | |
| worksheet.write(0, col_num, value, header_format) | |
| # 仅在实际数据范围内画边框(最后一列 = df.shape[1]-1) | |
| nrows = len(df) | |
| # 如果 df 为空,确保 ncols 不为负 | |
| ncols = max(df.shape[1] - 1, 0) | |
| # conditional_format with 'no_errors' ensures each cell gets the format; header is row 0, data rows 1..nrows | |
| # last row index for conditional_format should be nrows (since header 0, data rows occupy 1..nrows) | |
| worksheet.conditional_format(1, 0, nrows, ncols, {'type': 'no_errors', 'format': border_fmt}) | |
| # 标红逾期行(与原逻辑保持一致) | |
| try: | |
| progress_col_idx = df.columns.get_loc('目前进度') | |
| for row_num in range(len(df)): | |
| status_text = str(df.iloc[row_num, progress_col_idx]) | |
| if "逾期" in status_text: | |
| worksheet.set_row(row_num + 1, None, overdue_fmt) | |
| except Exception: | |
| # 如果没有 '目前进度' 列会抛出,但不影响导出 | |
| pass | |
| # 自动列宽(限制最大宽度) | |
| for i, col in enumerate(df.columns): | |
| try: | |
| column_len = df[col].astype(str).str.len().max() | |
| column_len = max(column_len, len(col) * 2) | |
| worksheet.set_column(i, i, min(column_len, 40)) | |
| except Exception: | |
| worksheet.set_column(i, i, 20) | |
| writer.close() | |
| bio.seek(0) | |
| return bio.read() | |
| # ====== 构造 HTML 邮件体(增强友好性:加入简短说明和签名) ====== | |
| def _build_html_body(df: pd.DataFrame, title: str, simple: bool = False) -> str: | |
| """ | |
| 如果 simple=True,会生成更简洁、带文本说明+签名的邮件体(用于 Gmail 友好的重试) | |
| 默认生成详细表格版。 | |
| """ | |
| if simple: | |
| # 简洁版:短说明 + 附件提示 + 联系信息(更容易被过滤器接受) | |
| intro = f"<p>您好,</p><p>附件为 {title},请查收。如有疑问,请回复本邮件联系。</p>" | |
| signature = "<p>--<br>采购部<br>联系人: 采购组<br>电话/邮箱: example@company.com</p>" | |
| html = f"<html><head><meta charset='utf-8'></head><body>{intro}{signature}</body></html>" | |
| return html | |
| # 默认完整版:表格 + 说明 + 签名 | |
| table_html = df.fillna('').to_html(index=False, escape=False, na_rep="") | |
| intro = f"<p>您好,</p><p>以下为 {title},请查阅(表中为系统聚合后的采购执行明细):</p>" | |
| signature = "<p>--<br>采购部<br>联系人: 采购组<br>电话/邮箱: example@company.com</p>" | |
| html = f"""<html><head><meta charset="utf-8" /><style> | |
| table {{ border-collapse: collapse; font-size: 13px; }} | |
| table, th, td {{ border: 1px solid #ccc; padding: 6px; }} | |
| th {{ background:#f6f6f6; }} | |
| tr:nth-child(even) {{background-color: #f2f2f2;}} | |
| </style></head><body>{intro}{table_html}{signature}</body></html>""" | |
| return html | |
| # ====== 发送:Resend + Gmail 检测 + 可选 SMTP 回退(最小改动) ====== | |
| def _send_email_via_resend(subject: str, html_body: str, attachment_bytes: Optional[bytes], attachment_name: str) -> Tuple[bool, str]: | |
| """ | |
| 使用 Resend API 发送邮件(带附件 base64)。 | |
| - 如果返回文本包含 Gmail 的 550-5.7.1 unsolicited 信息,将返回该信息,调用者可根据该信息选择重试(此文件中实现了自动简洁重试)。 | |
| """ | |
| if not (RESEND_API_KEY and FROM_EMAIL and TO_EMAIL): | |
| return False, "缺少 Resend 配置(RESEND_API_KEY / FROM_EMAIL / TO_EMAIL)" | |
| if FROM_NAME: | |
| from_field = f"{FROM_NAME} <{FROM_EMAIL}>" | |
| else: | |
| from_field = FROM_EMAIL | |
| url = "https://api.resend.com/emails" | |
| headers = { | |
| "Authorization": f"Bearer {RESEND_API_KEY}", | |
| "Content-Type": "application/json" | |
| } | |
| payload = { | |
| "from": from_field, | |
| "to": [TO_EMAIL], | |
| "subject": subject, | |
| "html": html_body, | |
| # 增加 Reply-To 与 Message-ID,有助于通过垃圾邮件过滤 | |
| "headers": { | |
| "Reply-To": FROM_EMAIL, | |
| "Message-ID": f"<{int(datetime.now().timestamp())}@{FROM_EMAIL.split('@')[-1]}>" | |
| } | |
| } | |
| if attachment_bytes is not None: | |
| payload["attachments"] = [ | |
| {"filename": attachment_name, "content": base64.b64encode(attachment_bytes).decode("utf-8")} | |
| ] | |
| try: | |
| resp = requests.post(url, headers=headers, data=json.dumps(payload, ensure_ascii=False).encode('utf-8'), timeout=30) | |
| try: | |
| resp_text = resp.text | |
| except Exception: | |
| resp_text = repr(resp.content) | |
| if 200 <= resp.status_code < 300: | |
| return True, f"Resend OK: {resp_text}" | |
| else: | |
| # 检测是否为 Gmail 550-5.7.1 / unsolicited | |
| lowered = resp_text.lower() | |
| if "550-5.7.1" in lowered or "unsolicited" in lowered or "unsolicited mail" in lowered or "message rejected" in lowered: | |
| # 明确标识为 Gmail 垃圾判定 | |
| extra = ";检测到可能的 Gmail unsolicited/550-5.7.1 拒收提示。" | |
| return False, f"Resend 返回失败状态 {resp.status_code}: {resp_text}{extra}" | |
| return False, f"Resend 返回失败状态 {resp.status_code}: {resp_text}" | |
| except requests.exceptions.RequestException as e: | |
| return False, f"邮件发送请求失败: {e}" | |
| def _send_email_via_smtp(subject: str, html_body: str, attachment_bytes: Optional[bytes], attachment_name: str) -> Tuple[bool, str]: | |
| """ | |
| 可选 SMTP 回退(仅在 Resend 失败时尝试)。 | |
| 需要在环境变量中配置 SMTP_HOST/SMTP_PORT/SMTP_USER/SMTP_PASS。 | |
| """ | |
| if not (SMTP_HOST and SMTP_USER and SMTP_PASS and FROM_EMAIL and TO_EMAIL): | |
| return False, "缺少 SMTP 配置(SMTP_HOST / SMTP_USER / SMTP_PASS / FROM_EMAIL / TO_EMAIL)" | |
| try: | |
| msg = EmailMessage() | |
| sender = f"{FROM_NAME} <{FROM_EMAIL}>" if FROM_NAME else FROM_EMAIL | |
| msg["From"] = sender | |
| msg["To"] = TO_EMAIL | |
| msg["Subject"] = subject | |
| msg["Reply-To"] = FROM_EMAIL | |
| msg.set_content("这是 HTML 邮件,请使用支持 HTML 的客户端查看。") | |
| msg.add_alternative(html_body, subtype='html') | |
| if attachment_bytes is not None: | |
| msg.add_attachment(attachment_bytes, maintype='application', subtype='vnd.openxmlformats-officedocument.spreadsheetml.sheet', filename=attachment_name) | |
| smtp = smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) | |
| smtp.starttls() | |
| smtp.login(SMTP_USER, SMTP_PASS) | |
| smtp.send_message(msg) | |
| smtp.quit() | |
| return True, "SMTP 发送成功(回退)" | |
| except Exception as e: | |
| return False, f"SMTP 发送失败(回退): {e}" | |
| # ====== 主流程:保持原逻辑,仅在发送处增加 Gmail 简洁重试与 SMTP 回退 ====== | |
| def run_once(file_path: Optional[str] = None) -> dict: | |
| try: | |
| if file_path is None: | |
| file_path = _find_latest_input(INPUT_DIR) | |
| if not file_path: | |
| return {"ok": False, "msg": f"未在 {INPUT_DIR} 找到Excel输入文件"} | |
| raw = read_system_export(file_path) | |
| final = aggregate_for_email(raw) | |
| out_name = f"采购执行表_{datetime.now().strftime('%Y%m%d')}.xlsx" | |
| out_path = os.path.join(OUTPUT_DIR, out_name) | |
| attach = _df_to_styled_excel_bytes(final) | |
| final.to_excel(out_path, index=False) | |
| subject = f"采购执行表自动推送 {datetime.now().date()}" | |
| html = _build_html_body(final, title=f"采购执行表({datetime.now().date()})", simple=False) | |
| # 首选:Resend | |
| ok, info = _send_email_via_resend(subject, html, attachment_bytes=attach, attachment_name=out_name) | |
| # 若 Resend 返回怀疑为 Gmail unsolicited(含 550-5.7.1),尝试一次“简洁模板”重试(只重试一次) | |
| if not ok and ("550-5.7.1" in info or "unsolicited" in info.lower() or "unsolicited mail" in info.lower()): | |
| # 生成更简洁的邮件内容(少表格、带签名提示),再次尝试 Resend | |
| simple_html = _build_html_body(final, title=f"采购执行表({datetime.now().date()})", simple=True) | |
| ok2, info2 = _send_email_via_resend(subject, simple_html, attachment_bytes=attach, attachment_name=out_name) | |
| if ok2: | |
| return {"ok": True, "msg": "Resend 初次被 Gmail 判断为 unsolicited,已用简洁模板重试并发送成功", "input": file_path, "output": out_path, "rows": len(final), "resend_info": info, "resend_retry_info": info2} | |
| # 如果简洁重试也失败了,继续下面的 SMTP 回退逻辑或直接返回失败信息 | |
| info = f"{info};尝试简洁模板重试结果: {info2}" | |
| # 如果仍未发送成功,并且配置了 SMTP 回退,则尝试 SMTP | |
| if not ok and SMTP_HOST and SMTP_USER and SMTP_PASS: | |
| ok_smtp, info_smtp = _send_email_via_smtp(subject, html, attachment_bytes=attach, attachment_name=out_name) | |
| if ok_smtp: | |
| return {"ok": True, "msg": "Resend 失败,但通过 SMTP 回退发送成功", "input": file_path, "output": out_path, "rows": len(final), "resend_info": info, "smtp_info": info_smtp} | |
| else: | |
| return {"ok": False, "msg": f"Resend 失败: {info};SMTP 回退也失败: {info_smtp}", "input": file_path, "output": out_path, "rows": len(final)} | |
| return {"ok": ok, "msg": "邮件发送成功" if ok else f"邮件发送失败:{info}", "input": file_path, "output": out_path, "rows": len(final)} | |
| except Exception as e: | |
| import traceback | |
| return {"ok": False, "msg": f"处理文件时发生严重错误: {e}", "traceback": traceback.format_exc()} | |
| def main(): | |
| arg_file = sys.argv[1] if len(sys.argv) > 1 else None | |
| result = run_once(arg_file) | |
| print(json.dumps(result, ensure_ascii=False)) | |
| if not result.get("ok"): | |
| sys.exit(1) | |
| if __name__ == "__main__": | |
| main() | |