# === Standard library ===
import datetime
import json
import logging
import os

# === Third-party ===
import gradio as gr
import matplotlib.pyplot as plt
import nltk

# === Project modules ===
from utils import (
    form_document_sentences_from_chunks,
    form_response_sentences,
    convert_to_serializable
)
from constants import (
    CHUNKING_STRATEGIES,
    EMBEDDING_MODELS,
)
from ragbench import RAGSystem, RAGEvaluator

nltk.download('punkt_tab')
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()  # Log to console
    ]
)
logger = logging.getLogger(__name__)
# Keys used when reading the RAGBench summary JSON
SUMMARY = 'summary'
DATASET_TYPE = 'dataset_type'
ENTRIES = 'entries'
QUESTIONS = 'questions'

# ragbench_details holds the precomputed summary data; advanced_analysis
# caches batch-run results for the collection report.
ragbench_details = {}
advanced_analysis = {}
logger.info(f"Contents of data directory: {os.listdir('data')}")  # Sanity check that the summary file is present

# 1. Data loading - path is relative to the Space's working directory
DATA_PATH = 'data/ragbench_summary_questions_chunking.json'
try:
    with open(DATA_PATH) as f:
        ragbench_details = json.load(f)
    logger.info(f"Loaded data with {len(ragbench_details)} subsets")
except Exception as e:
    logger.error(f"Failed to load data: {e}")
    ragbench_details = {}  # Fall back to an empty dict so the UI still starts
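
# A minimal sketch of the per-subset structure this module expects in
# ragbench_summary_questions_chunking.json, inferred from the access patterns
# below. The key names come from the code; the subset name and values here
# are hypothetical placeholders:
#
# {
#   "covidqa": {                                # hypothetical subset name
#     "summary": {
#       "Domain": "...",
#       "Entries": 0, "TotalDocs": 0, "TotalUniqueIds": 0,
#       "TotalUniqueDocs": 0, "UniqueDocsPercent": 0.0
#     },
#     "chunking": {"SentenceBasedLangchain": 0, "<strategy>": 0},
#     "questions": [
#       {"question": "...", "original_response": "...", "y_metrics": {}}
#     ]
#   }
# }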
# Build the per-subset question lists shown in the UI from the loaded data
available_questions = {}
for subset_name, details in ragbench_details.items():
    available_questions[subset_name] = [sq['question'] for sq in details.get(QUESTIONS, [])]

# Available configs and options
available_subsets = list(ragbench_details.keys())
generator_models = ["mistralai/Mistral-7B-Instruct-v0.2"]
evaluators = ["llama"]
def update_question_list_and_info(subset):
    """Refresh the question dropdown and info panels when the subset changes."""
    subset_questions = ragbench_details.get(subset, {}).get(QUESTIONS, [])
    questions = [q["question"] for q in subset_questions]
    selected = questions[0] if questions else None
    orig_ans, y_metrics = get_info_from_sample_questions(subset, selected) if selected else ("", {})
    return gr.update(choices=questions, value=selected), orig_ans, y_metrics
def get_info_from_sample_questions(subset, question):
    """Look up the original answer and ground-truth (y) metrics for a sample question."""
    subset_questions = ragbench_details.get(subset, {}).get(QUESTIONS, [])
    for entry in subset_questions:
        if entry.get("question") == question:
            orig_ans = entry.get("original_response", "N/A")
            y_metrics = entry.get("y_metrics", {})
            return orig_ans, y_metrics
    return "No answer found.", {}
def plot_subset_metrics(subset_name):
    """Bar chart of summary statistics for the selected subset."""
    summary = ragbench_details[subset_name]["summary"]
    keys = ['Entries', 'TotalDocs', 'TotalUniqueIds', 'TotalUniqueDocs', 'UniqueDocsPercent']
    values = [summary.get(k, 0) for k in keys]
    fig, ax = plt.subplots(figsize=(10, 6))
    bars = ax.bar(keys, values, color='skyblue')
    # Add counts on top of the bars
    ax.bar_label(bars, fmt='%.0f', padding=3)
    ax.set_title(f"Metrics for Subset: {subset_name}, Domain: {summary.get('Domain')}", fontsize=14)
    ax.set_ylabel("Value")
    ax.grid(axis='y')
    ax.set_xticks(range(len(keys)))
    ax.set_xticklabels(keys, rotation=21, ha='right')
    fig.tight_layout()  # Apply tight layout after all labels are set
    return fig
def plot_chunking_strategies(subset_name):
    """Bar chart of chunk counts per chunking strategy for the selected subset."""
    chunking_data = ragbench_details[subset_name]["chunking"]
    plt.figure(figsize=(10, 6))
    strategies = list(chunking_data.keys())
    counts = list(chunking_data.values())
    bars = plt.bar(strategies, counts, color="skyblue")
    # Add value labels on top of the bars
    for bar in bars:
        yval = bar.get_height()
        plt.text(bar.get_x() + bar.get_width() / 2, yval + 20, int(yval), ha='center', va='bottom', fontsize=10)
    plt.ylabel("Number of Chunks")
    plt.title(f"Chunking Strategy Distribution - {subset_name}")
    plt.xticks(rotation=30)
    plt.tight_layout()  # Apply tight layout after all labels are set
    # Return the current figure (Gradio renders it)
    return plt.gcf()
# Initialize with the first subset's summary
initial_subset = available_subsets[0] if available_subsets else None
initial_plot = plot_subset_metrics(initial_subset) if initial_subset else None
def generate_advance_report(subset_dropdown, dataset_type_dropdown, chunking_dropdown,
                            embed_dropdown, retriever_dropdown,
                            chunk_count, retriever_type, noOfQuestions,
                            reranking_checkbox, evaluator_dropdown):
    """Export the cached advanced-analysis results for a subset as a downloadable JSON report."""
    export_data = {
        "metadata": {
            "timestamp": datetime.datetime.now().isoformat(),
            "format_version": "1.0"
        },
        "subset": {
            "subset": subset_dropdown,
            "dataset_type": dataset_type_dropdown,
        },
        "model_details": {
            "strategy": chunking_dropdown,
            "embed_model": embed_dropdown,
            "generator_model": retriever_dropdown,
            "chunk_count": chunk_count,
            "noOfQuestions": noOfQuestions,
            "retriever_type": retriever_type,
            "reranking": reranking_checkbox,
            "evaluator_model": evaluator_dropdown
        }
    }
    print(f"Extracting advanced analysis for subset: {subset_dropdown}")
    try:
        export_data["questions"] = advanced_analysis.get(subset_dropdown, {}).get('questions', [])
    except Exception as e:
        print(f"Failed to load questions for subset {subset_dropdown}: {e}")
        export_data["questions"] = []
    # Convert to a JSON string for display and download
    json_str = json.dumps(export_data, indent=2)
    file_name = f"{subset_dropdown}_{chunking_dropdown}_output_{datetime.datetime.now().strftime('%d-%B-%Y-%H-%M-%S')}.json"
    # Save to a file inside the Space
    with open(file_name, "w") as f:
        f.write(json_str)
    return json_str, file_name
def generate_file(subset_dropdown, dataset_type_dropdown,
                  chunking_dropdown, embed_dropdown, retriever_dropdown,
                  chunk_count, retriever_type,
                  reranking_checkbox, evaluator_dropdown,
                  orig_ans_display, y_metrics_display,
                  gen_ans_display, y_pred_metrics_display,
                  chunks_retrieved_display,
                  evaluator_json_output):
    """Serialize the current single-question run to JSON and return the file path."""
    export_data = {
        "metadata": {
            "timestamp": datetime.datetime.now().isoformat(),
            "format_version": "1.0"
        },
        "subset": {
            "subset": subset_dropdown,
            "dataset_type": dataset_type_dropdown,
        },
        "model_details": {
            "strategy": chunking_dropdown,
            "embed_model": embed_dropdown,
            "generator_model": retriever_dropdown,
            "chunk_count": chunk_count,
            "retriever_type": retriever_type,
            "reranking": reranking_checkbox,
            "evaluator_model": evaluator_dropdown
        },
        "results": {
            "original_answer": orig_ans_display,
            "y_metrics": y_metrics_display,
            "generated_answer": gen_ans_display,
            "y_pred_metrics": y_pred_metrics_display,
            "retrieved_chunks": convert_to_serializable(chunks_retrieved_display),
            "evaluator_json_output": evaluator_json_output
        }
    }
    # Convert to a JSON string for display and download
    json_str = json.dumps(export_data, indent=2)
    file_name = f"{subset_dropdown}_output_{datetime.datetime.now().strftime('%d-%B-%Y-%H-%M-%S')}.json"
    # Save to a file inside the Space
    with open(file_name, "w") as f:
        f.write(json_str)
    return json_str, file_name
def run_rag_pipeline_multiple_questions(subset, chunking, embed_model, retriever, noOfQuestions, retriever_type,
                                        chunk_count, reranking, evaluator):
    """Run the RAG pipeline over the first noOfQuestions questions of a subset,
    storing the results in the module-level advanced_analysis cache."""
    global advanced_analysis  # Results are read later by generate_advance_report
    print(f"Running RAG Pipeline for {noOfQuestions} questions in subset: {subset}")
    try:
        with open(DATA_PATH) as f:
            advanced_analysis = json.load(f)
        logger.info(f"Loaded data with {len(advanced_analysis)} subsets")
    except Exception as e:
        logger.error(f"Failed to load data: {e}")
        return None
    print(f"Starting RAG pipeline for {noOfQuestions} questions")
    ragSystemObject = RAGSystem(
        subset=subset,
        dataset_type="test",
        strategy=chunking,
        chunks=[],  # Not needed when loading a stored database
        generator_model_name=retriever,  # the "retriever" arg carries the generator-model choice from the UI
        retriever_model_name=embed_model
    )
    # Load the stored vector database
    ragSystemObject.load_embeddings_database(retriever_type=retriever_type)
    print("Loaded the embeddings database - Complete")
    allQuestions = advanced_analysis.get(subset, {}).get('questions', [])
    questions_to_run = min(noOfQuestions, len(allQuestions))
    for item in range(questions_to_run):
        print(f"Processing question {item + 1}/{questions_to_run}, question: {allQuestions[item]['question']}")
        allQuestions[item]['generated_answer'] = ""
        allQuestions[item]['y_pred_metrics'] = {}
        # Retrieve context and generate an answer
        retrieved_chunks = ragSystemObject.retrieve(allQuestions[item]['question'], top_k=chunk_count)
        print("Retrieved the chunks - Complete")
        context_docs = [chunk.text for chunk in retrieved_chunks]
        print(f"Retrieved Chunks: {context_docs}")
        generated_answer = ragSystemObject.generate(allQuestions[item]['question'], context_docs)
        print("Generated the answer - Complete, generated_answer: ", generated_answer)
        allQuestions[item]['generated_answer'] = generated_answer
        y_pred_metrics, json_output_groq = _evaluate_using_groq(context_docs, allQuestions[item]['question'], generated_answer)
        allQuestions[item]['y_pred_metrics'] = y_pred_metrics
        allQuestions[item]['evaluator_json_output'] = json_output_groq
    return {"subset": subset, "questions_processed": questions_to_run}
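
# A hedged usage sketch: invoking the batch pipeline directly with the UI's
# default settings. "covidqa" is a hypothetical subset name; substitute one of
# the keys actually present in the loaded JSON:
#
# run_rag_pipeline_multiple_questions(
#     subset="covidqa", chunking="SentenceBasedLangchain",
#     embed_model="BAAI/bge-large-en-v1.5",
#     retriever="mistralai/Mistral-7B-Instruct-v0.2",
#     noOfQuestions=2, retriever_type="BM25", chunk_count=5,
#     reranking=False, evaluator="llama")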
def _evaluate_using_groq(context_docs, question, generated_answer):
    """Evaluate a generated answer against its retrieved context using the
    Groq-hosted evaluator. Returns (y_pred_metrics, evaluator_json_output);
    the metrics fall back to "NA" placeholders when evaluation fails."""
    document_sentences = form_document_sentences_from_chunks(context_docs)
    response_sentences = form_response_sentences(generated_answer)
    print(f"Length of Response Sentences: {len(response_sentences)}")
    print(f"Length of Document Sentences : {len(document_sentences)}")
    y_pred_metrics = {
        "relevance_score": "NA",
        "utilization_score": "NA",
        "completeness_score": "NA",
        "adherence_score": "NA"
    }
    evaluator_json_output = ""  # Initialized so the return below never hits an unbound local
    # Call the evaluator with the right variables
    try:
        groq_api_key = os.environ.get("GROQ_API_KEY")  # Safely loaded from HF Secrets
        evaluator = RAGEvaluator(
            use_groq=True,
            groq_api_key=groq_api_key,
            groq_model="llama3-70b-8192"
        )
        result = evaluator.evaluate(document_sentences, question, response_sentences)
        print(f"\nResult----\n: {result}")
        if result is not None:
            y_pred_metrics = evaluator.extract_trace_metrics_from_json(result, len(document_sentences))
            evaluator_json_output = json.dumps(result, indent=4)
            print(f"Metrics: {y_pred_metrics}")
        else:
            print("No result obtained for this question")
    except Exception as e:
        print(f"Exception raised in evaluate / extract_trace_metrics_from_json. Details: {e}")
    return y_pred_metrics, evaluator_json_output
def run_rag_pipeline(subset, question, custom_question, chunking, embed_model, retriever, chunk_count, retriever_type,
                     reranking, evaluator):
    """Run the full RAG pipeline for a single predefined or custom question."""
    final_question = custom_question if custom_question.strip() else question
    print(f"The query is {final_question}")
    print("Starting RAG pipeline using print")
    logging.info("Starting RAG Pipeline using logging")
    gr.Info("Starting RAG Pipeline using gradio")  # Shows as a toast notification in the UI
    rag = RAGSystem(
        subset=subset,
        dataset_type="test",
        strategy=chunking,
        chunks=[],  # Not needed when loading a stored database
        generator_model_name=retriever,  # the "retriever" arg carries the generator-model choice from the UI
        retriever_model_name=embed_model
    )
    # Load the stored vector database
    # rag.store_embeddings_database()  # build the database instead of loading it
    rag.load_embeddings_database(retriever_type=retriever_type)
    print("Loaded the embeddings database - Complete")
    # Retrieve context and generate an answer
    retrieved_chunks = rag.retrieve(final_question, top_k=chunk_count)
    print("Retrieved the chunks - Complete")
    context_docs = [chunk.text for chunk in retrieved_chunks]
    print(f"Retrieved Chunks: {context_docs}")
    generated_answer = rag.generate(final_question, context_docs)
    print("Generated the answer - Complete, generated_answer: ", generated_answer)
    y_pred_metrics, evaluator_json_output = _evaluate_using_groq(context_docs, final_question, generated_answer)
    # Format retrieved chunks as [source, text] rows for the Dataframe
    formatted_chunks = [
        [f"Source {i + 1}", chunk.text]
        for i, chunk in enumerate(retrieved_chunks)
    ]
    return (
        generated_answer,
        json.dumps(y_pred_metrics, indent=2),
        formatted_chunks,
        json.loads(evaluator_json_output) if evaluator_json_output else {"error": "No evaluation result available."},
        None
    )
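
# Likewise, a hedged sketch of a single-question run; the question text is
# hypothetical and would normally come from the dropdown or the custom box:
#
# run_rag_pipeline(
#     subset="covidqa", question="", custom_question="What is ...?",
#     chunking="SentenceBasedLangchain", embed_model="BAAI/bge-large-en-v1.5",
#     retriever="mistralai/Mistral-7B-Instruct-v0.2", chunk_count=5,
#     retriever_type="BM25", reranking=False, evaluator="llama")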
# UI layout
with gr.Blocks(
        head="<!DOCTYPE html>",
        css=":root { -webkit-print-color-adjust: exact; }") as demo:
    demo.title = "RAGBench Interactive Explorer"
    gr.Markdown("## RAGBench Interactive Explorer")
    with gr.Row():
        with gr.Column(scale=2):
            gr.Markdown("### Subset and Dataset Selection")
            subset_dropdown = gr.Dropdown(choices=available_subsets, label="Subset", value=initial_subset, interactive=True)
            dataset_type_dropdown = gr.Dropdown(choices=["test"], label="Dataset Type", value="test", interactive=False)
        with gr.Column(scale=3):
            gr.Markdown("### Chunking and Model Selection")
            chunking_dropdown = gr.Dropdown(choices=CHUNKING_STRATEGIES, label="Chunking Strategy", value="SentenceBasedLangchain")
            embed_dropdown = gr.Dropdown(choices=EMBEDDING_MODELS, label="Embedding Model", value="BAAI/bge-large-en-v1.5")
            retriever_dropdown = gr.Dropdown(choices=generator_models, label="Generator Model", value="mistralai/Mistral-7B-Instruct-v0.2")
        with gr.Column(scale=4):
            gr.Markdown("### Retrieval Settings and Evaluation Option")
            chunk_count = gr.Slider(minimum=1, maximum=15,
                                    value=5,  # Default value
                                    step=1, label="Number of Chunks to Retrieve")
            retriever_type = gr.Dropdown(
                choices=["BM25", "Vector"],  # Add other options as needed
                value="BM25",  # Default selection
                label="Retriever Type")
            reranking_checkbox = gr.Checkbox(label="Use Reranking", value=False)
            evaluator_dropdown = gr.Dropdown(choices=evaluators, label="Evaluator Model", value="llama")
    with gr.Row():
        metrics_plot = gr.Plot(value=initial_plot)
        chunking_strategies_plot = gr.Plot()
    with gr.Row():
        question_dropdown = gr.Dropdown(choices=available_questions.get(initial_subset, []), filterable=True, label="Predefined Questions", interactive=True)
        custom_question_input = gr.Textbox(label="Custom Question (optional)")
    # Link the subset dropdown to the plots
    subset_dropdown.change(fn=plot_subset_metrics, inputs=[subset_dropdown], outputs=[metrics_plot])
    subset_dropdown.change(fn=plot_chunking_strategies, inputs=[subset_dropdown], outputs=[chunking_strategies_plot])
    submit_btn = gr.Button("Run RAG Evaluation", variant="primary")
    with gr.Row():
        with gr.Column(scale=2):
            gr.Markdown("### Original Answer and Metrics")
            orig_ans_display = gr.Textbox(label="Original Answer", lines=5, interactive=False)
            y_metrics_display = gr.JSON(label="y-metrics", value={})
        with gr.Column(scale=2):
            gr.Markdown("### Generated Answer and Evaluation Metrics")
            gen_ans_display = gr.Textbox(label="Generated Answer", lines=5)
            y_pred_metrics_display = gr.JSON(label="Evaluation (y-pred) Metrics", value={})
        with gr.Column(scale=2):
            gr.Markdown("### Analysis Results")
            with gr.Accordion(label="Retrieved Chunks (Expand to View)", open=False):
                chunks_retrieved_display = gr.Dataframe(
                    headers=["Source", "Chunk Text"],
                    datatype=["str", "str"],
                    interactive=False,
                    wrap=True)
            evaluator_json_output = gr.JSON(label="Analysis Results", value={})
    subset_dropdown.change(fn=update_question_list_and_info, inputs=[subset_dropdown], outputs=[question_dropdown, orig_ans_display, y_metrics_display])
    question_dropdown.change(fn=get_info_from_sample_questions, inputs=[subset_dropdown, question_dropdown], outputs=[orig_ans_display, y_metrics_display])
    with gr.Row():
        # Export and reporting controls
        with gr.Column(scale=3):
            with gr.Accordion("Advanced Options", open=False):
                default_no_of_questions = 2
                noOfQuestions = gr.Slider(
                    minimum=1,
                    maximum=50,
                    value=default_no_of_questions,
                    step=1,
                    label="Number of Questions to Evaluate"
                )
                # Buttons with initial labels
                evaluate_btn = gr.Button(f"Run RAG Evaluation for {default_no_of_questions} Questions", variant="primary")
                evaluate_report = gr.Button("Generate Collection Report", variant="primary")

                # Keep the button label in sync with the slider value
                def update_button_label(value):
                    return gr.Button(f"Run RAG Evaluation for {value} Questions", variant="primary")

                noOfQuestions.change(
                    update_button_label,
                    inputs=noOfQuestions,
                    outputs=evaluate_btn
                )
                evaluate_btn.click(
                    fn=run_rag_pipeline_multiple_questions,
                    inputs=[
                        subset_dropdown,
                        chunking_dropdown, embed_dropdown, retriever_dropdown,
                        noOfQuestions, retriever_type, chunk_count,
                        reranking_checkbox, evaluator_dropdown
                    ],
                    outputs=[evaluator_json_output]
                )
            generate_btn = gr.Button("Generate JSON & Download")
        with gr.Column(scale=2):
            json_output = gr.Code(label="JSON Output", max_lines=50, language="json")
            download_file = gr.File(label="Download Link")
    submit_btn.click(
        fn=run_rag_pipeline,
        inputs=[
            subset_dropdown, question_dropdown, custom_question_input,
            chunking_dropdown, embed_dropdown, retriever_dropdown,
            chunk_count, retriever_type,
            reranking_checkbox, evaluator_dropdown
        ],
        outputs=[gen_ans_display, y_pred_metrics_display, chunks_retrieved_display, evaluator_json_output, download_file]
    )
    generate_btn.click(
        fn=generate_file,
        inputs=[subset_dropdown, dataset_type_dropdown,
                chunking_dropdown, embed_dropdown, retriever_dropdown,
                chunk_count, retriever_type,
                reranking_checkbox, evaluator_dropdown,
                orig_ans_display, y_metrics_display,
                gen_ans_display, y_pred_metrics_display,
                chunks_retrieved_display,
                evaluator_json_output
                ],
        outputs=[json_output, download_file]
    )
    evaluate_report.click(
        fn=generate_advance_report,
        inputs=[subset_dropdown, dataset_type_dropdown,
                chunking_dropdown, embed_dropdown, retriever_dropdown,
                chunk_count, retriever_type, noOfQuestions,
                reranking_checkbox, evaluator_dropdown
                ],
        outputs=[json_output, download_file]
    )

demo.launch(debug=True, prevent_thread_lock=True, share=True)
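
# Note on the launch flags: share=True requests a public gradio.live link and
# is ignored on Hugging Face Spaces (the Space URL is already public), and
# prevent_thread_lock=True makes launch() return instead of blocking, which is
# only needed if something must run after launch(); otherwise the blocking
# default is the safer choice.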