# HuggingFace Space: offensive-language model comparison dashboard
| import gradio as gr | |
| import pandas as pd | |
| from transformers import AutoModelForSequenceClassification, AutoTokenizer | |
| from datasets import load_dataset | |
| from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix | |
| import torch | |
| from sentence_transformers import SentenceTransformer | |
| import umap | |
| from sklearn.manifold import TSNE | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import seaborn as sns | |
| import tempfile | |
| from collections import Counter | |
| import os | |
# Route Gradio's temporary files (saved plot PNGs, uploads) to a known writable location.
temp_dir = '/tmp/gradio_tmp'
os.makedirs(temp_dir, exist_ok=True)  # Creates the directory if it does not exist
os.environ['GRADIO_TEMP_DIR'] = temp_dir

# Local checkpoint directories of the fine-tuned offensive-language classifiers to compare.
model_paths = {
    "roberta-base-offensive": "./models/roberta-base-offensive",
    "distilbert-base-uncased-offensive": "./models/distilbert-base-uncased-offensive",
    "bert-offensive": "./models/bert-offensive",
    "deberta-offensive": "./models/deberta-offensive"
}
# Eagerly load every model and its matching tokenizer at startup so requests don't pay load cost.
models = {name: AutoModelForSequenceClassification.from_pretrained(path) for name, path in model_paths.items()}
tokenizers = {name: AutoTokenizer.from_pretrained(path) for name, path in model_paths.items()}

# TweetEval "offensive" task; binary labels (the metrics code below uses average='binary').
dataset = load_dataset("tweet_eval", "offensive")

# Sentence embedder backing the UMAP / t-SNE / K-means visualizations.
model_embedding = SentenceTransformer('all-MiniLM-L6-v2')
def encode(texts, tokenizer):
    """Tokenize *texts* into fixed-length (128-token) PyTorch tensor batches."""
    tokenizer_kwargs = {
        "padding": "max_length",
        "truncation": True,
        "max_length": 128,
        "return_tensors": "pt",
    }
    return tokenizer(texts, **tokenizer_kwargs)
def predict(model, inputs):
    """Run *model* in inference mode on tokenized *inputs*.

    Returns a NumPy array of predicted class indices (argmax over logits).
    """
    model.eval()  # disable dropout etc. for deterministic inference
    with torch.no_grad():
        logits = model(**inputs).logits
    return logits.argmax(dim=-1).cpu().numpy()
def calculate_metrics(labels, preds):
    """Score predictions against gold labels.

    Returns:
        (accuracy, precision, recall, f1, confusion_matrix) — the first four
        are binary-averaged scalars, the last is the raw count matrix.
    """
    precision, recall, f1, _support = precision_recall_fscore_support(
        labels, preds, average='binary'
    )
    return (
        accuracy_score(labels, preds),
        precision,
        recall,
        f1,
        confusion_matrix(labels, preds),
    )
def generate_confusion_matrix(conf_matrix, model_name):
    """Render *conf_matrix* as an annotated heatmap and save it to a temp PNG.

    Args:
        conf_matrix: integer count matrix from sklearn's confusion_matrix.
        model_name: display name used in the plot title.

    Returns:
        Filesystem path of the saved PNG (caller/Gradio handles cleanup).
    """
    plt.figure(figsize=(5, 4))
    sns.heatmap(conf_matrix, annot=True, fmt="d")
    plt.title(f'Confusion Matrix: {model_name}')
    plt.ylabel('Actual')
    plt.xlabel('Predicted')
    plt.tight_layout()
    # Fix: NamedTemporaryFile(delete=False) kept the file object open, leaking
    # one file descriptor per plot; mkstemp + immediate close avoids that.
    fd, plot_path = tempfile.mkstemp(suffix='.png')
    os.close(fd)
    plt.savefig(plot_path)
    plt.close()
    return plot_path
def generate_embeddings_and_plot(categories):
    """Project the categorized texts to 2-D (UMAP and t-SNE) and save scatter plots.

    Args:
        categories: dict mapping category name -> list of texts; the four keys
            produced by compare_models are colored, any others are ignored.

    Returns:
        (umap_plot_path, tsne_plot_path) — paths of the saved PNGs.
    """
    all_texts = [text for texts in categories.values() for text in texts]
    embeddings = model_embedding.encode(all_texts)

    # Fix: the original recovered indices with `text in categories[category]`,
    # which is O(n^2) and assigns a duplicate text to every category containing
    # it. Since all_texts is the categories concatenated in order, each
    # category owns a contiguous index range — compute those ranges once.
    category_indices = {}
    offset = 0
    for category, texts in categories.items():
        category_indices[category] = np.arange(offset, offset + len(texts))
        offset += len(texts)

    umap_reducer = umap.UMAP(n_neighbors=15, n_components=2, metric='cosine')
    umap_embeddings = umap_reducer.fit_transform(embeddings)

    # Fix: t-SNE requires perplexity < n_samples; cap the former hard-coded 30
    # so small inputs don't raise. Unchanged for the full test split.
    perplexity = min(30, max(1, len(all_texts) - 1))
    tsne_embeddings = TSNE(n_components=2, perplexity=perplexity).fit_transform(embeddings)

    colors = {
        "correct_both": "green",
        "incorrect_both": "red",
        "correct_model1_only": "blue",
        "correct_model2_only": "orange",
    }

    def _plot_embeddings(points, title, file_suffix):
        # One scatter layer per category, colored consistently across both plots.
        plt.figure(figsize=(10, 8))
        for category, color in colors.items():
            idx = category_indices.get(category, np.array([], dtype=int))
            plt.scatter(points[idx, 0], points[idx, 1], label=category, color=color, alpha=0.6)
        plt.legend()
        plt.title(title)
        plt.xlabel('Component 1')
        plt.ylabel('Component 2')
        # mkstemp + close instead of a leaked open NamedTemporaryFile handle.
        fd, path = tempfile.mkstemp(suffix=f'_{file_suffix}.png')
        os.close(fd)
        plt.savefig(path)
        plt.close()
        return path

    umap_plot_path = _plot_embeddings(umap_embeddings, "UMAP Projection of Text Categories", "umap")
    tsne_plot_path = _plot_embeddings(tsne_embeddings, "t-SNE Projection of Text Categories", "tsne")
    return umap_plot_path, tsne_plot_path
def compare_models(model1, model2):
    """Evaluate two classifiers on the tweet_eval test split and compare them.

    Args:
        model1, model2: keys into the module-level `models` / `tokenizers` dicts.

    Returns:
        (metrics_df, conf_matrix_path1, conf_matrix_path2,
         umap_plot_path, tsne_plot_path, categories)
    """
    test_texts = dataset['test']['text']
    labels = dataset['test']['label']

    # Tokenize separately per model — each may use a different vocabulary.
    preds1 = predict(models[model1], encode(test_texts, tokenizers[model1]))
    preds2 = predict(models[model2], encode(test_texts, tokenizers[model2]))

    metrics1 = calculate_metrics(labels, preds1)
    metrics2 = calculate_metrics(labels, preds2)

    # Bucket every test text by which model(s) classified it correctly.
    categories = {
        "correct_both": [],
        "incorrect_both": [],
        "correct_model1_only": [],
        "correct_model2_only": [],
    }
    for text, label, p1, p2 in zip(test_texts, labels, preds1, preds2):
        hit1, hit2 = p1 == label, p2 == label
        if hit1 and hit2:
            categories["correct_both"].append(text)
        elif not hit1 and not hit2:
            categories["incorrect_both"].append(text)
        elif hit1:
            categories["correct_model1_only"].append(text)
        else:  # only model2 correct — the remaining combination
            categories["correct_model2_only"].append(text)

    # Scalar metrics only; the trailing confusion matrix is sliced off of each tuple.
    metrics_df = pd.DataFrame({
        "Metric": ["Accuracy", "Precision", "Recall", "F1 Score"],
        model1: metrics1[:-1],
        model2: metrics2[:-1],
    })
    relative_gap = (metrics_df[model1] - metrics_df[model2]) / metrics_df[model2] * 100
    metrics_df["% Difference"] = relative_gap.apply(lambda x: f"{x:.2f}%")

    conf_matrix_path1 = generate_confusion_matrix(metrics1[-1], model1)
    conf_matrix_path2 = generate_confusion_matrix(metrics2[-1], model2)
    umap_plot_path, tsne_plot_path = generate_embeddings_and_plot(categories)
    return metrics_df, conf_matrix_path1, conf_matrix_path2, umap_plot_path, tsne_plot_path, categories
| from sklearn.cluster import KMeans | |
def generate_embeddings_and_cluster(categories, n_clusters=3):
    """K-means-cluster the categorized texts in embedding space and visualize.

    Args:
        categories: dict mapping category name -> list of texts.
        n_clusters: number of K-means clusters (generalized from the former
            hard-coded 3; default preserves the original behavior).

    Returns:
        (cluster_plot_path, overall_distribution_percent, cluster_distributions)
        where the last two give the category mix overall and per cluster, in %.
    """
    all_texts = [text for texts in categories.values() for text in texts]
    embeddings = model_embedding.encode(all_texts)

    # Category label for every text, aligned with all_texts order.
    category_labels = [cat for cat, texts in categories.items() for _ in texts]

    # Overall share of each category across all texts.
    total = len(category_labels) or 1  # guard against an empty input
    overall_distribution_percent = {
        cat: count / total * 100 for cat, count in Counter(category_labels).items()
    }

    # Fixed random_state keeps cluster assignments reproducible between runs.
    kmeans = KMeans(n_clusters=n_clusters, random_state=42).fit(embeddings)
    labels = kmeans.labels_

    # Collect the category of each member, per cluster.
    cluster_members = [[] for _ in range(n_clusters)]
    for cluster_id, category in zip(labels, category_labels):
        cluster_members[cluster_id].append(category)

    # Category mix within each cluster, as percentages.
    cluster_distributions = []
    for members in cluster_members:
        size = len(members) or 1  # an empty cluster would otherwise divide by zero
        cluster_distributions.append(
            {cat: count / size * 100 for cat, count in Counter(members).items()}
        )

    # 2-D UMAP projection purely for display of the cluster assignment.
    umap_reducer = umap.UMAP(n_neighbors=15, n_components=2, metric='cosine')
    reduced_embeddings = umap_reducer.fit_transform(embeddings)

    plt.figure(figsize=(10, 8))
    scatter = plt.scatter(reduced_embeddings[:, 0], reduced_embeddings[:, 1],
                          c=labels, cmap='viridis', alpha=0.6)
    plt.legend(*scatter.legend_elements(), title="Clusters")
    plt.title("K-means Clustering of Text Embeddings")
    plt.xlabel('UMAP 1')
    plt.ylabel('UMAP 2')

    # Fix: mkstemp + close instead of a leaked open NamedTemporaryFile handle.
    fd, cluster_plot_path = tempfile.mkstemp(suffix='_cluster.png')
    os.close(fd)
    plt.savefig(cluster_plot_path)
    plt.close()
    return cluster_plot_path, overall_distribution_percent, cluster_distributions
def setup_gradio_interface():
    """Build the Gradio Blocks UI wiring the comparison pipeline to widgets.

    Returns:
        The constructed (not yet launched) gr.Blocks app.

    NOTE(review): the original source had its indentation flattened; the
    nesting below (dropdowns in the first Row, images paired in Rows) is
    reconstructed from the with-blocks and should be confirmed against the
    intended layout.
    """
    with gr.Blocks() as demo:
        gr.Markdown("## Model Comparison and Text Analysis")
        # Two model pickers side by side, sharing the same configured model list.
        with gr.Row():
            model1_input = gr.Dropdown(list(model_paths.keys()), label="Model 1")
            model2_input = gr.Dropdown(list(model_paths.keys()), label="Model 2")
        submit_button = gr.Button("Compare")
        metrics_output = gr.Dataframe()
        with gr.Row():
            model1_cm_output = gr.Image(label="Confusion Matrix for Model 1")
            model2_cm_output = gr.Image(label="Confusion Matrix for Model 2")
        with gr.Row():
            umap_visualization_output = gr.Image(label="UMAP Text Categorization Visualization")
            tsne_visualization_output = gr.Image(label="t-SNE Text Categorization Visualization")
        clustering_visualization_output = gr.Image(label="K-means Clustering Visualization")
        category_distribution_output = gr.Dataframe(label="Category Distribution Comparison")

        def update_interface(model1, model2):
            # Full pipeline: metrics/plots first, then clustering over the
            # per-category texts returned by compare_models.
            metrics_df, cm_path1, cm_path2, umap_viz_path, tsne_viz_path, categories = compare_models(model1, model2)
            cluster_viz_path, overall_distribution_percent, cluster_distributions = generate_embeddings_and_cluster(categories)
            # Flatten per-cluster category shares into rows, including each
            # share's delta versus the overall category share.
            distribution_data = []
            for cluster_index, cluster_distribution in enumerate(cluster_distributions, start=1):
                for category, percent in cluster_distribution.items():
                    distribution_data.append({
                        "Cluster": f"Cluster {cluster_index}",
                        "Category": category,
                        "Percentage": f"{percent:.2f}%",
                        "Difference from Overall": f"{percent - overall_distribution_percent.get(category, 0):.2f}%"
                    })
            distribution_df = pd.DataFrame(distribution_data)
            return metrics_df, cm_path1, cm_path2, umap_viz_path, tsne_viz_path, cluster_viz_path, distribution_df

        submit_button.click(
            update_interface,
            inputs=[model1_input, model2_input],
            outputs=[metrics_output, model1_cm_output, model2_cm_output, umap_visualization_output, tsne_visualization_output, clustering_visualization_output, category_distribution_output]
        )
    return demo
# Build the app at import time (hosting platforms may look for a module-level
# `demo`), but only launch when the file is run directly — previously a bare
# module-level launch opened a public share tunnel on any import.
demo = setup_gradio_interface()

if __name__ == "__main__":
    # NOTE(review): share=True exposes the app via a public URL; confirm that
    # is intended for this deployment.
    demo.launch(share=True)