import streamlit as st
import re
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
import io
import time
from typing import List, Dict, Any
# Safe model loading without cache permission issues
@st.cache_resource
def load_sentence_transformer():
    """Placeholder model loader: semantic chunking is disabled here, so no model is returned.

    Kept as a cached resource so callers can treat it like a real loader;
    always yields None in this environment.
    """
    st.info("â ī¸ Semantic chunking disabled in this environment")
    return None
@st.cache_resource
def load_nltk():
    """Return the nltk module with the 'punkt' tokenizer available, or None if nltk is absent.

    Cached per Streamlit session. The punkt download is best-effort: if it
    fails, callers fall back to regex-based sentence splitting.
    """
    try:
        import nltk
    except ImportError:
        # nltk is an optional dependency; signal its absence with None.
        return None
    try:
        nltk.data.find('tokenizers/punkt')
    except LookupError:
        # Tokenizer data not present locally; try to fetch it quietly.
        # Narrowed from a bare `except:` so Ctrl-C / SystemExit still propagate.
        try:
            nltk.download('punkt', quiet=True)
        except Exception:
            pass
    return nltk
class ChunkVisualizer:
    """Applies several text-chunking strategies and renders their results in Streamlit."""

    def __init__(self):
        # Palette cycled through when rendering chunks side by side.
        self.colors = [
            '#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FECA57',
            '#FD79A8', '#A29BFE', '#6C5CE7', '#74B9FF', '#00B894'
        ]
        # Optional NLP resources, filled in lazily by initialize_models().
        self.model = None
        self.nltk = None
    def initialize_models(self):
        """Lazy load models only when needed.

        Populates self.model and self.nltk on first call; subsequent calls
        are no-ops unless the corresponding loader returned None.
        """
        if self.model is None:
            self.model = load_sentence_transformer()
        if self.nltk is None:
            self.nltk = load_nltk()
def extract_text_from_pdf(self, pdf_file):
"""Extract text from PDF file"""
try:
import PyPDF2
pdf_file.seek(0)
pdf_reader = PyPDF2.PdfReader(pdf_file)
text = ""
st.write(f"đ Processing PDF with {len(pdf_reader.pages)} pages...")
for page_num, page in enumerate(pdf_reader.pages):
try:
page_text = page.extract_text()
if page_text.strip():
text += f"\n--- Page {page_num + 1} ---\n{page_text}\n"
except Exception as e:
st.warning(f"Could not extract text from page {page_num + 1}: {str(e)}")
if not text.strip():
st.warning("PDF appears to be image-based or empty.")
return "No extractable text found in PDF document."
return text.strip()
except Exception as e:
st.error(f"Error reading PDF: {str(e)}")
return ""
def extract_text_from_excel(self, excel_file):
"""Extract text from Excel file"""
try:
excel_file.seek(0)
try:
xl_data = pd.read_excel(excel_file, sheet_name=None, engine='openpyxl')
except:
try:
xl_data = pd.read_excel(excel_file, sheet_name=None, engine='xlrd')
except:
xl_data = pd.read_excel(excel_file, sheet_name=None)
text = ""
sheet_count = len(xl_data)
st.write(f"đ Processing Excel file with {sheet_count} sheet(s)...")
for sheet_name, df in xl_data.items():
text += f"\n=== Sheet: {sheet_name} ===\n"
if not df.empty:
headers = " | ".join(str(col) for col in df.columns)
text += f"Headers: {headers}\n"
text += "-" * 50 + "\n"
max_rows = min(100, len(df))
for idx, row in df.head(max_rows).iterrows():
row_text = " | ".join(str(val) if pd.notna(val) else "" for val in row)
text += row_text + "\n"
if len(df) > max_rows:
text += f"... ({len(df) - max_rows} more rows)\n"
else:
text += "Empty sheet\n"
text += "\n"
return text.strip()
except Exception as e:
st.error(f"Error reading Excel file: {str(e)}")
return ""
def extract_text_from_csv(self, csv_file):
"""Extract text from CSV file"""
try:
csv_file.seek(0)
for encoding in ['utf-8', 'latin-1', 'cp1252']:
try:
csv_file.seek(0)
df = pd.read_csv(csv_file, encoding=encoding)
break
except UnicodeDecodeError:
continue
else:
df = pd.read_csv(csv_file)
if df.empty:
return "Empty CSV file"
st.write(f"đ Processing CSV with {len(df)} rows and {len(df.columns)} columns...")
text = "=== CSV Data ===\n"
headers = " | ".join(str(col) for col in df.columns)
text += f"Headers: {headers}\n"
text += "-" * 50 + "\n"
max_rows = min(100, len(df))
for _, row in df.head(max_rows).iterrows():
row_text = " | ".join(str(val) if pd.notna(val) else "" for val in row)
text += row_text + "\n"
if len(df) > max_rows:
text += f"... ({len(df) - max_rows} more rows)\n"
return text.strip()
except Exception as e:
st.error(f"Error reading CSV file: {str(e)}")
return ""
def extract_text_from_docx(self, docx_file):
"""Extract text from Word document"""
try:
from docx import Document
docx_file.seek(0)
doc = Document(docx_file)
text = ""
for paragraph in doc.paragraphs:
if paragraph.text.strip():
text += paragraph.text + "\n"
for table in doc.tables:
text += "\n=== Table ===\n"
for row in table.rows:
row_text = " | ".join(cell.text.strip() for cell in row.cells)
text += row_text + "\n"
text += "\n"
return text.strip()
except Exception as e:
st.error(f"Error reading Word document: {str(e)}")
return ""
def simple_sentence_split(self, text: str) -> List[str]:
"""Fallback sentence splitting without NLTK"""
sentences = re.split(r'(?<=[.!?])\s+(?=[A-Z])', text)
return [s.strip() for s in sentences if s.strip()]
def robust_sentence_split(self, text: str) -> List[str]:
"""Use NLTK if available, fallback to regex"""
if self.nltk:
try:
return self.nltk.sent_tokenize(text)
except:
pass
return self.simple_sentence_split(text)
def fixed_size_chunking(self, text: str, chunk_size: int, overlap_size: int = 0) -> List[Dict]:
"""Split text into fixed-size chunks with word boundary respect"""
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
if end >= len(text):
chunk = text[start:]
else:
chunk = text[start:end]
if not text[end].isspace():
last_space = chunk.rfind(' ')
if last_space > chunk_size * 0.7:
chunk = chunk[:last_space]
end = start + last_space
if chunk.strip():
chunks.append({
'text': chunk.strip(),
'start': start,
'end': end if end < len(text) else len(text),
'method': 'Fixed Size',
'word_count': len(chunk.split()),
'char_count': len(chunk.strip())
})
start = end - overlap_size
if start >= len(text):
break
return chunks
def sentence_chunking(self, text: str, sentences_per_chunk: int = 3) -> List[Dict]:
"""Split text into sentence-based chunks"""
sentences = self.robust_sentence_split(text)
chunks = []
current_pos = 0
for i in range(0, len(sentences), sentences_per_chunk):
chunk_sentences = sentences[i:i + sentences_per_chunk]
chunk_text = ' '.join(chunk_sentences)
start_pos = text.find(chunk_sentences[0], current_pos)
if start_pos == -1:
start_pos = current_pos
end_pos = start_pos + len(chunk_text)
current_pos = end_pos
chunks.append({
'text': chunk_text,
'start': start_pos,
'end': min(end_pos, len(text)),
'method': 'Sentence-based',
'sentence_count': len(chunk_sentences),
'word_count': len(chunk_text.split()),
'char_count': len(chunk_text)
})
return chunks
def paragraph_chunking(self, text: str) -> List[Dict]:
"""Split text by paragraph boundaries"""
paragraphs = re.split(r'\n\s*\n', text)
chunks = []
current_pos = 0
for para in paragraphs:
para = para.strip()
if para:
start_pos = text.find(para, current_pos)
if start_pos == -1:
start_pos = current_pos
end_pos = start_pos + len(para)
chunks.append({
'text': para,
'start': start_pos,
'end': end_pos,
'method': 'Paragraph-based',
'paragraph_length': len(para),
'word_count': len(para.split()),
'char_count': len(para)
})
current_pos = end_pos
return chunks
    def recursive_chunking(self, text: str, max_chunk_size: int = 1000) -> List[Dict]:
        """Hierarchical text splitting with multiple separators.

        Splits on the coarsest separator first (paragraph breaks), packing
        adjacent parts greedily up to max_chunk_size; oversized parts are
        re-split with progressively finer separators, down to a hard
        character split when no separator remains.
        """
        # Separators in decreasing structural significance: paragraphs,
        # lines, sentence ends, clause punctuation, then single spaces.
        separators = ["\n\n", "\n", ". ", "! ", "? ", "; ", ", ", " "]
        def _recursive_split(text: str, separators: List[str], max_size: int, depth: int = 0) -> List[str]:
            # Base case: small enough, or recursion has descended past every
            # separator level (depth guard prevents runaway recursion).
            if len(text) <= max_size or depth > len(separators):
                return [text]
            separator = separators[0] if separators else " "
            if separator not in text:
                if len(separators) > 1:
                    # Current separator absent: retry with the next finer one.
                    return _recursive_split(text, separators[1:], max_size, depth + 1)
                else:
                    # No separators left: hard-split by character count.
                    return [text[i:i+max_size] for i in range(0, len(text), max_size)]
            parts = text.split(separator)
            result = []
            current_chunk = ""
            # Greedily pack parts (separator re-attached) until adding one
            # more would exceed the budget.
            for part in parts:
                potential_chunk = current_chunk + part + separator
                if len(potential_chunk) <= max_size:
                    current_chunk = potential_chunk
                else:
                    if current_chunk:
                        result.append(current_chunk.rstrip(separator))
                    if len(part) > max_size:
                        # A single part can itself exceed the budget; split it
                        # further with the remaining, finer separators.
                        result.extend(_recursive_split(part, separators[1:], max_size, depth + 1))
                        current_chunk = ""
                    else:
                        current_chunk = part + separator
            if current_chunk:
                result.append(current_chunk.rstrip(separator))
            return result
        split_texts = _recursive_split(text, separators, max_chunk_size)
        chunks = []
        current_pos = 0
        for chunk_text in split_texts:
            if chunk_text.strip():
                # Re-locate each piece in the source to record its span;
                # fall back to the running cursor if an exact match fails
                # (possible because separators were stripped during packing).
                start_pos = text.find(chunk_text, current_pos)
                if start_pos == -1:
                    start_pos = current_pos
                end_pos = start_pos + len(chunk_text)
                chunks.append({
                    'text': chunk_text,
                    'start': start_pos,
                    'end': end_pos,
                    'method': 'Recursive',
                    'max_size': max_chunk_size,
                    'word_count': len(chunk_text.split()),
                    'char_count': len(chunk_text)
                })
                current_pos = end_pos
        return chunks
def calculate_metrics(self, chunks: List[Dict]) -> Dict[str, Any]:
"""Calculate comprehensive chunk metrics"""
if not chunks:
return {}
char_counts = [chunk['char_count'] for chunk in chunks]
word_counts = [chunk['word_count'] for chunk in chunks]
overlap_ratio = 0
if len(chunks) > 1:
total_chars = sum(char_counts)
text_length = max(chunk['end'] for chunk in chunks)
if text_length > 0:
overlap_ratio = max(0, (total_chars - text_length) / text_length)
char_cv = np.std(char_counts) / np.mean(char_counts) if np.mean(char_counts) > 0 else 0
return {
'total_chunks': len(chunks),
'avg_chars': np.mean(char_counts),
'std_chars': np.std(char_counts),
'min_chars': min(char_counts),
'max_chars': max(char_counts),
'avg_words': np.mean(word_counts),
'std_words': np.std(word_counts),
'char_cv': char_cv,
'overlap_ratio': overlap_ratio,
'size_consistency': 1 - char_cv,
'total_coverage': sum(chunk['end'] - chunk['start'] for chunk in chunks)
}
    def visualize_chunks(self, chunks: List[Dict]):
        """Display chunks with color coding.

        Renders one markdown block per chunk, showing its position, size
        stats, and a preview capped at 400 characters.
        """
        if not chunks:
            st.write("No chunks to display")
            return
        st.markdown("### đ¨ Chunk Visualization")
        for i, chunk in enumerate(chunks):
            # Cycle through the palette so adjacent chunks alternate colors.
            # NOTE(review): `color` is never used in the markdown below —
            # presumably intended for styled HTML; confirm before removing.
            color = self.colors[i % len(self.colors)]
            st.markdown(f"""
CHUNK {i+1} âĸ Position {chunk['start']}-{chunk['end']}
{chunk['char_count']} chars âĸ {chunk['word_count']} words
{chunk['text'][:400]}{'...' if len(chunk['text']) > 400 else ''}
""", unsafe_allow_html=True)
    def create_comparison_charts(self, all_results: Dict[str, List[Dict]]):
        """Create detailed analysis charts.

        Builds a 2x2 plotly subplot figure comparing all chunking methods:
        chunk counts, consistency scores, per-method size box plots, and a
        words-vs-characters scatter. No-op when all_results is empty.
        """
        if not all_results:
            return
        # Flatten per-method metrics (one row per method) and per-chunk
        # sizes (one row per chunk) into chart-friendly records.
        metrics_data = []
        size_data = []
        for method, chunks in all_results.items():
            metrics = self.calculate_metrics(chunks)
            metrics_data.append({
                'Method': method,
                'Chunks': metrics.get('total_chunks', 0),
                'Avg Size': metrics.get('avg_chars', 0),
                'Consistency': metrics.get('size_consistency', 0),
                'Overlap': metrics.get('overlap_ratio', 0)
            })
            for chunk in chunks:
                size_data.append({
                    'Method': method,
                    'Size': chunk['char_count'],
                    'Words': chunk['word_count']
                })
        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=(
                'Chunk Count Comparison',
                'Size Consistency',
                'Size Distribution by Method',
                'Words vs Characters'
            ),
            specs=[
                [{"type": "bar"}, {"type": "bar"}],
                [{"type": "box"}, {"type": "scatter"}]
            ]
        )
        df_metrics = pd.DataFrame(metrics_data)
        df_sizes = pd.DataFrame(size_data)
        # Chart 1: Chunk counts
        fig.add_trace(
            go.Bar(x=df_metrics['Method'], y=df_metrics['Chunks'],
            name='Chunk Count', marker_color='lightblue'),
            row=1, col=1
        )
        # Chart 2: Consistency scores
        fig.add_trace(
            go.Bar(x=df_metrics['Method'], y=df_metrics['Consistency'],
            name='Consistency', marker_color='lightgreen'),
            row=1, col=2
        )
        # Chart 3: Size distribution box plots (one box per method)
        for method in df_sizes['Method'].unique():
            method_data = df_sizes[df_sizes['Method'] == method]
            fig.add_trace(
                go.Box(y=method_data['Size'], name=method, boxpoints='outliers'),
                row=2, col=1
            )
        # Chart 4: Words vs Characters scatter (one trace per method)
        for method in df_sizes['Method'].unique():
            method_data = df_sizes[df_sizes['Method'] == method]
            fig.add_trace(
                go.Scatter(x=method_data['Words'], y=method_data['Size'],
                mode='markers', name=method, opacity=0.7),
                row=2, col=2
            )
        fig.update_layout(height=800, showlegend=True)
        fig.update_xaxes(tickangle=45)
        st.plotly_chart(fig, width='stretch')
def main():
    """Streamlit entry point: configure the page, collect input, run the
    selected chunking methods, and render per-method and comparison views."""
    st.set_page_config(
        page_title="RAG Chunk Visualizer",
        page_icon="đ",
        layout="wide",
        initial_sidebar_state="expanded"
    )
    # Header
    col1, col2 = st.columns([3, 1])
    with col1:
        st.title("đ RAG Chunk Visualizer")
        st.markdown("**Professional chunking analysis for RAG systems**")
    with col2:
        if st.button("âšī¸ About", help="Learn about chunking strategies"):
            with st.expander("Chunking Methods Explained", expanded=True):
                st.markdown("""
**Fixed Size**: Splits text at character boundaries with word respect
**Sentence-based**: Groups sentences together for semantic coherence
**Paragraph-based**: Respects document structure and topic boundaries
**Recursive**: Hierarchical splitting using multiple separators
""")
    visualizer = ChunkVisualizer()
    # Sidebar for configuration
    with st.sidebar:
        st.header("âī¸ Configuration")
        # Input method selection
        input_method = st.radio(
            "Choose input method:",
            ["đ Upload File", "âī¸ Custom Input"],
            help="Select how you want to provide text for analysis"
        )
        # File upload or text input
        text = ""
        if input_method == "đ Upload File":
            st.markdown("**File Upload**")
            uploaded_file = st.file_uploader(
                "Choose a file",
                type=['txt', 'pdf', 'csv', 'xlsx', 'xls', 'docx'],
                help="Supports: TXT, PDF, CSV, Excel (XLSX/XLS), Word (DOCX)"
            )
            if uploaded_file is not None:
                st.success(f"đ File loaded: **{uploaded_file.name}**")
                # Show file info
                with st.expander("File Details", expanded=False):
                    st.write(f"**Name:** {uploaded_file.name}")
                    st.write(f"**Size:** {len(uploaded_file.getvalue()):,} bytes")
                    st.write(f"**Type:** {uploaded_file.type}")
                # Process the file: dispatch on extension to the matching extractor
                file_name = uploaded_file.name.lower()
                with st.spinner(f"Processing {uploaded_file.name}..."):
                    try:
                        if file_name.endswith('.txt'):
                            uploaded_file.seek(0)
                            text = str(uploaded_file.read(), "utf-8")
                        elif file_name.endswith('.pdf'):
                            text = visualizer.extract_text_from_pdf(uploaded_file)
                        elif file_name.endswith('.csv'):
                            text = visualizer.extract_text_from_csv(uploaded_file)
                        elif file_name.endswith(('.xlsx', '.xls')):
                            text = visualizer.extract_text_from_excel(uploaded_file)
                        elif file_name.endswith('.docx'):
                            text = visualizer.extract_text_from_docx(uploaded_file)
                        else:
                            # Unknown extension: last-resort attempt as UTF-8 text
                            st.warning("Unsupported file type - trying as text...")
                            uploaded_file.seek(0)
                            text = str(uploaded_file.read(), "utf-8")
                    except Exception as e:
                        st.error(f"Error processing file: {str(e)}")
                        text = ""
                # Show processing results
                if text and len(text.strip()) > 0:
                    st.success(f"â Extracted {len(text):,} characters")
                    # Show preview
                    preview_text = text[:300] + "..." if len(text) > 300 else text
                    st.text_area(
                        "Content Preview:",
                        value=preview_text,
                        height=100,
                        disabled=True,
                        help="First 300 characters of extracted text"
                    )
                else:
                    st.error("â No text could be extracted from the file")
            else:
                st.info("đ Choose a file to upload")
        else:  # Custom Input
            text = st.text_area(
                "Enter your text:",
                height=200,
                placeholder="Paste or type your text here to analyze different chunking strategies...",
                help="Paste or type the text you want to analyze"
            )
        # Only show chunking options if we have text
        if text and len(text.strip()) > 0:
            st.divider()
            # Method selection
            st.subheader("đ§ Chunking Methods")
            method_options = {
                'Fixed Size': 'Character-based splitting with word boundaries',
                'Sentence-based': 'Group by sentences for readability',
                'Paragraph-based': 'Respect document structure',
                'Recursive': 'Hierarchical splitting with multiple separators'
            }
            selected_methods = []
            for method, description in method_options.items():
                # Fixed Size and Sentence-based are checked by default
                if st.checkbox(method, value=method in ['Fixed Size', 'Sentence-based'], help=description):
                    selected_methods.append(method)
            if not selected_methods:
                st.warning("â ī¸ Select at least one chunking method")
            st.divider()
            # Parameters — sliders shown only for the methods that take them
            st.subheader("âī¸ Parameters")
            params = {}
            if 'Fixed Size' in selected_methods:
                st.markdown("**Fixed Size Settings**")
                params['chunk_size'] = st.slider("Chunk size (characters)", 200, 2000, 800, step=50)
                params['overlap'] = st.slider("Overlap (characters)", 0, 300, 100, step=25)
            if 'Sentence-based' in selected_methods:
                st.markdown("**Sentence-based Settings**")
                params['sentences_per_chunk'] = st.slider("Sentences per chunk", 1, 10, 4)
            if 'Recursive' in selected_methods:
                st.markdown("**Recursive Settings**")
                params['max_recursive_size'] = st.slider("Max chunk size", 500, 2000, 1200, step=100)
        else:
            # No text yet: keep the variables defined for the main area below
            selected_methods = []
            params = {}
    # Main content area
    if text and len(text.strip()) > 0 and selected_methods:
        # Process text with selected methods
        with st.spinner("Processing chunks..."):
            all_results = {}
            for method in selected_methods:
                if method == 'Fixed Size':
                    chunks = visualizer.fixed_size_chunking(
                        text, params.get('chunk_size', 800), params.get('overlap', 100)
                    )
                elif method == 'Sentence-based':
                    chunks = visualizer.sentence_chunking(
                        text, params.get('sentences_per_chunk', 4)
                    )
                elif method == 'Paragraph-based':
                    chunks = visualizer.paragraph_chunking(text)
                elif method == 'Recursive':
                    chunks = visualizer.recursive_chunking(
                        text, params.get('max_recursive_size', 1200)
                    )
                all_results[method] = chunks
        st.success(f"â Processed {len(text):,} characters with {len(selected_methods)} methods")
        # Display results in tabs: one tab per method plus a comparison tab
        tabs = st.tabs([f"đ {method}" for method in selected_methods] + ["đ Comparison"])
        # Individual method tabs
        for i, (method, chunks) in enumerate(all_results.items()):
            with tabs[i]:
                metrics = visualizer.calculate_metrics(chunks)
                # Metrics display
                col1, col2, col3, col4, col5 = st.columns(5)
                with col1:
                    st.metric("Total Chunks", metrics.get('total_chunks', 0))
                with col2:
                    st.metric("Avg Characters", f"{metrics.get('avg_chars', 0):.0f}")
                with col3:
                    st.metric("Avg Words", f"{metrics.get('avg_words', 0):.0f}")
                with col4:
                    st.metric("Consistency", f"{metrics.get('size_consistency', 0):.2f}")
                with col5:
                    overlap_pct = metrics.get('overlap_ratio', 0) * 100
                    st.metric("Overlap", f"{overlap_pct:.1f}%")
                # Visualize chunks
                visualizer.visualize_chunks(chunks)
                # Size distribution chart (needs at least 2 chunks)
                if len(chunks) > 1:
                    sizes = [chunk['char_count'] for chunk in chunks]
                    fig = px.histogram(
                        x=sizes, nbins=min(20, len(chunks)),
                        title=f"{method} - Chunk Size Distribution",
                        labels={'x': 'Characters', 'y': 'Count'}
                    )
                    fig.update_layout(height=300)
                    st.plotly_chart(fig, width='stretch')
        # Comparison tab
        with tabs[-1]:
            st.header("đ Comprehensive Analysis")
            # Comparison charts
            visualizer.create_comparison_charts(all_results)
            # Metrics table
            st.subheader("đ Detailed Metrics Comparison")
            comparison_data = []
            for method, chunks in all_results.items():
                metrics = visualizer.calculate_metrics(chunks)
                comparison_data.append({
                    'Method': method,
                    'Chunks': metrics.get('total_chunks', 0),
                    'Avg Size': f"{metrics.get('avg_chars', 0):.0f}",
                    'Size StdDev': f"{metrics.get('std_chars', 0):.0f}",
                    'Consistency': f"{metrics.get('size_consistency', 0):.3f}",
                    'Overlap %': f"{metrics.get('overlap_ratio', 0)*100:.1f}%"
                })
            df_comparison = pd.DataFrame(comparison_data)
            st.dataframe(df_comparison, width='stretch')
            # Recommendations
            st.subheader("đĄ Recommendations")
            # Method whose chunk sizes vary the least
            best_consistency = max(all_results.keys(),
            key=lambda m: visualizer.calculate_metrics(all_results[m]).get('size_consistency', 0))
            # Method whose average chunk size is closest to 600 characters
            optimal_size_method = min(all_results.keys(),
            key=lambda m: abs(visualizer.calculate_metrics(all_results[m]).get('avg_chars', 1000) - 600))
            col1, col2 = st.columns(2)
            with col1:
                st.success(f"đ¯ **Most Consistent**: {best_consistency}")
                consistency_score = visualizer.calculate_metrics(all_results[best_consistency]).get('size_consistency', 0)
                st.write(f"Consistency score: {consistency_score:.3f}")
            with col2:
                st.info(f"âī¸ **Optimal Size**: {optimal_size_method}")
                avg_size = visualizer.calculate_metrics(all_results[optimal_size_method]).get('avg_chars', 0)
                st.write(f"Average size: {avg_size:.0f} characters")
            # Use case recommendations
            st.markdown("### đĄ Use Case Recommendations")
            recommendations = {
                "đ **Search & Retrieval**": "Use Fixed Size (600-800 chars) for consistent embedding",
                "đ **Document Processing**": "Use Paragraph-based to preserve structure",
                "đ¤ **LLM Input**": "Use Fixed Size (800-1200 chars) for token management",
                "đ **Reading Comprehension**": "Use Sentence-based for natural flow",
                "đ **Data Pipeline**": "Use Recursive for robust processing"
            }
            for use_case, recommendation in recommendations.items():
                st.markdown(f"- {use_case}: {recommendation}")
    else:
        # Welcome screen when no text is provided
        st.markdown("""
## đ Welcome to the RAG Chunk Visualizer
This tool analyzes how different chunking strategies split your documents for RAG systems.
### đ Getting Started
**Step 1:** Choose your input method in the sidebar:
- **đ Upload File**: Support for PDF, Excel, CSV, Word, and text files
- **âī¸ Custom Input**: Paste or type your own text
**Step 2:** Select chunking methods to compare (2-3 recommended)
**Step 3:** Adjust parameters for each method
**Step 4:** Analyze results with comprehensive metrics and visualizations
### đ§ Available Chunking Methods
- **Fixed Size**: Consistent character-based chunks with word boundaries
- **Sentence-based**: Natural language flow with sentence grouping
- **Paragraph-based**: Document structure preservation
- **Recursive**: Hierarchical splitting with multiple separators
### đ¯ Key Features
- **Real-time comparison** of different chunking strategies
- **Advanced metrics** including consistency scores and overlap analysis
- **Interactive visualizations** with detailed chunk inspection
- **Professional recommendations** for different use cases
- **Multi-format support** for various document types
### đ Supported File Formats
- **đ PDF**: Research papers, reports, documentation
- **đ Excel (XLSX/XLS)**: Spreadsheets, data tables, financial reports
- **đ CSV**: Data exports, logs, structured datasets
- **đ Word (DOCX)**: Business documents, proposals, manuscripts
- **đ Text (TXT)**: Plain text files, code, notes
---
**Ready to begin?** Select your input method in the sidebar! đ
""")
        # Show example use cases
        st.subheader("đĄ Example Use Cases")
        col1, col2, col3 = st.columns(3)
        with col1:
            st.markdown("""
**đ RAG Optimization**
- Find optimal chunk sizes
- Minimize overlap issues
- Improve retrieval accuracy
- Balance context vs precision
""")
        with col2:
            st.markdown("""
**đ Document Processing**
- Preserve document structure
- Handle different file formats
- Maintain readability
- Process large documents
""")
        with col3:
            st.markdown("""
**đ¤ LLM Integration**
- Manage token limits
- Optimize context windows
- Improve response quality
- Reduce processing costs
""")
# Run the Streamlit app when executed as a script (e.g. `streamlit run app.py`).
if __name__ == "__main__":
    main()