# what-if-simulation-app / src/models/maximumlikelihood.py
# Author: tranhuonglan — first commit (e448441)
from math import inf
import re
from pkg_resources import require
from models.maxcausal import *
import numpy as np
from utils.irl_helper import *
import torch
def mlirl(feature_matrix, p_start_state, transition_probability,
          trajectories, epochs, learning_rate, init=None, discount=0.9,
          l1_reg = 0.01, l2_reg = 0.01,
          show=True, weights=None, eps=1e-5, seed=0):
    """Fit a linear reward function to trajectories by maximum-likelihood IRL.

    feature_matrix: per-state(-action) features; the last axis is the feature
        dimension D. NumPy array (the reward is linear in these features).
    p_start_state: start-state distribution — currently unused, kept for API
        compatibility with callers.
    transition_probability: NumPy array of shape (N, A, N) mapping
        (state_i, action, state_k) to a transition probability.
    trajectories: iterable of trajectories; each step is a (state, action, ...)
        tuple. The last step of each trajectory is skipped (no successor).
    epochs: number of Adam steps. int.
    learning_rate: Adam learning rate. float.
    init: optional initial reward weights, NumPy array of shape (D,).
    discount: discount factor of the MDP. float.
    l1_reg: accepted but currently unused — TODO(review): wire up an L1 penalty.
    l2_reg: applied as Adam weight decay (L2).
    show: print/plot training diagnostics.
    weights, eps: accepted but currently unused.
    seed: RNG seed for reproducibility.

    Returns a dict with 'train loss' (per-epoch negative log-likelihood),
    'alpha' (learned weights) and 'reward' (standardised reward, -inf on
    invalid state-actions).
    """
    torch.manual_seed(seed=seed)
    np.random.seed(seed=seed)
    torch.use_deterministic_algorithms(True)
    if trajectories is None or len(trajectories) == 0:
        return {'train loss': [], 'alpha': [], 'reward': []}
    training_loss = []
    d_states = feature_matrix.shape[-1]
    try:
        max_length = max(len(traj) for traj in trajectories)
    except (TypeError, ValueError):
        # Malformed input (an item without len()). The old bare `except` left
        # `max_length` undefined and crashed below when `show` was True.
        print(trajectories)
        max_length = 0
    # Mask of (state, action) pairs with no outgoing probability mass.
    not_valid_state_action = torch.tensor(transition_probability.sum(axis=-1) <= 0)
    linear_layer = LinearLayer(num_features=d_states, init_weights=init)
    optim = torch.optim.Adam(linear_layer.parameters(), lr=learning_rate, weight_decay=l2_reg)
    if show:
        print('Max length of trajectories: ', max_length)
        print('Feature matrix shape:', feature_matrix.shape)
    for i in tqdm(range(epochs)):
        loss = 0
        n_sa = 0
        if show:
            print(f'Epoch {i}')
        r = linear_layer(feature_matrix)
        # Invalid state-actions must never be chosen by the soft policy.
        r[not_valid_state_action] = -float('inf')
        policy = soft_policy(transition_probability=transition_probability, reward=r, discount=discount, show=show, reward_sa=True)
        # Negative log-likelihood of the observed (state, action) pairs.
        for trajectory in trajectories:
            for (s, a, _) in trajectory[:-1]:
                loss -= torch.log(policy[s, a])
                n_sa += 1
        if n_sa == 0:
            # All trajectories had length <= 1: `loss` is still the int 0 and
            # has no .backward(); record a zero loss and skip the update.
            training_loss.append(0.0)
            continue
        optim.zero_grad()
        loss.backward()
        training_loss.append(loss.detach().item())
        optim.step()
    # Standardise the learned reward over valid state-actions only.
    reward = linear_layer(feature_matrix)
    reward[not_valid_state_action] = torch.nan
    nan_mask = torch.isnan(reward)
    mean_reward = torch.nanmean(reward)
    # NOTE(review): std is nan when fewer than two valid entries exist —
    # confirm callers always provide at least two valid state-actions.
    std_reward = torch.std(reward[~nan_mask])
    reward = (reward - mean_reward) / std_reward
    reward[not_valid_state_action] = -float('inf')
    if show:
        plt.plot(np.arange(len(training_loss)), training_loss)
        plt.show()
    return {'train loss': training_loss, 'alpha': linear_layer.weights.detach().numpy(), 'reward': reward.detach().numpy()}
def soft_policy(transition_probability, reward, discount=0.9, eps=1e-5, show=True, reward_sa=True):
    """Soft (maximum-entropy) value iteration; returns the soft policy.

    transition_probability: (N, A, N) array, numpy or torch.
    reward: torch tensor — shape (N, A) when `reward_sa` is True, else (N,).
        May contain -inf for forbidden state-actions.
    discount: MDP discount factor. float.
    eps: convergence threshold on max |v - v_old|.
    show: print convergence diagnostics.

    Returns a (N, A) tensor of action probabilities, exp(q - v), where
    v = logsumexp_a q is the soft state value. Gradients flow through
    `reward` so this can sit inside an IRL training loop.
    """
    n_states, n_actions = transition_probability.shape[0], transition_probability.shape[1]
    # p[a] is the (N, N) state-transition matrix under action a.
    p = torch.stack([torch.as_tensor(transition_probability[:, a, :], dtype=torch.float32)
                     for a in range(n_actions)])
    # Large finite negative instead of -inf so deltas never become nan.
    v = torch.full((n_states,), -1e20)
    delta = torch.inf
    count = 0
    while delta > eps:
        count += 1
        v_old = v
        if not reward_sa:
            q = torch.stack([reward + discount * p[a].matmul(v_old) for a in range(n_actions)], dim=1)
        else:
            q = torch.stack([reward[:, a] + discount * p[a].matmul(v_old) for a in range(n_actions)], dim=1)
        # BUG FIX: the soft value is logsumexp over actions. The old code
        # accumulated the log-sum-exp starting from torch.ones(...), which
        # added a spurious e^1 "phantom action" to every state, so the
        # returned policy did not sum to 1. The clamp keeps states whose
        # actions are all -inf at a large finite negative (nan-safe).
        v = torch.clamp(torch.logsumexp(q, dim=1), min=-1e20)
        delta = torch.max(torch.abs(v - v_old))
    # Compute and return policy from the converged value function.
    if show:
        print(f"Delta: {delta.item()}")
        print('Calculate policy: ', count, 'iterations (soft policy)')
    if not reward_sa:
        q = torch.stack([reward + discount * p[a].matmul(v) for a in range(n_actions)], dim=1)
    else:
        q = torch.stack([reward[:, a] + discount * p[a].matmul(v) for a in range(n_actions)], dim=1)
    return torch.exp(q - v.unsqueeze(1))
def mlirl_each_week(trajectories_each_week, world, feature_matrix,
                    discount=0.9, learning_rate=0.001, epochs=50, l1_reg=0.01, l2_reg=0.01,
                    init=False, remove_outlier=False, show=True):
    """Run MLIRL independently on each week's batch of trajectories.

    trajectories_each_week: list (one entry per week) of trajectory lists.
    world: environment object providing `p_transition` (N, A, N) and
        `p_start_state`.
    feature_matrix: features forwarded to `mlirl`.
    init: when True, warm-start each week from the previous week's weights.
    remove_outlier: when True, drop trajectories at or below the 5th length
        percentile and truncate those above the 95th.

    Returns a list of per-week result dicts from `mlirl` (an empty history
    for weeks with no trajectories).
    """
    transition_probability = world.p_transition
    p_start_state = world.p_start_state
    history_weeks = []
    print('Init, remove_outlier:', init, remove_outlier)
    for itr in range(len(trajectories_each_week)):
        print('WEEK:', itr)
        week_trajectories = trajectories_each_week[itr]
        if len(week_trajectories) == 0:
            print('Empty trajectories')
            history_weeks.append({'train loss': [], 'alpha': [], 'reward': []})
            continue
        if remove_outlier:
            length_trajectories = [len(t) for t in week_trajectories]
            lower_bound, upper_bound = int(np.percentile(length_trajectories, 5)), int(np.percentile(length_trajectories, 95))
            print(f'lower_bound<upper_bound: {lower_bound}<{upper_bound}')
            trajectories = [t[:min(len(t), upper_bound)] for t in week_trajectories if (len(t) > lower_bound)]
            print('before:', len(week_trajectories), 'after:', len(trajectories))
        else:
            trajectories = week_trajectories
        trajectories = [t for t in trajectories if (len(t) > 0)]
        # Warm-start only when the previous week actually produced weights:
        # an empty week stores alpha=[], and np.copy([]) would feed a
        # zero-length weight vector into LinearLayer.
        prev_alpha = history_weeks[-1]['alpha'] if history_weeks else []
        if init and itr != 0 and len(prev_alpha) > 0:
            history = mlirl(feature_matrix=feature_matrix, p_start_state=p_start_state, transition_probability=transition_probability,
                            trajectories=trajectories, epochs=epochs, discount=discount, learning_rate=learning_rate,
                            init=np.copy(prev_alpha), show=show, l1_reg=l1_reg, l2_reg=l2_reg)
        else:
            history = mlirl(feature_matrix=feature_matrix, p_start_state=p_start_state, transition_probability=transition_probability,
                            trajectories=trajectories, epochs=epochs, discount=discount, learning_rate=learning_rate, show=show, l1_reg=l1_reg, l2_reg=l2_reg)
        if show:
            train_loss = history['train loss']
            plt.plot(np.arange(len(train_loss)), train_loss)
            plt.show()
        history_weeks.append(history)
    return history_weeks
class LinearLayer(nn.Module):
    """Linear reward model: reward = features @ weights (no bias term)."""

    def __init__(self, num_features, init_weights=None):
        """num_features: feature dimension D.
        init_weights: optional NumPy array of shape (D,) used as the
        starting point; random uniform init otherwise.
        """
        super().__init__()
        if init_weights is not None:
            # .float() fixes a dtype bug: NumPy arrays default to float64,
            # while forward() casts features to float32 — the mismatched
            # matmul used to raise a RuntimeError when warm-starting.
            self.weights = nn.Parameter(torch.from_numpy(np.copy(init_weights)).float())
        else:
            self.weights = nn.Parameter(torch.rand(num_features))

    def forward(self, feature_matrix):
        """Project a NumPy feature matrix of shape (..., D) onto the weights."""
        features = torch.from_numpy(feature_matrix).float()
        return torch.matmul(features, self.weights.t())