| """DATASUS APAC-SIA (high-cost outpatient procedure / orphan-drug authorisation) pull. |
| |
| APAC = Autorização de Procedimentos de Alta Complexidade. This is the SUS |
| pipeline through which rare-disease patients receive high-cost orphan drugs |
| (enzyme replacement therapies, biologicals, etc.). Each APAC record is one |
| authorisation event with: CID-10, procedure code (SIGTAP), patient sex/age, |
| issuing UF, authorization date, validity period, monthly cost, and CNS-hash |
| (when present, allows cohort linkage with SIH and SIM). |
| |
| Why this is the highest-leverage DATASUS subsystem for Gemeo: |
| - It captures the TREATMENT trajectory (the orphan drug events), not just |
| admission events. SIH-RD shows when the patient is hospitalised; |
| APAC-SIA shows when the patient gets the high-cost therapy that prevents |
| hospitalisation. |
| - Each rare disease typically has a small number of valid APAC procedures |
| (e.g., laronidase for MPS-I = 0604320XX). Filtering is straightforward. |
| - Pulled monthly; same DBC/DBF format as SIH and SIM. |
| |
| Pulls AM{UF}{YY}{MM}.dbc (e.g. AMSP1901.dbc) from |
| ftp://ftp.datasus.gov.br/dissemin/publicos/SIASUS/200801_/Dados/ |
| |
| (AM is the file-name prefix of the APAC files pulled here; same UF/YYMM convention as SIH-RD.) |
| """ |
| from __future__ import annotations |
| import logging |
| import os |
| import tempfile |
| import urllib.request |
| from datetime import datetime |
| from pathlib import Path |
|
|
| logger = logging.getLogger("gemeo.datasus.apac") |
|
|
|
|
| |
# Maps a CID-10 code (written without the dot) to the identifier that parsed
# records expose under their "orpha" key and that the command-line summary
# prints as "ORPHA:<id>".
#
# Declared as (identifier, CID codes) groups so that CIDs sharing one
# identifier sit side by side; the comprehension flattens the groups into a
# plain dict whose insertion order follows the listing below.
RARE_CIDS_APAC = {
    cid: orpha
    for orpha, cids in (
        ("100", ("G113",)),
        ("646", ("E752",)),
        ("355", ("E751", "E750")),
        ("98896", ("G710", "G711")),
        ("70", ("G120",)),
        ("71", ("G121",)),
        ("83330", ("G122",)),
        ("586", ("E840", "E841", "E848", "E849")),
        ("579", ("E760",)),
        ("580", ("E761",)),
        ("905", ("E830",)),
        ("95", ("G111",)),
        ("558", ("Q874",)),
        ("636", ("Q850",)),
        ("778", ("F842",)),
        ("183660", ("D811",)),
    )
    for cid in cids
}
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
# Procedure-code prefixes: a record whose procedure code starts with any of
# these is flagged "is_orphan_drug" by parse_apac_record.
# NOTE(review): the module docstring cites 0604320XX (laronidase for MPS-I) as
# an example; the rationale for the other two prefixes is not visible here.
ORPHAN_DRUG_PREFIXES = {"0301060", "0303040", "0604320"}
|
|
|
|
def parse_apac_record(rec: dict) -> dict | None:
    """Parse one APAC record into a clean treatment-event dict.

    Args:
        rec: Mapping of raw APAC field names (``AP_*``) to values, e.g. one
            row yielded by a DBF reader. Values may be strings, numbers,
            dates or ``None``; every field is optional.

    Returns:
        ``None`` when the record has no CID, or when neither the CID nor its
        first three characters is a key of ``RARE_CIDS_APAC``. Otherwise a
        dict with the keys ``cid``, ``orpha``, ``sex``,
        ``age_at_authorization_years``, ``uf_code``, ``auth_date``,
        ``valid_until``, ``procedure_code``, ``is_orphan_drug``,
        ``monthly_cost_brl``, ``cns_hash`` and ``type`` (always
        ``"treatment"``). Fields that are missing or unparseable come back
        as ``None`` (``"?"`` for ``sex``).
    """
    from datetime import date

    def _text(*fields: str) -> str:
        """Return the first truthy value among *fields* as stripped text."""
        for name in fields:
            value = rec.get(name)
            if value:
                # str() so numeric values from the reader do not break .strip()
                return str(value).strip()
        return ""

    def _date(value):
        """Convert a ``YYYYMMDD`` value (or a date object) to ``date``."""
        if isinstance(value, datetime):
            return value.date()
        if isinstance(value, date):
            # Already parsed upstream (DBF readers do this for date columns).
            return value
        text = str(value).strip() if value else ""
        if len(text) != 8:
            return None
        try:
            return datetime.strptime(text, "%Y%m%d").date()
        except ValueError:
            return None

    cid = _text("AP_CIDPRI", "AP_CIDSEC", "AP_CIDCAS").upper()
    if not cid:
        return None

    # Exact code first, then the 3-character prefix.
    matched = None
    for code in (cid, cid[:3]):
        if code in RARE_CIDS_APAC:
            matched = RARE_CIDS_APAC[code]
            break
    if matched is None:
        return None

    sex = {"1": "M", "M": "M", "3": "F", "F": "F"}.get(_text("AP_SEXO"), "?")

    # Age: the unit code decides how the raw number is scaled to years.
    #   "4" -> taken as years, "3" -> divided by 12, "2" -> divided by 365.25,
    #   "5" -> 100 + value, "" / "0" -> taken as years only if within 0..110.
    # Any other unit code leaves the age unknown.
    age = None
    unit = _text("AP_COIDADE", "AP_TPIDADE")
    raw_age = rec.get("AP_NUIDADE")
    if raw_age is not None:
        try:
            n = int(raw_age)
        except (ValueError, TypeError):
            n = None
        if n is not None:
            if unit == "4":
                age = float(n)
            elif unit == "3":
                age = n / 12
            elif unit == "2":
                age = n / 365.25
            elif unit == "5":
                age = 100.0 + n
            elif unit in ("", "0") and 0 <= n <= 110:
                age = float(n)

    proc = _text("AP_PRIPAL", "AP_PROC")

    # Take the first cost field that holds a parseable number. Checking
    # "is None" instead of truthiness keeps a legitimate 0 / 0.0 cost rather
    # than silently turning it into None.
    cost = None
    for name in ("AP_VL_AP", "AP_VLR_AP"):
        raw_cost = rec.get(name)
        if raw_cost is None:
            continue
        try:
            cost = float(raw_cost)
        except (ValueError, TypeError):
            continue
        break

    return {
        "cid": cid,
        "orpha": matched,
        "sex": sex,
        "age_at_authorization_years": age,
        # NOTE(review): AP_UFNACIO takes priority over AP_UFMUN here; confirm
        # that it really carries a UF code for the files being pulled.
        "uf_code": _text("AP_UFNACIO", "AP_UFMUN")[:2],
        "auth_date": _date(rec.get("AP_DTINIC")),
        "valid_until": _date(rec.get("AP_DTFIM")),
        "procedure_code": proc or None,
        # str.startswith accepts a tuple; an empty proc matches nothing.
        "is_orphan_drug": proc.startswith(tuple(ORPHAN_DRUG_PREFIXES)),
        "monthly_cost_brl": cost,
        "cns_hash": _text("AP_CNSPCN") or None,
        "type": "treatment",
    }
|
|
|
|
def pull_apac(uf: str, year: int, month: int, *, cache_dir: str | None = None,
              target_cids: set | None = None) -> list[dict]:
    """Pull APAC-SIA for one UF/year/month.

    Fetches ``AM{uf}{yy}{mm}.dbc`` from the DATASUS FTP server, converts it
    to DBF with ``pyreaddbc`` and returns the rows that
    ``parse_apac_record`` accepts.

    Args:
        uf: UF abbreviation as it appears in the file name (e.g. ``"SP"``).
        year: Year; only its last two digits go into the file name.
        month: Month number; zero-padded to two digits in the file name.
        cache_dir: Directory where the ``.dbc`` / ``.dbf`` files are kept
            between calls. When ``None`` a temporary directory is used and
            deleted before returning.
        target_cids: CID codes used to pre-filter rows (a row passes when its
            CID or the CID's first three characters is in the set). Defaults
            to the keys of ``RARE_CIDS_APAC``.

    Returns:
        Parsed event dicts with ``year`` and ``month`` added. An empty list
        when the file cannot be downloaded or converted (a warning is
        logged). If reading the DBF fails midway, the events parsed so far
        are returned.
    """
    import shutil

    import pyreaddbc
    from dbfread import DBF

    if target_cids is None:
        target_cids = set(RARE_CIDS_APAC)

    tag = f"{uf}/{year}/{month:02d}"
    fname = f"AM{uf}{str(year)[-2:]}{month:02d}.dbc"
    url = f"ftp://ftp.datasus.gov.br/dissemin/publicos/SIASUS/200801_/Dados/{fname}"

    use_persistent = cache_dir is not None
    if use_persistent:
        os.makedirs(cache_dir, exist_ok=True)
        work_dir = cache_dir
    else:
        work_dir = tempfile.mkdtemp()

    dbc_path = os.path.join(work_dir, fname)
    # splitext only swaps the extension; str.replace would also rewrite a
    # ".dbc" that happens to occur inside the directory part of the path.
    dbf_path = os.path.splitext(dbc_path)[0] + ".dbf"

    def _usable(path: str) -> bool:
        """True when *path* exists and is larger than 1 KiB."""
        return os.path.exists(path) and os.path.getsize(path) > 1024

    def _discard(path: str) -> None:
        """Best-effort removal of a partial or stale file."""
        try:
            os.remove(path)
        except OSError:
            pass  # nothing to clean up, or not removable; either is fine here

    def _download() -> None:
        """Download to a side file, then rename: the cache never holds a
        truncated .dbc under its final name."""
        part = dbc_path + ".part"
        try:
            urllib.request.urlretrieve(url, part)
            os.replace(part, dbc_path)
        except Exception:
            _discard(part)
            raise

    def _convert() -> None:
        """Convert .dbc -> .dbf via a side file, for the same reason."""
        part = dbf_path + ".part"
        try:
            pyreaddbc.dbc2dbf(dbc_path, part)
            os.replace(part, dbf_path)
        except Exception:
            _discard(part)
            raise

    def _ensure_dbf() -> None:
        """Make sure dbf_path exists, reusing cached files where possible."""
        if _usable(dbf_path):
            return
        if _usable(dbc_path):
            try:
                _convert()
                return
            except Exception as e:
                # A cached archive that cannot be converted would otherwise
                # fail on every later call; drop it and fetch a fresh copy.
                logger.warning(" [%s] cached .dbc unusable (%s); re-downloading", tag, e)
                _discard(dbc_path)
        if use_persistent:
            logger.info(" [%s] download", tag)
        _download()
        _convert()

    out: list[dict] = []
    try:
        try:
            _ensure_dbf()
        except Exception as e:
            logger.warning(
                " [%s] %s: %s",
                tag,
                "download failed" if use_persistent else "failed",
                e,
            )
            return []

        try:
            for rec in DBF(dbf_path, encoding="latin-1", load=False):
                cid = (rec.get("AP_CIDPRI") or rec.get("AP_CIDSEC") or
                       rec.get("AP_CIDCAS") or "").strip().upper()
                # Cheap pre-filter before the full parse.
                if cid not in target_cids and cid[:3] not in target_cids:
                    continue
                parsed = parse_apac_record(rec)
                if parsed:
                    parsed["year"] = year
                    parsed["month"] = month
                    out.append(parsed)
        except Exception as e:
            logger.warning(" [%s] parse failed: %s", tag, e)
    finally:
        # Without a cache the download lives in a throw-away directory;
        # remove it so repeated calls do not pile up files on disk.
        if not use_persistent:
            shutil.rmtree(work_dir, ignore_errors=True)

    if out:
        logger.info(" [%s] matched %d APAC events", tag, len(out))
    return out
|
|
|
|
def pull_apac_multi(ufs: list[str], year_months: list[tuple[int, int]], *,
                    cache_dir: str | None = None,
                    target_cids: set | None = None) -> list[dict]:
    """Pull APAC-SIA across multiple UFs and (year, month) pairs.

    Calls ``pull_apac`` once per (year, month, UF) combination, iterating
    all UFs for the first period before moving to the next one.

    Args:
        ufs: UF abbreviations to pull.
        year_months: ``(year, month)`` pairs to pull.
        cache_dir: Forwarded to ``pull_apac``.
        target_cids: Forwarded to ``pull_apac``; ``None`` keeps its default.

    Returns:
        All events from every combination, concatenated in call order. A
        combination that raises is logged as a warning and skipped.
    """
    all_records: list[dict] = []
    for year, month in year_months:
        for uf in ufs:
            try:
                all_records.extend(
                    pull_apac(uf, year, month, cache_dir=cache_dir,
                              target_cids=target_cids)
                )
            except Exception as e:
                # One failing file must not abort the whole batch.
                # Month is zero-padded so the tag matches pull_apac's logs.
                logger.warning(" [%s/%s/%02d] error: %s", uf, year, month, e)
    return all_records
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: pull the requested UFs/months, print a
    # per-ORPHA summary, then dump every event to a JSON file.
    import argparse
    import json
    from collections import Counter

    logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s")

    cli = argparse.ArgumentParser()
    cli.add_argument("--ufs", nargs="+", default=["SP", "RJ", "MG"])
    cli.add_argument("--year", type=int, default=2019)
    cli.add_argument("--months", nargs="+", type=int, default=[1, 4, 7, 10])
    cli.add_argument("--cache-dir", default="/tmp/datasus_apac_cache")
    cli.add_argument("--out-json", default="/tmp/datasus_apac.json")
    opts = cli.parse_args()

    periods = [(opts.year, m) for m in opts.months]
    print(f"Pulling APAC-SIA UFs={opts.ufs} year={opts.year} months={opts.months}")
    events = pull_apac_multi(opts.ufs, periods, cache_dir=opts.cache_dir)
    print(f"\nTotal APAC rare-CID events: {len(events)}")

    # Event count per ORPHA identifier, most frequent first.
    print("\nPer-disease:")
    for orpha, count in Counter(e["orpha"] for e in events).most_common():
        print(f" ORPHA:{orpha:>6} {count:>5} APAC events")

    with open(opts.out_json, "w") as fh:
        rows = []
        for event in events:
            row = dict(event)
            # Dates are not JSON-serialisable: store their string form,
            # or "" when the date is missing.
            row["auth_date"] = str(event.get("auth_date") or "")
            row["valid_until"] = str(event.get("valid_until") or "")
            rows.append(row)
        json.dump(rows, fh)
    print(f"\nSaved → {opts.out_json}")
|
|