|
|
import mimetypes |
|
|
from urllib.parse import unquote_plus |
|
|
import re |
|
|
import urllib.parse |
|
|
from pathlib import Path |
|
|
from config import WEBSITE_URL |
|
|
import asyncio, aiohttp |
|
|
from utils.directoryHandler import get_current_utc_time, getRandomID |
|
|
from utils.logger import Logger |
|
|
|
|
|
logger = Logger(__name__) |
|
|
|
|
|
|
|
|
def convert_class_to_dict(data, isObject, showtrash=False):
    """Build a plain-dict listing of the entries stored under ``data["contents"]``.

    Args:
        data: Either a mapping with a ``"contents"`` key, or (when
            ``isObject`` equals ``True``) an object whose ``__dict__`` holds
            such a key. Each entry in ``contents`` must expose ``trash``,
            ``type``, ``name``, ``id``, ``path`` and ``upload_date``; entries
            whose ``type`` is not ``"folder"`` must also expose ``size``.
        isObject: When equal to ``True``, a shallow copy of ``data.__dict__``
            is used in place of ``data``.
        showtrash: Only entries whose ``trash`` attribute equals this value
            are included.

    Returns:
        ``{"contents": {key: {...}}}`` with one plain dict per selected entry.
    """
    folder_fields = ("name", "type", "id", "path", "upload_date")
    file_fields = ("name", "type", "size", "id", "path", "upload_date")

    # Equality test (not truthiness) kept on purpose: only values equal to
    # True switch to the object's attribute dictionary.
    if isObject == True:
        data = data.__dict__.copy()

    contents = data["contents"]
    listing = {}
    for key in contents:
        entry = contents[key]
        selected = entry.trash == showtrash
        if not selected:
            continue
        # Folders carry no size; everything else is reported like a file.
        fields = folder_fields if entry.type == "folder" else file_fields
        listing[key] = {field: getattr(entry, field) for field in fields}

    return {"contents": listing}
|
|
|
|
|
|
|
|
async def auto_ping_website():
    """Send a GET request to ``WEBSITE_URL`` once every 60 seconds.

    Returns immediately when ``WEBSITE_URL`` is ``None``. Otherwise a single
    ``aiohttp.ClientSession`` is opened and reused by a loop that has no exit
    condition: a 200 response is logged at info level, any other status or
    any ``Exception`` raised by the request is logged as a warning, and the
    loop then sleeps for 60 seconds before the next attempt.
    """
    if WEBSITE_URL is None:
        return

    async with aiohttp.ClientSession() as session:
        while True:
            try:
                async with session.get(WEBSITE_URL) as response:
                    if response.status != 200:
                        logger.warning(f"Failed to ping website: {response.status}")
                    else:
                        logger.info(f"Pinged website at {get_current_utc_time()}")
            except Exception as e:
                # A failed ping must not stop the loop; report it and retry
                # after the usual delay.
                logger.warning(f"Failed to ping website: {e}")

            await asyncio.sleep(60)
|
|
|
|
|
|
|
|
import shutil |
|
|
|
|
|
|
|
|
def reset_cache_dir():
    """Recreate ``./cache`` and ``./downloads`` empty, keeping session files.

    Every ``*.session`` file directly inside ``./cache`` is read into memory
    before both directories are deleted, then written back into the fresh
    cache directory. A session file that has a sibling marker named
    ``<session file name>.invalidated`` is not preserved; its marker is
    removed and a warning is logged instead.
    """
    cache_root = Path("./cache")
    downloads_root = Path("./downloads")

    # Session file name -> raw bytes, held in memory across the wipe.
    preserved = {}
    if cache_root.exists():
        for session_path in cache_root.glob("*.session"):
            marker = cache_root / f"{session_path.name}.invalidated"
            if not marker.exists():
                preserved[session_path.name] = session_path.read_bytes()
                logger.info(f"Preserving session file: {session_path.name}")
                continue

            # Marked as invalid: drop the session and clear its marker.
            logger.warning(f"Skipping invalidated session file: {session_path.name} (was marked as AUTH_KEY_DUPLICATED)")
            marker.unlink(missing_ok=True)

    # Remove both trees first, then recreate both, in the same order.
    for directory in (cache_root, downloads_root):
        shutil.rmtree(directory, ignore_errors=True)
    for directory in (cache_root, downloads_root):
        directory.mkdir(parents=True, exist_ok=True)

    for session_name, payload in preserved.items():
        (cache_root / session_name).write_bytes(payload)
        logger.info(f"Restored session file: {session_name}")

    logger.info("Cache and downloads directory reset (session files preserved)")
|
|
|
|
|
|
|
|
def parse_content_disposition(content_disposition):
    """Extract the filename from a ``Content-Disposition`` header value.

    The header is split on ``;`` and both parameter forms are recognised:

    * ``filename=<value>`` - returned exactly as written (surrounding quotes
      are NOT removed here; callers strip them).
    * ``filename*=<charset>'<language>'<percent-encoded value>`` - decoded
      with the declared charset. The language tag may be empty.

    When both forms are present the decoded ``filename*`` value is preferred
    regardless of parameter order, as RFC 6266 section 4.3 recommends. If the
    extended value cannot be decoded (unknown or invalid charset), the plain
    ``filename`` value is used as a fallback.

    Args:
        content_disposition: The raw header value as a string.

    Returns:
        The filename as a string.

    Raises:
        Exception: With the message ``"Failed to get filename"`` when no
            usable filename parameter is found.
    """
    plain_name = None
    extended_name = None

    for part in content_disposition.split(";"):
        part = part.strip()
        if part.startswith("filename="):
            plain_name = part.split("=", 1)[1]
        elif part.startswith("filename*="):
            # charset ' optional-language ' percent-encoded-value
            match = re.match(r"filename\*=([^'\s]*)'[^']*'(.*)", part)
            if match:
                encoding, value = match.groups()
                try:
                    extended_name = urllib.parse.unquote(value, encoding=encoding)
                except (LookupError, ValueError):
                    # An unknown charset raises LookupError, not ValueError;
                    # ignore the unusable parameter and rely on the fallback.
                    pass

    filename = extended_name if extended_name is not None else plain_name
    if filename is None:
        raise Exception("Failed to get filename")
    return filename
|
|
|
|
|
|
|
|
def get_filename(headers, url):
    """Choose a filename for a download from its response headers and URL.

    Resolution order:

    1. The ``Content-Disposition`` header, parsed with
       ``parse_content_disposition``, when the header is present.
    2. Otherwise (or when step 1 raises) the last path segment of ``url``,
       decoded with ``unquote_plus``.
    3. If the result is empty or contains no ``.``, a random ID from
       ``getRandomID()`` is used instead, with an extension guessed from the
       ``Content-Type`` header when one can be determined.

    Args:
        headers: Mapping of response headers; must support ``.get``.
        url: The request URL as a string.

    Returns:
        The chosen filename.
    """

    def name_from_url():
        # Evaluated lazily so a valid header never depends on the URL.
        return unquote_plus(url.strip("/").split("/")[-1])

    try:
        disposition = headers.get("Content-Disposition")
        if disposition:
            filename = parse_content_disposition(disposition)
        else:
            filename = name_from_url()

        filename = filename.strip(' "')
    except Exception:
        # Header parsing is best-effort; fall back to the URL. Exception (not
        # a bare except) so KeyboardInterrupt / SystemExit still propagate.
        filename = name_from_url()

    filename = filename.strip()

    if filename == "" or "." not in filename:
        extension = None
        content_type = headers.get("Content-Type")
        if content_type:
            # guess_extension only matches a bare media type, so drop any
            # parameters such as "; charset=utf-8" first.
            media_type = content_type.split(";", 1)[0].strip()
            extension = mimetypes.guess_extension(media_type)

        if extension:
            filename = f"{getRandomID()}{extension}"
        else:
            filename = getRandomID()

    return filename
|
|
|