import streamlit as st
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import pairwise_distances
from rouge_score import rouge_scorer
import gensim.downloader as api
from sentence_transformers import SentenceTransformer
from scipy.spatial.distance import cosine
import PyPDF2
import spacy
from difflib import SequenceMatcher

# Load the spaCy model, downloading it on first use
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    from spacy.cli import download
    download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")

# Load stop words from spaCy
stop_words = set(nlp.Defaults.stop_words)

# Initialize models (cached so they load once per session)
@st.cache_resource
def load_models():
    model = SentenceTransformer('all-mpnet-base-v2')
    tfidf_vectorizer = TfidfVectorizer()
    word2vec_model = api.load("word2vec-google-news-300")  # Load Word2Vec model
    return model, tfidf_vectorizer, word2vec_model

model, tfidf_vectorizer, word2vec_model = load_models()

# Initialize session state for results table if not already present
if 'results_df' not in st.session_state:
    st.session_state.results_df = pd.DataFrame(columns=[
        "LLM1", "LLM2", "Paraphrasing Similarity (%)",
        "Direct Text Comparison (%)", "Summarization Similarity (%)",
        "Combined Similarity (%)"
    ])

# Initialize session state for radar chart data
if 'radar_chart_data' not in st.session_state:
    st.session_state.radar_chart_data = []

# Helper functions
@st.cache_data
def chunk_text(text, chunk_size=500):
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

@st.cache_data
def create_embeddings(chunks):
    try:
        embeddings = model.encode(chunks, show_progress_bar=False)
        return embeddings
    except Exception as e:
        st.error(f"Error creating embeddings: {e}")
        return np.array([])

@st.cache_data
def calculate_similarity_ratio_and_find_matches(embeddings1, embeddings2):
    try:
        similarities = np.dot(embeddings1, embeddings2.T)  # Dot product
        max_similarities = np.max(similarities, axis=1)    # Max similarity for each chunk in embeddings1
        average_similarity = np.mean(max_similarities)
        return similarities, average_similarity
    except Exception as e:
        st.error(f"Error calculating similarity ratio: {e}")
        return np.array([]), 0

@st.cache_data
def calculate_word_similarity_ratio(text1, text2):
    try:
        doc1 = nlp(text1)
        doc2 = nlp(text2)
        words1 = [token.text for token in doc1 if not token.is_stop and not token.is_punct]
        words2 = [token.text for token in doc2 if not token.is_stop and not token.is_punct]
        if not words1 or not words2:
            return 0
        word_embeddings1 = model.encode(words1)
        word_embeddings2 = model.encode(words2)
        similarities = np.array([
            max([1 - cosine(emb1, emb2) for emb2 in word_embeddings2], default=0)
            for emb1 in word_embeddings1
        ])
        average_word_similarity = np.mean(similarities) if similarities.size > 0 else 0
        return average_word_similarity
    except Exception as e:
        st.error(f"Error calculating word similarity ratio: {e}")
        return 0

@st.cache_data
def calculate_bleu_score(reference, candidate):
    from nltk.translate.bleu_score import sentence_bleu
    return sentence_bleu([reference.split()], candidate.split())

@st.cache_data
def calculate_rouge_l_score(reference, candidate):
    scorer = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True)
    scores = scorer.score(reference, candidate)
    return scores['rougeL'].fmeasure * 100

@st.cache_data
def calculate_bertscore(reference, candidate):
    import bert_score
    P, R, F1 = bert_score.score([candidate], [reference], model_type='bert-base-uncased')
    return F1.mean().item() * 100
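# Illustrative sketch (not executed): how the chunk -> embed -> best-match pipeline above
# is meant to behave. The texts and numbers are hypothetical, purely for orientation.
#
#   chunks_1 = chunk_text("first response ...")      # list of <=500-character slices
#   chunks_2 = chunk_text("second response ...")
#   emb_1 = create_embeddings(chunks_1)              # shape (n_chunks_1, 768) for all-mpnet-base-v2
#   emb_2 = create_embeddings(chunks_2)
#   _, avg = calculate_similarity_ratio_and_find_matches(emb_1, emb_2)
#   # avg is the mean, over chunks of text 1, of each chunk's best match in text 2;
#   # it is later scaled by 100 to give the "Paraphrasing Similarity (%)" figure.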
@st.cache_data
def calculate_wmd(reference, candidate):
    doc1 = nlp(reference.lower())
    doc2 = nlp(candidate.lower())
    reference_tokens = [token.text for token in doc1 if not token.is_stop and not token.is_punct]
    candidate_tokens = [token.text for token in doc2 if not token.is_stop and not token.is_punct]
    return word2vec_model.wmdistance(reference_tokens, candidate_tokens)

@st.cache_data
def extract_pdf_text(pdf_file):
    try:
        reader = PyPDF2.PdfReader(pdf_file)
        text = ""
        for page in reader.pages:
            text += page.extract_text()
        return text
    except Exception as e:
        st.error(f"Error extracting text from PDF: {e}")
        return ""

@st.cache_data
def calculate_levenshtein_ratio(text1, text2):
    return SequenceMatcher(None, text1, text2).ratio()

@st.cache_data
def calculate_jaccard_similarity(text1, text2):
    from sklearn.feature_extraction.text import CountVectorizer
    vectors = CountVectorizer(binary=True).fit_transform([text1, text2]).toarray()
    intersection = np.sum(np.minimum(vectors[0], vectors[1]))
    union = np.sum(np.maximum(vectors[0], vectors[1]))
    return intersection / union if union != 0 else 0

@st.cache_data
def calculate_tfidf_cosine_similarity(text1, text2):
    tfidf_matrix = tfidf_vectorizer.fit_transform([text1, text2])
    return 1 - pairwise_distances(tfidf_matrix, metric='cosine')[0, 1]

@st.cache_data
def calculate_paraphrasing_similarity(text1, text2):
    try:
        chunks_1 = chunk_text(text1)
        chunks_2 = chunk_text(text2)
        embeddings_1 = create_embeddings(chunks_1)
        embeddings_2 = create_embeddings(chunks_2)
        if embeddings_1.size > 0 and embeddings_2.size > 0:
            similarities, average_similarity = calculate_similarity_ratio_and_find_matches(embeddings_1, embeddings_2)
            return average_similarity * 100
        return 0
    except Exception as e:
        st.error(f"Error calculating paraphrasing similarity: {e}")
        return 0

@st.cache_data
def calculate_direct_text_comparison_similarity(text1, text2):
    try:
        levenshtein_ratio = calculate_levenshtein_ratio(text1, text2) * 100
        jaccard_similarity = calculate_jaccard_similarity(text1, text2) * 100
        tfidf_cosine_similarity = calculate_tfidf_cosine_similarity(text1, text2) * 100
        bleu_score = calculate_bleu_score(text1, text2) * 100
        rouge_l_score = calculate_rouge_l_score(text1, text2)
        bertscore = calculate_bertscore(text1, text2)
        return (levenshtein_ratio * 0.1 + jaccard_similarity * 0.2 +
                tfidf_cosine_similarity * 0.2 + bleu_score * 0.2 +
                rouge_l_score * 0.2 + bertscore * 0.2) / 1.1
    except Exception as e:
        st.error(f"Error calculating direct text comparison similarity: {e}")
        return 0

@st.cache_data
def calculate_summarization_similarity(text1, text2):
    try:
        wmd = calculate_wmd(text1, text2)
        return (1 - wmd) * 100
    except Exception as e:
        st.error(f"Error calculating summarization similarity: {e}")
        return 0
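# Worked example of the direct-comparison blend above (all component scores hypothetical):
#   levenshtein = 50, jaccard = 60, tfidf = 70, bleu = 40, rouge_l = 65, bertscore = 80
#   weighted sum = 50*0.1 + (60 + 70 + 40 + 65 + 80)*0.2 = 5 + 63 = 68
#   blended      = 68 / 1.1 ≈ 61.8   (the weights sum to 1.1, so dividing keeps the 0-100 range)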
# Streamlit UI
st.title("Text-Based Similarity Comparison")
st.markdown("*Use in wide mode*")

# Create a two-column layout for input
col1, col2 = st.columns([2, 1])

with col1:
    st.sidebar.title("LLM Details")
    llm1_name = st.sidebar.text_input("What is LLM1?", "LLM1")
    llm2_name = st.sidebar.text_input("What is LLM2?", "LLM2")

    st.write("## Input")
    # Create two columns for text input
    input_col1, input_col2 = st.columns(2)

    with input_col1:
        st.write(f"{llm1_name} response")
        upload_pdf_1 = st.file_uploader(f"Upload PDF for {llm1_name} response", type="pdf", key="pdf1")
        if upload_pdf_1:
            text_input_1 = extract_pdf_text(upload_pdf_1)
        else:
            text_input_1 = st.text_area(f"Text for {llm1_name}", height=150, key="text1")

    with input_col2:
        st.write(f"{llm2_name} response")
        upload_pdf_2 = st.file_uploader(f"Upload PDF for {llm2_name} response", type="pdf", key="pdf2")
        if upload_pdf_2:
            text_input_2 = extract_pdf_text(upload_pdf_2)
        else:
            text_input_2 = st.text_area(f"Text for {llm2_name}", height=150, key="text2")

    if (text_input_1 and text_input_2) or (upload_pdf_1 and upload_pdf_2):
        if st.button("Submit"):
            # Calculate similarity metrics
            paraphrasing_similarity = calculate_paraphrasing_similarity(text_input_1, text_input_2)
            direct_text_comparison_similarity = calculate_direct_text_comparison_similarity(text_input_1, text_input_2)
            summarization_similarity = calculate_summarization_similarity(text_input_1, text_input_2)
            # Clamp negative scores at 0 (the WMD-based similarity can go below zero)
            if summarization_similarity < 0:
                summarization_similarity = 0
            if direct_text_comparison_similarity < 0:
                direct_text_comparison_similarity = 0

            # Combine all metrics into a single similarity score
            total_similarity = (paraphrasing_similarity * 0.6 +            # High weight
                                direct_text_comparison_similarity * 0.3 +  # Moderate weight
                                summarization_similarity * 0.1)            # Low weight

            # Update results table in session state
            new_row = pd.Series({
                "LLM1": llm1_name,
                "LLM2": llm2_name,
                "Paraphrasing Similarity (%)": paraphrasing_similarity,
                "Direct Text Comparison (%)": direct_text_comparison_similarity,
                "Summarization Similarity (%)": summarization_similarity,
                "Combined Similarity (%)": total_similarity
            })
            st.session_state.results_df = pd.concat(
                [st.session_state.results_df, new_row.to_frame().T], ignore_index=True
            )

            # Add new data for radar chart
            st.session_state.radar_chart_data.append({
                "name": f"{llm1_name} vs {llm2_name}",
                "paraphrasing_similarity": paraphrasing_similarity,
                "direct_text_comparison_similarity": direct_text_comparison_similarity,
                "summarization_similarity": summarization_similarity
            })

            # Display metrics with large and bold text.
            # Define styles for the combined score and the good/bad cases
            # (the CSS content of these strings is not included here).
            combined_score_style = """ """
            good_case = """ """
            bad_case = """ """

            # Apply the styles
            st.markdown(combined_score_style, unsafe_allow_html=True)
            st.markdown(good_case, unsafe_allow_html=True)
            st.markdown(bad_case, unsafe_allow_html=True)

            # Display the combined similarity score
            st.markdown(f'Combined Similarity Score: {total_similarity:.2f}%', unsafe_allow_html=True)

            # Calculate context-words difference
            context_words_diff = int(paraphrasing_similarity) - int(direct_text_comparison_similarity)

            # Display distinguishing factor
            if total_similarity >= 100:
                st.markdown('Similar Responses', unsafe_allow_html=True)
            elif total_similarity >= 55:
                if context_words_diff >= 42 and context_words_diff < 57.08:
                    st.markdown('Similar Responses', unsafe_allow_html=True)
                elif context_words_diff > 35:
                    st.markdown('Response 2 is better.', unsafe_allow_html=True)
                else:
                    st.markdown('Similar Responses', unsafe_allow_html=True)
            else:
                st.markdown('Similar Responses', unsafe_allow_html=True)
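# Worked example of the combined score above (hypothetical metric values):
#   paraphrasing = 80, direct comparison = 60, summarization = 40
#   total = 80*0.6 + 60*0.3 + 40*0.1 = 48 + 18 + 4 = 70
# With these weights the result is dominated by the embedding-based (context) similarity.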
with col2:
    # Display radar chart
    if st.session_state.radar_chart_data:
        st.subheader("Metrics Comparison")
        st.markdown("*Larger area = More similarity of responses.*")

        labels = ["Context similarity", "Words Similarity", "Summarization Similarity"]
        num_vars = len(labels)
        angles = np.linspace(0, 2 * np.pi, num_vars, endpoint=False).tolist()
        angles += angles[:1]

        fig, ax = plt.subplots(figsize=(6, 6), subplot_kw=dict(polar=True))

        # Plot each comparison with a different color
        color_palette = sns.color_palette("husl", len(st.session_state.radar_chart_data))
        for idx, data in enumerate(st.session_state.radar_chart_data):
            values = [
                data["paraphrasing_similarity"],
                data["direct_text_comparison_similarity"],
                data["summarization_similarity"]
            ]
            values += values[:1]
            ax.fill(angles, values, color=color_palette[idx], alpha=0.25, label=data["name"])
            ax.plot(angles, values, color=color_palette[idx], linewidth=2, linestyle='solid')

        ax.set_yticklabels([])
        ax.set_xticks(angles[:-1])
        ax.set_xticklabels(labels)
        plt.title("Radar Chart of Similarity Metrics")
        plt.legend(loc='upper right', bbox_to_anchor=(1.3, 1.1))
        st.pyplot(fig)

    # Display metric sliders beside the radar chart
    if st.session_state.radar_chart_data:
        st.subheader("Similarity Factors")
        st.markdown("*100 being the best case*")
        slider_labels = {
            "paraphrasing_similarity": "Context",
            "direct_text_comparison_similarity": "Words",
            "summarization_similarity": "Summary"
        }
        metrics = st.session_state.radar_chart_data[-1]
        for metric_name in ["paraphrasing_similarity", "direct_text_comparison_similarity",
                            "summarization_similarity"]:
            st.slider(
                slider_labels[metric_name],
                0, 100, int(metrics[metric_name]),
                key=metric_name,
                disabled=True,  # Make the slider non-editable
                format="%d"     # Display the slider value as an integer
            )

# Create a two-column layout for the results table and action buttons
results_col, actions_col = st.columns([2, 1])

with results_col:
    st.write("## Detailed Results Table")
    if not st.session_state.results_df.empty:
        st.write(st.session_state.results_df)
        # Download the results as a CSV file
        csv_data = st.session_state.results_df.to_csv(index=False).encode('utf-8')
        st.download_button(label="Download Results as CSV", data=csv_data,
                           file_name='similarity_results.csv', mime='text/csv')

with actions_col:
    if st.button("Reset Table"):
        st.session_state.results_df = pd.DataFrame(columns=[
            "LLM1", "LLM2", "Paraphrasing Similarity (%)",
            "Direct Text Comparison (%)", "Summarization Similarity (%)",
            "Combined Similarity (%)"
        ])
        st.session_state.radar_chart_data = []
        st.write("Results table has been reset.")
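# Example of one exported CSV row (hypothetical names and values; the combined score
# follows the 0.6 / 0.3 / 0.1 weighting used above):
#   LLM1,LLM2,Paraphrasing Similarity (%),Direct Text Comparison (%),Summarization Similarity (%),Combined Similarity (%)
#   LLM-A,LLM-B,82.5,61.8,40.0,72.04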
# Add an "About" button in the sidebar
if st.sidebar.button("About"):
    st.sidebar.markdown("""
    ### About This App
    This app compares text similarity between different responses from Language Models (LLMs).
    It calculates various similarity metrics and provides a comprehensive comparison using a radar chart.

    **Features:**
    - Upload or input text for comparison.
    - Calculate and display multiple similarity metrics.
    - Visualize the results using a radar chart.
    - Download the results as a CSV file.

    **Similarity Metrics:**
    1. **Paraphrasing Similarity**:
       - Compares chunks of text from both LLM responses using embeddings generated by a pre-trained model.
       - Calculates the average cosine similarity between the chunks.
    2. **Direct Text Comparison**:
       - Uses a combination of metrics:
         - **Levenshtein Ratio**: Measures similarity based on minimum edit distance.
         - **Jaccard Similarity**: Compares the overlap of unique words.
         - **TF-IDF Cosine Similarity**: Compares the texts using TF-IDF vectorization.
         - **BLEU Score**: Evaluates the overlap of n-grams.
         - **ROUGE-L Score**: Measures the longest matching sequence of words.
         - **BERTScore**: Uses BERT embeddings to compare sentence similarity.
    3. **Summarization Similarity**:
       - Uses Word Mover's Distance (WMD) to measure the semantic distance between the two texts.
    4. **Combined Similarity**:
       - A weighted average of the above metrics to provide an overall similarity score.

    **Developed with:**
    - Streamlit
    - Sentence Transformers
    - SpaCy
    - Scikit-learn
    - NLTK
    - Gensim
    """)