| | import numpy as np |
| | from chromadb import Client, Settings |
| | from sklearn.decomposition import PCA |
| | import hdbscan |
| | from sklearn.cluster import KMeans, OPTICS |
| | from sklearn.metrics import silhouette_score, calinski_harabasz_score |
| | import matplotlib.pyplot as plt |
| | from typing import List, Dict, Any, Tuple |
| | import seaborn as sns |
| | from tqdm import tqdm |
| | import joblib |
| | import os |
| | import json |
| | import argparse |
| | from datetime import datetime |
| | import warnings |
| |
|
| | |
| | import cudf |
| | import cuml |
| | from cuml.cluster import HDBSCAN as cuHDBSCAN |
| | from cuml.cluster import KMeans as cuKMeans |
| | from cuml.manifold import UMAP as cuUMAP |
| | import cupy as cp |
| |
|
| | |
# Silence noisy FutureWarning/UserWarning chatter (e.g. from sklearn/umap)
# that would otherwise flood the console during long clustering runs.
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('ignore', category=UserWarning)
| |
|
class ClusterAnalyzer:
    """Clustering pipeline over embeddings stored in a ChromaDB collection.

    Typical flow::

        analyzer = ClusterAnalyzer(...)
        analyzer.load_embeddings()
        analyzer.reduce_dimensions(...)
        analyzer.cluster_hdbscan(...)   # or cluster_optics / cluster_kmeans
        analyzer.visualize_clusters()
        analyzer.save_results()

    Every parameter choice and computed metric is accumulated in
    ``self.experiment_results`` and written to JSON by :meth:`save_results`.

    NOTE(review): cudf/cuml/cupy are imported unconditionally at module top,
    so even CPU-only runs (use_gpu=False) currently require RAPIDS to be
    installed — confirm whether those imports should be made optional.
    """

    # Default on-disk cache of the raw embeddings.
    # NOTE(review): hard-coded, machine-specific absolute path kept only for
    # backward compatibility; pass ``cache_file`` to load_embeddings() to
    # override it.
    DEFAULT_EMBEDDINGS_CACHE = '/home/dyvm6xra/dyvm6xrauser11/workspace/projects/HKU/Chatbot/Data/Embeddings/embeddings_703df19c43bd6565563071b97e7172ce.npy'

    def __init__(self, chroma_uri: str = "./Data/database", output_dir: str = None,
                 experiment_name: str = None, use_gpu: bool = True):
        """Open the ChromaDB collection and prepare the output directory.

        Args:
            chroma_uri: persist directory of the ChromaDB database.
            output_dir: directory for plots and JSON results (created if missing).
            experiment_name: prefix for all output files; a timestamped name
                is generated when omitted.
            use_gpu: when True use the RAPIDS cuML implementations
                (cuUMAP / cuHDBSCAN / cuKMeans) instead of the CPU ones.
        """
        self.chroma_uri = chroma_uri
        self.client = Client(Settings(
            persist_directory=chroma_uri,
            anonymized_telemetry=False,
            is_persistent=True
        ))
        self.collection = self.client.get_collection("healthcare_qa")
        self.embeddings = None           # raw embeddings, shape (n_samples, dim)
        self.reduced_embeddings = None   # output of reduce_dimensions()
        self.labels = None               # labels from the most recent clustering
        self.use_gpu = use_gpu

        self.results_dir = output_dir if output_dir else "./clustering_results"
        os.makedirs(self.results_dir, exist_ok=True)

        self.experiment_name = (
            experiment_name
            if experiment_name
            else f"experiment_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        )

        # Running record of everything this experiment does.
        self.experiment_results = {
            "experiment_name": self.experiment_name,
            "timestamp": datetime.now().isoformat(),
            "parameters": {},
            "metrics": {}
        }

    def load_embeddings(self, cache_file: str = None) -> np.ndarray:
        """Load embeddings, preferring the local .npy cache over the database.

        Args:
            cache_file: path of the .npy cache file.  Defaults to
                ``DEFAULT_EMBEDDINGS_CACHE`` for backward compatibility.

        Returns:
            The embeddings as a 2-D numpy array.
        """
        print("正在加载embeddings...")
        if cache_file is None:
            cache_file = self.DEFAULT_EMBEDDINGS_CACHE
        if os.path.exists(cache_file):
            self.embeddings = np.load(cache_file)
            print(f"从缓存文件加载embeddings,数据形状: {self.embeddings.shape}")
        else:
            result = self.collection.get(include=["embeddings"])
            self.embeddings = np.array(result["embeddings"])
            # Robustness: np.save fails if the cache directory is missing.
            cache_dir = os.path.dirname(cache_file)
            if cache_dir:
                os.makedirs(cache_dir, exist_ok=True)
            np.save(cache_file, self.embeddings)
            print(f"从数据库加载embeddings,并保存到缓存文件,数据形状: {self.embeddings.shape}")

        self.experiment_results["data_info"] = {
            "embeddings_shape": self.embeddings.shape
        }

        return self.embeddings

    def reduce_dimensions(self, method: str = "umap", n_components: int = 50,
                          umap_n_neighbors: int = 50, umap_min_dist: float = 0.2) -> np.ndarray:
        """Reduce the working embeddings to ``n_components`` dimensions.

        Args:
            method: "umap" or "pca" (case-insensitive).
            n_components: target dimensionality.
            umap_n_neighbors: UMAP ``n_neighbors`` parameter (ignored for PCA).
            umap_min_dist: UMAP ``min_dist`` parameter (ignored for PCA).

        Returns:
            The reduced embeddings (also stored in ``self.reduced_embeddings``).
        """
        if self.embeddings is None:
            self.load_embeddings()

        # BUG FIX: a second call (the "pca_umap" two-step pipeline in main())
        # previously ignored the first reduction and re-reduced the raw
        # embeddings.  Chain from the latest representation instead.
        source = (self.reduced_embeddings
                  if self.reduced_embeddings is not None
                  else self.embeddings)

        print(f"使用 {method} 进行降维...")

        self.experiment_results["parameters"]["dimension_reduction"] = {
            "method": method,
            "n_components": n_components
        }

        if method.lower() == "umap":
            self.experiment_results["parameters"]["dimension_reduction"].update({
                "umap_n_neighbors": umap_n_neighbors,
                "umap_min_dist": umap_min_dist
            })

            if self.use_gpu:
                print("使用 GPU 加速的 UMAP...")
                source_gpu = cp.array(source)
                reducer = cuUMAP(
                    n_components=n_components,
                    n_neighbors=umap_n_neighbors,
                    min_dist=umap_min_dist,
                    metric='cosine',
                    random_state=42,
                    verbose=True
                )
                # Compute on GPU, then bring the result back to host memory.
                self.reduced_embeddings = cp.asnumpy(reducer.fit_transform(source_gpu))
            else:
                # BUG FIX: UMAP was referenced here without being imported in
                # this scope (the only ``from umap import UMAP`` was a stray
                # line inside cluster_hdbscan's CPU branch).
                from umap import UMAP
                reducer = UMAP(
                    n_components=n_components,
                    n_neighbors=umap_n_neighbors,
                    min_dist=umap_min_dist,
                    metric='cosine',
                    random_state=42,
                    n_jobs=-1,
                    low_memory=True,
                    verbose=True
                )
                self.reduced_embeddings = reducer.fit_transform(source)

        elif method.lower() == "pca":
            reducer = PCA(
                n_components=n_components,
                random_state=42,
                svd_solver='randomized'
            )
            self.reduced_embeddings = reducer.fit_transform(source)
            cumulative_variance = np.cumsum(reducer.explained_variance_ratio_)
            print(f"PCA累积解释方差比: {cumulative_variance[-1]:.4f}")

            self.experiment_results["metrics"]["pca_cumulative_variance"] = float(cumulative_variance[-1])

            # Scree-style plot of the cumulative explained variance.
            plt.figure(figsize=(10, 5))
            plt.plot(range(1, len(cumulative_variance) + 1), cumulative_variance, 'bo-')
            plt.xlabel('主成分数量')
            plt.ylabel('累积解释方差比')
            plt.title('PCA Cumulative Explained Variance Ratio')
            plt.grid(True)
            plt.savefig(os.path.join(self.results_dir, f'{self.experiment_name}_pca_variance_ratio.png'))
            plt.close()

        return self.reduced_embeddings

    def _clustering_input(self) -> np.ndarray:
        """Return the reduced embeddings if available, else the raw ones."""
        return self.reduced_embeddings if self.reduced_embeddings is not None else self.embeddings

    def _record_quality_metrics(self, data: np.ndarray) -> None:
        """Compute silhouette / Calinski-Harabasz scores and record them.

        The silhouette score runs on a 10k-point sample for speed.  Failures
        (e.g. degenerate labelings) are reported but never abort the run.
        """
        try:
            silhouette_avg = silhouette_score(data, self.labels, sample_size=10000)
            calinski_avg = calinski_harabasz_score(data, self.labels)

            print(f"轮廓系数: {silhouette_avg:.4f}")
            print(f"Calinski-Harabasz指数: {calinski_avg:.4f}")

            self.experiment_results["metrics"].update({
                "silhouette_score": float(silhouette_avg),
                "calinski_harabasz_score": float(calinski_avg)
            })
        except Exception as e:
            print(f"计算评估指标时出错: {e}")

    def _record_density_results(self, data: np.ndarray) -> None:
        """Shared reporting for density clusterers (HDBSCAN / OPTICS)."""
        labels = np.asarray(self.labels)
        # Label -1 marks noise and does not count as a cluster.
        n_clusters = len(set(labels.tolist())) - (1 if -1 in labels else 0)
        n_noise = int(np.count_nonzero(labels == -1))
        noise_ratio = n_noise / len(labels)

        print(f"发现 {n_clusters} 个聚类")
        print(f"噪声点数量: {n_noise} ({noise_ratio:.2%})")

        # Quality scores are meaningless with fewer than two clusters.
        if n_clusters > 1:
            self._record_quality_metrics(data)

        self.experiment_results["metrics"].update({
            "n_clusters": n_clusters,
            "n_noise": n_noise,
            "noise_ratio": float(noise_ratio)
        })

    def cluster_hdbscan(self, min_cluster_size: int = 100, min_samples: int = 10) -> np.ndarray:
        """Cluster with HDBSCAN (GPU via cuML when ``use_gpu`` is True).

        Args:
            min_cluster_size: minimum size for a group to be a cluster.
            min_samples: density requirement for core points.

        Returns:
            Cluster labels; -1 marks noise points.
        """
        print("使用HDBSCAN进行聚类...")
        data = self._clustering_input()

        self.experiment_results["parameters"]["clustering"] = {
            "method": "hdbscan",
            "min_cluster_size": min_cluster_size,
            "min_samples": min_samples
        }

        if self.use_gpu:
            print("使用 GPU 加速的 HDBSCAN...")
            data_gpu = cp.array(data)
            clusterer = cuHDBSCAN(
                min_cluster_size=min_cluster_size,
                min_samples=min_samples,
                metric='euclidean',
                cluster_selection_epsilon=0.0,
                allow_single_cluster=False,
                verbose=True
            )
            clusterer.fit(data_gpu)
            self.labels = cp.asnumpy(clusterer.labels_)
        else:
            # NOTE: the stray ``from umap import UMAP`` that used to sit here
            # has been moved into reduce_dimensions where UMAP is used.
            clusterer = hdbscan.HDBSCAN(
                min_cluster_size=min_cluster_size,
                min_samples=min_samples,
                metric='euclidean',
                cluster_selection_method='eom',
                prediction_data=True,
                core_dist_n_jobs=64
            )
            self.labels = clusterer.fit_predict(data)

        self._record_density_results(data)
        return self.labels

    def cluster_optics(self, min_samples: int = 50, max_eps: float = 0.5) -> np.ndarray:
        """Cluster with scikit-learn OPTICS (CPU only).

        Args:
            min_samples: density requirement for core points.
            max_eps: maximum neighborhood radius considered.

        Returns:
            Cluster labels; -1 marks noise points.
        """
        print("使用OPTICS进行聚类...")
        data = self._clustering_input()

        self.experiment_results["parameters"]["clustering"] = {
            "method": "optics",
            "min_samples": min_samples,
            "max_eps": max_eps
        }

        clustering = OPTICS(
            min_samples=min_samples,
            max_eps=max_eps,
            metric='euclidean',
            n_jobs=-1
        )
        self.labels = clustering.fit_predict(data)

        self._record_density_results(data)
        return self.labels

    def cluster_kmeans(self, n_clusters: int = 100) -> Tuple[np.ndarray, float]:
        """Cluster with K-means (GPU via cuML when ``use_gpu`` is True).

        Args:
            n_clusters: number of clusters K.

        Returns:
            Tuple of (labels, silhouette score).  The silhouette score is NaN
            when metric computation failed.
        """
        print("使用K-means进行聚类...")
        data = self._clustering_input()

        self.experiment_results["parameters"]["clustering"] = {
            "method": "kmeans",
            "n_clusters": n_clusters
        }

        if self.use_gpu:
            print("使用 GPU 加速的 KMeans...")
            data_gpu = cp.array(data)
            kmeans = cuKMeans(
                n_clusters=n_clusters,
                random_state=42,
                n_init=10,
                max_iter=300,
                verbose=1
            )
            kmeans.fit(data_gpu)
            self.labels = cp.asnumpy(kmeans.labels_)
            inertia = float(kmeans.inertia_)
        else:
            # BUG FIX: ``n_jobs`` was removed from sklearn's KMeans (>=1.0);
            # passing it raised TypeError.  KMeans parallelizes via OpenMP.
            kmeans = KMeans(
                n_clusters=n_clusters,
                random_state=42,
                n_init=10,
                max_iter=300,
                algorithm='elkan'
            )
            self.labels = kmeans.fit_predict(data)
            inertia = kmeans.inertia_

        # BUG FIX: silhouette_avg was unbound (NameError at the return) when
        # metric computation raised; default to NaN instead.
        silhouette_avg = float('nan')
        try:
            silhouette_avg = silhouette_score(data, self.labels, sample_size=10000)
            calinski_avg = calinski_harabasz_score(data, self.labels)

            print(f"聚类数量: {n_clusters}")
            print(f"轮廓系数: {silhouette_avg:.4f}")
            print(f"Calinski-Harabasz指数: {calinski_avg:.4f}")

            self.experiment_results["metrics"].update({
                "silhouette_score": float(silhouette_avg),
                "calinski_harabasz_score": float(calinski_avg),
                "inertia": float(inertia)
            })
        except Exception as e:
            print(f"计算评估指标时出错: {e}")

        return self.labels, silhouette_avg

    def find_optimal_k(self, k_range: range) -> int:
        """Sweep K over ``k_range`` and pick the K with the best silhouette.

        Records inertia / silhouette / Calinski-Harabasz for every K, saves a
        three-panel diagnostic plot, and returns the winning K.

        Args:
            k_range: candidate values of K to evaluate.

        Returns:
            The K with the highest silhouette score.
        """
        print("寻找最佳K值...")
        data = self._clustering_input()

        if self.use_gpu:
            # Transfer to the GPU once and reuse across the whole sweep.
            data_gpu = cp.array(data)

        results = []
        for k in tqdm(k_range):
            if self.use_gpu:
                kmeans = cuKMeans(
                    n_clusters=k,
                    random_state=42,
                    n_init=5,
                    max_iter=300,
                    verbose=0
                )
                kmeans.fit(data_gpu)
                labels = cp.asnumpy(kmeans.labels_)
                inertia = float(kmeans.inertia_)
            else:
                # BUG FIX: ``n_jobs`` was removed from sklearn's KMeans.
                kmeans = KMeans(
                    n_clusters=k,
                    random_state=42,
                    n_init=5,
                    algorithm='elkan'
                )
                labels = kmeans.fit_predict(data)
                inertia = kmeans.inertia_

            silhouette_avg = silhouette_score(data, labels, sample_size=10000)
            calinski_avg = calinski_harabasz_score(data, labels)

            results.append({
                'k': k,
                'inertia': float(inertia),
                'silhouette': float(silhouette_avg),
                'calinski': float(calinski_avg)
            })

        self.experiment_results["kmeans_optimization"] = results

        # Three-panel diagnostic: elbow, silhouette, Calinski-Harabasz.
        plt.figure(figsize=(15, 5))

        plt.subplot(1, 3, 1)
        plt.plot([r['k'] for r in results], [r['inertia'] for r in results], 'bx-')
        plt.xlabel('k')
        plt.ylabel('Inertia')
        plt.title('Elbow Method')

        plt.subplot(1, 3, 2)
        plt.plot([r['k'] for r in results], [r['silhouette'] for r in results], 'rx-')
        plt.xlabel('k')
        plt.ylabel('Silhouette Score')
        plt.title('Silhouette Analysis')

        plt.subplot(1, 3, 3)
        plt.plot([r['k'] for r in results], [r['calinski'] for r in results], 'gx-')
        plt.xlabel('k')
        plt.ylabel('Calinski-Harabasz Score')
        plt.title('Calinski-Harabasz Analysis')

        plt.tight_layout()
        plt.savefig(os.path.join(self.results_dir, f'{self.experiment_name}_kmeans_optimization.png'))
        plt.close()

        best_k = max(results, key=lambda r: r['silhouette'])['k']
        self.experiment_results["metrics"]["best_k"] = best_k
        return best_k

    def visualize_clusters(self, title: str = "Cluster Visualization", sample_size: int = 10000):
        """Scatter-plot the 2-D reduced embeddings colored by cluster label.

        Large datasets are subsampled to ``sample_size`` points.  Requires a
        prior 2-D reduction and a clustering run; otherwise prints a hint and
        returns without plotting.

        Args:
            title: figure title.
            sample_size: maximum number of points drawn.
        """
        if self.reduced_embeddings is None or self.labels is None:
            print("请先进行降维和聚类")
            return

        if self.reduced_embeddings.shape[1] != 2:
            print("只能可视化2维数据,请先使用reduce_dimensions降至2维")
            return

        # Subsample uniformly at random so the scatter stays readable.
        if len(self.labels) > sample_size:
            indices = np.random.choice(len(self.labels), sample_size, replace=False)
            reduced_data = self.reduced_embeddings[indices]
            labels = self.labels[indices]
        else:
            reduced_data = self.reduced_embeddings
            labels = self.labels

        plt.figure(figsize=(12, 8))
        scatter = plt.scatter(
            reduced_data[:, 0],
            reduced_data[:, 1],
            c=labels,
            cmap='tab20',
            alpha=0.6,
            s=20
        )
        plt.colorbar(scatter)
        plt.title(f"{title}\n(Sampled {sample_size:,} points)")

        plt.savefig(os.path.join(self.results_dir, f'{self.experiment_name}_cluster_visualization.png'))
        plt.close()

    def save_results(self):
        """Persist ``experiment_results`` as JSON in the results directory.

        Returns:
            The path of the written JSON file.
        """
        self.experiment_results["end_time"] = datetime.now().isoformat()

        results_file = os.path.join(self.results_dir, f'{self.experiment_name}_results.json')
        with open(results_file, 'w') as f:
            json.dump(self.experiment_results, f, indent=2)

        print(f"实验结果已保存至: {results_file}")

        return results_file
| |
|
def parse_args():
    """Build and evaluate the command-line interface for a clustering run.

    Returns:
        The parsed ``argparse.Namespace`` of experiment options.
    """
    p = argparse.ArgumentParser(description="聚类实验")
    add = p.add_argument

    # Experiment identity and I/O locations.
    add("--name", type=str, default=None, help="实验名称")
    add("--db_path", type=str, default="./Data/database", help="ChromaDB数据库路径")
    add("--output_dir", type=str, default="./clustering_results", help="结果输出目录")

    # Dimensionality-reduction strategy and its knobs.
    add("--dim_reduction", type=str, choices=["pca", "umap", "pca_umap"], default="pca_umap",
        help="降维方法: pca, umap, 或 pca_umap (两步降维)")
    add("--pca_components", type=int, default=50, help="PCA降维后的维度")
    add("--umap_components", type=int, default=2, help="UMAP降维后的维度")
    add("--umap_neighbors", type=int, default=50, help="UMAP邻居数量")
    add("--umap_min_dist", type=float, default=0.2, help="UMAP最小距离")

    # Clustering algorithm selection.
    add("--clustering", type=str, choices=["hdbscan", "kmeans", "optics"], default="hdbscan",
        help="聚类方法: hdbscan, kmeans, 或 optics")

    # HDBSCAN-specific parameters.
    add("--hdbscan_min_cluster_size", type=int, default=100, help="HDBSCAN最小簇大小")
    add("--hdbscan_min_samples", type=int, default=10, help="HDBSCAN最小样本数")

    # OPTICS-specific parameters.
    add("--optics_min_samples", type=int, default=50, help="OPTICS最小样本数")
    add("--optics_max_eps", type=float, default=0.5, help="OPTICS最大邻域距离")

    # KMeans parameters; kmeans_clusters == 0 triggers the K search.
    add("--kmeans_clusters", type=int, default=0,
        help="KMeans聚类数量 (0表示自动寻找最佳K值)")
    add("--kmeans_min_k", type=int, default=50, help="寻找最佳K值的最小K")
    add("--kmeans_max_k", type=int, default=200, help="寻找最佳K值的最大K")
    add("--kmeans_step", type=int, default=10, help="寻找最佳K值的步长")

    # Hardware toggle.
    add("--use_gpu", action="store_true", help="是否使用 GPU 加速")

    return p.parse_args()
| |
|
def main():
    """Run one end-to-end clustering experiment from CLI arguments."""
    args = parse_args()

    # Derive a descriptive experiment name when none was supplied.
    if not args.name:
        gpu_tag = "gpu" if args.use_gpu else "cpu"
        args.name = f"{args.dim_reduction}_{args.clustering}_{gpu_tag}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    analyzer = ClusterAnalyzer(
        chroma_uri=args.db_path,
        output_dir=args.output_dir,
        experiment_name=args.name,
        use_gpu=args.use_gpu,
    )

    analyzer.load_embeddings()

    # --- Dimensionality reduction (PCA, UMAP, or PCA followed by UMAP) ---
    method = args.dim_reduction
    if method in ("pca", "pca_umap"):
        if method == "pca_umap":
            print("\n=== 第一阶段降维(PCA) ===")
        analyzer.reduce_dimensions(method="pca", n_components=args.pca_components)
    if method in ("umap", "pca_umap"):
        if method == "pca_umap":
            print("\n=== 第二阶段降维(UMAP) ===")
        analyzer.reduce_dimensions(
            method="umap",
            n_components=args.umap_components,
            umap_n_neighbors=args.umap_neighbors,
            umap_min_dist=args.umap_min_dist,
        )

    # --- Clustering ------------------------------------------------------
    if args.clustering == "hdbscan":
        print("\n=== HDBSCAN聚类 ===")
        analyzer.cluster_hdbscan(
            min_cluster_size=args.hdbscan_min_cluster_size,
            min_samples=args.hdbscan_min_samples,
        )
    elif args.clustering == "optics":
        print("\n=== OPTICS聚类 ===")
        analyzer.cluster_optics(
            min_samples=args.optics_min_samples,
            max_eps=args.optics_max_eps,
        )
    elif args.clustering == "kmeans":
        if args.kmeans_clusters > 0:
            # Fixed K supplied on the command line.
            print(f"\n=== K-means聚类 (K={args.kmeans_clusters}) ===")
            analyzer.cluster_kmeans(n_clusters=args.kmeans_clusters)
        else:
            # K == 0: search for the best K first, then cluster with it.
            print("\n=== 寻找最佳K值 ===")
            search_space = range(args.kmeans_min_k, args.kmeans_max_k + 1, args.kmeans_step)
            best_k = analyzer.find_optimal_k(search_space)
            print(f"最佳聚类数量: {best_k}")

            print("\n=== K-means聚类 (最佳K) ===")
            analyzer.cluster_kmeans(n_clusters=best_k)

    # Visualization is only meaningful when the final space is 2-D.
    is_2d = args.umap_components == 2 or (args.dim_reduction == "pca" and args.pca_components == 2)
    if is_2d:
        analyzer.visualize_clusters(f"{args.clustering.upper()} Clustering Results")

    analyzer.save_results()
| |
|
| | if __name__ == "__main__": |
| | main() |
| |
|