File size: 5,020 Bytes
3e12d11
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
"""Helpers for fetching employment data from the SCB API.

This module wraps the ``pyscbwrapper`` library to download
occupation/employment tables from Statistics Sweden.  Error handling
and logging are centralised here so that callers of ``fetch_all_employment_data``
can remain agnostic of the details.
"""

from typing import Tuple
import logging

import pandas as pd
from pyscbwrapper import SCB

from .config import AGE_EXCLUSIONS, EXCLUDED_CODES, TABLES

logger = logging.getLogger(__name__)


def fetch_scb_table(
    table_id: str, config: Tuple[str, str, str, str, str]
) -> pd.DataFrame:
    """Fetch and transform a single SCB table.

    Parameters
    ----------
    table_id : str
        Label for the table.  It is used in log messages and copied into
        the ``source_table`` column of every returned row; the query
        itself is driven entirely by ``config``.
    config : Tuple[str, str, str, str, str]
        Positional arguments unpacked into ``pyscbwrapper.SCB``.

    Returns
    -------
    pd.DataFrame
        One row per entry in the response's ``data`` list, with columns
        ``code_4``, ``occupation``, ``age``, ``year``, ``value`` and
        ``source_table``.  Any exception raised while fetching or
        transforming is logged (with traceback) and an empty frame is
        returned instead.
    """
    logger.info("Starting SCB fetch for table %s", table_id)
    try:
        scb = SCB(*config)
        var_ = scb.get_variables()

        def get_key_raw(term: str) -> str:
            # A bare ``next()`` would raise a message-less StopIteration,
            # leaving the error log without any hint of what was missing.
            key = next((k for k in var_ if term in k.lower()), None)
            if key is None:
                raise LookupError(
                    f"no SCB variable matching {term!r}; "
                    f"available: {list(var_)}"
                )
            return key

        # Identify variable keys from the SCB metadata
        occ_key_raw = get_key_raw("occupation")
        year_key_raw = get_key_raw("year")
        age_key_raw = get_key_raw("age")

        # Filter out excluded ages
        filtered_ages = [
            age for age in var_[age_key_raw] if age not in AGE_EXCLUSIONS
        ]

        # Build the query: remove spaces from the occupation key because SCB
        # uses inconsistent spacing conventions
        query_args = {
            occ_key_raw.replace(" ", ""): var_[occ_key_raw],
            year_key_raw: var_[year_key_raw],
            age_key_raw: filtered_ages,
        }
        scb.set_query(**query_args)

        raw_data = scb.get_data()
        scb_fetch = raw_data.get("data", [])

        # Build a mapping from code to human-readable occupation name using
        # the query metadata.  We fall back to the code itself if no mapping
        # exists.
        query_meta = scb.get_query().get("query", [])
        occ_meta_vals = next(
            (
                q["selection"]["values"]
                for q in query_meta
                if "occupation" in q["code"].lower() or q["code"] == "Yrke2012"
            ),
            None,
        )
        if occ_meta_vals is None:
            raise LookupError(
                "no occupation entry found in the SCB query metadata"
            )
        occ_dict = dict(zip(occ_meta_vals, var_[occ_key_raw]))

        records = []
        for r in scb_fetch:
            # NOTE(review): assumes the first three key components are
            # (occupation code, age, year) in that order -- confirm against
            # the SCB response for each configured table.
            code, age, year = r.get("key", [])[:3]
            # Treat a missing, None or empty ``values`` list alike: the
            # record is kept with a null value rather than failing the
            # whole table.
            values = r.get("values") or [None]
            records.append(
                {
                    "code_4": code,
                    "occupation": occ_dict.get(code, code),
                    "age": age,
                    "year": year,
                    "value": values[0],
                    "source_table": table_id,
                }
            )
        return pd.DataFrame.from_records(records)

    except Exception as exc:
        # Best-effort boundary: callers get an empty frame, but the
        # traceback is preserved in the log for diagnosis.
        logger.exception("Error processing SCB table %s: %s", table_id, exc)
        return pd.DataFrame()


def fetch_all_employment_data() -> pd.DataFrame:
    """Fetch every table configured in ``TABLES`` and merge the results.

    Each table is fetched with ``fetch_scb_table``; tables that come back
    empty are logged and skipped.  When several tables provide a row for
    the same (code_4, age, year) combination, the row from the table that
    appears later in ``TABLES`` is kept.  Rows whose ``code_4`` is listed
    in ``EXCLUDED_CODES`` are dropped.

    Returns
    -------
    pd.DataFrame
        A frame with a fresh integer index, sorted by code_4, age and
        year, carrying the columns produced by ``fetch_scb_table``.  The
        ``value`` column is coerced to numeric, with unparseable entries
        becoming NaN.  An empty frame is returned if no table yielded
        data.
    """
    logger.info("Beginning employment data collection from SCB")

    frames: list[pd.DataFrame] = []
    for name, table_config in TABLES.items():
        part = fetch_scb_table(name, table_config)
        if part.empty:
            logger.warning("No data retrieved for table %s", name)
            continue
        frames.append(part)

    # Nothing usable came back from any table
    if not frames:
        logger.warning("All SCB table fetches returned empty DataFrames")
        return pd.DataFrame()

    combined = pd.concat(frames, ignore_index=True)

    # Rank tables by their position in TABLES so that, after sorting, the
    # highest-ranked duplicate is the last one and survives de-duplication.
    rank_by_table = {name: rank for rank, name in enumerate(TABLES)}
    combined["table_priority"] = combined["source_table"].map(rank_by_table)

    identity_cols = ["code_4", "age", "year"]
    combined = combined.sort_values(identity_cols + ["table_priority"])
    combined = combined.drop_duplicates(subset=identity_cols, keep="last")
    combined = combined.drop(columns=["table_priority"])

    # Drop excluded occupation codes, then make the value column numeric
    keep_mask = ~combined["code_4"].isin(EXCLUDED_CODES)
    result = combined[keep_mask].reset_index(drop=True)
    result["value"] = pd.to_numeric(result["value"], errors="coerce")

    return result