dummy_app / scripts /01_scbpull_mine.py
joseph-data's picture
dummy app start
1264273 unverified
## Required Libraries
import pandas as pd
from pyscbwrapper import SCB
from pathlib import Path
## Sorting the working directory
# ROOT = Path(__file__).resolve().parents[1] ## for scripts
ROOT = Path.cwd().resolve().parents[1] ## for notebooks
## Define the Table Nodes - Can be changed
TAX_ID = "ssyk2012" # or ssyk96 Classification
TABLES = {
"ssyk2012_tab": ("en", "AM", "AM0208", "AM0208E", "YREG51BAS"),
"ssyk96_tab": ("en", "AM", "AM0208", "AM0208E", "YREG33"),
}
scb = SCB(*TABLES[f"{TAX_ID}_tab"])
# scb.info()
var_ = scb.get_variables()
occupations_key, occupations = next(iter(var_.items()))
clean_key = occupations_key.replace(" ", "")
# Years can be strings; coerce to int safely and pick max
def coerce_year(y):
try:
return int(y)
except Exception:
return None
years = [coerce_year(y) for y in var_["year"]]
years = [y for y in years if y is not None]
latest_year = str(max(years))
# ========= 3) Build query (match exact variable names) ========= #
scb.set_query(
**{
clean_key: occupations,
"year": [latest_year],
}
)
scb_data = scb.get_data()
scb_fetch = scb_data["data"]
codes = scb.get_query()["query"][0]["selection"]["values"]
occ_dict = dict(zip(codes, occupations))
# CREATING CLEANED DATAFRAME
records = []
for r in scb_fetch:
code, year = r["key"][:2] # occupation code, year
name = occ_dict.get(code, code)
value = r["values"][0] # raw string
records.append({"code_4": code, "occupation": name, "year": year, "value": value})
df = pd.DataFrame(records)
df = df[df["code_4"] != "0002"].reset_index(drop=True) ## remove unidentified group
# Drop missing values before weighting
df = df.dropna(subset=["value"]).reset_index(drop=True)
# Code formatting
df["code_4"] = df["code_4"].astype(str).str.zfill(4)
df["code_3"] = df["code_4"].str[:3]
df["code_2"] = df["code_4"].str[:2]
df["code_1"] = df["code_4"].str[0]
df["value"] = df["value"].astype(int)
# Create aggregated totals automatically
for level in ["code_1", "code_2", "code_3"]:
df[f"value_{level[-1]}"] = (
df.groupby(["year", level])["value"].transform("sum").astype(int)
)
# weighting per the next broader group
df["weight_in_code_3"] = (df["value"] / df["value_3"]).round(6)
df["weight_in_code_2"] = (df["value_3"] / df["value_2"]).round(6)
df["weight_in_code_1"] = (df["value_2"] / df["value_1"]).round(6)
## REMOVE EXTRA COLUMNS AND PREPARE DATA FOR NEXT STAGE
df = df[
[
"code_4",
"occupation",
"value",
"weight_in_code_3",
"weight_in_code_2",
"weight_in_code_1",
]
]
# sanity check
out_path = ROOT / "data" / "scb_weights" / f"{TAX_ID}_weights_en_{latest_year}.csv"
out_path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(out_path, index=False)
print(f"Wrote: {out_path.resolve()}")