File size: 5,166 Bytes
e1a2a92 5359a05 1f18aa5 5359a05 1f18aa5 5359a05 e1a2a92 5359a05 e1a2a92 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# --- Standard-library imports ---
import mimetypes
from urllib.parse import unquote_plus
import re
import urllib.parse
from pathlib import Path
# --- Project-local configuration ---
from config import WEBSITE_URL
# asyncio (stdlib) + aiohttp (third-party HTTP client) drive the keep-alive pinger.
import asyncio, aiohttp
# Project helpers: UTC timestamp formatting and random ID generation.
from utils.directoryHandler import get_current_utc_time, getRandomID
from utils.logger import Logger
# Module-level logger named after this module.
logger = Logger(__name__)
def convert_class_to_dict(data, isObject, showtrash=False):
    """Serialize a directory listing of folder/file objects into plain dicts.

    Args:
        data: A dict with a ``"contents"`` key (or, when ``isObject`` is
            truthy, an object whose ``__dict__`` has one). Each value under
            ``contents`` is an object exposing ``name``, ``type``, ``id``,
            ``path``, ``upload_date`` and ``trash`` attributes; entries whose
            ``type`` is not ``"folder"`` additionally expose ``size``.
        isObject: When truthy, ``data`` is first converted via ``__dict__``.
        showtrash: Only entries whose ``trash`` flag equals this value are
            included (``False`` = live entries, ``True`` = trashed entries).

    Returns:
        ``{"contents": {key: {...plain JSON-safe fields...}}}``
    """
    if isObject:
        data = data.__dict__.copy()
    new_data = {"contents": {}}
    for key, item in data["contents"].items():
        # Skip entries that don't match the requested trash state.
        if item.trash != showtrash:
            continue
        # Common fields first; "size" is inserted between "type" and "id"
        # for files only, preserving the original key order.
        entry = {"name": item.name, "type": item.type}
        if item.type != "folder":
            entry["size"] = item.size
        entry.update(
            {"id": item.id, "path": item.path, "upload_date": item.upload_date}
        )
        new_data["contents"][key] = entry
    return new_data
async def auto_ping_website():
    """Keep the deployment awake by GET-ing WEBSITE_URL once a minute.

    Does nothing when WEBSITE_URL is unset. Runs forever otherwise; any
    network failure is logged and the loop keeps going.
    """
    if WEBSITE_URL is None:
        return
    async with aiohttp.ClientSession() as session:
        while True:
            try:
                async with session.get(WEBSITE_URL) as response:
                    if response.status == 200:
                        logger.info(f"Pinged website at {get_current_utc_time()}")
                    else:
                        logger.warning(f"Failed to ping website: {response.status}")
            except Exception as e:
                logger.warning(f"Failed to ping website: {e}")
            # One-minute interval between pings.
            await asyncio.sleep(60)
import shutil
def reset_cache_dir():
    """Wipe ./cache and ./downloads, preserving valid session files.

    Every ``*.session`` file in the cache is read into memory before both
    directories are deleted and recreated, then written back. A session with
    a matching ``<name>.invalidated`` marker is dropped instead; its marker
    is removed so the session can be retried on the next restart.
    """
    cache = Path("./cache")
    downloads = Path("./downloads")
    # Snapshot session files we intend to keep.
    preserved = {}
    if cache.exists():
        for sess in cache.glob("*.session"):
            marker = cache / f"{sess.name}.invalidated"
            if marker.exists():
                # Marked bad (e.g. AUTH_KEY_DUPLICATED): drop it and clear the marker.
                logger.warning(f"Skipping invalidated session file: {sess.name} (was marked as AUTH_KEY_DUPLICATED)")
                marker.unlink(missing_ok=True)
            else:
                preserved[sess.name] = sess.read_bytes()
                logger.info(f"Preserving session file: {sess.name}")
    # Recreate both directories from scratch.
    shutil.rmtree(cache, ignore_errors=True)
    shutil.rmtree(downloads, ignore_errors=True)
    cache.mkdir(parents=True, exist_ok=True)
    downloads.mkdir(parents=True, exist_ok=True)
    # Write the preserved sessions back into the fresh cache.
    for name, payload in preserved.items():
        (cache / name).write_bytes(payload)
        logger.info(f"Restored session file: {name}")
    logger.info("Cache and downloads directory reset (session files preserved)")
def parse_content_disposition(content_disposition):
    """Extract the filename from a Content-Disposition header value.

    Handles both the plain ``filename=`` parameter (quotes stripped) and the
    RFC 5987/6266 extended ``filename*=<charset>'<language>'<pct-encoded>``
    form. When both are present, the later parameter in the header wins,
    which matches the RFC 6266 preference for ``filename*``.

    Args:
        content_disposition: Raw header value, e.g.
            ``attachment; filename="report.pdf"``.

    Returns:
        The decoded filename.

    Raises:
        Exception: If no filename parameter could be extracted.
    """
    filename = None
    for part in content_disposition.split(";"):
        part = part.strip()
        if part.startswith("filename*="):
            # Extended form; the language tag is optional and ignored.
            match = re.match(r"filename\*=\"?([^'\"]*)'([^']*)'(.*?)\"?$", part)
            if match:
                encoding, _language, value = match.groups()
                try:
                    filename = urllib.parse.unquote(
                        value, encoding=encoding or "utf-8"
                    )
                except (ValueError, LookupError):
                    # Invalid percent-encoding or unknown charset: fall back
                    # to whatever plain filename= yields, if any.
                    pass
        elif part.startswith("filename="):
            # Plain form; strip optional surrounding quotes.
            filename = part.split("=", 1)[1].strip(' "')
    if filename is None:
        raise Exception("Failed to get filename")
    return filename
def get_filename(headers, url):
    """Determine a filename for a download from HTTP headers or the URL.

    Resolution order:
      1. ``Content-Disposition`` header (via ``parse_content_disposition``).
      2. Last path segment of *url* (plus-decoded).
      3. A random ID, with an extension guessed from ``Content-Type`` when
         possible, if the name is empty or has no extension.

    Args:
        headers: Mapping of response headers (must support ``.get``).
        url: The request URL used as a fallback source for the name.

    Returns:
        A non-empty filename string.
    """
    try:
        if headers.get("Content-Disposition"):
            filename = parse_content_disposition(headers["Content-Disposition"])
        else:
            filename = unquote_plus(url.strip("/").split("/")[-1])
        filename = filename.strip(' "')
    except Exception:
        # Header parsing failed; fall back to the last URL path segment.
        # (Was a bare `except:`, which also swallowed KeyboardInterrupt.)
        filename = unquote_plus(url.strip("/").split("/")[-1])
        filename = filename.strip()
    # No usable name: synthesize one, guessing an extension from the MIME type.
    if filename == "" or "." not in filename:
        extension = None
        if headers.get("Content-Type"):
            extension = mimetypes.guess_extension(headers["Content-Type"])
        filename = f"{getRandomID()}{extension}" if extension else getRandomID()
    return filename
|