Spaces:
Building
Building
| """ | |
| Additional functions for two-text comparison feature. | |
| These functions should be added to the main app.py file. | |
| """ | |
| import streamlit as st | |
| import pandas as pd | |
| import numpy as np | |
| import plotly.graph_objects as go | |
| from scipy import stats | |
| from web_app.utils import MemoryFileHandler | |
def get_text_input(label, key_suffix):
    """Collect a text from the user, either pasted or uploaded.

    A radio button selects the input method. The paste path returns the
    contents of a text area; the upload path reads a single ``.txt`` file
    through ``MemoryFileHandler.process_uploaded_file``.

    Args:
        label: Human-readable name of the text; used in the text-area label
            and placeholder.
        key_suffix: Suffix appended to every widget key so the function can
            be called more than once on the same page.

    Returns:
        The text content, or ``""`` when nothing was uploaded or the upload
        could not be read (an error is shown in that case).
    """
    chosen_method = st.radio(
        "Input Method",
        options=['Paste Text', 'Upload File'],
        horizontal=True,
        key=f"input_method_{key_suffix}"
    )

    # Paste path: the text area's value is the result.
    if chosen_method != 'Upload File':
        return st.text_area(
            f"Enter {label}",
            height=200,
            placeholder=f"Paste your {label.lower()} here...",
            key=f"text_area_{key_suffix}"
        )

    # Upload path.
    upload = st.file_uploader(
        "Upload Text File",
        type=['txt'],
        accept_multiple_files=False,
        key=f"file_upload_{key_suffix}"
    )
    if not upload:
        return ""

    try:
        # Read from memory to avoid filesystem restrictions.
        decoded = MemoryFileHandler.process_uploaded_file(upload, as_text=True)
        if decoded:
            return decoded
        st.error("Failed to read uploaded file. Please try again.")
    except Exception as exc:
        st.error(f"Error reading uploaded file: {exc!s}")
    return ""
def display_comparison_results(results_a, results_b):
    """Render the complete two-text comparison.

    Shows three columns of basic text statistics (Text A, Text B, and the
    signed difference B - A), then delegates to the visual, statistical and
    token-level comparison sections.

    Args:
        results_a: Analysis results for Text A; ``results_a['text_stats']``
            is read for the four count fields listed below.
        results_b: Analysis results for Text B, same layout.
    """
    st.subheader("π Comparison Results")

    # Basic text statistics comparison
    st.write("**Text Statistics Comparison**")

    # (metric caption, key inside 'text_stats'), in display order.
    stat_fields = (
        ("Total Tokens", 'total_tokens'),
        ("Unique Tokens", 'unique_tokens'),
        ("Content Words", 'content_words'),
        ("Function Words", 'function_words'),
    )

    column_a, column_b, column_diff = st.columns(3)

    per_text = (
        (column_a, "**Text A**", results_a),
        (column_b, "**Text B**", results_b),
    )
    for column, heading, results in per_text:
        with column:
            st.write(heading)
            for caption, field in stat_fields:
                st.metric(caption, results['text_stats'][field])

    with column_diff:
        st.write("**Difference**")
        # All differences are computed before any of them is displayed.
        deltas = [
            results_b['text_stats'][field] - results_a['text_stats'][field]
            for _, field in stat_fields
        ]
        for (caption, _), delta in zip(stat_fields, deltas):
            st.metric(caption, f"{delta:+d}")

    # Visual comparison
    display_visual_comparison(results_a, results_b)

    # Statistical significance testing
    display_statistical_comparison(results_a, results_b)

    # Token-level comparison
    display_token_comparison(results_a, results_b)
def _is_numeric_score(value):
    """Return True when *value* is a finite real number that can be binned."""
    if isinstance(value, bool):
        return False
    if not isinstance(value, (int, float, np.integer, np.floating)):
        return False
    return bool(np.isfinite(value))


def _match_token_column(tokens, measure):
    """Find the token-detail column that holds the scores for *measure*.

    Tries, in order: the measure name itself, the measure name without a
    trailing ``_CW``/``_FW``, and finally the first non-basic column whose
    name is contained in the measure name. Returns ``None`` if nothing fits.
    """
    if any(measure in token for token in tokens):
        return measure

    base_key = measure
    for suffix in ('_CW', '_FW'):
        if measure.endswith(suffix):
            base_key = measure[:-len(suffix)]
            break
    if any(base_key in token for token in tokens):
        return base_key

    basic_fields = ('id', 'token', 'lemma', 'pos', 'tag', 'word_type')
    for token in tokens:
        for col_name in token:
            # A prefix match is a special case of the substring match.
            if col_name not in basic_fields and col_name in measure:
                return col_name
    return None


def _build_score_map(results, measure):
    """Map each word / bigram / trigram text to its score for *measure*.

    Entries whose score is missing or not a finite number (for example a
    placeholder string such as ``'NA'``) are skipped because they cannot be
    assigned to a histogram bin.
    """
    ngram_sources = (
        ('_bigram', 'bigram_details', 'bigram'),
        ('_trigram', 'trigram_details', 'trigram'),
    )
    for marker, details_key, text_key in ngram_sources:
        if f'{marker}_' in measure:
            # The detail rows use the measure name without the n-gram marker.
            idx = measure.rfind(marker)
            column = measure[:idx] + measure[idx + len(marker):]
            return {
                detail.get(text_key, ''): detail[column]
                for detail in results.get(details_key) or []
                if _is_numeric_score(detail.get(column))
            }

    tokens = results.get('token_details') or []
    column = _match_token_column(tokens, measure)
    if not column:
        return {}
    return {
        token.get('token', ''): token[column]
        for token in tokens
        if _is_numeric_score(token.get(column))
    }


def _sample_bin_examples(score_map, bin_edges, max_examples=3):
    """Group words by histogram bin and keep at most *max_examples* per bin."""
    import random

    last_bin = len(bin_edges) - 2
    by_bin = {}
    for word, score in score_map.items():
        # digitize is 1-based; clamp so edge values land in a real bin.
        bin_idx = int(np.digitize(score, bin_edges)) - 1
        bin_idx = max(0, min(bin_idx, last_bin))
        by_bin.setdefault(bin_idx, []).append(word)

    return {
        bin_idx: random.sample(words, max_examples) if len(words) > max_examples else words
        for bin_idx, words in by_bin.items()
    }


def _bin_hover_texts(text_label, counts, bin_edges, examples_by_bin):
    """Build one hover string per bin: range, count and example words."""
    hover_texts = []
    for i, count in enumerate(counts):
        examples = examples_by_bin.get(i)
        shown = ', '.join(map(str, examples)) if examples else 'none'
        hover_texts.append(
            f"{text_label}<br>Range: {bin_edges[i]:.3f} - {bin_edges[i + 1]:.3f}<br>"
            f"Count: {count}<br>"
            f"Examples: {shown}"
        )
    return hover_texts


def _histogram_trace(data, name, color, bin_edges, nbins, hover_texts):
    """Create a semi-transparent density histogram using the shared bins."""
    # NOTE(review): plotly documents `customdata` as per-datum, while the
    # hover texts here are per-bin - confirm the hover shows the intended text.
    return go.Histogram(
        x=data,
        name=name,
        opacity=0.5,
        marker_color=color,
        xbins=dict(
            start=bin_edges[0],
            end=bin_edges[-1],
            size=(bin_edges[-1] - bin_edges[0]) / nbins
        ),
        histnorm='probability density',
        hovertemplate='%{customdata}<extra></extra>',
        customdata=hover_texts
    )


def _add_kde_trace(fig, data, name, color):
    """Add a smooth KDE curve for *data*; do nothing if no KDE can be fitted."""
    values = np.asarray(data, dtype=float)
    # gaussian_kde cannot be fitted to a single value or a constant sample.
    if values.size < 2 or np.ptp(values) == 0:
        return
    try:
        kde = stats.gaussian_kde(values)
    except (np.linalg.LinAlgError, ValueError):
        return
    x_range = np.linspace(values.min(), values.max(), 100)
    fig.add_trace(go.Scatter(
        x=x_range,
        y=kde(x_range),
        mode='lines',
        name=name,
        line=dict(color=color, width=2)
    ))


def display_visual_comparison(results_a, results_b):
    """Display overlaid score distributions of Text A and Text B per measure.

    For every measure present in both summaries, draws two density
    histograms on shared bins, a KDE curve per text where one can be fitted,
    and a dashed vertical line at each mean. Hover texts list the bin range,
    the count, and up to three randomly sampled example words per bin.

    Args:
        results_a: Analysis results for Text A. Reads ``'summary'``,
            ``'raw_scores'`` and, for the hover examples, ``'token_details'``
            / ``'bigram_details'`` / ``'trigram_details'``.
        results_b: Analysis results for Text B, same layout.
    """
    st.subheader("π Visual Comparison")

    if not results_a.get('summary') or not results_b.get('summary'):
        st.warning("No sophistication scores available for visual comparison.")
        return

    raw_scores_a = results_a.get('raw_scores') or {}
    raw_scores_b = results_b.get('raw_scores') or {}

    # Create distribution plots for each measure shared by both texts
    for measure in list(results_a['summary'].keys()):
        if measure not in results_b['summary']:
            continue

        st.write(f"**{measure} Distribution Comparison**")

        data_a = raw_scores_a.get(measure, [])
        data_b = raw_scores_b.get(measure, [])
        if not data_a or not data_b:
            st.write("No detailed data available for this measure.")
            continue

        # Word -> score lookups used only for the hover examples
        word_score_map_a = _build_score_map(results_a, measure)
        word_score_map_b = _build_score_map(results_b, measure)

        # Shared bin edges so both histograms are directly comparable
        all_data = data_a + data_b
        nbins = min(30, len(all_data))
        data_min, data_max = min(all_data), max(all_data)
        data_range = data_max - data_min
        # A zero range would collapse every bin edge onto one value.
        padding = data_range * 0.02 if data_range > 0 else 0.5
        bin_edges = np.linspace(data_min - padding, data_max + padding, nbins + 1)

        bin_examples_a = _sample_bin_examples(word_score_map_a, bin_edges)
        bin_examples_b = _sample_bin_examples(word_score_map_b, bin_edges)

        hist_data_a, _ = np.histogram(data_a, bins=bin_edges)
        hist_data_b, _ = np.histogram(data_b, bins=bin_edges)
        hover_texts_a = _bin_hover_texts("Text A", hist_data_a, bin_edges, bin_examples_a)
        hover_texts_b = _bin_hover_texts("Text B", hist_data_b, bin_edges, bin_examples_b)

        fig = go.Figure()

        # Histograms first, then KDE curves, to keep the legend order stable
        fig.add_trace(_histogram_trace(data_a, "Text A", "blue", bin_edges, nbins, hover_texts_a))
        fig.add_trace(_histogram_trace(data_b, "Text B", "red", bin_edges, nbins, hover_texts_b))
        _add_kde_trace(fig, data_a, 'Text A Density', 'blue')
        _add_kde_trace(fig, data_b, 'Text B Density', 'red')

        # Dashed vertical mean lines
        mean_lines = (
            ("Text A", np.mean(data_a), "blue", "top left"),
            ("Text B", np.mean(data_b), "red", "top right"),
        )
        for text_label, mean_value, color, position in mean_lines:
            fig.add_vline(
                x=mean_value,
                line_dash="dash",
                line_color=color,
                line_width=2,
                annotation_text=f"{text_label} Mean: {mean_value:.3f}",
                annotation_position=position
            )

        fig.update_layout(
            title=f"{measure} Distribution Comparison",
            xaxis_title="Score",
            yaxis_title="Frequency",
            barmode='overlay',
            height=400,
            showlegend=True
        )
        st.plotly_chart(fig, use_container_width=True)
def display_statistical_comparison(results_a, results_b):
    """Run per-measure significance tests between Text A and Text B.

    For each measure present in both summaries and having more than one raw
    score in each text, computes an independent-samples t-test and Cohen's d
    (pooled standard deviation), and labels effect size and significance.
    The collected rows are not rendered as a table here; a short completion
    message is written when at least one measure was analysed.

    Args:
        results_a: Analysis results for Text A; reads ``'summary'`` and
            ``'raw_scores'``.
        results_b: Analysis results for Text B, same layout.
    """
    st.subheader("π Statistical Analysis")

    if not (results_a.get('summary') and results_b.get('summary')):
        st.warning("No sophistication scores available for statistical analysis.")
        return

    # Upper bounds (exclusive) for each label; anything beyond the last
    # bound falls through to the default given at the lookup.
    effect_bands = ((0.2, "Negligible"), (0.5, "Small"), (0.8, "Medium"))
    p_value_bands = ((0.001, "***"), (0.01, "**"), (0.05, "*"))

    rows = []
    for measure in list(results_a['summary'].keys()):
        if measure not in results_b['summary']:
            continue

        sample_a = results_a['raw_scores'].get(measure, [])
        sample_b = results_b['raw_scores'].get(measure, [])
        if len(sample_a) <= 1 or len(sample_b) <= 1:
            continue

        # Independent-samples t-test
        t_stat, p_value = stats.ttest_ind(sample_a, sample_b)

        # Cohen's d from the pooled sample standard deviation
        size_a, size_b = len(sample_a), len(sample_b)
        weighted_variance = (
            (size_a - 1) * np.var(sample_a, ddof=1)
            + (size_b - 1) * np.var(sample_b, ddof=1)
        )
        pooled_std = np.sqrt(weighted_variance / (size_a + size_b - 2))
        if pooled_std > 0:
            cohens_d = (np.mean(sample_a) - np.mean(sample_b)) / pooled_std
        else:
            cohens_d = 0

        magnitude = abs(cohens_d)
        effect_size = next(
            (label for bound, label in effect_bands if magnitude < bound),
            "Large",
        )
        significance = next(
            (mark for cutoff, mark in p_value_bands if p_value < cutoff),
            "ns",
        )

        rows.append({
            'Measure': measure,
            't-statistic': round(t_stat, 3),
            'p-value': f"{p_value:.6f}",
            'Significance': significance,
            "Cohen's d": round(cohens_d, 3),
            'Effect Size': effect_size
        })

    if rows:
        st.write("Statistical analysis completed - results available in detailed outputs.")
def display_token_comparison(results_a, results_b):
    """Show token-level details of both texts side by side with CSV downloads.

    Each text's ``'token_details'`` list is turned into a DataFrame with the
    basic columns (Token, Lemma, POS, TAG, Type) followed by every other
    per-token field; ``'NA'`` values are shown as ``'N/A'``. Tables display
    at most the first 100 rows, while the downloads contain all rows.

    Args:
        results_a: Analysis results for Text A; reads ``'token_details'``.
        results_b: Analysis results for Text B, same layout.
    """
    st.subheader("π Token-Level Comparison")

    if not (results_a.get('token_details') and results_b.get('token_details')):
        st.warning("No token-level data available for comparison.")
        return

    basic_fields = ['id', 'token', 'lemma', 'pos', 'tag', 'word_type']

    def create_token_dataframe(tokens, text_name):
        """Create a dataframe for token data."""
        records = []
        for entry in tokens:
            record = {
                'Token': entry.get('token', ''),
                'Lemma': entry.get('lemma', ''),
                'POS': entry.get('pos', ''),
                "TAG": entry.get('tag', ''),
                'Type': entry.get('word_type', '')
            }
            # Append every score column, skipping the basic fields
            record.update({
                field: (score if score != 'NA' else 'N/A')
                for field, score in entry.items()
                if field not in basic_fields
            })
            records.append(record)
        return pd.DataFrame(records)

    tokens_a = results_a['token_details']
    tokens_b = results_b['token_details']

    # (display title, file-name stem, dataframe) for each text
    tables = (
        ("Text A", "text_a", create_token_dataframe(tokens_a, "Text A")),
        ("Text B", "text_b", create_token_dataframe(tokens_b, "Text B")),
    )

    # Tables side by side
    for column, (title, _, frame) in zip(st.columns(2), tables):
        with column:
            st.write(f"**{title} Token Details**")
            row_count = len(frame)
            if row_count > 100:
                st.write(f"(showing first 100 of {row_count} tokens)")
                st.dataframe(frame.head(100), use_container_width=True)
            else:
                st.write(f"({row_count} tokens)")
                st.dataframe(frame, use_container_width=True)

    # Download options
    st.write("**Download Options**")
    for column, (title, stem, frame) in zip(st.columns(2), tables):
        with column:
            csv_payload = frame.to_csv(index=False)
            st.download_button(
                label=f"Download {title} Tokens (CSV)",
                data=csv_payload,
                file_name=f"{stem}_tokens.csv",
                mime="text/csv"
            )