Spaces:
Sleeping
Sleeping
| # ----------------- Module Imports ----------------- | |
| # Import standard Python and third-party packages used for data processing and visualization | |
| import zipfile | |
| import pandas as pd | |
| import altair as alt | |
| import numpy as np | |
| import streamlit as st | |
| import seaborn as sns | |
| import matplotlib.pyplot as plt | |
| from matplotlib.colors import LinearSegmentedColormap, TwoSlopeNorm | |
| import pydeck as pdk | |
| from typing import Optional | |
| # ----------------- DataFrame Filter Helper ----------------- | |
| # This function adds an interactive filtering UI for a DataFrame in Streamlit. | |
| # Users can toggle filtering via a checkbox and then choose columns to filter by. | |
| # It supports: | |
| # - categorical columns: multiselect | |
| # - numerical columns: range slider | |
| # - text columns: substring or regex matching | |
| def filter_dataframe(df: pd.DataFrame, key_prefix: str = "") -> pd.DataFrame: | |
| """Adds a UI on top of a dataframe to let viewers filter columns.""" | |
| modify = st.checkbox("Add filters", value=False, key=f"{key_prefix}_filter_toggle") | |
| if not modify: | |
| return df | |
| df = df.copy() | |
| modification_container = st.container() | |
| with modification_container: | |
| to_filter_columns = st.multiselect("Filter dataframe on", df.columns, key=f"{key_prefix}_filter_cols") | |
| for column in to_filter_columns: | |
| if pd.api.types.is_categorical_dtype(df[column]) or df[column].nunique() < 10: | |
| user_cat_input = st.multiselect( | |
| f"Values for {column}", | |
| df[column].unique(), | |
| default=df[column].unique(), | |
| key=f"{key_prefix}_{column}_cat" | |
| ) | |
| df = df[df[column].isin(user_cat_input)] | |
| elif pd.api.types.is_numeric_dtype(df[column]): | |
| _min = float(df[column].min()) | |
| _max = float(df[column].max()) | |
| step = (_max - _min) / 100 if _max != _min else 1 | |
| user_num_input = st.slider( | |
| f"Values for {column}", | |
| min_value=_min, | |
| max_value=_max, | |
| value=(_min, _max), | |
| step=step, | |
| key=f"{key_prefix}_{column}_num" | |
| ) | |
| df = df[df[column].between(*user_num_input)] | |
| else: | |
| user_text_input = st.text_input( | |
| f"Substring or regex in {column}", | |
| key=f"{key_prefix}_{column}_text" | |
| ) | |
| if user_text_input: | |
| df = df[df[column].astype(str).str.contains(user_text_input, na=False)] | |
| return df | |
| # ----------------- Streamlit Page Setup ----------------- | |
| # Set layout to wide and define the app title | |
| st.set_page_config(layout="wide") | |
| st.title("Interactive Dashboard") | |
| # Styled HTML introduction box shown at the top of the app | |
| st.markdown( | |
| """ | |
| <div style='padding: 10px 20px; background-color: #f0f8ff; border-radius: 10px; margin-bottom: 20px;'> | |
| <h3 style='color: #1f4e79;'> Welcome to the Agricultural Trade Scenario Dashboard</h3> | |
| <p style='font-size: 16px; line-height: 1.6; color: #333;'> | |
| This interactive dashboard allows you to explore and compare the impact of different policy scenarios | |
| on agricultural trade metrics over time and across regions. | |
| </p> | |
| <ul style='font-size: 15px; color: #444;'> | |
| <li><b>Step 1:</b> Upload one or two ZIP files | |
| <li><b>Step 2:</b> Navigate through the tabs to view <b>line charts</b>, <b>regional bar charts</b>, </b>Top # Bar Charts</b>, <b>heatmaps</b> and </b>Correlation Matrix</b> .</li> | |
| <li><b>Optional:</b> Uploading a second ZIP enables comparison of two shock scenarios.</li> | |
| </ul> | |
| <p style='font-size: 15px; color: #666;'> | |
| ๐ File size limit: <b>5MB</b> per ZIP. Ensure files are properly named and formatted. | |
| </p> | |
| </div> | |
| """, | |
| unsafe_allow_html=True | |
| ) | |
| # ----------------- Upload Section----------------- | |
| # File uploader for Shock 1 and optional Shock 2 ZIP files | |
| zip_file_1 = st.file_uploader("Upload Shock 1 ZIP file", type="zip", key="zip1") | |
| zip_file_2 = st.file_uploader("Upload Shock 2 ZIP file (optional)", type="zip", key="zip2") | |
| # Define 20MB file size limit and validate each uploaded file | |
| MAX_FILE_SIZE = 20 * 1024 * 1024 # 20MB | |
| if zip_file_1 is not None and zip_file_1.size > MAX_FILE_SIZE: | |
| st.error("Shock 1 ZIP file exceeds 20MB. Please upload a smaller file.") | |
| zip_file_1 = None | |
| if zip_file_2 is not None and zip_file_2.size > MAX_FILE_SIZE: | |
| st.error("Shock 2 ZIP file exceeds 20MB. Please upload a smaller file.") | |
| zip_file_2 = None | |
| # ----------------- Load METADATA.csv from ZIP ----------------- | |
| # Load metadata file (METADATA.csv) from the given ZIP file | |
| def load_metadata_from_zip(zip_file): | |
| if zip_file is None: | |
| return None | |
| try: | |
| with zipfile.ZipFile(zip_file, "r") as z: | |
| meta_file = [f for f in z.namelist() if "METADATA.csv" in f] | |
| if not meta_file: | |
| return None | |
| meta_df = pd.read_csv(z.open(meta_file[0])) | |
| return meta_df | |
| except Exception as e: | |
| return None | |
| # Display metadata for Shock 1 ZIP file (if available) | |
| metadata_1 = load_metadata_from_zip(zip_file_1) | |
| if metadata_1 is not None: | |
| st.markdown( | |
| "<div style='margin-top: 10px; padding: 10px; border-left: 4px solid #1f77b4; background-color: #f9f9f9;'>" | |
| "<h5 style='color: #1f77b4;'> Metadata for Shock 1</h5>", | |
| unsafe_allow_html=True, | |
| ) | |
| for _, row in metadata_1.iterrows(): | |
| st.markdown( | |
| f"<div style='padding-left:15px; font-size:15px;'><b>{row['field']}</b>: {row['value']}</div>", | |
| unsafe_allow_html=True, | |
| ) | |
| st.markdown("</div>", unsafe_allow_html=True) | |
| elif zip_file_1 is not None: | |
| st.info("No metadata found in Shock 1 ZIP.") | |
| # Display metadata for Shock 2 ZIP file (if available) | |
| if zip_file_2 is not None: | |
| metadata_2 = load_metadata_from_zip(zip_file_2) | |
| if metadata_2 is not None: | |
| st.markdown( | |
| "<div style='margin-top: 10px; padding: 10px; border-left: 4px solid #ff7f0e; background-color: #f9f9f9;'>" | |
| "<h5 style='color: #ff7f0e;'> Metadata for Shock 2</h5>", | |
| unsafe_allow_html=True, | |
| ) | |
| for _, row in metadata_2.iterrows(): | |
| st.markdown( | |
| f"<div style='padding-left:15px; font-size:15px;'><b>{row['field']}</b>: {row['value']}</div>", | |
| unsafe_allow_html=True, | |
| ) | |
| st.markdown("</div>", unsafe_allow_html=True) | |
| else: | |
| st.info("No metadata found in Shock 2 ZIP.") | |
| # ----------------- Load Baseline Table (data_targets_adjusted.csv) ----------------- | |
| # This function searches for files containing 'data_targets_adjusted' in the ZIP archive | |
| # and loads them as a baseline DataFrame (used for model calibration or projection reference) | |
| def load_baseline_table(zip_file): | |
| if zip_file is None: | |
| return None | |
| try: | |
| with zipfile.ZipFile(zip_file, "r") as z: | |
| baseline_candidates = [f for f in z.namelist() if "data_targets_adjusted" in f] | |
| if not baseline_candidates: | |
| return None | |
| df_base = pd.read_csv(z.open(baseline_candidates[0])) | |
| return df_base | |
| except Exception: | |
| return None | |
| # ----------------- Force Filtering UI for Baseline Table ----------------- | |
| # Unlike filter_dataframe, this function does not require a checkbox toggle. | |
| # The filter UI is always visible to the user. | |
| def force_filter_dataframe(df: pd.DataFrame, key_prefix: str = "") -> pd.DataFrame: | |
| """Always show filter UI for a dataframe (no toggle checkbox).""" | |
| df = df.copy() | |
| modification_container = st.container() | |
| with modification_container: | |
| to_filter_columns = st.multiselect("Filter dataframe on", df.columns, key=f"{key_prefix}_filter_cols") | |
| for column in to_filter_columns: | |
| if pd.api.types.is_categorical_dtype(df[column]) or df[column].nunique() < 10: | |
| user_cat_input = st.multiselect( | |
| f"Values for {column}", | |
| df[column].unique(), | |
| default=df[column].unique(), | |
| key=f"{key_prefix}_{column}_cat" | |
| ) | |
| df = df[df[column].isin(user_cat_input)] | |
| elif pd.api.types.is_numeric_dtype(df[column]): | |
| _min = float(df[column].min()) | |
| _max = float(df[column].max()) | |
| step = (_max - _min) / 100 if _max != _min else 1 | |
| user_num_input = st.slider( | |
| f"Values for {column}", | |
| min_value=_min, | |
| max_value=_max, | |
| value=(_min, _max), | |
| step=step, | |
| key=f"{key_prefix}_{column}_num" | |
| ) | |
| df = df[df[column].between(*user_num_input)] | |
| else: | |
| user_text_input = st.text_input( | |
| f"Substring or regex in {column}", | |
| key=f"{key_prefix}_{column}_text" | |
| ) | |
| if user_text_input: | |
| df = df[df[column].astype(str).str.contains(user_text_input, na=False)] | |
| return df | |
| # ----------------- Load and Display Baseline Table ----------------- | |
| # Load baseline data for Shock 1 and optionally Shock 2. | |
| # Users can switch between the two baseline tables via radio buttons. | |
| baseline_df_1 = load_baseline_table(zip_file_1) | |
| baseline_df_2 = load_baseline_table(zip_file_2) if zip_file_2 else None | |
| if baseline_df_1 is not None or baseline_df_2 is not None: | |
| with st.expander("Baseline Table", expanded=False): | |
| selected_baseline = st.radio( | |
| "Select Baseline Source", | |
| options=["Shock 1", "Shock 2"], | |
| index=0 if baseline_df_1 is not None else 1, | |
| horizontal=True, | |
| key="baseline_radio" | |
| ) | |
| if selected_baseline == "Shock 1" and baseline_df_1 is not None: | |
| st.markdown("You can filter the baseline data from Shock 1 below:") | |
| st.dataframe(force_filter_dataframe(baseline_df_1, key_prefix="baseline1"), use_container_width=True) | |
| elif selected_baseline == "Shock 2" and baseline_df_2 is not None: | |
| st.markdown("You can filter the baseline data from Shock 2 below:") | |
| st.dataframe(force_filter_dataframe(baseline_df_2, key_prefix="baseline2"), use_container_width=True) | |
| else: | |
| st.warning(f"No baseline data found for {selected_baseline}.") | |
| elif zip_file_1 is not None: | |
| st.info("No `data_targets_adjusted.csv` found in Shock 1 ZIP.") | |
| # ----------------- Load data_shocks.csv ----------------- | |
| # This function loads policy shock values (ratios or multipliers applied to the baseline). | |
| def load_shocks_from_zip(zip_file): | |
| if zip_file is None: | |
| return None | |
| try: | |
| with zipfile.ZipFile(zip_file, "r") as z: | |
| shock_file = [f for f in z.namelist() if "data_shocks.csv" in f] | |
| if not shock_file: | |
| return None | |
| df = pd.read_csv(z.open(shock_file[0])) | |
| return df | |
| except Exception as e: | |
| return None | |
| # ----------------- Display Policy Shock Variables ----------------- | |
| # Allow users to either view all shock variables or only those that differ from neutral (value != 1). | |
| def show_shock_value_only(df: pd.DataFrame, label: str): | |
| st.markdown(f"### Policy Shock Variables ({label})") | |
| show_option = st.radio( | |
| f"View Option for {label}", | |
| ["Only show changed (non-neutral) values", "Show all variables"], | |
| index=0, | |
| horizontal=True, | |
| key=f"shock_radio_{label}" | |
| ) | |
| if show_option == "Only show changed (non-neutral) values": | |
| if "value" in df.columns: | |
| filtered_df = df[df["value"] != 1] | |
| if filtered_df.empty: | |
| st.info(f"No changed values found in {label} (all value = 1).") | |
| else: | |
| st.dataframe(filter_dataframe(filtered_df, key_prefix=label), use_container_width=True) | |
| else: | |
| st.warning(f"No 'value' column found in {label}.") | |
| else: | |
| st.dataframe(filter_dataframe(df, key_prefix=label), use_container_width=True) | |
| # Load shock variables from both ZIPs | |
| shock_df_1 = load_shocks_from_zip(zip_file_1) | |
| shock_df_2 = load_shocks_from_zip(zip_file_2) | |
| # Display shock variable tables inside an expander panel | |
| with st.expander("Policy Shock Variables", expanded=False): | |
| if shock_df_1 is not None: | |
| show_shock_value_only(shock_df_1, "Shock 1") | |
| if zip_file_2 is not None and shock_df_2 is not None: | |
| show_shock_value_only(shock_df_2, "Shock 2") | |
| # ----------------- Load & Process ZIP Pair: Baseline + Scenario ----------------- | |
| # This function loads the baseline and scenario data from the ZIP files, | |
| # calculates the ratio or inferred ratio (shock effect), and merges all into one DataFrame. | |
| def load_and_process_zip_pair(zip1, zip2): | |
| def load_zip(zip_obj): | |
| # Open and read ZIP file contents | |
| with zipfile.ZipFile(zip_obj, "r") as z: | |
| file_list = z.namelist() | |
| # Try to locate baseline and scenario files by naming pattern | |
| baseline_file = next((f for f in file_list if "baseline_projections.csv" in f or "data_targets_adjusted.csv" in f), None) | |
| scenario_file = next((f for f in file_list if "data_scenario_solution" in f or "data_solution.csv" in f), None) | |
| if not baseline_file or not scenario_file: | |
| raise ValueError("ZIP file must contain either 'baseline_projections.csv' or 'data_targets_adjusted.csv', and 'data_scenario_solution_*.csv' or 'data_solution.csv'.") | |
| # Load both baseline and scenario as DataFrames | |
| df_baseline = pd.read_csv(z.open(baseline_file)) | |
| df_scenario = pd.read_csv(z.open(scenario_file)) | |
| # Keep only relevant columns in baseline | |
| df_baseline = df_baseline[["region", "commodity", "year", "attribute", "value"]] | |
| # If scenario file already has a 'ratio' column, use it directly | |
| if "ratio" in df_scenario.columns: | |
| df_scenario = df_scenario[["region", "commodity", "year", "attribute", "ratio"]] | |
| # Otherwise, calculate ratio using scenario value / baseline value | |
| elif "value" in df_scenario.columns: | |
| df_scenario = df_scenario[["region", "commodity", "year", "attribute", "value"]] | |
| df_scenario.rename(columns={"value": "value_scenario"}, inplace=True) | |
| df_scenario = pd.merge( | |
| df_baseline, | |
| df_scenario, | |
| on=["region", "commodity", "year", "attribute"], | |
| how="left" | |
| ) | |
| df_scenario["ratio"] = df_scenario["value_scenario"] / df_scenario["value"] | |
| df_scenario = df_scenario[["region", "commodity", "year", "attribute", "ratio"]] | |
| else: | |
| raise ValueError("Scenario file must contain either a 'ratio' or 'value' column.") | |
| # Extract label for the scenario | |
| if "data_scenario_solution_" in scenario_file: | |
| shock_name = scenario_file.replace("data_scenario_solution_", "").replace(".csv", "") | |
| elif "data_solution.csv" in scenario_file: | |
| shock_name = "Scenario" | |
| else: | |
| shock_name = "Unknown" | |
| return df_baseline, df_scenario, shock_name | |
| # Load baseline and scenario data for Shock 1 | |
| df_base1, df_shock1, shock1_name = load_zip(zip1) | |
| # Merge baseline with scenario 1 and compute new values with shock | |
| df = pd.merge(df_base1, df_shock1, on=["region", "commodity", "year", "attribute"], how="left") | |
| df["value_with_shock1"] = df["value"] * df["ratio"] | |
| shock2_name = None | |
| # If Shock 2 is uploaded, merge its ratios too | |
| if zip2 is not None: | |
| _, df_shock2, shock2_name = load_zip(zip2) | |
| df = pd.merge(df, df_shock2[["region", "commodity", "year", "attribute", "ratio"]], | |
| on=["region", "commodity", "year", "attribute"], | |
| how="left", suffixes=('', '_shock2')) | |
| df["value_with_shock2"] = df["value"] * df["ratio_shock2"] | |
| # Clean-up: force attribute names to uppercase, and year to integer | |
| df["attribute"] = df["attribute"].str.upper() | |
| df["year"] = df["year"].astype(int) | |
| return df, shock1_name, shock2_name | |
| # ----------------- Run logic: only proceed if ZIP 1 is uploaded ----------------- | |
| if zip_file_1 is None: | |
| st.info("Please upload at least the Shock 1 ZIP file to begin.") | |
| else: | |
| # If files are ready, load and merge the datasets | |
| df, shock1_label, shock2_label = load_and_process_zip_pair(zip_file_1, zip_file_2) | |
| #----------------- TOP # REGION CHANGE BAR CHART ----------------- | |
| # This function extracts the top N regions with the largest absolute percent change for a given | |
| # commodity, attribute, and year. It supports both shock1 and shock2 scenario filtering. | |
| def filter_topn_data(df, topn_commodity, topn_attribute, topn_year, shock="shock1", top_n=10): | |
| # Select relevant columns depending on which shock we're filtering | |
| relevant_cols = ["region", "commodity", "year", "attribute", "ratio", "value"] | |
| if "ratio_shock2" in df.columns and shock == "shock2": | |
| relevant_cols = ["region", "commodity", "year", "attribute", "ratio_shock2", "value"] | |
| df_filtered = df[relevant_cols].copy() | |
| # Rename for consistency if using shock2 | |
| if shock == "shock2" and "ratio_shock2" in df_filtered.columns: | |
| df_filtered = df_filtered.rename(columns={"ratio_shock2": "ratio"}) | |
| # Filter by user selections | |
| df_filtered = df_filtered[ | |
| (df_filtered["commodity"] == topn_commodity) & | |
| (df_filtered["attribute"] == topn_attribute) & | |
| (df_filtered["year"] == topn_year) | |
| ] | |
| # Calculate percent change and sort by absolute magnitude | |
| df_filtered["pct_change"] = (df_filtered["ratio"] - 1) * 100 | |
| df_filtered["abs_change"] = df_filtered["pct_change"].abs() | |
| df_filtered = df_filtered.dropna(subset=["pct_change"]) | |
| df_topn = df_filtered.sort_values("abs_change", ascending=False).head(top_n) | |
| return df_topn | |
| # ----------------- Build Top N Bar Chart ----------------- | |
| # Generates a horizontal bar chart using Altair for percent change by region | |
| def build_topn_bar_chart(topn_df): | |
| # Color scale: red for positive, blue for negative | |
| color_scale = alt.Scale(domain=[-100, 0, 100], range=["#4575b4", "white", "#d73027"]) | |
| # Mapping from region codes to full country names | |
| country_mapping = { | |
| "ARG": "Argentina", "AUS": "Australia", "BRA": "Brazil", "CAN": "Canada", | |
| "CHN": "China", "EGY": "Egypt", "EUR": "European Union", "IDN": "Indonesia", | |
| "IND": "India", "JPN": "Japan", "KOR": "South Korea", "MEX": "Mexico", | |
| "MYS": "Malaysia", "PHL": "Philippines", "ROW": "Rest of World", "RUS": "Russia", | |
| "THA": "Thailand", "UKR": "Ukraine", "USA": "United States", "ZAF": "South Africa" | |
| } | |
| topn_df = topn_df.copy() | |
| topn_df["region_fullname"] = topn_df["region"].map(country_mapping) | |
| topn_df["baseline_value"] = topn_df["value"] | |
| # Compute shocked value using the appropriate ratio | |
| if "ratio_shock2" in topn_df.columns: | |
| topn_df["shocked_value"] = topn_df["baseline_value"] * topn_df["ratio_shock2"] | |
| else: | |
| topn_df["shocked_value"] = topn_df["baseline_value"] * topn_df["ratio"] | |
| # Create the bar chart | |
| chart = alt.Chart(topn_df).mark_bar().encode( | |
| x=alt.X('pct_change:Q', title='Percent Change (%)'), | |
| y=alt.Y('region:N', sort='-x', title='Region'), | |
| color=alt.Color('pct_change:Q', scale=color_scale, legend=None), | |
| tooltip=[ | |
| alt.Tooltip('region_fullname:N', title='Region'), | |
| alt.Tooltip('pct_change:Q', title='Percent Change (%)', format=".2f"), | |
| alt.Tooltip('baseline_value:Q', title='Baseline Value', format=".2f"), | |
| alt.Tooltip('shocked_value:Q', title='Shocked Value', format=".2f") | |
| ] | |
| ).properties(width=700, height=400) | |
| return chart | |
| # ----------------- Streamlit UI for Top N Chart ----------------- | |
| # UI for user to select commodity, attribute, year, number of regions (Top N), | |
| # and shock scenario to visualize Top N regional percent changes | |
| with st.expander("Top # Regional Changes", expanded=False): | |
| st.header("Top # Regions with Largest Changes") | |
| st.markdown("**Note:** Top # is selected based on the largest absolute change (positive or negative).", unsafe_allow_html=True) | |
| # User selects filtering parameters | |
| topn_commodity = st.selectbox("Select Commodity for Top #", sorted(df["commodity"].unique()), key="topn_commodity") | |
| topn_attribute = st.selectbox("Select Attribute for Top #", sorted(df["attribute"].unique()), key="topn_attribute") | |
| topn_year = st.selectbox("Select Year for Top #", sorted([y for y in df["year"].dropna().unique() if y >= 2023]), key="topn_year") | |
| topn_n = st.slider("Select Top #", min_value=5, max_value=20, value=10) | |
| # Choose between Shock 1 and Shock 2 if both are available | |
| if shock2_label: | |
| shock_choice = st.radio("Select Shock Scenario", ["Shock 1", "Shock 2"], index=0, horizontal=True) | |
| shock_key = "shock1" if shock_choice == "Shock 1" else "shock2" | |
| else: | |
| shock_key = "shock1" | |
| # Filter topN data and render chart | |
| topn_df = filter_topn_data(df, topn_commodity, topn_attribute, topn_year, shock=shock_key, top_n=topn_n) | |
| if topn_df.empty: | |
| st.warning("No data available for the selected combination.") | |
| else: | |
| chart = build_topn_bar_chart(topn_df) | |
| st.altair_chart(chart, use_container_width=True) | |
| # ----------------- Filter Data for Line Chart ----------------- | |
| # Returns a subset of the main dataframe matching selected region, commodity, and attribute. | |
| def filter_data(df, region_code, commodity, attribute): | |
| return df[ | |
| (df["region"] == region_code) & | |
| (df["commodity"] == commodity) & | |
| (df["attribute"] == attribute) | |
| ] | |
| # ----------------- Build Line Chart ----------------- | |
| # Draw a time series plot of baseline and shock scenarios. | |
| # Uses dashed/dotted lines and colors to distinguish each scenario. | |
| def build_line_chart(df_filtered, shock1_label, shock2_label): | |
| # Y-axis label with attribute name and assumed units | |
| y_label = df_filtered["attribute"].iloc[0].upper() + " (Unit: 1000 Metric tons)" | |
| # Normalize display labels | |
| shock1_display = "Shock 1" if shock1_label == "Scenario" else shock1_label | |
| shock2_display = "Shock 2" if shock2_label == "Scenario" else shock2_label | |
| # Line chart for baseline (solid) | |
| line_base = alt.Chart(df_filtered).mark_line(point=True, color='darkblue').encode( | |
| x=alt.X("year:O", title="Year"), | |
| y=alt.Y("value:Q", title=y_label), | |
| tooltip=["year", "value"] | |
| ) | |
| # Dashed line for Shock 1 (post-2023) | |
| line_shock1 = alt.Chart(df_filtered[df_filtered["year"] >= 2023]).mark_line( | |
| point=True, color='darkred', strokeDash=[4, 2] | |
| ).encode( | |
| x="year:O", | |
| y="value_with_shock1:Q", | |
| tooltip=["year", "value_with_shock1"] | |
| ) | |
| # Optional: dotted line for Shock 2 (post-2023) | |
| line_shock2 = None | |
| if "value_with_shock2" in df_filtered.columns: | |
| line_shock2 = alt.Chart(df_filtered[df_filtered["year"] >= 2023]).mark_line( | |
| point=True, color='darkgreen', strokeDash=[2, 2] | |
| ).encode( | |
| x="year:O", | |
| y="value_with_shock2:Q", | |
| tooltip=["year", "value_with_shock2"] | |
| ) | |
| # Build a manual legend using line styles and colors | |
| legend_items = [ | |
| {"Label": "Baseline", "LineType": "solid", "Color": "darkblue"}, | |
| {"Label": shock1_display, "LineType": "dash", "Color": "darkred"}, | |
| ] | |
| if shock2_label: | |
| legend_items.append({"Label": shock2_display, "LineType": "dot", "Color": "darkgreen"}) | |
| legend_df = pd.DataFrame(legend_items) | |
| legend_chart = alt.Chart(legend_df).mark_rule(size=3).encode( | |
| y=alt.Y("Label:N", axis=alt.Axis(title=" ")), | |
| color=alt.Color("Label:N", scale=alt.Scale(domain=legend_df["Label"].tolist(), | |
| range=legend_df["Color"].tolist()), legend=None), | |
| strokeDash=alt.StrokeDash("LineType:N", scale=alt.Scale(domain=["solid", "dash", "dot"], range=[[0], [4, 2], [2, 2]])) | |
| ).properties(width=100, height=40 * len(legend_items)) | |
| # Combine charts | |
| lines = line_base + line_shock1 | |
| if line_shock2: | |
| lines += line_shock2 | |
| return alt.hconcat(lines.properties(width=700, height=400), legend_chart) | |
| # ----------------- Country Code Mapping ----------------- | |
| # Used for translating 3-letter codes to full region names for user display | |
| country_mapping = { | |
| "ARG": "Argentina", | |
| "AUS": "Australia", | |
| "BRA": "Brazil", | |
| "CAN": "Canada", | |
| "CHN": "China", | |
| "EGY": "Egypt", | |
| "EUR": "European Union", | |
| "IDN": "Indonesia", | |
| "IND": "India", | |
| "JPN": "Japan", | |
| "KOR": "South Korea", | |
| "MEX": "Mexico", | |
| "MYS": "Malaysia", | |
| "PHL": "Philippines", | |
| "ROW": "Rest of World", | |
| "RUS": "Russia", | |
| "THA": "Thailand", | |
| "UKR": "Ukraine", | |
| "USA": "United States", | |
| "ZAF": "South Africa", | |
| } | |
| # ----------------- Streamlit UI for Line Chart ----------------- | |
| # UI for selecting country, commodity, and attribute, and rendering the time series line chart | |
| with st.expander("Line Chart", expanded=False): | |
| st.markdown("### Time Series: Baseline vs. Shocks") | |
| # Convert region codes to full names for display | |
| region_options = {v: k for k, v in country_mapping.items()} | |
| selected_country_name = st.selectbox("Select Region:", sorted(region_options.keys())) | |
| selected_region_code = region_options[selected_country_name] | |
| # Select commodity and attribute | |
| commodity = st.selectbox("Select Commodity:", sorted(df["commodity"].unique())) | |
| attribute = st.selectbox("Select Attribute:", sorted(df["attribute"].unique())) | |
| # Filter data for the selected combination | |
| df_filtered = filter_data(df, selected_region_code, commodity, attribute) | |
| # Render the line chart | |
| chart = build_line_chart(df_filtered, shock1_label, shock2_label) | |
| st.altair_chart(chart, use_container_width=True) | |
| # ----------------- Filter Data for Regional Bar Chart ----------------- | |
| # Filters the dataset for a selected commodity, attribute, and year | |
| def filter_bar_df(df, bar_commodity, bar_attribute, bar_year): | |
| return df[ | |
| (df["commodity"] == bar_commodity) & | |
| (df["attribute"] == bar_attribute) & | |
| (df["year"] == bar_year) | |
| ] | |
| columns = ["region", "value", "ratio"] | |
| if "ratio_shock2" in df.columns: | |
| columns.append("ratio_shock2") | |
| columns.append("value_with_shock2") if "value_with_shock2" in df.columns else None | |
| return df[ | |
| (df["commodity"] == bar_commodity) & | |
| (df["attribute"] == bar_attribute) & | |
| (df["year"] == bar_year) | |
| ][columns].copy() | |
| # ----------------- Build Bar Chart ----------------- | |
| # Draws a bar chart comparing percent change in each region | |
| # Optionally shows side-by-side bars if both shocks are available | |
| def build_bar_chart(bar_df, shock1_label, shock2_label): | |
| # Label handling for display | |
| label1 = "Shock 1" if shock1_label == "Scenario" else shock1_label | |
| label2 = "Shock 2" if shock2_label == "Scenario" else shock2_label | |
| # Country name mapping | |
| country_mapping = { | |
| "ARG": "Argentina", "AUS": "Australia", "BRA": "Brazil", "CAN": "Canada", | |
| "CHN": "China", "EGY": "Egypt", "EUR": "European Union", "IDN": "Indonesia", | |
| "IND": "India", "JPN": "Japan", "KOR": "South Korea", "MEX": "Mexico", | |
| "MYS": "Malaysia", "PHL": "Philippines", "ROW": "Rest of World", "RUS": "Russia", | |
| "THA": "Thailand", "UKR": "Ukraine", "USA": "United States", "ZAF": "South Africa", | |
| } | |
| # Adjust label spacing and bar width depending on # of shocks | |
| if shock2_label: | |
| bar_size, font_size, offset_pos, offset_neg = 15, 10, 0.5, -0.6 | |
| else: | |
| bar_size, font_size, offset_pos, offset_neg = 25, 11, 1.0, -1.2 | |
| bar_df = bar_df.copy().dropna(subset=["ratio"]) | |
| bar_df["pct_change_1"] = (bar_df["ratio"] - 1) * 100 | |
| bar_df["shocked_value_1"] = bar_df["value"] * bar_df["ratio"] | |
| # Handle second shock if available | |
| if "ratio_shock2" in bar_df.columns and "value_with_shock2" in bar_df.columns: | |
| bar_df["pct_change_2"] = (bar_df["ratio_shock2"] - 1) * 100 | |
| bar_df["shocked_value_2"] = bar_df["value_with_shock2"] | |
| # Create two separate dataframes and then merge for side-by-side display | |
| df1 = bar_df[["region", "value", "shocked_value_1", "pct_change_1"]].copy() | |
| df1["Shock"] = label1 | |
| df1.rename(columns={ | |
| "value": "Baseline Value", | |
| "shocked_value_1": "Shocked Value", | |
| "pct_change_1": "Percent Change" | |
| }, inplace=True) | |
| df2 = bar_df[["region", "value", "shocked_value_2", "pct_change_2"]].copy() | |
| df2["Shock"] = label2 | |
| df2.rename(columns={ | |
| "value": "Baseline Value", | |
| "shocked_value_2": "Shocked Value", | |
| "pct_change_2": "Percent Change" | |
| }, inplace=True) | |
| bar_df_melt = pd.concat([df1, df2], ignore_index=True) | |
| else: | |
| # Only one shock scenario | |
| bar_df_melt = bar_df[["region", "value", "shocked_value_1", "pct_change_1"]].copy() | |
| bar_df_melt["Shock"] = label1 | |
| bar_df_melt.rename(columns={ | |
| "value": "Baseline Value", | |
| "shocked_value_1": "Shocked Value", | |
| "pct_change_1": "Percent Change" | |
| }, inplace=True) | |
| # Add full region names and label adjustment | |
| bar_df_melt["region_fullname"] = bar_df_melt["region"].map(country_mapping) | |
| bar_df_melt["show_label"] = True | |
| bar_df_melt["label_y"] = bar_df_melt["Percent Change"] + bar_df_melt["Percent Change"].apply( | |
| lambda x: offset_pos if x >= 0 else offset_neg | |
| ) | |
| # Allow toggling between shocks via legend | |
| selection = alt.selection_multi(fields=["Shock"], bind="legend") | |
| # Main bar chart | |
| base = alt.Chart(bar_df_melt).encode( | |
| x=alt.X( | |
| "region:N", | |
| title="Region", | |
| axis=alt.Axis(labelAngle=0), | |
| scale=alt.Scale(paddingInner=0.5, paddingOuter=0.1) | |
| ), | |
| xOffset=alt.XOffset("Shock:N", sort=[label1, label2] if shock2_label else [label1]), | |
| y=alt.Y("Percent Change:Q", title="Percent Change (%)"), | |
| color=alt.Color( | |
| "Shock:N", | |
| scale=alt.Scale(domain=[label1, label2] if shock2_label else [label1], | |
| range=["#6baed6", "#fd8d3c"] if shock2_label else ["#6baed6"]) | |
| ) | |
| ) | |
| bar = base.mark_bar(size=bar_size).encode( | |
| tooltip=[ | |
| alt.Tooltip("region_fullname:N", title="Region"), | |
| alt.Tooltip("Shock:N", title="Shock"), | |
| alt.Tooltip("Percent Change:Q", title="Percent Change (%)", format=".2f"), | |
| alt.Tooltip("Baseline Value:Q", title="Baseline Value", format=".2f"), | |
| alt.Tooltip("Shocked Value:Q", title="Shocked Value", format=".2f") | |
| ] | |
| ).add_selection(selection).transform_filter(selection) | |
| # Numeric labels on top of bars | |
| text = base.mark_text( | |
| align="center", fontSize=font_size | |
| ).encode( | |
| y="label_y:Q", | |
| text=alt.Text("Percent Change:Q", format=".1f") | |
| ).transform_filter("datum.show_label == true") | |
| return (bar + text).properties(width=700, height=400) | |
| # ----------------- Streamlit UI for Bar Chart ----------------- | |
| # User selects commodity, attribute, and year; chart shows percent changes across regions | |
| with st.expander("Bar Chart", expanded=False): | |
| st.header("Regional Percent Change Overview") | |
| # UI selections | |
| bar_commodity = st.selectbox("Select Commodity", sorted(df["commodity"].unique()), key="bar_commodity") | |
| bar_attribute = st.selectbox("Select Attribute", sorted(df["attribute"].unique()), key="bar_attribute") | |
| bar_year = st.selectbox("Select Year", sorted([y for y in df["year"].dropna().unique() if y >= 2023]), key="bar_year") | |
| # Filter and render chart | |
| bar_df = filter_bar_df(df, bar_commodity, bar_attribute, bar_year) | |
| if bar_df.empty: | |
| st.warning("No data available for the selected combination.") | |
| else: | |
| chart = build_bar_chart(bar_df, shock1_label, shock2_label) | |
| st.altair_chart(chart, use_container_width=True) | |
| # ----------------- Filter Data for Heatmap ----------------- | |
| # Extracts data for the selected year and attribute | |
| def filter_heatmap_data(df, selected_year, selected_attribute_dynamic): | |
| return df[(df["year"] == selected_year) & (df["attribute"] == selected_attribute_dynamic)] | |
| # ----------------- Build Altair Heatmap(s) ----------------- | |
| # Builds one or two heatmaps (Shock 1 and optional Shock 2) | |
| # Each heatmap shows % change by region and commodity | |
| def build_heatmaps_altair(df_year, shock1_label, shock2_label): | |
| # Label setup | |
| label1 = "Shock 1" if shock1_label == "Scenario" else shock1_label | |
| label2 = "Shock 2" if shock2_label == "Scenario" else shock2_label | |
| # Region name mapping | |
| country_mapping = { | |
| "ARG": "Argentina", "AUS": "Australia", "BRA": "Brazil", "CAN": "Canada", | |
| "CHN": "China", "EGY": "Egypt", "EUR": "European Union", "IDN": "Indonesia", | |
| "IND": "India", "JPN": "Japan", "KOR": "South Korea", "MEX": "Mexico", | |
| "MYS": "Malaysia", "PHL": "Philippines", "ROW": "Rest of World", "RUS": "Russia", | |
| "THA": "Thailand", "UKR": "Ukraine", "USA": "United States", "ZAF": "South Africa" | |
| } | |
| charts = [] | |
| # Handle 1 or 2 shocks dynamically | |
| for label, ratio_col, shock_value_col in [ | |
| (label1, "ratio", "value_with_shock1"), | |
| (label2, "ratio_shock2", "value_with_shock2") | |
| ] if shock2_label else [(label1, "ratio", "value_with_shock1")]: | |
| # Skip if required columns are missing | |
| if ratio_col not in df_year.columns or shock_value_col not in df_year.columns: | |
| continue | |
| # Prepare data | |
| data = df_year[["region", "commodity", "value", ratio_col, shock_value_col]].copy() | |
| data["percent_change"] = (data[ratio_col] - 1) * 100 | |
| data["region_fullname"] = data["region"].map(country_mapping) | |
| data.rename(columns={ | |
| "value": "Baseline Value", | |
| shock_value_col: "Shocked Value" | |
| }, inplace=True) | |
| # Create the heatmap chart | |
| chart = alt.Chart(data).mark_rect().encode( | |
| x=alt.X('commodity:N', title='Commodity', sort=alt.EncodingSortField(field="commodity", order="ascending"), | |
| axis=alt.Axis(labelAngle=45)), | |
| y=alt.Y('region_fullname:N', title='Region', sort=alt.EncodingSortField(field="region_fullname", order="ascending")), | |
| color=alt.Color('percent_change:Q', | |
| scale=alt.Scale(domain=[-100, 0, 100], range=["#4575b4", "white", "#d73027"]), | |
| title="Percent Change (%)"), | |
| tooltip=[ | |
| alt.Tooltip('region_fullname:N', title='Region'), | |
| alt.Tooltip('commodity:N', title='Commodity'), | |
| alt.Tooltip('percent_change:Q', title='Percent Change (%)', format='.1f'), | |
| alt.Tooltip('Baseline Value:Q', title='Baseline Value', format=".2f"), | |
| alt.Tooltip('Shocked Value:Q', title='Shocked Value', format=".2f") | |
| ] | |
| ).properties( | |
| width=40 * data['commodity'].nunique(), | |
| height=40 * data['region'].nunique(), | |
| title=label | |
| ) | |
| charts.append(chart) | |
| # Combine charts horizontally | |
| if not charts: | |
| return None | |
| return alt.hconcat(*charts).resolve_scale(color='independent').configure_axis(grid=True) | |
| # ----------------- Streamlit UI for Heatmap ----------------- | |
| # User selects year and attribute, and the app renders 1 or 2 heatmaps | |
| with st.expander("Heatmap", expanded=False): | |
| st.header("Heatmap of Ratio by Selected Year") | |
| # Dropdowns for user input | |
| year_list = sorted([y for y in df["year"].dropna().unique() if y >= 2023]) | |
| selected_year = st.selectbox("Select Year for Heatmap", year_list) | |
| selected_attribute_dynamic = st.selectbox("Select Attribute for Yearly Heatmap", sorted(df["attribute"].unique()), key="yearly_attr") | |
| # Filter dataset | |
| df_year = filter_heatmap_data(df, selected_year, selected_attribute_dynamic) | |
| if df_year.empty: | |
| st.warning("No data available for the selected combination.") | |
| else: | |
| chart = build_heatmaps_altair(df_year, shock1_label, shock2_label) | |
| if chart is None: | |
| st.warning("No valid data to display.") | |
| else: | |
| st.altair_chart(chart, use_container_width=True) | |
| # ----------------- Ensure Percent Change Column Exists ----------------- | |
| # If not already created, compute percent change from ratio column. | |
| if "pct_change" not in df.columns and "ratio" in df.columns: | |
| df["pct_change"] = (df["ratio"] - 1) * 100 | |
| # ----------------- Compute Correlation Matrix ----------------- | |
| # Filters data by region, commodity, and years | |
| # Then pivots the table to wide format and computes pairwise correlations | |
| def compute_attribute_correlation(df, region, commodity, years): | |
| # Filter by region, commodity, and selected years | |
| df_filtered = df[ | |
| (df["region"] == region) & | |
| (df["commodity"] == commodity) & | |
| (df["year"].isin(years)) | |
| ] | |
| if df_filtered.empty: | |
| return None, "โ No data available for the selected filters." | |
| # Pivot the data: attributes as columns, year as index | |
| df_pivot = df_filtered.pivot_table( | |
| index="year", | |
| columns="attribute", | |
| values="pct_change" | |
| ) | |
| if df_pivot.shape[1] < 2: | |
| return None, "Not enough attributes to compute correlation." | |
| # Compute correlation matrix | |
| return df_pivot.corr(), None | |
| # ----------------- Streamlit UI for Attribute Correlation ----------------- | |
| # User selects region, commodity, and year range | |
| # App computes and renders attribute correlation heatmap | |
| with st.expander("Attribute Correlation Matrix (by Region & Years)", expanded=False): | |
| st.header("Attribute Interdependence Correlation") | |
| # Prompt to upload data first | |
| if zip_file_1 is None: | |
| st.info("Please upload at least one ZIP file to enable this analysis.") | |
| else: | |
| # Region mapping (for user-friendly dropdown) | |
| country_mapping = { | |
| "ARG": "Argentina", "AUS": "Australia", "BRA": "Brazil", "CAN": "Canada", | |
| "CHN": "China", "EGY": "Egypt", "EUR": "European Union", "IDN": "Indonesia", | |
| "IND": "India", "JPN": "Japan", "KOR": "South Korea", "MEX": "Mexico", | |
| "MYS": "Malaysia", "PHL": "Philippines", "ROW": "Rest of World", "RUS": "Russia", | |
| "THA": "Thailand", "UKR": "Ukraine", "USA": "United States", "ZAF": "South Africa" | |
| } | |
| # Dropdown for region (full name) | |
| region_code_list = sorted(df["region"].dropna().unique()) | |
| region_options = {country_mapping.get(code, code): code for code in region_code_list} | |
| selected_country_name = st.selectbox("Select Region", sorted(region_options.keys()), key="corr_region") | |
| selected_region = region_options[selected_country_name] | |
| # Dropdown for commodity | |
| commodity_options = sorted(df["commodity"].dropna().unique()) | |
| selected_commodity = st.selectbox("Select Commodity", commodity_options, key="corr_commodity") | |
| # Year range slider (minimum 3 years required) | |
| min_year = 2023 | |
| max_year = int(df["year"].dropna().max()) | |
| start_year, end_year = st.slider( | |
| "Select Year Range (minimum 3 years)", | |
| min_value=min_year, | |
| max_value=max_year, | |
| value=(max(max_year - 2, min_year), max_year), | |
| step=1, | |
| key="corr_year_range" | |
| ) | |
| if end_year - start_year + 1 < 3: | |
| st.warning("Please select a range of at least 3 years.") | |
| selected_years = [] | |
| else: | |
| selected_years = list(range(start_year, end_year + 1)) | |
| # Compute correlation matrix | |
| corr_matrix, err_msg = compute_attribute_correlation(df, selected_region, selected_commodity, selected_years) | |
| if err_msg: | |
| st.warning(err_msg) | |
| else: | |
| # Plot with Seaborn | |
| fig, ax = plt.subplots(figsize=(7, 6)) | |
| # Create upper triangle mask | |
| mask = np.triu(np.ones_like(corr_matrix, dtype=bool), k=1) | |
| # Draw heatmap with annotation | |
| sns.heatmap( | |
| corr_matrix, | |
| mask=mask, | |
| cmap="coolwarm", | |
| annot=True, | |
| fmt=".2f", | |
| center=0, | |
| square=True, | |
| linewidths=0.5, | |
| cbar_kws={"shrink": 0.7}, | |
| ax=ax | |
| ) | |
| ax.set_title(f"{country_mapping.get(selected_region, selected_region)} - {selected_commodity} ({start_year}โ{end_year})", | |
| fontsize=12, pad=10) | |
| ax.tick_params(axis='x', labelrotation=45, labelsize=9) | |
| ax.tick_params(axis='y', labelsize=9) | |
| ax.set_xlabel("") | |
| ax.set_ylabel("") | |
| # Display the plot | |
| st.pyplot(fig) |