Spaces:
Sleeping
Sleeping
File size: 2,530 Bytes
3f48026 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 | # Copyright (C) @TheSmartBisnu
# Channel: https://t.me/itsSmartDev
import os
import shutil
from typing import Optional
from logger import LOGGER
SIZE_UNITS = ["B", "KB", "MB", "GB", "TB", "PB"]
def get_download_path(folder_id: int, filename: str, root_dir: str = "downloads") -> str:
folder = os.path.join(root_dir, str(folder_id))
os.makedirs(folder, exist_ok=True)
return os.path.join(folder, filename)
def cleanup_download(path: str) -> None:
try:
LOGGER(__name__).info(f"Cleaning Download: {path}")
if os.path.exists(path):
os.remove(path)
if os.path.exists(path + ".temp"):
os.remove(path + ".temp")
folder = os.path.dirname(path)
if os.path.isdir(folder) and not os.listdir(folder):
os.rmdir(folder)
except Exception as e:
LOGGER(__name__).error(f"Cleanup failed for {path}: {e}")
def cleanup_downloads_root(root_dir: str = "downloads") -> tuple[int, int]:
if not os.path.isdir(root_dir):
return 0, 0
file_count = 0
total_size = 0
for dirpath, _, filenames in os.walk(root_dir):
for name in filenames:
file_count += 1
try:
total_size += os.path.getsize(os.path.join(dirpath, name))
except OSError:
pass
shutil.rmtree(root_dir, ignore_errors=True)
return file_count, total_size
def get_readable_file_size(size_in_bytes: Optional[float]) -> str:
if size_in_bytes is None or size_in_bytes < 0:
return "0B"
for unit in SIZE_UNITS:
if size_in_bytes < 1024:
return f"{size_in_bytes:.2f} {unit}"
size_in_bytes /= 1024
return "File too large"
def get_readable_time(seconds: int) -> str:
result = ""
days, remainder = divmod(seconds, 86400)
if int(days):
result += f"{int(days)}d"
hours, remainder = divmod(remainder, 3600)
if int(hours):
result += f"{int(hours)}h"
minutes, seconds = divmod(remainder, 60)
if int(minutes):
result += f"{int(minutes)}m"
result += f"{int(seconds)}s"
return result
async def fileSizeLimit(file_size, message, action_type="download", is_premium=False):
MAX_FILE_SIZE = 2 * 2_097_152_000 if is_premium else 2_097_152_000
if file_size > MAX_FILE_SIZE:
await message.reply(
f"The file size exceeds the {get_readable_file_size(MAX_FILE_SIZE)} "
f"limit and cannot be {action_type}ed."
)
return False
return True
|