Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import re | |
| import numpy as np | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| from plotly.subplots import make_subplots | |
| import pandas as pd | |
| import io | |
| import time | |
| from typing import List, Dict, Any | |
| # Safe model loading without cache permission issues | |
def load_sentence_transformer():
    """Report that semantic chunking is unavailable and return no model.

    Shows a Streamlit info banner and always returns ``None``; no embedding
    model is loaded by this function.
    """
    notice = "β οΈ Semantic chunking disabled in this environment"
    st.info(notice)
    model = None
    return model
def load_nltk():
    """Import NLTK and make a best-effort attempt to have sentence-tokenizer data.

    Returns:
        The ``nltk`` module, or ``None`` when NLTK is not installed.

    Both the legacy ``punkt`` resource and the ``punkt_tab`` resource used by
    newer NLTK releases are looked up; whichever is missing is downloaded
    quietly. A failed download is not fatal: callers are expected to fall back
    to another splitter if tokenization later fails.
    """
    try:
        import nltk
    except ImportError:
        return None

    import logging

    for resource in ("punkt", "punkt_tab"):
        try:
            nltk.data.find(f"tokenizers/{resource}")
        except LookupError:
            try:
                nltk.download(resource, quiet=True)
            except Exception as exc:
                # Best effort only (offline host, read-only cache dir, ...).
                # ``Exception`` rather than a bare except so Ctrl-C and
                # SystemExit still propagate.
                logging.getLogger(__name__).debug(
                    "NLTK resource %s could not be downloaded: %s", resource, exc
                )
    return nltk
class ChunkVisualizer:
    """Extract text from uploads, split it with several strategies, and render the result.

    The chunking and metric methods are pure (they only need ``re`` and
    ``numpy``). The extraction and rendering methods report progress and
    errors through the module-level Streamlit import ``st``.

    Every chunking method returns a list of dicts that carry at least the keys
    ``text``, ``start``, ``end``, ``method``, ``word_count`` and ``char_count``.
    """

    def __init__(self):
        """Set the colour palette used for chunk cards; optional models start unloaded."""
        self.colors = [
            '#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FECA57',
            '#FD79A8', '#A29BFE', '#6C5CE7', '#74B9FF', '#00B894'
        ]
        self.model = None
        self.nltk = None

    def initialize_models(self):
        """Lazy load models only when needed."""
        if self.model is None:
            self.model = load_sentence_transformer()
        if self.nltk is None:
            self.nltk = load_nltk()

    def _table_to_text(self, df, max_rows: int = 100) -> str:
        """Render a DataFrame as pipe-separated text: header line, rule, then up to ``max_rows`` rows."""
        headers = " | ".join(str(col) for col in df.columns)
        lines = [f"Headers: {headers}\n", "-" * 50 + "\n"]
        shown = min(max_rows, len(df))
        for _, row in df.head(shown).iterrows():
            # Missing cells become empty strings instead of the literal "nan".
            lines.append(" | ".join(str(val) if pd.notna(val) else "" for val in row) + "\n")
        if len(df) > shown:
            lines.append(f"... ({len(df) - shown} more rows)\n")
        return "".join(lines)

    def extract_text_from_pdf(self, pdf_file):
        """Extract text from a PDF file object.

        Returns the concatenated page texts (each prefixed by a page marker),
        a fixed notice string when no page yields text, or ``""`` when the
        file cannot be read at all. Problems are reported through Streamlit.
        """
        try:
            import PyPDF2
            pdf_file.seek(0)
            pdf_reader = PyPDF2.PdfReader(pdf_file)
            st.write(f"π Processing PDF with {len(pdf_reader.pages)} pages...")
            page_blocks = []
            for page_num, page in enumerate(pdf_reader.pages):
                try:
                    page_text = page.extract_text()
                    # Guard against a page that yields None instead of a string.
                    if page_text and page_text.strip():
                        page_blocks.append(f"\n--- Page {page_num + 1} ---\n{page_text}\n")
                except Exception as e:
                    st.warning(f"Could not extract text from page {page_num + 1}: {str(e)}")
            text = "".join(page_blocks)
            if not text.strip():
                st.warning("PDF appears to be image-based or empty.")
                return "No extractable text found in PDF document."
            return text.strip()
        except Exception as e:
            st.error(f"Error reading PDF: {str(e)}")
            return ""

    def extract_text_from_excel(self, excel_file):
        """Extract text from an Excel file object, one section per sheet.

        Tries the ``openpyxl`` engine, then ``xlrd``, then pandas' default
        engine selection. Returns ``""`` (after reporting the error) when
        every attempt fails.
        """
        try:
            last_error = None
            for engine in ('openpyxl', 'xlrd', None):
                # A failed attempt may leave the stream mid-file; rewind first.
                excel_file.seek(0)
                try:
                    xl_data = pd.read_excel(excel_file, sheet_name=None, engine=engine)
                    break
                except Exception as err:
                    last_error = err
            else:
                raise last_error
            sheet_count = len(xl_data)
            st.write(f"π Processing Excel file with {sheet_count} sheet(s)...")
            parts = []
            for sheet_name, df in xl_data.items():
                parts.append(f"\n=== Sheet: {sheet_name} ===\n")
                if not df.empty:
                    parts.append(self._table_to_text(df))
                else:
                    parts.append("Empty sheet\n")
                parts.append("\n")
            return "".join(parts).strip()
        except Exception as e:
            st.error(f"Error reading Excel file: {str(e)}")
            return ""

    def extract_text_from_csv(self, csv_file):
        """Extract text from a CSV file object.

        Decoding is attempted as UTF-8, Latin-1 and CP1252 in that order.
        Returns ``"Empty CSV file"`` for a frame without rows and ``""``
        (after reporting the error) when parsing fails.
        """
        try:
            for encoding in ['utf-8', 'latin-1', 'cp1252']:
                try:
                    csv_file.seek(0)
                    df = pd.read_csv(csv_file, encoding=encoding)
                    break
                except UnicodeDecodeError:
                    continue
            else:
                csv_file.seek(0)
                df = pd.read_csv(csv_file)
            if df.empty:
                return "Empty CSV file"
            st.write(f"π Processing CSV with {len(df)} rows and {len(df.columns)} columns...")
            text = "=== CSV Data ===\n" + self._table_to_text(df)
            return text.strip()
        except Exception as e:
            st.error(f"Error reading CSV file: {str(e)}")
            return ""

    def extract_text_from_docx(self, docx_file):
        """Extract text from a Word document: non-empty paragraphs first, then every table."""
        try:
            from docx import Document
            docx_file.seek(0)
            doc = Document(docx_file)
            parts = [
                paragraph.text + "\n"
                for paragraph in doc.paragraphs
                if paragraph.text.strip()
            ]
            for table in doc.tables:
                parts.append("\n=== Table ===\n")
                for row in table.rows:
                    parts.append(" | ".join(cell.text.strip() for cell in row.cells) + "\n")
                parts.append("\n")
            return "".join(parts).strip()
        except Exception as e:
            st.error(f"Error reading Word document: {str(e)}")
            return ""

    def simple_sentence_split(self, text: str) -> List[str]:
        """Fallback sentence splitting without NLTK.

        Splits on whitespace that follows ``.``, ``!`` or ``?`` and precedes an
        uppercase ASCII letter; empty pieces are dropped.
        """
        sentences = re.split(r'(?<=[.!?])\s+(?=[A-Z])', text)
        return [s.strip() for s in sentences if s.strip()]

    def robust_sentence_split(self, text: str) -> List[str]:
        """Use NLTK if it has been loaded into ``self.nltk``, otherwise the regex splitter."""
        if self.nltk:
            try:
                return self.nltk.sent_tokenize(text)
            except Exception:
                # Missing tokenizer data or any tokenizer failure: fall back
                # to the regex splitter below rather than aborting.
                pass
        return self.simple_sentence_split(text)

    def fixed_size_chunking(self, text: str, chunk_size: int, overlap_size: int = 0) -> List[Dict]:
        """Split text into fixed-size chunks, preferring to cut at a space.

        Args:
            text: Text to split.
            chunk_size: Maximum characters per chunk; must be positive.
            overlap_size: Characters shared between consecutive chunks.

        Returns:
            Chunk dicts whose ``start``/``end`` are offsets into ``text``.

        Raises:
            ValueError: If ``chunk_size`` is not positive.

        A cut that would land inside a word is moved back to the last space,
        but only when that keeps more than 70% of ``chunk_size``.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        chunks = []
        text_len = len(text)
        start = 0
        while start < text_len:
            end = start + chunk_size
            if end >= text_len:
                chunk = text[start:]
                end = text_len
            else:
                chunk = text[start:end]
                if not text[end].isspace():
                    last_space = chunk.rfind(' ')
                    if last_space > chunk_size * 0.7:
                        chunk = chunk[:last_space]
                        end = start + last_space
            if chunk.strip():
                chunks.append({
                    'text': chunk.strip(),
                    'start': start,
                    'end': end,
                    'method': 'Fixed Size',
                    'word_count': len(chunk.split()),
                    'char_count': len(chunk.strip())
                })
            if end >= text_len:
                # The tail has been emitted; stepping back by the overlap
                # would only repeat text that is already inside this chunk.
                break
            next_start = end - overlap_size
            if next_start <= start:
                # Overlap is at least as long as the chunk just produced, so
                # the window would not advance (endless loop). Drop the
                # overlap for this step to guarantee progress.
                next_start = end
            start = next_start
        return chunks

    def sentence_chunking(self, text: str, sentences_per_chunk: int = 3) -> List[Dict]:
        """Split text into chunks of ``sentences_per_chunk`` consecutive sentences.

        The chunk ``text`` is the sentences joined by single spaces, while
        ``start``/``end`` are located in the original ``text`` sentence by
        sentence, so they stay correct when sentences are separated by
        newlines or several spaces. Values below 1 are treated as 1.
        """
        sentences = self.robust_sentence_split(text)
        step = max(1, sentences_per_chunk)
        chunks = []
        cursor = 0
        for i in range(0, len(sentences), step):
            chunk_sentences = sentences[i:i + step]
            chunk_text = ' '.join(chunk_sentences)
            start_pos = None
            for sentence in chunk_sentences:
                found = text.find(sentence, cursor)
                if found == -1:
                    # The tokenizer altered this sentence; leave the cursor alone.
                    continue
                if start_pos is None:
                    start_pos = found
                cursor = found + len(sentence)
            if start_pos is None:
                # Nothing could be located: approximate from the running cursor.
                start_pos = cursor
                cursor = start_pos + len(chunk_text)
            end_pos = cursor
            chunks.append({
                'text': chunk_text,
                'start': start_pos,
                'end': min(end_pos, len(text)),
                'method': 'Sentence-based',
                'sentence_count': len(chunk_sentences),
                'word_count': len(chunk_text.split()),
                'char_count': len(chunk_text)
            })
        return chunks

    def paragraph_chunking(self, text: str) -> List[Dict]:
        """Split text at blank lines; each non-empty, stripped paragraph becomes one chunk."""
        chunks = []
        current_pos = 0
        for para in re.split(r'\n\s*\n', text):
            para = para.strip()
            if not para:
                continue
            start_pos = text.find(para, current_pos)
            if start_pos == -1:
                start_pos = current_pos
            end_pos = start_pos + len(para)
            chunks.append({
                'text': para,
                'start': start_pos,
                'end': end_pos,
                'method': 'Paragraph-based',
                'paragraph_length': len(para),
                'word_count': len(para.split()),
                'char_count': len(para)
            })
            current_pos = end_pos
        return chunks

    def recursive_chunking(self, text: str, max_chunk_size: int = 1000) -> List[Dict]:
        """Split text hierarchically, trying coarse separators before fine ones.

        Separators are tried in order (blank line, newline, sentence
        punctuation, ``"; "``, ``", "``, space). Text that is still too long
        once the separators are exhausted is cut every ``max_chunk_size``
        characters, so no returned chunk exceeds the limit.

        Raises:
            ValueError: If ``max_chunk_size`` is not positive.
        """
        if max_chunk_size <= 0:
            raise ValueError("max_chunk_size must be a positive integer")
        separators = ["\n\n", "\n", ". ", "! ", "? ", "; ", ", ", " "]

        def _recursive_split(segment: str, remaining: List[str], max_size: int) -> List[str]:
            # Recursion is bounded by ``remaining`` shrinking on every call.
            # No separate depth limit is used: one would stop the descent
            # before the finer separators and return oversize text unsplit.
            if len(segment) <= max_size:
                return [segment]
            separator = remaining[0] if remaining else " "
            if separator not in segment:
                if len(remaining) > 1:
                    return _recursive_split(segment, remaining[1:], max_size)
                return [segment[i:i + max_size] for i in range(0, len(segment), max_size)]
            pieces = []
            current_chunk = ""
            for part in segment.split(separator):
                potential_chunk = current_chunk + part + separator
                if len(potential_chunk) <= max_size:
                    current_chunk = potential_chunk
                    continue
                if current_chunk:
                    # NOTE: rstrip() treats the separator as a set of
                    # characters, so every trailing separator character goes.
                    pieces.append(current_chunk.rstrip(separator))
                if len(part) > max_size:
                    pieces.extend(_recursive_split(part, remaining[1:], max_size))
                    current_chunk = ""
                else:
                    current_chunk = part + separator
            if current_chunk:
                pieces.append(current_chunk.rstrip(separator))
            return pieces

        chunks = []
        current_pos = 0
        for chunk_text in _recursive_split(text, separators, max_chunk_size):
            if not chunk_text.strip():
                continue
            start_pos = text.find(chunk_text, current_pos)
            if start_pos == -1:
                start_pos = current_pos
            end_pos = start_pos + len(chunk_text)
            chunks.append({
                'text': chunk_text,
                'start': start_pos,
                'end': end_pos,
                'method': 'Recursive',
                'max_size': max_chunk_size,
                'word_count': len(chunk_text.split()),
                'char_count': len(chunk_text)
            })
            current_pos = end_pos
        return chunks

    def calculate_metrics(self, chunks: List[Dict]) -> Dict[str, Any]:
        """Summarise chunk sizes.

        Returns an empty dict for an empty chunk list. ``overlap_ratio`` is
        the surplus of summed chunk characters over the largest ``end``
        offset, relative to that offset (never negative). ``size_consistency``
        is one minus the coefficient of variation of the character counts.
        """
        if not chunks:
            return {}
        char_counts = [chunk['char_count'] for chunk in chunks]
        word_counts = [chunk['word_count'] for chunk in chunks]
        overlap_ratio = 0
        if len(chunks) > 1:
            total_chars = sum(char_counts)
            text_length = max(chunk['end'] for chunk in chunks)
            if text_length > 0:
                overlap_ratio = max(0, (total_chars - text_length) / text_length)
        mean_chars = np.mean(char_counts)
        std_chars = np.std(char_counts)
        char_cv = std_chars / mean_chars if mean_chars > 0 else 0
        return {
            'total_chunks': len(chunks),
            'avg_chars': mean_chars,
            'std_chars': std_chars,
            'min_chars': min(char_counts),
            'max_chars': max(char_counts),
            'avg_words': np.mean(word_counts),
            'std_words': np.std(word_counts),
            'char_cv': char_cv,
            'overlap_ratio': overlap_ratio,
            'size_consistency': 1 - char_cv,
            'total_coverage': sum(chunk['end'] - chunk['start'] for chunk in chunks)
        }

    def visualize_chunks(self, chunks: List[Dict]):
        """Display each chunk as a colour-coded card showing its first 400 characters."""
        import html

        if not chunks:
            st.write("No chunks to display")
            return
        st.markdown("### π¨ Chunk Visualization")
        for i, chunk in enumerate(chunks):
            color = self.colors[i % len(self.colors)]
            # The card is rendered with unsafe_allow_html=True, so document
            # text must be escaped or markup inside it would be interpreted.
            preview = html.escape(chunk['text'][:400])
            suffix = '...' if len(chunk['text']) > 400 else ''
            card_lines = [
                f"<div style='background: linear-gradient(135deg, {color}15, {color}25);",
                f"border-left: 5px solid {color};",
                "padding: 15px;",
                "margin: 10px 0;",
                "border-radius: 8px;",
                "box-shadow: 0 2px 4px rgba(0,0,0,0.1);'>",
                "<div style='display: flex; justify-content: space-between; align-items: center; margin-bottom: 8px;'>",
                f"<div style='color: {color}; font-weight: bold; font-size: 14px;'>",
                f"CHUNK {i+1} β’ Position {chunk['start']}-{chunk['end']}",
                "</div>",
                "<div style='color: #666; font-size: 12px;'>",
                f"{chunk['char_count']} chars β’ {chunk['word_count']} words",
                "</div>",
                "</div>",
                "<div style='color: #333; line-height: 1.6; font-size: 14px;'>",
                f"{preview}{suffix}",
                "</div>",
                "</div>",
            ]
            st.markdown("\n" + "\n".join(card_lines) + "\n", unsafe_allow_html=True)

    def create_comparison_charts(self, all_results: Dict[str, List[Dict]]):
        """Draw a 2x2 Plotly figure comparing methods.

        Panels: chunk counts, consistency scores, size box plots and a
        words-versus-characters scatter. Does nothing for an empty mapping.
        """
        if not all_results:
            return
        metrics_data = []
        size_data = []
        for method, chunks in all_results.items():
            metrics = self.calculate_metrics(chunks)
            metrics_data.append({
                'Method': method,
                'Chunks': metrics.get('total_chunks', 0),
                'Avg Size': metrics.get('avg_chars', 0),
                'Consistency': metrics.get('size_consistency', 0),
                'Overlap': metrics.get('overlap_ratio', 0)
            })
            for chunk in chunks:
                size_data.append({
                    'Method': method,
                    'Size': chunk['char_count'],
                    'Words': chunk['word_count']
                })
        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=(
                'Chunk Count Comparison',
                'Size Consistency',
                'Size Distribution by Method',
                'Words vs Characters'
            ),
            specs=[
                [{"type": "bar"}, {"type": "bar"}],
                [{"type": "box"}, {"type": "scatter"}]
            ]
        )
        df_metrics = pd.DataFrame(metrics_data)
        df_sizes = pd.DataFrame(size_data)
        # Chart 1: Chunk counts
        fig.add_trace(
            go.Bar(x=df_metrics['Method'], y=df_metrics['Chunks'],
                   name='Chunk Count', marker_color='lightblue'),
            row=1, col=1
        )
        # Chart 2: Consistency scores
        fig.add_trace(
            go.Bar(x=df_metrics['Method'], y=df_metrics['Consistency'],
                   name='Consistency', marker_color='lightgreen'),
            row=1, col=2
        )
        # With no chunks at all the frame has no columns; skip the per-chunk
        # panels instead of failing on the missing 'Method' column.
        methods_with_chunks = [] if df_sizes.empty else list(df_sizes['Method'].unique())
        # Chart 3: Size distribution box plots
        for method in methods_with_chunks:
            method_data = df_sizes[df_sizes['Method'] == method]
            fig.add_trace(
                go.Box(y=method_data['Size'], name=method, boxpoints='outliers'),
                row=2, col=1
            )
        # Chart 4: Words vs Characters scatter
        for method in methods_with_chunks:
            method_data = df_sizes[df_sizes['Method'] == method]
            fig.add_trace(
                go.Scatter(x=method_data['Words'], y=method_data['Size'],
                           mode='markers', name=method, opacity=0.7),
                row=2, col=2
            )
        fig.update_layout(height=800, showlegend=True)
        fig.update_xaxes(tickangle=45)
        st.plotly_chart(fig, width='stretch')
def _decode_text_upload(raw_bytes):
    """Decode an uploaded plain-text payload, tolerating files that are not UTF-8."""
    try:
        return raw_bytes.decode("utf-8")
    except UnicodeDecodeError:
        # Latin-1 maps every byte, so this never fails; it mirrors the
        # encoding fallback used for CSV uploads.
        return raw_bytes.decode("latin-1")


def main():
    """Run the Streamlit app.

    Flow: read text from an upload or a text box (sidebar), let the user pick
    chunking methods and parameters, run every selected method, then show one
    tab per method plus a comparison tab. A welcome screen is shown while
    there is no text or no method selected.
    """
    st.set_page_config(
        page_title="RAG Chunk Visualizer",
        page_icon="π",
        layout="wide",
        initial_sidebar_state="expanded"
    )
    # Header
    col1, col2 = st.columns([3, 1])
    with col1:
        st.title("π RAG Chunk Visualizer")
        st.markdown("**Professional chunking analysis for RAG systems**")
    with col2:
        if st.button("βΉοΈ About", help="Learn about chunking strategies"):
            with st.expander("Chunking Methods Explained", expanded=True):
                st.markdown(
                    "\n"
                    "**Fixed Size**: Splits text at character boundaries with word respect\n"
                    "**Sentence-based**: Groups sentences together for semantic coherence\n"
                    "**Paragraph-based**: Respects document structure and topic boundaries\n"
                    "**Recursive**: Hierarchical splitting using multiple separators\n"
                )
    visualizer = ChunkVisualizer()
    # Sidebar for configuration
    with st.sidebar:
        st.header("βοΈ Configuration")
        # Input method selection
        input_method = st.radio(
            "Choose input method:",
            ["π Upload File", "βοΈ Custom Input"],
            help="Select how you want to provide text for analysis"
        )
        # File upload or text input
        text = ""
        if input_method == "π Upload File":
            st.markdown("**File Upload**")
            uploaded_file = st.file_uploader(
                "Choose a file",
                type=['txt', 'pdf', 'csv', 'xlsx', 'xls', 'docx'],
                help="Supports: TXT, PDF, CSV, Excel (XLSX/XLS), Word (DOCX)"
            )
            if uploaded_file is not None:
                st.success(f"π File loaded: **{uploaded_file.name}**")
                # Show file info
                with st.expander("File Details", expanded=False):
                    st.write(f"**Name:** {uploaded_file.name}")
                    st.write(f"**Size:** {len(uploaded_file.getvalue()):,} bytes")
                    st.write(f"**Type:** {uploaded_file.type}")
                # Process the file
                file_name = uploaded_file.name.lower()
                with st.spinner(f"Processing {uploaded_file.name}..."):
                    try:
                        if file_name.endswith('.txt'):
                            uploaded_file.seek(0)
                            text = _decode_text_upload(uploaded_file.read())
                        elif file_name.endswith('.pdf'):
                            text = visualizer.extract_text_from_pdf(uploaded_file)
                        elif file_name.endswith('.csv'):
                            text = visualizer.extract_text_from_csv(uploaded_file)
                        elif file_name.endswith(('.xlsx', '.xls')):
                            text = visualizer.extract_text_from_excel(uploaded_file)
                        elif file_name.endswith('.docx'):
                            text = visualizer.extract_text_from_docx(uploaded_file)
                        else:
                            st.warning("Unsupported file type - trying as text...")
                            uploaded_file.seek(0)
                            text = _decode_text_upload(uploaded_file.read())
                    except Exception as e:
                        st.error(f"Error processing file: {str(e)}")
                        text = ""
                # Show processing results
                if text and len(text.strip()) > 0:
                    st.success(f"β Extracted {len(text):,} characters")
                    # Show preview
                    preview_text = text[:300] + "..." if len(text) > 300 else text
                    st.text_area(
                        "Content Preview:",
                        value=preview_text,
                        height=100,
                        disabled=True,
                        help="First 300 characters of extracted text"
                    )
                else:
                    st.error("β No text could be extracted from the file")
            else:
                st.info("π Choose a file to upload")
        else:  # Custom Input
            text = st.text_area(
                "Enter your text:",
                height=200,
                placeholder="Paste or type your text here to analyze different chunking strategies...",
                help="Paste or type the text you want to analyze"
            )
        # Only show chunking options if we have text
        if text and len(text.strip()) > 0:
            st.divider()
            # Method selection
            st.subheader("π§ Chunking Methods")
            method_options = {
                'Fixed Size': 'Character-based splitting with word boundaries',
                'Sentence-based': 'Group by sentences for readability',
                'Paragraph-based': 'Respect document structure',
                'Recursive': 'Hierarchical splitting with multiple separators'
            }
            selected_methods = []
            for method, description in method_options.items():
                if st.checkbox(method, value=method in ['Fixed Size', 'Sentence-based'], help=description):
                    selected_methods.append(method)
            if not selected_methods:
                st.warning("β οΈ Select at least one chunking method")
            st.divider()
            # Parameters
            st.subheader("βοΈ Parameters")
            params = {}
            if 'Fixed Size' in selected_methods:
                st.markdown("**Fixed Size Settings**")
                params['chunk_size'] = st.slider("Chunk size (characters)", 200, 2000, 800, step=50)
                # Keep the overlap well below the chunk size: an overlap as
                # long as a chunk would stop the sliding window from advancing.
                max_overlap = min(300, params['chunk_size'] // 2)
                params['overlap'] = st.slider(
                    "Overlap (characters)", 0, max_overlap, min(100, max_overlap), step=25
                )
            if 'Sentence-based' in selected_methods:
                st.markdown("**Sentence-based Settings**")
                params['sentences_per_chunk'] = st.slider("Sentences per chunk", 1, 10, 4)
            if 'Recursive' in selected_methods:
                st.markdown("**Recursive Settings**")
                params['max_recursive_size'] = st.slider("Max chunk size", 500, 2000, 1200, step=100)
        else:
            selected_methods = []
            params = {}
    # Main content area
    if text and len(text.strip()) > 0 and selected_methods:
        # One callable per method name, so an unknown name fails loudly
        # instead of silently reusing the previous method's chunks.
        chunkers = {
            'Fixed Size': lambda: visualizer.fixed_size_chunking(
                text, params.get('chunk_size', 800), params.get('overlap', 100)
            ),
            'Sentence-based': lambda: visualizer.sentence_chunking(
                text, params.get('sentences_per_chunk', 4)
            ),
            'Paragraph-based': lambda: visualizer.paragraph_chunking(text),
            'Recursive': lambda: visualizer.recursive_chunking(
                text, params.get('max_recursive_size', 1200)
            ),
        }
        # Process text with selected methods
        with st.spinner("Processing chunks..."):
            all_results = {method: chunkers[method]() for method in selected_methods}
        # Metrics are needed in several places below; compute them once.
        all_metrics = {
            method: visualizer.calculate_metrics(chunks)
            for method, chunks in all_results.items()
        }
        st.success(f"β Processed {len(text):,} characters with {len(selected_methods)} methods")
        # Display results in tabs
        tabs = st.tabs([f"π {method}" for method in selected_methods] + ["π Comparison"])
        # Individual method tabs
        for i, (method, chunks) in enumerate(all_results.items()):
            with tabs[i]:
                metrics = all_metrics[method]
                # Metrics display
                col1, col2, col3, col4, col5 = st.columns(5)
                with col1:
                    st.metric("Total Chunks", metrics.get('total_chunks', 0))
                with col2:
                    st.metric("Avg Characters", f"{metrics.get('avg_chars', 0):.0f}")
                with col3:
                    st.metric("Avg Words", f"{metrics.get('avg_words', 0):.0f}")
                with col4:
                    st.metric("Consistency", f"{metrics.get('size_consistency', 0):.2f}")
                with col5:
                    overlap_pct = metrics.get('overlap_ratio', 0) * 100
                    st.metric("Overlap", f"{overlap_pct:.1f}%")
                # Visualize chunks
                visualizer.visualize_chunks(chunks)
                # Size distribution chart
                if len(chunks) > 1:
                    sizes = [chunk['char_count'] for chunk in chunks]
                    fig = px.histogram(
                        x=sizes, nbins=min(20, len(chunks)),
                        title=f"{method} - Chunk Size Distribution",
                        labels={'x': 'Characters', 'y': 'Count'}
                    )
                    fig.update_layout(height=300)
                    st.plotly_chart(fig, width='stretch')
        # Comparison tab
        with tabs[-1]:
            st.header("π Comprehensive Analysis")
            # Comparison charts
            visualizer.create_comparison_charts(all_results)
            # Metrics table
            st.subheader("π Detailed Metrics Comparison")
            comparison_data = []
            for method in all_results:
                metrics = all_metrics[method]
                comparison_data.append({
                    'Method': method,
                    'Chunks': metrics.get('total_chunks', 0),
                    'Avg Size': f"{metrics.get('avg_chars', 0):.0f}",
                    'Size StdDev': f"{metrics.get('std_chars', 0):.0f}",
                    'Consistency': f"{metrics.get('size_consistency', 0):.3f}",
                    'Overlap %': f"{metrics.get('overlap_ratio', 0)*100:.1f}%"
                })
            df_comparison = pd.DataFrame(comparison_data)
            st.dataframe(df_comparison, width='stretch')
            # Recommendations
            st.subheader("π‘ Recommendations")
            best_consistency = max(
                all_results.keys(),
                key=lambda m: all_metrics[m].get('size_consistency', 0)
            )
            # "Optimal" means the average chunk size closest to 600 characters.
            optimal_size_method = min(
                all_results.keys(),
                key=lambda m: abs(all_metrics[m].get('avg_chars', 1000) - 600)
            )
            col1, col2 = st.columns(2)
            with col1:
                st.success(f"π― **Most Consistent**: {best_consistency}")
                consistency_score = all_metrics[best_consistency].get('size_consistency', 0)
                st.write(f"Consistency score: {consistency_score:.3f}")
            with col2:
                st.info(f"βοΈ **Optimal Size**: {optimal_size_method}")
                avg_size = all_metrics[optimal_size_method].get('avg_chars', 0)
                st.write(f"Average size: {avg_size:.0f} characters")
            # Use case recommendations
            st.markdown("### π‘ Use Case Recommendations")
            recommendations = {
                "π **Search & Retrieval**": "Use Fixed Size (600-800 chars) for consistent embedding",
                "π **Document Processing**": "Use Paragraph-based to preserve structure",
                "π€ **LLM Input**": "Use Fixed Size (800-1200 chars) for token management",
                "π **Reading Comprehension**": "Use Sentence-based for natural flow",
                "π **Data Pipeline**": "Use Recursive for robust processing"
            }
            for use_case, recommendation in recommendations.items():
                st.markdown(f"- {use_case}: {recommendation}")
    else:
        # Welcome screen when no text is provided
        st.markdown(
            "\n"
            "## π Welcome to the RAG Chunk Visualizer\n"
            "This tool analyzes how different chunking strategies split your documents for RAG systems.\n"
            "### π Getting Started\n"
            "**Step 1:** Choose your input method in the sidebar:\n"
            "- **π Upload File**: Support for PDF, Excel, CSV, Word, and text files\n"
            "- **βοΈ Custom Input**: Paste or type your own text\n"
            "**Step 2:** Select chunking methods to compare (2-3 recommended)\n"
            "**Step 3:** Adjust parameters for each method\n"
            "**Step 4:** Analyze results with comprehensive metrics and visualizations\n"
            "### π§ Available Chunking Methods\n"
            "- **Fixed Size**: Consistent character-based chunks with word boundaries\n"
            "- **Sentence-based**: Natural language flow with sentence grouping\n"
            "- **Paragraph-based**: Document structure preservation\n"
            "- **Recursive**: Hierarchical splitting with multiple separators\n"
            "### π― Key Features\n"
            "- **Real-time comparison** of different chunking strategies\n"
            "- **Advanced metrics** including consistency scores and overlap analysis\n"
            "- **Interactive visualizations** with detailed chunk inspection\n"
            "- **Professional recommendations** for different use cases\n"
            "- **Multi-format support** for various document types\n"
            "### π Supported File Formats\n"
            "- **π PDF**: Research papers, reports, documentation\n"
            "- **π Excel (XLSX/XLS)**: Spreadsheets, data tables, financial reports\n"
            "- **π CSV**: Data exports, logs, structured datasets\n"
            "- **π Word (DOCX)**: Business documents, proposals, manuscripts\n"
            "- **π Text (TXT)**: Plain text files, code, notes\n"
            "---\n"
            "**Ready to begin?** Select your input method in the sidebar! π\n"
        )
        # Show example use cases
        st.subheader("π‘ Example Use Cases")
        col1, col2, col3 = st.columns(3)
        with col1:
            st.markdown(
                "\n"
                "**π RAG Optimization**\n"
                "- Find optimal chunk sizes\n"
                "- Minimize overlap issues\n"
                "- Improve retrieval accuracy\n"
                "- Balance context vs precision\n"
            )
        with col2:
            st.markdown(
                "\n"
                "**π Document Processing**\n"
                "- Preserve document structure\n"
                "- Handle different file formats\n"
                "- Maintain readability\n"
                "- Process large documents\n"
            )
        with col3:
            st.markdown(
                "\n"
                "**π€ LLM Integration**\n"
                "- Manage token limits\n"
                "- Optimize context windows\n"
                "- Improve response quality\n"
                "- Reduce processing costs\n"
            )


if __name__ == "__main__":
    main()