| """ |
| 自动处理学术产出报告: |
| 1) 复制 SUSTECH 开头的 docx 为所在文件夹同名副本; |
| 2) 删除“ESI 高被引论文收录概况”段落及表格; |
| 3) 更新报告编号为文件夹名的前两段(如 C2026-0003); |
| 4) 读取同文件夹下的“Incites 研究领域.csv”,转置后填入 ESI 学科表; |
| 5) 读取历年产出 CSV 生成图表并插入; |
| 6) 读取记录 XLSX 与 WOS CSV,填充论文检索统计表的 CNCI 列; |
| 7) 读取 savedrecs.xlsx,填充 SCIE 收录引用概况表。 |
| """ |
| from __future__ import annotations |
|
|
import csv
import logging
import math
import re
import shutil
from pathlib import Path
from typing import Any, Dict, List, Optional
|
|
| from docx import Document |
| from docx.shared import Pt, Inches |
| from docx.enum.text import WD_ALIGN_PARAGRAPH |
| from docx.enum.table import WD_ALIGN_VERTICAL |
| from docx.oxml.ns import qn |
| from docx.text.paragraph import Paragraph |
| from docx.table import _Cell, Table |
|
|
| import pandas as pd |
| import zipfile |
| import xml.etree.ElementTree as ET |
| import matplotlib |
| matplotlib.use("Agg") |
| import matplotlib.pyplot as plt |
|
|
|
|
| LOGGER_NAME = "academic_report_processor" |
| logger = logging.getLogger(LOGGER_NAME) |
|
|
| |
| _current_folder: Optional[Path] = None |
|
|
|
|
def format_path(path: Path) -> str:
    """Render *path* for log output.

    When the path lies inside the folder currently being processed, show it
    relative to that folder; otherwise fall back to the bare file name.
    """
    base = _current_folder
    if base and path.is_relative_to(base):
        return str(path.relative_to(base))
    return path.name
|
|
|
|
def read_rows_from_file(path: Path) -> List[List[str]]:
    """Load a CSV or XLSX file and return its rows as lists of strings."""
    if path.suffix.lower() != ".xlsx":
        with path.open(newline="", encoding="utf-8-sig") as handle:
            return [row for row in csv.reader(handle)]
    frame = pd.read_excel(path, header=None, dtype=str, engine="openpyxl")
    return frame.fillna("").values.tolist()
|
|
|
|
| def _load_wos_df(path: Path): |
| """加载 WOS CSV 或 XLSX 为 DataFrame。""" |
| if path.suffix.lower() == ".xlsx": |
| return pd.read_excel(path, engine="openpyxl") |
| return pd.read_csv(path, encoding='utf-8-sig') |
|
|
|
|
def find_csv_or_xlsx(csv_path: Path) -> Path:
    """Resolve a .csv path, falling back to the same-named .xlsx.

    Returns the first candidate that exists; when neither does, the original
    .csv path is returned unchanged so the caller can report it.
    """
    for candidate in (csv_path, csv_path.with_suffix(".xlsx")):
        if candidate.exists():
            return candidate
    return csv_path
|
|
|
|
def find_file_by_keywords(folder: Path, include: List[str], exclude: Optional[List[str]] = None) -> Optional[Path]:
    """Fuzzy-match a file inside *folder* by name keywords.

    include: keywords that must all appear in the file name (case-insensitive)
    exclude: optional keywords that must not appear (case-insensitive)
    Returns the first matching regular file, or None.
    """
    wanted = [kw.lower() for kw in include]
    banned = [kw.lower() for kw in (exclude or [])]
    if not folder.exists():
        return None
    for entry in folder.iterdir():
        if not entry.is_file():
            continue
        lowered = entry.name.lower()
        if any(b in lowered for b in banned):
            continue
        if all(w in lowered for w in wanted):
            return entry
    return None
|
|
|
|
def configure_logging(level: int = logging.INFO) -> None:
    """Initialize global logging with a uniform output format.

    Sets the module logger level; installs a root handler via basicConfig
    only when none exists yet, so repeated calls are harmless.
    """
    logger.setLevel(level)
    root = logging.getLogger()
    if root.handlers:
        return
    logging.basicConfig(
        level=level,
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
|
|
|
|
def format_cnci(value: str) -> str:
    """Format a CNCI value to two decimals; zero collapses to "0".

    Input that cannot be parsed as a number is returned unchanged.
    """
    try:
        number = float(value)
    except (TypeError, ValueError):
        return value
    return "0" if number == 0 else f"{number:.2f}"
|
|
|
|
def format_percent(value: str) -> str:
    """Append a "%" sign to a numeric string, dropping a redundant ".0".

    Already-percent strings, empty/None input and non-numeric text come back
    unchanged. Non-finite numbers ("nan", "inf") are also returned as-is:
    the previous `f == int(f)` comparison raised ValueError/OverflowError
    on them.
    """
    text = str(value or "").strip()
    if not text or text.endswith("%"):
        return text
    try:
        f = float(text)
    except ValueError:
        return text
    if not math.isfinite(f):
        # nan/inf: int(f) would raise, and "nan%" is meaningless anyway.
        return text
    # Integral values print without the decimal part (e.g. "7.0" -> "7%").
    formatted = str(int(f)) if f.is_integer() else text
    return f"{formatted}%"
|
|
|
|
| def _find_col(header: List[str], *keywords: str) -> Optional[int]: |
| """在表头中查找包含任一关键字的列索引(不区分大小写)。""" |
| for i, h in enumerate(header): |
| h_lower = str(h).lower() |
| if any(kw.lower() in h_lower for kw in keywords): |
| return i |
| return None |
|
|
|
|
def read_csv_block(csv_path: Path) -> List[List[str]]:
    """Read the ESI subject file and return a transposed 4-row block.

    Extracts the name / paper-count / CNCI / Top-10% columns, relabels the
    first data row as "Total", stops at the first row with a blank name, and
    transposes the result so each metric becomes one row (matching the ESI
    table layout). Returns [] when the file is empty or has no data rows.
    """
    logger.debug(" │ └─ 读取 ESI 学科文件:%s", format_path(csv_path))
    rows = read_rows_from_file(csv_path)
    if not rows:
        logger.warning(" │ └─ CSV 数据为空:%s", format_path(csv_path))
        return []

    header = [str(h) for h in rows[0]]
    col_name = _find_col(header, "名称", "name")
    col_count = _find_col(header, "论文数", "web of science")
    col_cnci = _find_col(header, "学科规范化")
    col_top10 = _find_col(header, "10%")

    # Fall back to the historical fixed layout when detection fails.
    if None in (col_name, col_count, col_cnci, col_top10):
        logger.warning(" │ └─ 表头缺少必要列 (名称=%s 论文数=%s CNCI=%s Top10%%=%s),回退到默认索引",
                       col_name, col_count, col_cnci, col_top10)
        col_name, col_count, col_cnci, col_top10 = 0, 1, 6, 7

    # The first data row is the overall summary; relabel it "Total".
    # Bounds-guarded: the row may be shorter than the resolved name column,
    # which previously raised IndexError.
    if len(rows) > 1 and col_name < len(rows[1]):
        rows[1][col_name] = "Total"

    # Collect data rows until the first row whose name cell is empty.
    data_rows = []
    for r in rows[1:]:
        if len(r) <= col_name or not str(r[col_name]).strip():
            break
        data_rows.append(r)

    def safe_get(r, i):
        # Out-of-range cells read as empty strings.
        return str(r[i]) if i < len(r) else ""

    trimmed = [
        [safe_get(r, col_name),
         safe_get(r, col_count),
         format_cnci(safe_get(r, col_cnci)),
         format_percent(safe_get(r, col_top10))]
        for r in data_rows
    ]
    if not trimmed:
        return []
    transposed = list(map(list, zip(*trimmed)))

    logger.debug(" │ └─ 读取完成,数据行数:%d,转置后列数:%d", len(data_rows), len(transposed[0]))
    return transposed
|
|
|
|
def read_timeseries(csv_path: Path):
    """Read the yearly-output file ("Incites 研究领域t.csv/xlsx") and return
    a list of (year, paper_count, CNCI) tuples sorted ascending by year.

    Two layouts are handled:
    - a "pinned baseline" export, detected when any first-column cell equals
      "锁定项目基准值"; only those baseline rows carry usable data;
    - a plain per-year export, where header/baseline leftovers and rows with
      an empty count cell are skipped.
    Rows whose year/count/CNCI cells fail numeric conversion are dropped.
    """
    logger.debug(" │ └─ 读取历年产出文件:%s", format_path(csv_path))
    rows = read_rows_from_file(csv_path)
    if not rows:
        logger.warning(" │ └─ 历年产出 CSV 为空:%s", format_path(csv_path))
        return []

    header = [str(h) for h in rows[0]]
    col_count = _find_col(header, "论文数", "web of science")
    col_year = _find_col(header, "出版年", "year", "publication year")
    col_cnci = _find_col(header, "学科规范化")

    # Fall back to the historical fixed column layout when detection fails.
    if None in (col_count, col_year, col_cnci):
        logger.warning(" │ └─ 表头缺少必要列 (论文数=%s 出版年=%s CNCI=%s),回退到默认索引",
                       col_count, col_year, col_cnci)
        col_count, col_year, col_cnci = 1, 2, 7

    # Detect the pinned-baseline layout by scanning every first-column value.
    all_col0 = {str(r[0]).strip() for r in rows[1:] if r}
    csv_mode = "锁定项目基准值" in all_col0
    _SKIP_NAMES = {"名称", "baseline for pinned items", ""}

    data = []
    for r in rows[1:]:
        col0 = str(r[0]).strip() if r else ""
        col1 = str(r[col_count]).strip() if col_count < len(r) else ""
        if csv_mode:
            # Baseline layout: only the baseline rows hold the data.
            if col0 != "锁定项目基准值":
                continue
        else:
            # Plain layout: skip header/baseline rows and empty counts.
            if col0.lower() in _SKIP_NAMES or not col1:
                continue
        try:
            year_val = int(float(r[col_year]))
            count_val = float(r[col_count])
            cnci_val = float(r[col_cnci])
        except (TypeError, ValueError, IndexError):
            # Non-numeric or missing cells: drop the row silently.
            continue
        data.append((year_val, count_val, round(cnci_val, 2)))
    data.sort(key=lambda x: x[0])
    logger.debug(" │ └─ 提取到 %d 条数据", len(data))
    return data
|
|
|
|
def _select_chinese_fonts() -> List[str]:
    """Return installed font names usable for Chinese text, best first.

    Walks a preference list (serif CJK faces first, then common sans faces)
    against matplotlib's font manager, accepting exact matches and fuzzy
    either-contains-the-other name matches. Falls back to ["DejaVu Sans"]
    (with a warning) when nothing matches.
    """
    import matplotlib.font_manager as fm

    # fm._rebuild() is a private API removed in newer matplotlib versions,
    # hence the broad best-effort guard.
    try:
        fm._rebuild()
    except Exception:
        pass

    available_fonts = [f.name for f in fm.fontManager.ttflist]
    preferred_fonts = [
        "SimSun",
        "STSong",
        "STKaiti",
        "STFangsong",
        "FangSong",
        "KaiTi",
        "Song",
        "Songti",
        "Noto Serif CJK SC",
        "Noto Serif CJK TC",
        "Noto Serif CJK JP",
        "Noto Serif CJK",
        "AR PL UMing CN",
        "AR PL UMing TW",
        "AR PL UMing HK",
        "AR PL UKai CN",
        "AR PL UKai TW",
        "AR PL UKai HK",
        "Noto Sans CJK SC",
        "Noto Sans CJK TC",
        "Noto Sans CJK",
        "WenQuanYi Micro Hei",
        "WenQuanYi Zen Hei",
        "WenQuanYi",
        "Source Han Sans CN",
        "Source Han Sans",
        "SimHei",
        "Microsoft YaHei",
        "Arial Unicode MS",
    ]

    chinese_fonts: List[str] = []
    for font_name in preferred_fonts:
        if font_name in available_fonts:
            chinese_fonts.append(font_name)
        else:
            # Fuzzy match: either name containing the other counts as a hit.
            for available_font in available_fonts:
                if font_name.lower() in available_font.lower() or available_font.lower() in font_name.lower():
                    if available_font not in chinese_fonts:
                        chinese_fonts.append(available_font)

    if not chinese_fonts:
        chinese_fonts = ["DejaVu Sans"]
        logger.warning(" │ └─ 未找到中文字体,中文可能显示为方块。请确保 packages.txt 中包含 fonts-noto-cjk")
    else:
        logger.debug(f" │ └─ 使用字体:{chinese_fonts[0]}")
    return chinese_fonts


def build_chart(data, out_path: Path):
    """Render a combined bar/line chart: bars = papers per year (left axis),
    line = CNCI (right axis), X axis = publication year.

    Returns True when the chart was written to *out_path*, False when *data*
    is empty (nothing to draw).
    """
    if not data:
        logger.warning(" │ └─ 历年数据为空,跳过图表生成:%s", format_path(out_path))
        return False
    logger.debug(" │ └─ 生成图表:%s", format_path(out_path))
    years = [d[0] for d in data]
    counts = [d[1] for d in data]
    cncis = [d[2] for d in data]

    chinese_fonts = _select_chinese_fonts()

    # Global font setup: CJK-capable sans for Chinese, Times for Latin/digits.
    plt.rcParams["font.sans-serif"] = chinese_fonts + ["DejaVu Sans", "Arial", "sans-serif"]
    plt.rcParams["font.serif"] = ["Times New Roman", "Times", "DejaVu Serif"]
    plt.rcParams["font.weight"] = "heavy"
    plt.rcParams["font.size"] = 12
    plt.rcParams["axes.unicode_minus"] = False

    # Widen the figure as the year range grows.
    fig_width = max(7.5, len(years) * 0.85)
    fig, ax1 = plt.subplots(figsize=(fig_width, 4.5))
    ax2 = ax1.twinx()

    bar_color = "#B54840"
    line_color = "#2C65B6"
    marker_color = "#A2BA65"

    bars = ax1.bar(years, counts, color=bar_color, label="论文篇数", width=0.65)
    line, = ax2.plot(years, cncis, color=line_color, marker="^", markersize=6, linewidth=3.0,
                     markerfacecolor=marker_color, markeredgecolor=marker_color,
                     markeredgewidth=1, label="CNCI")

    # Leave ~15% headroom above the tallest bar / highest CNCI point.
    y1_max = max(counts) if counts else 0
    y2_max = max(cncis) if cncis else 0
    ax1.set_ylim(0, y1_max * 1.15 + 1)
    ax2.set_ylim(0, y2_max * 1.15 + 1)

    ax1.set_xticks(years)
    # Rotate year labels when they would otherwise collide.
    rotation = 90 if len(years) > 12 else 0
    ax1.tick_params(axis="x", labelrotation=rotation, labelsize=12)
    ax1.tick_params(axis="y", labelsize=12)
    ax2.tick_params(axis="y", labelsize=12)

    # Tick labels are numeric, so force the serif face and heavy weight.
    for label in ax1.get_xticklabels() + ax1.get_yticklabels() + ax2.get_yticklabels():
        label.set_weight("heavy")
        label.set_fontsize(12)
        label.set_family("serif")

    # Annotate each CNCI data point with its two-decimal value.
    for x, y in zip(years, cncis):
        ax2.text(x, y, f"{y:.2f}",
                 ha="center", va="bottom", fontsize=11, color="black", weight="heavy",
                 family="serif")

    handles = [bars, line]
    labels = [h.get_label() for h in handles]
    legend = ax1.legend(handles, labels, loc="upper center", ncol=2, frameon=False, fontsize=14,
                        bbox_to_anchor=(0.5, -0.12), prop={"weight": "heavy", "size": 14})
    for text in legend.get_texts():
        text.set_weight("heavy")
        text.set_fontsize(14)
        # Chinese legend entries need the CJK sans face; Latin keeps serif.
        label_text = text.get_text()
        has_chinese = any('\u4e00' <= char <= '\u9fff' for char in label_text)
        text.set_family("sans-serif" if has_chinese else "serif")

    fig.tight_layout()
    fig.savefig(out_path, dpi=300, bbox_inches="tight")
    plt.close(fig)
    logger.debug(" │ └─ 图表生成完成")
    return True
|
|
|
|
def insert_chart(doc: Document, image_path: Path):
    """Insert the chart image as an inline, centered paragraph right after
    the "历年论文产出表现" heading; when the heading is absent the picture
    stays appended at the document end."""
    logger.debug(" │ └─ 插入图表")
    anchor = None
    for para in doc.paragraphs:
        if "历年论文产出表现" in para.text:
            anchor = para
            break

    # Append the picture paragraph at the end first, then relocate it.
    picture_para = doc.add_paragraph()
    picture_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
    picture_para.add_run().add_picture(str(image_path), width=Inches(5))

    if anchor is not None:
        # Detach the freshly appended paragraph and re-insert it directly
        # after the heading paragraph.
        picture_para._p.getparent().remove(picture_para._p)
        anchor._p.addnext(picture_para._p)
    logger.debug(" │ └─ 插入位置:%s", "正文标题后" if anchor is not None else "文末")
|
|
def set_cell(cell, text: str, bold: bool = False):
    """Overwrite a table cell with *text*: vertically and horizontally
    centered, Times New Roman 10.5pt, optionally bold."""
    cell.text = ""
    cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER
    paragraph = cell.paragraphs[0]
    paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
    new_run = paragraph.add_run(text)
    new_run.bold = bold
    new_run.font.name = "Times New Roman"
    new_run.font.size = Pt(10.5)
|
|
|
|
def find_table_after_paragraph(doc: Document, paragraph_text: str) -> Optional[Table]:
    """Locate the first table following a paragraph that contains
    *paragraph_text* (substring match).

    Scans the document body elements in order; once a matching paragraph has
    been seen, the next table element encountered is resolved back to its
    python-docx Table wrapper.

    Args:
        doc: the Document to search.
        paragraph_text: text the anchor paragraph must contain.

    Returns:
        The Table object, or None when no such paragraph/table pair exists.
    """
    seen_heading = False
    for child in doc.element.body:
        if child.tag == qn('w:p'):
            if paragraph_text in Paragraph(child, doc).text:
                seen_heading = True
            continue
        if seen_heading and child.tag == qn('w:tbl'):
            # Map the raw XML element back to its Table wrapper by identity.
            for candidate in doc.tables:
                if candidate._element is child:
                    return candidate
    return None
|
|
|
|
def find_table_by_title(doc: Document, title_text: str) -> Optional[Table]:
    """Find the table directly following the paragraph containing
    *title_text*.

    Table titles sit in their own paragraph with the table immediately
    after, so this is a thin alias over find_table_after_paragraph.

    Args:
        doc: the Document to search.
        title_text: title text to match (substring).

    Returns:
        The Table object, or None when no title/table pair is found.
    """
    return find_table_after_paragraph(doc, title_text)
|
|
|
|
def remove_paragraph(paragraph: Paragraph):
    """Safely detach a paragraph from its parent element (no-op when it has
    already been detached)."""
    element = paragraph._element
    holder = element.getparent()
    if holder is None:
        return
    holder.remove(element)
|
|
|
|
def remove_element(element):
    """Remove a low-level docx XML element from its parent, if any."""
    container = element.getparent()
    if container is None:
        return
    container.remove(element)
|
|
|
|
def remove_highly_cited_section(doc: Document) -> bool:
    """
    Delete the "ESI 高被引论文收录概况" paragraph and the table right after it.
    Also scrubs leftover "ESI 高被引论文" mention text from body runs.

    Returns:
        bool: True when the target paragraph was found and deleted, else False.
    """
    target_texts = ["ESI 高被引论文收录概况", "ESI高被引论文收录概况"]
    found_section = False
    removed_table = False

    # Snapshot the paragraph list: the document is mutated while iterating.
    for p in list(doc.paragraphs):
        if any(t in p.text for t in target_texts):
            found_section = True
            # The summary table sits as the element directly after the
            # heading paragraph, so check the next sibling element.
            next_el = p._element.getnext()
            if next_el is not None and next_el.tag.endswith("tbl"):
                remove_element(next_el)
                removed_table = True
            remove_paragraph(p)
            break

    # Scrub remaining mention text at run level. NOTE: a phrase split
    # across multiple runs will not be matched by this per-run replace.
    cleaned_count = 0
    for p in list(doc.paragraphs):
        if any(t in p.text for t in ["ESI 高被引论文", "ESI高被引论文"]):
            for run in p.runs:
                for t in ["ESI 高被引论文", "ESI高被引论文"]:
                    if t in run.text:
                        run.text = run.text.replace(t, "")
                        cleaned_count += 1

    # Summarize what happened for the debug log.
    if found_section:
        result_parts = ["已删除段落"]
        if removed_table:
            result_parts.append("及表格")
        if cleaned_count > 0:
            result_parts.append(f",清理了 {cleaned_count} 处提示文字")
        logger.debug(" │ └─ %s", "".join(result_parts))
        return True
    else:
        logger.debug(" │ └─ 未找到目标段落")
        return False
|
|
|
|
def update_report_number(doc: Document, report_no: str) -> bool:
    """Rewrite the "报告编号" line at the top of the document.

    Only the first three paragraphs are inspected; a match is replaced
    wholesale with "报告编号:{report_no}" and restyled (Times New Roman,
    bold, 14pt). When no match exists a warning is logged and nothing is
    changed — the rest of the body is deliberately not scanned, to avoid
    false matches.

    Returns:
        bool: True when the report number was updated, False otherwise.
    """
    for paragraph in doc.paragraphs[:3]:
        if "报告编号" not in paragraph.text:
            continue
        paragraph.text = f"报告编号:{report_no}"
        for piece in paragraph.runs:
            piece.font.name = "Times New Roman"
            piece.bold = True
            piece.font.size = Pt(14)
        logger.debug(" │ └─ 报告编号更新为:%s", report_no)
        return True
    logger.warning(" │ └─ 未在文档开头找到'报告编号',期望编号:%s", report_no)
    return False
|
|
|
|
def fill_table(doc: Document, data: List[List[str]]):
    """
    Fill the transposed ESI data into the table after the "ESI学科表现"
    paragraph. Row labels are, in order: ESI学科 / 论文篇数 / CNCI /
    Top10%论文比例; the second column (index 1, the "Total" column) is bold.
    The table is widened/trimmed to match the data, then column widths are
    redistributed across the usable page width.

    Raises:
        ValueError: when no table follows the "ESI学科表现" paragraph.
    """
    table = find_table_after_paragraph(doc, "ESI学科表现")
    if table is None:
        raise ValueError("文档中找不到'ESI学科表现'段落后的表格。")
    logger.debug(" │ └─ 填充表格,列数:%d", len(data[0]) if data else 0)

    expected_rows = ["ESI学科", "论文篇数", "CNCI", "Top10%论文比例"]

    # Desired column count = widest data row + 1 label column.
    data_cols = max((len(row) for row in data), default=0)
    desired_cols = data_cols + 1
    current_cols = len(table.columns)

    # Remember the template's first two column widths before restructuring.
    saved_col0_w = table.columns[0].width if current_cols >= 1 else None
    saved_col1_w = table.columns[1].width if current_cols >= 2 else None

    if current_cols < desired_cols:
        # Too few columns: clone the last column's width for the new ones.
        last_width = table.columns[-1].width if current_cols else Inches(1.2)
        for _ in range(desired_cols - current_cols):
            table.add_column(int(last_width) if last_width else int(Inches(1.2)))
        logger.info("表格列不足,已补齐至 %d 列", desired_cols)
    elif current_cols > desired_cols:
        # Too many columns: drop trailing columns from both the grid
        # definition and every row. grid_cols is captured once; since
        # columns are removed highest-index first, the stale indices still
        # point at the right original grid entries.
        grid = table._tbl.tblGrid
        grid_cols = list(grid.iterchildren()) if grid is not None else []
        for _ in range(current_cols - desired_cols):
            remove_idx = len(table.columns) - 1
            if grid is not None and remove_idx < len(grid_cols):
                grid.remove(grid_cols[remove_idx])
            for row in table.rows:
                cells = row.cells
                if remove_idx < len(cells):
                    row._tr.remove(cells[remove_idx]._tc)
        logger.info("表格列过多,已裁剪至 %d 列", desired_cols)

    # Grow the table until it has one row per metric.
    if len(table.rows) < len(data):
        for _ in range(len(data) - len(table.rows)):
            table.add_row()

    # First column: bold row labels.
    for row_idx, row_name in enumerate(expected_rows):
        if row_idx < len(table.rows):
            set_cell(table.cell(row_idx, 0), row_name, bold=True)

    # Data cells start at column 1; column 1 (the "Total" column) is bold.
    for row_idx, row_data in enumerate(data):
        for col_offset, value in enumerate(row_data, start=1):
            if col_offset >= len(table.columns):
                break
            bold = col_offset == 1
            set_cell(table.cell(row_idx, col_offset), value, bold=bold)

    # Redistribute widths over the printable page width, keeping the saved
    # (or default) widths fixed for the first two columns.
    if len(table.columns) >= 3 and doc.sections:
        section = doc.sections[0]
        usable_width = section.page_width - section.left_margin - section.right_margin

        col0_w = int(saved_col0_w) if saved_col0_w else int(Inches(1.2))
        col1_w = int(saved_col1_w) if saved_col1_w else int(Inches(1.5))

        # Equal weights for the flexible data columns (0 and 1 are fixed,
        # so their weights are effectively ignored).
        remain_cols = len(table.columns) - 2
        weights = [1, 1] + [1] * remain_cols

        fixed_widths = {0: col0_w, 1: col1_w}

        autosize_table_columns(table, target_width=usable_width,
                               weights=weights, fixed_widths=fixed_widths)
    logger.debug(" │ └─ 表格填充完成")
|
|
|
|
def normalize_title(title: str) -> str:
    """Normalize a title for fuzzy comparison: lowercase, then strip every
    non-word character (keeps letters, digits and underscores)."""
    if not title:
        return ""
    return re.sub(r'[^\w]', '', title.lower())
|
|
|
|
def fuzzy_match_title(query_title: str, title_list: List[str], threshold: float = 0.7) -> Optional[int]:
    """Return the index of the best fuzzy match for *query_title* in
    *title_list*, or None.

    Exact normalized equality wins immediately. Otherwise containment in
    either direction is scored as shorter-length / longer-length; the
    highest score that is strictly better than the running best and at
    least *threshold* wins.
    """
    if not query_title:
        return None
    query_norm = normalize_title(query_title)
    if not query_norm:
        return None

    best_idx: Optional[int] = None
    best_ratio = 0.0
    for position, candidate in enumerate(title_list):
        if not candidate:
            continue
        cand_norm = normalize_title(str(candidate))
        if not cand_norm:
            continue
        # Exact normalized match: return immediately.
        if cand_norm == query_norm:
            return position
        # Containment match, scored by length ratio.
        if query_norm in cand_norm or cand_norm in query_norm:
            shorter, longer = sorted((len(query_norm), len(cand_norm)))
            ratio = shorter / longer if longer > 0 else 0
            if ratio >= threshold and ratio > best_ratio:
                best_ratio = ratio
                best_idx = position
    return best_idx
|
|
|
|
def read_sustech_xlsx(file_path: Path) -> Dict[str, str]:
    """Build a {title: accession number} map from a SUSTECH export
    (XLSX or CSV).

    WOS-prefixed accession numbers are preferred when the same title shows
    up more than once. Returns {} when required columns are missing or the
    file cannot be read.
    """
    try:
        if file_path.suffix.lower() == ".xlsx":
            df = pd.read_excel(file_path, engine="openpyxl")
        else:
            df = pd.read_csv(file_path, encoding="utf-8-sig")

        title_col = '标题'
        accession_col = '收录号'
        if title_col not in df.columns or accession_col not in df.columns:
            logger.warning("文件缺少必要的列(标题、收录号): %s", file_path)
            return {}

        mapping: Dict[str, str] = {}
        for _, row in df.iterrows():
            title = str(row[title_col]).strip() if pd.notna(row[title_col]) else ""
            accession = str(row[accession_col]).strip() if pd.notna(row[accession_col]) else ""
            if not title or not accession:
                continue
            # Prefer a WOS: accession over any previously stored non-WOS one.
            existing = mapping.get(title, "")
            if not existing or (accession.startswith("WOS:") and not existing.startswith("WOS:")):
                mapping[title] = accession

        file_type = "XLSX" if file_path.suffix.lower() == ".xlsx" else "CSV"
        logger.debug(" │ └─ 从 %s 读取标题-收录号映射 %d 条:%s", file_type, len(mapping), format_path(file_path))
        return mapping
    except Exception as e:
        logger.error(" │ └─ 读取文件失败: %s,错误:%s", format_path(file_path), e)
        return {}
|
|
|
|
def read_wos_mappings(csv_path: Path) -> tuple:
    """Read the WOS CSV/XLSX and return two lookup maps in one pass:
    - accession_to_cnci: {入藏号: CNCI}
    - title_to_cnci: {normalized title: CNCI}, used as a fallback match
    Returns ({}, {}) on missing required columns or read failure.
    """
    try:
        df = _load_wos_df(csv_path)
        accession_col, title_col, cnci_col = '入藏号', '论文标题', '学科规范化的引文影响力'
        if accession_col not in df.columns or cnci_col not in df.columns:
            logger.warning("WOS 文件缺少必要列: %s", csv_path)
            return {}, {}

        acc_map: Dict[str, str] = {}
        title_map: Dict[str, str] = {}
        has_title_col = title_col in df.columns
        for _, row in df.iterrows():
            cnci = row[cnci_col] if pd.notna(row[cnci_col]) else None
            if cnci is None:
                continue
            # Normalize the CNCI to the report's two-decimal format.
            try:
                cnci_str = format_cnci(str(float(cnci)))
            except (ValueError, TypeError):
                cnci_str = str(cnci)

            accession = str(row[accession_col]).strip() if pd.notna(row[accession_col]) else ""
            if accession:
                acc_map[accession] = cnci_str

            if has_title_col:
                title = str(row[title_col]).strip() if pd.notna(row[title_col]) else ""
                if title:
                    title_map[normalize_title(title)] = cnci_str

        logger.debug(" │ └─ WOS 映射:入藏号 %d 条,标题 %d 条:%s",
                     len(acc_map), len(title_map), format_path(csv_path))
        return acc_map, title_map
    except Exception as e:
        logger.error(" │ └─ 读取 WOS 文件失败: %s,错误:%s", format_path(csv_path), e)
        return {}, {}
|
|
|
|
def find_paper_table(doc: Document) -> Optional[Table]:
    """Locate the paper-retrieval statistics table.

    First tries the table following the "论文检索情况统计表" title; failing
    that, falls back to scanning every table for a header row containing
    both '文献标题' and 'CNCI'. Returns None when neither strategy hits.
    """
    by_title = find_table_by_title(doc, "论文检索情况统计表")
    if by_title is not None:
        return by_title
    for candidate in doc.tables:
        if len(candidate.rows) == 0 or len(candidate.columns) == 0:
            continue
        headers = [cell.text.strip() for cell in candidate.rows[0].cells]
        if '文献标题' in headers and 'CNCI' in headers:
            return candidate
    return None
|
|
|
|
def autosize_table_columns(table: Table, target_width: Optional[float] = None,
                           weights: Optional[List[int]] = None,
                           fixed_widths: Optional[Dict[int, int]] = None,
                           min_width: Optional[float] = None) -> None:
    """
    Distribute table column widths by weight, honouring fixed-width columns.

    Args:
        table: the table to resize.
        target_width: total width (EMU or Length); defaults to 7 inches.
        weights: per-column weight list; when None, weights are derived from
            the header texts (序号 narrow, 文献标题 widest, 文献来源 second).
        fixed_widths: {column index: width in EMU} for columns that keep an
            exact width; the remaining width is shared by the other columns.
        min_width: minimum width for flexible columns (default ~0.3 inch).

    Default weight strategy (when weights is None):
        - column 0 / "序号": weight 1
        - "文献标题": weight 6
        - "文献来源": weight 4
        - everything else: weight 2
    """
    if not table.columns:
        return

    num_cols = len(table.columns)
    if num_cols == 0:
        return

    # Disable autofit so the explicit widths below take effect.
    table.allow_autofit = False

    if target_width is None:
        # Fallback target: a 7-inch printable width.
        target_width = Inches(7.0)
    target_width_int = int(target_width)

    if min_width is None:
        min_width = Inches(0.3)
    min_width_int = int(min_width)

    if weights is None:
        # Derive weights from the header row's cell texts.
        header_row = table.rows[0]
        header_texts = [cell.text.strip() for cell in header_row.cells]

        weights = []
        for idx, header in enumerate(header_texts):
            if idx == 0 or '序号' in header:
                weights.append(1)
            elif '文献标题' in header or '标题' in header:
                weights.append(6)
            elif '文献来源' in header or '来源' in header:
                weights.append(4)
            else:
                weights.append(2)

        # No header matched anything: fall back to the canonical 1/6/4 layout.
        if num_cols >= 3 and all(w == 2 for w in weights):
            weights[0] = 1
            weights[1] = 6
            weights[2] = 4
    else:
        # Pad or trim a caller-supplied weight list to the column count.
        if len(weights) < num_cols:
            weights.extend([2] * (num_cols - len(weights)))
        elif len(weights) > num_cols:
            weights = weights[:num_cols]

    # Width remaining for the flexible (non-fixed) columns.
    fixed_widths = fixed_widths or {}
    fixed_total = sum(fixed_widths.values())
    remaining_width = max(0, target_width_int - fixed_total)

    flexible_cols = [i for i in range(num_cols) if i not in fixed_widths]
    if flexible_cols:
        flexible_weights = [weights[i] for i in flexible_cols]
        total_weight = sum(flexible_weights)

        # Proportional allocation, clamped to the minimum width.
        col_widths = [0] * num_cols
        for idx in flexible_cols:
            weight = weights[idx]
            width = int((weight / total_weight) * remaining_width) if total_weight > 0 else remaining_width // len(flexible_cols)
            width = max(width, min_width_int)
            col_widths[idx] = width

        # Hand any rounding remainder to the heaviest flexible column.
        current_flexible_total = sum(col_widths[i] for i in flexible_cols)
        diff = remaining_width - current_flexible_total
        if diff != 0 and flexible_cols:
            max_weight_idx = max(flexible_cols, key=lambda i: weights[i])
            col_widths[max_weight_idx] += diff
    else:
        col_widths = [0] * num_cols

    # Fixed widths override whatever was computed above.
    for col_idx, width in fixed_widths.items():
        if 0 <= col_idx < num_cols:
            col_widths[col_idx] = int(width)

    # Apply the widths to both the column objects and every cell.
    for col_idx, column in enumerate(table.columns):
        width = col_widths[col_idx]
        column.width = width
        for cell in column.cells:
            cell.width = width

    logger.debug(" │ └─ 自动调整列宽完成,列数:%d,总宽:%d EMU,权重:%s,固定宽度:%s",
                 num_cols, sum(col_widths), weights, fixed_widths)
|
|
|
|
def fill_cnci_column(doc: Document, record_path: Path, wos_path: Path) -> tuple[int, int]:
    """
    Fill the CNCI column of the paper-retrieval statistics table.

    Matching pipeline per table row:
      1) exact title -> accession number (from the record file);
      2) fuzzy title match as fallback;
      3) accession -> CNCI (from the WOS file);
      4) last resort: normalized-title -> CNCI lookup in the WOS data.
    Rows that never match are written as "/".

    Returns:
        tuple[int, int]: (matched row count, total data row count).
    """
    table = find_paper_table(doc)
    if not table:
        logger.warning(" │ └─ ✗ 未找到论文检索情况统计表")
        return (0, 0)

    # Locate the title and CNCI columns from the header row.
    header_row = table.rows[0]
    header_texts = [cell.text.strip() for cell in header_row.cells]
    try:
        title_col_idx = header_texts.index('文献标题')
        cnci_col_idx = header_texts.index('CNCI')
    except ValueError:
        logger.warning(" │ └─ ✗ 表格缺少'文献标题'或'CNCI'列")
        return (0, 0)

    # Title -> accession map from the SUSTECH record export.
    title_to_accession = read_sustech_xlsx(record_path)
    if not title_to_accession:
        logger.warning(" │ └─ ✗ 无法读取标题-收录号映射")
        return (0, 0)

    accession_to_cnci, wos_title_to_cnci = read_wos_mappings(wos_path)
    if not accession_to_cnci:
        logger.warning(" │ └─ ✗ 无法读取入藏号-CNCI 映射")
        return (0, 0)

    # Candidate titles for the fuzzy fallback.
    xlsx_titles = list(title_to_accession.keys())

    matched_count = 0
    total_rows = len(table.rows) - 1
    for row_idx in range(1, len(table.rows)):
        row = table.rows[row_idx]
        if title_col_idx >= len(row.cells) or cnci_col_idx >= len(row.cells):
            continue

        title_cell = row.cells[title_col_idx]
        cnci_cell = row.cells[cnci_col_idx]
        title = title_cell.text.strip()

        if not title:
            continue

        # 1) Exact title match.
        accession = title_to_accession.get(title)

        # 2) Fuzzy title match fallback.
        if not accession:
            match_idx = fuzzy_match_title(title, xlsx_titles)
            if match_idx is not None:
                matched_title = xlsx_titles[match_idx]
                accession = title_to_accession.get(matched_title)

        # 3) Accession -> CNCI; "/" marks "no match".
        cnci_value = "/"
        if accession:
            cnci_value = accession_to_cnci.get(accession, "/")

        # 4) Last resort: normalized-title lookup directly in the WOS data.
        if cnci_value == "/" and wos_title_to_cnci:
            title_norm = normalize_title(title)
            cnci_value = wos_title_to_cnci.get(title_norm, "/")

        set_cell(cnci_cell, cnci_value)
        if cnci_value != "/":
            matched_count += 1

    autosize_table_columns(table)
    logger.debug(" │ └─ CNCI 列填充完成,成功匹配:%d/%d 条", matched_count, total_rows)
    return (matched_count, total_rows)
|
|
|
|
def read_savedrecs_stats(xlsx_path: Path) -> Dict[str, Any]:
    """
    Read savedrecs.xlsx (parsed raw via zipfile + XML rather than openpyxl)
    and extract summary statistics by locating known label texts and reading
    the value cell(s) on the same row.

    Returned keys (each present only when found and numeric):
    - paper_count: "Results found" (label in column A, value in column B)
    - total_citations: "Sum of the Times Cited" (column B)
    - other_citations: same label, column C
    - avg_citations: "Average Citations per Item" (column B)
    - h_index: "h-index" (column B)
    """
    logger.debug(" │ └─ 读取 savedrecs.xlsx 统计数据:%s", format_path(xlsx_path))
    try:
        # An .xlsx file is a ZIP archive of XML parts; read it directly.
        with zipfile.ZipFile(xlsx_path) as z:
            # Shared-string table: string cells store an index into this list.
            shared_strings = []
            try:
                sst_xml = z.read('xl/sharedStrings.xml')
                sst_root = ET.fromstring(sst_xml)
                ns = {'a': 'http://schemas.openxmlformats.org/spreadsheetml/2006/main'}
                for si in sst_root.findall('.//a:si', ns):
                    texts = []
                    for t in si.findall('.//a:t', ns):
                        texts.append(t.text or '')
                    shared_strings.append(''.join(texts))
            except KeyError:
                # No sharedStrings part: the sheet has no shared strings.
                pass

            # Only the first worksheet is inspected.
            sheet_xml = z.read('xl/worksheets/sheet1.xml')
            sheet_root = ET.fromstring(sheet_xml)
            ns = {'a': 'http://schemas.openxmlformats.org/spreadsheetml/2006/main'}

            def get_cell_value(cell_elem):
                """Resolve a cell element to its value (dereferencing shared strings)."""
                t = cell_elem.get('t')
                v_elem = cell_elem.find('a:v', ns)
                if v_elem is None:
                    return None
                val = v_elem.text
                if t == 's':
                    # 's' cells hold an index into the shared-string table.
                    try:
                        idx = int(val)
                        return shared_strings[idx] if idx < len(shared_strings) else None
                    except (ValueError, IndexError):
                        return None
                return val

            # Flatten the sheet into {cell reference (e.g. "B3"): value}.
            cell_map = {}
            for row in sheet_root.findall('.//a:sheetData/a:row', ns):
                for cell in row.findall('a:c', ns):
                    cell_ref = cell.get('r', '')
                    if cell_ref:
                        cell_map[cell_ref] = get_cell_value(cell)

            stats = {}

            # "Results found" label must be in column A; value is in column B.
            for ref, val in cell_map.items():
                if val and isinstance(val, str) and 'Results found' in val:
                    # Split the reference into its row digits / column letters.
                    row_num = ''.join([c for c in ref if c.isdigit()])
                    col_letter = ''.join([c for c in ref if c.isalpha()])
                    if col_letter == 'A' and row_num:
                        b_ref = 'B' + row_num
                        if b_ref in cell_map:
                            try:
                                stats['paper_count'] = int(float(cell_map[b_ref]))
                            except (ValueError, TypeError):
                                pass

            # "Sum of the Times Cited": B = total citations, C = other citations.
            for ref, val in cell_map.items():
                if val and isinstance(val, str) and 'Sum of the Times Cited' in val:
                    row_num = ''.join([c for c in ref if c.isdigit()])
                    if row_num:
                        b_ref = 'B' + row_num
                        c_ref = 'C' + row_num
                        if b_ref in cell_map:
                            try:
                                stats['total_citations'] = int(float(cell_map[b_ref]))
                            except (ValueError, TypeError):
                                pass
                        if c_ref in cell_map:
                            try:
                                stats['other_citations'] = int(float(cell_map[c_ref]))
                            except (ValueError, TypeError):
                                pass

            # "Average Citations per Item" -> avg_citations (kept as float).
            for ref, val in cell_map.items():
                if val and isinstance(val, str) and 'Average Citations per Item' in val:
                    row_num = ''.join([c for c in ref if c.isdigit()])
                    if row_num:
                        b_ref = 'B' + row_num
                        if b_ref in cell_map:
                            try:
                                stats['avg_citations'] = float(cell_map[b_ref])
                            except (ValueError, TypeError):
                                pass

            # "h-index" -> h_index (coerced to int).
            for ref, val in cell_map.items():
                if val and isinstance(val, str) and 'h-index' in val:
                    row_num = ''.join([c for c in ref if c.isdigit()])
                    if row_num:
                        b_ref = 'B' + row_num
                        if b_ref in cell_map:
                            try:
                                stats['h_index'] = int(float(cell_map[b_ref]))
                            except (ValueError, TypeError):
                                pass

            logger.debug(" │ └─ 统计数据读取完成:%s", stats)
            return stats

    except Exception as e:
        logger.error(" │ └─ 读取失败,错误:%s", e)
        return {}
|
|
|
|
def fill_scie_table(doc: Document, stats: Dict[str, Any]):
    """
    Fill the data row of the table after the "SCIE收录引用概况" paragraph
    (falling back to "SSCI收录引用概况" when SCIE is absent).

    Expected layout: the last table row is the data row and its first five
    cells are, in order, paper count / total citations / other-citation
    count / average citations / h-index. Keys absent from *stats* leave
    their cells untouched. A table with fewer than 3 rows — or fewer than
    5 cells in the data row, which previously raised IndexError — is
    skipped with a warning.
    """
    table = find_table_after_paragraph(doc, "SCIE收录引用概况")
    table_label = "SCIE收录引用概况"
    if table is None:
        table = find_table_after_paragraph(doc, "SSCI收录引用概况")
        table_label = "SSCI收录引用概况" if table is not None else table_label

    if table is None:
        logger.warning("未找到 'SCIE/SSCI 收录引用概况' 表格,跳过填充")
        return

    if len(table.rows) < 3:
        logger.warning("表格行数不足,需要至少3行")
        return

    data_row = table.rows[-1]

    # Guard against malformed/narrow tables before indexing cells 0..4.
    if len(data_row.cells) < 5:
        logger.warning("表格列数不足,需要至少5列,跳过填充")
        return

    if 'paper_count' in stats:
        set_cell(data_row.cells[0], str(stats['paper_count']))

    if 'total_citations' in stats:
        set_cell(data_row.cells[1], str(stats['total_citations']))

    if 'other_citations' in stats:
        set_cell(data_row.cells[2], str(stats['other_citations']))

    if 'avg_citations' in stats:
        avg_str = f"{stats['avg_citations']:.2f}"
        set_cell(data_row.cells[3], avg_str)

    if 'h_index' in stats:
        set_cell(data_row.cells[4], str(stats['h_index']))

    # Summarize what was written for the debug log.
    stats_summary = []
    if 'paper_count' in stats:
        stats_summary.append(f"论文: {stats['paper_count']}")
    if 'total_citations' in stats:
        stats_summary.append(f"总被引: {stats['total_citations']}")
    if 'h_index' in stats:
        stats_summary.append(f"H指数: {stats['h_index']}")
    logger.debug(" │ └─ 填充 %s 完成 (%s)", table_label, ", ".join(stats_summary))
|
|
|
|
def process_folder(folder: Path, index: int = 0, total: int = 0):
    """
    Run the full report-generation pipeline for one folder.

    Steps, in order (each logged with a tree-style prefix):
      1. Copy the SUSTECH*.docx template to "<folder name>.docx".
      2. Remove the "ESI 高被引论文收录概况" section.
      3. Update the report number derived from the folder name.
      4. Fill the ESI subject table from "Incites 研究领域.csv".
      5. Build and insert the yearly-output chart from "Incites 研究领域t.csv".
      6. Fill the CNCI column from the record XLSX and the WOS documents CSV.
      7. Fill the SCIE/SSCI citation-overview table from savedrecs.xlsx.
      8. Re-number drawing ids and save the document in place.

    Args:
        folder: report folder to process; its name becomes the output docx name.
        index:  1-based position within the batch, used only for progress logging.
        total:  batch size; 0 disables the "[i/N]" progress prefix.

    Returns:
        None. Returns early (with a warning) when no template docx is found.
    """
    global _current_folder
    # Lets format_path() display paths relative to the folder being processed.
    _current_folder = folder
    
    # Progress prefix only when the caller supplied a batch size.
    if total > 0:
        logger.info("[%d/%d] 开始处理:%s", index, total, folder.name)
    else:
        logger.info("开始处理:%s", folder.name)
    
    docx_files = sorted(folder.glob("SUSTECH*.docx"))
    if not docx_files:
        # Fallback: any docx except a previously generated "<folder name>.docx".
        fallback = sorted(f for f in folder.glob("*.docx") if f.stem != folder.name)
        if fallback:
            logger.info("  ├─ 未找到 SUSTECH*.docx,使用:%s", fallback[0].name)
            docx_files = fallback
        else:
            logger.warning("  └─ 未找到模板 docx,跳过")
            return
    
    # Step 1: copy the template so the original stays untouched.
    src_docx = docx_files[0]
    dst_docx = folder / f"{folder.name}.docx"
    shutil.copy(src_docx, dst_docx)
    logger.info("  ├─ 复制模板:%s → %s", format_path(src_docx), format_path(dst_docx))

    doc = Document(dst_docx)

    # Step 2: remove the ESI highly-cited section (paragraph + table).
    logger.info("  ├─ 移除 ESI 高被引论文收录概况")
    removed = remove_highly_cited_section(doc)
    if removed:
        logger.info("  │   └─ ✓ 已删除")
    else:
        logger.warning("  │   └─ ✗ 未找到目标段落")

    # Step 3: report number = first two dash-separated parts of the folder
    # name (e.g. "C2026-0003-XXX" → "C2026-0003"); whole name if fewer parts.
    name_parts = folder.name.split("-")
    report_no = f"{name_parts[0]}-{name_parts[1]}" if len(name_parts) >= 2 else folder.name
    logger.info("  ├─ 更新报告编号")
    updated = update_report_number(doc, report_no)
    if not updated:
        logger.warning("  │   └─ ✗ 未找到报告编号段落")

    # Step 4: ESI subject table from "Incites 研究领域.csv" (or .xlsx).
    csv_path = find_csv_or_xlsx(folder / "Incites 研究领域.csv")
    logger.info("  ├─ 填充 ESI 学科表现表")
    if not csv_path.exists():
        # Fuzzy match, excluding the time-series variant ("...t.csv" etc.).
        alt = find_file_by_keywords(folder, ["incites", "研究"], exclude=["t.csv", "t.xlsx", "t "])
        if alt:
            logger.info("  │   └─ 使用模糊匹配文件:%s", format_path(alt))
            csv_path = alt
    if csv_path.exists():
        data = read_csv_block(csv_path)
        if data:
            fill_table(doc, data)
            logger.info("  │   └─ ✓ 填充完成 (%d 列)", len(data[0]) if data else 0)
        else:
            logger.warning("  │   └─ ✗ 文件数据为空")
    else:
        logger.warning("  │   └─ ✗ 未找到 ESI 学科文件")

    # Step 5: yearly-output chart from the "t"-suffixed time-series file.
    timeseries_csv = find_csv_or_xlsx(folder / "Incites 研究领域t.csv")
    logger.info("  ├─ 生成历年产出图表")
    if not timeseries_csv.exists():
        # Fuzzy match on the time-series variant.
        alt_ts = find_file_by_keywords(folder, ["incites", "t"])
        if alt_ts:
            logger.info("  │   └─ 使用模糊匹配文件:%s", format_path(alt_ts))
            timeseries_csv = alt_ts
    if timeseries_csv.exists():
        ts_data = read_timeseries(timeseries_csv)
        img_path = folder / "timeseries_chart.png"
        if build_chart(ts_data, img_path):
            insert_chart(doc, img_path)
            logger.info("  │   └─ ✓ 图表生成并插入完成")
        else:
            logger.warning("  │   └─ ✗ 历年数据为空,未生成图表")
    else:
        logger.warning("  │   └─ ✗ 缺少历年产出 CSV")

    # Step 6: CNCI column from the record XLSX plus the WOS documents CSV.
    record_files = sorted(folder.glob("SUSTECH*.xlsx"))
    logger.info("  ├─ 填充 CNCI 列")
    if not record_files:
        logger.warning("  │   └─ ✗ 未找到记录 XLSX")
    else:
        record_path = record_files[0]
        wos_path = find_csv_or_xlsx(folder / "Web of Science Documents.csv")
        if record_path.exists() and wos_path.exists():
            matched_count, total_rows = fill_cnci_column(doc, record_path, wos_path)
            if matched_count > 0:
                logger.info("  │   └─ ✓ 填充完成 (匹配: %d/%d)", matched_count, total_rows)
            else:
                logger.warning("  │   └─ ✗ 未匹配到任何数据")
        else:
            logger.warning("  │   └─ ✗ 缺少记录 XLSX 或 Web of Science Documents.csv")

    # Step 7: SCIE/SSCI citation-overview table from savedrecs.xlsx.
    savedrecs_xlsx = folder / "savedrecs.xlsx"
    logger.info("  ├─ 填充 SCIE/SSCI 收录引用概况")
    if savedrecs_xlsx.exists():
        stats = read_savedrecs_stats(savedrecs_xlsx)
        if stats:
            fill_scie_table(doc, stats)
            stats_summary = []
            if 'paper_count' in stats:
                stats_summary.append(f"论文: {stats['paper_count']}")
            if 'total_citations' in stats:
                stats_summary.append(f"总被引: {stats['total_citations']}")
            if 'h_index' in stats:
                stats_summary.append(f"H指数: {stats['h_index']}")
            logger.info("  │   └─ ✓ 填充完成 (%s)", ", ".join(stats_summary))
        else:
            logger.warning("  │   └─ ✗ 无法读取统计数据")
    else:
        logger.warning("  │   └─ ✗ 未找到 savedrecs.xlsx")
    
    # Step 8: assign sequential ids to every wp:docPr element across the main
    # part and all related parts — presumably to avoid duplicate drawing ids
    # after inserting images (Word can reject duplicates); TODO confirm.
    counter = 1
    parts_to_fix = [doc.part] + list(doc.part.related_parts.values())
    for part in parts_to_fix:
        if hasattr(part, '_element'):
            for docPr in part._element.iter(qn('wp:docPr')):
                docPr.set('id', str(counter))
                counter += 1

    doc.save(dst_docx)
    logger.info("  └─ ✓ 处理完成:%s", format_path(dst_docx))
|
|
|
|
def main():
    """
    Entry point: batch-process every report folder next to this script.

    A candidate folder is any directory whose name starts with "C"
    (e.g. C2026-0003-...), processed in sorted order.
    """
    configure_logging()
    logger.info("开始批量处理学术产出报告")
    base = Path(__file__).resolve().parent

    # Collect candidate folders in deterministic (sorted) order.
    folders = sorted(
        entry for entry in base.iterdir()
        if entry.is_dir() and entry.name.startswith("C")
    )
    total = len(folders)

    if not folders:
        logger.warning("未找到需要处理的文件夹(以 C 开头)")
        return

    logger.info("找到 %d 个文件夹需要处理\n", total)

    for position, folder in enumerate(folders, start=1):
        process_folder(folder, index=position, total=total)
        # Blank separator line between folders, but not after the last one.
        if position != total:
            logger.info("")

    logger.info("批量处理完成,共处理 %d 个文件夹", total)
|
|
|
|
# Run the batch processor only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|
|
|