dummy_app / app.py
joseph-data's picture
dummy app start
1264273 unverified
#!/usr/bin/env python3
from __future__ import annotations
import base64
import json
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import Dict, Iterable, List, Tuple
import pandas as pd
from shiny import reactive
from shiny.express import input, render, ui
from shinywidgets import render_plotly
from shinyswatch import theme
from shiny.render import DataTable
from zipfile import ZipFile, ZIP_DEFLATED
from xml.sax.saxutils import escape
from shinyswatch import theme
import plotly.express as px
# ---------------------------------------------------------------------
# Data configuration
# ---------------------------------------------------------------------
# Folder containing this file; the bundled data lives in its "data" sub-folder.
ROOT = Path(__file__).resolve().parent
DATA_DIR = ROOT.joinpath("data")

# (display label, choice id) pairs, in the order they are offered.
WEIGHTING_OPTIONS: List[Tuple[str, str]] = [
    ("Unweighted", "unweighted"),
    ("Level 1 weights", "level1"),
    ("Level 2 weights", "level2"),
    ("Level 3 weights", "level3"),
]

# Choice id -> suffix appended to a metric column name for that weighting.
WEIGHT_SUFFIXES: Dict[str, str] = dict(
    unweighted="",
    level1="_w_code1",
    level2="_w_code2",
    level3="_w_code3",
)

# Choice id -> display label.
WEIGHT_LABELS: Dict[str, str] = {
    choice_id: display for display, choice_id in WEIGHTING_OPTIONS
}

# Same content as WEIGHT_LABELS, kept as an independent dict object.
WEIGHTING_CHOICE_MAP: Dict[str, str] = dict(WEIGHT_LABELS)
def weight_choice_label(choice: str) -> str:
    """Return the display label registered for a weighting choice id.

    Unknown ids fall back to the label stored for ``"unweighted"``, or to the
    literal ``"Unweighted"`` when that entry is missing too.
    """
    fallback = WEIGHT_LABELS.get("unweighted", "Unweighted")
    return WEIGHT_LABELS.get(choice, fallback)
# Registry of the occupation taxonomies the app can display.
#
# The keys are the option values of the "taxonomy" select input, so whatever
# `input.taxonomy()` returns can be used to index this dict directly.
# Per entry:
#   label            - human-readable name offered in the select input
#   data_path        - CSV file read by `load_dataset`
#   code_column      - CSV column holding the raw occupation code
#   supports_weights - not read by any code in this file
#                      (NOTE(review): confirm whether it is still needed)
#   weight_suffixes  - weighting choice id -> metric column suffix
TAXONOMY_CONFIG: Dict[str, Dict[str, object]] = {
    "🇸🇪 ssyk2012": {
        "label": "SSYK 2012",
        "data_path": DATA_DIR / "daioe_processed" / "daioe_ssyk2012_weighted.csv",
        "code_column": "ssyk2012_4",
        "supports_weights": True,
        "weight_suffixes": WEIGHT_SUFFIXES,
    },
    "🇸🇪 ssyk96": {
        "label": "SSYK 1996",
        "data_path": DATA_DIR / "daioe_processed" / "daioe_ssyk96_weighted.csv",
        "code_column": "ssyk96_4",
        "supports_weights": True,
        "weight_suffixes": WEIGHT_SUFFIXES,
    },
}
# (display label, metric suffix) pairs for the DAIOE sub-indices.
#
# The display label is also the value of the "metric" select input, so
# `metric_suffix` has to be given one of these exact strings. The suffix is
# the part of the CSV column name that follows "daioe_".
METRIC_OPTIONS: List[Tuple[str, str]] = [
    ("📚 All Applications", "allapps"),
    ("♟️ Abstract strategy games", "stratgames"),
    ("🎮 Real-time video games", "videogames"),
    ("🖼️🔎 Image recognition", "imgrec"),
    ("🧩🖼️ Image comprehension", "imgcompr"),
    ("🖌️🖼️ Image generation", "imggen"),
    ("📖 Reading comprehension", "readcompr"),
    ("✍️🤖 Language modelling", "lngmod"),
    ("🌐🔤 Translation", "translat"),
    ("🗣️🎙️ Speech recognition", "speechrec"),
    ("🧠✨ Generative AI", "genai"),
]
# ---------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------
def clean_code(series: pd.Series) -> pd.Series:
    """Normalise occupation codes to zero-padded, four-character strings.

    Each value is converted to text and its first run of one to four digits
    is kept. Values without any digit end up as ``"0000"`` because the empty
    string is zero-filled as well.
    """
    as_text = series.astype(str)
    first_digits = as_text.str.extract(r"(\d{1,4})")[0]
    return first_digits.fillna("").str.zfill(4)
def ensure_numeric(df: pd.DataFrame, columns: Iterable[str]) -> None:
    """Convert the named columns of ``df`` to numeric dtype, in place.

    Names that are not columns of ``df`` are skipped. Values that cannot be
    parsed become NaN (``errors="coerce"``).
    """
    present = [name for name in columns if name in df.columns]
    for name in present:
        df[name] = pd.to_numeric(df[name], errors="coerce")
def load_dataset(tax_id: str) -> pd.DataFrame:
    """Read and normalise the CSV registered for a taxonomy.

    Steps, in order:
      * "year" (when present) is coerced to int; rows without a usable year
        are dropped.
      * "code" is derived from the configured code column via `clean_code`.
      * "occupation" is rebuilt from the text following the code when the
        column is missing or entirely empty; remaining gaps are filled with
        the cleaned code.
      * every "daioe_<metric>" column, unweighted or weighted, that exists in
        the file is coerced to numeric.

    Raises:
        KeyError: ``tax_id`` is not a key of ``TAXONOMY_CONFIG``.
        FileNotFoundError: the configured CSV file does not exist.
    """
    settings = TAXONOMY_CONFIG[tax_id]
    csv_path = settings["data_path"]
    if not csv_path.exists():
        raise FileNotFoundError(f"Dataset not found for {tax_id}: {csv_path}")
    frame = pd.read_csv(csv_path)

    if "year" in frame.columns:
        frame["year"] = pd.to_numeric(frame["year"], errors="coerce")
        frame = frame.dropna(subset=["year"])
        frame["year"] = frame["year"].astype(int)

    code_text = frame[settings["code_column"]].astype(str)
    frame["code"] = clean_code(code_text)

    occupation_unusable = (
        "occupation" not in frame.columns or frame["occupation"].isna().all()
    )
    if occupation_unusable:
        # Whatever follows the leading digits of the raw code is used as the name.
        trailing_text = code_text.str.replace(r"^\d{1,4}\s*", "", regex=True)
        frame["occupation"] = trailing_text.str.strip().replace("", pd.NA)
    frame["occupation"] = frame["occupation"].fillna(frame["code"])

    # "" stands for the unweighted column; the rest are the non-empty suffixes.
    variants: List[str] = [""]
    variants.extend(
        {
            suffix
            for suffix in settings.get("weight_suffixes", {}).values()
            if isinstance(suffix, str) and suffix
        }
    )
    candidates = [
        f"daioe_{metric}{variant}"
        for variant in variants
        for _, metric in METRIC_OPTIONS
    ]
    ensure_numeric(frame, [name for name in candidates if name in frame.columns])
    return frame
def metric_suffix(label: str) -> str:
    """Return the metric suffix paired with a display label in ``METRIC_OPTIONS``.

    Raises:
        KeyError: no entry of ``METRIC_OPTIONS`` has that display label.
    """
    matches = [suffix for display, suffix in METRIC_OPTIONS if display == label]
    if not matches:
        raise KeyError(f"Unknown metric label '{label}'")
    # The first match wins, as in a linear scan.
    return matches[0]
def weight_label_from_column(column: str) -> str:
    """Infer the weighting display label from a metric column name.

    The first non-empty suffix in ``WEIGHT_SUFFIXES`` that ``column`` ends
    with decides the label; if none matches, the unweighted label is returned.
    """
    unweighted_label = WEIGHT_LABELS.get("unweighted", "Unweighted")
    matching_choice = next(
        (
            choice
            for choice, suffix in WEIGHT_SUFFIXES.items()
            if suffix and column.endswith(suffix)
        ),
        None,
    )
    if matching_choice is None:
        return unweighted_label
    return WEIGHT_LABELS.get(matching_choice, unweighted_label)
def resolve_metric_column(
    df: pd.DataFrame,
    metric_suffix_value: str,
    cfg: Dict[str, object],
    weight_choice: str,
) -> str:
    """Pick the column that holds a metric under the requested weighting.

    The weighted name is ``daioe_<metric><suffix>``, with the suffix looked up
    in ``cfg["weight_suffixes"]``. The unweighted name ``daioe_<metric>`` is
    returned when the choice has no non-empty string suffix or when the
    weighted column is not present in ``df``. The unweighted name itself is
    not checked against ``df``.
    """
    unweighted_name = f"daioe_{metric_suffix_value}"
    suffix = cfg.get("weight_suffixes", {}).get(weight_choice, "")
    if not (isinstance(suffix, str) and suffix):
        return unweighted_name
    weighted_name = f"{unweighted_name}{suffix}"
    return weighted_name if weighted_name in df.columns else unweighted_name
# ---------------------------------------------------------------------
# Reactive calculations
# ---------------------------------------------------------------------
@reactive.calc
def current_dataset() -> pd.DataFrame:
    """Reactive calc: the dataset for the taxonomy selected in the UI."""
    selected_taxonomy = input.taxonomy()
    return load_dataset(selected_taxonomy)
@reactive.calc
def metric_column() -> str:
    """Reactive calc: column name for the selected sub-index and weighting.

    Combines the "metric", "taxonomy" and "weighting" inputs with the current
    dataset; see `resolve_metric_column` for the fallback to the unweighted
    column.
    """
    suffix = metric_suffix(input.metric())
    taxonomy_cfg = TAXONOMY_CONFIG[input.taxonomy()]
    return resolve_metric_column(
        current_dataset(), suffix, taxonomy_cfg, input.weighting()
    )
def _search_filtered_rows() -> Tuple[pd.DataFrame, str]:
    """Apply the filters shared by the Explore views.

    Reads the current dataset, the active metric column and the "year_range"
    and "search" inputs. Returns the rows inside the year range whose
    occupation contains the search text and that have a value in the metric
    column, together with the metric column name. The frame is empty when the
    dataset is empty or lacks the metric column.

    Must be called from inside a reactive context; the inputs it reads become
    dependencies of the caller.
    """
    df = current_dataset()
    column = metric_column()
    if df.empty or column not in df.columns:
        return df.iloc[0:0], column

    start_year, end_year = input.year_range()
    df = df[(df["year"] >= start_year) & (df["year"] <= end_year)]

    query = input.search().strip()
    if query:
        # The search box holds free text, so match it literally. Interpreting
        # it as a regular expression makes input such as "(" or "c++" raise.
        matches = df["occupation"].str.contains(
            query, case=False, na=False, regex=False
        )
        df = df[matches]

    return df.dropna(subset=[column]), column


@reactive.calc
def filtered_data() -> pd.DataFrame:
    """Reactive calc: rows for the trend plot and the data table.

    Occupations are ranked by their metric value in the latest remaining
    year, in the direction given by the "descending" switch. With
    ``top_n > 0`` only the first ``top_n`` ranked codes are kept; with
    ``top_n == 0`` all rows are kept. Rows are ordered by ranking position,
    then year; codes without a row in the latest year come after the ranked
    ones, in order of first appearance.
    """
    df, column = _search_filtered_rows()
    if df.empty:
        return df

    top_n = int(input.top_n())
    latest_year = df["year"].max()
    ranking = df[df["year"] == latest_year].sort_values(
        column, ascending=not input.descending()
    )
    # dict.fromkeys de-duplicates while keeping first-seen (ranking) order.
    ranked_codes = list(dict.fromkeys(ranking["code"].tolist()))

    if top_n > 0:
        code_order = ranked_codes[:top_n]
        df = df[df["code"].isin(code_order)]
    else:
        code_order = list(dict.fromkeys(ranked_codes + df["code"].tolist()))

    # df is non-empty here, so the latest year contributes at least one code
    # and code_order is never empty.
    ordered = df.assign(
        _code_order=pd.Categorical(df["code"], categories=code_order, ordered=True)
    ).sort_values(["_code_order", "year"])
    return ordered.drop(columns="_code_order")


@reactive.calc
def latest_year_top_data() -> pd.DataFrame:
    """Reactive calc: rows of the latest remaining year for the bar chart.

    Rows are sorted by the metric column in the direction given by the
    "descending" switch and cut to the first ``top_n`` rows when
    ``top_n > 0``. A "latest_year" column holding that year is added.
    """
    df, column = _search_filtered_rows()
    if df.empty:
        return df

    latest_year = int(df["year"].max())
    df_latest = df[df["year"] == latest_year].sort_values(
        column, ascending=not input.descending()
    )
    top_n = int(input.top_n())
    if top_n > 0:
        df_latest = df_latest.head(top_n)
    # assign() returns a new frame, so nothing is written into a slice.
    return df_latest.assign(latest_year=latest_year)
@reactive.calc
def comparison_radar_payload() -> Dict[str, object]:
    """Reactive calc: long-format data and metadata for the radar chart.

    Returns a dict with the keys:
      * "data": frame with columns "occupation", "Metric" and "Value", one
        row per selected occupation and available sub-index (mean over the
        occupation's rows in the comparison year);
      * "metric_order": display labels in ``METRIC_OPTIONS`` order;
      * "year": the comparison year, or None;
      * "weight_label": label(s) of the weighting actually used.

    A payload with an empty "data" frame is returned whenever there is
    nothing to draw: no dataset, unknown year, no occupation selected, or no
    values.
    """
    nothing: Dict[str, object] = dict(
        data=pd.DataFrame(columns=["occupation", "Metric", "Value"]),
        metric_order=[],
        year=None,
        weight_label=weight_choice_label("unweighted"),
    )

    dataset = current_dataset()
    if dataset.empty or "year" not in dataset.columns:
        return nothing
    taxonomy_cfg = TAXONOMY_CONFIG[input.taxonomy()]

    # --- comparison year -------------------------------------------------
    year_input = getattr(input, "comparison_year", None)
    if year_input is None:
        year_value = int(dataset["year"].max())
    else:
        raw_year = year_input()
        if isinstance(raw_year, (list, tuple)):
            # For a sequence value only the last element is used.
            raw_year = raw_year[-1]
        if pd.isna(raw_year):
            return nothing
        year_value = int(raw_year)
    if year_value not in dataset["year"].values:
        return nothing
    rows = dataset[dataset["year"] == year_value]
    if rows.empty:
        return nothing

    # --- selected occupations --------------------------------------------
    occupations_input = getattr(input, "comparison_occupations", None)
    if occupations_input is None:
        return nothing
    chosen = occupations_input()
    if not chosen:
        return nothing
    rows = rows[rows["occupation"].isin(chosen)]
    if rows.empty:
        return nothing

    # --- weighting and metric columns ------------------------------------
    weighting_input = getattr(input, "comparison_weighting", None)
    weight_choice = "unweighted" if weighting_input is None else weighting_input()

    resolved = [
        (label, resolve_metric_column(rows, suffix, taxonomy_cfg, weight_choice))
        for label, suffix in METRIC_OPTIONS
    ]
    metric_map: List[Tuple[str, str]] = [
        (label, name) for label, name in resolved if name in rows.columns
    ]
    if not metric_map:
        return nothing
    metric_columns = [name for _, name in metric_map]

    # Joining the sorted set yields the bare label when only one is in use.
    labels_in_use = {weight_label_from_column(name) for name in metric_columns}
    weight_label_display = ", ".join(sorted(labels_in_use))

    # --- aggregate and reshape -------------------------------------------
    per_occupation = rows.groupby("occupation", as_index=False)[metric_columns]
    aggregated = per_occupation.mean(numeric_only=True)
    if aggregated.empty:
        return nothing

    column_to_label = {name: label for label, name in metric_map}
    relabelled = aggregated.rename(columns=column_to_label)
    melted = relabelled.melt(
        id_vars="occupation", var_name="Metric", value_name="Value"
    ).dropna(subset=["Value"])
    if melted.empty:
        return nothing

    metric_order = [label for label, _ in metric_map]
    melted["Metric"] = pd.Categorical(
        melted["Metric"], categories=metric_order, ordered=True
    )
    return {
        "data": melted.sort_values(["occupation", "Metric"]),
        "metric_order": metric_order,
        "year": year_value,
        "weight_label": weight_label_display,
    }
@reactive.effect
def _update_year_slider():
    """Reactive effect: fit both year sliders to the years in the dataset.

    "year_range" is reset to the full span of the dataset. "comparison_year"
    gets the same bounds and keeps its current value, clamped into them (the
    latest year is used when the current value is missing).

    The effect is meant to run when the dataset changes. The comparison
    slider is therefore read inside ``reactive.isolate()``: a plain read
    would make every move of that slider re-run the effect and reset
    "year_range" to the full span.
    """
    df = current_dataset()
    if df.empty or "year" not in df.columns:
        return

    min_year, max_year = int(df["year"].min()), int(df["year"].max())
    ui.update_slider(
        "year_range", min=min_year, max=max_year, value=(min_year, max_year)
    )

    comparison_slider = getattr(input, "comparison_year", None)
    if comparison_slider is None:
        current_value = max_year
    else:
        with reactive.isolate():
            current_value = comparison_slider()
        if isinstance(current_value, (list, tuple)):
            # For a sequence value only the last element is used.
            current_value = current_value[-1]
        if pd.isna(current_value):
            current_value = max_year

    current_value = int(max(min_year, min(max_year, current_value)))
    ui.update_slider(
        "comparison_year", min=min_year, max=max_year, value=current_value
    )
@reactive.effect
def _update_comparison_controls():
    """Reactive effect: refresh the occupation choices on the Compare tab.

    The choices are the occupations present in the comparison year of the
    current dataset, sorted case-insensitively. Occupations that were already
    selected and are still available stay selected.

    The current selection is read inside ``reactive.isolate()``. It is only
    needed to carry the selection over; reading it reactively would re-send
    the full choice list to the browser every time the user picks or removes
    an occupation.
    """
    df = current_dataset()
    if df.empty or "year" not in df.columns:
        ui.update_selectize("comparison_occupations", choices=[], selected=[])
        return

    latest_year = int(df["year"].max())
    comparison_slider = getattr(input, "comparison_year", None)
    if comparison_slider is None:
        comparison_year = latest_year
    else:
        year_raw = comparison_slider()
        if isinstance(year_raw, (list, tuple)):
            # For a sequence value only the last element is used.
            year_raw = year_raw[-1]
        comparison_year = latest_year if pd.isna(year_raw) else int(year_raw)

    df_year = df[df["year"] == comparison_year]
    occupations = sorted(
        df_year["occupation"].dropna().unique().tolist(), key=str.casefold
    )

    comparison_occ_input = getattr(input, "comparison_occupations", None)
    if comparison_occ_input is None:
        current_selection = []
    else:
        with reactive.isolate():
            current_selection = comparison_occ_input() or []
    if isinstance(current_selection, str):
        current_selection = [current_selection]

    available = set(occupations)
    valid_selection = [occ for occ in current_selection if occ in available]
    ui.update_selectize(
        "comparison_occupations",
        choices=occupations,
        selected=valid_selection,
    )
# ---------------------------------------------------------------------
# Outputs
# ---------------------------------------------------------------------
# @render.data_frame
# def occupations_table():
# df = filtered_data()
# column = metric_column()
# columns = ["code", "occupation", "year", column]
# available = [c for c in columns if c in df.columns]
# return df[available]
def _table_dataframe() -> pd.DataFrame:
    """Return the filtered rows reduced to the columns shown in the data table.

    The columns are "code", "occupation", "year" and the active metric
    column, limited to those that exist. An empty frame is returned when
    there are no rows or the metric column is absent.
    """
    frame = filtered_data()
    value_column = metric_column()
    if frame.empty or value_column not in frame.columns:
        return frame.iloc[0:0]
    wanted = ("code", "occupation", "year", value_column)
    return frame[[name for name in wanted if name in frame.columns]]
def _excel_column_name(index: int) -> str:
    """Convert a 1-based column index to Excel-style letters (1 -> "A", 27 -> "AA").

    An index of 0 or below yields an empty string.
    """
    name = ""
    while index > 0:
        # Bijective base 26: shift to 0-based before taking the remainder.
        index, remainder = divmod(index - 1, 26)
        name = chr(65 + remainder) + name
    return name


def _dataframe_to_xlsx_bytes(
    df: pd.DataFrame, sheet_name: str = "Occupations"
) -> bytes:
    """Create a minimal XLSX file from a DataFrame without external dependencies.

    The workbook has a single sheet: row 1 holds the column names and each
    following row one DataFrame row (the index is not written). Missing
    values become empty cells, ints and floats numeric cells, everything else
    text cells.

    Args:
        df: data to export.
        sheet_name: name of the worksheet; XML-escaped before use.

    Returns:
        The bytes of the zipped workbook.
    """
    xml_decl = '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>\n'
    ooxml = "application/vnd.openxmlformats-officedocument.spreadsheetml"
    rel_ns = "http://schemas.openxmlformats.org/officeDocument/2006/relationships"
    pkg_rel_ns = "http://schemas.openxmlformats.org/package/2006/relationships"
    main_ns = "http://schemas.openxmlformats.org/spreadsheetml/2006/main"

    content_types = (
        xml_decl
        + '<Types xmlns="http://schemas.openxmlformats.org/package/2006/content-types">\n'
        '<Default Extension="rels" '
        'ContentType="application/vnd.openxmlformats-package.relationships+xml"/>\n'
        '<Default Extension="xml" ContentType="application/xml"/>\n'
        f'<Override PartName="/xl/worksheets/sheet1.xml" ContentType="{ooxml}.worksheet+xml"/>\n'
        f'<Override PartName="/xl/workbook.xml" ContentType="{ooxml}.sheet.main+xml"/>\n'
        f'<Override PartName="/xl/styles.xml" ContentType="{ooxml}.styles+xml"/>\n'
        "</Types>\n"
    )
    root_rels = (
        xml_decl
        + f'<Relationships xmlns="{pkg_rel_ns}">\n'
        f'<Relationship Id="rId1" Type="{rel_ns}/officeDocument" Target="xl/workbook.xml"/>\n'
        "</Relationships>\n"
    )
    # escape() leaves double quotes alone by default; the sheet name sits in
    # a double-quoted attribute, so they have to be replaced as well.
    safe_sheet_name = escape(sheet_name, {'"': "&quot;"})
    workbook = (
        xml_decl
        + f'<workbook xmlns="{main_ns}"\n'
        f'xmlns:r="{rel_ns}">\n'
        "<sheets>\n"
        f'<sheet name="{safe_sheet_name}" sheetId="1" r:id="rId1"/>\n'
        "</sheets>\n"
        "</workbook>\n"
    )
    workbook_rels = (
        xml_decl
        + f'<Relationships xmlns="{pkg_rel_ns}">\n'
        f'<Relationship Id="rId1" Type="{rel_ns}/worksheet" Target="worksheets/sheet1.xml"/>\n'
        f'<Relationship Id="rId2" Type="{rel_ns}/styles" Target="styles.xml"/>\n'
        "</Relationships>\n"
    )
    styles = (
        xml_decl
        + f'<styleSheet xmlns="{main_ns}">\n'
        '<fonts count="1"><font><sz val="11"/><color theme="1"/>'
        '<name val="Calibri"/><family val="2"/></font></fonts>\n'
        '<fills count="1"><fill><patternFill patternType="none"/></fill></fills>\n'
        '<borders count="1"><border/></borders>\n'
        '<cellStyleXfs count="1"><xf numFmtId="0" fontId="0" fillId="0" borderId="0"/>'
        "</cellStyleXfs>\n"
        '<cellXfs count="1"><xf numFmtId="0" fontId="0" fillId="0" borderId="0" xfId="0"/>'
        "</cellXfs>\n"
        '<cellStyles count="1"><cellStyle name="Normal" xfId="0" builtinId="0"/></cellStyles>\n'
        "</styleSheet>\n"
    )

    def cell_xml(ref: str, value) -> str:
        """Render one cell; ``ref`` is its A1-style reference."""
        if pd.isna(value):
            return f'<c r="{ref}"/>'
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return f'<c r="{ref}"><v>{value}</v></c>'
        return f'<c r="{ref}" t="str"><v>{escape(str(value))}</v></c>'

    # Column letters depend only on the position, so compute them once.
    column_letters = [
        _excel_column_name(position) for position in range(1, len(df.columns) + 1)
    ]

    def row_xml(row_number: int, values) -> str:
        """Render one worksheet row from its cell values."""
        cells = "".join(
            cell_xml(f"{letters}{row_number}", value)
            for letters, value in zip(column_letters, values)
        )
        return f'<row r="{row_number}">{cells}</row>'

    rows_xml: List[str] = [row_xml(1, df.columns)]
    # itertuples keeps each column's own type. iterrows builds one Series per
    # row, which turns the ints of an all-numeric frame into floats ("2020.0").
    rows_xml.extend(
        row_xml(row_number, values)
        for row_number, values in enumerate(
            df.itertuples(index=False, name=None), start=2
        )
    )

    sheet_xml = (
        '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
        f'<worksheet xmlns="{main_ns}" '
        f'xmlns:r="{rel_ns}">'
        "<sheetData>" + "".join(rows_xml) + "</sheetData></worksheet>"
    )

    buf = BytesIO()
    with ZipFile(buf, "w", ZIP_DEFLATED) as zf:
        zf.writestr("[Content_Types].xml", content_types)
        zf.writestr("_rels/.rels", root_rels)
        zf.writestr("xl/workbook.xml", workbook)
        zf.writestr("xl/_rels/workbook.xml.rels", workbook_rels)
        zf.writestr("xl/styles.xml", styles)
        zf.writestr("xl/worksheets/sheet1.xml", sheet_xml)
    return buf.getvalue()
# ---------------------------------------------------------------------
# UI (Shiny Express style)
# ---------------------------------------------------------------------
ui.page_opts(full_width=True, theme=theme.flatly)


def _js_string(text: str) -> str:
    """Encode ``text`` as a JavaScript string literal that is safe inside <script>.

    ``json.dumps`` does not escape "</", so a value containing "</script>"
    would end the injected script element early. "<\\/" is the same string to
    JavaScript but is not seen as a closing tag by the HTML parser.
    """
    return json.dumps(text).replace("</", "<\\/")


def _text_data_uri_js(media_type: str, text: str) -> str:
    """Return a JavaScript expression building a UTF-8 data URI from ``text``."""
    return f"'data:{media_type};charset=utf-8,'+encodeURIComponent({_js_string(text)})"


def _download_script(href_js: str, filename: str) -> ui.HTML:
    """Build the script that makes the browser save a data URI as a file.

    Args:
        href_js: JavaScript expression evaluating to the data URI.
        filename: name suggested to the browser for the saved file.

    The script creates a temporary <a download> element, clicks it and
    removes it again.
    """
    return ui.HTML(
        "<script>(function(){const element=document.createElement('a');"
        f"element.href={href_js};"
        f"element.download={_js_string(filename)};"
        "document.body.appendChild(element);"
        "element.click();"
        "document.body.removeChild(element);"
        "})();</script>"
    )


with ui.navset_tab(id="view"):
    # -----------------------------------------------------------------
    # Explore: filters in the sidebar, trend and latest-year charts
    # -----------------------------------------------------------------
    with ui.nav_panel("Explore"):
        with ui.layout_sidebar():
            with ui.sidebar(open="desktop"):
                taxonomy_choices = {
                    key: cfg["label"] for key, cfg in TAXONOMY_CONFIG.items()
                }
                metric_labels = [label for label, _ in METRIC_OPTIONS]
                ui.input_select(
                    "taxonomy",
                    "Taxonomy",
                    taxonomy_choices,
                    # Preselect the first configured taxonomy. The select
                    # values are the TAXONOMY_CONFIG keys, so a hard-coded id
                    # can silently stop matching any option.
                    selected=next(iter(taxonomy_choices)),
                )
                ui.input_select(
                    "metric", "Sub-index", metric_labels, selected=metric_labels[0]
                )
                ui.input_select(
                    "weighting",
                    "Weighting",
                    WEIGHTING_CHOICE_MAP,
                    selected="unweighted",
                )
                # The bounds are placeholders; _update_year_slider replaces
                # them with the years found in the dataset.
                ui.input_slider(
                    "year_range",
                    "Year range",
                    2010,
                    2023,
                    value=(2010, 2023),
                    sep="",  # years, not quantities: no thousands separator
                )
                ui.input_slider(
                    "top_n",
                    "Occupations to display (0 = all)",
                    min=0,
                    max=20,
                    value=10,
                    step=1,
                )
                ui.input_switch("descending", "Sort descending", True)
                ui.input_text(
                    "search", "Search occupation", placeholder="e.g. manager"
                )

            with ui.card():
                ui.card_header("Exposure Trends")

                @render_plotly
                def trend_plot():
                    # One line per occupation over the filtered years.
                    df = filtered_data()
                    column = metric_column()
                    if df.empty:
                        return px.line(
                            title="No data available for the current filters."
                        )
                    weight_label = weight_label_from_column(column)
                    taxonomy_label = TAXONOMY_CONFIG[input.taxonomy()]["label"]
                    fig = px.line(
                        df,
                        x="year",
                        y=column,
                        color="occupation",
                        markers=True,
                        labels={
                            "year": "Year",
                            column: input.metric(),
                            "occupation": "Occupation",
                        },
                        title=f"{input.metric()} ({weight_label}, {taxonomy_label})",
                    )
                    fig.update_layout(
                        legend_title="Occupation", hovermode="x unified"
                    )
                    return fig

            with ui.card():
                ui.card_header("Latest Year Occupations")

                @render_plotly
                def latest_year_bar():
                    # Horizontal bars for the latest year in the filtered data.
                    df = latest_year_top_data()
                    column = metric_column()
                    if df.empty:
                        return px.bar(
                            title="No data available for the current filters."
                        )
                    latest_year = int(df["latest_year"].iloc[0])
                    weight_label = weight_label_from_column(column)
                    taxonomy_label = TAXONOMY_CONFIG[input.taxonomy()]["label"]
                    sorted_df = df.sort_values(
                        column, ascending=not input.descending()
                    )
                    display_count = len(sorted_df)
                    if int(input.top_n()) <= 0:
                        group_label = f"All {display_count} occupations"
                    else:
                        group_label = f"Showing {display_count} occupations"
                    fig = px.bar(
                        sorted_df,
                        x=column,
                        y="occupation",
                        orientation="h",
                        labels={
                            column: input.metric(),
                            "occupation": "Occupation",
                        },
                        title=(
                            f"{input.metric()} ({weight_label}, {taxonomy_label}) — "
                            f"{group_label} in {latest_year}"
                        ),
                    )
                    fig.update_layout(hovermode="closest")
                    # Keep the bars in ranking order, first entry at the top.
                    fig.update_yaxes(
                        categoryorder="array",
                        categoryarray=sorted_df["occupation"].tolist(),
                        autorange="reversed",
                    )
                    return fig

    # -----------------------------------------------------------------
    # Compare: radar chart of all sub-indices for selected occupations
    # -----------------------------------------------------------------
    with ui.nav_panel("Compare"):
        with ui.card():
            ui.card_header("Comparison Controls")
            ui.input_slider(
                "comparison_year",
                "Comparison year",
                min=2010,
                max=2023,
                value=2023,
                step=1,
                sep="",
            )
            # The choices are filled in by _update_comparison_controls.
            ui.input_selectize(
                "comparison_occupations",
                "Occupations",
                choices=[],
                multiple=True,
                options={"placeholder": "Search occupations to compare"},
                width="100%",
            )
            ui.input_select(
                "comparison_weighting",
                "Weighting",
                WEIGHTING_CHOICE_MAP,
                selected="unweighted",
            )

        with ui.card():
            ui.card_header("Sub-index Comparison")

            @render_plotly
            def comparison_radar():
                payload = comparison_radar_payload()
                df = payload["data"]
                if df.empty:
                    return px.line_polar(
                        title="Select at least one occupation to compare."
                    )
                metric_order = payload["metric_order"]
                year = payload["year"]
                weight_label = payload["weight_label"]
                fig = px.line_polar(
                    df,
                    r="Value",
                    theta="Metric",
                    color="occupation",
                    category_orders={"Metric": metric_order},
                    line_close=True,
                )
                fig.update_traces(fill="toself")
                fig.update_layout(
                    title=(f"AI Sub-index radar comparison ({weight_label}, {year})"),
                    legend_title="Occupation",
                    polar=dict(radialaxis=dict(visible=True)),
                    margin=dict(l=40, r=40, t=60, b=40),
                )
                return fig

    # -----------------------------------------------------------------
    # Data: summary, table and downloads of the filtered rows
    # -----------------------------------------------------------------
    with ui.nav_panel("Data"):
        with ui.card():
            ui.card_header("Download Options & Summary")
            ui.p("Select a format to download the filtered occupations data.")
            ui.div(
                ui.input_action_button(
                    "download_csv_btn",
                    "Download CSV",
                    class_="btn btn-primary flex-fill",
                ),
                ui.input_action_button(
                    "download_json_btn",
                    "Download JSON",
                    class_="btn btn-success flex-fill",
                ),
                ui.input_action_button(
                    "download_xlsx_btn",
                    "Download XLSX",
                    class_="btn btn-info flex-fill",
                ),
                class_="d-flex flex-column flex-md-row gap-2 mb-3",
            )
            ui.h5("Data Summary")

            @render.text
            def data_summary():
                df = _table_dataframe()
                if df.empty:
                    return "No data available for the current filters."
                record_count = len(df)
                year_min = int(df["year"].min())
                year_max = int(df["year"].max())
                metric_label = input.metric()
                # Report the weighting of the column actually shown, which
                # falls back to unweighted when the weighted column is absent.
                weighting = weight_label_from_column(metric_column())
                return (
                    f"Rows: {record_count}\n"
                    f"Years: {year_min}–{year_max}\n"
                    f"Metric: {metric_label}\n"
                    f"Weighting: {weighting}"
                )

        with ui.card():
            ui.card_header("Filtered occupations")

            @render.data_frame
            def occupations_table():
                df = _table_dataframe()
                return DataTable(
                    df,
                    filters=True,
                    summary=True,
                    height="500px",
                    width="100%",
                )

        # The outputs below render nothing visible. Each one emits a script
        # that starts a client-side download when its button is clicked.
        @render.ui
        @reactive.event(input.download_csv_btn)
        def trigger_csv_download():
            csv_text = _table_dataframe().to_csv(index=False)
            filename = f"occupations_{datetime.now():%Y-%m-%d}.csv"
            return _download_script(_text_data_uri_js("text/csv", csv_text), filename)

        @render.ui
        @reactive.event(input.download_json_btn)
        def trigger_json_download():
            json_text = _table_dataframe().to_json(orient="records", indent=2)
            filename = f"occupations_{datetime.now():%Y-%m-%d}.json"
            return _download_script(
                _text_data_uri_js("application/json", json_text), filename
            )

        @render.ui
        @reactive.event(input.download_xlsx_btn)
        def trigger_xlsx_download():
            xlsx_bytes = _dataframe_to_xlsx_bytes(_table_dataframe())
            filename = f"occupations_{datetime.now():%Y-%m-%d}.xlsx"
            # Base64 text contains no quote characters, so it can be placed
            # in a single-quoted JavaScript string as is.
            b64_data = base64.b64encode(xlsx_bytes).decode("utf-8")
            href_js = (
                "'data:application/vnd.openxmlformats-officedocument."
                f"spreadsheetml.sheet;base64,{b64_data}'"
            )
            return _download_script(href_js, filename)