import io
import os
import re
import tempfile
import time
import zipfile
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import streamlit as st
from gensim.models import FastText, KeyedVectors
from gensim.utils import simple_preprocess
from sklearn.metrics.pairwise import cosine_similarity
from transformers import MarianTokenizer, MarianMTModel

# Function to preprocess text
def preprocess_text(text):
    text = text.lower()  # Lowercase
    text = re.sub(r'[^\w\s]', '', text)  # Remove punctuation
    return simple_preprocess(text)


# Function to read and preprocess the corpus from an uploaded file
def read_corpus(file):
    for line in file:
        yield preprocess_text(line.decode('utf-8'))
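
# Note: read_corpus is a generator, so main() materialises it with list() before
# training; gensim needs a restartable iterable because it passes over the corpus
# once for vocabulary building and again for every training epoch.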

# Function to zip the model files in memory
def zip_model(model):
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zipf:
        with tempfile.TemporaryDirectory() as temp_dir:
            model.save(os.path.join(temp_dir, "fasttext_model.model"))
            model.wv.save(os.path.join(temp_dir, "fasttext_model_vectors.kv"))
            np.save(os.path.join(temp_dir, "fasttext_model.model.wv.vectors_ngrams.npy"), model.wv.vectors_ngrams)
            np.save(os.path.join(temp_dir, "fasttext_model_vectors.kv.vectors_ngrams.npy"), model.wv.vectors_ngrams)
            for root, dirs, files in os.walk(temp_dir):
                for file in files:
                    file_path = os.path.join(root, file)
                    arcname = os.path.relpath(file_path, start=temp_dir)
                    zipf.write(file_path, arcname=arcname)
    zip_buffer.seek(0)
    return zip_buffer
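
# Note: for large models gensim usually writes big arrays such as vectors_ngrams
# to sibling *.npy files on its own during save(); the explicit np.save calls
# above appear intended to guarantee those arrays end up in the archive under
# the filenames the loaders expect.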

# Function to clean a chunk of text
def clean_text_chunk(chunk):
    lines = chunk.split('\n')
    cleaned_lines = []
    for line in lines:
        tokens = re.findall(r'\b\w+\b', line.lower())
        # Keep only lines that contain at least 5 word tokens
        if len(tokens) >= 5:
            cleaned_line = ' '.join(tokens)
            cleaned_line = re.sub(r'[^a-zA-Z\s]', '', cleaned_line)  # Drop digits and underscores
            cleaned_lines.append(cleaned_line)
    return '\n'.join(cleaned_lines)

# Function to clean text using multithreading
def clean_text_multithreaded(text):
    chunks = text.split('\n\n')
    with ThreadPoolExecutor() as executor:
        cleaned_chunks = list(executor.map(clean_text_chunk, chunks))
    return '\n'.join(cleaned_chunks)
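
# Note: the regex cleaning is CPU-bound, so threads mostly run one at a time under
# the GIL; swapping in ProcessPoolExecutor (a hedged suggestion, not part of the
# original app) could speed up cleaning of very large files.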

# Function to load the FastText model from the specified folder
def load_fasttext_model(model_folder):
    model_file = os.path.join(model_folder, "shona_fasttext_50d.model")
    vectors_file = os.path.join(model_folder, "shona_fasttext_vectors_50d.kv")
    model = FastText.load(model_file)
    model.wv = KeyedVectors.load(vectors_file)
    return model


# Function to generate the (unit-normalised) embedding for a given word
def generate_word_embedding(word, model):
    return model.wv.get_vector(word, norm=True) if word in model.wv else None
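
# Note: because FastText composes vectors from character n-grams, gensim can
# return an embedding even for words never seen during training; how permissive
# the `word in model.wv` membership check is depends on the gensim version.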

# Function to find similar words
def find_similar_words(word, model, topn=5):
    return model.wv.most_similar(word, topn=topn) if word in model.wv else []


# Function to tokenize a sentence using the given regex pattern
def tokenize_sentence(sentence, pattern):
    tokens = re.findall(pattern, sentence)
    return [token.strip() for token in tokens if token.strip()]


# Function to generate embeddings for the words in a sentence
def generate_embeddings_for_sentence(sentence, model, pattern):
    tokens = tokenize_sentence(sentence, pattern)
    embeddings = []
    for token in tokens:
        if token in model.wv:
            embeddings.append((token, model.wv[token]))
    return embeddings

# Function to generate an embedding for a sentence
def generate_sentence_embedding(sentence, model, pattern):
    word_embeddings = generate_embeddings_for_sentence(sentence, model, pattern)
    if not word_embeddings:
        return None
    return np.mean([embedding for _, embedding in word_embeddings], axis=0)
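
# Note: the sentence embedding above is simple mean pooling of the word vectors;
# every in-vocabulary token contributes equally, with no TF-IDF or length weighting.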

# Function to generate embeddings for a list of sentences
def generate_sentence_embeddings(sentences, model, pattern):
    return [generate_sentence_embedding(sentence, model, pattern) for sentence in sentences]

# Function to load the translation model and tokenizer
def load_translation_model(model_folder):
    tokenizer = MarianTokenizer.from_pretrained(model_folder)
    model = MarianMTModel.from_pretrained(model_folder)
    return tokenizer, model


# Function to perform translation
def translate_text(text, tokenizer, model):
    inputs = tokenizer.encode(text, return_tensors="pt", padding=True)
    translated_tokens = model.generate(inputs, max_length=512)
    translated_text = tokenizer.decode(translated_tokens[0], skip_special_tokens=True)
    return translated_text
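
# Note: model.generate uses the decoding settings stored in the model config,
# with max_length capping the output; the input text is not truncated before
# encoding, so extremely long inputs may translate poorly.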

# Pages for Generate Embeddings
def generate_word_embedding_page(model):
    st.subheader("Generate Word Embedding")
    word = st.text_input("Enter a word:")
    if word:
        embedding = generate_word_embedding(word, model)
        if embedding is not None:
            st.write(f"Embedding for '{word}':", embedding)
        else:
            st.write(f"'{word}' not in vocabulary")

def find_similar_words_page(model):
    st.subheader("Find Similar Words")
    word_for_similar = st.text_input("Enter a word to find similar words:")
    if word_for_similar:
        similar_words = find_similar_words(word_for_similar, model)
        if similar_words:
            st.write("Similar words:")
            for word, similarity in similar_words:
                st.write(f"{word}: {similarity}")
        else:
            st.write(f"No similar words found for '{word_for_similar}'")

def generate_embeddings_for_sentence_page(model):
    st.subheader("Generate Embeddings for Words in a Sentence")
    sentence = st.text_input("Enter a sentence:")
    if sentence:
        word_embeddings = generate_embeddings_for_sentence(sentence, model, r'\b\w+\b')
        if word_embeddings:
            for word, embedding in word_embeddings:
                st.write(f"'{word}' embedding:", embedding)
        else:
            st.write("No embeddings could be generated for the words in the sentence.")

def generate_sentence_embedding_page(model):
    st.subheader("Generate Embedding for a Sentence")
    sentence_for_embedding = st.text_input("Enter a sentence to generate its embedding:")
    if sentence_for_embedding:
        sentence_embedding = generate_sentence_embedding(sentence_for_embedding, model, r'\b\w+\b')
        if sentence_embedding is not None:
            st.write("Sentence embedding:", sentence_embedding)
        else:
            st.write("No embedding could be generated for the sentence.")

def find_most_similar_sentence_pairs_page(model):
    st.subheader("Find Most Similar Sentence Pairs")
    uploaded_sentences_file = st.file_uploader("Upload a text file with sentences (one per line)", type=["txt"])
    if uploaded_sentences_file:
        sentences = uploaded_sentences_file.read().decode('utf-8').splitlines()
        sentence_embeddings = generate_sentence_embeddings(sentences, model, r'\b\w+\b')
        sentence_pairs = []
        for i in range(len(sentences)):
            for j in range(i + 1, len(sentences)):
                if sentence_embeddings[i] is not None and sentence_embeddings[j] is not None:
                    similarity = cosine_similarity([sentence_embeddings[i]], [sentence_embeddings[j]])[0][0]
                    sentence_pairs.append((sentences[i], sentences[j], similarity))
        sentence_pairs = sorted(sentence_pairs, key=lambda x: x[2], reverse=True)
        st.write("Most similar sentence pairs:")
        for sent1, sent2, sim in sentence_pairs[:5]:
            st.write(f"Sentence 1: {sent1}")
            st.write(f"Sentence 2: {sent2}")
            st.write(f"Similarity: {sim}")
            st.write("-----")

# Page to search similar information from the document
def search_similar_information_page(model):
    st.subheader("Search Similar Information from Document")
    uploaded_file = st.file_uploader("Upload a text file", type=["txt"])
    if uploaded_file:
        document_text = uploaded_file.read().decode('utf-8').splitlines()
        document_sentences = [line for line in document_text if line.strip()]
        if document_sentences:
            search_query = st.text_input("Enter search query:")
            if search_query:
                query_embedding = generate_sentence_embedding(search_query, model, r'\b\w+\b')
                if query_embedding is not None:
                    document_embeddings = generate_sentence_embeddings(document_sentences, model, r'\b\w+\b')
                    similarities = [
                        (sentence, cosine_similarity([query_embedding], [embedding])[0][0])
                        for sentence, embedding in zip(document_sentences, document_embeddings)
                        if embedding is not None
                    ]
                    similarities = sorted(similarities, key=lambda x: x[1], reverse=True)
                    st.write("Most similar sentences in the document:")
                    for sentence, sim in similarities[:5]:
                        st.write(f"Sentence: {sentence}")
                        st.write(f"Similarity: {sim}")
                        st.write("-----")
                else:
                    st.write("No embedding could be generated for the search query.")

# Page for translation
def translate_text_page(tokenizer_shona_to_english, model_shona_to_english, tokenizer_english_to_shona, model_english_to_shona):
    st.subheader("Translate Text")
    translation_direction = st.radio("Select translation direction", ("Shona to English", "English to Shona"))
    if translation_direction == "Shona to English":
        text_to_translate = st.text_area("Enter Shona text to translate:")
        if text_to_translate:
            translated_text = translate_text(text_to_translate, tokenizer_shona_to_english, model_shona_to_english)
            st.write("Translated text:")
            st.write(translated_text)
    else:
        text_to_translate = st.text_area("Enter English text to translate:")
        if text_to_translate:
            translated_text = translate_text(text_to_translate, tokenizer_english_to_shona, model_english_to_shona)
            st.write("Translated text:")
            st.write(translated_text)

# Streamlit app
def main():
    st.title("Text Processing and FastText Word Embedding Trainer")

    # Load models once and keep them across reruns in session state
    if 'fasttext_model' not in st.session_state:
        st.session_state['fasttext_model'] = load_fasttext_model("Fast_text_50_dim")
    if 'translation_model_shona_to_english' not in st.session_state:
        st.session_state['tokenizer_shona_to_english'], st.session_state['translation_model_shona_to_english'] = load_translation_model("fine_tuned_shona_to_english_model")
    if 'translation_model_english_to_shona' not in st.session_state:
        st.session_state['tokenizer_english_to_shona'], st.session_state['translation_model_english_to_shona'] = load_translation_model("english_shona_model")

    fasttext_model = st.session_state['fasttext_model']
    tokenizer_shona_to_english = st.session_state['tokenizer_shona_to_english']
    translation_model_shona_to_english = st.session_state['translation_model_shona_to_english']
    tokenizer_english_to_shona = st.session_state['tokenizer_english_to_shona']
    translation_model_english_to_shona = st.session_state['translation_model_english_to_shona']

    st.sidebar.title("Options")
    option = st.sidebar.radio("Select an option", ("Clean Dataset", "Train Word Embedding", "Generate Embeddings", "Translate Text"))

    if option == "Clean Dataset":
        st.header("Clean Text Dataset")
        uploaded_file = st.file_uploader("Upload Raw Text File", type=["txt"])
        if uploaded_file is not None:
            raw_text = uploaded_file.read().decode('utf-8')
            if st.button("Clean Dataset"):
                try:
                    cleaned_text = clean_text_multithreaded(raw_text)
                    st.write("Dataset cleaned successfully!")
                    cleaned_file = io.BytesIO(cleaned_text.encode('utf-8'))
                    st.download_button(label="Download Cleaned Dataset", data=cleaned_file, file_name="cleaned_dataset.txt", mime="text/plain")
                except Exception as e:
                    st.error(f"An error occurred: {str(e)}")
                    st.error("Check the server logs for more details.")

    elif option == "Train Word Embedding":
        st.header("Train FastText Word Embedding")
        uploaded_file = st.file_uploader("Upload Cleaned Text File", type=["txt"])
        if uploaded_file is not None:
            vector_size = st.number_input("Select Embedding Dimensions", min_value=10, max_value=500, value=50, step=10)
            if st.button("Train FastText Model"):
                try:
                    sentences = list(read_corpus(uploaded_file))
                    start_time = time.time()
                    # sg=1: skip-gram; min_n/max_n: character n-gram range; bucket: size of the n-gram hash table
                    model = FastText(sentences, vector_size=vector_size, window=7, min_count=5, workers=4, sg=1, epochs=100, bucket=2000000, min_n=3, max_n=6)
                    end_time = time.time()
                    elapsed_time = end_time - start_time
                    st.write("Time taken: {:.2f} minutes".format(elapsed_time / 60))
                    st.write("Model trained successfully!")
                    zip_buffer = zip_model(model)
                    st.download_button(label="Download Model", data=zip_buffer, file_name="fasttext_model.zip", mime="application/zip")
                except Exception as e:
                    st.error(f"An error occurred: {str(e)}")
                    st.error("Check the server logs for more details.")

    elif option == "Generate Embeddings":
        st.header("Generate Embeddings with Pretrained FastText Model")
        embedding_option = st.sidebar.selectbox("Select an embedding operation", ("Generate Word Embedding", "Find Similar Words", "Generate Embeddings for Words in a Sentence", "Generate Embedding for a Sentence", "Find Most Similar Sentence Pairs", "Search Similar Information"))
        if embedding_option == "Generate Word Embedding":
            generate_word_embedding_page(fasttext_model)
        elif embedding_option == "Find Similar Words":
            find_similar_words_page(fasttext_model)
        elif embedding_option == "Generate Embeddings for Words in a Sentence":
            generate_embeddings_for_sentence_page(fasttext_model)
        elif embedding_option == "Generate Embedding for a Sentence":
            generate_sentence_embedding_page(fasttext_model)
        elif embedding_option == "Find Most Similar Sentence Pairs":
            find_most_similar_sentence_pairs_page(fasttext_model)
        elif embedding_option == "Search Similar Information":
            search_similar_information_page(fasttext_model)

    elif option == "Translate Text":
        st.header("Translate Text")
        translate_text_page(tokenizer_shona_to_english, translation_model_shona_to_english, tokenizer_english_to_shona, translation_model_english_to_shona)


if __name__ == "__main__":
    main()