| from datetime import datetime, timedelta, timezone |
| import tiktoken |
| from edubotics_core.chat_processor.helpers import update_user_info, convert_to_dict |
|
|
|
|
def get_time():
    """Return the current UTC time as an ISO 8601 string.

    The underlying datetime is timezone-aware, so the returned string
    carries an explicit UTC offset.
    """
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.isoformat()
|
|
|
|
async def check_user_cooldown(
    user_info, current_time, COOLDOWN_TIME, TOKENS_LEFT, REGEN_TIME
):
    """Report whether the user is still inside their cooldown window.

    Args:
        user_info: User record exposing a ``metadata`` mapping as an
            attribute. The keys read are ``tokens_left``, ``in_cooldown``
            and ``last_message_time`` (an ISO 8601 string).
        current_time: ISO 8601 string for the moment being checked.
        COOLDOWN_TIME: Cooldown length in seconds, measured from
            ``last_message_time``.
        TOKENS_LEFT: Forwarded to ``reset_tokens_for_user``.
        REGEN_TIME: Forwarded to ``reset_tokens_for_user``.

    Returns:
        ``(True, cooldown_end)`` while the cooldown is still running, where
        ``cooldown_end`` is the ISO 8601 UTC time at which it expires;
        ``(False, None)`` otherwise.

    Raises:
        TypeError: If the cooldown path is taken and ``last_message_time``
            is absent, because ``datetime.fromisoformat`` rejects ``None``.
        ValueError: If either timestamp is not a valid ISO 8601 string.
    """

    def _as_utc(timestamp):
        # Naive timestamps are taken to be UTC. Aware timestamps are
        # converted, so a non-UTC offset is honored instead of being
        # overwritten (which would shift the instant).
        parsed = datetime.fromisoformat(timestamp)
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    # Fast path: tokens remain and the user is not flagged as cooling down.
    tokens_left = user_info.metadata.get("tokens_left", 0)
    if tokens_left > 0 and not user_info.metadata.get("in_cooldown", False):
        return False, None

    user_info = convert_to_dict(user_info)
    last_message_time = _as_utc(user_info["metadata"].get("last_message_time"))
    now = _as_utc(current_time)

    elapsed_seconds = (now - last_message_time).total_seconds()
    if elapsed_seconds < COOLDOWN_TIME:
        cooldown_end_time = last_message_time + timedelta(seconds=COOLDOWN_TIME)
        return True, cooldown_end_time.isoformat()

    # The cooldown has elapsed: clear the flag and hand the record over so
    # the token balance can be regenerated.
    user_info["metadata"]["in_cooldown"] = False
    await reset_tokens_for_user(user_info, TOKENS_LEFT, REGEN_TIME)

    return False, None
|
|
|
|
async def reset_tokens_for_user(user_info, TOKENS_LEFT, REGEN_TIME):
    """Regenerate the user's token balance for the time since their last message.

    The balance recorded under ``tokens_left_at_last_message`` grows linearly
    with elapsed time, at a rate that would close the gap between the
    non-negative part of that balance and ``max_tokens`` in ``REGEN_TIME``
    seconds. The result is capped at ``max_tokens``, written to
    ``metadata["tokens_left"]`` and persisted via ``update_user_info``.
    Nothing is written or persisted when the recorded balance is already at
    or above ``max_tokens``.

    Args:
        user_info: User record accepted by ``convert_to_dict``; the resulting
            dict must hold a ``metadata`` mapping with ``last_message_time``
            (an ISO 8601 string).
        TOKENS_LEFT: Upper bound applied to the recorded balance, and the
            default for ``max_tokens`` when the metadata has none.
        REGEN_TIME: Number of seconds used as the divisor of the
            regeneration rate.

    Raises:
        TypeError: If ``last_message_time`` is absent, because
            ``datetime.fromisoformat`` rejects ``None``.
        ValueError: If ``last_message_time`` is not a valid ISO 8601 string.
        ZeroDivisionError: If ``REGEN_TIME`` is zero and regeneration applies.
    """

    def _as_utc(timestamp):
        # Naive timestamps are taken to be UTC. Aware timestamps are
        # converted, so a non-UTC offset is honored instead of being
        # overwritten (which would shift the instant).
        parsed = datetime.fromisoformat(timestamp)
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    user_info = convert_to_dict(user_info)
    metadata = user_info["metadata"]

    last_message_time = _as_utc(metadata.get("last_message_time"))
    now = _as_utc(get_time())

    # Clamp at zero: a stored timestamp that is ahead of the current clock
    # must not yield negative regeneration and shrink the balance.
    elapsed_seconds = max((now - last_message_time).total_seconds(), 0.0)

    # Balance at the time of the last message, never above TOKENS_LEFT.
    current_tokens = min(
        metadata.get("tokens_left_at_last_message", 0), TOKENS_LEFT
    )
    max_tokens = metadata.get("max_tokens", TOKENS_LEFT)

    if current_tokens >= max_tokens:
        return

    # The rate is based on the non-negative part of the balance, so a
    # negative balance does not regenerate faster than an empty one.
    regeneration_rate_per_second = (
        max_tokens - max(current_tokens, 0)
    ) / REGEN_TIME
    tokens_to_regenerate = int(elapsed_seconds * regeneration_rate_per_second)

    metadata["tokens_left"] = min(
        current_tokens + tokens_to_regenerate, max_tokens
    )

    await update_user_info(user_info)
|
|
|
|
def get_num_tokens(text, model):
    """Count the tokens that ``text`` encodes to for ``model``.

    The encoding is looked up with ``tiktoken.encoding_for_model``; any
    error raised by that lookup propagates to the caller.
    """
    encoder = tiktoken.encoding_for_model(model)
    return len(encoder.encode(text))
|
|