import gradio as gr import numpy as np import torch import torch.nn as nn import os import json from pathlib import Path import random from typing import List, Tuple, Dict, Optional from datetime import datetime import threading import time # ============================================================================= # 1. WATER SORT ENVIRONMENT # ============================================================================= class WaterSortEnv: def __init__(self, num_colors=6, bottle_height=4, num_bottles=8): self.num_colors = num_colors self.bottle_height = bottle_height self.num_bottles = num_bottles self.bottles = np.zeros((num_bottles, bottle_height), dtype=int) self.move_history = [] self.state_history = [] # Lưu lịch sử các state self.game_started = False self.game_finished = False def reset(self) -> np.ndarray: """Reset game to solvable initial state""" colors = list(range(1, self.num_colors + 1)) * self.bottle_height random.shuffle(colors) self.bottles = np.zeros((self.num_bottles, self.bottle_height), dtype=int) color_idx = 0 for i in range(self.num_bottles - 2): for j in range(self.bottle_height): if color_idx < len(colors): self.bottles[i, self.bottle_height - 1 - j] = colors[color_idx] color_idx += 1 self.move_history = [] self.state_history = [self.bottles.copy()] # Lưu state ban đầu self.game_started = True self.game_finished = False return self.get_state() def get_state(self) -> np.ndarray: return self.bottles.copy() def get_valid_moves(self) -> List[Tuple[int, int]]: """Get all valid moves""" valid_moves = [] for from_idx in range(self.num_bottles): for to_idx in range(self.num_bottles): if from_idx != to_idx and self._is_valid_move(from_idx, to_idx): valid_moves.append((from_idx, to_idx)) return valid_moves def _is_valid_move(self, from_idx: int, to_idx: int) -> bool: """Check if move is valid""" from_bottle = self.bottles[from_idx] to_bottle = self.bottles[to_idx] if np.sum(from_bottle > 0) == 0: return False if np.sum(to_bottle > 0) == self.bottle_height: return False source_top_idx = np.where(from_bottle > 0)[0] if len(source_top_idx) == 0: return False source_top_color = from_bottle[source_top_idx[0]] dest_top_idx = np.where(to_bottle > 0)[0] if len(dest_top_idx) == 0: return True dest_top_color = to_bottle[dest_top_idx[0]] return source_top_color == dest_top_color def step(self, action: Tuple[int, int]): """Execute move""" from_idx, to_idx = action if not self._is_valid_move(from_idx, to_idx): return self.get_state(), -1, False self._pour_liquid(from_idx, to_idx) self.move_history.append((from_idx, to_idx)) self.state_history.append(self.bottles.copy()) # Lưu state sau mỗi nước done = self.is_solved() if done: self.game_finished = True reward = 10.0 if done else 0.1 return self.get_state(), reward, done def undo_last_move(self) -> bool: """Quay lại nước đi trước đó""" if len(self.state_history) <= 1: # Chỉ còn state ban đầu return False # Xóa state hiện tại self.state_history.pop() self.move_history.pop() # Khôi phục state trước đó self.bottles = self.state_history[-1].copy() self.game_finished = False return True def _pour_liquid(self, from_idx: int, to_idx: int): """Pour liquid from one bottle to another""" from_bottle = self.bottles[from_idx] to_bottle = self.bottles[to_idx] source_non_empty = np.where(from_bottle > 0)[0] if len(source_non_empty) == 0: return source_top_idx = source_non_empty[0] source_color = from_bottle[source_top_idx] pour_amount = 1 for i in range(source_top_idx + 1, len(from_bottle)): if from_bottle[i] == source_color: pour_amount += 1 else: break dest_empty = np.where(to_bottle == 0)[0] if len(dest_empty) == 0: return available_space = len(dest_empty) actual_pour = min(pour_amount, available_space) for i in range(actual_pour): from_pos = source_top_idx + i to_pos = dest_empty[-(i+1)] self.bottles[to_idx, to_pos] = source_color self.bottles[from_idx, from_pos] = 0 def is_solved(self) -> bool: """Check if puzzle is solved""" for bottle in self.bottles: unique_colors = np.unique(bottle[bottle > 0]) if len(unique_colors) > 1: return False if len(unique_colors) == 1 and np.sum(bottle > 0) != self.bottle_height and np.sum(bottle > 0) != 0: return False return True # ============================================================================= # 2. NEURAL NETWORK ARCHITECTURE # ============================================================================= class WaterSortNet(nn.Module): def __init__(self, num_bottles=8, bottle_height=4, num_colors=6): super(WaterSortNet, self).__init__() self.num_bottles = num_bottles self.bottle_height = bottle_height self.num_colors = num_colors self.conv1 = nn.Conv2d(num_colors, 128, kernel_size=3, padding=1) self.conv2 = nn.Conv2d(128, 128, kernel_size=3, padding=1) self.conv3 = nn.Conv2d(128, 128, kernel_size=3, padding=1) self.conv4 = nn.Conv2d(128, 128, kernel_size=3, padding=1) self.bn1 = nn.BatchNorm2d(128) self.bn2 = nn.BatchNorm2d(128) self.bn3 = nn.BatchNorm2d(128) self.bn4 = nn.BatchNorm2d(128) self.policy_conv = nn.Conv2d(128, 64, kernel_size=3, padding=1) self.policy_fc1 = nn.Linear(64 * num_bottles * bottle_height, 512) self.policy_fc2 = nn.Linear(512, num_bottles * num_bottles) self.policy_bn = nn.BatchNorm1d(512) self.value_conv = nn.Conv2d(128, 64, kernel_size=3, padding=1) self.value_fc1 = nn.Linear(64 * num_bottles * bottle_height, 512) self.value_fc2 = nn.Linear(512, 256) self.value_fc3 = nn.Linear(256, 1) self.value_bn1 = nn.BatchNorm1d(512) self.value_bn2 = nn.BatchNorm1d(256) self.relu = nn.ReLU() self.dropout = nn.Dropout(0.3) def forward(self, x): batch_size = x.size(0) x = self.relu(self.bn1(self.conv1(x))) x = self.relu(self.bn2(self.conv2(x))) x = self.relu(self.bn3(self.conv3(x))) x = self.relu(self.bn4(self.conv4(x))) policy = self.relu(self.policy_conv(x)) policy = policy.view(batch_size, -1) policy = self.dropout(self.relu(self.policy_bn(self.policy_fc1(policy)))) policy = self.policy_fc2(policy) value = self.relu(self.value_conv(x)) value = value.view(batch_size, -1) value = self.dropout(self.relu(self.value_bn1(self.value_fc1(value)))) value = self.dropout(self.relu(self.value_bn2(self.value_fc2(value)))) value = torch.tanh(self.value_fc3(value)) return policy, value # ============================================================================= # 3. DATA PROCESSOR # ============================================================================= class DataProcessor: def __init__(self, num_bottles=8, bottle_height=4, num_colors=6): self.num_bottles = num_bottles self.bottle_height = bottle_height self.num_colors = num_colors def state_to_tensor(self, state): """Chuyển state thành one-hot encoded tensor""" one_hot = np.zeros((self.num_colors, self.num_bottles, self.bottle_height), dtype=np.float32) for bottle_idx in range(self.num_bottles): for height_idx in range(self.bottle_height): color = int(state[bottle_idx, height_idx]) if color > 0: one_hot[color - 1, bottle_idx, height_idx] = 1.0 return torch.from_numpy(one_hot) # ============================================================================= # 4. GAME STATE MANAGER # ============================================================================= class GameStateManager: def __init__(self): self.env = WaterSortEnv() self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") self.model = None self.processor = DataProcessor() self.current_model_name = None self.ai_running = False self.ai_thread = None self.ai_delay = 1.0 # Delay giữa các nước (giây) self.selected_bottles = None self.game_stats = { 'moves_count': 0, 'start_time': None, 'ai_suggested_move': None, 'valid_moves': [] } def load_model(self, model_path: str) -> bool: """Load model từ file""" try: self.model = WaterSortNet(num_bottles=8, bottle_height=4, num_colors=6).to(self.device) checkpoint = torch.load(model_path, map_location=self.device) if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint: self.model.load_state_dict(checkpoint['model_state_dict']) else: self.model.load_state_dict(checkpoint) self.model.eval() self.current_model_name = Path(model_path).stem return True except Exception as e: print(f"Error loading model: {e}") return False def start_game(self): """Bắt đầu game mới""" self.env.reset() self.game_stats = { 'moves_count': 0, 'start_time': datetime.now(), 'ai_suggested_move': None, 'valid_moves': self.env.get_valid_moves() } self.selected_bottles = None self.ai_running = False return self.env.get_state() def reset_game(self): """Reset game""" self.ai_running = False self.env.game_finished = False return self.start_game() def undo_move(self) -> Tuple[bool, str]: """Quay lại nước đi trước""" if self.ai_running: return False, "❌ Không thể undo khi AI đang chơi!" success = self.env.undo_last_move() if success: self.game_stats['moves_count'] = len(self.env.move_history) self.game_stats['valid_moves'] = self.env.get_valid_moves() return True, f"✅ Đã quay lại! Tổng bước: {self.game_stats['moves_count']}" else: return False, "❌ Không thể quay lại thêm!" def get_next_move_suggestion(self) -> Optional[Tuple[int, int]]: """Lấy gợi ý từ AI""" if self.model is None: return None try: state = self.env.get_state() valid_moves = self.env.get_valid_moves() if not valid_moves: return None state_tensor = self.processor.state_to_tensor(state).unsqueeze(0).to(self.device) with torch.no_grad(): policy, _ = self.model(state_tensor) policy_probs = torch.softmax(policy, dim=1).cpu().numpy()[0] best_move = None best_score = -float('inf') for move in valid_moves: from_idx, to_idx = move move_index = from_idx * 8 + to_idx score = policy_probs[move_index] if score > best_score: best_score = score best_move = move self.game_stats['ai_suggested_move'] = best_move return best_move except Exception as e: print(f"Error getting suggestion: {e}") return None def make_move(self, from_bottle: int, to_bottle: int) -> Tuple[bool, str]: """Thực hiện di chuyển""" state, reward, done = self.env.step((from_bottle, to_bottle)) if reward < 0: return False, "Nước không thể đổ vào chai này!" self.game_stats['moves_count'] += 1 self.game_stats['valid_moves'] = self.env.get_valid_moves() self.selected_bottles = None if done: return True, f"Chúc mừng! Bạn đã giải xong trong {self.game_stats['moves_count']} bước!" return True, f"Bước thành công! Tổng bước: {self.game_stats['moves_count']}" def select_bottle(self, bottle_idx: int) -> str: """Chọn chai""" if self.selected_bottles is None: self.selected_bottles = bottle_idx return f"Đã chọn chai {bottle_idx}. Chọn chai đích." else: from_bottle = self.selected_bottles to_bottle = bottle_idx success, message = self.make_move(from_bottle, to_bottle) return message def start_ai_autoplay(self): """Bắt đầu AI tự động chơi""" if self.ai_running: return False, "AI đã đang chạy!" if self.model is None: return False, "Vui lòng tải model trước!" if self.env.game_finished: return False, "Game đã kết thúc!" self.ai_running = True return True, "✅ AI bắt đầu tự động chơi!" def stop_ai_autoplay(self): """Dừng AI tự động chơi""" self.ai_running = False return "⏸️ AI đã dừng!" def set_ai_speed(self, delay: float): """Đặt tốc độ AI (delay giữa các nước)""" self.ai_delay = delay # ============================================================================= # 5. VISUALIZATION # ============================================================================= def draw_game_board(state: np.ndarray, selected_bottle: Optional[int] = None, last_move: Optional[Tuple[int, int]] = None) -> str: """Vẽ bảng game dưới dạng HTML""" colors_map = { 0: '#ffffff', 1: '#FF6B6B', 2: '#4ECDC4', 3: '#45B7D1', 4: '#FFA07A', 5: '#98D8C8', 6: '#F7DC6F' } html = '
' num_bottles = state.shape[0] bottle_height = state.shape[1] for bottle_idx in range(num_bottles): bottle = state[bottle_idx] is_selected = bottle_idx == selected_bottle # Highlight nếu là nước vừa chơi is_last_move = False if last_move: is_last_move = bottle_idx in last_move border_color = '#FFD700' if is_selected else ('#00FF00' if is_last_move else '#333') border_width = '3' if (is_selected or is_last_move) else '2' html += f'
' html += f'
' for height_idx in range(bottle_height): color_val = int(bottle[height_idx]) color = colors_map.get(color_val, '#ffffff') html += f'
' html += '
' html += f'

Chai {bottle_idx}

' html += '
' html += '
' return html def get_game_stats_html(game_manager: GameStateManager) -> str: """Tạo HTML hiển thị thống kê game""" stats = game_manager.game_stats model_info = game_manager.current_model_name if game_manager.current_model_name else "Chưa tải" ai_status = "🤖 Đang chơi..." if game_manager.ai_running else "⏸️ Dừng" history_count = len(game_manager.env.move_history) html = f"""

📊 Số bước

{stats['moves_count']}

🤖 Model

{model_info}

💾 Thiết bị

{'GPU' if torch.cuda.is_available() else 'CPU'}

📜 Lịch sử

{history_count} nước đi

🎮 Trạng thái AI

{ai_status}

Nước hợp lệ: {len(stats['valid_moves'])} nước

""" return html def get_move_history_html(game_manager: GameStateManager) -> str: """Hiển thị lịch sử các nước đi""" moves = game_manager.env.move_history if not moves: return "

Chưa có nước đi nào

" html = "
" html += "" html += "" html += "" html += "" html += "" html += "" for i, (from_idx, to_idx) in enumerate(moves, 1): html += f"" html += f"" html += f"" html += f"" html += "" html += "
BướcTừ chaiĐến chai
{i}{from_idx}{to_idx}
" return html # ============================================================================= # 6. MAIN GRADIO APP # ============================================================================= def get_available_models() -> List[str]: """Lấy danh sách model từ folder models""" models_dir = Path("models") models_dir.mkdir(exist_ok=True) model_files = list(models_dir.glob("*.pth")) return [f.name for f in sorted(model_files)] # Initialize global game manager game_manager = GameStateManager() def load_model_ui(selected_model: str) -> Tuple[str, str]: """Load model từ UI""" if not selected_model: return "❌ Vui lòng chọn model", "" model_path = Path("models") / selected_model if game_manager.load_model(str(model_path)): return f"✅ Tải model thành công: {selected_model}", draw_game_board(np.zeros((8, 4))) else: return f"❌ Lỗi khi tải model: {selected_model}", "" def start_game_ui() -> Tuple[str, str, str, str]: """Bắt đầu game mới""" state = game_manager.start_game() board = draw_game_board(state) stats = get_game_stats_html(game_manager) history = get_move_history_html(game_manager) return stats, board, "✅ Bắt đầu game mới!", history def reset_game_ui() -> Tuple[str, str, str, str]: """Reset game""" state = game_manager.reset_game() board = draw_game_board(state) stats = get_game_stats_html(game_manager) history = get_move_history_html(game_manager) return stats, board, "🔄 Game đã được reset!", history def suggest_move_ui() -> Tuple[str, str]: """Gợi ý di chuyển từ AI""" if game_manager.model is None: return "", "❌ Vui lòng tải model trước!" if game_manager.env.game_finished: return "", "🎉 Game đã kết thúc!" move = game_manager.get_next_move_suggestion() if move: from_bottle, to_bottle = move message = f"💡 Gợi ý: Đổ từ chai {from_bottle} sang chai {to_bottle}" return message, message else: return "", "❌ Không thể tìm được nước gợi ý" def bottle_click_ui(bottle_idx: int) -> Tuple[str, str, str, str]: """Xử lý click bottle""" if not game_manager.env.game_started: return "", draw_game_board(game_manager.env.get_state()), "❌ Vui lòng bắt đầu game!", "" if game_manager.env.game_finished: return "", draw_game_board(game_manager.env.get_state()), "🎉 Game đã kết thúc!", "" if game_manager.ai_running: return "", draw_game_board(game_manager.env.get_state()), "❌ AI đang chơi, không thể thao tác!", "" if game_manager.selected_bottles is None: game_manager.selected_bottles = bottle_idx state = game_manager.env.get_state() board = draw_game_board(state, bottle_idx) stats = get_game_stats_html(game_manager) history = get_move_history_html(game_manager) return stats, board, f"✓ Chọn chai {bottle_idx}. Chọn chai đích.", history else: from_bottle = game_manager.selected_bottles to_bottle = bottle_idx success, message = game_manager.make_move(from_bottle, to_bottle) state = game_manager.env.get_state() last_move = (from_bottle, to_bottle) if success else None board = draw_game_board(state, last_move=last_move) stats = get_game_stats_html(game_manager) history = get_move_history_html(game_manager) return stats, board, message, history def undo_move_ui() -> Tuple[str, str, str, str]: """Undo nước đi trước""" success, message = game_manager.undo_move() state = game_manager.env.get_state() board = draw_game_board(state) stats = get_game_stats_html(game_manager) history = get_move_history_html(game_manager) return stats, board, message, history def ai_autoplay_ui() -> Tuple[str, str, str, str]: """Bắt đầu AI tự động chơi""" success, message = game_manager.start_ai_autoplay() state = game_manager.env.get_state() board = draw_game_board(state) stats = get_game_stats_html(game_manager) history = get_move_history_html(game_manager) return stats, board, message, history def stop_ai_ui() -> Tuple[str, str, str, str]: """Dừng AI""" message = game_manager.stop_ai_autoplay() state = game_manager.env.get_state() board = draw_game_board(state) stats = get_game_stats_html(game_manager) history = get_move_history_html(game_manager) return stats, board, message, history def ai_step_loop(): """Vòng lặp AI tự động chơi (chạy trong background)""" while game_manager.ai_running: if game_manager.env.game_finished: game_manager.ai_running = False break # Lấy nước đi từ AI move = game_manager.get_next_move_suggestion() if move is None: game_manager.ai_running = False break from_bottle, to_bottle = move success, message = game_manager.make_move(from_bottle, to_bottle) if not success: game_manager.ai_running = False break # Delay để người xem thấy được time.sleep(game_manager.ai_delay) game_manager.ai_running = False def update_ai_speed_ui(speed_value: float) -> str: """Cập nhật tốc độ AI""" game_manager.set_ai_speed(speed_value) return f"⚡ Tốc độ AI: {speed_value:.1f}s/bước" def get_current_state(): """Lấy trạng thái hiện tại để refresh UI""" state = game_manager.env.get_state() # Highlight nước vừa chơi nếu có last_move = None if len(game_manager.env.move_history) > 0: last_move = game_manager.env.move_history[-1] board = draw_game_board(state, last_move=last_move) stats = get_game_stats_html(game_manager) history = get_move_history_html(game_manager) if game_manager.env.game_finished: message = f"🎉 Hoàn thành trong {game_manager.game_stats['moves_count']} bước!" else: message = f"📊 Bước hiện tại: {game_manager.game_stats['moves_count']}" return stats, board, message, history def create_bottle_buttons(): """Tạo buttons cho các chai""" buttons = [] for i in range(8): buttons.append( gr.Button(f"Chai {i}", size="lg", scale=1) ) return buttons # Create Gradio Interface with gr.Blocks(title="Water Sort Puzzle", theme=gr.themes.Soft()) as demo: gr.Markdown("# 🧪 Water Sort Puzzle Solver") gr.Markdown("Giải Water Sort Puzzle với sự trợ giúp của AI!") with gr.Row(): with gr.Column(scale=1): gr.Markdown("### ⚙️ Cấu hình") model_dropdown = gr.Dropdown( label="Chọn Model", choices=get_available_models(), interactive=True ) load_model_btn = gr.Button("📥 Tải Model", variant="primary", size="lg") model_status = gr.Textbox(label="Trạng thái", interactive=False) gr.Markdown("### 🎮 Điều khiển") start_btn = gr.Button("🎮 Bắt đầu", variant="primary", size="lg") reset_btn = gr.Button("🔄 Reset", size="lg") with gr.Row(): undo_btn = gr.Button("↩️ Undo", size="lg", scale=1) suggest_btn = gr.Button("💡 Gợi ý", size="lg", scale=1) gr.Markdown("### 🤖 AI Auto Play") ai_speed_slider = gr.Slider( minimum=0.1, maximum=3.0, value=1.0, step=0.1, label="Tốc độ AI (giây/bước)", interactive=True ) speed_status = gr.Textbox(label="Trạng thái tốc độ", value="⚡ Tốc độ AI: 1.0s/bước", interactive=False) with gr.Row(): ai_play_btn = gr.Button("▶️ AI Tự Chơi", variant="primary", size="lg", scale=1) ai_stop_btn = gr.Button("⏸️ Dừng AI", variant="stop", size="lg", scale=1) gr.Markdown("### 📊 Thống kê") game_stats = gr.HTML() gr.Markdown("### 📜 Lịch sử nước đi") move_history = gr.HTML() with gr.Column(scale=2): gr.Markdown("### 🎯 Bảng trò chơi") game_board = gr.HTML() gr.Markdown("### Chọn chai để di chuyển") with gr.Row(): bottle_buttons = create_bottle_buttons() message_display = gr.Textbox( label="Thông báo", interactive=False, lines=2 ) suggestion_display = gr.Textbox( label="Gợi ý từ AI", interactive=False ) # Thêm nút refresh để cập nhật UI khi AI đang chạy with gr.Row(): refresh_btn = gr.Button("🔄 Refresh", size="sm") # Event handlers load_model_btn.click( fn=load_model_ui, inputs=[model_dropdown], outputs=[model_status, game_board] ) start_btn.click( fn=start_game_ui, outputs=[game_stats, game_board, message_display, move_history] ) reset_btn.click( fn=reset_game_ui, outputs=[game_stats, game_board, message_display, move_history] ) suggest_btn.click( fn=suggest_move_ui, outputs=[suggestion_display, message_display] ) undo_btn.click( fn=undo_move_ui, outputs=[game_stats, game_board, message_display, move_history] ) ai_speed_slider.change( fn=update_ai_speed_ui, inputs=[ai_speed_slider], outputs=[speed_status] ) ai_play_btn.click( fn=ai_autoplay_ui, outputs=[game_stats, game_board, message_display, move_history] ) ai_stop_btn.click( fn=stop_ai_ui, outputs=[game_stats, game_board, message_display, move_history] ) refresh_btn.click( fn=get_current_state, outputs=[game_stats, game_board, message_display, move_history] ) for i, btn in enumerate(bottle_buttons): btn.click( fn=lambda idx=i: bottle_click_ui(idx), outputs=[game_stats, game_board, message_display, move_history] ) # Timer để tự động refresh khi AI đang chạy demo.load( fn=get_current_state, outputs=[game_stats, game_board, message_display, move_history], every=1 # Refresh mỗi 1 giây ) if __name__ == "__main__": # Khởi chạy AI thread def run_ai_background(): while True: if game_manager.ai_running: ai_step_loop() time.sleep(0.1) ai_thread = threading.Thread(target=run_ai_background, daemon=True) ai_thread.start() demo.launch(share=True)