Spaces:
Sleeping
Sleeping
| ## Required Libraries | |
| import pandas as pd | |
| from pyscbwrapper import SCB | |
| from pathlib import Path | |
## Sorting the working directory
# Project root = two levels above the current working directory (notebook use).
# For scripts, use instead: ROOT = Path(__file__).resolve().parents[1]
ROOT = Path.cwd().resolve().parents[1]

## Define the Table Nodes - Can be changed
TAX_ID = "ssyk2012"  # or ssyk96 Classification

# Positional arguments handed to SCB(...), one entry per classification.
TABLES = {
    "ssyk2012_tab": ("en", "AM", "AM0208", "AM0208E", "YREG51BAS"),
    "ssyk96_tab": ("en", "AM", "AM0208", "AM0208E", "YREG33"),
}

scb = SCB(*TABLES[TAX_ID + "_tab"])
# scb.info()

# The first variable reported for the table is treated as the occupation
# variable; its name with spaces stripped is what the query keyword uses below.
var_ = scb.get_variables()
occupations_key = next(iter(var_))
occupations = var_[occupations_key]
clean_key = occupations_key.replace(" ", "")
# Years can be strings; coerce to int safely and pick max
def coerce_year(y):
    """Return ``y`` converted to ``int``, or ``None`` if it cannot be converted.

    Only the failures ``int()`` itself raises for unsuitable input are treated
    as "not a year": ``TypeError`` (e.g. ``None``), ``ValueError`` (e.g.
    ``"20x"`` or NaN) and ``OverflowError`` (infinite floats). Any other
    exception is allowed to propagate instead of being silently swallowed.
    """
    try:
        return int(y)
    except (TypeError, ValueError, OverflowError):
        return None
# Keep only the "year" entries that parse as integers; the largest one is the
# year that gets queried (stored as a string).
years = [year for year in map(coerce_year, var_["year"]) if year is not None]
if not years:
    # max() on an empty list would raise an opaque ValueError; say what is wrong.
    raise ValueError('No integer-like entries found under the "year" variable.')
latest_year = str(max(years))
# ========= 3) Build query (match exact variable names) ========= #
# Select every occupation entry, restricted to the single most recent year.
scb.set_query(**{clean_key: occupations, "year": [latest_year]})

# Run the query and keep the rows stored under the "data" key of the response.
scb_data = scb.get_data()
scb_fetch = scb_data["data"]

# Map the values selected in the first query entry to the occupation labels,
# pairing them by position.
codes = scb.get_query()["query"][0]["selection"]["values"]
occ_dict = dict(zip(codes, occupations))
# CREATING CLEANED DATAFRAME
# One row per fetched record: the first two parts of "key" are the occupation
# code and the year, and the first entry of "values" is the raw string value.
records = []
for r in scb_fetch:
    code, year = r["key"][:2]  # occupation code, year
    name = occ_dict.get(code, code)  # fall back to the code when no label is known
    value = r["values"][0]  # raw string
    records.append({"code_4": code, "occupation": name, "year": year, "value": value})

df = pd.DataFrame(records)
df = df[df["code_4"] != "0002"].reset_index(drop=True)  ## remove unidentified group

# Drop missing values before weighting.
# The values arrive as strings, so a non-numeric placeholder (PxWeb tables
# typically use ".." for missing cells) is not NaN and dropna() alone would
# keep it, making the integer cast below fail. Coerce to numeric first so
# such cells become NaN and are actually removed.
df["value"] = pd.to_numeric(df["value"], errors="coerce")
df = df.dropna(subset=["value"]).reset_index(drop=True)

# Code formatting: left-pad to four characters, then derive the 3-, 2- and
# 1-character prefixes used as the broader grouping levels.
df["code_4"] = df["code_4"].astype(str).str.zfill(4)
df["code_3"] = df["code_4"].str[:3]
df["code_2"] = df["code_4"].str[:2]
df["code_1"] = df["code_4"].str[0]

# Safe now: every remaining value is numeric.
df["value"] = df["value"].astype(int)
# Create aggregated totals automatically
# For each broader level, value_<n> holds the per-year sum of "value" over all
# rows that share the same code_<n>.
for level in ("code_1", "code_2", "code_3"):
    per_group = df.groupby(["year", level])["value"]
    df["value_" + level[-1]] = per_group.transform("sum").astype(int)

# weighting per the next broader group
# Each weight is the share of a level's total within the total of the next
# broader level, rounded to six decimals.
df["weight_in_code_3"] = df["value"].div(df["value_3"]).round(6)
df["weight_in_code_2"] = df["value_3"].div(df["value_2"]).round(6)
df["weight_in_code_1"] = df["value_2"].div(df["value_1"]).round(6)
## REMOVE EXTRA COLUMNS AND PREPARE DATA FOR NEXT STAGE
# Keep the 4-character code, its label, the value and the three weights; the
# year, the shorter codes and the group totals are dropped.
df = df.loc[
    :,
    [
        "code_4",
        "occupation",
        "value",
        "weight_in_code_3",
        "weight_in_code_2",
        "weight_in_code_1",
    ],
]

# Write the result to <ROOT>/data/scb_weights/, creating the folder if needed,
# and report the absolute path of the file.
out_path = ROOT.joinpath(
    "data", "scb_weights", f"{TAX_ID}_weights_en_{latest_year}.csv"
)
out_path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(out_path, index=False)
print(f"Wrote: {out_path.resolve()}")