Spaces:
Sleeping
Sleeping
File size: 7,560 Bytes
6fa4fbd 8f2eab9 6fa4fbd a422c8d 6fa4fbd a422c8d 6fa4fbd 8f2eab9 a422c8d 6fa4fbd a422c8d 6fa4fbd 7909885 a422c8d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 | from typing import Optional
from models import NeuralTunerAction
from server.neural_tuner_env_environment import NeuralTunerEnvironment
class NeuralTunerOpenEnv:
    """OpenEnv wrapper compatible with TRL environment_factory.

    Wraps a ``NeuralTunerEnvironment`` and exposes one public method per
    environment action (``profile_layer``, ``quantize_layer``, ``prune_layer``,
    ``revert_layer``, ``benchmark``, ``submit``). After every action the
    environment's own reward and done flag are mirrored on ``self.reward`` and
    ``self.done``. In addition, two small shaping terms are accumulated and
    handed out (then cleared) by ``_consume_reward_components``.
    """

    # Scenarios handed out round-robin by reset() when the caller names neither
    # a model nor a difficulty. Each entry must provide "model_id" and
    # "difficulty". Defined on the class, so it is shared between instances.
    scenario_schedule: list[dict] = []
    # Number of scenarios taken from the schedule so far. reset() increments it
    # on the class itself, so every instance advances the same cursor.
    schedule_idx: int = 0

    # Fallbacks used by reset() when a value is missing or falsy.
    _DEFAULT_MODEL_ID = "inception_v3"
    _DEFAULT_DIFFICULTY = "medium"
    _DEFAULT_SEED = 42

    # Actions that change the optimization plan and therefore bump
    # ``_state_revision``.
    _MUTATING_ACTIONS = frozenset({"quantize_layer", "prune_layer", "revert_layer"})

    def __init__(self):
        self._env = NeuralTunerEnvironment()
        self._reset_tracking()

    def _reset_tracking(self) -> None:
        """Internal helper: restore all per-episode bookkeeping to its initial state."""
        self.reward = 0.0
        self.done = False
        # Full (action_type, layer_id, dtype, sparsity) tuple of the last action.
        self._last_action_signature = None
        self._last_profiled_layer = None
        # Counts plan-changing actions; compared with the revision recorded at
        # the last benchmark to detect benchmarking without any change.
        self._state_revision = 0
        self._last_benchmark_revision = -1
        self._last_benchmark = None
        self._pending_benchmark_delta = 0.0
        self._pending_action_quality = 0.0

    def reset(self, **kwargs) -> str:
        """Start a new episode and return the initial observation text.

        The scenario is chosen in this order:

        1. the ``model_id`` / ``difficulty`` keyword arguments, when at least
           one of them is truthy (the other falls back to its default);
        2. the next entry of ``scenario_schedule`` (round-robin), when the
           schedule is non-empty;
        3. the default scenario.

        Args:
            **kwargs: Optional ``model_id``, ``difficulty`` and ``seed``
                (defaults to 42). Other keys are ignored.

        Returns:
            The ``output`` of the observation returned by the wrapped
            environment's ``reset``.

        Raises:
            KeyError: If the selected schedule entry lacks ``"model_id"`` or
                ``"difficulty"``.
        """
        if kwargs.get("model_id") or kwargs.get("difficulty"):
            # ``or`` rather than ``get(key, default)``: a key that is present
            # but None/empty must also fall back, otherwise None would be
            # forwarded to the environment.
            model_id = kwargs.get("model_id") or self._DEFAULT_MODEL_ID
            difficulty = kwargs.get("difficulty") or self._DEFAULT_DIFFICULTY
        elif self.scenario_schedule:
            schedule = self.scenario_schedule
            entry = schedule[self.schedule_idx % len(schedule)]
            NeuralTunerOpenEnv.schedule_idx += 1
            difficulty = entry["difficulty"]
            model_id = entry["model_id"]
        else:
            model_id = self._DEFAULT_MODEL_ID
            difficulty = self._DEFAULT_DIFFICULTY

        obs = self._env.reset(
            difficulty=difficulty,
            model_id=model_id,
            seed=kwargs.get("seed", self._DEFAULT_SEED),
        )
        self._reset_tracking()
        return obs.output

    def _step(
        self,
        action_type: str,
        layer_id: Optional[str] = None,
        dtype: Optional[str] = None,
        sparsity: Optional[str] = None,
    ) -> str:
        """Internal helper: run one action and update reward bookkeeping.

        Shaping for the action itself is applied before the environment is
        stepped; benchmark shaping is applied afterwards because it needs the
        environment's report.

        Args:
            action_type: Name of the action to execute.
            layer_id: Target layer, for layer-level actions.
            dtype: Quantization target, for ``quantize_layer``.
            sparsity: Pruning level, for ``prune_layer``.

        Returns:
            The ``output`` of the environment's step result.
        """
        action_signature = (action_type, layer_id, dtype, sparsity)
        # Captured before it is overwritten below; benchmark shaping needs to
        # know which action preceded the benchmark.
        prev_action_signature = self._last_action_signature

        self._score_action(action_type, layer_id, action_signature)

        result = self._env.step(
            NeuralTunerAction(
                action_type=action_type,
                layer_id=layer_id,
                dtype=dtype,
                sparsity=sparsity,
            )
        )
        self.reward = float(result.reward)
        self.done = bool(result.done)
        self._last_action_signature = action_signature

        if action_type == "benchmark":
            self._score_benchmark(result.metadata or {}, prev_action_signature)
        return result.output

    def _score_action(self, action_type, layer_id, action_signature) -> None:
        """Internal helper: apply pre-step action-quality shaping."""
        if self._last_action_signature == action_signature:
            # Penalize repeatedly issuing the exact same action.
            self._pending_action_quality -= 0.01

        if action_type == "profile_layer":
            # Profiling a different layer than last time earns a small bonus;
            # re-profiling the same layer costs the same amount.
            if self._last_profiled_layer == layer_id:
                self._pending_action_quality -= 0.005
            else:
                self._pending_action_quality += 0.005
            self._last_profiled_layer = layer_id

        if action_type in self._MUTATING_ACTIONS:
            self._state_revision += 1
            if layer_id is not None and layer_id == self._last_profiled_layer:
                # Reward profile->decision progression on the same layer.
                self._pending_action_quality += 0.008
            else:
                self._pending_action_quality += 0.002

    def _score_benchmark(self, report, prev_action_signature) -> None:
        """Internal helper: apply shaping derived from a benchmark report.

        Args:
            report: Metadata mapping of the benchmark step result (may be empty).
            prev_action_signature: Signature of the action issued before this
                benchmark, or None if there was none.
        """
        current = {
            "latency": float(report.get("quantized_latency_ms", 0.0)),
            "memory": float(report.get("quantized_memory_mb", 0.0)),
            "accuracy": float(report.get("estimated_accuracy_retention", 0.0)),
        }

        prev = self._last_benchmark
        if prev is None:
            # First benchmark of the episode: nothing to compare against.
            delta_reward = 0.0
        else:
            # Relative improvement; the denominator is floored at 1.0 so a
            # zero or tiny previous value cannot blow the ratio up.
            latency_gain = (prev["latency"] - current["latency"]) / max(prev["latency"], 1.0)
            memory_gain = (prev["memory"] - current["memory"]) / max(prev["memory"], 1.0)
            accuracy_term = 0.002 if current["accuracy"] >= prev["accuracy"] else -0.004
            delta_reward = 0.05 * latency_gain + 0.05 * memory_gain + accuracy_term
            if self._state_revision == self._last_benchmark_revision:
                # Penalize benchmark spam without state changes.
                delta_reward -= 0.01

        # Clamp so a single benchmark contributes at most +/-0.03.
        self._pending_benchmark_delta += max(-0.03, min(0.03, delta_reward))
        self._last_benchmark = current
        self._last_benchmark_revision = self._state_revision

        if prev_action_signature is not None and prev_action_signature[0] in self._MUTATING_ACTIONS:
            # Benchmarking right after a plan change is the intended workflow.
            self._pending_action_quality += 0.004

    def profile_layer(self, layer_id: str) -> str:
        """Reveal sensitivity and hardware risk for a specific layer.

        Args:
            layer_id: Layer identifier from the environment layer table.

        Returns:
            Text report containing sensitivity score and optimization hints.
        """
        return self._step("profile_layer", layer_id=layer_id)

    def quantize_layer(self, layer_id: str, dtype: str) -> str:
        """Apply a quantization dtype to one layer.

        Args:
            layer_id: Layer identifier from the environment layer table.
            dtype: Quantization target, one of FP32, FP16, INT8, INT4.

        Returns:
            Text summary of the quantization change.
        """
        return self._step("quantize_layer", layer_id=layer_id, dtype=dtype)

    def prune_layer(self, layer_id: str, sparsity: str) -> str:
        """Apply structured pruning to one layer for Snapdragon sparse-acceleration.

        Pruning removes channels/filters, reducing compute and memory. The Snapdragon
        HTP has dedicated hardware for sparse workloads — combine with quantization
        for maximum compression. Profile first to gauge accuracy risk.

        Args:
            layer_id: Layer identifier from the environment layer table.
            sparsity: Pruning level — LOW (25% removed), MEDIUM (50%), or HIGH (75%).

        Returns:
            Text summary of the pruning change and expected impact.
        """
        return self._step("prune_layer", layer_id=layer_id, sparsity=sparsity)

    def revert_layer(self, layer_id: str) -> str:
        """Reset one layer back to FP32 with no pruning.

        Args:
            layer_id: Layer identifier from the environment layer table.

        Returns:
            Text summary confirming the revert action.
        """
        return self._step("revert_layer", layer_id=layer_id)

    def benchmark(self) -> str:
        """Run hardware simulation for the current quantization and pruning plan.

        Returns:
            Benchmark report with latency, memory, accuracy, and projected reward.
        """
        return self._step("benchmark")

    def submit(self) -> str:
        """Finalize the episode and compute the final reward.

        Returns:
            Final submission summary including constraint pass/fail and reward.
        """
        return self._step("submit")

    def _consume_reward_components(self) -> dict:
        """Internal helper: return and reset pending shaping components."""
        components = {
            "benchmark_delta_reward": float(self._pending_benchmark_delta),
            "action_quality_reward": float(self._pending_action_quality),
        }
        self._pending_benchmark_delta = 0.0
        self._pending_action_quality = 0.0
        return components
|