Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import time | |
| from sklearn.base import BaseEstimator, clone | |
| from sklearn.cluster import AgglomerativeClustering | |
| from sklearn.datasets import make_blobs | |
| from sklearn.ensemble import RandomForestClassifier | |
| from sklearn.inspection import DecisionBoundaryDisplay | |
| from sklearn.utils.metaestimators import available_if | |
| from sklearn.utils.validation import check_is_fitted | |
# Markdown description of the demo; it is rendered near the top of the page
# by the gr.Markdown(model_card) call in the UI section.
model_card = """
## Description
**Clustering** can be costly, especially when we have a lot of data.
Some clustering algorithms cannot be used with new data without redoing the clustering, which can be difficult.
Instead, we can use clustering to create a model with a classifier, it calls **Inductive Clustering**
This demo illustrates a generic implementation of a meta-estimator which extends clustering by inducing a classifier from the cluster labels, and compares the running time.
You can play around with different ``number of samples`` and ``number of new data`` to see the effect
## Dataset
Simulation dataset
"""
| def _classifier_has(attr): | |
| """Check if we can delegate a method to the underlying classifier. | |
| First, we check the first fitted classifier if available, otherwise we | |
| check the unfitted classifier. | |
| """ | |
| return lambda estimator: ( | |
| hasattr(estimator.classifier_, attr) | |
| if hasattr(estimator, "classifier_") | |
| else hasattr(estimator.classifier, attr) | |
| ) | |
class InductiveClusterer(BaseEstimator):
    """Meta-estimator that induces a classifier from cluster labels.

    ``fit`` clusters the training data with a clone of ``clusterer`` and then
    trains a clone of ``classifier`` on the resulting labels, so that unseen
    samples can later be assigned to a cluster through the classifier.

    Parameters
    ----------
    clusterer : estimator
        Object whose ``fit_predict(X)`` yields one cluster label per sample.
    classifier : estimator
        Object trained with ``fit(X, labels)`` on those cluster labels.
    """

    def __init__(self, clusterer, classifier):
        self.clusterer = clusterer
        self.classifier = classifier

    def fit(self, X, y=None):
        """Cluster ``X`` and fit the classifier on the cluster labels.

        ``y`` is accepted for API compatibility but not used: the training
        targets are the labels produced by the clusterer. Returns ``self``.
        """
        # Work on clones so the estimators passed to __init__ stay unfitted.
        self.clusterer_ = clone(self.clusterer)
        self.classifier_ = clone(self.classifier)
        cluster_labels = self.clusterer_.fit_predict(X)
        self.classifier_.fit(X, cluster_labels)
        return self

    # The delegating methods are only exposed when the wrapped classifier
    # actually provides them, so hasattr() on this estimator is truthful.
    @available_if(_classifier_has("predict"))
    def predict(self, X):
        """Return the fitted classifier's predictions for ``X``."""
        check_is_fitted(self)
        return self.classifier_.predict(X)

    @available_if(_classifier_has("decision_function"))
    def decision_function(self, X):
        """Return the fitted classifier's decision function values for ``X``."""
        check_is_fitted(self)
        return self.classifier_.decision_function(X)
def do_train(n_samples, n_new_data):
    """Build the four demo plots and time the two labelling strategies.

    A three-blob dataset is clustered with agglomerative clustering, a second
    batch of "new" points is drawn around shifted centers, and those points
    are then assigned to clusters in two ways:

    1. inductively: an ``InductiveClusterer`` (clusterer + random forest) is
       fitted on the original data only and predicts the new points;
    2. by recomputing: the clustering is redone on original + new data and a
       random forest is fitted on that result.

    Each timed section also includes drawing its decision-boundary plot.

    Parameters
    ----------
    n_samples : int
        Number of points in the original dataset.
    n_new_data : int
        Number of new points to classify.

    Returns
    -------
    tuple
        ``(fig1, fig2, fig3, fig4, text)``: the clustering plot, the plot with
        the unlabelled new points, the inductive decision-boundary plot, the
        recomputed decision-boundary plot, and a sentence reporting both
        running times.
    """
    random_state = 42
    opened_figures = []

    def new_axes():
        # Remember every pyplot figure so it can be released in ``finally``.
        fig, ax = plt.subplots()
        opened_figures.append(fig)
        return fig, ax

    try:
        # Generate some training data from clustering
        X, _ = make_blobs(
            n_samples=n_samples,
            cluster_std=[1.0, 1.0, 0.5],
            centers=[(-5, -5), (0, 0), (5, 5)],
            random_state=random_state,
        )

        # Train a clustering algorithm on the training data and get the
        # cluster labels
        clusterer = AgglomerativeClustering(n_clusters=3)
        cluster_labels = clusterer.fit_predict(X)

        fig1, axes1 = new_axes()
        axes1.scatter(X[:, 0], X[:, 1], c=cluster_labels, alpha=0.5, edgecolor="k")
        axes1.set_title("Ward Linkage")

        # Generate new samples and plot them along with the original dataset
        X_new, _ = make_blobs(
            n_samples=n_new_data,
            centers=[(-7, -1), (-2, 4), (3, 6)],
            random_state=random_state,
        )
        X_all = np.concatenate((X, X_new), axis=0)

        fig2, axes2 = new_axes()
        axes2.scatter(X[:, 0], X[:, 1], c=cluster_labels, alpha=0.5, edgecolor="k")
        axes2.scatter(X_new[:, 0], X_new[:, 1], c="black", alpha=1, edgecolor="k")
        axes2.set_title("Unknown instances")

        # Strategy 1: fit the inductive model on the original data only and
        # use it to predict cluster membership for the unknown instances.
        # perf_counter() is a monotonic clock meant for measuring intervals.
        t1 = time.perf_counter()
        classifier = RandomForestClassifier(random_state=random_state)
        inductive_learner = InductiveClusterer(clusterer, classifier).fit(X)
        probable_clusters = inductive_learner.predict(X_new)

        fig3, axes3 = new_axes()
        disp = DecisionBoundaryDisplay.from_estimator(
            inductive_learner, X, response_method="predict", alpha=0.4, ax=axes3
        )
        disp.ax_.set_title("Classify unknown instances with known clusters")
        disp.ax_.scatter(X[:, 0], X[:, 1], c=cluster_labels, alpha=0.5, edgecolor="k")
        disp.ax_.scatter(
            X_new[:, 0], X_new[:, 1], c=probable_clusters, alpha=0.5, edgecolor="k"
        )
        t1_running = time.perf_counter() - t1

        # Strategy 2: recompute the clustering on all data and fit a fresh
        # classifier on the new labels.
        t2 = time.perf_counter()
        reclusterer = AgglomerativeClustering(n_clusters=3)
        all_labels = reclusterer.fit_predict(X_all)
        refit_classifier = RandomForestClassifier(random_state=random_state).fit(
            X_all, all_labels
        )

        fig4, axes4 = new_axes()
        disp = DecisionBoundaryDisplay.from_estimator(
            refit_classifier, X_all, response_method="predict", alpha=0.4, ax=axes4
        )
        disp.ax_.set_title("Classify unknown instance with recomputing clusters")
        disp.ax_.scatter(
            X_all[:, 0], X_all[:, 1], c=all_labels, alpha=0.5, edgecolor="k"
        )
        t2_running = time.perf_counter() - t2

        text = f"Inductive Clustering running time: {t1_running:.4f}s. Recomputing clusters running time: {t2_running:.4f}s"
        return fig1, fig2, fig3, fig4, text
    finally:
        # pyplot keeps every figure it creates registered until it is closed;
        # without this, figures pile up on each call. The Figure objects
        # themselves remain valid for the caller after being closed.
        for fig in opened_figures:
            plt.close(fig)
# Page layout: title, description, two sliders, a 2x2 grid of plots and a
# textbox that shows the measured running times.
with gr.Blocks() as demo:
    gr.Markdown('''
    <div>
    <h1 style='text-align: center'>Inductive Clustering</h1>
    </div>
    ''')
    gr.Markdown(model_card)
    gr.Markdown(
        'Author: <a href="https://huggingface.co/vumichien">Vu Minh Chien</a>. '
        'Based on the example from '
        '<a href="https://scikit-learn.org/stable/auto_examples/cluster/plot_inductive_clustering.html#sphx-glr-auto-examples-cluster-plot-inductive-clustering-py">scikit-learn</a>'
    )
    n_samples = gr.Slider(minimum=1000, maximum=5000, step=500, value=1000, label="Number of samples")
    n_new_data = gr.Slider(minimum=10, maximum=100, step=10, value=10, label="Number of new data")
    with gr.Row():
        with gr.Column():
            plot1 = gr.Plot(label="Clustering")
        with gr.Column():
            plot2 = gr.Plot(label="Clustering with new data")
    with gr.Row():
        with gr.Column():
            plot3 = gr.Plot(label="Inductive clustering")
        with gr.Column():
            plot4 = gr.Plot(label="Recomputing clustering")
    with gr.Row():
        results = gr.Textbox(label="Results")

    # Moving either slider re-runs do_train with the current pair of values
    # and refreshes all four plots plus the timing text.
    train_inputs = [n_samples, n_new_data]
    train_outputs = [plot1, plot2, plot3, plot4, results]
    for slider in (n_samples, n_new_data):
        slider.change(fn=do_train, inputs=train_inputs, outputs=train_outputs)

# Start the server only when executed as a script, so that importing this
# module does not launch the app as a side effect.
if __name__ == "__main__":
    demo.launch()