"""
Additional functions for two-text comparison feature.
These functions should be added to the main app.py file.
"""

import random

import numpy as np
import pandas as pd
import plotly.graph_objects as go
import streamlit as st
from scipy import stats

from web_app.utils import MemoryFileHandler

# Keys of a token-detail record that describe the token itself, not a score.
_BASIC_TOKEN_FIELDS = frozenset({'id', 'token', 'lemma', 'pos', 'tag', 'word_type'})

# (metric label, key inside results['text_stats']) in display order.
_TEXT_STAT_METRICS = (
    ("Total Tokens", 'total_tokens'),
    ("Unique Tokens", 'unique_tokens'),
    ("Content Words", 'content_words'),
    ("Function Words", 'function_words'),
)

_MAX_BINS = 30
_MAX_EXAMPLES_PER_BIN = 3
_MAX_TOKEN_ROWS = 100

# (upper bound on |d|, label) for Cohen's d; anything above the last is "Large".
_EFFECT_SIZE_LEVELS = ((0.2, "Negligible"), (0.5, "Small"), (0.8, "Medium"))

# (upper bound on p, marker); anything at or above the last bound is "ns".
_SIGNIFICANCE_LEVELS = ((0.001, "***"), (0.01, "**"), (0.05, "*"))


def get_text_input(label, key_suffix):
    """Get text input via upload or paste.

    Args:
        label: Human-readable name of the text, used in the text-area
            caption and placeholder.
        key_suffix: Suffix appended to every widget key so the function can
            be called more than once on the same page.

    Returns:
        The pasted or uploaded text, or an empty string when nothing was
        provided or the upload could not be read.
    """
    text_input_method = st.radio(
        "Input Method",
        options=['Paste Text', 'Upload File'],
        horizontal=True,
        key=f"input_method_{key_suffix}"
    )

    if text_input_method != 'Upload File':
        return st.text_area(
            f"Enter {label}",
            height=200,
            placeholder=f"Paste your {label.lower()} here...",
            key=f"text_area_{key_suffix}"
        )

    uploaded_file = st.file_uploader(
        "Upload Text File",
        type=['txt'],
        accept_multiple_files=False,
        key=f"file_upload_{key_suffix}"
    )
    if not uploaded_file:
        return ""

    try:
        # Use memory-based approach to avoid filesystem restrictions
        text_content = MemoryFileHandler.process_uploaded_file(uploaded_file, as_text=True)
    except Exception as e:
        # UI boundary: report any read failure to the user instead of crashing.
        st.error(f"Error reading uploaded file: {str(e)}")
        return ""

    if not text_content:
        st.error("Failed to read uploaded file. Please try again.")
        return ""
    return text_content


def display_comparison_results(results_a, results_b):
    """Display results for two-text comparison.

    Renders the basic text statistics of both texts and their difference
    (B minus A) in three columns, then the visual, statistical and
    token-level comparison sections.

    Args:
        results_a: Analysis results for Text A; must contain 'text_stats'.
        results_b: Analysis results for Text B; must contain 'text_stats'.
    """
    st.subheader("📊 Comparison Results")

    # Basic text statistics comparison
    st.write("**Text Statistics Comparison**")
    text_stats_a = results_a['text_stats']
    text_stats_b = results_b['text_stats']

    stats_col_a, stats_col_b, stats_diff = st.columns(3)

    for column, heading, text_stats in (
        (stats_col_a, "**Text A**", text_stats_a),
        (stats_col_b, "**Text B**", text_stats_b),
    ):
        with column:
            st.write(heading)
            for metric_label, stat_key in _TEXT_STAT_METRICS:
                st.metric(metric_label, text_stats[stat_key])

    with stats_diff:
        st.write("**Difference**")
        for metric_label, stat_key in _TEXT_STAT_METRICS:
            difference = text_stats_b[stat_key] - text_stats_a[stat_key]
            # '+' (not '+d') keeps the sign without failing on float counts.
            st.metric(metric_label, f"{difference:+}")

    # Visual comparison
    display_visual_comparison(results_a, results_b)

    # Statistical significance testing
    display_statistical_comparison(results_a, results_b)

    # Token-level comparison
    display_token_comparison(results_a, results_b)


def _is_number(value):
    """Return True for a finite int/float (NumPy scalars included, bools excluded)."""
    if isinstance(value, bool):
        return False
    if not isinstance(value, (int, float, np.integer, np.floating)):
        return False
    return bool(np.isfinite(value))


def _numeric_scores(values):
    """Return the finite numeric entries of *values* as a list of floats."""
    if values is None:
        return []
    return [float(value) for value in values if _is_number(value)]


def _ngram_score_map(results, measure, ngram):
    """Map each n-gram text to its score for *measure*.

    *ngram* is 'bigram' or 'trigram'. The detail column is the measure name
    with its last '_bigram' / '_trigram' marker removed.
    """
    details = results.get(f'{ngram}_details')
    if not details:
        return {}

    marker = f'_{ngram}'
    idx = measure.rfind(marker)
    column = measure[:idx] + measure[idx + len(marker):] if idx != -1 else measure

    return {
        detail.get(ngram, ''): detail[column]
        for detail in details
        if _is_number(detail.get(column))
    }


def _find_token_score_column(token_details, measure):
    """Return the token-detail key holding the scores for *measure*, or None.

    Tried in order: the measure name itself, the measure name without a
    trailing '_CW' / '_FW', then the first score column whose name occurs
    inside the measure name.
    """
    if any(measure in token for token in token_details):
        return measure

    base_key = measure
    for suffix in ('_CW', '_FW'):
        if measure.endswith(suffix):
            base_key = measure[:-len(suffix)]
            break
    if any(base_key in token for token in token_details):
        return base_key

    for token in token_details:
        for col_name in token:
            if col_name and col_name not in _BASIC_TOKEN_FIELDS and col_name in measure:
                return col_name
    return None


def _token_score_map(results, measure):
    """Map each token text to its score for *measure*."""
    token_details = results.get('token_details') or []
    column = _find_token_score_column(token_details, measure)
    if not column:
        return {}
    return {
        token.get('token', ''): token[column]
        for token in token_details
        if _is_number(token.get(column))
    }


def _build_score_map(results, measure):
    """Map item text (token, bigram or trigram) to its numeric score.

    Non-numeric scores are dropped: the token table in this module handles an
    'NA' sentinel, and such a value cannot be assigned to a histogram bin.
    """
    if '_bigram_' in measure:
        return _ngram_score_map(results, measure, 'bigram')
    if '_trigram_' in measure:
        return _ngram_score_map(results, measure, 'trigram')
    return _token_score_map(results, measure)


def _compute_bin_edges(values, max_bins=_MAX_BINS):
    """Return evenly spaced bin edges covering *values* with a small margin."""
    nbins = max(1, min(max_bins, len(values)))
    data_min, data_max = min(values), max(values)
    data_range = data_max - data_min
    if data_range > 0:
        padding = data_range * 0.02
    else:
        # All values identical: without a non-zero margin every edge would
        # coincide and the bins would have zero width.
        padding = abs(data_min) * 0.02 or 0.5
    return np.linspace(data_min - padding, data_max + padding, nbins + 1)


def _sample_bin_examples(score_map, bin_edges):
    """Group item texts by histogram bin, keeping a few random examples per bin."""
    grouped = {}
    last_bin = len(bin_edges) - 2
    for word, score in score_map.items():
        bin_idx = int(np.digitize(score, bin_edges)) - 1
        bin_idx = max(0, min(bin_idx, last_bin))
        grouped.setdefault(bin_idx, []).append(word)

    return {
        bin_idx: (
            random.sample(words, _MAX_EXAMPLES_PER_BIN)
            if len(words) > _MAX_EXAMPLES_PER_BIN
            else words
        )
        for bin_idx, words in grouped.items()
    }


def _build_hover_texts(text_label, bin_edges, counts, examples_by_bin):
    """Build one hover label per bin (Plotly hover labels break lines on <br>)."""
    hover_texts = []
    for i, count in enumerate(counts):
        examples = examples_by_bin.get(i)
        shown = ', '.join(str(example) for example in examples) if examples else 'none'
        hover_texts.append(
            f"{text_label}<br>"
            f"Range: {bin_edges[i]:.3f} - {bin_edges[i + 1]:.3f}<br>"
            f"Count: {count}<br>"
            f"Examples: {shown}"
        )
    return hover_texts


def _density_bar_trace(text_label, color, data, bin_edges, score_map):
    """Return a density-normalised histogram of *data* as a bar trace.

    The bins are computed here and drawn as bars rather than with
    go.Histogram: a histogram trace bins its samples itself and its
    customdata belongs to the samples, so one hover text per bin can only be
    attached reliably to precomputed bars.
    """
    counts, _ = np.histogram(data, bins=bin_edges)
    widths = np.diff(bin_edges)
    centers = (bin_edges[:-1] + bin_edges[1:]) / 2
    # Same normalisation as histnorm='probability density'.
    density = counts / (counts.sum() * widths)

    hover_texts = _build_hover_texts(
        text_label, bin_edges, counts, _sample_bin_examples(score_map, bin_edges)
    )
    return go.Bar(
        x=centers,
        y=density,
        width=widths,
        name=text_label,
        opacity=0.5,
        marker_color=color,
        customdata=hover_texts,
        hovertemplate='%{customdata}<extra></extra>'
    )


def _kde_trace(text_label, color, data):
    """Return a KDE line trace for *data*, or None when no KDE can be fitted."""
    # gaussian_kde needs at least two points and a non-zero spread.
    if len(data) < 2 or min(data) == max(data):
        return None
    try:
        kde = stats.gaussian_kde(data)
    except (np.linalg.LinAlgError, ValueError):
        return None

    x_range = np.linspace(min(data), max(data), 100)
    return go.Scatter(
        x=x_range,
        y=kde(x_range),
        mode='lines',
        name=f'{text_label} Density',
        line=dict(color=color, width=2)
    )


def display_visual_comparison(results_a, results_b):
    """Display visual comparison charts.

    For every measure present in both summaries, draws overlaid density
    histograms of the raw scores of both texts, a KDE curve per text (when
    one can be fitted) and a dashed vertical line at each mean.

    Args:
        results_a: Analysis results for Text A ('summary', 'raw_scores' and
            the optional token/bigram/trigram detail lists are read).
        results_b: Analysis results for Text B, same layout.
    """
    st.subheader("📈 Visual Comparison")

    summary_a = results_a.get('summary')
    summary_b = results_b.get('summary')
    if not summary_a or not summary_b:
        st.warning("No sophistication scores available for visual comparison.")
        return

    raw_scores_a = results_a.get('raw_scores') or {}
    raw_scores_b = results_b.get('raw_scores') or {}

    # Create distribution plots for each measure
    for measure in summary_a:
        if measure not in summary_b:
            continue

        st.write(f"**{measure} Distribution Comparison**")

        # Get data for both texts
        data_a = _numeric_scores(raw_scores_a.get(measure))
        data_b = _numeric_scores(raw_scores_b.get(measure))
        if not data_a or not data_b:
            st.write("No detailed data available for this measure.")
            continue

        # Shared bin edges so both histograms are directly comparable.
        bin_edges = _compute_bin_edges(data_a + data_b)

        series = (
            ("Text A", "blue", data_a, results_a),
            ("Text B", "red", data_b, results_b),
        )

        # Create plotly figure
        fig = go.Figure()

        # Histograms first, then density curves, so the lines sit on top.
        for text_label, color, data, results in series:
            score_map = _build_score_map(results, measure)
            fig.add_trace(_density_bar_trace(text_label, color, data, bin_edges, score_map))

        for text_label, color, data, _ in series:
            kde_trace = _kde_trace(text_label, color, data)
            if kde_trace is not None:
                fig.add_trace(kde_trace)

        # Add vertical mean lines
        for (text_label, color, data, _), position in zip(series, ("top left", "top right")):
            mean_value = np.mean(data)
            fig.add_vline(
                x=mean_value,
                line_dash="dash",
                line_color=color,
                line_width=2,
                annotation_text=f"{text_label} Mean: {mean_value:.3f}",
                annotation_position=position
            )

        # Update layout
        fig.update_layout(
            title=f"{measure} Distribution Comparison",
            xaxis_title="Score",
            yaxis_title="Density",
            barmode='overlay',
            height=400,
            showlegend=True
        )

        st.plotly_chart(fig, use_container_width=True)


def _compare_measure(measure, data_a, data_b):
    """Return one result row (t-test and Cohen's d) for *measure*.

    Both samples must contain at least two values.
    """
    # Perform t-test
    t_stat, p_value = stats.ttest_ind(data_a, data_b)

    # Calculate effect size (Cohen's d) from the pooled standard deviation
    n_a, n_b = len(data_a), len(data_b)
    pooled_std = np.sqrt(
        ((n_a - 1) * np.var(data_a, ddof=1) + (n_b - 1) * np.var(data_b, ddof=1))
        / (n_a + n_b - 2)
    )
    cohens_d = (np.mean(data_a) - np.mean(data_b)) / pooled_std if pooled_std > 0 else 0

    # Effect size interpretation
    effect_size = next(
        (label for bound, label in _EFFECT_SIZE_LEVELS if abs(cohens_d) < bound),
        "Large"
    )

    # Significance level; the p-value is NaN when the test is undefined
    # (for example both samples constant), which is not "not significant".
    if np.isnan(p_value):
        significance = "n/a"
    else:
        significance = next(
            (marker for bound, marker in _SIGNIFICANCE_LEVELS if p_value < bound),
            "ns"
        )

    return {
        'Measure': measure,
        't-statistic': round(float(t_stat), 3),
        'p-value': f"{p_value:.6f}",
        'Significance': significance,
        "Cohen's d": round(float(cohens_d), 3),
        'Effect Size': effect_size
    }


def display_statistical_comparison(results_a, results_b):
    """Display statistical significance testing results.

    For every measure present in both summaries with at least two numeric
    scores per text, runs an independent-samples t-test, computes Cohen's d
    and shows the results as a table.

    Args:
        results_a: Analysis results for Text A ('summary' and 'raw_scores'
            are read).
        results_b: Analysis results for Text B, same layout.
    """
    st.subheader("📊 Statistical Analysis")

    summary_a = results_a.get('summary')
    summary_b = results_b.get('summary')
    if not summary_a or not summary_b:
        st.warning("No sophistication scores available for statistical analysis.")
        return

    raw_scores_a = results_a.get('raw_scores') or {}
    raw_scores_b = results_b.get('raw_scores') or {}

    # Statistical comparison table
    stat_data = []
    for measure in summary_a:
        if measure not in summary_b:
            continue
        data_a = _numeric_scores(raw_scores_a.get(measure))
        data_b = _numeric_scores(raw_scores_b.get(measure))
        if len(data_a) > 1 and len(data_b) > 1:
            stat_data.append(_compare_measure(measure, data_a, data_b))

    if stat_data:
        st.write("Statistical analysis completed - results available in detailed outputs.")
        # Show the computed rows instead of discarding them.
        st.dataframe(pd.DataFrame(stat_data), use_container_width=True)


def _create_token_dataframe(tokens):
    """Create a dataframe with one row per token and one column per score."""
    token_data = []
    for token in tokens:
        row = {
            'Token': token.get('token', ''),
            'Lemma': token.get('lemma', ''),
            'POS': token.get('pos', ''),
            "TAG": token.get('tag', ''),
            'Type': token.get('word_type', '')
        }
        # Add scores for each measure (skip basic fields)
        for key, value in token.items():
            if key not in _BASIC_TOKEN_FIELDS:
                row[key] = value if value != 'NA' else 'N/A'
        token_data.append(row)
    return pd.DataFrame(token_data)


def _render_token_table(text_label, token_df):
    """Show a token table, truncated to the first rows when it is long."""
    st.write(f"**{text_label} Token Details**")
    total = len(token_df)
    if total > _MAX_TOKEN_ROWS:
        st.write(f"(showing first {_MAX_TOKEN_ROWS} of {total} tokens)")
        st.dataframe(token_df.head(_MAX_TOKEN_ROWS), use_container_width=True)
    else:
        st.write(f"({total} tokens)")
        st.dataframe(token_df, use_container_width=True)


def display_token_comparison(results_a, results_b):
    """Display token-level comparison in two side-by-side tables.

    Each table is followed by a CSV download button that exports the full
    (untruncated) token table of that text.

    Args:
        results_a: Analysis results for Text A ('token_details' is read).
        results_b: Analysis results for Text B ('token_details' is read).
    """
    st.subheader("🔍 Token-Level Comparison")

    tokens_a = results_a.get('token_details')
    tokens_b = results_b.get('token_details')
    if not tokens_a or not tokens_b:
        st.warning("No token-level data available for comparison.")
        return

    # Create dataframes for both texts
    df_a = _create_token_dataframe(tokens_a)
    df_b = _create_token_dataframe(tokens_b)

    # Display tables side by side
    col_a, col_b = st.columns(2)
    for column, text_label, token_df in (
        (col_a, "Text A", df_a),
        (col_b, "Text B", df_b),
    ):
        with column:
            _render_token_table(text_label, token_df)

    # Download options
    st.write("**Download Options**")
    download_col1, download_col2 = st.columns(2)
    for column, text_label, file_name, token_df in (
        (download_col1, "Text A", "text_a_tokens.csv", df_a),
        (download_col2, "Text B", "text_b_tokens.csv", df_b),
    ):
        with column:
            st.download_button(
                label=f"Download {text_label} Tokens (CSV)",
                data=token_df.to_csv(index=False),
                file_name=file_name,
                mime="text/csv"
            )