Spaces:
Running
Running
| """fetcher.py — Fetch review data from bundle or OpenReview API (via openreview-py).""" | |
| import json | |
| import os | |
| import time | |
| import gzip | |
| from typing import Optional | |
# Path to the gzipped offline review bundle shipped alongside this module.
_BUNDLE_PATH = os.path.join(os.path.dirname(__file__), "bundle.json.gz")
# paper_id -> normalized paper record; populated lazily by _load_bundle().
_bundle: dict = {}
# One-shot guard so _load_bundle() parses the bundle at most once per process.
_bundle_loaded = False
# Cached openreview-py client; stays None if initialization failed.
_or_client = None
# One-shot guard so _get_or_client() attempts client construction only once.
_or_client_init = False
def _get_or_client():
    """Get openreview client, authenticated if credentials available."""
    global _or_client, _or_client_init
    if not _or_client_init:
        _or_client_init = True
        user = os.environ.get("OPENREVIEW_USERNAME", "")
        pwd = os.environ.get("OPENREVIEW_PASSWORD", "")
        try:
            import openreview

            def _anon_client():
                # Unauthenticated client against the v2 API endpoint.
                return openreview.api.OpenReviewClient(
                    baseurl="https://api2.openreview.net",
                )

            if user and pwd:
                try:
                    _or_client = openreview.api.OpenReviewClient(
                        baseurl="https://api2.openreview.net",
                        username=user,
                        password=pwd,
                    )
                    print(f"[fetcher] OpenReview authenticated as {user}")
                except Exception as e:
                    # Bad credentials should not be fatal — degrade to anonymous.
                    print(f"[fetcher] Auth failed ({e}), trying anonymous...")
                    _or_client = _anon_client()
                    print("[fetcher] OpenReview anonymous client ready")
            else:
                _or_client = _anon_client()
                print("[fetcher] OpenReview anonymous client (no credentials set)")
        except Exception as e:
            # openreview-py missing or endpoint unreachable: report and cache None.
            print(f"[fetcher] openreview-py init failed: {e}")
            _or_client = None
    return _or_client
def _load_bundle():
    """Parse the offline bundle into _bundle. Idempotent and best-effort."""
    global _bundle, _bundle_loaded
    if _bundle_loaded:
        return
    _bundle_loaded = True
    if not os.path.exists(_BUNDLE_PATH):
        print(f"[fetcher] bundle not found at {_BUNDLE_PATH}")
        return
    size = os.path.getsize(_BUNDLE_PATH)
    if size < 1024:
        # A file this small is almost certainly a Git-LFS pointer, not the
        # real archive — try to pull the actual payload from the Hub.
        print(f"[fetcher] bundle looks like LFS pointer ({size} bytes), trying hf_hub download...")
        try:
            from huggingface_hub import hf_hub_download
            hf_hub_download(
                repo_id="nuojohnchen/Kahneman4Review",
                filename="bundle.json.gz",
                repo_type="space",
                local_dir=os.path.dirname(_BUNDLE_PATH),
            )
        except Exception as e:
            print(f"[fetcher] hf_hub download failed: {e}")
            return
    try:
        with gzip.open(_BUNDLE_PATH, "rt", encoding="utf-8") as fh:
            raw = json.load(fh)
        for pid, entry in raw.items():
            # Deduplicate: keep first occurrence per reviewer_id, exclude Authors
            seen = set()
            clean = []
            for rec in entry.get("reviews", []):
                rid = rec["r"]
                if rid == "Authors" or rid in seen:
                    continue
                seen.add(rid)
                clean.append({
                    "reviewer_id": rid,
                    "review_content": rec["c"],
                    "initial_rating": rec["i"],
                    "final_rating": rec["f"],
                })
            _bundle[pid] = {
                "title": entry.get("title", pid),
                "conference": entry.get("conf", ""),
                "metareview": entry.get("metareview", ""),
                "decision": entry.get("decision", ""),
                "reviews": clean,
            }
        print(f"[fetcher] bundle loaded: {len(_bundle)} papers")
    except Exception as e:
        print(f"[fetcher] bundle load error: {e}")
| def _extract_field(content: dict, *keys) -> str: | |
| for key in keys: | |
| val = content.get(key, "") | |
| if isinstance(val, dict): | |
| val = val.get("value", "") | |
| if isinstance(val, (int, float)): | |
| return str(val) | |
| if isinstance(val, str) and val.strip(): | |
| return val.strip() | |
| return "" | |
| def _is_review_note(note) -> bool: | |
| invs = getattr(note, "invitations", None) or [] | |
| if isinstance(invs, str): | |
| invs = [invs] | |
| inv_str = " ".join(invs) | |
| return ( | |
| "Official_Review" in inv_str | |
| or inv_str.endswith("/review") | |
| or inv_str.endswith("/Review") | |
| ) | |
| def _is_metareview_note(note) -> bool: | |
| invs = getattr(note, "invitations", None) or [] | |
| if isinstance(invs, str): | |
| invs = [invs] | |
| inv_str = " ".join(invs) | |
| return "Meta_Review" in inv_str or "meta_review" in inv_str | |
| def _is_decision_note(note) -> bool: | |
| invs = getattr(note, "invitations", None) or [] | |
| if isinstance(invs, str): | |
| invs = [invs] | |
| inv_str = " ".join(invs) | |
| return "/-/Decision" in inv_str and "Meta_Review" not in inv_str | |
def _fetch_from_api(paper_id: str) -> dict:
    """Fetch a paper's reviews live from the OpenReview API.

    Args:
        paper_id: OpenReview forum id of the paper.

    Returns:
        Dict with keys: paper_id, title, conference, metareview, decision,
        reviews (list of dicts with reviewer_id / review_content /
        initial_rating / final_rating).

    Raises:
        ValueError: if no client is available, the API call fails, the
            paper does not exist, or no usable reviews are found.
    """
    client = _get_or_client()
    if client is None:
        raise ValueError(
            f"Paper '{paper_id}' not in bundle and OpenReview client unavailable."
        )
    try:
        notes = client.get_all_notes(forum=paper_id)
    except Exception as e:
        # NOTE(review): the "32,171 papers" count is hard-coded — verify it
        # still matches the shipped bundle.
        raise ValueError(
            f"Paper '{paper_id}' not found in bundle (32,171 papers). "
            f"OpenReview API error: {e}"
        )
    if not notes:
        raise ValueError(f"Paper '{paper_id}' not found on OpenReview.")
    # Extract title and conference from root note
    # (the root/forum note is the one whose id equals its forum id).
    title = ""
    conference = ""
    for note in notes:
        if note.forum == note.id:
            content = note.content or {}
            title = _extract_field(content, "title")
            invs = getattr(note, "invitations", []) or []
            inv = invs[0] if invs else ""
            # The venue id is the prefix before "/-/" in the invitation,
            # e.g. "ICML.cc/2025/Conference/-/Submission".
            conference = inv.split("/-/")[0] if "/-/" in inv else inv
            break
    # Collect reviews, meta-review, decision
    reviews = []
    metareview = ""
    decision = ""
    seen_reviewers: set = set()
    for note in notes:
        content = note.content or {}
        if _is_decision_note(note):
            decision = _extract_field(content, "decision")
        elif _is_metareview_note(note):
            # Assemble a markdown meta-review from whichever fields exist;
            # if a later metareview note occurs, it overwrites the earlier one.
            parts = []
            for field in ["summary", "reviewer_concerns", "reviewer_scores", "metareview", "comment"]:
                val = _extract_field(content, field)
                if val:
                    label = field.replace("_", " ").capitalize()
                    parts.append(f"**{label}:**\n{val}")
            metareview = "\n\n".join(parts) if parts else _extract_field(content, "metareview", "comment")
        elif _is_review_note(note):
            # Reviewer id is the last path segment of the first signature,
            # e.g. ".../Reviewer_abcd" -> "Reviewer_abcd".
            sigs = getattr(note, "signatures", []) or []
            reviewer_id = sigs[0].split("/")[-1] if sigs else "Unknown"
            # Skip non-reviewer entries
            if reviewer_id in ("Authors", "Unknown"):
                continue
            # Skip rebuttal notes (invitation contains "Rebuttal")
            invs_str = " ".join(getattr(note, "invitations", []) or [])
            if "Rebuttal" in invs_str:
                continue
            # Skip duplicate reviewer (keep first occurrence)
            if reviewer_id in seen_reviewers:
                continue
            seen_reviewers.add(reviewer_id)
            # Build the review body from known per-venue field names, in a
            # fixed presentation order.
            parts = []
            for field in [
                "summary",
                "strengths", "weaknesses",
                "strengths_and_weaknesses",
                # ICML 2025 specific fields
                "claims_and_evidence",
                "methods_and_evaluation_criteria",
                "theoretical_claims",
                "experimental_designs_or_analyses",
                "other_strengths_and_weaknesses",
                "other_comments_or_suggestions",
                "questions", "questions_for_authors",
            ]:
                val = _extract_field(content, field)
                if val:
                    label = field.replace("_", " ").capitalize()
                    parts.append(f"**{label}:**\n{val}")
            review_text = "\n\n".join(parts) if parts else _extract_field(content, "review", "comment")
            # Drop empty or near-empty reviews (< 50 chars of content).
            if not review_text or len(review_text) < 50:
                continue
            rating = _extract_field(content, "rating", "overall_recommendation", "recommendation", "score")
            # The live API exposes only the current rating, so the same value
            # fills both the initial and final slots.
            reviews.append({
                "reviewer_id": reviewer_id,
                "review_content": review_text,
                "initial_rating": rating or "unknown",
                "final_rating": rating or "unknown",
            })
    if not reviews:
        raise ValueError(f"No reviews found for paper '{paper_id}'.")
    return {
        "paper_id": paper_id,
        "title": title,
        "conference": conference,
        "metareview": metareview,
        "decision": decision,
        "reviews": reviews,
    }
def fetch_paper_reviews(paper_id: str) -> dict:
    """
    1. Check local bundle (32,171 papers offline)
    2. Fall back to OpenReview API via openreview-py
    """
    paper_id = paper_id.strip()
    _load_bundle()
    # NeurIPS/ICML 2025 bundle has incomplete review content (missing strengths_and_weaknesses)
    # — always fetch from API for these conferences
    _BUNDLE_INCOMPLETE_CONFS = {"NeurIPS.cc/2025/Conference", "ICML.cc/2025/Conference"}
    entry = _bundle.get(paper_id)
    if entry is None:
        return _fetch_from_api(paper_id)
    if entry.get("conference") in _BUNDLE_INCOMPLETE_CONFS:
        return _fetch_from_api(paper_id)
    reviews = entry["reviews"]
    metareview = entry.get("metareview", "")
    decision = entry.get("decision", "")
    # If all ratings are unknown, try to patch from API
    if all(r.get("initial_rating", "unknown") == "unknown" for r in reviews):
        try:
            api_data = _fetch_from_api(paper_id)
            # Build a lookup by reviewer_id
            api_ratings = {}
            for api_review in api_data.get("reviews", []):
                api_ratings[api_review["reviewer_id"]] = api_review["initial_rating"]
            patched = []
            for r in reviews:
                rating = api_ratings.get(r["reviewer_id"], "unknown")
                patched.append({**r, "initial_rating": rating, "final_rating": rating})
            reviews = patched
            metareview = metareview or api_data.get("metareview", "")
            decision = decision or api_data.get("decision", "")
        except Exception:
            pass  # silently fall back to bundle data
    return {
        "paper_id": paper_id,
        "title": entry.get("title", paper_id),
        "conference": entry.get("conference", ""),
        "metareview": metareview,
        "decision": decision,
        "reviews": reviews,
    }
def get_bundled_ids() -> list:
    """Return the sorted paper ids available in the offline bundle."""
    _load_bundle()
    return sorted(_bundle)