# gemeo-twin-stack / src/gemeo/datasus/sih_pull.py
# timmers — commit 089d665 (verified):
#   GEMEO world-model — initial release (module + NeuralSurv ckpt + RareBench v49 + KG embeddings)
"""DATASUS SIH-RD (hospital admissions) pull pipeline.
Each admission record is one event in the patient's clinical timeline,
with: primary CID, secondary CIDs, admission date, discharge date,
length of stay, primary procedure (SIGTAP), outcome (discharge/death),
patient age + sex + UF.
For training DT-FM on REAL Brazilian rare-disease event sequences.
Pulls RD<UF><YY><MM>.dbc files (e.g. RDSP1901.dbc) from
ftp://ftp.datasus.gov.br/dissemin/publicos/SIHSUS/200801_/Dados/
Note: SIH-RD files are monthly (RD<UF><YY><MM>) and much larger than SIM files,
so we pull only selected months and filter aggressively to rare-disease CIDs.
"""
from __future__ import annotations

import logging
import os
import shutil
import tempfile
import urllib.request
from datetime import date, datetime
from pathlib import Path
logger = logging.getLogger("gemeo.datasus.sih")

# CID-10 code -> ORPHA code for the rare diseases kept from SIH-RD.
# SIH stores CID-10 without the dot (e.g. "G113", not "G11.3"). Every key must
# use that upper-case, dot-less form or it can never match a record.
RARE_CIDS_SIH = {
    "G113": "100",     # AT
    "E752": "646",     # NPC / Gaucher cohort
    # NOTE(review): in ICD-10, E75.0 is GM2 gangliosidosis and E75.1 is
    # "other gangliosidosis"; Gaucher is normally coded under E75.2. Confirm
    # that these two really belong to ORPHA:355.
    "E751": "355",     # Gaucher specifically (some encodings)
    "E750": "355",     # Gaucher subtype
    "G710": "98896",   # DMD
    # NOTE(review): ICD-10 G71.1 is "myotonic disorders", not DMD. Confirm.
    "G711": "98896",   # DMD other
    # NOTE(review): the ORPHA codes for the SMA types below are unconfirmed.
    "G120": "70",      # SMA-1
    "G121": "71",      # SMA-2
    # NOTE(review): ICD-10 G12.2 is "motor neuron disease" (includes ALS), not
    # SMA type 3. Confirm; as written this may label non-SMA admissions SMA-3.
    "G122": "83330",   # SMA-3
    "E840": "586",     # CF lung
    "E841": "586",     # CF other
    "E848": "586",     # CF combined
    "E849": "586",     # CF unspecified
    "E760": "579",     # MPS I
    "E761": "580",     # MPS II
    "E830": "905",     # Wilson
    "G111": "95",      # Friedreich
    "Q874": "558",     # Marfan
    "Q850": "636",     # NF1
    "F842": "778",     # Rett
    "D811": "183660",  # SCID
}
def _int_or_none(value) -> int | None:
    """Return ``int(value)``, or ``None`` if *value* is ``None`` or not int-parsable."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def _age_in_years(raw_age, unit_code: str) -> float | None:
    """Convert SIH ``IDADE`` plus its ``COD_IDADE`` unit code into years.

    Unit codes handled:

    - "2": value is divided by 365.25 (days).
    - "3": value is divided by 12 (months).
    - "4": value is taken as years.
    - "5": value is added to 100.

    Any other unit code, or an age that ``int()`` cannot parse, gives ``None``.
    """
    age = _int_or_none(raw_age)
    if age is None:
        return None
    if unit_code == "4":
        return float(age)
    if unit_code == "3":
        return age / 12
    if unit_code == "2":
        return age / 365.25
    if unit_code == "5":
        return 100.0 + age
    return None


def _parse_sih_date(value) -> date | None:
    """Parse an SIH date field into a ``datetime.date``.

    Accepts ``date``/``datetime`` objects, which dbfread yields for DBF
    ``D``-type fields, and ``YYYYMMDD`` strings or ints. Surrounding
    whitespace is ignored. Anything else (empty, malformed, other lengths)
    gives ``None``.
    """
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, date):
        return value
    text = str(value or "").strip()
    if len(text) != 8:
        return None
    try:
        return datetime.strptime(text, "%Y%m%d").date()
    except ValueError:
        return None


def parse_sih_record(rec: dict, *, cid_map: dict[str, str] | None = None) -> dict | None:
    """Parse a single SIH-RD record into a clean admission-event dict.

    Args:
        rec: One DBF row as a field-name -> value mapping (e.g. a dbfread record).
        cid_map: Mapping from CID-10 code (SIH format, no dot) to ORPHA code,
            used to decide relevance and to label the record. Defaults to
            ``RARE_CIDS_SIH``.

    Returns:
        ``None`` if neither diagnosis matches ``cid_map``. Otherwise a dict with:

        - ``cid_princ``, ``cid_sec``: upper-cased diagnosis codes
          (``cid_sec`` is ``None`` if empty).
        - ``orpha``: the matched ORPHA code.
        - ``sex``: "M", "F" or "?".
        - ``age_at_admission_years``: float years, or ``None``.
        - ``uf_code``: first two characters of ``UF_ZI``.
        - ``admission_date``, ``discharge_date``: ``datetime.date`` or ``None``.
        - ``los_days``: int, or ``None``.
        - ``primary_procedure``: string, or ``None``.
        - ``death_during_stay``: bool.
    """
    if cid_map is None:
        cid_map = RARE_CIDS_SIH
    cid_princ = str(rec.get("DIAG_PRINC") or "").strip().upper()
    cid_sec = str(rec.get("DIAG_SECUN") or "").strip().upper()

    # Match priority: exact principal, exact secondary, then 3-char categories.
    candidates = (cid_princ, cid_sec, cid_princ[:3], cid_sec[:3])
    matched = next((cid_map[code] for code in candidates if code in cid_map), None)
    if matched is None:
        return None

    # NOTE(review): only "1"/"M" and "3"/"F" are recognised; some SIH layouts
    # may also use "2" for female. Confirm against the DATASUS dictionary.
    sex_code = str(rec.get("SEXO") or "").strip()
    sex = {"1": "M", "M": "M", "3": "F", "F": "F"}.get(sex_code, "?")

    cod_idade = str(rec.get("COD_IDADE") or "").strip()

    return {
        "cid_princ": cid_princ,
        "cid_sec": cid_sec or None,
        "orpha": matched,
        "sex": sex,
        "age_at_admission_years": _age_in_years(rec.get("IDADE"), cod_idade),
        # str() so numeric UF_ZI values do not raise TypeError on slicing.
        "uf_code": str(rec.get("UF_ZI") or "").strip()[:2],
        "admission_date": _parse_sih_date(rec.get("DT_INTER")),
        "discharge_date": _parse_sih_date(rec.get("DT_SAIDA")),
        "los_days": _int_or_none(rec.get("DIAS_PERM")),
        "primary_procedure": str(rec.get("PROC_REA") or "").strip() or None,
        "death_during_stay": str(rec.get("MORTE") or "").strip() == "1",
    }
def _is_cached(path: str) -> bool:
    """Return True if *path* is a file larger than 1 KiB (treated as a complete download)."""
    return os.path.isfile(path) and os.path.getsize(path) > 1024


def _download(url: str, dest: str, *, timeout: float = 300.0) -> None:
    """Fetch *url* into *dest* without leaving a truncated *dest* behind.

    Data is streamed into ``dest + ".part"`` and renamed onto *dest* only
    after the transfer completes. On any error the partial file is removed
    and the exception propagates. *timeout* (seconds) is passed to
    ``urlopen`` so a stalled connection raises instead of hanging forever.
    """
    part = dest + ".part"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp, open(part, "wb") as fh:
            shutil.copyfileobj(resp, fh, 1 << 20)
        os.replace(part, dest)
    except BaseException:
        if os.path.exists(part):
            os.remove(part)
        raise


def pull_sih(uf: str, year: int, month: int, *, cache_dir: str | None = None,
             target_cids: set | None = None) -> list[dict]:
    """Pull SIH-RD admissions for one UF/year/month, filtered to rare CIDs.

    Downloads ``RD{UF}{YY}{MM}.dbc`` from the DATASUS FTP server, converts it
    to DBF with ``pyreaddbc`` and streams the rows through
    :func:`parse_sih_record`.

    Args:
        uf: State code used in the file name (e.g. ``"SP"``).
        year: Year; only its last two digits go into the file name.
        month: Month number, formatted as two digits in the file name.
        cache_dir: If given, ``.dbc``/``.dbf`` files are kept there. An
            existing file larger than 1 KiB is reused instead of downloaded
            again. If ``None``, a private temporary directory is used and
            removed before returning.
        target_cids: SIH-format CID codes used to pre-filter rows on
            DIAG_PRINC/DIAG_SECUN (full code or 3-char prefix). Defaults to
            all keys of ``RARE_CIDS_SIH``. The ORPHA label still comes from
            ``parse_sih_record``'s default mapping, so codes absent from
            ``RARE_CIDS_SIH`` are dropped there.

    Returns:
        Parsed admission dicts with ``year`` and ``month`` added. Returns an
        empty list if download or conversion fails. If reading the DBF fails
        part-way, the rows parsed so far are returned.
    """
    import pyreaddbc
    from dbfread import DBF

    if target_cids is None:
        target_cids = set(RARE_CIDS_SIH.keys())
    tag = f"[{uf}/{year}/{month:02d}]"
    fname = f"RD{uf}{str(year)[-2:]}{month:02d}.dbc"
    url = f"ftp://ftp.datasus.gov.br/dissemin/publicos/SIHSUS/200801_/Dados/{fname}"

    # Without a cache dir, work in a private temp dir that is always removed
    # in ``finally``. Otherwise every call would leave a large monthly file
    # in the system temp directory.
    tmp_dir = None
    if cache_dir is not None:
        os.makedirs(cache_dir, exist_ok=True)
        work_dir = cache_dir
    else:
        tmp_dir = work_dir = tempfile.mkdtemp(prefix="sih_")
    try:
        dbc_path = os.path.join(work_dir, fname)
        dbf_path = os.path.splitext(dbc_path)[0] + ".dbf"
        if not _is_cached(dbf_path):
            try:
                if not _is_cached(dbc_path):
                    logger.info(f" {tag} download")
                    _download(url, dbc_path)
                pyreaddbc.dbc2dbf(dbc_path, dbf_path)
            except Exception as e:
                # Remove a half-written DBF so the size-based cache check
                # cannot mistake it for a good file on the next run.
                if os.path.exists(dbf_path):
                    os.remove(dbf_path)
                logger.warning(f" {tag} download/convert failed: {e}")
                return []

        out: list[dict] = []
        try:
            for rec in DBF(dbf_path, encoding="latin-1", load=False):
                cp = str(rec.get("DIAG_PRINC") or "").strip().upper()
                cs = str(rec.get("DIAG_SECUN") or "").strip().upper()
                # Cheap pre-filter on the raw codes before the full parse.
                if not any(code in target_cids for code in (cp, cs, cp[:3], cs[:3])):
                    continue
                parsed = parse_sih_record(rec)
                if parsed:
                    parsed["year"] = year
                    parsed["month"] = month
                    out.append(parsed)
        except Exception as e:
            # Best effort: keep whatever was parsed before the failure.
            logger.warning(f" {tag} parse failed: {e}")
        if out:
            logger.info(f" {tag} matched {len(out)} admissions")
        return out
    finally:
        if tmp_dir is not None:
            shutil.rmtree(tmp_dir, ignore_errors=True)
def pull_sih_multi(ufs: list[str], year_months: list[tuple[int, int]], *,
                   cache_dir: str | None = None,
                   target_cids: set | None = None) -> list[dict]:
    """Pull SIH-RD across multiple UFs and (year, month) pairs.

    Calls :func:`pull_sih` for every (year, month) x UF combination and
    concatenates the results. An exception from a single file is logged and
    skipped, so one bad month does not abort the whole batch.

    Args:
        ufs: State codes to pull (e.g. ``["SP", "RJ"]``).
        year_months: ``(year, month)`` pairs to pull.
        cache_dir: Passed through to :func:`pull_sih`.
        target_cids: Passed through to :func:`pull_sih`. ``None`` means
            ``pull_sih``'s default CID set.

    Returns:
        All matched admission records, in (year_month, uf) iteration order.
    """
    all_records: list[dict] = []
    for year, month in year_months:
        for uf in ufs:
            try:
                all_records.extend(
                    pull_sih(uf, year, month, cache_dir=cache_dir, target_cids=target_cids)
                )
            except Exception as e:
                logger.warning(f" [{uf}/{year}/{month}] error: {e}")
    return all_records
def _admission_tokens(rec: dict) -> list[str]:
    """Render one parsed admission as its list of event tokens."""
    tokens = [f"admission cid:{rec['cid_princ']}"]
    if rec.get("los_days") is not None:
        tokens.append(f"los {rec['los_days']}d")
    if rec.get("primary_procedure"):
        tokens.append(f"proc:{rec['primary_procedure']}")
    if rec.get("death_during_stay"):
        tokens.append("outcome death")
    return tokens


def build_event_timelines(records: list[dict]) -> dict:
    """Group SIH admission records into per-disease, age-ordered event lists.

    SIH-RD does not link admissions of the same (anonymized) patient, so these
    are NOT per-patient timelines. All admissions mapped to one ORPHA code are
    pooled and sorted by age at admission, giving a synthetic population-level
    event sequence. Records whose ``age_at_admission_years`` is ``None`` are
    skipped.

    Args:
        records: Dicts as produced by ``parse_sih_record``. The keys read are
            ``orpha``, ``age_at_admission_years``, ``cid_princ``, ``los_days``,
            ``primary_procedure``, ``death_during_stay``, ``sex`` and ``uf_code``.

    Returns:
        A dict ``{orpha: [{"age": ..., "tokens": [...], "sex": ..., "uf": ...}, ...]}``.
        Each list is sorted by ascending age; ties keep input order.

        ``tokens`` starts with ``"admission cid:<cid_princ>"`` and then, when
        present, ``"los <N>d"``, ``"proc:<code>"`` and ``"outcome death"``.
    """
    from collections import defaultdict

    by_orpha: dict[str, list[dict]] = defaultdict(list)
    for rec in records:
        if rec.get("age_at_admission_years") is not None:
            by_orpha[rec["orpha"]].append(rec)

    timelines = {}
    for orpha, recs in by_orpha.items():
        ordered = sorted(recs, key=lambda r: r["age_at_admission_years"])
        timelines[orpha] = [
            {
                "age": r["age_at_admission_years"],
                "tokens": _admission_tokens(r),
                "sex": r["sex"],
                "uf": r["uf_code"],
            }
            for r in ordered
        ]
    return timelines
def main(argv: list[str] | None = None) -> None:
    """CLI entry point: pull SIH-RD, build per-disease timelines, write JSON.

    Args:
        argv: Argument list to parse. ``None`` means ``sys.argv[1:]``.
    """
    import argparse
    import json

    logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s")
    parser = argparse.ArgumentParser()
    parser.add_argument("--ufs", nargs="+", default=["SP"])
    parser.add_argument("--year", type=int, default=2019)
    # choices rejects month numbers outside 1-12 up front instead of
    # producing a doomed FTP request per UF.
    parser.add_argument("--months", nargs="+", type=int, default=list(range(1, 13)),
                        choices=range(1, 13), metavar="MONTH")
    parser.add_argument("--cache-dir", default="/tmp/datasus_cache")
    parser.add_argument("--out-json", default="/tmp/datasus_sih_timelines.json")
    args = parser.parse_args(argv)

    year_months = [(args.year, m) for m in args.months]
    print(f"Pulling SIH-RD for UFs={args.ufs} year={args.year} months={args.months}")
    recs = pull_sih_multi(args.ufs, year_months, cache_dir=args.cache_dir)
    print(f"\nTotal SIH rare-CID admissions: {len(recs)}")

    timelines = build_event_timelines(recs)
    print("\nEvent timelines per disease:")
    for orpha, evs in sorted(timelines.items()):
        ages = [e["age"] for e in evs]
        print(f" ORPHA:{orpha:>6} {len(evs):>4} events age range [{min(ages):.1f}, {max(ages):.1f}]y")

    out = {
        "n_admissions": len(recs),
        "ufs": args.ufs,
        "year": args.year,
        "months": args.months,
        "timelines": timelines,
    }
    with open(args.out_json, "w", encoding="utf-8") as f:
        json.dump(out, f, default=str, indent=2)
    print(f"\nSaved → {args.out_json}")


if __name__ == "__main__":
    main()