#!/usr/bin/env python3 from __future__ import annotations import base64 import json from datetime import datetime from io import BytesIO from pathlib import Path from typing import Dict, Iterable, List, Tuple import pandas as pd from shiny import reactive from shiny.express import input, render, ui from shinywidgets import render_plotly from shinyswatch import theme from shiny.render import DataTable from zipfile import ZipFile, ZIP_DEFLATED from xml.sax.saxutils import escape from shinyswatch import theme import plotly.express as px # --------------------------------------------------------------------- # Data configuration # --------------------------------------------------------------------- ROOT = Path(__file__).resolve().parent DATA_DIR = ROOT / "data" WEIGHTING_OPTIONS: List[Tuple[str, str]] = [ ("Unweighted", "unweighted"), ("Level 1 weights", "level1"), ("Level 2 weights", "level2"), ("Level 3 weights", "level3"), ] WEIGHT_SUFFIXES: Dict[str, str] = { "unweighted": "", "level1": "_w_code1", "level2": "_w_code2", "level3": "_w_code3", } WEIGHT_LABELS: Dict[str, str] = {value: label for label, value in WEIGHTING_OPTIONS} WEIGHTING_CHOICE_MAP: Dict[str, str] = { value: label for label, value in WEIGHTING_OPTIONS } def weight_choice_label(choice: str) -> str: return WEIGHT_LABELS.get(choice, WEIGHT_LABELS.get("unweighted", "Unweighted")) TAXONOMY_CONFIG: Dict[str, Dict[str, object]] = { "🇸🇪 ssyk2012": { "label": "SSYK 2012", "data_path": DATA_DIR / "daioe_processed" / "daioe_ssyk2012_weighted.csv", "code_column": "ssyk2012_4", "supports_weights": True, "weight_suffixes": WEIGHT_SUFFIXES, }, "🇸🇪 ssyk96": { "label": "SSYK 1996", "data_path": DATA_DIR / "daioe_processed" / "daioe_ssyk96_weighted.csv", "code_column": "ssyk96_4", "supports_weights": True, "weight_suffixes": WEIGHT_SUFFIXES, }, } METRIC_OPTIONS: List[Tuple[str, str]] = [ ("📚 All Applications", "allapps"), ("♟️ Abstract strategy games", "stratgames"), ("🎮 Real-time video games", "videogames"), ("🖼️🔎 Image recognition", "imgrec"), ("🧩🖼️ Image comprehension", "imgcompr"), ("🖌️🖼️ Image generation", "imggen"), ("📖 Reading comprehension", "readcompr"), ("✍️🤖 Language modelling", "lngmod"), ("🌐🔤 Translation", "translat"), ("🗣️🎙️ Speech recognition", "speechrec"), ("🧠✨ Generative AI", "genai"), ] # --------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------- def clean_code(series: pd.Series) -> pd.Series: return series.astype(str).str.extract(r"(\d{1,4})")[0].fillna("").str.zfill(4) def ensure_numeric(df: pd.DataFrame, columns: Iterable[str]) -> None: for column in columns: if column in df.columns: df[column] = pd.to_numeric(df[column], errors="coerce") def load_dataset(tax_id: str) -> pd.DataFrame: cfg = TAXONOMY_CONFIG[tax_id] path = cfg["data_path"] if not path.exists(): raise FileNotFoundError(f"Dataset not found for {tax_id}: {path}") df = pd.read_csv(path) if "year" in df.columns: df["year"] = pd.to_numeric(df["year"], errors="coerce") df = df.dropna(subset=["year"]) df["year"] = df["year"].astype(int) code_col = cfg["code_column"] raw_code = df[code_col].astype(str) df["code"] = clean_code(raw_code) if "occupation" not in df.columns or df["occupation"].isna().all(): df["occupation"] = ( raw_code.str.replace(r"^\d{1,4}\s*", "", regex=True) .str.strip() .replace("", pd.NA) ) df["occupation"] = df["occupation"].fillna(df["code"]) metric_columns: List[str] = [ f"daioe_{metric_suffix}" for _, metric_suffix in METRIC_OPTIONS if f"daioe_{metric_suffix}" in df.columns ] weight_suffixes = cfg.get("weight_suffixes", {}) extra_suffixes = { suffix for suffix in weight_suffixes.values() if isinstance(suffix, str) and suffix } for weight_suffix in extra_suffixes: metric_columns.extend( f"daioe_{metric_suffix}{weight_suffix}" for _, metric_suffix in METRIC_OPTIONS if f"daioe_{metric_suffix}{weight_suffix}" in df.columns ) ensure_numeric(df, metric_columns) return df def metric_suffix(label: str) -> str: for display, suffix in METRIC_OPTIONS: if display == label: return suffix raise KeyError(f"Unknown metric label '{label}'") def weight_label_from_column(column: str) -> str: default_label = WEIGHT_LABELS.get("unweighted", "Unweighted") for choice, suffix in WEIGHT_SUFFIXES.items(): if suffix and column.endswith(suffix): return WEIGHT_LABELS.get(choice, default_label) return default_label def resolve_metric_column( df: pd.DataFrame, metric_suffix_value: str, cfg: Dict[str, object], weight_choice: str, ) -> str: base_column = f"daioe_{metric_suffix_value}" weight_suffixes = cfg.get("weight_suffixes", {}) selected_suffix = weight_suffixes.get(weight_choice, "") if isinstance(selected_suffix, str) and selected_suffix: weighted_column = f"{base_column}{selected_suffix}" if weighted_column in df.columns: return weighted_column return base_column # --------------------------------------------------------------------- # Reactive calculations # --------------------------------------------------------------------- @reactive.calc def current_dataset() -> pd.DataFrame: return load_dataset(input.taxonomy()) @reactive.calc def metric_column() -> str: suffix = metric_suffix(input.metric()) cfg = TAXONOMY_CONFIG[input.taxonomy()] df = current_dataset() weight_choice = input.weighting() return resolve_metric_column(df, suffix, cfg, weight_choice) @reactive.calc def filtered_data() -> pd.DataFrame: df = current_dataset() column = metric_column() if df.empty or column not in df.columns: return df.iloc[0:0] start_year, end_year = input.year_range() df = df[(df["year"] >= start_year) & (df["year"] <= end_year)] query = input.search().strip() if query: df = df[df["occupation"].str.contains(query, case=False, na=False)] df = df.dropna(subset=[column]) code_order: List[str] | None = None top_n = int(input.top_n()) if not df.empty: latest_year = df["year"].max() ranking = df[df["year"] == latest_year].sort_values( column, ascending=not input.descending() ) ranked_codes = list(dict.fromkeys(ranking["code"].tolist())) if top_n > 0: selected_codes = ranked_codes[:top_n] df = df[df["code"].isin(selected_codes)] code_order = selected_codes else: existing_codes = list(dict.fromkeys(df["code"].tolist())) full_order = list(dict.fromkeys(ranked_codes + existing_codes)) if full_order: code_order = full_order if df.empty: return df if code_order: df = df.assign( _code_order=pd.Categorical(df["code"], categories=code_order, ordered=True) ).sort_values(["_code_order", "year"]) df = df.drop(columns="_code_order") else: df = df.sort_values(["occupation", "year"]) return df @reactive.calc def latest_year_top_data() -> pd.DataFrame: df = current_dataset() column = metric_column() if df.empty or column not in df.columns: return df.iloc[0:0] start_year, end_year = input.year_range() df = df[(df["year"] >= start_year) & (df["year"] <= end_year)] query = input.search().strip() if query: df = df[df["occupation"].str.contains(query, case=False, na=False)] df = df.dropna(subset=[column]) if df.empty: return df latest_year = int(df["year"].max()) df_latest = ( df[df["year"] == latest_year] .copy() .sort_values(column, ascending=not input.descending()) ) top_n = int(input.top_n()) if top_n > 0: df_latest = df_latest.head(top_n) df_latest["latest_year"] = latest_year return df_latest @reactive.calc def comparison_radar_payload() -> Dict[str, object]: empty_payload: Dict[str, object] = { "data": pd.DataFrame(columns=["occupation", "Metric", "Value"]), "metric_order": [], "year": None, "weight_label": weight_choice_label("unweighted"), } df = current_dataset() if df.empty or "year" not in df.columns: return empty_payload cfg = TAXONOMY_CONFIG[input.taxonomy()] comparison_year_input = getattr(input, "comparison_year", None) if comparison_year_input is None: year_value = int(df["year"].max()) else: year_raw = comparison_year_input() if isinstance(year_raw, (list, tuple)): year_raw = year_raw[-1] if pd.isna(year_raw): return empty_payload year_value = int(year_raw) if year_value not in df["year"].values: return empty_payload df_year = df[df["year"] == year_value] if df_year.empty: return empty_payload comparison_occ_input = getattr(input, "comparison_occupations", None) if comparison_occ_input is None: return empty_payload selected_occupations = comparison_occ_input() if not selected_occupations: return empty_payload df_year = df_year[df_year["occupation"].isin(selected_occupations)] if df_year.empty: return empty_payload comparison_weight_input = getattr(input, "comparison_weighting", None) if comparison_weight_input is None: weight_choice = "unweighted" else: weight_choice = comparison_weight_input() metric_map: List[Tuple[str, str]] = [] for label, suffix in METRIC_OPTIONS: column_name = resolve_metric_column(df_year, suffix, cfg, weight_choice) if column_name in df_year.columns: metric_map.append((label, column_name)) if not metric_map: return empty_payload metric_columns = [column_name for _, column_name in metric_map] weight_labels_in_use = { weight_label_from_column(column_name) for column_name in metric_columns } if len(weight_labels_in_use) == 1: weight_label_display = weight_labels_in_use.pop() else: weight_label_display = ", ".join(sorted(weight_labels_in_use)) aggregated = df_year.groupby("occupation", as_index=False)[metric_columns].mean( numeric_only=True ) if aggregated.empty: return empty_payload rename_map = {column_name: label for label, column_name in metric_map} melted = ( aggregated.rename(columns=rename_map) .melt(id_vars="occupation", var_name="Metric", value_name="Value") .dropna(subset=["Value"]) ) if melted.empty: return empty_payload metric_order = [label for label, _ in metric_map] melted["Metric"] = pd.Categorical( melted["Metric"], categories=metric_order, ordered=True ) melted = melted.sort_values(["occupation", "Metric"]) return { "data": melted, "metric_order": metric_order, "year": year_value, "weight_label": weight_label_display, } @reactive.effect def _update_year_slider(): df = current_dataset() if df.empty or "year" not in df.columns: return min_year, max_year = int(df["year"].min()), int(df["year"].max()) ui.update_slider( "year_range", min=min_year, max=max_year, value=(min_year, max_year) ) comparison_slider = getattr(input, "comparison_year", None) if comparison_slider is not None: current_value = comparison_slider() if isinstance(current_value, (list, tuple)): current_value = current_value[-1] if pd.isna(current_value): current_value = max_year else: current_value = max_year current_value = int(max(min_year, min(max_year, current_value))) ui.update_slider("comparison_year", min=min_year, max=max_year, value=current_value) @reactive.effect def _update_comparison_controls(): df = current_dataset() if df.empty or "year" not in df.columns: ui.update_selectize("comparison_occupations", choices=[], selected=[]) return comparison_slider = getattr(input, "comparison_year", None) if comparison_slider is None: comparison_year = int(df["year"].max()) else: year_raw = comparison_slider() if isinstance(year_raw, (list, tuple)): year_raw = year_raw[-1] if pd.isna(year_raw): comparison_year = int(df["year"].max()) else: comparison_year = int(year_raw) df_year = df[df["year"] == comparison_year] occupations = sorted( df_year["occupation"].dropna().unique().tolist(), key=str.casefold ) comparison_occ_input = getattr(input, "comparison_occupations", None) if comparison_occ_input is not None: current_selection = comparison_occ_input() else: current_selection = [] if isinstance(current_selection, str): current_selection = [current_selection] valid_selection = [occ for occ in current_selection if occ in occupations] ui.update_selectize( "comparison_occupations", choices=occupations, selected=valid_selection, ) # --------------------------------------------------------------------- # Outputs # --------------------------------------------------------------------- # @render.data_frame # def occupations_table(): # df = filtered_data() # column = metric_column() # columns = ["code", "occupation", "year", column] # available = [c for c in columns if c in df.columns] # return df[available] def _table_dataframe() -> pd.DataFrame: df = filtered_data() column = metric_column() if df.empty or column not in df.columns: return df.iloc[0:0] columns = ["code", "occupation", "year", column] available = [c for c in columns if c in df.columns] return df[available] def _excel_column_name(index: int) -> str: """Convert 1-based column index to Excel-style letters.""" name = "" while index > 0: index, remainder = divmod(index - 1, 26) name = chr(65 + remainder) + name return name def _dataframe_to_xlsx_bytes( df: pd.DataFrame, sheet_name: str = "Occupations" ) -> bytes: """Create a minimal XLSX file from a DataFrame without external dependencies.""" content_types = """ """ root_rels = """ """ workbook = f""" """ workbook_rels = """ """ styles = """ """ rows_xml: List[str] = [] def cell_xml(col_idx: int, row_idx: int, value) -> str: ref = f"{_excel_column_name(col_idx)}{row_idx}" if pd.isna(value): return f'' if isinstance(value, (int, float)) and not isinstance(value, bool): return f'{value}' return f'{escape(str(value))}' # Header row header_cells = [ cell_xml(idx, 1, column) for idx, column in enumerate(df.columns, start=1) ] rows_xml.append(f'{"".join(header_cells)}') for row_offset, (_, row) in enumerate(df.iterrows(), start=2): cells = [ cell_xml(col_idx, row_offset, row[col_name]) for col_idx, col_name in enumerate(df.columns, start=1) ] rows_xml.append(f'{"".join(cells)}') sheet_xml = ( '' '' "" + "".join(rows_xml) + "" ) buf = BytesIO() with ZipFile(buf, "w", ZIP_DEFLATED) as zf: zf.writestr("[Content_Types].xml", content_types) zf.writestr("_rels/.rels", root_rels) zf.writestr("xl/workbook.xml", workbook) zf.writestr("xl/_rels/workbook.xml.rels", workbook_rels) zf.writestr("xl/styles.xml", styles) zf.writestr("xl/worksheets/sheet1.xml", sheet_xml) buf.seek(0) return buf.read() # --------------------------------------------------------------------- # UI (Shiny Express style) # --------------------------------------------------------------------- ui.page_opts(full_width=True, theme=theme.flatly) with ui.navset_tab(id="view"): with ui.nav_panel("Explore"): with ui.layout_sidebar(): with ui.sidebar(open="desktop"): taxonomy_choices = { key: cfg["label"] for key, cfg in TAXONOMY_CONFIG.items() } metric_labels = [label for label, _ in METRIC_OPTIONS] ui.input_select( "taxonomy", "Taxonomy", taxonomy_choices, selected="ssyk2012" ) ui.input_select( "metric", "Sub-index", metric_labels, selected=metric_labels[0] ) ui.input_select( "weighting", "Weighting", WEIGHTING_CHOICE_MAP, selected="unweighted", ) ui.input_slider( "year_range", "Year range", 2010, 2023, value=(2010, 2023) ) ui.input_slider( "top_n", "Occupations to display (0 = all)", min=0, max=20, value=10, step=1, ) ui.input_switch("descending", "Sort descending", True) ui.input_text("search", "Search occupation", placeholder="e.g. manager") with ui.card(): ui.card_header("Exposure Trends") @render_plotly def trend_plot(): df = filtered_data() column = metric_column() if df.empty: return px.line( title="No data available for the current filters." ) weight_label = weight_label_from_column(column) taxonomy_label = TAXONOMY_CONFIG[input.taxonomy()]["label"] fig = px.line( df, x="year", y=column, color="occupation", markers=True, labels={ "year": "Year", column: input.metric(), "occupation": "Occupation", }, title=f"{input.metric()} ({weight_label}, {taxonomy_label})", ) fig.update_layout(legend_title="Occupation", hovermode="x unified") return fig with ui.card(): ui.card_header("Latest Year Occupations") @render_plotly def latest_year_bar(): df = latest_year_top_data() column = metric_column() if df.empty: return px.bar( title="No data available for the current filters." ) latest_year = int(df["latest_year"].iloc[0]) weight_label = weight_label_from_column(column) taxonomy_label = TAXONOMY_CONFIG[input.taxonomy()]["label"] sorted_df = df.sort_values(column, ascending=not input.descending()) top_n = int(input.top_n()) display_count = len(sorted_df) if top_n <= 0: group_label = f"All {display_count} occupations" else: group_label = f"Showing {display_count} occupations" fig = px.bar( sorted_df, x=column, y="occupation", orientation="h", labels={ column: input.metric(), "occupation": "Occupation", }, title=( f"{input.metric()} ({weight_label}, {taxonomy_label}) — " f"{group_label} in {latest_year}" ), ) fig.update_layout(hovermode="closest") fig.update_yaxes( categoryorder="array", categoryarray=sorted_df["occupation"].tolist(), autorange="reversed", ) return fig with ui.nav_panel("Compare"): with ui.card(): ui.card_header("Comparison Controls") ui.input_slider( "comparison_year", "Comparison year", min=2010, max=2023, value=2023, step=1, sep="", ) ui.input_selectize( "comparison_occupations", "Occupations", choices=[], multiple=True, options={"placeholder": "Search occupations to compare"}, width="100%", ) ui.input_select( "comparison_weighting", "Weighting", WEIGHTING_CHOICE_MAP, selected="unweighted", ) with ui.card(): ui.card_header("Sub-index Comparison") @render_plotly def comparison_radar(): payload = comparison_radar_payload() df = payload["data"] if df.empty: return px.line_polar( title="Select at least one occupation to compare." ) metric_order = payload["metric_order"] year = payload["year"] weight_label = payload["weight_label"] fig = px.line_polar( df, r="Value", theta="Metric", color="occupation", category_orders={"Metric": metric_order}, line_close=True, ) fig.update_traces(fill="toself") fig.update_layout( title=(f"AI Sub-index radar comparison ({weight_label}, {year})"), legend_title="Occupation", polar=dict(radialaxis=dict(visible=True)), margin=dict(l=40, r=40, t=60, b=40), ) # fig.update_layout(width=700, height=700) return fig with ui.nav_panel("Data"): with ui.card(): ui.card_header("Download Options & Summary") ui.p("Select a format to download the filtered occupations data.") ui.div( ui.input_action_button( "download_csv_btn", "Download CSV", class_="btn btn-primary flex-fill", ), ui.input_action_button( "download_json_btn", "Download JSON", class_="btn btn-success flex-fill", ), ui.input_action_button( "download_xlsx_btn", "Download XLSX", class_="btn btn-info flex-fill", ), class_="d-flex flex-column flex-md-row gap-2 mb-3", ) ui.h5("Data Summary") @render.text def data_summary(): df = _table_dataframe() if df.empty: return "No data available for the current filters." column = metric_column() record_count = len(df) year_min = int(df["year"].min()) year_max = int(df["year"].max()) metric_label = input.metric() weighting = WEIGHTING_CHOICE_MAP.get(input.weighting(), "Unweighted") return ( f"Rows: {record_count}\n" f"Years: {year_min}–{year_max}\n" f"Metric: {metric_label}\n" f"Weighting: {weighting}" ) with ui.card(): ui.card_header("Filtered occupations") @render.data_frame def occupations_table(): df = _table_dataframe() return DataTable( df, filters=True, summary=True, height="500px", width="100%", ) @render.ui @reactive.event(input.download_csv_btn) def trigger_csv_download(): df = _table_dataframe() csv_data = df.to_csv(index=False) filename = f"occupations_{datetime.now():%Y-%m-%d}.csv" data_json = json.dumps(csv_data) filename_json = json.dumps(filename) script = ( "" ) return ui.HTML(script) @render.ui @reactive.event(input.download_json_btn) def trigger_json_download(): df = _table_dataframe() json_str = df.to_json(orient="records", indent=2) filename = f"occupations_{datetime.now():%Y-%m-%d}.json" data_json = json.dumps(json_str) filename_json = json.dumps(filename) script = ( "" ) return ui.HTML(script) @render.ui @reactive.event(input.download_xlsx_btn) def trigger_xlsx_download(): df = _table_dataframe() xlsx_bytes = _dataframe_to_xlsx_bytes(df) filename = f"occupations_{datetime.now():%Y-%m-%d}.xlsx" b64_data = base64.b64encode(xlsx_bytes).decode("utf-8") filename_json = json.dumps(filename) script = ( "" ) return ui.HTML(script)