# ----------------- Module Imports -----------------
# Import standard Python and third-party packages used for data processing and visualization
import zipfile
import pandas as pd
import altair as alt
import numpy as np
import streamlit as st
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap, TwoSlopeNorm
import pydeck as pdk
from typing import Optional
# ----------------- DataFrame Filter Helper -----------------
# This function adds an interactive filtering UI for a DataFrame in Streamlit.
# Users can toggle filtering via a checkbox and then choose columns to filter by.
# It supports:
# - categorical columns: multiselect
# - numerical columns: range slider
# - text columns: substring or regex matching
def filter_dataframe(df: pd.DataFrame, key_prefix: str = "") -> pd.DataFrame:
"""Adds a UI on top of a dataframe to let viewers filter columns."""
modify = st.checkbox("Add filters", value=False, key=f"{key_prefix}_filter_toggle")
if not modify:
return df
df = df.copy()
modification_container = st.container()
with modification_container:
to_filter_columns = st.multiselect("Filter dataframe on", df.columns, key=f"{key_prefix}_filter_cols")
for column in to_filter_columns:
if pd.api.types.is_categorical_dtype(df[column]) or df[column].nunique() < 10:
user_cat_input = st.multiselect(
f"Values for {column}",
df[column].unique(),
default=df[column].unique(),
key=f"{key_prefix}_{column}_cat"
)
df = df[df[column].isin(user_cat_input)]
elif pd.api.types.is_numeric_dtype(df[column]):
_min = float(df[column].min())
_max = float(df[column].max())
step = (_max - _min) / 100 if _max != _min else 1
user_num_input = st.slider(
f"Values for {column}",
min_value=_min,
max_value=_max,
value=(_min, _max),
step=step,
key=f"{key_prefix}_{column}_num"
)
df = df[df[column].between(*user_num_input)]
else:
user_text_input = st.text_input(
f"Substring or regex in {column}",
key=f"{key_prefix}_{column}_text"
)
if user_text_input:
df = df[df[column].astype(str).str.contains(user_text_input, na=False)]
return df
# ----------------- Streamlit Page Setup -----------------
# Set layout to wide and define the app title
st.set_page_config(layout="wide")
st.title("Interactive Dashboard")
# Styled HTML introduction box shown at the top of the app
st.markdown(
"""
Welcome to the Agricultural Trade Scenario Dashboard
This interactive dashboard allows you to explore and compare the impact of different policy scenarios
on agricultural trade metrics over time and across regions.
- Step 1: Upload one or two ZIP files
- Step 2: Navigate through the tabs to view line charts, regional bar charts, Top # Bar Charts, heatmaps and Correlation Matrix .
- Optional: Uploading a second ZIP enables comparison of two shock scenarios.
🔒 File size limit: 5MB per ZIP. Ensure files are properly named and formatted.
""",
unsafe_allow_html=True
)
# ----------------- Upload Section-----------------
# File uploader for Shock 1 and optional Shock 2 ZIP files
zip_file_1 = st.file_uploader("Upload Shock 1 ZIP file", type="zip", key="zip1")
zip_file_2 = st.file_uploader("Upload Shock 2 ZIP file (optional)", type="zip", key="zip2")
# Define 20MB file size limit and validate each uploaded file
MAX_FILE_SIZE = 20 * 1024 * 1024 # 20MB
if zip_file_1 is not None and zip_file_1.size > MAX_FILE_SIZE:
st.error("Shock 1 ZIP file exceeds 20MB. Please upload a smaller file.")
zip_file_1 = None
if zip_file_2 is not None and zip_file_2.size > MAX_FILE_SIZE:
st.error("Shock 2 ZIP file exceeds 20MB. Please upload a smaller file.")
zip_file_2 = None
# ----------------- Load METADATA.csv from ZIP -----------------
# Load metadata file (METADATA.csv) from the given ZIP file
@st.cache_data
def load_metadata_from_zip(zip_file):
if zip_file is None:
return None
try:
with zipfile.ZipFile(zip_file, "r") as z:
meta_file = [f for f in z.namelist() if "METADATA.csv" in f]
if not meta_file:
return None
meta_df = pd.read_csv(z.open(meta_file[0]))
return meta_df
except Exception as e:
return None
# Display metadata for Shock 1 ZIP file (if available)
metadata_1 = load_metadata_from_zip(zip_file_1)
if metadata_1 is not None:
st.markdown(
""
"
Metadata for Shock 1
",
unsafe_allow_html=True,
)
for _, row in metadata_1.iterrows():
st.markdown(
f"
{row['field']}: {row['value']}
",
unsafe_allow_html=True,
)
st.markdown("
", unsafe_allow_html=True)
elif zip_file_1 is not None:
st.info("No metadata found in Shock 1 ZIP.")
# Display metadata for Shock 2 ZIP file (if available)
if zip_file_2 is not None:
metadata_2 = load_metadata_from_zip(zip_file_2)
if metadata_2 is not None:
st.markdown(
""
"
Metadata for Shock 2
",
unsafe_allow_html=True,
)
for _, row in metadata_2.iterrows():
st.markdown(
f"
{row['field']}: {row['value']}
",
unsafe_allow_html=True,
)
st.markdown("
", unsafe_allow_html=True)
else:
st.info("No metadata found in Shock 2 ZIP.")
# ----------------- Load Baseline Table (data_targets_adjusted.csv) -----------------
# This function searches for files containing 'data_targets_adjusted' in the ZIP archive
# and loads them as a baseline DataFrame (used for model calibration or projection reference)
@st.cache_data
def load_baseline_table(zip_file):
if zip_file is None:
return None
try:
with zipfile.ZipFile(zip_file, "r") as z:
baseline_candidates = [f for f in z.namelist() if "data_targets_adjusted" in f]
if not baseline_candidates:
return None
df_base = pd.read_csv(z.open(baseline_candidates[0]))
return df_base
except Exception:
return None
# ----------------- Force Filtering UI for Baseline Table -----------------
# Unlike filter_dataframe, this function does not require a checkbox toggle.
# The filter UI is always visible to the user.
def force_filter_dataframe(df: pd.DataFrame, key_prefix: str = "") -> pd.DataFrame:
"""Always show filter UI for a dataframe (no toggle checkbox)."""
df = df.copy()
modification_container = st.container()
with modification_container:
to_filter_columns = st.multiselect("Filter dataframe on", df.columns, key=f"{key_prefix}_filter_cols")
for column in to_filter_columns:
if pd.api.types.is_categorical_dtype(df[column]) or df[column].nunique() < 10:
user_cat_input = st.multiselect(
f"Values for {column}",
df[column].unique(),
default=df[column].unique(),
key=f"{key_prefix}_{column}_cat"
)
df = df[df[column].isin(user_cat_input)]
elif pd.api.types.is_numeric_dtype(df[column]):
_min = float(df[column].min())
_max = float(df[column].max())
step = (_max - _min) / 100 if _max != _min else 1
user_num_input = st.slider(
f"Values for {column}",
min_value=_min,
max_value=_max,
value=(_min, _max),
step=step,
key=f"{key_prefix}_{column}_num"
)
df = df[df[column].between(*user_num_input)]
else:
user_text_input = st.text_input(
f"Substring or regex in {column}",
key=f"{key_prefix}_{column}_text"
)
if user_text_input:
df = df[df[column].astype(str).str.contains(user_text_input, na=False)]
return df
# ----------------- Load and Display Baseline Table -----------------
# Load baseline data for Shock 1 and optionally Shock 2.
# Users can switch between the two baseline tables via radio buttons.
baseline_df_1 = load_baseline_table(zip_file_1)
baseline_df_2 = load_baseline_table(zip_file_2) if zip_file_2 else None
if baseline_df_1 is not None or baseline_df_2 is not None:
with st.expander("Baseline Table", expanded=False):
selected_baseline = st.radio(
"Select Baseline Source",
options=["Shock 1", "Shock 2"],
index=0 if baseline_df_1 is not None else 1,
horizontal=True,
key="baseline_radio"
)
if selected_baseline == "Shock 1" and baseline_df_1 is not None:
st.markdown("You can filter the baseline data from Shock 1 below:")
st.dataframe(force_filter_dataframe(baseline_df_1, key_prefix="baseline1"), use_container_width=True)
elif selected_baseline == "Shock 2" and baseline_df_2 is not None:
st.markdown("You can filter the baseline data from Shock 2 below:")
st.dataframe(force_filter_dataframe(baseline_df_2, key_prefix="baseline2"), use_container_width=True)
else:
st.warning(f"No baseline data found for {selected_baseline}.")
elif zip_file_1 is not None:
st.info("No `data_targets_adjusted.csv` found in Shock 1 ZIP.")
# ----------------- Load data_shocks.csv -----------------
# This function loads policy shock values (ratios or multipliers applied to the baseline).
@st.cache_data
def load_shocks_from_zip(zip_file):
if zip_file is None:
return None
try:
with zipfile.ZipFile(zip_file, "r") as z:
shock_file = [f for f in z.namelist() if "data_shocks.csv" in f]
if not shock_file:
return None
df = pd.read_csv(z.open(shock_file[0]))
return df
except Exception as e:
return None
# ----------------- Display Policy Shock Variables -----------------
# Allow users to either view all shock variables or only those that differ from neutral (value != 1).
def show_shock_value_only(df: pd.DataFrame, label: str):
st.markdown(f"### Policy Shock Variables ({label})")
show_option = st.radio(
f"View Option for {label}",
["Only show changed (non-neutral) values", "Show all variables"],
index=0,
horizontal=True,
key=f"shock_radio_{label}"
)
if show_option == "Only show changed (non-neutral) values":
if "value" in df.columns:
filtered_df = df[df["value"] != 1]
if filtered_df.empty:
st.info(f"No changed values found in {label} (all value = 1).")
else:
st.dataframe(filter_dataframe(filtered_df, key_prefix=label), use_container_width=True)
else:
st.warning(f"No 'value' column found in {label}.")
else:
st.dataframe(filter_dataframe(df, key_prefix=label), use_container_width=True)
# Load shock variables from both ZIPs
shock_df_1 = load_shocks_from_zip(zip_file_1)
shock_df_2 = load_shocks_from_zip(zip_file_2)
# Display shock variable tables inside an expander panel
with st.expander("Policy Shock Variables", expanded=False):
if shock_df_1 is not None:
show_shock_value_only(shock_df_1, "Shock 1")
if zip_file_2 is not None and shock_df_2 is not None:
show_shock_value_only(shock_df_2, "Shock 2")
# ----------------- Load & Process ZIP Pair: Baseline + Scenario -----------------
# This function loads the baseline and scenario data from the ZIP files,
# calculates the ratio or inferred ratio (shock effect), and merges all into one DataFrame.
@st.cache_data
def load_and_process_zip_pair(zip1, zip2):
def load_zip(zip_obj):
# Open and read ZIP file contents
with zipfile.ZipFile(zip_obj, "r") as z:
file_list = z.namelist()
# Try to locate baseline and scenario files by naming pattern
baseline_file = next((f for f in file_list if "baseline_projections.csv" in f or "data_targets_adjusted.csv" in f), None)
scenario_file = next((f for f in file_list if "data_scenario_solution" in f or "data_solution.csv" in f), None)
if not baseline_file or not scenario_file:
raise ValueError("ZIP file must contain either 'baseline_projections.csv' or 'data_targets_adjusted.csv', and 'data_scenario_solution_*.csv' or 'data_solution.csv'.")
# Load both baseline and scenario as DataFrames
df_baseline = pd.read_csv(z.open(baseline_file))
df_scenario = pd.read_csv(z.open(scenario_file))
# Keep only relevant columns in baseline
df_baseline = df_baseline[["region", "commodity", "year", "attribute", "value"]]
# If scenario file already has a 'ratio' column, use it directly
if "ratio" in df_scenario.columns:
df_scenario = df_scenario[["region", "commodity", "year", "attribute", "ratio"]]
# Otherwise, calculate ratio using scenario value / baseline value
elif "value" in df_scenario.columns:
df_scenario = df_scenario[["region", "commodity", "year", "attribute", "value"]]
df_scenario.rename(columns={"value": "value_scenario"}, inplace=True)
df_scenario = pd.merge(
df_baseline,
df_scenario,
on=["region", "commodity", "year", "attribute"],
how="left"
)
df_scenario["ratio"] = df_scenario["value_scenario"] / df_scenario["value"]
df_scenario = df_scenario[["region", "commodity", "year", "attribute", "ratio"]]
else:
raise ValueError("Scenario file must contain either a 'ratio' or 'value' column.")
# Extract label for the scenario
if "data_scenario_solution_" in scenario_file:
shock_name = scenario_file.replace("data_scenario_solution_", "").replace(".csv", "")
elif "data_solution.csv" in scenario_file:
shock_name = "Scenario"
else:
shock_name = "Unknown"
return df_baseline, df_scenario, shock_name
# Load baseline and scenario data for Shock 1
df_base1, df_shock1, shock1_name = load_zip(zip1)
# Merge baseline with scenario 1 and compute new values with shock
df = pd.merge(df_base1, df_shock1, on=["region", "commodity", "year", "attribute"], how="left")
df["value_with_shock1"] = df["value"] * df["ratio"]
shock2_name = None
# If Shock 2 is uploaded, merge its ratios too
if zip2 is not None:
_, df_shock2, shock2_name = load_zip(zip2)
df = pd.merge(df, df_shock2[["region", "commodity", "year", "attribute", "ratio"]],
on=["region", "commodity", "year", "attribute"],
how="left", suffixes=('', '_shock2'))
df["value_with_shock2"] = df["value"] * df["ratio_shock2"]
# Clean-up: force attribute names to uppercase, and year to integer
df["attribute"] = df["attribute"].str.upper()
df["year"] = df["year"].astype(int)
return df, shock1_name, shock2_name
# ----------------- Run logic: only proceed if ZIP 1 is uploaded -----------------
if zip_file_1 is None:
st.info("Please upload at least the Shock 1 ZIP file to begin.")
else:
# If files are ready, load and merge the datasets
df, shock1_label, shock2_label = load_and_process_zip_pair(zip_file_1, zip_file_2)
#----------------- TOP # REGION CHANGE BAR CHART -----------------
# This function extracts the top N regions with the largest absolute percent change for a given
# commodity, attribute, and year. It supports both shock1 and shock2 scenario filtering.
@st.cache_data
def filter_topn_data(df, topn_commodity, topn_attribute, topn_year, shock="shock1", top_n=10):
# Select relevant columns depending on which shock we're filtering
relevant_cols = ["region", "commodity", "year", "attribute", "ratio", "value"]
if "ratio_shock2" in df.columns and shock == "shock2":
relevant_cols = ["region", "commodity", "year", "attribute", "ratio_shock2", "value"]
df_filtered = df[relevant_cols].copy()
# Rename for consistency if using shock2
if shock == "shock2" and "ratio_shock2" in df_filtered.columns:
df_filtered = df_filtered.rename(columns={"ratio_shock2": "ratio"})
# Filter by user selections
df_filtered = df_filtered[
(df_filtered["commodity"] == topn_commodity) &
(df_filtered["attribute"] == topn_attribute) &
(df_filtered["year"] == topn_year)
]
# Calculate percent change and sort by absolute magnitude
df_filtered["pct_change"] = (df_filtered["ratio"] - 1) * 100
df_filtered["abs_change"] = df_filtered["pct_change"].abs()
df_filtered = df_filtered.dropna(subset=["pct_change"])
df_topn = df_filtered.sort_values("abs_change", ascending=False).head(top_n)
return df_topn
# ----------------- Build Top N Bar Chart -----------------
# Generates a horizontal bar chart using Altair for percent change by region
@st.cache_resource
def build_topn_bar_chart(topn_df):
# Color scale: red for positive, blue for negative
color_scale = alt.Scale(domain=[-100, 0, 100], range=["#4575b4", "white", "#d73027"])
# Mapping from region codes to full country names
country_mapping = {
"ARG": "Argentina", "AUS": "Australia", "BRA": "Brazil", "CAN": "Canada",
"CHN": "China", "EGY": "Egypt", "EUR": "European Union", "IDN": "Indonesia",
"IND": "India", "JPN": "Japan", "KOR": "South Korea", "MEX": "Mexico",
"MYS": "Malaysia", "PHL": "Philippines", "ROW": "Rest of World", "RUS": "Russia",
"THA": "Thailand", "UKR": "Ukraine", "USA": "United States", "ZAF": "South Africa"
}
topn_df = topn_df.copy()
topn_df["region_fullname"] = topn_df["region"].map(country_mapping)
topn_df["baseline_value"] = topn_df["value"]
# Compute shocked value using the appropriate ratio
if "ratio_shock2" in topn_df.columns:
topn_df["shocked_value"] = topn_df["baseline_value"] * topn_df["ratio_shock2"]
else:
topn_df["shocked_value"] = topn_df["baseline_value"] * topn_df["ratio"]
# Create the bar chart
chart = alt.Chart(topn_df).mark_bar().encode(
x=alt.X('pct_change:Q', title='Percent Change (%)'),
y=alt.Y('region:N', sort='-x', title='Region'),
color=alt.Color('pct_change:Q', scale=color_scale, legend=None),
tooltip=[
alt.Tooltip('region_fullname:N', title='Region'),
alt.Tooltip('pct_change:Q', title='Percent Change (%)', format=".2f"),
alt.Tooltip('baseline_value:Q', title='Baseline Value', format=".2f"),
alt.Tooltip('shocked_value:Q', title='Shocked Value', format=".2f")
]
).properties(width=700, height=400)
return chart
# ----------------- Streamlit UI for Top N Chart -----------------
# UI for user to select commodity, attribute, year, number of regions (Top N),
# and shock scenario to visualize Top N regional percent changes
with st.expander("Top # Regional Changes", expanded=False):
st.header("Top # Regions with Largest Changes")
st.markdown("**Note:** Top # is selected based on the largest absolute change (positive or negative).", unsafe_allow_html=True)
# User selects filtering parameters
topn_commodity = st.selectbox("Select Commodity for Top #", sorted(df["commodity"].unique()), key="topn_commodity")
topn_attribute = st.selectbox("Select Attribute for Top #", sorted(df["attribute"].unique()), key="topn_attribute")
topn_year = st.selectbox("Select Year for Top #", sorted([y for y in df["year"].dropna().unique() if y >= 2023]), key="topn_year")
topn_n = st.slider("Select Top #", min_value=5, max_value=20, value=10)
# Choose between Shock 1 and Shock 2 if both are available
if shock2_label:
shock_choice = st.radio("Select Shock Scenario", ["Shock 1", "Shock 2"], index=0, horizontal=True)
shock_key = "shock1" if shock_choice == "Shock 1" else "shock2"
else:
shock_key = "shock1"
# Filter topN data and render chart
topn_df = filter_topn_data(df, topn_commodity, topn_attribute, topn_year, shock=shock_key, top_n=topn_n)
if topn_df.empty:
st.warning("No data available for the selected combination.")
else:
chart = build_topn_bar_chart(topn_df)
st.altair_chart(chart, use_container_width=True)
# ----------------- Filter Data for Line Chart -----------------
# Returns a subset of the main dataframe matching selected region, commodity, and attribute.
@st.cache_data
def filter_data(df, region_code, commodity, attribute):
return df[
(df["region"] == region_code) &
(df["commodity"] == commodity) &
(df["attribute"] == attribute)
]
# ----------------- Build Line Chart -----------------
# Draw a time series plot of baseline and shock scenarios.
# Uses dashed/dotted lines and colors to distinguish each scenario.
@st.cache_resource
def build_line_chart(df_filtered, shock1_label, shock2_label):
# Y-axis label with attribute name and assumed units
y_label = df_filtered["attribute"].iloc[0].upper() + " (Unit: 1000 Metric tons)"
# Normalize display labels
shock1_display = "Shock 1" if shock1_label == "Scenario" else shock1_label
shock2_display = "Shock 2" if shock2_label == "Scenario" else shock2_label
# Line chart for baseline (solid)
line_base = alt.Chart(df_filtered).mark_line(point=True, color='darkblue').encode(
x=alt.X("year:O", title="Year"),
y=alt.Y("value:Q", title=y_label),
tooltip=["year", "value"]
)
# Dashed line for Shock 1 (post-2023)
line_shock1 = alt.Chart(df_filtered[df_filtered["year"] >= 2023]).mark_line(
point=True, color='darkred', strokeDash=[4, 2]
).encode(
x="year:O",
y="value_with_shock1:Q",
tooltip=["year", "value_with_shock1"]
)
# Optional: dotted line for Shock 2 (post-2023)
line_shock2 = None
if "value_with_shock2" in df_filtered.columns:
line_shock2 = alt.Chart(df_filtered[df_filtered["year"] >= 2023]).mark_line(
point=True, color='darkgreen', strokeDash=[2, 2]
).encode(
x="year:O",
y="value_with_shock2:Q",
tooltip=["year", "value_with_shock2"]
)
# Build a manual legend using line styles and colors
legend_items = [
{"Label": "Baseline", "LineType": "solid", "Color": "darkblue"},
{"Label": shock1_display, "LineType": "dash", "Color": "darkred"},
]
if shock2_label:
legend_items.append({"Label": shock2_display, "LineType": "dot", "Color": "darkgreen"})
legend_df = pd.DataFrame(legend_items)
legend_chart = alt.Chart(legend_df).mark_rule(size=3).encode(
y=alt.Y("Label:N", axis=alt.Axis(title=" ")),
color=alt.Color("Label:N", scale=alt.Scale(domain=legend_df["Label"].tolist(),
range=legend_df["Color"].tolist()), legend=None),
strokeDash=alt.StrokeDash("LineType:N", scale=alt.Scale(domain=["solid", "dash", "dot"], range=[[0], [4, 2], [2, 2]]))
).properties(width=100, height=40 * len(legend_items))
# Combine charts
lines = line_base + line_shock1
if line_shock2:
lines += line_shock2
return alt.hconcat(lines.properties(width=700, height=400), legend_chart)
# ----------------- Country Code Mapping -----------------
# Used for translating 3-letter codes to full region names for user display
country_mapping = {
"ARG": "Argentina",
"AUS": "Australia",
"BRA": "Brazil",
"CAN": "Canada",
"CHN": "China",
"EGY": "Egypt",
"EUR": "European Union",
"IDN": "Indonesia",
"IND": "India",
"JPN": "Japan",
"KOR": "South Korea",
"MEX": "Mexico",
"MYS": "Malaysia",
"PHL": "Philippines",
"ROW": "Rest of World",
"RUS": "Russia",
"THA": "Thailand",
"UKR": "Ukraine",
"USA": "United States",
"ZAF": "South Africa",
}
# ----------------- Streamlit UI for Line Chart -----------------
# UI for selecting country, commodity, and attribute, and rendering the time series line chart
with st.expander("Line Chart", expanded=False):
st.markdown("### Time Series: Baseline vs. Shocks")
# Convert region codes to full names for display
region_options = {v: k for k, v in country_mapping.items()}
selected_country_name = st.selectbox("Select Region:", sorted(region_options.keys()))
selected_region_code = region_options[selected_country_name]
# Select commodity and attribute
commodity = st.selectbox("Select Commodity:", sorted(df["commodity"].unique()))
attribute = st.selectbox("Select Attribute:", sorted(df["attribute"].unique()))
# Filter data for the selected combination
df_filtered = filter_data(df, selected_region_code, commodity, attribute)
# Render the line chart
chart = build_line_chart(df_filtered, shock1_label, shock2_label)
st.altair_chart(chart, use_container_width=True)
# ----------------- Filter Data for Regional Bar Chart -----------------
# Filters the dataset for a selected commodity, attribute, and year
@st.cache_data
def filter_bar_df(df, bar_commodity, bar_attribute, bar_year):
return df[
(df["commodity"] == bar_commodity) &
(df["attribute"] == bar_attribute) &
(df["year"] == bar_year)
]
columns = ["region", "value", "ratio"]
if "ratio_shock2" in df.columns:
columns.append("ratio_shock2")
columns.append("value_with_shock2") if "value_with_shock2" in df.columns else None
return df[
(df["commodity"] == bar_commodity) &
(df["attribute"] == bar_attribute) &
(df["year"] == bar_year)
][columns].copy()
# ----------------- Build Bar Chart -----------------
# Draws a bar chart comparing percent change in each region
# Optionally shows side-by-side bars if both shocks are available
@st.cache_resource
def build_bar_chart(bar_df, shock1_label, shock2_label):
# Label handling for display
label1 = "Shock 1" if shock1_label == "Scenario" else shock1_label
label2 = "Shock 2" if shock2_label == "Scenario" else shock2_label
# Country name mapping
country_mapping = {
"ARG": "Argentina", "AUS": "Australia", "BRA": "Brazil", "CAN": "Canada",
"CHN": "China", "EGY": "Egypt", "EUR": "European Union", "IDN": "Indonesia",
"IND": "India", "JPN": "Japan", "KOR": "South Korea", "MEX": "Mexico",
"MYS": "Malaysia", "PHL": "Philippines", "ROW": "Rest of World", "RUS": "Russia",
"THA": "Thailand", "UKR": "Ukraine", "USA": "United States", "ZAF": "South Africa",
}
# Adjust label spacing and bar width depending on # of shocks
if shock2_label:
bar_size, font_size, offset_pos, offset_neg = 15, 10, 0.5, -0.6
else:
bar_size, font_size, offset_pos, offset_neg = 25, 11, 1.0, -1.2
bar_df = bar_df.copy().dropna(subset=["ratio"])
bar_df["pct_change_1"] = (bar_df["ratio"] - 1) * 100
bar_df["shocked_value_1"] = bar_df["value"] * bar_df["ratio"]
# Handle second shock if available
if "ratio_shock2" in bar_df.columns and "value_with_shock2" in bar_df.columns:
bar_df["pct_change_2"] = (bar_df["ratio_shock2"] - 1) * 100
bar_df["shocked_value_2"] = bar_df["value_with_shock2"]
# Create two separate dataframes and then merge for side-by-side display
df1 = bar_df[["region", "value", "shocked_value_1", "pct_change_1"]].copy()
df1["Shock"] = label1
df1.rename(columns={
"value": "Baseline Value",
"shocked_value_1": "Shocked Value",
"pct_change_1": "Percent Change"
}, inplace=True)
df2 = bar_df[["region", "value", "shocked_value_2", "pct_change_2"]].copy()
df2["Shock"] = label2
df2.rename(columns={
"value": "Baseline Value",
"shocked_value_2": "Shocked Value",
"pct_change_2": "Percent Change"
}, inplace=True)
bar_df_melt = pd.concat([df1, df2], ignore_index=True)
else:
# Only one shock scenario
bar_df_melt = bar_df[["region", "value", "shocked_value_1", "pct_change_1"]].copy()
bar_df_melt["Shock"] = label1
bar_df_melt.rename(columns={
"value": "Baseline Value",
"shocked_value_1": "Shocked Value",
"pct_change_1": "Percent Change"
}, inplace=True)
# Add full region names and label adjustment
bar_df_melt["region_fullname"] = bar_df_melt["region"].map(country_mapping)
bar_df_melt["show_label"] = True
bar_df_melt["label_y"] = bar_df_melt["Percent Change"] + bar_df_melt["Percent Change"].apply(
lambda x: offset_pos if x >= 0 else offset_neg
)
# Allow toggling between shocks via legend
selection = alt.selection_multi(fields=["Shock"], bind="legend")
# Main bar chart
base = alt.Chart(bar_df_melt).encode(
x=alt.X(
"region:N",
title="Region",
axis=alt.Axis(labelAngle=0),
scale=alt.Scale(paddingInner=0.5, paddingOuter=0.1)
),
xOffset=alt.XOffset("Shock:N", sort=[label1, label2] if shock2_label else [label1]),
y=alt.Y("Percent Change:Q", title="Percent Change (%)"),
color=alt.Color(
"Shock:N",
scale=alt.Scale(domain=[label1, label2] if shock2_label else [label1],
range=["#6baed6", "#fd8d3c"] if shock2_label else ["#6baed6"])
)
)
bar = base.mark_bar(size=bar_size).encode(
tooltip=[
alt.Tooltip("region_fullname:N", title="Region"),
alt.Tooltip("Shock:N", title="Shock"),
alt.Tooltip("Percent Change:Q", title="Percent Change (%)", format=".2f"),
alt.Tooltip("Baseline Value:Q", title="Baseline Value", format=".2f"),
alt.Tooltip("Shocked Value:Q", title="Shocked Value", format=".2f")
]
).add_selection(selection).transform_filter(selection)
# Numeric labels on top of bars
text = base.mark_text(
align="center", fontSize=font_size
).encode(
y="label_y:Q",
text=alt.Text("Percent Change:Q", format=".1f")
).transform_filter("datum.show_label == true")
return (bar + text).properties(width=700, height=400)
# ----------------- Streamlit UI for Bar Chart -----------------
# User selects commodity, attribute, and year; chart shows percent changes across regions
with st.expander("Bar Chart", expanded=False):
st.header("Regional Percent Change Overview")
# UI selections
bar_commodity = st.selectbox("Select Commodity", sorted(df["commodity"].unique()), key="bar_commodity")
bar_attribute = st.selectbox("Select Attribute", sorted(df["attribute"].unique()), key="bar_attribute")
bar_year = st.selectbox("Select Year", sorted([y for y in df["year"].dropna().unique() if y >= 2023]), key="bar_year")
# Filter and render chart
bar_df = filter_bar_df(df, bar_commodity, bar_attribute, bar_year)
if bar_df.empty:
st.warning("No data available for the selected combination.")
else:
chart = build_bar_chart(bar_df, shock1_label, shock2_label)
st.altair_chart(chart, use_container_width=True)
# ----------------- Filter Data for Heatmap -----------------
# Extracts data for the selected year and attribute
@st.cache_data
def filter_heatmap_data(df, selected_year, selected_attribute_dynamic):
return df[(df["year"] == selected_year) & (df["attribute"] == selected_attribute_dynamic)]
# ----------------- Build Altair Heatmap(s) -----------------
# Builds one or two heatmaps (Shock 1 and optional Shock 2)
# Each heatmap shows % change by region and commodity
@st.cache_resource
def build_heatmaps_altair(df_year, shock1_label, shock2_label):
# Label setup
label1 = "Shock 1" if shock1_label == "Scenario" else shock1_label
label2 = "Shock 2" if shock2_label == "Scenario" else shock2_label
# Region name mapping
country_mapping = {
"ARG": "Argentina", "AUS": "Australia", "BRA": "Brazil", "CAN": "Canada",
"CHN": "China", "EGY": "Egypt", "EUR": "European Union", "IDN": "Indonesia",
"IND": "India", "JPN": "Japan", "KOR": "South Korea", "MEX": "Mexico",
"MYS": "Malaysia", "PHL": "Philippines", "ROW": "Rest of World", "RUS": "Russia",
"THA": "Thailand", "UKR": "Ukraine", "USA": "United States", "ZAF": "South Africa"
}
charts = []
# Handle 1 or 2 shocks dynamically
for label, ratio_col, shock_value_col in [
(label1, "ratio", "value_with_shock1"),
(label2, "ratio_shock2", "value_with_shock2")
] if shock2_label else [(label1, "ratio", "value_with_shock1")]:
# Skip if required columns are missing
if ratio_col not in df_year.columns or shock_value_col not in df_year.columns:
continue
# Prepare data
data = df_year[["region", "commodity", "value", ratio_col, shock_value_col]].copy()
data["percent_change"] = (data[ratio_col] - 1) * 100
data["region_fullname"] = data["region"].map(country_mapping)
data.rename(columns={
"value": "Baseline Value",
shock_value_col: "Shocked Value"
}, inplace=True)
# Create the heatmap chart
chart = alt.Chart(data).mark_rect().encode(
x=alt.X('commodity:N', title='Commodity', sort=alt.EncodingSortField(field="commodity", order="ascending"),
axis=alt.Axis(labelAngle=45)),
y=alt.Y('region_fullname:N', title='Region', sort=alt.EncodingSortField(field="region_fullname", order="ascending")),
color=alt.Color('percent_change:Q',
scale=alt.Scale(domain=[-100, 0, 100], range=["#4575b4", "white", "#d73027"]),
title="Percent Change (%)"),
tooltip=[
alt.Tooltip('region_fullname:N', title='Region'),
alt.Tooltip('commodity:N', title='Commodity'),
alt.Tooltip('percent_change:Q', title='Percent Change (%)', format='.1f'),
alt.Tooltip('Baseline Value:Q', title='Baseline Value', format=".2f"),
alt.Tooltip('Shocked Value:Q', title='Shocked Value', format=".2f")
]
).properties(
width=40 * data['commodity'].nunique(),
height=40 * data['region'].nunique(),
title=label
)
charts.append(chart)
# Combine charts horizontally
if not charts:
return None
return alt.hconcat(*charts).resolve_scale(color='independent').configure_axis(grid=True)
# ----------------- Streamlit UI for Heatmap -----------------
# User selects year and attribute, and the app renders 1 or 2 heatmaps
with st.expander("Heatmap", expanded=False):
st.header("Heatmap of Ratio by Selected Year")
# Dropdowns for user input
year_list = sorted([y for y in df["year"].dropna().unique() if y >= 2023])
selected_year = st.selectbox("Select Year for Heatmap", year_list)
selected_attribute_dynamic = st.selectbox("Select Attribute for Yearly Heatmap", sorted(df["attribute"].unique()), key="yearly_attr")
# Filter dataset
df_year = filter_heatmap_data(df, selected_year, selected_attribute_dynamic)
if df_year.empty:
st.warning("No data available for the selected combination.")
else:
chart = build_heatmaps_altair(df_year, shock1_label, shock2_label)
if chart is None:
st.warning("No valid data to display.")
else:
st.altair_chart(chart, use_container_width=True)
# ----------------- Ensure Percent Change Column Exists -----------------
# If not already created, compute percent change from ratio column.
if "pct_change" not in df.columns and "ratio" in df.columns:
df["pct_change"] = (df["ratio"] - 1) * 100
# ----------------- Compute Correlation Matrix -----------------
# Filters data by region, commodity, and years
# Then pivots the table to wide format and computes pairwise correlations
@st.cache_data
def compute_attribute_correlation(df, region, commodity, years):
# Filter by region, commodity, and selected years
df_filtered = df[
(df["region"] == region) &
(df["commodity"] == commodity) &
(df["year"].isin(years))
]
if df_filtered.empty:
return None, "❗ No data available for the selected filters."
# Pivot the data: attributes as columns, year as index
df_pivot = df_filtered.pivot_table(
index="year",
columns="attribute",
values="pct_change"
)
if df_pivot.shape[1] < 2:
return None, "Not enough attributes to compute correlation."
# Compute correlation matrix
return df_pivot.corr(), None
# ----------------- Streamlit UI for Attribute Correlation -----------------
# User selects region, commodity, and year range
# App computes and renders attribute correlation heatmap
with st.expander("Attribute Correlation Matrix (by Region & Years)", expanded=False):
st.header("Attribute Interdependence Correlation")
# Prompt to upload data first
if zip_file_1 is None:
st.info("Please upload at least one ZIP file to enable this analysis.")
else:
# Region mapping (for user-friendly dropdown)
country_mapping = {
"ARG": "Argentina", "AUS": "Australia", "BRA": "Brazil", "CAN": "Canada",
"CHN": "China", "EGY": "Egypt", "EUR": "European Union", "IDN": "Indonesia",
"IND": "India", "JPN": "Japan", "KOR": "South Korea", "MEX": "Mexico",
"MYS": "Malaysia", "PHL": "Philippines", "ROW": "Rest of World", "RUS": "Russia",
"THA": "Thailand", "UKR": "Ukraine", "USA": "United States", "ZAF": "South Africa"
}
# Dropdown for region (full name)
region_code_list = sorted(df["region"].dropna().unique())
region_options = {country_mapping.get(code, code): code for code in region_code_list}
selected_country_name = st.selectbox("Select Region", sorted(region_options.keys()), key="corr_region")
selected_region = region_options[selected_country_name]
# Dropdown for commodity
commodity_options = sorted(df["commodity"].dropna().unique())
selected_commodity = st.selectbox("Select Commodity", commodity_options, key="corr_commodity")
# Year range slider (minimum 3 years required)
min_year = 2023
max_year = int(df["year"].dropna().max())
start_year, end_year = st.slider(
"Select Year Range (minimum 3 years)",
min_value=min_year,
max_value=max_year,
value=(max(max_year - 2, min_year), max_year),
step=1,
key="corr_year_range"
)
if end_year - start_year + 1 < 3:
st.warning("Please select a range of at least 3 years.")
selected_years = []
else:
selected_years = list(range(start_year, end_year + 1))
# Compute correlation matrix
corr_matrix, err_msg = compute_attribute_correlation(df, selected_region, selected_commodity, selected_years)
if err_msg:
st.warning(err_msg)
else:
# Plot with Seaborn
fig, ax = plt.subplots(figsize=(7, 6))
# Create upper triangle mask
mask = np.triu(np.ones_like(corr_matrix, dtype=bool), k=1)
# Draw heatmap with annotation
sns.heatmap(
corr_matrix,
mask=mask,
cmap="coolwarm",
annot=True,
fmt=".2f",
center=0,
square=True,
linewidths=0.5,
cbar_kws={"shrink": 0.7},
ax=ax
)
ax.set_title(f"{country_mapping.get(selected_region, selected_region)} - {selected_commodity} ({start_year}–{end_year})",
fontsize=12, pad=10)
ax.tick_params(axis='x', labelrotation=45, labelsize=9)
ax.tick_params(axis='y', labelsize=9)
ax.set_xlabel("")
ax.set_ylabel("")
# Display the plot
st.pyplot(fig)