Spaces:
Sleeping
Sleeping
| import mimetypes | |
| from urllib.parse import unquote_plus | |
| import re | |
| import urllib.parse | |
| from pathlib import Path | |
| from config import WEBSITE_URL | |
| import asyncio, aiohttp | |
| from utils.directoryHandler import get_current_utc_time, getRandomID | |
| from utils.logger import Logger | |
| logger = Logger(__name__) | |
| def convert_class_to_dict(data, isObject, showtrash=False): | |
| if isObject == True: | |
| data = data.__dict__.copy() | |
| new_data = {"contents": {}} | |
| for key in data["contents"]: | |
| if data["contents"][key].trash == showtrash: | |
| if data["contents"][key].type == "folder": | |
| folder = data["contents"][key] | |
| new_data["contents"][key] = { | |
| "name": folder.name, | |
| "type": folder.type, | |
| "id": folder.id, | |
| "path": folder.path, | |
| "upload_date": folder.upload_date, | |
| } | |
| else: | |
| file = data["contents"][key] | |
| new_data["contents"][key] = { | |
| "name": file.name, | |
| "type": file.type, | |
| "size": file.size, | |
| "id": file.id, | |
| "path": file.path, | |
| "upload_date": file.upload_date, | |
| } | |
| return new_data | |
| async def auto_ping_website(): | |
| if WEBSITE_URL is not None: | |
| async with aiohttp.ClientSession() as session: | |
| while True: | |
| try: | |
| async with session.get(WEBSITE_URL) as response: | |
| if response.status == 200: | |
| logger.info(f"Pinged website at {get_current_utc_time()}") | |
| else: | |
| logger.warning(f"Failed to ping website: {response.status}") | |
| except Exception as e: | |
| logger.warning(f"Failed to ping website: {e}") | |
| await asyncio.sleep(60) # Ping website every minute | |
| import shutil | |
| def reset_cache_dir(): | |
| cache_dir = Path("./cache") | |
| downloads_dir = Path("./downloads") | |
| # Save session files before deleting cache (but skip invalidated ones) | |
| session_files = {} | |
| if cache_dir.exists(): | |
| for session_file in cache_dir.glob("*.session"): | |
| # Check if this session file was marked as invalidated | |
| invalidated_marker = cache_dir / f"{session_file.name}.invalidated" | |
| if invalidated_marker.exists(): | |
| logger.warning(f"Skipping invalidated session file: {session_file.name} (was marked as AUTH_KEY_DUPLICATED)") | |
| # Delete the marker file so it can be retried on next restart | |
| invalidated_marker.unlink(missing_ok=True) | |
| continue | |
| session_files[session_file.name] = session_file.read_bytes() | |
| logger.info(f"Preserving session file: {session_file.name}") | |
| # Delete cache and downloads directories | |
| shutil.rmtree(cache_dir, ignore_errors=True) | |
| shutil.rmtree(downloads_dir, ignore_errors=True) | |
| cache_dir.mkdir(parents=True, exist_ok=True) | |
| downloads_dir.mkdir(parents=True, exist_ok=True) | |
| # Restore session files | |
| for session_name, session_data in session_files.items(): | |
| (cache_dir / session_name).write_bytes(session_data) | |
| logger.info(f"Restored session file: {session_name}") | |
| logger.info("Cache and downloads directory reset (session files preserved)") | |
| def parse_content_disposition(content_disposition): | |
| # Split the content disposition into parts | |
| parts = content_disposition.split(";") | |
| # Initialize filename variable | |
| filename = None | |
| # Loop through parts to find the filename | |
| for part in parts: | |
| part = part.strip() | |
| if part.startswith("filename="): | |
| # If filename is found | |
| filename = part.split("=", 1)[1] | |
| elif part.startswith("filename*="): | |
| # If filename* is found | |
| match = re.match(r"filename\*=(\S*)''(.*)", part) | |
| if match: | |
| encoding, value = match.groups() | |
| try: | |
| filename = urllib.parse.unquote(value, encoding=encoding) | |
| except ValueError: | |
| # Handle invalid encoding | |
| pass | |
| if filename is None: | |
| raise Exception("Failed to get filename") | |
| return filename | |
| def get_filename(headers, url): | |
| try: | |
| if headers.get("Content-Disposition"): | |
| filename = parse_content_disposition(headers["Content-Disposition"]) | |
| else: | |
| filename = unquote_plus(url.strip("/").split("/")[-1]) | |
| filename = filename.strip(' "') | |
| except: | |
| filename = unquote_plus(url.strip("/").split("/")[-1]) | |
| filename = filename.strip() | |
| if filename == "" or "." not in filename: | |
| if headers.get("Content-Type"): | |
| extension = mimetypes.guess_extension(headers["Content-Type"]) | |
| if extension: | |
| filename = f"{getRandomID()}{extension}" | |
| else: | |
| filename = getRandomID() | |
| else: | |
| filename = getRandomID() | |
| return filename | |