# PolyAgent/rag_pipeline.py — polymer RAG corpus builder
# (origin: PolyFusionAgent repo, commit 0911f37 by kaurm43)
from __future__ import annotations
import os
import re
import time
import json
import hashlib
import pathlib
import tempfile
from typing import List, Optional, Dict, Any, Union
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import defaultdict
import requests
from tqdm import tqdm
# --------------------------------------------------------------------------------------
# Vector store, loaders, splitters
# --------------------------------------------------------------------------------------
from langchain_community.vectorstores import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader, DirectoryLoader
# --------------------------------------------------------------------------------------
# OpenAI embeddings
# --------------------------------------------------------------------------------------
from langchain_openai import OpenAIEmbeddings
# --------------------------------------------------------------------------------------
# Tokenizer for true token-based multi-scale segmentation
# --------------------------------------------------------------------------------------
import tiktoken
def sanitize_text(text: str) -> str:
    """
    Strip characters that cannot survive a UTF-8 round trip (e.g. lone
    surrogates), which would otherwise raise UnicodeEncodeError when the
    text is handed to ChromaDB.
    """
    if not text:
        return text
    cleaned = text.encode("utf-8", errors="ignore")
    return cleaned.decode("utf-8", errors="ignore")
# --------------------------------------------------------------------------------------
# ARXIV, OPENALEX, EPMC API URLS
# --------------------------------------------------------------------------------------
ARXIV_SEARCH_URL = "http://export.arxiv.org/api/query"
OPENALEX_WORKS_URL = "https://api.openalex.org/works"
EPMC_SEARCH_URL = "https://www.ebi.ac.uk/europepmc/webservices/rest/search"
# Default on-disk locations for the persisted vector store and the
# temporary PDF download cache (with its JSONL manifest).
DEFAULT_PERSIST_DIR = "chroma_polymer_db"
DEFAULT_TMP_DOWNLOAD_DIR = os.path.join(tempfile.gettempdir(), "polymer_rag_pdfs")
MANIFEST_NAME = "manifest.jsonl"
# --------------------------------------------------------------------------------------
# Balanced target distribution
# Per-source download caps; together they aim at a ~2000-PDF corpus.
# --------------------------------------------------------------------------------------
TARGET_CURATED = 100
TARGET_JOURNALS = 200
TARGET_ARXIV = 800
TARGET_OPENALEX = 600
TARGET_EPMC = 200
TARGET_DATABASES = 100
# --------------------------------------------------------------------------------------
# Polymer keywords
# Used to build arXiv / OpenAlex / Europe PMC search queries.
# --------------------------------------------------------------------------------------
POLYMER_KEYWORDS = [
    "polymer",
    "macromolecule",
    "macromolecular",
    "polymeric",
    "polymer informatics",
    "polymer chemistry",
    "polymer physics",
    "PSMILES",
    "pSMILES",
    "BigSMILES",
    "polymer SMILES",
    "polymer sequence",
    "polymer electrolyte",
    "polymer morphology",
    "polymer dielectric",
    "polymer electrolyte membrane",
    "block copolymer",
    "biopolymer",
    "polymer nanocomposite",
    "polymer foundation model",
    "self-supervised polymer",
    "masked language model polymer",
    "polymer transformer",
    "generative polymer",
    "copolymer",
    "polymerization",
    "polymer synthesis",
    "polymer characterization",
]
# --------------------------------------------------------------------------------------
# IUPAC Guidelines & Standards (polymer nomenclature and terminology standards)
# Each entry feeds download_pdf: "url" to fetch, "name" for the local file,
# "meta" recorded in the manifest for later citation.
# --------------------------------------------------------------------------------------
CURATED_IUPAC_STANDARDS: List[Dict[str, Any]] = [
    {
        "url": "https://iupac.org/wp-content/uploads/2019/07/140-Brief-Guide-to-Polymer-Nomenclature-Web-Final-d.pdf",
        "name": "IUPAC - Brief Guide to Polymer Nomenclature",
        "meta": {
            "title": "A Brief Guide to Polymer Nomenclature (IUPAC Technical Report)",
            "year": "2012",
            "venue": "IUPAC Pure and Applied Chemistry",
            "source": "curated_iupac_standard",
        },
    },
    {
        "url": "https://rseq.org/wp-content/uploads/2022/10/20220816-English-BriefGuidePolymerTerminology-IUPAC.pdf",
        "name": "IUPAC - Brief Guide to Polymerization Terminology",
        "meta": {
            "title": "A Brief Guide to Polymerization Terminology (IUPAC Recommendations)",
            "year": "2022",
            "venue": "IUPAC",
            "source": "curated_iupac_standard",
        },
    },
    {
        "url": "https://www.rsc.org/images/richard-jones-naming-polymers_tcm18-243646.pdf",
        "name": "RSC - Naming Polymers",
        "meta": {
            "title": "Naming Polymers (RSC Educational Resource)",
            "year": "2020",
            "venue": "Royal Society of Chemistry",
            "source": "curated_iupac_standard",
        },
    },
]
# --------------------------------------------------------------------------------------
# ISO/ASTM Standards (polymer testing and characterization standards)
# --------------------------------------------------------------------------------------
CURATED_ISO_ASTM_STANDARDS: List[Dict[str, Any]] = [
    {
        "url": "https://cdn.standards.iteh.ai/samples/76910/29c8e7af07bd4188b297c39684ada79e/ISO-ASTM-52925-2022.pdf",
        "name": "ISO/ASTM 52925:2022 - Additive Manufacturing Polymers",
        "meta": {
            "title": "ISO/ASTM 52925:2022 Additive manufacturing of polymers - Feedstock materials",
            "year": "2022",
            "venue": "ISO/ASTM",
            "source": "curated_iso_astm_standard",
        },
    },
    {
        "url": "https://cdn.standards.iteh.ai/samples/76909/b9883b2f204248aca175e2f574bd879c/ISO-ASTM-52924-2023.pdf",
        "name": "ISO/ASTM 52924:2023 - Additive Manufacturing Qualification",
        "meta": {
            "title": "ISO/ASTM 52924:2023 Additive manufacturing of polymers - Qualification principles",
            "year": "2023",
            "venue": "ISO/ASTM",
            "source": "curated_iso_astm_standard",
        },
    },
    {
        "url": "https://nvlpubs.nist.gov/nistpubs/ir/2015/NIST.IR.8059.pdf",
        "name": "NIST IR 8059 - Materials Testing Standards for Additive Manufacturing",
        "meta": {
            "title": "Materials Testing Standards for Additive Manufacturing of Polymer Materials",
            "year": "2015",
            "venue": "NIST",
            "source": "curated_iso_astm_standard",
        },
    },
]
# --------------------------------------------------------------------------------------
# Foundational polymer informatics papers
# --------------------------------------------------------------------------------------
CURATED_POLYMER_INFORMATICS: List[Dict[str, Any]] = [
    {
        "url": "https://ramprasad.mse.gatech.edu/wp-content/uploads/2021/01/polymer-informatics.pdf",
        "name": "Polymer Informatics - Current Status and Critical Next Steps",
        "meta": {
            "title": "Polymer informatics: Current status and critical next steps",
            "year": "2020",
            "venue": "Materials Science and Engineering: R",
            "source": "curated_review_informatics",
        },
    },
    {
        "url": "https://arxiv.org/pdf/2011.00508.pdf",
        "name": "Polymer Informatics - Current Status (arXiv)",
        "meta": {
            "title": "Polymer Informatics: Current Status and Critical Next Steps",
            "year": "2020",
            "venue": "arXiv:2011.00508",
            "source": "curated_review_informatics",
        },
    },
]
# --------------------------------------------------------------------------------------
# BigSMILES notation papers (polymer representation standards)
# --------------------------------------------------------------------------------------
CURATED_BIGSMILES: List[Dict[str, Any]] = [
    {
        "url": "https://pubs.acs.org/doi/pdf/10.1021/acscentsci.9b00476",
        "name": "BigSMILES - Structurally-Based Line Notation",
        "meta": {
            "title": "BigSMILES: A Structurally-Based Line Notation for Describing Macromolecules",
            "year": "2019",
            "venue": "ACS Central Science",
            "source": "curated_bigsmiles",
        },
    },
    {
        "url": "https://www.rsc.org/suppdata/d3/dd/d3dd00147d/d3dd00147d1.pdf",
        "name": "Generative BigSMILES - Supplementary Information",
        "meta": {
            "title": "Generative BigSMILES: an extension for polymer informatics (SI)",
            "year": "2024",
            "venue": "RSC Digital Discovery",
            "source": "curated_bigsmiles",
        },
    },
]
# --------------------------------------------------------------------------------------
# Combine all curated sources
# --------------------------------------------------------------------------------------
CURATED_POLYMER_PDF_SOURCES = (
    CURATED_IUPAC_STANDARDS
    + CURATED_ISO_ASTM_STANDARDS
    + CURATED_POLYMER_INFORMATICS
    + CURATED_BIGSMILES
)
# --------------------------------------------------------------------------------------
# Major polymer journals with OA content
# ISSNs are used to build OpenAlex primary_location.source.issn filters.
# --------------------------------------------------------------------------------------
POLYMER_JOURNAL_QUERIES = [
    # ACS Journals
    {"journal": "Macromolecules", "issn": "0024-9297", "publisher": "ACS"},
    {"journal": "ACS Polymers Au", "issn": "2768-1939", "publisher": "ACS"},
    {"journal": "ACS Applied Polymer Materials", "issn": "2637-6105", "publisher": "ACS"},
    {"journal": "Biomacromolecules", "issn": "1525-7797", "publisher": "ACS"},
    {"journal": "ACS Macro Letters", "issn": "2161-1653", "publisher": "ACS"},
    # RSC Journals
    {"journal": "Polymer Chemistry", "issn": "1759-9954", "publisher": "RSC"},
    {"journal": "RSC Applied Polymers", "issn": "2755-0656", "publisher": "RSC"},
    {"journal": "Soft Matter", "issn": "1744-683X", "publisher": "RSC"},
    # Springer/Nature Journals
    {"journal": "Polymer Journal", "issn": "0032-3896", "publisher": "Nature"},
    {"journal": "Journal of Polymer Science", "issn": "2642-4169", "publisher": "Wiley"},
    # Additional OA Journals
    {"journal": "Polymer Science and Technology", "issn": "2837-0341", "publisher": "ACS"},
    {"journal": "Polymers", "issn": "2073-4360", "publisher": "MDPI"},
]
# Contact address sent in User-Agent / mailto params so APIs can identify
# this client ("polite pool" etiquette for arXiv/OpenAlex/EPMC).
DEFAULT_MAILTO = "kaur-m43@webmail.uwinnipeg.ca"  # polite defaults
# --------------------------------------------------------------------------------------
# DEDUPLICATION, DOWNLOAD, MANIFEST HELPERS
# --------------------------------------------------------------------------------------
def sha256_bytes(data: bytes) -> str:
    """Hex-encoded SHA-256 digest of raw bytes (used as the dedup key)."""
    digest = hashlib.sha256(data)
    return digest.hexdigest()
def safe_filename(name: str) -> str:
    """
    Make a string usable as a filename: slashes become underscores, any
    character outside [a-zA-Z0-9._-] becomes an underscore, and the
    result is capped at 200 characters.
    """
    text = str(name or "").strip()
    text = text.replace("/", "_").replace("\\", "_")
    return re.sub(r"[^a-zA-Z0-9._\-]", "_", text)[:200]
def is_probably_pdf(raw: bytes, content_type: str) -> bool:
    """Heuristic PDF detection: %PDF magic bytes first, then Content-Type."""
    if not raw:
        return False
    return raw.startswith(b"%PDF") or "pdf" in (content_type or "").lower()
def ensure_dir(path: str) -> None:
    """Create `path` (including parents) if missing; no-op when it exists."""
    pathlib.Path(path).mkdir(parents=True, exist_ok=True)
def append_manifest(out_dir: str, record: Dict[str, Any]) -> None:
    """
    Best-effort append of one JSON record to out_dir/manifest.jsonl.

    Failures are deliberately swallowed: manifest bookkeeping must never
    abort a download in progress.
    """
    try:
        ensure_dir(out_dir)
        manifest_path = os.path.join(out_dir, MANIFEST_NAME)
        line = json.dumps(record, ensure_ascii=False)
        with open(manifest_path, "a", encoding="utf-8") as fh:
            fh.write(line + "\n")
    except Exception:
        pass
def load_manifest(out_dir: str) -> Dict[str, Dict[str, Any]]:
    """
    Read out_dir/manifest.jsonl and index each record under both its
    "path" and its "sha256" key (when present).

    Malformed lines and a missing file are ignored; always returns a dict.
    """
    index: Dict[str, Dict[str, Any]] = {}
    try:
        manifest_path = os.path.join(out_dir, MANIFEST_NAME)
        if not os.path.exists(manifest_path):
            return index
        with open(manifest_path, "r", encoding="utf-8") as fh:
            for raw_line in fh:
                try:
                    rec = json.loads(raw_line)
                    keys = (rec.get("path"), rec.get("sha256"))
                except Exception:
                    continue  # skip unparseable / non-dict lines
                for key in keys:
                    if key:
                        index[key] = rec
    except Exception:
        pass  # best effort: an unreadable manifest yields an empty index
    return index
# --------------------------------------------------------------------------------------
# DOWNLOAD SINGLE PDF
# --------------------------------------------------------------------------------------
def download_pdf(
    url: str,
    out_dir: str,
    suggested_name: Optional[str] = None,
    timeout: int = 60,
    meta: Optional[Dict[str, Any]] = None,
    manifest: Optional[Dict[str, Dict[str, Any]]] = None,
) -> Optional[str]:
    """
    Download a PDF and return local file path, or None on failure.
    Deduplicates by SHA256 content hash.
    Writes manifest record if meta provided.

    Any network/IO error is swallowed and reported as None so a single
    bad URL never aborts a bulk crawl.
    """
    try:
        headers = {"User-Agent": f"polymer-rag/1.0 ({DEFAULT_MAILTO})"}
        with requests.get(
            url, headers=headers, timeout=timeout, stream=True, allow_redirects=True
        ) as r:
            r.raise_for_status()
            content_type = r.headers.get("Content-Type", "")
            # NOTE: r.content reads the whole body despite stream=True, so
            # the full file is held in memory; acceptable for PDFs.
            raw = r.content
        if not raw or not is_probably_pdf(raw, content_type):
            return None  # empty or non-PDF payload (e.g. an HTML landing page)
        sha = sha256_bytes(raw)
        ensure_dir(out_dir)
        # Dedup 1: manifest already maps this content hash to a saved file.
        if manifest and sha in manifest:
            existing_path = manifest[sha].get("path")
            if existing_path and os.path.exists(existing_path):
                return existing_path
        # Dedup 2: a file prefixed with this hash already exists on disk.
        existing = list(pathlib.Path(out_dir).glob(f"{sha[:16]}*.pdf"))
        if existing:
            path = str(existing[0])
            if meta:
                rec = dict(meta)  # copy so the caller's dict is not mutated
                rec.update({"sha256": sha, "path": path})
                append_manifest(out_dir, rec)
            return path
        # Derive a filesystem-safe name, prefixed with the hash for dedup.
        base = suggested_name or pathlib.Path(url).name or "paper.pdf"
        base = safe_filename(base)
        if not base.lower().endswith(".pdf"):
            base += ".pdf"
        fname = f"{sha[:16]}_{base}"
        fpath = os.path.join(out_dir, fname)
        with open(fpath, "wb") as f:
            f.write(raw)
        if meta:
            rec = dict(meta)
            rec.update({"sha256": sha, "path": fpath})
            append_manifest(out_dir, rec)
        return fpath
    except Exception:
        # Best-effort by design: any failure just skips this URL.
        return None
def retry(fn, args, retries=3, sleep=0.6, **kwargs):
    """
    Call fn(*args, **kwargs) up to `retries` times with exponential backoff.

    Returns the first truthy result, or None when every attempt fails.

    FIX: the original slept even after the final failed attempt, wasting
    sleep * 2**(retries-1) seconds before returning None.
    """
    for attempt in range(retries):
        result = fn(*args, **kwargs)
        if result:
            return result
        if attempt < retries - 1:  # no pointless backoff after the last try
            time.sleep(sleep * (2 ** attempt))
    return None
def download_one(entry: Union[str, Dict[str, Any]], out_dir: str, manifest: Dict):
    """
    Download one entry, which is either a bare URL string or a dict with
    "url" plus optional "name"/"meta" fields.
    """
    if not isinstance(entry, dict):
        return download_pdf(entry, out_dir, manifest=manifest)
    return download_pdf(
        entry["url"],
        out_dir,
        suggested_name=entry.get("name"),
        meta=entry.get("meta"),
        manifest=manifest,
    )
def parallel_download_pdfs(
    entries: List[Union[str, Dict[str, Any]]],
    out_dir: str,
    manifest: Dict[str, Dict[str, Any]],
    max_workers: int = 12,
    desc: str = "Downloading PDFs",
) -> List[str]:
    """
    Download many PDFs concurrently with per-entry retries.

    Each entry is a URL string or a {"url", "name", "meta"} dict (see
    download_one). Returns the local paths of successful downloads;
    failed entries are silently dropped.
    """
    ensure_dir(out_dir)
    results: List[str] = []
    if not entries:
        return results
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        # retry() wraps download_one so each entry gets up to 3 attempts
        # with exponential backoff before being given up on.
        futs = [ex.submit(retry, download_one, (e, out_dir, manifest)) for e in entries]
        for f in tqdm(as_completed(futs), total=len(futs), desc=desc):
            p = f.result()
            if p:
                results.append(p)
    return results
# --------------------------------------------------------------------------------------
# ARXIV
# --------------------------------------------------------------------------------------
def arxiv_query_from_keywords(keywords: List[str]) -> str:
    """
    Build an arXiv API search expression: keyword hits in title or
    abstract, restricted to materials/soft-matter/chem-ph/ML categories.
    """
    plus_joined = [k.replace(" ", "+") for k in keywords]
    title_terms = [f"ti:{k}" for k in plus_joined]
    abstract_terms = [f"abs:{k}" for k in plus_joined]
    terms = " OR ".join(title_terms + abstract_terms)
    cats = (
        "cat:cond-mat.mtrl-sci OR cat:cond-mat.soft OR cat:physics.chem-ph OR cat:cs.LG OR cat:stat.ML"
    )
    return f"({terms}) AND ({cats})"
def fetch_arxiv_pdf_urls(keywords: List[str], max_results: int = 800) -> List[str]:
    """
    Extract explicit pdf links and fallback to building from id entries.

    Issues one arXiv Atom API request and scrapes the response XML with
    regexes (no feed-parser dependency). Returns a deduplicated,
    order-preserving list of PDF URLs; empty list on request failure.
    """
    query = arxiv_query_from_keywords(keywords)
    params = {
        "search_query": query,
        "start": 0,
        "max_results": max_results,
        "sortBy": "submittedDate",
        "sortOrder": "descending",
    }
    headers = {"User-Agent": f"polymer-rag/1.0 ({DEFAULT_MAILTO})"}
    try:
        resp = requests.get(ARXIV_SEARCH_URL, params=params, headers=headers, timeout=60)
        resp.raise_for_status()
        xml = resp.text
    except Exception:
        return []
    pdfs: List[str] = []
    seen = set()
    # explicit pdf hrefs
    for p in re.findall(r'href="(https?://arxiv\.org/pdf[^"]*)"', xml):
        if p not in seen:
            pdfs.append(p)
            seen.add(p)
    # fallback: build from id entries
    for aid in re.findall(r'<id>(https?://arxiv\.org/abs[^<]*)</id>', xml):
        # [^?v]+ stops at a query string or version suffix (e.g. "v2").
        m = re.search(r"arxiv\.org/abs/([^?v]+)", aid)
        if m:
            identifier = m.group(1)
            pdf = f"https://arxiv.org/pdf/{identifier}.pdf"
            if pdf not in seen:
                pdfs.append(pdf)
                seen.add(pdf)
    return pdfs
def fetch_arxiv_pdfs(
    keywords: List[str],
    out_dir: str,
    manifest: Dict[str, Dict[str, Any]],
    max_results: int = 800,
) -> List[str]:
    """
    Search arXiv for the keywords and download the matching PDFs.

    Returns the local paths of every successfully saved file.
    """
    pdf_urls = fetch_arxiv_pdf_urls(keywords, max_results=max_results)
    entries = []
    for u in pdf_urls:
        entries.append(
            {
                "url": u,
                "name": u.rstrip("/").split("/")[-1],
                "meta": {"source": "arxiv", "url": u},
            }
        )
    return parallel_download_pdfs(entries, out_dir, manifest, max_workers=8, desc="arXiv PDFs")
# --------------------------------------------------------------------------------------
# OPENALEX
# --------------------------------------------------------------------------------------
def openalex_fetch_works_try(
    search: str,
    filter_str: str,
    per_page: int,
    page: int,
    mailto: Optional[str],
) -> Dict[str, Any]:
    """
    Issue a single OpenAlex /works request and return the decoded JSON.

    FIX: the original sent both "per-page" and "per_page"; OpenAlex's
    documented paging parameter is "per-page", so the duplicate is dropped.

    Raises:
        requests.HTTPError: on non-2xx responses (via raise_for_status).
    """
    headers = {"User-Agent": f"polymer-rag/1.0 ({mailto or DEFAULT_MAILTO})"}
    params: Dict[str, Any] = {
        "search": search,
        "per-page": per_page,  # OpenAlex's documented paging parameter
        "page": page,
        "sort": "publication_date:desc",
    }
    if filter_str:
        params["filter"] = filter_str
    if mailto:
        params["mailto"] = mailto  # "polite pool" identification
    resp = requests.get(OPENALEX_WORKS_URL, params=params, headers=headers, timeout=60)
    resp.raise_for_status()
    return resp.json()
def openalex_fetch_works(
    keywords: List[str],
    max_results: int = 600,
    per_page: int = 200,
    mailto: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """
    Try multiple query forms with relaxed filters if needed.

    Attempts run from strictest (all keywords, OA + English) to loosest
    (OR query, no filter); the first attempt that yields any works wins.
    Returns at most max_results raw OpenAlex work dicts.
    """
    kws = sorted(set(keywords or []), key=str.lower)
    combined = " ".join(kws)
    or_query = " OR ".join(kws)
    attempts = [
        {"q": combined, "filter": "is_oa:true,language:en"},
        {"q": or_query, "filter": "is_oa:true,language:en"},
        {"q": or_query, "filter": "is_oa:true"},
        {"q": or_query, "filter": ""},
    ]
    works: List[Dict[str, Any]] = []
    for attempt in attempts:
        search = attempt["q"]
        filter_str = attempt["filter"]
        page = 1
        while len(works) < max_results:
            try:
                data = openalex_fetch_works_try(
                    search, filter_str, per_page, page, mailto or DEFAULT_MAILTO
                )
            except Exception as e:
                print(f"[WARN] OpenAlex request failed: {e}")
                break
            results = data.get("results", [])
            if not results:
                break
            works.extend(results)
            if len(results) < per_page:
                break  # short page means no more results
            page += 1
            time.sleep(0.12)  # polite pacing between pages
            if len(works) >= max_results:
                break
        if works:
            break  # this attempt produced results; skip the looser ones
    return works[:max_results]
def openalex_extract_pdf_entries(
    works: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """
    Extract candidate PDF URLs and metadata from OpenAlex works.

    Checks best_oa_location, then primary_location, then open_access for
    a usable URL, deduplicating by URL. Returns download entries shaped
    as {"url", "name", "meta"} for parallel_download_pdfs.

    FIX: the original computed `... or "".strip()`, which strips the
    empty-string literal instead of the host_venue name; the fallback
    venue is now actually stripped.
    """
    out: List[Dict[str, Any]] = []
    seen_urls = set()
    for w in works:
        # Best OA location first: most likely to carry a direct PDF link.
        pdf = ""
        best = w.get("best_oa_location") or {}
        if isinstance(best, dict):
            pdf = best.get("pdf_url") or best.get("url_for_pdf") or best.get("url") or ""
        if not pdf:
            pl = w.get("primary_location") or {}
            if isinstance(pl, dict):
                pdf = (
                    pl.get("pdf_url")
                    or pl.get("url_for_pdf")
                    or pl.get("landing_page_url")
                    or ""
                )
        if not pdf:
            oa = w.get("open_access") or {}
            if isinstance(oa, dict):
                pdf = oa.get("oa_url") or oa.get("oa_url_for_pdf") or ""
        if not pdf or pdf in seen_urls:
            continue
        seen_urls.add(pdf)
        title = (w.get("title") or w.get("display_name") or "").strip()
        year = w.get("publication_year") or w.get("publication_date") or ""
        venue = ""
        pl = w.get("primary_location") or {}
        if isinstance(pl, dict):
            venue = (pl.get("source") or {}).get("display_name") or ""
        if not venue:
            # Fallback to the deprecated host_venue field, stripped.
            venue = ((w.get("host_venue") or {}).get("display_name") or "").strip()
        # str(year) is "" only when year itself is "", which the filter drops.
        name = " - ".join([s for s in [title, venue, str(year)] if s])
        meta = {"title": title, "year": year, "venue": venue, "source": "openalex"}
        out.append({"url": pdf, "name": name, "meta": meta})
    return out
def fetch_openalex_pdfs(
    keywords: List[str],
    out_dir: str,
    manifest: Dict[str, Dict[str, Any]],
    max_results: int = 600,
    mailto: Optional[str] = None,
) -> List[str]:
    """
    Query OpenAlex for the keywords, extract OA PDF links, download them.

    Returns local paths of saved PDFs; empty list when nothing usable
    was found (with an informational message either way).
    """
    works = openalex_fetch_works(keywords, max_results=max_results, mailto=mailto)
    if not works:
        print("[INFO] OpenAlex returned no works for given queries/filters.")
        return []
    entries = openalex_extract_pdf_entries(works)
    if not entries:
        print("[INFO] OpenAlex works found, but no PDF links extracted.")
        return []
    return parallel_download_pdfs(
        entries, out_dir, manifest, max_workers=16, desc="OpenAlex PDFs"
    )
# --------------------------------------------------------------------------------------
# EUROPE PMC
# --------------------------------------------------------------------------------------
def epmc_query_from_keywords(keywords: List[str]) -> str:
    """Quote each keyword and OR them together for a Europe PMC search."""
    quoted = [f'"{k}"' for k in keywords]
    return " OR ".join(quoted)
def epmc_extract_pdf_entries_from_results(
    results: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """
    Turn Europe PMC search results into download entries.

    Collects every URL in fullTextUrlList (falling back to a bare
    fullTextUrl string), deduplicates across results, and returns
    {"url", "name", "meta"} dicts for parallel_download_pdfs.

    FIX: the original computed `r.get("title") or "".strip()`, which
    strips the empty-string literal instead of the title; the title is
    now actually stripped.
    """
    out: List[Dict[str, Any]] = []
    seen = set()
    for r in results:
        ftl = r.get("fullTextUrlList") or {}
        urls: List[str] = []
        if isinstance(ftl, dict):
            for ful in ftl.get("fullTextUrl") or []:
                if isinstance(ful, dict):
                    u = ful.get("url") or ""
                    if u:
                        urls.append(u)
        if not urls:
            # Some records expose a single flat fullTextUrl string instead.
            fu = r.get("fullTextUrl")
            if isinstance(fu, str) and fu:
                urls.append(fu)
        for u in urls:
            if not u or u in seen:
                continue
            seen.add(u)
            title = (r.get("title") or "").strip()
            year = r.get("firstPublicationDate") or r.get("pubYear") or ""
            name = " - ".join([s for s in [title, str(year)] if s])
            out.append(
                {
                    "url": u,
                    "name": name,
                    "meta": {"title": title, "year": year, "source": "epmc"},
                }
            )
    return out
def fetch_epmc_pdfs(
    keywords: List[str],
    out_dir: str,
    manifest: Dict[str, Dict[str, Any]],
    max_results: int = 200,
    page_size: int = 25,
) -> List[str]:
    """
    Query Europe PMC and extract fullTextUrlList entries.

    Pages through search results (counting fetched records, not saved
    files, toward max_results) and downloads every full-text URL found.
    Returns local paths of saved files.

    NOTE(review): pagination uses a "page" parameter; current Europe PMC
    REST docs describe cursorMark-based paging — confirm "page" is still
    honored by the service.
    """
    q = epmc_query_from_keywords(keywords)
    params = {
        "query": q,
        "format": "json",
        "pageSize": page_size,
        "sort": "FIRST_PDATE desc",
    }
    headers = {"User-Agent": f"polymer-rag/1.0 ({DEFAULT_MAILTO})"}
    saved: List[str] = []
    cursor = 1  # 1-based page counter
    total_fetched = 0
    while total_fetched < max_results:
        params["page"] = cursor
        try:
            resp = requests.get(EPMC_SEARCH_URL, params=params, headers=headers, timeout=30)
            resp.raise_for_status()
            data = resp.json()
        except Exception as e:
            print(f"[WARN] Europe PMC request failed: {e}")
            break
        results = (data.get("resultList") or {}).get("result") or []
        if not results:
            break
        entries = epmc_extract_pdf_entries_from_results(results)
        if not entries:
            # Page had no full-text URLs; count it and move to the next.
            cursor += 1
            total_fetched += len(results)
            time.sleep(0.2)
            continue
        paths = parallel_download_pdfs(entries, out_dir, manifest, max_workers=8, desc="Europe PMC PDFs")
        saved.extend(paths)
        total_fetched += len(results)
        cursor += 1
        time.sleep(0.2)  # polite pacing between pages
    return saved
# --------------------------------------------------------------------------------------
# POLYMER JOURNALS OA
# --------------------------------------------------------------------------------------
def fetch_polymer_journal_pdfs(
    journal_queries: List[Dict[str, Any]],
    out_dir: str,
    manifest: Dict[str, Dict[str, Any]],
    max_per_journal: int = 50,
    mailto: Optional[str] = None,
) -> List[str]:
    """
    Fetch OA papers from specific polymer journals via OpenAlex.

    For each journal, filters OpenAlex by ISSN (when present) plus
    is_oa/language, pages through matching works, and downloads their
    PDFs. Entries are tagged with journal/publisher metadata so the
    provenance survives into the vector store.
    """
    all_paths: List[str] = []
    for jq in journal_queries:
        journal_name = jq["journal"]
        issn = jq.get("issn", "")
        publisher = jq.get("publisher", "")
        print(f"→ Fetching from {journal_name} ({publisher})...")
        # Build OpenAlex filter for this journal
        filter_parts = ["is_oa:true", "language:en"]
        if issn:
            filter_parts.append(f"primary_location.source.issn:{issn}")
        filter_str = ",".join(filter_parts)
        # Search for polymer-related content in this journal
        search_query = "polymer OR macromolecule OR copolymer"
        page = 1
        journal_works = []
        while len(journal_works) < max_per_journal:
            try:
                data = openalex_fetch_works_try(
                    search_query, filter_str, 25, page, mailto or DEFAULT_MAILTO
                )
            except Exception as e:
                print(f"[WARN] Failed to fetch {journal_name}: {e}")
                break
            results = data.get("results", [])
            if not results:
                break
            journal_works.extend(results)
            if len(results) < 25:
                break  # short page: no more results for this journal
            page += 1
            time.sleep(0.15)  # polite pacing between pages
        if journal_works:
            entries = openalex_extract_pdf_entries(journal_works[:max_per_journal])
            # Tag with journal source
            for e in entries:
                e["meta"]["journal"] = journal_name
                e["meta"]["publisher"] = publisher
                e["meta"]["source"] = f"{journal_name}_{publisher}".lower()
            paths = parallel_download_pdfs(
                entries, out_dir, manifest, max_workers=8, desc=f"{journal_name} PDFs"
            )
            all_paths.extend(paths)
            print(f" → Downloaded {len(paths)} PDFs from {journal_name}")
        time.sleep(0.3)  # pause between journals
    return all_paths
# --------------------------------------------------------------------------------------
# WRAPPER FOR OPENAI EMBEDDINGS
# --------------------------------------------------------------------------------------
class PolymerStyleOpenAIEmbeddings(OpenAIEmbeddings):
    """
    OpenAI embeddings wrapper for polymer RAG.
    Default model: text-embedding-3-small (1536-D) ← FIXED

    Thin subclass whose only purpose is pinning the default model so that
    every build path embeds with the same dimensionality.
    """
    def __init__(self, model: str = "text-embedding-3-small", **kwargs):
        # Forward everything (api_key, etc.) to the langchain base class.
        super().__init__(model=model, **kwargs)
# --------------------------------------------------------------------------------------
# TOKENIZER FOR TRUE TOKEN-BASED SEGMENTATION
# --------------------------------------------------------------------------------------
# cl100k_base matches OpenAI's embedding tokenization, so token counts
# here line up with what the embedding endpoint will see.
TOKENIZER = tiktoken.get_encoding("cl100k_base")
def token_length(text: str) -> int:
    """Number of cl100k_base tokens in `text` (0 for empty/None)."""
    if not text:
        return 0
    return len(TOKENIZER.encode(text))
# --------------------------------------------------------------------------------------
# METADATA ENRICHMENT FROM MANIFEST
# --------------------------------------------------------------------------------------
def attach_extra_metadata_from_manifest(
    docs: List[Any], manifest: Dict[str, Dict[str, Any]]
) -> None:
    """
    Copy citation fields (title/year/venue/url/source/journal/publisher)
    from the download manifest onto each document's metadata, matching
    first by exact source path and then by basename. Mutates in place.
    """
    citation_fields = ("title", "year", "venue", "url", "source", "journal", "publisher")
    for doc in docs:
        src_path = doc.metadata.get("source", "")
        if not src_path:
            continue
        rec = manifest.get(src_path)
        if not rec:
            # Fall back to basename matching (manifest paths may differ).
            src_base = os.path.basename(src_path)
            for key, candidate in manifest.items():
                if os.path.basename(key) == src_base:
                    rec = candidate
                    break
        if not rec:
            continue
        for field in citation_fields:
            if field in rec:
                doc.metadata[field] = rec[field]
# --------------------------------------------------------------------------------------
# MULTI-SCALE CHUNKING
# --------------------------------------------------------------------------------------
def multiscale_chunk_documents(
    docs: List[Any], min_chunk_tokens: int = 32
) -> List[Any]:
    """
    Multi-scale segmentation at TOKEN level: 512, 256, 128 token windows.

    Each document is split three times at decreasing window sizes; every
    kept chunk is tagged with its scale label and a globally unique
    segment_id. Chunks under min_chunk_tokens are discarded.
    """
    splitter_specs = [
        ("tokens=512", 512, 64),  # 64-token (12.5%) overlap
        ("tokens=256", 256, 48),
        ("tokens=128", 128, 32),
    ]
    all_chunks: List[Any] = []
    seg_id = 0
    for scale_label, chunk_size, overlap in splitter_specs:
        # length_function=token_length makes the character splitter
        # measure sizes in tokens rather than characters.
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=overlap,
            length_function=token_length,
            separators=["\n\n", "\n", ". ", " ", ""],
        )
        splits = splitter.split_documents(docs)
        for d in splits:
            if token_length(d.page_content or "") < min_chunk_tokens:
                continue  # drop tiny fragments
            d.metadata = dict(d.metadata or {})  # defensive copy
            d.metadata["segment_scale"] = scale_label
            d.metadata["segment_id"] = seg_id
            seg_id += 1
            all_chunks.append(d)
    return all_chunks
# --------------------------------------------------------------------------------------
# BUILD RETRIEVER FROM LOCAL PDFs
# --------------------------------------------------------------------------------------
def _split_and_build_retriever(
    documents_dir: str,
    persist_dir: Optional[str] = None,
    k: int = 10,
    embedding_model: str = "text-embedding-3-small",
    vector_backend: str = "chroma",
    min_chunk_tokens: int = 32,
    api_key: Optional[str] = None,
):
    """
    Load PDFs, chunk multi-scale, build dense retriever.
    FIXED: Always uses text-embedding-3-small (1536-D) and handles existing DB correctly.

    Pipeline: load every *.pdf in documents_dir → enrich metadata from the
    download manifest → split into 512/256/128-token segments → sanitize
    text → embed in batches into Chroma (default) or FAISS → return a
    retriever configured for top-k search.

    Raises:
        RuntimeError: if no PDFs are found, or FAISS is requested but missing.
        ValueError: for an unknown vector_backend.
    """
    print(f"→ Loading PDFs from {documents_dir}...")
    try:
        # silent_errors is not supported by older langchain versions,
        # hence the TypeError fallback below.
        loader = DirectoryLoader(
            documents_dir,
            glob="*.pdf",
            loader_cls=PyPDFLoader,
            show_progress=True,
            use_multithreading=True,
            silent_errors=True,
        )
    except TypeError:
        loader = DirectoryLoader(
            documents_dir,
            glob="*.pdf",
            loader_cls=PyPDFLoader,
            show_progress=True,
            use_multithreading=True,
        )
    docs = loader.load()
    if not docs:
        raise RuntimeError("No PDF documents found to index.")
    manifest = load_manifest(documents_dir)
    attach_extra_metadata_from_manifest(docs, manifest)
    documents = multiscale_chunk_documents(docs, min_chunk_tokens=min_chunk_tokens)
    print(
        f"→ Created {len(documents)} multi-scale segments from {len(docs)} PDFs (512/256/128-token windows)."
    )
    print(f"→ Using OpenAI embeddings model: {embedding_model}")
    embeddings = PolymerStyleOpenAIEmbeddings(model=embedding_model, api_key=api_key)
    if vector_backend.lower() == "chroma":
        # A pre-existing DB may have been built with a different embedding
        # dimension, so it is wiped rather than reused.
        if persist_dir and os.path.exists(persist_dir):
            print(f"→ Deleting existing Chroma database at {persist_dir} to prevent dimension mismatch...")
            import shutil
            shutil.rmtree(persist_dir)
            print(f"→ Existing database deleted. Creating fresh database...")
        # Sanitize all text content to prevent Unicode errors
        for doc in documents:
            doc.page_content = sanitize_text(doc.page_content or "")
            for key, value in doc.metadata.items():
                if isinstance(value, str):
                    doc.metadata[key] = sanitize_text(value)
        # Process in batches to avoid rate limiting and memory issues
        batch_size = 500  # Adjust based on your document sizes (500 is safe for most cases)
        total_batches = (len(documents) + batch_size - 1) // batch_size
        print(f"→ Processing {len(documents)} documents in {total_batches} batches of {batch_size}...")
        vector_store = None
        for i in range(0, len(documents), batch_size):
            batch = documents[i : i + batch_size]
            batch_num = (i // batch_size) + 1
            print(f" → Embedding batch {batch_num}/{total_batches} ({len(batch)} documents)...")
            if vector_store is None:
                # First batch: create the vector store
                if persist_dir:
                    print(f" → Creating new Chroma database at {persist_dir}")
                    vector_store = Chroma.from_documents(
                        batch, embeddings, persist_directory=persist_dir
                    )
                else:
                    # In-memory mode also needs batching
                    vector_store = Chroma.from_documents(batch, embeddings)
            else:
                # Subsequent batches: add to existing store
                vector_store.add_documents(batch)
            time.sleep(0.5)  # Small delay to avoid rate limiting
    elif vector_backend.lower() == "faiss":
        try:
            from langchain_community.vectorstores import FAISS
        except Exception as e:
            raise RuntimeError("FAISS requested but not available") from e
        # Sanitize all text content
        for doc in documents:
            doc.page_content = sanitize_text(doc.page_content or "")
            for key, value in doc.metadata.items():
                if isinstance(value, str):
                    doc.metadata[key] = sanitize_text(value)
        # FAISS also needs batching
        batch_size = 500
        total_batches = (len(documents) + batch_size - 1) // batch_size
        print(f"→ Processing {len(documents)} documents in {total_batches} batches of {batch_size}...")
        vector_store = None
        for i in range(0, len(documents), batch_size):
            batch = documents[i : i + batch_size]
            batch_num = (i // batch_size) + 1
            print(f" → Embedding batch {batch_num}/{total_batches} ({len(batch)} documents)...")
            if vector_store is None:
                vector_store = FAISS.from_documents(batch, embeddings)
            else:
                # Build a per-batch index and merge it into the running store.
                batch_store = FAISS.from_documents(batch, embeddings)
                vector_store.merge_from(batch_store)
            time.sleep(0.5)
    else:
        raise ValueError("vector_backend must be 'chroma' or 'faiss'")
    vector_retriever = vector_store.as_retriever(search_kwargs={"k": k})
    print("→ RAG KB ready (dense retriever over multi-scale segments).")
    return vector_retriever
# --------------------------------------------------------------------------------------
# PUBLIC API: BUILD RETRIEVER FROM WEB
# --------------------------------------------------------------------------------------
def _register_new_paths(
    paths: List[str],
    source: str,
    all_paths: List[str],
    seen: set,
    source_stats: Dict[str, int],
) -> None:
    """
    Append not-yet-seen file paths to ``all_paths`` and tally them by source.

    Uses the ``seen`` set for O(1) membership tests instead of scanning
    ``all_paths`` on every insertion (the original pattern was O(n^2)).
    """
    for p in paths:
        if p not in seen:
            seen.add(p)
            all_paths.append(p)
            source_stats[source] += 1


def build_retriever_from_web(
    polymer_keywords: Optional[List[str]] = None,
    target_curated: int = TARGET_CURATED,
    target_journals: int = TARGET_JOURNALS,
    target_arxiv: int = TARGET_ARXIV,
    target_openalex: int = TARGET_OPENALEX,
    target_epmc: int = TARGET_EPMC,
    extra_pdf_urls: Optional[List[str]] = None,
    persist_dir: str = DEFAULT_PERSIST_DIR,
    tmp_download_dir: str = DEFAULT_TMP_DOWNLOAD_DIR,
    k: int = 10,
    embedding_model: str = "text-embedding-3-small",
    vector_backend: str = "chroma",
    mailto: Optional[str] = None,
    include_curated: bool = True,
):
    """
    Fetch a balanced polymer PDF corpus from multiple sources and build a retriever.

    Target distribution (~2000 PDFs):
      - Curated guidelines/standards: 100
      - Polymer journals OA: 200
      - arXiv: 800
      - OpenAlex: 600
      - Europe PMC: 200
      - Extra/databases: 100

    Parameters
    ----------
    polymer_keywords : search keywords; defaults to POLYMER_KEYWORDS, deduplicated
        and sorted case-insensitively for reproducible query order.
    target_curated / target_journals / target_arxiv / target_openalex / target_epmc :
        per-source download caps.
    extra_pdf_urls : optional user-provided direct PDF URLs.
    persist_dir : Chroma persistence directory (ignored by FAISS in-memory mode).
    tmp_download_dir : where PDFs and the download manifest are stored.
    k : number of segments the retriever returns per query.
    embedding_model : OpenAI embedding model name.
    vector_backend : "chroma" or "faiss".
    mailto : contact email forwarded to polite-pool APIs (OpenAlex et al.).
    include_curated : whether to download the curated guideline/standard PDFs.

    Returns
    -------
    A dense vector retriever over multi-scale document segments.

    Raises
    ------
    RuntimeError if no PDFs could be fetched from any source.
    """
    polymer_keywords = sorted(set(polymer_keywords or POLYMER_KEYWORDS), key=str.lower)
    print("=" * 70)
    print("Fetching polymer PDFs from balanced sources...")
    print(
        f"Target: {target_curated} curated + {target_journals} journals + "
        f"{target_arxiv} arXiv + {target_openalex} OpenAlex + {target_epmc} EPMC"
    )
    ensure_dir(tmp_download_dir)
    manifest = load_manifest(tmp_download_dir)
    source_stats = defaultdict(int)
    all_paths: List[str] = []
    seen: set = set()  # O(1) dedup across all sources
    # ----------------------------------------------------------------------------------
    # 1) Curated sources (IUPAC, ISO/ASTM, polymer informatics reviews)
    # ----------------------------------------------------------------------------------
    if include_curated and CURATED_POLYMER_PDF_SOURCES:
        print(f"[1/6] Downloading {len(CURATED_POLYMER_PDF_SOURCES)} curated PDFs...")
        curated_paths = parallel_download_pdfs(
            CURATED_POLYMER_PDF_SOURCES[:target_curated],
            tmp_download_dir,
            manifest,
            max_workers=4,
            desc="Curated PDFs",
        )
        _register_new_paths(curated_paths, "curated", all_paths, seen, source_stats)
        print(f" → {len(curated_paths)} curated PDFs downloaded")
    # ----------------------------------------------------------------------------------
    # 2) Polymer journals OA
    # ----------------------------------------------------------------------------------
    try:
        print(f"[2/6] Fetching polymer journal PDFs (target: {target_journals})...")
        # max(1, ...) guards against ZeroDivisionError if the query list is empty.
        per_journal = target_journals // max(1, len(POLYMER_JOURNAL_QUERIES)) + 1
        journal_paths = fetch_polymer_journal_pdfs(
            POLYMER_JOURNAL_QUERIES,
            tmp_download_dir,
            manifest,
            max_per_journal=per_journal,
            mailto=mailto,
        )
        _register_new_paths(journal_paths, "journal", all_paths, seen, source_stats)
        print(f" → {len(journal_paths)} journal PDFs downloaded")
    except Exception as e:
        print(f"[WARN] Polymer journal fetch error: {e}")
    # ----------------------------------------------------------------------------------
    # 3) arXiv polymer-focused categories
    # ----------------------------------------------------------------------------------
    try:
        print(f"[3/6] Fetching arXiv PDFs (target: {target_arxiv})...")
        arxiv_paths = fetch_arxiv_pdfs(
            polymer_keywords, tmp_download_dir, manifest, max_results=target_arxiv
        )
        _register_new_paths(arxiv_paths, "arxiv", all_paths, seen, source_stats)
        print(f" → {len(arxiv_paths)} arXiv PDFs downloaded")
    except Exception as e:
        print(f"[WARN] arXiv fetch error: {e}")
    # ----------------------------------------------------------------------------------
    # 4) OpenAlex broad polymer search
    # ----------------------------------------------------------------------------------
    try:
        print(f"[4/6] Fetching OpenAlex PDFs (target: {target_openalex})...")
        openalex_paths = fetch_openalex_pdfs(
            polymer_keywords,
            tmp_download_dir,
            manifest,
            max_results=target_openalex,
            mailto=mailto,
        )
        _register_new_paths(openalex_paths, "openalex", all_paths, seen, source_stats)
        print(f" → {len(openalex_paths)} OpenAlex PDFs downloaded")
    except Exception as e:
        print(f"[WARN] OpenAlex fetch error: {e}")
    # ----------------------------------------------------------------------------------
    # 5) Europe PMC biopolymers/materials
    # ----------------------------------------------------------------------------------
    try:
        print(f"[5/6] Fetching Europe PMC PDFs (target: {target_epmc})...")
        epmc_paths = fetch_epmc_pdfs(
            polymer_keywords, tmp_download_dir, manifest, max_results=target_epmc
        )
        _register_new_paths(epmc_paths, "epmc", all_paths, seen, source_stats)
        print(f" → {len(epmc_paths)} Europe PMC PDFs downloaded")
    except Exception as e:
        print(f"[WARN] Europe PMC fetch error: {e}")
    # ----------------------------------------------------------------------------------
    # 6) Extra URLs (user-provided, database exports, etc.)
    # ----------------------------------------------------------------------------------
    if extra_pdf_urls:
        print(f"[6/6] Downloading {len(extra_pdf_urls)} extra PDFs...")
        extra_entries = [
            {"url": u, "name": None, "meta": {"url": u, "source": "extra"}}
            for u in extra_pdf_urls
        ]
        extra_paths = parallel_download_pdfs(
            extra_entries, tmp_download_dir, manifest, max_workers=8, desc="Extra PDFs"
        )
        _register_new_paths(extra_paths, "extra", all_paths, seen, source_stats)
        print(f" → {len(extra_paths)} extra PDFs downloaded")
    # ----------------------------------------------------------------------------------
    # Summary
    # ----------------------------------------------------------------------------------
    total = len(all_paths)
    print("=" * 70)
    print("DOWNLOAD SUMMARY")
    print("=" * 70)
    print(f"Total unique PDFs downloaded: {total}")
    print(" by source:")
    for source, count in sorted(source_stats.items()):
        pct = (count / total * 100) if total > 0 else 0
        print(f" {source:20s} {count:4d} PDFs ({pct:5.1f}%)")
    print("=" * 70)
    if total == 0:
        raise RuntimeError(
            "No PDFs fetched. Adjust keywords, targets, or add extra_pdf_urls."
        )
    print("Building knowledge base from downloaded PDFs...")
    retriever = _split_and_build_retriever(
        documents_dir=tmp_download_dir,
        persist_dir=persist_dir,
        k=k,
        embedding_model=embedding_model,
        vector_backend=vector_backend,
    )
    return retriever
# --------------------------------------------------------------------------------------
# PUBLIC API: BUILD RETRIEVER FROM LOCAL PAPERS
# --------------------------------------------------------------------------------------
def build_retriever(
    papers_path: str,
    persist_dir: Optional[str] = DEFAULT_PERSIST_DIR,
    k: int = 10,
    embedding_model: str = "text-embedding-3-small",
    vector_backend: str = "chroma",
):
    """
    Construct the polymer RAG knowledge base from a local directory of PDFs.

    Thin wrapper around ``_split_and_build_retriever``: ``papers_path`` is
    forwarded as the document directory, everything else passes through.
    """
    print("Building RAG knowledge base from local PDFs...")
    build_kwargs = {
        "documents_dir": papers_path,
        "persist_dir": persist_dir,
        "k": k,
        "embedding_model": embedding_model,
        "vector_backend": vector_backend,
    }
    return _split_and_build_retriever(**build_kwargs)
# --------------------------------------------------------------------------------------
# CONVENIENCE WRAPPER: POLYMER FOUNDATION MODELS
# --------------------------------------------------------------------------------------
def build_retriever_polymer_foundation_models(
    persist_dir: str = DEFAULT_PERSIST_DIR,
    k: int = 10,
    vector_backend: str = "chroma",
):
    """
    Convenience wrapper: build the web corpus with foundation-model keywords.

    Augments the base POLYMER_KEYWORDS with polymer language-model terms
    (BigSMILES/PSMILES, self-supervised and generative polymer modeling)
    and delegates to ``build_retriever_from_web`` with the default targets.
    """
    foundation_terms = {
        "BigSMILES",
        "PSMILES",
        "polymer SMILES",
        "polymer language model",
        "foundation model polymer",
        "masked language model polymer",
        "self-supervised polymer",
        "generative polymer",
        "polymer sequence modeling",
        "representation learning polymer",
    }
    keyword_set = set(POLYMER_KEYWORDS)
    keyword_set.update(foundation_terms)
    return build_retriever_from_web(
        polymer_keywords=list(keyword_set),
        target_curated=100,
        target_journals=200,
        target_arxiv=800,
        target_openalex=600,
        target_epmc=200,
        persist_dir=persist_dir,
        k=k,
        embedding_model="text-embedding-3-small",
        vector_backend=vector_backend,
    )
# --------------------------------------------------------------------------------------
# MAIN
# --------------------------------------------------------------------------------------
if __name__ == "__main__":
    # Build the knowledge base from the web with the default balanced targets.
    retriever = build_retriever_from_web(
        polymer_keywords=POLYMER_KEYWORDS,
        target_curated=100,
        target_journals=200,
        target_arxiv=800,
        target_openalex=600,
        target_epmc=200,
        persist_dir="chroma_polymer_db_balanced",
        tmp_download_dir=DEFAULT_TMP_DOWNLOAD_DIR,
        k=10,
        embedding_model="text-embedding-3-small",
        vector_backend="chroma",
        mailto=DEFAULT_MAILTO,
        include_curated=True,
    )
    print("\n" + "=" * 70)
    print("Testing retrieval with sample query")
    # `invoke` is the current retriever entry point; `get_relevant_documents`
    # is deprecated since langchain-core 0.1.46. Both return a list of Documents.
    docs = retriever.invoke("PSMILES polymer electrolyte design")
    for i, d in enumerate(docs, 1):
        meta = d.metadata or {}
        # Prefer the paper title; fall back to the source filename, then a placeholder.
        title = meta.get("title") or os.path.basename(meta.get("source", "")) or "document"
        year = meta.get("year", "")
        src = meta.get("source", "unknown")
        journal = meta.get("journal", "")
        scale = meta.get("segment_scale", "")
        source_str = f"{src}"
        if journal:
            source_str = f"{journal} ({src})"
        print(f"\n[{i}] {title}")
        print(f" Year: {year} | Source: {source_str} | Scale: {scale}")
        # Truncate segment text to a 200-char preview.
        print(f" Content: {(d.page_content or '')[:200]}...")