Spaces:
Sleeping
Sleeping
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """ | |
| Quantum Circuit Optimization Environment Implementation. | |
| Architecture: | |
| - Dynamically generated circuits across 3 difficulty tiers to challenge frontier models. | |
| - Instance-isolated PRNG (seeding) for strict reproducibility in server environments. | |
| - Relative Compression Grading: grading math lives exclusively in graders.py. | |
| The class methods grade_easy / grade_medium / grade_hard are thin delegates | |
| that call graders.py — there is no duplicated math here. | |
| - Advanced action tracking: medium grader rewards agents that discover | |
| algebraic identities (H-X-H=Z, CNOT-SWAP=CZ) beyond simple cancellations. | |
| """ | |
| import os | |
| import random | |
| from uuid import uuid4 | |
| from openenv.core.env_server.interfaces import Environment | |
| from openenv.core.env_server.types import EnvironmentMetadata, State | |
| from quantum_openenv_env.models import QuantumAction, QuantumGate, QuantumObservation | |
| # Grading math lives here and ONLY here — environment methods delegate to these | |
| from quantum_openenv_env.server.graders import grade_easy, grade_medium, grade_hard | |
| # ============================================================================ | |
| # Dynamic Task Configurations | |
| # ============================================================================ | |
| class TaskConfig: | |
| def __init__(self, name: str, num_qubits: int, num_pairs: int, num_noise: int, use_entangling: bool): | |
| self.name = name | |
| self.num_qubits = num_qubits | |
| self.num_pairs = num_pairs | |
| self.num_noise = num_noise | |
| self.use_entangling = use_entangling | |
| def generate_circuit(self, rng: random.Random) -> list[QuantumGate]: | |
| single_gates = ["H", "X", "Y", "Z"] | |
| multi_gates = ["CNOT", "SWAP"] | |
| circuit = [] | |
| for _ in range(self.num_noise): | |
| if self.use_entangling and self.num_qubits > 1 and rng.random() > 0.5: | |
| q1, q2 = rng.sample(range(self.num_qubits), 2) | |
| circuit.append(QuantumGate(name=rng.choice(multi_gates), target_qubits=[q1, q2])) | |
| else: | |
| q = rng.randint(0, self.num_qubits - 1) | |
| circuit.append(QuantumGate(name=rng.choice(single_gates), target_qubits=[q])) | |
| for _ in range(self.num_pairs): | |
| if self.use_entangling and self.num_qubits > 1 and rng.random() > 0.5: | |
| gate_name = rng.choice(multi_gates) | |
| qubits = rng.sample(range(self.num_qubits), 2) | |
| else: | |
| gate_name = rng.choice(single_gates) | |
| qubits = [rng.randint(0, self.num_qubits - 1)] | |
| gate1 = QuantumGate(name=gate_name, target_qubits=qubits) | |
| gate2 = QuantumGate(name=gate_name, target_qubits=qubits) | |
| insert_idx_1 = rng.randint(0, len(circuit)) | |
| circuit.insert(insert_idx_1, gate1) | |
| insert_idx_2 = rng.randint(insert_idx_1, len(circuit)) | |
| circuit.insert(insert_idx_2, gate2) | |
| if self.use_entangling and self.num_qubits > 1: | |
| num_patterns = 1 if self.name == "medium" else 2 # hard gets 2 | |
| for _ in range(num_patterns): | |
| if rng.random() > 0.3: # 70% chance per pattern, keeps it non-deterministic | |
| q1, q2 = rng.sample(range(self.num_qubits), 2) | |
| insert_at = rng.randint(0, len(circuit)) | |
| circuit.insert(insert_at, QuantumGate(name="CNOT", target_qubits=[q1, q2])) | |
| circuit.insert(insert_at + 1, QuantumGate(name="CNOT", target_qubits=[q2, q1])) | |
| circuit.insert(insert_at + 2, QuantumGate(name="CNOT", target_qubits=[q1, q2])) | |
| return circuit | |
| TASK_CONFIGS = { | |
| "easy": TaskConfig("easy", num_qubits=2, num_pairs=8, num_noise=4, use_entangling=False), | |
| "medium": TaskConfig("medium", num_qubits=4, num_pairs=12, num_noise=8, use_entangling=True), | |
| "hard": TaskConfig("hard", num_qubits=6, num_pairs=25, num_noise=20, use_entangling=True), | |
| } | |
| TASKS = ["easy", "medium", "hard"] | |
| GRADERS = { | |
| "easy": grade_easy, | |
| "medium": grade_medium, | |
| "hard": grade_hard, | |
| } | |
| # ============================================================================ | |
| # Environment | |
| # ============================================================================ | |
| class QuantumCircuitOptimizationEnvironment(Environment): | |
| """ | |
| Quantum Circuit Optimization RL Environment. | |
| The agent acts as a quantum compiler, reducing circuit depth by applying | |
| mathematical identities and commutativity rules across 3 difficulty tiers. | |
| Observation: | |
| circuit - Current list of QuantumGate objects | |
| gate_count - Number of gates remaining | |
| num_qubits - System qubit count | |
| done - Episode terminal flag | |
| reward - Last step reward | |
| prompt - Human-readable state for the web UI playground | |
| metadata - task, initial_count, step, seed, used_advanced_actions | |
| Action types: | |
| 1 - Cancel identical self-inverse gate pairs (+1.0) | |
| 2 - Swap adjacent commuting gates (different qubits) (-0.05) | |
| 3 - Replace H-X-H sequence with Z gate (+2.0) | |
| 4 - Replace CNOT-SWAP sequence with CZ gate (+1.0) | |
| Invalid actions (-0.1) | |
| """ | |
| SUPPORTS_CONCURRENT_SESSIONS: bool = True | |
| SELF_INVERSE_GATES = { | |
| "H", "X", "Y", "Z", "CNOT", "CX", "CZ", "SWAP", | |
| "CCX", "TOFFOLI", "CSWAP", "FREDKIN" | |
| } | |
| def __init__(self, task: str = "random", seed: int = None): | |
| if task == "random": | |
| task = os.getenv("QUANTUM_TASK", "random") | |
| self.mode = task | |
| if self.mode != "random" and self.mode not in TASK_CONFIGS: | |
| raise ValueError( | |
| f"Unknown task: {task}. Must be 'random' or one of {list(TASK_CONFIGS.keys())}" | |
| ) | |
| self._state = State(episode_id=str(uuid4()), step_count=0) | |
| self._reset_count = 0 | |
| self.current_seed = seed | |
| self.rng = random.Random(self.current_seed) if self.current_seed is not None else random.Random() | |
| self.task_name = "easy" | |
| self.task_config = TASK_CONFIGS["easy"] | |
| self._circuit: list[QuantumGate] = [] | |
| self._initial_gate_count = 0 | |
| self._used_advanced_actions = False | |
| # ============================================================================ | |
| # OpenEnv API | |
| # ============================================================================ | |
| def reset(self, seed: int = None, **kwargs) -> QuantumObservation: | |
| """Reset the environment to a fresh circuit for the configured task.""" | |
| self._state = State(episode_id=str(uuid4()), step_count=0) | |
| self._reset_count += 1 | |
| self._used_advanced_actions = False | |
| if seed is not None: | |
| self.current_seed = seed | |
| self.rng = random.Random(self.current_seed) | |
| if self.mode == "random": | |
| self.task_name = self.rng.choice(TASKS) | |
| else: | |
| self.task_name = self.mode | |
| self.task_config = TASK_CONFIGS[self.task_name] | |
| self._circuit = self.task_config.generate_circuit(self.rng) | |
| self._initial_gate_count = len(self._circuit) | |
| return QuantumObservation( | |
| circuit=self._circuit, | |
| gate_count=len(self._circuit), | |
| num_qubits=self.task_config.num_qubits, | |
| done=False, | |
| reward=0.0, | |
| prompt=self._generate_prompt(), | |
| metadata={ | |
| "task": self.task_name, | |
| "reset_count": self._reset_count, | |
| "initial_count": self._initial_gate_count, | |
| "seed": self.current_seed, | |
| "used_advanced_actions": False, | |
| }, | |
| ) | |
| def step(self, action: QuantumAction, **kwargs) -> QuantumObservation: # type: ignore[override] | |
| """Execute one action in the environment.""" | |
| self._state.step_count += 1 | |
| target_index = action.target_index | |
| action_type = action.action_type | |
| reward = -0.1 | |
| action_result = "invalid" | |
| if target_index < 0 or target_index >= len(self._circuit): | |
| return self._build_observation(reward, "invalid_index") | |
| gate_at_index = self._circuit[target_index] | |
| active_qubits = set(gate_at_index.target_qubits) | |
| # ACTION 1: Cancel Identical Self-Inverse Gates | |
| if action_type == 1: | |
| next_gate_index = None | |
| for j in range(target_index + 1, len(self._circuit)): | |
| next_qubits = set(self._circuit[j].target_qubits) | |
| if active_qubits.intersection(next_qubits): | |
| next_gate_index = j | |
| break | |
| if (next_gate_index is not None and | |
| self._circuit[next_gate_index].name == gate_at_index.name and | |
| self._circuit[next_gate_index].target_qubits == gate_at_index.target_qubits and | |
| gate_at_index.name in self.SELF_INVERSE_GATES): | |
| self._circuit.pop(next_gate_index) | |
| self._circuit.pop(target_index) | |
| reward = 1.0 | |
| action_result = "cancelled_identical" | |
| # ACTION 2: Swap Commuting Gates | |
| elif action_type == 2: | |
| if target_index + 1 < len(self._circuit): | |
| next_gate = self._circuit[target_index + 1] | |
| next_qubits = set(next_gate.target_qubits) | |
| if not active_qubits.intersection(next_qubits): | |
| self._circuit[target_index], self._circuit[target_index + 1] = ( | |
| self._circuit[target_index + 1], | |
| self._circuit[target_index], | |
| ) | |
| reward = -0.05 | |
| action_result = "swapped_commuting" | |
| # ACTION 3: Replace H-X-H with Z (advanced identity) | |
| elif action_type == 3: | |
| if target_index + 2 < len(self._circuit): | |
| g1 = self._circuit[target_index] | |
| g2 = self._circuit[target_index + 1] | |
| g3 = self._circuit[target_index + 2] | |
| if (g1.name == "H" and g2.name == "X" and g3.name == "H" and | |
| g1.target_qubits == g2.target_qubits == g3.target_qubits): | |
| self._circuit.pop(target_index + 2) | |
| self._circuit.pop(target_index + 1) | |
| self._circuit[target_index] = QuantumGate( | |
| name="Z", target_qubits=g1.target_qubits | |
| ) | |
| reward = 2.0 | |
| action_result = "identity_hxh_to_z" | |
| self._used_advanced_actions = True | |
| # ACTION 4: Replace CNOT(a,b)→CNOT(b,a)→CNOT(a,b) with SWAP (advanced identity) | |
| elif action_type == 4: | |
| if target_index + 2 < len(self._circuit): | |
| g1 = self._circuit[target_index] | |
| g2 = self._circuit[target_index + 1] | |
| g3 = self._circuit[target_index + 2] | |
| qubits_ab = g1.target_qubits # e.g. [0, 1] | |
| qubits_ba = list(reversed(g1.target_qubits)) # e.g. [1, 0] | |
| if (g1.name == "CNOT" and g2.name == "CNOT" and g3.name == "CNOT" and | |
| g1.target_qubits == g3.target_qubits and | |
| g2.target_qubits == qubits_ba): | |
| self._circuit.pop(target_index + 2) | |
| self._circuit.pop(target_index + 1) | |
| self._circuit[target_index] = QuantumGate( | |
| name="SWAP", target_qubits=g1.target_qubits | |
| ) | |
| reward = 2.0 # saves 2 gates, same as H-X-H identity | |
| action_result = "identity_3cnot_to_swap" | |
| self._used_advanced_actions = True | |
| return self._build_observation(reward, action_result) | |
| def state(self) -> State: | |
| return self._state | |
| def get_metadata(self) -> EnvironmentMetadata: | |
| """Return metadata shown in the HF Space web UI and consumed by platform agent.""" | |
| return EnvironmentMetadata( | |
| name="Quantum Circuit Optimizer", | |
| description=( | |
| "RL environment where an agent acts as a quantum compiler, " | |
| "reducing circuit depth by applying gate cancellation, " | |
| "commutativity swaps, and algebraic identities " | |
| "(H·X·H = Z, CNOT·SWAP = CZ) across 3 difficulty tiers " | |
| "(2-qubit easy → 4-qubit medium → 6-qubit hard with deep entanglement)." | |
| ), | |
| version="0.1.0", | |
| ) | |
| # ============================================================================ | |
| # Grader methods — thin delegates to graders.py (single source of truth) | |
| # No math here. Change grader logic only in graders.py. | |
| # ============================================================================ | |
| def _make_grader_obs(self) -> QuantumObservation: | |
| """ | |
| Build a minimal observation for grader calls. | |
| No side effects — does not trigger dead-end check or prompt generation. | |
| Only carries the fields that graders.py actually reads from metadata. | |
| """ | |
| return QuantumObservation( | |
| circuit=self._circuit, | |
| gate_count=len(self._circuit), | |
| num_qubits=self.task_config.num_qubits, | |
| metadata={ | |
| "initial_count": self._initial_gate_count, | |
| "step": self._state.step_count, | |
| "used_advanced_actions": self._used_advanced_actions, | |
| }, | |
| ) | |
| def grade_easy(self) -> float: | |
| return grade_easy(self._make_grader_obs()) | |
| def grade_medium(self) -> float: | |
| return grade_medium(self._make_grader_obs()) | |
| def grade_hard(self) -> float: | |
| return grade_hard(self._make_grader_obs()) | |
| def grade(self) -> float: | |
| """Grade current state using the active task's grader.""" | |
| return GRADERS[self.task_name](self._make_grader_obs()) | |
| # ============================================================================ | |
| # Internal helpers | |
| # ============================================================================ | |
| def _build_observation(self, reward: float, action_result: str) -> QuantumObservation: | |
| max_steps_reached = self._state.step_count >= 150 | |
| is_done = max_steps_reached or self._is_circuit_dead_end() | |
| return QuantumObservation( | |
| circuit=self._circuit, | |
| gate_count=len(self._circuit), | |
| num_qubits=self.task_config.num_qubits, | |
| done=is_done, | |
| reward=reward, | |
| prompt=self._generate_prompt(), | |
| metadata={ | |
| "task": self.task_name, | |
| "action_result": action_result, | |
| "step": self._state.step_count, | |
| "initial_count": self._initial_gate_count, | |
| "seed": self.current_seed, | |
| "used_advanced_actions": self._used_advanced_actions, | |
| }, | |
| ) | |
| def _is_circuit_dead_end(self) -> bool: | |
| if len(self._circuit) == 0: | |
| return True | |
| for i in range(len(self._circuit)): | |
| curr_gate = self._circuit[i] | |
| active_qubits = set(curr_gate.target_qubits) | |
| for j in range(i + 1, len(self._circuit)): | |
| next_qubits = set(self._circuit[j].target_qubits) | |
| if active_qubits.intersection(next_qubits): | |
| next_gate = self._circuit[j] | |
| if (next_gate.name == curr_gate.name and | |
| next_gate.target_qubits == curr_gate.target_qubits and | |
| curr_gate.name in self.SELF_INVERSE_GATES): | |
| return False | |
| break | |
| for i in range(len(self._circuit) - 1): | |
| if not set(self._circuit[i].target_qubits).intersection( | |
| set(self._circuit[i + 1].target_qubits)): | |
| return False | |
| return True | |
| def _generate_prompt(self) -> str: | |
| """Generates a human-readable prompt for the Web UI playground.""" | |
| prompt_text = ( | |
| f"Quantum Circuit Optimizer ({self.task_name.upper()})\n\n" | |
| f"A quantum circuit on {self.task_config.num_qubits} qubits has been generated. " | |
| "Your goal is to compress it by finding logical reductions.\n\n" | |
| "ACTIONS:\n\n" | |
| "1: Cancel identical self-inverse gates (H, X, Y, Z, CNOT, SWAP).\n\n" | |
| "2: Swap adjacent commuting gates (gates not sharing qubits).\n\n" | |
| "3: Replace an H-X-H sequence with a Z gate.\n\n" | |
| "4: Replace CNOT(a,b)→CNOT(b,a)→CNOT(a,b) with a single SWAP gate.\n\n" | |
| "CURRENT CIRCUIT STATE:\n\n" | |
| ) | |
| if not self._circuit: | |
| prompt_text += "[Empty Circuit - Optimization Complete!]" | |
| else: | |
| gate_strings = [] | |
| for i, gate in enumerate(self._circuit): | |
| qubits = ",".join(str(q) for q in gate.target_qubits) | |
| gate_strings.append(f"[{i}]{gate.name}({qubits})") | |
| prompt_text += " ".join(gate_strings) | |
| return prompt_text |