"""
Additional functions for two-text comparison feature.
These functions should be added to the main app.py file.
"""
import streamlit as st
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from scipy import stats
from web_app.utils import MemoryFileHandler
def get_text_input(label, key_suffix):
    """Collect text from the user, either pasted directly or uploaded as a .txt file.

    Args:
        label: Human-readable name used in widget labels and the placeholder.
        key_suffix: Suffix appended to every widget key so multiple instances
            of this input (e.g. Text A / Text B) do not collide in Streamlit
            session state.

    Returns:
        The text content as a string; an empty string when nothing was
        provided or the upload could not be read.
    """
    method = st.radio(
        "Input Method",
        options=['Paste Text', 'Upload File'],
        horizontal=True,
        key=f"input_method_{key_suffix}",
    )

    if method != 'Upload File':
        # Paste path: a plain text area.
        return st.text_area(
            f"Enter {label}",
            height=200,
            placeholder=f"Paste your {label.lower()} here...",
            key=f"text_area_{key_suffix}",
        )

    uploaded = st.file_uploader(
        "Upload Text File",
        type=['txt'],
        accept_multiple_files=False,
        key=f"file_upload_{key_suffix}",
    )
    if not uploaded:
        return ""

    try:
        # Memory-based read avoids filesystem restrictions on the host.
        content = MemoryFileHandler.process_uploaded_file(uploaded, as_text=True)
    except Exception as e:
        st.error(f"Error reading uploaded file: {str(e)}")
        return ""
    if not content:
        st.error("Failed to read uploaded file. Please try again.")
        return ""
    return content
def display_comparison_results(results_a, results_b):
    """Render the full two-text comparison page.

    Shows the basic token-count metrics for each text plus their difference,
    then delegates to the visual, statistical, and token-level comparison
    sections.

    Args:
        results_a: Analysis results dict for Text A (must contain 'text_stats').
        results_b: Analysis results dict for Text B (must contain 'text_stats').
    """
    st.subheader("📊 Comparison Results")

    st.write("**Text Statistics Comparison**")
    col_a, col_b, col_diff = st.columns(3)

    # (display label, key into the 'text_stats' dict) — shared by all columns.
    metric_fields = [
        ("Total Tokens", 'total_tokens'),
        ("Unique Tokens", 'unique_tokens'),
        ("Content Words", 'content_words'),
        ("Function Words", 'function_words'),
    ]

    with col_a:
        st.write("**Text A**")
        for title, key in metric_fields:
            st.metric(title, results_a['text_stats'][key])

    with col_b:
        st.write("**Text B**")
        for title, key in metric_fields:
            st.metric(title, results_b['text_stats'][key])

    with col_diff:
        st.write("**Difference**")
        for title, key in metric_fields:
            # Signed B - A difference, rendered with an explicit +/- sign.
            delta = results_b['text_stats'][key] - results_a['text_stats'][key]
            st.metric(title, f"{delta:+d}")

    # Downstream sections of the comparison page.
    display_visual_comparison(results_a, results_b)
    display_statistical_comparison(results_a, results_b)
    display_token_comparison(results_a, results_b)
# Token dict keys that are metadata, not per-measure scores.
_TOKEN_META_KEYS = ('id', 'token', 'lemma', 'pos', 'tag', 'word_type')


def _find_token_column(token_details, measure):
    """Resolve which per-token column holds the scores for *measure*.

    Tries, in order: the measure name itself; the measure with a trailing
    _CW/_FW suffix stripped; then any non-metadata column whose name overlaps
    the measure name. Returns None when nothing matches.
    """
    if any(measure in token for token in token_details):
        return measure
    base_key = measure
    for suffix in ('_CW', '_FW'):
        if measure.endswith(suffix):
            base_key = measure[:-len(suffix)]
            break
    if any(base_key in token for token in token_details):
        return base_key
    for token in token_details:
        for col_name in token.keys():
            if col_name in _TOKEN_META_KEYS:
                continue
            if col_name in measure or measure.startswith(col_name):
                return col_name
    return None


def _word_score_map(results, measure):
    """Map each word / bigram / trigram to its score for *measure*.

    The measure name encodes which detail table to read: names containing
    '_bigram_' / '_trigram_' come from the n-gram detail lists (with the
    '_bigram'/'_trigram' infix removed to recover the detail column name);
    everything else comes from 'token_details'. Entries with a None score
    are skipped. Returns an empty dict when no data is available.
    """
    if '_bigram_' in measure:
        details = results.get('bigram_details')
        if not details:
            return {}
        idx = measure.rfind('_bigram')
        # Drop the 7-char '_bigram' infix to get the detail-table column name.
        col = measure[:idx] + measure[idx + 7:] if idx != -1 else measure
        return {d.get('bigram', ''): d[col]
                for d in details if d.get(col) is not None}

    if '_trigram_' in measure:
        details = results.get('trigram_details')
        if not details:
            return {}
        idx = measure.rfind('_trigram')
        # Drop the 8-char '_trigram' infix to get the detail-table column name.
        col = measure[:idx] + measure[idx + 8:] if idx != -1 else measure
        return {d.get('trigram', ''): d[col]
                for d in details if d.get(col) is not None}

    token_details = results.get('token_details')
    if not token_details:
        return {}
    col = _find_token_column(token_details, measure)
    if col is None:
        return {}
    return {t['token']: t[col] for t in token_details if t.get(col) is not None}


def _bin_examples(word_score_map, bin_edges, max_examples=3):
    """Assign words to histogram bins, keeping at most *max_examples* per bin.

    Bins are defined by *bin_edges*; out-of-range scores are clamped to the
    first/last bin. When a bin has more than *max_examples* words, a random
    sample is kept so the hover text stays short.
    """
    import random

    examples = {}
    for word, score in word_score_map.items():
        bin_idx = int(np.digitize(score, bin_edges)) - 1
        bin_idx = max(0, min(bin_idx, len(bin_edges) - 2))
        examples.setdefault(bin_idx, []).append(word)
    for bin_idx, words in examples.items():
        if len(words) > max_examples:
            examples[bin_idx] = random.sample(words, max_examples)
    return examples


def _hover_texts(label, hist_counts, bin_edges, bin_examples):
    """Build one plotly hover string per bin: range, count, and sample words."""
    texts = []
    for i in range(len(bin_edges) - 1):
        words = bin_examples.get(i, [])
        text = (
            f"{label}<br>Range: {bin_edges[i]:.3f} - {bin_edges[i + 1]:.3f}<br>"
            f"Count: {hist_counts[i]}<br>"
        )
        text += f"Examples: {', '.join(words)}" if words else "Examples: none"
        texts.append(text)
    return texts


def display_visual_comparison(results_a, results_b):
    """Display overlaid distribution plots for every measure shared by both texts.

    For each measure present in both summaries, draws two semi-transparent
    density-normalized histograms (shared bin edges so they are directly
    comparable), KDE curves, and dashed mean lines. Hover text on each bin
    shows its range, raw count, and up to three example words.

    Args:
        results_a: Analysis results dict for Text A ('summary', 'raw_scores',
            and optionally token/bigram/trigram detail tables).
        results_b: Analysis results dict for Text B (same shape).
    """
    st.subheader("📈 Visual Comparison")
    if not results_a.get('summary') or not results_b.get('summary'):
        st.warning("No sophistication scores available for visual comparison.")
        return

    for measure in results_a['summary']:
        if measure not in results_b['summary']:
            continue
        st.write(f"**{measure} Distribution Comparison**")

        data_a = results_a['raw_scores'].get(measure, [])
        data_b = results_b['raw_scores'].get(measure, [])
        if not data_a or not data_b:
            st.write("No detailed data available for this measure.")
            continue

        # Shared bin edges (with 2% padding) so both histograms line up.
        all_data = data_a + data_b
        nbins = min(30, len(all_data))
        data_min, data_max = min(all_data), max(all_data)
        padding = (data_max - data_min) * 0.02
        bin_edges = np.linspace(data_min - padding, data_max + padding, nbins + 1)

        hist_a, _ = np.histogram(data_a, bins=bin_edges)
        hist_b, _ = np.histogram(data_b, bins=bin_edges)
        examples_a = _bin_examples(_word_score_map(results_a, measure), bin_edges)
        examples_b = _bin_examples(_word_score_map(results_b, measure), bin_edges)
        hover_a = _hover_texts("Text A", hist_a, bin_edges, examples_a)
        hover_b = _hover_texts("Text B", hist_b, bin_edges, examples_b)

        fig = go.Figure()
        bin_size = (bin_edges[-1] - bin_edges[0]) / nbins

        # Overlaid histograms with per-bin custom hover text.
        for data, name, color, hover in (
            (data_a, "Text A", "blue", hover_a),
            (data_b, "Text B", "red", hover_b),
        ):
            fig.add_trace(go.Histogram(
                x=data,
                name=name,
                opacity=0.5,
                marker_color=color,
                xbins=dict(start=bin_edges[0], end=bin_edges[-1], size=bin_size),
                histnorm='probability density',
                hovertemplate='%{customdata}',
                customdata=hover,
            ))

        # KDE curves. gaussian_kde raises on a single point or zero-variance
        # data, so skip the curve in those degenerate cases.
        for data, name, color in (
            (data_a, 'Text A Density', 'blue'),
            (data_b, 'Text B Density', 'red'),
        ):
            if len(data) > 1 and min(data) != max(data):
                kde = stats.gaussian_kde(data)
                x_range = np.linspace(min(data), max(data), 100)
                fig.add_trace(go.Scatter(
                    x=x_range,
                    y=kde(x_range),
                    mode='lines',
                    name=name,
                    line=dict(color=color, width=2),
                ))

        # Dashed vertical mean lines, annotated with the mean value.
        for mean_val, label, color, pos in (
            (np.mean(data_a), "Text A Mean", "blue", "top left"),
            (np.mean(data_b), "Text B Mean", "red", "top right"),
        ):
            fig.add_vline(
                x=mean_val,
                line_dash="dash",
                line_color=color,
                line_width=2,
                annotation_text=f"{label}: {mean_val:.3f}",
                annotation_position=pos,
            )

        fig.update_layout(
            title=f"{measure} Distribution Comparison",
            xaxis_title="Score",
            # Histograms use histnorm='probability density', so the y axis
            # is a density, not a raw frequency.
            yaxis_title="Density",
            barmode='overlay',
            height=400,
            showlegend=True,
        )
        st.plotly_chart(fig, use_container_width=True)
def display_statistical_comparison(results_a, results_b):
    """Run per-measure significance tests and display the results table.

    For every measure present in both texts' summaries (with more than one
    raw score on each side), performs an independent-samples t-test
    (scipy's default equal-variance Student's t), computes Cohen's d with a
    pooled standard deviation, and renders one table row per measure.

    Bug fix: the original computed the full table but never displayed it,
    printing only a placeholder message — the table is now rendered.

    Args:
        results_a: Analysis results dict for Text A ('summary', 'raw_scores').
        results_b: Analysis results dict for Text B (same shape).
    """
    st.subheader("📊 Statistical Analysis")
    if not results_a.get('summary') or not results_b.get('summary'):
        st.warning("No sophistication scores available for statistical analysis.")
        return

    stat_data = []
    for measure in results_a['summary']:
        if measure not in results_b['summary']:
            continue
        data_a = results_a['raw_scores'].get(measure, [])
        data_b = results_b['raw_scores'].get(measure, [])
        # Need at least two observations per group for a t-test / variance.
        if len(data_a) <= 1 or len(data_b) <= 1:
            continue

        t_stat, p_value = stats.ttest_ind(data_a, data_b)

        # Cohen's d with pooled (ddof=1) standard deviation; guard against a
        # zero pooled SD (all values identical in both groups).
        pooled_std = np.sqrt(((len(data_a) - 1) * np.var(data_a, ddof=1) +
                              (len(data_b) - 1) * np.var(data_b, ddof=1)) /
                             (len(data_a) + len(data_b) - 2))
        cohens_d = (np.mean(data_a) - np.mean(data_b)) / pooled_std if pooled_std > 0 else 0

        # Conventional effect-size thresholds (Cohen, 1988).
        if abs(cohens_d) < 0.2:
            effect_size = "Negligible"
        elif abs(cohens_d) < 0.5:
            effect_size = "Small"
        elif abs(cohens_d) < 0.8:
            effect_size = "Medium"
        else:
            effect_size = "Large"

        # Star notation for significance levels.
        if p_value < 0.001:
            significance = "***"
        elif p_value < 0.01:
            significance = "**"
        elif p_value < 0.05:
            significance = "*"
        else:
            significance = "ns"

        stat_data.append({
            'Measure': measure,
            't-statistic': round(t_stat, 3),
            'p-value': f"{p_value:.6f}",
            'Significance': significance,
            "Cohen's d": round(cohens_d, 3),
            'Effect Size': effect_size
        })

    if stat_data:
        st.dataframe(pd.DataFrame(stat_data), use_container_width=True)
        st.caption("Significance: *** p<0.001, ** p<0.01, * p<0.05, ns = not significant")
def display_token_comparison(results_a, results_b):
    """Display token-level details for both texts side by side, with CSV downloads.

    Builds one DataFrame per text from its 'token_details' list (metadata
    columns plus every per-measure score column), shows them in two columns
    (capped at the first 100 rows for display), and offers full CSV exports.

    Args:
        results_a: Analysis results dict for Text A (needs 'token_details').
        results_b: Analysis results dict for Text B (needs 'token_details').
    """
    st.subheader("🔍 Token-Level Comparison")
    if not results_a.get('token_details') or not results_b.get('token_details'):
        st.warning("No token-level data available for comparison.")
        return

    # Token dict keys that are metadata, not per-measure scores.
    meta_keys = ('id', 'token', 'lemma', 'pos', 'tag', 'word_type')

    def build_token_df(tokens):
        """Flatten per-token dicts into a DataFrame: metadata + measure scores."""
        rows = []
        for token in tokens:
            row = {
                'Token': token.get('token', ''),
                'Lemma': token.get('lemma', ''),
                'POS': token.get('pos', ''),
                "TAG": token.get('tag', ''),
                'Type': token.get('word_type', '')
            }
            for key, value in token.items():
                if key not in meta_keys:
                    # Normalize the analyzer's 'NA' placeholder for display.
                    row[key] = 'N/A' if value == 'NA' else value
            rows.append(row)
        return pd.DataFrame(rows)

    df_a = build_token_df(results_a['token_details'])
    df_b = build_token_df(results_b['token_details'])

    # Side-by-side tables; large texts are truncated to 100 rows for display.
    col_a, col_b = st.columns(2)
    for col, df, name in ((col_a, df_a, "Text A"), (col_b, df_b, "Text B")):
        with col:
            st.write(f"**{name} Token Details**")
            if len(df) > 100:
                st.write(f"(showing first 100 of {len(df)} tokens)")
                st.dataframe(df.head(100), use_container_width=True)
            else:
                st.write(f"({len(df)} tokens)")
                st.dataframe(df, use_container_width=True)

    # Full (untruncated) CSV exports for both texts.
    st.write("**Download Options**")
    dl_col_a, dl_col_b = st.columns(2)
    for col, df, name, fname in (
        (dl_col_a, df_a, "Text A", "text_a_tokens.csv"),
        (dl_col_b, df_b, "Text B", "text_b_tokens.csv"),
    ):
        with col:
            st.download_button(
                label=f"Download {name} Tokens (CSV)",
                data=df.to_csv(index=False),
                file_name=fname,
                mime="text/csv"
            )