"""Challenge other bots.""" import random import logging import datetime import contextlib from lib import model from lib.timer import Timer, days, seconds, minutes, years from collections import defaultdict from collections.abc import Sequence from lib.lichess import Lichess, RateLimitedError from lib.config import Configuration from typing import cast, TypeAlias from lib.blocklist import OnlineBlocklist from lib.lichess_types import UserProfileType, PerfType, EventType, FilterType, ChallengeType MULTIPROCESSING_LIST_TYPE: TypeAlias = Sequence[model.Challenge] logger = logging.getLogger(__name__) class Matchmaking: """Challenge other bots.""" def __init__(self, li: Lichess, config: Configuration, user_profile: UserProfileType) -> None: """Initialize values needed for matchmaking.""" self.li = li self.variants = list(filter(lambda variant: variant != "fromPosition", config.challenge.variants)) self.matchmaking_cfg = config.matchmaking self.user_profile = user_profile self.last_challenge_created_delay = Timer(seconds(25)) # Challenges expire after 20 seconds. self.last_game_ended_delay = Timer(minutes(self.matchmaking_cfg.challenge_timeout)) self.last_user_profile_update_time = Timer(minutes(5)) self.min_wait_time = seconds(60) # Wait before new challenge to avoid api rate limits. self.rate_limit_timer = Timer() # Maximum time between challenges, even if there are active games self.max_wait_time = minutes(10) if self.matchmaking_cfg.allow_during_games else years(10) self.challenge_id = "" # (opponent name, game aspect) --> other bot is likely to accept challenge # game aspect is the one the challenged bot objects to and is one of: # - game speed (bullet, blitz, etc.) # - variant (standard, horde, etc.) # - casual/rated # - empty string (if no other reason is given or self.filter_type is COARSE) self.challenge_type_acceptable: defaultdict[tuple[str, str], Timer] = defaultdict(Timer) self.challenge_filter = self.matchmaking_cfg.challenge_filter for name in self.matchmaking_cfg.block_list: self.add_to_block_list(name) self.online_block_list = OnlineBlocklist(self.matchmaking_cfg.online_block_list) def should_create_challenge(self) -> bool: """Whether we should create a challenge.""" matchmaking_enabled = self.matchmaking_cfg.allow_matchmaking rate_limit_ok = self.rate_limit_timer.is_expired() time_has_passed = self.last_game_ended_delay.is_expired() challenge_expired = self.last_challenge_created_delay.is_expired() and self.challenge_id min_wait_time_passed = self.last_challenge_created_delay.time_since_reset() > self.min_wait_time if challenge_expired: self.li.cancel(self.challenge_id) logger.info(f"Challenge id {self.challenge_id} cancelled.") self.discard_challenge(self.challenge_id) self.show_earliest_challenge_time() return bool(matchmaking_enabled and rate_limit_ok and (time_has_passed or challenge_expired) and min_wait_time_passed) def create_challenge(self, username: str, base_time: int, increment: int, days: int, variant: str, mode: str) -> str: """Create a challenge.""" params: dict[str, str | int | bool] = {"rated": mode == "rated", "variant": variant} if days: params["days"] = days elif base_time or increment: params["clock.limit"] = base_time params["clock.increment"] = increment else: logger.error("At least one of challenge_days, challenge_initial_time, or challenge_increment " "must be greater than zero in the matchmaking section of your config file.") return "" try: self.last_challenge_created_delay.reset() response = self.li.challenge(username, params) challenge_id = response.get("id", "") if not challenge_id: self.handle_challenge_error_response(response, username) return challenge_id except RateLimitedError as e: logger.warning(e) self.rate_limit_timer = Timer(e.timeout) except Exception as e: logger.debug(e, exc_info=e) logger.warning("Could not create challenge") self.show_earliest_challenge_time() return "" def handle_challenge_error_response(self, response: ChallengeType, username: str) -> None: """If a challenge fails, print the error and adjust the challenge requirements in response.""" logger.error(response) if response.get("bot_is_rate_limited"): timeout = cast(datetime.timedelta, response.get("rate_limit_timeout")) self.rate_limit_timer = Timer(timeout) elif response.get("opponent_is_rate_limited"): self.add_challenge_filter(username, "", response.get("rate_limit_timeout")) else: self.add_challenge_filter(username, "") self.show_earliest_challenge_time() def perf(self) -> dict[str, PerfType]: """Get the bot's rating in every variant. Bullet, blitz, rapid etc. are considered different variants.""" user_perf: dict[str, PerfType] = self.user_profile["perfs"] return user_perf def username(self) -> str: """Our username.""" username: str = self.user_profile["username"] return username def update_user_profile(self) -> None: """Update our user profile data, to get our latest rating.""" if self.last_user_profile_update_time.is_expired(): self.last_user_profile_update_time.reset() with contextlib.suppress(Exception): self.user_profile = self.li.get_profile() def get_weights(self, online_bots: list[UserProfileType], rating_preference: str, min_rating: int, max_rating: int, game_type: str) -> list[int]: """Get the weight for each bot. A higher weights means the bot is more likely to get challenged.""" def rating(bot: UserProfileType) -> int: perfs: dict[str, PerfType] = bot.get("perfs", {}) perf: PerfType = perfs.get(game_type, {}) return perf.get("rating", 0) if rating_preference == "high": # A bot with max_rating rating will be twice as likely to get picked than a bot with min_rating rating. reduce_ratings_by = min(min_rating - (max_rating - min_rating), min_rating - 1) weights = [rating(bot) - reduce_ratings_by for bot in online_bots] elif rating_preference == "low": # A bot with min_rating rating will be twice as likely to get picked than a bot with max_rating rating. reduce_ratings_by = max(max_rating - (min_rating - max_rating), max_rating + 1) weights = [reduce_ratings_by - rating(bot) for bot in online_bots] else: weights = [1] * len(online_bots) return weights def choose_opponent(self) -> tuple[str | None, int, int, int, str, str]: """Choose an opponent.""" override_choice = random.choice(self.matchmaking_cfg.overrides.keys() + [None]) logger.info(f"Using the {override_choice or 'default'} matchmaking configuration.") override = {} if override_choice is None else self.matchmaking_cfg.overrides.lookup(override_choice) match_config = self.matchmaking_cfg | override variant = self.get_random_config_value(match_config, "challenge_variant", self.variants) mode = self.get_random_config_value(match_config, "challenge_mode", ["casual", "rated"]) rating_preference = match_config.rating_preference base_time = random.choice(match_config.challenge_initial_time) increment = random.choice(match_config.challenge_increment) num_days = random.choice(match_config.challenge_days) play_correspondence = [bool(num_days), not bool(base_time or increment)] if random.choice(play_correspondence): base_time = 0 increment = 0 else: num_days = 0 game_type = game_category(variant, base_time, increment, num_days) min_rating = match_config.opponent_min_rating max_rating = match_config.opponent_max_rating rating_diff = match_config.opponent_rating_difference bot_rating = self.perf().get(game_type, {}).get("rating", 0) if rating_diff is not None and bot_rating > 0: min_rating = bot_rating - rating_diff max_rating = bot_rating + rating_diff logger.info(f"Seeking {game_type} game with opponent rating in [{min_rating}, {max_rating}] ...") def is_suitable_opponent(bot: UserProfileType) -> bool: perf = bot.get("perfs", {}).get(game_type, {}) return (bot["username"] != self.username() and not self.in_block_list(bot["username"]) and perf.get("games", 0) > 0 and min_rating <= perf.get("rating", 0) <= max_rating) self.online_block_list.refresh() online_bots = self.li.get_online_bots() logger.info(f"Found {len(online_bots)} online bots") online_bots = list(filter(is_suitable_opponent, online_bots)) logger.info(f"Choosing from {len(online_bots)} suitable opponents") def ready_for_challenge(bot: UserProfileType) -> bool: aspects = [variant, game_type, mode] if self.challenge_filter == FilterType.FINE else [] return all(self.should_accept_challenge(bot["username"], aspect) for aspect in aspects) ready_bots = list(filter(ready_for_challenge, online_bots)) online_bots = ready_bots or online_bots bot_username = None weights = self.get_weights(online_bots, rating_preference, min_rating, max_rating, game_type) try: bot = random.choices(online_bots, weights=weights)[0] bot_profile = self.li.get_public_data(bot["username"]) if bot_profile.get("blocking"): self.add_to_block_list(bot["username"]) else: bot_username = bot["username"] except Exception: if online_bots: logger.exception("Error:") else: logger.error("No suitable bots found to challenge.") return bot_username, base_time, increment, num_days, variant, mode def get_random_config_value(self, config: Configuration, parameter: str, choices: list[str]) -> str: """Choose a random value from `choices` if the parameter value in the config is `random`.""" value: str = config.lookup(parameter) return value if value != "random" else random.choice(choices) def challenge(self, active_games: set[str], challenge_queue: MULTIPROCESSING_LIST_TYPE, max_games: int) -> None: """ Challenge an opponent. :param active_games: The games that the bot is playing. :param challenge_queue: The queue containing the challenges. :param max_games: The maximum allowed number of simultaneous games. """ max_games_for_matchmaking = max_games if self.matchmaking_cfg.allow_during_games else min(1, max_games) game_count = len(active_games) + len(challenge_queue) if (game_count >= max_games_for_matchmaking or (game_count > 0 and self.last_challenge_created_delay.time_since_reset() < self.max_wait_time) or not self.should_create_challenge()): return logger.info("Challenging a random bot") self.update_user_profile() bot_username, base_time, increment, days, variant, mode = self.choose_opponent() if not bot_username: logger.info("No challenge will be created.") self.challenge_id = "" self.rate_limit_timer = Timer(seconds(60)) return logger.info(f"Will challenge {bot_username} for a {variant} game.") challenge_id = self.create_challenge(bot_username, base_time, increment, days, variant, mode) logger.info(f"Challenge id is {challenge_id or 'None'}.") self.challenge_id = challenge_id def discard_challenge(self, challenge_id: str) -> None: """ Clear the ID of the most recent challenge if it is no longer needed. :param challenge_id: The ID of the challenge that is expired, accepted, or declined. """ if self.challenge_id == challenge_id: self.challenge_id = "" def game_done(self) -> None: """Reset the timer for when the last game ended, and prints the earliest that the next challenge will be created.""" self.last_game_ended_delay.reset() self.show_earliest_challenge_time() def show_earliest_challenge_time(self) -> None: """Show the earliest that the next challenge will be created.""" if self.matchmaking_cfg.allow_matchmaking: postgame_timeout = self.last_game_ended_delay.time_until_expiration() time_to_next_challenge = self.min_wait_time - self.last_challenge_created_delay.time_since_reset() rate_limit_delay = self.rate_limit_timer.time_until_expiration() time_left = max(postgame_timeout, time_to_next_challenge, rate_limit_delay) earliest_challenge_time = datetime.datetime.now() + time_left logger.info(f"Next challenge will be created after {earliest_challenge_time.strftime('%c')}") def add_to_block_list(self, username: str) -> None: """Add a bot to the blocklist.""" self.add_challenge_filter(username, "", years(10)) def in_block_list(self, username: str) -> bool: """Check if an opponent is in the block list to prevent future challenges.""" return (not self.should_accept_challenge(username, "")) or username in self.online_block_list def add_challenge_filter(self, username: str, game_aspect: str, timeout: datetime.timedelta | None = None) -> None: """ Prevent creating another challenge for a timeout when an opponent has declined a challenge. :param username: The name of the opponent. :param game_aspect: The aspect of a game (time control, chess variant, etc.) that caused the opponent to decline a challenge. If the parameter is empty, that is equivalent to adding the opponent to the block list. :param timeout: The amount of time to not challenge an opponent. If None, the default is a day. """ self.challenge_type_acceptable[(username, game_aspect)] = Timer(timeout or days(1)) def should_accept_challenge(self, username: str, game_aspect: str) -> bool: """ Whether a bot is likely to accept a challenge to a game. :param username: The name of the opponent. :param game_aspect: A category of the challenge type (time control, chess variant, etc.) to test for acceptance. If game_aspect is empty, this is equivalent to checking if the opponent is in the block list. """ return self.challenge_type_acceptable[(username, game_aspect)].is_expired() def accepted_challenge(self, event: EventType) -> None: """ Set the challenge id to an empty string, if the challenge was accepted. Otherwise, we would attempt to cancel the challenge later. """ self.discard_challenge(event["game"]["id"]) def declined_challenge(self, event: EventType) -> None: """ Handle a challenge that was declined by the opponent. Depends on whether `FilterType` is `NONE`, `COARSE`, or `FINE`. """ challenge = model.Challenge(event["challenge"], self.user_profile) opponent = challenge.challenge_target reason = event["challenge"]["declineReason"] logger.info(f"{opponent} declined {challenge}: {reason}") self.discard_challenge(challenge.id) if not challenge.from_self or self.challenge_filter == FilterType.NONE: return mode = "rated" if challenge.rated else "casual" decline_details: dict[str, str] = {"generic": "", "later": "", "nobot": "", "toofast": challenge.speed, "tooslow": challenge.speed, "timecontrol": challenge.speed, "rated": mode, "casual": mode, "standard": challenge.variant, "variant": challenge.variant} reason_key = event["challenge"]["declineReasonKey"].lower() if reason_key not in decline_details: logger.warning(f"Unknown decline reason received: {reason_key}") game_problem = decline_details.get(reason_key, "") if self.challenge_filter == FilterType.FINE else "" self.add_challenge_filter(opponent.name, game_problem) logger.info(f"Will not challenge {opponent} to another {game_problem}".strip() + " game today.") self.show_earliest_challenge_time() def game_category(variant: str, base_time: int, increment: int, num_days: int) -> str: """ Get the game type (e.g. bullet, atomic, classical). Lichess has one rating for every variant regardless of time control. :param variant: The game's variant. :param base_time: The base time in seconds. :param increment: The increment in seconds. :param num_days: If the game is correspondence, we have some days to play the move. :return: The game category. """ game_duration = base_time + increment * 40 if variant != "standard": return variant if num_days: return "correspondence" if game_duration < 179: return "bullet" if game_duration < 479: return "blitz" if game_duration < 1499: return "rapid" return "classical"