Spaces:
Running
Running
| from pathlib import Path | |
| import sys | |
| import config, dill | |
| from pyrogram.types import InputMediaDocument, Message | |
| import os, random, string, asyncio | |
| from utils.logger import Logger | |
| from datetime import datetime, timezone | |
| import os | |
| import signal | |
logger = Logger(__name__)

# Local on-disk cache directory for the serialized drive index.
cache_dir = Path("./cache")
cache_dir.mkdir(parents=True, exist_ok=True)
# Path of the dill-serialized drive database file ("drive.data").
drive_cache_path = cache_dir / "drive.data"
def getRandomID():
    """Generate a unique 6-character uppercase-alphanumeric id.

    Draws candidates until one is absent from ``DRIVE_DATA.used_ids`` and
    records the issued id there. Before DRIVE_DATA has been loaded (falsy),
    the first draw is returned without any bookkeeping.
    """
    global DRIVE_DATA
    alphabet = string.ascii_uppercase + string.digits
    while True:
        candidate = "".join(random.choices(alphabet, k=6))
        if not DRIVE_DATA:
            return candidate
        if candidate not in DRIVE_DATA.used_ids:
            DRIVE_DATA.used_ids.append(candidate)
            return candidate
def get_current_utc_time():
    """Return the current UTC time as 'Date - YYYY-MM-DD | Time - HH:MM:SS'."""
    now = datetime.now(timezone.utc)
    return now.strftime("Date - %Y-%m-%d | Time - %H:%M:%S")
class Folder:
    """A directory node of the drive tree; children are keyed by their ids."""

    def __init__(self, name: str, path: str) -> None:
        self.name = name
        self.contents = {}
        # The root folder always gets the fixed id "root"; every other
        # folder receives a random unique id.
        self.id = "root" if name == "/" else getRandomID()
        self.type = "folder"
        self.trash = False
        # Normalize to exactly one leading and one trailing slash.
        self.path = ("/" + path.strip("/") + "/").replace("//", "/")
        self.upload_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.auth_hashes = []
class File:
    """A file node of the drive tree, referencing a Telegram message id."""

    def __init__(
        self,
        name: str,
        file_id: int,
        size: int,
        path: str,
    ) -> None:
        self.name = name
        # Telegram message id that stores the actual document.
        self.file_id = file_id
        self.id = getRandomID()
        self.size = size
        self.type = "file"
        self.trash = False
        # Drop a single trailing slash. Using endswith() also tolerates an
        # empty path — the original `path[-1]` raised IndexError on "".
        self.path = path[:-1] if path.endswith("/") else path
        self.upload_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
class NewDriveData:
    """In-memory index of the drive's folder/file tree.

    The whole object is serialized with dill to ``drive_cache_path`` on every
    mutation; ``isUpdated`` marks that a Telegram backup is pending.
    """

    def __init__(self, contents: dict, used_ids: list) -> None:
        self.contents = contents  # {"/": root Folder}
        self.used_ids = used_ids  # every id ever issued by getRandomID()
        self.isUpdated = False  # True => local save not yet backed up

    def save(self) -> None:
        """Serialize this object to the local cache and mark a backup pending."""
        with open(drive_cache_path, "wb") as f:
            dill.dump(self, f)
        self.isUpdated = True
        logger.info("Drive data saved successfully.")

    def _resolve_folder(self, path: str) -> Folder:
        """Return the Folder at *path*, a '/'-joined chain of folder ids."""
        folder: Folder = self.contents["/"]
        if path != "/":
            for segment in path.strip("/").split("/"):
                folder = folder.contents[segment]
        return folder

    @staticmethod
    def _split_parent(path: str):
        """Split an item path into ``(parent_folder_path, item_id)``.

        ``path.strip("/").split("/")`` always yields at least one element, so
        the original duplicated "root fallback" else-branch was dead code.
        """
        parts = path.strip("/").split("/")
        return "/" + "/".join(parts[:-1]), parts[-1]

    def new_folder(self, path: str, name: str) -> str:
        """Create folder *name* inside *path*; return its full path + id.

        (Return annotation fixed: the original said ``-> None`` but returns
        a string that callers use.)
        """
        logger.info(f"Creating new folder '{name}' in path '{path}'.")
        folder = Folder(name, path)
        parent = self._resolve_folder(path)
        parent.contents[folder.id] = folder
        self.save()
        return folder.path + folder.id

    def new_file(self, path: str, name: str, file_id: int, size: int) -> None:
        """Register a file stored as Telegram message *file_id* under *path*."""
        logger.info(f"Creating new file '{name}' in path '{path}'.")
        file = File(name, file_id, size, path)
        parent = self._resolve_folder(path)
        parent.contents[file.id] = file
        self.save()

    def get_directory(self, path: str, is_admin: bool = True, auth: str = None):
        """Fetch the Folder at *path*, enforcing auth for non-admin callers.

        Returns the Folder for admins; ``(Folder, auth_home_path)`` when an
        auth hash on the path matched; ``None`` for unauthorized access.
        """
        folder_data: Folder = self.contents["/"]
        auth_success = False
        auth_home_path = None
        if path != "/":
            path = path.strip("/")
            if "/" in path:
                path = path.split("/")
            else:
                path = [path]
            for folder in path:
                folder_data = folder_data.contents[folder]
                # An auth hash grants access to the folder it is attached to
                # and to everything below it.
                if auth in folder_data.auth_hashes:
                    auth_success = True
                    auth_home_path = (
                        "/" + folder_data.path.strip("/") + "/" + folder_data.id
                    )
        if not is_admin and not auth_success:
            logger.warning(f"Unauthorized access attempt to path '{path}'.")
            return None
        if auth_success:
            logger.info(f"Authorization successful for path '{path}'.")
            return folder_data, auth_home_path
        return folder_data

    def get_folder_auth(self, path: str) -> str:
        """Attach a fresh auth hash to the folder at *path* and return it.

        (Return annotation fixed: the original said ``-> None``.)
        """
        auth = getRandomID()
        folder_data = self._resolve_folder(path)
        folder_data.auth_hashes.append(auth)
        self.save()
        logger.info(f"Authorization hash generated for path '{path}'.")
        return auth

    def get_file(self, path) -> File:
        """Return the File whose id is the last segment of *path*."""
        folder_path, file_id = self._split_parent(path)
        folder_data = self.get_directory(folder_path)
        return folder_data.contents[file_id]

    def rename_file_folder(self, path: str, new_name: str) -> None:
        """Rename the file or folder addressed by *path*."""
        folder_path, file_id = self._split_parent(path)
        folder_data = self.get_directory(folder_path)
        folder_data.contents[file_id].name = new_name
        self.save()
        logger.info(f"Item at path '{path}' renamed to '{new_name}'.")

    def trash_file_folder(self, path: str, trash: bool) -> None:
        """Move the item at *path* to trash (trash=True) or restore it."""
        action = "Trashing" if trash else "Restoring"
        folder_path, file_id = self._split_parent(path)
        folder_data = self.get_directory(folder_path)
        folder_data.contents[file_id].trash = trash
        self.save()
        logger.info(f"Item at path '{path}' {action.lower()} successfully.")

    def get_trashed_files_folders(self):
        """Collect every trashed item in the tree, keyed by item id."""
        root_dir = self.get_directory("/")
        trash_data = {}

        def traverse_directory(folder):
            for item in folder.contents.values():
                if item.type == "folder":
                    if item.trash:
                        trash_data[item.id] = item
                    else:
                        # Recursively traverse the subfolder; a trashed
                        # folder's children are covered by their parent.
                        traverse_directory(item)
                elif item.type == "file":
                    if item.trash:
                        trash_data[item.id] = item

        traverse_directory(root_dir)
        return trash_data

    def delete_file_folder(self, path: str) -> None:
        """Permanently remove the item at *path* from its parent folder."""
        folder_path, file_id = self._split_parent(path)
        folder_data = self.get_directory(folder_path)
        del folder_data.contents[file_id]
        self.save()
        logger.info(f"Item at path '{path}' deleted successfully.")

    def search_file_folder(self, query: str):
        """Case-insensitive substring search over all item names."""
        logger.info(f"Searching for items matching query '{query}'.")
        root_dir = self.get_directory("/")
        search_results = {}

        def traverse_directory(folder):
            for item in folder.contents.values():
                if query.lower() in item.name.lower():
                    search_results[item.id] = item
                if item.type == "folder":
                    traverse_directory(item)

        traverse_directory(root_dir)
        logger.info(f"Search completed. Found {len(search_results)} matching items.")
        return search_results
class NewBotMode:
    """Tracks which folder uploads from the main bot are routed into."""

    def __init__(self, drive_data: NewDriveData) -> None:
        self.drive_data = drive_data
        # Start out pointing at the root directory.
        self.current_folder = "/"
        self.current_folder_name = "/ (root directory)"

    def set_folder(self, folder_path: str, name: str) -> None:
        """Switch the upload target folder and persist the drive data."""
        self.current_folder, self.current_folder_name = folder_path, name
        self.drive_data.save()
        logger.info(f"Current folder set to '{name}' at path '{folder_path}'.")
# Singleton drive index; populated by loadDriveData() or
# initDriveDataWithoutClients().
DRIVE_DATA: NewDriveData = None
# Bot-mode state; created only when config.MAIN_BOT_TOKEN is set.
BOT_MODE: NewBotMode = None
# Function to backup the drive data to telegram
async def backup_drive_data(loop=True):
    """Upload the local drive.data file to the Telegram backup message.

    With loop=True (default) runs forever, sleeping DATABASE_BACKUP_TIME
    between checks; with loop=False performs at most one backup and returns.
    An upload only happens when DRIVE_DATA.isUpdated was set by a prior save().
    """
    global DRIVE_DATA
    logger.info("Starting backup drive data task.")
    while True:
        try:
            if not DRIVE_DATA.isUpdated:
                # Nothing changed since the last backup.
                if not loop:
                    break
                await asyncio.sleep(config.DATABASE_BACKUP_TIME)
                continue
            logger.info("Backing up drive data to Telegram.")
            # Imported lazily to avoid a circular import at module load time.
            from utils.clients import get_client

            client = get_client()
            # NOTE(review): the leading "๐" characters look mojibake-garbled
            # (likely originally emoji) — confirm the intended caption text.
            time_text = f"๐ **Last Updated :** {get_current_utc_time()} (UTC +00:00)"
            caption = (
                f"๐ **TG Drive Data Backup File**\n\n"
                "Do not edit or delete this message. This is a backup file for the tg drive data.\n\n"
                f"{time_text}"
            )
            media_doc = InputMediaDocument(drive_cache_path, caption=caption)
            # Replace the document on the fixed backup message in the
            # storage channel rather than posting a new message.
            msg = await client.edit_message_media(
                config.STORAGE_CHANNEL,
                config.DATABASE_BACKUP_MSG_ID,
                media=media_doc,
                file_name="drive.data",
            )
            DRIVE_DATA.isUpdated = False
            logger.info("Drive data backed up to Telegram successfully.")
            try:
                await msg.pin()
            except Exception as pin_e:
                # Pinning is best-effort; the backup itself already succeeded.
                logger.error(f"Error pinning backup message: {pin_e}")
            if not loop:
                break
            await asyncio.sleep(config.DATABASE_BACKUP_TIME)
        except Exception as e:
            # Keep the task alive on transient failures and retry shortly.
            logger.error(f"Backup Error: {e}")
            await asyncio.sleep(10)
async def init_drive_data():
    """Backfill ``auth_hashes`` on folders loaded from older backups."""
    global DRIVE_DATA
    logger.info("Initializing drive data.")
    root_dir = DRIVE_DATA.get_directory("/")
    if not hasattr(root_dir, "auth_hashes"):
        root_dir.auth_hashes = []

    def ensure_auth_hashes(folder):
        # Depth-first walk; only folder nodes carry auth hashes.
        for entry in folder.contents.values():
            if entry.type == "folder":
                ensure_auth_hashes(entry)
                if not hasattr(entry, "auth_hashes"):
                    entry.auth_hashes = []

    ensure_auth_hashes(root_dir)
    DRIVE_DATA.save()
    logger.info("Drive data initialization completed.")
async def loadDriveData():
    """Load DRIVE_DATA from the Telegram backup message, or create it fresh.

    Any failure (fetch error, missing document, wrong file name) falls back
    to a brand-new empty drive containing only the root folder. Also starts
    bot mode when config.MAIN_BOT_TOKEN is configured.
    """
    global DRIVE_DATA, BOT_MODE
    logger.info("Loading drive data.")
    # Imported lazily to avoid a circular import at module load time.
    from utils.clients import get_client

    try:
        client = get_client()
        try:
            msg: Message = await client.get_messages(
                config.STORAGE_CHANNEL, config.DATABASE_BACKUP_MSG_ID
            )
        except Exception as e:
            logger.error(f"Error fetching backup message: {e}")
            raise
        if not msg.document:
            logger.error("Backup message does not contain a document")
            raise Exception("Backup message does not contain a document")
        if msg.document.file_name == "drive.data":
            dl_path = await msg.download()
            with open(dl_path, "rb") as f:
                # NOTE(review): dill-deserializing the downloaded backup —
                # only safe because the storage channel is trusted.
                DRIVE_DATA = dill.load(f)
            logger.info("Drive data loaded from Telegram backup.")
        else:
            raise Exception("Backup drive.data file not found on Telegram.")
    except Exception as e:
        # Fall back to an empty drive with just the root folder.
        logger.warning(f"Backup load failed: {e}")
        logger.info("Creating new drive.data file.")
        DRIVE_DATA = NewDriveData({"/": Folder("/", "/")}, [])
        DRIVE_DATA.save()
    await init_drive_data()
    if config.MAIN_BOT_TOKEN:
        from utils.bot_mode import start_bot_mode

        BOT_MODE = NewBotMode(DRIVE_DATA)
        await start_bot_mode(DRIVE_DATA, BOT_MODE)
        logger.info("Bot mode started.")
async def initDriveDataWithoutClients():
    """Initialize DRIVE_DATA without requiring Telegram clients (offline mode)"""
    global DRIVE_DATA, BOT_MODE
    logger.info("Initializing drive data in offline mode (without Telegram clients).")

    def _create_fresh_drive_data():
        # Build an empty drive tree with just the root folder and persist it.
        # (Factored out: the original duplicated this in two branches.)
        global DRIVE_DATA
        DRIVE_DATA = NewDriveData({"/": Folder("/", "/")}, [])
        DRIVE_DATA.save()

    # Try to load from local cache if it exists
    if drive_cache_path.exists():
        try:
            with open(drive_cache_path, "rb") as f:
                DRIVE_DATA = dill.load(f)
            logger.info("Drive data loaded from local cache.")
        except Exception as e:
            logger.warning(f"Failed to load from local cache: {e}")
            logger.info("Creating new drive.data file.")
            _create_fresh_drive_data()
    else:
        logger.info("No local cache found. Creating new drive.data file.")
        _create_fresh_drive_data()
    await init_drive_data()
    logger.info("Drive data initialized in offline mode.")