Spaces:
Sleeping
Sleeping
| """Core pipeline logic for SCB employment-only data. | |
| This module fetches employment data from Statistics Sweden (SCB), | |
| derives SSYK2012 hierarchy columns from 4-digit codes, and aggregates | |
| employment totals across hierarchy levels. DAIOE exposure inputs have | |
| been removed so the output contains only SCB employment counts. | |
| """ | |
| from __future__ import annotations | |
| from typing import Dict, Optional | |
| import logging | |
| import pandas as pd | |
| from .config import TAXONOMY | |
| from .label_enrichment import apply_translations | |
| from .scb_fetch import fetch_all_employment_data | |
| logger = logging.getLogger(__name__) | |
| def filter_years( | |
| df: pd.DataFrame, | |
| year_min: Optional[int], | |
| year_max: Optional[int], | |
| *, | |
| year_col: str, | |
| ) -> pd.DataFrame: | |
| """Return a DataFrame filtered to the inclusive year range.""" | |
| if year_min is None and year_max is None: | |
| return df.copy() | |
| mask = pd.Series(True, index=df.index, dtype=bool) | |
| if year_min is not None: | |
| mask &= df[year_col] >= year_min | |
| if year_max is not None: | |
| mask &= df[year_col] <= year_max | |
| mask = mask.fillna(False) | |
| return df.loc[mask].copy() | |
| def prepare_employment( | |
| raw: pd.DataFrame, | |
| *, | |
| year_min: Optional[int] = None, | |
| year_max: Optional[int] = None, | |
| ) -> pd.DataFrame: | |
| """Clean SCB employment data and derive SSYK hierarchy columns.""" | |
| if raw.empty: | |
| raise ValueError("SCB fetch returned an empty DataFrame.") | |
| emp = raw.copy() | |
| emp["code4"] = emp["code_4"].astype(str).str.zfill(4) | |
| emp["code3"] = emp["code4"].str[:3] | |
| emp["code2"] = emp["code4"].str[:2] | |
| emp["code1"] = emp["code4"].str[:1] | |
| emp["label4"] = emp["occupation"].fillna("").str.strip() | |
| emp["label3"] = emp["code3"] | |
| emp["label2"] = emp["code2"] | |
| emp["label1"] = emp["code1"] | |
| emp["age"] = emp["age"].astype(str).str.strip() | |
| emp["year"] = pd.to_numeric(emp["year"], errors="coerce").astype("Int64") | |
| emp["employment"] = pd.to_numeric(emp["value"], errors="coerce").fillna(0) | |
| emp = emp.dropna(subset=["year"]) | |
| emp = filter_years(emp, year_min, year_max, year_col="year") | |
| ordered_cols = [ | |
| "year", | |
| "age", | |
| "code4", | |
| "label4", | |
| "code3", | |
| "label3", | |
| "code2", | |
| "label2", | |
| "code1", | |
| "label1", | |
| "employment", | |
| ] | |
| return emp[ordered_cols] | |
| def compute_children_maps(df: pd.DataFrame) -> Dict[int, pd.DataFrame]: | |
| """Count the number of descendants for each code at each hierarchy level.""" | |
| base = df[["year", "code4", "code3", "code2", "code1"]].drop_duplicates() | |
| counts: Dict[int, pd.DataFrame] = {} | |
| counts[3] = ( | |
| base.groupby(["year", "code3"])["code4"] | |
| .nunique() | |
| .reset_index(name="n_children") | |
| ) | |
| counts[2] = ( | |
| base.groupby(["year", "code2"])["code3"] | |
| .nunique() | |
| .reset_index(name="n_children") | |
| ) | |
| counts[1] = ( | |
| base.groupby(["year", "code1"])["code2"] | |
| .nunique() | |
| .reset_index(name="n_children") | |
| ) | |
| lvl4 = base.groupby(["year", "code4"]).size().reset_index(name="n_children") | |
| lvl4["n_children"] = 1 | |
| counts[4] = lvl4 | |
| return counts | |
| def build_employment_views(emp: pd.DataFrame) -> Dict[int, Dict[str, pd.DataFrame]]: | |
| """Build employment views (age and totals) for each hierarchy level.""" | |
| views: Dict[int, Dict[str, pd.DataFrame]] = {} | |
| for level in (4, 3, 2, 1): | |
| code_col, label_col = f"code{level}", f"label{level}" | |
| age_view = emp.groupby( | |
| ["year", "age", code_col, label_col], as_index=False | |
| )["employment"].sum() | |
| total_view = ( | |
| age_view.groupby(["year", code_col, label_col], as_index=False)["employment"] | |
| .sum() | |
| .rename(columns={"employment": "employment_total"}) | |
| ) | |
| views[level] = {"age": age_view, "total": total_view} | |
| return views | |
| def build_level_frame( | |
| level: int, views: Dict[int, Dict[str, pd.DataFrame]], children: Dict[int, pd.DataFrame] | |
| ) -> pd.DataFrame: | |
| """Combine age-level employment, totals and child counts for a level.""" | |
| code_col, label_col = f"code{level}", f"label{level}" | |
| age_view = views[level]["age"].copy() | |
| totals = views[level]["total"] | |
| merged = ( | |
| age_view.merge(totals, on=["year", code_col, label_col], how="left") | |
| .merge(children[level], on=["year", code_col], how="left") | |
| ) | |
| merged["level"] = level | |
| merged["taxonomy"] = TAXONOMY | |
| merged = merged.rename(columns={code_col: "code", label_col: "label"}) | |
| ordered = [ | |
| "taxonomy", | |
| "level", | |
| "code", | |
| "label", | |
| "year", | |
| "n_children", | |
| "age", | |
| "employment", | |
| "employment_total", | |
| ] | |
| return merged[ordered] | |
| def run_pipeline( | |
| *, | |
| year_min: Optional[int] = None, | |
| year_max: Optional[int] = None, | |
| ) -> pd.DataFrame: | |
| """Run the SCB-only pipeline and return aggregated employment data. | |
| The returned frame is normalized across hierarchy levels with columns: | |
| `taxonomy`, `level`, `code`, `label`, `year`, `n_children`, `age`, | |
| `employment`, and `employment_total`. | |
| """ | |
| logger.info("Starting SCB-only employment pipeline") | |
| raw = fetch_all_employment_data() | |
| employment = prepare_employment(raw, year_min=year_min, year_max=year_max) | |
| if employment.empty: | |
| raise ValueError("No SCB employment rows remain after filtering.") | |
| children = compute_children_maps(employment) | |
| emp_views = build_employment_views(employment) | |
| levels = [ | |
| build_level_frame(level, emp_views, children) for level in (1, 2, 3, 4) | |
| ] | |
| combined = pd.concat(levels, ignore_index=True) | |
| combined = combined.sort_values(["level", "code", "year", "age"], ignore_index=True) | |
| combined = apply_translations(combined) | |
| return combined | |