monthly_exit_survey_analysis_app / functions /topicModeling_contentRequests.py
lynn-twinkl
Added snippet to download stopwords if not found
e4cb412
import streamlit as st
import re
import string
import torch
import spacy
from sentence_transformers import SentenceTransformer
import nltk
from nltk.corpus import stopwords
import contractions
from sklearn.feature_extraction.text import CountVectorizer
from bertopic import BERTopic
from bertopic.representation import KeyBERTInspired, MaximalMarginalRelevance, OpenAI, PartOfSpeech
import openai
import numpy as np
# OpenAI credential, looked up in Streamlit's secrets store when this module
# is imported; it is later handed to the OpenAI client in `bertopic_model`.
OPENAI_API_KEY = st.secrets["OPENAI_API_KEY"]
"""
-----------------------------------
Lemmatization & Stopword Removal
-----------------------------------
"""
def topicModeling_preprocessing(df, spacy_model="en_core_web_lg"):
    """Remove stopwords from and lemmatize the ``preprocessedBasic`` column.

    For every string in ``df['preprocessedBasic']`` the text has its
    contractions expanded, whitespace collapsed, stopwords (NLTK English plus
    a domain-specific list) deleted, and the remainder lemmatized with spaCy.
    The result is stored in a new ``processedForModeling`` column.

    Args:
        df: DataFrame containing a ``preprocessedBasic`` column. The
            ``processedForModeling`` column is written onto this object.
        spacy_model: Name of the spaCy pipeline to load for lemmatization.

    Returns:
        A DataFrame without the rows whose processed text is missing
        (non-string input or text that became empty after cleaning).

    Notes:
        * Stopword matching is case-sensitive; presumably the upstream
          "basic" preprocessing already lowercases the text -- TODO confirm.
        * The spaCy pipeline is loaded on every call.
    """
    # Import under a private alias: the module-level name ``stopwords`` is
    # rebound to a plain list further down this file, so by the time this
    # function is called ``stopwords.words`` would raise AttributeError.
    from nltk.corpus import stopwords as nltk_stopwords

    base_stopwords = set(nltk_stopwords.words('english'))
    custom_stopwords = {
        'material', 'materials', 'resources', 'resource', 'activity',
        'activities', 'sheet', 'sheets', 'worksheet', 'worksheets',
        'teacher', 'teachers', 'teach', 'high school', 'highschool',
        'middle school', 'grade', 'grades', 'hs', 'level', 'age', 'ages',
        'older', 'older kid', 'kid', 'student', "1st", "2nd", "3rd", "4th", '5th', '6th',
        '7th', '8th', '9th'
    }
    stopword_set = base_stopwords.union(custom_stopwords)

    # Set iteration order is arbitrary, and regex alternation picks the first
    # alternative that matches. Sorting longest-first makes the pattern
    # deterministic and lets multi-word / longer entries win over their
    # shorter prefixes.
    ordered_stopwords = sorted(stopword_set, key=lambda word: (-len(word), word))
    stopword_pattern = re.compile(
        r'\b(?:' + '|'.join(re.escape(word) for word in ordered_stopwords) + r')\b'
    )

    nlp = spacy.load(spacy_model)

    def clean_lemmatize_text(text):
        """Return the cleaned, lemmatized text, or None if nothing is left."""
        if not isinstance(text, str):
            return None
        text = contractions.fix(text)
        text = re.sub(r'\s+', ' ', text).strip()
        text = stopword_pattern.sub('', text)
        doc = nlp(text)
        tokens = [token.lemma_ for token in doc]
        # Removing stopwords leaves runs of spaces behind; collapse them.
        clean_text = re.sub(r'\s+', ' ', " ".join(tokens).strip())
        return clean_text if clean_text else None

    df['processedForModeling'] = df['preprocessedBasic'].apply(clean_lemmatize_text)
    # Drop rows where cleaned text is empty or None
    df = df.dropna(subset=['processedForModeling'])
    return df
"""
--------------------------
Load Transformer Model
--------------------------
"""
@st.cache_resource
def load_embedding_model():
    """Load the ``paraphrase-mpnet-base-v2`` sentence-embedding model.

    The model is placed on CUDA when available, otherwise on Apple MPS when
    available, otherwise on the CPU. The chosen device is written to the
    Streamlit page.

    Returns:
        The ``SentenceTransformer`` instance bound to the selected device.
    """
    if torch.cuda.is_available():
        chosen_device = "cuda"
    else:
        # Only probe MPS once CUDA has been ruled out.
        chosen_device = "mps" if torch.backends.mps.is_available() else "cpu"

    st.write(f"Using device: {chosen_device}")
    model = SentenceTransformer("paraphrase-mpnet-base-v2", device=chosen_device)
    return model
"""
-------------------------
Batch Embedding Creation
-------------------------
"""
def encode_content_documents(embedding_model, content_documents, batch_size=20):
    """Embed ``content_documents`` in batches and stack the results row-wise.

    Args:
        embedding_model: Object exposing
            ``encode(docs, convert_to_numpy=..., show_progress_bar=...)``.
        content_documents: Sized, sliceable collection of documents.
        batch_size: Number of documents passed to ``encode`` per call.

    Returns:
        A NumPy array with one row per document, in input order. For empty
        input an array with zero rows is returned; its column count is the
        model's reported embedding dimension when the model exposes
        ``get_sentence_embedding_dimension()``, otherwise 0.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    total = len(content_documents)
    if total == 0:
        # np.vstack([]) raises, so build the empty result explicitly.
        get_dim = getattr(embedding_model, "get_sentence_embedding_dimension", None)
        dim = (get_dim() if callable(get_dim) else None) or 0
        return np.empty((0, dim), dtype=np.float32)

    embeddings_batches = [
        embedding_model.encode(
            content_documents[start:start + batch_size],
            convert_to_numpy=True,
            show_progress_bar=True,
        )
        for start in range(0, total, batch_size)
    ]
    return np.vstack(embeddings_batches)
"""
-----------------------------
Topic Modeling with BERTopic
-----------------------------
"""
# Make sure the NLTK stopword corpus is present locally before it is read
# below; fetch it on first use.
try:
    nltk.data.find("corpora/stopwords")
except LookupError:
    nltk.download("stopwords")

# NOTE: this assignment rebinds the module-level name `stopwords` (imported
# from nltk.corpus at the top of the file) to a plain list. From here on,
# `stopwords` is the vectorizer stop-word list, not the NLTK corpus reader.
stopwords = [
    *stopwords.words('english'),
    # Domain-specific terms added on top of the English list.
    'activities',
    'activity',
    'class',
    'classroom',
    'material',
    'materials',
    'membership',
    'memberships',
    'pupil',
    'pupils',
    'resource',
    'resources',
    'sheet',
    'sheets',
    'student',
    'students',
    'subscription',
    'subscriptions',
    'subscribe',
    'subscribed',
    'recommend',
    'recommendation',
    'teach',
    'teacher',
    'teachers',
    'tutor',
    'tutors',
    'twinkl',
    'twinkls',
    'twinkle',
    'worksheet',
    'worksheets',
]
######### --------------- BERTOPIC ----------------- #############
@st.cache_resource
def bertopic_model(docs, embeddings, _embedding_model, _umap_model, _hdbscan_model):
    """Fit a BERTopic model on ``docs`` using precomputed ``embeddings``.

    Topics are represented with KeyBERTInspired ("Main") and Maximal Marginal
    Relevance ("Secondary Representation"). Term counting uses a
    ``CountVectorizer`` configured with the module-level ``stopwords`` list.

    Args:
        docs: Documents to cluster.
        embeddings: Precomputed embeddings passed to ``fit_transform``
            together with ``docs``.
        _embedding_model: Embedding model handed to BERTopic.
        _umap_model: Dimensionality-reduction model handed to BERTopic.
        _hdbscan_model: Clustering model handed to BERTopic.

    Returns:
        Tuple ``(topic_model, topics, probs)`` where ``topics`` and ``probs``
        are the outputs of ``BERTopic.fit_transform``.

    Note:
        The leading underscore on the model parameters follows Streamlit's
        caching convention for arguments that should not be hashed.
    """
    keybert_repr = KeyBERTInspired()
    mmr_repr = MaximalMarginalRelevance(diversity=0.3)

    # OpenAI Representation Model
    # NOTE(review): the labeller below is constructed but never added to the
    # representation mapping, so it does not influence the fitted topics.
    client = openai.OpenAI(api_key=OPENAI_API_KEY)
    prompt = """
I have a topic that contains the following documents:
[DOCUMENTS]
The topic is described by the following keywords: [KEYWORDS]
Based on the information above, extract a short but highly descriptive topic label of at most 5 words. Make sure it is in the following format:
topic: <topic label>
"""
    openai_model = OpenAI(
        client,
        model="gpt-4o-mini",
        exponential_backoff=True,
        chat=True,
        prompt=prompt,
    )

    representations = {
        "Main": keybert_repr,
        "Secondary Representation": mmr_repr,
    }

    count_vectorizer = CountVectorizer(min_df=2, max_df=0.60, stop_words=stopwords)

    # NOTE(review): currently unused -- the matching BERTopic argument is
    # commented out below.
    seed_topic_list = [
        ["autism", "special needs", "special education needs", "special education", "adhd", "autistic", "dyslexia", "dyslexic", "sen"],
    ]

    fitted_model = BERTopic(
        verbose=True,
        embedding_model=_embedding_model,
        umap_model=_umap_model,
        hdbscan_model=_hdbscan_model,
        vectorizer_model=count_vectorizer,
        #seed_topic_list = seed_topic_list,
        representation_model=representations,
    )

    topics, probs = fitted_model.fit_transform(docs, embeddings)
    return fitted_model, topics, probs
##################################
# TOPIC MERGING
##################################
def _topics_matching(topic_info, keywords):
    """Return non-outlier topic ids whose ``Name`` contains any keyword."""
    # Escape so keywords are matched literally, and drop empty keywords:
    # an empty pattern would match every topic name.
    patterns = [re.escape(keyword) for keyword in keywords if keyword]
    if not patterns:
        return []
    mask = topic_info['Name'].str.contains('|'.join(patterns), case=False, na=False)
    # Exclude the outlier topic (-1) if it appears.
    return [topic for topic in topic_info.loc[mask, 'Topic'].tolist() if topic != -1]


def merge_specific_topics(topic_model, sentences,
                          cancellation_keywords=("cancel", "cancellation", "cancel", "canceled"),
                          thanks_keywords=("thank", "thanks", "thank you", "thankyou", "ty", "thx"),
                          expensive_keywords=("can't afford", "price", "expensive", "cost")):
    """Merge topics that belong to the same keyword family.

    Topic names from ``topic_model.get_topic_info()`` are searched
    (case-insensitive substring match) for each keyword family. Every family
    that matches more than one topic becomes one merge group, and all groups
    are passed to ``topic_model.merge_topics`` in a single call.

    Families are evaluated in the order cancellation, thank-you, expensive.
    A topic that has already been placed in a merge group is not offered to
    later families, so the groups handed to ``merge_topics`` never overlap.

    Args:
        topic_model: Fitted model exposing ``get_topic_info()`` and
            ``merge_topics(docs, topics_to_merge)``.
        sentences: Documents the model was fitted on; forwarded to
            ``merge_topics``.
        cancellation_keywords: Keywords identifying cancellation topics.
        thanks_keywords: Keywords identifying thank-you topics.
        expensive_keywords: Keywords identifying price-related topics.
            An empty keyword collection disables its family.

    Returns:
        The same ``topic_model`` object (merged in place when any group
        qualified).
    """
    topic_info = topic_model.get_topic_info()

    keyword_families = (
        ("cancellation", cancellation_keywords),
        ("thank-you", thanks_keywords),
        ("expensive", expensive_keywords),
    )

    topics_to_merge = []
    already_grouped = set()
    for label, keywords in keyword_families:
        candidates = [
            topic for topic in _topics_matching(topic_info, keywords)
            if topic not in already_grouped
        ]
        # A single topic has nothing to merge with.
        if len(candidates) > 1:
            print(f"Merging {label} topics: {candidates}")
            topics_to_merge.append(candidates)
            already_grouped.update(candidates)

    # Call merge_topics
    if topics_to_merge:
        topic_model.merge_topics(sentences, topics_to_merge)
    return topic_model
##################################
# Topic to Dataframe Mapping
#################################
def update_df_with_topics(df, mapping, sentence_topics, topic_label_map):
    """Return a copy of ``df`` with a ``Topics`` column of topic labels.

    Args:
        df: Source DataFrame; it is not modified.
        mapping: For each sentence, the index label of the ``df`` row the
            sentence came from.
        sentence_topics: Topic id assigned to each sentence, indexable by the
            sentence's position in ``mapping``.
        topic_label_map: Dict from topic id to display label. Ids missing
            from the map are rendered as ``str(topic_id)``.

    Returns:
        A copy of ``df`` whose ``Topics`` column holds, per row, the distinct
        labels of that row's topics, sorted and joined with ``", "``. The
        outlier topic ``-1`` is omitted; rows without topics get ``""``.
    """
    topics_by_row = {}
    for i, row_idx in enumerate(mapping):
        topics_by_row.setdefault(row_idx, set()).add(sentence_topics[i])

    def map_topics(row_idx):
        # Build a set of labels, not ids: several topic ids can share one
        # label, and the label should then be listed only once.
        labels = {
            topic_label_map.get(topic, str(topic))
            for topic in topics_by_row.get(row_idx, ())
            if topic != -1
        }
        return ", ".join(sorted(labels))

    updated_df = df.copy()
    updated_df['Topics'] = updated_df.index.map(map_topics)
    return updated_df