CarRacing_Copy / app.py
JangTaeng's picture
Upload 6 files
14e8b91 verified
"""
CarRacing-v3: 랜덤 에이전트 vs DQN 에이전트 비교 데모
Hugging Face Spaces (Gradio)용 앱
기능:
1. 사전학습 모델 데모 (dqn_carracing_300ep.pth 로드)
2. 직접 학습시키기 (에피소드 수 선택 → 실시간 학습 → 결과 비교)
"""
import gradio as gr
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import gymnasium as gym
import cv2
import random
import copy
import time
import tempfile
import os
from collections import deque
# ─────────────────────────────────────────────
# 1. 모델 & 환경 정의 (5-1 노트북과 동일)
# ─────────────────────────────────────────────
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def preprocess_frame(frame):
gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
resized = cv2.resize(gray, (84, 84))
return resized.astype(np.float32) / 255.0
def discrete_to_continuous(action):
action_map = {
0: np.array([-0.5, 0.1, 0.0]), # 좌회전 (가속을 0.1로 낮춤)
1: np.array([0.0, 0.3, 0.0]), # 직진 (가속을 0.3으로 낮춤)
2: np.array([0.5, 0.1, 0.0]), # 우회전 (가속을 0.1로 낮춤)
3: np.array([0.0, 0.0, 0.8]) # 브레이크 (유지)
}
return action_map.get(action, np.array([0.0, 0.0, 0.0]))
class CarRacingWrapper:
def __init__(self, render_mode=None):
self.env = gym.make("CarRacing-v3", render_mode=render_mode)
self.frames = deque(maxlen=4)
def reset(self):
obs, _ = self.env.reset()
p = preprocess_frame(obs)
for _ in range(4):
self.frames.append(p)
return np.array(list(self.frames))
def step(self, action):
obs, r, term, trunc, info = self.env.step(discrete_to_continuous(action))
self.frames.append(preprocess_frame(obs))
return np.array(list(self.frames)), r, term, trunc, info
def render(self):
return self.env.render()
def close(self):
self.env.close()
class DQN(nn.Module):
def __init__(self, action_dim=4, input_channels=4):
super().__init__()
self.conv1 = nn.Conv2d(input_channels, 32, 8, 4)
self.conv2 = nn.Conv2d(32, 64, 4, 2)
self.conv3 = nn.Conv2d(64, 64, 3, 1)
with torch.no_grad():
d = torch.zeros(1, input_channels, 84, 84)
d = F.relu(self.conv1(d))
d = F.relu(self.conv2(d))
d = F.relu(self.conv3(d))
self._cs = d.view(1, -1).size(1)
self.fc1 = nn.Linear(self._cs, 512)
self.fc2 = nn.Linear(512, action_dim)
def forward(self, x):
x = F.relu(self.conv1(x))
x = F.relu(self.conv2(x))
x = F.relu(self.conv3(x))
x = x.view(x.size(0), -1)
x = F.relu(self.fc1(x))
return self.fc2(x)
# ─────────────────────────────────────────────
# 2. ReplayBuffer & DQNAgent (5-1 노트북과 동일)
# ─────────────────────────────────────────────
class ReplayBuffer:
def __init__(self, cap):
self.buffer = deque(maxlen=cap)
def push(self, s, a, r, ns, d):
self.buffer.append((s, a, r, ns, d))
def sample(self, bs):
batch = random.sample(self.buffer, bs)
s, a, r, ns, d = zip(*batch)
return (torch.FloatTensor(np.array(s)),
torch.LongTensor(np.array(a)),
torch.FloatTensor(np.array(r)),
torch.FloatTensor(np.array(ns)),
torch.BoolTensor(np.array(d)))
def __len__(self):
return len(self.buffer)
class DQNAgent:
def __init__(self, lr=0.0001, gamma=0.99, eps_start=1.0, eps_end=0.05,
eps_decay=0.995, buf_size=10000, batch_size=32, target_update=1000):
self.action_dim = 4
self.gamma = gamma
self.batch_size = batch_size
self.target_update_freq = target_update
self.main_net = DQN(4).to(device)
self.target_net = copy.deepcopy(self.main_net)
self.optimizer = optim.Adam(self.main_net.parameters(), lr=lr)
self.buffer = ReplayBuffer(buf_size)
self.epsilon = eps_start
self.eps_end = eps_end
self.eps_decay = eps_decay
self.step_count = 0
def select_action(self, state, training=True):
if training and random.random() < self.epsilon:
return random.randint(0, 3)
with torch.no_grad():
st = torch.FloatTensor(state).unsqueeze(0).to(device)
return self.main_net(st).argmax(1).item()
def update(self):
if len(self.buffer) < self.batch_size:
return None
s, a, r, ns, d = self.buffer.sample(self.batch_size)
s, a, r, ns, d = s.to(device), a.to(device), r.to(device), ns.to(device), d.to(device)
cq = self.main_net(s).gather(1, a.unsqueeze(1)).squeeze(1)
with torch.no_grad():
tq = r + self.gamma * self.target_net(ns).max(1)[0] * (~d).float()
loss = F.smooth_l1_loss(cq, tq)
self.optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(self.main_net.parameters(), 10.0)
self.optimizer.step()
self.step_count += 1
if self.step_count % self.target_update_freq == 0:
self.target_net.load_state_dict(self.main_net.state_dict())
return loss.item()
def decay_epsilon(self):
self.epsilon = max(self.eps_end, self.epsilon * self.eps_decay)
# ─────────────────────────────────────────────
# 3. 사전학습 모델 로드
# ─────────────────────────────────────────────
MODEL_PATH = "dqn_carracing_300ep.pth"
pretrained_model = DQN(4).to(device)
if os.path.exists(MODEL_PATH):
try:
pretrained_model.load_state_dict(
torch.load(MODEL_PATH, map_location=device)
)
pretrained_model.eval()
MODEL_LOADED = True
print(f"✅ 사전학습 모델 로드 완료: {MODEL_PATH}")
except Exception as e:
MODEL_LOADED = False
print(f"❌ 모델 로드 실패: {e}")
else:
MODEL_LOADED = False
print(f"⚠️ 모델 파일({MODEL_PATH})이 없습니다.")
# ─────────────────────────────────────────────
# 4. 영상 녹화 함수
# ─────────────────────────────────────────────
def record_episode(model, use_model=True, max_steps=400):
"""에피소드 한 판을 녹화해서 mp4 경로와 총 보상을 반환"""
env = CarRacingWrapper(render_mode="rgb_array")
state = env.reset()
frames = []
total_reward = 0.0
for step in range(max_steps):
frame = env.render()
if frame is not None:
frames.append(frame)
if use_model:
with torch.no_grad():
st = torch.FloatTensor(state).unsqueeze(0).to(device)
action = model(st).argmax(1).item()
else:
action = random.randint(0, 3)
state, reward, term, trunc, _ = env.step(action)
total_reward += reward
if term or trunc:
break
env.close()
if not frames:
return None, 0.0
h, w, _ = frames[0].shape
tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
writer = cv2.VideoWriter(tmp.name, fourcc, 30, (w, h))
for f in frames:
writer.write(cv2.cvtColor(f, cv2.COLOR_RGB2BGR))
writer.release()
return tmp.name, total_reward
# ─────────────────────────────────────────────
# 5. DQN 학습 함수 (5-1 노트북의 train_dqn 기반)
# ─────────────────────────────────────────────
def train_dqn(num_episodes, progress=gr.Progress()):
"""DQN을 처음부터 학습하고, 학습 과정과 결과 영상을 반환"""
num_episodes = int(num_episodes)
max_steps = 400
env = CarRacingWrapper(render_mode="rgb_array")
agent = DQNAgent(eps_decay=0.99)
episode_rewards = []
episode_losses = []
epsilons = []
start_time = time.time()
for episode in range(num_episodes):
progress((episode + 1) / num_episodes,
desc=f"학습 중: 에피소드 {episode+1}/{num_episodes}")
state = env.reset()
ep_reward = 0
ep_losses = []
# [추가] 연속 감점(음수 보상)을 세는 카운터
negative_reward_count = 0
for step in range(max_steps):
action = agent.select_action(state)
next_state, reward, terminated, truncated, info = env.step(action)
done = terminated or truncated
# [추가] 트랙 이탈 방지 및 강제 종료 로직
if reward < 0:
negative_reward_count += 1
else:
negative_reward_count = 0 # 양수 보상을 받으면 카운터 초기화
# 50 프레임(약 1.5초~2초) 연속으로 감점만 받았다면 길을 잃은 것으로 간주
if negative_reward_count >= 50:
done = True # 에피소드 강제 종료
reward -= 20.0 # 트랙을 이탈한 것에 대한 강력한 페널티 부여
agent.buffer.push(state, action, reward, next_state, done)
loss = agent.update()
if loss is not None:
ep_losses.append(loss)
state = next_state
ep_reward += reward
if done:
break
agent.decay_epsilon()
episode_rewards.append(ep_reward)
episode_losses.append(np.mean(ep_losses) if ep_losses else 0)
epsilons.append(agent.epsilon)
total_time = time.time() - start_time
env.close()
# 학습된 모델을 eval 모드로 전환
agent.main_net.eval()
# 학습 완료 후 결과 영상 녹화
random_path, random_reward = record_episode(agent.main_net, use_model=False, max_steps=500)
trained_path, trained_reward = record_episode(agent.main_net, use_model=True, max_steps=500)
# 학습 로그 생성
log_lines = []
for i in range(len(episode_rewards)):
if (i + 1) % 10 == 0 or i == 0 or i == len(episode_rewards) - 1:
avg_r = np.mean(episode_rewards[max(0, i-9):i+1])
log_lines.append(
f"에피소드 {i+1:>4}/{num_episodes} | "
f"보상: {episode_rewards[i]:>7.1f} | "
f"최근10 평균: {avg_r:>7.1f} | "
f"ε: {epsilons[i]:.3f}"
)
result_summary = (
f"=== 학습 완료 ({num_episodes} 에피소드, {total_time/60:.1f}분 소요) ===\n"
f"최종 평균 보상 (마지막 10): {np.mean(episode_rewards[-10:]):.1f}\n"
f"최고 보상: {np.max(episode_rewards):.1f}\n"
f"최저 보상: {np.min(episode_rewards):.1f}\n"
f"\n--- 데모 결과 ---\n"
f"🎲 랜덤: {random_reward:.1f} vs 🧠 학습된 DQN: {trained_reward:.1f} "
f"({'DQN 승리! 🏆' if trained_reward > random_reward else '랜덤이 이김 😅' if random_reward > trained_reward else '무승부'})"
)
log_text = "\n".join(log_lines)
return random_path, trained_path, result_summary, log_text
# ─────────────────────────────────────────────
# 6. 사전학습 모델 데모 핸들러
# ─────────────────────────────────────────────
def run_pretrained_demo():
if not MODEL_LOADED:
return None, None, "⚠️ 사전학습 모델 파일(dqn_carracing_300ep.pth)이 없습니다."
random_path, random_reward = record_episode(pretrained_model, use_model=False, max_steps=400)
trained_path, trained_reward = record_episode(pretrained_model, use_model=True, max_steps=400)
info = (
f"🎲 랜덤: {random_reward:.1f} vs 🧠 사전학습 DQN: {trained_reward:.1f} "
f"({'DQN 승리! 🏆' if trained_reward > random_reward else '랜덤이 이김 😅' if random_reward > trained_reward else '무승부'})"
)
return random_path, trained_path, info
# ─────────────────────────────────────────────
# 7. Gradio UI
# ─────────────────────────────────────────────
with gr.Blocks(
title="🏎️ CarRacing: Random vs DQN",
theme=gr.themes.Soft(),
) as demo:
gr.Markdown(
"""
# 🏎️ CarRacing-v3 : 랜덤 vs DQN 에이전트
강화학습(DQN)으로 학습한 자동차 에이전트와 랜덤 에이전트를 비교합니다.
"""
)
# ── 탭 1: 사전학습 모델 데모 ──
with gr.Tab("📦 사전학습 모델 데모"):
gr.Markdown(
"### 미리 학습된 모델(dqn_carracing_300ep.pth)로 바로 비교\n"
"이전에 학습하여 저장한 모델을 불러와 랜덤 에이전트와 비교합니다."
)
btn_pretrained = gr.Button("🏁 사전학습 모델 실행", variant="primary", size="lg")
with gr.Row():
vid_pre_r = gr.Video(label="🎲 랜덤 에이전트")
vid_pre_t = gr.Video(label="🧠 사전학습 DQN 에이전트")
txt_pre = gr.Textbox(label="비교 결과", interactive=False)
btn_pretrained.click(
fn=run_pretrained_demo,
outputs=[vid_pre_r, vid_pre_t, txt_pre],
)
# ── 탭 2: 직접 학습시키기 ──
with gr.Tab("🎓 직접 학습시키기"):
gr.Markdown(
"### DQN을 처음부터 학습시키고 결과를 확인\n"
"에피소드 수를 선택하면 해당 횟수만큼 **실제로 학습**한 후 랜덤 에이전트와 비교합니다.\n"
"에피소드가 많을수록 성능이 좋아지지만 학습 시간도 늘어납니다.\n\n"
"⏱️ **예상 소요 시간** (CPU 기준): 50 에피소드 ~5분 / 100 에피소드 ~10분 / 300 에피소드 ~30분"
)
num_episodes = gr.Slider(
10, 500, value=50, step=10,
label="학습 에피소드 수",
info="DQN 학습에 사용할 에피소드 수 (많을수록 성능 향상, 시간 증가)"
)
btn_train = gr.Button("🚀 학습 시작", variant="primary", size="lg")
with gr.Row():
vid_train_r = gr.Video(label="🎲 랜덤 에이전트")
vid_train_t = gr.Video(label="🧠 학습된 DQN 에이전트")
txt_train_result = gr.Textbox(label="학습 결과 요약", interactive=False)
txt_train_log = gr.Textbox(label="학습 로그 (10 에피소드마다)", interactive=False, lines=10, max_lines=20)
btn_train.click(
fn=train_dqn,
inputs=[num_episodes],
outputs=[vid_train_r, vid_train_t, txt_train_result, txt_train_log],
)
# ── 하단 정보 ──
gr.Markdown(
"""
---
**사용 방법**
1. **사전학습 모델 데모**: 미리 학습된 모델(dqn_carracing_300ep.pth)로 바로 결과를 확인합니다.
2. **직접 학습시키기**: 에피소드 수를 선택하고 DQN을 처음부터 학습시킵니다.
- 에피소드 수가 많을수록 더 잘 학습됩니다.
- 학습 완료 후 랜덤 에이전트와 비교 영상을 자동으로 생성합니다.
**모델 파일**: `dqn_carracing_300ep.pth` (이전 학습 노트북에서 저장한 파일)를 이 Space에 함께 업로드하세요.
"""
)
# ─────────────────────────────────────────────
# 8. 실행
# ─────────────────────────────────────────────
if __name__ == "__main__":
demo.launch()