paper_lifecycle / topic_fallback.py
elfsong
Add quarterly topic hype-cycle pipeline and slider visualization
82f767a
Raw
History Blame Contribute Delete
4.55 kB
#!/usr/bin/env python3
"""标题搜索兜底:对 arXiv-DOI 未命中的论文,用 OpenAlex 标题搜索找回 topics。
- 老论文优先(OpenAlex 更可能已收录,配额效率高)。
- 标题验证 Jaccard>=0.7 才算命中,避免误配。
- 命中并入 openalex_topics_map.json 的 matched;每个 id 记入 title_tried(至多搜一次)。
- 按当日剩余配额设预算;跑完用 enrich_topics.py --merge-only 重新合并。
可多日续跑:每天额度重置后再跑一次即可。
"""
import json
import os
import re
import sys
import time
import threading
import urllib.error
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed
# --- files and OpenAlex contact -------------------------------------------
SRC: str = "arxiv_cs_2022_2026.topics.jsonl"  # input: one JSON record per paper per line
MAP: str = "openalex_topics_map.json"  # cache: {"matched": {...}, "title_tried": [...]}
MAILTO: str = "elfsong@outlook.sg"  # sent as the `mailto` param and as User-Agent

# --- tuning knobs ---------------------------------------------------------
WORKERS: int = 6  # number of concurrent title-search threads
SIM_TH: float = 0.7  # minimum title Jaccard similarity to accept a search hit
BUDGET_BUFFER: int = 300  # leave headroom in the daily quota so we never run into a hard 429

_lock = threading.Lock()  # serializes updates to the shared matched/tried state in main()
def norm(s):
    """Normalize text for word-level comparison.

    ``None`` is treated as an empty string. The text is lower-cased, and every
    maximal run of ASCII letters/digits becomes one word. All other characters
    act only as separators. Words are joined with single spaces, with no
    leading or trailing space.
    """
    lowered = (s or "").lower()
    words = re.findall(r"[a-z0-9]+", lowered)
    return " ".join(words)
def jacc(a, b):
    """Jaccard similarity between the word sets of two titles.

    Both inputs go through ``norm`` first. Returns |A ∩ B| / |A ∪ B| as a
    float, or 0.0 when neither title contains any words.
    """
    words_a = set(norm(a).split())
    words_b = set(norm(b).split())
    union = words_a | words_b
    if not union:
        return 0.0
    return len(words_a & words_b) / len(union)
def quota_remaining():
    """Return how many OpenAlex requests remain in the current quota window.

    Sends one cheap probe query (a single known arXiv DOI, ``select=id``) and
    reads the ``x-ratelimit-remaining`` response header. The probe itself
    presumably counts against the quota.

    Returns 0 instead of raising when:
    - the header is missing or not an integer;
    - the probe itself is answered with HTTP 429 (quota already exhausted).
    In both cases the caller computes an empty budget and exits cleanly
    rather than crashing with a traceback.

    Raises:
        urllib.error.HTTPError: for non-429 HTTP errors.
        urllib.error.URLError: on network failures.
    """
    url = ("https://api.openalex.org/works?filter=doi:10.48550/arxiv.2310.06825"
           f"&select=id&mailto={MAILTO}")
    req = urllib.request.Request(url, headers={"User-Agent": MAILTO})
    try:
        with urllib.request.urlopen(req, timeout=30) as r:
            raw = r.headers.get("x-ratelimit-remaining", "0")
    except urllib.error.HTTPError as e:
        if e.code == 429:
            return 0  # nothing left today; let the caller report budget=0
        raise
    try:
        return int(raw)
    except (TypeError, ValueError):
        return 0  # unparseable header: be conservative, spend nothing
def load_map(path=None):
    """Load the topic cache written by ``save_map``.

    Args:
        path: JSON file to read; defaults to the module-level ``MAP``.

    Returns:
        ``(matched, tried)``: the ``"matched"`` dict (paper id -> topic entry)
        and the ``"title_tried"`` ids as a set. Both are empty when the file
        does not exist yet (first run) or lacks those keys.
    """
    path = MAP if path is None else path
    if not os.path.exists(path):
        return {}, set()
    # Use a context manager so the handle is closed deterministically.
    with open(path, encoding="utf-8") as f:
        data = json.load(f)
    return data.get("matched", {}), set(data.get("title_tried", []))
def save_map(matched, tried, path=None):
    """Atomically persist the topic cache.

    Writes ``{"matched": matched, "title_tried": sorted(tried)}`` to a
    temporary sibling file, then ``os.replace``s it over the target. An
    interrupted write therefore never leaves a truncated cache behind.

    Args:
        matched: paper id -> topic entry mapping.
        tried: ids already searched by title (any iterable of sortable ids).
        path: destination file; defaults to the module-level ``MAP``.
    """
    path = MAP if path is None else path
    tmp = path + ".tmp"
    # Close (and fsync) the temp file *before* the rename. Only then is the
    # file that becomes ``path`` guaranteed to be complete on disk.
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump({"matched": matched, "title_tried": sorted(tried)}, f)
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp, path)
def search_one(aid, title):
    """Look up one paper on OpenAlex by title and return its topics.

    Queries ``/works`` with a ``title.search`` filter and looks only at the
    top result. That result is accepted only if its title has Jaccard
    similarity >= ``SIM_TH`` with *title*, which guards against mismatches.

    Args:
        aid: paper id (not used by this function).
        title: title to search for.

    Returns:
        ``(status, payload)``, where status is one of:
        - ``"hit"``: payload is ``{"primary": str | None, "topics": [str, ...]}``.
        - ``"nohit"``: a definitive answer, payload is None. Covers no
          similar-enough result, an empty title, or an HTTP 400
          (retrying the same request cannot succeed).
        - ``"quota"``: HTTP 429 (rate limit / quota exhausted); payload is None.
        - ``"error"``: every attempt failed with a transient problem (network,
          timeout, 5xx, malformed body); payload is None. Callers that only
          test for ``"hit"``/``"quota"`` treat this like ``"nohit"``.
    """
    # In OpenAlex filter syntax, commas separate filters, pipes separate
    # OR-alternatives and colons delimit attribute from value. None of these
    # may appear inside the value. Full-text title search ignores
    # punctuation anyway, so replace them and collapse the line breaks /
    # whitespace runs common in arXiv titles.
    query = " ".join(re.sub(r"[,|:]", " ", title or "").split())
    if not query:
        return "nohit", None
    q = urllib.parse.urlencode({
        "filter": "title.search:" + query,
        "select": "id,title,primary_topic,topics", "per-page": 1, "mailto": MAILTO})
    url = "https://api.openalex.org/works?" + q
    attempts = 4
    for attempt in range(attempts):
        try:
            req = urllib.request.Request(url, headers={"User-Agent": MAILTO})
            with urllib.request.urlopen(req, timeout=40) as resp:
                res = json.load(resp).get("results", [])
            if res and jacc(title, res[0].get("title")) >= SIM_TH:
                w = res[0]
                return "hit", {
                    "primary": (w.get("primary_topic") or {}).get("display_name"),
                    "topics": [t.get("display_name") for t in (w.get("topics") or [])
                               if t.get("display_name") is not None],
                }
            return "nohit", None
        except urllib.error.HTTPError as e:
            if e.code == 429:
                return "quota", None
            if e.code == 400:
                # Malformed request: it would fail identically on retry.
                return "nohit", None
            # Other HTTP errors (e.g. 5xx): fall through to back off and retry.
        except Exception:
            # Best effort: network errors, timeouts, truncated/malformed JSON.
            # Retry with a linear back-off below.
            pass
        if attempt < attempts - 1:  # no point sleeping after the last attempt
            time.sleep(3 * (attempt + 1))
    return "error", None
def _load_targets(tried):
    """Return ``(year, id, title)`` for papers still lacking an OpenAlex topic.

    Reads ``SRC`` (one JSON object per line). Keeps rows whose
    ``openalex_primary_topic`` is missing/None and whose id is not in *tried*.
    Results are sorted by (year, id), so older papers come first.
    """
    targets = []
    with open(SRC, encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank lines in the JSONL file
            d = json.loads(line)
            if d.get("openalex_primary_topic") is None and d["id"] not in tried:
                targets.append((d["year"], d["id"], d["title"]))
    # Older papers first: OpenAlex is more likely to have indexed them already,
    # so the limited quota yields more hits.
    targets.sort(key=lambda x: (x[0], x[1]))
    return targets


def main():
    """Run one quota-bounded pass of the title-search fallback.

    The pass searches up to (remaining quota - ``BUDGET_BUFFER``) untried
    papers that have no topic yet.
    - Hits are recorded in ``matched``.
    - Definitively answered ids are recorded in ``tried``.
    - The map is checkpointed every 100 results and once more at the end.

    On HTTP 429 all not-yet-started searches are cancelled. The ones already
    in flight are still recorded. Ids that hit the quota or failed transiently
    are left out of ``tried``, so a later run retries them.
    """
    matched, tried = load_map()
    targets = _load_targets(tried)
    rem = quota_remaining()
    budget = max(0, rem - BUDGET_BUFFER)
    todo = targets[:budget]
    print(f"unmatched&untried={len(targets)} | quota_remaining={rem} | "
          f"budget={budget} | running={len(todo)}", flush=True)
    if not todo:
        return
    done = hits = errors = 0
    stop = threading.Event()
    with ThreadPoolExecutor(max_workers=WORKERS) as ex:
        futs = {ex.submit(search_one, aid, title): aid for _, aid, title in todo}
        for fut in as_completed(futs):
            if fut.cancelled():
                continue  # dropped after the quota ran out; never sent
            aid = futs[fut]
            try:
                status, obj = fut.result()
            except Exception as e:  # one failing search must not lose the whole run
                print(f"  search failed for {aid}: {e!r}", file=sys.stderr, flush=True)
                status, obj = "error", None
            with _lock:
                if status == "quota":
                    if not stop.is_set():
                        stop.set()
                        # Cancel every request that has not started yet.
                        # Otherwise leaving the ``with`` block would wait for
                        # (and send) all of them against an exhausted quota.
                        for pending in futs:
                            pending.cancel()
                    continue  # not marked tried: retry on a later run
                if status == "error":
                    errors += 1
                    continue  # transient failure is not a real search; retry later
                tried.add(aid)
                if status == "hit":
                    matched[aid] = obj
                    hits += 1
                done += 1
                if done % 100 == 0:
                    save_map(matched, tried)
                    print(f"  {done}/{len(todo)} | new_hits={hits} "
                          f"matched_total={len(matched)}", flush=True)
    save_map(matched, tried)
    print(f"FALLBACK DONE searched={done} new_hits={hits} "
          f"matched_total={len(matched)} quota_hit={stop.is_set()} "
          f"errors={errors}", flush=True)
if __name__ == "__main__":
    # Script entry point: run one fallback pass only when executed directly,
    # not when this module is imported.
    main()