Noise_Detection / app.py
QSBench's picture
Update app.py
e4692be verified
import ast
import logging
import re
from typing import Dict, List, Optional, Tuple
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from datasets import load_dataset
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.impute import SimpleImputer
from sklearn.inspection import permutation_importance
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, f1_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
APP_TITLE = "Noise Detection"
APP_SUBTITLE = (
"Classify quantum circuits into clean, depolarizing, amplitude_damping, or hardware-aware noise conditions."
)
REPO_CONFIG = {
"clean": {
"label": "clean",
"repo": "QSBench/QSBench-Core-v1.0.0-demo",
},
"depolarizing": {
"label": "depolarizing",
"repo": "QSBench/QSBench-Depolarizing-Demo-v1.0.0",
},
"amplitude_damping": {
"label": "amplitude_damping",
"repo": "QSBench/QSBench-Amplitude-v1.0.0-demo",
},
"hardware_aware": {
"label": "hardware_aware",
"repo": "QSBench/QSBench-Transpilation-v1.0.0-demo",
},
}
CLASS_ORDER = ["clean", "depolarizing", "amplitude_damping", "hardware_aware"]
NON_FEATURE_COLS = {
"sample_id",
"sample_seed",
"circuit_hash",
"split",
"circuit_qasm",
"qasm_raw",
"qasm_transpiled",
"circuit_type_resolved",
"circuit_type_requested",
"noise_type",
"noise_prob",
"observable_bases",
"observable_mode",
"backend_device",
"precision_mode",
"circuit_signature",
"entanglement",
"meyer_wallach",
"cx_count",
"noise_label",
}
SOFT_EXCLUDE_PATTERNS = ["ideal_", "noisy_", "error_", "sign_ideal_", "sign_noisy_"]
_ASSET_CACHE: Dict[str, pd.DataFrame] = {}
_COMBINED_CACHE: Optional[pd.DataFrame] = None
def safe_parse(value):
"""Safely parse stringified Python literals."""
if isinstance(value, str):
try:
return ast.literal_eval(value)
except Exception:
return value
return value
def adjacency_features(adj_value) -> Dict[str, float]:
"""Derive graph statistics from an adjacency matrix."""
parsed = safe_parse(adj_value)
if not isinstance(parsed, list) or len(parsed) == 0:
return {
"adj_edge_count": np.nan,
"adj_density": np.nan,
"adj_degree_mean": np.nan,
"adj_degree_std": np.nan,
}
try:
arr = np.array(parsed, dtype=float)
n = arr.shape[0]
edge_count = float(np.triu(arr, k=1).sum())
possible_edges = float(n * (n - 1) / 2)
density = edge_count / possible_edges if possible_edges > 0 else np.nan
degrees = arr.sum(axis=1)
return {
"adj_edge_count": edge_count,
"adj_density": density,
"adj_degree_mean": float(np.mean(degrees)),
"adj_degree_std": float(np.std(degrees)),
}
except Exception:
return {
"adj_edge_count": np.nan,
"adj_density": np.nan,
"adj_degree_mean": np.nan,
"adj_degree_std": np.nan,
}
def qasm_features(qasm_value) -> Dict[str, float]:
"""Extract lightweight text statistics from QASM."""
if not isinstance(qasm_value, str) or not qasm_value.strip():
return {
"qasm_length": np.nan,
"qasm_line_count": np.nan,
"qasm_gate_keyword_count": np.nan,
"qasm_measure_count": np.nan,
"qasm_comment_count": np.nan,
}
text = qasm_value
lines = [line for line in text.splitlines() if line.strip()]
gate_keywords = re.findall(
r"\b(cx|h|x|y|z|rx|ry|rz|u1|u2|u3|u|swap|cz|ccx|rxx|ryy|rzz)\b",
text,
flags=re.IGNORECASE,
)
measure_count = len(re.findall(r"\bmeasure\b", text, flags=re.IGNORECASE))
comment_count = sum(1 for line in lines if line.strip().startswith("//"))
return {
"qasm_length": float(len(text)),
"qasm_line_count": float(len(lines)),
"qasm_gate_keyword_count": float(len(gate_keywords)),
"qasm_measure_count": float(measure_count),
"qasm_comment_count": float(comment_count),
}
def enrich_dataframe(df: pd.DataFrame) -> pd.DataFrame:
"""Add derived numeric features for classification."""
df = df.copy()
if "adjacency" in df.columns:
adj_df = df["adjacency"].apply(adjacency_features).apply(pd.Series)
df = pd.concat([df, adj_df], axis=1)
qasm_source = "qasm_transpiled" if "qasm_transpiled" in df.columns else "qasm_raw"
if qasm_source in df.columns:
qasm_df = df[qasm_source].apply(qasm_features).apply(pd.Series)
df = pd.concat([df, qasm_df], axis=1)
return df
def load_single_dataset(dataset_key: str) -> pd.DataFrame:
"""Load a dataset shard from Hugging Face and cache it in memory."""
if dataset_key not in _ASSET_CACHE:
logger.info("Loading dataset: %s", dataset_key)
ds = load_dataset(REPO_CONFIG[dataset_key]["repo"])
df = pd.DataFrame(ds["train"])
df = enrich_dataframe(df)
df["noise_label"] = REPO_CONFIG[dataset_key]["label"]
_ASSET_CACHE[dataset_key] = df
return _ASSET_CACHE[dataset_key]
def load_combined_dataset(dataset_keys: Optional[List[str]] = None) -> pd.DataFrame:
"""Load and merge selected noise-condition datasets."""
global _COMBINED_CACHE
if dataset_keys is None:
dataset_keys = list(REPO_CONFIG.keys())
cache_key = tuple(sorted(dataset_keys))
if _COMBINED_CACHE is None or not isinstance(_COMBINED_CACHE, pd.DataFrame) or getattr(_COMBINED_CACHE, "_cache_key", None) != cache_key:
frames = [load_single_dataset(key) for key in dataset_keys]
combined = pd.concat(frames, ignore_index=True)
combined = combined[combined["noise_label"].isin(CLASS_ORDER)].copy()
combined._cache_key = cache_key # type: ignore[attr-defined]
_COMBINED_CACHE = combined
return _COMBINED_CACHE
def load_guide_content() -> str:
"""Load the markdown guide if it exists."""
try:
with open("GUIDE.md", "r", encoding="utf-8") as f:
return f.read()
except FileNotFoundError:
return "# Guide\n\nGuide file not found."
def get_available_feature_columns(df: pd.DataFrame) -> List[str]:
"""Return numeric feature columns excluding metadata and target columns."""
numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
features = []
for col in numeric_cols:
if col in NON_FEATURE_COLS:
continue
if any(pattern in col for pattern in SOFT_EXCLUDE_PATTERNS):
continue
features.append(col)
return sorted(features)
def default_feature_selection(features: List[str]) -> List[str]:
"""Select a stable default feature subset."""
preferred = [
"gate_entropy",
"adj_density",
"adj_degree_mean",
"adj_degree_std",
"depth",
"total_gates",
"single_qubit_gates",
"two_qubit_gates",
"cx_count",
"qasm_length",
"qasm_line_count",
"qasm_gate_keyword_count",
]
selected = [feature for feature in preferred if feature in features]
return selected[:8] if selected else features[:8]
def make_classification_figure(
y_true: np.ndarray,
y_pred: np.ndarray,
class_names: List[str],
feature_names: Optional[List[str]] = None,
importances: Optional[np.ndarray] = None,
) -> plt.Figure:
"""Create a compact classification summary figure."""
fig = plt.figure(figsize=(20, 6))
gs = fig.add_gridspec(1, 3)
ax1 = fig.add_subplot(gs[0, 0])
ax2 = fig.add_subplot(gs[0, 1])
ax3 = fig.add_subplot(gs[0, 2])
cm = confusion_matrix(y_true, y_pred, labels=class_names)
image = ax1.imshow(cm, interpolation="nearest")
ax1.set_title("Confusion Matrix")
ax1.set_xlabel("Predicted")
ax1.set_ylabel("Actual")
ax1.set_xticks(np.arange(len(class_names)))
ax1.set_yticks(np.arange(len(class_names)))
ax1.set_xticklabels(class_names, rotation=45, ha="right")
ax1.set_yticklabels(class_names)
for i in range(cm.shape[0]):
for j in range(cm.shape[1]):
ax1.text(j, i, cm[i, j], ha="center", va="center")
fig.colorbar(image, ax=ax1, fraction=0.046, pad=0.04)
incorrect = (y_true != y_pred).astype(int)
ax2.hist(incorrect, bins=[-0.5, 0.5, 1.5])
ax2.set_title("Correct vs Incorrect")
ax2.set_xlabel("0 = Correct, 1 = Incorrect")
ax2.set_ylabel("Count")
if importances is not None and feature_names is not None and len(importances) == len(feature_names):
idx = np.argsort(importances)[-10:]
ax3.barh([feature_names[i] for i in idx], importances[idx])
ax3.set_title("Top-10 Feature Importances")
ax3.set_xlabel("Importance")
else:
ax3.text(0.5, 0.5, "Feature importances are unavailable.", ha="center", va="center")
ax3.set_axis_off()
fig.tight_layout()
return fig
def build_dataset_profile(df: pd.DataFrame) -> str:
"""Build a short dataset summary for the explorer tab."""
return (
f"### Dataset profile\n\n"
f"**Rows:** {len(df):,} \n"
f"**Columns:** {len(df.columns):,} \n"
f"**Source label:** `{df['noise_label'].iloc[0] if 'noise_label' in df.columns and not df.empty else 'n/a'}`"
)
def refresh_explorer(dataset_key: str, split_name: str) -> Tuple[gr.update, pd.DataFrame, str, str, str, str]:
"""Refresh the explorer view for the selected source dataset."""
df = load_single_dataset(dataset_key)
splits = df["split"].dropna().unique().tolist() if "split" in df.columns else ["train"]
if not splits:
splits = ["train"]
if split_name not in splits:
split_name = splits[0]
filtered = df[df["split"] == split_name] if "split" in df.columns else df
display_df = filtered.head(12).copy()
raw_qasm = display_df["qasm_raw"].iloc[0] if "qasm_raw" in display_df.columns and not display_df.empty else "// N/A"
transpiled_qasm = display_df["qasm_transpiled"].iloc[0] if "qasm_transpiled" in display_df.columns and not display_df.empty else "// N/A"
profile_box = build_dataset_profile(df)
summary_box = (
f"### Split summary\n\n"
f"**Dataset:** `{dataset_key}` \n"
f"**Label:** `{REPO_CONFIG[dataset_key]['label']}` \n"
f"**Available splits:** {', '.join(splits)} \n"
f"**Preview rows:** {len(display_df)}"
)
return (
gr.update(choices=splits, value=split_name),
display_df,
raw_qasm,
transpiled_qasm,
profile_box,
summary_box,
)
def sync_feature_picker(dataset_keys: List[str]) -> gr.update:
"""Refresh the feature list from the selected datasets."""
if not dataset_keys:
return gr.update(choices=[], value=[])
df = load_combined_dataset(dataset_keys)
features = get_available_feature_columns(df)
defaults = default_feature_selection(features)
return gr.update(choices=features, value=defaults)
def train_classifier(
dataset_keys: List[str],
feature_columns: List[str],
test_size: float,
n_estimators: int,
max_depth: float,
random_state: float,
) -> Tuple[Optional[plt.Figure], str]:
"""Train a four-class classifier and return metrics plus a plot."""
if not dataset_keys:
return None, "### ❌ Please select at least one dataset."
if not feature_columns:
return None, "### ❌ Please select at least one feature."
df = load_combined_dataset(dataset_keys).copy()
required_cols = feature_columns + ["noise_label"]
train_df = df.dropna(subset=required_cols).copy()
train_df = train_df[train_df["noise_label"].isin(CLASS_ORDER)]
if len(train_df) < 20:
return None, "### ❌ Not enough rows after filtering missing values."
X = train_df[feature_columns]
y = train_df["noise_label"]
seed = int(random_state)
depth = int(max_depth) if max_depth and int(max_depth) > 0 else None
max_iter = int(n_estimators)
try:
X_train, X_test, y_train, y_test = train_test_split(
X,
y,
test_size=test_size,
random_state=seed,
stratify=y,
)
except ValueError:
X_train, X_test, y_train, y_test = train_test_split(
X,
y,
test_size=test_size,
random_state=seed,
)
model = Pipeline(
steps=[
("imputer", SimpleImputer(strategy="median")),
(
"classifier",
HistGradientBoostingClassifier(
max_iter=max_iter,
max_depth=depth,
random_state=seed,
min_samples_leaf=1,
class_weight="balanced",
learning_rate=0.1,
max_bins=255,
),
),
]
)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
accuracy = float(accuracy_score(y_test, y_pred))
macro_f1 = float(f1_score(y_test, y_pred, average="macro", zero_division=0))
weighted_f1 = float(f1_score(y_test, y_pred, average="weighted", zero_division=0))
perm = permutation_importance(
model,
X_test,
y_test,
n_repeats=8,
random_state=seed,
scoring="f1_macro",
n_jobs=-1,
)
importances = perm.importances_mean
fig = make_classification_figure(y_test.to_numpy(), y_pred, CLASS_ORDER, feature_columns, importances)
report = classification_report(
y_test,
y_pred,
labels=CLASS_ORDER,
zero_division=0,
)
results = (
"### Classification results\n\n"
f"**Rows used:** {len(train_df):,} \n"
f"**Datasets used:** {', '.join(dataset_keys)} \n"
f"**Test size:** {test_size:.0%} \n"
f"**Accuracy:** {accuracy:.4f} \n"
f"**Macro F1:** {macro_f1:.4f} \n"
f"**Weighted F1:** {weighted_f1:.4f}\n\n"
"```text\n"
f"{report}"
"```"
)
return fig, results
CUSTOM_CSS = """
.gradio-container {
max-width: 1400px !important;
}
footer {
margin-top: 1rem;
}
"""
with gr.Blocks(title=APP_TITLE) as demo:
gr.Markdown(f"# 🌌 {APP_TITLE}")
gr.Markdown(APP_SUBTITLE)
with gr.Tabs():
with gr.TabItem("πŸ”Ž Explorer"):
dataset_dropdown = gr.Dropdown(
list(REPO_CONFIG.keys()),
value="clean",
label="Dataset",
)
split_dropdown = gr.Dropdown(
["train"],
value="train",
label="Split",
)
profile_box = gr.Markdown(value="### Loading dataset...")
summary_box = gr.Markdown(value="### Loading split summary...")
explorer_df = gr.Dataframe(label="Preview", interactive=False)
with gr.Row():
raw_qasm = gr.Code(label="Raw QASM", language=None)
transpiled_qasm = gr.Code(label="Transpiled QASM", language=None)
with gr.TabItem("🧠 Classification"):
class_dataset_picker = gr.CheckboxGroup(
label="Datasets",
choices=list(REPO_CONFIG.keys()),
value=list(REPO_CONFIG.keys()),
)
feature_picker = gr.CheckboxGroup(label="Input features", choices=[])
test_size = gr.Slider(0.1, 0.4, value=0.2, step=0.05, label="Test split")
n_estimators = gr.Slider(50, 400, value=200, step=10, label="Trees")
max_depth = gr.Slider(1, 30, value=12, step=1, label="Max depth")
seed = gr.Number(value=42, precision=0, label="Random seed")
run_btn = gr.Button("Train & Evaluate", variant="primary")
plot = gr.Plot()
metrics = gr.Markdown()
with gr.TabItem("πŸ“– Guide"):
gr.Markdown(load_guide_content())
gr.Markdown("---")
gr.Markdown(
"### πŸ”— Links\n"
"[Website](https://qsbench.github.io) | "
"[Hugging Face](https://huggingface.co/QSBench) | "
"[GitHub](https://github.com/QSBench)"
)
dataset_dropdown.change(
refresh_explorer,
[dataset_dropdown, split_dropdown],
[split_dropdown, explorer_df, raw_qasm, transpiled_qasm, profile_box, summary_box],
)
split_dropdown.change(
refresh_explorer,
[dataset_dropdown, split_dropdown],
[split_dropdown, explorer_df, raw_qasm, transpiled_qasm, profile_box, summary_box],
)
class_dataset_picker.change(sync_feature_picker, [class_dataset_picker], [feature_picker])
run_btn.click(
train_classifier,
[class_dataset_picker, feature_picker, test_size, n_estimators, max_depth, seed],
[plot, metrics],
)
demo.load(
refresh_explorer,
[dataset_dropdown, split_dropdown],
[split_dropdown, explorer_df, raw_qasm, transpiled_qasm, profile_box, summary_box],
)
demo.load(sync_feature_picker, [class_dataset_picker], [feature_picker])
if __name__ == "__main__":
demo.launch(theme=gr.themes.Soft(), css=CUSTOM_CSS)