#!/usr/bin/env python3
"""
Aggregate DAIOE raw indices into higher occupation levels (4→3→2→1) and
persist one CSV per taxonomy under `data/daioe_aggregated/`.

Each output row stores:
    taxonomy, level, code, label, year, n_children, daioe_* metrics

Level 4 rows are the original records tagged with `n_children=1`. Levels 3/2/1
are simple child means grouped by the parent code and year.
"""
from __future__ import annotations

import argparse
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Iterable, List, Mapping, Sequence

import pandas as pd
# --------------------------------------------------------------------------- #
# Paths & shared settings
# Repository root: one directory above the one holding this script.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
RAW_DIR = PROJECT_ROOT / "data" / "daioe_raw"
OUTPUT_DIR = PROJECT_ROOT / "data" / "daioe_aggregated"
# Parent levels derived from the level-4 records, processed in this order.
PARENT_LEVELS = (3, 2, 1)
# Non-metric columns every output row starts with (see module docstring).
BASE_COLUMNS = ["taxonomy", "level", "code", "label", "year", "n_children"]
# Columns whose name starts with this prefix are treated as DAIOE metrics.
METRIC_PREFIX = "daioe_"
| # --------------------------------------------------------------------------- # | |
| # Taxonomy metadata | |
@dataclass
class TaxonomyConfig:
    """Per-taxonomy settings describing where codes and labels live in the raw file.

    NOTE: the ``@dataclass`` decorator was missing — without it the
    keyword-argument construction used in ``TAXONOMY_CONFIG`` raises
    ``TypeError`` and the ``field(...)`` defaults are never resolved.
    """

    # Column in the raw file holding the level-4 occupation code.
    code_column: str
    # Width codes are zero-padded to before slicing parent prefixes.
    pad_digits: int = 4
    # level -> column carrying the human-readable label for that level.
    label_columns: Mapping[int, str] = field(default_factory=dict)
    # level -> number of leading code digits identifying that level.
    level_digits: Mapping[int, int] = field(default_factory=lambda: {3: 3, 2: 2, 1: 1})
    # Column carrying level-4 labels, if the raw file has one.
    level4_label_column: str | None = None

    def digits_for(self, level: int) -> int:
        """Return how many leading code digits identify *level* (defaults to the level itself)."""
        return self.level_digits.get(level, level)
# Registry of supported taxonomies; run_taxonomy() skips an id with no entry
# here even if a matching raw file exists on disk.
TAXONOMY_CONFIG: Dict[str, TaxonomyConfig] = {
    "ssyk2012": TaxonomyConfig(
        code_column="ssyk2012_4",
        label_columns={3: "ssyk2012_3", 2: "ssyk2012_2", 1: "ssyk2012_1"},
        level4_label_column="ssyk2012_4",
    ),
    "ssyk96": TaxonomyConfig(
        code_column="ssyk96_4",
        label_columns={3: "ssyk96_3", 2: "ssyk96_2", 1: "ssyk96_1"},
        level4_label_column="ssyk96_4",
    ),
    # Candidate taxonomies not yet enabled (no parent label columns mapped):
    # "isco08": TaxonomyConfig(
    #     code_column="occ_code_isco08",
    #     level4_label_column="occ_title_isco08",
    # ),
    # "soc2010": TaxonomyConfig(
    #     code_column="occ_code_soc2010",
    #     level4_label_column="occ_title_soc2010",
    # ),
    # "onetsoc2010": TaxonomyConfig(
    #     code_column="occ_code_onetsoc2010",
    #     level4_label_column="occ_title_onetsoc2010",
    # ),
}
| # --------------------------------------------------------------------------- # | |
| # Data preparation helpers | |
def clean_code(series: pd.Series, pad_digits: int) -> pd.Series:
    """Strip all non-digits from *series*, zero-pad to *pad_digits*, null out empties.

    Entries that contain no digits at all become ``None``/NaN so callers can
    drop them with ``dropna``.

    BUG FIX: the original replaced ``""`` with ``None`` *after* zfill, but
    ``"".zfill(n)`` is already ``"000…0"``, so digit-less codes were silently
    kept as ``"0000"`` instead of being dropped. Null out empties first;
    ``.str.zfill`` then propagates the missing values.
    """
    cleaned = series.astype(str).str.replace(r"[^0-9]", "", regex=True)
    return cleaned.replace({"": None}).str.zfill(pad_digits)
def find_metric_columns(df: pd.DataFrame) -> List[str]:
    """Return the names of all DAIOE metric columns (``daioe_*``) in *df*.

    Raises:
        ValueError: if *df* contains no metric column at all.
    """
    metric_cols = list(filter(lambda name: name.startswith(METRIC_PREFIX), df.columns))
    if not metric_cols:
        raise ValueError("No DAIOE metric columns detected.")
    return metric_cols
def load_taxonomy_frame(
    tax_id: str, config: TaxonomyConfig
) -> tuple[pd.DataFrame, List[str]]:
    """Read and normalise the raw file for *tax_id*.

    Returns the cleaned frame — with a nullable-int ``year`` and a zero-padded
    ``code_clean`` column, rows lacking either dropped — plus the list of
    detected metric column names.

    Raises:
        FileNotFoundError: when the expected raw file is absent.
    """
    raw_path = RAW_DIR / f"daioe_{tax_id}.csv"
    if not raw_path.exists():
        raise FileNotFoundError(f"Missing raw file: {raw_path}")
    # NOTE(review): files carry a .csv extension but are read tab-separated —
    # presumably TSV content behind a .csv name; confirm against the raw data.
    frame = pd.read_csv(raw_path, sep="\t", na_values=["", "NA"]).copy()
    frame["year"] = pd.to_numeric(frame["year"], errors="coerce").astype("Int64")
    frame = frame.dropna(subset=["year"])
    frame["code_clean"] = clean_code(frame[config.code_column], config.pad_digits)
    frame = frame.dropna(subset=["code_clean"])
    metric_cols = find_metric_columns(frame)
    frame[metric_cols] = frame[metric_cols].apply(pd.to_numeric, errors="coerce")
    return frame, metric_cols
| # --------------------------------------------------------------------------- # | |
| # Label resolution | |
def label_from_column(df: pd.DataFrame, code_col: str, label_col: str) -> pd.DataFrame:
    """Build a two-column ``code``/``label`` lookup table from *df*.

    Rows with a missing label are discarded, duplicate codes keep their first
    occurrence, labels are whitespace-stripped, and codes whose (first-seen)
    label strips down to the empty string are dropped entirely.
    """
    pairs = df[[code_col, label_col]].dropna(subset=[label_col])
    pairs = pairs.drop_duplicates(subset=[code_col])
    pairs = pairs.rename(columns={code_col: "code", label_col: "label"})
    pairs["label"] = pairs["label"].astype(str).str.strip()
    return pairs[pairs["label"] != ""]
def infer_level4_labels(
    df: pd.DataFrame, config: TaxonomyConfig, tax_id: str
) -> pd.DataFrame:
    """Locate level-4 labels, trying configured then heuristic columns.

    Candidate order: the configured ``level4_label_column``, then any column
    ending in ``_title``, then any column mentioning "occupation". The first
    candidate present in *df* that yields a non-empty code→label map wins;
    otherwise a synthetic "<TAX> L4 <code>" label is fabricated per code.
    """
    ordered_candidates: List[str | None] = [config.level4_label_column]
    ordered_candidates += [c for c in df.columns if c.lower().endswith("_title")]
    ordered_candidates += [c for c in df.columns if "occupation" in c.lower()]
    tried: set[str] = set()
    for candidate in ordered_candidates:
        if not candidate or candidate in tried or candidate not in df.columns:
            continue
        tried.add(candidate)
        mapping = label_from_column(df, "code_clean", candidate)
        if not mapping.empty:
            return mapping
    # No usable label column: fabricate deterministic placeholder labels.
    synthetic = df[["code_clean"]].drop_duplicates()
    synthetic = synthetic.rename(columns={"code_clean": "code"})
    synthetic["label"] = synthetic["code"].apply(
        lambda code: f"{tax_id.upper()} L4 {code}"
    )
    return synthetic
def parent_label_map(
    df: pd.DataFrame, parent_col: str, label_col: str | None, tax_id: str, level: int
) -> pd.DataFrame:
    """Map parent codes to labels for one aggregation level.

    Uses *label_col* when it exists in *df*; otherwise synthesises
    "<TAX> L<level> <code>" placeholders, one per distinct parent code.
    """
    if label_col and label_col in df.columns:
        return label_from_column(df, parent_col, label_col)
    codes = df[[parent_col]].drop_duplicates().rename(columns={parent_col: "code"})
    codes["label"] = codes["code"].apply(
        lambda code: f"{tax_id.upper()} L{level} {code}"
    )
    return codes
| # --------------------------------------------------------------------------- # | |
| # Aggregation routines | |
def level4_frame(
    df: pd.DataFrame, metrics: Sequence[str], tax_id: str, config: TaxonomyConfig
) -> pd.DataFrame:
    """Emit the level-4 rows: one per raw record, tagged ``n_children=1``."""
    label_map = infer_level4_labels(df, config, tax_id)
    base = df[["code_clean", "year", *metrics]].rename(columns={"code_clean": "code"})
    base = base.assign(taxonomy=tax_id, level=4, n_children=1)
    base = base.merge(label_map, on="code", how="left")
    # Codes the label map missed still get a deterministic synthetic label.
    unlabeled = base["label"].isna()
    base.loc[unlabeled, "label"] = base.loc[unlabeled, "code"].apply(
        lambda c: f"{tax_id.upper()} L4 {c}"
    )
    ordered = base[BASE_COLUMNS + list(metrics)]
    return ordered.sort_values(["code", "year"])
def aggregate_parent_level(
    df: pd.DataFrame,
    metrics: Sequence[str],
    tax_id: str,
    level: int,
    digits: int,
    label_col: str | None,
) -> pd.DataFrame:
    """Aggregate level-4 rows up to *level* by truncating codes to *digits*.

    Metrics are simple (unweighted) means over the children within each
    (parent code, year) group; ``n_children`` counts distinct child codes.
    Returns an empty frame when no code survives the truncation filter.
    """
    parent_col = f"code_level_{level}"
    working = df.copy()
    working[parent_col] = working["code_clean"].str.slice(0, digits)
    # Guard against short/odd codes whose prefix is not a full parent code.
    working = working[working[parent_col].str.len() == digits]
    if working.empty:
        return pd.DataFrame()
    grouped = working.groupby([parent_col, "year"])
    agg = grouped[list(metrics)].mean(numeric_only=True).reset_index()
    counts = (
        grouped["code_clean"]
        .nunique()
        .reset_index()
        .rename(columns={"code_clean": "n_children"})
    )
    agg = agg.merge(counts, on=[parent_col, "year"], how="left")
    agg = agg.rename(columns={parent_col: "code"})
    labels = parent_label_map(working, parent_col, label_col, tax_id, level)
    agg = agg.merge(labels, on="code", how="left")
    # CONSISTENCY FIX: mirror level4_frame and synthesise a label for parent
    # codes the configured label column missed, instead of emitting NaN.
    agg["label"] = agg["label"].fillna(
        agg["code"].apply(lambda code: f"{tax_id.upper()} L{level} {code}")
    )
    agg = (
        agg.assign(taxonomy=tax_id, level=level)
        .reindex(columns=BASE_COLUMNS + list(metrics))
        .sort_values(["code", "year"])
    )
    return agg
def build_aggregated_frame(tax_id: str, config: TaxonomyConfig) -> pd.DataFrame:
    """Stack level-4 rows plus each non-empty aggregated parent level."""
    df, metrics = load_taxonomy_frame(tax_id, config)
    pieces: List[pd.DataFrame] = [level4_frame(df, metrics, tax_id, config)]
    for parent_level in PARENT_LEVELS:
        aggregated = aggregate_parent_level(
            df,
            metrics,
            tax_id,
            parent_level,
            config.digits_for(parent_level),
            config.label_columns.get(parent_level),
        )
        if not aggregated.empty:
            pieces.append(aggregated)
    stacked = pd.concat(pieces, ignore_index=True)
    return stacked.sort_values(["level", "code", "year"]).reset_index(drop=True)
def write_output(df: pd.DataFrame, tax_id: str) -> Path:
    """Write *df* as CSV under ``data/daioe_aggregated/`` and return the path."""
    destination = OUTPUT_DIR / f"daioe_{tax_id}_aggregated.csv"
    destination.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(destination, index=False)
    return destination
| # --------------------------------------------------------------------------- # | |
| # CLI orchestration | |
def discover_taxonomies(filter_list: Iterable[str] | None) -> List[str]:
    """List taxonomy ids found on disk, optionally restricted to *filter_list*.

    Raises:
        SystemExit: when a requested taxonomy has no matching raw file.
    """
    stems = (f.stem for f in RAW_DIR.glob("daioe_*.csv"))
    available = sorted(stem.replace("daioe_", "", 1) for stem in stems)
    if not filter_list:
        return available
    requested = set(filter_list)
    unknown = sorted(requested.difference(available))
    if unknown:
        raise SystemExit(f"Unknown taxonomy requested: {', '.join(unknown)}")
    return [tax_id for tax_id in available if tax_id in requested]
def run_taxonomy(tax_id: str) -> None:
    """Aggregate one taxonomy and report the written file; skip unconfigured ids."""
    config = TAXONOMY_CONFIG.get(tax_id)
    if config is None:
        print(f"[skip] No taxonomy config for {tax_id}")
        return
    aggregated = build_aggregated_frame(tax_id, config)
    path = write_output(aggregated, tax_id)
    print(f"[{tax_id}] wrote {len(aggregated)} rows → {path.relative_to(PROJECT_ROOT)}")
def parse_args(argv: Sequence[str] | None = None) -> argparse.Namespace:
    """Build the CLI parser and parse *argv* (``None`` → ``sys.argv``)."""
    cli = argparse.ArgumentParser(
        description="Aggregate DAIOE indices to levels 4/3/2/1."
    )
    cli.add_argument(
        "-t",
        "--taxonomy",
        action="append",
        dest="taxonomies",
        help="Only process the given taxonomy (repeat for multiple). Defaults to all.",
    )
    return cli.parse_args(argv)
def main(argv: Sequence[str] | None = None) -> None:
    """CLI entry point: aggregate every requested (or discovered) taxonomy."""
    options = parse_args(argv)
    targets = discover_taxonomies(options.taxonomies)
    if not targets:
        raise SystemExit("No daioe_*.csv files found under data/daioe_raw/")
    for target in targets:
        run_taxonomy(target)


if __name__ == "__main__":
    main()