# Demo simulation system for HF Spaces deployment
# Simplified version that mimics the real DQN system behavior
import numpy as np
import random
import torch
import torch.nn as nn
from gymnasium import Env, spaces

# Import demo config
try:
    import config_demo as config
except ImportError:
    # Fallback configuration used when config_demo is not deployed alongside
    # this file.  Threshold arrays are padded to 19 entries to match the
    # big-state dimensionality.
    class Config:
        ACTION_DIMENSION = 9
        criticalDepletion = [35.0, 0.01, 0.1, 7.0, 0.01, 50, 3.0, 5] + [0.1] * 11
        depletion = [36.0, 0.1, 0.5, 7.1, 0.1, 80, 4.0, 10] + [0.5] * 11
        excess = [38.5, 2.0, 40.0, 7.6, 2.0, 600, 15.0, 60] + [5.0] * 11
        criticalExcess = [40.0, 5.0, 50.0, 7.8, 5.0, 700, 20.0, 80] + [10.0] * 11

    config = Config()

# Action components at these positions model infusion-only interventions:
# they may be increased (+1) or held (0), but never withdrawn (-1).
_INFUSION_ONLY_INDICES = [3, 4, 5, 6]

# State indices of the six key monitored parameters
# (temperature, VR, pH, pvO2, glucose, insulin).
_KEY_PARAM_INDICES = [0, 3, 4, 6, 9, 10]


def _decode_action_vector(action_value):
    """Decode a flat action index into a per-component vector of {-1, 0, +1}.

    The action space is ``Discrete(3 ** ACTION_DIMENSION)``; each base-3
    digit of *action_value* maps to one component, with digit 2 encoding -1.
    Infusion-only components are clamped so they can never be -1.

    Shared by :func:`single_step` and :meth:`DemoEnv.decode_action` so the
    two decoders cannot drift apart.
    """
    action_vector = []
    remaining = action_value
    for _ in range(config.ACTION_DIMENSION):
        component = remaining % 3
        if component == 2:
            component = -1
        action_vector.append(component)
        remaining = remaining // 3

    # Apply infusion-only constraints (withdrawal is not physically possible)
    for idx in _INFUSION_ONLY_INDICES:
        if idx < len(action_vector) and action_vector[idx] == -1:
            action_vector[idx] = 0
    return action_vector


def get_scenario_type():
    """Get scenario type for demo (fixed to the EYE scenario)."""
    return "EYE"


def initial_big_state():
    """Generate the initial 19-dimensional state vector for the demo.

    Returns a ``float32`` NumPy array; index 16 carries the simulation
    clock in hours.
    """
    state = [
        37.0,   # 0: Temperature
        1.2,    # 1: Pressure
        0.8,    # 2: Flow
        15.0,   # 3: VR (Vascular Resistance)
        7.35,   # 4: pH
        1.0,    # 5: Pressure2
        400,    # 6: pvO2
        2.5,    # 7: Other
        1.0,    # 8: Other
        6.0,    # 9: Glucose
        16,     # 10: Insulin
        0.5,    # 11: Other
        1.0,    # 12: Other
        0.8,    # 13: Other
        1.2,    # 14: Other
        0.9,    # 15: Other
        0.0,    # 16: Hours (simulation time)
        1.0,    # 17: Other
        0.7,    # 18: Other
    ]
    return np.array(state, dtype=np.float32)


def single_step(action_combo):
    """Advance the demo physiology model by one simulated hour.

    Parameters
    ----------
    action_combo : sequence
        Pair ``(action_value, big_state)`` where *action_value* is the flat
        discrete action index and *big_state* is the 19-dim state array.

    Returns
    -------
    tuple
        ``(new_state, score_vector, reward)`` — the evolved 19-dim state,
        a 6-element severity vector (0 normal / 1 warning / 2 critical)
        for the key parameters, and a scalar simulator reward.
    """
    action_value, big_state = action_combo
    action_vector = _decode_action_vector(action_value)

    # Simulate parameter changes based on actions
    new_state = big_state.copy()
    dt = 1.0  # 1 hour time step

    # Temperature control (Gaussian noise models physiological variability)
    if len(action_vector) > 0:
        new_state[0] += action_vector[0] * 0.2 * dt + random.gauss(0, 0.1)
        new_state[0] = np.clip(new_state[0], 35.0, 40.0)

    # VR (Vascular Resistance)
    if len(action_vector) > 1:
        new_state[3] += action_vector[1] * 2.0 * dt + random.gauss(0, 0.5)
        new_state[3] = np.clip(new_state[3], 0.1, 50.0)

    # pH control
    if len(action_vector) > 2:
        new_state[4] += action_vector[2] * 0.05 * dt + random.gauss(0, 0.02)
        new_state[4] = np.clip(new_state[4], 6.9, 7.8)

    # pvO2 drifts passively (no direct actuator in this demo)
    new_state[6] += random.gauss(0, 10)
    new_state[6] = np.clip(new_state[6], 50, 700)

    # Glucose
    if len(action_vector) > 3:
        new_state[9] += action_vector[3] * 0.5 * dt + random.gauss(0, 0.3)
        new_state[9] = np.clip(new_state[9], 2.0, 20.0)

    # Insulin
    if len(action_vector) > 4:
        new_state[10] += action_vector[4] * 5.0 * dt + random.gauss(0, 2.0)
        new_state[10] = np.clip(new_state[10], 5, 80)

    # Update simulation time
    new_state[16] += dt

    # Calculate score vector (6-dimensional): 0 normal, 1 warning, 2 critical.
    # NOTE(review): thresholds are indexed by the raw 19-dim state index, but
    # the fallback Config arrays look like a compacted 8-value layout plus
    # padding (e.g. pH at state index 4 is compared against thresholds[4] =
    # 0.01 / 2.0, which are not pH-like values).  Verify this indexing against
    # the real config_demo layout.
    score_vector = np.zeros(6)
    for i, param_idx in enumerate(_KEY_PARAM_INDICES):
        value = new_state[param_idx]
        if value <= config.criticalDepletion[param_idx] or value >= config.criticalExcess[param_idx]:
            score_vector[i] = 2  # Critical
        elif value <= config.depletion[param_idx] or value >= config.excess[param_idx]:
            score_vector[i] = 1  # Warning
        # else: stays 0 (Normal) from np.zeros

    # Calculate reward: big bonus for surviving the full 24h episode,
    # penalty for any critical parameter, small per-step survival reward.
    if new_state[16] >= 24:
        reward = 100     # Successful completion
    elif any(abs(s) >= 2 for s in score_vector):
        reward = -50     # Critical failure
    else:
        reward = 1       # Normal operation

    return new_state, score_vector, reward


class DemoEnv(Env):
    """Demo environment for HF Spaces.

    Exposes a 6-dimensional observation (the key parameters extracted from
    the 19-dim internal state) and a ``Discrete(3 ** ACTION_DIMENSION)``
    action space.  Uses the classic 4-tuple gym step API
    ``(obs, reward, done, info)``.
    """

    def __init__(self, scenario=None):
        """Create the environment.

        Parameters
        ----------
        scenario : str, optional
            ``"EYE"`` (default) or ``"VCA"``; selects observation bounds
            and reset ranges.
        """
        super(DemoEnv, self).__init__()
        self.scenario = scenario if scenario else "EYE"

        # State space for key parameters.
        # NOTE(review): the EYE temperature bounds [34, 38] do not cover the
        # simulator's clip range [35, 40], so observations can exceed the
        # declared Box — confirm whether the bounds or the clip are intended.
        if self.scenario == "EYE":
            self.state_indices = [0, 3, 4, 6, 9, 10]
            self.observation_space = spaces.Box(
                low=np.array([34, 0.1, 6.9, 0, 2, 1]),
                high=np.array([38, 50, 7.7, 700, 33, 80]),
                dtype=np.float32,
            )
        else:  # VCA
            self.state_indices = [0, 3, 4, 6, 9, 10]
            self.observation_space = spaces.Box(
                low=np.array([36, 0.01, 6.9, 0, 2, 1]),
                high=np.array([41, 2.0, 7.6, 500, 33, 80]),
                dtype=np.float32,
            )

        self.action_space = spaces.Discrete(3 ** config.ACTION_DIMENSION)
        self.state = None      # decoded 6-dim observation
        self.big_state = None  # full 19-dim simulator state

    def decode_state(self, big_state):
        """Extract the key parameters as a float32 observation vector."""
        return np.array([big_state[i] for i in self.state_indices], dtype=np.float32)

    def decode_action(self, action_value):
        """Convert a flat action index to its per-component vector."""
        return _decode_action_vector(action_value)

    def step(self, action_value, train=True):
        """Take one environment step.

        Returns ``(obs, reward, done, info)``; *info* carries
        ``hours_survived`` and ``critical_failure``.  On a simulator error
        the episode terminates with reward -1000 and an empty info dict.
        """
        action_combo = [action_value, self.big_state]
        try:
            new_big_state, score_vector, simulator_reward = single_step(action_combo)
            self.big_state = new_big_state
            # simulator_reward is intentionally unused: the env shapes its
            # own reward from hours survived (below).
        except Exception as e:
            print(f"Error in simulation step: {e}")
            return self.decode_state(self.big_state), -1000, True, {}

        self.state = self.decode_state(self.big_state)
        hours_survived = self.big_state[16]

        # Termination: full 24h episode completed, or any parameter critical.
        done = hours_survived >= 24
        critical_failure = any(abs(score) >= 2 for score in score_vector)
        if critical_failure:
            done = True

        # Terminal reward shaped by survival time; non-terminal steps get 0.
        if done:
            reward = hours_survived * hours_survived
            if 12 < hours_survived < 24:
                reward = hours_survived * 5
            elif hours_survived < 12:
                reward = hours_survived - 5
        else:
            reward = 0

        return self.state, reward, done, {
            "hours_survived": hours_survived,
            "critical_failure": critical_failure,
        }

    def reset(self):
        """Reset the environment and return the initial observation."""
        self.big_state = initial_big_state()

        # Add realistic per-episode variation around the scenario baselines.
        if self.scenario == "EYE":
            self.big_state[0] = 37 + random.uniform(-0.5, 0.5)
            self.big_state[4] = 7.35 + random.uniform(-0.05, 0.05)
            self.big_state[9] = 6 + random.uniform(-1, 1)
            self.big_state[10] = 16 + random.uniform(-3, 3)
        else:  # VCA
            self.big_state[0] = 36 + random.uniform(-1, 1)
            self.big_state[4] = 7.35 + random.uniform(-0.1, 0.1)
            self.big_state[9] = 6 + random.uniform(-0.5, 0.5)
            self.big_state[10] = 160 + random.uniform(-10, 10)

        self.big_state[16] = 0  # Reset time
        self.state = self.decode_state(self.big_state)
        return self.state


class DemoDQN(nn.Module):
    """Demo DQN model: a 3-layer MLP mapping state to per-action Q-values."""

    def __init__(self, state_size, action_size, hidden_size=256):
        super(DemoDQN, self).__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_size)

    def forward(self, x):
        # Accept a bare state vector by promoting it to a batch of one.
        if x.dim() == 1:
            x = x.unsqueeze(0)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)


class DemoAgent:
    """Demo DQN agent with a randomly initialized (untrained) policy net."""

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.policy_net = DemoDQN(state_size, action_size)
        self.epsilon = 0.0  # fully greedy for the demo

        # Initialize with some random weights that produce reasonable actions
        with torch.no_grad():
            for param in self.policy_net.parameters():
                param.data = torch.randn_like(param) * 0.1

    def choose_action(self, state):
        """Choose an action: epsilon-greedy over the policy network output."""
        if random.random() < self.epsilon:
            return random.randrange(self.action_size)

        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        self.policy_net.eval()
        with torch.no_grad():
            action_values = self.policy_net(state_tensor)
        return action_values.argmax().item()


def load_agent_demo(state_size=6, action_size=3 ** 9):
    """Load a fresh demo agent (no checkpoint is read in the demo)."""
    return DemoAgent(state_size, action_size)


# Aliases to match expected interface
NewSimulationEnv = DemoEnv


def load_agent(path):
    """Load the demo agent; *path* is accepted for interface parity and ignored."""
    return load_agent_demo()