sustech-lib-academic-report / process_incites_docx.py
jam-cc
fix: format_percent整数去除多余.0(如0.0%→0%)
1ae2819
"""
自动处理学术产出报告:
1) 复制 SUSTECH 开头的 docx 为所在文件夹同名副本;
2) 删除“ESI 高被引论文收录概况”段落及表格;
3) 更新报告编号为文件夹名的前两段(如 C2026-0003);
4) 读取同文件夹下的“Incites 研究领域.csv”,转置后填入 ESI 学科表;
5) 读取历年产出 CSV 生成图表并插入;
6) 读取记录 XLSX 与 WOS CSV,填充论文检索统计表的 CNCI 列;
7) 读取 savedrecs.xlsx,填充 SCIE 收录引用概况表。
"""
from __future__ import annotations
import csv
import logging
from pathlib import Path
import shutil
from typing import Any, Dict, List, Optional
import re
from docx import Document
from docx.shared import Pt, Inches
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.enum.table import WD_ALIGN_VERTICAL
from docx.oxml.ns import qn
from docx.text.paragraph import Paragraph
from docx.table import _Cell, Table
import pandas as pd
import zipfile
import xml.etree.ElementTree as ET
import matplotlib
matplotlib.use("Agg") # headless
import matplotlib.pyplot as plt
LOGGER_NAME = "academic_report_processor"
logger = logging.getLogger(LOGGER_NAME)

# Folder currently being processed; used to shorten paths in log messages.
_current_folder: Optional[Path] = None


def format_path(path: Path) -> str:
    """Render *path* for logging.

    When the path lives inside the folder currently being processed, return
    the path relative to that folder; otherwise return just the file name.
    """
    base = _current_folder
    if base is not None and path.is_relative_to(base):
        return str(path.relative_to(base))
    return path.name
def read_rows_from_file(path: Path) -> List[List[str]]:
    """Load a CSV or XLSX file and return its rows as lists of strings.

    XLSX files are read headerless via pandas/openpyxl with NaN cells
    blanked; anything else is treated as UTF-8 (BOM-tolerant) CSV.
    """
    if path.suffix.lower() != ".xlsx":
        with path.open(newline="", encoding="utf-8-sig") as handle:
            return [row for row in csv.reader(handle)]
    frame = pd.read_excel(path, header=None, dtype=str, engine="openpyxl")
    return frame.fillna("").values.tolist()
def _load_wos_df(path: Path):
"""加载 WOS CSV 或 XLSX 为 DataFrame。"""
if path.suffix.lower() == ".xlsx":
return pd.read_excel(path, engine="openpyxl")
return pd.read_csv(path, encoding='utf-8-sig')
def find_csv_or_xlsx(csv_path: Path) -> Path:
    """Resolve a .csv path, falling back to a same-named .xlsx.

    Returns the first existing candidate; when neither exists, the original
    path is returned unchanged so the caller can report it.
    """
    for candidate in (csv_path, csv_path.with_suffix(".xlsx")):
        if candidate.exists():
            return candidate
    return csv_path
def find_file_by_keywords(folder: Path, include: List[str], exclude: Optional[List[str]] = None) -> Optional[Path]:
    """Fuzzy-match a file in *folder* by file-name keywords.

    Args:
        folder: directory to scan (non-recursive).
        include: keywords the name must contain (case-insensitive).
        exclude: keywords the name must not contain (optional).
    Returns:
        The first matching regular file, or None.
    """
    if not folder.exists():
        return None
    wanted = [kw.lower() for kw in include]
    banned = [kw.lower() for kw in (exclude or [])]
    for entry in folder.iterdir():
        if not entry.is_file():
            continue
        lowered = entry.name.lower()
        if any(kw in lowered for kw in banned):
            continue
        if all(kw in lowered for kw in wanted):
            return entry
    return None
def configure_logging(level: int = logging.INFO) -> None:
    """Initialize global logging with a uniform timestamped format.

    The root logger is configured only once: if it already has handlers
    (e.g. when embedded in a host application), it is left untouched.
    """
    logger.setLevel(level)
    root = logging.getLogger()
    if root.handlers:
        return
    logging.basicConfig(
        level=level,
        format="%(asctime)s [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
def format_cnci(value: str) -> str:
    """Format a CNCI value to two decimals.

    Zero becomes "0"; input that cannot be parsed as a number (including
    None) is returned unchanged.
    """
    try:
        number = float(value)
    except (TypeError, ValueError):
        return value
    return "0" if number == 0 else f"{number:.2f}"
def format_percent(value: str) -> str:
    """Append "%" to a numeric string, trimming a redundant ".0" for whole
    numbers (e.g. "0.0" -> "0%", "66.67" -> "66.67%").

    Non-numeric text (e.g. "n/a") and values already ending in "%" are
    returned unchanged; empty/None input yields "".
    """
    text = str(value or "").strip()
    if not text or text.endswith("%"):
        return text
    try:
        number = float(text)
    except ValueError:
        return text  # non-numeric (e.g. "n/a"): leave untouched
    # float.is_integer() is safe for nan/inf, where the original
    # `int(number)` comparison raised ValueError/OverflowError.
    formatted = str(int(number)) if number.is_integer() else text
    return f"{formatted}%"
def _find_col(header: List[str], *keywords: str) -> Optional[int]:
"""在表头中查找包含任一关键字的列索引(不区分大小写)。"""
for i, h in enumerate(header):
h_lower = str(h).lower()
if any(kw.lower() in h_lower for kw in keywords):
return i
return None
def read_csv_block(csv_path: Path) -> List[List[str]]:
    """Read the ESI-subject CSV/XLSX and return a 4-row transposed block.

    Result rows (top to bottom): subject name, paper count, formatted CNCI,
    formatted Top-10% share. The first data row is relabeled "Total" and
    reading stops at the first row whose name cell is blank.

    Returns an empty list when the file has no usable data rows.
    """
    logger.debug(" │ └─ 读取 ESI 学科文件:%s", format_path(csv_path))
    rows = read_rows_from_file(csv_path)
    if not rows:
        logger.warning(" │ └─ CSV 数据为空:%s", format_path(csv_path))
        return []
    header = [str(h) for h in rows[0]]
    col_name = _find_col(header, "名称", "name")
    col_count = _find_col(header, "论文数", "web of science")
    col_cnci = _find_col(header, "学科规范化")
    col_top10 = _find_col(header, "10%")
    if None in (col_name, col_count, col_cnci, col_top10):
        logger.warning(" │ └─ 表头缺少必要列 (名称=%s 论文数=%s CNCI=%s Top10%%=%s),回退到默认索引",
                       col_name, col_count, col_cnci, col_top10)
        col_name, col_count, col_cnci, col_top10 = 0, 1, 6, 7
    # Relabel the first data row as "Total".  Guard against ragged rows:
    # a first row shorter than col_name previously raised IndexError.
    if len(rows) > 1 and col_name < len(rows[1]):
        rows[1][col_name] = "Total"
    # Keep rows up to the first blank name cell.
    data_rows = []
    for r in rows[1:]:
        if len(r) <= col_name or not str(r[col_name]).strip():
            break
        data_rows.append(r)

    def safe_get(r, i):
        # Tolerate ragged rows: missing cells read as "".
        return str(r[i]) if i < len(r) else ""

    trimmed = [
        [safe_get(r, col_name),
         safe_get(r, col_count),
         format_cnci(safe_get(r, col_cnci)),
         format_percent(safe_get(r, col_top10))]
        for r in data_rows
    ]
    if not trimmed:
        return []
    transposed = list(map(list, zip(*trimmed)))
    logger.debug(" │ └─ 读取完成,数据行数:%d,转置后列数:%d", len(data_rows), len(transposed[0]))
    return transposed
def read_timeseries(csv_path: Path) -> List[tuple]:
    """Read the yearly-output file ("Incites 研究领域t.csv/xlsx") and extract
    (year, paper_count, cnci) tuples sorted by year.

    Two export layouts are supported: CSV exports mark data rows with the
    "锁定项目基准值" (baseline-for-pinned-items) label in column 0, while
    XLSX exports use English field names with one row per year.
    """
    logger.debug(" │ └─ 读取历年产出文件:%s", format_path(csv_path))
    rows = read_rows_from_file(csv_path)
    if not rows:
        logger.warning(" │ └─ 历年产出 CSV 为空:%s", format_path(csv_path))
        return []
    header = [str(h) for h in rows[0]]
    col_count = _find_col(header, "论文数", "web of science")
    col_year = _find_col(header, "出版年", "year", "publication year")
    col_cnci = _find_col(header, "学科规范化")
    if None in (col_count, col_year, col_cnci):
        logger.warning(" │ └─ 表头缺少必要列 (论文数=%s 出版年=%s CNCI=%s),回退到默认索引",
                       col_count, col_year, col_cnci)
        col_count, col_year, col_cnci = 1, 2, 7
    # Layout detection: CSV exports carry the "锁定项目基准值" baseline
    # label in column 0; XLSX exports use English field names instead.
    all_col0 = {str(r[0]).strip() for r in rows[1:] if r}
    csv_mode = "锁定项目基准值" in all_col0
    _SKIP_NAMES = {"名称", "baseline for pinned items", ""}
    data = []
    for r in rows[1:]:
        col0 = str(r[0]).strip() if r else ""
        col1 = str(r[col_count]).strip() if col_count < len(r) else ""
        if csv_mode:
            if col0 != "锁定项目基准值":
                continue
        else:
            # XLSX layout: skip header/baseline rows; keep rows with a count.
            if col0.lower() in _SKIP_NAMES or not col1:
                continue
        try:
            year_val = int(float(r[col_year]))
            count_val = float(r[col_count])
            cnci_val = float(r[col_cnci])
        except (TypeError, ValueError, IndexError):
            # Skip rows with missing or non-numeric cells.
            continue
        data.append((year_val, count_val, round(cnci_val, 2)))
    data.sort(key=lambda x: x[0])
    logger.debug(" │ └─ 提取到 %d 条数据", len(data))
    return data
def build_chart(data, out_path: Path) -> bool:
    """Render a combined bar/line chart: bars are yearly paper counts, the
    line is yearly CNCI, and the X axis is the publication year.

    Args:
        data: sequence of (year, paper_count, cnci) tuples.
        out_path: image file to write (PNG, 300 dpi).
    Returns:
        bool: True when the chart was written, False when *data* is empty.
    """
    if not data:
        logger.warning(" │ └─ 历年数据为空,跳过图表生成:%s", format_path(out_path))
        return False
    logger.debug(" │ └─ 生成图表:%s", format_path(out_path))
    years = [d[0] for d in data]
    counts = [d[1] for d in data]
    cncis = [d[2] for d in data]
    # Pick a CJK-capable font. On Hugging Face Spaces (Linux) the fonts come
    # from the fonts-noto-cjk package installed via packages.txt.
    import matplotlib.font_manager as fm
    # Rebuild the font cache so freshly installed fonts are picked up.
    # NOTE(review): fm._rebuild() is a private API removed in newer
    # matplotlib; the except below keeps this harmless there.
    try:
        fm._rebuild()
    except Exception:
        pass  # rebuild failed: fall back to the existing cache
    # Enumerate fonts visible to matplotlib.
    available_fonts = [f.name for f in fm.fontManager.ttflist]
    chinese_fonts = []
    # Candidate fonts in priority order: SimSun-like serif faces first
    # (Linux has no native SimSun, so Noto Serif CJK is the stand-in),
    # then AR PL UMing/UKai, finally sans-serif CJK faces.
    preferred_fonts = [
        # SimSun/KaiTi family (may exist on Windows/Mac)
        "SimSun",
        "STSong",
        "STKaiti",
        "STFangsong",
        "FangSong",
        "KaiTi",
        "Song",
        "Songti",
        # serif SimSun substitutes (preferred)
        "Noto Serif CJK SC",
        "Noto Serif CJK TC",
        "Noto Serif CJK JP",
        "Noto Serif CJK",
        # AR PL family (similar to SimSun/KaiTi)
        "AR PL UMing CN",
        "AR PL UMing TW",
        "AR PL UMing HK",
        "AR PL UKai CN",
        "AR PL UKai TW",
        "AR PL UKai HK",
        # sans-serif CJK (second choice)
        "Noto Sans CJK SC",
        "Noto Sans CJK TC",
        "Noto Sans CJK",
        "WenQuanYi Micro Hei",
        "WenQuanYi Zen Hei",
        "WenQuanYi",
        "Source Han Sans CN",
        "Source Han Sans",
        "SimHei",
        "Microsoft YaHei",
        "Arial Unicode MS",
    ]
    # Match candidates against installed fonts (partial matches allowed).
    for font_name in preferred_fonts:
        # exact match
        if font_name in available_fonts:
            chinese_fonts.append(font_name)
        else:
            # partial match, to cover font-name variants
            for available_font in available_fonts:
                if font_name.lower() in available_font.lower() or available_font.lower() in font_name.lower():
                    if available_font not in chinese_fonts:
                        chinese_fonts.append(available_font)
    # Last resort: DejaVu Sans renders, but CJK glyphs show as boxes.
    if not chinese_fonts:
        chinese_fonts = ["DejaVu Sans"]
        logger.warning(" │ └─ 未找到中文字体,中文可能显示为方块。请确保 packages.txt 中包含 fonts-noto-cjk")
    else:
        logger.debug(f" │ └─ 使用字体:{chinese_fonts[0]}")
    # Chinese text uses the sans-serif list, headed by the CJK font found.
    plt.rcParams["font.sans-serif"] = chinese_fonts + ["DejaVu Sans", "Arial", "sans-serif"]
    # Latin text/digits use serif, Times New Roman first.
    plt.rcParams["font.serif"] = ["Times New Roman", "Times", "DejaVu Serif"]
    # font.family is deliberately left unset: CJK falls to sans-serif while
    # numeric labels are switched to serif individually below.
    plt.rcParams["font.weight"] = "heavy"
    plt.rcParams["font.size"] = 12
    plt.rcParams["axes.unicode_minus"] = False
    fig_width = max(7.5, len(years) * 0.85)
    fig, ax1 = plt.subplots(figsize=(fig_width, 4.5))
    ax2 = ax1.twinx()
    bar_color = "#B54840"
    line_color = "#2C65B6"  # blue
    marker_color = "#A2BA65"  # green
    # marker_color = "#0070C0"  # alternative marker color (unused)
    bars = ax1.bar(years, counts, color=bar_color, label="论文篇数", width=0.65)
    line, = ax2.plot(years, cncis, color=line_color, marker="^", markersize=6, linewidth=3.0,
                     markerfacecolor=marker_color, markeredgecolor=marker_color,
                     markeredgewidth=1, label="CNCI")
    # Re-assert the marker colors explicitly.
    line.set_markerfacecolor(marker_color)
    line.set_markeredgecolor(marker_color)
    # ax1.set_xlabel("年份", fontsize=12, weight="heavy")
    # ax1.set_ylabel("论文篇数", fontsize=12, weight="heavy")
    # ax2.set_ylabel("CNCI", fontsize=12, weight="heavy")
    y1_max = max(counts) if counts else 0
    y2_max = max(cncis) if cncis else 0
    ax1.set_ylim(0, y1_max * 1.15 + 1)
    ax2.set_ylim(0, y2_max * 1.15 + 1)
    # Force a tick for every year.
    ax1.set_xticks(years)
    # Rotate labels vertically when years are dense (>12) to avoid overlap.
    rotation = 90 if len(years) > 12 else 0
    ax1.tick_params(axis="x", labelrotation=rotation, labelsize=12)
    ax1.tick_params(axis="y", labelsize=12)
    ax2.tick_params(axis="y", labelsize=12)
    # Bold tick labels; numerals rendered in the serif (Times) face.
    for label in ax1.get_xticklabels():
        label.set_weight("heavy")
        label.set_fontsize(12)
        # years are numeric -> Times New Roman
        label.set_family("serif")
    for label in ax1.get_yticklabels():
        label.set_weight("heavy")
        label.set_fontsize(12)
        # values are numeric -> Times New Roman
        label.set_family("serif")
    for label in ax2.get_yticklabels():
        label.set_weight("heavy")
        label.set_fontsize(12)
        # values are numeric -> Times New Roman
        label.set_family("serif")
    # Annotate each CNCI point with its value.
    for x, y in zip(years, cncis):
        ax2.text(x, y, f"{y:.2f}",
                 ha="center", va="bottom", fontsize=11, color="black", weight="heavy",
                 family="serif")
    handles = [bars, line]
    labels = [h.get_label() for h in handles]
    # Legend below the axes, single row.
    legend = ax1.legend(handles, labels, loc="upper center", ncol=2, frameon=False, fontsize=14,
                        bbox_to_anchor=(0.5, -0.12), prop={"weight": "heavy", "size": 14})
    # Per-entry legend font: CJK labels use sans-serif, Latin use serif.
    for text in legend.get_texts():
        text.set_weight("heavy")
        text.set_fontsize(14)
        # check whether the label contains CJK characters
        label_text = text.get_text()
        has_chinese = any('\u4e00' <= char <= '\u9fff' for char in label_text)
        if has_chinese:
            text.set_family("sans-serif")  # CJK -> the selected Chinese font
        else:
            text.set_family("serif")  # Latin -> Times New Roman
    fig.tight_layout()
    fig.savefig(out_path, dpi=300, bbox_inches="tight")
    plt.close(fig)
    logger.debug(" │ └─ 图表生成完成")
    return True
def insert_chart(doc: Document, image_path: Path) -> None:
    """Insert the chart image as a centered inline paragraph right after the
    "历年论文产出表现" heading, or at the end of the document as fallback."""
    logger.debug(" │ └─ 插入图表")
    target_idx = None
    for i, p in enumerate(doc.paragraphs):
        if "历年论文产出表现" in p.text:
            target_idx = i
            break
    # # Insert an empty placeholder paragraph at the target position.
    # # (The author asks that this commented block never be deleted, in case
    # # inserting an empty paragraph is needed again.)
    # if target_idx is not None:
    #     target_para = doc.paragraphs[target_idx]
    #     # Low-level XML insert of an empty paragraph after the target
    #     # (insert only, no move, which should be safe)
    #     from docx.oxml import OxmlElement
    #     from docx.oxml.ns import qn
    #     from docx.text.paragraph import Paragraph
    #     new_p = OxmlElement('w:p')
    #     # paragraph properties: centered alignment
    #     pPr = OxmlElement('w:pPr')
    #     jc = OxmlElement('w:jc')
    #     jc.set(qn('w:val'), 'center')
    #     pPr.append(jc)
    #     new_p.append(pPr)
    #     # insert after the target paragraph
    #     target_para._p.addnext(new_p)
    # Build the picture paragraph at the document end first (avoids XML
    # surgery while the paragraph is still being assembled).
    new_para = doc.add_paragraph()
    new_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
    run = new_para.add_run()
    run.add_picture(str(image_path), width=Inches(5))
    # Then move the finished paragraph behind the heading, if one was found.
    # NOTE(review): original comment warns Word may report unreadable content
    # after this XML move — confirm the output opens cleanly.
    if target_idx is not None:
        target_para = doc.paragraphs[target_idx]
        # relocate the new paragraph's XML element after the heading
        new_para._p.getparent().remove(new_para._p)
        target_para._p.addnext(new_para._p)
    logger.debug(" │ └─ 插入位置:%s", "正文标题后" if target_idx is not None else "文末")
def set_cell(cell, text: str, bold: bool = False) -> None:
    """Overwrite *cell* with centered text in 10.5pt Times New Roman.

    The cell is cleared first; content is vertically and horizontally
    centered, and rendered bold when *bold* is True.
    """
    cell.text = ""
    cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER
    para = cell.paragraphs[0]
    para.alignment = WD_ALIGN_PARAGRAPH.CENTER
    new_run = para.add_run(text)
    new_run.bold = bold
    new_run.font.name = "Times New Roman"
    new_run.font.size = Pt(10.5)  # 10.5pt == Chinese "五号" size
def find_table_after_paragraph(doc: Document, paragraph_text: str) -> Optional[Table]:
    """Return the first table following a paragraph that contains
    *paragraph_text* (substring match).

    Walks the document body elements in order; once the marker paragraph
    has been seen, the next <w:tbl> element is resolved back to its Table
    object via identity comparison against doc.tables.

    Args:
        doc: Document to search.
        paragraph_text: text the marker paragraph must contain.
    Returns:
        The matching Table, or None when the paragraph or table is absent.
    """
    marker_seen = False
    for element in doc.element.body:
        if element.tag == qn('w:p'):
            if paragraph_text in Paragraph(element, doc).text:
                marker_seen = True
            continue
        if marker_seen and element.tag == qn('w:tbl'):
            # Map the raw XML element back to its Table wrapper.
            for candidate in doc.tables:
                if candidate._element is element:
                    return candidate
            return None
    return None
def find_table_by_title(doc: Document, title_text: str) -> Optional[Table]:
    """Locate the first table after a title paragraph containing *title_text*.

    Titles usually sit in their own paragraph with the table immediately
    after, so this is a thin alias for find_table_after_paragraph.

    Args:
        doc: Document to search.
        title_text: title text to match (substring).
    Returns:
        The matching Table, or None.
    """
    return find_table_after_paragraph(doc, title_text)
def remove_paragraph(paragraph: Paragraph) -> None:
    """Safely detach *paragraph* from the document tree (no-op if it has
    no parent element)."""
    element = paragraph._element
    holder = element.getparent()
    if holder is None:
        return
    holder.remove(element)
def remove_element(element) -> None:
    """Detach a low-level docx XML element from its parent, if any."""
    holder = element.getparent()
    if holder is not None:
        holder.remove(element)
def remove_highly_cited_section(doc: Document) -> bool:
    """Delete the "ESI 高被引论文收录概况" paragraph plus its trailing table,
    and blank out stray "ESI 高被引论文" mentions elsewhere in the body.

    Returns:
        bool: True when the section paragraph was found and removed.
    """
    section_titles = ["ESI 高被引论文收录概况", "ESI高被引论文收录概况"]
    mention_texts = ["ESI 高被引论文", "ESI高被引论文"]
    found_section = False
    removed_table = False
    # Remove the section heading and the table immediately following it.
    for para in list(doc.paragraphs):
        if not any(t in para.text for t in section_titles):
            continue
        found_section = True
        sibling = para._element.getnext()
        if sibling is not None and sibling.tag.endswith("tbl"):
            remove_element(sibling)
            removed_table = True
        remove_paragraph(para)
        break
    # Blank remaining mentions run-by-run so character formatting survives.
    # NOTE: a mention split across two runs is missed; in practice the
    # phrase sits inside a single run.
    cleaned_count = 0
    for para in list(doc.paragraphs):
        if not any(t in para.text for t in mention_texts):
            continue
        for run in para.runs:
            for t in mention_texts:
                if t in run.text:
                    run.text = run.text.replace(t, "")
                    cleaned_count += 1
    if not found_section:
        logger.debug(" │ └─ 未找到目标段落")
        return False
    result_parts = ["已删除段落"]
    if removed_table:
        result_parts.append("及表格")
    if cleaned_count > 0:
        result_parts.append(f",清理了 {cleaned_count} 处提示文字")
    logger.debug(" │ └─ %s", "".join(result_parts))
    return True
def update_report_number(doc: Document, report_no: str) -> bool:
    """Replace the report-number line with "报告编号:{report_no}".

    Only the first three paragraphs are scanned (the number sits at the top
    of the document) to avoid false matches deeper in the body; the rewritten
    line is set in bold 14pt Times New Roman.

    Returns:
        bool: True when the line was found and rewritten.
    """
    for para in doc.paragraphs[:3]:
        if "报告编号" not in para.text:
            continue
        para.text = f"报告编号:{report_no}"
        for run in para.runs:
            run.font.name = "Times New Roman"
            run.bold = True
            run.font.size = Pt(14)  # 14pt == Chinese "四号" size
        logger.debug(" │ └─ 报告编号更新为:%s", report_no)
        return True
    logger.warning(" │ └─ 未在文档开头找到'报告编号',期望编号:%s", report_no)
    return False
def fill_table(doc: Document, data: List[List[str]]) -> None:
    """Fill the table after the "ESI学科表现" paragraph with the transposed
    ESI block.

    Rows (top to bottom): ESI学科, 论文篇数, CNCI, Top10%论文比例; the second
    column (index 1, the "Total" column) is rendered bold.

    Args:
        doc: target document.
        data: 4 rows x N columns, as produced by read_csv_block.
    Raises:
        ValueError: when the target table cannot be located.
    """
    table = find_table_after_paragraph(doc, "ESI学科表现")
    if table is None:
        raise ValueError("文档中找不到'ESI学科表现'段落后的表格。")
    logger.debug(" │ └─ 填充表格,列数:%d", len(data[0]) if data else 0)
    expected_rows = ["ESI学科", "论文篇数", "CNCI", "Top10%论文比例"]
    # Target column count = data columns + 1 (first column holds row labels).
    data_cols = max((len(row) for row in data), default=0)
    desired_cols = data_cols + 1
    current_cols = len(table.columns)
    # Remember the first two columns' widths before mutating the table.
    saved_col0_w = table.columns[0].width if current_cols >= 1 else None
    saved_col1_w = table.columns[1].width if current_cols >= 2 else None
    # Grow to the desired column count, or shrink by trimming from the right.
    if current_cols < desired_cols:
        # New columns copy the last column's width, defaulting to 1.2".
        last_width = table.columns[-1].width if current_cols else Inches(1.2)
        for _ in range(desired_cols - current_cols):
            table.add_column(int(last_width) if last_width else int(Inches(1.2)))
        logger.info("表格列不足,已补齐至 %d 列", desired_cols)
    elif current_cols > desired_cols:
        grid = table._tbl.tblGrid
        grid_cols = list(grid.iterchildren()) if grid is not None else []
        for _ in range(current_cols - desired_cols):
            remove_idx = len(table.columns) - 1
            # Drop the matching <w:gridCol> definition from the table grid.
            if grid is not None and remove_idx < len(grid_cols):
                grid.remove(grid_cols[remove_idx])
            # Drop the rightmost cell from every row.
            for row in table.rows:
                cells = row.cells
                if remove_idx < len(cells):
                    row._tr.remove(cells[remove_idx]._tc)
        logger.info("表格列过多,已裁剪至 %d 列", desired_cols)
    # Add rows when the template has fewer than the data needs.
    if len(table.rows) < len(data):
        for _ in range(len(data) - len(table.rows)):
            table.add_row()
    # Row labels go in the first column, bold to stand out from the data.
    for row_idx, row_name in enumerate(expected_rows):
        if row_idx < len(table.rows):
            set_cell(table.cell(row_idx, 0), row_name, bold=True)
    # data: 4 rows x N columns.
    for row_idx, row_data in enumerate(data):
        for col_offset, value in enumerate(row_data, start=1):
            if col_offset >= len(table.columns):
                break
            bold = col_offset == 1  # second column ("Total") in bold
            set_cell(table.cell(row_idx, col_offset), value, bold=bold)
    # Column widths: pin the first two columns to their original widths and
    # split the remaining usable page width evenly across the rest.
    if len(table.columns) >= 3 and doc.sections:
        section = doc.sections[0]
        usable_width = section.page_width - section.left_margin - section.right_margin
        # Widths for the first two columns (with fallbacks).
        col0_w = int(saved_col0_w) if saved_col0_w else int(Inches(1.2))
        col1_w = int(saved_col1_w) if saved_col1_w else int(Inches(1.5))
        # Remaining columns share the leftover width with equal weight.
        remain_cols = len(table.columns) - 2
        weights = [1, 1] + [1] * remain_cols  # first two weights are placeholders; pinned below
        # Pin the first two columns.
        fixed_widths = {0: col0_w, 1: col1_w}
        # Delegate the actual width assignment.
        autosize_table_columns(table, target_width=usable_width,
                               weights=weights, fixed_widths=fixed_widths)
    logger.debug(" │ └─ 表格填充完成")
def normalize_title(title: str) -> str:
    """Normalize a title for fuzzy comparison: lowercase with every
    non-word character (punctuation, whitespace) removed."""
    if not title:
        return ""
    return re.sub(r'[^\w]', '', title.lower())
def fuzzy_match_title(query_title: str, title_list: List[str], threshold: float = 0.7) -> Optional[int]:
    """Fuzzy-match *query_title* against *title_list*.

    An exact normalized match wins immediately; otherwise containment
    matches are scored by len(shorter)/len(longer), and the best score at
    or above *threshold* decides.

    Returns:
        The index of the best match, or None when nothing qualifies.
    """
    if not query_title:
        return None
    query_norm = normalize_title(query_title)
    if not query_norm:
        return None
    best_idx: Optional[int] = None
    best_score = 0.0
    for idx, candidate in enumerate(title_list):
        if not candidate:
            continue
        cand_norm = normalize_title(str(candidate))
        if not cand_norm:
            continue
        if cand_norm == query_norm:
            return idx  # exact normalized match: stop searching
        if query_norm not in cand_norm and cand_norm not in query_norm:
            continue
        # Containment: score is the coverage of the shorter string.
        shorter, longer = sorted((len(query_norm), len(cand_norm)))
        score = shorter / longer if longer > 0 else 0
        if score >= threshold and score > best_score:
            best_score = score
            best_idx = idx
    return best_idx
def read_sustech_xlsx(file_path: Path) -> Dict[str, str]:
    """Read a SUSTECH record file (XLSX or CSV) and map 标题 (title) to
    收录号 (accession number).

    When a title appears more than once, accession numbers starting with
    "WOS:" take precedence over other formats (e.g. Scopus).

    Returns:
        {title: accession} dict; empty on any read failure.
    """
    try:
        if file_path.suffix.lower() == ".xlsx":
            df = pd.read_excel(file_path, engine="openpyxl")
        else:
            # anything else is assumed to be CSV
            df = pd.read_csv(file_path, encoding="utf-8-sig")
        title_col = '标题'
        accession_col = '收录号'
        if title_col not in df.columns or accession_col not in df.columns:
            logger.warning("文件缺少必要的列(标题、收录号): %s", file_path)
            return {}
        mapping: Dict[str, str] = {}
        for _, row in df.iterrows():
            title = str(row[title_col]).strip() if pd.notna(row[title_col]) else ""
            accession = str(row[accession_col]).strip() if pd.notna(row[accession_col]) else ""
            if not title or not accession:
                continue
            existing = mapping.get(title, "")
            prefer_new = not existing or (accession.startswith("WOS:") and not existing.startswith("WOS:"))
            if prefer_new:
                mapping[title] = accession
        file_type = "XLSX" if file_path.suffix.lower() == ".xlsx" else "CSV"
        logger.debug(" │ └─ 从 %s 读取标题-收录号映射 %d 条:%s", file_type, len(mapping), format_path(file_path))
        return mapping
    except Exception as e:
        logger.error(" │ └─ 读取文件失败: %s,错误:%s", format_path(file_path), e)
        return {}
def read_wos_mappings(csv_path: Path) -> tuple:
    """Read a WOS CSV/XLSX export and build two CNCI lookup maps in one pass.

    Returns:
        tuple: (accession_to_cnci, title_to_cnci) where accession_to_cnci
        maps 入藏号 -> formatted CNCI and title_to_cnci maps normalized
        论文标题 -> formatted CNCI (used as a fallback match). Both maps
        are empty when the file is missing required columns or unreadable.
    """
    try:
        df = _load_wos_df(csv_path)
        accession_col, title_col, cnci_col = '入藏号', '论文标题', '学科规范化的引文影响力'
        if accession_col not in df.columns or cnci_col not in df.columns:
            logger.warning("WOS 文件缺少必要列: %s", csv_path)
            return {}, {}
        acc_map: Dict[str, str] = {}
        title_map: Dict[str, str] = {}
        for _, row in df.iterrows():
            cnci = row[cnci_col] if pd.notna(row[cnci_col]) else None
            if cnci is None:
                continue
            try:
                # Normalize to two decimals via format_cnci; keep the raw
                # text when the value is not numeric.
                cnci_str = format_cnci(str(float(cnci)))
            except (ValueError, TypeError):
                cnci_str = str(cnci)
            accession = str(row[accession_col]).strip() if pd.notna(row[accession_col]) else ""
            if accession:
                acc_map[accession] = cnci_str
            if title_col in df.columns:
                title = str(row[title_col]).strip() if pd.notna(row[title_col]) else ""
                if title:
                    title_map[normalize_title(title)] = cnci_str
        logger.debug(" │ └─ WOS 映射:入藏号 %d 条,标题 %d 条:%s",
                     len(acc_map), len(title_map), format_path(csv_path))
        return acc_map, title_map
    except Exception as e:
        logger.error(" │ └─ 读取 WOS 文件失败: %s,错误:%s", format_path(csv_path), e)
        return {}, {}
def find_paper_table(doc: Document) -> Optional[Table]:
    """Locate the paper-retrieval statistics table ("论文检索情况统计表").

    Tries the titled-paragraph lookup first; when that fails, falls back to
    scanning every table for a header row that contains both 文献标题 and
    CNCI columns.

    Returns:
        The matching Table, or None.
    """
    by_title = find_table_by_title(doc, "论文检索情况统计表")
    if by_title is not None:
        return by_title
    for candidate in doc.tables:
        if len(candidate.rows) == 0 or len(candidate.columns) == 0:
            continue
        # Inspect the header row for the expected column titles.
        header_texts = [cell.text.strip() for cell in candidate.rows[0].cells]
        if '文献标题' in header_texts and 'CNCI' in header_texts:
            return candidate
    return None
def autosize_table_columns(table: Table, target_width: Optional[float] = None,
                           weights: Optional[List[int]] = None,
                           fixed_widths: Optional[Dict[int, int]] = None,
                           min_width: Optional[float] = None) -> None:
    """Distribute column widths by weight, optionally pinning some columns.

    Args:
        table: table to resize.
        target_width: total width (EMU or a Length); default 7 inches.
        weights: per-column weights; when None they are derived from the
            header text (see strategy below).
        fixed_widths: {column index: width in EMU} for pinned columns.
        min_width: minimum column width (EMU or Length), default ~0.3".

    Width strategy when *weights* is None:
        - column 0 (序号/index): narrow (weight 1)
        - 文献标题 (title) column: widest (weight 6)
        - 文献来源 (source) column: second widest (weight 4)
        - all other columns: equal (weight 2)
    """
    if not table.columns:
        return
    num_cols = len(table.columns)
    if num_cols == 0:
        return
    # Disable Word's autofit so explicit widths take effect.
    table.allow_autofit = False
    # Resolve the total width to distribute.
    if target_width is None:
        # default total width
        target_width = Inches(7.0)
    target_width_int = int(target_width)
    # Resolve the minimum column width.
    if min_width is None:
        min_width = Inches(0.3)
    min_width_int = int(min_width)
    # Derive weights from the header when the caller did not supply any.
    if weights is None:
        # Read the header row to classify each column.
        header_row = table.rows[0]
        header_texts = [cell.text.strip() for cell in header_row.cells]
        # Assign weights by column title.
        weights = []
        for idx, header in enumerate(header_texts):
            if idx == 0 or '序号' in header:
                weights.append(1)
            elif '文献标题' in header or '标题' in header:
                weights.append(6)
            elif '文献来源' in header or '来源' in header:
                weights.append(4)
            else:
                weights.append(2)
        # No recognizable headers: fall back to the default 1/6/4 pattern.
        if num_cols >= 3 and all(w == 2 for w in weights):
            weights[0] = 1
            weights[1] = 6
            weights[2] = 4
    else:
        # Pad or truncate the supplied weights to match the column count.
        if len(weights) < num_cols:
            weights.extend([2] * (num_cols - len(weights)))
        elif len(weights) > num_cols:
            weights = weights[:num_cols]
    # Subtract pinned widths from the distributable total.
    fixed_widths = fixed_widths or {}
    fixed_total = sum(fixed_widths.values())
    remaining_width = max(0, target_width_int - fixed_total)
    # Columns that share the remaining width by weight.
    flexible_cols = [i for i in range(num_cols) if i not in fixed_widths]
    if flexible_cols:
        flexible_weights = [weights[i] for i in flexible_cols]
        total_weight = sum(flexible_weights)
        # Distribute the remaining width proportionally to the weights.
        col_widths = [0] * num_cols
        for idx in flexible_cols:
            weight = weights[idx]
            width = int((weight / total_weight) * remaining_width) if total_weight > 0 else remaining_width // len(flexible_cols)
            width = max(width, min_width_int)
            col_widths[idx] = width
        # Fold the integer-rounding error into the heaviest column.
        current_flexible_total = sum(col_widths[i] for i in flexible_cols)
        diff = remaining_width - current_flexible_total
        if diff != 0 and flexible_cols:
            # add the residue to the column with the largest weight
            max_weight_idx = max(flexible_cols, key=lambda i: weights[i])
            col_widths[max_weight_idx] += diff
    else:
        col_widths = [0] * num_cols
    # Apply pinned widths last so they win over weighted ones.
    for col_idx, width in fixed_widths.items():
        if 0 <= col_idx < num_cols:
            col_widths[col_idx] = int(width)
    # Write widths to both columns and their cells (Word honors cell widths).
    for col_idx, column in enumerate(table.columns):
        width = col_widths[col_idx]
        column.width = width
        for cell in column.cells:
            cell.width = width
    logger.debug(" │ └─ 自动调整列宽完成,列数:%d,总宽:%d EMU,权重:%s,固定宽度:%s",
                 num_cols, sum(col_widths), weights, fixed_widths)
def fill_cnci_column(doc: Document, record_path: Path, wos_path: Path) -> tuple[int, int]:
    """Fill the CNCI column of the paper-retrieval statistics table.

    Each row's 文献标题 (title) is resolved to an accession number via the
    SUSTECH record file (exact match first, then fuzzy), then the accession
    number is resolved to a CNCI via the WOS export. Unmatched rows get "/".

    Args:
        doc: document containing the table.
        record_path: SUSTECH record XLSX/CSV (title -> accession).
        wos_path: WOS export (accession -> CNCI; title -> CNCI fallback).
    Returns:
        tuple[int, int]: (matched row count, total data rows).
    """
    # Locate the statistics table.
    table = find_paper_table(doc)
    if not table:
        logger.warning(" │ └─ ✗ 未找到论文检索情况统计表")
        return (0, 0)
    # Find the 文献标题 and CNCI column indices from the header row.
    header_row = table.rows[0]
    header_texts = [cell.text.strip() for cell in header_row.cells]
    try:
        title_col_idx = header_texts.index('文献标题')
        cnci_col_idx = header_texts.index('CNCI')
    except ValueError:
        logger.warning(" │ └─ ✗ 表格缺少'文献标题'或'CNCI'列")
        return (0, 0)
    # title -> accession from the SUSTECH record file.
    title_to_accession = read_sustech_xlsx(record_path)
    if not title_to_accession:
        logger.warning(" │ └─ ✗ 无法读取标题-收录号映射")
        return (0, 0)
    accession_to_cnci, wos_title_to_cnci = read_wos_mappings(wos_path)
    if not accession_to_cnci:
        logger.warning(" │ └─ ✗ 无法读取入藏号-CNCI 映射")
        return (0, 0)
    # Titles available for fuzzy matching.
    xlsx_titles = list(title_to_accession.keys())
    # Walk the data rows and fill the CNCI cells.
    matched_count = 0
    total_rows = len(table.rows) - 1  # header row excluded
    for row_idx in range(1, len(table.rows)):  # skip the header row
        row = table.rows[row_idx]
        if title_col_idx >= len(row.cells) or cnci_col_idx >= len(row.cells):
            continue
        title_cell = row.cells[title_col_idx]
        cnci_cell = row.cells[cnci_col_idx]
        title = title_cell.text.strip()
        if not title:
            continue
        # Exact title match first...
        accession = title_to_accession.get(title)
        # ...then fuzzy match on normalized titles.
        if not accession:
            match_idx = fuzzy_match_title(title, xlsx_titles)
            if match_idx is not None:
                matched_title = xlsx_titles[match_idx]
                accession = title_to_accession.get(matched_title)
        # Resolve accession -> CNCI; "/" marks an unmatched row.
        cnci_value = "/"
        if accession:
            cnci_value = accession_to_cnci.get(accession, "/")
        # Fallback: match the row title directly against WOS titles.
        if cnci_value == "/" and wos_title_to_cnci:
            title_norm = normalize_title(title)
            cnci_value = wos_title_to_cnci.get(title_norm, "/")
        # Write the CNCI cell.
        set_cell(cnci_cell, cnci_value)
        if cnci_value != "/":
            matched_count += 1
    autosize_table_columns(table)
    logger.debug(" │ └─ CNCI 列填充完成,成功匹配:%d/%d 条", matched_count, total_rows)
    return (matched_count, total_rows)
def read_savedrecs_stats(xlsx_path: Path) -> Dict[str, Any]:
    """Extract citation statistics from savedrecs.xlsx by scanning column A
    for marker labels and reading the adjacent value cells.

    The workbook is parsed directly with zipfile + ElementTree rather than
    pandas (avoids a pandas crash observed on these files).

    Returns a dict with any of:
        - paper_count: "Results found" (column B)
        - total_citations: "Sum of the Times Cited" (column B)
        - other_citations: "Sum of the Times Cited" (column C)
        - avg_citations: "Average Citations per Item" (column B)
        - h_index: "h-index" (column B)
    """
    logger.debug(" │ └─ 读取 savedrecs.xlsx 统计数据:%s", format_path(xlsx_path))
    try:
        # Parse the xlsx container directly (zipfile + XML).
        with zipfile.ZipFile(xlsx_path) as z:
            # Shared-string table; cell type "s" indexes into this list.
            shared_strings = []
            try:
                sst_xml = z.read('xl/sharedStrings.xml')
                sst_root = ET.fromstring(sst_xml)
                ns = {'a': 'http://schemas.openxmlformats.org/spreadsheetml/2006/main'}
                for si in sst_root.findall('.//a:si', ns):
                    texts = []
                    for t in si.findall('.//a:t', ns):
                        texts.append(t.text or '')
                    shared_strings.append(''.join(texts))
            except KeyError:
                pass  # no shared-string part: the sheet uses inline strings
            # First worksheet only.
            sheet_xml = z.read('xl/worksheets/sheet1.xml')
            sheet_root = ET.fromstring(sheet_xml)
            ns = {'a': 'http://schemas.openxmlformats.org/spreadsheetml/2006/main'}

            def get_cell_value(cell_elem):
                """Resolve one <c> element to its text/number value."""
                t = cell_elem.get('t')
                v_elem = cell_elem.find('a:v', ns)
                if v_elem is None:
                    return None
                val = v_elem.text
                if t == 's':  # shared-string reference
                    try:
                        idx = int(val)
                        return shared_strings[idx] if idx < len(shared_strings) else None
                    except (ValueError, IndexError):
                        return None
                return val

            # Map cell references to values (e.g. "A1" -> "Results found").
            cell_map = {}
            for row in sheet_root.findall('.//a:sheetData/a:row', ns):
                for cell in row.findall('a:c', ns):
                    cell_ref = cell.get('r', '')
                    if cell_ref:
                        cell_map[cell_ref] = get_cell_value(cell)
            # Scan for the marker labels and pull the adjacent values.
            stats = {}
            # "Results found" (column A) -> paper_count from column B.
            for ref, val in cell_map.items():
                if val and isinstance(val, str) and 'Results found' in val:
                    # same row, column B
                    row_num = ''.join([c for c in ref if c.isdigit()])
                    col_letter = ''.join([c for c in ref if c.isalpha()])
                    if col_letter == 'A' and row_num:
                        b_ref = 'B' + row_num
                        if b_ref in cell_map:
                            try:
                                stats['paper_count'] = int(float(cell_map[b_ref]))
                            except (ValueError, TypeError):
                                pass
            # "Sum of the Times Cited" -> total (B) and other (C) citations.
            for ref, val in cell_map.items():
                if val and isinstance(val, str) and 'Sum of the Times Cited' in val:
                    row_num = ''.join([c for c in ref if c.isdigit()])
                    if row_num:
                        b_ref = 'B' + row_num
                        c_ref = 'C' + row_num
                        if b_ref in cell_map:
                            try:
                                stats['total_citations'] = int(float(cell_map[b_ref]))
                            except (ValueError, TypeError):
                                pass
                        if c_ref in cell_map:
                            try:
                                stats['other_citations'] = int(float(cell_map[c_ref]))
                            except (ValueError, TypeError):
                                pass
            # "Average Citations per Item" -> avg_citations from column B.
            for ref, val in cell_map.items():
                if val and isinstance(val, str) and 'Average Citations per Item' in val:
                    row_num = ''.join([c for c in ref if c.isdigit()])
                    if row_num:
                        b_ref = 'B' + row_num
                        if b_ref in cell_map:
                            try:
                                stats['avg_citations'] = float(cell_map[b_ref])
                            except (ValueError, TypeError):
                                pass
            # "h-index" -> h_index from column B.
            for ref, val in cell_map.items():
                if val and isinstance(val, str) and 'h-index' in val:
                    row_num = ''.join([c for c in ref if c.isdigit()])
                    if row_num:
                        b_ref = 'B' + row_num
                        if b_ref in cell_map:
                            try:
                                stats['h_index'] = int(float(cell_map[b_ref]))
                            except (ValueError, TypeError):
                                pass
            logger.debug(" │ └─ 统计数据读取完成:%s", stats)
            return stats
    except Exception as e:
        logger.error(" │ └─ 读取失败,错误:%s", e)
        return {}
def fill_scie_table(doc: Document, stats: Dict[str, Any]) -> None:
    """Fill the data row of the "SCIE收录引用概况" table (falling back to
    "SSCI收录引用概况" when the SCIE heading is absent).

    Expected table layout:
        - row 1: 论文篇数 | 引用情况 (spans 4) | JCR分区 (spans 2) |
          作者类型 (spans 3) | 高被引论文篇数
        - row 2: 论文篇数 | 总被引次数 | 他引次数 | 平均被引次数 | H指数 |
          Q1篇数 | Q2篇数 | 第一篇数 | 共同第一篇数 | 通讯篇数 | 高被引论文篇数
        - last row: the data row filled here.

    Only the first five cells are filled from *stats* (savedrecs.xlsx data);
    Q1/Q2, authorship and highly-cited counts come from other sources and
    are left untouched.
    """
    # Locate the table, preferring the SCIE heading.
    table = find_table_after_paragraph(doc, "SCIE收录引用概况")
    table_label = "SCIE收录引用概况"
    if table is None:
        table = find_table_after_paragraph(doc, "SSCI收录引用概况")
        table_label = "SSCI收录引用概况" if table is not None else table_label
    if table is None:
        logger.warning("未找到 'SCIE/SSCI 收录引用概况' 表格,跳过填充")
        return
    # Need header, sub-header and data rows at minimum.
    if len(table.rows) < 3:
        logger.warning("表格行数不足,需要至少3行")
        return
    # The last row is the data row.
    data_row = table.rows[-1]
    # Fill by column index.
    # column 0: paper count
    if 'paper_count' in stats:
        set_cell(data_row.cells[0], str(stats['paper_count']))
    # column 1: total citations
    if 'total_citations' in stats:
        set_cell(data_row.cells[1], str(stats['total_citations']))
    # column 2: citations excluding self-citations
    if 'other_citations' in stats:
        set_cell(data_row.cells[2], str(stats['other_citations']))
    # column 3: average citations per item (two decimals)
    if 'avg_citations' in stats:
        avg_str = f"{stats['avg_citations']:.2f}"
        set_cell(data_row.cells[3], avg_str)
    # column 4: H-index
    if 'h_index' in stats:
        set_cell(data_row.cells[4], str(stats['h_index']))
    # NOTE: Q1/Q2 counts, authorship counts and highly-cited counts must
    # come from other sources; only savedrecs.xlsx-derived values are set.
    stats_summary = []
    if 'paper_count' in stats:
        stats_summary.append(f"论文: {stats['paper_count']}")
    if 'total_citations' in stats:
        stats_summary.append(f"总被引: {stats['total_citations']}")
    if 'h_index' in stats:
        stats_summary.append(f"H指数: {stats['h_index']}")
    logger.debug(" │ └─ 填充 %s 完成 (%s)", table_label, ", ".join(stats_summary))
def process_folder(folder: Path, index: int = 0, total: int = 0) -> None:
    """Process one folder and generate its academic-output report docx.

    Args:
        folder: directory holding the SUSTECH template docx plus data files
            (Incites CSVs, record XLSX, savedrecs.xlsx).
        index: 1-based position in the batch, used only for progress logging.
        total: batch size; when 0 the "[i/N]" progress prefix is omitted.
    """
    global _current_folder
    _current_folder = folder  # lets format_path() print paths relative to this folder
    # Progress banner
    if total > 0:
        logger.info("[%d/%d] 开始处理:%s", index, total, folder.name)
    else:
        logger.info("开始处理:%s", folder.name)
    docx_files = sorted(folder.glob("SUSTECH*.docx"))
    if not docx_files:
        # Fallback: any .docx that isn't the output file (named after the folder)
        fallback = sorted(f for f in folder.glob("*.docx") if f.stem != folder.name)
        if fallback:
            logger.info(" ├─ 未找到 SUSTECH*.docx,使用:%s", fallback[0].name)
            docx_files = fallback
        else:
            logger.warning(" └─ 未找到模板 docx,跳过")
            return
    # Step 1: copy the template to the target docx named after the folder
    src_docx = docx_files[0]
    dst_docx = folder / f"{folder.name}.docx"
    shutil.copy(src_docx, dst_docx)
    logger.info(" ├─ 复制模板:%s → %s", format_path(src_docx), format_path(dst_docx))
    doc = Document(dst_docx)
    # Step 2: remove the "ESI 高被引论文收录概况" paragraph and its table
    logger.info(" ├─ 移除 ESI 高被引论文收录概况")
    removed = remove_highly_cited_section(doc)
    if removed:
        logger.info(" │ └─ ✓ 已删除")
    else:
        logger.warning(" │ └─ ✗ 未找到目标段落")
    # Step 3: update the report number from the folder name
    # (folder format is "year-number-name"; keep the first two segments, e.g. C2026-0003)
    name_parts = folder.name.split("-")
    report_no = f"{name_parts[0]}-{name_parts[1]}" if len(name_parts) >= 2 else folder.name
    logger.info(" ├─ 更新报告编号")
    updated = update_report_number(doc, report_no)
    if not updated:
        logger.warning(" │ └─ ✗ 未找到报告编号段落")
    # Step 4: fill the ESI subject-performance table
    csv_path = find_csv_or_xlsx(folder / "Incites 研究领域.csv")
    logger.info(" ├─ 填充 ESI 学科表现表")
    if not csv_path.exists():
        # Fuzzy match: filename contains "incites"/"研究"; exclude the
        # timeseries variants ending in "t" (both csv and xlsx accepted)
        alt = find_file_by_keywords(folder, ["incites", "研究"], exclude=["t.csv", "t.xlsx", "t "])
        if alt:
            logger.info(" │ └─ 使用模糊匹配文件:%s", format_path(alt))
            csv_path = alt
    if csv_path.exists():
        data = read_csv_block(csv_path)
        if data:
            fill_table(doc, data)
            logger.info(" │ └─ ✓ 填充完成 (%d 列)", len(data[0]) if data else 0)
        else:
            logger.warning(" │ └─ ✗ 文件数据为空")
    else:
        logger.warning(" │ └─ ✗ 未找到 ESI 学科文件")
    # Step 5: build the per-year output chart and insert it into the document
    timeseries_csv = find_csv_or_xlsx(folder / "Incites 研究领域t.csv")
    logger.info(" ├─ 生成历年产出图表")
    if not timeseries_csv.exists():
        # Fuzzy match: filename contains "incites" and "t" (csv and xlsx accepted)
        alt_ts = find_file_by_keywords(folder, ["incites", "t"])
        if alt_ts:
            logger.info(" │ └─ 使用模糊匹配文件:%s", format_path(alt_ts))
            timeseries_csv = alt_ts
    if timeseries_csv.exists():
        ts_data = read_timeseries(timeseries_csv)
        img_path = folder / "timeseries_chart.png"
        if build_chart(ts_data, img_path):
            insert_chart(doc, img_path)
            logger.info(" │ └─ ✓ 图表生成并插入完成")
        else:
            logger.warning(" │ └─ ✗ 历年数据为空,未生成图表")
    else:
        logger.warning(" │ └─ ✗ 缺少历年产出 CSV")
    # Step 6: fill the CNCI column of the paper-retrieval statistics table
    record_files = sorted(folder.glob("SUSTECH*.xlsx"))
    logger.info(" ├─ 填充 CNCI 列")
    if not record_files:
        logger.warning(" │ └─ ✗ 未找到记录 XLSX")
    else:
        record_path = record_files[0]
        wos_path = find_csv_or_xlsx(folder / "Web of Science Documents.csv")
        if record_path.exists() and wos_path.exists():
            matched_count, total_rows = fill_cnci_column(doc, record_path, wos_path)
            if matched_count > 0:
                logger.info(" │ └─ ✓ 填充完成 (匹配: %d/%d)", matched_count, total_rows)
            else:
                logger.warning(" │ └─ ✗ 未匹配到任何数据")
        else:
            logger.warning(" │ └─ ✗ 缺少记录 XLSX 或 Web of Science Documents.csv")
    # Step 7: fill the SCIE/SSCI citation-overview table from savedrecs.xlsx
    savedrecs_xlsx = folder / "savedrecs.xlsx"
    logger.info(" ├─ 填充 SCIE/SSCI 收录引用概况")
    if savedrecs_xlsx.exists():
        stats = read_savedrecs_stats(savedrecs_xlsx)
        if stats:
            fill_scie_table(doc, stats)
            stats_summary = []
            if 'paper_count' in stats:
                stats_summary.append(f"论文: {stats['paper_count']}")
            if 'total_citations' in stats:
                stats_summary.append(f"总被引: {stats['total_citations']}")
            if 'h_index' in stats:
                stats_summary.append(f"H指数: {stats['h_index']}")
            logger.info(" │ └─ ✓ 填充完成 (%s)", ", ".join(stats_summary))
        else:
            logger.warning(" │ └─ ✗ 无法读取统计数据")
    else:
        logger.warning(" │ └─ ✗ 未找到 savedrecs.xlsx")
    # python-docx bug: next_id() only scans document.xml, causing wp:docPr id
    # collisions with header/footer shapes → Word "unreadable content" error.
    # Workaround: renumber every wp:docPr id across the document and all
    # related parts so ids are globally unique before saving.
    counter = 1
    parts_to_fix = [doc.part] + list(doc.part.related_parts.values())
    for part in parts_to_fix:
        if hasattr(part, '_element'):
            for docPr in part._element.iter(qn('wp:docPr')):
                docPr.set('id', str(counter))
                counter += 1
    doc.save(dst_docx)
    logger.info(" └─ ✓ 处理完成:%s", format_path(dst_docx))
def main():
    """Batch entry point: scan sibling folders starting with "C" and process each."""
    configure_logging()
    logger.info("开始批量处理学术产出报告")
    base = Path(__file__).resolve().parent
    # Collect candidate folders up front so the progress counter is known.
    folders = []
    for entry in sorted(base.iterdir()):
        if entry.is_dir() and entry.name.startswith("C"):
            folders.append(entry)
    count = len(folders)
    if not folders:
        logger.warning("未找到需要处理的文件夹(以 C 开头)")
        return
    logger.info("找到 %d 个文件夹需要处理\n", count)
    # Process each folder, separating entries with a blank log line.
    for position, target in enumerate(folders, start=1):
        process_folder(target, index=position, total=count)
        if position != count:
            logger.info("")  # blank separator line (skipped after the last folder)
    logger.info("批量处理完成,共处理 %d 个文件夹", count)


if __name__ == "__main__":
    main()