Spaces:
Running
Running
| import os | |
| import traceback | |
| import json | |
| import logging | |
| from flask import Flask, request, render_template, jsonify, session, abort, send_from_directory | |
| from flask_socketio import SocketIO, emit | |
| from werkzeug.middleware.proxy_fix import ProxyFix | |
| import signal | |
| from functools import wraps | |
| #import threading | |
| #import multiprocessing | |
| import concurrent.futures | |
| from datetime import datetime | |
| import psutil | |
| import re | |
| from flask_cors import CORS | |
| # Import simulation functions from plugins | |
| from plugins.authentication.auth import generate_quantum_fingerprint_cirq | |
| from plugins.encryption_bb84.bb84 import bb84_protocol_cirq | |
| from plugins.error_correction.shor_code import run_shor_code | |
| from plugins.grover.grover import run_grover | |
| from plugins.handshake.handshake import handshake_cirq | |
| from plugins.network.network import entanglement_swapping_cirq | |
| from plugins.qrng.qrng import generate_random_number_cirq | |
| from plugins.quantum_decryption.quantum_decryption import grover_key_search, shor_factorization | |
| from plugins.teleportation.teleport import teleportation_circuit | |
| from plugins.variational.vqe import run_vqe | |
| from plugins.deutsch_jozsa.deutsch_jozsa import deutsch_jozsa_cirq | |
| from plugins.quantum_fourier.qft import run_qft | |
| from plugins.phase_estimation.phase_estimation import run_phase_estimation | |
| from plugins.optimization.qaoa import run_qaoa | |
| # Configure Errors | |
| class SimulationError(Exception): | |
| """Base class for simulation errors""" | |
| def __init__(self, message, param_info=None, suggestion=None): | |
| self.message = message | |
| self.param_info = param_info | |
| self.suggestion = suggestion | |
| super().__init__(self.message) | |
| class ParameterError(SimulationError): | |
| """Error for invalid simulation parameters""" | |
| pass | |
| class QuantumCircuitError(SimulationError): | |
| """Error in quantum circuit construction or execution""" | |
| pass | |
| class ResourceExceededError(SimulationError): | |
| """Error when simulation exceeds available resources""" | |
| pass | |
| def configure_logging(): | |
| # Use environment variable to determine log directory, fallback to tmp directory for containerized environments | |
| log_dir = os.environ.get('LOG_DIR', '/tmp' if os.path.exists('/tmp') else '.') | |
| # Create logs directory if it doesn't exist and we have permission | |
| try: | |
| if not os.path.exists(log_dir): | |
| os.makedirs(log_dir, exist_ok=True) | |
| # Test if we can write to the directory | |
| test_file = os.path.join(log_dir, '.test_write_access') | |
| with open(test_file, 'w') as f: | |
| f.write('test') | |
| os.remove(test_file) | |
| except Exception as e: | |
| # Fallback to current directory if we can't write to the log directory | |
| log_dir = '.' | |
| try: | |
| if not os.path.exists(log_dir): | |
| os.makedirs(log_dir, exist_ok=True) | |
| except Exception: | |
| # If we still can't create directory, use current directory | |
| log_dir = '.' | |
| # Create log file with date-based naming | |
| log_filename = f"{log_dir}/quantum_field_kit_{datetime.now().strftime('%Y-%m-%d')}.log" | |
| # Configure root logger | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="%(asctime)s [%(levelname)s] %(name)s - %(message)s", | |
| handlers=[ | |
| # Console handler for immediate feedback (always works) | |
| logging.StreamHandler() | |
| ] | |
| ) | |
| # Get logger instance | |
| logger = logging.getLogger('quantum_field_kit') | |
| # Try to add file handler, but don't fail if we can't write to file | |
| try: | |
| file_handler = logging.FileHandler(log_filename) | |
| logging.getLogger().addHandler(file_handler) | |
| logger.info(f"File logging configured successfully. Log file: {log_filename}") | |
| except Exception as e: | |
| # In containerized environments, we might not be able to write files | |
| # Just continue with console logging only | |
| logger.warning(f"File logging not available: {e}. Using console logging only.") | |
| # Set specific logger levels | |
| logging.getLogger('werkzeug').setLevel(logging.WARNING) | |
| logging.getLogger('socketio').setLevel(logging.WARNING) | |
| return logger | |
| # Create application logger | |
| logger = configure_logging() | |
| # Configure matplotlib for containerized environments | |
| # Set MPLCONFIGDIR to a writable directory to avoid permission errors | |
| def setup_matplotlib_config(): | |
| """Setup matplotlib configuration directory with proper permissions""" | |
| mpl_config_dir = os.environ.get('MPLCONFIGDIR', '/tmp/matplotlib' if os.path.exists('/tmp') else './.matplotlib') | |
| try: | |
| # Create the directory with exist_ok to avoid errors if it already exists | |
| os.makedirs(mpl_config_dir, exist_ok=True) | |
| # Test if we can write to the directory | |
| test_file = os.path.join(mpl_config_dir, '.test_write_access') | |
| with open(test_file, 'w') as f: | |
| f.write('test') | |
| os.remove(test_file) | |
| # Set the environment variable | |
| os.environ['MPLCONFIGDIR'] = mpl_config_dir | |
| logger.info(f"Matplotlib config directory set to: {mpl_config_dir}") | |
| return True | |
| except Exception as e: | |
| logger.warning(f"Could not configure matplotlib directory {mpl_config_dir}: {e}") | |
| # Try fallback directories | |
| fallback_dirs = ['./.matplotlib', '.', '/tmp'] | |
| for fallback_dir in fallback_dirs: | |
| try: | |
| os.makedirs(fallback_dir, exist_ok=True) | |
| test_file = os.path.join(fallback_dir, '.test_write_access') | |
| with open(test_file, 'w') as f: | |
| f.write('test') | |
| os.remove(test_file) | |
| os.environ['MPLCONFIGDIR'] = fallback_dir | |
| logger.info(f"Matplotlib config directory set to fallback: {fallback_dir}") | |
| return True | |
| except Exception as fallback_e: | |
| logger.warning(f"Fallback directory {fallback_dir} also failed: {fallback_e}") | |
| continue | |
| # If all else fails, let matplotlib handle it | |
| logger.warning("All attempts to set matplotlib config directory failed. Letting matplotlib use its default.") | |
| return False | |
| # Setup matplotlib configuration | |
| setup_matplotlib_config() | |
| # Initialize Flask application (serve React build in production) | |
| app = Flask(__name__, static_folder='frontend/build', static_url_path='/') | |
| # Configure CORS origins from environment variable | |
| cors_origins = os.environ.get('CORS_ORIGINS', | |
| 'http://localhost:3000,http://127.0.0.1:3000,http://localhost:5000,http://127.0.0.1:5000,https://quantumfieldkit.com,https://www.quantumfieldkit.com,https://quantumfieldkit.fly.dev' | |
| ).split(',') | |
| CORS(app, resources={ | |
| r"/api/*": { | |
| "origins": [origin.strip() for origin in cors_origins], | |
| "methods": ["GET", "POST", "OPTIONS"], | |
| "allow_headers": ["Content-Type"] | |
| } | |
| }) | |
| app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', os.urandom(24)) | |
| app.config['TEMPLATES_AUTO_RELOAD'] = True | |
| app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max upload | |
| # Add proxy fix for proper IP handling behind reverse proxies | |
| app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1) | |
| # Initialize Socket.IO for real-time communication | |
| # Use environment variable to determine if SocketIO should be enabled | |
| socketio_enabled = os.environ.get('SOCKETIO_ENABLED', 'true').lower() == 'true' | |
| if socketio_enabled: | |
| socketio = SocketIO(app, async_mode='eventlet', cors_allowed_origins="*", logger=True, engineio_logger=True) | |
| else: | |
| # Create a mock socketio object for environments where SocketIO is not supported | |
| class MockSocketIO: | |
| def on(self, *args, **kwargs): | |
| def decorator(func): | |
| return func | |
| return decorator | |
| def emit(self, *args, **kwargs): | |
| pass | |
| def run(self, *args, **kwargs): | |
| pass | |
| socketio = MockSocketIO() | |
| def timeout(seconds): | |
| """ | |
| Decorator that adds a timeout to a function. | |
| If the function takes longer than 'seconds' to execute, it will be terminated. | |
| Args: | |
| seconds: Maximum execution time in seconds | |
| Returns: | |
| Decorated function with timeout capability | |
| """ | |
| def decorator(func): | |
| def wrapper(*args, **kwargs): | |
| plugin_name = kwargs.get('_plugin_name', 'Unknown plugin') | |
| with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: | |
| future = executor.submit(func, *args, **kwargs) | |
| try: | |
| return future.result(timeout=seconds) | |
| except concurrent.futures.TimeoutError: | |
| # More detailed timeout message | |
| return { | |
| "output": None, | |
| "log": f"Simulation for {plugin_name} started but could not complete within {seconds} seconds.\n" | |
| f"This may be due to complex parameters or high precision settings.", | |
| "error": f"Execution timed out after {seconds} seconds. Try reducing complexity of the simulation." | |
| } | |
| return wrapper | |
| return decorator | |
| def json_safe(obj): | |
| """ | |
| Recursively convert non-JSON-serializable objects to strings. | |
| Preserve raw SVG (under the key "circuit_svg") so it is not altered. | |
| """ | |
| try: | |
| json.dumps(obj) | |
| return obj | |
| except (TypeError, OverflowError): | |
| if isinstance(obj, dict): | |
| new_obj = {} | |
| for k, v in obj.items(): | |
| new_obj[k] = v if k == "circuit_svg" else json_safe(v) | |
| return new_obj | |
| elif isinstance(obj, (list, tuple)): | |
| return [json_safe(item) for item in obj] | |
| elif hasattr(obj, 'tolist'): # For numpy arrays | |
| return obj.tolist() | |
| else: | |
| return str(obj) | |
| def check_memory_usage(): | |
| """Check if memory usage is within acceptable limits""" | |
| try: | |
| process = psutil.Process(os.getpid()) | |
| memory_usage = process.memory_info().rss / (1024 * 1024) # MB | |
| memory_percent = process.memory_percent() | |
| if memory_percent > 85: | |
| logger.warning(f"High memory usage detected: {memory_usage:.2f} MB ({memory_percent:.1f}%)") | |
| return False | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error checking memory usage: {e}") | |
| return True # Assume it's safe if we can't check | |
| def wrap_result(sim_result): | |
| """ | |
| Standardize the simulation result into a dictionary with keys: | |
| - "output": All keys except "log" | |
| - "log": Detailed process log | |
| - "error": None if successful, or the error message. | |
| """ | |
| if isinstance(sim_result, dict): | |
| if 'log' in sim_result: | |
| output = {k: v for k, v in sim_result.items() if k != 'log'} | |
| output = json_safe(output) | |
| return {"output": output, "log": sim_result['log'], "error": None} | |
| else: | |
| return {"output": json_safe(sim_result), "log": "", "error": None} | |
| elif isinstance(sim_result, tuple): | |
| if len(sim_result) > 1: | |
| *outputs, log = sim_result | |
| output = outputs[0] if len(outputs) == 1 else outputs | |
| output = json_safe(output) | |
| return {"output": output, "log": log, "error": None} | |
| else: | |
| return {"output": json_safe(sim_result[0]), "log": "", "error": None} | |
| else: | |
| return {"output": json_safe(sim_result), "log": "", "error": None} | |
| def run_plugin(sim_func, **params): | |
| """ | |
| Calls the simulation function with the given parameters. | |
| Returns a standardized result dictionary with improved error handling. | |
| """ | |
| # Check memory before running simulation | |
| if not check_memory_usage(): | |
| return { | |
| "output": None, | |
| "log": "Server is currently experiencing high memory usage. Please try again later.", | |
| "error": "Insufficient memory available for simulation. Try reducing simulation complexity." | |
| } | |
| try: | |
| # Extract plugin key for error reporting but don't pass it to the simulation function | |
| plugin_key = params.pop('_plugin_key', 'unknown_plugin') | |
| # Also remove _plugin_name if it exists (backward compatibility) | |
| params.pop('_plugin_name', None) | |
| logger.info(f"Running {plugin_key} simulation with parameters: {params}") | |
| # Apply timeout to simulation function | |
| # Increased timeout for complex simulations | |
| def run_with_timeout(): | |
| # Don't pass plugin identifiers to the actual simulation function | |
| return sim_func(**params) | |
| sim_result = run_with_timeout() | |
| if isinstance(sim_result, dict) and "error" in sim_result and sim_result["error"] is not None: | |
| # This is a timeout error from our decorator | |
| return sim_result | |
| return wrap_result(sim_result) | |
| except ParameterError as e: | |
| # Handle parameter errors with helpful suggestions | |
| error_msg = f"Parameter error: {e.message}" | |
| if e.param_info: | |
| error_msg += f"\nParameter: {e.param_info}" | |
| if e.suggestion: | |
| error_msg += f"\nSuggestion: {e.suggestion}" | |
| logger.error(f"Parameter error in {plugin_key}: {error_msg}") | |
| return {"output": None, "log": None, "error": error_msg} | |
| except QuantumCircuitError as e: | |
| # Handle quantum circuit specific errors | |
| error_msg = f"Quantum circuit error: {e.message}" | |
| if e.suggestion: | |
| error_msg += f"\nSuggestion: {e.suggestion}" | |
| logger.error(f"Circuit error in {plugin_key}: {error_msg}") | |
| return {"output": None, "log": None, "error": error_msg} | |
| except Exception as e: | |
| # Generic exception handling with improved context | |
| error_type = type(e).__name__ | |
| error_msg = str(e) | |
| stack_trace = traceback.format_exc() | |
| # Create a user-friendly error message | |
| user_error = f"Error ({error_type}): {error_msg}" | |
| # Log the full technical details | |
| logger.error(f"Simulation error in {plugin_key}: {error_type} - {error_msg}\n{stack_trace}") | |
| return { | |
| "output": None, | |
| "log": f"Simulation failed. See error tab for details.", | |
| "error": user_error | |
| } | |
| def validate_parameters(plugin, params): | |
| """Enhanced parameter validation with detailed error messages""" | |
| validated_params = {} | |
| for param in plugin["parameters"]: | |
| param_name = param["name"] | |
| # Check if required parameter is missing | |
| if param_name not in params and "default" not in param: | |
| raise ParameterError( | |
| f"Missing required parameter: {param_name}", | |
| param_info=f"{param_name} ({param['type']})", | |
| suggestion="Please provide a value for this required parameter." | |
| ) | |
| # Use default if parameter is missing | |
| if param_name not in params and "default" in param: | |
| validated_params[param_name] = param["default"] | |
| continue | |
| # Validate parameter based on type | |
| raw_val = params[param_name] | |
| if param["type"] == "int": | |
| try: | |
| val = int(raw_val) | |
| # Check min/max bounds | |
| if "min" in param and val < param["min"]: | |
| raise ParameterError( | |
| f"Value for {param_name} is too small", | |
| param_info=f"{param_name} = {val}", | |
| suggestion=f"Minimum allowed value is {param['min']}." | |
| ) | |
| if "max" in param and val > param["max"]: | |
| raise ParameterError( | |
| f"Value for {param_name} is too large", | |
| param_info=f"{param_name} = {val}", | |
| suggestion=f"Maximum allowed value is {param['max']}." | |
| ) | |
| validated_params[param_name] = val | |
| except ValueError: | |
| raise ParameterError( | |
| f"Invalid integer value for {param_name}", | |
| param_info=f"Received: {raw_val}", | |
| suggestion="Please provide a valid integer value." | |
| ) | |
| elif param["type"] == "float": | |
| try: | |
| val = float(raw_val) | |
| # Check min/max bounds | |
| if "min" in param and val < param["min"]: | |
| raise ParameterError( | |
| f"Value for {param_name} is too small", | |
| param_info=f"{param_name} = {val}", | |
| suggestion=f"Minimum allowed value is {param['min']}." | |
| ) | |
| if "max" in param and val > param["max"]: | |
| raise ParameterError( | |
| f"Value for {param_name} is too large", | |
| param_info=f"{param_name} = {val}", | |
| suggestion=f"Maximum allowed value is {param['max']}." | |
| ) | |
| validated_params[param_name] = val | |
| except ValueError: | |
| raise ParameterError( | |
| f"Invalid float value for {param_name}", | |
| param_info=f"Received: {raw_val}", | |
| suggestion="Please provide a valid decimal number." | |
| ) | |
| elif param["type"] == "bool": | |
| if isinstance(raw_val, bool): | |
| validated_params[param_name] = raw_val | |
| elif isinstance(raw_val, str): | |
| validated_params[param_name] = raw_val.lower() == "true" | |
| else: | |
| raise ParameterError( | |
| f"Invalid boolean value for {param_name}", | |
| param_info=f"Received: {raw_val}", | |
| suggestion="Please provide 'true' or 'false'." | |
| ) | |
| elif param["type"] == "str": | |
| if not isinstance(raw_val, str): | |
| raw_val = str(raw_val) | |
| # Check max length for strings | |
| if "max_length" in param and len(raw_val) > param["max_length"]: | |
| raise ParameterError( | |
| f"Value for {param_name} is too long", | |
| param_info=f"Length: {len(raw_val)} characters", | |
| suggestion=f"Maximum allowed length is {param['max_length']} characters." | |
| ) | |
| # Check if value is in allowed options | |
| if "options" in param and raw_val not in param["options"]: | |
| raise ParameterError( | |
| f"Invalid option for {param_name}", | |
| param_info=f"Received: {raw_val}", | |
| suggestion=f"Allowed options are: {', '.join(param['options'])}" | |
| ) | |
| validated_params[param_name] = raw_val | |
| elif param["type"] == "select": | |
| if "options" in param and raw_val not in param["options"]: | |
| raise ParameterError( | |
| f"Invalid selection for {param_name}", | |
| param_info=f"Received: {raw_val}", | |
| suggestion=f"Please select one of: {', '.join(param['options'])}" | |
| ) | |
| validated_params[param_name] = raw_val | |
| else: | |
| # For unrecognized types, just pass through the value | |
| validated_params[param_name] = raw_val | |
| return validated_params | |
| # --- Plugin Registry --- | |
| # Define all available quantum simulation plugins | |
| PLUGINS = { | |
| 'auth': { | |
| 'name': 'Post-Quantum Authentication', | |
| 'description': 'Simulate a lattice-based authentication system that remains secure against quantum computer attacks, based on the Ring-LWE problem.', | |
| 'icon': 'fa-lock', | |
| 'category': 'security', | |
| 'parameters': [ | |
| {'name': 'username', 'type': 'str', 'default': 'Bob', 'description': 'Username for authentication'}, | |
| {'name': 'noise', 'type': 'float', 'default': 0.0, 'description': 'Noise level (0.0 - 0.2)',"min": 0, "max": 0.2}, | |
| {'name': 'dimension', 'type': 'int', 'default': 4, 'description': 'Lattice dimension parameter',"min": 1, "max": 32} | |
| ], | |
| 'function': generate_quantum_fingerprint_cirq, | |
| # Standardized runner to match other plugins and Socket.IO flow | |
| 'run': lambda p: run_plugin( | |
| generate_quantum_fingerprint_cirq, | |
| _plugin_key="auth", | |
| data=p.get("username", "Bob"), | |
| num_qubits=p.get("dimension", 4) | |
| ) | |
| }, | |
| "bb84": { | |
| "name": "BB84 Protocol Simulation", | |
| "description": "Simulate the BB84 quantum key distribution protocol with realistic physical effects.", | |
| "icon": "fa-key", | |
| "category": "cryptography", | |
| "parameters": [ | |
| {"name": "num_bits", "type": "int", "default": 10, "description": "Number of bits to simulate", | |
| "min": 1, "max": 16}, | |
| {"name": "distance_km", "type": "float", "default": 0.0, "description": "Distance between Alice and Bob (km)", | |
| "min": 0.0, "max": 1000.0}, | |
| {"name": "hardware_type", "type": "select", "default": "fiber", | |
| "description": "Hardware type (fiber, satellite, trapped_ion)", | |
| "options": ["fiber", "satellite", "trapped_ion"]}, | |
| {"name": "noise", "type": "float", "default": 0.0, "description": "Additional noise probability", | |
| "min": 0.0, "max": 0.3}, | |
| {"name": "eve_present", "type": "bool", "default": False, | |
| "description": "Eavesdropper present"}, | |
| {"name": "eve_strategy", "type": "select", "default": "intercept_resend", | |
| "description": "Eavesdropper strategy (intercept_resend, beam_splitting, trojan)", | |
| "options": ["intercept_resend", "beam_splitting", "trojan_horse"]}, | |
| {"name": "detailed_simulation", "type": "bool", "default": True, | |
| "description": "Run detailed quantum simulation"} | |
| ], | |
| "run": lambda p: run_plugin(bb84_protocol_cirq, | |
| _plugin_key="bb84", | |
| num_bits=p["num_bits"], | |
| distance_km=p["distance_km"], | |
| hardware_type=p["hardware_type"], | |
| eve_present=p["eve_present"] if isinstance(p["eve_present"], bool) else p["eve_present"].lower() == "true", | |
| eve_strategy=p["eve_strategy"], | |
| detailed_simulation=p["detailed_simulation"] if isinstance(p["detailed_simulation"], bool) else p["detailed_simulation"].lower() == "true", | |
| noise_prob=p["noise"]) | |
| }, | |
| "shor": { | |
| "name": "Shor's Code Simulation", | |
| "description": "Simulate quantum error correction using Shor's code.", | |
| "icon": "fa-shield-alt", | |
| "category": "error-correction", | |
| "parameters": [ | |
| {"name": "noise", "type": "float", "default": 0.01, "description": "Noise probability", | |
| "min": 0.0, "max": 0.3} | |
| ], | |
| "run": lambda p: run_plugin(run_shor_code, _plugin_key="shor", noise_prob=p["noise"]) | |
| }, | |
| "grover": { | |
| "name": "Grover's Algorithm Simulation", | |
| "description": "Simulate Grover's search algorithm.", | |
| "icon": "fa-search", | |
| "category": "algorithms", | |
| "parameters": [ | |
| {"name": "n", "type": "int", "default": 3, "description": "Number of qubits", | |
| "min": 1, "max": 8}, | |
| {"name": "target_state", "type": "str", "default": "101", "description": "Target state (binary)", | |
| "max_length": 8}, | |
| {"name": "noise", "type": "float", "default": 0.0, "description": "Noise probability", | |
| "min": 0.0, "max": 0.3} | |
| ], | |
| "run": lambda p: run_plugin(run_grover, _plugin_key="grover", n=p["n"], target_state=p["target_state"], noise_prob=p["noise"]) | |
| }, | |
| "handshake": { | |
| "name": "Quantum Handshake Simulation", | |
| "description": "Simulate a quantum handshake using entangled Bell pairs.", | |
| "icon": "fa-handshake", | |
| "category": "protocols", | |
| "parameters": [ | |
| {"name": "noise", "type": "float", "default": 0.0, "description": "Noise probability", | |
| "min": 0.0, "max": 0.3} | |
| ], | |
| "run": lambda p: run_plugin(handshake_cirq, _plugin_key="handshake", noise_prob=p["noise"]) | |
| }, | |
| "network": { | |
| "name": "Entanglement Swapping Simulation", | |
| "description": "Simulate a quantum network using entanglement swapping.", | |
| "icon": "fa-network-wired", | |
| "category": "protocols", | |
| "parameters": [ | |
| {"name": "noise", "type": "float", "default": 0.0, "description": "Noise probability", | |
| "min": 0.0, "max": 0.3} | |
| ], | |
| "run": lambda p: run_plugin(entanglement_swapping_cirq, _plugin_key="network", noise_prob=p["noise"]) | |
| }, | |
| "qrng": { | |
| "name": "Enhanced Quantum Random Number Generator", | |
| "description": "Generate truly random numbers using multiple quantum sources with advanced statistical analysis.", | |
| "icon": "fa-dice", | |
| "category": "utilities", | |
| "parameters": [ | |
| {"name": "num_bits", "type": "int", "default": 8, "description": "Number of quantum bits to generate", | |
| "min": 1, "max": 32}, | |
| {"name": "source_type", "type": "select", "default": "superposition", | |
| "description": "Quantum randomness source", | |
| "options": ["superposition", "vacuum_fluctuation", "entanglement"]}, | |
| {"name": "noise_level", "type": "float", "default": 0.0, "description": "Hardware noise level", | |
| "min": 0.0, "max": 0.3}, | |
| {"name": "enable_post_processing", "type": "bool", "default": False, | |
| "description": "Apply bias correction"}, | |
| {"name": "hardware_simulation", "type": "bool", "default": False, | |
| "description": "Add timing delays"} | |
| ], | |
| "run": lambda p: run_plugin(generate_random_number_cirq, | |
| _plugin_key="qrng", | |
| num_bits=p["num_bits"], | |
| source_type=p["source_type"], | |
| noise_level=p["noise_level"], | |
| enable_post_processing=p["enable_post_processing"] if isinstance(p["enable_post_processing"], bool) else p["enable_post_processing"].lower() == "true", | |
| hardware_simulation=p["hardware_simulation"] if isinstance(p["hardware_simulation"], bool) else p["hardware_simulation"].lower() == "true") | |
| }, | |
| "teleport": { | |
| "name": "Quantum Teleportation Simulation", | |
| "description": "Simulate quantum teleportation protocol.", | |
| "icon": "fa-atom", | |
| "category": "protocols", | |
| "parameters": [ | |
| {"name": "noise", "type": "float", "default": 0.0, "description": "Noise probability", | |
| "min": 0.0, "max": 0.3} | |
| ], | |
| "run": lambda p: run_plugin(teleportation_circuit, _plugin_key="teleport", noise_prob=p["noise"]) | |
| }, | |
| "vqe": { | |
| "name": "Variational Quantum Eigensolver (VQE)", | |
| "description": "Simulate VQE to find the ground state energy of a hydrogen molecule with quantum chemical accuracy.", | |
| "icon": "fa-wave-square", | |
| "category": "algorithms", | |
| "parameters": [ | |
| {"name": "num_qubits", "type": "int", "default": 2, "description": "Number of qubits", | |
| "min": 2, "max": 4}, | |
| {"name": "noise_prob", "type": "float", "default": 0.01, "description": "Noise probability", | |
| "min": 0.0, "max": 0.3}, | |
| {"name": "max_iter", "type": "int", "default": 3, "description": "Maximum iterations", | |
| "min": 1, "max": 5}, | |
| {"name": "bond_distance", "type": "float", "default": 0.7414, "description": "H-H bond distance (Å)", | |
| "min": 0.0, "max": 2.5} | |
| ], | |
| "run": lambda p: run_plugin(run_vqe, | |
| _plugin_key="vqe", | |
| num_qubits=p["num_qubits"], | |
| noise_prob=p["noise_prob"], | |
| max_iter=p["max_iter"], | |
| bond_distance=p["bond_distance"]) | |
| }, | |
| "quantum_decryption_grover": { | |
| "name": "Quantum Decryption via Grover Key Search", | |
| "description": "Use Grover's algorithm to search for a secret key.", | |
| "icon": "fa-unlock", | |
| "category": "cryptography", | |
| "parameters": [ | |
| {"name": "key", "type": "int", "default": 5, "description": "Secret key (integer)", | |
| "min": 0, "max": 255}, | |
| {"name": "num_bits", "type": "int", "default": 4, "description": "Number of bits (search space)", | |
| "min": 1, "max": 8}, | |
| {"name": "noise", "type": "float", "default": 0.0, "description": "Noise probability", | |
| "min": 0.0, "max": 0.3} | |
| ], | |
| "run": lambda p: run_plugin(grover_key_search, _plugin_key="quantum_decryption_grover", key=p["key"], num_bits=p["num_bits"], noise_prob=p["noise"]) | |
| }, | |
| "quantum_decryption_shor": { | |
| "name": "Quantum Decryption via Shor Factorization", | |
| "description": "Simulate quantum decryption using Shor's code simulation.", | |
| "icon": "fa-key", | |
| "category": "cryptography", | |
| "parameters": [ | |
| {"name": "N", "type": "int", "default": 15, "description": "Composite number", | |
| "min": 4, "max": 100} | |
| ], | |
| "run": lambda p: run_plugin(shor_factorization, _plugin_key="quantum_decryption_shor", N=p["N"]) | |
| }, | |
| "deutsch_jozsa": { | |
| "name": "Deutsch-Jozsa Algorithm", | |
| "description": "Determine if a function is constant or balanced with a single quantum query.", | |
| "icon": "fa-balance-scale", | |
| "category": "algorithms", | |
| "parameters": [ | |
| {"name": "n_qubits", "type": "int", "default": 3, "description": "Number of input qubits", | |
| "min": 1, "max": 8}, | |
| {"name": "oracle_type", "type": "str", "default": "random", "description": "Oracle type: constant_0, constant_1, balanced, or random", | |
| "options": ["constant_0", "constant_1", "balanced", "random"], "max_length": 10}, | |
| {"name": "noise", "type": "float", "default": 0.0, "description": "Noise probability", | |
| "min": 0.0, "max": 0.3} | |
| ], | |
| "run": lambda p: run_plugin(deutsch_jozsa_cirq, _plugin_key="deutsch_jozsa", n_qubits=p["n_qubits"], oracle_type=p["oracle_type"], noise_prob=p["noise"]) | |
| }, | |
| "qft": { | |
| "name": "Quantum Fourier Transform", | |
| "description": "Implement the quantum analogue of the discrete Fourier transform.", | |
| "icon": "fa-wave-square", | |
| "category": "algorithms", | |
| "parameters": [ | |
| {"name": "n_qubits", "type": "int", "default": 3, "description": "Number of qubits", | |
| "min": 1, "max": 8}, | |
| {"name": "input_state", "type": "str", "default": "010", "description": "Input state (binary)", | |
| "max_length": 8}, | |
| {"name": "include_inverse", "type": "str", "default": "False", "description": "Include inverse QFT", | |
| "options": ["True", "False"], "max_length": 5}, | |
| {"name": "noise", "type": "float", "default": 0.0, "description": "Noise probability", | |
| "min": 0.0, "max": 0.3} | |
| ], | |
| "run": lambda p: run_plugin(run_qft, _plugin_key="qft", n_qubits=p["n_qubits"], input_state=p["input_state"], | |
| include_inverse=p["include_inverse"].lower() == "true", noise_prob=p["noise"]) | |
| }, | |
| "phase_estimation": { | |
| "name": "Quantum Phase Estimation", | |
| "description": "Estimate eigenvalues of unitary operators with applications in quantum computing.", | |
| "icon": "fa-ruler-combined", | |
| "category": "algorithms", | |
| "parameters": [ | |
| {"name": "precision_bits", "type": "int", "default": 3, "description": "Number of bits of precision", | |
| "min": 1, "max": 6}, | |
| {"name": "target_phase", "type": "float", "default": 0.125, "description": "Target phase to estimate (0-1)", | |
| "min": 0.0, "max": 1.0}, | |
| {"name": "noise", "type": "float", "default": 0.0, "description": "Noise probability", | |
| "min": 0.0, "max": 0.3} | |
| ], | |
| "run": lambda p: run_plugin(run_phase_estimation, _plugin_key="phase_estimation", precision_bits=p["precision_bits"], | |
| target_phase=p["target_phase"], noise_prob=p["noise"]) | |
| }, | |
| "qaoa": { | |
| "name": "Quantum Approximate Optimization Algorithm", | |
| "description": "Solve combinatorial optimization problems like MaxCut using a hybrid quantum-classical approach.", | |
| "icon": "fa-project-diagram", | |
| "category": "optimization", | |
| "parameters": [ | |
| {"name": "n_nodes", "type": "int", "default": 4, "description": "Number of nodes in the graph", | |
| "min": 2, "max": 8}, | |
| {"name": "edge_probability", "type": "float", "default": 0.5, "description": "Probability of edge creation", | |
| "min": 0.1, "max": 1.0}, | |
| {"name": "p_layers", "type": "int", "default": 1, "description": "Number of QAOA layers", | |
| "min": 1, "max": 3}, | |
| {"name": "noise", "type": "float", "default": 0.0, "description": "Noise probability", | |
| "min": 0.0, "max": 0.3}, | |
| {"name": "num_samples", "type": "int", "default": 100, "description": "Number of samples", | |
| "min": 10, "max": 500} | |
| ], | |
| "run": lambda p: run_plugin(run_qaoa, _plugin_key="qaoa", n_nodes=p["n_nodes"], edge_probability=p["edge_probability"], | |
| p_layers=p["p_layers"], noise_prob=p["noise"], num_samples=p["num_samples"]) | |
| } | |
| } | |
| def get_template_path(plugin_key): | |
| """ | |
| Returns the appropriate template file path for the plugin's educational content. | |
| This function only returns the path, it doesn't extract content. | |
| """ | |
| templates = { | |
| 'bb84': 'educational/bb84.html', | |
| 'teleport': 'educational/teleport.html', | |
| 'grover': 'educational/grover.html', | |
| 'handshake': 'educational/handshake.html', | |
| 'auth': 'educational/auth.html', | |
| 'network': 'educational/network.html', | |
| 'qrng': 'educational/qrng.html', | |
| 'shor': 'educational/shor.html', | |
| 'vqe': 'educational/vqe.html', | |
| 'quantum_decryption_grover': 'educational/grover.html', | |
| 'quantum_decryption_shor': 'educational/shor.html', | |
| 'deutsch_jozsa': 'educational/deutsch_jozsa.html', | |
| 'qft': 'educational/qft.html', | |
| 'phase_estimation': 'educational/phase_estimation.html', | |
| 'qaoa': 'educational/qaoa.html', | |
| } | |
| # Return the template path or a default | |
| template_name = templates.get(plugin_key, 'educational/default.html') | |
| return os.path.join('templates', template_name) | |
| def extract_educational_content(template_path): | |
| """ | |
| Extracts the full educational content from a template file. | |
| Looks for content between <!-- EDUCATIONAL-CONTENT BEGIN --> and <!-- EDUCATIONAL-CONTENT END --> | |
| """ | |
| try: | |
| if not os.path.exists(template_path): | |
| logger.warning(f"Template file not found: {template_path}") | |
| return None | |
| with open(template_path, 'r') as f: | |
| content = f.read() | |
| # Use regex to extract content between markers | |
| content_pattern = re.compile(r'<!-- EDUCATIONAL-CONTENT BEGIN -->(.*?)<!-- EDUCATIONAL-CONTENT END -->', | |
| re.DOTALL) | |
| match = content_pattern.search(content) | |
| if match: | |
| return match.group(1).strip() | |
| else: | |
| logger.warning(f"Educational content markers not found in {template_path}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"Error extracting educational content: {e}") | |
| return None | |
| def extract_mini_explanation(template_path): | |
| """ | |
| Extracts mini explanation from a template file. | |
| """ | |
| try: | |
| if not os.path.exists(template_path): | |
| logger.warning(f"Template file not found: {template_path}") | |
| return None | |
| with open(template_path, 'r') as f: | |
| content = f.read() | |
| # Try all possible marker formats | |
| marker_patterns = [ | |
| r'<!-- MINI_EXPLANATION_START -->(.*?)<!-- MINI_EXPLANATION_END -->' | |
| ] | |
| for pattern in marker_patterns: | |
| mini_pattern = re.compile(pattern, re.DOTALL) | |
| match = mini_pattern.search(content) | |
| if match: | |
| logger.info(f"Found mini explanation using pattern: {pattern}") | |
| return match.group(1).strip() | |
| logger.warning(f"No mini explanation markers found in {template_path}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"Error extracting mini explanation: {e}") | |
| return None | |
| def get_mini_explanation(plugin_key): | |
| """ | |
| Gets the mini explanation for a plugin, either from its template file | |
| or returns a default explanation based on the plugin info. | |
| """ | |
| # Get the template path | |
| template_path = get_template_path(plugin_key) | |
| # Try to extract from template file | |
| mini_content = extract_mini_explanation(template_path) | |
| if mini_content: | |
| return mini_content | |
| # If no template or no mini section, generate default mini explanation | |
| plugin = PLUGINS.get(plugin_key, {}) | |
| plugin_name = plugin.get('name', plugin_key.replace('_', ' ').title()) | |
| return f""" | |
| <h5 class="fw-bold">{plugin_name}</h5> | |
| <p>This simulation demonstrates key quantum computing concepts including superposition, entanglement, and measurement.</p> | |
| <div class="alert alert-primary"> | |
| <strong>Key Quantum Concept:</strong> Quantum simulations provide insight into quantum behavior without requiring actual quantum hardware. | |
| </div> | |
| """ | |
| def get_educational_content(plugin_key): | |
| """ | |
| Gets the educational content for a plugin's modal. | |
| """ | |
| template_path = get_template_path(plugin_key) | |
| print(f"Template path: {template_path}") | |
| return extract_educational_content(template_path) | |
| # --- Route handlers --- | |
| def index(): | |
| # Serve React index.html | |
| return send_from_directory(app.static_folder, 'index.html') | |
| def health(): | |
| """Health check endpoint for Hugging Face Spaces""" | |
| return jsonify({"status": "healthy", "timestamp": datetime.now().isoformat()}), 200 | |
| def favicon(): | |
| """Favicon endpoint""" | |
| try: | |
| return send_from_directory(app.static_folder, 'favicon.ico') | |
| except Exception: | |
| return "", 204 | |
| def sitemap(): | |
| """Generate the sitemap.xml file dynamically.""" | |
| # Get base URL from request or use production URL | |
| if app.config.get('ENV') == 'production': | |
| base_url = "https://quantumfieldkit.com" | |
| else: | |
| base_url = request.url_root.rstrip('/') | |
| # Current date for lastmod | |
| import datetime | |
| today = datetime.date.today().isoformat() | |
| # Build the sitemap XML string | |
| xml = ['<?xml version="1.0" encoding="UTF-8"?>', | |
| '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">', | |
| ' <url>', | |
| f' <loc>{base_url}/</loc>', | |
| f' <lastmod>{today}</lastmod>', | |
| ' <changefreq>weekly</changefreq>', | |
| ' <priority>1.0</priority>', | |
| ' </url>'] | |
| # Add glossary page | |
| xml.append(' <url>') | |
| xml.append(f' <loc>{base_url}/glossary</loc>') | |
| xml.append(f' <lastmod>{today}</lastmod>') | |
| xml.append(' <changefreq>monthly</changefreq>') | |
| xml.append(' <priority>0.8</priority>') | |
| xml.append(' </url>') | |
| # Add all plugin pages with categorization | |
| for plugin_key, plugin in PLUGINS.items(): | |
| # Group plugins by category for better SEO | |
| category = plugin.get("category", "other") | |
| xml.append(' <url>') | |
| xml.append(f' <loc>{base_url}/plugin/{plugin_key}</loc>') | |
| xml.append(f' <lastmod>{today}</lastmod>') | |
| xml.append(' <changefreq>monthly</changefreq>') | |
| xml.append(' <priority>0.8</priority>') | |
| xml.append(' </url>') | |
| # Add category pages if you implement them | |
| categories = set(plugin.get("category", "other") for plugin in PLUGINS.values()) | |
| for category in categories: | |
| xml.append(' <url>') | |
| xml.append(f' <loc>{base_url}/category/{category}</loc>') | |
| xml.append(f' <lastmod>{today}</lastmod>') | |
| xml.append(' <changefreq>monthly</changefreq>') | |
| xml.append(' <priority>0.7</priority>') | |
| xml.append(' </url>') | |
| xml.append('</urlset>') | |
| return app.response_class( | |
| response='\n'.join(xml), | |
| status=200, | |
| mimetype='application/xml' | |
| ) | |
| def html_sitemap(): | |
| # Serve SPA index for HTML sitemap requests | |
| return send_from_directory(app.static_folder, 'index.html') | |
| def robots(): | |
| """Serve robots.txt dynamically.""" | |
| if app.config.get('ENV') == 'production': | |
| base_url = "https://quantumfieldkit.com" | |
| else: | |
| base_url = request.url_root.rstrip('/') | |
| robots_txt = f"""User-agent: * | |
| Allow: / | |
| Sitemap: {base_url}/sitemap.xml | |
| """ | |
| return app.response_class( | |
| response=robots_txt, | |
| status=200, | |
| mimetype='text/plain' | |
| ) | |
| def category_view(category): | |
| return send_from_directory(app.static_folder, 'index.html') | |
| def glossary(): | |
| return send_from_directory(app.static_folder, 'index.html') | |
| def add_cache_headers(response): | |
| """Add cache headers to responses to improve performance.""" | |
| import datetime | |
| # Don't cache dynamic content | |
| if request.path.startswith('/static/'): | |
| # Cache static files for 1 week | |
| expiry = datetime.datetime.now() + datetime.timedelta(days=7) | |
| response.headers['Cache-Control'] = 'public, max-age=604800' | |
| response.headers['Expires'] = expiry.strftime("%a, %d %b %Y %H:%M:%S GMT") | |
| else: | |
| # Don't cache dynamic content | |
| response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0' | |
| return response | |
| def plugin_view(plugin_key): | |
| if request.method == 'POST': | |
| # Support legacy AJAX POSTs from static JS (if any) | |
| if plugin_key not in PLUGINS: | |
| return jsonify({"error": "Plugin not found"}), 404 | |
| plugin = PLUGINS[plugin_key] | |
| try: | |
| raw_params = {} | |
| for param in plugin.get('parameters', []): | |
| param_name = param['name'] | |
| if param_name in request.form: | |
| raw_params[param_name] = request.form.get(param_name) | |
| params = validate_parameters(plugin, raw_params) | |
| if 'run' not in plugin: | |
| return jsonify({"error": "Plugin runner not defined"}), 500 | |
| result = plugin['run'](params) | |
| return jsonify(result) | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 400 | |
| # GET falls back to React | |
| return send_from_directory(app.static_folder, 'index.html') | |
| def api_plugins(): | |
| """Return a list of available plugins.""" | |
| categories = {} | |
| for key, plugin in PLUGINS.items(): | |
| category = plugin.get("category", "other") | |
| if category not in categories: | |
| categories[category] = [] | |
| # Only include serializable data | |
| serializable_plugin = { | |
| "key": key, | |
| "name": plugin.get("name"), | |
| "description": plugin.get("description"), | |
| "icon": plugin.get("icon"), | |
| "category": plugin.get("category"), | |
| "parameters": plugin.get("parameters", []) | |
| } | |
| categories[category].append(serializable_plugin) | |
| return jsonify(categories) | |
| def api_plugin(plugin_key): | |
| """Return details for a specific plugin.""" | |
| if plugin_key not in PLUGINS: | |
| return jsonify({"error": "Plugin not found"}), 404 | |
| plugin = PLUGINS[plugin_key] | |
| # Only include serializable data | |
| serializable_plugin = { | |
| "key": plugin_key, | |
| "name": plugin.get("name"), | |
| "description": plugin.get("description"), | |
| "icon": plugin.get("icon"), | |
| "category": plugin.get("category"), | |
| "parameters": plugin.get("parameters", []) | |
| } | |
| return jsonify(serializable_plugin) | |
| def api_run_plugin(plugin_key): | |
| """Run a plugin simulation.""" | |
| if plugin_key not in PLUGINS: | |
| return jsonify({"error": "Plugin not found"}), 404 | |
| try: | |
| params = request.get_json() | |
| plugin = PLUGINS[plugin_key] | |
| if 'run' in plugin and callable(plugin['run']): | |
| # Use standardized runner which already wraps results | |
| result = plugin['run'](params or {}) | |
| elif 'function' in plugin and callable(plugin['function']): | |
| # Fallback for legacy plugins | |
| result = run_plugin(plugin['function'], _plugin_key=plugin_key, **(params or {})) | |
| else: | |
| return jsonify({"error": "Plugin is misconfigured"}), 500 | |
| return jsonify(result) | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 400 | |
| def api_glossary(): | |
| """Return glossary terms.""" | |
| # Prefer glossary shipped with the built SPA, then public, then repo static fallback | |
| candidates = [ | |
| os.path.join(os.path.dirname(__file__), 'frontend', 'build', 'data', 'glossary_terms.json'), | |
| os.path.join(os.path.dirname(__file__), 'frontend', 'public', 'data', 'glossary_terms.json'), | |
| os.path.join(os.path.dirname(__file__), 'static', 'data', 'glossary_terms.json'), | |
| ] | |
| terms_file = next((p for p in candidates if os.path.exists(p)), None) | |
| try: | |
| with open(terms_file, 'r') as f: | |
| terms = json.load(f) | |
| return jsonify(terms) | |
| except (FileNotFoundError, json.JSONDecodeError) as e: | |
| app.logger.error(f"Error loading glossary terms: {e}") | |
| return jsonify([{"term": "Qubit", "definition": "The fundamental unit of quantum information."}, | |
| {"term": "Superposition", "definition": "A quantum property allowing particles to exist in multiple states."}]) | |
| def api_category(category): | |
| """Return plugins in a specific category.""" | |
| plugins_in_category = {k: v for k, v in PLUGINS.items() if v.get("category", "other") == category} | |
| if not plugins_in_category: | |
| return jsonify({"error": "Category not found"}), 404 | |
| return jsonify(plugins_in_category) | |
| def api_educational(plugin_key): | |
| """Return educational content for a specific plugin.""" | |
| if plugin_key not in PLUGINS: | |
| return jsonify({"error": "Plugin not found"}), 404 | |
| try: | |
| # Get the template path for the plugin | |
| template_path = get_template_path(plugin_key) | |
| # Extract educational content | |
| content = extract_educational_content(template_path) | |
| if content: | |
| return jsonify({"content": content}) | |
| else: | |
| return jsonify({"error": "Educational content not found"}), 404 | |
| except Exception as e: | |
| logger.error(f"Error loading educational content: {e}") | |
| return jsonify({"error": "Failed to load educational content"}), 500 | |
| def api_validate(plugin_key): | |
| """Validate parameters for a specific plugin.""" | |
| if request.method == "OPTIONS": | |
| return jsonify({"status": "ok"}), 200 | |
| if plugin_key not in PLUGINS: | |
| return jsonify({"error": "Plugin not found"}), 404 | |
| try: | |
| params = request.get_json() | |
| plugin = PLUGINS[plugin_key] | |
| validated_params = validate_parameters(plugin, params) | |
| return jsonify({"status": "valid", "params": validated_params}) | |
| except ParameterError as e: | |
| return jsonify({ | |
| "status": "invalid", | |
| "error": e.message, | |
| "param_info": e.param_info, | |
| "suggestion": e.suggestion | |
| }), 400 | |
| except Exception as e: | |
| return jsonify({"status": "error", "error": str(e)}), 500 | |
| def circuit_designer(): | |
| # SPA route | |
| return send_from_directory(app.static_folder, 'index.html') | |
| # API routes for Circuit Designer | |
| def api_circuit_gates(): | |
| """Get available quantum gates for the circuit designer""" | |
| try: | |
| # Basic set of gates organized by categories | |
| gates = { | |
| "single_qubit": [ | |
| {"id": "x", "name": "X", "description": "Pauli-X gate (NOT gate)", "symbol": "X"}, | |
| {"id": "y", "name": "Y", "description": "Pauli-Y gate", "symbol": "Y"}, | |
| {"id": "z", "name": "Z", "description": "Pauli-Z gate", "symbol": "Z"}, | |
| {"id": "h", "name": "H", "description": "Hadamard gate", "symbol": "H"}, | |
| {"id": "s", "name": "S", "description": "Phase gate (S)", "symbol": "S"}, | |
| {"id": "t", "name": "T", "description": "π/8 gate (T)", "symbol": "T"}, | |
| {"id": "rx", "name": "RX", "description": "Rotation around X-axis", "symbol": "RX", "params": [{"name": "theta", "default": "π/2"}]}, | |
| {"id": "ry", "name": "RY", "description": "Rotation around Y-axis", "symbol": "RY", "params": [{"name": "theta", "default": "π/2"}]}, | |
| {"id": "rz", "name": "RZ", "description": "Rotation around Z-axis", "symbol": "RZ", "params": [{"name": "theta", "default": "π/2"}]} | |
| ], | |
| "multi_qubit": [ | |
| {"id": "cnot", "name": "CNOT", "description": "Controlled-NOT gate", "symbol": "CNOT", "qubits": 2}, | |
| {"id": "cz", "name": "CZ", "description": "Controlled-Z gate", "symbol": "CZ", "qubits": 2}, | |
| {"id": "swap", "name": "SWAP", "description": "SWAP gate", "symbol": "SWAP", "qubits": 2}, | |
| {"id": "ccx", "name": "Toffoli", "description": "Toffoli gate (CCX)", "symbol": "CCX", "qubits": 3}, | |
| {"id": "cswap", "name": "Fredkin", "description": "Fredkin gate (CSWAP)", "symbol": "CSWAP", "qubits": 3} | |
| ], | |
| "special": [ | |
| {"id": "measure", "name": "Measure", "description": "Measurement operation", "symbol": "M"}, | |
| {"id": "reset", "name": "Reset", "description": "Reset qubit to |0⟩", "symbol": "R"} | |
| ] | |
| } | |
| return jsonify(gates) | |
| except Exception as e: | |
| logger.error(f"Error getting available gates: {str(e)}") | |
| return jsonify({"error": "Failed to get available gates"}), 500 | |
| def api_circuit_simulate(): | |
| """Simulate a quantum circuit""" | |
| try: | |
| data = request.get_json() | |
| if not data: | |
| return jsonify({"error": "No circuit data provided"}), 400 | |
| circuit = data.get("circuit") | |
| shots = data.get("shots", 1024) | |
| if not circuit: | |
| return jsonify({"error": "No circuit definition provided"}), 400 | |
| # Ensure shots is an integer and within reasonable limits | |
| try: | |
| shots = int(shots) | |
| if shots < 1 or shots > 10000: | |
| return jsonify({"error": "Shots must be between 1 and 10000"}), 400 | |
| except ValueError: | |
| return jsonify({"error": "Shots must be an integer"}), 400 | |
| # This is where we'd build and simulate the circuit using Cirq | |
| # For now, we'll use mock data | |
| import numpy as np | |
| import cirq | |
| # Convert JSON circuit representation to a Cirq circuit | |
| cirq_circuit = cirq.Circuit() | |
| qubits = [cirq.LineQubit(i) for i in range(circuit.get("num_qubits", 3))] | |
| # Add gates to circuit based on the circuit data | |
| # This would need to be implemented based on your circuit JSON schema | |
| # For now, return mock simulation results | |
| result = { | |
| "state_vector": { | |
| "amplitudes": [ | |
| {"state": "000", "amplitude": 1.0, "probability": 1.0} | |
| # Additional states would be added here in a real implementation | |
| ], | |
| "visualization_data": {} # Visualization data would go here | |
| }, | |
| "measurements": { | |
| "counts": {"000": shots}, | |
| "probabilities": {"000": 1.0} | |
| }, | |
| "circuit_representation": { | |
| "qubits": circuit.get("num_qubits", 3), | |
| "depth": len(circuit.get("gates", [])), | |
| "gates": circuit.get("gates", []) | |
| } | |
| } | |
| return jsonify(result) | |
| except Exception as e: | |
| logger.error(f"Error simulating circuit: {str(e)}\n{traceback.format_exc()}") | |
| return jsonify({"error": f"Failed to simulate circuit: {str(e)}"}), 500 | |
| def api_circuit_save(): | |
| """Save a quantum circuit""" | |
| try: | |
| data = request.get_json() | |
| if not data: | |
| return jsonify({"error": "No circuit data provided"}), 400 | |
| # Here you would save the circuit to a database or file | |
| # For this example, we'll pretend to save it and return a mock ID | |
| circuit_id = datetime.now().strftime("%Y%m%d%H%M%S") | |
| return jsonify({ | |
| "id": circuit_id, | |
| "name": data.get("name", "Unnamed Circuit"), | |
| "saved": True, | |
| "timestamp": datetime.now().isoformat() | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error saving circuit: {str(e)}") | |
| return jsonify({"error": f"Failed to save circuit: {str(e)}"}), 500 | |
| def api_circuit_load(circuit_id): | |
| """Load a saved quantum circuit""" | |
| try: | |
| # Here you would load the circuit from a database or file | |
| # For this example, we'll return a mock circuit | |
| mock_circuit = { | |
| "id": circuit_id, | |
| "name": f"Circuit {circuit_id}", | |
| "num_qubits": 3, | |
| "gates": [ | |
| {"type": "h", "targets": [0]}, | |
| {"type": "cnot", "controls": [0], "targets": [1]}, | |
| {"type": "measure", "targets": [0, 1]} | |
| ], | |
| "created_at": datetime.now().isoformat() | |
| } | |
| return jsonify(mock_circuit) | |
| except Exception as e: | |
| logger.error(f"Error loading circuit {circuit_id}: {str(e)}") | |
| return jsonify({"error": f"Failed to load circuit: {str(e)}"}), 500 | |
| def api_circuit_saved(): | |
| """Get list of saved circuits""" | |
| try: | |
| # Here you would query a database for saved circuits | |
| # For this example, we'll return mock data | |
| mock_saved_circuits = [ | |
| {"id": "20230512120000", "name": "Bell State", "num_qubits": 2, "created_at": "2023-05-12T12:00:00"}, | |
| {"id": "20230513130000", "name": "GHZ State", "num_qubits": 3, "created_at": "2023-05-13T13:00:00"}, | |
| {"id": "20230514140000", "name": "Quantum Teleportation", "num_qubits": 3, "created_at": "2023-05-14T14:00:00"} | |
| ] | |
| return jsonify(mock_saved_circuits) | |
| except Exception as e: | |
| logger.error(f"Error getting saved circuits: {str(e)}") | |
| return jsonify({"error": f"Failed to get saved circuits: {str(e)}"}), 500 | |
| def api_circuit_export(): | |
| """Export circuit to code (Cirq, Qiskit, etc.)""" | |
| try: | |
| data = request.get_json() | |
| if not data: | |
| return jsonify({"error": "No circuit data provided"}), 400 | |
| circuit = data.get("circuit") | |
| format = data.get("format", "cirq").lower() | |
| if not circuit: | |
| return jsonify({"error": "No circuit definition provided"}), 400 | |
| if format not in ["cirq", "qiskit"]: | |
| return jsonify({"error": f"Unsupported export format: {format}"}), 400 | |
| # Generate code based on the circuit definition | |
| num_qubits = circuit.get("num_qubits", 3) | |
| if format == "cirq": | |
| code = f""" | |
| import cirq | |
| import numpy as np | |
| # Create a circuit with {num_qubits} qubits | |
| circuit = cirq.Circuit() | |
| # Define qubits | |
| qubits = [cirq.LineQubit(i) for i in range({num_qubits})] | |
| # Add gates | |
| """ | |
| # Here you would iterate through the gates and add them to the code | |
| code += """ | |
| # Simulate | |
| simulator = cirq.Simulator() | |
| result = simulator.simulate(circuit) | |
| print("Final state vector:") | |
| print(result.final_state_vector) | |
| """ | |
| elif format == "qiskit": | |
| code = f""" | |
| from qiskit import QuantumCircuit, Aer, execute | |
| import numpy as np | |
| # Create a circuit with {num_qubits} qubits and {num_qubits} classical bits | |
| qc = QuantumCircuit({num_qubits}, {num_qubits}) | |
| # Add gates | |
| """ | |
| # Here you would iterate through the gates and add them to the code | |
| code += """ | |
| # Simulate | |
| simulator = Aer.get_backend('statevector_simulator') | |
| result = execute(qc, simulator).result() | |
| statevector = result.get_statevector() | |
| print("Final state vector:") | |
| print(statevector) | |
| """ | |
| return jsonify({ | |
| "code": code, | |
| "format": format | |
| }) | |
| except Exception as e: | |
| logger.error(f"Error exporting circuit: {str(e)}") | |
| return jsonify({"error": f"Failed to export circuit: {str(e)}"}), 500 | |
| # --- Socket.IO event handlers for real-time updates --- | |
| def handle_connect(): | |
| """Handle client connection.""" | |
| logger.info(f"Client connected: {request.sid}") | |
| def handle_disconnect(): | |
| """Handle client disconnection.""" | |
| logger.info(f"Client disconnected: {request.sid}") | |
| def handle_run_plugin(data): | |
| """Run a plugin and emit progress updates.""" | |
| plugin_key = data.get('plugin_key') | |
| raw_params = data.get('params', {}) | |
| if plugin_key not in PLUGINS: | |
| emit('plugin_error', {'error': f"Plugin '{plugin_key}' not found"}) | |
| return | |
| plugin = PLUGINS[plugin_key] | |
| try: | |
| # Validate parameters | |
| params = validate_parameters(plugin, raw_params) | |
| # Add plugin key for better error reporting | |
| params['_plugin_key'] = plugin_key | |
| # Set up progress tracking | |
| session_id = request.sid | |
| def progress_callback(step, total, message): | |
| progress = int(100 * step / total) if total > 0 else 0 | |
| emit('plugin_progress', { | |
| 'plugin_key': plugin_key, | |
| 'progress': progress, | |
| 'message': message | |
| }) | |
| # Add progress callback to parameters if supported | |
| params['progress_callback'] = progress_callback | |
| # Run the plugin and emit the result | |
| emit('plugin_start', {'plugin_key': plugin_key}) | |
| result = plugin["run"](params) | |
| emit('plugin_result', {'plugin_key': plugin_key, 'result': result}) | |
| except SimulationError as e: | |
| # Handle validation errors | |
| error_msg = str(e) | |
| if hasattr(e, 'suggestion') and e.suggestion: | |
| error_msg += f"\n\nSuggestion: {e.suggestion}" | |
| emit('plugin_error', {'plugin_key': plugin_key, 'error': error_msg}) | |
| except Exception as e: | |
| emit('plugin_error', {'plugin_key': plugin_key, 'error': str(e)}) | |
| # --- Error handling --- | |
| def page_not_found(e): | |
| # SPA fallback | |
| try: | |
| return send_from_directory(app.static_folder, 'index.html') | |
| except Exception: | |
| return jsonify({"error": "Not found"}), 404 | |
| def server_error(e): | |
| return jsonify({"error": "Server error"}), 500 | |
| # --- Main entry point --- | |
| if __name__ == "__main__": | |
| # Check if we're running in a containerized environment | |
| is_containerized = os.environ.get('CONTAINERIZED', 'false').lower() == 'true' | |
| port = int(os.environ.get('PORT', 5000)) | |
| host = os.environ.get('HOST', '127.0.0.1') | |
| if is_containerized or os.environ.get('FLASK_ENV') == 'production': | |
| # In containerized environments, use standard Flask development server | |
| # This is more compatible with platforms like Hugging Face Spaces | |
| from werkzeug.serving import run_simple | |
| print(f"Starting Flask app on {host}:{port} (containerized mode)") | |
| app.run(host=host, port=port, debug=False) | |
| else: | |
| # SSL Configuration for development | |
| if os.environ.get('FLASK_ENV') == 'production' and os.path.exists('cert.pem') and os.path.exists('key.pem'): | |
| socketio.run(app, debug=False, host=host, port=port, | |
| certfile='cert.pem', keyfile='key.pem') | |
| else: | |
| # Development environment with SocketIO | |
| socketio.run(app, debug=True, host=host, port=port) | |