| |
| |
|
|
| import numpy as np |
| import random |
| import torch |
| import torch.nn as nn |
| from gymnasium import Env, spaces |
|
|
| |
# Prefer the real project configuration; fall back to a self-contained
# stub with demo alarm thresholds when config_demo is not installed.
try:
    import config_demo as config
except ImportError:

    class Config:
        """Stand-in configuration: action width and per-parameter alarm bands."""

        ACTION_DIMENSION = 9
        # 8 named parameters followed by 11 generic ones (19 total).
        criticalDepletion = [35.0, 0.01, 0.1, 7.0, 0.01, 50, 3.0, 5] + [0.1 for _ in range(11)]
        depletion = [36.0, 0.1, 0.5, 7.1, 0.1, 80, 4.0, 10] + [0.5 for _ in range(11)]
        excess = [38.5, 2.0, 40.0, 7.6, 2.0, 600, 15.0, 60] + [5.0 for _ in range(11)]
        criticalExcess = [40.0, 5.0, 50.0, 7.8, 5.0, 700, 20.0, 80] + [10.0 for _ in range(11)]

    config = Config()
|
|
def get_scenario_type():
    """Return the scenario identifier used by the demo (always "EYE")."""
    return "EYE"
|
|
def initial_big_state():
    """Build the 19-element initial demo state as a float32 numpy vector.

    Index 16 is the elapsed-hours counter and starts at zero.
    """
    values = (
        37.0, 1.2, 0.8, 15.0, 7.35, 1.0, 400, 2.5, 1.0, 6.0,
        16, 0.5, 1.0, 0.8, 1.2, 0.9, 0.0, 1.0, 0.7,
    )
    return np.array(values, dtype=np.float32)
|
|
def single_step(action_combo):
    """Advance the demo simulation by one time step.

    Args:
        action_combo: pair (action_value, big_state) — a base-3 encoded
            action index and the full 19-element state array.

    Returns:
        Tuple (new_state, score_vector, reward).
    """
    action_value, big_state = action_combo

    # Decode the action index into base-3 digits; digit 2 encodes -1.
    action_vector = []
    remaining = action_value
    for _ in range(config.ACTION_DIMENSION):
        digit = remaining % 3
        action_vector.append(-1 if digit == 2 else digit)
        remaining //= 3

    # Components 3-6 are infusion-only: a negative setting is meaningless,
    # so clamp -1 back to 0.
    for pos in (3, 4, 5, 6):
        if pos < len(action_vector) and action_vector[pos] == -1:
            action_vector[pos] = 0

    new_state = big_state.copy()
    dt = 1.0  # one simulated hour per step

    # Each controllable parameter drifts by action * gain * dt plus Gaussian
    # noise, then is clipped to its allowed range.  The order of the
    # random.gauss calls below matches the original exactly.
    if len(action_vector) > 0:
        new_state[0] = np.clip(
            new_state[0] + action_vector[0] * 0.2 * dt + random.gauss(0, 0.1),
            35.0, 40.0)
    if len(action_vector) > 1:
        new_state[3] = np.clip(
            new_state[3] + action_vector[1] * 2.0 * dt + random.gauss(0, 0.5),
            0.1, 50.0)
    if len(action_vector) > 2:
        new_state[4] = np.clip(
            new_state[4] + action_vector[2] * 0.05 * dt + random.gauss(0, 0.02),
            6.9, 7.8)

    # Parameter 6 fluctuates on its own regardless of the action taken.
    new_state[6] = np.clip(new_state[6] + random.gauss(0, 10), 50, 700)

    if len(action_vector) > 3:
        new_state[9] = np.clip(
            new_state[9] + action_vector[3] * 0.5 * dt + random.gauss(0, 0.3),
            2.0, 20.0)
    if len(action_vector) > 4:
        new_state[10] = np.clip(
            new_state[10] + action_vector[4] * 5.0 * dt + random.gauss(0, 2.0),
            5, 80)

    # Index 16 tracks elapsed simulated hours.
    new_state[16] += dt

    # Score each monitored parameter against the configured bands:
    # 0 = normal, 1 = out of range, 2 = critically out of range.
    score_vector = np.zeros(6)
    for slot, param_idx in enumerate([0, 3, 4, 6, 9, 10]):
        value = new_state[param_idx]
        if (value <= config.criticalDepletion[param_idx]
                or value >= config.criticalExcess[param_idx]):
            score_vector[slot] = 2
        elif (value <= config.depletion[param_idx]
                or value >= config.excess[param_idx]):
            score_vector[slot] = 1
        else:
            score_vector[slot] = 0

    # Surviving 24 hours earns the big reward; any critical score is a
    # heavy penalty; otherwise a small per-step reward.
    if new_state[16] >= 24:
        reward = 100
    elif any(abs(s) >= 2 for s in score_vector):
        reward = -50
    else:
        reward = 1

    return new_state, score_vector, reward
|
|
class DemoEnv(Env):
    """Demo Gym-style environment for HF Spaces.

    Observations are a 6-element slice of the 19-element "big" state
    (indices 0, 3, 4, 6, 9, 10); actions are base-3 encoded combinations
    of config.ACTION_DIMENSION components in {-1, 0, +1}.
    """

    def __init__(self, scenario=None):
        super().__init__()
        self.scenario = scenario if scenario else "EYE"

        # Both scenarios observe the same indices, only the bounds differ.
        self.state_indices = [0, 3, 4, 6, 9, 10]
        if self.scenario == "EYE":
            self.observation_space = spaces.Box(
                low=np.array([34, 0.1, 6.9, 0, 2, 1]),
                high=np.array([38, 50, 7.7, 700, 33, 80]),
                dtype=np.float32,
            )
        else:
            self.observation_space = spaces.Box(
                low=np.array([36, 0.01, 6.9, 0, 2, 1]),
                high=np.array([41, 2.0, 7.6, 500, 33, 80]),
                dtype=np.float32,
            )

        self.action_space = spaces.Discrete(3 ** config.ACTION_DIMENSION)
        self.state = None       # decoded 6-element observation
        self.big_state = None   # full 19-element simulator state

    def decode_state(self, big_state):
        """Extract the observed parameters from the full state vector."""
        return np.array([big_state[i] for i in self.state_indices], dtype=np.float32)

    def decode_action(self, action_value):
        """Convert a Discrete action index into per-component settings.

        Mirrors the decoding inside single_step(): base-3 digits with
        digit 2 mapped to -1, and infusion-only components (3-6) clamped
        so they cannot go negative.
        """
        action_vector = []
        temp = action_value
        for _ in range(config.ACTION_DIMENSION):
            component = temp % 3
            if component == 2:
                component = -1
            action_vector.append(component)
            temp = temp // 3

        infusion_only_indices = [3, 4, 5, 6]
        for idx in infusion_only_indices:
            if idx < len(action_vector) and action_vector[idx] == -1:
                action_vector[idx] = 0

        return action_vector

    def step(self, action_value, train=True):
        """Advance the simulation one hour.

        Returns a 4-tuple (observation, reward, done, info) in the classic
        Gym style (not the 5-tuple Gymnasium API).
        """
        if self.big_state is None:
            # Previously this fell through to an opaque TypeError inside
            # decode_state(None) after printing an error; fail fast instead.
            raise RuntimeError("step() called before reset()")

        try:
            # single_step also returns its own reward (index 2); this wrapper
            # deliberately ignores it in favor of the shaping below.
            new_state, score_vector, _ = single_step([action_value, self.big_state])
            self.big_state = new_state
        except Exception as e:
            print(f"Error in simulation step: {e}")
            return self.decode_state(self.big_state), -1000, True, {}

        self.state = self.decode_state(self.big_state)
        hours_survived = self.big_state[16]

        # Episode ends after 24 simulated hours or on any critical score.
        done = hours_survived >= 24
        critical_failure = any(abs(score) >= 2 for score in score_vector)
        if critical_failure:
            done = True

        # Reward shaping at termination: quadratic in survival time for a
        # full run, reduced for partial survival; 0 on intermediate steps.
        if done:
            reward = hours_survived * hours_survived
            if 12 < hours_survived < 24:
                reward = hours_survived * 5
            elif hours_survived < 12:
                reward = hours_survived - 5
        else:
            reward = 0

        return self.state, reward, done, {
            "hours_survived": hours_survived,
            "critical_failure": critical_failure,
        }

    def reset(self):
        """Reset the simulator state and return the initial observation."""
        self.big_state = initial_big_state()

        # Randomize the starting point per scenario.
        if self.scenario == "EYE":
            self.big_state[0] = 37 + random.uniform(-0.5, 0.5)
            self.big_state[4] = 7.35 + random.uniform(-0.05, 0.05)
            self.big_state[9] = 6 + random.uniform(-1, 1)
            self.big_state[10] = 16 + random.uniform(-3, 3)
        else:
            self.big_state[0] = 36 + random.uniform(-1, 1)
            self.big_state[4] = 7.35 + random.uniform(-0.1, 0.1)
            self.big_state[9] = 6 + random.uniform(-0.5, 0.5)
            # NOTE(review): 160 exceeds both the observation-space high (80)
            # and the clip range single_step applies to index 10 (5-80) —
            # possibly a typo for 16; confirm the intended scenario value.
            self.big_state[10] = 160 + random.uniform(-10, 10)

        self.big_state[16] = 0  # restart the elapsed-hours counter
        self.state = self.decode_state(self.big_state)
        return self.state
|
|
class DemoDQN(nn.Module):
    """Three-layer fully connected Q-network for the demo agent."""

    def __init__(self, state_size, action_size, hidden_size=256):
        super().__init__()
        # Keep the fc1/fc2/fc3 attribute names so state_dict keys stay stable.
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_size)

    def forward(self, x):
        # Promote a single observation to a batch of one.
        if x.dim() == 1:
            x = x.unsqueeze(0)
        hidden = self.fc1(x).relu()
        hidden = self.fc2(hidden).relu()
        return self.fc3(hidden)
|
|
class DemoAgent:
    """Demo DQN agent: an untrained, randomly initialized greedy policy."""

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.policy_net = DemoDQN(state_size, action_size)
        self.epsilon = 0.0  # exploration disabled for the demo

        # Randomize the weights so the untrained net still varies its actions.
        with torch.no_grad():
            for p in self.policy_net.parameters():
                p.data = torch.randn_like(p) * 0.1

    def choose_action(self, state):
        """Return an action index: random with probability epsilon, else greedy."""
        if random.random() < self.epsilon:
            return random.randrange(self.action_size)

        observation = torch.FloatTensor(state).unsqueeze(0)
        self.policy_net.eval()
        with torch.no_grad():
            q_values = self.policy_net(observation)
        return q_values.argmax().item()
|
|
def load_agent_demo(state_size=6, action_size=3**9):
    """Construct and return a randomly initialized DemoAgent."""
    agent = DemoAgent(state_size, action_size)
    return agent
|
|
| |
# Backwards-compatible aliases used by the demo application.
NewSimulationEnv = DemoEnv


def load_agent(path):
    """Return a demo agent; *path* is accepted for API compatibility but ignored.

    Replaces a lambda assigned to a name (PEP 8 E731) with a proper def so
    tracebacks and introspection show a meaningful function name.
    """
    return load_agent_demo()
|
|