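# Spaces: Runtime error  (status text captured along with the code; it appears to come from the hosting page and is not part of the program)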
from itertools import chain
from itertools import islice

import numpy as np
import spacy
import streamlit as st
from gensim import corpora, models
from scipy.signal import argrelmax
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import MultiLabelBinarizer
# Shared spaCy English pipeline ('en_core_web_sm'). The script below uses it
# to split the input into sentences and to read each token's lemma,
# stop-word flag and punctuation flag.
nlp = spacy.load('en_core_web_sm')
def window(seq, n=3):
    """Yield successive overlapping windows of ``n`` items from ``seq``.

    Each window is a tuple and consecutive windows are shifted by one item.
    Nothing is yielded when ``seq`` contains fewer than ``n`` items.

    Args:
        seq: Any iterable.
        n: Window length; must be at least 1.

    Yields:
        Tuples of exactly ``n`` consecutive items.

    Raises:
        ValueError: If ``n`` is smaller than 1 (raised when iteration starts,
            because this is a generator).
    """
    if n < 1:
        # A zero-length window would otherwise degenerate into an empty tuple
        # followed by 1-tuples, which is never what a caller wants.
        raise ValueError("window size n must be at least 1")
    it = iter(seq)
    result = tuple(islice(it, n))
    if len(result) == n:
        yield result
    for elem in it:
        # Slide by one: drop the oldest item, append the newest.
        result = result[1:] + (elem,)
        yield result
def get_depths(scores):
    """Compute a depth score for every position of ``scores``.

    For each index the function walks left and then right for as long as the
    next value is strictly greater than the current one, and records the value
    at which each walk stops (the "peak" on that side). The depth is the mean
    amount by which the two peaks exceed the score at the index:
    ``0.5 * (left_peak + right_peak - 2 * score)``.

    Args:
        scores: An indexable sequence of numbers (list or 1-D array).

    Returns:
        A NumPy array with one depth value per input score (empty for empty
        input).
    """
    last = len(scores) - 1

    def _climb(start, step):
        """Return the value reached by climbing from ``start`` in direction ``step``."""
        boundary = 0 if step < 0 else last
        pos = start
        # Stop at the sequence edge, or as soon as the neighbour is not
        # strictly higher (plateaus and descents both end the climb).
        while pos != boundary and scores[pos + step] > scores[pos]:
            pos += step
        return scores[pos]

    depths = [
        0.5 * (_climb(idx, -1) + _climb(idx, 1) - (2 * score))
        for idx, score in enumerate(scores)
    ]
    return np.array(depths)
def get_local_maxima(depth_scores, order=1):
    """Keep only the local maxima of ``depth_scores``; zero everything else.

    Maxima are located with ``scipy.signal.argrelmax``, which requires a
    strict inequality against the neighbours on both sides, so flat plateaus
    are not reported.

    Args:
        depth_scores: 1-D sequence or array of depth values.
        order: Passed through to ``argrelmax`` (how many points on each side
            take part in the comparison).

    Returns:
        A float NumPy array of the same length as ``depth_scores`` holding the
        original value at each local maximum and ``0.0`` elsewhere.
    """
    # argrelmax and the fancy indexing below need an ndarray; coercing here
    # lets callers pass plain lists as well.
    depth_scores = np.asarray(depth_scores)
    maxima_ids = argrelmax(depth_scores, order=order)[0]
    filtered_scores = np.zeros(len(depth_scores))
    filtered_scores[maxima_ids] = depth_scores[maxima_ids]
    return filtered_scores
def compute_threshold(scores):
    """Derive a cut-off from the non-zero entries of ``scores``.

    The threshold is ``mean - std / 2`` computed over the non-zero values
    only (zeros are ignored).

    Args:
        scores: 1-D sequence or array of numbers.

    Returns:
        The threshold as a float. When ``scores`` has no non-zero entry the
        result is NaN, so any ``>=`` comparison against it is false.
    """
    scores = np.asarray(scores)
    s = scores[np.nonzero(scores)]
    if s.size == 0:
        # Same value np.mean([]) would produce, without the RuntimeWarnings.
        return np.nan
    threshold = np.mean(s) - (np.std(s) / 2)
    return threshold
def get_threshold_segments(scores, threshold=0.1):
    """Return the indices whose score is at least ``threshold``.

    Args:
        scores: 1-D sequence or array of numbers.
        threshold: Inclusive lower bound a score must reach to be selected.

    Returns:
        A NumPy integer array of the selected indices, in ascending order.
    """
    # Coerce so that plain lists can be compared element-wise as well.
    segment_ids = np.where(np.asarray(scores) >= threshold)[0]
    return segment_ids
def print_list(lst):
    """Write every string in ``lst`` to the Streamlit page as a Markdown bullet.

    Args:
        lst: Iterable of strings; each one becomes its own ``"- ..."`` item.
    """
    for e in lst:
        st.markdown("- " + e)
# ---------------------------------------------------------------------------
# Streamlit page: read text, model topics per sentence, and print the text
# split into segments wherever the topic mix of adjacent sentence windows
# changes sharply.
# ---------------------------------------------------------------------------
st.subheader("Topic Modeling with Segmentation")
uploaded_file = st.file_uploader("choose a text file", type=["txt"])
if uploaded_file is not None:
    # Store the file contents under the same session-state key the text area
    # below uses, so the widget shows the uploaded text.
    st.session_state["text"] = uploaded_file.getvalue().decode('utf-8')
st.write("OR")
input_text = st.text_area(
    label="Enter text separated by newlines",
    value="",
    key="text",
    height=150
)
button = st.button('Get Segments')

if button and input_text != "":
    # --- Sentence splitting: each input line is parsed separately. ---
    texts = input_text.split('\n')
    sents = []
    for text in texts:
        doc = nlp(text)
        sents.extend(doc.sents)

    # --- Tokenisation: lower-cased lemmas, dropping stop words, punctuation,
    # whitespace-only tokens and tokens shorter than MIN_LENGTH. ---
    MIN_LENGTH = 3
    tokenized_sents = [
        [
            token.lemma_.lower()
            for token in sent
            if not token.is_stop
            and not token.is_punct
            and token.text.strip()
            and len(token) >= MIN_LENGTH
        ]
        for sent in sents
    ]

    st.write("Modeling topics:")
    # NOTE(review): seeding NumPy's global RNG is presumably what keeps the
    # LDA result reproducible between runs -- confirm against gensim.
    np.random.seed(123)
    N_TOPICS = 5
    N_PASSES = 5
    dictionary = corpora.Dictionary(tokenized_sents)

    if len(dictionary) == 0:
        # Input made only of stop words / punctuation / whitespace leaves the
        # vocabulary empty, and LdaModel refuses to train on that. Tell the
        # user instead of crashing the app.
        st.warning("No usable words were found in the input, so there is nothing to segment.")
    else:
        bow = [dictionary.doc2bow(sent) for sent in tokenized_sents]
        topic_model = models.LdaModel(corpus=bow, id2word=dictionary, num_topics=N_TOPICS, passes=N_PASSES)

        st.write("inferring topics ...")
        # Per-sentence topic distribution, limited by minimum_probability.
        THRESHOLD = 0.05
        doc_topics = list(topic_model.get_document_topics(bow, minimum_probability=THRESHOLD))

        # Ids of the (up to) k most probable topics of every sentence.
        k = 3
        top_k_topics = [
            [t[0] for t in sorted(sent_topics, key=lambda x: x[1], reverse=True)][:k]
            for sent_topics in doc_topics
        ]

        # Union of the top topics over every run of WINDOW_SIZE consecutive
        # sentences, then one-hot encoded over all N_TOPICS topic ids.
        WINDOW_SIZE = 3
        window_topics = window(top_k_topics, n=WINDOW_SIZE)
        window_topics = [list(set(chain.from_iterable(win))) for win in window_topics]
        binarizer = MultiLabelBinarizer(classes=range(N_TOPICS))
        encoded_topic = binarizer.fit_transform(window_topics)

        st.write("generating segments ...")
        # Cosine similarity between each pair of adjacent windows.
        sims_topic = [
            cosine_similarity([pair[0]], [pair[1]])[0][0]
            for pair in zip(encoded_topic, encoded_topic[1:])
        ]
        # Boundary candidates: local maxima of the depth scores that reach
        # the data-derived threshold.
        depths_topic = get_depths(sims_topic)
        filtered_topic = get_local_maxima(depths_topic, order=1)
        threshold_topic = compute_threshold(filtered_topic)
        threshold_segments_topic = get_threshold_segments(filtered_topic, threshold_topic)

        # Offset the selected indices by the window size (presumably to map
        # window-pair indices back to sentence indices -- confirm), then add
        # the start and end of the text as outer boundaries.
        segment_ids = threshold_segments_topic + WINDOW_SIZE
        segment_ids = [0] + segment_ids.tolist() + [len(sents)]
        slices = list(zip(segment_ids[:-1], segment_ids[1:]))
        segmented = [sents[s[0]: s[1]] for s in slices]

        # Print every segment as a bullet list, with a horizontal rule
        # between segments (none after the last one).
        for segment in segmented[:-1]:
            print_list([s.text for s in segment])
            st.markdown("""---""")
        print_list([s.text for s in segmented[-1]])