"""DATASUS SIM (mortality) pull pipeline. For training NeuralSurv on REAL Brazilian rare-disease mortality data. Pulls DOXX####.dbc for given UFs and years from ftp://ftp.datasus.gov.br/dissemin/publicos/SIM/CID10/DORES/, parses with pyreaddbc, filters to rare-disease CIDs, extracts (sex, age, UF, cause_cid, date_of_death, date_of_birth) tuples. Output: pandas DataFrame ready for survival analysis. """ from __future__ import annotations import logging import os import tempfile import urllib.request from datetime import datetime from pathlib import Path logger = logging.getLogger("gemeo.datasus.sim") # Rare disease CID-10 codes (CID-10-BR encoding strips the dot in DBC) # Maps DBC code → ORPHA code for our seeded diseases RARE_CIDS_CID10 = { "G113": "100", # AT (G11.3) — though sometimes G11.3 is generic ataxia "E752": ["646", "355"], # NPC + Gaucher (E75.2 sphingolipidosis) "E751": ["355"], # Gaucher specifically "G710": "98896", # DMD (G71.0) "G120": "70", # SMA-1 (G12.0) "E840": "586", "E841": "586", "E848": "586", "E849": "586", # CF "E760": "579", # MPS I (E76.0) "E761": "580", # MPS II (E76.1) "E83.0": "905", "E830": "905", # Wilson "G11.1": "95", "G111": "95", # Friedreich "Q874": "558", # Marfan (Q87.4) "Q850": "636", # NF1 (Q85.0) "F842": "778", # Rett (F84.2) "D811": "183660", # SCID (D81.1) } # Brazilian UFs ALL_UFS = ["AC", "AL", "AP", "AM", "BA", "CE", "DF", "ES", "GO", "MA", "MT", "MS", "MG", "PA", "PB", "PR", "PE", "PI", "RJ", "RN", "RS", "RO", "RR", "SC", "SP", "SE", "TO"] def parse_age_idade(idade_str: str) -> float: """Parse SIM IDADE field. Format: prefix + value. 0XX = minutes/seconds (newborn) 1XX = hours 2XX = days 3XX = months 4XX = years 5XX = 100+ years (XX is years - 100) """ if not idade_str or len(idade_str) < 2: return None try: prefix = int(idade_str[0]) val = int(idade_str[1:].lstrip("0") or "0") except (ValueError, IndexError): return None if prefix == 0: # minutes return val / 525600 if prefix == 1: # hours return val / 8760 if prefix == 2: # days return val / 365.25 if prefix == 3: # months return val / 12 if prefix == 4: # years return float(val) if prefix == 5: # 100+ years return 100.0 + val return None def parse_date_yyyymmdd(date_str: str): """Parse SIM date field (DDMMYYYY format).""" if not date_str or len(date_str) != 8: return None try: return datetime.strptime(date_str, "%d%m%Y").date() except (ValueError, TypeError): return None def parse_sim_record(rec: dict) -> dict | None: """Parse a single SIM record into a clean dict.""" cid = (rec.get("CAUSABAS") or "").strip().upper() if not cid: return None # Map to rare-disease ORPHA(s) matched_orpha = RARE_CIDS_CID10.get(cid) or RARE_CIDS_CID10.get(cid + "0") if matched_orpha is None: # Try with dot for k in (cid[:3] + "." + cid[3:], cid): if k in RARE_CIDS_CID10: matched_orpha = RARE_CIDS_CID10[k] break if matched_orpha is None: return None if isinstance(matched_orpha, list): matched_orpha = matched_orpha[0] age_yr = parse_age_idade((rec.get("IDADE") or "").strip()) sex_code = str(rec.get("SEXO") or "").strip() sex = "M" if sex_code == "1" else ("F" if sex_code == "2" else "?") uf_code = (rec.get("CODMUNRES") or "").strip()[:2] return { "cid": cid, "orpha": matched_orpha, "age_at_death_years": age_yr, "sex": sex, "uf_code": uf_code, "date_of_death": parse_date_yyyymmdd((rec.get("DTOBITO") or "").strip()), "date_of_birth": parse_date_yyyymmdd((rec.get("DTNASC") or "").strip()), "race": rec.get("RACACOR"), "education": rec.get("ESC"), } def pull_sim(uf: str, year: int, *, cache_dir: str = None, target_cids: set = None) -> list[dict]: """Pull SIM for one UF/year, return parsed records matching target CIDs. Args: uf: 2-letter UF code (SP, RJ, MG, etc.) year: 4-digit year cache_dir: optional persistent cache; defaults to tempdir target_cids: set of CID-10 codes (without dot) to filter; if None uses RARE_CIDS_CID10 Returns: list of parsed record dicts (each = parse_sim_record output) """ import pyreaddbc from dbfread import DBF if target_cids is None: target_cids = set(RARE_CIDS_CID10.keys()) fname = f"DO{uf}{year}.dbc" url = f"ftp://ftp.datasus.gov.br/dissemin/publicos/SIM/CID10/DORES/{fname}" use_persistent = cache_dir is not None if use_persistent: os.makedirs(cache_dir, exist_ok=True) dbc_path = os.path.join(cache_dir, fname) dbf_path = dbc_path.replace(".dbc", ".dbf") # Skip download if already cached and non-empty if os.path.exists(dbf_path) and os.path.getsize(dbf_path) > 1024: logger.info(f" [{uf}/{year}] cached: {fname}") elif os.path.exists(dbc_path) and os.path.getsize(dbc_path) > 1024: pyreaddbc.dbc2dbf(dbc_path, dbf_path) else: logger.info(f" [{uf}/{year}] download {url}") urllib.request.urlretrieve(url, dbc_path) pyreaddbc.dbc2dbf(dbc_path, dbf_path) else: td = tempfile.mkdtemp() dbc_path = os.path.join(td, fname) dbf_path = dbc_path.replace(".dbc", ".dbf") try: urllib.request.urlretrieve(url, dbc_path) pyreaddbc.dbc2dbf(dbc_path, dbf_path) except Exception as e: logger.warning(f" [{uf}/{year}] download/convert failed: {e}") return [] out = [] try: for rec in DBF(dbf_path, encoding="latin-1", load=False): cid_short = (rec.get("CAUSABAS") or "").strip().upper() if cid_short not in target_cids: continue parsed = parse_sim_record(rec) if parsed: parsed["year"] = year out.append(parsed) except Exception as e: logger.warning(f" [{uf}/{year}] parse failed: {e}") logger.info(f" [{uf}/{year}] matched {len(out)} records") return out def pull_sim_multi(ufs: list[str], years: list[int], *, cache_dir: str = None) -> list[dict]: """Pull SIM across multiple UFs and years, aggregate.""" all_records = [] for year in years: for uf in ufs: try: recs = pull_sim(uf, year, cache_dir=cache_dir) all_records.extend(recs) except Exception as e: logger.warning(f" [{uf}/{year}] error: {e}") return all_records def survival_distributions(records: list[dict]) -> dict: """Compute per-disease survival statistics from SIM records.""" from collections import defaultdict import statistics by_orpha = defaultdict(list) for r in records: if r.get("age_at_death_years") is not None: by_orpha[r["orpha"]].append(r["age_at_death_years"]) out = {} for orpha, ages in by_orpha.items(): if len(ages) < 3: out[orpha] = {"n": len(ages), "ages": ages, "median": None} continue ages_sorted = sorted(ages) n = len(ages_sorted) median = statistics.median(ages_sorted) p25 = ages_sorted[n // 4] p75 = ages_sorted[3 * n // 4] iqr = p75 - p25 # Weibull fit (lightweight method-of-moments) try: mean = sum(ages_sorted) / n var = sum((a - mean) ** 2 for a in ages_sorted) / n cv = (var ** 0.5) / mean if mean > 0 else 1.0 shape = 1.2 / cv if cv > 0 else 1.5 scale = mean / 0.91 # rough approximation except Exception: shape = 1.5 scale = median * 1.4 out[orpha] = { "n": n, "median": round(median, 2), "p25": round(p25, 2), "p75": round(p75, 2), "iqr": round(iqr, 2), "min_age": round(min(ages_sorted), 2), "max_age": round(max(ages_sorted), 2), "weibull_shape": round(shape, 2), "weibull_scale": round(scale, 2), "ages": ages_sorted, } return out if __name__ == "__main__": import argparse import json logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s") parser = argparse.ArgumentParser() parser.add_argument("--ufs", nargs="+", default=["SP"]) parser.add_argument("--years", nargs="+", type=int, default=[2018, 2019, 2020]) parser.add_argument("--cache-dir", default="/tmp/datasus_cache") parser.add_argument("--out-json", default="/tmp/datasus_survival.json") args = parser.parse_args() print(f"Pulling SIM for UFs={args.ufs} years={args.years}") recs = pull_sim_multi(args.ufs, args.years, cache_dir=args.cache_dir) print(f"\nTotal rare-CID records: {len(recs)}") survival = survival_distributions(recs) print(f"\nPer-disease survival distributions ({len(survival)} diseases):\n") for orpha, s in sorted(survival.items()): if s.get("median"): print(f" ORPHA:{orpha:>6} n={s['n']:>4} median={s['median']:>6}y " f"IQR=[{s['p25']:.0f}-{s['p75']:.0f}] Weibull(shape={s['weibull_shape']}, scale={s['weibull_scale']})") else: print(f" ORPHA:{orpha:>6} n={s['n']:>4} (insufficient for fit)") # Save out = {"records_count": len(recs), "ufs": args.ufs, "years": args.years, "survival": {k: {kk: vv for kk, vv in v.items() if kk != "ages"} for k, v in survival.items()}, "raw_sample": recs[:50]} with open(args.out_json, "w") as f: json.dump(out, f, default=str, indent=2) print(f"\nSaved → {args.out_json}")