""" Ingest facility data from N-SUMHSS / National Directory into app schema. Reads a CSV or Excel file (e.g. downloaded from SAMHSA N-SUMHSS or National Directory), maps columns to the internal schema using a configurable mapping, and writes data/facilities.csv. If the source uses codes, extend SOURCE_TO_APP or add a code-resolution step using the N-SUMHSS codebook. Usage: python scripts/ingest_facilities.py [path_to_source.csv] python scripts/ingest_facilities.py path/to/national_directory.xlsx If no path is given, reads from stdin (CSV) or exits with usage. Output: data/facilities.csv (same directory as this script: repo root/data/). """ import argparse import sys import warnings from pathlib import Path import pandas as pd # Repo root (parent of scripts/) REPO_ROOT = Path(__file__).resolve().parent.parent DATA_DIR = REPO_ROOT / "data" OUTPUT_CSV = DATA_DIR / "facilities.csv" # Internal schema: only columns we need (no duplicate/redundant attribute columns). # Search matches against "services" for treatment type, payment, languages, populations, substances. APP_COLUMNS = [ "facility_name", "address", "city", "state", "zip", "phone", "mat", "services", ] # Map source column names (lowercase) -> app column name. # N-SUMHSS / National Directory use different names; adjust per codebook. # National Directory Excel may use "Facility/Program Name", "Street", "City", "State", etc. # See data/README.md for the data story and mapping notes. SOURCE_TO_APP = { "facility_name": "facility_name", "facility name": "facility_name", "facility/program name": "facility_name", "program name": "facility_name", "name": "facility_name", "name1": "facility_name", "name2": "facility_name", "provider name": "facility_name", "organization": "facility_name", "treatment facility name": "facility_name", "location name": "facility_name", "facility": "facility_name", "address": "address", "street": "address", "street address": "address", "address1": "address", "address line 1": "address", "street1": "address", "street2": "address", "physical address": "address", "location address": "address", "city": "city", "state": "state", "state abbreviation": "state", "zip": "zip", "zipcode": "zip", "zip code": "zip", "phone": "phone", "telephone": "phone", "phone number": "phone", "treatment_type": "treatment_type", "treatment type": "treatment_type", "type of care": "treatment_type", "care type": "treatment_type", "service setting": "treatment_type", "treatment setting": "treatment_type", "level of care": "treatment_type", "payment_options": "payment_options", "payment": "payment_options", "payment options": "payment_options", "payment accepted": "payment_options", "accepted payment": "payment_options", "insurance accepted": "payment_options", "sliding fee": "payment_options", "fee scale": "payment_options", "mat": "mat", "medication_assisted": "mat", "medication assisted": "mat", "medication assisted treatment": "mat", "buprenorphine": "mat", "services": "services", "services offered": "services", "service codes": "services", "types of care": "services", "substances_addressed": "substances_addressed", "substances": "substances_addressed", "substances addressed": "substances_addressed", "primary focus": "substances_addressed", "substance focus": "substances_addressed", "drugs treated": "substances_addressed", "languages": "languages", "language": "languages", "languages spoken": "languages", "non-english languages": "languages", "language services": "languages", "populations": "populations", "population": "populations", "population served": "populations", "special populations": "populations", "ages served": "populations", "age group": "populations", "description": "description", "comments": "description", "notes": "description", } def _normalize_mat(val) -> str: """Map various MAT values to yes/no.""" if pd.isna(val): return "" s = str(val).lower().strip() if s in ("yes", "1", "true", "y"): return "yes" if s in ("no", "0", "false", "n", ""): return "no" return "yes" if "yes" in s or "offer" in s else "no" def load_code_key(path: str | Path) -> dict[str, str] | None: """Load the code reference sheet from a National Directory Excel and return code -> description dict. SAMHSA 2024 uses sheet 'Service Code Reference' with service_code and service_name columns. """ path = Path(path) if path.suffix.lower() not in (".xlsx", ".xls"): return None if not path.exists(): return None with warnings.catch_warnings(): warnings.filterwarnings("ignore", message=".*Cannot parse header or footer.*") xl = pd.ExcelFile(path) key_df = None for name in xl.sheet_names: nlower = name.lower() if "service code reference" in nlower or "code reference" in nlower or ("key" in nlower and "code" in nlower): with warnings.catch_warnings(): warnings.filterwarnings("ignore", message=".*Cannot parse header or footer.*") key_df = pd.read_excel(path, sheet_name=name) break if key_df is None: for name in xl.sheet_names: with warnings.catch_warnings(): warnings.filterwarnings("ignore", message=".*Cannot parse header or footer.*") sheet = pd.read_excel(path, sheet_name=name) if 2 <= len(sheet) <= 600 and len(sheet.columns) >= 2: cols_lower = [str(c).lower() for c in sheet.columns] if "service_code" in cols_lower and "service_name" in cols_lower: key_df = sheet break if key_df is None or len(key_df) == 0: return None key_df.columns = [str(c).strip() for c in key_df.columns] cols_lower = [c.lower() for c in key_df.columns] code_col = None desc_col = None if "service_code" in cols_lower: code_col = key_df.columns[cols_lower.index("service_code")] if "service_name" in cols_lower: desc_col = key_df.columns[cols_lower.index("service_name")] if not code_col or not desc_col: code_col = key_df.columns[0] desc_col = key_df.columns[1] if len(key_df.columns) > 1 else key_df.columns[0] code_key = {} for _, row in key_df.iterrows(): k = str(row.get(code_col, "")).strip() v = str(row.get(desc_col, "")).strip() if k and v and k != "nan" and v != "nan" and len(k) <= 20: code_key[k] = v return code_key if code_key else None def _decode_service_codes(series: pd.Series, code_key: dict[str, str]) -> pd.Series: """Replace code tokens with descriptions; join with ', '. Skip * and unknown tokens (only output decoded).""" def decode_one(cell: str) -> str: if pd.isna(cell) or not str(cell).strip(): return "" parts = [] for token in str(cell).split(): token = token.strip() if not token or token == "*": continue if token in code_key: parts.append(code_key[token]) return ", ".join(parts) if parts else "" return series.apply(decode_one) def load_source(path: str | Path) -> pd.DataFrame: """Load CSV or Excel into a DataFrame with lowercase column names. For Excel with multiple sheets (e.g. National Directory + Key), uses the sheet that looks like facility data (has facility name or state, and many rows). """ path = Path(path) if not path.exists(): raise FileNotFoundError(path) suf = path.suffix.lower() if suf == ".csv": df = pd.read_csv(path) elif suf in (".xlsx", ".xls"): # Suppress openpyxl header/footer parse warnings (harmless; SAMHSA Excel often has them) with warnings.catch_warnings(): warnings.filterwarnings("ignore", message=".*Cannot parse header or footer.*") xl = pd.ExcelFile(path) if len(xl.sheet_names) == 1: with warnings.catch_warnings(): warnings.filterwarnings("ignore", message=".*Cannot parse header or footer.*") df = pd.read_excel(path) else: # Pick the sheet that has facility data: prefer one with a facility-name-like column and many rows def sheet_has_facility_name_col(sheet: pd.DataFrame) -> bool: cols_lower = [str(c).lower().strip() for c in sheet.columns] if "facility name" in cols_lower or "facility_name" in cols_lower: return True if "program name" in cols_lower or "facility/program name" in cols_lower: return True if any(("facility" in c or "program" in c) and "name" in c for c in cols_lower): return True if "organization" in cols_lower or "provider name" in cols_lower: return True return False best = None best_score = -1 for name in xl.sheet_names: with warnings.catch_warnings(): warnings.filterwarnings("ignore", message=".*Cannot parse header or footer.*") sheet = pd.read_excel(path, sheet_name=name) if len(sheet) < 10: continue cols_lower = [str(c).lower().strip() for c in sheet.columns] has_state_city = "state" in cols_lower and "city" in cols_lower has_name_col = sheet_has_facility_name_col(sheet) # Strongly prefer sheet that has a facility name column; then state/city; then row count score = (1000 if has_name_col else 0) + (10 if has_state_city else 0) + min(len(sheet), 5000) if score > best_score: best_score = score best = sheet if best is not None: df = best else: with warnings.catch_warnings(): warnings.filterwarnings("ignore", message=".*Cannot parse header or footer.*") df = pd.read_excel(path, sheet_name=0) else: raise ValueError(f"Unsupported format: {suf}. Use .csv or .xlsx") df.columns = [str(c).lower().strip() for c in df.columns] return df def _guess_facility_name_column(df: pd.DataFrame, col_map: dict) -> str | None: """If no facility_name mapping, find a column that likely holds facility/program name.""" if "facility_name" in col_map: return None for src_col in df.columns: c = str(src_col).lower().strip() if "program" in c and "name" in c: return src_col if "facility" in c and "name" in c: return src_col if c in ("organization", "provider name", "location name"): return src_col # First column is often the name in directory layouts if list(df.columns)[0] == src_col and ("name" in c or "facility" in c or "program" in c): return src_col return None def _guess_address_column(df: pd.DataFrame, col_map: dict) -> str | None: """If no address mapping, find a column that likely holds street address.""" if "address" in col_map: return None for src_col in df.columns: c = str(src_col).lower().strip() if "street" in c or ("address" in c and "line" in c): return src_col if c in ("physical address", "location address"): return src_col return None # Keywords to try when guessing unmapped columns (app_col -> list of substrings; any match in column name). _GUESS_COLUMN_KEYWORDS = { "treatment_type": ["treatment type", "type of care", "care type", "service setting", "level of care", "setting"], "payment_options": ["payment", "insurance", "fee", "sliding", "medicaid", "accepted payment"], "services": ["services", "service codes", "types of care", "offered", "treatment modalities"], "substances_addressed": ["substance", "primary focus", "drug", "alcohol", "opioid"], "languages": ["language", "non-english", "spanish", "bilingual"], "populations": ["population", "age", "special population", "veteran", "gender", "served"], "description": ["description", "comments", "notes", "remarks"], } def _guess_column_by_keywords(df: pd.DataFrame, col_map: dict, app_col: str) -> str | None: """If app_col not yet mapped, find a source column whose name contains any of the keywords.""" if app_col in col_map: return None keywords = _GUESS_COLUMN_KEYWORDS.get(app_col, []) for src_col in df.columns: c = str(src_col).lower().strip() for kw in keywords: if kw in c: return src_col return None def map_columns(df: pd.DataFrame) -> pd.DataFrame: """Map source columns to app schema; add missing app columns as empty.""" out = {} for app_col in APP_COLUMNS: out[app_col] = [] # Find which source column maps to each app column col_map = {} for src_col in df.columns: src_lower = str(src_col).lower().strip() if src_lower in SOURCE_TO_APP: app_col = SOURCE_TO_APP[src_lower] if app_col not in col_map: col_map[app_col] = src_col # Fallbacks for National Directory Excel when headers differ guess_name = _guess_facility_name_column(df, col_map) if guess_name and "facility_name" not in col_map: col_map["facility_name"] = guess_name guess_addr = _guess_address_column(df, col_map) if guess_addr and "address" not in col_map: col_map["address"] = guess_addr for app_col in ("treatment_type", "payment_options", "services", "substances_addressed", "languages", "populations", "description"): guess = _guess_column_by_keywords(df, col_map, app_col) if guess and app_col not in col_map: col_map[app_col] = guess for app_col in APP_COLUMNS: if app_col in col_map: out[app_col] = df[col_map[app_col]].astype(str).replace("nan", "").tolist() else: out[app_col] = [""] * len(df) result = pd.DataFrame(out) # National Directory format: merge name1+name2 -> facility_name, street1+street2 -> address cols_lower = [str(c).lower().strip() for c in df.columns] if "name1" in cols_lower and "name2" in cols_lower: n1 = df["name1"].astype(str).replace("nan", "").str.strip() n2 = df["name2"].astype(str).replace("nan", "").str.strip() merged = (n1 + " " + n2).str.strip() result["facility_name"] = merged.where(merged != "", result["facility_name"]) if "street1" in cols_lower and "street2" in cols_lower: s1 = df["street1"].astype(str).replace("nan", "").str.strip() s2 = df["street2"].astype(str).replace("nan", "").str.strip() merged = (s1 + " " + s2).str.strip() result["address"] = merged.where(merged != "", result["address"]) # service_code_info is decoded in main() using the Key sheet when available (see load_code_key). # Normalize MAT to yes/no if "mat" in result.columns: result["mat"] = result["mat"].apply(_normalize_mat) return result def drop_missing_location(df: pd.DataFrame) -> pd.DataFrame: """Keep only rows with non-empty city and state.""" if "city" not in df.columns or "state" not in df.columns: return df return df[ df["city"].notna() & (df["city"].astype(str).str.strip() != "") & df["state"].notna() & (df["state"].astype(str).str.strip() != "") ].copy() def main(): ap = argparse.ArgumentParser(description="Ingest N-SUMHSS/National Directory data into facilities.csv") ap.add_argument("source", nargs="?", help="Path to source CSV or Excel file. If omitted, print usage and exit.") ap.add_argument("-o", "--output", default=str(OUTPUT_CSV), help="Output CSV path") args = ap.parse_args() if not args.source: ap.print_help() sys.exit(0) path = Path(args.source) raw_df = load_source(path) df = map_columns(raw_df) if df["facility_name"].str.strip().eq("").all(): print( "Warning: no facility names were mapped. Source columns were:\n " + ", ".join(repr(c) for c in raw_df.columns), file=sys.stderr, ) # Decode service_code_info using the Key sheet; store only in services (search uses it for all filters). if "service_code_info" in raw_df.columns and path.suffix.lower() in (".xlsx", ".xls"): code_key = load_code_key(path) if code_key: decoded = _decode_service_codes(raw_df["service_code_info"], code_key) df["services"] = decoded # Report if services is still empty (couldn't decode) empty_attrs = [c for c in ("services",) if c in df.columns and (df[c].astype(str).str.strip() == "").all()] if empty_attrs and "service_code_info" in raw_df.columns: print( "Note: " + ", ".join(empty_attrs) + " had no data (source has coded service_code_info; " "Key sheet not found or could not be parsed for decoding).", file=sys.stderr, ) elif empty_attrs: print( "Note: these attributes had no data after mapping: " + ", ".join(empty_attrs) + ".", file=sys.stderr, ) df = drop_missing_location(df) # Deduplicate by facility_name + address + city + state (keep first occurrence) key_cols = ["facility_name", "address", "city", "state"] if all(c in df.columns for c in key_cols): before = len(df) df = df.drop_duplicates(subset=key_cols, keep="first").reset_index(drop=True) if len(df) < before: print(f"Dropped {before - len(df)} duplicate rows (same name+address+city+state).", file=sys.stderr) DATA_DIR.mkdir(parents=True, exist_ok=True) df.to_csv(args.output, index=False) print(f"Wrote {len(df)} rows to {args.output}", file=sys.stderr) if __name__ == "__main__": main()