| from typing import List, Tuple, Optional, Dict, Any |
| from dataclasses import dataclass |
| import random |
|
|
|
|
| |
| class PriorityLevels: |
| HELPING_HAND = 5 |
| PROTECT = 4 |
| FAKE_OUT = 3 |
| EXTREME_SPEED = 2 |
| QUICK_ATTACK = 1 |
| NORMAL = 0 |
| VITAL_THROW = -1 |
| BEAK_BLAST = -3 |
| AVALANCHE = -4 |
| ROAR = -6 |
| |
| MIN_PRIORITY = -6 |
| MAX_PRIORITY = 5 |
| |
| @classmethod |
| def validate_priority(cls, priority: int) -> int: |
| return max(cls.MIN_PRIORITY, min(cls.MAX_PRIORITY, priority)) |
|
|
|
|
| @dataclass |
| class BattleAction: |
| pokemon: Any |
| move: Any |
| target: Any |
| effective_priority: float |
| is_priority_counter: bool = False |
| counter_target_move: Optional[Any] = None |
| |
| def __post_init__(self): |
| pass |
| |
| def to_dict(self) -> Dict[str, Any]: |
| return { |
| 'pokemon_name': getattr(self.pokemon, 'name', 'Unknown'), |
| 'move_name': getattr(self.move, 'name', 'Unknown'), |
| 'target_name': getattr(self.target, 'name', 'Unknown'), |
| 'effective_priority': self.effective_priority, |
| 'is_priority_counter': self.is_priority_counter, |
| 'counter_target_move_name': getattr(self.counter_target_move, 'name', None) if self.counter_target_move else None |
| } |
| |
| def can_execute(self) -> bool: |
| """Check if this action can be executed (Pokemon not fainted, move has PP, etc.)""" |
| |
| if hasattr(self.pokemon, 'is_fainted') and self.pokemon.is_fainted(): |
| return False |
| |
| |
| if hasattr(self.pokemon, 'current_hp') and self.pokemon.current_hp <= 0: |
| return False |
| |
| |
| if hasattr(self.move, 'pp') and self.move.pp <= 0: |
| return False |
| |
| |
| if self.effective_priority == -999: |
| return False |
| |
| return True |
| |
| def get_speed_for_tiebreaker(self) -> int: |
| return getattr(self.pokemon, 'speed', 0) |
|
|
|
|
| class PriorityResolver: |
| |
| def __init__(self): |
| self.debug_enabled = True |
| self.sucker_punch_handler = SuckerPunchHandler() |
| |
| def resolve_turn_order(self, player_action: BattleAction, opponent_action: BattleAction) -> List[BattleAction]: |
| if self.debug_enabled: |
| print(f"DEBUG: Resolving turn order - Player: {player_action.move.name} (priority {player_action.effective_priority}), " |
| f"Opponent: {opponent_action.move.name} (priority {opponent_action.effective_priority})") |
| |
| |
| actions_with_counters = self.check_priority_counters([player_action, opponent_action]) |
| |
| |
| sorted_actions = sorted(actions_with_counters, key=self._get_sort_key, reverse=True) |
| |
| if self.debug_enabled: |
| for i, action in enumerate(sorted_actions): |
| print(f"DEBUG: Turn order {i+1}: {action.pokemon.name}'s {action.move.name} " |
| f"(priority: {action.effective_priority}, speed: {action.pokemon.speed})") |
| |
| return sorted_actions |
| |
| def check_priority_counters(self, actions: List[BattleAction]) -> List[BattleAction]: |
| if len(actions) != 2: |
| return actions |
| |
| action1, action2 = actions |
| |
| |
| action1_is_counter = self._is_priority_counter_move(action1.move) |
| action2_is_counter = self._is_priority_counter_move(action2.move) |
| |
| if not action1_is_counter and not action2_is_counter: |
| return actions |
| |
| |
| updated_actions = [] |
| |
| for action in actions: |
| other_action = action2 if action == action1 else action1 |
| |
| if self._is_priority_counter_move(action.move): |
| |
| if self._can_priority_counter_succeed(action.move, other_action.move): |
| |
| action.is_priority_counter = True |
| action.counter_target_move = other_action.move |
| action.effective_priority = self._get_counter_priority(action.move) |
| |
| if self.debug_enabled: |
| print(f"DEBUG: {action.pokemon.name}'s {action.move.name} priority counter succeeded! " |
| f"New priority: {action.effective_priority}") |
| else: |
| |
| action.effective_priority = -999 |
| |
| if self.debug_enabled: |
| print(f"DEBUG: {action.pokemon.name}'s {action.move.name} priority counter failed!") |
| |
| updated_actions.append(action) |
| |
| return updated_actions |
| |
| def calculate_effective_priority(self, pokemon: Any, move: Any) -> int: |
| base_priority = getattr(move, 'priority', 0) |
| |
| |
| effective_priority = base_priority |
| |
| |
| if hasattr(pokemon, 'ability') and hasattr(pokemon.ability, 'get_priority_modification'): |
| |
| |
| effective_priority += pokemon.ability.get_priority_modification() |
| |
| |
| clamped = PriorityLevels.validate_priority(int(effective_priority)) |
| |
| effective_priority = clamped + (effective_priority - int(effective_priority)) |
| |
| if self.debug_enabled: |
| print(f"DEBUG: {pokemon.name}'s {move.name} effective priority: {effective_priority}") |
| |
| return effective_priority |
| |
| def _get_sort_key(self, action: BattleAction) -> Tuple[int, int, float]: |
| return ( |
| action.effective_priority, |
| getattr(action.pokemon, 'speed', 0), |
| random.random() |
| ) |
| |
| def _is_priority_counter_move(self, move: Any) -> bool: |
| |
| return self.sucker_punch_handler.is_sucker_punch(move) |
| |
| def _can_priority_counter_succeed(self, counter_move: Any, target_move: Any) -> bool: |
| if self.sucker_punch_handler.is_sucker_punch(counter_move): |
| return self.sucker_punch_handler.check_success_condition(target_move) |
| |
| |
| return False |
| |
|
|
| |
| def _get_counter_priority(self, counter_move: Any) -> int: |
| if self.sucker_punch_handler.is_sucker_punch(counter_move): |
| return self.sucker_punch_handler.priority_when_successful |
| |
| |
| return PriorityLevels.NORMAL |
| |
| def get_priority_counter_failure_message(self, counter_move: Any) -> str: |
| if self.sucker_punch_handler.is_sucker_punch(counter_move): |
| return self.sucker_punch_handler.get_failure_message() |
| |
| return "The move failed!" |
| |
| def get_priority_counter_success_message(self, counter_move: Any, attacker_name: str, target_name: str, target_move_name: str) -> str: |
| if self.sucker_punch_handler.is_sucker_punch(counter_move): |
| return self.sucker_punch_handler.get_success_message(attacker_name, target_name, target_move_name) |
| |
| return f"{attacker_name} intercepted {target_name}'s {target_move_name}!" |
| |
| def set_debug_mode(self, enabled: bool): |
| """Enable or disable debug logging""" |
| self.debug_enabled = enabled |
|
|
|
|
| class PriorityCounterConditions: |
| SUCKER_PUNCH = { |
| 'name': 'sucker_punch', |
| 'counters': ['physical', 'special'], |
| 'fails_against': ['status'], |
| 'priority_when_successful': PriorityLevels.QUICK_ATTACK, |
| 'failure_message': "But it failed!" |
| } |
| |
| @classmethod |
| def get_counter_config(cls, move_name: str) -> Optional[Dict[str, Any]]: |
| move_name_normalized = move_name.lower().replace(' ', '_').replace('-', '_') |
| |
| |
| for attr_name in dir(cls): |
| if not attr_name.startswith('_') and attr_name.isupper(): |
| config = getattr(cls, attr_name) |
| if isinstance(config, dict) and config.get('name') == move_name_normalized: |
| return config |
| |
| return None |
|
|
|
|
| class SuckerPunchHandler: |
| |
| def __init__(self): |
| """Initialize the Sucker Punch handler""" |
| self.move_name = "sucker punch" |
| self.priority_when_successful = PriorityLevels.QUICK_ATTACK |
| self.failure_message = "But it failed!" |
| self.success_message_template = "{attacker} intercepted {target}'s {target_move}!" |
| |
| def check_success_condition(self, target_move: Any) -> bool: |
| if not target_move: |
| return False |
| |
| |
| target_category = getattr(target_move, 'category', '').lower() |
| |
| |
| return target_category in ['physical', 'special'] |
| |
| def check_failure_condition(self, target_move: Any) -> bool: |
| if not target_move: |
| return True |
| |
| |
| target_category = getattr(target_move, 'category', '').lower() |
| |
| |
| return target_category == 'status' |
| |
| def get_success_message(self, attacker_name: str, target_name: str, target_move_name: str) -> str: |
| |
| attacker_name = attacker_name.capitalize() |
| target_name = target_name.capitalize() |
| |
| if target_move_name.lower() in ['quick-attack', 'aqua-jet', 'bullet-punch', 'mach-punch']: |
| return f"{attacker_name} anticipated {target_name}'s priority move and struck first with Sucker Punch!" |
| elif target_move_name.lower() in ['extreme-speed']: |
| return f"{attacker_name} intercepted {target_name}'s Extreme Speed with a perfectly timed Sucker Punch!" |
| else: |
| return f"{attacker_name} read {target_name}'s attack and countered with Sucker Punch!" |
| |
| def get_failure_message(self) -> str: |
| return self.failure_message |
| |
| def get_effective_priority(self, target_move: Any) -> int: |
| if self.check_success_condition(target_move): |
| return self.priority_when_successful |
| else: |
| |
| return -999 |
| |
| def validate_target_move_category(self, target_move: Any) -> Tuple[bool, str]: |
| if not target_move: |
| return False, self.get_failure_message() |
| |
| if self.check_success_condition(target_move): |
| |
| return True, "" |
| else: |
| return False, self.get_failure_message() |
| |
| def is_sucker_punch(self, move: Any) -> bool: |
| if not move: |
| return False |
| |
| move_name = getattr(move, 'name', '').lower() |
| return move_name == self.move_name |
|
|
|
|
| class ActionQueue: |
| |
| def __init__(self, priority_resolver: PriorityResolver): |
| """ |
| Initialize the action queue. |
| |
| Args: |
| priority_resolver: The priority resolver to use for action processing |
| """ |
| self.priority_resolver = priority_resolver |
| self.actions: List[BattleAction] = [] |
| self.executed_actions: List[BattleAction] = [] |
| self.debug_enabled = True |
| |
| def add_action(self, pokemon: Any, move: Any, target: Any) -> BattleAction: |
| """ |
| Add a battle action to the queue. |
| |
| Args: |
| pokemon: The Pokemon using the move |
| move: The move being used |
| target: The target Pokemon |
| |
| Returns: |
| BattleAction: The created battle action |
| """ |
| action = create_battle_action(pokemon, move, target, self.priority_resolver) |
| self.actions.append(action) |
| |
| if self.debug_enabled: |
| print(f"DEBUG: Added action - {pokemon.name}'s {move.name} (priority: {action.effective_priority})") |
| |
| return action |
| |
| def add_actions_from_moves(self, move_pairs: List[Tuple[Any, Any, Any]]) -> List[BattleAction]: |
| """ |
| Add multiple battle actions from a list of (pokemon, move, target) tuples. |
| |
| Args: |
| move_pairs: List of (pokemon, move, target) tuples |
| |
| Returns: |
| List[BattleAction]: The created battle actions |
| """ |
| created_actions = [] |
| for pokemon, move, target in move_pairs: |
| action = self.add_action(pokemon, move, target) |
| created_actions.append(action) |
| |
| return created_actions |
| |
| def sort_by_priority(self) -> List[BattleAction]: |
| """ |
| Sort actions by priority and speed, applying priority counter logic. |
| |
| Returns: |
| List[BattleAction]: Actions sorted by execution order (first to last) |
| """ |
| if len(self.actions) == 0: |
| return [] |
| |
| |
| if len(self.actions) == 2: |
| processed_actions = self.priority_resolver.check_priority_counters(self.actions.copy()) |
| else: |
| processed_actions = self.actions.copy() |
| |
| |
| sorted_actions = sorted( |
| processed_actions, |
| key=lambda action: ( |
| action.effective_priority, |
| action.get_speed_for_tiebreaker(), |
| random.random() |
| ), |
| reverse=True |
| ) |
| |
| if self.debug_enabled: |
| print("DEBUG: Action queue sorted by priority:") |
| for i, action in enumerate(sorted_actions): |
| print(f" {i+1}. {action.pokemon.name}'s {action.move.name} " |
| f"(priority: {action.effective_priority}, speed: {action.get_speed_for_tiebreaker()})") |
| |
| return sorted_actions |
| |
| def get_executable_actions(self) -> List[BattleAction]: |
| """ |
| Get actions that can be executed (Pokemon not fainted, move has PP, etc.). |
| |
| Returns: |
| List[BattleAction]: Actions that can be executed |
| """ |
| executable = [] |
| for action in self.actions: |
| if action.can_execute(): |
| executable.append(action) |
| elif self.debug_enabled: |
| print(f"DEBUG: Action {action.pokemon.name}'s {action.move.name} cannot be executed") |
| |
| return executable |
| |
| def execute_next_action(self) -> Optional[BattleAction]: |
| """ |
| Execute the next action in priority order. |
| |
| Returns: |
| Optional[BattleAction]: The executed action, or None if no actions available |
| """ |
| if not self.actions: |
| return None |
| |
| |
| sorted_actions = self.sort_by_priority() |
| |
| |
| for action in sorted_actions: |
| if action.can_execute() and action not in self.executed_actions: |
| self.executed_actions.append(action) |
| |
| if self.debug_enabled: |
| print(f"DEBUG: Executing action - {action.pokemon.name}'s {action.move.name}") |
| |
| return action |
| |
| return None |
| |
| def execute_all_actions(self) -> List[BattleAction]: |
| """ |
| Execute all actions in priority order. |
| |
| Returns: |
| List[BattleAction]: List of executed actions in execution order |
| """ |
| executed = [] |
| |
| while True: |
| action = self.execute_next_action() |
| if action is None: |
| break |
| executed.append(action) |
| |
| return executed |
| |
| def clear(self): |
| """Clear all actions from the queue""" |
| self.actions.clear() |
| self.executed_actions.clear() |
| |
| if self.debug_enabled: |
| print("DEBUG: Action queue cleared") |
| |
| def get_action_summary(self) -> Dict[str, Any]: |
| """ |
| Get a summary of the current action queue state. |
| |
| Returns: |
| Dict: Summary information about the queue |
| """ |
| return { |
| 'total_actions': len(self.actions), |
| 'executed_actions': len(self.executed_actions), |
| 'remaining_actions': len(self.actions) - len(self.executed_actions), |
| 'actions': [action.to_dict() for action in self.actions], |
| 'executed': [action.to_dict() for action in self.executed_actions] |
| } |
| |
| def set_debug_mode(self, enabled: bool): |
| """Enable or disable debug logging""" |
| self.debug_enabled = enabled |
|
|
|
|
| def create_battle_action(pokemon: Any, move: Any, target: Any, priority_resolver: PriorityResolver) -> BattleAction: |
| """ |
| Factory function to create a BattleAction with calculated priority. |
| |
| Args: |
| pokemon: The Pokemon using the move |
| move: The move being used |
| target: The target Pokemon |
| priority_resolver: The priority resolver to use for calculations |
| |
| Returns: |
| BattleAction: A battle action with calculated effective priority |
| """ |
| effective_priority = priority_resolver.calculate_effective_priority(pokemon, move) |
| |
| return BattleAction( |
| pokemon=pokemon, |
| move=move, |
| target=target, |
| effective_priority=effective_priority |
| ) |
|
|
|
|
| def create_battle_actions_from_pokemon_moves( |
| pokemon_move_pairs: List[Tuple[Any, str, Any]], |
| priority_resolver: PriorityResolver |
| ) -> List[BattleAction]: |
| """ |
| Create battle actions from a list of Pokemon and move name pairs. |
| |
| Args: |
| pokemon_move_pairs: List of (pokemon, move_name, target) tuples |
| priority_resolver: The priority resolver to use for calculations |
| |
| Returns: |
| List[BattleAction]: List of created battle actions |
| """ |
| actions = [] |
| |
| for pokemon, move_name, target in pokemon_move_pairs: |
| |
| move = None |
| if hasattr(pokemon, 'moves') and isinstance(pokemon.moves, dict): |
| move = pokemon.moves.get(move_name) |
| |
| if move is None: |
| print(f"WARNING: Move '{move_name}' not found for {getattr(pokemon, 'name', 'Unknown')}") |
| continue |
| |
| |
| action = create_battle_action(pokemon, move, target, priority_resolver) |
| actions.append(action) |
| |
| return actions |
|
|
|
|
| def sort_actions_by_priority(actions: List[BattleAction]) -> List[BattleAction]: |
| """ |
| Sort battle actions by priority and speed. |
| |
| Args: |
| actions: List of battle actions to sort |
| |
| Returns: |
| List[BattleAction]: Actions sorted by execution order (first to last) |
| """ |
| return sorted( |
| actions, |
| key=lambda action: ( |
| action.effective_priority, |
| action.get_speed_for_tiebreaker(), |
| random.random() |
| ), |
| reverse=True |
| ) |
|
|
|
|
| def filter_executable_actions(actions: List[BattleAction]) -> List[BattleAction]: |
| """ |
| Filter battle actions to only include those that can be executed. |
| |
| Args: |
| actions: List of battle actions to filter |
| |
| Returns: |
| List[BattleAction]: Actions that can be executed |
| """ |
| return [action for action in actions if action.can_execute()] |
|
|
|
|
| def create_action_queue_for_turn( |
| player_pokemon: Any, |
| player_move_name: str, |
| opponent_pokemon: Any, |
| opponent_move_name: str, |
| priority_resolver: PriorityResolver |
| ) -> ActionQueue: |
| """ |
| Create an action queue for a battle turn with player and opponent moves. |
| |
| Args: |
| player_pokemon: The player's Pokemon |
| player_move_name: Name of the player's selected move |
| opponent_pokemon: The opponent's Pokemon |
| opponent_move_name: Name of the opponent's selected move |
| priority_resolver: The priority resolver to use |
| |
| Returns: |
| ActionQueue: Configured action queue for the turn |
| """ |
| queue = ActionQueue(priority_resolver) |
| |
| |
| player_move = player_pokemon.moves.get(player_move_name) if hasattr(player_pokemon, 'moves') else None |
| if player_move: |
| queue.add_action(player_pokemon, player_move, opponent_pokemon) |
| |
| |
| opponent_move = opponent_pokemon.moves.get(opponent_move_name) if hasattr(opponent_pokemon, 'moves') else None |
| if opponent_move: |
| queue.add_action(opponent_pokemon, opponent_move, player_pokemon) |
| |
| return queue |