| """DATASUS SIH-RD (hospital admissions) pull pipeline. |
| |
| Each admission record is one event in the patient's clinical timeline, |
| with: primary CID, secondary CIDs, admission date, discharge date, |
| length of stay, primary procedure (SIGTAP), outcome (discharge/death), |
| patient age + sex + UF. |
| |
| For training DT-FM on REAL Brazilian rare-disease event sequences. |
| |
| Pulls RD<UF><YY><MM>.dbc files (e.g. RDSP1901.dbc for SP, January 2019) from |
| ftp://ftp.datasus.gov.br/dissemin/publicos/SIHSUS/200801_/Dados/ |
| |
| Note: SIH-RD is monthly (RDxxYYMM), much larger files than SIM. We pull |
| selected months and filter aggressively to rare CIDs. |
| """ |
| from __future__ import annotations |
| import logging |
| import os |
| import tempfile |
| import urllib.request |
| from datetime import datetime |
| from pathlib import Path |
|
|
logger = logging.getLogger("gemeo.datasus.sih")


# ORPHA code -> CID (ICD-10) codes that map to it. Grouping by ORPHA code
# keeps each identifier written once; the order of the groups and of the
# codes inside them fixes the key order of RARE_CIDS_SIH below.
# NOTE(review): "E83.0" (dotted) is kept next to the undotted "E830" because
# both spellings are keys of the lookup table.
_ORPHA_TO_CIDS = (
    ("100", ("G113",)),
    ("646", ("E752",)),
    ("355", ("E751", "E750")),
    ("98896", ("G710", "G711")),
    ("70", ("G120",)),
    ("71", ("G121",)),
    ("83330", ("G122",)),
    ("586", ("E840", "E841", "E848", "E849")),
    ("579", ("E760",)),
    ("580", ("E761",)),
    ("905", ("E83.0", "E830")),
    ("95", ("G111",)),
    ("558", ("Q874",)),
    ("636", ("Q850",)),
    ("778", ("F842",)),
    ("183660", ("D811",)),
)

# CID code -> ORPHA code (both strings), used to select and label admissions.
RARE_CIDS_SIH = {
    cid: orpha
    for orpha, cids in _ORPHA_TO_CIDS
    for cid in cids
}
|
|
|
|
# SEXO values accepted by parse_sih_record; anything else is reported as "?".
_SIH_SEX_LABELS = {"1": "M", "M": "M", "3": "F", "F": "F"}


def _sih_text(value) -> str:
    """Return *value* as stripped text; falsy values (None, "", 0) become ""."""
    return str(value or "").strip()


def _sih_int(value):
    """Return ``int(value)``, or None when *value* is None or not convertible."""
    if value is None:
        return None
    try:
        return int(value)
    except (ValueError, TypeError):
        # Best effort: a malformed number leaves the field unset.
        return None


def _sih_age_years(raw_age, unit_code: str):
    """Convert an IDADE / COD_IDADE pair into an age in years.

    ``unit_code`` selects the conversion applied to the integer age:
    "4" -> used as-is, "3" -> divided by 12, "2" -> divided by 365.25,
    "5" -> added to 100. Any other code, or an unusable age, yields None.
    """
    age = _sih_int(raw_age)
    if age is None:
        return None
    if unit_code == "4":
        return float(age)
    if unit_code == "3":
        return age / 12
    if unit_code == "2":
        return age / 365.25
    if unit_code == "5":
        return 100.0 + age
    return None


def _sih_date(raw):
    """Return *raw* as a ``date``, or None when it cannot be interpreted.

    Accepts ``date``/``datetime`` objects (some DBF readers decode date
    columns themselves) and 8-character ``YYYYMMDD`` text.
    """
    from datetime import date  # local: the module only imports ``datetime``

    # ``datetime`` subclasses ``date``, so it has to be tested first.
    if isinstance(raw, datetime):
        return raw.date()
    if isinstance(raw, date):
        return raw
    text = _sih_text(raw)
    if len(text) != 8:
        return None
    try:
        return datetime.strptime(text, "%Y%m%d").date()
    except ValueError:
        return None


def parse_sih_record(rec: dict) -> dict | None:
    """Parse a single SIH-RD record into a clean event dict.

    The record is kept only when its primary or secondary diagnosis (full
    code first, then the 3-character prefix) is a key of ``RARE_CIDS_SIH``;
    the primary diagnosis takes precedence over the secondary one.

    Args:
        rec: Mapping with the raw SIH-RD fields. The fields read are
            DIAG_PRINC, DIAG_SECUN, SEXO, IDADE, COD_IDADE, UF_ZI, DT_INTER,
            DT_SAIDA, DIAS_PERM, PROC_REA and MORTE. Values may be strings
            or other scalars; they are converted to text before use.

    Returns:
        None when no diagnosis matches, otherwise a dict with the keys
        ``cid_princ``, ``cid_sec``, ``orpha``, ``sex``,
        ``age_at_admission_years``, ``uf_code``, ``admission_date``,
        ``discharge_date``, ``los_days``, ``primary_procedure`` and
        ``death_during_stay``. Fields that are absent or malformed are
        reported as None (``sex`` as "?") instead of raising.
    """
    cid_princ = _sih_text(rec.get("DIAG_PRINC")).upper()
    cid_sec = _sih_text(rec.get("DIAG_SECUN")).upper()

    # Exact codes are tried before the 3-character category prefixes.
    candidates = (cid_princ, cid_sec, cid_princ[:3], cid_sec[:3])
    orpha = next(
        (RARE_CIDS_SIH[code] for code in candidates if code in RARE_CIDS_SIH),
        None,
    )
    if orpha is None:
        return None

    return {
        "cid_princ": cid_princ,
        "cid_sec": cid_sec or None,
        "orpha": orpha,
        "sex": _SIH_SEX_LABELS.get(_sih_text(rec.get("SEXO")), "?"),
        "age_at_admission_years": _sih_age_years(
            rec.get("IDADE"), _sih_text(rec.get("COD_IDADE"))
        ),
        # First two characters of UF_ZI; str() keeps numeric values sliceable.
        "uf_code": str(rec.get("UF_ZI") or "")[:2],
        "admission_date": _sih_date(rec.get("DT_INTER")),
        "discharge_date": _sih_date(rec.get("DT_SAIDA")),
        "los_days": _sih_int(rec.get("DIAS_PERM")),
        "primary_procedure": _sih_text(rec.get("PROC_REA")) or None,
        "death_during_stay": _sih_text(rec.get("MORTE")) == "1",
    }
|
|
|
|
def _has_payload(path: str) -> bool:
    """Return True when *path* exists and is larger than 1024 bytes."""
    return os.path.exists(path) and os.path.getsize(path) > 1024


def _remove_quietly(path: str) -> None:
    """Delete *path* if possible; a missing or undeletable file is ignored."""
    try:
        os.remove(path)
    except OSError:
        pass


def _download_atomically(url: str, dest: str) -> None:
    """Download *url* to *dest* so that *dest* only ever holds a complete file."""
    part = dest + ".part"
    try:
        urllib.request.urlretrieve(url, part)
        os.replace(part, dest)
    finally:
        # No-op after a successful replace; removes the fragment on failure.
        _remove_quietly(part)


def pull_sih(uf: str, year: int, month: int, *, cache_dir: str | None = None,
             target_cids: set | None = None) -> list[dict]:
    """Pull SIH-RD for one UF/year/month and return the matching admissions.

    The monthly ``RD<UF><YY><MM>.dbc`` file is downloaded from the DATASUS
    FTP server, converted to DBF with ``pyreaddbc`` and streamed with
    ``dbfread``. Records are pre-filtered on ``target_cids`` and then parsed
    with ``parse_sih_record``.

    Args:
        uf: State abbreviation used in the file name (e.g. "SP").
        year: Calendar year; only its last two digits go into the file name.
        month: Month number, zero-padded to two digits in the file name.
        cache_dir: Directory where the ``.dbc``/``.dbf`` files are kept and
            reused between calls. When None, a temporary directory is used
            and removed before returning.
        target_cids: CID codes to pre-filter on (any iterable of strings).
            Defaults to the keys of ``RARE_CIDS_SIH``. This can only narrow
            the selection: ``parse_sih_record`` still requires a match in
            ``RARE_CIDS_SIH`` and labels the record from that table.

    Returns:
        Parsed admission dicts, each extended with ``year`` and ``month``.
        An empty list is returned when the download or conversion fails.

    Raises:
        Exception: whatever ``pyreaddbc.dbc2dbf`` raises when converting a
            ``.dbc`` that was already present in ``cache_dir`` (download
            and parse failures are logged instead of raised).
    """
    import pyreaddbc
    from dbfread import DBF

    # Normalise once so membership tests are O(1) for any iterable.
    wanted = frozenset(RARE_CIDS_SIH if target_cids is None else target_cids)

    tag = f"[{uf}/{year}/{month:02d}]"
    fname = f"RD{uf}{str(year)[-2:]}{month:02d}.dbc"
    url = f"ftp://ftp.datasus.gov.br/dissemin/publicos/SIHSUS/200801_/Dados/{fname}"

    persistent = cache_dir is not None
    scratch = None
    if persistent:
        os.makedirs(cache_dir, exist_ok=True)
        work_dir = cache_dir
    else:
        scratch = tempfile.TemporaryDirectory()
        work_dir = scratch.name

    dbc_path = os.path.join(work_dir, fname)
    # Swap only the file extension; a ".dbc" inside a directory name is kept.
    dbf_path = os.path.splitext(dbc_path)[0] + ".dbf"
    fail_label = "download failed" if persistent else "failed"

    try:
        if not _has_payload(dbf_path):
            dbc_was_cached = _has_payload(dbc_path)
            if not dbc_was_cached:
                if persistent:
                    logger.info(f" {tag} download")
                try:
                    _download_atomically(url, dbc_path)
                except Exception as e:
                    logger.warning(f" {tag} {fail_label}: {e}")
                    return []
            try:
                pyreaddbc.dbc2dbf(dbc_path, dbf_path)
            except Exception as e:
                # Never leave a half-written DBF that a later call would trust.
                _remove_quietly(dbf_path)
                if dbc_was_cached:
                    raise
                logger.warning(f" {tag} {fail_label}: {e}")
                return []

        out = []
        try:
            for rec in DBF(dbf_path, encoding="latin-1", load=False):
                cp = str(rec.get("DIAG_PRINC") or "").strip().upper()
                cs = str(rec.get("DIAG_SECUN") or "").strip().upper()
                # Cheap pre-filter before the full parse: exact code or
                # 3-character prefix of either diagnosis.
                if not (cp in wanted or cs in wanted
                        or cp[:3] in wanted or cs[:3] in wanted):
                    continue
                parsed = parse_sih_record(rec)
                if parsed:
                    parsed["year"] = year
                    parsed["month"] = month
                    out.append(parsed)
        except Exception as e:
            # Best effort: keep whatever was parsed before the failure.
            logger.warning(f" {tag} parse failed: {e}")
        if out:
            logger.info(f" {tag} matched {len(out)} admissions")
        return out
    finally:
        if scratch is not None:
            scratch.cleanup()
|
|
|
|
def pull_sih_multi(ufs: list[str], year_months: list[tuple[int, int]], *,
                   cache_dir: str = None) -> list[dict]:
    """Gather SIH-RD admissions for every UF in every (year, month) pair.

    Periods are walked in the order given and, inside each period, UFs in
    the order given. Each combination is delegated to ``pull_sih``; if that
    call raises, a warning is logged and the combination is skipped.

    Args:
        ufs: State abbreviations to pull.
        year_months: ``(year, month)`` pairs to pull.
        cache_dir: Forwarded unchanged to ``pull_sih``.

    Returns:
        One flat list with the records of all combinations that succeeded.
    """
    collected = []
    for period in year_months:
        year, month = period
        for uf in ufs:
            try:
                batch = pull_sih(uf, year, month, cache_dir=cache_dir)
                collected += batch
            except Exception as e:
                logger.warning(f" [{uf}/{year}/{month}] error: {e}")
    return collected
|
|
|
|
def build_event_timelines(records: list[dict]) -> dict:
    """Group parsed SIH records by ORPHA code and order each group by age.

    Records whose ``age_at_admission_years`` is None are left out. Within a
    group, events are sorted by age at admission in ascending order; records
    with equal age keep their input order.

    Args:
        records: Dicts carrying ``orpha``, ``age_at_admission_years``,
            ``cid_princ``, ``sex`` and ``uf_code``; ``los_days``,
            ``primary_procedure`` and ``death_during_stay`` are optional.

    Returns:
        ``{orpha: [event, ...]}`` where each event is a dict with ``age``,
        ``tokens`` (list of strings such as ``"admission cid:G113"``,
        ``"los 3d"``, ``"proc:..."``, ``"outcome death"``), ``sex`` and
        ``uf``.
    """
    grouped = {}
    for rec in records:
        if rec.get("age_at_admission_years") is not None:
            grouped.setdefault(rec["orpha"], []).append(rec)

    def _as_event(rec):
        # The admission token is always present; the others only when set.
        tokens = [f"admission cid:{rec['cid_princ']}"]
        stay = rec.get("los_days")
        if stay is not None:
            tokens.append(f"los {stay}d")
        procedure = rec.get("primary_procedure")
        if procedure:
            tokens.append(f"proc:{procedure}")
        if rec.get("death_during_stay"):
            tokens.append("outcome death")
        return {
            "age": rec["age_at_admission_years"],
            "tokens": tokens,
            "sex": rec["sex"],
            "uf": rec["uf_code"],
        }

    return {
        orpha: [
            _as_event(rec)
            for rec in sorted(group, key=lambda item: item["age_at_admission_years"])
        ]
        for orpha, group in grouped.items()
    }
|
|
|
|
def _main() -> None:
    """Command-line entry point: pull SIH-RD, build timelines, write JSON.

    Reads ``--ufs``, ``--year``, ``--months``, ``--cache-dir`` and
    ``--out-json`` from the command line, pulls the requested months for
    every UF, prints a per-ORPHA summary and saves the timelines as JSON.
    """
    import argparse
    import json

    logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s")
    parser = argparse.ArgumentParser()
    parser.add_argument("--ufs", nargs="+", default=["SP"])
    parser.add_argument("--year", type=int, default=2019)
    parser.add_argument("--months", nargs="+", type=int, default=list(range(1, 13)))
    parser.add_argument("--cache-dir", default="/tmp/datasus_cache")
    parser.add_argument("--out-json", default="/tmp/datasus_sih_timelines.json")
    args = parser.parse_args()

    year_months = [(args.year, m) for m in args.months]
    print(f"Pulling SIH-RD for UFs={args.ufs} year={args.year} months={args.months}")
    recs = pull_sih_multi(args.ufs, year_months, cache_dir=args.cache_dir)
    print(f"\nTotal SIH rare-CID admissions: {len(recs)}")

    timelines = build_event_timelines(recs)
    print("\nEvent timelines per disease:")
    for orpha, evs in sorted(timelines.items()):
        # Collect the ages once instead of walking the events twice.
        ages = [e["age"] for e in evs]
        print(f" ORPHA:{orpha:>6} {len(evs):>4} events age range [{min(ages):.1f}, {max(ages):.1f}]y")

    out = {
        "n_admissions": len(recs),
        "ufs": args.ufs,
        "year": args.year,
        "months": args.months,
        "timelines": dict(timelines),
    }
    # Explicit encoding so the output does not depend on the platform default.
    with open(args.out_json, "w", encoding="utf-8") as f:
        # default=str stringifies anything json cannot encode natively.
        json.dump(out, f, default=str, indent=2)
    print(f"\nSaved → {args.out_json}")


if __name__ == "__main__":
    _main()
|
|