Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3
from __future__ import annotations

import base64
import json
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import Dict, Iterable, List, Tuple
from xml.sax.saxutils import escape
from zipfile import ZIP_DEFLATED, ZipFile

import pandas as pd
import plotly.express as px
from shiny import reactive
from shiny.express import input, render, ui
from shiny.render import DataTable
from shinywidgets import render_plotly
from shinyswatch import theme
# ---------------------------------------------------------------------
# Data configuration
# ---------------------------------------------------------------------
# Project root is the directory containing this file; data lives in ./data.
ROOT = Path(__file__).resolve().parent
DATA_DIR = ROOT / "data"

# (display label, choice id) pairs for the weighting selector.
WEIGHTING_OPTIONS: List[Tuple[str, str]] = [
    ("Unweighted", "unweighted"),
    ("Level 1 weights", "level1"),
    ("Level 2 weights", "level2"),
    ("Level 3 weights", "level3"),
]
# Column-name suffix appended to a metric column for each weighting choice;
# the empty string means "use the unweighted base column".
WEIGHT_SUFFIXES: Dict[str, str] = {
    "unweighted": "",
    "level1": "_w_code1",
    "level2": "_w_code2",
    "level3": "_w_code3",
}
# choice id -> display label (inverse of WEIGHTING_OPTIONS).
WEIGHT_LABELS: Dict[str, str] = {value: label for label, value in WEIGHTING_OPTIONS}
# NOTE(review): identical content to WEIGHT_LABELS; both names are
# referenced elsewhere in this file, so both are kept.
WEIGHTING_CHOICE_MAP: Dict[str, str] = {
    value: label for label, value in WEIGHTING_OPTIONS
}
def weight_choice_label(choice: str) -> str:
    """Return the display label for a weighting choice id, falling back to
    the unweighted label when the choice is unknown."""
    fallback = WEIGHT_LABELS.get("unweighted", "Unweighted")
    return WEIGHT_LABELS.get(choice, fallback)
# Per-taxonomy dataset configuration. The keys double as the values of the
# "taxonomy" select input; each entry names its CSV file, the raw code
# column, and the weight suffixes its columns support.
# NOTE(review): the key prefixes ("๐ธ๐ช ") look like a mojibake'd flag
# emoji from a broken encoding pass — confirm against the original source.
TAXONOMY_CONFIG: Dict[str, Dict[str, object]] = {
    "๐ธ๐ช ssyk2012": {
        "label": "SSYK 2012",
        "data_path": DATA_DIR / "daioe_processed" / "daioe_ssyk2012_weighted.csv",
        "code_column": "ssyk2012_4",
        "supports_weights": True,
        "weight_suffixes": WEIGHT_SUFFIXES,
    },
    "๐ธ๐ช ssyk96": {
        "label": "SSYK 1996",
        "data_path": DATA_DIR / "daioe_processed" / "daioe_ssyk96_weighted.csv",
        "code_column": "ssyk96_4",
        "supports_weights": True,
        "weight_suffixes": WEIGHT_SUFFIXES,
    },
}
# (display label, column suffix) pairs for the metric selector; the data
# column for a metric is f"daioe_{suffix}" plus an optional weight suffix.
# NOTE(review): the label prefixes are mojibake'd emoji — confirm encoding.
METRIC_OPTIONS: List[Tuple[str, str]] = [
    ("๐ All Applications", "allapps"),
    ("โ๏ธ Abstract strategy games", "stratgames"),
    ("๐ฎ Real-time video games", "videogames"),
    ("๐ผ๏ธ๐ Image recognition", "imgrec"),
    ("๐งฉ๐ผ๏ธ Image comprehension", "imgcompr"),
    ("๐๏ธ๐ผ๏ธ Image generation", "imggen"),
    ("๐ Reading comprehension", "readcompr"),
    ("โ๏ธ๐ค Language modelling", "lngmod"),
    ("๐๐ค Translation", "translat"),
    ("๐ฃ๏ธ๐๏ธ Speech recognition", "speechrec"),
    ("๐ง โจ Generative AI", "genai"),
]
# ---------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------
def clean_code(series: pd.Series) -> pd.Series:
    """Normalise occupation codes: keep the first run of 1-4 digits and
    left-pad with zeros to four characters; values with no digits at all
    become "0000"."""
    digits = series.astype(str).str.extract(r"(\d{1,4})")[0]
    return digits.fillna("").str.zfill(4)
def ensure_numeric(df: pd.DataFrame, columns: Iterable[str]) -> None:
    """Coerce the named columns (those actually present) to numeric, in
    place; unparseable values become NaN."""
    present = (name for name in columns if name in df.columns)
    for name in present:
        df[name] = pd.to_numeric(df[name], errors="coerce")
def load_dataset(tax_id: str) -> pd.DataFrame:
    """Load and normalise the CSV dataset for one taxonomy.

    Steps: coerce ``year`` to int (dropping unparseable rows), derive a
    zero-padded 4-digit ``code`` column, back-fill ``occupation`` from the
    raw code text when no usable occupation column exists, and coerce
    every known metric column (unweighted plus each weighted variant) to
    numeric.

    Raises:
        FileNotFoundError: when the configured CSV is absent.
    """
    cfg = TAXONOMY_CONFIG[tax_id]
    path = cfg["data_path"]
    if not path.exists():
        raise FileNotFoundError(f"Dataset not found for {tax_id}: {path}")
    df = pd.read_csv(path)
    if "year" in df.columns:
        df["year"] = pd.to_numeric(df["year"], errors="coerce")
        df = df.dropna(subset=["year"])
        df["year"] = df["year"].astype(int)
    code_col = cfg["code_column"]
    raw_code = df[code_col].astype(str)
    df["code"] = clean_code(raw_code)
    if "occupation" not in df.columns or df["occupation"].isna().all():
        # Strip the leading digits from the raw code text and use the
        # remainder as the occupation name, falling back to the code itself.
        df["occupation"] = (
            raw_code.str.replace(r"^\d{1,4}\s*", "", regex=True)
            .str.strip()
            .replace("", pd.NA)
        )
        df["occupation"] = df["occupation"].fillna(df["code"])
    # Unweighted metric columns that actually exist in this file.
    metric_columns: List[str] = [
        f"daioe_{metric_suffix}"
        for _, metric_suffix in METRIC_OPTIONS
        if f"daioe_{metric_suffix}" in df.columns
    ]
    # Also collect every weighted variant (non-empty suffix) that exists.
    weight_suffixes = cfg.get("weight_suffixes", {})
    extra_suffixes = {
        suffix
        for suffix in weight_suffixes.values()
        if isinstance(suffix, str) and suffix
    }
    for weight_suffix in extra_suffixes:
        metric_columns.extend(
            f"daioe_{metric_suffix}{weight_suffix}"
            for _, metric_suffix in METRIC_OPTIONS
            if f"daioe_{metric_suffix}{weight_suffix}" in df.columns
        )
    ensure_numeric(df, metric_columns)
    return df
def metric_suffix(label: str) -> str:
    """Map a metric display label to its column suffix.

    Raises KeyError for labels not present in METRIC_OPTIONS.
    """
    found = next(
        (suffix for display, suffix in METRIC_OPTIONS if display == label), None
    )
    if found is None:
        raise KeyError(f"Unknown metric label '{label}'")
    return found
def weight_label_from_column(column: str) -> str:
    """Infer the weighting display label from a metric column name by its
    suffix; columns without a known weight suffix read as unweighted."""
    default_label = WEIGHT_LABELS.get("unweighted", "Unweighted")
    matched = next(
        (
            choice
            for choice, suffix in WEIGHT_SUFFIXES.items()
            if suffix and column.endswith(suffix)
        ),
        None,
    )
    if matched is None:
        return default_label
    return WEIGHT_LABELS.get(matched, default_label)
def resolve_metric_column(
    df: pd.DataFrame,
    metric_suffix_value: str,
    cfg: Dict[str, object],
    weight_choice: str,
) -> str:
    """Pick the DataFrame column for a metric under the requested weighting.

    Prefers the weighted variant (base column + weight suffix) when that
    column exists in *df*; otherwise falls back to the unweighted base.
    """
    base_column = f"daioe_{metric_suffix_value}"
    suffix = cfg.get("weight_suffixes", {}).get(weight_choice, "")
    if not (isinstance(suffix, str) and suffix):
        return base_column
    candidate = f"{base_column}{suffix}"
    return candidate if candidate in df.columns else base_column
# ---------------------------------------------------------------------
# Reactive calculations
# ---------------------------------------------------------------------
def current_dataset() -> pd.DataFrame:
    """Load the dataset for the currently selected taxonomy.

    NOTE(review): reads ``input.taxonomy()`` but carries no
    ``@reactive.calc`` decorator — presumably lost in transit; without it
    every call re-reads the CSV from disk. Confirm against the original.
    """
    return load_dataset(input.taxonomy())
def metric_column() -> str:
    """Resolve the active DataFrame column for the currently selected
    metric, taxonomy and weighting inputs."""
    chosen_suffix = metric_suffix(input.metric())
    taxonomy_cfg = TAXONOMY_CONFIG[input.taxonomy()]
    data = current_dataset()
    return resolve_metric_column(data, chosen_suffix, taxonomy_cfg, input.weighting())
def filtered_data() -> pd.DataFrame:
    """Apply all Explore-tab filters and ordering to the current dataset.

    Filters by year range and occupation search, drops rows lacking the
    active metric, then ranks occupation codes by their metric value in the
    latest remaining year (direction per the "descending" switch). With
    top_n > 0 only the top-N ranked codes are kept; rows are ordered by
    that ranking and then by year.
    """
    df = current_dataset()
    column = metric_column()
    if df.empty or column not in df.columns:
        return df.iloc[0:0]
    start_year, end_year = input.year_range()
    df = df[(df["year"] >= start_year) & (df["year"] <= end_year)]
    query = input.search().strip()
    if query:
        df = df[df["occupation"].str.contains(query, case=False, na=False)]
    df = df.dropna(subset=[column])
    code_order: List[str] | None = None
    top_n = int(input.top_n())
    if not df.empty:
        # Rank codes by their metric value in the latest available year;
        # dict.fromkeys de-duplicates while preserving rank order.
        latest_year = df["year"].max()
        ranking = df[df["year"] == latest_year].sort_values(
            column, ascending=not input.descending()
        )
        ranked_codes = list(dict.fromkeys(ranking["code"].tolist()))
        if top_n > 0:
            selected_codes = ranked_codes[:top_n]
            df = df[df["code"].isin(selected_codes)]
            code_order = selected_codes
        else:
            # Show everything: ranked codes first, then any codes absent
            # from the latest year appended in first-seen order.
            existing_codes = list(dict.fromkeys(df["code"].tolist()))
            full_order = list(dict.fromkeys(ranked_codes + existing_codes))
            if full_order:
                code_order = full_order
    if df.empty:
        return df
    if code_order:
        # Sort by ranking position via an ordered Categorical, then by year.
        df = df.assign(
            _code_order=pd.Categorical(df["code"], categories=code_order, ordered=True)
        ).sort_values(["_code_order", "year"])
        df = df.drop(columns="_code_order")
    else:
        df = df.sort_values(["occupation", "year"])
    return df
def latest_year_top_data() -> pd.DataFrame:
    """Rows for the most recent year inside the selected range, after the
    search filter, sorted by the active metric and truncated to the top-N.

    Adds a ``latest_year`` column carrying the year that was used.
    """
    data = current_dataset()
    column = metric_column()
    if data.empty or column not in data.columns:
        return data.iloc[0:0]
    low, high = input.year_range()
    data = data[(data["year"] >= low) & (data["year"] <= high)]
    term = input.search().strip()
    if term:
        data = data[data["occupation"].str.contains(term, case=False, na=False)]
    data = data.dropna(subset=[column])
    if data.empty:
        return data
    newest = int(data["year"].max())
    latest = data[data["year"] == newest].copy()
    latest = latest.sort_values(column, ascending=not input.descending())
    limit = int(input.top_n())
    if limit > 0:
        latest = latest.head(limit)
    latest["latest_year"] = newest
    return latest
def comparison_radar_payload() -> Dict[str, object]:
    """Assemble the data behind the Compare-tab radar chart.

    Returns a dict with keys:
      - "data": long-form DataFrame (occupation, Metric, Value)
      - "metric_order": metric labels in display order
      - "year": the comparison year actually used (None when empty)
      - "weight_label": display label of the weighting(s) in effect
    The empty payload is returned whenever any required input is missing
    or the filters leave no data.
    """
    empty_payload: Dict[str, object] = {
        "data": pd.DataFrame(columns=["occupation", "Metric", "Value"]),
        "metric_order": [],
        "year": None,
        "weight_label": weight_choice_label("unweighted"),
    }
    df = current_dataset()
    if df.empty or "year" not in df.columns:
        return empty_payload
    cfg = TAXONOMY_CONFIG[input.taxonomy()]
    # Resolve the comparison year; the slider may be absent, or report a
    # (start, end) tuple, in which case the last element wins.
    comparison_year_input = getattr(input, "comparison_year", None)
    if comparison_year_input is None:
        year_value = int(df["year"].max())
    else:
        year_raw = comparison_year_input()
        if isinstance(year_raw, (list, tuple)):
            year_raw = year_raw[-1]
        if pd.isna(year_raw):
            return empty_payload
        year_value = int(year_raw)
    if year_value not in df["year"].values:
        return empty_payload
    df_year = df[df["year"] == year_value]
    if df_year.empty:
        return empty_payload
    comparison_occ_input = getattr(input, "comparison_occupations", None)
    if comparison_occ_input is None:
        return empty_payload
    selected_occupations = comparison_occ_input()
    if not selected_occupations:
        return empty_payload
    df_year = df_year[df_year["occupation"].isin(selected_occupations)]
    if df_year.empty:
        return empty_payload
    comparison_weight_input = getattr(input, "comparison_weighting", None)
    if comparison_weight_input is None:
        weight_choice = "unweighted"
    else:
        weight_choice = comparison_weight_input()
    # Map each metric label to the concrete column under this weighting.
    metric_map: List[Tuple[str, str]] = []
    for label, suffix in METRIC_OPTIONS:
        column_name = resolve_metric_column(df_year, suffix, cfg, weight_choice)
        if column_name in df_year.columns:
            metric_map.append((label, column_name))
    if not metric_map:
        return empty_payload
    metric_columns = [column_name for _, column_name in metric_map]
    # Individual metrics may fall back to unweighted columns, so surface
    # every weighting label actually in use (joined when mixed).
    weight_labels_in_use = {
        weight_label_from_column(column_name) for column_name in metric_columns
    }
    if len(weight_labels_in_use) == 1:
        weight_label_display = weight_labels_in_use.pop()
    else:
        weight_label_display = ", ".join(sorted(weight_labels_in_use))
    aggregated = df_year.groupby("occupation", as_index=False)[metric_columns].mean(
        numeric_only=True
    )
    if aggregated.empty:
        return empty_payload
    # Wide -> long for px.line_polar: one row per (occupation, metric).
    rename_map = {column_name: label for label, column_name in metric_map}
    melted = (
        aggregated.rename(columns=rename_map)
        .melt(id_vars="occupation", var_name="Metric", value_name="Value")
        .dropna(subset=["Value"])
    )
    if melted.empty:
        return empty_payload
    metric_order = [label for label, _ in metric_map]
    melted["Metric"] = pd.Categorical(
        melted["Metric"], categories=metric_order, ordered=True
    )
    melted = melted.sort_values(["occupation", "Metric"])
    return {
        "data": melted,
        "metric_order": metric_order,
        "year": year_value,
        "weight_label": weight_label_display,
    }
@reactive.effect
def _update_year_slider():
    """Clamp the Explore year-range and Compare year sliders to the span
    of the active dataset.

    FIX: this function was never registered as an effect (``reactive`` was
    imported but unused anywhere in the file), so the sliders kept their
    hard-coded 2010-2023 bounds regardless of the loaded dataset.
    ``@reactive.effect`` auto-tracks the reactive inputs read here
    (taxonomy via current_dataset, comparison_year) and re-runs on change.
    """
    df = current_dataset()
    if df.empty or "year" not in df.columns:
        return
    min_year, max_year = int(df["year"].min()), int(df["year"].max())
    ui.update_slider(
        "year_range", min=min_year, max=max_year, value=(min_year, max_year)
    )
    # Preserve the user's comparison year when possible; a range slider
    # reports a (start, end) tuple, in which case the upper end wins.
    comparison_slider = getattr(input, "comparison_year", None)
    if comparison_slider is not None:
        current_value = comparison_slider()
        if isinstance(current_value, (list, tuple)):
            current_value = current_value[-1]
        if pd.isna(current_value):
            current_value = max_year
    else:
        current_value = max_year
    # Clamp into [min_year, max_year] before pushing to the UI.
    current_value = int(max(min_year, min(max_year, current_value)))
    ui.update_slider("comparison_year", min=min_year, max=max_year, value=current_value)
@reactive.effect
def _update_comparison_controls():
    """Refresh the comparison occupation choices for the selected year.

    FIX: this function read inputs and called ``ui.update_selectize`` but
    was never registered as a reactive effect (``reactive`` was imported
    yet unused), so it never ran and the selectize stayed empty.
    ``@reactive.effect`` auto-tracks the inputs read here and re-runs on
    change, preserving any still-valid selection.
    """
    df = current_dataset()
    if df.empty or "year" not in df.columns:
        ui.update_selectize("comparison_occupations", choices=[], selected=[])
        return
    # Resolve the comparison year; the slider may be absent or report a
    # (start, end) tuple, in which case the last element wins.
    comparison_slider = getattr(input, "comparison_year", None)
    if comparison_slider is None:
        comparison_year = int(df["year"].max())
    else:
        year_raw = comparison_slider()
        if isinstance(year_raw, (list, tuple)):
            year_raw = year_raw[-1]
        if pd.isna(year_raw):
            comparison_year = int(df["year"].max())
        else:
            comparison_year = int(year_raw)
    df_year = df[df["year"] == comparison_year]
    occupations = sorted(
        df_year["occupation"].dropna().unique().tolist(), key=str.casefold
    )
    comparison_occ_input = getattr(input, "comparison_occupations", None)
    if comparison_occ_input is not None:
        current_selection = comparison_occ_input()
    else:
        current_selection = []
    if isinstance(current_selection, str):
        current_selection = [current_selection]
    # Keep only selections that still exist for the new year.
    valid_selection = [occ for occ in current_selection if occ in occupations]
    ui.update_selectize(
        "comparison_occupations",
        choices=occupations,
        selected=valid_selection,
    )
| # --------------------------------------------------------------------- | |
| # Outputs | |
| # --------------------------------------------------------------------- | |
| # @render.data_frame | |
| # def occupations_table(): | |
| # df = filtered_data() | |
| # column = metric_column() | |
| # columns = ["code", "occupation", "year", column] | |
| # available = [c for c in columns if c in df.columns] | |
| # return df[available] | |
def _table_dataframe() -> pd.DataFrame:
    """Project the filtered data down to the table/download columns:
    code, occupation, year and the active metric column."""
    data = filtered_data()
    metric_col = metric_column()
    if data.empty or metric_col not in data.columns:
        return data.iloc[0:0]
    wanted = ("code", "occupation", "year", metric_col)
    return data[[name for name in wanted if name in data.columns]]
| def _excel_column_name(index: int) -> str: | |
| """Convert 1-based column index to Excel-style letters.""" | |
| name = "" | |
| while index > 0: | |
| index, remainder = divmod(index - 1, 26) | |
| name = chr(65 + remainder) + name | |
| return name | |
def _dataframe_to_xlsx_bytes(
    df: pd.DataFrame, sheet_name: str = "Occupations"
) -> bytes:
    """Create a minimal XLSX file from a DataFrame without external dependencies.

    An .xlsx file is a ZIP of XML parts; this writes the minimum viable set
    (content types, relationships, workbook, styles, one worksheet). All
    text cells are stored inline (t="str") rather than via a shared-string
    table, which keeps the writer simple.
    """
    # Static package parts: content-type map, package/workbook relationship
    # files, a single-sheet workbook, and a bare default style sheet.
    content_types = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<Types xmlns="http://schemas.openxmlformats.org/package/2006/content-types">
<Default Extension="rels" ContentType="application/vnd.openxmlformats-package.relationships+xml"/>
<Default Extension="xml" ContentType="application/xml"/>
<Override PartName="/xl/worksheets/sheet1.xml" ContentType="application/vnd.openxmlformats-officedocument.spreadsheetml.worksheet+xml"/>
<Override PartName="/xl/workbook.xml" ContentType="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet.main+xml"/>
<Override PartName="/xl/styles.xml" ContentType="application/vnd.openxmlformats-officedocument.spreadsheetml.styles+xml"/>
</Types>
"""
    root_rels = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships">
<Relationship Id="rId1" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/officeDocument" Target="xl/workbook.xml"/>
</Relationships>
"""
    workbook = f"""<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<workbook xmlns="http://schemas.openxmlformats.org/spreadsheetml/2006/main"
xmlns:r="http://schemas.openxmlformats.org/officeDocument/2006/relationships">
<sheets>
<sheet name="{escape(sheet_name)}" sheetId="1" r:id="rId1"/>
</sheets>
</workbook>
"""
    workbook_rels = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships">
<Relationship Id="rId1" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/worksheet" Target="worksheets/sheet1.xml"/>
<Relationship Id="rId2" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/styles" Target="styles.xml"/>
</Relationships>
"""
    styles = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<styleSheet xmlns="http://schemas.openxmlformats.org/spreadsheetml/2006/main">
<fonts count="1"><font><sz val="11"/><color theme="1"/><name val="Calibri"/><family val="2"/></font></fonts>
<fills count="1"><fill><patternFill patternType="none"/></fill></fills>
<borders count="1"><border/></borders>
<cellStyleXfs count="1"><xf numFmtId="0" fontId="0" fillId="0" borderId="0"/></cellStyleXfs>
<cellXfs count="1"><xf numFmtId="0" fontId="0" fillId="0" borderId="0" xfId="0"/></cellXfs>
<cellStyles count="1"><cellStyle name="Normal" xfId="0" builtinId="0"/></cellStyles>
</styleSheet>
"""
    rows_xml: List[str] = []

    def cell_xml(col_idx: int, row_idx: int, value) -> str:
        # Blank cell for NaN/None; numeric <v> for real numbers (bool is
        # excluded because bool subclasses int and would render as 0/1);
        # everything else becomes an escaped inline string cell.
        ref = f"{_excel_column_name(col_idx)}{row_idx}"
        if pd.isna(value):
            return f'<c r="{ref}"/>'
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return f'<c r="{ref}"><v>{value}</v></c>'
        return f'<c r="{ref}" t="str"><v>{escape(str(value))}</v></c>'

    # Header row (row 1), then one row per DataFrame record.
    header_cells = [
        cell_xml(idx, 1, column) for idx, column in enumerate(df.columns, start=1)
    ]
    rows_xml.append(f'<row r="1">{"".join(header_cells)}</row>')
    for row_offset, (_, row) in enumerate(df.iterrows(), start=2):
        cells = [
            cell_xml(col_idx, row_offset, row[col_name])
            for col_idx, col_name in enumerate(df.columns, start=1)
        ]
        rows_xml.append(f'<row r="{row_offset}">{"".join(cells)}</row>')
    sheet_xml = (
        '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
        '<worksheet xmlns="http://schemas.openxmlformats.org/spreadsheetml/2006/main" '
        'xmlns:r="http://schemas.openxmlformats.org/officeDocument/2006/relationships">'
        "<sheetData>" + "".join(rows_xml) + "</sheetData></worksheet>"
    )
    # Assemble the ZIP package in memory and hand back the raw bytes.
    buf = BytesIO()
    with ZipFile(buf, "w", ZIP_DEFLATED) as zf:
        zf.writestr("[Content_Types].xml", content_types)
        zf.writestr("_rels/.rels", root_rels)
        zf.writestr("xl/workbook.xml", workbook)
        zf.writestr("xl/_rels/workbook.xml.rels", workbook_rels)
        zf.writestr("xl/styles.xml", styles)
        zf.writestr("xl/worksheets/sheet1.xml", sheet_xml)
    buf.seek(0)
    return buf.read()
# ---------------------------------------------------------------------
# UI (Shiny Express style)
# ---------------------------------------------------------------------
ui.page_opts(full_width=True, theme=theme.flatly)

with ui.navset_tab(id="view"):
    with ui.nav_panel("Explore"):
        with ui.layout_sidebar():
            with ui.sidebar(open="desktop"):
                taxonomy_choices = {
                    key: cfg["label"] for key, cfg in TAXONOMY_CONFIG.items()
                }
                metric_labels = [label for label, _ in METRIC_OPTIONS]
                # FIX: selected="ssyk2012" matched no key of taxonomy_choices
                # (the keys carry an emoji prefix), so the stated default was
                # silently ignored; use the real key.
                ui.input_select(
                    "taxonomy",
                    "Taxonomy",
                    taxonomy_choices,
                    selected="๐ธ๐ช ssyk2012",
                )
                ui.input_select(
                    "metric", "Sub-index", metric_labels, selected=metric_labels[0]
                )
                ui.input_select(
                    "weighting",
                    "Weighting",
                    WEIGHTING_CHOICE_MAP,
                    selected="unweighted",
                )
                ui.input_slider(
                    "year_range", "Year range", 2010, 2023, value=(2010, 2023)
                )
                ui.input_slider(
                    "top_n",
                    "Occupations to display (0 = all)",
                    min=0,
                    max=20,
                    value=10,
                    step=1,
                )
                ui.input_switch("descending", "Sort descending", True)
                ui.input_text("search", "Search occupation", placeholder="e.g. manager")

            with ui.card():
                ui.card_header("Exposure Trends")

                # FIX: render_plotly was imported but never applied; without
                # the decorator this figure was never registered or shown.
                @render_plotly
                def trend_plot():
                    """Line chart of the active metric over time, one trace
                    per occupation in the filtered data."""
                    df = filtered_data()
                    column = metric_column()
                    if df.empty:
                        return px.line(
                            title="No data available for the current filters."
                        )
                    weight_label = weight_label_from_column(column)
                    taxonomy_label = TAXONOMY_CONFIG[input.taxonomy()]["label"]
                    fig = px.line(
                        df,
                        x="year",
                        y=column,
                        color="occupation",
                        markers=True,
                        labels={
                            "year": "Year",
                            column: input.metric(),
                            "occupation": "Occupation",
                        },
                        title=f"{input.metric()} ({weight_label}, {taxonomy_label})",
                    )
                    fig.update_layout(legend_title="Occupation", hovermode="x unified")
                    return fig

            with ui.card():
                ui.card_header("Latest Year Occupations")

                @render_plotly
                def latest_year_bar():
                    """Horizontal bar chart of the top occupations in the
                    most recent year of the filtered range."""
                    df = latest_year_top_data()
                    column = metric_column()
                    if df.empty:
                        return px.bar(
                            title="No data available for the current filters."
                        )
                    latest_year = int(df["latest_year"].iloc[0])
                    weight_label = weight_label_from_column(column)
                    taxonomy_label = TAXONOMY_CONFIG[input.taxonomy()]["label"]
                    sorted_df = df.sort_values(column, ascending=not input.descending())
                    top_n = int(input.top_n())
                    display_count = len(sorted_df)
                    if top_n <= 0:
                        group_label = f"All {display_count} occupations"
                    else:
                        group_label = f"Showing {display_count} occupations"
                    fig = px.bar(
                        sorted_df,
                        x=column,
                        y="occupation",
                        orientation="h",
                        labels={
                            column: input.metric(),
                            "occupation": "Occupation",
                        },
                        title=(
                            f"{input.metric()} ({weight_label}, {taxonomy_label}) โ "
                            f"{group_label} in {latest_year}"
                        ),
                    )
                    fig.update_layout(hovermode="closest")
                    fig.update_yaxes(
                        categoryorder="array",
                        categoryarray=sorted_df["occupation"].tolist(),
                        autorange="reversed",
                    )
                    return fig

    with ui.nav_panel("Compare"):
        with ui.card():
            ui.card_header("Comparison Controls")
            ui.input_slider(
                "comparison_year",
                "Comparison year",
                min=2010,
                max=2023,
                value=2023,
                step=1,
                sep="",
            )
            ui.input_selectize(
                "comparison_occupations",
                "Occupations",
                choices=[],
                multiple=True,
                options={"placeholder": "Search occupations to compare"},
                width="100%",
            )
            ui.input_select(
                "comparison_weighting",
                "Weighting",
                WEIGHTING_CHOICE_MAP,
                selected="unweighted",
            )

        with ui.card():
            ui.card_header("Sub-index Comparison")

            @render_plotly
            def comparison_radar():
                """Radar (polar line) chart comparing all sub-indices across
                the selected occupations for one year."""
                payload = comparison_radar_payload()
                df = payload["data"]
                if df.empty:
                    return px.line_polar(
                        title="Select at least one occupation to compare."
                    )
                metric_order = payload["metric_order"]
                year = payload["year"]
                weight_label = payload["weight_label"]
                fig = px.line_polar(
                    df,
                    r="Value",
                    theta="Metric",
                    color="occupation",
                    category_orders={"Metric": metric_order},
                    line_close=True,
                )
                fig.update_traces(fill="toself")
                fig.update_layout(
                    title=(f"AI Sub-index radar comparison ({weight_label}, {year})"),
                    legend_title="Occupation",
                    polar=dict(radialaxis=dict(visible=True)),
                    margin=dict(l=40, r=40, t=60, b=40),
                )
                return fig

    with ui.nav_panel("Data"):
        with ui.card():
            ui.card_header("Download Options & Summary")
            ui.p("Select a format to download the filtered occupations data.")
            ui.div(
                ui.input_action_button(
                    "download_csv_btn",
                    "Download CSV",
                    class_="btn btn-primary flex-fill",
                ),
                ui.input_action_button(
                    "download_json_btn",
                    "Download JSON",
                    class_="btn btn-success flex-fill",
                ),
                ui.input_action_button(
                    "download_xlsx_btn",
                    "Download XLSX",
                    class_="btn btn-info flex-fill",
                ),
                class_="d-flex flex-column flex-md-row gap-2 mb-3",
            )
            ui.h5("Data Summary")

            # FIX: render is imported but was never applied; @render.text
            # registers this summary so it actually appears in the panel.
            @render.text
            def data_summary():
                """Plain-text summary of the currently filtered table."""
                df = _table_dataframe()
                if df.empty:
                    return "No data available for the current filters."
                # (unused local `column = metric_column()` removed; the
                # reactive dependency is already established via
                # _table_dataframe.)
                record_count = len(df)
                year_min = int(df["year"].min())
                year_max = int(df["year"].max())
                metric_label = input.metric()
                weighting = WEIGHTING_CHOICE_MAP.get(input.weighting(), "Unweighted")
                return (
                    f"Rows: {record_count}\n"
                    f"Years: {year_min}โ{year_max}\n"
                    f"Metric: {metric_label}\n"
                    f"Weighting: {weighting}"
                )

        with ui.card():
            ui.card_header("Filtered occupations")

            # FIX: restore the @render.data_frame decorator (still visible in
            # the commented-out earlier version above); without it the table
            # was never rendered.
            @render.data_frame
            def occupations_table():
                """Interactive data table of the filtered occupations."""
                df = _table_dataframe()
                return DataTable(
                    df,
                    filters=True,
                    summary=True,
                    height="500px",
                    width="100%",
                )
# FIX: nothing ever invoked this helper — the CSV button existed but had no
# listener. @render.ui + @reactive.event wires it to the button and injects
# the self-clicking <a> element into the page.
# NOTE(review): the original decorators were presumably lost in transit;
# confirm the exact registration against the original app.
@render.ui
@reactive.event(input.download_csv_btn)
def trigger_csv_download():
    """Emit a script tag that downloads the filtered table as CSV."""
    df = _table_dataframe()
    csv_data = df.to_csv(index=False)
    filename = f"occupations_{datetime.now():%Y-%m-%d}.csv"
    # json.dumps produces safely quoted/escaped JS string literals.
    data_json = json.dumps(csv_data)
    filename_json = json.dumps(filename)
    script = (
        "<script>(function(){const dataStr = "
        f"{data_json};"
        "const element=document.createElement('a');"
        "element.href='data:text/csv;charset=utf-8,'+encodeURIComponent(dataStr);"
        f"element.download={filename_json};"
        "document.body.appendChild(element);"
        "element.click();"
        "document.body.removeChild(element);"
        "})();</script>"
    )
    return ui.HTML(script)
# FIX: never invoked before — see trigger_csv_download; wired to its button.
# NOTE(review): confirm the exact registration against the original app.
@render.ui
@reactive.event(input.download_json_btn)
def trigger_json_download():
    """Emit a script tag that downloads the filtered table as JSON."""
    df = _table_dataframe()
    json_str = df.to_json(orient="records", indent=2)
    filename = f"occupations_{datetime.now():%Y-%m-%d}.json"
    # json.dumps produces safely quoted/escaped JS string literals.
    data_json = json.dumps(json_str)
    filename_json = json.dumps(filename)
    script = (
        "<script>(function(){const dataStr = "
        f"{data_json};"
        "const element=document.createElement('a');"
        "element.href='data:application/json;charset=utf-8,'+encodeURIComponent(dataStr);"
        f"element.download={filename_json};"
        "document.body.appendChild(element);"
        "element.click();"
        "document.body.removeChild(element);"
        "})();</script>"
    )
    return ui.HTML(script)
# FIX: never invoked before — see trigger_csv_download; wired to its button.
# NOTE(review): confirm the exact registration against the original app.
@render.ui
@reactive.event(input.download_xlsx_btn)
def trigger_xlsx_download():
    """Emit a script tag that downloads the filtered table as XLSX
    (base64-embedded in a data: URL)."""
    df = _table_dataframe()
    xlsx_bytes = _dataframe_to_xlsx_bytes(df)
    filename = f"occupations_{datetime.now():%Y-%m-%d}.xlsx"
    b64_data = base64.b64encode(xlsx_bytes).decode("utf-8")
    filename_json = json.dumps(filename)
    script = (
        "<script>(function(){const element=document.createElement('a');"
        "element.href='data:application/vnd.openxmlformats-officedocument."
        "spreadsheetml.sheet;base64,"
        f"{b64_data}';"
        f"element.download={filename_json};"
        "document.body.appendChild(element);"
        "element.click();"
        "document.body.removeChild(element);"
        "})();</script>"
    )
    return ui.HTML(script)