Spaces:
Sleeping
Sleeping
| from request import RequestFile as req | |
| from response import ResponseFile as res | |
| from response import ResponseDefault as res1 | |
| import function.dropbox as sf_dropbox | |
| import os | |
| import shutil | |
| import re | |
| from repository import UserRepository | |
| from function import support_function as sf | |
| ALLOWED_EXTENSIONS = {'csv', 'txt', 'doc', 'docx', 'pdf', 'xlsx', 'pptx', 'json','md'} | |
| regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b' | |
| def check_email(email): | |
| if(re.fullmatch(regex, email)): | |
| return True | |
| else: | |
| return False | |
| def allowed_file(filename): | |
| return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS | |
| async def listNameFiles(request: req.RequestGetNameFile ): | |
| try: | |
| user_id = request.user_id | |
| email = sf.check_email_service(user_id) | |
| if isinstance(email,res1.ReponseError): | |
| return email | |
| list_files = sf_dropbox.list_files(email) | |
| return res.ResponseGetNameFile( | |
| status= 200, | |
| data = res.DataGetNameFile(files=list_files) | |
| ) | |
| except: | |
| return res.ReponseError( | |
| status=500, | |
| data =res.Message(message="Server Error") | |
| ) | |
| async def deleteFile(request: req.RequestDeleteFile): | |
| try: | |
| user_id = request.user_id | |
| name_file = request.name_file | |
| email = sf.check_email_service(user_id) | |
| if isinstance(email, res1.ReponseError): | |
| return email | |
| if name_file is None or name_file == "": | |
| return res.ReponseError( | |
| status=400, | |
| data =res.Message(message="Name file is empty") | |
| ) | |
| sf_dropbox.delete_file(email,name_file) | |
| return res.ResponseDeleteFile( | |
| status=200, | |
| data =res.Message(message=f"delete {name_file} success") | |
| ) | |
| except: | |
| return res.ReponseError( | |
| status=500, | |
| data =res.Message(message=f"delete {name_file} error") | |
| ) | |
| async def download_folder(request:req.RequestDownLoadFolder): | |
| try: | |
| user_id = request.user_id | |
| email = sf.check_email_service(user_id) | |
| if isinstance(email, res1.ReponseError): | |
| return email | |
| sf_dropbox.download_folder(email) | |
| return res.ResponseDownloadFolder( | |
| status=200, | |
| data =res.Message(message=f"Downloaded folder {email} success") | |
| ) | |
| except: | |
| return res.ReponseError( | |
| status=500, | |
| data =res.Message(message=f"Server error") | |
| ) | |
| async def download_file(request:req.RequestDownLoadFile): | |
| try: | |
| user_id = request.user_id | |
| name_file = request.name_file | |
| email = sf.check_email_service(user_id) | |
| if isinstance(email, res1.ReponseError): | |
| return email | |
| if name_file is None or name_file == "": | |
| return res.ReponseError( | |
| status=400, | |
| data =res.Message(message="name_file is empty") | |
| ) | |
| sf_dropbox.search_and_download_file(name_file,email) | |
| return res.ResponseDownloadFile( | |
| status=200, | |
| data =res.Message(message=f"Downloaded file '{name_file}' by email: '{email}' success") | |
| ) | |
| except: | |
| return res.ReponseError( | |
| status=500, | |
| data =res.Message(message=f"Server error") | |
| ) | |
| ALLOWED_EXTENSIONS = {'csv', 'txt', 'doc', 'docx', 'pdf', 'xlsx', 'pptx', 'json','md'} | |
| def allowed_file1(filename): | |
| return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS | |
| async def upload_files(request: req.RequestUploadFile): | |
| try: | |
| user_id = request.user_id | |
| files = request.files | |
| email = sf.check_email_service(user_id) | |
| if isinstance(email, res1.ReponseError): | |
| return email | |
| for file in files: | |
| if not allowed_file(file.filename): | |
| return res.ReponseError( | |
| status=415, | |
| data =res.Message(message=f"File type not allow") | |
| ) | |
| temp_dir = f"/code/temp/{email}" | |
| os.makedirs(temp_dir, exist_ok=True) | |
| file_path = os.path.join(temp_dir, file.filename) | |
| with open(file_path, "wb") as buffer: | |
| shutil.copyfileobj(file.file, buffer) | |
| cloud_path = f"/{email}/{file.filename}" | |
| sf_dropbox.upload_file(file_path, cloud_path) | |
| return res.ResponseUploadedFile( | |
| status=200, | |
| data =res.Message(message=f"Load file success") | |
| ) | |
| except: | |
| return res.ReponseError( | |
| status=500, | |
| data =res.Message(message=f"Load file error") | |
| ) | |
| async def deleteAllFile(request: req.RequestDeleteAllFile): | |
| try: | |
| user_id = request.user_id | |
| email = sf.check_email_service(user_id) | |
| if isinstance(email, res.ReponseError): | |
| return email | |
| sf_dropbox.delete_all_files_in_folder(email) | |
| return res.ResponseDeleteAllFile( | |
| status=200, | |
| data=res.Message(message=f"Delete all file success") | |
| ) | |
| except: | |
| return res.ReponseError( | |
| status=500, | |
| data=res.Message(message=f"Delete all file error") | |
| ) | |