import random
import time
import configparser

import numpy as np
import gym
from tqdm import tqdm
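
# Hyperparameters are read from an INI file by load_values() below. A minimal
# example config -- the [Parameters] section and key names match exactly what
# load_values() reads; the values shown are illustrative assumptions, not
# tuned settings:
#
# [Parameters]
# cart_velocity_min = -3.0
# cart_velocity_max = 3.0
# pole_angle_velocity_min = -10.0
# pole_angle_velocity_max = 10.0
# number_of_bins_position = 30
# number_of_bins_velocity = 30
# number_of_bins_angle = 30
# number_of_bins_angle_velocity = 30
# alpha = 0.1
# gamma = 1.0
# epsilon = 0.2
# number_episodes = 15000
# batch_size = 32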


class Qlearning:
    """Tabular Q-learning on CartPole with a discretized observation space."""

    def __init__(self, env=None, file='config.ini'):
        # Do not create the environment in the default argument: defaults are
        # evaluated once at import time, so all instances would share one env.
        self.env = env if env is not None else gym.make('CartPole-v1')
        self.load_values(file)

    def load_values(self, file):
        """Read hyperparameters from an INI file and initialize the Q-table."""
        config = configparser.ConfigParser()
        config.read(file)
        params = config['Parameters']

        # Cart velocity and pole angular velocity are unbounded in gym, so the
        # config supplies finite clipping ranges for discretization.
        cart_velocity_min = float(params['cart_velocity_min'])
        cart_velocity_max = float(params['cart_velocity_max'])
        pole_angle_velocity_min = float(params['pole_angle_velocity_min'])
        pole_angle_velocity_max = float(params['pole_angle_velocity_max'])
        number_of_bins_position = int(params['number_of_bins_position'])
        number_of_bins_velocity = int(params['number_of_bins_velocity'])
        number_of_bins_angle = int(params['number_of_bins_angle'])
        number_of_bins_angle_velocity = int(params['number_of_bins_angle_velocity'])

        self.action_number = self.env.action_space.n
        self.alpha = float(params['alpha'])      # learning rate
        self.gamma = float(params['gamma'])      # discount factor
        self.epsilon = float(params['epsilon'])  # exploration rate
        self.numEpisodes = int(params['number_episodes'])

        # Replace the infinite velocity bounds with the configured limits.
        self.upperBounds = self.env.observation_space.high
        self.lowerBounds = self.env.observation_space.low
        self.upperBounds[1] = cart_velocity_max
        self.upperBounds[3] = pole_angle_velocity_max
        self.lowerBounds[1] = cart_velocity_min
        self.lowerBounds[3] = pole_angle_velocity_min

        self.batch_size = int(params['batch_size'])

        self.sumRewardsEpisode = []

        self.num_bins = [number_of_bins_position, number_of_bins_velocity,
                         number_of_bins_angle, number_of_bins_angle_velocity]

        self.replayBuffer = []
        # Random initialization in [0, 1) breaks ties and encourages exploration.
        self.Q = np.random.uniform(0, 1, size=(self.num_bins[0], self.num_bins[1],
                                               self.num_bins[2], self.num_bins[3],
                                               self.action_number))

    def returnIndexState(self, state):
        """Map a continuous observation to a tuple of discrete bin indices."""
        position, velocity, angle, angularVelocity = state

        cartPositionBin = np.linspace(self.lowerBounds[0], self.upperBounds[0], self.num_bins[0])
        cartVelocityBin = np.linspace(self.lowerBounds[1], self.upperBounds[1], self.num_bins[1])
        cartAngleBin = np.linspace(self.lowerBounds[2], self.upperBounds[2], self.num_bins[2])
        cartAngularVelocityBin = np.linspace(self.lowerBounds[3], self.upperBounds[3], self.num_bins[3])

        # np.digitize returns 1-based bin positions; shift to 0-based and clamp
        # values below the lowest edge into bin 0.
        indexPosition = np.maximum(np.digitize(position, cartPositionBin) - 1, 0)
        indexVelocity = np.maximum(np.digitize(velocity, cartVelocityBin) - 1, 0)
        indexAngle = np.maximum(np.digitize(angle, cartAngleBin) - 1, 0)
        indexAngularVelocity = np.maximum(np.digitize(angularVelocity, cartAngularVelocityBin) - 1, 0)

        return (indexPosition, indexVelocity, indexAngle, indexAngularVelocity)

    def selectAction(self, state, index):
        """Epsilon-greedy action selection with warm-up and decay schedules."""
        # First 10% of episodes: act fully at random to seed exploration.
        if index < self.numEpisodes * 0.1:
            return np.random.choice(self.action_number)

        randomNumber = np.random.random()

        # After 60% of episodes, decay epsilon toward greedy behavior.
        if index > self.numEpisodes * 0.6:
            self.epsilon = 0.999 * self.epsilon

        if randomNumber < self.epsilon:
            return np.random.choice(self.action_number)

        # Greedy choice, breaking ties among maximal Q-values at random.
        qValues = self.Q[self.returnIndexState(state)]
        return np.random.choice(np.where(qValues == np.max(qValues))[0])

    def train(self):
        """Run Q-learning episodes, updating online and via experience replay."""
        for indexEpisode in tqdm(range(self.numEpisodes)):
            rewardsEpisode = []
            (stateS, _) = self.env.reset()
            stateS = list(stateS)

            terminalState = False
            steps = 0

            while not terminalState and steps < 2000:
                steps += 1
                stateSIndex = self.returnIndexState(stateS)
                actionA = self.selectAction(stateS, indexEpisode)

                # gym >= 0.26 returns separate terminated/truncated flags; stop
                # on either, but only true termination cuts off bootstrapping.
                (stateSprime, reward, terminated, truncated, _) = self.env.step(actionA)
                rewardsEpisode.append(reward)
                stateSprime = list(stateSprime)

                self.replayBuffer.append([stateS, actionA, reward, stateSprime, terminated])

                stateSprimeIndex = self.returnIndexState(stateSprime)
                QmaxPrime = np.max(self.Q[stateSprimeIndex])
                if not terminated:
                    # Standard TD update with a bootstrapped max target.
                    error = reward + self.gamma * QmaxPrime - self.Q[stateSIndex + (actionA,)]
                else:
                    # Terminal transition: the target is the reward alone.
                    error = reward - self.Q[stateSIndex + (actionA,)]
                self.Q[stateSIndex + (actionA,)] += self.alpha * error

                stateS = stateSprime
                terminalState = terminated or truncated

            # Every 5 episodes, replay a random batch of stored transitions.
            if indexEpisode % 5 == 0:
                self.updateQValues()

            self.sumRewardsEpisode.append(np.sum(rewardsEpisode))

    def updateQValues(self):
        """Sample a minibatch from the replay buffer and apply Q-updates."""
        if len(self.replayBuffer) < self.batch_size:
            return

        batch = random.sample(self.replayBuffer, self.batch_size)

        for experience in batch:
            state, action, reward, next_state, done = experience
            stateIndex = self.returnIndexState(state)

            if not done:
                next_stateIndex = self.returnIndexState(next_state)
                QmaxPrime = np.max(self.Q[next_stateIndex])
                error = reward + self.gamma * QmaxPrime - self.Q[stateIndex + (action,)]
            else:
                error = reward - self.Q[stateIndex + (action,)]
            self.Q[stateIndex + (action,)] += self.alpha * error

    def simulateLearnedStrategy(self, env1=None, render=False):
        """Run one greedy episode using the learned Q-table."""
        if env1 is None:
            # In gym >= 0.26 the render mode is fixed at creation time; with
            # render_mode='human' every step() call renders automatically.
            env1 = gym.make('CartPole-v1', render_mode='human' if render else None)

        (currentState, _) = env1.reset()
        timeSteps = 3000
        steps = 0

        obtainedRewards = []
        terminated = False
        truncated = False
        while not (terminated or truncated) and steps < timeSteps:
            steps += 1
            # Greedy action, breaking ties among maximal Q-values at random.
            qValues = self.Q[self.returnIndexState(currentState)]
            actionInStateS = np.random.choice(np.where(qValues == np.max(qValues))[0])
            currentState, reward, terminated, truncated, info = env1.step(actionInStateS)
            obtainedRewards.append(reward)
            time.sleep(0.05)
            if terminated:
                time.sleep(1)
                break
        return obtainedRewards, env1

    def simulateRandomStrategy(self):
        """Run one episode with uniformly random actions as a baseline."""
        env2 = gym.make('CartPole-v1')
        (currentState, _) = env2.reset()

        timeSteps = 1000
        rewardsEpisode = []

        for timeIndex in range(timeSteps):
            random_action = env2.action_space.sample()
            observation, reward, terminated, truncated, info = env2.step(random_action)
            rewardsEpisode.append(reward)
            if terminated or truncated:
                break

        return np.sum(rewardsEpisode), env2
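

# A minimal usage sketch, assuming a config.ini like the example near the top
# of this file sits in the working directory; the render flag and printed
# labels are illustrative, not part of the class API:
if __name__ == '__main__':
    agent = Qlearning()
    agent.train()

    # Compare the learned greedy policy against the random baseline.
    learnedRewards, env1 = agent.simulateLearnedStrategy(render=False)
    randomReturn, env2 = agent.simulateRandomStrategy()
    print('learned return:', np.sum(learnedRewards))
    print('random return:', randomReturn)

    env1.close()
    env2.close()
    agent.env.close()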