"""DATASUS APAC-SIA (high-cost outpatient procedure / orphan-drug authorisation) pull. APAC = Autorização de Procedimentos de Alta Complexidade. This is the SUS pipeline through which rare-disease patients receive high-cost orphan drugs (enzyme replacement therapies, biologicals, etc.). Each APAC record is one authorisation event with: CID-10, procedure code (SIGTAP), patient sex/age, issuing UF, authorization date, validity period, monthly cost, and CNS-hash (when present, allows cohort linkage with SIH and SIM). Why this is the highest-leverage DATASUS subsystem for Gemeo: - It captures the TREATMENT trajectory (the orphan drug events), not just admission events. SIH-RD shows when the patient is hospitalised; APAC-SIA shows when the patient gets the high-cost therapy that prevents hospitalisation. - Each rare disease typically has a small number of valid APAC procedures (e.g., laronidase for MPS-I = 0604320XX). Filtering is straightforward. - Pulled monthly; same DBC/DBF format as SIH and SIM. Pulls APXX####.dbc from ftp://ftp.datasus.gov.br/dissemin/publicos/SIASUS/200801_/Dados/ (APXX = APAC; same UF/YYMM convention as SIH-RD.) """ from __future__ import annotations import logging import os import tempfile import urllib.request from datetime import datetime from pathlib import Path logger = logging.getLogger("gemeo.datasus.apac") # Same rare CID set as SIH; APAC matches against AP_CIDPRI / AP_CIDSEC / AP_CIDCAS RARE_CIDS_APAC = { "G113": "100", # AT "E752": "646", # NPC / Gaucher cohort "E751": "355", # Gaucher "E750": "355", # Gaucher subtype "G710": "98896", # DMD "G711": "98896", "G120": "70", # SMA-1 "G121": "71", # SMA-2 "G122": "83330", # SMA-3 "E840": "586", # CF "E841": "586", "E848": "586", "E849": "586", "E760": "579", # MPS I "E761": "580", # MPS II "E830": "905", # Wilson "G111": "95", # Friedreich "Q874": "558", # Marfan "Q850": "636", # NF1 "F842": "778", # Rett "D811": "183660", # SCID } # Known orphan-drug SIGTAP procedure prefixes for rare diseases. # Examples: # 060432016X — laronidase (MPS-I) [Aldurazyme] # 060432025X — idursulfase (MPS-II) [Elaprase] # 060432005X — alglucosidase alfa (Pompe) [Myozyme] # 060432004X — imiglucerase (Gaucher) [Cerezyme] # 060432006X — agalsidase alfa/beta (Fabry) [Replagal/Fabrazyme] # 060432042X — nusinersena (SMA) [Spinraza] # 060432014X — eculizumab ORPHAN_DRUG_PREFIXES = { "0604320", # broad orphan-drug class (most ERTs live here) "0303040", # neuro consult tier (proxy for chronic follow-up) "0301060", # clinical follow-up } def parse_apac_record(rec: dict) -> dict | None: """Parse one APAC record into a clean treatment-event dict.""" cid = (rec.get("AP_CIDPRI") or rec.get("AP_CIDSEC") or rec.get("AP_CIDCAS") or "").strip().upper() if not cid: return None matched = None for code in (cid, cid[:3]): if code in RARE_CIDS_APAC: matched = RARE_CIDS_APAC[code] break if matched is None: return None sex_code = str(rec.get("AP_SEXO") or "").strip() sex = "M" if sex_code in ("1", "M") else ("F" if sex_code in ("3", "F") else "?") age = None try: idade = rec.get("AP_NUIDADE") # AP_COIDADE is the unit-of-age code: 1=h, 2=d, 3=mo, 4=yr, 5=>100yr cod_idade = str(rec.get("AP_COIDADE") or rec.get("AP_TPIDADE") or "").strip() if idade is not None: idade = int(idade) if cod_idade == "4": age = float(idade) elif cod_idade == "3": age = idade / 12 elif cod_idade == "2": age = idade / 365.25 elif cod_idade == "5": age = 100.0 + idade elif cod_idade in ("", "0"): # if no unit code, assume years for plausible values if 0 <= idade <= 110: age = float(idade) except (ValueError, TypeError): pass def _date(s): if not s or len(str(s)) < 6: return None s = str(s) try: if len(s) == 8: return datetime.strptime(s, "%Y%m%d").date() except ValueError: pass return None proc = (rec.get("AP_PRIPAL") or rec.get("AP_PROC") or "").strip() cost = None try: v = rec.get("AP_VL_AP") or rec.get("AP_VLR_AP") if v is not None: cost = float(v) except (ValueError, TypeError): pass return { "cid": cid, "orpha": matched, "sex": sex, "age_at_authorization_years": age, "uf_code": (rec.get("AP_UFNACIO") or rec.get("AP_UFMUN") or "")[:2], "auth_date": _date(rec.get("AP_DTINIC")), "valid_until": _date(rec.get("AP_DTFIM")), "procedure_code": proc or None, "is_orphan_drug": any(proc.startswith(p) for p in ORPHAN_DRUG_PREFIXES) if proc else False, "monthly_cost_brl": cost, "cns_hash": (rec.get("AP_CNSPCN") or "").strip() or None, "type": "treatment", # for joint-event tokenization } def pull_apac(uf: str, year: int, month: int, *, cache_dir: str = None, target_cids: set = None) -> list[dict]: """Pull APAC-SIA for one UF/year/month.""" import pyreaddbc from dbfread import DBF if target_cids is None: target_cids = set(RARE_CIDS_APAC.keys()) # APAC-Medicamentos prefix is "AM" (high-cost orphan drugs). # Other APAC groups: AB (bariatric), AQ (chemo), AR (radio), AN (nephro), etc. fname = f"AM{uf}{str(year)[-2:]}{month:02d}.dbc" url = f"ftp://ftp.datasus.gov.br/dissemin/publicos/SIASUS/200801_/Dados/{fname}" use_persistent = cache_dir is not None if use_persistent: os.makedirs(cache_dir, exist_ok=True) dbc_path = os.path.join(cache_dir, fname) dbf_path = dbc_path.replace(".dbc", ".dbf") if os.path.exists(dbf_path) and os.path.getsize(dbf_path) > 1024: pass elif os.path.exists(dbc_path) and os.path.getsize(dbc_path) > 1024: pyreaddbc.dbc2dbf(dbc_path, dbf_path) else: try: logger.info(f" [{uf}/{year}/{month:02d}] download") urllib.request.urlretrieve(url, dbc_path) pyreaddbc.dbc2dbf(dbc_path, dbf_path) except Exception as e: logger.warning(f" [{uf}/{year}/{month:02d}] download failed: {e}") return [] else: td = tempfile.mkdtemp() dbc_path = os.path.join(td, fname) dbf_path = dbc_path.replace(".dbc", ".dbf") try: urllib.request.urlretrieve(url, dbc_path) pyreaddbc.dbc2dbf(dbc_path, dbf_path) except Exception as e: logger.warning(f" [{uf}/{year}/{month:02d}] failed: {e}") return [] out = [] try: for rec in DBF(dbf_path, encoding="latin-1", load=False): cid = (rec.get("AP_CIDPRI") or rec.get("AP_CIDSEC") or rec.get("AP_CIDCAS") or "").strip().upper() if cid not in target_cids and cid[:3] not in target_cids: continue parsed = parse_apac_record(rec) if parsed: parsed["year"] = year parsed["month"] = month out.append(parsed) except Exception as e: logger.warning(f" [{uf}/{year}/{month:02d}] parse failed: {e}") if out: logger.info(f" [{uf}/{year}/{month:02d}] matched {len(out)} APAC events") return out def pull_apac_multi(ufs: list[str], year_months: list[tuple[int, int]], *, cache_dir: str = None) -> list[dict]: """Pull APAC-SIA across multiple UFs and (year, month) pairs.""" all_records = [] for year, month in year_months: for uf in ufs: try: recs = pull_apac(uf, year, month, cache_dir=cache_dir) all_records.extend(recs) except Exception as e: logger.warning(f" [{uf}/{year}/{month}] error: {e}") return all_records if __name__ == "__main__": import argparse import json logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s") parser = argparse.ArgumentParser() parser.add_argument("--ufs", nargs="+", default=["SP", "RJ", "MG"]) parser.add_argument("--year", type=int, default=2019) parser.add_argument("--months", nargs="+", type=int, default=[1, 4, 7, 10]) parser.add_argument("--cache-dir", default="/tmp/datasus_apac_cache") parser.add_argument("--out-json", default="/tmp/datasus_apac.json") args = parser.parse_args() year_months = [(args.year, m) for m in args.months] print(f"Pulling APAC-SIA UFs={args.ufs} year={args.year} months={args.months}") recs = pull_apac_multi(args.ufs, year_months, cache_dir=args.cache_dir) print(f"\nTotal APAC rare-CID events: {len(recs)}") from collections import Counter by_orpha = Counter(r["orpha"] for r in recs) print(f"\nPer-disease:") for o, c in by_orpha.most_common(): print(f" ORPHA:{o:>6} {c:>5} APAC events") with open(args.out_json, "w") as f: json.dump([{**r, "auth_date": str(r.get("auth_date") or ""), "valid_until": str(r.get("valid_until") or "")} for r in recs], f) print(f"\nSaved → {args.out_json}")