| import ast |
| import logging |
| import re |
| from typing import Dict, List, Optional, Tuple |
|
|
| import gradio as gr |
| import matplotlib.pyplot as plt |
| import numpy as np |
| import pandas as pd |
| from datasets import load_dataset |
| from sklearn.cluster import KMeans |
| from sklearn.decomposition import PCA |
| from sklearn.impute import SimpleImputer |
| from sklearn.metrics import silhouette_score |
| from sklearn.pipeline import Pipeline |
| from sklearn.preprocessing import StandardScaler |
|
|
# Configure logging once at import time and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Page title and tagline used when the Gradio page is assembled.
APP_TITLE = "Circuit Complexity Clustering"
# The separator was a mis-decoded multi-byte character ("β"); restored as an
# em dash so the tagline reads correctly.
APP_SUBTITLE = (
    "Unsupervised grouping of quantum circuits by structural complexity "
    "using only topology and gate features — no labels required."
)
|
|
# Display name -> Hugging Face dataset repository id.
# Every repository lives under the same organisation prefix.
_HF_ORG = "QSBench"
REPO_CONFIG = {
    "Core (Clean)": f"{_HF_ORG}/QSBench-Core-v1.0.0-demo",
    "Depolarizing Noise": f"{_HF_ORG}/QSBench-Depolarizing-Demo-v1.0.0",
    "Amplitude Damping": f"{_HF_ORG}/QSBench-Amplitude-v1.0.0-demo",
    "Transpilation (10q)": f"{_HF_ORG}/QSBench-Transpilation-v1.0.0-demo",
}
|
|
# Column names that are never offered as clustering features, even when numeric.
NON_FEATURE_COLS = {
    # sample identity and bookkeeping
    "sample_id", "sample_seed", "circuit_hash", "circuit_signature", "split",
    # QASM text columns
    "circuit_qasm", "qasm_raw", "qasm_transpiled",
    # circuit type descriptors
    "circuit_type_resolved", "circuit_type_requested",
    # noise configuration
    "noise_type", "noise_prob", "noise_label",
    # observable / backend settings
    "observable_bases", "observable_mode", "backend_device", "precision_mode",
    # remaining excluded columns
    "entanglement", "meyer_wallach",
}
|
|
# Substrings that disqualify a column from the feature list (matched with `in`).
SOFT_EXCLUDE_PATTERNS = [
    "ideal_",
    "noisy_",
    "error_",
    "sign_ideal_",
    "sign_noisy_",
]


# Per-dataset frames, keyed by the display names used in REPO_CONFIG.
_ASSET_CACHE: Dict[str, pd.DataFrame] = dict()
# Most recently built merged frame; None until the first merge happens.
_COMBINED_CACHE: Optional[pd.DataFrame] = None
|
|
|
|
def safe_parse(value):
    """Turn a stringified Python literal back into the object it spells.

    Non-string input is returned untouched. A string that cannot be
    evaluated as a literal is returned as the original string.
    """
    if not isinstance(value, str):
        return value
    try:
        parsed = ast.literal_eval(value)
    except Exception:
        # Not a literal (or otherwise unparsable): keep the raw text.
        return value
    return parsed
|
|
|
|
def _nan_adjacency_features() -> Dict[str, float]:
    """Return the adjacency feature dict with every value set to NaN."""
    return {
        "adj_edge_count": np.nan,
        "adj_density": np.nan,
        "adj_degree_mean": np.nan,
        "adj_degree_std": np.nan,
    }


def adjacency_features(adj_value) -> Dict[str, float]:
    """Derive basic graph features from an adjacency matrix.

    Args:
        adj_value: A square adjacency matrix given as a nested list, tuple or
            NumPy array, or a string spelling such a literal.

    Returns:
        A dict with ``adj_edge_count`` (sum of the strict upper triangle),
        ``adj_density`` (edge count divided by ``n * (n - 1) / 2``; NaN when
        that is zero), and ``adj_degree_mean`` / ``adj_degree_std`` (mean and
        standard deviation of the row sums). All values are NaN when the
        input is missing, not numeric, or not a non-empty square 2-D matrix.
    """
    parsed = safe_parse(adj_value)
    if not isinstance(parsed, (list, tuple, np.ndarray)):
        return _nan_adjacency_features()

    try:
        arr = np.asarray(parsed, dtype=float)
    except (TypeError, ValueError):
        # Ragged rows or non-numeric entries cannot form a numeric matrix.
        return _nan_adjacency_features()

    # The upper-triangle and row-sum formulas below only describe a graph when
    # the matrix is square; anything else would yield misleading numbers.
    if arr.ndim != 2 or arr.shape[0] == 0 or arr.shape[0] != arr.shape[1]:
        return _nan_adjacency_features()

    n = arr.shape[0]
    edge_count = float(np.triu(arr, k=1).sum())
    possible_edges = float(n * (n - 1) / 2)
    density = edge_count / possible_edges if possible_edges > 0 else np.nan
    degrees = arr.sum(axis=1)
    return {
        "adj_edge_count": edge_count,
        "adj_density": density,
        "adj_degree_mean": float(np.mean(degrees)),
        "adj_degree_std": float(np.std(degrees)),
    }
|
|
|
|
def qasm_features(qasm_value) -> Dict[str, float]:
    """Compute cheap text statistics for a QASM program.

    Returns a dict with the total character count, the number of non-blank
    lines, the number of gate-keyword matches, the number of ``measure``
    matches and the number of non-blank lines starting with ``//``. Every
    value is NaN when the input is not a string or is blank.
    """
    feature_names = (
        "qasm_length",
        "qasm_line_count",
        "qasm_gate_keyword_count",
        "qasm_measure_count",
        "qasm_comment_count",
    )

    has_text = isinstance(qasm_value, str) and bool(qasm_value.strip())
    if not has_text:
        return dict.fromkeys(feature_names, np.nan)

    gate_pattern = re.compile(
        r"\b(cx|h|x|y|z|rx|ry|rz|u1|u2|u3|u|swap|cz|ccx|rxx|ryy|rzz)\b",
        re.IGNORECASE,
    )
    measure_pattern = re.compile(r"\bmeasure\b", re.IGNORECASE)

    nonblank = [row for row in qasm_value.splitlines() if row.strip()]
    counts = (
        len(qasm_value),
        len(nonblank),
        len(gate_pattern.findall(qasm_value)),
        len(measure_pattern.findall(qasm_value)),
        len([row for row in nonblank if row.lstrip().startswith("//")]),
    )
    return {name: float(count) for name, count in zip(feature_names, counts)}
|
|
|
|
def enrich_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Add derived numeric features for clustering.

    Adjacency features are derived from the ``adjacency`` column when present.
    QASM text features are derived from ``qasm_transpiled`` when that column
    exists, otherwise from ``qasm_raw`` when that one exists.

    Args:
        df: Source frame; it is not modified.

    Returns:
        A new frame holding the original columns followed by the derived ones.
    """
    df = df.copy()
    derived_frames = []

    if "adjacency" in df.columns:
        # Build the feature frame in one shot from the per-row dicts; this
        # avoids `.apply(pd.Series)`, which allocates a Series for every row.
        adj_records = df["adjacency"].map(adjacency_features).tolist()
        derived_frames.append(pd.DataFrame(adj_records, index=df.index))

    qasm_source = "qasm_transpiled" if "qasm_transpiled" in df.columns else "qasm_raw"
    if qasm_source in df.columns:
        qasm_records = df[qasm_source].map(qasm_features).tolist()
        derived_frames.append(pd.DataFrame(qasm_records, index=df.index))

    if derived_frames:
        df = pd.concat([df, *derived_frames], axis=1)
    return df
|
|
|
|
def load_dataset_df(dataset_key: str) -> pd.DataFrame:
    """Return the enriched frame for one dataset, loading it on first use.

    The key is looked up in ``REPO_CONFIG``, the repository's ``train`` split
    is converted to a DataFrame and enriched, and the result is stored in
    ``_ASSET_CACHE`` so later calls return the same object.
    """
    cached = _ASSET_CACHE.get(dataset_key)
    if cached is not None:
        return cached

    logger.info("Loading dataset from Hugging Face: %s", dataset_key)
    repo_id = REPO_CONFIG[dataset_key]
    train_split = load_dataset(repo_id)["train"]
    enriched = enrich_dataframe(pd.DataFrame(train_split))
    _ASSET_CACHE[dataset_key] = enriched
    return enriched
|
|
|
|
def load_combined_dataset(dataset_keys: List[str]) -> pd.DataFrame:
    """Concatenate the selected datasets into one frame, reusing the last result.

    Each source frame gets a ``source_dataset`` column holding its key. Only
    the most recent combination is cached; the cache key is built from the
    sorted keys, so the order of ``dataset_keys`` does not invalidate it.
    """
    global _COMBINED_CACHE

    requested = "|".join(sorted(dataset_keys))
    previous = getattr(load_combined_dataset, "_cache_key", None)
    if _COMBINED_CACHE is not None and previous == requested:
        return _COMBINED_CACHE

    tagged_frames = []
    for key in dataset_keys:
        tagged_frames.append(load_dataset_df(key).assign(source_dataset=key))

    _COMBINED_CACHE = pd.concat(tagged_frames, ignore_index=True)
    # The key of the cached combination lives on the function object itself.
    load_combined_dataset._cache_key = requested
    return _COMBINED_CACHE
|
|
|
|
def load_guide_content() -> str:
    """Return the text of ``GUIDE.md``, or a placeholder when the file is absent.

    The path is relative, so it resolves against the current working directory.
    """
    placeholder = "# Guide\n\nGuide file not found."
    try:
        handle = open("GUIDE.md", "r", encoding="utf-8")
    except FileNotFoundError:
        return placeholder
    with handle:
        return handle.read()
|
|
|
|
def get_available_feature_columns(df: pd.DataFrame) -> List[str]:
    """List the numeric columns usable as clustering features, sorted by name.

    A numeric column is dropped when its name is in ``NON_FEATURE_COLS`` or
    contains any substring from ``SOFT_EXCLUDE_PATTERNS``.
    """

    def _is_feature(name) -> bool:
        if name in NON_FEATURE_COLS:
            return False
        return not any(token in name for token in SOFT_EXCLUDE_PATTERNS)

    numeric_names = df.select_dtypes(include=[np.number]).columns.tolist()
    return sorted(name for name in numeric_names if _is_feature(name))
|
|
|
|
def default_feature_selection(features: List[str]) -> List[str]:
    """Pick the features that are pre-selected in the UI.

    Preferred feature names that occur in ``features`` are returned in
    preference order, capped at ten. When none of them is available, the
    first ten entries of ``features`` are returned instead.
    """
    preference_order = (
        "gate_entropy",
        "adj_density",
        "adj_degree_mean",
        "adj_degree_std",
        "depth",
        "total_gates",
        "single_qubit_gates",
        "two_qubit_gates",
        "cx_count",
        "qasm_length",
        "qasm_line_count",
        "qasm_gate_keyword_count",
    )
    limit = 10

    picks = []
    for name in preference_order:
        if name in features:
            picks.append(name)

    if picks:
        return picks[:limit]
    return features[:limit]
|
|
|
|
def build_dataset_profile(df: pd.DataFrame) -> str:
    """Build a short Markdown summary of a dataset for the explorer tab.

    Args:
        df: The frame to describe.

    Returns:
        Markdown text with the row count, the column count and the number of
        entries in ``REPO_CONFIG``, one field per rendered line.
    """
    # Two trailing spaces before "\n" are a Markdown hard line break; with a
    # single space the three fields collapse into one rendered line.
    return (
        "### Dataset profile\n\n"
        f"**Rows:** {len(df):,}  \n"
        f"**Columns:** {len(df.columns):,}  \n"
        f"**Available datasets:** {len(REPO_CONFIG)}"
    )
|
|
|
|
def _first_qasm_text(frame: pd.DataFrame, column: str) -> str:
    """Return the first cell of ``column`` if it is a string, else ``"// N/A"``."""
    if column not in frame.columns or frame.empty:
        return "// N/A"
    value = frame[column].iloc[0]
    # Missing cells (None/NaN) are not displayable source text.
    return value if isinstance(value, str) else "// N/A"


def refresh_explorer(dataset_key: str, split_name: str) -> Tuple[gr.update, pd.DataFrame, str, str, str, str]:
    """Refresh the explorer view for the selected source dataset.

    Args:
        dataset_key: Display name of the dataset (a key of ``REPO_CONFIG``).
        split_name: Requested split; replaced by the first available split
            when the dataset does not contain it.

    Returns:
        A 6-tuple: an update for the split dropdown (choices and value), the
        first 12 rows of the selected split, the raw and transpiled QASM of
        the first preview row (``"// N/A"`` when unavailable), the dataset
        profile Markdown and the split summary Markdown.
    """
    df = load_dataset_df(dataset_key)

    splits = df["split"].dropna().unique().tolist() if "split" in df.columns else []
    has_named_splits = bool(splits)
    if not has_named_splits:
        splits = ["train"]
    if split_name not in splits:
        split_name = splits[0]

    # Filter only when the dataset really carries split labels; otherwise the
    # "train" placeholder would match nothing and the preview would be empty.
    filtered = df[df["split"] == split_name] if has_named_splits else df
    display_df = filtered.head(12).copy()

    raw_qasm = _first_qasm_text(display_df, "qasm_raw")
    transpiled_qasm = _first_qasm_text(display_df, "qasm_transpiled")

    profile_box = build_dataset_profile(df)
    # Two trailing spaces before "\n" force a Markdown line break. Split names
    # are stringified so non-string labels cannot break the join.
    summary_box = (
        "### Split summary\n\n"
        f"**Dataset:** `{dataset_key}`  \n"
        f"**Available splits:** {', '.join(map(str, splits))}  \n"
        f"**Preview rows:** {len(display_df)}"
    )

    return (
        gr.update(choices=splits, value=split_name),
        display_df,
        raw_qasm,
        transpiled_qasm,
        profile_box,
        summary_box,
    )
|
|
|
|
def sync_feature_picker(dataset_keys: List[str]) -> gr.update:
    """Rebuild the feature checkbox choices for the currently selected datasets.

    With no dataset selected, both the choices and the selection are cleared.
    Otherwise the choices are the available feature columns of the combined
    frame and the selection is the default feature subset.
    """
    if dataset_keys:
        combined = load_combined_dataset(dataset_keys)
        available = get_available_feature_columns(combined)
        preselected = default_feature_selection(available)
        return gr.update(choices=available, value=preselected)
    return gr.update(choices=[], value=[])
|
|
|
|
def _clustering_error(message: str) -> Tuple[None, str, pd.DataFrame]:
    """Build the (figure, markdown, table) triple shown when clustering cannot run."""
    return None, f"### ❌ {message}", pd.DataFrame()


def _plot_clusters(coords: np.ndarray, labels: np.ndarray, n_clusters: int) -> "plt.Figure":
    """Draw the 2-D PCA scatter plot, coloured by cluster label."""
    # A Figure created directly is not registered with pyplot's global figure
    # manager, so repeated runs in a long-lived server do not pile up figures.
    from matplotlib.figure import Figure

    fig = Figure(figsize=(10, 8))
    ax = fig.subplots()
    scatter = ax.scatter(
        coords[:, 0],
        coords[:, 1],
        c=labels,
        cmap="tab10",
        s=30,
        alpha=0.8,
    )
    ax.set_title(f"Circuit Complexity Clusters (K={n_clusters})")
    ax.set_xlabel("PCA Component 1")
    ax.set_ylabel("PCA Component 2")
    ax.grid(True, alpha=0.3)
    fig.colorbar(scatter, ax=ax, label="Cluster")
    fig.tight_layout()
    return fig


def run_clustering(
    dataset_keys: List[str],
    feature_columns: List[str],
    n_clusters: int,
    random_state: float,
) -> Tuple[Optional["plt.Figure"], str, pd.DataFrame]:
    """Run K-Means clustering and return a PCA plot plus metrics.

    Rows with a missing value in any selected feature are dropped. The rest
    are imputed (a no-op after the drop), standardised, projected onto two
    principal components and clustered with K-Means in that 2-D space.

    Args:
        dataset_keys: Display names of the datasets to merge.
        feature_columns: Names of the numeric columns to cluster on; at least
            two are needed for the 2-D projection.
        n_clusters: Number of K-Means clusters.
        random_state: Seed for PCA and K-Means; ``None`` falls back to 42.

    Returns:
        ``(figure, markdown, cluster_sizes)``. On invalid input the figure is
        ``None``, the markdown holds an error message and the table is empty.
    """
    if not dataset_keys:
        return _clustering_error("Please select at least one dataset.")
    if not feature_columns:
        return _clustering_error("Please select at least one feature.")
    if len(feature_columns) < 2:
        # PCA(n_components=2) cannot be fitted on a single column.
        return _clustering_error(
            "Please select at least two features (needed for the 2-D PCA projection)."
        )

    n_clusters = int(n_clusters)
    # A cleared numeric field may arrive as None; fall back to a fixed seed.
    seed = 42 if random_state is None else int(random_state)

    df = load_combined_dataset(dataset_keys)
    missing = [col for col in feature_columns if col not in df.columns]
    if missing:
        # The feature selection can be stale relative to the chosen datasets.
        return _clustering_error(
            f"Selected features not found in the chosen datasets: {', '.join(missing)}."
        )

    train_df = df.dropna(subset=feature_columns).copy()
    if len(train_df) < 30:
        return _clustering_error("Not enough rows after filtering missing values.")

    X = train_df[feature_columns]
    pipeline = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
            ("pca", PCA(n_components=2, random_state=seed)),
            ("kmeans", KMeans(n_clusters=n_clusters, random_state=seed, n_init=10)),
        ]
    )
    pipeline.fit(X)
    labels = pipeline.named_steps["kmeans"].labels_
    # Coordinates in the space K-Means actually saw (every step but the last).
    pca_coords = pipeline[:-1].transform(X)

    try:
        # Score in the clustered space; raw features have mixed scales, so a
        # score computed on them would not describe this partition.
        sil_score = silhouette_score(pca_coords, labels)
    except Exception:
        logger.warning("Silhouette score could not be computed", exc_info=True)
        sil_score = float("nan")

    fig = _plot_clusters(pca_coords, labels, n_clusters)

    train_df["cluster"] = labels
    cluster_summary = train_df.groupby("cluster").size().reset_index(name="Number of Circuits")
    dataset_counts = (
        train_df.groupby(["cluster", "source_dataset"]).size().reset_index(name="Count")
        if "source_dataset" in train_df.columns
        else pd.DataFrame()
    )

    # Two trailing spaces before "\n" are Markdown hard line breaks; the blank
    # line before each table keeps it separate from the preceding paragraph.
    metrics_text = (
        "### Clustering Results\n\n"
        f"**Datasets used:** {', '.join(dataset_keys)}  \n"
        f"**Number of circuits clustered:** {len(train_df):,}  \n"
        f"**Number of clusters:** {n_clusters}  \n"
        f"**Silhouette Score:** {sil_score:.4f} (closer to 1 = better separation)\n\n"
        "**Cluster sizes:**\n\n"
        f"{cluster_summary.to_markdown(index=False)}"
    )
    if not dataset_counts.empty:
        metrics_text += (
            "\n\n**Dataset composition per cluster:**\n\n"
            f"{dataset_counts.to_markdown(index=False)}"
        )

    return fig, metrics_text, cluster_summary
|
|
|
|
# Extra stylesheet handed to Gradio at launch: caps the page width and adds
# spacing above the footer.
CUSTOM_CSS = (
    "\n"
    ".gradio-container {\n"
    "max-width: 1400px !important;\n"
    "}\n"
    "footer {\n"
    "margin-top: 1rem;\n"
    "}\n"
)
|
|
# Page layout and event wiring. Everything, including the event listeners,
# is declared inside the Blocks context.
# NOTE(review): the leading "π" / "π§" characters in several labels below look
# like mis-decoded emoji. They are kept verbatim here; confirm the intended
# glyphs before replacing them.
with gr.Blocks(title=APP_TITLE) as demo:
    gr.Markdown(f"# π {APP_TITLE}")
    gr.Markdown(APP_SUBTITLE)

    dataset_names = list(REPO_CONFIG)

    with gr.Tabs():
        # --- Tab 1: browse a single dataset ---------------------------------
        with gr.TabItem("π Explorer"):
            dataset_dropdown = gr.Dropdown(
                choices=dataset_names,
                value="Amplitude Damping",
                label="Dataset",
            )
            split_dropdown = gr.Dropdown(
                choices=["train"],
                value="train",
                label="Split",
            )
            profile_box = gr.Markdown(value="### Loading dataset...")
            summary_box = gr.Markdown(value="### Loading split summary...")
            explorer_df = gr.Dataframe(label="Preview", interactive=False)

            with gr.Row():
                raw_qasm = gr.Code(label="Raw QASM", language=None)
                transpiled_qasm = gr.Code(label="Transpiled QASM", language=None)

        # --- Tab 2: configure and run K-Means -------------------------------
        with gr.TabItem("π§ Clustering"):
            dataset_picker = gr.CheckboxGroup(
                label="Datasets",
                choices=dataset_names,
                value=list(dataset_names),
            )
            feature_picker = gr.CheckboxGroup(label="Input features", choices=[])
            n_clusters = gr.Slider(
                minimum=2,
                maximum=10,
                value=4,
                step=1,
                label="Number of Clusters",
            )
            seed = gr.Number(value=42, precision=0, label="Random Seed")
            run_btn = gr.Button("π Run K-Means Clustering", variant="primary")
            plot = gr.Plot()
            metrics = gr.Markdown()
            cluster_table = gr.Dataframe(label="Cluster Sizes", interactive=False)

        # --- Tab 3: static guide text ---------------------------------------
        with gr.TabItem("π Guide"):
            gr.Markdown(load_guide_content())

    gr.Markdown("---")
    gr.Markdown(
        "### π Links\n"
        "[Website](https://qsbench.github.io) | "
        "[Hugging Face](https://huggingface.co/QSBench) | "
        "[GitHub](https://github.com/QSBench)"
    )

    # The explorer is refreshed by three triggers that share one signature.
    explorer_inputs = [dataset_dropdown, split_dropdown]
    explorer_outputs = [
        split_dropdown,
        explorer_df,
        raw_qasm,
        transpiled_qasm,
        profile_box,
        summary_box,
    ]

    dataset_dropdown.change(
        fn=refresh_explorer,
        inputs=explorer_inputs,
        outputs=explorer_outputs,
    )
    split_dropdown.change(
        fn=refresh_explorer,
        inputs=explorer_inputs,
        outputs=explorer_outputs,
    )

    dataset_picker.change(
        fn=sync_feature_picker,
        inputs=[dataset_picker],
        outputs=[feature_picker],
    )

    run_btn.click(
        fn=run_clustering,
        inputs=[dataset_picker, feature_picker, n_clusters, seed],
        outputs=[plot, metrics, cluster_table],
    )

    # Populate both tabs once when the page first loads.
    demo.load(
        fn=refresh_explorer,
        inputs=explorer_inputs,
        outputs=explorer_outputs,
    )
    demo.load(
        fn=sync_feature_picker,
        inputs=[dataset_picker],
        outputs=[feature_picker],
    )
|
|
|
|
# Start the web server only when the file is executed as a script.
if __name__ == "__main__":
    # NOTE(review): `theme` and `css` are passed to launch() here; confirm the
    # pinned Gradio release accepts them on launch() rather than on Blocks().
    demo.launch(
        theme=gr.themes.Soft(),
        css=CUSTOM_CSS,
    )
|
|