#!/usr/bin/env python3 """ Aggregate DAIOE raw indices into higher occupation levels (4→3→2→1) and persist one CSV per taxonomy under `data/daioe_aggregated/`. Each output row stores: taxonomy, level, code, label, year, n_children, daioe_* metrics Level 4 rows are the original records tagged with `n_children=1`. Levels 3/2/1 are simple child means grouped by the parent code and year. """ from __future__ import annotations import argparse from dataclasses import dataclass, field from pathlib import Path from typing import Dict, Iterable, List, Mapping, Sequence import pandas as pd # --------------------------------------------------------------------------- # # Paths & shared settings PROJECT_ROOT = Path(__file__).resolve().parents[1] RAW_DIR = PROJECT_ROOT / "data" / "daioe_raw" OUTPUT_DIR = PROJECT_ROOT / "data" / "daioe_aggregated" PARENT_LEVELS = (3, 2, 1) BASE_COLUMNS = ["taxonomy", "level", "code", "label", "year", "n_children"] METRIC_PREFIX = "daioe_" # --------------------------------------------------------------------------- # # Taxonomy metadata @dataclass(frozen=True) class TaxonomyConfig: code_column: str pad_digits: int = 4 label_columns: Mapping[int, str] = field(default_factory=dict) level_digits: Mapping[int, int] = field(default_factory=lambda: {3: 3, 2: 2, 1: 1}) level4_label_column: str | None = None def digits_for(self, level: int) -> int: return self.level_digits.get(level, level) TAXONOMY_CONFIG: Dict[str, TaxonomyConfig] = { "ssyk2012": TaxonomyConfig( code_column="ssyk2012_4", label_columns={3: "ssyk2012_3", 2: "ssyk2012_2", 1: "ssyk2012_1"}, level4_label_column="ssyk2012_4", ), "ssyk96": TaxonomyConfig( code_column="ssyk96_4", label_columns={3: "ssyk96_3", 2: "ssyk96_2", 1: "ssyk96_1"}, level4_label_column="ssyk96_4", ), # "isco08": TaxonomyConfig( # code_column="occ_code_isco08", # level4_label_column="occ_title_isco08", # ), # "soc2010": TaxonomyConfig( # code_column="occ_code_soc2010", # level4_label_column="occ_title_soc2010", # ), # "onetsoc2010": TaxonomyConfig( # code_column="occ_code_onetsoc2010", # level4_label_column="occ_title_onetsoc2010", # ), } # --------------------------------------------------------------------------- # # Data preparation helpers def clean_code(series: pd.Series, pad_digits: int) -> pd.Series: """Strip all non-digits while preserving leading zeros.""" cleaned = series.astype(str).str.replace(r"[^0-9]", "", regex=True) return cleaned.str.zfill(pad_digits).replace({"": None}) def find_metric_columns(df: pd.DataFrame) -> List[str]: cols = [c for c in df.columns if c.startswith(METRIC_PREFIX)] if not cols: raise ValueError("No DAIOE metric columns detected.") return cols def load_taxonomy_frame( tax_id: str, config: TaxonomyConfig ) -> tuple[pd.DataFrame, List[str]]: path = RAW_DIR / f"daioe_{tax_id}.csv" if not path.exists(): raise FileNotFoundError(f"Missing raw file: {path}") df = pd.read_csv(path, sep="\t", na_values=["", "NA"]) df = df.copy() df["year"] = pd.to_numeric(df["year"], errors="coerce").astype("Int64") df = df.dropna(subset=["year"]) df["code_clean"] = clean_code(df[config.code_column], config.pad_digits) df = df.dropna(subset=["code_clean"]) metrics = find_metric_columns(df) df[metrics] = df[metrics].apply(pd.to_numeric, errors="coerce") return df, metrics # --------------------------------------------------------------------------- # # Label resolution def label_from_column(df: pd.DataFrame, code_col: str, label_col: str) -> pd.DataFrame: return ( df[[code_col, label_col]] .dropna(subset=[label_col]) .drop_duplicates(subset=[code_col]) .rename(columns={code_col: "code", label_col: "label"}) .assign(label=lambda s: s["label"].astype(str).str.strip()) .query("label != ''") ) def infer_level4_labels( df: pd.DataFrame, config: TaxonomyConfig, tax_id: str ) -> pd.DataFrame: candidates: List[str | None] = [ config.level4_label_column, *(col for col in df.columns if col.lower().endswith("_title")), *(col for col in df.columns if "occupation" in col.lower()), ] seen: set[str] = set() for col in candidates: if not col or col in seen or col not in df.columns: continue seen.add(col) label_map = label_from_column(df, "code_clean", col) if not label_map.empty: return label_map fallback = ( df[["code_clean"]].drop_duplicates().rename(columns={"code_clean": "code"}) ) fallback["label"] = fallback["code"].apply( lambda code: f"{tax_id.upper()} L4 {code}" ) return fallback def parent_label_map( df: pd.DataFrame, parent_col: str, label_col: str | None, tax_id: str, level: int ) -> pd.DataFrame: if label_col and label_col in df.columns: return label_from_column(df, parent_col, label_col) fallback = df[[parent_col]].drop_duplicates().rename(columns={parent_col: "code"}) fallback["label"] = fallback["code"].apply( lambda code: f"{tax_id.upper()} L{level} {code}" ) return fallback # --------------------------------------------------------------------------- # # Aggregation routines def level4_frame( df: pd.DataFrame, metrics: Sequence[str], tax_id: str, config: TaxonomyConfig ) -> pd.DataFrame: labels = infer_level4_labels(df, config, tax_id) frame = ( df[["code_clean", "year", *metrics]] .rename(columns={"code_clean": "code"}) .assign(taxonomy=tax_id, level=4, n_children=1) .merge(labels, on="code", how="left") ) frame["label"] = frame["label"].fillna( frame["code"].apply(lambda c: f"{tax_id.upper()} L4 {c}") ) return frame[BASE_COLUMNS + list(metrics)].sort_values(["code", "year"]) def aggregate_parent_level( df: pd.DataFrame, metrics: Sequence[str], tax_id: str, level: int, digits: int, label_col: str | None, ) -> pd.DataFrame: parent_col = f"code_level_{level}" working = df.copy() working[parent_col] = working["code_clean"].str.slice(0, digits) working = working[working[parent_col].str.len() == digits] if working.empty: return pd.DataFrame() grouped = working.groupby([parent_col, "year"]) agg = grouped[list(metrics)].mean(numeric_only=True).reset_index() counts = ( grouped["code_clean"] .nunique() .reset_index() .rename(columns={"code_clean": "n_children"}) ) agg = agg.merge(counts, on=[parent_col, "year"], how="left") agg = agg.rename(columns={parent_col: "code"}) labels = parent_label_map(working, parent_col, label_col, tax_id, level) agg = ( agg.merge(labels, on="code", how="left") .assign(taxonomy=tax_id, level=level) .reindex(columns=BASE_COLUMNS + list(metrics)) .sort_values(["code", "year"]) ) return agg def build_aggregated_frame(tax_id: str, config: TaxonomyConfig) -> pd.DataFrame: df, metrics = load_taxonomy_frame(tax_id, config) frames = [level4_frame(df, metrics, tax_id, config)] for level in PARENT_LEVELS: digits = config.digits_for(level) label_col = config.label_columns.get(level) frame = aggregate_parent_level(df, metrics, tax_id, level, digits, label_col) if not frame.empty: frames.append(frame) combined = ( pd.concat(frames, ignore_index=True) .sort_values(["level", "code", "year"]) .reset_index(drop=True) ) return combined def write_output(df: pd.DataFrame, tax_id: str) -> Path: OUTPUT_DIR.mkdir(parents=True, exist_ok=True) target = OUTPUT_DIR / f"daioe_{tax_id}_aggregated.csv" df.to_csv(target, index=False) return target # --------------------------------------------------------------------------- # # CLI orchestration def discover_taxonomies(filter_list: Iterable[str] | None) -> List[str]: available = sorted( f.stem.replace("daioe_", "", 1) for f in RAW_DIR.glob("daioe_*.csv") ) if not filter_list: return available requested = set(filter_list) missing = sorted(requested - set(available)) if missing: raise SystemExit(f"Unknown taxonomy requested: {', '.join(missing)}") return [tax for tax in available if tax in requested] def run_taxonomy(tax_id: str) -> None: config = TAXONOMY_CONFIG.get(tax_id) if not config: print(f"[skip] No taxonomy config for {tax_id}") return aggregated = build_aggregated_frame(tax_id, config) path = write_output(aggregated, tax_id) print(f"[{tax_id}] wrote {len(aggregated)} rows → {path.relative_to(PROJECT_ROOT)}") def parse_args(argv: Sequence[str] | None = None) -> argparse.Namespace: parser = argparse.ArgumentParser( description="Aggregate DAIOE indices to levels 4/3/2/1." ) parser.add_argument( "-t", "--taxonomy", action="append", dest="taxonomies", help="Only process the given taxonomy (repeat for multiple). Defaults to all.", ) return parser.parse_args(argv) def main(argv: Sequence[str] | None = None) -> None: args = parse_args(argv) taxonomies = discover_taxonomies(args.taxonomies) if not taxonomies: raise SystemExit("No daioe_*.csv files found under data/daioe_raw/") for tax_id in taxonomies: run_taxonomy(tax_id) if __name__ == "__main__": main()