Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Convert one or more PubMed/MEDLINE (.txt) and/or RIS (.ris) files to a deduplicated Excel (.xlsx). | |
| Usage examples: | |
| # Single MEDLINE file (backwards-compatible behaviour) | |
| python reference_parser.py input_medline.txt output.xlsx | |
| # Single RIS file | |
| python reference_parser.py scopus.ris output.xlsx | |
| # Mixed multiple files (MEDLINE + RIS), merged and deduplicated | |
| python reference_parser.py merged.xlsx pubmed1.txt scopus.ris pubmed2.txt | |
| Rules: | |
| - If the FIRST non-script argument ends with .xlsx, it is treated as OUTPUT, | |
| and all remaining arguments are INPUT files. | |
| - Otherwise: | |
| - If there are 2 arguments: input, output (old behaviour). | |
| - If there is 1 argument: input only, output = input with .xlsx suffix. | |
| - If there are >2 arguments and none ends with .xlsx: | |
| * All are inputs, output = 'merged.xlsx' in current directory. | |
| The resulting Excel has: | |
| - References sheet: common fields + source metadata + a JSON dump of all raw tags. | |
| """ | |
| import re | |
| import json | |
| import sys | |
| from pathlib import Path | |
| from typing import List, Dict, Any | |
| import pandas as pd | |
| # public API symbols | |
| __all__ = ["parse_references", "run_cli", "process_paths"] | |
# ---------- MEDLINE PARSER ----------
# Regex for MEDLINE tag lines such as "TI  - This is the title".
TAG_RE = re.compile(r"^([A-Z0-9]{2,4})\s*-\s(.*)$")


def parse_medline_text(text: str) -> List[Dict[str, Any]]:
    """Parse a PubMed/MEDLINE .txt export into a list of tag dictionaries.

    Each record maps a MEDLINE tag (e.g. ``TI``, ``AU``) to a single string
    when the tag occurs once, or a list of strings when it repeats.
    Records are separated by blank lines; lines that do not start with a
    tag are continuation lines and are folded into the last value seen.
    """
    parsed: List[Dict[str, Any]] = []
    pending: Dict[str, List[str]] = {}
    active_tag: str | None = None

    def close_record() -> None:
        # Finalize the record under construction (no-op when empty).
        nonlocal pending, active_tag
        if pending:
            # Collapse single-item lists to bare strings.
            parsed.append(
                {tag: values[0] if len(values) == 1 else values
                 for tag, values in pending.items()}
            )
            pending = {}
        active_tag = None

    for raw in text.splitlines():
        if not raw.strip():
            # A blank line terminates the current record.
            close_record()
            continue
        match = TAG_RE.match(raw)
        if match is not None:
            active_tag = match.group(1)
            pending.setdefault(active_tag, []).append(match.group(2).rstrip())
        elif active_tag is not None:
            # Continuation line: fold into the newest value of the active tag.
            values = pending[active_tag]
            values[-1] = f"{values[-1]} {raw.strip()}".strip()
        # else: continuation before any tag — ignore it.

    # The final record may not be followed by a blank line.
    close_record()
    return parsed
def normalize_medline_records(records: List[Dict[str, Any]], source_file: Path) -> List[Dict[str, Any]]:
    """Convert MEDLINE tag dicts into normalized row dicts.

    Parameters
    ----------
    records:
        Tag dictionaries as produced by ``parse_medline_text``.
    source_file:
        Path of the file the records came from.  Previously accepted but
        unused; it is now recorded in the ``SourceFile`` column (which
        downstream processing may drop before writing output).

    Returns
    -------
    list of dict
        One row per record with the common reference columns, plus a
        ``RawTags`` JSON dump of all raw tags (as promised by the module
        docstring, which was never implemented).
    """
    rows: List[Dict[str, Any]] = []
    for rec in records:
        def get(tag):
            return rec.get(tag, "")

        def join(tag, sep="; "):
            # Join multi-valued tags; pass single values through unchanged.
            val = rec.get(tag, "")
            if isinstance(val, list):
                return sep.join([v for v in val if v])
            return val

        # PMID can occasionally parse as a list; use the first value rather
        # than str()-ifying the whole list.
        pmid_val = rec.get("PMID", "")
        if isinstance(pmid_val, list):
            pmid_val = pmid_val[0] if pmid_val else ""
        pmid = str(pmid_val).strip()
        journal = get("JT") or get("TA") or ""
        dp = get("DP")
        # DP looks like "2021 Mar 15": the first four characters are the year.
        year = ""
        if isinstance(dp, str) and dp:
            year = dp[:4]
        # DOI lives in the AID field with a "[doi]" suffix.
        doi = ""
        aid = rec.get("AID", "")
        if isinstance(aid, list):
            for a in aid:
                if "[doi]" in a:
                    doi = a.split(" ")[0]
                    break
        elif isinstance(aid, str) and "[doi]" in aid:
            doi = aid.split(" ")[0]
        url = f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/" if pmid else ""
        row = {
            "PMID": pmid,
            "Title": get("TI") or "",
            "Abstract": get("AB") or "",
            "Authors": join("AU"),
            "FullAuthors": join("FAU"),
            "Journal": journal,
            "Year": year,
            "PublicationDate": dp or "",
            "DOI": doi,
            "PMCID": get("PMC") or "",
            "Language": join("LA"),
            "PublicationTypes": join("PT"),
            "MeSH": join("MH"),
            "Keywords": join("OT"),
            "URL": url,
            "SourceFormat": "MEDLINE",
            # Provenance: which input file this row came from.
            "SourceFile": str(source_file),
            # JSON dump of every raw tag for lossless downstream inspection.
            "RawTags": json.dumps(rec, ensure_ascii=False),
        }
        rows.append(row)
    return rows
# ---------- RIS PARSER ----------
# RIS tag lines look like "TY  - JOUR": a two-character tag, two spaces,
# a hyphen and a space.  Some exporters are sloppy about the exact spacing,
# so the pattern tolerates variable whitespace around the "-".
RIS_TAG_RE = re.compile(r"^([A-Z][A-Z0-9])\s+-\s?(.*)$")


def parse_ris_text(text: str) -> List[Dict[str, Any]]:
    """
    Parse an RIS file into a list of tag dictionaries.

    Values are kept as lists so multi-valued fields (AU, KW, ...) are
    preserved.  Records are terminated by the "ER" tag.

    Fixes over the previous version: tag detection compared the 4-char
    slice ``line[2:6]`` against a 3-char literal (never equal), and the
    terminator check used ``"ER -"`` with a single space, which does not
    match the standard ``"ER  - "`` layout — so records were never split.
    """
    records: List[Dict[str, Any]] = []
    current: Dict[str, List[str]] = {}

    def flush():
        # Emit the record under construction (keeps lists for multi-values).
        nonlocal current
        if current:
            records.append(current)
            current = {}

    for line in text.splitlines():
        if not line.strip():
            # blank line: ignore (RIS ends records with ER, not blanks)
            continue
        m = RIS_TAG_RE.match(line)
        if m:
            tag, value = m.group(1), m.group(2).strip()
            if tag == "ER":
                # End-of-record marker.
                flush()
                continue
            current.setdefault(tag, []).append(value)
        else:
            # continuation line: fold into the most recently added tag
            if not current:
                continue
            last_tag = list(current.keys())[-1]
            current[last_tag][-1] = (current[last_tag][-1] + " " + line.strip()).strip()

    # Tolerate a final record missing its ER terminator.
    flush()
    return records
def normalize_ris_records(records: List[Dict[str, List[str]]], source_file: Path) -> List[Dict[str, Any]]:
    """Convert RIS tag dicts into normalized row dicts (same columns as MEDLINE rows).

    Parameters
    ----------
    records:
        Tag dictionaries as produced by ``parse_ris_text`` (values are lists).
    source_file:
        Path of the file the records came from.  Previously accepted but
        unused; it is now recorded in the ``SourceFile`` column.

    Returns
    -------
    list of dict
        One row per record, including a ``RawTags`` JSON dump of all raw
        tags (as promised by the module docstring).
    """
    rows: List[Dict[str, Any]] = []

    def first(rec, *tags):
        """Return first non-empty value among given tags."""
        for t in tags:
            val = rec.get(t)
            if not val:
                continue
            if isinstance(val, list):
                if val and val[0]:
                    return val[0]
            else:
                if val:
                    return val
        return ""

    for rec in records:
        title = first(rec, "TI", "T1", "T2") or ""
        abstract = first(rec, "AB") or ""
        journal = first(rec, "JO", "JF") or ""
        # PY is plain "2020"; Y1 may be "2020/01/15" — keep the leading year.
        year = first(rec, "PY", "Y1") or ""
        if isinstance(year, str) and len(year) >= 4:
            year = year[:4]
        else:
            year = ""
        authors_list = rec.get("AU", []) or []
        if not isinstance(authors_list, list):
            authors_list = [authors_list]
        authors = "; ".join([a for a in authors_list if a])
        doi = first(rec, "DO") or ""
        pmid = first(rec, "PM") or ""  # some RIS exports may carry PMIDs
        # URL: prefer DOI if available
        url = ""
        if doi:
            doi_clean = doi.strip()
            if doi_clean.lower().startswith("http"):
                url = doi_clean
            else:
                url = f"https://doi.org/{doi_clean}"
        row = {
            "PMID": pmid,
            "Title": title,
            "Abstract": abstract,
            "Authors": authors,
            "FullAuthors": authors,  # RIS often doesn’t distinguish full vs initials
            "Journal": journal,
            "Year": year,
            "PublicationDate": year,
            "DOI": doi,
            "PMCID": "",
            "Language": first(rec, "LA") or "",
            "PublicationTypes": first(rec, "PT") or "",
            "MeSH": "",
            "Keywords": "; ".join(rec.get("KW", [])) if rec.get("KW") else "",
            "URL": url,
            "SourceFormat": "RIS",
            # Provenance: which input file this row came from.
            "SourceFile": str(source_file),
            # JSON dump of every raw tag for lossless downstream inspection.
            "RawTags": json.dumps(rec, ensure_ascii=False),
        }
        rows.append(row)
    return rows
# ---------- DEDUPLICATION ----------
# DOI prefixes that must not make otherwise-identical DOIs look distinct:
# plain/secure doi.org URLs, legacy dx.doi.org URLs, and a "doi:" scheme.
_DOI_PREFIX_RE = re.compile(r"^(?:https?://(?:dx\.)?doi\.org/|doi:\s*)")


def build_dedup_key(row: pd.Series) -> str:
    """
    Build a deduplication key:
    1) If DOI present -> doi:<normalized_doi>
    2) Else if PMID present -> pmid:<pmid>
    3) Else -> title_year:<normalized_title>_<year>
    """
    doi = (row.get("DOI") or "").strip().lower()
    pmid = (row.get("PMID") or "").strip()
    title = (row.get("Title") or "").strip().lower()
    year = (str(row.get("Year") or "")).strip()
    if doi:
        # Normalize away URL/scheme prefixes.  Previously only the
        # http(s)://doi.org/ forms were stripped, so dx.doi.org and
        # "doi:"-prefixed variants produced distinct keys for the same
        # article and escaped deduplication.
        doi = _DOI_PREFIX_RE.sub("", doi).strip()
        return f"doi:{doi}"
    if pmid:
        return f"pmid:{pmid}"
    # fallback: normalized title + year (collapse whitespace runs)
    title_norm = re.sub(r"\s+", " ", title)
    return f"title_year:{title_norm}_{year}"
# ---------- CORE PROCESSING ----------
def process_paths(
    input_paths: List[Path], output_path: Path | None = None
) -> pd.DataFrame:
    """Parse, normalize and deduplicate references from the specified files.

    * ``input_paths`` is a list of file paths to MEDLINE or RIS exports.
    * ``output_path`` if provided will be used to write an Excel file. The
      caller may choose to inspect or write the returned :class:`DataFrame`
      themselves.

    The returned :class:`pandas.DataFrame` contains one row per unique
    reference; the internal ``DedupKey``/``SourceFormat``/``SourceFile``
    helper columns are dropped before returning.
    """
    all_rows: List[Dict[str, Any]] = []
    for path in input_paths:
        if not path.exists():
            print(f"Warning: input file not found: {path}", file=sys.stderr)
            continue
        text = path.read_text(encoding="utf-8", errors="replace")
        suffix = path.suffix.lower()
        # .nbib is PubMed's standard extension for MEDLINE-format exports.
        if suffix in [".txt", ".medline", ".nbib"]:
            med_records = parse_medline_text(text)
            all_rows.extend(normalize_medline_records(med_records, path))
        elif suffix == ".ris":
            ris_records = parse_ris_text(text)
            all_rows.extend(normalize_ris_records(ris_records, path))
        else:
            print(f"Warning: unrecognized file type for {path}, skipping.", file=sys.stderr)
    if not all_rows:
        # no data; caller can decide what to do
        return pd.DataFrame()
    df = pd.DataFrame(all_rows)
    if "PMID" in df.columns:
        df["PMID"] = df["PMID"].astype(str).str.strip()
    # Build the deduplication key column; it and the SourceFormat/SourceFile
    # columns are used only for internal logic (sorting + deduplication) and
    # are removed just before the DataFrame is returned.
    df["DedupKey"] = df.apply(build_dedup_key, axis=1)
    # Sort so "MEDLINE" rows win ties over "RIS" rows.  A *stable* sort is
    # required for deterministic output: the default quicksort may reorder
    # equal-key rows arbitrarily, changing which duplicate survives.
    sort_cols = [c for c in ["SourceFormat"] if c in df.columns]
    if sort_cols:
        df = df.sort_values(sort_cols, kind="stable")
    df = df.drop_duplicates(subset=["DedupKey"], keep="first")
    # Drop the internal helper columns callers generally don't need.
    df = df.drop(columns=[c for c in ["SourceFormat", "SourceFile", "DedupKey"] if c in df.columns])
    if output_path is not None:
        with pd.ExcelWriter(output_path, engine="openpyxl") as writer:
            df.to_excel(writer, index=False, sheet_name="References")
        print(f"Wrote {len(df)} deduplicated records to {output_path}")
    return df
# ---------------------------------------------------------------------------
# Public API and CLI wrappers
# ---------------------------------------------------------------------------
def parse_references(
    input_paths: List[Path], output_path: Path | None = None
) -> pd.DataFrame:
    """Parse and deduplicate references programmatically.

    ``input_paths`` lists :class:`Path` objects referring to MEDLINE or RIS
    export files.  ``output_path`` is an optional destination for the
    resulting Excel workbook; when ``None`` no file is written but the data
    frame is still returned.

    Thin wrapper around :func:`process_paths` for callers that want to use
    this module as a library without invoking the CLI logic.
    """
    result = process_paths(input_paths, output_path)
    return result
def run_cli(argv=None):
    """Command-line entry point.

    ``argv`` may be supplied by callers (e.g. tests); when ``None`` it
    defaults to ``sys.argv[1:]``.  Resolves the input/output paths per the
    rules in the module docstring, then delegates to
    :func:`parse_references`.

    Raises ``SystemExit(2)`` when no arguments are given or when an
    explicit .xlsx output is named without any input files.
    """
    raw_args = sys.argv[1:] if argv is None else argv
    if not raw_args:
        print(
            "Usage:\n"
            " python reference_parser.py input_medline.txt [output.xlsx]\n"
            " python reference_parser.py scopus.ris [output.xlsx]\n"
            " python reference_parser.py merged.xlsx pubmed1.txt scopus.ris …\n",
            file=sys.stderr,
        )
        raise SystemExit(2)

    args = [Path(a) for a in raw_args]
    output_path: Path
    input_paths: List[Path]
    head = args[0]

    if head.suffix.lower() == ".xlsx":
        # Case 1: explicit output first, remaining arguments are inputs.
        output_path, input_paths = head, args[1:]
        if not input_paths:
            print("Error: no input files provided.", file=sys.stderr)
            raise SystemExit(2)
    elif len(args) == 1:
        # Single input: derive the output name from it.
        input_paths = [head]
        output_path = head.with_suffix(".xlsx")
    elif len(args) == 2 and args[1].suffix.lower() == ".xlsx":
        # Classic "input output" invocation (old behaviour).
        input_paths, output_path = [head], args[1]
    else:
        # Several inputs, no explicit output: merge into ./merged.xlsx.
        input_paths = args
        output_path = Path("merged.xlsx")

    # delegate to public API
    parse_references(input_paths, output_path)


if __name__ == "__main__":
    run_cli()