VeloCT_Base / lib /matchmaking.py
Taperx's picture
Deploy Base model with clean official lichess-bot folder
3014f14
Raw
History Blame Contribute Delete
18.3 kB
"""Challenge other bots."""
import random
import logging
import datetime
import contextlib
from lib import model
from lib.timer import Timer, days, seconds, minutes, years
from collections import defaultdict
from collections.abc import Sequence
from lib.lichess import Lichess, RateLimitedError
from lib.config import Configuration
from typing import cast, TypeAlias
from lib.blocklist import OnlineBlocklist
from lib.lichess_types import UserProfileType, PerfType, EventType, FilterType, ChallengeType
MULTIPROCESSING_LIST_TYPE: TypeAlias = Sequence[model.Challenge]
logger = logging.getLogger(__name__)
class Matchmaking:
"""Challenge other bots."""
def __init__(self, li: Lichess, config: Configuration, user_profile: UserProfileType) -> None:
"""Initialize values needed for matchmaking."""
self.li = li
self.variants = list(filter(lambda variant: variant != "fromPosition", config.challenge.variants))
self.matchmaking_cfg = config.matchmaking
self.user_profile = user_profile
self.last_challenge_created_delay = Timer(seconds(25)) # Challenges expire after 20 seconds.
self.last_game_ended_delay = Timer(minutes(self.matchmaking_cfg.challenge_timeout))
self.last_user_profile_update_time = Timer(minutes(5))
self.min_wait_time = seconds(60) # Wait before new challenge to avoid api rate limits.
self.rate_limit_timer = Timer()
# Maximum time between challenges, even if there are active games
self.max_wait_time = minutes(10) if self.matchmaking_cfg.allow_during_games else years(10)
self.challenge_id = ""
# (opponent name, game aspect) --> other bot is likely to accept challenge
# game aspect is the one the challenged bot objects to and is one of:
# - game speed (bullet, blitz, etc.)
# - variant (standard, horde, etc.)
# - casual/rated
# - empty string (if no other reason is given or self.filter_type is COARSE)
self.challenge_type_acceptable: defaultdict[tuple[str, str], Timer] = defaultdict(Timer)
self.challenge_filter = self.matchmaking_cfg.challenge_filter
for name in self.matchmaking_cfg.block_list:
self.add_to_block_list(name)
self.online_block_list = OnlineBlocklist(self.matchmaking_cfg.online_block_list)
def should_create_challenge(self) -> bool:
"""Whether we should create a challenge."""
matchmaking_enabled = self.matchmaking_cfg.allow_matchmaking
rate_limit_ok = self.rate_limit_timer.is_expired()
time_has_passed = self.last_game_ended_delay.is_expired()
challenge_expired = self.last_challenge_created_delay.is_expired() and self.challenge_id
min_wait_time_passed = self.last_challenge_created_delay.time_since_reset() > self.min_wait_time
if challenge_expired:
self.li.cancel(self.challenge_id)
logger.info(f"Challenge id {self.challenge_id} cancelled.")
self.discard_challenge(self.challenge_id)
self.show_earliest_challenge_time()
return bool(matchmaking_enabled and rate_limit_ok and (time_has_passed or challenge_expired) and min_wait_time_passed)
def create_challenge(self, username: str, base_time: int, increment: int, days: int, variant: str,
mode: str) -> str:
"""Create a challenge."""
params: dict[str, str | int | bool] = {"rated": mode == "rated", "variant": variant}
if days:
params["days"] = days
elif base_time or increment:
params["clock.limit"] = base_time
params["clock.increment"] = increment
else:
logger.error("At least one of challenge_days, challenge_initial_time, or challenge_increment "
"must be greater than zero in the matchmaking section of your config file.")
return ""
try:
self.last_challenge_created_delay.reset()
response = self.li.challenge(username, params)
challenge_id = response.get("id", "")
if not challenge_id:
self.handle_challenge_error_response(response, username)
return challenge_id
except RateLimitedError as e:
logger.warning(e)
self.rate_limit_timer = Timer(e.timeout)
except Exception as e:
logger.debug(e, exc_info=e)
logger.warning("Could not create challenge")
self.show_earliest_challenge_time()
return ""
def handle_challenge_error_response(self, response: ChallengeType, username: str) -> None:
"""If a challenge fails, print the error and adjust the challenge requirements in response."""
logger.error(response)
if response.get("bot_is_rate_limited"):
timeout = cast(datetime.timedelta, response.get("rate_limit_timeout"))
self.rate_limit_timer = Timer(timeout)
elif response.get("opponent_is_rate_limited"):
self.add_challenge_filter(username, "", response.get("rate_limit_timeout"))
else:
self.add_challenge_filter(username, "")
self.show_earliest_challenge_time()
def perf(self) -> dict[str, PerfType]:
"""Get the bot's rating in every variant. Bullet, blitz, rapid etc. are considered different variants."""
user_perf: dict[str, PerfType] = self.user_profile["perfs"]
return user_perf
def username(self) -> str:
"""Our username."""
username: str = self.user_profile["username"]
return username
def update_user_profile(self) -> None:
"""Update our user profile data, to get our latest rating."""
if self.last_user_profile_update_time.is_expired():
self.last_user_profile_update_time.reset()
with contextlib.suppress(Exception):
self.user_profile = self.li.get_profile()
def get_weights(self, online_bots: list[UserProfileType], rating_preference: str, min_rating: int, max_rating: int,
game_type: str) -> list[int]:
"""Get the weight for each bot. A higher weights means the bot is more likely to get challenged."""
def rating(bot: UserProfileType) -> int:
perfs: dict[str, PerfType] = bot.get("perfs", {})
perf: PerfType = perfs.get(game_type, {})
return perf.get("rating", 0)
if rating_preference == "high":
# A bot with max_rating rating will be twice as likely to get picked than a bot with min_rating rating.
reduce_ratings_by = min(min_rating - (max_rating - min_rating), min_rating - 1)
weights = [rating(bot) - reduce_ratings_by for bot in online_bots]
elif rating_preference == "low":
# A bot with min_rating rating will be twice as likely to get picked than a bot with max_rating rating.
reduce_ratings_by = max(max_rating - (min_rating - max_rating), max_rating + 1)
weights = [reduce_ratings_by - rating(bot) for bot in online_bots]
else:
weights = [1] * len(online_bots)
return weights
def choose_opponent(self) -> tuple[str | None, int, int, int, str, str]:
"""Choose an opponent."""
override_choice = random.choice(self.matchmaking_cfg.overrides.keys() + [None])
logger.info(f"Using the {override_choice or 'default'} matchmaking configuration.")
override = {} if override_choice is None else self.matchmaking_cfg.overrides.lookup(override_choice)
match_config = self.matchmaking_cfg | override
variant = self.get_random_config_value(match_config, "challenge_variant", self.variants)
mode = self.get_random_config_value(match_config, "challenge_mode", ["casual", "rated"])
rating_preference = match_config.rating_preference
base_time = random.choice(match_config.challenge_initial_time)
increment = random.choice(match_config.challenge_increment)
num_days = random.choice(match_config.challenge_days)
play_correspondence = [bool(num_days), not bool(base_time or increment)]
if random.choice(play_correspondence):
base_time = 0
increment = 0
else:
num_days = 0
game_type = game_category(variant, base_time, increment, num_days)
min_rating = match_config.opponent_min_rating
max_rating = match_config.opponent_max_rating
rating_diff = match_config.opponent_rating_difference
bot_rating = self.perf().get(game_type, {}).get("rating", 0)
if rating_diff is not None and bot_rating > 0:
min_rating = bot_rating - rating_diff
max_rating = bot_rating + rating_diff
logger.info(f"Seeking {game_type} game with opponent rating in [{min_rating}, {max_rating}] ...")
def is_suitable_opponent(bot: UserProfileType) -> bool:
perf = bot.get("perfs", {}).get(game_type, {})
return (bot["username"] != self.username()
and not self.in_block_list(bot["username"])
and perf.get("games", 0) > 0
and min_rating <= perf.get("rating", 0) <= max_rating)
self.online_block_list.refresh()
online_bots = self.li.get_online_bots()
logger.info(f"Found {len(online_bots)} online bots")
online_bots = list(filter(is_suitable_opponent, online_bots))
logger.info(f"Choosing from {len(online_bots)} suitable opponents")
def ready_for_challenge(bot: UserProfileType) -> bool:
aspects = [variant, game_type, mode] if self.challenge_filter == FilterType.FINE else []
return all(self.should_accept_challenge(bot["username"], aspect) for aspect in aspects)
ready_bots = list(filter(ready_for_challenge, online_bots))
online_bots = ready_bots or online_bots
bot_username = None
weights = self.get_weights(online_bots, rating_preference, min_rating, max_rating, game_type)
try:
bot = random.choices(online_bots, weights=weights)[0]
bot_profile = self.li.get_public_data(bot["username"])
if bot_profile.get("blocking"):
self.add_to_block_list(bot["username"])
else:
bot_username = bot["username"]
except Exception:
if online_bots:
logger.exception("Error:")
else:
logger.error("No suitable bots found to challenge.")
return bot_username, base_time, increment, num_days, variant, mode
def get_random_config_value(self, config: Configuration, parameter: str, choices: list[str]) -> str:
"""Choose a random value from `choices` if the parameter value in the config is `random`."""
value: str = config.lookup(parameter)
return value if value != "random" else random.choice(choices)
def challenge(self, active_games: set[str], challenge_queue: MULTIPROCESSING_LIST_TYPE, max_games: int) -> None:
"""
Challenge an opponent.
:param active_games: The games that the bot is playing.
:param challenge_queue: The queue containing the challenges.
:param max_games: The maximum allowed number of simultaneous games.
"""
max_games_for_matchmaking = max_games if self.matchmaking_cfg.allow_during_games else min(1, max_games)
game_count = len(active_games) + len(challenge_queue)
if (game_count >= max_games_for_matchmaking
or (game_count > 0 and self.last_challenge_created_delay.time_since_reset() < self.max_wait_time)
or not self.should_create_challenge()):
return
logger.info("Challenging a random bot")
self.update_user_profile()
bot_username, base_time, increment, days, variant, mode = self.choose_opponent()
if not bot_username:
logger.info("No challenge will be created.")
self.challenge_id = ""
self.rate_limit_timer = Timer(seconds(60))
return
logger.info(f"Will challenge {bot_username} for a {variant} game.")
challenge_id = self.create_challenge(bot_username, base_time, increment, days, variant, mode)
logger.info(f"Challenge id is {challenge_id or 'None'}.")
self.challenge_id = challenge_id
def discard_challenge(self, challenge_id: str) -> None:
"""
Clear the ID of the most recent challenge if it is no longer needed.
:param challenge_id: The ID of the challenge that is expired, accepted, or declined.
"""
if self.challenge_id == challenge_id:
self.challenge_id = ""
def game_done(self) -> None:
"""Reset the timer for when the last game ended, and prints the earliest that the next challenge will be created."""
self.last_game_ended_delay.reset()
self.show_earliest_challenge_time()
def show_earliest_challenge_time(self) -> None:
"""Show the earliest that the next challenge will be created."""
if self.matchmaking_cfg.allow_matchmaking:
postgame_timeout = self.last_game_ended_delay.time_until_expiration()
time_to_next_challenge = self.min_wait_time - self.last_challenge_created_delay.time_since_reset()
rate_limit_delay = self.rate_limit_timer.time_until_expiration()
time_left = max(postgame_timeout, time_to_next_challenge, rate_limit_delay)
earliest_challenge_time = datetime.datetime.now() + time_left
logger.info(f"Next challenge will be created after {earliest_challenge_time.strftime('%c')}")
def add_to_block_list(self, username: str) -> None:
"""Add a bot to the blocklist."""
self.add_challenge_filter(username, "", years(10))
def in_block_list(self, username: str) -> bool:
"""Check if an opponent is in the block list to prevent future challenges."""
return (not self.should_accept_challenge(username, "")) or username in self.online_block_list
def add_challenge_filter(self, username: str, game_aspect: str, timeout: datetime.timedelta | None = None) -> None:
"""
Prevent creating another challenge for a timeout when an opponent has declined a challenge.
:param username: The name of the opponent.
:param game_aspect: The aspect of a game (time control, chess variant, etc.) that caused the opponent to decline a
challenge. If the parameter is empty, that is equivalent to adding the opponent to the block list.
:param timeout: The amount of time to not challenge an opponent. If None, the default is a day.
"""
self.challenge_type_acceptable[(username, game_aspect)] = Timer(timeout or days(1))
def should_accept_challenge(self, username: str, game_aspect: str) -> bool:
"""
Whether a bot is likely to accept a challenge to a game.
:param username: The name of the opponent.
:param game_aspect: A category of the challenge type (time control, chess variant, etc.) to test for acceptance.
If game_aspect is empty, this is equivalent to checking if the opponent is in the block list.
"""
return self.challenge_type_acceptable[(username, game_aspect)].is_expired()
def accepted_challenge(self, event: EventType) -> None:
"""
Set the challenge id to an empty string, if the challenge was accepted.
Otherwise, we would attempt to cancel the challenge later.
"""
self.discard_challenge(event["game"]["id"])
def declined_challenge(self, event: EventType) -> None:
"""
Handle a challenge that was declined by the opponent.
Depends on whether `FilterType` is `NONE`, `COARSE`, or `FINE`.
"""
challenge = model.Challenge(event["challenge"], self.user_profile)
opponent = challenge.challenge_target
reason = event["challenge"]["declineReason"]
logger.info(f"{opponent} declined {challenge}: {reason}")
self.discard_challenge(challenge.id)
if not challenge.from_self or self.challenge_filter == FilterType.NONE:
return
mode = "rated" if challenge.rated else "casual"
decline_details: dict[str, str] = {"generic": "",
"later": "",
"nobot": "",
"toofast": challenge.speed,
"tooslow": challenge.speed,
"timecontrol": challenge.speed,
"rated": mode,
"casual": mode,
"standard": challenge.variant,
"variant": challenge.variant}
reason_key = event["challenge"]["declineReasonKey"].lower()
if reason_key not in decline_details:
logger.warning(f"Unknown decline reason received: {reason_key}")
game_problem = decline_details.get(reason_key, "") if self.challenge_filter == FilterType.FINE else ""
self.add_challenge_filter(opponent.name, game_problem)
logger.info(f"Will not challenge {opponent} to another {game_problem}".strip() + " game today.")
self.show_earliest_challenge_time()
def game_category(variant: str, base_time: int, increment: int, num_days: int) -> str:
"""
Get the game type (e.g. bullet, atomic, classical). Lichess has one rating for every variant regardless of time control.
:param variant: The game's variant.
:param base_time: The base time in seconds.
:param increment: The increment in seconds.
:param num_days: If the game is correspondence, we have some days to play the move.
:return: The game category.
"""
game_duration = base_time + increment * 40
if variant != "standard":
return variant
if num_days:
return "correspondence"
if game_duration < 179:
return "bullet"
if game_duration < 479:
return "blitz"
if game_duration < 1499:
return "rapid"
return "classical"