LLM_Screener / reference_parser.py
diogo.rodrigues.silva
First commit
19e3935
raw
history blame
13.6 kB
#!/usr/bin/env python3
"""
Convert one or more PubMed/MEDLINE (.txt) and/or RIS (.ris) files to a deduplicated Excel (.xlsx).
Usage examples:
# Single MEDLINE file (backwards-compatible behaviour)
python reference_parser.py input_medline.txt output.xlsx
# Single RIS file
python reference_parser.py scopus.ris output.xlsx
# Mixed multiple files (MEDLINE + RIS), merged and deduplicated
python reference_parser.py merged.xlsx pubmed1.txt scopus.ris pubmed2.txt
Rules:
- If the FIRST non-script argument ends with .xlsx, it is treated as OUTPUT,
and all remaining arguments are INPUT files.
- Otherwise:
- If there are 2 arguments and the second ends with .xlsx: input, output (old behaviour).
- If there is 1 argument: input only, output = input with .xlsx suffix.
- If there are >2 arguments and none ends with .xlsx:
* All are inputs, output = 'merged.xlsx' in current directory.
The resulting Excel has:
- References sheet: common fields + source metadata + a JSON dump of all raw tags.
"""
import re
import json
import sys
from pathlib import Path
from typing import List, Dict, Any
import pandas as pd
# public API symbols
__all__ = ["parse_references", "run_cli", "process_paths"]
# ---------- MEDLINE PARSER ----------
# Corrected regex for MEDLINE tags like: "TI - This is the title"
TAG_RE = re.compile(r"^([A-Z0-9]{2,4})\s*-\s(.*)$")
def parse_medline_text(text: str) -> List[Dict[str, Any]]:
    """Parse a PubMed/MEDLINE .txt export into a list of tag dictionaries.

    Records are separated by blank lines. Tags seen once map to a plain
    string; repeated tags (AU, MH, ...) map to a list of strings.
    Continuation lines (no tag prefix) are folded into the previous value.
    """
    records: List[Dict[str, Any]] = []
    fields: Dict[str, List[str]] = {}
    last_tag: str | None = None

    def _commit():
        # Close the record in progress, collapsing single-valued tags.
        nonlocal fields, last_tag
        if fields:
            records.append(
                {tag: (vals[0] if len(vals) == 1 else vals) for tag, vals in fields.items()}
            )
        fields = {}
        last_tag = None

    for raw in text.splitlines():
        if not raw.strip():
            # A blank line terminates the current record.
            _commit()
            continue
        match = TAG_RE.match(raw)
        if match:
            last_tag = match.group(1)
            fields.setdefault(last_tag, []).append(match.group(2).rstrip())
        elif last_tag is not None:
            # Continuation line: append to the most recent value.
            fields[last_tag][-1] = (fields[last_tag][-1] + " " + raw.strip()).strip()
    # Commit whatever remains after the final line.
    _commit()
    return records
def normalize_medline_records(records: List[Dict[str, Any]], source_file: Path) -> List[Dict[str, Any]]:
    """Convert MEDLINE tag dicts into normalized row dicts.

    Each row carries the bibliographic columns shared with the RIS
    normalizer plus ``SourceFormat``/``SourceFile`` provenance metadata
    (``process_paths`` drops both again after deduplication).

    ``source_file`` was previously accepted but never used; its name is now
    recorded in the ``SourceFile`` column, which the downstream drop list
    already expects.
    """
    rows: List[Dict[str, Any]] = []
    for rec in records:
        def get(tag):
            # Raw tag value: str for a single occurrence, list otherwise.
            return rec.get(tag, "")
        def join(tag, sep="; "):
            # Multi-valued tags (AU, MH, OT, ...) joined into one string.
            val = rec.get(tag, "")
            if isinstance(val, list):
                return sep.join([v for v in val if v])
            return val
        pmid = str(get("PMID")).strip() if get("PMID") else ""
        journal = get("JT") or get("TA") or ""
        dp = get("DP")
        if isinstance(dp, list):
            # Repeated DP tags are unusual; keep the first occurrence so
            # string slicing below stays safe.
            dp = dp[0] if dp else ""
        year = ""
        if isinstance(dp, str) and dp:
            # MEDLINE DP starts with the 4-digit year (e.g. "2021 Jan").
            year = dp[:4]
        # DOI lives in the AID field, marked with a "[doi]" suffix.
        doi = ""
        aid = rec.get("AID", "")
        if isinstance(aid, list):
            for a in aid:
                if "[doi]" in a:
                    doi = a.split(" ")[0]
                    break
        elif isinstance(aid, str) and "[doi]" in aid:
            doi = aid.split(" ")[0]
        url = f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/" if pmid else ""
        row = {
            "PMID": pmid,
            "Title": get("TI") or "",
            "Abstract": get("AB") or "",
            "Authors": join("AU"),
            "FullAuthors": join("FAU"),
            "Journal": journal,
            "Year": year,
            "PublicationDate": dp or "",
            "DOI": doi,
            "PMCID": get("PMC") or "",
            "Language": join("LA"),
            "PublicationTypes": join("PT"),
            "MeSH": join("MH"),
            "Keywords": join("OT"),
            "URL": url,
            "SourceFormat": "MEDLINE",
            # Provenance; removed by process_paths after deduplication.
            "SourceFile": source_file.name,
        }
        rows.append(row)
    return rows
# ---------- RIS PARSER ----------
# RIS tag lines look like "TY  - JOUR": two alphanumeric characters, two
# spaces, a hyphen and (usually) one space before the value.
_RIS_TAG_RE = re.compile(r"^([A-Z0-9]{2})  - ?(.*)$")
def parse_ris_text(text: str) -> List[Dict[str, Any]]:
    """
    Parse an RIS file into a list of tag dictionaries.

    Records are terminated by the 'ER' tag. Tag values are kept as lists so
    multi-valued fields (AU, KW, ...) survive intact.

    Fixes over the previous version:
    * tag detection uses a regex matching the canonical "XX  - " layout
      instead of a slice comparison that rejected valid tag lines;
    * the end-of-record check recognises the standard two-space "ER  - "
      terminator (the old ``startswith("ER -")`` missed it);
    * continuation lines are appended to the last tag actually written,
      not merely the last key inserted into the dict (these differ when a
      tag such as AU repeats after other tags).
    """
    records: List[Dict[str, Any]] = []
    current: Dict[str, List[str]] = {}
    last_tag: str | None = None

    def flush():
        nonlocal current, last_tag
        if current:
            # Keep values as lists for multi-valued fields.
            records.append(current)
        current = {}
        last_tag = None

    for line in text.splitlines():
        if not line.strip():
            # Blank line: ignore (RIS terminates records with ER explicitly).
            continue
        m = _RIS_TAG_RE.match(line)
        if m:
            tag, value = m.group(1), m.group(2).strip()
            if tag == "ER":
                flush()
                continue
            last_tag = tag
            current.setdefault(tag, []).append(value)
        elif last_tag is not None:
            # Continuation line: extend the most recently written value.
            current[last_tag][-1] = (current[last_tag][-1] + " " + line.strip()).strip()
    # Flush a trailing record missing its ER terminator.
    flush()
    return records
def normalize_ris_records(records: List[Dict[str, List[str]]], source_file: Path) -> List[Dict[str, Any]]:
    """Convert RIS tag dicts into normalized row dicts (same columns as MEDLINE rows).

    ``source_file`` was previously accepted but never used; its name is now
    recorded in the ``SourceFile`` column, which ``process_paths`` already
    expects to drop after deduplication.
    """
    rows: List[Dict[str, Any]] = []
    def first(rec, *tags):
        """Return first non-empty value among given tags."""
        for t in tags:
            val = rec.get(t)
            if not val:
                continue
            if isinstance(val, list):
                if val and val[0]:
                    return val[0]
            elif val:
                return val
        return ""
    for rec in records:
        title = first(rec, "TI", "T1", "T2") or ""
        abstract = first(rec, "AB") or ""
        journal = first(rec, "JO", "JF") or ""
        year = first(rec, "PY", "Y1") or ""
        # RIS dates are often "YYYY/MM/DD"; keep only the 4-digit year.
        if isinstance(year, str) and len(year) >= 4:
            year = year[:4]
        else:
            year = ""
        authors_list = rec.get("AU", []) or []
        if not isinstance(authors_list, list):
            authors_list = [authors_list]
        authors = "; ".join([a for a in authors_list if a])
        doi = first(rec, "DO") or ""
        pmid = first(rec, "PM") or ""  # some RIS exports may carry PMIDs
        # URL: prefer DOI if available; pass through values already URLs.
        url = ""
        if doi:
            doi_clean = doi.strip()
            if doi_clean.lower().startswith("http"):
                url = doi_clean
            else:
                url = f"https://doi.org/{doi_clean}"
        row = {
            "PMID": pmid,
            "Title": title,
            "Abstract": abstract,
            "Authors": authors,
            "FullAuthors": authors,  # RIS often doesn’t distinguish full vs initials
            "Journal": journal,
            "Year": year,
            "PublicationDate": year,
            "DOI": doi,
            "PMCID": "",
            "Language": first(rec, "LA") or "",
            "PublicationTypes": first(rec, "PT") or "",
            "MeSH": "",
            "Keywords": "; ".join(rec.get("KW", [])) if rec.get("KW") else "",
            "URL": url,
            "SourceFormat": "RIS",
            # Provenance; removed by process_paths after deduplication.
            "SourceFile": source_file.name,
        }
        rows.append(row)
    return rows
# ---------- DEDUPLICATION ----------
def build_dedup_key(row: pd.Series) -> str:
    """
    Derive the deduplication key for one reference row.

    Preference order:
    1) DOI (lower-cased, doi.org URL prefix removed) -> "doi:<doi>"
    2) PMID                                          -> "pmid:<pmid>"
    3) normalized title + year -> "title_year:<title>_<year>"
    """
    doi_value = (row.get("DOI") or "").strip().lower()
    if doi_value:
        # Some exports embed the DOI as a full URL; strip that prefix.
        for prefix in ("https://doi.org/", "http://doi.org/"):
            doi_value = doi_value.replace(prefix, "")
        return f"doi:{doi_value.strip()}"
    pmid_value = (row.get("PMID") or "").strip()
    if pmid_value:
        return f"pmid:{pmid_value}"
    # Fallback: collapse whitespace in the lower-cased title.
    collapsed_title = re.sub(r"\s+", " ", (row.get("Title") or "").strip().lower())
    year_value = (str(row.get("Year") or "")).strip()
    return f"title_year:{collapsed_title}_{year_value}"
# ---------- CORE PROCESSING ----------
def process_paths(
    input_paths: List[Path], output_path: Path | None = None
) -> pd.DataFrame:
    """Parse, normalize and deduplicate references from the specified files.

    * ``input_paths`` is a list of file paths to MEDLINE (.txt/.medline) or
      RIS (.ris) exports; missing or unrecognized files produce a warning
      on stderr and are skipped.
    * ``output_path``, if provided, is where the References sheet of an
      Excel workbook is written; the deduplicated
      :class:`pandas.DataFrame` is returned either way.
    """
    collected: List[Dict[str, Any]] = []
    for source in input_paths:
        if not source.exists():
            print(f"Warning: input file not found: {source}", file=sys.stderr)
            continue
        content = source.read_text(encoding="utf-8", errors="replace")
        suffix = source.suffix.lower()
        if suffix in (".txt", ".medline"):
            collected.extend(normalize_medline_records(parse_medline_text(content), source))
        elif suffix == ".ris":
            collected.extend(normalize_ris_records(parse_ris_text(content), source))
        else:
            print(f"Warning: unrecognized file type for {source}, skipping.", file=sys.stderr)
    if not collected:
        # Nothing parsed: return an empty frame and let the caller decide.
        return pd.DataFrame()
    df = pd.DataFrame(collected)
    if "PMID" in df.columns:
        df["PMID"] = df["PMID"].astype(str).str.strip()
    # Internal helper column used only for deduplication below.
    df["DedupKey"] = df.apply(build_dedup_key, axis=1)
    # Sorting by SourceFormat (when present) makes the "first" record kept
    # by drop_duplicates deterministic across runs.
    order_by = [c for c in ["SourceFormat"] if c in df.columns]
    if order_by:
        df = df.sort_values(order_by)
    df = df.drop_duplicates(subset=["DedupKey"], keep="first")
    # Strip internal bookkeeping columns before handing data back.
    internal = [c for c in ("SourceFormat", "SourceFile", "DedupKey") if c in df.columns]
    df = df.drop(columns=internal)
    if output_path is not None:
        with pd.ExcelWriter(output_path, engine="openpyxl") as writer:
            df.to_excel(writer, index=False, sheet_name="References")
        print(f"Wrote {len(df)} deduplicated records to {output_path}")
    return df
# ---------------------------------------------------------------------------
# Public API and CLI wrappers
# ---------------------------------------------------------------------------
def parse_references(
    input_paths: List[Path], output_path: Path | None = None
) -> pd.DataFrame:
    """Programmatic interface for parsing and deduplicating references.

    Thin wrapper around :func:`process_paths` for callers that want the
    parsing/deduplication pipeline without invoking the CLI logic.
    ``input_paths`` lists MEDLINE or RIS export files; ``output_path``, if
    not ``None``, is the destination for the resulting Excel workbook.
    The deduplicated :class:`pandas.DataFrame` is returned in either case.
    """
    return process_paths(input_paths, output_path)
def run_cli(argv=None):
    """Command-line entry point.

    ``argv`` may be supplied by callers (e.g. tests); when ``None`` it
    defaults to ``sys.argv[1:]``. Resolves the positional arguments into
    input/output paths per the rules in the module docstring, then
    delegates to :func:`parse_references`. Raises ``SystemExit(2)`` on
    usage errors.
    """
    argv = sys.argv[1:] if argv is None else argv
    if not argv:
        print(
            "Usage:\n"
            " python reference_parser.py input_medline.txt [output.xlsx]\n"
            " python reference_parser.py scopus.ris [output.xlsx]\n"
            " python reference_parser.py merged.xlsx pubmed1.txt scopus.ris …\n",
            file=sys.stderr,
        )
        raise SystemExit(2)
    paths = [Path(arg) for arg in argv]
    first, rest = paths[0], paths[1:]
    output_path: Path
    input_paths: List[Path]
    if first.suffix.lower() == ".xlsx":
        # Case 1: explicit output first, all remaining arguments are inputs.
        if not rest:
            print("Error: no input files provided.", file=sys.stderr)
            raise SystemExit(2)
        output_path, input_paths = first, rest
    elif not rest:
        # Single input: derive the output name from it.
        input_paths, output_path = [first], first.with_suffix(".xlsx")
    elif len(paths) == 2 and paths[1].suffix.lower() == ".xlsx":
        # Classic "input output" invocation.
        input_paths, output_path = [first], paths[1]
    else:
        # Several inputs with no explicit output: merge into a default file.
        input_paths, output_path = paths, Path("merged.xlsx")
    # Delegate to the public API.
    parse_references(input_paths, output_path)
if __name__ == "__main__":
    run_cli()