# perfusion/demo_simulation.py
# Author: Xiaonan LUO
# Commit e22b8f7 — "Deploy Full Real Perfusion Monitoring System with DQN"
# Demo simulation system for HF Spaces deployment
# Simplified version that mimics the real DQN system behavior
import numpy as np
import random
import torch
import torch.nn as nn
from gymnasium import Env, spaces
# Import demo config
try:
    import config_demo as config
except ImportError:
    # Fallback configuration used only when config_demo is not deployed.
    class Config:
        # Number of actuator channels; the discrete action space has
        # 3**ACTION_DIMENSION combinations (each channel in {-1, 0, +1}).
        ACTION_DIMENSION = 9
        # Per-parameter safety thresholds, 19 entries each (8 explicit + 11 filler):
        #   value <= criticalDepletion or value >= criticalExcess -> critical (score 2)
        #   value <= depletion         or value >= excess         -> warning  (score 1)
        # NOTE(review): single_step() indexes these arrays with big-state
        # positions (0, 3, 4, 6, 9, 10), but the ordering here looks like a
        # different parameter layout (e.g. index 3 is compared against VR ~15
        # while criticalExcess[3] = 7.8 reads like a pH bound, which would
        # flag "critical" immediately). Verify this fallback against the
        # layout used by config_demo — TODO confirm.
        criticalDepletion = [35.0, 0.01, 0.1, 7.0, 0.01, 50, 3.0, 5] + [0.1] * 11
        depletion = [36.0, 0.1, 0.5, 7.1, 0.1, 80, 4.0, 10] + [0.5] * 11
        excess = [38.5, 2.0, 40.0, 7.6, 2.0, 600, 15.0, 60] + [5.0] * 11
        criticalExcess = [40.0, 5.0, 50.0, 7.8, 5.0, 700, 20.0, 80] + [10.0] * 11
    config = Config()
def get_scenario_type():
    """Return the scenario identifier used by the demo build (always "EYE")."""
    return "EYE"
def initial_big_state():
    """Build the 19-dimensional starting state vector for a demo episode.

    Index map (entries the simulation actively uses): 0 = temperature,
    3 = vascular resistance, 4 = pH, 6 = pvO2, 9 = glucose, 10 = insulin,
    16 = elapsed simulation hours. Remaining slots are unnamed placeholders.

    Returns:
        np.ndarray: float32 vector of shape (19,).
    """
    values = (
        37.0,   # 0: temperature
        1.2,    # 1: pressure
        0.8,    # 2: flow
        15.0,   # 3: VR (vascular resistance)
        7.35,   # 4: pH
        1.0,    # 5: secondary pressure
        400,    # 6: pvO2
        2.5,    # 7: other
        1.0,    # 8: other
        6.0,    # 9: glucose
        16,     # 10: insulin
        0.5,    # 11: other
        1.0,    # 12: other
        0.8,    # 13: other
        1.2,    # 14: other
        0.9,    # 15: other
        0.0,    # 16: elapsed hours (simulation time)
        1.0,    # 17: other
        0.7,    # 18: other
    )
    return np.array(values, dtype=np.float32)
def single_step(action_combo):
    """Advance the demo perfusion simulation by one hour.

    Args:
        action_combo: pair ``[action_value, big_state]`` where
            ``action_value`` is a base-3 encoded action index and
            ``big_state`` the 19-dim state vector.

    Returns:
        tuple: ``(new_state, score_vector, reward)`` — the evolved state,
        a 6-element per-parameter safety score (0 normal / 1 warning /
        2 critical), and a scalar reward.
    """
    action_value, big_state = action_combo

    # Decode the flat index into per-channel components in {-1, 0, +1}:
    # base-3 digits, with digit value 2 remapped to -1.
    action_vector = []
    remainder = action_value
    for _ in range(config.ACTION_DIMENSION):
        digit = remainder % 3
        action_vector.append(-1 if digit == 2 else digit)
        remainder //= 3

    # Channels 3-6 are infusion-only: a "decrease" command is clamped to hold.
    for idx in (3, 4, 5, 6):
        if idx < len(action_vector) and action_vector[idx] == -1:
            action_vector[idx] = 0

    new_state = big_state.copy()
    dt = 1.0  # one-hour time step

    # Temperature (state index 0), driven by action channel 0 plus noise.
    if len(action_vector) > 0:
        new_state[0] += action_vector[0] * 0.2 * dt + random.gauss(0, 0.1)
        new_state[0] = np.clip(new_state[0], 35.0, 40.0)

    # Vascular resistance (state index 3), driven by action channel 1.
    if len(action_vector) > 1:
        new_state[3] += action_vector[1] * 2.0 * dt + random.gauss(0, 0.5)
        new_state[3] = np.clip(new_state[3], 0.1, 50.0)

    # pH (state index 4), driven by action channel 2.
    if len(action_vector) > 2:
        new_state[4] += action_vector[2] * 0.05 * dt + random.gauss(0, 0.02)
        new_state[4] = np.clip(new_state[4], 6.9, 7.8)

    # pvO2 (state index 6) drifts randomly; not directly actuated here.
    new_state[6] += random.gauss(0, 10)
    new_state[6] = np.clip(new_state[6], 50, 700)

    # Glucose (state index 9), driven by action channel 3.
    if len(action_vector) > 3:
        new_state[9] += action_vector[3] * 0.5 * dt + random.gauss(0, 0.3)
        new_state[9] = np.clip(new_state[9], 2.0, 20.0)

    # Insulin (state index 10), driven by action channel 4.
    if len(action_vector) > 4:
        new_state[10] += action_vector[4] * 5.0 * dt + random.gauss(0, 2.0)
        new_state[10] = np.clip(new_state[10], 5, 80)

    # Advance the elapsed-hours counter (state index 16).
    new_state[16] += dt

    # Score each monitored parameter against the config thresholds.
    # NOTE(review): thresholds are indexed with the big-state index, which
    # assumes the config arrays share the big-state layout — confirm against
    # config_demo (the in-file fallback looks mis-ordered for this mapping).
    score_vector = np.zeros(6)
    for slot, param_idx in enumerate([0, 3, 4, 6, 9, 10]):
        value = new_state[param_idx]
        if value <= config.criticalDepletion[param_idx] or value >= config.criticalExcess[param_idx]:
            score_vector[min(slot, 5)] = 2  # critical
        elif value <= config.depletion[param_idx] or value >= config.excess[param_idx]:
            score_vector[min(slot, 5)] = 1  # warning
        else:
            score_vector[min(slot, 5)] = 0  # normal

    # Reward: large bonus on reaching 24 h, penalty for any critical score,
    # small per-step reward otherwise.
    if new_state[16] >= 24:
        reward = 100
    elif any(abs(s) >= 2 for s in score_vector):
        reward = -50
    else:
        reward = 1

    return new_state, score_vector, reward
class DemoEnv(Env):
    """Demo perfusion environment for HF Spaces.

    Observations are the 6 monitored parameters (temperature, VR, pH, pvO2,
    glucose, insulin) extracted from the 19-dim "big state"; actions are
    base-3 encoded combinations of ``config.ACTION_DIMENSION`` channels.
    """

    def __init__(self, scenario=None):
        super(DemoEnv, self).__init__()
        self.scenario = scenario if scenario else "EYE"
        # Observation bounds differ per scenario; the monitored big-state
        # indices are identical in both cases.
        if self.scenario == "EYE":
            self.state_indices = [0, 3, 4, 6, 9, 10]
            self.observation_space = spaces.Box(
                low=np.array([34, 0.1, 6.9, 0, 2, 1]),
                high=np.array([38, 50, 7.7, 700, 33, 80]),
                dtype=np.float32,
            )
        else:  # VCA
            self.state_indices = [0, 3, 4, 6, 9, 10]
            self.observation_space = spaces.Box(
                low=np.array([36, 0.01, 6.9, 0, 2, 1]),
                high=np.array([41, 2.0, 7.6, 500, 33, 80]),
                dtype=np.float32,
            )
        self.action_space = spaces.Discrete(3 ** config.ACTION_DIMENSION)
        self.state = None
        self.big_state = None

    def decode_state(self, big_state):
        """Project the full 19-dim state down to the monitored parameters."""
        return np.array([big_state[i] for i in self.state_indices], dtype=np.float32)

    def decode_action(self, action_value):
        """Expand a flat action index into per-channel components in {-1, 0, 1}."""
        components = []
        remaining = action_value
        for _ in range(config.ACTION_DIMENSION):
            digit = remaining % 3
            components.append(-1 if digit == 2 else digit)
            remaining //= 3
        # Infusion-only channels cannot run in reverse; clamp -1 to 0.
        for idx in (3, 4, 5, 6):
            if idx < len(components) and components[idx] == -1:
                components[idx] = 0
        return components

    def step(self, action_value, train=True):
        """Run one simulated hour; returns ``(obs, reward, done, info)``.

        NOTE(review): this uses the classic 4-tuple Gym step API even though
        gymnasium (5-tuple) is imported — callers appear to rely on the old
        interface, so it is kept as-is. ``train`` is accepted but unused.
        """
        try:
            outcome = single_step([action_value, self.big_state])
            self.big_state = outcome[0]
            score_vector = outcome[1]
            simulator_reward = outcome[2]
        except Exception as e:
            print(f"Error in simulation step: {e}")
            return self.decode_state(self.big_state), -1000, True, {}

        self.state = self.decode_state(self.big_state)
        hours_survived = self.big_state[16]

        # Episode ends after 24 simulated hours or on any critical score.
        critical_failure = any(abs(score) >= 2 for score in score_vector)
        done = hours_survived >= 24 or critical_failure

        # Shaped terminal reward: quadratic bonus at >= 24 h, linear bonus
        # for 12-24 h, small deficit below 12 h; zero while still running.
        if done:
            reward = hours_survived * hours_survived
            if 12 < hours_survived < 24:
                reward = hours_survived * 5
            elif hours_survived < 12:
                reward = hours_survived - 5
        else:
            reward = 0

        return self.state, reward, done, {
            "hours_survived": hours_survived,
            "critical_failure": critical_failure,
        }

    def reset(self):
        """Start a new episode with scenario-specific randomized initial values."""
        self.big_state = initial_big_state()
        if self.scenario == "EYE":
            self.big_state[0] = 37 + random.uniform(-0.5, 0.5)
            self.big_state[4] = 7.35 + random.uniform(-0.05, 0.05)
            self.big_state[9] = 6 + random.uniform(-1, 1)
            self.big_state[10] = 16 + random.uniform(-3, 3)
        else:  # VCA
            self.big_state[0] = 36 + random.uniform(-1, 1)
            self.big_state[4] = 7.35 + random.uniform(-0.1, 0.1)
            self.big_state[9] = 6 + random.uniform(-0.5, 0.5)
            self.big_state[10] = 160 + random.uniform(-10, 10)
        self.big_state[16] = 0  # reset elapsed hours
        self.state = self.decode_state(self.big_state)
        return self.state
class DemoDQN(nn.Module):
    """Small 3-layer MLP Q-network used by the demo agent."""

    def __init__(self, state_size, action_size, hidden_size=256):
        super(DemoDQN, self).__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, action_size)

    def forward(self, x):
        # Promote a single observation vector to a batch of one.
        if x.dim() == 1:
            x = x.unsqueeze(0)
        hidden = torch.relu(self.fc1(x))
        hidden = torch.relu(self.fc2(hidden))
        return self.fc3(hidden)
class DemoAgent:
    """Greedy demo agent backed by a randomly initialized DQN."""

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.policy_net = DemoDQN(state_size, action_size)
        self.epsilon = 0.0  # fully greedy in the demo

        # Re-draw all weights from a small Gaussian so the untrained network
        # still produces varied, plausible-looking action choices.
        with torch.no_grad():
            for param in self.policy_net.parameters():
                param.data = torch.randn_like(param) * 0.1

    def choose_action(self, state):
        """Return the greedy action index for ``state`` (epsilon-random otherwise)."""
        if random.random() < self.epsilon:
            return random.randrange(self.action_size)
        obs = torch.FloatTensor(state).unsqueeze(0)
        self.policy_net.eval()
        with torch.no_grad():
            q_values = self.policy_net(obs)
        return q_values.argmax().item()
def load_agent_demo(state_size=6, action_size=3 ** 9):
    """Construct a fresh DemoAgent with the demo's default dimensions."""
    return DemoAgent(state_size, action_size)
# Aliases to match the interface expected by the real (non-demo) system.
NewSimulationEnv = DemoEnv


def load_agent(path):
    """Load the demo agent.

    ``path`` is accepted for interface parity with the real loader and is
    ignored — the demo always builds a fresh agent. (Replaces the original
    ``lambda`` assignment, discouraged by PEP 8 E731.)
    """
    return load_agent_demo()