Spaces:
Sleeping
Sleeping
File size: 5,849 Bytes
3e12d11 ec6e9a7 3e12d11 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 | """Core pipeline logic for SCB employment-only data.
This module fetches employment data from Statistics Sweden (SCB),
derives SSYK2012 hierarchy columns from 4-digit codes, and aggregates
employment totals across hierarchy levels. DAIOE exposure inputs have
been removed so the output contains only SCB employment counts.
"""
from __future__ import annotations
from typing import Dict, Optional
import logging
import pandas as pd
from .config import TAXONOMY
from .label_enrichment import apply_translations
from .scb_fetch import fetch_all_employment_data
logger = logging.getLogger(__name__)
def filter_years(
df: pd.DataFrame,
year_min: Optional[int],
year_max: Optional[int],
*,
year_col: str,
) -> pd.DataFrame:
"""Return a DataFrame filtered to the inclusive year range."""
if year_min is None and year_max is None:
return df.copy()
mask = pd.Series(True, index=df.index, dtype=bool)
if year_min is not None:
mask &= df[year_col] >= year_min
if year_max is not None:
mask &= df[year_col] <= year_max
mask = mask.fillna(False)
return df.loc[mask].copy()
def prepare_employment(
raw: pd.DataFrame,
*,
year_min: Optional[int] = None,
year_max: Optional[int] = None,
) -> pd.DataFrame:
"""Clean SCB employment data and derive SSYK hierarchy columns."""
if raw.empty:
raise ValueError("SCB fetch returned an empty DataFrame.")
emp = raw.copy()
emp["code4"] = emp["code_4"].astype(str).str.zfill(4)
emp["code3"] = emp["code4"].str[:3]
emp["code2"] = emp["code4"].str[:2]
emp["code1"] = emp["code4"].str[:1]
emp["label4"] = emp["occupation"].fillna("").str.strip()
emp["label3"] = emp["code3"]
emp["label2"] = emp["code2"]
emp["label1"] = emp["code1"]
emp["age"] = emp["age"].astype(str).str.strip()
emp["year"] = pd.to_numeric(emp["year"], errors="coerce").astype("Int64")
emp["employment"] = pd.to_numeric(emp["value"], errors="coerce").fillna(0)
emp = emp.dropna(subset=["year"])
emp = filter_years(emp, year_min, year_max, year_col="year")
ordered_cols = [
"year",
"age",
"code4",
"label4",
"code3",
"label3",
"code2",
"label2",
"code1",
"label1",
"employment",
]
return emp[ordered_cols]
def compute_children_maps(df: pd.DataFrame) -> Dict[int, pd.DataFrame]:
"""Count the number of descendants for each code at each hierarchy level."""
base = df[["year", "code4", "code3", "code2", "code1"]].drop_duplicates()
counts: Dict[int, pd.DataFrame] = {}
counts[3] = (
base.groupby(["year", "code3"])["code4"]
.nunique()
.reset_index(name="n_children")
)
counts[2] = (
base.groupby(["year", "code2"])["code3"]
.nunique()
.reset_index(name="n_children")
)
counts[1] = (
base.groupby(["year", "code1"])["code2"]
.nunique()
.reset_index(name="n_children")
)
lvl4 = base.groupby(["year", "code4"]).size().reset_index(name="n_children")
lvl4["n_children"] = 1
counts[4] = lvl4
return counts
def build_employment_views(emp: pd.DataFrame) -> Dict[int, Dict[str, pd.DataFrame]]:
"""Build employment views (age and totals) for each hierarchy level."""
views: Dict[int, Dict[str, pd.DataFrame]] = {}
for level in (4, 3, 2, 1):
code_col, label_col = f"code{level}", f"label{level}"
age_view = emp.groupby(
["year", "age", code_col, label_col], as_index=False
)["employment"].sum()
total_view = (
age_view.groupby(["year", code_col, label_col], as_index=False)["employment"]
.sum()
.rename(columns={"employment": "employment_total"})
)
views[level] = {"age": age_view, "total": total_view}
return views
def build_level_frame(
level: int, views: Dict[int, Dict[str, pd.DataFrame]], children: Dict[int, pd.DataFrame]
) -> pd.DataFrame:
"""Combine age-level employment, totals and child counts for a level."""
code_col, label_col = f"code{level}", f"label{level}"
age_view = views[level]["age"].copy()
totals = views[level]["total"]
merged = (
age_view.merge(totals, on=["year", code_col, label_col], how="left")
.merge(children[level], on=["year", code_col], how="left")
)
merged["level"] = level
merged["taxonomy"] = TAXONOMY
merged = merged.rename(columns={code_col: "code", label_col: "label"})
ordered = [
"taxonomy",
"level",
"code",
"label",
"year",
"n_children",
"age",
"employment",
"employment_total",
]
return merged[ordered]
def run_pipeline(
*,
year_min: Optional[int] = None,
year_max: Optional[int] = None,
) -> pd.DataFrame:
"""Run the SCB-only pipeline and return aggregated employment data.
The returned frame is normalized across hierarchy levels with columns:
`taxonomy`, `level`, `code`, `label`, `year`, `n_children`, `age`,
`employment`, and `employment_total`.
"""
logger.info("Starting SCB-only employment pipeline")
raw = fetch_all_employment_data()
employment = prepare_employment(raw, year_min=year_min, year_max=year_max)
if employment.empty:
raise ValueError("No SCB employment rows remain after filtering.")
children = compute_children_maps(employment)
emp_views = build_employment_views(employment)
levels = [
build_level_frame(level, emp_views, children) for level in (1, 2, 3, 4)
]
combined = pd.concat(levels, ignore_index=True)
combined = combined.sort_values(["level", "code", "year", "age"], ignore_index=True)
combined = apply_translations(combined)
return combined
|