Spaces:
Sleeping
Sleeping
| from typing import Optional | |
| from models import NeuralTunerAction | |
| from server.neural_tuner_env_environment import NeuralTunerEnvironment | |
class NeuralTunerOpenEnv:
    """OpenEnv wrapper compatible with TRL environment_factory.

    Wraps a ``NeuralTunerEnvironment`` and exposes one public method per
    environment action (``profile_layer``, ``quantize_layer``, ``prune_layer``,
    ``revert_layer``, ``benchmark``, ``submit``). Besides forwarding actions, it
    accumulates two small shaping-reward components (action quality and
    benchmark-to-benchmark improvement) that a caller drains through
    ``_consume_reward_components``.

    All helpers are underscore-prefixed on purpose so that only the action
    methods above (plus ``reset``) form the public surface.
    """

    # Class-level scenario rotation shared by all instances: a reset() that does
    # not name a scenario takes the entry at ``schedule_idx`` (modulo length) and
    # advances the class-wide index.
    scenario_schedule: list[dict] = []
    schedule_idx: int = 0

    # Actions that change the quantization/pruning plan and therefore bump the
    # state revision used to detect benchmark spam.
    _MUTATING_ACTIONS = frozenset({"quantize_layer", "prune_layer", "revert_layer"})

    def __init__(self):
        self._env = NeuralTunerEnvironment()
        self._reset_tracking()

    def _reset_tracking(self) -> None:
        """Internal helper: restore per-episode bookkeeping to its initial state."""
        self.reward = 0.0
        self.done = False
        self._last_action_signature = None
        self._last_profiled_layer = None
        # Incremented on every mutating action; -1 below means "never benchmarked".
        self._state_revision = 0
        self._last_benchmark_revision = -1
        self._last_benchmark = None
        self._pending_benchmark_delta = 0.0
        self._pending_action_quality = 0.0

    def reset(self, **kwargs) -> str:
        """Start a new episode and return the initial observation text.

        Scenario selection, in priority order:

        1. ``model_id`` and/or ``difficulty`` given (truthy) in ``kwargs``; a
           missing or falsy counterpart falls back to ``"inception_v3"`` /
           ``"medium"``.
        2. The next entry of the class-level ``scenario_schedule``.
        3. The default ``inception_v3`` / ``medium`` scenario.

        Args:
            **kwargs: Optional ``model_id``, ``difficulty`` and ``seed``
                (default 42). Other keys are ignored.

        Returns:
            The ``output`` of the observation returned by the wrapped
            environment's ``reset``.
        """
        model_id = kwargs.get("model_id")
        difficulty = kwargs.get("difficulty")
        if model_id or difficulty:
            # Use ``or`` rather than ``dict.get(key, default)``: a key that is
            # present but None/empty must still fall back to the default instead
            # of being forwarded to the environment as-is.
            scenario = {
                "model_id": model_id or "inception_v3",
                "difficulty": difficulty or "medium",
            }
        elif self.scenario_schedule:
            scenario = self.scenario_schedule[self.schedule_idx % len(self.scenario_schedule)]
            NeuralTunerOpenEnv.schedule_idx += 1
        else:
            scenario = {"model_id": "inception_v3", "difficulty": "medium"}

        obs = self._env.reset(
            difficulty=scenario["difficulty"],
            model_id=scenario["model_id"],
            seed=kwargs.get("seed", 42),
        )
        self._reset_tracking()
        return obs.output

    def _step(
        self,
        action_type: str,
        layer_id: Optional[str] = None,
        dtype: Optional[str] = None,
        sparsity: Optional[str] = None,
    ) -> str:
        """Internal helper: forward one action to the environment.

        Updates ``reward`` and ``done`` from the environment result and
        accumulates shaping components before (action quality) and after
        (benchmark delta) the environment call.

        Returns:
            The ``output`` text of the environment's step result.
        """
        action_signature = (action_type, layer_id, dtype, sparsity)
        # Captured before it is overwritten below; the benchmark shaping needs
        # to know what the *previous* action was.
        prev_action_signature = self._last_action_signature

        self._score_action(action_type, layer_id, action_signature)

        result = self._env.step(
            NeuralTunerAction(action_type=action_type, layer_id=layer_id, dtype=dtype, sparsity=sparsity)
        )
        self.reward = float(result.reward)
        self.done = bool(result.done)
        self._last_action_signature = action_signature

        if action_type == "benchmark":
            self._score_benchmark(result.metadata or {}, prev_action_signature)
        return result.output

    def _score_action(self, action_type: str, layer_id: Optional[str], action_signature: tuple) -> None:
        """Internal helper: accumulate action-quality shaping for one action."""
        if self._last_action_signature == action_signature:
            # Penalize repeatedly issuing the exact same action.
            self._pending_action_quality -= 0.01

        if action_type == "profile_layer":
            # Re-profiling the layer profiled last is penalized; a different
            # layer earns a small bonus.
            if self._last_profiled_layer == layer_id:
                self._pending_action_quality -= 0.005
            else:
                self._pending_action_quality += 0.005
            self._last_profiled_layer = layer_id

        if action_type in self._MUTATING_ACTIONS:
            self._state_revision += 1
            if layer_id is not None and layer_id == self._last_profiled_layer:
                # Reward profile->decision progression on the same layer.
                self._pending_action_quality += 0.008
            else:
                self._pending_action_quality += 0.002

    def _score_benchmark(self, report: dict, prev_action_signature: Optional[tuple]) -> None:
        """Internal helper: accumulate shaping for a completed benchmark action.

        Compares the benchmark ``report`` with the previous benchmark of this
        episode (if any) and adds a clamped delta to the pending benchmark
        component. Missing report keys are read as 0.0.
        """
        current = {
            "latency": float(report.get("quantized_latency_ms", 0.0)),
            "memory": float(report.get("quantized_memory_mb", 0.0)),
            "accuracy": float(report.get("estimated_accuracy_retention", 0.0)),
        }

        if self._last_benchmark is not None:
            prev = self._last_benchmark
            # Relative improvements; the max(..., 1.0) floor avoids dividing by
            # a zero or tiny previous value.
            latency_gain = (prev["latency"] - current["latency"]) / max(prev["latency"], 1.0)
            memory_gain = (prev["memory"] - current["memory"]) / max(prev["memory"], 1.0)
            accuracy_term = 0.002 if current["accuracy"] >= prev["accuracy"] else -0.004
            delta_reward = 0.05 * latency_gain + 0.05 * memory_gain + accuracy_term
            if self._state_revision == self._last_benchmark_revision:
                # Penalize benchmark spam without state changes.
                delta_reward -= 0.01
        else:
            # First benchmark of the episode: nothing to compare against.
            delta_reward = 0.0

        # Clamp so a single benchmark contributes at most +/-0.03.
        self._pending_benchmark_delta += max(-0.03, min(0.03, delta_reward))
        self._last_benchmark = current
        self._last_benchmark_revision = self._state_revision

        # NOTE(review): nesting was reconstructed from a whitespace-stripped
        # copy; this bonus is assumed to apply only when the benchmark directly
        # follows a mutating action - confirm against the canonical file.
        if prev_action_signature and prev_action_signature[0] in self._MUTATING_ACTIONS:
            self._pending_action_quality += 0.004

    def profile_layer(self, layer_id: str) -> str:
        """Reveal sensitivity and hardware risk for a specific layer.

        Args:
            layer_id: Layer identifier from the environment layer table.

        Returns:
            Text report containing sensitivity score and optimization hints.
        """
        return self._step("profile_layer", layer_id=layer_id)

    def quantize_layer(self, layer_id: str, dtype: str) -> str:
        """Apply a quantization dtype to one layer.

        Args:
            layer_id: Layer identifier from the environment layer table.
            dtype: Quantization target, one of FP32, FP16, INT8, INT4.

        Returns:
            Text summary of the quantization change.
        """
        return self._step("quantize_layer", layer_id=layer_id, dtype=dtype)

    def prune_layer(self, layer_id: str, sparsity: str) -> str:
        """Apply structured pruning to one layer for Snapdragon sparse-acceleration.

        Pruning removes channels/filters, reducing compute and memory. The Snapdragon
        HTP has dedicated hardware for sparse workloads — combine with quantization
        for maximum compression. Profile first to gauge accuracy risk.

        Args:
            layer_id: Layer identifier from the environment layer table.
            sparsity: Pruning level — LOW (25% removed), MEDIUM (50%), or HIGH (75%).

        Returns:
            Text summary of the pruning change and expected impact.
        """
        return self._step("prune_layer", layer_id=layer_id, sparsity=sparsity)

    def revert_layer(self, layer_id: str) -> str:
        """Reset one layer back to FP32 with no pruning.

        Args:
            layer_id: Layer identifier from the environment layer table.

        Returns:
            Text summary confirming the revert action.
        """
        return self._step("revert_layer", layer_id=layer_id)

    def benchmark(self) -> str:
        """Run hardware simulation for the current quantization and pruning plan.

        Returns:
            Benchmark report with latency, memory, accuracy, and projected reward.
        """
        return self._step("benchmark")

    def submit(self) -> str:
        """Finalize the episode and compute the final reward.

        Returns:
            Final submission summary including constraint pass/fail and reward.
        """
        return self._step("submit")

    def _consume_reward_components(self) -> dict:
        """Internal helper: return and reset pending shaping components."""
        components = {
            "benchmark_delta_reward": float(self._pending_benchmark_delta),
            "action_quality_reward": float(self._pending_action_quality),
        }
        self._pending_benchmark_delta = 0.0
        self._pending_action_quality = 0.0
        return components