# scripts/05_aggregate_daioe_levels.py
#!/usr/bin/env python3
"""
Aggregate DAIOE raw indices into higher occupation levels (4→3→2→1) and
persist one CSV per taxonomy under `data/daioe_aggregated/`.
Each output row stores:
taxonomy, level, code, label, year, n_children, daioe_* metrics
Level 4 rows are the original records tagged with `n_children=1`. Levels 3/2/1
are simple child means grouped by the parent code and year.
"""
from __future__ import annotations
import argparse
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Iterable, List, Mapping, Sequence
import pandas as pd
# --------------------------------------------------------------------------- #
# Paths & shared settings
PROJECT_ROOT = Path(__file__).resolve().parents[1]  # repo root (parent of scripts/)
RAW_DIR = PROJECT_ROOT / "data" / "daioe_raw"  # input files read from here
OUTPUT_DIR = PROJECT_ROOT / "data" / "daioe_aggregated"  # aggregated CSVs written here
PARENT_LEVELS = (3, 2, 1)  # parent levels built from level-4 rows, coarsest last
BASE_COLUMNS = ["taxonomy", "level", "code", "label", "year", "n_children"]  # output column order (metrics appended)
METRIC_PREFIX = "daioe_"  # metric columns are detected by this name prefix
# --------------------------------------------------------------------------- #
# Taxonomy metadata
@dataclass(frozen=True)
class TaxonomyConfig:
    """Per-taxonomy settings: code column, padding, and label-column wiring."""

    # Raw-file column holding the most detailed (level-4) occupation code.
    code_column: str
    # Width codes are zero-padded to after stripping non-digit characters.
    pad_digits: int = 4
    # Maps parent level (3/2/1) -> raw column carrying that level's label.
    label_columns: Mapping[int, str] = field(default_factory=dict)
    # Maps parent level -> number of leading code digits at that level.
    level_digits: Mapping[int, int] = field(default_factory=lambda: {3: 3, 2: 2, 1: 1})
    # Raw column carrying level-4 labels, if the file provides one.
    level4_label_column: str | None = None

    def digits_for(self, level: int) -> int:
        """Return the code-digit count for *level* (defaults to the level number)."""
        return self.level_digits.get(level, level)
# Registry of taxonomies this script knows how to aggregate.
# NOTE(review): level4_label_column equals code_column for both entries, so
# level-4 "labels" are the codes themselves — confirm this is intended.
TAXONOMY_CONFIG: Dict[str, TaxonomyConfig] = {
    "ssyk2012": TaxonomyConfig(
        code_column="ssyk2012_4",
        label_columns={3: "ssyk2012_3", 2: "ssyk2012_2", 1: "ssyk2012_1"},
        level4_label_column="ssyk2012_4",
    ),
    "ssyk96": TaxonomyConfig(
        code_column="ssyk96_4",
        label_columns={3: "ssyk96_3", 2: "ssyk96_2", 1: "ssyk96_1"},
        level4_label_column="ssyk96_4",
    ),
    # NOTE(review): entries below are disabled — presumably pending label
    # wiring; without a config these taxonomies are skipped by run_taxonomy.
    # "isco08": TaxonomyConfig(
    #     code_column="occ_code_isco08",
    #     level4_label_column="occ_title_isco08",
    # ),
    # "soc2010": TaxonomyConfig(
    #     code_column="occ_code_soc2010",
    #     level4_label_column="occ_title_soc2010",
    # ),
    # "onetsoc2010": TaxonomyConfig(
    #     code_column="occ_code_onetsoc2010",
    #     level4_label_column="occ_title_onetsoc2010",
    # ),
}
# --------------------------------------------------------------------------- #
# Data preparation helpers
def clean_code(series: pd.Series, pad_digits: int) -> pd.Series:
    """Strip non-digits from codes and zero-pad to *pad_digits* characters.

    Entries that contain no digits at all (including NaN, which stringifies
    to "nan") become None so callers can drop them with ``dropna``.

    BUG FIX: the original called ``.str.zfill(...)`` *before*
    ``.replace({"": None})`` — zfill turns "" into "0000", so the empty-string
    replacement never matched and digit-less rows survived as all-zero codes.
    The null-out must happen before padding.
    """
    cleaned = series.astype(str).str.replace(r"[^0-9]", "", regex=True)
    # Null out empties first; .str.zfill then propagates the missing values.
    cleaned = cleaned.replace({"": None})
    return cleaned.str.zfill(pad_digits)
def find_metric_columns(df: pd.DataFrame) -> List[str]:
    """Return the names of all DAIOE metric columns in *df*.

    Raises ValueError when no column carries the DAIOE metric prefix.
    """
    metric_cols = [name for name in df.columns if name.startswith(METRIC_PREFIX)]
    if not metric_cols:
        raise ValueError("No DAIOE metric columns detected.")
    return metric_cols
def load_taxonomy_frame(
    tax_id: str, config: TaxonomyConfig
) -> tuple[pd.DataFrame, List[str]]:
    """Read the raw DAIOE file for *tax_id* and normalise year/code columns.

    Returns the cleaned frame together with the detected metric column
    names.  Raises FileNotFoundError when the raw file is absent.
    """
    source = RAW_DIR / f"daioe_{tax_id}.csv"
    if not source.exists():
        raise FileNotFoundError(f"Missing raw file: {source}")
    # NOTE(review): files are read tab-separated despite the .csv suffix.
    frame = pd.read_csv(source, sep="\t", na_values=["", "NA"])
    frame = frame.copy()
    # Coerce year to nullable ints and drop rows where it is unparsable.
    frame["year"] = pd.to_numeric(frame["year"], errors="coerce").astype("Int64")
    frame = frame.dropna(subset=["year"])
    # Normalise the level-4 code and drop rows without a usable code.
    frame["code_clean"] = clean_code(frame[config.code_column], config.pad_digits)
    frame = frame.dropna(subset=["code_clean"])
    metric_cols = find_metric_columns(frame)
    frame[metric_cols] = frame[metric_cols].apply(pd.to_numeric, errors="coerce")
    return frame, metric_cols
# --------------------------------------------------------------------------- #
# Label resolution
def label_from_column(df: pd.DataFrame, code_col: str, label_col: str) -> pd.DataFrame:
    """Build a two-column (code, label) lookup table from *df*.

    Keeps the first non-null label per code, strips surrounding whitespace,
    and discards rows whose label is empty after stripping.
    """
    pairs = df[[code_col, label_col]].dropna(subset=[label_col])
    pairs = pairs.drop_duplicates(subset=[code_col])
    pairs = pairs.rename(columns={code_col: "code", label_col: "label"})
    pairs["label"] = pairs["label"].astype(str).str.strip()
    return pairs[pairs["label"] != ""]
def infer_level4_labels(
    df: pd.DataFrame, config: TaxonomyConfig, tax_id: str
) -> pd.DataFrame:
    """Find a (code, label) map for level-4 rows from the likeliest columns.

    Tries, in order: the configured label column, any ``*_title`` column,
    then any column whose name mentions "occupation".  When none yields a
    non-empty mapping, synthesises "<TAX> L4 <code>" placeholder labels.
    """
    candidates: List[str | None] = [config.level4_label_column]
    candidates += [c for c in df.columns if c.lower().endswith("_title")]
    candidates += [c for c in df.columns if "occupation" in c.lower()]
    tried: set[str] = set()
    for candidate in candidates:
        # Skip Nones, repeats, and columns absent from this frame.
        if not candidate or candidate in tried or candidate not in df.columns:
            continue
        tried.add(candidate)
        mapping = label_from_column(df, "code_clean", candidate)
        if not mapping.empty:
            return mapping
    # No usable label column found — fall back to synthetic labels.
    fallback = (
        df[["code_clean"]].drop_duplicates().rename(columns={"code_clean": "code"})
    )
    fallback["label"] = fallback["code"].apply(
        lambda code: f"{tax_id.upper()} L4 {code}"
    )
    return fallback
def parent_label_map(
    df: pd.DataFrame, parent_col: str, label_col: str | None, tax_id: str, level: int
) -> pd.DataFrame:
    """Resolve (code, label) pairs for a parent level.

    Uses *label_col* when it exists in *df*; otherwise synthesises
    "<TAX> L<level> <code>" labels from the distinct parent codes.
    """
    if label_col and label_col in df.columns:
        return label_from_column(df, parent_col, label_col)
    codes = df[[parent_col]].drop_duplicates().rename(columns={parent_col: "code"})
    codes["label"] = codes["code"].apply(
        lambda code: f"{tax_id.upper()} L{level} {code}"
    )
    return codes
# --------------------------------------------------------------------------- #
# Aggregation routines
def level4_frame(
    df: pd.DataFrame, metrics: Sequence[str], tax_id: str, config: TaxonomyConfig
) -> pd.DataFrame:
    """Emit the raw rows as level-4 records (``n_children`` is always 1)."""
    label_map = infer_level4_labels(df, config, tax_id)
    out = df[["code_clean", "year", *metrics]].rename(columns={"code_clean": "code"})
    out = out.assign(taxonomy=tax_id, level=4, n_children=1)
    out = out.merge(label_map, on="code", how="left")
    # Codes the label map missed get a synthetic placeholder label.
    placeholder = out["code"].apply(lambda c: f"{tax_id.upper()} L4 {c}")
    out["label"] = out["label"].fillna(placeholder)
    return out[BASE_COLUMNS + list(metrics)].sort_values(["code", "year"])
def aggregate_parent_level(
    df: pd.DataFrame,
    metrics: Sequence[str],
    tax_id: str,
    level: int,
    digits: int,
    label_col: str | None,
) -> pd.DataFrame:
    """Average level-4 metrics up to *level* by truncating codes to *digits*.

    ``n_children`` counts the distinct child codes behind each
    (parent, year) group.  Returns an empty frame when no code survives
    the digit-length filter.
    """
    parent_col = f"code_level_{level}"
    subset = df.copy()
    # Parent code = leading *digits* characters of the level-4 code.
    subset[parent_col] = subset["code_clean"].str.slice(0, digits)
    subset = subset[subset[parent_col].str.len() == digits]
    if subset.empty:
        return pd.DataFrame()
    by_parent_year = subset.groupby([parent_col, "year"])
    means = by_parent_year[list(metrics)].mean(numeric_only=True).reset_index()
    child_counts = (
        by_parent_year["code_clean"]
        .nunique()
        .reset_index()
        .rename(columns={"code_clean": "n_children"})
    )
    means = means.merge(child_counts, on=[parent_col, "year"], how="left")
    means = means.rename(columns={parent_col: "code"})
    label_map = parent_label_map(subset, parent_col, label_col, tax_id, level)
    result = means.merge(label_map, on="code", how="left")
    result = result.assign(taxonomy=tax_id, level=level)
    # Fix column order to the shared schema; metrics trail the base columns.
    result = result.reindex(columns=BASE_COLUMNS + list(metrics))
    return result.sort_values(["code", "year"])
def build_aggregated_frame(tax_id: str, config: TaxonomyConfig) -> pd.DataFrame:
    """Stack level 4 plus every non-empty parent level into one long frame."""
    df, metrics = load_taxonomy_frame(tax_id, config)
    frames: List[pd.DataFrame] = [level4_frame(df, metrics, tax_id, config)]
    for level in PARENT_LEVELS:
        parent = aggregate_parent_level(
            df,
            metrics,
            tax_id,
            level,
            config.digits_for(level),
            config.label_columns.get(level),
        )
        if not parent.empty:
            frames.append(parent)
    return (
        pd.concat(frames, ignore_index=True)
        .sort_values(["level", "code", "year"])
        .reset_index(drop=True)
    )
def write_output(df: pd.DataFrame, tax_id: str) -> Path:
    """Persist the aggregated frame as CSV and return the file path."""
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    destination = OUTPUT_DIR / f"daioe_{tax_id}_aggregated.csv"
    df.to_csv(destination, index=False)
    return destination
# --------------------------------------------------------------------------- #
# CLI orchestration
def discover_taxonomies(filter_list: Iterable[str] | None) -> List[str]:
    """List taxonomy ids found on disk, optionally limited to *filter_list*.

    Raises SystemExit when a requested taxonomy has no matching raw file.
    """
    available = sorted(
        f.stem.replace("daioe_", "", 1) for f in RAW_DIR.glob("daioe_*.csv")
    )
    if not filter_list:
        return available
    requested = set(filter_list)
    unknown = sorted(requested - set(available))
    if unknown:
        raise SystemExit(f"Unknown taxonomy requested: {', '.join(unknown)}")
    # Preserve the sorted on-disk order rather than the request order.
    return [tax_id for tax_id in available if tax_id in requested]
def run_taxonomy(tax_id: str) -> None:
    """Aggregate one taxonomy end to end; skip ids without a config entry."""
    config = TAXONOMY_CONFIG.get(tax_id)
    if config is None:
        print(f"[skip] No taxonomy config for {tax_id}")
        return
    aggregated = build_aggregated_frame(tax_id, config)
    target = write_output(aggregated, tax_id)
    print(
        f"[{tax_id}] wrote {len(aggregated)} rows → {target.relative_to(PROJECT_ROOT)}"
    )
def parse_args(argv: Sequence[str] | None = None) -> argparse.Namespace:
    """Build the CLI parser and parse *argv* (``sys.argv`` when None)."""
    cli = argparse.ArgumentParser(
        description="Aggregate DAIOE indices to levels 4/3/2/1."
    )
    # Repeatable flag: each use appends to the "taxonomies" list.
    cli.add_argument(
        "-t",
        "--taxonomy",
        dest="taxonomies",
        action="append",
        help="Only process the given taxonomy (repeat for multiple). Defaults to all.",
    )
    return cli.parse_args(argv)
def main(argv: Sequence[str] | None = None) -> None:
    """CLI entry point: resolve the taxonomy list and process each one.

    Raises SystemExit when no raw files are available at all.
    """
    options = parse_args(argv)
    selected = discover_taxonomies(options.taxonomies)
    if not selected:
        raise SystemExit("No daioe_*.csv files found under data/daioe_raw/")
    for tax_id in selected:
        run_taxonomy(tax_id)
if __name__ == "__main__":  # allow both importing this module and running it directly
    main()