# gemeo-twin-stack/src/gemeo/datasus/sim_pull.py
# Uploaded by: timmers
# GEMEO world-model — initial release (module + NeuralSurv ckpt + RareBench v49 + KG embeddings)
# Commit 089d665 (verified)
"""DATASUS SIM (mortality) pull pipeline.
For training NeuralSurv on REAL Brazilian rare-disease mortality data.
Downloads DO{UF}{YYYY}.dbc files for the given UFs and years from
ftp://ftp.datasus.gov.br/dissemin/publicos/SIM/CID10/DORES/, converts them
to DBF with pyreaddbc, filters to rare-disease CIDs, and extracts per-death
records (sex, age, UF, cause_cid, date_of_death, date_of_birth, ...).
Output: a list of record dicts (which can be passed straight to
pandas.DataFrame) plus per-disease survival summaries.
"""
from __future__ import annotations
import logging
import os
import tempfile
import urllib.request
from datetime import datetime
from pathlib import Path
logger = logging.getLogger("gemeo.datasus.sim")
# Rare-disease CID-10 codes. SIM DBC files store CID-10 codes without the dot
# (e.g. "G71.0" is stored as "G710").
# Maps DBC cause-of-death code -> ORPHA code for our seeded diseases. A value is
# either a single ORPHA string or a list of candidate ORPHA codes when one CID
# category covers several seeded diseases (parse_sim_record uses the first).
RARE_CIDS_CID10 = {
    "G113": "100",  # AT (G11.3) — though sometimes G11.3 is generic ataxia
    # E75.2 ("other sphingolipidosis") covers both Niemann-Pick and Gaucher,
    # so the disease cannot be resolved from the CID alone.
    "E752": ["646", "355"],  # NPC + Gaucher (E75.2 sphingolipidosis)
    # NOTE: there is deliberately no "E751" entry. ICD-10 E75.1 is "other
    # gangliosidosis" (e.g. GM1/GM3), not Gaucher; Gaucher is coded under E75.2.
    "G710": "98896",  # DMD (G71.0)
    "G120": "70",  # SMA-1 (G12.0)
    # CF (E84.0 / E84.1 / E84.8 / E84.9)
    "E840": "586",
    "E841": "586",
    "E848": "586",
    "E849": "586",
    "E760": "579",  # MPS I (E76.0)
    "E761": "580",  # MPS II (E76.1)
    # Wilson (E83.0); the dotted key only matters for callers passing dotted codes
    "E83.0": "905",
    "E830": "905",
    # Friedreich (G11.1); dotted key kept for the same reason
    "G11.1": "95",
    "G111": "95",
    "Q874": "558",  # Marfan (Q87.4)
    "Q850": "636",  # NF1 (Q85.0)
    "F842": "778",  # Rett (F84.2)
    "D811": "183660",  # SCID (D81.1)
}
# Brazilian federative units (UFs) as 2-letter codes, in the original order.
ALL_UFS = (
    "AC AL AP AM BA CE DF ES GO MA "
    "MT MS MG PA PB PR PE PI RJ RN "
    "RS RO RR SC SP SE TO"
).split()
# Unit digit -> divisor turning the IDADE count into years. Minutes and hours
# use a 365-day year; days use 365.25 (kept as-is to preserve prior outputs).
_IDADE_UNIT_DIVISORS = {
    0: 525600,  # minutes
    1: 8760,  # hours
    2: 365.25,  # days
    3: 12,  # months
    4: 1,  # years
}


def parse_age_idade(idade_str: str) -> float | None:
    """Convert a SIM ``IDADE`` field to age in (fractional) years.

    The field is a single unit digit followed by a count. Unit digits handled:

    * ``0`` -> minutes (newborn)
    * ``1`` -> hours
    * ``2`` -> days
    * ``3`` -> months
    * ``4`` -> years
    * ``5`` -> 100+ years; the count is years above 100

    NOTE(review): confirm this unit table against the DATASUS SIM layout
    documentation for the years being pulled.

    Args:
        idade_str: raw IDADE string, e.g. ``"445"`` for 45 years.

    Returns:
        Age in years as a float, or None for empty/one-character input, a
        non-numeric field, or any other unit digit (e.g. ``9``).
    """
    if not idade_str or len(idade_str) < 2:
        return None
    try:
        unit = int(idade_str[0])
        count = int(idade_str[1:].lstrip("0") or "0")
    except ValueError:
        return None
    if unit == 5:
        return 100.0 + count
    divisor = _IDADE_UNIT_DIVISORS.get(unit)
    if divisor is None:
        return None
    return count / divisor
def parse_date_yyyymmdd(date_str: str):
    """Parse a SIM date field into a ``datetime.date``.

    Despite this function's name, SIM dates are laid out day-month-year
    (DDMMYYYY, e.g. ``"25122019"``).

    Returns None for an empty value, a value that is not exactly 8
    characters long, or one that is not a valid DDMMYYYY date.
    """
    if date_str and len(date_str) == 8:
        try:
            parsed = datetime.strptime(date_str, "%d%m%Y")
        except (ValueError, TypeError):
            pass
        else:
            return parsed.date()
    return None
def parse_sim_record(rec: dict) -> dict | None:
    """Normalise one raw SIM row; return None if it is not a mapped rare-disease death.

    The underlying cause of death (``CAUSABAS``) is looked up in
    RARE_CIDS_CID10 as-is, then with a trailing ``"0"``, then in dotted
    ``"XXX.Y"`` form; the first hit wins. When the mapping lists several
    ORPHA candidates, the first one is used.

    Args:
        rec: one DBF record as a mapping of SIM field name -> value.

    Returns:
        Dict with keys cid, orpha, age_at_death_years, sex ("M"/"F"/"?"),
        uf_code (first two characters of CODMUNRES), date_of_death,
        date_of_birth, race and education; or None.
    """
    def text(field):
        # Missing / None fields become "" so .strip() is always safe.
        return (rec.get(field) or "").strip()

    cid = text("CAUSABAS").upper()
    if not cid:
        return None

    lookup_keys = (cid, cid + "0", cid[:3] + "." + cid[3:])
    orpha = next(
        (RARE_CIDS_CID10[key] for key in lookup_keys if key in RARE_CIDS_CID10),
        None,
    )
    if orpha is None:
        return None
    if isinstance(orpha, list):
        orpha = orpha[0]

    sex = {"1": "M", "2": "F"}.get(str(rec.get("SEXO") or "").strip(), "?")

    return {
        "cid": cid,
        "orpha": orpha,
        "age_at_death_years": parse_age_idade(text("IDADE")),
        "sex": sex,
        "uf_code": text("CODMUNRES")[:2],
        "date_of_death": parse_date_yyyymmdd(text("DTOBITO")),
        "date_of_birth": parse_date_yyyymmdd(text("DTNASC")),
        "race": rec.get("RACACOR"),
        "education": rec.get("ESC"),
    }
def _download_atomically(url: str, dest_path: str) -> None:
    """Download *url* to *dest_path* via a temporary ``.part`` file.

    The file only appears under *dest_path* once the transfer has finished,
    so an interrupted download is never mistaken for a cached copy later.
    """
    part_path = dest_path + ".part"
    try:
        urllib.request.urlretrieve(url, part_path)
        os.replace(part_path, dest_path)
    except BaseException:
        if os.path.exists(part_path):
            os.remove(part_path)
        raise


def _convert_dbc(dbc2dbf, dbc_path: str, dbf_path: str) -> None:
    """Run the DBC->DBF converter, deleting any half-written DBF if it raises."""
    try:
        dbc2dbf(dbc_path, dbf_path)
    except BaseException:
        if os.path.exists(dbf_path):
            os.remove(dbf_path)
        raise


def pull_sim(uf: str, year: int, *, cache_dir: str | None = None,
             target_cids: set | None = None) -> list[dict]:
    """Pull SIM for one UF/year, return parsed records matching target CIDs.

    Args:
        uf: 2-letter UF code (SP, RJ, MG, etc.)
        year: 4-digit year
        cache_dir: optional persistent cache directory. When given, the
            downloaded .dbc and converted .dbf are kept and reused on later
            calls; download/convert errors propagate to the caller, and no
            partially written file is left behind to poison the cache. When
            None, a temporary directory is used and removed before returning;
            download/convert errors are logged and yield [].
        target_cids: set of CAUSABAS codes (without dot) to keep; if None
            uses the keys of RARE_CIDS_CID10.

    Returns:
        list of parsed record dicts (parse_sim_record output plus "year").
        If reading the DBF fails part-way, the error is logged and the
        records matched so far are returned.
    """
    import shutil

    import pyreaddbc
    from dbfread import DBF

    if target_cids is None:
        target_cids = set(RARE_CIDS_CID10.keys())
    fname = f"DO{uf}{year}.dbc"
    url = f"ftp://ftp.datasus.gov.br/dissemin/publicos/SIM/CID10/DORES/{fname}"
    use_persistent = cache_dir is not None
    work_dir = cache_dir if use_persistent else tempfile.mkdtemp()
    try:
        dbc_path = os.path.join(work_dir, fname)
        # splitext swaps only the extension; str.replace would also rewrite a
        # ".dbc" that happens to appear in a directory name.
        dbf_path = os.path.splitext(dbc_path)[0] + ".dbf"
        if use_persistent:
            os.makedirs(cache_dir, exist_ok=True)
            # Skip download if already cached and non-empty
            if os.path.exists(dbf_path) and os.path.getsize(dbf_path) > 1024:
                logger.info(f" [{uf}/{year}] cached: {fname}")
            elif os.path.exists(dbc_path) and os.path.getsize(dbc_path) > 1024:
                _convert_dbc(pyreaddbc.dbc2dbf, dbc_path, dbf_path)
            else:
                logger.info(f" [{uf}/{year}] download {url}")
                _download_atomically(url, dbc_path)
                _convert_dbc(pyreaddbc.dbc2dbf, dbc_path, dbf_path)
        else:
            try:
                urllib.request.urlretrieve(url, dbc_path)
                pyreaddbc.dbc2dbf(dbc_path, dbf_path)
            except Exception as e:
                logger.warning(f" [{uf}/{year}] download/convert failed: {e}")
                return []

        out = []
        try:
            for rec in DBF(dbf_path, encoding="latin-1", load=False):
                # Cheap pre-filter on the raw cause code before full parsing.
                cid_short = (rec.get("CAUSABAS") or "").strip().upper()
                if cid_short not in target_cids:
                    continue
                parsed = parse_sim_record(rec)
                if parsed:
                    parsed["year"] = year
                    out.append(parsed)
        except Exception as e:
            # Best effort: keep whatever was matched before the failure.
            logger.warning(f" [{uf}/{year}] parse failed: {e}")
    finally:
        # The temporary directory holds a full DBC + DBF per call; always
        # remove it (including on the early `return []` above).
        if not use_persistent:
            shutil.rmtree(work_dir, ignore_errors=True)
    logger.info(f" [{uf}/{year}] matched {len(out)} records")
    return out
def pull_sim_multi(ufs: list[str], years: list[int], *,
                   cache_dir: str | None = None,
                   target_cids: set | None = None) -> list[dict]:
    """Pull SIM across multiple UFs and years, aggregate.

    Args:
        ufs: UF codes to pull (any iterable is accepted).
        years: years to pull. Iterated in the outer loop, so records come out
            grouped by year, then by UF in the given order.
        cache_dir: forwarded to pull_sim.
        target_cids: forwarded to pull_sim; None keeps pull_sim's default.

    Returns:
        Concatenated records from every successful (UF, year) pull. A failing
        pull is logged and skipped so one bad file does not abort the batch.
    """
    # Materialise once: a one-shot iterator would otherwise be exhausted after
    # the first year and silently skip every later year.
    ufs = list(ufs)
    all_records = []
    for year in years:
        for uf in ufs:
            try:
                recs = pull_sim(uf, year, cache_dir=cache_dir,
                                target_cids=target_cids)
            except Exception as e:
                logger.warning(f" [{uf}/{year}] error: {e}")
                continue
            all_records.extend(recs)
    return all_records
def _weibull_from_moments(values: list[float]) -> tuple[float, float]:
    """Estimate Weibull ``(shape, scale)`` from a sample's mean and CV.

    Shape uses the empirical power-law approximation ``k ≈ CV ** -1.086``
    (reasonable for roughly 1 <= k <= 10). Scale is then solved from the
    Weibull mean, ``mean = scale * Γ(1 + 1/k)``, so it stays consistent with
    the fitted shape. Degenerate samples use fixed shapes: zero mean -> 1.0,
    zero spread -> 1.5.

    Raises:
        OverflowError: if the shape is so small that Γ(1 + 1/k) overflows.
    """
    import math

    n = len(values)
    mean = sum(values) / n
    var = sum((v - mean) ** 2 for v in values) / n  # population variance
    if mean <= 0:
        shape = 1.0
    elif var <= 0:
        shape = 1.5
    else:
        shape = ((var ** 0.5) / mean) ** -1.086
    scale = mean / math.gamma(1.0 + 1.0 / shape)
    return shape, scale


def survival_distributions(records: list[dict]) -> dict:
    """Compute per-disease age-at-death statistics from SIM records.

    Records whose ``age_at_death_years`` is None are ignored. Diseases with
    fewer than 3 ages get only ``{"n", "ages", "median": None}``.

    Quartiles use linear interpolation between order statistics
    (``statistics.quantiles(..., method="inclusive")``). Plain indexing
    would return the min/max for very small samples, which are common for
    rare diseases.

    Returns:
        dict keyed by ORPHA code with n, median, p25, p75, iqr, min_age,
        max_age, weibull_shape, weibull_scale (all rounded to 2 decimals)
        and the sorted ``ages`` list.
    """
    from collections import defaultdict
    import statistics

    by_orpha = defaultdict(list)
    for r in records:
        age = r.get("age_at_death_years")
        if age is not None:
            by_orpha[r["orpha"]].append(age)

    out = {}
    for orpha, ages in by_orpha.items():
        n = len(ages)
        if n < 3:
            out[orpha] = {"n": n, "ages": ages, "median": None}
            continue
        ages_sorted = sorted(ages)
        median = statistics.median(ages_sorted)
        p25, _, p75 = statistics.quantiles(ages_sorted, n=4, method="inclusive")
        try:
            shape, scale = _weibull_from_moments(ages_sorted)
        except (OverflowError, ZeroDivisionError, ValueError):
            # Fallback for pathological samples where the moment fit breaks down.
            shape, scale = 1.5, median * 1.4
        out[orpha] = {
            "n": n,
            "median": round(median, 2),
            "p25": round(p25, 2),
            "p75": round(p75, 2),
            "iqr": round(p75 - p25, 2),
            "min_age": round(ages_sorted[0], 2),
            "max_age": round(ages_sorted[-1], 2),
            "weibull_shape": round(shape, 2),
            "weibull_scale": round(scale, 2),
            "ages": ages_sorted,
        }
    return out
if __name__ == "__main__":
    import argparse
    import json

    logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s")
    parser = argparse.ArgumentParser()
    parser.add_argument("--ufs", nargs="+", default=["SP"])
    parser.add_argument("--years", nargs="+", type=int, default=[2018, 2019, 2020])
    parser.add_argument("--cache-dir", default="/tmp/datasus_cache")
    parser.add_argument("--out-json", default="/tmp/datasus_survival.json")
    args = parser.parse_args()
    print(f"Pulling SIM for UFs={args.ufs} years={args.years}")
    recs = pull_sim_multi(args.ufs, args.years, cache_dir=args.cache_dir)
    print(f"\nTotal rare-CID records: {len(recs)}")
    survival = survival_distributions(recs)
    print(f"\nPer-disease survival distributions ({len(survival)} diseases):\n")
    for orpha, s in sorted(survival.items()):
        # Compare with None: a real median that rounds to 0.0 (e.g. neonatal
        # deaths) is falsy but still has a full fit.
        if s.get("median") is not None:
            print(f" ORPHA:{orpha:>6} n={s['n']:>4} median={s['median']:>6}y "
                  f"IQR=[{s['p25']:.0f}-{s['p75']:.0f}] Weibull(shape={s['weibull_shape']}, scale={s['weibull_scale']})")
        else:
            print(f" ORPHA:{orpha:>6} n={s['n']:>4} (insufficient for fit)")
    # Save: per-disease "ages" lists are dropped to keep the JSON compact;
    # default=str serialises the date objects in raw_sample.
    out = {"records_count": len(recs), "ufs": args.ufs, "years": args.years,
           "survival": {k: {kk: vv for kk, vv in v.items() if kk != "ages"}
                        for k, v in survival.items()},
           "raw_sample": recs[:50]}
    with open(args.out_json, "w", encoding="utf-8") as f:
        json.dump(out, f, default=str, indent=2)
    print(f"\nSaved → {args.out_json}")