# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
Quantum Circuit Optimization Environment Implementation.

Architecture:
- Dynamically generated circuits across 3 difficulty tiers to challenge
  frontier models.
- Instance-isolated PRNG (seeding) for strict reproducibility in server
  environments.
- Relative Compression Grading: grading math lives exclusively in graders.py.
  The class methods grade_easy / grade_medium / grade_hard are thin delegates
  that call graders.py — there is no duplicated math here.
- Advanced action tracking: the environment records whether the agent used an
  algebraic identity (H-X-H = Z, CNOT(a,b)-CNOT(b,a)-CNOT(a,b) = SWAP) beyond
  simple cancellations and passes that flag to the graders.
"""

import os
import random
from typing import Optional
from uuid import uuid4

from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import EnvironmentMetadata, State

from quantum_openenv_env.models import QuantumAction, QuantumGate, QuantumObservation

# Grading math lives here and ONLY here — environment methods delegate to these
from quantum_openenv_env.server.graders import grade_easy, grade_medium, grade_hard


# ============================================================================
# Dynamic Task Configurations
# ============================================================================

class TaskConfig:
    """Parameters of one difficulty tier plus the circuit generator for it."""

    def __init__(self, name: str, num_qubits: int, num_pairs: int,
                 num_noise: int, use_entangling: bool):
        """
        Args:
            name: Tier name; "medium" gets one triple-CNOT pattern attempt,
                any other entangling tier gets two.
            num_qubits: Number of qubits gates may act on.
            num_pairs: Number of identical gate pairs to insert.
            num_noise: Number of independent random gates to generate first.
            use_entangling: Whether two-qubit gates (CNOT / SWAP) may appear.
        """
        self.name = name
        self.num_qubits = num_qubits
        self.num_pairs = num_pairs
        self.num_noise = num_noise
        self.use_entangling = use_entangling

    def generate_circuit(self, rng: random.Random) -> list[QuantumGate]:
        """
        Build a random circuit for this tier.

        The order of calls on ``rng`` is part of the contract: a given seed
        must always yield the same circuit, so do not reorder the draws.

        Args:
            rng: Random source; all randomness is drawn from it.

        Returns:
            The generated list of gates.
        """
        single_gates = ["H", "X", "Y", "Z"]
        multi_gates = ["CNOT", "SWAP"]
        entangling = self.use_entangling and self.num_qubits > 1
        circuit = []

        # Phase 1: independent random "noise" gates.
        for _ in range(self.num_noise):
            # rng.random() is only consumed when entangling gates are allowed.
            if entangling and rng.random() > 0.5:
                q1, q2 = rng.sample(range(self.num_qubits), 2)
                circuit.append(
                    QuantumGate(name=rng.choice(multi_gates), target_qubits=[q1, q2])
                )
            else:
                q = rng.randint(0, self.num_qubits - 1)
                circuit.append(
                    QuantumGate(name=rng.choice(single_gates), target_qubits=[q])
                )

        # Phase 2: identical gate pairs; the second copy is inserted at or
        # after the position chosen for the first copy.
        for _ in range(self.num_pairs):
            if entangling and rng.random() > 0.5:
                gate_name = rng.choice(multi_gates)
                qubits = rng.sample(range(self.num_qubits), 2)
            else:
                gate_name = rng.choice(single_gates)
                qubits = [rng.randint(0, self.num_qubits - 1)]

            # Each gate gets its own qubit list so the two never alias.
            gate1 = QuantumGate(name=gate_name, target_qubits=list(qubits))
            gate2 = QuantumGate(name=gate_name, target_qubits=list(qubits))

            insert_idx_1 = rng.randint(0, len(circuit))
            circuit.insert(insert_idx_1, gate1)
            insert_idx_2 = rng.randint(insert_idx_1, len(circuit))
            circuit.insert(insert_idx_2, gate2)

        # Phase 3: contiguous CNOT(a,b) CNOT(b,a) CNOT(a,b) patterns that
        # action 4 can replace with a single SWAP.
        if entangling:
            num_patterns = 1 if self.name == "medium" else 2  # hard gets 2
            for _ in range(num_patterns):
                if rng.random() > 0.3:  # 70% chance per pattern, keeps it non-deterministic
                    q1, q2 = rng.sample(range(self.num_qubits), 2)
                    insert_at = rng.randint(0, len(circuit))
                    circuit.insert(
                        insert_at, QuantumGate(name="CNOT", target_qubits=[q1, q2])
                    )
                    circuit.insert(
                        insert_at + 1, QuantumGate(name="CNOT", target_qubits=[q2, q1])
                    )
                    circuit.insert(
                        insert_at + 2, QuantumGate(name="CNOT", target_qubits=[q1, q2])
                    )

        return circuit


TASK_CONFIGS = {
    "easy": TaskConfig("easy", num_qubits=2, num_pairs=8, num_noise=4, use_entangling=False),
    "medium": TaskConfig("medium", num_qubits=4, num_pairs=12, num_noise=8, use_entangling=True),
    "hard": TaskConfig("hard", num_qubits=6, num_pairs=25, num_noise=20, use_entangling=True),
}

TASKS = ["easy", "medium", "hard"]

GRADERS = {
    "easy": grade_easy,
    "medium": grade_medium,
    "hard": grade_hard,
}


# ============================================================================
# Environment
# ============================================================================

class QuantumCircuitOptimizationEnvironment(Environment):
    """
    Quantum Circuit Optimization RL Environment.

    The agent acts as a quantum compiler, reducing circuit depth by applying
    mathematical identities and commutativity rules across 3 difficulty tiers.

    Observation:
        circuit     - Current list of QuantumGate objects
        gate_count  - Number of gates remaining
        num_qubits  - System qubit count
        done        - Episode terminal flag
        reward      - Last step reward
        prompt      - Human-readable state for the web UI playground
        metadata    - task, initial_count, step, seed, used_advanced_actions

    Action types:
        1 - Cancel identical self-inverse gate pairs              (+1.0)
        2 - Swap adjacent commuting gates (different qubits)      (-0.05)
        3 - Replace H-X-H sequence with Z gate                    (+2.0)
        4 - Replace CNOT(a,b)-CNOT(b,a)-CNOT(a,b) with SWAP       (+2.0)
        Invalid actions                                           (-0.1)

    An episode ends when the step count reaches ``MAX_STEPS`` or when none of
    the four actions can be applied anywhere in the circuit.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    # Hard cap on steps per episode.
    MAX_STEPS: int = 150

    SELF_INVERSE_GATES = {
        "H", "X", "Y", "Z", "CNOT", "CX", "CZ", "SWAP",
        "CCX", "TOFFOLI", "CSWAP", "FREDKIN",
    }

    def __init__(self, task: str = "random", seed: Optional[int] = None):
        """
        Args:
            task: "random" or a key of ``TASK_CONFIGS``. When "random", the
                ``QUANTUM_TASK`` environment variable (if set) overrides it.
            seed: Seed for this instance's private PRNG; ``None`` leaves the
                PRNG seeded by ``random.Random``'s default.

        Raises:
            ValueError: If the resolved task is neither "random" nor a known
                tier name.
        """
        if task == "random":
            task = os.getenv("QUANTUM_TASK", "random")

        self.mode = task
        if self.mode != "random" and self.mode not in TASK_CONFIGS:
            raise ValueError(
                f"Unknown task: {task}. "
                f"Must be 'random' or one of {list(TASK_CONFIGS.keys())}"
            )

        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._reset_count = 0
        self.current_seed = seed
        # random.Random(None) is equivalent to random.Random().
        self.rng = random.Random(self.current_seed)

        # Placeholders until reset() selects the tier and builds a circuit.
        self.task_name = "easy"
        self.task_config = TASK_CONFIGS["easy"]
        self._circuit: list[QuantumGate] = []
        self._initial_gate_count = 0
        self._used_advanced_actions = False

    # ========================================================================
    # OpenEnv API
    # ========================================================================

    def reset(self, seed: Optional[int] = None, **kwargs) -> QuantumObservation:
        """
        Reset the environment to a fresh circuit for the configured task.

        Args:
            seed: When given, replaces the stored seed and re-creates the
                PRNG from it; otherwise the existing PRNG keeps advancing.

        Returns:
            The initial observation (``done=False``, ``reward=0.0``).
        """
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._reset_count += 1
        self._used_advanced_actions = False

        if seed is not None:
            self.current_seed = seed
            self.rng = random.Random(self.current_seed)

        if self.mode == "random":
            self.task_name = self.rng.choice(TASKS)
        else:
            self.task_name = self.mode

        self.task_config = TASK_CONFIGS[self.task_name]
        self._circuit = self.task_config.generate_circuit(self.rng)
        self._initial_gate_count = len(self._circuit)

        return QuantumObservation(
            circuit=self._circuit,
            gate_count=len(self._circuit),
            num_qubits=self.task_config.num_qubits,
            done=False,
            reward=0.0,
            prompt=self._generate_prompt(),
            metadata={
                "task": self.task_name,
                "reset_count": self._reset_count,
                "initial_count": self._initial_gate_count,
                "seed": self.current_seed,
                "used_advanced_actions": False,
            },
        )

    def step(self, action: QuantumAction, **kwargs) -> QuantumObservation:  # type: ignore[override]
        """
        Execute one action in the environment.

        Args:
            action: Carries ``target_index`` (position in the circuit) and
                ``action_type`` (1-4, see the class docstring).

        Returns:
            The observation after the action. An out-of-range index, an
            unknown action type, or an action whose precondition does not
            hold leaves the circuit unchanged and yields reward -0.1.
        """
        self._state.step_count += 1

        target_index = action.target_index
        action_type = action.action_type

        reward = -0.1
        action_result = "invalid"

        if target_index < 0 or target_index >= len(self._circuit):
            return self._build_observation(reward, "invalid_index")

        # ACTION 1: Cancel Identical Self-Inverse Gates
        if action_type == 1:
            partner_index = self._find_cancel_partner(target_index)
            if partner_index is not None:
                # Pop the later gate first so target_index stays valid.
                self._circuit.pop(partner_index)
                self._circuit.pop(target_index)
                reward = 1.0
                action_result = "cancelled_identical"

        # ACTION 2: Swap Commuting Gates
        elif action_type == 2:
            if self._commutes_with_next(target_index):
                self._circuit[target_index], self._circuit[target_index + 1] = (
                    self._circuit[target_index + 1],
                    self._circuit[target_index],
                )
                reward = -0.05
                action_result = "swapped_commuting"

        # ACTION 3: Replace H-X-H with Z (advanced identity)
        elif action_type == 3:
            if self._is_hxh_at(target_index):
                self._collapse_triple(target_index, "Z")
                reward = 2.0
                action_result = "identity_hxh_to_z"
                self._used_advanced_actions = True

        # ACTION 4: Replace CNOT(a,b)→CNOT(b,a)→CNOT(a,b) with SWAP (advanced identity)
        elif action_type == 4:
            if self._is_triple_cnot_at(target_index):
                self._collapse_triple(target_index, "SWAP")
                reward = 2.0  # saves 2 gates, same as H-X-H identity
                action_result = "identity_3cnot_to_swap"
                self._used_advanced_actions = True

        return self._build_observation(reward, action_result)

    @property
    def state(self) -> State:
        """Current episode state (episode id and step count)."""
        return self._state

    def get_metadata(self) -> EnvironmentMetadata:
        """Return metadata shown in the HF Space web UI and consumed by platform agent."""
        return EnvironmentMetadata(
            name="Quantum Circuit Optimizer",
            description=(
                "RL environment where an agent acts as a quantum compiler, "
                "reducing circuit depth by applying gate cancellation, "
                "commutativity swaps, and algebraic identities "
                "(H·X·H = Z, CNOT(a,b)·CNOT(b,a)·CNOT(a,b) = SWAP) "
                "across 3 difficulty tiers "
                "(2-qubit easy → 4-qubit medium → 6-qubit hard with deep entanglement)."
            ),
            version="0.1.0",
        )

    # ========================================================================
    # Grader methods — thin delegates to graders.py (single source of truth)
    # No math here. Change grader logic only in graders.py.
    # ========================================================================

    def _make_grader_obs(self) -> QuantumObservation:
        """
        Build a minimal observation for grader calls.

        No side effects — does not trigger dead-end check or prompt generation.
        Only carries the fields that graders.py actually reads from metadata.
        """
        return QuantumObservation(
            circuit=self._circuit,
            gate_count=len(self._circuit),
            num_qubits=self.task_config.num_qubits,
            metadata={
                "initial_count": self._initial_gate_count,
                "step": self._state.step_count,
                "used_advanced_actions": self._used_advanced_actions,
            },
        )

    def grade_easy(self) -> float:
        """Grade the current state with the easy-tier grader."""
        return grade_easy(self._make_grader_obs())

    def grade_medium(self) -> float:
        """Grade the current state with the medium-tier grader."""
        return grade_medium(self._make_grader_obs())

    def grade_hard(self) -> float:
        """Grade the current state with the hard-tier grader."""
        return grade_hard(self._make_grader_obs())

    def grade(self) -> float:
        """Grade current state using the active task's grader."""
        return GRADERS[self.task_name](self._make_grader_obs())

    # ========================================================================
    # Internal helpers
    # ========================================================================

    def _find_cancel_partner(self, index: int) -> Optional[int]:
        """
        Return the index of the gate that cancels with the gate at ``index``.

        The partner must be the first later gate sharing any qubit with the
        gate at ``index``, and must have the same name and the same
        ``target_qubits``. Returns ``None`` when the gate is not in
        ``SELF_INVERSE_GATES`` or no such partner exists.
        """
        gate = self._circuit[index]
        if gate.name not in self.SELF_INVERSE_GATES:
            return None

        qubits = set(gate.target_qubits)
        for j in range(index + 1, len(self._circuit)):
            other = self._circuit[j]
            if qubits.intersection(other.target_qubits):
                # Only the first gate touching a shared qubit can cancel.
                if other.name == gate.name and other.target_qubits == gate.target_qubits:
                    return j
                return None
        return None

    def _commutes_with_next(self, index: int) -> bool:
        """Return True when a gate follows ``index`` and shares no qubit with it."""
        if index + 1 >= len(self._circuit):
            return False
        here = set(self._circuit[index].target_qubits)
        return here.isdisjoint(self._circuit[index + 1].target_qubits)

    def _is_hxh_at(self, index: int) -> bool:
        """Return True when gates index..index+2 are H, X, H on identical qubits."""
        if index + 2 >= len(self._circuit):
            return False
        g1, g2, g3 = self._circuit[index:index + 3]
        return (
            g1.name == "H" and g2.name == "X" and g3.name == "H"
            and g1.target_qubits == g2.target_qubits == g3.target_qubits
        )

    def _is_triple_cnot_at(self, index: int) -> bool:
        """Return True when gates index..index+2 are CNOT(a,b), CNOT(b,a), CNOT(a,b)."""
        if index + 2 >= len(self._circuit):
            return False
        g1, g2, g3 = self._circuit[index:index + 3]
        return (
            g1.name == "CNOT" and g2.name == "CNOT" and g3.name == "CNOT"
            and g1.target_qubits == g3.target_qubits
            and g2.target_qubits == list(reversed(g1.target_qubits))
        )

    def _collapse_triple(self, index: int, replacement_name: str) -> None:
        """Replace gates index..index+2 with one gate on the first gate's qubits."""
        qubits = self._circuit[index].target_qubits
        self._circuit.pop(index + 2)
        self._circuit.pop(index + 1)
        self._circuit[index] = QuantumGate(name=replacement_name, target_qubits=qubits)

    def _build_observation(self, reward: float, action_result: str) -> QuantumObservation:
        """Assemble the post-step observation, including the terminal flag."""
        max_steps_reached = self._state.step_count >= self.MAX_STEPS
        is_done = max_steps_reached or self._is_circuit_dead_end()

        return QuantumObservation(
            circuit=self._circuit,
            gate_count=len(self._circuit),
            num_qubits=self.task_config.num_qubits,
            done=is_done,
            reward=reward,
            prompt=self._generate_prompt(),
            metadata={
                "task": self.task_name,
                "action_result": action_result,
                "step": self._state.step_count,
                "initial_count": self._initial_gate_count,
                "seed": self.current_seed,
                "used_advanced_actions": self._used_advanced_actions,
            },
        )

    def _is_circuit_dead_end(self) -> bool:
        """
        Return True when the circuit is empty or no action can be applied.

        Uses the same predicates as ``step`` for all four action types, so a
        remaining H-X-H or triple-CNOT pattern keeps the episode alive.
        """
        if not self._circuit:
            return True

        return not any(
            self._find_cancel_partner(i) is not None
            or self._commutes_with_next(i)
            or self._is_hxh_at(i)
            or self._is_triple_cnot_at(i)
            for i in range(len(self._circuit))
        )

    def _generate_prompt(self) -> str:
        """Generates a human-readable prompt for the Web UI playground."""
        prompt_text = (
            f"Quantum Circuit Optimizer ({self.task_name.upper()})\n\n"
            f"A quantum circuit on {self.task_config.num_qubits} qubits has been generated. "
            "Your goal is to compress it by finding logical reductions.\n\n"
            "ACTIONS:\n\n"
            "1: Cancel identical self-inverse gates (H, X, Y, Z, CNOT, SWAP).\n\n"
            "2: Swap adjacent commuting gates (gates not sharing qubits).\n\n"
            "3: Replace an H-X-H sequence with a Z gate.\n\n"
            "4: Replace CNOT(a,b)→CNOT(b,a)→CNOT(a,b) with a single SWAP gate.\n\n"
            "CURRENT CIRCUIT STATE:\n\n"
        )

        if not self._circuit:
            prompt_text += "[Empty Circuit - Optimization Complete!]"
        else:
            # One "[index]NAME(q0,q1)" token per gate, space-separated.
            prompt_text += " ".join(
                f"[{i}]{gate.name}({','.join(str(q) for q in gate.target_qubits)})"
                for i, gate in enumerate(self._circuit)
            )

        return prompt_text