#!/usr/bin/env python3
"""
Convert one or more PubMed/MEDLINE (.txt) and/or RIS (.ris) files to a
deduplicated Excel (.xlsx).

Usage examples:

    # Single MEDLINE file (backwards-compatible behaviour)
    python reference_parser.py input_medline.txt output.xlsx

    # Single RIS file
    python reference_parser.py scopus.ris output.xlsx

    # Mixed multiple files (MEDLINE + RIS), merged and deduplicated
    python reference_parser.py merged.xlsx pubmed1.txt scopus.ris pubmed2.txt

Rules:
  - If the FIRST non-script argument ends with .xlsx, it is treated as
    OUTPUT, and all remaining arguments are INPUT files.
  - Otherwise:
      - If there is 1 argument: input only, output = input with .xlsx suffix.
      - If there are 2 arguments and the second ends with .xlsx:
        input, output (old behaviour).
      - In every other case all arguments are inputs and the output is
        'merged.xlsx' in the current directory.

The resulting Excel has:
  - References sheet: one row per unique reference with the common
    bibliographic fields (PMID, Title, Abstract, Authors, FullAuthors,
    Journal, Year, PublicationDate, DOI, PMCID, Language, PublicationTypes,
    MeSH, Keywords, URL). Helper columns used for deduplication are removed
    before the sheet is written.
"""

import re
import json
import sys
from pathlib import Path
from typing import List, Dict, Any, Optional, Union

import pandas as pd

# public API symbols
__all__ = ["parse_references", "run_cli", "process_paths"]


# ---------- MEDLINE PARSER ----------

# MEDLINE tags look like "TI  - This is the title" or "PMID- 12345".
TAG_RE = re.compile(r"^([A-Z0-9]{2,4})\s*-\s(.*)$")


def parse_medline_text(text: str) -> List[Dict[str, Any]]:
    """Parse a PubMed/MEDLINE .txt export into a list of tag dictionaries.

    Records are separated by blank lines. Within a record, a tag that occurs
    once maps to a string and a tag that occurs several times maps to a list
    of strings. Lines that do not start with a tag are treated as
    continuations of the most recent tag's value.
    """
    records: List[Dict[str, Any]] = []
    current: Dict[str, List[str]] = {}
    current_tag: Optional[str] = None

    def flush():
        nonlocal current, current_tag
        if current:
            rec = {k: (v[0] if len(v) == 1 else v) for k, v in current.items()}
            records.append(rec)
        current = {}
        current_tag = None

    # A byte-order mark in front of the first tag would stop it from matching.
    for line in text.lstrip("\ufeff").splitlines():
        if not line.strip():
            # blank line separates records
            flush()
            continue
        m = TAG_RE.match(line)
        if m:
            tag, value = m.group(1), m.group(2).rstrip()
            current_tag = tag
            current.setdefault(tag, []).append(value)
        else:
            # continuation line; ignore it if no tag has been seen yet
            if current_tag is None:
                continue
            values = current[current_tag]
            values[-1] = (values[-1] + " " + line.strip()).strip()

    # flush last record (the file may not end with a blank line)
    flush()
    return records


def _first_value(value: Any) -> str:
    """Collapse a tag value (string or list) to its first non-empty string."""
    if isinstance(value, list):
        return next((str(v) for v in value if v), "")
    return str(value) if value else ""


def _joined(value: Any, sep: str = "; ") -> str:
    """Join a multi-valued tag into one string, skipping empty entries."""
    if isinstance(value, list):
        return sep.join(str(v) for v in value if v)
    return str(value) if value else ""


def _medline_doi(aid: Any) -> str:
    """Return the DOI from MEDLINE AID value(s) marked with a ``[doi]`` suffix."""
    candidates = aid if isinstance(aid, list) else [aid]
    for candidate in candidates:
        if isinstance(candidate, str) and "[doi]" in candidate:
            return candidate.split()[0]
    return ""


def normalize_medline_records(
    records: List[Dict[str, Any]], source_file: Path
) -> List[Dict[str, Any]]:
    """Convert MEDLINE tag dicts into normalized row dicts.

    ``records`` is the output of :func:`parse_medline_text`. ``source_file``
    is accepted for interface compatibility and is not used at present.

    Single-valued columns (title, abstract, PMID, ...) take the first
    non-empty value when a tag unexpectedly repeats, so every cell is a plain
    string. Multi-valued columns (authors, MeSH, ...) are joined with "; ".
    """
    rows: List[Dict[str, Any]] = []
    for rec in records:
        pmid = _first_value(rec.get("PMID")).strip()
        dp = _first_value(rec.get("DP"))
        rows.append(
            {
                "PMID": pmid,
                "Title": _first_value(rec.get("TI")),
                "Abstract": _first_value(rec.get("AB")),
                "Authors": _joined(rec.get("AU")),
                "FullAuthors": _joined(rec.get("FAU")),
                "Journal": _first_value(rec.get("JT")) or _first_value(rec.get("TA")),
                # DP starts with the four-digit year, e.g. "2020 Jan 15"
                "Year": dp[:4],
                "PublicationDate": dp,
                "DOI": _medline_doi(rec.get("AID", "")),
                "PMCID": _first_value(rec.get("PMC")),
                "Language": _joined(rec.get("LA")),
                "PublicationTypes": _joined(rec.get("PT")),
                "MeSH": _joined(rec.get("MH")),
                "Keywords": _joined(rec.get("OT")),
                "URL": f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/" if pmid else "",
                "SourceFormat": "MEDLINE",
            }
        )
    return rows


# ---------- RIS PARSER ----------

# RIS tag lines are "<2-char tag><two spaces>-<space><value>". One space
# before the dash is tolerated as well, and the value (with its leading
# space) may be missing entirely, as in a bare "ER  -" terminator.
RIS_TAG_RE = re.compile(r"^([A-Z][A-Z0-9])[ ]{1,2}-(?:[ ](.*))?$")


def parse_ris_text(text: str) -> List[Dict[str, Any]]:
    """
    Parse an RIS file into a list of tag dictionaries.

    RIS records are terminated by an ``ER`` tag line. Every tag maps to a
    list of values because RIS tags such as AU or KW may repeat. Lines that
    do not start with a tag are appended to the value of the most recently
    seen tag.
    """
    records: List[Dict[str, Any]] = []
    current: Dict[str, List[str]] = {}
    current_tag: Optional[str] = None

    def flush():
        nonlocal current, current_tag
        if current:
            # keep as list for multi-valued fields
            records.append(current)
        current = {}
        current_tag = None

    # A byte-order mark in front of the first tag would stop it from matching.
    for line in text.lstrip("\ufeff").splitlines():
        if not line.strip():
            # blank line: ignore (RIS ends records explicitly with ER)
            continue
        m = RIS_TAG_RE.match(line)
        if m:
            tag = m.group(1)
            if tag == "ER":
                flush()
                continue
            current_tag = tag
            current.setdefault(tag, []).append((m.group(2) or "").strip())
        else:
            # continuation line: belongs to the tag seen last, which is not
            # necessarily the key inserted last when a tag repeats
            if current_tag is None:
                continue
            values = current[current_tag]
            values[-1] = (values[-1] + " " + line.strip()).strip()

    # flush last record (the final ER line may be missing)
    flush()
    return records


def normalize_ris_records(
    records: List[Dict[str, List[str]]], source_file: Path
) -> List[Dict[str, Any]]:
    """Convert RIS tag dicts into normalized row dicts (same columns as MEDLINE rows).

    ``records`` is the output of :func:`parse_ris_text`. ``source_file`` is
    accepted for interface compatibility and is not used at present.
    """
    rows: List[Dict[str, Any]] = []

    def first(rec, *tags):
        """Return first non-empty value among given tags."""
        for t in tags:
            found = _first_value(rec.get(t))
            if found:
                return found
        return ""

    for rec in records:
        year = first(rec, "PY", "Y1")
        # PY/Y1 may carry a full date such as "2020/05/01/"; keep the year only
        year = year[:4] if len(year) >= 4 else ""

        # Authors are usually in AU; some exporters use A1 instead.
        authors_list = rec.get("AU") or rec.get("A1") or []
        if not isinstance(authors_list, list):
            authors_list = [authors_list]
        authors = "; ".join(str(a) for a in authors_list if a)

        doi = first(rec, "DO")

        # URL: derived from the DOI when one is available
        url = ""
        if doi:
            doi_clean = doi.strip()
            if doi_clean.lower().startswith("http"):
                url = doi_clean
            else:
                url = f"https://doi.org/{doi_clean}"

        keywords = rec.get("KW") or []
        if not isinstance(keywords, list):
            keywords = [keywords]

        rows.append(
            {
                "PMID": first(rec, "PM"),  # some RIS exports may carry PMIDs
                "Title": first(rec, "TI", "T1", "T2"),
                "Abstract": first(rec, "AB"),
                "Authors": authors,
                "FullAuthors": authors,  # RIS often doesn’t distinguish full vs initials
                # JO/JF first; T2, J2 and JA are where other exporters
                # (e.g. Scopus) put the journal name
                "Journal": first(rec, "JO", "JF", "T2", "J2", "JA"),
                "Year": year,
                "PublicationDate": year,
                "DOI": doi,
                "PMCID": "",
                "Language": first(rec, "LA"),
                "PublicationTypes": first(rec, "PT"),
                "MeSH": "",
                "Keywords": "; ".join(str(k) for k in keywords if k),
                "URL": url,
                "SourceFormat": "RIS",
            }
        )
    return rows


# ---------- DEDUPLICATION ----------

# Prefixes that may precede a bare DOI (matched against lower-cased text).
_DOI_PREFIX_RE = re.compile(r"^(?:https?://(?:dx\.)?doi\.org/|doi:\s*)")


def _as_text(value: Any) -> str:
    """Return ``value`` as a stripped string; None and NaN become ""."""
    if value is None:
        return ""
    if isinstance(value, float) and value != value:  # NaN is the only x != x
        return ""
    return str(value).strip()


def build_dedup_key(row: pd.Series) -> str:
    """
    Build a deduplication key:
      1) If DOI present   -> doi:<lower-cased DOI without URL/doi: prefix>
      2) Else if PMID present -> pmid:<PMID>
      3) Else -> title_year:<normalized title>_<year>
    """
    doi = _as_text(row.get("DOI")).lower()
    pmid = _as_text(row.get("PMID"))
    title = _as_text(row.get("Title")).lower()
    year = _as_text(row.get("Year"))

    if doi:
        # strip URL / "doi:" prefix if any so the same DOI always matches
        doi = _DOI_PREFIX_RE.sub("", doi).strip()
        return f"doi:{doi}"
    if pmid:
        return f"pmid:{pmid}"
    # fallback: normalized title + year
    title_norm = re.sub(r"\s+", " ", title)
    return f"title_year:{title_norm}_{year}"


# ---------- CORE PROCESSING ----------

def process_paths(
    input_paths: List[Path], output_path: Optional[Path] = None
) -> pd.DataFrame:
    """Parse, normalize and deduplicate references from the specified files.

    * ``input_paths`` is a list of file paths (``Path`` objects or strings)
      to MEDLINE (``.txt``/``.medline``) or RIS (``.ris``) exports. Missing
      files and unrecognized suffixes are reported on stderr and skipped.
    * ``output_path`` if provided will be used to write an Excel file with a
      single ``References`` sheet. Nothing is written when no records were
      parsed.

    Returns a :class:`pandas.DataFrame` with one row per unique reference.
    When a reference appears in both formats the MEDLINE row is kept. The
    helper columns used for deduplication (``SourceFormat``, ``DedupKey``)
    are removed before the frame is returned. An empty DataFrame is returned
    when no records could be parsed.
    """
    all_rows: List[Dict[str, Any]] = []

    for raw_path in input_paths:
        path = Path(raw_path)  # accept plain strings as well as Path objects
        if not path.exists():
            print(f"Warning: input file not found: {path}", file=sys.stderr)
            continue

        suffix = path.suffix.lower()
        if suffix not in (".txt", ".medline", ".ris"):
            print(f"Warning: unrecognized file type for {path}, skipping.", file=sys.stderr)
            continue

        # utf-8-sig drops a leading byte-order mark, which would otherwise
        # hide the first tag of the file from the parsers.
        text = path.read_text(encoding="utf-8-sig", errors="replace")
        if suffix == ".ris":
            all_rows.extend(normalize_ris_records(parse_ris_text(text), path))
        else:
            all_rows.extend(normalize_medline_records(parse_medline_text(text), path))

    if not all_rows:
        # no data; caller can decide what to do
        return pd.DataFrame()

    df = pd.DataFrame(all_rows)

    if "PMID" in df.columns:
        df["PMID"] = df["PMID"].astype(str).str.strip()

    # Helper column for deduplication; dropped again before returning.
    df["DedupKey"] = df.apply(build_dedup_key, axis=1)

    # Sort so MEDLINE rows precede RIS rows. The sort must be stable so that
    # rows of the same format keep their input order and "keep first" below
    # is deterministic.
    sort_cols = [c for c in ["SourceFormat"] if c in df.columns]
    if sort_cols:
        df = df.sort_values(sort_cols, kind="mergesort")

    df = df.drop_duplicates(subset=["DedupKey"], keep="first")

    # now that we have final deduplicated results, drop internal columns
    # that callers generally don't need
    df = df.drop(columns=["SourceFormat", "SourceFile", "DedupKey"], errors="ignore")

    if output_path is not None:
        with pd.ExcelWriter(output_path, engine="openpyxl") as writer:
            df.to_excel(writer, index=False, sheet_name="References")
        print(f"Wrote {len(df)} deduplicated records to {output_path}")

    return df


# ---------------------------------------------------------------------------
# Public API and CLI wrappers
# ---------------------------------------------------------------------------

def parse_references(
    input_paths: List[Path], output_path: Optional[Path] = None
) -> pd.DataFrame:
    """Programmatic interface for parsing and deduplicating references.

    ``input_paths`` is a list of :class:`Path` objects (or convertible
    strings) referring to MEDLINE or RIS export files. ``output_path`` is an
    optional destination for the resulting Excel workbook; if ``None`` the
    data frame will still be returned but no file will be written.

    This function simply delegates to :func:`process_paths` and is intended
    to be imported and used by other Python code without invoking the CLI
    logic.
    """
    return process_paths(input_paths, output_path)


def run_cli(argv=None):
    """Command-line entry point.

    ``argv`` may be provided by callers (e.g. tests); when ``None`` it
    defaults to ``sys.argv[1:]``. After parsing arguments this function
    calls :func:`parse_references`.

    Raises ``SystemExit(2)`` when no arguments are given or when an output
    file is named without any input files.
    """
    if argv is None:
        argv = sys.argv[1:]

    if len(argv) < 1:
        print(
            "Usage:\n"
            " python reference_parser.py input_medline.txt [output.xlsx]\n"
            " python reference_parser.py scopus.ris [output.xlsx]\n"
            " python reference_parser.py merged.xlsx pubmed1.txt scopus.ris …\n",
            file=sys.stderr,
        )
        raise SystemExit(2)

    args = [Path(a) for a in argv]

    output_path: Path
    input_paths: List[Path]

    if args[0].suffix.lower() == ".xlsx":
        # Case 1: first argument is an .xlsx -> output first, rest inputs
        output_path = args[0]
        input_paths = args[1:]
        if not input_paths:
            print("Error: no input files provided.", file=sys.stderr)
            raise SystemExit(2)
    elif len(args) == 1:
        # Single input: write next to it with an .xlsx suffix
        input_paths = [args[0]]
        output_path = args[0].with_suffix(".xlsx")
    elif len(args) == 2 and args[1].suffix.lower() == ".xlsx":
        # Old behaviour: input followed by output
        input_paths = [args[0]]
        output_path = args[1]
    else:
        # Several inputs and no explicit output
        input_paths = args
        output_path = Path("merged.xlsx")

    # delegate to public API
    result = parse_references(input_paths, output_path)
    if result.empty:
        # process_paths writes nothing in this case; say so instead of
        # finishing silently
        print("Warning: no records were parsed; no output file written.", file=sys.stderr)


if __name__ == "__main__":
    run_cli()