# File: dummy_app/scripts/04_weighted_AI_assisted.py
# Provenance: uploaded by joseph-data ("dummy app start", revision 1264273, unverified)
#!/usr/bin/env python3
from __future__ import annotations
import argparse
from pathlib import Path
from typing import Iterable, List, Sequence
import pandas as pd
# ============================== CONFIG ===================================== #
# Default taxonomy (override via --tax-id); the other known option is "ssyk2012".
DEFAULT_TAX_ID = "ssyk96"
# DAIOE metric column suffixes to process; each maps to a "daioe_<suffix>"
# column in the raw table (missing ones are skipped downstream).
DAIOE_SUFFIXES: Sequence[str] = (
"allapps",
"stratgames",
"videogames",
"imgrec",
"imgcompr",
"imggen",
"readcompr",
"lngmod",
"translat",
"speechrec",
"genai",
)
# Input subfolders (relative to ROOT/data)
RAW_DIRNAME = "daioe_raw"
WEIGHTS_DIRNAME = "scb_weights"
OUT_DIRNAME = "daioe_processed"
# Output file name pattern (filled with the taxonomy id)
OUT_FILE_TPL = "daioe_{tax_id}_weighted.csv"
# ============================== PATHS ====================================== #
def get_root() -> Path:
    """Locate the project root directory.

    When run as a script, the root is taken to be the parent of the folder
    containing this file. In interactive environments (notebooks, REPL)
    ``__file__`` is undefined, so the current working directory is used.
    """
    try:
        here = Path(__file__).resolve()
    except NameError:
        # Interactive session: no __file__ — fall back to the cwd.
        return Path.cwd()
    return here.parents[1]
def data_path(*parts: str | Path) -> Path:
    """Return a path under ``<project root>/data`` built from *parts*."""
    base = get_root() / "data"
    return base / Path(*parts)
# ============================== IO HELPERS ================================= #
def load_ssyk_table(tax_id: str) -> pd.DataFrame:
    """Read the raw DAIOE table for *tax_id*.

    Files live under data/daioe_raw and are named like ``daioe_ssyk2012.csv``
    or ``daioe_ssyk96.csv``; despite the .csv extension they are
    tab-separated.

    Raises:
        FileNotFoundError: if the expected file does not exist.
    """
    source = data_path(RAW_DIRNAME, f"daioe_{tax_id}.csv")
    if source.exists():
        return pd.read_csv(source, sep="\t")
    raise FileNotFoundError(f"DAIOE file not found: {source}")
def load_weights(tax_id: str, *, employment_col: str = "value") -> pd.DataFrame:
    """Read the newest weights CSV for *tax_id* and drop any 'year' column.

    The lexicographically last file matching ``{tax_id}_weights_en_*.csv``
    is used — presumably the file names encode a date so this picks the
    most recent one (TODO confirm against the files on disk).

    Raises:
        FileNotFoundError: when no matching weights file exists.
        KeyError: when *employment_col* is absent from the chosen file.
    """
    candidates = sorted(data_path(WEIGHTS_DIRNAME).glob(f"{tax_id}_weights_en_*.csv"))
    if not candidates:
        raise FileNotFoundError(
            f"No weights found in {data_path(WEIGHTS_DIRNAME)} for tax_id='{tax_id}'"
        )
    latest = candidates[-1]
    df = pd.read_csv(latest).drop(columns=["year"], errors="ignore")
    if employment_col not in df.columns:
        raise KeyError(
            f"Expected employment column '{employment_col}' in weights file {latest}"
        )
    return df
# ============================== TRANSFORMS ================================= #
def _clean_code(series: pd.Series, *, width: int = 4) -> pd.Series:
"""
Normalize code strings:
- cast to string
- remove non-digits
- take first `width` digits
- left-pad with zeros to `width`
"""
s = series.astype("string")
s = s.str.replace(r"\D", "", regex=True)
s = s.str.slice(0, width)
return s.fillna("").str.zfill(width)
def normalize_codes(
    ssyk: pd.DataFrame, weights: pd.DataFrame, tax_id: str
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Attach a clean 4-digit 'code_4' join key to both tables.

    Returns copies; the inputs are not mutated. The DAIOE table must carry
    a '<tax_id>_4' column holding values like "0110 Officerare"; the
    weights table must carry either 'code_4' or 'code'. Weight rows are
    de-duplicated per code (last wins) so the later merge cannot fan out.

    Raises:
        KeyError: when the expected code column is missing in either table.
    """
    daioe = ssyk.copy()
    wts = weights.copy()

    daioe_col = f"{tax_id}_4"
    if daioe_col not in daioe.columns:
        raise KeyError(
            f"Expected '{daioe_col}' in DAIOE table columns: {list(daioe.columns)}"
        )
    # Pull the leading 4-digit code out of strings like "0110 Officerare".
    extracted = daioe[daioe_col].astype("string").str.extract(r"(\d{4})")[0]
    daioe["code_4"] = _clean_code(extracted, width=4)

    wts_col = "code_4" if "code_4" in wts.columns else "code"
    if wts_col not in wts.columns:
        raise KeyError(
            f"Expected a code column 'code_4' or 'code' in weights columns: {list(wts.columns)}"
        )
    wts["code_4"] = _clean_code(wts[wts_col], width=4)
    # One weight row per code keeps the join stable.
    wts = wts.drop_duplicates(subset=["code_4"], keep="last")
    return daioe, wts
def merge_with_weights(
    ssyk: pd.DataFrame,
    weights: pd.DataFrame,
    *,
    employment_col: str = "value",
) -> pd.DataFrame:
    """
    Merge employment counts and weight columns into ssyk on 'code_4'.

    Only the join key, 'occupation', the employment count and the three
    level-weights are taken from *weights*; the employment column is
    renamed to 'employment_count'. Rows in *ssyk* with no matching code
    keep NaN employment but get 0.0 weights so downstream multiplications
    stay NaN-free.

    Raises:
        KeyError: if any required column is missing from *weights*.
    """
    required_weights = ["weight_in_code_1", "weight_in_code_2", "weight_in_code_3"]
    # Validate every column we are about to select — including 'occupation',
    # which was previously unchecked and produced a raw pandas KeyError —
    # so a malformed weights file fails with a clear message.
    for c in required_weights + ["code_4", "occupation", employment_col]:
        if c not in weights.columns:
            raise KeyError(f"Missing '{c}' in weights table")
    keep_cols = ["code_4", "occupation", employment_col, *required_weights]
    w_keep = weights[keep_cols].rename(columns={employment_col: "employment_count"})
    merged = ssyk.merge(w_keep, on="code_4", how="left")
    # Fill missing weights with 0 to avoid NaN multiplications (optional, but practical)
    merged[required_weights] = merged[required_weights].fillna(0.0)
    return merged
def add_weighted_columns(
    df: pd.DataFrame,
    *,
    suffixes: Sequence[str],
    weight_cols: Sequence[str] = (
        "weight_in_code_1",
        "weight_in_code_2",
        "weight_in_code_3",
    ),
    label_map: dict[str, str] | None = None,
) -> tuple[pd.DataFrame, List[str]]:
    """
    Create one 'daioe_<sfx>_<wtag>' column per (metric, weight) pair.

    Metrics without a matching 'daioe_<sfx>' column are skipped silently,
    as are weight columns not present in *df*. Returns a copy of the frame
    together with the names of the columns that were added.
    """
    result = df.copy()
    created: List[str] = []
    # Falsy label_map (None or {}) falls back to the short default tags,
    # which keep the generated column names readable.
    label_map = label_map or {
        "weight_in_code_1": "w_code1",
        "weight_in_code_2": "w_code2",
        "weight_in_code_3": "w_code3",
    }
    for sfx in suffixes:
        metric = f"daioe_{sfx}"
        if metric not in result.columns:
            continue  # tolerate taxonomies that lack some metrics
        for weight in weight_cols:
            if weight not in result.columns:
                continue
            tagged = f"{metric}_{label_map.get(weight, weight)}"
            result[tagged] = result[metric] * result[weight]
            created.append(tagged)
    return result, created
def add_percentile_ranks(
    df: pd.DataFrame,
    *,
    rank_cols: Sequence[str],
    groupby: Iterable[str] = ("year",),
    scale_0_100: bool = True,
    prefix: str = "pct_rank_",
) -> pd.DataFrame:
    """
    Append per-group percentile-rank columns for each name in *rank_cols*.

    Ranks come from pandas' pct-rank (0..1) within each group and are
    optionally rescaled to 0..100. New columns are the originals with
    *prefix* prepended; the input frame is left unmodified.
    """
    result = df.copy()
    # A single grouped rank call covers every column at once, keeping the
    # computation consistent and fast.
    pct = result.groupby(list(groupby))[list(rank_cols)].rank(pct=True)
    if scale_0_100:
        pct = pct * 100
    pct = pct.add_prefix(prefix)  # e.g., pct_rank_daioe_imggen_w_code1
    return pd.concat([result, pct], axis=1)
# ============================== SAVE ======================================= #
def write_output(df: pd.DataFrame, *, tax_id: str) -> Path:
    """Write *df* under data/daioe_processed and return the file path.

    Creates the output directory on first use; overwrites any existing
    file for the same taxonomy.
    """
    target_dir = data_path(OUT_DIRNAME)
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / OUT_FILE_TPL.format(tax_id=tax_id)
    df.to_csv(target, index=False)
    return target
# ============================== PIPELINE =================================== #
def build_weighted_daioe(tax_id: str) -> Path:
    """End-to-end pipeline: load -> normalize -> merge -> weight -> ranks -> save.

    Returns the path of the written CSV.
    """
    raw = load_ssyk_table(tax_id)
    wts = load_weights(tax_id)
    raw, wts = normalize_codes(raw, wts, tax_id)
    table = merge_with_weights(raw, wts)
    # Multiply each metric by the default weight_in_code_{1,2,3} columns.
    table, weighted = add_weighted_columns(table, suffixes=DAIOE_SUFFIXES)
    # Percentile ranks per year for every weighted column just created.
    table = add_percentile_ranks(
        table,
        rank_cols=weighted,
        groupby=("year",),
        scale_0_100=True,
        prefix="pct_rank_",
    )
    return write_output(table, tax_id=tax_id)
# ============================== CLI ======================================== #
def parse_args() -> argparse.Namespace:
    """Define and evaluate the command-line interface."""
    parser = argparse.ArgumentParser(
        description="Build employment-weighted DAIOE tables with percentile ranks."
    )
    parser.add_argument(
        "--tax-id",
        default=DEFAULT_TAX_ID,
        help="Taxonomy id (e.g., ssyk2012, ssyk96)",
    )
    return parser.parse_args()
def main() -> None:
    """CLI entry point: run the pipeline and report the output location."""
    options = parse_args()
    result = build_weighted_daioe(options.tax_id)
    print(f"Wrote {result}")


if __name__ == "__main__":
    main()