| | import mimetypes |
| | from urllib.parse import unquote_plus |
| | import re |
| | import urllib.parse |
| | from pathlib import Path |
| | from config import WEBSITE_URL |
| | import asyncio, aiohttp |
| | from utils.directoryHandler import get_current_utc_time, getRandomID |
| | from utils.logger import Logger |
| |
|
| | logger = Logger(__name__) |
| |
|
| |
|
def convert_class_to_dict(data, isObject, showtrash=False):
    """Build a plain-dict listing of the entries stored in ``data["contents"]``.

    Args:
        data: Either a mapping with a ``"contents"`` key, or (when
            ``isObject`` is truthy) an object whose instance attributes
            include ``contents``. ``contents`` maps keys to entry objects
            exposing ``trash``, ``type``, ``name``, ``id``, ``path`` and
            ``upload_date`` (plus ``size`` for non-folder entries).
        isObject: When truthy, ``data`` is read through its attribute
            dictionary instead of being indexed as a mapping.
        showtrash: Only entries whose ``trash`` attribute equals this value
            are included.

    Returns:
        ``{"contents": {key: {...}}}`` where each inner dict holds the
        entry's ``name``, ``type``, ``id``, ``path`` and ``upload_date``;
        entries whose type is not ``"folder"`` also carry ``size``.
    """
    # Only read access is needed, so the attribute dict is used directly
    # rather than copied.
    contents = vars(data)["contents"] if isObject else data["contents"]

    listing = {}
    for key, item in contents.items():
        if item.trash != showtrash:
            continue

        entry = {"name": item.name, "type": item.type}
        if item.type != "folder":
            # Inserted here so the emitted key order stays
            # name, type, size, id, path, upload_date.
            entry["size"] = item.size
        entry["id"] = item.id
        entry["path"] = item.path
        entry["upload_date"] = item.upload_date
        listing[key] = entry

    return {"contents": listing}
| |
|
| |
|
async def auto_ping_website():
    """Send a GET request to ``WEBSITE_URL`` in an endless loop.

    Returns immediately when ``WEBSITE_URL`` is ``None``. Otherwise a single
    ``aiohttp`` session is reused for every request: a 200 response is logged
    at info level, any other status or any raised exception is logged as a
    warning, and the coroutine sleeps 60 seconds after each attempt.
    """
    if WEBSITE_URL is None:
        return

    async with aiohttp.ClientSession() as session:
        while True:
            try:
                async with session.get(WEBSITE_URL) as response:
                    status = response.status
                    if status != 200:
                        logger.warning(f"Failed to ping website: {status}")
                    else:
                        logger.info(f"Pinged website at {get_current_utc_time()}")
            except Exception as e:
                # A failed ping must not stop the loop; report it and retry
                # after the usual delay.
                logger.warning(f"Failed to ping website: {e}")

            await asyncio.sleep(60)
| |
|
| |
|
| | import shutil |
| |
|
| |
|
def reset_cache_dir():
    """Wipe ``./cache`` and ``./downloads`` while keeping valid session files.

    Every ``*.session`` file in the cache directory is read into memory
    before the directories are removed and recreated, then written back.
    A session file that has a sibling ``<name>.invalidated`` marker is not
    preserved; its marker is deleted and the session disappears with the
    rest of the cache.
    """
    cache_path = Path("./cache")
    downloads_path = Path("./downloads")

    # Session file name -> raw bytes, held in memory across the wipe.
    preserved = {}
    session_paths = cache_path.glob("*.session") if cache_path.exists() else ()
    for session_path in session_paths:
        marker = cache_path / f"{session_path.name}.invalidated"
        if not marker.exists():
            preserved[session_path.name] = session_path.read_bytes()
            logger.info(f"Preserving session file: {session_path.name}")
            continue

        logger.warning(f"Skipping invalidated session file: {session_path.name} (was marked as AUTH_KEY_DUPLICATED)")
        marker.unlink(missing_ok=True)

    # Remove both directories first, then recreate both, in the same order.
    for directory in (cache_path, downloads_path):
        shutil.rmtree(directory, ignore_errors=True)
    for directory in (cache_path, downloads_path):
        directory.mkdir(parents=True, exist_ok=True)

    for name, payload in preserved.items():
        (cache_path / name).write_bytes(payload)
        logger.info(f"Restored session file: {name}")

    logger.info("Cache and downloads directory reset (session files preserved)")
| |
|
| |
|
# RFC 5987 extended value: charset ' [language] ' percent-encoded-text
_EXTENDED_VALUE_RE = re.compile(r"([^'\s]*)'[^']*'(.*)")


def parse_content_disposition(content_disposition):
    """Extract the filename from a ``Content-Disposition`` header value.

    Both the plain ``filename=`` parameter and the extended ``filename*=``
    parameter (``charset'language'percent-encoded``) are recognised, with
    case-insensitive parameter names. When both are present and the extended
    one decodes successfully, it is preferred regardless of their order;
    otherwise the plain value is used.

    The plain value is returned exactly as written after ``=`` (surrounding
    quotes are NOT removed). The header is split on ``;`` without regard to
    quoting, so a quoted filename containing ``;`` is truncated at that
    character.

    Args:
        content_disposition: The raw header value as a string.

    Returns:
        The filename string.

    Raises:
        ValueError: If no usable filename parameter is found.
    """
    plain_name = None
    extended_name = None

    for part in content_disposition.split(";"):
        name, separator, value = part.strip().partition("=")
        if not separator:
            continue
        name = name.rstrip().lower()

        if name == "filename":
            plain_name = value
        elif name == "filename*":
            match = _EXTENDED_VALUE_RE.match(value.strip())
            if match is None:
                continue
            charset, encoded = match.groups()
            try:
                # An empty charset falls back to UTF-8 instead of failing.
                extended_name = urllib.parse.unquote(
                    encoded, encoding=charset or "utf-8"
                )
            except (LookupError, ValueError):
                # LookupError: unknown charset name; ValueError: undecodable
                # bytes. Either way, fall back to the plain filename if any.
                pass

    filename = extended_name if extended_name is not None else plain_name
    if filename is None:
        raise ValueError("Failed to get filename")
    return filename
| |
|
| |
|
def _filename_from_url(url):
    """Return the URL-decoded last path segment of ``url``."""
    return unquote_plus(url.strip("/").split("/")[-1])


def get_filename(headers, url):
    """Choose a filename for a download from its response headers and URL.

    The ``Content-Disposition`` header is used when present; otherwise, or
    when parsing it fails, the last segment of ``url`` is used. Directory
    components are stripped from the result. If the name is empty, is ``.``
    or ``..``, or contains no dot, a random ID is generated instead, with an
    extension guessed from ``Content-Type`` when one can be determined.

    Args:
        headers: Mapping-like response headers supporting ``.get`` and
            item access.
        url: The request URL as a string.

    Returns:
        The chosen filename.
    """
    try:
        if headers.get("Content-Disposition"):
            filename = parse_content_disposition(headers["Content-Disposition"])
        else:
            filename = _filename_from_url(url)

        filename = filename.strip(' "')
    except Exception:
        # Best-effort: a malformed header must not abort the download, but
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        filename = _filename_from_url(url)

    # The name comes from the remote server or the URL, so keep only the
    # final component to prevent it from pointing outside the target folder.
    filename = filename.strip().replace("\\", "/").rsplit("/", 1)[-1].strip()

    if filename in (".", "..") or "." not in filename:
        extension = None
        content_type = headers.get("Content-Type")
        if content_type:
            # Drop parameters such as "; charset=utf-8", which would make
            # the MIME lookup fail.
            mime_type = content_type.split(";", 1)[0].strip()
            extension = mimetypes.guess_extension(mime_type)

        if extension:
            filename = f"{getRandomID()}{extension}"
        else:
            filename = getRandomID()

    return filename
| |
|