Spaces:
Running
Running
| import ast | |
| import logging | |
| import re | |
| from typing import Dict, List, Optional, Tuple | |
| import gradio as gr | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import pandas as pd | |
| from datasets import load_dataset | |
| from sklearn.ensemble import HistGradientBoostingClassifier | |
| from sklearn.impute import SimpleImputer | |
| from sklearn.inspection import permutation_importance | |
| from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, f1_score | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.pipeline import Pipeline | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| APP_TITLE = "Noise Detection" | |
| APP_SUBTITLE = ( | |
| "Classify quantum circuits into clean, depolarizing, amplitude_damping, or hardware-aware noise conditions." | |
| ) | |
| REPO_CONFIG = { | |
| "clean": { | |
| "label": "clean", | |
| "repo": "QSBench/QSBench-Core-v1.0.0-demo", | |
| }, | |
| "depolarizing": { | |
| "label": "depolarizing", | |
| "repo": "QSBench/QSBench-Depolarizing-Demo-v1.0.0", | |
| }, | |
| "amplitude_damping": { | |
| "label": "amplitude_damping", | |
| "repo": "QSBench/QSBench-Amplitude-v1.0.0-demo", | |
| }, | |
| "hardware_aware": { | |
| "label": "hardware_aware", | |
| "repo": "QSBench/QSBench-Transpilation-v1.0.0-demo", | |
| }, | |
| } | |
| CLASS_ORDER = ["clean", "depolarizing", "amplitude_damping", "hardware_aware"] | |
| NON_FEATURE_COLS = { | |
| "sample_id", | |
| "sample_seed", | |
| "circuit_hash", | |
| "split", | |
| "circuit_qasm", | |
| "qasm_raw", | |
| "qasm_transpiled", | |
| "circuit_type_resolved", | |
| "circuit_type_requested", | |
| "noise_type", | |
| "noise_prob", | |
| "observable_bases", | |
| "observable_mode", | |
| "backend_device", | |
| "precision_mode", | |
| "circuit_signature", | |
| "entanglement", | |
| "meyer_wallach", | |
| "cx_count", | |
| "noise_label", | |
| } | |
| SOFT_EXCLUDE_PATTERNS = ["ideal_", "noisy_", "error_", "sign_ideal_", "sign_noisy_"] | |
| _ASSET_CACHE: Dict[str, pd.DataFrame] = {} | |
| _COMBINED_CACHE: Optional[pd.DataFrame] = None | |
| def safe_parse(value): | |
| """Safely parse stringified Python literals.""" | |
| if isinstance(value, str): | |
| try: | |
| return ast.literal_eval(value) | |
| except Exception: | |
| return value | |
| return value | |
| def adjacency_features(adj_value) -> Dict[str, float]: | |
| """Derive graph statistics from an adjacency matrix.""" | |
| parsed = safe_parse(adj_value) | |
| if not isinstance(parsed, list) or len(parsed) == 0: | |
| return { | |
| "adj_edge_count": np.nan, | |
| "adj_density": np.nan, | |
| "adj_degree_mean": np.nan, | |
| "adj_degree_std": np.nan, | |
| } | |
| try: | |
| arr = np.array(parsed, dtype=float) | |
| n = arr.shape[0] | |
| edge_count = float(np.triu(arr, k=1).sum()) | |
| possible_edges = float(n * (n - 1) / 2) | |
| density = edge_count / possible_edges if possible_edges > 0 else np.nan | |
| degrees = arr.sum(axis=1) | |
| return { | |
| "adj_edge_count": edge_count, | |
| "adj_density": density, | |
| "adj_degree_mean": float(np.mean(degrees)), | |
| "adj_degree_std": float(np.std(degrees)), | |
| } | |
| except Exception: | |
| return { | |
| "adj_edge_count": np.nan, | |
| "adj_density": np.nan, | |
| "adj_degree_mean": np.nan, | |
| "adj_degree_std": np.nan, | |
| } | |
| def qasm_features(qasm_value) -> Dict[str, float]: | |
| """Extract lightweight text statistics from QASM.""" | |
| if not isinstance(qasm_value, str) or not qasm_value.strip(): | |
| return { | |
| "qasm_length": np.nan, | |
| "qasm_line_count": np.nan, | |
| "qasm_gate_keyword_count": np.nan, | |
| "qasm_measure_count": np.nan, | |
| "qasm_comment_count": np.nan, | |
| } | |
| text = qasm_value | |
| lines = [line for line in text.splitlines() if line.strip()] | |
| gate_keywords = re.findall( | |
| r"\b(cx|h|x|y|z|rx|ry|rz|u1|u2|u3|u|swap|cz|ccx|rxx|ryy|rzz)\b", | |
| text, | |
| flags=re.IGNORECASE, | |
| ) | |
| measure_count = len(re.findall(r"\bmeasure\b", text, flags=re.IGNORECASE)) | |
| comment_count = sum(1 for line in lines if line.strip().startswith("//")) | |
| return { | |
| "qasm_length": float(len(text)), | |
| "qasm_line_count": float(len(lines)), | |
| "qasm_gate_keyword_count": float(len(gate_keywords)), | |
| "qasm_measure_count": float(measure_count), | |
| "qasm_comment_count": float(comment_count), | |
| } | |
| def enrich_dataframe(df: pd.DataFrame) -> pd.DataFrame: | |
| """Add derived numeric features for classification.""" | |
| df = df.copy() | |
| if "adjacency" in df.columns: | |
| adj_df = df["adjacency"].apply(adjacency_features).apply(pd.Series) | |
| df = pd.concat([df, adj_df], axis=1) | |
| qasm_source = "qasm_transpiled" if "qasm_transpiled" in df.columns else "qasm_raw" | |
| if qasm_source in df.columns: | |
| qasm_df = df[qasm_source].apply(qasm_features).apply(pd.Series) | |
| df = pd.concat([df, qasm_df], axis=1) | |
| return df | |
| def load_single_dataset(dataset_key: str) -> pd.DataFrame: | |
| """Load a dataset shard from Hugging Face and cache it in memory.""" | |
| if dataset_key not in _ASSET_CACHE: | |
| logger.info("Loading dataset: %s", dataset_key) | |
| ds = load_dataset(REPO_CONFIG[dataset_key]["repo"]) | |
| df = pd.DataFrame(ds["train"]) | |
| df = enrich_dataframe(df) | |
| df["noise_label"] = REPO_CONFIG[dataset_key]["label"] | |
| _ASSET_CACHE[dataset_key] = df | |
| return _ASSET_CACHE[dataset_key] | |
| def load_combined_dataset(dataset_keys: Optional[List[str]] = None) -> pd.DataFrame: | |
| """Load and merge selected noise-condition datasets.""" | |
| global _COMBINED_CACHE | |
| if dataset_keys is None: | |
| dataset_keys = list(REPO_CONFIG.keys()) | |
| cache_key = tuple(sorted(dataset_keys)) | |
| if _COMBINED_CACHE is None or not isinstance(_COMBINED_CACHE, pd.DataFrame) or getattr(_COMBINED_CACHE, "_cache_key", None) != cache_key: | |
| frames = [load_single_dataset(key) for key in dataset_keys] | |
| combined = pd.concat(frames, ignore_index=True) | |
| combined = combined[combined["noise_label"].isin(CLASS_ORDER)].copy() | |
| combined._cache_key = cache_key # type: ignore[attr-defined] | |
| _COMBINED_CACHE = combined | |
| return _COMBINED_CACHE | |
| def load_guide_content() -> str: | |
| """Load the markdown guide if it exists.""" | |
| try: | |
| with open("GUIDE.md", "r", encoding="utf-8") as f: | |
| return f.read() | |
| except FileNotFoundError: | |
| return "# Guide\n\nGuide file not found." | |
| def get_available_feature_columns(df: pd.DataFrame) -> List[str]: | |
| """Return numeric feature columns excluding metadata and target columns.""" | |
| numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist() | |
| features = [] | |
| for col in numeric_cols: | |
| if col in NON_FEATURE_COLS: | |
| continue | |
| if any(pattern in col for pattern in SOFT_EXCLUDE_PATTERNS): | |
| continue | |
| features.append(col) | |
| return sorted(features) | |
| def default_feature_selection(features: List[str]) -> List[str]: | |
| """Select a stable default feature subset.""" | |
| preferred = [ | |
| "gate_entropy", | |
| "adj_density", | |
| "adj_degree_mean", | |
| "adj_degree_std", | |
| "depth", | |
| "total_gates", | |
| "single_qubit_gates", | |
| "two_qubit_gates", | |
| "cx_count", | |
| "qasm_length", | |
| "qasm_line_count", | |
| "qasm_gate_keyword_count", | |
| ] | |
| selected = [feature for feature in preferred if feature in features] | |
| return selected[:8] if selected else features[:8] | |
| def make_classification_figure( | |
| y_true: np.ndarray, | |
| y_pred: np.ndarray, | |
| class_names: List[str], | |
| feature_names: Optional[List[str]] = None, | |
| importances: Optional[np.ndarray] = None, | |
| ) -> plt.Figure: | |
| """Create a compact classification summary figure.""" | |
| fig = plt.figure(figsize=(20, 6)) | |
| gs = fig.add_gridspec(1, 3) | |
| ax1 = fig.add_subplot(gs[0, 0]) | |
| ax2 = fig.add_subplot(gs[0, 1]) | |
| ax3 = fig.add_subplot(gs[0, 2]) | |
| cm = confusion_matrix(y_true, y_pred, labels=class_names) | |
| image = ax1.imshow(cm, interpolation="nearest") | |
| ax1.set_title("Confusion Matrix") | |
| ax1.set_xlabel("Predicted") | |
| ax1.set_ylabel("Actual") | |
| ax1.set_xticks(np.arange(len(class_names))) | |
| ax1.set_yticks(np.arange(len(class_names))) | |
| ax1.set_xticklabels(class_names, rotation=45, ha="right") | |
| ax1.set_yticklabels(class_names) | |
| for i in range(cm.shape[0]): | |
| for j in range(cm.shape[1]): | |
| ax1.text(j, i, cm[i, j], ha="center", va="center") | |
| fig.colorbar(image, ax=ax1, fraction=0.046, pad=0.04) | |
| incorrect = (y_true != y_pred).astype(int) | |
| ax2.hist(incorrect, bins=[-0.5, 0.5, 1.5]) | |
| ax2.set_title("Correct vs Incorrect") | |
| ax2.set_xlabel("0 = Correct, 1 = Incorrect") | |
| ax2.set_ylabel("Count") | |
| if importances is not None and feature_names is not None and len(importances) == len(feature_names): | |
| idx = np.argsort(importances)[-10:] | |
| ax3.barh([feature_names[i] for i in idx], importances[idx]) | |
| ax3.set_title("Top-10 Feature Importances") | |
| ax3.set_xlabel("Importance") | |
| else: | |
| ax3.text(0.5, 0.5, "Feature importances are unavailable.", ha="center", va="center") | |
| ax3.set_axis_off() | |
| fig.tight_layout() | |
| return fig | |
| def build_dataset_profile(df: pd.DataFrame) -> str: | |
| """Build a short dataset summary for the explorer tab.""" | |
| return ( | |
| f"### Dataset profile\n\n" | |
| f"**Rows:** {len(df):,} \n" | |
| f"**Columns:** {len(df.columns):,} \n" | |
| f"**Source label:** `{df['noise_label'].iloc[0] if 'noise_label' in df.columns and not df.empty else 'n/a'}`" | |
| ) | |
| def refresh_explorer(dataset_key: str, split_name: str) -> Tuple[gr.update, pd.DataFrame, str, str, str, str]: | |
| """Refresh the explorer view for the selected source dataset.""" | |
| df = load_single_dataset(dataset_key) | |
| splits = df["split"].dropna().unique().tolist() if "split" in df.columns else ["train"] | |
| if not splits: | |
| splits = ["train"] | |
| if split_name not in splits: | |
| split_name = splits[0] | |
| filtered = df[df["split"] == split_name] if "split" in df.columns else df | |
| display_df = filtered.head(12).copy() | |
| raw_qasm = display_df["qasm_raw"].iloc[0] if "qasm_raw" in display_df.columns and not display_df.empty else "// N/A" | |
| transpiled_qasm = display_df["qasm_transpiled"].iloc[0] if "qasm_transpiled" in display_df.columns and not display_df.empty else "// N/A" | |
| profile_box = build_dataset_profile(df) | |
| summary_box = ( | |
| f"### Split summary\n\n" | |
| f"**Dataset:** `{dataset_key}` \n" | |
| f"**Label:** `{REPO_CONFIG[dataset_key]['label']}` \n" | |
| f"**Available splits:** {', '.join(splits)} \n" | |
| f"**Preview rows:** {len(display_df)}" | |
| ) | |
| return ( | |
| gr.update(choices=splits, value=split_name), | |
| display_df, | |
| raw_qasm, | |
| transpiled_qasm, | |
| profile_box, | |
| summary_box, | |
| ) | |
| def sync_feature_picker(dataset_keys: List[str]) -> gr.update: | |
| """Refresh the feature list from the selected datasets.""" | |
| if not dataset_keys: | |
| return gr.update(choices=[], value=[]) | |
| df = load_combined_dataset(dataset_keys) | |
| features = get_available_feature_columns(df) | |
| defaults = default_feature_selection(features) | |
| return gr.update(choices=features, value=defaults) | |
| def train_classifier( | |
| dataset_keys: List[str], | |
| feature_columns: List[str], | |
| test_size: float, | |
| n_estimators: int, | |
| max_depth: float, | |
| random_state: float, | |
| ) -> Tuple[Optional[plt.Figure], str]: | |
| """Train a four-class classifier and return metrics plus a plot.""" | |
| if not dataset_keys: | |
| return None, "### β Please select at least one dataset." | |
| if not feature_columns: | |
| return None, "### β Please select at least one feature." | |
| df = load_combined_dataset(dataset_keys).copy() | |
| required_cols = feature_columns + ["noise_label"] | |
| train_df = df.dropna(subset=required_cols).copy() | |
| train_df = train_df[train_df["noise_label"].isin(CLASS_ORDER)] | |
| if len(train_df) < 20: | |
| return None, "### β Not enough rows after filtering missing values." | |
| X = train_df[feature_columns] | |
| y = train_df["noise_label"] | |
| seed = int(random_state) | |
| depth = int(max_depth) if max_depth and int(max_depth) > 0 else None | |
| max_iter = int(n_estimators) | |
| try: | |
| X_train, X_test, y_train, y_test = train_test_split( | |
| X, | |
| y, | |
| test_size=test_size, | |
| random_state=seed, | |
| stratify=y, | |
| ) | |
| except ValueError: | |
| X_train, X_test, y_train, y_test = train_test_split( | |
| X, | |
| y, | |
| test_size=test_size, | |
| random_state=seed, | |
| ) | |
| model = Pipeline( | |
| steps=[ | |
| ("imputer", SimpleImputer(strategy="median")), | |
| ( | |
| "classifier", | |
| HistGradientBoostingClassifier( | |
| max_iter=max_iter, | |
| max_depth=depth, | |
| random_state=seed, | |
| min_samples_leaf=1, | |
| class_weight="balanced", | |
| learning_rate=0.1, | |
| max_bins=255, | |
| ), | |
| ), | |
| ] | |
| ) | |
| model.fit(X_train, y_train) | |
| y_pred = model.predict(X_test) | |
| accuracy = float(accuracy_score(y_test, y_pred)) | |
| macro_f1 = float(f1_score(y_test, y_pred, average="macro", zero_division=0)) | |
| weighted_f1 = float(f1_score(y_test, y_pred, average="weighted", zero_division=0)) | |
| perm = permutation_importance( | |
| model, | |
| X_test, | |
| y_test, | |
| n_repeats=8, | |
| random_state=seed, | |
| scoring="f1_macro", | |
| n_jobs=-1, | |
| ) | |
| importances = perm.importances_mean | |
| fig = make_classification_figure(y_test.to_numpy(), y_pred, CLASS_ORDER, feature_columns, importances) | |
| report = classification_report( | |
| y_test, | |
| y_pred, | |
| labels=CLASS_ORDER, | |
| zero_division=0, | |
| ) | |
| results = ( | |
| "### Classification results\n\n" | |
| f"**Rows used:** {len(train_df):,} \n" | |
| f"**Datasets used:** {', '.join(dataset_keys)} \n" | |
| f"**Test size:** {test_size:.0%} \n" | |
| f"**Accuracy:** {accuracy:.4f} \n" | |
| f"**Macro F1:** {macro_f1:.4f} \n" | |
| f"**Weighted F1:** {weighted_f1:.4f}\n\n" | |
| "```text\n" | |
| f"{report}" | |
| "```" | |
| ) | |
| return fig, results | |
| CUSTOM_CSS = """ | |
| .gradio-container { | |
| max-width: 1400px !important; | |
| } | |
| footer { | |
| margin-top: 1rem; | |
| } | |
| """ | |
| with gr.Blocks(title=APP_TITLE) as demo: | |
| gr.Markdown(f"# π {APP_TITLE}") | |
| gr.Markdown(APP_SUBTITLE) | |
| with gr.Tabs(): | |
| with gr.TabItem("π Explorer"): | |
| dataset_dropdown = gr.Dropdown( | |
| list(REPO_CONFIG.keys()), | |
| value="clean", | |
| label="Dataset", | |
| ) | |
| split_dropdown = gr.Dropdown( | |
| ["train"], | |
| value="train", | |
| label="Split", | |
| ) | |
| profile_box = gr.Markdown(value="### Loading dataset...") | |
| summary_box = gr.Markdown(value="### Loading split summary...") | |
| explorer_df = gr.Dataframe(label="Preview", interactive=False) | |
| with gr.Row(): | |
| raw_qasm = gr.Code(label="Raw QASM", language=None) | |
| transpiled_qasm = gr.Code(label="Transpiled QASM", language=None) | |
| with gr.TabItem("π§ Classification"): | |
| class_dataset_picker = gr.CheckboxGroup( | |
| label="Datasets", | |
| choices=list(REPO_CONFIG.keys()), | |
| value=list(REPO_CONFIG.keys()), | |
| ) | |
| feature_picker = gr.CheckboxGroup(label="Input features", choices=[]) | |
| test_size = gr.Slider(0.1, 0.4, value=0.2, step=0.05, label="Test split") | |
| n_estimators = gr.Slider(50, 400, value=200, step=10, label="Trees") | |
| max_depth = gr.Slider(1, 30, value=12, step=1, label="Max depth") | |
| seed = gr.Number(value=42, precision=0, label="Random seed") | |
| run_btn = gr.Button("Train & Evaluate", variant="primary") | |
| plot = gr.Plot() | |
| metrics = gr.Markdown() | |
| with gr.TabItem("π Guide"): | |
| gr.Markdown(load_guide_content()) | |
| gr.Markdown("---") | |
| gr.Markdown( | |
| "### π Links\n" | |
| "[Website](https://qsbench.github.io) | " | |
| "[Hugging Face](https://huggingface.co/QSBench) | " | |
| "[GitHub](https://github.com/QSBench)" | |
| ) | |
| dataset_dropdown.change( | |
| refresh_explorer, | |
| [dataset_dropdown, split_dropdown], | |
| [split_dropdown, explorer_df, raw_qasm, transpiled_qasm, profile_box, summary_box], | |
| ) | |
| split_dropdown.change( | |
| refresh_explorer, | |
| [dataset_dropdown, split_dropdown], | |
| [split_dropdown, explorer_df, raw_qasm, transpiled_qasm, profile_box, summary_box], | |
| ) | |
| class_dataset_picker.change(sync_feature_picker, [class_dataset_picker], [feature_picker]) | |
| run_btn.click( | |
| train_classifier, | |
| [class_dataset_picker, feature_picker, test_size, n_estimators, max_depth, seed], | |
| [plot, metrics], | |
| ) | |
| demo.load( | |
| refresh_explorer, | |
| [dataset_dropdown, split_dropdown], | |
| [split_dropdown, explorer_df, raw_qasm, transpiled_qasm, profile_box, summary_box], | |
| ) | |
| demo.load(sync_feature_picker, [class_dataset_picker], [feature_picker]) | |
| if __name__ == "__main__": | |
| demo.launch(theme=gr.themes.Soft(), css=CUSTOM_CSS) | |