# ----------------- Module Imports ----------------- # Import standard Python and third-party packages used for data processing and visualization import zipfile import pandas as pd import altair as alt import numpy as np import streamlit as st import seaborn as sns import matplotlib.pyplot as plt from matplotlib.colors import LinearSegmentedColormap, TwoSlopeNorm import pydeck as pdk from typing import Optional # ----------------- DataFrame Filter Helper ----------------- # This function adds an interactive filtering UI for a DataFrame in Streamlit. # Users can toggle filtering via a checkbox and then choose columns to filter by. # It supports: # - categorical columns: multiselect # - numerical columns: range slider # - text columns: substring or regex matching def filter_dataframe(df: pd.DataFrame, key_prefix: str = "") -> pd.DataFrame: """Adds a UI on top of a dataframe to let viewers filter columns.""" modify = st.checkbox("Add filters", value=False, key=f"{key_prefix}_filter_toggle") if not modify: return df df = df.copy() modification_container = st.container() with modification_container: to_filter_columns = st.multiselect("Filter dataframe on", df.columns, key=f"{key_prefix}_filter_cols") for column in to_filter_columns: if pd.api.types.is_categorical_dtype(df[column]) or df[column].nunique() < 10: user_cat_input = st.multiselect( f"Values for {column}", df[column].unique(), default=df[column].unique(), key=f"{key_prefix}_{column}_cat" ) df = df[df[column].isin(user_cat_input)] elif pd.api.types.is_numeric_dtype(df[column]): _min = float(df[column].min()) _max = float(df[column].max()) step = (_max - _min) / 100 if _max != _min else 1 user_num_input = st.slider( f"Values for {column}", min_value=_min, max_value=_max, value=(_min, _max), step=step, key=f"{key_prefix}_{column}_num" ) df = df[df[column].between(*user_num_input)] else: user_text_input = st.text_input( f"Substring or regex in {column}", key=f"{key_prefix}_{column}_text" ) if user_text_input: df = 
df[df[column].astype(str).str.contains(user_text_input, na=False)] return df # ----------------- Streamlit Page Setup ----------------- # Set layout to wide and define the app title st.set_page_config(layout="wide") st.title("Interactive Dashboard") # Styled HTML introduction box shown at the top of the app st.markdown( """

Welcome to the Agricultural Trade Scenario Dashboard

This interactive dashboard allows you to explore and compare the impact of different policy scenarios on agricultural trade metrics over time and across regions.

🔒 File size limit: 20MB per ZIP. Ensure files are properly named and formatted.

""", unsafe_allow_html=True ) # ----------------- Upload Section----------------- # File uploader for Shock 1 and optional Shock 2 ZIP files zip_file_1 = st.file_uploader("Upload Shock 1 ZIP file", type="zip", key="zip1") zip_file_2 = st.file_uploader("Upload Shock 2 ZIP file (optional)", type="zip", key="zip2") # Define 20MB file size limit and validate each uploaded file MAX_FILE_SIZE = 20 * 1024 * 1024 # 20MB if zip_file_1 is not None and zip_file_1.size > MAX_FILE_SIZE: st.error("Shock 1 ZIP file exceeds 20MB. Please upload a smaller file.") zip_file_1 = None if zip_file_2 is not None and zip_file_2.size > MAX_FILE_SIZE: st.error("Shock 2 ZIP file exceeds 20MB. Please upload a smaller file.") zip_file_2 = None # ----------------- Load METADATA.csv from ZIP ----------------- # Load metadata file (METADATA.csv) from the given ZIP file @st.cache_data def load_metadata_from_zip(zip_file): if zip_file is None: return None try: with zipfile.ZipFile(zip_file, "r") as z: meta_file = [f for f in z.namelist() if "METADATA.csv" in f] if not meta_file: return None meta_df = pd.read_csv(z.open(meta_file[0])) return meta_df except Exception as e: return None # Display metadata for Shock 1 ZIP file (if available) metadata_1 = load_metadata_from_zip(zip_file_1) if metadata_1 is not None: st.markdown( "
" "
Metadata for Shock 1
", unsafe_allow_html=True, ) for _, row in metadata_1.iterrows(): st.markdown( f"
{row['field']}: {row['value']}
", unsafe_allow_html=True, ) st.markdown("
", unsafe_allow_html=True) elif zip_file_1 is not None: st.info("No metadata found in Shock 1 ZIP.") # Display metadata for Shock 2 ZIP file (if available) if zip_file_2 is not None: metadata_2 = load_metadata_from_zip(zip_file_2) if metadata_2 is not None: st.markdown( "
" "
Metadata for Shock 2
", unsafe_allow_html=True, ) for _, row in metadata_2.iterrows(): st.markdown( f"
{row['field']}: {row['value']}
", unsafe_allow_html=True, ) st.markdown("
", unsafe_allow_html=True) else: st.info("No metadata found in Shock 2 ZIP.") # ----------------- Load Baseline Table (data_targets_adjusted.csv) ----------------- # This function searches for files containing 'data_targets_adjusted' in the ZIP archive # and loads them as a baseline DataFrame (used for model calibration or projection reference) @st.cache_data def load_baseline_table(zip_file): if zip_file is None: return None try: with zipfile.ZipFile(zip_file, "r") as z: baseline_candidates = [f for f in z.namelist() if "data_targets_adjusted" in f] if not baseline_candidates: return None df_base = pd.read_csv(z.open(baseline_candidates[0])) return df_base except Exception: return None # ----------------- Force Filtering UI for Baseline Table ----------------- # Unlike filter_dataframe, this function does not require a checkbox toggle. # The filter UI is always visible to the user. def force_filter_dataframe(df: pd.DataFrame, key_prefix: str = "") -> pd.DataFrame: """Always show filter UI for a dataframe (no toggle checkbox).""" df = df.copy() modification_container = st.container() with modification_container: to_filter_columns = st.multiselect("Filter dataframe on", df.columns, key=f"{key_prefix}_filter_cols") for column in to_filter_columns: if pd.api.types.is_categorical_dtype(df[column]) or df[column].nunique() < 10: user_cat_input = st.multiselect( f"Values for {column}", df[column].unique(), default=df[column].unique(), key=f"{key_prefix}_{column}_cat" ) df = df[df[column].isin(user_cat_input)] elif pd.api.types.is_numeric_dtype(df[column]): _min = float(df[column].min()) _max = float(df[column].max()) step = (_max - _min) / 100 if _max != _min else 1 user_num_input = st.slider( f"Values for {column}", min_value=_min, max_value=_max, value=(_min, _max), step=step, key=f"{key_prefix}_{column}_num" ) df = df[df[column].between(*user_num_input)] else: user_text_input = st.text_input( f"Substring or regex in {column}", key=f"{key_prefix}_{column}_text" ) if 
user_text_input: df = df[df[column].astype(str).str.contains(user_text_input, na=False)] return df # ----------------- Load and Display Baseline Table ----------------- # Load baseline data for Shock 1 and optionally Shock 2. # Users can switch between the two baseline tables via radio buttons. baseline_df_1 = load_baseline_table(zip_file_1) baseline_df_2 = load_baseline_table(zip_file_2) if zip_file_2 else None if baseline_df_1 is not None or baseline_df_2 is not None: with st.expander("Baseline Table", expanded=False): selected_baseline = st.radio( "Select Baseline Source", options=["Shock 1", "Shock 2"], index=0 if baseline_df_1 is not None else 1, horizontal=True, key="baseline_radio" ) if selected_baseline == "Shock 1" and baseline_df_1 is not None: st.markdown("You can filter the baseline data from Shock 1 below:") st.dataframe(force_filter_dataframe(baseline_df_1, key_prefix="baseline1"), use_container_width=True) elif selected_baseline == "Shock 2" and baseline_df_2 is not None: st.markdown("You can filter the baseline data from Shock 2 below:") st.dataframe(force_filter_dataframe(baseline_df_2, key_prefix="baseline2"), use_container_width=True) else: st.warning(f"No baseline data found for {selected_baseline}.") elif zip_file_1 is not None: st.info("No `data_targets_adjusted.csv` found in Shock 1 ZIP.") # ----------------- Load data_shocks.csv ----------------- # This function loads policy shock values (ratios or multipliers applied to the baseline). @st.cache_data def load_shocks_from_zip(zip_file): if zip_file is None: return None try: with zipfile.ZipFile(zip_file, "r") as z: shock_file = [f for f in z.namelist() if "data_shocks.csv" in f] if not shock_file: return None df = pd.read_csv(z.open(shock_file[0])) return df except Exception as e: return None # ----------------- Display Policy Shock Variables ----------------- # Allow users to either view all shock variables or only those that differ from neutral (value != 1). 
def show_shock_value_only(df: pd.DataFrame, label: str):
    """Render the shock-variable table for one scenario.

    The user chooses between every variable and only the "changed" rows,
    i.e. those whose multiplier differs from the neutral value of 1.
    """
    st.markdown(f"### Policy Shock Variables ({label})")
    show_option = st.radio(
        f"View Option for {label}",
        ["Only show changed (non-neutral) values", "Show all variables"],
        index=0,
        horizontal=True,
        key=f"shock_radio_{label}",
    )

    if show_option != "Only show changed (non-neutral) values":
        # Full table, with the optional filter UI on top.
        st.dataframe(filter_dataframe(df, key_prefix=label), use_container_width=True)
        return

    if "value" not in df.columns:
        st.warning(f"No 'value' column found in {label}.")
        return

    changed = df[df["value"] != 1]
    if changed.empty:
        st.info(f"No changed values found in {label} (all value = 1).")
    else:
        st.dataframe(filter_dataframe(changed, key_prefix=label), use_container_width=True)


# Load shock variables from both ZIPs.
shock_df_1 = load_shocks_from_zip(zip_file_1)
shock_df_2 = load_shocks_from_zip(zip_file_2)

# Display shock variable tables inside an expander panel.
with st.expander("Policy Shock Variables", expanded=False):
    if shock_df_1 is not None:
        show_shock_value_only(shock_df_1, "Shock 1")
    if zip_file_2 is not None and shock_df_2 is not None:
        show_shock_value_only(shock_df_2, "Shock 2")

# ----------------- Load & Process ZIP Pair: Baseline + Scenario -----------------
# Loads the baseline and scenario data from the ZIP files, derives the
# shock ratio, and merges everything into one DataFrame.
@st.cache_data
def load_and_process_zip_pair(zip1, zip2):
    """Load baseline + scenario tables from one or two ZIPs and merge them.

    Returns
    -------
    (df, shock1_name, shock2_name)
        ``df`` carries the baseline ``value``, per-shock ``ratio`` columns
        and the derived ``value_with_shock1`` / ``value_with_shock2``.
        ``shock2_name`` is None when ``zip2`` is not supplied.

    Raises
    ------
    ValueError
        When a ZIP lacks the expected baseline/scenario CSVs, or the
        scenario file has neither a 'ratio' nor a 'value' column.
    """

    def load_zip(zip_obj):
        # Locate the baseline and scenario CSVs by naming pattern.
        with zipfile.ZipFile(zip_obj, "r") as z:
            file_list = z.namelist()
            baseline_file = next(
                (f for f in file_list
                 if "baseline_projections.csv" in f or "data_targets_adjusted.csv" in f),
                None,
            )
            scenario_file = next(
                (f for f in file_list
                 if "data_scenario_solution" in f or "data_solution.csv" in f),
                None,
            )
            if not baseline_file or not scenario_file:
                raise ValueError("ZIP file must contain either 'baseline_projections.csv' or 'data_targets_adjusted.csv', and 'data_scenario_solution_*.csv' or 'data_solution.csv'.")
            df_baseline = pd.read_csv(z.open(baseline_file))
            df_scenario = pd.read_csv(z.open(scenario_file))

        # Keep only the columns the dashboard uses.
        df_baseline = df_baseline[["region", "commodity", "year", "attribute", "value"]]

        if "ratio" in df_scenario.columns:
            # Scenario file already ships the shock multiplier.
            df_scenario = df_scenario[["region", "commodity", "year", "attribute", "ratio"]]
        elif "value" in df_scenario.columns:
            # Infer the multiplier as scenario value / baseline value.
            df_scenario = df_scenario[["region", "commodity", "year", "attribute", "value"]]
            df_scenario.rename(columns={"value": "value_scenario"}, inplace=True)
            df_scenario = pd.merge(
                df_baseline, df_scenario,
                on=["region", "commodity", "year", "attribute"], how="left",
            )
            df_scenario["ratio"] = df_scenario["value_scenario"] / df_scenario["value"]
            df_scenario = df_scenario[["region", "commodity", "year", "attribute", "ratio"]]
        else:
            raise ValueError("Scenario file must contain either a 'ratio' or 'value' column.")

        # Derive a human-readable label for the scenario.
        if "data_scenario_solution_" in scenario_file:
            shock_name = scenario_file.replace("data_scenario_solution_", "").replace(".csv", "")
        elif "data_solution.csv" in scenario_file:
            shock_name = "Scenario"
        else:
            shock_name = "Unknown"
        return df_baseline, df_scenario, shock_name

    # Load baseline and scenario data for Shock 1, then merge.
    df_base1, df_shock1, shock1_name = load_zip(zip1)
    df = pd.merge(df_base1, df_shock1,
                  on=["region", "commodity", "year", "attribute"], how="left")
    df["value_with_shock1"] = df["value"] * df["ratio"]

    shock2_name = None
    # If Shock 2 is uploaded, merge its ratios too (suffix '_shock2').
    if zip2 is not None:
        _, df_shock2, shock2_name = load_zip(zip2)
        df = pd.merge(
            df, df_shock2[["region", "commodity", "year", "attribute", "ratio"]],
            on=["region", "commodity", "year", "attribute"],
            how="left", suffixes=("", "_shock2"),
        )
        df["value_with_shock2"] = df["value"] * df["ratio_shock2"]

    # Normalise attribute names and year types for downstream filtering.
    df["attribute"] = df["attribute"].str.upper()
    df["year"] = df["year"].astype(int)
    return df, shock1_name, shock2_name


# ----------------- Run logic: only proceed if ZIP 1 is uploaded -----------------
if zip_file_1 is None:
    st.info("Please upload at least the Shock 1 ZIP file to begin.")
    # Halt the script run here so the chart sections below never reference
    # an undefined ``df``.
    st.stop()

df, shock1_label, shock2_label = load_and_process_zip_pair(zip_file_1, zip_file_2)

# ----------------- TOP # REGION CHANGE BAR CHART -----------------
# Extracts the top N regions with the largest absolute percent change for a
# given commodity, attribute, and year (supports shock1 and shock2 filtering).
@st.cache_data
def filter_topn_data(df, topn_commodity, topn_attribute, topn_year, shock="shock1", top_n=10):
    """Return the ``top_n`` regions with the largest absolute percent change.

    ``shock`` selects which ratio column is used ("shock1" or "shock2");
    the chosen column is always exposed as ``ratio`` in the result, along
    with derived ``pct_change`` and ``abs_change`` columns.
    """
    relevant_cols = ["region", "commodity", "year", "attribute", "ratio", "value"]
    if "ratio_shock2" in df.columns and shock == "shock2":
        relevant_cols = ["region", "commodity", "year", "attribute", "ratio_shock2", "value"]
    data = df[relevant_cols].copy()

    # Normalise the column name so downstream code always sees 'ratio'.
    if shock == "shock2" and "ratio_shock2" in data.columns:
        data = data.rename(columns={"ratio_shock2": "ratio"})

    # Apply the user's commodity / attribute / year selections.
    data = data[
        (data["commodity"] == topn_commodity)
        & (data["attribute"] == topn_attribute)
        & (data["year"] == topn_year)
    ]

    # Rank by absolute percent change.
    data["pct_change"] = (data["ratio"] - 1) * 100
    data["abs_change"] = data["pct_change"].abs()
    data = data.dropna(subset=["pct_change"])
    return data.sort_values("abs_change", ascending=False).head(top_n)


# ----------------- Build Top N Bar Chart -----------------
@st.cache_resource
def build_topn_bar_chart(topn_df):
    """Horizontal Altair bar chart of percent change by region."""
    # Diverging colour scale: blue for declines, red for increases.
    color_scale = alt.Scale(domain=[-100, 0, 100], range=["#4575b4", "white", "#d73027"])

    # Region-code -> full-name mapping for tooltips.
    country_mapping = {
        "ARG": "Argentina", "AUS": "Australia", "BRA": "Brazil", "CAN": "Canada",
        "CHN": "China", "EGY": "Egypt", "EUR": "European Union", "IDN": "Indonesia",
        "IND": "India", "JPN": "Japan", "KOR": "South Korea", "MEX": "Mexico",
        "MYS": "Malaysia", "PHL": "Philippines", "ROW": "Rest of World",
        "RUS": "Russia", "THA": "Thailand", "UKR": "Ukraine",
        "USA": "United States", "ZAF": "South Africa",
    }

    topn_df = topn_df.copy()
    topn_df["region_fullname"] = topn_df["region"].map(country_mapping)
    topn_df["baseline_value"] = topn_df["value"]
    # NOTE(review): filter_topn_data renames ratio_shock2 -> ratio before
    # this is called, so the first branch only fires if a caller passes raw
    # (unrenamed) columns.
    if "ratio_shock2" in topn_df.columns:
        topn_df["shocked_value"] = topn_df["baseline_value"] * topn_df["ratio_shock2"]
    else:
        topn_df["shocked_value"] = topn_df["baseline_value"] * topn_df["ratio"]

    chart = alt.Chart(topn_df).mark_bar().encode(
        x=alt.X('pct_change:Q', title='Percent Change (%)'),
        y=alt.Y('region:N', sort='-x', title='Region'),
        color=alt.Color('pct_change:Q', scale=color_scale, legend=None),
        tooltip=[
            alt.Tooltip('region_fullname:N', title='Region'),
            alt.Tooltip('pct_change:Q', title='Percent Change (%)', format=".2f"),
            alt.Tooltip('baseline_value:Q', title='Baseline Value', format=".2f"),
            alt.Tooltip('shocked_value:Q', title='Shocked Value', format=".2f"),
        ],
    ).properties(width=700, height=400)
    return chart


# ----------------- Streamlit UI for Top N Chart -----------------
# User selects commodity, attribute, year, Top-N size and (when available)
# which shock scenario to visualize.
with st.expander("Top # Regional Changes", expanded=False):
    st.header("Top # Regions with Largest Changes")
    st.markdown("**Note:** Top # is selected based on the largest absolute change (positive or negative).", unsafe_allow_html=True)

    topn_commodity = st.selectbox("Select Commodity for Top #", sorted(df["commodity"].unique()), key="topn_commodity")
    topn_attribute = st.selectbox("Select Attribute for Top #", sorted(df["attribute"].unique()), key="topn_attribute")
    topn_year = st.selectbox(
        "Select Year for Top #",
        sorted([y for y in df["year"].dropna().unique() if y >= 2023]),
        key="topn_year",
    )
    topn_n = st.slider("Select Top #", min_value=5, max_value=20, value=10)

    # Choose between Shock 1 and Shock 2 only when both are loaded.
    if shock2_label:
        shock_choice = st.radio("Select Shock Scenario", ["Shock 1", "Shock 2"], index=0, horizontal=True)
        shock_key = "shock1" if shock_choice == "Shock 1" else "shock2"
    else:
        shock_key = "shock1"

    topn_df = filter_topn_data(df, topn_commodity, topn_attribute, topn_year, shock=shock_key, top_n=topn_n)
    if topn_df.empty:
        st.warning("No data available for the selected combination.")
    else:
        chart = build_topn_bar_chart(topn_df)
        st.altair_chart(chart, use_container_width=True)


# ----------------- Filter Data for Line Chart -----------------
@st.cache_data
def filter_data(df, region_code, commodity, attribute):
    """Subset ``df`` to one region/commodity/attribute combination."""
    return df[
        (df["region"] == region_code)
        & (df["commodity"] == commodity)
        & (df["attribute"] == attribute)
    ]


# ----------------- Build Line Chart -----------------
@st.cache_resource
def build_line_chart(df_filtered, shock1_label, shock2_label):
    """Time-series chart of baseline vs. shock scenarios.

    Baseline is a solid dark-blue line; Shock 1 a dashed dark-red line and
    Shock 2 (when present) a dotted dark-green line, both from 2023 on.
    A hand-built legend chart is concatenated on the right.
    """
    # Y-axis label: attribute name plus assumed units.
    y_label = df_filtered["attribute"].iloc[0].upper() + " (Unit: 1000 Metric tons)"

    # Normalize display labels ("Scenario" -> generic shock name).
    shock1_display = "Shock 1" if shock1_label == "Scenario" else shock1_label
    shock2_display = "Shock 2" if shock2_label == "Scenario" else shock2_label

    line_base = alt.Chart(df_filtered).mark_line(point=True, color='darkblue').encode(
        x=alt.X("year:O", title="Year"),
        y=alt.Y("value:Q", title=y_label),
        tooltip=["year", "value"],
    )

    post_2023 = df_filtered[df_filtered["year"] >= 2023]
    line_shock1 = alt.Chart(post_2023).mark_line(
        point=True, color='darkred', strokeDash=[4, 2]
    ).encode(x="year:O", y="value_with_shock1:Q", tooltip=["year", "value_with_shock1"])

    line_shock2 = None
    if "value_with_shock2" in df_filtered.columns:
        line_shock2 = alt.Chart(post_2023).mark_line(
            point=True, color='darkgreen', strokeDash=[2, 2]
        ).encode(x="year:O", y="value_with_shock2:Q", tooltip=["year", "value_with_shock2"])

    # Manual legend built from line styles and colors.
    legend_items = [
        {"Label": "Baseline", "LineType": "solid", "Color": "darkblue"},
        {"Label": shock1_display, "LineType": "dash", "Color": "darkred"},
    ]
    if shock2_label:
        legend_items.append({"Label": shock2_display, "LineType": "dot", "Color": "darkgreen"})
    legend_df = pd.DataFrame(legend_items)
    legend_chart = alt.Chart(legend_df).mark_rule(size=3).encode(
        y=alt.Y("Label:N", axis=alt.Axis(title=" ")),
        color=alt.Color(
            "Label:N",
            scale=alt.Scale(domain=legend_df["Label"].tolist(), range=legend_df["Color"].tolist()),
            legend=None,
        ),
        strokeDash=alt.StrokeDash(
            "LineType:N",
            scale=alt.Scale(domain=["solid", "dash", "dot"], range=[[0], [4, 2], [2, 2]]),
        ),
    ).properties(width=100, height=40 * len(legend_items))

    lines = line_base + line_shock1
    # Explicit None test; chart objects are unconditionally truthy.
    if line_shock2 is not None:
        lines += line_shock2
    return alt.hconcat(lines.properties(width=700, height=400), legend_chart)


# ----------------- Country Code Mapping -----------------
# Translates 3-letter region codes to full names for user display.
country_mapping = {
    "ARG": "Argentina", "AUS": "Australia", "BRA": "Brazil", "CAN": "Canada",
    "CHN": "China", "EGY": "Egypt", "EUR": "European Union", "IDN": "Indonesia",
    "IND": "India", "JPN": "Japan", "KOR": "South Korea", "MEX": "Mexico",
    "MYS": "Malaysia", "PHL": "Philippines", "ROW": "Rest of World",
    "RUS": "Russia", "THA": "Thailand", "UKR": "Ukraine",
    "USA": "United States", "ZAF": "South Africa",
}

# ----------------- Streamlit UI for Line Chart -----------------
with st.expander("Line Chart", expanded=False):
    st.markdown("### Time Series: Baseline vs. Shocks")

    # Present full names but look up by region code.
    region_options = {v: k for k, v in country_mapping.items()}
    selected_country_name = st.selectbox("Select Region:", sorted(region_options.keys()))
    selected_region_code = region_options[selected_country_name]

    commodity = st.selectbox("Select Commodity:", sorted(df["commodity"].unique()))
    attribute = st.selectbox("Select Attribute:", sorted(df["attribute"].unique()))

    df_filtered = filter_data(df, selected_region_code, commodity, attribute)
    chart = build_line_chart(df_filtered, shock1_label, shock2_label)
    st.altair_chart(chart, use_container_width=True)


# ----------------- Filter Data for Regional Bar Chart -----------------
@st.cache_data
def filter_bar_df(df, bar_commodity, bar_attribute, bar_year):
    """Subset ``df`` to one commodity/attribute/year for the bar chart.

    All columns are returned; ``build_bar_chart`` selects what it needs.
    (A previous version had a second, column-narrowing ``return`` after
    this one — unreachable dead code, now removed.)
    """
    return df[
        (df["commodity"] == bar_commodity)
        & (df["attribute"] == bar_attribute)
        & (df["year"] == bar_year)
    ]


# ----------------- Build Bar Chart -----------------
# Compares percent change per region; side-by-side bars when both shocks
# are available.
@st.cache_resource
def build_bar_chart(bar_df, shock1_label, shock2_label):
    """Grouped bar chart of regional percent change for one or two shocks."""
    # Display-label handling.
    label1 = "Shock 1" if shock1_label == "Scenario" else shock1_label
    label2 = "Shock 2" if shock2_label == "Scenario" else shock2_label

    country_mapping = {
        "ARG": "Argentina", "AUS": "Australia", "BRA": "Brazil", "CAN": "Canada",
        "CHN": "China", "EGY": "Egypt", "EUR": "European Union", "IDN": "Indonesia",
        "IND": "India", "JPN": "Japan", "KOR": "South Korea", "MEX": "Mexico",
        "MYS": "Malaysia", "PHL": "Philippines", "ROW": "Rest of World",
        "RUS": "Russia", "THA": "Thailand", "UKR": "Ukraine",
        "USA": "United States", "ZAF": "South Africa",
    }

    # Narrower bars / smaller fonts when two shocks are shown side by side.
    if shock2_label:
        bar_size, font_size, offset_pos, offset_neg = 15, 10, 0.5, -0.6
    else:
        bar_size, font_size, offset_pos, offset_neg = 25, 11, 1.0, -1.2

    bar_df = bar_df.copy().dropna(subset=["ratio"])
    bar_df["pct_change_1"] = (bar_df["ratio"] - 1) * 100
    bar_df["shocked_value_1"] = bar_df["value"] * bar_df["ratio"]

    if "ratio_shock2" in bar_df.columns and "value_with_shock2" in bar_df.columns:
        # Two shocks: build one frame per shock, then stack for xOffset grouping.
        bar_df["pct_change_2"] = (bar_df["ratio_shock2"] - 1) * 100
        bar_df["shocked_value_2"] = bar_df["value_with_shock2"]

        df1 = bar_df[["region", "value", "shocked_value_1", "pct_change_1"]].copy()
        df1["Shock"] = label1
        df1.rename(columns={
            "value": "Baseline Value",
            "shocked_value_1": "Shocked Value",
            "pct_change_1": "Percent Change",
        }, inplace=True)

        df2 = bar_df[["region", "value", "shocked_value_2", "pct_change_2"]].copy()
        df2["Shock"] = label2
        df2.rename(columns={
            "value": "Baseline Value",
            "shocked_value_2": "Shocked Value",
            "pct_change_2": "Percent Change",
        }, inplace=True)

        bar_df_melt = pd.concat([df1, df2], ignore_index=True)
    else:
        # Single shock scenario.
        bar_df_melt = bar_df[["region", "value", "shocked_value_1", "pct_change_1"]].copy()
        bar_df_melt["Shock"] = label1
        bar_df_melt.rename(columns={
            "value": "Baseline Value",
            "shocked_value_1": "Shocked Value",
            "pct_change_1": "Percent Change",
        }, inplace=True)

    # Full region names plus the nudged y-position for the numeric labels.
    bar_df_melt["region_fullname"] = bar_df_melt["region"].map(country_mapping)
    bar_df_melt["show_label"] = True
    bar_df_melt["label_y"] = bar_df_melt["Percent Change"] + bar_df_melt["Percent Change"].apply(
        lambda x: offset_pos if x >= 0 else offset_neg
    )
# Allow toggling between shocks via legend selection = alt.selection_multi(fields=["Shock"], bind="legend") # Main bar chart base = alt.Chart(bar_df_melt).encode( x=alt.X( "region:N", title="Region", axis=alt.Axis(labelAngle=0), scale=alt.Scale(paddingInner=0.5, paddingOuter=0.1) ), xOffset=alt.XOffset("Shock:N", sort=[label1, label2] if shock2_label else [label1]), y=alt.Y("Percent Change:Q", title="Percent Change (%)"), color=alt.Color( "Shock:N", scale=alt.Scale(domain=[label1, label2] if shock2_label else [label1], range=["#6baed6", "#fd8d3c"] if shock2_label else ["#6baed6"]) ) ) bar = base.mark_bar(size=bar_size).encode( tooltip=[ alt.Tooltip("region_fullname:N", title="Region"), alt.Tooltip("Shock:N", title="Shock"), alt.Tooltip("Percent Change:Q", title="Percent Change (%)", format=".2f"), alt.Tooltip("Baseline Value:Q", title="Baseline Value", format=".2f"), alt.Tooltip("Shocked Value:Q", title="Shocked Value", format=".2f") ] ).add_selection(selection).transform_filter(selection) # Numeric labels on top of bars text = base.mark_text( align="center", fontSize=font_size ).encode( y="label_y:Q", text=alt.Text("Percent Change:Q", format=".1f") ).transform_filter("datum.show_label == true") return (bar + text).properties(width=700, height=400) # ----------------- Streamlit UI for Bar Chart ----------------- # User selects commodity, attribute, and year; chart shows percent changes across regions with st.expander("Bar Chart", expanded=False): st.header("Regional Percent Change Overview") # UI selections bar_commodity = st.selectbox("Select Commodity", sorted(df["commodity"].unique()), key="bar_commodity") bar_attribute = st.selectbox("Select Attribute", sorted(df["attribute"].unique()), key="bar_attribute") bar_year = st.selectbox("Select Year", sorted([y for y in df["year"].dropna().unique() if y >= 2023]), key="bar_year") # Filter and render chart bar_df = filter_bar_df(df, bar_commodity, bar_attribute, bar_year) if bar_df.empty: st.warning("No data available 
for the selected combination.") else: chart = build_bar_chart(bar_df, shock1_label, shock2_label) st.altair_chart(chart, use_container_width=True) # ----------------- Filter Data for Heatmap ----------------- # Extracts data for the selected year and attribute @st.cache_data def filter_heatmap_data(df, selected_year, selected_attribute_dynamic): return df[(df["year"] == selected_year) & (df["attribute"] == selected_attribute_dynamic)] # ----------------- Build Altair Heatmap(s) ----------------- # Builds one or two heatmaps (Shock 1 and optional Shock 2) # Each heatmap shows % change by region and commodity @st.cache_resource def build_heatmaps_altair(df_year, shock1_label, shock2_label): # Label setup label1 = "Shock 1" if shock1_label == "Scenario" else shock1_label label2 = "Shock 2" if shock2_label == "Scenario" else shock2_label # Region name mapping country_mapping = { "ARG": "Argentina", "AUS": "Australia", "BRA": "Brazil", "CAN": "Canada", "CHN": "China", "EGY": "Egypt", "EUR": "European Union", "IDN": "Indonesia", "IND": "India", "JPN": "Japan", "KOR": "South Korea", "MEX": "Mexico", "MYS": "Malaysia", "PHL": "Philippines", "ROW": "Rest of World", "RUS": "Russia", "THA": "Thailand", "UKR": "Ukraine", "USA": "United States", "ZAF": "South Africa" } charts = [] # Handle 1 or 2 shocks dynamically for label, ratio_col, shock_value_col in [ (label1, "ratio", "value_with_shock1"), (label2, "ratio_shock2", "value_with_shock2") ] if shock2_label else [(label1, "ratio", "value_with_shock1")]: # Skip if required columns are missing if ratio_col not in df_year.columns or shock_value_col not in df_year.columns: continue # Prepare data data = df_year[["region", "commodity", "value", ratio_col, shock_value_col]].copy() data["percent_change"] = (data[ratio_col] - 1) * 100 data["region_fullname"] = data["region"].map(country_mapping) data.rename(columns={ "value": "Baseline Value", shock_value_col: "Shocked Value" }, inplace=True) # Create the heatmap chart chart = 
alt.Chart(data).mark_rect().encode( x=alt.X('commodity:N', title='Commodity', sort=alt.EncodingSortField(field="commodity", order="ascending"), axis=alt.Axis(labelAngle=45)), y=alt.Y('region_fullname:N', title='Region', sort=alt.EncodingSortField(field="region_fullname", order="ascending")), color=alt.Color('percent_change:Q', scale=alt.Scale(domain=[-100, 0, 100], range=["#4575b4", "white", "#d73027"]), title="Percent Change (%)"), tooltip=[ alt.Tooltip('region_fullname:N', title='Region'), alt.Tooltip('commodity:N', title='Commodity'), alt.Tooltip('percent_change:Q', title='Percent Change (%)', format='.1f'), alt.Tooltip('Baseline Value:Q', title='Baseline Value', format=".2f"), alt.Tooltip('Shocked Value:Q', title='Shocked Value', format=".2f") ] ).properties( width=40 * data['commodity'].nunique(), height=40 * data['region'].nunique(), title=label ) charts.append(chart) # Combine charts horizontally if not charts: return None return alt.hconcat(*charts).resolve_scale(color='independent').configure_axis(grid=True) # ----------------- Streamlit UI for Heatmap ----------------- # User selects year and attribute, and the app renders 1 or 2 heatmaps with st.expander("Heatmap", expanded=False): st.header("Heatmap of Ratio by Selected Year") # Dropdowns for user input year_list = sorted([y for y in df["year"].dropna().unique() if y >= 2023]) selected_year = st.selectbox("Select Year for Heatmap", year_list) selected_attribute_dynamic = st.selectbox("Select Attribute for Yearly Heatmap", sorted(df["attribute"].unique()), key="yearly_attr") # Filter dataset df_year = filter_heatmap_data(df, selected_year, selected_attribute_dynamic) if df_year.empty: st.warning("No data available for the selected combination.") else: chart = build_heatmaps_altair(df_year, shock1_label, shock2_label) if chart is None: st.warning("No valid data to display.") else: st.altair_chart(chart, use_container_width=True) # ----------------- Ensure Percent Change Column Exists ----------------- # 
If not already created, compute percent change from ratio column. if "pct_change" not in df.columns and "ratio" in df.columns: df["pct_change"] = (df["ratio"] - 1) * 100 # ----------------- Compute Correlation Matrix ----------------- # Filters data by region, commodity, and years # Then pivots the table to wide format and computes pairwise correlations @st.cache_data def compute_attribute_correlation(df, region, commodity, years): # Filter by region, commodity, and selected years df_filtered = df[ (df["region"] == region) & (df["commodity"] == commodity) & (df["year"].isin(years)) ] if df_filtered.empty: return None, "❗ No data available for the selected filters." # Pivot the data: attributes as columns, year as index df_pivot = df_filtered.pivot_table( index="year", columns="attribute", values="pct_change" ) if df_pivot.shape[1] < 2: return None, "Not enough attributes to compute correlation." # Compute correlation matrix return df_pivot.corr(), None # ----------------- Streamlit UI for Attribute Correlation ----------------- # User selects region, commodity, and year range # App computes and renders attribute correlation heatmap with st.expander("Attribute Correlation Matrix (by Region & Years)", expanded=False): st.header("Attribute Interdependence Correlation") # Prompt to upload data first if zip_file_1 is None: st.info("Please upload at least one ZIP file to enable this analysis.") else: # Region mapping (for user-friendly dropdown) country_mapping = { "ARG": "Argentina", "AUS": "Australia", "BRA": "Brazil", "CAN": "Canada", "CHN": "China", "EGY": "Egypt", "EUR": "European Union", "IDN": "Indonesia", "IND": "India", "JPN": "Japan", "KOR": "South Korea", "MEX": "Mexico", "MYS": "Malaysia", "PHL": "Philippines", "ROW": "Rest of World", "RUS": "Russia", "THA": "Thailand", "UKR": "Ukraine", "USA": "United States", "ZAF": "South Africa" } # Dropdown for region (full name) region_code_list = sorted(df["region"].dropna().unique()) region_options = 
{country_mapping.get(code, code): code for code in region_code_list} selected_country_name = st.selectbox("Select Region", sorted(region_options.keys()), key="corr_region") selected_region = region_options[selected_country_name] # Dropdown for commodity commodity_options = sorted(df["commodity"].dropna().unique()) selected_commodity = st.selectbox("Select Commodity", commodity_options, key="corr_commodity") # Year range slider (minimum 3 years required) min_year = 2023 max_year = int(df["year"].dropna().max()) start_year, end_year = st.slider( "Select Year Range (minimum 3 years)", min_value=min_year, max_value=max_year, value=(max(max_year - 2, min_year), max_year), step=1, key="corr_year_range" ) if end_year - start_year + 1 < 3: st.warning("Please select a range of at least 3 years.") selected_years = [] else: selected_years = list(range(start_year, end_year + 1)) # Compute correlation matrix corr_matrix, err_msg = compute_attribute_correlation(df, selected_region, selected_commodity, selected_years) if err_msg: st.warning(err_msg) else: # Plot with Seaborn fig, ax = plt.subplots(figsize=(7, 6)) # Create upper triangle mask mask = np.triu(np.ones_like(corr_matrix, dtype=bool), k=1) # Draw heatmap with annotation sns.heatmap( corr_matrix, mask=mask, cmap="coolwarm", annot=True, fmt=".2f", center=0, square=True, linewidths=0.5, cbar_kws={"shrink": 0.7}, ax=ax ) ax.set_title(f"{country_mapping.get(selected_region, selected_region)} - {selected_commodity} ({start_year}–{end_year})", fontsize=12, pad=10) ax.tick_params(axis='x', labelrotation=45, labelsize=9) ax.tick_params(axis='y', labelsize=9) ax.set_xlabel("") ax.set_ylabel("") # Display the plot st.pyplot(fig)