QPU-1-MCP / app.py
Reality123b's picture
Update app.py
cc4ea9e verified
import gradio as gr
import json
import requests
import os
from typing import Optional
import time
from PIL import Image
import re
import uuid
import base64
from io import BytesIO
import threading
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import asyncio
from functools import lru_cache
import hashlib
# Base URL of the QPU-1 REST API; every backend request in this file is built from it.
BASE_URL = "https://lap-quantum-qpu-1-api.hf.space"
# Optional Hugging Face token read from the environment; empty string when unset.
HF_TOKEN = os.getenv("HF_TOKEN", "")
# In-memory session records keyed by session id (populated by get_user_session_id).
sessions = {}
# Held whenever `sessions` is read or modified.
session_lock = threading.Lock()
# Sessions idle for longer than this are removed by cleanup_expired_sessions().
SESSION_TIMEOUT = timedelta(hours=1.5)
# ==================== CONCURRENT PROCESSING INFRASTRUCTURE ====================
# Connection pool with retry strategy
def create_session_pool():
    """Build a ``requests.Session`` with pooled, retrying HTTP(S) adapters.

    Returns:
        A session whose ``http://`` and ``https://`` prefixes share one
        ``HTTPAdapter`` configured with the retry policy below.
    """
    # Up to 3 retries with a 0.1 backoff factor on throttling / server errors.
    retry_policy = Retry(
        total=3,
        backoff_factor=0.1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET", "POST", "DELETE"],
    )
    pooled_adapter = HTTPAdapter(
        max_retries=retry_policy,
        pool_connections=10000,  # Number of connection pools
        pool_maxsize=90000,  # Max connections per pool
        pool_block=False,
    )
    http = requests.Session()
    for scheme in ("http://", "https://"):
        http.mount(scheme, pooled_adapter)
    return http
# Single shared requests.Session, built once at import time; make_concurrent_request()
# sends every HTTP call through it.
session_pool = create_session_pool()
# Lock associated with session_pool.
session_pool_lock = threading.Lock()
# Module-level thread pool. In the code visible in this file it is only read by
# get_qpu_health() to report its worker count; batch_execute_codes() creates its
# own per-call pool instead of submitting work here.
executor = ThreadPoolExecutor(
max_workers=5000, # Adjust based on your server capacity
thread_name_prefix="qpu_worker"
)
# Response cache used by get_cached_response()/set_cached_response():
# cache key -> (response data, time.time() at which it was stored).
request_cache = {}
# Held by the two cache helpers while request_cache is read or modified.
cache_lock = threading.Lock()
# Lifetime of a cache entry.
CACHE_TTL = 60 # seconds
def get_cache_key(endpoint: str, method: str, data: dict = None) -> str:
    """Return a SHA-256 hex digest identifying a request.

    The endpoint, method and payload are serialised as JSON with sorted keys,
    so two payloads that differ only in key order produce the same digest.
    """
    fingerprint = json.dumps(
        {'endpoint': endpoint, 'method': method, 'data': data},
        sort_keys=True,
    )
    digest = hashlib.sha256(fingerprint.encode())
    return digest.hexdigest()
def get_cached_response(cache_key: str):
    """Return the cached payload for *cache_key*, or ``None`` if absent or expired.

    An entry that has reached ``CACHE_TTL`` seconds of age is deleted as a
    side effect of the lookup.
    """
    with cache_lock:
        entry = request_cache.get(cache_key)
        if entry is None:
            return None
        payload, stored_at = entry
        if time.time() - stored_at >= CACHE_TTL:
            del request_cache[cache_key]
            return None
        return payload
def set_cached_response(cache_key: str, response_data):
    """Store *response_data* under *cache_key* together with the current time.

    Once the cache holds more than 10000 entries, every entry older than
    ``CACHE_TTL`` seconds is purged.
    """
    with cache_lock:
        request_cache[cache_key] = (response_data, time.time())
        if len(request_cache) <= 10000:
            return
        # Over the size threshold: evict whatever has already expired.
        reference = time.time()
        stale_keys = []
        for key, entry in request_cache.items():
            if reference - entry[1] > CACHE_TTL:
                stale_keys.append(key)
        for key in stale_keys:
            request_cache.pop(key)
def make_concurrent_request(endpoint: str, method: str = "POST",
                            data: dict = None, timeout: int = 300,
                            use_cache: bool = True) -> dict:
    """
    Make an HTTP request through the shared session, with optional caching.

    Args:
        endpoint: API endpoint URL
        method: HTTP method (GET, POST, DELETE)
        data: Request payload (sent as JSON for POST only)
        timeout: Request timeout in seconds
        use_cache: Whether to use response caching (applies to GET and POST)

    Returns:
        The decoded JSON body, ``{"success": True, "status_code": 204}`` for an
        empty 204 reply, or ``{"error": "..."}`` when the request fails, times
        out, uses an unsupported method, or returns a body that is not JSON.
        Transport errors are reported in the dict, never raised.
    """
    cache_key = None
    if use_cache and method in ["GET", "POST"]:
        cache_key = get_cache_key(endpoint, method, data)
        cached_response = get_cached_response(cache_key)
        if cached_response is not None:
            return cached_response

    try:
        headers = get_headers()
        # No global lock around the network call: holding one here serialised
        # every request, so a single slow call (timeout up to 300 s) blocked all
        # other users and the batch worker threads gained nothing. Concurrent
        # connections are handled by the session's pooled adapter.
        if method == "GET":
            response = session_pool.get(endpoint, headers=headers, timeout=timeout)
        elif method == "POST":
            response = session_pool.post(endpoint, headers=headers, json=data, timeout=timeout)
        elif method == "DELETE":
            response = session_pool.delete(endpoint, headers=headers, timeout=timeout)
        else:
            raise ValueError(f"Unsupported method: {method}")

        if response.status_code == 204:
            result = {"success": True, "status_code": 204}
        else:
            try:
                result = response.json()
            except ValueError:
                # HTML error pages and empty bodies are reported explicitly
                # instead of surfacing a raw JSON parser message.
                return {"error": f"Invalid JSON response (HTTP {response.status_code})"}

        # Only successful responses are cached.
        if use_cache and cache_key and response.status_code == 200:
            set_cached_response(cache_key, result)
        return result
    except requests.exceptions.Timeout:
        return {"error": "Request timed out"}
    except requests.exceptions.RequestException as e:
        return {"error": f"Request failed: {str(e)}"}
    except Exception as e:
        return {"error": f"Unexpected error: {str(e)}"}
def batch_execute_codes(codes: list, session_id: str, max_workers: int = 100) -> list:
    """
    Execute multiple quantum codes concurrently.

    Args:
        codes: List of quantum code strings
        session_id: User session ID; each code runs under ``f"{session_id}_{index}"``
        max_workers: Maximum concurrent executions

    Returns:
        List of results in the same order as the input codes (empty list for
        empty input).
    """
    if not codes:
        return []

    def run_one(indexed_code):
        idx, code = indexed_code
        return make_concurrent_request(
            f"{BASE_URL}/script",
            method="POST",
            data={"code": code, "session_id": f"{session_id}_{idx}"},
            timeout=300,
            # Quantum executions are non-deterministic, so a cached reply must
            # never stand in for a fresh run (same policy as execute_quantum_code).
            use_cache=False,
        )

    with ThreadPoolExecutor(max_workers=max_workers) as batch_executor:
        # Executor.map yields results in input order, whatever the completion order.
        return list(batch_executor.map(run_one, enumerate(codes)))
# ==================== END CONCURRENT PROCESSING INFRASTRUCTURE ====================
def cleanup_expired_sessions():
    """Remove every session whose last access is older than ``SESSION_TIMEOUT``.

    A record without a ``last_access`` entry is treated as last accessed at
    ``datetime.min``.
    """
    with session_lock:
        current = datetime.now()
        stale_ids = []
        for sid, record in sessions.items():
            idle_for = current - record.get('last_access', datetime.min)
            if idle_for > SESSION_TIMEOUT:
                stale_ids.append(sid)
        for sid in stale_ids:
            sessions.pop(sid)
def get_user_session_id(request: gr.Request) -> str:
    """Return the caller's session id, creating or refreshing its session record.

    Args:
        request: The Gradio request, or ``None`` when called without one.

    Returns:
        ``request.session_hash`` when it is set, otherwise a freshly generated
        UUID string. Expired sessions are purged first; the session's
        ``last_access`` is updated and its ``request_count`` incremented.
    """
    # A request may carry no session hash; fall back to a UUID so such callers
    # do not all share the single key None (and the return type stays str).
    # NOTE(review): confirm when session_hash is unset in the Gradio version used.
    session_id = getattr(request, "session_hash", None) if request else None
    if not session_id:
        session_id = str(uuid.uuid4())

    cleanup_expired_sessions()
    with session_lock:
        now = datetime.now()
        record = sessions.setdefault(session_id, {
            'last_access': now,
            'request_count': 0,
            'circuits': {},
        })
        record['last_access'] = now
        record['request_count'] += 1
    return session_id
# Stylesheet passed to gr.Blocks(css=...): Inter font everywhere, a 1400px container
# cap, gradient styling for elements carrying the "primary-btn" class, and gradient
# text for h1-h3 headings.
CUSTOM_CSS = """
@import url('https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700&display=swap');
* {
font-family: 'Inter', sans-serif !important;
}
.gradio-container {
max-width: 1400px !important;
}
.primary-btn {
background: linear-gradient(135deg, #8b5cf6, #6366f1) !important;
border: none !important;
font-weight: 600 !important;
transition: all 0.3s ease !important;
}
.primary-btn:hover {
transform: translateY(-2px) !important;
box-shadow: 0 10px 25px rgba(139, 92, 246, 0.4) !important;
}
h1, h2, h3 {
background: linear-gradient(135deg, #8b5cf6, #6366f1);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
font-weight: 700;
}
"""
def get_headers():
    """Return the HTTP headers sent with every backend request.

    ``Content-Type`` is always JSON. ``Authorization`` is added only when
    ``HF_TOKEN`` is configured, so anonymous requests no longer carry an
    empty ``Authorization`` header.
    """
    headers = {"Content-Type": "application/json"}
    if HF_TOKEN:
        headers["Authorization"] = f"Bearer {HF_TOKEN}"
    return headers
# Numeric literal substituted for the OpenQASM constant `pi` in rotation angles.
_QASM_PI = "3.141592653589793"
_QASM_PI_RE = re.compile(r'\bpi\b')
# Statements that never produce a gate.
_QASM_SKIPPED_PREFIXES = ('OPENQASM', 'include', 'creg', 'measure', 'barrier')
_QASM_QREG_RE = re.compile(r'qreg\s+(\w+)\[(\d+)\]')
# gate_name [ (params) ] operands
_QASM_GATE_RE = re.compile(r'(\w+)\s*(?:\(([^()]*)\))?\s*(.*)')
_QASM_QUBIT_RE = re.compile(r'(\w+)\[(\d+)\]')
# Angles may only contain numbers, arithmetic operators, commas and exponents,
# so nothing but a numeric expression is ever copied into the generated script.
_QASM_ANGLE_RE = re.compile(r'(?:[\d.,\s+\-*/]|(?<=[\d.])[eE](?=[+\-\d]))+')
# Sdg is Rz(-pi/2), Tdg is Rz(-pi/4).
_QASM_DAGGER_ANGLES = {'sdg': '-1.57079632679', 'tdg': '-0.78539816339'}


def _qasm_fragments(qasm_code):
    """Yield ``(statement, terminated)`` for each non-empty ';'-separated piece.

    ``//`` comments are removed first. ``terminated`` is False for text that
    follows the last ';' on a line, i.e. an incomplete statement.
    """
    for raw_line in qasm_code.split('\n'):
        pieces = raw_line.split('//', 1)[0].split(';')
        last = len(pieces) - 1
        for position, piece in enumerate(pieces):
            statement = piece.strip()
            if statement:
                yield statement, position < last


def _qasm_gate_line(gate, params, qubits):
    """Return the Qreg source line for one gate, or None when nothing is emitted."""
    if gate in ('h', 'x', 'y', 'z', 's', 't'):
        return f"q.{gate.upper()}({qubits[0]})" if qubits else None
    if gate in _QASM_DAGGER_ANGLES:
        return f"q.Rz({qubits[0]}, {_QASM_DAGGER_ANGLES[gate]})" if qubits else None
    if gate in ('rx', 'ry', 'rz'):
        if not (params and qubits):
            return None
        angle = _QASM_PI_RE.sub(_QASM_PI, params.strip())
        if not _QASM_ANGLE_RE.fullmatch(angle):
            return f"# Skipped {gate.upper()}: unsupported angle expression"
        # Capitalize R: rx -> Rx
        return f"q.{gate.capitalize()}({qubits[0]}, {angle})"
    if gate in ('cx', 'cnot', 'cz'):
        if len(qubits) < 2:
            return None
        label = 'CZ' if gate == 'cz' else 'CNOT'
        if qubits[0] == qubits[1]:
            return f"# Skipped {label} on same qubit"
        return f"q.{label}({qubits[0]}, {qubits[1]})"
    if gate in ('ccx', 'ccnot', 'toffoli'):
        if len(qubits) >= 3:
            return f"q.CCNOT({qubits[0]}, {qubits[1]}, {qubits[2]})"
    return None


def qasm_to_qreg(qasm_code: str) -> str:
    """Translate OpenQASM 2.0 source into a Qreg script.

    All ``qreg`` declarations are laid out back to back in one global register.
    Supported gates (h, x, y, z, s, t, sdg, tdg, rx, ry, rz, cx/cnot, cz,
    ccx/ccnot/toffoli) are emitted as ``q.<Gate>(...)`` calls; ``creg``,
    ``measure``, ``barrier`` and unknown statements are ignored.

    Compared with a plain line-by-line regex match this also handles:
      * several ';'-terminated statements on one line,
      * rotation angles written with ``pi``, arithmetic or exponents
        (e.g. ``rx(pi/2) q[0];``), which were previously dropped silently.

    Args:
        qasm_code: OpenQASM source text.

    Returns:
        A script of the form ``q = Qreg(N)``, one line per gate, then
        ``print(q.measure())``.
    """
    # Pass 1: assign every quantum register its starting offset.
    register_offsets = {}
    total_qubits = 0
    for statement, _ in _qasm_fragments(qasm_code):
        if statement.startswith('qreg'):
            declared = _QASM_QREG_RE.search(statement)
            if declared:
                register_offsets[declared.group(1)] = total_qubits
                total_qubits += int(declared.group(2))

    # Pass 2: translate gates; only ';'-terminated statements count.
    operations = []
    for statement, terminated in _qasm_fragments(qasm_code):
        if not terminated or statement.startswith(_QASM_SKIPPED_PREFIXES):
            continue
        parsed = _QASM_GATE_RE.fullmatch(statement)
        if not parsed:
            continue
        gate, params, operands = parsed.groups()
        # Unknown register names fall back to offset 0, as before.
        qubits = [
            register_offsets.get(name, 0) + int(idx)
            for name, idx in _QASM_QUBIT_RE.findall(operands)
        ]
        line = _qasm_gate_line(gate.lower(), params, qubits)
        if line is not None:
            operations.append(line)

    ops_code = "\n".join(operations)
    return f"q = Qreg({total_qubits})\n{ops_code}\nprint(q.measure())"
# NEW: Qrisp to QASM transpiler (no simulation)
_QRISP_SCALAR_DECL_RES = (
    re.compile(r'(\w+)\s*=\s*QuantumVariable\((\d+)\)'),
    # QuantumFloat(size, exponent) or QuantumFloat(size)
    re.compile(r'(\w+)\s*=\s*QuantumFloat\((\d+)(?:\s*,\s*[^)]+)?\)'),
)
# QuantumArray(qtype, shape=(...))
_QRISP_ARRAY_DECL_RE = re.compile(
    r'(\w+)\s*=\s*QuantumArray\([^,]+,\s*shape\s*=\s*\(([^)]+)\)\)'
)
_QRISP_INDEXED_RE = re.compile(r'(\w+)\[([^\]]+)\]')
_QRISP_MEASURE_RE = re.compile(r'measure\(([^)]+)\)')
_QRISP_SINGLE_RE = re.compile(r'(h|x|y|z|s|t)\(([^)]+)\)')
# Qrisp uses rx(angle, qubit)
_QRISP_ROTATION_RE = re.compile(r'(rx|ry|rz)\(([^,]+),\s*([^)]+)\)')
_QRISP_MULTI_RES = (
    ('cx', re.compile(r'cx\(([^,]+),\s*([^)]+)\)')),
    ('cz', re.compile(r'cz\(([^,]+),\s*([^)]+)\)')),
    ('ccx', re.compile(r'ccx\(([^,]+),\s*([^,]+),\s*([^)]+)\)')),
)
_QRISP_IGNORED_PREFIXES = (
    'import ', 'from ', '#', 'print', 'return', 'def ', 'class ',
    'try:', 'except', 'with ',
)


def _qrisp_collect_variables(lines):
    """Scan declarations; return ``(quantum_vars, array_shapes, total_qubits)``.

    ``quantum_vars`` maps a name to ``(size, offset)``; ``array_shapes`` maps
    each QuantumArray name to its list of dimensions.
    """
    quantum_vars = {}
    array_shapes = {}
    total_qubits = 0
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith(('import ', 'from ', '#')):
            continue
        name = size = shape = None
        for pattern in _QRISP_SCALAR_DECL_RES:
            declared = pattern.search(line)
            if declared:
                name, size = declared.group(1), int(declared.group(2))
                break
        if name is None:
            declared = _QRISP_ARRAY_DECL_RE.search(line)
            if not declared:
                continue
            try:
                # Empty parts come from a trailing comma such as shape=(3,).
                shape = [int(part) for part in declared.group(2).split(',') if part.strip()]
            except ValueError:
                continue  # symbolic dimensions cannot be sized statically
            if not shape:
                continue
            name, size = declared.group(1), 1
            for extent in shape:
                size *= extent
        if shape is None:
            array_shapes.pop(name, None)
        else:
            array_shapes[name] = shape
        quantum_vars[name] = (size, total_qubits)
        total_qubits += size
    return quantum_vars, array_shapes, total_qubits


def _qrisp_resolve(expr, quantum_vars, array_shapes):
    """Resolve a Qrisp operand to a global qubit index, a list of indices, or None.

    - ``qv[0]``         -> global index
    - ``q_array[0,1]``  -> global index (row-major over the declared shape)
    - ``qv``            -> list of all indices of the variable
    Unknown names, non-integer indices and index tuples whose rank does not
    match the declared shape resolve to None.
    """
    indexed = _QRISP_INDEXED_RE.match(expr)
    if indexed:
        name, raw_indices = indexed.groups()
        if name not in quantum_vars:
            return None
        _, offset = quantum_vars[name]
        try:
            indices = [int(part.strip()) for part in raw_indices.split(',')]
        except ValueError:
            return None
        if len(indices) == 1:
            return offset + indices[0]
        shape = array_shapes.get(name)
        if shape is None or len(shape) != len(indices):
            return None
        flat = 0
        for extent, index in zip(shape, indices):
            flat = flat * extent + index
        return offset + flat
    if expr in quantum_vars:
        size, offset = quantum_vars[expr]
        return list(range(offset, offset + size))
    return None


def _qrisp_as_list(resolved):
    """Normalise a resolved operand to a list of qubit indices."""
    if resolved is None:
        return []
    return resolved if isinstance(resolved, list) else [resolved]


def _qrisp_first(resolved):
    """Collapse a whole-variable operand to its first qubit (None if empty)."""
    if isinstance(resolved, list):
        return resolved[0] if resolved else None
    return resolved


def qrisp_to_qasm(qrisp_code: str) -> str:
    """
    Transpile Qrisp code to OpenQASM 2.0 without simulation.

    Handles QuantumVariable, QuantumFloat, QuantumArray declarations and the
    gate functions h, x, y, z, s, t, rx, ry, rz, cx, cz, ccx and measure.
    Every variable is mapped onto one global register ``q``; a ``creg c`` is
    declared only when a ``measure(...)`` call is present. Rotation gates are
    emitted capitalised (Rx, Ry, Rz). Each QuantumArray element is counted as
    a single qubit.

    Operands that cannot be resolved (unknown variable, non-integer index)
    cause that gate to be skipped instead of raising.

    Returns:
        The QASM text, or the string
        "Error: No QuantumVariable, QuantumFloat, or QuantumArray found".
    """
    lines = qrisp_code.strip().split('\n')
    quantum_vars, array_shapes, total_qubits = _qrisp_collect_variables(lines)
    if total_qubits == 0:
        return "Error: No QuantumVariable, QuantumFloat, or QuantumArray found"

    def resolve(expr):
        return _qrisp_resolve(expr.strip(), quantum_vars, array_shapes)

    gate_operations = []
    has_measurement = False
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith(_QRISP_IGNORED_PREFIXES):
            continue
        # Skip variable declarations
        if re.search(r'=\s*Quantum', line):
            continue

        if line.startswith('measure('):
            has_measurement = True
            measured = _QRISP_MEASURE_RE.match(line)
            if measured:
                for q in _qrisp_as_list(resolve(measured.group(1))):
                    gate_operations.append(f"measure q[{q}] -> c[{q}];")
            continue

        single = _QRISP_SINGLE_RE.match(line)
        if single:
            gate, operand = single.groups()
            for q in _qrisp_as_list(resolve(operand)):
                gate_operations.append(f"{gate} q[{q}];")
            continue

        rotation = _QRISP_ROTATION_RE.match(line)
        if rotation:
            gate, angle, operand = rotation.groups()
            gate_cap = gate.capitalize()  # rx -> Rx
            for q in _qrisp_as_list(resolve(operand)):
                gate_operations.append(f"{gate_cap}({angle}) q[{q}];")
            continue

        for gate, pattern in _QRISP_MULTI_RES:
            multi = pattern.match(line)
            if multi:
                qubits = [_qrisp_first(resolve(expr)) for expr in multi.groups()]
                if all(q is not None for q in qubits):
                    operands = ",".join(f"q[{q}]" for q in qubits)
                    gate_operations.append(f"{gate} {operands};")
                break

    qasm_lines = [
        "OPENQASM 2.0;",
        'include "qelib1.inc";',
        f"qreg q[{total_qubits}];",
    ]
    if has_measurement:
        qasm_lines.append(f"creg c[{total_qubits}];")
    qasm_lines.extend(gate_operations)
    return "\n".join(qasm_lines)
def execute_quantum_code(code: str, request: gr.Request = None) -> str:
    """Execute a Qreg script, or OpenQASM source, on the QPU-1 backend.

    Input is treated as OpenQASM only when a statement actually begins with an
    ``OPENQASM <version>`` header or a ``qreg name[...]`` declaration; it is then
    converted with ``qasm_to_qreg``. A Python script that merely contains the
    text "qreg" or "OPENQASM" (an identifier, comment or string) is sent as is.

    Args:
        code: Qreg (Python) script or OpenQASM 2.0 source.
        request: Gradio request used to derive the session id.

    Returns:
        The backend's ``output`` field on success, otherwise ``"Error: ..."``.
    """
    session_id = get_user_session_id(request)
    try:
        looks_like_qasm = re.search(
            r'(?:^|;)\s*(?:OPENQASM\s+\d|qreg\s+\w+\s*\[)', code, re.MULTILINE
        )
        if looks_like_qasm:
            code = qasm_to_qreg(code)
        result = make_concurrent_request(
            f"{BASE_URL}/script",
            method="POST",
            data={"code": code, "session_id": session_id},
            timeout=300,
            use_cache=False  # Don't cache quantum executions (non-deterministic)
        )
        if result.get("success"):
            return result.get("output", "No output")
        return f"Error: {result.get('error', 'Unknown error')}"
    except Exception as e:
        return f"Error: {str(e)}"
# NEW: Execute QASM directly on QPU-1
def execute_qasm_code(qasm_code: str, request: gr.Request = None) -> str:
    """Run QASM text on QPU-1 by delegating to ``execute_quantum_code``.

    Empty input and text beginning with "Error:" are rejected with
    "Error: Invalid QASM code" instead of being executed.
    """
    is_usable = bool(qasm_code) and not qasm_code.startswith("Error:")
    if is_usable:
        return execute_quantum_code(qasm_code, request)
    return "Error: Invalid QASM code"
def execute_qasm_file(file, request: gr.Request = None) -> str:
    """Transpile an uploaded QASM file to Qreg and execute it on QPU-1.

    Args:
        file: The upload. Accepted forms are an object exposing ``.name`` (a
            path), a plain string that is the path of an existing file, or a
            string holding the QASM source itself.
        request: Gradio request used to derive the session id.

    Returns:
        The backend's ``output`` field on success, otherwise ``"Error: ..."``.
    """
    session_id = get_user_session_id(request)
    try:
        if file is None:
            return "Error: No file uploaded"
        upload_path = getattr(file, 'name', None)
        # A plain path string used to be treated as QASM source and transpiled
        # to an empty program; read the file it points to instead.
        # NOTE(review): confirm which form the installed Gradio File component passes.
        if upload_path is None and isinstance(file, str) and os.path.isfile(file):
            upload_path = file
        if upload_path is not None:
            with open(upload_path, 'r', encoding='utf-8', errors='replace') as f:
                qasm_code = f.read()
        else:
            qasm_code = file
        qreg_code = qasm_to_qreg(qasm_code)
        result = make_concurrent_request(
            f"{BASE_URL}/script",
            method="POST",
            data={"code": qreg_code, "session_id": session_id},
            timeout=300,
            use_cache=False
        )
        if result.get("success"):
            return result.get("output", "No output")
        return f"Error: {result.get('error', 'Unknown error')}"
    except Exception as e:
        return f"Error: {str(e)}"
def create_circuit(num_qubits: str = "2", seed: str = "", request: gr.Request = None) -> str:
    """Create a circuit on the backend and record it in the caller's session.

    Args:
        num_qubits: Qubit count as text; must parse as an integer.
        seed: Optional integer seed as text; omitted from the payload when empty.
        request: Gradio request used to derive the session id.

    Returns:
        A confirmation containing the circuit id and qubit count, or ``"Error: ..."``.
    """
    session_id = get_user_session_id(request)
    try:
        qubit_count = int(num_qubits)
        body = {"num_qubits": qubit_count}
        if seed:
            body["seed"] = int(seed)
        response = make_concurrent_request(
            f"{BASE_URL}/circuit",
            method="POST",
            data=body,
            timeout=30,
            use_cache=False,
        )
        if "id" not in response:
            return f"Error: {response.get('error', 'Unknown error')}"
        new_id = response["id"]
        with session_lock:
            sessions[session_id]['circuits'][new_id] = {
                'num_qubits': qubit_count,
                'created': datetime.now(),
            }
        return f"Circuit created: {new_id}\nQubits: {response['num_qubits']}"
    except ValueError:
        return "Error: num_qubits and seed must be valid integers"
    except Exception as e:
        return f"Error: {str(e)}"
def apply_gate(circuit_id: str, gate: str, params: str, request: gr.Request = None) -> str:
    """Apply *gate* to the circuit *circuit_id*.

    Args:
        circuit_id: Identifier of the target circuit.
        gate: Gate name, added to the payload under the "gate" key.
        params: JSON object text with the gate's parameters; may be empty.
        request: Unused here; part of the Gradio handler signature.

    Returns:
        "Gate applied: ..." on success, otherwise ``"Error: ..."``.
    """
    try:
        payload = json.loads(params) if params else {}
        payload["gate"] = gate
        response = make_concurrent_request(
            f"{BASE_URL}/circuit/{circuit_id}/gate",
            method="POST",
            data=payload,
            timeout=30,
            use_cache=False,
        )
        if not response.get("success"):
            return f"Error: {response.get('error', 'Unknown error')}"
        return f"Gate applied: {response.get('gate', gate)}"
    except json.JSONDecodeError:
        return "Error: Invalid JSON in parameters"
    except Exception as e:
        return f"Error: {str(e)}"
def measure_circuit(circuit_id: str, qubit: str = "", request: gr.Request = None) -> str:
    """Measure one qubit, or the whole circuit when *qubit* is empty.

    Args:
        circuit_id: Identifier of the circuit to measure.
        qubit: Qubit index as text; empty means measure every qubit.
        request: Unused here; part of the Gradio handler signature.

    Returns:
        "Qubit <q>: <result>" when the reply names a qubit,
        "Measurement: <result>" otherwise, or ``"Error: ..."``.
    """
    try:
        if qubit:
            endpoint = f"{BASE_URL}/circuit/{circuit_id}/measure/{qubit}"
        else:
            endpoint = f"{BASE_URL}/circuit/{circuit_id}/measure"
        result = make_concurrent_request(
            endpoint,
            method="POST",
            timeout=30,
            use_cache=False
        )
        if "result" in result:
            # The per-qubit case must be tested first: it also carries "result",
            # so testing for "result" alone made this branch unreachable.
            if "qubit" in result:
                return f"Qubit {result['qubit']}: {result['result']}"
            return f"Measurement: {result['result']}"
        return f"Error: {result.get('error', 'Unknown error')}"
    except Exception as e:
        return f"Error: {str(e)}"
def get_circuit_state(circuit_id: str, request: gr.Request = None) -> str:
    """Fetch the current state of circuit *circuit_id* as pretty-printed JSON.

    The response cache is bypassed: state changes with every applied gate or
    measurement, so a reply cached for ``CACHE_TTL`` seconds would be stale.

    Returns:
        The backend reply serialised with ``indent=2``, or ``"Error: ..."``.
    """
    try:
        result = make_concurrent_request(
            f"{BASE_URL}/circuit/{circuit_id}/state",
            method="GET",
            timeout=30,
            use_cache=False
        )
        return json.dumps(result, indent=2)
    except Exception as e:
        return f"Error: {str(e)}"
def delete_circuit(circuit_id: str, request: gr.Request = None) -> str:
    """Delete circuit *circuit_id* on the backend and forget it in the session.

    Returns:
        A confirmation message when the backend reports status 204 or success,
        otherwise ``"Error: ..."``.
    """
    session_id = get_user_session_id(request)
    try:
        response = make_concurrent_request(
            f"{BASE_URL}/circuit/{circuit_id}",
            method="DELETE",
            timeout=30,
            use_cache=False,
        )
        deleted = response.get("status_code") == 204 or response.get("success")
        if not deleted:
            return f"Error: {response.get('error', 'Unknown error')}"
        with session_lock:
            # Drop the local bookkeeping entry if this session created the circuit.
            sessions[session_id]['circuits'].pop(circuit_id, None)
        return f"Circuit {circuit_id} deleted successfully"
    except Exception as e:
        return f"Error: {str(e)}"
def list_circuits(request: gr.Request = None) -> str:
    """List all circuits known to the backend as pretty-printed JSON.

    The response cache is bypassed so that a circuit created or deleted a moment
    ago is reflected immediately, not up to ``CACHE_TTL`` seconds later.

    Returns:
        The backend reply serialised with ``indent=2``, or ``"Error: ..."``.
    """
    try:
        result = make_concurrent_request(
            f"{BASE_URL}/circuits",
            method="GET",
            timeout=30,
            use_cache=False
        )
        return json.dumps(result, indent=2)
    except Exception as e:
        return f"Error: {str(e)}"
def create_bell_state() -> str:
    """Run a two-qubit Bell-state script (H on qubit 0, then CNOT 0->1) and return its output."""
    script_lines = [
        "q = Qreg(2)",
        "q.H(0)",
        "q.CNOT(0, 1)",
        "print(q.measure())",
    ]
    return execute_quantum_code("\n".join(script_lines))
def create_superposition(num_qubits: str = "10") -> str:
    """Put *num_qubits* qubits into superposition with ``H_all`` and measure them.

    Args:
        num_qubits: Qubit count as text; must be an integer in 1..1,000,000.

    Returns:
        The execution output (first 50 result characters and the count of ones),
        or an ``"Error: ..."`` message for invalid input.
    """
    try:
        n = int(num_qubits)
    except ValueError:
        return "Error: num_qubits must be a valid integer"
    if not 1 <= n <= 1000000:
        return "Error: num_qubits must be between 1 and 1,000,000"
    script = "\n".join([
        f"q = Qreg({n})",
        "q.H_all()",
        "result = q.measure()",
        "print(f'Result: {result[:50]}')",
        f"print(f'Ones: {{result.count(\"1\")}}/{n}')",
    ])
    return execute_quantum_code(script)
def test_bell_correlation(trials: str = "100") -> str:
    """Prepare and measure a Bell pair *trials* times and report the correlation.

    The generated script counts how often the two qubits agree ('00' or '11')
    and prints the percentage.

    Args:
        trials: Number of repetitions as text; must be an integer >= 1
            (zero would make the generated script divide by zero).

    Returns:
        The execution output, or an ``"Error: ..."`` message for invalid input.
    """
    try:
        n = int(trials)
    except ValueError:
        return "Error: trials must be a valid integer"
    if n < 1:
        return "Error: trials must be at least 1"
    # Built line by line so the loop and if bodies carry explicit indentation.
    code = "\n".join([
        f"n={n}",
        "corr=0",
        "for _ in range(n):",
        "    q=Qreg(2)",
        "    q.H(0)",
        "    q.CNOT(0,1)",
        "    r=q.measure()",
        "    if r in['00','11']:",
        "        corr+=1",
        "print(f'Correlation: {corr/n*100:.1f}%')",
    ])
    return execute_quantum_code(code)
def test_ghz_state(num_qubits: str = "3") -> str:
    """Create and measure an N-qubit GHZ state (H on qubit 0, CNOT 0->i for every other qubit).

    Args:
        num_qubits: Qubit count as text; must be an integer >= 2.

    Returns:
        The execution output, or an ``"Error: ..."`` message for invalid input.
    """
    try:
        n = int(num_qubits)
        if n < 2:
            return "Error: need at least 2 qubits for GHZ"
        # No leading whitespace: these lines sit at the top level of the generated
        # script, where an indented statement is an IndentationError.
        cnot_chain = "\n".join(f"q.CNOT(0, {i})" for i in range(1, n))
        code = "\n".join([
            f"q = Qreg({n})",
            "q.H(0)",
            cnot_chain,
            "result = q.measure()",
            f"print(f'GHZ-{n}: {{result}}')",
        ])
        return execute_quantum_code(code)
    except ValueError:
        return "Error: num_qubits must be a valid integer"
def benchmark_gate_speed() -> str:
    """Time 1000 Hadamard gates on a 100000-qubit register and report the gate rate.

    Returns:
        The execution output ("Gate speed: <N>M gates/s") or an ``"Error: ..."`` message.
    """
    # Built line by line so the loop body carries explicit indentation.
    code = "\n".join([
        "import time",
        "q = Qreg(100000)",
        "start = time.time()",
        "for _ in range(1000):",
        "    q.H(0)",
        "elapsed = time.time() - start",
        "print(f'Gate speed: {1000/elapsed/1e6:.0f}M gates/s')",
    ])
    return execute_quantum_code(code)
def test_long_range_entanglement(num_qubits: str = "1000000") -> str:
    """Entangle the first and last qubit of an N-qubit register and compare their outcomes.

    Args:
        num_qubits: Register size as text; must be an integer >= 2 so that the
            first and last qubit are distinct.

    Returns:
        The execution output (measurement time, both qubit values and whether
        they match), or an ``"Error: ..."`` message for invalid input.
    """
    try:
        n = int(num_qubits)
    except ValueError:
        return "Error: num_qubits must be a valid integer"
    if n < 2:
        # n == 1 would request CNOT(0, 0); n <= 0 would index qubit -1.
        return "Error: need at least 2 qubits for long-range entanglement"
    last = n - 1
    code = "\n".join([
        "import time",
        f"q = Qreg({n})",
        "q.H(0)",
        f"q.CNOT(0, {last})",
        "start = time.time()",
        "result = q.measure()",
        "elapsed = time.time() - start",
        "print(f'Measurement: {elapsed*1000:.1f}ms')",
        "print(f'Qubit 0: {result[0]}')",
        f"print(f'Qubit {last}: {{result[{last}]}}')",
        f"print(f'Correlated: {{result[0] == result[{last}]}}')",
    ])
    return execute_quantum_code(code)
def get_qpu_health() -> str:
    """Return backend health merged with local runtime statistics as JSON.

    The backend's ``/health`` reply (cacheable) is extended with
    ``active_users``, ``total_requests``, ``cache_size`` and
    ``thread_pool_workers``.

    Returns:
        Pretty-printed JSON; ``{"error": "..."}`` if anything fails.
    """
    try:
        response = make_concurrent_request(
            f"{BASE_URL}/health",
            method="GET",
            timeout=5,
            use_cache=True
        )
        # Work on a copy: the reply may be the very object held in the response
        # cache, and the local statistics must not be written into it.
        health = dict(response)
        cleanup_expired_sessions()
        with session_lock:
            health['active_users'] = len(sessions)
            health['total_requests'] = sum(s.get('request_count', 0) for s in sessions.values())
        health['cache_size'] = len(request_cache)
        health['thread_pool_workers'] = executor._max_workers
        return json.dumps(health, indent=2)
    except Exception as e:
        return json.dumps({"error": str(e)}, indent=2)
def quantum_circuit_prompt(operation: str, num_qubits: str = "2") -> str:
    """Return a natural-language prompt describing the requested circuit.

    Args:
        operation: One of "bell", "ghz", "superposition", "teleportation";
            any other value yields a generic prompt.
        num_qubits: Qubit count inserted into the prompt text.
    """
    templates = {
        "bell": "Create a Bell state with {n} qubits showing maximum entanglement",
        "ghz": "Create a GHZ state with {n} qubits where all qubits are entangled",
        "superposition": "Put {n} qubits into equal superposition using Hadamard gates",
        "teleportation": "Implement quantum teleportation protocol using {n} qubits",
    }
    template = templates.get(operation, "Create a quantum circuit with {n} qubits")
    return template.format(n=num_qubits)
# Example snippets keyed by button label. The Execute tab renders one button per
# entry, and "Bell State" is also the editor's initial content.
EXAMPLES = {
"Bell State": "q = Qreg(2)\nq.H(0)\nq.CNOT(0, 1)\nprint(q.measure())",
"Superposition": "q = Qreg(10)\nq.H_all()\nprint(q.measure())",
"GHZ-3": "q = Qreg(3)\nq.H(0)\nq.CNOT(0, 1)\nq.CNOT(0, 2)\nprint(q.measure())",
"Rotation Gates": "q = Qreg(1)\nq.Rx(0, 1.5708)\nq.Ry(0, 3.1416)\nq.Rz(0, 0.7854)\nprint(q.measure())",
"1M Qubits": "import time\nq = Qreg(1000000)\nstart = time.time()\nq.H_all()\nresult = q.measure()\nprint(f'{time.time()-start:.3f}s')",
"Correlation Test": "n=100\nc=0\nfor _ in range(n):\n q=Qreg(2);q.H(0);q.CNOT(0,1)\n if q.measure() in['00','11']:c+=1\nprint(f'{c/n*100:.0f}%')",
# QASM input: execute_quantum_code converts it with qasm_to_qreg before sending.
"QASM Bell": "OPENQASM 2.0;\ninclude \"qelib1.inc\";\nqreg q[2];\nh q[0];\ncx q[0],q[1];",
}
# Top-level Gradio UI. The Blocks instance is bound to `app`, which the
# __main__ guard at the end of the file launches.
with gr.Blocks(title="QPU-1 by Lap Quantum", css=CUSTOM_CSS, theme=gr.themes.Base(
primary_hue="purple",
font=gr.themes.GoogleFont("Inter")
)) as app:
gr.Markdown("""
# 🔮 QPU-1 Quantum Processing Unit
## Lap Quantum™ | Part of Lap Technologies
**1,000,000+ qubits** • **MCP-enabled** • **QASM compatible** • **REST API** • **⚡ Concurrent Processing**
""")
# Execute tab: a code editor whose content is run by execute_quantum_code.
with gr.Tab("⚡ Execute"):
code_input = gr.Code(
label="Quantum Script (Qreg or QASM)",
language="python",
lines=14,
value=EXAMPLES["Bell State"]
)
with gr.Row():
execute_btn = gr.Button("▶️ Execute", variant="primary", size="lg", elem_classes=["primary-btn"])
clear_btn = gr.Button("🗑️ Clear", size="lg")
result_output = gr.Textbox(label="Output", lines=8)
gr.Markdown("### 📚 Examples")
with gr.Row():
# One button per EXAMPLES entry. The default argument n=name binds the current
# key at definition time, so each button loads its own snippet.
for name in EXAMPLES.keys():
gr.Button(name, size="sm").click(
fn=lambda n=name: EXAMPLES[n],
outputs=code_input
)
# Exposed under the API name "execute".
execute_btn.click(
fn=execute_quantum_code,
inputs=code_input,
outputs=result_output,
api_name="execute"
)
# Clear resets both the editor and the output box to empty strings.
clear_btn.click(fn=lambda: ("", ""), outputs=[code_input, result_output])
# NEW: Transpiler Tab (Qrisp to QASM)
# Left pane: Qrisp source. Right pane: generated QASM and the execution result.
with gr.Tab("🔄 Transpiler"):
gr.Markdown("""
### Qrisp ↔ QASM Transpiler
Convert Qrisp high-level Python code to OpenQASM 2.0 without simulation, then execute on QPU-1.
Supports QuantumVariable, QuantumFloat, QuantumArray, and standard Qrisp gates.
""")
with gr.Row():
with gr.Column():
qrisp_input = gr.Code(
label="Qrisp Input (Python)",
language="python",
lines=12,
value="""from qrisp import QuantumVariable, h, cx, measure
qv = QuantumVariable(2)
h(qv[0])
cx(qv[0], qv[1])
measure(qv)"""
)
with gr.Row():
transpile_btn = gr.Button("🔄 Transpile to QASM", variant="primary", elem_classes=["primary-btn"])
execute_qasm_btn = gr.Button("▶️ Execute QASM on QPU-1", variant="secondary")
with gr.Column():
# NOTE(review): language="javascript" is presumably a stand-in for QASM syntax
# highlighting; confirm.
qasm_output = gr.Code(
label="Generated OpenQASM 2.0",
language="javascript",
lines=12,
value="// QASM will appear here..."
)
transpiler_result = gr.Textbox(label="Execution Result", lines=4)
# Wire up the transpile button
transpile_btn.click(
fn=qrisp_to_qasm,
inputs=qrisp_input,
outputs=qasm_output,
api_name="transpile_qrisp_to_qasm"
)
# Wire up execute button (takes QASM output and runs it)
# execute_qasm_code rejects empty text and text starting with "Error:"; any other
# content of the QASM pane is forwarded for execution.
execute_qasm_btn.click(
fn=execute_qasm_code,
inputs=qasm_output,
outputs=transpiler_result,
api_name="execute_transpiled_qasm"
)
gr.Markdown("""
**Supported Qrisp Syntax:**
- **Variables**: `QuantumVariable(n)`, `QuantumFloat(n)`, `QuantumArray(qtype, shape=(...))`
- **Single-qubit**: `h(qv)`, `x(qv[0])`, `y(qv)`, `z(qv[2])`, `s(qv)`, `t(qv)`
- **Rotation gates**: `rx(angle, qv)`, `ry(angle, qv[0])`, `rz(angle, qv)` → **Capitalized in QASM: Rx, Ry, Rz**
- **Two-qubit**: `cx(qv[0], qv[1])`, `cz(qv[0], qv[1])`
- **Three-qubit**: `ccx(qv[0], qv[1], qv[2])` (Toffoli)
- **Measurement**: `measure(qv)` (expands to measure all qubits)
**Note:** This is a pure transpiler - no quantum simulation happens during conversion.
""")
# Circuit Builder tab: four groups (create, apply gate, measure, manage), each
# wired to the matching REST helper defined earlier in this file.
with gr.Tab("🔧 Circuit Builder"):
gr.Markdown("""
### Step-by-step Circuit Construction
Build quantum circuits incrementally using the REST API endpoints.
""")
with gr.Group():
gr.Markdown("#### 1. Create Circuit")
with gr.Row():
circuit_qubits = gr.Textbox(value="2", label="Number of Qubits")
circuit_seed = gr.Textbox(value="", label="Seed (optional)", placeholder="42")
create_circuit_btn = gr.Button("Create Circuit", variant="primary")
circuit_output = gr.Textbox(label="Circuit ID", lines=2)
create_circuit_btn.click(
fn=create_circuit,
inputs=[circuit_qubits, circuit_seed],
outputs=circuit_output,
api_name="create_circuit"
)
with gr.Group():
gr.Markdown("#### 2. Apply Gates")
circuit_id_input = gr.Textbox(label="Circuit ID", placeholder="550e8400-e29b-41d4-a716-446655440000")
with gr.Row():
gate_select = gr.Dropdown(
choices=["H", "X", "Y", "Z", "S", "T", "CNOT", "CZ", "CCNOT", "Rx", "Ry", "Rz"],
value="H",
label="Gate"
)
# Free-form JSON; apply_gate parses it and adds the selected gate under "gate".
gate_params = gr.Textbox(
label="Parameters (JSON)",
placeholder='{"qubit": 0} or {"control": 0, "target": 1}',
value='{"qubit": 0}'
)
apply_gate_btn = gr.Button("Apply Gate", variant="secondary")
gate_output = gr.Textbox(label="Result", lines=2)
apply_gate_btn.click(
fn=apply_gate,
inputs=[circuit_id_input, gate_select, gate_params],
outputs=gate_output,
api_name="apply_gate"
)
gr.Markdown("""
**Parameter formats:**
- Single-qubit: `{"qubit": 0}`
- Rotation: `{"qubit": 0, "theta": 1.5708}`
- CNOT: `{"control": 0, "target": 1}`
- CZ: `{"qubit1": 0, "qubit2": 1}`
- CCNOT: `{"control1": 0, "control2": 1, "target": 2}`
""")
with gr.Group():
gr.Markdown("#### 3. Measure")
with gr.Row():
measure_circuit_id = gr.Textbox(label="Circuit ID")
measure_qubit = gr.Textbox(label="Qubit (empty for all)", placeholder="0")
measure_btn = gr.Button("Measure", variant="secondary")
measure_output = gr.Textbox(label="Result", lines=2)
measure_btn.click(
fn=measure_circuit,
inputs=[measure_circuit_id, measure_qubit],
outputs=measure_output,
api_name="measure_circuit"
)
with gr.Group():
gr.Markdown("#### 4. Circuit Management")
with gr.Row():
state_circuit_id = gr.Textbox(label="Circuit ID (for state)")
delete_circuit_id = gr.Textbox(label="Circuit ID (for delete)")
with gr.Row():
state_btn = gr.Button("Get State", variant="secondary")
delete_btn = gr.Button("Delete Circuit", variant="secondary")
list_btn = gr.Button("List All Circuits", variant="secondary")
# All three management actions write to this one output box.
management_output = gr.Textbox(label="Result", lines=6)
state_btn.click(
fn=get_circuit_state,
inputs=state_circuit_id,
outputs=management_output,
api_name="get_circuit_state"
)
delete_btn.click(
fn=delete_circuit,
inputs=delete_circuit_id,
outputs=management_output,
api_name="delete_circuit"
)
list_btn.click(
fn=list_circuits,
outputs=management_output,
api_name="list_circuits"
)
# QASM Upload tab: the uploaded .qasm/.txt file is handed to execute_qasm_file,
# which converts it with qasm_to_qreg and sends the result to the backend.
with gr.Tab("📄 QASM Upload"):
gr.Markdown("""
### Upload Large QASM Files
For huge QASM files like **Quandoom** (70K qubits, 80M gates), upload the .qasm file directly.
The transpiler will convert it to Qreg and execute on QPU-1.
""")
qasm_file = gr.File(label="Upload QASM File", file_types=[".qasm", ".txt"])
with gr.Row():
qasm_execute_btn = gr.Button("▶️ Execute QASM File", variant="primary", size="lg", elem_classes=["primary-btn"])
qasm_result = gr.Textbox(label="Output", lines=10)
gr.Markdown("""
**Supported Gates:** h, x, y, z, t, s, cx/cnot, cz, ccx/toffoli, rx, ry, rz
**Example:** Upload the Quandoom QASM file to run DOOM on QPU-1!
""")
qasm_execute_btn.click(
fn=execute_qasm_file,
inputs=qasm_file,
outputs=qasm_result,
api_name="execute_qasm_file"
)
# MCP Tools tab: one group per helper function; each button calls the helper
# directly and registers it under an api_name equal to the function name.
with gr.Tab("🔧 MCP Tools"):
gr.Markdown("""
### Quantum Computing Tools
These functions are exposed as MCP tools for AI assistants:
""")
with gr.Group():
gr.Markdown("#### Bell State Generator")
bell_btn = gr.Button("Generate Bell State", variant="secondary")
bell_output = gr.Textbox(label="Result", lines=3)
bell_btn.click(fn=create_bell_state, outputs=bell_output, api_name="create_bell_state")
with gr.Group():
gr.Markdown("#### Superposition Creator")
super_qubits = gr.Textbox(value="10", label="Number of Qubits")
super_btn = gr.Button("Create Superposition", variant="secondary")
super_output = gr.Textbox(label="Result", lines=3)
super_btn.click(fn=create_superposition, inputs=super_qubits, outputs=super_output, api_name="create_superposition")
with gr.Group():
gr.Markdown("#### Bell Correlation Tester")
trials_input = gr.Textbox(value="100", label="Number of Trials")
corr_btn = gr.Button("Test Correlation", variant="secondary")
corr_output = gr.Textbox(label="Result", lines=3)
corr_btn.click(fn=test_bell_correlation, inputs=trials_input, outputs=corr_output, api_name="test_bell_correlation")
with gr.Group():
gr.Markdown("#### GHZ State Creator")
ghz_qubits = gr.Textbox(value="3", label="Number of Qubits")
ghz_btn = gr.Button("Create GHZ State", variant="secondary")
ghz_output = gr.Textbox(label="Result", lines=3)
ghz_btn.click(fn=test_ghz_state, inputs=ghz_qubits, outputs=ghz_output, api_name="test_ghz_state")
with gr.Group():
gr.Markdown("#### Long-Range Entanglement Test")
lre_qubits = gr.Textbox(value="1000000", label="Number of Qubits")
lre_btn = gr.Button("Test Entanglement", variant="secondary")
lre_output = gr.Textbox(label="Result", lines=4)
lre_btn.click(fn=test_long_range_entanglement, inputs=lre_qubits, outputs=lre_output, api_name="test_long_range_entanglement")
with gr.Group():
gr.Markdown("#### Benchmark")
bench_btn = gr.Button("Run Gate Speed Benchmark", variant="secondary")
bench_output = gr.Textbox(label="Result", lines=3)
bench_btn.click(fn=benchmark_gate_speed, outputs=bench_output, api_name="benchmark_gate_speed")
with gr.Group():
gr.Markdown("#### QPU Health")
health_btn = gr.Button("Check QPU Health", variant="secondary")
health_output = gr.Textbox(label="Result", lines=8)
health_btn.click(fn=get_qpu_health, outputs=health_output, api_name="get_qpu_health")
# MCP Configuration tab: static help text. The tool names and the figures under
# "Specifications" mirror values defined elsewhere in this file (api_name
# arguments, the executor's max_workers, the adapter's pool_maxsize, CACHE_TTL)
# and must be updated together with them.
with gr.Tab("📖 MCP Configuration"):
    gr.Markdown(f"""
### Connect to Claude Desktop / Cursor / Cline
Add this to your MCP client configuration:
```json
{{
"mcpServers": {{
"qpu-1": {{
"url": "{BASE_URL}/gradio_api/mcp/"
}}
}}
}}
```
For private spaces, add your HF token:
```json
{{
"mcpServers": {{
"qpu-1": {{
"url": "{BASE_URL}/gradio_api/mcp/",
"headers": {{
"Authorization": "Bearer YOUR_HF_TOKEN"
}}
}}
}}
}}
```
### Available MCP Tools
**Script Execution:**
- `execute` - Execute Qreg or QASM code
- `execute_qasm_file` - Execute QASM file
- `transpile_qrisp_to_qasm` - Transpile Qrisp code to OpenQASM 2.0 (NEW)
- `execute_transpiled_qasm` - Execute generated QASM on QPU-1 (NEW)
**Circuit Building:**
- `create_circuit` - Create new quantum circuit
- `apply_gate` - Apply gate to circuit
- `measure_circuit` - Measure circuit qubits
- `get_circuit_state` - Get circuit state
- `delete_circuit` - Delete circuit
- `list_circuits` - List all circuits
**Quick Operations:**
- `create_bell_state` - Create entangled Bell state
- `create_superposition` - Create N-qubit superposition
- `test_bell_correlation` - Verify quantum entanglement
- `test_ghz_state` - Create GHZ state
- `test_long_range_entanglement` - Test long-range entanglement
- `benchmark_gate_speed` - Measure gate throughput
- `get_qpu_health` - QPU-1 health status
### QASM Support
QPU-1 supports OpenQASM 2.0 input. Paste QASM code directly and it will be transpiled to Qreg automatically.
**Supported gates:** h, x, y, z, t, s, cx/cnot, cz, ccnot/ccx/toffoli, rx, ry, rz
### API Documentation
Full REST API documentation available at: `{BASE_URL}/`
**Available Gates:**
- Single-qubit: H, X, Y, Z, S, T, Rx, Ry, Rz
- Two-qubit: CNOT, CZ
- Three-qubit: CCNOT (Toffoli)
- Batch: H_all (Hadamard on all qubits)
**Specifications:**
- Max Qubits: 1,000,000+
- Gate Throughput: 186M gates/second
- Topology: Full mesh (any-to-any connectivity)
- **Concurrent Processing**: Up to 5000 parallel requests
- **Connection Pooling**: 90000 max connections per pool
- **Response Caching**: 60-second TTL for identical requests
""")
if __name__ == "__main__":
    # Serve on all interfaces at port 7860 with the MCP server enabled and no
    # public share link.
    launch_options = {
        "mcp_server": True,
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
    }
    app.launch(**launch_options)