Metrics / src /streamlit_app.py
YaraKyrychenko's picture
Update src/streamlit_app.py
74e44d4 verified
import re
from pathlib import Path
from typing import Dict, Iterable, List
import pandas as pd
import streamlit as st
from datasets import load_dataset
st.set_page_config(
page_title="Misinformation Resilience Metrics",
page_icon="🧭",
layout="wide",
initial_sidebar_state="expanded",
)
DISPLAY_COLUMNS = [
"MeasureName",
"MeasureAbbreviation",
"Year",
"Reference",
"MeasureDetails",
"Paper",
"Measure",
"Language",
"LanguageFullNames",
"Population",
"PopulationFullNames",
"Validated",
"Online",
"Objective",
"Specific",
"LengthItems",
"LengthBucket",
"Constructs",
"LongTermMalleability",
"ShortTermMalleability",
"StimuliType",
"StimuliOrigin",
"StimuliSource",
"StimuliCharacteristics",
"StimuliPlatform",
"ResponseOption",
"ComponentType",
"BehaviorType",
"RiskType",
]
UNKNOWN_VALUES = {"", "unknown", "na", "n/a", "nan", "none", "unspecified"}
MULTI_DELIMITERS = r"\s*(?:,|;|/|\||&|\band\b|\+|\\)\s*"
FILTER_LABEL = "All"
ITEM_DATASET_ID = "MisinfoResilience/ItemSelectionTool"
APP_DIR = Path(__file__).resolve().parent
LOCAL_ITEM_DATA_PATH = APP_DIR / "MRM - Item Selection Tool.csv"
TEST_SIZE_OPTIONS = {
"4-item test": 1,
"20-item test": 5,
"40-item test": 10,
}
@st.cache_data(show_spinner=False)
def load_data() -> pd.DataFrame:
ds = load_dataset("MisinfoResilience/Metrics")
df = list(ds.values())[0].to_pandas().copy()
df = df.fillna("")
df["YearSort"] = pd.to_numeric(df["YearSort"], errors="coerce")
df["LengthItems"] = pd.to_numeric(df["LengthItems"], errors="coerce")
return df
@st.cache_data(show_spinner=False)
def load_item_data() -> pd.DataFrame:
try:
ds = load_dataset(ITEM_DATASET_ID)
df = list(ds.values())[0].to_pandas().copy()
except Exception:
df = pd.read_csv(LOCAL_ITEM_DATA_PATH).copy()
df = df.fillna("")
df["Rank"] = pd.to_numeric(df["Rank"], errors="coerce")
df["Lambda"] = pd.to_numeric(df["Lambda"], errors="coerce")
df["TestTierRank"] = pd.to_numeric(df["TestTierRank"], errors="coerce")
df["TestSizeItems"] = pd.to_numeric(df["TestSizeItems"], errors="coerce")
return df
def split_multivalue(value: str) -> List[str]:
text = str(value).strip()
if not text or text.lower() in UNKNOWN_VALUES:
return []
parts = [p.strip() for p in re.split(MULTI_DELIMITERS, text) if p.strip()]
return parts
def is_url(value: str) -> bool:
return bool(re.match(r"^https?://", str(value).strip(), flags=re.IGNORECASE))
def sorted_options(values: Iterable[str], all_label: str = FILTER_LABEL) -> List[str]:
cleaned = sorted({str(v).strip() for v in values if str(v).strip()}, key=lambda x: (x == "Unknown", x))
return [all_label] + cleaned
def sorted_code_options(values: Iterable[str], all_label: str = FILTER_LABEL) -> List[str]:
codes = set()
for value in values:
codes.update(split_multivalue(str(value)))
cleaned = sorted(code for code in codes if code and code != "Unknown")
return [all_label] + cleaned
def natural_cluster_options(values: Iterable[str]) -> List[str]:
return sorted({str(v).strip() for v in values if str(v).strip()})
def matches_selected_codes(value: str, selected: List[str]) -> bool:
if not selected or FILTER_LABEL in selected:
return True
value_codes = set(split_multivalue(str(value)))
return bool(value_codes & set(selected))
def construct_color(constructs: str) -> str:
construct_set = {item.strip() for item in str(constructs).split(",") if item.strip()}
if "Skill" in construct_set:
return "#1f7a8c"
if "Attitudes / Norms / Beliefs" in construct_set:
return "#b85c38"
if "Knowledge" in construct_set:
return "#6a994e"
if "Behavioral Correlate" in construct_set:
return "#7b2cbf"
if "Identity / Risk Factor" in construct_set:
return "#c1121f"
return "#6c757d"
def apply_filters(df: pd.DataFrame, filters: Dict[str, List[str]], search_query: str) -> pd.DataFrame:
out = df.copy()
for col, selected in filters.items():
if selected and FILTER_LABEL not in selected:
if col in {"Language", "Population"}:
out = out[out[col].apply(lambda value: matches_selected_codes(value, selected))]
else:
out = out[out[col].isin(selected)]
query = search_query.strip().lower()
if query:
out = out[out["SearchText"].str.contains(re.escape(query), na=False, regex=True)]
sort_cols = ["YearSort", "MeasureName"]
ascending = [False, True]
out = out.sort_values(sort_cols, ascending=ascending, na_position="last")
return out.reset_index(drop=True)
def compute_match_score(row: pd.Series, preferred_constructs: List[str]) -> int:
score = 0
if row.get("Validated") == "Yes":
score += 2
if row.get("Online") == "Yes":
score += 1
if row.get("LengthBucket") == "Short":
score += 1
if row.get("LengthBucket") == "Medium":
score += 1
if preferred_constructs:
row_constructs = {c.strip() for c in str(row.get("Constructs", "")).split(",") if c.strip()}
score += 2 * len(set(preferred_constructs) & row_constructs)
return score
def add_recommendation_score(df: pd.DataFrame, preferred_constructs: List[str]) -> pd.DataFrame:
out = df.copy()
out["MatchScore"] = out.apply(lambda row: compute_match_score(row, preferred_constructs), axis=1)
return out.sort_values(["MatchScore", "YearSort", "MeasureName"], ascending=[False, False, True])
def filter_item_data(items: pd.DataFrame, max_rank: int, clusters: List[str], query: str) -> pd.DataFrame:
out = items[items["Rank"] <= max_rank].copy()
if clusters:
out = out[out["Cluster"].isin(clusters)]
search = query.strip().lower()
if search:
out = out[out["SearchText"].str.contains(re.escape(search), na=False, regex=True)]
return out.sort_values(["Cluster", "Rank"], ascending=[True, True]).reset_index(drop=True)
def render_badges(row: pd.Series) -> str:
badges = []
for col in ["Validated", "Online", "Objective", "Specific", "LengthBucket"]:
value = row.get(col, "Unknown")
if value and value != "Unknown":
badges.append(f"`{col}: {value}`")
constructs = row.get("Constructs", "Unknown")
if constructs != "Unknown":
for c in [x.strip() for x in constructs.split(",") if x.strip()]:
badges.append(f"`{c}`")
return " ".join(badges)
def render_measure_card(row: pd.Series, key_prefix: str = "card") -> None:
title = row.get("MeasureName", "Unknown measure")
abbr = row.get("MeasureAbbreviation", "")
year = row.get("Year", "Unknown")
color = construct_color(row.get("Constructs", ""))
subtitle = f"**{title}**"
if abbr and abbr.lower() not in UNKNOWN_VALUES:
subtitle += f" ({abbr})"
subtitle += f" · {year}"
with st.container(border=True):
st.markdown(
f"""
<div style="height:0.45rem;background:{color};border-radius:0.5rem;margin:-0.25rem 0 0.75rem 0;"></div>
""",
unsafe_allow_html=True,
)
st.markdown(subtitle)
st.markdown(render_badges(row))
details = row.get("MeasureDetails", "") or "No description available."
st.write(details)
population_name = row.get("PopulationFullNames", "Unknown")
language_name = row.get("LanguageFullNames", "Unknown")
meta_left, meta_mid = st.columns(2)
meta_left.caption(f"Population: {population_name}")
meta_mid.caption(f"Language: {language_name}")
st.caption(f"Reference: {row.get('Reference', 'Unknown')}")
link_col1, link_col2 = st.columns(2)
paper = row.get("Paper", "")
measure = row.get("Measure", "")
if paper and is_url(paper):
link_col1.link_button("Open paper", paper, use_container_width=True)
elif paper:
if link_col1.toggle("Show paper text", value=False, key=f"{key_prefix}_paper_toggle_{title}"):
st.caption("Paper text")
st.code(paper, language=None)
else:
link_col1.button("Paper unavailable", disabled=True, use_container_width=True, key=f"{key_prefix}_paper_{title}")
if measure and is_url(measure):
link_col2.link_button("Open measure", measure, use_container_width=True)
elif measure:
if link_col2.toggle("Show measure text", value=False, key=f"{key_prefix}_measure_toggle_{title}"):
st.caption("Measure text")
st.code(measure, language=None)
else:
link_col2.button("Measure unavailable", disabled=True, use_container_width=True, key=f"{key_prefix}_measure_{title}")
with st.expander("See full metadata"):
metadata = {
"Reference": row.get("Reference", "Unknown"),
"Length (items)": row.get("LengthItems", "Unknown") if row.get("LengthItems") is not None else "Unknown",
"Long-term malleability": row.get("LongTermMalleability", "Unknown"),
"Short-term malleability": row.get("ShortTermMalleability", "Unknown"),
"Stimuli type": row.get("StimuliType", "Unknown"),
"Stimuli origin": row.get("StimuliOrigin", "Unknown"),
"Stimuli source": row.get("StimuliSource", "Unknown"),
"Stimuli characteristics": row.get("StimuliCharacteristics", "Unknown"),
"Stimuli platform": row.get("StimuliPlatform", "Unknown"),
"Response option": row.get("ResponseOption", "Unknown"),
"Component type": row.get("ComponentType", "Unknown"),
"Behavior type": row.get("BehaviorType", "Unknown"),
"Risk type": row.get("RiskType", "Unknown"),
}
meta_df = pd.DataFrame(metadata.items(), columns=["Field", "Value"])
st.dataframe(meta_df, hide_index=True, use_container_width=True)
def render_summary(filtered: pd.DataFrame) -> None:
total = len(filtered)
validated_yes = int((filtered["Validated"] == "Yes").sum()) if total else 0
online_yes = int((filtered["Online"] == "Yes").sum()) if total else 0
median_items = (
int(filtered["LengthItems"].dropna().median()) if total and filtered["LengthItems"].dropna().shape[0] else None
)
m1, m2, m3, m4 = st.columns(4)
m1.metric("Matching measures", total)
m2.metric("Validated", validated_yes)
m3.metric("Online", online_yes)
m4.metric("Median length", f"{median_items} items" if median_items is not None else "Unknown")
def render_charts(filtered: pd.DataFrame) -> None:
if filtered.empty:
return
c1, c2, c3 = st.columns(3)
with c1:
year_counts = (
filtered[filtered["Year"] != "Unknown"]["Year"]
.value_counts()
.sort_index()
)
st.caption("Measures by publication year")
if not year_counts.empty:
st.bar_chart(year_counts, use_container_width=True)
else:
st.info("No year data available.")
with c2:
validated_counts = filtered["Validated"].value_counts()
st.caption("Validation status")
st.bar_chart(validated_counts, use_container_width=True)
with c3:
length_counts = filtered["LengthBucket"].value_counts()
st.caption("Measure length")
st.bar_chart(length_counts, use_container_width=True)
def render_compare(compare_df: pd.DataFrame, selected_names: List[str]) -> None:
if not selected_names:
st.info("Select measures in this tab to compare them side by side.")
return
subset = compare_df[compare_df["MeasureName"].isin(selected_names)].copy()
if subset.empty:
st.info("No comparable measures match the current filters.")
return
subset = subset.set_index("MeasureName")
compare_fields = [
"MeasureAbbreviation",
"Year",
"Reference",
"Population",
"Language",
"Validated",
"Online",
"Objective",
"Specific",
"LengthItems",
"LengthBucket",
"Constructs",
"LongTermMalleability",
"ShortTermMalleability",
"StimuliType",
"StimuliOrigin",
"StimuliSource",
"StimuliCharacteristics",
"StimuliPlatform",
"ResponseOption",
"ComponentType",
"BehaviorType",
"RiskType",
"Paper",
"Measure",
]
comparison = subset[compare_fields].T.reset_index().rename(columns={"index": "Field"})
st.dataframe(
comparison,
hide_index=True,
use_container_width=True,
column_config={
"Paper": st.column_config.LinkColumn("Paper"),
"Measure": st.column_config.TextColumn("Measure", width="large"),
},
)
def render_measure_selection_tool(ranked: pd.DataFrame) -> None:
st.subheader("Measure selection tool")
st.write(
"Use the filters in the sidebar to narrow the measure database. The highest-ranked matches below prioritise validation, online administration, shorter measures, and your selected construct focus."
)
if ranked.empty:
st.warning("No measures match the current filters.")
return
top_n = min(6, len(ranked))
st.caption(f"Showing the top {top_n} matches from {len(ranked)} currently matching measures.")
cols = st.columns(2)
for idx, (_, row) in enumerate(ranked.head(top_n).iterrows()):
with cols[idx % 2]:
render_measure_card(row, key_prefix=f"selector_{idx}")
def render_item_card(row: pd.Series, example_mode: str, key_prefix: str) -> None:
with st.container(border=True):
st.markdown(f"**{row.get('Cluster', 'Unknown cluster')}**")
st.caption(
f"Rank {int(row['Rank']) if pd.notna(row.get('Rank')) else 'Unknown'} · "
f"{row.get('Variable', 'Unknown variable')} · λ={row.get('Lambda', 'Unknown')}"
)
st.write(row.get("ItemPhrasing", ""))
climate = row.get("ClimateExample", "")
crypto = row.get("CryptocurrencyExample", "")
if example_mode in {"Climate examples", "Both examples"} and climate:
with st.expander("Climate example"):
st.write(climate)
if example_mode in {"Cryptocurrency examples", "Both examples"} and crypto:
with st.expander("Cryptocurrency example"):
st.write(crypto)
def render_item_selection_tool(items: pd.DataFrame) -> None:
st.subheader("Select MRM items")
st.write(
"Build a short, balanced item set from the top-ranked items in each MRM cluster. The 4-item test takes the top item from each cluster, the 20-item test takes the top five, and the 40-item test takes the top ten."
)
c1, c2, c3 = st.columns([1, 1, 2])
test_size_label = c1.selectbox("Test size", list(TEST_SIZE_OPTIONS.keys()), index=0)
example_mode = c2.selectbox(
"Examples",
["Hide examples", "Climate examples", "Cryptocurrency examples", "Both examples"],
index=3,
)
selected_clusters = c3.multiselect(
"Clusters",
natural_cluster_options(items["Cluster"]),
placeholder="All clusters",
)
selected_items = filter_item_data(items, TEST_SIZE_OPTIONS[test_size_label], selected_clusters, "")
m1, m2, m3 = st.columns(3)
m1.metric("Selected items", len(selected_items))
m2.metric("Clusters represented", selected_items["Cluster"].nunique() if not selected_items.empty else 0)
median_lambda = selected_items["Lambda"].median() if not selected_items.empty else None
m3.metric("Median loading", f"{median_lambda:.3f}" if median_lambda is not None else "Unknown")
if selected_items.empty:
st.warning("No items match the current item filters.")
return
card_tab, table_tab = st.tabs(["Item cards", "Item table"])
with card_tab:
for idx, (_, row) in enumerate(selected_items.iterrows()):
render_item_card(row, example_mode, key_prefix=f"item_{idx}")
with table_tab:
download_columns = [
"Cluster",
"ClusterAbbreviation",
"Rank",
"Variable",
"Lambda",
"ItemPhrasing",
"ClimateExample",
"CryptocurrencyExample",
"TestTier",
"TestSizeItems",
]
st.download_button(
"Download selected items as CSV",
data=selected_items[download_columns].to_csv(index=False).encode("utf-8"),
file_name="mrm_selected_items.csv",
mime="text/csv",
use_container_width=True,
)
st.dataframe(
selected_items[download_columns],
hide_index=True,
use_container_width=True,
column_config={
"ItemPhrasing": st.column_config.TextColumn("Item phrasing", width="large"),
"ClimateExample": st.column_config.TextColumn("Climate example", width="large"),
"CryptocurrencyExample": st.column_config.TextColumn("Cryptocurrency example", width="large"),
"Lambda": st.column_config.NumberColumn("λ", format="%.3f"),
},
)
def render_explore_controls(df: pd.DataFrame) -> None:
with st.container(border=True):
st.text_input(
"Search",
key="search_query",
placeholder="Search by name, construct, reference, stimuli, or details",
)
c1, c2, c3, c4 = st.columns(4)
c1.button("Clear all", on_click=clear_filters, use_container_width=True)
c2.button("Validated + online", on_click=preset_validated_online, use_container_width=True)
c3.button("Short", on_click=preset_short, use_container_width=True)
c4.button("Recent", on_click=preset_recent, use_container_width=True)
st.subheader("Study context")
f1, f2, f3 = st.columns(3)
f1.multiselect("Language", sorted_code_options(df["Language"].unique()), default=[FILTER_LABEL], key="f_language")
f2.multiselect("Population", sorted_code_options(df["Population"].unique()), default=[FILTER_LABEL], key="f_population")
year_options = [FILTER_LABEL] + sorted([y for y in df["Year"].unique() if y != "Unknown"], reverse=True) + (
["Unknown"] if "Unknown" in set(df["Year"].unique()) else []
)
f3.multiselect("Year", year_options, default=[FILTER_LABEL], key="f_year")
st.subheader("Measure properties")
m1, m2, m3, m4, m5 = st.columns(5)
m1.multiselect("Online", sorted_options(df["Online"].unique()), default=[FILTER_LABEL], key="f_online")
m2.multiselect("Validated", sorted_options(df["Validated"].unique()), default=[FILTER_LABEL], key="f_validated")
m3.multiselect("Objective", sorted_options(df["Objective"].unique()), default=[FILTER_LABEL], key="f_objective")
m4.multiselect("Specific", sorted_options(df["Specific"].unique()), default=[FILTER_LABEL], key="f_specific")
m5.multiselect("Length", sorted_options(df["LengthBucket"].unique()), default=[FILTER_LABEL], key="f_lengthbucket")
st.subheader("Construct focus")
construct_options = [
"Skill",
"Attitudes / Norms / Beliefs",
"Knowledge",
"Behavioral Correlate",
"Identity / Risk Factor",
]
st.multiselect("Prioritise constructs", construct_options, key="f_constructs")
def clear_filters() -> None:
defaults = {
"search_query": "",
"f_language": [FILTER_LABEL],
"f_population": [FILTER_LABEL],
"f_year": [FILTER_LABEL],
"f_online": [FILTER_LABEL],
"f_validated": [FILTER_LABEL],
"f_objective": [FILTER_LABEL],
"f_specific": [FILTER_LABEL],
"f_lengthbucket": [FILTER_LABEL],
"f_constructs": [],
"compare_measures": [],
}
for key, value in defaults.items():
st.session_state[key] = value
def preset_validated_online() -> None:
st.session_state.f_validated = ["Yes"]
st.session_state.f_online = ["Yes"]
def preset_short() -> None:
st.session_state.f_lengthbucket = ["Short"]
def preset_recent() -> None:
recent_years = sorted({y for y in data["Year"].unique() if y.isdigit() and int(y) >= 2020}, reverse=True)
st.session_state.f_year = recent_years if recent_years else [FILTER_LABEL]
data = load_data()
item_data = load_item_data()
st.title("Misinformation Resilience Metrics")
st.markdown("Select measures, build item sets, compare tools, and explore the MRM database.")
st.caption("Maertens, R., Götz, F., Xu, R., Roozenbeek, J., Kyrychenko, Y., Rode, J., Golino, H., Remshard, M., Lewandowski, J. and Goldberg, B. (2026). The Cognitive-Affective-Behavioural Structure of Misinformation Resilience. OSF. https://doi.org/10.31234/osf.io/r8nhc_v2")
with st.sidebar:
st.header("MisinfoResilienceMetrics")
st.markdown("🔍 [**Explore measures**](#explore-measures)")
st.markdown("⚖️ [**Compare measures**](#compare-measures)")
st.markdown("🎯 [**Select MRM items**](#select-mrm-items)")
explore_tab, compare_tab, item_tab = st.tabs(["🔍 Explore measures", "⚖️ Compare measures", "🎯 Select MRM items"])
with explore_tab:
st.subheader("Explore measures")
render_explore_controls(data)
filters = {
"Language": st.session_state.get("f_language", [FILTER_LABEL]),
"Population": st.session_state.get("f_population", [FILTER_LABEL]),
"Year": st.session_state.get("f_year", [FILTER_LABEL]),
"Online": st.session_state.get("f_online", [FILTER_LABEL]),
"Validated": st.session_state.get("f_validated", [FILTER_LABEL]),
"Objective": st.session_state.get("f_objective", [FILTER_LABEL]),
"Specific": st.session_state.get("f_specific", [FILTER_LABEL]),
"LengthBucket": st.session_state.get("f_lengthbucket", [FILTER_LABEL]),
}
filtered = apply_filters(data, filters, st.session_state.get("search_query", ""))
ranked = add_recommendation_score(filtered, st.session_state.get("f_constructs", []))
render_summary(ranked)
render_charts(ranked)
render_measure_selection_tool(ranked)
with compare_tab:
st.subheader("Compare measures")
compare_options = ranked["MeasureName"].tolist()
st.multiselect(
"Select up to 4 measures to compare",
compare_options,
max_selections=4,
key="compare_measures",
)
render_compare(ranked, st.session_state.get("compare_measures", []))
with item_tab:
render_item_selection_tool(item_data)