Spaces:
Running
Running
| import gradio as gr | |
| import json | |
| import requests | |
| import os | |
| from typing import Optional | |
| import time | |
| from PIL import Image | |
| import re | |
| import uuid | |
| import base64 | |
| from io import BytesIO | |
| import threading | |
| from datetime import datetime, timedelta | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from requests.adapters import HTTPAdapter | |
| from urllib3.util.retry import Retry | |
| import asyncio | |
| from functools import lru_cache | |
| import hashlib | |
| BASE_URL = "https://lap-quantum-qpu-1-api.hf.space" | |
| HF_TOKEN = os.getenv("HF_TOKEN", "") | |
| sessions = {} | |
| session_lock = threading.Lock() | |
| SESSION_TIMEOUT = timedelta(hours=1.5) | |
| # ==================== CONCURRENT PROCESSING INFRASTRUCTURE ==================== | |
| # Connection pool with retry strategy | |
| def create_session_pool(): | |
| """Create a requests session with connection pooling and retry logic""" | |
| session = requests.Session() | |
| # Retry strategy: 3 retries with exponential backoff | |
| retry_strategy = Retry( | |
| total=3, | |
| backoff_factor=0.1, | |
| status_forcelist=[429, 500, 502, 503, 504], | |
| allowed_methods=["GET", "POST", "DELETE"] | |
| ) | |
| # Adapter with connection pooling | |
| adapter = HTTPAdapter( | |
| max_retries=retry_strategy, | |
| pool_connections=10000, # Number of connection pools | |
| pool_maxsize=90000, # Max connections per pool | |
| pool_block=False | |
| ) | |
| session.mount("http://", adapter) | |
| session.mount("https://", adapter) | |
| return session | |
| # Global session pool (thread-safe) | |
| session_pool = create_session_pool() | |
| session_pool_lock = threading.Lock() | |
| # Thread pool for concurrent execution | |
| # Scale based on expected workload - can handle thousands of concurrent requests | |
| executor = ThreadPoolExecutor( | |
| max_workers=5000, # Adjust based on your server capacity | |
| thread_name_prefix="qpu_worker" | |
| ) | |
| # Request cache (for identical requests within 60 seconds) | |
| request_cache = {} | |
| cache_lock = threading.Lock() | |
| CACHE_TTL = 60 # seconds | |
| def get_cache_key(endpoint: str, method: str, data: dict = None) -> str: | |
| """Generate cache key from request parameters""" | |
| cache_data = { | |
| 'endpoint': endpoint, | |
| 'method': method, | |
| 'data': data | |
| } | |
| return hashlib.sha256(json.dumps(cache_data, sort_keys=True).encode()).hexdigest() | |
| def get_cached_response(cache_key: str): | |
| """Retrieve cached response if still valid""" | |
| with cache_lock: | |
| if cache_key in request_cache: | |
| cached_data, timestamp = request_cache[cache_key] | |
| if time.time() - timestamp < CACHE_TTL: | |
| return cached_data | |
| else: | |
| del request_cache[cache_key] | |
| return None | |
| def set_cached_response(cache_key: str, response_data): | |
| """Cache response with timestamp""" | |
| with cache_lock: | |
| request_cache[cache_key] = (response_data, time.time()) | |
| # Clean old cache entries (keep cache size manageable) | |
| if len(request_cache) > 10000: | |
| current_time = time.time() | |
| expired_keys = [ | |
| k for k, (_, ts) in request_cache.items() | |
| if current_time - ts > CACHE_TTL | |
| ] | |
| for k in expired_keys: | |
| del request_cache[k] | |
| def make_concurrent_request(endpoint: str, method: str = "POST", | |
| data: dict = None, timeout: int = 300, | |
| use_cache: bool = True) -> dict: | |
| """ | |
| Make HTTP request with connection pooling and optional caching | |
| Args: | |
| endpoint: API endpoint URL | |
| method: HTTP method (GET, POST, DELETE) | |
| data: Request payload | |
| timeout: Request timeout in seconds | |
| use_cache: Whether to use response caching | |
| Returns: | |
| Response data as dictionary | |
| """ | |
| cache_key = None | |
| # Check cache for GET requests and idempotent operations | |
| if use_cache and method in ["GET", "POST"]: | |
| cache_key = get_cache_key(endpoint, method, data) | |
| cached_response = get_cached_response(cache_key) | |
| if cached_response is not None: | |
| return cached_response | |
| # Make request using pooled session | |
| try: | |
| with session_pool_lock: | |
| headers = get_headers() | |
| if method == "GET": | |
| response = session_pool.get(endpoint, headers=headers, timeout=timeout) | |
| elif method == "POST": | |
| response = session_pool.post(endpoint, headers=headers, json=data, timeout=timeout) | |
| elif method == "DELETE": | |
| response = session_pool.delete(endpoint, headers=headers, timeout=timeout) | |
| else: | |
| raise ValueError(f"Unsupported method: {method}") | |
| # Handle different response types | |
| if response.status_code == 204: | |
| result = {"success": True, "status_code": 204} | |
| else: | |
| result = response.json() | |
| # Cache successful responses | |
| if use_cache and cache_key and response.status_code == 200: | |
| set_cached_response(cache_key, result) | |
| return result | |
| except requests.exceptions.Timeout: | |
| return {"error": "Request timed out"} | |
| except requests.exceptions.RequestException as e: | |
| return {"error": f"Request failed: {str(e)}"} | |
| except Exception as e: | |
| return {"error": f"Unexpected error: {str(e)}"} | |
| def batch_execute_codes(codes: list, session_id: str, max_workers: int = 100) -> list: | |
| """ | |
| Execute multiple quantum codes concurrently | |
| Args: | |
| codes: List of quantum code strings | |
| session_id: User session ID | |
| max_workers: Maximum concurrent executions | |
| Returns: | |
| List of results in same order as input codes | |
| """ | |
| def execute_single(code_with_index): | |
| idx, code = code_with_index | |
| result = make_concurrent_request( | |
| f"{BASE_URL}/script", | |
| method="POST", | |
| data={"code": code, "session_id": f"{session_id}_{idx}"}, | |
| timeout=300 | |
| ) | |
| return idx, result | |
| # Submit all tasks | |
| futures = [] | |
| with ThreadPoolExecutor(max_workers=max_workers) as batch_executor: | |
| for idx, code in enumerate(codes): | |
| future = batch_executor.submit(execute_single, (idx, code)) | |
| futures.append(future) | |
| # Collect results in order | |
| results = [None] * len(codes) | |
| for future in as_completed(futures): | |
| idx, result = future.result() | |
| results[idx] = result | |
| return results | |
| # ==================== END CONCURRENT PROCESSING INFRASTRUCTURE ==================== | |
| def cleanup_expired_sessions(): | |
| with session_lock: | |
| now = datetime.now() | |
| expired = [sid for sid, data in sessions.items() | |
| if now - data.get('last_access', datetime.min) > SESSION_TIMEOUT] | |
| for sid in expired: | |
| del sessions[sid] | |
| def get_user_session_id(request: gr.Request) -> str: | |
| if request: | |
| session_id = request.session_hash | |
| else: | |
| session_id = str(uuid.uuid4()) | |
| cleanup_expired_sessions() | |
| with session_lock: | |
| if session_id not in sessions: | |
| sessions[session_id] = { | |
| 'last_access': datetime.now(), | |
| 'request_count': 0, | |
| 'circuits': {} | |
| } | |
| sessions[session_id]['last_access'] = datetime.now() | |
| sessions[session_id]['request_count'] += 1 | |
| return session_id | |
| CUSTOM_CSS = """ | |
| @import url('https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700&display=swap'); | |
| * { | |
| font-family: 'Inter', sans-serif !important; | |
| } | |
| .gradio-container { | |
| max-width: 1400px !important; | |
| } | |
| .primary-btn { | |
| background: linear-gradient(135deg, #8b5cf6, #6366f1) !important; | |
| border: none !important; | |
| font-weight: 600 !important; | |
| transition: all 0.3s ease !important; | |
| } | |
| .primary-btn:hover { | |
| transform: translateY(-2px) !important; | |
| box-shadow: 0 10px 25px rgba(139, 92, 246, 0.4) !important; | |
| } | |
| h1, h2, h3 { | |
| background: linear-gradient(135deg, #8b5cf6, #6366f1); | |
| -webkit-background-clip: text; | |
| -webkit-text-fill-color: transparent; | |
| font-weight: 700; | |
| } | |
| """ | |
| def get_headers(): | |
| return { | |
| "Content-Type": "application/json", | |
| "Authorization": f"Bearer {HF_TOKEN}" if HF_TOKEN else "" | |
| } | |
| def qasm_to_qreg(qasm_code: str) -> str: | |
| # Track register names and their starting offsets | |
| register_offsets = {} | |
| total_qubits = 0 | |
| operations = [] | |
| # Pass 1: Identify all quantum registers and their offsets | |
| for line in qasm_code.split('\n'): | |
| line = line.strip() | |
| if line.startswith('qreg'): | |
| # Regex to find name and size: "qreg name[size]" | |
| match = re.search(r'qreg\s+(\w+)\[(\d+)\]', line) | |
| if match: | |
| name = match.group(1) | |
| size = int(match.group(2)) | |
| register_offsets[name] = total_qubits | |
| total_qubits += size | |
| # Helper to calculate global qubit index | |
| def get_global_index(register_name, local_index): | |
| base_offset = register_offsets.get(register_name, 0) | |
| return base_offset + int(local_index) | |
| # Pass 2: Process gates | |
| for line in qasm_code.split('\n'): | |
| line = line.strip() | |
| if not line or line.startswith('//') or line.startswith('OPENQASM') or line.startswith('include'): | |
| continue | |
| if line.startswith('creg') or line.startswith('measure') or line.startswith('barrier'): | |
| continue | |
| # Match gates: gate_name[(params)] qreg[idx], ...; | |
| gate_match = re.match(r'(\w+)(?:\(([\d.,\s-]+)\))?\s+(.*);', line) | |
| if gate_match: | |
| gate = gate_match.group(1).lower() # Normalize gate name | |
| params = gate_match.group(2) | |
| args = gate_match.group(3) | |
| # Find all arguments: "q[0], c[1]" -> [('q', '0'), ('c', '1')] | |
| qubit_args = re.findall(r'(\w+)\[(\d+)\]', args) | |
| # Convert to global indices | |
| global_indices = [get_global_index(name, idx) for name, idx in qubit_args] | |
| # --- Gate Logic --- | |
| # Single Qubit Gates | |
| if gate in ['h', 'x', 'y', 'z']: | |
| if global_indices: | |
| operations.append(f"q.{gate.upper()}({global_indices[0]})") | |
| elif gate == 's': | |
| if global_indices: | |
| operations.append(f"q.S({global_indices[0]})") | |
| elif gate == 't': | |
| if global_indices: | |
| operations.append(f"q.T({global_indices[0]})") | |
| # DG Gates (Convert to Rotation Gates as requested) | |
| elif gate == 'sdg': | |
| # Sdg is Rz(-pi/2) | |
| if global_indices: | |
| operations.append(f"q.Rz({global_indices[0]}, -1.57079632679)") | |
| elif gate == 'tdg': | |
| # Tdg is Rz(-pi/4) | |
| if global_indices: | |
| operations.append(f"q.Rz({global_indices[0]}, -0.78539816339)") | |
| # Rotation Gates (rx, ry, rz) | |
| elif gate in ['rx', 'ry', 'rz']: | |
| if params and global_indices: | |
| angle = params.strip() | |
| # Capitalize R: rx -> Rx | |
| operations.append(f"q.{gate.capitalize()}({global_indices[0]}, {angle})") | |
| # Two Qubit Gates | |
| elif gate in ['cx', 'cnot']: | |
| if len(global_indices) >= 2: | |
| # Check if control and target differ | |
| if global_indices[0] != global_indices[1]: | |
| operations.append(f"q.CNOT({global_indices[0]}, {global_indices[1]})") | |
| else: | |
| operations.append(f"# Skipped CNOT on same qubit") | |
| elif gate == 'cz': | |
| if len(global_indices) >= 2: | |
| if global_indices[0] != global_indices[1]: | |
| operations.append(f"q.CZ({global_indices[0]}, {global_indices[1]})") | |
| else: | |
| operations.append(f"# Skipped CZ on same qubit") | |
| # Three Qubit Gates | |
| elif gate in ['ccx', 'ccnot', 'toffoli']: | |
| if len(global_indices) >= 3: | |
| operations.append(f"q.CCNOT({global_indices[0]}, {global_indices[1]}, {global_indices[2]})") | |
| if len(operations) > 1000000: | |
| batches = [] | |
| for i in range(0, len(operations), 50000): | |
| batch = operations[i:i+50000] | |
| batches.append("\n".join(batch)) | |
| ops_code = "\n".join(batches) | |
| else: | |
| ops_code = "\n".join(operations) | |
| return f"q = Qreg({total_qubits})\n{ops_code}\nprint(q.measure())" | |
| # NEW: Qrisp to QASM transpiler (no simulation) | |
| def qrisp_to_qasm(qrisp_code: str) -> str: | |
| """ | |
| Transpile Qrisp code to OpenQASM 2.0 without simulation. | |
| Handles QuantumVariable, QuantumFloat, QuantumArray, and gate functions. | |
| """ | |
| lines = qrisp_code.strip().split('\n') | |
| # Track quantum variables and their sizes | |
| quantum_vars = {} # name -> (size, offset) | |
| total_qubits = 0 | |
| gate_operations = [] | |
| has_measurement = False | |
| # Parse imports and variable declarations | |
| for line in lines: | |
| line = line.strip() | |
| # Skip imports and empty lines | |
| if not line or line.startswith(('import ', 'from ', '#')): | |
| continue | |
| # QuantumVariable(size) | |
| qv_match = re.search(r'(\w+)\s*=\s*QuantumVariable\((\d+)\)', line) | |
| if qv_match: | |
| name, size = qv_match.groups() | |
| size = int(size) | |
| quantum_vars[name] = (size, total_qubits) | |
| total_qubits += size | |
| continue | |
| # QuantumFloat(size, exponent) or QuantumFloat(size) | |
| qf_match = re.search(r'(\w+)\s*=\s*QuantumFloat\((\d+)(?:\s*,\s*[^)]+)?\)', line) | |
| if qf_match: | |
| name, size = qf_match.groups() | |
| size = int(size) | |
| quantum_vars[name] = (size, total_qubits) | |
| total_qubits += size | |
| continue | |
| # QuantumArray(qtype, shape) | |
| qa_match = re.search(r'(\w+)\s*=\s*QuantumArray\([^,]+,\s*shape\s*=\s*\(([^)]+)\)\)', line) | |
| if qa_match: | |
| name, shape_str = qa_match.groups() | |
| # Calculate total size from shape | |
| dims = [int(x.strip()) for x in shape_str.split(',')] | |
| size = 1 | |
| for d in dims: | |
| size *= d | |
| quantum_vars[name] = (size, total_qubits) | |
| total_qubits += size | |
| continue | |
| if total_qubits == 0: | |
| return "Error: No QuantumVariable, QuantumFloat, or QuantumArray found" | |
| # Helper to resolve qubit index | |
| def resolve_qubit(var_expr): | |
| """ | |
| Resolve expressions like: | |
| - qv[0] -> global index | |
| - qv (whole variable) -> list of indices | |
| - q_array[0,0,1] -> global index | |
| """ | |
| # Array indexing: q_array[0,0,1] | |
| array_match = re.match(r'(\w+)\[([^\]]+)\]', var_expr) | |
| if array_match: | |
| var_name, indices_str = array_match.groups() | |
| if var_name not in quantum_vars: | |
| return None | |
| size, offset = quantum_vars[var_name] | |
| indices = [int(x.strip()) for x in indices_str.split(',')] | |
| # Calculate flat index (simplified) | |
| flat_idx = 0 | |
| for i, idx in enumerate(indices): | |
| flat_idx += idx * (size // (i+1)) # Simplified | |
| return offset + (indices[0] if len(indices) == 1 else flat_idx) | |
| # Single qubit: qv[0] | |
| single_match = re.match(r'(\w+)\[(\d+)\]', var_expr) | |
| if single_match: | |
| var_name, idx = single_match.groups() | |
| if var_name not in quantum_vars: | |
| return None | |
| size, offset = quantum_vars[var_name] | |
| return offset + int(idx) | |
| # Whole variable: qv (expand to all qubits) | |
| if var_expr in quantum_vars: | |
| size, offset = quantum_vars[var_expr] | |
| return list(range(offset, offset + size)) | |
| return None | |
| # Parse gate operations | |
| for line in lines: | |
| line = line.strip() | |
| if not line or line.startswith(('import ', 'from ', '#', 'print', 'return', 'def ', 'class ', 'try:', 'except', 'with ')): | |
| continue | |
| # Skip variable declarations | |
| if re.search(r'=\s*Quantum', line): | |
| continue | |
| # Detect measurement | |
| if re.match(r'measure\(', line): | |
| has_measurement = True | |
| # Parse what is being measured | |
| match = re.match(r'measure\(([^)]+)\)', line) | |
| if match: | |
| target = match.group(1).strip() | |
| qubits = resolve_qubit(target) | |
| if isinstance(qubits, list): | |
| for q in qubits: | |
| gate_operations.append(f"measure q[{q}] -> c[{q}];") | |
| elif qubits is not None: | |
| gate_operations.append(f"measure q[{qubits}] -> c[{qubits}];") | |
| continue | |
| # Single qubit gates: h(qv[0]), x(qv), etc. | |
| single_gate = re.match(r'(h|x|y|z|s|t)\(([^)]+)\)', line) | |
| if single_gate: | |
| gate, var_expr = single_gate.groups() | |
| var_expr = var_expr.strip() | |
| qubits = resolve_qubit(var_expr) | |
| if isinstance(qubits, list): | |
| for q in qubits: | |
| gate_operations.append(f"{gate} q[{q}];") | |
| elif qubits is not None: | |
| gate_operations.append(f"{gate} q[{qubits}];") | |
| continue | |
| # Rotation gates: rx(angle, qv[0]) - Qrisp uses rx(angle, qubit) | |
| rot_gate = re.match(r'(rx|ry|rz)\(([^,]+),\s*([^)]+)\)', line) | |
| if rot_gate: | |
| gate, angle, var_expr = rot_gate.groups() | |
| var_expr = var_expr.strip() | |
| qubits = resolve_qubit(var_expr) | |
| # Capitalize the R as requested | |
| gate_cap = gate.capitalize() # rx -> Rx | |
| if isinstance(qubits, list): | |
| for q in qubits: | |
| gate_operations.append(f"{gate_cap}({angle}) q[{q}];") | |
| elif qubits is not None: | |
| gate_operations.append(f"{gate_cap}({angle}) q[{qubits}];") | |
| continue | |
| # CNOT/CX: cx(qv[0], qv[1]) | |
| cnot_gate = re.match(r'cx\(([^,]+),\s*([^)]+)\)', line) | |
| if cnot_gate: | |
| ctrl_expr, target_expr = cnot_gate.groups() | |
| ctrl = resolve_qubit(ctrl_expr.strip()) | |
| target = resolve_qubit(target_expr.strip()) | |
| if isinstance(ctrl, list): | |
| ctrl = ctrl[0] if ctrl else None | |
| if isinstance(target, list): | |
| target = target[0] if target else None | |
| if ctrl is not None and target is not None: | |
| gate_operations.append(f"cx q[{ctrl}],q[{target}];") | |
| continue | |
| # CZ: cz(qv[0], qv[1]) | |
| cz_gate = re.match(r'cz\(([^,]+),\s*([^)]+)\)', line) | |
| if cz_gate: | |
| q1_expr, q2_expr = cz_gate.groups() | |
| q1 = resolve_qubit(q1_expr.strip()) | |
| q2 = resolve_qubit(q2_expr.strip()) | |
| if isinstance(q1, list): | |
| q1 = q1[0] if q1 else None | |
| if isinstance(q2, list): | |
| q2 = q2[0] if q2 else None | |
| if q1 is not None and q2 is not None: | |
| gate_operations.append(f"cz q[{q1}],q[{q2}];") | |
| continue | |
| # CCNOT/Toffoli: ccx(qv[0], qv[1], qv[2]) | |
| ccnot_gate = re.match(r'ccx\(([^,]+),\s*([^,]+),\s*([^)]+)\)', line) | |
| if ccnot_gate: | |
| c1_expr, c2_expr, target_expr = ccnot_gate.groups() | |
| c1 = resolve_qubit(c1_expr.strip()) | |
| c2 = resolve_qubit(c2_expr.strip()) | |
| target = resolve_qubit(target_expr.strip()) | |
| if isinstance(c1, list): | |
| c1 = c1[0] if c1 else None | |
| if isinstance(c2, list): | |
| c2 = c2[0] if c2 else None | |
| if isinstance(target, list): | |
| target = target[0] if target else None | |
| if all(x is not None for x in [c1, c2, target]): | |
| gate_operations.append(f"ccx q[{c1}],q[{c2}],q[{target}];") | |
| continue | |
| # Build QASM output | |
| qasm_lines = [ | |
| "OPENQASM 2.0;", | |
| 'include "qelib1.inc";', | |
| f"qreg q[{total_qubits}];" | |
| ] | |
| if has_measurement: | |
| qasm_lines.append(f"creg c[{total_qubits}];") | |
| qasm_lines.extend(gate_operations) | |
| return "\n".join(qasm_lines) | |
| def execute_quantum_code(code: str, request: gr.Request = None) -> str: | |
| """Execute quantum code using concurrent request infrastructure""" | |
| session_id = get_user_session_id(request) | |
| try: | |
| if 'OPENQASM' in code or 'qreg' in code: | |
| code = qasm_to_qreg(code) | |
| result = make_concurrent_request( | |
| f"{BASE_URL}/script", | |
| method="POST", | |
| data={"code": code, "session_id": session_id}, | |
| timeout=300, | |
| use_cache=False # Don't cache quantum executions (non-deterministic) | |
| ) | |
| if result.get("success"): | |
| return result.get("output", "No output") | |
| return f"Error: {result.get('error', 'Unknown error')}" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| # NEW: Execute QASM directly on QPU-1 | |
| def execute_qasm_code(qasm_code: str, request: gr.Request = None) -> str: | |
| """Execute QASM code on QPU-1 (converts to Qreg internally then sends)""" | |
| if not qasm_code or qasm_code.startswith("Error:"): | |
| return "Error: Invalid QASM code" | |
| return execute_quantum_code(qasm_code, request) | |
| def execute_qasm_file(file, request: gr.Request = None) -> str: | |
| """Execute QASM file using concurrent infrastructure""" | |
| session_id = get_user_session_id(request) | |
| try: | |
| if file is None: | |
| return "Error: No file uploaded" | |
| if hasattr(file, 'name'): | |
| with open(file.name, 'r') as f: | |
| qasm_code = f.read() | |
| else: | |
| qasm_code = file | |
| qreg_code = qasm_to_qreg(qasm_code) | |
| result = make_concurrent_request( | |
| f"{BASE_URL}/script", | |
| method="POST", | |
| data={"code": qreg_code, "session_id": session_id}, | |
| timeout=300, | |
| use_cache=False | |
| ) | |
| if result.get("success"): | |
| return result.get("output", "No output") | |
| return f"Error: {result.get('error', 'Unknown error')}" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| def create_circuit(num_qubits: str = "2", seed: str = "", request: gr.Request = None) -> str: | |
| """Create circuit using concurrent infrastructure""" | |
| session_id = get_user_session_id(request) | |
| try: | |
| n = int(num_qubits) | |
| payload = {"num_qubits": n} | |
| if seed: | |
| payload["seed"] = int(seed) | |
| result = make_concurrent_request( | |
| f"{BASE_URL}/circuit", | |
| method="POST", | |
| data=payload, | |
| timeout=30, | |
| use_cache=False | |
| ) | |
| if "id" in result: | |
| circuit_id = result["id"] | |
| with session_lock: | |
| sessions[session_id]['circuits'][circuit_id] = { | |
| 'num_qubits': n, | |
| 'created': datetime.now() | |
| } | |
| return f"Circuit created: {circuit_id}\nQubits: {result['num_qubits']}" | |
| return f"Error: {result.get('error', 'Unknown error')}" | |
| except ValueError: | |
| return "Error: num_qubits and seed must be valid integers" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| def apply_gate(circuit_id: str, gate: str, params: str, request: gr.Request = None) -> str: | |
| """Apply gate using concurrent infrastructure""" | |
| try: | |
| gate_params = json.loads(params) if params else {} | |
| gate_params["gate"] = gate | |
| result = make_concurrent_request( | |
| f"{BASE_URL}/circuit/{circuit_id}/gate", | |
| method="POST", | |
| data=gate_params, | |
| timeout=30, | |
| use_cache=False | |
| ) | |
| if result.get("success"): | |
| return f"Gate applied: {result.get('gate', gate)}" | |
| return f"Error: {result.get('error', 'Unknown error')}" | |
| except json.JSONDecodeError: | |
| return "Error: Invalid JSON in parameters" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| def measure_circuit(circuit_id: str, qubit: str = "", request: gr.Request = None) -> str: | |
| """Measure circuit using concurrent infrastructure""" | |
| try: | |
| if qubit: | |
| endpoint = f"{BASE_URL}/circuit/{circuit_id}/measure/{qubit}" | |
| else: | |
| endpoint = f"{BASE_URL}/circuit/{circuit_id}/measure" | |
| result = make_concurrent_request( | |
| endpoint, | |
| method="POST", | |
| timeout=30, | |
| use_cache=False | |
| ) | |
| if "result" in result: | |
| return f"Measurement: {result['result']}" | |
| elif "qubit" in result and "result" in result: | |
| return f"Qubit {result['qubit']}: {result['result']}" | |
| return f"Error: {result.get('error', 'Unknown error')}" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| def get_circuit_state(circuit_id: str, request: gr.Request = None) -> str: | |
| """Get circuit state using concurrent infrastructure""" | |
| try: | |
| result = make_concurrent_request( | |
| f"{BASE_URL}/circuit/{circuit_id}/state", | |
| method="GET", | |
| timeout=30, | |
| use_cache=True | |
| ) | |
| return json.dumps(result, indent=2) | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| def delete_circuit(circuit_id: str, request: gr.Request = None) -> str: | |
| """Delete circuit using concurrent infrastructure""" | |
| session_id = get_user_session_id(request) | |
| try: | |
| result = make_concurrent_request( | |
| f"{BASE_URL}/circuit/{circuit_id}", | |
| method="DELETE", | |
| timeout=30, | |
| use_cache=False | |
| ) | |
| if result.get("status_code") == 204 or result.get("success"): | |
| with session_lock: | |
| if circuit_id in sessions[session_id]['circuits']: | |
| del sessions[session_id]['circuits'][circuit_id] | |
| return f"Circuit {circuit_id} deleted successfully" | |
| return f"Error: {result.get('error', 'Unknown error')}" | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| def list_circuits(request: gr.Request = None) -> str: | |
| """List circuits using concurrent infrastructure with caching""" | |
| try: | |
| result = make_concurrent_request( | |
| f"{BASE_URL}/circuits", | |
| method="GET", | |
| timeout=30, | |
| use_cache=True | |
| ) | |
| return json.dumps(result, indent=2) | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| def create_bell_state() -> str: | |
| code = "q = Qreg(2)\nq.H(0)\nq.CNOT(0, 1)\nprint(q.measure())" | |
| return execute_quantum_code(code) | |
| def create_superposition(num_qubits: str = "10") -> str: | |
| try: | |
| n = int(num_qubits) | |
| if n < 1 or n > 1000000: | |
| return "Error: num_qubits must be between 1 and 1,000,000" | |
| code = f"q = Qreg({n})\nq.H_all()\nresult = q.measure()\nprint(f'Result: {{result[:50]}}')\nprint(f'Ones: {{result.count(\"1\")}}/{n}')" | |
| return execute_quantum_code(code) | |
| except ValueError: | |
| return "Error: num_qubits must be a valid integer" | |
| def test_bell_correlation(trials: str = "100") -> str: | |
| try: | |
| n = int(trials) | |
| code = f"""n={n} | |
| corr=0 | |
| for _ in range(n): | |
| q=Qreg(2) | |
| q.H(0) | |
| q.CNOT(0,1) | |
| r=q.measure() | |
| if r in['00','11']: | |
| corr+=1 | |
| print(f'Correlation: {{corr/n*100:.1f}}%')""" | |
| return execute_quantum_code(code) | |
| except ValueError: | |
| return "Error: trials must be a valid integer" | |
| def test_ghz_state(num_qubits: str = "3") -> str: | |
| try: | |
| n = int(num_qubits) | |
| if n < 2: | |
| return "Error: need at least 2 qubits for GHZ" | |
| cnot_chain = "\n".join([f" q.CNOT(0, {i})" for i in range(1, n)]) | |
| code = f"""q = Qreg({n}) | |
| q.H(0) | |
| {cnot_chain} | |
| result = q.measure() | |
| print(f'GHZ-{n}: {{result}}')""" | |
| return execute_quantum_code(code) | |
| except ValueError: | |
| return "Error: num_qubits must be a valid integer" | |
| def benchmark_gate_speed() -> str: | |
| code = """import time | |
| q = Qreg(100000) | |
| start = time.time() | |
| for _ in range(1000): | |
| q.H(0) | |
| elapsed = time.time() - start | |
| print(f'Gate speed: {1000/elapsed/1e6:.0f}M gates/s')""" | |
| return execute_quantum_code(code) | |
| def test_long_range_entanglement(num_qubits: str = "1000000") -> str: | |
| try: | |
| n = int(num_qubits) | |
| code = f"""import time | |
| q = Qreg({n}) | |
| q.H(0) | |
| q.CNOT(0, {n-1}) | |
| start = time.time() | |
| result = q.measure() | |
| elapsed = time.time() - start | |
| print(f'Measurement: {{elapsed*1000:.1f}}ms') | |
| print(f'Qubit 0: {{result[0]}}') | |
| print(f'Qubit {n-1}: {{result[{n-1}]}}') | |
| print(f'Correlated: {{result[0] == result[{n-1}]}}')""" | |
| return execute_quantum_code(code) | |
| except ValueError: | |
| return "Error: num_qubits must be a valid integer" | |
| def get_qpu_health() -> str: | |
| """Get QPU health using concurrent infrastructure with caching""" | |
| try: | |
| health = make_concurrent_request( | |
| f"{BASE_URL}/health", | |
| method="GET", | |
| timeout=5, | |
| use_cache=True | |
| ) | |
| cleanup_expired_sessions() | |
| with session_lock: | |
| health['active_users'] = len(sessions) | |
| health['total_requests'] = sum(s.get('request_count', 0) for s in sessions.values()) | |
| health['cache_size'] = len(request_cache) | |
| health['thread_pool_workers'] = executor._max_workers | |
| return json.dumps(health, indent=2) | |
| except Exception as e: | |
| return json.dumps({"error": str(e)}, indent=2) | |
| def quantum_circuit_prompt(operation: str, num_qubits: str = "2") -> str: | |
| prompts = { | |
| "bell": f"Create a Bell state with {num_qubits} qubits showing maximum entanglement", | |
| "ghz": f"Create a GHZ state with {num_qubits} qubits where all qubits are entangled", | |
| "superposition": f"Put {num_qubits} qubits into equal superposition using Hadamard gates", | |
| "teleportation": f"Implement quantum teleportation protocol using {num_qubits} qubits" | |
| } | |
| return prompts.get(operation, f"Create a quantum circuit with {num_qubits} qubits") | |
| EXAMPLES = { | |
| "Bell State": "q = Qreg(2)\nq.H(0)\nq.CNOT(0, 1)\nprint(q.measure())", | |
| "Superposition": "q = Qreg(10)\nq.H_all()\nprint(q.measure())", | |
| "GHZ-3": "q = Qreg(3)\nq.H(0)\nq.CNOT(0, 1)\nq.CNOT(0, 2)\nprint(q.measure())", | |
| "Rotation Gates": "q = Qreg(1)\nq.Rx(0, 1.5708)\nq.Ry(0, 3.1416)\nq.Rz(0, 0.7854)\nprint(q.measure())", | |
| "1M Qubits": "import time\nq = Qreg(1000000)\nstart = time.time()\nq.H_all()\nresult = q.measure()\nprint(f'{time.time()-start:.3f}s')", | |
| "Correlation Test": "n=100\nc=0\nfor _ in range(n):\n q=Qreg(2);q.H(0);q.CNOT(0,1)\n if q.measure() in['00','11']:c+=1\nprint(f'{c/n*100:.0f}%')", | |
| "QASM Bell": "OPENQASM 2.0;\ninclude \"qelib1.inc\";\nqreg q[2];\nh q[0];\ncx q[0],q[1];", | |
| } | |
| with gr.Blocks(title="QPU-1 by Lap Quantum", css=CUSTOM_CSS, theme=gr.themes.Base( | |
| primary_hue="purple", | |
| font=gr.themes.GoogleFont("Inter") | |
| )) as app: | |
| gr.Markdown(""" | |
| # 🔮 QPU-1 Quantum Processing Unit | |
| ## Lap Quantum™ | Part of Lap Technologies | |
| **1,000,000+ qubits** • **MCP-enabled** • **QASM compatible** • **REST API** • **⚡ Concurrent Processing** | |
| """) | |
| with gr.Tab("⚡ Execute"): | |
| code_input = gr.Code( | |
| label="Quantum Script (Qreg or QASM)", | |
| language="python", | |
| lines=14, | |
| value=EXAMPLES["Bell State"] | |
| ) | |
| with gr.Row(): | |
| execute_btn = gr.Button("▶️ Execute", variant="primary", size="lg", elem_classes=["primary-btn"]) | |
| clear_btn = gr.Button("🗑️ Clear", size="lg") | |
| result_output = gr.Textbox(label="Output", lines=8) | |
| gr.Markdown("### 📚 Examples") | |
| with gr.Row(): | |
| for name in EXAMPLES.keys(): | |
| gr.Button(name, size="sm").click( | |
| fn=lambda n=name: EXAMPLES[n], | |
| outputs=code_input | |
| ) | |
| execute_btn.click( | |
| fn=execute_quantum_code, | |
| inputs=code_input, | |
| outputs=result_output, | |
| api_name="execute" | |
| ) | |
| clear_btn.click(fn=lambda: ("", ""), outputs=[code_input, result_output]) | |
| # NEW: Transpiler Tab (Qrisp to QASM) | |
| with gr.Tab("🔄 Transpiler"): | |
| gr.Markdown(""" | |
| ### Qrisp ↔ QASM Transpiler | |
| Convert Qrisp high-level Python code to OpenQASM 2.0 without simulation, then execute on QPU-1. | |
| Supports QuantumVariable, QuantumFloat, QuantumArray, and standard Qrisp gates. | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| qrisp_input = gr.Code( | |
| label="Qrisp Input (Python)", | |
| language="python", | |
| lines=12, | |
| value="""from qrisp import QuantumVariable, h, cx, measure | |
| qv = QuantumVariable(2) | |
| h(qv[0]) | |
| cx(qv[0], qv[1]) | |
| measure(qv)""" | |
| ) | |
| with gr.Row(): | |
| transpile_btn = gr.Button("🔄 Transpile to QASM", variant="primary", elem_classes=["primary-btn"]) | |
| execute_qasm_btn = gr.Button("▶️ Execute QASM on QPU-1", variant="secondary") | |
| with gr.Column(): | |
| qasm_output = gr.Code( | |
| label="Generated OpenQASM 2.0", | |
| language="javascript", | |
| lines=12, | |
| value="// QASM will appear here..." | |
| ) | |
| transpiler_result = gr.Textbox(label="Execution Result", lines=4) | |
| # Wire up the transpile button | |
| transpile_btn.click( | |
| fn=qrisp_to_qasm, | |
| inputs=qrisp_input, | |
| outputs=qasm_output, | |
| api_name="transpile_qrisp_to_qasm" | |
| ) | |
| # Wire up execute button (takes QASM output and runs it) | |
| execute_qasm_btn.click( | |
| fn=execute_qasm_code, | |
| inputs=qasm_output, | |
| outputs=transpiler_result, | |
| api_name="execute_transpiled_qasm" | |
| ) | |
| gr.Markdown(""" | |
| **Supported Qrisp Syntax:** | |
| - **Variables**: `QuantumVariable(n)`, `QuantumFloat(n)`, `QuantumArray(qtype, shape=(...))` | |
| - **Single-qubit**: `h(qv)`, `x(qv[0])`, `y(qv)`, `z(qv[2])`, `s(qv)`, `t(qv)` | |
| - **Rotation gates**: `rx(angle, qv)`, `ry(angle, qv[0])`, `rz(angle, qv)` → **Capitalized in QASM: Rx, Ry, Rz** | |
| - **Two-qubit**: `cx(qv[0], qv[1])`, `cz(qv[0], qv[1])` | |
| - **Three-qubit**: `ccx(qv[0], qv[1], qv[2])` (Toffoli) | |
| - **Measurement**: `measure(qv)` (expands to measure all qubits) | |
| **Note:** This is a pure transpiler - no quantum simulation happens during conversion. | |
| """) | |
| with gr.Tab("🔧 Circuit Builder"): | |
| gr.Markdown(""" | |
| ### Step-by-step Circuit Construction | |
| Build quantum circuits incrementally using the REST API endpoints. | |
| """) | |
| with gr.Group(): | |
| gr.Markdown("#### 1. Create Circuit") | |
| with gr.Row(): | |
| circuit_qubits = gr.Textbox(value="2", label="Number of Qubits") | |
| circuit_seed = gr.Textbox(value="", label="Seed (optional)", placeholder="42") | |
| create_circuit_btn = gr.Button("Create Circuit", variant="primary") | |
| circuit_output = gr.Textbox(label="Circuit ID", lines=2) | |
| create_circuit_btn.click( | |
| fn=create_circuit, | |
| inputs=[circuit_qubits, circuit_seed], | |
| outputs=circuit_output, | |
| api_name="create_circuit" | |
| ) | |
| with gr.Group(): | |
| gr.Markdown("#### 2. Apply Gates") | |
| circuit_id_input = gr.Textbox(label="Circuit ID", placeholder="550e8400-e29b-41d4-a716-446655440000") | |
| with gr.Row(): | |
| gate_select = gr.Dropdown( | |
| choices=["H", "X", "Y", "Z", "S", "T", "CNOT", "CZ", "CCNOT", "Rx", "Ry", "Rz"], | |
| value="H", | |
| label="Gate" | |
| ) | |
| gate_params = gr.Textbox( | |
| label="Parameters (JSON)", | |
| placeholder='{"qubit": 0} or {"control": 0, "target": 1}', | |
| value='{"qubit": 0}' | |
| ) | |
| apply_gate_btn = gr.Button("Apply Gate", variant="secondary") | |
| gate_output = gr.Textbox(label="Result", lines=2) | |
| apply_gate_btn.click( | |
| fn=apply_gate, | |
| inputs=[circuit_id_input, gate_select, gate_params], | |
| outputs=gate_output, | |
| api_name="apply_gate" | |
| ) | |
| gr.Markdown(""" | |
| **Parameter formats:** | |
| - Single-qubit: `{"qubit": 0}` | |
| - Rotation: `{"qubit": 0, "theta": 1.5708}` | |
| - CNOT: `{"control": 0, "target": 1}` | |
| - CZ: `{"qubit1": 0, "qubit2": 1}` | |
| - CCNOT: `{"control1": 0, "control2": 1, "target": 2}` | |
| """) | |
| with gr.Group(): | |
| gr.Markdown("#### 3. Measure") | |
| with gr.Row(): | |
| measure_circuit_id = gr.Textbox(label="Circuit ID") | |
| measure_qubit = gr.Textbox(label="Qubit (empty for all)", placeholder="0") | |
| measure_btn = gr.Button("Measure", variant="secondary") | |
| measure_output = gr.Textbox(label="Result", lines=2) | |
| measure_btn.click( | |
| fn=measure_circuit, | |
| inputs=[measure_circuit_id, measure_qubit], | |
| outputs=measure_output, | |
| api_name="measure_circuit" | |
| ) | |
| with gr.Group(): | |
| gr.Markdown("#### 4. Circuit Management") | |
| with gr.Row(): | |
| state_circuit_id = gr.Textbox(label="Circuit ID (for state)") | |
| delete_circuit_id = gr.Textbox(label="Circuit ID (for delete)") | |
| with gr.Row(): | |
| state_btn = gr.Button("Get State", variant="secondary") | |
| delete_btn = gr.Button("Delete Circuit", variant="secondary") | |
| list_btn = gr.Button("List All Circuits", variant="secondary") | |
| management_output = gr.Textbox(label="Result", lines=6) | |
| state_btn.click( | |
| fn=get_circuit_state, | |
| inputs=state_circuit_id, | |
| outputs=management_output, | |
| api_name="get_circuit_state" | |
| ) | |
| delete_btn.click( | |
| fn=delete_circuit, | |
| inputs=delete_circuit_id, | |
| outputs=management_output, | |
| api_name="delete_circuit" | |
| ) | |
| list_btn.click( | |
| fn=list_circuits, | |
| outputs=management_output, | |
| api_name="list_circuits" | |
| ) | |
| with gr.Tab("📄 QASM Upload"): | |
| gr.Markdown(""" | |
| ### Upload Large QASM Files | |
| For huge QASM files like **Quandoom** (70K qubits, 80M gates), upload the .qasm file directly. | |
| The transpiler will convert it to Qreg and execute on QPU-1. | |
| """) | |
| qasm_file = gr.File(label="Upload QASM File", file_types=[".qasm", ".txt"]) | |
| with gr.Row(): | |
| qasm_execute_btn = gr.Button("▶️ Execute QASM File", variant="primary", size="lg", elem_classes=["primary-btn"]) | |
| qasm_result = gr.Textbox(label="Output", lines=10) | |
| gr.Markdown(""" | |
| **Supported Gates:** h, x, y, z, t, s, cx/cnot, cz, ccx/toffoli, rx, ry, rz | |
| **Example:** Upload the Quandoom QASM file to run DOOM on QPU-1! | |
| """) | |
| qasm_execute_btn.click( | |
| fn=execute_qasm_file, | |
| inputs=qasm_file, | |
| outputs=qasm_result, | |
| api_name="execute_qasm_file" | |
| ) | |
| with gr.Tab("🔧 MCP Tools"): | |
| gr.Markdown(""" | |
| ### Quantum Computing Tools | |
| These functions are exposed as MCP tools for AI assistants: | |
| """) | |
| with gr.Group(): | |
| gr.Markdown("#### Bell State Generator") | |
| bell_btn = gr.Button("Generate Bell State", variant="secondary") | |
| bell_output = gr.Textbox(label="Result", lines=3) | |
| bell_btn.click(fn=create_bell_state, outputs=bell_output, api_name="create_bell_state") | |
| with gr.Group(): | |
| gr.Markdown("#### Superposition Creator") | |
| super_qubits = gr.Textbox(value="10", label="Number of Qubits") | |
| super_btn = gr.Button("Create Superposition", variant="secondary") | |
| super_output = gr.Textbox(label="Result", lines=3) | |
| super_btn.click(fn=create_superposition, inputs=super_qubits, outputs=super_output, api_name="create_superposition") | |
| with gr.Group(): | |
| gr.Markdown("#### Bell Correlation Tester") | |
| trials_input = gr.Textbox(value="100", label="Number of Trials") | |
| corr_btn = gr.Button("Test Correlation", variant="secondary") | |
| corr_output = gr.Textbox(label="Result", lines=3) | |
| corr_btn.click(fn=test_bell_correlation, inputs=trials_input, outputs=corr_output, api_name="test_bell_correlation") | |
| with gr.Group(): | |
| gr.Markdown("#### GHZ State Creator") | |
| ghz_qubits = gr.Textbox(value="3", label="Number of Qubits") | |
| ghz_btn = gr.Button("Create GHZ State", variant="secondary") | |
| ghz_output = gr.Textbox(label="Result", lines=3) | |
| ghz_btn.click(fn=test_ghz_state, inputs=ghz_qubits, outputs=ghz_output, api_name="test_ghz_state") | |
| with gr.Group(): | |
| gr.Markdown("#### Long-Range Entanglement Test") | |
| lre_qubits = gr.Textbox(value="1000000", label="Number of Qubits") | |
| lre_btn = gr.Button("Test Entanglement", variant="secondary") | |
| lre_output = gr.Textbox(label="Result", lines=4) | |
| lre_btn.click(fn=test_long_range_entanglement, inputs=lre_qubits, outputs=lre_output, api_name="test_long_range_entanglement") | |
| with gr.Group(): | |
| gr.Markdown("#### Benchmark") | |
| bench_btn = gr.Button("Run Gate Speed Benchmark", variant="secondary") | |
| bench_output = gr.Textbox(label="Result", lines=3) | |
| bench_btn.click(fn=benchmark_gate_speed, outputs=bench_output, api_name="benchmark_gate_speed") | |
| with gr.Group(): | |
| gr.Markdown("#### QPU Health") | |
| health_btn = gr.Button("Check QPU Health", variant="secondary") | |
| health_output = gr.Textbox(label="Result", lines=8) | |
| health_btn.click(fn=get_qpu_health, outputs=health_output, api_name="get_qpu_health") | |
| with gr.Tab("📖 MCP Configuration"): | |
| gr.Markdown(f""" | |
| ### Connect to Claude Desktop / Cursor / Cline | |
| Add this to your MCP client configuration: | |
| ```json | |
| {{ | |
| "mcpServers": {{ | |
| "qpu-1": {{ | |
| "url": "{BASE_URL}/gradio_api/mcp/" | |
| }} | |
| }} | |
| }} | |
| ``` | |
| For private spaces, add your HF token: | |
| ```json | |
| {{ | |
| "mcpServers": {{ | |
| "qpu-1": {{ | |
| "url": "{BASE_URL}/gradio_api/mcp/", | |
| "headers": {{ | |
| "Authorization": "Bearer YOUR_HF_TOKEN" | |
| }} | |
| }} | |
| }} | |
| }} | |
| ``` | |
| ### Available MCP Tools | |
| **Script Execution:** | |
| - `execute_quantum_code` - Execute Qreg or QASM code | |
| - `execute_qasm_file` - Execute QASM file | |
| - `qrisp_to_qasm` - Transpile Qrisp code to OpenQASM 2.0 (NEW) | |
| - `execute_transpiled_qasm` - Execute generated QASM on QPU-1 (NEW) | |
| **Circuit Building:** | |
| - `create_circuit` - Create new quantum circuit | |
| - `apply_gate` - Apply gate to circuit | |
| - `measure_circuit` - Measure circuit qubits | |
| - `get_circuit_state` - Get circuit state | |
| - `delete_circuit` - Delete circuit | |
| - `list_circuits` - List all circuits | |
| **Quick Operations:** | |
| - `create_bell_state` - Create entangled Bell state | |
| - `create_superposition` - Create N-qubit superposition | |
| - `test_bell_correlation` - Verify quantum entanglement | |
| - `test_ghz_state` - Create GHZ state | |
| - `test_long_range_entanglement` - Test long-range entanglement | |
| - `benchmark_gate_speed` - Measure gate throughput | |
| - `get_qpu_health` - QPU-1 health status | |
| ### QASM Support | |
| QPU-1 supports OpenQASM 2.0 input. Paste QASM code directly and it will be transpiled to Qreg automatically. | |
| **Supported gates:** h, x, y, z, t, s, cx/cnot, cz, ccnot/ccx/toffoli, rx, ry, rz | |
| ### API Documentation | |
| Full REST API documentation available at: `{BASE_URL}/` | |
| **Available Gates:** | |
| - Single-qubit: H, X, Y, Z, S, T, Rx, Ry, Rz | |
| - Two-qubit: CNOT, CZ | |
| - Three-qubit: CCNOT (Toffoli) | |
| - Batch: H_all (Hadamard on all qubits) | |
| **Specifications:** | |
| - Max Qubits: 1,000,000+ | |
| - Gate Throughput: 186M gates/second | |
| - Topology: Full mesh (any-to-any connectivity) | |
| - **Concurrent Processing**: Up to 500 parallel requests | |
| - **Connection Pooling**: 1000 max connections | |
| - **Response Caching**: 60-second TTL for identical requests | |
| """) | |
| if __name__ == "__main__": | |
| app.launch( | |
| mcp_server=True, | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False | |
| ) |