################################
# CONFIGURATION
################################
import streamlit as st
import pandas as pd
import hashlib
import joblib
from io import BytesIO
from umap import UMAP
from hdbscan import HDBSCAN
import os
from streamlit_extras.metric_cards import style_metric_cards
from streamlit_extras.add_vertical_space import add_vertical_space
# ---- FUNCTIONS ----
from src.extract_usage import extract_usage
from src.necessity_index import compute_necessity, index_scaler, qcut_labels
from src.column_detection import detect_freeform_col, detect_id_col, detect_school_type_col
from src.shortlist import shortlist_applications
from src.twinkl_originals import find_book_candidates
from src.preprocess_text import normalise_text
import src.models.topic_modeling_pipeline as topic_modeling_pipeline
from src.px_charts import plot_histogram, plot_topic_countplot
from typing import Tuple
style_metric_cards(box_shadow=False, border_left_color='#E7F4FF',background_color='#E7F4FF', border_size_px=0, border_radius_px=6)
##################################
# CACHED PROCESSING FUNCTIONS
##################################
# -----------------------------------------------------------------------------
# Heavy processing (IO + NLP) is cached to avoid re-executing when the UI state
# changes. The function only re-runs if the **file contents** change.
# -----------------------------------------------------------------------------
@st.cache_resource
def load_heartfelt_predictor():
    """Load and cache the pre-trained 'heartfelt' classifier pipeline from disk."""
    return joblib.load(os.path.join("src", "models", "heartfelt_pipeline.joblib"))
@st.cache_resource
def load_embeddings_model():
    """Load and cache the sentence-embedding model used for topic modeling."""
    model_name = 'all-MiniLM-L12-v2'
    return topic_modeling_pipeline.load_embedding_model(model_name)
@st.cache_data(show_spinner=True)
def load_and_process(raw_csv: bytes) -> Tuple[pd.DataFrame, str, str]:
    """
    Load a CSV from raw bytes and run the full scoring pipeline.

    Steps: detect the freeform/application-text column, drop rows without
    text, compute word counts, necessity scores and priority labels, flag
    Twinkl Originals book candidates, classify 'heartfelt' applications,
    and extract usage items.

    Parameters
    ----------
    raw_csv : bytes
        Raw uploaded CSV contents (also serves as the Streamlit cache key,
        so processing only re-runs when the file contents change).

    Returns
    -------
    Tuple[pd.DataFrame, str, str]
        The processed/scored DataFrame, the detected freeform column name,
        and the detected ID column name.
        (BUG FIX: the annotation previously said ``Tuple[pd.DataFrame, str]``
        although three values were returned; a stray debug ``print`` of the
        id column was also removed.)
    """
    df_orig = pd.read_csv(BytesIO(raw_csv))
    # Auto-detect the relevant columns instead of relying on fixed headers.
    freeform_col = detect_freeform_col(df_orig)
    id_col = detect_id_col(df_orig)
    school_type_col = detect_school_type_col(df_orig)
    # Rows without application text carry no signal for any downstream step.
    df_orig = df_orig[df_orig[freeform_col].notna()]
    df_orig['word_count'] = df_orig[freeform_col].fillna('').str.split().str.len()
    # Necessity scoring: join per-row scores, rescale the index, bucket into priorities.
    scored = df_orig.join(df_orig[freeform_col].apply(compute_necessity))
    scored['necessity_index'] = index_scaler(scored['necessity_index'].values)
    scored['priority'] = qcut_labels(scored['necessity_index'])
    # Flag applications that are Twinkl Originals book candidates.
    scored['book_candidates'] = find_book_candidates(scored, freeform_col, school_type_col)
    # Heartfelt classification runs on normalised text.
    scored['clean_text'] = scored[freeform_col].map(normalise_text)
    model = load_heartfelt_predictor()
    scored['is_heartfelt'] = model.predict(scored['clean_text'].astype(str))
    # Usage extraction from the raw freeform text.
    docs = df_orig[freeform_col].to_list()
    scored['usage'] = extract_usage(docs)
    return scored, freeform_col, id_col
# -----------------------------------------------------------------------------
# Derivative computations that rely only on the processed DataFrame are also
# cached. These are lightweight but still benefit from caching because this
# function might be called multiple times during widget interaction.
# -----------------------------------------------------------------------------
@st.cache_data(show_spinner=True)
def compute_shortlist(df: pd.DataFrame) -> pd.DataFrame:
    """Pre-compute shortlist_score for every row (k = row count, so nothing is cut off)."""
    n_rows = len(df)
    return shortlist_applications(df, k=n_rows)
@st.cache_resource(show_spinner=True)
def run_topic_modeling():
    """
    Sentence-tokenize the applications, embed the sentences, and fit a
    BERTopic model (UMAP + HDBSCAN).

    NOTE(review): this reads the module-level globals ``df`` and
    ``freeform_col`` (defined later in the script) instead of taking them as
    parameters, so the resource cache is keyed on the function alone — a new
    upload will NOT retrigger topic modeling. Kept as-is to preserve the
    zero-argument call signature; consider passing the texts in explicitly.

    Returns
    -------
    tuple
        ``(topic_model, topics, probs, mappings)`` on success, where
        ``mappings[i]`` is the DataFrame index of the row sentence ``i`` came
        from; ``(None, None, None, None)`` on failure (an error is shown in
        the UI instead of raising).
    """
    try:
        # 1. Tokenize application texts into sentences, remembering the source
        #    row index of each sentence. The spaCy model's return value is
        #    unused here; presumably the pipeline module caches it internally
        #    for spacy_sent_tokenize — TODO confirm.
        topic_modeling_pipeline.load_spacy_model(model_name='en_core_web_sm')
        sentences = []
        mappings = []
        for idx, application_text in df[freeform_col].dropna().items():
            for sentence in topic_modeling_pipeline.spacy_sent_tokenize(application_text):
                sentences.append(sentence)
                mappings.append(idx)
        # 2. Generate sentence embeddings with the cached embedding model.
        embeddings_model = load_embeddings_model()
        embeddings = embeddings_model.encode(sentences, show_progress_bar=True)
        # 3. Dimensionality-reduction and clustering components for BERTopic.
        umap_model = UMAP(n_neighbors=7, n_components=5, min_dist=0.0, metric='cosine', random_state=42)
        hdbscan_model = HDBSCAN(min_cluster_size=10, metric='euclidean', cluster_selection_method='eom', prediction_data=True)
        # 4. Fit the topic model and attach AI-generated topic names.
        topic_model, topics, probs = topic_modeling_pipeline.bertopic_model(sentences, embeddings, embeddings_model, umap_model, hdbscan_model)
        topic_modeling_pipeline.ai_labels_to_custom_name(topic_model)
        return topic_model, topics, probs, mappings
    except Exception as e:
        # BUG FIX: ``traceback`` was referenced without ever being imported,
        # so any topic-modeling failure raised a NameError that hid the real
        # error. Import locally so the handler is self-contained.
        import traceback
        st.error(f"Topic modeling failed: {e}")
        st.code(traceback.format_exc())  # Shows the full error in a nice code box
        return None, None, None, None
################################
# MAIN APP SCRIPT
################################
st.title("🪷 Community Collections Helper")
uploaded_file = st.file_uploader("Upload grant applications file for analysis", type='csv', label_visibility='collapsed')
# ========== FINGERPRINTING CURRENT FILE ==========
# An MD5 of the uploaded bytes identifies the current file so cached work is
# reused for as long as the same file stays selected. Guard clause: with no
# upload, clear the fingerprint and halt the script here.
if uploaded_file is None:
    st.session_state.pop("current_file_hash", None)
    st.stop()
raw = uploaded_file.read()
st.session_state["current_file_hash"] = hashlib.md5(raw).hexdigest()
## ====== DATA PROCESSING ======
df, freeform_col, id_col = load_and_process(raw)  # cached on file contents
topic_model, topics, probs, mappings = run_topic_modeling()  # cached resource
if topic_model is not None:
    # Map numeric topic ids to their AI-generated names and attach a per-row
    # list of topic names to the main DataFrame.
    label_map = (topic_model
                 .get_topic_info()
                 .set_index("Topic")["CustomName"]
                 .to_dict())
    df = topic_modeling_pipeline.attach_topics(df, mappings, topics, label_map, col="topics")
    # Build the topic-overview table: drop the outlier topic (-1), remove
    # internal columns, and surface the readable columns first.
    topics_df = topic_model.get_topic_info()
    topics_df = topics_df[topics_df['Topic'] > -1]
    # errors='ignore' hardens against these columns being absent.
    topics_df = topics_df.drop(columns=['Name', 'OpenAI'], errors='ignore')
    cols_to_move = ['Topic', 'CustomName']
    topics_df = topics_df[cols_to_move + [col for col in topics_df.columns if col not in cols_to_move]]
    topics_df = topics_df.rename(columns={'CustomName': 'Topic Name', 'Topic': 'Topic Nr.'})
else:
    # BUG FIX: the original warned "continuing without them" but then fell
    # through and called topic_model.get_topic_info() on None, crashing the
    # app. Provide empty fallbacks so the rest of the UI can actually
    # continue without topics.
    st.warning("Topics could not be generated; continuing without them.")
    topics_df = pd.DataFrame(columns=['Topic Nr.', 'Topic Name'])
    df['topics'] = [[] for _ in range(len(df))]
book_candidates_df = df[df['book_candidates'] == True]
###############################
#         SIDE PANEL          #
###############################
with st.sidebar:
    # ---- Shortlist mode: quantile cut-off on shortlist_score ----
    st.title("Shortlist Mode")
    # "strict" keeps the top 25% of scores, "generous" the top 50%.
    quantile_map = {"strict": 0.75, "generous": 0.5}
    mode = st.segmented_control(
        "Select one option",
        options=["strict", "generous"],
        default="strict",
    )
    # Score every application once (cached), then apply the chosen threshold
    # to derive the automatic shortlist.
    scored_full = compute_shortlist(df)
    threshold_score = scored_full["shortlist_score"].quantile(quantile_map[mode])
    auto_short_df = scored_full[scored_full["shortlist_score"] >= threshold_score]
    st.title("Filters")
    ## --- Dataframe To Filter ---
    options = ['All applications', 'Not shortlisted']
    selected_view = st.pills('Choose data to filter', options, default='Not shortlisted')
    st.write("")
    ## --- Necessity Index Filtering ---
    # Slider bounds come from the data so the full range is always selectable.
    min_idx = float(df['necessity_index'].min())
    max_idx = float(df['necessity_index'].max())
    filter_range = st.slider(
        "Necessity Index Range", min_value=min_idx, max_value=max_idx, value=(min_idx, max_idx)
    )
    def filter_all_applications(df, auto_short_df, filter_range):
        # Every application whose necessity index lies in the selected range.
        return df[df['necessity_index'].between(filter_range[0], filter_range[1])]
    def filter_not_shortlisted(df, auto_short_df, filter_range):
        # In-range applications that did NOT make the automatic shortlist.
        return df[
            (~df.index.isin(auto_short_df.index)) &
            (df['necessity_index'].between(filter_range[0], filter_range[1]))
        ]
    # Dispatch table keyed by the pills selection above.
    filter_map = {
        'All applications': filter_all_applications,
        'Not shortlisted': filter_not_shortlisted,
    }
    filtered_df = filter_map[selected_view](df, auto_short_df, filter_range)
    ## -------- Topic Filtering -------
    topic_options = sorted(topics_df['Topic Name'].unique())
    selected_topics = st.multiselect("Filter by Topic(s)", options=topic_options, default=[])
    if selected_topics:
        # Keep only rows whose topic list contains ALL selected topics.
        selected_set = set(selected_topics)
        filtered_df = filtered_df[
            filtered_df['topics'].apply(lambda topic_list: selected_set.issubset(set(topic_list)))
        ]
    st.markdown(f"**Total Applications:** {len(df)}")
    st.markdown(f"**Filtered Applications:** {len(filtered_df)}")
    # Manual-shortlist checkboxes elsewhere in the app use session-state keys
    # of the form "shortlist_<row index>"; collect the indices of ticked ones.
    manual_keys = [k for k in st.session_state.keys() if k.startswith("shortlist_")]
    manually_shortlisted = [int(k.split("_")[1]) for k in manual_keys if st.session_state[k]]
    st.markdown(f"**Manually Shortlisted:** {len(manually_shortlisted)}")
    if manually_shortlisted:
        # Offer the manually ticked rows as a CSV download.
        csv = df.loc[manually_shortlisted].to_csv(index=False).encode("utf-8")
        st.download_button(
            "Download Manual Shortlist",
            data=csv,
            file_name="manual_shortlist.csv",
            mime="text/csv",
            icon="⬇️",
        )
    add_vertical_space(4)
    st.divider()
    st.badge("Version 1.0.0", icon=':material/category:',color='violet')
    st.markdown("""
:grey[Made with 🩷 by the AI Innovation Team
Contact: lynn.perez@twinkl.com]
""")
## ====== CREATE TAB SECTIONS =======
# Main navigation: shortlist management vs. analytics/insights views.
tab1, tab2 = st.tabs(["Shortlist Manager","Insights"])
##################################################
# SHORTLIST MANAGER TAB #
##################################################
with tab1:
## =========== AUTOMATIC SHORTLIST =========
st.header("Automatic Shortlist")
csv_auto = auto_short_df.to_csv(index=False).encode("utf-8")
all_processed_data = df.to_csv(index=False).encode("utf-8")
book_candidates = book_candidates_df.to_csv(index=False).encode("utf-8")
topic_descriptions_csv = topics_df.to_csv(index=False).encode("utf-8")
csv_options = {
"Shortlist": (csv_auto, "shortlist.csv"),
"All Processed Data": (all_processed_data, "all_processed.csv"),
"Book Candidates": (book_candidates, "book_candidates.csv"),
"Topic Descriptions": (topic_descriptions_csv, "topic_descriptions.csv"),
}
choice = st.selectbox("Select a file for download", list(csv_options.keys()))
csv_data, file_name = csv_options[choice]
st.download_button(
label=f"Download {choice}",
data=csv_data,
file_name=file_name,
mime="text/csv",
help="This button will download the selected file from above",
icon="⬇️"
)
st.write("")
total_col, shortlistCounter_col, mode_col = st.columns(3)
total_col.metric("Applications Submitted", len(df))
shortlistCounter_col.metric("Shorlist Length", len(auto_short_df))
mode_col.metric("Mode", mode)
shorltist_cols_to_show = [
id_col,
freeform_col,
'book_candidates',
'usage',
'necessity_index',
'urgency_score',
'severity_score',
'vulnerability_score',
'shortlist_score',
'is_heartfelt',
'topics',
]
st.dataframe(auto_short_df.loc[:, shorltist_cols_to_show], hide_index=True)
## ====== APPLICATIONS REVIEW =======
add_vertical_space(2)
st.header("Manual Filtering")
st.info("Use the **side panel** filters to more easily sort through applications that you'd like to review.", icon=':material/info:')
st.write("")
if len(filtered_df) > 0:
st.markdown("#### Filtered Applications")
for idx, row in filtered_df.iterrows():
with st.expander(f"Application {int(row[id_col])}"):
st.write("")
col1, col2, col3, col4 = st.columns(4)
col1.metric("Necessity", f"{row['necessity_index']:.1f}")
col2.metric("Urgency", f"{int(row['urgency_score'])}")
col3.metric("Severity", f"{int(row['severity_score'])}")
col4.metric("Vulnerability", f"{int(row['vulnerability_score'])}")
st.markdown("##### Excerpt")
st.write(row[freeform_col])
# HTML for clean usage items
usage_items = [item for item in row['usage'] if item and item.lower() != 'none']
if usage_items:
st.markdown("##### Usage")
pills_html = "".join(
f"{item}"
for item in usage_items
)
st.html(pills_html)
else:
st.caption("*No usage found*")
topic_items = [item for item in row['topics'] if item and item.lower() != 'none']
if topic_items:
st.markdown("##### Topics")
topic_boxes_html= "".join(
f"{item}"
for item in topic_items
)
st.html(topic_boxes_html)
else:
st.caption("_No topics assigned for this application_")
st.checkbox(
"Add to shortlist",
key=f"shortlist_{idx}"
)
else:
st.markdown(
"""