| from math import inf |
| import re |
| from pkg_resources import require |
| from models.maxcausal import * |
| import numpy as np |
| from utils.irl_helper import * |
| import torch |
|
|
def mlirl(feature_matrix, p_start_state, transition_probability,
          trajectories, epochs, learning_rate, init=None, discount=0.9,
          l1_reg=0.01, l2_reg=0.01,
          show=True, weights=None, eps=1e-5, seed=0):
    """
    Fit a linear reward function to the given trajectories by
    maximum-likelihood IRL (gradient ascent on the demo log-likelihood
    under a soft/MaxEnt policy).

    feature_matrix: NumPy array whose last dim D is the feature dimension.
        NOTE(review): downstream use (`reward_sa=True`, the (N, A) invalid
        mask) suggests shape (N, A, D) — per state-action features; confirm
        against callers.
    p_start_state: start-state distribution (currently unused here).
    transition_probability: NumPy array (N, A, N), P(s' | s, a).
    trajectories: iterable of trajectories; each is a sequence of
        (state, action, ...) tuples. The final step of each trajectory is
        skipped (its action may be undefined).
    epochs: number of Adam steps. int.
    learning_rate: Adam learning rate. float.
    init: optional initial weight vector for the linear reward.
    discount: MDP discount factor. float.
    l1_reg: L1 penalty coefficient, added directly to the loss.
    l2_reg: L2 penalty, applied as Adam weight decay.
    show: verbose printing/plotting.
    weights: unused; kept for backward compatibility.
    eps: convergence threshold forwarded to soft value iteration.
    seed: RNG seed for torch and numpy.
    -> dict with 'train loss' (per-epoch losses), 'alpha' (learned weights)
       and 'reward' (z-scored reward, -inf on invalid state-actions).
    """
    torch.manual_seed(seed=seed)
    np.random.seed(seed=seed)
    torch.use_deterministic_algorithms(True)
    if trajectories is None or len(trajectories) == 0:
        return {'train loss': [], 'alpha': [], 'reward': []}
    training_loss = []
    d_states = feature_matrix.shape[-1]
    # was a bare try/except around np.max that left max_length undefined on
    # failure and crashed later with a NameError
    max_length = max((len(traj) for traj in trajectories), default=0)

    # state-action pairs with zero outgoing probability mass are invalid
    not_valid_state_action = torch.tensor(transition_probability.sum(axis=-1) <= 0)
    linear_layer = LinearLayer(num_features=d_states, init_weights=init)
    # L2 regularisation via Adam's weight decay; L1 is added to the loss below
    # (l1_reg was previously accepted but silently ignored).
    optim = torch.optim.Adam(linear_layer.parameters(), lr=learning_rate, weight_decay=l2_reg)
    if show:
        print('Max length of trajectories: ', max_length)
        print('Feature matrix shape:', feature_matrix.shape)

    for i in tqdm(range(epochs)):
        if show:
            print(f'Epoch {i}')
        r = linear_layer(feature_matrix)
        # mask invalid state-actions; r is non-leaf so in-place assign is safe
        r[not_valid_state_action] = -float('inf')
        policy = soft_policy(transition_probability=transition_probability, reward=r,
                             discount=discount, eps=eps, show=show, reward_sa=True)
        # negative log-likelihood of the demonstrated (s, a) pairs + L1 penalty
        loss = l1_reg * linear_layer.weights.abs().sum()
        for trajectory in trajectories:
            for (s, a, _) in trajectory[:-1]:
                loss = loss - torch.log(policy[s, a])
        optim.zero_grad()
        loss.backward()
        training_loss.append(loss.detach().item())
        optim.step()

    # z-score the final reward over valid state-actions only
    reward = linear_layer(feature_matrix)
    reward[not_valid_state_action] = torch.nan
    nan_mask = torch.isnan(reward)
    mean_reward = torch.nanmean(reward)
    std_reward = torch.std(reward[~nan_mask])
    reward = (reward - mean_reward) / std_reward
    reward[not_valid_state_action] = -float('inf')
    if show:
        plt.plot(np.arange(len(training_loss)), training_loss)
        plt.show()
    return {'train loss': training_loss, 'alpha': linear_layer.weights.detach().numpy(),
            'reward': reward.detach().numpy()}
|
|
def soft_policy(transition_probability, reward, discount=0.9, eps=1e-5, show=True, reward_sa=True):
    """
    Soft (maximum-entropy) value iteration.

    transition_probability: NumPy array (N, A, N), P(s' | s, a).
    reward: torch tensor, shape (N, A) when reward_sa else (N,). May contain
        -inf entries for invalid state-actions; those receive zero policy mass.
    discount: MDP discount factor.
    eps: convergence threshold on the sup-norm of the value update.
    show: print convergence diagnostics.
    reward_sa: True if the reward is state-action shaped.
    -> stochastic policy pi(a|s) = exp(Q(s,a) - V(s)), tensor of shape (N, A).
    """
    n_states, n_actions = transition_probability.shape[0], transition_probability.shape[1]
    # p[a] is the (N, N) transition matrix of action a
    p = torch.as_tensor(transition_probability, dtype=torch.float32).permute(1, 0, 2)

    def backup(values):
        # Q(s, a) = r(s, a) + gamma * E_{s'}[V(s')]
        if reward_sa:
            return torch.stack([reward[:, a] + discount * p[a].matmul(values)
                                for a in range(n_actions)], dim=1)
        return torch.stack([reward + discount * p[a].matmul(values)
                            for a in range(n_actions)], dim=1)

    # large negative (but finite) start keeps the first delta finite
    v = torch.full((n_states,), -float(1e20))
    delta = torch.inf
    count = 0
    while delta > eps:
        count += 1
        v_old = v.clone()
        q = backup(v_old)
        # soft Bellman backup: V(s) = log sum_a exp(Q(s, a)).
        # (was: a manual pairwise log-sum-exp whose accumulator was seeded
        # with ones, injecting a spurious exp(1) term into every state's sum,
        # so the returned policy rows did not sum to 1)
        # clamp pins states whose actions are all -inf to a finite floor so
        # delta never becomes nan and the loop still terminates
        v = torch.logsumexp(q, dim=1).clamp(min=-float(1e20))
        delta = torch.max(torch.abs(v - v_old))

    if show:
        print(f"Delta: {delta.item()}")
        print('Calculate policy: ', count, 'iterations (soft policy)')

    q = backup(v)
    return torch.exp(q - v.unsqueeze(1))
|
|
def mlirl_each_week(trajectories_each_week, world, feature_matrix,
                    discount=0.9, learning_rate=0.001, epochs=50, l1_reg=0.01, l2_reg=0.01,
                    init=False, remove_outlier=False, show=True):
    """
    Run MLIRL independently on each week's trajectories.

    trajectories_each_week: list (one entry per week) of trajectory lists.
    world: environment object providing `p_transition` (N, A, N) and
        `p_start_state` — assumed from attribute access; confirm its type.
    feature_matrix: per-state(-action) feature array forwarded to `mlirl`.
    init: if True, warm-start each week's weights from the previous week's
        learned 'alpha' (skipped when the previous week had no result).
    remove_outlier: if True, drop trajectories shorter than the 5th length
        percentile and truncate trajectories to the 95th percentile length.
    -> list of per-week history dicts ('train loss', 'alpha', 'reward').
    """
    transition_probability = world.p_transition
    p_start_state = world.p_start_state
    history_weeks = []
    print('Init, remove_outlier:', init, remove_outlier)
    for itr, week_trajectories in enumerate(trajectories_each_week):
        print('WEEK:', itr)
        if len(week_trajectories) == 0:
            print('Empty trajectories')
            history_weeks.append({'train loss': [], 'alpha': [], 'reward': []})
            continue
        if remove_outlier:
            length_trajectories = [len(t) for t in week_trajectories]
            lower_bound = int(np.percentile(length_trajectories, 5))
            upper_bound = int(np.percentile(length_trajectories, 95))
            print(f'lower_bound<upper_bound: {lower_bound}<{upper_bound}')
            trajectories = [t[:min(len(t), upper_bound)] for t in week_trajectories
                            if len(t) > lower_bound]
            print('before:', len(week_trajectories), 'after:', len(trajectories))
        else:
            trajectories = week_trajectories
        trajectories = [t for t in trajectories if len(t) > 0]
        # warm-start from last week's weights only when they actually exist
        # (was: an empty previous 'alpha' produced a zero-size weight vector)
        prev_alpha = history_weeks[-1]['alpha'] if history_weeks else []
        warm_start = init and itr != 0 and len(prev_alpha) > 0
        history = mlirl(feature_matrix=feature_matrix, p_start_state=p_start_state,
                        transition_probability=transition_probability,
                        trajectories=trajectories, epochs=epochs, discount=discount,
                        learning_rate=learning_rate, show=show, l1_reg=l1_reg, l2_reg=l2_reg,
                        init=np.copy(prev_alpha) if warm_start else None)
        if show:
            plt.plot(np.arange(len(history['train loss'])), history['train loss'])
            plt.show()
        history_weeks.append(history)
    return history_weeks
|
|
|
|
class LinearLayer(nn.Module):
    """Bias-free linear reward head: r = features @ w, weights shape (D,)."""

    def __init__(self, num_features, init_weights=None):
        """
        num_features: feature dimension D.
        init_weights: optional NumPy vector of shape (D,) to initialise the
            weights; random uniform init otherwise.
        """
        super().__init__()
        if init_weights is not None:
            # .float(): forward() casts features to float32, so a float64
            # NumPy init would otherwise cause a dtype-mismatch in matmul.
            self.weights = nn.Parameter(torch.from_numpy(np.copy(init_weights)).float())
        else:
            self.weights = nn.Parameter(torch.rand(num_features))

    def forward(self, feature_matrix):
        # feature_matrix is NumPy (..., D); weights is 1-D so .t() is a no-op
        # and the result has shape feature_matrix.shape[:-1]
        return torch.matmul(torch.from_numpy(feature_matrix).float(), self.weights.t())
| |
|
|