#!/usr/bin/env python3 from __future__ import annotations import argparse from pathlib import Path from typing import Iterable, List, Sequence import pandas as pd # ============================== CONFIG ===================================== # # Default taxonomy (override via --tax-id) DEFAULT_TAX_ID = "ssyk96" # or "ssyk96" # DAIOE metric column suffixes to process DAIOE_SUFFIXES: Sequence[str] = ( "allapps", "stratgames", "videogames", "imgrec", "imgcompr", "imggen", "readcompr", "lngmod", "translat", "speechrec", "genai", ) # Input subfolders (relative to ROOT/data) RAW_DIRNAME = "daioe_raw" WEIGHTS_DIRNAME = "scb_weights" OUT_DIRNAME = "daioe_processed" # Output file name pattern OUT_FILE_TPL = "daioe_{tax_id}_weighted.csv" # ============================== PATHS ====================================== # def get_root() -> Path: """Resolve project root robustly (works in scripts and notebooks).""" try: return Path(__file__).resolve().parents[1] except NameError: # Fallback for interactive environments return Path.cwd() def data_path(*parts: str | Path) -> Path: return get_root() / "data" / Path(*parts) # ============================== IO HELPERS ================================= # def load_ssyk_table(tax_id: str) -> pd.DataFrame: """Load DAIOE table for the given taxonomy.""" # Files named like: daioe_ssyk2012.csv / daioe_ssyk96.csv (tab-separated) path = data_path(RAW_DIRNAME, f"daioe_{tax_id}.csv") if not path.exists(): raise FileNotFoundError(f"DAIOE file not found: {path}") return pd.read_csv(path, sep="\t") def load_weights(tax_id: str, *, employment_col: str = "value") -> pd.DataFrame: """Load the latest weights CSV for the given taxonomy (drop 'year').""" files = sorted(data_path(WEIGHTS_DIRNAME).glob(f"{tax_id}_weights_en_*.csv")) if not files: raise FileNotFoundError( f"No weights found in {data_path(WEIGHTS_DIRNAME)} for tax_id='{tax_id}'" ) df = pd.read_csv(files[-1]).drop(columns=["year"], errors="ignore") if employment_col not in df.columns: raise KeyError( f"Expected employment column '{employment_col}' in weights file {files[-1]}" ) return df # ============================== TRANSFORMS ================================= # def _clean_code(series: pd.Series, *, width: int = 4) -> pd.Series: """ Normalize code strings: - cast to string - remove non-digits - take first `width` digits - left-pad with zeros to `width` """ s = series.astype("string") s = s.str.replace(r"\D", "", regex=True) s = s.str.slice(0, width) return s.fillna("").str.zfill(width) def normalize_codes( ssyk: pd.DataFrame, weights: pd.DataFrame, tax_id: str ) -> tuple[pd.DataFrame, pd.DataFrame]: """ Add/clean 'code_4' in both ssyk and weights. Returns copies with a clean key. """ s = ssyk.copy() w = weights.copy() # ssyk has columns like "ssyk2012_4" or "ssyk96_4" containing "0110 Officerare" etc. s_code_col = f"{tax_id}_4" if s_code_col not in s.columns: raise KeyError( f"Expected '{s_code_col}' in DAIOE table columns: {list(s.columns)}" ) s["code_4"] = _clean_code( s[s_code_col].astype("string").str.extract(r"(\d{4})")[0], width=4 ) # weights should already have a code column; normalize and enforce 4 digits w_code_col = "code_4" if "code_4" in w.columns else "code" if w_code_col not in w.columns: raise KeyError( f"Expected a code column 'code_4' or 'code' in weights columns: {list(w.columns)}" ) w["code_4"] = _clean_code(w[w_code_col], width=4) # De-duplicate weight rows per code_4 to stabilize the join w = w.drop_duplicates(subset=["code_4"], keep="last") return s, w def merge_with_weights( ssyk: pd.DataFrame, weights: pd.DataFrame, *, employment_col: str = "value", ) -> pd.DataFrame: """ Merge employment counts and weight columns into ssyk on 'code_4'. Keeps only necessary columns from weights. """ required_weights = ["weight_in_code_1", "weight_in_code_2", "weight_in_code_3"] for c in required_weights + ["code_4", employment_col]: if c not in weights.columns: raise KeyError(f"Missing '{c}' in weights table") keep_cols = ["code_4", "occupation", employment_col, *required_weights] w_keep = weights[keep_cols].rename(columns={employment_col: "employment_count"}) merged = ssyk.merge(w_keep, on="code_4", how="left") # Fill missing weights with 0 to avoid NaN multiplications (optional, but practical) merged[["weight_in_code_1", "weight_in_code_2", "weight_in_code_3"]] = merged[ ["weight_in_code_1", "weight_in_code_2", "weight_in_code_3"] ].fillna(0.0) return merged def add_weighted_columns( df: pd.DataFrame, *, suffixes: Sequence[str], weight_cols: Sequence[str] = ( "weight_in_code_1", "weight_in_code_2", "weight_in_code_3", ), label_map: dict[str, str] | None = None, ) -> tuple[pd.DataFrame, List[str]]: """ Multiply each daioe column by each weight column. Returns (df_with_weights, list_of_new_weighted_cols). """ out = df.copy() new_cols: List[str] = [] # Optional mapping from weight column to short suffix label_map = label_map or { "weight_in_code_1": "w_code1", "weight_in_code_2": "w_code2", "weight_in_code_3": "w_code3", } for sfx in suffixes: base_col = f"daioe_{sfx}" if base_col not in out.columns: # Skip silently but you could raise if you want strictness continue for wcol in weight_cols: if wcol not in out.columns: continue wtag = label_map.get(wcol, wcol) new_col = f"{base_col}_{wtag}" out[new_col] = out[base_col] * out[wcol] new_cols.append(new_col) return out, new_cols def add_percentile_ranks( df: pd.DataFrame, *, rank_cols: Sequence[str], groupby: Iterable[str] = ("year",), scale_0_100: bool = True, prefix: str = "pct_rank_", ) -> pd.DataFrame: """ Add percentile ranks per group for the provided columns. """ out = df.copy() # Compute ranks for all rank_cols at once for speed and consistency ranked = ( out.groupby(list(groupby))[list(rank_cols)].rank(pct=True) # 0..1 ) if scale_0_100: ranked = ranked.mul(100) ranked = ranked.add_prefix(prefix) # e.g., pct_rank_daioe_imggen_w_code1 out = pd.concat([out, ranked], axis=1) return out # ============================== SAVE ======================================= # def write_output(df: pd.DataFrame, *, tax_id: str) -> Path: out_dir = data_path(OUT_DIRNAME) out_dir.mkdir(parents=True, exist_ok=True) out_path = out_dir / OUT_FILE_TPL.format(tax_id=tax_id) df.to_csv(out_path, index=False) return out_path # ============================== PIPELINE =================================== # def build_weighted_daioe(tax_id: str) -> Path: """End-to-end pipeline: load → normalize → merge → weight → ranks → save.""" ssyk = load_ssyk_table(tax_id) weights = load_weights(tax_id) ssyk, weights = normalize_codes(ssyk, weights, tax_id) merged = merge_with_weights(ssyk, weights) merged, weighted_cols = add_weighted_columns( merged, suffixes=DAIOE_SUFFIXES, # defaults weight_in_code_{1,2,3} ) # Add percentile ranks (per year) for all weighted columns merged = add_percentile_ranks( merged, rank_cols=weighted_cols, groupby=("year",), scale_0_100=True, prefix="pct_rank_", ) out_path = write_output(merged, tax_id=tax_id) return out_path # ============================== CLI ======================================== # def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser( description="Build employment-weighted DAIOE tables with percentile ranks." ) p.add_argument( "--tax-id", default=DEFAULT_TAX_ID, help="Taxonomy id (e.g., ssyk2012, ssyk96)" ) return p.parse_args() def main() -> None: args = parse_args() out_path = build_weighted_daioe(args.tax_id) print(f"Wrote {out_path}") if __name__ == "__main__": main()