import os import traceback import json import logging from flask import Flask, request, render_template, jsonify, session, abort, send_from_directory from flask_socketio import SocketIO, emit from werkzeug.middleware.proxy_fix import ProxyFix import signal from functools import wraps #import threading #import multiprocessing import concurrent.futures from datetime import datetime import psutil import re from flask_cors import CORS # Import simulation functions from plugins from plugins.authentication.auth import generate_quantum_fingerprint_cirq from plugins.encryption_bb84.bb84 import bb84_protocol_cirq from plugins.error_correction.shor_code import run_shor_code from plugins.grover.grover import run_grover from plugins.handshake.handshake import handshake_cirq from plugins.network.network import entanglement_swapping_cirq from plugins.qrng.qrng import generate_random_number_cirq from plugins.quantum_decryption.quantum_decryption import grover_key_search, shor_factorization from plugins.teleportation.teleport import teleportation_circuit from plugins.variational.vqe import run_vqe from plugins.deutsch_jozsa.deutsch_jozsa import deutsch_jozsa_cirq from plugins.quantum_fourier.qft import run_qft from plugins.phase_estimation.phase_estimation import run_phase_estimation from plugins.optimization.qaoa import run_qaoa # Configure Errors class SimulationError(Exception): """Base class for simulation errors""" def __init__(self, message, param_info=None, suggestion=None): self.message = message self.param_info = param_info self.suggestion = suggestion super().__init__(self.message) class ParameterError(SimulationError): """Error for invalid simulation parameters""" pass class QuantumCircuitError(SimulationError): """Error in quantum circuit construction or execution""" pass class ResourceExceededError(SimulationError): """Error when simulation exceeds available resources""" pass def configure_logging(): # Use environment variable to determine log directory, fallback to tmp directory for containerized environments log_dir = os.environ.get('LOG_DIR', '/tmp' if os.path.exists('/tmp') else '.') # Create logs directory if it doesn't exist and we have permission try: if not os.path.exists(log_dir): os.makedirs(log_dir, exist_ok=True) # Test if we can write to the directory test_file = os.path.join(log_dir, '.test_write_access') with open(test_file, 'w') as f: f.write('test') os.remove(test_file) except Exception as e: # Fallback to current directory if we can't write to the log directory log_dir = '.' try: if not os.path.exists(log_dir): os.makedirs(log_dir, exist_ok=True) except Exception: # If we still can't create directory, use current directory log_dir = '.' # Create log file with date-based naming log_filename = f"{log_dir}/quantum_field_kit_{datetime.now().strftime('%Y-%m-%d')}.log" # Configure root logger logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s - %(message)s", handlers=[ # Console handler for immediate feedback (always works) logging.StreamHandler() ] ) # Get logger instance logger = logging.getLogger('quantum_field_kit') # Try to add file handler, but don't fail if we can't write to file try: file_handler = logging.FileHandler(log_filename) logging.getLogger().addHandler(file_handler) logger.info(f"File logging configured successfully. Log file: {log_filename}") except Exception as e: # In containerized environments, we might not be able to write files # Just continue with console logging only logger.warning(f"File logging not available: {e}. Using console logging only.") # Set specific logger levels logging.getLogger('werkzeug').setLevel(logging.WARNING) logging.getLogger('socketio').setLevel(logging.WARNING) return logger # Create application logger logger = configure_logging() # Configure matplotlib for containerized environments # Set MPLCONFIGDIR to a writable directory to avoid permission errors def setup_matplotlib_config(): """Setup matplotlib configuration directory with proper permissions""" mpl_config_dir = os.environ.get('MPLCONFIGDIR', '/tmp/matplotlib' if os.path.exists('/tmp') else './.matplotlib') try: # Create the directory with exist_ok to avoid errors if it already exists os.makedirs(mpl_config_dir, exist_ok=True) # Test if we can write to the directory test_file = os.path.join(mpl_config_dir, '.test_write_access') with open(test_file, 'w') as f: f.write('test') os.remove(test_file) # Set the environment variable os.environ['MPLCONFIGDIR'] = mpl_config_dir logger.info(f"Matplotlib config directory set to: {mpl_config_dir}") return True except Exception as e: logger.warning(f"Could not configure matplotlib directory {mpl_config_dir}: {e}") # Try fallback directories fallback_dirs = ['./.matplotlib', '.', '/tmp'] for fallback_dir in fallback_dirs: try: os.makedirs(fallback_dir, exist_ok=True) test_file = os.path.join(fallback_dir, '.test_write_access') with open(test_file, 'w') as f: f.write('test') os.remove(test_file) os.environ['MPLCONFIGDIR'] = fallback_dir logger.info(f"Matplotlib config directory set to fallback: {fallback_dir}") return True except Exception as fallback_e: logger.warning(f"Fallback directory {fallback_dir} also failed: {fallback_e}") continue # If all else fails, let matplotlib handle it logger.warning("All attempts to set matplotlib config directory failed. Letting matplotlib use its default.") return False # Setup matplotlib configuration setup_matplotlib_config() # Initialize Flask application (serve React build in production) app = Flask(__name__, static_folder='frontend/build', static_url_path='/') # Configure CORS origins from environment variable cors_origins = os.environ.get('CORS_ORIGINS', 'http://localhost:3000,http://127.0.0.1:3000,http://localhost:5000,http://127.0.0.1:5000,https://quantumfieldkit.com,https://www.quantumfieldkit.com,https://quantumfieldkit.fly.dev' ).split(',') CORS(app, resources={ r"/api/*": { "origins": [origin.strip() for origin in cors_origins], "methods": ["GET", "POST", "OPTIONS"], "allow_headers": ["Content-Type"] } }) app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', os.urandom(24)) app.config['TEMPLATES_AUTO_RELOAD'] = True app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max upload # Add proxy fix for proper IP handling behind reverse proxies app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1) # Initialize Socket.IO for real-time communication # Use environment variable to determine if SocketIO should be enabled socketio_enabled = os.environ.get('SOCKETIO_ENABLED', 'true').lower() == 'true' if socketio_enabled: socketio = SocketIO(app, async_mode='eventlet', cors_allowed_origins="*", logger=True, engineio_logger=True) else: # Create a mock socketio object for environments where SocketIO is not supported class MockSocketIO: def on(self, *args, **kwargs): def decorator(func): return func return decorator def emit(self, *args, **kwargs): pass def run(self, *args, **kwargs): pass socketio = MockSocketIO() def timeout(seconds): """ Decorator that adds a timeout to a function. If the function takes longer than 'seconds' to execute, it will be terminated. Args: seconds: Maximum execution time in seconds Returns: Decorated function with timeout capability """ def decorator(func): @wraps(func) def wrapper(*args, **kwargs): plugin_name = kwargs.get('_plugin_name', 'Unknown plugin') with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(func, *args, **kwargs) try: return future.result(timeout=seconds) except concurrent.futures.TimeoutError: # More detailed timeout message return { "output": None, "log": f"Simulation for {plugin_name} started but could not complete within {seconds} seconds.\n" f"This may be due to complex parameters or high precision settings.", "error": f"Execution timed out after {seconds} seconds. Try reducing complexity of the simulation." } return wrapper return decorator def json_safe(obj): """ Recursively convert non-JSON-serializable objects to strings. Preserve raw SVG (under the key "circuit_svg") so it is not altered. """ try: json.dumps(obj) return obj except (TypeError, OverflowError): if isinstance(obj, dict): new_obj = {} for k, v in obj.items(): new_obj[k] = v if k == "circuit_svg" else json_safe(v) return new_obj elif isinstance(obj, (list, tuple)): return [json_safe(item) for item in obj] elif hasattr(obj, 'tolist'): # For numpy arrays return obj.tolist() else: return str(obj) def check_memory_usage(): """Check if memory usage is within acceptable limits""" try: process = psutil.Process(os.getpid()) memory_usage = process.memory_info().rss / (1024 * 1024) # MB memory_percent = process.memory_percent() if memory_percent > 85: logger.warning(f"High memory usage detected: {memory_usage:.2f} MB ({memory_percent:.1f}%)") return False return True except Exception as e: logger.error(f"Error checking memory usage: {e}") return True # Assume it's safe if we can't check def wrap_result(sim_result): """ Standardize the simulation result into a dictionary with keys: - "output": All keys except "log" - "log": Detailed process log - "error": None if successful, or the error message. """ if isinstance(sim_result, dict): if 'log' in sim_result: output = {k: v for k, v in sim_result.items() if k != 'log'} output = json_safe(output) return {"output": output, "log": sim_result['log'], "error": None} else: return {"output": json_safe(sim_result), "log": "", "error": None} elif isinstance(sim_result, tuple): if len(sim_result) > 1: *outputs, log = sim_result output = outputs[0] if len(outputs) == 1 else outputs output = json_safe(output) return {"output": output, "log": log, "error": None} else: return {"output": json_safe(sim_result[0]), "log": "", "error": None} else: return {"output": json_safe(sim_result), "log": "", "error": None} def run_plugin(sim_func, **params): """ Calls the simulation function with the given parameters. Returns a standardized result dictionary with improved error handling. """ # Check memory before running simulation if not check_memory_usage(): return { "output": None, "log": "Server is currently experiencing high memory usage. Please try again later.", "error": "Insufficient memory available for simulation. Try reducing simulation complexity." } try: # Extract plugin key for error reporting but don't pass it to the simulation function plugin_key = params.pop('_plugin_key', 'unknown_plugin') # Also remove _plugin_name if it exists (backward compatibility) params.pop('_plugin_name', None) logger.info(f"Running {plugin_key} simulation with parameters: {params}") # Apply timeout to simulation function @timeout(15) # Increased timeout for complex simulations def run_with_timeout(): # Don't pass plugin identifiers to the actual simulation function return sim_func(**params) sim_result = run_with_timeout() if isinstance(sim_result, dict) and "error" in sim_result and sim_result["error"] is not None: # This is a timeout error from our decorator return sim_result return wrap_result(sim_result) except ParameterError as e: # Handle parameter errors with helpful suggestions error_msg = f"Parameter error: {e.message}" if e.param_info: error_msg += f"\nParameter: {e.param_info}" if e.suggestion: error_msg += f"\nSuggestion: {e.suggestion}" logger.error(f"Parameter error in {plugin_key}: {error_msg}") return {"output": None, "log": None, "error": error_msg} except QuantumCircuitError as e: # Handle quantum circuit specific errors error_msg = f"Quantum circuit error: {e.message}" if e.suggestion: error_msg += f"\nSuggestion: {e.suggestion}" logger.error(f"Circuit error in {plugin_key}: {error_msg}") return {"output": None, "log": None, "error": error_msg} except Exception as e: # Generic exception handling with improved context error_type = type(e).__name__ error_msg = str(e) stack_trace = traceback.format_exc() # Create a user-friendly error message user_error = f"Error ({error_type}): {error_msg}" # Log the full technical details logger.error(f"Simulation error in {plugin_key}: {error_type} - {error_msg}\n{stack_trace}") return { "output": None, "log": f"Simulation failed. See error tab for details.", "error": user_error } def validate_parameters(plugin, params): """Enhanced parameter validation with detailed error messages""" validated_params = {} for param in plugin["parameters"]: param_name = param["name"] # Check if required parameter is missing if param_name not in params and "default" not in param: raise ParameterError( f"Missing required parameter: {param_name}", param_info=f"{param_name} ({param['type']})", suggestion="Please provide a value for this required parameter." ) # Use default if parameter is missing if param_name not in params and "default" in param: validated_params[param_name] = param["default"] continue # Validate parameter based on type raw_val = params[param_name] if param["type"] == "int": try: val = int(raw_val) # Check min/max bounds if "min" in param and val < param["min"]: raise ParameterError( f"Value for {param_name} is too small", param_info=f"{param_name} = {val}", suggestion=f"Minimum allowed value is {param['min']}." ) if "max" in param and val > param["max"]: raise ParameterError( f"Value for {param_name} is too large", param_info=f"{param_name} = {val}", suggestion=f"Maximum allowed value is {param['max']}." ) validated_params[param_name] = val except ValueError: raise ParameterError( f"Invalid integer value for {param_name}", param_info=f"Received: {raw_val}", suggestion="Please provide a valid integer value." ) elif param["type"] == "float": try: val = float(raw_val) # Check min/max bounds if "min" in param and val < param["min"]: raise ParameterError( f"Value for {param_name} is too small", param_info=f"{param_name} = {val}", suggestion=f"Minimum allowed value is {param['min']}." ) if "max" in param and val > param["max"]: raise ParameterError( f"Value for {param_name} is too large", param_info=f"{param_name} = {val}", suggestion=f"Maximum allowed value is {param['max']}." ) validated_params[param_name] = val except ValueError: raise ParameterError( f"Invalid float value for {param_name}", param_info=f"Received: {raw_val}", suggestion="Please provide a valid decimal number." ) elif param["type"] == "bool": if isinstance(raw_val, bool): validated_params[param_name] = raw_val elif isinstance(raw_val, str): validated_params[param_name] = raw_val.lower() == "true" else: raise ParameterError( f"Invalid boolean value for {param_name}", param_info=f"Received: {raw_val}", suggestion="Please provide 'true' or 'false'." ) elif param["type"] == "str": if not isinstance(raw_val, str): raw_val = str(raw_val) # Check max length for strings if "max_length" in param and len(raw_val) > param["max_length"]: raise ParameterError( f"Value for {param_name} is too long", param_info=f"Length: {len(raw_val)} characters", suggestion=f"Maximum allowed length is {param['max_length']} characters." ) # Check if value is in allowed options if "options" in param and raw_val not in param["options"]: raise ParameterError( f"Invalid option for {param_name}", param_info=f"Received: {raw_val}", suggestion=f"Allowed options are: {', '.join(param['options'])}" ) validated_params[param_name] = raw_val elif param["type"] == "select": if "options" in param and raw_val not in param["options"]: raise ParameterError( f"Invalid selection for {param_name}", param_info=f"Received: {raw_val}", suggestion=f"Please select one of: {', '.join(param['options'])}" ) validated_params[param_name] = raw_val else: # For unrecognized types, just pass through the value validated_params[param_name] = raw_val return validated_params # --- Plugin Registry --- # Define all available quantum simulation plugins PLUGINS = { 'auth': { 'name': 'Post-Quantum Authentication', 'description': 'Simulate a lattice-based authentication system that remains secure against quantum computer attacks, based on the Ring-LWE problem.', 'icon': 'fa-lock', 'category': 'security', 'parameters': [ {'name': 'username', 'type': 'str', 'default': 'Bob', 'description': 'Username for authentication'}, {'name': 'noise', 'type': 'float', 'default': 0.0, 'description': 'Noise level (0.0 - 0.2)',"min": 0, "max": 0.2}, {'name': 'dimension', 'type': 'int', 'default': 4, 'description': 'Lattice dimension parameter',"min": 1, "max": 32} ], 'function': generate_quantum_fingerprint_cirq, # Standardized runner to match other plugins and Socket.IO flow 'run': lambda p: run_plugin( generate_quantum_fingerprint_cirq, _plugin_key="auth", data=p.get("username", "Bob"), num_qubits=p.get("dimension", 4) ) }, "bb84": { "name": "BB84 Protocol Simulation", "description": "Simulate the BB84 quantum key distribution protocol with realistic physical effects.", "icon": "fa-key", "category": "cryptography", "parameters": [ {"name": "num_bits", "type": "int", "default": 10, "description": "Number of bits to simulate", "min": 1, "max": 16}, {"name": "distance_km", "type": "float", "default": 0.0, "description": "Distance between Alice and Bob (km)", "min": 0.0, "max": 1000.0}, {"name": "hardware_type", "type": "select", "default": "fiber", "description": "Hardware type (fiber, satellite, trapped_ion)", "options": ["fiber", "satellite", "trapped_ion"]}, {"name": "noise", "type": "float", "default": 0.0, "description": "Additional noise probability", "min": 0.0, "max": 0.3}, {"name": "eve_present", "type": "bool", "default": False, "description": "Eavesdropper present"}, {"name": "eve_strategy", "type": "select", "default": "intercept_resend", "description": "Eavesdropper strategy (intercept_resend, beam_splitting, trojan)", "options": ["intercept_resend", "beam_splitting", "trojan_horse"]}, {"name": "detailed_simulation", "type": "bool", "default": True, "description": "Run detailed quantum simulation"} ], "run": lambda p: run_plugin(bb84_protocol_cirq, _plugin_key="bb84", num_bits=p["num_bits"], distance_km=p["distance_km"], hardware_type=p["hardware_type"], eve_present=p["eve_present"] if isinstance(p["eve_present"], bool) else p["eve_present"].lower() == "true", eve_strategy=p["eve_strategy"], detailed_simulation=p["detailed_simulation"] if isinstance(p["detailed_simulation"], bool) else p["detailed_simulation"].lower() == "true", noise_prob=p["noise"]) }, "shor": { "name": "Shor's Code Simulation", "description": "Simulate quantum error correction using Shor's code.", "icon": "fa-shield-alt", "category": "error-correction", "parameters": [ {"name": "noise", "type": "float", "default": 0.01, "description": "Noise probability", "min": 0.0, "max": 0.3} ], "run": lambda p: run_plugin(run_shor_code, _plugin_key="shor", noise_prob=p["noise"]) }, "grover": { "name": "Grover's Algorithm Simulation", "description": "Simulate Grover's search algorithm.", "icon": "fa-search", "category": "algorithms", "parameters": [ {"name": "n", "type": "int", "default": 3, "description": "Number of qubits", "min": 1, "max": 8}, {"name": "target_state", "type": "str", "default": "101", "description": "Target state (binary)", "max_length": 8}, {"name": "noise", "type": "float", "default": 0.0, "description": "Noise probability", "min": 0.0, "max": 0.3} ], "run": lambda p: run_plugin(run_grover, _plugin_key="grover", n=p["n"], target_state=p["target_state"], noise_prob=p["noise"]) }, "handshake": { "name": "Quantum Handshake Simulation", "description": "Simulate a quantum handshake using entangled Bell pairs.", "icon": "fa-handshake", "category": "protocols", "parameters": [ {"name": "noise", "type": "float", "default": 0.0, "description": "Noise probability", "min": 0.0, "max": 0.3} ], "run": lambda p: run_plugin(handshake_cirq, _plugin_key="handshake", noise_prob=p["noise"]) }, "network": { "name": "Entanglement Swapping Simulation", "description": "Simulate a quantum network using entanglement swapping.", "icon": "fa-network-wired", "category": "protocols", "parameters": [ {"name": "noise", "type": "float", "default": 0.0, "description": "Noise probability", "min": 0.0, "max": 0.3} ], "run": lambda p: run_plugin(entanglement_swapping_cirq, _plugin_key="network", noise_prob=p["noise"]) }, "qrng": { "name": "Enhanced Quantum Random Number Generator", "description": "Generate truly random numbers using multiple quantum sources with advanced statistical analysis.", "icon": "fa-dice", "category": "utilities", "parameters": [ {"name": "num_bits", "type": "int", "default": 8, "description": "Number of quantum bits to generate", "min": 1, "max": 32}, {"name": "source_type", "type": "select", "default": "superposition", "description": "Quantum randomness source", "options": ["superposition", "vacuum_fluctuation", "entanglement"]}, {"name": "noise_level", "type": "float", "default": 0.0, "description": "Hardware noise level", "min": 0.0, "max": 0.3}, {"name": "enable_post_processing", "type": "bool", "default": False, "description": "Apply bias correction"}, {"name": "hardware_simulation", "type": "bool", "default": False, "description": "Add timing delays"} ], "run": lambda p: run_plugin(generate_random_number_cirq, _plugin_key="qrng", num_bits=p["num_bits"], source_type=p["source_type"], noise_level=p["noise_level"], enable_post_processing=p["enable_post_processing"] if isinstance(p["enable_post_processing"], bool) else p["enable_post_processing"].lower() == "true", hardware_simulation=p["hardware_simulation"] if isinstance(p["hardware_simulation"], bool) else p["hardware_simulation"].lower() == "true") }, "teleport": { "name": "Quantum Teleportation Simulation", "description": "Simulate quantum teleportation protocol.", "icon": "fa-atom", "category": "protocols", "parameters": [ {"name": "noise", "type": "float", "default": 0.0, "description": "Noise probability", "min": 0.0, "max": 0.3} ], "run": lambda p: run_plugin(teleportation_circuit, _plugin_key="teleport", noise_prob=p["noise"]) }, "vqe": { "name": "Variational Quantum Eigensolver (VQE)", "description": "Simulate VQE to find the ground state energy of a hydrogen molecule with quantum chemical accuracy.", "icon": "fa-wave-square", "category": "algorithms", "parameters": [ {"name": "num_qubits", "type": "int", "default": 2, "description": "Number of qubits", "min": 2, "max": 4}, {"name": "noise_prob", "type": "float", "default": 0.01, "description": "Noise probability", "min": 0.0, "max": 0.3}, {"name": "max_iter", "type": "int", "default": 3, "description": "Maximum iterations", "min": 1, "max": 5}, {"name": "bond_distance", "type": "float", "default": 0.7414, "description": "H-H bond distance (Å)", "min": 0.0, "max": 2.5} ], "run": lambda p: run_plugin(run_vqe, _plugin_key="vqe", num_qubits=p["num_qubits"], noise_prob=p["noise_prob"], max_iter=p["max_iter"], bond_distance=p["bond_distance"]) }, "quantum_decryption_grover": { "name": "Quantum Decryption via Grover Key Search", "description": "Use Grover's algorithm to search for a secret key.", "icon": "fa-unlock", "category": "cryptography", "parameters": [ {"name": "key", "type": "int", "default": 5, "description": "Secret key (integer)", "min": 0, "max": 255}, {"name": "num_bits", "type": "int", "default": 4, "description": "Number of bits (search space)", "min": 1, "max": 8}, {"name": "noise", "type": "float", "default": 0.0, "description": "Noise probability", "min": 0.0, "max": 0.3} ], "run": lambda p: run_plugin(grover_key_search, _plugin_key="quantum_decryption_grover", key=p["key"], num_bits=p["num_bits"], noise_prob=p["noise"]) }, "quantum_decryption_shor": { "name": "Quantum Decryption via Shor Factorization", "description": "Simulate quantum decryption using Shor's code simulation.", "icon": "fa-key", "category": "cryptography", "parameters": [ {"name": "N", "type": "int", "default": 15, "description": "Composite number", "min": 4, "max": 100} ], "run": lambda p: run_plugin(shor_factorization, _plugin_key="quantum_decryption_shor", N=p["N"]) }, "deutsch_jozsa": { "name": "Deutsch-Jozsa Algorithm", "description": "Determine if a function is constant or balanced with a single quantum query.", "icon": "fa-balance-scale", "category": "algorithms", "parameters": [ {"name": "n_qubits", "type": "int", "default": 3, "description": "Number of input qubits", "min": 1, "max": 8}, {"name": "oracle_type", "type": "str", "default": "random", "description": "Oracle type: constant_0, constant_1, balanced, or random", "options": ["constant_0", "constant_1", "balanced", "random"], "max_length": 10}, {"name": "noise", "type": "float", "default": 0.0, "description": "Noise probability", "min": 0.0, "max": 0.3} ], "run": lambda p: run_plugin(deutsch_jozsa_cirq, _plugin_key="deutsch_jozsa", n_qubits=p["n_qubits"], oracle_type=p["oracle_type"], noise_prob=p["noise"]) }, "qft": { "name": "Quantum Fourier Transform", "description": "Implement the quantum analogue of the discrete Fourier transform.", "icon": "fa-wave-square", "category": "algorithms", "parameters": [ {"name": "n_qubits", "type": "int", "default": 3, "description": "Number of qubits", "min": 1, "max": 8}, {"name": "input_state", "type": "str", "default": "010", "description": "Input state (binary)", "max_length": 8}, {"name": "include_inverse", "type": "str", "default": "False", "description": "Include inverse QFT", "options": ["True", "False"], "max_length": 5}, {"name": "noise", "type": "float", "default": 0.0, "description": "Noise probability", "min": 0.0, "max": 0.3} ], "run": lambda p: run_plugin(run_qft, _plugin_key="qft", n_qubits=p["n_qubits"], input_state=p["input_state"], include_inverse=p["include_inverse"].lower() == "true", noise_prob=p["noise"]) }, "phase_estimation": { "name": "Quantum Phase Estimation", "description": "Estimate eigenvalues of unitary operators with applications in quantum computing.", "icon": "fa-ruler-combined", "category": "algorithms", "parameters": [ {"name": "precision_bits", "type": "int", "default": 3, "description": "Number of bits of precision", "min": 1, "max": 6}, {"name": "target_phase", "type": "float", "default": 0.125, "description": "Target phase to estimate (0-1)", "min": 0.0, "max": 1.0}, {"name": "noise", "type": "float", "default": 0.0, "description": "Noise probability", "min": 0.0, "max": 0.3} ], "run": lambda p: run_plugin(run_phase_estimation, _plugin_key="phase_estimation", precision_bits=p["precision_bits"], target_phase=p["target_phase"], noise_prob=p["noise"]) }, "qaoa": { "name": "Quantum Approximate Optimization Algorithm", "description": "Solve combinatorial optimization problems like MaxCut using a hybrid quantum-classical approach.", "icon": "fa-project-diagram", "category": "optimization", "parameters": [ {"name": "n_nodes", "type": "int", "default": 4, "description": "Number of nodes in the graph", "min": 2, "max": 8}, {"name": "edge_probability", "type": "float", "default": 0.5, "description": "Probability of edge creation", "min": 0.1, "max": 1.0}, {"name": "p_layers", "type": "int", "default": 1, "description": "Number of QAOA layers", "min": 1, "max": 3}, {"name": "noise", "type": "float", "default": 0.0, "description": "Noise probability", "min": 0.0, "max": 0.3}, {"name": "num_samples", "type": "int", "default": 100, "description": "Number of samples", "min": 10, "max": 500} ], "run": lambda p: run_plugin(run_qaoa, _plugin_key="qaoa", n_nodes=p["n_nodes"], edge_probability=p["edge_probability"], p_layers=p["p_layers"], noise_prob=p["noise"], num_samples=p["num_samples"]) } } def get_template_path(plugin_key): """ Returns the appropriate template file path for the plugin's educational content. This function only returns the path, it doesn't extract content. """ templates = { 'bb84': 'educational/bb84.html', 'teleport': 'educational/teleport.html', 'grover': 'educational/grover.html', 'handshake': 'educational/handshake.html', 'auth': 'educational/auth.html', 'network': 'educational/network.html', 'qrng': 'educational/qrng.html', 'shor': 'educational/shor.html', 'vqe': 'educational/vqe.html', 'quantum_decryption_grover': 'educational/grover.html', 'quantum_decryption_shor': 'educational/shor.html', 'deutsch_jozsa': 'educational/deutsch_jozsa.html', 'qft': 'educational/qft.html', 'phase_estimation': 'educational/phase_estimation.html', 'qaoa': 'educational/qaoa.html', } # Return the template path or a default template_name = templates.get(plugin_key, 'educational/default.html') return os.path.join('templates', template_name) def extract_educational_content(template_path): """ Extracts the full educational content from a template file. Looks for content between and """ try: if not os.path.exists(template_path): logger.warning(f"Template file not found: {template_path}") return None with open(template_path, 'r') as f: content = f.read() # Use regex to extract content between markers content_pattern = re.compile(r'(.*?)', re.DOTALL) match = content_pattern.search(content) if match: return match.group(1).strip() else: logger.warning(f"Educational content markers not found in {template_path}") return None except Exception as e: logger.error(f"Error extracting educational content: {e}") return None def extract_mini_explanation(template_path): """ Extracts mini explanation from a template file. """ try: if not os.path.exists(template_path): logger.warning(f"Template file not found: {template_path}") return None with open(template_path, 'r') as f: content = f.read() # Try all possible marker formats marker_patterns = [ r'(.*?)' ] for pattern in marker_patterns: mini_pattern = re.compile(pattern, re.DOTALL) match = mini_pattern.search(content) if match: logger.info(f"Found mini explanation using pattern: {pattern}") return match.group(1).strip() logger.warning(f"No mini explanation markers found in {template_path}") return None except Exception as e: logger.error(f"Error extracting mini explanation: {e}") return None def get_mini_explanation(plugin_key): """ Gets the mini explanation for a plugin, either from its template file or returns a default explanation based on the plugin info. """ # Get the template path template_path = get_template_path(plugin_key) # Try to extract from template file mini_content = extract_mini_explanation(template_path) if mini_content: return mini_content # If no template or no mini section, generate default mini explanation plugin = PLUGINS.get(plugin_key, {}) plugin_name = plugin.get('name', plugin_key.replace('_', ' ').title()) return f"""
{plugin_name}

This simulation demonstrates key quantum computing concepts including superposition, entanglement, and measurement.

Key Quantum Concept: Quantum simulations provide insight into quantum behavior without requiring actual quantum hardware.
""" def get_educational_content(plugin_key): """ Gets the educational content for a plugin's modal. """ template_path = get_template_path(plugin_key) print(f"Template path: {template_path}") return extract_educational_content(template_path) # --- Route handlers --- @app.route("/") def index(): # Serve React index.html return send_from_directory(app.static_folder, 'index.html') @app.route("/health") def health(): """Health check endpoint for Hugging Face Spaces""" return jsonify({"status": "healthy", "timestamp": datetime.now().isoformat()}), 200 @app.route("/favicon.ico") def favicon(): """Favicon endpoint""" try: return send_from_directory(app.static_folder, 'favicon.ico') except Exception: return "", 204 @app.route('/sitemap.xml') def sitemap(): """Generate the sitemap.xml file dynamically.""" # Get base URL from request or use production URL if app.config.get('ENV') == 'production': base_url = "https://quantumfieldkit.com" else: base_url = request.url_root.rstrip('/') # Current date for lastmod import datetime today = datetime.date.today().isoformat() # Build the sitemap XML string xml = ['', '', ' ', f' {base_url}/', f' {today}', ' weekly', ' 1.0', ' '] # Add glossary page xml.append(' ') xml.append(f' {base_url}/glossary') xml.append(f' {today}') xml.append(' monthly') xml.append(' 0.8') xml.append(' ') # Add all plugin pages with categorization for plugin_key, plugin in PLUGINS.items(): # Group plugins by category for better SEO category = plugin.get("category", "other") xml.append(' ') xml.append(f' {base_url}/plugin/{plugin_key}') xml.append(f' {today}') xml.append(' monthly') xml.append(' 0.8') xml.append(' ') # Add category pages if you implement them categories = set(plugin.get("category", "other") for plugin in PLUGINS.values()) for category in categories: xml.append(' ') xml.append(f' {base_url}/category/{category}') xml.append(f' {today}') xml.append(' monthly') xml.append(' 0.7') xml.append(' ') xml.append('') return app.response_class( response='\n'.join(xml), status=200, mimetype='application/xml' ) @app.route("/sitemap") def html_sitemap(): # Serve SPA index for HTML sitemap requests return send_from_directory(app.static_folder, 'index.html') @app.route('/robots.txt') def robots(): """Serve robots.txt dynamically.""" if app.config.get('ENV') == 'production': base_url = "https://quantumfieldkit.com" else: base_url = request.url_root.rstrip('/') robots_txt = f"""User-agent: * Allow: / Sitemap: {base_url}/sitemap.xml """ return app.response_class( response=robots_txt, status=200, mimetype='text/plain' ) @app.route("/category/") def category_view(category): return send_from_directory(app.static_folder, 'index.html') @app.route("/glossary") def glossary(): return send_from_directory(app.static_folder, 'index.html') @app.after_request def add_cache_headers(response): """Add cache headers to responses to improve performance.""" import datetime # Don't cache dynamic content if request.path.startswith('/static/'): # Cache static files for 1 week expiry = datetime.datetime.now() + datetime.timedelta(days=7) response.headers['Cache-Control'] = 'public, max-age=604800' response.headers['Expires'] = expiry.strftime("%a, %d %b %Y %H:%M:%S GMT") else: # Don't cache dynamic content response.headers['Cache-Control'] = 'no-store, no-cache, must-revalidate, max-age=0' return response @app.route("/plugin/", methods=["GET", "POST"]) def plugin_view(plugin_key): if request.method == 'POST': # Support legacy AJAX POSTs from static JS (if any) if plugin_key not in PLUGINS: return jsonify({"error": "Plugin not found"}), 404 plugin = PLUGINS[plugin_key] try: raw_params = {} for param in plugin.get('parameters', []): param_name = param['name'] if param_name in request.form: raw_params[param_name] = request.form.get(param_name) params = validate_parameters(plugin, raw_params) if 'run' not in plugin: return jsonify({"error": "Plugin runner not defined"}), 500 result = plugin['run'](params) return jsonify(result) except Exception as e: return jsonify({"error": str(e)}), 400 # GET falls back to React return send_from_directory(app.static_folder, 'index.html') @app.route("/api/plugins", methods=["GET"]) def api_plugins(): """Return a list of available plugins.""" categories = {} for key, plugin in PLUGINS.items(): category = plugin.get("category", "other") if category not in categories: categories[category] = [] # Only include serializable data serializable_plugin = { "key": key, "name": plugin.get("name"), "description": plugin.get("description"), "icon": plugin.get("icon"), "category": plugin.get("category"), "parameters": plugin.get("parameters", []) } categories[category].append(serializable_plugin) return jsonify(categories) @app.route("/api/plugin/", methods=["GET"]) def api_plugin(plugin_key): """Return details for a specific plugin.""" if plugin_key not in PLUGINS: return jsonify({"error": "Plugin not found"}), 404 plugin = PLUGINS[plugin_key] # Only include serializable data serializable_plugin = { "key": plugin_key, "name": plugin.get("name"), "description": plugin.get("description"), "icon": plugin.get("icon"), "category": plugin.get("category"), "parameters": plugin.get("parameters", []) } return jsonify(serializable_plugin) @app.route("/api/run/", methods=["POST"]) def api_run_plugin(plugin_key): """Run a plugin simulation.""" if plugin_key not in PLUGINS: return jsonify({"error": "Plugin not found"}), 404 try: params = request.get_json() plugin = PLUGINS[plugin_key] if 'run' in plugin and callable(plugin['run']): # Use standardized runner which already wraps results result = plugin['run'](params or {}) elif 'function' in plugin and callable(plugin['function']): # Fallback for legacy plugins result = run_plugin(plugin['function'], _plugin_key=plugin_key, **(params or {})) else: return jsonify({"error": "Plugin is misconfigured"}), 500 return jsonify(result) except Exception as e: return jsonify({"error": str(e)}), 400 @app.route("/api/glossary", methods=["GET"]) def api_glossary(): """Return glossary terms.""" # Prefer glossary shipped with the built SPA, then public, then repo static fallback candidates = [ os.path.join(os.path.dirname(__file__), 'frontend', 'build', 'data', 'glossary_terms.json'), os.path.join(os.path.dirname(__file__), 'frontend', 'public', 'data', 'glossary_terms.json'), os.path.join(os.path.dirname(__file__), 'static', 'data', 'glossary_terms.json'), ] terms_file = next((p for p in candidates if os.path.exists(p)), None) try: with open(terms_file, 'r') as f: terms = json.load(f) return jsonify(terms) except (FileNotFoundError, json.JSONDecodeError) as e: app.logger.error(f"Error loading glossary terms: {e}") return jsonify([{"term": "Qubit", "definition": "The fundamental unit of quantum information."}, {"term": "Superposition", "definition": "A quantum property allowing particles to exist in multiple states."}]) @app.route("/api/category/", methods=["GET"]) def api_category(category): """Return plugins in a specific category.""" plugins_in_category = {k: v for k, v in PLUGINS.items() if v.get("category", "other") == category} if not plugins_in_category: return jsonify({"error": "Category not found"}), 404 return jsonify(plugins_in_category) @app.route("/api/educational/", methods=["GET"]) def api_educational(plugin_key): """Return educational content for a specific plugin.""" if plugin_key not in PLUGINS: return jsonify({"error": "Plugin not found"}), 404 try: # Get the template path for the plugin template_path = get_template_path(plugin_key) # Extract educational content content = extract_educational_content(template_path) if content: return jsonify({"content": content}) else: return jsonify({"error": "Educational content not found"}), 404 except Exception as e: logger.error(f"Error loading educational content: {e}") return jsonify({"error": "Failed to load educational content"}), 500 @app.route("/api/validate/", methods=["POST", "OPTIONS"]) def api_validate(plugin_key): """Validate parameters for a specific plugin.""" if request.method == "OPTIONS": return jsonify({"status": "ok"}), 200 if plugin_key not in PLUGINS: return jsonify({"error": "Plugin not found"}), 404 try: params = request.get_json() plugin = PLUGINS[plugin_key] validated_params = validate_parameters(plugin, params) return jsonify({"status": "valid", "params": validated_params}) except ParameterError as e: return jsonify({ "status": "invalid", "error": e.message, "param_info": e.param_info, "suggestion": e.suggestion }), 400 except Exception as e: return jsonify({"status": "error", "error": str(e)}), 500 @app.route("/circuit") def circuit_designer(): # SPA route return send_from_directory(app.static_folder, 'index.html') # API routes for Circuit Designer @app.route("/api/circuit/gates", methods=["GET"]) def api_circuit_gates(): """Get available quantum gates for the circuit designer""" try: # Basic set of gates organized by categories gates = { "single_qubit": [ {"id": "x", "name": "X", "description": "Pauli-X gate (NOT gate)", "symbol": "X"}, {"id": "y", "name": "Y", "description": "Pauli-Y gate", "symbol": "Y"}, {"id": "z", "name": "Z", "description": "Pauli-Z gate", "symbol": "Z"}, {"id": "h", "name": "H", "description": "Hadamard gate", "symbol": "H"}, {"id": "s", "name": "S", "description": "Phase gate (S)", "symbol": "S"}, {"id": "t", "name": "T", "description": "π/8 gate (T)", "symbol": "T"}, {"id": "rx", "name": "RX", "description": "Rotation around X-axis", "symbol": "RX", "params": [{"name": "theta", "default": "π/2"}]}, {"id": "ry", "name": "RY", "description": "Rotation around Y-axis", "symbol": "RY", "params": [{"name": "theta", "default": "π/2"}]}, {"id": "rz", "name": "RZ", "description": "Rotation around Z-axis", "symbol": "RZ", "params": [{"name": "theta", "default": "π/2"}]} ], "multi_qubit": [ {"id": "cnot", "name": "CNOT", "description": "Controlled-NOT gate", "symbol": "CNOT", "qubits": 2}, {"id": "cz", "name": "CZ", "description": "Controlled-Z gate", "symbol": "CZ", "qubits": 2}, {"id": "swap", "name": "SWAP", "description": "SWAP gate", "symbol": "SWAP", "qubits": 2}, {"id": "ccx", "name": "Toffoli", "description": "Toffoli gate (CCX)", "symbol": "CCX", "qubits": 3}, {"id": "cswap", "name": "Fredkin", "description": "Fredkin gate (CSWAP)", "symbol": "CSWAP", "qubits": 3} ], "special": [ {"id": "measure", "name": "Measure", "description": "Measurement operation", "symbol": "M"}, {"id": "reset", "name": "Reset", "description": "Reset qubit to |0⟩", "symbol": "R"} ] } return jsonify(gates) except Exception as e: logger.error(f"Error getting available gates: {str(e)}") return jsonify({"error": "Failed to get available gates"}), 500 @app.route("/api/circuit/simulate", methods=["POST"]) def api_circuit_simulate(): """Simulate a quantum circuit""" try: data = request.get_json() if not data: return jsonify({"error": "No circuit data provided"}), 400 circuit = data.get("circuit") shots = data.get("shots", 1024) if not circuit: return jsonify({"error": "No circuit definition provided"}), 400 # Ensure shots is an integer and within reasonable limits try: shots = int(shots) if shots < 1 or shots > 10000: return jsonify({"error": "Shots must be between 1 and 10000"}), 400 except ValueError: return jsonify({"error": "Shots must be an integer"}), 400 # This is where we'd build and simulate the circuit using Cirq # For now, we'll use mock data import numpy as np import cirq # Convert JSON circuit representation to a Cirq circuit cirq_circuit = cirq.Circuit() qubits = [cirq.LineQubit(i) for i in range(circuit.get("num_qubits", 3))] # Add gates to circuit based on the circuit data # This would need to be implemented based on your circuit JSON schema # For now, return mock simulation results result = { "state_vector": { "amplitudes": [ {"state": "000", "amplitude": 1.0, "probability": 1.0} # Additional states would be added here in a real implementation ], "visualization_data": {} # Visualization data would go here }, "measurements": { "counts": {"000": shots}, "probabilities": {"000": 1.0} }, "circuit_representation": { "qubits": circuit.get("num_qubits", 3), "depth": len(circuit.get("gates", [])), "gates": circuit.get("gates", []) } } return jsonify(result) except Exception as e: logger.error(f"Error simulating circuit: {str(e)}\n{traceback.format_exc()}") return jsonify({"error": f"Failed to simulate circuit: {str(e)}"}), 500 @app.route("/api/circuit/save", methods=["POST"]) def api_circuit_save(): """Save a quantum circuit""" try: data = request.get_json() if not data: return jsonify({"error": "No circuit data provided"}), 400 # Here you would save the circuit to a database or file # For this example, we'll pretend to save it and return a mock ID circuit_id = datetime.now().strftime("%Y%m%d%H%M%S") return jsonify({ "id": circuit_id, "name": data.get("name", "Unnamed Circuit"), "saved": True, "timestamp": datetime.now().isoformat() }) except Exception as e: logger.error(f"Error saving circuit: {str(e)}") return jsonify({"error": f"Failed to save circuit: {str(e)}"}), 500 @app.route("/api/circuit/load/", methods=["GET"]) def api_circuit_load(circuit_id): """Load a saved quantum circuit""" try: # Here you would load the circuit from a database or file # For this example, we'll return a mock circuit mock_circuit = { "id": circuit_id, "name": f"Circuit {circuit_id}", "num_qubits": 3, "gates": [ {"type": "h", "targets": [0]}, {"type": "cnot", "controls": [0], "targets": [1]}, {"type": "measure", "targets": [0, 1]} ], "created_at": datetime.now().isoformat() } return jsonify(mock_circuit) except Exception as e: logger.error(f"Error loading circuit {circuit_id}: {str(e)}") return jsonify({"error": f"Failed to load circuit: {str(e)}"}), 500 @app.route("/api/circuit/saved", methods=["GET"]) def api_circuit_saved(): """Get list of saved circuits""" try: # Here you would query a database for saved circuits # For this example, we'll return mock data mock_saved_circuits = [ {"id": "20230512120000", "name": "Bell State", "num_qubits": 2, "created_at": "2023-05-12T12:00:00"}, {"id": "20230513130000", "name": "GHZ State", "num_qubits": 3, "created_at": "2023-05-13T13:00:00"}, {"id": "20230514140000", "name": "Quantum Teleportation", "num_qubits": 3, "created_at": "2023-05-14T14:00:00"} ] return jsonify(mock_saved_circuits) except Exception as e: logger.error(f"Error getting saved circuits: {str(e)}") return jsonify({"error": f"Failed to get saved circuits: {str(e)}"}), 500 @app.route("/api/circuit/export", methods=["POST"]) def api_circuit_export(): """Export circuit to code (Cirq, Qiskit, etc.)""" try: data = request.get_json() if not data: return jsonify({"error": "No circuit data provided"}), 400 circuit = data.get("circuit") format = data.get("format", "cirq").lower() if not circuit: return jsonify({"error": "No circuit definition provided"}), 400 if format not in ["cirq", "qiskit"]: return jsonify({"error": f"Unsupported export format: {format}"}), 400 # Generate code based on the circuit definition num_qubits = circuit.get("num_qubits", 3) if format == "cirq": code = f""" import cirq import numpy as np # Create a circuit with {num_qubits} qubits circuit = cirq.Circuit() # Define qubits qubits = [cirq.LineQubit(i) for i in range({num_qubits})] # Add gates """ # Here you would iterate through the gates and add them to the code code += """ # Simulate simulator = cirq.Simulator() result = simulator.simulate(circuit) print("Final state vector:") print(result.final_state_vector) """ elif format == "qiskit": code = f""" from qiskit import QuantumCircuit, Aer, execute import numpy as np # Create a circuit with {num_qubits} qubits and {num_qubits} classical bits qc = QuantumCircuit({num_qubits}, {num_qubits}) # Add gates """ # Here you would iterate through the gates and add them to the code code += """ # Simulate simulator = Aer.get_backend('statevector_simulator') result = execute(qc, simulator).result() statevector = result.get_statevector() print("Final state vector:") print(statevector) """ return jsonify({ "code": code, "format": format }) except Exception as e: logger.error(f"Error exporting circuit: {str(e)}") return jsonify({"error": f"Failed to export circuit: {str(e)}"}), 500 # --- Socket.IO event handlers for real-time updates --- @socketio.on('connect') def handle_connect(): """Handle client connection.""" logger.info(f"Client connected: {request.sid}") @socketio.on('disconnect') def handle_disconnect(): """Handle client disconnection.""" logger.info(f"Client disconnected: {request.sid}") @socketio.on('run_plugin') def handle_run_plugin(data): """Run a plugin and emit progress updates.""" plugin_key = data.get('plugin_key') raw_params = data.get('params', {}) if plugin_key not in PLUGINS: emit('plugin_error', {'error': f"Plugin '{plugin_key}' not found"}) return plugin = PLUGINS[plugin_key] try: # Validate parameters params = validate_parameters(plugin, raw_params) # Add plugin key for better error reporting params['_plugin_key'] = plugin_key # Set up progress tracking session_id = request.sid def progress_callback(step, total, message): progress = int(100 * step / total) if total > 0 else 0 emit('plugin_progress', { 'plugin_key': plugin_key, 'progress': progress, 'message': message }) # Add progress callback to parameters if supported params['progress_callback'] = progress_callback # Run the plugin and emit the result emit('plugin_start', {'plugin_key': plugin_key}) result = plugin["run"](params) emit('plugin_result', {'plugin_key': plugin_key, 'result': result}) except SimulationError as e: # Handle validation errors error_msg = str(e) if hasattr(e, 'suggestion') and e.suggestion: error_msg += f"\n\nSuggestion: {e.suggestion}" emit('plugin_error', {'plugin_key': plugin_key, 'error': error_msg}) except Exception as e: emit('plugin_error', {'plugin_key': plugin_key, 'error': str(e)}) # --- Error handling --- @app.errorhandler(404) def page_not_found(e): # SPA fallback try: return send_from_directory(app.static_folder, 'index.html') except Exception: return jsonify({"error": "Not found"}), 404 @app.errorhandler(500) def server_error(e): return jsonify({"error": "Server error"}), 500 # --- Main entry point --- if __name__ == "__main__": # Check if we're running in a containerized environment is_containerized = os.environ.get('CONTAINERIZED', 'false').lower() == 'true' port = int(os.environ.get('PORT', 5000)) host = os.environ.get('HOST', '127.0.0.1') if is_containerized or os.environ.get('FLASK_ENV') == 'production': # In containerized environments, use standard Flask development server # This is more compatible with platforms like Hugging Face Spaces from werkzeug.serving import run_simple print(f"Starting Flask app on {host}:{port} (containerized mode)") app.run(host=host, port=port, debug=False) else: # SSL Configuration for development if os.environ.get('FLASK_ENV') == 'production' and os.path.exists('cert.pem') and os.path.exists('key.pem'): socketio.run(app, debug=False, host=host, port=port, certfile='cert.pem', keyfile='key.pem') else: # Development environment with SocketIO socketio.run(app, debug=True, host=host, port=port)