Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import streamlit as st | |
| from json import loads | |
| from re import search, compile | |
| from trycourier import Courier | |
| from secrets import token_urlsafe | |
| from argon2 import PasswordHasher | |
| from argon2.exceptions import VerifyMismatchError | |
| ph = PasswordHasher() | |
| def check_usr_pass(user_log_in_database, user_name: str, password: str) -> bool: | |
| """ | |
| Authenticates the user_name and password. | |
| """ | |
| registered_user = user_log_in_database.select('*').eq('user_name', user_name).execute() | |
| if not registered_user.data: | |
| return False | |
| else: | |
| try: | |
| passwd_verification_bool = ph.verify(registered_user.data[0]['password'], password) | |
| if passwd_verification_bool: | |
| return True | |
| return False | |
| except VerifyMismatchError: | |
| pass | |
| except IndexError: | |
| pass | |
| return False | |
| def check_valid_name(name_sign_up: str) -> bool: | |
| """ | |
| Checks if the user entered a valid name while creating the account. | |
| """ | |
| name_regex_eng = r'^[A-Za-z_]\w *' | |
| name_regex_rus = r'^[А-Яа-я_][А-Яа-я0-9_] *' | |
| if search(name_regex_eng, name_sign_up) or search(name_regex_rus, name_sign_up): | |
| return True | |
| return False | |
| def check_valid_email(email_sign_up: str) -> bool: | |
| """ | |
| Checks if the user entered a valid email while creating the account. | |
| """ | |
| regex = compile(r'([A-Za-z0-9]+[.-_])*[A-Za-z0-9]+@[A-Za-z0-9-]+(\.[A-Z|a-z]{2,})+') | |
| return True | |
| def check_unique_email(user_log_in_database, email_sign_up: str) -> bool: | |
| """ | |
| Checks if the email already exists (since email needs to be unique). | |
| """ | |
| authorized_users_data = user_log_in_database.select('*').eq('email', email_sign_up).execute().data | |
| if len(authorized_users_data) == 0: | |
| return True | |
| return False | |
| def non_empty_str_check(user_name_sign_up: str) -> bool: | |
| """ | |
| Checks for non-empty strings. | |
| """ | |
| empty_count = 0 | |
| for i in user_name_sign_up: | |
| if i == ' ': | |
| empty_count = empty_count + 1 | |
| if empty_count == len(user_name_sign_up): | |
| return False | |
| if not user_name_sign_up: | |
| return False | |
| return True | |
| def check_unique_usr(user_log_in_database, user_name_sign_up: str): | |
| """ | |
| Checks if the user_name already exists (since user_name needs to be unique), | |
| also checks for non-empty user_name. | |
| """ | |
| authorized_users_data = user_log_in_database.select('*').eq('user_name', user_name_sign_up).execute().data | |
| if len(authorized_users_data) != 0: | |
| return False | |
| non_empty_check = non_empty_str_check(user_name_sign_up) | |
| if not non_empty_check: | |
| return None | |
| return True | |
| def register_new_usr(user_log_in_database, name_sign_up: str, email_sign_up: str, user_name_sign_up: str, | |
| password_sign_up: str, professional_level: str, created_at: str) -> None: | |
| """ | |
| Saves the information of the new user in the _secret_auth.json file. | |
| """ | |
| new_usr_data = {'user_name': user_name_sign_up, | |
| 'name': name_sign_up, | |
| 'email': email_sign_up, | |
| 'password': ph.hash(password_sign_up), | |
| 'professional_level': professional_level, | |
| 'created_at': created_at} | |
| return user_log_in_database.insert(new_usr_data).execute() | |
| def check_user_name_exists(user_log_in_database, user_name: str) -> bool: | |
| """ | |
| Checks if the user_name exists in the _secret_auth.json file. | |
| """ | |
| authorized_users_data = user_log_in_database.select('*').eq('user_name', user_name).execute().data | |
| if len(authorized_users_data) == 1: | |
| return True | |
| return False | |
| def check_email_exists(user_log_in_database, email_forgot_passwd: str): | |
| """ | |
| Checks if the email entered is present in the _secret_auth.json file. | |
| """ | |
| authorized_users_data = user_log_in_database.select('*').eq('email', email_forgot_passwd).execute().data | |
| if len(authorized_users_data) == 1: | |
| return True, authorized_users_data[0]['user_name'] | |
| return False, None | |
| def generate_random_passwd() -> str: | |
| """ | |
| Generates a random password to be sent in email. | |
| """ | |
| password_length = 10 | |
| return token_urlsafe(password_length) | |
| def send_passwd_in_email(auth_token: str, user_name_forgot_passwd: str, email_forgot_passwd: str, company_name: str, | |
| random_password: str) -> None: | |
| """ | |
| Triggers an email to the user containing the randomly generated password. | |
| """ | |
| client = Courier(auth_token=auth_token) | |
| client.send_message( | |
| message={ | |
| "to": { | |
| "email": email_forgot_passwd | |
| }, | |
| "content": { | |
| "title": f'{company_name}: Login Password!', | |
| "body": f'Hi! {user_name_forgot_passwd},\n\nYour temporary login password is: {random_password}\n\n' | |
| + '{{info}}' | |
| }, | |
| "data": { | |
| "info": "Please reset your password at the earliest for security reasons." | |
| } | |
| } | |
| ) | |
| def change_passwd(user_log_in_database, email_forgot_passwd: str, random_password: str) -> None: | |
| """ | |
| Replaces the old password with the newly generated password. | |
| """ | |
| updates = {'password': ph.hash(random_password)} | |
| return user_log_in_database.update(updates).eq('email', email_forgot_passwd).execute() | |
| def check_current_passwd(user_log_in_database, email_reset_passwd: str, current_passwd: str = None) -> bool: | |
| """ | |
| Authenticates the password entered against the user_name when | |
| resetting the password. | |
| """ | |
| authorized_user_data = user_log_in_database.select('*').eq('email', email_reset_passwd).execute().data[0] | |
| if current_passwd is None: | |
| current_passwd = 'b' | |
| try: | |
| if ph.verify(authorized_user_data['password'], current_passwd): | |
| return True | |
| except VerifyMismatchError: | |
| pass | |
| return False | |
| def save_data_in_database(user_task_database, save_type, save_name, cefr_level, created_at, creator_name=None, | |
| generated_result=None, test_taker_name=None, test_taker_answers=None, test_taker_result=None, | |
| comments=None, distractor_model=None, allow=False): | |
| already_saved = user_task_database.select('*').execute().data | |
| already_saved_names = [task for task in already_saved | |
| if (task['creator_name']==creator_name | |
| and task['save_name']==save_name | |
| and task['cefr_level']==cefr_level)] | |
| already_saved_tasks = [task for task in already_saved | |
| if (task['creator_name']==creator_name | |
| and task['generated_result']==generated_result | |
| and task['cefr_level']==cefr_level)] | |
| already_saved_tests = [task for task in already_saved | |
| if (task['test_taker_name']==test_taker_name | |
| and task['save_name']==save_name | |
| and task['cefr_level']==cefr_level)] | |
| if save_name == '' and save_type == 'download': | |
| save_name = generated_result['name'] | |
| if len(already_saved_names) != 0 and save_type == 'download': | |
| return st.success('Файл с таким названием уже существует! Введите другое название и повторите попытку.') | |
| elif len(already_saved_tasks) != 0 and save_type == 'download': | |
| return st.error(f'Вы уже сохраняли эти задания под именем {already_saved_tasks[0]["save_name"]}. ') | |
| elif (len(already_saved_tests) != 0 | |
| and save_type == 'online_test'): # and int(test_taker_result) == int(already_saved_tests[0]["user_points"]) | |
| return st.error('Вы уже решали данный тест!') | |
| else: | |
| if save_type == 'download': | |
| new_save_data = { | |
| 'save_type': save_type, | |
| 'save_name': save_name, | |
| 'cefr_level': cefr_level, | |
| 'created_at': created_at, | |
| 'creator_name': creator_name, | |
| 'generated_result': generated_result, | |
| 'distractor_model': distractor_model | |
| } | |
| else: | |
| new_save_data = { | |
| 'save_type': save_type, | |
| 'save_name': save_name, | |
| 'cefr_level': cefr_level, | |
| 'created_at': created_at, | |
| 'creator_name': creator_name, | |
| 'test_taker_name': test_taker_name, | |
| 'test_taker_answers': test_taker_answers, | |
| 'generated_result': generated_result, | |
| 'test_taker_result': test_taker_result, | |
| 'comments': comments} | |
| user_task_database.insert(new_save_data).execute() | |
| if save_type == 'download': | |
| if allow: | |
| return st.success('Задания успешно сохранены! Можете переходить на следующие вкладки') | |
| elif save_type == 'online_test': | |
| return st.success('Ответы успешно сохранены!') | |
| def load_user_tasks_data(user_task_database, save_type, creator_name=None, test_taker_name=None): | |
| if save_type == 'download': | |
| user_data = user_task_database.select('*').eq('creator_name', creator_name).eq('save_type', save_type).execute().data | |
| names = [item['save_name'] for item in user_data] | |
| cefr_level = [item['cefr_level'] for item in user_data] | |
| created_at = [item['created_at'] for item in user_data] | |
| return_data = pd.DataFrame([names, cefr_level, created_at]).transpose() | |
| return_data.columns = ['Название', 'Уровень', 'Время создания'] | |
| else: | |
| user_data = user_task_database.select('*').eq('test_taker_name', test_taker_name).eq('save_type', save_type).execute().data | |
| names = [item['save_name'] for item in user_data] | |
| cefr_level = [item['cefr_level'] for item in user_data] | |
| created_at = [item['created_at'] for item in user_data] | |
| creator_name = [item['creator_name'] for item in user_data] | |
| test_taker_result = [item['test_taker_result'] for item in user_data] | |
| return_data = pd.DataFrame([names, cefr_level, test_taker_result, created_at, creator_name]).transpose() | |
| return_data.columns = ['Название', 'Уровень', 'Оценка', 'Дата прохождения', 'Автор заданий'] | |
| return return_data | |
| def load_users_particular_task(user_task_database, load_mode, creator_name, save_name, cefr_level,): | |
| return_data = user_task_database.select('*').eq('creator_name', creator_name)\ | |
| .eq('save_name', save_name)\ | |
| .eq('save_type', load_mode)\ | |
| .eq('cefr_level',cefr_level).execute().data[0]['generated_result'] | |
| return_data = loads(return_data.replace("'", '"'), strict=False) | |
| return return_data | |