# Q-Learning en el juego del Dinosaurio en Google Chrome

Usamos el modulo Pygame y el tutorial de Max Teaches Tech de Youtube para recrear el juego del dinosaurio. El repositorio con los assets se encuentra aqui:

https://github.com/maxontech/chrome-dinosaur/tree/master/Assets


En los primeros 300 puntos del juego, solo aparecen cactus; despues, aparecen mas obstaculos, y la velocidad va aumentando entre mas avanza el juego.

# Configuraciones Iniciales y Importaciones
Aquí definimos las dimensiones de la pantalla del juego y establecemos configuraciones iniciales como la velocidad del juego, tamaños de memoria, y parámetros para el entrenamiento de la red neuronal. También importamos las bibliotecas necesarias como Pygame para la interfaz gráfica y PyTorch para la implementación de la red neuronal.

### 1. Tamaños de Memoria de Replay
- **`INIT_REPLAY_MEM_SIZE = 5_000` y `REPLAY_MEMORY_SIZE = 45_000`:** Una memoria de replay suficientemente grande permite al agente aprender de muchas experiencias pasadas, y asi generalizar.
- **`MIN_REPLAY_MEMORY_SIZE = 1_000`:** Este es el tamaño mínimo de memoria de replay necesario antes de comenzar el entrenamiento. Garantiza que el modelo tenga suficientes datos para un entrenamiento bueno.

### 2. Tamaño del Minibatch y Factor de Descuento
- **`MINIBATCH_SIZE = 64`:** Un tamaño de minibatch de 64 es un equilibrio común entre eficiencia computacional y estabilidad del entrenamiento. Permite que la red aprenda de diferentes experiencias en cada actualización.
- **`DISCOUNT = 0.95`:** El factor de descuento (gamma) de 0.95 reduce el valor presente de las recompensas futuras, lo que ayuda a enfocar el agente en recompensas a corto plazo pero sin ignorar completamente las consecuencias a largo plazo.

### 3. Actualización del Modelo Objetivo
- **`UPDATE_TARGET_THRESH = 5`:** La actualización de los pesos del modelo objetivo cada 5 episodios ayuda a mantener la estabilidad del aprendizaje. Asegura que el modelo objetivo no cambie demasiado rápido y proporciona estimaciones consistentes de los valores Q objetivo.

### 4. Estrategia Epsilon-Greedy
- **`EPSILON_INIT = 0.45`, `EPSILON_DECAY = 0.997`, `MIN_EPSILON = 0.05`:** Estos valores controlan la tasa de exploración/explotación del agente. Un `epsilon` inicial de 0.45 se eligió debido a las características específicas del juego (p. ej., la duración de la acción de saltar comparada con correr o agacharse). La tasa de decaimiento de 0.997 reduce gradualmente la exploración a medida que el agente aprende, pero evita que se reduzca a cero, manteniendo siempre una cierta probabilidad de exploración.

### 5. Duración del Entrenamiento
- **`NUM_EPISODES = 2_000`:** El numero de epochs que queremos que corra.

### Conclusión
La selección de estos hiperparámetros refleja un enfoque equilibrado que considera las particularidades del juego del dinosaurio, como la diferencia en la duración de las acciones y la necesidad de estabilizar el aprendizaje. Además, la estrategia de aprendizaje y memoria de replay ayuda a mantener un equilibrio entre aprender de experiencias recientes y no olvidar lecciones anteriores, facilitando un aprendizaje eficaz y robusto.

In [1]:
#%pip install sqlalchemy
#%pip install pygame
#Para usar GPU y torch
# %pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

In [2]:
SCREEN_HEIGHT = 600
SCREEN_WIDTH = 1100

INIT_GAME_SPEED = 14
X_POS_BG_INIT = 0
Y_POS_BG = 380

INIT_REPLAY_MEM_SIZE = 5_000
REPLAY_MEMORY_SIZE = 45_000
MODEL_NAME = "DINO"
MIN_REPLAY_MEMORY_SIZE = 1_000
MINIBATCH_SIZE = 64
DISCOUNT = 0.95
UPDATE_TARGET_THRESH = 5
#EPSILON_INIT = 0.45 epsilon inicial
EPSILON_INIT = 0.25 #modificamos para que sea menos exploratorio, menor epsilon menos exploratorio
#EPSILON_DECAY = 0.997 epsilon inicial
EPSILON_DECAY = 0.75 #modificamos para que sea menos exploratorio, menor epsilon menos exploratorio
NUM_EPISODES = 100
MIN_EPSILON = 0.05

In [3]:
import pygame
import os

import torch
import torch.nn as nn
import torch.optim as optim


import pandas as pd
import numpy as np
from collections import deque
import random

import pygame
import random
from typing import List

from argparse import Action
import random
import sys
import pygame

from sqlalchemy import asc
import math
import time
from tqdm import tqdm
from datetime import datetime

pygame 2.5.2 (SDL 2.28.3, Python 3.10.9)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [4]:
import threading
import queue

# Cola para comunicación entre hilos
action_queue = queue.Queue()


# Carga de Assets
Esta sección carga las imágenes necesarias para el juego, como el dinosaurio, cactus, pájaros, etc.
Utilizamos pygame.image.load() para cargar cada imagen y las almacenamos en listas o variables para su uso en el juego.

In [5]:
RUNNING = [pygame.image.load(os.path.join("Assets/Dino", "DinoRun1.png")), 
        pygame.image.load(os.path.join("Assets/Dino", "DinoRun2.png"))]

DUCKING = [pygame.image.load(os.path.join("Assets/Dino", "DinoDuck1.png")), 
        pygame.image.load(os.path.join("Assets/Dino", "DinoDuck2.png"))]


JUMPING = pygame.image.load(os.path.join("Assets/Dino", "DinoJump.png"))

SMALL_CACTUS = [pygame.image.load(os.path.join("Assets/Cactus", "SmallCactus1.png")), 
                pygame.image.load(os.path.join("Assets/Cactus", "SmallCactus2.png")), 
                pygame.image.load(os.path.join("Assets/Cactus", "SmallCactus3.png"))]


LARGE_CACTUS = [pygame.image.load(os.path.join("Assets/Cactus", "LargeCactus1.png")), 
                pygame.image.load(os.path.join("Assets/Cactus", "LargeCactus2.png")), 
                pygame.image.load(os.path.join("Assets/Cactus", "LargeCactus3.png"))]

BIRD = [pygame.image.load(os.path.join("Assets/Bird", "Bird1.png")), pygame.image.load(os.path.join("Assets/Bird", "Bird2.png"))]

CLOUD = pygame.image.load(os.path.join("Assets/Other", "Cloud.png"))

BACKGROUND = pygame.image.load(os.path.join("Assets/Other", "Track.png"))

### Explicacion de Q-Learning breve

Q-Learning es un algoritmo de aprendizaje por refuerzo que el agente (en este caso, el dinosaurio en el juego) utiliza para aprender qué acciones tomar en diferentes situaciones o estados para maximizar su recompensa total.  Es un proceso iterativo donde el agente aprende gradualmente a tomar decisiones óptimas (acciones) para maximizar su recompensa total (puntuación en el juego) mediante la experimentación y el ajuste de las predicciones de los valores Q a través de una red neuronal.

1. **Valores Q (Quality):** Q-Learning se basa en una función de valor Q, que estima la "calidad" o utilidad total esperada de tomar una acción específica en un estado dado. Aqui, los valores Q indicarían cuán bueno es realizar acciones como saltar, agacharse o correr en un determinado momento del juego.

2. **Recompensas y Decisiones:** El objetivo del agente es maximizar su recompensa total. Cada vez que el agente toma una acción, recibe una recompensa (o penalización). Las recompensas se basan en la supervivencia del dinosaurio sin chocar con obstáculos.

3. **Actualización de Valores Q:** Los valores Q se actualizan a medida que el agente experimenta nuevas situaciones (estados y recompensas) y aprende de sus errores y éxitos.

### Implementación en el Proyecto

1. **Estados y Acciones:** En cada paso del juego, el agente observa el estado actual del juego (la posición del dinosaurio, los obstáculos próximos, etc.) y decide qué acción tomar (saltar, agacharse, correr). La red neuronal predice los valores Q para cada posible acción en ese estado.

2. **Memoria de Replay:** Cada experiencia (estado, acción, recompensa, nuevo estado) se almacena en una memoria de replay. Esto permite al agente aprender de experiencias pasadas.

3. **Entrenamiento del Modelo:** Se toman muestras de la memoria de replay para entrenar la red neuronal. Durante el entrenamiento, la red ajusta sus pesos para predecir con mayor precisión los valores Q basándose en las recompensas obtenidas y las predicciones del modelo objetivo.

4. **Modelo Objetivo:** Se utiliza un segundo modelo, el modelo objetivo, para calcular los valores Q objetivo durante el entrenamiento. Esto ayuda a estabilizar el aprendizaje, ya que proporciona una estimación los valores Q objetivo.

5. **Exploración vs. Explotación:** Al principio, el agente tiene más probabilidades de tomar acciones aleatorias para explorar el entorno (alta tasa de exploración `epsilon`). A medida que aprende, se vuelve más propenso a confiar en las predicciones de la red neuronal (explotación).

# Implementación de Q-Learning y Entrenamiento
 Aquí implementamos el algoritmo Q-Learning, un tipo de aprendizaje por refuerzo. Definimos el estado del juego, las posibles acciones y las recompensas asociadas. La red neuronal se actualiza continuamente en base a la memoria de replay y las recompensas obtenidas,  ajustando las decisiones del agente.
 

Los inputs son:
- Distancia del Dinosaurio del obstaculo
- Coordenada Y del dinosaurio
- Coordenada Y del Obstaculo
- Ancho del obstaculo
- Velocidad del juego
- El tipo de obstaculo (Cactus o Pterodactilo)

Los outputs son caminar, saltar o agacharte.


### Clase NeuralNetwork

Originalmente queriamos hacer el proyecto con TensorFlow, pero por limitaciones tecnicas nos limitamos al uso de PyTorch; por eso, en esta clase definimos una red neuronal simple para la toma de decisiones dentro del juego, algo que originalmente no era necesario. Se usa para predecir los valores Q, que representan el valor esperado de tomar una determinada acción en un estado dado del juego. 
 
- Constructor __init__(self)

    Hereda de nn.Module de PyTorch, que es la base para todas las redes neuronales en PyTorch. Define dos capas lineales (fc1 y fc2). La primera capa (fc1) toma 7 características de entrada y las transforma en 4 características, y la segunda capa (fc2) toma estas 4 características y produce 3 salidas. Estas salidas representarán los valores Q para cada posible acción en el juego.

- forward(self, x)

    Define cómo se pasa la entrada a través de la red. La función relu se aplica a la salida de la primera capa para introducir no linealidad, y luego se pasa a la segunda capa. El resultado es el conjunto de valores Q predichos para las acciones dadas el estado actual del juego.

In [6]:
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.fc1 = nn.Linear(7, 4)  # 7 input features, 4 output features
        self.fc2 = nn.Linear(4, 3)  # 4 input features, 3 output features

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x



### Clase DQNAgent


El `DQNAgent` utiliza esta red para aprender y decidir acciones basándose en el método Q-Learning. El agente entrena la red utilizando experiencias de juego anteriores almacenadas en la memoria de replay, ajustando sus estrategias para mejorar el rendimiento en el juego.

#### Constructor `__init__(self)`
- Creamos dos instancias de `NeuralNetwork`, una como el modelo principal y otra como el modelo objetivo. El modelo objetivo se actualiza ocasionalmente con los pesos del modelo principal.
- Usamos el optimizador Adam para ajustar los pesos de la red y Mean Squared Error (MSE) como la función de pérdida.
- Se inicializa dos memorias de replay (`init_replay_memory` y `late_replay_memory`) para almacenar experiencias pasadas y aprender de ellas.

#### update_replay_memory(self, transition)
- Agrega una nueva experiencia (transición) a la memoria de replay. La memoria de replay se utiliza para entrenar la red neuronal con experiencias pasadas.

#### get_qs(self, state)
- Calcula y devuelve los valores Q para un estado dado utilizando el modelo principal. Esto se utiliza para decidir qué acción tomar en un estado particular del juego.

#### train(self, terminal_state, step)
- Entrenamos la red neuronal usando un minibatch de experiencias de la memoria de replay. Utiliza tanto el modelo principal como el modelo objetivo para calcular el valor Q objetivo y ajustar los pesos del modelo principal.
- Si se alcanza un cierto umbral (indicado por `UPDATE_TARGET_THRESH`), se actualizan los pesos del modelo objetivo con los del modelo principal.

In [7]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") #Para poder usar GPU

class DQNAgent:
    def __init__(self,learning_rate=0.001):
        self.model = NeuralNetwork().to(device)  # Mover el modelo a la GPU si está disponible
        self.target_model = NeuralNetwork().to(device)  # Mover el modelo a la GPU si está disponible
        self.target_model.load_state_dict(self.model.state_dict())
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        self.loss_function = nn.MSELoss()

        self.init_replay_memory = deque(maxlen=INIT_REPLAY_MEM_SIZE)
        self.late_replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)
        self.target_update_counter = 0
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
    # Update the memory store
    def update_replay_memory(self, transition):
        # if len(self.replay_memory) > 50_000:
        #     self.replay_memory.clear()
        if len(self.init_replay_memory) < INIT_REPLAY_MEM_SIZE:
            self.init_replay_memory.append(transition)
        else:
            self.late_replay_memory.append(transition)

    # Método get_qs dentro de la clase DQNAgent
    def get_qs(self, state):
        state_tensor = torch.Tensor(state).to(device)  # Asegúrate de mover el tensor al dispositivo correcto
        with torch.no_grad():
            return self.model(state_tensor).cpu().numpy()  # Luego mueve el resultado de vuelta a la CPU si es necesario
    
    def calculate_action(self, state_queue, action_queue):
        while True:
            state = state_queue.get()  # Espera y obtiene el estado del juego
            if state is None:
                break  # Si recibes None, termina el hilo

            # Calcula la acción usando el modelo
            q_values = self.get_qs(state)
            action = np.argmax(q_values)  # Elige la acción con el Q-value más alto

            action_queue.put(action)  # Coloca la acción en la cola
            
    def train(self, terminal_state, step):
        if len(self.init_replay_memory) < MIN_REPLAY_MEMORY_SIZE:
            return

        total_mem = list(self.init_replay_memory)
        total_mem.extend(self.late_replay_memory)
        minibatch = random.sample(total_mem, MINIBATCH_SIZE)

        # Asegurarse de que los tensores estén en el dispositivo correcto
        current_states = torch.Tensor([transition[0] for transition in minibatch]).to(device)
        current_qs_list = self.model(current_states)
        new_current_states = torch.Tensor([transition[3] for transition in minibatch]).to(device)
        future_qs_list = self.target_model(new_current_states)

        X = []
        y = []

        for index, (current_state, action, reward, new_current_state, done) in enumerate(minibatch):
            if not done:
                max_future_q = torch.max(future_qs_list[index])
                new_q = reward + DISCOUNT * max_future_q
            else:
                new_q = reward

            current_qs = current_qs_list[index]
            current_qs[action] = new_q

            X.append(current_state)
            y.append(current_qs)

        X = torch.tensor(np.array(X, dtype=np.float32)).to(device)  # Mover X a la GPU
        y = torch.tensor(np.array([y_item.detach().cpu().numpy() if isinstance(y_item, torch.Tensor) else y_item for y_item in y], dtype=np.float32)).to(device)  # Mover y a la GPU

        self.optimizer.zero_grad()
        output = self.model(X)  # X ya está en el dispositivo correcto
        loss = self.loss_function(output, y)  # y ya está en el dispositivo correcto
        loss.backward()
        self.optimizer.step()

        if terminal_state:
            self.target_update_counter += 1

        if self.target_update_counter > UPDATE_TARGET_THRESH:
            self.target_model.load_state_dict(self.model.state_dict())
            self.target_update_counter = 0
            # print(self.target_update_counter)

# Definición de Clases del Juego
En esta parte del código, definimos varias clases para manejar diferentes aspectos del juego:
 - Clase Obstacle: Representa los obstáculos del juego.
 - Clase Dino: Controla el personaje del dinosaurio y su interacción con el juego.
 - Clase Game: Maneja la lógica principal del juego, incluyendo la creación de obstáculos y la actualización de la interfaz

In [8]:
class Obstacle:
    def __init__(self, image: List[pygame.Surface], type: int) -> None:
        self.image = image
        self.type = type
        self.rect = self.image[self.type].get_rect()
        self.rect.x = SCREEN_WIDTH

    def update(self, obstacles: list, game_speed: int):
        self.rect.x -= game_speed
        if self.rect.x < -self.rect.width:
            obstacles.pop()
        
    def draw(self, SCREEN: pygame.Surface):
        SCREEN.blit(self.image[self.type], self.rect)

In [9]:
class Dino(DQNAgent):
    X_POS = 80
    Y_POS = 310
    Y_DUCK_POS = 340
    JUMP_VEL = 8.5
    #code here
    def __init__(self, learning_rate=0.001):
        super().__init__(learning_rate=learning_rate) 
        self.duck_img = DUCKING
        self.run_img = RUNNING
        self.jump_img = JUMPING


        #Initially the dino starts running
        self.dino_duck = False
        self.dino_run = True
        self.dino_jump = False

        self.step_index = 0
        self.jump_vel = self.JUMP_VEL
        self.image = self.run_img[0]
        self.dino_rect = self.image.get_rect()

        self.dino_rect.x = self.X_POS
        self.dino_rect.y = self.Y_POS

        self.score = 0

        super().__init__()
    
    
    # Update the Dino's state
    def update(self, move: pygame.key.ScancodeWrapper):
        if self.dino_duck:
            self.duck()
        
        if self.dino_jump:
            self.jump()
        
        if self.dino_run:
            self.run()

        if self.step_index >= 20:
            self.step_index = 0
        

        if move[pygame.K_UP] and not self.dino_jump:
            self.dino_jump = True
            self.dino_run = False
            self.dino_duck = False

        elif move[pygame.K_DOWN] and not self.dino_jump:
            self.dino_duck = True
            self.dino_run = False
            self.dino_jump = False
        
        elif not(self.dino_jump or move[pygame.K_DOWN]):
            self.dino_run = True
            self.dino_jump = False
            self.dino_duck = False
    
    def update_auto(self, move):
        if self.dino_duck == True:
            self.duck()
        
        if self.dino_jump == True:
            self.jump()
        
        if self.dino_run == True:
            self.run()

        if self.step_index >= 20:
            self.step_index = 0
        
        if move == 0 and not self.dino_jump:
            self.dino_jump = True
            self.dino_run = False
            self.dino_duck = False

        elif move == 1 and not self.dino_jump:
            self.dino_duck = True
            self.dino_run = False
            self.dino_jump = False
        
        elif not(self.dino_jump or move == 1):
            self.dino_run = True
            self.dino_jump = False
            self.dino_duck = False

    def duck(self) -> None:
        self.image = self.duck_img[self.step_index // 10]
        self.dino_rect = self.image.get_rect()
        self.dino_rect.x = self.X_POS
        self.dino_rect.y = self.Y_DUCK_POS
        self.step_index += 1

    def run(self) -> None:
        self.image = self.run_img[self.step_index // 10]
        self.dino_rect = self.image.get_rect()
        self.dino_rect.x = self.X_POS
        self.dino_rect.y = self.Y_POS
        self.step_index += 1
        

    def jump(self) -> None:
        self.image = self.jump_img
        if self.dino_jump:
            self.dino_rect.y -= self.jump_vel * 3
            self.jump_vel -= 0.6
        
        if self.jump_vel < -self.JUMP_VEL:
            self.dino_jump = False
            self.dino_run = True
            self.jump_vel = self.JUMP_VEL

    def draw(self, SCREEN: pygame.Surface):
        SCREEN.blit(self.image, (self.dino_rect.x, self.dino_rect.y))

In [10]:
class LargeCactus(Obstacle):
    def __init__(self, image: List[pygame.Surface]) -> None:
        self.type = random.randint(0, 2)
        super().__init__(image, self.type)
        self.rect.y = 300


class SmallCactus(Obstacle):
    def __init__(self, image: List[pygame.Surface]) -> None:
        self.type = random.randint(0, 2)
        super().__init__(image, self.type)
        self.rect.y = 325

class Bird(Obstacle):
    def __init__(self, image: List[pygame.Surface]) -> None:
        self.type = 0
        super().__init__(image, self.type)
        self.rect.y = SCREEN_HEIGHT - 340
        self.index = 0
    
    def draw(self, SCREEN: pygame.Surface):
        if self.index >= 19:
            self.index = 0
        
        SCREEN.blit(self.image[self.index // 10], self.rect)
        self.index += 1
        
class Cloud:
    def __init__(self) -> None:
        self.x = SCREEN_WIDTH + random.randint(800, 1000)
        self.y = random.randint(50, 100)
        self.image = CLOUD
        self.width = self.image.get_width()

    def update(self, game_speed: int):
        self.x -= game_speed
        if self.x < -self.width:
            self.x = SCREEN_WIDTH + random.randint(800, 1000)
            self.y = random.randint(50, 100)
    

    def draw(self, SCREEN: pygame.Surface):
        SCREEN.blit(self.image, (self.x, self.y))   

#  Clase Game

La clase Game es donde se ejecuta el juego del dinosaurio. En general es casi lo mismo que la clase original, pero con play_auto siendo la parte donde se entrena al agente:


####  Inicialización y Configuración del Juego
- Se establece points_label para rastrear los puntos obtenidos en cada episodio y se inicia un ciclo que recorre un número predefinido de episodios (NUM_EPISODES).
- Para cada episodio, se inicializa episode_reward a 0 y se obtiene el estado inicial del juego mediante get_state.

#### Ciclo Principal del Juego

- Dentro de cada episodio, se inicia un while que continúa mientras self.run sea True. Este representa el juego continuo hasta que el dinosaurio choca con un obstáculo.
- Se verifica si es necesario crear un nuevo obstáculo en el juego y se llama a create_obstacle si es así.
- El agente decide qué acción tomar. Si un valor aleatorio es mayor que epsilon (que representa la probabilidad de exploración), el agente selecciona la acción basada en las predicciones de la red neuronal (usando get_qs). Si el valor es menor, se elige una acción aleatoria.

#### Actualización y Aprendizaje

- Se actualiza el juego con la acción seleccionada llamando a update_game, y se obtiene el nuevo estado del juego.
- Se calcula la recompensa basada en la interacción del dinosaurio con los obstáculos y se actualiza la memoria de replay del agente con la transición (estado actual, acción, recompensa, nuevo estado).
- Se entrena el modelo del agente con la nueva experiencia (transición).

#### Fin del Episodio y Guardado del Modelo

- Si el dinosaurio colisiona con un obstáculo, se termina el episodio. Se actualiza episode_reward con la recompensa obtenida, y se reinicia el juego para el próximo episodio.
- Se añade la reward del episodio a ep_rewards para rastrear el rendimiento a lo largo del tiempo. epsilon se reduce (decay) para disminuir la exploración a medida que el agente aprende.
- El modelo se guarda a intervalos regulares para conservar el estado de aprendizaje.

In [11]:
#Codigo para guardar los modelos
# Verifica si el directorio 'models' existe, y si no, créalo
if not os.path.exists('models/episodes'):
    os.makedirs('models/episodes')

# Verifica si el directorio 'models/highscore' existe, y si no, créalo
if not os.path.exists('models/highscore'):
    os.makedirs('models/highscore')

In [12]:
class Game:
    def __init__(self, epsilon, learning_rate, num_episodes, load_model=False, model_path=None):
        pygame.init()
        self.SCREEN = pygame.display.set_mode((SCREEN_WIDTH, SCREEN_HEIGHT))

        self.obstacles = []

        self.run = True

        self.clock = pygame.time.Clock()

        self.cloud = Cloud()

        self.game_speed = INIT_GAME_SPEED

        self.font = pygame.font.Font("freesansbold.ttf", 20)

        self.dino = Dino()

        # tu código existente aquí...
        self.epsilon = epsilon
        self.num_episodes = num_episodes
        self.dino = Dino(learning_rate=learning_rate)  # Pasa learning_rate aquí
        
         # Cargar el modelo si se solicita
        if load_model and model_path:
            self.dino.model.load_state_dict(torch.load(model_path, map_location=device))

        self.x_pos_bg = X_POS_BG_INIT

        self.points = 0
        
        self.epsilon = epsilon

        self.ep_rewards = [-200]

        self.high_score = 0  # Inicializa el high score con 0 o carga el high score existente de un archivo si lo prefieres

        self.best_score = 0

    def reset(self):
        self.game_speed = INIT_GAME_SPEED
        old_dino = self.dino
        self.dino = Dino()
        self.dino.init_replay_memory = old_dino.init_replay_memory
        self.dino.late_replay_memory = old_dino.late_replay_memory
        self.dino.target_update_counter = old_dino.target_update_counter

        self.dino.model.load_state_dict(old_dino.model.state_dict())
        self.dino.target_model.load_state_dict(old_dino.target_model.state_dict())

        self.x_pos_bg = X_POS_BG_INIT
        self.points = 0
        self.SCREEN = pygame.display.set_mode((SCREEN_WIDTH, SCREEN_HEIGHT))
        self.clock = pygame.time.Clock()

    def get_dist(self, pos_a: tuple, pos_b:tuple):
        dx = pos_a[0] - pos_b[0]
        dy = pos_a[1] - pos_b[1]

        return math.sqrt(dx**2 + dy**2) 

    def update_background(self):
        image_width = BACKGROUND.get_width()

        self.SCREEN.blit(BACKGROUND, (self.x_pos_bg, Y_POS_BG))
        self.SCREEN.blit(BACKGROUND, (self.x_pos_bg + image_width, Y_POS_BG))

        if self.x_pos_bg <= -image_width:
            self.SCREEN.blit(BACKGROUND, (self.x_pos_bg + image_width, Y_POS_BG))
            self.x_pos_bg = 0
        
        self.x_pos_bg -= self.game_speed
        return self.x_pos_bg
    
    def get_state(self):
        state = []
        state.append(self.dino.dino_rect.y / self.dino.Y_DUCK_POS + 10) 
        pos_a = (self.dino.dino_rect.x, self.dino.dino_rect.y)
        bird = 0
        cactus = 0
        if len(self.obstacles) == 0:
            dist = self.get_dist(pos_a, tuple([SCREEN_WIDTH + 10, self.dino.Y_POS])) / math.sqrt(SCREEN_HEIGHT**2 + SCREEN_WIDTH**2)
            obs_height = 0
            obj_width = 0
        else:
            dist = self.get_dist(pos_a, (self.obstacles[0].rect.midtop)) / math.sqrt(SCREEN_HEIGHT**2 + SCREEN_WIDTH**2)
            obs_height = self.obstacles[0].rect.midtop[1] / self.dino.Y_DUCK_POS
            obj_width = self.obstacles[0].rect.width / SMALL_CACTUS[2].get_rect().width
            if self.obstacles[0].__class__ == SmallCactus(SMALL_CACTUS).__class__ or \
                self.obstacles[0].__class__ == LargeCactus(LARGE_CACTUS).__class__:
                cactus = 1
            else:
                bird = 1
        
        state.append(dist)
        state.append(obs_height)
        state.append(self.game_speed / 24)
        state.append(obj_width)
        state.append(cactus)
        state.append(bird)
        
        return state


    def update_score(self):
        self.points += 1
        if self.points % 200 == 0:
            self.game_speed += 1

        if self.points > self.high_score:
            self.high_score = self.points

        text = self.font.render(f"Points: {self.points} Highscore: {self.high_score}", True, (0, 0, 0))
        textRect = text.get_rect()
        textRect.center = (SCREEN_WIDTH - textRect.width // 2 - 10, 40)
        self.SCREEN.blit(text, textRect)

    
    def create_obstacle(self):
        # bird_prob = random.randint(0, 15)
        # cactus_prob = random.randint(0, 10)
        # if bird_prob == 0:
        #     self.obstacles.append(Bird(BIRD))
        # elif cactus_prob == 0:
        #     self.obstacles.append(SmallCactus(SMALL_CACTUS))
        # elif cactus_prob == 1:
        #     self.obstacles.append(LargeCactus(LARGE_CACTUS))

        obstacle_prob = random.randint(0, 50)
        if obstacle_prob == 0:
            self.obstacles.append(SmallCactus(SMALL_CACTUS))
        elif obstacle_prob == 1:
            self.obstacles.append(LargeCactus(LARGE_CACTUS))
        elif obstacle_prob == 2 and self.points > 300:
            self.obstacles.append(Bird(BIRD))
    
    def update_game(self, moves, user_input=None):
        self.dino.draw(self.SCREEN)
        if user_input is not None:
            self.dino.update(user_input)
        else:
            self.dino.update_auto(moves)

        self.update_background()

        self.cloud.draw(self.SCREEN)

        self.cloud.update(self.game_speed)

        self.update_score() 

        self.clock.tick(30)

        # pygame.display.update()

    def play_manual(self):
        
        while self.run is True:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    sys.exit()
                
            self.SCREEN.fill((255, 255, 255))
            user_input = pygame.key.get_pressed()
            # moves = []

            if len(self.obstacles) == 0:
                self.create_obstacle()

            for obstacle in self.obstacles:
                obstacle.draw(SCREEN=self.SCREEN)
                obstacle.update(self.obstacles, self.game_speed)
                if self.dino.dino_rect.colliderect(obstacle.rect):
                    self.dino.score = self.points
                    pygame.quit()
                    self.obstacles.pop()
                    print("Game over!")
                    return

            self.update_game(user_input=user_input, moves=2)
            pygame.display.update()


    def play_auto(self):
        state_queue = queue.Queue()
        # Crea y comienza el hilo de cálculo
        calculation_thread = threading.Thread(target=self.dino.calculate_action, args=(state_queue, action_queue))
        calculation_thread.start()
        
        try:
            points_label = 0
            for episode in tqdm(range(1, NUM_EPISODES + 1), ascii=True, unit='episodes'):
                episode_reward = 0
                step = 1
                current_state = self.get_state()
                self.run = True
                while self.run is True:

                    for event in pygame.event.get():
                        if event.type == pygame.QUIT:
                            sys.exit()
                    
                    self.SCREEN.fill((255, 255, 255))

                    if len(self.obstacles) == 0:
                        self.create_obstacle()

                    # if self.run == False:
                    #     print(current_state)
                    #     time.sleep(2)
                    #     continue

                    if np.random.random() > self.epsilon:
                        action = self.dino.get_qs(torch.Tensor(current_state))
                        # print(action)
                        action = np.argmax(action)
                        # print(action)
                    else:
                        num = np.random.randint(0, 10)
                        if num == 0:
                            # print("yes")
                            action = num
                        elif num <= 3:
                            action = 1
                        else:
                            action = 2

                    self.update_game(moves=action)
                    # print(self.game_speed)
                    next_state = self.get_state()
                    reward = 0

                    for obstacle in self.obstacles:
                        obstacle.draw(SCREEN=self.SCREEN)
                        obstacle.update(self.obstacles, self.game_speed)
                        next_state = self.get_state()
                        if self.dino.dino_rect.x > obstacle.rect.x + obstacle.rect.width:
                            reward = 3
                        
                        if action == 0 and obstacle.rect.x > SCREEN_WIDTH // 2:
                            reward = -1
                        
                        if self.dino.dino_rect.colliderect(obstacle.rect):
                            self.dino.score = self.points
                            # pygame.quit()
                            self.obstacles.pop()
                            points_label = self.points
                            self.reset()
                            reward = -10
                            # print("Game over!")
                            self.run = False
                            break
                    current_state = self.get_state()
                    state_queue.put(current_state)  # Envía el estado actual al hilo de cálculo

                    if not action_queue.empty():
                        action = action_queue.get()  # Recibe la acción del hilo de cálculo
                        self.update_game(moves=action)

                    # if reward != 0:
                    #     print(reward > 0)
                    
                    episode_reward += reward
                    
                    self.dino.update_replay_memory(tuple([current_state, action, reward, next_state, self.run]))

                    self.dino.train( not self.run, step=step)

                    current_state = next_state

                    step += 1

                    # self.clock.tick(60)

                    #print(self.points)
                    #print(self.high_score)

                    # Al final de cada episodio, verifica si hay un nuevo mejor puntaje
                    if self.points > self.best_score:
                        self.best_score = self.points
                        # Este archivo se sobrescribirá con el último mejor modelo
                        self.best_model_filename = 'models/highscore/BestScore_model.pth'
                        torch.save(self.dino.model.state_dict(), self.best_model_filename)

                    pygame.display.update()
                

                self.ep_rewards.append(episode_reward)

                # Obtenemos la fecha y hora actual
                current_time = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')

                # Guardar el modelo cada 50 escenarios
                if episode % 50 == 0:
                    filename = f'models/episodes/{points_label}_Points,Episode_{episode}_Date_{current_time}_model.pth'
                    torch.save(self.dino.model.state_dict(), filename)


                if self.epsilon > MIN_EPSILON:
                    self.epsilon *= EPSILON_DECAY
                    if self.epsilon < MIN_EPSILON:
                        self.epsilon = 0
                        # print(self.epsilon)
                    else:
                        self.epsilon = max(MIN_EPSILON, self.epsilon)
                    # print(self.epsilon)
                    # print((self.dino.replay_memory))
                # Al salir del bucle del juego, envía None para detener el hilo de cálculo
                state_queue.put(None)
                calculation_thread.join()
        finally:
            # Este bloque se ejecutará incluso si se interrumpe el juego.
            # Aquí duplicas el archivo del mejor puntaje alcanzado hasta ahora.
            if hasattr(self, 'best_model_filename'):
                current_time = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
                final_model_filename = f'models/highscore/{self.best_score}_BestScore_Final_{current_time}_model.pth'
                import shutil
                shutil.copy(self.best_model_filename, final_model_filename)
                print(f"Modelo duplicado guardado como: {final_model_filename}")


El código tiene dos modos de juego:
 - Modo Manual: El usuario juega controlando al dinosaurio con el teclado.
 - Modo Automático: El agente controla al dinosaurio basándose en las decisiones tomadas por el modelo de Q-Learning.


In [13]:
def run_game(epsilon, learning_rate, num_episodes):
    model_path = 'models/highscore/4245_BestScore_Final_2023-12-10_18-43-53_model.pth'
    game = Game(epsilon, learning_rate=learning_rate, num_episodes=num_episodes, load_model=True, model_path=model_path)
    game.play_auto()


In [14]:
epsilons = [0.1, 0.2, 0.3]  # diferentes valores de epsilon que quieres probar
learning_rates = [0.001, 0.005, 0.01]  # diferentes tasas de aprendizaje
num_episodes_list = [100, 200, 300]  # diferentes números de episodios

for epsilon in epsilons:
    for lr in learning_rates:
        for num_episodes in num_episodes_list:
            run_game(epsilon, lr, num_episodes)

  0%|          | 0/100 [00:00<?, ?episodes/s]

100%|##########| 100/100 [22:16<00:00, 13.37s/episodes] 


Modelo duplicado guardado como: models/highscore/3836_BestScore_Final_2023-12-10_19-53-01_model.pth


100%|##########| 100/100 [26:16<00:00, 15.77s/episodes]


Modelo duplicado guardado como: models/highscore/3813_BestScore_Final_2023-12-10_20-19-18_model.pth


100%|##########| 100/100 [27:43<00:00, 16.64s/episodes]


Modelo duplicado guardado como: models/highscore/3683_BestScore_Final_2023-12-10_20-47-01_model.pth


100%|##########| 100/100 [30:23<00:00, 18.23s/episodes]


Modelo duplicado guardado como: models/highscore/3629_BestScore_Final_2023-12-10_21-17-25_model.pth


100%|##########| 100/100 [28:44<00:00, 17.24s/episodes] 


Modelo duplicado guardado como: models/highscore/3783_BestScore_Final_2023-12-10_21-46-09_model.pth


100%|##########| 100/100 [21:19<00:00, 12.79s/episodes]


Modelo duplicado guardado como: models/highscore/4157_BestScore_Final_2023-12-10_22-07-29_model.pth


 52%|#####2    | 52/100 [15:01<31:07, 38.90s/episodes]  