"""DATASUS SIM (mortality) pull pipeline.

For training NeuralSurv on REAL Brazilian rare-disease mortality data.

Pulls DOXX####.dbc (XX = UF, #### = year) for given UFs and years from
ftp://ftp.datasus.gov.br/dissemin/publicos/SIM/CID10/DORES/, converts each
file to DBF with pyreaddbc, reads it with dbfread, filters to rare-disease
CIDs, and extracts (sex, age, UF, cause_cid, date_of_death, date_of_birth)
records.

Output: a list of parsed record dicts (one per matching death record),
ready for survival analysis.
"""
from __future__ import annotations

import logging
import os
import tempfile
import urllib.request
from datetime import date, datetime
from pathlib import Path
|
|
# Module-level logger. The name is a fixed string rather than __name__, so
# logging configuration can target "gemeo.datasus.sim" even when this file is
# executed as a script (where __name__ would be "__main__").
logger = logging.getLogger("gemeo.datasus.sim")
|
|
|
|
| |
| |
# CID-10 (ICD-10) underlying-cause codes of interest, mapped to the identifier
# that parse_sim_record stores under the "orpha" key.
#
# - Keys are mostly written without the dot ("E830"); two codes are also
#   listed in dotted form ("E83.0", "G11.1") so either spelling matches.
# - A value is either a single identifier string or a list of identifiers;
#   for a list, parse_sim_record keeps only the first element.
RARE_CIDS_CID10 = {
    "G113": "100",
    "E752": ["646", "355"],
    "E751": ["355"],
    "G710": "98896",
    "G120": "70",
    # Four E84x sub-codes share one identifier.
    "E840": "586",
    "E841": "586",
    "E848": "586",
    "E849": "586",
    "E760": "579",
    "E761": "580",
    # Dotted and undotted spellings of the same code.
    "E83.0": "905",
    "E830": "905",
    "G11.1": "95",
    "G111": "95",
    "Q874": "558",
    "Q850": "636",
    "F842": "778",
    "D811": "183660",
}
|
|
| |
# Two-letter UF codes (27 entries), in the order used by the original list.
# These are the values expected for the ``uf`` argument of pull_sim.
ALL_UFS = (
    "AC AL AP AM BA CE DF ES GO MA "
    "MT MS MG PA PB PR PE PI RJ RN "
    "RS RO RR SC SP SE TO"
).split()
|
|
|
|
| def parse_age_idade(idade_str: str) -> float: |
| """Parse SIM IDADE field. Format: prefix + value. |
| 0XX = minutes/seconds (newborn) |
| 1XX = hours |
| 2XX = days |
| 3XX = months |
| 4XX = years |
| 5XX = 100+ years (XX is years - 100) |
| """ |
| if not idade_str or len(idade_str) < 2: |
| return None |
| try: |
| prefix = int(idade_str[0]) |
| val = int(idade_str[1:].lstrip("0") or "0") |
| except (ValueError, IndexError): |
| return None |
| if prefix == 0: |
| return val / 525600 |
| if prefix == 1: |
| return val / 8760 |
| if prefix == 2: |
| return val / 365.25 |
| if prefix == 3: |
| return val / 12 |
| if prefix == 4: |
| return float(val) |
| if prefix == 5: |
| return 100.0 + val |
| return None |
|
|
|
|
| def parse_date_yyyymmdd(date_str: str): |
| """Parse SIM date field (DDMMYYYY format).""" |
| if not date_str or len(date_str) != 8: |
| return None |
| try: |
| return datetime.strptime(date_str, "%d%m%Y").date() |
| except (ValueError, TypeError): |
| return None |
|
|
|
|
| def parse_sim_record(rec: dict) -> dict | None: |
| """Parse a single SIM record into a clean dict.""" |
| cid = (rec.get("CAUSABAS") or "").strip().upper() |
| if not cid: |
| return None |
|
|
| |
| matched_orpha = RARE_CIDS_CID10.get(cid) or RARE_CIDS_CID10.get(cid + "0") |
| if matched_orpha is None: |
| |
| for k in (cid[:3] + "." + cid[3:], cid): |
| if k in RARE_CIDS_CID10: |
| matched_orpha = RARE_CIDS_CID10[k] |
| break |
| if matched_orpha is None: |
| return None |
| if isinstance(matched_orpha, list): |
| matched_orpha = matched_orpha[0] |
|
|
| age_yr = parse_age_idade((rec.get("IDADE") or "").strip()) |
| sex_code = str(rec.get("SEXO") or "").strip() |
| sex = "M" if sex_code == "1" else ("F" if sex_code == "2" else "?") |
| uf_code = (rec.get("CODMUNRES") or "").strip()[:2] |
|
|
| return { |
| "cid": cid, |
| "orpha": matched_orpha, |
| "age_at_death_years": age_yr, |
| "sex": sex, |
| "uf_code": uf_code, |
| "date_of_death": parse_date_yyyymmdd((rec.get("DTOBITO") or "").strip()), |
| "date_of_birth": parse_date_yyyymmdd((rec.get("DTNASC") or "").strip()), |
| "race": rec.get("RACACOR"), |
| "education": rec.get("ESC"), |
| } |
|
|
|
|
def _dbf_path_for(dbc_path: str) -> str:
    """Return the ``.dbf`` path that sits next to *dbc_path*."""
    # Swap only the extension: str.replace(".dbc", ".dbf") would also rewrite
    # a ".dbc" that happens to occur inside a directory name.
    return os.path.splitext(dbc_path)[0] + ".dbf"


def _download_atomic(url: str, dest: str) -> None:
    """Download *url* to *dest* without ever leaving a truncated *dest* behind."""
    partial = dest + ".part"
    try:
        urllib.request.urlretrieve(url, partial)
        os.replace(partial, dest)
    finally:
        # Only present if the download or the rename failed.
        if os.path.exists(partial):
            os.remove(partial)


def pull_sim(uf: str, year: int, *, cache_dir: str = None,
             target_cids: set = None) -> list[dict]:
    """Pull SIM for one UF/year, return parsed records matching target CIDs.

    The ``DO{uf}{year}.dbc`` file is downloaded from the DATASUS FTP server,
    converted to DBF with pyreaddbc and streamed with dbfread.

    Args:
        uf: 2-letter UF code (SP, RJ, MG, etc.); used verbatim in the file name.
        year: 4-digit year.
        cache_dir: optional persistent cache directory. When given, an existing
            ``.dbf`` (or ``.dbc``) larger than 1 KiB is reused instead of being
            downloaded again. When omitted, a temporary directory is used and
            removed before returning.
        target_cids: set of CID-10 codes (without dot) used as an exact-match
            pre-filter on ``CAUSABAS``; if None, the keys of RARE_CIDS_CID10
            are used. Rows that pass are still subject to parse_sim_record's
            own RARE_CIDS_CID10 lookup, so this argument can narrow the
            selection but cannot add codes missing from that table.

    Returns:
        list of parsed record dicts (each = parse_sim_record output plus a
        ``"year"`` key). Without ``cache_dir``, a download/convert failure is
        logged and yields ``[]``. A failure while reading the DBF is logged
        and yields whatever was parsed up to that point.

    Raises:
        Exception: with ``cache_dir`` set, download and conversion errors
            propagate to the caller (as before).
    """
    import pyreaddbc
    from dbfread import DBF

    if target_cids is None:
        target_cids = set(RARE_CIDS_CID10)

    fname = f"DO{uf}{year}.dbc"
    url = f"ftp://ftp.datasus.gov.br/dissemin/publicos/SIM/CID10/DORES/{fname}"

    def convert(dbc_path: str, dbf_path: str) -> None:
        """Convert DBC to DBF, discarding the output if conversion is interrupted."""
        try:
            pyreaddbc.dbc2dbf(dbc_path, dbf_path)
        except BaseException:
            # A half-written .dbf larger than 1 KiB would otherwise be taken
            # for a valid cache entry on the next run.
            if os.path.exists(dbf_path):
                os.remove(dbf_path)
            raise

    def read_matches(dbf_path: str) -> list[dict]:
        """Stream the DBF and collect parsed rows whose cause is targeted."""
        out = []
        try:
            for rec in DBF(dbf_path, encoding="latin-1", load=False):
                cid_short = (rec.get("CAUSABAS") or "").strip().upper()
                if cid_short not in target_cids:
                    continue
                parsed = parse_sim_record(rec)
                if parsed:
                    parsed["year"] = year
                    out.append(parsed)
        except Exception as e:
            # Best effort: keep what was parsed before the failure.
            logger.warning(f" [{uf}/{year}] parse failed: {e}")
        logger.info(f" [{uf}/{year}] matched {len(out)} records")
        return out

    if cache_dir is not None:
        os.makedirs(cache_dir, exist_ok=True)
        dbc_path = os.path.join(cache_dir, fname)
        dbf_path = _dbf_path_for(dbc_path)

        if os.path.exists(dbf_path) and os.path.getsize(dbf_path) > 1024:
            logger.info(f" [{uf}/{year}] cached: {fname}")
        elif os.path.exists(dbc_path) and os.path.getsize(dbc_path) > 1024:
            convert(dbc_path, dbf_path)
        else:
            logger.info(f" [{uf}/{year}] download {url}")
            _download_atomic(url, dbc_path)
            convert(dbc_path, dbf_path)
        return read_matches(dbf_path)

    # No cache requested: work in a scratch directory that is removed on exit,
    # instead of leaking one mkdtemp() directory per call.
    with tempfile.TemporaryDirectory() as td:
        dbc_path = os.path.join(td, fname)
        dbf_path = _dbf_path_for(dbc_path)
        try:
            urllib.request.urlretrieve(url, dbc_path)
            convert(dbc_path, dbf_path)
        except Exception as e:
            logger.warning(f" [{uf}/{year}] download/convert failed: {e}")
            return []
        return read_matches(dbf_path)
|
|
|
|
def pull_sim_multi(ufs: list[str], years: list[int], *,
                   cache_dir: str = None) -> list[dict]:
    """Run pull_sim for every (year, UF) combination and concatenate the results.

    Years form the outer loop and UFs the inner loop. A combination whose pull
    raises is logged as a warning and skipped; the remaining ones still run.

    Args:
        ufs: UF codes to pull.
        years: Years to pull.
        cache_dir: Passed through to pull_sim unchanged.

    Returns:
        One flat list with the records of all successful pulls.
    """
    jobs = [(year, uf) for year in years for uf in ufs]

    collected = []
    for year, uf in jobs:
        try:
            collected += pull_sim(uf, year, cache_dir=cache_dir)
        except Exception as e:
            logger.warning(f" [{uf}/{year}] error: {e}")
    return collected
|
|
|
|
def survival_distributions(records: list[dict]) -> dict:
    """Summarize age at death per disease from parsed SIM records.

    Records are grouped by their ``"orpha"`` value; records whose
    ``"age_at_death_years"`` is ``None`` are ignored.

    Args:
        records: Parsed record dicts carrying ``"orpha"`` and
            ``"age_at_death_years"``.

    Returns:
        Dict keyed by ``"orpha"``. Groups with fewer than three ages map to
        ``{"n", "ages", "median": None}`` (ages in input order). Larger groups
        map to ``n``, ``median``, ``p25``, ``p75``, ``iqr``, ``min_age``,
        ``max_age``, ``weibull_shape``, ``weibull_scale`` (all rounded to two
        decimals) and the sorted ``ages``.
    """
    from collections import defaultdict
    import statistics

    ages_by_disease = defaultdict(list)
    for record in records:
        age = record.get("age_at_death_years")
        if age is None:
            continue
        ages_by_disease[record["orpha"]].append(age)

    summary = {}
    for disease, raw_ages in ages_by_disease.items():
        count = len(raw_ages)
        if count < 3:
            # Too few observations for quartiles or a shape estimate.
            summary[disease] = {"n": count, "ages": raw_ages, "median": None}
            continue

        ordered = sorted(raw_ages)
        middle = statistics.median(ordered)
        # Quartiles are read directly at the n/4 and 3n/4 positions.
        lower_q = ordered[count // 4]
        upper_q = ordered[3 * count // 4]

        # Rough Weibull parameters from the coefficient of variation:
        # shape = 1.2 / cv and scale = mean / 0.91, with fixed fallbacks.
        try:
            mean = sum(ordered) / count
            var = sum((a - mean) ** 2 for a in ordered) / count
            cv = (var ** 0.5) / mean if mean > 0 else 1.0
            shape = 1.2 / cv if cv > 0 else 1.5
            scale = mean / 0.91
        except Exception:
            shape, scale = 1.5, middle * 1.4

        summary[disease] = {
            "n": count,
            "median": round(middle, 2),
            "p25": round(lower_q, 2),
            "p75": round(upper_q, 2),
            "iqr": round(upper_q - lower_q, 2),
            "min_age": round(min(ordered), 2),
            "max_age": round(max(ordered), 2),
            "weibull_shape": round(shape, 2),
            "weibull_scale": round(scale, 2),
            "ages": ordered,
        }
    return summary
|
|
|
|
def _format_summary_line(orpha: str, stats: dict) -> str:
    """Render one per-disease line of the console report."""
    # Test against None explicitly: a median of 0.0 (deaths within the first
    # days of life round to 0.0 years) is a valid result, not a missing one.
    if stats.get("median") is not None:
        return (f" ORPHA:{orpha:>6} n={stats['n']:>4} median={stats['median']:>6}y "
                f"IQR=[{stats['p25']:.0f}-{stats['p75']:.0f}] Weibull(shape={stats['weibull_shape']}, scale={stats['weibull_scale']})")
    return f" ORPHA:{orpha:>6} n={stats['n']:>4} (insufficient for fit)"


def main(argv=None) -> None:
    """Command-line entry point: pull SIM data, print a summary, write JSON.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    The JSON written to ``--out-json`` holds the record count, the requested
    UFs and years, the per-disease statistics without the raw ``ages`` lists,
    and the first 50 parsed records as a sample.
    """
    import argparse
    import json

    logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s")
    parser = argparse.ArgumentParser()
    parser.add_argument("--ufs", nargs="+", default=["SP"])
    parser.add_argument("--years", nargs="+", type=int, default=[2018, 2019, 2020])
    parser.add_argument("--cache-dir", default="/tmp/datasus_cache")
    parser.add_argument("--out-json", default="/tmp/datasus_survival.json")
    args = parser.parse_args(argv)

    print(f"Pulling SIM for UFs={args.ufs} years={args.years}")
    recs = pull_sim_multi(args.ufs, args.years, cache_dir=args.cache_dir)
    print(f"\nTotal rare-CID records: {len(recs)}")
    survival = survival_distributions(recs)
    print(f"\nPer-disease survival distributions ({len(survival)} diseases):\n")
    for orpha, stats in sorted(survival.items()):
        print(_format_summary_line(orpha, stats))

    out = {
        "records_count": len(recs),
        "ufs": args.ufs,
        "years": args.years,
        # Drop the raw age lists to keep the summary file small.
        "survival": {
            orpha: {key: value for key, value in stats.items() if key != "ages"}
            for orpha, stats in survival.items()
        },
        "raw_sample": recs[:50],
    }
    with open(args.out_json, "w", encoding="utf-8") as f:
        # default=str turns the date objects in raw_sample into strings.
        json.dump(out, f, default=str, indent=2)
    print(f"\nSaved → {args.out_json}")


if __name__ == "__main__":
    main()
|
|