| | import re |
| | import time |
| | import pymongo |
| | import motor.motor_asyncio |
| | from bson.objectid import ObjectId |
| | from bson.errors import InvalidId |
| | from bson.json_util import dumps |
| |
|
| | |
| | from FileStream.Exceptions import FIleNotFound |
| | from FileStream.Tools import Time_ISTKolNow |
| |
|
| |
|
| | class Database: |
| |
|
| | def __init__(self, uri, database_name): |
| | self._client = motor.motor_asyncio.AsyncIOMotorClient(uri) |
| | self.db = self._client[database_name] |
| | self.users = self.db.Users |
| | self.files = self.db.Public_Files |
| | self.pfile = self.db.Private_Files |
| | self.web_upload = self.db.Web_Files |
| |
|
| | |
| | |
| |
|
| | def new_user(self, id): |
| | return dict( |
| | telegram_id=id, |
| | access="USER", |
| | tele_status={ |
| | "status": "ACTIVE", |
| | "activity": None, |
| | "joined": Time_ISTKolNow() |
| | }, |
| | file={ |
| | "links": 0, |
| | "private_files": 0, |
| | "public_files": 0, |
| | }, |
| | site_id="None", |
| | site_status={ |
| | "status": None, |
| | "activity": None, |
| | "password": None, |
| | "links": 0, |
| | "joined": "None" |
| | }, |
| | ) |
| | |
| |
|
| | def NewTG_Files(self, details): |
| | return { |
| | "user_id": |
| | details['user_id'] if details['user_id'] else None, |
| | "user_type": |
| | details['user_type'] if details['user_type'] else None, |
| | "message_id": |
| | details['message_id'] if details['message_id'] else None, |
| | "location": |
| | details['location'] if details['location'] else None, |
| | "file": { |
| | "file_id": |
| | details['file']['file_id'] if details['file']['file_id'] else None, |
| | "file_unique_id": |
| | details['file']['file_unique_id'] |
| | if details['file']['file_unique_id'] else None, |
| | "file_name": |
| | details['file']['file_name'] |
| | if details['file']['file_name'] else None, |
| | "file_size": |
| | details['file']['file_size'] |
| | if details['file']['file_size'] else None, |
| | "mime_type": |
| | details['file']['mime_type'] |
| | if details['file']['mime_type'] else None, |
| | "taged_users": {} |
| | }, |
| | "time": |
| | details['time'] if details['time'] else None, |
| | "privacy_type": |
| | details['privacy_type'] if details['privacy_type'] else None, |
| | } |
| |
|
| | |
| |
|
| | async def add_user(self, id): |
| | user = self.new_user(id) |
| | await self.users.insert_one(user) |
| |
|
| | async def add_admin(self, id): |
| | user= await self.get_user(id) |
| | if user: |
| | await self.users.update_one({"_id": user['_id']}, {"$set": {"access":"ADMIN" }}) |
| | else: |
| | user = self.new_user(id) |
| | user['access']="ADMIN" |
| | await self.users.insert_one(user) |
| |
|
| | |
| |
|
| | async def get_user(self, id): |
| | user = await self.users.find_one({'telegram_id': int(id)}) |
| | return user |
| |
|
| | |
| |
|
| | async def total_users_count(self): |
| | count = await self.users.count_documents({}) |
| | return count |
| |
|
| | async def get_all_users(self): |
| | all_users = self.users.find({}) |
| | return all_users |
| |
|
| | async def is_admin(self, user_id): |
| | user = await self.users.find_one({'telegram_id': int(user_id)}) |
| | return True if user['access'] == "ADMIN" else False |
| |
|
| | |
| |
|
| | async def delete_user(self, user_id): |
| | await self.users.delete_many({'telegram_id': int(user_id)}) |
| |
|
| | |
| |
|
| | async def ban_user(self, id): |
| | await self.users.update_one({"_id": ObjectId(_id)}, { |
| | "$set": { |
| | "tele_status": { |
| | "status": "BANNED", |
| | "activity": Time_ISTKolNow() |
| | }, |
| | } |
| | }) |
| |
|
| | async def unban_user(self, id): |
| | await self.users.update_one({"_id": ObjectId(_id)}, { |
| | "$set": { |
| | "tele_status": { |
| | "status": "ACTIVE", |
| | "activity": Time_ISTKolNow(), |
| | } |
| | } |
| | }) |
| |
|
| | async def is_user_banned(self, id): |
| | if await self.users.find_one({'telegram_id': int(id)}): |
| | return True |
| | else: |
| | return False |
| |
|
| | |
| | async def add_file(self, file_info): |
| | file_info["time"] = Time_ISTKolNow() |
| | fetch_old = await self.get_file_by_fileuniqueid(file_info["user_id"], file_info['file']["file_unique_id"]) |
| | if fetch_old: |
| | return fetch_old["_id"] |
| | await self.count_links(file_info["user_id"], "+") |
| | return (await self.files.insert_one(file_info)).inserted_id |
| |
|
| | |
| | async def get_file(self, _id): |
| | try: |
| | file_info = await self.files.find_one({"_id": ObjectId(_id)}) |
| | if not file_info: |
| | print('file not found') |
| | |
| | return file_info |
| | except InvalidId: |
| | raise FIleNotFound |
| |
|
| | async def get_all_files_api(self,range=None): |
| | |
| | files= await self.files.find().to_list(length=None) |
| | |
| | print("\n get_all_files_api : Return Type : ", type(files)) |
| | return files |
| |
|
| | async def get_all_files(self,range=None): |
| | user_files = self.files.find({}) |
| | if range : |
| | user_files.skip(range[0] - 1) |
| | user_files.limit(range[1] - range[0] + 1) |
| | user_files.sort('_id', pymongo.DESCENDING) |
| | return user_files |
| |
|
| | async def find_files(self, user_id, range): |
| | user_files = self.files.find( |
| | {f"file.tagged_users.{user_id}": { |
| | "$exists": True |
| | }}) |
| | user_files.skip(range[0] - 1) |
| | user_files.limit(range[1] - range[0] + 1) |
| | user_files.sort('_id', pymongo.DESCENDING) |
| | total_files = await self.files.count_documents( |
| | {f"file.tagged_users.{user_id}": { |
| | "$exists": True |
| | }}) |
| | return user_files, total_files |
| |
|
| | async def find_all_public_files(self, range): |
| | user_files = self.files.find({"privacy_type": "PUBLIC"}) |
| | user_files.skip(range[0] - 1) |
| | user_files.limit(range[1] - range[0] + 1) |
| | user_files.sort('_id', pymongo.DESCENDING) |
| | total_files = await self.files.count_documents({"privacy_type": "PUBLIC"}) |
| | return user_files, total_files |
| |
|
| | async def find_all_files(self, range): |
| | user_files = self.files.find({}) |
| | user_files.skip(range[0] - 1) |
| | user_files.limit(range[1] - range[0] + 1) |
| | user_files.sort('_id', pymongo.DESCENDING) |
| | total_files = await self.files.count_documents({}) |
| | return user_files, total_files |
| |
|
| | async def find_private_files(self, user_id, range): |
| | |
| | user_files = self.files.find({f"file.tagged_users.{user_id}": "PRIVATE"}) |
| | user_files.skip(range[0] - 1) |
| | user_files.limit(range[1] - range[0] + 1) |
| | user_files.sort('_id', pymongo.DESCENDING) |
| | total_files = await self.files.count_documents( |
| | {"file.tagged_users." + str(user_id): "PRIVATE"}) |
| | return user_files, total_files |
| |
|
| | async def get_file_by_fileuniqueid_only(self, file_unique_id): |
| | return await self.files.find_one({"file.file_unique_id": file_unique_id}) |
| |
|
| | async def get_file_by_fileuniqueid(self, id, file_unique_id): |
| | count = await self.files.count_documents({"user_id":id,"file.file_unique_id":file_unique_id}) |
| | if count == 0: |
| | return False |
| | elif count == 1: |
| | return await self.files.find_one({"user_id": id,"file.file_unique_id": file_unique_id}) |
| | else: |
| | return self.files.find({"user_id": id,"file.file_unique_id": file_unique_id}) |
| |
|
| | |
| |
|
| | async def update_privacy(self, file_details: dict): |
| | file = await self.get_file_by_fileuniqueid_only(file_details['file']['file_unique_id']) |
| | |
| | updated_tagged_users = file['file']['tagged_users'].copy() |
| | updated_tagged_users.update(file_details['file']['tagged_users']) |
| | |
| | |
| | |
| | file_details['privacy_type'] = "PRIVATE" if any(value == "PRIVATE" for value in updated_tagged_users.values()) else file_details['privacy_type'] |
| | await self.files.update_one({"_id": file['_id']}, { |
| | "$set": { |
| | "privacy_type": file_details['privacy_type'], |
| | "file.tagged_users": updated_tagged_users |
| | } |
| | }) |
| | return await self.get_file_by_fileuniqueid_only(file_details['file']['file_unique_id']) |
| |
|
| | async def update_file_ids(self, _id, file_ids: dict): |
| | await self.files.update_one({"_id": ObjectId(_id)}, |
| | {"$set": { |
| | "file_ids": file_ids |
| | }}) |
| |
|
| | async def update_file_info(self, _id, file_info: dict): |
| | await self.files.update_one({"_id": ObjectId(_id)}, { |
| | "$set": { |
| | "message_id": file_info['message_id'], |
| | "location": file_info['location'], |
| | "file": file_info['file'] |
| | } |
| | }) |
| |
|
| |
|
| | |
| | async def get_private_file(self, _id): |
| | try: |
| | file_info = await self.pfile.find_one({"_id": ObjectId(_id)}) |
| | if not file_info: |
| | raise FIleNotFound |
| | return file_info |
| | except InvalidId: |
| | raise FIleNotFound |
| |
|
| | async def add_private_file(self, file_info): |
| | file_info["time"] = Time_ISTKolNow() |
| | fetch_old = await self.get_private_file_by_fileuniqueid_only(file_info['file']["file_unique_id"]) |
| | if fetch_old: |
| | return fetch_old["_id"] |
| | return (await self.pfile.insert_one(file_info)) |
| |
|
| | async def get_private_file_by_fileuniqueid_only(self, file_unique_id): |
| | return await self.pfile.find_one({"file.file_unique_id": file_unique_id}) |
| |
|
| | async def update_private_file_ids(self, _id, file_ids: dict): |
| | await self.pfile.update_one({"_id": ObjectId(_id)}, |
| | {"$set": { |
| | "file_ids": file_ids |
| | }}) |
| |
|
| | async def update_private_privacy(self, file_details: dict, instruction: dict): |
| | file = await self.get_file_by_fileuniqueid_only(file_details['file']['file_unique_id']) |
| | await self.pfile.insert_one(file_details) |
| |
|
| | |
| |
|
| | async def get_search_results(self,query=None, file_type=None, max_results=10, offset=0): |
| | |
| | regex = re.compile(re.escape(query), re.IGNORECASE) |
| | filter = {'$or': [{'file.file_name': {"$regex": regex}}, {'file.caption': {"$regex": regex}}]} |
| |
|
| | if file_type: |
| | filter['mime_type'] = file_type |
| |
|
| | total_results = await self.files.count_documents(filter) |
| | next_offset = offset + max_results |
| |
|
| | if next_offset > total_results: |
| | next_offset = '' |
| | |
| | cursor = self.files.find(filter) |
| | |
| | cursor.sort('$natural', -1) |
| | |
| | cursor.skip(offset).limit(max_results) |
| | |
| | files = await cursor.to_list(length=max_results) |
| | return files, next_offset |
| |
|
| | |
| | async def total_files(self, id=None): |
| | if id: |
| | return await self.files.count_documents({"user_id": id}) |
| | return await self.files.count_documents({}) |
| |
|
| | async def total_privfiles(self, id=None): |
| | if id: |
| | return await self.pfile.count_documents({"user_id": id}) |
| | return await self.pfile.count_documents({}) |
| |
|
| | |
| |
|
| | async def delete_one_file(self, _id): |
| | await self.files.delete_one({'_id': ObjectId(_id)}) |
| |
|
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | async def count_links(self, id, operation: str): |
| | if operation == "-": |
| | await self.users.update_one({"id": id}, {"$inc": {"file.links": -1}}) |
| | elif operation == "+": |
| | await self.users.update_one({"id": id}, {"$inc": {"file.links": 1}}) |
| |
|
| | |
| | async def add_webfile(self, upload_info): |
| | fetch_old = await self.get_web_file(upload_info["dropzone_id"]) |
| | if fetch_old: |
| | return fetch_old |
| | else: |
| | await self.web_upload.insert_one(upload_info) |
| | return await self.get_web_file(upload_info["dropzone_id"]) |
| |
|
| | |
| |
|
| | async def get_web_file(self, upload_id): |
| | file_info = await self.web_upload.find_one({"dropzone_id": upload_id}) |
| | if not file_info: |
| | return None |
| | return file_info |
| |
|
| | async def uploaded_web_file(self, upload_id): |
| | await self.web_upload.delete_one({"dropzone_id": upload_id}) |
| |
|