occupation_ai / src /pipeline.py
joseph-data's picture
updated the pipeline
ec6e9a7 unverified
"""Core pipeline logic for SCB employment-only data.
This module fetches employment data from Statistics Sweden (SCB),
derives SSYK2012 hierarchy columns from 4-digit codes, and aggregates
employment totals across hierarchy levels. DAIOE exposure inputs have
been removed so the output contains only SCB employment counts.
"""
from __future__ import annotations
from typing import Dict, Optional
import logging
import pandas as pd
from .config import TAXONOMY
from .label_enrichment import apply_translations
from .scb_fetch import fetch_all_employment_data
logger = logging.getLogger(__name__)
def filter_years(
    df: pd.DataFrame,
    year_min: Optional[int],
    year_max: Optional[int],
    *,
    year_col: str,
) -> pd.DataFrame:
    """Select the rows of ``df`` whose year falls inside the given bounds.

    Both bounds are inclusive, and passing ``None`` for a bound leaves that
    side of the range open. When at least one bound is supplied, rows whose
    year comparison evaluates to a missing value are excluded. The result is
    always a copy, never a view of ``df``.

    Args:
        df: Frame to filter.
        year_min: Smallest year to keep, or ``None`` for no lower bound.
        year_max: Largest year to keep, or ``None`` for no upper bound.
        year_col: Name of the column in ``df`` holding the year values.

    Returns:
        A new DataFrame containing only the rows within the range.
    """
    lower_open = year_min is None
    upper_open = year_max is None
    if lower_open and upper_open:
        # Nothing to restrict: hand back an independent copy of the input.
        return df.copy()

    years = df[year_col]
    keep = pd.Series(True, index=df.index, dtype=bool)
    if not lower_open:
        keep = keep & (years >= year_min)
    if not upper_open:
        keep = keep & (years <= year_max)

    # Comparisons on nullable columns can produce NA; treat those as "drop".
    selected = df.loc[keep.fillna(False)]
    return selected.copy()
def prepare_employment(
    raw: pd.DataFrame,
    *,
    year_min: Optional[int] = None,
    year_max: Optional[int] = None,
) -> pd.DataFrame:
    """Normalise raw SCB employment rows and add SSYK hierarchy columns.

    The input must provide the columns ``code_4``, ``occupation``, ``age``,
    ``year`` and ``value``. The 4-digit code is stringified and left-padded
    with zeros to four characters; the level 3, 2 and 1 codes are its leading
    3, 2 and 1 characters. ``label4`` is the stripped occupation text, while
    the labels for levels 3-1 are set to the corresponding code.

    Years are coerced to nullable integers and rows whose year cannot be
    parsed are dropped. Employment values that cannot be parsed become 0.

    Args:
        raw: Frame as returned by the SCB fetch step.
        year_min: Optional inclusive lower bound on ``year``.
        year_max: Optional inclusive upper bound on ``year``.

    Returns:
        A frame with the columns ``year``, ``age``, ``code4``/``label4``
        through ``code1``/``label1`` and ``employment``, in that order.

    Raises:
        ValueError: If ``raw`` is empty.
    """
    if raw.empty:
        raise ValueError("SCB fetch returned an empty DataFrame.")

    emp = raw.copy()

    # Zero-pad so that string prefixes line up with the hierarchy levels.
    padded_code = emp["code_4"].astype(str).str.zfill(4)
    emp["code4"] = padded_code
    for digits in (3, 2, 1):
        emp[f"code{digits}"] = padded_code.str[:digits]

    emp["label4"] = emp["occupation"].fillna("").str.strip()
    for digits in (3, 2, 1):
        # Higher levels carry their code as the label at this stage.
        emp[f"label{digits}"] = emp[f"code{digits}"]

    emp["age"] = emp["age"].astype(str).str.strip()

    parsed_year = pd.to_numeric(emp["year"], errors="coerce")
    emp["year"] = parsed_year.astype("Int64")

    parsed_value = pd.to_numeric(emp["value"], errors="coerce")
    emp["employment"] = parsed_value.fillna(0)

    emp = emp.dropna(subset=["year"])
    emp = filter_years(emp, year_min, year_max, year_col="year")

    output_cols = ["year", "age"]
    for digits in (4, 3, 2, 1):
        output_cols.extend([f"code{digits}", f"label{digits}"])
    output_cols.append("employment")
    return emp[output_cols]
def compute_children_maps(df: pd.DataFrame) -> Dict[int, pd.DataFrame]:
    """Tally, per year, how many distinct child codes sit under each code.

    For levels 3, 2 and 1 the count is the number of unique codes one level
    deeper (for example the distinct ``code4`` values under each ``code3``).
    Level 4 entries always receive ``n_children`` equal to 1.

    Args:
        df: Frame containing ``year``, ``code4``, ``code3``, ``code2`` and
            ``code1`` columns.

    Returns:
        A dict keyed by hierarchy level. Each value is a frame with the
        columns ``year``, ``code<level>`` and ``n_children``.
    """
    hierarchy = df[["year", "code4", "code3", "code2", "code1"]].drop_duplicates()

    child_counts: Dict[int, pd.DataFrame] = {}
    for parent in (3, 2, 1):
        parent_col = f"code{parent}"
        child_col = f"code{parent + 1}"
        distinct_children = hierarchy.groupby(["year", parent_col])[child_col].nunique()
        child_counts[parent] = distinct_children.reset_index(name="n_children")

    # The deepest level has nothing below it; every (year, code4) pair gets 1.
    leaves = hierarchy.groupby(["year", "code4"]).size().reset_index(name="n_children")
    child_counts[4] = leaves.assign(n_children=1)
    return child_counts
def build_employment_views(emp: pd.DataFrame) -> Dict[int, Dict[str, pd.DataFrame]]:
    """Aggregate employment per hierarchy level, by age group and overall.

    For each level from 4 down to 1 two frames are produced:

    * ``"age"``: ``employment`` summed per year, age, code and label.
    * ``"total"``: the age frame summed again across ages, with the sum
      stored in ``employment_total``.

    Args:
        emp: Frame with ``year``, ``age``, ``employment`` and the
            ``code<level>``/``label<level>`` columns for levels 1-4.

    Returns:
        A dict keyed by level whose values are ``{"age": ..., "total": ...}``.
    """
    result: Dict[int, Dict[str, pd.DataFrame]] = {}
    for level in (4, 3, 2, 1):
        code_key = f"code{level}"
        label_key = f"label{level}"
        age_keys = ["year", "age", code_key, label_key]
        total_keys = ["year", code_key, label_key]

        by_age = emp.groupby(age_keys, as_index=False)["employment"].sum()

        # Totals are derived from the age breakdown rather than the raw rows.
        across_ages = by_age.groupby(total_keys, as_index=False)["employment"].sum()
        overall = across_ages.rename(columns={"employment": "employment_total"})

        result[level] = {"age": by_age, "total": overall}
    return result
def build_level_frame(
    level: int, views: Dict[int, Dict[str, pd.DataFrame]], children: Dict[int, pd.DataFrame]
) -> pd.DataFrame:
    """Assemble the normalised output frame for one hierarchy level.

    The level's age breakdown is left-joined with its totals (on year, code
    and label) and then with its child counts (on year and code). The
    level-specific code and label columns are renamed to the generic
    ``code`` and ``label``, and constant ``level`` and ``taxonomy`` columns
    are added.

    Args:
        level: Hierarchy level to build.
        views: Per-level dict holding the ``"age"`` and ``"total"`` frames.
        children: Per-level frames providing ``n_children``.

    Returns:
        A frame with the columns ``taxonomy``, ``level``, ``code``,
        ``label``, ``year``, ``n_children``, ``age``, ``employment`` and
        ``employment_total``, in that order.
    """
    code_col = f"code{level}"
    label_col = f"label{level}"

    level_views = views[level]
    by_age = level_views["age"].copy()
    overall = level_views["total"]

    with_totals = by_age.merge(overall, on=["year", code_col, label_col], how="left")
    frame = with_totals.merge(children[level], on=["year", code_col], how="left")

    frame = frame.assign(level=level, taxonomy=TAXONOMY)
    frame = frame.rename(columns={code_col: "code", label_col: "label"})

    output_cols = [
        "taxonomy",
        "level",
        "code",
        "label",
        "year",
        "n_children",
        "age",
        "employment",
        "employment_total",
    ]
    return frame[output_cols]
def run_pipeline(
    *,
    year_min: Optional[int] = None,
    year_max: Optional[int] = None,
) -> pd.DataFrame:
    """Fetch SCB employment data and aggregate it across hierarchy levels.

    The steps are: fetch the raw data, clean it and restrict it to the
    requested years, compute child counts and employment views, build one
    frame per level (1 through 4), stack and sort them by ``level``,
    ``code``, ``year`` and ``age``, and finally pass the result through
    ``apply_translations``.

    Before translation the frame has the columns ``taxonomy``, ``level``,
    ``code``, ``label``, ``year``, ``n_children``, ``age``, ``employment``
    and ``employment_total``.

    Args:
        year_min: Optional inclusive lower bound on the year.
        year_max: Optional inclusive upper bound on the year.

    Returns:
        The combined, sorted and translated employment frame.

    Raises:
        ValueError: If no employment rows remain after cleaning and
            year filtering.
    """
    logger.info("Starting SCB-only employment pipeline")

    employment = prepare_employment(
        fetch_all_employment_data(),
        year_min=year_min,
        year_max=year_max,
    )
    if employment.empty:
        raise ValueError("No SCB employment rows remain after filtering.")

    child_counts = compute_children_maps(employment)
    views = build_employment_views(employment)

    per_level = []
    for level in (1, 2, 3, 4):
        per_level.append(build_level_frame(level, views, child_counts))

    stacked = pd.concat(per_level, ignore_index=True)
    ordered = stacked.sort_values(["level", "code", "year", "age"], ignore_index=True)
    return apply_translations(ordered)