"""DATASUS SIH-RD (hospital admissions) pull pipeline.

Each admission record is one event in the patient's clinical timeline, with:
primary CID, secondary CIDs, admission date, discharge date, length of stay,
primary procedure (SIGTAP), outcome (discharge/death), patient age + sex + UF.

For training DT-FM on REAL Brazilian rare-disease event sequences.

Pulls RDXX####.dbc from
ftp://ftp.datasus.gov.br/dissemin/publicos/SIHSUS/200801_/Dados/

Note: SIH-RD is monthly (RDxxYYMM), much larger files than SIM. We pull
selected months and filter aggressively to rare CIDs.
"""
from __future__ import annotations

import logging
import os
import tempfile
import urllib.request
from datetime import date, datetime
from pathlib import Path

logger = logging.getLogger("gemeo.datasus.sih")

# SIH-RD CID-10 codes (no dot in DBC) for rare diseases, mapped to the
# identifier emitted as "orpha" in parsed records.
# NOTE(review): a few labels look narrower than the ICD-10 category they are
# attached to (e.g. G12.2, E75.0/E75.1) - confirm the mapping against the
# ICD-10 and Orphanet references before relying on per-disease counts.
RARE_CIDS_SIH = {
    # Format: SIH stores CID-10 as e.g. "G113" not "G11.3"
    "G113": "100",     # AT
    "E752": "646",     # NPC / Gaucher cohort
    "E751": "355",     # Gaucher specifically (some encodings)
    "E750": "355",     # Gaucher subtype
    "G710": "98896",   # DMD
    "G711": "98896",   # DMD other
    "G120": "70",      # SMA-1
    "G121": "71",      # SMA-2
    "G122": "83330",   # SMA-3
    "E840": "586",     # CF lung
    "E841": "586",     # CF other
    "E848": "586",     # CF combined
    "E849": "586",     # CF unspecified
    "E760": "579",     # MPS I
    "E761": "580",     # MPS II
    # Dotted spelling kept for backward compatibility; records are matched
    # through the undotted form (see _candidate_codes).
    "E83.0": "905",    # Wilson
    "E830": "905",     # Wilson
    "G111": "95",      # Friedreich
    "Q874": "558",     # Marfan
    "Q850": "636",     # NF1
    "F842": "778",     # Rett
    "D811": "183660",  # SCID
}

# COD_IDADE unit code -> conversion of the raw IDADE integer to years.
_AGE_TO_YEARS = {
    "4": lambda n: float(n),   # years
    "3": lambda n: n / 12,     # months
    "2": lambda n: n / 365.25,  # days
    "5": lambda n: 100.0 + n,  # years above 100
}


def _clean_cid(value) -> str:
    """Return a CID field as a stripped, upper-cased string ('' if missing)."""
    return str(value or "").strip().upper()


def _candidate_codes(cid_princ: str, cid_sec: str) -> list[str]:
    """Return the lookup keys for one record, highest priority first.

    Order: primary code, secondary code (each as given, then without the
    dot), followed by the 3-character category of each. Empty keys are
    dropped.
    """
    keys = []
    for code in (cid_princ, cid_sec):
        keys.append(code)
        undotted = code.replace(".", "")
        if undotted != code:
            keys.append(undotted)
    keys.extend((cid_princ[:3], cid_sec[:3]))
    return [k for k in keys if k]


def _to_int(value) -> int | None:
    """Convert *value* to int, returning None when missing or malformed."""
    if value is None:
        return None
    try:
        return int(value)
    except (ValueError, TypeError):
        return None


def _age_in_years(age_raw, unit_raw) -> float | None:
    """Convert an IDADE/COD_IDADE pair to years; None if not convertible."""
    age = _to_int(age_raw)
    convert = _AGE_TO_YEARS.get(str(unit_raw or "").strip())
    if age is None or convert is None:
        return None
    return convert(age)


def _parse_date(value) -> date | None:
    """Parse a YYYYMMDD value into a date.

    Accepts 8-character strings (or ints) and already-parsed date/datetime
    objects; anything else yields None.
    """
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date):
        return value
    text = str(value or "").strip()
    if len(text) != 8:
        return None
    try:
        return datetime.strptime(text, "%Y%m%d").date()
    except ValueError:
        return None


def parse_sih_record(rec: dict) -> dict | None:
    """Parse a single SIH-RD record into a clean event dict.

    Args:
        rec: mapping of SIH-RD field names to raw values. Fields read:
            DIAG_PRINC, DIAG_SECUN, SEXO, IDADE, COD_IDADE, UF_ZI, DT_INTER,
            DT_SAIDA, DIAS_PERM, PROC_REA, MORTE.

    Returns:
        A dict with keys cid_princ, cid_sec, orpha, sex,
        age_at_admission_years, uf_code, admission_date, discharge_date,
        los_days, primary_procedure and death_during_stay, or None when
        neither diagnosis matches RARE_CIDS_SIH.
    """
    cid_princ = _clean_cid(rec.get("DIAG_PRINC"))
    cid_sec = _clean_cid(rec.get("DIAG_SECUN"))

    matched = next(
        (RARE_CIDS_SIH[code]
         for code in _candidate_codes(cid_princ, cid_sec)
         if code in RARE_CIDS_SIH),
        None,
    )
    if matched is None:
        return None

    sex_code = str(rec.get("SEXO") or "").strip()
    if sex_code in ("1", "M"):
        sex = "M"
    elif sex_code in ("3", "F"):
        sex = "F"
    else:
        sex = "?"

    return {
        "cid_princ": cid_princ,
        "cid_sec": cid_sec or None,
        "orpha": matched,
        "sex": sex,
        "age_at_admission_years": _age_in_years(
            rec.get("IDADE"), rec.get("COD_IDADE")
        ),
        # str() so a numeric UF_ZI field does not raise on slicing.
        "uf_code": str(rec.get("UF_ZI") or "")[:2],
        "admission_date": _parse_date(rec.get("DT_INTER")),
        "discharge_date": _parse_date(rec.get("DT_SAIDA")),
        "los_days": _to_int(rec.get("DIAS_PERM")),
        "primary_procedure": str(rec.get("PROC_REA") or "").strip() or None,
        "death_during_stay": str(rec.get("MORTE") or "").strip() == "1",
    }


def _has_payload(path: Path) -> bool:
    """Return True when *path* exists and is larger than 1 KiB."""
    return path.exists() and path.stat().st_size > 1024


def _write_atomically(target: Path, produce) -> None:
    """Run ``produce(tmp_path)`` and move the result onto *target*.

    The output is written to a sibling ``.part`` file first so that an
    interrupted download or conversion never leaves a truncated file under
    the final name (which a later call would mistake for a valid cache hit).
    """
    partial = target.with_name(target.name + ".part")
    try:
        produce(str(partial))
        os.replace(partial, target)
    finally:
        if partial.exists():
            partial.unlink()


def _materialise_dbf(workdir: Path, fname: str, url: str, tag: str,
                     convert, *, announce: bool) -> Path:
    """Make sure the .dbf for *fname* exists in *workdir* and return its path.

    Reuses an existing .dbf, else converts an existing .dbc, else downloads
    the .dbc from *url* first. *convert* is called as
    ``convert(dbc_path, dbf_path)``. Download/conversion errors propagate.
    """
    dbc_path = workdir / fname
    dbf_path = dbc_path.with_suffix(".dbf")
    if _has_payload(dbf_path):
        return dbf_path
    if not _has_payload(dbc_path):
        if announce:
            logger.info(" %s download", tag)
        _write_atomically(
            dbc_path, lambda tmp: urllib.request.urlretrieve(url, tmp)
        )
    _write_atomically(dbf_path, lambda tmp: convert(str(dbc_path), tmp))
    return dbf_path


def _scan_dbf(dbf_path: Path, reader, targets: set, year: int, month: int,
              tag: str) -> list[dict]:
    """Stream *dbf_path* and return parsed records whose CID is in *targets*.

    *reader* is the DBF class used to iterate the file. A failure while
    reading is logged and the records collected so far are returned.
    """
    out = []
    try:
        for rec in reader(str(dbf_path), encoding="latin-1", load=False):
            codes = _candidate_codes(
                _clean_cid(rec.get("DIAG_PRINC")),
                _clean_cid(rec.get("DIAG_SECUN")),
            )
            if not any(code in targets for code in codes):
                continue
            parsed = parse_sih_record(rec)
            if parsed:
                parsed["year"] = year
                parsed["month"] = month
                out.append(parsed)
    except Exception as e:
        logger.warning(" %s parse failed: %s", tag, e)
    if out:
        logger.info(" %s matched %d admissions", tag, len(out))
    return out


def pull_sih(uf: str, year: int, month: int, *, cache_dir: str | None = None,
             target_cids: set | None = None) -> list[dict]:
    """Pull SIH-RD for one UF/year/month.

    Args:
        uf: two-letter UF used in the file name (e.g. "SP").
        year: four-digit year; only the last two digits go in the file name.
        month: month number, 1-12.
        cache_dir: directory in which the .dbc/.dbf files are kept between
            calls. When None, files go to a temporary directory that is
            removed before returning.
        target_cids: CID codes used to pre-filter records; defaults to the
            keys of RARE_CIDS_SIH. Records must also match RARE_CIDS_SIH to
            be returned, since parse_sih_record does the final mapping.

    Returns:
        Parsed admission dicts (see parse_sih_record) with "year" and
        "month" added. Empty list when the download, conversion or read
        fails; the failure is logged as a warning.
    """
    import pyreaddbc
    from dbfread import DBF

    if target_cids is None:
        target_cids = set(RARE_CIDS_SIH)
    # Accept dotted spellings from callers as well as the undotted SIH form.
    targets = set(target_cids) | {c.replace(".", "") for c in target_cids}

    fname = f"RD{uf}{str(year)[-2:]}{month:02d}.dbc"
    url = (
        "ftp://ftp.datasus.gov.br/dissemin/publicos/SIHSUS/200801_/Dados/"
        f"{fname}"
    )
    tag = f"[{uf}/{year}/{month:02d}]"

    if cache_dir is not None:
        os.makedirs(cache_dir, exist_ok=True)
        try:
            dbf_path = _materialise_dbf(
                Path(cache_dir), fname, url, tag, pyreaddbc.dbc2dbf,
                announce=True,
            )
        except Exception as e:
            logger.warning(" %s download failed: %s", tag, e)
            return []
        return _scan_dbf(dbf_path, DBF, targets, year, month, tag)

    # No cache requested: keep the (large) files only for this call.
    with tempfile.TemporaryDirectory() as td:
        try:
            dbf_path = _materialise_dbf(
                Path(td), fname, url, tag, pyreaddbc.dbc2dbf, announce=False,
            )
        except Exception as e:
            logger.warning(" %s failed: %s", tag, e)
            return []
        return _scan_dbf(dbf_path, DBF, targets, year, month, tag)


def pull_sih_multi(ufs: list[str], year_months: list[tuple[int, int]], *,
                   cache_dir: str | None = None) -> list[dict]:
    """Pull SIH-RD across multiple UFs and (year, month) pairs.

    Iterates month by month and, within each month, UF by UF. An error in
    one pull is logged and does not stop the remaining pulls.
    """
    all_records = []
    for year, month in year_months:
        for uf in ufs:
            try:
                all_records.extend(
                    pull_sih(uf, year, month, cache_dir=cache_dir)
                )
            except Exception as e:
                logger.warning(" [%s/%s/%s] error: %s", uf, year, month, e)
    return all_records


def build_event_timelines(records: list[dict]) -> dict:
    """Group SIH records into per-disease event lists ordered by age.

    Since SIH-RD doesn't link patients (anonymized), this builds SYNTHETIC
    population-level chains: all admissions for one disease, sorted by age
    at admission. Records without an age are skipped.

    Returns:
        {orpha: [{"age", "tokens", "sex", "uf"}, ...]} where "tokens" is a
        list of strings: the admission CID, then length of stay, procedure
        and death outcome when present.
    """
    from collections import defaultdict

    by_orpha = defaultdict(list)
    for r in records:
        if r.get("age_at_admission_years") is None:
            continue
        by_orpha[r["orpha"]].append(r)

    timelines = {}
    for orpha, recs in by_orpha.items():
        events = []
        for r in sorted(recs, key=lambda x: x["age_at_admission_years"]):
            tokens = [f"admission cid:{r['cid_princ']}"]
            if r.get("los_days") is not None:
                tokens.append(f"los {r['los_days']}d")
            if r.get("primary_procedure"):
                tokens.append(f"proc:{r['primary_procedure']}")
            if r.get("death_during_stay"):
                tokens.append("outcome death")
            events.append({
                "age": r["age_at_admission_years"],
                "tokens": tokens,
                "sex": r["sex"],
                "uf": r["uf_code"],
            })
        timelines[orpha] = events
    return timelines


if __name__ == "__main__":
    import argparse
    import json

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(name)s] %(message)s",
    )
    parser = argparse.ArgumentParser()
    parser.add_argument("--ufs", nargs="+", default=["SP"])
    parser.add_argument("--year", type=int, default=2019)
    parser.add_argument("--months", nargs="+", type=int,
                        default=list(range(1, 13)))
    parser.add_argument("--cache-dir", default="/tmp/datasus_cache")
    parser.add_argument("--out-json",
                        default="/tmp/datasus_sih_timelines.json")
    args = parser.parse_args()

    year_months = [(args.year, m) for m in args.months]
    print(f"Pulling SIH-RD for UFs={args.ufs} year={args.year} months={args.months}")
    recs = pull_sih_multi(args.ufs, year_months, cache_dir=args.cache_dir)
    print(f"\nTotal SIH rare-CID admissions: {len(recs)}")

    timelines = build_event_timelines(recs)
    print("\nEvent timelines per disease:")
    for orpha, evs in sorted(timelines.items()):
        ages = [e["age"] for e in evs]
        print(f" ORPHA:{orpha:>6} {len(evs):>4} events age range [{min(ages):.1f}, {max(ages):.1f}]y")

    out = {
        "n_admissions": len(recs),
        "ufs": args.ufs,
        "year": args.year,
        "months": args.months,
        "timelines": timelines,
    }
    with open(args.out_json, "w", encoding="utf-8") as f:
        json.dump(out, f, default=str, indent=2)
    print(f"\nSaved → {args.out_json}")