Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import matplotlib.pyplot as plt | |
| import json | |
| import logging | |
| from datetime import datetime | |
| import numpy as np | |
| import matplotlib | |
| # === Misc === | |
| import json | |
| import gradio as gr | |
| import datetime | |
| from utils import ( | |
| form_document_sentences_from_chunks, | |
| form_response_sentences, | |
| convert_to_serializable, | |
| extract_metric_lists, | |
| compute_metric_with_missing, | |
| upload_file | |
| ) | |
| from constants import ( | |
| CHUNKING_STRATEGIES, | |
| EMBEDDING_MODELS, | |
| RELEVANCE_SCORE, | |
| UTILIZATION_SCORE, | |
| COMPLETENESS_SCORE, | |
| ADHERENCE_SCORE, | |
| ) | |
| from ragbench import RAGSystem, RAGEvaluator | |
| import nltk | |
| nltk.download('punkt_tab') | |
# Console logging for the whole app: INFO and above, with timestamp,
# logger name and level in every record.
logging.basicConfig(
    handlers=[logging.StreamHandler()],  # console only
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Key names used when indexing the loaded RAGBench JSON structure.
SUMMARY = 'summary'
DATASET_TYPE = 'dataset_type'
ENTRIES = 'entries'
QUESTIONS = 'questions'

# Module-level stores, both empty until data is loaded further down:
# ragbench_details backs the explorer UI, advanced_analysis backs the
# multi-question evaluation report.
ragbench_details = dict()
advanced_analysis = dict()
import os

# Load the RAGBench summary JSON that drives the UI. The path is relative to
# the process working directory (it is NOT an absolute path).
DATA_PATH = 'data/ragbench_summary_questions_chunking.json'

# Diagnostic listing of the data folder. Guarded so that a missing folder is
# reported instead of aborting the import before the fallback below can apply.
try:
    print(os.listdir("data"))  # Check if the file is there
except OSError as e:
    logger.error(f"Could not list data directory: {e}")

try:
    # JSON is UTF-8 by specification; do not rely on the platform default.
    with open(DATA_PATH, encoding="utf-8") as f:
        ragbench_details = json.load(f)
    logger.info(f"Loaded data with {len(ragbench_details)} subsets")
except Exception as e:
    # Best-effort start-up: the app still launches, just with no subsets.
    logger.error(f"Failed to load data: {e}")
    ragbench_details = {}  # Fallback empty dict
# Index of predefined questions: subset name -> list of question strings,
# taken from each subset's QUESTIONS entries in the loaded data.
available_questions = {
    subset_name: [sample['question'] for sample in subset_details[QUESTIONS]]
    for subset_name, subset_details in ragbench_details.items()
}

# Choices offered by the UI dropdowns.
available_subsets = list(ragbench_details)
generator_models = ["mistralai/Mistral-7B-Instruct-v0.2"]
evaluators = ["llama"]
def update_question_list_and_info(subset):
    """Refresh the question dropdown and reference answer for a subset.

    Args:
        subset: Subset name selected in the UI (may be unknown or ``None``).

    Returns:
        A 3-tuple ``(dropdown_update, original_answer, y_metrics)`` where
        ``dropdown_update`` is a ``gr.update`` carrying the subset's questions
        with the first one selected, and the other two items describe that
        first question. With no questions the answer is ``""`` and the
        metrics are an empty dict.
    """
    # Default to dicts/lists all the way down so an unknown subset yields an
    # empty question list instead of a TypeError from indexing a list by key.
    questions = [
        entry["question"]
        for entry in ragbench_details.get(subset, {}).get(QUESTIONS, [])
    ]
    selected = questions[0] if questions else None
    if selected:
        orig_ans, y_metrics = get_info_from_sample_questions(subset, selected)
    else:
        # The metrics value feeds a gr.JSON component, which parses plain
        # strings as JSON; an empty dict is the safe "nothing to show" value.
        orig_ans, y_metrics = "", {}
    return gr.update(choices=questions, value=selected), orig_ans, y_metrics
def get_info_from_sample_questions(subset, question):
    """Look up the reference answer and metrics of a predefined question.

    Args:
        subset: Subset name to search in ``ragbench_details``.
        question: Exact question text to match.

    Returns:
        ``(original_response, y_metrics)`` for the first entry whose
        ``"question"`` equals ``question``; missing fields default to
        ``"N/A"`` and ``{}``. When the subset or the question is not found,
        ``("No answer found.", {})`` is returned.
    """
    # Default to dicts/lists so an unknown subset simply has no entries
    # (the previous list default raised TypeError when indexed by key).
    for entry in ragbench_details.get(subset, {}).get(QUESTIONS, []):
        if entry.get("question") == question:
            return entry.get("original_response", "N/A"), entry.get("y_metrics", {})
    # The metrics slot is wired to a gr.JSON output, which tries to parse
    # strings as JSON; return an empty dict rather than a free-text message.
    return "No answer found.", {}
def plot_subset_metrics_old1(subset_name):
    """Draw the earlier version of the per-subset summary bar chart.

    Reads ``ragbench_details[subset_name]["summary"]`` and plots five summary
    fields as sky-blue bars, adding a separate "%" annotation on the
    percentage bar. This variant is not referenced by the UI wiring visible
    in this file.

    Returns the matplotlib figure. On any exception the error is printed and
    the function falls through, returning ``None``.
    """
    try:
        subset_summary = ragbench_details[subset_name]["summary"]

        # Summary field -> x-axis caption; insertion order is the bar order.
        captions_by_field = {
            'Entries': 'Total Entries',
            'TotalDocs': 'Total Documents',
            'TotalUniqueIds': 'Unique IDs',
            'TotalUniqueDocs': 'Unique Documents',
            'UniqueDocsPercent': '% Unique Docs'
        }
        captions = list(captions_by_field.values())
        heights = [subset_summary.get(field, 0) for field in captions_by_field]

        fig, ax = plt.subplots(figsize=(10, 6), constrained_layout=True)
        rects = ax.bar(captions, heights, color='skyblue')
        ax.bar_label(rects, fmt='%.0f', padding=3, fontsize=9)

        # Left-aligned heading, extended with the domain when one is recorded.
        heading = f"Dataset Metrics: {subset_name}"
        if 'Domain' in subset_summary:
            heading = heading + f" - Domain: {subset_summary['Domain']}"
        ax.set_title(heading, fontsize=12, pad=20, loc='left')

        ax.set_ylabel("Count", fontsize=10)
        ax.grid(axis='y', linestyle=':', alpha=0.6)

        # Slanted tick labels so the longer captions do not collide.
        ax.set_xticks(range(len(captions)))
        ax.set_xticklabels(captions, rotation=25, ha='right', fontsize=9)

        # The percentage is the last bar; annotate it with an explicit "%".
        if 'UniqueDocsPercent' in subset_summary:
            ax.text(
                len(captions_by_field) - 1,
                subset_summary['UniqueDocsPercent'],
                f"{subset_summary['UniqueDocsPercent']}%",
                ha='center',
                va='bottom',
                fontsize=10,
                bbox=dict(facecolor='white', alpha=0.8, edgecolor='none')
            )
        return fig
    except Exception as e:
        print(f"Plotting error: {str(e)}")
def plot_chunking_strategies_old1(subset_name):
    """Draw the earliest chunking-strategy bar chart for a subset.

    Reads ``ragbench_details[subset_name]["chunking"]`` (strategy name ->
    count) and plots one sky-blue bar per strategy with a thousands-separated
    count above it. Not referenced by the UI wiring visible in this file.

    Returns the matplotlib figure. On any exception the error is printed and
    the function falls through, returning ``None``.
    """
    try:
        per_strategy = ragbench_details[subset_name]["chunking"]

        # constrained_layout keeps the rotated tick labels inside the canvas.
        fig, ax = plt.subplots(figsize=(10, 5), constrained_layout=True)

        strategies = list(per_strategy.keys())
        counts = list(per_strategy.values())

        rects = ax.bar(strategies, counts, color='skyblue', edgecolor='white', linewidth=0.7)

        # Count label above each bar, lifted by 2% of the tallest bar.
        for rect in rects:
            bar_top = rect.get_height()
            x_center = rect.get_x() + rect.get_width()/2
            ax.text(
                x_center,
                bar_top + max(counts)*0.02,
                f'{int(bar_top):,}',
                ha='center',
                va='bottom',
                fontsize=10)

        ax.set_title(
            f"Chunking Strategy Distribution - {subset_name}",
            fontsize=12,
            pad=20)
        ax.set_ylabel("Number of Chunks", fontsize=10)

        ax.set_xticks(range(len(strategies)))
        ax.set_xticklabels(
            strategies,
            rotation=30,
            ha='right',
            fontsize=9,
            rotation_mode='anchor'
        )

        ax.grid(axis='y', linestyle=':', alpha=0.6)
        ax.spines[['top', 'right']].set_visible(False)

        # 10% headroom above the tallest bar.
        ax.set_ylim(0, max(counts) * 1.1)
        return fig
    except Exception as e:
        print(f"Error plotting chunking strategies: {str(e)}")
def plot_chunking_strategies_working(subset_name):
    """Draw the plain (single-colour) chunking-strategy chart with borders.

    Reads ``ragbench_details[subset_name]["chunking"]`` (strategy name ->
    count) and plots sky-blue bars on a white figure with a light-gray frame.
    Not referenced by the UI wiring visible in this file.

    Returns the matplotlib figure. On any exception the error is printed and
    the function falls through, returning ``None``.
    """
    try:
        per_strategy = ragbench_details[subset_name]["chunking"]

        fig, ax = plt.subplots(figsize=(10, 5), constrained_layout=True, facecolor='white')

        # Thin gray outline around the whole figure.
        frame = fig.patch
        frame.set_edgecolor('lightgray')
        frame.set_linewidth(2)

        strategies = list(per_strategy.keys())
        counts = list(per_strategy.values())

        rects = ax.bar(strategies, counts, color='skyblue', edgecolor='white', linewidth=0.7)
        ax.bar_label(rects, fmt='%d', padding=3, fontsize=9)

        ax.set_title(
            f"Chunking Strategy Distribution - {subset_name}",
            fontsize=12,
            pad=20,
            loc='left'
        )
        ax.set_ylabel("Number of Chunks", fontsize=10)

        ax.set_xticks(range(len(strategies)))
        ax.set_xticklabels(
            strategies,
            rotation=25,
            ha='right',
            fontsize=9,
            rotation_mode='anchor'
        )

        ax.grid(axis='y', linestyle=':', alpha=0.6)
        ax.spines[['top', 'right']].set_visible(False)

        # Remaining two spines act as the plot-area border.
        visible_spines = ['left', 'bottom']
        ax.spines[visible_spines].set_color('lightgray')
        ax.spines[visible_spines].set_linewidth(1.5)

        # 10% headroom above the tallest bar.
        ax.set_ylim(0, max(counts) * 1.1)
        return fig
    except Exception as e:
        print(f"Error plotting chunking strategies: {str(e)}")
def plot_chunking_strategies(subset_name):
    """Plot the chunk count per chunking strategy for one subset.

    Reads ``ragbench_details[subset_name]["chunking"]`` (strategy name ->
    count) and draws one bar per strategy, shaded on the ``Blues`` colormap
    by count, with a darker border in the same hue, a count label above each
    bar and a colorbar.

    Args:
        subset_name: Key of the subset in ``ragbench_details``.

    Returns:
        The matplotlib figure, or ``None`` when the subset has no chunking
        data or plotting fails (the failure is logged with its traceback).
    """
    log = logging.getLogger(__name__)
    fig = None
    try:
        chunking_data = ragbench_details[subset_name]["chunking"]

        # Prepare data in original order
        strategies = list(chunking_data.keys())
        counts = list(chunking_data.values())
        if not counts:
            # Nothing to draw; bail out before allocating a figure.
            log.warning("No chunking data to plot for subset %r", subset_name)
            return None
        peak = max(counts)

        fig, ax = plt.subplots(figsize=(10, 5), constrained_layout=True, facecolor='white')
        fig.patch.set_edgecolor('lightgray')
        fig.patch.set_linewidth(2)

        # Map each count onto the Blues colormap.
        norm = plt.Normalize(min(counts), peak)
        blues = plt.cm.Blues(norm(np.array(counts)))

        # Border colour = same hue as the bar, with brightness reduced to 70%.
        border_colors = []
        for color in blues:
            hsv = matplotlib.colors.rgb_to_hsv(color[:3])
            hsv[2] *= 0.7
            border_colors.append(matplotlib.colors.hsv_to_rgb(hsv))

        bars = ax.bar(
            strategies,
            counts,
            color=blues,
            edgecolor=border_colors,
            linewidth=2,
            alpha=0.9
        )

        # Count label above each bar, lifted by 2% of the tallest bar.
        label_offset = peak * 0.02
        for bar in bars:
            height = bar.get_height()
            ax.text(
                bar.get_x() + bar.get_width()/2,
                height + label_offset,
                f'{int(height):,}',
                ha='center',
                va='bottom',
                fontsize=9,
                color='black',
                bbox=dict(facecolor='white', alpha=0.7, edgecolor='none', pad=1)
            )

        ax.set_title(f"Chunking Strategy Distribution - {subset_name}", fontsize=12, pad=20, loc='left')
        ax.set_ylabel("Number of Chunks", fontsize=10)
        ax.set_xticks(range(len(strategies)))
        ax.set_xticklabels(strategies, rotation=25, ha='right', fontsize=9, rotation_mode='anchor')

        # Colorbar explaining the bar shading.
        sm = plt.cm.ScalarMappable(cmap='Blues', norm=norm)
        sm.set_array([])
        cbar = plt.colorbar(sm, ax=ax, pad=0.02)
        cbar.set_label('Count Intensity', fontsize=9)

        ax.grid(axis='y', linestyle=':', alpha=0.6)
        ax.spines[['top', 'right']].set_visible(False)
        ax.spines[['left', 'bottom']].set_color('lightgray')
        ax.spines[['left', 'bottom']].set_linewidth(1.5)
        # 10% headroom; fall back to a unit range when every count is zero so
        # the axis limits are not collapsed to (0, 0).
        ax.set_ylim(0, peak * 1.1 if peak > 0 else 1)
        return fig
    except Exception:
        log.exception("Error plotting chunking strategies for subset %r", subset_name)
        return None
    finally:
        # pyplot keeps every figure it creates alive until it is closed; in a
        # long-running server that accumulates one figure per callback. The
        # returned Figure object itself stays usable for rendering.
        if fig is not None:
            plt.close(fig)
def plot_subset_metrics(subset_name):
    """Plot the headline summary numbers of one subset as a bar chart.

    Reads ``ragbench_details[subset_name]["summary"]`` and draws five bars
    (entries, documents, unique ids, unique documents, percent unique docs).
    Fields missing from the summary are plotted as 0. When the summary has a
    ``Domain`` it is appended to the title.

    Args:
        subset_name: Key of the subset in ``ragbench_details``.

    Returns:
        The matplotlib figure, or ``None`` when plotting fails (the failure
        is logged with its traceback).
    """
    fig = None
    try:
        summary = ragbench_details[subset_name]["summary"]

        # Summary field -> x-axis caption; insertion order is the bar order.
        metrics = {
            'Entries': 'Total Entries',
            'TotalDocs': 'Total Documents',
            'TotalUniqueIds': 'Unique IDs',
            'TotalUniqueDocs': 'Unique Documents',
            'UniqueDocsPercent': '% Unique Docs'
        }

        # White figure with a thin gray outline.
        fig, ax = plt.subplots(figsize=(10, 5), constrained_layout=True, facecolor='white')
        fig.patch.set_edgecolor('lightgray')
        fig.patch.set_linewidth(2)

        display_names = list(metrics.values())
        values = [summary.get(metric, 0) for metric in metrics]

        bars = ax.bar(display_names, values, color='skyblue', edgecolor='white', linewidth=0.7)
        ax.bar_label(bars, fmt='%d', padding=3, fontsize=9)

        title = f"Dataset Metrics - {subset_name}"
        if 'Domain' in summary:
            title += f" (Domain: {summary['Domain']})"
        ax.set_title(title, fontsize=12, pad=20, loc='left')

        ax.set_ylabel("Count", fontsize=10)
        ax.grid(axis='y', linestyle=':', alpha=0.6)
        ax.spines[['top', 'right']].set_visible(False)
        ax.spines[['left', 'bottom']].set_color('lightgray')
        ax.spines[['left', 'bottom']].set_linewidth(1.5)

        ax.set_xticks(range(len(display_names)))
        ax.set_xticklabels(display_names, rotation=25, ha='right', fontsize=9)

        # The percentage is the last bar: make sure the axis leaves room for
        # it and annotate it with an explicit "%" label.
        if 'UniqueDocsPercent' in summary:
            unique_pct = summary['UniqueDocsPercent']
            current_ylim = ax.get_ylim()
            ax.set_ylim(current_ylim[0], max(current_ylim[1], unique_pct * 1.2))
            ax.text(
                len(metrics)-1,
                unique_pct,
                f"{unique_pct}%",
                ha='center',
                va='bottom',
                fontsize=10,
                bbox=dict(facecolor='white', alpha=0.8, edgecolor='none')
            )
        return fig
    except Exception:
        logging.getLogger(__name__).exception("Error plotting metrics for subset %r", subset_name)
        return None
    finally:
        # pyplot keeps every figure it creates alive until it is closed; in a
        # long-running server that accumulates one figure per callback. The
        # returned Figure object itself stays usable for rendering.
        if fig is not None:
            plt.close(fig)
# Default UI selection: the first loaded subset, or None when nothing loaded.
initial_subset = next(iter(available_subsets), None)
# Summary chart for that subset, or a placeholder message when there is none.
if initial_subset:
    initial_plot = plot_subset_metrics(initial_subset)
else:
    initial_plot = "No data available"
def generate_advance_report(subset_dropdown, dataset_type_dropdown, chunking_dropdown,
                            embed_dropdown, generator_dropdown,
                            chunk_count, retriever_type, noOfQuestions,
                            reranking_checkbox, reranking_dropdown, evaluator_dropdown):
    """Build, save and upload the multi-question evaluation report.

    Collects the run configuration, the first ``noOfQuestions`` question
    records of the subset from the module-level ``advanced_analysis`` store,
    and one aggregate score per metric (AUC for adherence, RMSE for the
    others) computed by ``compute_metric_with_missing``. The report is
    written as JSON to a file in the working directory and then passed to
    ``upload_file``; an upload failure is printed and does not abort.

    Args:
        subset_dropdown: Subset name; also used in the file name and as the
            first component of the upload folder.
        dataset_type_dropdown: Dataset split label recorded in the report.
        chunking_dropdown, embed_dropdown, generator_dropdown, chunk_count,
        retriever_type, evaluator_dropdown: Run settings recorded verbatim.
        noOfQuestions: Maximum number of question records to include.
        reranking_checkbox: Whether reranking was enabled.
        reranking_dropdown: Reranker name; recorded only when reranking is on.

    Returns:
        ``(json_str, fileName)``: the report text and the path it was saved to.
    """
    export_data = {
        "metadata": {
            "timestamp": datetime.datetime.now().isoformat(),
            "format_version": "1.0"
        },
        "subset": {
            "subset": subset_dropdown,
            "dataset_type": dataset_type_dropdown,
        },
        "model_details": {
            "strategy": chunking_dropdown,
            "embed_model": embed_dropdown,
            "generator_model": generator_dropdown,
            "chunk_count": chunk_count,
            "noOfQuestions": noOfQuestions,
            "retriever_type": retriever_type,
            "reranking": reranking_checkbox,
            "reranking_method": reranking_dropdown if reranking_checkbox else None,
            "evaluator_model": evaluator_dropdown
        }
    }
    print(f"Extracting advanced analysis for subset: {subset_dropdown}")

    # Set the key up front so the metric loop below always has a list to read,
    # even when collecting the questions fails.
    export_data['questions'] = []
    try:
        advQuestions = advanced_analysis.get(subset_dropdown, {}).get('questions', [])
        # Slice bounds must be integers; coerce in case the UI hands over a float.
        export_data['questions'] = list(advQuestions[:int(noOfQuestions)])
    except Exception as e:
        print(f"Failed to load questions for subset {subset_dropdown}: {e}")

    # One aggregate per metric across the selected questions.
    metricsSummary = {}
    for index in [RELEVANCE_SCORE, UTILIZATION_SCORE, COMPLETENESS_SCORE, ADHERENCE_SCORE]:
        y_true, y_pred = extract_metric_lists(export_data['questions'], metric_key=index)
        cMetric = "AUC" if index == ADHERENCE_SCORE else "RMSE"
        result = compute_metric_with_missing(y_true, y_pred, cMetric)
        if result is None and cMetric == 'AUC':
            result = "Skipping AUC - only one class present"
        metricsSummary[f"{index}_{cMetric}"] = result
    export_data['metricsSummary'] = metricsSummary

    json_str = json.dumps(export_data, indent=2)

    # Model ids contain "/", which would be read as a directory separator.
    rmEmbedName = embed_dropdown.replace("/", ":")
    rmGenName = generator_dropdown.replace("/", ":")
    fileName = f"{subset_dropdown}_{noOfQuestions}_{chunking_dropdown}_{rmEmbedName}_{rmGenName}_output_{datetime.datetime.now().strftime('%d-%B-%H-%M')}.json"

    # Save to file inside Space
    with open(fileName, "w", encoding="utf-8") as f:
        f.write(json_str)

    resultsFolderPath = f"{subset_dropdown}/results"
    try:
        upload_file(fileName, resultsFolderPath)
        print(f"File {fileName} uploaded to Hugging Face {resultsFolderPath} successfully.")
    except Exception as e:
        # Best effort: the local file and the returned JSON are still valid.
        print(f"Failed to upload file {fileName} to Hugging Face: {e}")
    return json_str, fileName
def generate_file(subset_dropdown, dataset_type_dropdown,
                  chunking_dropdown, embed_dropdown, generator_dropdown,
                  chunk_count, retriever_type,
                  reranking_checkbox, reranking_dropdown, evaluator_dropdown,
                  orig_ans_display, y_metrics_display,
                  gen_ans_display, y_pred_metrics_display,
                  chunks_retrieved_display,
                  evaluator_json_output):
    """Export the current single-question run as a JSON file.

    Bundles the run configuration and the values currently shown in the
    result widgets into one JSON document, writes it to a timestamped file
    in the working directory, and returns ``(json_str, fileName)``.
    """
    # Section builders, assembled below in the order they appear in the file.
    metadata_section = {
        "timestamp": datetime.datetime.now().isoformat(),
        "format_version": "1.0"
    }
    subset_section = {
        "subset": subset_dropdown,
        "dataset_type": dataset_type_dropdown,
    }
    # The reranker name is only meaningful when reranking was switched on.
    active_reranker = reranking_dropdown if reranking_checkbox else None
    model_section = {
        "strategy": chunking_dropdown,
        "embed_model": embed_dropdown,
        "generator_model": generator_dropdown,
        "chunk_count": chunk_count,
        "retriever_type": retriever_type,
        "reranking": reranking_checkbox,
        "reranking_method": active_reranker,
        "evaluator_model": evaluator_dropdown
    }
    results_section = {
        "original_answer": orig_ans_display,
        "y_metrics": y_metrics_display,
        "generated_answer": gen_ans_display,
        "y_pred_metrics": y_pred_metrics_display,
        "retrieved_chunks": convert_to_serializable(chunks_retrieved_display),
        "evaluator_json_output": evaluator_json_output
    }
    payload = {
        "metadata": metadata_section,
        "subset": subset_section,
        "model_details": model_section,
        "results": results_section
    }

    json_str = json.dumps(payload, indent=2)

    # Second-resolution stamp keeps successive exports from overwriting each other.
    stamp = datetime.datetime.now().strftime('%d-%B-%Y-%H-%M-%S')
    fileName = f"{subset_dropdown}_output_{stamp}.json"
    with open(fileName, "w") as out:
        out.write(json_str)
    return json_str, fileName
def run_rag_pipeline_multiple_questions(subset, chunking, embed_model, retriever, noOfQuestions, retriever_type,
                                        chunk_count, reranking, reranking_dropdown, evaluator):
    """Run retrieve -> generate -> evaluate over the first N subset questions.

    Reloads ``DATA_PATH`` into the module-level ``advanced_analysis`` store,
    then for each of the first ``noOfQuestions`` questions of ``subset``
    retrieves chunks, generates an answer, evaluates it and writes the
    results back onto the question record (``generated_answer``,
    ``y_pred_metrics``, ``evaluator_json_output``). Finally calls
    ``generate_advance_report`` to save the combined report.

    Args:
        subset: Subset name to evaluate.
        chunking: Chunking strategy passed to ``RAGSystem``.
        embed_model: Passed to ``RAGSystem`` as the retriever model name.
        retriever: Despite its name, passed to ``RAGSystem`` as the generator
            model name (the UI supplies the generator dropdown here).
        noOfQuestions: Number of questions to process; capped at the number
            available for the subset.
        retriever_type: Passed to ``load_embeddings_database``.
        chunk_count: Number of chunks to retrieve per question.
        reranking: Whether reranking is enabled.
        reranking_dropdown: Reranker model name, used only when enabled.
        evaluator: Evaluator label recorded in the report.

    Returns:
        ``None``. Returns early, after logging, when the data file cannot be
        loaded.
    """
    print(f"Running RAG Pipeline for {noOfQuestions} questions in subset: {subset}")
    global advanced_analysis
    try:
        with open(DATA_PATH, encoding="utf-8") as f:
            advanced_analysis = json.load(f)
        logger.info(f"Loaded data with {len(advanced_analysis)} subsets")
    except Exception as e:
        logger.error(f"Failed to load data: {e}")
        return None

    ranking_method = reranking_dropdown if reranking else None
    print(f"Using reranking: {reranking}, method: {ranking_method}")
    print(f"Starting RAG pipeline for {noOfQuestions} questions!!!")

    ragSystemObject = RAGSystem(
        subset=subset,
        dataset_type="test",
        strategy=chunking,
        chunks=[],  # Not needed for loading
        generator_model_name=retriever,
        retriever_model_name=embed_model,
        reranker_model_name=ranking_method
    )
    # Load the stored vector DB
    ragSystemObject.load_embeddings_database(retriever_type=retriever_type)
    print("Loaded the embeddings database - Complete")

    allQuestions = advanced_analysis.get(subset, {}).get('questions', [])
    # Slicing caps the run at the number of questions that actually exist
    # (indexing by range raised IndexError for short subsets). The slice
    # holds the same dict objects, so updates land in advanced_analysis.
    selected = allQuestions[:int(noOfQuestions)]
    if len(selected) < int(noOfQuestions):
        logger.warning(
            f"Requested {noOfQuestions} questions but only {len(selected)} are available for subset {subset}"
        )

    for position, entry in enumerate(selected, start=1):
        question_text = entry['question']
        print(f"Processing question {position}/{len(selected)}, question: {question_text}")
        # Reset outputs first so a failure mid-way leaves well-defined fields.
        entry['generated_answer'] = ""
        entry['y_pred_metrics'] = {}

        # Retrieve and generate the answer
        retrieved_chunks = ragSystemObject.retrieve(question_text, top_k=chunk_count)
        print("Retrieved the chunks - Complete")
        context_docs = [chunk.text for chunk in retrieved_chunks]
        print(f"Retrieved Chunks: {context_docs}")
        generated_answer = ragSystemObject.generate(question_text, context_docs)
        print("Generated the answer - Complete, generated_answer: ", generated_answer)
        entry['generated_answer'] = generated_answer

        y_pred_metrics, json_output_groq = _evaluate_using_groq(context_docs, question_text, generated_answer)
        entry['y_pred_metrics'] = y_pred_metrics
        entry['evaluator_json_output'] = json_output_groq

    generate_advance_report(
        subset_dropdown=subset,
        dataset_type_dropdown="test",
        chunking_dropdown=chunking,
        embed_dropdown=embed_model,
        generator_dropdown=retriever,
        chunk_count=chunk_count,
        retriever_type=retriever_type,
        noOfQuestions=noOfQuestions,
        reranking_checkbox=reranking,
        reranking_dropdown=ranking_method,
        evaluator_dropdown=evaluator
    )
def _evaluate_using_groq(context_docs, question, generated_answer):
    """Evaluate a generated answer against its retrieved context via Groq.

    Splits the context and the answer into sentences with the project
    helpers, runs ``RAGEvaluator.evaluate`` and extracts the metrics from
    its result. Any exception during evaluation is printed and swallowed.

    Args:
        context_docs: Texts of the retrieved chunks.
        question: The question that was asked.
        generated_answer: The answer produced by the generator.

    Returns:
        ``(y_pred_metrics, evaluator_json_output)``. ``y_pred_metrics`` is
        the extracted metrics, or a dict with all four scores set to
        ``"NA"`` when evaluation did not produce a result.
        ``evaluator_json_output`` is the evaluator result as a JSON string,
        or ``None`` when there is no result.
    """
    document_sentences = form_document_sentences_from_chunks(context_docs)
    response_sentences = form_response_sentences(generated_answer)
    print(f"Length of Response Sentences: {len(response_sentences)}")
    print(f"Length of Document Sentences : {len(document_sentences)}")

    # Defaults returned when the evaluator yields nothing or raises.
    y_pred_metrics = {
        "relevance_score": "NA",
        "utilization_score": "NA",
        "completeness_score": "NA",
        "adherence_score": "NA"
    }
    # Must be bound before the try block: the return statement below reads it
    # on every path, including "no result" and "exception".
    evaluator_json_output = None

    try:
        grok_api_key = os.environ.get("GROQ_API_KEY")  # Safely loaded from HF Secrets
        evaluator = RAGEvaluator(
            use_groq=True,
            groq_api_key=grok_api_key,
            groq_model="llama3-70b-8192"
        )
        result = evaluator.evaluate(document_sentences, question, response_sentences)
        print(f"\nResult----\n: {result}")
        if result is not None:
            y_pred_metrics = evaluator.extract_trace_metrics_from_json(result, len(document_sentences))
            evaluator_json_output = json.dumps(result, indent=4)
            print(f"Result: {evaluator_json_output}")
            print(f"Metrics: {y_pred_metrics}")
        else:
            print("No result obtained for this question")
    except Exception as e:
        print(f"Exception Raised in evaluation / extract_trace_metrics_from_json. Details: {e}")
    return y_pred_metrics, evaluator_json_output
def run_rag_pipeline(subset, question, custom_question, chunking, embed_model, retriever, chunk_count, retriever_type,
                     reranking, reranking_dropdown, evaluator):
    """Answer and evaluate a single question for the interactive view.

    A non-blank custom question takes precedence over the predefined one.
    The function loads the stored embeddings for the chosen configuration,
    retrieves ``chunk_count`` chunks, generates an answer and evaluates it.

    Returns a 5-tuple matching the UI outputs: the generated answer, the
    predicted metrics as a JSON string, the retrieved chunks as
    ``[["Source i", text], ...]`` rows, the evaluator output as a parsed
    object (or an error dict when none is available), and ``None`` for the
    download slot.
    """
    if custom_question.strip():
        final_question = custom_question
    else:
        final_question = question
    print(f"The query is {final_question}")

    # Announce the start on every channel: stdout, logging and a UI toast.
    print("Starting RAG pipeline using print")
    logging.info("Starting RAG Pipeline using logging")
    gr.Info("Starting RAG Pipeline using gradio")

    ranking_method = reranking_dropdown if reranking else None
    print(f"Using reranking: {reranking}, method: {ranking_method}")

    # `retriever` carries the generator model name chosen in the UI.
    rag_system = RAGSystem(
        subset=subset,
        dataset_type="test",
        strategy=chunking,
        chunks=[],  # Not needed for loading
        generator_model_name=retriever,
        retriever_model_name=embed_model,
        reranker_model_name=ranking_method
    )
    rag_system.load_embeddings_database(retriever_type=retriever_type)
    print("Loaded the embeddings database - Complete")

    hits = rag_system.retrieve(final_question, top_k=chunk_count)
    print("Retrieved the chunks - Complete")
    context_docs = [hit.text for hit in hits]
    print(f"Retrieved Chunks: {context_docs}")

    generated_answer = rag_system.generate(final_question, context_docs)
    print("Generated the answer - Complete, generated_answer: ", generated_answer)

    y_pred_metrics, evaluator_json_output = _evaluate_using_groq(context_docs, final_question, generated_answer)

    # Rows for the "Retrieved Chunks" table, numbered from 1.
    chunk_rows = [
        [f"Source {number}", hit.text]
        for number, hit in enumerate(hits, start=1)
    ]

    if evaluator_json_output:
        analysis = json.loads(evaluator_json_output)
    else:
        analysis = {"error": "No evaluation result available."}

    metrics_text = json.dumps(y_pred_metrics, indent=2)
    return generated_answer, metrics_text, chunk_rows, analysis, None
# UI Layout
# NOTE(review): indentation was not recoverable from the source, so the
# nesting of components below is reconstructed from reading order - confirm
# the layout against the running app.
# NOTE(review): several labels below appear to contain mis-encoded emoji;
# they are kept exactly as found because the intended characters cannot be
# determined from this file.
with gr.Blocks(
        head="<!DOCTYPE html>",
        css=":root { -webkit-print-color-adjust: exact; }") as demo:
    demo.title = "RAGBench Interactive Explorer"
    gr.Markdown("## π RAGBench Interactive Explorer")

    # --- Configuration row: dataset, models, retrieval settings ---
    with gr.Row():
        with gr.Column(scale=2):
            gr.Markdown("### βοΈ Subset and Dataset Selection")
            subset_dropdown = gr.Dropdown(choices=available_subsets, label="π Subset", value=initial_subset, interactive=True)
            dataset_type_dropdown = gr.Dropdown(choices=["test"], label="π Dataset Type", value="test", interactive=False)
        with gr.Column(scale=3):
            gr.Markdown("### βοΈ Chunking and Model Selection")
            chunking_dropdown = gr.Dropdown(choices=CHUNKING_STRATEGIES, label="π¦ Chunking Strategy", value="SentenceBasedLangchain")
            embed_dropdown = gr.Dropdown(choices=EMBEDDING_MODELS, label="π Embedding Model", value="BAAI/bge-large-en-v1.5")
            generator_dropdown = gr.Dropdown(choices=generator_models, label="π§ Generator Model", value="mistralai/Mistral-7B-Instruct-v0.2")
        with gr.Column(scale=4):
            gr.Markdown("### βοΈ Retrieval Settings and Evaluation Option")
            chunk_count = gr.Slider(minimum=1, maximum=15,
                                    value=5,  # Default value
                                    step=1, label="Number of Chunks to Retrieve")
            retriever_type = gr.Dropdown(
                choices=["BM25", "Vector"],  # Add other options as needed
                value="BM25",  # Default selection
                label="Retriever Type")
            reranking_checkbox = gr.Checkbox(label="Use Reranking", value=False)
            # Reranking dropdown, hidden until the checkbox is ticked
            reranking_dropdown = gr.Dropdown(
                choices=["cross-encoder/ms-marco-MiniLM-L-12-v2", "cross-encoder/ms-marco-MiniLM-L-6-v2"],
                label="Reranking Method",
                value="cross-encoder/ms-marco-MiniLM-L-12-v2",
                visible=False,
                interactive=True
            )
            reranking_checkbox.change(
                fn=lambda x: gr.update(visible=x),
                inputs=[reranking_checkbox],
                outputs=reranking_dropdown
            )
            evaluator_dropdown = gr.Dropdown(choices=evaluators, label="π§ Evaluator Model", value="llama")

    # --- Subset overview plots ---
    with gr.Row():
        metrics_plot = gr.Plot()
        chunking_strategies_plot = gr.Plot()

    # --- Question selection ---
    with gr.Row():
        # .get() keeps the app constructible when no data was loaded
        # (initial_subset is None and has no entry in available_questions).
        question_dropdown = gr.Dropdown(choices=available_questions.get(initial_subset, []), filterable=True, label="π Predefined Questions", interactive=True)
        custom_question_input = gr.Textbox(label="βοΈ Custom Question (optional)")

    # Redraw both plots whenever the subset changes
    subset_dropdown.change(fn=plot_subset_metrics, inputs=[subset_dropdown], outputs=[metrics_plot])
    subset_dropdown.change(fn=plot_chunking_strategies, inputs=[subset_dropdown], outputs=[chunking_strategies_plot])

    submit_btn = gr.Button("π Run RAG Evaluation", variant="primary")

    # --- Results row: reference answer, generated answer, analysis ---
    with gr.Row():
        with gr.Column(scale=2):
            gr.Markdown("### π Original Answer and Metrics")
            orig_ans_display = gr.Textbox(label="π Original Answer", lines=5, interactive=False)
            y_metrics_display = gr.JSON(label="y-metrics", value={})
        with gr.Column(scale=2):
            gr.Markdown("### π€ Generated Answer and Evaluation Metrics")
            gen_ans_display = gr.Textbox(label="π€ Generated Answer", lines=5)
            y_pred_metrics_display = gr.JSON(label="π Evaluation (y-pred) Metrics", value={})
        with gr.Column(scale=2):
            gr.Markdown("### π Analysis Results")
            with gr.Accordion(label="π Retrieved Chunks (Expand to View)", open=False):
                chunks_retrieved_display = gr.Dataframe(
                    headers=["Source", "Chunk Text"],
                    datatype=["str", "str"],
                    interactive=False,
                    wrap=True)
            evaluator_json_output = gr.JSON(label="Analysis Results", value={})

    # Keep the question list and the reference answer in sync with the selection
    subset_dropdown.change(fn=update_question_list_and_info, inputs=[subset_dropdown], outputs=[question_dropdown, orig_ans_display, y_metrics_display])
    question_dropdown.change(fn=get_info_from_sample_questions, inputs=[subset_dropdown, question_dropdown], outputs=[orig_ans_display, y_metrics_display])

    # --- Export / batch evaluation row ---
    with gr.Row():
        with gr.Column(scale=3):
            with gr.Accordion("Advanced Options", open=False):
                default_no_of_questions = 2
                noOfQuestions = gr.Slider(
                    minimum=1,
                    maximum=50,
                    value=default_no_of_questions,
                    step=1,
                    label="Number of Questions to Evaluate"
                )
                # Button whose label tracks the slider value
                evaluate_btn = gr.Button(f"π Run RAG Evaluation for {default_no_of_questions} Questions", variant="primary")
                evaluate_report = gr.Button("π Generate Metrics & Collection Report", variant="primary")

                def update_button_label(value):
                    """Return the batch-run button relabelled with the slider value."""
                    return gr.Button(f"π Run RAG Evaluation for {value} Questions", variant="primary")

                noOfQuestions.change(
                    update_button_label,
                    inputs=noOfQuestions,
                    outputs=evaluate_btn
                )
                # The batch run writes its report itself, so it has no outputs.
                evaluate_btn.click(
                    fn=run_rag_pipeline_multiple_questions,
                    inputs=[
                        subset_dropdown,
                        chunking_dropdown, embed_dropdown, generator_dropdown,
                        noOfQuestions, retriever_type, chunk_count,
                        reranking_checkbox, reranking_dropdown, evaluator_dropdown
                    ]
                )
            generate_btn = gr.Button("Generate JSON & Download")
        with gr.Column(scale=2):
            json_output = gr.Code(label="JSON Output", max_lines=50, language="json")
            download_file = gr.File(label="Download Link")

    # Single-question run: fills the result widgets and clears the download slot
    submit_btn.click(
        fn=run_rag_pipeline,
        inputs=[
            subset_dropdown, question_dropdown, custom_question_input,
            chunking_dropdown, embed_dropdown, generator_dropdown,
            chunk_count, retriever_type,
            reranking_checkbox, reranking_dropdown, evaluator_dropdown
        ],
        outputs=[gen_ans_display, y_pred_metrics_display, chunks_retrieved_display, evaluator_json_output, download_file]
    )
    # Export what is currently displayed
    generate_btn.click(
        fn=generate_file,
        inputs=[subset_dropdown, dataset_type_dropdown,
                chunking_dropdown, embed_dropdown, generator_dropdown,
                chunk_count, retriever_type,
                reranking_checkbox, reranking_dropdown, evaluator_dropdown,
                orig_ans_display, y_metrics_display,
                gen_ans_display, y_pred_metrics_display,
                chunks_retrieved_display,
                evaluator_json_output
                ],
        outputs=[json_output, download_file]
    )
    # Report for the most recent batch run
    evaluate_report.click(
        fn=generate_advance_report,
        inputs=[subset_dropdown, dataset_type_dropdown,
                chunking_dropdown, embed_dropdown, generator_dropdown,
                chunk_count, retriever_type, noOfQuestions,
                reranking_checkbox, reranking_dropdown, evaluator_dropdown
                ],
        outputs=[json_output, download_file]
    )

demo.launch(debug=True, prevent_thread_lock=True, share=True)