import numpy as np
from sklearn.metrics import mean_squared_error, roc_auc_score
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from data_processing import load_query_dataset

# Module-level state shared across metric calculations
ground_truth_answer = ''
ground_truth_metrics = {}
rmse_scores = {}
# Legacy version, kept for reference:
# def calculate_metrics(question, response, docs, time_taken):
#     data = load_ragbench()
#     retrieve_ground_truths(question, data)
#     # Predicted metrics
#     predicted_metrics = {
#         "ground_truth": ground_truth_answer,
#         "context_relevance": context_relevance(question, docs),
#         "context_utilization": context_utilization(response, docs),
#         "completeness": completeness(response, ground_truth_answer),
#         "adherence": adherence(response, docs),
#         "response_time": time_taken
#     }
#     return predicted_metrics

# def retrieve_ground_truths(question, ragbench_set):
#     for dataset_name in ragbench_set.keys():
#         for split_name, instances in ragbench_set[dataset_name].items():
#             print(f"Processing {split_name} split")
#             for instance in instances:
#                 # Check if the question (data) matches the query
#                 if instance['question'] == question:
#                     # If a match is found, retrieve id and response
#                     instance_id = instance['id']
#                     instance_response = instance['response']
#                     ground_truth_metrics = {
#                         "context_relevance": instance['relevance_score'],
#                         "context_utilization": instance['utilization_score'],
#                         "completeness": instance['completeness_score'],
#                         "adherence": instance['adherence_score']
#                     }
#                     ground_truth_answer = instance_response
#                     print(f"Match found in {split_name} split!")
#                     print(f"ID: {instance_id}, Response: {instance_response}")
#                     break  # Exit after finding the first match (optional)
# Step 1: Helper function to compute cosine similarity
def compute_cosine_similarity(text1, text2):
    if not text1 or not text2:  # Check for empty or None values
        print("Error: One or both input texts are empty. Returning similarity as 0.")
        return 0.0

    vectorizer = TfidfVectorizer(stop_words="english")
    try:
        vectors = vectorizer.fit_transform([text1, text2])
        similarity = cosine_similarity(vectors[0], vectors[1])[0][0]
        return similarity
    except ValueError as e:
        print(f"Error in vectorization: {e}. Returning similarity as 0.")
        return 0.0
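# Illustrative usage (not part of the pipeline; the strings below are made up).
# The TF-IDF vectorizer is fitted on just the two inputs, so shared vocabulary drives the score:
#
#   compute_cosine_similarity("capital of France", "Paris is the capital of France")
#   # -> a float in (0, 1]; texts with no shared terms score close to 0.0
#   compute_cosine_similarity("", "anything")
#   # -> 0.0, via the empty-input guard above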
# Step 2: Metric 1 - Context Relevance
def context_relevance(question, relevant_documents):
    # combined_docs = " ".join([doc.page_content for doc in relevant_documents])
    combined_docs = " ".join(relevant_documents)
    return compute_cosine_similarity(question, combined_docs)
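# Illustrative sketch (hypothetical values): for a question like "Who wrote Hamlet?"
# and retrieved docs ["Hamlet is a tragedy written by William Shakespeare.", ...],
# the docs are joined into one string and context_relevance returns the TF-IDF cosine
# similarity between the question and that combined text.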
# Step 3: Metric 2 - Context Utilization
def context_utilization(response, relevant_documents):
    # combined_docs = " ".join([doc.page_content for doc in relevant_documents])
    combined_docs = " ".join(relevant_documents)
    return compute_cosine_similarity(response, combined_docs)

# Step 4: Metric 3 - Completeness
def completeness(response, ground_truth_answer):
    return compute_cosine_similarity(response, ground_truth_answer)
# Step 5: Metric 4 - Adherence
def adherence(response, relevant_documents):
    # combined_docs = " ".join([doc.page_content for doc in relevant_documents])
    combined_docs = " ".join(relevant_documents)
    response_tokens = set(response.split())
    if not response_tokens:  # Guard against division by zero for an empty response
        return 0.0
    relevant_tokens = set(combined_docs.split())
    supported_tokens = response_tokens.intersection(relevant_tokens)
    return len(supported_tokens) / len(response_tokens)
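# Worked example (made-up strings, not from the dataset): for
# response = "Paris is the capital" and docs = ["Paris is the capital of France"],
# all 4 unique response tokens appear in the documents, so adherence returns
# 4 / 4 = 1.0; each response token missing from the docs lowers the score proportionally.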
# Step 6: Compute RMSE for metrics
def compute_rmse(predicted_values, ground_truth_values):
    # Ensure that both predicted_values and ground_truth_values are numeric
    if all(isinstance(i, (int, float)) for i in predicted_values) and all(isinstance(i, (int, float)) for i in ground_truth_values):
        return np.sqrt(mean_squared_error(ground_truth_values, predicted_values))
    else:
        print("Invalid input for RMSE calculation. Ensure all values are numeric.")
        return None
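# Note: store_rmse() below calls this with single-element lists, in which case the RMSE
# reduces to the absolute difference, e.g. compute_rmse([0.7], [0.9]) -> ~0.2
# (sqrt((0.9 - 0.7)^2)), while a non-numeric value such as compute_rmse([0.7], ["N/A"])
# returns None.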
def convert_adherence_to_numerical(adherence_score):
    # Map the boolean adherence label to the numeric scale used for RMSE comparison
    if adherence_score:
        return 0.8  # True maps to 0.8
    else:
        return 0.5  # False maps to 0.5
def retrieve_ground_truths(question, dataset, time_taken):
    """Retrieve the ground truth answer and metrics for a given question from the dataset."""
    for split_name, instances in dataset.items():
        for instance in instances:
            if instance['question'] == question:
                instance_response = instance['response']
                adherence_numerical = convert_adherence_to_numerical(instance['adherence_score'])
                ground_truth_metrics = {
                    "context_relevance": instance['relevance_score'],
                    "context_utilization": instance['utilization_score'],
                    "completeness": instance['completeness_score'],
                    "adherence": adherence_numerical,
                    "response_time": time_taken
                }
                return instance_response, ground_truth_metrics  # Return the ground truth response immediately
    return None, None  # Return None if no match is found
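# Assumed instance schema (inferred from the lookups above, not verified against the
# actual dataset): each instance is a dict with at least 'question', 'response',
# 'relevance_score', 'utilization_score', 'completeness_score', and a boolean
# 'adherence_score', grouped under split names such as 'train' or 'test'.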
def store_rmse(question, predicted_metrics, ground_truth_metrics):
    """Calculate and store RMSE for each metric."""
    for metric_name in predicted_metrics:
        predicted_value = predicted_metrics[metric_name]
        # Get the corresponding ground truth value from ground_truth_metrics
        ground_truth_value = ground_truth_metrics.get(metric_name, None)

        # Debugging: Check the values being compared
        print(f"Comparing {metric_name}: Predicted = {predicted_value}, Ground Truth = {ground_truth_value}")

        # Ensure both predicted value and ground truth value are numeric before calculating RMSE
        if isinstance(predicted_value, (int, float)) and isinstance(ground_truth_value, (int, float)):
            rmse_value = compute_rmse([predicted_value], [ground_truth_value])
            if rmse_value is not None:
                print(f"RMSE for {metric_name}: {rmse_value}")
                if question not in rmse_scores:
                    rmse_scores[question] = {}
                rmse_scores[question][metric_name] = rmse_value
        else:
            print(f"Skipping RMSE for {metric_name}: One or both values are non-numeric")
def calculate_metrics(question, q_dataset, response, docs, time_taken):
    data = load_query_dataset(q_dataset)
    ground_truth_answer, ground_truth_metrics = retrieve_ground_truths(question, data, time_taken)  # Store the ground truth answer

    # Ensure the ground truth values are usable before proceeding
    if ground_truth_answer is None:
        ground_truth_answer = ""  # Default to an empty string if no ground truth is found
    if ground_truth_metrics is None:
        ground_truth_metrics = {}  # Default to an empty dict so the RMSE loops below are safe

    # Convert ground truth to numeric form (e.g., using cosine similarity or some metric)
    # Here, completeness is based on cosine similarity between the response and the ground truth
    # ground_truth_completeness = compute_cosine_similarity(response, ground_truth_answer)

    # Predicted metrics
    predicted_metrics_rmse = {
        "context_relevance": context_relevance(question, docs),
        "context_utilization": context_utilization(response, docs),
        "completeness": compute_cosine_similarity(response, ground_truth_answer),  # completeness(response, ground_truth_answer)
        "adherence": adherence(response, docs),
        "response_time": time_taken
    }
    store_rmse(question, predicted_metrics_rmse, ground_truth_metrics)
    # Now, make sure the values passed to RMSE calculation are numeric
    # predicted_completeness = predicted_metrics['completeness']
    # Ensure both predicted_completeness and ground_truth_completeness are numeric before calculating RMSE
    '''
    if isinstance(predicted_completeness, (int, float)) and isinstance(ground_truth_completeness, (int, float)):
        rmse_value = compute_rmse([predicted_completeness], [ground_truth_completeness])
        predicted_metrics["rmse"] = rmse_value  # Adding RMSE to metrics
    else:
        predicted_metrics["rmse"] = "Invalid RMSE calculation"
    '''

    # Debugging: log the predicted and ground truth values for each metric
    for metric_name in predicted_metrics_rmse:
        predicted_value = predicted_metrics_rmse[metric_name]
        print(f"Predicted {metric_name}: {predicted_value}")
    for metric_name in ground_truth_metrics:
        ground_truth_value = ground_truth_metrics[metric_name]
        print(f"Ground truth {metric_name}: {ground_truth_value}")
    # Collect numeric metric pairs and compute an overall RMSE across them
    overall_rmse = None
    rmse_values = []
    ground_truth_values = []

    for metric_name in predicted_metrics_rmse:
        predicted_value = predicted_metrics_rmse[metric_name]
        ground_truth_value = ground_truth_metrics.get(metric_name, None)

        # Ensure both predicted and ground truth values are numeric
        if isinstance(predicted_value, (int, float)) and isinstance(ground_truth_value, (int, float)):
            rmse_values.append(predicted_value)
            ground_truth_values.append(ground_truth_value)
        else:
            print(f"Skipping RMSE for {metric_name}: One or both values are non-numeric")

    if rmse_values and ground_truth_values:
        overall_rmse = compute_rmse(rmse_values, ground_truth_values)
        print(f"Overall RMSE: {overall_rmse}")
    else:
        print("Invalid RMSE calculation due to non-numeric values.")
    predicted_metrics = {
        "RAG_model_response": response,
        "ground_truth": ground_truth_answer,
        "context_relevance": context_relevance(question, docs),
        "context_utilization": context_utilization(response, docs),
        "completeness": compute_cosine_similarity(response, ground_truth_answer),  # completeness(response, ground_truth_answer)
        "adherence": adherence(response, docs),
        "response_time": time_taken,
        "rmse": overall_rmse
    }
    '''
    if isinstance(predicted_metrics_rmse, (int, float)) and isinstance(ground_truth_metrics, (int, float)):
        rmse_value = compute_rmse(predicted_metrics_rmse.values(), ground_truth_metrics.values())
        predicted_metrics_rmse["rmse"] = rmse_value  # Adding RMSE to metrics
    else:
        predicted_metrics_rmse["rmse"] = "Invalid RMSE calculation"
    '''
    return predicted_metrics
'''
Legacy version, kept for reference:
def retrieve_ground_truths(question, dataset):
    for split_name, instances in dataset.items():
        print(f"Processing {split_name} split")
        for instance in instances:
            if instance['question'] == question:
                instance_id = instance['id']
                instance_response = instance['response']
                # ground_truth_metrics = {
                #     "context_relevance": instance['relevance_score'],
                #     "context_utilization": instance['utilization_score'],
                #     "completeness": instance['completeness_score'],
                #     "adherence": instance['adherence_score']
                # }
                print(f"Match found in {split_name} split!")
                print(f"ID: {instance_id}, Response: {instance_response}")
                return instance_response  # Return ground truth response immediately
    return None  # Return None if no match is found
'''
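# Minimal usage sketch (assumptions: "covidqa" is only a placeholder dataset key for
# load_query_dataset, and the question/response/docs below are made-up values; replace
# them with real pipeline outputs before running).
if __name__ == "__main__":
    example_question = "What is the incubation period of COVID-19?"
    example_response = "The incubation period is typically 2 to 14 days."
    example_docs = [
        "Studies report an incubation period of 2 to 14 days for COVID-19.",
    ]
    metrics = calculate_metrics(
        question=example_question,
        q_dataset="covidqa",   # hypothetical dataset key
        response=example_response,
        docs=example_docs,
        time_taken=1.23,       # seconds, as measured by the caller
    )
    print(metrics)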