import re

import numpy as np
import streamlit as st
import torch
import spacy
import nltk
import contractions
import openai

from nltk.corpus import stopwords
from sentence_transformers import SentenceTransformer
from sklearn.feature_extraction.text import CountVectorizer
from bertopic import BERTopic
from bertopic.representation import KeyBERTInspired, MaximalMarginalRelevance, OpenAI

# Read the OpenAI key from Streamlit's secrets store (.streamlit/secrets.toml).
OPENAI_API_KEY = st.secrets["OPENAI_API_KEY"]


"""
-----------------------------------
Lemmatization & Stopword Removal
-----------------------------------
"""
def topicModeling_preprocessing(df, spacy_model="en_core_web_lg"):
    base_stopwords = set(stopwords.words('english'))

    # Domain-specific terms that carry no topical signal in this corpus.
    custom_stopwords = {
        'material', 'materials', 'resources', 'resource', 'activity',
        'activities', 'sheet', 'sheets', 'worksheet', 'worksheets',
        'teacher', 'teachers', 'teach', 'high school', 'highschool',
        'middle school', 'grade', 'grades', 'hs', 'level', 'age', 'ages',
        'older', 'older kid', 'kid', 'student', '1st', '2nd', '3rd', '4th',
        '5th', '6th', '7th', '8th', '9th'
    }

    stopword_set = base_stopwords.union(custom_stopwords)
    stopword_pattern = r'\b(?:' + '|'.join(re.escape(word) for word in stopword_set) + r')\b'

    nlp = spacy.load(spacy_model)

    def clean_lemmatize_text(text):
        if not isinstance(text, str):
            return None

        text = contractions.fix(text)
        text = re.sub(r'\s+', ' ', text).strip()
        # The stopword list is lowercase, so match case-insensitively to
        # remove capitalized forms as well.
        text = re.sub(stopword_pattern, '', text, flags=re.IGNORECASE)

        doc = nlp(text)
        # Keep word lemmas only; drop punctuation and whitespace tokens.
        tokens = [token.lemma_ for token in doc if not token.is_punct and not token.is_space]

        clean_text = " ".join(tokens).strip()
        clean_text = re.sub(r'\s+', ' ', clean_text)

        return clean_text if clean_text else None

    df['processedForModeling'] = df['preprocessedBasic'].apply(clean_lemmatize_text)
    df = df.dropna(subset=['processedForModeling'])

    return df
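
# Usage sketch (assumes `df` already carries a 'preprocessedBasic' text column
# produced by an earlier cleaning step outside this module):
#
#     df = topicModeling_preprocessing(df)
#     docs = df['processedForModeling'].tolist()
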

"""
--------------------------
Load Transformer Model
--------------------------
"""
@st.cache_resource
def load_embedding_model():
    # Prefer CUDA, then Apple Silicon (MPS), then fall back to CPU. The model
    # is cached by Streamlit, so it is only loaded once per server process.
    if torch.cuda.is_available():
        device = "cuda"
    elif torch.backends.mps.is_available():
        device = "mps"
    else:
        device = "cpu"

    st.write(f"Using device: {device}")
    return SentenceTransformer("paraphrase-mpnet-base-v2", device=device)

"""
-------------------------
Batch Embedding Creation
-------------------------
"""
def encode_content_documents(embedding_model, content_documents, batch_size=20):
    embeddings_batches = []

    # Encode in small chunks so peak memory stays bounded on modest hardware.
    for i in range(0, len(content_documents), batch_size):
        batch_docs = content_documents[i:i + batch_size]
        batch_embeddings = embedding_model.encode(batch_docs, convert_to_numpy=True, show_progress_bar=True)
        embeddings_batches.append(batch_embeddings)

    return np.vstack(embeddings_batches)
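
# Usage sketch:
#
#     embedding_model = load_embedding_model()
#     docs = df['processedForModeling'].tolist()
#     embeddings = encode_content_documents(embedding_model, docs)
#
# Note that SentenceTransformer.encode also accepts a batch_size argument;
# this wrapper mainly makes the chunking explicit.
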

"""
-----------------------------
Topic Modeling with BERTopic
-----------------------------
"""
# Fetch the NLTK stopword corpus once if it is not already present.
try:
    nltk.data.find("corpora/stopwords")
except LookupError:
    nltk.download("stopwords")

# Stopword list for the vectorizer. Bound to a new name so the
# nltk.corpus.stopwords module is not shadowed at module level
# (topicModeling_preprocessing still calls stopwords.words() at runtime).
vectorizer_stopwords = list(stopwords.words('english')) + [
    'activities', 'activity', 'class', 'classroom', 'material', 'materials',
    'membership', 'memberships', 'pupil', 'pupils', 'resource', 'resources',
    'sheet', 'sheets', 'student', 'students', 'subscription', 'subscriptions',
    'subscribe', 'subscribed', 'recommend', 'recommendation', 'teach',
    'teacher', 'teachers', 'tutor', 'tutors', 'twinkl', 'twinkls', 'twinkle',
    'worksheet', 'worksheets',
]

@st.cache_resource
def bertopic_model(docs, embeddings, _embedding_model, _umap_model, _hdbscan_model):
    # Arguments prefixed with an underscore are excluded from Streamlit's
    # cache hashing, which the unhashable model objects require.

    main_representation_model = KeyBERTInspired()
    aspect_representation_model1 = MaximalMarginalRelevance(diversity=0.3)

    client = openai.OpenAI(api_key=OPENAI_API_KEY)
    prompt = """
    I have a topic that contains the following documents:
    [DOCUMENTS]

    The topic is described by the following keywords: [KEYWORDS]

    Based on the information above, extract a short but highly descriptive topic label of at most 5 words. Make sure it is in the following format:

    topic: <topic label>
    """
    openai_model = OpenAI(client, model="gpt-4o-mini", exponential_backoff=True, chat=True, prompt=prompt)

    representation_model = {
        "Main": main_representation_model,
        "Secondary Representation": aspect_representation_model1,
        # Register the GPT labeler as an aspect so its labels are computed too.
        "OpenAI": openai_model,
    }

    vectorizer_model = CountVectorizer(min_df=2, max_df=0.60, stop_words=vectorizer_stopwords)

    # Seed keywords nudge BERTopic toward a special-educational-needs topic.
    seed_topic_list = [
        ["autism", "special needs", "special education needs", "special education", "adhd", "autistic", "dyslexia", "dyslexic", "sen"],
    ]

    topic_model = BERTopic(
        verbose=True,
        embedding_model=_embedding_model,
        umap_model=_umap_model,
        hdbscan_model=_hdbscan_model,
        vectorizer_model=vectorizer_model,
        representation_model=representation_model,
        seed_topic_list=seed_topic_list,
    )

    topics, probs = topic_model.fit_transform(docs, embeddings)
    return topic_model, topics, probs
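
# Usage sketch (hypothetical UMAP/HDBSCAN settings; tune for your corpus):
#
#     from umap import UMAP
#     from hdbscan import HDBSCAN
#
#     embedding_model = load_embedding_model()
#     umap_model = UMAP(n_neighbors=15, n_components=5, min_dist=0.0,
#                       metric="cosine", random_state=42)
#     hdbscan_model = HDBSCAN(min_cluster_size=30, metric="euclidean",
#                             cluster_selection_method="eom", prediction_data=True)
#     topic_model, topics, probs = bertopic_model(
#         docs, embeddings, embedding_model, umap_model, hdbscan_model)
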

def merge_specific_topics(topic_model, sentences,
                          cancellation_keywords=["cancel", "cancellation", "cancelled", "canceled"],
                          thanks_keywords=["thank", "thanks", "thank you", "thankyou", "ty", "thx"],
                          expensive_keywords=["can't afford", "price", "expensive", "cost"]):

    topic_info = topic_model.get_topic_info()

    def find_matching_topics(keywords):
        # Collect all non-outlier topics whose name contains any keyword.
        pattern = '|'.join(re.escape(k) for k in keywords)
        matches = topic_info[
            topic_info['Name'].str.contains(pattern, case=False, na=False)
        ]['Topic'].tolist()
        return [t for t in matches if t != -1]

    topics_to_merge = []
    for label, keywords in [("cancellation", cancellation_keywords),
                            ("thank-you", thanks_keywords),
                            ("expensive", expensive_keywords)]:
        matched = find_matching_topics(keywords)
        if len(matched) > 1:
            print(f"Merging {label} topics: {matched}")
            topics_to_merge.append(matched)

    if topics_to_merge:
        topic_model.merge_topics(sentences, topics_to_merge)

    return topic_model
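
# Usage sketch: collapse near-duplicate topics after fitting, then read the
# refreshed assignments back from the model:
#
#     topic_model = merge_specific_topics(topic_model, docs)
#     topics = topic_model.topics_
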

def update_df_with_topics(df, mapping, sentence_topics, topic_label_map):
    # Group topic ids by the DataFrame row each sentence originated from.
    topics_by_row = {}
    for i, row_idx in enumerate(mapping):
        topic = sentence_topics[i]
        topics_by_row.setdefault(row_idx, set()).add(topic)

    updated_df = df.copy()

    def map_topics(row_idx):
        topic_ids = topics_by_row.get(row_idx, set())
        # Drop the -1 outlier topic and translate ids to readable labels.
        topic_names = [topic_label_map.get(t, str(t)) for t in topic_ids if t != -1]
        return ", ".join(sorted(topic_names))

    updated_df['Topics'] = updated_df.index.map(map_topics)
    return updated_df
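
# Usage sketch (assumes rows were split into sentences elsewhere, with
# `mapping[i]` holding the df index that sentence i came from; the label map
# shown here is one hypothetical way to build it from the fitted model):
#
#     topic_info = topic_model.get_topic_info()
#     topic_label_map = dict(zip(topic_info['Topic'], topic_info['Name']))
#     df = update_df_with_topics(df, mapping, topic_model.topics_, topic_label_map)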