Spaces:
Sleeping
Sleeping
| from pydantic.error_wrappers import ErrorWrapper | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from service import MySQLService,UserService,ChatService | |
| from request import RequestMySQL,RequestUser,RequestDefault | |
| from auth.authentication import decodeJWT | |
| from repository import UserRepository | |
| from auth import authentication | |
| from datetime import datetime, timedelta | |
| from fastapi import Depends, HTTPException, Form, File, UploadFile | |
| from typing import List | |
| from service import FileService,DefaultService,UserService | |
| from request import RequestFile,RequestChat,RequestDefault | |
| from fastapi import FastAPI, Request, HTTPException | |
| from fastapi.responses import JSONResponse | |
| from pydantic.error_wrappers import ErrorWrapper | |
| import json | |
| from function import support_function | |
| from repository import UserRepository | |
| from response import ResponseDefault as res | |
| import re | |
def is_positive_integer(value) -> bool:
    """Return True when *value* is an ``int`` strictly greater than zero.

    ``bool`` is a subclass of ``int`` in Python, so ``True`` would otherwise
    be treated as the positive integer 1; booleans are rejected explicitly.
    Any non-int value (str, float, None, ...) yields False.
    """
    if isinstance(value, bool):
        return False
    return isinstance(value, int) and value > 0
def check_value_user_id_controller(user_id: str):
    """Validate the textual form of a user id.

    The value must be present, must parse as an integer once surrounding
    single and double quotes are removed, and must be accepted by
    ``support_function.is_positive_integer``.

    Returns:
        ``True`` when every check passes; otherwise a ``res.ReponseError``
        with status 400 describing the first check that failed.
    """
    # Missing or whitespace-only input.
    if user_id is None or not user_id.strip():
        return res.ReponseError(
            status=400,
            data=res.Message(message="user_id field is required."),
        )

    # Drop wrapping quotes: single quotes first, then double quotes.
    unquoted = user_id.strip("'").strip('"')
    try:
        parsed_id = int(unquoted)
    except ValueError:
        return res.ReponseError(
            status=400,
            data=res.Message(message="user_id must be an integer"),
        )

    if support_function.is_positive_integer(parsed_id):
        return True
    return res.ReponseError(
        status=400,
        data=res.Message(message="user_id must be greater than 0"),
    )
def check_value_user_id(user_id: str, current_user_email: str):
    """Validate a user id and confirm it belongs to the current user.

    The id goes through the same format checks as the controller-level
    validator (present, integer after removing wrapping quotes, accepted by
    ``support_function.is_positive_integer``). The email registered for the
    id is then fetched and compared with ``current_user_email``.

    Returns:
        ``True`` when all checks pass; otherwise a ``res.ReponseError`` with
        status 400 for a malformed id, or 404 when the repository lookup
        returns ``None``.

    Raises:
        HTTPException: status 403 when the email found for the id differs
            from ``current_user_email``.
    """
    # Missing or whitespace-only input.
    if user_id is None or not user_id.strip():
        return res.ReponseError(
            status=400,
            data=res.Message(message="user_id field is required."),
        )

    # Drop wrapping quotes: single quotes first, then double quotes.
    unquoted = user_id.strip("'").strip('"')
    try:
        parsed_id = int(unquoted)
    except ValueError:
        return res.ReponseError(
            status=400,
            data=res.Message(message="user_id must be an integer"),
        )

    if not support_function.is_positive_integer(parsed_id):
        return res.ReponseError(
            status=400,
            data=res.Message(message="user_id must be greater than 0"),
        )

    # The repository is queried with the unquoted string, not the parsed int.
    record = UserRepository.getEmailUserByIdFix(unquoted)
    if record is None:
        return res.ReponseError(
            status=404,
            data=res.Message(message="user_id not exist"),
        )

    # presumably a row/tuple whose first item is the email -- confirm
    # against UserRepository.getEmailUserByIdFix.
    if record[0] != current_user_email:
        raise HTTPException(status_code=403, detail="Sorry, you can't perform actions with this user id.")
    return True
def check_value_email_controller(email: str):
    """Reject a missing, blank, or purely numeric email value.

    This is a shallow presence/type check only; it does not validate the
    address format.

    Returns:
        ``True`` when the value is non-blank and ``int(email)`` fails to
        parse it; otherwise a ``res.ReponseError`` with status 400.
    """
    if email is None or not email.strip():
        return res.ReponseError(
            status=400,
            data=res.Message(message="Email is required."),
        )

    # Only the int() probe belongs in the try body. Building the error
    # response inside it would let a ValueError raised during construction
    # be swallowed, and the numeric input would then be reported as valid.
    try:
        int(email)
    except ValueError:
        # Not parseable as an integer, which is the accepted case.
        return True
    return res.ReponseError(
        status=400,
        data=res.Message(message="Email must be a string, not a number."),
    )
def check_value_otp(otp: str):
    """Validate an OTP value.

    The OTP must be present, must not consist solely of digits, and must be
    exactly six characters long.

    Returns:
        ``True`` when the OTP passes; otherwise a ``res.ReponseError`` with
        status 400 carrying the message for the first failed check.
    """
    # Pick the message for the first failing rule, then build the error once.
    if otp is None:
        problem = "OTP is required"
    elif otp.isdigit():
        problem = "OTP must be a string, not a number."
    elif len(otp) != 6:
        problem = "OTP max length is 6"
    else:
        return True
    return res.ReponseError(status=400, data=res.Message(message=problem))
# Pattern for a basic ``local@domain.tld`` address. The TLD class is
# ``[A-Za-z]``: inside a character class "|" is a literal pipe, not
# alternation, so it must not appear there.
regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b'


def check_email(email) -> bool:
    """Return True when *email* fully matches the module-level ``regex``.

    The whole string must match (``re.fullmatch``). Because of the leading
    ``\\b``, the first character must be a word character. Non-string input
    (including ``None``) is reported as invalid rather than raising.
    """
    if not isinstance(email, str):
        return False
    return re.fullmatch(regex, email) is not None
def check_email_service(user_id: str):
    """Fetch the email stored for *user_id* and check its format.

    Returns:
        The email when the lookup succeeds and ``check_email`` accepts it;
        otherwise a ``res.ReponseError``: status 404 when the repository
        returns ``None``, status 400 when the email is ``None`` or fails
        ``check_email``.
    """
    record = UserRepository.getEmailUserByIdFix(user_id)
    if record is None:
        return res.ReponseError(
            status=404,
            data=res.Message(message="Id not exist"),
        )

    # presumably a row/tuple whose first item is the email -- confirm
    # against UserRepository.getEmailUserByIdFix.
    email = record[0]
    if email is None:
        reason = "Email is empty"
    elif not check_email(email):
        reason = "Email invalid"
    else:
        return email
    return res.ReponseError(status=400, data=res.Message(message=reason))
def check_email_empty_invalid(email: str):
    """Check that *email* is non-empty and accepted by ``check_email``.

    Returns:
        ``True`` when the email passes both checks; otherwise a
        ``res.ReponseError`` with status 400 whose message says whether the
        value was empty or invalid.
    """
    # Choose the failure message first so the error is built in one place.
    if email is None or email == "":
        reason = "Email is empty"
    elif not check_email(email):
        reason = "Email invalid"
    else:
        return True
    return res.ReponseError(status=400, data=res.Message(message=reason))