import ast
import logging
import re
from typing import Dict, List, Optional, Tuple

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from datasets import load_dataset
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.impute import SimpleImputer
from sklearn.metrics import silhouette_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

APP_TITLE = "Circuit Complexity Clustering"
APP_SUBTITLE = (
    "Unsupervised grouping of quantum circuits by structural complexity "
    "using only topology and gate features — no labels required."
)

# Display name -> Hugging Face repository id.
REPO_CONFIG = {
    "Core (Clean)": "QSBench/QSBench-Core-v1.0.0-demo",
    "Depolarizing Noise": "QSBench/QSBench-Depolarizing-Demo-v1.0.0",
    "Amplitude Damping": "QSBench/QSBench-Amplitude-v1.0.0-demo",
    "Transpilation (10q)": "QSBench/QSBench-Transpilation-v1.0.0-demo",
}

# Columns that are never offered as clustering features.
NON_FEATURE_COLS = {
    "sample_id",
    "sample_seed",
    "circuit_hash",
    "split",
    "circuit_qasm",
    "qasm_raw",
    "qasm_transpiled",
    "circuit_type_resolved",
    "circuit_type_requested",
    "noise_type",
    "noise_prob",
    "observable_bases",
    "observable_mode",
    "backend_device",
    "precision_mode",
    "circuit_signature",
    "entanglement",
    "meyer_wallach",
    "noise_label",
}

# Any column whose name contains one of these substrings is excluded as well.
SOFT_EXCLUDE_PATTERNS = ["ideal_", "noisy_", "error_", "sign_ideal_", "sign_noisy_"]

_ASSET_CACHE: Dict[str, pd.DataFrame] = {}
_COMBINED_CACHE: Optional[pd.DataFrame] = None

# Names (and order) of the columns produced by the two feature extractors.
_ADJ_FEATURE_NAMES = (
    "adj_edge_count",
    "adj_density",
    "adj_degree_mean",
    "adj_degree_std",
)
_QASM_FEATURE_NAMES = (
    "qasm_length",
    "qasm_line_count",
    "qasm_gate_keyword_count",
    "qasm_measure_count",
    "qasm_comment_count",
)

# Compiled once: qasm_features runs once per dataset row.
_GATE_KEYWORD_RE = re.compile(
    r"\b(cx|h|x|y|z|rx|ry|rz|u1|u2|u3|u|swap|cz|ccx|rxx|ryy|rzz)\b",
    flags=re.IGNORECASE,
)
_MEASURE_RE = re.compile(r"\bmeasure\b", flags=re.IGNORECASE)


def _nan_features(names) -> Dict[str, float]:
    """Return a feature dict mapping every name in *names* to NaN."""
    return {name: np.nan for name in names}


def safe_parse(value):
    """Parse a stringified Python literal; return the input unchanged on failure.

    Non-string values are returned as-is. Strings that are not valid
    literals are returned unparsed rather than raising.
    """
    if not isinstance(value, str):
        return value
    try:
        return ast.literal_eval(value)
    # The exception set ast.literal_eval is documented to raise on bad input.
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        return value


def adjacency_features(adj_value) -> Dict[str, float]:
    """Derive basic graph features from an adjacency matrix.

    *adj_value* may be a nested sequence or its string representation.
    Returns a dict with the keys ``adj_edge_count``, ``adj_density``,
    ``adj_degree_mean`` and ``adj_degree_std``. Every value is NaN when the
    input cannot be interpreted as a non-empty square numeric matrix.
    """
    parsed = safe_parse(adj_value)
    if not isinstance(parsed, (list, tuple, np.ndarray)):
        return _nan_features(_ADJ_FEATURE_NAMES)

    try:
        arr = np.asarray(parsed, dtype=float)
    except (TypeError, ValueError):
        # Ragged rows or non-numeric entries.
        return _nan_features(_ADJ_FEATURE_NAMES)

    # Edge density and degrees are only meaningful for a square matrix; a
    # non-square input would otherwise yield e.g. a density above 1.
    if arr.ndim != 2 or arr.size == 0 or arr.shape[0] != arr.shape[1]:
        return _nan_features(_ADJ_FEATURE_NAMES)

    n = arr.shape[0]
    # Strict upper triangle: the sum counts each (i, j) / (j, i) pair once.
    edge_count = float(np.triu(arr, k=1).sum())
    possible_edges = float(n * (n - 1) / 2)
    density = edge_count / possible_edges if possible_edges > 0 else np.nan
    degrees = arr.sum(axis=1)
    return {
        "adj_edge_count": edge_count,
        "adj_density": density,
        "adj_degree_mean": float(np.mean(degrees)),
        "adj_degree_std": float(np.std(degrees)),
    }


def qasm_features(qasm_value) -> Dict[str, float]:
    """Extract lightweight statistics from QASM text.

    Returns a dict with the keys ``qasm_length``, ``qasm_line_count``
    (non-blank lines), ``qasm_gate_keyword_count``, ``qasm_measure_count``
    and ``qasm_comment_count`` (lines starting with ``//``). Every value is
    NaN when the input is not a non-blank string.
    """
    if not isinstance(qasm_value, str) or not qasm_value.strip():
        return _nan_features(_QASM_FEATURE_NAMES)

    text = qasm_value
    stripped_lines = [s for s in (line.strip() for line in text.splitlines()) if s]
    return {
        "qasm_length": float(len(text)),
        "qasm_line_count": float(len(stripped_lines)),
        "qasm_gate_keyword_count": float(len(_GATE_KEYWORD_RE.findall(text))),
        "qasm_measure_count": float(len(_MEASURE_RE.findall(text))),
        "qasm_comment_count": float(
            sum(1 for s in stripped_lines if s.startswith("//"))
        ),
    }


def _feature_frame(series: pd.Series, extractor, names) -> pd.DataFrame:
    """Apply *extractor* to each value of *series* and tabulate the dicts.

    Building one DataFrame from a list of records avoids the per-row
    ``pd.Series`` construction of ``.apply(pd.Series)`` and guarantees the
    feature columns exist even when *series* is empty.
    """
    records = [extractor(value) for value in series]
    return pd.DataFrame(records, index=series.index, columns=list(names), dtype=float)


def enrich_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with derived numeric features appended.

    Adjacency features are added when an ``adjacency`` column exists. QASM
    features are computed from ``qasm_transpiled`` when present, otherwise
    from ``qasm_raw`` when present.
    """
    df = df.copy()

    if "adjacency" in df.columns:
        adj_df = _feature_frame(df["adjacency"], adjacency_features, _ADJ_FEATURE_NAMES)
        df = pd.concat([df, adj_df], axis=1)

    qasm_source = "qasm_transpiled" if "qasm_transpiled" in df.columns else "qasm_raw"
    if qasm_source in df.columns:
        qasm_df = _feature_frame(df[qasm_source], qasm_features, _QASM_FEATURE_NAMES)
        df = pd.concat([df, qasm_df], axis=1)

    return df


def load_dataset_df(dataset_key: str) -> pd.DataFrame:
    """Load a dataset's ``train`` split from Hugging Face, cached in memory.

    *dataset_key* must be a key of ``REPO_CONFIG`` (``KeyError`` otherwise).
    The enriched frame is stored in ``_ASSET_CACHE`` and the same object is
    returned on later calls.
    """
    cached = _ASSET_CACHE.get(dataset_key)
    if cached is not None:
        return cached

    logger.info("Loading dataset from Hugging Face: %s", dataset_key)
    ds = load_dataset(REPO_CONFIG[dataset_key])
    df = enrich_dataframe(pd.DataFrame(ds["train"]))
    _ASSET_CACHE[dataset_key] = df
    return df


def load_combined_dataset(dataset_keys: List[str]) -> pd.DataFrame:
    """Load and concatenate the selected datasets.

    Each row is tagged with its origin in a ``source_dataset`` column. The
    most recent combination is cached and reused while the (order-insensitive)
    selection stays the same. An empty selection yields an empty DataFrame
    instead of letting ``pd.concat`` raise on an empty list.
    """
    global _COMBINED_CACHE

    if not dataset_keys:
        return pd.DataFrame()

    cache_key = "|".join(sorted(dataset_keys))
    cached_key = getattr(load_combined_dataset, "_cache_key", None)
    if _COMBINED_CACHE is None or cached_key != cache_key:
        frames = [
            load_dataset_df(key).assign(source_dataset=key) for key in dataset_keys
        ]
        _COMBINED_CACHE = pd.concat(frames, ignore_index=True)
        load_combined_dataset._cache_key = cache_key  # type: ignore[attr-defined]
    return _COMBINED_CACHE


def load_guide_content() -> str:
    """Return the contents of ``GUIDE.md``, or a placeholder if it is missing."""
    try:
        with open("GUIDE.md", "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        return "# Guide\n\nGuide file not found."
def get_available_feature_columns(df: pd.DataFrame) -> List[str]:
    """Return the sorted numeric columns of *df* that may serve as features.

    Columns listed in ``NON_FEATURE_COLS`` and columns whose name contains
    any ``SOFT_EXCLUDE_PATTERNS`` substring are left out.
    """
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    return sorted(
        col
        for col in numeric_cols
        if col not in NON_FEATURE_COLS
        and not any(pattern in col for pattern in SOFT_EXCLUDE_PATTERNS)
    )


def default_feature_selection(features: List[str]) -> List[str]:
    """Pick up to ten default features, favouring a fixed preferred list.

    Falls back to the first ten entries of *features* when none of the
    preferred names is available.
    """
    preferred = [
        "gate_entropy",
        "adj_density",
        "adj_degree_mean",
        "adj_degree_std",
        "depth",
        "total_gates",
        "single_qubit_gates",
        "two_qubit_gates",
        "cx_count",
        "qasm_length",
        "qasm_line_count",
        "qasm_gate_keyword_count",
    ]
    selected = [feature for feature in preferred if feature in features]
    return selected[:10] if selected else features[:10]


def build_dataset_profile(df: pd.DataFrame) -> str:
    """Build a short Markdown summary of *df* for the explorer tab."""
    # Two trailing spaces before "\n" are a Markdown hard line break; with a
    # single space the three entries render as one run-on line.
    return (
        f"### Dataset profile\n\n"
        f"**Rows:** {len(df):,}  \n"
        f"**Columns:** {len(df.columns):,}  \n"
        f"**Available datasets:** {len(REPO_CONFIG)}"
    )


def _first_text(frame: pd.DataFrame, column: str, fallback: str = "// N/A") -> str:
    """Return the first value of *column* if it is a string, else *fallback*."""
    if column not in frame.columns or frame.empty:
        return fallback
    value = frame[column].iloc[0]
    # None / NaN cells are replaced so the code viewer always receives text.
    return value if isinstance(value, str) else fallback


def refresh_explorer(dataset_key: str, split_name: str) -> Tuple[gr.update, pd.DataFrame, str, str, str, str]:
    """Refresh the explorer view for the selected source dataset.

    Returns, in order: an update for the split dropdown, a preview of up to
    12 rows, the first row's raw and transpiled QASM (``"// N/A"`` when
    unavailable), the dataset profile Markdown and the split summary Markdown.
    An unknown *split_name* is replaced by the first available split.
    """
    df = load_dataset_df(dataset_key)
    has_split = "split" in df.columns

    splits = df["split"].dropna().unique().tolist() if has_split else ["train"]
    if not splits:
        splits = ["train"]
    if split_name not in splits:
        split_name = splits[0]

    filtered = df[df["split"] == split_name] if has_split else df
    display_df = filtered.head(12).copy()

    raw_qasm = _first_text(display_df, "qasm_raw")
    transpiled_qasm = _first_text(display_df, "qasm_transpiled")

    profile_box = build_dataset_profile(df)
    # Two trailing spaces before "\n" force Markdown hard line breaks.
    summary_box = (
        f"### Split summary\n\n"
        f"**Dataset:** `{dataset_key}`  \n"
        f"**Available splits:** {', '.join(splits)}  \n"
        f"**Preview rows:** {len(display_df)}"
    )

    return (
        gr.update(choices=splits, value=split_name),
        display_df,
        raw_qasm,
        transpiled_qasm,
        profile_box,
        summary_box,
    )


def sync_feature_picker(dataset_keys: List[str]) -> gr.update:
    """Rebuild the feature checkbox choices from the selected datasets."""
    if not dataset_keys:
        return gr.update(choices=[], value=[])
    df = load_combined_dataset(dataset_keys)
    features = get_available_feature_columns(df)
    return gr.update(choices=features, value=default_feature_selection(features))


def run_clustering(
    dataset_keys: List[str],
    feature_columns: List[str],
    n_clusters: int,
    random_state: float,
) -> Tuple[Optional[plt.Figure], str, pd.DataFrame]:
    """Cluster circuits with K-Means on a PCA projection of the chosen features.

    Rows with a missing value in any selected feature are dropped. The rest
    are median-imputed, standardised and projected onto at most two
    principal components, and K-Means is fitted on that projection.

    Returns ``(figure, markdown_report, cluster_size_table)``. On invalid
    input the figure is ``None``, the report carries the error message and
    the table is empty.
    """
    if not dataset_keys:
        return None, "### ❌ Please select at least one dataset.", pd.DataFrame()
    if not feature_columns:
        return None, "### ❌ Please select at least one feature.", pd.DataFrame()

    df = load_combined_dataset(dataset_keys)

    # The picker may still hold features from a previous dataset selection.
    missing = [col for col in feature_columns if col not in df.columns]
    if missing:
        return (
            None,
            f"### ❌ Selected features are not available in the chosen datasets: {', '.join(missing)}",
            pd.DataFrame(),
        )

    train_df = df.dropna(subset=feature_columns).copy()
    if len(train_df) < 30:
        return None, "### ❌ Not enough rows after filtering missing values.", pd.DataFrame()

    X = train_df[feature_columns]

    # UI widgets may deliver floats, and a cleared seed field delivers None.
    n_clusters = int(n_clusters)
    seed = 42 if random_state is None else int(random_state)
    # PCA cannot extract more components than there are features.
    n_components = min(2, X.shape[1])

    projector = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
            ("pca", PCA(n_components=n_components, random_state=seed)),
        ]
    )
    pca_coords = projector.fit_transform(X)
    kmeans = KMeans(n_clusters=n_clusters, random_state=seed, n_init=10)
    labels = kmeans.fit_predict(pca_coords)

    # Score the clustering in the space K-Means actually saw; scoring on raw,
    # unscaled features would measure a different geometry.
    try:
        sil_score = float(silhouette_score(pca_coords, labels))
    except (ValueError, MemoryError):
        logger.warning("Silhouette score could not be computed", exc_info=True)
        sil_score = float("nan")

    # Build the figure without pyplot's global registry so that figures are
    # garbage-collected once the UI is done with them.
    fig = plt.Figure(figsize=(10, 8))
    ax = fig.subplots()
    x_coords = pca_coords[:, 0]
    y_coords = pca_coords[:, 1] if pca_coords.shape[1] > 1 else np.zeros(len(pca_coords))
    scatter = ax.scatter(
        x_coords,
        y_coords,
        c=labels,
        cmap="tab10",
        s=30,
        alpha=0.8,
    )
    ax.set_title(f"Circuit Complexity Clusters (K={n_clusters})")
    ax.set_xlabel("PCA Component 1")
    ax.set_ylabel("PCA Component 2")
    ax.grid(True, alpha=0.3)
    fig.colorbar(scatter, ax=ax, label="Cluster")
    fig.tight_layout()

    summary = train_df.copy()
    summary["cluster"] = labels
    cluster_summary = summary.groupby("cluster").size().reset_index(name="Number of Circuits")
    dataset_counts = (
        summary.groupby(["cluster", "source_dataset"]).size().reset_index(name="Count")
        if "source_dataset" in summary.columns
        else pd.DataFrame()
    )

    # Two trailing spaces before "\n" force Markdown hard line breaks.
    metrics_text = (
        f"### Clustering Results\n\n"
        f"**Datasets used:** {', '.join(dataset_keys)}  \n"
        f"**Number of circuits clustered:** {len(train_df):,}  \n"
        f"**Number of clusters:** {n_clusters}  \n"
        f"**Silhouette Score:** {sil_score:.4f} (closer to 1 = better separation)\n\n"
        f"**Cluster sizes:**\n"
        f"{cluster_summary.to_markdown(index=False)}"
    )
    if not dataset_counts.empty:
        metrics_text += f"\n\n**Dataset composition per cluster:**\n{dataset_counts.to_markdown(index=False)}"

    return fig, metrics_text, cluster_summary


CUSTOM_CSS = """ .gradio-container { max-width: 1400px !important; } footer { margin-top: 1rem; } """

with gr.Blocks(title=APP_TITLE) as demo:
    gr.Markdown(f"# 🌌 {APP_TITLE}")
    gr.Markdown(APP_SUBTITLE)

    with gr.Tabs():
        with gr.TabItem("🔎 Explorer"):
            dataset_dropdown = gr.Dropdown(
                list(REPO_CONFIG.keys()),
                value="Amplitude Damping",
                label="Dataset",
            )
            split_dropdown = gr.Dropdown(
                ["train"],
                value="train",
                label="Split",
            )
            profile_box = gr.Markdown(value="### Loading dataset...")
            summary_box = gr.Markdown(value="### Loading split summary...")
            explorer_df = gr.Dataframe(label="Preview", interactive=False)
            with gr.Row():
                raw_qasm = gr.Code(label="Raw QASM", language=None)
                transpiled_qasm = gr.Code(label="Transpiled QASM", language=None)

        with gr.TabItem("🧠 Clustering"):
            dataset_picker = gr.CheckboxGroup(
                label="Datasets",
                choices=list(REPO_CONFIG.keys()),
                value=list(REPO_CONFIG.keys()),
            )
            feature_picker = gr.CheckboxGroup(label="Input features", choices=[])
            n_clusters = gr.Slider(2, 10, value=4, step=1, label="Number of Clusters")
            seed = gr.Number(value=42, precision=0, label="Random Seed")
            run_btn = gr.Button("🚀 Run K-Means Clustering", variant="primary")
            plot = gr.Plot()
            metrics = gr.Markdown()
            cluster_table = gr.Dataframe(label="Cluster Sizes", interactive=False)

        with gr.TabItem("📖 Guide"):
            gr.Markdown(load_guide_content())

    gr.Markdown("---")
    gr.Markdown(
        "### 🔗 Links\n"
        "[Website](https://qsbench.github.io) | "
        "[Hugging Face](https://huggingface.co/QSBench) | "
        "[GitHub](https://github.com/QSBench)"
    )

    # The explorer is refreshed identically on dataset change, split change
    # and initial page load.
    dataset_dropdown.change(
        refresh_explorer,
        [dataset_dropdown, split_dropdown],
        [split_dropdown, explorer_df, raw_qasm, transpiled_qasm, profile_box, summary_box],
    )
    split_dropdown.change(
        refresh_explorer,
        [dataset_dropdown, split_dropdown],
        [split_dropdown, explorer_df, raw_qasm, transpiled_qasm, profile_box, summary_box],
    )
    dataset_picker.change(sync_feature_picker, [dataset_picker], [feature_picker])
    run_btn.click(
        run_clustering,
        [dataset_picker, feature_picker, n_clusters, seed],
        [plot, metrics, cluster_table],
    )
    demo.load(
        refresh_explorer,
        [dataset_dropdown, split_dropdown],
        [split_dropdown, explorer_df, raw_qasm, transpiled_qasm, profile_box, summary_box],
    )
    demo.load(sync_feature_picker, [dataset_picker], [feature_picker])

if __name__ == "__main__":
    demo.launch(theme=gr.themes.Soft(), css=CUSTOM_CSS)