import json
import os
import random
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import gradio as gr
import numpy as np
import torch
import torch.nn as nn

# =============================================================================
# 1. WATER SORT ENVIRONMENT
# =============================================================================


class WaterSortEnv:
    """Water-sort puzzle board with move validation, pouring and undo.

    The board is an integer array of shape ``(num_bottles, bottle_height)``.
    ``0`` marks an empty slot and ``1..num_colors`` are liquid colors.
    Within a bottle, index ``0`` is the top slot and index
    ``bottle_height - 1`` is the bottom slot.
    """

    def __init__(self, num_colors=6, bottle_height=4, num_bottles=8):
        self.num_colors = num_colors
        self.bottle_height = bottle_height
        self.num_bottles = num_bottles
        self.bottles = np.zeros((num_bottles, bottle_height), dtype=int)
        self.move_history = []
        self.state_history = []  # board snapshot for every position reached
        self.game_started = False
        self.game_finished = False

    def reset(self) -> np.ndarray:
        """Deal a fresh random board and return a copy of it.

        Every color contributes ``bottle_height`` units. The shuffled units
        fill the first ``num_bottles - 2`` bottles from the bottom up, and the
        last two bottles start empty. The deal is purely random: solvability
        is not checked, and units that do not fit are dropped.
        """
        colors = list(range(1, self.num_colors + 1)) * self.bottle_height
        random.shuffle(colors)

        self.bottles = np.zeros((self.num_bottles, self.bottle_height), dtype=int)
        units = iter(colors)
        for bottle_idx in range(self.num_bottles - 2):
            # Fill from the bottom slot towards the top slot.
            for slot in range(self.bottle_height - 1, -1, -1):
                color = next(units, None)
                if color is None:
                    break
                self.bottles[bottle_idx, slot] = color

        self.move_history = []
        self.state_history = [self.bottles.copy()]  # keep the initial position
        self.game_started = True
        self.game_finished = False
        return self.get_state()

    def get_state(self) -> np.ndarray:
        """Return a copy of the board so callers cannot mutate it in place."""
        return self.bottles.copy()

    def get_valid_moves(self) -> List[Tuple[int, int]]:
        """Return every ``(from_idx, to_idx)`` pair that is currently legal."""
        return [
            (from_idx, to_idx)
            for from_idx in range(self.num_bottles)
            for to_idx in range(self.num_bottles)
            if self._is_valid_move(from_idx, to_idx)
        ]

    def _is_valid_move(self, from_idx: int, to_idx: int) -> bool:
        """Return True when pouring ``from_idx`` into ``to_idx`` is legal.

        A move is legal when both indices are distinct and on the board, the
        source holds liquid, the destination has a free slot, and the
        destination is either empty or topped with the source's top color.
        """
        # Pouring a bottle into itself would be accepted by the color check
        # below and then leave liquid floating above a gap, so reject it here.
        if from_idx == to_idx:
            return False
        # Out-of-range indices are invalid moves rather than IndexErrors or
        # silent wrap-around from negative indexing.
        if not (0 <= from_idx < self.num_bottles and 0 <= to_idx < self.num_bottles):
            return False

        source = self.bottles[from_idx]
        target = self.bottles[to_idx]

        source_filled = np.flatnonzero(source > 0)
        if source_filled.size == 0:
            return False

        target_filled = np.flatnonzero(target > 0)
        if target_filled.size == self.bottle_height:
            return False
        if target_filled.size == 0:
            return True

        return bool(source[source_filled[0]] == target[target_filled[0]])

    def step(self, action: Tuple[int, int]):
        """Apply ``action`` and return ``(state, reward, done)``.

        An illegal action leaves the board untouched and yields reward ``-1``.
        A legal action is recorded in both histories and yields ``10.0`` when
        it solves the puzzle, ``0.1`` otherwise.
        """
        from_idx, to_idx = action

        if not self._is_valid_move(from_idx, to_idx):
            return self.get_state(), -1, False

        self._pour_liquid(from_idx, to_idx)
        self.move_history.append((from_idx, to_idx))
        self.state_history.append(self.bottles.copy())  # snapshot after the move

        done = self.is_solved()
        if done:
            self.game_finished = True

        reward = 10.0 if done else 0.1
        return self.get_state(), reward, done

    def undo_last_move(self) -> bool:
        """Revert the most recent move; return False if none is left to undo."""
        if len(self.state_history) <= 1:  # only the initial position remains
            return False

        # Drop the current position and the move that produced it.
        self.state_history.pop()
        self.move_history.pop()

        # Restore the previous position.
        self.bottles = self.state_history[-1].copy()
        self.game_finished = False
        return True

    def _pour_liquid(self, from_idx: int, to_idx: int):
        """Move the top run of one color from ``from_idx`` into ``to_idx``.

        At most as many units are moved as the destination has empty slots.
        The caller is expected to have validated the move already.
        """
        source = self.bottles[from_idx]
        target = self.bottles[to_idx]

        filled = np.flatnonzero(source > 0)
        if filled.size == 0:
            return

        top = filled[0]
        color = source[top]

        # Length of the contiguous same-color run starting at the top unit.
        run_length = 1
        for slot in range(top + 1, len(source)):
            if source[slot] != color:
                break
            run_length += 1

        empty_slots = np.flatnonzero(target == 0)
        if empty_slots.size == 0:
            return

        for offset in range(min(run_length, empty_slots.size)):
            # Fill the destination from its deepest empty slot upwards.
            self.bottles[to_idx, empty_slots[-(offset + 1)]] = color
            self.bottles[from_idx, top + offset] = 0

    def is_solved(self) -> bool:
        """Return True when every bottle is empty or full of a single color."""
        for bottle in self.bottles:
            contents = bottle[bottle > 0]
            if contents.size == 0:
                continue
            if contents.size != self.bottle_height:
                return False
            if np.any(contents != contents[0]):
                return False
        return True


# =============================================================================
# 2. NEURAL NETWORK ARCHITECTURE
# =============================================================================


class WaterSortNet(nn.Module):
    """Convolutional policy/value network over a one-hot encoded board.

    The input has shape ``(batch, num_colors, num_bottles, bottle_height)``.
    The policy head returns ``num_bottles * num_bottles`` raw scores, one per
    ``(from, to)`` bottle pair. The value head returns one tanh-squashed
    scalar per sample.
    """

    def __init__(self, num_bottles=8, bottle_height=4, num_colors=6):
        super().__init__()

        self.num_bottles = num_bottles
        self.bottle_height = bottle_height
        self.num_colors = num_colors

        # Attribute names and registration order are kept as-is so that
        # existing checkpoints keep loading.
        self.conv1 = nn.Conv2d(num_colors, 128, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(128, 128, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(128, 128, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(128, 128, kernel_size=3, padding=1)

        self.bn1 = nn.BatchNorm2d(128)
        self.bn2 = nn.BatchNorm2d(128)
        self.bn3 = nn.BatchNorm2d(128)
        self.bn4 = nn.BatchNorm2d(128)

        # Policy head.
        self.policy_conv = nn.Conv2d(128, 64, kernel_size=3, padding=1)
        self.policy_fc1 = nn.Linear(64 * num_bottles * bottle_height, 512)
        self.policy_fc2 = nn.Linear(512, num_bottles * num_bottles)
        self.policy_bn = nn.BatchNorm1d(512)

        # Value head.
        self.value_conv = nn.Conv2d(128, 64, kernel_size=3, padding=1)
        self.value_fc1 = nn.Linear(64 * num_bottles * bottle_height, 512)
        self.value_fc2 = nn.Linear(512, 256)
        self.value_fc3 = nn.Linear(256, 1)
        self.value_bn1 = nn.BatchNorm1d(512)
        self.value_bn2 = nn.BatchNorm1d(256)

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        """Return ``(policy, value)`` for a batch of encoded boards."""
        batch_size = x.size(0)

        # Shared convolutional trunk.
        x = self.relu(self.bn1(self.conv1(x)))
        x = self.relu(self.bn2(self.conv2(x)))
        x = self.relu(self.bn3(self.conv3(x)))
        x = self.relu(self.bn4(self.conv4(x)))

        policy = self.relu(self.policy_conv(x))
        policy = policy.view(batch_size, -1)
        policy = self.dropout(self.relu(self.policy_bn(self.policy_fc1(policy))))
        policy = self.policy_fc2(policy)

        value = self.relu(self.value_conv(x))
        value = value.view(batch_size, -1)
        value = self.dropout(self.relu(self.value_bn1(self.value_fc1(value))))
        value = self.dropout(self.relu(self.value_bn2(self.value_fc2(value))))
        value = torch.tanh(self.value_fc3(value))

        return policy, value


# =============================================================================
# 3. DATA PROCESSOR
# =============================================================================


class DataProcessor:
    """Converts board arrays into network input tensors."""

    def __init__(self, num_bottles=8, bottle_height=4, num_colors=6):
        self.num_bottles = num_bottles
        self.bottle_height = bottle_height
        self.num_colors = num_colors

    def state_to_tensor(self, state):
        """One-hot encode ``state`` into a float32 tensor.

        The result has shape ``(num_colors, num_bottles, bottle_height)``.
        Channel ``c`` is ``1.0`` where the board holds color ``c + 1``; empty
        slots are zero in every channel.
        """
        one_hot = np.zeros(
            (self.num_colors, self.num_bottles, self.bottle_height),
            dtype=np.float32,
        )

        for bottle_idx in range(self.num_bottles):
            for height_idx in range(self.bottle_height):
                color = int(state[bottle_idx, height_idx])
                if color > 0:
                    one_hot[color - 1, bottle_idx, height_idx] = 1.0

        return torch.from_numpy(one_hot)


# =============================================================================
# 4.
# GAME STATE MANAGER
# =============================================================================


class GameStateManager:
    """Holds one game session: the board, the loaded model and UI selection."""

    def __init__(self):
        self.env = WaterSortEnv()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = None
        self.processor = DataProcessor()
        self.current_model_name = None
        self.ai_running = False
        self.ai_thread = None
        self.ai_delay = 1.0  # delay between AI moves, in seconds
        self.selected_bottles = None
        self.game_stats = {
            'moves_count': 0,
            'start_time': None,
            'ai_suggested_move': None,
            'valid_moves': []
        }

    def load_model(self, model_path: str) -> bool:
        """Load network weights from ``model_path``; return True on success.

        Accepts either a bare state dict or a checkpoint dict containing a
        ``'model_state_dict'`` entry. On failure the previously loaded model
        (or ``None``) is kept, so a broken checkpoint never leaves an
        untrained network in place.
        """
        try:
            model = WaterSortNet(num_bottles=8, bottle_height=4, num_colors=6).to(self.device)
            # NOTE(review): torch.load unpickles the file, so only load
            # checkpoints from trusted sources.
            checkpoint = torch.load(model_path, map_location=self.device)

            if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint:
                model.load_state_dict(checkpoint['model_state_dict'])
            else:
                model.load_state_dict(checkpoint)

            model.eval()
        except Exception as e:
            print(f"Error loading model: {e}")
            return False

        # Publish the model only once it is fully loaded.
        self.model = model
        self.current_model_name = Path(model_path).stem
        return True

    def start_game(self):
        """Start a new game and return the initial board."""
        self.env.reset()
        self.game_stats = {
            'moves_count': 0,
            'start_time': datetime.now(),
            'ai_suggested_move': None,
            'valid_moves': self.env.get_valid_moves()
        }
        self.selected_bottles = None
        self.ai_running = False
        return self.env.get_state()

    def reset_game(self):
        """Stop any autoplay and start over with a fresh board."""
        self.ai_running = False
        self.env.game_finished = False
        return self.start_game()

    def undo_move(self) -> Tuple[bool, str]:
        """Undo the last move; return ``(success, message)``."""
        if self.ai_running:
            return False, "❌ Không thể undo khi AI đang chơi!"

        if not self.env.undo_last_move():
            return False, "❌ Không thể quay lại thêm!"

        self.game_stats['moves_count'] = len(self.env.move_history)
        self.game_stats['valid_moves'] = self.env.get_valid_moves()
        return True, f"✅ Đã quay lại! Tổng bước: {self.game_stats['moves_count']}"

    def get_next_move_suggestion(self) -> Optional[Tuple[int, int]]:
        """Return the legal move the model scores highest, or None.

        None is returned when no model is loaded, when there is no legal
        move, or when inference fails.
        """
        if self.model is None:
            return None

        try:
            state = self.env.get_state()
            valid_moves = self.env.get_valid_moves()

            if not valid_moves:
                return None

            state_tensor = self.processor.state_to_tensor(state).unsqueeze(0).to(self.device)

            with torch.no_grad():
                policy, _ = self.model(state_tensor)

            policy_probs = torch.softmax(policy, dim=1).cpu().numpy()[0]

            # The policy vector is indexed as from * num_bottles + to; take
            # the stride from the environment instead of hard-coding 8.
            stride = self.env.num_bottles
            best_move = max(
                valid_moves,
                key=lambda move: policy_probs[move[0] * stride + move[1]],
            )

            self.game_stats['ai_suggested_move'] = best_move
            return best_move
        except Exception as e:
            print(f"Error getting suggestion: {e}")
            return None

    def make_move(self, from_bottle: int, to_bottle: int) -> Tuple[bool, str]:
        """Pour ``from_bottle`` into ``to_bottle``; return ``(success, message)``."""
        state, reward, done = self.env.step((from_bottle, to_bottle))

        # The environment signals an illegal move with a negative reward.
        if reward < 0:
            return False, "Nước không thể đổ vào chai này!"

        self.game_stats['moves_count'] += 1
        self.game_stats['valid_moves'] = self.env.get_valid_moves()
        self.selected_bottles = None

        if done:
            return True, f"Chúc mừng! Bạn đã giải xong trong {self.game_stats['moves_count']} bước!"

        return True, f"Bước thành công! Tổng bước: {self.game_stats['moves_count']}"

    def select_bottle(self, bottle_idx: int) -> str:
        """Handle a click on a bottle and return the status message.

        The first click selects the source bottle. A second click on a
        different bottle attempts the pour. Clicking the selected bottle
        again clears the selection instead of pouring a bottle into itself.
        """
        if self.selected_bottles is None:
            self.selected_bottles = bottle_idx
            return f"Đã chọn chai {bottle_idx}. Chọn chai đích."

        if self.selected_bottles == bottle_idx:
            self.selected_bottles = None
            return f"Đã bỏ chọn chai {bottle_idx}."

        _, message = self.make_move(self.selected_bottles, bottle_idx)
        return message

    def start_ai_autoplay(self):
        """Flag autoplay as running; return ``(success, message)``.

        Refuses when autoplay is already running, no model is loaded, or the
        game has already finished.
        """
        if self.ai_running:
            return False, "AI đã đang chạy!"

        if self.model is None:
            return False, "Vui lòng tải model trước!"

        if self.env.game_finished:
            return False, "Game đã kết thúc!"

        self.ai_running = True
        return True, "✅ AI bắt đầu tự động chơi!"
def stop_ai_autoplay(self): """Dừng AI tự động chơi""" self.ai_running = False return "⏸️ AI đã dừng!" def set_ai_speed(self, delay: float): """Đặt tốc độ AI (delay giữa các nước)""" self.ai_delay = delay # ============================================================================= # 5. VISUALIZATION # ============================================================================= def draw_game_board(state: np.ndarray, selected_bottle: Optional[int] = None, last_move: Optional[Tuple[int, int]] = None) -> str: """Vẽ bảng game dưới dạng HTML""" colors_map = { 0: '#ffffff', 1: '#FF6B6B', 2: '#4ECDC4', 3: '#45B7D1', 4: '#FFA07A', 5: '#98D8C8', 6: '#F7DC6F' } html = '
Chai {bottle_idx}
' html += '📊 Số bước
{stats['moves_count']}
🤖 Model
{model_info}
💾 Thiết bị
{'GPU' if torch.cuda.is_available() else 'CPU'}
📜 Lịch sử
{history_count} nước đi
🎮 Trạng thái AI
{ai_status}
Nước hợp lệ: {len(stats['valid_moves'])} nước
Chưa có nước đi nào
" html = "| Bước | " html += "Từ chai | " html += "Đến chai | " html += "
|---|---|---|
| {i} | " html += f"{from_idx} | " html += f"{to_idx} | " html += "