Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| from __future__ import annotations | |
| import argparse | |
| from pathlib import Path | |
| from typing import Iterable, List, Sequence | |
| import pandas as pd | |
# ============================== CONFIG ===================================== #
# Default taxonomy (override via --tax-id)
DEFAULT_TAX_ID = "ssyk96"  # alternative: "ssyk2012"
# DAIOE metric column suffixes to process; each corresponds to a
# "daioe_<suffix>" column in the raw table.
DAIOE_SUFFIXES: Sequence[str] = (
    "allapps",
    "stratgames",
    "videogames",
    "imgrec",
    "imgcompr",
    "imggen",
    "readcompr",
    "lngmod",
    "translat",
    "speechrec",
    "genai",
)
# Input subfolders (relative to ROOT/data)
RAW_DIRNAME = "daioe_raw"
WEIGHTS_DIRNAME = "scb_weights"
OUT_DIRNAME = "daioe_processed"
# Output file name pattern
OUT_FILE_TPL = "daioe_{tax_id}_weighted.csv"
| # ============================== PATHS ====================================== # | |
def get_root() -> Path:
    """Return the project root (the grandparent directory of this file).

    Falls back to the current working directory when ``__file__`` is not
    defined, e.g. when the code is pasted into a notebook or REPL.
    """
    try:
        here = Path(__file__).resolve()
    except NameError:
        # Interactive session: no __file__ available.
        return Path.cwd()
    return here.parents[1]
def data_path(*parts: str | Path) -> Path:
    """Build a path under ``<project root>/data`` from *parts*."""
    root = get_root()
    return root / "data" / Path(*parts)
| # ============================== IO HELPERS ================================= # | |
def load_ssyk_table(tax_id: str) -> pd.DataFrame:
    """Read the raw DAIOE table for *tax_id* (tab-separated CSV).

    Raises FileNotFoundError when the expected file is absent.
    """
    # Files are named like: daioe_ssyk2012.csv / daioe_ssyk96.csv.
    src = data_path(RAW_DIRNAME, f"daioe_{tax_id}.csv")
    if src.exists():
        return pd.read_csv(src, sep="\t")
    raise FileNotFoundError(f"DAIOE file not found: {src}")
def load_weights(tax_id: str, *, employment_col: str = "value") -> pd.DataFrame:
    """Load the newest weights CSV for *tax_id*, dropping any 'year' column.

    The lexicographically last file matching ``{tax_id}_weights_en_*.csv`` is
    treated as the latest. Raises FileNotFoundError when no file matches and
    KeyError when *employment_col* is missing from the loaded table.
    """
    candidates = sorted(
        data_path(WEIGHTS_DIRNAME).glob(f"{tax_id}_weights_en_*.csv")
    )
    if not candidates:
        raise FileNotFoundError(
            f"No weights found in {data_path(WEIGHTS_DIRNAME)} for tax_id='{tax_id}'"
        )
    latest = candidates[-1]
    table = pd.read_csv(latest).drop(columns=["year"], errors="ignore")
    if employment_col not in table.columns:
        raise KeyError(
            f"Expected employment column '{employment_col}' in weights file {latest}"
        )
    return table
| # ============================== TRANSFORMS ================================= # | |
def _clean_code(series: pd.Series, *, width: int = 4) -> pd.Series:
    """Normalize code strings to exactly *width* digits.

    Steps: cast to pandas 'string' dtype, strip every non-digit character,
    keep at most the first *width* digits, then left-pad with zeros. Missing
    values end up as all zeros (e.g. "0000"), which keeps join keys non-null.
    """
    digits = (
        series.astype("string")
        .str.replace(r"\D", "", regex=True)
        .str.slice(0, width)
    )
    return digits.fillna("").str.zfill(width)
def normalize_codes(
    ssyk: pd.DataFrame, weights: pd.DataFrame, tax_id: str
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Attach a clean 4-digit 'code_4' join key to copies of both tables.

    The DAIOE table carries codes in a '<tax_id>_4' column holding text such
    as "0110 Officerare"; the weights table carries either 'code_4' or
    'code'. Raises KeyError when the expected code column is absent.
    """
    daioe = ssyk.copy()
    wts = weights.copy()

    code_col = f"{tax_id}_4"
    if code_col not in daioe.columns:
        raise KeyError(
            f"Expected '{code_col}' in DAIOE table columns: {list(daioe.columns)}"
        )
    # Pull the leading 4-digit code out of the combined "code + label" text.
    extracted = daioe[code_col].astype("string").str.extract(r"(\d{4})")[0]
    daioe["code_4"] = _clean_code(extracted, width=4)

    # Weights: prefer an existing 'code_4', otherwise fall back to 'code'.
    source_col = "code_4" if "code_4" in wts.columns else "code"
    if source_col not in wts.columns:
        raise KeyError(
            f"Expected a code column 'code_4' or 'code' in weights columns: {list(wts.columns)}"
        )
    wts["code_4"] = _clean_code(wts[source_col], width=4)
    # Keep one row per code_4 (last wins) so the later left-merge stays 1:1.
    wts = wts.drop_duplicates(subset=["code_4"], keep="last")
    return daioe, wts
def merge_with_weights(
    ssyk: pd.DataFrame,
    weights: pd.DataFrame,
    *,
    employment_col: str = "value",
) -> pd.DataFrame:
    """Left-merge employment counts and weight columns into *ssyk* on 'code_4'.

    Only the columns needed downstream are taken from *weights*; the
    employment column is renamed to 'employment_count'. Rows without a match
    keep NaN employment but get 0.0 weights so later multiplications are
    well-defined.

    Raises KeyError (with a clear message) when any required weights column
    is missing.
    """
    weight_cols = ["weight_in_code_1", "weight_in_code_2", "weight_in_code_3"]
    # Validate every column we are about to select -- including 'occupation',
    # which the original selection used but never checked, so a malformed
    # weights file failed with a bare pandas KeyError instead of this message.
    for col in [*weight_cols, "code_4", "occupation", employment_col]:
        if col not in weights.columns:
            raise KeyError(f"Missing '{col}' in weights table")
    keep_cols = ["code_4", "occupation", employment_col, *weight_cols]
    w_keep = weights[keep_cols].rename(columns={employment_col: "employment_count"})
    merged = ssyk.merge(w_keep, on="code_4", how="left")
    # Unmatched codes would otherwise carry NaN weights and poison products.
    merged[weight_cols] = merged[weight_cols].fillna(0.0)
    return merged
| def add_weighted_columns( | |
| df: pd.DataFrame, | |
| *, | |
| suffixes: Sequence[str], | |
| weight_cols: Sequence[str] = ( | |
| "weight_in_code_1", | |
| "weight_in_code_2", | |
| "weight_in_code_3", | |
| ), | |
| label_map: dict[str, str] | None = None, | |
| ) -> tuple[pd.DataFrame, List[str]]: | |
| """ | |
| Multiply each daioe column by each weight column. | |
| Returns (df_with_weights, list_of_new_weighted_cols). | |
| """ | |
| out = df.copy() | |
| new_cols: List[str] = [] | |
| # Optional mapping from weight column to short suffix | |
| label_map = label_map or { | |
| "weight_in_code_1": "w_code1", | |
| "weight_in_code_2": "w_code2", | |
| "weight_in_code_3": "w_code3", | |
| } | |
| for sfx in suffixes: | |
| base_col = f"daioe_{sfx}" | |
| if base_col not in out.columns: | |
| # Skip silently but you could raise if you want strictness | |
| continue | |
| for wcol in weight_cols: | |
| if wcol not in out.columns: | |
| continue | |
| wtag = label_map.get(wcol, wcol) | |
| new_col = f"{base_col}_{wtag}" | |
| out[new_col] = out[base_col] * out[wcol] | |
| new_cols.append(new_col) | |
| return out, new_cols | |
def add_percentile_ranks(
    df: pd.DataFrame,
    *,
    rank_cols: Sequence[str],
    groupby: Iterable[str] = ("year",),
    scale_0_100: bool = True,
    prefix: str = "pct_rank_",
) -> pd.DataFrame:
    """Append per-group percentile ranks for the given columns.

    Ranks are computed within each group defined by *groupby* (pct=True, so
    raw values lie in (0, 1]) and optionally scaled to 0-100. Each new
    column is the source column name with *prefix* prepended, e.g.
    'pct_rank_daioe_imggen_w_code1'.
    """
    result = df.copy()
    # A single grouped rank call over all columns keeps tie handling uniform.
    pct = result.groupby(list(groupby))[list(rank_cols)].rank(pct=True)
    if scale_0_100:
        pct = pct * 100
    return pd.concat([result, pct.add_prefix(prefix)], axis=1)
| # ============================== SAVE ======================================= # | |
def write_output(df: pd.DataFrame, *, tax_id: str) -> Path:
    """Write *df* to the processed-output folder and return the file path."""
    target_dir = data_path(OUT_DIRNAME)
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / OUT_FILE_TPL.format(tax_id=tax_id)
    df.to_csv(target, index=False)
    return target
| # ============================== PIPELINE =================================== # | |
def build_weighted_daioe(tax_id: str) -> Path:
    """End-to-end pipeline: load -> normalize -> merge -> weight -> rank -> save.

    Returns the path of the written output CSV.
    """
    raw = load_ssyk_table(tax_id)
    wts = load_weights(tax_id)
    raw, wts = normalize_codes(raw, wts, tax_id)
    merged = merge_with_weights(raw, wts)
    # Weight every DAIOE metric by the default weight_in_code_{1,2,3} columns.
    merged, weighted_cols = add_weighted_columns(merged, suffixes=DAIOE_SUFFIXES)
    # Percentile-rank each weighted column within its year.
    merged = add_percentile_ranks(
        merged,
        rank_cols=weighted_cols,
        groupby=("year",),
        scale_0_100=True,
        prefix="pct_rank_",
    )
    return write_output(merged, tax_id=tax_id)
| # ============================== CLI ======================================== # | |
def parse_args() -> argparse.Namespace:
    """Parse command-line arguments (currently just --tax-id)."""
    parser = argparse.ArgumentParser(
        description="Build employment-weighted DAIOE tables with percentile ranks."
    )
    parser.add_argument(
        "--tax-id",
        default=DEFAULT_TAX_ID,
        help="Taxonomy id (e.g., ssyk2012, ssyk96)",
    )
    return parser.parse_args()
def main() -> None:
    """CLI entry point: run the pipeline and report where the output went."""
    args = parse_args()
    result = build_weighted_daioe(args.tax_id)
    print(f"Wrote {result}")


if __name__ == "__main__":
    main()