Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| import re | |
| import sys | |
| from pathlib import Path | |
| from typing import Any, Dict, Iterable, List | |
| ROOT = Path(__file__).resolve().parents[1] | |
| if str(ROOT) not in sys.path: | |
| sys.path.insert(0, str(ROOT)) | |
| RAW_DATASET_PATH = ROOT / "data" / "raw" / "scientific_papers_2m.jsonl" | |
| RAW_METADATA_PATH = ROOT / "data" / "raw" / "scientific_papers_2m_metadata.json" | |
| EVAL_QUESTIONS_PATH = ROOT / "data" / "eval" / "scientific_eval_questions.json" | |
| BENCHMARK_RESULTS_PATH = ROOT / "data" / "results" / "scientific_benchmark_results.json" | |
| ACCURACY_REPORT_PATH = ROOT / "data" / "results" / "scientific_accuracy_report.json" | |
| FINAL_SUMMARY_PATH = ROOT / "data" / "results" / "final_summary.json" | |
| PIPELINES = ("llm_only", "basic_rag", "graphrag") | |
| def ensure_parent(path: Path) -> None: | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| def read_json(path: Path, fallback: Any) -> Any: | |
| try: | |
| if not path.exists(): | |
| return fallback | |
| content = path.read_text(encoding="utf-8").strip() | |
| return json.loads(content) if content else fallback | |
| except json.JSONDecodeError: | |
| return fallback | |
| def write_json(path: Path, data: Any) -> None: | |
| ensure_parent(path) | |
| path.write_text(json.dumps(data, indent=2), encoding="utf-8") | |
| def iter_jsonl(path: Path) -> Iterable[Dict[str, Any]]: | |
| with path.open(encoding="utf-8") as f: | |
| for line in f: | |
| line = line.strip() | |
| if line: | |
| yield json.loads(line) | |
| def write_jsonl(path: Path, rows: Iterable[Dict[str, Any]]) -> None: | |
| ensure_parent(path) | |
| with path.open("w", encoding="utf-8") as f: | |
| for row in rows: | |
| f.write(json.dumps(row, ensure_ascii=False) + "\n") | |
| def token_counter(): | |
| try: | |
| import tiktoken | |
| enc = tiktoken.get_encoding("cl100k_base") | |
| return lambda text: len(enc.encode(text or "")) | |
| except Exception: | |
| return lambda text: max(1, len((text or "").split())) | |
| def combined_doc_text(doc: Dict[str, Any]) -> str: | |
| parts = [ | |
| doc.get("title", ""), | |
| doc.get("abstract", ""), | |
| doc.get("article", ""), | |
| ] | |
| return "\n\n".join(part for part in parts if part) | |
| def normalize_sections(value: Any) -> List[str]: | |
| if isinstance(value, list): | |
| return [str(v) for v in value if str(v).strip()] | |
| if isinstance(value, str): | |
| return [v.strip() for v in re.split(r"\n|/n|\\n|;", value) if v.strip()] | |
| return [] | |
| def safe_doc_id(index: int, row: Dict[str, Any]) -> str: | |
| existing = row.get("doc_id") or row.get("paper_id") or row.get("id") | |
| if existing: | |
| return str(existing) | |
| return f"arxiv_{index:05d}" | |
| def estimate_cost(tokens: int, cost_per_1k: float | None = None) -> float: | |
| rate = cost_per_1k | |
| if rate is None: | |
| rate = float(os.getenv("BENCHMARK_COST_PER_1K", "0.002")) | |
| return (tokens / 1000) * rate | |
| def words(text: str) -> List[str]: | |
| return re.findall(r"\b[A-Za-z][A-Za-z0-9-]{2,}\b", text or "") | |
| def first_sentence(text: str, max_chars: int = 360) -> str: | |
| cleaned = " ".join((text or "").split()) | |
| if not cleaned: | |
| return "" | |
| pieces = re.split(r"(?<=[.!?])\s+", cleaned) | |
| sentence = pieces[0] if pieces else cleaned | |
| return sentence[:max_chars].rstrip() | |
| def top_terms(text: str, limit: int = 5) -> List[str]: | |
| stop = { | |
| "the", | |
| "and", | |
| "for", | |
| "that", | |
| "with", | |
| "this", | |
| "from", | |
| "are", | |
| "was", | |
| "were", | |
| "paper", | |
| "using", | |
| "method", | |
| "results", | |
| "show", | |
| } | |
| counts: Dict[str, int] = {} | |
| for term in words(text.lower()): | |
| if len(term) < 4 or term in stop: | |
| continue | |
| counts[term] = counts.get(term, 0) + 1 | |
| return [term for term, _ in sorted(counts.items(), key=lambda item: item[1], reverse=True)[:limit]] | |