File size: 5,849 Bytes
3e12d11
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ec6e9a7
 
 
 
 
 
3e12d11
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
"""Core pipeline logic for SCB employment-only data.

This module fetches employment data from Statistics Sweden (SCB),
derives SSYK2012 hierarchy columns from 4-digit codes, and aggregates
employment totals across hierarchy levels. DAIOE exposure inputs have
been removed so the output contains only SCB employment counts.
"""

from __future__ import annotations

from typing import Dict, Optional

import logging
import pandas as pd

from .config import TAXONOMY
from .label_enrichment import apply_translations
from .scb_fetch import fetch_all_employment_data

logger = logging.getLogger(__name__)


def filter_years(
    df: pd.DataFrame,
    year_min: Optional[int],
    year_max: Optional[int],
    *,
    year_col: str,
) -> pd.DataFrame:
    """Return a DataFrame filtered to the inclusive year range."""
    if year_min is None and year_max is None:
        return df.copy()
    mask = pd.Series(True, index=df.index, dtype=bool)
    if year_min is not None:
        mask &= df[year_col] >= year_min
    if year_max is not None:
        mask &= df[year_col] <= year_max
    mask = mask.fillna(False)
    return df.loc[mask].copy()


def prepare_employment(
    raw: pd.DataFrame,
    *,
    year_min: Optional[int] = None,
    year_max: Optional[int] = None,
) -> pd.DataFrame:
    """Clean SCB employment data and derive SSYK hierarchy columns."""
    if raw.empty:
        raise ValueError("SCB fetch returned an empty DataFrame.")

    emp = raw.copy()
    emp["code4"] = emp["code_4"].astype(str).str.zfill(4)
    emp["code3"] = emp["code4"].str[:3]
    emp["code2"] = emp["code4"].str[:2]
    emp["code1"] = emp["code4"].str[:1]

    emp["label4"] = emp["occupation"].fillna("").str.strip()
    emp["label3"] = emp["code3"]
    emp["label2"] = emp["code2"]
    emp["label1"] = emp["code1"]

    emp["age"] = emp["age"].astype(str).str.strip()
    emp["year"] = pd.to_numeric(emp["year"], errors="coerce").astype("Int64")
    emp["employment"] = pd.to_numeric(emp["value"], errors="coerce").fillna(0)

    emp = emp.dropna(subset=["year"])
    emp = filter_years(emp, year_min, year_max, year_col="year")

    ordered_cols = [
        "year",
        "age",
        "code4",
        "label4",
        "code3",
        "label3",
        "code2",
        "label2",
        "code1",
        "label1",
        "employment",
    ]
    return emp[ordered_cols]


def compute_children_maps(df: pd.DataFrame) -> Dict[int, pd.DataFrame]:
    """Count the number of descendants for each code at each hierarchy level."""
    base = df[["year", "code4", "code3", "code2", "code1"]].drop_duplicates()
    counts: Dict[int, pd.DataFrame] = {}
    counts[3] = (
        base.groupby(["year", "code3"])["code4"]
        .nunique()
        .reset_index(name="n_children")
    )
    counts[2] = (
        base.groupby(["year", "code2"])["code3"]
        .nunique()
        .reset_index(name="n_children")
    )
    counts[1] = (
        base.groupby(["year", "code1"])["code2"]
        .nunique()
        .reset_index(name="n_children")
    )
    lvl4 = base.groupby(["year", "code4"]).size().reset_index(name="n_children")
    lvl4["n_children"] = 1
    counts[4] = lvl4
    return counts


def build_employment_views(emp: pd.DataFrame) -> Dict[int, Dict[str, pd.DataFrame]]:
    """Build employment views (age and totals) for each hierarchy level."""
    views: Dict[int, Dict[str, pd.DataFrame]] = {}
    for level in (4, 3, 2, 1):
        code_col, label_col = f"code{level}", f"label{level}"
        age_view = emp.groupby(
            ["year", "age", code_col, label_col], as_index=False
        )["employment"].sum()
        total_view = (
            age_view.groupby(["year", code_col, label_col], as_index=False)["employment"]
            .sum()
            .rename(columns={"employment": "employment_total"})
        )
        views[level] = {"age": age_view, "total": total_view}
    return views


def build_level_frame(
    level: int, views: Dict[int, Dict[str, pd.DataFrame]], children: Dict[int, pd.DataFrame]
) -> pd.DataFrame:
    """Combine age-level employment, totals and child counts for a level."""
    code_col, label_col = f"code{level}", f"label{level}"
    age_view = views[level]["age"].copy()
    totals = views[level]["total"]

    merged = (
        age_view.merge(totals, on=["year", code_col, label_col], how="left")
        .merge(children[level], on=["year", code_col], how="left")
    )
    merged["level"] = level
    merged["taxonomy"] = TAXONOMY
    merged = merged.rename(columns={code_col: "code", label_col: "label"})

    ordered = [
        "taxonomy",
        "level",
        "code",
        "label",
        "year",
        "n_children",
        "age",
        "employment",
        "employment_total",
    ]
    return merged[ordered]


def run_pipeline(
    *,
    year_min: Optional[int] = None,
    year_max: Optional[int] = None,
) -> pd.DataFrame:
    """Run the SCB-only pipeline and return aggregated employment data.

    The returned frame is normalized across hierarchy levels with columns:
    `taxonomy`, `level`, `code`, `label`, `year`, `n_children`, `age`,
    `employment`, and `employment_total`.
    """
    logger.info("Starting SCB-only employment pipeline")
    raw = fetch_all_employment_data()
    employment = prepare_employment(raw, year_min=year_min, year_max=year_max)

    if employment.empty:
        raise ValueError("No SCB employment rows remain after filtering.")

    children = compute_children_maps(employment)
    emp_views = build_employment_views(employment)

    levels = [
        build_level_frame(level, emp_views, children) for level in (1, 2, 3, 4)
    ]
    combined = pd.concat(levels, ignore_index=True)
    combined = combined.sort_values(["level", "code", "year", "age"], ignore_index=True)
    combined = apply_translations(combined)
    return combined