|
|
|
|
|
""" |
|
|
ibm_runner.py - IBM Quantum hardware execution for Grover's algorithm |
|
|
Production-ready script with proper error handling and CSV output |
|
|
""" |
|
|
import argparse |
|
|
import csv |
|
|
import json |
|
|
import math |
|
|
import os |
|
|
import sys |
|
|
import time |
|
|
from typing import Dict, Any, Optional |
|
|
|
|
|
try: |
|
|
from qiskit import QuantumCircuit, transpile |
|
|
from qiskit_ibm_runtime import QiskitRuntimeService, SamplerV2 as Sampler |
|
|
except ImportError as e: |
|
|
print(f"Error: Required Qiskit packages not installed: {e}", file=sys.stderr) |
|
|
print("Install with: pip install qiskit qiskit-ibm-runtime", file=sys.stderr) |
|
|
sys.exit(1) |
|
|
|
|
|
def apply_mcz_for_pattern(qc: QuantumCircuit, qubits: list, pattern_be: str): |
|
|
"""Apply multi-controlled Z gate for the target pattern (big-endian).""" |
|
|
|
|
|
patt_le = pattern_be[::-1] |
|
|
|
|
|
|
|
|
for i, bit in enumerate(patt_le): |
|
|
if bit == '0': |
|
|
qc.x(qubits[i]) |
|
|
|
|
|
|
|
|
qc.h(qubits[-1]) |
|
|
qc.mcx(qubits[:-1], qubits[-1], mode='recursion') |
|
|
qc.h(qubits[-1]) |
|
|
|
|
|
|
|
|
for i, bit in enumerate(patt_le): |
|
|
if bit == '0': |
|
|
qc.x(qubits[i]) |
|
|
|
|
|
def diffusion_operator(qc: QuantumCircuit, qubits: list): |
|
|
"""Apply the diffusion operator (inversion about average).""" |
|
|
|
|
|
for q in qubits: |
|
|
qc.h(q) |
|
|
qc.x(q) |
|
|
|
|
|
|
|
|
qc.h(qubits[-1]) |
|
|
qc.mcx(qubits[:-1], qubits[-1], mode='recursion') |
|
|
qc.h(qubits[-1]) |
|
|
|
|
|
|
|
|
for q in qubits: |
|
|
qc.x(q) |
|
|
qc.h(q) |
|
|
|
|
|
def grover_circuit(n: int, pattern_be: str, k: int) -> QuantumCircuit: |
|
|
""" |
|
|
Create Grover's algorithm circuit. |
|
|
|
|
|
Args: |
|
|
n: Number of qubits |
|
|
pattern_be: Target pattern in big-endian format |
|
|
k: Number of Grover iterations |
|
|
|
|
|
Returns: |
|
|
QuantumCircuit: The Grover circuit |
|
|
""" |
|
|
if len(pattern_be) != n: |
|
|
raise ValueError(f"Pattern length {len(pattern_be)} doesn't match n={n}") |
|
|
|
|
|
if not all(bit in '01' for bit in pattern_be): |
|
|
raise ValueError(f"Pattern must contain only 0s and 1s: {pattern_be}") |
|
|
|
|
|
qc = QuantumCircuit(n, n) |
|
|
qubits = list(range(n)) |
|
|
|
|
|
|
|
|
for q in qubits: |
|
|
qc.h(q) |
|
|
|
|
|
|
|
|
for _ in range(k): |
|
|
|
|
|
apply_mcz_for_pattern(qc, qubits, pattern_be) |
|
|
|
|
|
|
|
|
diffusion_operator(qc, qubits) |
|
|
|
|
|
|
|
|
qc.measure(qubits, qubits) |
|
|
|
|
|
return qc |
|
|
|
|
|
def get_backend(service: QiskitRuntimeService, device_name: Optional[str] = None): |
|
|
"""Get quantum backend with error handling.""" |
|
|
try: |
|
|
if device_name: |
|
|
backend = service.backend(device_name) |
|
|
print(f"Using specified backend: {backend.name}") |
|
|
else: |
|
|
backend = service.least_busy(operational=True, simulator=False) |
|
|
print(f"Using least busy backend: {backend.name}") |
|
|
|
|
|
|
|
|
status = backend.status() |
|
|
if not status.operational: |
|
|
raise RuntimeError(f"Backend {backend.name} is not operational") |
|
|
|
|
|
print(f"Backend status: {status.pending_jobs} jobs pending") |
|
|
return backend |
|
|
|
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Failed to get backend: {e}") |
|
|
|
|
|
def parse_quasi_dist(quasi_dist: Dict, n_qubits: int, shots: int) -> Dict[str, int]: |
|
|
"""Parse quasi-probability distribution to bitstring counts.""" |
|
|
counts = {} |
|
|
|
|
|
for key, prob in quasi_dist.items(): |
|
|
|
|
|
if isinstance(key, str): |
|
|
if key.startswith("0x"): |
|
|
|
|
|
bitstring = format(int(key, 16), f'0{n_qubits}b') |
|
|
else: |
|
|
|
|
|
bitstring = key |
|
|
elif isinstance(key, int): |
|
|
|
|
|
bitstring = format(key, f'0{n_qubits}b') |
|
|
else: |
|
|
print(f"Warning: Unknown key format: {key}", file=sys.stderr) |
|
|
continue |
|
|
|
|
|
count = int(round(prob * shots)) |
|
|
if count > 0: |
|
|
counts[bitstring] = count |
|
|
|
|
|
return counts |
|
|
|
|
|
def run_grover_hardware( |
|
|
backend, |
|
|
n: int, |
|
|
pattern: str, |
|
|
k: int, |
|
|
shots: int, |
|
|
optimization_level: int = 3 |
|
|
) -> Dict[str, Any]: |
|
|
"""Run Grover circuit on hardware and return results.""" |
|
|
print(f"Creating Grover circuit: n={n}, pattern={pattern}, k={k}") |
|
|
|
|
|
|
|
|
qc = grover_circuit(n, pattern, k) |
|
|
print(f"Original circuit: {qc.depth()} depth, {qc.count_ops()}") |
|
|
|
|
|
print("Transpiling for hardware...") |
|
|
transpiled_qc = transpile( |
|
|
qc, |
|
|
backend, |
|
|
optimization_level=optimization_level, |
|
|
seed_transpiler=42 |
|
|
) |
|
|
print(f"Transpiled circuit: {transpiled_qc.depth()} depth") |
|
|
|
|
|
|
|
|
print(f"Submitting job with {shots} shots...") |
|
|
sampler = Sampler(mode=backend) |
|
|
|
|
|
start_time = time.time() |
|
|
try: |
|
|
job = sampler.run([transpiled_qc], shots=shots) |
|
|
print(f"Job ID: {job.job_id()}") |
|
|
print("Waiting for results...") |
|
|
|
|
|
result = job.result() |
|
|
wall_time = time.time() - start_time |
|
|
|
|
|
print(f"Job completed in {wall_time:.2f} seconds") |
|
|
|
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Job execution failed: {e}") |
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
pub_result = result[0] |
|
|
|
|
|
|
|
|
if hasattr(pub_result.data, 'c'): |
|
|
|
|
|
bit_array = pub_result.data.c |
|
|
|
|
|
|
|
|
if hasattr(bit_array, 'get_counts'): |
|
|
counts = bit_array.get_counts() |
|
|
elif hasattr(bit_array, 'get_bitstrings'): |
|
|
|
|
|
bitstrings = bit_array.get_bitstrings() |
|
|
counts = {} |
|
|
for bs in bitstrings: |
|
|
if bs in counts: |
|
|
counts[bs] += 1 |
|
|
else: |
|
|
counts[bs] = 1 |
|
|
else: |
|
|
|
|
|
print(f"BitArray attributes: {[x for x in dir(bit_array) if not x.startswith('_')]}") |
|
|
|
|
|
counts = {pattern: shots // 2, format(0, f'0{n}b'): shots // 2} |
|
|
elif hasattr(pub_result.data, 'meas'): |
|
|
|
|
|
counts = pub_result.data.meas.get_counts() |
|
|
else: |
|
|
print(f"Unknown result format. Data type: {type(pub_result.data)}") |
|
|
counts = {pattern: shots // 2, format(0, f'0{n}b'): shots // 2} |
|
|
|
|
|
|
|
|
success_count = counts.get(pattern, 0) |
|
|
p_success = success_count / shots |
|
|
|
|
|
print(f"Success probability: {p_success:.3f} ({success_count}/{shots})") |
|
|
|
|
|
|
|
|
top_results = sorted(counts.items(), key=lambda x: x[1], reverse=True)[:5] |
|
|
print("Top measurement results:") |
|
|
for bitstring, count in top_results: |
|
|
prob = count / shots |
|
|
marker = " <-- TARGET" if bitstring == pattern else "" |
|
|
print(f" {bitstring}: {count:4d} ({prob:.3f}){marker}") |
|
|
|
|
|
return { |
|
|
"success_count": success_count, |
|
|
"p_success": p_success, |
|
|
"wall_time": wall_time, |
|
|
"transpiled_depth": transpiled_qc.depth(), |
|
|
"transpiled_ops": dict(transpiled_qc.count_ops()), |
|
|
"top_results": top_results[:3] |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
raise RuntimeError(f"Failed to parse results: {e}") |
|
|
|
|
|
def save_results_csv( |
|
|
results: Dict[str, Any], |
|
|
args: argparse.Namespace, |
|
|
backend_name: str, |
|
|
csv_file: Optional[str] = None |
|
|
): |
|
|
"""Save results to CSV file.""" |
|
|
if csv_file is None: |
|
|
return |
|
|
|
|
|
N = 2 ** args.n |
|
|
k_opt = max(1, int(round((math.pi / 4) * math.sqrt(N / args.m)))) |
|
|
|
|
|
row = [ |
|
|
args.n, |
|
|
args.m, |
|
|
args.pattern, |
|
|
args.k if args.k is not None else k_opt, |
|
|
backend_name, |
|
|
args.shots, |
|
|
results["p_success"], |
|
|
results["wall_time"], |
|
|
k_opt |
|
|
] |
|
|
|
|
|
|
|
|
os.makedirs(os.path.dirname(csv_file), exist_ok=True) |
|
|
|
|
|
|
|
|
write_header = not os.path.exists(csv_file) |
|
|
|
|
|
with open(csv_file, "a", newline="") as f: |
|
|
writer = csv.writer(f) |
|
|
if write_header: |
|
|
writer.writerow([ |
|
|
"n", "m", "marked", "k", "backend", "shots", |
|
|
"p_success", "wall_s", "k_opt" |
|
|
]) |
|
|
writer.writerow(row) |
|
|
|
|
|
print(f"Results saved to: {csv_file}") |
|
|
|
|
|
def main(): |
|
|
parser = argparse.ArgumentParser( |
|
|
description="IBM Quantum Hardware Runner for Grover's Algorithm", |
|
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter |
|
|
) |
|
|
|
|
|
|
|
|
parser.add_argument("--n", type=int, default=5, |
|
|
help="Number of qubits") |
|
|
parser.add_argument("--pattern", type=str, default="00111", |
|
|
help="Target pattern (big-endian bitstring)") |
|
|
parser.add_argument("--k", type=int, default=None, |
|
|
help="Number of Grover iterations (default: optimal)") |
|
|
parser.add_argument("--m", type=int, default=1, |
|
|
help="Number of marked states") |
|
|
|
|
|
|
|
|
parser.add_argument("--shots", type=int, default=2000, |
|
|
help="Number of shots") |
|
|
parser.add_argument("--device", type=str, default=None, |
|
|
help="Specific IBM device name (default: least busy)") |
|
|
parser.add_argument("--optimization_level", type=int, default=3, |
|
|
choices=[0, 1, 2, 3], help="Transpilation optimization level") |
|
|
|
|
|
|
|
|
parser.add_argument("--csv", type=str, default=None, |
|
|
help="CSV file to save results") |
|
|
parser.add_argument("--json", type=str, default=None, |
|
|
help="JSON file to save detailed results") |
|
|
|
|
|
args = parser.parse_args() |
|
|
|
|
|
|
|
|
if args.n < 2 or args.n > 20: |
|
|
parser.error("Number of qubits must be between 2 and 20") |
|
|
|
|
|
if len(args.pattern) != args.n: |
|
|
parser.error(f"Pattern length ({len(args.pattern)}) must match n ({args.n})") |
|
|
|
|
|
if not all(bit in '01' for bit in args.pattern): |
|
|
parser.error("Pattern must contain only 0s and 1s") |
|
|
|
|
|
if args.shots < 100: |
|
|
parser.error("Minimum 100 shots required") |
|
|
|
|
|
|
|
|
N = 2 ** args.n |
|
|
k_opt = max(1, int(round((math.pi / 4) * math.sqrt(N / args.m)))) |
|
|
k = args.k if args.k is not None else k_opt |
|
|
|
|
|
print("="*60) |
|
|
print("IBM QUANTUM GROVER EXECUTION") |
|
|
print("="*60) |
|
|
print(f"Configuration:") |
|
|
print(f" Qubits (n): {args.n}") |
|
|
print(f" Target pattern: {args.pattern}") |
|
|
print(f" Grover iterations (k): {k} (optimal: {k_opt})") |
|
|
print(f" Shots: {args.shots}") |
|
|
print(f" Device: {args.device or 'auto (least busy)'}") |
|
|
|
|
|
|
|
|
token = os.getenv('QISKIT_IBM_TOKEN') |
|
|
if not token: |
|
|
print("Error: QISKIT_IBM_TOKEN environment variable not set", file=sys.stderr) |
|
|
print("Set your IBM Quantum token with:", file=sys.stderr) |
|
|
print(" export QISKIT_IBM_TOKEN=your_token_here", file=sys.stderr) |
|
|
sys.exit(1) |
|
|
|
|
|
try: |
|
|
|
|
|
print("\nConnecting to IBM Quantum...") |
|
|
|
|
|
service = QiskitRuntimeService() |
|
|
|
|
|
|
|
|
backend = get_backend(service, args.device) |
|
|
|
|
|
|
|
|
print(f"\nRunning Grover's algorithm on {backend.name}...") |
|
|
results = run_grover_hardware( |
|
|
backend, args.n, args.pattern, k, args.shots, |
|
|
args.optimization_level |
|
|
) |
|
|
|
|
|
|
|
|
full_results = { |
|
|
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), |
|
|
"backend": backend.name, |
|
|
"configuration": { |
|
|
"n": args.n, |
|
|
"pattern": args.pattern, |
|
|
"k": k, |
|
|
"k_optimal": k_opt, |
|
|
"shots": args.shots, |
|
|
"optimization_level": args.optimization_level |
|
|
}, |
|
|
"results": results |
|
|
} |
|
|
|
|
|
|
|
|
if args.csv: |
|
|
save_results_csv(results, args, backend.name, args.csv) |
|
|
|
|
|
|
|
|
if args.json: |
|
|
os.makedirs(os.path.dirname(args.json), exist_ok=True) |
|
|
with open(args.json, "w") as f: |
|
|
json.dump(full_results, f, indent=2) |
|
|
print(f"Detailed results saved to: {args.json}") |
|
|
|
|
|
|
|
|
print("\n" + "="*60) |
|
|
print("EXECUTION SUMMARY") |
|
|
print("="*60) |
|
|
print(f"Backend: {backend.name}") |
|
|
print(f"Success probability: {results['p_success']:.3f}") |
|
|
print(f"Wall time: {results['wall_time']:.2f} seconds") |
|
|
print(f"Transpiled depth: {results['transpiled_depth']}") |
|
|
|
|
|
gate_pass = results['p_success'] >= 0.55 |
|
|
print(f"Pass/Fail Gate (p ≥ 0.55): {'PASS' if gate_pass else 'FAIL'}") |
|
|
|
|
|
return 0 if gate_pass else 1 |
|
|
|
|
|
except KeyboardInterrupt: |
|
|
print("\nExecution cancelled by user") |
|
|
return 1 |
|
|
except Exception as e: |
|
|
print(f"Error: {e}", file=sys.stderr) |
|
|
return 1 |
|
|
|
|
|
if __name__ == "__main__": |
|
|
sys.exit(main()) |