#!/usr/bin/env python3
"""Title-search fallback: recover OpenAlex topics for papers the arXiv-DOI lookup missed.

- Older papers go first (OpenAlex is more likely to have indexed them, so the
  quota is spent more efficiently).
- A result only counts as a hit when the title Jaccard similarity is >= 0.7,
  to avoid mismatches.
- Hits are merged into ``matched`` in openalex_topics_map.json; every searched
  id is recorded in ``title_tried`` (each id is searched at most once).
- The budget is derived from the quota remaining today; afterwards re-merge
  with ``enrich_topics.py --merge-only``.

The script can be resumed across days: run it again after the daily quota
resets.
"""
import json
import os
import re
import sys
import time
import threading
import urllib.error
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed

SRC = "arxiv_cs_2022_2026.topics.jsonl"
MAP = "openalex_topics_map.json"
MAILTO = "elfsong@outlook.sg"
WORKERS = 6
SIM_TH = 0.7
BUDGET_BUFFER = 300  # keep some quota in reserve so we do not run into a hard 429

_lock = threading.Lock()


def norm(s):
    """Lower-case *s* and collapse every run of non-alphanumerics to one space.

    ``None`` (or any falsy value) is treated as the empty string.
    """
    return re.sub(r"[^a-z0-9]+", " ", (s or "").lower()).strip()


def jacc(a, b):
    """Return the Jaccard similarity of the normalized word sets of *a* and *b*.

    Returns 0.0 when both word sets are empty.
    """
    sa, sb = set(norm(a).split()), set(norm(b).split())
    union = sa | sb
    return len(sa & sb) / len(union) if union else 0.0


def quota_remaining():
    """Return the ``x-ratelimit-remaining`` header value of a probe request.

    A missing header is reported as 0. Network/HTTP errors propagate to the
    caller.
    """
    url = ("https://api.openalex.org/works?filter=doi:10.48550/arxiv.2310.06825"
           f"&select=id&mailto={MAILTO}")
    req = urllib.request.Request(url, headers={"User-Agent": MAILTO})
    with urllib.request.urlopen(req, timeout=30) as r:
        return int(r.headers.get("x-ratelimit-remaining", "0"))


def load_map():
    """Load ``(matched, title_tried)`` from MAP.

    Returns an empty dict and an empty set when the file does not exist or
    the respective key is absent.
    """
    if not os.path.exists(MAP):
        return {}, set()
    with open(MAP, encoding="utf-8") as fh:
        d = json.load(fh)
    return d.get("matched", {}), set(d.get("title_tried", []))


def save_map(matched, tried):
    """Write *matched* and *tried* to MAP via a temp file plus ``os.replace``.

    NOTE(review): only the ``matched`` and ``title_tried`` keys are written;
    any other top-level key already present in the file is dropped. Confirm
    that no other tool stores additional keys there.
    """
    tmp = MAP + ".tmp"
    # Close (and therefore flush) the temp file before it replaces the map.
    with open(tmp, "w", encoding="utf-8") as fh:
        json.dump({"matched": matched, "title_tried": sorted(tried)}, fh)
    os.replace(tmp, MAP)


def search_one(aid, title):
    """Search OpenAlex for *title* and validate the top result.

    Returns ``('hit', topic_obj)`` / ``('nohit', None)`` / ``('quota', None)``.
    ``topic_obj`` has the keys ``primary`` and ``topics``. HTTP 429 yields
    ``'quota'`` immediately; any other failure is retried (4 attempts in
    total, with a growing pause) and finally reported as ``'nohit'``.
    """
    # NOTE(review): titles containing ',' or '|' may be interpreted as filter
    # separators by the API - confirm against the OpenAlex filter syntax.
    q = urllib.parse.urlencode({
        "filter": "title.search:" + title,
        "select": "id,title,primary_topic,topics",
        "per-page": 1,
        "mailto": MAILTO,
    })
    url = "https://api.openalex.org/works?" + q
    attempts = 4
    for attempt in range(attempts):
        try:
            req = urllib.request.Request(url, headers={"User-Agent": MAILTO})
            with urllib.request.urlopen(req, timeout=40) as resp:
                res = json.load(resp).get("results", [])
            if res and jacc(title, res[0].get("title")) >= SIM_TH:
                w = res[0]
                return "hit", {
                    "primary": (w.get("primary_topic") or {}).get("display_name"),
                    "topics": [t["display_name"] for t in (w.get("topics") or [])],
                }
            return "nohit", None
        except urllib.error.HTTPError as e:
            if e.code == 429:
                return "quota", None
        except Exception:
            # Best effort: any other failure is treated as transient.
            pass
        # Back off before the next attempt; no point sleeping after the last.
        if attempt < attempts - 1:
            time.sleep(3 * (attempt + 1))
    return "nohit", None


def main():
    """Run the title-search fallback over every unmatched, untried paper."""
    matched, tried = load_map()
    with open(SRC, encoding="utf-8") as fh:
        rows = [json.loads(line) for line in fh if line.strip()]
    targets = [(d["year"], d["id"], d["title"]) for d in rows
               if d.get("openalex_primary_topic") is None and d["id"] not in tried]
    targets.sort(key=lambda x: (x[0], x[1]))  # older papers first

    rem = quota_remaining()
    budget = max(0, rem - BUDGET_BUFFER)
    todo = targets[:budget]
    print(f"unmatched&untried={len(targets)} | quota_remaining={rem} | "
          f"budget={budget} | running={len(todo)}", flush=True)
    if not todo:
        return

    done = hits = 0
    stop = threading.Event()
    with ThreadPoolExecutor(max_workers=WORKERS) as ex:
        futs = {ex.submit(search_one, aid, title): aid for _, aid, title in todo}
        for fut in as_completed(futs):
            if fut.cancelled():
                # Dropped after the quota ran out; stays untried for next run.
                continue
            aid = futs[fut]
            status, obj = fut.result()
            if status == "quota":
                if not stop.is_set():
                    stop.set()
                    # Leaving the executor context waits for every queued
                    # task, so cancel them explicitly instead of sending
                    # the rest of the batch against an exhausted quota.
                    for pending in futs:
                        pending.cancel()
                # Keep draining: requests already in flight may still
                # deliver results that should be recorded.
                continue
            with _lock:
                tried.add(aid)
                if status == "hit":
                    matched[aid] = obj
                    hits += 1
                done += 1
            if done % 100 == 0:
                save_map(matched, tried)
                print(f" {done}/{len(todo)} | new_hits={hits} "
                      f"matched_total={len(matched)}", flush=True)

    save_map(matched, tried)
    print(f"FALLBACK DONE searched={done} new_hits={hits} "
          f"matched_total={len(matched)} quota_hit={stop.is_set()}", flush=True)


if __name__ == "__main__":
    main()