# scripts/ingest_facilities.py
"""
Ingest facility data from N-SUMHSS / National Directory into app schema.
Reads a CSV or Excel file (e.g. downloaded from SAMHSA N-SUMHSS or National
Directory), maps columns to the internal schema using a configurable mapping,
and writes data/facilities.csv. If the source uses codes, extend SOURCE_TO_APP
or add a code-resolution step using the N-SUMHSS codebook.
Usage:
python scripts/ingest_facilities.py [path_to_source.csv]
python scripts/ingest_facilities.py path/to/national_directory.xlsx
If no path is given, prints usage and exits.
Output: data/facilities.csv under the repo root (the parent of this script's scripts/ directory).
"""
import argparse
import sys
import warnings
from pathlib import Path
import pandas as pd
# Repo root (parent of scripts/): this file lives in repo_root/scripts/.
REPO_ROOT = Path(__file__).resolve().parent.parent
# All app data files live under repo_root/data.
DATA_DIR = REPO_ROOT / "data"
# Default output path; main() allows overriding via -o/--output.
OUTPUT_CSV = DATA_DIR / "facilities.csv"
# Internal schema: only columns we need (no duplicate/redundant attribute columns).
# Search matches against "services" for treatment type, payment, languages, populations, substances.
# Order here is also the column order of the output CSV.
APP_COLUMNS = [
    "facility_name",
    "address",
    "city",
    "state",
    "zip",
    "phone",
    "mat",  # normalized to "yes"/"no" (or "" for missing) by _normalize_mat
    "services",
]
# Map source column names (lowercase) -> app column name.
# N-SUMHSS / National Directory use different names; adjust per codebook.
# National Directory Excel may use "Facility/Program Name", "Street", "City", "State", etc.
# See data/README.md for the data story and mapping notes.
# NOTE(review): several targets here (treatment_type, payment_options,
# substances_addressed, languages, populations, description) are not in
# APP_COLUMNS, so map_columns collects but does not emit them — confirm
# whether that is intentional or these should feed "services".
SOURCE_TO_APP = {
    # --- facility / program name variants ---
    "facility_name": "facility_name",
    "facility name": "facility_name",
    "facility/program name": "facility_name",
    "program name": "facility_name",
    "name": "facility_name",
    "name1": "facility_name",
    "name2": "facility_name",
    "provider name": "facility_name",
    "organization": "facility_name",
    "treatment facility name": "facility_name",
    "location name": "facility_name",
    "facility": "facility_name",
    # --- street address variants ---
    "address": "address",
    "street": "address",
    "street address": "address",
    "address1": "address",
    "address line 1": "address",
    "street1": "address",
    "street2": "address",
    "physical address": "address",
    "location address": "address",
    # --- city / state / zip / phone ---
    "city": "city",
    "state": "state",
    "state abbreviation": "state",
    "zip": "zip",
    "zipcode": "zip",
    "zip code": "zip",
    "phone": "phone",
    "telephone": "phone",
    "phone number": "phone",
    # --- treatment setting / level of care ---
    "treatment_type": "treatment_type",
    "treatment type": "treatment_type",
    "type of care": "treatment_type",
    "care type": "treatment_type",
    "service setting": "treatment_type",
    "treatment setting": "treatment_type",
    "level of care": "treatment_type",
    # --- payment / insurance ---
    "payment_options": "payment_options",
    "payment": "payment_options",
    "payment options": "payment_options",
    "payment accepted": "payment_options",
    "accepted payment": "payment_options",
    "insurance accepted": "payment_options",
    "sliding fee": "payment_options",
    "fee scale": "payment_options",
    # --- medication-assisted treatment flag ---
    "mat": "mat",
    "medication_assisted": "mat",
    "medication assisted": "mat",
    "medication assisted treatment": "mat",
    "buprenorphine": "mat",
    # --- services offered ---
    "services": "services",
    "services offered": "services",
    "service codes": "services",
    "types of care": "services",
    # --- substances addressed ---
    "substances_addressed": "substances_addressed",
    "substances": "substances_addressed",
    "substances addressed": "substances_addressed",
    "primary focus": "substances_addressed",
    "substance focus": "substances_addressed",
    "drugs treated": "substances_addressed",
    # --- languages ---
    "languages": "languages",
    "language": "languages",
    "languages spoken": "languages",
    "non-english languages": "languages",
    "language services": "languages",
    # --- populations served ---
    "populations": "populations",
    "population": "populations",
    "population served": "populations",
    "special populations": "populations",
    "ages served": "populations",
    "age group": "populations",
    # --- free-text description ---
    "description": "description",
    "comments": "description",
    "notes": "description",
}
def _normalize_mat(val) -> str:
"""Map various MAT values to yes/no."""
if pd.isna(val):
return ""
s = str(val).lower().strip()
if s in ("yes", "1", "true", "y"):
return "yes"
if s in ("no", "0", "false", "n", ""):
return "no"
return "yes" if "yes" in s or "offer" in s else "no"
def load_code_key(path: str | Path) -> dict[str, str] | None:
"""Load the code reference sheet from a National Directory Excel and return code -> description dict.
SAMHSA 2024 uses sheet 'Service Code Reference' with service_code and service_name columns.
"""
path = Path(path)
if path.suffix.lower() not in (".xlsx", ".xls"):
return None
if not path.exists():
return None
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message=".*Cannot parse header or footer.*")
xl = pd.ExcelFile(path)
key_df = None
for name in xl.sheet_names:
nlower = name.lower()
if "service code reference" in nlower or "code reference" in nlower or ("key" in nlower and "code" in nlower):
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message=".*Cannot parse header or footer.*")
key_df = pd.read_excel(path, sheet_name=name)
break
if key_df is None:
for name in xl.sheet_names:
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message=".*Cannot parse header or footer.*")
sheet = pd.read_excel(path, sheet_name=name)
if 2 <= len(sheet) <= 600 and len(sheet.columns) >= 2:
cols_lower = [str(c).lower() for c in sheet.columns]
if "service_code" in cols_lower and "service_name" in cols_lower:
key_df = sheet
break
if key_df is None or len(key_df) == 0:
return None
key_df.columns = [str(c).strip() for c in key_df.columns]
cols_lower = [c.lower() for c in key_df.columns]
code_col = None
desc_col = None
if "service_code" in cols_lower:
code_col = key_df.columns[cols_lower.index("service_code")]
if "service_name" in cols_lower:
desc_col = key_df.columns[cols_lower.index("service_name")]
if not code_col or not desc_col:
code_col = key_df.columns[0]
desc_col = key_df.columns[1] if len(key_df.columns) > 1 else key_df.columns[0]
code_key = {}
for _, row in key_df.iterrows():
k = str(row.get(code_col, "")).strip()
v = str(row.get(desc_col, "")).strip()
if k and v and k != "nan" and v != "nan" and len(k) <= 20:
code_key[k] = v
return code_key if code_key else None
def _decode_service_codes(series: pd.Series, code_key: dict[str, str]) -> pd.Series:
"""Replace code tokens with descriptions; join with ', '. Skip * and unknown tokens (only output decoded)."""
def decode_one(cell: str) -> str:
if pd.isna(cell) or not str(cell).strip():
return ""
parts = []
for token in str(cell).split():
token = token.strip()
if not token or token == "*":
continue
if token in code_key:
parts.append(code_key[token])
return ", ".join(parts) if parts else ""
return series.apply(decode_one)
def load_source(path: str | Path) -> pd.DataFrame:
"""Load CSV or Excel into a DataFrame with lowercase column names.
For Excel with multiple sheets (e.g. National Directory + Key), uses the
sheet that looks like facility data (has facility name or state, and many rows).
"""
path = Path(path)
if not path.exists():
raise FileNotFoundError(path)
suf = path.suffix.lower()
if suf == ".csv":
df = pd.read_csv(path)
elif suf in (".xlsx", ".xls"):
# Suppress openpyxl header/footer parse warnings (harmless; SAMHSA Excel often has them)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message=".*Cannot parse header or footer.*")
xl = pd.ExcelFile(path)
if len(xl.sheet_names) == 1:
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message=".*Cannot parse header or footer.*")
df = pd.read_excel(path)
else:
# Pick the sheet that has facility data: prefer one with a facility-name-like column and many rows
def sheet_has_facility_name_col(sheet: pd.DataFrame) -> bool:
cols_lower = [str(c).lower().strip() for c in sheet.columns]
if "facility name" in cols_lower or "facility_name" in cols_lower:
return True
if "program name" in cols_lower or "facility/program name" in cols_lower:
return True
if any(("facility" in c or "program" in c) and "name" in c for c in cols_lower):
return True
if "organization" in cols_lower or "provider name" in cols_lower:
return True
return False
best = None
best_score = -1
for name in xl.sheet_names:
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message=".*Cannot parse header or footer.*")
sheet = pd.read_excel(path, sheet_name=name)
if len(sheet) < 10:
continue
cols_lower = [str(c).lower().strip() for c in sheet.columns]
has_state_city = "state" in cols_lower and "city" in cols_lower
has_name_col = sheet_has_facility_name_col(sheet)
# Strongly prefer sheet that has a facility name column; then state/city; then row count
score = (1000 if has_name_col else 0) + (10 if has_state_city else 0) + min(len(sheet), 5000)
if score > best_score:
best_score = score
best = sheet
if best is not None:
df = best
else:
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message=".*Cannot parse header or footer.*")
df = pd.read_excel(path, sheet_name=0)
else:
raise ValueError(f"Unsupported format: {suf}. Use .csv or .xlsx")
df.columns = [str(c).lower().strip() for c in df.columns]
return df
def _guess_facility_name_column(df: pd.DataFrame, col_map: dict) -> str | None:
"""If no facility_name mapping, find a column that likely holds facility/program name."""
if "facility_name" in col_map:
return None
for src_col in df.columns:
c = str(src_col).lower().strip()
if "program" in c and "name" in c:
return src_col
if "facility" in c and "name" in c:
return src_col
if c in ("organization", "provider name", "location name"):
return src_col
# First column is often the name in directory layouts
if list(df.columns)[0] == src_col and ("name" in c or "facility" in c or "program" in c):
return src_col
return None
def _guess_address_column(df: pd.DataFrame, col_map: dict) -> str | None:
"""If no address mapping, find a column that likely holds street address."""
if "address" in col_map:
return None
for src_col in df.columns:
c = str(src_col).lower().strip()
if "street" in c or ("address" in c and "line" in c):
return src_col
if c in ("physical address", "location address"):
return src_col
return None
# Keywords to try when guessing unmapped columns (app_col -> list of substrings; any match in column name).
_GUESS_COLUMN_KEYWORDS = {
"treatment_type": ["treatment type", "type of care", "care type", "service setting", "level of care", "setting"],
"payment_options": ["payment", "insurance", "fee", "sliding", "medicaid", "accepted payment"],
"services": ["services", "service codes", "types of care", "offered", "treatment modalities"],
"substances_addressed": ["substance", "primary focus", "drug", "alcohol", "opioid"],
"languages": ["language", "non-english", "spanish", "bilingual"],
"populations": ["population", "age", "special population", "veteran", "gender", "served"],
"description": ["description", "comments", "notes", "remarks"],
}
def _guess_column_by_keywords(df: pd.DataFrame, col_map: dict, app_col: str) -> str | None:
"""If app_col not yet mapped, find a source column whose name contains any of the keywords."""
if app_col in col_map:
return None
keywords = _GUESS_COLUMN_KEYWORDS.get(app_col, [])
for src_col in df.columns:
c = str(src_col).lower().strip()
for kw in keywords:
if kw in c:
return src_col
return None
def map_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Map source columns to the app schema; missing app columns become empty.

    Expects df to have lowercase column names (as produced by load_source).
    Returns a DataFrame with exactly the APP_COLUMNS columns, in order.
    """
    # Resolve each app column to the first matching source column.
    col_map: dict[str, str] = {}
    for source_col in df.columns:
        target = SOURCE_TO_APP.get(str(source_col).lower().strip())
        if target is not None and target not in col_map:
            col_map[target] = source_col

    # Fallbacks for National Directory Excel when headers differ.
    for target, guess in (
        ("facility_name", _guess_facility_name_column(df, col_map)),
        ("address", _guess_address_column(df, col_map)),
    ):
        if guess and target not in col_map:
            col_map[target] = guess
    for target in ("treatment_type", "payment_options", "services", "substances_addressed", "languages", "populations", "description"):
        guess = _guess_column_by_keywords(df, col_map, target)
        if guess and target not in col_map:
            col_map[target] = guess

    # Materialize the app schema (unmapped columns become empty strings).
    out = {}
    for target in APP_COLUMNS:
        if target in col_map:
            out[target] = df[col_map[target]].astype(str).replace("nan", "").tolist()
        else:
            out[target] = [""] * len(df)
    result = pd.DataFrame(out)

    # National Directory format: merge name1+name2 -> facility_name, street1+street2 -> address
    lowered_cols = [str(c).lower().strip() for c in df.columns]

    def merge_pair(left_col: str, right_col: str) -> pd.Series:
        left = df[left_col].astype(str).replace("nan", "").str.strip()
        right = df[right_col].astype(str).replace("nan", "").str.strip()
        return (left + " " + right).str.strip()

    if "name1" in lowered_cols and "name2" in lowered_cols:
        combined = merge_pair("name1", "name2")
        result["facility_name"] = combined.where(combined != "", result["facility_name"])
    if "street1" in lowered_cols and "street2" in lowered_cols:
        combined = merge_pair("street1", "street2")
        result["address"] = combined.where(combined != "", result["address"])

    # service_code_info is decoded in main() using the Key sheet when available (see load_code_key).
    # Normalize MAT to yes/no
    if "mat" in result.columns:
        result["mat"] = result["mat"].apply(_normalize_mat)
    return result
def drop_missing_location(df: pd.DataFrame) -> pd.DataFrame:
    """Keep only rows where both city and state are present and non-blank.

    If either column is absent, the frame is returned unchanged.
    """
    if "city" not in df.columns or "state" not in df.columns:
        return df

    def populated(col: str) -> pd.Series:
        # Non-null AND non-whitespace after string coercion.
        return df[col].notna() & (df[col].astype(str).str.strip() != "")

    return df[populated("city") & populated("state")].copy()
def main():
    """CLI entry point: load a source file, map it, clean it, write the CSV.

    With no positional argument, prints usage and exits 0. Progress and
    diagnostics go to stderr; only the output CSV goes to disk.
    """
    parser = argparse.ArgumentParser(description="Ingest N-SUMHSS/National Directory data into facilities.csv")
    parser.add_argument("source", nargs="?", help="Path to source CSV or Excel file. If omitted, print usage and exit.")
    parser.add_argument("-o", "--output", default=str(OUTPUT_CSV), help="Output CSV path")
    args = parser.parse_args()
    if not args.source:
        parser.print_help()
        sys.exit(0)

    source_path = Path(args.source)
    raw = load_source(source_path)
    mapped = map_columns(raw)

    if mapped["facility_name"].str.strip().eq("").all():
        print(
            "Warning: no facility names were mapped. Source columns were:\n "
            + ", ".join(repr(c) for c in raw.columns),
            file=sys.stderr,
        )

    # Decode service_code_info using the Key sheet; store only in services
    # (search uses it for all filters).
    if "service_code_info" in raw.columns and source_path.suffix.lower() in (".xlsx", ".xls"):
        code_key = load_code_key(source_path)
        if code_key:
            mapped["services"] = _decode_service_codes(raw["service_code_info"], code_key)

    # Report if services is still empty (couldn't decode).
    empty_attrs = [
        c for c in ("services",)
        if c in mapped.columns and (mapped[c].astype(str).str.strip() == "").all()
    ]
    if empty_attrs:
        if "service_code_info" in raw.columns:
            note = (
                "Note: " + ", ".join(empty_attrs) + " had no data (source has coded service_code_info; "
                "Key sheet not found or could not be parsed for decoding)."
            )
        else:
            note = "Note: these attributes had no data after mapping: " + ", ".join(empty_attrs) + "."
        print(note, file=sys.stderr)

    mapped = drop_missing_location(mapped)

    # Deduplicate by facility_name + address + city + state (keep first occurrence)
    dedupe_cols = ["facility_name", "address", "city", "state"]
    if all(c in mapped.columns for c in dedupe_cols):
        n_before = len(mapped)
        mapped = mapped.drop_duplicates(subset=dedupe_cols, keep="first").reset_index(drop=True)
        n_dropped = n_before - len(mapped)
        if n_dropped:
            print(f"Dropped {n_dropped} duplicate rows (same name+address+city+state).", file=sys.stderr)

    DATA_DIR.mkdir(parents=True, exist_ok=True)
    mapped.to_csv(args.output, index=False)
    print(f"Wrote {len(mapped)} rows to {args.output}", file=sys.stderr)
# Run the CLI only when executed as a script; importing exposes the helpers.
if __name__ == "__main__":
    main()