""" CarRacing-v3: 랜덤 에이전트 vs DQN 에이전트 비교 데모 Hugging Face Spaces (Gradio)용 앱 기능: 1. 사전학습 모델 데모 (dqn_carracing_300ep.pth 로드) 2. 직접 학습시키기 (에피소드 수 선택 → 실시간 학습 → 결과 비교) """ import gradio as gr import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim import numpy as np import gymnasium as gym import cv2 import random import copy import time import tempfile import os from collections import deque # ───────────────────────────────────────────── # 1. 모델 & 환경 정의 (5-1 노트북과 동일) # ───────────────────────────────────────────── device = torch.device("cuda" if torch.cuda.is_available() else "cpu") def preprocess_frame(frame): gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY) resized = cv2.resize(gray, (84, 84)) return resized.astype(np.float32) / 255.0 def discrete_to_continuous(action): action_map = { 0: np.array([-0.5, 0.1, 0.0]), # 좌회전 (가속을 0.1로 낮춤) 1: np.array([0.0, 0.3, 0.0]), # 직진 (가속을 0.3으로 낮춤) 2: np.array([0.5, 0.1, 0.0]), # 우회전 (가속을 0.1로 낮춤) 3: np.array([0.0, 0.0, 0.8]) # 브레이크 (유지) } return action_map.get(action, np.array([0.0, 0.0, 0.0])) class CarRacingWrapper: def __init__(self, render_mode=None): self.env = gym.make("CarRacing-v3", render_mode=render_mode) self.frames = deque(maxlen=4) def reset(self): obs, _ = self.env.reset() p = preprocess_frame(obs) for _ in range(4): self.frames.append(p) return np.array(list(self.frames)) def step(self, action): obs, r, term, trunc, info = self.env.step(discrete_to_continuous(action)) self.frames.append(preprocess_frame(obs)) return np.array(list(self.frames)), r, term, trunc, info def render(self): return self.env.render() def close(self): self.env.close() class DQN(nn.Module): def __init__(self, action_dim=4, input_channels=4): super().__init__() self.conv1 = nn.Conv2d(input_channels, 32, 8, 4) self.conv2 = nn.Conv2d(32, 64, 4, 2) self.conv3 = nn.Conv2d(64, 64, 3, 1) with torch.no_grad(): d = torch.zeros(1, input_channels, 84, 84) d = F.relu(self.conv1(d)) d = F.relu(self.conv2(d)) d = F.relu(self.conv3(d)) self._cs = d.view(1, -1).size(1) self.fc1 = nn.Linear(self._cs, 512) self.fc2 = nn.Linear(512, action_dim) def forward(self, x): x = F.relu(self.conv1(x)) x = F.relu(self.conv2(x)) x = F.relu(self.conv3(x)) x = x.view(x.size(0), -1) x = F.relu(self.fc1(x)) return self.fc2(x) # ───────────────────────────────────────────── # 2. ReplayBuffer & DQNAgent (5-1 노트북과 동일) # ───────────────────────────────────────────── class ReplayBuffer: def __init__(self, cap): self.buffer = deque(maxlen=cap) def push(self, s, a, r, ns, d): self.buffer.append((s, a, r, ns, d)) def sample(self, bs): batch = random.sample(self.buffer, bs) s, a, r, ns, d = zip(*batch) return (torch.FloatTensor(np.array(s)), torch.LongTensor(np.array(a)), torch.FloatTensor(np.array(r)), torch.FloatTensor(np.array(ns)), torch.BoolTensor(np.array(d))) def __len__(self): return len(self.buffer) class DQNAgent: def __init__(self, lr=0.0001, gamma=0.99, eps_start=1.0, eps_end=0.05, eps_decay=0.995, buf_size=10000, batch_size=32, target_update=1000): self.action_dim = 4 self.gamma = gamma self.batch_size = batch_size self.target_update_freq = target_update self.main_net = DQN(4).to(device) self.target_net = copy.deepcopy(self.main_net) self.optimizer = optim.Adam(self.main_net.parameters(), lr=lr) self.buffer = ReplayBuffer(buf_size) self.epsilon = eps_start self.eps_end = eps_end self.eps_decay = eps_decay self.step_count = 0 def select_action(self, state, training=True): if training and random.random() < self.epsilon: return random.randint(0, 3) with torch.no_grad(): st = torch.FloatTensor(state).unsqueeze(0).to(device) return self.main_net(st).argmax(1).item() def update(self): if len(self.buffer) < self.batch_size: return None s, a, r, ns, d = self.buffer.sample(self.batch_size) s, a, r, ns, d = s.to(device), a.to(device), r.to(device), ns.to(device), d.to(device) cq = self.main_net(s).gather(1, a.unsqueeze(1)).squeeze(1) with torch.no_grad(): tq = r + self.gamma * self.target_net(ns).max(1)[0] * (~d).float() loss = F.smooth_l1_loss(cq, tq) self.optimizer.zero_grad() loss.backward() torch.nn.utils.clip_grad_norm_(self.main_net.parameters(), 10.0) self.optimizer.step() self.step_count += 1 if self.step_count % self.target_update_freq == 0: self.target_net.load_state_dict(self.main_net.state_dict()) return loss.item() def decay_epsilon(self): self.epsilon = max(self.eps_end, self.epsilon * self.eps_decay) # ───────────────────────────────────────────── # 3. 사전학습 모델 로드 # ───────────────────────────────────────────── MODEL_PATH = "dqn_carracing_300ep.pth" pretrained_model = DQN(4).to(device) if os.path.exists(MODEL_PATH): try: pretrained_model.load_state_dict( torch.load(MODEL_PATH, map_location=device) ) pretrained_model.eval() MODEL_LOADED = True print(f"✅ 사전학습 모델 로드 완료: {MODEL_PATH}") except Exception as e: MODEL_LOADED = False print(f"❌ 모델 로드 실패: {e}") else: MODEL_LOADED = False print(f"⚠️ 모델 파일({MODEL_PATH})이 없습니다.") # ───────────────────────────────────────────── # 4. 영상 녹화 함수 # ───────────────────────────────────────────── def record_episode(model, use_model=True, max_steps=400): """에피소드 한 판을 녹화해서 mp4 경로와 총 보상을 반환""" env = CarRacingWrapper(render_mode="rgb_array") state = env.reset() frames = [] total_reward = 0.0 for step in range(max_steps): frame = env.render() if frame is not None: frames.append(frame) if use_model: with torch.no_grad(): st = torch.FloatTensor(state).unsqueeze(0).to(device) action = model(st).argmax(1).item() else: action = random.randint(0, 3) state, reward, term, trunc, _ = env.step(action) total_reward += reward if term or trunc: break env.close() if not frames: return None, 0.0 h, w, _ = frames[0].shape tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) fourcc = cv2.VideoWriter_fourcc(*"mp4v") writer = cv2.VideoWriter(tmp.name, fourcc, 30, (w, h)) for f in frames: writer.write(cv2.cvtColor(f, cv2.COLOR_RGB2BGR)) writer.release() return tmp.name, total_reward # ───────────────────────────────────────────── # 5. DQN 학습 함수 (5-1 노트북의 train_dqn 기반) # ───────────────────────────────────────────── def train_dqn(num_episodes, progress=gr.Progress()): """DQN을 처음부터 학습하고, 학습 과정과 결과 영상을 반환""" num_episodes = int(num_episodes) max_steps = 400 env = CarRacingWrapper(render_mode="rgb_array") agent = DQNAgent(eps_decay=0.99) episode_rewards = [] episode_losses = [] epsilons = [] start_time = time.time() for episode in range(num_episodes): progress((episode + 1) / num_episodes, desc=f"학습 중: 에피소드 {episode+1}/{num_episodes}") state = env.reset() ep_reward = 0 ep_losses = [] # [추가] 연속 감점(음수 보상)을 세는 카운터 negative_reward_count = 0 for step in range(max_steps): action = agent.select_action(state) next_state, reward, terminated, truncated, info = env.step(action) done = terminated or truncated # [추가] 트랙 이탈 방지 및 강제 종료 로직 if reward < 0: negative_reward_count += 1 else: negative_reward_count = 0 # 양수 보상을 받으면 카운터 초기화 # 50 프레임(약 1.5초~2초) 연속으로 감점만 받았다면 길을 잃은 것으로 간주 if negative_reward_count >= 50: done = True # 에피소드 강제 종료 reward -= 20.0 # 트랙을 이탈한 것에 대한 강력한 페널티 부여 agent.buffer.push(state, action, reward, next_state, done) loss = agent.update() if loss is not None: ep_losses.append(loss) state = next_state ep_reward += reward if done: break agent.decay_epsilon() episode_rewards.append(ep_reward) episode_losses.append(np.mean(ep_losses) if ep_losses else 0) epsilons.append(agent.epsilon) total_time = time.time() - start_time env.close() # 학습된 모델을 eval 모드로 전환 agent.main_net.eval() # 학습 완료 후 결과 영상 녹화 random_path, random_reward = record_episode(agent.main_net, use_model=False, max_steps=500) trained_path, trained_reward = record_episode(agent.main_net, use_model=True, max_steps=500) # 학습 로그 생성 log_lines = [] for i in range(len(episode_rewards)): if (i + 1) % 10 == 0 or i == 0 or i == len(episode_rewards) - 1: avg_r = np.mean(episode_rewards[max(0, i-9):i+1]) log_lines.append( f"에피소드 {i+1:>4}/{num_episodes} | " f"보상: {episode_rewards[i]:>7.1f} | " f"최근10 평균: {avg_r:>7.1f} | " f"ε: {epsilons[i]:.3f}" ) result_summary = ( f"=== 학습 완료 ({num_episodes} 에피소드, {total_time/60:.1f}분 소요) ===\n" f"최종 평균 보상 (마지막 10): {np.mean(episode_rewards[-10:]):.1f}\n" f"최고 보상: {np.max(episode_rewards):.1f}\n" f"최저 보상: {np.min(episode_rewards):.1f}\n" f"\n--- 데모 결과 ---\n" f"🎲 랜덤: {random_reward:.1f} vs 🧠 학습된 DQN: {trained_reward:.1f} " f"({'DQN 승리! 🏆' if trained_reward > random_reward else '랜덤이 이김 😅' if random_reward > trained_reward else '무승부'})" ) log_text = "\n".join(log_lines) return random_path, trained_path, result_summary, log_text # ───────────────────────────────────────────── # 6. 사전학습 모델 데모 핸들러 # ───────────────────────────────────────────── def run_pretrained_demo(): if not MODEL_LOADED: return None, None, "⚠️ 사전학습 모델 파일(dqn_carracing_300ep.pth)이 없습니다." random_path, random_reward = record_episode(pretrained_model, use_model=False, max_steps=400) trained_path, trained_reward = record_episode(pretrained_model, use_model=True, max_steps=400) info = ( f"🎲 랜덤: {random_reward:.1f} vs 🧠 사전학습 DQN: {trained_reward:.1f} " f"({'DQN 승리! 🏆' if trained_reward > random_reward else '랜덤이 이김 😅' if random_reward > trained_reward else '무승부'})" ) return random_path, trained_path, info # ───────────────────────────────────────────── # 7. Gradio UI # ───────────────────────────────────────────── with gr.Blocks( title="🏎️ CarRacing: Random vs DQN", theme=gr.themes.Soft(), ) as demo: gr.Markdown( """ # 🏎️ CarRacing-v3 : 랜덤 vs DQN 에이전트 강화학습(DQN)으로 학습한 자동차 에이전트와 랜덤 에이전트를 비교합니다. """ ) # ── 탭 1: 사전학습 모델 데모 ── with gr.Tab("📦 사전학습 모델 데모"): gr.Markdown( "### 미리 학습된 모델(dqn_carracing_300ep.pth)로 바로 비교\n" "이전에 학습하여 저장한 모델을 불러와 랜덤 에이전트와 비교합니다." ) btn_pretrained = gr.Button("🏁 사전학습 모델 실행", variant="primary", size="lg") with gr.Row(): vid_pre_r = gr.Video(label="🎲 랜덤 에이전트") vid_pre_t = gr.Video(label="🧠 사전학습 DQN 에이전트") txt_pre = gr.Textbox(label="비교 결과", interactive=False) btn_pretrained.click( fn=run_pretrained_demo, outputs=[vid_pre_r, vid_pre_t, txt_pre], ) # ── 탭 2: 직접 학습시키기 ── with gr.Tab("🎓 직접 학습시키기"): gr.Markdown( "### DQN을 처음부터 학습시키고 결과를 확인\n" "에피소드 수를 선택하면 해당 횟수만큼 **실제로 학습**한 후 랜덤 에이전트와 비교합니다.\n" "에피소드가 많을수록 성능이 좋아지지만 학습 시간도 늘어납니다.\n\n" "⏱️ **예상 소요 시간** (CPU 기준): 50 에피소드 ~5분 / 100 에피소드 ~10분 / 300 에피소드 ~30분" ) num_episodes = gr.Slider( 10, 500, value=50, step=10, label="학습 에피소드 수", info="DQN 학습에 사용할 에피소드 수 (많을수록 성능 향상, 시간 증가)" ) btn_train = gr.Button("🚀 학습 시작", variant="primary", size="lg") with gr.Row(): vid_train_r = gr.Video(label="🎲 랜덤 에이전트") vid_train_t = gr.Video(label="🧠 학습된 DQN 에이전트") txt_train_result = gr.Textbox(label="학습 결과 요약", interactive=False) txt_train_log = gr.Textbox(label="학습 로그 (10 에피소드마다)", interactive=False, lines=10, max_lines=20) btn_train.click( fn=train_dqn, inputs=[num_episodes], outputs=[vid_train_r, vid_train_t, txt_train_result, txt_train_log], ) # ── 하단 정보 ── gr.Markdown( """ --- **사용 방법** 1. **사전학습 모델 데모**: 미리 학습된 모델(dqn_carracing_300ep.pth)로 바로 결과를 확인합니다. 2. **직접 학습시키기**: 에피소드 수를 선택하고 DQN을 처음부터 학습시킵니다. - 에피소드 수가 많을수록 더 잘 학습됩니다. - 학습 완료 후 랜덤 에이전트와 비교 영상을 자동으로 생성합니다. **모델 파일**: `dqn_carracing_300ep.pth` (이전 학습 노트북에서 저장한 파일)를 이 Space에 함께 업로드하세요. """ ) # ───────────────────────────────────────────── # 8. 실행 # ───────────────────────────────────────────── if __name__ == "__main__": demo.launch()