gemeo-twin-stack / src /gemeo /datasus /apac_pull.py
timmers's picture
GEMEO world-model — initial release (module + NeuralSurv ckpt + RareBench v49 + KG embeddings)
089d665 verified
"""DATASUS APAC-SIA (high-cost outpatient procedure / orphan-drug authorisation) pull.
APAC = Autorização de Procedimentos de Alta Complexidade. This is the SUS
pipeline through which rare-disease patients receive high-cost orphan drugs
(enzyme replacement therapies, biologicals, etc.). Each APAC record is one
authorisation event with: CID-10, procedure code (SIGTAP), patient sex/age,
issuing UF, authorization date, validity period, monthly cost, and CNS-hash
(when present, allows cohort linkage with SIH and SIM).
Why this is the highest-leverage DATASUS subsystem for Gemeo:
- It captures the TREATMENT trajectory (the orphan drug events), not just
admission events. SIH-RD shows when the patient is hospitalised;
APAC-SIA shows when the patient gets the high-cost therapy that prevents
hospitalisation.
- Each rare disease typically has a small number of valid APAC procedures
(e.g., laronidase for MPS-I = 0604320XX). Filtering is straightforward.
- Pulled monthly; same DBC/DBF format as SIH and SIM.
Pulls AM{UF}{YY}{MM}.dbc files (e.g. AMSP1901.dbc) from
ftp://ftp.datasus.gov.br/dissemin/publicos/SIASUS/200801_/Dados/
(AM = APAC-Medicamentos, the APAC group this module downloads; same UF/YYMM
convention as SIH-RD.)
"""
from __future__ import annotations
import logging
import os
import tempfile
import urllib.request
from datetime import datetime
from pathlib import Path
# Module-level logger shared by every function in this file; the fixed name
# "gemeo.datasus.apac" lets callers configure or filter APAC pull messages.
logger = logging.getLogger("gemeo.datasus.apac")
# Maps a CID-10 (ICD-10) code, written as four characters without the dot, to
# the Orphanet (ORPHA) number of the rare disease it stands for. ORPHA numbers
# are kept as strings. Every key is four characters long.
# Same rare CID set as SIH; APAC matches against AP_CIDPRI / AP_CIDSEC / AP_CIDCAS
# NOTE(review): in ICD-10, E75.0 and E75.1 are gangliosidoses, while Gaucher
# disease is normally coded under E75.2 — confirm the three E75x mappings.
# NOTE(review): G71.1 is "myotonic disorders" in ICD-10 — confirm that it is
# meant to share the DMD ORPHA number with G71.0.
RARE_CIDS_APAC = {
    "G113": "100",  # AT (ataxia-telangiectasia)
    "E752": "646",  # NPC / Gaucher cohort
    "E751": "355",  # Gaucher
    "E750": "355",  # Gaucher subtype
    "G710": "98896",  # DMD
    "G711": "98896",
    "G120": "70",  # SMA-1
    "G121": "71",  # SMA-2
    "G122": "83330",  # SMA-3
    "E840": "586",  # CF
    "E841": "586",
    "E848": "586",
    "E849": "586",
    "E760": "579",  # MPS I
    "E761": "580",  # MPS II
    "E830": "905",  # Wilson
    "G111": "95",  # Friedreich
    "Q874": "558",  # Marfan
    "Q850": "636",  # NF1
    "F842": "778",  # Rett
    "D811": "183660",  # SCID
}
# Known orphan-drug SIGTAP procedure prefixes for rare diseases.
# A procedure code is flagged when it starts with any prefix in this set.
# Examples:
# 060432016X — laronidase (MPS-I) [Aldurazyme]
# 060432025X — idursulfase (MPS-II) [Elaprase]
# 060432005X — alglucosidase alfa (Pompe) [Myozyme]
# 060432004X — imiglucerase (Gaucher) [Cerezyme]
# 060432006X — agalsidase alfa/beta (Fabry) [Replagal/Fabrazyme]
# 060432042X — nusinersena (SMA) [Spinraza]
# 060432014X — eculizumab
# NOTE(review): only the first prefix below is a drug class; the other two are
# described as consult / follow-up proxies, yet parse_apac_record() reports a
# match on any of them as "is_orphan_drug" — confirm this is intended.
ORPHAN_DRUG_PREFIXES = {
    "0604320",  # broad orphan-drug class (most ERTs live here)
    "0303040",  # neuro consult tier (proxy for chronic follow-up)
    "0301060",  # clinical follow-up
}
def _parse_yyyymmdd(value):
    """Return a ``date`` for an 8-character YYYYMMDD value, else ``None``."""
    if not value:
        return None
    # DBF character fields can carry padding; strip it before measuring length.
    text = str(value).strip()
    if len(text) != 8:
        return None
    try:
        return datetime.strptime(text, "%Y%m%d").date()
    except ValueError:
        return None


def parse_apac_record(rec: dict) -> dict | None:
    """Parse one APAC record into a clean treatment-event dict.

    Args:
        rec: Mapping of APAC field names (``AP_CIDPRI``, ``AP_SEXO``,
            ``AP_NUIDADE`` ...) to raw values. Values may be strings or
            numbers; they are coerced to text before string handling.

    Returns:
        ``None`` when none of ``AP_CIDPRI`` / ``AP_CIDSEC`` / ``AP_CIDCAS``
        holds a code listed in ``RARE_CIDS_APAC``. Otherwise a dict with the
        keys ``cid``, ``orpha``, ``sex``, ``age_at_authorization_years``,
        ``uf_code``, ``auth_date``, ``valid_until``, ``procedure_code``,
        ``is_orphan_drug``, ``monthly_cost_brl``, ``cns_hash`` and ``type``.
        ``cid`` is the first diagnosis field, in the order above, that matched.
    """
    # Test every diagnosis field on its own. Taking only the first non-empty
    # field would hide a rare CID stored in the secondary/associated field
    # whenever the primary field carries some other (non-rare) code.
    cid = None
    matched = None
    for field in ("AP_CIDPRI", "AP_CIDSEC", "AP_CIDCAS"):
        candidate = str(rec.get(field) or "").strip().upper()
        if not candidate:
            continue
        for code in (candidate, candidate[:3]):
            if code in RARE_CIDS_APAC:
                cid = candidate
                matched = RARE_CIDS_APAC[code]
                break
        if matched is not None:
            break
    if matched is None:
        return None

    sex_code = str(rec.get("AP_SEXO") or "").strip()
    if sex_code in ("1", "M"):
        sex = "M"
    elif sex_code in ("3", "F"):
        sex = "F"
    else:
        sex = "?"

    # AP_COIDADE is the unit-of-age code: 1=h, 2=d, 3=mo, 4=yr, 5=>100yr
    age = None
    raw_age = rec.get("AP_NUIDADE")
    cod_idade = str(rec.get("AP_COIDADE") or rec.get("AP_TPIDADE") or "").strip()
    if raw_age is not None:
        try:
            idade = int(raw_age)
        except (ValueError, TypeError):
            idade = None
        if idade is not None:
            if cod_idade == "4":
                age = float(idade)
            elif cod_idade == "3":
                age = idade / 12
            elif cod_idade == "2":
                age = idade / 365.25
            elif cod_idade == "1":
                # Hours, per the unit table above.
                age = idade / (24 * 365.25)
            elif cod_idade == "5":
                age = 100.0 + idade
            elif cod_idade in ("", "0"):
                # if no unit code, assume years for plausible values
                if 0 <= idade <= 110:
                    age = float(idade)

    proc = str(rec.get("AP_PRIPAL") or rec.get("AP_PROC") or "").strip()

    # Use explicit None/blank checks rather than `or`: a cost of 0 is a real
    # value and must not be treated as missing.
    cost = None
    for key in ("AP_VL_AP", "AP_VLR_AP"):
        raw_cost = rec.get(key)
        if raw_cost is None or (isinstance(raw_cost, str) and not raw_cost.strip()):
            continue
        try:
            cost = float(raw_cost)
        except (ValueError, TypeError):
            continue
        break

    return {
        "cid": cid,
        "orpha": matched,
        "sex": sex,
        "age_at_authorization_years": age,
        # str() guards against numeric field values, which cannot be sliced.
        "uf_code": str(rec.get("AP_UFNACIO") or rec.get("AP_UFMUN") or "").strip()[:2],
        "auth_date": _parse_yyyymmdd(rec.get("AP_DTINIC")),
        "valid_until": _parse_yyyymmdd(rec.get("AP_DTFIM")),
        "procedure_code": proc or None,
        "is_orphan_drug": bool(proc) and proc.startswith(tuple(ORPHAN_DRUG_PREFIXES)),
        "monthly_cost_brl": cost,
        "cns_hash": str(rec.get("AP_CNSPCN") or "").strip() or None,
        "type": "treatment",  # for joint-event tokenization
    }
def _drop_file(path: str) -> None:
    """Delete *path* if present, ignoring filesystem errors."""
    try:
        os.remove(path)
    except OSError:
        pass


def _record_has_target_cid(rec, target_cids) -> bool:
    """Return True when any APAC diagnosis field holds a targeted CID-10 code."""
    for field in ("AP_CIDPRI", "AP_CIDSEC", "AP_CIDCAS"):
        code = str(rec.get(field) or "").strip().upper()
        if code and (code in target_cids or code[:3] in target_cids):
            return True
    return False


def pull_apac(uf: str, year: int, month: int, *, cache_dir: str | None = None,
              target_cids: set | None = None) -> list[dict]:
    """Pull APAC-SIA for one UF/year/month.

    Downloads ``AM{uf}{yy}{mm}.dbc`` from the DATASUS FTP server, converts it
    to DBF, and parses every record whose diagnosis matches ``target_cids``.

    Args:
        uf: Two-letter state code used in the file name (e.g. ``"SP"``).
        year: Calendar year; only its last two digits go into the file name.
        month: Calendar month, zero-padded to two digits in the file name.
        cache_dir: Directory in which the ``.dbc`` / ``.dbf`` files are kept
            and reused across calls. When ``None`` a temporary directory is
            used and removed before returning.
        target_cids: CID-10 codes to keep; defaults to every key of
            ``RARE_CIDS_APAC``.

    Returns:
        One dict per matching record, as produced by ``parse_apac_record``,
        with ``year`` and ``month`` added. An empty list is returned when the
        file cannot be downloaded or converted. A failure while reading the
        DBF is logged and whatever was parsed up to that point is returned.
    """
    # Third-party readers are imported at call time, not at module import.
    import pyreaddbc
    from dbfread import DBF

    if target_cids is None:
        target_cids = set(RARE_CIDS_APAC.keys())

    tag = f"{uf}/{year}/{month:02d}"
    # APAC-Medicamentos prefix is "AM" (high-cost orphan drugs).
    # Other APAC groups: AB (bariatric), AQ (chemo), AR (radio), AN (nephro), etc.
    fname = f"AM{uf}{str(year)[-2:]}{month:02d}.dbc"
    url = f"ftp://ftp.datasus.gov.br/dissemin/publicos/SIASUS/200801_/Dados/{fname}"

    use_persistent = cache_dir is not None
    tmp_dir = None
    if use_persistent:
        os.makedirs(cache_dir, exist_ok=True)
        work_dir = cache_dir
    else:
        # TemporaryDirectory (unlike mkdtemp) can be cleaned up in `finally`,
        # so uncached pulls no longer leave DBC/DBF files behind.
        tmp_dir = tempfile.TemporaryDirectory()
        work_dir = tmp_dir.name

    dbc_path = os.path.join(work_dir, fname)
    # Swap only the extension; str.replace would also rewrite a ".dbc" that
    # happens to appear in the directory part of the path.
    dbf_path = os.path.splitext(dbc_path)[0] + ".dbf"

    out = []
    try:
        # Files of 1 KiB or less are treated as truncated and ignored.
        dbf_ready = os.path.exists(dbf_path) and os.path.getsize(dbf_path) > 1024
        if not dbf_ready:
            cached_dbc = os.path.exists(dbc_path) and os.path.getsize(dbc_path) > 1024
            try:
                if not cached_dbc:
                    if use_persistent:
                        logger.info(" [%s] download", tag)
                    urllib.request.urlretrieve(url, dbc_path)
                pyreaddbc.dbc2dbf(dbc_path, dbf_path)
            except Exception as e:
                if cached_dbc:
                    reason = "cached .dbc conversion failed"
                elif use_persistent:
                    reason = "download failed"
                else:
                    reason = "failed"
                logger.warning(" [%s] %s: %s", tag, reason, e)
                # Remove partial artefacts so a later call retries the
                # download instead of reusing a broken cache entry.
                _drop_file(dbc_path)
                _drop_file(dbf_path)
                return []

        try:
            for rec in DBF(dbf_path, encoding="latin-1", load=False):
                # Cheap pre-filter before the full parse.
                if not _record_has_target_cid(rec, target_cids):
                    continue
                parsed = parse_apac_record(rec)
                if not parsed:
                    continue
                # The parser matches against the full rare-CID table; keep the
                # event only if the CID it settled on is one the caller asked for.
                code = parsed["cid"]
                if code not in target_cids and code[:3] not in target_cids:
                    continue
                parsed["year"] = year
                parsed["month"] = month
                out.append(parsed)
        except Exception as e:
            logger.warning(" [%s] parse failed: %s", tag, e)
    finally:
        if tmp_dir is not None:
            tmp_dir.cleanup()

    if out:
        logger.info(" [%s] matched %d APAC events", tag, len(out))
    return out
def pull_apac_multi(ufs: list[str], year_months: list[tuple[int, int]], *,
                    cache_dir: str = None) -> list[dict]:
    """Pull APAC-SIA across multiple UFs and (year, month) pairs.

    Iterates period by period and, within each period, UF by UF, calling
    ``pull_apac`` for every combination. A combination that raises is logged
    as a warning and skipped, so one failure does not stop the whole run.

    Returns:
        All events from every successful pull, concatenated in iteration order.
    """
    collected = []
    for year, month in year_months:
        for uf in ufs:
            try:
                # In-place list concatenation; stays inside the try so any
                # failure for this combination is logged and skipped.
                collected += pull_apac(uf, year, month, cache_dir=cache_dir)
            except Exception as e:
                logger.warning(f" [{uf}/{year}/{month}] error: {e}")
    return collected
if __name__ == "__main__":
    # Command-line entry point: pull the requested UFs/months, print a
    # per-disease summary, and dump all events to a JSON file.
    import argparse
    import json
    from collections import Counter

    logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(name)s] %(message)s")

    cli = argparse.ArgumentParser()
    cli.add_argument("--ufs", nargs="+", default=["SP", "RJ", "MG"])
    cli.add_argument("--year", type=int, default=2019)
    cli.add_argument("--months", nargs="+", type=int, default=[1, 4, 7, 10])
    cli.add_argument("--cache-dir", default="/tmp/datasus_apac_cache")
    cli.add_argument("--out-json", default="/tmp/datasus_apac.json")
    args = cli.parse_args()

    # One (year, month) pair per requested month of the single requested year.
    year_months = []
    for m in args.months:
        year_months.append((args.year, m))

    print(f"Pulling APAC-SIA UFs={args.ufs} year={args.year} months={args.months}")
    recs = pull_apac_multi(args.ufs, year_months, cache_dir=args.cache_dir)
    print(f"\nTotal APAC rare-CID events: {len(recs)}")

    by_orpha = Counter(r["orpha"] for r in recs)
    print("\nPer-disease:")
    for orpha_id, n_events in by_orpha.most_common():
        print(f" ORPHA:{orpha_id:>6} {n_events:>5} APAC events")

    with open(args.out_json, "w") as f:
        # date objects are not JSON-serialisable: write them as ISO strings,
        # or as "" when the date is missing.
        rows = []
        for r in recs:
            row = dict(r)
            row["auth_date"] = str(r.get("auth_date") or "")
            row["valid_until"] = str(r.get("valid_until") or "")
            rows.append(row)
        json.dump(rows, f)
    print(f"\nSaved → {args.out_json}")