"""RAG Chunk Visualizer — Streamlit app for comparing text-chunking strategies.

Accepts TXT / PDF / CSV / Excel / DOCX input (or pasted text) and applies up to
four chunking methods (fixed size, sentence-based, paragraph-based, recursive),
then reports per-method metrics and Plotly visualizations side by side.
"""

import io
import re
import time
from typing import Any, Dict, List

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st
from plotly.subplots import make_subplots


# Safe model loading without cache permission issues
@st.cache_resource
def load_sentence_transformer():
    """Placeholder loader: semantic chunking is disabled in this environment.

    Returns:
        None — kept so callers can test `self.model is None` uniformly.
    """
    st.info("⚠️ Semantic chunking disabled in this environment")
    return None


@st.cache_resource
def load_nltk():
    """Import NLTK and make sure the 'punkt' tokenizer data is present.

    Returns:
        The ``nltk`` module, or ``None`` when NLTK is not installed.
        Downloading punkt is best-effort; a regex sentence splitter exists
        as a fallback, so download failures are deliberately swallowed.
    """
    try:
        import nltk
        try:
            nltk.data.find('tokenizers/punkt')
        except LookupError:
            try:
                nltk.download('punkt', quiet=True)
            except Exception:
                # Best-effort: offline environments fall back to regex splitting.
                pass
        return nltk
    except ImportError:
        return None


class ChunkVisualizer:
    """Chunking strategies, metrics, and rendering for the Streamlit UI."""

    def __init__(self):
        # Color palette cycled through when rendering chunk cards.
        self.colors = [
            '#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FECA57',
            '#FD79A8', '#A29BFE', '#6C5CE7', '#74B9FF', '#00B894'
        ]
        self.model = None
        self.nltk = None

    def initialize_models(self):
        """Lazy load models only when needed."""
        if self.model is None:
            self.model = load_sentence_transformer()
        if self.nltk is None:
            self.nltk = load_nltk()

    def extract_text_from_pdf(self, pdf_file):
        """Extract text from a PDF upload.

        Returns the concatenated page text (with page markers), a sentinel
        string for image-only PDFs, or '' when the file cannot be read.
        """
        try:
            import PyPDF2
            pdf_file.seek(0)
            pdf_reader = PyPDF2.PdfReader(pdf_file)
            text = ""
            st.write(f"📄 Processing PDF with {len(pdf_reader.pages)} pages...")
            for page_num, page in enumerate(pdf_reader.pages):
                try:
                    page_text = page.extract_text()
                    if page_text.strip():
                        text += f"\n--- Page {page_num + 1} ---\n{page_text}\n"
                except Exception as e:
                    st.warning(f"Could not extract text from page {page_num + 1}: {str(e)}")
            if not text.strip():
                st.warning("PDF appears to be image-based or empty.")
                return "No extractable text found in PDF document."
            return text.strip()
        except Exception as e:
            st.error(f"Error reading PDF: {str(e)}")
            return ""

    def extract_text_from_excel(self, excel_file):
        """Extract text from every sheet of an Excel upload.

        Shows at most 100 rows per sheet. Tries openpyxl (.xlsx), then xlrd
        (legacy .xls), then pandas' default engine. Returns '' on failure.
        """
        try:
            excel_file.seek(0)
            try:
                xl_data = pd.read_excel(excel_file, sheet_name=None, engine='openpyxl')
            except Exception:
                try:
                    # BUG FIX: rewind before retrying — the failed read may
                    # have advanced the file pointer.
                    excel_file.seek(0)
                    xl_data = pd.read_excel(excel_file, sheet_name=None, engine='xlrd')
                except Exception:
                    excel_file.seek(0)
                    xl_data = pd.read_excel(excel_file, sheet_name=None)

            text = ""
            sheet_count = len(xl_data)
            st.write(f"📊 Processing Excel file with {sheet_count} sheet(s)...")
            for sheet_name, df in xl_data.items():
                text += f"\n=== Sheet: {sheet_name} ===\n"
                if not df.empty:
                    headers = " | ".join(str(col) for col in df.columns)
                    text += f"Headers: {headers}\n"
                    text += "-" * 50 + "\n"
                    max_rows = min(100, len(df))
                    for idx, row in df.head(max_rows).iterrows():
                        row_text = " | ".join(str(val) if pd.notna(val) else "" for val in row)
                        text += row_text + "\n"
                    if len(df) > max_rows:
                        text += f"... ({len(df) - max_rows} more rows)\n"
                else:
                    text += "Empty sheet\n"
                text += "\n"
            return text.strip()
        except Exception as e:
            st.error(f"Error reading Excel file: {str(e)}")
            return ""

    def extract_text_from_csv(self, csv_file):
        """Extract text from a CSV upload, trying common encodings.

        Shows at most 100 rows. Returns '' on failure.
        """
        try:
            csv_file.seek(0)
            for encoding in ['utf-8', 'latin-1', 'cp1252']:
                try:
                    csv_file.seek(0)
                    df = pd.read_csv(csv_file, encoding=encoding)
                    break
                except UnicodeDecodeError:
                    continue
            else:
                # BUG FIX: rewind before the no-encoding fallback read.
                csv_file.seek(0)
                df = pd.read_csv(csv_file)

            if df.empty:
                return "Empty CSV file"

            st.write(f"📋 Processing CSV with {len(df)} rows and {len(df.columns)} columns...")
            text = "=== CSV Data ===\n"
            headers = " | ".join(str(col) for col in df.columns)
            text += f"Headers: {headers}\n"
            text += "-" * 50 + "\n"
            max_rows = min(100, len(df))
            for _, row in df.head(max_rows).iterrows():
                row_text = " | ".join(str(val) if pd.notna(val) else "" for val in row)
                text += row_text + "\n"
            if len(df) > max_rows:
                text += f"... ({len(df) - max_rows} more rows)\n"
            return text.strip()
        except Exception as e:
            st.error(f"Error reading CSV file: {str(e)}")
            return ""

    def extract_text_from_docx(self, docx_file):
        """Extract paragraph and table text from a Word document; '' on failure."""
        try:
            from docx import Document
            docx_file.seek(0)
            doc = Document(docx_file)
            text = ""
            for paragraph in doc.paragraphs:
                if paragraph.text.strip():
                    text += paragraph.text + "\n"
            for table in doc.tables:
                text += "\n=== Table ===\n"
                for row in table.rows:
                    row_text = " | ".join(cell.text.strip() for cell in row.cells)
                    text += row_text + "\n"
                text += "\n"
            return text.strip()
        except Exception as e:
            st.error(f"Error reading Word document: {str(e)}")
            return ""

    def simple_sentence_split(self, text: str) -> List[str]:
        """Fallback sentence splitting without NLTK (regex on .!? + capital)."""
        sentences = re.split(r'(?<=[.!?])\s+(?=[A-Z])', text)
        return [s.strip() for s in sentences if s.strip()]

    def robust_sentence_split(self, text: str) -> List[str]:
        """Use NLTK's sent_tokenize if available, fall back to the regex splitter."""
        if self.nltk:
            try:
                return self.nltk.sent_tokenize(text)
            except Exception:
                pass
        return self.simple_sentence_split(text)

    def fixed_size_chunking(self, text: str, chunk_size: int, overlap_size: int = 0) -> List[Dict]:
        """Split text into fixed-size chunks, respecting word boundaries.

        A chunk is cut back to the last space when doing so keeps at least
        70% of the target size. Consecutive chunks overlap by
        ``overlap_size`` characters.
        """
        chunks = []
        start = 0
        while start < len(text):
            end = start + chunk_size
            if end >= len(text):
                chunk = text[start:]
            else:
                chunk = text[start:end]
                if not text[end].isspace():
                    last_space = chunk.rfind(' ')
                    # Only trim when it keeps most of the chunk intact.
                    if last_space > chunk_size * 0.7:
                        chunk = chunk[:last_space]
                        end = start + last_space
            if chunk.strip():
                chunks.append({
                    'text': chunk.strip(),
                    'start': start,
                    'end': end if end < len(text) else len(text),
                    'method': 'Fixed Size',
                    'word_count': len(chunk.split()),
                    'char_count': len(chunk.strip())
                })
            next_start = end - overlap_size
            # BUG FIX: guarantee forward progress. When overlap_size >= the
            # effective stride (possible via the UI sliders, or after the
            # word-boundary trim shrinks the stride) the original looped
            # forever; fall back to a non-overlapping step instead.
            if next_start <= start:
                next_start = end
            start = next_start
            if start >= len(text):
                break
        return chunks

    def sentence_chunking(self, text: str, sentences_per_chunk: int = 3) -> List[Dict]:
        """Split text into chunks of ``sentences_per_chunk`` sentences each."""
        sentences = self.robust_sentence_split(text)
        chunks = []
        current_pos = 0
        for i in range(0, len(sentences), sentences_per_chunk):
            chunk_sentences = sentences[i:i + sentences_per_chunk]
            chunk_text = ' '.join(chunk_sentences)
            # Locate the chunk in the source to record character offsets;
            # fall back to the running cursor if tokenization altered text.
            start_pos = text.find(chunk_sentences[0], current_pos)
            if start_pos == -1:
                start_pos = current_pos
            end_pos = start_pos + len(chunk_text)
            current_pos = end_pos
            chunks.append({
                'text': chunk_text,
                'start': start_pos,
                'end': min(end_pos, len(text)),
                'method': 'Sentence-based',
                'sentence_count': len(chunk_sentences),
                'word_count': len(chunk_text.split()),
                'char_count': len(chunk_text)
            })
        return chunks

    def paragraph_chunking(self, text: str) -> List[Dict]:
        """Split text at blank-line paragraph boundaries."""
        paragraphs = re.split(r'\n\s*\n', text)
        chunks = []
        current_pos = 0
        for para in paragraphs:
            para = para.strip()
            if para:
                start_pos = text.find(para, current_pos)
                if start_pos == -1:
                    start_pos = current_pos
                end_pos = start_pos + len(para)
                chunks.append({
                    'text': para,
                    'start': start_pos,
                    'end': end_pos,
                    'method': 'Paragraph-based',
                    'paragraph_length': len(para),
                    'word_count': len(para.split()),
                    'char_count': len(para)
                })
                current_pos = end_pos
        return chunks

    def recursive_chunking(self, text: str, max_chunk_size: int = 1000) -> List[Dict]:
        """Hierarchical splitting: try coarse separators first, then finer ones.

        Separator priority: paragraph break, newline, sentence punctuation,
        clause punctuation, single space. Oversized leftovers are hard-split
        by character count as a last resort.
        """
        separators = ["\n\n", "\n", ". ", "! ", "? ", "; ", ", ", " "]

        def _recursive_split(text: str, separators: List[str], max_size: int, depth: int = 0) -> List[str]:
            if len(text) <= max_size or depth > len(separators):
                return [text]
            separator = separators[0] if separators else " "
            if separator not in text:
                if len(separators) > 1:
                    return _recursive_split(text, separators[1:], max_size, depth + 1)
                else:
                    # No separator left: hard split by size.
                    return [text[i:i + max_size] for i in range(0, len(text), max_size)]
            parts = text.split(separator)
            result = []
            current_chunk = ""
            for part in parts:
                potential_chunk = current_chunk + part + separator
                if len(potential_chunk) <= max_size:
                    current_chunk = potential_chunk
                else:
                    if current_chunk:
                        # BUG FIX: removesuffix, not rstrip — rstrip treats the
                        # separator as a character set and ate legitimate
                        # trailing punctuation (e.g. "etc.. " -> "etc").
                        result.append(current_chunk.removesuffix(separator))
                    if len(part) > max_size:
                        result.extend(_recursive_split(part, separators[1:], max_size, depth + 1))
                        current_chunk = ""
                    else:
                        current_chunk = part + separator
            if current_chunk:
                result.append(current_chunk.removesuffix(separator))
            return result

        split_texts = _recursive_split(text, separators, max_chunk_size)
        chunks = []
        current_pos = 0
        for chunk_text in split_texts:
            if chunk_text.strip():
                start_pos = text.find(chunk_text, current_pos)
                if start_pos == -1:
                    start_pos = current_pos
                end_pos = start_pos + len(chunk_text)
                chunks.append({
                    'text': chunk_text,
                    'start': start_pos,
                    'end': end_pos,
                    'method': 'Recursive',
                    'max_size': max_chunk_size,
                    'word_count': len(chunk_text.split()),
                    'char_count': len(chunk_text)
                })
                current_pos = end_pos
        return chunks

    def calculate_metrics(self, chunks: List[Dict]) -> Dict[str, Any]:
        """Calculate comprehensive chunk metrics.

        Returns size statistics, an overlap ratio estimated from total chunk
        characters vs. covered text length, and a size-consistency score
        (1 - coefficient of variation). Empty input yields an empty dict.
        """
        if not chunks:
            return {}
        char_counts = [chunk['char_count'] for chunk in chunks]
        word_counts = [chunk['word_count'] for chunk in chunks]

        overlap_ratio = 0
        if len(chunks) > 1:
            total_chars = sum(char_counts)
            text_length = max(chunk['end'] for chunk in chunks)
            if text_length > 0:
                overlap_ratio = max(0, (total_chars - text_length) / text_length)

        char_cv = np.std(char_counts) / np.mean(char_counts) if np.mean(char_counts) > 0 else 0

        return {
            'total_chunks': len(chunks),
            'avg_chars': np.mean(char_counts),
            'std_chars': np.std(char_counts),
            'min_chars': min(char_counts),
            'max_chars': max(char_counts),
            'avg_words': np.mean(word_counts),
            'std_words': np.std(word_counts),
            'char_cv': char_cv,
            'overlap_ratio': overlap_ratio,
            'size_consistency': 1 - char_cv,
            'total_coverage': sum(chunk['end'] - chunk['start'] for chunk in chunks)
        }

    def visualize_chunks(self, chunks: List[Dict]):
        """Render each chunk as a color-coded card (text truncated to 400 chars).

        NOTE(review): the original HTML template was corrupted in transit;
        this markup is a faithful reconstruction of the visible content
        (header, position, counts, preview) using the existing palette.
        """
        if not chunks:
            st.write("No chunks to display")
            return
        st.markdown("### 🎨 Chunk Visualization")
        for i, chunk in enumerate(chunks):
            color = self.colors[i % len(self.colors)]
            st.markdown(f"""
<div style="border-left: 5px solid {color}; background-color: {color}22;
            padding: 10px; margin: 8px 0; border-radius: 5px;">
    <strong>CHUNK {i + 1}</strong> • Position {chunk['start']}-{chunk['end']}<br>
    <em>{chunk['char_count']} chars • {chunk['word_count']} words</em><br>
    {chunk['text'][:400]}{'...' if len(chunk['text']) > 400 else ''}
</div>
""", unsafe_allow_html=True)

    def create_comparison_charts(self, all_results: Dict[str, List[Dict]]):
        """Create a 2x2 dashboard comparing methods: counts, consistency,
        size distributions (box plots), and words-vs-characters scatter."""
        if not all_results:
            return
        metrics_data = []
        size_data = []
        for method, chunks in all_results.items():
            metrics = self.calculate_metrics(chunks)
            metrics_data.append({
                'Method': method,
                'Chunks': metrics.get('total_chunks', 0),
                'Avg Size': metrics.get('avg_chars', 0),
                'Consistency': metrics.get('size_consistency', 0),
                'Overlap': metrics.get('overlap_ratio', 0)
            })
            for chunk in chunks:
                size_data.append({
                    'Method': method,
                    'Size': chunk['char_count'],
                    'Words': chunk['word_count']
                })

        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=(
                'Chunk Count Comparison', 'Size Consistency',
                'Size Distribution by Method', 'Words vs Characters'
            ),
            specs=[
                [{"type": "bar"}, {"type": "bar"}],
                [{"type": "box"}, {"type": "scatter"}]
            ]
        )

        df_metrics = pd.DataFrame(metrics_data)
        df_sizes = pd.DataFrame(size_data)

        # Chart 1: Chunk counts
        fig.add_trace(
            go.Bar(x=df_metrics['Method'], y=df_metrics['Chunks'],
                   name='Chunk Count', marker_color='lightblue'),
            row=1, col=1
        )
        # Chart 2: Consistency scores
        fig.add_trace(
            go.Bar(x=df_metrics['Method'], y=df_metrics['Consistency'],
                   name='Consistency', marker_color='lightgreen'),
            row=1, col=2
        )
        # Chart 3: Size distribution box plots
        for method in df_sizes['Method'].unique():
            method_data = df_sizes[df_sizes['Method'] == method]
            fig.add_trace(
                go.Box(y=method_data['Size'], name=method, boxpoints='outliers'),
                row=2, col=1
            )
        # Chart 4: Words vs Characters scatter
        for method in df_sizes['Method'].unique():
            method_data = df_sizes[df_sizes['Method'] == method]
            fig.add_trace(
                go.Scatter(x=method_data['Words'], y=method_data['Size'],
                           mode='markers', name=method, opacity=0.7),
                row=2, col=2
            )

        fig.update_layout(height=800, showlegend=True)
        fig.update_xaxes(tickangle=45)
        st.plotly_chart(fig, width='stretch')


def main():
    """Streamlit entry point: sidebar configuration, chunking, result tabs."""
    st.set_page_config(
        page_title="RAG Chunk Visualizer",
        page_icon="🔍",
        layout="wide",
        initial_sidebar_state="expanded"
    )

    # Header
    col1, col2 = st.columns([3, 1])
    with col1:
        st.title("🔍 RAG Chunk Visualizer")
        st.markdown("**Professional chunking analysis for RAG systems**")
    with col2:
        if st.button("ℹ️ About", help="Learn about chunking strategies"):
            with st.expander("Chunking Methods Explained", expanded=True):
                st.markdown("""
                **Fixed Size**: Splits text at character boundaries with word respect
                **Sentence-based**: Groups sentences together for semantic coherence
                **Paragraph-based**: Respects document structure and topic boundaries
                **Recursive**: Hierarchical splitting using multiple separators
                """)

    visualizer = ChunkVisualizer()

    # Sidebar for configuration
    with st.sidebar:
        st.header("⚙️ Configuration")

        # Input method selection
        input_method = st.radio(
            "Choose input method:",
            ["📁 Upload File", "✍️ Custom Input"],
            help="Select how you want to provide text for analysis"
        )

        # File upload or text input
        text = ""
        if input_method == "📁 Upload File":
            st.markdown("**File Upload**")
            uploaded_file = st.file_uploader(
                "Choose a file",
                type=['txt', 'pdf', 'csv', 'xlsx', 'xls', 'docx'],
                help="Supports: TXT, PDF, CSV, Excel (XLSX/XLS), Word (DOCX)"
            )
            if uploaded_file is not None:
                st.success(f"📁 File loaded: **{uploaded_file.name}**")

                # Show file info
                with st.expander("File Details", expanded=False):
                    st.write(f"**Name:** {uploaded_file.name}")
                    st.write(f"**Size:** {len(uploaded_file.getvalue()):,} bytes")
                    st.write(f"**Type:** {uploaded_file.type}")

                # Process the file; dispatch on extension.
                file_name = uploaded_file.name.lower()
                with st.spinner(f"Processing {uploaded_file.name}..."):
                    try:
                        if file_name.endswith('.txt'):
                            uploaded_file.seek(0)
                            # BUG FIX: errors="replace" so a non-UTF-8 text
                            # file degrades gracefully instead of raising.
                            text = uploaded_file.read().decode("utf-8", errors="replace")
                        elif file_name.endswith('.pdf'):
                            text = visualizer.extract_text_from_pdf(uploaded_file)
                        elif file_name.endswith('.csv'):
                            text = visualizer.extract_text_from_csv(uploaded_file)
                        elif file_name.endswith(('.xlsx', '.xls')):
                            text = visualizer.extract_text_from_excel(uploaded_file)
                        elif file_name.endswith('.docx'):
                            text = visualizer.extract_text_from_docx(uploaded_file)
                        else:
                            st.warning("Unsupported file type - trying as text...")
                            uploaded_file.seek(0)
                            text = uploaded_file.read().decode("utf-8", errors="replace")
                    except Exception as e:
                        st.error(f"Error processing file: {str(e)}")
                        text = ""

                # Show processing results
                if text and len(text.strip()) > 0:
                    st.success(f"✅ Extracted {len(text):,} characters")
                    preview_text = text[:300] + "..." if len(text) > 300 else text
                    st.text_area(
                        "Content Preview:",
                        value=preview_text,
                        height=100,
                        disabled=True,
                        help="First 300 characters of extracted text"
                    )
                else:
                    st.error("❌ No text could be extracted from the file")
            else:
                st.info("👆 Choose a file to upload")
        else:  # Custom Input
            text = st.text_area(
                "Enter your text:",
                height=200,
                placeholder="Paste or type your text here to analyze different chunking strategies...",
                help="Paste or type the text you want to analyze"
            )

        # Only show chunking options if we have text
        if text and len(text.strip()) > 0:
            st.divider()

            # Method selection
            st.subheader("🔧 Chunking Methods")
            method_options = {
                'Fixed Size': 'Character-based splitting with word boundaries',
                'Sentence-based': 'Group by sentences for readability',
                'Paragraph-based': 'Respect document structure',
                'Recursive': 'Hierarchical splitting with multiple separators'
            }
            selected_methods = []
            for method, description in method_options.items():
                if st.checkbox(method, value=method in ['Fixed Size', 'Sentence-based'], help=description):
                    selected_methods.append(method)
            if not selected_methods:
                st.warning("⚠️ Select at least one chunking method")

            st.divider()

            # Parameters
            st.subheader("⚙️ Parameters")
            params = {}
            if 'Fixed Size' in selected_methods:
                st.markdown("**Fixed Size Settings**")
                params['chunk_size'] = st.slider("Chunk size (characters)", 200, 2000, 800, step=50)
                params['overlap'] = st.slider("Overlap (characters)", 0, 300, 100, step=25)
            if 'Sentence-based' in selected_methods:
                st.markdown("**Sentence-based Settings**")
                params['sentences_per_chunk'] = st.slider("Sentences per chunk", 1, 10, 4)
            if 'Recursive' in selected_methods:
                st.markdown("**Recursive Settings**")
                params['max_recursive_size'] = st.slider("Max chunk size", 500, 2000, 1200, step=100)
        else:
            selected_methods = []
            params = {}

    # Main content area
    if text and len(text.strip()) > 0 and selected_methods:
        # Process text with selected methods
        with st.spinner("Processing chunks..."):
            all_results = {}
            for method in selected_methods:
                if method == 'Fixed Size':
                    chunks = visualizer.fixed_size_chunking(
                        text, params.get('chunk_size', 800), params.get('overlap', 100)
                    )
                elif method == 'Sentence-based':
                    chunks = visualizer.sentence_chunking(
                        text, params.get('sentences_per_chunk', 4)
                    )
                elif method == 'Paragraph-based':
                    chunks = visualizer.paragraph_chunking(text)
                elif method == 'Recursive':
                    chunks = visualizer.recursive_chunking(
                        text, params.get('max_recursive_size', 1200)
                    )
                all_results[method] = chunks

        st.success(f"✅ Processed {len(text):,} characters with {len(selected_methods)} methods")

        # Display results in tabs
        tabs = st.tabs([f"📊 {method}" for method in selected_methods] + ["📈 Comparison"])

        # Individual method tabs
        for i, (method, chunks) in enumerate(all_results.items()):
            with tabs[i]:
                metrics = visualizer.calculate_metrics(chunks)

                # Metrics display
                col1, col2, col3, col4, col5 = st.columns(5)
                with col1:
                    st.metric("Total Chunks", metrics.get('total_chunks', 0))
                with col2:
                    st.metric("Avg Characters", f"{metrics.get('avg_chars', 0):.0f}")
                with col3:
                    st.metric("Avg Words", f"{metrics.get('avg_words', 0):.0f}")
                with col4:
                    st.metric("Consistency", f"{metrics.get('size_consistency', 0):.2f}")
                with col5:
                    overlap_pct = metrics.get('overlap_ratio', 0) * 100
                    st.metric("Overlap", f"{overlap_pct:.1f}%")

                # Visualize chunks
                visualizer.visualize_chunks(chunks)

                # Size distribution chart
                if len(chunks) > 1:
                    sizes = [chunk['char_count'] for chunk in chunks]
                    fig = px.histogram(
                        x=sizes,
                        nbins=min(20, len(chunks)),
                        title=f"{method} - Chunk Size Distribution",
                        labels={'x': 'Characters', 'y': 'Count'}
                    )
                    fig.update_layout(height=300)
                    st.plotly_chart(fig, width='stretch')

        # Comparison tab
        with tabs[-1]:
            st.header("📈 Comprehensive Analysis")

            # Comparison charts
            visualizer.create_comparison_charts(all_results)

            # Metrics table
            st.subheader("📊 Detailed Metrics Comparison")
            comparison_data = []
            for method, chunks in all_results.items():
                metrics = visualizer.calculate_metrics(chunks)
                comparison_data.append({
                    'Method': method,
                    'Chunks': metrics.get('total_chunks', 0),
                    'Avg Size': f"{metrics.get('avg_chars', 0):.0f}",
                    'Size StdDev': f"{metrics.get('std_chars', 0):.0f}",
                    'Consistency': f"{metrics.get('size_consistency', 0):.3f}",
                    'Overlap %': f"{metrics.get('overlap_ratio', 0)*100:.1f}%"
                })
            df_comparison = pd.DataFrame(comparison_data)
            st.dataframe(df_comparison, width='stretch')

            # Recommendations
            st.subheader("💡 Recommendations")
            best_consistency = max(
                all_results.keys(),
                key=lambda m: visualizer.calculate_metrics(all_results[m]).get('size_consistency', 0)
            )
            # Closest average size to a 600-char sweet spot.
            optimal_size_method = min(
                all_results.keys(),
                key=lambda m: abs(visualizer.calculate_metrics(all_results[m]).get('avg_chars', 1000) - 600)
            )
            col1, col2 = st.columns(2)
            with col1:
                st.success(f"🎯 **Most Consistent**: {best_consistency}")
                consistency_score = visualizer.calculate_metrics(all_results[best_consistency]).get('size_consistency', 0)
                st.write(f"Consistency score: {consistency_score:.3f}")
            with col2:
                st.info(f"⚖️ **Optimal Size**: {optimal_size_method}")
                avg_size = visualizer.calculate_metrics(all_results[optimal_size_method]).get('avg_chars', 0)
                st.write(f"Average size: {avg_size:.0f} characters")

            # Use case recommendations
            st.markdown("### 💡 Use Case Recommendations")
            recommendations = {
                "🔍 **Search & Retrieval**": "Use Fixed Size (600-800 chars) for consistent embedding",
                "📚 **Document Processing**": "Use Paragraph-based to preserve structure",
                "🤖 **LLM Input**": "Use Fixed Size (800-1200 chars) for token management",
                "📖 **Reading Comprehension**": "Use Sentence-based for natural flow",
                "🔄 **Data Pipeline**": "Use Recursive for robust processing"
            }
            for use_case, recommendation in recommendations.items():
                st.markdown(f"- {use_case}: {recommendation}")
    else:
        # Welcome screen when no text is provided
        st.markdown("""
        ## 👋 Welcome to the RAG Chunk Visualizer

        This tool analyzes how different chunking strategies split your documents for RAG systems.

        ### 🚀 Getting Started

        **Step 1:** Choose your input method in the sidebar:
        - **📁 Upload File**: Support for PDF, Excel, CSV, Word, and text files
        - **✍️ Custom Input**: Paste or type your own text

        **Step 2:** Select chunking methods to compare (2-3 recommended)

        **Step 3:** Adjust parameters for each method

        **Step 4:** Analyze results with comprehensive metrics and visualizations

        ### 🔧 Available Chunking Methods

        - **Fixed Size**: Consistent character-based chunks with word boundaries
        - **Sentence-based**: Natural language flow with sentence grouping
        - **Paragraph-based**: Document structure preservation
        - **Recursive**: Hierarchical splitting with multiple separators

        ### 🎯 Key Features

        - **Real-time comparison** of different chunking strategies
        - **Advanced metrics** including consistency scores and overlap analysis
        - **Interactive visualizations** with detailed chunk inspection
        - **Professional recommendations** for different use cases
        - **Multi-format support** for various document types

        ### 📁 Supported File Formats

        - **📄 PDF**: Research papers, reports, documentation
        - **📊 Excel (XLSX/XLS)**: Spreadsheets, data tables, financial reports
        - **📋 CSV**: Data exports, logs, structured datasets
        - **📝 Word (DOCX)**: Business documents, proposals, manuscripts
        - **📜 Text (TXT)**: Plain text files, code, notes

        ---

        **Ready to begin?** Select your input method in the sidebar! 👈
        """)

        # Show example use cases
        st.subheader("💡 Example Use Cases")
        col1, col2, col3 = st.columns(3)
        with col1:
            st.markdown("""
            **🔍 RAG Optimization**
            - Find optimal chunk sizes
            - Minimize overlap issues
            - Improve retrieval accuracy
            - Balance context vs precision
            """)
        with col2:
            st.markdown("""
            **📚 Document Processing**
            - Preserve document structure
            - Handle different file formats
            - Maintain readability
            - Process large documents
            """)
        with col3:
            st.markdown("""
            **🤖 LLM Integration**
            - Manage token limits
            - Optimize context windows
            - Improve response quality
            - Reduce processing costs
            """)


if __name__ == "__main__":
    main()